1. 前言
这里我们使用springboot搭建一个轻量级的mqtt客户端,连接mqtt的Broker服务。
连接信息写在配置文件application.properties里:
# Credentials that MqttConfiguration copies into MqttConnectOptions
spring.mqtt.username=admin
spring.mqtt.mqpassword=admin
# Broker URI handed to MqttConnectOptions.setServerURIs
spring.mqtt.host-url= tcp://127.0.0.1:1883
# Base client id; the inbound adapter appends "_inbound", the outbound handler uses it as-is
spring.mqtt.client-id= server_client_${random.value}
# Split on "," to obtain the inbound subscriptions; also used as the outbound default topic
spring.mqtt.default-topic= $SYS/brokers/+/clients/#
# Parsed to a long and passed to the inbound adapter's setCompletionTimeout
spring.mqtt.completionTimeout= 3000
# Passed to MqttConnectOptions.setKeepAliveInterval
spring.mqtt.keepAlive= 60
2. 引入依赖
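<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>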
3. 配置文件
新建MqttProperties.java文件,初始化application里的mqtt配置项
1 2 3 4 5 6 7 8 9 10 11 12 13 | @ConfigurationProperties ( "spring.mqtt" ) @Component @Getter @Setter public class MqttProperties { private String username; private String mqpassword; private String hostUrl; private String clientId; private String defaultTopic; private String completionTimeout; private Integer keepAlive; } |
新建MqttConfiguration.java文件,为mqtt做初始化配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | @Configuration @Slf4j public class MqttConfiguration { @Autowired private MqttProperties mqttProperties; /** * 事件触发 */ @Autowired private ApplicationEventPublisher eventPublisher; @Bean public MqttConnectOptions getMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions= new MqttConnectOptions(); mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray()); mqttConnectOptions.setServerURIs( new String[]{mqttProperties.getHostUrl()}); mqttConnectOptions.setKeepAliveInterval( 2 ); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive()); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 配置client,监听的topic */ @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+ "_inbound" , mqttClientFactory(), mqttProperties.getDefaultTopic().split( "," )); adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout())); adapter.setConverter( new DefaultPahoMessageConverter()); //默认添加TopicName中所有tipic adapter.addTopic( "+/+/test" ); adapter.setQos( 2 ); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator (inputChannel = "mqttInputChannel" ) public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message> message) throws MessagingException { String topic = message.getHeaders().get( "mqtt_receivedTopic" 
).toString(); String qos = message.getHeaders().get( "mqtt_receivedQos" ).toString(); //触发事件 这里不再做业务处理,包 listener中做处理 eventPublisher.publishEvent( new MqttEvent( this ,topic,message.getPayload().toString())); } }; } /** * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory * * @return */ @Bean @ServiceActivator (inputChannel = "mqttOutboundChannel" ) public MessageHandler mqttOutbound() { // 在这里进行mqttOutboundChannel的相关设置 MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory()); // 如果设置成true,发送消息时将不会阻塞。 messageHandler.setAsync( true ); messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } } |
4. MQTT消息类
新建MqttEvent.java 消息事件类,用于把接收到的mqtt消息以Spring事件的形式发布出去
1 2 3 4 5 6 7 8 9 10 11 12 13 | @Getter public class MqttEvent extends ApplicationEvent { private String topic; /** * 发送的消息 */ private String message; public MqttEvent(Object source,String topic,String message) { super (source); this .topic = topic; this .message = message; } } |
5. MQTT消息接收器
新建JobListener.java文件作为 mqtt的消息接收类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | @Slf4j @Component public class JobListener { @Autowired DeviceDao deviceDao; /** * 监听topic * @param mqttEvent */ @EventListener (condition = "#mqttEvent.topic.startsWith('pay')" ) public void onEmqttCall1(MqttEvent mqttEvent) throws Exception { String topic = mqttEvent.getTopic(); //写逻辑处理 } /** * 监听topic * @param mqttEvent */ @EventListener (condition = "#mqttEvent.topic.equals('device')" ) public void onEmqttCallT(MqttEvent mqttEvent){ log.info( "接收到消11111111111:" +mqttEvent.getMessage()); } } |
6. MQTT消息发送器
新建MqttGateway.java 提供发送mqtt消息的接口服务
1 2 3 4 5 6 7 | @Component @MessagingGateway (defaultRequestChannel = "mqttOutboundChannel" ) public interface MqttGateway { void sendToMqtt(String data); void sendToMqtt( @Header (MqttHeaders.TOPIC) String topic, String payload); void sendToMqtt( @Header (MqttHeaders.TOPIC) String topic, @Header (MqttHeaders.QOS) int qos, String payload); } |
7. 测试MQTT发送消息
1 2 3 4 5 6 7 8 9 | @SpringBootTest public class Test3 { @Autowired MqttGateway mqttGateway; @Test public void mqttTest () { mqttGateway.sendToMqtt( "111//222/33" , "消息内容" ); } } |
到此这篇关于Springboot集成mqtt客户端详解的文章就介绍到这了,更多相关Springboot集成mqtt内容请搜索IT俱乐部以前的文章或继续浏览下面的相关文章希望大家以后多多支持IT俱乐部!