How do you block threads?

I have a multithreaded C test that can be depicted like this:

Thread 1   Thread 2   Thread 3       Control thread

while(1)   while(1)    while(1)       while(1)
   |          |          |             
   |          |          |                |             
   |          |          |            every one second: 
   |          |          |               wait for other threads to be blocked
   |          |          |               do something with S values
   |          |          |                |             
   |          |          |                |             
 write S1    write S2   write S3          |
   |          |          |                |          
   |          |          |                |
 barrier     barrier   barrier         barrier

      

My problem is with the step "wait for other threads to be blocked" in the diagram above. So far I have come up with the following solution to implement it:

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
#include <inttypes.h>

#define NB_THREADS 11

pthread_barrier_t b;
uint8_t blocked_flags[NB_THREADS] = {0};
pthread_mutex_t blocked_flags_mutexes[NB_THREADS];
uint64_t states[NB_THREADS] = {0};

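uint64_t time_diff_get(struct timespec *start, struct timespec *end) {
  // Integer arithmetic: multiplying by the double 1E9 loses nanosecond precision
  uint64_t end_ns = (uint64_t)end->tv_sec * 1000000000ULL + (uint64_t)end->tv_nsec;
  uint64_t start_ns = (uint64_t)start->tv_sec * 1000000000ULL + (uint64_t)start->tv_nsec;
  uint64_t res = end_ns - start_ns;
  return res;
}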

static void *worker_thread(void *arg) {
  uint8_t id = *((uint8_t *)arg);
  int a =  0;
  while(1) {
    for (int i = 0; i < 1000; i++) {
      a++;
    }
    states[id]++;
    pthread_mutex_lock(&blocked_flags_mutexes[id]);
    blocked_flags[id] = 1;
    pthread_mutex_unlock(&blocked_flags_mutexes[id]);
    pthread_barrier_wait(&b);
    pthread_mutex_lock(&blocked_flags_mutexes[id]);
    blocked_flags[id] = 0;
    pthread_mutex_unlock(&blocked_flags_mutexes[id]);
  }
  printf ("a = %d\n", a);
  return NULL;
}

static void *control_thread() {

  struct timespec last_time;
  clock_gettime(CLOCK_REALTIME, &last_time);

  while(1) {

    struct timespec time;
    clock_gettime(CLOCK_REALTIME, &time);
    if (time_diff_get(&last_time, &time) >= 1E9) {

      // Wait for all threads to be blocked
      for (int i = 0; i < NB_THREADS; i++) {
        while (1) {
          pthread_mutex_lock(&blocked_flags_mutexes[i]);
          if (blocked_flags[i] == 1) {
            pthread_mutex_unlock(&blocked_flags_mutexes[i]);
            break;
          }
          pthread_mutex_unlock(&blocked_flags_mutexes[i]);
        }
      }
      for (int i = 0; i < NB_THREADS; i++) {
        pthread_mutex_lock(&blocked_flags_mutexes[i]);
        if (blocked_flags[i] == 0) {
          printf("How could I avoid to be there ??\n");
          exit(-1);
        }
        pthread_mutex_unlock(&blocked_flags_mutexes[i]);
      }

      // Do some interesting stuff here with states array
      // .....
      // .....

      // Save last time
      clock_gettime(CLOCK_REALTIME, &last_time);
    }

    pthread_barrier_wait(&b);
  }
  return NULL;
}

int main() {

  // Init barrier
  pthread_barrier_init(&b, NULL, NB_THREADS + 1);

  // Create worker threads
  pthread_t threads[NB_THREADS];
  uint8_t ids[NB_THREADS];
  for (int i = 0; i < NB_THREADS; i++) {
    ids[i] = i;
    pthread_mutex_init(&blocked_flags_mutexes[i], NULL);
  }
  for (int i = 0; i < NB_THREADS; i++) {
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    cpu_set_t cpu_set;
    CPU_ZERO(&cpu_set);
    CPU_SET(i + 1, &cpu_set);
    pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpu_set);
    pthread_create(&threads[i], &attr, worker_thread, &ids[i]);
  }

  // Create control thread
  pthread_t ctrl_thread;
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  cpu_set_t cpu_set;
  CPU_ZERO(&cpu_set);
  CPU_SET(0, &cpu_set);
  pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpu_set);
  pthread_create(&ctrl_thread, &attr, control_thread, NULL);

  // Join on worker threads
  for (int i = 0; i < NB_THREADS; i++) {
    pthread_join(threads[i], NULL);
  }

  return 0;
}

      

But running this benchmark, compiled with gcc -O0, on a 12-core Intel processor clearly shows that I have a "race" problem, because the process always exits after a few seconds with the message above. How can I solve this?

Note: following other questions, I was thinking of using a custom barrier, but I need to keep using pthread_barrier and not a barrier reimplemented on top of a mutex and condition variables.

3 answers


Your code has an obvious race condition. When your threads are released from the barrier wait, they reset the flag to zero. Until they do so, their flag is still 1 for some time. The control thread can observe this stale value of 1 and conclude that the corresponding thread is about to block, when in fact that thread has just come out of the barrier wait and is about to clear the flag:

// worker thread
pthread_barrier_wait(&b);
// No longer blocked, but blocked_flags[id] is still 1.
// At this point, the control thread grabs the mutex, and observes the 1 value
// The mistake is thinking that 1 means "I'm about to block"; it actually
// means, "I'm either about to block on the barrier, or have just finished".
pthread_mutex_lock(&blocked_flags_mutexes[id]);
blocked_flags[id] = 0;
pthread_mutex_unlock(&blocked_flags_mutexes[id]);

      

This race condition is enough to sometimes fool the control thread into believing that everyone has blocked, and thus to let it get through its first loop. It then enters the second loop, where it finds that some flag has since been cleared to 0.

The crux of your problem is that you have cyclic parallel processing done by a group of threads that are synchronized with a barrier. However, each loop iteration uses only a single barrier wait, which means that the loop has only one phase. Semantically, though, your loop is divided into two phases: threads are blocked, and threads are unblocked. The mechanism you have created to distinguish these phases is not thread-safe, and the obvious solution is to use the barrier again to split the cycle into multiple phases.

POSIX barriers have a "serial thread" feature: one of the waiting threads is told that it is special (pthread_barrier_wait returns PTHREAD_BARRIER_SERIAL_THREAD to it). This allows you to implement special phases in which only the serial thread performs some important action, while the other threads do something else, for example call the barrier wait again to proceed to the next phase. This should eliminate the need for hacks such as flags with which one thread tries to guess when the other threads have become quiescent.

NOTE: you cannot choose which thread is the serial thread of a POSIX barrier wait, so you cannot have a dedicated control thread just for this action. Instead of N + 1 threads, you use just N threads. They all do the same thing, and when they reach the barrier, any one of them may be told that it is the serial thread. Based on this, the serial thread executes alternate code compared to the others.
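As a minimal sketch of the serial-thread feature described above, the following counts how many threads are told they are the serial thread for one barrier wait. The names DEMO_THREADS, demo_worker and count_serial_threads are hypothetical and chosen only for this illustration; it assumes a platform that provides POSIX barriers (for example Linux with glibc).

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <pthread.h>

#define DEMO_THREADS 4 /* hypothetical thread count for this sketch */

static pthread_barrier_t demo_barrier;
static int serial_count; /* written only by the serial thread */

static void *demo_worker(void *arg) {
  (void)arg;
  int rc = pthread_barrier_wait(&demo_barrier);
  if (rc == PTHREAD_BARRIER_SERIAL_THREAD) {
    /* Exactly one waiter gets this return value; which one is unspecified. */
    serial_count++;
  } else {
    /* Every other waiter gets 0 on success. */
    assert(rc == 0);
  }
  return NULL;
}

/* Returns how many threads were told they are the serial thread. */
int count_serial_threads(void) {
  pthread_t threads[DEMO_THREADS];
  serial_count = 0;
  int rc = pthread_barrier_init(&demo_barrier, NULL, DEMO_THREADS);
  assert(rc == 0);
  for (int i = 0; i < DEMO_THREADS; i++) {
    rc = pthread_create(&threads[i], NULL, demo_worker, NULL);
    assert(rc == 0);
  }
  for (int i = 0; i < DEMO_THREADS; i++) {
    pthread_join(threads[i], NULL);
  }
  pthread_barrier_destroy(&demo_barrier);
  (void)rc;
  return serial_count;
}
```

Calling count_serial_threads() from main (linking with -pthread) should report a single serial thread per barrier wait, regardless of which worker happens to be picked.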



So the timing diagram becomes:

while(1)   while(1)    while(1)       
   |          |          |             
   |          |          |          
   |          |          | 
   |          |          |   <---- WRITE PHASE  
   |          |          |  
   |          |          |             
   |          |          |                 
 write S1    write S2   write S3
   |          |          |           
   |          |          |      
 barrier     barrier   barrier 
   |          |          |        
   |          |          |     <--- CHECK PHASE
   |          |          |           
   |          |     serial thread!   
   |          |          |           
   |          |       next second?-- YES -> do something with S values!
   |          |          |  NO        |
   |          |          |            |
   |          |          +------------+ 
   |          |          | 
 barrier     barrier   barrier
   |          |          | 
   |          |          | 

back to top, next WRITE PHASE.

      

Here, in the CHECK PHASE, the serial thread (which can be any of the N threads) does a check: has the time ticked over to the next second since the last time it ticked over? If so, it does something with the S values.

The barrier ensures that no other thread touches the S values during the CHECK PHASE, so the serial thread doesn't need a mutex to work with them! You have already paid for this synchronization with the additional barrier call in every cycle.
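The two-phase cycle can be sketched roughly as follows. This is an illustration under stated assumptions, not the asker's final code: N_WORKERS, N_CYCLES, s_values, phase_worker and run_phases are hypothetical names, the loop is finite so that it terminates, and the serial thread inspects the S values on every cycle where a real version would first test whether a second has elapsed.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

#define N_WORKERS 4   /* hypothetical; stands in for NB_THREADS */
#define N_CYCLES 1000 /* finite so that the sketch terminates */

static pthread_barrier_t phase_barrier;
static uint64_t s_values[N_WORKERS];
static uint64_t checks_done; /* only touched during the CHECK PHASE */
static uint64_t bad_checks;  /* only touched during the CHECK PHASE */

static void *phase_worker(void *arg) {
  int id = *(int *)arg;
  for (uint64_t cycle = 1; cycle <= N_CYCLES; cycle++) {
    /* WRITE PHASE: each thread writes only its own S value. */
    s_values[id] = cycle;

    /* First barrier: ends the WRITE PHASE and picks one serial thread. */
    int rc = pthread_barrier_wait(&phase_barrier);
    if (rc == PTHREAD_BARRIER_SERIAL_THREAD) {
      /* CHECK PHASE: the other threads go straight to the second barrier
         and block there, so no mutex is needed to read s_values. */
      for (int i = 0; i < N_WORKERS; i++) {
        if (s_values[i] != cycle) bad_checks++;
      }
      checks_done++;
    }

    /* Second barrier: ends the CHECK PHASE; all threads resume writing. */
    pthread_barrier_wait(&phase_barrier);
  }
  return NULL;
}

/* Returns 0 if every check saw a consistent snapshot, -1 otherwise. */
int run_phases(void) {
  pthread_t threads[N_WORKERS];
  int ids[N_WORKERS];
  checks_done = 0;
  bad_checks = 0;
  int rc = pthread_barrier_init(&phase_barrier, NULL, N_WORKERS);
  assert(rc == 0);
  for (int i = 0; i < N_WORKERS; i++) {
    ids[i] = i;
    rc = pthread_create(&threads[i], NULL, phase_worker, &ids[i]);
    assert(rc == 0);
  }
  for (int i = 0; i < N_WORKERS; i++) {
    pthread_join(threads[i], NULL);
  }
  pthread_barrier_destroy(&phase_barrier);
  (void)rc;
  return (checks_done == N_CYCLES && bad_checks == 0) ? 0 : -1;
}
```

Note that the barrier is initialized with a count of N_WORKERS, not N_WORKERS + 1, because there is no separate control thread in this design.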

You could add one more thread that provides a time base: its job is to sleep until the next second and then increment a counter. The serial thread only needs to check whether this counter has increased (relative to its old value, stored in another variable). If so, it takes action and updates the old value to match the new one. This way, you don't need to call into the operating system to get the current time in the main processing loop.
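A possible shape for such a time-base thread is sketched below. The names tick_count, time_base_stop, time_base_thread and tick_elapsed are hypothetical. The sketch assumes C11 atomics are available, since the time-base thread does not take part in the barrier and its counter therefore cannot rely on the barrier for synchronization.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <time.h>

_Atomic uint64_t tick_count;    /* incremented by the time-base thread */
atomic_bool time_base_stop;     /* set to true to end the time-base thread */
static uint64_t last_seen_tick; /* only touched during the CHECK PHASE */

/* Thread function: arg points to a struct timespec holding the tick period,
   e.g. {1, 0} for one second. Early wakeups caused by signals are ignored
   here for brevity. */
void *time_base_thread(void *arg) {
  const struct timespec *period = arg;
  assert(period != NULL);
  while (!atomic_load(&time_base_stop)) {
    nanosleep(period, NULL);
    atomic_fetch_add(&tick_count, 1);
  }
  return NULL;
}

/* Called by the serial thread in the CHECK PHASE. Returns true if at least
   one tick happened since the previous call, and remembers the new value. */
bool tick_elapsed(void) {
  uint64_t now = atomic_load(&tick_count);
  if (now == last_seen_tick) {
    return false;
  }
  last_seen_tick = now;
  return true;
}
```

The serial thread would call tick_elapsed() and only process the S values when it returns true. Because last_seen_tick is read and written only inside the CHECK PHASE, the barrier already orders those accesses and it can stay a plain variable.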


Instead of storing a flag for each worker thread, you could protect a single counter with a mutex: each worker thread increments the counter when it is about to block and decrements it after the barrier releases it. This would save you from waiting for the first thread to block, then the second, then the third, and so on.

I don't see your control thread ever terminating (other than unexpectedly), and the main thread doesn't seem to join it.



You might also want to create your control thread before the worker threads.

You might also want to synchronize the worker threads and the control thread so that they all wait at the barrier once before they are released and start their actual work!


I think it can happen like this:

  • in the first iteration of while(1) in control_thread(), time_diff_get(&last_time, &time) returns a value < 1E9, so the thread runs straight into the barrier
  • now all worker threads will eventually enter the barrier
  • after this has happened, control_thread() goes through its loop a second time and immediately checks blocked_flags[i]
  • if this happens for at least one thread before that thread has reset its flag, you get the behavior you observed.

Sorry, I cannot provide a solution at the moment, but if I understand the problem correctly, this is a good starting point for one.
