ActiveMQ producer and consumer error with shared directory does not occur
We have two instances of ActiveMQ (version 5.10.0) and I am using shared storage to achieve HA. However, I do not see the ability to switch to another resource for the producer and consumer.
ActiveMQ broker-1 runs on IP1 and broker-2 on IP2 And under the activemq.xml configuration I have a modified save adapter to use the shared directory that is present on IP1.
<persistenceAdapter>
<kahaDB directory="\\IP1\shared-directory\for activemq\data"/>
</persistenceAdapter>
On both producer and consumer sides, I use the following JNDI configurations to get connections and create sessions etc.
jndi.properties
java.naming.factory.initial = ..........ActiveMQInitialContextFactory
java.naming.provider.url = failover:(tcp://IP1:61616,tcp://IP2:61616)?randomize=false
connectionFactoryNames = myConnectionFactory
queue.requestQ = my.RequestQ
The interesting part :
When I start this pair of brokers, I see one of the brokers become the host. When I run a producer that puts a message in Q (for example, a producer puts 100 messages in Q). While my producer is still working; I close the master broker, so the slave broker acquires the file lock and becomes the master. When I open the web console I can see that there are still 100 messages on Q. Even though the manufacturer is running, it no longer posts any messages to this Q.
Likewise for consumers. The consumer was collecting messages from Q, this Q said 100 messages were not loaded, when the master failed, now the master descends, the slave becomes master, I can see that 100 messages are still not loaded, but the consumer does not select any message from Q.
I waited a long time for them when switching to another resource (> 10 minutes). Can anyone suggest what configuration I am missing?
I will copy the producer and consumer instance as it is (I copied this from ActiveMQ in the action book with minor modifications).
Producer
public class Producer {
private static String brokerURL = "failover:(tcp://IP1:3389,tcp://IP2:3389)";
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;
private transient MessageProducer producer;
private static int count = 10;
private static int total;
private static int id = 1000000;
private String jobs[] = new String[] { "suspend", "delete" };
public Producer() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
public static void main(String[] args) throws JMSException {
Producer producer = new Producer();
while (total < 1000) {
for (int i = 0; i < count; i++) {
producer.sendMessage();
}
total += count;
System.out.println("Sent '" + count + "' of '" + total
+ "' job messages");
try {
Thread.sleep(1000);
} catch (InterruptedException x) {
}
}
producer.close();
}
public void sendMessage() throws JMSException {
int idx = 0;
while (true) {
idx = (int) Math.round(jobs.length * Math.random());
if (idx < jobs.length) {
break;
}
}
String job = jobs[idx];
Destination destination = session.createQueue("JOBS." + job);
Message message = session.createObjectMessage(id++);
System.out.println("Sending: id: "
+ ((ObjectMessage) message).getObject() + " on queue: "
+ destination);
producer.send(destination, message);
}
}
Consumer
public class Consumer {
private static String brokerURL = "failover:(tcp://IP1:3389,tcp://IP2:3389)";
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;
private String jobs[] = new String[] { "suspend", "delete" };
public Consumer() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
public static void main(String[] args) throws JMSException {
Consumer consumer = new Consumer();
for (String job : consumer.jobs) {
Destination destination = consumer.getSession().createQueue(
"JOBS." + job);
MessageConsumer messageConsumer = consumer.getSession()
.createConsumer(destination);
messageConsumer.setMessageListener(new Listener(job));
}
}
public Session getSession() {
return session;
}
}
One more thing: I'm more interested in user opt-out than the manufacturer. One more note, the consumer stops and unexpectedly comes to the command line.
Thank. -JE
source to share
No one has answered this question yet
Check out similar questions: