RDD.checkpoint is skipped in Spark job

Hi, I am trying to run a long Spark job that often fails with a StackOverflowError. The job reads parquet files and creates an RDD inside a forEach loop. After some research I thought that checkpointing each RDD would solve my memory problems. (I have tried various settings for memory, memory overhead, parallelism, and repartitioning, and found the ones that work best for this job, but it still fails sometimes depending on the load on our cluster.)

Now to my real problem. I try to create the checkpoints by first reading the parquet file, creating an RDD, caching it, calling the checkpoint() function, and then invoking an action (first()) to materialize the checkpoint. But no checkpoints are created in the path I specified, and the YARN UI shows the stage as skipped. Can anyone help me understand the problem? :)

    ctx.getSparkContext().setCheckpointDir("/tmp/checkpoints"); // set once on the driver, before checkpoint() is called

    public static void writeHhidToCouchbase(DataFrameContext ctx, List<String> filePathsStrings) {
      filePathsStrings
          .forEach(filePath -> {
            JavaPairRDD<String, String> rdd =
                UidHhidPerDay.getParquetFromPath(ctx, filePath);
            rdd.cache();
            rdd.checkpoint();
            rdd.first(); // action intended to materialize the checkpoint
            rdd.foreachPartition(p -> {
              CrumbsClient client = getClient();
              p.forEachRemaining(uids -> {
                Crumbs crumbs = client.getAsync(uids._1)
                    .timeout(10, TimeUnit.SECONDS)
                    .toBlocking()
                    .first();
                String hHid = uids._2;
                if (hHid != null) {
                  crumbs.getOrCreateSingletonCrumb(HouseholdCrumb.class).setHouseholdId(hHid);
                  client.putSync(crumbs);
                }
              });
              client.shutdown();
            });
          });
    }

The checkpoint is created once, in the first iteration, but never again. Kind regards.
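
One way to confirm from the driver whether a checkpoint was actually written is to ask the RDD itself after the action has run. This is just a minimal sketch reusing the names from the snippet above; isCheckpointed() and getCheckpointFile() are standard JavaRDD accessors:

    rdd.cache();
    rdd.checkpoint();
    rdd.first(); // the action that should materialize the checkpoint
    // If checkpointing worked, this prints true plus the path under
    // /tmp/checkpoints that holds the checkpoint data.
    System.out.println("checkpointed: " + rdd.isCheckpointed());
    System.out.println("checkpoint file: " + rdd.getCheckpointFile());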





1 answer


My mistake: the checkpoints are actually created. The "first" checkpoint I mentioned above is in fact a directory that holds the checkpoint data in subdirectories; names like 8f987639-d5c8-46b8-a1e0-37081f9f8e00 are what confused me. However, looking at the lineage comment from @ImDarrenG, I gained a few more insights: I created a new, repartitioned RDD from the first one that I cache and checkpoint. This made the application more stable, with no more crashes.



    JavaPairRDD<String, String> rdd =
        UidHhidPerDay.getParquetFromPath(ctx, filePath);
    rdd.cache();
    rdd.checkpoint();
    rdd.first();
    JavaPairRDD<String, String> rddToCompute = rdd.repartition(72);
    rddToCompute.foreachPartition...
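
To see why this helps, it can be instructive to print the lineage before and after: toDebugString() dumps an RDD's dependency chain, and once a checkpoint has materialized, the chain should bottom out at a CheckpointRDD instead of reaching all the way back to the parquet read. A minimal sketch under the same setup as above:

    rdd.first(); // materialize the checkpoint
    // After checkpointing, the debug string should show a CheckpointRDD
    // as the source instead of the full parquet lineage.
    System.out.println(rdd.toDebugString());
    JavaPairRDD<String, String> rddToCompute = rdd.repartition(72);
    // The repartitioned RDD builds on the short, checkpointed lineage;
    // long lineage chains are a known cause of StackOverflowError when
    // Spark serializes tasks, so truncating the chain avoids the crashes.
    System.out.println(rddToCompute.toDebugString());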









