Fork/Join with Madagascar

Overview

The Fork/Join framework allows us to easily move SConstructs from an individual desktop to our clusters on campus.

This page outlines how to modify your SConstruct so that it can run on the cluster with little or no extra work on your part.

Fork/Join examples:

Examples of how to use the Fork/Join framework are located in: cwp08/jgodwin/mio

When should you use Fork/Join?  


Fork/Join is designed to parallelize loops in SConstructs.  The main reason is that loops apply the same action over and over to many different files.  This type of job is called "embarrassingly parallel" in the HPC community, because there is no need for communication between nodes, so we can simply distribute the iterations of the loop to different nodes.  If you only have a few tasks, or a loop that does not take long to run, consider whether you really need the cluster at all.  Fork/Join is a good idea when you have many iterations (over shots, for example) that each take a long time to run (many hours).

How do you use Fork/Join?


To start, replace the line from rsf.proj import * with from rsf.cluster import * .

NOTE: You must replace this line in all Python modules that you use as well.  If you don't, you will be missing commands.

This overrides the default Flow, Plot and Result functions and adds some additional ones.

Then you have to call Cluster().  The Cluster function sets defaults for all PBS jobs that your script will create, and sets some additional parameters.  In particular, Cluster takes the following parameters (an example call follows the list):

Cluster(name, email, cluster, time, ppn, nodetype)

name - the name of the job (string)
email - your email address (string)
cluster - the name of the cluster (string) OPTIONAL
time - the number of hours for all jobs, if not otherwise specified (int) OPTIONAL
ppn - the number of processors per node, either 8 or 12 (int) OPTIONAL
nodetype - a node-specific identifier ('x5600' or 'fat' on Ra) (string)
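
For example, a minimal call (the job name and email address are placeholders) might look like:

Cluster(name='test', email='user@mines.edu', time=1, ppn=8)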

Once you have called Cluster, you use Fork to indicate where a parallel section begins.  Fork takes a few arguments:

Fork(time=0, nodes=0, ipn=0, ppn=0)

time - the number of hours for each individual job to run (int)
nodes - the number of nodes for each individual job (int)
ipn - the number of iterations per node per job (int)
       ipn controls how your iterations are distributed.  For example, if you parallelize a loop of 20 iterations with ipn=4, then you need 5 nodes for the work to finish in one set of jobs.
ppn - the number of processors per node (8 or 12) (int)  OPTIONAL
      ppn determines which types of nodes you are allowed to run on.
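
Using the arithmetic above, that 20-iteration loop could be forked like this (the one-hour time limit is illustrative):

Fork(time=1, ipn=4, nodes=5)  # 20 iterations / 4 iterations per node = 5 nodes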

After you have called Fork, all subsequent calls to Flow, Plot and Result are assigned to parallel jobs (PBS jobs on the cluster).  To subdivide the work among nodes, you call Iterate.  Iterate takes no arguments; it only marks where to break jobs apart, so its placement matters.  See the examples below to understand how the placement of Iterate changes the granularity of the parallelization.

Iterate()

After the loop is finished, you call Join to indicate that the parallel section is over.  Once the parallel section is finished, the wrapper automatically continues with a non-parallel job.

Join()

The wrapper keeps track of job ordering and job dependencies (i.e. the order of job execution); you don't have to specify anything else.

Serial(time=0, ppn=0) - Create a new serial job.  Useful for creating jobs that need to run beyond the normal time limit.
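
For example, assuming (as with Fork) that the Flow calls that follow are assigned to the new job, a long-running step could be given its own 24-hour serial job; the smoothing command and file names are purely illustrative:

Serial(time=24)
Flow('smoothed','model','smooth rect1=100 rect2=100 repeat=10')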

Save(filename) - Convert the file with the given name to a combined header-and-binary file in the current directory.  This is useful for archiving files, or for transferring completed files from the cluster to your desktop for plotting.
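
For example, to keep a finished result for later transfer (the file name is illustrative):

Save('image')  # combines the header and binary for 'image' in the current directory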

Launching MPI jobs:


To launch MPI jobs, you need to add a few keyword arguments to your Flow commands.  Flows accept the following additional keywords:

mpi=True - This is an MPI job.
ppn=(8 or 12) (int) - Use a different number of processors per node for this job.
time=(int) - The number of hours for this job.
np=(int) - The number of MPI processes to launch.  If this is not specified, it defaults to one process per node (i.e. np equals nodes).
nodes=(int) - The number of nodes to use.
mpiopts=(string) - Any additional arguments for mpiexec.  Typically this is set to --bynode so that processes are assigned by node instead of by slot, which ensures a more even load distribution.


Flow(target, source, command, mpi=True, ppn=8, nodes=4, time=1, np=16, mpiopts='--bynode')

Make sure that you call End() as the last command of your SConstruct.  Fork/Join needs End() to finish creating the jobs.

Simple example of Fork/Join:

Before

from rsf.proj import *

Flow('spike',None,'spike n1=200')
Result('spike','graph')

for i in range(20):
    Flow('spike-%d' % i, None, 'spike n1=100')
    Result('spike-%d' % i, 'graph')

for i in range(20):
    for j in range(10):
        Flow('spike-%d-%d' % (i,j), None, 'spike n1=100 k1=%d' % j)
        Result('spike-%d-%d' % (i,j), 'graph')

End()


With Fork/Join

from rsf.cluster import *

###################
# Have to call Cluster first
Cluster(name='simple', email='jgodwin@mines.edu',
        time=1, ppn=8)
###################

Flow('spike',None,'spike n1=200')
Result('spike','graph')

###################
# Demo of simple parallelization
###################

Fork(time=1, ipn=5, nodes=4)
for i in range(20):
    Flow('spike-%d' % i, None, 'spike n1=100')
    Result('spike-%d' % i, 'graph')
    Iterate()
Join()

###################
# Demo of nested parallelization
###################

Fork(time=1, ipn=5, nodes=4)
for i in range(20):
    for j in range(10):
        Flow('spike-%d-%d' % (i,j), None, 'spike n1=100 k1=%d' % j)
        Result('spike-%d-%d' % (i,j), 'graph')
        Iterate()
    # You can put Iterate here instead, but you get a different (coarser) parallelization
# If you put Iterate outside the outer loop, you don't get any parallelization

Join()

###################
# Remember to call End
###################

End()



Running parallelized jobs

Once you have made the required changes to the SConstruct, you just need to log in to Mio and run cscons in the directory that contains your SConstruct.  cscons will parse the SConstruct, create all the jobs in the pbs directory, and then output the command that you need to execute to start your jobs on the cluster.  Job execution is automated from there, meaning that you only have to wait until all of your jobs have finished.


Getting finished output from the cluster

Cscons will execute all Flow, Plot and Result commands as it would on your desktop.  Unfortunately, this means that if you want to change a plot, you have to change the plotting command and either resubmit that single job or resubmit all jobs in the job list, which can be very tedious.  Instead, you can separate your Plot and Result commands from your main SConstruct and place them in another SConstruct in the same directory.  For each file that you want to plot in the long term, use the Save(...) function inside your main SConstruct to combine its header and binary together.

Once your job has completed on Mio, run cscons get in your local directory on your desktop or laptop machine.  Cscons will transfer all combined header/binary files to your machine.  You can then run the plotting commands in your other SConstruct via: scons -f SConstruct-plot
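
As a sketch, the separate plotting file (assumed here to be named SConstruct-plot, matching the command above) can be as simple as the following, where 'image' stands in for a file previously combined with Save:

from rsf.proj import *

# Plot a saved, combined header/binary file; the file name and title are illustrative.
Result('image','grey title="Final image"')

End()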


Troubleshooting

Sometimes your jobs won't finish, for various reasons.  In any case, you should receive an email that tells you whether your jobs finished successfully or terminated abnormally.  Once you know which jobs are not finishing, or failing to execute, you can look in the pbs folder for the job output and error logs.  By examining those error logs you can usually tell if something is wrong with your setup (e.g. you are missing files or directories, or a program isn't compiled correctly).  If your jobs are running out of time, you'll see "mpiexec ..." commands at the bottom of the error logs.

Lastly, if you still can't figure out what's wrong, you can view the job files themselves.  This is a somewhat tedious way to find out what's going on, but they list all of the commands executed in the job script.

If you're not sure which job has failed, view pbs/jobs.txt .  This file contains a running list of jobs that have started and finished execution (or failed to finish).  You can find jobs that haven't finished by looking for job numbers that have a start entry but no corresponding end entry in the file.

Where are the job files stored?


All job files are stored in the pbs directory in your working directory.

What does Fork/Join do?


Fork/Join overrides the Madagascar Flow, Plot and Result commands.  It grabs the text output that SCons would use to execute the commands, in order, and then determines which commands should be mapped to which nodes.  Lastly, it creates the PBS job file wrappers that are used by the cluster.

Fixing the PBS job file

You can modify the PBS job file produced by Fork/Join by editing cwp08/iCode/cluster.py .  Look for the function createPBSfile.