Python实时增量数据加载解决方案
单例模式的详细讲解：见以往推文《单例模式》。
创建增量ID记录表
import datetime
import sqlite3
import threading
import time

import pandas as pd
import pymssql
# The imports above load the modules this script needs.
# Print wide DataFrames on a single line instead of wrapping their columns.
pd.set_option('expand_frame_repr', False)

# Create the table that stores the max-ID watermark of each incremental load.
# NOTE(review): Windows-style relative path, resolved against the current
# working directory rather than this script's location.
database_path = r'.\Database\ID_Record.db'
from sqlite3 import connect
from contextlib import closing
from pathlib import Path

# sqlite3.connect() cannot create missing directories, so make sure the
# parent directory exists first.
Path(database_path).parent.mkdir(parents=True, exist_ok=True)
# Using a sqlite3 Connection as a context manager only commits/rolls back the
# transaction; it never closes the connection. closing() releases it.
with closing(connect(database_path)) as conn:
    with conn:
        conn.execute(
            'CREATE TABLE IF NOT EXISTS Incremental_data_max_id_record('
            'id INTEGER PRIMARY KEY AUTOINCREMENT,'
            'F_SDaqID_MAX TEXT,'
            'record_date datetime)')
数据库连接类
2、增量数据源（SQL Server）连接类代码（注意：下面用到的元类 MetaSingleton 在后文定义，实际运行时必须先定义 MetaSingleton，再定义该类。）
class Database_sqlserver(metaclass=MetaSingleton):
    """
    Real-time (source) SQL Server database.

    ``MetaSingleton`` makes every ``Database_sqlserver()`` call return the same
    object, so the whole process shares one pymssql connection. The
    metaclass must already be defined when this class statement runs.
    """
    connection = None

    def __init__(self):
        # Open the database connection only once.
        if self.connection is None:
            self.connection = pymssql.connect(host="xxxxx", user="xxxxx", password="xxxxx", database="xxxxx", charset="utf8")
            if self.connection:
                print("连接成功!")
            # Kept for callers that use the shared cursor directly. The query
            # helpers below do not touch it: closing it (as the previous
            # get_incremental_data did) broke every later query on the singleton.
            self.cursorobj = self.connection.cursor()

    @staticmethod
    def _query(sql, params=None):
        """
        Run *sql* on a fresh cursor of the shared connection and return all rows.

        :param sql: SQL text; use ``%s`` placeholders when *params* is given.
        :param params: optional tuple of values bound to the placeholders.
        :return: list of result rows.
        """
        cursor = Database_sqlserver().connection.cursor()
        try:
            if params is None:
                cursor.execute(sql)
            else:
                cursor.execute(sql, params)
            return cursor.fetchall()
        finally:
            # A per-call cursor can be closed safely without affecting others.
            cursor.close()

    @staticmethod
    def get_F_SDaqID_MAX():
        """
        Return the current maximum F_SDaqID of the source table.

        :return: the MAX(F_SDaqID) value (None if the table has no rows).
        """
        sql_MAXID = """select MAX(F_SDaqID) from T_DaqDataForEnergy"""
        # An aggregate query always yields exactly one row with one column.
        F_SDaqID_MAX = Database_sqlserver._query(sql_MAXID)[0][0]
        print("最大ID值:{0}".format(F_SDaqID_MAX))
        return F_SDaqID_MAX

    @staticmethod
    def get_incremental_data(incremental_Max_ID):
        """
        Extract the rows added after the given watermark.

        :param incremental_Max_ID: exclusive lower bound on F_ID.
        :return: DataFrame with columns F_ID, F_Datetime, F_Data.
        """
        # NOTE(review): the watermark comes from MAX(F_SDaqID) but this filter is
        # on F_ID -- confirm both columns follow the same ID sequence.
        # The bound is bound as a query parameter rather than formatted into the SQL.
        sql_incremental_data = """select F_ID,F_Datetime,F_Data from T_DaqDataForEnergy  where F_ID > %s"""
        Target_data_source = Database_sqlserver._query(sql_incremental_data, (incremental_Max_ID,))
        df = pd.DataFrame(
            Target_data_source,
            columns=[
                "F_ID",
                "F_Datetime",
                "F_Data"])
        print("提取数据", df)
        return df
????????
数据资源应用服务的设计主要考虑数据库操作的一致性，并优化数据库的各种操作，以提高内存和CPU利用率。
服务实现多种读取和写入操作：客户端调用API，由服务执行相应的DB操作。
注：
1、使用 metaclass 创建具有单例特征的类：
class Database_sqlserver(metaclass=MetaSingleton)
class Database_sqlite(metaclass=MetaSingleton)
用 class 定义新类时，通过 metaclass=MetaSingleton 为数据库类 Database_sqlserver 指定了元类（这不是装饰器）。此后每次调用 Database_sqlserver() 进行实例化时，都会自动执行元类 MetaSingleton 的特殊方法 __call__。
class MetaSingleton(type):
    """
    Metaclass that gives each class using it at most one instance.

    The first ``SomeClass(...)`` call constructs the object normally; every
    later call returns that cached object and ignores its arguments.
    """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        # Already built: hand back the cached object.
        if cls in registry:
            return registry[cls]
        # First call for this class: build it once and remember it.
        registry[cls] = super().__call__(*args, **kwargs)
        return registry[cls]
以上代码是基于元类的单例实现：当客户端对数据库执行某些操作时，会多次实例化数据库类，但只会创建一个对象，所有调用共享同一个数据库连接。注意，单例本身并不能保证多线程下对数据库的调用是同步的（见下文第2点）。
2、多线程使用同一数据库连接资源时需采取一定的同步机制
如果没有采用同步机制，可能出现一些意料之外的情况，例如两个线程同时通过“实例不存在”的检查，各自创建了一个实例。
1）使用 with cls.lock 加锁（需要先 import threading）
class MetaSingleton(type):
    """
    Thread-safe singleton metaclass.

    Each class using it gets at most one instance, even when several threads
    instantiate it at the same time. Later calls return the cached instance
    and ignore their arguments.
    """
    _instances = {}
    # Re-entrant so that a singleton whose __init__ instantiates another
    # not-yet-created singleton class does not deadlock on this shared lock.
    lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        # Fast path: once the instance exists, skip the lock entirely. The
        # entry is stored only after construction finishes, so a hit here is
        # always a fully built object.
        if cls in cls._instances:
            return cls._instances[cls]
        with cls.lock:
            # Re-check under the lock: another thread may have created the
            # instance while this one was waiting to acquire it.
            if cls not in cls._instances:
                cls._instances[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs)
            return cls._instances[cls]
锁的获取和释放需要消耗资源；如果每次实例化都必须先获得锁，开销会比较大。可以在加锁前先检查实例是否已存在（双重检查），只有实例尚未创建时才加锁。
3、如果我们开发的程序不是单个应用，而是集群化的，即多个客户端共享单个数据库，进程内的单例就无法保证数据库操作同步，此时数据库连接池是更好的选择：它大大节省了内存，提高了服务器的服务效率，能够支持更多的客户端。
数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接，并将这些连接组成一个连接池，由应用程序动态地对池中的连接进行申请、使用和释放。对于超过连接池中连接数的并发请求，应该在请求队列中排队等待。
增量数据服务客户端
class IncrementalRecordServer:
    """
    Incremental-data service client: records the max-ID watermark and
    extracts the rows added since the previous run.

    ``__new__`` returns one shared instance, but ``__init__`` still runs on
    every ``IncrementalRecordServer(...)`` call, so each call re-queries the
    current max source ID and the current timestamp.
    """
    # History of [id, record_date] entries, shared by every caller.
    _servers = []
    _instance = None

    def __new__(cls, *args, **kwargs):
        if not IncrementalRecordServer._instance:
            IncrementalRecordServer._instance = super(IncrementalRecordServer, cls).__new__(cls)
        return IncrementalRecordServer._instance

    def __init__(self, changeServersID=None):
        """
        Initialise per-call state.

        :param changeServersID: start ID written by ``changeServers`` (manual
            override of the watermark); the other methods do not use it.
        """
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID

    def record(func):
        """
        Class-body decorator: run *func*, then save ``_servers`` to a text file.
        """
        import functools

        # wraps keeps each decorated method's own __name__/__doc__ instead of
        # every method reporting itself as "Server_record".
        @functools.wraps(func)
        def Server_record(self):
            v = func(self)
            text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt", record=IncrementalRecordServer._servers)
            print("保存成功")
            return v
        return Server_record

    @record
    def addServer(self):
        """Append the current max ID to the history and insert it into SQLite."""
        # NOTE(review): int() raises TypeError if get_F_SDaqID_MAX returned None
        # (empty source table).
        self._servers.append([int(self.F_SDaqID_MAX), self.record_date])
        print("添加记录")
        Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX, f2=self.record_date)

    @record
    def changeServers(self):
        """Replace the stored watermark with the manually supplied ``changeServersID``."""
        self._servers.append([self.changeServersID, self.record_date])
        # "Update" is implemented as delete-all followed by insert.
        Database_sqlite.Del_Max_ID_Records()
        Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID, f2=self.record_date)
        print("更新记录")

    @record
    def popServers(self):
        """Delete the stored watermark records in SQLite (in-memory history is kept)."""
        print("删除记录")
        Database_sqlite.Del_Max_ID_Records()

    def getServers(self):
        """
        Return the stored watermark read from SQLite.

        Incremental_data_client treats a None result as "nothing recorded yet".
        """
        Max_ID_Records = Database_sqlite.View_Max_ID_Records()
        print("查看记录", Max_ID_Records)
        return Max_ID_Records

    def Incremental_data_client(self):
        """
        Extract incremental data: rows whose F_ID is above the stored watermark.

        On the first run (no stored watermark) the current max ID is used as
        the start point. In both cases the current max ID is then recorded as
        the new watermark before the rows are fetched.

        :return: DataFrame from ``Database_sqlserver.get_incremental_data``.
        """
        # Read the stored watermark once (it used to be queried twice).
        incremental_Max_ID = self.getServers()
        if incremental_Max_ID is None:
            # First load: nothing recorded yet, start from the current max ID.
            incremental_Max_ID = self.F_SDaqID_MAX
        self.addServer()
        # NOTE(review): the fetch has no upper bound, so rows inserted after
        # __init__ read F_SDaqID_MAX are returned now and again on the next run.
        return Database_sqlserver.get_incremental_data(incremental_Max_ID)
class IncrementalRecordServer:
    """
    Alternative (lazy) singleton variant: only ``getInstance()`` shares one object.

    Calling ``IncrementalRecordServer()`` directly always builds a new object
    (and re-queries the source max ID); ``getInstance()`` creates the shared
    instance on first use and returns it afterwards. Only the members that
    differ from the ``__new__``-based variant are shown here.
    """
    _servers = []
    _instance = None

    def __init__(self, changeServersID=None):
        """
        Initialise per-object state and report whether a shared instance exists.

        :param changeServersID: manual start ID for the watermark, if any.
        """
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID
        if not IncrementalRecordServer._instance:
            print("__init__对象创建")
        else:
            # __init__ cannot replace the object being built, so it can only
            # report the existing shared instance (the former getInstance()
            # call here returned it and discarded the result).
            print("对象已经存在:", IncrementalRecordServer._instance)

    @classmethod
    def getInstance(cls):
        """Return the shared instance, creating it on first use."""
        if not cls._instance:
            cls._instance = IncrementalRecordServer()
        return cls._instance

    def __del__(self):
        class_name = self.__class__.__name__
        print(class_name, "销毁")


if __name__ == '__main__':
    # NOTE(review): addServer/getServers/Incremental_data_client come from the
    # full (__new__-based) IncrementalRecordServer; this abbreviated variant
    # does not define them.
    for _ in range(6):
        hc1 = IncrementalRecordServer()
        hc1.addServer()
        # Show the record just appended rather than assuming the history
        # started empty.
        print("Record_ID", hc1._servers[-1])
        time.sleep(60)
    # Server2 client: show the latest record, then view the incremental data.
    hc2 = IncrementalRecordServer()
    hc2.getServers()
    hc2.Incremental_data_client()


if __name__ == '__main__':
    # Server3 client: manually set the starting ID of the incremental load.
    manual_start_id = '346449980'
    hc3 = IncrementalRecordServer(changeServersID=manual_start_id)
    hc3.changeServers()


if __name__ == '__main__':
    # Delete the stored max-ID watermark records.
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    hc3.popServers()

