Synchronization for a method used inside a thread's run method

I want to do a parallel calculation for multiple sets of input values. Do I need to synchronize the method calculate(a, b, inputIndex)?

private static final String FORMULA = "(#{a} + #{b}) * (#{a} + #{b} * #{b} - #{a})";
private List<Pair<Integer, Integer>> input = Arrays.asList(
        new ImmutablePair<>(1, 2),
        new ImmutablePair<>(2, 2),
        new ImmutablePair<>(3, 1),
        new ImmutablePair<>(4, 2),
        new ImmutablePair<>(1, 5)
);
private List<String> output = new ArrayList<>(Arrays.asList("", "", "", "", ""));

public void calculate() {
    IntStream.range(0, input.size()).forEach(idx -> {
        Pair<Integer, Integer> pair = input.get(idx);

        Thread threadWrapper = new Thread(
                () -> this.calculate(pair.getLeft(), pair.getRight(), idx)
        );
        threadWrapper.start();
    });

    try {
        Thread.sleep(4000); // waiting for threads to finish execution just in case
        System.out.println("Calculation result => " + output);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

private void calculate(Integer a, Integer b, int inputIndex) {
    System.out.println("Thread with index " + inputIndex + " started calculation.");
    Evaluator eval = new Evaluator();
    eval.putVariable("a", a.toString());
    eval.putVariable("b", b.toString());

    try {
        String result = eval.evaluate(FORMULA);
        Thread.sleep(3000);
        output.set(inputIndex, result);
        System.out.println("Thread with index " + inputIndex + " done.");
    } catch (EvaluationException | InterruptedException e) {
        e.printStackTrace();
    }
}

      

I ask because if the code of calculate were inside the run method of a Runnable, I wouldn't need to do it. (Also, I think I don't need synchronized collections, because for input I am only getting data by index, and for output I am placing each item at a specific position.)



1 answer


It is important to emphasize that running the code and getting the right output is not enough to prove that a program is correct, especially when multithreading is involved. In your case, it may work only by accident, for two reasons:

  • You have debug output statements, i.e. System.out.println(…), in your code. These introduce thread synchronization because, in the reference implementation, PrintStream synchronizes internally.

  • Your code is simple and doesn't run long enough to be deeply optimized by the JVM.

Obviously, both reasons can go away if you use similar code in a production environment.


To get a correct program, even changing calculate(Integer a, Integer b, int inputIndex) into a synchronized method is not enough. Synchronization is sufficient only to establish happens-before relationships between threads that synchronize on the same object.

Your initiating method calculate() does not synchronize on the this instance, and it does not perform any other action that would be sufficient to establish a happens-before relationship with the calculating threads (such as calling Thread.join()). It only calls Thread.sleep(4000), which obviously doesn't even guarantee that the other threads have completed within that time. In addition, the Java Language Specification explicitly states:



It is important to note that neither Thread.sleep nor Thread.yield have any synchronization semantics. In particular, the compiler does not have to flush writes cached in registers out to shared memory before a call to Thread.sleep or Thread.yield, nor does the compiler have to reload values cached in registers after a call to Thread.sleep or Thread.yield.

For example, in the following (broken) code fragment, assume that this.done is a non-volatile boolean field:

while (!this.done)
    Thread.sleep(1000);

      

     

The compiler is free to read the field this.done just once and reuse the cached value in each execution of the loop. This would mean that the loop would never terminate, even if another thread changed the value of this.done.
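As a hedged illustration of how the broken loop from the quoted fragment could be repaired, the following minimal sketch declares the flag volatile, so the waiting thread has to re-read it on every iteration. The class name VolatileFlagSketch and its methods markDone, awaitDone and demo are hypothetical and exist only for this example; the timeout is an added safety net, not part of the quoted fragment.

```java
class VolatileFlagSketch {
    // volatile: a write by one thread is guaranteed to become visible
    // to later reads of this field by other threads.
    private volatile boolean done;

    void markDone() {
        done = true;
    }

    // Polls the flag until it is set or the timeout elapses.
    // Returns true if the flag was observed as set.
    boolean awaitDone(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!done) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            Thread.sleep(10);
        }
        return true;
    }

    // Sets the flag from a second thread and waits for it in the caller.
    static boolean demo() {
        VolatileFlagSketch flag = new VolatileFlagSketch();
        new Thread(flag::markDone).start();
        try {
            return flag.awaitDone(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("flag observed: " + demo());
    }
}
```

Polling with sleep is still a crude way to wait; the sketch only shows that the visibility guarantee comes from volatile, not from Thread.sleep.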

Note that what was said in the example about this.done also applies to the elements of the array backing your list. If you were not using immutable String instances, the effects could be even worse.
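To illustrate the point about Thread.join() made earlier, here is a minimal sketch of the thread-per-input approach in which the initiating method joins every worker before reading the results. It is not the asker's code: the Evaluator and Pair classes are replaced by plain int arithmetic for the same formula, and the names JoinSketch, evaluate and calculateAll are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JoinSketch {
    // Stand-in for the Evaluator call: (a + b) * (a + b * b - a)
    static String evaluate(int a, int b) {
        return String.valueOf((a + b) * (a + b * b - a));
    }

    static List<String> calculateAll(int[][] input) {
        String[] output = new String[input.length];
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < input.length; i++) {
            final int idx = i;
            // Each worker writes only to its own slot of the array.
            Thread worker = new Thread(
                    () -> output[idx] = evaluate(input[idx][0], input[idx][1]));
            threads.add(worker);
            worker.start();
        }

        // join() waits for each worker to terminate; everything the worker
        // did happens-before the return of join(), so its write is visible.
        for (Thread worker : threads) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return Arrays.asList(output);
    }

    public static void main(String[] args) {
        int[][] input = {{1, 2}, {2, 2}, {3, 1}, {4, 2}, {1, 5}};
        System.out.println("Calculation result => " + calculateAll(input));
    }
}
```

With join() in place, no Thread.sleep is needed in the initiating method, and the results are read only after every worker has finished.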


But you don't need to make the whole method synchronized; only the data exchange needs to be thread-safe. The cleanest solution is to make the whole method side-effect free, i.e. turn the signature into String calculate(Integer a, Integer b) and let the method return the result instead of manipulating a shared data structure. If a method is free of side effects, it doesn't need any synchronization.

The caller has to collect the result values into a List, but since you are already using the Stream API, this operation comes for free:

private static final String FORMULA = "(#{a} + #{b}) * (#{a} + #{b} * #{b} - #{a})";
private List<Pair<Integer, Integer>> input = Arrays.asList(
        new ImmutablePair<>(1, 2),
        new ImmutablePair<>(2, 2),
        new ImmutablePair<>(3, 1),
        new ImmutablePair<>(4, 2),
        new ImmutablePair<>(1, 5)
);

public void calculate() {
    List<String> output = input.parallelStream()
            .map(pair -> this.calculate(pair.getLeft(), pair.getRight()))
            .collect(Collectors.toList());
    System.out.println("Calculation result => " + output);
}

private String calculate(Integer a, Integer b) {
    System.out.println(Thread.currentThread()+" does calculation of ("+a+","+b+")");
    Evaluator eval = new Evaluator();
    eval.putVariable("a", a.toString());
    eval.putVariable("b", b.toString());

    try {
        String result = eval.evaluate(FORMULA);
        Thread.sleep(3000);
        System.out.println(Thread.currentThread()+" with ("+a+","+b+") done.");
        return result;
    } catch (EvaluationException | InterruptedException e) {
        throw new RuntimeException(e);
    }
}
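For readers who want to try the approach without the Evaluator and Pair dependencies, here is a dependency-free sketch of the same idea. The formula is computed with plain int arithmetic as a stand-in, and the names ParallelStreamSketch and calculateAll are hypothetical. It also shows that collecting a parallel stream of a List into a List keeps the results in the order of the inputs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelStreamSketch {
    // Side-effect free: depends only on its arguments and returns the result.
    static String calculate(int a, int b) {
        return String.valueOf((a + b) * (a + b * b - a));
    }

    static List<String> calculateAll(List<int[]> input) {
        // The stream handles thread management and result collection;
        // the encounter order of the source list is preserved in the result.
        return input.parallelStream()
                .map(pair -> calculate(pair[0], pair[1]))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<int[]> input = Arrays.asList(
                new int[]{1, 2},
                new int[]{2, 2},
                new int[]{3, 1},
                new int[]{4, 2},
                new int[]{1, 5});
        System.out.println("Calculation result => " + calculateAll(input));
    }
}
```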

      
