$queue['heartbeat']] ; self::$connection = new \AMQPConnection($credentials); self::$connection->setLogin(Yii::$app->params['amqp']['login']); self::$connection->setPassword(Yii::$app->params['amqp']["pass"]); self::$connection->setHost(Yii::$app->params['amqp']["serverName"]); self::$connection->setPort(Yii::$app->params['amqp']["port"]); self::$connection->setVhost(Yii::$app->params['amqp']["vhostName"]); try { self::$connection->connect(); } catch (\AMQPConnectionException $exception) { AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system'); throw $exception; } } return self::$connection; }

    /**
     * Returns the AMQP channel for the given queue configuration, creating it on first use.
     *
     * Channels are cached in self::$channel under the JSON encoding of $queue.
     * The channel is written to the cache only after it has been configured,
     * so a failed setup never leaves a half-initialised channel behind for
     * later calls to pick up.
     *
     * @param array $queue queue configuration; forwarded to getController()
     * @return \AMQPChannel
     * @throws \AMQPConnectionException when the channel cannot be created or configured
     */
    public function getChannel($queue)
    {
        $key = json_encode($queue);

        if (!isset(self::$channel[$key])) {
            try {
                $channel = new \AMQPChannel($this->getController($queue));
                $channel->setPrefetchCount(self::$prefetchCount);
                // Publish to the cache only once the channel is fully set up.
                self::$channel[$key] = $channel;
            } catch (\AMQPConnectionException $exception) {
                AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
                throw $exception;
            }
        }

        return self::$channel[$key];
    }

    /**
     * Returns the exchange of the current project, declaring it on first use.
     *
     * Exchanges are cached in self::$exchange under a key built from the
     * project's name, type and flags. The exchange is cached only after
     * declareExchange() succeeded, so a failed declaration is retried on the
     * next call instead of returning an undeclared exchange.
     *
     * @param array $queue queue configuration; used to obtain the channel
     * @return \AMQPExchange
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPExchangeException
     * @throws \Exception when the current project is missing required keys
     */
    public function getExchange($queue)
    {
        $project = $this->getProject();
        $key = $project['name'].'_'.$project['type'].'_'.$project['flags'];

        if (!isset(self::$exchange[$key])) {
            try {
                $exchange = new \AMQPExchange($this->getChannel($queue));
                $exchange->setName($project['name']);
                $exchange->setType($project['type']);
                $exchange->setFlags($project['flags']);
                $exchange->declareExchange();
                // Publish to the cache only once the exchange is declared.
                self::$exchange[$key] = $exchange;
            } catch (\AMQPConnectionException | \AMQPChannelException | \AMQPExchangeException $exception) {
                AmqpLoger::log('error', $exception->getCode() . ' ' . 
$exception->getMessage(), 'system');
                throw $exception;
            }
        }

        return self::$exchange[$key];
    }

    /**
     * Returns the declared and bound queue for a route key, creating it on first use.
     *
     * Queues are cached in self::$arQueue under the configured queue name.
     * The queue is cached only after it has been declared and bound to every
     * configured routing key, so a failure part-way through is retried on the
     * next call instead of returning an undeclared or unbound queue.
     *
     * @param string $routeKey key of the queue in the project's 'queues' map
     * @return \AMQPQueue
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     * @throws \Exception when the route key is unknown or its configuration is malformed
     */
    public function getQueue($routeKey)
    {
        $queue = $this->getQueueConfig($routeKey);
        $queueName = $queue['name'];

        if (!isset(self::$arQueue[$queueName])) {
            try {
                $amqpQueue = new \AMQPQueue($this->getChannel($queue));
                $amqpQueue->setName($queueName);
                $amqpQueue->setFlags($queue['flags']);
                $amqpQueue->declareQueue();
                foreach ($queue['routing'] as $routing) {
                    $amqpQueue->bind($this->getExchange($queue)->getName(), $routing);
                }
                // Publish to the cache only once the queue is declared and bound.
                self::$arQueue[$queueName] = $amqpQueue;
            } catch (\AMQPChannelException | \AMQPConnectionException | \AMQPQueueException $exception) {
                AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
                throw $exception;
            }
        }

        return self::$arQueue[$queueName];
    }

    /**
     * Looks up and validates the configuration of a queue in the current project.
     *
     * A valid entry must define the keys "name", "routing", "flags" and
     * "heartbeat". Every failure is logged before the exception is thrown.
     *
     * @param string $routeKey key of the queue in the project's 'queues' map
     * @return array the queue configuration
     * @throws \Exception when the route key is missing/empty or the entry lacks a required key
     */
    public function getQueueConfig($routeKey)
    {
        $project = $this->getProject();

        // empty() also covers the "not set" case without raising a notice.
        if (empty($project['queues'][$routeKey])) {
            $error = 'Ключ маршрутизации '.$routeKey.' не найден';
            AmqpLoger::log('error', $error, 'system');
            throw new \Exception($error);
        }

        $config = $project['queues'][$routeKey];

        if (!isset($config["name"], $config["routing"], $config["flags"], $config["heartbeat"])) {
            $error = 'Не верный формат '.$routeKey;
            AmqpLoger::log('error', $error, 'system');
            throw new \Exception($error);
        }

        return $config;
    }

    /**
     * Selects the project whose exchange and queues this helper works with.
     *
     * The project is read from Yii::$app->params['amqp']['projects'][$projectName].
     *
     * @param string $projectName key of the project in the application parameters
     * @throws \Exception when the project is not configured or is empty
     */
    public function setProject($projectName)
    {
        $projects = isset(Yii::$app->params['amqp'], Yii::$app->params['amqp']['projects']) ? Yii::$app->params['amqp']['projects'] : [];
        $this->project = isset($projects[$projectName]) ? $projects[$projectName] : [];

        if (empty($this->project)) {
            $error = 'проект '.$projectName.' 
не найден';
            AmqpLoger::log('error', $error, 'system');
            throw new \Exception($error);
        }
    }

    /**
     * Returns the currently selected project configuration.
     *
     * @return array project configuration with the keys "name", "type", "flags" and "queues"
     * @throws \Exception when no project is selected or a required key is missing
     */
    public function getProject()
    {
        if (!isset($this->project["name"], $this->project["type"], $this->project["flags"], $this->project["queues"])) {
            $error = 'не верный формат проекта очереди';
            AmqpLoger::log('error', $error, 'system');
            throw new \Exception($error);
        }

        return $this->project;
    }

    /**
     * Returns the queue object for the given route key or fails loudly.
     *
     * @param string $queueName route key passed on to getQueue()
     * @return \AMQPQueue
     * @throws \Exception when getQueue() yields no queue (and whatever getQueue() itself throws)
     */
    public function getQueueObject($queueName)
    {
        // Resolve once; the original looked the queue up twice.
        $queue = $this->getQueue($queueName);

        if (!$queue) {
            $error = 'Очередь ' . $queueName . ' не найдена';
            AmqpLoger::log('error', $error, 'system');
            throw new \Exception($error);
        }

        return $queue;
    }

    /**
     * Acknowledges a received message.
     *
     * @param object $queue    object exposing ack(); presumably an \AMQPQueue - confirm against callers
     * @param object $envelope object exposing getDeliveryTag(); presumably an \AMQPEnvelope - confirm against callers
     * @throws \Exception wrapping any failure; the original exception is kept as "previous"
     */
    public function ack($queue, $envelope)
    {
        try {
            $queue->ack($envelope->getDeliveryTag());
        } catch (\Exception $exception) {
            $message = $exception->getCode() . ' ' . $exception->getMessage();
            AmqpLoger::log('error', $message, 'system');
            // Same message and code (0) as before, but the cause and its stack trace are no longer lost.
            throw new \Exception($message, 0, $exception);
        }
    }

    /**
     * Publishes an arbitrary payload to the project's exchange.
     *
     * The payload is JSON-encoded and published with the configured queue
     * name as routing key, delivery_mode 2 (persistent) and content type
     * application/json. A successful publish is logged as 'producer'.
     *
     * @param string $projectName key of the project in the application parameters
     * @param string $routeKey    key of the queue in the project's 'queues' map
     * @param array  $sendParams  payload to encode and publish
     * @return array the payload, unchanged
     * @throws \Exception when the project/queue configuration is invalid or the payload cannot be JSON-encoded
     */
    public static function publishRaw($projectName, $routeKey, $sendParams)
    {
        $amqp = new AmqpHelper;
        $amqp->setProject($projectName);
        $queue = $amqp->getQueueConfig($routeKey);

        // json_encode() returns false on failure (e.g. malformed UTF-8);
        // refuse to hand that to the broker as a message body.
        $body = json_encode($sendParams);
        if ($body === false) {
            $error = 'Unable to JSON-encode message for ' . $routeKey . ': ' . json_last_error_msg();
            AmqpLoger::log('error', $error, 'system');
            throw new \Exception($error);
        }

        $amqp->getExchange($queue)->publish(
            $body,
            $queue['name'],
            AMQP_NOPARAM,
            ['delivery_mode' => 2, 'content_type' => 'application/json']
        );
        AmqpLoger::log('producer', $sendParams, $routeKey);

        return $sendParams;
    }

    /**
     * Wraps $params into a message envelope and publishes it.
     *
     * The envelope carries a generated id, the current date, the object type,
     * the API version and the given params.
     *
     * @param string $projectName key of the project in the application parameters
     * @param string $routeKey    key of the queue in the project's 'queues' map
     * @param array  $params      fields of the transferred object
     * @param string $object      type of the transferred object
     * @param string $api         API version
     * @return array the envelope that was published
     */
    public static function publish($projectName, $routeKey, $params, $object, $api = '1.0')
    {
        $sendParams = [
            'id' => uniqid() . '-' . 
uniqid(),
            'date' => date(\DateTime::ISO8601),
            'object' => $object,
            'api' => $api,
            'params' => $params
        ];
        AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);

        return $sendParams;
    }

    /**
     * Queues a request to send an e-mail.
     *
     * @param string $projectName key of the project in the application parameters
     * @param string $routeKey    key of the queue in the project's 'queues' map
     * @param mixed  $params      payload stored under 'params'
     * @param mixed  $code        value stored under 'code'
     * @return array the envelope that was published
     */
    public static function sendMail($projectName, $routeKey, $params, $code)
    {
        $sendParams = self::buildCodedPayload($params, $code);
        AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);

        return $sendParams;
    }

    /**
     * Queues an event for processing.
     *
     * @param string $projectName key of the project in the application parameters
     * @param string $routeKey    key of the queue in the project's 'queues' map
     * @param mixed  $params      payload stored under 'params'
     * @param mixed  $code        value stored under 'code'
     * @return array the envelope that was published
     */
    public static function sendEvent($projectName, $routeKey, $params, $code)
    {
        $sendParams = self::buildCodedPayload($params, $code);
        AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);

        return $sendParams;
    }

    /**
     * Builds the envelope shared by sendMail() and sendEvent().
     *
     * @param mixed $params payload stored under 'params'
     * @param mixed $code   value stored under 'code'
     * @return array envelope with the keys 'id', 'date', 'code' and 'params'
     */
    private static function buildCodedPayload($params, $code)
    {
        return [
            'id' => uniqid() . '-' . uniqid(),
            'date' => date(\DateTime::ISO8601),
            'code' => $code,
            'params' => $params
        ];
    }
}