Subscriber.stopAsync () throws a RejectedExecutionException

My code is mostly following the official tutorials, and the main goal is to collect all posts from one subscription (Constants.UNFINISHEDSUBID) and re-post them on another. But I am currently facing a problem that I cannot solve. In my implementation, calling the caller .stopAsync () results in the following exception:

Mai 04, 2017 4:59:25 PM com.google.common.util.concurrent.AbstractFuture executeListener
SCHWERWIEGEND: RuntimeException while executing runnable com.google.common.util.concurrent.Futures$6@6e13e898 with executor java.util.concurrent.Executors$DelegatedScheduledExecutorService@2f3c6ac4
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@60d40af2 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@d55b6e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 320]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
    at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
    at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:613)
    at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:458)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:437)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
    at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

      

I also noticed that a random event, sometimes all messages, and sometimes several or not one, are collected. Is caller calling .stopAsync () the wrong way?

My current implementation:

protected void pullUnfinished() throws Exception {
    List<PubsubMessage> jobsToRepublish = new ArrayList<>();
    SubscriptionName subscription =
            SubscriptionName.create(Constants.PROJECTID, Constants.UNFINISHEDSUBID);

    MessageReceiver receiver = new MessageReceiver() {
        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            synchronized(jobsToRepublish){
                jobsToRepublish.add(message);
            }
            String unfinishedJob = message.getData().toStringUtf8();
            LOG.info("got message: {}", unfinishedJob);
            consumer.ack();
        }
    };

    Subscriber subscriber = null;
    try {
        ChannelProvider channelProvider = new PlainTextChannelProvider();
        subscriber = Subscriber.defaultBuilder(subscription, receiver)
                               .setChannelProvider(channelProvider)
                               .build();
        subscriber.addListener(new Subscriber.Listener() {
            @Override
            public void failed(Subscriber.State from, Throwable failure) {
                System.err.println(failure);
            }
        }, MoreExecutors.directExecutor());
        subscriber.startAsync().awaitRunning();
        Thread.sleep(60000);
    } finally {
        if (subscriber != null) {
            subscriber.stopAsync(); //Causes the exception
        }
    }
    publishJobs(jobsToRepublish);
}

public class PlainTextChannelProvider implements ChannelProvider {

    @Override
    public boolean shouldAutoClose() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean needsExecutor() {
        // TODO Auto-generated method stub
        return false;
    }

      @Override
      public ManagedChannel getChannel() throws IOException {
        return NettyChannelBuilder.forAddress("localhost", 8085)
          .negotiationType(NegotiationType.PLAINTEXT)
          .build();
      }

      @Override
      public ManagedChannel getChannel(Executor executor) throws IOException {
        return getChannel();
      } 
}

      

0


source to share


1 answer


I had the same problem running similar code from a JUnit test and found this answer on in multithreading in general, assuming the ThreadPool is closed while listening Listeners are still referencing it. I also looked at the Subscriber.java code on GitHub and found an example for receiving multiple messages in JavaDoc on startAsync (), suggesting to wait until stopAsync () ends.

Try to change

subscriber.stopAsync();

      



to

subscriber.stopAsync().awaitTerminated();

      

Worked for me.

0


source







All Articles