Skip to content

Instantly share code, notes, and snippets.

@marcusedu
Created September 28, 2022 20:30
Show Gist options
  • Save marcusedu/ca657574a6ceb3d7b14a4c1cef4ae222 to your computer and use it in GitHub Desktop.
Save marcusedu/ca657574a6ceb3d7b14a4c1cef4ae222 to your computer and use it in GitHub Desktop.
<?php
namespace App\Console\Commands;
use Anik\Amqp\ConsumableMessage;
use Anik\Amqp\Exceptions\AmqpException;
use Anik\Amqp\Queues\Queue;
use Anik\Laravel\Amqp\Facades\Amqp;
use App\Models\Device;
use App\Models\DeviceVehicle;
use App\Models\Event;
use Closure;
use Illuminate\Console\Command;
use Illuminate\Database\Eloquent\Collection;
use Illuminate\Support\Facades\DB;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use function App\Helpers\Functions\packWkbPoint;
class ProcessNormalEvents extends Command
{
protected LoopInterface $eventLoop;
protected array $bufferMsgs = [];
protected Queue $normalQueue;
/**
* @throws AmqpException
*/
public function __construct()
{
$this->eventLoop = Loop::get();
$this->normalQueue = Queue::make([
'name' => 'normal',
])->setDeclare(true);
parent::__construct();
}
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'command:process-normal-events';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Processa eventos normais registrados no RabbitMQ';
/**
* Execute the console command.
*
* @return int
*/
public function handle(): int
{
$this->eventLoop->addTimer(0,
fn() => Amqp::consume(function (ConsumableMessage $message) {
if ($this->isInvalidMessage($message)) {
$message->ack();
return;
}
$this->bufferMsgs[] = $message;
}, queue: $this->normalQueue));
$p = $this->eventLoop->addPeriodicTimer(60, $this->processBufferedMessages());
var_dump($p);
$this->eventLoop->run();
return 0;
}
/**
* @param ConsumableMessage $message
* @return bool
*/
function isInvalidMessage(ConsumableMessage $message): bool
{
if (empty(trim($message->getMessageBody()))) {
return true;
}
$data = $message->decodeMessage();
return match (true) {
empty($data),
empty($data['imei']),
empty($data['lat']),
empty($data['lon']),
empty($data['date']),
empty($data['time']),
=> true,
default => false
};
}
/**
* Pega a lista de imeis unicos em [$events]
*
* @param array $events
* @return array
*/
public function getImeis(array $events): array
{
return array_unique(array_filter(
array_map(fn($e) => $e['imei'], $events),
fn($value) => strlen($value) == 15 || strlen($value) == 9
));
}
public function getDevices(array $imeis): Collection|array
{
return Device::query()->whereIn('imei', $imeis)->orWhereIn('imei_part', $imeis)->get();
}
public function getDeviceVehicles(Collection|array $devices): Collection|array
{
return DeviceVehicle::query()
->whereIn("device_id", $devices->pluck('id'))
->get();
}
public function bindDeviceEvent(Collection|array $devices, Collection|array $deviceVehicles, array &$events)
{
$imeiDevice = $this->mapDeviceImei($devices);
$deviceVehicle = $this->mapDeviceVehicle($deviceVehicles);
foreach ($events as &$event) {
$device = $imeiDevice[$event['imei']];
$event['device_id'] = $device->id;
$event['vehicle_id'] = $deviceVehicle[$device->id];
}
}
public function mapDeviceImei(Collection|array $devices): array
{
$return = [];
foreach ($devices as $device) {
$return[$device->imei] = $device;
$return[preg_replace("/\d{5}-?(\d{9})-?\d/", "$1", $device['imei'])] = $device;
}
return $return;
}
public function mapDeviceVehicle(Collection|array $deviceVehicles): array
{
$return = [];
foreach ($deviceVehicles as $deviceVehicle) {
$return[$deviceVehicle->device_id] = $deviceVehicle->vehicle_id;
}
return $return;
}
public function saveEvents(array $events): void
{
$eventsTable = (new Event())->getTable();
$lines = DB::table($eventsTable)->insert($events);
}
public function turnLatLonIntoLocation(array &$events)
{
foreach ($events as &$event) {
$event['location'] = packWkbPoint($event['lat'], $event['lon']);
unset($event['lat']);
unset($event['lon']);
}
}
private function processBufferedMessages(): Closure
{
return function () {
$msgs = $this->bufferMsgs;
$this->clearBuffer();
$events = $this->getEventsFromMessages($msgs);
$this->turnLatLonIntoLocation($events);
$imeis = $this->getImeis($events);
$devices = $this->getDevices($imeis);
$deviceVehicles = $this->getDeviceVehicles($devices);
$this->bindDeviceEvent($devices, $deviceVehicles, $events);
$this->saveEvents($events);
$this->ackMessages($msgs);
};
}
/**
* @return void
*/
function clearBuffer(): void
{
unset($this->bufferMsgs);
$this->bufferMsgs = [];
}
private function ackMessages(array $msgs)
{
foreach ($msgs as $msg) {
$msg->ack();
}
}
/**
* @param array $msgs
* @return array
*/
function getEventsFromMessages(array $msgs): array
{
return array_map(fn($msg) => $msg->decodeMessage(), $msgs);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment