Faster detection of interrupted connections during PUT operation with Apache 'HttpClient'

I use Apache HttpClient 4 to communicate with a REST API, and most of the time I perform lengthy PUT operations. Since these may run over an unstable internet connection, I need to detect when the connection is interrupted so that I can retry (with a resume request).

To try out my routines in the real world, I started a PUT operation and then flipped my laptop's Wi-Fi switch, which immediately and completely interrupted all data flow. However, it takes a long time (maybe 5 minutes or so) until a SocketException is finally thrown.

How can I speed up the detection? I would like to set the timeout to around 30 seconds.

Update:

To clarify, my request is a PUT operation. Therefore, for a very long time (perhaps hours), the only operation is write(), and no read operations are performed. There is a timeout parameter for read() operations, but I could not find one for write operations.

I am using my own Entity implementation, so I write directly to the OutputStream, which blocks almost immediately once the internet connection is interrupted. If OutputStream had a timeout parameter, so that I could write out.write(nextChunk, 30000);, I could detect such a problem myself. In fact, I tried exactly that:

public class TimeoutHttpEntity extends HttpEntityWrapper {

  public TimeoutHttpEntity(HttpEntity wrappedEntity) {
    super(wrappedEntity);
  }

  @Override
  public void writeTo(OutputStream outstream) throws IOException {
    try(TimeoutOutputStreamWrapper wrapper = new TimeoutOutputStreamWrapper(outstream, 30000)) {
      super.writeTo(wrapper);
    }
  }
}


public class TimeoutOutputStreamWrapper extends OutputStream {
  private final OutputStream delegate;
  private final long timeout;
  private final ExecutorService executorService = Executors.newSingleThreadExecutor();

  public TimeoutOutputStreamWrapper(OutputStream delegate, long timeout) {
    this.delegate = delegate;
    this.timeout = timeout;
  }

  @Override
  public void write(int b) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b);
      return null;
    });
  }

  @Override
  public void write(byte[] b) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b);
      return null;
    });
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b, off, len);
      return null;
    });
  }

  @Override
  public void close() throws IOException {
    try {
      executeWithTimeout(() -> {
        delegate.close();
        return null;
      });
    } finally {
      executorService.shutdown();
    }
  }

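  // Runs the task on the worker thread and waits at most `timeout` ms for it to finish.
  private void executeWithTimeout(final Callable<?> task) throws IOException {
    try {
      executorService.submit(task).get(timeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // The worker thread may still be blocked inside the delegate at this point.
      throw new IOException("Write timed out after " + timeout + " ms", e);
    } catch (ExecutionException e) {
      final Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException)cause;
      }
      throw new Error(cause);
    } catch (InterruptedException e) {
      // Restore the interrupt flag instead of wrapping the interruption in an Error.
      Thread.currentThread().interrupt();
      throw new java.io.InterruptedIOException("Interrupted while waiting for write");
    }
  }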
}

public class TimeoutOutputStreamWrapperTest {
  private static final byte[] DEMO_ARRAY = new byte[]{1,2,3};
  private TimeoutOutputStreamWrapper streamWrapper;
  private OutputStream delegateOutput;

  public void setUp(long timeout) {
    delegateOutput = mock(OutputStream.class);
    streamWrapper = new TimeoutOutputStreamWrapper(delegateOutput, timeout);
  }

  @AfterMethod
  public void teardown() throws Exception {
    streamWrapper.close();
  }

  @Test
  public void write_writesByte() throws Exception {
    // Setup
    setUp(Long.MAX_VALUE);

    // Execution
    streamWrapper.write(DEMO_ARRAY);

    // Evaluation
    verify(delegateOutput).write(DEMO_ARRAY);
  }

  @Test(expectedExceptions = DemoIOException.class)
  public void write_passesThruException() throws Exception {
    // Setup
    setUp(Long.MAX_VALUE);
    doThrow(DemoIOException.class).when(delegateOutput).write(DEMO_ARRAY);

    // Execution
    streamWrapper.write(DEMO_ARRAY);

    // Evaluation performed by expected exception
  }

  @Test(expectedExceptions = IOException.class)
  public void write_throwsIOException_onTimeout() throws Exception {
    // Setup
    final CountDownLatch executionDone = new CountDownLatch(1);
    setUp(100);
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        executionDone.await();
        return null;
      }
    }).when(delegateOutput).write(DEMO_ARRAY);

    // Execution
    try {
      streamWrapper.write(DEMO_ARRAY);
    } finally {
      executionDone.countDown();
    }

    // Evaluation performed by expected exception
  }

  public static class DemoIOException extends IOException {

  }
}

      

It's a bit hacky, but it works very well in my unit tests. It also works in real life, except that HttpRequestExecutor catches the exception on line 127 and tries to close the connection. However, when closing the connection, it first tries to flush it, which blocks again.

I could dig deeper into HttpClient and figure out how to prevent this flush operation, but this is already a not-so-pretty solution, and it is about to get worse.

UPDATE

It looks like it cannot be done at the Java level. Can I do it on a different level? (I am using Linux).



3 answers


Java blocking I/O does not support a socket timeout for write operations. You are completely at the mercy of the OS / JRE to unblock a thread that is blocked in a write. Moreover, this behavior tends to be OS / JRE specific.
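
One workaround that stays with blocking I/O is a watchdog thread that closes the socket once writes stop making progress; the Socket.close() Javadoc states that a thread blocked in an I/O operation on that socket then gets a SocketException. The sketch below uses plain java.net sockets rather than HttpClient, and the class StalledWriteWatchdog, its progress() method, and runDemo are hypothetical names made up for illustration. With HttpClient 4 the analogous step would presumably be calling abort() on the request from the watchdog thread, but whether that returns promptly for a blocked write is an assumption to verify.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical watchdog: closes the socket when a blocking upload stops making progress. */
final class StalledWriteWatchdog implements AutoCloseable {
    private final AtomicLong lastProgressNanos = new AtomicLong(System.nanoTime());
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "write-watchdog");
                t.setDaemon(true);
                return t;
            });

    StalledWriteWatchdog(Socket socket, long timeoutMillis) {
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        long period = Math.max(1, timeoutMillis / 4);
        scheduler.scheduleAtFixedRate(() -> {
            if (System.nanoTime() - lastProgressNanos.get() > timeoutNanos) {
                try {
                    // A thread blocked in write() on this socket receives a SocketException.
                    socket.close();
                } catch (IOException ignored) {
                    // Nothing more can be done here.
                }
            }
        }, period, period, TimeUnit.MILLISECONDS);
    }

    /** Called by the writer after every chunk that was written successfully. */
    void progress() {
        lastProgressNanos.set(System.nanoTime());
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    /** Demo: the peer accepts the connection but never reads, so the write eventually blocks. */
    static String runDemo(long timeoutMillis) {
        byte[] chunk = new byte[64 * 1024];
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept();
             StalledWriteWatchdog watchdog = new StalledWriteWatchdog(client, timeoutMillis)) {
            OutputStream out = client.getOutputStream();
            for (int i = 0; i < 4096; i++) { // up to 256 MiB, far more than the socket buffers hold
                out.write(chunk);
                watchdog.progress();
            }
            return "completed";
        } catch (IOException e) {
            return "aborted: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo(1000));
    }
}
```

The watchdog only measures the time between completed chunks, so the chunk size bounds how precisely a stall can be detected. A slow but working connection can also trip it if a single chunk takes longer than the timeout.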



This might be a legitimate case to consider using a non-blocking I/O (NIO) based HTTP client such as Apache HttpAsyncClient.
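
To illustrate why non-blocking I/O helps here, the following is a minimal sketch of a write timeout built directly on java.nio, without any HTTP client. It is not how HttpAsyncClient is implemented; the class TimedChannelWriter and its methods are hypothetical names, and the timeout is assumed to be greater than zero (Selector.select(0) would wait indefinitely).

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** Hypothetical helper: writes to a non-blocking SocketChannel with a stall timeout. */
final class TimedChannelWriter implements AutoCloseable {
    private final SocketChannel channel;
    private final Selector selector;
    private final long timeoutMillis;

    TimedChannelWriter(SocketChannel channel, long timeoutMillis) throws IOException {
        this.channel = channel;
        this.timeoutMillis = timeoutMillis;
        channel.configureBlocking(false);
        this.selector = Selector.open();
        channel.register(selector, SelectionKey.OP_WRITE);
    }

    /** Writes the whole buffer; fails if the socket stays unwritable for timeoutMillis. */
    void writeFully(ByteBuffer data) throws IOException {
        while (data.hasRemaining()) {
            if (channel.write(data) > 0) {
                continue; // progress was made, try to write more right away
            }
            // The send buffer is full: wait until the socket becomes writable again.
            // A return value of 0 is treated as a timeout in this sketch.
            if (selector.select(timeoutMillis) == 0) {
                throw new SocketTimeoutException(
                        "No write progress for " + timeoutMillis + " ms");
            }
            selector.selectedKeys().clear();
        }
    }

    @Override
    public void close() throws IOException {
        selector.close();
        channel.close();
    }

    /** Demo: the peer accepts the connection but never reads, so the write stalls. */
    static String runDemo(long timeoutMillis) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept();
                 TimedChannelWriter writer = new TimedChannelWriter(client, timeoutMillis)) {
                ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
                for (int i = 0; i < 4096; i++) { // up to 256 MiB, far more than the socket buffers hold
                    chunk.clear();
                    writer.writeFully(chunk);
                }
                return "completed";
            } catch (SocketTimeoutException e) {
                return "timed out";
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(runDemo(1000));
    }
}
```

Because the channel never blocks inside write(), the calling thread stays in control and can decide for itself how long to wait, which is exactly what a blocking OutputStream does not allow.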



You can configure the socket timeout using RequestConfig:

RequestConfig myRequestConfig = RequestConfig.custom()
    .setSocketTimeout(5000)  // 5 seconds
    .build();

      

Then, when you make a call, just assign your new config. For example:



HttpPut httpPut = new HttpPut("...");
httpPut.setConfig(myRequestConfig);
...
HttpClientContext context = HttpClientContext.create();
....
httpclient.execute(httpPut, context);

      

For more information on timeout settings, there is a good explanation here.



One of the links I came across describes the connection eviction policy: here

public static class IdleConnectionMonitorThread extends Thread {

    private final HttpClientConnectionManager connMgr;
    private volatile boolean shutdown;

    public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
        super();
        this.connMgr = connMgr;
    }

    @Override
    public void run() {
        try {
            while (!shutdown) {
                synchronized (this) {
                    wait(5000);
                    // Close expired connections
                    connMgr.closeExpiredConnections();
                    // Optionally, close connections
                    // that have been idle longer than 30 sec
                    connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
                }
            }
        } catch (InterruptedException ex) {
            // terminate
        }
    }

    public void shutdown() {
        shutdown = true;
        synchronized (this) {
            notifyAll();
        }
    }
}

      

I think you can look at this.
