下面小編就為大家帶來一篇基于mysql的sequence實現方法。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
團隊更換新框架。新的業務全部使用新的框架,甚至是新的數據庫–mysql。
這邊之前一直是使用oracle,各種訂單號、流水號、批次號啥的,都是直接使用oracle的sequence提供的數字序列號。現在數據庫更換成Mysql了,顯然以前的老方法不能適用了。
需要新寫一個:
?分布式場景使用
?滿足一定的并發要求
找了一些相關的資料,發現mysql這方面的實現,原理都是一條數據庫記錄,不斷update它的值。然后大部分的實現方案,都用到了函數。
貼一下網上的代碼:
基于mysql函數實現
表結構
CREATE?TABLE?`t_sequence`?( `sequence_name`?varchar(64)?CHARACTER?SET?utf8?COLLATE?utf8_general_ci?NOT?NULL?COMMENT?'序列名稱'?, `value`?int(11)?NULL?DEFAULT?NULL?COMMENT?'當前值'?, PRIMARY?KEY?(`sequence_name`) ) ENGINE=InnoDB DEFAULT?CHARACTER?SET=utf8?COLLATE=utf8_general_ci ROW_FORMAT=COMPACT ;
獲取下一個值
CREATE?DEFINER?=?`root`@`localhost`?FUNCTION?`nextval`(sequence_name?varchar(64)) ?RETURNS?int(11) BEGIN ?declare?current?integer; ?set?current?=?0; ? ?update?t_sequence?t?set?t.value?=?t.value?+?1?where?t.sequence_name?=?sequence_name; ?select?t.value?into?current?from?t_sequence?t?where?t.sequence_name?=?sequence_name; ?return?current; end;
并發場景有可能會出問題,雖然可以在業務層加鎖,但分布式場景就無法保證了,然后效率應該也不會高。
自己實現一個,java版
原理:
?讀取一條記錄,緩存一個數據段,如:0-100,將記錄的當前值從0修改為100
?數據庫樂觀鎖更新,允許重試
?讀取數據從緩存中讀取,用完再讀取數據庫
不廢話,上代碼:
基于java實現
表結構
每次update,都是將SEQ_VALUE設置為SEQ_VALUE+STEP
CREATE?TABLE?`t_pub_sequence`?( ?`SEQ_NAME`?varchar(128)?CHARACTER?SET?utf8?NOT?NULL?COMMENT?'序列名稱', ?`SEQ_VALUE`?bigint(20)?NOT?NULL?COMMENT?'目前序列值', ?`MIN_VALUE`?bigint(20)?NOT?NULL?COMMENT?'最小值', ?`MAX_VALUE`?bigint(20)?NOT?NULL?COMMENT?'最大值', ?`STEP`?bigint(20)?NOT?NULL?COMMENT?'每次取值的數量', ?`TM_CREATE`?datetime?NOT?NULL?COMMENT?'創建時間', ?`TM_SMP`?datetime?NOT?NULL?DEFAULT?CURRENT_TIMESTAMP?ON?UPDATE?CURRENT_TIMESTAMP?COMMENT?'修改時間', ?PRIMARY?KEY?(`SEQ_NAME`) )?ENGINE=InnoDB?DEFAULT?CHARSET=utf8mb4?COMMENT='流水號生成表';
sequence接口
/** ?*?<p></p> ?*?@author?coderzl ?*?@Title?MysqlSequence ?*?@Description?基于mysql數據庫實現的序列 ?*?@date?2017/6/6?23:03 ?*/ public?interface?MysqlSequence?{ ?/** ??*?<p> ??*?獲取指定sequence的序列號 ??*?</p> ??*?@param?seqName?sequence名 ??*?@return?String?序列號 ??*/ ?public?String?nextVal(String?seqName); }
序列區間
用于本地緩存一段序列,從min到max區間
/** ?*?<p></p> ?* ?*?@author?coderzl ?*?@Title?SequenceRange ?*?@Description?序列區間,用于緩存序列 ?*?@date?2017/6/6?22:58 ?*/ ?@Data public?class?SequenceRange?{ ?private?final?long??min; ?private?final?long??max; ?/**?*/ ?private?final?AtomicLong?value; ?/**?是否超限?*/ ?private?volatile?boolean?over?=?false; ?/** ??*?構造. ??* ??*?@param?min? ??*?@param?max? ??*/ ?public?SequenceRange(long?min,?long?max)?{ ??this.min?=?min; ??this.max?=?max; ??this.value?=?new?AtomicLong(min); ?} ?/** ??*?<p>Gets?and?increment</p> ??* ??*?@return? ??*/ ?public?long?getAndIncrement()?{ ??long?currentValue?=?value.getAndIncrement(); ??if?(currentValue?>?max)?{ ???over?=?true; ???return?-1; ??} ??return?currentValue; ?} }
BO
對應數據庫記錄
@Data public?class?MysqlSequenceBo?{ ?/** ??*?seq名 ??*/ ?private?String?seqName; ?/** ??*?當前值 ??*/ ?private?Long?seqValue; ?/** ??*?最小值 ??*/ ?private?Long?minValue; ?/** ??*?最大值 ??*/ ?private?Long?maxValue; ?/** ??*?每次取值的數量 ??*/ ?private?Long?step; ?/**?*/ ?private?Date?tmCreate; ?/**?*/ ?private?Date?tmSmp; ?public?boolean?validate(){ ??//一些簡單的校驗。如當前值必須在最大最小值之間。step值不能大于max與min的差 ??if?(StringUtil.isBlank(seqName)?||?minValue?=?maxValue?||?maxValue?-?minValue??maxValue?)?{ ???return?false; ??} ??return?true;? ?} }
DAO
增刪改查,其實就用到了改和查
public?interface?MysqlSequenceDAO?{ ?/** ?*? ?*/ ?public?int?createSequence(MysqlSequenceBo?bo); ?public?int?updSequence(@Param("seqName")?String?seqName,?@Param("oldValue")?long?oldValue?,@Param("newValue")?long?newValue); ?public?int?delSequence(@Param("seqName")?String?seqName); ?public?MysqlSequenceBo?getSequence(@Param("seqName")?String?seqName); ?public?List<mysqlsequencebo>?getAll(); }</mysqlsequencebo>
Mapper
<?xml version="1.0" encoding="UTF-8" ?>nbsp;mapper?PUBLIC?"-//mybatis.org//DTD?Mapper?3.0//EN"?"http://mybatis.org/dtd/mybatis-3-mapper.dtd"?> <mapper> ?<resultmap> ??<result></result> ??<result></result> ??<result></result> ??<result></result> ??<result></result> ??<result></result> ??<result></result> ?</resultmap> ?<delete> ??delete?from?t_pub_sequence ??where?SEQ_NAME?=?#{seqName,jdbcType=VARCHAR} ?</delete> ?<insert> ??insert?into?t_pub_sequence?(SEQ_NAME,SEQ_VALUE,MIN_VALUE,MAX_VALUE,STEP,TM_CREATE) ??values?(#{seqName,jdbcType=VARCHAR},?#{seqValue,jdbcType=BIGINT}, ??#{minValue,jdbcType=BIGINT},?#{maxValue,jdbcType=BIGINT},?#{step,jdbcType=BIGINT}, ??now()) ?</insert> ?<update> ??update?t_pub_sequence ??set?SEQ_VALUE?=?#{newValue,jdbcType=BIGINT} ??where?SEQ_NAME?=?#{seqName,jdbcType=VARCHAR}?and?SEQ_VALUE?=?#{oldValue,jdbcType=BIGINT} ?</update> ?<select> ??select?SEQ_NAME,?SEQ_VALUE,?MIN_VALUE,?MAX_VALUE,?STEP ??from?t_pub_sequence ?</select> ?<select> ??select?SEQ_NAME,?SEQ_VALUE,?MIN_VALUE,?MAX_VALUE,?STEP ??from?t_pub_sequence ??where?SEQ_NAME?=?#{seqName,jdbcType=VARCHAR} ?</select></mapper>
接口實現
@Repository("mysqlSequence") public?class?MysqlSequenceImpl?implements?MysqlSequence{ ?@Autowired ?private?MysqlSequenceFactory?mysqlSequenceFactory; ?/** ??*?<p> ??*?獲取指定sequence的序列號 ??*?</p> ??* ??*?@param?seqName?sequence名 ??*?@return?String?序列號 ??*?@author?coderzl ??*/ ?@Override ?public?String?nextVal(String?seqName)?{ ??return?Objects.toString(mysqlSequenceFactory.getNextVal(seqName)); ?} }
工廠
工廠只做了兩件事
?服務啟動的時候,初始化數據庫中所有sequence【完成序列區間緩存】
?獲取sequence的下一個值
@Component public?class?MysqlSequenceFactory?{ ?private?final?Lock?lock?=?new?ReentrantLock(); ?/**?*/ ?private?Map<string>?holderMap?=?new?ConcurrentHashMap(); ?@Autowired ?private?MysqlSequenceDAO?msqlSequenceDAO; ?/**?單個sequence初始化樂觀鎖更新失敗重試次數?*/ ?@Value("${seq.init.retry:5}") ?private?int?initRetryNum; ?/**?單個sequence更新序列區間樂觀鎖更新失敗重試次數?*/ ?@Value("${seq.get.retry:20}") ?private?int?getRetryNum; ?@PostConstruct ?private?void?init(){ ??//初始化所有sequence ??initAll(); ?} ?/** ??*?<p>?加載表中所有sequence,完成初始化?</p> ??*?@return?void ??*?@author?coderzl ??*/ ?private?void?initAll(){ ??try?{ ???lock.lock(); ???List<mysqlsequencebo>?boList?=?msqlSequenceDAO.getAll(); ???if?(boList?==?null)?{ ????throw?new?IllegalArgumentException("The?sequenceRecord?is?null!"); ???} ???for?(MysqlSequenceBo?bo?:?boList)?{ ????MysqlSequenceHolder?holder?=?new?MysqlSequenceHolder(msqlSequenceDAO,?bo,initRetryNum,getRetryNum); ????holder.init(); ????holderMap.put(bo.getSeqName(),?holder); ???} ??}finally?{ ???lock.unlock(); ??} ?} ?/** ??*?<p>?</p> ??*?@param?seqName ??*?@return?long ??*?@author?coderzl ??*/ ?public?long?getNextVal(String?seqName){ ??MysqlSequenceHolder?holder?=?holderMap.get(seqName); ??if?(holder?==?null)?{ ???try?{ ????lock.lock(); ????holder?=?holderMap.get(seqName); ????if?(holder?!=?null){ ?????return?holder.getNextVal(); ????} ????MysqlSequenceBo?bo?=?msqlSequenceDAO.getSequence(seqName); ????holder?=?new?MysqlSequenceHolder(msqlSequenceDAO,?bo,initRetryNum,getRetryNum); ????holder.init(); ????holderMap.put(seqName,?holder); ???}finally?{ ????lock.unlock(); ???} ??} ??return?holder.getNextVal(); ?} }</mysqlsequencebo></string>
單一sequence的Holder
?init() 初始化 其中包括參數校驗,數據庫記錄更新,創建序列區間
?getNextVal() 獲取下一個值
public?class?MysqlSequenceHolder?{ ?private?final?Lock?lock????=?new?ReentrantLock(); ?/**?seqName?*/ ?private?String?seqName; ?/**?sequenceDao?*/ ?private?MysqlSequenceDAO?sequenceDAO; ?private?MysqlSequenceBo?sequenceBo; ?/**?*/ ?private?SequenceRange?sequenceRange; ?/**?是否初始化?*/ ?private?volatile?boolean??isInitialize??=?false; ?/**?sequence初始化重試次數?*/ ?private?int?initRetryNum; ?/**?sequence獲取重試次數?*/ ?private?int?getRetryNum; ?/** ??*?<p>?構造方法?</p> ??*?@Title?MysqlSequenceHolder ??*?@param?sequenceDAO? ??*?@param?sequenceBo ??*?@param?initRetryNum?初始化時,數據庫更新失敗后重試次數 ??*?@param?getRetryNum?獲取nextVal時,數據庫更新失敗后重試次數 ??*?@return ??*?@author?coderzl ??*/ ?public?MysqlSequenceHolder(MysqlSequenceDAO?sequenceDAO,?MysqlSequenceBo?sequenceBo,int?initRetryNum,int?getRetryNum)?{ ??this.sequenceDAO?=?sequenceDAO; ??this.sequenceBo?=?sequenceBo; ??this.initRetryNum?=?initRetryNum; ??this.getRetryNum?=?getRetryNum; ??if(sequenceBo?!=?null) ???this.seqName?=?sequenceBo.getSeqName(); ?} ?/** ??*?<p>?初始化?</p> ??*?@Title?init ??*?@param ??*?@return?void ??*?@author?coderzl ??*/ ?public?void?init(){ ??if?(isInitialize?==?true)?{ ???throw?new?SequenceException("["?+?seqName?+?"]?the?MysqlSequenceHolder?has?inited"); ??} ??if?(sequenceDAO?==?null)?{ ???throw?new?SequenceException("["?+?seqName?+?"]?the?sequenceDao?is?null"); ??} ??if?(seqName?==?null?||?seqName.trim().length()?==?0)?{ ???throw?new?SequenceException("["?+?seqName?+?"]?the?sequenceName?is?null"); ??} ??if?(sequenceBo?==?null)?{ ???throw?new?SequenceException("["?+?seqName?+?"]?the?sequenceBo?is?null"); ??} ??if?(!sequenceBo.validate()){ ???throw?new?SequenceException("["?+?seqName?+?"]?the?sequenceBo?validate?fail.?BO:"+sequenceBo); ??} ??//?初始化該sequence ??try?{ ???initSequenceRecord(sequenceBo); ??}?catch?(SequenceException?e)?{ ???throw?e; ??} ??isInitialize?=?true; ?} ?/** ??*?<p>?獲取下一個序列號?</p> ??*?@Title?getNextVal ??*?@param ??*?@return?long ??*?@author?coderzl ??*/ ?public?long?getNextVal(){ ??if(isInitialize?==?false){ ???throw?new?SequenceException("["?+?seqName?+?"]?the?MysqlSequenceHolder?not?inited"); ??} ??if(sequenceRange?==?null){ ???throw?new?SequenceException("["?+?seqName?+?"]?the?sequenceRange?is?null"); ??} ??long?curValue?=?sequenceRange.getAndIncrement(); ??if(curValue?==?-1){ ???try{ ????lock.lock(); ????curValue?=?sequenceRange.getAndIncrement(); ????if(curValue?!=?-1){ ?????return?curValue; ????} ????sequenceRange?=?retryRange(); ????curValue?=?sequenceRange.getAndIncrement(); ???}finally?{ ????lock.unlock(); ???} ??} ??return?curValue; ?} ?/** ??*?<p>?初始化當前這條記錄?</p> ??*?@Title?initSequenceRecord ??*?@Description ??*?@param?sequenceBo ??*?@return?void ??*?@author?coderzl ??*/ ?private?void?initSequenceRecord(MysqlSequenceBo?sequenceBo){ ??//在限定次數內,樂觀鎖更新數據庫記錄 ??for(int?i?=?1;?i??0){ ????sequenceRange?=?new?SequenceRange(curBo.getSeqValue(),newValue?-?1); ????curBo.setSeqValue(newValue); ????this.sequenceBo?=?curBo; ????return; ???}else{ ????continue; ???} ??} ??//限定次數內,更新失敗,拋出異常 ??throw?new?SequenceException("["?+?seqName?+?"]?sequenceBo?update?error"); ?} ?/** ??*?<p>?檢查新值是否合法?新的當前值是否在最大最小值之間</p> ??*?@param?curValue ??*?@param?curBo ??*?@return?boolean ??*?@author?coderzl ??*/ ?private?boolean?checkCurrentValue(long?curValue,MysqlSequenceBo?curBo){ ??if(curValue?>?curBo.getMinValue()?&&?curValue??重置sequence當前值?:當前sequence達到最大值時,重新從最小值開始? ??*?@Title?resetCurrentValue ??*?@param?curBo ??*?@return?long ??*?@author?coderzl ??*/ ?private?long?resetCurrentValue(MysqlSequenceBo?curBo){ ??return?curBo.getMinValue(); ?} ?/** ??*?<p>?緩存區間使用完畢時,重新讀取數據庫記錄,緩存新序列段?</p> ??*?@Title?retryRange ??*?@param?SequenceRange ??*?@author?coderzl ??*/ ?private?SequenceRange?retryRange(){ ??for(int?i?=?1;?i??0){ ????sequenceRange?=?new?SequenceRange(curBo.getSeqValue(),newValue?-?1); ????curBo.setSeqValue(newValue); ????this.sequenceBo?=?curBo; ????return?sequenceRange; ???}else{ ????continue; ???} ??} ??throw?new?SequenceException("["?+?seqName?+?"]?sequenceBo?update?error"); ?} }
總結
?當服務重啟或異常的時候,會丟失當前服務所緩存且未用完的序列
?分布式場景,多個服務同時初始化,或者重新獲取sequence時,樂觀鎖會保證彼此不沖突。A服務獲取0-99,B服務會獲取100-199,以此類推
?當該sequence獲取較為頻繁時,增大step值,能提升性能。但同時服務異常時,損失的序列也較多
?修改數據庫里sequence的一些屬性值,比如step,max等,再下一次從數據庫獲取時,會啟用新的參數
?sequence只是提供了有限個序列號(最多max-min個),達到max后,會循環從頭開始。
?由于sequence會循環,所以達到max后,再獲取,就不會唯一。建議使用sequence來做業務流水號時,拼接時間。如:20170612235101+序列號
業務id拼接方法
@Service public?class?JrnGeneratorService?{ ?private?static?final?String?SEQ_NAME?=?"T_SEQ_TEST"; ?/**?sequence服務?*/ ?@Autowired ?private?MySqlSequence?mySqlSequence; ? ?public?String?generateJrn()?{ ??try?{ ???String?sequence?=?mySqlSequence.getNextValue(SEQ_NAME); ???sequence?=?leftPadding(sequence,8); ???Calendar?calendar?=?Calendar.getInstance(); ???SimpleDateFormat?sDateFormat?=?new?SimpleDateFormat("yyyyMMddHHmmss"); ???String?nowdate?=?sDateFormat.format(calendar.getTime()); ???nowdate.substring(4,?nowdate.length()); ???String?jrn?=?nowdate?+?sequence?+?RandomUtil.getFixedLengthRandom(6);//10位時間+8位序列?+?6位隨機數=24位流水號 ???return?jrn; ??}?catch?(Exception?e)?{ ???//TODO ??} ?} ? ?private?String?leftPadding(String?seq,int?len){ ??String?res?=""; ??String?str?=""; ??if(seq.length()<len></len>