您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“PHP如何實現php-amqplib/php-amqplib實例RabbitMq”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“PHP如何實現php-amqplib/php-amqplib實例RabbitMq”這篇文章吧。
https://gitee.com/owenzhang24/tp5
1: 列出隊列(Listing queues)
如果你想查看Rabbitmq隊列,并且想知道有多少消息存在其中,你(作為特權用戶)可以使用rabbitmqctl 工具:
sudo rabbitmqctl list_queues
在Windows中,省略sudo:
rabbitmqctl.bat list_queues
2: 工作隊列
我們發現即使使用CTRL+C殺掉了一個工作者(worker)進程,消息也不會丟失。當工作者(worker)掛掉之后,所有沒有響應的消息都會重新發送。
一個很容易犯的錯誤就是忘了basic_ack,后果很嚴重。消息在你的程序退出之后就會重新發送,如果它不能夠釋放沒響應的消息,RabbitMQ就會占用越來越多的內存。
為了排除這種錯誤,你可以使用rabbitmqctl命令,輸出messages_unacknowledged字段:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows系統中運行,去掉sudo:
$ rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
3: rabbitmqctl能夠列出服務器上所有的交換器:
$ sudo rabbitmqctl list_exchanges
這個列表中有一些叫做amq.*的交換器。這些都是默認創建的,不過這時候你還不需要使用他們。
4:列出所有現存的綁定 rabbitmqctl list_bindings
5: 如果你想把日志保存到文件中,只需要打開控制臺輸入: ( receive_logs.php 源代碼)
$ php receive_logs.php > logs_from_rabbit.log
如果你希望所有的日志信息都輸出到屏幕中,打開一個新的終端,然后輸入:
$ php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C
如果要觸發一個error級別的日志,只需要輸入:
$ php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
windows環境的rabbitmq安裝與啟動
https://my.oschina.net/owenzhang24/blog/5051652
rabbitMq實現的基礎類:application/common/lib/classes/rabbitmq/RabbitMq.php
供外部調用的rabbitMq類:application/common/lib/classes/RabbitMqWork.php
測試發送消息到rabbitMq中的方法:application/index/controller/Index.php
添加php think命令實現接收rabbitMq中的消息:application/command/*.php
發送消息時,直接在自己的方法中調用RabbitMqWork.php類中的幾個發送消息的方法即可。
application/command/下的類都是實現添加php think命令的類:在configure()方法中通過setName()設置命令名稱,在execute()方法中執行接收rabbitMq消息的邏輯,同時在application/command.php的return數組中添加所設置的命令名稱及對應的命令類(含命名空間)。
參考文檔
RabbitMQ 中文文檔-PHP版。https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/
RabbitMQ官方文檔。https://www.rabbitmq.com/getstarted.html
application/common/lib/classes/rabbitmq/RabbitMq.php
<?php
// Base class implementing the RabbitMQ operations used by the application.
namespace app\common\lib\classes\rabbitmq;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Process-wide RabbitMQ helper built on php-amqplib.
 *
 * One connection and one channel are shared through static properties, and a
 * single instance is handed out by {@see RabbitMq::instance()}. The currently
 * selected exchange is also static, so the exchange type requested most
 * recently via instance() is the one used by the publish/subscribe methods.
 */
class RabbitMq
{
    static private $instance;
    static private $connection;
    static private $channel;

    const DIRECT = 'direct';
    const TOPIC = 'topic';
    const HEADERS = 'headers';
    const FANOUT = 'fanout';

    // Exchange name declared on the broker for each supported exchange type.
    static private $exchangeNames = [
        self::DIRECT => 'direct_exchange',
        self::TOPIC => 'topic_exchange',
        self::HEADERS => 'headers_exchange',
        self::FANOUT => 'fanout_exchange',
    ];

    // Binding keys used by the direct-exchange log consumer.
    const SEVERITYS = [
        'info',
        'warning',
        'error'
    ];

    // Name of the exchange currently in use; '' means no exchange was selected.
    static private $exchangeName = '';

    /**
     * Opens the connection and channel and, when an exchange type is given,
     * declares the matching exchange.
     *
     * @param string $exchangeType one of the DIRECT/TOPIC/HEADERS/FANOUT constants, or empty for none
     * @throws \InvalidArgumentException when $exchangeType is not a supported type
     */
    private function __construct($exchangeType)
    {
        // Validate before connecting so a bad type does not open a connection first.
        $exchange = empty($exchangeType) ? '' : self::exchangeNameFor($exchangeType);

        self::$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        self::$channel = self::$connection->channel();

        if ($exchange !== '') {
            $this->useExchange($exchangeType);
        }
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * When the instance already exists and a non-empty exchange type is passed,
     * that exchange is declared (if it is not the current one) and becomes the
     * exchange used by subsequent calls. Previously the argument was silently
     * ignored after the first call.
     *
     * @param string $exchangeType
     * @return RabbitMq
     * @throws \InvalidArgumentException when $exchangeType is not a supported type
     */
    public static function instance($exchangeType = '')
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self($exchangeType);
        } elseif (!empty($exchangeType)) {
            self::$instance->useExchange($exchangeType);
        }
        return self::$instance;
    }

    /**
     * Prevents the singleton from being cloned.
     */
    private function __clone()
    {
    }

    /**
     * Simple send: publishes 'Hello World!' to the 'hello' queue via the default exchange.
     */
    public function send()
    {
        self::$channel->queue_declare('hello', false, false, false);
        $msg = new AMQPMessage('Hello World!');
        self::$channel->basic_publish($msg, '', 'hello');
        echo "[X] Sent 'Hello World!'\n";
    }

    /**
     * Simple receive: consumes the 'hello' queue until no consumer callbacks remain.
     *
     * The consumer is registered with no_ack = true, so callbacks must not
     * acknowledge deliveries themselves.
     *
     * @param callable $callback invoked with each delivered message
     */
    public function receive($callback)
    {
        self::$channel->queue_declare('hello', false, false, false, true);
        echo "[*] Waiting for messages. To exit press CTRL+C\n";
        $this->consume('hello', true, $callback);
    }

    /**
     * Adds a task to the durable 'task_queue' work queue as a persistent message.
     *
     * @param string $data message body; 'Hello World!' is used when blank
     */
    public function addTask($data = '')
    {
        self::$channel->queue_declare('task_queue', false, true, false, true);
        $data = self::payloadOrDefault($data, 'Hello World!');
        $msg = new AMQPMessage(
            $data,
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        self::$channel->basic_publish($msg, '', 'task_queue');
        echo "[x] Sent $data \n";
    }

    /**
     * Runs a work-queue consumer on 'task_queue'.
     *
     * The consumer is registered with no_ack = false and a prefetch count of 1,
     * so the callback is expected to acknowledge each message.
     *
     * @param callable $callback invoked with each delivered message
     */
    public function workTask($callback)
    {
        self::$channel->queue_declare('task_queue', false, true, false, true);
        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
        self::$channel->basic_qos(null, 1, null);
        $this->consume('task_queue', false, $callback);
    }

    /**
     * Publish: sends a message to the currently selected exchange without a routing key.
     *
     * @param string $data message body; 'info:Hello World!' is used when blank
     */
    public function sendQueue($data = '')
    {
        $data = self::payloadOrDefault($data, 'info:Hello World!');
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName);
        echo "[x] Sent $data \n";
    }

    /**
     * Subscribe: binds a server-named queue to the current exchange and consumes it.
     *
     * @param callable $callback invoked with each delivered message
     */
    public function subscribeQueue($callback)
    {
        list($queue_name, ,) = self::$channel->queue_declare(
            "",    // queue name: empty, so the broker generates one
            false, // passive
            true,  // durable
            true,  // exclusive
            false  // auto_delete
        );
        self::$channel->queue_bind($queue_name, self::$exchangeName);
        echo "[*] Waiting for logs. To exit press CTRL+C \n";
        $this->consume($queue_name, true, $callback);
    }

    /**
     * Send (direct exchange): publishes to the current exchange with a routing key.
     *
     * @param string $routingKey
     * @param string $data message body; 'Hello World!' is used when blank
     */
    public function sendDirect($routingKey, $data = '')
    {
        $data = self::payloadOrDefault($data, "Hello World!");
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
        echo "[x] Sent $routingKey:$data \n";
    }

    /**
     * Receive (direct exchange): binds a server-named queue once per binding key and consumes it.
     *
     * @param \Closure $callback invoked with each delivered message
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        $queueName = $this->declareBoundQueue($bindingKeys);
        echo "[x] Waiting for logs. To exit press CTRL+C \n";
        $this->consume($queueName, true, $callback);
    }

    /**
     * Send (topic exchange): publishes to the current exchange with a routing key.
     *
     * @param string $routingKey
     * @param string $data message body; 'Hello World!' is used when blank
     */
    public function sendTopic($routingKey, $data = '')
    {
        $data = self::payloadOrDefault($data, "Hello World!");
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
        echo " [x] Sent ", $routingKey, ':', $data, " \n";
    }

    /**
     * Receive (topic exchange): binds a server-named queue once per binding key and consumes it.
     *
     * @param \Closure $callback invoked with each delivered message
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        $queueName = $this->declareBoundQueue($bindingKeys);
        echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
        $this->consume($queueName, true, $callback);
    }

    /**
     * Closes the channel and the connection.
     */
    public function __destruct()
    {
        // Either handle may still be null if the constructor threw before assigning it.
        if (self::$channel !== null) {
            self::$channel->close();
            self::$channel = null;
        }
        if (self::$connection !== null) {
            self::$connection->close();
            self::$connection = null;
        }
    }

    /**
     * Maps an exchange type to the exchange name used on the broker.
     *
     * @param string $exchangeType
     * @return string
     * @throws \InvalidArgumentException when the type is not supported
     */
    private static function exchangeNameFor($exchangeType)
    {
        if (!is_string($exchangeType) || !isset(self::$exchangeNames[$exchangeType])) {
            throw new \InvalidArgumentException('Unsupported exchange type: ' . var_export($exchangeType, true));
        }
        return self::$exchangeNames[$exchangeType];
    }

    /**
     * Declares the exchange for the given type and makes it the current one.
     *
     * @param string $exchangeType
     * @throws \InvalidArgumentException when the type is not supported
     */
    private function useExchange($exchangeType)
    {
        $name = self::exchangeNameFor($exchangeType);
        if ($name === self::$exchangeName) {
            return; // already declared and selected
        }
        self::$channel->exchange_declare(
            $name,         // exchange name
            $exchangeType, // exchange type
            false,         // passive
            true,          // durable
            false          // auto_delete
        );
        self::$exchangeName = $name;
    }

    /**
     * Declares a server-named queue and binds it to the current exchange for every key.
     *
     * @param array $bindingKeys
     * @return string the generated queue name
     */
    private function declareBoundQueue(array $bindingKeys)
    {
        // Arguments after the name: passive, durable, exclusive, auto_delete.
        list($queueName, ,) = self::$channel->queue_declare('', false, true, true, false);
        foreach ($bindingKeys as $bindingKey) {
            self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
        }
        return $queueName;
    }

    /**
     * Registers a consumer and blocks while the channel still has consumer callbacks.
     *
     * @param string $queueName
     * @param bool $noAck passed as basic_consume's no_ack argument
     * @param callable $callback
     */
    private function consume($queueName, $noAck, $callback)
    {
        self::$channel->basic_consume($queueName, '', false, $noAck, false, false, $callback);
        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Returns $data, or $default when $data is blank.
     *
     * Numeric values are never blank: empty() alone would treat "0" as
     * missing and silently replace a legitimate payload.
     *
     * @param mixed $data
     * @param string $default
     * @return mixed
     */
    private static function payloadOrDefault($data, $default)
    {
        if (is_numeric($data)) {
            return $data;
        }
        return empty($data) ? $default : $data;
    }
}
application/common/lib/classes/RabbitMqWork.php
<?php
// RabbitMQ facade intended to be called by application code.
namespace app\common\lib\classes;

use app\common\lib\classes\rabbitmq\RabbitMq;

/**
 * Thin wrapper that forwards every call to the shared RabbitMq instance.
 */
class RabbitMqWork
{
    /** @var RabbitMq shared instance obtained from RabbitMq::instance() */
    private $broker;

    /**
     * @param string $exchageType exchange type forwarded to RabbitMq::instance()
     */
    public function __construct($exchageType = '')
    {
        $this->broker = RabbitMq::instance($exchageType);
    }

    /**
     * Send (simple).
     */
    public function send()
    {
        $this->broker->send();
    }

    /**
     * Receive (simple).
     *
     * @param $callback
     */
    public function receive($callback)
    {
        $this->broker->receive($callback);
    }

    /**
     * Send (work queue).
     *
     * @param $data
     */
    public function addTask($data)
    {
        $this->broker->addTask($data);
    }

    /**
     * Receive (work queue).
     *
     * @param $callback
     */
    public function workTask($callback)
    {
        $this->broker->workTask($callback);
    }

    /**
     * Publish (fanout exchange).
     *
     * @param $data
     */
    public function sendQueue($data)
    {
        $this->broker->sendQueue($data);
    }

    /**
     * Subscribe (fanout exchange).
     *
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        $this->broker->subscribeQueue($callback);
    }

    /**
     * Send (direct exchange).
     *
     * @param $routingKey
     * @param $data
     */
    public function sendDirect($routingKey, $data)
    {
        $this->broker->sendDirect($routingKey, $data);
    }

    /**
     * Receive (direct exchange).
     *
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        $this->broker->receiveDirect($callback, $bindingKeys);
    }

    /**
     * Send (topic exchange).
     *
     * @param $routingKey
     * @param $data
     */
    public function sendTopic($routingKey, $data)
    {
        $this->broker->sendTopic($routingKey, $data);
    }

    /**
     * Receive (topic exchange).
     *
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        $this->broker->receiveTopic($callback, $bindingKeys);
    }
}
application/index/controller/Index.php
<?php
namespace app\index\controller;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use app\polymerize\tool\module\es\SearchBlog;
use app\polymerize\tool\module\es\SyncBlog;
use think\Collection;

/**
 * Demo controller that publishes test messages to RabbitMQ.
 *
 * NOTE(review): this class extends think\Collection rather than a controller
 * base class — confirm that is intentional.
 */
class Index extends Collection
{
    /**
     * Entry point: runs the topic-exchange demo, dumps a marker value and stops.
     *
     * The other demos in this class are send(), addTask(), sendQueue() and sendDirect().
     */
    public function index()
    {
        $this->sendTopic();
        var_dump(11);
        die();
    }

    /**
     * Runs a blog search for the fixed term '11', dumps the result and stops.
     */
    public function searchBlog()
    {
        $search = '11';
        $res = SearchBlog::getInstance()->searchBlog($search, 1, 100);
        var_dump($res);
        die();
    }

    /**
     * Send (simple).
     */
    public function send()
    {
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->send();
    }

    /**
     * Send (work queue). Reads the body from the 'data' input.
     */
    public function addTask()
    {
        $data = input('data', 'This is work task!');
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->addTask($data);
    }

    /**
     * Send (fanout exchange). Reads the body from the 'data' input.
     */
    public function sendQueue()
    {
        $data = input('data', 'This is send queue1');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
        $RabbitMqWork->sendQueue($data);
    }

    /**
     * Send (direct exchange). Reads 'data' and 'routingKey' from the input.
     */
    public function sendDirect()
    {
        $data = input('data', 'Hello World!');
        $routingKey = input('routingKey', 'info');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
        $RabbitMqWork->sendDirect($routingKey, $data);
    }

    /**
     * Send (topic exchange). Reads 'data' and 'routingKey' from the input.
     */
    public function sendTopic()
    {
        $data = input('data', 'Hello World!');
        $routingKey = input('routingKey', 'lazy.boy');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
        $RabbitMqWork->sendTopic($routingKey, $data);
    }
}
application/command.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

// Console command name => fully-qualified command class.
// The command classes declare `namespace app\command;`, so the class names
// must use the `app\` prefix rather than the `application` directory name.
return [
    'simpleMq' => 'app\command\SimpleWork',
    'workQueue' => 'app\command\WorkQueue',
    'sendQueue' => 'app\command\SendQueue',
    'directQueue' => 'app\command\DirectQueue',
    'topicQueue' => 'app\command\TopicQueue',
];
application/command/*.php
application/command/DirectQueue.php
<?php
/**
 * Console command: receive (direct exchange).
 */
namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class DirectQueue extends Command
{
    /**
     * Registers the command under the name 'directQueue'.
     */
    protected function configure()
    {
        parent::configure();
        $this->setName('directQueue');
    }

    /**
     * Consumes the direct exchange for every severity in RabbitMq::SEVERITYS
     * and prints each message as "[x] <routing key>:<body>".
     */
    protected function execute(Input $input, Output $output)
    {
        $printMessage = function ($msg) {
            $routingKey = $msg->delivery_info['routing_key'];
            echo '[x] ', $routingKey, ':', $msg->body, " \n";
        };

        $worker = new RabbitMqWork(RabbitMq::DIRECT);
        $worker->receiveDirect($printMessage, RabbitMq::SEVERITYS);
    }
}
application/command/SendQueue.php
<?php
/**
 * Console command: subscribe (fanout exchange).
 */
namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Log;

class SendQueue extends Command
{
    /**
     * Registers the command under the name 'sendQueue'.
     */
    protected function configure()
    {
        parent::configure();
        $this->setName('sendQueue');
    }

    /**
     * Subscribes to the fanout exchange; each message is printed and also
     * written to the log at error level.
     */
    protected function execute(Input $input, Output $output)
    {
        $onMessage = function ($msg) {
            $line = "Msg:$msg->body";
            echo 'Receive:', $line, " \n";
            // NOTE(review): this calls the global \Log class, not the imported think\Log — confirm which is intended.
            \Log::error($line);
        };

        $worker = new RabbitMqWork(RabbitMq::FANOUT);
        $worker->subscribeQueue($onMessage);
    }
}
application/command/SimpleWork.php
<?php
/**
 * Console command: receive (simple queue).
 */
namespace app\command;

use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Log;

class SimpleWork extends Command
{
    /**
     * Registers the command under the name 'simpleMq'.
     */
    protected function configure()
    {
        parent::configure();
        $this->setName('simpleMq');
    }

    /**
     * Consumes the simple queue and prints each message body together with
     * the routing key it was delivered with.
     */
    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork();
        $callback = function ($msg) {
            echo 'Receive:';
            $queueName = $msg->delivery_info['routing_key'];
            $msgData = $msg->body;
            echo 'Msg:' . $msgData . "\n";
            echo 'QueueName:' . $queueName . "\n";
            // No basic_ack here: RabbitMq::receive() registers the consumer with
            // no_ack = true, so the broker treats the delivery as acknowledged
            // already. Acking it again refers to an unknown delivery tag, which
            // the broker answers with a channel-level PRECONDITION_FAILED error.
        };
        $RabbitMqWork->receive($callback);
    }
}
application/command/TopicQueue.php
<?php
/**
 * Console command: receive (topic exchange).
 */
namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class TopicQueue extends Command
{
    /**
     * Registers the command under the name 'topicQueue'.
     */
    protected function configure()
    {
        parent::configure();
        $this->setName('topicQueue');
    }

    /**
     * Consumes the topic exchange for a fixed set of binding patterns and
     * prints each message as " [x] <routing key>:<body>".
     */
    protected function execute(Input $input, Output $output)
    {
        // Binding patterns subscribed to on the topic exchange.
        $patterns = ['*.orange.*', '*.*.rabbit', 'lazy.#'];

        $printMessage = function ($msg) {
            $routingKey = $msg->delivery_info['routing_key'];
            echo ' [x] ' . $routingKey . ':' . $msg->body . "\n";
        };

        $worker = new RabbitMqWork(RabbitMq::TOPIC);
        $worker->receiveTopic($printMessage, $patterns);
    }
}
application/command/WorkQueue.php
<?php
/**
 * Console command: receive (work queue).
 */
namespace app\command;

use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class WorkQueue extends Command
{
    /**
     * Registers the command under the name 'workQueue'.
     */
    protected function configure()
    {
        parent::configure();
        $this->setName('workQueue');
    }

    /**
     * Runs a work-queue consumer. Each message is printed, the handler sleeps
     * one second per '.' in the body, and then acknowledges the delivery.
     */
    protected function execute(Input $input, Output $output)
    {
        $handleTask = function ($msg) {
            echo ' [x] Received ' . $msg->body . "\n";

            // The number of dots in the body is the number of seconds to sleep.
            $seconds = substr_count($msg->body, '.');
            sleep($seconds);
            echo ' [x] Done' . "\n";

            $info = $msg->delivery_info;
            $info['channel']->basic_ack($info['delivery_tag']);
        };

        $worker = new RabbitMqWork();
        $worker->workTask($handleTask);
    }
}
以上是“PHP如何實現php-amqplib/php-amqplib實例RabbitMq”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。