mysql基于binlog回滾工具實(shí)例詳解

?

? ??update、delete的條件寫(xiě)錯(cuò)甚至沒(méi)有寫(xiě),導(dǎo)致數(shù)據(jù)操作錯(cuò)誤,需要恢復(fù)被誤操作的行記錄。這種情形,其實(shí)時(shí)有發(fā)生,可以選擇用備份文件+binlog來(lái)恢復(fù)到測(cè)試環(huán)境,然后再做數(shù)據(jù)修復(fù),但是這樣其實(shí)需要耗費(fèi)一定的時(shí)間跟資源。

? ? 其實(shí),如果binlog format為row,binlog文件中是會(huì)詳細(xì)記錄每一個(gè)事務(wù)涉及到操作,并把每一個(gè)事務(wù)影響到行記錄均存儲(chǔ)起來(lái),能否給予binlog 文件來(lái)反解析數(shù)據(jù)庫(kù)的行記錄變動(dòng)情況呢?
? ? 業(yè)界已有不少相關(guān)的腳本及工具,但是隨著mysql版本的更新、binlog記錄內(nèi)容的變化以及需求不一致,大多數(shù)腳本不太適合個(gè)人目前的使用需求,所以開(kāi)始著手編寫(xiě) mysql的 flash back腳本。
?


?
??? 如果轉(zhuǎn)載,請(qǐng)注明博文來(lái)源:?www.cnblogs.com/xinysu/? ?,版權(quán)歸 博客園 蘇家小蘿卜 所有。望各位支持!
?


?
???? 僅在MySQL 5.6/5.7版本測(cè)試,python運(yùn)行環(huán)境需要安裝pymysql模塊。

1 實(shí)現(xiàn)內(nèi)容

? ? 根據(jù)binlog文件,對(duì)某個(gè)些事務(wù)、某段時(shí)間某些表、某段時(shí)間全庫(kù) 做回滾操作,實(shí)現(xiàn)閃回功能。工具處理過(guò)程中,會(huì)把binlog中的事務(wù)修改的行記錄存儲(chǔ)到表格中去,通過(guò) dml_sql 列,可以查看每一個(gè)事務(wù)內(nèi)部的所有行記錄變更情況,通過(guò) undo_sql 查看回滾的SQL內(nèi)容。如下圖,然后再根據(jù)表格內(nèi)容做回滾操作。?
? ? mysql基于binlog回滾工具實(shí)例詳解??
? ? 那么這個(gè)腳本有哪些優(yōu)點(diǎn)呢?
  • 回滾分為2個(gè)命令:第一個(gè)命令 分析binglog并存儲(chǔ)進(jìn)入數(shù)據(jù)庫(kù);第二個(gè)命令 執(zhí)行回滾操作;

  • 回滾的時(shí)候,可以把執(zhí)行腳本跟回滾腳本統(tǒng)一存放到數(shù)據(jù)庫(kù)中,可以查看 更新內(nèi)容以及回滾內(nèi)容;

  • 根據(jù)存儲(chǔ)的分析表格,方便指定事務(wù)或者指定表格來(lái)來(lái)恢復(fù);

  • 詳細(xì)的日志輸出,說(shuō)明分析進(jìn)度跟執(zhí)行進(jìn)度。

? ? 分析binlog的輸出截圖(分析1G的binlog文件)
? ? mysql基于binlog回滾工具實(shí)例詳解
?
? ?回滾數(shù)據(jù)庫(kù)的輸出截圖:
? ?mysql基于binlog回滾工具實(shí)例詳解

2 原理

? ? 前提:實(shí)例啟動(dòng)了binlog并且格式為ROW。
? ??
? ? 使用python對(duì)mysqlbinlog后的log文件進(jìn)行文本分析處理,在整個(gè)處理過(guò)程中,有以下6個(gè)疑難點(diǎn)需要處理:
  1. 判斷一個(gè)事務(wù)的開(kāi)始跟結(jié)束

  2. 同一個(gè)事務(wù)的執(zhí)行順序需要反序執(zhí)行

  3. 解析回滾SQL

  4. 同一個(gè)事務(wù)操作不同表格處理

  5. 轉(zhuǎn)義字符處理,比如換行符、tab符等等

  6. timestamp數(shù)據(jù)類(lèi)型參數(shù)值轉(zhuǎn)換

  7. 負(fù)數(shù)處理

  8. 單個(gè)事務(wù)涉及到行修改SQL操作了 max_allow

  9. 針對(duì)某個(gè)表格做回滾,而不是全庫(kù)回滾

? ??

2.1 事務(wù)的開(kāi)始與結(jié)束

? ? 按照Xid出現(xiàn)的位置來(lái)判斷,從binlog文件的最開(kāi)始開(kāi)始讀取,遇到SQL語(yǔ)句則提取出來(lái),直到遇到Xid,統(tǒng)一把之前提取出來(lái)的SQL匯總為一個(gè)事務(wù),然后繼續(xù)提取SQL語(yǔ)句,直到遇到下一個(gè)Xid,再把這個(gè)事務(wù)的SQL匯總成一個(gè)事務(wù),一直這樣循環(huán),直至文件順序遍歷結(jié)束。
? ? mysql基于binlog回滾工具實(shí)例詳解

?

2.2?事務(wù)內(nèi)部反序處理

? ? 同一個(gè)事務(wù)中,如果有多個(gè)表格多行記錄發(fā)生變更,在回滾的時(shí)候,應(yīng)該反序回滾SQL,那么,如何將提取出來(lái)的SQL反序存儲(chǔ)呢?思路如下:
  • 每行記錄的修改SQL獨(dú)立出來(lái)

  • 將獨(dú)立出來(lái)的SQL反序存儲(chǔ)

? ?假設(shè)正序的事務(wù)SQL語(yǔ)句存儲(chǔ)在變量 dml_sql 中,反序后的可以回滾的SQL存儲(chǔ)在變量 undo_sql中。按順序把行記錄修改的SQL抽取出來(lái) 存儲(chǔ)到變量 record_sql 中去,然后 賦值 undo_sql =record_sql + undo_sql ,再置空 record_sql 變量,如此,便可實(shí)現(xiàn)反序事務(wù)內(nèi)部的執(zhí)行SQL。

2.3?解析回滾SQL

? ? 首先,查看binlog的日志內(nèi)容,發(fā)現(xiàn)行修改的SQL情形如下,提取過(guò)程中需要注意這幾個(gè)問(wèn)題:
  • 行記錄的列名配對(duì),binlog file存儲(chǔ)的列序號(hào),不能直接使用

  • WHERE部分 跟 SET部分 之間并無(wú)關(guān)鍵字或者符號(hào),需要添加 AND 或者 逗號(hào)

  • DELETE SQL 需要反轉(zhuǎn)為 INSERT

  • UPDATE SQL 需要把WHERE 跟 SET的部分進(jìn)行替換

  • INSERT SQL需要反轉(zhuǎn)為 DELETE

  • mysql基于binlog回滾工具實(shí)例詳解

2.4 同事務(wù)不同表格處理

? ? 同一個(gè)事務(wù)中,允許對(duì)不同表格進(jìn)行數(shù)據(jù)修改,這點(diǎn)在列名替換列序號(hào)的時(shí)候,需要留意處理。
? ? 每一個(gè)的行記錄前有一行記錄,含有 ‘Table_map’ 標(biāo)識(shí),會(huì)說(shuō)明這一行當(dāng)行記錄是修改哪個(gè)表格,可以根據(jù)這個(gè)提示,來(lái)替換binlog里邊的列序號(hào)為列名。

2.5 轉(zhuǎn)義字符處理

? ? binlog文件在對(duì)非空格的空白字符處理,采用轉(zhuǎn)義字符字符串存儲(chǔ),比如,在表格insert一列記錄含換行符,而實(shí)際上在binlog文件中,是使用了 替換了 換行操作,所以在回滾數(shù)據(jù)的過(guò)程中,需要對(duì)轉(zhuǎn)義字符做處理。
? ? mysql基于binlog回滾工具實(shí)例詳解

??? mysql基于binlog回滾工具實(shí)例詳解

?

? ??
? ? 這里注意一個(gè)地方,039的轉(zhuǎn)義字符是沒(méi)有在函數(shù)?esc_code 中統(tǒng)一處理,而是單獨(dú)做另外處理。
? ? mysql基于binlog回滾工具實(shí)例詳解

?

? ? 轉(zhuǎn)移字符表相見(jiàn)下圖:
??? mysql基于binlog回滾工具實(shí)例詳解

?

2.6 timestamp數(shù)據(jù)類(lèi)型處理

? ? timestamp實(shí)際在數(shù)據(jù)庫(kù)中的存儲(chǔ)值是 INT類(lèi)型,需要使用 函數(shù) from_unixtime轉(zhuǎn)換。
? ? 建立測(cè)試表格tbtest,只有一列timestamp的列,存儲(chǔ)值后查看binlog的內(nèi)容,具體截圖如下:
? ? mysql基于binlog回滾工具實(shí)例詳解

?

? ? 在處理行記錄的時(shí)候,要對(duì)timestamp的value做處理,添加from_unixtime函數(shù)轉(zhuǎn)換。

2.7 負(fù)數(shù)值處理

? ? 這個(gè)一開(kāi)始寫(xiě)代碼的時(shí)候,并沒(méi)有考慮到。大量測(cè)試的過(guò)程中發(fā)現(xiàn),所有整型的數(shù)據(jù)類(lèi)型,在存儲(chǔ)負(fù)數(shù)的時(shí)候,都會(huì)存入一個(gè)最大范圍值。binlog在處理這塊的機(jī)制有些不是很了解。測(cè)試如下:
? ? mysql基于binlog回滾工具實(shí)例詳解

?

? ?所以當(dāng)遇到INT的各種數(shù)據(jù)類(lèi)型并且VALUE為負(fù)數(shù)的時(shí)候,需要把 這個(gè)范圍值去除,才能執(zhí)行執(zhí)行undo_sql。

2.8 單個(gè)事務(wù)行記錄總SQL超過(guò)max_allowed_package處理

? ? 分析binlog后存儲(chǔ)兩種sql類(lèi)型,一種是行記錄的修改SQL,即 dml_sql;一種是 行記錄的回滾sql,即 undo_sql。從代碼可知,存儲(chǔ)這兩個(gè)sql的列是longtext,最大可存儲(chǔ)4G的內(nèi)容。但是 MySQL中單個(gè)會(huì)話的包大小是有限制的,限制的參數(shù)為?max_allowed_packet,默認(rèn)大小為 4Mb,最大為1G,所以這個(gè)腳本使用前,請(qǐng)手動(dòng)設(shè)置 存儲(chǔ)binlog file的數(shù)據(jù)庫(kù)實(shí)例以及線上的數(shù)據(jù)庫(kù)實(shí)例這個(gè)參數(shù):
set global max_allowed_packet =?1073741824; #記得后續(xù)修改回來(lái)
? ??
? ? 萬(wàn)一操作了呢?那么回滾只能分段來(lái)回滾,先回滾到 這個(gè)大事務(wù),然后單獨(dú)執(zhí)行這個(gè)大事務(wù),緊接著繼續(xù)回滾,這部分不能使用pymysql嗲用source 文件執(zhí)行,所以只能手動(dòng)做這個(gè)操作。 求高能人士修改這個(gè)邏輯代碼!!!

2.9 針對(duì)性回滾

? ? 假設(shè)誤操作的沒(méi)有明確的時(shí)間點(diǎn),只有一個(gè)區(qū)間,而這個(gè)區(qū)間還有其他的表格操作,那么這個(gè)時(shí)候,需要在分析binlog文件的時(shí)候,添加–database選項(xiàng),先帥選到同一個(gè)數(shù)據(jù)庫(kù)中binlog文件中。
? ? 這里的處理是將這段區(qū)間的dml_sql跟undo_sql都存儲(chǔ)到數(shù)據(jù)庫(kù)表格中,然后再刪除不需要回滾的事務(wù),剩余需要回滾的事務(wù)。再執(zhí)行回滾操作。

3?使用說(shuō)明

3.1 參數(shù)說(shuō)明

? ? 這個(gè)腳本的參數(shù)稍微多些,可以 –help 查看具體說(shuō)明。
? ? mysql基于binlog回滾工具實(shí)例詳解

?

? ? 本人喜歡用各種顏色來(lái)分類(lèi)參數(shù)(blingbling五顏六色,看著多有趣多精神),所以,按顏色來(lái)說(shuō)明這些參數(shù)。
  • 黃色區(qū)域:這6個(gè)參數(shù),提供的是 分析并存儲(chǔ)binlog file的相關(guān)值,說(shuō)明存儲(chǔ)分析結(jié)果的數(shù)據(jù)庫(kù)的鏈接方式、binlog文件的位置以及存儲(chǔ)結(jié)果的表格名字;

  • 藍(lán)色區(qū)域:這4個(gè)參數(shù),提供 與線上數(shù)據(jù)庫(kù)表結(jié)構(gòu)一致的DB實(shí)例連接方式,僅需跟線上一模一樣的表結(jié)構(gòu),不一定需要是主從庫(kù);

  • 綠色區(qū)域:最最重要的選項(xiàng) -a,0代表僅分析binlog文件,1代表僅執(zhí)行回滾操作,必須先執(zhí)行0才可以執(zhí)行1;

  • 紫色區(qū)域:舉例說(shuō)明。

3.2 應(yīng)用場(chǎng)景說(shuō)明

  • 全庫(kù)回滾某段時(shí)間

    • 需要回滾某個(gè)時(shí)間段的所有SQL操作,回滾到某一個(gè)時(shí)間點(diǎn)

    • 這種情況下呢,大多數(shù)是使用備份文件+binlog解決

    • 但是這個(gè)腳本也可以滿足,但請(qǐng)勿直接在線上操作,先 -a=0,看下分析結(jié)果,是否符合,符合的話,停掉某個(gè)從庫(kù),再在從庫(kù)上執(zhí)行,最后開(kāi)發(fā)業(yè)務(wù)接入檢查是否恢復(fù)到指定時(shí)間點(diǎn),數(shù)據(jù)是否正常。

  • 某段時(shí)間某些表格回滾某些操作

    • 比如,開(kāi)發(fā)提交了一個(gè)批量更新腳本,各個(gè)測(cè)試層面驗(yàn)證沒(méi)有問(wèn)題,提交線上執(zhí)行,但是執(zhí)行后,發(fā)現(xiàn)有個(gè)業(yè)務(wù)漏測(cè)試,導(dǎo)致某些字段更新后影響到其他業(yè)務(wù),現(xiàn)在需要緊急 把被批量更新的表格回滾到原先的行記錄

    • 這個(gè)并不能單純從技術(shù)角度來(lái)處理,要綜合考慮

      • mysql基于binlog回滾工具實(shí)例詳解

        ?

      • 這種情況下,如何回顧tab A表格的修改操作呢?

      • 個(gè)人覺(jué)得,這種方式比較行得通,dump tabA表格的數(shù)據(jù)到測(cè)試環(huán)境,然后再分析 binlog file 從11點(diǎn)-12點(diǎn)的undo sql,接著在測(cè)試環(huán)境回滾該表格到11點(diǎn)這個(gè)時(shí)刻,緊接著,由開(kāi)發(fā)跟業(yè)務(wù)對(duì)比測(cè)試環(huán)境11點(diǎn)的數(shù)據(jù)跟線上現(xiàn)有的數(shù)據(jù)中,看下是哪些行哪些列需要在線上進(jìn)行回滾,哪些是不需要的,然后開(kāi)發(fā)再提交SQL腳本,再在線上執(zhí)行。其實(shí),這里邊,DBA僅提供一個(gè)角色,就是把 表格 tab A 在一個(gè)新的環(huán)境上,回滾到某個(gè)時(shí)間點(diǎn),但是不提供直接線上回滾SQL的處理。

  • 回滾某個(gè)/些SQL

    • 這種情況比較常見(jiàn),某個(gè)update某個(gè)delete缺少where條件或者where條件執(zhí)行錯(cuò)誤

    • 這種情況下,找到對(duì)應(yīng)的事務(wù),執(zhí)行回滾即可,回滾流程請(qǐng)參考上面一說(shuō),對(duì)的,我就是這么膽小怕事?mysql基于binlog回滾工具實(shí)例詳解

      ?

3.3 測(cè)試案例

3.3.1?全庫(kù)回滾某段時(shí)間

假設(shè)需要回滾9點(diǎn)10分到9點(diǎn)15分間數(shù)據(jù)庫(kù)的所有操作:
  • 準(zhǔn)備測(cè)試環(huán)境實(shí)例存儲(chǔ)分析后的數(shù)據(jù)?

  • 測(cè)試環(huán)境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python腳本分析文件,action=0

  • 線上測(cè)試環(huán)境修改set global max_allowed_packet = 1073741824

  • 回滾數(shù)據(jù),action=1

  • 線上測(cè)試環(huán)境修改set global max_allowed_packet = 4194304

 1 --測(cè)試環(huán)境(請(qǐng)安裝pymysql):IP: 192.168.9.242,PORT:3310 ,數(shù)據(jù)庫(kù):flashback,表格:tbevent 2 --具有線上表結(jié)構(gòu)的db:IP:192.168.9.243 PORT:3310 3  4  5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name      | Value    | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent   27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions   42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空間忽略不截圖--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310

3.3.2?某段時(shí)間某些表格回滾某些操作

  • 準(zhǔn)備測(cè)試環(huán)境實(shí)例存儲(chǔ)分析后的數(shù)據(jù)?

  • 測(cè)試環(huán)境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python腳本分析文件,action=0

  • 分析帥選需要的事務(wù),rename表格

  • dump 對(duì)應(yīng)的表格到測(cè)試環(huán)境

  • 回滾數(shù)據(jù),action=1

  • 提交給開(kāi)發(fā)業(yè)務(wù)對(duì)比數(shù)據(jù)

3.3.3?回滾某個(gè)/些SQL

  • 準(zhǔn)備測(cè)試環(huán)境實(shí)例存儲(chǔ)分析后的數(shù)據(jù)?

  • 測(cè)試環(huán)境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python腳本分析文件,action=0

  • 分析帥選需要的事務(wù),rename表格

  • dump 對(duì)應(yīng)的表格到測(cè)試環(huán)境

  • 回滾數(shù)據(jù),action=1

  • 提交給開(kāi)發(fā)業(yè)務(wù)對(duì)比數(shù)據(jù)

4 python腳本

???? 腳本會(huì)不定期修復(fù)bug,若是感興趣,可以往github下載: 中的 mysql_xinysu_flashback 。

  1 # -*- coding: utf-8 -*-  2 __author__ = 'xinysu'  3 __date__ = '2017/6/15 10:30'  4   5   6   7 import re  8 import os  9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16  17 import pymysql 18 from pymysql.cursors import DictCursor 19  20 usage=''' usage: python [script's path] [option] 21 ALL options need to assign: 22  23 -h    : host, the database host,which database will store the results after analysis 24 -u    : user, the db user 25 -p    : password, the db user's password 26 -P    : port, the db port 27  28 -f    : file path, the binlog file 29 -t    : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30         when you want to store in `test` db and the table name is `tbevent`,then this parameter    31         is test.tbevent 32  33 -oh   : online host, the database host,which database have the online table schema 34 -ou   : online user, the db user 35 -op   : online password, the db user's password 36 -oP   : online port, the db port 37  38 -a    : action,    39         0 just analyse the binlog file ,and store sql in table;    40         1 after execute self.dotype=0, execute the undo_sql in the table 41      42 --help: help document 43  44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent    47                        -oh=192.168.9.244 -oP=3310 -u=root -op=***    48                        -a=0 49  50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent    52                        -oh=192.168.9.244 -oP=3310 -u=root -op=***    53                        -a=1 54                            55 ''' 56  57 class flashback: 58     def __init__(self): 59         self.host='' 60         self.user='' 61         self.password='' 62         self.port='3306' 63         self.fpath='' 64         self.tbevent='' 65  66         self.on_host='' 67         self.on_user='' 68         self.on_password='' 69         self.on_port='3306' 70  71         self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72  73         self._get_db() # 從輸入?yún)?shù)獲取連接數(shù)據(jù)庫(kù)的相關(guān)參數(shù)值 74  75         # 連接數(shù)據(jù)庫(kù),該數(shù)據(jù)庫(kù)是用來(lái)存儲(chǔ)binlog文件分析后的內(nèi)容 76         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78         self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79         logging.info('MySQL which userd to store binlog event connection is ok') 80  81         # 連接數(shù)據(jù)庫(kù),該數(shù)據(jù)庫(kù)的表結(jié)構(gòu)必須跟binlogfile基于對(duì)數(shù)據(jù)庫(kù)表結(jié)構(gòu)一致 82         # 該數(shù)據(jù)庫(kù)用于提供 binlog file 文件中涉及到表結(jié)構(gòu)分析 83         logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84         self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85         self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86         logging.info('MySQL which userd to analyse online table schema connection is ok') 87  88         logging.info('MySQL connection is ok') 89  90         self.dml_sql='' 91         self.undo_sql='' 92  93         self.tbfield_where = [] 94         self.tbfield_set = [] 95  96         self.begin_time='' 97         self.db_name='' 98         self.tb_name='' 99         self.end_time=''100         self.end_pos=''101         self.sqltype=0102 103     #_get_db用于獲取執(zhí)行命令的輸入?yún)?shù)104     def _get_db(self):105         logging.info('begin to assign values to parameters')106         if len(sys.argv) == 1:107             print(usage)108             sys.exit(1)109         elif sys.argv[1] == '--help':110             print(usage)111             sys.exit()112         elif len(sys.argv) > 2:113             for i in sys.argv[1:]:114                 _argv = i.split('=')115                 if _argv[0] == '-h':116                     self.host = _argv[1]117                 elif _argv[0] == '-u':118                     self.user = _argv[1]119                 elif _argv[0] == '-P':120                     self.port = int(_argv[1])121                 elif _argv[0] == '-f':122                     self.fpath = _argv[1]123                 elif _argv[0] == '-t':124                     self.tbevent = _argv[1]125                 elif _argv[0] == '-p':126                     self.password = _argv[1]127 128                 elif _argv[0] == '-oh':129                     self.on_host = _argv[1]130                 elif _argv[0] == '-ou':131                     self.on_user = _argv[1]132                 elif _argv[0] == '-oP':133                     self.on_port = int(_argv[1])134                 elif _argv[0] == '-op':135                     self.on_password = _argv[1]136 137                 elif _argv[0] == '-a':138                     self.action = _argv[1]139 140                 else:141                     print(usage)142 143     #創(chuàng)建表格,用于存儲(chǔ)分析后的BINLOG內(nèi)容144     def create_tab(self):145         logging.info('creating table {} to store binlog event'.format(self.tbevent))146         create_tb_sql ='''147         CREATE TABLE IF NOT EXISTS {}(148             auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149             binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150             dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151             dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152             end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153             db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154             table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155             sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156             dml_sql LONGTEXT NULL  COMMENT 'what sql excuted',157             undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158             PRIMARY KEY (auto_id),159             INDEX sqltype(sqltype),160             INDEX dml_start_time (dml_start_time),161             INDEX dml_end_time (dml_end_time),162             INDEX end_log_pos (end_log_pos),163             INDEX db_name (db_name),164             INDEX table_name (table_name)165         )166         COLLATE='utf8_general_ci' ENGINE=InnoDB;167         TRUNCATE TABLE {};168 169         '''.format(self.tbevent,self.tbevent)170         self.cur.execute(create_tb_sql)171         logging.info('created table {} '.format(self.tbevent))172 173     #獲取表格的列順序?qū)?yīng)的列名,并處理where set的時(shí)候,列與列之間的連接字符串是逗號(hào)還是 and174     def tbschema(self,dbname,tbname):175         self.tbfield_where = []176         self.tbfield_set = []177 178         sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180         self.on_cur.execute(sql_tb)181         tbcol=self.on_cur.fetchall()182 183         i = 0184         for l in tbcol:185             #self.tbfield.append(l['Field'])186             if i==0:187                 self.tbfield_where.append('`'+l['Field']+'`')188                 self.tbfield_set.append('`'+l['Field']+'`')189                 i+=1190             else:191                 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192                 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194     # 一個(gè)事務(wù)記錄一行,若binlog file中的行記錄包含 Table_map,則為事務(wù)的開(kāi)始記錄195     def rowrecord(self,bl_line):196         try:197             if bl_line.find('Table_map:') != -1:198                 l = bl_line.index('server')199                 m = bl_line.index('end_log_pos')200                 n = bl_line.index('Table_map')201                 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203                 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204                 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205                 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207                 self.tbschema(self.db_name,self.tb_name)208         except Exception:209             return 'funtion rowrecord error'210 211     def dml_tran(self,bl_line):212         try:213 214 215             if bl_line.find('Xid =') != -1:216 217                 l = bl_line.index('server')218                 m = bl_line.index('end_log_pos')219                 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220                 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221                 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225                 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226                 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228                 if self.dml_sql.startswith(' INSERT INTO '):229                     self.sqltype=1230                 elif self.dml_sql.startswith(' UPDATE '):231                     self.sqltype=2232                 elif self.dml_sql.startswith(' DELETE '):233                     self.sqltype=3234 235                 record_sql = ''236                 undosql_desc = ''237 238                 #同個(gè)事務(wù)內(nèi)部的行記錄修改SQL,反序存儲(chǔ)239                 for l in self.undo_sql.splitlines():240                     if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241                         undosql_desc = record_sql + undosql_desc242                         record_sql = ''243                         record_sql = record_sql + l244                     else:245                         record_sql = record_sql + l246 247                 self.undo_sql = record_sql + undosql_desc248                 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250                 #處理非空格的空白特殊字符251                 self.dml_sql = self.esc_code(self.dml_sql)252                 self.undo_sql = self.esc_code(self.undo_sql)253 254                 #單獨(dú)處理 轉(zhuǎn)移字符: '255                 self.dml_sql = self.dml_sql.replace("'", "''").replace('x27',"''''")  # + ';'256                 self.undo_sql = self.undo_sql.replace("'", "''").replace('x27',"''''")  # + ';'257 258                 if len(self.dml_sql)>500000000:259                     with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260                         w_f.write('begin;' + ' ')261                         w_f.write(self.undo_sql)262                         w_f.write('commit;' + ' ')263                     self.dml_sql=''264                     self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265                     logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267                 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select  '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268                     self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269                     self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271                 self.cur.execute(insert_sql)272                 self.mysqlconn.commit()273 274                 self.dml_sql = ''275                 self.undo_sql = ''276         except Exception:277             print( 'funtion dml_tran error')278 279 280     def analyse_binlog(self):281         try:282             sqlcomma=0283             self.create_tab()284 285             with open(self.fpath,'r') as binlog_file:286                 logging.info('begining to analyze the binlog file ,this may be take a long time !!!')287                 logging.info('analyzing...')288                 for bline in binlog_file:289                     if bline.find('Table_map:') != -1:290                         self.rowrecord(bline)291                         bline=''292                     elif bline.rstrip()=='### SET':293                         bline = bline[3:]294                         sqlcomma=1295                     elif bline.rstrip()=='### WHERE':296                         bline = bline[3:]297                         sqlcomma = 2298                     elif bline.startswith('###   @'):299                         len_f=len('###   @')300                         i=bline[len_f:].split('=')[0]301 302                         #處理timestamp類(lèi)型303                         if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305                             bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307                         #處理負(fù)數(shù)存儲(chǔ)方式308                         if bline.split('=')[1].startswith('-'):309                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310                             bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+' '311 312                         if sqlcomma==1:313                             bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314                         elif sqlcomma==2:315                             bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317                     elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318                         bline = bline[3:]319 320                     elif bline.find('Xid =') != -1:321                         self.dml_tran(bline)322                         bline=''323                     else:324                         bline = ''325 326                     if bline.rstrip(' ') != '':327                         self.dml_sql = self.dml_sql + bline + ' '328         except Exception:329             return 'function do error'330 331     def esc_code(self,sql):332         esc={333              'x07':'','x08':'','x0c':'','x0a':' ','x0d':' ','x09':'	','x0b':'','x5c':'',334             #'x27':''',335             'x22':'"','x3f':'?','x00':'

日韩av无码久久精品免费|
亚洲精品高清国产一久久|
亚洲精品视频久久久|
精品久久久无码人妻中文字幕|
日韩精品久久久久久久电影蜜臀|
久久最近最新中文字幕大全|
青青久久精品国产免费看|
人妻少妇久久中文字幕一区二区|
国产精品九九九久久九九|
久久亚洲国产成人影院网站|
久久99久久99精品免视看动漫|
久久婷婷综合中文字幕|
99精品国产综合久久久久五月天
|
狠狠精品久久久无码中文字幕|
久久久SS麻豆欧美国产日韩|
精品久久久久久综合日本|
久久久久久久女国产乱让韩|
亚洲欧美日韩精品久久|
亚洲AV无码久久寂寞少妇|
久久亚洲视频|
青青草原综合久久|
久久亚洲AV成人无码电影|
四虎国产精品免费久久|
26uuu久久五月天|
久久久精品人妻一区二区三区四|
久久久久久亚洲精品影院|
久久精品国产亚洲精品|
一本大道加勒比久久综合|
av国内精品久久久久影院|
久久精品国产亚洲av高清漫画|
精品久久久中文字幕人妻
|
欧美久久一区二区三区|
91精品国产91久久|
久久综合久久综合久久|
成人久久精品一区二区三区|
久久精品国产亚洲AV香蕉|
午夜人妻久久久久久久久|
国产69精品久久久久9999APGF|
2019久久久高清456|
久久久久国产精品嫩草影院|
性欧美丰满熟妇XXXX性久久久|