package com.multictrl.common.config; import com.multictrl.common.handler.Mqtt1MessageHandler; import com.multictrl.common.handler.Mqtt2MessageHandler; import lombok.Data; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; /** * MQTT双客户端配置 */ @Data @Configuration @EnableIntegration @ConfigurationProperties(prefix = "mqtt") public class MqttConfig { private Client client1 = new Client(); private Client client2 = new Client(); @Data public static class Client { private String url; private String username; private String password; private String subClientId; private String subTopic; private String pubClientId; } // ==================== 通用工厂方法 ==================== private DefaultMqttPahoClientFactory createFactory(Client c) { MqttConnectOptions opt = new MqttConnectOptions(); opt.setCleanSession(true); opt.setServerURIs(new String[]{c.getUrl()}); opt.setUserName(c.getUsername()); opt.setPassword(c.getPassword().toCharArray()); opt.setConnectionTimeout(10); opt.setKeepAliveInterval(60); opt.setAutomaticReconnect(true); DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(opt); return factory; } private MqttPahoMessageDrivenChannelAdapter createInbound(Client c, MessageChannel channel) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( c.getSubClientId(), createFactory(c), splitTopics(c.getSubTopic())); adapter.setQos(2); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setOutputChannel(channel); adapter.setCompletionTimeout(5000); return adapter; } private MessageHandler createOutbound(Client c) { MqttPahoMessageHandler handler = new MqttPahoMessageHandler(c.getPubClientId(), createFactory(c)); handler.setDefaultQos(2); handler.setDefaultTopic("/default/topic"); handler.setAsync(true); handler.setCompletionTimeout(5000); return handler; } private String[] splitTopics(String topics) { if (topics == null || topics.isBlank()) return new String[0]; return java.util.Arrays.stream(topics.split(",")).map(String::trim).filter(t -> !t.isEmpty()).toArray(String[]::new); } // ==================== 客户端1 ==================== @Bean("mqttInputChannel1") public MessageChannel mqttInputChannel1() { return new DirectChannel(); } @Bean("mqttInboundAdapter1") public MqttPahoMessageDrivenChannelAdapter mqttInboundAdapter1() { return createInbound(client1, mqttInputChannel1()); } @Bean @ServiceActivator(inputChannel = "mqttInputChannel1") public MessageHandler messageHandler1(Mqtt1MessageHandler receiverMessageHandler) { return receiverMessageHandler; } @Bean("mqttOutboundChannel1") public MessageChannel mqttOutboundChannel1() { return new DirectChannel(); } @Bean("mqttOutboundHandler1") @ServiceActivator(inputChannel = "mqttOutboundChannel1") public MessageHandler mqttOutboundHandler1() { return createOutbound(client1); } /*@Bean("mqttOutboundFlow1") public IntegrationFlow mqttOutboundFlow1() { return IntegrationFlow.from(mqttOutboundChannel1()) .handle(mqttOutboundHandler1()) .get(); }*/ // ==================== 客户端2 ==================== @Bean("mqttInputChannel2") public MessageChannel mqttInputChannel2() { return new DirectChannel(); } @Bean("mqttInboundAdapter2") public MqttPahoMessageDrivenChannelAdapter mqttInboundAdapter2() { return createInbound(client2, mqttInputChannel2()); } @Bean @ServiceActivator(inputChannel = "mqttInputChannel2") public MessageHandler messageHandler2(Mqtt2MessageHandler receiverMessageHandler) { return receiverMessageHandler; } @Bean("mqttOutboundChannel2") public MessageChannel mqttOutboundChannel2() { return new DirectChannel(); } @Bean("mqttOutboundHandler2") @ServiceActivator(inputChannel = "mqttOutboundChannel2") public MessageHandler mqttOutboundHandler2() { return createOutbound(client2); } /*@Bean("mqttOutboundFlow2") public IntegrationFlow mqttOutboundFlow2() { return IntegrationFlow.from(mqttOutboundChannel2()) .handle(mqttOutboundHandler2()) .get(); }*/ }