159 lines
4.3 KiB
PHP
159 lines
4.3 KiB
PHP
<?php
|
|
|
|
namespace App\Console\Commands;
|
|
|
|
use App\Facade\UserConfig;
|
|
use App\Photo;
|
|
use App\QueueItem;
|
|
use App\Services\PhotoService;
|
|
use App\Services\RabbitMQService;
|
|
use App\UserActivity;
|
|
use Illuminate\Console\Command;
|
|
|
|
class ProcessQueueCommand extends Command
|
|
{
|
|
/**
|
|
* The name and signature of the console command.
|
|
*
|
|
* @var string
|
|
*/
|
|
protected $signature = 'queue:process';
|
|
|
|
/**
|
|
* The console command description.
|
|
*
|
|
* @var string
|
|
*/
|
|
protected $description = 'Processes items in the processing queue.';
|
|
|
|
/**
|
|
* Create a new command instance.
|
|
*
|
|
* @return void
|
|
*/
|
|
public function __construct()
|
|
{
|
|
parent::__construct();
|
|
}
|
|
|
|
/**
|
|
* Execute the console command.
|
|
*
|
|
* @return mixed
|
|
*/
|
|
public function handle()
|
|
{
|
|
if (!UserConfig::isImageProcessingQueueEnabled())
|
|
{
|
|
$this->output->error('The image processing queue is not enabled');
|
|
}
|
|
|
|
$rabbitmq = new RabbitMQService();
|
|
|
|
$this->output->writeln('Monitoring queue');
|
|
|
|
$rabbitmq->waitOnQueue([$this, 'processQueueItem']);
|
|
}
|
|
|
|
/**
|
|
* Processes a single item from the queue.
|
|
*
|
|
* @param $msg
|
|
* @return void
|
|
*/
|
|
public function processQueueItem($msg)
|
|
{
|
|
$queueItemID = intval($msg->body);
|
|
|
|
$this->output->writeln(sprintf('Processing queue item %d', $queueItemID));
|
|
|
|
/** @var QueueItem $queueItem */
|
|
$queueItem = QueueItem::where('id', $queueItemID)->first();
|
|
if (is_null($queueItem))
|
|
{
|
|
$this->output->writeln('Queue item does not exist; skipping');
|
|
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
switch (strtolower($queueItem->action_type))
|
|
{
|
|
case 'photo.analyse':
|
|
$this->processPhotoAnalyseMessage($queueItem);
|
|
break;
|
|
|
|
default:
|
|
$this->output->writeln(sprintf('Action %s is not recognised, skipping', $queueItem->action_type));
|
|
return;
|
|
}
|
|
|
|
$queueItem->completed_at = new \DateTime();
|
|
$queueItem->save();
|
|
}
|
|
catch (\Exception $ex)
|
|
{
|
|
$this->output->error($ex->getMessage());
|
|
}
|
|
finally
|
|
{
|
|
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
|
}
|
|
}
|
|
|
|
private function createActivityRecord(Photo $photo, $type, $activityDateTime = null)
|
|
{
|
|
if (is_null($activityDateTime))
|
|
{
|
|
$activityDateTime = new \DateTime();
|
|
}
|
|
|
|
$userActivity = new UserActivity();
|
|
$userActivity->user_id = $photo->user_id;
|
|
$userActivity->activity_at = $activityDateTime;
|
|
$userActivity->type = $type;
|
|
$userActivity->photo_id = $photo->id;
|
|
$userActivity->save();
|
|
}
|
|
|
|
private function processPhotoAnalyseMessage(QueueItem $queueItem)
|
|
{
|
|
$this->output->writeln(sprintf('Analysing photo ID %l (batch: %s)', $queueItem->photo_id, $queueItem->batch_reference));
|
|
|
|
$photo = Photo::where('id', $queueItem->photo_id)->first();
|
|
|
|
if (is_null($photo))
|
|
{
|
|
$this->output->writeln('Photo does not exist; skipping');
|
|
}
|
|
|
|
/* IF CHANGING THIS LOGIC, ALSO CHECK PhotoController::analyse */
|
|
$photoService = new PhotoService($photo);
|
|
$photoService->analyse($queueItem->batch_reference);
|
|
|
|
// Log an activity record for the user's feed (remove an existing one as the date may have changed)
|
|
$this->removeExistingActivityRecords($photo, 'photo.taken');
|
|
|
|
if (!is_null($photo->taken_at))
|
|
{
|
|
// Log an activity record for the user's feed
|
|
$this->createActivityRecord($photo, 'photo.taken', $photo->taken_at);
|
|
}
|
|
}
|
|
|
|
private function removeExistingActivityRecords(Photo $photo, $type)
|
|
{
|
|
$existingFeedRecords = UserActivity::where([
|
|
'user_id' => $photo->user_id,
|
|
'photo_id' => $photo->id,
|
|
'type' => $type
|
|
])->get();
|
|
|
|
foreach ($existingFeedRecords as $existingFeedRecord)
|
|
{
|
|
$existingFeedRecord->delete();
|
|
}
|
|
}
|
|
}
|