Skip to content

Instantly share code, notes, and snippets.

@mauroa
Created June 29, 2017 12:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mauroa/10f092c1e46dd3ab3058bfad41cd03cd to your computer and use it in GitHub Desktop.
Save mauroa/10f092c1e46dd3ab3058bfad41cd03cd to your computer and use it in GitHub Desktop.
System.Net.Mqtt client sample
//Note that you can set more properties and also not set the port,
//in which case the MQTT default will be used
var configuration = new MqttConfiguration { Port = 55555 };
//Creation of the MQTT client
var client = await MqttClient.CreateAsync("192.168.1.29", configuration);
//MQTT connection of the client. You can pass optional args to the
//ConnectAsync method and credentials
await client.ConnectAsync(new MqttClientCredentials("testClient"));
//MQTT subscription to a topic. This only performs the protocol subscription,
//which means that at this point it can start receiving messages from the Broker to that topic
await client.SubscribeAsync("foo/bar/topic", MqttQualityOfService.AtLeastOnce);
//Rx Subscription to receive all the messages for the subscribed topics
client.MessageStream.Subscribe(msg =>
{
//All the messages from the Broker to any subscribed topic will get here
//The MessageStream is an Rx Observable, so you can filter the messages by topic with Linq to Rx
//The message object has Topic and Payload properties. The Payload is a byte[] that you need to deserialize
//depending on the type of the message
Console.WriteLine($"Message received in topic {msg.Topic}");
});
//Rx subscription sample to only receive the messages
//to a specific topic (using System.Reactive.Linq)
client
.MessageStream
.Where(msg => msg.Topic == "foo/bar/topic")
.Subscribe(msg => Console.WriteLine($"Message received in topic foo/bar/topic"));
//MQTT publish to a topic
//The message has a topic and the payload in byte[], which you are in charge of serializing from the original format
//The PublishAsync method has some optional args
var message = new MqttApplicationMessage("test/topic", Encoding.UTF8.GetBytes("Test String Message"));
await client.PublishAsync(message, MqttQualityOfService.AtLeastOnce);
//Method to unsubscribe a topic or many topics, which means that the message will no longer
//be received in the MessageStream anymore
await client.UnsubscribeAsync("foo/bar/topic");
//MQTT disconnection. Note that by now each client instance lifetime is from Connection to Disconnection
//You can't re use an instance or re connect once you disconnected. You will need to create another MqttClient instance
//This is currently reported as an issue and will be fixed for the next public version
await client.DisconnectAsync();
@melzoghbi
Copy link

Hey,
I am getting an error when creating a mqtt client object. Here is the line that throws an exception:

var mqttConfig = new MqttConfiguration { Port = clientOption.port };
 var clientOption = new {
                                 port = 1883,
                                 hostUrl = "mqtt://myqttServer.com"
           };         
 IMqttClient client = await MqttClient.CreateAsync(clientOption.hostUrl, mqttConfig);

Here is the error message:
Exception while executing function: IngestionCoreServices -> An error occurred while initializing a client -> Could not load file or assembly 'System.Net.Sockets, Version=4.1.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The system cannot find the file specified.

I am not able to add Sockets 4.1 because is not compatible with my azure function app that requires version 4.3. What would be the fix to be able to connect to MQTT server from an azure function.

Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment