添加MQTT依赖
implementation ‘org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2’
implementation ‘org.eclipse.paho:org.eclipse.paho.android.service:1.1.1’
在Manifest清单文件中添加服务
<service android:name="org.eclipse.paho.android.service.MqttService" />
两种实现方法
MqttClient的实现方式
MQTT初始化连接线程,实现与服务器的连接、订阅、发布消息
private class ConnectTask extends AsyncTask<Void, String, Boolean> {
//MQTT连接线程,实现与服务器的连接、订阅、发布消息
/**
* 异步任务:AsyncTask<Params, Progress, Result>
* 1.Params:UI线程传过来的参数。
* 2.Progress:发布进度的类型。
* 3.Result:返回结果的类型。耗时操作doInBackground的返回结果传给执行之后的参数类型。
*
* 执行流程:
* 1.onPreExecute()
* 2.doInBackground()-->onProgressUpdate()
* 3.onPostExecute()
*/
@Override
protected void onPreExecute() //执行耗时操作之前处理UI线程事件
{
super.onPreExecute();
isConnecting = true;
connectionStatusTextView.setText("Connecting...");
}
@Override
protected Boolean doInBackground(Void... voids)
{
//在此方法执行耗时操作,耗时操作中收发MQTT服务器的数据
//MQTT服务器地址
String brokerUrl = "tcp://www.10086.com:1883";
//客户端ID,用于在MQTT服务器上
String clientId = MqttClient.generateClientId();
try {
mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
} catch (MqttException e) {
throw new RuntimeException(e);
}
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setCleanSession(true);
//mqtt服务器用户名和密码
connectOptions.setUserName("username");
connectOptions.setPassword("password".toCharArray());
connectOptions.setWill("断线消息主题","断线消息内容".getBytes(),1,true);
connectOptions.setConnectionTimeout(10);
connectOptions.setKeepAliveInterval(20);
try {
mqttClient.connect(connectOptions);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
Log.e(TAG, "连接丢失");//连接丢失的时候可以在这里进行重新连接
publishProgress("Connection lost, reconnecting...");
new ReconnectTask().execute();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG, "收到消息:"+message.toString());
if (topic.equals("sensors/temperature")) {
// Update temperature reading
publishProgress("Temperature: " + message.toString());
} else if (topic.equals("sensors/humidity")) {
// Update humidity reading
publishProgress("Humidity: " + message.toString());
} else if (topic.equals("leds/led1/status")) {
// Update LED 1 status
if (message.toString().equals("on")) {
publishProgress("LED 1 is on");
ledStatusImageView.setText("LED 1 is on");
} else {
publishProgress("LED 1 is off");
ledStatusImageView.setText("LED 1 is off");
}
} else if (topic.equals("leds/led2/status")) {
// Update LED 2 status
if (message.toString().equals("on")) {
publishProgress("LED 2 is on");
ledStatusImageView.setText("LED 2 is on");
} else {
publishProgress("LED 2 is off");
ledStatusImageView.setText("LED 2 is off");
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
Log.i(TAG, "成功发送");
}
});
//这里是订阅的话题
/**
* subscribe的第二个参数如果是消息等级,其代表的意义是:
*
* qos = 0 : 消息最多发送一次,在客户端离线等不可用的情况下客户端将会丢失这条消息。
*
* qos = 1 : 消息至少发送一次,能保证客户端能收到该条消息。
*
* qos = 2 : 消息仅传送一次。
*/
mqttClient.subscribe("zhyj/temperature",1);
mqttClient.subscribe("zhyj/mj1/status",1);
mqttClient.subscribe("zhyj/mj22/status",1);
} catch (MqttException e) {
Log.e(TAG, "Error connecting to MQTT broker: " + e.getMessage());
publishProgress("Error connecting to MQTT broker: " + e.getMessage());
return false;
}
return true;
}
@Override
protected void onProgressUpdate(String... values) {
//用于在主线程处理doInBackground()方法执行完毕后的结果,更新UI或者执行其它操作
super.onProgressUpdate(values);
connectionStatusTextView.setText(values[0]);
}
@Override
protected void onPostExecute(Boolean aBoolean)
{
//用于在主线程处理doInBackground()方法执行完毕后的结果,更新UI或者执行其它操作
super.onPostExecute(aBoolean);
isConnecting = false;
if (aBoolean) {
connectionStatusTextView.setText("Connected");
}
}
}
MQTT重连
private class ReconnectTask extends AsyncTask<Void, String, Boolean> {
@Override
protected void onPreExecute() {
super.onPreExecute();
isConnecting = true;
connectionStatusTextView.setText("Reconnecting...");
}
@Override
protected Boolean doInBackground(Void... voids) {
if (mqttClient != null && mqttClient.isConnected()) {
try {
mqttClient.disconnect();
} catch (MqttException e) {
Log.e(TAG, "Error disconnecting from MQTT broker: " + e.getMessage());
}
}
return new ConnectTask().doInBackground();
}
@Override
protected void onPostExecute(Boolean aBoolean) {
super.onPostExecute(aBoolean);
isConnecting = false;
if (aBoolean) {
connectionStatusTextView.setText("Connected");
}
}
}
MQTT断开
private class DisconnectTask extends AsyncTask<Void, Void, Void> {
@Override
protected void onPreExecute() {
super.onPreExecute();
connectionStatusTextView.setText("Disconnecting...");
}
@Override
protected Void doInBackground(Void... voids) {
if (mqttClient != null && mqttClient.isConnected()) {
try {
mqttClient.disconnect();
} catch (MqttException e) {
Log.e("dbj", "Error disconnecting from MQTT broker: " + e.getMessage());
}
}
return null;
}
@Override
protected void onPostExecute(Void aVoid) {
super.onPostExecute(aVoid);
connectionStatusTextView.setText("Disconnected");
ledStatusImageView.setText("Disconnected");
}
}
发送消息
private class ToggleLedTask extends AsyncTask<String, Void, Void> {
@Override
protected Void doInBackground(String... strings) {
if (mqttClient != null && mqttClient.isConnected()) {
String topic = "zhyj/" + strings[0] + "/control";
MqttMessage message = new MqttMessage();
if (ledStatusImageView.getText().toString().contains("on")) {
message.setPayload("off".getBytes());
} else {
message.setPayload("on".getBytes());
}
try {
//publish()的第三个参数和subscribe的第二个参数的qos同理
mqttClient.publish(topic, message);
} catch (MqttException e) {
Log.e(TAG, "Error publishing message: " + e.getMessage());
}
}
return null;
}
}
MqttAndroidClient service的实现方式
配置文件添加 MQTTService
public class MQTTService extends Service {
public static final String TAG = "dxj";
private static MqttAndroidClient client;
private MqttConnectOptions conOpt;
private String host = "tcp://www.10086.com:1883";
private String userName = "";
private String passWord = "";
private static String myTopic = "zhyj_mj"; //要订阅的主题
private String clientId = "";//客户端标识
private IGetMessageCallBack IGetMessageCallBack;
@Override
public void onCreate() {
super.onCreate();
Log.e(getClass().getName(), "onCreate");
init();
}
public static void publish(String msg) {
try {
if (client != null) {
client.publish(myTopic, msg.getBytes(), 1, false);
}
} catch (MqttException e) {
e.printStackTrace();
Log.e(TAG, "Error connecting to MQTT broker: " + e.getMessage());
}
}
private void init() {
// 服务器地址(协议+地址+端口号)
String uri = host;
client = new MqttAndroidClient(this, uri, clientId);
// 设置MQTT监听并且接受消息
client.setCallback(mqttCallback);
conOpt = new MqttConnectOptions();
// 这个标志是标志客户端,服务端是否要保持持久化的一个标志。默认是true
//设置客户端和服务端重启或重连后是否需要记住之前的状态
conOpt.setCleanSession(false);
// 设置超时时间,单位:秒
conOpt.setConnectionTimeout(10);
// 心跳包发送间隔,单位:秒
conOpt.setKeepAliveInterval(20);
//连接丢失的情况下,客户端将尝试重新连接到服务器。
// 在尝试重新连接之前,它最初将等待1秒,对于每次失败的重新连接尝试,
// 延迟将加倍,直到达到2分钟,此时延迟将保持在2分钟
// conOpt.setAutomaticReconnect(true);
// 用户名
conOpt.setUserName(userName);
// 密码
conOpt.setPassword(passWord.toCharArray()); //将字符串转换为字符串数组
// last will message
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + clientId + "\"}";
Log.e(getClass().getName(), "message是:" + message);
String topic = myTopic;
// 最后的遗嘱
// MQTT本身就是为信号不稳定的网络设计的,所以难免一些客户端会无故的和Broker断开连接。
//当客户端连接到Broker时,可以指定LWT,Broker会定期检测客户端是否有异常。
//当客户端异常掉线时,Broker就往连接时指定的topic里推送当时指定的LWT消息。
try {
conOpt.setWill(topic, message.getBytes(), 1, false);
} catch (Exception e) {
Log.i(TAG, "Exception Occured", e);
doConnect = false;
iMqttActionListener.onFailure(null, e);
}
if (doConnect) {
doClientConnection();
}
}
@Override
public void onDestroy() {
stopSelf();
try {
if (isAlreadyConnected()) {
client.disconnect();
client.unregisterResources();
}
IGetMessageCallBack.setStatus("断开连接");
} catch (MqttException e) {
e.printStackTrace();
Log.e(TAG, "Error connecting to MQTT broker: " + e.getMessage());
}
super.onDestroy();
}
/**
* 连接MQTT服务器
*/
private void doClientConnection() {
if (!isAlreadyConnected() && isConnectIsNormal()) {
try {
client.connect(conOpt, null, iMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
Log.e(TAG, "Error connecting to MQTT broker: " + e.getMessage());
}
}
}
// MQTT是否连接成功
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken arg0) {
try {
/**
* subscribe的第二个参数如果是消息等级,其代表的意义是:
*
* qos = 0 : 消息最多发送一次,在客户端离线等不可用的情况下客户端将会丢失这条消息。
*
* qos = 1 : 消息至少发送一次,能保证客户端能收到该条消息。
*
* qos = 2 : 消息仅传送一次。
*/
client.subscribe("zhyj/mj/status", 1);
IGetMessageCallBack.setStatus("连接成功");
} catch (MqttException e) {
e.printStackTrace();
IGetMessageCallBack.setStatus("连接失败");
Log.e(TAG, "Error connecting to MQTT broker: " + e.getMessage());
}
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
// 连接失败,重连
Log.e(TAG, "Error publishing message: " + arg0.toString());
IGetMessageCallBack.setStatus("连接失败");
}
};
// MQTT监听并且接受消息
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
IGetMessageCallBack.setStatus("收到消息");
String str1 = new String(message.getPayload());
if (IGetMessageCallBack != null) {
IGetMessageCallBack.setMessage(str1);
}
String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
Log.i(TAG, "messageArrived:" + str1);
Log.i(TAG, str2);
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
IGetMessageCallBack.setStatus("成功发送");
}
@Override
public void connectionLost(Throwable arg0) {
// 失去连接,重连
Log.e(TAG, "连接丢失 isAlreadyConnected=" + isAlreadyConnected());
IGetMessageCallBack.setStatus("连接丢失");
}
};
/**
* 判断网络是否连接
*/
private boolean isConnectIsNormal() {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.e(TAG, "MQTT当前网络名称:" + name);
return true;
} else {
Log.e(TAG, "MQTT 没有可用网络");
return false;
}
}
@Override
public IBinder onBind(Intent intent) {
Log.e(getClass().getName(), "onBind");
return new CustomBinder();
}
public void setIGetMessageCallBack(IGetMessageCallBack IGetMessageCallBack) {
this.IGetMessageCallBack = IGetMessageCallBack;
}
public class CustomBinder extends Binder {
public MQTTService getService() {
Log.e(getClass().getName(), "CustomBinder");
return MQTTService.this;
}
}
public void toCreateNotification(String message) {
PendingIntent pendingIntent = PendingIntent.getActivity(this, 1, new Intent(this, MQTTService.class), PendingIntent.FLAG_UPDATE_CURRENT);
NotificationCompat.Builder builder = new NotificationCompat.Builder(this);//3、创建一个通知,属性太多,使用构造器模式
Notification notification = builder
.setTicker("test_title")
.setSmallIcon(R.mipmap.ic_launcher)
.setContentTitle("")
.setContentText(message)
.setContentInfo("")
.setContentIntent(pendingIntent)//点击后才触发的意图,“挂起的”意图
.setAutoCancel(true) //设置点击之后notification消失
.build();
NotificationManager notificationManager = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);
startForeground(0, notification);
notificationManager.notify(0, notification);
}
public boolean isAlreadyConnected() {
if (client != null) {
try {
boolean result = client.isConnected();
if (result) {
return true;
} else {
return false;
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
} else {
return false;
}
}
}
为了方便Service与Acitivity之间的通信,创建一个工具类作为桥梁
public class MyServiceConnection implements ServiceConnection {
private MQTTService mqttService;
private IGetMessageCallBack IGetMessageCallBack;
@Override
public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
mqttService = ((MQTTService.CustomBinder)iBinder).getService();
mqttService.setIGetMessageCallBack(IGetMessageCallBack);
}
@Override
public void onServiceDisconnected(ComponentName componentName) {
}
public MQTTService getMqttService(){
return mqttService;
}
public void setIGetMessageCallBack(IGetMessageCallBack IGetMessageCallBack){
this.IGetMessageCallBack = IGetMessageCallBack;
}
几个重要的参数
MqttConnectOptions.setAutomaticReconnect(true)
true表示支持自动重连
连接丢失的情况下,客户端将尝试重新连接到服务器。
在尝试重新连接之前,它最初将等待1秒,对于每次失败的重新连接尝试,
延迟将加倍,直到达到2分钟,此时延迟将保持在2分钟
MqttConnectOptions.setCleanSession(true)
官方注释: 如果设置为false,则客户端和服务器将在重新启动客户端、服务器和连接时保持状态。当状态保持时:
即使重新启动客户端、服务器或连接,消息传递也将可靠地满足指定的QOS。
服务器会将订阅视为持久订阅。
如果设置为true,则客户端和服务器将不会在重新启动客户端、服务器或连接时保持状态。这意味着
如果重新启动客户端、服务器或连接,则无法维持向指定QOS的消息传递
这个标志是标志客户端,服务端是否要保持持久化的一个标志。默认是true
设置客户端和服务端重启或重连后是否需要记住之前的状态。
当setCleanSession为true时,客户端掉线或服务端重启后,服务端和客户端会清掉之前的 session, 重连后客户端会有一个新的session。离线期间发来QoS=0,1,2的消息一律接收不到,且所有之前订阅的topic需要重新订阅。
··························································································
当setCleanSession为false时, 客户端掉线或服务端重启后,服务端和客户端不会清除之前的session。重连后session将会恢复,客户端不用重新订阅主题。且离线期间发来QoS=0,1,2的消息仍然可以接收到。
这里有个需要注意的地方,即setCleanSession为true时,掉线后客户端设置了setAutomaticReconnect为true才会自动重连。为当setCleanSession为false时。不管setAutomaticReconnect为true或者false都会自动重连。
文章来源:https://uudwc.com/A/LmVEx
MqttConnectOptions.setKeepAliveInterval(30);
心跳包发送间隔,单位:秒
MQTT客户端(client)在没有收到或发布任何消息时仍然是保持连接的。服务端(the broker)需要跟踪客户端的连接状态。 所有需要发送心跳包来确定客户端是否是连接状态。心跳包发送的时间间隔就是keepalive设置的。
服务端会维持一个timer。当这个timer记录的时间超过1.5倍keepalive值时,服务端会将这个客户端标记为断开连接,并发送Last Will and Testament (LWT)遗言广播。
每次客户端发送或接收一个消息, 服务端会重置这个timer。文章来源地址https://uudwc.com/A/LmVEx