Introduction to Data Science and Machine Learning

July 2019

Log into Databricks

Importing the course:

  • In the left sidebar, click Home.
  • Right-click your home folder, then click Import.
  • Select browse and then choose your course Lessons.dbc file.
  • Click the Import button.
  • Click the Home icon in the left sidebar.
  • Select your home folder.
  • Select the sub-folder matching the name of your course (e.g. Introduction-to-Data-Science-and-Machine-Learning-1.0.1).
  • Open Lesson 01-Course-Overview-and-Setup.

Click the keyboard icon in the top right to see the keyboard shortcuts

01: Course Overview and Setup

Machine Learning Drives Business Value:

  • A/B Testing: offering users different versions of a website indicates what the most successful version will be
  • Natural Language Processing: analyzing natural language using statistical methods allows for the algorithmic understanding of language, empowering sentiment analysis, chatbots, and many other applications
  • Financial Forecasting: future company financials can be predicted using past data
  • Churn Analysis: determining whether a customer is likely to leave a business helps identify the best way to retain customers

02: What is ML

Estimate a function (f) that maps the relationship between input features and the output variable.
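In notation, with ε standing for the irreducible error, this is Y ≈ f(X) + ε; the learning task is to approximate f from the data.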

Read in the Boston housing data

bostonDF = (spark.read
  .option("HEADER", True)
  .option("inferSchema", True)
  .csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
)

display(bostonDF)

Spark differs from many other machine learning frameworks in that we train our model on a single column that contains a vector of all of our features. Prepare the data by creating one column named features that has the average number of rooms, crime rate, and poverty percentage.

Add a column to the DF that has a vector of all our features of interest using the VectorAssembler:

from pyspark.ml.feature import VectorAssembler

featureCols = ["rm", "crim", "lstat"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

bostonFeaturizedDF = assembler.transform(bostonDF)

display(bostonFeaturizedDF)

This column uses sparse vector notation.
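
For reference, a dense vector and its sparse equivalent (a minimal sketch; the sparse constructor takes the vector length plus the indices and values of the non-zero entries):

from pyspark.ml.linalg import Vectors

dense = Vectors.dense([6.2, 0.0, 4.9])
sparse = Vectors.sparse(3, [0, 2], [6.2, 4.9])  # length 3, non-zeros at indices 0 and 2

print(dense)   # [6.2,0.0,4.9]
print(sparse)  # (3,[0,2],[6.2,4.9])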

Regression vs Classification

Variables can either be quantitative or qualitative:

  • Quantitative values are numeric and generally unbounded, taking any positive or negative value | continuous, numerical | e.g. age, salary, temperature
  • Qualitative values take on a set number of classes or categories | categorical, discrete | gender, whether or not a patient has cancer, state of residence

Machine learning models operate on numbers so a qualitative variable like gender, for instance, would need to be encoded as 0 for male or 1 for female.

A supervised model learning a quantitative variable is called regression, and a model learning a qualitative variable is called classification.

Import linear regression and set the output to be the medv variable and the input to be the features:

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="medv", featuresCol="features")

Fit the data:

lrModel = lr.fit(bostonFeaturizedDF)

You can see the fit by doing:

print("Coefficients: {0:.1f}, {1:.1f}, {2:.1f}".format(*lrModel.coefficients))
print("Intercept: {0:.1f}".format(lrModel.intercept))

Coefficients: 5.2, -0.1, -0.6
Intercept: -2.6

You can interpret the output as

predicted home value = (5.2 x number of rooms) - (.1 x crime rate) - (.6 x % lower class) - 2.6

Let's see how it predicts on data it has already seen as well as new data.

Look at the first 10 rows

subsetDF = (bostonFeaturizedDF
  .limit(10)
  .select("features", "medv")
)

display(subsetDF)

Use the transform method on the trained model to see its prediction.

Now that lrModel is a trained estimator, we can transform data using its .transform() method.

predictionDF = lrModel.transform(subsetDF)

display(predictionDF)

Now predict off of a hypothetical data point of a 6 bedroom home with a 3.6 crime rate and 12 % average lower class. According to our formula, the model should predict about 21:

from pyspark.ml.linalg import Vectors

data = [(Vectors.dense([6., 3.6, 12.]), )]              # Creates our hypothetical data point
predictDF = spark.createDataFrame(data, ["features"])

display(lrModel.transform(predictDF))

Exercise: Train a Model and Create Predictions

# Initiate data
from pyspark.ml.feature import VectorAssembler

featureCols = ["indus", "age", "dis"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="newFeatures")
bostonFeaturizedDF2 = assembler.transform(bostonDF)

# Initiate model and fit training data
from pyspark.ml.regression import LinearRegression

lrNewFeatures = LinearRegression(labelCol="medv", featuresCol="newFeatures")
lrModelNew = lrNewFeatures.fit(bostonFeaturizedDF2)

# Predict values
from pyspark.ml.linalg import Vectors

data = [(Vectors.dense([11., 68., 4.]), ),
        (Vectors.dense([6., 35., 2.]), ),
        (Vectors.dense([19., 74., 8.]), )]
newDataDF = spark.createDataFrame(data, ["newFeatures"])
predictionsDF = lrModelNew.transform(newDataDF)

display(predictionsDF)

Question: What's the difference between the fit and transform methods on a machine learning model?

Answer: The fit method learns something from the data and saves the result in the model object (e.g. lrModel above). The transform method applies the learned pattern to unseen data. transform can be used to make predictions using a trained model or to change the data in some other way, such as encoding a categorical feature. We'll explore this more in later lessons.
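
A compact restatement of the pattern, reusing the objects defined earlier in this lesson:

lr = LinearRegression(labelCol="medv", featuresCol="features")  # estimator: only describes how to learn
lrModel = lr.fit(bostonFeaturizedDF)                            # fit: learns the coefficients and returns a model
display(lrModel.transform(bostonFeaturizedDF))                  # transform: appends a prediction column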

Further reading:

An Introduction to Statistical Learning

https://github.com/databricks/spark-sklearn (DEPRECATED)

https://databricks.com/blog/2019/06/07/hyperparameter-tuning-with-mlflow-apache-spark-mllib-and-hyperopt.html

https://github.com/hyperopt/hyperopt

https://databricks.com/blog/category/engineering/machine-learning

03: Exploratory-Analysis

Exploratory Data Analysis (EDA).

Exploratory analysis focuses on the following:

Gain basic intuition into the data:

  • What does each feature represent?
  • Are your features categorical or continuous?
  • What data needs to be encoded? (e.g. mapping a variable for gender to a number)
  • What data types are you working with? (e.g. integers, strings)

How is the data distributed?:

  • What is the mean, median, and/or mode of each variable?
  • What is the variance, or spread, of each variable?
  • Are there outliers? Missing values?

How can we visualize the data?:

  • Histograms, scatterplots, and boxplots
  • Correlation plots

What hypotheses will I test with my models?:

  • What features are correlated?
  • What are our ideal features and how can we build them if they're not already available?


Import the data and use the .describe() method on it:

bostonDF = (spark.read
  .option("HEADER", True)
  .option("inferSchema", True)
  .csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
  .drop("_c0")
)

display(bostonDF.describe())

Now display one variable:

display(bostonDF.select("medv").describe())

Plot a histogram of the median value in the housing dataset. Display it then click on the plotting option and visualize it as a histogram

display(bostonDF.select("medv"))

Look at how the number of rooms compares to the median value. Select the scatterplot option from the plotting dropdown menu, then click 'Plot Options...' -> 'Show LOESS'.

display(bostonDF.select("rm", "medv"))

Databricks notebooks can display plots generated in pure Python:

  • Import matplotlib, pandas, and seaborn
  • Create a fig object
  • Use the .toPandas() DataFrame method to turn the Spark DataFrame into a Pandas DataFrame. This way we can use Python's plotting libraries
  • Use the scatter_matrix pandas function to plot a matrix of scatterplots

Plot a scatter matrix using the pandas Python library.

%python
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns # importing for a better color scheme

try:
  bostonDF
except NameError: # Looks for local table if bostonDF not defined
  bostonDF = spark.table("boston")

fig, ax = plt.subplots()
pandasDF = bostonDF.select("rm", "crim", "lstat", "medv").toPandas()

pd.plotting.scatter_matrix(pandasDF)  # scatter_matrix lives under pandas.plotting in recent pandas versions

display(fig.figure)

To deepen our understanding of correlation, add columns to the bostonDF

from pyspark.sql.functions import col, rand

bostonWithDummyDataDF = (bostonDF
  .select("medv")
  .withColumn("medvX3", col("medv")*3)
  .withColumn("medvNeg", col("medv")*-3)
  .withColumn("random1", rand(seed=41))
  .withColumn("random2", rand(seed=44))
  .withColumn("medvWithNoise", col("medv")*col("random1"))
  .withColumn("medvWithNegativeNoise", col("medv")*col("random1")*-1)
)

display(bostonWithDummyDataDF)

for colName in bostonWithDummyDataDF.columns:   # avoid shadowing the imported col function
  correlation = bostonWithDummyDataDF.stat.corr("medv", colName)
  
  print("The correlation between columns 'medv' and '{}': \t{}".format(colName, correlation))

There are a number of other helpful visualizations depending on the needs of your data:

  • Heat maps: similar to a scattermatrix, heatmaps can be especially helpful at visualizing correlations between variables
  • Box plots: visualizes quantiles and outliers
  • Q-Q Plots: visualizes two probability distributions
  • Maps and GIS: visualizes geographically-bound data
  • t-SNE: plots high dimensional data (i.e. data that has many variables) by projecting it down into two-dimensional plot
  • Time series: plots time-bound variables including run charts, lag plots, and wavelet spectrograms

Assemble all the features into a single column, features, so we can use Spark's built-in correlation functionality.

%python
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=bostonDF.columns, outputCol="features")

bostonFeaturizedDF = assembler.transform(bostonDF)

and calculate the correlations across the entire dataset

%python
from pyspark.ml.stat import Correlation

pearsonCorr = Correlation.corr(bostonFeaturizedDF, 'features').collect()[0][0]
pandasDF = pd.DataFrame(pearsonCorr.toArray())

pandasDF.index, pandasDF.columns = bostonDF.columns, bostonDF.columns # Labels our index and columns so we can interpret the results

Plot the results as a heatmap

%python
import matplotlib.pyplot as plt
import seaborn as sns

fig, ax = plt.subplots()
sns.heatmap(pandasDF)
display(fig.figure)

Exercise: EDA on the Bike Sharing Dataset

Calculate the count, mean, and standard deviation for each variable in the dataset. What does each variable signify? What is the spread of the data?

bikeDF = (spark
  .read
  .option("header", True)
  .option("inferSchema", True)
  .csv("/mnt/training/bikeSharing/data-001/hour.csv")
  .drop("instant", "dteday", "casual", "registered", "holiday", "weekday")
)

display(bikeDF.describe())

Create a histogram of the variable cnt.

display(bikeDF.select("cnt")) # Then click on the histogram plot

Create a barplot of counts by hour

display(bikeDF.select("hr", "cnt")) # Then click on barplot then group by hours and put cnt as values

Create a scattermatrix.

%python
# TODO
fig, ax = plt.subplots()
pandasDF = bikeDF.select("hr", "cnt").toPandas()
#pandasDF = bikeDF.toPandas()
pd.plotting.scatter_matrix(pandasDF)  # scatter_matrix lives under pandas.plotting in recent pandas versions
display(fig.figure)

Calculate the correlations of the different variables.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

assembler = VectorAssembler(inputCols=bikeDF.columns, outputCol="features")
bikeFeaturizedDF = assembler.transform(bikeDF)

pearsonCorr = Correlation.corr(bikeFeaturizedDF, 'features').collect()[0][0]
pandasDF = pd.DataFrame(pearsonCorr.toArray())
pandasDF.index, pandasDF.columns = bikeDF.columns, bikeDF.columns

Plot the correlations as a heatmap.

%python
fig, ax = plt.subplots()
sns.heatmap(pandasDF)
display(fig.figure)

The main goal of exploratory analysis is to build intuition from the dataset that will determine future modeling decisions. Having a limited understanding of the data leads to downstream problems like unexpected outputs and errors as well as under-performing models.

Further reading:

https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html

https://github.com/apache/spark/blob/cad440d1f59d9bb674e06b0f656d6bb9d7872bf9/python/pyspark/sql/functions.py#L144

https://spark.apache.org/docs/latest/ml-statistics.html

https://spark.apache.org/docs/latest/mllib-statistics.html#kernel-density-estimation

04: ML Workflows

CRISP-DM approach (Cross Industry Standard Process for Data Mining)

  1. Business and Data Understanding: ensures a rigorous understanding of both the business problem and the available data
  2. Data Preparation: involves cleaning data so that it can be fed into algorithms and create new features
  3. Modeling: entails training many models and many combinations of parameters for a given model
  4. Evaluation: compares model performance and chooses the best option
  5. Deployment: launches a model into production where it's used to inform business decision-making

Train/Test Split

Splitting the data avoids memorization of the training data, known as overfitting. Overfitting occurs when our model learns patterns caused by random chance rather than true signal. By evaluating our model's performance on unseen data, we can minimize overfitting.

Import the Boston dataset.

bostonDF = (spark.read
  .option("HEADER", True)
  .option("inferSchema", True)
  .csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
)

display(bostonDF)

Split the dataset into two DataFrames:

trainDF, testDF = bostonDF.randomSplit([0.8, 0.2], seed=42)
print("We have {} training examples and {} test examples.".format(trainDF.count(), testDF.count()))

Baseline Model

A baseline model offers an educated best guess to improve upon as different models are trained and evaluated. It represents the simplest model we can create. This is generally approached as the center of the data. In the case of regression, this could involve predicting the average of the outcome regardless of the features it sees. In the case of classification, the center of the data is the mode, or the most common class. A baseline model could also be a random value or a preexisting model. Through each new model, we can track improvements with respect to this baseline.
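
For a classification task, a minimal sketch of a mode-based baseline (hypothetical DataFrames trainClassDF and testClassDF with a categorical column "label"; not part of this lesson's data):

from pyspark.sql.functions import desc, lit

# The most common class in the training data becomes the constant prediction
mostCommonClass = trainClassDF.groupBy("label").count().orderBy(desc("count")).first()["label"]
baselineClassDF = testClassDF.withColumn("prediction", lit(mostCommonClass))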

Create a baseline model by calculating the average housing value in the training dataset and append it on the test dataset:

from pyspark.sql.functions import avg
from pyspark.sql.functions import lit

trainAvg = trainDF.select(avg("medv")).first()[0]
print("Average home value: {}".format(trainAvg))

testPredictionDF = testDF.withColumn("prediction", lit(trainAvg))
display(testPredictionDF)

Evaluation and Improvement

The most common evaluation metric in regression tasks is mean squared error (MSE). The result is always positive and the lower the MSE, the better the model is performing.
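
Written out for n observations with true values yᵢ and predictions ŷᵢ: MSE = (1/n) * Σ (yᵢ - ŷᵢ)². Taking the square root (RMSE) puts the error back into the units of the label.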

Define the evaluator with the prediction column, label column, and use the MSE metric.

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="medv", metricName="mse")

Evaluate testPredictionDF using the evaluator's .evaluate() method.

testError = evaluator.evaluate(testPredictionDF)
print("Error on the test set for the baseline model: {}".format(testError))

This score indicates that the average squared distance between the true home value and the prediction of the baseline is about 79. Taking the square root of that number gives us the error in the units of the quantity being estimated. In other words, taking the square root of 79 gives us an average error of about $8,890.

bikeDF = (spark
  .read
  .option("header", True)
  .option("inferSchema", True)
  .csv("/mnt/training/bikeSharing/data-001/hour.csv")
  .drop("instant", "dteday", "casual", "registered", "holiday", "weekday") # Drop unnecessary features
)
display(bikeDF)

trainBikeDF, testBikeDF = bikeDF.randomSplit([0.7, 0.3], seed=42)

avgTrainCnt = trainBikeDF.select(avg("cnt")).first()[0]
bikeTestPredictionDF = testBikeDF.withColumn("prediction", lit(avgTrainCnt))

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="cnt", metricName="mse")
testError = evaluator.evaluate(bikeTestPredictionDF) # ~= 33255.26300731152 

Use a linear regression model (explored in the previous lesson) to beat the baseline model score.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

featureCols = ["hr", "workingday", "weathersit", "temp", "atemp", "hum", "windspeed"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="newFeatures")
bikeFeaturizedDF2 = assembler.transform(bikeDF)

trainBikeFeaturizedDF, testBikeFeaturizedDF = bikeFeaturizedDF2.randomSplit([0.7, 0.3], seed=42)

lr = LinearRegression(labelCol="cnt", featuresCol="newFeatures")
lrModel = lr.fit(trainBikeFeaturizedDF)

predictionsFeaturizedDF = lrModel.transform(testBikeFeaturizedDF)

bikeevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="cnt", metricName="mse")
biketestError = bikeevaluator.evaluate(predictionsFeaturizedDF)
print(biketestError) # 22663.224110566814 

05: Featurization

MLlib can refer either to the general machine learning library in Spark or to the RDD-based API specifically. SparkML refers to the DataFrame-based API, which is preferred over working on RDDs wherever possible.

Spark's machine learning library, MLlib, has three main abstractions:

  1. A transformer takes a DataFrame as an input and returns a new DataFrame with one or more columns appended to it.
    • Transformers implement a .transform() method.
  2. An estimator takes a DataFrame as an input and returns a model, which itself is a transformer.
    • Estimators implement a .fit() method.
  3. A pipeline combines together transformers and estimators to make it easier to combine multiple algorithms.
    • Pipelines implement a .fit() method.

Featurization is the process of creating this input data for a model. There are a number of common featurization approaches:

  • Encoding categorical variables
  • Normalizing
  • Creating new features
  • Handling missing values
  • Binning/discretizing


Categorical features refer to a discrete number of groups. In the case of the AirBnB dataset we'll use in this lesson, one categorical variable is room type. There are three types of rooms: Private room, Entire home/apt, and Shared room.

A machine learning model does not know how to handle these room types. Instead, we must first encode each unique string into a number. Second, we must one-hot encode each of those values to a location in an array. This allows our machine learning algorithms to model effects of each category.

Import the AirBnB dataset.

airbnbDF = spark.read.parquet("/mnt/training/airbnb/sf-listings/sf-listings-correct-types.parquet")
display(airbnbDF)

Take the unique values of room_type and index them to a numerical value. Fit the StringIndexer estimator to the unique room types using the .fit() method and by passing in the data.

The trained StringIndexer model then becomes a transformer. Use it to transform the results using the .transform() method and by passing in the data.

from pyspark.ml.feature import StringIndexer

uniqueTypesDF = airbnbDF.select("room_type").distinct() # Use distinct values to demonstrate how StringIndexer works

indexer = StringIndexer(inputCol="room_type", outputCol="room_type_index") # Set input column and new output column
indexerModel = indexer.fit(uniqueTypesDF) # Fit the indexer to learn room type/index pairs
indexedDF = indexerModel.transform(uniqueTypesDF) # Append a new column with the index

display(indexedDF)

Now each room type has a unique numerical value assigned. While we could pass the new room_type_index into a machine learning model, it would treat the index as an ordinal value, implying, for instance, that one room type is worth twice another, which is not the case. Instead, we need to change these values to a binary yes/no value indicating whether a listing is for a shared room, entire home, or private room.

Do this by training and fitting the OneHotEncoderEstimator, which only operates on numerical values (this is why we needed to use StringIndexer first)

from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(inputCols=["room_type_index"], outputCols=["encoded_room_type"])
encoderModel = encoder.fit(indexedDF)
encodedDF = encoderModel.transform(indexedDF)

display(encodedDF)

Certain models, such as random forest, do not need one-hot encoding (and can actually be negatively affected by the process).

The new column encoded_room_type is a vector. The difference between a sparse and dense vector is whether Spark records all of the empty values. In a sparse vector, like we see here, Spark saves space by only recording the places where the vector has a non-zero value. The value of 0 in the first position indicates that it's a sparse vector. The second value indicates the length of the vector.

Here's how to read the mapping above:

  • Shared room maps to the vector [0, 0]
  • Entire home/apt maps to the vector [0, 1]
  • Private room maps to the vector [1, 0]

Imputing Null or Missing Data

Null values refer to unknown or missing data as well as irrelevant responses. Strategies for dealing with this scenario include:

  • Dropping these records: Works when you do not need to use the information for downstream workloads
  • Adding a placeholder (e.g. -1): Allows you to see missing data later on without violating a schema (see the sketch after this list)
  • Basic imputing: Allows you to have a "best guess" of what the data could have been, often by using the mean of non-missing data
  • Advanced imputing: Determines the "best guess" of what data should be using more advanced strategies such as clustering machine learning algorithms or oversampling techniques such as SMOTE.
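
A minimal sketch of the placeholder approach (filling nulls with -1 in a couple of the numeric columns used below):

display(airbnbDF.na.fill(-1, subset=["bathrooms", "beds"]))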

Try to determine why a value is null. This can provide information that can be helpful to the model.

Describe the dataset and take a look at the count values. There's a fair amount of missing data in this dataset.

display(airbnbDF.describe())

Try dropping missing values.

countWithoutDropping = airbnbDF.count()
countWithDropping = airbnbDF.na.drop(subset=["zipcode", "host_is_superhost"]).count()

print("Count without dropping nulls:\t", countWithoutDropping)
print("Count with dropping nulls:\t", countWithDropping)

Another common option for working with missing data is to impute the missing values with a best guess for their value. Try imputing a list of columns with their median.

from pyspark.ml.feature import Imputer

imputeCols = [
  "host_total_listings_count",
  "bathrooms",
  "beds", 
  "review_scores_rating",
  "review_scores_accuracy",
  "review_scores_cleanliness",
  "review_scores_checkin",
  "review_scores_communication",
  "review_scores_location",
  "review_scores_value"
]

imputer = Imputer(strategy="median", inputCols=imputeCols, outputCols=imputeCols)
imputerModel = imputer.fit(airbnbDF)
imputedDF = imputerModel.transform(airbnbDF)

display(imputedDF)

Creating a Pipeline

Passing around estimator objects, trained estimators, and transformed DataFrames quickly becomes cumbersome. Spark uses the convention established by scikit-learn of combining all of these steps into a single pipeline.

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
  indexer, 
  encoder, 
  imputer
])

The pipeline is itself an estimator. Train the model with its .fit() method and then transform the original dataset. We've now combined all of our featurization steps into one pipeline with three stages.

pipelineModel = pipeline.fit(airbnbDF)
transformedDF = pipelineModel.transform(airbnbDF)

display(transformedDF)

Exercise: Finish Featurizing the Dataset

One common way of handling data is to divide it into bins, a process technically known as discretizing. For instance, the dataset contains a number of rating scores that can be translated into a value of 1 if they are a highly rated host or 0 if not.

Finish featurizing the dataset by binning the review scores rating into high versus low rated hosts. Also filter the extreme values and clean the column price.

Step 1: Binning review_scores_rating

Divide the hosts by whether their review_scores_rating is above 97. Do this using the transformer Binarizer with the output column high_rating. This should create the objects binarizer and the transformed DataFrame transformedBinnedDF.

from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=97, inputCol="review_scores_rating", outputCol="high_rating")

transformedBinnedDF = binarizer.transform(transformedDF)
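
Binarizer covers the two-bin case; for more than two bins, Spark's Bucketizer works the same way (a sketch with hypothetical split points, not required for this exercise):

from pyspark.ml.feature import Bucketizer

bucketizer = Bucketizer(
  splits=[-float("inf"), 80, 90, 97, float("inf")],  # hypothetical bin edges for review_scores_rating
  inputCol="review_scores_rating",
  outputCol="rating_bin"
)
display(bucketizer.transform(transformedDF))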

Step 2: Regular Expressions on Strings

Clean the column price by creating two new columns:

  1. price: a new column that contains a cleaned version of price. This can be done using the regular expression replacement of "[\$,]" with "". Cast the column as a decimal.
  2. price_raw: the column price in its current form

from pyspark.sql.functions import col, regexp_replace

tmp = transformedBinnedDF.select(
  "*",
  col('price').alias('price_raw')
).drop('price')

transformedBinnedRegexDF = tmp.select(
  "*", 
  regexp_replace('price_raw', r'[\$,]', '').cast("Decimal").alias('price')  # cast first, then alias so the new column is named price
)

display(transformedBinnedRegexDF)

Further reading:

http://spark.apache.org/docs/latest/ml-features.html

06: Regression Modeling

Univariate regression: two variables (one feature plus the label)

Multivariate regression: Multiple features.

Take the example of Boston housing data where we have median value for a number of neighborhoods and variables such as the number of rooms, per capita crime, and economic status of residents. We might have a number of questions about this data including:

  1. Is there a relationship between our features and median home value?
  2. If there is a relationship, how strong is that relationship?
  3. Which of the features affect median home value?
  4. How accurately can we estimate the effect of each feature on home value?
  5. How accurately can we predict on unseen data?
  6. Is the relationship between our features and home value linear?
  7. Are there interaction effects (e.g. value goes up when an area is not industrial and has more rooms on average) between the features?

There is a trade-off between model accuracy and interpretability

Y ≈ β0 + β1X

In this case, β0 and β1 are our coefficients where β0 represents the line's intercept with the Y axis and β1 represents the number we multiply by X in order to attain a prediction.

In the case of inferential statistics where we're interested in learning about the relationship between our input features and outputs, it's common to skip the train/test split step, as you'll see in this lesson.

Import the Boston dataset.

bostonDF = (spark.read
  .option("HEADER", True)
  .option("inferSchema", True)
  .csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
  .drop("_c0")
)

display(bostonDF)

Create a column features that has a single input variable rm by using VectorAssembler

from pyspark.ml.feature import VectorAssembler

featureCol = ["rm"]
assembler = VectorAssembler(inputCols=featureCol, outputCol="features")

bostonFeaturizedDF = assembler.transform(bostonDF)

display(bostonFeaturizedDF)

Fit a linear regression model.

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="medv")

lrModel = lr.fit(bostonFeaturizedDF)

Interpreting a linear model entails answering a number of questions:

  1. What did the model estimate my coefficients to be?
  2. Are my coefficients statistically significant?
  3. How accurate was my model?

print("β0 (intercept): {}".format(lrModel.intercept))
print("β1 (coefficient for rm): {}".format(*lrModel.coefficients))

For a home with 5 rooms, our model would predict -34.7 + (9.1 x 5), or about $10,800. That's not too bad.

The intercept of -34.7 doesn't make a lot of sense on its own since this would imply that a studio apartment would be worth negative dollars. Also, we don't have any 1 or 2 bedroom homes in our dataset, so the model will perform poorly on data in this range.

In order to determine whether our coefficients are statistically significant, we need to quantify the likelihood of seeing the association by chance. One way of doing this is using a p-value. As a general rule of thumb, a p-value of under .05 indicates statistical significance in that there is less than a 1 in 20 chance of seeing the correlation by mere chance.

Do this using the summary attribute of lrModel.

summary = lrModel.summary

summary.pValues
# [0.0, 0.0] 

These small p-values indicate that it is highly unlikely to see the correlation of the number of rooms to housing price by chance. The first value in the list is the p-value for the rm feature and the second is that for the intercept.

Finally, we need a way to quantify how accurate our model is. R2 is a measure of the proportion of variance in the dataset explained by the model. With R2, a higher number is better.
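
As a formula, R² = 1 - RSS/TSS, where RSS = Σ (yᵢ - ŷᵢ)² is the residual sum of squares and TSS = Σ (yᵢ - ȳ)² is the total sum of squares.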

summary.r2
# 0.4835254559913341

This indicates that 48% of the variability in home value can be explained using rm and the intercept. While this isn't too high, it's not too bad considering that we're training a model using only one variable.

Finally, take a look at the summary attribute of lrModel to see other ways of summarizing model performance.

[attr for attr in dir(summary) if attr[0] != "_"]
# ['coefficientStandardErrors', 'degreesOfFreedom', 'devianceResiduals', 'explainedVariance', 'featuresCol', 'labelCol', 'meanAbsoluteError', 'meanSquaredError', 'numInstances', 'objectiveHistory', 'pValues', 'predictionCol', 'predictions', 'r2', 'r2adj', 'residuals', 'rootMeanSquaredError', 'tValues', 'totalIterations'] 

Multivariate Regression

The equation for multivariate regression looks like the following where each feature p has its own coefficient:

Y ≈ β0 + β1X1 + β2X2 + ... + βpXp

Train a multivariate regression model using rm, crim, and lstat as the input features.

from pyspark.ml.feature import VectorAssembler

featureCols = ["rm", "crim", "lstat"]
assemblerMultivariate = VectorAssembler(inputCols=featureCols, outputCol="features")

bostonFeaturizedMultivariateDF = assemblerMultivariate.transform(bostonDF)

display(bostonFeaturizedMultivariateDF)

Train the model.

from pyspark.ml.regression import LinearRegression

lrMultivariate = (LinearRegression()
  .setLabelCol("medv")
  .setFeaturesCol("features")
)

lrModelMultivariate = lrMultivariate.fit(bostonFeaturizedMultivariateDF)

summaryMultivariate = lrModelMultivariate.summary

Take a look at the coefficients and R2 score.

print("β0 (intercept): {}".format(lrModelMultivariate.intercept))
for i, (col, coef) in enumerate(zip(featureCols, lrModelMultivariate.coefficients)):
  print("β{} (coefficient for {}): {}".format(i+1, col, coef))
# β0 (intercept): -2.562251011927113
# β1 (coefficient for rm): 5.2169549243939874
# β2 (coefficient for crim): -0.1029408867181515
# β3 (coefficient for lstat): -0.578485819633508
print("\nR2 score: {}".format(lrModelMultivariate.summary.r2))
# R2 score: 0.6458520515781128

Our R2 score improved from 48% to 64%, indicating that our new model can explain more of the variance in the data.

Exercise: Improve on our Model

Step 1: Prepare the Features for a New Model

Prepare a new column allFeatures for a new model that uses all of the features in bostonDF except for the label medv. Create the following variables:

  1. allFeatures: a list of all the column names
  2. assemblerAllFeatures: A VectorAssembler that uses allFeatures to create the output column allFeatures
  3. bostonFeaturizedAllFeaturesDF: The transformed bostonDF

allFeatures = bostonDF.schema.names[0:-1]
assemblerAllFeatures = VectorAssembler(inputCols=allFeatures, outputCol="allFeatures")

bostonFeaturizedAllFeaturesDF = assemblerAllFeatures.transform(bostonDF)

Step 2: Train the Model

Create a linear regression model lrAllFeatures. Save the trained model to lrModelAllFeatures.

lrAllFeatures = (LinearRegression()
  .setLabelCol("medv")
  .setFeaturesCol("allFeatures")
)
lrModelAllFeatures = lrAllFeatures.fit(bostonFeaturizedAllFeaturesDF)

Step 3: Interpret the Coefficients and Variance Explained

Take a look at the coefficients and variance explained. What do these mean?

print("β0 (intercept): {}".format(lrModelAllFeatures.intercept))
for i, (col, coef) in enumerate(zip(allFeatures, lrModelAllFeatures.coefficients)):
  print("β{} (coefficient for {}): {}".format(i+1, col, coef))

nox and rm have the largest coefficients in absolute value

print(lrModelAllFeatures.summary.r2)

R² is 0.74, meaning the model explains 74% of the variance in home value

Step 4: Interpret the Statistical Significance of the Coefficients

Print out the p-values associated with each coefficient and the intercept. Which were statistically significant?

print("β0 (intercept pval): {}".format(lrModelAllFeatures.summary.pValues[0]))
for i, (col, coef) in enumerate(zip(allFeatures, lrModelAllFeatures.summary.pValues[1:])):
  print("β{} (coefficient pval for {}): {}".format(i+1, col, coef))

Coefficients with low p-values are statistically significant.

Spark standardizes each feature by default so the user does not need to take this pre-processing step.

07: Classification

While linear regression models continuous variables, logistic regression classifies variables into two or more groups. This lesson trains binomial and multi-class logistic regression models and examines evaluation metrics for classification tasks.

Logistic regression gives probability (0-1).

False +: non-cancerous cells being classified as cancerous

False -: Cancerous cells being classified as non-cancerous

With a 0.2 threshold: more false positives and fewer false negatives.

Linear Regression: Y ≈ β0 + β1X

Logistic Regression: log( p(X) / (1 - p(X)) ) ≈ β0 + β1X. The model is linear in the log odds of p(X).
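
Solving the log-odds form for the probability gives the usual sigmoid: p(X) = 1 / (1 + exp(-(β0 + β1X))).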

Read in cancer data:

from pyspark.sql.functions import col

cols = ["index",
 "sample-code-number",
 "clump-thickness",
 "uniformity-of-cell-size",
 "uniformity-of-cell-shape",
 "marginal-adhesion",
 "single-epithelial-cell-size",
 "bare-nuclei",
 "bland-chromatin",
 "normal-nucleoli",
 "mitoses",
 "class"]

cancerDF = (spark.read  
  .option("HEADER", True)
  .option("inferSchema", True)
  .csv("/mnt/training/cancer/biopsy/biopsy.csv")
)

cancerDF = (cancerDF    # Add column names and replace bare-nuclei with a not-null indicator
  .toDF(*cols)
  .withColumn("bare-nuclei", col("bare-nuclei").isNotNull().cast("integer"))
)

display(cancerDF)

View the class balance by grouping the data on our label.

In the case of unbalanced classes, one solution is to add weights into the model allowing for a greater penalty for misclassifying the underrepresented class.

display(cancerDF.groupby("class").count())

Turn the label into a numerical value and create a column feature for just clump-thickness.

from pyspark.ml.feature import StringIndexer, VectorAssembler

indexer = StringIndexer(inputCol="class", outputCol="is-malignant")
cancerIndexedDF = indexer.fit(cancerDF).transform(cancerDF)

assembler = VectorAssembler(inputCols=["clump-thickness"], outputCol="features")
cancerIndexedAssembledDF = assembler.transform(cancerIndexedDF)

display(cancerIndexedAssembledDF)

Train the logistic regression model using only the input feature clump-thickness and look at the resulting coefficient.

from pyspark.ml.classification import LogisticRegression

logr = LogisticRegression(featuresCol="features", labelCol="is-malignant")

logrModel = logr.fit(cancerIndexedAssembledDF)

print("β0 (intercept): {}".format(logrModel.intercept)) # -5.16
print("β1 (coefficient for rm): {}".format(logrModel.coefficients[0])) # 0.93

Model Interpretation

In linear regression, increasing a given feature X by one unit increased our prediction by X multiplied by the corresponding coefficient. In logistic regression, increasing a given feature X by one unit increases the log odds by the corresponding coefficient.

The odds that a clump size of 9 is cancerous are:

exp(β0 + β1X) = exp(-5.2 + 0.94 x 9) = exp(3.26) ≈ 26

That is, odds of about 26 to 1, which corresponds to a probability of about 0.96.

The probability column gives us the probability of being of class 0 and of being in class 1.

from pyspark.sql.functions import desc

transformedDF = logrModel.transform(cancerIndexedAssembledDF)

display(transformedDF
  .select("clump-thickness", "class", "is-malignant", "probability")
  .distinct()
  .orderBy(desc("clump-thickness"))
)

Calculate this value manually using the intercept and coefficient.

from math import exp

odds = exp(-5.2 + .94 * 9)        # Exponentiate the formula since logistic regression models log odds
probability = odds / (1 + odds)   # Convert odds to probability: p = odds / (1 + odds)

print("Prediction for odds of cancer: {} to 1".format( round(odds, 1) ))
print("Prediction for probability of cancer: {}".format(probability))

Take a look at how the model predicted the probability.

(transformedDF
  .filter(col("clump-thickness") == 9)
  .select("probability")
  .first()[0]
)

Model Performance

A confusion matrix is a table that reports the number of false positives, false negatives, true positives, and true negatives. This is the basis of a number of different evaluation metrics such as accuracy, precision, and area under the ROC curve.

Accuracy is calculated by adding the true positives and true negatives and dividing by the total number of records.

The best accuracy metric for a given use case depends on how we want to optimize between true and false predictions.

Print out the confusion matrix.

n = transformedDF.count()
TP = transformedDF.filter((col("is-malignant") == 1) & (col("prediction") == 1)).count()
FP = transformedDF.filter((col("is-malignant") == 0) & (col("prediction") == 1)).count()
FN = transformedDF.filter((col("is-malignant") == 1) & (col("prediction") == 0)).count()
TN = transformedDF.filter((col("is-malignant") == 0) & (col("prediction") == 0)).count()

print("n = {}\t\t\tPredicted Positive \tPredicted Negative".format(n))
print("Actually Positive\t{} \t\t\t{}".format(TP, FP))
print("Actually Negative\t{} \t\t\t{}".format(FN, TN))

One way of visualizing the trade-off between true positives and false positives is by plotting them on a ROC Curve.

import matplotlib.pyplot as plt
import seaborn as sns

falsePositives, truePositives = [], []

for row in logrModel.summary.roc.collect():
  falsePositives.append(row.FPR)
  truePositives.append(row.TPR)

fig = plt.figure()

plt.plot(falsePositives, truePositives)
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")

display(fig)

The associated evaluation metric takes the total area under this curve, which is bounded between 0 and 1.

logrModel.summary.areaUnderROC

Retrain the model using all of the relevant features excluding the index and sequence number.

assembler = VectorAssembler(inputCols=cols[2:-1], outputCol="features")
cancerIndexedAssembledDF2 = assembler.transform(cancerIndexedDF)

logr = LogisticRegression(labelCol="is-malignant", featuresCol="features")

logrModel2 = logr.fit(cancerIndexedAssembledDF2)

logrModel2.coefficients

Take a look at how the area under the ROC curve has improved.

logrModel2.summary.areaUnderROC

Multi-Class Classification

Classification between multiple classes using logistic regression is known as multi-class or multinomial logistic regression. To predict between different classes, set the family argument for LogisticRegression to multinomial. This model then provides probabilities associated with each class.

Import the iris data and clean the column names

irisDF = (spark.read
  .option("INFERSCHEMA", True)
  .option("HEADER", True)
  .csv("/mnt/training/iris/iris.csv")
  .drop("_c0")
)

cols = [col.replace(".", "-").lower() for col in irisDF.columns]
irisDF = irisDF.toDF(*cols)

display(irisDF)

Create a pipeline of three stages:

  1. indexer: a StringIndexer that takes species as an input and outputs speciesClass
  2. assembler: a VectorAssembler that takes all the features in the dataset and outputs features
  3. multinomialRegression: a LogisticRegression that takes features and uses the multinomial family

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

indexer = StringIndexer(inputCol="species", outputCol="speciesClass")

allFeatures = irisDF.columns[:-1]
assembler = VectorAssembler(inputCols=allFeatures, outputCol="features")

multinomialRegression = LogisticRegression(featuresCol='features',
                                           labelCol='speciesClass',
                                           family="multinomial")

pipeline = Pipeline(stages=[
  indexer, 
  assembler, 
  multinomialRegression
])

Step 3: Train the Model and Transform the Dataset

Train the model and save the results to multinomialModel and save the transformed dataset to irisTransformedDF.

multinomialModel = pipeline.fit(irisDF)
irisTransformedDF = multinomialModel.transform(irisDF)

Notice how we have probabilities and raw log odds predictions for all three classes.

display(irisTransformedDF)

Question: What are the various ways that I can evaluate model performance in binomial classification?

Answer: Evaluating model performance in the case of binomial classification is more complex than in regression, where most metrics are based on the distance between the true and predicted values. Classification metrics represent different trade-offs between true positives (TP), false positives (FP), true negatives (TN), and false negatives (FN). Some common metrics include:

  1. Sensitivity = TP / (TP + FN)
  2. Specificity = TN / (TN + FP)
  3. Precision = TP / (TP + FP)
  4. Accuracy = (TP + TN) / (TP + TN + FP + FN)

08: Model Selection

Machine learning entails training different models and different parameters (hyperparameters). A hyperparameter is a parameter used in a machine learning algorithm that is set before the learning process begins. In other words, a machine learning algorithm cannot learn hyperparameters from the data itself.

k-fold cross-validation: split the data into four 25% chunks, hold the last chunk out for final evaluation, and rotate the validation chunk through the rest:

  • Train, Train, Validate, Not Used
  • Train, Validate, Train, Not Used
  • Validate, Train, Train, Not Used
  • Train, Train, Train, Final Evaluation

Hyperparameter Tuning

You can explore these hyperparameters by using the .explainParams() method on a model.

Grid search is the process of exhaustively trying every combination of hyperparameters. It takes all of the values we want to test and combines them in every possible way so that we test them using cross-validation. https://en.wikipedia.org/wiki/Hyperparameter_optimization

train/test split on the Boston dataset and building a pipeline for linear regression.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

bostonDF = (spark.read
  .option("HEADER", True)
  .option("inferSchema", True)
  .csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
  .drop("_c0")
)

trainDF, testDF = bostonDF.randomSplit([0.8, 0.2], seed=42)

assembler = VectorAssembler(inputCols=bostonDF.columns[:-1], outputCol="features")

lr = (LinearRegression()
  .setLabelCol("medv")
  .setFeaturesCol("features")
)

pipeline = Pipeline(stages = [assembler, lr])

Take a look at the model parameters using the .explainParams() method.

print(lr.explainParams())

ParamGridBuilder() allows us to string together all of the different possible hyperparameters we would like to test. In this case, we can test the maximum number of iterations, whether we want to use an intercept with the y axis, and whether we want to standardize our features. This gives twelve combinations.

from pyspark.ml.tuning import ParamGridBuilder

paramGrid = (ParamGridBuilder()
  .addGrid(lr.maxIter, [1, 10, 100])
  .addGrid(lr.fitIntercept, [True, False])
  .addGrid(lr.standardization, [True, False])
  .build()
)

Cross-Validation

An exhaustive approach to cross-validation would include every possible split of the training set. https://scikit-learn.org/stable/modules/cross_validation.html

Create a RegressionEvaluator() to evaluate our grid search experiments and a CrossValidator() to build our models.

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

evaluator = RegressionEvaluator(
  labelCol = "medv", 
  predictionCol = "prediction"
)

cv = CrossValidator(
  estimator = pipeline,             # Estimator (individual model or pipeline)
  estimatorParamMaps = paramGrid,   # Grid of parameters to try (grid search)
  evaluator=evaluator,              # Evaluator
  numFolds = 3,                     # Set k to 3
  seed = 42                         # Seed to ensure our results are the same if run again
)

Fit the CrossValidator()
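
A one-line sketch of that step, assuming trainDF from the split above:

cvModel = cv.fit(trainDF)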

Take a look at the scores from the different experiments.

for params, score in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics):
  print("".join([param.name+"\t"+str(params[param])+"\t" for param in params]))
  print("\tScore: {}".format(score))

You can then access the best model using the .bestModel attribute.

bestModel = cvModel.bestModel
print(bestModel.stages[-1]._java_obj.getMaxIter())
print(bestModel.stages[-1]._java_obj.getFitIntercept())
print(bestModel.stages[-1]._java_obj.getStandardization())

Saving Models and Predictions

Spark can save both the trained model we created as well as the predictions. For online predictions such as on a stream of new data, saving the trained model and using it with Spark Streaming is a common application. It's also common to retrain an algorithm as a nightly batch process and save the results to a database or parquet table for later use.

Save the best model.

modelPath = userhome + "/cvPipelineModel"
dbutils.fs.rm(modelPath, recurse=True)

cvModel.bestModel.save(modelPath)

Take a look at where it saved.

dbutils.fs.ls(modelPath)
[FileInfo(path='dbfs:/user/EMAIL/cvPipelineModel/metadata/', name='metadata/', size=0), FileInfo(path='dbfs:/user/EMAIL/cvPipelineModel/stages/', name='stages/', size=0)] 
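
To reuse it later, the saved pipeline can be loaded back (a minimal sketch; the saved object is the best PipelineModel):

from pyspark.ml import PipelineModel

loadedModel = PipelineModel.load(modelPath)
display(loadedModel.transform(testDF))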

Save predictions made on testDF.

predictionsPath = userhome + "/modelPredictions.parquet"

cvModel.bestModel.transform(testDF).write.mode("OVERWRITE").parquet(predictionsPath)

Exercise: Tuning a Model

Use grid search and cross-validation to tune the hyperparameters from a logistic regression model.

Step 1: Import the Data

Import the data and perform a train/test split.

from pyspark.sql.functions import col

cols = ["index",
 "sample-code-number",
 "clump-thickness",
 "uniformity-of-cell-size",
 "uniformity-of-cell-shape",
 "marginal-adhesion",
 "single-epithelial-cell-size",
 "bare-nuclei",
 "bland-chromatin",
 "normal-nucleoli",
 "mitoses",
 "class"]

cancerDF = (spark.read  # read the data
  .option("HEADER", True)
  .option("inferSchema", True)
  .csv("/mnt/training/cancer/biopsy/biopsy.csv")
)

cancerDF = (cancerDF    # Add column names and replace bare-nuclei with a not-null indicator
  .toDF(*cols)
  .withColumn("bare-nuclei", col("bare-nuclei").isNotNull().cast("integer"))
)

display(cancerDF)

Perform a train/test split to create trainCancerDF and testCancerDF. Put 80% of the data in trainCancerDF

seed = 42
trainCancerDF, testCancerDF = cancerDF.randomSplit([0.8, 0.2], seed=seed)

Step 2: Create a Pipeline

Create a pipeline cancerPipeline that consists of the following stages:

  1. indexer: a StringIndexer that takes class as an input and outputs the column is-malignant
  2. assembler: a VectorAssembler that takes all of the other columns as an input and outputs the column features
  3. logr: a LogisticRegression that takes features as the input and is-malignant as the output variable

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

indexer = StringIndexer(inputCol="class", outputCol="is-malignant")
assembler = VectorAssembler(inputCols=cancerDF.columns[2:-1], outputCol="features")
logr = LogisticRegression(featuresCol='features', labelCol='is-malignant')
cancerPipeline = Pipeline(stages=[indexer, assembler, logr])

Step 3: Create Grid Search Parameters

Take a look at the parameters for our LogisticRegression object. Use this to build the inputs to grid search.

print(logr.explainParams())

Create a ParamGridBuilder object with two grids:

  1. A regularization parameter regParam of [0., .2, .8, 1.]
  2. Test both with and without an intercept using fitIntercept

from pyspark.ml.tuning import ParamGridBuilder

cancerParamGrid = (ParamGridBuilder()
  .addGrid(logr.regParam, [0., .2, .8, 1.])
  .addGrid(logr.fitIntercept, [True, False])
  .build() 
)

Step 4: Perform 3-Fold Cross-Validation

Create a BinaryClassificationEvaluator object and use it to perform 3-fold cross-validation.

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator

binaryEvaluator = BinaryClassificationEvaluator(
  labelCol = "is-malignant", 
  metricName = "areaUnderROC"
)

cancerCV = CrossValidator(
  estimator = cancerPipeline,              # Estimator (individual model or pipeline)
  estimatorParamMaps = cancerParamGrid,     # Grid of parameters to try (grid search)
  evaluator= binaryEvaluator,               # Evaluator
  numFolds = 3,                             # Set k to 3
  seed = 42                                 # Seed to ensure our results are the same if run again
)

cancerCVModel = cancerCV.fit(trainCancerDF)

Step 5: Examine the results

Take a look at the results. Which combination of hyperparameters learned the most from the data?

for params, score in zip(cancerCVModel.getEstimatorParamMaps(), cancerCVModel.avgMetrics):
  print("".join([param.name+"\t"+str(params[param])+"\t" for param in params]))
  print("\tScore: {}".format(score))
bestModel = cancerCVModel.bestModel
print(bestModel.stages[-1]._java_obj.getRegParam())
print(bestModel.stages[-1]._java_obj.getFitIntercept())

Question: How can I further improve my predictions?

Answer: There are a number of different strategies including:

  1. Different models: train different models such as a random forest or gradient boosted trees
  2. Expert knowledge: combine the current pipeline with domain expertise about the data
  3. Better tuning: continue tuning with more hyperparameters to choose from
  4. Feature engineering: create new features for the model to train on
  5. Ensemble models: combining predictions from multiple models can produce better results than any one model


You can use MLeap (http://mleap-docs.combust.ml/) to export trained pipelines to an MLeap Bundle. It supports Spark, scikit-learn, and TensorFlow

09: Capstone Project: An End-to-End ML Model

The goal of this project is to build an end-to-end machine learning model from data exploration and featurization through model training and hyperparameter tuning.

Use data from http://insideairbnb.com/get-the-data.html. It has a cool tool where you can explore the data http://insideairbnb.com/london/ http://insideairbnb.com/san-francisco/

This exercise elaborates on the Airbnb dataset used in the lesson on featurization. The goals are as follows:

  1. Perform exploratory analysis
  2. Featurize the dataset
  3. Train a linear regression model
  4. Tune the model hyperparameters using grid search and cross-validation

Exercise 1: Exploratory Analysis

Import the dataset and perform exploratory analysis including calculating summary statistics and plotting histograms, a scatterplot, and a heatmap of the city of San Francisco.

Import the data, sitting in /mnt/training/airbnb/sf-listings/sf-listings-clean.parquet

filePath = "/mnt/training/airbnb/sf-listings/sf-listings-clean.parquet"

initDF = spark.read.parquet(filePath)

Calculate summary statistics on the data to answer the following questions:

  1. What do each of the variables mean?

host_total_listings_count: Number of listings the host has

neighbourhood_cleansed: The neighborhood the property is associated with

zipcode: 5 digit zip code

latitude: Latitude of property

longitude: Longitude of property

property_type: Type of property

room_type: Type of room e.g. Entire home, Shared room.

accommodates: How many occupants it accommodates

bathrooms: Number of bathrooms in the property

bedrooms: Number of bedrooms in the property

beds: Number of beds available

bed_type: Type of bed e.g. airbed

minimum_nights: The minimum number of nights you can book

number_of_reviews: Number of reviews for each property

review_scores_rating: 0-100 overall rating

review_scores_accuracy: 0-10 accuracy from listing to actual

review_scores_cleanliness: 0-10 cleanliness

review_scores_checkin: 0-10 experience of checkin

review_scores_communication: 0-10 experience of communication

review_scores_location: 0-10 location of property

review_scores_value: 0-10 value for money

price_raw: Price with $

price: Price as float


2. Are there missing values?

host_total_listings_count: N

neighbourhood_cleansed: Y

zipcode: Y

latitude: N

longitude: N

property_type: Y

room_type: Y

accommodates: N

bathrooms: N

bedrooms: N

beds: N

bed_type: N

minimum_nights: N

number_of_reviews: N

review_scores_rating: N

review_scores_accuracy: N

review_scores_cleanliness: N

review_scores_checkin: N

review_scores_communication: N

review_scores_location: N

review_scores_value: N

price_raw: N

price: Y

3. How many records are in the dataset?

4759


4. What is the mean and standard deviation of the dataset?

host_total_listings_count: 6, 20

neighbourhood_cleansed: N/A

zipcode: N/A

latitude: 37.7, 0.02

longitude: -122, 0.02

property_type: N/A

room_type: N/A

accommodates: 3.4, 2.2

bathrooms: 1.3, 1.2

bedrooms: 1.4, 1.4

beds: 1.89, 1.59

bed_type: N/A

minimum_nights: 7, 13.3

number_of_reviews: 50, 68

review_scores_rating: 9.8, 0.4

review_scores_accuracy: 9.8, 0.4

review_scores_cleanliness: 9.7, 0.6

review_scores_checkin: 9.9, 0.3

review_scores_communication: 9.6, 0.6

review_scores_location: 9.6, 0.5

review_scores_value: 9.5, 0.6

price_raw: N/A

price: 222, 374


5. Which variables are continuous and which are categorical?

host_total_listings_count: con

neighbourhood_cleansed: cat

zipcode: cat

latitude: con

longitude: con

property_type: cat

room_type: cat

accommodates: con

bathrooms: con

bedrooms: con

beds: con

bed_type: cat

minimum_nights: con

number_of_reviews: con

review_scores_rating: con

review_scores_accuracy: con

review_scores_cleanliness: con

review_scores_checkin: con

review_scores_communication: con

review_scores_location: con

review_scores_value: con

price_raw: con

price: con

Drop the column price_raw and cache the results

airbnbDF = initDF.drop("price_raw")
airbnbDF.cache()

Create a histogram of price

display(airbnbDF.select('price'))
# Then click on histogram

Is this a log-normal distribution? Yes, it looks Gaussian in log space

from pyspark.sql.functions import log, col

display(airbnbDF.select('price').withColumn('log_price', log(col('price'))))

Create a scattermatrix (a matrix of scatterplots) to look at how the following variables correlate together:

  • price
  • host_total_listings_count
  • review_scores_rating
  • number_of_reviews
  • bathrooms
display(airbnbDF.select('price', 'host_total_listings_count', 'review_scores_rating', 'number_of_reviews', 'bathrooms')) # and click on scatter
# or

%python
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

fig, ax = plt.subplots()
pandasDF = airbnbDF.select('price', 'host_total_listings_count', 'review_scores_rating', 'number_of_reviews', 'bathrooms').toPandas()

pd.plotting.scatter_matrix(pandasDF)  # scatter_matrix lives under pandas.plotting in recent pandas versions

display(fig.figure)

Which neighborhoods have the highest number of rentals? Display the neighborhoods and their associated count in descending order.

display(airbnbDF.groupBy('neighbourhood_cleansed').count().orderBy('count', ascending=False))

Map the various listings by price. Since Databricks notebooks can display arbitrary HTML code, use the code provided.

%python
from pyspark.sql.functions import col

try:
  airbnbDF
except NameError: # Looks for local table if airbnbDF not defined
  airbnbDF = spark.table("airbnb")

v = ",\n".join(map(lambda row: "[{}, {}, {}]".format(row[0], row[1], row[2]), airbnbDF.select(col("latitude"),col("longitude"),col("price")/600).collect()))
displayHTML("""
<html>
<head>
 <link rel="stylesheet" href="https://unpkg.com/leaflet@1.3.1/dist/leaflet.css"
   integrity="sha512-Rksm5RenBEKSKFjgI3a41vrjkw4EVPlJ3+OiI65vTjIdo9brlAacEuKOiQ5OFh7cOI1bkDwLqdLw3Zg0cRJAAQ=="
   crossorigin=""/>
 <script src="https://unpkg.com/leaflet@1.3.1/dist/leaflet.js"
   integrity="sha512-/Nsx9X4HebavoBvEBuyp3I7od5tA0UzAxs+j83KgC8PU0kgB4XiK4Lfe4y4cgBtaRJQEIFCW+oC506aPT2L1zw=="
   crossorigin=""></script>
 <script src="https://cdnjs.cloudflare.com/ajax/libs/leaflet.heat/0.2.0/leaflet-heat.js"></script>
</head>
<body>
    <div id="mapid" style="width:700px; height:500px"></div>
  <script>
  var mymap = L.map('mapid').setView([37.7587,-122.4486], 12);
  var tiles = L.tileLayer('http://{s}.tile.osm.org/{z}/{x}/{y}.png', {
    attribution: '&copy; <a href="http://osm.org/copyright">OpenStreetMap</a> contributors',
}).addTo(mymap);
  var heat = L.heatLayer([""" + v + """], {radius: 25}).addTo(mymap);
  </script>
  </body>
  </html>
""")

Exercise 2: Featurization

Featurize the dataset by building a pipeline that accomplishes the following:

  1. Indexes all categorical variables
  2. One-hot encodes the indexed values
  3. Combines all features into a features column

Finally, perform a train/test split.

from pyspark.ml.feature import StringIndexer

iNeighbourhood = StringIndexer(inputCol="neighbourhood_cleansed", outputCol="cat_neighborhood", handleInvalid="skip")
iRoomType = StringIndexer(inputCol="room_type", outputCol="cat_room_type", handleInvalid="skip")
iZipCode = StringIndexer(inputCol="zipcode", outputCol="cat_zipcode", handleInvalid="skip")
iPropertyType = StringIndexer(inputCol="property_type", outputCol="cat_property_type", handleInvalid="skip")
iBedType= StringIndexer(inputCol="bed_type", outputCol="cat_bed_type", handleInvalid="skip")

from pyspark.ml.feature import OneHotEncoderEstimator

oneHotEnc = OneHotEncoderEstimator(inputCols=["cat_neighborhood", "cat_room_type", "cat_zipcode", "cat_property_type", "cat_bed_type"],
                                   outputCols=["vec_neighborhood", "vec_room_type", "vec_zipcode", "vec_property_type", "vec_bed_type"])

seed = 42
(testDF, trainDF) = airbnbDF.randomSplit([0.2, 0.8], seed=seed)

Use VectorAssembler to combine the features.
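
featureCols isn't defined in this exercise's snippet; one plausible definition, assuming the numeric columns listed in Exercise 1 plus the one-hot encoded vectors (price is the label, so it is excluded):

featureCols = [
  "host_total_listings_count", "latitude", "longitude", "accommodates", "bathrooms",
  "bedrooms", "beds", "minimum_nights", "number_of_reviews",
  "review_scores_rating", "review_scores_accuracy", "review_scores_cleanliness",
  "review_scores_checkin", "review_scores_communication", "review_scores_location",
  "review_scores_value",
  "vec_neighborhood", "vec_room_type", "vec_zipcode", "vec_property_type", "vec_bed_type"
]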

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

Put this all together in a pipeline.

from pyspark.ml import Pipeline
featurizationPipeline = Pipeline(stages=[
  iNeighbourhood, iRoomType, iZipCode, iPropertyType, iBedType,
  oneHotEnc,
  assembler
])

featurizationModel = featurizationPipeline.fit(trainDF)     # fit the featurizers on the training data only
trainTransformedDF = featurizationModel.transform(trainDF)
testTransformedDF = featurizationModel.transform(testDF)

Use the average of price for your baseline model. Save that value to baselinePrediction

from pyspark.sql.functions import avg

baselinePrediction = trainTransformedDF.select(avg("price")).first()[0]

Create a RegressionEvaluator object named evaluator with price as the label, prediction as the prediction column, and mse as the metric.

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="mse")

Create the DataFrame baselinePredictionDF that contains the transformed test data testTransformedDF with a new column prediction that has the same baseline prediction for all observations in the dataset.

from pyspark.sql.functions import lit
baselinePredictionDF = testTransformedDF.withColumn("prediction", lit(baselinePrediction))

Evaluate the results:

metricName = evaluator.getMetricName()
metricVal = evaluator.evaluate(baselinePredictionDF)

print("{}: {}".format(metricName, metricVal))
# 158205.77280874032
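
For scale, the square root of this MSE is roughly 398, so the baseline is off by about $398 on average.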

Exercise 3: Train a Linear Regression Model

from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

lr = LinearRegression(labelCol="price", featuresCol="features")
pipelineFull = Pipeline(stages=[featurizationPipeline, lr])


pipelineFullModel = pipelineFull.fit(trainDF)

Evaluate the performance of linear regression over the baseline model.

metricName = evaluator.getMetricName()
metricVal = evaluator.evaluate(pipelineFullModel.transform(testDF))

print("{}: {}".format(metricName, metricVal))
# 129976.29251042467

Exercise 4: Hyperparameter Tuning

Use 3-fold cross-validation to tune the hyperparameters of the linear regression model.

print(lr.explainParams())

Build a grid of parameters for cross-validation.

from pyspark.ml.tuning import ParamGridBuilder

paramGrid = (ParamGridBuilder()
  .addGrid(lr.elasticNetParam, [0.0, 1.0])             
  .addGrid(lr.maxIter, [1, 10, 100])
  .addGrid(lr.fitIntercept, [True, False])
  .build()
)

Build the cross-validator.

from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(
  estimator = pipelineFull,
  estimatorParamMaps = paramGrid,
  evaluator=evaluator,
  numFolds = 3
)

Fit the cross-validator to the training data.

cvModel = cv.fit(trainDF)

Examine the results.

for params, score in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics):
  print("".join([param.name+"\t"+str(params[param])+"\t" for param in params]))
  print("\tScore: {}".format(score))

See the best fit

bestModel = cvModel.bestModel
print(bestModel.stages[-1]._java_obj.getElasticNetParam())
print(bestModel.stages[-1]._java_obj.getMaxIter())
print(bestModel.stages[-1]._java_obj.getFitIntercept())

Evaluate on training and test data

evaluator.evaluate(cvModel.bestModel.transform(trainDF))
# 103136.8496131469

evaluator.evaluate(cvModel.bestModel.transform(testDF))
# 129981.5354094153