Dask Example

Dask enables scalable analytics in Python.


Prepare

Conda environment

Create a conda environment with a Python version of your liking, then activate it:

module purge
module load anaconda3/2020.07
conda create -p $(pwd)/cenv python=3.7
conda activate <path_to_env>

Install mpi4py

Dask relies on mpi4py to distribute jobs across many nodes.

This Python package must be compiled (so don't use 'conda install'; use 'pip install'), and only after mpicc has been made available by loading an MPI module.

# Load MPI
module load openmpi/intel/4.1.1
# Compile mpi4py
pip install mpi4py
# You can also compile dask
pip install dask
pip install dask distributed --upgrade
pip install dask_mpi
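
To confirm that mpi4py was compiled against the loaded MPI, you can run a minimal check under mpiexec. This is just a sanity-check sketch; the file name 'mpi_check.py' is illustrative, not part of the example.

# mpi_check.py -- verify that mpi4py sees all ranks
# (run with: mpiexec -n 2 python mpi_check.py)
from mpi4py import MPI

comm = MPI.COMM_WORLD
print("rank %d of %d on %s"
      % (comm.Get_rank(), comm.Get_size(), MPI.Get_processor_name()))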

Install conda packages

Conda packages come precompiled, so they are fast to install. They are also built against MKL libraries, so they are efficient.

conda install numpy pandas scikit-learn
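
If you want to verify that the conda-installed numpy is indeed linked against MKL, a quick check (just a sketch; not required for the workflow):

# Print numpy's BLAS/LAPACK build configuration; look for 'mkl' entries.
import numpy
numpy.show_config()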

Run using sbatch

SVM example

file 'send'

Please change '<path_to_cenv>' to the actual path of your conda environment.

Submit the job with 'sbatch send'.

Note: below we request 2*3 = 6 tasks. However, only 4 of them (2 fewer) become Dask workers: dask-mpi runs the scheduler on rank 0 and the client script on rank 1, and the remaining ranks become workers.

#!/bin/bash
#SBATCH --job-name=dask_joblib
#SBATCH --nodes=2
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=3
#SBATCH --mem=8GB
#SBATCH --time=1:00:00

path_to_cenv="<path_to_cenv>"

module purge
module load openmpi/intel/4.1.1

# \$PATH and \$SLURM_CPUS_PER_TASK are escaped so they expand inside the
# inner shell, after its module loads, rather than at submission time.
mpiexec --mca mpi_warn_on_fork 0 bash -c "module purge;
                                          module load openmpi/intel/4.1.1;
                                          module load anaconda3/2020.07;
                                          source /share/apps/anaconda3/2020.07/etc/profile.d/conda.sh;
                                          conda activate $path_to_cenv;
                                          export PATH=$path_to_cenv/bin:\$PATH;
                                          export OMP_NUM_THREADS=\$SLURM_CPUS_PER_TASK;
                                          python -u mpi_grid_search.py"
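
As a sanity check of the worker layout described in the note above, the client script can wait until the expected workers have connected before starting work. A minimal sketch (the count 4 matches this particular job shape of 6 ranks):

import os
from dask_mpi import initialize
initialize(interface='ib0', local_directory=os.getenv('TMPDIR'))

from dask.distributed import Client

client = Client()            # connects to the scheduler started on rank 0
client.wait_for_workers(4)   # 6 MPI ranks minus scheduler and client = 4 workers
print(client.scheduler_info()['workers'])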

file mpi_grid_search.py

A scikit-learn example, based on https://scikit-learn.org/stable/auto_examples/model_selection/plot_grid_search_digits.html

Please notice interface='ib0', which specifies using InfiniBand (the fast interconnect between nodes) instead of the default (Ethernet). A sketch for listing a node's interfaces follows the script below.

## ACTIVATE DASK
import os
from dask_mpi import initialize
initialize(interface='ib0', local_directory=os.getenv('TMPDIR'))
from dask.distributed import Client, progress

if __name__ == '__main__':
    client = Client()

    # https://scikit-learn.org/stable/auto_examples/model_selection/plot_grid_search_digits.html
    import joblib
    from sklearn import datasets
    from sklearn.model_selection import train_test_split
    from sklearn.model_selection import GridSearchCV
    from sklearn.metrics import classification_report
    from sklearn.svm import SVC

    print(__doc__)

    # Loading the Digits dataset
    digits = datasets.load_digits()

    # To apply a classifier on this data, we need to flatten the images,
    # to turn the data into a (samples, features) matrix:
    n_samples = len(digits.images)
    X = digits.images.reshape((n_samples, -1))
    y = digits.target

    # Split the dataset in two equal parts
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.5, random_state=0)

    # Set the parameters by cross-validation
    tuned_parameters = [{'kernel': ['rbf'], 'gamma': [1e-3, 1e-4],
                         'C': [1, 10, 100, 1000]},
                        {'kernel': ['linear'], 'C': [1, 10, 100, 1000]}]

    scores = ['precision', 'recall']

    for score in scores:
        print("# Tuning hyper-parameters for %s" % score)
        print()

        clf = GridSearchCV(
            SVC(), tuned_parameters, scoring='%s_macro' % score, n_jobs=-1
        )
        # Dispatch the cross-validation fits to the Dask workers via
        # joblib's 'dask' backend; without this the search runs locally.
        with joblib.parallel_backend('dask'):
            clf.fit(X_train, y_train)

        print("Best parameters set found on development set:")
        print()
        print(clf.best_params_)
        print()
        print("Grid scores on development set:")
        print()
        means = clf.cv_results_['mean_test_score']
        stds = clf.cv_results_['std_test_score']
        for mean, std, params in zip(means, stds, clf.cv_results_['params']):
            print("%0.3f (+/-%0.03f) for %r"
                  % (mean, std * 2, params))
        print()

        print("Detailed classification report:")
        print()
        print("The model is trained on the full development set.")
        print("The scores are computed on the full evaluation set.")
        print()
        y_true, y_pred = y_test, clf.predict(X_test)
        print(classification_report(y_true, y_pred))
        print()

    # Note: the problem is too easy; the hyperparameter plateau is too flat and
    # the resulting model is the same for precision and recall, with ties in quality.
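
Regarding interface='ib0' above: to see which network interfaces a compute node actually exposes (and confirm that 'ib0' is among them), a stdlib-only sketch, assuming a Linux node:

import socket

# List the node's (index, name) network interface pairs;
# HPC nodes typically show entries such as 'lo', 'eth0' and 'ib0'.
for index, name in socket.if_nameindex():
    print(index, name)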

Dask jobqueue

This note is meant to save you time.

There is a way to run interactive multi-worker jobs, Dask-jobqueue (https://jobqueue.dask.org/en/latest/), but it is not supported on our cluster. It relies on Slurm's '--ntasks' parameter, which is not allowed on the NYU HPC cluster.