uawdijnntqw1x1x1
IP : 216.73.216.107
Hostname : toronto-dev2
Kernel : Linux toronto-dev2 4.15.0-213-generic #224-Ubuntu SMP Mon Jun 19 13:30:12 UTC 2023 x86_64
Disable Function : None :)
OS : Linux
PATH:
/
srv
/
users
/
craft4
/
apps
/
craft4-newsite-space
/
vendor
/
yiisoft
/
yii2-queue
/
src
/
drivers
/
amqp
/
Queue.php
/
/
<?php
/**
 * @link https://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license https://www.yiiframework.com/license/
 */

namespace yii\queue\amqp;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use yii\base\Application as BaseApp;
use yii\base\Event;
use yii\base\NotSupportedException;
use yii\queue\cli\Queue as CliQueue;

/**
 * Amqp Queue.
 *
 * Publishes jobs to an AMQP exchange and consumes them from a queue bound to
 * that exchange. The connection and channel are opened lazily on first use and
 * closed after the application request finishes.
 *
 * @deprecated since 2.0.2 and will be removed in 3.0. Consider using amqp_interop driver instead.
 *
 * @author Roman Zhuravlev <zhuravljov@gmail.com>
 */
class Queue extends CliQueue
{
    /**
     * @var string broker host passed to the stream connection
     */
    public $host = 'localhost';
    /**
     * @var int broker port passed to the stream connection
     */
    public $port = 5672;
    /**
     * @var string user name used to open the connection
     */
    public $user = 'guest';
    /**
     * @var string password used to open the connection
     */
    public $password = 'guest';
    /**
     * @var string name of the queue that is declared, bound and consumed
     */
    public $queueName = 'queue';
    /**
     * @var string name of the exchange that is declared and published to
     */
    public $exchangeName = 'exchange';
    /**
     * @var string virtual host passed to the stream connection
     */
    public $vhost = '/';
    /**
     * @var int The periods of time PHP pings the broker in order to prolong the connection timeout. In seconds.
     * @since 2.3.1
     */
    public $heartbeat = 0;
    /**
     * Send keep-alive packets for a socket connection
     * @var bool
     * @since 2.3.6
     */
    public $keepalive = false;
    /**
     * @var string command class name
     */
    public $commandClass = Command::class;

    /**
     * @var AMQPStreamConnection|null active connection, or null while closed
     */
    protected $connection;
    /**
     * @var AMQPChannel|null active channel, or null while closed
     */
    protected $channel;


    /**
     * @inheritdoc
     */
    public function init()
    {
        parent::init();
        // Release the broker connection once the application has handled its request.
        Event::on(BaseApp::class, BaseApp::EVENT_AFTER_REQUEST, function () {
            $this->close();
        });
    }

    /**
     * Listens amqp-queue and runs new jobs.
     *
     * Blocks while the channel has registered consumer callbacks. A message is
     * acknowledged only when `handleMessage()` returns a truthy value.
     */
    public function listen()
    {
        $this->open();
        $callback = function (AMQPMessage $payload) {
            $id = $payload->get('message_id');
            // The body has the form "<ttr>;<message>" (see pushMessage()).
            list($ttr, $message) = explode(';', $payload->body, 2);
            if ($this->handleMessage($id, $message, $ttr, 1)) {
                $payload->delivery_info['channel']->basic_ack($payload->delivery_info['delivery_tag']);
            }
        };
        // Prefetch count of 1: take one unacknowledged message at a time.
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    /**
     * @inheritdoc
     * @throws NotSupportedException when a delay or a priority is requested
     */
    protected function pushMessage($message, $ttr, $delay, $priority)
    {
        if ($delay) {
            throw new NotSupportedException('Delayed work is not supported in the driver.');
        }
        if ($priority !== null) {
            throw new NotSupportedException('Job priority is not supported in the driver.');
        }

        $this->open();
        $id = uniqid('', true);
        $this->channel->basic_publish(
            // The TTR travels inside the body and is split off again in listen().
            new AMQPMessage("$ttr;$message", [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $id,
            ]),
            $this->exchangeName
        );

        return $id;
    }

    /**
     * @inheritdoc
     * @throws NotSupportedException always, the driver cannot report job status
     */
    public function status($id)
    {
        throw new NotSupportedException('Status is not supported in the driver.');
    }

    /**
     * Opens connection and channel.
     *
     * Does nothing when a channel is already set. Otherwise connects, then
     * declares the queue and the exchange and binds them together.
     */
    protected function open()
    {
        if ($this->channel) {
            return;
        }
        // Positional arguments follow php-amqplib's AMQPStreamConnection constructor.
        $this->connection = new AMQPStreamConnection(
            $this->host,
            $this->port,
            $this->user,
            $this->password,
            $this->vhost,
            false,       // insist
            'AMQPLAIN',  // login method
            null,        // login response
            'en_US',     // locale
            3.0,         // connection timeout
            3.0,         // read/write timeout
            null,        // stream context
            $this->keepalive,
            $this->heartbeat,
            0.0,         // channel RPC timeout
            null         // SSL protocol
        );
        $this->channel = $this->connection->channel();
        // queue_declare(name, passive, durable, exclusive, auto_delete)
        $this->channel->queue_declare($this->queueName, false, true, false, false);
        // exchange_declare(name, type, passive, durable, auto_delete)
        $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
        $this->channel->queue_bind($this->queueName, $this->exchangeName);
    }

    /**
     * Closes connection and channel.
     *
     * Both handles are reset to null afterwards, even when closing throws, so
     * that a later `open()` establishes a fresh connection instead of reusing a
     * closed channel, and so that calling `close()` repeatedly is harmless.
     */
    protected function close()
    {
        if (!$this->channel) {
            return;
        }
        try {
            $this->channel->close();
            $this->connection->close();
        } finally {
            // open() relies on a null channel to know that it must reconnect.
            $this->channel = null;
            $this->connection = null;
        }
    }
}
/srv/users/craft4/apps/craft4-newsite-space/vendor/yiisoft/yii2-queue/src/drivers/amqp/Queue.php