请注意,本文编写于 930 天前,最后修改于 557 天前,其中某些信息可能已经过时。
使用mqtt发送消息Spring starter
最近有个项目需要发送视频弹幕,实现的手段还是不少的,这里用Mqtt来实现。
准备
首先准备环境:这里使用 EMQX 5.0 社区版作为 MQTT 消息服务器,并通过 Docker Compose 进行搭建:
# Docker Compose definition for a single EMQX broker container.
# Nesting is significant in YAML: everything below `emqx:` belongs to that service.
version: '3.2'
services:
  emqx:
    image: emqx/emqx:v5.0.0
    container_name: emqx
    environment:
      EMQX_NAME: xinwei-emqx
    restart: always
    # host:container port mappings
    ports:
      # MQTT over TCP (matches the tcp://...:1883 url used by the Spring client)
      - '1883:1883'
      - '8081:8081'
      # MQTT over WebSocket (matches the ws://...:8083 ws-url used by the Spring client)
      - '8083:8083'
      - '8883:8883'
      - '8084:8084'
      # container port 8080 is published on host port 8099
      - '8099:8080'
      # NOTE(review): presumably the EMQX dashboard port - confirm against the EMQX docs
      - '18083:18083'
    # Host directories mounted into the container for data, configuration and logs
    volumes:
      - /service/env/emqx/data:/opt/emqx/data
      - /service/env/emqx/etc:/opt/emqx/etc
      - /service/env/emqx/log:/opt/emqx/log
正文
Maven
具体版本可以由项目的spring-boot
版本决定。
<!-- Spring Boot starter for Spring Integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<!-- Spring Integration stream support -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>

<!-- Spring Integration MQTT support -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置
配置类
本文不对 MQTT 连接进行鉴权(authType 默认为 NONE),有需要的可以自行扩展。
/**
 * Configuration properties bound from the {@code spring.mqtt} prefix.
 *
 * <p>Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}; field names therefore define the property names.
 */
@Data
@ConfigurationProperties(prefix = "spring.mqtt")
@RefreshScope
public class MqttProperties {
    /** Broker URI handed to the MQTT connect options as the only server URI. */
    private String url;
    /** Authentication mode; defaults to {@link AuthTypeEnum#NONE}. */
    private AuthTypeEnum authType = AuthTypeEnum.NONE;
    /** User name applied to the connection when {@link #authType} is {@code USERNAME}. */
    private String username;
    /** Password applied to the connection when {@link #authType} is {@code USERNAME}. */
    private String password;
    /** WebSocket URL of the broker. NOTE(review): not read by the configuration shown alongside this class. */
    private String wsUrl;
    /** Client id prefix; the configuration appends a dash and a role suffix such as {@code receiver}. */
    private String client;
    /** Topic and QoS used by the outbound handlers when a message carries no explicit topic/QoS. */
    private Topic defaultSendTopic = new Topic("xinwei_mqtt_default", 1);
    /** Topics the inbound adapter subscribes to; {@code null} unless configured. */
    private List<Topic> receiveTopics;
    /** Keep-alive interval passed to the connect options (presumably seconds - confirm against Paho docs). */
    private Integer keepAlive = 15;
    /** Completion timeout passed to the inbound adapter (presumably milliseconds - confirm). */
    private Integer completionTimeout = 3000;
    /**
     * MQTT authentication type.
     */
    @NoArgsConstructor
    public enum AuthTypeEnum {
        /**
         * No authentication.
         */
        NONE,
        /**
         * Authenticate by client id.
         */
        CLIENT_ID,
        /**
         * Authenticate by user name.
         */
        USERNAME
    }
    /**
     * A topic name paired with the QoS level to use for it.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Topic {
        /** Topic name (or topic filter when used for subscriptions). */
        private String name;
        /** QoS level for this topic. */
        private Integer qos;
    }
}
注入配置
消息发送
@AutoConfiguration @EnableConfigurationProperties(MqttProperties.class) public class MqttConfiguration { @Bean public MessageChannel mqttValueInputChannel() { return new DirectChannel(); } /** * 不缓存消息 * * @return 消息通道 */ @Bean public MessageChannel mqttOutputClearSessionChannel() { return new DirectChannel(); } /** * 缓存消息 * * @return 消息通道 */ @Bean public MessageChannel mqttOutputRetainSessionChannel() { return new DirectChannel(); } @Bean public MessageProducer valueInbound(MqttProperties mqttProperties) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( mqttProperties.getClient() + StrPool.DASHED + "receiver", mqttClearSessionClientFactory(mqttProperties), mqttProperties.getReceiveTopics().stream() .map(MqttProperties.Topic::getName) .toArray(String[]::new)); adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout()); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(mqttProperties.getReceiveTopics().stream() .mapToInt(MqttProperties.Topic::getQos) .toArray()); adapter.setOutputChannel(mqttValueInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttOutputClearSessionChannel") public MessageHandler clearSessionOutbound(MqttProperties mqttProperties) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClient() + StrPool.DASHED + "clear-session-sender", mqttClearSessionClientFactory(mqttProperties)); messageHandler.setAsync(true); messageHandler.setDefaultQos(mqttProperties.getDefaultSendTopic().getQos()); messageHandler.setDefaultTopic(mqttProperties.getDefaultSendTopic().getName()); return messageHandler; } @Bean public MqttPahoClientFactory mqttClearSessionClientFactory(MqttProperties mqttProperties) { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttClearSessionConnectOptions(mqttProperties)); return factory; } @Bean public MqttConnectOptions 
getMqttClearSessionConnectOptions(MqttProperties mqttProperties) { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); if (mqttProperties.getAuthType().equals(MqttProperties.AuthTypeEnum.USERNAME)) { mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); } mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()}); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive()); return mqttConnectOptions; } @Bean @ServiceActivator(inputChannel = "mqttOutputRetainSessionChannel") public MessageHandler retainSessionOutbound(MqttProperties mqttProperties) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClient() + StrPool.DASHED + "retain-session-sender", mqttRetainSessionClientFactory(mqttProperties)); messageHandler.setAsync(true); messageHandler.setDefaultQos(mqttProperties.getDefaultSendTopic().getQos()); messageHandler.setDefaultTopic(mqttProperties.getDefaultSendTopic().getName()); return messageHandler; } @Bean public MqttPahoClientFactory mqttRetainSessionClientFactory(MqttProperties mqttProperties) { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttRetainSessionConnectOptions(mqttProperties)); return factory; } @Bean public MqttConnectOptions getMqttRetainSessionConnectOptions(MqttProperties mqttProperties) { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); if (mqttProperties.getAuthType().equals(MqttProperties.AuthTypeEnum.USERNAME)) { mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); } mqttConnectOptions.setCleanSession(false); mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()}); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive()); return mqttConnectOptions; } }
消息接收
/** * 消息接收 * * @author Mc */ @Slf4j @AutoConfiguration(after = MqttConfiguration.class) public class MqttReceiveConfiguration { @Bean @ServiceActivator(inputChannel = "mqttValueInputChannel") public MessageHandler handlerValue() { return message -> { MessageHeaders headers = message.getHeaders(); //获取消息Topic String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); log.info("获取到的消息的topic :{} ", receivedTopic); //获取消息体 String payload = (String) message.getPayload(); log.info("获取到的消息的payload :{} ", payload); }; } }
imports
在 META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件中写入自动配置类的全限定类名(每行一个):
com.xxx.cloud.mqtt.config.MqttConfiguration
com.xxx.cloud.mqtt.config.MqttReceiveConfiguration
消息发送
清除
session
@MessagingGateway(defaultRequestChannel = "mqttOutputClearSessionChannel") public interface MqttClearSessionSendHandler { /** * 使用 Default Topic & Default Qos 发送数据 * * @param data string */ void sendToMqtt(String data); /** * 使用 Default Topic & 自定义 Qos 发送数据 * * @param qos 自定义 Qos * @param data string */ void sendToMqtt(@Header(MqttHeaders.QOS) Integer qos, String data); /** * 使用 自定义 Topic & Default Qos 发送数据 * * @param topic 自定义 Topic * @param data string */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data); /** * 使用 自定义 Topic & 自定义 Qos 发送数据 * * @param topic 自定义 Topic * @param qos 自定义 Qos * @param data string */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos, String data); /** * 使用 自定义 Topic & 自定义 Qos 发送数据 * * @param topic 自定义 Topic * @param qos 自定义 Qos * @param payload 自定义负载 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
不清除
session
/**
 * Messaging gateway whose requests are routed to {@code mqttOutputRetainSessionChannel}.
 *
 * <p>In every overload the parameter without an annotation becomes the message payload, while
 * {@code @Header} parameters populate the named MQTT headers.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutputRetainSessionChannel")
public interface MqttRetainSessionSendHandler {

    /**
     * Publishes to the default topic with the default QoS.
     *
     * @param data text to publish
     */
    void sendToMqtt(String data);

    /**
     * Publishes to the default topic with a caller-supplied QoS.
     *
     * @param qos  QoS to publish with
     * @param data text to publish
     */
    void sendToMqtt(
            @Header(MqttHeaders.QOS) Integer qos,
            String data);

    /**
     * Publishes to a caller-supplied topic with the default QoS.
     *
     * @param topic topic to publish to
     * @param data  text to publish
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            String data);

    /**
     * Publishes to a caller-supplied topic with a caller-supplied QoS.
     *
     * @param topic topic to publish to
     * @param qos   QoS to publish with
     * @param data  text to publish
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) Integer qos,
            String data);

    /**
     * Publishes raw bytes to a caller-supplied topic with a caller-supplied QoS.
     *
     * @param topic   topic to publish to
     * @param qos     QoS to publish with
     * @param payload bytes to publish
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int qos,
            byte[] payload);
}
使用
- 引入依赖
配置文件
# Example application configuration for the MQTT starter (bound to the spring.mqtt prefix).
spring:
  mqtt:
    # Broker endpoints
    url: tcp://xxx.xxx.com:1883
    ws-url: ws://xxx.xxx.com:8083
    # Credentials are only applied when auth-type is USERNAME
    auth-type: NONE
    username: admin
    password: public
    # ca-crt: classpath:/certs/broker.emqx.io-ca.crt
    # Client id prefix; the starter appends a role suffix per connection
    client: ${spring.application.name}
    # Topic filters the inbound adapter subscribes to
    receive-topics:
      - qos: 1
        name: driver/${spring.application.name}/device/+
      - qos: 1
        name: driver/${spring.application.name}/gateway/+
    # Topic and QoS used when a send call does not specify them
    default-send-topic:
      qos: 1
      name: default/${spring.application.name}
    keep-alive: 15
    completion-timeout: 3000
注入后调用方法即可发送
// Inject the messaging gateway; call one of its sendToMqtt(...) overloads to publish a message.
@Resource MqttClearSessionSendHandler sendHandler;