python 使用mysql-connector 正確的批量寫入數(shù)據(jù)
批量寫入數(shù)據(jù)需要使用executemany方法,這個方法并不是要執(zhí)行很多條sql,而是一條有待格式化的字符串和多條需要寫入數(shù)據(jù)庫的數(shù)據(jù)。
1. 新建user 表
為了試驗(yàn)批量寫入,新建一張user表
create table user(
id int NOT NULL AUTO_INCREMENT,
`name` varchar(50) NOT NULL,
`age` int NOT NULL,
PRIMARY KEY (`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
2. executemany
import time
import mysql.connector
# 第一步,創(chuàng)建連接
mydb = mysql.connector.connect(
host="10.110.30.3", # 數(shù)據(jù)庫主機(jī)地址
user="flink_user", # 數(shù)據(jù)庫用戶名
passwd="123456", # 數(shù)據(jù)庫密碼
port=6606,
database='flink_db'
)
sql = "insert into user(name, age)values(%s,%s)"
citys = [
('小明', 14) for i in range(100000)
]
t1 = time.time()
mycursor = mydb.cursor() # 創(chuàng)建cursor
mycursor.executemany(sql, citys) # 批量執(zhí)行
mydb.commit() # 提交
t2 = time.time()
print(t2-t1) # 71秒
想要使用executemany方法,必須遵循以下兩條規(guī)則和一條潛規(guī)則,咱們先說前兩條規(guī)則
sql語句中,values不符需要使用%s占位符,不管字段是什么類型
seq_params 參數(shù)要么是[tuple, tuple...]的形式,要么是(tuple, tuple...) 的形式
這兩條規(guī)則是明處的規(guī)則,我上面的代碼已經(jīng)遵守了這兩條規(guī)則,但是呢,插入10萬條數(shù)據(jù)竟然用了71秒,這個速度著實(shí)不樂觀,我確定是executemany方法使用的不正確,那么問題出在哪里呢?
3. executemany 與 execute
很多文章里說,executemany就是多次調(diào)用執(zhí)行execute方法,現(xiàn)在實(shí)驗(yàn)一下,執(zhí)行10萬次execute,看需要多久
sql = "insert into user(name, age)values('小明', 14)"
t1 = time.time()
mycursor = mydb.cursor() # 創(chuàng)建cursor
for i in range(100000):
mycursor.execute(sql)
mydb.commit() # 提交
t2 = time.time()
print(t2-t1) # 68.127
批量寫入10萬條數(shù)據(jù),與10萬次寫入操作耗時幾乎相同,這么看來,似乎executemany 多次調(diào)用了execute方法,只有這樣才能解釋實(shí)驗(yàn)現(xiàn)象。
但如果是這樣,還有必要分出executemany 和 execute方法么,豈不是多此一舉?
4. 正確的使用executemany批量寫入數(shù)據(jù)
批量寫入一定比多次單條寫入要快,就算是多次單條寫入是在最后進(jìn)行commit也是如此,這是批量寫入的性能優(yōu)勢,數(shù)據(jù)是分批發(fā)送到mysql的,減少了網(wǎng)絡(luò)往返所需要的時間。
事情到了這一步,就需要深入到源碼來一探究竟。
def executemany(self, operation, seq_params):
"""Execute the given operation multiple times"""
# Optimize INSERTs by batching them
if re.match(RE_SQL_INSERT_STMT, operation):
if not seq_params:
self._rowcount = 0
return None
stmt = self._batch_insert(operation, seq_params)
if stmt is not None:
return self.execute(stmt)
rowcnt = 0
try:
for params in seq_params:
self.execute(operation, params)
except (ValueError, TypeError) as err:
raise errors.ProgrammingError(
"Failed executing the operation; {0}".format(err))
self._rowcount = rowcnt
return None
我去除掉了一些無關(guān)緊要的代碼,只保留了最核心的部分, 注意看這部分代碼
# Optimize INSERTs by batching them
這里明顯是批處理優(yōu)化,它采用優(yōu)化的前提條件是sql語句要匹配上RE_SQL_INSERT_STMT,否則就要逐條執(zhí)行execute 方法,也就是很多文章里的說法。那么什么樣的sql才能是開啟批處理優(yōu)化的sql呢,我的sql為什么不行呢,讓我們看一下這個正則表達(dá)式
RE_SQL_INSERT_STMT = re.compile(
r"({0}|\s)*INSERT({0}|\s)*INTO\s+[`'\"]?.+[`'\"]?(?:\.[`'\"]?.+[`'\"]?)"
r"{{0,2}}\s+VALUES\s*\(.+(?:\s*,.+)*\)".format(SQL_COMMENT),
re.I | re.M | re.S)
這個正則不是特別容易理解,但是我逐一到VALUES的前面有一個\s+,它表示1個或多個空格,而我的sql是“insert into user(name, age)values(%s,%s)”, 剛好沒有空格,難不成是這個原因?qū)е聅ql語句沒有匹配上正則表達(dá)式,最終沒有執(zhí)行批處理優(yōu)化,修改代碼測試一下
sql = "insert into user(name, age) values(%s,%s)"
citys = [
('小明', 14) for i in range(100000)
]
t1 = time.time()
mycursor = mydb.cursor() # 創(chuàng)建cursor
mycursor.executemany(sql, citys) # 批量執(zhí)行
mydb.commit() # 提交
t2 = time.time()
print(t2-t1) # 1.6 秒
修改后,寫入10萬條數(shù)據(jù),僅僅需要1.6秒,原來是書寫不規(guī)范導(dǎo)致不能進(jìn)行批量插入操作,可是這么多年來,我一直都是這樣寫sql語句,values前面緊跟著右括號,也符合sql語法,這真是一個大坑啊。
