Parallel database inserts using Java threads

I have a Java program that needs to insert a large number of largish rows into a SQL Server database. The number of rows is about 800,000, and each is roughly 200 bytes.

They are currently inserted in batches of 50, and each batch is inserted using a single statement. (We confirmed via jTDS logging that there is one sp_exec call per batch.) Varying the batch size anywhere between 25 and 250 makes no significant difference; 50 is roughly optimal.
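
For reference, each batch goes to the server roughly like this (a sketch only; the real table name, columns, and Posting accessors are different, and connection is an open JDBC connection):

/* Sketch of the per-batch insert. The posting table, its columns, and the
 * Posting accessors below are made-up placeholders. */
String sql = "INSERT INTO posting (account_id, amount) VALUES (?, ?)";
try {
    PreparedStatement stmt = connection.prepareStatement(sql);
    try {
        for (Posting p : postings) {
            stmt.setLong(1, p.getAccountId());    // hypothetical accessors
            stmt.setBigDecimal(2, p.getAmount());
            stmt.addBatch();
        }
        stmt.executeBatch(); // one sp_exec call per batch, as seen in the jTDS log
    } finally {
        stmt.close();
    }
} catch (SQLException ex) {
    throw new PostingUpdateException("Batch insert failed: " + ex.getMessage(), ex);
}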

I have experimented with dividing the batches into (say) 5 groups and processing each group in parallel on its own thread. It is significantly faster: more than twice as fast with 5 threads.

My question is about making the use of threads reliable. In particular, if any batch fails, an exception is thrown. I want that exception to be caught and passed on to the caller, and I want to be 100% sure that the other threads have finished (either aborted or completed) before it is passed on, because when recovering from the exception later in the program we don't want unexpected rows still arriving in the table.

Here's what I did:

/** Method to insert a single batch. */
private void insertBatchPostings(Collection<Posting> postings) throws PostingUpdateException
{
    // insert the batch using a single INSERT invocation
    // throw a PostingUpdateException if anything goes wrong
}

private static final int insertionThreads = 5;

/** Method to insert a collection of batches in parallel, using the above. */
protected void insertBatchPostingsThreaded(Collection<Collection<Posting>> batches) throws PostingUpdateException
{
    ExecutorService pool = Executors.newFixedThreadPool(insertionThreads);
    Collection<Future<?>> futures = new ArrayList<Future<?>>(batches.size());

    for (final Collection<Posting> batch : batches) {
        Callable<Void> c = new Callable<Void>() {
            public Void call() throws PostingUpdateException {
                insertBatchPostings(batch);
                return null;
            }
        };
        /* Submit each batch to the pool, and keep a note of its Future so we can check it later. */
        futures.add(pool.submit(c));
    }

    /* The pool is running; indicate that no further work will be submitted to it. */
    pool.shutdown();

    /* Check all the futures for problems. */
    for (Future<?> f : futures) {
        try {
            f.get();
        } catch (InterruptedException ex) {
            throw new PostingUpdateException("Interrupted while processing insert results: " + ex.getMessage(), ex);
        } catch (ExecutionException ex) {
            pool.shutdownNow();
            throw (PostingUpdateException) ex.getCause();
        }
    }
}

By the time this returns, I want to ensure that all threads are inactive.
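
I suspect that to really guarantee this I would need to force the pool down and then block until it drains, something like the following helper (a sketch only, untested; the 60-second timeout is an arbitrary value I picked):

/* Sketch: abort outstanding inserts and wait until every worker thread
 * has actually finished, so no rows can arrive after we rethrow. */
private void abortAndDrain(ExecutorService pool) throws PostingUpdateException
{
    pool.shutdownNow(); // drop queued batches, interrupt running ones
    try {
        // JDBC calls may ignore the interrupt, so this can take as long
        // as the slowest in-flight statement.
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            throw new PostingUpdateException("Insert threads did not terminate in time", null);
        }
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        throw new PostingUpdateException("Interrupted while waiting for insert threads: " + ex.getMessage(), ex);
    }
}

insertBatchPostingsThreaded would then call abortAndDrain(pool) from the ExecutionException branch instead of the bare shutdownNow().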

Questions

(I'm trying to figure out exactly what I'm asking for.)

  • Is the above code completely reliable, in the sense that no batch insert can still be running after insertBatchPostingsThreaded returns?
  • Are there better or simpler ways to use Java's concurrency features to achieve this? My code looks ridiculously complicated to me (which makes me suspect it's missing edge cases).
  • What is the best way to make it fail as soon as any single thread fails?

I'm not a natural Java programmer, so I'm hoping to get something that doesn't advertise this fact. :)

1 answer


Guava's Futures.successfulAsList takes a list of futures as input and returns a future "whose value is a list containing the values of all of its successful input futures." You can call get() on the resulting Future, and then walk your original list of futures to check each one for failure.
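
A sketch of what that could look like (untested; it assumes Guava on the classpath for ListeningExecutorService, ListenableFuture, MoreExecutors, and Futures, plus the Posting types from the question):

protected void insertBatchPostingsThreaded(Collection<Collection<Posting>> batches) throws PostingUpdateException
{
    ListeningExecutorService pool =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
    List<ListenableFuture<Void>> futures =
            new ArrayList<ListenableFuture<Void>>(batches.size());

    for (final Collection<Posting> batch : batches) {
        futures.add(pool.submit(new Callable<Void>() {
            public Void call() throws PostingUpdateException {
                insertBatchPostings(batch);
                return null;
            }
        }));
    }
    pool.shutdown();

    try {
        // Blocks until every batch has finished, successfully or not;
        // failed batches appear as null entries rather than as an
        // exception here.
        Futures.successfulAsList(futures).get();

        // Nothing is running any more, so report the first failure, if any.
        for (ListenableFuture<Void> f : futures) {
            f.get(); // does not block: all futures are already complete
        }
    } catch (InterruptedException ex) {
        pool.shutdownNow();
        Thread.currentThread().interrupt();
        throw new PostingUpdateException("Interrupted while inserting: " + ex.getMessage(), ex);
    } catch (ExecutionException ex) {
        if (ex.getCause() instanceof PostingUpdateException) {
            throw (PostingUpdateException) ex.getCause();
        }
        throw new PostingUpdateException("Batch insert failed: " + ex.getMessage(), ex);
    }
}

Because the future returned by successfulAsList only completes once every input future is done, nothing can still be inserting by the time the failure is rethrown.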


