Service provider ServiceMix + Camel

I am trying to send a message to an ActiveMQ queue from a web service that is deployed in ServiceMix 5.4.0. To do this, I obtain a producer template from the Camel context and send the message like this:

// Create a ProducerTemplate from the Camel context so a message can be sent programmatically.
ProducerTemplate template = camelContext.createProducerTemplate();
// Send a single exchange to the "ExecutePaymentInputQueue" destination via the "activemq" endpoint URI.
template.send(
    "activemq://ExecutePaymentInputQueue",
    new Processor() {
        // Callback that fills in the exchange passed to send().
        @Override
        public void process(Exchange exchange) throws Exception {
            // Set the body of the In message; the actual payload is elided in this snippet.
            exchange.getIn().setBody(
                ...
            );
        }
    }
);

      

Relevant parts from the blueprint.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Blueprint descriptor (relevant parts only): wires a JTA-backed Spring transaction
    manager, an ActiveMQ Camel component, the exported PaymentService and the Camel context.
-->
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
    xmlns:jpa="http://aries.apache.org/xmlns/jpa/v1.1.0"
    xmlns:tx="http://aries.apache.org/xmlns/transactions/v1.0.0"
    xsi:schemaLocation="
  http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
  http://www.osgi.org/xmlns/blueprint-ext/v1.1.0 https://svn.apache.org/repos/asf/aries/tags/blueprint-0.3.1/blueprint-core/src/main/resources/org/apache/aries/blueprint/ext/blueprint-ext.xsd
  http://cxf.apache.org/blueprint/jaxws http://cxf.apache.org/schemas/blueprint/jaxws.xsd
  http://cxf.apache.org/blueprint/jaxrs http://cxf.apache.org/schemas/blueprint/jaxrs.xsd
  http://cxf.apache.org/blueprint/core http://cxf.apache.org/schemas/blueprint/core.xsd
  http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd
  ">

    <!-- Transaction manager setup -->
    <!-- Looks up a javax.transaction.TransactionManager service and exposes it as "jtaTransactionManager". -->
    <reference id="jtaTransactionManager" interface="javax.transaction.TransactionManager"/>

    <!-- Spring JtaTransactionManager constructed around the referenced JTA transaction manager. -->
    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <argument ref="jtaTransactionManager"/>
    </bean>

    <!-- Transaction policy setup -->
    <!-- Camel transaction policy bound to the Spring transaction manager defined above. -->
    <bean id="DefaultTransactionPolicy" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>

    <!-- BEGIN ActiveMQ -->

    <!--
        Connection factory for the broker at tcp://localhost:61616.
        NOTE(review): this is the plain ActiveMQConnectionFactory, not an XA-capable
        factory; confirm whether that is what prevents sends from enlisting in the
        JTA transaction.
    -->
    <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
        <property name="userName" value="smx"/>
        <property name="password" value="smx"/>

        <!-- Redelivery settings: up to 360 redeliveries with a delay value of 10000 (presumably milliseconds; confirm). -->
        <property name="redeliveryPolicy">
            <bean class="org.apache.activemq.RedeliveryPolicy">
                <property name="maximumRedeliveries" value="360"/>
                <property name="redeliveryDelay" value="10000"/>
            </bean>
        </property>
    </bean>

    <!-- JMS configuration shared by the "activemq" component: connection factory, consumer concurrency and transaction manager. -->
    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="activemqConnectionFactory"/>
        <property name="concurrentConsumers" value="8"/>
        <property name="maxConcurrentConsumers" value="16"/>
        <property name="transactionManager" ref="transactionManager"/>
    </bean>

    <!-- Camel component registered under the id "activemq"; this id is the scheme used in "activemq://..." endpoint URIs. -->
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig" />
    </bean>

    <!-- JMS component setup -->
    <!--<reference id="connectionFactory" interface="javax.jms.ConnectionFactory" />-->

    <!-- END ActiveMQ -->

    <!-- BEGIN SERVICE EXPORTS -->
    <!-- Exports PaymentServiceImpl under the PaymentService interface; every method ("*") is declared with transaction attribute "Required". -->
    <service interface="com.bssys.ebpp.core.api.PaymentService" >
        <bean class="com.bssys.ebpp.core.services.impl.PaymentServiceImpl">
            <tx:transaction method="*" value="Required"/>
        </bean>
    </service>
    <!-- END SERVICE EXPORTS -->

    <!-- Camel context; its contents are elided in this excerpt. -->
    <!-- xsi:schemaLocation must be namespace-URI / location pairs, so the namespace URI precedes the XSD location. -->
    <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/blueprint"
                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint-2.13.2.xsd">

       ...

    </camelContext>

</blueprint>

      

There must be an open JTA transaction when the message is sent (see the declarative transaction configuration in the blueprint.xml file).

The problem is that the producer appears to ignore the JTA transaction: the message is put on the destination queue outside of that transaction (immediately after the send method returns). How can I change this behavior and force the producer to join the JTA transaction?

+3


source to share





All Articles