267 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			267 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
<?php
 | 
						||
 | 
						||
namespace dominion\amqp;
 | 
						||
 | 
						||
use Yii;
 | 
						||
use dominion\amqp\AmqpLoger;
 | 
						||
 | 
						||
class AmqpHelper
 | 
						||
{
 | 
						||
 | 
						||
    protected static $connection;
 | 
						||
    protected static $channel;
 | 
						||
    protected static $exchange;
 | 
						||
    protected static $arQueue;
 | 
						||
    protected static $prefetchCount = 1;
 | 
						||
 | 
						||
    protected $project;
 | 
						||
 | 
						||
    public function setPrefetchCount(int $count)
 | 
						||
    {
 | 
						||
        self::$prefetchCount = $count;
 | 
						||
    }
 | 
						||
 | 
						||
    public function getController($queue)
 | 
						||
    {
 | 
						||
        $key = json_encode($queue);
 | 
						||
        if (!isset(self::$connection[$key]))
 | 
						||
        {
 | 
						||
            $credentials = ['heartbeat' => $queue['heartbeat']] ;
 | 
						||
            self::$connection[$key] = new \AMQPConnection($credentials);
 | 
						||
            self::$connection[$key]->setLogin(Yii::$app->params['amqp']['login']);
 | 
						||
            self::$connection[$key]->setPassword(Yii::$app->params['amqp']["pass"]);
 | 
						||
            self::$connection[$key]->setHost(Yii::$app->params['amqp']["serverName"]);
 | 
						||
            self::$connection[$key]->setPort(Yii::$app->params['amqp']["port"]);
 | 
						||
            self::$connection[$key]->setVhost(Yii::$app->params['amqp']["vhostName"]);
 | 
						||
            try
 | 
						||
            {
 | 
						||
                self::$connection[$key]->connect();
 | 
						||
            } catch (\AMQPConnectionException $exception)
 | 
						||
            {
 | 
						||
                AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
 | 
						||
                throw $exception;
 | 
						||
            }
 | 
						||
        }
 | 
						||
        return self::$connection[$key];
 | 
						||
    }
 | 
						||
 | 
						||
    public function getChannel($queue)
 | 
						||
    {
 | 
						||
        $key = json_encode($queue);
 | 
						||
        if (!isset(self::$channel[$key]))
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                self::$channel[$key] = new \AMQPChannel($this->getController($queue));
 | 
						||
                self::$channel[$key]->setPrefetchCount(self::$prefetchCount);
 | 
						||
            } catch (\AMQPConnectionException $exception)
 | 
						||
            {
 | 
						||
                AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
 | 
						||
                throw $exception;
 | 
						||
            }
 | 
						||
        }
 | 
						||
        return self::$channel[$key];
 | 
						||
    }
 | 
						||
 | 
						||
    public function getExchange($queue)
 | 
						||
    {
 | 
						||
        $project = $this->getProject();
 | 
						||
        $key = $project['name'].'_'.$project['type'].'_'.$project['flags'];
 | 
						||
        if (!isset(self::$exchange[$key]))
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                self::$exchange[$key] = new \AMQPExchange($this->getChannel($queue));
 | 
						||
                self::$exchange[$key]->setName($project['name']);
 | 
						||
                self::$exchange[$key]->setType($project['type']);
 | 
						||
                self::$exchange[$key]->setFlags($project['flags']);
 | 
						||
                self::$exchange[$key]->declareExchange();
 | 
						||
            } catch (\AMQPConnectionException | \AMQPChannelException | \AMQPExchangeException $exception)
 | 
						||
            {
 | 
						||
                AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
 | 
						||
                throw $exception;
 | 
						||
            }
 | 
						||
        }
 | 
						||
        return self::$exchange[$key];
 | 
						||
    }
 | 
						||
 | 
						||
    public function getQueue($routeKey)
 | 
						||
    {
 | 
						||
        $queue = $this->getQueueConfig($routeKey);
 | 
						||
        $queueName = $queue['name'];
 | 
						||
        if(!isset(self::$arQueue[$queueName]))
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                self::$arQueue[$queueName] = new \AMQPQueue($this->getChannel($queue));
 | 
						||
                self::$arQueue[$queueName]->setName($queueName);
 | 
						||
                self::$arQueue[$queueName]->setFlags($queue['flags']);
 | 
						||
                self::$arQueue[$queueName]->declareQueue();
 | 
						||
 | 
						||
                foreach ($queue['routing'] as $routing)
 | 
						||
                {
 | 
						||
                    self::$arQueue[$queueName]->bind($this->getExchange($queue)->getName(), $routing);
 | 
						||
                }
 | 
						||
            } catch (\AMQPChannelException | \AMQPConnectionException | \AMQPQueueException $exception)
 | 
						||
            {
 | 
						||
                AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
 | 
						||
                throw $exception;
 | 
						||
            }
 | 
						||
        }
 | 
						||
        return self::$arQueue[$queueName];
 | 
						||
    }
 | 
						||
 | 
						||
    /**
 | 
						||
     *
 | 
						||
     * @param type $routeKey
 | 
						||
     * @return type
 | 
						||
     * @throws \Exception
 | 
						||
     */
 | 
						||
    public function getQueueConfig($routeKey)
 | 
						||
    {
 | 
						||
        $project = $this->getProject();
 | 
						||
        if (!isset($project['queues'][$routeKey]) || empty($project['queues'][$routeKey]))
 | 
						||
        {
 | 
						||
            $error = 'Ключ маршрутизации '.$routeKey.' не найден';
 | 
						||
            AmqpLoger::log('error', $error, 'system');
 | 
						||
            throw new \Exception($error);
 | 
						||
        }
 | 
						||
        elseif(!isset ($project['queues'][$routeKey]["name"], $project['queues'][$routeKey]["routing"], $project['queues'][$routeKey]["flags"], $project['queues'][$routeKey]["heartbeat"]))
 | 
						||
        {
 | 
						||
            $error = 'Не верный формат '.$routeKey;
 | 
						||
            AmqpLoger::log('error', $error, 'system');
 | 
						||
            throw new \Exception($error);
 | 
						||
        }
 | 
						||
        else
 | 
						||
        {
 | 
						||
            return $project['queues'][$routeKey];
 | 
						||
        }
 | 
						||
    }
 | 
						||
 | 
						||
    public function setProject($projectName)
 | 
						||
    {
 | 
						||
        $projects = isset(Yii::$app->params['amqp'], Yii::$app->params['amqp']['projects']) ? Yii::$app->params['amqp']['projects'] : [];
 | 
						||
        $this->project = isset($projects[$projectName]) ? $projects[$projectName] : [];
 | 
						||
        if(empty($this->project))
 | 
						||
        {
 | 
						||
            $error = 'проект '.$projectName.' не найден';
 | 
						||
            AmqpLoger::log('error', $error, 'system');
 | 
						||
            throw new \Exception($error);
 | 
						||
        }
 | 
						||
    }
 | 
						||
 | 
						||
    public function getProject()
 | 
						||
    {
 | 
						||
        if(!isset($this->project["name"], $this->project["type"], $this->project["flags"], $this->project["queues"]))
 | 
						||
        {
 | 
						||
            $error = 'не верный формат проекта очереди';
 | 
						||
            AmqpLoger::log('error', $error, 'system');
 | 
						||
            throw new \Exception($error);
 | 
						||
        }
 | 
						||
        return $this->project;
 | 
						||
    }
 | 
						||
 | 
						||
    public function getQueueObject($queueName)
 | 
						||
    {
 | 
						||
        if(!$this->getQueue($queueName))
 | 
						||
        {
 | 
						||
            $error = 'Очередь ' . $queueName . ' не найдена';
 | 
						||
            AmqpLoger::log('error', $error, 'system');
 | 
						||
            throw new \Exception($error);
 | 
						||
        }
 | 
						||
        else
 | 
						||
        {
 | 
						||
            return $this->getQueue($queueName);
 | 
						||
        }
 | 
						||
    }
 | 
						||
 | 
						||
    public function ack($queue, $envelope)
 | 
						||
    {
 | 
						||
        try
 | 
						||
        {
 | 
						||
            $queue->ack($envelope->getDeliveryTag());
 | 
						||
        }
 | 
						||
        catch (\Exception $exception)
 | 
						||
        {
 | 
						||
            AmqpLoger::log('error', $exception->getCode() . ' ' . $exception->getMessage(), 'system');
 | 
						||
            throw new \Exception($exception->getCode() . ' ' . $exception->getMessage());
 | 
						||
        }
 | 
						||
    }
 | 
						||
 | 
						||
    /**
 | 
						||
     * Добавляем произвольную запись в очередь
 | 
						||
     * @param string $projectName синоним обменника (используется в коде)
 | 
						||
     * @param string $routeKey псевдоним очереди
 | 
						||
     * @param array $params Поля передаваемого объекта
 | 
						||
     */
 | 
						||
    public static function publishRaw($projectName, $routeKey, $sendParams)
 | 
						||
    {
 | 
						||
        $amqp = new AmqpHelper;
 | 
						||
        $amqp->setProject($projectName);
 | 
						||
        $queue = $amqp->getQueueConfig($routeKey);
 | 
						||
        $amqp->getExchange($queue)->publish(json_encode($sendParams), $queue['name'], AMQP_NOPARAM, ['delivery_mode' => 2, 'content_type' => 'application/json']);
 | 
						||
        AmqpLoger::log('producer', $sendParams, $routeKey);
 | 
						||
        return $sendParams;
 | 
						||
    }
 | 
						||
 | 
						||
    /**
 | 
						||
     * Добавляем запись в очередь
 | 
						||
     * @param string $routeKey псевдоним очереди
 | 
						||
     * @param array $params Поля передаваемого объекта
 | 
						||
     * @param string $object Тип передаваемого объекта
 | 
						||
     * @param string $api Версия API
 | 
						||
     */
 | 
						||
    public static function publish($projectName, $routeKey, $params, $object, $api = '1.0')
 | 
						||
    {
 | 
						||
        $sendParams = [
 | 
						||
            'id' => uniqid() . '-' . uniqid(),
 | 
						||
            'date' => date(\DateTime::ISO8601),
 | 
						||
            'object' => $object,
 | 
						||
            'api' => $api,
 | 
						||
            'params' => $params
 | 
						||
        ];
 | 
						||
        AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);
 | 
						||
        return $sendParams;
 | 
						||
    }
 | 
						||
 | 
						||
    /**
 | 
						||
     * Ставит в очередь для отправки письма
 | 
						||
     * @param type $routeKey
 | 
						||
     * @param type $params
 | 
						||
     * @param type $code
 | 
						||
     * @return string
 | 
						||
     */
 | 
						||
    public static function sendMail($projectName, $routeKey, $params, $code)
 | 
						||
    {
 | 
						||
        $sendParams = [
 | 
						||
            'id' => uniqid() . '-' . uniqid(),
 | 
						||
            'date' => date(\DateTime::ISO8601),
 | 
						||
            'code' => $code,
 | 
						||
            'params' => $params
 | 
						||
        ];
 | 
						||
        AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);
 | 
						||
        return $sendParams;
 | 
						||
    }
 | 
						||
 | 
						||
    /**
 | 
						||
     * Ставит в очередь для обработки событий
 | 
						||
     * @param type $routeKey
 | 
						||
     * @param type $params
 | 
						||
     * @param type $code
 | 
						||
     * @return string
 | 
						||
     */
 | 
						||
    public static function sendEvent($projectName, $routeKey, $params, $code)
 | 
						||
    {
 | 
						||
        $sendParams = [
 | 
						||
            'id' => uniqid() . '-' . uniqid(),
 | 
						||
            'date' => date(\DateTime::ISO8601),
 | 
						||
            'code' => $code,
 | 
						||
            'params' => $params
 | 
						||
        ];
 | 
						||
        AmqpHelper::publishRaw($projectName, $routeKey, $sendParams);
 | 
						||
        return $sendParams;
 | 
						||
    }
 | 
						||
 | 
						||
 | 
						||
}
 |