How to Receive MQTT Messages From ESP32 Sensors in C# .NET Core

In this video tutorial I demonstrate how to send MQTT messages from an ESP32 microcontroller and receive them in a C# .NET Core application. In this example I’ve used a PIR motion sensor, but it will work with any other type of sensor. MQTT works over wifi so it’s useful if you want to connect your ESP32 to a PC without using a physical cable.

The source code is below.

On the Arduino IDE side, the ESP32 requires the standard WiFi libraries and the PubSubClient MQTT library. The C# application uses the MQTTnet NuGet packages.

The ESP32 code in this tutorial could easily be modified for the ESP8266. The ESP8266 is a really good choice for MQTT because code exists to turn it into an MQTT server (broker).

A useful ESP32 pinout reference guide is here. If you want the ESP32 to wake up from deep sleep when a sensor is triggered, you need to connect the sensor to a GPIO pin that supports RTC.

Related Videos

Basic introduction to client/server programming using MQTT: https://youtu.be/Igxi6wBQmIM
How to use and test PIR sensors with IoT devices: https://youtu.be/iWpooe98Itk
Sleep ESP32 and wake up again using PIR sensor: https://youtu.be/OcE85Hy8te8

Source Code – Arduino IDE for ESP32

//ESP32 wake up device using PIR sensor and send message to MQTT server
#include <WiFi.h>
#include <WiFiMulti.h>
WiFiMulti WiFiMulti;
 
#include <PubSubClient.h>
 
// Wi-Fi credentials passed to WiFi.begin() in setup_wifi(); replace the placeholders with your own.
const char* ssid = "YOUR_WIFI_SSID";
const char* password = "YOUR_WIFI_PASSWORD";
// Address of the MQTT server; setup() registers it with the MQTT client on port 1883.
const char* mqtt_server = "192.168.0.9"; //Change this to the IP address of your PC on your internal network (IPConfig will tell you this)
// GPIO number the motion sensor is wired to; setup() configures it as an input.
#define MOTION_SENSOR 32 //Must be a RTC GPIO
 
// Network client and the MQTT client that is constructed on top of it.
WiFiClient espClient;
PubSubClient client(espClient);
// NOTE(review): lastMsg and value are plain globals, so they start from 0 again on every boot — confirm they are still needed.
unsigned long lastMsg = 0;
// Size in bytes of the buffer that holds the outgoing message text.
#define MSG_BUFFER_SIZE	(50)
char msg[MSG_BUFFER_SIZE];
int value = 0;
 
// Incremented at the start of every setup() run; setup() only sends an alert when it is greater than 1.
// RTC_DATA_ATTR is expected to keep the value across deep sleep — see the ESP-IDF sleep documentation.
RTC_DATA_ATTR int wakeupCounter = 0; //Counter used to configure the device the first time it is switched on
 
void setup_wifi() {
 
  delay(10);
  // We start by connecting to a WiFi network
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);
 
  WiFi.mode(WIFI_STA);
  WiFi.begin(ssid, password);
 
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
 
  randomSeed(micros());
 
  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}
 
// Handler for incoming MQTT messages, registered with client.setCallback() in setup().
// Echoes the topic and the payload to the serial console.
//   topic   - topic the message arrived on
//   payload - message bytes; printed one by one using `length`
//   length  - number of bytes in `payload`
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  // The index has the same unsigned type as `length`, avoiding a signed/unsigned comparison.
  for (unsigned int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}
 
// Blocks until the MQTT client is connected to the server.
// Each attempt uses a fresh client ID with a random hex suffix; a failed
// attempt logs the client state code and is retried after five seconds.
void reconnect() {
  while (true) {
    if (client.connected()) {
      return;
    }

    Serial.print("Attempting MQTT connection...");

    // Build a random client ID for this attempt.
    String clientId = String("ESP8266Client-") + String(random(0xffff), HEX);

    if (!client.connect(clientId.c_str())) {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Back off before the next attempt.
      delay(5000);
      continue;
    }

    Serial.println("connected");
  }
}
 
void setup() {
  pinMode(MOTION_SENSOR, INPUT);
  ++wakeupCounter; //Increment wakeupCounter
  Serial.begin(115200);
  setup_wifi();
  client.setServer(mqtt_server, 1883);
  client.setCallback(callback);
 
  Serial.print("Wakeup counter = ");
  Serial.println(wakeupCounter);
 
  if (wakeupCounter > 1) {
    Serial.println("ESP32 has been woken up!");
    triggerAlert();
  } else {
    sleepDevice();
  }
 
}
 
// Intentionally empty: every path through setup() ends in sleepDevice(),
// so there is no work left for the main loop.
void loop() {
}
 
// Publishes a "Sensor 1 Activated" alert on the "securitySensors" topic and
// then puts the device back to sleep. Called from setup() when the wake-up
// counter shows that this is not the first run.
void triggerAlert() {
  Serial.println("Sending alert via MQTT...");
  if (!client.connected()) {
    reconnect();
  }

  // Publish unconditionally. The previous "now - lastMsg > 2000" throttle came
  // from a looping example: lastMsg is 0 on every boot and millis() counts from
  // boot, so the alert was silently skipped whenever WiFi and MQTT connected in
  // under two seconds.
  // wakeupCounter is an int, so it is formatted with %d (was %ld).
  snprintf(msg, MSG_BUFFER_SIZE, "Sensor 1 Activated:#%d", wakeupCounter);
  Serial.print("Publish message: ");
  Serial.println(msg);
  if (!client.publish("securitySensors", msg)) {
    // Report the failure instead of ignoring it; the device still goes back to sleep.
    Serial.println("Publish failed");
  }

  // NOTE(review): the device sleeps right after publish(); if alerts are lost in
  // practice, consider client.disconnect() or a short delay here — verify on hardware.
  Serial.println("Going back to sleep...");
  sleepDevice();
}
 
void sleepDevice() {
    Serial.println("ESP32 is now sleeping!");
    esp_sleep_enable_ext0_wakeup(GPIO_NUM_32, HIGH);
    esp_deep_sleep_start();
}

Source Code – C# for Visual Studio

Make a new C# .NET Core console app and add the following files:

In Program.cs, comment out anything already there and add the following. You might need to change the namespace to your existing namespace.

// Namespace that declares Server_ASP_NET_Samples (see MQTTService.cs).
using MQTTnet.Samples.Server;
// Start the hosted MQTT server and wait for the returned task to complete.
await Server_ASP_NET_Samples.Start_Server_With_WebSockets_Support();

Make a new class file called MQTTService.cs and add the following.

// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
// ReSharper disable UnusedType.Global
// ReSharper disable UnusedMember.Global
// ReSharper disable InconsistentNaming
// ReSharper disable EmptyConstructor
// ReSharper disable MemberCanBeMadeStatic.Local
 
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MQTTnet.AspNetCore;
using MQTTnet.Server;
using MQTTnet.Extensions.ManagedClient;
using My_MQTT_Server;
 
namespace MQTTnet.Samples.Server;
 
public static class Server_ASP_NET_Samples
{
    /// <summary>
    /// Starts a minimal ASP.NET web server that includes a hosted MQTT server.
    /// </summary>
    /// <returns>The task returned by <c>RunConsoleAsync</c> for the configured host.</returns>
    public static Task Start_Server_With_WebSockets_Support()
    {
        // Web host setup: Kestrel listeners plus the Startup class.
        void ConfigureWebHost(IWebHostBuilder web)
        {
            web.UseKestrel(
                kestrel =>
                {
                    // MQTT connections over plain TCP on port 1883.
                    kestrel.ListenAnyIP(1883, listener => listener.UseMqtt());

                    // Default HTTP pipeline on port 5000. MQTT over WebSockets is served
                    // from "localhost:5000/mqtt"; the URI is mapped in Startup.Configure.
                    kestrel.ListenAnyIP(5000);
                });

            web.UseStartup<Startup>();
        }

        return Host.CreateDefaultBuilder(Array.Empty<string>())
            .ConfigureWebHostDefaults(ConfigureWebHost)
            .RunConsoleAsync();
    }
 
    /// <summary>
    /// Holds the handlers that <see cref="Startup"/> attaches to the MQTT server's events.
    /// </summary>
    sealed class MqttController
    {
        public MqttController()
        {
            // Inject other services via constructor.
        }

        /// <summary>
        /// This event is triggered when the client publishes a message to the server.
        /// The topic and the payload (converted to a string) are passed to
        /// <c>MessageProcessor.ProcessMessage</c>.
        /// </summary>
        /// <param name="eventArgs">Event data carrying the published application message.</param>
        /// <returns>A task that completes once the message has been processed.</returns>
        public async Task OnClientPublishAsync(InterceptingPublishEventArgs eventArgs)
        {
            string topic = eventArgs.ApplicationMessage.Topic;
            string message = eventArgs.ApplicationMessage.ConvertPayloadToString();

            // The returned bool only says whether the topic was handled; ProcessMessage
            // already logs both outcomes, so it is not needed here.
            await MessageProcessor.ProcessMessage(topic, message);
        }

        /// <summary>
        /// This event is called before OnClientPublishAsync and could be a good place to do security checks etc.
        /// Currently every connection is accepted and only logged to the console.
        /// </summary>
        /// <param name="eventArgs">Event data describing the connecting client.</param>
        /// <returns>An already completed task.</returns>
        public Task ValidateConnection(ValidatingConnectionEventArgs eventArgs)
        {
            Console.WriteLine($"Client '{eventArgs.ClientId}' wants to connect. Accepting!");
            return Task.CompletedTask;
        }
    }
 
    /// <summary>
    /// ASP.NET Core startup class: registers the MQTT services and wires up the
    /// request pipeline and the MQTT server event handlers.
    /// </summary>
    sealed class Startup
    {
        /// <summary>
        /// Registers the hosted MQTT server, the MQTT connection handler, the
        /// connections services and the <see cref="MqttController"/> singleton.
        /// </summary>
        /// <param name="services">Service collection to add the registrations to.</param>
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddHostedMqttServer(
                mqttOptions =>
                {
                    mqttOptions.WithDefaultEndpoint();
                });

            services.AddMqttConnectionHandler();
            services.AddConnections();

            services.AddSingleton<MqttController>();
        }

        /// <summary>
        /// Maps the "/mqtt" connection endpoint and attaches the controller's
        /// handlers to the MQTT server events.
        /// </summary>
        /// <param name="app">Application pipeline builder.</param>
        /// <param name="environment">Hosting environment (not used here).</param>
        /// <param name="mqttController">Controller whose methods handle the server events.</param>
        public void Configure(IApplicationBuilder app, IWebHostEnvironment environment, MqttController mqttController)
        {
            app.UseRouting();

            app.UseEndpoints(
                routes =>
                {
                    routes.MapConnectionHandler<MqttConnectionHandler>(
                        "/mqtt",
                        dispatcherOptions =>
                        {
                            // Pick the first sub-protocol offered, or an empty string if there is none.
                            dispatcherOptions.WebSockets.SubProtocolSelector =
                                offered => offered.FirstOrDefault() ?? string.Empty;
                        });
                });

            app.UseMqttServer(
                mqttServer =>
                {
                    // Attach event handlers.
                    mqttServer.ValidatingConnectionAsync += mqttController.ValidateConnection;
                    mqttServer.InterceptingPublishAsync += mqttController.OnClientPublishAsync;

                    // The following events might also be useful...
                    //mqttServer.ClientConnectedAsync += mqttController.OnClientConnected;
                    //mqttServer.InterceptingClientEnqueueAsync += mqttController.OnClientSendMessage;
                    //mqttServer.ClientSubscribedTopicAsync += mqttController.OnClientSubscribedTopic;
                });
        }
    }
}

Make a new class file called MessageProcessor.cs and add the following:

using System;
 
namespace My_MQTT_Server
{
    /// <summary>
    /// Handles MQTT messages by topic and logs the outcome to the console.
    /// </summary>
    public static class MessageProcessor
    {
        /// <summary>The only topic this processor currently handles.</summary>
        private const string SecuritySensorsTopic = "securitySensors";

        /// <summary>
        /// Process the MQTT message. Any long running processes called should be awaitable.
        /// </summary>
        /// <param name="topic">Topic the message was published on.</param>
        /// <param name="message">Message payload as text.</param>
        /// <returns>
        /// A task whose result is <c>true</c> when the topic is "securitySensors",
        /// and <c>false</c> for any other topic.
        /// </returns>
        public static Task<bool> ProcessMessage(string topic, string message)
        {
            // Nothing is awaited here, so the method is not marked async (avoids CS1998)
            // and returns completed tasks directly.
            if (!string.Equals(topic, SecuritySensorsTopic, StringComparison.Ordinal))
            {
                Console.WriteLine($"ProcessMessage received message with topic: '{topic}' but is unable to handle this topic.");
                return Task.FromResult(false);
            }

            Console.WriteLine($"ProcessMessage received message with topic: '{topic}' and message: '{message}'.");
            return Task.FromResult(true);
        }
    }
}

Leave a Reply

Your email address will not be published. Required fields are marked *