Merge feature/121-rabbitmq-queuing #122
114
app/Console/Commands/ProcessQueueCommand.php
Normal file
114
app/Console/Commands/ProcessQueueCommand.php
Normal file
@ -0,0 +1,114 @@
|
||||
<?php
|
||||
|
||||
namespace App\Console\Commands;
|
||||
|
||||
use App\Facade\UserConfig;
|
||||
use App\Photo;
|
||||
use App\QueueItem;
|
||||
use App\Services\PhotoService;
|
||||
use App\Services\RabbitMQService;
|
||||
use Illuminate\Console\Command;
|
||||
|
||||
class ProcessQueueCommand extends Command
|
||||
{
|
||||
/**
|
||||
* The name and signature of the console command.
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
protected $signature = 'queue:process';
|
||||
|
||||
/**
|
||||
* The console command description.
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
protected $description = 'Processes items in the processing queue.';
|
||||
|
||||
/**
|
||||
* Create a new command instance.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the console command.
|
||||
*
|
||||
* @return mixed
|
||||
*/
|
||||
public function handle()
|
||||
{
|
||||
if (!UserConfig::isImageProcessingQueueEnabled())
|
||||
{
|
||||
$this->output->error('The image processing queue is not enabled');
|
||||
}
|
||||
|
||||
$rabbitmq = new RabbitMQService();
|
||||
|
||||
$this->output->writeln('Monitoring queue');
|
||||
|
||||
$rabbitmq->waitOnQueue([$this, 'processQueueItem']);
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a single item from the queue.
|
||||
*
|
||||
* @param $msg
|
||||
* @return void
|
||||
*/
|
||||
public function processQueueItem($msg)
|
||||
{
|
||||
$queueItemID = intval($msg->body);
|
||||
|
||||
$this->output->writeln(sprintf('Processing queue item %d', $queueItemID));
|
||||
|
||||
/** @var QueueItem $queueItem */
|
||||
$queueItem = QueueItem::where('id', $queueItemID)->first();
|
||||
if (is_null($queueItem))
|
||||
{
|
||||
$this->output->writeln('Queue item does not exist; skipping');
|
||||
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
switch (strtolower($queueItem->action_type))
|
||||
{
|
||||
case 'photo.analyse':
|
||||
$this->processPhotoAnalyseMessage($queueItem);
|
||||
break;
|
||||
|
||||
default:
|
||||
$this->output->writeln(sprintf('Action %s is not recognised, skipping', $queueItem->action_type));
|
||||
break;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
$queueItem->completed_at = new \DateTime();
|
||||
$queueItem->save();
|
||||
|
||||
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
||||
}
|
||||
}
|
||||
|
||||
private function processPhotoAnalyseMessage(QueueItem $queueItem)
|
||||
{
|
||||
$this->output->writeln(sprintf('Analysing photo ID %l (batch: %s)', $queueItem->photo_id, $queueItem->batch_reference));
|
||||
|
||||
$photo = Photo::where('id', $queueItem->photo_id)->first();
|
||||
|
||||
if (is_null($photo))
|
||||
{
|
||||
$this->output->writeln('Photo does not exist; skipping');
|
||||
}
|
||||
|
||||
$photoService = new PhotoService($photo);
|
||||
$photoService->analyse($queueItem->batch_reference);
|
||||
}
|
||||
}
|
@ -123,6 +123,7 @@ class ConfigHelper
|
||||
'rabbitmq_server' => 'localhost',
|
||||
'rabbitmq_password' => encrypt('guest'),
|
||||
'rabbitmq_port' => 5672,
|
||||
'rabbitmq_queue' => 'blue_twilight',
|
||||
'rabbitmq_username' => 'guest',
|
||||
'recaptcha_enabled_registration' => false,
|
||||
'recaptcha_secret_key' => '',
|
||||
@ -199,6 +200,16 @@ class ConfigHelper
|
||||
return $config;
|
||||
}
|
||||
|
||||
public function isImageProcessingQueueEnabled()
|
||||
{
|
||||
return $this->get('rabbitmq_enabled') &&
|
||||
!empty($this->get('rabbitmq_server')) &&
|
||||
!empty($this->get('rabbitmq_port')) &&
|
||||
!empty($this->get('rabbitmq_username')) &&
|
||||
!empty($this->get('rabbitmq_password')) &&
|
||||
!empty($this->get('rabbitmq_queue'));
|
||||
}
|
||||
|
||||
public function isSocialMediaLoginEnabled()
|
||||
{
|
||||
return $this->get('social_facebook_login') ||
|
||||
|
@ -67,6 +67,12 @@ class AlbumController extends Controller
|
||||
$this->authorizeAccessToAdminPanel('admin:manage-albums');
|
||||
|
||||
$album = $this->loadAlbum($id, 'upload-photos');
|
||||
|
||||
if (UserConfig::isImageProcessingQueueEnabled())
|
||||
{
|
||||
dump($queue_token);exit();
|
||||
}
|
||||
|
||||
$photos = $album->photos()
|
||||
->where('is_analysed', false)
|
||||
->orderBy('created_at')
|
||||
|
@ -268,6 +268,7 @@ class DefaultController extends Controller
|
||||
'rabbitmq_port',
|
||||
'rabbitmq_username',
|
||||
'rabbitmq_password',
|
||||
'rabbitmq_queue',
|
||||
'sender_address',
|
||||
'sender_name',
|
||||
'smtp_server',
|
||||
|
@ -6,13 +6,16 @@ use App\Album;
|
||||
use App\AlbumSources\IAlbumSource;
|
||||
use App\Facade\Image;
|
||||
use App\Facade\Theme;
|
||||
use App\Facade\UserConfig;
|
||||
use App\Helpers\FileHelper;
|
||||
use App\Helpers\ImageHelper;
|
||||
use App\Helpers\MiscHelper;
|
||||
use App\Http\Requests\UpdatePhotosBulkRequest;
|
||||
use App\Label;
|
||||
use App\Photo;
|
||||
use App\QueueItem;
|
||||
use App\Services\PhotoService;
|
||||
use App\Services\RabbitMQService;
|
||||
use App\Upload;
|
||||
use App\UploadPhoto;
|
||||
use App\User;
|
||||
@ -304,6 +307,22 @@ class PhotoController extends Controller
|
||||
// Log an activity record for the user's feed
|
||||
$this->createActivityRecord($photo, 'photo.uploaded');
|
||||
|
||||
// If queueing is enabled, store the photo in the queue now
|
||||
if (UserConfig::isImageProcessingQueueEnabled())
|
||||
{
|
||||
$queueItem = new QueueItem([
|
||||
'batch_reference' => $queueUid,
|
||||
'action_type' => 'photo.analyse',
|
||||
'album_id' => $photo->album_id,
|
||||
'photo_id' => $photo->id,
|
||||
'queued_at' => new \DateTime()
|
||||
]);
|
||||
$queueItem->save();
|
||||
|
||||
$rabbitmq = new RabbitMQService();
|
||||
$rabbitmq->queueItem($queueItem);
|
||||
}
|
||||
|
||||
$isSuccessful = true;
|
||||
}
|
||||
}
|
||||
|
21
app/QueueItem.php
Normal file
21
app/QueueItem.php
Normal file
@ -0,0 +1,21 @@
|
||||
<?php
|
||||
|
||||
namespace App;
|
||||
|
||||
use Illuminate\Database\Eloquent\Model;
|
||||
|
||||
class QueueItem extends Model
|
||||
{
|
||||
/**
|
||||
* The attributes that are mass assignable.
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $fillable = [
|
||||
'batch_reference',
|
||||
'action_type',
|
||||
'album_id',
|
||||
'photo_id',
|
||||
'queued_at'
|
||||
];
|
||||
}
|
95
app/Services/RabbitMQService.php
Normal file
95
app/Services/RabbitMQService.php
Normal file
@ -0,0 +1,95 @@
|
||||
<?php
|
||||
|
||||
namespace App\Services;
|
||||
|
||||
use App\Facade\UserConfig;
|
||||
use App\QueueItem;
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
|
||||
class RabbitMQService
|
||||
{
|
||||
protected $password;
|
||||
|
||||
protected $port;
|
||||
|
||||
protected $queue;
|
||||
|
||||
protected $server;
|
||||
|
||||
protected $username;
|
||||
|
||||
/**
|
||||
* @var AMQPChannel
|
||||
*/
|
||||
private $channel;
|
||||
|
||||
/**
|
||||
* @var AMQPStreamConnection
|
||||
*/
|
||||
private $connection;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
$this->server = UserConfig::get('rabbitmq_server');
|
||||
$this->port = intval(UserConfig::get('rabbitmq_port'));
|
||||
$this->username = UserConfig::get('rabbitmq_username');
|
||||
$this->password = decrypt(UserConfig::get('rabbitmq_password'));
|
||||
$this->queue = UserConfig::get('rabbitmq_queue');
|
||||
}
|
||||
|
||||
public function queueItem(QueueItem $queueItem)
|
||||
{
|
||||
$this->connectAndInit();
|
||||
|
||||
try
|
||||
{
|
||||
$message = new AMQPMessage(
|
||||
$queueItem->id,
|
||||
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
|
||||
);
|
||||
$this->channel->basic_publish($message, '', $this->queue);
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->disconnectAndCleanUp();
|
||||
}
|
||||
}
|
||||
|
||||
public function waitOnQueue($callback)
|
||||
{
|
||||
$this->connectAndInit();
|
||||
|
||||
try
|
||||
{
|
||||
$this->channel->basic_consume($this->queue, '', false, false, false, false, $callback);
|
||||
|
||||
while (count($this->channel->callbacks))
|
||||
{
|
||||
$this->channel->wait();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->disconnectAndCleanUp();
|
||||
}
|
||||
}
|
||||
|
||||
private function connectAndInit()
|
||||
{
|
||||
$this->connection = new AMQPStreamConnection($this->server, $this->port, $this->username, $this->password);
|
||||
$this->channel = $this->connection->channel();
|
||||
|
||||
$this->channel->queue_declare($this->queue, false, true, false, false);
|
||||
}
|
||||
|
||||
private function disconnectAndCleanUp()
|
||||
{
|
||||
$this->channel->close();
|
||||
$this->connection->close();
|
||||
|
||||
$this->channel = null;
|
||||
$this->connection = null;
|
||||
}
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
<?php
|
||||
|
||||
use Illuminate\Support\Facades\Schema;
|
||||
use Illuminate\Database\Schema\Blueprint;
|
||||
use Illuminate\Database\Migrations\Migration;
|
||||
|
||||
class CreateQueueItemsTable extends Migration
|
||||
{
|
||||
/**
|
||||
* Run the migrations.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function up()
|
||||
{
|
||||
Schema::create('queue_items', function (Blueprint $table) {
|
||||
$table->increments('id');
|
||||
$table->string('batch_reference')->nullable(true);
|
||||
$table->string('action_type', 20);
|
||||
$table->unsignedInteger('album_id');
|
||||
$table->unsignedBigInteger('photo_id');
|
||||
$table->dateTime('queued_at');
|
||||
$table->dateTime('completed_at')->nullable(true);
|
||||
$table->timestamps();
|
||||
|
||||
$table->foreign('album_id')
|
||||
->references('id')->on('albums')
|
||||
->onDelete('cascade');
|
||||
|
||||
$table->foreign('photo_id')
|
||||
->references('id')->on('photos')
|
||||
->onDelete('cascade');
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Reverse the migrations.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function down()
|
||||
{
|
||||
Schema::dropIfExists('queue_items');
|
||||
}
|
||||
}
|
@ -182,6 +182,17 @@
|
||||
</div>
|
||||
@endif
|
||||
</div>
|
||||
|
||||
<div class="form-group ml-4">
|
||||
<label class="form-control-label" for="rabbitmq-queue">Queue:</label>
|
||||
<input type="text" class="form-control{{ $errors->has('rabbitmq_queue') ? ' is-invalid' : '' }}" id="rabbitmq-queue" name="rabbitmq_queue" value="{{ old('rabbitmq_queue', $config['rabbitmq_queue']) }}">
|
||||
|
||||
@if ($errors->has('rabbitmq_queue'))
|
||||
<div class="invalid-feedback">
|
||||
<strong>{{ $errors->first('rabbitmq_queue') }}</strong>
|
||||
</div>
|
||||
@endif
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{{-- E-mail --}}
|
||||
|
Loading…
Reference in New Issue
Block a user