How to configure Flink to use S3 for backend state and checkpoints

I have a setup with Flink v1.2, 3 JobManagers, and 2 TaskManagers. I want to use an S3 bucket instead of HDFS for the backend state, for checkpoints, and for the ZooKeeper storageDir.

fs.s3.accessKey: [accessKey]
fs.s3.secretKey: [secretKey]

state.backend: filesystem

state.backend.fs.checkpointdir: s3:///[bucket]/flink-checkpoints
state.checkpoints.dir: s3:///[bucket]/external-checkpoints
high-availability: zookeeper
high-availability.zookeeper.storageDir: s3:///[bucket]/recovery

In JobManager log I have

2017-03-22 09:52:40,971 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,981 WARN  org.apache.hadoop.security.UserGroupInformation               - PriviledgedActionException as:ubuntu (auth:SIMPLE) cause:java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
2017-03-22 09:52:40,981 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,983 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
2017-03-22 09:52:40,993 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.

I don't have Hadoop installed. Is it needed, and if so, how should it be installed and configured?

Edit: I set up Flink with the following Hadoop config file (core-site.xml). I didn't really understand the IAM part; I am not using EMR, since I set up the cluster myself in AWS to be able to update Flink without depending on an image:

<configuration>
    <property>
        <name>fs.defaultFS</name>
<value>s3://[bucket]</value>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <description>[ Access Key ]</description>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <description>[ Secret Key ]</description>
    </property>

    <property>
        <name>fs.s3.awsAccessKeyId</name>
        <description>[ Access Key ]</description>
    </property>

    <property>
        <name>fs.s3.awsSecretAccessKey</name>
        <description>[ Secret Key ]</description>
    </property>

    <property>
        <name>fs.s3n.awsAccessKeyId</name>
        <value>[ Access Key ]</value>
    </property>

    <property>
        <name>fs.s3n.awsSecretAccessKey</name>
        <value>[ Secret Key ]</value>
    </property>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>
</configuration>

I am getting this error:

  2017-03-24 11:20:17,760 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
        at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
        at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
        at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:303)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:271)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

Edit: My mistake. I had set the keys in the <description> field instead of the <value> field.
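
For reference, the corrected entries carry the credentials in <value> (bucket and key placeholders as above):

    <property>
        <name>fs.s3a.access.key</name>
        <value>[Access Key]</value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <value>[Secret Key]</value>
    </property>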

2 answers


Refer to the Flink guide on running with S3 (the AWS deployment documentation) for how to set S3 up.
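
In Flink 1.2 this comes down to telling Flink where the Hadoop configuration lives and then using S3 URIs in flink-conf.yaml. A minimal sketch, assuming the core-site.xml from the question is stored under /etc/hadoop/conf (that path is an assumption; adjust it to your layout):

# point Flink at the directory containing core-site.xml (path is an assumption)
fs.hdfs.hadoopconf: /etc/hadoop/conf

state.backend: filesystem
state.backend.fs.checkpointdir: s3:///[bucket]/flink-checkpoints
state.checkpoints.dir: s3:///[bucket]/external-checkpoints

high-availability: zookeeper
high-availability.zookeeper.storageDir: s3:///[bucket]/recovery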



I think you are missing the Hadoop config file with the fs.s3.impl config key. Even though you are not running Hadoop, you still need to provide the Hadoop config file.
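
A minimal version of such a file, with the credentials in <value> where they belong, might look like this (bucket and key placeholders as in the question):

<configuration>
    <!-- map the s3:// scheme onto the s3a implementation -->
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <value>[Access Key]</value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <value>[Secret Key]</value>
    </property>
</configuration>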

Robert, I would recommend that people use the "s3a" filesystem, which, provided the hadoop-aws JAR and the matching Amazon AWS SDK JAR are installed, is self-registering. s3:// URLs are deprecated in the ASF codebase, even though EMR still uses them.
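
Making s3a self-register comes down to putting the hadoop-aws JAR and the AWS SDK it was built against on Flink's classpath. A sketch, with illustrative version numbers and paths that must be matched to your Hadoop distribution:

# copy the S3A connector and its matching AWS SDK into Flink's lib directory
# (version numbers and paths are illustrative, not prescriptive)
cp /path/to/hadoop-aws-2.7.3.jar /opt/flink/lib/
cp /path/to/aws-java-sdk-1.7.4.jar /opt/flink/lib/

# then reference the bucket with s3a:// URIs, e.g.
#   state.backend.fs.checkpointdir: s3a://[bucket]/flink-checkpoints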


