VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从入门到精通视频教程
当前位置:
首页 > PHP >
  • PHP 框架 Hyperf 实现处理超时未支付订单和延时队列(2)

  

DelayProducer.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
<?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
/**
 * AMQP producer that declares a caller-supplied queue and the message's
 * exchange, binds the two together, and then publishes the message.
 *
 * The original inline comment describes this as delay-queue handling; the
 * queue's behaviour is driven entirely by the arguments carried by the
 * AmqpBuilder passed in (presumably TTL / dead-letter settings — confirm
 * against the code that builds $queueBuilder).
 */
class DelayProducer extends Builder
{
    /**
     * Publish a message after declaring and binding the given queue.
     *
     * The work is delegated to produceMessage() through the retry() helper
     * with a count of 1 (NOTE(review): presumably one additional attempt on
     * failure — confirm against the retry() helper's contract).
     *
     * @param ProducerMessageInterface $producerMessage message to publish
     * @param AmqpBuilder              $queueBuilder    queue declaration settings
     * @param bool                     $confirm         use a confirm channel and report the broker ack
     * @param int                      $timeout         value passed to wait_for_pending_acks_returns()
     *
     * @return bool
     * @throws \Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }

    /**
     * Declare the queue and exchange, bind them, and publish the message.
     *
     * @param ProducerMessageInterface $producerMessage message to publish
     * @param AmqpBuilder              $queueBuilder    queue declaration settings
     * @param bool                     $confirm         use a confirm channel and report the broker ack
     * @param int                      $timeout         value passed to wait_for_pending_acks_returns()
     *
     * @return bool when $confirm is true, whether the ack handler fired; otherwise always true
     * @throws \Throwable any error raised while declaring or publishing is rethrown
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        // Flipped to true by the ack handler registered below.
        $result = false;

        $this->injectMessageProperty($producerMessage);

        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();

        if ($confirm) {
            $channel = $connection->getConfirmChannel();
        } else {
            $channel = $connection->getChannel();
        }

        $channel->set_ack_handler(function () use (&$result) {
            $result = true;
        });

        try {
            // Delay-queue handling.
            $exchangeBuilder = $producerMessage->getExchangeBuilder();

            // Declare the queue.
            $channel->queue_declare(
                $queueBuilder->getQueue(),
                $queueBuilder->isPassive(),
                $queueBuilder->isDurable(),
                $queueBuilder->isExclusive(),
                $queueBuilder->isAutoDelete(),
                $queueBuilder->isNowait(),
                $queueBuilder->getArguments(),
                $queueBuilder->getTicket()
            );

            // Declare the exchange.
            $channel->exchange_declare(
                $exchangeBuilder->getExchange(),
                $exchangeBuilder->getType(),
                $exchangeBuilder->isPassive(),
                $exchangeBuilder->isDurable(),
                $exchangeBuilder->isAutoDelete(),
                $exchangeBuilder->isInternal(),
                $exchangeBuilder->isNowait(),
                $exchangeBuilder->getArguments(),
                $exchangeBuilder->getTicket()
            );

            // Bind the queue to the exchange using the message's routing key.
            $channel->queue_bind(
                $queueBuilder->getQueue(),
                $producerMessage->getExchange(),
                $producerMessage->getRoutingKey()
            );

            // Publish the message.
            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        } finally {
            // Runs on both the success and the failure path.
            $connection->release();
        }

        return $confirm ? $result : true;
    }

    /**
     * Copy routing key and exchange from the message class's Producer
     * annotation onto the message, when the annotation exists and the
     * respective value is truthy.
     *
     * @param ProducerMessageInterface $producerMessage message to update in place
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
        if (! class_exists(AnnotationCollector::class)) {
            return;
        }

        /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
        $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
        if (! $annotation) {
            return;
        }

        if ($annotation->routingKey) {
            $producerMessage->setRoutingKey($annotation->routingKey);
        }
        if ($annotation->exchange) {
            $producerMessage->setExchange($annotation->exchange);
        }
    }
}

相关教程