Distributed TensorFlow - Some workers not working
I am trying to get a very simple example of distributed TensorFlow working. However, I have a problem that appears non-deterministically between runs. On some launches it works fine, printing something along these lines:
Worker 2 | step 0
Worker 0 | step 0
Worker 1 | step 0
Worker 3 | step 0
Worker 2 | step 1
Worker 0 | step 1
Worker 1 | step 1
Worker 3 | step 1
...
However, from time to time, one or more workers fail to start, resulting in output like this:
Worker 0 | step 0
Worker 3 | step 0
Worker 0 | step 1
Worker 3 | step 1
Worker 0 | step 2
Worker 3 | step 2
...
If I let the loop run indefinitely, it seems that the missing workers always start eventually, but only after a few minutes, which is not practical.
I found two things that make the problem go away (but make the program useless):
1. Not declaring any tf variables inside the with tf.device(tf.train.replica_device_setter()) scope. If I declare even one variable (like nasty_var below), the problem starts to crop up.
2. Setting the is_chief parameter of tf.train.MonitoredTrainingSession() to True for all workers. This makes the problem go away even when variables are declared, but it seems wrong to make every worker the chief. The way I currently set it below, is_chief=(task_index == 0), is taken directly from the TensorFlow tutorial.
Here is the simplest code I can come up with that reproduces the problem. (You may have to run it several times to see the error, but it almost always shows up within 5 runs.)
from multiprocessing import Process
import tensorflow as tf
from time import sleep
from numpy.random import random_sample

cluster = tf.train.ClusterSpec({'ps': ['localhost:2222'],
                                'worker': ['localhost:2223',
                                           'localhost:2224',
                                           'localhost:2225',
                                           'localhost:2226']})

def create_worker(task_index):
    server = tf.train.Server(cluster, job_name='worker', task_index=task_index)

    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
        nasty_var = tf.Variable(0)  # This line causes the problem. No issue when this is commented out.

    with tf.train.MonitoredTrainingSession(master=server.target, is_chief=(task_index == 0)):
        for step in range(10000):
            sleep(random_sample())  # Simulate some work being done.
            print('Worker %d | step %d' % (task_index, step))

def create_ps(task_index):
    param_server = tf.train.Server(cluster, job_name='ps',
                                   task_index=task_index)
    param_server.join()

# Launch workers and ps in separate processes.
processes = []
for i in range(len(cluster.as_dict()['worker'])):
    print('Forking worker process %d' % i)
    p = Process(target=create_worker, args=[i])
    p.start()
    processes.append(p)

for i in range(len(cluster.as_dict()['ps'])):
    print('Forking ps process %d' % i)
    p = Process(target=create_ps, args=[i])
    p.start()
    processes.append(p)

for p in processes:
    p.join()
I suspect the cause here is the implicit coordination protocol in how a tf.train.MonitoredTrainingSession starts up, which is implemented here:
- If this session is the chief:
  - Run the variable initializer op.
- Else (if this session is not the chief):
  - Run an op to check whether the variables have been initialized.
  - While any of the variables has not yet been initialized:
    - Wait 30 seconds.
    - Try creating a new session, and check again whether the variables have been initialized.
(I discuss the rationale behind this protocol in the video on Distributed TensorFlow.)
When every session is the chief, or there are no variables to initialize, tf.train.MonitoredTrainingSession always starts immediately. However, once there is a single variable and only a single chief, the non-chief workers have to wait for the chief to act.
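To make the startup behavior concrete, here is a minimal pure-Python simulation of the protocol above, with no TensorFlow involved. It is an idealized model, not TensorFlow's actual implementation: SharedVariables, chief, non_chief, and simulate are hypothetical names, a threading.Event stands in for "the variables have been initialized", and each poll in non_chief stands in for creating a new session and running the readiness check.

```python
import threading
import time


class SharedVariables:
    """Hypothetical stand-in for the model variables that live on the ps task."""

    def __init__(self):
        self._initialized = threading.Event()

    def run_initializer(self):
        # In effect, what the chief's variable initializer op does.
        self._initialized.set()

    def is_initialized(self):
        return self._initialized.is_set()


def chief(variables, startup_delay):
    """The chief initializes the variables once it is up and running."""
    time.sleep(startup_delay)
    variables.run_initializer()


def non_chief(variables, recovery_wait_secs):
    """Poll until the variables are initialized; return the number of checks."""
    checks = 0
    while True:
        checks += 1
        # Stands in for "create a session and check the variables".
        if variables.is_initialized():
            return checks
        time.sleep(recovery_wait_secs)


def simulate(startup_delay, recovery_wait_secs):
    """Run one chief thread and one non-chief; return (checks, seconds waited)."""
    variables = SharedVariables()
    chief_thread = threading.Thread(target=chief,
                                    args=(variables, startup_delay))
    start = time.monotonic()
    chief_thread.start()
    checks = non_chief(variables, recovery_wait_secs)
    waited = time.monotonic() - start
    chief_thread.join()
    return checks, waited


if __name__ == "__main__":
    # The chief is ready after 0.05 s, but the non-chief only notices on its
    # next poll, a full recovery_wait_secs (0.5 s) after its first check.
    print(simulate(startup_delay=0.05, recovery_wait_secs=0.5))
```

With these numbers the non-chief needs two checks and about half a second, even though the chief was ready after 50 ms; with 30-second waits, the same pattern produces the stalls described in the question. If the variables are already initialized when the non-chief first checks, it starts immediately.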
The reason for using this protocol is that it is robust to individual processes failing, and the delay, while very noticeable when running everything on a single machine, is short compared to the expected running time of a typical distributed training job.
Looking at the implementation again, it seems that this 30-second timeout should be configurable (it is the recovery_wait_secs argument to tf.train.SessionManager()), but there is currently no way to set it when you create a tf.train.MonitoredTrainingSession, because it uses a hard-coded set of arguments to create the session manager. This looks like an oversight in the API, so please feel free to open a feature request on the GitHub issues page!
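Why the timeout matters so much here can be worked out with a little arithmetic. Assume (an idealization, not TensorFlow's code) that a non-chief checks at t = 0, w, 2w, ..., where w is recovery_wait_secs, and that creating a session takes no time. Then the non-chief starts at the first multiple of w at or after the moment the chief finishes initializing. The helper first_successful_check below is a hypothetical name used only for this illustration.

```python
import math


def first_successful_check(chief_ready_at, recovery_wait_secs):
    """Idealized time (seconds) at which a polling non-chief can start.

    Assumes readiness checks happen at t = 0, w, 2w, ... with
    w = recovery_wait_secs, and ignores the cost of creating sessions.
    """
    if chief_ready_at <= 0:
        return 0  # Already initialized on the very first check.
    polls = math.ceil(chief_ready_at / recovery_wait_secs)
    return polls * recovery_wait_secs


if __name__ == "__main__":
    # The chief finishes initializing 0.5 s after the non-chief's first check.
    print(first_successful_check(0.5, 30))  # → 30  (default 30-second wait)
    print(first_successful_check(0.5, 1))   # → 1   (1-second wait)
```

So a chief that is only half a second late costs each non-chief a full 30 seconds at the default setting, but only 1 second with a 1-second wait. In a toy example that is most of the run; in a job that trains for hours it is negligible.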
As mrry said, the problem exists because:
- The non-chief relies on the chief to initialize the model.
- If the model is not initialized yet, the non-chief waits 30 seconds before checking again.
Performance-wise, waiting up to 30 seconds for the chief makes no real difference. However, I was recently doing research that required strictly synchronized updates, so this issue had to be dealt with.
The key point here is to use a barrier, depending on your distributed setup. Suppose you use thread 1 to start the ps and threads 2 to 5 to start the workers; then you only need to:
- Use tf.train.Supervisor instead of MonitoredTrainingSession, because it lets you set recovery_wait_secs (default: 30 seconds). Change it to 1 second to shorten the wait.
sv = tf.train.Supervisor(is_chief=is_chief, logdir=..., init_op=...,
                         recovery_wait_secs=1)  # ... plus any other arguments
sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
- Use a barrier. Assuming you are using threads, it looks roughly like this:
import threading
barrier = threading.Barrier(parties=num_workers)
threads = []
for i in range(num_workers):
    threads.append(threading.Thread(target=run_model, args=("worker", i, barrier)))
threads.append(threading.Thread(target=run_model, args=("ps", 0, barrier)))
for t in threads:
    t.start()
Then, in the actual training function:
_ = sess.run([train_op], feed_dict=train_feed)
barrier.wait()
After that, training simply continues. The barrier makes sure that every worker reaches this point before any of them moves on, so no worker can run ahead of the others.
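The barrier pattern can be tried out without TensorFlow. In this minimal sketch, time.sleep() stands in for sess.run([train_op], ...), and run_worker and run are hypothetical names (not the answer's run_model). Each worker logs its step and then waits on a threading.Barrier, so no worker can begin step s + 1 until every worker has finished step s.

```python
import random
import threading
import time


def run_worker(task_index, num_steps, barrier, log, log_lock):
    for step in range(num_steps):
        # Stand-in for sess.run([train_op], ...): uneven amounts of work.
        time.sleep(random.random() * 0.01)
        with log_lock:
            log.append((task_index, step))
            print('Worker %d | step %d' % (task_index, step))
        # Block until all workers have finished this step.
        barrier.wait()


def run(num_workers=4, num_steps=3):
    barrier = threading.Barrier(parties=num_workers)
    log, log_lock = [], threading.Lock()
    threads = [
        threading.Thread(target=run_worker,
                         args=(i, num_steps, barrier, log, log_lock))
        for i in range(num_workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


if __name__ == "__main__":
    run()
```

Within a step the workers still print in arbitrary order, but the step numbers never interleave. Note that threading.Barrier only coordinates threads inside one process, which matches the threaded setup above; the question's multiprocessing version would need something like multiprocessing.Barrier instead.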