Using volatility to avoid race conditions

I learn about java.util.concurrent

. I wrote code like this:

import java.util.concurrent.ArrayBlockingQueue;

public class JUCArrayBlockingQueue {

private static ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<String>(1);
private static volatile int i = 0;
private static volatile int j = 0;

public static void main(String[] args) {
    new Pop("t1").start();
    new Pop("t2").start();
    new Push("p1").start();
    new Push("p2").start();
}

static class Pop extends Thread {
    public Pop(String name) {
        super(name);
    }

    @Override
    public void run() {

        String str;
        try {

            while (++j < 500) {
                str = abq.take();
                System.out.println(j + ">>"
                        + Thread.currentThread().getName() + " take: "
                        + str);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

static class Push extends Thread {
    public Push(String name) {
        super(name);
    }

    @Override
    public void run() {

        while (i < 500) {
            if (abq.offer(Thread.currentThread().getName() + " t" + i
                    + "  ~")) {
                i++;
            }
            ;
        }
    }
}
}

      

And the result will be like this:

488>>t1 take: p2 t486  ~
489>>t2 take: p2 t487  ~
490>>t2 take: p1 t488  ~
491>>t1 take: p1 t489  ~
492>>t1 take: p2 t490  ~
493>>t1 take: p1 t490  ~
494>>t2 take: p1 t492  ~
495>>t2 take: p1 t493  ~
496>>t1 take: p1 t494  ~
497>>t2 take: p1 t495  ~
498>>t1 take: p1 t496  ~
499>>t2 take: p1 t497  ~
500>>t1 take: p2 t498  ~

      

I am puzzled with this result because the left size is expected, but not the right size. I didn't expect duplicate values ​​to be shown on the right side. How can I fix this? Anyone help?

+3


source to share


2 answers


Let's take a look at the code snippet that prints the right side of your output:

if (abq.offer(Thread.currentThread().getName() + " t" + i
                    + "  ~")) {
       i++;
 }

      

Let's scale the condition in the if statement:

abq.offer(Thread.currentThread().getName() + " t" + i
                        + "  ~")

      

So you are using the suggestion method. Let's take a look at the java doc for the offer method:

public boolean offer (E e)

Inserts the specified element into the tail of this queue if it can be done immediately without exceeding the capacity of the queue, returning true on success and false if this queue is full. This method is usually preferable to the add (E) method, which may not insert an element just by throwing an exception.

A call to offer is not a blocking call. This means that multiple threads can call the offer method at the same time. This is most likely the problem:

  • t1

    suggests an item ArrayBlockingQueue

    .i = 0

  • p2

    immediately returns the item from the queue. i = 0

  • t2

    offers an item ArrayBlockingQueue

    even before it t1

    gets a chance to call i++

    . i=0

    ...
  • p2

    immediately returns the item from the queue. i = 0

    ...

You can see from 2) and 4) that there is a chance that two threads will read the value i

before being called i++

and thus see the same value i

.

So how do you fix this problem? Like others have suggested, you can use AtomicInteger

as described below:

The volatile keyword ensures that when reading a record in a variable (memory location) split across multiple threads, it is guaranteed that each thread sees the last value of the variable. Pay attention to the emphasis on . When you speak or , you do the reading together. This is not an atomic operation, and the keyword does not stop the thread from looking at inconsistent values. or

or

i++

j++

and

volatile

Change



private static volatile int i = 0; 

      

To:

private static final AtomicInteger i = new AtomicInteger();

      

AtomicInteger

provides atomic increment operations incrementAndGet()

(= ++i

) and getAndIncrement()

(= i++

) that can be used in this case:

int nr=0; // local holder for message number
while ((nr = i.getAndIncrement()) < 500) {
    abq.put(Thread.currentThread().getName() + " t" + nr + "  ~");
}

      

(1) Note the use of local variable nr

! This ensures that the result of the increment operation is used as the message number in the put(...)

. If we used i

directly instead nr

, other threads could be increasing i

in the meantime, and we would have a different message number or repeating message number!

(2) Also note what offer(...)

is replaced by put(...)

. Since your queue is limited (only for item 1 ), the insert operation should block if it runs out of capacity. ( offer(...)

will return immediately false

in this case.)

(3) Please note that this code does NOT ensure the correct insertion order according to the message number. This ensures that there are no duplicates! If the correct insertion order is required, you will have to use a completely different approach without AtomicInteger

and ArrayBlockingQueue

. Use an unsafe queue instead, and use the good old fashioned way synchronized

to increment and insert an atom:

while (true) {
    synchronized(queue) {
        if (i++ >= 500) break;
        queue.put(... + i + ...);
    }
}

      

The above code is ineffective. Almost all of the code is in sync. There is no real parallel execution and concurrency is useless.

You can change in a j

similar way.

+3


source


Making a variable volatile does not protect it from race conditions, it just indicates that the object the variable is bound to can mutate to prevent compiler optimizations, in your case this is not necessary because i and j are changing in the same thread ( loop thread). To overcome the race condition, you need to make the action i++

and j++

atom by synchronization, for example define them as an atomic integer and use incrementAndGet()

.



0


source







All Articles