mongodb 的replication是通過一個日志來存儲寫操作的,這個日志就叫做oplog,而下面這篇文章主要給大家介紹了利用mongodb中oplog機(jī)制實(shí)現(xiàn)準(zhǔn)實(shí)時數(shù)據(jù)的操作監(jiān)控的相關(guān)資料,需要的朋友可以參考借鑒,下面來一起看看吧。
前言
最近有一個需求是要實(shí)時獲取到新插入到MongoDB的數(shù)據(jù),而插入程序本身已經(jīng)有一套處理邏輯,所以不方便直接在插入程序里寫相關(guān)程序,傳統(tǒng)的數(shù)據(jù)庫大多自帶這種觸發(fā)器機(jī)制,但是Mongo沒有相關(guān)的函數(shù)可以用(也可能我了解的太少了,求糾正),當(dāng)然還有一點(diǎn)是需要python實(shí)現(xiàn),于是收集整理了一個相應(yīng)的實(shí)現(xiàn)方法。
一、引子
首先可以想到,這種需求其實(shí)很像數(shù)據(jù)庫的主從備份機(jī)制,從數(shù)據(jù)庫之所以能夠同步主庫是因?yàn)榇嬖谀承┲笜?biāo)來做控制,我們知道MongoDB雖然沒有現(xiàn)成觸發(fā)器,但是它能夠?qū)崿F(xiàn)主從備份,所以我們就從它的主從備份機(jī)制入手。
二、OPLOG
首先,需要以master模式來打開mongod守護(hù),命令行使用–master,或者觸發(fā)器增加master鍵為true。
此時,我們可以在Mongo的系統(tǒng)庫local里見到新增的collection——oplog,此時oplog.$main里就會存儲進(jìn)oplog信息,如果此時還有充當(dāng)從數(shù)據(jù)庫的Mongo存在,就會還有一些slaves的信息,由于我們這里并不是主從同步,所以不存在這些集合。
再來看看oplog結(jié)構(gòu):
"ts"?:?Timestamp(6417682881216249,?1),?時間戳 "h"?:?NumberLong(0),?長度 "v"?:?2,? "op"?:?"n",?操作類型 "ns"?:?"",?操作的庫和集合 "o2"?:?"_id"?update條件 "o"?:?{}?操作值,即document
這里需要知道op的幾種屬性:
insert,'i' update,?'u' remove(delete),?'d' cmd,?'c' noop,?'n'?空操作
從上面的信息可以看出,我們只要不斷讀取到ts來做對比,然后根據(jù)op即可判斷當(dāng)前出現(xiàn)的是什么操作,相當(dāng)于使用程序?qū)崿F(xiàn)了一個從數(shù)據(jù)庫的接收端。
三、CODE
在Github上找到了別人的實(shí)現(xiàn)方式,不過它的函數(shù)庫太老舊,所以在他的基礎(chǔ)上進(jìn)行修改。
Github地址:github.com/RedBeard0531/mongo-oplog-watcher
mongo_oplog_watcher.py如下:
#!/usr/bin/python import?pymongo import?re import?time from?pprint?import?pprint?#?pretty?printer from?pymongo.errors?import?AutoReconnect class?OplogWatcher(object): ??def?init(self,?db=None,?collection=None,?poll_time=1.0,?connection=None,?start_now=True): ????if?collection?is?not?None: ??????if?db?is?None: ????????raise?ValueError('must?specify?db?if?you?specify?a?collection') ??????self._ns_filter?=?db?+?'.'?+?collection ????elif?db?is?not?None: ??????self._ns_filter?=?re.compile(r'^%s.'?%?db) ????else: ??????self._ns_filter?=?None ????self.poll_time?=?poll_time ????self.connection?=?connection?or?pymongo.Connection() ????if?start_now: ??????self.start() ??@staticmethod ??def?get_id(op): ????id?=?None ????o2?=?op.get('o2') ????if?o2?is?not?None: ??????id?=?o2.get('_id') ????if?id?is?None: ??????id?=?op['o'].get('_id') ????return?id ??def?start(self): ????oplog?=?self.connection.local['oplog.$main'] ????ts?=?oplog.find().sort('$natural',?-1)[0]['ts'] ????while?True: ??????if?self._ns_filter?is?None:? ????????filter?=?{} ??????else: ????????filter?=?{'ns':?self._ns_filter} ??????filter['ts']?=?{'$gt':?ts} ??????try: ????????cursor?=?oplog.find(filter,?tailable=True) ????????while?True: ??????????for?op?in?cursor: ????????????ts?=?op['ts'] ????????????id?=?self.get_id(op) ????????????self.all_with_noop(ns=op['ns'],?ts=ts,?op=op['op'],?id=id,?raw=op) ??????????time.sleep(self.poll_time) ??????????if?not?cursor.alive: ????????????break ??????except?AutoReconnect: ????????time.sleep(self.poll_time) ??def?all_with_noop(self,?ns,?ts,?op,?id,?raw): ????if?op?==?'n': ??????self.noop(ts=ts) ????else: ??????self.all(ns=ns,?ts=ts,?op=op,?id=id,?raw=raw) ??def?all(self,?ns,?ts,?op,?id,?raw): ????if?op?==?'i': ??????self.insert(ns=ns,?ts=ts,?id=id,?obj=raw['o'],?raw=raw) ????elif?op?==?'u': ??????self.update(ns=ns,?ts=ts,?id=id,?mod=raw['o'],?raw=raw) ????elif?op?==?'d': ??????self.delete(ns=ns,?ts=ts,?id=id,?raw=raw) ????elif?op?==?'c': ??????self.command(ns=ns,?ts=ts,?cmd=raw['o'],?raw=raw) ????elif?op?==?'db': ??????self.db_declare(ns=ns,?ts=ts,?raw=raw) ??def?noop(self,?ts): ????pass ??def?insert(self,?ns,?ts,?id,?obj,?raw,?**kw): ????pass ??def?update(self,?ns,?ts,?id,?mod,?raw,?**kw): ????pass ??def?delete(self,?ns,?ts,?id,?raw,?**kw): ????pass ??def?command(self,?ns,?ts,?cmd,?raw,?**kw): ????pass ??def?db_declare(self,?ns,?ts,?**kw): ????pass class?OplogPrinter(OplogWatcher): ??def?all(self,?**kw): ????pprint?(kw) ????print?#newline if?name?==?'main': ??OplogPrinter()
首先是實(shí)現(xiàn)一個數(shù)據(jù)庫的初始化,設(shè)定一個延遲時間(準(zhǔn)實(shí)時):
self.poll_time?=?poll_time self.connection?=?connection?or?pymongo.MongoClient()
主要的函數(shù)是start() ,實(shí)現(xiàn)一個時間的比對并進(jìn)行相應(yīng)字段的處理:
def?start(self): ?oplog?=?self.connection.local['oplog.$main'] ?#讀取之前提到的庫 ?ts?=?oplog.find().sort('$natural',?-1)[0]['ts'] ?#獲取一個時間邊際 ?while?True: ?if?self._ns_filter?is?None: ??filter?=?{} ?else: ??filter?=?{'ns':?self._ns_filter} ?filter['ts']?=?{'$gt':?ts} ?try: ??cursor?=?oplog.find(filter) ??#對此時間之后的進(jìn)行處理 ??while?True: ??for?op?in?cursor: ???ts?=?op['ts'] ???id?=?self.get_id(op) ???self.all_with_noop(ns=op['ns'],?ts=ts,?op=op['op'],?id=id,?raw=op) ???#可以指定處理插入監(jiān)控,更新監(jiān)控或者刪除監(jiān)控等 ??time.sleep(self.poll_time) ??if?not?cursor.alive: ???break ?except?AutoReconnect: ??time.sleep(self.poll_time)
循環(huán)這個start函數(shù),在all_with_noop這里就可以編寫相應(yīng)的監(jiān)控處理邏輯。
這樣就可以實(shí)現(xiàn)一個簡易的準(zhǔn)實(shí)時Mongo觸發(fā)器監(jiān)控器,下一步就可以配合其他操作來對新入庫的程序進(jìn)行相應(yīng)處理。