您好,登錄後才能下訂單哦!
本篇內容主要講解“如何實現基於Jedis+ZK的分布式序列號生成器”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“如何實現基於Jedis+ZK的分布式序列號生成器”吧!
部分源碼參考Jedis實現分布式鎖博客:
package com.xxx.arch.seq.utlis;

import com.xxx.arch.seq.client.redis.RedisSEQ;
import lombok.extern.slf4j.Slf4j;

/**
 * Client-side facade for obtaining unique sequence codes from {@link RedisSEQ}.
 *
 * <p>All methods are static and simply delegate to {@link RedisSEQ}; the batch variants
 * expand the single "last value" returned by Redis into an array of consecutive values.
 *
 * @author jdkleo
 */
@Slf4j
public class SEQUtil {

    /**
     * Returns one sequence value for the default sequence name.
     *
     * @return the value produced by {@link RedisSEQ#getSEQ()}
     */
    public static long getSEQ() {
        return RedisSEQ.getSEQ();
    }

    /**
     * Reserves {@code total} consecutive values for the default sequence name.
     *
     * @param total how many consecutive values to reserve; must be between 1 and
     *              {@link Integer#MAX_VALUE}
     * @return the reserved values in ascending order
     * @throws IllegalArgumentException if {@code total} is out of range
     */
    public static long[] getSEQ(long total) {
        // Validate BEFORE touching Redis so an invalid request never consumes sequence numbers.
        int count = checkTotal(total);
        long value = RedisSEQ.getSEQ(total);
        return getValueArray(value, count);
    }

    /**
     * Returns one sequence value for the given sequence name.
     *
     * @param seqName name of the sequence
     * @return the value produced by {@link RedisSEQ#getSEQ(String, long)} with a count of 1
     */
    public static long getSEQ(String seqName) {
        return RedisSEQ.getSEQ(seqName, 1);
    }

    /**
     * Reserves {@code total} consecutive values for the given sequence name.
     *
     * @param seqName name of the sequence
     * @param total   how many consecutive values to reserve; must be between 1 and
     *                {@link Integer#MAX_VALUE}
     * @return the reserved values in ascending order
     * @throws IllegalArgumentException if {@code total} is out of range
     */
    public static long[] getSEQ(String seqName, long total) {
        int count = checkTotal(total);
        long value = RedisSEQ.getSEQ(seqName, total);
        return getValueArray(value, count);
    }

    /**
     * Checks that a batch size is usable as an array length.
     *
     * <p>Previously a value of 0 failed with {@code ArrayIndexOutOfBoundsException} only after
     * Redis had already been incremented, and values above {@code Integer.MAX_VALUE} were
     * silently truncated by an {@code int} cast.
     */
    private static int checkTotal(long total) {
        if (total < 1 || total > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "total must be between 1 and " + Integer.MAX_VALUE + ", but was " + total);
        }
        return (int) total;
    }

    /**
     * Expands the last value of a reserved range into the full ascending range
     * {@code [value - total + 1, value]}.
     */
    private static long[] getValueArray(long value, int total) {
        long[] ret = new long[total];
        long first = value - total + 1;
        for (int i = 0; i < total; i++) {
            ret[i] = first + i;
        }
        return ret;
    }
}
package com.xxx.arch.seq.client.redis; import com.xxx.arch.seq.client.tool.StreamCloseAble; import lombok.extern.slf4j.Slf4j; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; /** * Redis版本SEQ(有序SEQ) * * @author zhangyang * @createDate 2019-01-22 * @since 2.x */ @Slf4j public class RedisSEQ extends StreamCloseAble { //默認的REDIS SEQ初始化狀態器KEY private static final String _DEFAULT_SEQ_INIT_KEY = "ARCH_SEQ_REDIS_SEQ_INIT"; //默認的REDIS SEQ初始化狀態器VAL private static final String _DEFAULT_SEQ_INIT_PENDING = "pending"; private static final String _DEFAULT_SEQ_INIT_READY = "ready"; //SEQ初始化容器狀態 private static volatile boolean _DEFAULT_SEQ_INIT_STATUS; //默認REDIS SEQ序列號的名稱 private static final String _DEFAULT_SEQ_NAME = "ARCH_SEQ_REDIS_SEQ"; //本地模式自增ID槽 private final static AtomicInteger _LOCAL_INCR = new AtomicInteger(0); static { JedisConfig.JedisConn jedisConn = null; try { jedisConn = JedisConfig.getInstance().getConn(); //if REDIS宕機或第一次:創建初始化狀態成功后,初始化redis keys(該方法可以恢復上次redis宕機數據) if (jedisConn.setnx(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_PENDING) == 1) {//搶到REDIS初始化鎖,并將其標記為pending狀態 try { RedisSEQTimer.getInstance().removeNotUsedKeys(); RedisSEQTimer.getInstance().initRedisKeys();//初始化REDIS,從ZK上讀取初始數據 jedisConn.set(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_READY);//初始化完成,標記為ready狀態 } catch (Exception e) { log.error(e.getMessage(), e); //初始化arch.seq REDIS數據異常,有可能是ZK相關問題,也有可能是REDIS問題,請排查 log.error("Initialization of arch.seq REDIS data exceptions, may be ZK-related problems, may also be REDIS problems, please check redis key:{}", _DEFAULT_SEQ_INIT_KEY); jedisConn.del(_DEFAULT_SEQ_INIT_KEY); } } //else{...} 沒搶到REDIS初始化鎖的話:不作任何處理 } catch (Exception e) { log.error(e.getMessage(), e); log.error("Initialization of arch.seq REDIS data exceptions, may be arch.seq's configuration is not ready"); } finally { close(jedisConn); } } public static Long getSEQ() { return getSEQ(_DEFAULT_SEQ_NAME, 1); } public static Long getSEQ(long total) { return 
getSEQ(_DEFAULT_SEQ_NAME, total); } public static Long getSEQ(String seqName, long total) { Long result = null; JedisConfig.JedisConn jedisConn = null; try { //獲取redis連接 jedisConn = JedisConfig.getInstance().getConn(); //獲得REDIS初始化狀態不成功 if (!tryInitReady(jedisConn)) { //arch.seq By REDIS版本不能正常初始化,請檢查REDIS服務。 throw new RuntimeException("arch.seq By REDIS version cannot be initialized properly. Please check the REDIS service."); } //開啟分布式鎖 //if (jedisConn.tryLock(seqName, 1000, 2000)) { try { String day = RedisSEQTimer.getInstance().getDayFormat(); String incrVal = String.format("%010d", getIncrVal(jedisConn, day, seqName, total)); result = Long.parseLong(day + incrVal); } catch (Exception e) { e.printStackTrace(); log.warn("try lock failed,the arch.seq tool will be retry after sleep some times."); Thread.sleep(randTime()); result = getSEQ(seqName, total); } } catch (Throwable e) { log.error(e.getMessage(), e); //redis生成失敗,返回本地ID:15位納秒+1位自然數輪詢 //在獲取【自增序列號:{},序列號分布式鎖:{}】時發生了異常,系統返回了本地生成的自增序列號,不影響系統使用,但請管理員盡快協查! log.error("An exception occurred while acquiring self-incremental sequence number '{}', " + "sequence number distributed lock '{}',The system returns the locally generated self-incremental " + "sequence number, which does not affect the use of the system, but the administrator should check " + "it as soon as possible.", seqName, seqName + "_LOCK"); result = xUUID(); } finally { //切記,一定要釋放分布式鎖(注:釋放鎖的同時jedisConn會自動釋放connection,無需再次CLOSE) if (jedisConn != null) { //jedisConn.unLock(seqName); jedisConn.close(); } if (log.isDebugEnabled()) { log.debug(seqName + ":" + result + ", trace:\n" + getStackTrace()); } } return result; //arch.seq發生了不可預測的異常,請聯系架構部處理! 
//throw new RuntimeException("arch.seq發生了不可預測的異常,請聯系架構部處理!"); } private static String getStackTrace() { StringBuilder result = new StringBuilder(); StackTraceElement[] element = Thread.currentThread().getStackTrace(); for (int i = 0; i < element.length; i++) { result.append("\t").append(element[i]).append("\n"); } return result.toString(); } private static long randTime() { return new Random().nextInt(50) + 50; } private static boolean tryInitReady(JedisConfig.JedisConn jedisConn) throws InterruptedException { int times = 0; for (; times < 3; times++) { if (getSEQInitReady(jedisConn)) { break; } Thread.sleep(100); } return times < 3; } /** * 獲得SEQ初始化狀態 * * @param jedisConn * @return */ private static boolean getSEQInitReady(JedisConfig.JedisConn jedisConn) { if (!_DEFAULT_SEQ_INIT_STATUS) { synchronized (RedisSEQ.class) { if (!_DEFAULT_SEQ_INIT_STATUS) { _DEFAULT_SEQ_INIT_STATUS = _DEFAULT_SEQ_INIT_READY.equals(jedisConn.get(_DEFAULT_SEQ_INIT_KEY)); } } } return _DEFAULT_SEQ_INIT_STATUS; } /** * 獲得REDIS自增序列號最新值,并同步更新到ZK備份數據節點守護線程中 * * @param jedisConn * @param day * @param seqName * @param total * @return */ private static Long getIncrVal(JedisConfig.JedisConn jedisConn, String day, String seqName, long total) { String key = seqName + "_" + day; Long incrVal = total > 1 ? jedisConn.incr(key, total) : jedisConn.incr(key); if (incrVal > 9999999999L) { throw new RuntimeException("Exceed the maximum value,sequence:" + incrVal); } //塞到要更新的ZK隊列中 RedisSEQTimer.getInstance().push(key, incrVal); return incrVal; } /** * 單機模式生成UUID * * @return */ private static Long xUUID() { int rand = _LOCAL_INCR.incrementAndGet() % 10; String result = System.nanoTime() + "" + rand; return Long.parseLong(result); } }
package com.xxx.arch.seq.client.redis; import com.xxx.arch.seq.client.tool.StreamCloseAble; import com.xxx.arch.seq.client.tool.ZkClient; import com.xxx.arch.seq.client.zk.ZkClientUtil; import org.apache.commons.lang3.time.DateUtils; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.ConcurrentHashMap; public class RedisSEQTimer extends StreamCloseAble { public static final String DAY_FORMAT_PATTERN = "yyMMdd"; public static volatile RedisSEQTimer redisSEQTimer; private final ConcurrentHashMap<String, Long> REDIS_INCR_MAP = new ConcurrentHashMap<>(); private final ZkClient _ZK_CLIENT = ZkClientUtil.getZkClient(); private final String _DEFAULT_ZK_NAMESPACE = "/ARCH_SEQ_REDIS"; //zk節點最大值每次遞增數 private long _REDIS_MAXVALUE_INIT = 10_000L; private Timer _TIMER = new Timer(true); //是否處于清理狀態 private volatile boolean _CLEAN_STATUS; //清理key private static final String _REMOVE_KEY = "ARCH_SEQ_REMOVE_KEY"; private RedisSEQTimer() { super(); //啟動zk巡查服務 _TIMER.schedule(new TimerTask() { @Override public void run() { checkAndConfigure(); } }, new Date(), 1 * 60 * 1000); //每天定時清理垃圾數據 _TIMER.schedule(new TimerTask() { @Override public void run() { removeNotUsedKeys(); } }, getFirstTime(), 24 * 60 * 60 * 1000); } public static RedisSEQTimer getInstance() { if (redisSEQTimer == null) { synchronized (RedisSEQTimer.class) { if (redisSEQTimer == null) { redisSEQTimer = new RedisSEQTimer(); } } } return redisSEQTimer; } /** * 定期更新ZK節點 */ private synchronized void checkAndConfigure() { if (_CLEAN_STATUS) { return; } if (REDIS_INCR_MAP.isEmpty()) { return; } String endDay = "_" + getDayFormat(); List<String> notTodayKeys = new ArrayList<>(); Set<Map.Entry<String, Long>> entrySet = REDIS_INCR_MAP.entrySet(); for (Map.Entry<String, Long> entry : entrySet) { //不是今天的key不作處理 if (!entry.getKey().endsWith(endDay)) { notTodayKeys.add(entry.getKey()); return; } //將最新的值寫到zk節點上 節點格式:<ARCH_SEQ前綴>/KEY_yyMMdd String zkNode = _DEFAULT_ZK_NAMESPACE + "/" + entry.getKey(); 
if (_ZK_CLIENT.exists(zkNode)) { _ZK_CLIENT.writeData(zkNode, entry.getValue()); } else { try { _ZK_CLIENT.createPersistent(zkNode, entry.getValue()); } catch (RuntimeException e) { //not to write log ,it's will be retry in next time. } } } ; if (!notTodayKeys.isEmpty()) { for (String key : notTodayKeys) { REDIS_INCR_MAP.remove(key); } } } /** * 刪除不再使用的KEY(包含redis和zk節點) */ public synchronized void removeNotUsedKeys() { if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) { return; } _CLEAN_STATUS = true; JedisConfig.JedisConn jedisConn = null; String requestId = UUID.randomUUID().toString(); boolean tryLock = false; try { List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE); //保留兩天。考慮到多個機器的時間可能不一致,如果在剛過零點刪除了昨天的sequence,另一臺機器可能還需要使用它,則會出現id重復 Date now = new Date(); Date yesterday = DateUtils.addDays(now, -1); List<String> keepDays = Arrays.asList(getDayFormat(now), getDayFormat(yesterday)); if (list != null && !list.isEmpty()) { jedisConn = JedisConfig.getInstance().getConn(); if (tryLock = jedisConn.tryLock(_REMOVE_KEY, requestId, 2000)) { JedisConfig.JedisConn finalJedisConn = jedisConn; for (String node : list) { String dayPart = node.substring(node.length() - DAY_FORMAT_PATTERN.length()); if (!keepDays.contains(dayPart)) { REDIS_INCR_MAP.remove(node); finalJedisConn.del(node); removeZkNode(node); } } } } } finally { _CLEAN_STATUS = false; if (jedisConn != null) { if (tryLock) { jedisConn.unLock(_REMOVE_KEY, requestId); } jedisConn.close(); } } } /** * 移除ZK節點 * * @param node */ private void removeZkNode(String node) { String path = _DEFAULT_ZK_NAMESPACE + "/" + node; if (_ZK_CLIENT.exists(path)) { try { _ZK_CLIENT.delete(path); } catch (Exception e) { } } } /** * 獲得每天定時任務的執行時間 * * @return */ private Date getFirstTime() { Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.HOUR_OF_DAY, 24); // 24點 可以更改時間 calendar.set(Calendar.MINUTE, getRandNum(6, 0)); // 0-5分鐘 隨機 calendar.set(Calendar.SECOND, getRandNum(60, 0));// 0-59秒 隨機 return 
calendar.getTime(); } /** * 獲得區間隨機整數 * * @param exclude - 最大數,exclude * @param from - 最小數,include * @return */ private int getRandNum(int exclude, int from) { return new Random().nextInt(exclude) + from; } /** * 將某天的KEY塞到相應隊列 * * @param key - 業務KEY key_yyMMdd * @param val - 值 * @return 是否成功 */ public synchronized void push(String key, Long val) { REDIS_INCR_MAP.put(key, val); } public String getDayFormat() { return getDayFormat(new Date()); } public String getDayFormat(Date date) { return new SimpleDateFormat(DAY_FORMAT_PATTERN).format(date); } /** * 初始化redis keys */ public void initRedisKeys() { if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) { return; } List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE); if (list != null && !list.isEmpty()) { Long zkVal; JedisConfig.JedisConn jedisConn = null; for (int i = 0; i < list.size(); i++) { zkVal = _ZK_CLIENT.readData(_DEFAULT_ZK_NAMESPACE + "/" + list.get(i)); if (zkVal != null) { String requestId = UUID.randomUUID().toString(); boolean tryLock = false; try { jedisConn = JedisConfig.getInstance().getConn(); //獲得鎖才更新,沒獲得鎖就放棄更新 if (tryLock = jedisConn.tryLock(list.get(i), requestId, 2000)) { jedisConn.set(list.get(i), String.valueOf(zkVal + _REDIS_MAXVALUE_INIT)); } } finally { if (jedisConn != null) { if (tryLock) { jedisConn.unLock(list.get(i), requestId); } jedisConn.close(); } } } } } } }
package com.xxx.arch.seq.client.tool; import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.Collections; import java.util.List; @Slf4j public class ZkClient { private CuratorFramework client; public ZkClient(String serverList, int connectionTimeoutMs, int sessionTimeout) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString(serverList) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeout) .retryPolicy(retryPolicy) .build(); client.start(); } public boolean exists(String path) { try { return client.checkExists().forPath(path) != null; } catch (Exception e) { return false; } } public void writeData(String path, Long value) { try { client.setData().forPath(path, value.toString().getBytes()); } catch (Exception e) { log.error(e.getMessage(), e); } } public void createPersistent(String zkNode, Long value) { try { client.create().forPath(zkNode, value.toString().getBytes()); } catch (Exception e) { log.error(e.getMessage(), e); } } public List<String> getChildren(String path) { try { return client.getChildren().forPath(path); } catch (Exception e) { log.error(e.getMessage(), e); } return Collections.emptyList(); } public Long readData(String path) { try { byte[] data = client.getData().forPath(path); return Long.parseLong(new String(data)); } catch (Exception e) { log.error(e.getMessage(), e); } return null; } public void delete(String path) { try { client.delete().forPath(path); } catch (Exception e) { log.error(e.getMessage(), e); } } }
package com.xxx.arch.seq.client.zk;

import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.constant.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holder of the process-wide {@link ZkClient}, created on first request.
 */
public class ZkClientUtil {

    private static final Logger logger = LoggerFactory.getLogger(ZkClientUtil.class);

    /** The shared client; stays {@code null} until a creation attempt succeeds. */
    private static volatile ZkClient zkClient = null;

    /**
     * Returns the shared client, attempting to create it if it does not exist yet.
     *
     * @return the shared client, or {@code null} if it could not be created
     */
    public static ZkClient getZkClient() {
        if (zkClient != null) {
            return zkClient;
        }
        synchronized (ZkClientUtil.class) {
            // Re-check under the lock: another thread may have created it meanwhile.
            if (zkClient == null) {
                initZkClient();
            }
            return zkClient;
        }
    }

    /**
     * Creates the client from the configured connect string. Any failure, including a
     * missing connect string, is logged and leaves the client unset.
     */
    private static void initZkClient() {
        try {
            final String servers = Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING;
            if (logger.isInfoEnabled()) {
                logger.info("zk cluster[" + servers + "]");
            }
            final boolean missing = servers == null || servers.trim().isEmpty();
            if (missing) {
                throw new RuntimeException("no \"arch.seq.zk-cluster.serverList\" config.used");
            }
            zkClient = new ZkClient(servers, 15000, 60000);
        } catch (Exception ex) {
            logger.error(ex.getMessage(), ex);
        }
    }
}
package com.xxx.arch.seq.client.tool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;

/**
 * Base class offering a quiet way to close resources.
 *
 * Created by zhangyang on 2016/5/31.
 */
public class StreamCloseAble {

    private static Logger logger = LoggerFactory.getLogger(StreamCloseAble.class);

    /**
     * Closes every non-null resource in order. An {@link IOException} raised by one
     * resource is logged and does not stop the remaining ones from being closed.
     *
     * @param closeAbles resources to close; the array and its elements may be {@code null}
     */
    public static void close(Closeable... closeAbles) {
        if (closeAbles == null) {
            return;
        }
        for (int i = 0; i < closeAbles.length; i++) {
            Closeable resource = closeAbles[i];
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            } catch (IOException ioe) {
                logger.error(ioe.getMessage(), ioe);
            }
        }
    }
}
到此,相信大家對“如何實現基於Jedis+ZK的分布式序列號生成器”有了更深的了解,不妨來實際操作一番吧!這裡是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。