您好,登錄後才能下訂單哦!
今天小編就為大家帶來一篇有關PHP實現延時隊列和處理超時訂單的文章。小編覺得挺實用的,為此分享給大家做個參考。一起跟隨小編過來看看吧。
延時隊列
DelayProducer.php
AmqpBuilder.php
AmqpBuilder.php
<?php

declare(strict_types=1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Builder\Builder;
use Hyperf\Amqp\Builder\QueueBuilder;

/**
 * Queue builder with a helper for declaring a delay queue: messages expire
 * after a TTL and are then dead-lettered to another exchange / routing key.
 */
class AmqpBuilder extends QueueBuilder
{
    /**
     * Add queue arguments on top of the ones already configured.
     *
     * When both the current and the new arguments are plain arrays they are
     * merged, with the new values winning on duplicate keys. array_merge()
     * only accepts arrays, so if either side is not an array (e.g. an
     * AMQPTable) the new arguments replace the current ones instead.
     *
     * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
     *
     * @return \Hyperf\Amqp\Builder\Builder
     */
    public function setArguments($arguments) : Builder
    {
        if (is_array($arguments) && is_array($this->arguments)) {
            $this->arguments = array_merge($this->arguments, $arguments);
        } else {
            // Cannot merge a non-array; fall back to plain replacement.
            $this->arguments = $arguments;
        }

        return $this;
    }

    /**
     * Configure this builder as a delay queue.
     *
     * @param string $queueName             Name of the queue to declare.
     * @param int    $xMessageTtl           Message time-to-live in seconds (converted to milliseconds below).
     * @param string $xDeadLetterExchange   Value for the queue's "x-dead-letter-exchange" argument.
     * @param string $xDeadLetterRoutingKey Value for the queue's "x-dead-letter-routing-key" argument.
     *
     * @return $this
     */
    public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    {
        // Each argument is a [type tag, value] pair.
        $this->setArguments([
            'x-message-ttl'             => ['I', $xMessageTtl * 1000], // seconds -> milliseconds
            'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
            'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
        ]);
        $this->setQueue($queueName);

        return $this;
    }
}
DelayProducer.php
<?php

declare(strict_types=1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;

/**
 * Producer that declares a (delay) queue, declares the message's exchange,
 * binds the two together and then publishes the message.
 */
class DelayProducer extends Builder
{
    /**
     * Publish a message through the queue described by $queueBuilder.
     *
     * The work is delegated to produceMessage() through the retry() helper,
     * which is defined outside this file (presumably it re-invokes the
     * callback when it throws -- confirm against the helper).
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
     * @param bool                     $confirm         Use a confirm channel and report whether the broker acked.
     * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
     *
     * @return bool Always true when $confirm is false; otherwise whether the ack handler fired.
     * @throws \Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }

    /**
     * Declare queue and exchange, bind them, publish the message and wait for pending acks.
     *
     * @param ProducerMessageInterface $producerMessage
     * @param AmqpBuilder              $queueBuilder
     * @param bool                     $confirm
     * @param int                      $timeout
     *
     * @return bool
     * @throws \Throwable Any failure is rethrown after the connection has been reconnected and released.
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        $result = false;

        $this->injectMessageProperty($producerMessage);

        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();

        try {
            // Acquire the channel inside the try block so that a failure here
            // still reaches the finally clause and releases the connection.
            $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();

            $channel->set_ack_handler(function () use (&$result) {
                $result = true;
            });

            $exchangeBuilder = $producerMessage->getExchangeBuilder();

            // Declare the queue.
            $channel->queue_declare(
                $queueBuilder->getQueue(),
                $queueBuilder->isPassive(),
                $queueBuilder->isDurable(),
                $queueBuilder->isExclusive(),
                $queueBuilder->isAutoDelete(),
                $queueBuilder->isNowait(),
                $queueBuilder->getArguments(),
                $queueBuilder->getTicket()
            );

            // Declare the exchange.
            $channel->exchange_declare(
                $exchangeBuilder->getExchange(),
                $exchangeBuilder->getType(),
                $exchangeBuilder->isPassive(),
                $exchangeBuilder->isDurable(),
                $exchangeBuilder->isAutoDelete(),
                $exchangeBuilder->isInternal(),
                $exchangeBuilder->isNowait(),
                $exchangeBuilder->getArguments(),
                $exchangeBuilder->getTicket()
            );

            // Bind the queue to the message's exchange with the message's routing key.
            $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());

            // Publish the message.
            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        } finally {
            $connection->release();
        }

        return $confirm ? $result : true;
    }

    /**
     * Copy routing key and exchange from the message class's Producer annotation, when present.
     *
     * @param ProducerMessageInterface $producerMessage
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
        if (! class_exists(AnnotationCollector::class)) {
            return;
        }

        /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
        $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
        if (! $annotation) {
            return;
        }

        // Only non-empty annotation values override what the message already has.
        if ($annotation->routingKey) {
            $producerMessage->setRoutingKey($annotation->routingKey);
        }
        if ($annotation->exchange) {
            $producerMessage->setExchange($annotation->exchange);
        }
    }
}
處理超時訂單
OrderQueueConsumer.php
OrderQueueProducer.php
OrderQueueProducer.php
<?php

declare(strict_types=1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder\ExchangeBuilder;
use Hyperf\Amqp\Message\ProducerMessage;

/**
 * Producer message for orders, published to "order_exchange" with routing key "order_exchange".
 *
 * @Producer(exchange="order_exchange", routingKey="order_exchange")
 */
class OrderQueueProducer extends ProducerMessage
{
    /**
     * @param mixed $data Payload to publish; stored as-is on the message.
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }

    /**
     * Exchange definition used when the exchange is declared.
     *
     * Currently returns the parent's builder unchanged; adjust the builder
     * here if the exchange needs non-default settings.
     */
    public function getExchangeBuilder() : ExchangeBuilder
    {
        return parent::getExchangeBuilder();
    }
}
OrderQueueConsumer.php
<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use App\Service\CityTransport\OrderService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;

/**
 * Consumer for "delay_queue", bound to "delay_exchange" with routing key "delay_route".
 *
 * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
 */
class OrderQueueConsumer extends ConsumerMessage
{
    /**
     * Handle one message.
     *
     * @param mixed $data Message payload.
     *
     * @return string One of the Result constants.
     */
    public function consume($data) : string
    {
        // Business logic goes here.

        // The declared string return type requires an explicit result;
        // acknowledge the message once it has been handled.
        return Result::ACK;
    }

    /**
     * This consumer is always enabled.
     */
    public function isEnable() : bool
    {
        return true;
    }
}
Demo
// NOTE(review): this snippet assumes AmqpBuilder, DelayProducer, OrderQueueProducer
// and ApplicationContext are already imported in the surrounding file.

// Queue "order_exchange" holds each message for 1 second, then dead-letters it
// to exchange "delay_exchange" with routing key "delay_route".
$builder = new AmqpBuilder();
$builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');

// Resolve the producer from the container and publish a sample order.
$producer = ApplicationContext::getContainer()->get(DelayProducer::class);
$message = new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]);

var_dump($producer->produce($message, $builder));
以上就是PHP實現延時隊列和處理超時訂單的代碼展示,如果在日常工作遇到這個問題,希望你能通過這篇文章解決問題。如果想了解更多相關內容,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。