Completing the Spark callback

I have a bash script that starts my EC2 Spark standalone cluster every night and runs the application. I would like to notice when the application has finished, so I can stop the cluster.

I was wondering if there are any callbacks based on the status of the spark application.

I'm pretty new to sparkle, so another hint of that will be clear to us.

Thank.

UPDATE:

With the json provided http://<master-host>:8080/metrics/master/json

or http://<master-host>:8080/metrics/applications/json

, I can get the status of the application (WAITING, RUNNING, FINISHED), but I cannot get the status of the driver, which will tell you if the execution is FAILED. I'm sure there must be a specific config to show to show this, but I couldn't find one.

To get this status, I gave up on the web interface provided in http://<master-host>:8080

to find the driver running my application and get its status.

enter image description here

+3


source to share


3 answers


spark-submit --status $submitID --master $master 2>&1 | grep "success"

      



+2


source


There is work to do by referring to Spark's internal scorecard.

Below terminal command gets metrics for current spark applications



curl -X GET "http://<spark master>:4040/metrics/json/"

      

This command can be executed from within a script and you can generate an alert if there are no applications running.

+1


source


Disclaimer: This example requires code changes, has some maintenance layout tolerances, and uses some of the inner Spark classes.

After reading about hidden rest-apis and trying to wrap the SparkSubmit class to receive Future objects, I found the SparkListener class . It has onJobStart / End, onApplicationStart / End, etc. For any desired granularity.

Here's a rough proof of concept for Jobs in the main application method:

//... build spark conf
val sparkContext = new SparkContext(sparkConf)
//programmatically register listener
sparkContext.addSparkListener(new SparkListener {

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"[ ${jobStart.jobId} ] Job started.")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"[ ${jobEnd.jobId} ] Job completed with Result : ${jobEnd.jobResult}")
    //(our other services already rely on ActiveMQ)
    val connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616")
    val connection = connectionFactory.createConnection
    connection.setClientID("Client_" + Math.random())
    connection.start

    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val sendQueue = session.createQueue("job_queue.spark_job_completed")

    val producer = session.createProducer(sendQueue)
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

    val textMessage = session.createTextMessage( s"""{\"jobId\" : \"${jobEnd.jobId}\", \"jobResult\" : \"${jobEnd.jobResult}\"}""")

    producer.send(textMessage)

    connection.close
  }

  //api just sends the time :/
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    println(s"[ ${applicationEnd.time} ] Application Completed.")
  }
})
// ... do spark work

      

Our team had to notify external applications when Spark jobs / applications finished in Spark 1.5.2. In addition, Spark's user interface was not available without a lot of port forwarding, so it could now integrate with existing monitoring tools.

Sources:

0


source







All Articles