Browse Source

增加MQTT 连接类库

master
fyf 1 year ago
parent
commit
168bf5d876
9 changed files with 322 additions and 1 deletions
  1. +2
    -0
      app/build.gradle
  2. BIN
     
  3. BIN
     
  4. +9
    -0
      app/src/main/AndroidManifest.xml
  5. +15
    -1
      app/src/main/java/com/example/bpa/MainActivity.java
  6. +5
    -0
      app/src/main/java/com/example/bpa/Model/IMessage.java
  7. +1
    -0
      app/src/main/java/com/example/bpa/app/ICSApp.java
  8. +261
    -0
      app/src/main/java/com/example/bpa/helper/MQTT.java
  9. +29
    -0
      app/src/main/java/com/example/bpa/service/OrderServer.java

+ 2
- 0
app/build.gradle View File

@@ -56,6 +56,8 @@ dependencies {
implementation 'androidx.constraintlayout:constraintlayout:2.1.4'
implementation 'androidx.recyclerview:recyclerview:1.2.1'
implementation 'com.google.android.libraries.places:places:2.5.0'
implementation files('libs\\org.eclipse.paho.android.service-1.1.1.jar')
implementation files('libs\\org.eclipse.paho.client.mqttv3-1.2.5.jar')
testImplementation 'junit:junit:4.13.2'
androidTestImplementation 'androidx.test.ext:junit:1.1.3'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.4.0'


BIN
View File


BIN
View File


+ 9
- 0
app/src/main/AndroidManifest.xml View File

@@ -8,6 +8,12 @@
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />

<!--MQTT权限 唤醒,访问网络信息,访问电话权限-->
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.READ_PHONE_STATE" />

<application
android:name=".app.ICSApp"
android:allowBackup="true"
@@ -94,6 +100,9 @@
android:windowSoftInputMode="adjustPan|stateHidden"
android:configChanges="orientation|screenSize|keyboard|keyboardHidden|navigation"
android:exported="false" />
<!-- Mqtt Service -->
<service android:name="org.eclipse.paho.android.service.MqttService">
</service>
</application>

</manifest>

+ 15
- 1
app/src/main/java/com/example/bpa/MainActivity.java View File

@@ -17,7 +17,10 @@ import androidx.fragment.app.FragmentTransaction;

import com.example.bpa.config.ConfigName;
import com.example.bpa.config.DataBus;
import com.example.bpa.helper.MQTT;
import com.example.bpa.helper.ModbusTcpHelper;
import com.example.bpa.helper.ModbusTcpServer;
import com.example.bpa.service.OrderServer;
import com.example.bpa.view.fragment.CloudFragment;
import com.example.bpa.view.fragment.HeplerFragment;
import com.example.bpa.view.fragment.HomeFragment;
@@ -60,6 +63,16 @@ public class MainActivity extends FragmentActivity implements View.OnClickListen
initEvents();
initBusiness();
}

/**
* 界面关闭
*/
@Override
protected void onDestroy() {
MQTT.get().ConnMqttBroken(false);//释放mqtt
ModbusTcpHelper.get().release();//释放modbus
super.onDestroy();
}
//endregion

//region 公有函数
@@ -79,7 +92,6 @@ public class MainActivity extends FragmentActivity implements View.OnClickListen
ShowFragment(homeFragment,"系统主页");
//ShowFragment(systemCapabilitiesFragment,"功能菜单");
DataBus.getInstance().UpdateMainGoods();

}
/**
* 初始化按钮事件
@@ -99,6 +111,8 @@ public class MainActivity extends FragmentActivity implements View.OnClickListen
private void initBusiness() {
//PLC数据监控
ModbusTcpServer.get().Connect(ConfigName.getInstance().Address, ConfigName.getInstance().Post);
//MQTT数据监听
//OrderServer.Get().MqttInit();
}
//endregion



+ 5
- 0
app/src/main/java/com/example/bpa/Model/IMessage.java View File

@@ -0,0 +1,5 @@
package com.example.bpa.Model;

public interface IMessage {
void MessageRecive(String topic,String Message);
}

+ 1
- 0
app/src/main/java/com/example/bpa/app/ICSApp.java View File

@@ -18,6 +18,7 @@ import com.example.bpa.helper.ModbusTcpServer;
import com.example.bpa.helper.SdCart;
import com.example.bpa.helper.ToastUtil;
import com.example.bpa.service.BusinessServer;
import com.example.bpa.service.OrderServer;
import com.example.bpa.view.mode.AlertLogEnum;
import com.example.bpa.view.mode.UserLogEnum;



+ 261
- 0
app/src/main/java/com/example/bpa/helper/MQTT.java View File

@@ -0,0 +1,261 @@
package com.example.bpa.helper;

import android.content.Context;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Handler;
import android.util.Log;

import com.example.bpa.Model.IMessage;
import com.example.bpa.Model.IRun;
import com.example.bpa.config.ConfigName;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
* mqtt帮助类
*/
public class MQTT {
//region 单例模式
private static volatile MQTT instance = null;

public static MQTT get() {
MQTT manager = instance;
if (manager == null) {
synchronized (MQTT.class) {
manager = instance;
if (manager == null) {
manager = new MQTT();
instance = manager;
}
}
}
return manager;
}

private MQTT() {
// Connect("emqx_u_block","emqx_p_admin8765490789","10.2.1.21",1883);
}
//endregion

//region 变量
private String string_IMEI_ID = "奶茶机"+java.util.UUID.randomUUID().toString();
private String host = "tcp://broker.emqx.io:1883";
private String userName = "emqx_u_block";
private String passWord = "emqx_p_admin8765490789";
private MqttClient mqttClient;//客户端
private MqttConnectOptions options;//配置 保存控制客户端连接到服务器的方式的选项集。
public IMessage callback=null;//消息回调
public IRun ConnectOk=null;
public IRun Disconnect=null;
//endregion

//region 公有函数

/**
* 初始化配置Mqtt
* @param name
* @param Pass
* @param Ip
* @param post
*/
public void Connect(String name,String Pass,String Ip,int post) {
try {
userName=name;
passWord=Pass;
host="tcp://"+ Ip+":"+post;
//(1)主机地址(2)客户端ID,一般以客户端唯一标识符(不能够和其它客户端重名)(3)最后一个参数是指数据保存在内存(具体保存什么数据,以后再说,其实现在我也不是很确定)
mqttClient = new MqttClient(host, string_IMEI_ID, new MemoryPersistence());
} catch (MqttException e) {
e.printStackTrace();
}
options = new MqttConnectOptions();//MQTT的连接设置
options.setCleanSession(true);//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setUserName(userName);//设置连接的用户名(自己的服务器没有设置用户名)
options.setPassword(passWord.toCharArray());//设置连接的密码(自己的服务器没有设置密码)
mqttClient.setCallback(new MqttCallback() {
@Override//连接丢失后,会执行这里
public void connectionLost(Throwable throwable) {
MessageLog.ShowInfo("mqtt断开连接,尝试重新连接...");
if(Disconnect!=null)
{
Disconnect.Run();
}
Reconnect();
}
@Override//获取的消息会执行这里--arg0是主题,arg1是消息
public void messageArrived(final String ssr, MqttMessage mqttMessage) throws Exception {
final String mqtt_zhuti = ssr;//主题
final String mqtt_message = mqttMessage.toString();//消息
if(callback!=null)
{
callback.MessageRecive(mqtt_zhuti,mqtt_message);
}
}
@Override//订阅主题后会执行到这里
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

}
});
ConnMqttBroken(true);
}

/**
* 连接 或者断开MQTT
* @param bloConn
*/
public void ConnMqttBroken(boolean bloConn)
{
// 必须开启新的线程执行
new Thread(new Runnable() {
@Override
public void run() {
if (bloConn) {
try {
if (mqttClient.isConnected() == false) {
mqttClient.connect(options);//连接服务器,连接不上会阻塞在这
}
MessageLog.ShowInfo("mqtt连接成功!");
if(ConnectOk!=null)
{
ConnectOk.Run();
}
} catch (MqttException e) {
Log.i("MQTT", "mqtt重连接失败!"+e.getMessage());
}
} else {
try {
if (mqttClient!=null && mqttClient.isConnected()) {
mqttClient.disconnect();
}
MessageLog.ShowInfo("mqtt断开成功!");
} catch (MqttException e) {
MessageLog.ShowError("mqtt断开失败!"+e.getMessage());
}
}
}
}).start();
}

/**
* 订阅主题
* @param topic
*/
public void Subscrib(String[] topic)
{
if (mqttClient == null || !mqttClient.isConnected())
{
return;
}
try
{
for (String str:topic)
{
MessageLog.ShowInfo("mqtt订阅主题:"+str);
Log.v("IotMqttService:topic="+str, "订阅topic:"+str);
mqttClient.subscribe(str,2);
}
}
catch(MqttException e)
{
MessageLog.ShowError("mqtt订阅主题失败!"+topic);
}
}

/**
* 发布 (向服务器发送消息)
* @param topic
* @param message
*/
public void publish(String topic, String message) {
Integer qos = 2;
Boolean retained = false;
try {
if(mqttClient.isConnected())
{
Log.v("IotMqttService:topic="+topic, "Mqtt发送消息:"+message);
//参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
mqttClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
}
} catch (MqttException e) {
e.printStackTrace();
}
}

/**
* 判断网络是否连接
*/
private boolean isConnectIsNomarl() {
ConnectivityManager connectivityManager = (ConnectivityManager) ConfigName.getInstance().dishesCon.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.i("网络", "当前网络名称:" + name);
return true;
} else {
Log.i("网络", "没有可用网络");
/*没有可用网络的时候,延迟3秒再尝试重连*/
// new Handler().postDelayed(new Runnable() {
// @Override
// public void run() {
// ConnMqttBroken(true);
// }
// }, 3000);
return false;
}
}

/**
* 重新连接
*/
private void Reconnect()
{
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
if (isConnectIsNomarl())
{
while (!isConnectIsNomarl())
{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {

}
}
}
} catch (InterruptedException e) {

}


boolean flag = false;
while (!mqttClient.isConnected())
{
try
{
Log.i("MQTT", "mqtt重连中!");
ConnMqttBroken(true);
Thread.sleep(3000);
}
catch (Exception ex)
{
if (!flag)
{
flag = true;
}
}
}
}
}).start();
}
//endregion
}

+ 29
- 0
app/src/main/java/com/example/bpa/service/OrderServer.java View File

@@ -1,6 +1,9 @@
package com.example.bpa.service;

import com.example.bpa.Model.IMessage;
import com.example.bpa.Model.IRun;
import com.example.bpa.Model.IThread;
import com.example.bpa.helper.MQTT;
import com.example.bpa.helper.MessageLog;
import com.example.bpa.helper.ThreadManager;

@@ -40,6 +43,32 @@ public class OrderServer {
// public void RunComplete() throws InterruptedException {
// }
// });


}

/**
* MQTT初始化
*/
public void MqttInit()
{
//消息回调
MQTT.get().callback=new IMessage() {
@Override
public void MessageRecive(String topic, String Message) {
MessageLog.ShowInfo("收到主题:"+topic+",数据:"+Message);
}
};
//连接成功标志
MQTT.get().ConnectOk=new IRun() {
@Override
public void Run() {
String[] Str={"ncj_order"};
MQTT.get().Subscrib(Str);
}
};
//初始化MQTT连接
MQTT.get().Connect("emqx_u_block","emqx_p_admin8765490789","10.2.1.21",1883);
}
//endregion
}

Loading…
Cancel
Save