C#使用MQTT协议进行开发
写在前面
MQTT的介绍参照百度百科。
本文用到的所有工具和插件,笔者已打包上传到CSDN资源,嫌官网下载繁琐的可以自行去下载。
安装erlang和rabbitmq
1.安装erlang:网址https://www.erlang.org/downloads,建议版本OTP20.2。
2.安装rabbitmq:网址http://www.rabbitmq.com/download.html,建议版本RabbitMQ3.7.3。
3.配置环境变量(此电脑右键-属性-高级系统设置-环境变量),新建系统变量ERLANG_HOME,如图:
4.新建系统变量RABBITMQ_SERVER,如图:
5.双击系统变量里的Path变量,新建如下两个变量:
6.运行cmd,输入erl回车,看到版本号说明erlang安装成功。
7.运行cmd,先cd到RabbitMQ安装目录的sbin文件夹下,然后输入rabbitmq-plugins enable rabbitmq_management回车,如图:
然后输入rabbitmqctl status回车,无报错则说明安装成功。
安装mqttfx
网址http://www.jensd.de/apps/mqttfx/,建议版本1.7.1。
下载M2Mqtt
网址https://github.com/eclipse/paho.mqtt.m2mqtt,下载zip,解压后编译程序生成M2Mqtt.Net.dll动态链接库,然后把M2Mqtt.Net.dll拿到项目中去引用即可。
开始编程
新建WPF应用程序TestMQTT,在引用里添加M2Mqtt.Net.dll引用。
先做下界面,布局代码:
<Window x:Class="TestMQTT.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
xmlns:local="clr-namespace:TestMQTT"
mc:Ignorable="d"
Title="MainWindow" Height="600" Width="800"
WindowStartupLocation="CenterScreen" Closed="Window_Closed">
<Grid>
<StackPanel Orientation="Vertical" HorizontalAlignment="Center" VerticalAlignment="Center">
<StackPanel Orientation="Horizontal">
<Label FontSize="16" HorizontalContentAlignment="Right" VerticalContentAlignment="Center">代理地址</Label>
<TextBox x:Name="txt_BrokerAddress" Margin="5,0,0,0" FontSize="16"
Width="120" Height="36" VerticalContentAlignment="Center">localhost</TextBox>
<Label Margin="10,0,0,0" FontSize="16" HorizontalContentAlignment="Right" VerticalContentAlignment="Center">代理端口</Label>
<TextBox x:Name="txt_BrokerPort" Margin="5,0,0,0" FontSize="16"
Width="60" Height="36" VerticalContentAlignment="Center">1883</TextBox>
<Button x:Name="btn_Connect" Margin="10,0,0,0" FontSize="16"
Width="60" Height="30" Click="btn_Connect_Click">连接</Button>
<Button x:Name="btn_DisConnect" Margin="10,0,0,0" FontSize="16"
Width="60" Height="30" Click="btn_DisConnect_Click">断开</Button>
</StackPanel>
<StackPanel Margin="0,10,0,0" Orientation="Horizontal">
<Label FontSize="16" HorizontalContentAlignment="Right" VerticalContentAlignment="Center">发布的主题</Label>
<TextBox x:Name="txt_PublishTopic" Margin="5,0,0,0" FontSize="16"
Width="120" Height="36" VerticalContentAlignment="Center"></TextBox>
<Label Margin="10,0,0,0" FontSize="16" HorizontalContentAlignment="Right" VerticalContentAlignment="Center">发布的消息</Label>
<TextBox x:Name="txt_PublishMessage" Margin="5,0,0,0" FontSize="16"
Width="120" Height="36" VerticalContentAlignment="Center"></TextBox>
<Button x:Name="btn_Publish" Margin="10,0,0,0" FontSize="16"
Width="60" Height="30" Click="btn_Publish_Click">发布</Button>
</StackPanel>
<StackPanel Margin="0,10,0,0" Orientation="Horizontal">
<Label FontSize="16" HorizontalContentAlignment="Right" VerticalContentAlignment="Center">订阅的主题</Label>
<TextBox x:Name="txt_SubscribeTopic" Margin="5,0,0,0" FontSize="16"
Width="120" Height="36" VerticalContentAlignment="Center"></TextBox>
<Button x:Name="btn_Subscribe" Margin="10,0,0,0" FontSize="16"
Width="60" Height="30" Click="btn_Subscribe_Click">订阅</Button>
<Button x:Name="btn_UnSubscribe" Margin="10,0,0,0" FontSize="16"
Width="80" Height="30" Click="btn_UnSubscribe_Click">取消订阅</Button>
</StackPanel>
<StackPanel Margin="0,10,0,0" Orientation="Vertical">
<Label FontSize="16" HorizontalContentAlignment="Center" VerticalContentAlignment="Center">接收的消息</Label>
<TextBox x:Name="txt_SubscribeMessage" Margin="5,0,0,0" FontSize="16"
Width="460" Height="360" VerticalContentAlignment="Top"
TextWrapping="NoWrap" HorizontalScrollBarVisibility="Auto" VerticalScrollBarVisibility="Auto"></TextBox>
</StackPanel>
</StackPanel>
</Grid>
</Window>
接下来完成后台代码:
using System;
using System.Text;
using System.Windows;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace TestMQTT
{
/// <summary>
/// MainWindow.xaml 的交互逻辑
/// </summary>
public partial class MainWindow : Window
{
private MqttClient client = null;
public MainWindow()
{
InitializeComponent();
}
/// <summary>
/// 发布消息成功后调用的事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void client_MqttMsgPublished(object sender, MqttMsgPublishedEventArgs e)
{
MessageBox.Show("发布消息成功。", "信息", MessageBoxButton.OK, MessageBoxImage.Information);
}
/// <summary>
/// 订阅主题成功后调用的事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void client_MqttMsgSubscribed(object sender, MqttMsgSubscribedEventArgs e)
{
MessageBox.Show("订阅主题成功。", "信息", MessageBoxButton.OK, MessageBoxImage.Information);
}
/// <summary>
/// 取消订阅主题成功后调用的事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void client_MqttMsgUnsubscribed(object sender, MqttMsgUnsubscribedEventArgs e)
{
MessageBox.Show("取消订阅主题成功。", "信息", MessageBoxButton.OK, MessageBoxImage.Information);
}
/// <summary>
/// 当接收到订阅主题的消息时调用的事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
this.Dispatcher.Invoke(new Action(() =>
{
txt_SubscribeMessage.Text += DateTime.Now.ToString("yyyy/MM/dd-HH:mm:ss") + " Received topic: " + e.Topic + ", message: " + Encoding.UTF8.GetString(e.Message) + "\n";
}));
}
/// <summary>
/// 当连接断开时调用的事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void client_ConnectionClosed(object sender, EventArgs e)
{
MessageBox.Show("连接已断开。", "信息", MessageBoxButton.OK, MessageBoxImage.Information);
}
/// <summary>
/// 连接
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void btn_Connect_Click(object sender, RoutedEventArgs e)
{
try
{
string brokerAddress = txt_BrokerAddress.Text.ToString();
int brokerPort = Int32.Parse(txt_BrokerPort.Text);
client = new MqttClient(brokerAddress, brokerPort, false, null, null, MqttSslProtocols.None, null); //实例化mqtt客户端
string clientId = Guid.NewGuid().ToString("N"); //生成32位全球唯一标识符
client.Connect(clientId); //建立客户端连接
client.MqttMsgPublished += client_MqttMsgPublished; //发布消息成功后调用
client.MqttMsgSubscribed += client_MqttMsgSubscribed; //订阅主题成功后调用
client.MqttMsgUnsubscribed += client_MqttMsgUnsubscribed; //取消订阅主题成功后调用
client.MqttMsgPublishReceived += client_MqttMsgPublishReceived; //当接收到订阅主题的消息时调用
client.ConnectionClosed += client_ConnectionClosed; //连接断开后调用
MessageBox.Show("连接成功。", "信息", MessageBoxButton.OK, MessageBoxImage.Information);
}
catch (Exception ex)
{
MessageBox.Show("连接失败!\n\r" + ex.Message, "警告", MessageBoxButton.OK, MessageBoxImage.Warning);
}
}
/// <summary>
/// 断开连接
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void btn_DisConnect_Click(object sender, RoutedEventArgs e)
{
client.Disconnect();
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void btn_Publish_Click(object sender, RoutedEventArgs e)
{
string publishTopic = txt_PublishTopic.Text.ToString(); //发布的主题
string publishMessage = txt_PublishMessage.Text.ToString(); //发布的消息
client.Publish(publishTopic, Encoding.UTF8.GetBytes(publishMessage)); //发布消息
}
/// <summary>
/// 订阅消息
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void btn_Subscribe_Click(object sender, RoutedEventArgs e)
{
string[] subscribeTopic = txt_SubscribeTopic.Text.ToString().Split(','); //可以同时订阅多个主题的消息,各主题之间用逗号分隔
byte[] qosLevels = new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE };
client.Subscribe(subscribeTopic, qosLevels);
}
/// <summary>
/// 取消订阅消息
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void btn_UnSubscribe_Click(object sender, RoutedEventArgs e)
{
string[] unSubscribeTopic = txt_SubscribeTopic.Text.ToString().Split(',');
client.Unsubscribe(unSubscribeTopic);
}
/// <summary>
/// 窗口关闭事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void Window_Closed(object sender, EventArgs e)
{
Environment.Exit(0); //强制退出,即使有其他的线程没有结束,因为M2Mqtt调用事件都是在单独的线程中进行的
}
}
}
代码比较简单,相应地方也有注释,就不多说了,这里说明一下窗口关闭事件里Environment.Exit(0)的作用,由于M2Mqtt调用事件都是在单独的线程中进行的,可能我们在程序主界面关闭时并没有关掉某个线程,所以这里用这行代码强制关闭。
开始测试
打开MQTT.fx工具,点击设置按钮,做如下配置:
其中Client ID随便生成一个即可,完成后回到主界面点击Connect,右上角出现绿色灯即说明连接成功。
点击Subscribe,输入主题home,然后订阅即点击输入框右边的Subscribe按钮,运行程序,连接成功后,发布主题home发布消息hello,然后点击发布按钮,如图:
回到MQTT.fx工具,若成功则能看到收到的hello消息。
接下来测试订阅。
在程序界面上输入订阅主题home/topic,然后点击订阅按钮,MQTT.fx工具点击Publish,输入主题home/topic,然后在下边文本处输入发布的消息,比如happy,然后发布即点击Publish按钮。
回到程序,若看到MQTT.fx发来的消息则成功。
创建远程用户
实际应用中,我们可能需要利用MQTT协议远程访问工控机做交互,此时我们就需要在工控机上创建远程用户。此处以创建一个admin用户密码为123举例。
1.在开始菜单打开RabbitMQ Command Prompt工具,如图:
2.运行命令rabbitmqctl add_user admin 123。
3.运行命令rabbitmqctl set_user_tags admin administrator。
4.运行命令rabbitmqctl set_permissions -p “/” admin “." ".” “.*”。
5.运行命令rabbitmqctl list_users,此时若能看到新创建的admin用户则证明创建成功。
6.在需要访问工控机的远程电脑上的MQTT.fx工具设置里把该用户名和密码设置上即可。