Baseline
The baseline for the data preprocessing speedup is a model that is not fully parallelized. The parallel data preprocessing step with Spark accounts for the largest share of our execution time, and our preprocessing baseline is based mostly on the script general_preprocess.py.
Baseline scaling
[Table: code profiling of the baseline script; columns: Line #, Hits, Time (ms), Per Hit (ms), % Time, Line Content]
Small dataset:
Initialize Spark Session, ~40%, O(1), fixed
Create Spark DataFrame, ~45%, O(1), fixed
orderBy('window') in Spark, ~9%, O(N log N)
Independent transformations in Spark, ~5%, O(N)
Larger dataset (~20x):
Initialize Spark Session, ~19%, O(1), fixed
Create Spark DataFrame, ~21%, O(1), fixed
orderBy('window') in Spark, ~14%, O(N log N)
Independent transformations in Spark, ~46%, O(N)
For a small data size, the fixed costs associated with Spark take about 85% of the total time. When the dataset (news + prices) grows to almost 20x, with the same model, these fixed costs drop to about 40%.
The number of windows depends on the total date range we use, so the longer period in the larger dataset (2009-2021) roughly triples the number of windows and raises the orderBy share to about 14%.
The remaining parallelizable part then accounts for about 46% of the total time.
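For reference, the sketch below outlines the four profiled steps. It is a minimal illustration rather than the actual general_preprocess.py; the input file name and the column names (window, open, close) are assumptions.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    import pandas as pd

    spark = SparkSession.builder.appName("preprocess").getOrCreate()   # fixed cost: initialize Spark session
    pdf = pd.read_csv("news_price.csv")                                # hypothetical merged news + price file
    df = spark.createDataFrame(pdf)                                    # fixed cost: create Spark DataFrame
    df = df.orderBy("window")                                          # O(N log N): sort rows by window
    df = df.withColumn("log_return",                                   # O(N): independent, row-wise transformation
                       F.log(F.col("close") / F.col("open")))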
Improved model
Initialize Spark Session, ~4%, O(1), fixed
Create Spark DataFrame, ~5%, O(1), fixed
orderBy('window') in Spark, ~89%, O(N log N)
Independent transformations in Spark, ~2%, O(N)
With further parallel improvements in our data preprocessing code, the orderBy step in Spark now takes almost 89% of the total time.
The fixed costs of Spark initialization and DataFrame creation take only 9%, and the other parallelizable parts take 2% of the total.
[Table: code profiling of the improved script; columns: Line #, Hits, Time (ms), Per Hit (ms), % Time, Line Content]
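The tables summarized above are line-by-line profiles. The report does not name the profiling tool; one way to collect a profile in this format is with line_profiler, as in the sketch below, where the preprocess function is only a stand-in for the profiled code.

    from line_profiler import LineProfiler

    def preprocess():
        # stand-in for the body of general_preprocess.py to be profiled
        total = 0
        for i in range(1000):
            total += i * i
        return total

    lp = LineProfiler()
    profiled = lp(preprocess)   # wrap the function so each line is timed
    profiled()
    lp.print_stats()            # prints Line #, Hits, Time, Per Hit, % Time, Line Contents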
Speedup
Speedup is defined as S(n, p) = T(n, 1) / T(n, p), where T(n, p) is the execution time for problem size n on p cores. By Amdahl's law, the achievable speedup is limited by the fraction of the code that can be parallelized.
Alongside this comparison, we also discuss the scaling behavior of our improved script.
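As a small illustration of these definitions, the helpers below compute the measured speedup from two timings and the Amdahl bound implied by a given parallel fraction. The 46% fraction echoes the profile above; the 43.6 s parallel timing is a hypothetical placeholder.

    def speedup(t_serial, t_parallel):
        """Measured speedup S(n, p) = T(n, 1) / T(n, p)."""
        return t_serial / t_parallel

    def amdahl_bound(parallel_fraction, p):
        """Upper bound on speedup when a fraction f of the work parallelizes over p cores."""
        f = parallel_fraction
        return 1.0 / ((1.0 - f) + f / p)

    print(speedup(109.0, 43.6))     # 109 s local reference run vs. a hypothetical 43.6 s parallel run
    print(amdahl_bound(0.46, 8))    # bound with 46% parallelizable code on 8 cores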
Multi-Core
AWS: Ubuntu 18.04, m4.4xlarge(16vcpus + 64GiB memory)
Libraries installed: pyspark == 3.1.1, python == 3.9.1
Java version: openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-8u292-b100ubuntu1~18.04-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)
CPU: Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
Dependencies: pyspark, datetime, time, vaderSentiment, numpy, pandas, sys
As shown in the figure on the right, our results fit the theoretical curve well at the beginning and reach at most about 2.5x acceleration.
When the number of cores exceeds 6, the speedup drops slightly below the theoretical result because of system communication and synchronization overhead.
However, this improved model shows weak scaling: the fixed-cost part has been reduced to such a low level that increasing the data size no longer has much effect on the total speedup.
For reference, the existing implementation running on a local machine takes 109 s.
The figure named "Improved speedup vs. baseline" shows that the improved model achieves noticeably more acceleration than even the best theoretical performance of the baseline.
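A sketch of how such a multi-core measurement can be driven is shown below. The heavy work is replaced by a small stand-in job; in our project the body would be the general_preprocess.py pipeline.

    import time
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    def timed_run(n_cores):
        """Run a stand-in preprocessing job on a local Spark master with n_cores cores; return elapsed seconds."""
        spark = (SparkSession.builder
                 .master(f"local[{n_cores}]")
                 .appName("core-scaling-test")
                 .getOrCreate())
        start = time.time()
        df = spark.range(5_000_000).withColumn("window", F.col("id") % 100)   # stand-in workload
        df.orderBy("window").count()
        elapsed = time.time() - start
        spark.stop()
        return elapsed

    t1 = timed_run(1)
    for p in range(2, 17):
        print(p, t1 / timed_run(p))    # measured speedup S(n, p) for p = 2..16 cores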
To model the theoretical performance of Spark, we refer to a conclusion from a paper by Prasad Perera:
"For p number of processes, T (n) = (n / k) log (n / k)
where k satisfies = 2k-1 < p < = 2k"
Then we have the theoretical speedup and we can compare it to our real performance on AWS as shown in the figure below.
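A sketch of that theoretical speedup calculation is below, assuming the sequential reference time is n log n (an assumption; the quoted formula only covers the parallel case) and a dataset size of 10^6 rows chosen purely for illustration.

    import math

    def perera_time(n, p):
        """Quoted model: T(n) = (n / k) * log(n / k), with k such that 2^(k - 1) < p <= 2^k."""
        if p == 1:
            return n * math.log(n)          # assumed sequential sort time
        k = math.ceil(math.log2(p))
        return (n / k) * math.log(n / k)

    def theoretical_speedup(n, p):
        return perera_time(n, 1) / perera_time(n, p)

    for p in range(1, 17):                   # compare against the measured curve for 1-16 cores
        print(p, round(theoretical_speedup(10**6, p), 2))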
Multi-Instance
AWS: emr-6.3.0: Spark 3.1.1 on Hadoop 3.2.1 with 1-8 core instances (m4.xlarge)
python == 3.9.1
CPU: Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
Latency: rtt min/avg/max/mdev = 0.311/0.339/0.363/0.021 ms
Bandwidth: 1.22 GBytes transferred over 0.0-10.0 sec, i.e. 1.05 Gbits/sec
Dependencies: pyspark, datetime, time, vaderSentiment, numpy, pandas, sys
We also tested the data preprocessing script on EMR instances.
The execution time decreases somewhat as more instances are provided, and the speedup grows slowly with them.
However, the improvement is not as good as what we obtained with additional cores.
The likely reason is the particular computation we do in Spark: we use a window function to generate our multi-day dataset, and this step is hard to split across different instances.
The execution time and speedup we measured on EMR therefore show only slow improvement.
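A minimal sketch of such a window-based step is shown below, assuming hypothetical daily columns (date, sentiment, close). Because the window is ordered globally and not partitioned, Spark pulls all rows into a single partition for this step, which is consistent with the weak multi-instance scaling we observed.

    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("window-demo").getOrCreate()

    df = spark.createDataFrame(
        [("2016-01-04", 0.12, 100.0),
         ("2016-01-05", -0.30, 99.1),
         ("2016-01-06", 0.05, 101.3)],
        ["date", "sentiment", "close"],
    )

    # 5-day trailing window: each row aggregates itself and the previous 4 days.
    w = Window.orderBy("date").rowsBetween(-4, 0)
    multi_day = df.withColumn("sentiment_5d", F.collect_list("sentiment").over(w))
    multi_day.show()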
LSTM Training Speedup with GPU and cuDNN Toolkit
Harvard Cluster, seas_dgx1 partition (16 GB memory, 4 CPUs, 1 GPU)
CPU: Intel(R) Xeon(R) CPU E5-2698 v4 @ 2.20GHz
GPU: Tesla V100-SXM2-16GB
Dependencies: pyspark, datetime, time, vaderSentiment, numpy, pandas, sys
Specs of the Harvard Cannon virtual machines where we trained our LSTM models in parallel
log1: COG_2009_1, without GPU, 3s45ms
log2: COG_2009_1, with GPU, 1s12ms
log3: COG_2009_5, without GPU, 2s34ms
log4: COG_2009_5, with GPU, 1s10ms
log5: COG_2016_1, without GPU, 1s27ms
log6: COG_2016_1, with GPU, 0s12ms
log7: COG_2016_5, without GPU, 1s43ms
log8: COG_2016_5, with GPU, 0s13ms
We used the GPU and the CUDA Deep Neural Network library (cuDNN) to accelerate LSTM model training; the comparison is shown above.
On the Harvard cluster, with a Xeon CPU and a V100 GPU, we observed a significant speedup of about 11x on the smaller datasets and about 3x on the larger ones.
Training the models on the Harvard cluster with GPU and CUDA therefore saved us a lot of time. We also adopted the SLURM job manager for embarrassingly parallel work such as training multiple models simultaneously.
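The report does not include the model code, but as a hedged sketch, a Keras LSTM like the one below picks up the cuDNN kernel automatically when it runs on a GPU with the layer's default arguments. The framework choice, input shape (5-day window, 8 features), and layer sizes are illustrative assumptions.

    import tensorflow as tf

    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(5, 8)),   # assumed: 5-day window, 8 features per day
        tf.keras.layers.LSTM(64),              # default arguments -> cuDNN implementation on GPU
        tf.keras.layers.Dense(1),              # next-day target prediction
    ])
    model.compile(optimizer="adam", loss="mse")
    # model.fit(x_train, y_train, epochs=50)   # x_train, y_train: hypothetical training arrays

Each model variant (e.g. COG_2009_1, COG_2016_5) can then be submitted as its own SLURM job so the trainings run simultaneously.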
Overheads
We identified overheads in communication, load balancing, setup, and model training, and optimized them to an acceptable level.
Communication & Load Balancing & Setup
A main source of overhead is Spark itself. Communication between the master node and the worker nodes, load balancing of task assignments, and Spark setup take a long time when the data size is small.
We tried different numbers of nodes and cores and found that the best setup for the data preprocessing speedup is 1 node with 6 cores.
We also increased the data size by collecting more news data and more stock prices (20x our initial dataset), which greatly reduced the fraction of total execution time that Spark itself takes.
LSTM Training
One of the main sequential parts of our project is the training of the LSTM. With our large dataset, we reduced the LSTM runtime by using the CUDA Deep Neural Network library with the GPU and by parallelizing the training of multiple models on Harvard Cannon with the SLURM job manager.