Possible bug with Mule AMQP transport community 3.6.2

I'm tracking down some strange behavior and I finally managed to isolate it and I believe this is a bug. To reproduce the behavior, I created 4 test streams (in 4 different mule files):

<flow name="testhttpFlow">
    <http:listener config-ref="HTTP_Listener_AMIAB" path="/testsend" allowedMethods="GET, POST" doc:name="HTTP"/>
    <amqp:outbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" routingKey="masterMeep" exchangeType="direct" responseTimeout="10000" exchange-pattern="request-response" connector-ref="connector.amqp.mule.default" doc:name="AMQP-0-9"/>
</flow>

<flow name="testsendFlow" >
    <amqp:inbound-endpoint responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP-0-9" connector-ref="connector.amqp.mule.default" exchangeName="AMQP.DEFAULT.EXCHANGE" exchangeType="direct" queueDurable="true" queueName="masterMeep"/>
    <set-payload value="#[new java.util.ArrayList()]" doc:name="Set Payload to Collection"/>
    <scripting:component doc:name="JavaScript">
        <scripting:script engine="JavaScript"><![CDATA[payload.add("something1");
        payload.add("something2");
        result=payload;]]>
    </scripting:script>
    </scripting:component>
    <set-variable variableName="payloadCollection" value="#[payload]" doc:name="Set payloadCollection"/>
    <foreach doc:name="For Each">
        <choice doc:name="Choice">
            <when expression="payload == 'something1'">
                <amqp:outbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" routingKey="meep" exchangeType="direct" responseTimeout="60000" exchange-pattern="request-response" connector-ref="connector.amqp.mule.default" doc:name="AMQP-0-9 meep1"/>
            </when>
            <when expression="payload == 'something2'">
                <amqp:outbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" routingKey="meep2" exchangeType="direct" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP-0-9 meep2" connector-ref="connector.amqp.mule.default"/>
            </when>
            <otherwise>
                <logger level="INFO" doc:name="Logger"/>
            </otherwise>
        </choice>
        <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="To String"/>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
        <scripting:transformer doc:name="Replace array element">
            <scripting:script engine="JavaScript"><![CDATA[var payload = message.getPayload();
                var payloadCollection = message.getInvocationProperty('payloadCollection');
                var counter = message.getInvocationProperty('counter');
                payloadCollection.set(counter - 1, payload);
                result = payload;]]></scripting:script>
        </scripting:transformer>
    </foreach>
    <logger level="INFO" doc:name="Logger" message="#[payload]"/>
</flow>

<flow name="testreceiveFlow">
    <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="meep"  exchangeType="direct" queueDurable="true" responseTimeout="60000" exchange-pattern="request-response" connector-ref="connector.amqp.mule.default" doc:name="AMQP-0-9"/>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Set Payload"/>
    <set-variable variableName="subPayload" value="#[payload]" doc:name="Variable"/>
   <http:request config-ref="HTTP_Request_Polestar" path="/api/query/ServiceProxyConfigurationLookup" method="GET" doc:name="HTTP Get ConfigXml" followRedirects="true">
        <http:request-builder>
            <http:query-param paramName="ApiName" value="customer"/>
            <http:query-param paramName="Action" value="GET"/>
            <http:query-param paramName="Section" value="TestGPO"/>
        </http:request-builder>
    </http:request>
    <json:json-to-object-transformer doc:name="JSON to Object" returnClass="java.lang.Object"/>
    <set-variable variableName="serviceProxyConfigurationLookup" value="#[payload]" doc:name="Variable"/>
    <set-payload value="#[payload[0].ServiceProxyConfiguration.ConfigurationList + &quot;\nTestReceive\n&quot; + flowVars.subPayload]" doc:name="Set Payload"/>
    <logger level="INFO" doc:name="Logger" message="&quot;Configuration XML =\n #[payload]&quot;"/>
</flow>

<flow name="testreceiveFlow2">
    <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="meep2"  exchangeType="direct" queueDurable="true" responseTimeout="60000" exchange-pattern="request-response" connector-ref="connector.amqp.mule.default" doc:name="AMQP-0-9"/>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Set Payload"/>
    <set-variable variableName="subPayload" value="#[payload]" doc:name="Variable"/>
   <http:request config-ref="HTTP_Request_Polestar" path="/api/query/ServiceProxyConfigurationLookup" method="GET" doc:name="HTTP Get ConfigXml" followRedirects="true">
        <http:request-builder>
            <http:query-param paramName="ApiName" value="customer"/>
            <http:query-param paramName="Action" value="GET"/>
            <http:query-param paramName="Section" value="TestGPO"/>
        </http:request-builder>
    </http:request>
    <json:json-to-object-transformer doc:name="JSON to Object" returnClass="java.lang.Object"/>
    <set-variable variableName="serviceProxyConfigurationLookup" value="#[payload]" doc:name="Variable"/>
    <set-payload value="#[payload[0].ServiceProxyConfiguration.ConfigurationList + &quot;\nTestReceive2\n&quot; + flowVars.subPayload]" doc:name="Set Payload"/>
    <logger level="INFO" doc:name="Logger" message="&quot;Configuration XML =\n #[payload]&quot;"/>
</flow>

      

There was originally no testhttp thread. The endpoint connector in the testsend thread was an HTTP connector and it worked fine. Immediately I added an additional front-end stream by moving the HTTP connector to that stream and adding an additional AMQP request-AMQP request (which is therefore held pending the completion of the internal AMQP request requests), the AMQP request-response internal connections stopped working ...

Now they just hang until they time out and then come back with NullPayload.

This doesn't sound like the correct bahavir for me, and it feels like there is some mistake with two AMQP Request-Response "layers" happening at the same time.

Has anyone gotten this template correctly?

Thank!

+3


source to share





All Articles