In our previous post about Apache Hadoop and Apache Spark we covered the initial Docker Compose setup and ended with a basic Spark Hello World. This post explores the possibilities of the Spark Shell, how to connect to an HDFS cluster and how to do some basic analytics with Spark.
Exploring the Spark Shell
The Spark Shell is a powerful tool to explore the Spark API.
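If you followed the previous post, you can start the shell against the cluster with spark-shell --master spark://localhost:7077 (the master URL is an assumption based on the Scala example further down). The shell then provides a preconfigured SparkSession named spark, which we will use throughout this post:
scala> :type spark
org.apache.spark.sql.SparkSession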
Let’s start by creating a simple Dataset from a plain Scala sequence (Seq):
scala> val dataset = Seq(("Hello", "Spark", "Shell")).toDS()
With show you get a pretty-printed version of the freshly created data set:
scala> dataset.show
+-----+-----+-----+
| _1| _2| _3|
+-----+-----+-----+
|Hello|Spark|Shell|
+-----+-----+-----+
It’s even possible to create a data container (a case class) on the fly from within the console:
scala> case class Person(name: String, age: Long)
defined class Person
scala> :type new Person("Bob", 34)
Person
Using such data containers will result in a nicely derived schema for your data set.
There is a convenient way to print a schema (printSchema) to check the internal representation of a data set via the console:
scala> val persons = Seq(Person("Bob", 34), Person("Alice", 33)).toDS()
scala> persons.printSchema
root
|-- name: string (nullable = true)
|-- age: long (nullable = false)
Let’s do some further interactive data processing:
scala> persons.select("name").show()
+-------+
| name|
+-------+
| Bob|
| Alice|
+-------+
scala> persons.where("age > 33").show
+----+---+
|name|age|
+----+---+
| Bob| 34|
+----+---+
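As a small sketch that goes beyond the examples above, the same filter can also be expressed with the typed Dataset API, working directly on the Person fields instead of column names:
scala> persons.filter(_.age > 33).map(_.name).show()  // same predicate, typed against the case class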
Persist HDFS data
In the previous setup the HDFS data was lost when restarting the Hadoop namenode/datanode containers.
To persist the data locally we modify the docker-compose.yaml to mount a directory relative to this yaml file.
Both Docker services hadoop-namenode and hadoop-datanode mount an individual volume.
hadoop-namenode:
  volumes:
    - "./hadoop-dfs-name:/hadoop/dfs/name"

hadoop-datanode:
  volumes:
    - "./hadoop-dfs-data:/hadoop/dfs/data"
Let’s check the setup with the Spark Shell and store our Person data set as tmp/persons.csv in our HDFS cluster.
scala> persons.write.format("com.databricks.spark.csv").option("header", "true").save("hdfs://hadoop-namenode/tmp/persons.csv")
There are additional options available, like .mode("overwrite"), to fine-tune the write operation.
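A quick sketch of how that could look, using the csv shortcut that Spark 2.x offers as an alternative to the com.databricks.spark.csv format:
scala> persons.write.mode("overwrite").option("header", "true").csv("hdfs://hadoop-namenode/tmp/persons.csv")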
Simply reading the csv file back, you get a DataFrame with a different schema (every column is read as string), and without the header option you would even end up with generic column names like _c0.
scala> val dataFrame = spark.sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("hdfs://hadoop-namenode/tmp/persons.csv")
scala> dataFrame.show
+-----+---+
| name|age|
+-----+---+
|Alice| 33|
| Bob| 34|
+-----+---+
scala> dataFrame.printSchema
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
Running the import with .option("inferSchema", "true") yields a slightly better result.
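A sketch of how that read could look in one go, again using the csv shortcut (equivalent to the com.databricks.spark.csv format in Spark 2.x):
scala> val dataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("hdfs://hadoop-namenode/tmp/persons.csv")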
scala> dataFrame.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
You’ll need to specify .option("header", "true") when writing and loading the csv.
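Otherwise the header row is treated as data and you end up with the generic _c0, _c1 columns mentioned above; a quick sketch to see that in the shell:
scala> spark.read.csv("hdfs://hadoop-namenode/tmp/persons.csv").printSchema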
Scala HdfsTest
In the previous chapter we tested the overall setup with the Spark shell. This time we’ll use Scala. If you are completely new to Scala you may want to have a look at Hello Scala World or the official Getting Started.
A good starting point to check the HDFS setup is the Spark example HdfsTest.scala. We run a slightly modified version to match our setup:
package sandbox

import org.apache.spark.sql.SparkSession

object HdfsExample {

  def main(args: Array[String]): Unit = {
    // connect to the standalone master from the Docker Compose setup
    val spark = SparkSession
      .builder.master("spark://localhost:7077")
      .appName("HdfsExample")
      .getOrCreate()

    // read the csv written in the Spark Shell section and print it
    spark.read.option("header", "true")
      .csv("hdfs://hadoop-namenode/tmp/persons.csv")
      .show()

    spark.stop()
  }
}
Within your build.sbt check the scalaVersion and the Spark version of the dependencies.
You’ll most likely run into trouble if you use a different Spark version on your cluster than in your development environment.
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.1",
  "org.apache.spark" %% "spark-sql" % "2.1.1"
)
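A slightly tidier variant keeps the Spark version in a single value (just a sketch; the sparkVersion name is our own convention, not required by sbt):
val sparkVersion = "2.1.1"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)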
When running the example from within your IDE you should see the same results as previously in the Spark shell:
+-----+---+
| name|age|
+-----+---+
|Alice| 33|
| Bob| 34|
+-----+---+
What’s next? Not decided yet. We’ll keep you posted. ;-)