分享用MongoDB中oplog機(jī)制實(shí)現(xiàn)數(shù)據(jù)監(jiān)控實(shí)例

mongodb 的replication是通過一個日志來存儲寫操作的,這個日志就叫做oplog,而下面這篇文章主要給大家介紹了利用mongodb中oplog機(jī)制實(shí)現(xiàn)準(zhǔn)實(shí)時數(shù)據(jù)的操作監(jiān)控的相關(guān)資料,需要的朋友可以參考借鑒,下面來一起看看吧。

前言

最近有一個需求是要實(shí)時獲取到新插入到MongoDB的數(shù)據(jù),而插入程序本身已經(jīng)有一套處理邏輯,所以不方便直接在插入程序里寫相關(guān)程序,傳統(tǒng)的數(shù)據(jù)庫大多自帶這種觸發(fā)器機(jī)制,但是Mongo沒有相關(guān)的函數(shù)可以用(也可能我了解的太少了,求糾正),當(dāng)然還有一點(diǎn)是需要python實(shí)現(xiàn),于是收集整理了一個相應(yīng)的實(shí)現(xiàn)方法。

一、引子

首先可以想到,這種需求其實(shí)很像數(shù)據(jù)庫的主從備份機(jī)制,從數(shù)據(jù)庫之所以能夠同步主庫是因?yàn)榇嬖谀承┲笜?biāo)來做控制,我們知道MongoDB雖然沒有現(xiàn)成觸發(fā)器,但是它能夠?qū)崿F(xiàn)主從備份,所以我們就從它的主從備份機(jī)制入手。

二、OPLOG

首先,需要以master模式來打開mongod守護(hù),命令行使用–master,或者觸發(fā)器增加master鍵為true。

此時,我們可以在Mongo的系統(tǒng)庫local里見到新增的collection——oplog,此時oplog.$main里就會存儲進(jìn)oplog信息,如果此時還有充當(dāng)從數(shù)據(jù)庫的Mongo存在,就會還有一些slaves的信息,由于我們這里并不是主從同步,所以不存在這些集合。

分享用MongoDB中oplog機(jī)制實(shí)現(xiàn)數(shù)據(jù)監(jiān)控實(shí)例

再來看看oplog結(jié)構(gòu):

"ts"?:?Timestamp(6417682881216249,?1),?時間戳  "h"?:?NumberLong(0),?長度  "v"?:?2,?  "op"?:?"n",?操作類型  "ns"?:?"",?操作的庫和集合  "o2"?:?"_id"?update條件  "o"?:?{}?操作值,即document

這里需要知道op的幾種屬性:

insert,'i'  update,?'u'  remove(delete),?'d'  cmd,?'c'  noop,?'n'?空操作

從上面的信息可以看出,我們只要不斷讀取到ts來做對比,然后根據(jù)op即可判斷當(dāng)前出現(xiàn)的是什么操作,相當(dāng)于使用程序?qū)崿F(xiàn)了一個從數(shù)據(jù)庫的接收端。

三、CODE

在Github上找到了別人的實(shí)現(xiàn)方式,不過它的函數(shù)庫太老舊,所以在他的基礎(chǔ)上進(jìn)行修改。

Github地址:github.com/RedBeard0531/mongo-oplog-watcher

mongo_oplog_watcher.py如下:

#!/usr/bin/python  import?pymongo  import?re  import?time  from?pprint?import?pprint?#?pretty?printer  from?pymongo.errors?import?AutoReconnect    class?OplogWatcher(object):  ??def?init(self,?db=None,?collection=None,?poll_time=1.0,?connection=None,?start_now=True):  ????if?collection?is?not?None:  ??????if?db?is?None:  ????????raise?ValueError('must?specify?db?if?you?specify?a?collection')  ??????self._ns_filter?=?db?+?'.'?+?collection  ????elif?db?is?not?None:  ??????self._ns_filter?=?re.compile(r'^%s.'?%?db)  ????else:  ??????self._ns_filter?=?None    ????self.poll_time?=?poll_time  ????self.connection?=?connection?or?pymongo.Connection()    ????if?start_now:  ??????self.start()    ??@staticmethod  ??def?get_id(op):  ????id?=?None  ????o2?=?op.get('o2')  ????if?o2?is?not?None:  ??????id?=?o2.get('_id')    ????if?id?is?None:  ??????id?=?op['o'].get('_id')    ????return?id    ??def?start(self):  ????oplog?=?self.connection.local['oplog.$main']  ????ts?=?oplog.find().sort('$natural',?-1)[0]['ts']  ????while?True:  ??????if?self._ns_filter?is?None:?  ????????filter?=?{}  ??????else:  ????????filter?=?{'ns':?self._ns_filter}  ??????filter['ts']?=?{'$gt':?ts}  ??????try:  ????????cursor?=?oplog.find(filter,?tailable=True)  ????????while?True:  ??????????for?op?in?cursor:  ????????????ts?=?op['ts']  ????????????id?=?self.get_id(op)  ????????????self.all_with_noop(ns=op['ns'],?ts=ts,?op=op['op'],?id=id,?raw=op)  ??????????time.sleep(self.poll_time)  ??????????if?not?cursor.alive:  ????????????break  ??????except?AutoReconnect:  ????????time.sleep(self.poll_time)    ??def?all_with_noop(self,?ns,?ts,?op,?id,?raw):  ????if?op?==?'n':  ??????self.noop(ts=ts)  ????else:  ??????self.all(ns=ns,?ts=ts,?op=op,?id=id,?raw=raw)    ??def?all(self,?ns,?ts,?op,?id,?raw):  ????if?op?==?'i':  ??????self.insert(ns=ns,?ts=ts,?id=id,?obj=raw['o'],?raw=raw)  ????elif?op?==?'u':  ??????self.update(ns=ns,?ts=ts,?id=id,?mod=raw['o'],?raw=raw)  ????elif?op?==?'d':  ??????self.delete(ns=ns,?ts=ts,?id=id,?raw=raw)  ????elif?op?==?'c':  ??????self.command(ns=ns,?ts=ts,?cmd=raw['o'],?raw=raw)  ????elif?op?==?'db':  ??????self.db_declare(ns=ns,?ts=ts,?raw=raw)    ??def?noop(self,?ts):  ????pass    ??def?insert(self,?ns,?ts,?id,?obj,?raw,?**kw):  ????pass    ??def?update(self,?ns,?ts,?id,?mod,?raw,?**kw):  ????pass    ??def?delete(self,?ns,?ts,?id,?raw,?**kw):  ????pass    ??def?command(self,?ns,?ts,?cmd,?raw,?**kw):  ????pass    ??def?db_declare(self,?ns,?ts,?**kw):  ????pass    class?OplogPrinter(OplogWatcher):  ??def?all(self,?**kw):  ????pprint?(kw)  ????print?#newline    if?name?==?'main':  ??OplogPrinter()

首先是實(shí)現(xiàn)一個數(shù)據(jù)庫的初始化,設(shè)定一個延遲時間(準(zhǔn)實(shí)時):

self.poll_time?=?poll_time  self.connection?=?connection?or?pymongo.MongoClient()

主要的函數(shù)是start() ,實(shí)現(xiàn)一個時間的比對并進(jìn)行相應(yīng)字段的處理:

def?start(self):  ?oplog?=?self.connection.local['oplog.$main']  ?#讀取之前提到的庫  ?ts?=?oplog.find().sort('$natural',?-1)[0]['ts']  ?#獲取一個時間邊際  ?while?True:  ?if?self._ns_filter?is?None:  ??filter?=?{}  ?else:  ??filter?=?{'ns':?self._ns_filter}  ?filter['ts']?=?{'$gt':?ts}  ?try:  ??cursor?=?oplog.find(filter)  ??#對此時間之后的進(jìn)行處理  ??while?True:  ??for?op?in?cursor:  ???ts?=?op['ts']  ???id?=?self.get_id(op)  ???self.all_with_noop(ns=op['ns'],?ts=ts,?op=op['op'],?id=id,?raw=op)  ???#可以指定處理插入監(jiān)控,更新監(jiān)控或者刪除監(jiān)控等  ??time.sleep(self.poll_time)  ??if?not?cursor.alive:  ???break  ?except?AutoReconnect:  ??time.sleep(self.poll_time)

循環(huán)這個start函數(shù),在all_with_noop這里就可以編寫相應(yīng)的監(jiān)控處理邏輯。

這樣就可以實(shí)現(xiàn)一個簡易的準(zhǔn)實(shí)時Mongo觸發(fā)器監(jiān)控器,下一步就可以配合其他操作來對新入庫的程序進(jìn)行相應(yīng)處理。

? 版權(quán)聲明
THE END
喜歡就支持一下吧
點(diǎn)贊13 分享