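The jdbc-input-plugin can only append: it writes rows to Elasticsearch incrementally. The database on the JDBC source side, however, is often subject to deletes and updates, and once that happens the data in MySQL and the data in the search engine no longer match. This article describes one way to resolve that mismatch.
Of course, if you have a development team, you can write code that updates the search engine whenever a row is deleted or updated. If that is not an option, try the approach below.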
Here is a table named article. Its mtime column is defined with ON UPDATE CURRENT_TIMESTAMP, so mtime changes every time a row is updated.

    mysql> desc article;
    +-------------+---------------+------+-----+-----------------------------+-------+
    | Field       | Type          | Null | Key | Default                     | Extra |
    +-------------+---------------+------+-----+-----------------------------+-------+
    | id          | int(11)       | NO   |     | 0                           |       |
    | title       | mediumtext    | NO   |     | NULL                        |       |
    | description | mediumtext    | YES  |     | NULL                        |       |
    | author      | varchar(100)  | YES  |     | NULL                        |       |
    | source      | varchar(100)  | YES  |     | NULL                        |       |
    | content     | longtext      | YES  |     | NULL                        |       |
    | status      | enum('Y','N') | NO   |     | 'N'                         |       |
    | ctime       | timestamp     | NO   |     | CURRENT_TIMESTAMP           |       |
    | mtime       | timestamp     | YES  |     | ON UPDATE CURRENT_TIMESTAMP |       |
    +-------------+---------------+------+-----+-----------------------------+-------+
    9 rows in set (0.00 sec)
Add a query rule on mtime to the Logstash configuration:

    jdbc {
      jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
      jdbc_user => "cms"
      jdbc_password => "password"
      schedule => "* * * * *"  # cron expression; here the query runs once a minute
      statement => "select * from article where mtime > :sql_last_value"
      use_column_value => true
      tracking_column => "mtime"
      tracking_column_type => "timestamp"
      record_last_run => true
      last_run_metadata_path => "/var/tmp/article-mtime.last"
    }
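To make the effect of the mtime rule concrete, here is a minimal in-memory sketch of a poll driven by a tracking column. It is only a model, not the plugin's code: the class IncrementalPoll, its Row type, and the choice to remember the greatest mtime seen are all assumptions made for illustration.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** In-memory model of polling a table by a tracking column (hypothetical, for illustration only). */
final class IncrementalPoll {

    /** One article row, reduced to the two columns the poll looks at. */
    static final class Row {
        final int id;
        final Instant mtime;

        Row(int id, Instant mtime) {
            this.id = id;
            this.mtime = mtime;
        }
    }

    /** Stands in for the value stored at last_run_metadata_path (assumed to start at the epoch). */
    private Instant lastValue = Instant.EPOCH;

    /** Mirrors "select * from article where mtime > :sql_last_value". */
    List<Row> poll(List<Row> table) {
        List<Row> changed = new ArrayList<>();
        Instant newest = lastValue;
        for (Row row : table) {
            // mtime is nullable in the article table, so skip rows without a value.
            if (row.mtime != null && row.mtime.isAfter(lastValue)) {
                changed.add(row);
                if (row.mtime.isAfter(newest)) {
                    newest = row.mtime;
                }
            }
        }
        lastValue = newest; // the next poll only sees rows newer than this
        return changed;
    }

    public static void main(String[] args) {
        IncrementalPoll poll = new IncrementalPoll();
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, Instant.parse("2017-01-01T00:00:00Z")));
        table.add(new Row(2, Instant.parse("2017-01-01T00:01:00Z")));
        System.out.println("first poll: " + poll.poll(table).size() + " rows");

        // An update bumps mtime, so row 1 is picked up again.
        table.set(0, new Row(1, Instant.parse("2017-01-01T00:02:00Z")));
        System.out.println("after update: " + poll.poll(table).size() + " rows");

        // A delete leaves nothing for the query to return.
        table.remove(1);
        System.out.println("after delete: " + poll.poll(table).size() + " rows");
    }
}
```

In this model an updated row is picked up again on the next poll, but a deleted row simply stops appearing in the result. The query alone therefore cannot tell the search engine that a row is gone.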
Create a trash table. It handles rows that are deleted from the database or disabled with status = 'N'.

    CREATE TABLE `elasticsearch_trash` (
      `id` int(11) NOT NULL,
      `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
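Create triggers on the article table:

    DELIMITER $$

    CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
    BEGIN
      -- When an article's status becomes N, the matching document has to be removed from the search engine.
      -- INSERT IGNORE is used because id is the primary key: an article that is already in the trash
      -- must not raise a duplicate-key error and abort the update.
      IF NEW.status = 'N' THEN
        INSERT IGNORE INTO elasticsearch_trash(id) VALUES (OLD.id);
      END IF;
      -- When the status changes back to Y, remove the article ID from elasticsearch_trash,
      -- otherwise the document would be deleted from the search engine by mistake.
      IF NEW.status = 'Y' THEN
        DELETE FROM elasticsearch_trash WHERE id = OLD.id;
      END IF;
    END$$

    CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
    BEGIN
      -- When an article is deleted, put it into the search engine trash at the same time.
      INSERT IGNORE INTO elasticsearch_trash(id) VALUES (OLD.id);
    END$$

    DELIMITER ;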
Next we need a simple shell script that runs once a minute, reads the rows in the elasticsearch_trash table, and uses curl to call the Elasticsearch RESTful API to delete the documents that were trashed.
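The article does not show that script. As a rough stand-in for the curl call, written in Java to match the rest of the code here, the sketch below sends the same kind of HTTP DELETE with the JDK's built-in HttpClient (Java 11 or later). The class name TrashCleaner, the endpoint http://localhost:9200, and the /{index}/{type}/{id} URL layout (which applies to Elasticsearch versions that still use mapping types) are assumptions; the index "information" and type "article" are the ones used in the scheduled task in this article. Reading the ids out of elasticsearch_trash is left out, so they are passed as arguments.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper that deletes trashed articles through the Elasticsearch REST API. */
final class TrashCleaner {
    private final HttpClient http = HttpClient.newHttpClient();
    private final String baseUrl;

    TrashCleaner(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    /** Builds the document URL as {base}/{index}/{type}/{id} (assumed layout). */
    static URI documentUri(String baseUrl, String index, String type, int id) {
        return URI.create(baseUrl + "/" + index + "/" + type + "/" + id);
    }

    /** 200 means the document was deleted, 404 that it was already gone; both let the trash row be dropped. */
    static boolean isDone(int statusCode) {
        return statusCode == 200 || statusCode == 404;
    }

    /** Sends the DELETE request and reports whether the trash row can be removed. */
    boolean delete(String index, String type, int id) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(documentUri(baseUrl, index, type, id))
                .DELETE()
                .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        return isDone(response.statusCode());
    }

    public static void main(String[] args) throws Exception {
        // Assumed endpoint; pass the trashed article ids as command-line arguments.
        TrashCleaner cleaner = new TrashCleaner("http://localhost:9200");
        for (String arg : args) {
            int id = Integer.parseInt(arg);
            System.out.println(id + " -> " + cleaner.delete("information", "article", id));
        }
    }
}
```

A row should only be removed from elasticsearch_trash when delete returns true, so that a failed request is retried on the next run.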
You can also write a program for this. Here is an example of a Spring Boot scheduled task.
Entity

    package cn.netkiller.api.domain.elasticsearch;

    import java.util.Date;

    import javax.persistence.Column;
    import javax.persistence.Entity;
    import javax.persistence.Id;
    import javax.persistence.Table;

    @Entity
    @Table
    public class ElasticsearchTrash {
        @Id
        private int id;

        @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
        private Date ctime;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public Date getCtime() {
            return ctime;
        }

        public void setCtime(Date ctime) {
            this.ctime = ctime;
        }
    }
Repository

    package cn.netkiller.api.repository.elasticsearch;

    import org.springframework.data.repository.CrudRepository;

    import cn.netkiller.api.domain.elasticsearch.ElasticsearchTrash;

    // The second type parameter is the type of the entity's @Id field (int, boxed as Integer).
    public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer> {
    }
Scheduled task

    package cn.netkiller.api.schedule;

    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.rest.RestStatus;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    import cn.netkiller.api.domain.elasticsearch.ElasticsearchTrash;
    import cn.netkiller.api.repository.elasticsearch.ElasticsearchTrashRepository;

    @Component
    public class ScheduledTasks {
        private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

        @Autowired
        private TransportClient client;

        @Autowired
        private ElasticsearchTrashRepository elasticsearchTrashRepository;

        public ScheduledTasks() {
        }

        @Scheduled(fixedRate = 1000 * 60) // run the scheduled task every 60 seconds
        public void cleanTrash() {
            for (ElasticsearchTrash elasticsearchTrash : elasticsearchTrashRepository.findAll()) {
                // Index "information", type "article"; the document id is the article id.
                DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
                RestStatus status = response.status();
                logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
                // Once the document is deleted (or was never indexed), drop the trash record.
                if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
                    elasticsearchTrashRepository.delete(elasticsearchTrash);
                }
            }
        }
    }
The Spring Boot main application:

    package cn.netkiller.api;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;

    @SpringBootApplication
    @EnableScheduling
    public class Application {
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    }