Use pandas Merge_Asof in Databricks to range join dataframes.
The below exmple merge df_l to df_r on times column.
For every row from df_l, find the row from df_r with the max 'time' <= the 'time' form df_l, i.e. merge_asof in pandas.
In databricks, the cogroup can apply a pandas function on the left & right grouped dataframes.
The drawback is that it can not apply the funciton on two dataframes directly, but need to be on two groupby dataframes.
Also the congroup will only compare cogroups with the same id, e.g. left group with id = 1 is only merged with right group with id =1.
import pandas as pd
df_r = spark.createDataFrame(
[(20000101, 1, '1.0'),
(20000103, 2, '3.0'),
(20000105, 2, '5.0'),
(20000107, 2, '7.0')],
("time", "id_r", "value_r"))
df_l = spark.createDataFrame(
[(20000104, 1, "4"),
(20000106, 2, "6")],
("time", "id_l", "value_l"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time")
df_l.groupby("id_l").cogroup(df_r.groupby("id_r")).applyInPandas(
asof_join, schema="time int, id_l int, value_l string, id_r int, value_r string").show()
In this example we get below result. Note df_r has a timestamp 20000103 closer to the 20000104, but df_r with id = 1 was chosen because the cogroup needs id_l == id_r so only rows with id=1 is considered.
+--------+----+-------+----+-------+
| time|id_l|value_l|id_r|value_r|
+--------+----+-------+----+-------+
|20000104| 1| 4| 1| 1.0|
|20000106| 2| 6| 2| 5.0|
+--------+----+-------+----+-------+
A work around to merge_asof for two dataframes is to set a dummy id column which is the same value for both dataframes.
Like in below, the id value is always 1 for both dataframes, so databricks would compare the two dataframes.
import pandas as pd
df_r = spark.createDataFrame(
[(20000101, 1, '1.0'),
(20000103, 1, '3.0'),
(20000105, 1, '5.0'),
(20000107, 1, '7.0')],
("time", "id_r", "value_r"))
df_l = spark.createDataFrame(
[(20000104, 1, "4"),
(20000106, 1, "6")],
("time", "id_l", "value_l"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time")
df_l.groupby("id_l").cogroup(df_r.groupby("id_r")).applyInPandas(
asof_join, schema="time int, id_l int, value_l string, id_r int, value_r string").show()
Note that databricks need to shuffle cogroup data with the same id to do the merge, so if merging on the dummy id, the whole dataframe will be shuffled (broadcasted?) and may cause out of memory issue.