依赖
1
2
3
4
5
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MyMqttServer {

public static void main(String[] args) {
Server();
}

public static void Server(){
//mqtt服务器地址
String MQTT_HOST = "tcp://10.245.228.67:1883";
//设备唯一标识
String MQTT_CLIENT_ID = "server1";
//连接服务器的用户名
String userName = "admin";
//连接服务器的密码
String password = "password";
//订阅标识
String MQTT_TOPIC = "this is my first topic";


MqttClient client = null;
//连接mqtt服务器
try {
client = new MqttClient(MQTT_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(userName);
options.setPassword(password.toCharArray());
//会话超时时间
options.setConnectionTimeout(10);
//心跳时间
options.setKeepAliveInterval(20);
client.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}

try {
//创建message对象
MqttMessage message = new MqttMessage();
//创建消息
MqttTopic topic = client.getTopic(MQTT_TOPIC);
//结果等级,0-只发一次不管是否被接收到,1-发送多次保证至少发送成功给接受者(可能出现消息重复),2-保证接受者只接受一次(消息不会未接收到且不会重复接收)
message.setQos(2);
//服务器是否保留该消息
message.setRetained(false);
//传入推送内容
message.setPayload("这是推送的内容".getBytes());
//进行推送
while (true) {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println("已经发布了");
Thread.sleep(10000);
}
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}

}

}
客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MyMqttClient {

public static void main(String[] args) {
Client();
}

public static void Client(){

//mqtt服务器地址
String MQTT_HOST = "tcp://10.245.228.67:1883";
//设备唯一标识
String MQTT_CLIENT_ID = "client1";
//连接服务器的用户名
String userName = "admin";
//连接服务器的密码
String password = "password";
//订阅标识
String MQTT_TOPIC = "this is my first topic";

MqttClient client = null;
//连接mqtt服务器
try {
client = new MqttClient(MQTT_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(userName);
options.setPassword(password.toCharArray());
//会话超时时间
options.setConnectionTimeout(10);
//心跳时间
options.setKeepAliveInterval(20);
client.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}

//订阅消息
try {
//设置订阅的topic(不能为空)
client.subscribe(MQTT_TOPIC);
//设置回调
client.setCallback(new MqttCallback() {

@Override
public void connectionLost(Throwable cause) {
System.out.println("连接失去了");
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接受消息成功,topic:"+topic+" 内容:"+message);
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {

}
});
} catch (MqttException e) {
e.printStackTrace();
}
}

}