uawdijnntqw1x1x1
IP : 216.73.216.107
Hostname : toronto-dev2
Kernel : Linux toronto-dev2 4.15.0-213-generic #224-Ubuntu SMP Mon Jun 19 13:30:12 UTC 2023 x86_64
Disable Function : None :)
OS : Linux
PATH:
/
srv
/
users
/
craft4
/
apps
/
craft4-newsite-space
/
vendor
/
yiisoft
/
yii2-queue
/
src
/
drivers
/
redis
/
.
/
Queue.php
/
/
<?php
/**
 * @link https://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license https://www.yiiframework.com/license/
 */

namespace yii\queue\redis;

use yii\base\InvalidArgumentException;
use yii\base\NotSupportedException;
use yii\di\Instance;
use yii\queue\cli\Queue as CliQueue;
use yii\queue\interfaces\StatisticsProviderInterface;
use yii\redis\Connection;

/**
 * Redis Queue.
 *
 * Redis keys used by this class (each one is prefixed with `"$channel."`):
 *
 * - `message_id`  counter incremented to allocate a new job ID;
 * - `messages`    hash mapping job ID to the string `"<ttr>;<message>"`;
 * - `waiting`     list of job IDs ready to run (LPUSH on push, RPOP/BRPOP on reserve);
 * - `delayed`     sorted set of job IDs scored by the timestamp at which they become due;
 * - `reserved`    sorted set of job IDs scored by `time() + ttr` at reservation time;
 * - `attempts`    hash mapping job ID to its attempt counter;
 * - `moving_lock` lock key guarding the "move expired jobs" step, removal and clearing.
 *
 * @property-read StatisticsProvider $statisticsProvider
 *
 * @author Roman Zhuravlev <zhuravljov@gmail.com>
 */
class Queue extends CliQueue implements StatisticsProviderInterface
{
    /**
     * @var Connection|array|string Redis connection, or a reference/configuration
     * that is resolved to a [[Connection]] instance in [[init()]].
     */
    public $redis = 'redis';
    /**
     * @var string prefix of every Redis key used by this queue.
     */
    public $channel = 'queue';
    /**
     * @var string command class name
     */
    public $commandClass = Command::class;


    /**
     * @inheritdoc
     */
    public function init()
    {
        parent::init();
        // Resolve the configured value into an actual Connection instance.
        $this->redis = Instance::ensure($this->redis, Connection::class);
    }

    /**
     * Listens queue and runs each job.
     *
     * @param bool $repeat whether to continue listening when queue is empty.
     * @param int $timeout number of seconds to wait for next message.
     * @return null|int exit code.
     * @internal for worker command only.
     * @since 2.0.2
     */
    public function run($repeat, $timeout = 0)
    {
        return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
            while ($canContinue()) {
                $payload = $this->reserve($timeout);
                if ($payload !== null) {
                    list($id, $message, $ttr, $attempt) = $payload;
                    // The message is only deleted when the handler reports a truthy result.
                    if ($this->handleMessage($id, $message, $ttr, $attempt)) {
                        $this->delete($id);
                    }
                } elseif (!$repeat) {
                    // Nothing was reserved and the caller does not want to keep listening.
                    break;
                }
            }
        });
    }

    /**
     * @inheritdoc
     *
     * A job that has an entry in the attempts hash is reported as reserved; otherwise a job
     * that still has an entry in the messages hash is reported as waiting; anything else is
     * reported as done.
     *
     * @throws InvalidArgumentException when $id is not a positive number.
     */
    public function status($id)
    {
        if (!is_numeric($id) || $id <= 0) {
            throw new InvalidArgumentException("Unknown message ID: $id.");
        }

        if ($this->redis->hexists("$this->channel.attempts", $id)) {
            return self::STATUS_RESERVED;
        }

        if ($this->redis->hexists("$this->channel.messages", $id)) {
            return self::STATUS_WAITING;
        }

        return self::STATUS_DONE;
    }

    /**
     * Clears the queue.
     *
     * Waits (polling every 10 ms) until the moving lock can be taken, then deletes every key
     * matching `"$channel.*"`. The lock key itself matches that pattern, so the DEL also
     * releases the lock.
     *
     * NOTE(review): unlike [[remove()]] and [[reserve()]], the lock is taken here without an
     * expiry. If the process stops between the SET and the DEL the lock key would remain set;
     * confirm whether that is acceptable before relying on it.
     *
     * @since 2.0.1
     */
    public function clear()
    {
        while (!$this->redis->set("$this->channel.moving_lock", true, 'NX')) {
            usleep(10000);
        }
        $this->redis->executeCommand('DEL', $this->redis->keys("$this->channel.*"));
    }

    /**
     * Removes a job by ID.
     *
     * Waits (polling every 10 ms) for the one-second moving lock, then drops the job from the
     * messages hash. Only when that hash actually contained the job are the remaining
     * references (delayed, reserved, waiting, attempts) cleaned up as well.
     *
     * @param int $id of a job
     * @return bool whether the job was found in the messages hash and removed.
     * @since 2.0.1
     */
    public function remove($id)
    {
        while (!$this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
            usleep(10000);
        }

        if (!$this->redis->hdel("$this->channel.messages", $id)) {
            return false;
        }

        $this->redis->zrem("$this->channel.delayed", $id);
        $this->redis->zrem("$this->channel.reserved", $id);
        $this->redis->lrem("$this->channel.waiting", 0, $id);
        $this->redis->hdel("$this->channel.attempts", $id);

        return true;
    }

    /**
     * Reserves the next waiting job, if any.
     *
     * @param int $timeout number of seconds to block waiting for a job; a falsy value
     * performs a non-blocking pop instead.
     * @return array|null `[$id, $message, $ttr, $attempt]`, or null when no job ID was popped
     * or the popped ID has no entry in the messages hash. `$ttr` is the substring stored
     * before the first `;` of the payload and `$attempt` is the value returned by HINCRBY.
     */
    protected function reserve($timeout)
    {
        // Moves delayed and reserved jobs into waiting list with lock for one second
        if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
            $this->moveExpired("$this->channel.delayed");
            $this->moveExpired("$this->channel.reserved");
        }

        // Find a new waiting message
        $id = null;
        if (!$timeout) {
            $id = $this->redis->rpop("$this->channel.waiting");
        } elseif ($result = $this->redis->brpop("$this->channel.waiting", $timeout)) {
            // BRPOP replies with [key, value]; the job ID is the second element.
            $id = $result[1];
        }
        if (!$id) {
            return null;
        }

        $payload = $this->redis->hget("$this->channel.messages", $id);
        if (null === $payload) {
            // The ID was still in the waiting list but its message is gone.
            return null;
        }

        // Payload format is "<ttr>;<message>"; the limit keeps any ';' inside the message intact.
        list($ttr, $message) = explode(';', $payload, 2);
        $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
        $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);

        return [$id, $message, $ttr, $attempt];
    }

    /**
     * Moves every job whose score in the given sorted set is not later than the current time
     * to the tail of the waiting list.
     *
     * @param string $from full key of the sorted set to drain (delayed or reserved).
     */
    protected function moveExpired($from)
    {
        $now = time();
        $expired = $this->redis->zrevrangebyscore($from, $now, '-inf');
        if (!$expired) {
            return;
        }

        $this->redis->zremrangebyscore($from, '-inf', $now);

        // Append all IDs with a single variadic RPUSH instead of one round trip per ID.
        // Values are appended left to right, so the resulting list order is the same as
        // pushing them one by one, and the IDs can no longer be lost half-way through a loop.
        $this->redis->executeCommand(
            'RPUSH',
            array_merge(["$this->channel.waiting"], $expired)
        );
    }

    /**
     * Deletes message by ID.
     *
     * Removes the job from the reserved set, the attempts hash and the messages hash.
     *
     * @param int $id of a message
     */
    protected function delete($id)
    {
        $this->redis->zrem("$this->channel.reserved", $id);
        $this->redis->hdel("$this->channel.attempts", $id);
        $this->redis->hdel("$this->channel.messages", $id);
    }

    /**
     * @inheritdoc
     *
     * Allocates a new ID from the `message_id` counter, stores `"$ttr;$message"` in the
     * messages hash and then either queues the ID immediately (no delay) or schedules it in
     * the delayed set with score `time() + $delay`.
     *
     * @throws NotSupportedException when a priority is given.
     */
    protected function pushMessage($message, $ttr, $delay, $priority)
    {
        if ($priority !== null) {
            throw new NotSupportedException('Job priority is not supported in the driver.');
        }

        $id = $this->redis->incr("$this->channel.message_id");
        $this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
        if (!$delay) {
            $this->redis->lpush("$this->channel.waiting", $id);
        } else {
            $this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
        }

        return $id;
    }

    /**
     * @var StatisticsProvider|null lazily created provider, see [[getStatisticsProvider()]].
     */
    private $_statisticsProvider;

    /**
     * Returns the statistics provider for this queue, creating it on first access.
     *
     * @return StatisticsProvider
     */
    public function getStatisticsProvider()
    {
        if (!$this->_statisticsProvider) {
            $this->_statisticsProvider = new StatisticsProvider($this);
        }
        return $this->_statisticsProvider;
    }
}
/srv/users/craft4/apps/craft4-newsite-space/vendor/yiisoft/yii2-queue/src/drivers/redis/./Queue.php