Spark AI Summit 2020

Accelerating MLFlow Hyper-Parameter Optimization Pipelines with RAPIDS

https://github.com/rapidsai/cloud-ml-examples/

Add init script

https://github.com/rapidsai/cloud-ml-examples/blob/master/databricks/src/rapids_install_cuml0.13_cuda10.0_ubuntu16.04.sh

hyperopt minimizes the objective, so log/return -acc (negative accuracy) to maximize accuracy.

Define spark_trials (SparkTrials) to distribute the hyperopt trials.
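A minimal sketch of that pattern (not the talk's exact code; train_and_eval is a hypothetical training/eval helper and the search space and parallelism are made up):

from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK

def objective(params):
    acc = train_and_eval(params)  # hypothetical helper returning accuracy
    return {'loss': -acc, 'status': STATUS_OK}  # negate accuracy since fmin minimizes

search_space = {'max_depth': hp.quniform('max_depth', 3, 12, 1)}
spark_trials = SparkTrials(parallelism=4)  # run trials in parallel on the cluster
best = fmin(objective, search_space, algo=tpe.suggest, max_evals=50, trials=spark_trials)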

Can click a button to productionize the model and then query it once deployed.

Matches the sklearn MLflow logging.


Ibis: Seamless Transition Between Pandas and Apache Spark

There are ibis.pandas and ibis.pyspark backends.
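A rough sketch of the pandas backend (not from the talk; assumes the ibis.pandas.connect API of the time):

import ibis
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3]})
con = ibis.pandas.connect({'t': df})  # register the DataFrame as table 't'
t = con.table('t')
print(t.a.sum().execute())
# per the talk, there is also an ibis.pyspark backend for running the same expressions on Spark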


Koalas: Making an easy transition from pandas to apache spark

Announced at Spark AI Summit 2019.

Scale out the pandas code using Koalas and make learning PySpark much easier.

pandas vs. pyspark

Add a column

pandas:  df['c'] = df['a'] + df['b']
PySpark: df = df.withColumn('c', df['a'] + df['b'])

Rename columns

pandas:  df.columns = ['a', 'b']
PySpark: df = df.select(df['c1'].alias('a'), df['c2'].alias('b'))
     or: df = df.toDF('a', 'b')

Value counts

pandas:  df['col'].value_counts()
PySpark: df.groupBy(df['col']).count().orderBy('count', ascending=False)


pandas:

import pandas as pd
df = pd.read_csv('file.csv')
df.columns = ['x', 'y', 'z1']
df['x2'] = df.x * df.x

PySpark:

df = spark.read.option("inferSchema", "true").csv('file.csv')
df = df.toDF('x', 'y', 'z1')
df = df.withColumn('x2', df.x * df.x)

Koalas is the same as pandas, but using:

import databricks.koalas as ks


Koalas 1.0

Spark 3.0 support. Some functions optimized, e.g. mapInPandas()

Python 3.8 support

Follows pandas 1.0 behavior

Copy pandas API more and more over time.

Can go back and forth between Koalas and Spark using .to_koalas() and .to_spark()

there is also ks.sql()

kdf = ks.from_pandas(pdf)
pdf = kdf.to_pandas()

sdf = kdf.to_spark(index_col='index')  # index_col keeps the (1..N) index as a column
sdf.show()
kdf = sdf.to_koalas(index_col=['x', 'y'])

kdf.mean()
kdf.describe()

kdf.groupby('A').sum()

kdf['col'].cumsum().plot()

https://docs.databricks.com/_static/notebooks/pandas-to-koalas-in-10-minutes.html

Use transform() if you want a result of the same length; use apply() if the result can have a different length.

kdf = ks.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})

def pandas_plus(pser):
    return pser + 1               # same length: works with transform
    # return pser[pser % 2 == 1]  # different length: requires apply

kdf.transform(pandas_plus)
# kdf.apply(pandas_plus)

Use transform_batch() and apply_batch() if the function operates on a whole DataFrame (a batch) instead of a Series.
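A minimal sketch, assuming the Koalas 1.x `.koalas` accessor and the kdf defined above (not from the talk):

def pandas_plus_df(pdf):
    return pdf + 1  # pdf is a pandas DataFrame chunk; same-shape result

kdf.koalas.transform_batch(pandas_plus_df)
# kdf.koalas.apply_batch(pandas_plus_df)  # when the result length can differ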

Get spark schema

kdf.spark.schema().simpleString()
kdf.spark.print_schema()

dataframe.spark.apply

kdf.spark.apply(lambda sdf: sdf.select(sdf['A'] * sdf['B'])).head()

The index_col parameter is needed if you need to preserve the index

kdf.spark.apply(lambda sdf: sdf.select(sdf['index'], sdf['A'] * sdf['B']), index_col='index').head()

series.spark.transform

kdf.A.spark.transform(lambda scol: scol.cast('int')).head()

dataframe.spark.explain()

(kdf + 1).spark.explain()


A Koalas DataFrame can take an explicit index and also has a default index.


The July/Aug 2020 release of DBR/MLR 7.1 will pre-install Koalas 1.x

Roadmap: more coverage of the pandas API and adding ML libraries.

https://koalas.readthedocs.io/en/latest/?badge=latest


Productionizing Machine Learning Pipelines with Databricks & Azure ML

Use Azure ML, Azure Pipelines and Azure DevOps

Use an Azure VM, python 3.7

use poetry for dependency management:

  • Upload the .whl package to Databricks.
  • Can install the databricks-cli

Use dbutils.secrets and create a scope to hold passwords
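For example (scope and key names here are hypothetical):

db_password = dbutils.secrets.get(scope="azureml-scope", key="db-password")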

Put images into train, valid and test folders.


Plot MLflow runs to find the best model

See model Artifact

Download model artifact locally to test deployment

Register to AzureML model registry

Pass Webservice function

Can set the compute (e.g. CPU cores and GB of memory) for the web service
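A sketch of what that looks like with the Azure ML SDK (values hypothetical, not the talk's exact configuration):

from azureml.core.webservice import AciWebservice

deploy_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=2)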


Use cv2 to read images
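For example (path hypothetical):

import cv2

img = cv2.imread("/dbfs/path/to/image.jpg")  # returns a BGR NumPy array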


Put into deployment pipeline


Use Azure DevOps and Releases under Pipelines


Accelerating Data Processing in Spark SQL With Pandas UDFs

Loops (inside pandas UDFs) can beat Spark SQL for this workload

Aggregate Keys

Use inverted indices

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

UDF = User Defined Function

Scalar UDFs - one-to-one mapping
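For reference, a minimal scalar pandas UDF (Spark 2.x style; not from the talk):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def plus_one(s: pd.Series) -> pd.Series:
    return s + 1  # one output value per input value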

Grouped Map UDFs - can return a variable number of output rows

Grouped Agg UDFs -> Use Grouped Map UDFs instead.


Native approach: use Spark SQL

  • Get all pairs using a cross join
  • functions.array_intersect for the inclusion/exclusion logic
  • Use groupby and aggregate to get the counts
native_solution = featurestore_sample_100k.crossJoin(functions.broadcast(model_data))

# Keep rows with at least 1 inclusion hash and no exclusion hashes
def arrays_intersect(col1, col2):
    return functions.size(functions.array_intersect(col1, col2)) != 0

native_solution = native_solution.filter(
    arrays_intersect('featureIds', 'geoInclusionHashes')
    & (~arrays_intersect('featureIds', 'geoExclusionHashes')))
native_solution = native_solution.select('modelId', 'id').distinct()
native_solution = native_solution.groupby('modelId').count()

But this is slow: the cross join generates ~700x the number of intermediate rows.

Pandas UDF

import pandas as pd
from pyspark.sql import functions
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType

# model_data_df: pandas DataFrame of model filters (defined outside the UDF)
def simple_group_by(feature_store_id_data):
    id_set = []
    for model_id, inclusion_ids, exclusion_ids in model_data_df[['modelId', 'geoInclusionSet', 'geoExclusionSet']].values:
        for id_row in feature_store_id_data['featureIds'].values:
            if (not inclusion_ids.isdisjoint(id_row) and exclusion_ids.isdisjoint(id_row)):
                id_set.append(model_id)
                break
    return pd.DataFrame([[cur_id] for cur_id in id_set], columns=['modelId'])

simple_group_by_udf = functions.pandas_udf(
    simple_group_by,
    StructType([StructField('modelId', IntegerType())]),
    PandasUDFType.GROUPED_MAP)

Speed up by counting on the number of unique filters

key_aggregated_solution = (featurestore_sample_100k
  .groupby('id')
  .apply(key_aggregated_group_by_udf)
  .groupby('filterId')
  .count()
  .join(functions.broadcast(filter_model_id_pairs), on='filterId'))

Speed up by batching (e.g. 10k ids per UDF call)

batch_solution = (featurestore_sample_1M
  .withColumn('batch', functions.abs(functions.hash('id') % 100))
  .groupby('batch').apply(batch_group_by_udf)
  .groupby('filterId')
  .agg(functions.sum('batchCount').alias('count'))
  .join(functions.broadcast(filter_model_id_pairs), on='filterId'))

Use an inverted index to iterate over the unique features instead of the filters

Use .values and pass them to itertools instead of pandas to speed things up

itertools.groupby (after sorting) to group data

itertools.chain.from_iterable to iterate through a nested for loop
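A toy sketch of those itertools tricks (data made up, not the talk's):

import itertools

rows = [('b', 3), ('a', 1), ('a', 2)]
rows.sort(key=lambda r: r[0])  # groupby only groups consecutive keys, so sort first
grouped = {k: [v for _, v in g] for k, g in itertools.groupby(rows, key=lambda r: r[0])}

nested = [[1, 2], [3, 4]]
flat = list(itertools.chain.from_iterable(nested))  # flatten without a nested for loop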


Scalable Acceleration of XGBoost Training On Apache Spark GPU Clusters

XGBoost gradient boosting library.

Regression, classification, ranking and user defined objectives

Use gpu_hist as the tree method

Out-of-core is difficult: GPU memory is small and streaming over the PCIe bus is slow

Can sample the data and use the sample to build the tree

Gradient-based sampling - sampling probability proportional to the gradients; Gradient-based One-Side Sampling (GOSS; used in LightGBM); Minimal Variance Sampling (MVS; in CatBoost); sampling ratio can be as low as 0.1 without loss of accuracy

500 columns, V100 GPU (16 GB GPU memory): max data size is 9M rows in-core, 12M rows out-of-core, and 85M rows using sampling ratio f=0.1


Learning to rank (LTR)

Used in Information Retrieval (IR):

  • search engine

Within each query group, use ML to rank the results


XGBoost builds a better model by combining multiple weak models

Models are built by gradient descent using an objective function such as LTR

LambdaMART ranking algorithm, which uses a pairwise ranking approach: it minimizes pairwise loss by repeatedly sampling pairs of instances

3 algorithms on GPU:

  • Pairwise (default)
  • mAP - mean Average Precision
  • nDCG - normalized Discounted Cumulative Gain

The last two further minimize pairwise loss by adjusting it with the weight of the instance pair chosen

objective=rank:map

eval_metric=map (pre@n - precision of the top n documents; auc - area under the ROC curve; aucpr - area under the precision-recall curve)
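A minimal sketch of those ranking parameters with the plain XGBoost Python API (synthetic data; not the talk's Spark pipeline):

import numpy as np
import xgboost as xgb

# 3 hypothetical queries with 4 documents each
X = np.random.rand(12, 5)
y = np.random.randint(0, 3, size=12)  # graded relevance labels
dtrain = xgb.DMatrix(X, label=y)
dtrain.set_group([4, 4, 4])  # number of documents per query group

params = {
    "objective": "rank:map",
    "eval_metric": "map",
    "tree_method": "hist",  # use "gpu_hist" on a GPU
}
bst = xgb.train(params, dtrain, num_boost_round=50)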


Test on Microsoft benchmark ranking dataset


Convert data to numeric data; do ETL (XGBoost4j)

XGBoost + Spark 2.x + Rapids

Read file directly to GPU memory

Chunks loading

Convert column-major cuDF to a sparse, row-major DMatrix

CPU

val df = spark.read.parquet(path)
val featureNames = Seq("f1", "f2", "f3")
val vectorAssembler = new VectorAssembler().setInputCols(featureNames.toArray).setOutputCol("features")
val xgbInput = vectorAssembler.transform(df).select("features", labelColName)
val xgbClassifier = new XGBoostClassifier(params).setLabelCol(labelColName).setTreeMethod("hist").setFeaturesCol("features")
val model = xgbClassifier.fit(xgbInput)

GPU

val gpuDF = new GpuDataReader(spark).parquet(path)
val featureNames = Seq("f1", "f2", "f3")
val xgbClassifier = new XGBoostClassifier(params).setLabelCol(labelColName).setTreeMethod("gpu_hist").setFeaturesCols(featureNames)
val model = xgbClassifier.fit(gpuDF)

XGBoost + Spark 3.0

rapids-plugin-4-spark - via the RAPIDS library

If gpu_enabled(operation, data_type), run the operation on the GPU; else fall back to the standard Spark operation

https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/apache-spark-3/ebook-sign-up/

https://www.nvidia.com/content/dam/en-zz/Solutions/deep-learning/deep-learning-ai/solutions/WP-09926-001_v01_0610.pdf

https://github.com/rapidsai/xgboost/tree/rapids-spark3.0

https://github.com/rapidsai/spark-examples


Deep Dive Into GPU Support In Apache Spark 3.X


Koalas: Pandas On Apache Spark

Pandas

import pandas as pd
df = pd.read_csv('my_data.csv')
df.columns = ['x', 'y', 'z1']
df['x2'] = df.x * df.x

PySpark

df = spark.read.option("inferSchema", "true").csv("my_data.csv")
df = df.toDF('x', 'y', 'z1')
df = df.withColumn('x2', df.x * df.x)

Koalas

import databricks.koalas as ks
df = ks.read_csv('my_data.csv')
df.columns = ['x', 'y', 'z1']
df['x2'] = df.x * df.x

Spark does not maintain row order

If no index is specified when creating a Koalas DataFrame a "default index" is attached automatically (each default index type has pros and cons):

  • sequence - Uses a PySpark Window function without partitioning (so it can become a single-node bottleneck)
  • distributed-sequence - Uses group-by and group-map
  • distributed - Uses monotonically_increasing_id but values are non-deterministic

Can configure these by using compute.default_index_type

bit.ly/koalas_summit_2020

%fs ls dbfs:/home/niall/koalas/sais_2020_csv/


# import pandas as pd


# pdf = pd.read_csv("/dbfs/home/niall/koalas/sais_2020_csv/gbif_australia_2010_2020.csv", sep="\t")

# pdf.head()

But this OOMs (the data is too big for single-node pandas)


import databricks.koalas as ks

Choose the distributed default index type (https://koalas.readthedocs.io/en/latest/user_guide/options.html#default-index-type)

ks.set_option('compute.default_index_type', 'distributed')

Read from delta lake

kdf = ks.read_delta("dbfs:/home/niall/koalas/sais_2020")
kdf.head()

Check size

kdf.shape

Use SQL query

genus_test = "Phascolarctos"

ks.sql("""
SELECT COUNT(*) 
FROM {kdf}
WHERE genus = {genus_test}
""")

Grab the column names

kdf.columns

EDA

kdf.describe()

Count null values

kdf.isna().sum()

Keep only rows where these columns are non-null

kdf = kdf[kdf["individualCount"].notnull() & kdf["month"].notnull() & kdf["year"].notnull()]

Value counts

kdf["kingdom"].value_counts(normalize=True)

Viz

kdf["kingdom"].value_counts(normalize=True).plot.bar(rot=25, figsize=(12,8), title="Bar plot of kingdom column using Koalas DataFrame")

Number of unique classes

len(kdf["class"].unique())

Total count, the number of distinct recordings and the average count per recording for each class

# Group by the class column, sum up the individual counts for each group, and get the count of instances in each group.
# Note that using as_index=False resets the index
class_ks_df = kdf.groupby("class", as_index=False).agg({"individualCount": "sum", "gbifID": "count"})

# Renaming our columns to reflect the grouping aggregates
class_ks_df.columns = ["class", "total_count", "number_distinct_recordings"]

# Create a column which calculates the average number of animal/plant counts per each distinct recording for each class
class_ks_df["avg_count_per_recording"] = class_ks_df["total_count"] / class_ks_df["number_distinct_recordings"]
class_ks_df.sort_values(by="number_distinct_recordings", ascending=False).head(10)

Distribution of the average counts per recording using .hist()

class_ks_df["avg_count_per_recording"].hist(bins=100, figsize=(12,8), title="Histogram of average individualCount per observation (Aggregated to class level)")

class_ks_df.sort_values(by="avg_count_per_recording", ascending=False).head().style.background_gradient(cmap="Greens")

Filter down to the genus Phascolarctos

koalas_df = kdf[kdf.genus == "Phascolarctos"]
koalas_df.head()
koalas_df.shape[0]

Check the date range for which we have recordings of koala numbers

koalas_df["eventDate"].min(), koalas_df["eventDate"].max()

How many observations were recorded by each province from 2010 to May 2020?

koalas_df["stateProvince"].value_counts()

What is the distribution of the individual count of Koalas for each observation?

koalas_df["individualCount"].hist(bins=100, figsize=(12,8), title="Histogram of individualCount per recording for Koalas")

Remove any observations where over 100 koalas have been recorded for a single recording

koalas_df = koalas_df[koalas_df["individualCount"] < 100]
koalas_df["individualCount"].hist(bins=100, figsize=(12,8), title="Histogram of individualCount per recording for Koalas")

Get the number of occurrences recorded for koalas, along with the total number of koalas counted, and the average number of koalas per recorded sighting.

# Get the sum of all koalas recorded in Australia for each month
koalas_sum_ks_df = koalas_df.groupby(["year", "month"], as_index=False)["individualCount"].sum()

# Get the count of all koala occurrences in Australia for each month
koalas_count_ks_df = koalas_df.groupby(["year", "month"], as_index=False)["individualCount"].count()

# Join the above Koalas DataFrames and divide the total_count column by num_recordings column
koalas_avg_cnt_ks_df = koalas_sum_ks_df.merge(koalas_count_ks_df, on=["year", "month"])
koalas_avg_cnt_ks_df.columns = ["year", "month", "total_count", "num_recordings"]

# Get number of koalas per recording nationally
koalas_avg_cnt_ks_df["avg_cnt"] = koalas_avg_cnt_ks_df["total_count"] / koalas_avg_cnt_ks_df["num_recordings"]

koalas_avg_cnt_ks_df.sort_values(by=["year", "month"], ascending=False).head(12)

Plot the count per month of koalas from 2010 to May 2020

# Concatenating the year and month columns to create a year_month column
koalas_avg_cnt_ks_df = ks.sql("""
SELECT year, month, num_recordings, total_count, avg_cnt, CONCAT(year, "-", month) AS year_month
FROM {koalas_avg_cnt_ks_df}
""")

koalas_avg_cnt_ks_df.head()

# Convert the year_month column to datetime (returns in format %Y-%m-%d)
koalas_avg_cnt_ks_df["year_month"] = ks.to_datetime(koalas_avg_cnt_ks_df["year_month"])

# Need to sort by year_month before plotting
koalas_avg_cnt_ks_df.sort_values(by=['year_month'], inplace=True)

Create NumPy arrays of the columns we would like to pass into our matplotlib plot

# Creating numpy arrays from year_month, total_count and num_recordings
x_axis_vals = koalas_avg_cnt_ks_df["year_month"].to_numpy()
y1_vals = koalas_avg_cnt_ks_df["total_count"].to_numpy()
y2_vals = koalas_avg_cnt_ks_df["num_recordings"].to_numpy()

Plot monthly total counts and the number of monthly recordings for koalas from 2010 until May 2020

import matplotlib.pyplot as plt
import seaborn as sns; sns.set_style("white")
plt.rcParams.update({'font.size': 9})

fig, ax1 = plt.subplots(figsize=(12,8))

color = 'tab:red'
ax1.set_xlabel('year_month')
ax1.set_ylabel('total_count', color=color)
ax1.plot(x_axis_vals, y1_vals, color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx() 
color = 'tab:blue'
ax2.set_ylabel('num_recordings', color=color) 
ax2.plot(x_axis_vals, y2_vals, color=color)
ax2.tick_params(axis='y', labelcolor=color)

fig.tight_layout()  
ax1.set_title("Monthly total count and number of recordings for Koalas in Australia from 2011 - 2018", size=9)

plt.show()

Plot the average number of koalas per recording for each month between 2010 and May of 2020

sns.set_style("whitegrid")

# Set year_month to be the index and keep only avg_cnt
koalas_avg_count_ks_series = koalas_avg_cnt_ks_df.set_index("year_month")["avg_cnt"]

plt.title("Monthly average koala count in Australia from 2010 - 2020", size=9)
plt.xlabel("date")
plt.ylabel("avg_num_koalas_per_occurence")

koalas_avg_count_ks_series.plot(figsize=(12, 8))

Parallelizing Model Training with Koalas

Forecast future total numbers for each genus on a monthly basis

# First filter to the classes we would like to forecast for
filter_1 = (kdf["genus"] == "Phascolarctos") # Koala
filter_2 = (kdf["genus"] == "Macropus") # Kangaroo
filter_3 = (kdf["genus"] == "Cacatua") # Cockatoo
filter_4 = (kdf["genus"] == "Dromaius") # Emu

subset_kdf = kdf[filter_1 | filter_2 | filter_3 | filter_4]

# Get the total number of counts and total number of observations for each class in every month
grouped_kdf = subset_kdf.groupby(["genus", "year", "month"], as_index=False) \
                          .agg({"individualCount": "sum", "occurrenceID": "count"}) \
                          .sort_values(by=["year", "month"])

grouped_kdf.columns = ["genus", "year", "month", "total_count", "num_recordings"]

# Concatenating the year and month columns to create a year_month column
grouped_kdf = ks.sql("""
SELECT genus, year, month, num_recordings, total_count, CONCAT(year, "-", month) AS year_month
FROM {grouped_kdf}
ORDER BY year, month
""")

# Convert the year_month column to datetime (returns in format %Y-%m-%d)
grouped_kdf["year_month"] = ks.to_datetime(grouped_kdf["year_month"], format="%Y-%m")
grouped_kdf.head()

Utilize Koalas groupby-apply method

Apply an arbitrary Python function to each group of our data. In this instance, for each genus we apply the fcst_func function we have defined, which fits a Prophet model and logs parameters and plots to MLflow. Note that the returned koalas.DataFrame can have a different number of rows and columns than the input.

Koalas makes use of Spark's pandas UDF functionality when implementing a groupby-apply method

When calling groupby-apply, Koalas executes the function once on a small sample to infer the return type, which can be expensive, for example where a dataset is created after aggregations or sorting. To avoid this, we specify a return type hint in the function being applied.

When forecasting for each individual genus we will produce a number of different plots. To record these, we will use MLflow to log them as artifacts.

# MLflow set up
import mlflow
from  mlflow.tracking import MlflowClient

# MLflow experiment (specify path for experiment to be set to)
# NOTE: you will need to change the following experiment path 
# dependent on where you would like to save the model to
mlflow_exp = f"/Users/niall.turbitt@databricks.com/Koalas/Koalas SAIS 2020/koalas_sais20_exp"
mlflow.set_experiment(mlflow_exp)

# Get the experiment ID to track to
exp_id = MlflowClient().get_experiment_by_name(mlflow_exp).experiment_id

from fbprophet import Prophet
import numpy as np
import matplotlib.pyplot as plt

def fcst_func(pdf) -> ks.DataFrame[np.datetime64, np.float64, np.float64, np.float64, str]:
  """
  Function to take grouped genus data, and provide 3-year monthly 
  forecast of total count. Log model plot as artifacts in MLflow 
  
  pdf: grouped genus data
  """ 
  genus_name = pdf["genus"].iloc[0]
  pdf["ds"] = pdf["year_month"]
  pdf["y"] = pdf["total_count"]
  df = pdf[["ds", "y"]].copy()
    
  # log the run with MLflow
  with mlflow.start_run(experiment_id=exp_id,
                        run_name=f"{genus_name} - Prophet Forecast") as run:
    
    prophet_daily_seasonality = False
    prophet_weekly_seasonality = False
    prophet_yearly_seasonality = True
    
    mlflow.log_params({"genus": genus_name,
                       "prophet_param_daily_seasonality": prophet_daily_seasonality,
                       "prophet_param_weekly_seasonality": prophet_weekly_seasonality,
                       "prophet_param_yearly_seasonality": prophet_yearly_seasonality})

    m = Prophet(daily_seasonality=prophet_daily_seasonality, 
                weekly_seasonality=prophet_weekly_seasonality, 
                yearly_seasonality=prophet_yearly_seasonality, 
                seasonality_mode="additive")
    m.fit(df)
    future = m.make_future_dataframe(periods=36, freq="M")
    fcst = m.predict(future)

    fig_1 = m.plot(fcst)
    plt.xlabel(f"Date")
    plt.ylabel(f"{genus_name} Numbers (Monthly)")
    plt.title(f"{genus_name} - Prophet Forecast")
    fcst_path = f"/tmp/{genus_name}_forecast.png"
    fig_1.savefig(fcst_path)
    mlflow.log_artifact(fcst_path)
  
    fig_2 = m.plot_components(fcst)
    components_path = f"/tmp/{genus_name}_components.png"
    fig_2.savefig(components_path)
    mlflow.log_artifact(components_path)
    plt.clf()
    
  ret_pdf = fcst[["ds", "yhat", "yhat_lower", "yhat_upper"]]
  ret_pdf["genus"] = [genus_name] * len(ret_pdf)

  return ret_pdf


fcst_df = grouped_kdf.groupby("genus", as_index=False).apply(fcst_func)

Call len() to force the computation (Koalas evaluates lazily)

len(fcst_df)

Search the runs of our MLflow experiment programmatically using search_runs

test_genus = "Cacatua"

# The following returns a pandas DataFrame of runs
mlflow_res_pdf = mlflow.search_runs(filter_string=f"params.genus='{test_genus}'").sort_values("start_time", ascending=False)
mlflow_res_pdf

From our runs DataFrame, we can grab the artifact URI and load the forecast plot for that given run.

artifact_uri = mlflow_res_pdf.iloc[0]["artifact_uri"]

local_img_path = "/" + artifact_uri.replace(":", "") + f"/{test_genus}_forecast.png"

plt.figure(figsize = (12,8))
img = plt.imread(local_img_path)
plt.imshow(img)
plt.show()


Scaling Up AI Research To Production With PyTorch and MLFlow

What is PyTorch?

Eager & graph-based execution; dynamic NNs; distributed training; hardware acceleration; simplicity over complexity

torch.nn/optim/data/autograd/vision/jit (deployment)

PyTorch developer conference

import torch
import torch.nn.functional as F
import torchvision

# Illustrative training loop from the talk (layer sizes are from the slide and do not match MNIST shapes)
class Net(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = torch.nn.Linear(8, 64)
        self.fc2 = torch.nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = F.dropout(x, p=0.5, training=self.training)
        x = torch.sigmoid(self.fc2(x))
        return x

net = Net()
data_loader = torch.utils.data.DataLoader(
    torchvision.datasets.MNIST('./data', download=True,
                               transform=torchvision.transforms.ToTensor()))
optimizer = torch.optim.SGD(net.parameters(), lr=0.01)
for epoch in range(1, 11):
    for data, target in data_loader:
        optimizer.zero_grad()
        prediction = net(data)
        loss = F.nll_loss(prediction, target)  # negative log-likelihood loss
        loss.backward()
        optimizer.step()
    if epoch % 2 == 0:
        torch.save(net, "net.pt")  # checkpoint every other epoch

https://pytorch.org/ecosystem/

Pruning - large NNs are hard to deploy; a technique to compress a model by reducing the number of parameters without sacrificing accuracy

Quantization - experimental - inference at scale e.g. mobile

PyTorch Elastic - fault tolerant and dynamic

PyTorch RPC (remote procedure call) - scale out applications

TorchServe - HTTP APIs - partnership with AWS

mlflow.pytorch
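A minimal sketch of logging the net from the snippet above with MLflow (not the talk's exact code):

import mlflow
import mlflow.pytorch

with mlflow.start_run():
    mlflow.pytorch.log_model(net, "model")  # logs the PyTorch model as an MLflow artifact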

PyTorch 1.5 released on April 21st

torchvision 0.6; torchtext 0.6; torchaudio 0.5

https://www.udacity.com/course/deep-learning-pytorch--ud188 - free course

https://www.fast.ai/


Anomaly Detection At The Edge

Compute close to the data source

Autonomous vehicles - connected cars, increase situational awareness

Manufacturing - predictive maintenance

Telecommunications - quality of service

Retail and consumer goods - hypertargeting

healthcare - personalization

energy and utilities - Improve efficiency

Federated learning

One pass algorithms - moments of univariate/multivariate data

Incremental - numerical stability

Statistical, DL/RL techniques:

Time series & PCA - ARIMA, Sparse PCA

LSTM - Stochastic RNN

Attention

Autoencoder

Transformer

GANs - GANomaly

Sketching - deal with edge/data issues

Models:

Squeezed convolutional Variational autoencoder

Edge-based RNN

Federated XGBoost

CNN-variational autoencoder

One class SVM

GRU

Features - subcomponent timing


Saving Energy In Homes With A Unified Approach To Data And AI

Data workflow

IoT Stream + Click data + SQL -> delta lake

delta lake -> batch processing -> services API

delta lake -> notebooks

https://github.com/quby-io/databricks-workflow

staging/production/integration test workflows


Efficiently Building Machine Learning Models For Predictive Maintenance In The Oil & Gas Industry with Databricks

Reduce annual maintenance cost by 10% based on avoiding failure modes

For time series - compute a Welch Fourier transform for each window
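A rough sketch of a Welch spectral estimate with SciPy (signal and sampling rate made up; not necessarily the talk's implementation):

import numpy as np
from scipy import signal

x = np.random.randn(4096)  # hypothetical sensor window
freqs, psd = signal.welch(x, fs=1000.0, nperseg=512)  # power spectral density per window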

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Forward/backward fill missing sensor values within each FIELD group
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def fill_pdf(pdf):
    for col in col_list:  # col_list: the sensor columns to fill
        pdf.loc[:, col] = pdf[[col]].ffill().bfill()
    return pdf

new_df = df.groupBy('FIELD').apply(fill_pdf)


Geosp.AI.tial: Applying Big Data and Machine Learning to Solve the World's Toughest Geospatial Intelligence Problems

https://www.boozallen.com/

Land classification with Black Marble

packages: GeoMesa, GeoTrellis, GeoPandas and GeoSpark


Geospatial Analytics in Apache Spark

Indexing is difficult. Other options: Geohash (Elastic), Quadtree, H3 (Uber)

Geopandas - https://geopandas.org/

Scikit-mobility (analysis of human mobility) - https://scikit-mobility.github.io/scikit-mobility/

MovingPandas - https://movingpandas.readthedocs.io/en/latest/


RasterFrames - https://rasterframes.io/

GeoSpark - https://datasystemslab.github.io/GeoSpark/


Large-scale joins, e.g. joining the Microsoft building footprints dataset with US Census Blocks to get stats on average square-foot density.


Bring Satellite & Drone Imagery into Your Data Science Workflows

https://aerialservicesinc.com/earth-on-demand/

https://earthondemand.astraea.earth/

California fire incidents.