使用mqtt发送消息Spring starter

最近有个项目需要发送视频弹幕,实现的手段还是不少的,这里用Mqtt来实现。

准备

首先是环境准备,这里使用 EMQX 5.0 社区版作为Mqtt的消息服务器,使用Docker进行搭建:

version: '3.2'
services:
      emqx:
    image: emqx/emqx:v5.0.0
    container_name: emqx
    environment:
      EMQX_NAME: xinwei-emqx
    restart: always
    ports:
      - '1883:1883'
      - '8081:8081'
      - '8083:8083'
      - '8883:8883'
      - '8084:8084'
      - '8099:8080'
      - '18083:18083'
    volumes:
      - /service/env/emqx/data:/opt/emqx/data
      - /service/env/emqx/etc:/opt/emqx/etc
      - /service/env/emqx/log:/opt/emqx/log

正文

Maven

具体版本可以由项目的spring-boot版本决定。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置

配置类

本文不对mqtt进行鉴权,需要的可以自行拓展
@Data
@ConfigurationProperties(prefix = "spring.mqtt")
@RefreshScope
public class MqttProperties {


    private String url;

    private AuthTypeEnum authType = AuthTypeEnum.NONE; 

    private String username;
    private String password;
    private String wsUrl;
    private String client;

    private Topic defaultSendTopic = new Topic("xinwei_mqtt_default", 1);

    private List<Topic> receiveTopics;

    private Integer keepAlive = 15;

    private Integer completionTimeout = 3000;


    /**
     * Mqtt 权限认证类型枚举
     */
    @NoArgsConstructor
    public enum AuthTypeEnum {
        /**
         * 无
         */
        NONE,
        /**
         * 客户端id
         */
        CLIENT_ID,
        /**
         * 用户名
         */
        USERNAME
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Topic {
        private String name;

        private Integer qos;
    }
}

注入配置

  1. 消息发送

    @AutoConfiguration
    @EnableConfigurationProperties(MqttProperties.class)
    public class MqttConfiguration {
    
        @Bean
        public MessageChannel mqttValueInputChannel() {
            return new DirectChannel();
        }
    
        /**
         * 不缓存消息
         *
         * @return 消息通道
         */
        @Bean
        public MessageChannel mqttOutputClearSessionChannel() {
            return new DirectChannel();
        }
    
        /**
         * 缓存消息
         *
         * @return 消息通道
         */
        @Bean
        public MessageChannel mqttOutputRetainSessionChannel() {
            return new DirectChannel();
        }
    
        @Bean
        public MessageProducer valueInbound(MqttProperties mqttProperties) {
            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getClient() + StrPool.DASHED + "receiver",
                mqttClearSessionClientFactory(mqttProperties),
                mqttProperties.getReceiveTopics().stream()
                    .map(MqttProperties.Topic::getName)
                    .toArray(String[]::new));
            adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(mqttProperties.getReceiveTopics().stream()
                .mapToInt(MqttProperties.Topic::getQos)
                .toArray());
            adapter.setOutputChannel(mqttValueInputChannel());
            return adapter;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "mqttOutputClearSessionChannel")
        public MessageHandler clearSessionOutbound(MqttProperties mqttProperties) {
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClient() + StrPool.DASHED + "clear-session-sender", mqttClearSessionClientFactory(mqttProperties));
            messageHandler.setAsync(true);
            messageHandler.setDefaultQos(mqttProperties.getDefaultSendTopic().getQos());
            messageHandler.setDefaultTopic(mqttProperties.getDefaultSendTopic().getName());
            return messageHandler;
        }
    
        @Bean
        public MqttPahoClientFactory mqttClearSessionClientFactory(MqttProperties mqttProperties) {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getMqttClearSessionConnectOptions(mqttProperties));
            return factory;
        }
    
        @Bean
        public MqttConnectOptions getMqttClearSessionConnectOptions(MqttProperties mqttProperties) {
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    
            if (mqttProperties.getAuthType().equals(MqttProperties.AuthTypeEnum.USERNAME)) {
                mqttConnectOptions.setUserName(mqttProperties.getUsername());
                mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
            }
            mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()});
            mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
            return mqttConnectOptions;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "mqttOutputRetainSessionChannel")
        public MessageHandler retainSessionOutbound(MqttProperties mqttProperties) {
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClient() + StrPool.DASHED + "retain-session-sender", mqttRetainSessionClientFactory(mqttProperties));
            messageHandler.setAsync(true);
            messageHandler.setDefaultQos(mqttProperties.getDefaultSendTopic().getQos());
            messageHandler.setDefaultTopic(mqttProperties.getDefaultSendTopic().getName());
            return messageHandler;
        }
    
    
        @Bean
        public MqttPahoClientFactory mqttRetainSessionClientFactory(MqttProperties mqttProperties) {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getMqttRetainSessionConnectOptions(mqttProperties));
            return factory;
        }
    
        @Bean
        public MqttConnectOptions getMqttRetainSessionConnectOptions(MqttProperties mqttProperties) {
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    
            if (mqttProperties.getAuthType().equals(MqttProperties.AuthTypeEnum.USERNAME)) {
                mqttConnectOptions.setUserName(mqttProperties.getUsername());
                mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
            }
            mqttConnectOptions.setCleanSession(false);
            mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()});
            mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
            return mqttConnectOptions;
        }
    
    }
  2. 消息接收

    /**
     * 消息接收
     *
     * @author Mc
     */
    @Slf4j
    @AutoConfiguration(after = MqttConfiguration.class)
    public class MqttReceiveConfiguration {
    
        @Bean
        @ServiceActivator(inputChannel = "mqttValueInputChannel")
        public MessageHandler handlerValue() {
            return message -> {
                MessageHeaders headers = message.getHeaders();
                //获取消息Topic
                String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC);
                log.info("获取到的消息的topic :{} ", receivedTopic);
                //获取消息体
                String payload = (String) message.getPayload();
                log.info("获取到的消息的payload :{} ", payload);
            };
        }
    }

imports

META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports文件中放入配置文件路径

com.xxx.cloud.mqtt.config.MqttConfiguration
com.xxx.cloud.mqtt.config.MqttReceiveConfiguration

消息发送

  1. 清除session

    @MessagingGateway(defaultRequestChannel = "mqttOutputClearSessionChannel")
    public interface MqttClearSessionSendHandler {
        /**
         * 使用 Default Topic & Default Qos 发送数据
         *
         * @param data string
         */
        void sendToMqtt(String data);
    
        /**
         * 使用 Default Topic & 自定义 Qos 发送数据
         *
         * @param qos  自定义 Qos
         * @param data string
         */
        void sendToMqtt(@Header(MqttHeaders.QOS) Integer qos, String data);
    
        /**
         * 使用 自定义 Topic & Default Qos 发送数据
         *
         * @param topic 自定义 Topic
         * @param data  string
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
    
        /**
         * 使用 自定义 Topic & 自定义 Qos 发送数据
         *
         * @param topic 自定义 Topic
         * @param qos   自定义 Qos
         * @param data  string
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos, String data);
    
        /**
         * 使用 自定义 Topic & 自定义 Qos 发送数据
         *
         * @param topic   自定义 Topic
         * @param qos     自定义 Qos
         * @param payload 自定义负载
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
  2. 不清除session

    @MessagingGateway(defaultRequestChannel = "mqttOutputRetainSessionChannel")
    public interface MqttRetainSessionSendHandler {
    
        /**
         * 使用 Default Topic & Default Qos 发送数据
         *
         * @param data string
         */
        void sendToMqtt(String data);
    
        /**
         * 使用 Default Topic & 自定义 Qos 发送数据
         *
         * @param qos  自定义 Qos
         * @param data string
         */
        void sendToMqtt(@Header(MqttHeaders.QOS) Integer qos, String data);
    
        /**
         * 使用 自定义 Topic & Default Qos 发送数据
         *
         * @param topic 自定义 Topic
         * @param data  string
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
    
        /**
         * 使用 自定义 Topic & 自定义 Qos 发送数据
         *
         * @param topic 自定义 Topic
         * @param qos   自定义 Qos
         * @param data  string
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos, String data);
    
        /**
         * 使用 自定义 Topic & 自定义 Qos 发送数据
         *
         * @param topic   自定义 Topic
         * @param qos     自定义 Qos
         * @param payload 自定义负载
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
    }

使用

  1. 引入依赖
  2. 配置文件

    spring:
      mqtt:
        url: tcp://xxx.xxx.com:1883
        ws-url: ws://xxx.xxx.com:8083
        auth-type: NONE
        username: admin
        password: public
    #    ca-crt: classpath:/certs/broker.emqx.io-ca.crt
        client:  ${spring.application.name}
        receive-topics:
          - qos: 1
            name: driver/${spring.application.name}/device/+
          - qos: 1
            name: driver/${spring.application.name}/gateway/+
        default-send-topic:
          qos: 1
          name: default/${spring.application.name}
        keep-alive: 15
        completion-timeout: 3000
  3. 注入后调用方法即可发送

    @Resource
    MqttClearSessionSendHandler sendHandler;
如果觉得我的文章对你有用,请随意赞赏