dask-jobqueue

08/06/18

My first major PR (pull request): making it possible to use dask (a python parallel computing package) on LSF (Load Sharing Facility) HPC (High Performance Computing) systems.

https://github.com/dask/dask-jobqueue/pull/78

Update 8/25/19: This is being used on the fastest supercomputer in the world (Summit at Oak Ridge National Laboratory) - https://github.com/dask/dask-jobqueue/issues/328

I will start with a video of what dask is and what it can do. The video is about 4 minutes long. Notice the line:

from dask_jobqueue import PBSCluster, which comes from the dask-jobqueue project.

Pretty cool: Analyzing a terabyte of data in ~10-20 seconds!

For an introduction to dask I recommend reading Stephan Hoyer's blog post on using dask with xarray, which highlights dask's applications to the geosciences.

The dask-jobqueue project works similarly to dask.distributed. It allows your python code to make use of multiple processes (instances of a computer program) and cores (central processing units, which execute a program's instructions). For example, the MacBook I am typing this on has four cores, and with dask.distributed I can speed up analysis by using all four of them.
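
As a minimal sketch of that idea (illustrative only; this uses dask.distributed's LocalCluster on a laptop rather than dask-jobqueue):

# Use all four laptop cores with dask.distributed (a local sketch)
from dask.distributed import Client, LocalCluster
import dask.array as da

cluster = LocalCluster(n_workers=4, threads_per_worker=1)  # four single-threaded workers
client = Client(cluster)

# A toy computation that dask splits into chunks and spreads across the workers
x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.mean().compute())

client.close()
cluster.close()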

Contributing to dask-jobqueue has been a pleasure, as the reviews have been helpful and thorough. It was my first non-documentation pull request and I learnt a lot in doing so. To anyone who wants to get into open source software, I recommend having confidence: don't be afraid to open a pull request even if you tried to solve something and got stuck. Just add 'WIP:' (work in progress) in front of the title and ask for help. People are usually very happy that you want to help them with their package.

The dask-jobqueue code repository is set up well, so it is easy to add other clusters (i.e. other HPC job schedulers; see https://en.wikipedia.org/wiki/Job_scheduler#Batch_queuing_for_HPC_clusters). You just need to know what the submit script looks like. For example, with LSF a submit script (submit.sh) looks like:

#!/bin/sh 
#BSUB -n 16 # Number of cores
#BSUB -J test # Job name
#BSUB -o test.out # Standard out file name
#BSUB -e test.err # Standard error file name
#BSUB -q parallel # Submit to the parallel queue
#BSUB -P hpc # Use credits associated with the 'hpc' project
mpirun.lsf ./mpi_example1.x # Application to run
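
dask-jobqueue builds an equivalent #BSUB header for you. A quick way to inspect it (a sketch; this assumes a dask-jobqueue version where the cluster object exposes job_script(), and keyword names such as project may differ between versions):

# Print the LSF submit script dask-jobqueue generates (sketch)
from dask_jobqueue import LSFCluster

cluster = LSFCluster(cores=16, memory='25 GB', queue='parallel', project='hpc')
print(cluster.job_script())  # shows the #BSUB lines that will be submitted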

The only other thing I had to add in my PR was a minor change around subprocess.Popen (subprocess is the python module that lets you run command line calls from within python). Jobs are submitted to LSF with bsub < submit.sh, and the < redirect requires shell=True in subprocess.Popen. An easy fix was to pass **kwargs through to the Popen call.
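
For illustration, here is a minimal sketch of that shell=True requirement (submit.sh is the script above):

# Sketch: submitting an LSF job from python.
# 'bsub < submit.sh' relies on shell redirection, so it has to run through a shell.
import subprocess

proc = subprocess.Popen('bsub < submit.sh', shell=True,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
print(out.decode())  # typically something like: Job <12345> is submitted to queue <parallel>.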

Now, let's play with my newly created LSFCluster (the notebook used in this blog post can be downloaded here):

Setting up the environment:

# On HPC make sure you have conda installed (https://sites.google.com/view/raybellwaves/pythonrsmas)
$ conda create -n djq_lsf python=3.6
$ conda activate djq_lsf
$ pip install git+https://github.com/dask/dask-jobqueue
$ conda install -c conda-forge psutil notebook ipywidgets bokeh pandas
# psutil: needed to work around an issue I was having with my setup
# notebook: we will run the code in the jupyter notebook  
# ipywidgets: this is used to allow us to submit jobs by pointing and clicking
# bokeh: required for the dask dashboard (http://dask.pydata.org/en/latest/diagnostics-distributed.html#dashboard)
# pandas: required for dask distributed

Setting up the notebook:

$ jupyter notebook --generate-config
# This creates ~/.jupyter/jupyter_notebook_config.py
$ ipython
In [1]: from notebook.auth import passwd; passwd()
Enter password:
# Choose a password
# Copy the hash of that password into ~/.jupyter/jupyter_notebook_config.py, i.e. uncomment
# and set the password line: c.NotebookApp.password = u'sha1:blahblahblah'

Set up tunneling so you can view the notebook and the dask dashboard on your laptop. You want to find the IP address where the dask scheduler (and its dashboard) is being served and serve your notebook on that same IP address:

$ ipython
In [1]: from dask_jobqueue import LSFCluster

In [2]: from dask.distributed import Client

In [3]: cluster = LSFCluster(cores=2, memory='2 GB')

In [4]: client = Client(cluster)

In [5]: client
Out[5]: <Client: scheduler='tcp://10.10.0.14:42151' processes=0 cores=0>
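
You can also pull the scheduler address out of the client programmatically rather than reading it off the repr (scheduler_info() is part of the dask.distributed Client API):

In [6]: client.scheduler_info()['address']
Out[6]: 'tcp://10.10.0.14:42151'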

The IP address shown here is 10.10.0.14. Use this address to serve the notebook:

$ jupyter notebook --no-browser --ip=10.10.0.14 --port=8888

# On your laptop tunnel this notebook as
$ ssh -N -L 8888:10.10.0.14:8888 USER@pegasus.ccs.miami.edu
# On your laptop open http://localhost:8888 (e.g. google chrome).

Now create/open the notebook:

# Open a notebook: New -> Python3
# The notebook text will look like (a new paragraph is a new cell):
from dask_jobqueue import LSFCluster
from dask.distributed import Client
import dask.dataframe as dd
import dask.array as da

cluster = LSFCluster(cores=15, memory='250GB', queue='bigmem', walltime='00:10')
cluster
# Click on 'Manual Scaling' and choose 10 workers. If you open another terminal on the HPC and type `bjobs` you should see you have 10 jobs submitted. Each has 15 cores and 250 GB of memory.
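# Alternatively, instead of clicking you can scale programmatically with cluster.scale(10)
# (a sketch; scale() asks dask-jobqueue to submit that many workers).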

client = Client(cluster)
client
# Make a note of the port number for the dashboard. It will most likely be 8787 or a five digit number (e.g. 54611).

Use this port to tunnel the dashboard:

# Open a new terminal on your laptop and tunnel this as ssh -N -L 54611:10.10.0.14:54611 USER@pegasus.ccs.miami.edu
# On your laptop open http://localhost:54611 (e.g. google chrome). This will show you the dask dashboard.
# Click on the 'Workers' tab on the dask dashboard to see the jobs submitted. But you probably want to stay on the 'Status' tab.

Back to the notebook:

# Create a terabyte dataset:
# a data point every 10 milliseconds for ~10 years.
# Each day has 8,640,000 data points and there are 3987 days in those ~10 years.
# That's 34,447,680,000 time points.
# With 3 columns there are 103,343,040,000 data points in total:
# one hundred and three billion, three hundred and forty-three million and forty thousand!
df = dd.demo.make_timeseries(start='2000-01-01',
                             end='2010-12-31',
                             dtypes={'x': float, 'y': float, 'id': int},
                             freq='10ms',
                             partition_freq='24h')
df

df.head()

# Load the data (persist) into memory
df = df.persist()

# Calculate the mean of the x and y columns for different ids
%time df.groupby('id')[['x', 'y']].mean().compute()

# Compute the standard deviation over a rolling 1-minute window, drop the first and last day of data, and find the index corresponding to the maximum.
%time df.x.rolling('1min').std().loc['2000-01-02':'2010-12-30'].idxmax().compute()

# Calculate the singular value decomposition (SVD)
%time u, s, v = da.linalg.svd(df.values + 1)
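# Note: the svd call is lazy (u, s and v are dask arrays), so the %time above mostly
# measures building the task graph; call e.g. s.compute() to actually run it on the cluster.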