中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

使用Redis怎么實現延遲隊列

發布時間:2021-05-13 15:55:45 來源:億速云 閱讀:285 作者:Leah 欄目:開發技術

本篇文章給大家分享的是有關使用Redis怎么實現延遲隊列,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

方案一:

采用通過定時任務采用數據庫/非關系型數據庫輪詢方案。

優點:

1. 實現簡單,對于項目前期這樣是最容易的解決方案。

缺點:

1. DB 有效使用率低,需要將一部分的數據庫的QPS分配給 JOB 的無效輪詢。

2. 服務資源浪費,因為輪詢需要對所有的數據做一次 SCAN 掃描 JOB 服務的資源開銷很大。

方案二:

采用延遲隊列:

優點:

1. 服務的資源使用率較高,能夠精確的實現超時任務的執行。

2. 減少 DB 的查詢次數,能夠降低數據庫的壓力

缺點:

1. 對于延遲隊列來說本身設計比較復雜,目前沒有通用的比較好過的方案。

基于 Redis 的延遲隊列實現

基于以上的分析,我決定通過 Redis 來實現分布式隊列。

設計思路:

使用Redis怎么實現延遲隊列

1. 第一步將需要發放的消息發送到延遲隊列中。

2. 延遲隊列將數據存入 Redis 的 ZSet 有序集合中score 為當前時間戳,member 存入需要發送的數據。

3. 添加一個 schedule 來進行對 Redis 有序隊列的輪詢。

4. 如果到達達到消息的執行時間,那么就進行業務的執行。

5. 如果沒有達到消息的執行是將,那么消息等待下輪執行。

實現步驟:

由于本處篇幅有限,所以只列舉部分代碼,完整的代碼可以在本文最后訪問 GitHub 獲取。由于本人閱歷/水平有限,如有建議/或更正歡迎留言或提問。先在此謝謝大家駐足閱讀 ? ? ?。

需要注意的問題:

單個 Redis 命令的執行是原子性的,但 Redis 沒有在事務上增加任何維持原子性的機制,所以 Redis 事務的執行并不是原子性的。

事務可以理解為一個打包的批量執行腳本,但批量指令并非原子化的操作,中間某條指令的失敗不會導致前面已做指令的回滾,也不會造成后續的指令不做。

我們可以通過 Redis 的 eval 命令來執行 lua 腳本來保證原子性實現Redis的事務。

實現步驟如下:

1. 延遲隊列接口

/**
 * 延遲隊列
 *
 * @author zhengsh
 * @date 2020-03-27
 */
public interface RedisDelayQueue<E extends DelayMessage> {

    String META_TOPIC_WAIT = "delay:meta:topic:wait";
    String META_TOPIC_ACTIVE = "delay:meta:topic:active";
    String TOPIC_ACTIVE = "delay:active:9999";
    /**
     * 拉取消息
     */
    void poll();

    /**
     * 推送延遲消息
     *
     * @param e
     */
    void push(E e);
}

2. 延遲隊列消息

/**
 * 消息體
 *
 * @author zhengsh
 * @date 2020-03-27
 */
@Setter
@Getter
public class DelayMessage {
    /**
     * 消息唯一標識
     */
    private String id;
    /**
     * 消息主題
     */
    private String topic = "default";
    /**
     * 具體消息 json
     */
    private String body;
    /**
     * 延時時間, 格式為時間戳: 當前時間戳 + 實際延遲毫秒數
     */
    private Long delayTime = System.currentTimeMillis() + 30000L;
    /**
     * 消息發送時間
     */
    private LocalDateTime createTime;
}

3. 延遲隊列實現

/**
 * 延遲隊列實現
 *
 * @author zhengsh
 * @date 2020-03-27
 */
@Component
public class RedisDelayQueueImpl<E extends DelayMessage> implements RedisDelayQueue<E> {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void poll() {
        // todo
    }

    /**
     * 發送消息
     *
     * @param e
     */
    @SneakyThrows
    @Override
    public void push(E e) {
        try {
            String jsonStr = JSON.toJSONString(e);
            String topic = e.getTopic();
            String zkey = String.format("delay:wait:%s", topic);
            String u =
                    "redis.call('sadd', KEYS[1], ARGV[1])\n" +
                            "redis.call('zadd', KEYS[2], ARGV[2], ARGV[3])\n" +
                            "return 1";

            Object[] keys = new Object[]{serialize(META_TOPIC_WAIT), serialize(zkey)};
            Object[] values = new Object[]{ serialize(zkey), serialize(String.valueOf(e.getDelayTime())),serialize(jsonStr)};

            Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
                Object nativeConnection = connection.getNativeConnection();

                if (nativeConnection instanceof RedisAsyncCommands) {
                    RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;
                    return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);
                } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
                    RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
                    return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);
                }
                return 0L;
            });
            logger.info("延遲隊列[1],消息推送成功進入等待隊列({}), topic: {}", result != null && result > 0, e.getTopic());
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    private byte[] serialize(String key) {
        RedisSerializer<String> stringRedisSerializer =
                (RedisSerializer<String>) redisTemplate.getKeySerializer();
        //lettuce連接包下序列化鍵值,否則無法用默認的ByteArrayCodec解析
        return stringRedisSerializer.serialize(key);
    }
}

4. 定時任務

/**
 * 分發任務
 */
@Component
public class DistributeTask {

    private static final String LUA_SCRIPT;
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private StringRedisTemplate redisTemplate;

    static {
        StringBuilder sb = new StringBuilder(128);
        sb.append("local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 1)\n");
        sb.append("if(next(val) ~= nil) then\n");
        sb.append("    redis.call('sadd', KEYS[2], ARGV[2])\n");
        sb.append("    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)\n");
        sb.append("    for i = 1, #val, 100 do\n");
        sb.append("        redis.call('rpush', KEYS[3], unpack(val, i, math.min(i+99, #val)))\n");
        sb.append("    end\n");
        sb.append("    return 1\n");
        sb.append("end\n");
        sb.append("return 0");
        LUA_SCRIPT = sb.toString();
    }

    /**
     * 2秒鐘掃描一次執行隊列
     */
    @Scheduled(cron = "0/5 * * * * ?")
    public void scheduledTaskByCorn() {
        try {
            Set<String> members = redisTemplate.opsForSet().members(META_TOPIC_WAIT);
            assert members != null;
            for (String k : members) {
                if (!redisTemplate.hasKey(k)) {
                    // 如果 KEY 不存在元數據中刪除
                    redisTemplate.opsForSet().remove(META_TOPIC_WAIT, k);
                    continue;
                }

                String lk = k.replace("delay:wait", "delay:active");
                Object[] keys = new Object[]{serialize(k), serialize(META_TOPIC_ACTIVE), serialize(lk)};
                Object[] values = new Object[]{serialize(String.valueOf(System.currentTimeMillis())), serialize(lk)};
                Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
                    Object nativeConnection = connection.getNativeConnection();

                    if (nativeConnection instanceof RedisAsyncCommands) {
                        RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;
                        return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);
                    } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
                        RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
                        return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);
                    }
                    return 0L;
                });
                logger.info("延遲隊列[2],消息到期進入執行隊列({}): {}", result != null && result > 0, TOPIC_ACTIVE);
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    private byte[] serialize(String key) {
        RedisSerializer<String> stringRedisSerializer =
                (RedisSerializer<String>) redisTemplate.getKeySerializer();
        //lettuce連接包下序列化鍵值,否則無法用默認的ByteArrayCodec解析
        return stringRedisSerializer.serialize(key);
    }
}

以上就是使用Redis怎么實現延遲隊列,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

克拉玛依市| 囊谦县| 凤冈县| 友谊县| 潜江市| 慈利县| 安塞县| 四会市| 黄梅县| 嘉定区| 云林县| 威远县| 高邮市| 江达县| 浦东新区| 昭通市| 崇仁县| 木里| 浮梁县| 德令哈市| 肇源县| 西充县| 湟中县| 资阳市| 措勤县| 涞水县| 通化县| 成武县| 泾川县| 昌都县| 西安市| 六枝特区| 丰原市| 刚察县| 拉孜县| 温宿县| 汨罗市| 广德县| 新源县| 金阳县| 阿拉善右旗|