61
Basic MQTT with C#
In this post, we're going to have a look on how to work with MQTT and Csharp. Creating a Broker and a Client that sends data to it.
But first...
According to the MQTT.org:
MQTT (short for Message Queuing Telemetry Transport) is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.
The project structure contains a .sln
file and two c# console projects, as it follows:
.
├── MQTTFirstLook.Broker
├── MQTTFirstLook.Client
└── MQTTFirstLook.sln
This small broker implementation has the responsibility to read data received from connected clients and display it on the screen.
For this, we're going to create a new MQTTServer
that listen to the port 707 on localhost.
Here is what is needed:
These are the libraries used by the server.
using System;
using System.Text;
using MQTTnet;
using MQTTnet.Server;
using Serilog;
Here's how we create a new MQTTServer. In this example, this code is inside the void Main(string[] args)
method.
// Create the options for our MQTT Broker
MqttServerOptionsBuilder options = new MqttServerOptionsBuilder()
// set endpoint to localhost
.WithDefaultEndpoint()
// port used will be 707
.WithDefaultEndpointPort(707)
// handler for new connections
.WithConnectionValidator(OnNewConnection)
// handler for new messages
.WithApplicationMessageInterceptor(OnNewMessage);
// creates a new mqtt server
IMqttServer mqttServer = new MqttFactory().CreateMqttServer();
// start the server with options
mqttServer.StartAsync(options.Build()).GetAwaiter().GetResult();
// keep application running until user press a key
Console.ReadLine();
Handlers are callbacks that MQTT calls whenever an action is called. In this example, we have a handler for new connections and for whenever the server gets a new message.
public static void OnNewConnection(MqttConnectionValidatorContext context)
{
Log.Logger.Information(
"New connection: ClientId = {clientId}, Endpoint = {endpoint}",
context.ClientId,
context.Endpoint);
}
public static void OnNewMessage(MqttApplicationMessageInterceptorContext context)
{
var payload = context.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(context.ApplicationMessage?.Payload);
MessageCounter++;
Log.Logger.Information(
"MessageId: {MessageCounter} - TimeStamp: {TimeStamp} -- Message: ClientId = {clientId}, Topic = {topic}, Payload = {payload}, QoS = {qos}, Retain-Flag = {retainFlag}",
MessageCounter,
DateTime.Now,
context.ClientId,
context.ApplicationMessage?.Topic,
payload,
context.ApplicationMessage?.QualityOfServiceLevel,
context.ApplicationMessage?.Retain);
}
In our client, we're going to create a new MQTTClient
instance. This instance will connect to our Broker on localhost:707 and send messages to the topic Dev.to/topic/json
What is needed for this implementation is below:
These are the libraries used by the server.
using System;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using Newtonsoft.Json;
using Serilog;
Our MQTTClient will coonnect to our broker via TCP. And same as the server, this code will be inside void Main(string[] args)
method.
// Creates a new client
MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
.WithClientId("Dev.To")
.WithTcpServer("localhost", 707);
// Create client options objects
ManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
.WithClientOptions(builder.Build())
.Build();
// Creates the client object
IManagedMqttClient _mqttClient = new MqttFactory().CreateManagedMqttClient();
// Set up handlers
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
_mqttClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(OnConnectingFailed);
// Starts a connection with the Broker
_mqttClient.StartAsync(options).GetAwaiter().GetResult();
// Send a new message to the broker every second
while (true)
{
string json = JsonConvert.SerializeObject(new { message = "Heyo :)", sent= DateTimeOffset.UtcNow });
_mqttClient.PublishAsync("dev.to/topic/json", json);
Task.Delay(1000).GetAwaiter().GetResult();
}
The client Handlers are called when the client recieves a signal that is connection, connection failed and when is disconnected.
public static void OnConnected(MqttClientConnectedEventArgs obj)
{
Log.Logger.Information("Successfully connected.");
}
public static void OnConnectingFailed(ManagedProcessFailedEventArgs obj)
{
Log.Logger.Warning("Couldn't connect to broker.");
}
public static void OnDisconnected(MqttClientDisconnectedEventArgs obj)
{
Log.Logger.Information("Successfully disconnected.");
}
You can find this project on Github
61