EMR MapReduce error: Java heap space

I mostly read every post on SO, but I still have memory issues. I use Airflow to instantiate my work in EMR and everything went great until two days ago each job ran out of memory. Jobs read from the S3 bucket and then I do some aggregation and then I save it back to S3. I have two scheduled job types - hourly and daily - in which I give different settings as shown below:

public static Configuration getJobConfiguration(String interval) {

    Configuration jobConfiguration = new Configuration();

    jobConfiguration.set("job.name", "JobImport Job");

    jobConfiguration.set("mr.mapreduce.framework.name", "yarn);

    jobConfiguration.set("fs.defaultFS", "hdfs://xxxx:8020");
    jobConfiguration.set("dfs.client.use.datanode.hostname", "true");
    jobConfiguration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "ALWAYS");
    jobConfiguration.set("dfs.client.block.write.replace-datanode-on-failure.best-effort", "true");

    if (interval.equals("hourly")) {
        jobConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "1000000");
        jobConfiguration.set("mapreduce.reduce.memory.mb", "6144"); // max memory for reducer
        jobConfiguration.set("mapreduce.map.memory.mb", "4098");    // max memory for mapper
        jobConfiguration.set("mapreduce.reduce.java.opts", "-Xmx4098m"); // reducer java heap
        jobConfiguration.set("mapreduce.map.java.opts", "-Xmx3075m");    // mapper java heap
    }
    else {
        jobConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "20000000");
        jobConfiguration.set("mapreduce.reduce.memory.mb", "8192"); // max memory for reducer
        jobConfiguration.set("mapreduce.map.memory.mb", "6144");    // max memory for mapper
        jobConfiguration.set("mapreduce.reduce.java.opts", "-Xmx6144m"); // reducer java heap
        jobConfiguration.set("mapreduce.map.java.opts", "-Xmx4098m");    // mapper java heap
    }

    return jobConfiguration;
}

      

My mapper and Reduce looks like this:

private JsonParser parser = new JsonParser();
private Text apiKeyText = new Text();
private Text eventsText = new Text();

@Override
public void map(LongWritable key, Text value, Context context) {

    String line = value.toString();
    String[] hourlyEvents = line.split("\n");

    JsonElement elem;
    JsonObject ev;

    try {
        for (String events : hourlyEvents) {

            elem = this.parser.parse(events);
            ev = elem.getAsJsonObject();

            if(!ev.has("api_key") || !ev.has("events")) {
                continue;
            }

            this.apiKeyText.set(ev.get("api_key").getAsString());
            this.eventsText.set(ev.getAsJsonArray("events").toString());

            context.write(this.apiKeyText, this.eventsText);
        }
    } catch (IOException | InterruptedException e) {
        logger.error(e.getMessage(), e);
    }
}

// ------------------
// Separate class
// ------------------

private JsonParser parser = new JsonParser();
private Text events = new Text();

@Override
public void reduce(Text key, Iterable<Text> values, Context context) {
    try {

        JsonObject obj = new JsonObject();
        JsonArray dailyEvents = new JsonArray();

        for (Text eventsTmp : values) {
            JsonArray tmp = this.parser.parse(eventsTmp.toString()).getAsJsonArray();
            for (JsonElement ev: tmp) {
                dailyEvents.add(ev);
            }
        }

        obj.addProperty("api_key", key.toString());
        obj.add("events", dailyEvents);

        this.events.set(obj.toString());

        context.write(NullWritable.get(), this.events);
    } catch (IOException | InterruptedException e) {
        logger.error(e.getMessage(), e);
    }
}

      

This is the dump after setting mapreduce:

INFO mapreduce.Job: Counters: 56 File System Counters FILE: Number of bytes read=40 FILE: Number of bytes written=69703431 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=3250 HDFS: Number of bytes written=0 HDFS: Number of read operations=58 HDFS: Number of large read operations=0 HDFS: Number of write operations=4 S3N: Number of bytes read=501370932 S3N: Number of bytes written=0 S3N: Number of read operations=0 S3N: Number of large read operations=0 S3N: Number of write operations=0 Job Counters Failed reduce tasks=4 Killed reduce tasks=1 Launched map tasks=26 Launched reduce tasks=6 Data-local map tasks=26 Total time spent by all maps in occupied slots (ms)=35841984 Total time spent by all reduces in occupied slots (ms)=93264640 Total time spent by all map tasks (ms)=186677 Total time spent by all reduce tasks (ms)=364315 Total vcore-milliseconds taken by all map tasks=186677 Total vcore-milliseconds taken by all reduce tasks=364315 Total megabyte-milliseconds taken by all map tasks=1146943488 Total megabyte-milliseconds taken by all reduce tasks=2984468480 Map-Reduce Framework Map input records=24 Map output records=24 Map output bytes=497227681 Map output materialized bytes=66055825 Input split bytes=3250 Combine input records=0 Combine output records=0 Reduce input groups=0 Reduce shuffle bytes=832 Reduce input records=0 Reduce output records=0 Spilled Records=24 Shuffled Maps =52 Failed Shuffles=0 Merged Map outputs=52 GC time elapsed (ms)=23254 CPU time spent (ms)=274180 Physical memory (bytes) snapshot=25311526912 Virtual memory (bytes) snapshot=183834742784 Total committed heap usage (bytes)=27816099840 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=501370932 File Output Format Counters Bytes Written=0

The cluster used is one emr-5.2.0

with two nodes, where each node is an instance of m3.xlarge . To start the EMR user bank I am yarn jar ...

from Steps (each step is created from Airflow).

Apart from these parameters in the configuration, I do not change anything, so I use the default values. How can I solve the Error: Java Heap Space problem ?

+3


source to share





All Articles