kuvalda-amqp/AmqpHelper.php

267 lines
9.1 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace dominion\amqp;
use Yii;
use dominion\amqp\AmqpLoger;
class AmqpHelper
{
protected static $connection;
protected static $channel;
protected static $exchange;
protected static $arQueue;
protected static $prefetchCount = 1;
protected $project;
public function setPrefetchCount(int $count)
{
self::$prefetchCount = $count;
}
public function getController($queue)
{
$key = json_encode($queue);
if (!isset(self::$connection[$key]))
{
$credentials = ['heartbeat' => $queue['heartbeat']] ;
self::$connection = new \AMQPConnection($credentials);
self::$connection->setLogin(Yii::$app->params['amqp']['login']);
self::$connection->setPassword(Yii::$app->params['amqp']["pass"]);
self::$connection->setHost(Yii::$app->params['amqp']["serverName"]);
self::$connection->setPort(Yii::$app->params['amqp']["port"]);
self::$connection->setVhost(Yii::$app->params['amqp']["vhostName"]);
try
{
self::$connection->connect();
} catch (\AMQPConnectionException $exception)
{
AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
throw $exception;
}
}
return self::$connection;
}
public function getChannel($queue)
{
$key = json_encode($queue);
if (!isset(self::$channel[$key]))
{
try
{
self::$channel[$key] = new \AMQPChannel($this->getController($queue));
self::$channel[$key]->setPrefetchCount(self::$prefetchCount);
} catch (\AMQPConnectionException $exception)
{
AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
throw $exception;
}
}
return self::$channel[$key];
}
public function getExchange($queue)
{
$project = $this->getProject();
$key = $project['name'].'_'.$project['type'].'_'.$project['flags'];
if (!isset(self::$exchange[$key]))
{
try
{
self::$exchange[$key] = new \AMQPExchange($this->getChannel($queue));
self::$exchange[$key]->setName($project['name']);
self::$exchange[$key]->setType($project['type']);
self::$exchange[$key]->setFlags($project['flags']);
self::$exchange[$key]->declareExchange();
} catch (\AMQPConnectionException | \AMQPChannelException | \AMQPExchangeException $exception)
{
AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
throw $exception;
}
}
return self::$exchange[$key];
}
public function getQueue($routeKey)
{
$queue = $this->getQueueConfig($routeKey);
$queueName = $queue['name'];
if(!isset(self::$arQueue[$queueName]))
{
try
{
self::$arQueue[$queueName] = new \AMQPQueue($this->getChannel($queue));
self::$arQueue[$queueName]->setName($queueName);
self::$arQueue[$queueName]->setFlags($queue['flags']);
self::$arQueue[$queueName]->declareQueue();
foreach ($queue['routing'] as $routing)
{
self::$arQueue[$queueName]->bind($this->getExchange($queue)->getName(), $routing);
}
} catch (\AMQPChannelException | \AMQPConnectionException | \AMQPQueueException $exception)
{
AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
throw $exception;
}
}
return self::$arQueue[$queueName];
}
/**
*
* @param type $routeKey
* @return type
* @throws \Exception
*/
public function getQueueConfig($routeKey)
{
$project = $this->getProject();
if (!isset($project['queues'][$routeKey]) || empty($project['queues'][$routeKey]))
{
$error = 'Ключ маршрутизации '.$routeKey.' не найден';
AmqpLoger::log('error', $error, 'system');
throw new \Exception($error);
}
elseif(!isset ($project['queues'][$routeKey]["name"], $project['queues'][$routeKey]["routing"], $project['queues'][$routeKey]["flags"], $project['queues'][$routeKey]["heartbeat"]))
{
$error = 'Не верный формат '.$routeKey;
AmqpLoger::log('error', $error, 'system');
throw new \Exception($error);
}
else
{
return $project['queues'][$routeKey];
}
}
public function setProject($projectName)
{
$projects = isset(Yii::$app->params['amqp'], Yii::$app->params['amqp']['projects']) ? Yii::$app->params['amqp']['projects'] : [];
$this->project = isset($projects[$projectName]) ? $projects[$projectName] : [];
if(empty($this->project))
{
$error = 'проект '.$projectName.' не найден';
AmqpLoger::log('error', $error, 'system');
throw new \Exception($error);
}
}
public function getProject()
{
if(!isset($this->project["name"], $this->project["type"], $this->project["flags"], $this->project["queues"]))
{
$error = 'не верный формат проекта очереди';
AmqpLoger::log('error', $error, 'system');
throw new \Exception($error);
}
return $this->project;
}
public function getQueueObject($queueName)
{
if(!$this->getQueue($queueName))
{
$error = 'Очередь ' . $queueName . ' не найдена';
AmqpLoger::log('error', $error, 'system');
throw new \Exception($error);
}
else
{
return $this->getQueue($queueName);
}
}
public function ack($queue, $envelope)
{
try
{
$queue->ack($envelope->getDeliveryTag());
}
catch (\Exception $exception)
{
AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
throw new \Exception($exception->getCode() . ' ' . $exception->getMessage());
}
}
/**
* Добавляем произвольную запись в очередь
* @param string $projectName синоним обменника (используется в коде)
* @param string $routeKey псевдоним очереди
* @param array $params Поля передаваемого объекта
*/
public static function publishRaw($projectName, $routeKey, $sendParams)
{
$amqp = new AmqpHelper;
$amqp->setProject($projectName);
$queue = $amqp->getQueueConfig($routeKey);
$amqp->getExchange($queue)->publish(json_encode($sendParams), $queue['name'], AMQP_NOPARAM, ['delivery_mode' => 2, 'content_type' => 'application/json']);
AmqpLoger::log('producer', $sendParams, $routeKey);
return $sendParams;
}
/**
* Добавляем запись в очередь
* @param string $routeKey псевдоним очереди
* @param array $params Поля передаваемого объекта
* @param string $object Тип передаваемого объекта
* @param string $api Версия API
*/
public static function publish($projectName, $routeKey, $params, $object, $api = '1.0')
{
$sendParams = [
'id' => uniqid() . '-' . uniqid(),
'date' => date(\DateTime::ISO8601),
'object' => $object,
'api' => $api,
'params' => $params
];
AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);
return $sendParams;
}
/**
* Ставит в очередь для отправки письма
* @param type $routeKey
* @param type $params
* @param type $code
* @return string
*/
public static function sendMail($projectName, $routeKey, $params, $code)
{
$sendParams = [
'id' => uniqid() . '-' . uniqid(),
'date' => date(\DateTime::ISO8601),
'code' => $code,
'params' => $params
];
AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);
return $sendParams;
}
/**
* Ставит в очередь для обработки событий
* @param type $routeKey
* @param type $params
* @param type $code
* @return string
*/
public static function sendEvent($projectName, $routeKey, $params, $code)
{
$sendParams = [
'id' => uniqid() . '-' . uniqid(),
'date' => date(\DateTime::ISO8601),
'code' => $code,
'params' => $params
];
AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);
return $sendParams;
}
}