Each record represents a crime event published by the Chicago Crime division: the first column is the case number, the second the time, the third the address, the fourth the crime type, and so on.
The main objective of this analysis is to categorize crimes by type for each zip code, giving an overview of the type and number of crimes per zip. This information can be used to identify areas of Chicago where the crime rate is low, which in turn helps in making rational decisions about where to move and where to buy property in the city.
Flume Ingestion of Data into HDFS
This step shows how data is loaded into HDFS by the Flume service. The following configuration is executed by the Flume agent:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data02/rawdata
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://12.cs1cloud.internal:8020/cleanseddata
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 2000
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.batchSize = 500000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
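With the agent defined, ingestion can be started from the command line. A minimal sketch, assuming the configuration above is saved as crime-agent.conf under the Flume conf directory (both names are hypothetical; the agent name a1 comes from the configuration above):

flume-ng agent \
  --conf /etc/flume/conf \
  --conf-file /etc/flume/conf/crime-agent.conf \
  --name a1 \
  -Dflume.root.logger=INFO,console

The agent then watches the spool directory /data02/rawdata and rolls events into files under /cleanseddata on HDFS, as configured above.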
The Spark analytics job is implemented in Java using the Spark Java API (which is backed by Scala). The business logic is as follows:
package com.idh.driver;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class CrimeAnalyticsDriver {
private static Pattern COMMA = Pattern.compile(",");
public static void main(String[] args) {
try
{
System.out.println("Initiating Crime Analytics Job.....");
// Configure Spark and read the cleansed crime records from HDFS (the Flume sink directory)
SparkConf conf = new SparkConf().setAppName("CrimeAnalytics");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> input = context.textFile("/cleanseddata");
// Key each record by "zip,crimeType" with an initial count of 1
JavaPairRDD<String, Integer> pairInputData = input.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String arg0)
throws Exception {
String splits[] = COMMA.split(arg0);
String zip = splits[10];
String crime = splits[5];
return new Tuple2<String, Integer>(zip + "," + crime, 1);
}
});
// Sum the counts for each (zip,crimeType) key
JavaPairRDD<String, Integer> reduceData = pairInputData.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer arg0, Integer arg1) throws Exception {
return arg0+arg1;
}
});
JavaRDD<String> resultData = reduceData.map(new Function<Tuple2<String,Integer>, String>() {
@Override
public String call(Tuple2<String, Integer> arg0)
throws Exception {
return arg0._1 + "," + arg0._2;
}
});
// Write the per-zip crime counts to HDFS for the Hive load step, then stop the context
resultData.saveAsTextFile("/crimeAnalytics_CrimeCount_By_Zip");
context.stop();
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
}
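Once packaged as a jar, the driver can be submitted to the cluster with spark-submit. A minimal sketch, assuming the class above is built into crime-analytics.jar (a hypothetical artifact name) and the cluster runs Spark on YARN:

spark-submit \
  --class com.idh.driver.CrimeAnalyticsDriver \
  --master yarn \
  crime-analytics.jar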
Loading data Into Hive Warehouse
The following script loads data into the Hive Warehouse
sudo -s <<EOF
hadoop fs -rm -r -skipTrash /crimeAnalytics_CrimeCount_By_Zip/_SUCCESS
hive -e "drop table if exists crimecountzip"
hive -e "create external table crimecountzip (zip String, type String, count String) row format delimited fields terminated by ','"
hive -e "load data inpath '/crimeAnalytics_CrimeCount_By_Zip/' into table crimecountzip"
EOF
cd /scripts
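With the table loaded, the per-zip crime breakdown can be queried straight from Hive. A minimal sketch that lists every crime type and its count for one zip code (60601 is just an illustrative value):

hive -e "select * from crimecountzip where zip = '60601'"

Queries like this feed the visualization charts described below.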
The following charts are provided for data visualization: