java,mqtt
使用Java 使用org.fusesource.mqtt-client 组件做Mqtt开发
大家可以在gitee或者Maven库网站上面找到对应的的Maven 引用 或者 Gradle 引用
GItee 地址: 点击进入
Maven 库地址:点击进入
这两个上面 都 有相关的依赖引用
我这边用的javaFX,做的一个桌面应用,走的mqtt数据通信。
下面说一下 这个库的配置:
org.fusesource.mqtt-client 这个库里提供了三种调用库的方式:
- 使用BlockingAPI
该MQTT.connectBlocking方法建立连接并为您提供与阻塞API的连接
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
- 使用FutureAPI
该MQTT.connectFuture方法建立连接并为您提供与期货样式API的连接。所有针对连接的操作都是非阻塞的,并通过Future返回结果。
FutureConnection connection = mqtt.futureConnection();Future<Void> f1 = connection.connect();f1.await();Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});byte[] qoses = f2.await();// We can start future receive..Future<Message> receive = connection.receive();// send the message..Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);// Then the receive will get the message.Message message = receive.await();message.ack();Future<Void> f4 = connection.disconnect();f4.await();
上面两种 我不做细讲 感兴趣的朋友 可以去看一下 这篇文章
传送门 ==========================>
- 使用基于回调/继续传递的API
该MQTT.connectCallback方法建立一个连接并为您提供与回调风格API的连接。这是使用API风格最复杂的,但可以提供最佳性能。未来和阻塞API使用封面下的回调API。连接上的所有操作都是非阻塞的,操作的结果将传递给您实现的回调接口。
我这里就是用第三种方式,简单的解释一下,就是 在 调用 库相关的API 情况下都会有一个回调方法,在回调方法里面可以查询,一些相关数据信息,比如数据是否发送成功或者 是否连接上Mqtt服务器之类的信息 。
具体的一些功能说明的函数 大家可以看上面的传送门,里面对基本API都做了解释
- setClientId:用于设置会话的客户端ID。这是MQTT服务器用来识别setCleanSession(false);正在使用的会话的内容。该ID必须为23个字符或更少。默认为自动生成的ID(基于您的套接字地址,端口和时间戳)。
- setCleanSession:如果希望MQTT服务器在客户端会话中保留主题订阅和确认位置,则设置为false。默认为true。
- setKeepAlive:在几秒钟内配置Keep Alive计时器。定义从客户端收到的消息之间的最大时间间隔。它使服务器能够检测到到客户端的网络连接已经丢失,而无需等待较长的TCP / IP超时。
- setUserName :设置用于对服务器进行身份验证的用户名。
- setPassword :设置用于对服务器进行身份验证的密码。
- setWillTopic:如果设置,服务器将在客户端发生意外断开连接时将客户端的Will消息发布到指定主题。
- setWillMessage:将发送的意愿消息。默认为零长度的消息。
- setWillQos:设置用于Will消息的服务质量。默认为QoS.AT_MOST_ONCE。
- setWillRetain:如果您希望使用保留选项发布遗嘱,请设置为true。
- setVersion:设置为“3.1.1”以使用MQTT版本3.1.1。否则默认为3.1协议版本。
下面是给大家看一下Mqtt的配置调用 ,(我用来做笔记 方便自己查阅)
MQtt初始化:
private static MQTT mqttClient = null;private static CallbackConnection connection=null;/** Mqttbase初始化*/public static void MqttCreate(String TargetAddress) {try {//创建实例MQTT mqttClient= new MQTT();//创建客服端IDmqttClient.setClientId("设置客户端ID");mqttClient.setUserName("用户名"); //"BOD_PC_DVC"mqttClient.setHost("目标地址IP", "端口号");// 设置会话心跳时间 单位为秒// 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制mqttClient.setKeepAlive((short) 60);//连接Mqtt时候 获取设备离线状态下 主机设置信息(设备上线的时候会重新获取到离线时候收到的消息) 默认为true//mqttClient.setCleanSession(false);//第一次尝试连接不上时 重新连接Mqtt服务器次数 -1为无限制mqttClient.setConnectAttemptsMax(-1);//如果客户端掉线尝试 连接次数 -1为无限制连接mqttClient.setReconnectAttemptsMax(-1); //mqttClient 设置遗嘱 掉线或者意外关闭时会调用遗嘱信息//设备非正常断开连接的时候调用 遗嘱:数据会 在遗嘱的主题里面发送对应内容mqttClient.setWillTopic("遗嘱主题");//设置遗嘱内容mqttClient.setWillMessage("遗嘱内容");//设置发送默认模式 mqttClient.setWillQos(QoS.AT_LEAST_ONCE);//设置指数避退 设置为1 为重新连接时间等待间距按照指数增加 默认为2mqttClient.setReconnectBackOffMultiplier(1);//设置重连接指数回归。设置为1则停用指数回归,默认为2//打印版本号//System.out.println(mqttClient.getVersion());//创建一个回调连接connection = mqttClient.callbackConnection();//设置监听回调connection.listener(new ExtendedListener() {//当前Mqtt客户端接收到 对应订阅主题时 会调用这个方法↓↓@Overridepublic void onPublish(UTF8Buffer utf8Buffer, Buffer buffer, Callback<Callback<Void>> callback) {callback.onSuccess(new Callback<Void>() {@Overridepublic void onSuccess(Void unused) {//数据接收成功//暂时先打印出来System.out.println("主题:" + utf8Buffer.toString() + "数据:" + buffer.toString()); //这里处理接收 }@Overridepublic void onFailure(Throwable throwable) {//数据接收失败}});}//连接到远端MQTT服务器成功时调用此方法@Overridepublic void onConnected() {System.out.println("连接服务器成功");}//意外断开或者主动断开 都会调用此方法@Overridepublic void onDisconnected() {//connectionSystem.out.println("断开连接");//MqttConnect();}@Overridepublic void onPublish(UTF8Buffer utf8Buffer, Buffer buffer, Runnable runnable) {}@Overridepublic void onFailure(Throwable throwable) {System.out.println("连接失败");}});MqttConnect(); } catch (URISyntaxException e) {e.printStackTrace();}}/*** Mqtt连接*/private static void MqttConnect() {connection.connect(new Callback<Void>() {@Overridepublic void onSuccess(Void unused) {//公共部分上线和离线Topic[] topics = {new Topic("主题1", QoS.AT_LEAST_ONCE), //主题1订阅new Topic("主题2", QoS.AT_LEAST_ONCE), //主题2订阅};AddMqttSubscribe(topics);}@Overridepublic void onFailure(Throwable throwable) {System.out.println("连接失败");}});}
下面时MQTT 发送数据:
/*** Mqtt发送消息** @param SendTheme 发送的主题* @param SendValue 发送的数据值*/public static void MqttSendData(String SendTheme, String SendValue) {if (connection != null) {connection.publish(SendTheme, SendValue.getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {@Overridepublic void onSuccess(Void unused) {//测试用// System.out.println("发送成功");}@Overridepublic void onFailure(Throwable throwable) {//发送失败 保留等待 后面处理System.out.println("发送失败"+throwable);}});}}
下面是断开连接:
/*** MQTT 断开连接调用方法*/public static void MqttDisConnect() {if (connection != null) { connection.disconnect(new Callback<Void>() {@Overridepublic void onSuccess(Void unused) {//MQTT断开连接成功}@Overridepublic void onFailure(Throwable throwable) {//MQTT 断开连接失败}});}}
订阅和取消订阅主题:
/***MQTT 取消主题订阅* @param topics 主题值* @return*/public static void MqttUnsubscribe(final UTF8Buffer[] topics) {if (connection != null) {connection.unsubscribe(topics, new Callback<Void>() {@Overridepublic void onSuccess(Void unused) {System.out.println("取消订阅成功!");}@Overridepublic void onFailure(Throwable throwable) {}});}}public static void AddMqttSubscribe(final Topic[] topics){if (connection != null) {connection.subscribe(topics, new Callback<byte[]>() {@Overridepublic void onSuccess(byte[] bytes) {System.out.println("订阅成功!");}@Overridepublic void onFailure(Throwable throwable) {System.out.println("订阅失败");}});}}
以上就是基本的初始化 方法处理 各位有觉得不妥可以在下发评论区发表下。
注意事项:
下面是我这边在使用过程中有出现过的一些问题和一些需要注意的地方。
- 掉线这个 正常调用API离线和 非正常(强行杀死对应线程) 遗嘱调用,正常关闭的时候不会发布遗嘱主题消息,非正常离线状态才会调用。所以在做设备离线检查的时候需要注意下
- 在服务端配置MQTT服务器的时候,对应端口防火墙需要开放对应端口,之前调试的时候忽略了这个问题。找了好一段时间 。
- 还有在取消订阅 的方法中 传入参数需要注意下:
public void unsubscribe(final UTF8Buffer[] topics, Callback<Void> cb)
connection.unsubscribe 传入参数是 UTF8Buffer[]类型; 传值需要处理下。
java,mqtt
使用Java 使用org.fusesource.mqtt-client 组件做Mqtt开发
大家可以在gitee或者Maven库网站上面找到对应的的Maven 引用 或者 Gradle 引用
GItee 地址: 点击进入
Maven 库地址:点击进入
这两个上面 都 有相关的依赖引用
我这边用的javaFX,做的一个桌面应用,走的mqtt数据通信。
下面说一下 这个库的配置:
org.fusesource.mqtt-client 这个库里提供了三种调用库的方式:
- 使用BlockingAPI
该MQTT.connectBlocking方法建立连接并为您提供与阻塞API的连接
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
- 使用FutureAPI
该MQTT.connectFuture方法建立连接并为您提供与期货样式API的连接。所有针对连接的操作都是非阻塞的,并通过Future返回结果。
FutureConnection connection = mqtt.futureConnection();Future<Void> f1 = connection.connect();f1.await();Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});byte[] qoses = f2.await();// We can start future receive..Future<Message> receive = connection.receive();// send the message..Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);// Then the receive will get the message.Message message = receive.await();message.ack();Future<Void> f4 = connection.disconnect();f4.await();
上面两种 我不做细讲 感兴趣的朋友 可以去看一下 这篇文章
传送门 ==========================>
- 使用基于回调/继续传递的API
该MQTT.connectCallback方法建立一个连接并为您提供与回调风格API的连接。这是使用API风格最复杂的,但可以提供最佳性能。未来和阻塞API使用封面下的回调API。连接上的所有操作都是非阻塞的,操作的结果将传递给您实现的回调接口。
我这里就是用第三种方式,简单的解释一下,就是 在 调用 库相关的API 情况下都会有一个回调方法,在回调方法里面可以查询,一些相关数据信息,比如数据是否发送成功或者 是否连接上Mqtt服务器之类的信息 。
具体的一些功能说明的函数 大家可以看上面的传送门,里面对基本API都做了解释
- setClientId:用于设置会话的客户端ID。这是MQTT服务器用来识别setCleanSession(false);正在使用的会话的内容。该ID必须为23个字符或更少。默认为自动生成的ID(基于您的套接字地址,端口和时间戳)。
- setCleanSession:如果希望MQTT服务器在客户端会话中保留主题订阅和确认位置,则设置为false。默认为true。
- setKeepAlive:在几秒钟内配置Keep Alive计时器。定义从客户端收到的消息之间的最大时间间隔。它使服务器能够检测到到客户端的网络连接已经丢失,而无需等待较长的TCP / IP超时。
- setUserName :设置用于对服务器进行身份验证的用户名。
- setPassword :设置用于对服务器进行身份验证的密码。
- setWillTopic:如果设置,服务器将在客户端发生意外断开连接时将客户端的Will消息发布到指定主题。
- setWillMessage:将发送的意愿消息。默认为零长度的消息。
- setWillQos:设置用于Will消息的服务质量。默认为QoS.AT_MOST_ONCE。
- setWillRetain:如果您希望使用保留选项发布遗嘱,请设置为true。
- setVersion:设置为“3.1.1”以使用MQTT版本3.1.1。否则默认为3.1协议版本。
下面是给大家看一下Mqtt的配置调用 ,(我用来做笔记 方便自己查阅)
MQtt初始化:
private static MQTT mqttClient = null;private static CallbackConnection connection=null;/** Mqttbase初始化*/public static void MqttCreate(String TargetAddress) {try {//创建实例MQTT mqttClient= new MQTT();//创建客服端IDmqttClient.setClientId("设置客户端ID");mqttClient.setUserName("用户名"); //"BOD_PC_DVC"mqttClient.setHost("目标地址IP", "端口号");// 设置会话心跳时间 单位为秒// 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制mqttClient.setKeepAlive((short) 60);//连接Mqtt时候 获取设备离线状态下 主机设置信息(设备上线的时候会重新获取到离线时候收到的消息) 默认为true//mqttClient.setCleanSession(false);//第一次尝试连接不上时 重新连接Mqtt服务器次数 -1为无限制mqttClient.setConnectAttemptsMax(-1);//如果客户端掉线尝试 连接次数 -1为无限制连接mqttClient.setReconnectAttemptsMax(-1); //mqttClient 设置遗嘱 掉线或者意外关闭时会调用遗嘱信息//设备非正常断开连接的时候调用 遗嘱:数据会 在遗嘱的主题里面发送对应内容mqttClient.setWillTopic("遗嘱主题");//设置遗嘱内容mqttClient.setWillMessage("遗嘱内容");//设置发送默认模式 mqttClient.setWillQos(QoS.AT_LEAST_ONCE);//设置指数避退 设置为1 为重新连接时间等待间距按照指数增加 默认为2mqttClient.setReconnectBackOffMultiplier(1);//设置重连接指数回归。设置为1则停用指数回归,默认为2//打印版本号//System.out.println(mqttClient.getVersion());//创建一个回调连接connection = mqttClient.callbackConnection();//设置监听回调connection.listener(new ExtendedListener() {//当前Mqtt客户端接收到 对应订阅主题时 会调用这个方法↓↓@Overridepublic void onPublish(UTF8Buffer utf8Buffer, Buffer buffer, Callback<Callback<Void>> callback) {callback.onSuccess(new Callback<Void>() {@Overridepublic void onSuccess(Void unused) {//数据接收成功//暂时先打印出来System.out.println("主题:" + utf8Buffer.toString() + "数据:" + buffer.toString()); //这里处理接收 }@Overridepublic void onFailure(Throwable throwable) {//数据接收失败}});}//连接到远端MQTT服务器成功时调用此方法@Overridepublic void onConnected() {System.out.println("连接服务器成功");}//意外断开或者主动断开 都会调用此方法@Overridepublic void onDisconnected() {//connectionSystem.out.println("断开连接");//MqttConnect();}@Overridepublic void onPublish(UTF8Buffer utf8Buffer, Buffer buffer, Runnable runnable) {}@Overridepublic void onFailure(Throwable throwable) {System.out.println("连接失败");}});MqttConnect(); } catch (URISyntaxException e) {e.printStackTrace();}}/*** Mqtt连接*/private static void MqttConnect() {connection.connect(new Callback<Void>() {@Overridepublic void onSuccess(Void unused) {//公共部分上线和离线Topic[] topics = {new Topic("主题1", QoS.AT_LEAST_ONCE), //主题1订阅new Topic("主题2", QoS.AT_LEAST_ONCE), //主题2订阅};AddMqttSubscribe(topics);}@Overridepublic void onFailure(Throwable throwable) {System.out.println("连接失败");}});}
下面时MQTT 发送数据:
/*** Mqtt发送消息** @param SendTheme 发送的主题* @param SendValue 发送的数据值*/public static void MqttSendData(String SendTheme, String SendValue) {if (connection != null) {connection.publish(SendTheme, SendValue.getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {@Overridepublic void onSuccess(Void unused) {//测试用// System.out.println("发送成功");}@Overridepublic void onFailure(Throwable throwable) {//发送失败 保留等待 后面处理System.out.println("发送失败"+throwable);}});}}
下面是断开连接:
/*** MQTT 断开连接调用方法*/public static void MqttDisConnect() {if (connection != null) { connection.disconnect(new Callback<Void>() {@Overridepublic void onSuccess(Void unused) {//MQTT断开连接成功}@Overridepublic void onFailure(Throwable throwable) {//MQTT 断开连接失败}});}}
订阅和取消订阅主题:
/***MQTT 取消主题订阅* @param topics 主题值* @return*/public static void MqttUnsubscribe(final UTF8Buffer[] topics) {if (connection != null) {connection.unsubscribe(topics, new Callback<Void>() {@Overridepublic void onSuccess(Void unused) {System.out.println("取消订阅成功!");}@Overridepublic void onFailure(Throwable throwable) {}});}}public static void AddMqttSubscribe(final Topic[] topics){if (connection != null) {connection.subscribe(topics, new Callback<byte[]>() {@Overridepublic void onSuccess(byte[] bytes) {System.out.println("订阅成功!");}@Overridepublic void onFailure(Throwable throwable) {System.out.println("订阅失败");}});}}
以上就是基本的初始化 方法处理 各位有觉得不妥可以在下发评论区发表下。
注意事项:
下面是我这边在使用过程中有出现过的一些问题和一些需要注意的地方。
- 掉线这个 正常调用API离线和 非正常(强行杀死对应线程) 遗嘱调用,正常关闭的时候不会发布遗嘱主题消息,非正常离线状态才会调用。所以在做设备离线检查的时候需要注意下
- 在服务端配置MQTT服务器的时候,对应端口防火墙需要开放对应端口,之前调试的时候忽略了这个问题。找了好一段时间 。
- 还有在取消订阅 的方法中 传入参数需要注意下:
public void unsubscribe(final UTF8Buffer[] topics, Callback<Void> cb)
connection.unsubscribe 传入参数是 UTF8Buffer[]类型; 传值需要处理下。