Spark AI Summit 2020
Accelerating MLFlow Hyper-Parameter Optimization Pipelines with RAPIDS
https://github.com/rapidsai/cloud-ml-examples/
Add init script
hyperopt: log -accuracy as the loss to maximize accuracy (hyperopt minimizes the loss).
Define spark_trials (hyperopt SparkTrials) to distribute the trials across the cluster.
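A minimal sketch of that setup, assuming a hypothetical train_and_score() helper standing in for the RAPIDS training step:

from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK

def objective(params):
    acc = train_and_score(params)                 # placeholder for the actual training/eval step
    return {'loss': -acc, 'status': STATUS_OK}    # return -accuracy so minimizing maximizes accuracy

spark_trials = SparkTrials(parallelism=4)
best = fmin(fn=objective,
            space={'max_depth': hp.choice('max_depth', [4, 8, 16])},
            algo=tpe.suggest,
            max_evals=20,
            trials=spark_trials)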
Can click a button in the MLflow UI to productionize the model and then query the served model.
Matches the sklearn MLflow logging.
Ibis: Seamless Transition Between Pandas and Apache Spark
There's ibis.pandas and ibis.pyspark
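A rough sketch of switching backends with Ibis (circa Ibis 1.x; the table and column names here are made up for illustration):

import ibis
import pandas as pd

pdf = pd.DataFrame({'genus': ['a', 'b', 'a'], 'count': [1, 2, 3]})
con = ibis.pandas.connect({'observations': pdf})   # pandas backend
# con = ibis.pyspark.connect(spark)                # same expressions against the PySpark backend
t = con.table('observations')
expr = t.group_by('genus').aggregate(total=t['count'].sum())
print(expr.execute())                              # expressions stay lazy until execute()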
Koalas: Making an easy transition from pandas to apache spark
Announced at Spark AI Summit 2019.
Scale out the pandas code using Koalas and make learning PySpark much easier.
pandas vs. PySpark
Add a column
- pandas: df['c'] = df['a'] + df['b']
- PySpark: df = df.withColumn('c', df['a'] + df['b'])
Rename columns
- pandas: df.columns = ['a', 'b']
- PySpark: df = df.select(df['c1'].alias('a'), df['c2'].alias('b'))
- PySpark: df = df.toDF('a', 'b')
Value counts
- pandas: df['col'].value_counts()
- PySpark: df.groupBy(df['col']).count().orderBy('count', ascending=False)
Full example
pandas:
import pandas as pd
df = pd.read_csv('file.csv')
df.columns = ['x', 'y', 'z1']
df['x2'] = df.x * df.x
PySpark:
df = spark.read.option("inferSchema", "true").csv('file.csv')
df = df.toDF('x', 'y', 'z1')
df = df.withColumn('x2', df.x * df.x)
Koalas is the same as pandas but using
import databricks.koalas as ks
Koalas 1.0
Spark 3.0 support. Some functions optimized, e.g. mapInPandas()
Python 3.8 support
follow pandas 1.0 behavior
Covers more and more of the pandas API over time.
Can go back and forth between Koalas and Spark using .to_koalas() and .to_spark()
there is also ks.sql()
kdf = ks.from_pandas(pdf)
pdf = kdf.to_pandas()
sdf = kdf.to_spark(index_col='index')  # index_col keeps the Koalas index as a Spark column
sdf.show()
kdf = sdf.to_koalas(index_col=['x', 'y'])
kdf.mean()
kdf.describe()
kdf.groupby('A').sum()
kdf['col'].cumsum().plot()
https://docs.databricks.com/_static/notebooks/pandas-to-koalas-in-10-minutes.html
Use transform() if you want a DataFrame of the same length; use apply() if the output can have a different length.
kdf = ks.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})

def pandas_plus(pser):
    return pser + 1                # same length as the input -> use transform
    # return pser[pser % 2 == 1]   # different length than the input -> use apply

kdf.transform(pandas_plus)
kdf.apply(pandas_plus)
Use transform_batch() and apply_batch() if the function operates on a whole pandas DataFrame per batch rather than a Series.
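A small sketch of the batch variants, assuming the Koalas 1.x kdf.koalas accessor (the plus_one function is just for illustration):

def plus_one(pdf):                      # pdf is a whole pandas DataFrame per batch
    return pdf + 1

kdf.koalas.transform_batch(plus_one)    # output must keep the same length
kdf.koalas.apply_batch(plus_one)        # output length may differ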
Get spark schema
kdf.spark.schema().simpleString()
kdf.spark.print_schema()
dataframe.spark.apply
kdf.spark.apply(lambda sdf: sdf.select(sdf['A'] * sdf['B'])).head()
The index_col parameter is needed if you need to preserve the index
kdf.spark.apply(lambda sdf: sdf.select(sdf['index'], sdf['A'] * sdf['B']), index_col='index').head()
series.spark.transform
kdf.A.spark.transform(lambda scol: scol.cast('int')).head()
dataframe.spark.explain()
(kdf + 1).spark.explain()
A Koalas DataFrame can take an explicit index and also has a default index
July/Aug 2020 release DBR/MLR 7.1 will pre-install Koalas 1.x
Roadmap: map more of the pandas API and add ML libraries
https://koalas.readthedocs.io/en/latest/?badge=latest
Productionizing Machine Learning Pipelines with Databricks & Azure ML
Use Azure ML, Azure Pipelines and Azure DevOps
Use an Azure VM, python 3.7
use poetry for dependency management:
- Upload the .whl package to Databricks.
- Can install the databricks-cli
Use dbutils.secrets; can create a secret scope to hold the passwords
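For example (the scope and key names here are made up):

# one-off: create the scope with the CLI
#   databricks secrets create-scope --scope azure-ml-scope
password = dbutils.secrets.get(scope="azure-ml-scope", key="sp-password")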
Put images into train, valid and test folder.
Plot MLflow runs to find the best model
See model Artifact
Download model artifact locally to test deployment
Register to AzureML model registry
Pass a Webservice scoring (entry) function
Can set the memory (GBs) and compute resources
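A rough sketch of those registration/deployment steps with the azureml-sdk (workspace, file, and service names are placeholders, not from the talk):

from azureml.core import Workspace, Environment
from azureml.core.model import Model, InferenceConfig
from azureml.core.webservice import AciWebservice

ws = Workspace.from_config()

# register the locally downloaded MLflow artifact
model = Model.register(workspace=ws, model_path="model", model_name="image-classifier")

# entry script wraps the scoring function; environment pins the dependencies
env = Environment.from_conda_specification(name="infer-env", file_path="conda.yml")
inference_config = InferenceConfig(entry_script="score.py", environment=env)
deploy_config = AciWebservice.deploy_configuration(cpu_cores=2, memory_gb=4)

service = Model.deploy(ws, "image-classifier-svc", [model], inference_config, deploy_config)
service.wait_for_deployment(show_output=True)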
cv2 to read image
Put into deployment pipeline
Use Azure DevOps and Releases under Pipelines
Accelerating Data Processing in Spark SQL With Pandas UDFs
Loops (in pandas UDFs) can beat Spark SQL
Aggregate Keys
Use inverted indices
https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
UDF = User Defined Function
Scalar UDFs - one-to-one mapping
Grouped Map UDFs - can return a variable number of output rows
Grouped Agg UDFs -> Use Grouped Map UDFs instead.
Native approach: use Spark SQL
- Get all pairs using a cross join
- functions.array_intersect for the inclusion/exclusion logic
- Use groupby and aggregate to get the counts
naive_solution = featurestore_sample_100k.crossJoin(functions.broadcast(model_data))

# Keep rows with at least 1 inclusion hash and no exclusion hashes
def arrays_intersect(col1, col2):
    return functions.size(functions.array_intersect(col1, col2)) != 0

naive_solution = naive_solution.filter(
    arrays_intersect('featureIds', 'geoInclusionHashes')
    & (~arrays_intersect('featureIds', 'geoExclusionHashes')))
naive_solution = naive_solution.select('modelId', 'id').distinct()
naive_solution = naive_solution.groupby('modelId').count()
But it is slow...
Generates ~700x the number of intermediate rows
Pandas UDF
def simple_group_by(feature_store_id_data):
    id_set = []
    for model_id, inclusion_ids, exclusion_ids in model_data_df[['modelId', 'geoInclusionSet', 'geoExclusionSet']].values:
        for id_row in feature_store_id_data['featureIds'].values:
            # keep the model if there is at least one inclusion hash and no exclusion hashes
            if not inclusion_ids.isdisjoint(id_row) and exclusion_ids.isdisjoint(id_row):
                id_set.append(model_id)
                break
    return pd.DataFrame([[cur_id] for cur_id in id_set], columns=['modelId'])

simple_group_by_udf = functions.pandas_udf(
    simple_group_by,
    StructType([StructField('modelId', IntegerType())]),
    PandasUDFType.GROUPED_MAP)
Speed up by aggregating counts on the unique filters (deduplicated inclusion/exclusion sets) rather than on every model
key_aggregated_solution = (featurestore_sample_100k
.groupby('id')
.apply(key_aggregated_group_by_udf)
.groupby('filterId')
.count()
.join(functions.broadcast(filter_model_id_pairs), on='filterId'))
Speed up by batching (e.g. ~10k ids per UDF call)
batch_solution = (featurestore_sample_1M
.withColumn('batch', functions.abs(functions.hash('id') % 100))
.groupby('batch').apply(batch_group_by_udf)
.groupby('filterId')
.agg(functions
.sum('batchCount')
.alias('count'))
.join(functions.broadcast(filter_model_id_pairs), on='filterId'))
Use an inverted index to iterate over the unique features instead of the filters
Use .values and pass to itertools instead of pandas to speed up
itertools.groupby
to sort and group data
itertools.chain.from_iterable
to iterate through a nested for loop
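A small illustration of those two itertools calls on made-up (feature_id, model_id) pairs:

import itertools

rows = [(2, 'm1'), (1, 'm2'), (2, 'm3'), (1, 'm4')]

rows.sort(key=lambda r: r[0])                        # groupby requires sorted input
for feature_id, group in itertools.groupby(rows, key=lambda r: r[0]):
    print(feature_id, [model_id for _, model_id in group])

nested = [['m1', 'm3'], ['m2'], ['m4']]
flat = list(itertools.chain.from_iterable(nested))   # flattens without an explicit nested loop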
Scalable Acceleration of XGBoost Training On Apache Spark GPU Clusters
XGBoost gradient boosting library.
Regression, classification, ranking and user defined objectives
Use gpu_hist as the tree method
Out-of-core is difficult: GPU memory is small and naively streaming over the PCIe bus is slow
Can sample the data and use sample to build the tree
gradient-based sampling - selection probability proportional to the gradients; Gradient-based One-Side Sampling (GOSS; used in LightGBM); Minimal Variance Sampling (MVS; used in CatBoost); sample ratio as low as 0.1 without loss of accuracy
500 columns, V100 GPU (16 GB VRAM): max data size of 9M rows in-core; 12M rows out-of-core; 85M rows using f=0.1
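A hedged sketch of the gpu_hist + gradient-based sampling settings above with the XGBoost Python API (the DMatrix inputs are placeholders):

import xgboost as xgb

dtrain = xgb.DMatrix(X_train, label=y_train)   # placeholder data
params = {
    'tree_method': 'gpu_hist',
    'sampling_method': 'gradient_based',       # gradient-based sampling
    'subsample': 0.1,                          # sample ratio as low as 0.1
    'objective': 'binary:logistic',
}
booster = xgb.train(params, dtrain, num_boost_round=100)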
Learning to rank (LTR)
Used in Information Retrieval (IR):
- search engine
- within each query group, use ML to rank the documents
XGBoost builds a better model by combining multiple weak models
Models are built by gradient descent using an objective function such as LTR
LambdaMART ranking algorithm, which uses a pairwise ranking approach. Minimizes pairwise loss by repeatedly sampling pairs of instances
3 algorithms on GPU:
- Pairwise (default)
- mAP - mean Average Precision
- nDCG - normalized Discounted Cumulative Gain
The last two further minimize pairwise loss by adjusting it with the weight of instance pair chosen
objective=rank:map
eval_metric=map (or pre@n - precision of the top n documents; auc - area under the ROC curve; aucpr - area under the precision-recall curve)
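A hedged sketch of the ranking objective/metric with the Python API (features, labels, and query-group sizes are placeholders):

import xgboost as xgb

dtrain = xgb.DMatrix(X_train, label=relevance_labels)
dtrain.set_group(group_sizes)                  # number of rows per query group

params = {
    'tree_method': 'gpu_hist',
    'objective': 'rank:map',                   # or rank:pairwise / rank:ndcg
    'eval_metric': 'map',
}
ranker = xgb.train(params, dtrain, num_boost_round=100)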
Test on Microsoft benchmark ranking dataset
Convert data to numeric data; do ETL (XGBoost4j)
XGBoost + Spark 2.x + Rapids
Read file directly to GPU memory
Chunks loading
Convert column-major cuDF to a sparse, row-major DMatrix
CPU
df = spark.read.parquet(path)
featureNames = Seq("f1", "f2", "f3")
vectorAssembler = new VectorAssembler().setInputCols(featureNames.toArray).setOutputCol("features")
xgbInput = vectorAssembler.transform(df).select("features", labelColName)
xgbClassifier = new XGBoostClassifier(params).setLabelCol(labelColName).setTreeMethod("hist").setFeaturesCol("features")
model = xgbClassifier.fit(xgbInput)
GPU
gpuDF = GpuDataReader(spark).parquet(path)
featureNames = Seq("f1", "f2", "f3")
xgbClassifier = new XGBoostClassifier(params).setLabelCol(labelColName).setTreeMethod("gpu_hist").setFeaturesCols(featureNames)
model = xgbClassifier.fit(gpuDF)
XGBoost + Spark 3.0
Rapids-plugin-4-spark - via rapids library
If gpu_enabled(operation, data_type), run the operation on the GPU; else fall back to the standard Spark operation
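A rough sketch of enabling the plugin when building the session (config keys from the rapids-plugin-4-spark docs; jar/classpath setup omitted):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         .config("spark.rapids.sql.enabled", "true")   # unsupported ops fall back to standard Spark
         .getOrCreate())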
https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/apache-spark-3/ebook-sign-up/
https://github.com/rapidsai/xgboost/tree/rapids-spark3.0
https://github.com/rapidsai/spark-examples
Deep Dive Into GPU Support In Apache Spark 3.X
Koalas: Pandas On Apache Spark
Pandas
import pandas as pd
df = pd.read_csv('my_data.csv')
df.columns = ['x', 'y', 'z1']
df['x2'] = df.x * df.x
PySpark
df = spark.read.option("inferSchema", "true").csv("my_data.csv")
df = df.toDF('x', 'y', 'z1')
df = df.withColumn('x2', df.x * df.x)
Koalas
import databricks.koalas as ks
df = ks.read_csv('my_data.csv')
df.columns = ['x', 'y', 'z1']
df['x2'] = df.x * df.x
Spark does not maintain row order
If no index is specified when creating a Koalas DataFrame a "default index" is attached automatically (each default index type has pros and cons):
- sequence - Uses a PySpark Window function without partitioning, so the computation ends up on a single node
- distributed-sequence - Uses group-by and group-map
- distributed - Uses monotonically_increasing_id but values are non-deterministic
Can configure these by using compute.default_index_type
%fs ls dbfs:/home/niall/koalas/sais_2020_csv/
# import pandas as pd
# pdf = pd.read_csv("/dbfs/home/niall/koalas/sais_2020_csv/gbif_australia_2010_2020.csv", sep="\t")
# pdf.head()
But this runs out of memory (OOM)
import databricks.koalas as ks
Choose the distributed index (https://koalas.readthedocs.io/en/latest/user_guide/options.html#default-index-type )
ks.set_option('compute.default_index_type', 'distributed')
Read from delta lake
kdf = ks.read_delta("dbfs:/home/niall/koalas/sais_2020")
kdf.head()
Check size
kdf.shape
Use SQL query
genus_test = "Phascolarctos"
ks.sql("""
SELECT COUNT(*)
FROM {kdf}
WHERE genus = {genus_test}
""")
Grab the column names
kdf.columns
EDA
kdf.describe()
Count null values
kdf.isna().sum()
Keep data
kdf = kdf[kdf["individualCount"].notnull() & kdf["month"].notnull() & kdf["year"].notnull()]
Value counts
kdf["kingdom"].value_counts(normalize=True)
Viz
kdf["kingdom"].value_counts(normalize=True).plot.bar(rot=25, figsize=(12,8), title="Bar plot of kingdom column using Koalas DataFrame")
Number of unique classes
len(kdf["class"].unique())
Total count, the number of distinct recordings and the average count per recording for each class
# Group by the class column, sum up the individual counts for each group, and get the count of instances in each group.
# Note that using as_index=False resets the index
class_ks_df = kdf.groupby("class", as_index=False).agg({"individualCount": "sum", "gbifID": "count"})
# Renaming our columns to reflect the grouping aggregates
class_ks_df.columns = ["class", "total_count", "number_distinct_recordings"]
# Create a column which calculates the average number of animal/plant counts per each distinct recording for each class
class_ks_df["avg_count_per_recording"] = class_ks_df["total_count"] / class_ks_df["number_distinct_recordings"]
class_ks_df.sort_values(by="number_distinct_recordings", ascending=False).head(10)
Distribution of the average counts per recording using .hist()
class_ks_df["avg_count_per_recording"].hist(bins=100, figsize=(12,8), title="Histogram of average individualCount per observation (Aggregated to class level)")
class_ks_df.sort_values(by="avg_count_per_recording", ascending=False).head().style.background_gradient(cmap="Greens")
Filter down to the genus Phascolarctos
koalas_df = kdf[kdf.genus == "Phascolarctos"]
koalas_df.head()
koalas_df.shape[0]
Check the date range for which we have recordings of koala numbers
koalas_df["eventDate"].min(), koalas_df["eventDate"].max()
How many observations were recorded by each province from 2010 to May 2020?
koalas_df["stateProvince"].value_counts()
What is the distribution of the individual count of Koalas for each observation?
koalas_df["individualCount"].hist(bins=100, figsize=(12,8), title="Histogram of individualCount per recording for Koalas")
Remove any observations where over 100 koalas have been recorded for a single recording
koalas_df = koalas_df[koalas_df["individualCount"] < 100]
koalas_df["individualCount"].hist(bins=100, figsize=(12,8), title="Histogram of individualCount per recording for Koalas")
Number of occurrences recorded for koalas, along with the total number of koalas counted and the average number of koalas per recorded sighting.
# Get the sum of all koalas recorded in Australia for each month
koalas_sum_ks_df = koalas_df.groupby(["year", "month"], as_index=False)["individualCount"].sum()
# Get the count of all koala occurrences in Australia for each month
koalas_count_ks_df = koalas_df.groupby(["year", "month"], as_index=False)["individualCount"].count()
# Join the above Koalas DataFrames and divide the total_count column by num_recordings column
koalas_avg_cnt_ks_df = koalas_sum_ks_df.merge(koalas_count_ks_df, on=["year", "month"])
koalas_avg_cnt_ks_df.columns = ["year", "month", "total_count", "num_recordings"]
# Get number of koalas per recording nationally
koalas_avg_cnt_ks_df["avg_cnt"] = koalas_avg_cnt_ks_df["total_count"] / koalas_avg_cnt_ks_df["num_recordings"]
koalas_avg_cnt_ks_df.sort_values(by=["year", "month"], ascending=False).head(12)
Plot the count per month of koalas from 2010 to May 2020
# Concatenating the year and month columns to create a year_month column
koalas_avg_cnt_ks_df = ks.sql("""
SELECT year, month, num_recordings, total_count, avg_cnt, CONCAT(year, "-", month) AS year_month
FROM {koalas_avg_cnt_ks_df}
""")
koalas_avg_cnt_ks_df.head()
# Convert the year_month column to datetime (returns in format %Y-%m-%d)
koalas_avg_cnt_ks_df["year_month"] = ks.to_datetime(koalas_avg_cnt_ks_df["year_month"])
# Need to sort by year_month before plotting
koalas_avg_cnt_ks_df.sort_values(by=['year_month'], inplace=True)
Create NumPy arrays of the columns we would like to pass into our matplotlib plot
# Creating numpy arrays from year_month, total_count and num_recordings
x_axis_vals = koalas_avg_cnt_ks_df["year_month"].to_numpy()
y1_vals = koalas_avg_cnt_ks_df["total_count"].to_numpy()
y2_vals = koalas_avg_cnt_ks_df["num_recordings"].to_numpy()
Plot monthly total counts and the number of monthly recordings for koalas from 2010 until May 2020
import matplotlib.pyplot as plt
import seaborn as sns; sns.set_style("white")
plt.rcParams.update({'font.size': 9})
fig, ax1 = plt.subplots(figsize=(12,8))
color = 'tab:red'
ax1.set_xlabel('year_month')
ax1.set_ylabel('total_count', color=color)
ax1.plot(x_axis_vals, y1_vals, color=color)
ax1.tick_params(axis='y', labelcolor=color)
ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('num_recordings', color=color)
ax2.plot(x_axis_vals, y2_vals, color=color)
ax2.tick_params(axis='y', labelcolor=color)
fig.tight_layout()
ax1.set_title("Monthly total count and number of recordings for Koalas in Australia from 2010 - 2020", size=9)
plt.show()
Plot the average number of koalas per recording for each month between 2010 and May of 2020
sns.set_style("whitegrid")
# Set year_month to be the index and keep only avg_cnt
koalas_avg_count_ks_series = koalas_avg_cnt_ks_df.set_index("year_month")["avg_cnt"]
plt.title("Monthly average koala count in Australia from 2010 - 2020", size=9)
plt.xlabel("date")
plt.ylabel("avg_num_koalas_per_occurrence")
koalas_avg_count_ks_series.plot(figsize=(12, 8))
Parallelizing Model Training with Koalas
Forecast future total numbers for each genus on a monthly basis
# First filter to the classes we would like to forecast for
filter_1 = (kdf["genus"] == "Phascolarctos") # Koala
filter_2 = (kdf["genus"] == "Macropus") # Kangaroo
filter_3 = (kdf["genus"] == "Cacatua") # Cockatoo
filter_4 = (kdf["genus"] == "Dromaius") # Emu
subset_kdf = kdf[filter_1 | filter_2 | filter_3 | filter_4]
# Get the total number of counts and total number of observations for each class in every month
grouped_kdf = subset_kdf.groupby(["genus", "year", "month"], as_index=False) \
.agg({"individualCount": "sum", "occurrenceID": "count"}) \
.sort_values(by=["year", "month"])
grouped_kdf.columns = ["genus", "year", "month", "total_count", "num_recordings"]
# Concatenating the year and month columns to create a year_month column
grouped_kdf = ks.sql("""
SELECT genus, year, month, num_recordings, total_count, CONCAT(year, "-", month) AS year_month
FROM {grouped_kdf}
ORDER BY year, month
""")
# Convert the year_month column to datetime (returns in format %Y-%m-%d)
grouped_kdf["year_month"] = ks.to_datetime(grouped_kdf["year_month"], format="%Y-%m")
grouped_kdf.head()
Utilize the Koalas groupby-apply method
Apply an arbitrary Python function to each group of our data. In this instance, for each genus we apply the fcst_func function we have defined, which fits a Prophet model and logs parameters and plots to MLflow. Note that the returned koalas.DataFrame can have a different number of rows and columns than the input.
Koalas makes use of Spark's pandas UDF functionality when implementing a groupby-apply method
When calling groupby-apply, Koalas executes the function once for a small sample to infer the type, which can be potentially expensive; for example, where a dataset is created after aggregations or sorting. To avoid this, we specify a return type hint in the function being applied.
When forecasting for each individual genus we will produce a number of different plots. To record these, we will use MLflow to log them as artifacts.
# MLflow set up
import mlflow
from mlflow.tracking import MlflowClient
# MLflow experiment (specify path for experiment to be set to)
# NOTE: you will need to change the following experiment path
# dependent on where you would like to save the model to
mlflow_exp = f"/Users/niall.turbitt@databricks.com/Koalas/Koalas SAIS 2020/koalas_sais20_exp"
mlflow.set_experiment(mlflow_exp)
# Get the experiment ID to track to
exp_id = MlflowClient().get_experiment_by_name(mlflow_exp).experiment_id
from fbprophet import Prophet
import numpy as np
import matplotlib.pyplot as plt
def fcst_func(pdf) -> ks.DataFrame[np.datetime64, np.float64, np.float64, np.float64, str]:
    """
    Function to take grouped genus data, and provide 3-year monthly
    forecast of total count. Log model plot as artifacts in MLflow
    pdf: grouped genus data
    """
    genus_name = pdf["genus"].iloc[0]
    pdf["ds"] = pdf["year_month"]
    pdf["y"] = pdf["total_count"]
    df = pdf[["ds", "y"]].copy()

    # log the run with MLflow
    with mlflow.start_run(experiment_id=exp_id,
                          run_name=f"{genus_name} - Prophet Forecast") as run:

        prophet_daily_seasonality = False
        prophet_weekly_seasonality = False
        prophet_yearly_seasonality = True

        mlflow.log_params({"genus": genus_name,
                           "prophet_param_daily_seasonality": prophet_daily_seasonality,
                           "prophet_param_weekly_seasonality": prophet_weekly_seasonality,
                           "prophet_param_yearly_seasonality": prophet_yearly_seasonality})

        m = Prophet(daily_seasonality=prophet_daily_seasonality,
                    weekly_seasonality=prophet_weekly_seasonality,
                    yearly_seasonality=prophet_yearly_seasonality,
                    seasonality_mode="additive")
        m.fit(df)

        future = m.make_future_dataframe(periods=36, freq="M")
        fcst = m.predict(future)

        fig_1 = m.plot(fcst)
        plt.xlabel(f"Date")
        plt.ylabel(f"{genus_name} Numbers (Monthly)")
        plt.title(f"{genus_name} - Prophet Forecast")
        fcst_path = f"/tmp/{genus_name}_forecast.png"
        fig_1.savefig(fcst_path)
        mlflow.log_artifact(fcst_path)

        fig_2 = m.plot_components(fcst)
        components_path = f"/tmp/{genus_name}_components.png"
        fig_2.savefig(components_path)
        mlflow.log_artifact(components_path)
        plt.clf()

    ret_pdf = fcst[["ds", "yhat", "yhat_lower", "yhat_upper"]]
    ret_pdf["genus"] = [genus_name] * len(ret_pdf)
    return ret_pdf
fcst_df = grouped_kdf.groupby("genus", as_index=False).apply(fcst_func)
Call len() to trigger the (lazy) computation
len(fcst_df)
Search the runs of our MLflow experiment programmatically using search_runs
test_genus = "Cacatua"
# The following returns a pandas DataFrame of runs
mlflow_res_pdf = mlflow.search_runs(filter_string=f"params.genus='{test_genus}'").sort_values("start_time", ascending=False)
mlflow_res_pdf
From our runs DataFrame, we can grab the artifact URI and load the forecast plot for that given run.
artifact_uri = mlflow_res_pdf.iloc[0]["artifact_uri"]
local_img_path = "/" + artifact_uri.replace(":", "") + f"/{test_genus}_forecast.png"
plt.figure(figsize = (12,8))
img = plt.imread(local_img_path)
plt.imshow(img)
plt.show()
Scaling Up AI Research To Production With PyTorch and MLFlow
What is PyTorch?
Eager & graph-based execution; dynamic NNs; distributed training; hardware acceleration; simplicity over complexity
torch.nn/optim/data/autograd/vision/jit (deployment)
PyTorch developer conference
import torch
import torch.nn.functional as F
import torchvision

class Net(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = torch.nn.Linear(8, 64)
        self.fc2 = torch.nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1.forward(x))
        x = F.dropout(x, p=0.5)
        x = torch.sigmoid(self.fc2.forward(x))
        return x

net = Net()
data_loader = torch.utils.data.DataLoader(torchvision.datasets.MNIST('./data'))
optimizer = torch.optim.SGD(net.parameters(), lr=0.01)

for epoch in range(1, 11):
    for data, target in data_loader:
        optimizer.zero_grad()
        prediction = net.forward(data)
        loss = F.nll_loss(prediction, target)  # negative log-likelihood loss
        loss.backward()
        optimizer.step()
    if epoch % 2 == 0:
        torch.save(net, "net.pt")
https://pytorch.org/ecosystem/
Pruning - large NNs are hard to deploy. A technique to compress the model by reducing the number of parameters without sacrificing accuracy
Quantization - experimental - inference at scale e.g. mobile
PyTorch Elastic - fault tolerant and dynamic training
PyTorch RPC (remote procedure call) - scale out applications
TorchServe - HTTP APIs - partnership with AWS
mlflow.pytorch
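A minimal sketch of logging and reloading the Net model above with mlflow.pytorch:

import mlflow
import mlflow.pytorch

with mlflow.start_run():
    mlflow.pytorch.log_model(net, artifact_path="model")
    model_uri = mlflow.get_artifact_uri("model")

loaded_net = mlflow.pytorch.load_model(model_uri)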
PyTorch 1.5 released on April 21st
torchvision 0.6; torchtext 0.6; torchaudio 0.5
https://www.udacity.com/course/deep-learning-pytorch--ud188 - free course
Anomaly Detection At The Edge
Compute close to the data source
Autonomous vehicles - connected cars, increase situational awareness
Manufacturing - predictive maintenance
Telecommunications - quality of service
Retail and consumer goods - hypertargeting
healthcare - personalization
energy and utilities - Improve efficiency
Federated learning
One pass algorithms - moments of univariate/multivariate data
Incremental - numerical stability
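A standard example of a numerically stable, one-pass/incremental moment computation (Welford's algorithm), in the spirit of these notes:

def welford_update(state, x):
    # state = (count, mean, M2); returns the updated state after observing x
    count, mean, m2 = state
    count += 1
    delta = x - mean
    mean += delta / count
    m2 += delta * (x - mean)          # uses the updated mean for stability
    return count, mean, m2

state = (0, 0.0, 0.0)
for x in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]:
    state = welford_update(state, x)
count, mean, m2 = state
print(mean, m2 / count)               # mean 5.0, population variance 4.0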
Statistical, DL/RL techniques:
Time series & PCA - ARIMA, Sparse PCA
LSTM - Stochastic RNN
Attention
Autoencoder
Transformer
GANs - GANomaly
Sketching - deal with edge/data issues
Models:
Squeezed convolutional Variational autoencoder
Edge-based RNN
Federated XGBoost
CNN-variational autoencoder
One class SVM
GRU
Features - subcomponent timing
Saving Energy In Homes With A Unified Approach To Data And AI
Data workflow
IoT Stream + Click data + SQL -> delta lake
delta lake -> batch processing -> services API
delta lake -> notebooks
https://github.com/quby-io/databricks-workflow
staging/production/integration test workflows
Efficiently Building Machine Learning Models For Predictive Maintenance In The Oil & Gas Industry with Databricks
Reduce annual maintenance cost by 10% based on avoiding failure modes
For time series - Welch Fourier Transform for each window
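A hedged sketch of computing a Welch power spectral density per window with scipy (sampling rate and segment length are made up):

import numpy as np
from scipy.signal import welch

window = np.random.randn(4096)                       # one sensor window
freqs, psd = welch(window, fs=1000.0, nperseg=256)   # PSD estimate per frequency bin
features = psd                                       # use the PSD bins as features for the window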
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def fill_pdf(pdf):
    # forward-fill then back-fill each sensor column within the group (col_list defined earlier)
    for col in col_list:
        pdf[col] = pdf[col].ffill().bfill()
    return pdf

new_df = df.groupBy('FIELD').apply(fill_pdf)
Geosp.AI.tial: Applying Big Data and Machine Learning to Solve the World's Toughest Geospatial Intelligence Problems
Land classification with Black Marble
packages: GeoMesa, GeoTrellis, GeoPandas and GeoSpark
Geospatial Analytics in Apache Spark
Indexing is difficult. Other options: Geohash (Elasticsearch), Quadtree, H3 (Uber)
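A small sketch of point indexing with Uber's H3 (h3-py v3 API; the coordinates are arbitrary):

import h3

lat, lng = -33.87, 151.21                  # Sydney
cell = h3.geo_to_h3(lat, lng, 9)           # hexagon index at resolution 9
neighbours = h3.k_ring(cell, 1)            # the cell plus its immediate ring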
Geopandas - https://geopandas.org/
Scikit-mobility (analysis of human mobility) - https://scikit-mobility.github.io/scikit-mobility/
Moving-pandas - https://movingpandas.readthedocs.io/en/latest/
RasterFrames - https://rasterframes.io/
GeoSpark - https://datasystemslab.github.io/GeoSpark/
Large-scale joins e.g. Microsoft building dataset with the US Census Blocks to get stats on average square foot density.
Bring Satellite & Drone Imagery into Your Data Science Workflows
https://aerialservicesinc.com/earth-on-demand/
https://earthondemand.astraea.earth/
California fire incidents.