- 物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解高性能
- 源码(mqtt-simulator-sample02)
- 搭载 emqx 公共免费的broker
- 相关依赖
- 同步 vs 异步
- 基于MqttAsyncClient构建
- 部分源码
- 演示结果
物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解高性能
MqttAsyncClient 非阻塞、高吞吐MQTT 应用
源码(mqtt-simulator-sample02)
https://gitee.com/kcnf-iot/mqtt-simulator
搭载 emqx 公共免费的broker
https://www.emqx.com/zh/mqtt/public-mqtt5-broker
相关依赖
<!-- MQTT Client --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.mqttv5.client</artifactId> <version>1.2.5</version> </dependency>同步 vs 异步
| 维度 | 同步 API(MqttClient) | 异步 API(MqttAsyncClient) |
|---|---|---|
| 阻塞性 | 阻塞等待执行完成 | 非阻塞,操作完回调通知 |
| 线程模型 | 当前线程等待 | 底层 IO 线程,不阻塞主线程 |
| 高并发性能 | 低,大量请求会阻塞 | 极高,适合高吞吐、多设备 |
| 适用场景 | 简单设备、单连接、低频次 | 网关、高吞吐、消息转发、服务端 |
| 核心回调 | 仅消息回调 | IMqttActionListener 监听所有操作结果 |
基于MqttAsyncClient构建
部分源码
package com.jysemel.iot; import org.eclipse.paho.mqttv5.client.*; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; public class MqttAsyncGateway { private MqttAsyncClient upClient; // 上行:收设备消息 private MqttAsyncClient downClient;// 下行:转发消息 // ====================== 频率控制(每秒最多1条,可自己改) ====================== private static final long PUBLISH_INTERVAL = 2000; // 2秒一条 private long lastSendTime = 0; public static void main(String[] args) { new MqttAsyncGateway().start(); } public void start() { try { // 创建两个异步客户端 upClient = new MqttAsyncClient(MqttConst.UP_BROKER, MqttConst.UP_CLIENT_ID, new MemoryPersistence()); downClient = new MqttAsyncClient(MqttConst.DOWN_BROKER, MqttConst.DOWN_CLIENT_ID, new MemoryPersistence()); MqttConnectionOptions options = new MqttConnectionOptions(); options.setKeepAliveInterval(10); options.setCleanStart(true); // 连接上行 upClient.connect(options, null, new MqttActionListener() { @Override public void onSuccess(IMqttToken token) { System.out.println("✅ 上行连接成功"); subscribeDeviceTopic(); } @Override public void onFailure(IMqttToken token, Throwable exception) { System.err.println("❌ 上行连接失败"); } }); // 连接下行 downClient.connect(options, null, new MqttActionListener() { @Override public void onSuccess(IMqttToken token) { System.out.println("✅ 下行连接成功"); } @Override public void onFailure(IMqttToken token, Throwable exception) { System.err.println("❌ 下行连接失败"); } }); // 消息回调 upClient.setCallback(new MqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) { String content = new String(message.getPayload()).trim(); // ====================== 频率控制核心 ====================== long now = System.currentTimeMillis(); if (now - lastSendTime < PUBLISH_INTERVAL) { System.out.println("⏱ 频率限制,跳过转发:" + content); return; } lastSendTime = now; System.out.println("\n📥 收到:" + topic + " -> " + content); forwardToDownstream(content); // 转发 } @Override public void disconnected(MqttDisconnectResponse disconnectResponse) {} @Override public void mqttErrorOccurred(MqttException exception) {} @Override public void deliveryComplete(IMqttToken token) {} @Override public void connectComplete(boolean reconnect, String serverURI) {} @Override public void authPacketArrived(int reasonCode, MqttProperties properties) {} }); } catch (Exception e) { e.printStackTrace(); } } // 订阅 private void subscribeDeviceTopic() { try { upClient.subscribe(MqttConst.DEVICE_TOPIC, 1, null, new MqttActionListener() { @Override public void onSuccess(IMqttToken token) { System.out.println("✅ 订阅成功:" + MqttConst.DEVICE_TOPIC); } @Override public void onFailure(IMqttToken token, Throwable exception) { System.err.println("❌ 订阅失败"); } }); } catch (Exception e) { e.printStackTrace(); } } // 异步转发 private void forwardToDownstream(String content) { try { MqttMessage msg = new MqttMessage(content.getBytes()); msg.setQos(1); downClient.publish(MqttConst.FORWARD_TOPIC, msg, null, new MqttActionListener() { @Override public void onSuccess(IMqttToken token) { System.out.println("📤 转发完成:" + content); } @Override public void onFailure(IMqttToken token, Throwable exception) { System.err.println("❌ 转发失败"); } }); } catch (Exception e) { e.printStackTrace(); } } }