Merge feature/121-rabbitmq-queuing #122
@ -7,6 +7,7 @@ use App\Photo;
|
||||
use App\QueueItem;
|
||||
use App\Services\PhotoService;
|
||||
use App\Services\RabbitMQService;
|
||||
use App\UserActivity;
|
||||
use Illuminate\Console\Command;
|
||||
|
||||
class ProcessQueueCommand extends Command
|
||||
@ -85,18 +86,37 @@ class ProcessQueueCommand extends Command
|
||||
|
||||
default:
|
||||
$this->output->writeln(sprintf('Action %s is not recognised, skipping', $queueItem->action_type));
|
||||
break;
|
||||
return;
|
||||
}
|
||||
|
||||
$queueItem->completed_at = new \DateTime();
|
||||
$queueItem->save();
|
||||
}
|
||||
catch (\Exception $ex)
|
||||
{
|
||||
$this->output->error($ex->getMessage());
|
||||
}
|
||||
finally
|
||||
{
|
||||
$queueItem->completed_at = new \DateTime();
|
||||
$queueItem->save();
|
||||
|
||||
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
||||
}
|
||||
}
|
||||
|
||||
private function createActivityRecord(Photo $photo, $type, $activityDateTime = null)
|
||||
{
|
||||
if (is_null($activityDateTime))
|
||||
{
|
||||
$activityDateTime = new \DateTime();
|
||||
}
|
||||
|
||||
$userActivity = new UserActivity();
|
||||
$userActivity->user_id = $photo->user_id;
|
||||
$userActivity->activity_at = $activityDateTime;
|
||||
$userActivity->type = $type;
|
||||
$userActivity->photo_id = $photo->id;
|
||||
$userActivity->save();
|
||||
}
|
||||
|
||||
private function processPhotoAnalyseMessage(QueueItem $queueItem)
|
||||
{
|
||||
$this->output->writeln(sprintf('Analysing photo ID %l (batch: %s)', $queueItem->photo_id, $queueItem->batch_reference));
|
||||
@ -108,7 +128,31 @@ class ProcessQueueCommand extends Command
|
||||
$this->output->writeln('Photo does not exist; skipping');
|
||||
}
|
||||
|
||||
/* IF CHANGING THIS LOGIC, ALSO CHECK PhotoController::analyse */
|
||||
$photoService = new PhotoService($photo);
|
||||
$photoService->analyse($queueItem->batch_reference);
|
||||
|
||||
// Log an activity record for the user's feed (remove an existing one as the date may have changed)
|
||||
$this->removeExistingActivityRecords($photo, 'photo.taken');
|
||||
|
||||
if (!is_null($photo->taken_at))
|
||||
{
|
||||
// Log an activity record for the user's feed
|
||||
$this->createActivityRecord($photo, 'photo.taken', $photo->taken_at);
|
||||
}
|
||||
}
|
||||
|
||||
private function removeExistingActivityRecords(Photo $photo, $type)
|
||||
{
|
||||
$existingFeedRecords = UserActivity::where([
|
||||
'user_id' => $photo->user_id,
|
||||
'photo_id' => $photo->id,
|
||||
'type' => $type
|
||||
])->get();
|
||||
|
||||
foreach ($existingFeedRecords as $existingFeedRecord)
|
||||
{
|
||||
$existingFeedRecord->delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -67,12 +67,6 @@ class AlbumController extends Controller
|
||||
$this->authorizeAccessToAdminPanel('admin:manage-albums');
|
||||
|
||||
$album = $this->loadAlbum($id, 'upload-photos');
|
||||
|
||||
if (UserConfig::isImageProcessingQueueEnabled())
|
||||
{
|
||||
dump($queue_token);exit();
|
||||
}
|
||||
|
||||
$photos = $album->photos()
|
||||
->where('is_analysed', false)
|
||||
->orderBy('created_at')
|
||||
|
@ -54,6 +54,40 @@ class PhotoController extends Controller
|
||||
|
||||
try
|
||||
{
|
||||
if (UserConfig::isImageProcessingQueueEnabled())
|
||||
{
|
||||
// Find the last record that is analysing this photo
|
||||
$photoQueueItem = QueueItem::where('photo_id', $photo->id)
|
||||
->orderBy('queued_at', 'desc')
|
||||
->limit(1)
|
||||
->first();
|
||||
|
||||
$timeToWait = 60;
|
||||
$timeWaited = 0;
|
||||
$continueToMonitor = true;
|
||||
|
||||
while ($continueToMonitor && $timeWaited < $timeToWait)
|
||||
{
|
||||
$continueToMonitor = is_null($photoQueueItem->completed_at);
|
||||
if ($continueToMonitor)
|
||||
{
|
||||
sleep(1);
|
||||
$timeWaited++;
|
||||
|
||||
$photoQueueItem = QueueItem::where('id', $photoQueueItem->id)->first();
|
||||
$continueToMonitor = is_null($photoQueueItem->completed_at);
|
||||
}
|
||||
}
|
||||
|
||||
$result['is_successful'] = !is_null($photoQueueItem->completed_at);
|
||||
if (!$result['is_successful'])
|
||||
{
|
||||
$result['message'] = 'Timed out waiting for queue processing.';
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* IF CHANGING THIS LOGIC, ALSO CHECK ProcessQueueCommand::processPhotoAnalyseMessage */
|
||||
$photoService = new PhotoService($photo);
|
||||
$photoService->analyse($queue_token);
|
||||
|
||||
@ -68,6 +102,7 @@ class PhotoController extends Controller
|
||||
|
||||
$result['is_successful'] = true;
|
||||
}
|
||||
}
|
||||
catch (\Exception $ex)
|
||||
{
|
||||
$result['is_successful'] = false;
|
||||
|
13
resources/systemd/blue-twilight-mq.service
Normal file
13
resources/systemd/blue-twilight-mq.service
Normal file
@ -0,0 +1,13 @@
|
||||
[Unit]
|
||||
Description=Blue Twilight Processing Queue Runner
|
||||
|
||||
[Service]
|
||||
WorkingDirectory=/data/www/blue-twilight.andysh.dev/
|
||||
ExecStart=/usr/bin/php artisan queue:process
|
||||
Restart=on-failure
|
||||
User=www-data
|
||||
Group=www-data
|
||||
Environment=USER=www-data HOME=/var/www
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
Loading…
Reference in New Issue
Block a user