Hadoop: how are reducer nodes selected?
I just started learning Hadoop and don't understand how a datanode becomes a reducer node. My understanding so far:
- Once a map task finishes, the contents of its sort buffer are sorted, partitioned by reducer, and spilled to local disk.
- The jobtracker is then notified about the spilled partitions.
- After that, the reducers begin requesting the data for their specific partition.
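The partitioning step in the list above can be sketched as follows. The modulo-hash formula mirrors Hadoop's default HashPartitioner; the class and key names here are purely illustrative:

```java
// Sketch of the map-side partitioning step: every key/value pair is
// assigned to one of numReduceTasks partitions. Each reducer later
// fetches exactly its own partition from every map output.
public class PartitionSketch {

    // Mask the sign bit so the result is always in [0, numReduceTasks)
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;
        for (String key : new String[] {"apple", "banana", "cherry"}) {
            System.out.println(key + " -> partition "
                + getPartition(key, numReduceTasks));
        }
    }
}
```

Note that this decides which *partition* a key lands in, not which physical node runs the reducer for that partition, which is what the question below is about.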
But how does the jobtracker decide which node becomes a reducer node? I am reading Hadoop: The Definitive Guide, but this step is not covered in the book.
Thanks, Bruckwald
To a great extent it is first-come, first-served. Tasks are assigned via heartbeats: when a tasktracker pings the jobtracker to report that it is alive, the response may contain a new task to run:
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null) {
  tasks = taskScheduler.assignTasks(taskTrackerStatus);
}
if (tasks != null) {
  for (Task task : tasks) {
    expireLaunchingTasks.addNewTask(task.getTaskID());
    LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
    actions.add(new LaunchTaskAction(task));
  }
}
Here is the corresponding Jobtracker source code. So besides the tasktracker simply being first in line, the task scheduler also checks resource conditions (for example, whether there is a free slot, or whether a node is overloaded).
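To make the flow concrete, here is a minimal, hypothetical sketch of first-come, first-served assignment over heartbeats (this is not the Hadoop API; the class and method names are made up for illustration): the first tracker to heartbeat in with a free slot receives the pending reduce task.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical sketch of heartbeat-driven task assignment (not Hadoop API):
// trackers heartbeat in; the first one with a free reduce slot gets the task.
public class HeartbeatSketch {

    static class TrackerStatus {
        final String name;
        final int freeReduceSlots;

        TrackerStatus(String name, int freeReduceSlots) {
            this.name = name;
            this.freeReduceSlots = freeReduceSlots;
        }
    }

    // Response to one heartbeat: hand out at most one pending reduce task,
    // and only if the tracker has a free slot.
    static Optional<String> heartbeat(TrackerStatus status,
                                      Deque<String> pendingReduces) {
        if (status.freeReduceSlots > 0 && !pendingReduces.isEmpty()) {
            return Optional.of(pendingReduces.poll());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>();
        pending.add("attempt_r_000000");
        // A busy tracker heartbeats first: no free slot, so no task.
        System.out.println(heartbeat(new TrackerStatus("tt1", 0), pending));
        // The next tracker with a free slot becomes the reducer node.
        System.out.println(heartbeat(new TrackerStatus("tt2", 2), pending));
    }
}
```

The real scheduler does more (capacity, padding, job priority), as the code below shows.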
The relevant code can be found here (which is not particularly interesting):
//
// Same thing, but for reduce tasks
// However we _never_ assign more than 1 reduce task per heartbeat
//
final int trackerCurrentReduceCapacity =
    Math.min((int) Math.ceil(reduceLoadFactor * trackerReduceCapacity),
             trackerReduceCapacity);
final int availableReduceSlots =
    Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
boolean exceededReducePadding = false;
if (availableReduceSlots > 0) {
  exceededReducePadding = exceededPadding(false, clusterStatus,
                                          trackerReduceCapacity);
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() != JobStatus.RUNNING ||
          job.numReduceTasks == 0) {
        continue;
      }
      Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
          taskTrackerManager.getNumberOfUniqueHosts());
      if (t != null) {
        assignedTasks.add(t);
        break;
      }
      // Don't assign reduce tasks to the hilt!
      // Leave some free slots in the cluster for future task-failures,
      // speculative tasks etc. beyond the highest priority job
      if (exceededReducePadding) {
        break;
      }
    }
  }
}
Basically, the first tasktracker whose heartbeat reaches the jobtracker and that has enough free slots will get the reduce task.