Skip to content

Instantly share code, notes, and snippets.

@tpharaoh
Created December 12, 2020 13:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tpharaoh/00d77cc7bba9bdc46b9deb80e346d1f8 to your computer and use it in GitHub Desktop.
Save tpharaoh/00d77cc7bba9bdc46b9deb80e346d1f8 to your computer and use it in GitHub Desktop.
<?php
namespace App\Command;
use App\Entity\DeviceType;
use App\Message\DeviceFailedProvisionMessage;
use App\Message\DeviceProvisionedMessage;
use App\Message\SaveDeviceSettingMessage;
use App\Message\SaveTelemetryDataMessage;
use App\Repository\DeviceRepository;
use App\Repository\DeviceTypeRepository;
use App\Repository\InventoryRepository;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use PhpMqtt\Client\MQTTClient;
use Symfony\Component\Messenger\MessageBusInterface;
class MqttLoopCommand extends Command
{
protected static $defaultName = 'app:mqttLoop';
/**
* @var DeviceTypeRepository
*/
private $deviceTypeRepository;
/**
* @var EntityManagerInterface
*/
private $em;
/**
* @var MessageBusInterface
*/
private $bus;
/**
* @var DeviceTypeRepository
*/
private $deviceRepository;
public function __construct(DeviceTypeRepository $deviceTypeRepository,EntityManagerInterface $em,MessageBusInterface $bus,DeviceRepository $deviceRepository)
{
$this->deviceTypeRepository=$deviceTypeRepository;
$this->em=$em;
$this->deviceRepository=$deviceRepository;
$this->bus=$bus;
parent::__construct();
}
protected function configure()
{
$this
->setDescription('Add a short description for your command')
->addArgument('arg1', InputArgument::OPTIONAL, 'Argument description')
->addOption('option1', null, InputOption::VALUE_NONE, 'Option description')
;
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$arg1 = $input->getArgument('arg1');
$client = new MQTTClient('127.0.0.1','1883');
$client->connect();
$client->subscribe('device/#', function ($topic, $message) use ($io) {
$mqttCommand=explode('/',$topic);
$telemetryValue=json_decode($message,true);
//provision command
if(in_array('provision',$mqttCommand)) {
if ($this->provisionDevice($telemetryValue)) {
$io->info($telemetryValue);
$this->bus->dispatch(new DeviceProvisionedMessage($telemetryValue['serial']));
$io->success('provisioned and send to messenger');
} else {
$this->bus->dispatch(new DeviceFailedProvisionMessage($telemetryValue['serial']));
$io->error('device exists');
}
}
if(in_array('telemetry',$mqttCommand)) {
$io->info('telemetry received');
$this->bus->dispatch(new SaveTelemetryDataMessage($mqttCommand,$message));
$io->info('telemetry in queue');
// $this->processTelemetry($mqttCommand,$message);
}
if(in_array('setting',$mqttCommand)) {
$io->info('settings');
$this->bus->dispatch(new SaveDeviceSettingMessage($mqttCommand,$message));
}
//end
$io->success('You have a new mqtt command! '.serialize($topic));
});
$client->loop(true);
// $io->success('You have a new command! Now make it your own! Pass --help to see your options.');
return Command::SUCCESS;
}
}
@tpharaoh
Copy link
Author

Symfony / MQTT command, works quite smoothly on a cluster as well

@tpharaoh
Copy link
Author

new DeviceProvisionedMessage < can also successfully publish data on mercure used on apiPlatform front end

@tpharaoh
Copy link
Author

for docker-compose:

  emqttd:
        image: emqx/emqx
        ports:
            - '1883:1883'
            # admin / public
            - '18083:18083'

@tpharaoh
Copy link
Author

composer require php-mqtt/client

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment