Jul 23, 2020

How to use Spark Checkpoints on new Clusters

Resuming spark streaming application from stored checkpoint in s3 results in NoRouteToHostException

Often orgainisations use HDFS for storing the spark checkpoints and move them hose checkpoints periodically to S3. These checkpoint backups are done as a fail proof mechanism that will help Spark jobs to resume based on the most recent checkpoint. However these checkpoints are cluster specific and you cannot resume the spark jobs using them on a different Cluster. So what do you do if the cluster itself fails ? Now let's say you have the checkpoints stored in s3 and you can resume your application from there .Hmm , sounds straightforward but I would request you to give it a go and come back to this blog later .

If you are still here that it means your application failed and you got NoRouteToHostException. let's undertstand why they fail and why you get thejava.net.NoRouteToHostException: No route to host .

I first run this in my cluster-1 (master ip ip-172-31-45-105.ec2.internal)

spark-submit --master local[4] /usr/lib/spark/examples/src/main/python/streaming/recoverable_network_wordcount.py localhost 9999 /user/hadoop/checkpointtest2 checkpointout1.txt

The application was run in the local mode and the checkpointing was happening at the hdfs location '/user/hadoop/checkpointtest2' and the application was listening to the port 9999. I ran this application for few minutes by giving the input works through 'nc -lk 9999' command. The checkpoint files along with the write ahead log file (receivedBlockMetadata) was getting saved in the checkpointtest2 directory.

I then copied the contents of the checkpoint directory to my s3 bucket using the s3-dist-cp command. Then in the cluster-2 (master ip ip-172-31-32-104.ec2.internal) I again copied these checkpoint files along with the WAL logs to the hdfs location '/user/hadoop/sparkstreaming/'

I then ran the same job with the following arguments

spark-submit --master local[4] /usr/lib/spark/examples/src/main/python/streaming/recoverable_network_wordcount.py localhost 9999 /user/hadoop/sparkstreamtest sparkstreamtest.txt

After running the above application I got the following logs from the terminal,

"20/06/22 09:09:23 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 1 write ahead log files from hdfs://ip-172-31-45-105.ec2.internal:8020/user/hadoop/checkpointtest2/receivedBlockMetadata
20/06/22 09:09:23 INFO ReceivedBlockTracker: Recovering from write ahead logs in hdfs://ip-172-31-45-105.ec2.internal:8020/user/hadoop/checkpointtest2
20/06/22 09:09:23 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Reading from the logs:
hdfs:

Notice that the application is trying to connect to the previous cluster's hdfs data to replay the the WAL logs. Here the ip-172-31-45-105.ec2.internal is the namenode of the cluster-1.

Now let us try runnig the application with WAL disabled.

spark-submit --conf spark.streaming.receiver.writeAheadLog.enable=false --master local[4] /usr/lib/spark/examples/src/main/python/streaming/recoverable_network_wordcount.py localhost 9999 /user/hadoop/sparkstreamtest sparkstreamtest.txt

This time the checkpoint write was directly happening in the cluster-1 and even the output file sparkstreamtest.txt was changed to checkpointout1.txt (cluster-1 file) and this file was saving all the outputs.

"20/06/22 11:39:34 INFO ReceivedBlockTracker: Possibly processed batch 1592816309000 ms needs to be processed again in WAL recovery
20/06/22 11:39:34 INFO CheckpointWriter: Deleting hdfs://ip-172-31-45-105.ec2.internal:8020/user/hadoop/checkpointtest2/checkpoint-1592816263000
....
20/06/22 11:39:34 INFO CheckpointWriter: Checkpoint for time 1592816263000 ms saved to file 'hdfs://ip-172-31-45-105.ec2.internal:8020/user/hadoop/checkpointtest2/checkpoint-1592816263000', took 8327 bytes and 41 ms
...
Counts at time 2020-06-22 08:57:44 []
Dropped 0 word(s) totally
Appending to /home/hadoop/checkpointout1.txt
...
20/06/22 11:39:34 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Attempting to clear 0 old log files in hdfs://ip-172-31-45-105.ec2.internal:8020/user/hadoop/checkpointtest2/receivedBlockMetadata older than 1592816262000"

As you can see from the above logs the checkpoint files are retaining the previous cluster configuration and arguments and are continuing to communicate to the cluster-1 even though the spark-submit command was s on cluster-2. This is because when spark streaming savse checkpoint, it saves the checkpoint directory name inside the checkpoint files. This can be seen in this document [1], and also can be see in spark code CheckPoint.scala line 41 [2] where it shows checkpointDir is from streaming context.

This checkpointDir is given to streaming context in function checkpoint() in StreamingContext.scala line 236 [3]. As can be seen from the code, it creates hdfs directory using mkdir() then get the fullpath from the path and give it to checkpointDir

val path = new Path(directory)
val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
fs.mkdirs(path)
val fullPath = fs.getFileStatus(path).getPath().toString
sc.setCheckpointDir(fullPath)
checkpointDir = fullPath

This is the reason why the checkpointwriter was writing to cluster-1 checkpoint directory (even though we gave checkpoint directory as /user/hadoop/sparkstreamtest) and the output file was writing to checkpointout1.txt (even though we gave the output file as sparkstreamtest.txt in the cluster-2 spark-submit). From the above tests spark streams stores 2 types of checkpointing data,

1.Metadata checkpointing - Saving of the information defining the streaming computation to fault-tolerant storage like HDFS. This is used to recover from failure of the node running the driver of the streaming application.

2.Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches.

More details can be found from the link [4]. So when you have terminated your old cluster you get below exception as the old cluster was already terminated.

Therefore current process that save checkpoint from clusterA to s3 then restore it from s3 to clusterB will not work.

So now you have an underdstanding of the issue let us see what can be done to address this issue .

We can make both the cluster use the same namenode URL. This can be done by putting a common hostname in /etc/hosts file on all nodes.

E.g. My cluster A master node has url ip-172-31-2-48.ap-southeast-2.compute.internal and cluster B master node has ip-172-31-10-66.ap-southeast-2.compute.internal. So in /etc/hosts file on ALL nodes in cluster A, I edit the file as below,

172.31.2.48 emr-master

Then in /etc/hosts file on ALL nodes in cluster B, I set it as

172.31.10.66 emr-master

Also /etc/hadoop/conf/core-site.xml on all nodes on both cluster should be modified as well. cluster A: set fs.defaultFS to hdfs://emr-master:8020 cluster B: set fs.defaultFS to hdfs://emr-master:8020 his way both cluster A and cluster B can use same URL hdfs://emr-master/ to access their own hdfs, though the hostname resolves to different IP. And the hdfs namenode URL saved in checkpoint file will also be hdfs://emr-master:8020/, which can be recognized by both clusters.

Now you can do the operation that create checkpointin cluster A -> copy it to S3 -> copy checkpoint to cluster B -> start streaming on cluster B.

Other option to make it work in your environment is to save the checkpint to a shared file system e.g. AWS EFS [5]. EFS is a NFS-compatible shared file system. You can mount the EFS to all nodes in the EMR cluster, then specify checkpoint directory using file:///.

References

  1. Search The checkpoint directory as checkpointDir
  2. Checkpoint.scala
  3. StreamingContext
  4. Checkpointing
  5. EFS

«  India China StandoffHow to enable JVM metrics for Hbase  »