Spring Rollback AMQP / RabbitMQ transaction in EJB3 CMT
I am trying to test rollback of an EJB3 container-managed transaction (CMT) that involves a call to a Spring JPA repository and a message sender that publishes to RabbitMQ through Spring AMQP. After the CMT rolls back, I can see that the DB transaction is rolled back, but the message is still delivered to the queue.
I am injecting a Spring bean into the EJB. The bean calls the Rabbit template and is annotated with @Transactional. I can see that the TransactionInterceptor commits a transaction right after the Spring bean sends the message. I was hoping it would delegate the commit to the container.
Any suggestions / workarounds are appreciated. I was unable to figure out how to initialize the TransactionSynchronizationManager without using the @Transactional annotation.
Here is the code that commits the transaction when the Spring proxy bean executes the TransactionInterceptor code:
ResourceHolderSynchronization
    @Override
    public void afterCommit()
    {
        if (!shouldReleaseBeforeCompletion()) { // this method returns false
            processResourceAfterCommit(this.resourceHolder); // this is calling commitAll
        }
    }
ConnectionFactoryUtils/RabbitResourceSynchronization
    @Override
    protected boolean shouldReleaseBeforeCompletion() {
        return !this.transacted; // transacted is true (channelTransacted)
    }
RabbitResourceHolder
    public void commitAll() throws AmqpException {
        try {
            for (Channel channel : this.channels) {
                if (deliveryTags.containsKey(channel)) {
                    for (Long deliveryTag : deliveryTags.get(channel)) {
                        channel.basicAck(deliveryTag, false);
                    }
                }
                channel.txCommit();
            }
        } catch (IOException e) {
            throw new AmqpException("failed to commit RabbitMQ transaction", e);
        }
    }
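To make the call chain above easier to follow, here is a minimal, self-contained model of it. This is only a sketch of my reading of the flow: FakeChannel, FakeResourceHolder, FakeSynchronization and commitsOnAfterCommit are hypothetical stand-ins I made up for illustration, not Spring AMQP's real classes. It only shows that when the synchronization is marked as transacted, afterCommit() ends up calling txCommit() on the channel.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of the call chain from the question.
 * All classes here are hypothetical stand-ins, NOT Spring AMQP code.
 */
public class RabbitSyncSketch {

    /** Stand-in for a transacted AMQP channel; records whether txCommit ran. */
    static class FakeChannel {
        boolean committed = false;

        void txCommit() {
            committed = true;
        }
    }

    /** Stand-in for RabbitResourceHolder: commits every bound channel. */
    static class FakeResourceHolder {
        final List<FakeChannel> channels = new ArrayList<>();

        void commitAll() {
            for (FakeChannel channel : channels) {
                channel.txCommit();
            }
        }
    }

    /** Stand-in for the resource synchronization registered for the send. */
    static class FakeSynchronization {
        private final FakeResourceHolder holder;
        private final boolean transacted;

        FakeSynchronization(FakeResourceHolder holder, boolean transacted) {
            this.holder = holder;
            this.transacted = transacted;
        }

        // Mirrors "return !this.transacted" from the question.
        boolean shouldReleaseBeforeCompletion() {
            return !transacted;
        }

        // Mirrors afterCommit(): commit the channels unless they were released early.
        void afterCommit() {
            if (!shouldReleaseBeforeCompletion()) {
                holder.commitAll();
            }
        }
    }

    /** Runs afterCommit() once and reports whether the channel got committed. */
    static boolean commitsOnAfterCommit(boolean transacted) {
        FakeChannel channel = new FakeChannel();
        FakeResourceHolder holder = new FakeResourceHolder();
        holder.channels.add(channel);
        new FakeSynchronization(holder, transacted).afterCommit();
        return channel.committed;
    }

    public static void main(String[] args) {
        System.out.println("transacted=true  -> committed=" + commitsOnAfterCommit(true));
        System.out.println("transacted=false -> committed=" + commitsOnAfterCommit(false));
    }
}
```

In this model the channel commit is tied to whichever transaction fires afterCommit(), which matches what I observe: the commit happens when the TransactionInterceptor completes, not when the container completes the CMT.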
Here is my Spring config:
    <tx:jta-transaction-manager/>
    <tx:annotation-driven />
    <rabbit:connection-factory id="rabbitConnectionFactory"
        addresses="node01:5672,node02:5672" username="user.." password="pwd..." />
    <bean id="rabbitTemplate"
        class="org.arbfile.monitor.message.broker.api.rabbitmq.CustomRabbitTemplate" >
        <property name="connectionFactory" ref="rabbitConnectionFactory" />
        <property name="retryTemplate" ref="retryTemplate" />
        <property name="exchange" value="user.example.exchange" />
        <property name="queue" value="user.example.queue" />
        <property name="routingKey" value="user.example.queue" />
        <property name="channelTransacted" value="true" />
    </bean>