267 lines
9.1 KiB
PHP
267 lines
9.1 KiB
PHP
<?php
|
||
|
||
namespace dominion\amqp;
|
||
|
||
use Yii;
|
||
use dominion\amqp\AmqpLoger;
|
||
|
||
/**
 * Helper around the php-amqp classes (AMQPConnection, AMQPChannel,
 * AMQPExchange, AMQPQueue).
 *
 * Connection settings and project definitions are read from
 * Yii::$app->params['amqp']. Every connection, channel, exchange and queue
 * created here is cached in a static property, so the caches are shared by
 * all instances of the helper within one PHP process.
 */
class AmqpHelper
{
    /** @var \AMQPConnection[] Connections keyed by the JSON-encoded queue config. */
    protected static $connection = [];

    /** @var \AMQPChannel[] Channels keyed by the JSON-encoded queue config. */
    protected static $channel = [];

    /** @var \AMQPExchange[] Exchanges keyed by "<name>_<type>_<flags>" of the project. */
    protected static $exchange = [];

    /** @var \AMQPQueue[] Declared queues keyed by queue name. */
    protected static $arQueue = [];

    /** @var int Prefetch count applied to channels created after it is set. */
    protected static $prefetchCount = 1;

    /** @var array|null Project definition selected with setProject(). */
    protected $project;

    /**
     * Sets the prefetch count used for channels created from now on.
     *
     * The value is static, so it affects every helper instance; channels that
     * are already cached keep the prefetch count they were created with.
     *
     * @param int $count
     */
    public function setPrefetchCount(int $count)
    {
        self::$prefetchCount = $count;
    }

    /**
     * Returns a connected AMQPConnection for the given queue config, creating
     * and caching it on first use.
     *
     * @param array $queue Queue config; its 'heartbeat' entry is passed to the connection.
     * @return \AMQPConnection
     * @throws \AMQPConnectionException When the broker cannot be reached.
     */
    public function getController($queue)
    {
        $key = json_encode($queue);
        if (!isset(self::$connection[$key]))
        {
            $settings = Yii::$app->params['amqp'];

            $connection = new \AMQPConnection(['heartbeat' => $queue['heartbeat']]);
            $connection->setLogin($settings['login']);
            $connection->setPassword($settings['pass']);
            $connection->setHost($settings['serverName']);
            $connection->setPort($settings['port']);
            $connection->setVhost($settings['vhostName']);

            try
            {
                $connection->connect();
            }
            catch (\AMQPConnectionException $exception)
            {
                self::logException($exception);
                throw $exception;
            }

            // Store under the key only after a successful connect, so the
            // static stays an array and a failed attempt is never cached.
            self::$connection[$key] = $connection;
        }

        return self::$connection[$key];
    }

    /**
     * Returns the channel for the given queue config, creating and caching it
     * on first use.
     *
     * @param array $queue Queue config (see getQueueConfig()).
     * @return \AMQPChannel
     * @throws \AMQPConnectionException
     */
    public function getChannel($queue)
    {
        $key = json_encode($queue);
        if (!isset(self::$channel[$key]))
        {
            try
            {
                $channel = new \AMQPChannel($this->getController($queue));
                $channel->setPrefetchCount(self::$prefetchCount);
            }
            catch (\AMQPConnectionException $exception)
            {
                self::logException($exception);
                throw $exception;
            }

            // Cached only once fully configured.
            self::$channel[$key] = $channel;
        }

        return self::$channel[$key];
    }

    /**
     * Returns the declared exchange of the current project, creating and
     * caching it on first use.
     *
     * @param array $queue Queue config whose channel is used to create the exchange.
     * @return \AMQPExchange
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPExchangeException
     * @throws \Exception When the current project is missing or malformed.
     */
    public function getExchange($queue)
    {
        $project = $this->getProject();
        $key = $project['name'].'_'.$project['type'].'_'.$project['flags'];
        if (!isset(self::$exchange[$key]))
        {
            try
            {
                $exchange = new \AMQPExchange($this->getChannel($queue));
                $exchange->setName($project['name']);
                $exchange->setType($project['type']);
                $exchange->setFlags($project['flags']);
                $exchange->declareExchange();
            }
            catch (\AMQPConnectionException | \AMQPChannelException | \AMQPExchangeException $exception)
            {
                self::logException($exception);
                throw $exception;
            }

            // Cached only after declareExchange() succeeded.
            self::$exchange[$key] = $exchange;
        }

        return self::$exchange[$key];
    }

    /**
     * Returns the declared queue configured under $routeKey, bound to the
     * project's exchange with every routing key from its config. The queue is
     * created and cached on first use.
     *
     * @param string $routeKey Key of the queue inside the project's 'queues' list.
     * @return \AMQPQueue
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     * @throws \Exception When the project or the queue config is missing or malformed.
     */
    public function getQueue($routeKey)
    {
        $queue = $this->getQueueConfig($routeKey);
        $queueName = $queue['name'];
        if (!isset(self::$arQueue[$queueName]))
        {
            try
            {
                $amqpQueue = new \AMQPQueue($this->getChannel($queue));
                $amqpQueue->setName($queueName);
                $amqpQueue->setFlags($queue['flags']);
                $amqpQueue->declareQueue();

                foreach ($queue['routing'] as $routing)
                {
                    $amqpQueue->bind($this->getExchange($queue)->getName(), $routing);
                }
            }
            catch (\AMQPChannelException | \AMQPConnectionException | \AMQPQueueException $exception)
            {
                self::logException($exception);
                throw $exception;
            }

            // Cached only after it has been declared and bound, so a failure
            // above is retried on the next call instead of being reused.
            self::$arQueue[$queueName] = $amqpQueue;
        }

        return self::$arQueue[$queueName];
    }

    /**
     * Returns the config of the queue registered under $routeKey in the
     * current project.
     *
     * @param string $routeKey Key of the queue inside the project's 'queues' list.
     * @return array Config containing at least 'name', 'routing', 'flags' and 'heartbeat'.
     * @throws \Exception When the key is unknown/empty or a required entry is missing.
     */
    public function getQueueConfig($routeKey)
    {
        $project = $this->getProject();

        // empty() already covers the "not set" case.
        if (empty($project['queues'][$routeKey]))
        {
            throw self::failure('Ключ маршрутизации '.$routeKey.' не найден');
        }

        $config = $project['queues'][$routeKey];
        if (!isset($config["name"], $config["routing"], $config["flags"], $config["heartbeat"]))
        {
            throw self::failure('Не верный формат '.$routeKey);
        }

        return $config;
    }

    /**
     * Selects the project definition stored under
     * Yii::$app->params['amqp']['projects'][$projectName].
     *
     * @param string $projectName
     * @throws \Exception When no non-empty definition exists for that name.
     */
    public function setProject($projectName)
    {
        $projects = Yii::$app->params['amqp']['projects'] ?? [];
        $this->project = $projects[$projectName] ?? [];
        if (empty($this->project))
        {
            throw self::failure('проект '.$projectName.' не найден');
        }
    }

    /**
     * Returns the current project definition after checking that it has the
     * 'name', 'type', 'flags' and 'queues' entries.
     *
     * @return array
     * @throws \Exception When no project is set or a required entry is missing.
     */
    public function getProject()
    {
        if (!isset($this->project["name"], $this->project["type"], $this->project["flags"], $this->project["queues"]))
        {
            throw self::failure('не верный формат проекта очереди');
        }

        return $this->project;
    }

    /**
     * Returns the queue object for the given key; same lookup as getQueue(),
     * with an explicit error if the result is falsy.
     *
     * @param string $queueName Key of the queue inside the project's 'queues' list.
     * @return \AMQPQueue
     * @throws \Exception
     */
    public function getQueueObject($queueName)
    {
        // Resolve once instead of calling getQueue() for the check and again for the return.
        $queue = $this->getQueue($queueName);
        if (!$queue)
        {
            throw self::failure('Очередь ' . $queueName . ' не найдена');
        }

        return $queue;
    }

    /**
     * Acknowledges the message carried by $envelope on $queue.
     *
     * @param \AMQPQueue    $queue
     * @param \AMQPEnvelope $envelope
     * @throws \Exception Wraps any failure; the original exception is available via getPrevious().
     */
    public function ack($queue, $envelope)
    {
        try
        {
            $queue->ack($envelope->getDeliveryTag());
        }
        catch (\Exception $exception)
        {
            self::logException($exception);
            // Same message as before; the cause is now chained instead of dropped.
            throw new \Exception($exception->getCode() . ' ' . $exception->getMessage(), 0, $exception);
        }
    }

    /**
     * Publishes an arbitrary payload, JSON-encoded, to the project's exchange.
     *
     * The queue's 'name' is used as the routing key and the message is sent
     * with delivery_mode 2 and content type application/json.
     * NOTE(review): the queue itself is not declared here and is bound using
     * its 'routing' entries, not its name - confirm 'name' is among them.
     *
     * @param string $projectName Project key in Yii::$app->params['amqp']['projects'].
     * @param string $routeKey    Key of the queue inside the project's 'queues' list.
     * @param array  $sendParams  Payload to encode and send.
     * @return array The payload that was sent.
     * @throws \Exception When the config is invalid or the payload cannot be JSON-encoded.
     */
    public static function publishRaw($projectName, $routeKey, $sendParams)
    {
        $amqp = new self();
        $amqp->setProject($projectName);
        $queue = $amqp->getQueueConfig($routeKey);

        $body = json_encode($sendParams);
        if ($body === false)
        {
            // Refuse to publish instead of handing a non-string body to the exchange.
            throw self::failure('json_encode failed for ' . $routeKey . ': ' . json_last_error_msg());
        }

        $amqp->getExchange($queue)->publish(
            $body,
            $queue['name'],
            AMQP_NOPARAM,
            ['delivery_mode' => 2, 'content_type' => 'application/json']
        );
        AmqpLoger::log('producer', $sendParams, $routeKey);

        return $sendParams;
    }

    /**
     * Publishes an object message with generated 'id' and 'date' fields.
     *
     * @param string $projectName Project key in Yii::$app->params['amqp']['projects'].
     * @param string $routeKey    Key of the queue inside the project's 'queues' list.
     * @param array  $params      Fields of the object being sent.
     * @param string $object      Type of the object being sent.
     * @param string $api         API version.
     * @return array The message that was sent.
     * @throws \Exception
     */
    public static function publish($projectName, $routeKey, $params, $object, $api = '1.0')
    {
        $sendParams = self::stamp([
            'object' => $object,
            'api' => $api,
            'params' => $params,
        ]);
        self::publishRaw($projectName, $routeKey, $sendParams);

        return $sendParams;
    }

    /**
     * Queues a mail message ('id', 'date', 'code', 'params').
     *
     * @param string $projectName Project key in Yii::$app->params['amqp']['projects'].
     * @param string $routeKey    Key of the queue inside the project's 'queues' list.
     * @param array  $params
     * @param string $code
     * @return array The message that was sent.
     * @throws \Exception
     */
    public static function sendMail($projectName, $routeKey, $params, $code)
    {
        return self::publishCoded($projectName, $routeKey, $params, $code);
    }

    /**
     * Queues an event message ('id', 'date', 'code', 'params').
     *
     * @param string $projectName Project key in Yii::$app->params['amqp']['projects'].
     * @param string $routeKey    Key of the queue inside the project's 'queues' list.
     * @param array  $params
     * @param string $code
     * @return array The message that was sent.
     * @throws \Exception
     */
    public static function sendEvent($projectName, $routeKey, $params, $code)
    {
        return self::publishCoded($projectName, $routeKey, $params, $code);
    }

    /**
     * Builds and publishes the 'code' + 'params' message shared by sendMail() and sendEvent().
     */
    private static function publishCoded($projectName, $routeKey, $params, $code)
    {
        $sendParams = self::stamp([
            'code' => $code,
            'params' => $params,
        ]);
        self::publishRaw($projectName, $routeKey, $sendParams);

        return $sendParams;
    }

    /**
     * Prepends the generated 'id' and 'date' fields to a message, keeping the given field order.
     */
    private static function stamp(array $fields)
    {
        return [
            'id' => uniqid() . '-' . uniqid(),
            'date' => date(\DateTime::ISO8601),
        ] + $fields;
    }

    /**
     * Logs an exception as "<code> <message>" to the 'system' error log.
     */
    private static function logException(\Exception $exception)
    {
        AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
    }

    /**
     * Logs $error to the 'system' error log and returns the exception for the caller to throw.
     */
    private static function failure($error)
    {
        AmqpLoger::log('error', $error, 'system');

        return new \Exception($error);
    }
}
|