Introduction to Data Science and Machine Learning
July 2019
Log into Databricks.
Importing the course:
- In the left sidebar, click Home.
- Right-click your home folder, then click Import.
- Select browse and then choose your course Lessons.dbc file.
- Click the Import button.
- Click the Home icon in the left sidebar.
- Select your home folder.
- Select the sub-folder matching the name of your course (e.g. Introduction-to-Data-Science-and-Machine-Learning-1.0.1).
- Open Lesson 01-Course-Overview-and-Setup.
Click the keyboard icon in the top right to see the keyboard shortcuts.
01: Course Overview and Setup
Machine Learning Drives Business Value:
- A/B Testing: offering users different versions of a website indicates which version is most successful
- Natural Language Processing: analyzing natural language using statistical methods allows for the algorithmic understanding of language, powering sentiment analysis, chatbots, and many other applications
- Financial Forecasting: future company financials can be predicted using past data
- Churn Analysis: determining whether a customer might leave a business helps identify the best way to retain customers
02: What is ML
Estimate a function (f) that maps the relationship between input features and the output variable.
Read in the Boston housing data
bostonDF = (spark.read
.option("HEADER", True)
.option("inferSchema", True)
.csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
)
display(bostonDF)
Spark differs from many other machine learning frameworks in that we train our model on a single column that contains a vector of all of our features. Prepare the data by creating one column named features that has the average number of rooms, crime rate, and poverty percentage.
Add a column to the DataFrame that has a vector of all our features of interest using the VectorAssembler:
from pyspark.ml.feature import VectorAssembler
featureCols = ["rm", "crim", "lstat"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
bostonFeaturizedDF = assembler.transform(bostonDF)
display(bostonFeaturizedDF)
This column is displayed in sparse vector notation.
Regression vs Classification
Variables can either be quantitative or qualitative:
- Quantitative values are numeric and generally unbounded, taking any positive or negative value | continuous, numerical | e.g. age, salary, temperature
- Qualitative values take on a set number of classes or categories | categorical, discrete | e.g. gender, whether or not a patient has cancer, state of residence
Machine learning models operate on numbers so a qualitative variable like gender, for instance, would need to be encoded as 0 for male or 1 for female.
A supervised model learning a quantitative variable is called regression and a model learning a qualitative variable is called classification.
Import linear regression and set the output to be the medv variable and the input to be the features column:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(labelCol="medv", featuresCol="features")
Fit the data:
lrModel = lr.fit(bostonFeaturizedDF)
You can see the fit by doing:
print("Coefficients: {0:.1f}, {1:.1f}, {2:.1f}".format(*lrModel.coefficients))
print("Intercept: {0:.1f}".format(lrModel.intercept))
Coefficients: 5.2, -0.1, -0.6 Intercept: -2.6
You can interpret the output as
predicted home value = (5.2 x number of rooms) - (.1 x crime rate) - (.6 x % lower class) - 2.6
Let's see how it predicts on data it has already seen as well as new data.
Look at the first 10 rows
subsetDF = (bostonFeaturizedDF
.limit(10)
.select("features", "medv")
)
display(subsetDF)
Use the transform method on the trained model to see its prediction.
Now that lrModel is a trained estimator, we can transform data using its .transform() method.
predictionDF = lrModel.transform(subsetDF)
display(predictionDF)
Now predict on a hypothetical data point: a 6-room home with a crime rate of 3.6 and a 12% lower-class population. According to our formula, the model should predict about 21:
from pyspark.ml.linalg import Vectors
data = [(Vectors.dense([6., 3.6, 12.]), )] # Creates our hypothetical data point
predictDF = spark.createDataFrame(data, ["features"])
display(lrModel.transform(predictDF))
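As a quick check on the hand calculation, the rounded coefficients printed earlier can be plugged into the formula directly. This is a plain-Python sketch that does not use Spark; predict_home_value is a hypothetical helper name, and because the coefficients are rounded, the result only approximates what lrModel.transform returns.

```python
# Rounded coefficients and intercept as printed by the trained model.
COEFFICIENTS = {"rm": 5.2, "crim": -0.1, "lstat": -0.6}
INTERCEPT = -2.6

def predict_home_value(rm, crim, lstat):
    """Apply the linear formula: sum of coefficient * feature, plus the intercept."""
    return (COEFFICIENTS["rm"] * rm
            + COEFFICIENTS["crim"] * crim
            + COEFFICIENTS["lstat"] * lstat
            + INTERCEPT)

estimate = predict_home_value(rm=6.0, crim=3.6, lstat=12.0)
print(round(estimate, 2))  # roughly 21, in line with the estimate in the text
```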
Exercise: Train a Model and Create Predictions
# Initiate data
from pyspark.ml.feature import VectorAssembler
featureCols = ["indus", "age", "dis"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="newFeatures")
bostonFeaturizedDF2 = assembler.transform(bostonDF)
# Initiate model and fit training data
from pyspark.ml.regression import LinearRegression
lrNewFeatures = LinearRegression(labelCol="medv", featuresCol="newFeatures")
lrModelNew = lrNewFeatures.fit(bostonFeaturizedDF2)
# Predict values
from pyspark.ml.linalg import Vectors
data = [(Vectors.dense([11., 68., 4.]), ),
(Vectors.dense([6., 35., 2.]), ),
(Vectors.dense([19., 74., 8.]), )]
newDataDF = spark.createDataFrame(data, ["newFeatures"])
predictionsDF = lrModelNew.transform(newDataDF)
display(predictionsDF)
Question: What's the difference between the fit and transform methods on a machine learning model?
Answer: The fit method learns something from the data and saves the result in the model object (e.g. lrModel above). The transform method applies the learned pattern to unseen data. transform can be used to make predictions using a trained model or to change the data in some other way, such as encoding a categorical feature. We'll explore this more in later lessons.
Further reading:
An Introduction to Statistical Learning
https://github.com/databricks/spark-sklearn (DEPRECATED)
https://github.com/hyperopt/hyperopt
https://databricks.com/blog/category/engineering/machine-learning
03: Exploratory-Analysis
Exploratory Data Analysis (EDA).
Exploratory analysis focuses on the following:
Gain basic intuition into the data:
- What does each feature represent?
- Are your features categorical or continuous?
- What data needs to be encoded? (e.g. mapping a variable for gender to a number)
- What data types are you working with? (e.g. integers, strings)
How is the data distributed?:
- What is the mean, median, and/or mode of each variable?
- What is the variance, or spread, of each variable?
- Are there outliers? Missing values?
How can we visualize the data?:
- Histograms, scatterplots, and boxplots
- Correlation plots
What hypotheses will I test with my models?:
- What features are correlated?
- What are our ideal features and how can we build them if they're not already available?
Import the data and use the .describe() method on it:
bostonDF = (spark.read
.option("HEADER", True)
.option("inferSchema", True)
.csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
.drop("_c0")
)
display(bostonDF.describe())
Now display one variable:
display(bostonDF.select("medv").describe())
Plot a histogram of the median value in the housing dataset. Display it, then click on the plotting option and visualize it as a histogram.
display(bostonDF.select("medv"))
Look at how the number of rooms compares to the median value. Select the scatterplot option from the plotting drop-down menu. Then click 'Plot Options...' -> 'Show LOESS'.
display(bostonDF.select("rm", "medv"))
Databricks notebooks can display plots generated in pure Python:
- Import matplotlib, pandas, and seaborn
- Create a fig object
- Use the .toPandas() DataFrame method to turn the Spark DataFrame into a Pandas DataFrame. This way we can use Python's plotting libraries
- Use the scatter_matrix pandas function to plot a matrix of scatterplots
Plot a scatter matrix using the pandas Python library.
%python
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns # importing for a better color scheme
try:
    bostonDF
except NameError: # Looks for local table if bostonDF not defined
    bostonDF = spark.table("boston")
fig, ax = plt.subplots()
pandasDF = bostonDF.select("rm", "crim", "lstat", "medv").toPandas()
pd.plotting.scatter_matrix(pandasDF) # scatter_matrix lives in pd.plotting in current pandas releases
display(fig.figure)
To deepen our understanding of correlation, add derived columns to bostonDF and compute each one's correlation with medv:
from pyspark.sql.functions import col, rand
bostonWithDummyDataDF = (bostonDF
.select("medv")
.withColumn("medvX3", col("medv")*3)
.withColumn("medvNeg", col("medv")*-3)
.withColumn("random1", rand(seed=41))
.withColumn("random2", rand(seed=44))
.withColumn("medvWithNoise", col("medv")*col("random1"))
.withColumn("medvWithNegativeNoise", col("medv")*col("random1")*-1)
)
display(bostonWithDummyDataDF)
for colName in bostonWithDummyDataDF.columns: # colName avoids shadowing the imported col function
    correlation = bostonWithDummyDataDF.stat.corr("medv", colName)
    print("The correlation between columns 'medv' and '{}': \t{}".format(colName, correlation))
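To make the correlation values easier to interpret, the following plain-Python sketch computes the Pearson correlation by hand on made-up numbers (not the Boston data). The pearson helper and the sample values are assumptions for illustration; the point is that a scaled copy correlates at about 1, a negatively scaled copy at about -1, and unrelated random noise near 0.

```python
import math
import random

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the spreads."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

rng = random.Random(41)  # seeded so the run is repeatable
values = [rng.uniform(5, 50) for _ in range(200)]  # made-up stand-in for medv
times_three = [v * 3 for v in values]              # perfectly correlated
negated = [v * -3 for v in values]                 # perfectly anti-correlated
noise = [rng.random() for _ in values]             # unrelated to values

print(pearson(values, times_three))  # approximately 1
print(pearson(values, negated))      # approximately -1
print(pearson(values, noise))        # near 0; the exact value depends on the seed
```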
There are a number of other helpful visualizations depending on the needs of your data:
- Heat maps: similar to a scattermatrix, heatmaps can be especially helpful at visualizing correlations between variables
- Box plots: visualizes quantiles and outliers
- Q-Q Plots: compares two probability distributions by plotting their quantiles against each other
- Maps and GIS: visualizes geographically-bound data
- t-SNE: plots high dimensional data (i.e. data that has many variables) by projecting it down into a two-dimensional plot
- Time series: plots time-bound variables including run charts, lag plots, and wavelet spectrograms
Assemble all the features into a single column features so we can use Spark's built-in correlation functionality.
%python
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=bostonDF.columns, outputCol="features")
bostonFeaturizedDF = assembler.transform(bostonDF)
Calculate the correlations across the entire dataset:
%python
from pyspark.ml.stat import Correlation
pearsonCorr = Correlation.corr(bostonFeaturizedDF, 'features').collect()[0][0]
pandasDF = pd.DataFrame(pearsonCorr.toArray())
pandasDF.index, pandasDF.columns = bostonDF.columns, bostonDF.columns # Labels our index and columns so we can interpret the results
Plot the results as a heatmap
%python
import matplotlib.pyplot as plt
import seaborn as sns
fig, ax = plt.subplots()
sns.heatmap(pandasDF)
display(fig.figure)
Exercise: EDA on the Bike Sharing Dataset
Calculate the count, mean, and standard deviation for each variable in the dataset. What does each variable signify? What is the spread of the data?
bikeDF = (spark
.read
.option("header", True)
.option("inferSchema", True)
.csv("/mnt/training/bikeSharing/data-001/hour.csv")
.drop("instant", "dteday", "casual", "registered", "holiday", "weekday")
)
display(bikeDF.describe())
Create a histogram of the variable cnt.
display(bikeDF.select("cnt")) # Then click on the histogram plot
Create a barplot of counts by hour
display(bikeDF.select("hr", "cnt")) # Then click on barplot then group by hours and put cnt as values
Create a scattermatrix.
%python
fig, ax = plt.subplots()
pandasDF = bikeDF.select("hr", "cnt").toPandas() # use bikeDF.toPandas() to plot every column instead
pd.plotting.scatter_matrix(pandasDF) # scatter_matrix lives in pd.plotting in current pandas releases
display(fig.figure)
Calculate the correlations of the different variables.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
assembler = VectorAssembler(inputCols=bikeDF.columns, outputCol="features")
bikeFeaturizedDF = assembler.transform(bikeDF)
pearsonCorr = Correlation.corr(bikeFeaturizedDF, 'features').collect()[0][0]
pandasDF = pd.DataFrame(pearsonCorr.toArray())
pandasDF.index, pandasDF.columns = bikeDF.columns, bikeDF.columns
Plot the correlations as a heatmap.
%python
fig, ax = plt.subplots()
sns.heatmap(pandasDF)
display(fig.figure)
The main goal of exploratory analysis is to build intuition from the dataset that will determine future modeling decisions. Having a limited understanding of the data leads to downstream problems like unexpected outputs and errors as well as under-performing models.
Further reading:
https://spark.apache.org/docs/latest/ml-statistics.html
https://spark.apache.org/docs/latest/mllib-statistics.html#kernel-density-estimation
04: ML Workflows
CRISP-DM approach (Cross Industry Standard Process for Data Mining)
- Business and Data Understanding: ensures a rigorous understanding of both the business problem and the available data
- Data Preparation: involves cleaning data so that it can be fed into algorithms and create new features
- Modeling: entails training many models and many combinations of parameters for a given model
- Evaluation: compares model performance and chooses the best option
- Deployment: launches a model into production where it's used to inform business decision-making
Train/Test Split
Splitting the data into training and test sets guards against the memorization of data, known as overfitting. Overfitting occurs when our model learns patterns caused by random chance rather than true signal. By evaluating our model's performance on unseen data, we can detect overfitting and avoid choosing models that suffer from it.
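The idea behind a seeded random split can be sketched in plain Python. This is only an illustration under the assumption that each row is assigned independently in proportion to the weights; random_split is a hypothetical helper, and Spark's own randomSplit may differ in its details. As with Spark, the resulting sizes are approximate rather than exact.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split at random, in proportion to the weights."""
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for index, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                break
        splits[index].append(row)  # falls back to the last split if no break
    return splits

rows = list(range(1000))  # made-up row ids
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))  # roughly 800 and 200
```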
Import the Boston dataset.
bostonDF = (spark.read
.option("HEADER", True)
.option("inferSchema", True)
.csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
)
display(bostonDF)
Split the dataset into two DataFrames:
trainDF, testDF = bostonDF.randomSplit([0.8, 0.2], seed=42)
print("We have {} training examples and {} test examples.".format(trainDF.count(), testDF.count()))
Baseline Model
A baseline model offers an educated best guess to improve upon as different models are trained and evaluated. It represents the simplest model we can create. This is generally approached as the center of the data. In the case of regression, this could involve predicting the average of the outcome regardless of the features it sees. In the case of classification, the center of the data is the mode, or the most common class. A baseline model could also be a random value or a preexisting model. Through each new model, we can track improvements with respect to this baseline.
Create a baseline model by calculating the average housing value in the training dataset and appending it to the test dataset as the prediction column:
from pyspark.sql.functions import avg
from pyspark.sql.functions import lit
trainAvg = trainDF.select(avg("medv")).first()[0]
print("Average home value: {}".format(trainAvg))
testPredictionDF = testDF.withColumn("prediction", lit(trainAvg))
display(testPredictionDF)
Evaluation and Improvement
The most common evaluation metric in regression tasks is mean squared error (MSE), the average of the squared differences between the true values and the predictions. The result is never negative, and the lower the MSE, the better the model is performing.
Define the evaluator with the prediction column, label column, and use the MSE metric.
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="medv", metricName="mse")
Evaluate testPredictionDF using the .evaluate() method.
testError = evaluator.evaluate(testPredictionDF)
print("Error on the test set for the baseline model: {}".format(testError))
This score indicates that the average squared distance between the true home value and the prediction of the baseline is about 79. Taking the square root of that number gives us the error in the units of the quantity being estimated. In other words, the square root of 79 is about 8.89, and since medv is recorded in thousands of dollars, this is a typical error of about $8,890.
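The baseline and its error can be reproduced by hand on a small scale. The sketch below uses made-up home values (in thousands of dollars), not the Boston data, and the helper name mean_squared_error is an assumption for illustration.

```python
import math

def mean_squared_error(labels, predictions):
    """Average of the squared differences between labels and predictions."""
    return sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)

train_labels = [24.0, 21.6, 34.7, 33.4, 36.2]  # made-up values in $1000s
test_labels = [28.7, 22.9, 16.5]               # made-up values in $1000s

# Baseline: predict the training average for every test row.
baseline = sum(train_labels) / len(train_labels)
baseline_predictions = [baseline] * len(test_labels)

mse = mean_squared_error(test_labels, baseline_predictions)
rmse = math.sqrt(mse)  # back in the units of the label
print("baseline={:.2f} mse={:.2f} rmse={:.2f}".format(baseline, mse, rmse))

# The conversion used in the text: an MSE of 79 is an RMSE of about 8.89.
print(round(math.sqrt(79), 2))
```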
bikeDF = (spark
.read
.option("header", True)
.option("inferSchema", True)
.csv("/mnt/training/bikeSharing/data-001/hour.csv")
.drop("instant", "dteday", "casual", "registered", "holiday", "weekday") # Drop unnecessary features
)
display(bikeDF)
trainBikeDF, testBikeDF = bikeDF.randomSplit([0.7, 0.3], seed=42)
avgTrainCnt = trainBikeDF.select(avg("cnt")).first()[0]
bikeTestPredictionDF = testBikeDF.withColumn("prediction", lit(avgTrainCnt))
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="cnt", metricName="mse")
testError = evaluator.evaluate(bikeTestPredictionDF) # ~= 33255.26300731152
Use a linear regression model (explored in the previous lesson) to beat the baseline model score.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
featureCols = ["hr", "workingday", "weathersit", "temp", "atemp", "hum", "windspeed"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="newFeatures")
bikeFeaturizedDF2 = assembler.transform(bikeDF)
trainBikeFeaturizedDF, testBikeFeaturizedDF = bikeFeaturizedDF2.randomSplit([0.7, 0.3], seed=42)
lr = LinearRegression(labelCol="cnt", featuresCol="newFeatures")
lrModel = lr.fit(trainBikeFeaturizedDF) # fit the estimator defined on the line above
predictionsFeaturizedDF = lrModel.transform(testBikeFeaturizedDF)
bikeevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="cnt", metricName="mse")
biketestError = bikeevaluator.evaluate(predictionsFeaturizedDF)
print(biketestError) # 22663.224110566814
05: Featurization
MLlib can refer to either the general machine learning library in Spark or the RDD-specific API. SparkML refers to the DataFrame-specific API, which is preferred over working on RDDs wherever possible.
Spark's machine learning library, MLlib, has three main abstractions:
- A transformer takes a DataFrame as an input and returns a new DataFrame with one or more columns appended to it.
  - Transformers implement a .transform() method.
- An estimator takes a DataFrame as an input and returns a model, which itself is a transformer.
  - Estimators implement a .fit() method.
- A pipeline combines transformers and estimators to make it easier to chain multiple algorithms.
  - Pipelines implement a .fit() method.
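The relationship between these three abstractions can be sketched in plain Python. The classes below are hypothetical stand-ins that operate on lists of numbers rather than DataFrames, and they are not Spark's implementation; they only show that fitting an estimator yields a transformer and that a pipeline chains the stages.

```python
class MeanCenterer:
    """Estimator: fit() learns the mean and returns a transformer (the model)."""
    def fit(self, values):
        return MeanCentererModel(sum(values) / len(values))

class MeanCentererModel:
    """Transformer: applies the mean learned during fit()."""
    def __init__(self, mean):
        self.mean = mean
    def transform(self, values):
        return [v - self.mean for v in values]

class Doubler:
    """Transformer with nothing to learn, so it has no fit() method."""
    def transform(self, values):
        return [v * 2 for v in values]

class SimplePipeline:
    """Estimator that fits each stage in order and returns a fitted pipeline."""
    def __init__(self, stages):
        self.stages = stages
    def fit(self, values):
        fitted = []
        for stage in self.stages:
            model = stage.fit(values) if hasattr(stage, "fit") else stage
            values = model.transform(values)  # later stages see transformed data
            fitted.append(model)
        return SimplePipelineModel(fitted)

class SimplePipelineModel:
    """Transformer: runs every fitted stage in sequence."""
    def __init__(self, stages):
        self.stages = stages
    def transform(self, values):
        for stage in self.stages:
            values = stage.transform(values)
        return values

pipeline_model = SimplePipeline([Doubler(), MeanCenterer()]).fit([1.0, 2.0, 3.0])
print(pipeline_model.transform([1.0, 2.0, 3.0]))  # [-2.0, 0.0, 2.0]
print(pipeline_model.transform([10.0]))           # [16.0]
```

Note that the second call reuses the mean learned during fit (4.0) rather than recomputing it from the new data.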
Featurization is the process of creating this input data for a model. There are a number of common featurization approaches:
- Encoding categorical variables
- Normalizing
- Creating new features
- Handling missing values
- Binning/discretizing
Categorical features refer to a discrete number of groups. In the case of the AirBnB dataset we'll use in this lesson, one categorical variable is room type. There are three types of rooms: Private room, Entire home/apt, and Shared room.
A machine learning model does not know how to handle these room types. Instead, we must first encode each unique string into a number. Second, we must one-hot encode each of those values to a location in an array. This allows our machine learning algorithms to model effects of each category.
Import the AirBnB dataset.
airbnbDF = spark.read.parquet("/mnt/training/airbnb/sf-listings/sf-listings-correct-types.parquet")
display(airbnbDF)
Take the unique values of room_type and index them to a numerical value. Fit the StringIndexer estimator to the unique room types using the .fit() method and by passing in the data.
The trained StringIndexer model then becomes a transformer. Use it to transform the results using the .transform() method and by passing in the data.
from pyspark.ml.feature import StringIndexer
uniqueTypesDF = airbnbDF.select("room_type").distinct() # Use distinct values to demonstrate how StringIndexer works
indexer = StringIndexer(inputCol="room_type", outputCol="room_type_index") # Set input column and new output column
indexerModel = indexer.fit(uniqueTypesDF) # Fit the indexer to learn room type/index pairs
indexedDF = indexerModel.transform(uniqueTypesDF) # Append a new column with the index
display(indexedDF)
Now each room type has a unique numerical value assigned. While we could pass the new room_type_index into a machine learning model, it would assume that Shared room is twice as much as Entire home/apt, which is not the case. Instead, we need to change these values to a binary yes/no value if a listing is for a shared room, entire home, or private room.
Do this by fitting the OneHotEncoderEstimator, which only operates on numerical values (this is why we needed to use StringIndexer first).
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(inputCols=["room_type_index"], outputCols=["encoded_room_type"])
encoderModel = encoder.fit(indexedDF)
encodedDF = encoderModel.transform(indexedDF)
display(encodedDF)
Certain models, such as random forest, do not need one-hot encoding (and can actually be negatively affected by the process).
The new column encoded_room_type is a vector. The difference between a sparse and dense vector is whether Spark records all of the empty values. In a sparse vector, like we see here, Spark saves space by only recording the places where the vector has a non-zero value. The value of 0 in the first position indicates that it's a sparse vector. The second value indicates the length of the vector.
Here's how to read the mapping above:
- Shared room maps to the vector [0, 0]
- Entire home/apt maps to the vector [0, 1]
- Private room maps to the vector [1, 0]
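The two-step encoding can be sketched in plain Python. This is a simplified illustration, not Spark's implementation: it assumes labels are indexed by descending frequency and that the last category is dropped from the one-hot vector (which is why three room types produce vectors of length two). The helper names, the alphabetical tie-breaking, and the made-up listings are assumptions.

```python
from collections import Counter

def fit_string_index(values):
    """Index labels by descending frequency; ties are broken alphabetically here."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list; the last category becomes all zeros if dropped."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0] * size
    if index < size:
        vector[index] = 1
    return vector

# Made-up listings, chosen so the frequency order reproduces the mapping in the text.
room_types = ["Private room"] * 3 + ["Entire home/apt"] * 2 + ["Shared room"]

index = fit_string_index(room_types)
encoded = {label: one_hot(i, len(index)) for label, i in index.items()}
print(index)    # {'Private room': 0, 'Entire home/apt': 1, 'Shared room': 2}
print(encoded)  # Private room -> [1, 0], Entire home/apt -> [0, 1], Shared room -> [0, 0]
```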
Imputing Null or Missing Data
Null values refer to unknown or missing data as well as irrelevant responses. Strategies for dealing with this scenario include:
- Dropping these records: Works when you do not need to use the information for downstream workloads
- Adding a placeholder (e.g. -1): Allows you to see missing data later on without violating a schema
- Basic imputing: Allows you to have a "best guess" of what the data could have been, often by using the mean of non-missing data
- Advanced imputing: Determines the "best guess" of what data should be using more advanced strategies such as clustering machine learning algorithms or oversampling techniques such as SMOTE.
Try to determine why a value is null. This can provide information that can be helpful to the model.
Describe the dataset and take a look at the count values. There's a fair amount of missing data in this dataset.
display(airbnbDF.describe())
Try dropping missing values.
countWithoutDropping = airbnbDF.count()
countWithDropping = airbnbDF.na.drop(subset=["zipcode", "host_is_superhost"]).count()
print("Count without dropping nulls:\t", countWithoutDropping)
print("Count with dropping nulls:\t", countWithDropping)
Another common option for working with missing data is to impute the missing values with a best guess for their value. Try imputing a list of columns with their median.
from pyspark.ml.feature import Imputer
imputeCols = [
"host_total_listings_count",
"bathrooms",
"beds",
"review_scores_rating",
"review_scores_accuracy",
"review_scores_cleanliness",
"review_scores_checkin",
"review_scores_communication",
"review_scores_location",
"review_scores_value"
]
imputer = Imputer(strategy="median", inputCols=imputeCols, outputCols=imputeCols)
imputerModel = imputer.fit(airbnbDF)
imputedDF = imputerModel.transform(airbnbDF)
display(imputedDF)
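Median imputation follows the same fit-then-transform pattern and can be sketched in plain Python. The helper names and the made-up beds values are assumptions; this sketch uses an exact median from the standard library, which may not match the value Spark's Imputer computes on large datasets.

```python
import statistics

def fit_median(values):
    """Learn the median from the non-missing values only."""
    return statistics.median(v for v in values if v is not None)

def impute(values, fill_value):
    """Replace every missing entry with the learned fill value."""
    return [fill_value if v is None else v for v in values]

beds = [1.0, None, 2.0, 4.0, None, 2.0]  # made-up column with missing values

median_beds = fit_median(beds)
imputed_beds = impute(beds, median_beds)
print(median_beds)   # 2.0
print(imputed_beds)  # [1.0, 2.0, 2.0, 4.0, 2.0, 2.0]
```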
Creating a Pipeline
Passing around estimator objects, trained estimators, and transformed DataFrames quickly becomes cumbersome. Spark uses the convention established by scikit-learn to combine each of these steps into a single pipeline.
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[
indexer,
encoder,
imputer
])
The pipeline itself is now an estimator. Train the model with its .fit() method and then transform the original dataset. We've now combined all of our featurization steps into one pipeline with three stages.
pipelineModel = pipeline.fit(airbnbDF)
transformedDF = pipelineModel.transform(airbnbDF)
display(transformedDF)
Exercise: Finish Featurizing the Dataset
One common way of handling data is to divide it into bins, a process technically known as discretizing. For instance, the dataset contains a number of rating scores that can be translated into a value of 1 if they are a highly rated host or 0 if not.
Finish featurizing the dataset by binning the review scores rating into high versus low rated hosts. Also filter the extreme values and clean the column price.
Step 1: Binning review_scores_rating
Divide the hosts by whether their review_scores_rating is above 97. Do this using the transformer Binarizer with the output column high_rating. This should create the objects binarizer and the transformed DataFrame transformedBinnedDF.
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=97, inputCol="review_scores_rating", outputCol="high_rating")
transformedBinnedDF = binarizer.transform(transformedDF)
Step 2: Regular Expressions on Strings
Clean the column price by creating two new columns:
- price: a new column that contains a cleaned version of price. This can be done using the regular expression replacement of "[\$,]" with "". Cast the column as a decimal.
- raw_price: the column price in its current form
from pyspark.sql.functions import col, regexp_replace
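tmp = transformedBinnedDF.select(
"*",
col('price').alias('raw_price') # keep the original string under the name raw_price
).drop('price')
transformedBinnedRegexDF = tmp.select(
"*",
regexp_replace('raw_price', r'[\$,]', '').cast("decimal").alias('price') # cast first, then alias so the column is named price
)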
display(transformedBinnedRegexDF)
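The same regular expression can be tried outside Spark with Python's standard library. This is a small sketch on made-up price strings; clean_price is a hypothetical helper name.

```python
import re
from decimal import Decimal

def clean_price(raw_price):
    """Strip dollar signs and thousands separators, then parse as a decimal."""
    return Decimal(re.sub(r"[\$,]", "", raw_price))

print(clean_price("$1,250.00"))  # 1250.00
print(clean_price("$85.00"))     # 85.00
```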
06: Regression Modeling
Univariate regression: one input feature and one output (2 variables).
Multivariate regression: multiple input features.
Take the example of Boston housing data where we have median value for a number of neighborhoods and variables such as the number of rooms, per capita crime, and economic status of residents. We might have a number of questions about this data including:
- Is there a relationship between our features and median home value?
- If there is a relationship, how strong is that relationship?
- Which of the features affect median home value?
- How accurately can we estimate the effect of each feature on home value?
- How accurately can we predict on unseen data?
- Is the relationship between our features and home value linear?
- Are there interaction effects (e.g. value goes up when an area is not industrial and has more rooms on average) between the features?
There is a trade-off between model accuracy and interpretability.
Y ≈ β0 + β1X
In this case, β0 and β1 are our coefficients where β0 represents the line's intercept with the Y axis and β1 represents the number we multiply by X in order to attain a prediction.
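For a single feature, the least-squares estimates of β0 and β1 have a simple closed form, sketched below in plain Python. This is the textbook formula rather than a description of how Spark's LinearRegression solves the problem; fit_line is a hypothetical helper, and the data points are made up to lie exactly on the line Y = 2 + 3X.

```python
def fit_line(xs, ys):
    """Least-squares fit of Y = beta_0 + beta_1 * X for one feature."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    # Slope: covariance of X and Y divided by the variance of X.
    beta_1 = (sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
              / sum((x - mean_x) ** 2 for x in xs))
    # Intercept: forces the line through the point of means.
    beta_0 = mean_y - beta_1 * mean_x
    return beta_0, beta_1

xs = [1.0, 2.0, 3.0, 4.0, 5.0]
ys = [2.0 + 3.0 * x for x in xs]  # made-up points on a known line

beta_0, beta_1 = fit_line(xs, ys)
print(beta_0, beta_1)  # approximately 2 and 3
```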
In the case of inferential statistics where we're interested in learning about the relationship between our input features and outputs, it's common to skip the train/test split step, as you'll see in this lesson.
Import the Boston dataset.
bostonDF = (spark.read
.option("HEADER", True)
.option("inferSchema", True)
.csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
.drop("_c0")
)
display(bostonDF)
Create a column features that has a single input variable rm by using VectorAssembler:
from pyspark.ml.feature import VectorAssembler
featureCol = ["rm"]
assembler = VectorAssembler(inputCols=featureCol, outputCol="features")
bostonFeaturizedDF = assembler.transform(bostonDF)
display(bostonFeaturizedDF)
Fit a linear regression model.
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="medv")
lrModel = lr.fit(bostonFeaturizedDF)
Interpreting a linear model entails answering a number of questions:
- What did the model estimate my coefficients to be?
- Are my coefficients statistically significant?
- How accurate was my model?
print("β0 (intercept): {}".format(lrModel.intercept))
print("β1 (coefficient for rm): {}".format(*lrModel.coefficients))
For a 5-room home, our model would predict -34.7 + (9.1 * 5), or about 10.8, which corresponds to $10,800 since medv is measured in thousands of dollars.
The intercept of -34.7 doesn't make a lot of sense on its own since this would imply that a studio apartment would be worth negative dollars. Also, we don't have any 1 or 2 bedroom homes in our dataset, so the model will perform poorly on data in this range.
In order to determine whether our coefficients are statistically significant, we need to quantify the likelihood of seeing the association by chance. One way of doing this is using a p-value. As a general rule of thumb, a p-value of under .05 indicates statistical significance: if there were no real relationship, there would be less than a 1 in 20 chance of seeing an association this strong.
Do this using the summary attribute of lrModel.
summary = lrModel.summary
summary.pValues
# [0.0, 0.0]
These small p-values indicate that it is highly unlikely to see the correlation of the number of rooms to housing price by chance. The first value in the list is the p-value for the rm feature and the second is that for the intercept.
Finally, we need a way to quantify how accurate our model is. R2 is a measure of the proportion of variance in the dataset explained by the model. With R2, a higher number is better.
summary.r2
# 0.4835254559913341
This indicates that 48% of the variability in home value can be explained using rm and the intercept. While this isn't too high, it's not too bad considering that we're training a model using only one variable.
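R2 can be computed by hand as one minus the ratio of the model's squared error to the squared error of always predicting the mean. The sketch below uses made-up labels and predictions; r_squared is a hypothetical helper name.

```python
def r_squared(labels, predictions):
    """1 - (residual sum of squares / total sum of squares)."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

labels = [10.0, 20.0, 30.0, 40.0]       # made-up true values
predictions = [12.0, 18.0, 33.0, 37.0]  # made-up model output

print(r_squared(labels, predictions))  # about 0.948
print(r_squared(labels, labels))       # 1.0 for perfect predictions
print(r_squared(labels, [25.0] * 4))   # 0.0 when always predicting the mean
```

A model that does no better than the mean baseline scores 0, which ties R2 back to the baseline model from the ML Workflows lesson.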
Finally, take a look at the summary attribute of lrModel to see other ways of summarizing model performance.
[attr for attr in dir(summary) if attr[0] != "_"]
# ['coefficientStandardErrors', 'degreesOfFreedom', 'devianceResiduals', 'explainedVariance', 'featuresCol', 'labelCol', 'meanAbsoluteError', 'meanSquaredError', 'numInstances', 'objectiveHistory', 'pValues', 'predictionCol', 'predictions', 'r2', 'r2adj', 'residuals', 'rootMeanSquaredError', 'tValues', 'totalIterations']
Multivariate Regression
The equation for multivariate regression looks like the following where each feature p has its own coefficient:
Y ≈ β0 + β1X1 + β2X2 + ... + βpXp
Train a multivariate regression model using rm, crim, and lstat as the input features.
from pyspark.ml.feature import VectorAssembler
featureCols = ["rm", "crim", "lstat"]
assemblerMultivariate = VectorAssembler(inputCols=featureCols, outputCol="features")
bostonFeaturizedMultivariateDF = assemblerMultivariate.transform(bostonDF)
display(bostonFeaturizedMultivariateDF)
Train the model.
from pyspark.ml.regression import LinearRegression
lrMultivariate = (LinearRegression()
.setLabelCol("medv")
.setFeaturesCol("features")
)
lrModelMultivariate = lrMultivariate.fit(bostonFeaturizedMultivariateDF)
summaryMultivariate = lrModelMultivariate.summary
Take a look at the coefficients and R2 score.
print("β0 (intercept): {}".format(lrModelMultivariate.intercept))
for i, (colName, coef) in enumerate(zip(featureCols, lrModelMultivariate.coefficients)):
    print("β{} (coefficient for {}): {}".format(i+1, colName, coef))
# β0 (intercept): -2.562251011927113
# β1 (coefficient for rm): 5.2169549243939874
# β2 (coefficient for crim): -0.1029408867181515
# β3 (coefficient for lstat): -0.578485819633508
print("\nR2 score: {}".format(lrModelMultivariate.summary.r2))
# R2 score: 0.6458520515781128
Our R2 score improved from 48% to about 65%, indicating that our new model can explain more of the variance in the data.
Exercise: Improve on our Model
Step 1: Prepare the Features for a New Model
Prepare a new column allFeatures for a new model that uses all of the features in bostonDF except for the label medv. Create the following variables:
- allFeatures: a list of all the column names
- assemblerAllFeatures: A VectorAssembler that uses allFeatures to create the output column allFeatures
- bostonFeaturizedAllFeaturesDF: The transformed bostonDF
allFeatures = bostonDF.schema.names[0:-1]
assemblerAllFeatures = VectorAssembler(inputCols=allFeatures, outputCol="allFeatures")
bostonFeaturizedAllFeaturesDF = assemblerAllFeatures.transform(bostonDF)
Step 2: Train the Model
Create a linear regression model lrAllFeatures. Save the trained model to lrModelAllFeatures.
lrAllFeatures = (LinearRegression()
.setLabelCol("medv")
.setFeaturesCol("allFeatures")
)
lrModelAllFeatures = lrAllFeatures.fit(bostonFeaturizedAllFeaturesDF)
Step 3: Interpret the Coefficients and Variance Explained
Take a look at the coefficients and variance explained. What do these mean?
print("β0 (intercept): {}".format(lrModelAllFeatures.intercept))
for i, (colName, coef) in enumerate(zip(allFeatures, lrModelAllFeatures.coefficients)):
    print("β{} (coefficient for {}): {}".format(i+1, colName, coef))
The coefficients for nox and rm have large values.
print(lrModelAllFeatures.summary.r2)
R2 is 0.74, meaning the model explains 74% of the variance.
Step 4: Interpret the Statistical Significance of the Coefficients
Print out the p-values associated with each coefficient and the intercept. Which were statistically significant?
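# Spark lists the coefficient p-values first; when fitIntercept is true, the intercept's p-value is the last element
print("β0 (intercept pval): {}".format(lrModelAllFeatures.summary.pValues[-1]))
for i, (col, pval) in enumerate(zip(allFeatures, lrModelAllFeatures.summary.pValues[:-1])):
    print("β{} (coefficient pval for {}): {}".format(i+1, col, pval))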
A low p-value means the coefficient is statistically significant, e.g. age, black.
Spark standardizes each feature by default so the user does not need to take this pre-processing step.
07: Classification
While linear regression models continuous variables, logistic regression classifies variables into two or more groups. This lesson trains binomial and multi-class logistic regression models and examines evaluation metrics for classification tasks.
Logistic regression gives probability (0-1).
False +: non-cancerous cells being classified as cancerous
False -: Cancerous cells being classified as non-cancerous
With a 0.2 threshold: more false positives and fewer false negatives.
Linear Regression: Y ≈ β0 + β1X
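Logistic Regression: log(p(X) / (1 - p(X))) ≈ β0 + β1X
Note that the left-hand side is the log odds of p(X), so the probability p(X) itself always stays between 0 and 1.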
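A minimal sketch of how the log odds turn into a probability, and how the threshold changes the predictions. The coefficients -5.16 and 0.93 are the rounded values recorded for the clump-thickness model in these notes; the helper names probability and classify are hypothetical.

```python
from math import exp

def probability(x, b0, b1):
    """Logistic function: converts the log odds b0 + b1*x into a probability in (0, 1)."""
    return 1 / (1 + exp(-(b0 + b1 * x)))

def classify(x, b0, b1, threshold=0.5):
    """Predict class 1 when the probability reaches the threshold."""
    return 1 if probability(x, b0, b1) >= threshold else 0

b0, b1 = -5.16, 0.93   # rounded coefficients from the clump-thickness model in these notes
xs = range(1, 11)      # clump thickness is scored 1 to 10

positives_default = sum(classify(x, b0, b1) for x in xs)              # threshold 0.5
positives_low = sum(classify(x, b0, b1, threshold=0.2) for x in xs)   # threshold 0.2
```

Lowering the threshold from 0.5 to 0.2 flags one more thickness value as positive, which is the "more false positives, fewer false negatives" trade-off in miniature.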
Read in cancer data:
from pyspark.sql.functions import col
cols = ["index",
"sample-code-number",
"clump-thickness",
"uniformity-of-cell-size",
"uniformity-of-cell-shape",
"marginal-adhesion",
"single-epithelial-cell-size",
"bare-nuclei",
"bland-chromatin",
"normal-nucleoli",
"mitoses",
"class"]
cancerDF = (spark.read
.option("HEADER", True)
.option("inferSchema", True)
.csv("/mnt/training/cancer/biopsy/biopsy.csv")
)
cancerDF = (cancerDF # Add column names and replace bare-nuclei with a 1/0 flag for not-null/null
.toDF(*cols)
.withColumn("bare-nuclei", col("bare-nuclei").isNotNull().cast("integer"))
)
display(cancerDF)
View the class balance by grouping the data on our label.
In the case of unbalanced classes, one solution is to add weights into the model allowing for a greater penalty for misclassifying the underrepresented class.
display(cancerDF.groupby("class").count())
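One way to derive such weights is to make them inversely proportional to class frequency. This is a sketch under assumptions: the counts are hypothetical, the n / (k * count) scheme is only one common choice, and balanced_class_weights is a made-up helper. In Spark the resulting values would be written to a column and passed to LogisticRegression through its weightCol parameter.

```python
def balanced_class_weights(counts):
    """Weight each class by n / (k * count), so rarer classes get larger weights."""
    n = sum(counts.values())   # total number of records
    k = len(counts)            # number of classes
    return {label: n / (k * count) for label, count in counts.items()}

counts = {"benign": 400, "malignant": 200}   # hypothetical class counts
weights = balanced_class_weights(counts)
```

With these counts the minority class gets twice the weight of the majority class.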
Turn the label into a numerical value and create a features column for just clump-thickness.
from pyspark.ml.feature import StringIndexer, VectorAssembler
indexer = StringIndexer(inputCol="class", outputCol="is-malignant")
cancerIndexedDF = indexer.fit(cancerDF).transform(cancerDF)
assembler = VectorAssembler(inputCols=["clump-thickness"], outputCol="features")
cancerIndexedAssembledDF = assembler.transform(cancerIndexedDF)
display(cancerIndexedAssembledDF)
Train the logistic regression model using only the input feature clump-thickness and look at the resulting coefficient.
from pyspark.ml.classification import LogisticRegression
logr = LogisticRegression(featuresCol="features", labelCol="is-malignant")
logrModel = logr.fit(cancerIndexedAssembledDF)
print("β0 (intercept): {}".format(logrModel.intercept)) # -5.16
print("β1 (coefficient for clump-thickness): {}".format(logrModel.coefficients[0])) # 0.93
Model Interpretation
In linear regression, increasing a given feature X by one unit changes our prediction by the corresponding coefficient. In logistic regression, increasing a given feature X by one unit changes the log odds by the corresponding coefficient, which multiplies the odds by exp(coefficient).
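A small sketch of that interpretation: raising X by one unit multiplies the odds by exp(β1), whatever the starting value of X. It uses the rounded coefficients from these notes (-5.2 and 0.94); the function name odds is hypothetical.

```python
from math import exp, isclose

def odds(x, b0, b1):
    """Odds implied by a logistic model: the exponential of the log odds b0 + b1*x."""
    return exp(b0 + b1 * x)

b0, b1 = -5.2, 0.94   # rounded coefficients used in these notes

ratio_5_to_6 = odds(6, b0, b1) / odds(5, b0, b1)   # effect of one extra unit of X
ratio_8_to_9 = odds(9, b0, b1) / odds(8, b0, b1)   # same effect at a different X
```

Both ratios equal exp(0.94), roughly 2.56, so each extra unit of clump thickness multiplies the odds of malignancy by about 2.56.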
The odds that a clump thickness of 9 is cancerous are
exp(β0 + β1X)
= exp(-5.2 + 0.94 × 9)
= exp(3.26)
≈ 26
That is 26 to 1, or a probability of 26 / 27 ≈ 0.96.
The probability column gives us the probability of being in class 0 and of being in class 1.
from pyspark.sql.functions import desc
transformedDF = logrModel.transform(cancerIndexedAssembledDF)
display(transformedDF
.select("clump-thickness", "class", "is-malignant", "probability")
.distinct()
.orderBy(desc("clump-thickness"))
)
Calculate this value manually using the intercept and coefficient.
from math import exp
odds = exp(-5.2 + .94 * 9) # Exponentiate the log odds to get the odds
probability = odds / (1 + odds) # Convert odds to probability
print("Prediction for odds of cancer: {} to 1".format( round(odds, 1) ))
print("Prediction for probability of cancer: {}".format(probability))
Take a look at how the model predicted the probability.
(transformedDF
.filter(col("clump-thickness") == 9)
.select("probability")
.first()[0]
)
Model Performance
A confusion matrix is a table that reports the number of false positives, false negatives, true positives, and true negatives. This is the basis of a number of different evaluation metrics such as accuracy, precision, and area under the ROC curve.
Accuracy is calculated by adding the true positives and true negatives and dividing by the total number of records.
The best evaluation metric for a given use case depends on how we want to optimize between true and false predictions.
Print out the confusion matrix.
n = transformedDF.count()
TP = transformedDF.filter((col("is-malignant") == 1) & (col("prediction") == 1)).count()
FP = transformedDF.filter((col("is-malignant") == 0) & (col("prediction") == 1)).count()
FN = transformedDF.filter((col("is-malignant") == 1) & (col("prediction") == 0)).count()
TN = transformedDF.filter((col("is-malignant") == 0) & (col("prediction") == 0)).count()
print("n = {}\t\t\tPredicted Positive \tPredicted Negative".format(n))
print("Actually Positive\t{} \t\t\t{}".format(TP, FN))
print("Actually Negative\t{} \t\t\t{}".format(FP, TN))
One way of visualizing the trade-off between true positives and false positives is by plotting them on a ROC Curve.
import matplotlib.pyplot as plt
import seaborn as sns
falsePositives, truePositives = [], []
for row in logrModel.summary.roc.collect():
    falsePositives.append(row.FPR)
    truePositives.append(row.TPR)
fig = plt.figure()
plt.plot(falsePositives, truePositives)
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
display(fig)
The associated evaluation metric takes the total area under this curve, which is bounded between 0 and 1.
logrModel.summary.areaUnderROC
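For intuition, the area under the ROC curve can be approximated from (FPR, TPR) points with the trapezoidal rule. This is a sketch with hypothetical points, not necessarily how Spark computes areaUnderROC internally.

```python
def area_under_curve(fpr, tpr):
    """Trapezoidal area under a ROC curve, given points sorted by increasing FPR."""
    area = 0.0
    for i in range(1, len(fpr)):
        width = fpr[i] - fpr[i - 1]
        avg_height = (tpr[i] + tpr[i - 1]) / 2
        area += width * avg_height
    return area

# Hypothetical ROC points
random_guess = area_under_curve([0.0, 1.0], [0.0, 1.0])        # diagonal line
perfect = area_under_curve([0.0, 0.0, 1.0], [0.0, 1.0, 1.0])   # hugs the top-left corner
example = area_under_curve([0.0, 0.1, 0.4, 1.0], [0.0, 0.6, 0.9, 1.0])
```

A random classifier sits at 0.5 and a perfect one at 1.0; the example curve falls in between.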
Retrain the model using all of the relevant features, excluding the index, the sample code number, and the label.
assembler = VectorAssembler(inputCols=cols[2:-1], outputCol="features")
cancerIndexedAssembledDF2 = assembler.transform(cancerIndexedDF)
logr = LogisticRegression(labelCol="is-malignant", featuresCol="features")
logrModel2 = logr.fit(cancerIndexedAssembledDF2)
logrModel2.coefficients
Take a look at how the area under the ROC curve has improved.
logrModel2.summary.areaUnderROC
Multi-Class Classification
Classification between multiple classes using logistic regression is known as multi-class or multinomial logistic regression. To predict between different classes, set the family argument for LogisticRegression to multinomial. This model then provides probabilities associated with each class.
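A sketch of how per-class raw scores become per-class probabilities: multinomial logistic regression conventionally applies the softmax function to one raw score per class. The scores below are hypothetical.

```python
from math import exp

def softmax(raw_scores):
    """Turn one raw score per class into probabilities that sum to 1."""
    m = max(raw_scores)                        # subtract the max for numerical stability
    exps = [exp(s - m) for s in raw_scores]
    total = sum(exps)
    return [e / total for e in exps]

raw = [2.0, 1.0, -1.0]                         # hypothetical raw scores for three classes
probs = softmax(raw)
predicted_class = probs.index(max(probs))      # class with the highest probability
```

The predicted class is simply the one with the largest probability.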
Import the iris data and clean the column names
irisDF = (spark.read
.option("INFERSCHEMA", True)
.option("HEADER", True)
.csv("/mnt/training/iris/iris.csv")
.drop("_c0")
)
cols = [col.replace(".", "-").lower() for col in irisDF.columns]
irisDF = irisDF.toDF(*cols)
display(irisDF)
Create a pipeline of three stages:
- indexer: a StringIndexer that takes species as an input and outputs speciesClass
- assembler: a VectorAssembler that takes all the features in the dataset and outputs features
- multinomialRegression: a LogisticRegression that takes features and uses the multinomial family
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
indexer = StringIndexer(inputCol="species", outputCol="speciesClass")
allFeatures = irisDF.columns[:-1]
assembler = VectorAssembler(inputCols=allFeatures, outputCol="features")
multinomialRegression = LogisticRegression(featuresCol='features',
labelCol='speciesClass',
family="multinomial")
pipeline = Pipeline(stages=[
indexer,
assembler,
multinomialRegression
])
Step 3: Train the Model and Transform the Dataset
Train the model and save the results to multinomialModel and save the transformed dataset to irisTransformedDF.
multinomialModel = pipeline.fit(irisDF)
irisTransformedDF = multinomialModel.transform(irisDF)
Notice how we have probabilities and raw log odds predictions for all three classes.
display(irisTransformedDF)
Question: What are the various ways that I can evaluate model performance in binomial classification?
Answer: Evaluating model performance in binomial classification is more complex than in regression, where most metrics are based on the distance between the true and predicted values. Classification metrics instead make different trade-offs between true positives (TP), false positives (FP), true negatives (TN), and false negatives (FN). Some common metrics include:
- Sensitivity = TP / (TP + FN)
- Specificity = TN / (TN + FP)
- Precision = TP / (TP + FP)
- Accuracy = (TP + TN) / (TP + TN + FP + FN)
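The four formulas above can be computed directly from confusion-matrix counts. A minimal sketch with hypothetical counts (classification_metrics is a made-up helper):

```python
def classification_metrics(tp, fp, tn, fn):
    """Compute common binary classification metrics from confusion-matrix counts."""
    return {
        "sensitivity": tp / (tp + fn),               # share of actual positives found
        "specificity": tn / (tn + fp),               # share of actual negatives found
        "precision": tp / (tp + fp),                 # share of positive predictions that are right
        "accuracy": (tp + tn) / (tp + tn + fp + fn)  # share of all predictions that are right
    }

# Hypothetical counts for illustration only
metrics = classification_metrics(tp=80, fp=10, tn=100, fn=20)
```

The same counts give four different scores, which is why the right metric depends on which kind of error is costlier.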
08: Model Selection
Model selection entails training different models and different settings of their parameters (hyperparameters). A hyperparameter is a parameter used in a machine learning algorithm that is set before the learning process begins. In other words, a machine learning algorithm cannot learn hyperparameters from the data itself.
k-fold cross-validation (here k = 3): split the data into four 25% parts, then
- Train, Train, Validate, Not Used
- Train, Validate, Train, Not Used
- Validate, Train, Train, Not Used
- Train, Train, Train, Final Evaluation
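The schedule above can be sketched in plain Python: hold out 25% for the final evaluation, then rotate the validation fold across the remaining three parts. The helper k_fold_indices is hypothetical and assigns rows by position; a real cross-validator would normally assign rows to folds at random.

```python
def k_fold_indices(n_rows, k):
    """Yield (train, validate) index lists so that each fold is used for validation once."""
    folds = [list(range(i, n_rows, k)) for i in range(k)]   # simple positional assignment
    for i in range(k):
        validate = folds[i]
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train, validate

n_total = 12                  # hypothetical dataset size
n_test = n_total // 4         # 25% held out for the final evaluation
cv_rows = n_total - n_test    # rows available for 3-fold cross-validation
splits = list(k_fold_indices(cv_rows, 3))
```

Each of the three splits trains on two folds and validates on the third; the held-out rows are never touched until the end.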
Hyperparameter Tuning
You can explore these hyperparameters by using the .explainParams() method on a model.
Grid search is the process of exhaustively trying every combination of hyperparameters. It takes all of the values we want to test and combines them in every possible way so that we test them using cross-validation. https://en.wikipedia.org/wiki/Hyperparameter_optimization
Perform a train/test split on the Boston dataset and build a pipeline for linear regression.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
bostonDF = (spark.read
.option("HEADER", True)
.option("inferSchema", True)
.csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
.drop("_c0")
)
trainDF, testDF = bostonDF.randomSplit([0.8, 0.2], seed=42)
assembler = VectorAssembler(inputCols=bostonDF.columns[:-1], outputCol="features")
lr = (LinearRegression()
.setLabelCol("medv")
.setFeaturesCol("features")
)
pipeline = Pipeline(stages = [assembler, lr])
Take a look at the model parameters using the .explainParams() method.
print(lr.explainParams())
ParamGridBuilder() allows us to string together all of the hyperparameter values we would like to test. In this case, we test the maximum number of iterations, whether to fit an intercept, and whether to standardize our features. This gives 3 × 2 × 2 = twelve combinations.
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = (ParamGridBuilder()
.addGrid(lr.maxIter, [1, 10, 100])
.addGrid(lr.fitIntercept, [True, False])
.addGrid(lr.standardization, [True, False])
.build()
)
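The count of twelve can be checked without Spark by enumerating the same grid with itertools.product. This is only a sketch of what grid search enumerates; the dictionary below mirrors the parameter names but is not a Spark object.

```python
from itertools import product

# Plain-Python mirror of the grid above (names only, not Spark Param objects)
grid = {
    "maxIter": [1, 10, 100],
    "fitIntercept": [True, False],
    "standardization": [True, False],
}

# Every combination of one value per hyperparameter
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]
```

Each entry in combinations corresponds to one model that cross-validation trains and scores, so adding values to any list multiplies the training cost.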
Cross-Validation
An exhaustive approach to cross-validation would include every possible split of the training set. https://scikit-learn.org/stable/modules/cross_validation.html
Create a RegressionEvaluator() to evaluate our grid search experiments and a CrossValidator() to build our models.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator
evaluator = RegressionEvaluator(
labelCol = "medv",
predictionCol = "prediction"
)
cv = CrossValidator(
estimator = pipeline, # Estimator (individual model or pipeline)
estimatorParamMaps = paramGrid, # Grid of parameters to try (grid search)
evaluator=evaluator, # Evaluator
numFolds = 3, # Set k to 3
seed = 42 # Seed to ensure our results are the same if run again
)
Fit the CrossValidator() to the training data.
cvModel = cv.fit(trainDF)
Take a look at the scores from the different experiments.
for params, score in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics):
    print("".join([param.name+"\t"+str(params[param])+"\t" for param in params]))
    print("\tScore: {}".format(score))
You can then access the best model using the .bestModel attribute.
bestModel = cvModel.bestModel
print(bestModel.stages[-1]._java_obj.getMaxIter())
print(bestModel.stages[-1]._java_obj.getFitIntercept())
print(bestModel.stages[-1]._java_obj.getStandardization())
Saving Models and Predictions
Spark can save both the trained model we created as well as the predictions. For online predictions such as on a stream of new data, saving the trained model and using it with Spark Streaming is a common application. It's also common to retrain an algorithm as a nightly batch process and save the results to a database or parquet table for later use.
Save the best model.
modelPath = userhome + "/cvPipelineModel"
dbutils.fs.rm(modelPath, recurse=True)
cvModel.bestModel.save(modelPath)
Take a look at where it saved.
dbutils.fs.ls(modelPath)
[FileInfo(path='dbfs:/user/EMAIL/cvPipelineModel/metadata/', name='metadata/', size=0), FileInfo(path='dbfs:/user/EMAIL/cvPipelineModel/stages/', name='stages/', size=0)]
Save predictions made on testDF.
predictionsPath = userhome + "/modelPredictions.parquet"
cvModel.bestModel.transform(testDF).write.mode("OVERWRITE").parquet(predictionsPath)
Exercise: Tuning a Model
Use grid search and cross-validation to tune the hyperparameters from a logistic regression model.
Step 1: Import the Data
Import the data and perform a train/test split.
from pyspark.sql.functions import col
cols = ["index",
"sample-code-number",
"clump-thickness",
"uniformity-of-cell-size",
"uniformity-of-cell-shape",
"marginal-adhesion",
"single-epithelial-cell-size",
"bare-nuclei",
"bland-chromatin",
"normal-nucleoli",
"mitoses",
"class"]
cancerDF = (spark.read # read the data
.option("HEADER", True)
.option("inferSchema", True)
.csv("/mnt/training/cancer/biopsy/biopsy.csv")
)
cancerDF = (cancerDF # Add column names and replace bare-nuclei with a 1/0 flag for not-null/null
.toDF(*cols)
.withColumn("bare-nuclei", col("bare-nuclei").isNotNull().cast("integer"))
)
display(cancerDF)
Perform a train/test split to create trainCancerDF and testCancerDF. Put 80% of the data in trainCancerDF.
seed = 42
trainCancerDF, testCancerDF = cancerDF.randomSplit([0.8, 0.2], seed=seed)
Step 2: Create a Pipeline
Create a pipeline cancerPipeline that consists of the following stages:
- indexer: a StringIndexer that takes class as an input and outputs the column is-malignant
- assembler: a VectorAssembler that takes all of the other columns as an input and outputs the column features
- logr: a LogisticRegression that takes features as the input and is-malignant as the output variable
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
indexer = StringIndexer(inputCol="class", outputCol="is-malignant")
assembler = VectorAssembler(inputCols=cancerDF.columns[2:-1], outputCol="features")
logr = LogisticRegression(featuresCol='features', labelCol='is-malignant')
cancerPipeline = Pipeline(stages=[indexer, assembler, logr])
Step 3: Create Grid Search Parameters
Take a look at the parameters for our LogisticRegression object. Use this to build the inputs to grid search.
print(logr.explainParams())
Create a ParamGridBuilder object with two grids:
- A regularization parameter regParam of [0., .2, .8, 1.]
- Test both with and without an intercept using fitIntercept
from pyspark.ml.tuning import ParamGridBuilder
cancerParamGrid = (ParamGridBuilder()
.addGrid(logr.regParam, [0., .2, .8, 1.])
.addGrid(logr.fitIntercept, [True, False])
.build()
)
Step 4: Perform 3-Fold Cross-Validation
Create a BinaryClassificationEvaluator object and use it to perform 3-fold cross-validation.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator
binaryEvaluator = BinaryClassificationEvaluator(
labelCol = "is-malignant",
metricName = "areaUnderROC"
)
cancerCV = CrossValidator(
estimator = cancerPipeline, # Estimator (individual model or pipeline)
estimatorParamMaps = cancerParamGrid, # Grid of parameters to try (grid search)
evaluator= binaryEvaluator, # Evaluator
numFolds = 3, # Set k to 3
seed = 42 # Seed to ensure our results are the same if run again
)
cancerCVModel = cancerCV.fit(trainCancerDF)
Step 5: Examine the results
Take a look at the results. Which combination of hyperparameters learned the most from the data?
for params, score in zip(cancerCVModel.getEstimatorParamMaps(), cancerCVModel.avgMetrics):
    print("".join([param.name+"\t"+str(params[param])+"\t" for param in params]))
    print("\tScore: {}".format(score))
bestModel = cancerCVModel.bestModel
print(bestModel.stages[-1]._java_obj.getRegParam())
print(bestModel.stages[-1]._java_obj.getFitIntercept())
Question: How can I further improve my predictions?
Answer: There are a number of different strategies including:
- Different models: train different models such as a random forest or gradient boosted trees
- Expert knowledge: combine the current pipeline with domain expertise about the data
- Better tuning: continue tuning with more hyperparameters to choose from
- Feature engineering: create new features for the model to train on
- Ensemble models: combining predictions from multiple models can produce better results than any one model
You can use http://mleap-docs.combust.ml/ for training pipelines and exporting them to an MLeap Bundle. It supports Spark, Scikit-learn and TensorFlow.
09: Capstone Project: An End-to-End ML Model
The goal of this project is to build an end-to-end machine learning model from data exploration and featurization through model training and hyperparameter tuning.
Use data from http://insideairbnb.com/get-the-data.html. It has a cool tool where you can explore the data http://insideairbnb.com/london/ http://insideairbnb.com/san-francisco/
This exercise elaborates on the Airbnb dataset used in the lesson on featurization. The goals are as follows:
- Perform exploratory analysis
- Featurize the dataset
- Train a linear regression model
- Tune the model hyperparameters using grid search and cross-validation
Exercise 1: Exploratory Analysis
Import the dataset and perform exploratory analysis including calculating summary statistics and plotting histograms, a scatterplot, and a heatmap of the city of San Francisco.
Import the data, sitting in /mnt/training/airbnb/sf-listings/sf-listings-clean.parquet
filePath = "/mnt/training/airbnb/sf-listings/sf-listings-clean.parquet"
initDF = spark.read.parquet(filePath)
Calculate summary statistics on the data to answer the following questions:
1. What does each of the variables mean?
host_total_listings_count: Number of listings for each host?
neighbourhood_cleansed: Neighbourhood in which the property is located
zipcode: 5-digit ZIP code
latitude: Latitude of property
longitude: Longitude of property
property_type: Type of property
room_type: Type of room e.g. Entire home, Shared room.
accommodates: How many occupants it accommodates?
bathrooms: Number of bathrooms in the property
bedrooms: Number of bedrooms in the property
beds: Number of beds available
bed_type: Type of bed e.g. airbed
minimum_nights: The shortest period of time you can stay?
number_of_reviews: Number of reviews for each property
review_scores_rating: 0-100 overall rating
review_scores_accuracy: 0-10 accuracy from listing to actual
review_scores_cleanliness: 0-10 cleanliness
review_scores_checkin: 0-10 experience of checkin
review_scores_communication: 0-10 experience of communication
review_scores_location: 0-10 location of property
review_scores_value: 0-10 value for money
price_raw: Price with $
price: Price as float
2. Are there missing values?
host_total_listings_count: N
neighbourhood_cleansed: Y
zipcode: Y
latitude: N
longitude: N
property_type: Y
room_type: Y
accommodates: N
bathrooms: N
bedrooms: N
beds: N
bed_type: N
minimum_nights: N
number_of_reviews: N
review_scores_rating: N
review_scores_accuracy: N
review_scores_cleanliness: N
review_scores_checkin: N
review_scores_communication: N
review_scores_location: N
review_scores_value: N
price_raw: N
price: Y
3. How many records are in the dataset?
4759
4. What are the mean and standard deviation of each variable?
host_total_listings_count: 6, 20
neighbourhood_cleansed: N/A
zipcode: N/A
latitude: 37.7, 0.02
longitude: -122, 0.02
property_type: N/A
room_type: N/A
accommodates: 3.4, 2.2
bathrooms: 1.3, 1.2
bedrooms: 1.4, 1.4
beds: 1.89, 1.59
bed_type: N/A
minimum_nights: 7, 13.3
number_of_reviews: 50, 68
review_scores_rating: 9.8, 0.4
review_scores_accuracy: 9.8, 0.4
review_scores_cleanliness: 9.7, 0.6
review_scores_checkin: 9.9, 0.3
review_scores_communication: 9.6, 0.6
review_scores_location: 9.6, 0.5
review_scores_value: 9.5, 0.6
price_raw: N/A
price: 222, 374
5. Which variables are continuous and which are categorical?
host_total_listings_count: con
neighbourhood_cleansed: cat
zipcode: cat
latitude: con
longitude: con
property_type: cat
room_type: cat
accommodates: con
bathrooms: con
bedrooms: con
beds: con
bed_type: cat
minimum_nights: con
number_of_reviews: con
review_scores_rating: con
review_scores_accuracy: con
review_scores_cleanliness: con
review_scores_checkin: con
review_scores_communication: con
review_scores_location: con
review_scores_value: con
price_raw: con
price: con
Drop the column price_raw and cache the results.
airbnbDF = initDF.drop("price_raw")
airbnbDF.cache()
Create a histogram of price
display(airbnbDF.select('price'))
# Then click on histogram
Is this a log-normal distribution? Yes, it looks Gaussian in log space.
from pyspark.sql.functions import log, col
display(airbnbDF.select('price').withColumn('log_price', log(col('price'))))
Create a scattermatrix (a matrix of scatterplots) to look at how the following variables correlate together:
price
host_total_listings_count
review_scores_rating
number_of_reviews
bathrooms
display(airbnbDF.select('price', 'host_total_listings_count', 'review_scores_rating', 'number_of_reviews', 'bathrooms')) # and click on scatter
# or
%python
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
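pandasDF = airbnbDF.select('price', 'host_total_listings_count', 'review_scores_rating', 'number_of_reviews', 'bathrooms').toPandas()
axes = pd.plotting.scatter_matrix(pandasDF) # Replaces the deprecated pd.scatter_matrix; returns a grid of axes
display(axes[0, 0].get_figure()) # Display the figure that scatter_matrix drew on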
Which neighborhoods have the highest number of rentals? Display the neighborhoods and their associated count in descending order.
from pyspark.sql.functions import desc
display(airbnbDF.groupBy('neighbourhood_cleansed').count().orderBy(desc('count'))) # Sort by count, highest first
Map the various listings by price. Since Databricks notebooks can display arbitrary HTML code, use the code provided.
%python
from pyspark.sql.functions import col
try:
    airbnbDF
except NameError: # Looks for local table if airbnbDF not defined
    airbnbDF = spark.table("airbnb")
v = ",\n".join(map(lambda row: "[{}, {}, {}]".format(row[0], row[1], row[2]), airbnbDF.select(col("latitude"),col("longitude"),col("price")/600).collect()))
displayHTML("""
<html>
<head>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.3.1/dist/leaflet.css"
integrity="sha512-Rksm5RenBEKSKFjgI3a41vrjkw4EVPlJ3+OiI65vTjIdo9brlAacEuKOiQ5OFh7cOI1bkDwLqdLw3Zg0cRJAAQ=="
crossorigin=""/>
<script src="https://unpkg.com/leaflet@1.3.1/dist/leaflet.js"
integrity="sha512-/Nsx9X4HebavoBvEBuyp3I7od5tA0UzAxs+j83KgC8PU0kgB4XiK4Lfe4y4cgBtaRJQEIFCW+oC506aPT2L1zw=="
crossorigin=""></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/leaflet.heat/0.2.0/leaflet-heat.js"></script>
</head>
<body>
<div id="mapid" style="width:700px; height:500px"></div>
<script>
var mymap = L.map('mapid').setView([37.7587,-122.4486], 12);
var tiles = L.tileLayer('http://{s}.tile.osm.org/{z}/{x}/{y}.png', {
attribution: '© <a href="http://osm.org/copyright">OpenStreetMap</a> contributors',
}).addTo(mymap);
var heat = L.heatLayer([""" + v + """], {radius: 25}).addTo(mymap);
</script>
</body>
</html>
""")
Exercise 2: Featurization
Featurize the dataset by building a pipeline that accomplishes the following:
- Indexes all categorical variables
- One-hot encodes the indexed values
- Combines all features into a features column
Finally, perform a train/test split.
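A plain-Python sketch of what the first two steps do to a categorical column, assuming the Spark defaults as I understand them: StringIndexer gives index 0 to the most frequent category, and the one-hot encoder drops the last category so that it becomes the all-zeros vector. The helpers and sample values are hypothetical, and the alphabetical tie-break is my own choice for the sketch.

```python
from collections import Counter

def fit_string_index(values):
    """Map each category to an index, most frequent first (ties broken alphabetically here)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: i for i, value in enumerate(ordered)}

def one_hot(index, n_categories, drop_last=True):
    """One-hot encode an index; with drop_last the final category becomes all zeros."""
    size = n_categories - 1 if drop_last else n_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

# Hypothetical room_type values
room_types = ["Entire home", "Private room", "Entire home",
              "Shared room", "Entire home", "Private room"]

mapping = fit_string_index(room_types)
encoded = [one_hot(mapping[r], len(mapping)) for r in room_types]
```

Dropping the last category avoids a redundant column: if a row is not any of the first categories, it must be the last one.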
from pyspark.ml.feature import StringIndexer
iNeighbourhood = StringIndexer(inputCol="neighbourhood_cleansed", outputCol="cat_neighborhood", handleInvalid="skip")
iRoomType = StringIndexer(inputCol="room_type", outputCol="cat_room_type", handleInvalid="skip")
iZipCode = StringIndexer(inputCol="zipcode", outputCol="cat_zipcode", handleInvalid="skip")
iPropertyType = StringIndexer(inputCol="property_type", outputCol="cat_property_type", handleInvalid="skip")
iBedType= StringIndexer(inputCol="bed_type", outputCol="cat_bed_type", handleInvalid="skip")
from pyspark.ml.feature import OneHotEncoderEstimator
oneHotEnc = OneHotEncoderEstimator(inputCols=["cat_neighborhood", "cat_room_type", "cat_zipcode", "cat_property_type", "cat_bed_type"],
outputCols=["vec_neighborhood", "vec_room_type", "vec_zipcode", "vec_property_type", "vec_bed_type"])
seed = 42
(testDF, trainDF) = airbnbDF.randomSplit([0.2, 0.8], seed=seed)
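Use VectorAssembler to combine the features.
from pyspark.ml.feature import VectorAssembler
# Assumption: these notes never define featureCols for the Airbnb data; here it is the one-hot vectors plus every column that is neither categorical nor the label
catCols = ["neighbourhood_cleansed", "room_type", "zipcode", "property_type", "bed_type"]
featureCols = oneHotEnc.getOutputCols() + [c for c in airbnbDF.columns if c not in catCols + ["price"]]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")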
Put this all together in a pipeline.
from pyspark.ml import Pipeline
featurizationPipeline = Pipeline(stages=[
iNeighbourhood, iRoomType, iZipCode, iPropertyType, iBedType,
oneHotEnc,
assembler
])
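featurizationModel = featurizationPipeline.fit(trainDF) # Fit the indexers and encoder on the training data only
trainTransformedDF = featurizationModel.transform(trainDF)
testTransformedDF = featurizationModel.transform(testDF) # Reuse the fitted stages so test rows get the same encoding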
Use the average of price for your baseline model. Save that value to baselinePrediction.
from pyspark.sql.functions import avg
baselinePrediction = trainTransformedDF.select(avg("price")).first()[0]
Create a RegressionEvaluator object named evaluator with price as the label, prediction as the prediction column, and mse as the metric.
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="mse")
Create the DataFrame baselinePredictionDF that contains the transformed test data testTransformedDF with a new column prediction that has the same baseline prediction for all observations in the dataset.
from pyspark.sql.functions import lit
baselinePredictionDF = testTransformedDF.withColumn("prediction", lit(baselinePrediction))
Evaluate the results:
metricName = evaluator.getMetricName()
metricVal = evaluator.evaluate(baselinePredictionDF)
print("{}: {}".format(metricName, metricVal))
# 158205.77280874032
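The baseline logic can be sketched in plain Python: predict the training-set mean for every test row and measure the mean squared error. The prices below are hypothetical and much smaller than the Airbnb data; mean and mse are made-up helpers.

```python
def mean(values):
    """Arithmetic mean of a list of numbers."""
    return sum(values) / len(values)

def mse(actual, predicted):
    """Mean squared error between actual and predicted values."""
    return mean([(a - p) ** 2 for a, p in zip(actual, predicted)])

# Hypothetical prices for illustration only
train_prices = [100.0, 150.0, 200.0, 350.0]
test_prices = [120.0, 180.0, 300.0]

baseline = mean(train_prices)                                    # one constant prediction
baseline_mse = mse(test_prices, [baseline] * len(test_prices))   # error of that constant on the test set
```

Any trained model should beat this number; if it does not, the features are adding nothing over the average price.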
Train a Linear Regression Model
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
lr = LinearRegression(labelCol="price", featuresCol="features")
pipelineFull = Pipeline(stages=[featurizationPipeline, lr])
pipelineFullModel = pipelineFull.fit(trainDF)
Evaluate the performance of linear regression over the baseline model.
metricName = evaluator.getMetricName()
metricVal = evaluator.evaluate(pipelineFullModel.transform(testDF))
print("{}: {}".format(metricName, metricVal))
# 129976.29251042467
Exercise 4: Hyperparameter Tuning
Use 3-fold cross-validation to tune the hyperparameters of the linear regression model.
print(lr.explainParams())
Build a grid of parameters for cross-validation.
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = (ParamGridBuilder()
.addGrid(lr.elasticNetParam, [0.0, 1.0])
.addGrid(lr.maxIter, [1, 10, 100])
.addGrid(lr.fitIntercept, [True, False])
.build()
)
Build the cross-validator.
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(
estimator = pipelineFull,
estimatorParamMaps = paramGrid,
evaluator=evaluator,
numFolds = 3
)
Fit the cross-validator to the training data.
cvModel = cv.fit(trainDF)
Examine the results.
for params, score in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics):
print("".join([param.name+"\t"+str(params[param])+"\t" for param in params]))
print("\tScore: {}".format(score))
See the best fit
bestModel = cvModel.bestModel
print(bestModel.stages[-1]._java_obj.getElasticNetParam())
print(bestModel.stages[-1]._java_obj.getMaxIter())
print(bestModel.stages[-1]._java_obj.getFitIntercept())
Evaluate on training and test data
evaluator.evaluate(cvModel.bestModel.transform(trainDF))
# 103136.8496131469
evaluator.evaluate(cvModel.bestModel.transform(testDF))
# 129981.5354094153