Python操作MySQL
點擊關(guān)注上方“SQL數(shù)據(jù)庫開發(fā)”,
設(shè)為“置頂或星標”,第一時間送達干貨
一. python操作數(shù)據(jù)庫介紹
Python 標準數(shù)據(jù)庫接口為 Python DB-API,Python DB-API為開發(fā)人員提供了數(shù)據(jù)庫應(yīng)用編程接口。Python 數(shù)據(jù)庫接口支持非常多的數(shù)據(jù)庫,你可以選擇適合你項目的數(shù)據(jù)庫:
GadFly mSQL MySQL PostgreSQL Microsoft SQL Server 2000 Informix Interbase Oracle Sybase ...
你可以訪問Python數(shù)據(jù)庫接口及API查看詳細的支持數(shù)據(jù)庫列表。
不同的數(shù)據(jù)庫你需要下載不同的DB API模塊,例如你需要訪問Oracle數(shù)據(jù)庫和Mysql數(shù)據(jù),你需要下載Oracle和MySQL數(shù)據(jù)庫模塊。
DB-API 是一個規(guī)范. 它定義了一系列必須的對象和數(shù)據(jù)庫存取方式, 以便為各種各樣的底層數(shù)據(jù)庫系統(tǒng)和多種多樣的數(shù)據(jù)庫接口程序提供一致的訪問接口 。
Python的DB-API,為大多數(shù)的數(shù)據(jù)庫實現(xiàn)了接口,使用它連接各數(shù)據(jù)庫后,就可以用相同的方式操作各數(shù)據(jù)庫。
Python DB-API使用流程:
引入 API 模塊。 獲取與數(shù)據(jù)庫的連接。 執(zhí)行SQL語句和存儲過程。 關(guān)閉數(shù)據(jù)庫連接。
二. python操作MySQL模塊
Python操作MySQL主要使用兩種方式:
DB模塊(原生SQL) PyMySQL(支持python2.x/3.x) MySQLdb(目前僅支持python2.x) ORM框架 SQLAchemy
2.1 PyMySQL模塊
本文主要介紹PyMySQL模塊,MySQLdb使用方式類似
2.1.1 安裝PyMySQL
PyMySQL是一個Python編寫的MySQL驅(qū)動程序,讓我們可以用Python語言操作MySQL數(shù)據(jù)庫。
pip?install?PyMySQL
2.2 基本使用
#!?/usr/bin/env?python
#?-*-?coding:?utf-8?-*-
#?__author__?=?"shuke"
#?Date:?2018/5/13
import?pymysql
#?創(chuàng)建連接
conn?=?pymysql.connect(host="127.0.0.1",?port=3306,?user='zff',?passwd='zff123',?db='zff',?charset='utf8mb4')
#?創(chuàng)建游標(查詢數(shù)據(jù)返回為元組格式)
#?cursor?=?conn.cursor()
#?創(chuàng)建游標(查詢數(shù)據(jù)返回為字典格式)
cursor?=?conn.cursor(pymysql.cursors.DictCursor)
#?1.?執(zhí)行SQL,返回受影響的行數(shù)
effect_row1?=?cursor.execute("select?*?from?USER")
#?2.?執(zhí)行SQL,返回受影響的行數(shù),一次插入多行數(shù)據(jù)
effect_row2?=?cursor.executemany("insert?into?USER?(NAME)?values(%s)",?[("jack"),?("boom"),?("lucy")])??#?3
#?查詢所有數(shù)據(jù),返回數(shù)據(jù)為元組格式
result?=?cursor.fetchall()
#?增/刪/改均需要進行commit提交,進行保存
conn.commit()
#?關(guān)閉游標
cursor.close()
#?關(guān)閉連接
conn.close()
print(result)
"""
[{'id':?6,?'name':?'boom'},?{'id':?5,?'name':?'jack'},?{'id':?7,?'name':?'lucy'},?{'id':?4,?'name':?'tome'},?{'id':?3,?'name':?'zff'},?{'id':?1,?'name':?'zhaofengfeng'},?{'id':?2,?'name':?'zhaofengfeng02'}]
"""
2.3 獲取最新創(chuàng)建的數(shù)據(jù)自增ID
#!?/usr/bin/env?python
#?-*-?coding:?utf-8?-*-
#?__author__?=?"shuke"
#?Date:?2018/5/13
import?pymysql
#?創(chuàng)建連接
conn?=?pymysql.connect(host="127.0.0.1",?port=3306,?user='zff',?passwd='zff123',?db='zff',?charset='utf8mb4')
#?創(chuàng)建游標(查詢數(shù)據(jù)返回為元組格式)
cursor?=?conn.cursor()
#?獲取新創(chuàng)建數(shù)據(jù)自增ID
effect_row?=?cursor.executemany("insert?into?USER?(NAME)values(%s)",?[("eric")])
#?增刪改均需要進行commit提交
conn.commit()
#?關(guān)閉游標
cursor.close()
#?關(guān)閉連接
conn.close()
new_id?=?cursor.lastrowid
print(new_id)
"""
8
"""
2.4 查詢操作
#!?/usr/bin/env?python
#?-*-?coding:?utf-8?-*-
#?__author__?=?"shuke"
#?Date:?2018/5/13
import?pymysql
#?創(chuàng)建連接
conn?=?pymysql.connect(host="127.0.0.1",?port=3306,?user='zff',?passwd='zff123',?db='zff',?charset='utf8mb4')
#?創(chuàng)建游標
cursor?=?conn.cursor()
cursor.execute("select?*?from?USER")
#?獲取第一行數(shù)據(jù)
row_1?=?cursor.fetchone()
#?獲取前n行數(shù)據(jù)
row_2?=?cursor.fetchmany(3)
#
#?#?獲取所有數(shù)據(jù)
row_3?=?cursor.fetchall()
#?關(guān)閉游標
cursor.close()
#?關(guān)閉連接
conn.close()
print(row_1)
print(row_2)
print(row_3)
?? 在fetch數(shù)據(jù)時按照順序進行,可以使用cursor.scroll(num,mode)來移動游標位置,如:
cursor.scroll(1,mode='relative') ?# 相對當前位置移動 cursor.scroll(2,mode='absolute') ?# 相對絕對位置移動
2.5 防止SQL注入
#!?/usr/bin/env?python
#?-*-?coding:?utf-8?-*-
#?__author__?=?"shuke"
#?Date:?2018/5/13
import?pymysql
#?創(chuàng)建連接
conn?=?pymysql.connect(host="127.0.0.1",?port=3306,?user='zff',?passwd='zff123',?db='zff',?charset='utf8mb4')
#?創(chuàng)建游標
cursor?=?conn.cursor()
#?存在sql注入情況(不要用格式化字符串的方式拼接SQL)
sql?=?"insert?into?USER?(NAME)?values('%s')"?%?('zhangsan',)
effect_row?=?cursor.execute(sql)
#?正確方式一
#?execute函數(shù)接受一個元組/列表作為SQL參數(shù),元素個數(shù)只能有1個
sql?=?"insert?into?USER?(NAME)?values(%s)"
effect_row1?=?cursor.execute(sql,?['wang6'])
effect_row2?=?cursor.execute(sql,?('wang7',))
#?正確方式二
sql?=?"insert?into?USER?(NAME)?values(%(name)s)"
effect_row1?=?cursor.execute(sql,?{'name':?'wudalang'})
#?寫入插入多行數(shù)據(jù)
effect_row2?=?cursor.executemany("insert?into?USER?(NAME)?values(%s)",?[('ermazi'),?('dianxiaoer')])
#?提交
conn.commit()
#?關(guān)閉游標
cursor.close()
#?關(guān)閉連接
conn.close()
這樣,SQL操作就更安全了。如果需要更詳細的文檔參考PyMySQL文檔吧。不過好像這些SQL數(shù)據(jù)庫的實現(xiàn)還不太一樣,PyMySQL的參數(shù)占位符使用%s這樣的C格式化符,而Python自帶的sqlite3模塊的占位符好像是問號(?)。因此在使用其他數(shù)據(jù)庫的時候還是仔細閱讀文檔吧。Welcome to PyMySQL’s documentation
三. 數(shù)據(jù)庫連接池
上文中的方式存在一個問題,單線程情況下可以滿足,程序需要頻繁的創(chuàng)建釋放連接來完成對數(shù)據(jù)庫的操作,那么,我們的程序/腳本在多線程情況下會引發(fā)什么問題呢?此時,我們就需要使用數(shù)據(jù)庫連接池來解決這個問題!
3.1 DBUtils模塊
DBUtils是Python的一個用于實現(xiàn)數(shù)據(jù)庫連接池的模塊。
此連接池有兩種連接模式:
為每個線程創(chuàng)建一個連接,線程即使調(diào)用了close方法,也不會關(guān)閉,只是把連接重新放到連接池,供自己線程再次使用。當線程終止時,連接才會自動關(guān)閉 創(chuàng)建一批連接到連接池,供所有線程共享使用(推薦使用)
3.2 模式一
#!?/usr/bin/env?python
#?-*-?coding:?utf-8?-*-
#?__author__?=?"shuke"
#?Date:?2018/5/13
from?DBUtils.PersistentDB?import?PersistentDB
import?pymysql
POOL?=?PersistentDB(
????creator=pymysql,??#?使用鏈接數(shù)據(jù)庫的模塊
????maxusage=None,??#?一個鏈接最多被重復(fù)使用的次數(shù),None表示無限制
????setsession=[],??#?開始會話前執(zhí)行的命令列表。如:["set datestyle to ...", "set time zone ..."]
????ping=0,
????# ping MySQL服務(wù)端,檢查是否服務(wù)可用。#?如:0?= None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
????closeable=False,
????#?如果為False時, conn.close()?實際上被忽略,供下次使用,再線程關(guān)閉時,才會自動關(guān)閉鏈接。如果為True時, conn.close()則關(guān)閉鏈接,那么再次調(diào)用pool.connection時就會報錯,因為已經(jīng)真的關(guān)閉了連接(pool.steady_connection()可以獲取一個新的鏈接)
????threadlocal=None,??#?本線程獨享值得對象,用于保存鏈接對象,如果鏈接對象被重置
????host='127.0.0.1',
????port=3306,
????user='zff',
????password='zff123',
????database='zff',
????charset='utf8',
)
def?func():
????conn?=?POOL.connection(shareable=False)
????cursor?=?conn.cursor()
????cursor.execute('select?*?from?USER')
????result?=?cursor.fetchall()
????cursor.close()
????conn.close()
????return?result
result?=?func()
print(result)
3.2 模式二
#!?/usr/bin/env?python
#?-*-?coding:?utf-8?-*-
#?__author__?=?"shuke"
#?Date:?2018/5/13
import?time
import?pymysql
import?threading
from?DBUtils.PooledDB?import?PooledDB,?SharedDBConnection
POOL?=?PooledDB(
????creator=pymysql,??#?使用鏈接數(shù)據(jù)庫的模塊
????maxconnections=6,??#?連接池允許的最大連接數(shù),0和None表示不限制連接數(shù)
????mincached=2,??#?初始化時,鏈接池中至少創(chuàng)建的空閑的鏈接,0表示不創(chuàng)建
????maxcached=5,??#?鏈接池中最多閑置的鏈接,0和None不限制
????maxshared=3,
????#?鏈接池中最多共享的鏈接數(shù)量,0和None表示全部共享。PS:?無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設(shè)置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。
????blocking=True,??#?連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯
????maxusage=None,??#?一個鏈接最多被重復(fù)使用的次數(shù),None表示無限制
????setsession=[],??#?開始會話前執(zhí)行的命令列表。如:["set datestyle to ...", "set time zone ..."]
????ping=0,
????# ping MySQL服務(wù)端,檢查是否服務(wù)可用。#?如:0?= None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
????host='127.0.0.1',
????port=3306,
????user='zff',
????password='zff123',
????database='zff',
????charset='utf8'
)
def?func():
????#?檢測當前正在運行連接數(shù)的是否小于最大鏈接數(shù),如果不小于則:等待或報raise TooManyConnections異常
????#?否則
????#?則優(yōu)先去初始化時創(chuàng)建的鏈接中獲取鏈接 SteadyDBConnection。
????#?然后將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中并返回。
????#?如果最開始創(chuàng)建的鏈接沒有鏈接,則去創(chuàng)建一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中并返回。
????#?一旦關(guān)閉鏈接后,連接就返回到連接池讓后續(xù)線程繼續(xù)使用。
????conn?=?POOL.connection()
????#?print('連接被拿走了',?conn._con)
????#?print('池子里目前有',?POOL._idle_cache,?'\r\n')
????cursor?=?conn.cursor()
????cursor.execute('select?*?from?USER')
????result?=?cursor.fetchall()
????conn.close()
????return?result
result?=?func()
print(result)
?? 由于pymysql、MySQLdb等threadsafety值為1,所以該模式連接池中的線程會被所有線程共享,因此是線程安全的。如果沒有連接池,使用pymysql來連接數(shù)據(jù)庫時,單線程應(yīng)用完全沒有問題,但如果涉及到多線程應(yīng)用那么就需要加鎖,一旦加鎖那么連接勢必就會排隊等待,當請求比較多時,性能就會降低了。
3.3 加鎖
#!?/usr/bin/env?python
#?-*-?coding:?utf-8?-*-
#?__author__?=?"shuke"
#?Date:?2018/5/13
import?pymysql
import?threading
from?threading?import?RLock
LOCK?=?RLock()
CONN?=?pymysql.connect(host='127.0.0.1',
???????????????????????port=3306,
???????????????????????user='zff',
???????????????????????password='zff123',
???????????????????????database='zff',
???????????????????????charset='utf8')
def?task(arg):
????with?LOCK:
????????cursor?=?CONN.cursor()
????????cursor.execute('select?*?from?USER?')
????????result?=?cursor.fetchall()
????????cursor.close()
????????print(result)
for?i?in?range(10):
????t?=?threading.Thread(target=task,?args=(i,))
????t.start()
3.4 無鎖(報錯)
#!?/usr/bin/env?python
#?-*-?coding:?utf-8?-*-
#?__author__?=?"shuke"
#?Date:?2018/5/13
import?pymysql
import?threading
CONN?=?pymysql.connect(host='127.0.0.1',
???????????????????????port=3306,
???????????????????????user='zff',
???????????????????????password='zff123',
???????????????????????database='zff',
???????????????????????charset='utf8')
def?task(arg):
????cursor?=?CONN.cursor()
????cursor.execute('select?*?from?USER?')
????#?cursor.execute('select?sleep(10)')
????result?=?cursor.fetchall()
????cursor.close()
????print(result)
for?i?in?range(10):
????t?=?threading.Thread(target=task,?args=(i,))
????t.start()
此時可以在數(shù)據(jù)庫中查看連接情況: show status like 'Threads%';
四. 數(shù)據(jù)庫連接池結(jié)合pymsql使用
#?cat?sql_helper.py
import?pymysql
import?threading
from?DBUtils.PooledDB?import?PooledDB,?SharedDBConnection
POOL?=?PooledDB(
????creator=pymysql,??#?使用鏈接數(shù)據(jù)庫的模塊
????maxconnections=20,??#?連接池允許的最大連接數(shù),0和None表示不限制連接數(shù)
????mincached=2,??#?初始化時,鏈接池中至少創(chuàng)建的空閑的鏈接,0表示不創(chuàng)建
????maxcached=5,??#?鏈接池中最多閑置的鏈接,0和None不限制
????#maxshared=3, ?#?鏈接池中最多共享的鏈接數(shù)量,0和None表示全部共享。PS:?無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設(shè)置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。
????blocking=True,??#?連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯
????maxusage=None,??#?一個鏈接最多被重復(fù)使用的次數(shù),None表示無限制
????setsession=[],??#?開始會話前執(zhí)行的命令列表。如:["set datestyle to ...", "set time zone ..."]
????ping=0,
????# ping MySQL服務(wù)端,檢查是否服務(wù)可用。#?如:0?= None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
????host='192.168.11.38',
????port=3306,
????user='root',
????passwd='apNXgF6RDitFtDQx',
????db='m2day03db',
????charset='utf8'
)
def?connect():
????#?創(chuàng)建連接
????#?conn?=?pymysql.connect(host='192.168.11.38',?port=3306,?user='root',?passwd='apNXgF6RDitFtDQx',?db='m2day03db')
????conn?=?POOL.connection()
????#?創(chuàng)建游標
????cursor?=?conn.cursor(pymysql.cursors.DictCursor)
????return?conn,cursor
def?close(conn,cursor):
????#?關(guān)閉游標
????cursor.close()
????#?關(guān)閉連接
????conn.close()
def?fetch_one(sql,args):
????conn,cursor?=?connect()
????#?執(zhí)行SQL,并返回收影響行數(shù)
????effect_row?=?cursor.execute(sql,args)
????result?=?cursor.fetchone()
????close(conn,cursor)
????return?result
def?fetch_all(sql,args):
????conn,?cursor?=?connect()
????#?執(zhí)行SQL,并返回收影響行數(shù)
????cursor.execute(sql,args)
????result?=?cursor.fetchall()
????close(conn,?cursor)
????return?result
def?insert(sql,args):
????"""
????創(chuàng)建數(shù)據(jù)
????:param?sql:?含有占位符的SQL
????:return:
????"""
????conn,?cursor?=?connect()
????#?執(zhí)行SQL,并返回收影響行數(shù)
????effect_row?=?cursor.execute(sql,args)
????conn.commit()
????close(conn,?cursor)
def?delete(sql,args):
????"""
????創(chuàng)建數(shù)據(jù)
????:param?sql:?含有占位符的SQL
????:return:
????"""
????conn,?cursor?=?connect()
????#?執(zhí)行SQL,并返回收影響行數(shù)
????effect_row?=?cursor.execute(sql,args)
????conn.commit()
????close(conn,?cursor)
????return?effect_row
def?update(sql,args):
????conn,?cursor?=?connect()
????#?執(zhí)行SQL,并返回收影響行數(shù)
????effect_row?=?cursor.execute(sql,?args)
????conn.commit()
????close(conn,?cursor)
????return?effect_row
PS: 可以利用靜態(tài)方法封裝到一個類中,方便使用
——End——
后臺回復(fù)關(guān)鍵字:1024,獲取一份精心整理的技術(shù)干貨 后臺回復(fù)關(guān)鍵字:進群,帶你進入高手如云的交流群。 推薦閱讀 這是一個能學到技術(shù)的公眾號,歡迎關(guān)注
