Created
April 6, 2018 18:55
-
-
Save Medalink/04ab1a82f5700637c980d411218d76c4 to your computer and use it in GitHub Desktop.
This job connects to Google Cloud Datastore and pulls unprocessed results down into a MySQL database. Each pulled result is then updated in the datastore, via a version number, to record that it has been pulled. Duplicates are also marked as pulled in the same way.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Jobs; | |
use App\Actblue; | |
use Bugsnag\BugsnagLaravel\Facades\Bugsnag; | |
use Google\Cloud\Core\ServiceBuilder; | |
use Illuminate\Bus\Queueable; | |
use Illuminate\Contracts\Queue\ShouldQueue; | |
use Illuminate\Database\QueryException; | |
use Illuminate\Foundation\Bus\Dispatchable; | |
use Illuminate\Queue\InteractsWithQueue; | |
use Illuminate\Queue\SerializesModels; | |
class BatchProcessActBlues implements ShouldQueue | |
{ | |
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; | |
/** @var int How many times the queue worker may attempt this job before it is marked failed. */
public $tries = 1;

/** @var \Google\Cloud\Core\ServiceBuilder Google Cloud client factory; assigned in handle(). */
public $gcloud;

/** @var \Google\Cloud\Datastore\DatastoreClient Datastore client obtained from $gcloud in handle(). */
public $datastore;

/** @var int VERSION value identifying Datastore entities that have not been pulled yet. */
public $versionToQuery = -1;

/** @var int VERSION value written back to each entity once it has been pulled. */
public $newVersion = 2;

/** @var int Upper bound on the number of entities fetched per Datastore query. */
public $pageSize = 300;

/** @var int Time budget in seconds; no new page is fetched once it is exceeded (keeps a run under ~60s). */
public $maxRunSeconds = 56;
/**
 * Execute the job.
 *
 * Repeatedly queries the configured Datastore kind for entities whose
 * VERSION equals $versionToQuery (at most $pageSize per query). Each entity
 * is turned into an Actblue row and inserted into MySQL, and then every
 * entity on the page is written back with VERSION set to $newVersion.
 * The loop stops when a query returns no entities. It also stops once
 * $maxRunSeconds have elapsed, but that is only checked between pages.
 *
 * @return void
 */
public function handle()
{
    $startTime = microtime(true);

    // Create our service
    $this->gcloud = new ServiceBuilder([
        'keyFilePath' => config('google.service.file'),
        'projectId' => config('google.service.project'),
    ]);

    // Connect to our datastore
    $this->datastore = $this->gcloud->datastore();

    do {
        // Build a fresh query for every page. Entities processed on the previous
        // page no longer match the VERSION filter, so no cursor is needed.
        $query = $this->datastore->query()
            ->kind(config('google.service.datastore_kind'))
            ->filter('VERSION', '=', $this->versionToQuery)
            ->limit($this->pageSize);

        $results = $this->datastore->runQuery($query);

        $dataToInsert = [];
        $updatedEntities = [];

        foreach ($results as $entity) {
            // Construct a Laravel model from the payload and queue its attributes for MySQL.
            $actblue = $this->constructActBlue($entity['payload'], $entity['receivedAt']);
            $dataToInsert[] = $actblue->toArray();

            // Flag the entity as processed; it is persisted by updateBatch below.
            $entity['VERSION'] = $this->newVersion;
            // updateBatch expects an array, not an EntityIterator.
            $updatedEntities[] = $entity;
        }

        // An empty page means there is nothing left to pull right now.
        $hasMoreResults = count($dataToInsert) > 0;

        if ($hasMoreResults) {
            $this->insertRowsWithFallback($dataToInsert);

            // Every row was either inserted or reported to Bugsnag, so mark the
            // whole page as processed in one commit instead of one RPC per entity.
            // NOTE(review): relies on $pageSize staying within Datastore's
            // per-commit mutation limit — confirm if $pageSize is raised.
            $this->datastore->updateBatch($updatedEntities);
        }
    } while ($hasMoreResults && (microtime(true) - $startTime) <= $this->maxRunSeconds);
}

/**
 * Insert rows into MySQL, falling back to one insert per row if the bulk insert fails.
 *
 * Each QueryException is reported to Bugsnag and then swallowed. After this
 * returns, every row has been either inserted or reported.
 *
 * @param array $rows List of attribute arrays produced by Actblue::toArray().
 * @return void
 */
private function insertRowsWithFallback(array $rows)
{
    try {
        Actblue::insert($rows);

        return;
    } catch (QueryException $batchException) {
        Bugsnag::notifyException($batchException);
    }

    // The bulk insert failed; retry row by row so one bad row does not block the page.
    foreach ($rows as $row) {
        try {
            Actblue::insert($row);
        } catch (QueryException $singleInsertException) {
            // The caller still marks this entity as processed, on the assumption
            // that the failure is a duplicate.
            // NOTE(review): non-duplicate failures (e.g. truncation, lost
            // connection) are also skipped permanently. Consider checking
            // $singleInsertException->errorInfo[1] === 1062 (MySQL duplicate
            // key) before treating a row as handled.
            Bugsnag::notifyException($singleInsertException);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment