您好,登錄后才能下訂單哦!
這篇文章主要介紹“Python怎么封裝數據庫連接池”,在日常操作中,相信很多人在Python怎么封裝數據庫連接池問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Python怎么封裝數據庫連接池”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
前言:
線程安全問題:當2個線程同時用到線程池時,會同時創建2個線程池。如果多個線程,錯開用到線程池,就只會創建一個線程池,會共用一個線程池。我用的注解方式的單例模式,感覺就是這個注解的單例方式,解決了多線程問題,但是沒解決線程安全問題,需要優化這個單例模式。
主要通過 PooledDB 模塊實現。
db_config.py
# -*- coding: UTF-8 -*- import pymysql # 數據庫信息 DB_TEST_HOST = "127.0.0.1" DB_TEST_PORT = 3308 DB_TEST_DBNAME = "bt" DB_TEST_USER = "root" DB_TEST_PASSWORD = "123456" # 數據庫連接編碼 DB_CHARSET = "utf8" # mincached : 啟動時開啟的閑置連接數量(缺省值 0 開始時不創建連接) DB_MIN_CACHED = 5 # maxcached : 連接池中允許的閑置的最多連接數量(缺省值 0 代表不閑置連接池大小) DB_MAX_CACHED = 0 # maxshared : 共享連接數允許的最大數量(缺省值 0 代表所有連接都是專用的)如果達到了最大數量,被請求為共享的連接將會被共享使用 DB_MAX_SHARED = 5 # maxconnecyions : 創建連接池的最大數量(缺省值 0 代表不限制) DB_MAX_CONNECYIONS = 300 # blocking : 設置在連接池達到最大數量時的行為(缺省值 0 或 False 代表返回一個錯誤<toMany......> 其他代表阻塞直到連接數減少,連接被分配) DB_BLOCKING = True # maxusage : 單個連接的最大允許復用次數(缺省值 0 或 False 代表不限制的復用).當達到最大數時,連接會自動重新連接(關閉和重新打開) DB_MAX_USAGE = 0 # setsession : 一個可選的SQL命令列表用于準備每個會話,如["set datestyle to german", ...] DB_SET_SESSION = None # creator : 使用連接數據庫的模塊 DB_CREATOR = pymysql
設置連接池最大最小為5個。則啟動連接池時,就會建立5個連接。
singleton.py
#單例模式函數,用來修飾類 def singleton(cls,*args,**kw): instances = {} def _singleton(): if cls not in instances: instances[cls] = cls(*args,**kw) return instances[cls] return _singleton
db_dbutils_init.py
from dbutils.pooled_db import PooledDB import db_config as config # import random from singleton import singleton """ @功能:創建數據庫連接池 """ class MyConnectionPool(object): # 私有屬性 # 能通過對象直接訪問,但是可以在本類內部訪問; __pool = None # def __init__(self): # self.conn = self.__getConn() # self.cursor = self.conn.cursor() # 創建數據庫連接conn和游標cursor def __enter__(self): self.conn = self.__getconn() self.cursor = self.conn.cursor() # 創建數據庫連接池 def __getconn(self): if self.__pool is None: # i = random.randint(1, 100) # print("創建線程池的數量"+str(i)) self.__pool = PooledDB( creator=config.DB_CREATOR, mincached=config.DB_MIN_CACHED, maxcached=config.DB_MAX_CACHED, maxshared=config.DB_MAX_SHARED, maxconnections=config.DB_MAX_CONNECYIONS, blocking=config.DB_BLOCKING, maxusage=config.DB_MAX_USAGE, setsession=config.DB_SET_SESSION, host=config.DB_TEST_HOST, port=config.DB_TEST_PORT, user=config.DB_TEST_USER, passwd=config.DB_TEST_PASSWORD, db=config.DB_TEST_DBNAME, use_unicode=False, charset=config.DB_CHARSET ) return self.__pool.connection() # 釋放連接池資源 def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() # 關閉連接歸還給鏈接池 # def close(self): # self.cursor.close() # self.conn.close() # 從連接池中取出一個連接 def getconn(self): conn = self.__getconn() cursor = conn.cursor() return cursor, conn # 獲取連接池,實例化 @singleton def get_my_connection(): return MyConnectionPool()
mysqlhelper.py
import time from db_dbutils_init import get_my_connection """執行語句查詢有結果返回結果沒有返回0;增/刪/改返回變更數據條數,沒有返回0""" class MySqLHelper(object): def __init__(self): self.db = get_my_connection() # 從數據池中獲取連接 # # def __new__(cls, *args, **kwargs): # if not hasattr(cls, 'inst'): # 單例 # cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs) # return cls.inst # 封裝執行命令 def execute(self, sql, param=None, autoclose=False): """ 【主要判斷是否有參數和是否執行完就釋放連接】 :param sql: 字符串類型,sql語句 :param param: sql語句中要替換的參數"select %s from tab where id=%s" 其中的%s就是參數 :param autoclose: 是否關閉連接 :return: 返回連接conn和游標cursor """ cursor, conn = self.db.getconn() # 從連接池獲取連接 count = 0 try: # count : 為改變的數據條數 if param: count = cursor.execute(sql, param) else: count = cursor.execute(sql) conn.commit() if autoclose: self.close(cursor, conn) except Exception as e: pass return cursor, conn, count # 釋放連接 def close(self, cursor, conn): """釋放連接歸還給連接池""" cursor.close() conn.close() # 查詢所有 def selectall(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) res = cursor.fetchall() return res except Exception as e: print(e) self.close(cursor, conn) return count # 查詢單條 def selectone(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) res = cursor.fetchone() self.close(cursor, conn) return res except Exception as e: print("error_msg:", e.args) self.close(cursor, conn) return count # 增加 def insertone(self, sql, param): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) # _id = cursor.lastrowid() # 獲取當前插入數據的主鍵id,該id應該為自動生成為好 conn.commit() self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 增加多行 def insertmany(self, sql, param): """ :param sql: :param param: 必須是元組或列表[(),()]或((),()) :return: """ cursor, conn, count = self.db.getconn() try: cursor.executemany(sql, param) conn.commit() return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 刪除 def delete(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 更新 def update(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) conn.commit() self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # if __name__ == '__main__': # db = MySqLHelper() # sql = "SELECT SLEEP(10)" # db.execute(sql) # time.sleep(20) # TODO 查詢單條 # sql1 = 'select * from userinfo where name=%s' # args = 'python' # ret = db.selectone(sql=sql1, param=args) # print(ret) # (None, b'python', b'123456', b'0') # TODO 增加單條 # sql2 = 'insert into hotel_urls(cname,hname,cid,hid,url) values(%s,%s,%s,%s,%s)' # ret = db.insertone(sql2, ('1', '2', '1', '2', '2')) # print(ret) # TODO 增加多條 # sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)' # li = li = [ # ('分省', '123'), # ('到達','456') # ] # ret = db.insertmany(sql3,li) # print(ret) # TODO 刪除 # sql4 = 'delete from userinfo WHERE name=%s' # args = 'xxxx' # ret = db.delete(sql4, args) # print(ret) # TODO 更新 # sql5 = r'update userinfo set password=%s WHERE name LIKE %s' # args = ('993333993', '%old%') # ret = db.update(sql5, args) # print(ret)
修改 db_dbutils_init.py 文件,在創建連接池def __getconn(self):方法下,加一個打印隨機數,方便將來我們定位是否時單例的線程池。
修改后的db_dbutils_init.py 文件:
from dbutils.pooled_db import PooledDB import db_config as config import random from singleton import singleton """ @功能:創建數據庫連接池 """ class MyConnectionPool(object): # 私有屬性 # 能通過對象直接訪問,但是可以在本類內部訪問; __pool = None # def __init__(self): # self.conn = self.__getConn() # self.cursor = self.conn.cursor() # 創建數據庫連接conn和游標cursor def __enter__(self): self.conn = self.__getconn() self.cursor = self.conn.cursor() # 創建數據庫連接池 def __getconn(self): if self.__pool is None: i = random.randint(1, 100) print("線程池的隨機數"+str(i)) self.__pool = PooledDB( creator=config.DB_CREATOR, mincached=config.DB_MIN_CACHED, maxcached=config.DB_MAX_CACHED, maxshared=config.DB_MAX_SHARED, maxconnections=config.DB_MAX_CONNECYIONS, blocking=config.DB_BLOCKING, maxusage=config.DB_MAX_USAGE, setsession=config.DB_SET_SESSION, host=config.DB_TEST_HOST, port=config.DB_TEST_PORT, user=config.DB_TEST_USER, passwd=config.DB_TEST_PASSWORD, db=config.DB_TEST_DBNAME, use_unicode=False, charset=config.DB_CHARSET ) return self.__pool.connection() # 釋放連接池資源 def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() # 關閉連接歸還給鏈接池 # def close(self): # self.cursor.close() # self.conn.close() # 從連接池中取出一個連接 def getconn(self): conn = self.__getconn() cursor = conn.cursor() return cursor, conn # 獲取連接池,實例化 @singleton def get_my_connection(): return MyConnectionPool()
開始測試:
from mysqlhelper import MySqLHelper import time if __name__ == '__main__': sql = "SELECT SLEEP(10)" sql1 = "SELECT SLEEP(15)" db = MySqLHelper() db.execute(sql) db.execute(sql1) time.sleep(20)
在數據庫中,使用 show processlist;
show processlist;
當執行第一個sql時。數據庫連接顯示。
當執行第二個sql時。數據庫連接顯示:
當執行完sql,程序sleep時。數據庫連接顯示:
程序打印結果:
線程池的隨機數43
由以上可以得出結論:
線程池啟動后,生成了5個連接。執行第一個sql時,使用了1個連接。執行完第一個sql后,使用了另外1個連接。 這是一個線性的,線程池中一共5個連接,但是每次執行,只使用了其中一個。
有個疑問,連接池如果不支持并發是不是就毫無意義?
如上,雖然開了線程池5個連接,但是每次執行sql,只用到了一個連接。那為何不設置線程池大小為1呢?設置線程池大小的意義何在呢?(如果在非并發的場景下,是不是設置大小無意義?)
相比于不用線程池的優點:
如果不用線程池,則每次執行一個sql都要創建、斷開連接。 像我們這樣使用連接池,不用反復創建、斷開連接,拿現成的連接直接用就好了。
from mysqlhelper import MySqLHelper import time if __name__ == '__main__': db = MySqLHelper() db1 = MySqLHelper() sql = "SELECT SLEEP(10)" sql1 = "SELECT SLEEP(15)" db.execute(sql) db1.execute(sql1) time.sleep(20)
第一個實例db,執行sql。線程池啟動了5個連接
第二個實例db1,執行sql:
程序睡眠時,一共5個線程池:
打印結果:
結果證明:
雖然我們依次創建了2個實例,但是(1)創建線程池的打印結果,只打印1次,且從始至終,線程池一共只啟動了5個連接,且連接的id沒有發生改變,說明一直是這5個連接。
證明,我們雖然創建了2個實例,但是這2個實例其實是一個實例。(單例模式是生效的)
import threading from mysqlhelper import MySqLHelper import time def sl1(): time.sleep(2) db = MySqLHelper() sql = "SELECT SLEEP(6)" db.execute(sql) def sl2(): time.sleep(4) db = MySqLHelper() sql = "SELECT SLEEP(15)" db.execute(sql) if __name__ == '__main__': threads = [] t1 = threading.Thread(target=sl1) threads.append(t1) t2 = threading.Thread(target=sl2) threads.append(t2) for t in threads: t.setDaemon(True) t.start() time.sleep(20)
2個線程間隔了2秒。
觀察數據庫的連接數量:
打印結果:
在并發執行2個sql時,共用了這5個連接,且打印結果只打印了一次,說明雖然并發創建了2次實例,但真正只創建了一個連接池。
import threading from mysqlhelper import MySqLHelper import time if __name__ == '__main__': db = MySqLHelper() sql = "SELECT SLEEP(6)" sql1 = "SELECT SLEEP(15)" threads = [] t1 = threading.Thread(target=db.execute, args=(sql,)) threads.append(t1) t2 = threading.Thread(target=db.execute, args=(sql1,)) threads.append(t2) for t in threads: t.setDaemon(True) t.start() time.sleep(20)
觀察數據庫連接 :
打印結果:
結果表明:
終端打印了2次,數據庫建立了10個連接,說明創建了2個線程池。這樣的單例模式,存在線程安全問題。
到此,關于“Python怎么封裝數據庫連接池”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。