Terminating a thread
I am trying to write a program that receives messages from various applications through a shared queue (currently a POSIX message queue) and dispatches those commands to a pool of worker threads.
I have changed the design more times than I can remember over the past few days, so instead of reinventing the wheel badly, I thought I would come here for advice, in particular on how to terminate the program.
I want the termination command to be received through the queue and recognized by a worker (so that the dispatcher doesn't have to worry about preprocessing commands, which matters because the worker may have to authenticate them).
So the information needs to flow back somehow from the worker to the dispatcher to tell the latter to stop.
In my current iteration, the main thread is the dispatcher: it receives from the queue and exchanges signals (via condition variables) with the workers to tell idle threads about pending commands. As I see it, signals will have to come into the picture:
- If the queue is non-blocking, the dispatcher will call (in order) mq_notify, mq_receive and, upon receiving EAGAIN, sigwait;
- If the queue is blocking, the dispatcher will simply call mq_receive, which will return with EINTR when interrupted by a signal.
In both cases, the dispatcher will loop on while (atomic_load(state) != POOL_TERMINATING), with the state to be set by the worker, which authenticates the termination command and raises a signal.
Now for the questions:
- Is there anything I need to be concerned about when mixing signals and threads, other than it not being a trivial exercise? (sample)
- Is there any reason I should be concerned about the dispatcher using a non-blocking queue, since its only job is to forward messages?
  - If there is, can someone cure me of my paranoid fear that a "new message" notification will arrive between if (ret == EAGAIN) and sigwait()?
- If I go with a blocking queue, can I do without a handler? That is, the worker sets POOL_TERMINATING, sends a signal, and mq_receive just returns with EINTR if the dispatcher was blocked in it. (I don't know enough about signals yet to see a way to tell the system "interrupt the call, e.g. mq_receive, but don't bother with handlers", so I assume it would take an empty handler.)
- Am I DoingItWrong™?
Thanks in advance.
All credit for this goes to alc, who made me think twice about "needing" to use signals (that is, signals as in signal(7), since I was already using pthread_cond_signal(3)). In the end, I even managed to use a non-blocking queue without fear of missing a new-message notification.
Some pseudocode (the actual code is functional, but ... not suitable for educational purposes):
DISPATCHER:
    mq_open();
    mq_notify(dispatcher_callback);
    while (state != TERMINATING)
        switch (state)
            case WAITING:
                while (dispatcher_event == NO_EVENT)
                    wait(dispatcher_event);
                state = (dispatcher_event == NEW_MSG) ? RECEIVING :
                        (dispatcher_event == TERM) ? TERMINATING : WAITING;
            case RECEIVING:
                if (mq_receive() > -1)
                    signal(worker_event, NEW_MSG);
                    forward_command();
                else if (errno == EAGAIN)
                    state = WAITING;
                    dispatcher_event = NO_EVENT;
    worker_event = TERM;
    broadcast(worker_event, TERM);
    for (w ∈ workers)
        pthread_join(w.id);
    return;

WORKER:
    while (state != TERMINATING)
        switch (state)
            case IDLE:
                while (worker_event == NO_EVENT)
                    wait(worker_event);
                state = (worker_event == NEW_MSG) ? BUSY :
                        (worker_event == TERM) ? TERMINATING : IDLE;
            case BUSY:
                worker_event = NO_EVENT;
                worker_fetch_command();
                worker_process_command();
                state = IDLE;
    return;
The event object consists of a condition variable, a mutex, and a predicate (hence the calls to signal() / broadcast() / wait() mixed with assignments and comparisons). The while (!predicate_changed) wait(predicate); idiom is there to handle spurious wakeups.
The dispatcher and the worker threads share a "pool" object that contains two events (one for the dispatcher, one for the whole worker pool) and a buffer from which the command is fetched.
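As a rough sketch of what such an event object might look like in C (the struct layout and the event_init / event_post / event_wait / event_clear names are my own assumptions, not the code from the actual program):

```c
#include <assert.h>
#include <pthread.h>

/* Event values taken from the pseudocode above. */
enum event_kind { NO_EVENT, NEW_MSG, TERM };

/* Hypothetical layout: the predicate lives next to the mutex and the
 * condition variable that protect it. */
struct event {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    enum event_kind kind;
};

static int event_init(struct event *ev)
{
    ev->kind = NO_EVENT;
    if (pthread_mutex_init(&ev->lock, NULL) != 0)
        return -1;
    if (pthread_cond_init(&ev->cond, NULL) != 0) {
        pthread_mutex_destroy(&ev->lock);
        return -1;
    }
    return 0;
}

/* Set the predicate under the lock, then wake one waiter or all of them. */
static void event_post(struct event *ev, enum event_kind kind, int wake_all)
{
    pthread_mutex_lock(&ev->lock);
    ev->kind = kind;
    if (wake_all)
        pthread_cond_broadcast(&ev->cond);
    else
        pthread_cond_signal(&ev->cond);
    pthread_mutex_unlock(&ev->lock);
}

/* Block until something other than NO_EVENT has been posted. The loop
 * re-checks the predicate, so a spurious wakeup just goes back to sleep. */
static enum event_kind event_wait(struct event *ev)
{
    enum event_kind kind;

    pthread_mutex_lock(&ev->lock);
    while (ev->kind == NO_EVENT)
        pthread_cond_wait(&ev->cond, &ev->lock);
    kind = ev->kind;
    pthread_mutex_unlock(&ev->lock);
    return kind;
}

/* Reset the predicate once the event has been handled. */
static void event_clear(struct event *ev)
{
    pthread_mutex_lock(&ev->lock);
    ev->kind = NO_EVENT;
    pthread_mutex_unlock(&ev->lock);
}
```

Because the predicate is set before the wakeup and checked before sleeping, an event posted while nobody is waiting is not lost: the next event_wait() returns immediately.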
Not pictured:
- Corresponding lock() / unlock() calls before and after reading / writing / signaling / waiting on these "events".
- Another sync between dispatcher_forward() and worker_fetch_command().
- These "state" variables are thread-local.
- dispatcher_callback() calls mq_notify() again, then signals dispatcher_event by setting it to NEW_MSG.
- process_command() signals dispatcher_event when it recognizes and authenticates the "TERMINATE" command.
- Error checking.
- ... Multiple printf() calls.
EDIT: After thinking about it a little more, I realized I could do without the dispatcher thread:
- workers can call mq_receive() without blocking;
- the notification callback can signal / broadcast the event directly to the pool;
- the worker that identifies the termination command can also broadcast the event to the pool instead of reporting back to the dispatcher.
The signal you wait for in sigwait() must be blocked (masked) outside sigwait(), to ensure that it is not delivered at any other time.
Given that your dispatch loop is waiting for incoming messages, it is tempting to just block in mq_receive(). HOWEVER, I don't see how you avoid a signal race when the worker thread that sees the terminate message signals that fact. Perhaps mq_send() a special "STOP" message to the dispatch loop, perhaps by setting the POOL_TERMINATING flag and sending an empty message?
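A minimal sketch of the "set the flag, then send an empty message" variant, under assumptions: the queue name, MSG_SIZE, pool_state and all function names are hypothetical, and worker_handle() is called inline as a stand-in for a real worker thread so that the demo stays deterministic.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdatomic.h>
#include <string.h>
#include <sys/stat.h>

#define MSG_SIZE 64
enum { POOL_RUNNING, POOL_TERMINATING };
static atomic_int pool_state = POOL_RUNNING;

/* Worker side, called once the terminate command has been authenticated.
 * The empty message only serves to wake the dispatcher up. */
static int request_stop(mqd_t q)
{
    atomic_store(&pool_state, POOL_TERMINATING);
    return mq_send(q, "", 0, 0);
}

/* Stand-in for a worker; a real one would run in its own thread and
 * authenticate the command before acting on it. */
static void worker_handle(mqd_t q, const char *cmd)
{
    if (strcmp(cmd, "TERMINATE") == 0)
        request_stop(q);
}

/* Blocking dispatch loop: no signals involved. Returns the number of
 * commands forwarded, or -1 on a receive error. */
static int dispatch_loop(mqd_t q)
{
    char buf[MSG_SIZE + 1];
    int forwarded = 0;

    for (;;) {
        ssize_t n = mq_receive(q, buf, MSG_SIZE, NULL);
        if (n == -1) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        if (atomic_load(&pool_state) == POOL_TERMINATING)
            return forwarded;
        if (n == 0)
            continue;          /* empty message without the flag: ignore */
        buf[n] = '\0';
        forwarded++;
        worker_handle(q, buf);
    }
}

/* Demo: one ordinary command, then TERMINATE. Returns the number of
 * forwarded commands, or -1 if POSIX message queues are unavailable. */
static int demo_wakeup_message(void)
{
    const char *name = "/wakeup_demo_queue";   /* hypothetical name */
    struct mq_attr attr;
    int result = -1;

    memset(&attr, 0, sizeof attr);
    attr.mq_maxmsg = 4;
    attr.mq_msgsize = MSG_SIZE;
    atomic_store(&pool_state, POOL_RUNNING);

    mq_unlink(name);
    mqd_t q = mq_open(name, O_CREAT | O_RDWR, 0600, &attr);
    if (q == (mqd_t)-1)
        return -1;
    if (mq_send(q, "do-work", 7, 0) == 0 && mq_send(q, "TERMINATE", 9, 0) == 0)
        result = dispatch_loop(q);
    mq_close(q);
    mq_unlink(name);
    return result;
}
```

The point of pairing the flag with the empty message is that an empty message on its own does nothing, so another application cannot stop the pool just by sending one; only the worker that authenticated the command sets the flag. On older glibc versions this may need linking with -lrt.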
You mention signalling with condition variables to pass messages to the worker threads. This implies that you have an internal queue protected by a mutex and condition variable. If so, you can put your POOL_TERMINATING flag under the mutex as well and use pthread_cond_broadcast() to wake up all the threads.
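Roughly, that could look like the sketch below. The struct pool layout, QUEUE_CAP, NUM_WORKERS and the function names are assumptions for illustration, and the "work" is reduced to bumping a counter.

```c
#include <assert.h>
#include <pthread.h>

#define QUEUE_CAP   8
#define NUM_WORKERS 3

/* Hypothetical pool: a small ring buffer of command ids plus the
 * terminating flag, all guarded by the same mutex. */
struct pool {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int commands[QUEUE_CAP];
    int head, count;
    int terminating;
    int processed;
};

/* Dispatcher side: queue one command and wake one idle worker. */
static void pool_push(struct pool *p, int cmd)
{
    pthread_mutex_lock(&p->lock);
    if (p->count < QUEUE_CAP) {
        p->commands[(p->head + p->count) % QUEUE_CAP] = cmd;
        p->count++;
        pthread_cond_signal(&p->cond);
    }
    pthread_mutex_unlock(&p->lock);
}

/* Set the flag under the mutex and wake every worker. */
static void pool_terminate(struct pool *p)
{
    pthread_mutex_lock(&p->lock);
    p->terminating = 1;
    pthread_cond_broadcast(&p->cond);
    pthread_mutex_unlock(&p->lock);
}

static void *worker_main(void *arg)
{
    struct pool *p = arg;

    pthread_mutex_lock(&p->lock);
    for (;;) {
        while (p->count == 0 && !p->terminating)
            pthread_cond_wait(&p->cond, &p->lock);
        if (p->count == 0)
            break;                 /* terminating and nothing left to do */
        p->head = (p->head + 1) % QUEUE_CAP;
        p->count--;
        p->processed++;            /* stands in for the real work */
    }
    pthread_mutex_unlock(&p->lock);
    return NULL;
}

/* Demo: start the workers, queue two commands, terminate, join.
 * Returns how many commands were processed. */
static int demo_broadcast_shutdown(void)
{
    struct pool p = { .head = 0 };
    pthread_t tids[NUM_WORKERS];
    int started = 0;

    pthread_mutex_init(&p.lock, NULL);
    pthread_cond_init(&p.cond, NULL);
    while (started < NUM_WORKERS &&
           pthread_create(&tids[started], NULL, worker_main, &p) == 0)
        started++;

    pool_push(&p, 1);
    pool_push(&p, 2);
    pool_terminate(&p);
    for (int i = 0; i < started; i++)
        pthread_join(tids[i], NULL);
    return p.processed;
}
```

Since the flag is only read and written with the mutex held, a worker cannot check it, miss the broadcast, and then go to sleep forever. In this sketch the workers also drain whatever is still queued before exiting; dropping pending commands instead would be an equally valid choice.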
If you let the worker threads do mq_receive() for themselves, which seems plausible enough, then you have a small problem signalling all of them to terminate. In this case, the main thread will wait to be signalled that the terminate message has been received. Before collecting the workers with (I assume) pthread_join(), you will need to pthread_kill() each one in case it is blocked in mq_receive(). HOWEVER, it is not clear to me how you avoid the signal race ... and it starts to look very unpleasant. This may be one of the (few, IMHO) times when it is useful to use pthread_cancel() ... you can set the workers to PTHREAD_CANCEL_DISABLE and PTHREAD_CANCEL_DEFERRED, and set PTHREAD_CANCEL_ENABLE just before mq_receive(). Or ... similar to the above, you could mq_send() STOP messages for all the workers.
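The pthread_cancel() idea might be sketched like this. Again the queue name, MSG_SIZE and the function names are made up, and error handling is minimal.

```c
#include <assert.h>
#include <fcntl.h>
#include <mqueue.h>
#include <pthread.h>
#include <string.h>
#include <sys/stat.h>

#define MSG_SIZE 64

/* Worker that can only be cancelled while it sits in mq_receive(). */
static void *cancellable_worker(void *arg)
{
    mqd_t q = *(mqd_t *)arg;
    char buf[MSG_SIZE];
    int old;

    /* Deferred is the default type anyway; set here to be explicit. */
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &old);
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);

    for (;;) {
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old);
        ssize_t n = mq_receive(q, buf, sizeof buf, NULL); /* cancellation point */
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);

        if (n == -1)
            break;              /* real code would inspect errno here */
        /* process the command here, safe from cancellation */
    }
    return NULL;
}

/* Demo: start a worker on an empty queue, cancel it, join it.
 * Returns 1 if the worker ended by cancellation, 0 if it ended some
 * other way, -1 if POSIX message queues are unavailable. */
static int demo_cancel_blocked_worker(void)
{
    const char *name = "/cancel_demo_queue";    /* hypothetical name */
    struct mq_attr attr;
    pthread_t tid;
    void *ret = NULL;
    int ok = -1;

    memset(&attr, 0, sizeof attr);
    attr.mq_maxmsg = 4;
    attr.mq_msgsize = MSG_SIZE;

    mq_unlink(name);
    mqd_t q = mq_open(name, O_CREAT | O_RDWR, 0600, &attr);
    if (q == (mqd_t)-1)
        return -1;

    if (pthread_create(&tid, NULL, cancellable_worker, &q) == 0) {
        pthread_cancel(tid);    /* stays pending until cancellation is enabled */
        pthread_join(tid, &ret);
        ok = (ret == PTHREAD_CANCELED);
    }
    mq_close(q);
    mq_unlink(name);
    return ok;
}
```

Unlike a signal, a cancellation request that arrives while cancellation is disabled stays pending and is acted on at the next cancellation point once it is enabled again, so it does not matter whether the worker has already reached mq_receive() when pthread_cancel() is called.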
Along the way, I think what is needed is a pthread_prod(), a cross between pthread_kill() and pthread_cancel() ... a kind of signal that is masked except at (during) cancellation points. This would generalise what pselect() does with its magic "atomic" swapping of signal masks.