Strange Hibernate behavior in asynchronous EJB application. Race condition?

BRIEF DESCRIPTION OF THE PROBLEM:

I am working on messages handling Java EE applications from financial markets. The application is deployed to the application server: Wildfly-8.2.0.Final .

The message flow in the application is illustrated by the following diagram:

MDB1
     \
      StrategyManager(@Singleton) -> StrategyRunner(@Singleton) -> SomeStrategy(@Singleton)
     /
MDB2

      

Asynchronously called EJB - SomeStrategy (@Singleton) performs Read operation | Update on JPA model defined as JPA objects: StrategyEntity- <ParamEntity [ParamEntity is in @ManyToOne, FetchType.EAGER, CascadeType.ALL relation to StrategyEntity].

StrategyManager and StrategyRunner are an EJB auxiliary decoupler, i.e. database maintenance form, core business logic contained in SomeStrategy (@Singleton) .

StrategyEntity is updated before and saved after Read operations | Updates performed by SomeStrategySingletonEJB.

@Slf4j
@Singleton
@Local
@Startup
public class StrategyRunner {

    @EJB
    private SomeStrategySingletonEJB someStrategySingletonEJB;

    @EJB
    private StrategyDao strategyDao;


    public void runStrategy(StrategyEntity strategyEntity, OrderBookAggregated orderBookAggregated) {
        strategyDao.refresh(strategyEntity);
        log.info("StrategyEntity after refresh: {}", startegyEntity);
        if (!strategyEntity.isRunning()) return;

        someStrategySingletonEJB.updateOnOrderBook(strategyEntity);
        log.info("StrategyEntity before save: {}", startegyEntity);
        strategyDao.save(strategyEntity);

    }

    public void runStrategy(StrategyEntity strategyEntity, OrderQueryReport orderQueryReport) {
        strategyDao.refresh(strategyEntity);
        log.info("StrategyEntity after refresh: {}", startegyEntity);
        if (!strategyEntity.isRunning()) return;
        someStrategySingletonEJB.updateOnExecutionReport(strategyEntity, orderQueryReport);
        log.info("StrategyEntity before save: {}", startsingletoneegyEntity);
        strategyDao.save(strategyEntity);
    }

}

      

The runStrategy () methods are called concurrently from the MDB. The default Singleton lock type is a WRITE lock, so methods should never run at the same time. The StrategyEntity is fetched from the database at the top level (StrategyManager), see DETAIL. The problem is that sometimes (very behind - once per 2000 messages on average) "StrategyEntity before save" is different "StrategyEntity after update"! It looks like some kind of race condition. If I replace strategyDao (RDBMS access) with simple cache implemented as EJB @Singleton the problem goes away. I am outputting an issue with Hibernate operations as an application after replacing the database tier Cache works fine under heavy workload . Do you have any ideas?

Hibernate properties:

<persistence-unit name="AlgorithmEnginePU">
    <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
    <jta-data-source>java:jboss/AlgorithmEngineDS</jta-data-source>
    <exclude-unlisted-classes>false</exclude-unlisted-classes>
    <!--<class>com.main.model.configuration.ExchangeInformation</class>-->
    <properties>

        <!-- Properties for Hibernate -->
        <property name="hibernate.default_schema" value="algorithm_engine"/>
        <property name="hibernate.dialect" value="org.hibernate.dialect.PostgreSQLDialect"/>
        <property name="hibernate.show_sql" value="false"/>
        <property name="hibernate.transaction.jta.platform" value="org.hibernate.service.jta.platform.internal.JBossAppServerJtaPlatform"/>
        <!--<property name="hibernate.hbm2ddl.auto" value="create" />-->
    </properties>
</persistence-unit>

      


DETAILED DESCRIPTION OF THE PROBLEM:

Here is the first MDB. When the StrategyManager is blocked, the received message is not queued or processed, and the onMessage () function returns without calling the EJB. Let me note that the serialized OrderBookEntity is only the transmitting object and is only used as a POJO.

@MessageDriven(name = "OrderBookTReceiver", activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/quotationData"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
        @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "NonDurable")})
@DependsOn({"OrderBookManager"})
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class OrderBookReceiver implements MessageListener{

    @EJB
    private StrategyManager strategyManager;

    @EJB
    private StrategyOrderBookLock strategyLock;

    @Override
    public void onMessage(Message message) {

        if (message instanceof ObjectMessage) {
            ObjectMessage objectMessage = (ObjectMessage) message;
            try {
                Serializable serializable = objectMessage.getObject();

                if (serializable instanceof OrderBookEntity) {
                    OrderBookEntity orderBookEntity = (OrderBookEntity) serializable;

                        if(strategyLock.tryLock()){
                            try {
                                strategyManager.updateOrderBook(orderBookEntity);
                            }finally{
                                strategyLock.unlock();
                            }
                        }
                }
            } catch (JMSException e) {
                log.error(e.toString());
            }


        }
    }
}

      

The second MDB takes other types of messages and enqueues them for processing on nested singletons as they follow the default WRITE_LOCK:

@Slf4j
@MessageDriven(name = "MessageReceiver", activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/tradeAgentReply"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")})
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class TradeAgentClientMDB implements MessageListener {

    @EJB
    private StrategyManager strategyManager;

    @Override
    public void  onMessage(Message message) {
        (...)

        try {

            TradeQueryReport tradeQueryReport = (TradeQueryReport) serializableMessage;

           if (tradeQueryReport instanceof OrderQueryReport ) {
                OrderQueryReport orderQueryReport = (OrderQueryReport) tradeQueryReport;
                strategyManager.updateOrder(orderQueryReport);
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

      

Here's the EJB (StrategyRunner @EJB is described in the BRIEF DESCRIPTION section):

@Slf4j
@Singleton
@Local
@Startup
public class StrategyManager {

    @Inject
    private StrategyDao strategyDao;

    @EJB
    private StrategyRunner strategyRunner;

    @Asynchronous
    public void updateOrderBook(OrderBookAggregated orderBookAggregated) {

        StrategyEntity strategy =  strategyDao.findStrategyByExchange(orderBookAggregated.getAccount().getExchangeEntity());

        if(strategyEntity == null) return;

        strategyRunner.runStrategy(strategyEntity, orderBookAggregated);
    }

    @Asynchronous
    public void updateOrder(OrderQueryReport orderQueryReport) {

        StrategyEntity strategy = strategyDao.findStrategyByOrder(orderQueryReport.getClientOrderId());

        if(strategy == null) return;

        strategyRunner.runStrategy(strategy, orderQueryReport);
    }

    (...)
}

@Singleton
@Local
@Startup
public class StrategyOrderBookLock {
    private java.util.concurrent.locks.Lock updateOrderBookLock = new ReentrantLock();


    @Lock(LockType.READ)
    public boolean tryLock() {
        return updateOrderBookLock.tryLock();
    }

    @Lock(LockType.READ)
    public void unlock() {
        updateOrderBookLock.unlock();
    }
}

      

+3


source to share


1 answer


The code itself looks pretty good and I can't find any obvious flaws. I remember I had a similar situation where the log output did not match what was in the database. Here is my situation:

  • Log output. It is not logged immediately; instead, the data was collected and a log event was added to the queue. This means that the log event has a hard link to the object.
  • Database insert / update
  • End of transaction
  • The second thread got an object from the cache.
  • The object has been changed
  • Another thread started processing log events
  • The log event has been converted to a string.

Because the object was modified prior to the last step, the log output was unexpected: the changes were leaked into the log after the transaction ended.



Decision. Make sure the registrar receives only unchanged data. A simple workaround is

log.info("StrategyEntity after refresh: {}", startegyEntity.toString());

      

0


source







All Articles