Range query join hint


In Spark/Databricks, there is a type of query that checks whether a value falls within a range

e.g.

select *

  from table1 a

       join table2 b

  on a.time > b.start and a.time < b.end

   

In a traditional database, if there is an index built on a.time, then for every row in table2

it can go straight to the table1 records whose time falls within (b.start, b.end), which is fast.


In Spark/Databricks, however, there seems to be no such index or index-based join strategy, so the default plan is a brute-force nested loop.

It compares every single pair of records from table1 and table2 to complete the join, which is super expensive.
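To make the cost concrete, here is a toy sketch in plain Python (made-up numbers, not actual Spark) of what the nested-loop plan effectively does:

```python
# Toy illustration of the brute-force plan: with no index, a range join
# degenerates into a nested loop that compares every pair of rows,
# i.e. O(len(table1) * len(table2)) comparisons.
table1 = [{"time": t} for t in [5, 15, 25, 35]]
table2 = [{"start": 0, "end": 10}, {"start": 20, "end": 30}]

comparisons = 0
result = []
for a in table1:                  # every row of table1 ...
    for b in table2:              # ... against every row of table2
        comparisons += 1
        if b["start"] < a["time"] < b["end"]:
            result.append((a["time"], b["start"], b["end"]))

print(comparisons)  # 8 = 4 * 2, all pairs are checked
print(result)       # [(5, 0, 10), (25, 20, 30)]
```

Four times two rows means eight comparisons here; with millions of rows on each side the pair count explodes.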


The trouble comes from the inequalities in the join condition.

If the condition were a.time == b.time, it would be easier for Spark, because it could sort both table1 and table2 first and then merge-join them.

Now it is looking at both start and end. There can be many records with a.time > b.start, and many records with a.time < b.end.

Plain sort-and-merge doesn't work anymore for Spark.
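For contrast, here is a minimal plain-Python sketch (toy values) of why an equality join is easy: sort both sides once, then a single synchronized pass finds all matches without comparing every pair.

```python
# Merge join sketch for an equality condition: sort both inputs, then
# advance two cursors in one pass. (Simplified: assumes no duplicate
# values on the right side.)
left = sorted([3, 1, 7, 3])
right = sorted([3, 7, 9])

matches = []
i = j = 0
while i < len(left) and j < len(right):
    if left[i] < right[j]:
        i += 1
    elif left[i] > right[j]:
        j += 1
    else:
        # emit every left duplicate matching this right value
        k = i
        while k < len(left) and left[k] == right[j]:
            matches.append((left[k], right[j]))
            k += 1
        j += 1

print(matches)  # [(3, 3), (3, 3), (7, 7)]
```

With a range condition there is no single sort order that lets two cursors sweep like this, which is the problem the bucketing trick below works around.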


One idea is to separate the time into buckets / bins.

E.g. bin = day

So for table1, it can virtually have an extra column 'bin', which is the day of the time.

For table2, its bin column has all the days covered by the (start, end) range; imagine expanding the row into multiple rows if the range spans multiple days.

After that, Spark can run an equality join on the bin column, on a.bin = b.bin. In this way, it converts a range join into a point join.

As there may be multiple records matching the same day, it just needs a second pass to check whether a.time actually falls within the (b.start, b.end) range to get the final correct results.

However, the point join has already reduced the amount of data to be checked, so performance improves greatly compared to brute force.
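The bucketing idea can be sketched in plain Python (toy integer "times" standing in for timestamps, bin width chosen arbitrarily):

```python
# Two-phase binned range join: (1) point join on bin number,
# (2) verify the real range condition on the surviving candidates.
from collections import defaultdict

BIN = 10  # bin width (the "day" in the text above)

table1 = [5, 12, 25, 41]          # a.time values
table2 = [(3, 14), (20, 30)]      # (b.start, b.end) ranges

# Phase 1: each table1 time belongs to exactly one bin ...
t1_by_bin = defaultdict(list)
for t in table1:
    t1_by_bin[t // BIN].append(t)

# ... while each table2 range "expands" to every bin it overlaps.
candidates = []
for start, end in table2:
    for b in range(start // BIN, end // BIN + 1):
        for t in t1_by_bin.get(b, []):
            candidates.append((t, start, end))

# Phase 2: check the actual range condition on the (much smaller)
# candidate set.
result = [(t, s, e) for t, s, e in candidates if s < t < e]

print(result)  # [(5, 3, 14), (12, 3, 14), (25, 20, 30)]
```

Here only 3 candidate pairs are range-checked instead of all 4 x 2 = 8, and the gap widens as the tables grow.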


The binning idea above is just my guess. The documentation doesn't explain the mechanism at all. Sucks.

https://docs.microsoft.com/en-us/azure/databricks/delta/join-performance/range-join


Here are some examples of using the range join hint in Databricks:

select /*+ RANGE_JOIN(table1, 1) */ *

  from table1 a

       join table2 b

  on a.time > b.start and a.time < b.end

or

   

df1.hint("range_join", 1) \
   .join(df2, (df1.time > df2.start) & (df1.time < df2.end), 'inner')

   

If the time column is a date, the join hint number is a number of days, so it would be 1 in this example.

If the time column is a timestamp, the join hint number is a number of seconds, so it would be 60 * 60 * 24 for the same one-day bin.

The join hint also works for numeric types.
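As a quick unit check (plain arithmetic, nothing Spark-specific), the same one-day bin expressed in the two units:

```python
# Bin size for the range join hint: days for DATE columns,
# seconds for TIMESTAMP columns.
bin_size_date = 1                  # one day, for a DATE join column
bin_size_timestamp = 60 * 60 * 24  # the same one-day bin, in seconds

print(bin_size_timestamp)  # 86400
```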