中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如和用RabbitMQ和WebSocket實現消息推送

發布時間:2020-06-09 13:36:06 來源:億速云 閱讀:352 作者:鴿子 欄目:編程語言

基于 Hyperf+ WebSocket +RabbitMQ 實現的一個簡單大屏幕的消息推送。

思路

利用 WebSocket 協議讓客戶端和服務器端保持有狀態的長鏈接,

保存鏈接上來的客戶端 id。訂閱發布者發布的消息針對已保存的客戶端 id 進行廣播消息。

WebSocket 服務

composer require hyperf/websocket-server

配置文件 [config/autoload/server.php]

<?php
return [
    'mode' => SWOOLE_PROCESS,
    'servers' => [
        [
            'name' => 'http',
            'type' => Server::SERVER_HTTP,
            'host' => '0.0.0.0',
            'port' => 11111,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
            ],
        ],
        [
            'name' => 'ws',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 12222,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
            ],
        ],
    ],

WebSocket 服務器端代碼示例

<?php
declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */
namespace App\Controller;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /**
     * 發送消息
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        //心跳刷新緩存
        $redis = $this->container->get(\Redis::class);
        //獲取所有的客戶端id
        $fdList = $redis->sMembers('websocket_sjd_1');
        //如果當前客戶端在客戶端集合中,就刷新
        if (in_array($frame->fd, $fdList)) {
            $redis->sAdd('websocket_sjd_1', $frame->fd);
            $redis->expire('websocket_sjd_1', 7200);
        }
        $server->push($frame->fd, 'Recv: ' . $frame->data);
    }
    /**
     * 客戶端失去鏈接
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        //刪掉客戶端id
        $redis = $this->container->get(\Redis::class);
        //移除集合中指定的value
        $redis->sRem('websocket_sjd_1', $fd);
        var_dump('closed');
    }
    /**
     * 客戶端鏈接
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        //保存客戶端id
        $redis = $this->container->get(\Redis::class);
        $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);
        var_dump($res1);
        $res = $redis->expire('websocket_sjd_1', 7200);
        var_dump($res);
        $server->push($request->fd, 'Opened');
    }
}

WebSocket 前端代碼

function WebSocketTest() {
        if ("WebSocket" in window) {
            console.log("您的瀏覽器支持 WebSocket!");
            var num = 0
            // 打開一個 web socket
            var ws = new WebSocket("ws://127.0.0.1:12222");
            ws.onopen = function () {
                // Web Socket 已連接上,使用 send() 方法發送數據
                //alert("數據發送中...");
                //ws.send("發送數據");
            };
            window.setInterval(function () { //每隔5秒鐘發送一次心跳,避免websocket連接因超時而自動斷開
                var ping = {"type": "ping"};
                ws.send(JSON.stringify(ping));
            }, 5000);
            ws.onmessage = function (evt) {
                var d = JSON.parse(evt.data);
                console.log(d);
                if (d.code == 300) {
                    $(".address").text(d.address)
                }
                if (d.code == 200) {
                    var v = d.data
                    console.log(v);
                    num++
                    var str = `<div class="item">
                                    <p>${v.recordOutTime}</p>
                                    <p>${v.userOutName}</p>
                                    <p>${v.userOutNum}</p>
                                    <p>${v.doorOutName}</p>
                                </div>`
                    $(".tableHead").after(str)
                    if (num > 7) {
                        num--
                        $(".table .item:nth-last-child(1)").remove()
                    }
                }
            };
            ws.error = function (e) {
                console.log(e)
                alert(e)
            }
            ws.onclose = function () {
                // 關閉 websocket
                alert("連接已關閉...");
            };
        } else {
            alert("您的瀏覽器不支持 WebSocket!");
        }
    }

AMQP 組件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

<?php
return [
    'default' => [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
        ],
        'params' => [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 6.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 3,
        ],
    ],
];

MQ 消費者代碼

<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;
/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * rabbmitMQ消費端代碼
     * @param $data
     * @return string
     */
    public function consume($data): string
    {
        print_r($data);
        //獲取集合中所有的value
        $redis = $this->container->get(\Redis::class);
        $fdList=$redis->sMembers('websocket_sjd_1');
        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
        foreach($fdList as $key=>$v){
            if(!empty($v)){
                $server->push((int)$v, $data);
            }
        }
        return Result::ACK;
    }
}

控制器代碼

  /**
     * test
     * @return array
     */
    public function test()
    {
        $data = array(
            'code' => 200,
            'data' => [
                'userOutName' => 'ccflow',
                'userOutNum' => '9999',
                'recordOutTime' => date("Y-m-d H:i:s", time()),
                'doorOutName' => '教師公寓',
            ]
        );
        $data = \GuzzleHttp\json_encode($data);
        $message = new DemoProducer($data);
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $result = $producer->produce($message);
        var_dump($result);
        $user = $this->request->input('user', 'Hyperf');
        $method = $this->request->getMethod();
        return [
            'method' => $method,
            'message' => "{$user}.",
        ];
    }

最終效果

如和用RabbitMQ和WebSocket實現消息推送


以上就是基于 Hyperf + RabbitMQ + WebSocket 實現消息推送的詳細內容,更多請關注億速云其它相關文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

泊头市| 乐业县| 玉溪市| 文昌市| 梁平县| 汕尾市| 建宁县| 郁南县| 江阴市| 澎湖县| 永州市| 绍兴县| 久治县| 内乡县| 和顺县| 淮滨县| 武强县| 高青县| 黄梅县| 青海省| 邹平县| 雷波县| 资中县| 延安市| 临夏市| 葵青区| 屏山县| 兰州市| 宿松县| 溧水县| 鸡泽县| 木兰县| 且末县| 财经| 定西市| 荔浦县| 新建县| 常宁市| 瑞安市| 宁夏| 嘉祥县|