Tame the yellow elephant with Spark

Sparky Spark

Big Data and Hadoop have been the big buzzwords of the past few years. The whole Big Data boom started with a few papers published by Google describing a programming model called MapReduce. Since then, a whole ecosystem of tools has become available for data analysts and architects to handle the huge amounts of data generated every minute. One of the most popular of these tools is Apache Spark. Spark is a cluster computing framework, and in this post we are going to scratch the surface of big data analytics by trying out some simple Spark programs.

Spark basics

Spark is a very flexible framework: it can run on a cluster it manages itself (standalone mode), on top of YARN, or on a single computer. In our examples we are going to use the single-machine setup for convenience.

One of the main concepts of Spark is the RDD, which stands for Resilient Distributed Dataset. An RDD can be thought of as a regular Java collection, except that its elements are stored in memory distributed across the computers making up the cluster. In Spark programs, data is stored in RDDs; we apply transformations to them that produce new RDDs, and so on.

Spark is mainly written in Scala and uses a functional programming model in which we apply functions to RDDs, as mentioned above. There are two main types of these functions: transformations and actions. Every Spark transformation produces a new RDD and is lazily evaluated, which means that no transformation runs until an action is called. This lets Spark build a graph from the chain of functions and optimize it for the best performance. Transformations include map, mapToPair, filter, join and many other functions. Actions perform some sort of aggregation on the dataset, like reduce, collect or count. Actions obviously don't produce new RDDs; they return some kind of value instead.
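
To make the laziness concrete, here is a minimal, made-up sketch (the class name LazyEvaluationDemo and the file input.txt are just placeholders, not part of the examples below): the filter transformation only builds up the execution graph, and the file is not even read until the count() action is called.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyEvaluationDemo {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("LazyEvaluationDemo").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        // transformations are lazy: these two lines only describe the computation
        JavaRDD<String> lines = ctx.textFile("input.txt");
        JavaRDD<String> longLines = lines.filter(l -> l.length() > 80);

        // count() is an action, so only here does Spark read the file and evaluate the filter
        long numLongLines = longLines.count();
        System.out.println("Number of long lines: " + numLongLines);

        ctx.stop();
    }
}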

Advanced material

Spark ships with several libraries that offer extra functionality for particular use cases, including machine learning, graph processing, SQL and streaming. Streaming is especially interesting because it makes low-latency computation possible in a Hadoop environment where previously only big, bulky map-reduce jobs could run for hours. Spark Streaming processes the data in micro-batches of configurable size. I will cover this very interesting area in a separate post, but first we have to get familiar with simple Spark applications through some examples.

Examples

And now the fun part begins. For the following examples I have used the MovieLens dataset, which can be downloaded free of charge. The dataset has three important parts: a movies, a users and a ratings data file. For now we are going to use the ratings file, which is in a CSV-like format with the following structure: UserID::MovieID::Rating::Timestamp
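
To make the format concrete, a record like 1::2804::5::978300719 splits into its four fields with a plain String.split call (just a quick plain-Java illustration, not yet a Spark program; the variable names are arbitrary):

String line = "1::2804::5::978300719"; // UserID::MovieID::Rating::Timestamp
String[] fields = line.split("::");
String userId = fields[0];    // "1"
String movieId = fields[1];   // "2804"
String rating = fields[2];    // "5"
String timestamp = fields[3]; // "978300719"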

Our first Spark example

The first thing we do is very simple: we are going to compute the distribution of the ratings, i.e. how many were 5 stars, how many were 4, and so on. I'm going to provide the source in Java (version 8, because of the lambdas). So take a look:

import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
import scala.Tuple2;

public class SparkRatingsCount {

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: SparkRatingsCount <input-file> <output-folder>");
            System.exit(1);
        }

        final String outputPath = args[1];
        SparkConf sparkConf = new SparkConf().setAppName("SparkRatingsCount").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        // line example: 1::2804::5::978300719
        JavaRDD<String> lines = ctx.textFile(args[0], 1);

        // map each line to a (rating, 1) pair, e.g. "1::2804::5::978300719" -> ("5", 1)
        JavaPairRDD<String, Integer> ratingOnePairs = lines.mapToPair(s -> new Tuple2<>(s.split("::")[2], 1));

        // sum up the 1s per rating value to get the number of ratings for each star level
        JavaPairRDD<String, Integer> results = ratingOnePairs.reduceByKey((i1, i2) -> i1 + i2);

        // write the (rating, count) pairs to the output folder
        results.saveAsTextFile(outputPath);

        ctx.stop();
    }
}

Okay, so we have some boring imports and a class definition. The structure is a basic Java program with a main method. In Spark this is called the driver program: it initializes the environment and manages the flow of data and processing.
After checking that all the necessary arguments are there, we can set up the Spark environment. We give the application a name and set the Spark master. This latter setting is very important: by setting it to local you can run Spark programs locally on your own PC, which is great for testing. If you set it to yarn-cluster, the application can run on a YARN cluster.
With the ctx.textFile() call we read the input file into an RDD. In the next step we apply a mapToPair operation to this dataset, mapping each element to a key-value pair of the rating and the number 1. To count each rating we use the reduceByKey method, which in this case sums up all the 1s for each key. The only thing left is to save the results to another file, which is done by the saveAsTextFile() method.
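
If you just want to eyeball the counts while testing, one option is to collect them to the driver instead of (or in addition to) writing a file. This is only a sketch, safe here solely because there are just a handful of distinct rating values; it could be dropped in right before the ctx.stop() call above.

        // collect() pulls the whole RDD back to the driver, so use it only for small results
        for (Tuple2<String, Integer> pair : results.collect()) {
            System.out.println("rating " + pair._1 + ": " + pair._2 + " votes");
        }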

A tougher exercise

Using the same dataset we can calculate the average rating given by each user.

import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
import scala.Tuple2;

public class SparkUserAvgRatings {

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: SparkDistinctUsers <input-file> <output-folder>");
            System.exit(1);
        }

        final String outputPath = args[1];
        SparkConf sparkConf = new SparkConf().setAppName("SparkDistinctUsers").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        // line example: 1::2804::5::978300719
        JavaRDD<String> lines = ctx.textFile(args[0], 1);

        // userid, rating 
        JavaPairRDD<Integer, Integer> ratings = lines.mapToPair(s -> {
            String[] fields = s.split("::");
            return new Tuple2<>(Integer.parseInt(fields[0]), Integer.parseInt(fields[2]));
        });

        // userid, (rating, 1)
        JavaPairRDD<Integer, Tuple2<Integer, Integer>> mappedRatings = ratings.mapValues(x -> new Tuple2<>(x, 1));

        // userid, (ratingsum, ratingcount)
        JavaPairRDD<Integer, Tuple2<Integer, Integer>> reduced = mappedRatings.reduceByKey((x, y) -> new Tuple2<>(x._1 + y._1, x._2 + y._2));

        // userid, average rating
        reduced.mapToPair(x -> new Tuple2<>(x._1, (double)(x._2._1) / x._2._2)).saveAsTextFile(outputPath);

        ctx.stop();
    }
}

This program starts similarly to the previous one. After reading the data from the file, we map each row to a key-value pair where the key is the user ID and the value is the rating he or she has given to a movie. In the next step we map the values further, so that each value becomes a (rating, 1) pair. Now we can reduce our RDD by key, which means we sum up all the rating values and all the 1s for each user ID. Computing the average rating from here is easy: we map our RDD to a simple key-value pair, keeping the user ID as the key and using the summed-up rating values divided by the count of the ratings (in short, the average) as the value.
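
The same per-user average could also be computed a bit more compactly with aggregateByKey, which builds the (sum, count) pair for each key in one step. This is just an alternative sketch, assuming the ratings pair RDD from the program above (aggregateByKey has been part of the Java API since Spark 1.1):

        // fold each user's ratings into a (sum, count) accumulator, then merge the per-partition accumulators
        Tuple2<Integer, Integer> zero = new Tuple2<>(0, 0);
        JavaPairRDD<Integer, Tuple2<Integer, Integer>> sumAndCount = ratings.aggregateByKey(
                zero,
                (acc, rating) -> new Tuple2<>(acc._1 + rating, acc._2 + 1),
                (a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2));

        // divide the sum by the count to get each user's average rating
        JavaPairRDD<Integer, Double> averages = sumAndCount.mapValues(t -> (double) t._1 / t._2);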

It may take some time to get used to the ways of functional programming, but Spark is such a powerful tool that it would be a waste not to use it when dealing with huge amounts of data, whether in a cluster environment or on a single PC.