手把手教你搭建一個 Python 連接數(shù)據(jù)庫,快速取數(shù)工具

大家好,我是安果!
在數(shù)據(jù)生產(chǎn)應(yīng)用部門,取數(shù)分析是一個很常見的需求,實際上業(yè)務(wù)人員需求時刻變化,最高效的方式是讓業(yè)務(wù)部門自己來取,減少不必要的重復(fù)勞動,一般情況下,業(yè)務(wù)部門數(shù)據(jù)庫表結(jié)構(gòu)一般是固定的,根據(jù)實際業(yè)務(wù)將取數(shù)需求做成sql 腳本,快速完成數(shù)據(jù)獲取---授人以漁的方式,提供平臺或工具
基于底層數(shù)據(jù)來開發(fā)不難,無非是將用戶輸入變量作為篩選條件,將參數(shù)映射到 sql 語句,并生成一個 sql 語句然后再去數(shù)據(jù)庫執(zhí)行
最后再利用 QT 開發(fā)一個 GUI 界面,用戶界面的點擊和篩選條件,信號觸發(fā)對應(yīng)按鈕與綁定的傳參槽函數(shù)執(zhí)行
具體思路:
一、數(shù)據(jù)庫連接類
此處利用 pandas 讀寫操作 oracle 數(shù)據(jù)庫
二、主函數(shù)模塊
1)輸入?yún)?shù)模塊,外部輸入條件參數(shù),建立數(shù)據(jù)庫關(guān)鍵字段映射
--注:讀取外部 txt 文件,將篩選字段可能需要進(jìn)行鍵值對轉(zhuǎn)換
2)sql 語句集合模塊,將待執(zhí)行的業(yè)務(wù) sql 語句統(tǒng)一存放到這里
3)數(shù)據(jù)處理函數(shù)工廠
4)使用多線程提取數(shù)據(jù)
一、數(shù)據(jù)庫連接類
cx_Oracle 是一個 Python 擴(kuò)展模塊,相當(dāng)于 python 的 Oracle 數(shù)據(jù)庫的驅(qū)動,通過使用所有數(shù)據(jù)庫訪問模塊通用的數(shù)據(jù)庫 API 來實現(xiàn) Oracle 數(shù)據(jù)庫的查詢和更新
Pandas 是基于 NumPy 開發(fā),為了解決數(shù)據(jù)分析任務(wù)的模塊,Pandas 引入了大量庫和一些標(biāo)準(zhǔn)的數(shù)據(jù)模型,提供了高效地操作大型數(shù)據(jù)集所需的方法類和函數(shù)
pandas 調(diào)用數(shù)據(jù)庫主要有 read_sql_table,read_sql_query,read_sql 三種方式
本文主要介紹一下 Pandas 中 read_sql_query 方法的使用
1:pd.read_sql_query()
讀取自定義數(shù)據(jù),返還DataFrame格式,通過SQL查詢腳本包括增刪改查。
pd.read_sql_query(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None,chunksize=None)
sql:要執(zhí)行的sql腳本,文本類型
con:數(shù)據(jù)庫連接
index_col:選擇返回結(jié)果集索引的列,文本/文本列表
coerce_float:非常有用,將數(shù)字形式的字符串直接以float型讀入
parse_dates:將某一列日期型字符串轉(zhuǎn)換為datetime型數(shù)據(jù),與pd.to_datetime函數(shù)功能類似。
params:向sql腳本中傳入的參數(shù),官方類型有列表,元組和字典。用于傳遞參數(shù)的語法是數(shù)據(jù)庫驅(qū)動程序相關(guān)的。
chunksize:如果提供了一個整數(shù)值,那么就會返回一個generator,每次輸出的行數(shù)就是提供的值的大小
read_sql_query()中可以接受SQL語句,DELETE,INSERT INTO、UPDATE操作沒有返回值(但是會在數(shù)據(jù)庫中執(zhí)行),程序會拋出SourceCodeCloseError,并終止程序。SELECT會返回結(jié)果。如果想繼續(xù)運行,可以try捕捉此異常。
2:pd.read_sql_table()
讀取數(shù)據(jù)庫中的表,返還DataFrame格式(通過表名)
import pandas as pd
pd.read_sql_table(table_name, con, schema=None,index_col=None, coerce_float=True, parse_dates=None, columns=None,chunksize=None)
3:pd.read_sql()
讀數(shù)據(jù)庫通過SQL腳本或者表名
import pandas as pd
pd.read_sql(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=None)
以下創(chuàng)建連接 oracel 數(shù)據(jù)庫的連接類 Oracle_DB
主要提供 2 種操作數(shù)據(jù)的函數(shù)方法。
import cx_Oracle
# Pandas讀寫操作Oracle數(shù)據(jù)庫
import pandas as pd
# 避免編碼問題帶來的亂碼
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
class Oracle_DB(object):
def __init__(self):
try:
# 連接oracle
# 方法1:sqlalchemy 提供的create_engine()
# from sqlalchemy import create_engine
# engine = create_engine('oracle+cx_oracle://username:password@ip:1521/ORCL')
# #方法2:cx_Oracle.connect()
self.engine = cx_Oracle.connect('username', 'password', 'ip:1521/database')
except cx_Oracle.Error as e:
print("Error %d:%s" % (e.args[0], e.args[1]))
exit()
# 查詢部分信息
def search_one(self, sql,sparm):
try:
# #查詢獲取數(shù)據(jù)用sql語句
# 代傳參數(shù):sparm--查詢指定字段參數(shù)
df = pd.read_sql_query(sql, self.engine,params=sparm)
self.engine.close()
except Exception as e:
return "Error " + e.args[0]
return df
# 查詢?nèi)啃畔?/span>
def search_all(self, sql):
try:
# #查詢獲取數(shù)據(jù)用sql語句
df = pd.read_sql_query(sql, self.engine)
self.engine.close()
except Exception as e:
return "Error " + e.args[0]
return df
二、數(shù)據(jù)提取主函數(shù)模塊
cx_Oracle 是一個 Python 擴(kuò)展模塊,相當(dāng)于 python 的 Oracle 數(shù)據(jù)庫的驅(qū)動,通過使用所有數(shù)據(jù)庫訪問模塊通用的數(shù)據(jù)庫 API 來實現(xiàn) Oracle 數(shù)據(jù)庫的查詢和更新。
1)外部輸入?yún)?shù)模塊
txt 文本中,就包含一列數(shù)據(jù),第一行列名,讀取的時候忽略第一行
#建立ID——編號字典
def buildid():
sqlid = """select * from b_build_info"""
db = Oracle_DB() # 實例化一個對象
b_build_info = db.search_all(sqlid)
ID_bUILDCODE = b_build_info.set_index("BUILDCODE")["ID"].to_dict()
return ID_bUILDCODE
#通過文本傳入待導(dǎo)出數(shù)據(jù)清單
def read_task_list():
build_code=buildid()
tasklist=[]
is_first_line=True
with open("./b_lst.txt") as lst:
for line in lst:
if is_first_line:
is_first_line=False
continue
tasklist.append(build_code.get(line.strip('\n'))) #鍵值對轉(zhuǎn)換
return tasklist
2)業(yè)務(wù) sql 語句集合
= {'Start_time':'2021-04-01','End_time':'2021-05-01'},此處參數(shù)可根據(jù)需要改變def sql_d(lst):
# 逐月數(shù)據(jù)
sql_d_energy_item_month = """select * from d_energy_item_month
where recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
and recorddate < to_date(:End_time, 'yyyy-MM-dd')
and buildid in {0}
order by recorddate asc""".format(lst)
# 逐月數(shù)據(jù)
sql_d_energy_month = """select d.*,t.name from d_energy_month d join t_device_info t on d.branchid = t.id
where d.recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
and d.recorddate < to_date(:End_time, 'yyyy-MM-dd')
and d.buildid = '{0}'
order by d.recorddate asc""".format(lst)
# 查詢當(dāng)日數(shù)據(jù)
sql_energy_item_hour_cheak = """select * from d_energy_item_hour
where trunc(sysdate)=trunc(recorddate)
order by recorddate asc""".format(lst)
sql_collection = [sql_d_energy_item_month, sql_d_energy_item_day, sql_d_energy_item_hour, sql_d_energy_month,
sql_d_energy_day, sql_d_energy_hour, sql_energy_hour_cheak]
#此處省略部分sql語句
return sql_collection
3)業(yè)務(wù)數(shù)據(jù)處理
業(yè)務(wù)數(shù)據(jù)處理流程,原始數(shù)據(jù)后處理,這里不作介紹:
def db_extranction(lst,sparm,sql_type):
"""sql_type--輸入需要操作的sql業(yè)務(wù)序號"""
sql_=sql_d(lst)[sql_type] #輸出sql語句
db = Oracle_DB() # 實例化一個對象
res=db.search_one(sql_,sparm)
# 數(shù)據(jù)處理加工
RES=Data_item_factory(res) #此處省略
# res = db.search_all(sql_d_energy_item_month)
print(RES)
return RES多線程提取數(shù)據(jù)部分,這里 tasklist 列表多線程提取數(shù)據(jù)
import threading
# Pandas讀寫操作Oracle數(shù)據(jù)庫
from tools.Data_Update_oracle import Oracle_DB
import pandas as pd
from concurrent import futures
if __name__ == '__main__':
#外部傳入
tasklist= read_task_list()
print(tasklist)
# 輸入時間查找范圍參數(shù),可手動修改
sparm = {'Start_time':'2021-04-01','End_time':'2021-05-01'}
lst = tuple(list(tasklist))
#業(yè)務(wù)類型序號,可手動修改
sql_type=0
#全部提取
db_extranction(lst,sparm,sql_type)
#多線程按字段分批提取
方法一:使用threading模塊的Thread類的構(gòu)造器創(chuàng)建線程
#threads=[threading.Thread(target=db_extranction,args=(lst,sparm,sql_type)) for lst in tasklist]
# [threads[i].start() for i in range(len(threads))]
方法二:使用python的concurrent庫,這是官方基于 threading 封裝,先安裝該庫
# with futures.ThreadPoolExecutor(len(tasklist)) as executor:
# executor.map([db_extranction(lst,sparm,sql_type) for lst in tasklist],tasklist)
到此整個數(shù)據(jù)庫取數(shù)工具開發(fā)流程介紹完畢,就差最后一步分享給小伙伴使用了,做成 GUI 應(yīng)用此處不做詳細(xì)介紹,構(gòu)建獨立的 python 環(huán)境,快速發(fā)布你的應(yīng)用
