blue-twilight/app/Services/RabbitMQService.php

100 lines
2.3 KiB
PHP

<?php
namespace App\Services;
use App\Facade\UserConfig;
use App\QueueItem;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Thin wrapper around php-amqplib used to publish queue item IDs to a RabbitMQ
 * queue and to consume them again.
 *
 * Connection settings are read from UserConfig when the service is constructed.
 * A fresh connection and channel are opened for every publish/consume call and
 * torn down again when that call finishes, whether it succeeded or threw.
 */
class RabbitMQService
{
    /** @var string Password, decrypted from the 'rabbitmq_password' setting. */
    protected $password;

    /** @var int Port, from the 'rabbitmq_port' setting. */
    protected $port;

    /** @var string Queue name, from the 'rabbitmq_queue' setting. */
    protected $queue;

    /** @var string Server host, from the 'rabbitmq_server' setting. */
    protected $server;

    /** @var string Username, from the 'rabbitmq_username' setting. */
    protected $username;

    /** @var string Virtual host, from the 'rabbitmq_vhost' setting; '/' when that setting is empty. */
    protected $vhost;

    /**
     * Channel for the current operation; null while no operation is in progress.
     *
     * @var AMQPChannel|null
     */
    private $channel;

    /**
     * Connection for the current operation; null while no operation is in progress.
     *
     * @var AMQPStreamConnection|null
     */
    private $connection;

    /**
     * Loads the connection settings from UserConfig. No connection is opened here.
     */
    public function __construct()
    {
        $this->server = UserConfig::get('rabbitmq_server');
        $this->port = intval(UserConfig::get('rabbitmq_port'));
        $this->username = UserConfig::get('rabbitmq_username');
        $this->password = decrypt(UserConfig::get('rabbitmq_password'));
        $this->queue = UserConfig::get('rabbitmq_queue');

        $vhost = UserConfig::get('rabbitmq_vhost');
        $this->vhost = empty($vhost) ? '/' : $vhost;
    }

    /**
     * Publishes the ID of the given queue item to the configured queue as a
     * persistent message.
     *
     * Any exception raised while connecting or publishing propagates to the
     * caller after the connection has been cleaned up.
     *
     * @param QueueItem $queueItem Item whose ID becomes the message body.
     */
    public function queueItem(QueueItem $queueItem)
    {
        try
        {
            // Connect inside the try so that a failure part-way through set-up
            // (e.g. the queue declaration) still releases the open connection.
            $this->connectAndInit();

            $message = new AMQPMessage(
                // Cast explicitly: the ID may be an integer, the body is sent as text.
                (string) $queueItem->id,
                ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
            );

            // Empty exchange name + queue name as the routing key.
            $this->channel->basic_publish($message, '', $this->queue);
        }
        finally
        {
            $this->disconnectAndCleanUp();
        }
    }

    /**
     * Registers $callback as a consumer on the configured queue and blocks,
     * dispatching messages to it for as long as the channel has callbacks
     * registered.
     *
     * The consumer is registered with no_ack = false (fourth basic_consume
     * argument), so the callback is presumably expected to acknowledge each
     * message itself - confirm against the callers.
     *
     * @param callable $callback Invoked by php-amqplib for each delivered message.
     */
    public function waitOnQueue($callback)
    {
        try
        {
            // Connect inside the try so a partial set-up is still cleaned up.
            $this->connectAndInit();

            // Arguments: queue, consumer tag, no_local, no_ack, exclusive, nowait, callback.
            $this->channel->basic_consume($this->queue, '', false, false, false, false, $callback);

            while (count($this->channel->callbacks))
            {
                $this->channel->wait();
            }
        }
        finally
        {
            $this->disconnectAndCleanUp();
        }
    }

    /**
     * Opens the connection and channel and declares the configured queue.
     *
     * May leave $this->connection set if a later step throws; callers must
     * pair this with disconnectAndCleanUp() in a finally block.
     */
    private function connectAndInit()
    {
        $this->connection = new AMQPStreamConnection(
            $this->server,
            $this->port,
            $this->username,
            $this->password,
            $this->vhost
        );
        $this->channel = $this->connection->channel();

        // Arguments: queue, passive, durable, exclusive, auto_delete.
        $this->channel->queue_declare($this->queue, false, true, false, false);
    }

    /**
     * Closes whichever of the channel and connection are open and resets both
     * fields to null.
     *
     * Safe to call after a failed or partial connectAndInit(). An exception
     * from closing the channel does not prevent the connection from being
     * closed, and the fields are always reset; the exception still propagates.
     */
    private function disconnectAndCleanUp()
    {
        try
        {
            if ($this->channel !== null)
            {
                $this->channel->close();
            }
        }
        finally
        {
            try
            {
                if ($this->connection !== null)
                {
                    $this->connection->close();
                }
            }
            finally
            {
                $this->channel = null;
                $this->connection = null;
            }
        }
    }
}