Spring boot configure multiple ActiveMQ instances
I have a request to move messages from queues in one ActiveMQ instance to another ActiveMQ instance. Is there a way to connect to two different ActiveMQ instances using spring boot configuration?
Do I need to create multiple connectionFactories? If so, how does the JmsTemplate know which ActiveMQ instance to connect to?
/**
 * Exposes a connection factory for the single broker addressed by {@code JMS_BROKER_URL}.
 *
 * @return an ActiveMQ connection factory pointing at {@code JMS_BROKER_URL}
 */
@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory brokerConnections = new ActiveMQConnectionFactory(JMS_BROKER_URL);
    return brokerConnections;
}
Any help and code examples are helpful.
Thanks in advance. GM
source to share
In addition to @Chris's answer: create a separate BrokerService instance for each broker (each listening on its own port), a separate ConnectionFactory for each broker, and a separate JmsTemplate built on each of those factories, so that every template sends its messages to its own broker.
For example:
import javax.jms.ConnectionFactory;
import javax.jms.QueueConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
/**
 * Spring configuration that declares two ActiveMQ {@link BrokerService} beans and, for each of
 * them, a connection factory, a {@link JmsTemplate} and a listener container factory.
 *
 * <p>The beans for the broker on port 5671 are marked {@link Primary}; the beans for the broker
 * on port 5672 carry a {@code 2} suffix and have to be selected by name.
 */
@Configuration
public class ActiveMQConfigurationForJmsCamelRouteConsumeAndForward {

    /** Default destination of the primary {@link JmsTemplate}. */
    public static final String LOCAL_Q = "localQ";

    /** Default destination of the secondary {@link JmsTemplate}. */
    public static final String REMOTE_Q = "remoteQ";

    /** Transport URI shared by the first broker and its connection factory. */
    private static final String LOCAL_BROKER_URI = "tcp://localhost:5671";

    /** Transport URI shared by the second broker and its connection factory. */
    private static final String REMOTE_BROKER_URI = "tcp://localhost:5672";

    /**
     * First broker, named {@code "broker"}, with a connector on port 5671 and JMX disabled.
     *
     * @throws Exception if the transport connector cannot be added
     */
    @Bean
    public BrokerService broker() throws Exception {
        return embeddedBroker("broker", LOCAL_BROKER_URI);
    }

    /**
     * Second broker, named {@code "broker2"}, with a connector on port 5672 and JMX disabled.
     *
     * @throws Exception if the transport connector cannot be added
     */
    @Bean
    public BrokerService broker2() throws Exception {
        return embeddedBroker("broker2", REMOTE_BROKER_URI);
    }

    /** Connection factory for the first broker; the default {@link ConnectionFactory} candidate. */
    @Bean
    @Primary
    public ConnectionFactory jmsConnectionFactory() {
        return new ActiveMQConnectionFactory(LOCAL_BROKER_URI);
    }

    /** Connection factory for the second broker. */
    @Bean
    public QueueConnectionFactory jmsConnectionFactory2() {
        return new ActiveMQConnectionFactory(REMOTE_BROKER_URI);
    }

    /** Template bound to the first broker, defaulting to {@link #LOCAL_Q}. */
    @Bean
    @Primary
    public JmsTemplate jmsTemplate() {
        return templateFor(jmsConnectionFactory(), LOCAL_Q);
    }

    /** Template bound to the second broker, defaulting to {@link #REMOTE_Q}. */
    @Bean
    public JmsTemplate jmsTemplate2() {
        return templateFor(jmsConnectionFactory2(), REMOTE_Q);
    }

    /**
     * Listener container factory built on whichever {@link ConnectionFactory} Spring injects
     * for an unqualified parameter.
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        return listenerFactoryFor(connectionFactory, configurer);
    }

    /** Listener container factory built on the {@code jmsConnectionFactory2} bean. */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory2(
            @Qualifier("jmsConnectionFactory2") ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        return listenerFactoryFor(connectionFactory, configurer);
    }

    /** Builds a broker with one transport connector on {@code uri}, the given name, and JMX off. */
    private static BrokerService embeddedBroker(String name, String uri) throws Exception {
        final BrokerService service = new BrokerService();
        service.addConnector(uri);
        service.setBrokerName(name);
        service.setUseJmx(false);
        return service;
    }

    /** Builds a template on {@code source} whose default destination name is {@code defaultQueue}. */
    private static JmsTemplate templateFor(ConnectionFactory source, String defaultQueue) {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(source);
        template.setDefaultDestinationName(defaultQueue);
        return template;
    }

    /** Builds a default listener container factory and lets {@code configurer} set it up on {@code source}. */
    private static JmsListenerContainerFactory<?> listenerFactoryFor(
            ConnectionFactory source,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        configurer.configure(containerFactory, source);
        return containerFactory;
    }
}
To move messages from one ActiveMQ instance to another, you can use JmsBridgeConnectors:
Note that with the examples below you cannot have other consumers on the queue you are forwarding from, because Camel or the JmsBridgeConnector consumes each message and forwards it. If you only want a copy of each message to be forwarded, you have several options: (1) convert your queue to a topic and handle messages for offline consumers with durable subscriptions or retroactive consumers; (2) convert your queue to a composite queue and use DestinationInterceptors to copy messages to another queue; (3) use a NetworkConnector to set up a network of brokers.
/**
 * Variant of the {@code broker} bean that also installs a JMS queue bridge: one outbound
 * bridge from {@code LOCAL_Q} to {@code REMOTE_Q}, reading through {@code jmsConnectionFactory()}
 * and writing through {@code jmsConnectionFactory2()}.
 *
 * @return the broker named {@code "broker"} with a connector on port 5671 and JMX disabled
 * @throws Exception if the transport connector cannot be added
 */
@Bean
public BrokerService broker() throws Exception {
    final BrokerService service = new BrokerService();
    service.addConnector("tcp://localhost:5671");

    // Single outbound bridge: LOCAL_Q on the local side, REMOTE_Q on the outbound side.
    OutboundQueueBridge localToRemote = new OutboundQueueBridge();
    localToRemote.setLocalQueueName(LOCAL_Q);
    localToRemote.setOutboundQueueName(REMOTE_Q);

    SimpleJmsQueueConnector queueConnector = new SimpleJmsQueueConnector();
    // Send retries are set to the policy's INFINITE constant.
    queueConnector.getReconnectionPolicy().setMaxSendRetries(ReconnectionPolicy.INFINITE);
    queueConnector.setOutboundQueueBridges(new OutboundQueueBridge[] { localToRemote });
    queueConnector.setLocalQueueConnectionFactory((QueueConnectionFactory) jmsConnectionFactory());
    queueConnector.setOutboundQueueConnectionFactory(jmsConnectionFactory2());

    service.setJmsBridgeConnectors(new JmsConnector[] { queueConnector });
    service.setBrokerName("broker");
    service.setUseJmx(false);
    return service;
}
Or using Camel like below:
/**
 * Builds and starts a Camel context with two ActiveMQ components ({@code inboundQueue} on
 * port 5671, {@code outboundQueue} on port 5672) and a single route that forwards
 * {@code LOCAL_Q} from the former to {@code REMOTE_Q} on the latter.
 *
 * @return the started Camel context
 * @throws Exception if a route cannot be added or the context fails to start
 */
@Bean
public CamelContext camelContext() throws Exception {
    RouteBuilder forwardingRoute = new RouteBuilder() {
        @Override
        public void configure() {
            from("inboundQueue:queue:" + LOCAL_Q).to("outboundQueue:queue:" + REMOTE_Q);
        }
    };

    CamelContext camel = new DefaultCamelContext();
    camel.addComponent("inboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5671"));
    camel.addComponent("outboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5672"));
    camel.addRoutes(forwardingRoute);
    camel.start();
    return camel;
}
Your producer should look like this in order to use the two different JmsTemplates:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/**
 * Sends one sample message through each of the two {@link JmsTemplate} beans when the
 * application starts.
 */
@Component
public class Producer implements CommandLineRunner {

    /** Injected without a qualifier. */
    @Autowired
    private JmsTemplate jmsTemplate;

    /** Injected by name: the {@code jmsTemplate2} bean. */
    @Autowired
    @Qualifier("jmsTemplate2")
    private JmsTemplate jmsTemplate2;

    /** Startup hook: publishes a fixed sample payload. */
    @Override
    public void run(String... args) throws Exception {
        final String payload = "Sample message";
        send(payload);
    }

    /**
     * Publishes {@code msg} twice: to {@code LOCAL_Q} via the first template, then to
     * {@code REMOTE_Q} via the second.
     *
     * @param msg the payload handed to both templates
     */
    public void send(String msg) {
        final String localQueue = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.LOCAL_Q;
        final String remoteQueue = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q;
        jmsTemplate.convertAndSend(localQueue, msg);
        jmsTemplate2.convertAndSend(remoteQueue, msg);
    }
}
and consumer:
import javax.jms.Session;
import org.apache.activemq.ActiveMQSession;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
 * Listens on {@code REMOTE_Q} through the {@code jmsListenerContainerFactory2} container
 * factory and prints what it receives.
 */
@Component
public class Consumer {

    /**
     * Prints the broker info of the connection behind {@code session}, then the message text.
     *
     * @param session the JMS session of the delivery; cast to {@link ActiveMQSession}
     * @param text    the message payload
     */
    @JmsListener(destination = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, containerFactory = "jmsListenerContainerFactory2")
    public void receiveQueue(Session session, String text) {
        ActiveMQSession brokerSession = (ActiveMQSession) session;
        System.out.println(brokerSession.getConnection().getBrokerInfo());
        System.out.println(text);
    }
}
source to share
You will need to create multiple JmsTemplate instances as beans in your application, and then use a combination of the @Qualifier and @Primary annotations to specify which JmsTemplate instance should be injected where.
for example
// JmsTemplate bean registered as "queue1", built from the factory qualified "connectionFactory1".
// @Primary marks it as the preferred candidate when a JmsTemplate is injected without a qualifier.
@Bean("queue1")
@Primary
public JmsTemplate getQueue1(@Qualifier("connectionFactory1")ConnectionFactory factory...){
...
}
// JmsTemplate bean registered as "queue2", built from the factory qualified "connectionFactory2".
// Deliberately not @Primary: at most one bean of a given type should be primary, otherwise an
// unqualified JmsTemplate injection is ambiguous. Inject this one with @Qualifier("queue2").
@Bean("queue2")
public JmsTemplate getQueue2(@Qualifier("connectionFactory2")ConnectionFactory factory...){
...
}
...
// Injection point that selects the JmsTemplate bean named "queue1" explicitly via @Qualifier.
@Autowired
@Qualifier("queue1")
private JmsTemplate queue1;
...
See here for details .
source to share
You can use Spring Boot's default queue consumer
/**
 * JMS listener for the destination {@code "queue.name"}.
 *
 * @param message the received message payload as text
 */
@JmsListener(destination = "queue.name")
public void consumer(String message) {
// consume the message
}
And for the producer you can create another JmsTemplate @Bean
/**
 * Producer-side template bound to the broker at {@code tcp://localhost:5671}.
 *
 * @return a {@link JmsTemplate} built on a dedicated ActiveMQ connection factory
 */
@Bean
public JmsTemplate jmsTemplate() {
    ActiveMQConnectionFactory brokerConnections = new ActiveMQConnectionFactory("tcp://localhost:5671");
    return new JmsTemplate(brokerConnections);
}
source to share