中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Jedis中怎么實現分布式鎖

發布時間:2021-08-03 14:45:30 來源:億速云 閱讀:194 作者:Leah 欄目:編程語言

本篇文章為大家展示了Jedis中怎么實現分布式鎖,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

package com.xxx.arch.seq.client.redis;

import java.io.Closeable;
import java.util.*;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.*;

import com.xxx.arch.seq.constant.Constants;

/**
 * Jedis配置實例封裝類(兼容單節點連接池和集群節點)
 *
 * @author zhangyang
 * @createDate 2019-01-22
 * @since 2.x
 */
public class JedisConfig {

    private static volatile JedisConfig redisConfig;

    //當前模式:1單例,2哨兵 3集群Cluster
    private int singleton;

    //jedis連接池
    private JedisPool jedisPool;

    private JedisSentinelPool sentinelPool;

    private Jedis jedis;


    //jeids集群
    private JedisCluster jedisCluster;

    private JedisConfig() {
        Properties redisProp = new Properties();
        
		redisProp.setProperty("arch.seq.redis.host", Constants.ARCH_SEQ_REDIS_NODES);
		redisProp.setProperty("arch.seq.redis.password", Constants.ARCH_SEQ_REDIS_PASSWORD);
		redisProp.setProperty("arch.seq.redis.sentinel.master", Constants.ARCH_SEQ_REDIS_SENTINEL_MASTER);

        String hostConf = redisProp.getProperty("arch.seq.redis.host");
        if (hostConf == null) {
            throw new RuntimeException("get redis configuration error");
        }
        if ("${arch.seq.redis.host}".equals(hostConf)) {
            throw new RuntimeException("please check occ var \"arch.seq.redis.host\"");
        }
        if(!hostConf.contains(",")&&!hostConf.contains(">>")){
            singleton = 1;
        }else if(hostConf.contains(">>")){
            singleton=2;
        }else{
            singleton=3;
        }

        if (singleton==1) {
            initJedisPool(redisProp);
        } else if(singleton==2){
            initJedisSentinel(redisProp);
        }else{
            initJedisCluster(redisProp);
        }
    }

    private void initJedisPool(Properties redisProp) {
        String[] hostConf = redisProp.getProperty("arch.seq.redis.host").split(":");
        this.jedisPool = new JedisPool(new JedisPoolConfig(), hostConf[0], Integer.valueOf(hostConf[1]),
                0, redisProp.getProperty("arch.seq.redis.password"));
    }

    private void initJedisCluster(Properties redisProp) {
        String[] hostConfList = redisProp.getProperty("arch.seq.redis.host").split(",");
        Set<HostAndPort> nodes = new HashSet<>();
        String[] hostConf;
        for (String hc : hostConfList) {
            hostConf = hc.split(":");
            nodes.add(new HostAndPort(hostConf[0], Integer.valueOf(hostConf[1])));
        }
        jedisCluster = new JedisCluster(nodes, 0, 0, 4,
                redisProp.getProperty("arch.seq.redis.password"), new GenericObjectPoolConfig());
    }

    private void initJedisSentinel(Properties redisProp) {
        String[] hostConfList = redisProp.getProperty("arch.seq.redis.host").split(">>");
        Set sentinels = new HashSet();
        String[] hostConf;
        for (String hc : hostConfList) {
            hostConf= hc.split(":");
            sentinels.add(new HostAndPort(hostConf[0], Integer.valueOf(hostConf[1])).toString());
        }

        sentinelPool = new JedisSentinelPool(redisProp.getProperty("arch.seq.redis.sentinel.master"), sentinels,redisProp.getProperty("arch.seq.redis.password"));
        jedis = sentinelPool.getResource();
    }

    public static JedisConfig getInstance() {
        if (redisConfig == null) {
            synchronized (JedisConfig.class) {
                if (redisConfig == null) {
                    redisConfig = new JedisConfig();
                }
            }
        }
        return redisConfig;
    }

    public JedisConn getConn() {
        if(singleton==1){
            return new JedisConn(jedisPool.getResource());
        }
        if(singleton==2){
            return new JedisConn(sentinelPool.getResource());
        }
        if(singleton==3){
            return new JedisConn(jedisCluster);
        }
        return null;
    }

    /**
     * redis連接封裝類,支持單機和集群,支持常規操作,支持分布式鎖
     */
    public static class JedisConn implements Closeable {

        private JedisCommands invoke;

        public JedisConn(JedisCommands invoke) {
            this.invoke = invoke;
        }

        /**
         * 設置一個必須是不存在的值
         *
         * @param key   - 關鍵字
         * @param value
         * @return 1-成功 0-失敗
         */
        public Long setnx(String key, String value) {
            return invoke.setnx(key, value);
        }

        /**
         * 獲得一個值
         *
         * @param key - 關鍵字
         * @return
         */
        public String get(String key) {
            return invoke.get(key);
        }

        /**
         * 更新一個值
         *
         * @param key   - 關鍵字
         * @param value - 值
         * @return
         */
        public String set(String key, String value) {
            return invoke.set(key, value);
        }

        /**
         * 更新一個值,并返回更新前的老值
         *
         * @param key   - 關鍵字
         * @param value - 值
         * @return 更新前的老值
         */
        public String getSet(String key, String value) {
            return invoke.getSet(key, value);
        }

        /**
         * 刪除一個值
         *
         * @param key - 關鍵字
         */
        public void del(String key) {
            invoke.del(key);
        }

        /**
         * 遞增一個值,并返回最新值
         *
         * @param key - 關鍵字
         * @return 最新值
         */
        public Long incr(String key) {
            return invoke.incr(key);
        }

        /**
         * 遞增一個值,并返回最新值
         *
         * @param key - 關鍵字
         * @return 最新值
         */
        public Long incr(String key, long total) {
            return invoke.incrBy(key, total);
        }

        /**
         * 設置過期時間
         *
         * @param key   - 關鍵字
         * @param expireTime - 過期時間,毫秒
         * @return
         */
        public Long expire(String key, long expireTime) {
            return invoke.pexpire(key, expireTime);
        }



        private static final String LOCK_SUCCESS = "OK";
        private static final String SET_IF_NOT_EXIST = "NX";//NX是不存在時才set
        private static final String SET_WITH_EXPIRE_TIME = "PX";//默認毫秒, 解釋:EX是秒,PX是毫秒

        /**
         * 嘗試獲取分布式鎖
         * @param lockKey 鎖
         * @param requestId 請求標識
         * @param expireTime 超期時間
         * @return 是否獲取成功
         */
        public boolean tryLock(String lockKey, String requestId, long expireTime) {

            String result = invoke.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
            return false;

        }


        private static final Long RELEASE_SUCCESS = 1L;

        /**
         * 釋放分布式鎖
         * @param lockKey 鎖
         * @param requestId 請求標識
         * @return 是否釋放成功
         */
        public boolean unLock(String lockKey, String requestId) {
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = evalScript(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            if (RELEASE_SUCCESS.equals(result)) {
                return true;
            }
            return false;
        }

        private Object evalScript(String script, List<String> keys, List<String> args) {
            return (invoke instanceof Jedis)
                    ? ((Jedis)invoke).eval(script, keys, args)
                    : ((JedisCluster)invoke).eval(script, keys, args);
        }


        public void close() {
            if (invoke instanceof Jedis) {
                ((Jedis) invoke).close();
            }
        }



    }


}
package com.xxx.arch.seq.core;

import com.xxx.arch.seq.client.redis.JedisConfig;
import com.xxx.arch.seq.task.ContinuationOfLifeTask;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.*;

/**
 * 基于redis 的分布式鎖
 */
@Slf4j
public final class DistributedLock {

    //續命任務延遲隊列
    private static final DelayQueue<ContinuationOfLifeTask> QUEUE = new DelayQueue<>();
    //續命任務映射緩存
    private static final Map<String, ContinuationOfLifeTask> CACHE = new ConcurrentHashMap<>();
    //延長鎖時間的守護線程
    private static final ExecutorService CONTINUATION_OF_LIFE_EXECUTOR = Executors.newSingleThreadExecutor();

    private static final long TIMEOUT = 1000;
    //限制最大長度
    private static final int SIZE = 5000;

    static {
        /**
         * 延長鎖時間的核心線程代碼
         */
        CONTINUATION_OF_LIFE_EXECUTOR.execute(() -> {
            while (true){
                //獲取優先級最高的任務
                ContinuationOfLifeTask task;
                try {
                    task = QUEUE.take();
                } catch (InterruptedException e) {
                    continue;
                }
                if (task == null){
                    continue;
                }
                //驗證是否活躍
                long nowTime = System.currentTimeMillis();
                if (task.isActive() && !task.isDiscarded(nowTime)){
                    //是否可以執行
                    if (task.isExecute(nowTime)){
                        task.execute();
                        //驗證是否還需要續命
                        if (task.isActive() && task.checkCount()){
                            QUEUE.add(task);
                        }else {
                            //清理不需要任務的緩存
                            CACHE.remove(task.getId());
                        }
                    }else {
                        //清理不需要任務的緩存
                        //如果是時間沒到不能執行的 不需要刪除,一般不存在
                        if (nowTime >= task.getEndTime()){
                            CACHE.remove(task.getId());
                        }
                    }
                }else {
                    //清理過期的或者不活躍的任務
                    CACHE.remove(task.getId());
                }
            }
        });
    }

    private DistributedLock(){}

    /**
     * 獲得分布式鎖
     *
     * @param lockKey    - 分布式鎖的key,保證全局唯一
     * @param requestId  - 本次請求的唯一ID,可用UUID等生成
     * @param expireTime - 鎖獲取后,使用的最長時間,毫秒
     * @param flagCount -  延續鎖的次數
     * @return - 是否成功獲取鎖
     */
    public static boolean getDistributeLock(String lockKey, String requestId, long expireTime,int flagCount) {
        JedisConfig.JedisConn conn = null;
        try {
            conn = JedisConfig.getInstance().getConn();
            //獲取鎖
            if (QUEUE.size() < SIZE && conn.tryLock(lockKey, requestId, expireTime)){
                //創建一個續命任務
                ContinuationOfLifeTask task = ContinuationOfLifeTask.build(lockKey, requestId, expireTime, flagCount);
                //如果放入隊列超時 或者失敗
                if (!QUEUE.offer(task, TIMEOUT, TimeUnit.MILLISECONDS)){
                    //釋放鎖
                    releaseDistributeLock(lockKey, requestId);
                    //返回鎖獲取失敗
                    return false;
                }
                //設置緩存
                CACHE.put(lockKey + requestId, task);
                return true;
            }
            return false;
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }

    /**
     * 獲取分布式鎖
     * 默認是延長3次鎖壽命
     * @param lockKey  分布式鎖的key,保證全局唯一
     * @param requestId  本次請求的唯一ID,可用UUID等生成
     * @param expireTime 鎖獲取后,使用的最長時間,毫秒
     * @return
     */
    public static boolean getDefaultDistributeLock(String lockKey, String requestId, long expireTime) {
        return getDistributeLock(lockKey, requestId, expireTime, 3);
    }

    /**
     * 獲取永久分布式鎖(默認24小時)
     * 使用時候記得一定要釋放鎖
     * @param lockKey
     * @param requestId
     * @return
     */
    public static boolean getPermanentDistributedLock(String lockKey, String requestId){
        return getDistributeLock(lockKey, requestId, 10000, 6 * 60 * 24);
    }

    /**
     * 釋放分布式鎖
     *
     * @param lockKey   - 分布式鎖的key,保證全局唯一
     * @param requestId - 本次請求的唯一ID,可用UUID等生成
     * @return
     */
    public static boolean releaseDistributeLock(String lockKey, String requestId) {
        JedisConfig.JedisConn conn = null;
        try {
            ContinuationOfLifeTask task = CACHE.remove(lockKey + requestId);
            if (task != null){
                task.setActive(false);
                QUEUE.remove(task);
            }
            conn = JedisConfig.getInstance().getConn();
            return conn.unLock(lockKey, requestId);
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}
package com.xxx.arch.seq.task;

import com.xxx.arch.seq.client.redis.JedisConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 續命任務類
 */
@Slf4j
public class ContinuationOfLifeTask implements Delayed {
    private String id;
    //結束時間 即為需要續命的時間
    private long endTime;
    //是否還存活
    private volatile boolean active;
    //鎖的key
    private String lockKey;
    //鎖超時時間
    private long timeout;
    //鎖的持續時間
    private long expireTime;
    //鎖的續命次數 -1 代表無限
    private int flagCount;
    //續命次數統計 count 不能大于 flagCount
    private int count;

    private ContinuationOfLifeTask(String id, String lockKey, long expireTime, long endTime, long timeout, int flagCount) {
        this.id = id;
        this.lockKey = lockKey;
        this.expireTime = expireTime;
        this.endTime = endTime;
        this.timeout = timeout;
        this.flagCount = flagCount;
        this.active = true;
        this.count = 0;
    }

    public void execute() {
        //該續命任務是否還存活
        if (active) {
            JedisConfig.JedisConn conn = null;
            // 當前次數是否小于指定續命次數
            // 當前時間是否大于結束時間
            if (flagCount > count) {
                //重試次數
                int retryCount = 0;
                // 當前時間是否大于過期時間
                while (System.currentTimeMillis() >= endTime && retryCount < 3) {
                    try {
                        // 續命延期鎖的過期時間
                        (conn = JedisConfig.getInstance().getConn()).expire(lockKey, expireTime);
                        long expiration = expireTime / 10;
                        //保證最少提前100毫秒
                        timeout = System.currentTimeMillis() + expireTime;
                        //更新結束時間
                        endTime = timeout - (expiration > 100 ? expiration : 100);
                        //增加執行次數
                        count++;
                        if (log.isDebugEnabled()) {
                            log.debug("【續命】鎖關鍵字:{},續期:{}毫秒,計數:{}", lockKey, expireTime, count);
                        }
                        break;
                    } catch (Exception e) {
                        try {
                            log.error(e.getMessage(), e);
                            retryCount++;
                            Thread.sleep(100L);
                        } catch (InterruptedException ie) {
                            log.error(e.getMessage(), e);
                        }
                    } finally {
                        if (conn != null) {
                            conn.close();
                        }
                    }
                }
            }
        }
    }

    /**
     * 是否可以執行 必須是活躍且執行次數沒有到最大值
     * 且時間沒有過期的任務才能執行
     *
     * @return
     */
    public boolean isExecute(long nowTime) {
        return nowTime >= endTime && nowTime <= timeout && flagCount >= count;
    }

    /**
     * 是否丟棄
     *
     * @return
     */
    public boolean isDiscarded(long nowTime) {
        return nowTime > timeout || flagCount <= count;
    }

    public boolean checkCount() {
        return count < flagCount;
    }

    public static final ContinuationOfLifeTask build(String lockKey, String requestId, long expireTime, int flagCount) {
        if (StringUtils.isAnyBlank(lockKey, requestId)) {
            throw new IllegalArgumentException("lockKey Can't be blank !");
        }
        //校驗入參如果鎖定時間低于 1000 毫秒 延長到 1000 毫秒
        if (expireTime < 1000) {
            expireTime = 1000;
        }
        //校驗 鎖的續命次數 如果小于 -1 則默認等于3
        if (flagCount < -1) {
            flagCount = 3;
        }
        long expiration = expireTime / 10;
        //保證最少提前100毫秒
        long timeout = System.currentTimeMillis() + expireTime;
        long endTime = timeout - (expiration > 500 ? expiration : 500);
        return new ContinuationOfLifeTask(lockKey + requestId, lockKey, expireTime, endTime, timeout, flagCount);
    }

    public long getEndTime() {
        return endTime;
    }

    public ContinuationOfLifeTask setEndTime(long endTime) {
        this.endTime = endTime;
        return this;
    }

    public boolean isActive() {
        return active;
    }

    public ContinuationOfLifeTask setActive(boolean active) {
        this.active = active;
        return this;
    }

    public String getLockKey() {
        return lockKey;
    }

    public ContinuationOfLifeTask setLockKey(String lockKey) {
        this.lockKey = lockKey;
        return this;
    }

    public long getExpireTime() {
        return expireTime;
    }

    public ContinuationOfLifeTask setExpireTime(long expireTime) {
        this.expireTime = expireTime;
        return this;
    }

    public int getFlagCount() {
        return flagCount;
    }

    public ContinuationOfLifeTask setFlagCount(int flagCount) {
        this.flagCount = flagCount;
        return this;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((endTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}
package com.xxx.arch.seq.constant;

import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import org.apache.commons.lang3.StringUtils;

public class Constants {

	//apollo公共的ZK配置集群NameSpace
	public static final String ZK_NAME_SPACE = "33.zk";
	public static final String REDIS_SEQUEN_NAME_SPACE = "33.sequence-redis";
//	public static final String REDIS_SEQUEN_NAME_SPACE = "33.sequence";

	public static final String ARCH_SEQ_ZOOKEEPER_CONNECT_STRING = getConfig(ZK_NAME_SPACE,"zk.address", "");
	public static final String ARCH_SEQ_REDIS_NODES = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.nodes", "");
	public static final String ARCH_SEQ_REDIS_SENTINEL_MASTER = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.sentinel.master", "");
	public static final String ARCH_SEQ_REDIS_PASSWORD = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.common.key", "");

	public static String getConfig(String nameSpace,String key,String defultValue){
		if(StringUtils.isBlank(nameSpace)){
			return "";
		}
		Config config = ConfigService.getConfig(nameSpace);
		return config.getProperty(key,defultValue);
	}
}

上述內容就是Jedis中怎么實現分布式鎖,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

收藏| 巴林左旗| 黄浦区| 会泽县| 周至县| 额敏县| 财经| 开平市| 荔浦县| 文登市| 青浦区| 昌宁县| 汶上县| 莎车县| 金门县| 清涧县| 阿巴嘎旗| 柳江县| 滦平县| 沐川县| 万全县| 九台市| 泌阳县| 小金县| 镇巴县| 壶关县| 高阳县| 东至县| 吉安市| 调兵山市| 邵武市| 屏南县| 扶余县| 五指山市| 新晃| 永顺县| 巨野县| 盖州市| 滦平县| 广昌县| 万全县|