Dask
Cluster
import os
from dask.distributed import Client, LocalCluster
from dask.distributed import SSHCluster
from dask_cloudprovider.aws import FargateCluster, EC2Cluster, ECSCluster
cluster = LocalCluster(processes=True, local_directory="/tmp")
cluster = LocalCluster(local_directory="/tmp")
cluster = LocalCluster(n_workers=4, memory_limit="32GB", dashboard_address=":8787", local_directory="/tmp")
cluster = LocalCluster(n_workers=os.cpu_count(), threads_per_worker=1, local_directory="/tmp")
cluster = FargateCluster(cluster_arn=cluster_arn, skip_cleanup=True, vpc=vpc, security_groups=security_groups)
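EC2Cluster is imported above; a minimal sketch (region and worker count are illustrative assumptions, not verified defaults):
cluster = EC2Cluster(region="eu-west-1", n_workers=2)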
# client = Client()
client = Client(cluster)
client
# processes=False gives a threads-only cluster
with LocalCluster(processes=False, local_directory="/tmp") as cluster:
    with Client(cluster) as client:
        print(client)
Array
Simple test
import dask.array as da
da.ones((1000, 1000, 1000)).mean().compute()
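Chunk sizes can be set explicitly to control task granularity (a sketch; the 100^3 chunks are an arbitrary choice):
da.ones((1000, 1000, 1000), chunks=(100, 100, 100)).mean().compute()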
Apply function
import numpy as np
def np_pi_calculator(vector_len: int = 1_000_000) -> int:
    # Monte Carlo sample: count random points that land inside the unit circle
    rand_x, rand_y = np.random.uniform(-1, 1, vector_len), np.random.uniform(
        -1, 1, vector_len
    )
    origin_dist = rand_x * rand_x + rand_y * rand_y
    circle_points = (origin_dist <= 1).sum()
    return circle_points
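One way to fan this out over a running client (a sketch; pure=False stops Dask from deduplicating the identical random calls into one task):
futures = client.map(np_pi_calculator, [1_000_000] * 100, pure=False)
total_in_circle = sum(client.gather(futures))
pi_estimate = 4 * total_in_circle / (100 * 1_000_000)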
Dataframe
IO
Built-in time series dataset
import dask
df = dask.datasets.timeseries()
or
import dask.dataframe as dd
ddf = dd.demo.make_timeseries(
    start="2000-01-01",
    end="2000-01-02",
    dtypes={"x": float, "y": float, "id": int},
    freq="10ms",
    partition_freq="24h",
)
ddf.groupby("id")[["x", "y"]].mean().compute()
Convert a pandas.DataFrame to a dask.DataFrame
import pandas as pd
df = pd.DataFrame({'col1': ['a', 'b'], 'col2': [1, 2], 'col3': [1.5, 2.5]})
ddf = dd.from_pandas(df, npartitions=2)
Using pyarrow-dataset for row filtering
df = dd.read_parquet("file.parquet", engine="pyarrow-dataset", filters=[("col1", "==", 0)])
df = dd.read_parquet("file.parquet", engine="pyarrow-dataset", filters=[("col1", "in", tuple(arr))])
df = dd.read_parquet("file.parquet", engine="pyarrow-dataset", filters=[("col1", "in", tuple(arr)), ("col2", "in", tuple(arr2))])
From dask.delayed
ddf = dd.from_delayed([file_etl(file) for file in files])
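file_etl and files above are placeholders; one possible shape for file_etl (hypothetical, assuming CSV inputs, each call returning one pandas partition):
@dask.delayed
def file_etl(file):
    # hypothetical per-file ETL: read one file, clean it, return a pandas partition
    pdf = pd.read_csv(file)
    return pdf.dropna()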
To parquet
import pyarrow as pa
ddf.to_parquet("df.parquet", engine="pyarrow", overwrite=True)
ddf.to_parquet("df.parquet", engine="pyarrow", append=True, ignore_divisions=True)
ddf.to_parquet(
    "df.parquet",
    engine="pyarrow",
    schema={
        "col1": pa.string(),
        "col2": pa.int32(),
        "col3": pa.float32(),
        "col4": pa.timestamp("s", tz="UTC"),
    },
)
Query an SQL table in parallel
See SQLAlchemy for the uri format
dd.read_sql_table(table=TABLE, uri=uri, index_col='INT OR TIME', npartitions=N)
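For example, with a hypothetical PostgreSQL uri (credentials, table and index column are placeholders):
uri = "postgresql://USER:PASSWORD@HOST:5432/DATABASE"
ddf = dd.read_sql_table(table="my_table", uri=uri, index_col="id", npartitions=8)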
Store in Azure blob
storage_options = {'account_name': BLOB_NAME, 'account_key': BLOB_KEY}
remote_folder = 'az://FOLDER/'
dd.to_parquet(ddf, path=remote_folder + 'FILE.parquet', engine='pyarrow', storage_options=storage_options)
Read from Azure blob
dd.read_parquet(path=remote_folder + 'FILE.parquet', columns=["COL"], engine='pyarrow', storage_options=storage_options)
Read from S3
dd.read_parquet("s3://BUCKET/file.parquet", storage_options={"key": AWS_KEY, "secret": AWS_SECRET, "token": AWS_TOKEN})
Repartition
See number of partitions
ddf.npartitions
Get a partition
ddf.get_partition(0)
Repartition
ddf.repartition(npartitions=6)
ddf.repartition(partition_size='100MB')
Groupby
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
def myfunc(df):
    out = {}
    out["Mean Absolute Error"] = mean_absolute_error(df["y_true"], df["y_pred"])
    out["Mean Absolute Percentage Error"] = mean_absolute_percentage_error(
        df["y_true"], df["y_pred"]
    )
    return pd.Series(out, index=["Mean Absolute Error", "Mean Absolute Percentage Error"])
ddf.groupby("COL").apply(
    myfunc,
    meta={"Mean Absolute Error": "f8", "Mean Absolute Percentage Error": "f8"},
).compute()
If you have many groups, use split_out. This increases the number of tasks but spreads the result over several output partitions instead of collecting everything into one.
ddf.groupby("TIME").sum(split_out=8)
Analytics
Convert object columns to categories
ddf.categorize()
Do one-hot encoding
dd.reshape.get_dummies(ddf)
Run a function on each partition (type)
ddf.map_partitions(type).compute()
Convert to dask array (for ML)
ddf.to_dask_array(lengths=True)
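For example, feeding the result into dask_ml's splitter (column names assume the make_timeseries frame above):
from dask_ml.model_selection import train_test_split
X = ddf[["x", "y"]].to_dask_array(lengths=True)
y = ddf["id"].to_dask_array(lengths=True)
X_train, X_test, y_train, y_test = train_test_split(X, y)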
See if any column has NaNs
ddf.isna().any().compute()
Rename columns
ddf.rename(columns=dict(zip(ddf.columns, new_columns)))
Dtypes
ddf.astype({"COL_OBJECT": "string", "COL2_OBJECT": "datetime64[ns]"})
See memory usage
from dask.utils import format_bytes
format_bytes(ddf.memory_usage(deep=True).sum().compute())
Joining
dd.merge(df1, df2, on='COL')
Bag
import dask.bag as db
b = db.from_sequence([1, 2, 3])
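A quick usage sketch:
b.map(lambda x: x ** 2).sum().compute()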
ML
Model
from dask_ml.linear_model import LinearRegression
lr = LinearRegression(solver='lbfgs', max_iter=10)
lr_model = lr.fit(X_train, y_train)
lr_model.coef_
y_pred = lr_model.predict(X_test)
Pipeline
from sklearn.pipeline import make_pipeline
from dask_ml.preprocessing import Categorizer, DummyEncoder
pipe = make_pipeline(
    Categorizer(),
    DummyEncoder(),
)
pipe.fit(ddf)
prepared = pipe.transform(ddf)
SQL
from sqlalchemy import create_engine
dbfs_engine = create_engine(
    "databricks+pyhive://token:" + token + "@" + region + ".azuredatabricks.net:443/" + database,
    connect_args={"http_path": http_path},
)
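The result is an ordinary SQLAlchemy engine, so it can be queried e.g. via pandas (table name is a placeholder):
pd.read_sql("SELECT * FROM MY_TABLE LIMIT 10", dbfs_engine)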
Utils
Schedulers
import dask
dask.base.named_schedulers
Config
dask.config.config.get('distributed')
dask.config.set(scheduler="processes")
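dask.config.set also works as a context manager, scoping the scheduler choice to one block:
with dask.config.set(scheduler="threads"):
    ddf.compute()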
~/.config/dask/dask.yaml:
distributed:
  dashboard:
    link: "{JUPYTERHUB_BASE_URL}user/{JUPYTERHUB_USER}/proxy/{port}/status"
export DASK_DISTRIBUTED__DASHBOARD__LINK=/user/${JUPYTERHUB_USER}/proxy/8787/status
Port forwarding
local$ ssh -L 8000:localhost:8787 user@remote
then go to:
localhost:8000
When using in a script / python file, guard cluster startup so spawned worker processes can re-import the module safely:
if __name__ == "__main__":
    client = Client()
Wrap a function to delayed
@dask.delayed
def my_function(x):
    return x + 1

tasks = []
for i in [1, 2, 3]:
    task = my_function(i)
    tasks.append(task)
dask.compute(tasks, retries=1)
# https://docs.dask.org/en/latest/setup/single-machine.html#single-machine-scheduler
# https://github.com/andersy005/xarray-tutorial/blob/main/notebooks/11-dask-distributed.ipynb
# https://xarray-contrib.github.io/xarray-tutorial/scipy-tutorial/05_intro_to_dask.html#Dask-Schedulers
dask.compute(tasks, scheduler="threads") # default for all Dask operations expect bag
dask.compute(tasks, scheduler="processes") # ~ multi-processing. Best for jhub
dask.compute(tasks, scheduler="sync")
or
tasks = [my_function(i) for i in [1, 2, 3]]
dask.compute(tasks)
Suppress warnings
import logging
logger = logging.getLogger("distributed.utils_perf")
logger.setLevel(logging.ERROR)
Progress bar
from dask.diagnostics import ProgressBar
with ProgressBar():
    df = ddf.compute()
TQDM progress bar
from tqdm.dask import TqdmCallback
TqdmCallback(desc="dask tasks").register()
Profile report
from dask.distributed import performance_report
with performance_report(filename="dask-report.html"):
    ddf.compute()
See bytes
from dask.utils import format_bytes
Get logs
logs = cluster.get_logs()
DAG
dask.visualize(ddf)
or
ddf.visualize()
or
ddf.dask