start
This commit is contained in:
parent
a27213b5d7
commit
86c05390a9
|
@ -0,0 +1,266 @@
|
|||
<?php
|
||||
|
||||
namespace dominion\amqp;
|
||||
|
||||
use Yii;
|
||||
use dominion\amqp\AmqpLoger;
|
||||
|
||||
class AmqpHelper
|
||||
{
|
||||
|
||||
protected static $connection;
|
||||
protected static $channel;
|
||||
protected static $exchange;
|
||||
protected static $arQueue;
|
||||
protected static $prefetchCount = 1;
|
||||
|
||||
protected $project;
|
||||
|
||||
public function setPrefetchCount(int $count)
|
||||
{
|
||||
self::$prefetchCount = $count;
|
||||
}
|
||||
|
||||
public function getController($queue)
|
||||
{
|
||||
$key = json_encode($queue);
|
||||
if (!isset(self::$connection[$key]))
|
||||
{
|
||||
$credentials = ['heartbeat' => $queue['heartbeat']] ;
|
||||
self::$connection = new \AMQPConnection($credentials);
|
||||
self::$connection->setLogin(Yii::$app->params['amqp']['login']);
|
||||
self::$connection->setPassword(Yii::$app->params['amqp']["pass"]);
|
||||
self::$connection->setHost(Yii::$app->params['amqp']["serverName"]);
|
||||
self::$connection->setPort(Yii::$app->params['amqp']["port"]);
|
||||
self::$connection->setVhost(Yii::$app->params['amqp']["vhostName"]);
|
||||
try
|
||||
{
|
||||
self::$connection->connect();
|
||||
} catch (\AMQPConnectionException $exception)
|
||||
{
|
||||
AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
|
||||
throw $exception;
|
||||
}
|
||||
}
|
||||
return self::$connection;
|
||||
}
|
||||
|
||||
public function getChannel($queue)
|
||||
{
|
||||
$key = json_encode($queue);
|
||||
if (!isset(self::$channel[$key]))
|
||||
{
|
||||
try
|
||||
{
|
||||
self::$channel[$key] = new \AMQPChannel($this->getController($queue));
|
||||
self::$channel[$key]->setPrefetchCount(self::$prefetchCount);
|
||||
} catch (\AMQPConnectionException $exception)
|
||||
{
|
||||
AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
|
||||
throw $exception;
|
||||
}
|
||||
}
|
||||
return self::$channel[$key];
|
||||
}
|
||||
|
||||
public function getExchange($queue)
|
||||
{
|
||||
$project = $this->getProject();
|
||||
$key = $project['name'].'_'.$project['type'].'_'.$project['flags'];
|
||||
if (!isset(self::$exchange[$key]))
|
||||
{
|
||||
try
|
||||
{
|
||||
self::$exchange[$key] = new \AMQPExchange($this->getChannel($queue));
|
||||
self::$exchange[$key]->setName($project['name']);
|
||||
self::$exchange[$key]->setType($project['type']);
|
||||
self::$exchange[$key]->setFlags($project['flags']);
|
||||
self::$exchange[$key]->declareExchange();
|
||||
} catch (\AMQPConnectionException | \AMQPChannelException | \AMQPExchangeException $exception)
|
||||
{
|
||||
AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
|
||||
throw $exception;
|
||||
}
|
||||
}
|
||||
return self::$exchange[$key];
|
||||
}
|
||||
|
||||
public function getQueue($routeKey)
|
||||
{
|
||||
$queue = $this->getQueueConfig($routeKey);
|
||||
$queueName = $queue['name'];
|
||||
if(!isset(self::$arQueue[$queueName]))
|
||||
{
|
||||
try
|
||||
{
|
||||
self::$arQueue[$queueName] = new \AMQPQueue($this->getChannel($queue));
|
||||
self::$arQueue[$queueName]->setName($queueName);
|
||||
self::$arQueue[$queueName]->setFlags($queue['flags']);
|
||||
self::$arQueue[$queueName]->declareQueue();
|
||||
|
||||
foreach ($queue['routing'] as $routing)
|
||||
{
|
||||
self::$arQueue[$queueName]->bind($this->getExchange($queue)->getName(), $routing);
|
||||
}
|
||||
} catch (\AMQPChannelException | \AMQPConnectionException | \AMQPQueueException $exception)
|
||||
{
|
||||
AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
|
||||
throw $exception;
|
||||
}
|
||||
}
|
||||
return self::$arQueue[$queueName];
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param type $routeKey
|
||||
* @return type
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function getQueueConfig($routeKey)
|
||||
{
|
||||
$project = $this->getProject();
|
||||
if (!isset($project['queues'][$routeKey]) || empty($project['queues'][$routeKey]))
|
||||
{
|
||||
$error = 'Ключ маршрутизации '.$routeKey.' не найден';
|
||||
AmqpLoger::log('error', $error, 'system');
|
||||
throw new \Exception($error);
|
||||
}
|
||||
elseif(!isset ($project['queues'][$routeKey]["name"], $project['queues'][$routeKey]["routing"], $project['queues'][$routeKey]["flags"], $project['queues'][$routeKey]["heartbeat"]))
|
||||
{
|
||||
$error = 'Не верный формат '.$routeKey;
|
||||
AmqpLoger::log('error', $error, 'system');
|
||||
throw new \Exception($error);
|
||||
}
|
||||
else
|
||||
{
|
||||
return $project['queues'][$routeKey];
|
||||
}
|
||||
}
|
||||
|
||||
public function setProject($projectName)
|
||||
{
|
||||
$projects = isset(Yii::$app->params['amqp'], Yii::$app->params['amqp']['projects']) ? Yii::$app->params['amqp']['projects'] : [];
|
||||
$this->project = isset($projects[$projectName]) ? $projects[$projectName] : [];
|
||||
if(empty($this->project))
|
||||
{
|
||||
$error = 'проект '.$projectName.' не найден';
|
||||
AmqpLoger::log('error', $error, 'system');
|
||||
throw new \Exception($error);
|
||||
}
|
||||
}
|
||||
|
||||
public function getProject()
|
||||
{
|
||||
if(!isset($this->project["name"], $this->project["type"], $this->project["flags"], $this->project["queues"]))
|
||||
{
|
||||
$error = 'не верный формат проекта очереди';
|
||||
AmqpLoger::log('error', $error, 'system');
|
||||
throw new \Exception($error);
|
||||
}
|
||||
return $this->project;
|
||||
}
|
||||
|
||||
public function getQueueObject($queueName)
|
||||
{
|
||||
if(!$this->getQueue($queueName))
|
||||
{
|
||||
$error = 'Очередь ' . $queueName . ' не найдена';
|
||||
AmqpLoger::log('error', $error, 'system');
|
||||
throw new \Exception($error);
|
||||
}
|
||||
else
|
||||
{
|
||||
return $this->getQueue($queueName);
|
||||
}
|
||||
}
|
||||
|
||||
public function ack($queue, $envelope)
|
||||
{
|
||||
try
|
||||
{
|
||||
$queue->ack($envelope->getDeliveryTag());
|
||||
}
|
||||
catch (\Exception $exception)
|
||||
{
|
||||
AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
|
||||
throw new \Exception($exception->getCode() . ' ' . $exception->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Добавляем произвольную запись в очередь
|
||||
* @param string $projectName синоним обменника (используется в коде)
|
||||
* @param string $routeKey псевдоним очереди
|
||||
* @param array $params Поля передаваемого объекта
|
||||
*/
|
||||
public static function publishRaw($projectName, $routeKey, $sendParams)
|
||||
{
|
||||
$amqp = new AmqpHelper;
|
||||
$amqp->setProject($projectName);
|
||||
$queue = $amqp->getQueueConfig($routeKey);
|
||||
$amqp->getExchange($queue)->publish(json_encode($sendParams), $queue['name'], AMQP_NOPARAM, ['delivery_mode' => 2, 'content_type' => 'application/json']);
|
||||
AmqpLoger::log('producer', $sendParams, $routeKey);
|
||||
return $sendParams;
|
||||
}
|
||||
|
||||
/**
|
||||
* Добавляем запись в очередь
|
||||
* @param string $routeKey псевдоним очереди
|
||||
* @param array $params Поля передаваемого объекта
|
||||
* @param string $object Тип передаваемого объекта
|
||||
* @param string $api Версия API
|
||||
*/
|
||||
public static function publish($projectName, $routeKey, $params, $object, $api = '1.0')
|
||||
{
|
||||
$sendParams = [
|
||||
'id' => uniqid() . '-' . uniqid(),
|
||||
'date' => date(\DateTime::ISO8601),
|
||||
'object' => $object,
|
||||
'api' => $api,
|
||||
'params' => $params
|
||||
];
|
||||
AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);
|
||||
return $sendParams;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ставит в очередь для отправки письма
|
||||
* @param type $routeKey
|
||||
* @param type $params
|
||||
* @param type $code
|
||||
* @return string
|
||||
*/
|
||||
public static function sendMail($projectName, $routeKey, $params, $code)
|
||||
{
|
||||
$sendParams = [
|
||||
'id' => uniqid() . '-' . uniqid(),
|
||||
'date' => date(\DateTime::ISO8601),
|
||||
'code' => $code,
|
||||
'params' => $params
|
||||
];
|
||||
AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);
|
||||
return $sendParams;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ставит в очередь для обработки событий
|
||||
* @param type $routeKey
|
||||
* @param type $params
|
||||
* @param type $code
|
||||
* @return string
|
||||
*/
|
||||
public static function sendEvent($projectName, $routeKey, $params, $code)
|
||||
{
|
||||
$sendParams = [
|
||||
'id' => uniqid() . '-' . uniqid(),
|
||||
'date' => date(\DateTime::ISO8601),
|
||||
'code' => $code,
|
||||
'params' => $params
|
||||
];
|
||||
AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);
|
||||
return $sendParams;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
<?php
|
||||
|
||||
namespace dominion\amqp;
|
||||
|
||||
use Yii;
|
||||
use yii\log\FileTarget;
|
||||
|
||||
class AmqpLoger extends FileTarget
|
||||
{
|
||||
|
||||
public static function log($type, $result, $prefixFile = '')
|
||||
{
|
||||
$model = new AmqpLoger;
|
||||
if (isset(Yii::$app->params['log']['maxFileSize']) && Yii::$app->params['log']['maxFileSize'] > 1)
|
||||
{
|
||||
$model->maxFileSize = Yii::$app->params['log']['maxFileSize'];
|
||||
}
|
||||
if (isset(Yii::$app->params['log']['maxLogFiles']) && Yii::$app->params['log']['maxLogFiles'] > 1)
|
||||
{
|
||||
$model->maxLogFiles = Yii::$app->params['log']['maxLogFiles'];
|
||||
}
|
||||
$model->logFile = Yii::getAlias(Yii::$app->params['log']['dir']) . $prefixFile . (empty($prefixFile) ? $type : ucfirst($type) ) . '.log';
|
||||
$model->messages = [$result];
|
||||
$model->export();
|
||||
}
|
||||
|
||||
public function formatMessage($result)
|
||||
{
|
||||
return sprintf(
|
||||
"[%s]\t%s\t%s",
|
||||
date(\DateTime::ISO8601),
|
||||
getmypid(),
|
||||
json_encode($result, JSON_UNESCAPED_UNICODE)
|
||||
);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
<?php
|
||||
|
||||
namespace dominion\amqp;
|
||||
|
||||
use Yii;
|
||||
use yii\base\DynamicModel;
|
||||
|
||||
class CustomDynamicModel extends DynamicModel
|
||||
{
|
||||
public function __get($name)
|
||||
{
|
||||
if (!$this->hasAttribute($name)) {
|
||||
$getter = 'get' . $name;
|
||||
if (method_exists($this, $getter)) {
|
||||
// read property, e.g. getName()
|
||||
return $this->$getter();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
return parent::__get($name);
|
||||
}
|
||||
|
||||
public function arrayValidate($attribute, $params)
|
||||
{
|
||||
if (!is_array($this->$attribute)) {
|
||||
$this->addError($attribute, 'Неверный формат "'.$attribute.'"');
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
{
|
||||
"name": "dominion/amqp",
|
||||
"description": "Функционал для работы с очередями",
|
||||
"type": "yii2-extension",
|
||||
"keywords": ["yii2","extension"],
|
||||
"license": "MIT",
|
||||
"authors": [
|
||||
{
|
||||
"name": "Rybkin Sasha",
|
||||
"email": "ribkin@dominion.ru"
|
||||
}
|
||||
],
|
||||
"require": {
|
||||
"yiisoft/yii2": "~2.0.0"
|
||||
},
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
"dominion\\amqp\\": ""
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
<?php
|
||||
|
||||
/**
|
||||
* @link https://www.kuvalda.ru/
|
||||
* @copyright
|
||||
* @license
|
||||
*/
|
||||
|
||||
namespace dominion\amqp\console;
|
||||
|
||||
use Yii;
|
||||
use yii\console\Controller;
|
||||
use yii\console\ExitCode;
|
||||
use dominion\amqp\AmqpHelper;
|
||||
|
||||
/**
|
||||
* Работа с очередеми
|
||||
*
|
||||
* @author Rybkin Sasha <ribkin@dominion.ru>
|
||||
* @since 0.1
|
||||
*/
|
||||
class AmqpController extends Controller
|
||||
{
|
||||
|
||||
/**
|
||||
* Создать все очереди
|
||||
* @return int Exit code
|
||||
*/
|
||||
public function actionCreateAllQueue()
|
||||
{
|
||||
$amqp = new AmqpHelper;
|
||||
foreach(Yii::$app->params['amqp']['queue'] as $key =>$value)
|
||||
{
|
||||
$amqp->getQueueObject($key);
|
||||
}
|
||||
return ExitCode::OK;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,160 @@
|
|||
<?php
|
||||
|
||||
namespace dominion\amqp\console;
|
||||
|
||||
use Yii;
|
||||
use dominion\amqp\AmqpHelper;
|
||||
use dominion\amqp\CustomDynamicModel;
|
||||
use dominion\amqp\AmqpLoger;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
abstract class BaseController extends \yii\console\Controller
|
||||
{
|
||||
public $projectName = '';
|
||||
public $queueName = '';
|
||||
public $sendQueueName = ''; //ключ маршрутизации ответа
|
||||
public $amqp;
|
||||
public $errorCode = false;
|
||||
public $typeRule = 'request';
|
||||
public $errors = [];
|
||||
public $resultType;
|
||||
public $arErrorCode = [];
|
||||
|
||||
public function publish($result, $param)
|
||||
{
|
||||
$sendParams = [
|
||||
'id' => uniqid() . '-' . uniqid(),
|
||||
'requestId' => isset($param['id']) ? $param['id'] : '',
|
||||
'date' => date(\DateTime::ISO8601),
|
||||
'success' => true,
|
||||
];
|
||||
if (!empty($result))
|
||||
{
|
||||
$sendParams['result'] = $result;
|
||||
$sendParams['resultType'] = $this->resultType;
|
||||
}
|
||||
if ($this->errorCode > 0)
|
||||
{
|
||||
$sendParams['errorCode'] = $this->errorCode;
|
||||
$sendParams['errorMessage'] = $this->getErrorMessage();
|
||||
$sendParams['success'] = false;
|
||||
}
|
||||
$type = 'result';
|
||||
$prefixFile = $this->queueName;
|
||||
if (!empty($this->sendQueueName))
|
||||
{
|
||||
$type = 'producer';
|
||||
$prefixFile = $this->sendQueueName;
|
||||
AmqpHelper::publishRaw($this->projectName, $this->sendQueueName, $sendParams);
|
||||
}
|
||||
AmqpLoger::log($type, $sendParams, $prefixFile);
|
||||
return $sendParams;
|
||||
}
|
||||
|
||||
public function sendResult($queue, $envelope, $result)
|
||||
{
|
||||
$this->publish($result, json_decode($envelope->getBody(), true));
|
||||
$this->amqp->ack($queue, $envelope);
|
||||
}
|
||||
|
||||
public abstract function processMessage($params);
|
||||
|
||||
public function proccess()
|
||||
{
|
||||
$this->amqp = new AmqpHelper;
|
||||
$this->amqp->setProject($this->projectName);
|
||||
$this->amqp->getQueueObject($this->queueName)->consume(function($envelope, $queue)
|
||||
{
|
||||
$value = null;
|
||||
try
|
||||
{
|
||||
$this->errorCode = null;
|
||||
$this->errors = null;
|
||||
$body = $envelope->getBody();
|
||||
$params = json_decode($body, true);
|
||||
AmqpLoger::log('consumer', empty($params) ? $body : $params, $this->queueName);
|
||||
$value = $this->validate($params) ? $this->processMessage($params) : null;
|
||||
}
|
||||
catch (\Exception $exception)
|
||||
{
|
||||
$this->errorCode = 99;
|
||||
$this->errors['system'] = $exception->getCode() . ' ' . $exception->getMessage();
|
||||
// throw $exception;
|
||||
}
|
||||
$this->sendResult($queue, $envelope, $value);
|
||||
});
|
||||
}
|
||||
|
||||
public function validate($params)
|
||||
{
|
||||
$rules = [
|
||||
//Поля сообщения запроса
|
||||
'request' => [
|
||||
[['id', 'date', 'object', 'api', 'params'], 'required'],
|
||||
[['id', 'object'], 'string', 'max' => 255],
|
||||
[['date'], 'string', 'max' => 50],
|
||||
[['params'], 'arrayValidate'],
|
||||
],
|
||||
//Поля сообщения ответа
|
||||
'response' => [
|
||||
[['id', 'requestId', 'date', 'success'], 'required'],
|
||||
[['id', 'requestId'], 'string', 'max' => 255],
|
||||
[['date'], 'string', 'max' => 50],
|
||||
[['success'], 'boolean'],
|
||||
[['errorCode'], 'integer'],
|
||||
[['result'], 'arrayValidate'],
|
||||
],
|
||||
];
|
||||
if(isset($params['errorCode']))
|
||||
{
|
||||
$errorMessage = $params['errorCode'] . ' ' . isset($this->arErrorCode[$params['errorCode']]) ? $this->arErrorCode[$params['errorCode']] : '';
|
||||
$this->setErrors([$params['errorCode'] => $errorMessage]);
|
||||
return false;
|
||||
}
|
||||
return $this->validateArryByRules($params, $rules[$this->typeRule]);
|
||||
}
|
||||
|
||||
public function validateArryByRules($array, $rules, $errorPrefix = '')
|
||||
{
|
||||
if(!is_array($array))
|
||||
{
|
||||
$this->setErrors(['Не верный формат массива'], $errorPrefix);
|
||||
return false;
|
||||
}
|
||||
if($array == array_values($array)) //массив должен быть ассоциативным
|
||||
{
|
||||
$this->setErrors(['Не верный формат массива'], $errorPrefix);
|
||||
return false;
|
||||
}
|
||||
$model = CustomDynamicModel::validateData($array, $rules);
|
||||
if ($model->hasErrors())
|
||||
{
|
||||
$this->setErrors($model->errors, $errorPrefix);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public function getErrorMessage()
|
||||
{
|
||||
$output = '';
|
||||
foreach ($this->errors as $key => $value)
|
||||
{
|
||||
$output .= $key . ': "' . implode('", "', (array)$value) . '"; ';
|
||||
}
|
||||
return $output;
|
||||
}
|
||||
|
||||
public function setErrors($errors, $errorPrefix = '', $code = 98)
|
||||
{
|
||||
foreach ($errors as $key => $value)
|
||||
{
|
||||
$newKey = empty($errorPrefix) ? $key : ($errorPrefix . '.' . $key);
|
||||
$this->errors[$newKey] = $value;
|
||||
}
|
||||
$this->errorCode = 98;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue