Cannot programmatically submit Spark app (with Cassandra connector) to cluster from remote client

I am running a standalone Spark cluster on EC2, writing an application that uses the Spark-Cassandra connector, and trying to programmatically submit a job to the Spark cluster from a remote client. The job itself is simple:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import com.datastax.spark.connector.japi.CassandraJavaRDD;
import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class Test {
    public static void main(String[] args) {
        String host = "[cassandra_host]"; // Cassandra contact point
        SparkConf conf = new SparkConf()
                .set("spark.cassandra.connection.host", host);
        conf.set("spark.driver.host", "[my_public_ip]");
        conf.set("spark.driver.port", "15000");
        JavaSparkContext sc = new JavaSparkContext("spark://[spark_master_host]", "test", conf);
        CassandraJavaRDD<CassandraRow> rdd = javaFunctions(sc).cassandraTable(
                "keyspace", "table");
        System.out.println(rdd.first().toString());
        sc.stop();
    }
}
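For what it's worth, when a job is submitted programmatically instead of through the ./spark-submit script, the jars that spark-submit would normally distribute to the executors have to be listed explicitly. A minimal sketch of doing that via SparkConf.setJars (both jar paths are hypothetical placeholders for this example):

    // Ship the application and connector jars to the executors.
    // Both paths are hypothetical placeholders for this example.
    conf.setJars(new String[] {
            "target/my-app.jar",
            "lib/spark-cassandra-connector-assembly.jar"
    });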


This works when I run it on the Spark master node of my EC2 cluster, but I am trying to run it from a remote Windows client. The problem is with these two lines:

    conf.set("spark.driver.host", "[my_public_ip]");
    conf.set("spark.driver.port", "15000");


First, if I comment out these two lines, the application does not throw an exception, but the executors keep failing, with the following log:

14/12/06 22:40:03 INFO client.AppClient$ClientActor: Executor updated: app-20141207033931-0021/3 is now LOADING
14/12/06 22:40:03 INFO client.AppClient$ClientActor: Executor updated: app-20141207033931-0021/0 is now EXITED (Command exited with code 1)
14/12/06 22:40:03 INFO cluster.SparkDeploySchedulerBackend: Executor app-20141207033931-0021/0 removed: Command exited with code 1


This repeats forever. When I check the worker node log, I find:

14/12/06 22:40:21 ERROR security.UserGroupInformation: PriviledgedActionException as:[username] cause:java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs    
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52) 
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:113)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:156)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
... 4 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:125)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:53)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:52)
        ... 7 more


I have no idea what this means; my guess is that the worker node was unable to connect back to the driver, which was originally advertised as:

14/12/06 22:39:30 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@[some_host_name]:52660]
14/12/06 22:39:30 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@[some_host_name]:52660]


Obviously, no DNS server outside my own network will resolve that hostname ...
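To illustrate what the workers are being told to dial back to, here is a minimal sketch (plain JDK, no Spark) that prints what the default advertised address resolves to on the client when spark.driver.host is not overridden:

    import java.net.InetAddress;

    public class ShowDriverAddress {
        public static void main(String[] args) throws Exception {
            // Without a spark.driver.host override, Spark advertises the
            // local hostname; the workers must be able to resolve and
            // reach whatever this prints.
            InetAddress local = InetAddress.getLocalHost();
            System.out.println("hostname: " + local.getHostName());
            System.out.println("address:  " + local.getHostAddress());
        }
    }

On a Windows box behind a router this typically prints a machine name and a private LAN address, neither of which the EC2 workers can reach.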

Since there seems to be no way to set the deploy mode to "client" or "cluster" other than through the ./spark-submit script (which I find absurd ...), I tried adding a hosts entry "XX.XXX.XXX.XX [host-name]" to /etc/hosts on all the Spark worker nodes. No luck, of course ... That brings me to the second case: leaving those two lines un-commented, which gives me:

14/12/06 22:59:41 INFO Remoting: Starting remoting
14/12/06 22:59:41 ERROR Remoting: Remoting error: [Startup failed] [
akka.remote.RemoteTransportException: Startup failed
        at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
        at akka.remote.Remoting.start(Remoting.scala:194)
        ...


With the root cause:

Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: /[my_public_ip]:15000
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:391)
        at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:388)


I have double-checked my firewall and router settings and confirmed that my firewall is disabled; I ran netstat -an to confirm that port 15000 is not in use (in fact I tried several other free ports, with no luck); and I can ping my public IP from both my own machine and the machines in the cluster with no problem.
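To reproduce the bind step in isolation, here is a minimal sketch (plain JDK, no Spark; [my_public_ip] is the same placeholder as above). A socket can only bind to an IP address that is assigned to a local network interface, so if the public IP is NAT'd by the router rather than owned by the machine itself, this fails with the same kind of BindException that Netty reports above:

    import java.net.InetSocketAddress;
    import java.net.ServerSocket;

    public class BindCheck {
        public static void main(String[] args) throws Exception {
            // Throws java.net.BindException ("Cannot assign requested address")
            // if no local interface owns [my_public_ip].
            try (ServerSocket s = new ServerSocket()) {
                s.bind(new InetSocketAddress("[my_public_ip]", 15000));
                System.out.println("Bind succeeded");
            }
        }
    }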

At this point I am completely stuck and running out of things to try. Any suggestions? Any help is appreciated!

1 answer


Please check whether inbound traffic on port 15000 is allowed by your EC2 security group.