Creating scalable data science analysis using K8s
* Update as of 1/31. This project cost me $2.
Kubernetes: Greek, meaning helmsman or pilot.
The Kubernetes website states "Kubernetes (K8s) is an open-source system for automating deployment, scaling, and management of containerized applications".
What does that mean? What are "containerized applications"?
To answer that, we turn to Docker, which defines a container as "a standard unit of software that packages up code and all its dependencies so the application runs quickly and reliably from one computing environment to another".
* The best resource I found for learning about K8s is this blog post by Jeremy Jordan (ML Engineer, Proofpoint), which links to other useful resources. The best overview I found is this comic by Scott McCloud. The best tutorial is 'K8s the hard way' by Kelsey Hightower, which has over 19,000 stars on GitHub.
Docker's tagline on the website is: "Securely build, share and run any application, anywhere".
The use of containers to deploy applications is called containerization. Containers aim to be lightweight, portable, and secure.
By contrast, a virtual machine (VM) runs a full-blown “guest” operating system with virtual access to host resources through a hypervisor. In general, VMs incur a lot of overhead beyond what is being consumed by your application logic.
Helm is the K8s package manager. Think of it as conda for Python. It "helps you define, install, and upgrade even the most complex Kubernetes application".
It is worth looking at the Helm Hub to see which data science tools have helm charts. For example, I see dask, hadoop, jupyterhub, luigi and spark.
K8s lets you deploy your data science environment to the cloud and scale it out. Here are some examples:
K8s is cloud agnostic so you can develop workflows and easily deploy to any cloud resource.
Google Cloud Platform (GCP) gives you $300 of free credit. K8s was also developed at Google, so GCP is one of the easiest cloud platforms to run it on, hence I am using it in this blog. However, Microsoft and Amazon have Azure Kubernetes Service and Elastic Kubernetes Service, respectively.
Dask provides advanced parallelism for analytics. It also integrates well with Python's Scientific Stack (see below), so if you know numpy, pandas and scikit-learn it is easy to speed up your code using dask.
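To make that concrete, here is a minimal sketch (not from the original post; the glob path and column names are hypothetical placeholders) showing that dask.dataframe mirrors the pandas API, so familiar groupby code runs in parallel across partitions:

import dask.dataframe as dd

# Lazily read many CSV files into one partitioned dataframe
# (the path and columns below are hypothetical placeholders).
df = dd.read_csv("data/taxi-*.csv")

# The familiar pandas-style groupby only builds a lazy task graph...
mean_fare = df.groupby("passenger_count")["fare_amount"].mean()

# ...which is executed in parallel across the partitions when .compute() is called.
print(mean_fare.compute())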
We will follow the setup documentation in the zero-to-jupyterhub project to create a Kubernetes cluster on Google Cloud.
$ echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list$ sudo apt-get install apt-transport-https ca-certificates gnupg$ curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -$ sudo apt-get update && sudo apt-get install google-cloud-sdk4. Install kubectl (command line interface for running commands against Kubernetes clusters):
$ sudo apt-get install kubectl
5. Update gcloud (https://cloud.google.com/kubernetes-engine/docs/how-to/managing-clusters) by doing:
$ gcloud components update
6. Login by running:
$ gcloud auth login
7. Connect to a project:
$ gcloud config set project PROJECT_ID
8. Enable the Kubernetes Engine API for the project in the online project page.
9. Create a managed Kubernetes cluster and a default node pool (nodes represent hardware, and a node pool keeps track of how much of a certain type of hardware you would like; the nodes in a node pool within a cluster all have the same configuration):
# Reference: https://cloud.google.com/sdk/gcloud/reference/container/clusters/create
# --machine-type: https://cloud.google.com/compute/docs/machine-types (n1-standard-2 = 2 CPUs, 7.5 GB RAM)
# --num-nodes: number of nodes to be created in each of the cluster's zones
# --zone: https://cloud.google.com/compute/docs/regions-zones/
# k8scluster: name of the cluster
$ gcloud container clusters create \
    --machine-type n1-standard-2 \
    --num-nodes 2 \
    --zone us-central1-b \
    --cluster-version latest \
    k8scluster
10. Test if your cluster is initialized:
$ kubectl get node
Install Helm (https://helm.sh/docs/intro/install/#from-script):
$ curl https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 | bash
Add the Dask Helm chart repository (https://helm.dask.org/, https://github.com/dask/helm-chart):
$ helm repo add dask https://helm.dask.org
$ helm repo update
The Helm deployment launches a dask-scheduler, several dask-worker processes, and a Jupyter server (https://docs.dask.org/en/latest/setup/kubernetes-helm.html#helm-install-dask).
If you want to get up and running quickly you can simply run $ helm install my-release dask/dask. However, I'm going to edit the default values of the chart to include additional Python packages and add more workers. It is also easy to edit the chart to add parallelized GPU support via dask-cuda, but that is outside the scope of this blog.
I'll be adding gcsfs and python-snappy as I'll be reading a file from Google Cloud Storage. I'll also add pyarrow and fastparquet as I'll be reading a parquet file. I will add dask-ml as I'll be doing machine learning. Lastly, I'll add matplotlib for plotting.
The env: section of the chart looks like the following. Remember to add the packages to the Jupyter environment as well as the worker environment:
env:
  - name: EXTRA_CONDA_PACKAGES
    value: "gcsfs python-snappy pyarrow fastparquet dask-ml matplotlib -c conda-forge"
I'm going to request 10 workers, each with 1 CPU and 3.75 GB RAM, giving me ~10 cores and ~37.5 GB of memory. It is not uncommon to ask for 1,000 workers and the free tier account will allow you to do so.
Change the value of replicas: under worker: to replicas: 10.
Then install this customized chart by running:
$ helm install my-release -f values.yaml dask/dask
Check status by running:
$ kubectl get pods
$ kubectl get services
Run the commands that are displayed to get the IP address of the Jupyter notebook:
$ export DASK_SCHEDULER=$(kubectl get svc --namespace default my-release-scheduler -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
$ export DASK_SCHEDULER_UI_IP=$(kubectl get svc --namespace default my-release-scheduler -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
$ export DASK_SCHEDULER_PORT=8786
$ export DASK_SCHEDULER_UI_PORT=80
$ export JUPYTER_NOTEBOOK_IP=$(kubectl get svc --namespace default my-release-jupyter -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
$ export JUPYTER_NOTEBOOK_PORT=80
$ echo tcp://$DASK_SCHEDULER:$DASK_SCHEDULER_PORT -- Dask Client connection
$ echo http://$DASK_SCHEDULER_UI_IP:$DASK_SCHEDULER_UI_PORT -- Dask dashboard
$ echo http://$JUPYTER_NOTEBOOK_IP:$JUPYTER_NOTEBOOK_PORT -- Jupyter notebook
Open the URL of the Jupyter notebook (if the tab is empty, wait a few minutes) and enter the password.
Open a Jupyter notebook.
Click on the Dask extension on the left and paste in the URL of the Dask dashboard (printed above). The tabs should turn orange. Click on 'TASK STREAM', then drag the window that pops up to the right. Click on 'PROGRESS', then drag it below the 'TASK STREAM' window.
The code below is taken from Tom Augspurger's PyData NYC talk (code).
Import packages and set up the client:
import gcsfs
from dask.distributed import Client
import dask.dataframe as dd
import sklearn.preprocessing
import sklearn.ensemble
import sklearn.pipeline
import joblib

client = Client()
client
Read in the parquet file (the NYC taxi dataset):
df = dd.read_parquet(
    "gs://dask-nyc-taxi/yellowtrip.parquet",
    engine="fastparquet",
    storage_options={"token": "anon"},
)
df
Read in a subset of the data:
sdf = df.partitions[0].compute()
rides = sdf.head(200_000)
Create a classification dataset of whether the passenger tipped or not:
features = [
    'passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude',
    'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'tolls_amount', 'total_amount',
]
X = rides[features]
y = (rides['tip_amount'] > 0).astype(int)
Standardize the data and use a random forest classifier with 1,000 trees:
pipeline = sklearn.pipeline.make_pipeline(
    sklearn.preprocessing.StandardScaler(),
    sklearn.ensemble.RandomForestClassifier(n_estimators=1000, n_jobs=-1),
)
Use the dask back-end to joblib (see video at end) which will parallelize the computation across multiple machines:
with joblib.parallel_backend("dask"):
    pipeline.fit(X, y)
Use the dask implementation of GridSearchCV:
import dask_ml.model_selection
import numpy as np
import pandas as pd
import sklearn.linear_model

pipe = sklearn.pipeline.make_pipeline(
    sklearn.preprocessing.StandardScaler(),
    sklearn.linear_model.LogisticRegression(),
)
param_grid = dict(
    logisticregression__C=np.linspace(0.001, 10, 50),
    logisticregression__fit_intercept=[True, False],
    logisticregression__tol=[0.0001, 0.001],
)
search = dask_ml.model_selection.GridSearchCV(pipe, param_grid, cv=3)
search.fit(X, y)
Find the best score:
search.best_score_
Return all the fits:
cv_results = pd.DataFrame(search.cv_results_)
cv_results.sort_values(by='mean_test_score', ascending=False)
Hyperband is a relatively new technique for finding the best hyper-parameters of a model using adaptive resource allocation and early stopping [1]. It will stop training estimators that perform poorly. It has been implemented in dask-ml as HyperbandSearchCV [2].
Create a small dataset of shape (10,000, 2):
from sklearn.datasets import make_circles
%matplotlib inline

X, y = make_circles(n_samples=10_000, random_state=0, noise=0.09)
Plot it:
import pandas as pd

pd.DataFrame({0: X[:, 0], 1: X[:, 1], "class": y}).sample(4_000).plot.scatter(
    x=0, y=1, alpha=0.2, c="class", cmap="bwr"
);
Scale it:
X = sklearn.preprocessing.StandardScaler().fit_transform(X)
Use dask.delayed to create a larger dataset with more samples and more features (shape (40,000,000, 62): ~20 GB):
import dask
import dask.array as da
from distributed import wait
import numpy as np
from sklearn.utils import check_random_state

@dask.delayed(nout=2)
def clone(X, y, seed=None, noise=0.9, n_random=60, n_repeats=4):
    random_state = check_random_state(seed)
    # Add some noise
    X = X + random_state.normal(scale=noise, size=X.shape)
    # Add random features
    random_feats = random_state.normal(0, 1, size=(X.shape[0], n_random))
    X = np.hstack((X, random_feats))
    # Replicate multiple times
    X = np.repeat(X, n_repeats, axis=0)
    y = np.repeat(y, n_repeats, axis=0)
    return X, y

n_random = 60
n_repeats = 4
shape = n_repeats * X.shape[0], X.shape[1] + n_random
dX = dask.delayed(X)
dy = dask.delayed(y)
Xs, ys = zip(*[clone(dX, dy, seed=i, n_random=n_random, n_repeats=n_repeats) for i in range(1000)])
Xs = [da.from_delayed(x, dtype=X.dtype, shape=shape) for x in Xs]
ys = [da.from_delayed(x, dtype=y.dtype, shape=(shape[0],)) for x in ys]
X_big = da.concatenate(Xs)
y_big = da.concatenate(ys)
X, y = client.persist([X_big, y_big])
wait(X);
Set up the model and the hyper-parameters to test:
from sklearn.neural_network import MLPClassifier

model = MLPClassifier()
params = {
    "hidden_layer_sizes": [
        (24,),
        (12, 12),
        (6, 6, 6, 6),
        (4, 4, 4, 4, 4, 4),
        (12, 6, 3, 3),
    ],
    "activation": ["relu", "logistic", "tanh"],
    "alpha": np.logspace(-6, -3, num=1000),
    "batch_size": [16, 32, 64, 128, 256, 512],
}
Input it to HyperbandSearchCV:
import dask_ml.model_selection

search = dask_ml.model_selection.HyperbandSearchCV(
    model,
    params,
    max_iter=15,
    patience=True,
)
search.fit(X, y, classes=[0, 1])
Find the best parameters:
search.best_params_
Return all results:
pd.DataFrame(search.cv_results_).sort_values("rank_test_score")
Finally, delete the cluster when you are done (https://cloud.google.com/kubernetes-engine/docs/how-to/deleting-a-cluster):
$ gcloud container clusters delete k8scluster --zone us-central1-b
Another way to deploy Dask on Kubernetes is to use the Kubernetes-native approach via dask-kubernetes. This has the advantage of allowing the creation of a KubeCluster, which can launch or kill workers via adaptive scaling. The process is more hands-on than using the Helm chart as it involves writing a Dockerfile. There is an open issue in dask-kubernetes about attaching a KubeCluster to an existing running cluster: https://github.com/dask/dask-kubernetes/issues/185.
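As a rough sketch of that approach (assuming the classic dask-kubernetes API; worker-spec.yml is a hypothetical pod spec pointing at your custom worker image, not something defined in this post):

from dask.distributed import Client
from dask_kubernetes import KubeCluster

# worker-spec.yml is a hypothetical Kubernetes pod spec referencing the
# Docker image you built with the packages your workers need.
cluster = KubeCluster.from_yaml("worker-spec.yml")

# Adaptive scaling: launch or kill worker pods based on the current workload.
cluster.adapt(minimum=0, maximum=10)

# Connect a client so subsequent Dask computations run on the KubeCluster workers.
client = Client(cluster)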
[1] Li et al. (2016). Hyperband: A Novel Bandit-Based Approach to Hyperparameter Optimization.
[2] Sievert et al. (2019). Better and faster hyperparameter optimization with Dask.
https://www.jeremyjordan.me/kubernetes/
https://stackshare.io/pinterest/building-a-kubernetes-platform-at-pinterest
https://medium.com/better-programming/kubeflow-pipelines-with-gpus-1af6a74ec2a
https://towardsdatascience.com/tagged/kubernetes
https://github.com/kelseyhightower/kubernetes-the-hard-way
https://github.com/TomAugspurger/pydata-nyc-2019-scalable-ml
https://docs.dask.org/en/latest/setup/kubernetes-helm.html#configure-environment
https://binder.pangeo.io/v2/gh/pangeo-data/pangeo-example-notebooks/master
https://martindurant.github.io/blog/moving-to-google-compute-and-storage/