From 961603acd59826d689b18698f6151b56791576b7 Mon Sep 17 00:00:00 2001 From: Andy Heathershaw Date: Tue, 9 Jul 2019 22:03:54 +0100 Subject: [PATCH] #121: Photos are now being analysed by the queue --- app/Console/Commands/ProcessQueueCommand.php | 114 ++++++++++++++++++ app/Helpers/ConfigHelper.php | 11 ++ .../Controllers/Admin/AlbumController.php | 6 + .../Controllers/Admin/DefaultController.php | 1 + .../Controllers/Admin/PhotoController.php | 19 +++ app/QueueItem.php | 21 ++++ app/Services/RabbitMQService.php | 95 +++++++++++++++ ..._07_09_203137_create_queue_items_table.php | 45 +++++++ .../themes/base/admin/settings.blade.php | 11 ++ 9 files changed, 323 insertions(+) create mode 100644 app/Console/Commands/ProcessQueueCommand.php create mode 100644 app/QueueItem.php create mode 100644 app/Services/RabbitMQService.php create mode 100644 database/migrations/2019_07_09_203137_create_queue_items_table.php diff --git a/app/Console/Commands/ProcessQueueCommand.php b/app/Console/Commands/ProcessQueueCommand.php new file mode 100644 index 0000000..0832dff --- /dev/null +++ b/app/Console/Commands/ProcessQueueCommand.php @@ -0,0 +1,114 @@ +output->error('The image processing queue is not enabled'); + } + + $rabbitmq = new RabbitMQService(); + + $this->output->writeln('Monitoring queue'); + + $rabbitmq->waitOnQueue([$this, 'processQueueItem']); + } + + /** + * Processes a single item from the queue. + * + * @param $msg + * @return void + */ + public function processQueueItem($msg) + { + $queueItemID = intval($msg->body); + + $this->output->writeln(sprintf('Processing queue item %d', $queueItemID)); + + /** @var QueueItem $queueItem */ + $queueItem = QueueItem::where('id', $queueItemID)->first(); + if (is_null($queueItem)) + { + $this->output->writeln('Queue item does not exist; skipping'); + $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); + return; + } + + try + { + switch (strtolower($queueItem->action_type)) + { + case 'photo.analyse': + $this->processPhotoAnalyseMessage($queueItem); + break; + + default: + $this->output->writeln(sprintf('Action %s is not recognised, skipping', $queueItem->action_type)); + break; + } + } + finally + { + $queueItem->completed_at = new \DateTime(); + $queueItem->save(); + + $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); + } + } + + private function processPhotoAnalyseMessage(QueueItem $queueItem) + { + $this->output->writeln(sprintf('Analysing photo ID %l (batch: %s)', $queueItem->photo_id, $queueItem->batch_reference)); + + $photo = Photo::where('id', $queueItem->photo_id)->first(); + + if (is_null($photo)) + { + $this->output->writeln('Photo does not exist; skipping'); + } + + $photoService = new PhotoService($photo); + $photoService->analyse($queueItem->batch_reference); + } +} diff --git a/app/Helpers/ConfigHelper.php b/app/Helpers/ConfigHelper.php index a7d2bc3..7c04665 100644 --- a/app/Helpers/ConfigHelper.php +++ b/app/Helpers/ConfigHelper.php @@ -123,6 +123,7 @@ class ConfigHelper 'rabbitmq_server' => 'localhost', 'rabbitmq_password' => encrypt('guest'), 'rabbitmq_port' => 5672, + 'rabbitmq_queue' => 'blue_twilight', 'rabbitmq_username' => 'guest', 'recaptcha_enabled_registration' => false, 'recaptcha_secret_key' => '', @@ -199,6 +200,16 @@ class ConfigHelper return $config; } + public function isImageProcessingQueueEnabled() + { + return $this->get('rabbitmq_enabled') && + !empty($this->get('rabbitmq_server')) && + !empty($this->get('rabbitmq_port')) && + !empty($this->get('rabbitmq_username')) && + !empty($this->get('rabbitmq_password')) && + !empty($this->get('rabbitmq_queue')); + } + public function isSocialMediaLoginEnabled() { return $this->get('social_facebook_login') || diff --git a/app/Http/Controllers/Admin/AlbumController.php b/app/Http/Controllers/Admin/AlbumController.php index da86365..ce6ee10 100644 --- a/app/Http/Controllers/Admin/AlbumController.php +++ b/app/Http/Controllers/Admin/AlbumController.php @@ -67,6 +67,12 @@ class AlbumController extends Controller $this->authorizeAccessToAdminPanel('admin:manage-albums'); $album = $this->loadAlbum($id, 'upload-photos'); + + if (UserConfig::isImageProcessingQueueEnabled()) + { + dump($queue_token);exit(); + } + $photos = $album->photos() ->where('is_analysed', false) ->orderBy('created_at') diff --git a/app/Http/Controllers/Admin/DefaultController.php b/app/Http/Controllers/Admin/DefaultController.php index 6198363..794f750 100644 --- a/app/Http/Controllers/Admin/DefaultController.php +++ b/app/Http/Controllers/Admin/DefaultController.php @@ -268,6 +268,7 @@ class DefaultController extends Controller 'rabbitmq_port', 'rabbitmq_username', 'rabbitmq_password', + 'rabbitmq_queue', 'sender_address', 'sender_name', 'smtp_server', diff --git a/app/Http/Controllers/Admin/PhotoController.php b/app/Http/Controllers/Admin/PhotoController.php index 273916f..9e550ff 100644 --- a/app/Http/Controllers/Admin/PhotoController.php +++ b/app/Http/Controllers/Admin/PhotoController.php @@ -6,13 +6,16 @@ use App\Album; use App\AlbumSources\IAlbumSource; use App\Facade\Image; use App\Facade\Theme; +use App\Facade\UserConfig; use App\Helpers\FileHelper; use App\Helpers\ImageHelper; use App\Helpers\MiscHelper; use App\Http\Requests\UpdatePhotosBulkRequest; use App\Label; use App\Photo; +use App\QueueItem; use App\Services\PhotoService; +use App\Services\RabbitMQService; use App\Upload; use App\UploadPhoto; use App\User; @@ -304,6 +307,22 @@ class PhotoController extends Controller // Log an activity record for the user's feed $this->createActivityRecord($photo, 'photo.uploaded'); + // If queueing is enabled, store the photo in the queue now + if (UserConfig::isImageProcessingQueueEnabled()) + { + $queueItem = new QueueItem([ + 'batch_reference' => $queueUid, + 'action_type' => 'photo.analyse', + 'album_id' => $photo->album_id, + 'photo_id' => $photo->id, + 'queued_at' => new \DateTime() + ]); + $queueItem->save(); + + $rabbitmq = new RabbitMQService(); + $rabbitmq->queueItem($queueItem); + } + $isSuccessful = true; } } diff --git a/app/QueueItem.php b/app/QueueItem.php new file mode 100644 index 0000000..a1f5a96 --- /dev/null +++ b/app/QueueItem.php @@ -0,0 +1,21 @@ +server = UserConfig::get('rabbitmq_server'); + $this->port = intval(UserConfig::get('rabbitmq_port')); + $this->username = UserConfig::get('rabbitmq_username'); + $this->password = decrypt(UserConfig::get('rabbitmq_password')); + $this->queue = UserConfig::get('rabbitmq_queue'); + } + + public function queueItem(QueueItem $queueItem) + { + $this->connectAndInit(); + + try + { + $message = new AMQPMessage( + $queueItem->id, + array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) + ); + $this->channel->basic_publish($message, '', $this->queue); + } + finally + { + $this->disconnectAndCleanUp(); + } + } + + public function waitOnQueue($callback) + { + $this->connectAndInit(); + + try + { + $this->channel->basic_consume($this->queue, '', false, false, false, false, $callback); + + while (count($this->channel->callbacks)) + { + $this->channel->wait(); + } + } + finally + { + $this->disconnectAndCleanUp(); + } + } + + private function connectAndInit() + { + $this->connection = new AMQPStreamConnection($this->server, $this->port, $this->username, $this->password); + $this->channel = $this->connection->channel(); + + $this->channel->queue_declare($this->queue, false, true, false, false); + } + + private function disconnectAndCleanUp() + { + $this->channel->close(); + $this->connection->close(); + + $this->channel = null; + $this->connection = null; + } +} \ No newline at end of file diff --git a/database/migrations/2019_07_09_203137_create_queue_items_table.php b/database/migrations/2019_07_09_203137_create_queue_items_table.php new file mode 100644 index 0000000..2f16272 --- /dev/null +++ b/database/migrations/2019_07_09_203137_create_queue_items_table.php @@ -0,0 +1,45 @@ +increments('id'); + $table->string('batch_reference')->nullable(true); + $table->string('action_type', 20); + $table->unsignedInteger('album_id'); + $table->unsignedBigInteger('photo_id'); + $table->dateTime('queued_at'); + $table->dateTime('completed_at')->nullable(true); + $table->timestamps(); + + $table->foreign('album_id') + ->references('id')->on('albums') + ->onDelete('cascade'); + + $table->foreign('photo_id') + ->references('id')->on('photos') + ->onDelete('cascade'); + }); + } + + /** + * Reverse the migrations. + * + * @return void + */ + public function down() + { + Schema::dropIfExists('queue_items'); + } +} diff --git a/resources/views/themes/base/admin/settings.blade.php b/resources/views/themes/base/admin/settings.blade.php index 3b61916..e65f4c8 100644 --- a/resources/views/themes/base/admin/settings.blade.php +++ b/resources/views/themes/base/admin/settings.blade.php @@ -182,6 +182,17 @@ @endif + +
+ + + + @if ($errors->has('rabbitmq_queue')) +
+ {{ $errors->first('rabbitmq_queue') }} +
+ @endif +
{{-- E-mail --}}