The guidelines given here to run linear and logistic regression with TensorFlow in a distributed setting are specific to the Urika-GX platform. However, they can be easily be generalised to any other platform by replacing the commands to start a job and the node names as needed.
From the TensorFlow website: A TensorFlow "cluster" is a set of "tasks" that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions, and a "worker" that executes operations in the graph. A cluster can also be divided into one or more "jobs", where each job contains one or more tasks.
Therefore, in order to run TensorFlow in parallel, you will need to define a set of workers and one or more parameter servers. With the Urika-GX platform, each worker or parameter server can be set up through mrun
. Here is the bash script needed to run one parameter server and two workers on three different nodes (indicated by node1
, node2
, node3
):
mrun -n 1 -N 1 --cpus-per-task=36 --nodelist=node1 \
python linear-classifier-parallel.py \
--ps_hosts=node1:2222 \
--worker_hosts=node2:2222,node3:2222 \
--num_workers=2 \
--job_name=ps \
--task_index=0 \
> output-parameter-server.txt &
mrun -n 1 -N 1 --cpus-per-task=36 --nodelist=node2 \
python linear-classifier-parallel.py \
--ps_hosts=node1:2222 \
--worker_hosts=node2:2222,node3:2222 \
--num_workers=2 \
--job_name=worker \
--task_index=0 \
> output-first-worker.txt &
mrun -n 1 -N 1 --cpus-per-task=36 --nodelist=node3 \
python linear-classifier-parallel.py \
--ps_hosts=node1:2222 \
--worker_hosts=node2:2222,node3:2222 \
--num_workers=2 \
--job_name=worker \
--task_index=1 \
> output-second-worker.txt &
On Urika-GX, each job must be launched by mrun
. The -n 1
and -N 1
flags indicate the number of jobs to be started and the number of nodes to be used. --cpus-per-task
sets the number of CPUs to be used for each job and --nodelist
is used to specify the name of the node on which the job will run. Here, for simplicity, we assume to have three nodes named node1
, node2
, and node3
. The rest of the script can be used on any platform, as it is simply calling the Python script .
In this example we allocate 36 CPUs to each parallel job. TensorFlow automatically detects all the CPUs available on the same node and decides which part of the job to allocate to each one. However, we found that in practice the stochastic gradient descent algorithms implemented in TensorFlow for linear and logistic regression only use one or two CPUs. Therefore, it is more efficient to run multiple jobs on the same node, by assigning multiple jobs to the same node, with different ports. Here we give a simple example with one parameter server and two workers, all running on node one.
mrun -n 1 -N 1 --cpus-per-task=5 --shared --nodelist=node1 \
python linear-classifier-parallel.py \
--ps_hosts=node1:2222 \
--worker_hosts=node1:2223,node1:2224 \
--num_workers=2 \
--job_name=ps \
--task_index=0 \
> output-parameter-server.txt &
mrun -n 1 -N 1 --cpus-per-task=5 --shared --nodelist=node1 \
python linear-classifier-parallel.py \
--ps_hosts=node1:2222 \
--worker_hosts=node1:2223,node1:2224 \
--num_workers=2 \
--job_name=worker \
--task_index=0 \
> output-first-worker.txt &
mrun -n 1 -N 1 --cpus-per-task=5 --shared --nodelist=node1 \
python linear-classifier-parallel.py \
--ps_hosts=node1:2222 \
--worker_hosts=node1:2223,node1:2224 \
--num_workers=2 \
--job_name=worker \
--task_index=0 \
> output-second-worker.txt &
The linear-classifier-parallel.py
file will be a Python script containing:
main
function, in which the TensorFlow cluster is set up and different tasks are assigned to the parameter servers and workersdataset.shard()
method as explained below.The parser can be defined as follows:
import argparse
import sys
FLAGS = None
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.register("type", "bool", lambda v: v.lower() == "true")
parser.add_argument("--ps_hosts", type=str, default="", help="Comma-separated list of hostname:port pairs")
parser.add_argument("--worker_hosts", type=str, default="", help="Comma-separated list of hostname:port pairs")
parser.add_argument("--num_workers", type=int, default=1, help="Total number of workers")
parser.add_argument("--job_name", type=str, default="", help="One of 'ps', 'worker'")
parser.add_argument("--task_index", type=int, default=0, help="Index of task within the job")
parser.add_argument("--l1", type=float, default=0.0, help="L1 regularisation strength")
parser.add_argument("--l2", type=float, default=0.0, help="L2 regularisation strength")
parser.add_argument("--batch_size", type=int, default=500, help="Batch size")
FLAGS, unparsed = parser.parse_known_args()
tf.logging.set_verbosity(tf.logging.WARN)
tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
Notice that it can also be used to provide other optional inputs to the main function, such as the L1 and L2 regularisation strength or the batch size.
The main
function is called by the parser and is used to set up the cluster, divide the work between the parameter server and the workers and then run the main part of your code.
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
# Split up the tasks between the parameter servers and the workers
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# Training and prediction go here
# ...
if FLAGS.task_index==0:
# Print output with only one worker
With Estimators, if the data is read from file, the only thing that changes between the code for sequential and parallel jobs is how the data is loaded by each worker. In order to specify which batch of data should be read by each worker, you can use the dataset.shard
method as described below. This will make sure that the first batch of data will be read by worker 0, the second one by worker 1, and so on.
def input_fn(data_file, num_epochs, shuffle, batch_size, buffer_size=1000):
# Create list of file names that match "glob" pattern (i.e. data_file_*.csv)
filenames_dataset = tf.data.Dataset.list_files(data_file)
# Read lines from text files
textlines_dataset = filenames_dataset.flat_map(tf.data.TextLineDataset)
# Parse text lines as comma-separated values (CSV)
dataset = textlines_dataset.map(parse_csv)
# Assign different batches to each worker
dataset = dataset.shard(FLAGS.num_workers, FLAGS.task_index)
if shuffle:
dataset = dataset.shuffle(buffer_size=buffer_size)
# We call repeat after shuffling, rather than before, to prevent separate epochs from blending together.
dataset = dataset.repeat(num_epochs)
# Get a batch of data of size batch_size
dataset = dataset.batch(batch_size)
return dataset
The full code can be found at https://github.com/acabassi/large-scale-regression/blob/master/tensorflow-linear-classifier-parallel.py and https://github.com/acabassi/large-scale-regression/blob/master/tensorflow-linear-regressor-parallel.py for logistic and linear regresion respectively.