Skip to content

Instantly share code, notes, and snippets.

@Medalink
Created April 6, 2018 18:55
Show Gist options
  • Save Medalink/04ab1a82f5700637c980d411218d76c4 to your computer and use it in GitHub Desktop.
Save Medalink/04ab1a82f5700637c980d411218d76c4 to your computer and use it in GitHub Desktop.
This job connects to Google Cloud Datastore and copies unprocessed records into a MySQL database. Each record that has been pulled is then updated in the datastore to reflect that. Duplicates are also marked as pulled, using the same version number.
<?php
namespace App\Jobs;
use App\Actblue;
use Bugsnag\BugsnagLaravel\Facades\Bugsnag;
use Google\Cloud\Core\ServiceBuilder;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Database\QueryException;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class BatchProcessActBlues implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* The number of times the job may be attempted.
*
* Set to 1, so the job is attempted only once.
*
* @var int
*/
public $tries = 1;
/**
* Google Cloud service builder. Not set at construction time here;
* handle() assigns it using the configured key file and project id.
*
* @var \Google\Cloud\Core\ServiceBuilder
*/
public $gcloud;
/**
* Datastore client obtained from $gcloud inside handle().
*
* @var \Google\Cloud\Datastore\DatastoreClient
*/
public $datastore;
/**
* VERSION value that handle() filters on when querying the datastore.
* Unprocessed entries use -1.
*
* @var int
*/
public $versionToQuery = -1;
/**
* Processed results will be updated to this version
* (handle() writes it to each fetched entity's VERSION property).
*
* @var int
*/
public $newVersion = 2;
/**
* Limit how many results we pull from our datastore at once
* (applied as the limit of each datastore query in handle()).
*
* @var int
*/
public $pageSize = 300;
/**
* Keep our total job run time under 60s.
*
* handle() checks the elapsed time after each page and does not start
* another page once more than this many seconds have passed.
*
* @var int
*/
public $maxRunSeconds = 56;
/**
 * Execute the job.
 *
 * Repeatedly queries the datastore for a page of up to $pageSize entities
 * whose VERSION equals $versionToQuery, inserts the corresponding rows into
 * MySQL, and then writes $newVersion back to every entity of that page so it
 * is not returned by the next query.
 *
 * The loop ends when a query returns no entities, or when more than
 * $maxRunSeconds have elapsed. The time is only checked between pages, so a
 * page that is already in progress is always finished.
 *
 * Query exceptions raised by MySQL are reported to Bugsnag and do not abort
 * the job; exceptions from the datastore client are not caught here.
 *
 * @return void
 */
public function handle()
{
    $startTime = microtime(true);

    // Create our service and connect to our datastore.
    $this->gcloud = new ServiceBuilder([
        'keyFilePath' => config('google.service.file'),
        'projectId' => config('google.service.project'),
    ]);
    $this->datastore = $this->gcloud->datastore();

    do {
        // Query for the next page of new donation records.
        $query = $this->datastore->query()
            ->kind(config('google.service.datastore_kind'))
            ->filter('VERSION', '=', $this->versionToQuery)
            ->limit($this->pageSize);
        $results = $this->datastore->runQuery($query);

        // Rows destined for MySQL, and the matching entities with their
        // version bumped (updateBatch expects an array, not an EntityIterator).
        $dataToInsert = [];
        $updatedEntities = [];
        foreach ($results as $entity) {
            // Construct a Laravel model from the payload.
            $actblue = $this->constructActBlue($entity['payload'], $entity['receivedAt']);
            $dataToInsert[] = $actblue->toArray();

            // Update the version number to indicate this record has been processed.
            $entity['VERSION'] = $this->newVersion;
            $updatedEntities[] = $entity;
        }

        // An empty page means there is nothing left to process.
        if (count($dataToInsert) === 0) {
            break;
        }

        try {
            // Fast path: insert the whole page with a single statement.
            Actblue::insert($dataToInsert);
        } catch (QueryException $batchException) {
            Bugsnag::notifyException($batchException);

            // The batch insert failed. Fall back to inserting row by row so
            // that one bad row does not block the rest of the page.
            foreach ($dataToInsert as $datum) {
                try {
                    Actblue::insert($datum);
                } catch (QueryException $singleInsertException) {
                    Bugsnag::notifyException($singleInsertException);
                }
            }
        }

        // Mark every entity of the page as processed, including those whose
        // individual insert failed: a failed insert is treated as a duplicate
        // that has to be skipped the next time anyway. One batched write is
        // used for both paths instead of one datastore call per entity.
        // NOTE(review): rows that fail for a reason other than a duplicate
        // are also marked as processed and only show up in Bugsnag.
        $this->datastore->updateBatch($updatedEntities);
    } while ((microtime(true) - $startTime) <= $this->maxRunSeconds);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment