Apache Spark - Beyond Hello World

In our previous post about Apache Hadoop and Apache Spark, we covered the initial Docker Compose setup and ended with a basic Spark Hello World. This post explores the Spark Shell, shows how to connect to an HDFS cluster, and walks through some basic analytics with Spark.

Exploring the Spark Shell

The Spark Shell is a powerful tool to explore the Spark API.

Let’s start by creating a simple Dataset from a plain Scala Sequence (the Spark Shell already provides the spark.implicits._ import that enables toDS()):

scala> val dataset = Seq(("Hello", "Spark", "Shell")).toDS()

With show you get a pretty-printed version of the freshly created data set:

scala> dataset.show
+-----+-----+-----+
|   _1|   _2|   _3|
+-----+-----+-----+
|Hello|Spark|Shell|
+-----+-----+-----+

It’s even possible to create a data container (a case class) on the fly from within the console:

scala> case class Person(name: String, age: Long)
defined class Person

scala> :type new Person("Bob", 34)
Person

Using such data containers results in a nicely derived schema for your data set. With printSchema you can check the internal representation of a data set right from the console:

scala> val persons = Seq(Person("Bob", 34), Person("Alice", 33)).toDS()

scala> persons.printSchema
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = false)

Let’s do some further interactive data processing:

scala> persons.select("name").show()
+-------+
|   name|
+-------+
|    Bob|
|  Alice|
+-------+

scala> persons.where("age > 33").show
+----+---+
|name|age|
+----+---+
| Bob| 34|
+----+---+
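
Since persons is a typed Dataset of Person, you can also work with plain Scala functions via the typed API. A small sketch (value is the column name Spark assigns to a mapped Dataset of strings):

scala> persons.filter(_.age > 33).map(_.name).show()
+-----+
|value|
+-----+
|  Bob|
+-----+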

Persist HDFS data

In the previous setup the HDFS data was lost when restarting the Hadoop namenode/datanode containers.

To persist the data locally, we modify the docker-compose.yaml to mount directories relative to that file. The two Docker services hadoop-namenode and hadoop-datanode each mount their own volume.

hadoop-namenode:
  volumes:
    - "./hadoop-dfs-name:/hadoop/dfs/name"
hadoop-datanode:
  volumes:
    - "./hadoop-dfs-data:/hadoop/dfs/data"

Let’s check the setup with the Spark Shell and store our Person data set as tmp/persons.csv in our HDFS cluster.

scala> persons.write.format("com.databricks.spark.csv").option("header", "true").save("hdfs://hadoop-namenode/tmp/persons.csv")

There are additional options available, like .mode("overwrite"), to fine-tune the write operation.
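
For example, a write that can be re-run without failing on the existing path could look like this (a small sketch reusing the path from above):

scala> persons.write.format("com.databricks.spark.csv").option("header", "true").mode("overwrite").save("hdfs://hadoop-namenode/tmp/persons.csv")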

Reading the csv file back, you get a DataFrame with a different schema; without the header option you would even end up with generic column names like _c0 (more on that below).

scala> val dataFrame = spark.sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("hdfs://hadoop-namenode/tmp/persons.csv")
scala> dataFrame.show
+-----+---+
| name|age|
+-----+---+
|Alice| 33|
|  Bob| 34|
+-----+---+

scala> dataFrame.printSchema
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)

Running the import with .option("inferSchema", "true") yields a slightly better result.
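
The full read could look like this (same format, path and header option as above, with inferSchema as the only addition):

scala> val dataFrame = spark.sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("hdfs://hadoop-namenode/tmp/persons.csv")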

scala> dataFrame.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

Note that you need to specify .option("header", "true") both when writing and when reading the csv.
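
For comparison, a read without the header option (a quick sketch against the same file) falls back to generic column names, and the original header line shows up as a regular data row:

scala> val raw = spark.sqlContext.read.format("com.databricks.spark.csv").load("hdfs://hadoop-namenode/tmp/persons.csv")

scala> raw.printSchema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)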

Scala HdfsTest

In the previous chapter we tested the overall setup with the Spark shell. This time we’ll use Scala. If you are completely new to Scala you may want to have a look at Hello Scala World or the official Getting Started.

A good starting point for checking the HDFS setup is the Spark example HdfsTest.scala.

We run a slightly modified version to match our setup:

package sandbox

import org.apache.spark.sql.SparkSession

object HdfsExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder.master("spark://localhost:7077")
      .appName("HdfsExample")
      .getOrCreate()

    spark.read.option("header", "true")
      .csv("hdfs://hadoop-namenode/tmp/persons.csv")
      .show()

    spark.stop()
  }
}

Within your build.sbt, check the scalaVersion and sparkVersion. You’ll most likely run into trouble if the Spark version on your cluster differs from the one in your development environment.

scalaVersion := "2.11.8"

val sparkVersion = "2.1.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)

When running the example from within your IDE you should see the same results as previously in the Spark shell:

+-----+---+
| name|age|
+-----+---+
|Alice| 33|
|  Bob| 34|
+-----+---+

What’s next? Not decided yet. We’ll keep you posted. ;-)