Kubernetes and Data Science

01/04/20

Creating scalable data science analysis using K8s

* Update as of 1/31. This project cost me $2.

What is Kubernetes?*

Kubernetes: Greek, meaning helmsman or pilot.

The Kubernetes website states "Kubernetes (K8s) is an open-source system for automating deployment, scaling, and management of containerized applications".

What does that mean? What are "containerized applications"?

To answer that, we will turn to Docker, which defines a container as "a standard unit of software that packages up code and all its dependencies so the application runs quickly and reliably from one computing environment to another".

* The best resource I found for learning about K8s is this blog post by Jeremy Jordan (ML Engineer, Proofpoint), which links to other useful resources. The best overview I found is this comic by Scott McCloud. The best tutorial is 'Kubernetes the Hard Way' by Kelsey Hightower, which has over 19,000 stars on GitHub.

What is Docker?

Docker's tagline on the website is: "Securely build, share and run any application, anywhere".

The use of containers to deploy applications is called containerization. Containers aim to be:

  • Flexible: Even the most complex applications can be containerized.
  • Lightweight: Containers leverage and share the host kernel, making them much more efficient in terms of system resources than virtual machines.
  • Portable: You can build locally, deploy to the cloud, and run anywhere.
  • Scalable: You can increase and automatically distribute container replicas across a datacenter.

By contrast, a virtual machine (VM) runs a full-blown “guest” operating system with virtual access to host resources through a hypervisor. In general, VMs incur a lot of overhead beyond what is being consumed by your application logic.

What is Helm?

Helm is the K8s package manager. Think of it as conda for Python. It "helps you define, install, and upgrade even the most complex Kubernetes application".

It is worth looking at the Helm Hub to see which data science tools have helm charts. For example, I see dask, hadoop, jupyterhub, luigi and spark.

Container vs. Virtual Machine (VM). Taken from https://www.docker.com/resources/what-container

Kubernetes for Data Science

K8s lets you deploy your data science environment to the cloud and scale it up massively. Here are some examples:

  • OpenAI describing how they scaled up to 2,500 nodes using D15v2 and NC24 VMs (these VMs have ~20 cores, so that's 20 * 2,500 = 50,000 cores!)
  • NVIDIA showing how you can log deep learning experiments using GPUs.

K8s is cloud agnostic so you can develop workflows and easily deploy to any cloud resource.

Tools used in this blog post

Google Cloud Platform

Google Cloud Platform (GCP) gives you $300 of free credit. Kubernetes was also developed at Google, which makes GCP one of the easiest cloud platforms to run it on, hence I am using it in this blog. However, Microsoft and Amazon offer Azure Kubernetes Service and Elastic Kubernetes Service, respectively.

Dask

Dask provides advanced parallelism for analytics. It also integrates well with Python's scientific stack (see below), so if you know numpy, pandas and scikit-learn it is easy to speed up your code using Dask; a short example follows the figure below.

Python's Scientific Stack (taken from http://matthewrocklin.com/dask-website/)
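For example (a minimal sketch using a toy DataFrame, not the taxi data used later in this post), the dask.dataframe API mirrors pandas, so familiar operations parallelize with almost no code changes:

import pandas as pd
import dask.dataframe as dd

# A small pandas DataFrame standing in for a much larger dataset.
pdf = pd.DataFrame({"passenger_count": [1, 2, 1, 3],
                    "tip_amount": [0.0, 2.5, 1.0, 0.0]})

# Split it into partitions that Dask can process in parallel.
ddf = dd.from_pandas(pdf, npartitions=2)

# The same groupby/mean you would write in pandas; .compute() triggers the work.
print(ddf.groupby("passenger_count")["tip_amount"].mean().compute())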

Launch a Dask cluster and a Jupyter notebook server on cloud resources

1. Create a K8s Cluster

We will use the documentation in the zero-to-jupyterhub project.

  1. Log into GCP
  2. Create a project here and ensure billing is on.
  3. Install the Cloud SDK (the command-line interface for Google Cloud Platform products and services) by running (on my Linux machine):
$ echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
$ sudo apt-get install apt-transport-https ca-certificates gnupg
$ curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -
$ sudo apt-get update && sudo apt-get install google-cloud-sdk

4. Install kubectl (command line interface for running commands against Kubernetes clusters):

$ sudo apt-get install kubectl

5. Update gcloud (https://cloud.google.com/kubernetes-engine/docs/how-to/managing-clusters) by doing:

$ gcloud components update

6. Login by running:

$ gcloud auth login

7. Connect to a project:

$ gcloud config set project PROJECT_ID

8. Enable the Kubernetes engine on the project in the online project page.

9. Create a managed Kubernetes cluster and a default node pool (nodes represent the hardware; a node pool tracks how many machines of a given type you want, and all nodes in a node pool within a cluster share the same configuration):

# Flag reference: https://cloud.google.com/sdk/gcloud/reference/container/clusters/create
#   --machine-type: https://cloud.google.com/compute/docs/machine-types (n1-standard-2 = 2 CPUs, 7.5 GB RAM)
#   --num-nodes: number of nodes to be created in each of the cluster's zones
#   --zone: https://cloud.google.com/compute/docs/regions-zones/
#   k8scluster: the name of the cluster
$ gcloud container clusters create k8scluster \
    --machine-type n1-standard-2 \
    --num-nodes 2 \
    --zone us-central1-b \
    --cluster-version latest

10. Test if your cluster is initialized:

$ kubectl get node

2. Install Helm

https://helm.sh/docs/intro/install/#from-script

$ curl https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 | bash

3. Install the Dask Helm chart

https://helm.dask.org/ (https://github.com/dask/helm-chart)

$ helm repo add dask https://helm.dask.org
$ helm repo update

The Helm deployment launches a dask-scheduler, several dask-worker processes, and a Jupyter server (https://docs.dask.org/en/latest/setup/kubernetes-helm.html#helm-install-dask).

If you want to get up and running quickly you can simply run $ helm install my-release dask/dask. However, I'm going to edit the default values of the chart to include additional Python packages and to add more workers (you can export the defaults with $ helm show values dask/dask > values.yaml and edit that file). It is also easy to edit the chart to add parallelized GPU support via dask-cuda, but that is outside the scope of this blog.

I'll be adding gcsfs and python-snappy as I'll be reading a file from Google Cloud Storage, and pyarrow and fastparquet as I'll be reading a Parquet file. I'll also add dask-ml as I'll be doing machine learning, and matplotlib for plotting.

The env: section of the chart looks like the snippet below. Remember to add the packages to the jupyter environment as well as the worker environment:

env:
  - name: EXTRA_CONDA_PACKAGES
    value: "gcsfs python-snappy pyarrow fastparquet dask-ml matplotlib -c conda-forge"

I'm going to request 10 workers, each with 1 CPU and 3.75 GB RAM, giving me ~10 cores and ~37 GB of memory. It is not uncommon to ask for 1,000 workers, although your account's CPU quotas will determine how far you can actually scale.

Change the value of replicas: under worker: to replicas: 10, then install the chart by running:

$ helm install my-release -f values.yaml dask/dask

Check status by running:

$ kubectl get pods
$ kubectl get services

Run the commands that are displayed to get the addresses of the Dask scheduler, the Dask dashboard and the Jupyter notebook:

$ export DASK_SCHEDULER=$(kubectl get svc --namespace default my-release-scheduler -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
$ export DASK_SCHEDULER_UI_IP=$(kubectl get svc --namespace default my-release-scheduler -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
$ export DASK_SCHEDULER_PORT=8786
$ export DASK_SCHEDULER_UI_PORT=80

$ export JUPYTER_NOTEBOOK_IP=$(kubectl get svc --namespace default my-release-jupyter -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
$ export JUPYTER_NOTEBOOK_PORT=80

$ echo tcp://$DASK_SCHEDULER:$DASK_SCHEDULER_PORT               -- Dask Client connection
$ echo http://$DASK_SCHEDULER_UI_IP:$DASK_SCHEDULER_UI_PORT     -- Dask dashboard
$ echo http://$JUPYTER_NOTEBOOK_IP:$JUPYTER_NOTEBOOK_PORT       -- Jupyter notebook

Open the URL of Jupyter notebook (if the tab is empty wait a few minutes) and enter the password.

Open a Jupyter notebook.

Click on the Dask extension on the left and paste in the URL of the Dask dashboard. The tabs should turn orange. Click on 'TASK STREAM', then drag the window that pops up to the right. Click on 'PROGRESS', then drag it below the 'TASK STREAM' window.

4. Data science

The code below is taken from Tom Augspurger's PyData NYC talk (code).

Import packages and set up the Dask client:

import gcsfs
from dask.distributed import Client
import dask.dataframe as dd
import sklearn.preprocessing
import sklearn.ensemble
import sklearn.pipeline
import joblib

# Inside the Jupyter pod launched by the Helm chart, Client() should pick up the
# scheduler address from the DASK_SCHEDULER_ADDRESS environment variable.
client = Client()
client

Read in the parquet file (the NYC taxi dataset):

df = dd.read_parquet("gs://dask-nyc-taxi/yellowtrip.parquet",
                     engine="fastparquet",
                     storage_options={"token": "anon"})
df

Read in a subset of the data:

sdf = df.partitions[0].compute()
rides = sdf.head(200_000)

4.1 Parallelize scikit-learn across multiple machines

Create a classification dataset of whether the passenger tipped or not:

features = [
    'passenger_count', 'trip_distance',
    'pickup_longitude', 'pickup_latitude',
    'dropoff_longitude', 'dropoff_latitude',
    'fare_amount', 'tolls_amount', 'total_amount',
]

X = rides[features]
y = (rides['tip_amount'] > 0).astype(int)

Standardize the data and use a random forest classifier with 1,000 trees:

pipeline = sklearn.pipeline.make_pipeline(
    sklearn.preprocessing.StandardScaler(),
    sklearn.ensemble.RandomForestClassifier(
        n_estimators=1000, n_jobs=-1
    )
)

Use the Dask backend to joblib (see video at end), which will parallelize the computation across multiple machines:

with joblib.parallel_backend("dask"):
    pipeline.fit(X, y)
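As a quick sanity check (not part of the original snippet), you can evaluate the fitted pipeline under the same Dask joblib backend:

with joblib.parallel_backend("dask"):
    print(pipeline.score(X, y))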

4.2 Parallelize search over specified parameter values for an estimator

Use the dask implementation of GridSearchCV:

import dask_ml.model_selection
import numpy as np
import pandas as pd
import sklearn.linear_model

pipe = sklearn.pipeline.make_pipeline(
    sklearn.preprocessing.StandardScaler(), sklearn.linear_model.LogisticRegression()
)

param_grid = dict(
    logisticregression__C=np.linspace(0.001, 10, 50),
    logisticregression__fit_intercept=[True, False],
    logisticregression__tol=[0.0001, 0.001],
)

search = dask_ml.model_selection.GridSearchCV(pipe, param_grid, cv=3)

search.fit(X, y)

Find the best score:

search.best_score_

Return all the fits:

cv_results = pd.DataFrame(search.cv_results_)
cv_results.sort_values(by='mean_test_score', ascending=False)

4.3 Find the best hyper-parameters using Hyperband

Hyperband is a relatively new technique for finding the best hyper-parameters of a model using adaptive resource allocation and early stopping [1]. It stops training estimators that perform poorly, so the budget is spent on promising candidates. It is implemented in dask-ml as HyperbandSearchCV [2]; a rough sketch of the underlying idea is shown below.
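To build some intuition, here is a minimal, self-contained sketch of the successive-halving idea at the core of Hyperband. It is purely illustrative: the candidates, the partial_score stand-in and the halving schedule are made up for this example, and dask-ml's actual implementation is more sophisticated and runs in parallel on the cluster.

import numpy as np

rng = np.random.default_rng(0)

# Hypothetical candidate hyper-parameter values (e.g. learning rates).
candidates = {f"config_{i}": rng.uniform(1e-4, 1e-1) for i in range(16)}

def partial_score(param, n_iter):
    # Stand-in for partially training a model for n_iter steps and
    # returning its validation score (entirely made up for illustration).
    return 1.0 - abs(param - 0.01) + 0.01 * np.log(n_iter)

budget = 1
while len(candidates) > 1:
    # Train every surviving configuration a little longer...
    scores = {name: partial_score(p, budget) for name, p in candidates.items()}
    # ...then keep only the best half and double the budget for the next round.
    keep = sorted(scores, key=scores.get, reverse=True)[: len(scores) // 2]
    candidates = {name: candidates[name] for name in keep}
    budget *= 2

print("surviving configuration:", candidates)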

Create a small dataset of shape (10,000, 2):

from sklearn.datasets import make_circles
%matplotlib inline

X, y = make_circles(n_samples=10_000, random_state=0, noise=0.09)

Plot it:

import pandas as pd

pd.DataFrame({0: X[:, 0], 1: X[:, 1], "class": y}).sample(4_000).plot.scatter(
    x=0, y=1, alpha=0.2, c="class", cmap="bwr"
);

Scale it:

X = sklearn.preprocessing.StandardScaler().fit_transform(X)

Use dask.delayed to create a larger dataset with more samples and more features (shape (40,000,000, 62), roughly 20 GB):

import dask
import dask.array as da
from distributed import wait
import numpy as np
from sklearn.utils import check_random_state

@dask.delayed(nout=2)
def clone(X, y, seed=None, noise=0.9, n_random=60, n_repeats=4):
    random_state = check_random_state(seed)
    
    # Add some noise
    X = X + random_state.normal(scale=noise, size=X.shape)
    
    # Add random features
    random_feats = random_state.normal(0, 1, size=(X.shape[0], n_random))
    X = np.hstack((X, random_feats))
    
    # Replicate multiple times
    X = np.repeat(X, n_repeats, axis=0)
    y = np.repeat(y, n_repeats, axis=0)
    
    return X, y

n_random = 60
n_repeats = 4
shape = n_repeats * X.shape[0], X.shape[1] + n_random
dX = dask.delayed(X)
dy = dask.delayed(y)

Xs, ys = zip(*[clone(dX, dy, seed=i,
                    n_random=n_random,
                    n_repeats=n_repeats)
               for i in range(1000)])

Xs = [da.from_delayed(x, dtype=X.dtype, shape=shape) for x in Xs]
ys = [da.from_delayed(x, dtype=y.dtype, shape=(shape[0],)) for x in ys]

X_big = da.concatenate(Xs)
y_big = da.concatenate(ys)

X, y = client.persist([X_big, y_big])
wait(X);

Set up the model and the hyper-parameters to test:

from sklearn.neural_network import MLPClassifier

model = MLPClassifier()
params = {
    "hidden_layer_sizes": [
        (24, ),
        (12, 12),
        (6, 6, 6, 6),
        (4, 4, 4, 4, 4, 4),
        (12, 6, 3, 3),
    ],
    "activation": ["relu", "logistic", "tanh"],
    "alpha": np.logspace(-6, -3, num=1000),
    "batch_size": [16, 32, 64, 128, 256, 512],
} 

Input it to HyperbandSearchCV:

import dask_ml.model_selection

search = dask_ml.model_selection.HyperbandSearchCV(
    model,
    params,
    max_iter=15,
    patience=True,
)

search.fit(X, y, classes=[0, 1])

Find the best parameters:

search.best_params_

Return all results:

pd.DataFrame(search.cv_results_).sort_values("rank_test_score") 

5. Delete the cluster

https://cloud.google.com/kubernetes-engine/docs/how-to/deleting-a-cluster

$ gcloud container clusters delete k8scluster --zone us-central1-b

Future work

Another way to deploy Dask on Kubernetes is to use the Kubernetes-native approach via dask-kubernetes. This has the advantage of allowing the creation of a KubeCluster, which can launch or kill workers via adaptive scaling. The process is more hands-on than using the Helm chart, as it involves writing a Dockerfile. There is an open issue in dask-kubernetes about attaching a KubeCluster to an existing running cluster: https://github.com/dask/dask-kubernetes/issues/185. A minimal sketch of the KubeCluster approach is shown below.
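For reference, here is a minimal sketch of what that could look like, assuming dask-kubernetes is installed and a pod specification has been written (the worker-spec.yml file name and the scaling limits are illustrative):

from dask.distributed import Client
from dask_kubernetes import KubeCluster

# Launch Dask workers as Kubernetes pods from a pod spec (illustrative file name).
cluster = KubeCluster.from_yaml("worker-spec.yml")

# Adaptive scaling: workers are created or killed as the workload demands.
cluster.adapt(minimum=0, maximum=10)

client = Client(cluster)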