@GrigorievNick
Last active April 10, 2024 14:01
Spark Structured Streaming graceful shutdown on SIGTERM. SIGTERM will not interrupt the currently running batch, but due to the async nature of StreamingQueryListener.onQueryProgress, it can interrupt the next batch during its first few moments.
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.slf4j.LoggerFactory
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.TimeUnit
/**
 * StreamingQuery.stop does not wait until the current batch is ready:
 * it immediately cancels all Spark jobs generated by the stream and interrupts the query execution thread.
 * This listener creates a shutdown hook that waits until the current batch has finished,
 * then tries to stop the stream as fast as possible, before the next batch is generated.
 * But because onQueryProgress is async, it is possible that the next batch will start and be cancelled.
 */
class GracefulStopOnShutdownListener(streams: StreamingQueryManager) extends StreamingQueryListener {

  private val log = LoggerFactory.getLogger(getClass)
  private val runningQuery = new ConcurrentHashMap[UUID, (Runnable, SynchronousQueue[Boolean])]()

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    val stream = streams.get(event.id)
    val stopSignalChannel = new SynchronousQueue[Boolean]()
    val shutdownHook: Runnable = () => {
      if (stream.isActive) {
        log.info(s"Stop signal arrived, query ${stream.id} waits until the current batch is ready")
        val stopSignal = true
        // Blocks the shutdown hook thread until onQueryProgress (or onQueryTerminated) polls the signal.
        stopSignalChannel.put(stopSignal)
        log.info(s"Sending stop to ${stream.id}")
        stream.stop()
        stream.awaitTermination()
        log.info(s"Query ${stream.id} stopped")
      }
    }
    ShutdownHookManager.get().addShutdownHook(shutdownHook, 100, 20, TimeUnit.MINUTES)
    runningQuery.put(stream.id, (shutdownHook, stopSignalChannel))
    log.info(s"Registered shutdown hook for query ${event.id}")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    log.info(s"Query ${event.progress.id} batch ${event.progress.batchId} ready")
    val (_, stopSignalChannel) = runningQuery.get(event.progress.id)
    // Unblock the shutdown hook thread if it is waiting for the current batch to finish.
    stopSignalChannel.poll()
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    val (shutdownHook, stopSignalChannel) = runningQuery.remove(event.id)
    log.info(s"Does shutdown hook for ${event.id} exist: " + ShutdownHookManager.get().hasShutdownHook(shutdownHook))
    if (!ShutdownHookManager.get().isShutdownInProgress) ShutdownHookManager.get().removeShutdownHook(shutdownHook)
    log.info(s"Query ${event.id} shut down, hook released.")
    // Unblock the shutdown hook thread if the stream stopped for another reason,
    // to avoid a deadlock between the hook and the listener.
    stopSignalChannel.poll()
  }
}
import com.appsflyer.raw.data.ingestion.GracefulStopOnShutdownListener
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.util.Try
object Main {

  val log = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    val dummyTablePath = "/tmp/table_path"
    import spark.implicits._
    (0 to 100).toDF.write.mode(SaveMode.Overwrite).parquet(dummyTablePath)

    // Background thread that keeps appending files, so the stream always has new batches.
    val ingestion = new Thread(() =>
      Try {
        while (true) {
          (0 to 2).map(i => i -> s"${i}DummyVal").toDF.write.mode(SaveMode.Append).parquet(dummyTablePath)
          Thread.sleep(1000)
        }
      }
    )
    ingestion.start()

    def runBatch(data: DataFrame, id: Long): Unit = {
      spark.sparkContext.setJobDescription(id.toString)
      log.info(s"Start batch $id")
      // Emulate long job execution
      val res = data.rdd.map { x => Thread.sleep(10.seconds.toMillis); x }.take(2)
      log.info(s"Batch $id size ${res.mkString("Array(", ", ", ")")}")
    }

    // Comment out this line to reproduce the issue.
    spark.streams.addListener(new GracefulStopOnShutdownListener(spark.streams))

    val stream = spark
      .readStream
      .schema(StructType(Seq(StructField("int", IntegerType))))
      .option("maxFilesPerTrigger", "1")
      .parquet(dummyTablePath)
      .writeStream
      .foreachBatch(runBatch _)
      .start()

    // Uncomment to reproduce the issue: a naive hook that calls stop() immediately,
    // cancelling the currently running batch.
    // val shutdownHook: Runnable = () => {
    //   if (stream.isActive) {
    //     log.info(s"Stop signal arrived, query ${stream.id} waits until the current batch is ready")
    //     stream.stop()
    //     stream.awaitTermination()
    //     log.info(s"Query ${stream.id} stopped")
    //   }
    // }
    // ShutdownHookManager.get().addShutdownHook(shutdownHook, 100, 10, TimeUnit.MINUTES)

    // Dummy hook to keep the JVM alive long enough to inspect the Spark UI.
    ShutdownHookManager.get().addShutdownHook(() => Thread.sleep(1000000000), 99, 10, TimeUnit.MINUTES)
    stream.awaitTermination()
    println("finish")
  }
}
@xuan1905 commented Sep 9, 2022

This sounds promising.
Can this approach handle the case where I use the code above and run "yarn application -kill " to stop the running streaming application?

@GrigorievNick (author) commented Sep 9, 2022

@xuan1905, no, only -stop from YARN 3.x and higher.
The problem is in YARN itself:
When you call kill, YARN not only sends SIGTERM to the AM container but also shuts down all RPC and clears the caches for this application on the YARN master node.
So many calls, like new executor requests from Dynamic Resource Allocation or reporting the job result state, fail with errors.
I personally tested it only with dynamic resource allocation, so I never checked what happens in the executors' containers.
But I think they also get SIGTERM immediately and do not wait for the driver's signal.

@xuan1905 commented Sep 9, 2022

So could you tell me your suggestion for killing the streaming application when using your approach? As far as I know, some have proposed creating a file; when it is deleted, the streaming query and then the context are closed sequentially, as in https://github.com/kali786516/FraudDetection/blob/3763a862cc4ca85a30a8e5f8332e44bee44e2169/src/main/scala/com/datamantra/spark/GracefulShutdown.scala#L26. I'm looking for a cleaner solution for this requirement.

@GrigorievNick (author) commented Sep 9, 2022

@xuan1905 first of all, it already works with MESOS, YARN 3, and K8s.
It does not work on YARN above 3.0.0x, because it has no stop command, only kill.
Actually, it works with any solution that sends SIGTERM to the driver and waits until the driver has gracefully stopped.

Regarding the solution you mention, it's not graceful.

  streamingQueries.map(query => {
    query.stop()
  })

query.stop will cancel any running job created by the Structured Stream.
So it's not graceful, from my point of view.

@GrigorievNick (author)

Regarding YARN 2, I wrote two solutions to make a graceful shutdown possible:

  1. A simple HTTP server that performs System.exit() on a GET HTTP call (a sketch follows below).
  2. An external file monitor (a simple Java Timer that reads the file content), which performs System.exit() if the file content changes to SHUTDOWN.

Both have their own issues:

  1. The HTTP service requires service discovery and a check for open ports, so I use the YARN API to find the node where the AM container is running.
  2. The file is additional state, which must be kept in sync with the Spark application (driver) state, which is always a bad idea.

But it's better than nothing.
I can't share them, because I would need to split them from other code which is under NDA.
But the idea itself is very simple: just call SIGTERM -> System.exit on the driver, and not from an external place like the YARN API.
This way the YARN AM (driver) container will be treated as SUCCEEDED/FAILED, with state FINISHED.
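
A minimal sketch of the first idea, using only the JDK's built-in com.sun.net.httpserver (the port and the /shutdown path are illustrative assumptions, not from the original code):

import com.sun.net.httpserver.{HttpExchange, HttpServer}
import java.net.InetSocketAddress

// Hypothetical sketch: a tiny HTTP endpoint on the driver that triggers System.exit,
// which in turn runs the JVM shutdown hooks (including the graceful-stop hook above).
object ShutdownHttpEndpoint {
  def start(port: Int): HttpServer = {
    val server = HttpServer.create(new InetSocketAddress(port), 0)
    server.createContext("/shutdown", (exchange: HttpExchange) => {
      val body = "shutting down".getBytes("UTF-8")
      exchange.sendResponseHeaders(200, body.length)
      exchange.getResponseBody.write(body)
      exchange.close()
      // Exit on a separate thread so the HTTP response is flushed before the JVM goes down.
      new Thread(() => System.exit(0)).start()
    })
    server.start()
    server
  }
}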

@xuan1905 commented Sep 9, 2022

Thanks for sharing.

By "it does not work on YARN above 3.0.0x", did you mean "below 3.x.x"?
Besides, I received mixed results regarding the effectiveness of "spark.streaming.stopGracefullyOnShutdown". Have you tried it?

I'd like to be clearer about the code. It would be great if you could answer these questions.

  • Why did you add a boolean value to a queue in stopSignalChannel.put(stopSignal)?
  • Why was stopSignalChannel.put() followed by stream.stop()? Why do we stop the stream here?
  • Why did you remove the signal while streaming is in progress, using stopSignalChannel.poll()? Does receiving a new batch activate onQueryProgress()?
  • Can you say more about onQueryTerminated()? I didn't quite understand what the "signal" is here and where the resource appears in the flow of the methods.

@GrigorievNick (author) commented Sep 9, 2022

By "it's do not work on YARN above 3.0.0x", did you mean "below 3.x.x"? \

Yes.

Besides, I received mixed results regarding the effectiveness of "spark.streaming.stopGracefullyOnShutdown". Have you tried it?

This config is for the RDD-based DStream API and is not related to Structured Streaming. The Structured Streaming config prefix is spark.sql.streaming.*.
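
As a minimal hedged illustration of that prefix: spark.sql.streaming.stopTimeout (Spark 3.x) bounds how long StreamingQuery.stop() waits for the streaming execution thread; the value shown is an arbitrary example.

import org.apache.spark.sql.SparkSession

// Illustrative only: Structured Streaming settings live under spark.sql.streaming.*.
// stopTimeout: how long stop() waits for the streaming execution thread;
// 0 or negative waits indefinitely.
val spark = SparkSession.builder()
  .config("spark.sql.streaming.stopTimeout", "60s")
  .getOrCreate()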

Why did you add a boolean value to a queue in stopSignalChannel.put(stopSignal)?

The value itself does not matter. It's just a way to synchronize two threads: the thread which runs the StreamingQueryListener and the shutdown hook thread.

Why was stopSignalChannel.put() followed by stream.stop()? Why do we stop the stream here?

A Structured Stream is, physically, a JVM thread on the driver with a while(true) loop inside. The event listener runs in an async thread pool, and the shutdown hook thread is yet another thread. What I want to achieve:

  1. Catch SIGTERM with the shutdown hook thread - this is handled by the JVM API.
  2. Register a notification that the current batch is the last one, and make the shutdown thread wait until it finishes before stopping the stream - handled by stopSignalChannel.put(stopSignal); this line blocks the shutdown hook thread until stopSignalChannel.poll() is called.
  3. The event listener's onQueryProgress catches the moment the batch finishes and unblocks the shutdown hook thread using stopSignalChannel.poll().
  4. The shutdown hook thread calls stream.stop() and waits until the stream has shut down.

stopSignalChannel: SynchronousQueue works like a semaphore - you can replace it with a CountDownLatch, a Semaphore, or any other locking mechanism.
But I prefer SynchronousQueue; I find it simpler and clearer. In my head, I picture an actor mailbox, but with only one message in the queue.
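
A minimal standalone sketch of that rendezvous (not from the gist; the two threads mimic the shutdown hook and the listener):

import java.util.concurrent.SynchronousQueue

object RendezvousDemo extends App {
  val stopSignalChannel = new SynchronousQueue[Boolean]()

  // Plays the role of the shutdown hook thread.
  val hook = new Thread(() => {
    println("hook: signalling, blocked until the signal is consumed")
    stopSignalChannel.put(true) // blocks, like the shutdown hook in the listener
    println("hook: unblocked, now safe to call stream.stop()")
  })
  hook.start()

  Thread.sleep(1000) // emulate the current batch still running
  println("listener: batch finished, consuming the signal")
  stopSignalChannel.poll() // unblocks the hook, like onQueryProgress does
  hook.join()
}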

Important note:
Because the event listener thread and the stream thread are fully asynchronous to each other, in theory the next batch can already be running when the listener's onQueryProgress is called.
But in practice, in 99.9% of cases, creating a batch in Structured Streaming takes more (much more) than 1 second, which is more than enough time to stop the stream before the next batch creates any job.

Can you say more about onQueryTerminated()? I didn't quite understand what the "signal" is here and where the resource appears in the flow of the methods.

This function is important for two cases:

  1. When we have more than one Structured Stream per driver and want to stop them independently.
  2. When SIGTERM arrives but the stream was already stopped for another reason - a logical stop command or an exceptional case. This avoids a deadlock between the shutdown hook thread and the event listener thread.

Does receiving a new batch activate onQueryProgress()?

Yes, but this is not important. What is important is that we call stopSignalChannel.put(signal) once per stream from the shutdown hook thread.

@xuan1905 commented Sep 10, 2022

Thanks for patiently answering me.
Some more naive questions, though.

  1. Is it true that, alone, "yarn -stop" can't make a graceful shutdown, and only by combining it with a further adjustment like your approach can it gracefully shut down streaming applications?
  2. In onQueryTerminated():
  • why did you remove the stream from the listener and remove the shutdown hook?
  • why call ShutdownHookManager.get().removeShutdownHook(shutdownHook)?
  • why stopSignalChannel.poll()?
  3. What's the relation between the listener thread and the shutdown hook thread? How can they block each other (if they can)?

@GrigorievNick (author) commented Sep 10, 2022

  1. Yes, yarn -stop just sends SIGTERM to the AM container. But in the Spark driver, it's your responsibility to shut down the internal stateful objects in the correct order. By default, the Spark session and Spark context have their own shutdown hooks, registered in the correct order, so the Spark session immediately starts shutting itself down when SIGTERM arrives. We therefore need one more shutdown hook, with a higher priority, to wait until the streams have finished before the session and context start their own stop operations (see the sketch after this list). That's why we use the Hadoop shutdown manager wrapper and not just a pure JVM shutdown hook thread.
  2. As I said in my previous message, there are two reasons to implement logic inside onQueryTerminated, but all of them are valid only if the Structured Stream's life cycle is not tied to the Spark driver's. That is, it's a valid situation for one Spark driver to start/stop/restart one or more streams.
    • Every time a stream starts, I create a shutdown hook which waits for termination. So I want to remove this hook if the stream is stopped programmatically from the driver and not by a SIGTERM signal. For example, I have an application that monitors some file and restarts the stream every time it changes. If I did not clean up the shutdown hooks, it would lead to a memory leak.
    • Another reason to remove the shutdown hook is to avoid a deadlock. If the stream was stopped because of an exception or the Spark driver's business logic, and SIGTERM arrives after the stream has stopped, the shutdown hook will be called and block until the next batch or the stream finishes... but there is no running stream, so nothing will trigger the unblock, and the shutdown hook will be blocked forever. To avoid this, I need to remove the shutdown hook when the stream terminates.
    • One more deadlock case: if SIGTERM arrives at a point in time when the stream is already in the middle of its stop operation, the shutdown hook will start and block until the next batch finishes. But no batch is running; the stream is in the middle of stopping. So something else must unblock the shutdown hook thread, and that's why I use one more stopSignalChannel.poll to unblock the shutdown hook thread in onQueryTerminated.
    • This is how Java's SynchronousQueue works: when put is called, the thread blocks until some other thread calls take or poll. Please check the Java doc for this class for more details.
  3. stopSignalChannel: SynchronousQueue. As I said, just check how this class behaves and what its purpose is. Briefly: put blocks the shutdown hook thread until poll is called from the other, listener thread.
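
A minimal sketch of the priority idea from point 1, using Hadoop's ShutdownHookManager as the gist does (the priorities and timeouts are illustrative assumptions, not Spark's actual values; higher priority runs first):

import org.apache.hadoop.util.ShutdownHookManager
import java.util.concurrent.TimeUnit

// Illustrative only: the stream-draining hook gets a higher priority than
// framework cleanup, so on SIGTERM it runs first and can wait for the current
// batch before lower-priority hooks tear down the SparkContext.
ShutdownHookManager.get().addShutdownHook(
  () => println("priority 100: drain streaming queries first"),
  100, 20, TimeUnit.MINUTES
)
ShutdownHookManager.get().addShutdownHook(
  () => println("priority 50: framework-level cleanup runs later"),
  50, 1, TimeUnit.MINUTES
)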

@austermann

Thank you for sharing - very useful for me. I had to convert it to Java, so I wanted to share that, too.

import static java.util.Objects.requireNonNull;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Credits to https://gist.github.com/GrigorievNick/bf920e32f70cb1cf8308cd601e415d12
 */
public class GracefulStopOnShutdownListener extends StreamingQueryListener {

    private static final Logger logger = LoggerFactory.getLogger(GracefulStopOnShutdownListener.class);

    private final StreamingQueryManager streamingQueryManager;
    private final Map<UUID, Tuple> runningQuery = new ConcurrentHashMap<>();

    public GracefulStopOnShutdownListener(StreamingQueryManager streamingQueryManager) {
        this.streamingQueryManager = requireNonNull(streamingQueryManager);
    }

    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        final var stream = streamingQueryManager.get(event.id());
        var stopSignalChannel = new SynchronousQueue<Boolean>();
        Runnable shutdownHook = () -> {
            if (stream.isActive()) {
                try {
                    logger.info("stop signal arrived, query {} wait for until current batch ready", stream.id());
                    stopSignalChannel.put(Boolean.TRUE);
                    logger.info("Send stop {}", stream.id());
                    stream.stop();
                    stream.awaitTermination();
                    logger.info("Query {} stopped", stream.id());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted", e);
                    Thread.currentThread().interrupt();
                } catch (StreamingQueryException e) {
                    logger.warn("Unexpected exception", e);
                    throw new RuntimeException(e);
                }
            }
        };
        ShutdownHookManager.get().addShutdownHook(shutdownHook, 100, 20, TimeUnit.MINUTES);
        runningQuery.put(stream.id(), new Tuple(shutdownHook, stopSignalChannel));
        logger.info("Registered shutdown hook for query {}", stream.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        logger.info("onQueryProgress: Query batch {} finished ", event.progress().batchId());
        var tuple = runningQuery.get(event.progress().id());
        tuple.stopSignalChannel.poll();
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        var tuple = runningQuery.remove(event.id());
        logger.info("Does shutdown hook for {} exist: {}", event.id(), ShutdownHookManager.get().hasShutdownHook(tuple.shutdownHook));
        if (!ShutdownHookManager.get().isShutdownInProgress()) ShutdownHookManager.get().removeShutdownHook(tuple.shutdownHook);
        logger.info("query {} shutdown, release hook.", event.id());
        tuple.stopSignalChannel.poll();
    }

    private static class Tuple {

        final Runnable shutdownHook;
        final SynchronousQueue<Boolean> stopSignalChannel;

        Tuple(Runnable shutdownHook, SynchronousQueue<Boolean> stopSignalChannel) {
            this.shutdownHook = shutdownHook;
            this.stopSignalChannel = stopSignalChannel;
        }
    }
}
