EMR MapReduce error: Java heap space
I have read most of the related posts on SO, but I still have memory issues. I use Airflow to launch my jobs on EMR, and everything ran fine until two days ago, when every job started running out of memory. The jobs read from an S3 bucket, do some aggregation, and write the result back to S3. I have two scheduled job types, hourly and daily, which get different settings as shown below:
public static Configuration getJobConfiguration(String interval) {
    Configuration jobConfiguration = new Configuration();
    jobConfiguration.set("job.name", "JobImport Job");
    jobConfiguration.set("mapreduce.framework.name", "yarn");
    jobConfiguration.set("fs.defaultFS", "hdfs://xxxx:8020");
    jobConfiguration.set("dfs.client.use.datanode.hostname", "true");
    jobConfiguration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "ALWAYS");
    jobConfiguration.set("dfs.client.block.write.replace-datanode-on-failure.best-effort", "true");
    if (interval.equals("hourly")) {
        jobConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "1000000");
        jobConfiguration.set("mapreduce.reduce.memory.mb", "6144");      // max memory for reducer
        jobConfiguration.set("mapreduce.map.memory.mb", "4098");         // max memory for mapper
        jobConfiguration.set("mapreduce.reduce.java.opts", "-Xmx4098m"); // reducer java heap
        jobConfiguration.set("mapreduce.map.java.opts", "-Xmx3075m");    // mapper java heap
    } else {
        jobConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "20000000");
        jobConfiguration.set("mapreduce.reduce.memory.mb", "8192");      // max memory for reducer
        jobConfiguration.set("mapreduce.map.memory.mb", "6144");         // max memory for mapper
        jobConfiguration.set("mapreduce.reduce.java.opts", "-Xmx6144m"); // reducer java heap
        jobConfiguration.set("mapreduce.map.java.opts", "-Xmx4098m");    // mapper java heap
    }
    return jobConfiguration;
}
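One thing I already sanity-checked is that each -Xmx stays below its YARN container size, since YARN kills any container that exceeds mapreduce.*.memory.mb before the JVM would ever throw java.lang.OutOfMemoryError. The helper below is my own (not part of the job), using the common 80% rule of thumb, which is an assumption on my part, not an official Hadoop limit:

```java
public class HeapCheck {
    // Returns true when the JVM heap fits the container with headroom
    // (here: heap <= 80% of the container size, a rule of thumb only).
    static boolean heapFitsContainer(int heapMb, int containerMb) {
        return heapMb <= (int) (containerMb * 0.8);
    }

    public static void main(String[] args) {
        // Values from the hourly configuration above.
        System.out.println(heapFitsContainer(3075, 4098)); // mapper:  true
        System.out.println(heapFitsContainer(4098, 6144)); // reducer: true
    }
}
```

Both job types pass this check (the heaps sit at roughly 67-75% of their containers), so I believe container sizing itself is not the problem.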
My mapper and reducer look like this:
private JsonParser parser = new JsonParser();
private Text apiKeyText = new Text();
private Text eventsText = new Text();

@Override
public void map(LongWritable key, Text value, Context context) {
    String line = value.toString();
    String[] hourlyEvents = line.split("\n");
    JsonElement elem;
    JsonObject ev;
    try {
        for (String events : hourlyEvents) {
            elem = this.parser.parse(events);
            ev = elem.getAsJsonObject();
            if (!ev.has("api_key") || !ev.has("events")) {
                continue;
            }
            this.apiKeyText.set(ev.get("api_key").getAsString());
            this.eventsText.set(ev.getAsJsonArray("events").toString());
            context.write(this.apiKeyText, this.eventsText);
        }
    } catch (IOException | InterruptedException e) {
        logger.error(e.getMessage(), e);
    }
}
// ------------------
// Separate class
// ------------------
private JsonParser parser = new JsonParser();
private Text events = new Text();

@Override
public void reduce(Text key, Iterable<Text> values, Context context) {
    try {
        JsonObject obj = new JsonObject();
        JsonArray dailyEvents = new JsonArray();
        for (Text eventsTmp : values) {
            JsonArray tmp = this.parser.parse(eventsTmp.toString()).getAsJsonArray();
            for (JsonElement ev : tmp) {
                dailyEvents.add(ev);
            }
        }
        obj.addProperty("api_key", key.toString());
        obj.add("events", dailyEvents);
        this.events.set(obj.toString());
        context.write(NullWritable.get(), this.events);
    } catch (IOException | InterruptedException e) {
        logger.error(e.getMessage(), e);
    }
}
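A note on my mapper, in case it matters: I assume the default TextInputFormat here (LongWritable offset key, Text line value), which calls map() once per line, so value never contains a newline and the split("\n") loop only ever sees one element. A quick standalone check of that assumption, with no Hadoop involved:

```java
public class SplitCheck {
    public static void main(String[] args) {
        // One record as TextInputFormat would deliver it: a single line,
        // with the trailing newline already stripped by the record reader.
        String value = "{\"api_key\":\"abc\",\"events\":[{\"t\":1}]}";
        String[] hourlyEvents = value.split("\n");
        System.out.println(hourlyEvents.length); // prints 1
    }
}
```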
This is the counter dump after running the job with these settings:
INFO mapreduce.Job: Counters: 56
    File System Counters
        FILE: Number of bytes read=40
        FILE: Number of bytes written=69703431
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=3250
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=58
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
        S3N: Number of bytes read=501370932
        S3N: Number of bytes written=0
        S3N: Number of read operations=0
        S3N: Number of large read operations=0
        S3N: Number of write operations=0
    Job Counters
        Failed reduce tasks=4
        Killed reduce tasks=1
        Launched map tasks=26
        Launched reduce tasks=6
        Data-local map tasks=26
        Total time spent by all maps in occupied slots (ms)=35841984
        Total time spent by all reduces in occupied slots (ms)=93264640
        Total time spent by all map tasks (ms)=186677
        Total time spent by all reduce tasks (ms)=364315
        Total vcore-milliseconds taken by all map tasks=186677
        Total vcore-milliseconds taken by all reduce tasks=364315
        Total megabyte-milliseconds taken by all map tasks=1146943488
        Total megabyte-milliseconds taken by all reduce tasks=2984468480
    Map-Reduce Framework
        Map input records=24
        Map output records=24
        Map output bytes=497227681
        Map output materialized bytes=66055825
        Input split bytes=3250
        Combine input records=0
        Combine output records=0
        Reduce input groups=0
        Reduce shuffle bytes=832
        Reduce input records=0
        Reduce output records=0
        Spilled Records=24
        Shuffled Maps =52
        Failed Shuffles=0
        Merged Map outputs=52
        GC time elapsed (ms)=23254
        CPU time spent (ms)=274180
        Physical memory (bytes) snapshot=25311526912
        Virtual memory (bytes) snapshot=183834742784
        Total committed heap usage (bytes)=27816099840
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=501370932
    File Output Format Counters
        Bytes Written=0
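If I read these counters correctly, 24 map output records account for Map output bytes=497227681, so each (key, value) pair averages roughly 20 MB, meaning the individual event arrays are already very large when they hit the shuffle. The arithmetic, in case I am miscounting:

```java
public class CounterMath {
    public static void main(String[] args) {
        long mapOutputBytes = 497_227_681L; // "Map output bytes" counter
        long mapOutputRecords = 24L;        // "Map output records" counter
        long avgBytesPerRecord = mapOutputBytes / mapOutputRecords;
        System.out.println(avgBytesPerRecord);       // prints 20717820
        System.out.println(avgBytesPerRecord >> 20); // prints 19 (whole MiB)
    }
}
```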
The cluster is an emr-5.2.0 cluster with two nodes, each an m3.xlarge instance. I start the job on EMR by running yarn jar ... from Steps (each step is created from Airflow).

Apart from the parameters in the configuration shown above, I do not change anything, so the defaults are used everywhere else. How can I solve the Error: Java heap space problem?