Closing stuck in 2.0 when trying to add an item to the queue

We have a use case as below

1-) Start 2 start an instance as data nodes and insert data into cache.

2-) Create a queue and register a remote listener using remoteListen as shown below

//Queue creation         
CollectionConfiguration colCfg = new CollectionConfiguration(); 
colCfg.setCacheMode(PARTITIONED); 
IgniteQueue<BinaryObject> queue = Ignition.ignite().queue(queueName, 0, colCfg);



//Remote Listener Closure 
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() { 
                        @Override public boolean apply(CacheEvent evt) { 
                                System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']'); 
                Ignite ignite = Ignition.ignite(); 
                IgniteQueue<String> queue = ignite.queue(queueName, 0, null); 
                String key = evt.key(); 
                BinaryObject profile = (BinaryObject) evt.newValue(); 
                System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() + 
                        ", oldVal=" + evt.oldValue().toString() + ", newVal=" + evt.newValue().toString()); 

                if (profile.<Double>field("usage") > start && profile.<Double>field("usage") < end 
                        && ignite.affinity("profileCache").isPrimary(ignite.cluster().localNode(), key)){ 
                    queue.add(profile.field("number")); 
                } 
                                return false; 
                        } 
                };       

Ignition.ignite().events(ignite.cluster().forCacheNodes("profileCache")).remoteListen(1,1l,false,null, rmtLsnr, 
                                EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED); 

      

3-) Do some updates on the cache instances as shown below to get updates on the remote device.

void updateAnyProfile(Double newUsage){         
SqlQuery qry = new SqlQuery(Profile.class,"select * from Profile where usage < 30 limit 10"); 
    List<CacheEntryImpl<String, profile>> res = profileCache.query(qry).getAll(); 
    Profile profile = res.iterator().next().getValue(); 
    profile.setUsage(newUsage); 
    profileCache.put(profile.getCtn(), profile); 
    profile.setUsage(newUsage+1); 
    profileCache.put(profile.getCtn(), profile); 

} 

      

4-) Take items from the queue.

 public void readFromQueue (String queueName) { 
    // Initialize new FIFO queue. 
    IgniteQueue<String> queue = Ignition.ignite().queue(queueName, 0, null); 
    while (true) { 
        String profile = queue.take(); 
        System.out.println("Profile from queue: " + profile.toString()); 
    } 
} 

      

Step 2,3,4 is run from different JVM instances with a client node TRUE. The problem is that applications freeze to perform any operation after executing the above script. Can you help us? We would really appreciate if you could tell us what we are doing wrong?

Below is the thread dump of the hanging datanode and the same datanode is hanging below the code

 IgniteQueue<String> queue = ignite.queue(queueName, 0, null);

      

Sometimes you can successfully update records and after the next update start to hang or even fail to perform an operation in the cache.

"sys-stripe-5-#6%null%" #25 prio=5 os_prio=31 tid=0x00007fd88d031800 nid=0x14c07 waiting on condition [0x00007000036e7000] 
   java.lang.Thread.State: WAITING (parking) 
        at sun.misc.Unsafe.park(Native Method) 
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304) 
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:176) 
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:139) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get0(GridCacheAdapter.java:4482) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:4463) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:1405) 
        at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue0(CacheDataStructuresManager.java:270) 
        at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue(CacheDataStructuresManager.java:231) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor$12.applyx(DataStructuresProcessor.java:952) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor$12.applyx(DataStructuresProcessor.java:950) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.getCollection(DataStructuresProcessor.java:1078) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.queue(DataStructuresProcessor.java:950) 
        at org.apache.ignite.internal.IgniteKernal.queue(IgniteKernal.java:3560) 
        at com.ignite.trial.roaming.ProfileService$4.apply(ProfileService.java:303) 
        at com.ignite.trial.roaming.ProfileService$4.apply(ProfileService.java:297) 
        at org.apache.ignite.internal.GridEventConsumeHandler$2.onEvent(GridEventConsumeHandler.java:170) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager$LocalListenerWrapper.onEvent(GridEventStorageManager.java:1311) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.notifyListeners(GridEventStorageManager.java:892) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record0(GridEventStorageManager.java:340) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record(GridEventStorageManager.java:297) 
        at org.apache.ignite.internal.processors.cache.GridCacheEventManager.addEvent(GridCacheEventManager.java:297) 
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.innerUpdate(GridCacheMapEntry.java:1806) 
        - locked <0x00000007b6d01f10> (a org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCacheEntry) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateSingle(GridDhtAtomicCache.java:2386) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1792) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1630) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.processNearAtomicUpdateRequest(GridDhtAtomicCache.java:3016) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access$400(GridDhtAtomicCache.java:127) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$6.apply(GridDhtAtomicCache.java:282) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$6.apply(GridDhtAtomicCache.java:277) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:863) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:386) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:308) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$000(GridCacheIoManager.java:100) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:253) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1257) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:885) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$2100(GridIoManager.java:114) 
        at org.apache.ignite.internal.managers.communication.GridIoManager$7.run(GridIoManager.java:802) 
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:483) 
        at java.lang.Thread.run(Thread.java:748) 

      

+3


source to share


1 answer


It is not possible to call the ignite.queue and ignite.affinity methods on the EventListener, as this could lead to a deadlock.

All cache operations, including EventListeners, are performed in the system pool, so it is not recommended to call an EventListener inside operations, which also uses the system pool.



You can read more here under Closing and Thread Pools: https://apacheignite.readme.io/docs/async-support#section-listeners-and-chaining-futures

And here https://apacheignite.readme.io/docs/thread-pools#section-system-pool

0


source







All Articles