中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

PHP怎么實現RabbitMQ消息列隊

發布時間:2022-05-11 10:21:32 來源:億速云 閱讀:200 作者:iii 欄目:開發技術

這篇“PHP怎么實現RabbitMQ消息列隊”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“PHP怎么實現RabbitMQ消息列隊”文章吧。

業務場景

項目公司是主php做開發的,框架為thinkphp。眾所周知,php本身的運行效率存在一定的缺陷,所以如果有一個很復雜很耗時的業務時,必須開發一個常駐內存的程序。首先我想到了php的workerman與swoole,但是這里應上面的標題哈,想將耗時任務交給另一個服務器,同時列隊處理。所以這里我想獨立部署一個rabbitMQ服務器用于處理列隊任務。

當rabbitMQ服務器我們準備好了,建立了一個持久化命名為ceshi的列隊,如下:

PHP怎么實現RabbitMQ消息列隊

項目上生產者和消費者的開發我這里全部采用tinkphp6+workerman,為便于管理。這里這么做也是因為發現workerman中對rabbitMQ的文檔解釋太少了!

所以開始踩坑!

1、首先部署好thinkphp6框架

過程去看thinkphp6手冊

2、安裝workerman擴展

過程去看thinkphp6手冊

PHP怎么實現RabbitMQ消息列隊

3、生產者

配置一個workerman類

PHP怎么實現RabbitMQ消息列隊

PHP怎么實現RabbitMQ消息列隊

創建的Send類代碼如下:

<?php

namespace app\workerman;
use Bunny\Channel;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Send extends Server
{
    //websocket地址,一會用于測試。
    protected $socket = 'websocket://127.0.0.1:2345';

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
{
        //websocket發送過來的消息
        $connection->send('我收到你的信息了:'.$data);
        //rabbitMQ配置
        $options = [
            'host'=>'127.0.0.1',//rabbitMQ IP
            'port'=>5672,//rabbitMQ 通訊端口
            'user'=>'admin',//rabbitMQ 賬號
            'password'=>'123456'//rabbitMQ 密碼
        ];
        (new Client($options))->connect()->then(function (Client $client) {
            return $client->channel();
        })->then(function (Channel $channel) {
            /**
             * 創建隊列(Queue)
             * name: ceshi         // 隊列名稱
             * passive: false      // 如果設置true存在則返回OK,否則就報錯。設置false存在返回OK,不存在則自動創建
             * durable: true       // 是否持久化,設置false是存放到內存中RabbitMQ重啟后會丟失,
             *                        設置true則代表是一個持久的隊列,服務重啟之后也會存在,因為服務會把持久化的Queue存放在硬盤上,當服務重啟的時候,會重新加載之前被持久化的Queue
             * exclusive: false    // 是否排他,指定該選項為true則隊列只對當前連接有效,連接斷開后自動刪除
             *  auto_delete: false // 是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除
             */
            return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) use($data){
            echo "發送消息內容:".$data."\n";

            /**
             * 發送消息
             * body 發送的數據
             * headers 數據頭,建議 ['content_type' => 'text/plain'],這樣消費端是springboot注解接收直接是字符串類型
             * exchange 交換器名稱
             * routingKey 路由key
             * mandatory
             * immediate
             * @return bool|PromiseInterface|int
             */

            return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) {
            //echo " [x] Sent 'Hello World!'\n";
            $client = $channel->getClient();
            return $channel->close()->then(function () use ($client) {
                return $client;
            });
        })->then(function (Client $client) {
            $client->disconnect();
        });
    }

    /**
     * 當連接建立時觸發的回調函數
     * @param $connection
     */
    public function onConnect($connection)
{

    }

    /**
     * 當連接斷開時觸發的回調函數
     * @param $connection
     */
    public function onClose($connection)
{

    }
    /**
     * 當客戶端的連接上發生錯誤時觸發
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
{
        echo "error $code $msg\n";
    }

    /**
     * 每個進程啟動
     * @param $worker
     */
    public function onWorkerStart($worker)
{


    }
}

上述都OK以后咱們可以項目路徑下通過命令啟動這個生產者:

php think worker:server

PHP怎么實現RabbitMQ消息列隊

測試發送數據:

PHP怎么實現RabbitMQ消息列隊

通過這個網站

連接【ws://127.0.0.1:2345】后發送數據!

PHP怎么實現RabbitMQ消息列隊

前往rabbitMQ控制臺

PHP怎么實現RabbitMQ消息列隊

列隊中有一條消息產生并且等待了!

這個時候你可能問,如果我發送數據不想通過ws發送而是接口發送怎么辦?

笨思路唄:接口給內置服務器發消息->內置服務去發消息給rabbitMQ

PHP怎么實現RabbitMQ消息列隊

將協議改為tcp

然后重新啟動服務

PHP怎么實現RabbitMQ消息列隊

然后去tp6創建一個路由接口

PHP怎么實現RabbitMQ消息列隊

接口代碼

<?php
namespace app\controller;

use app\BaseController;

class Index extends BaseController
{
    public function index(string $msg)
{
        //連接本地tcp服務
        $client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);
        //發送字符串
        fwrite($client, $msg."\n");
        //斷開服務
        fclose($client);
        return 'OK';
    }

}

執行結果:

PHP怎么實現RabbitMQ消息列隊

說明接口成功的將數據發送給了本地內置的tcp服務。

PHP怎么實現RabbitMQ消息列隊

同時,內置服務將收到的數據給了rabbitMQ服務列隊中。

生產者完成。

4、消費者

同生產者一樣新創建一個thinkphp6及安裝workerman擴展,注意端口別和生產者沖突!這里我設置的是2346端口

PHP怎么實現RabbitMQ消息列隊

創建的Receive類代碼如下:

<?php

namespace app\workerman;
use Bunny\Channel;
use Bunny\Message;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Receive extends Server
{
    protected $socket = 'tcp://127.0.0.1:2346';

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
{

    }

    /**
     * 當連接建立時觸發的回調函數
     * @param $connection
     */
    public function onConnect($connection)
{

    }

    /**
     * 當連接斷開時觸發的回調函數
     * @param $connection
     */
    public function onClose($connection)
{

    }
    /**
     * 當客戶端的連接上發生錯誤時觸發
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
{
        echo "error $code $msg\n";
    }

    /**
     * 每個進程啟動
     * @param $worker
     */
    public function onWorkerStart($worker)
{
        //rabbitMQ配置
        $options = [
            'host'=>'127.0.0.1',//rabbitMQ IP
            'port'=>5672,//rabbitMQ 通訊端口
            'user'=>'admin',//rabbitMQ 賬號
            'password'=>'123456'//rabbitMQ 密碼
        ];
        (new Client($options))->connect()->then(function (Client $client) {
            return $client->channel();
        })->then(function (Channel $channel) {
            /**
             * 創建隊列(Queue)
             * name: ceshi         // 隊列名稱
             * passive: false      // 如果設置true存在則返回OK,否則就報錯。設置false存在返回OK,不存在則自動創建
             * durable: true       // 是否持久化,設置false是存放到內存中RabbitMQ重啟后會丟失,
             *                        設置true則代表是一個持久的隊列,服務重啟之后也會存在,因為服務會把持久化的Queue存放在硬盤上,當服務重啟的時候,會重新加載之前被持久化的Queue
             * exclusive: false    // 是否排他,指定該選項為true則隊列只對當前連接有效,連接斷開后自動刪除
             *  auto_delete: false // 是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除
             */
            return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) {
            echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
            $channel->consume(
                function (Message $message, Channel $channel, Client $client) {
                    echo "接收消息內容:", $message->content, "\n";
                },
                'ceshi',
                '',
                false,
                true
            );
        });

    }
}

都OK以后咱們可以項目路徑下通過命令啟動這個消費者:

php think worker:server

此時應該會自動消費掉rabbitMQ中等待的消息!

PHP怎么實現RabbitMQ消息列隊

PHP怎么實現RabbitMQ消息列隊

到這里消費者也就結束啦!

5、整體測試

接下來我用cmd來啟動兩個服務,然后用接口發送消息和消費測試!

PHP怎么實現RabbitMQ消息列隊

至于具體怎么靈活應用自行開拓大腦哦~

比如php項目有些業務吃力,可以去做個java的消費端,讓java來完成任務~

PHP怎么實現RabbitMQ消息列隊

以上就是關于“PHP怎么實現RabbitMQ消息列隊”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

德昌县| 宾川县| 九台市| 类乌齐县| 定日县| 盐源县| 平凉市| 东阿县| 如东县| 榆林市| 敦化市| 双辽市| 茂名市| 曲松县| 明水县| 台山市| 富平县| 汉沽区| 休宁县| 无为县| 台前县| 济南市| 松滋市| 亳州市| 建湖县| 桦南县| 安吉县| 台北市| 宝兴县| 泌阳县| 西青区| 稻城县| 饶河县| 同心县| 光山县| 迁西县| 金川县| 柯坪县| 泰宁县| 徐闻县| 三穗县|