Block the task until the queue is empty in Rust

I need to submit tasks between a set of tasks. std :: sync :: deque is enough to solve this problem, but I need to block the task if the queue is empty.

The following code (available in the GitHub gist ) is a working example of how to use std::sync::deque

:

extern crate time;

use std::io::timer::sleep;
use std::sync::deque::{BufferPool, Empty, Abort, Data};
use std::time::Duration;

fn main() {

  let start = time::precise_time_s();
  let pool = BufferPool::new();
  let (worker, stealer) = pool.deque();

  for task_id in range(1i, 5) {
    let sc = stealer.clone();
    spawn(proc() {
      loop {
        let elapse = time::precise_time_s() - start;
        match sc.steal() {
          Empty      => { println!("[{} @ {:#7.4}] No items", task_id, elapse); sleep(Duration::milliseconds(300)) },
          Abort      =>   println!("[{} @ {:#7.4}] ABORT. Retrying.", task_id, elapse),
          Data(item) =>   println!("[{} @ {:#7.4}] Found {}", task_id, elapse, item)
        }
      }
    });
  }

  for item in range(1i, 1000) {
    for n in range(1i, 20) {
      worker.push(item * n);
    }
    sleep(Duration::milliseconds(1000));
  }

}

      

I saw that std :: sync :: TaskPool exists , but the current implementation dispatches the job to the job even if the thread is busy with an older job.

My question is, what is the best way to block the task until there is any item in the queue?

+3


source to share


1 answer


In case of a possible solution, a semaphore is used:

extern crate time;

use std::io::timer::sleep;
use std::sync::deque::{BufferPool, Empty, Abort, Data};
use std::sync::{Semaphore, Arc};
use std::time::Duration;

fn main() {

  let start = time::precise_time_s();
  let pool = BufferPool::new();
  let (worker, stealer) = pool.deque();
  let sem = Arc::new(Semaphore::new(0));

  for task_id in range(1i, 5) {
    let sc = stealer.clone();
    let s = sem.clone();
    spawn(proc() {
      loop {
        let elapse = time::precise_time_s() - start;
        s.acquire();
        match sc.steal() {
          Empty      => {
              println!("[{} @ {:#7.4}] No items", task_id, elapse);
              sleep(Duration::milliseconds(300))
          },
          Abort      =>   {
              println!("[{} @ {:#7.4}] ABORT. Retrying.", task_id, elapse);
              s.release();
          },
          Data(item) =>   println!("[{} @ {:#7.4}] Found {}", task_id, elapse, item)
        }
      }
    });
  }

  for item in range(1i, 1000) {
    for n in range(1i, 20) {
      worker.push(item * n);
      sem.release();
    }
    sleep(Duration::milliseconds(1000));
  }

}

      



As you can see here, you release the semaphore resource for each value received and acquire it before getting the value from the queue. In this case, the return value will never be empty, but Abort is still possible and you must free the resource because nothing is read, but the value is still in the queue.

Another possible solution is also to use channels that block when there are no values ​​you want. For performance, you will have to compare both solutions.

+3


source







All Articles