求大佬 百度云指教,已知一个云平台的IP、端口和mqtt协议涉及的用户名和密码(

MQTT,是:
轻量级的消息订阅和发布(publish/subscribe)协议
建立在TCP/IP协议之上
IoT,internet of things,物联网,MQTT在这方面应用较多。
官方网站:
MQTT协议是针对如下情况设计的:
M2M(Machine to Machine) communication,机器端到端通信,比如传感器之间的数据通讯
因为是Machine to Machine,需要考虑:
Machine,或者叫设备,比如温度传感器,硬件能力很弱,协议要考虑尽量小的资源消耗,比如计算能力和存储等
M2M可能是无线连接,网络不稳定,带宽也比较小
MQTT协议的架构,用一个示例说明。比如有1个温度传感器(1个Machine),2个小的显示屏(2个Machine),显示屏要显示温度传感器的温度值。
可通过查阅详细规范的细节。
显示器需要先通过MQTT协议subscribe(订阅)一个比如叫temperature的topic(主题):
当温度传感器publish(发布)温度数据,显示器就可以收到了:
注:以上两张图,取自
协议里还有2个主要的角色:
client,客户端
broker,服务器端
它们是通过TCP/IP协议连接的。
因为MQTT是协议,所以不能拿来直接用的,就好比HTTP协议一样。需要找实现这个协议的库或者服务器来运行。
这里是官方的。
我服务器端使用nodejs开发,因此选择了:
:MQTT协议的底层实现库,服务器端很简易,需要自己编写代码才可使用
:在MQTT.js基础上完善的服务器端
MQTT.js最基本使用安装是很简单的:
npm install mqtt
MQTT.js实现的服务器端代码如下:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566var mqtt = require('mqtt');var subscribeTopics={};var server = mqtt.createServer(function(client) {
client.on('connect', function(packet) {
client.connack({returnCode: 0});
client.on('publish', function(packet) {
var topic=packet.
var payload=packet.
if(subscribeTopics[topic]==null){
subscribeTopics[topic]=[];
}else{
for(var i in subscribeTopics[topic]){
var client=subscribeTopics[topic][i];
client.publish({
topic: topic,
payload: payload
client.on('subscribe', function(packet) {
var topic=packet.subscriptions[0].
if(subscribeTopics[topic]==null){
subscribeTopics[topic]=[];
if(subscribeTopics[topic].indexOf(client)==-1){
subscribeTopics[topic].push(client); }
client.on('pingreq', function(packet) {
client.pingresp();
client.on('disconnect', function(packet) {
for (var topic in subscribeTopics){
var index=subscribeTopics[topic].indexOf(client);
if(index&-1){
subscribeTopics[topic].splice(index,1);
});});server.listen(1883);
这是一个最基本的服务器端,消息的存储和查询都需要自己编程处理。
比如你如果需要用redis保存和触发数据,可参考这篇中文文章:。
MQTT.js实现的客户端代码:
123456789101112var mqtt = require('mqtt');client = mqtt.createClient(1883, 'localhost');client.subscribe('testMessage');client.publish('testMessage', '发布测试信息');client.on('message', function (topic, message)
console.log(message);
client.end();});
写的很简易,订阅了主题,然后向相同主题发布消息,接收到消息后client停止。
使用MoscaMQTT.js只是实现了最基础的MQTT协议部分,对于服务器端的处理需要自己完成。
有关MQTT.js是否实现了MQTT server,详细的说明,可参见
正好,Mosca在MQTT基础上实现了这些,它可以:
作为独立运行的MQTT服务器运行
集成到nodejs程序里使用
安装很简单:
npm install mosca bunyan -g
作为独立服务器运行运行:
mosca -v | bunyan
然后,还可以用我上文的客户端代码运行测试。
集成在自己程序中使用我考虑的后端持久化,是用MongoDB。Mosca另外几个选项:
Redis,缺点是更注重作为缓存,而不适合可靠持久化
LevelUp,头一次听说,不打算做技术准备了,是用nodejs的包装起来的LevelDB
Memory,使用内存,估计默认的就是这个,不适合我使用的情况
首先要安装mosca的库:
npm install mosca
然后,在本机将mongodb运行起来,应该就可以执行下面的代码了:
123456789101112131415161718192021222324var mosca = require('mosca')var settings = {
port: 1883,
backend:{
type: 'mongo',
url: 'mongodb://localhost:27017/mqtt',
pubsubCollection: 'ascoltatori',
mongo: {}
persistence:{
factory: mosca.persistence.Mongo,
url: "mongodb://localhost:27017/mosca"
}};var server = new mosca.Server(settings);server.on('ready', function(){ console.log('Mosca server is up and running'); });server.on('published', function(packet, client) {
console.log('Published', packet.payload);});
直接运行作者文档中的代码会在多次运行客户端后出现错误,我是参考了他2天前加上的。
作者生活在意大利的博洛尼亚,写代码很勤奋,这个项目更新很快,是不是说明这个方向(mqtt)很活跃呢?
作者也写了个幻灯片,
MQTT高级问题keepalive和PING从这篇文章:
心跳时间(Keep Alive timer)
以秒为单位,定义服务器端从客户端接收消息的最大时间间隔。一般应用服务会在业务层次检测客户端网络是否连接,不是TCP/IP协议层面的心跳机制(比如开启SOCKET的SO_KEEPALIVE选项)。 一般来讲,在一个心跳间隔内,客户端发送一个PINGREQ消息到服务器,服务器返回PINGRESP消息,完成一次心跳交互,继而等待下一轮。若客户端没有收到心跳反馈,会关闭掉TCP/IP端口连接,离线。 16位两个字节,可看做一个无符号的short类型值。最大值,2^16-1 = 65535秒 = 18小时。最小值可以为0,表示客户端不断开。一般设为几分钟,比如微信心跳周期为300秒。
下面的代码中我设置的是10秒:
1234567891011var mqtt = require('mqtt');var settings = {
keepalive: 10,
protocolId: 'MQIsdp',
protocolVersion: 3,
clientId: 'client-b',
clean: false}client = mqtt.createClient(1883, 'localhost',settings);
可以使用MQTT.js编写简单的服务器代码,观察到服务器端接收到PING请求,并发回PING响应:
1234client.on('pingreq', function(packet) {
client.pingresp();
console.log('pingreq & resp');});
完整代码上面已经贴过,另见
QoSQoS在MQTT中有(摘自):
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
MQTT.js只是支持了MQTT协议,并没有支持QoS,也就是说,只支持最低级别的“至多一次”(QoS0)。
Mosca支持QoS0和1,但不支持2,见
接收离线消息我在应用中的一个主要场景是,使用MQTT.js+Mosca做聊天服务器。
默认Mosca是不支持离线消息的,表现的现象是,如果是有人(client-a)先在主题上发布了消息:
12345678910111213141516171819var mqtt = require('mqtt');var settings = {
keepalive: 10,
protocolId: 'MQIsdp',
protocolVersion: 3,
clientId: 'client-a'}client = mqtt.createClient(1883, 'localhost',settings);client.publish('testMessage', '发布new测试信息0',{qos:1,retain: true});client.publish('testMessage', '发布new测试信息1',{qos:1,retain: true});client.publish('testMessage', '发布new测试信息2',{qos:1,retain: true});client.publish('testMessage', '发布new测试信息3',{qos:1,retain: true});setTimeout(function(){
client.end();},1000);
那么另外一个人(client-b),随后订阅,仅能看到最后一条消息:
123456789101112131415161718var mqtt = require('mqtt');var settings = {
keepalive: 10,
protocolId: 'MQIsdp',
protocolVersion: 3,
clientId: 'client-b'}client = mqtt.createClient(1883, 'localhost',settings);client.subscribe('testMessage',{qos:1},function(){
console.log('subscribe ok.');});client.on("message", function(topic, payload) {
console.log('message: '+payload);});
运行结果类似这样:
subscribe ok.
message: 发布new测试信息3
离线消息,需要以下几点:
客户端订阅设置QoS=1
客户端连接属性clean: false,作用是断开连接重连的时候服务器端帮助恢复session,不需要再次订阅
用代码说明以下,先运行这段代码:
12345678910111213141516var mqtt = require('mqtt');var settings = {
keepalive: 10,
protocolId: 'MQIsdp',
protocolVersion: 3,
clientId: 'client-b',
clean: false}client = mqtt.createClient(1883, 'localhost',settings);client.subscribe('testMessage',{qos:1},function(){
console.log('subscribe ok.');
client.end();});
然后执行刚才发布多条消息的代码。再执行下面的代码:
123456789101112131415var mqtt = require('mqtt');var settings = {
keepalive: 10,
protocolId: 'MQIsdp',
protocolVersion: 3,
clientId: 'client-b',
clean: false}client = mqtt.createClient(1883, 'localhost',settings);client.on("message", function(topic, payload) {
console.log('message: '+payload);});
运行结果类似这样:
message: 发布new测试信息1
message: 发布new测试信息3
message: 发布new测试信息2
message: 发布new测试信息0
收到消息的顺序是乱的,为什么会这样,其实很好理解,为了小型受限设备以及网络不稳定的情况,消息是不好保证顺序的。
解决办法是发送的消息带时间戳,接收后再做排序。
另外,担心客户端没有做client.end()而非正常退出,那么再次连接是否能恢复session,测试了一下,注释client.end(),没有问题,正常收到多条离线消息。
SSL连接Mosca支持SSL连接,可根据创建公钥私钥。
然后类似这样启动:
1234567891011121314151617181920212223var mosca = require('mosca')var SECURE_KEY = __dirname + '/../../test/secure/tls-key.pem';var SECURE_CERT = __dirname + '/../../test/secure/tls-cert.pem';var settings = {
port: 8443,
logger: {
name: "secureExample",
level: 40,
secure : {
keyPath: SECURE_KEY,
certPath: SECURE_CERT,
}};var server = new mosca.Server(settings);server.on('ready', setup);function setup() {
console.log('Mosca server is up and running')}
这部分我没有测试,直接转自。
认证和授权在提供了个简易的命令行,可创建账号用于认证并授权。
但是它不适合我的需求场景,我需要自己编写认证和授权的逻辑。
虽然在作者官方网站上未找到,但在问题管理记录中提交了这方面的支持:。
有下面两条支持,应该可以写出自己的回调,并集成到Mosca中:
add a callback to authorize a publish.
add a callback to authorize a subscribe.
不过这块没有写代码,只是大致能确定。
性能问题MQTT.js并不是完整解决方案,不需要考虑它的性能问题。
说一下Mosca,有一个这方面问题作者的答复,,问问题的还是个中国人,我前面还引用了他的文章。作者基本意思是:
It basically depends on the RAM. On an AWS large instance it can reach
10k concurrent connections, with roughly 10k messages/second.15220人阅读
消息推送(25)
& & & Apache Apollo是一个代理服务器,其是在ActiveMQ基础上发展而来的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, and WebSockets 等多种协议。
& & & Apollo允许客户端通过开放的MQTT协议连接。该协议主要是用在资源有限的驱动上,以及网络不稳定的情况下使用,是一个订阅、发布模型。这种驱动通常不适用类似http,stomp这类基于文本,或者类似openfire,AMQP等传统二进制协议。MQTT是一个简介的二进制协议,适用这类驱动资源受限,而且是不稳定的网络条件下。之前的稳定发布版本中,MQTT是作为一个Apollo的一个插件提供的。但是现在,这个插件已经变为开发项目的一部分。MQTT在Apollo中已经不需要其他配置文件或者是第三方插件支持了。
MQTT是一个线路层的协议,任何实现该协议的客户端都可以连接到Apollo。当然也可以整合其他MQTT兼容的消息代理中。
更多有关MQTT协议内容,参考the MQTT Specification
MQTT协议配置
为了开始使用MQTT协议,首先使用MQTT3.1协议的客户端,连接到Apollo正在监听端口。Apollo会做协议检测,而且自动识别MQTT连接,而且将连接作为MQTT协议处理。你不必要为MQTT协议打开一个端口(STomp,Openfire,AMQP等都是自动识别)。如果你一定指定连接的协议,有下面两种方式:你可以选择不用协议识别,而是为MQTT指定连接:
&connector id=&tcp& bind=&tcp://0.0.0.0:61613& protocol=&mqtt&/&
或者你可以限制哪种协议可以被自动识别。通过下面的&detece&配置方式:
&connector id=&tcp& bind=&tcp://0.0.0.0:61613&&
& &detect protocols=&mqtt openwire& /&
&/connector&
&detect& 下protocols 对应的参数通过空格来隔开支持的通信协议。如果只支持一种协议,就不要空格,默认情况下对任何协议生效。
如果你想调整MQTT默认设置,在apollo.xml文件中有一个&connector& 元素,通过MQTT参数配置:
&connector id=&tcp& bind=&tcp://0.0.0.0:61613&&
& &mqtt max_message_length=&1000& /&
&/connector&
MQTT元素支持下面几个参数:
max_message_length : The size (in bytes) of the largest message that can be sent to the broker. Defaults to 100MB(broker能接受的最大消息量:默认是100M)
protocol_filters : A filter which can filter frames being sent/received to and from a client. It can modify the frame or even drop it.(一个控制发送和接收,Client的过滤器框架。可以修改,删除这个框架)
die_delay : How long after a connection is deemed to be “dead” before the connec default: 5000ms(在实际断开连接之前,会有默认5000ms的时间被认为连接已经dead)
mqtt 配置元素也可以用来控制目的消息头的解析。下面是支持的参数:
queue_prefix : a tag used to identi default: null(用来确认目的地类型)
path_separator : used to separate segments i default: /(用来分割目的地名称)
any_child_wildcard : indicate all child-level destinations tha default: +(识别子目录)
any_descendant_wildcard : indicate destinations that match the
default:#(目标地址通配符)
regex_wildcard_start : pattern used to identify the start of a regex(表示正则表达开始)
regex_wildcard_end : pattern used to identify the end of a regex(表示正则表达结束)
part_pattern : allows you to specify a regex that constrains the naming of topics. (你可以指定正则表达规则)default: [ a-zA-Z0-9\_\-\%\~\:\(\)]+
&Client 可用函数库
Apollo 支持MQTT3.1 协议,下面是可用的Clients:
Java : mqtt-client, MeQanTT
C : libmosquitto
Erlang : erlmqtt, my-mqtt4erl
.NET : MQTTDotNet, nMQTT
Perl : net-mqtt-perl, [anyevent-mqtt-perl]/beanz/anyevent-mqtt-perl()
Python : nyamuk
Ruby : mqtt-ruby, ruby-em-mqtt
Javascript : Node.js MQTT Client
Delphi : TMQTTCLient
Device specific: Arduino, mbed, Nanode, Netduino
如果要找到新支持的Clients ,可以检索:the MQTT website for its software
在目录example 目录下,你可以找到一些例子,实现了与broker之间收发。
&connecting
为了确保broker配置文件的安全,所以只允许一个admin 用户连接,默认的用户名和密码是:admin ,password.
Mqtt 客户端不能specify 虚拟主机(更多请看:see the section on Virtual Hosts in the user guide),以至于默认情况下虚拟主机已经被使用了。通常第一虚拟主机定义在apollo.xml文件中。
&Destination 类型
MQTT协议是订阅,发布协议,是不允许真正的利用队列点对点的消息收发。因此Apollo仅允许利用主题,还进行MQTT消息发送。订阅的概念和持久的主题订阅 和其他协议提到的有些类似,同时也被MQTT CONNECT 框架的clean session属性控制。
&Clean Sessions
但一个Client 发送一个连接,这个连接中clean session 被设置为false,那么之前连接中有相同Client_id 的session 将会被重复使用。这就意味着Client断开了,订阅依然能收到消息。这就等同与同Apollo建立一个长订阅。
如果 clean session 设置为true ,那么新session就开始了,其他的session会慢慢消失,删除。这就是Apollo中定义的普通的主题订阅。
&Topic Retained Messages
如果消息被发布的同时retain 标记被设置,消息将被主题记住,以至于新的订阅到达,最近的retain 消息会被发送到订阅者。比如说:你想发布一个参数,而且你想让最新的这个参数发布到总是可用的订阅了这个主题的客户端上,你就设置在PUBLISH 框架上设置retain 标签。
注意:retained 消息 不会被设置成retained 在 QoS设置为零的broker 重启过程中。
Last Will and Testament Message
当Client第一次连接的时候,有一个will 消息和一个更QoS相关的消息会跟你有关。will消息是一个基础消息,这个基础消息只有在连接异常或者是掉线的时候才会被发送。一般用在你有一个设备,当他们掉了的时候,你需要知道。所以如果一个医疗Client从broker掉线,will消息将会作为一个闹钟主题发送,而且会被系统作为高优先级提醒。
Reliable Messaging
MQTT协议允许Client 发布消息的时候指定Qos参数:
At Most Once (QoS=0)
At Least Once (QoS=1)
Exactly Once (QoS=2)
这个设置时推送消息给Client,可靠性最低的一种。如果设置Qos=0,那broker就不会返回结果码,告诉你他收到消息了,也不会在失败后尝试重发。这有点像不可靠消息,如JMS。
该设置会确保消息会被至少一次推送到Client。如果推送设置为至少推送一次,Apollo会返回一个回调函数,确保代理已经收到消息,而且确保会确保推送该消息。如果Client 将发布了一个Qos=1的消息,如果在指定的时间内没有收到回复,Client会希望重新发布这个消息。所以可能存在这种情况:代理收到一个需要推送的消息,然后又收到一个消息推送到同一个Client。所以如果传输过程中PUBACK丢失,Client会重新发送,而且不会去检测是否是重发,broker就将消息发送到订阅主题中。
该设置是可靠等级最高的。他会确保发布者不仅仅会推送,而且不会像Qos=1 那样,会被接收两次。当然这个设置会增加网络的负载。当一个消息被发布出去的时候,broker会保存该消息的id,而且会利用任何长连接,坚持要把该消息推送给目标地址。如果Client收到PUBREC 标志,那就表明broker已经收到消息了。 这个时候broker会期待Client发送一个PUBREL 来清除session 中消息id,broker如果发送成功就会发送一个PUBCOMP通知Client。
Wildcard Subscriptions
通配用在主题的目标地址中。这能实现一个主题发送到多个用户,或者多层用户中。
/ is used to separate names in a path(分割路径)
+ is used to match any name in a path(通配地址任何字符)
# is used to recursively match path names(递归通配)
比如通配可能这样来用:
PRICE/# : Any price for any product on any exchange(任何交易中任何产品的价格)
PRICE/STOCK/# : Any price for a stock on any exchange(任何交易中的股票价格)
PRICE/STOCK/NASDAQ/+ : Any stock price on NASDAQ(纳斯达克的任何股票价格)
PRICE/STOCK/+/IBM : Any IBM stock price on any exchange(任何交易中IBM股票价格)
Keep Alive
Apollo只有在Client指定了CONNECT的KeepAlive 值的时候,才会设置保持连接、心跳检测。如果one Client指定了keepalive,apollo 将会使用1.5*keepalive值。这个在MQTT中有说明。
Destination Name Restrictions
路径名称限制了使用(a-z, A-Z, 0-9, _, - %, ~, :, ' ', '(', ')' ,. )字符,通配符(*)在复杂的分隔符中。而且确保使用utf-8来编译你的URL。
源码地址:
官网参考手册:
svn checkOut :
1、下载,安装(安装需要java环境)
& wget&http://www.apache.org/dyn/closer.cgi?path=activemq/activemq-apollo/1.7/apache-apollo-1.7-unix-distro.tar.gz
& tar -zxvf&apache-apollo-1.7-unix-distro.tar.gz
& cd&apache-apollo-1.7
& 新建一个broker实例
& ./bin/apollo create mqtt_test #在当前目录下生成一个mqtt_test目录,其下面包含:
& bin &运行脚本
& etc 环境配置
& data 存储持久化数据
& log &运行日志
& tmp 临时文件
& 注:etc下配置文件说明:
&&一、users.properties:
用来配置可以使用服务器的用户以及相应的密码。
其在文件中的存储方式是:用户名=密码,如:
admin=password
表示新增一个用户,用户名是:admin,密码是:password
& 二、groups.properties:
持有群体的用户映射,可以通过组而不是单个用户简化访问控制列表。
可以为一个定义的组设置多个用户,用户之间用“|”隔开,如:
admins=admin|lily
表示admins组中有admin和lily两个用户
& 三、black-list.txt:
用来存放不允许连接服务器的IP地址,相当于黑名单类似的东西。
例如:10.20.9.147
表示上面IP不能够连接到服务器。
& 四、login.config:
是一个服务器认证的配置文件,为了安全apollo1.6版本提供了认证功能,只有相应的用户名和正确的密码才能够连接
& 五、服务器主配置文件apollo.xml:
该配置文件用于控制打开的端口,队列,安全,虚拟主机设置等。
& & & &1、认证:可以使用&authenticationdomain=&internal& /&来配置是否需要连接认证,如果将其属性enable设置为false表示不用认证,任何人都可以连接服务器,默认为true
& & & &2、access_rule:可以在broker或者virtual_host中用于定义用户对服务器资源的各种行为。如:
&access_rule allow=&users& action=&connect create destroy send receive consume&/&表示群组users里面的用户可以对服务器资源进行的操作有:connect 、create、 destroy、 send 、receive 、consume。详细的操作说明见:
http://activemq.apache.org/apollo/documentation/user-manual.html
& & & &3、message stores:默认情况下apollo使用的是LevelDB store,但是推荐使用BDB store(跨平台的)只能够实用其中一种。使用LevelDB store的配置是:&leveldb_store directory=&${apollo.base}/data&/&默认有提供不用任何修改。使用BDB store需要到网站下jar包支持/maven/com/sleepycat/je/5.0.34/je-5.0.34.jar,将jar包放在服务器的lib目录下面,然后将配置文件改成:&bdb_store
directory=&${apollo.base}/data&/&即可。
& & & &4、connector:用于配置服务器支持的链接协议以及相应的端口。如:
&&connector id=&tcp& bind=&tcp://0.0.0.0:61613& connection_limit=&2000& protocol=&mqtt&/&表示支持tcp链接,使用的端口是61613,链接限制是2000,自动侦听的协议是mqtt协议。
具体查看:
2、配置、启动
配置参考如上说明。若非本机安装,修改控制台web_admin的ip地址,以便访问控制台。
&web_admin bind=&http://0.0.0.0:61680&/&
&web_admin bind=&https://0.0.0.0:61681&/&
./bin/apollo-broker-service start
脚本参数:apollo-broker-service {start|stop|restart|force-stop|status}
启动后,访问http://192.168.36.102:61680/或者https://192.168.36.102:61681。默认用户:admin / password
(1)使用paho的mqttv3测试
package cn.smartslim.mqtt.demo.
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttConnectO
import org.eclipse.paho.client.mqttv3.MqttDeliveryT
import org.eclipse.paho.client.mqttv3.MqttE
import org.eclipse.paho.client.mqttv3.MqttM
import org.eclipse.paho.client.mqttv3.MqttT
import org.eclipse.paho.client.mqttv3.internal.MemoryP
public class ApolloServer {
private static String host = &tcp://192.168.36.102:61613&;
private static String userName = &admin&;
private static String passWord = &password&;
private static MqttC
private static MqttT
private static MqttM
private static String topicStr = &mqtt/topic&;
public static void main(String[] args) throws MqttException {
// host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(host, &CallbackServer&, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(userName);
// 设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
client.setCallback(new MqttCallback() {
public void messageArrived(MqttTopic topicName, MqttMessage message) throws Exception {
//subscribe后得到的消息会执行到这里面
System.out.println(&messageArrived----------&);
System.out.println(topicName+&---&+message.toString());
public void deliveryComplete(MqttDeliveryToken token) {
//publish后会执行到这里
System.out.println(&deliveryComplete---------&
+ token.isComplete());
public void connectionLost(Throwable cause) {
// //连接丢失后,一般在这里面进行重连
System.out.println(&connectionLost----------&);
topic = client.getTopic(topicStr);
message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
System.out.println(message.isRetained()+&------ratained状态&);
message.setPayload(&mqtt.....test....&.getBytes());
client.connect(options);
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println(token.isComplete()+&========&);
package cn.smartslim.mqtt.demo.
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttConnectO
import org.eclipse.paho.client.mqttv3.MqttDeliveryT
import org.eclipse.paho.client.mqttv3.MqttE
import org.eclipse.paho.client.mqttv3.MqttM
import org.eclipse.paho.client.mqttv3.MqttT
import org.eclipse.paho.client.mqttv3.internal.MemoryP
public class ApolloClient {
private static String host = &tcp://192.168.36.102:61613&;
private static String userName = &admin&;
private static String passWord = &password&;
private static MqttC
private static String topicStr = &mqtt/topic&;
public static void main(String[] args) throws MqttException {
//host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
//MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(host, &CallbackClient&, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
//这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
//设置连接的用户名
options.setUserName(userName);
//设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
client.setCallback(new MqttCallback() {
public void messageArrived(MqttTopic topicName, MqttMessage message) throws Exception {
//subscribe后得到的消息会执行到这里面
System.out.println(&messageArrived----------&);
System.out.println(topicName+&---&+message.toString());
public void deliveryComplete(MqttDeliveryToken token) {
//publish后会执行到这里
System.out.println(&deliveryComplete---------&
+ token.isComplete());
public void connectionLost(Throwable cause) {
// //连接丢失后,一般在这里面进行重连
System.out.println(&connectionLost----------&);
client.connect(options);
client.subscribe(topicStr, 1);
(2)使用fusesource的Callback阻塞式发布订阅消息
发布消息:
package cn.smartslim.mqtt.demo.
import org.fusesource.mqtt.client.C
import org.fusesource.mqtt.client.CallbackC
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.T
public class CallbackServer {
private static String HOST = &tcp://192.168.36.102:61613&;
private static String USERNAME = &admin&;
private static String PASSWORD = &password&;
private final static boolean CLEAN_START =
private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
public static Topic[] topics = { new Topic(&mqtt/fusesource/callback&, QoS.EXACTLY_ONCE) };
public final static long RECONNECTION_ATTEMPT_MAX = 6;
public final static long RECONNECTION_DELAY = 2000;
public final static int SEND_BUFFER_SIZE = 2 * 1024 * 1024;// 发送最大缓冲为2M
public static void main(String[] args) throws Exception {
// 创建MQTT对象
MQTT mqtt = new MQTT();
mqtt.setClientId(&CallbackServer&);
// 设置mqtt broker的ip和端口
mqtt.setHost(HOST);
mqtt.setUserName(USERNAME);
mqtt.setPassword(PASSWORD);
// 连接前清空会话信息
mqtt.setCleanSession(CLEAN_START);
// 设置重新连接的次数
mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
// 设置重连的间隔时间
mqtt.setReconnectDelay(RECONNECTION_DELAY);
// 设置心跳时间
mqtt.setKeepAlive(KEEP_ALIVE);
// 设置缓冲的大小
mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
// 获取mqtt的连接对象CallbackConnection
CallbackConnection connection = mqtt.callbackConnection();
connection.connect(new Callback&Void&() {
public void onSuccess(Void value) {
System.out.println(&连接成功:&+value);
public void onFailure(Throwable value) {
System.out.println(&连接失败&);
//发布消息
connection.publish(&mqtt/fusesource/callback&, &测试mqtt数据&.getBytes(), QoS.EXACTLY_ONCE,
true, new Callback&Void&() {
public void onSuccess(Void value) {
//与服务器断开连接成功
System.out.println(&发送成功:&+value);
public void onFailure(Throwable value) {
//与服务器断开连接失败
System.out.println(&发送失败&);
Thread.sleep(600000);
订阅消息:
package cn.smartslim.mqtt.demo.
import org.fusesource.hawtbuf.B
import org.fusesource.hawtbuf.UTF8B
import org.fusesource.mqtt.client.C
import org.fusesource.mqtt.client.CallbackC
import org.fusesource.mqtt.client.L
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.T
public class CallbackClient {
private static String HOST = &tcp://192.168.36.102:61613&;
private static String USERNAME = &admin&;
private static String PASSWORD = &password&;
private final static boolean CLEAN_START =
private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
public static Topic[] topics = { new Topic(&mqtt/fusesource/callback&, QoS.EXACTLY_ONCE) };
public final static long RECONNECTION_ATTEMPT_MAX = 6;
public final static long RECONNECTION_DELAY = 2000;
public final static int SEND_BUFFER_SIZE = 2 * 1024 * 1024;// 发送最大缓冲为2M
public static void main(String[] args) throws Exception {
// 创建MQTT对象
MQTT mqtt = new MQTT();
mqtt.setClientId(&CallbackClient&);
// 设置mqtt broker的ip和端口
mqtt.setHost(HOST);
mqtt.setUserName(USERNAME);
mqtt.setPassword(PASSWORD);
// 连接前清空会话信息
mqtt.setCleanSession(CLEAN_START);
// 设置重新连接的次数
mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
// 设置重连的间隔时间
mqtt.setReconnectDelay(RECONNECTION_DELAY);
// 设置心跳时间
mqtt.setKeepAlive(KEEP_ALIVE);
// 设置缓冲的大小
mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
// 获取mqtt的连接对象CallbackConnection
CallbackConnection connection = mqtt.callbackConnection();
connection.connect(new Callback&Void&() {
//可以在connect的onSuccess方法中发布或者订阅相应的主题
public void onSuccess(Void oid) {
//进入该方法表示连接成功连接成功
System.out.println(&连接成功:&+oid);
//onFailure方法中作相应的断开连接等操作
public void onFailure(Throwable cause) {
//进入该方法表示连接失败
System.out.println(&连接失败&);
connection.listener(new Listener() {
//表示成功,可以获取到订阅的主题和订阅的内容(UTF8Buffer topicmsg表示订阅的主题,
//Buffer msg表示订阅的类容),一般的可以在这个方法中获取到订阅的主题和内容然后进行相应的判断和其他业务逻辑操作;
public void onPublish(UTF8Buffer topicmsg, Buffer msg, Runnable ack) {
//utf-8 is used for dealing with the garbled
String topic = topicmsg.utf8().toString();
String payload = msg.utf8().toString();
System.out.println(&topic:&+topic);
System.out.println(&message:&+payload);
//表示监听成功
ack.run();
//表示监听失败,这里可以调用相应的断开连接等方法;
public void onFailure(Throwable arg0) {
//表示监听失败
//表示监听到连接建立,该方法只在建立连接成功时执行一次,
//表示连接成功建立,如果有必要可以在该方法中进行相应的订阅操作;
public void onDisconnected() {
//表示监听到断开连接
//表示监听到连接断开,该方法只在断开连接时执行一次,如有必要可以进行相应的资源回收操作。
public void onConnected() {
//表示监听到连接成功
connection.subscribe(topics, new Callback&byte[]&() {
//Topic[] topics表示定于的主题数组,注意只有在改方法订阅的主题,才能够在监听方法中接收到。
public void onSuccess(byte[] qoses) {
//主题订阅成功
System.out.println(&订阅成功:&+new String(qoses));
public void onFailure(Throwable arg0) {
//状态主题订阅失败
System.out.println(&订阅失败&);
//断开连接
/*connection.disconnect(new Callback&Void&() {
public void onSuccess(Void value) {
//与服务器断开连接成功
public void onFailure(Throwable value) {
//与服务器断开连接失败
//回调将执行与连接相关联的调度队列,以便可以安全使用从回调的连接,但你绝不能在回调中执行任何阻塞操作,否则会改变执行的顺序,
//这样可能出错。如果可能存在阻塞时,最好是在连接的调度队列中执行另外一个线程
connection.getDispatchQueue().execute(new Runnable() {
public void run() {
//在这里进行相应的订阅、发布、停止连接等等操作
Thread.sleep(600000);
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
--------------代码托管---------------
(window.slotbydup = window.slotbydup || []).push({
id: '4740887',
container: s,
size: '250,250',
display: 'inlay-fix'}

我要回帖

更多关于 北野武大佬百度云盘 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信