Tuesday, 31 May 2016

Apache Spark Streaming: How to Do a Graceful Shutdown




In my current project, I am using Spark Streaming as the processing engine, Kafka as the data source and Mesos as the cluster/resource manager.
To be precise, I am using the Direct Kafka approach in Spark for data ingestion.
Once a streaming application is up and running, there are multiple things to do to make it stable, consistent and seamless.
One of them is ensuring a graceful shutdown to avoid data loss. When restarting the streaming application, deploying changes, etc., we have to ensure that the shutdown happens gracefully and leaves the application in a consistent state. This means that once the application receives the shutdown signal, it should not accept any more data for processing, but at the same time it should make sure that all the data/jobs for the Kafka offsets currently in memory are processed before the application is brought down. When the application restarts, it reads the Kafka offsets from the checkpoint directory and resumes fetching data from Kafka accordingly.
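As a rough illustration of this setup, here is a minimal sketch of a driver that recovers Kafka offsets from the checkpoint directory using the Spark 1.x direct Kafka API; the broker list, topic, checkpoint path and processing logic are hypothetical placeholders, not my actual pipeline code:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val checkpointDir = "hdfs:///checkpoints/data-pipeline"            // hypothetical path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("DataPipelineStreamDriver")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDir)

      // Direct Kafka approach: Spark tracks the offsets itself and saves them in the checkpoint.
      val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // hypothetical broker
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("events"))                               // hypothetical topic

      stream.foreachRDD { rdd =>
        println(s"Processed ${rdd.count()} records in this batch")     // placeholder processing
      }
      ssc
    }

    // On restart, the context (including the Kafka offsets) is rebuilt from the checkpoint if one exists.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()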

In this post, I am going to share how to do a graceful shutdown of a Spark Streaming application.
There are two ways:
1. Explicitly adding a shutdown hook in the driver program:

       sys.ShutdownHookThread {
         log.info("Gracefully stopping Spark Streaming Application")
         // first argument: also stop the SparkContext; second argument: stop gracefully
         ssc.stop(stopSparkContext = true, stopGracefully = true)
         log.info("Application stopped")
       }


The ssc.stop method's first boolean argument (stopSparkContext) stops the associated SparkContext, while the second boolean argument (stopGracefully) requests a graceful shutdown of the StreamingContext.
      I tried the above approach in my Spark application with version 1.5.1, but it did not work. The streaming context was shutting down gracefully, but the Spark context remained alive, or let's say hung: the driver and executor processes did not exit. I had to use kill -9 to forcefully terminate the Spark context (which kills the driver and executors).
Later, I found out that this approach is old and was meant for Spark versions before 1.4. For newer Spark versions, we use the second approach.

2. The spark.streaming.stopGracefullyOnShutdown parameter:
        sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
        Setting this parameter to true in the Spark configuration ensures a proper graceful shutdown for newer Spark (1.4 onwards) applications. With this parameter set, we should not use the explicit shutdown hook from the first approach or call ssc.stop in the driver. We can just set the parameter and then call ssc.start() and ssc.awaitTermination(); there is no need to call ssc.stop, otherwise the application might hang during shutdown. A minimal driver sketch is shown below the source link.
Please look at the Spark source code to see how this parameter is used internally: https://github.com/apache/spark/blob/8ac71d62d976bbfd0159cac6816dd8fa580ae1cb/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala#L732
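Here is a minimal driver sketch of this approach; the app name, batch interval and stream-building logic are placeholders, not the exact code from my project:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf()
      .setAppName("DataPipelineStreamDriver")
      .set("spark.streaming.stopGracefullyOnShutdown", "true") // let Spark's shutdown hook stop us gracefully

    val ssc = new StreamingContext(sparkConf, Seconds(10))
    // ... create the direct Kafka stream and register output operations here ...

    ssc.start()
    // No explicit ssc.stop(): on the shutdown signal, Spark stops the streaming context gracefully,
    // finishes the in-flight batches and then exits.
    ssc.awaitTermination()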

How to Pass the Shutdown Signal:
Now we know how to ensure a graceful shutdown in Spark Streaming, but how do we pass the shutdown signal to the streaming application? One naive option is to press CTRL+C in the terminal where the driver program is running, but obviously that is not a good option.
The solution I am using is to grep for the Spark Streaming driver process and send it a SIGTERM signal. When the driver gets this signal, it initiates the graceful shutdown of the application.
We can put the command below in a shell script and run the script to pass the shutdown signal:
ps -ef | grep spark | grep <DriverProgramName> | awk '{print $2}' | xargs kill -SIGTERM
e.g. ps -ef | grep spark | grep DataPipelineStreamDriver | awk '{print $2}' | xargs kill -SIGTERM

One limitation of this approach is that it can be run only on the machine where the driver program is running, and not on any other node of the Spark cluster.

If you know of a better approach, please do share.

15 comments:

  1. great post dude.. keep rocking... :)

  2. If you are running in yarn-client or yarn-cluster mode, you can issue a yarn application -kill applicationId command?

    You can get the list of running applications using yarn application -list and parse out the applicationId for your streaming job.

  3. Once the Spark application is started, how do I gracefully terminate it using sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")?

  4. I am also facing the same issue. I followed the same steps as you mentioned, but I am unable to get the executor logs, and the file is also not being created. I am using a single-node cluster.
    Could you please advise?

  5. When running in yarn-cluster mode, yarn application -kill applicationId brings the application to a grinding halt. Is there any way we can stop the application gracefully when running in yarn-cluster mode?

    Replies
    1. vishhhhh - see this blog post for a strategy to handle graceful shutdown on YARN: http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-yarn/

  6. Is there any way to perform an action, such as running through each RDD, counting the distinct final values, and persisting that to disk before shutting down? It seems like sending the SIGTERM kills the stream and I receive "Adding new inputs, transformations, and output operations after stopping a context is not supported".

  7. Hi,
    using yarn -kill we've experienced some weird behaviors, like phantom processes kept on the driver or YARN not being aware that the application has ended (reporting a job as still running hours after it finished).

    So we decided to add soft kills:
    - the stop script first sends a stop request (a message in a topic) to the application. It waits for a given amount of time to give the application a chance to end by itself, and only then sends a SIGTERM kill.
    - in the code we regularly poll for the stop request in a separate thread.
    With this, we've gotten rid of phantom applications in YARN.
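    (For readers wondering what such a soft kill could look like in code, below is a rough sketch of the polling idea: a background thread watches for a stop marker and then stops the context gracefully. It checks a marker file instead of a message in a topic purely to keep the example short; the names are hypothetical.)

    import java.nio.file.{Files, Paths}
    import org.apache.spark.streaming.StreamingContext

    // Start a background thread that polls for a stop marker and, once it appears,
    // stops the streaming context gracefully (waiting for the in-flight batches).
    def watchForStopRequest(ssc: StreamingContext, markerPath: String): Unit = {
      val watcher = new Thread(new Runnable {
        override def run(): Unit = {
          while (!Files.exists(Paths.get(markerPath))) {
            Thread.sleep(10000) // poll every 10 seconds
          }
          ssc.stop(stopSparkContext = true, stopGracefully = true)
        }
      })
      watcher.setDaemon(true)
      watcher.start()
    }

    // Call this once in the driver, after ssc.start() and before ssc.awaitTermination().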


  8. Hello!
    The information you provided in this article is very useful to me. As a newbie, I learned a lot about Apache Spark Streaming from your blog. Thanks for sharing this information on Apache Spark Streaming.

  9. Hi,

    I submitted a Spark job without any streaming and without a cluster:
    no master or worker nodes. Any idea how to gracefully shut down Spark in this case?
