Keeping Track of Your Company's Revenue in Real-time

7/14/19

Plotting real-time data with Databricks

Why do you need real-time data?

In a fast-paced, dynamic business environment it is ideal for revenue data to be available in real-time. Think of the stock market, for example: data can be obtained at a one-second frequency, or even a millisecond frequency in some cases. This high-frequency data is extremely valuable to traders who track the market in real-time to time their purchases optimally. Having the latest revenue figures is vital for businesses to make smart, data-driven decisions. Not to mention it is very motivating and rewarding for teams to see the "fruits of their labor".

Unfortunately, in large corporations real-time data is not always available. There are often many moving parts, with revenue coming from a wide range of sources. Purchases typically go through some kind of ETL (Extract, Transform, Load) process before landing in a central repository. This takes time, and these ETL jobs are often scheduled daily, so the data is only available at a daily frequency.

In this blog post I will not focus on how to obtain real-time data. Instead, I will create some fake real-time data and develop a visualization for monitoring it.

Open-source tools

I am a huge proponent of using open-source tools. I believe a community-led effort is the best approach for advancing business efficiency and data science. It does, however, require businesses to acknowledge this and either support employees in developing and contributing to open-source tools or donate money to programs such as NumFOCUS. The JupyterLab team wrote a great article sharing a similar idea here.

To generate the real-time data we will use Scala because of its use in Apache Kafka. Taken from the website: 'Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.' For the plotting we will use Databricks. Taken from the website: 'Databricks Unified Analytics Platform, from the original creators of Apache Spark™, unifies data science and engineering across the Machine Learning lifecycle from data preparation, to experimentation and deployment of ML applications.' Databricks isn't entirely open source, but it does offer free usage via a community edition, which we will use. It also uses Apache Spark under the hood, which is open source.

Generating real-time data

For brevity I will just talk through the steps here and provide the full source code at the end.

First, create an object called DummyDataGenerator which simulates a fire-hose of purchase data for five products by writing small CSV files to a directory. To keep your code clean, I would advise writing this in a new Python notebook called Stream-Generator. You can then import this object into another Python notebook by running:

%run "./Stream-Generator"

Next, remove any old data and generate streaming data for five minutes:

%scala
DummyDataGenerator.clean()
DummyDataGenerator.start(5)
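
If you need to stop the generator before the five minutes are up, the object also exposes a stop method (defined in the source code at the end), which simply reruns start with a duration of zero:

%scala
DummyDataGenerator.stop()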

Use spark.readStream to read the stream of data, and then display it with display. You can then play with Databricks' built-in visualization tools to monitor the stream of product purchases. In the GIF below I start by showing a time series of the revenue generated over time. I then add ProductId to the grouping so you can see what is being purchased. Next I add ProductId to 'Keys' so you can keep track of which products are selling better or worse than others. Finally, I use only Revenue in 'Values' to keep track of the total amount of revenue generated.
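
For reference, here is a minimal sketch of that read-and-display step (assuming the Stream-Generator notebook has already been run so that DummyDataGenerator.streamDirectory is defined; the variable names here are only illustrative, and the full version with watermarking and windowed aggregation is in the source code below):

from pyspark.sql.types import StructType

# Schema of the generated CSV files: ProductId, PurchaseTime, Revenue
purchaseSchema = (StructType()
  .add("ProductId", "integer")
  .add("PurchaseTime", "string")
  .add("Revenue", "integer")
)

# Read the directory of CSV files as a stream and render it with Databricks'
# built-in display(), which keeps refreshing as new files arrive.
rawStreamDF = (spark.readStream
  .schema(purchaseSchema)
  .csv(DummyDataGenerator.streamDirectory)
)
display(rawStreamDF)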

Source code

Stream-Generator:

# Python mirror of the Scala object's stream directory, so PySpark cells can reference it.
class DummyDataGenerator:
  streamDirectory = "dbfs:/tmp/{}/new-purchases".format("EMAIL") # Replace EMAIL with the e-mail address you used to sign up for Databricks
None # suppress output

%scala

import scala.util.Random
import java.io._
import java.time._

// http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
// https://stackoverflow.com/questions/33297689/number-reduce-tasks-spark
spark.conf.set("spark.sql.shuffle.partitions", 200)

// Make the username available to all other languages.
val username = com.databricks.logging.AttributionContext.current.tags(com.databricks.logging.BaseTagDefinitions.TAG_USER);
spark.conf.set("com.databricks.training.username", username)

// Runnable so the generator can run on a background thread while the
// rest of the notebook (e.g. the streaming read) keeps executing.
object DummyDataGenerator extends Runnable {
  var runner : Thread = null;
  val className = getClass().getName()
  val streamDirectory = s"dbfs:/tmp/$username/new-purchases"

  val rand = new Random(System.currentTimeMillis())
  var maxDuration = 3 * 60 * 1000 // default to three minutes

  def clean() {
    System.out.println("Removing old files for dummy data generator.")
    dbutils.fs.rm(streamDirectory, true)
    if (dbutils.fs.mkdirs(streamDirectory) == false) {
      throw new RuntimeException("Unable to create temp directory.")
    }
  }
  
  def run() {
    val date = LocalDate.now()
    val start = System.currentTimeMillis()
    
    while (System.currentTimeMillis() - start < maxDuration) {
      try {
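        // Each loop iteration writes one purchase record to its own small CSV
        // file in the stream directory, where spark.readStream will pick it up.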
        val dir = s"/dbfs/tmp/$username/new-purchases"
        val tempFile = File.createTempFile("purchases-", "", new File(dir)).getAbsolutePath()+".csv"
        val writer = new PrintWriter(tempFile)
        
        val productId = rand.nextInt(5)+1
        val purchaseTime = LocalDateTime.now().plusHours(-4) // shift the timestamp back four hours (the cluster clock is typically UTC)
        val revenue = rand.nextInt(1000)+1

        println(s"- Product #$productId sold at $purchaseTime for $revenue dollars")
        writer.println(s""" "$productId","$purchaseTime","$revenue" """.trim)
        writer.close()
      
        // wait up to five seconds before the next purchase
        Thread.sleep(rand.nextInt(5000))

      } catch {
        case e: Exception => {
          printf("* Processing failure: %s%n", e.getMessage())
          return;
        }
      }
    }
    println("No more purchases!")
  }

  def start(minutes:Int = 5) {
    maxDuration = minutes * 60 * 1000

    if (runner != null) {
      println("Stopping dummy data generator.")
      runner.interrupt();
      runner.join();
    }
    println(s"Running dummy data generator for $minutes minutes.")
    runner = new Thread(this);
    runner.start();
  }

  def stop() {
    start(0)
  }
}

DummyDataGenerator.clean()

displayHTML("Imported streaming logic...") // suppress output

Real-time-plotting:

%run "./Stream-Generator"


%scala

// Clean any temp files from previous runs.
DummyDataGenerator.clean()

// Generate data for 5 minutes.
// To force it to stop rerun with 0.
DummyDataGenerator.start(5)

from pyspark.sql.functions import col, date_format, unix_timestamp, window
from pyspark.sql.types import StructType
spark.conf.set("spark.sql.shuffle.partitions", "8")

purchaseSchema = (StructType()
  .add("ProductId", "integer")
  .add("PurchaseTime", "string")                  
  .add("Revenue", "integer")
)

streamingDF = (spark.readStream
  .schema(purchaseSchema)
  .csv(DummyDataGenerator.streamDirectory)
  # Parse the ISO-8601 purchase-time string into a proper timestamp.
  .withColumn("PurchaseTime", unix_timestamp("PurchaseTime", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp"))
  # Allow data up to 5 minutes late before it is dropped from the aggregation.
  .withWatermark("PurchaseTime", "5 minute")
  # Sum revenue per product over one-second windows.
  .groupBy(window("PurchaseTime", "1 seconds"), "ProductId")
  .sum("Revenue")
  .select(col("window.start").alias("Start"), "ProductId", col("sum(Revenue)").alias("Revenue"))
  .orderBy("Start", "ProductId")
  # Format the window start as HH:mm:ss for a cleaner x-axis in the plot.
  .select(date_format("Start", "HH:mm:ss").alias("Time"), "ProductId", "Revenue")
)
display(streamingDF)