Dask Example
Dask enables scalable analytics in Python.
More details here
Prepare
Conda environment
Create a conda environment with the Python version of your choice, then activate it.
    module purge
    module load anaconda3/2020.07
    conda create -p $(pwd)/cenv python=3.7
    conda activate <path_to_env>
Install mpi4py
Dask relies on mpi4py to distribute jobs across many nodes.
This Python package must be compiled, so install it with 'pip install' rather than 'conda install', and only after mpicc has been made available (more info).
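Once the MPI module is loaded, and before running 'pip install mpi4py', a quick check like the following can confirm that mpicc is actually visible. This is only a minimal sketch: the helper name find_mpi_compiler is hypothetical, and it uses nothing beyond the standard library.

```python
import shutil


def find_mpi_compiler(candidates=("mpicc",)):
    """Return the full path of the first compiler wrapper found on PATH, or None."""
    for name in candidates:
        path = shutil.which(name)
        if path is not None:
            return path
    return None


compiler = find_mpi_compiler()
if compiler is None:
    print("mpicc not found on PATH: load an MPI module before 'pip install mpi4py'")
else:
    print("mpicc found at", compiler)
```

If the check reports that mpicc is missing, pip has no MPI compiler wrapper to build mpi4py with, so load the MPI module first.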
    # Load MPI
    module load openmpi/intel/4.1.1
    # Compile mpi4py
    pip install mpi4py
    # Install Dask with pip as well
    pip install dask
    pip install dask distributed --upgrade
    pip install dask_mpi
Install conda packages
Conda packages come precompiled and thus are fast to install. They use MKL libraries and thus are efficient.
    conda install numpy pandas scikit-learn
Run using sbatch
SVM example
file 'send'
Replace '<path_to_cenv>' with the actual path to your conda environment.
Submit the job with 'sbatch send'.
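Note: below we request 2*3=6 tasks (2 nodes with 3 tasks each, 1 CPU per task). Only 4 of them become Dask workers, because dask_mpi uses one MPI rank for the scheduler and one for the client script.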
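The arithmetic behind this note can be sketched as follows. The helper dask_mpi_layout is hypothetical, and it assumes the usual dask_mpi behaviour where rank 0 runs the scheduler, rank 1 runs the client script, and every remaining rank becomes a worker.

```python
def dask_mpi_layout(nodes: int, ntasks_per_node: int) -> dict:
    """Split the MPI ranks of a job into scheduler, client and workers.

    Assumption: dask_mpi.initialize() reserves one rank for the scheduler
    and one rank for the client script; all other ranks become workers.
    """
    total_ranks = nodes * ntasks_per_node
    reserved = 2  # scheduler + client
    if total_ranks <= reserved:
        raise ValueError("need more than 2 MPI ranks to get at least one worker")
    return {
        "total_ranks": total_ranks,
        "scheduler": 1,
        "client": 1,
        "workers": total_ranks - reserved,
    }


# Values taken from the sbatch script: --nodes=2, --ntasks-per-node=3
layout = dask_mpi_layout(nodes=2, ntasks_per_node=3)
print(layout)
```

To get more workers, raise --nodes or --ntasks-per-node; the two reserved ranks stay constant.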
    #!/bin/bash
    #SBATCH --job-name=dask_joblib
    #SBATCH --nodes=2
    #SBATCH --cpus-per-task=1
    #SBATCH --ntasks-per-node=3
    #SBATCH --mem=8GB
    #SBATCH --time=1:00:00
    path_to_cenv="<path_to_cenv>"
    module purge
    module load openmpi/intel/4.1.1
    # Every MPI rank sets up its own environment before starting Python
    mpiexec --mca mpi_warn_on_fork 0 bash -c "module purge; \
        module load openmpi/intel/4.1.1; \
        module load anaconda3/2020.07; \
        source /share/apps/anaconda3/2020.07/etc/profile.d/conda.sh; \
        conda activate \"$path_to_cenv\"; \
        export PATH=\"$path_to_cenv/bin:$PATH\"; \
        export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK; \
        python -u mpi_gird_search.py"
file mpi_gird_search.py
scikit-learn example based on https://scikit-learn.org/stable/auto_examples/model_selection/plot_grid_search_digits.html
Note the argument interface='ib0': it tells Dask to use InfiniBand (the fast connection between nodes) instead of the default (Ethernet).
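If you are unsure whether 'ib0' exists on the machine you are testing on, a small standard-library check can help. This is a sketch only: pick_interface is a hypothetical helper, and socket.if_nameindex() is assumed to be available, which is the case on Linux.

```python
import socket


def pick_interface(preferred="ib0", available=None):
    """Return `preferred` if it is among the available interface names, else None."""
    if available is None:
        # socket.if_nameindex() returns (index, name) pairs for this host
        available = [name for _, name in socket.if_nameindex()]
    return preferred if preferred in available else None


# Prints 'ib0' on a node with that interface, otherwise None
print(pick_interface())
```

The returned value could then be passed as the interface argument of initialize; None leaves the choice of interface to Dask.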
    ## ACTIVATE DASK
    import os
    from dask_mpi import initialize
    initialize(interface='ib0', local_directory=os.getenv('TMPDIR'))
    from dask.distributed import Client
    import joblib
    if __name__ == '__main__':
        client = Client()
        # https://scikit-learn.org/stable/auto_examples/model_selection/plot_grid_search_digits.html
        from sklearn import datasets
        from sklearn.model_selection import train_test_split
        from sklearn.model_selection import GridSearchCV
        from sklearn.metrics import classification_report
        from sklearn.svm import SVC
        # Loading the Digits dataset
        digits = datasets.load_digits()
        # To apply a classifier on this data, we need to flatten the image, to
        # turn the data in a (samples, feature) matrix:
        n_samples = len(digits.images)
        X = digits.images.reshape((n_samples, -1))
        y = digits.target
        # Split the dataset in two equal parts
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.5, random_state=0)
        # Set the parameters by cross-validation
        tuned_parameters = [{'kernel': ['rbf'], 'gamma': [1e-3, 1e-4],
                             'C': [1, 10, 100, 1000]},
                            {'kernel': ['linear'], 'C': [1, 10, 100, 1000]}]
        scores = ['precision', 'recall']
        for score in scores:
            print("# Tuning hyper-parameters for %s" % score)
            print()
            clf = GridSearchCV(
                SVC(), tuned_parameters, scoring='%s_macro' % score
            )
            # Send the cross-validation fits to the Dask workers through joblib;
            # without this context manager they run only in the client process.
            with joblib.parallel_backend('dask'):
                clf.fit(X_train, y_train)
            print("Best parameters set found on development set:")
            print()
            print(clf.best_params_)
            print()
            print("Grid scores on development set:")
            print()
            means = clf.cv_results_['mean_test_score']
            stds = clf.cv_results_['std_test_score']
            for mean, std, params in zip(means, stds, clf.cv_results_['params']):
                print("%0.3f (+/-%0.03f) for %r" % (mean, std * 2, params))
            print()
            print("Detailed classification report:")
            print()
            print("The model is trained on the full development set.")
            print("The scores are computed on the full evaluation set.")
            print()
            y_true, y_pred = y_test, clf.predict(X_test)
            print(classification_report(y_true, y_pred))
            print()
        # Note the problem is too easy: the hyperparameter plateau is too flat and the
        # output model is the same for precision and recall with ties in quality.
Dask jobqueue
This note is meant to save you time.
Dask-Jobqueue (https://jobqueue.dask.org/en/latest/) offers a way to run interactive multi-worker jobs, but it is not supported on our cluster. The approach relies on Slurm's "--ntasks" parameter, which is not allowed on the NYU HPC cluster.