Skip to content

Instantly share code, notes, and snippets.

@hovsep
Created March 26, 2019 15:51
Show Gist options
  • Save hovsep/b045238ac469abecc7d25676aaab2ab6 to your computer and use it in GitHub Desktop.
Save hovsep/b045238ac469abecc7d25676aaab2ab6 to your computer and use it in GitHub Desktop.
<?php
namespace App\Jobs\EmlCampaign;
use App\Entities\CampaignState;
use App\Entities\RecipientState;
use App\Exceptions\CampaignIsNotRunningException;
use App\Models\EmlCampaign;
use App\Utils\Campaign\Manager;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Database\QueryException;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
/**
 * Queue job that splits a running campaign's recipients into batches and
 * dispatches one Process job per batch onto the configured per-MTA queues.
 *
 * The job removes itself from the queue before the long-running fan-out
 * starts, and every failure is logged as a warning instead of being rethrown.
 */
class Enqueue extends \App\Jobs\Job implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Campaign being enqueued. Becomes null when refreshCampaign() finds that
     * the row no longer exists.
     *
     * @var EmlCampaign|null
     */
    private $campaign = null;

    /**
     * @param EmlCampaign $campaign Campaign whose recipients should be enqueued.
     */
    public function __construct(EmlCampaign $campaign)
    {
        $this->campaign = $campaign;
    }

    /**
     * Reload the campaign, verify that it is still running and dispatch its
     * recipients in batches.
     *
     * Recipients are read from the recipients list data table in chunks ordered
     * by e-mail; each chunk is grouped by `mta_id` and cut into batches that
     * are pushed to the queue configured for that MTA.
     *
     * @return void
     */
    public function handle()
    {
        // Captured before the refresh: refreshCampaign() may replace the model
        // with null, and the id is still needed for logging in that case.
        $campaignId = $this->campaign->id;

        try {
            $this->refreshCampaign();

            // fresh() yields null for a missing row. Calling exists() on the
            // model would be forwarded to the query builder and only tell
            // whether the table has any row at all, so test for null instead.
            throw_unless(null !== $this->campaign, new \Exception('Campaign does not exist'));
            throw_if(
                CampaignState::RUNNING != $this->campaign->getState(),
                new CampaignIsNotRunningException('Campaign is not running')
            );

            // NOTE(review): the second argument looks like a human-readable
            // sub-status of the RUNNING state; confirm against setState().
            $this->campaign->setState(CampaignState::RUNNING, 'Enqueuing');

            // Remove job from queue, because worker may run longer than RMQ heartbeat
            $this->delete();

            $processingQueues = config('queue.names.campaigns');
            // Loop-invariant settings, resolved once instead of per iteration.
            $chunkSize = config('triton.campaign_enqueue_chunk_size', 5000);
            $batchSize = config('triton.campaign_enqueue_batch_size', 100);

            DB::table($this->campaign->recipientsList->getDataTableName())
                ->orderBy('email')
                ->chunk($chunkSize, function ($recipients) use ($processingQueues, $batchSize) {
                    foreach ($processingQueues as $mtaId => $queueName) {
                        // Only the recipients assigned to this MTA go to its queue.
                        foreach ($recipients->where('mta_id', $mtaId)->chunk($batchSize) as $batch) {
                            $dispatched = count($batch);

                            Log::debug('Enqueuer: going to dispatch a batch of recipients', [
                                'mta' => $mtaId,
                                'batch size' => $dispatched,
                            ]);

                            dispatch((new Process($this->campaign, $batch))->onQueue($queueName));
                            Manager::incrementCampaignCounter(
                                $this->campaign,
                                RecipientState::ENQUEUED,
                                $dispatched
                            );
                        }
                    }
                });

            Log::info('Campaign enqueuing finished', ['campaign' => $campaignId]);
            $this->campaign->setState(CampaignState::RUNNING, 'Processing');
        } catch (QueryException $qe) {
            Log::warning('Failed to enqueue campaign. Query exception', [
                'errorInfo' => $qe->errorInfo,
                'reason' => $qe->getMessage(),
                'previous' => optional($qe->getPrevious())->getMessage(),
                'trace' => $qe->getTrace(),
                'traceAsString' => $qe->getTraceAsString(),
                'campaign' => $campaignId,
            ]);
        } catch (CampaignIsNotRunningException $e) {
            Log::warning('Campaign enqueuing terminated', [
                'reason' => $e->getMessage(),
                'campaign' => $campaignId,
            ]);
        } catch (\Exception $e) {
            Log::warning('Failed to enqueue campaign', [
                'reason' => $e->getMessage(),
                'campaign' => $campaignId,
            ]);
        }
    }

    /**
     * Sync model with db.
     *
     * Stores the result of fresh(), which is null when the campaign row can no
     * longer be found; callers must check for null before using the model.
     *
     * @return void
     */
    private function refreshCampaign()
    {
        $this->campaign = $this->campaign->fresh();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment