如何實現python3實現并發訪問水平切分表

本篇文章給大家帶來的內容是關于如何實現python3實現并發訪問水平切分表,有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。

場景說明

假設有一個mysql表被水平切分,分散到多個host中,每個host擁有n個切分表。
如果需要并發去訪問這些表,快速得到查詢結果, 應該怎么做呢?
這里提供一種方案,利用python3的asyncio異步io庫及aiomysql異步庫去實現這個需求。

代碼演示

import?logging import?random import?asynciofrom?aiomysql? import?create_pool #?假設mysql表分散在8個host,?每個host有16張子表 TBLES?=?{????"192.168.1.01":?"table_000-015",? #?000-015表示該ip下的表明從table_000一直連續到table_015 ????"192.168.1.02":?"table_016-031",?? ??????"192.168.1.03":?"table_032-047",??? ???????"192.168.1.04":?"table_048-063",?? ?????????"192.168.1.05":?"table_064-079",??? ??????????"192.168.1.06":?"table_080-095",?? ????????????"192.168.1.07":?"table_096-0111",?? ??????????????"192.168.1.08":?"table_112-0127", } USER?=?"xxx"PASSWD?=?"xxxx"#?wrapper函數,用于捕捉異常def?query_wrapper(func): ????async?def?wrapper(*args,?**kwargs): ????????try: ????????????await?func(*args,?**kwargs)????????except?Exception?as?e: ????????????print(e)????return?wrapper ????????????#?實際的sql訪問處理函數,通過aiomysql實現異步非阻塞請求@ ????????????query_wrapperasync?def?query_do_something(ip,?db,?table): ????async?with?create_pool(host=ip,?db=db,?user=USER,?password=PASSWD)?as?pool: ????????async?with?pool.get()?as?conn: ????????????async?with?conn.cursor()?as?cur: ????????????????sql?=?("select?xxx?from?{}?where?xxxx") ????????????????await?cur.execute(sql.format(table)) ????????????????res?=?await?cur.fetchall()???????? ??#?then?do?something...#?生成sql訪問隊列,?隊列的每個元素包含要對某個表進行訪問的函數及參數def?gen_tasks(): ????tasks?=?[]????for?ip,?tbls?in?TBLES.items(): ????????cols?=?re.split('_|-',?tbls) ????????tblpre?=?"_".join(cols[:-2]) ????????min_num?=?int(cols[-2]) ????????max_num?=?int(cols[-1])????? ???????????for?num?in?range(min_num,?max_num+1): ????????????tasks.append( ???????????????(query_do_something,?ip,?'your_dbname',?'{}_{}'.format(tblpre,?num)) ????????????)  ????random.shuffle(tasks)??? ?????return?tasks#?按批量運行sql訪問請求隊列def?run_tasks(tasks,?batch_len): ????try:???? ????????for?idx?in?range(0,?len(tasks),?batch_len): ????????????batch_tasks?=?tasks[idx:idx+batch_len] ????????????logging.info("current?batch,?start_idx:%s?len:%s"?%?(idx,?len(batch_tasks)))? ???????????????????????for?i?in?range(0,?len(batch_tasks)): ????????????????l?=?batch_tasks[i] ????????????????batch_tasks[i]?=?asyncio.ensure_future( ????????????????????l[0](*l[1:]) ????????????????) ????????????loop.run_until_complete(asyncio.gather(*batch_tasks))?? ??????????????except?Exception?as?e: ????????logging.warn(e)#?main方法,?通過asyncio實現函數異步調用def?main(): ????loop?=?asyncio.get_event_loop()  ????tasks?=?gen_tasks() ????batch_len?=?len(TBLES.keys())?*?5???#?all?up?to?you ????run_tasks(tasks,?batch_len)  ????loop.close()

? 版權聲明
THE END
喜歡就支持一下吧
點贊6 分享