中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

netty無縫切換rabbitmq和activem及qrocketmq實現聊天室單聊、群聊功能

發布時間:2021-10-20 17:02:03 來源:億速云 閱讀:169 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關netty無縫切換rabbitmq和activem及qrocketmq實現聊天室單聊、群聊功能,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

netty無縫切換rabbitmq和activem及qrocketmq實現聊天室單聊、群聊功能

netty無縫切換rabbitmq和activem及qrocketmq實現聊天室單聊、群聊功能

netty的pipeline處理鏈上的handler:需要IdleStateHandler心跳檢測channel是否有效,以及處理登錄認證的UserAuthHandler和消息處理MessageHandler

protected void initChannel(SocketChannel ch) throws Exception {
	ch.pipeline().addLast(defLoopGroup,
		//編碼解碼器
		new HttpServerCodec(),
		//將多個消息轉換成單一的消息對象
		new HttpObjectAggregator(65536),
		//支持異步發送大的碼流,一般用于發送文件流
		new ChunkedWriteHandler(),
		//檢測鏈路是否讀空閑,配合心跳handler檢測channel是否正常
		new IdleStateHandler(60, 0, 0),
		//處理握手和認證
		new UserAuthHandler(),
		//處理消息的發送
		new MessageHandler()
	);
}

對于所有連進來的channel,我們需要保存起來,往后的群發消息需要依靠這些channel

public static void addChannel(Channel channel) {
        String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
        System.out.println("addChannel:" + remoteAddr);
        if (!channel.isActive()) {
            logger.error("channel is not active, address: {}", remoteAddr);
        }
        UserInfo userInfo = new UserInfo();
        userInfo.setAddr(remoteAddr);
        userInfo.setChannel(channel);
        userInfo.setTime(System.currentTimeMillis());
        userInfos.put(channel, userInfo);
    }

登錄后,channel就變成有效的channel,無效的channel之后將會丟棄

public static boolean saveUser(Channel channel, String nick, String password) {
        UserInfo userInfo = userInfos.get(channel);
        if (userInfo == null) {
            return false;
        }
        if (!channel.isActive()) {
            logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);
            return false;
        }
        // 驗證用戶名和密碼
        if (nick == null || password == null) {
            return false;
        }
        LambdaQueryWrapper<account> lambdaQueryWrapper = new LambdaQueryWrapper&lt;&gt;();
        lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);
        Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);
        if (account == null) {
            return false;
        }
        // 增加一個認證用戶
        userCount.incrementAndGet();
        userInfo.setNick(nick);
        userInfo.setAuth(true);
        userInfo.setId(account.getId());
        userInfo.setUsername(account.getUsername());
        userInfo.setGroupNumber(account.getGroupNumber());
        userInfo.setTime(System.currentTimeMillis());

        // 注冊該用戶推送消息的通道
        offlineInfoTransmitStatic.registerPull(channel);
        return true;
    }

當channel關閉時,就不再接收消息。unregisterPull就是注銷信息消費者,客戶端不再接取聊天消息。此外,從下方有一個加寫鎖的操作,就是為了避免channel還在發送消息時,這邊突然關閉channel,這樣會導致報錯。

public static void removeChannel(Channel channel) {
        try {
            logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));
            //加上讀寫鎖保證移除channel時,避免channel關閉時,還有別的線程對其操作,造成錯誤
            rwLock.writeLock().lock();
            channel.close();
            UserInfo userInfo = userInfos.get(channel);
            if (userInfo != null) {
                if (userInfo.isAuth()) {
                    offlineInfoTransmitStatic.unregisterPull(channel);
                    // 減去一個認證用戶
                    userCount.decrementAndGet();
                }
                userInfos.remove(channel);
            }
        } finally {
            rwLock.writeLock().unlock();
        }

    }

為了無縫切換使用rabbitmq、rocketmq、activemq、不使用中間件存儲和轉發聊天消息這4種狀態,定義如下4個接口。依次是發送單聊消息、群聊消息、客戶端啟動接收消息、客戶端下線不接收消息。

public interface OfflineInfoTransmit {
    void pushP2P(Integer userId, String message);

    void pushGroup(String groupNumber, String message);

    void registerPull(Channel channel);

    void unregisterPull(Channel channel);
}

其中,如何使用rabbitmq、rocketmq、activemq三種中間件中的一種來存儲和轉發聊天消息,它的處理流程如下:

  1. 單聊的模型參考線程池的模型,如果用戶在線,直接通過channel發送給用戶。如果用戶離線,則發往中間件存儲,下次用戶上線時直接從中間件拉取消息。這樣做對比所有消息的發送都通過中間件來轉的好處是提升了性能

  2. 群聊則是完全通過中間件來轉發消息,消息發送中間件,客戶端從中間件接取消息。如果仍像單聊那樣操作,在線用戶直接通過channel發送,操作過于繁瑣,要判斷這個群組的哪些用戶是否在線

  3. 如果用戶在線就注冊消費者,從中間件接取消息。否則,就斷開消費者,消息保留在中間件中,以便客戶端下次上線時拉取。這樣就實現了離線消息的接收。

  4. 不管使用哪種中間件或使用不使用中間件,它的處理流程都遵循上面的3個要求,就能無縫切換上方的4種方法來存儲和轉發消息。需要哪種方法開啟相應注解即可。

netty無縫切換rabbitmq和activem及qrocketmq實現聊天室單聊、群聊功能

項目地址:https://github.com/shuangyueliao/netty-chat

關于netty無縫切換rabbitmq和activem及qrocketmq實現聊天室單聊、群聊功能就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

德化县| 湘潭县| 来凤县| 根河市| 巴彦淖尔市| 西藏| 始兴县| 宜兰县| 凉山| 巴中市| 南城县| 东方市| 肇源县| 色达县| 新疆| 勐海县| 青冈县| 伊宁市| 普宁市| 明光市| 乐陵市| 通江县| 西林县| 房产| 黄浦区| 文昌市| 郓城县| 金湖县| 隆安县| 平阴县| 卓尼县| 长海县| 大埔区| 营口市| 贡山| 南宁市| 临安市| 威信县| 罗源县| 宣威市| 凉城县|