100 lines
2.3 KiB
PHP
100 lines
2.3 KiB
PHP
<?php
|
|
|
|
namespace App\Services;
|
|
|
|
use App\Facade\UserConfig;
|
|
use App\QueueItem;
|
|
use PhpAmqpLib\Channel\AMQPChannel;
|
|
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
|
|
class RabbitMQService
|
|
{
|
|
protected $password;
|
|
|
|
protected $port;
|
|
|
|
protected $queue;
|
|
|
|
protected $server;
|
|
|
|
protected $username;
|
|
|
|
protected $vhost;
|
|
|
|
/**
|
|
* @var AMQPChannel
|
|
*/
|
|
private $channel;
|
|
|
|
/**
|
|
* @var AMQPStreamConnection
|
|
*/
|
|
private $connection;
|
|
|
|
public function __construct()
|
|
{
|
|
$this->server = UserConfig::get('rabbitmq_server');
|
|
$this->port = intval(UserConfig::get('rabbitmq_port'));
|
|
$this->username = UserConfig::get('rabbitmq_username');
|
|
$this->password = decrypt(UserConfig::get('rabbitmq_password'));
|
|
$this->queue = UserConfig::get('rabbitmq_queue');
|
|
|
|
$vhost = UserConfig::get('rabbitmq_vhost');
|
|
$this->vhost = empty($vhost) ? '/' : $vhost;
|
|
}
|
|
|
|
public function queueItem(QueueItem $queueItem)
|
|
{
|
|
$this->connectAndInit();
|
|
|
|
try
|
|
{
|
|
$message = new AMQPMessage(
|
|
$queueItem->id,
|
|
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
|
|
);
|
|
$this->channel->basic_publish($message, '', $this->queue);
|
|
}
|
|
finally
|
|
{
|
|
$this->disconnectAndCleanUp();
|
|
}
|
|
}
|
|
|
|
public function waitOnQueue($callback)
|
|
{
|
|
$this->connectAndInit();
|
|
|
|
try
|
|
{
|
|
$this->channel->basic_consume($this->queue, '', false, false, false, false, $callback);
|
|
|
|
while (count($this->channel->callbacks))
|
|
{
|
|
$this->channel->wait();
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
$this->disconnectAndCleanUp();
|
|
}
|
|
}
|
|
|
|
private function connectAndInit()
|
|
{
|
|
$this->connection = new AMQPStreamConnection($this->server, $this->port, $this->username, $this->password, $this->vhost);
|
|
$this->channel = $this->connection->channel();
|
|
|
|
$this->channel->queue_declare($this->queue, false, true, false, false);
|
|
}
|
|
|
|
private function disconnectAndCleanUp()
|
|
{
|
|
$this->channel->close();
|
|
$this->connection->close();
|
|
|
|
$this->channel = null;
|
|
$this->connection = null;
|
|
}
|
|
} |