Dask

Cluster

import os

from dask.distributed import Client, LocalCluster

from dask.distributed import SSHCluster

from dask_cloudprovider.aws import FargateCluster, EC2Cluster, ECSCluster


cluster = LocalCluster(processes=True, local_directory="/tmp")

cluster = LocalCluster(local_directory="/tmp")

cluster = LocalCluster(n_workers=4, memory_limit="32GB", dashboard_address=":8787", local_directory="/tmp")

cluster = LocalCluster(n_workers=os.cpu_count(), threads_per_worker=1, local_directory="/tmp")


cluster = FargateCluster(cluster_arn=cluster_arn, skip_cleanup=True, vpc=vpc, security_groups=security_groups)
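If the ECS resources do not exist yet, dask-cloudprovider can create them for you; a hedged sketch with illustrative values:

cluster = FargateCluster(
    n_workers=4,
    image="daskdev/dask:latest",
    worker_cpu=1024,   # ECS CPU units (1024 = 1 vCPU)
    worker_mem=4096,   # MB
)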


# client = Client()

client = Client(cluster)

client


# processes=False -> one local process, workers run as threads (useful for debugging)

with LocalCluster(processes=False, local_directory="/tmp") as cluster:

    with Client(cluster) as client:

        print(client)

Array

Simple test

import dask.array as da


da.ones((1000, 1000, 1000)).mean().compute()
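Chunk sizes can also be set explicitly; a small sketch:

x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
(x + x.T).mean(axis=0).compute()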

Apply function

import numpy as np


def np_pi_calculator(vector_len: int = 1_000_000) -> int:
    # Monte Carlo sample: count random points that land inside the unit circle
    rand_x = np.random.uniform(-1, 1, vector_len)
    rand_y = np.random.uniform(-1, 1, vector_len)
    origin_dist = rand_x * rand_x + rand_y * rand_y
    circle_points = (origin_dist <= 1).sum()
    return circle_points
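A minimal sketch of fanning this out over the cluster with client.map and turning the counts into a pi estimate (assumes the client from the Cluster section; n_tasks is illustrative):

n_tasks, vector_len = 100, 1_000_000
futures = client.map(np_pi_calculator, [vector_len] * n_tasks, pure=False)
pi_estimate = 4 * sum(client.gather(futures)) / (n_tasks * vector_len)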



Dataframe

IO

Built-in time series dataset

import dask


df = dask.datasets.timeseries()

or

import dask.dataframe as dd


ddf = dd.demo.make_timeseries(

    start="2000-01-01",

    end="2000-01-02",

    dtypes={"x": float, "y": float, "id": int},

    freq="10ms",

    partition_freq="24h",

)

ddf.groupby("id")[["x", "y"]].mean().compute()

Convert a pandas.DataFrame to a dask.DataFrame

import pandas as pd


df = pd.DataFrame({'col1': ['a', 'b'], 'col2': [1, 2], 'col3': [1.5, 2.5]})

ddf = dd.from_pandas(df, npartitions=2)

Using pyarrow-dataset for row filtering

df = dd.read_parquet("file.parquet", engine="pyarrow-dataset", filters=[("col1", "==", 0)])

df = dd.read_parquet("file.parquet", engine="pyarrow-dataset", filters=[("col1", "in", tuple(arr))])

df = dd.read_parquet("file.parquet", engine="pyarrow-dataset", filters=[("col1", "in", tuple(arr)), ("col2", "in", tuple(arr2))])

From dask.delayed

ddf = dd.from_delayed([file_etl(file) for file in files])
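file_etl and files are placeholders; a hypothetical per-file loader could look like this:

import dask
import pandas as pd


@dask.delayed
def file_etl(path):
    # read one file and apply any per-file cleaning
    return pd.read_csv(path)


files = ["part-001.csv", "part-002.csv"]  # hypothetical paths
ddf = dd.from_delayed([file_etl(file) for file in files])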

To parquet

import pyarrow as pa


ddf.to_parquet("df.parquet", engine="pyarrow", overwrite=True)

ddf.to_parquet("df.parquet", engine="pyarrow", append=True, ignore_divisions=True)

ddf.to_parquet(

    "df.parquet",

    engine="pyarrow",

    schema={

        "col1": pa.string(),

        "col2": pa.int32(),

        "col3": pa.float32(),

        "col4": pa.timestamp("s", tz="UTC"),

    },

)

Query an SQL table in parallel

See SQLAlchemy for the uri format

dd.read_sql_table(table=TABLE, uri=uri, index_col='INT OR TIME', npartitions=N)
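A hedged example with a hypothetical PostgreSQL URI, partitioning on an integer primary key (recent Dask versions rename these arguments to table_name= and con=):

uri = "postgresql://user:password@host:5432/dbname"  # hypothetical
ddf = dd.read_sql_table(table="events", uri=uri, index_col="id", npartitions=8)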

Store in Azure blob

storage_options = {'account_name': BLOB_NAME, 'account_key': BLOB_KEY}

remote_folder = 'az://FOLDER/'

dd.to_parquet(ddf, path=remote_folder + 'FILE.parquet', engine='pyarrow', storage_options=storage_options)

Read from Azure blob

dd.read_parquet(path=remote_folder + 'FILE.parquet', columns=["COL"], engine='pyarrow', storage_options=storage_options)

Read from S3

dd.read_parquet("s3://BUCKET/file.parquet", storage_options={"key": AWS_KEY, "secret": AWS_SECRET, "token": AWS_TOKEN})

Repartition

See number of partitions

ddf.npartitions

Get a partition

df.get_partition(0)

Repartition

ddf.repartition(npartitions=6)

ddf.repartition(partition_size='100MB')

Groupby

from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error


def myfunc(df):
    out = {}
    out["Mean Absolute Error"] = mean_absolute_error(df["y_true"], df["y_pred"])
    out["Mean Absolute Percentage Error"] = mean_absolute_percentage_error(
        df["y_true"], df["y_pred"]
    )
    return pd.Series(out, index=["Mean Absolute Error", "Mean Absolute Percentage Error"])


ddf.groupby('COL').apply(myfunc,

                         meta={'Mean Absolute Error': 'f8',

                               'Mean Absolute Percentage Error': 'f8'}).compute()

If you have many groups, use split_out. This increases the number of tasks but keeps the aggregated output split across several partitions instead of one large one

ddf.groupby("TIME").sum(split_out=8)

Analytics

Convert object columns to categories

ddf.categorize()

Do one-hot encoding

dd.reshape.get_dummies(ddf)

Run a function on each partition (here: type)

ddf.map_partitions(type).compute()
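A more typical map_partitions call applies a pandas function per partition with an explicit meta; a hedged sketch using the timeseries columns from above:

ddf.map_partitions(
    lambda pdf: pdf.assign(z=pdf.x + pdf.y),
    meta={"x": "f8", "y": "f8", "id": "i8", "z": "f8"},
).head()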

Convert to dask array (for ML)

ddf.to_dask_array(lengths=True)

See if a column has nans

dd.isna(ddf).any()

Rename columns

df.rename(columns=dict(zip(df.columns, new_columns)))

Dtypes

ddf.astype({"COL_OBJECT": "string", "COL2_OBJECT": "datetime64[ns]", "COL3_OBJECT": "datetime64[ns]"})

See memory usage

from dask.utils import format_bytes


format_bytes(ddf.memory_usage(deep=True).sum().compute())

Joining

dd.merge(df1, df2, on='COL')

Bag

import dask.bag as db


db.from_sequence(SEQUENCE, npartitions=N)
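Bags support the usual map/filter/compute flow; a hedged sketch:

b = db.from_sequence(range(10), npartitions=2)
b.map(lambda x: x ** 2).filter(lambda x: x % 2 == 0).compute()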

ML

Model

from dask_ml.linear_model import LinearRegression


lr = LinearRegression(solver='lbfgs', max_iter=10)

lr_model = lr.fit(X_train, y_train)

lr_model.coef_

y_pred = lr_model.predict(X_test)
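For a self-contained run, dask-ml can generate synthetic data; a hedged sketch (sizes are illustrative):

from dask_ml.datasets import make_regression
from dask_ml.model_selection import train_test_split


X, y = make_regression(n_samples=100_000, n_features=5, chunks=10_000)
X_train, X_test, y_train, y_test = train_test_split(X, y)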

Pipeline

from sklearn.pipeline import make_pipeline

from dask_ml.preprocessing import Categorizer, DummyEncoder

from dask_ml.linear_model import LinearRegression


pipe = make_pipeline(

    Categorizer(),

    DummyEncoder()

)


pipe.fit(ddf)


prepared = pipe.transform(ddf)
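To use the imported LinearRegression after the preprocessing, one option is to convert the prepared frame to dask arrays (column names here are hypothetical):

X = prepared[["FEATURE_1", "FEATURE_2"]].to_dask_array(lengths=True)
y = prepared["TARGET"].to_dask_array(lengths=True)
lr = LinearRegression().fit(X, y)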

SQL

from sqlalchemy import create_engine


dbfs_engine = create_engine("databricks+pyhive://token:" + token + "@" + region + ".azuredatabricks.net:443/" + database, connect_args={"http_path": http_path})
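The engine can then feed pandas directly (or per-partition delayed reads); a hedged sketch with a made-up query:

import pandas as pd


pdf = pd.read_sql("SELECT * FROM MY_TABLE LIMIT 1000", dbfs_engine)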

Utils

Schedulers

import dask


dask.base.named_schedulers

Config


dask.config.config.get('distributed')


dask.config.set(scheduler="processes")


~/.config/dask/dask.yaml:

distributed:

  dashboard:

    link: "{JUPYTERHUB_BASE_URL}user/{JUPYTERHUB_USER}/proxy/{port}/status"


export DASK_DISTRIBUTED__DASHBOARD__LINK=/user/${JUPYTERHUB_USER}/proxy/8787/status


Port forwarding

local$ ssh -L 8000:localhost:8787 user@remote

then go to:

localhost:8000

When using Dask in a script / Python file, create the Client inside a __main__ guard

if __name__ == "__main__":

    client = Client()

Wrap a function to delayed

@dask.delayed

def my_function(x):

    return x + 1


tasks = []

for i in [1, 2, 3]:

    task = my_function(i)

    tasks.append(task)


dask.compute(tasks, retries=1)

# https://docs.dask.org/en/latest/setup/single-machine.html#single-machine-scheduler

# https://github.com/andersy005/xarray-tutorial/blob/main/notebooks/11-dask-distributed.ipynb

# https://xarray-contrib.github.io/xarray-tutorial/scipy-tutorial/05_intro_to_dask.html#Dask-Schedulers

dask.compute(tasks, scheduler="threads")    # default for all Dask collections except bag
dask.compute(tasks, scheduler="processes")  # ~ multiprocessing; often best on JupyterHub
dask.compute(tasks, scheduler="sync")       # single-threaded, useful for debugging

or 

tasks = [my_function(i) for i in [1, 2, 3]]

dask.compute(tasks)

Suppress warnings

import logging

logger = logging.getLogger("distributed.utils_perf")

logger.setLevel(logging.ERROR)

Progress bar (local schedulers; with a distributed Client use dask.distributed.progress)

from dask.diagnostics import ProgressBar


with ProgressBar():

    df = ddf.compute()

TQDM progress bar

from tqdm.dask import TqdmCallback

TqdmCallback(desc="dask tasks").register()

Profile report

from dask.distributed import performance_report


with performance_report(filename="dask-report.html"):
    ddf.compute()  # run whatever work you want profiled inside this block

Format bytes as human-readable

from dask.utils import format_bytes


format_bytes(1_234_567_890)  # e.g. '1.15 GiB'

Get logs

logs = cluster.get_logs()

DAG

dask.visualize(ddf)

or

ddf.visualize()

or

ddf.dask