diff --git a/AmqpHelper.php b/AmqpHelper.php new file mode 100644 index 0000000..7274a25 --- /dev/null +++ b/AmqpHelper.php @@ -0,0 +1,266 @@ + $queue['heartbeat']] ; + self::$connection = new \AMQPConnection($credentials); + self::$connection->setLogin(Yii::$app->params['amqp']['login']); + self::$connection->setPassword(Yii::$app->params['amqp']["pass"]); + self::$connection->setHost(Yii::$app->params['amqp']["serverName"]); + self::$connection->setPort(Yii::$app->params['amqp']["port"]); + self::$connection->setVhost(Yii::$app->params['amqp']["vhostName"]); + try + { + self::$connection->connect(); + } catch (\AMQPConnectionException $exception) + { + AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system'); + throw $exception; + } + } + return self::$connection; + } + + public function getChannel($queue) + { + $key = json_encode($queue); + if (!isset(self::$channel[$key])) + { + try + { + self::$channel[$key] = new \AMQPChannel($this->getController($queue)); + self::$channel[$key]->setPrefetchCount(self::$prefetchCount); + } catch (\AMQPConnectionException $exception) + { + AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system'); + throw $exception; + } + } + return self::$channel[$key]; + } + + public function getExchange($queue) + { + $project = $this->getProject(); + $key = $project['name'].'_'.$project['type'].'_'.$project['flags']; + if (!isset(self::$exchange[$key])) + { + try + { + self::$exchange[$key] = new \AMQPExchange($this->getChannel($queue)); + self::$exchange[$key]->setName($project['name']); + self::$exchange[$key]->setType($project['type']); + self::$exchange[$key]->setFlags($project['flags']); + self::$exchange[$key]->declareExchange(); + } catch (\AMQPConnectionException | \AMQPChannelException | \AMQPExchangeException $exception) + { + AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system'); + throw $exception; + } + } + return self::$exchange[$key]; + } + + public function getQueue($routeKey) + { + $queue = $this->getQueueConfig($routeKey); + $queueName = $queue['name']; + if(!isset(self::$arQueue[$queueName])) + { + try + { + self::$arQueue[$queueName] = new \AMQPQueue($this->getChannel($queue)); + self::$arQueue[$queueName]->setName($queueName); + self::$arQueue[$queueName]->setFlags($queue['flags']); + self::$arQueue[$queueName]->declareQueue(); + + foreach ($queue['routing'] as $routing) + { + self::$arQueue[$queueName]->bind($this->getExchange($queue)->getName(), $routing); + } + } catch (\AMQPChannelException | \AMQPConnectionException | \AMQPQueueException $exception) + { + AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system'); + throw $exception; + } + } + return self::$arQueue[$queueName]; + } + + /** + * + * @param type $routeKey + * @return type + * @throws \Exception + */ + public function getQueueConfig($routeKey) + { + $project = $this->getProject(); + if (!isset($project['queues'][$routeKey]) || empty($project['queues'][$routeKey])) + { + $error = 'Ключ маршрутизации '.$routeKey.' не найден'; + AmqpLoger::log('error', $error, 'system'); + throw new \Exception($error); + } + elseif(!isset ($project['queues'][$routeKey]["name"], $project['queues'][$routeKey]["routing"], $project['queues'][$routeKey]["flags"], $project['queues'][$routeKey]["heartbeat"])) + { + $error = 'Не верный формат '.$routeKey; + AmqpLoger::log('error', $error, 'system'); + throw new \Exception($error); + } + else + { + return $project['queues'][$routeKey]; + } + } + + public function setProject($projectName) + { + $projects = isset(Yii::$app->params['amqp'], Yii::$app->params['amqp']['projects']) ? Yii::$app->params['amqp']['projects'] : []; + $this->project = isset($projects[$projectName]) ? $projects[$projectName] : []; + if(empty($this->project)) + { + $error = 'проект '.$projectName.' не найден'; + AmqpLoger::log('error', $error, 'system'); + throw new \Exception($error); + } + } + + public function getProject() + { + if(!isset($this->project["name"], $this->project["type"], $this->project["flags"], $this->project["queues"])) + { + $error = 'не верный формат проекта очереди'; + AmqpLoger::log('error', $error, 'system'); + throw new \Exception($error); + } + return $this->project; + } + + public function getQueueObject($queueName) + { + if(!$this->getQueue($queueName)) + { + $error = 'Очередь ' . $queueName . ' не найдена'; + AmqpLoger::log('error', $error, 'system'); + throw new \Exception($error); + } + else + { + return $this->getQueue($queueName); + } + } + + public function ack($queue, $envelope) + { + try + { + $queue->ack($envelope->getDeliveryTag()); + } + catch (\Exception $exception) + { + AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system'); + throw new \Exception($exception->getCode() . ' ' . $exception->getMessage()); + } + } + + /** + * Добавляем произвольную запись в очередь + * @param string $projectName синоним обменника (используется в коде) + * @param string $routeKey псевдоним очереди + * @param array $params Поля передаваемого объекта + */ + public static function publishRaw($projectName, $routeKey, $sendParams) + { + $amqp = new AmqpHelper; + $amqp->setProject($projectName); + $queue = $amqp->getQueueConfig($routeKey); + $amqp->getExchange($queue)->publish(json_encode($sendParams), $queue['name'], AMQP_NOPARAM, ['delivery_mode' => 2, 'content_type' => 'application/json']); + AmqpLoger::log('producer', $sendParams, $routeKey); + return $sendParams; + } + + /** + * Добавляем запись в очередь + * @param string $routeKey псевдоним очереди + * @param array $params Поля передаваемого объекта + * @param string $object Тип передаваемого объекта + * @param string $api Версия API + */ + public static function publish($projectName, $routeKey, $params, $object, $api = '1.0') + { + $sendParams = [ + 'id' => uniqid() . '-' . uniqid(), + 'date' => date(\DateTime::ISO8601), + 'object' => $object, + 'api' => $api, + 'params' => $params + ]; + AmqpHelper::publishRaw($projectName, $routeKey, $sendParams); + return $sendParams; + } + + /** + * Ставит в очередь для отправки письма + * @param type $routeKey + * @param type $params + * @param type $code + * @return string + */ + public static function sendMail($projectName, $routeKey, $params, $code) + { + $sendParams = [ + 'id' => uniqid() . '-' . uniqid(), + 'date' => date(\DateTime::ISO8601), + 'code' => $code, + 'params' => $params + ]; + AmqpHelper::publishRaw($projectName, $routeKey, $sendParams); + return $sendParams; + } + + /** + * Ставит в очередь для обработки событий + * @param type $routeKey + * @param type $params + * @param type $code + * @return string + */ + public static function sendEvent($projectName, $routeKey, $params, $code) + { + $sendParams = [ + 'id' => uniqid() . '-' . uniqid(), + 'date' => date(\DateTime::ISO8601), + 'code' => $code, + 'params' => $params + ]; + AmqpHelper::publishRaw($projectName, $routeKey, $sendParams); + return $sendParams; + } + + +} diff --git a/AmqpLoger.php b/AmqpLoger.php new file mode 100644 index 0000000..2d4fd92 --- /dev/null +++ b/AmqpLoger.php @@ -0,0 +1,37 @@ +params['log']['maxFileSize']) && Yii::$app->params['log']['maxFileSize'] > 1) + { + $model->maxFileSize = Yii::$app->params['log']['maxFileSize']; + } + if (isset(Yii::$app->params['log']['maxLogFiles']) && Yii::$app->params['log']['maxLogFiles'] > 1) + { + $model->maxLogFiles = Yii::$app->params['log']['maxLogFiles']; + } + $model->logFile = Yii::getAlias(Yii::$app->params['log']['dir']) . $prefixFile . (empty($prefixFile) ? $type : ucfirst($type) ) . '.log'; + $model->messages = [$result]; + $model->export(); + } + + public function formatMessage($result) + { + return sprintf( + "[%s]\t%s\t%s", + date(\DateTime::ISO8601), + getmypid(), + json_encode($result, JSON_UNESCAPED_UNICODE) + ); + } + +} diff --git a/CustomDynamicModel.php b/CustomDynamicModel.php new file mode 100644 index 0000000..1becbdf --- /dev/null +++ b/CustomDynamicModel.php @@ -0,0 +1,30 @@ +hasAttribute($name)) { + $getter = 'get' . $name; + if (method_exists($this, $getter)) { + // read property, e.g. getName() + return $this->$getter(); + } + return null; + } + + return parent::__get($name); + } + + public function arrayValidate($attribute, $params) + { + if (!is_array($this->$attribute)) { + $this->addError($attribute, 'Неверный формат "'.$attribute.'"'); + } + } +} \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..35ea5d5 --- /dev/null +++ b/composer.json @@ -0,0 +1,21 @@ +{ + "name": "dominion/amqp", + "description": "Функционал для работы с очередями", + "type": "yii2-extension", + "keywords": ["yii2","extension"], + "license": "MIT", + "authors": [ + { + "name": "Rybkin Sasha", + "email": "ribkin@dominion.ru" + } + ], + "require": { + "yiisoft/yii2": "~2.0.0" + }, + "autoload": { + "psr-4": { + "dominion\\amqp\\": "" + } + } +} \ No newline at end of file diff --git a/console/AmqpController.php b/console/AmqpController.php new file mode 100644 index 0000000..917ebcd --- /dev/null +++ b/console/AmqpController.php @@ -0,0 +1,39 @@ + + * @since 0.1 + */ +class AmqpController extends Controller +{ + + /** + * Создать все очереди + * @return int Exit code + */ + public function actionCreateAllQueue() + { + $amqp = new AmqpHelper; + foreach(Yii::$app->params['amqp']['queue'] as $key =>$value) + { + $amqp->getQueueObject($key); + } + return ExitCode::OK; + } + +} diff --git a/console/BaseController.php b/console/BaseController.php new file mode 100644 index 0000000..c57c977 --- /dev/null +++ b/console/BaseController.php @@ -0,0 +1,160 @@ + uniqid() . '-' . uniqid(), + 'requestId' => isset($param['id']) ? $param['id'] : '', + 'date' => date(\DateTime::ISO8601), + 'success' => true, + ]; + if (!empty($result)) + { + $sendParams['result'] = $result; + $sendParams['resultType'] = $this->resultType; + } + if ($this->errorCode > 0) + { + $sendParams['errorCode'] = $this->errorCode; + $sendParams['errorMessage'] = $this->getErrorMessage(); + $sendParams['success'] = false; + } + $type = 'result'; + $prefixFile = $this->queueName; + if (!empty($this->sendQueueName)) + { + $type = 'producer'; + $prefixFile = $this->sendQueueName; + AmqpHelper::publishRaw($this->projectName, $this->sendQueueName, $sendParams); + } + AmqpLoger::log($type, $sendParams, $prefixFile); + return $sendParams; + } + + public function sendResult($queue, $envelope, $result) + { + $this->publish($result, json_decode($envelope->getBody(), true)); + $this->amqp->ack($queue, $envelope); + } + + public abstract function processMessage($params); + + public function proccess() + { + $this->amqp = new AmqpHelper; + $this->amqp->setProject($this->projectName); + $this->amqp->getQueueObject($this->queueName)->consume(function($envelope, $queue) + { + $value = null; + try + { + $this->errorCode = null; + $this->errors = null; + $body = $envelope->getBody(); + $params = json_decode($body, true); + AmqpLoger::log('consumer', empty($params) ? $body : $params, $this->queueName); + $value = $this->validate($params) ? $this->processMessage($params) : null; + } + catch (\Exception $exception) + { + $this->errorCode = 99; + $this->errors['system'] = $exception->getCode() . ' ' . $exception->getMessage(); + // throw $exception; + } + $this->sendResult($queue, $envelope, $value); + }); + } + + public function validate($params) + { + $rules = [ + //Поля сообщения запроса + 'request' => [ + [['id', 'date', 'object', 'api', 'params'], 'required'], + [['id', 'object'], 'string', 'max' => 255], + [['date'], 'string', 'max' => 50], + [['params'], 'arrayValidate'], + ], + //Поля сообщения ответа + 'response' => [ + [['id', 'requestId', 'date', 'success'], 'required'], + [['id', 'requestId'], 'string', 'max' => 255], + [['date'], 'string', 'max' => 50], + [['success'], 'boolean'], + [['errorCode'], 'integer'], + [['result'], 'arrayValidate'], + ], + ]; + if(isset($params['errorCode'])) + { + $errorMessage = $params['errorCode'] . ' ' . isset($this->arErrorCode[$params['errorCode']]) ? $this->arErrorCode[$params['errorCode']] : ''; + $this->setErrors([$params['errorCode'] => $errorMessage]); + return false; + } + return $this->validateArryByRules($params, $rules[$this->typeRule]); + } + + public function validateArryByRules($array, $rules, $errorPrefix = '') + { + if(!is_array($array)) + { + $this->setErrors(['Не верный формат массива'], $errorPrefix); + return false; + } + if($array == array_values($array)) //массив должен быть ассоциативным + { + $this->setErrors(['Не верный формат массива'], $errorPrefix); + return false; + } + $model = CustomDynamicModel::validateData($array, $rules); + if ($model->hasErrors()) + { + $this->setErrors($model->errors, $errorPrefix); + return false; + } + return true; + } + + public function getErrorMessage() + { + $output = ''; + foreach ($this->errors as $key => $value) + { + $output .= $key . ': "' . implode('", "', (array)$value) . '"; '; + } + return $output; + } + + public function setErrors($errors, $errorPrefix = '', $code = 98) + { + foreach ($errors as $key => $value) + { + $newKey = empty($errorPrefix) ? $key : ($errorPrefix . '.' . $key); + $this->errors[$newKey] = $value; + } + $this->errorCode = 98; + } + +}