package com.template.common.utils; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; import com.template.model.mqtt.MqttConfiguration; import com.template.model.mqtt.MqttPushClient; import com.template.model.mqtt.SpringUtil; import com.template.model.pojo.HouseLock; import com.template.model.pojo.HouseNumber; import com.template.model.pojo.UnlockingRecord; import com.template.services.impl.AlarmMessageServiceImpl; import com.template.services.impl.HouseLockServiceImpl; import com.template.services.impl.HouseNumberServiceImpl; import com.template.services.impl.UnlockingRecordServiceImpl; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; /** * 订阅 * * @author issuser */ @Component @Slf4j public class SubscribeSample{ private MqttPushClient client; private MqttConfiguration mqttConfiguration; public SubscribeSample(MqttPushClient client ,MqttConfiguration mqttConfiguration) { this.client = client; this.mqttConfiguration = mqttConfiguration; } // @Scheduled(cron = "0 10 * * * ? ") public void mqtt() { String HOST = "tcp://www.qspms.cn:1883"; String TOPIC = "smartLock/open/device/2d00b258183146c0a2b19f55250c4596"; int qos = 1; String clientid = "open_cloud_smartLock_2d00b258183146c0a2b19f55250c4596"; String userName = "2d00b258183146c0a2b19f55250c4596"; String passWord = "9ac03de350720057"; try { // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的连接设置 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置连接的用户名 options.setUserName(userName); // 设置连接的密码 options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 设置回调函数 client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { log.info(cause.getMessage()); try { log.info("==============》》》[MQTT] 连接断开,5S之后尝试重连..."); Thread.sleep(5000); MqttPushClient mqttPushClient = new MqttPushClient(); mqttPushClient.connect(mqttConfiguration); if(MqttPushClient.getClient().isConnected()){ log.info("=============>>重连成功"); } } catch (Exception e) { log.error("=============>>>[MQTT] 连接断开,重连失败!<<============="); } } @Override public void messageArrived(String topic, MqttMessage message) { //实例化入库方法 这里就用到SpringUtil类 来手动的注入 HouseLockServiceImpl houseLockService = SpringUtil.getBean(HouseLockServiceImpl.class); HouseNumberServiceImpl houseNumberService = SpringUtil.getBean(HouseNumberServiceImpl.class); UnlockingRecordServiceImpl unlockingRecordService = SpringUtil.getBean(UnlockingRecordServiceImpl.class); AlarmMessageServiceImpl alarmMessageService = SpringUtil.getBean(AlarmMessageServiceImpl.class); System.out.println("topic:" + topic); System.out.println("Qos:" + message.getQos()); System.out.println("message content:" + new String(message.getPayload())); JSONObject s = MessageDecryptUtil.decryptMessage(message.toString(), "6edfcc178c0f415d8e6628238761976f", "2d00b258183146c0a2b19f55250c4596"); String protocol = s.getString("protocol"); // 设备消息上报 if ("2".equals(protocol)) { UnlockingRecord unlockingRecord = new UnlockingRecord(); unlockingRecord.setType("消息类型"); String data = s.getString("data"); System.out.println("data = " + data); JSONObject jsonObject = JSONObject.parseObject(data); System.out.println("jsonObject = " + jsonObject); // 结果 String result = jsonObject.getString("result"); // 时间 String messageTime = jsonObject.getString("messageTime"); // 房间luid String luid = jsonObject.getString("luid"); // 找到房间号 LambdaQueryWrapper WrapperHL = new LambdaQueryWrapper<>(); WrapperHL.eq(HouseLock::getEquipmentType, luid); List list = houseLockService.list(WrapperHL); if (ObjectUtils.isNotEmpty(list) && list.size() > 0) { HouseLock houseLock = list.get(0); Integer houseNumberId = houseLock.getHouseNumberId(); HouseNumber byId = houseNumberService.getById(houseNumberId); unlockingRecord.setHouseNumberId(houseNumberId); if (ObjectUtils.isNotEmpty(byId)) { String roomNumber = byId.getRoomNumber(); unlockingRecord.setRoomNumber(roomNumber); unlockingRecord.setHouseNumberId(houseNumberId); } } // 消息类型 String messageType = jsonObject.getString("messageType"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String format = sdf.format(Long.valueOf(messageTime)); unlockingRecord.setDateTime(format); if ("1".equals(messageType)) { //密码开锁 unlockingRecord.setUnlockType("密码开锁"); } else if ("2".equals(messageType)) { // 远程开锁 unlockingRecord.setUnlockType("远程开锁"); } else if ("3".equals(messageType)) { //无网络密码开锁 unlockingRecord.setUnlockType("无网络密码开锁"); } else if ("5".equals(messageType)) { //人脸开锁 unlockingRecord.setUnlockType("人脸开锁"); } else if ("10".equals(messageType)) { // 电池电量更新 unlockingRecord.setUnlockType("电池电量更新"); } DateTimeFormatter dateTimeFormatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); unlockingRecord.setCreateTime(LocalDateTime.now().format(dateTimeFormatter1)); unlockingRecord.setUpdateTime(LocalDateTime.now().format(dateTimeFormatter1)); unlockingRecord.setCreateUser("1"); unlockingRecord.setUpdateUser("1"); unlockingRecordService.getSave(unlockingRecord); } else if ("3".equals(protocol)) { // AlarmMessage alarmMessage = new AlarmMessage(); UnlockingRecord alarmMessage = new UnlockingRecord(); alarmMessage.setType("预警类型"); String data = s.getString("data"); JSONObject jsonObject = JSONObject.parseObject(data); // 消息类型 String messageType = jsonObject.getString("messageType"); // 时间 String messageTime = jsonObject.getString("messageTime"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String format = sdf.format(Long.valueOf(messageTime)); alarmMessage.setDateTime(format); // 房间luid String luid = jsonObject.getString("luid"); // 找到房间号 LambdaQueryWrapper WrapperHL = new LambdaQueryWrapper<>(); WrapperHL.eq(HouseLock::getEquipmentType, luid); List list = houseLockService.list(WrapperHL); if (ObjectUtils.isNotEmpty(list) && list.size() > 0) { HouseLock houseLock = list.get(0); Integer houseNumberId = houseLock.getHouseNumberId(); HouseNumber byId = houseNumberService.getById(houseNumberId); if (ObjectUtils.isNotEmpty(byId)) { String roomNumber = byId.getRoomNumber(); alarmMessage.setRoomNumber(roomNumber); alarmMessage.setHouseNumberId(houseNumberId); } } if ("5".equals(messageType)) { // 低电量 alarmMessage.setUnlockType("低电量"); } else if ("1".equals(messageType)) { alarmMessage.setUnlockType("密码错误(连续失败五次)"); } else if ("4".equals(messageType)) { alarmMessage.setUnlockType("防拆告警"); } DateTimeFormatter dateTimeFormatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); alarmMessage.setCreateTime(LocalDateTime.now().format(dateTimeFormatter1)); alarmMessage.setUpdateTime(LocalDateTime.now().format(dateTimeFormatter1)); alarmMessage.setCreateUser("1"); alarmMessage.setUpdateUser("1"); unlockingRecordService.getSave(alarmMessage); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }); client.connect(options); // 订阅消息 client.subscribe(TOPIC, qos); System.out.println("订阅完成"); } catch (Exception e) { e.printStackTrace(); } } /** * 编辑连接信息 * * @param userName * @param password * @param outTime * @param KeepAlive * @return */ private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) { //MQTT连接设置 MqttConnectOptions option = new MqttConnectOptions(); //设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接 option.setCleanSession(false); //设置连接的用户名 option.setUserName(userName); //设置连接的密码 option.setPassword(password.toCharArray()); //设置超时时间 单位为秒 option.setConnectionTimeout(outTime); //设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 option.setKeepAliveInterval(KeepAlive); //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 //option.setWill(topic, "close".getBytes(StandardCharsets.UTF_8), 2, true); option.setMaxInflight(1000); return option; } }