
Python Real-Time Incremental Data Loading Solution

2022-01-21 01:21

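This post shares a practical application of the Singleton pattern: a solution for building a real-time incremental data loading tool. The key part is an incremental ID record table that supports adding, modifying and deleting records.

Singleton pattern: provides a global access point and ensures that a class has one and only one object of a particular type. It is typically used for logging, database operations and similar scenarios, to avoid conflicting requests for the same resource.

For the basics of the Singleton pattern, see the earlier post on the Singleton pattern.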


Contents:

1. Create the incremental ID record table

2. Database connection classes

3. Incremental data service client

4. Testing the results


Create the incremental ID record table



import sqlite3
import datetime
import threading
import time

import pandas as pd
import pymssql

pd.set_option('expand_frame_repr', False)  # print wide DataFrames without wrapping


Above: import the required modules.



# Create the table that stores the incremental ID records
from sqlite3 import connect

database_path = r'.\Database\ID_Record.db'  # Windows-style relative path; the Database folder must exist

with connect(database_path) as conn:
    conn.execute(
        'CREATE TABLE IF NOT EXISTS Incremental_data_max_id_record('
        'id INTEGER PRIMARY KEY AUTOINCREMENT, F_SDaqID_MAX TEXT, record_date datetime)')
Above: store the latest incremental record ID (F_SDaqID_MAX) in the database.
# Save records to a local txt file
def text_save(filename, record):
    # filename: path of the txt file to write to
    # record: list of [F_SDaqID_MAX, record_date] entries
    # 'a' appends; the record decorator below passes the full list on every call,
    # so switching to 'w' (overwrite) keeps the file free of duplicate lines
    with open(filename, 'a') as file:
        for item in record:
            s = str(item).replace('[', '').replace(']', '')
            s = s.replace("'", '').replace(',', '') + '\n'  # strip quotes and commas, one record per line
            file.write(s)
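Above: store the latest incremental record ID (F_SDaqID_MAX) in a temporary file.

Two implementations of the incremental ID record are provided: persistent storage in a database, and storage in a temporary file. Persistent storage means, as the name suggests, that key information about each operation, such as the incremental ID F_SDaqID_MAX, is recorded when the object is created; this kind of flag-style record mapping is a commonly chosen design.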
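The file mode above only shows the write side. A minimal sketch of the read side, assuming the line format produced by text_save (the ID, then the timestamp, separated by spaces). text_load_last_id is a hypothetical helper, not part of the original code, and text_save is repeated so the snippet runs on its own:

```python
import os
import tempfile


def text_save(filename, record):
    # Same line format as the article's text_save: "<F_SDaqID_MAX> <record_date>"
    with open(filename, "a", encoding="utf-8") as f:
        for item in record:
            s = str(item).replace("[", "").replace("]", "")
            f.write(s.replace("'", "").replace(",", "") + "\n")


def text_load_last_id(filename):
    """Hypothetical helper: return the F_SDaqID_MAX on the last non-empty line, or None."""
    if not os.path.exists(filename):
        return None
    last = None
    with open(filename, encoding="utf-8") as f:
        for line in f:
            if line.strip():
                last = line
    if last is None:
        return None
    # The first whitespace-separated field is the ID; the rest is the timestamp
    return int(last.split()[0])


path = os.path.join(tempfile.mkdtemp(), "Server_record.txt")
print(text_load_last_id(path))  # None: the file does not exist yet
text_save(path, [[346449980, "2022-01-21 01:21:00"]])
text_save(path, [[346450120, "2022-01-21 01:22:00"]])
print(text_load_last_id(path))  # 346450120
```

Reading the whole file is fine for a small record file; the last line wins because text_save only ever appends.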


Database connection classes

Real-time incremental loading needs two database connection classes: one that stores the incremental data IDs and one for the incremental target data source. Both database operation classes are implemented with the Singleton pattern, and the incremental service's records are stored in order in a database or a dedicated log file to keep the data consistent.

1. SQLite connection class for storing incremental data IDs

class Database_sqlite(metaclass=MetaSingleton):  # MetaSingleton is defined in the notes further down
    # Must point at the file in which Incremental_data_max_id_record was created (step 1)
    database_path = r'.\Database\ID_Record.db'
    connection = None

    def connect(self):
        # Open the connection once; the singleton reuses it on every call
        if self.connection is None:
            self.connection = sqlite3.connect(self.database_path, check_same_thread=False, isolation_level=None)
            self.cursorobj = self.connection.cursor()
        return self.cursorobj, self.connection

    # Insert a max-ID record
    @staticmethod
    def Insert_Max_ID_Record(f1, f2):
        cursor = Database_sqlite().connect()
        # Bound parameters instead of string formatting: sqlite3 handles the quoting
        sql = "insert into Incremental_data_max_id_record(F_SDaqID_MAX, record_date) values (?, ?)"
        cursor[0].execute(sql, (str(f1), f2))
        cursor[1].commit()  # harmless in autocommit mode (isolation_level=None)
        print("Inserted!")

    # Fetch the latest ID record from the incremental database
    @staticmethod
    def View_Max_ID_Records():
        cursor = Database_sqlite().connect()
        # CAST so IDs compare as numbers; max() on a TEXT column compares strings ('999' > '1000')
        sql = "select max(CAST(F_SDaqID_MAX AS INTEGER)) from Incremental_data_max_id_record"
        cursor[0].execute(sql)
        results = cursor[0].fetchone()[0]
        # The singleton keeps the connection open, so it is not closed here
        print("Latest record ID", results)
        return results

    # Delete the most recent ID record
    @staticmethod
    def Del_Max_ID_Records():
        cursor = Database_sqlite().connect()
        sql = ("delete from Incremental_data_max_id_record "
               "where record_date = (select MAX(record_date) from Incremental_data_max_id_record)")
        cursor[0].execute(sql)
        cursor[1].commit()
        print("Deleted")
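To check the record-table logic without the ID_Record.db file, the same three operations can be exercised against an in-memory SQLite database. This is a sketch under that assumption, and the function names insert_max_id, view_max_id and delete_latest are hypothetical. It also shows why the max query casts the TEXT column:

```python
import sqlite3

# In-memory stand-in for ID_Record.db, same table layout as step 1 (assumption)
conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit, as in the article
conn.execute(
    "CREATE TABLE IF NOT EXISTS Incremental_data_max_id_record("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, F_SDaqID_MAX TEXT, record_date datetime)"
)


def insert_max_id(f1, f2):
    # Bound parameters: sqlite3 quotes the values, no manual string building
    conn.execute(
        "INSERT INTO Incremental_data_max_id_record(F_SDaqID_MAX, record_date) VALUES (?, ?)",
        (str(f1), f2),
    )


def view_max_id():
    # CAST so that '1000' beats '999'; a plain max() on TEXT compares strings
    return conn.execute(
        "SELECT max(CAST(F_SDaqID_MAX AS INTEGER)) FROM Incremental_data_max_id_record"
    ).fetchone()[0]


def delete_latest():
    # Remove the row(s) with the newest record_date, like Del_Max_ID_Records
    conn.execute(
        "DELETE FROM Incremental_data_max_id_record WHERE record_date = "
        "(SELECT max(record_date) FROM Incremental_data_max_id_record)"
    )
```

After inserting IDs 999 and 1000, an uncast max(F_SDaqID_MAX) returns the string '999', while view_max_id() returns 1000; on an empty table it returns None, which is what the client's first-load check relies on.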

2. SQL Server connection class for the incremental data source


class Database_sqlserver(metaclass=MetaSingleton):
    """Real-time (source) database."""
    connection = None

    def __init__(self):
        # MetaSingleton runs __init__ only once, so the connection is opened once
        if self.connection is None:
            self.connection = pymssql.connect(host="xxxxx", user="xxxxx", password="xxxxx", database="xxxxx", charset="utf8")
            if self.connection:
                print("Connected!")
            # Shared cursor for the MAX(F_SDaqID) lookup
            self.cursorobj = self.connection.cursor()

    # Get the maximum ID in the data source
    @staticmethod
    def get_F_SDaqID_MAX():
        cursor_insert = Database_sqlserver().cursorobj
        sql_MAXID = """select MAX(F_SDaqID) from T_DaqDataForEnergy"""
        cursor_insert.execute(sql_MAXID)  # run the query
        F_SDaqID_MAX = cursor_insert.fetchone()[0]  # fetch the single value
        print("Max ID: {0}".format(F_SDaqID_MAX))
        return F_SDaqID_MAX

    # Extract the incremental data
    @staticmethod
    def get_incremental_data(incremental_Max_ID):
        # pymssql uses %s placeholders; the driver quotes the value
        sql_incremental_data = """select F_ID, F_Datetime, F_Data from T_DaqDataForEnergy where F_ID > %s"""
        # Use a fresh cursor: closing the shared cursorobj would break every later call on the singleton
        cursor_find = Database_sqlserver().connection.cursor()
        cursor_find.execute(sql_incremental_data, (incremental_Max_ID,))
        Target_data_source = cursor_find.fetchall()  # fetch all matching rows
        cursor_find.close()

        df = pd.DataFrame(Target_data_source, columns=["F_ID", "F_Datetime", "F_Data"])
        print("Extracted data", df)
        return df
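Because Database_sqlserver is a singleton, its cursorobj is shared by every caller, so closing it inside one method breaks the next call. The sketch below uses sqlite3 in place of pymssql (an assumption made so it runs without a server; both follow the Python DB-API cursor model) to show the per-call cursor pattern with a bound parameter. Note that sqlite3 uses ? placeholders, while pymssql uses %s:

```python
import sqlite3

# In-memory table standing in for T_DaqDataForEnergy (assumption)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE T_DaqDataForEnergy(F_ID INTEGER, F_Datetime TEXT, F_Data REAL)")
conn.executemany(
    "INSERT INTO T_DaqDataForEnergy VALUES (?, ?, ?)",
    [(1, "2022-01-21 01:00:00", 1.5), (2, "2022-01-21 01:01:00", 2.5), (3, "2022-01-21 01:02:00", 3.5)],
)


def get_incremental_rows(last_id):
    # A fresh cursor per call; only this cursor is closed, the shared connection stays usable
    cur = conn.cursor()
    try:
        cur.execute(
            "SELECT F_ID, F_Datetime, F_Data FROM T_DaqDataForEnergy WHERE F_ID > ? ORDER BY F_ID",
            (last_id,),
        )
        return cur.fetchall()
    finally:
        cur.close()


# What goes wrong with a shared cursor once someone has closed it
stale = conn.cursor()
stale.close()
try:
    stale.execute("SELECT 1")
except sqlite3.ProgrammingError:
    print("a closed cursor cannot be reused")
```

get_incremental_rows can be called any number of times, because each call owns its cursor.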


The data-resource service is designed mainly to keep database operations consistent and to optimize them, improving memory and CPU utilization.

It supports multiple read and write operations: the client calls an API, which performs the corresponding DB operation.

Notes:

1. Using a metaclass to create classes with Singleton behavior

Database_sqlserver(metaclass=MetaSingleton)

Database_sqlite(metaclass=MetaSingleton)

When a class such as Database_sqlserver is declared with metaclass=MetaSingleton, MetaSingleton becomes its metaclass, so every instantiation call such as Database_sqlserver() is routed through MetaSingleton's special method __call__.


class MetaSingleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        # Create the instance on the first call only; later calls return the cached one
        if cls not in cls._instances:
            cls._instances[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


The code above is a metaclass-based Singleton. When a client performs database operations it may instantiate a database class many times, but only one object is ever created, so all database calls go through that single shared object and its connection.
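A quick way to see this behavior is to count __init__ calls. FakeDatabase and OtherDatabase below are hypothetical stand-ins for the article's database classes; each class gets its own single instance, and __init__ runs only on the first call:

```python
class MetaSingleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        # type.__call__ runs __new__ and __init__; it is reached once per class
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class FakeDatabase(metaclass=MetaSingleton):
    """Hypothetical stand-in for Database_sqlite / Database_sqlserver."""
    init_calls = 0

    def __init__(self):
        FakeDatabase.init_calls += 1
        self.connection = object()  # placeholder for a real connection


class OtherDatabase(metaclass=MetaSingleton):
    """A second hypothetical class: it gets its own, separate instance."""


a = FakeDatabase()
b = FakeDatabase()
print(a is b, FakeDatabase.init_calls)  # True 1
```

Because _instances is keyed by class, Database_sqlite and Database_sqlserver can share one metaclass without sharing an instance.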

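2. Multiple threads sharing one database connection resource need a synchronization mechanism

Without synchronization, unexpected things can happen; for example, two threads may both pass the "cls not in cls._instances" check before either has stored its object, creating two instances.

1) Locking with "with cls.lock"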


import threading
import time


class MetaSingleton(type):
    _instances = {}
    lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        # Only one thread at a time may check for and create the instance
        with cls.lock:
            if cls not in cls._instances:
                time.sleep(0.05)  # simulate slow construction
                cls._instances[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs)
            return cls._instances[cls]


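Creating and releasing a lock costs resources, and with the code above every instantiation has to acquire the lock, even after the instance already exists.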
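A common way to reduce that cost is double-checked locking: test without the lock first, and take the lock only while the instance does not exist yet. A sketch with a hypothetical Database class and eight threads racing to create it; the unlocked check relies on dictionary reads being safe under CPython's GIL:

```python
import threading
import time


class MetaSingleton(type):
    _instances = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        # Fast path: once the instance exists, no lock is taken
        if cls not in cls._instances:
            with cls._lock:
                # Check again: another thread may have created it while we waited
                if cls not in cls._instances:
                    time.sleep(0.05)  # simulate slow construction, as in the article
                    cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Database(metaclass=MetaSingleton):
    """Hypothetical database class used only for this demo."""
    created = 0

    def __init__(self):
        Database.created += 1


results = []
threads = [threading.Thread(target=lambda: results.append(Database())) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(Database.created, len({id(r) for r in results}))  # 1 1
```

Only the first few callers ever contend for the lock; after that, every call returns through the unlocked fast path.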

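3. If the program is not a single application but runs as a cluster, i.e. multiple clients share a single database, a per-process Singleton cannot synchronize database operations across them, and a database connection pool is the better choice. A pool saves a great deal of memory, improves server efficiency, and can serve more clients.

The connection pool approach is to open enough database connections when the application starts and group them into a pool; the application then dynamically requests, uses and releases connections from the pool. Concurrent requests that exceed the number of connections in the pool wait in a request queue.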
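A minimal sketch of that idea using queue.Queue, which already provides the blocking wait for a free connection. SimpleConnectionPool is hypothetical and uses in-memory SQLite connections for the demo; in production, libraries such as DBUtils (PooledDB) or SQLAlchemy's built-in pooling cover this along with health checks and reconnects:

```python
import queue
import sqlite3


class SimpleConnectionPool:
    """Hypothetical minimal pool: fixed size, callers wait when all connections are in use."""

    def __init__(self, factory, size):
        self._pool = queue.Queue(maxsize=size)
        for _ in range(size):
            self._pool.put(factory())  # open all connections at startup

    def acquire(self, timeout=None):
        # Waits in line until a connection is free; raises queue.Empty on timeout
        return self._pool.get(timeout=timeout)

    def release(self, conn):
        # Hand the connection back so a waiting caller can take it
        self._pool.put(conn)

    def available(self):
        return self._pool.qsize()


pool = SimpleConnectionPool(lambda: sqlite3.connect(":memory:", check_same_thread=False), size=2)
conn = pool.acquire()
print(conn.execute("SELECT 1").fetchone()[0])  # 1
pool.release(conn)
```

Callers should release in a finally block so that an exception does not leak a connection and starve the queue.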


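Incremental data service client

Incremental processing strategy: on the first load, check whether the incremental table already holds a latest record; if it does, load from it directly. Otherwise, record the current maximum/latest record ID or timestamp and save it to an incremental database or record file.

From the second load on, only data after that maximum/latest ID or timestamp is loaded. Once the load has completed successfully, update the incremental database or record file with the last record ID or timestamp of this load.

Record tables like this usually have an auto-increment column, which can also serve as the marker; in this case I use the table's auto-increment column F_ID.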
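The strategy can be sketched with in-memory stand-ins: a SQLite table plays T_DaqDataForEnergy and a list plays the ID record table. This is an illustrative variant, not the article's client: it reads the current max first, loads only the window (last ID, current max], and advances the watermark only after the processing callback succeeds, so rows inserted during a run are not fetched twice and a failed load is retried next time. load_increment and process are hypothetical names:

```python
import sqlite3

# In-memory stand-ins (assumption): `source` plays T_DaqDataForEnergy,
# `watermark` plays the list of recorded max IDs
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE T_DaqDataForEnergy(F_ID INTEGER PRIMARY KEY, F_Data REAL)")
watermark = []


def load_increment(process):
    current_max = source.execute("SELECT max(F_ID) FROM T_DaqDataForEnergy").fetchone()[0] or 0
    if not watermark:
        # First run: only remember the starting point; history is not loaded
        watermark.append(current_max)
        return []
    last_id = watermark[-1]
    # Bounded window: rows inserted after current_max was read wait for the next run
    rows = source.execute(
        "SELECT F_ID, F_Data FROM T_DaqDataForEnergy WHERE F_ID > ? AND F_ID <= ? ORDER BY F_ID",
        (last_id, current_max),
    ).fetchall()
    process(rows)  # if this raises, the watermark is not advanced
    watermark.append(max(last_id, current_max))
    return rows
```

Calling load_increment(print) once records the starting point; each later call passes only the new rows to print and then moves the watermark forward.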


class IncrementalRecordServer:
    _servers = []      # in-memory record list shared by all callers
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Classic Singleton: create the object once, then always return it
        if not IncrementalRecordServer._instance:
            IncrementalRecordServer._instance = super(IncrementalRecordServer, cls).__new__(cls)
        return IncrementalRecordServer._instance

    def __init__(self, changeServersID=None):
        """Initialize variables (runs on every call, refreshing the shared instance)."""
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID

    # Decorator: after the wrapped method runs, write the record list to the local temporary file
    def record(func):
        def Server_record(self):
            v = func(self)
            text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt", record=IncrementalRecordServer._servers)
            print("Saved")
            return v
        return Server_record

    # Add a service record
    @record
    def addServer(self):
        self._servers.append([int(self.F_SDaqID_MAX), self.record_date])
        print("Record added")
        Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX, f2=self.record_date)

    # Modify the service record
    @record
    def changeServers(self):
        # The manually specified record ID is passed in here
        self._servers.append([self.changeServersID, self.record_date])
        # Modify = delete the latest record, then insert the new one
        Database_sqlite.Del_Max_ID_Records()
        Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID, f2=self.record_date)
        print("Record updated")

    # Delete the service record
    @record
    def popServers(self):
        print("Record deleted")
        Database_sqlite.Del_Max_ID_Records()

    # Latest service record
    def getServers(self):
        Max_ID_Records = Database_sqlite.View_Max_ID_Records()
        print("View record", Max_ID_Records)
        return Max_ID_Records

    # Extract data
    def Incremental_data_client(self):
        """Get the incremental max ID and extract the incremental data."""
        # First load: check whether a latest record already exists
        if self.getServers() is None:
            # Record the current max ID in the incremental database
            self.addServer()
            # Extract incremental data
            data = Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX)
            return data

        # Latest max ID already stored in the incremental database
        incremental_Max_ID = self.getServers()

        # Add a record
        self.addServer()
        # Extract incremental data
        Target_data_source = Database_sqlserver.get_incremental_data(incremental_Max_ID)

        return Target_data_source


Optimization strategies:

1. Lazy loading

The IncrementalRecordServer class above controls object creation by overriding __new__, checking whether the object already exists before creating it. The same goal can be reached with lazy loading to save resources, as follows.


class IncrementalRecordServer:
    _servers = []
    _instance = None

    def __init__(self, changeServersID=None):
        """Initialize variables."""
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID

        if not IncrementalRecordServer._instance:
            print("__init__: object created")
        else:
            print("Object already exists:", IncrementalRecordServer._instance)
            self.getInstance()

    @classmethod
    def getInstance(cls):
        # Create the shared object on the first call only
        if not cls._instance:
            cls._instance = IncrementalRecordServer()
        return cls._instance


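Lazy instantiation makes sure the shared object is created only when it is actually needed. Calling a = IncrementalRecordServer() runs __init__ but does not register a shared instance; the shared object is created the first time getInstance() is called, and later getInstance() calls return it. Calling the constructor directly still builds a separate object each time, so callers should go through getInstance(). Loading an object this way is also called lazy loading.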
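A thread-safe version of the lazy approach can be sketched as follows. LazyServer is a hypothetical stand-in for IncrementalRecordServer with the database calls removed; getInstance builds the object on its first call and uses a lock with a second check so that concurrent first calls cannot create two objects:

```python
import threading


class LazyServer:
    """Hypothetical lazily created singleton; nothing is built until getInstance()."""
    _instance = None
    _lock = threading.Lock()

    def __init__(self):
        self.created_by = threading.current_thread().name

    @classmethod
    def getInstance(cls):
        if cls._instance is None:          # fast path without the lock
            with cls._lock:
                if cls._instance is None:  # re-check inside the lock
                    cls._instance = cls()
        return cls._instance


print(LazyServer._instance)  # None: nothing created yet
a = LazyServer.getInstance()
b = LazyServer.getInstance()
print(a is b)  # True
```

As with the article's version, LazyServer() called directly still returns a new object; only getInstance() guarantees the shared one.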

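2. The Singleton pattern uses memory efficiently: every call reuses the same object.

Objects obtained by different operations have the same memory address, and each new initialization overwrites the variables set by the previous one, which keeps the latest record up to date. On the surface the code above implements the Singleton pattern correctly, but under multi-threaded concurrency it is not thread-safe: two threads may create separate objects at the same time. For thread safety, a lock can be added as well.

3. Scope and caveats

This code is meant for incremental data produced after the tool is deployed and started at a given point in time. If it is restarted after a long idle period, clear the history first, i.e. empty the IDs in the incremental database or file. For real-time increments a single load is generally unproblematic, so this point needs little attention (the file-based code can be completed by the reader). When loading a historical database, or when the data produced per scheduled interval is too large, the code needs further changes: check the data volume and specify the starting point and the amount to load, among other factors. A solution for extracting data at the scale of hundreds of millions of rows will be shared next time.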
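For large volumes, one option (an assumption here, not the article's upcoming solution) is keyset pagination: read a fixed-size batch ordered by F_ID, then start the next batch after the last F_ID seen. A sketch on an in-memory SQLite table; iter_batches is a hypothetical name, and since SQL Server has no LIMIT clause, there the query would use SELECT TOP (n) or OFFSET ... FETCH instead:

```python
import sqlite3


def iter_batches(conn, start_id, batch_size):
    """Yield rows with F_ID > start_id in batches of at most batch_size (keyset pagination)."""
    last_id = start_id
    while True:
        rows = conn.execute(
            "SELECT F_ID, F_Datetime, F_Data FROM T_DaqDataForEnergy "
            "WHERE F_ID > ? ORDER BY F_ID LIMIT ?",
            (last_id, batch_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        last_id = rows[-1][0]  # the next batch starts after the last ID seen


# Demo on an in-memory table standing in for the source (assumption)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE T_DaqDataForEnergy(F_ID INTEGER PRIMARY KEY, F_Datetime TEXT, F_Data REAL)")
conn.executemany(
    "INSERT INTO T_DaqDataForEnergy VALUES (?, ?, ?)",
    [(i, "2022-01-21 01:21:00", i * 0.5) for i in range(1, 11)],
)
for batch in iter_batches(conn, start_id=3, batch_size=3):
    print([row[0] for row in batch])  # [4, 5, 6], then [7, 8, 9], then [10]
```

Unlike OFFSET paging, each batch seeks directly on the F_ID index, so later batches cost no more than the first; the last ID of each batch can also be written to the record table as a resumable checkpoint.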

4. Learn more about Python's garbage collection, and under concurrency manage resources through a well-tuned thread pool.

Finally, a method can be added to release resources:

# Add this method to the class; Python calls it when the object is about to be destroyed
def __del__(self):
    class_name = self.__class__.__name__
    print(class_name, "destroyed")

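del obj only removes one reference to the object; Python calls __del__() and frees the object's memory only once no references to it remain (in CPython, as soon as the reference count drops to zero). If other variables in the program still refer to the instance, del obj will not trigger __del__() immediately. This follows from how Python's garbage collection is implemented.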
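The reference rule matters for singletons in particular: the class attribute that caches the instance is itself a reference, so del on a client variable never destroys the object. A small demonstration of both cases (CPython, where objects are freed as soon as their reference count reaches zero); Resource and SingletonResource are hypothetical:

```python
log = []


class Resource:
    def __del__(self):
        log.append(self.__class__.__name__ + " destroyed")


class SingletonResource(Resource):
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance


a = Resource()
b = a            # second reference to the same object
del a            # removes one name; the object is still referenced by b
print(log)       # []
del b            # last reference gone: CPython calls __del__ right away
print(log)       # ['Resource destroyed']

s = SingletonResource()
del s            # the class attribute _instance still holds the object
print(len(log))  # 1
SingletonResource._instance = None  # drop the last reference
print(log[-1])   # SingletonResource destroyed
```

For the article's classes, this means a singleton's connection is released only when the cached reference (_instance, or the entry in MetaSingleton._instances) is cleared, or when the interpreter exits.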


Testing the results


if __name__ == '__main__':
    for i in range(6):
        hc1 = IncrementalRecordServer()
        hc1.addServer()
        print("Record_ID", hc1._servers[i])
        # del hc1
        time.sleep(60)  # one record per minute

    # Server2: client
    # Latest service record
    hc2 = IncrementalRecordServer()
    hc2.getServers()
    # View the incremental data
    hc2.Incremental_data_client()


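Inserting records

This simulates inserting one record per minute; 7 records in total are written to the incremental database (6 from the loop and 1 from Incremental_data_client).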


if __name__ == '__main__':
    # Server3: client
    # Manually set the starting incremental ID record
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    hc3.changeServers()



if __name__ == '__main__':
    # Delete the latest ID record
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    hc3.popServers()


That's all for this post. All the source code is included in the article, so go ahead and try it yourself! Original content takes effort; likes and shares are appreciated.
