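1. Introduction
The previous post covered MapReduce input; this one starts on MapReduce output. Keep the core MapReduce primitive in mind, because everything below serves it:
Records with the "same" key form one group. The reduce method is called once per group, and inside that call it iterates over the group's data to compute the result.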
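To make the primitive concrete, here is a minimal plain-Java sketch that does not use Hadoop at all. The class GroupByKeySketch and its methods are hypothetical names chosen for illustration; the TreeMap stands in for the framework's sort-and-group step, and reduce is a toy function that sums the values of one key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Hadoop.
class GroupByKeySketch {

    /** Stand-in for a reduce method: called once per key, iterates that key's values. */
    static int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    /** Groups the map output by key, then calls reduce exactly once per group. */
    static Map<String, Integer> run(List<Map.Entry<String, Integer>> mapOutput) {
        // TreeMap keeps keys sorted, loosely mimicking the framework's sort phase.
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutput) {
            groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = List.of(
            Map.entry("b", 1), Map.entry("a", 1), Map.entry("b", 1));
        System.out.println(run(mapOutput)); // prints {a=1, b=2}
    }
}
```

In the real framework the grouping is not done with an in-memory map; it falls out of sorting the records, which is why the rest of this post spends so much time on the sort buffer.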
2. Code
We continue with the MapTask class.
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter);
  // make a mapper
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);

  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    new NewTrackingRecordReader<INKEY,INVALUE>
      (split, inputFormat, reporter, taskContext);

  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;

  // get an output object
  if (job.getNumReduceTasks() == 0) {
    output =
      new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    output = new NewOutputCollector(taskContext, job, umbilical, reporter); // see Analysis 1
  }

  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
  mapContext =
    new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
        input, output,
        committer,
        reporter, split);

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
      mapperContext =
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
            mapContext);

  try {
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}
Analysis 1. Constructing the output object:
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter); // see Analysis 1.2
  // The number of partitions equals the number of reduce tasks. A partition is
  // a coarser unit than a group: one partition can hold many key groups.
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); // see Analysis 1.1
  } else {
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        // With a single reduce task (the framework default when the user
        // sets nothing), every record goes to partition 0.
        return partitions - 1;
      }
    };
  }
}

@Override
public void write(K key, V value) throws IOException, InterruptedException {
  // Values written through the context end up here: each record is tagged
  // with its partition number and placed in the collector's buffer.
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}
Analysis 1.1
public Class<? extends Partitioner<?,?>> getPartitionerClass()
    throws ClassNotFoundException {
  // Use the user's partitioner if one is configured; otherwise fall back to
  // HashPartitioner (see Analysis 1.1.1).
  return (Class<? extends Partitioner<?,?>>)
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
Analysis 1.2. Implementation of the createSortingCollector method
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
  throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);

  Class<?>[] collectorClasses = job.getClasses(
    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  for (Class clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
        throw new IOException("Invalid output collector class: " + clazz.getName() +
          " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
        clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      MapOutputCollector<KEY, VALUE> collector =
        ReflectionUtils.newInstance(subclazz, job);
      collector.init(context); // see Analysis 1.2.1
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      LOG.warn(msg, e);
    }
  }
  throw new IOException("Unable to initialize any output collector");
}
Analysis 1.2.1. Initializing the collector's buffer
public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  //sanity checks
  // spill threshold: the fraction of the buffer that triggers a spill (default 0.8)
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  // buffer size in MB (default 100)
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
        "\": " + spillper);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
        "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }
  // the buffer is sorted before it is written to disk; quicksort by default
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);
  // buffers and accounting
  int maxMemUsage = sortmb << 20;
  maxMemUsage -= maxMemUsage % METASIZE;
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;

  maxRec = kvmeta.capacity() / NMETA;
  softLimit = (int)(kvbuffer.length * spillper);
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  // k/v serialization
  comparator = job.getOutputKeyComparator(); // comparator used for sorting; see Analysis 1.2.1.1
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  // combiner
  final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(), // map-side combiner
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }
  spillInProgress = false;
  // the combiner runs again while merging spill files only if there are
  // at least this many of them (default 3)
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  spillThread.setDaemon(true); // spilling is done by a separate thread; see Analysis 1.2.1.2
  spillThread.setName("SpillThread");
  spillLock.lock();
  // (the excerpt stops here; the rest of init() is not shown)
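Summary: the mapper's output buffer defaults to 100 MB (JobContext.IO_SORT_MB), and a spill starts once it is 80% full (JobContext.MAP_SORT_SPILL_PERCENT). Both values can be tuned. One practical way to tune the threshold is a bisection-style search: try 50% first, then step back down from 80% to 70%, then 60%, halving the interval each time until the best setting is found.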
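The buffer arithmetic in init() can be checked by hand. The sketch here replays it in plain Java with no Hadoop dependency. The class SortBufferMath is a hypothetical name, and METASIZE = 16 is an assumption (four 4-byte ints of metadata per record) since the excerpt above does not show the constant's value.

```java
// Hypothetical illustration class that replays the arithmetic from init().
class SortBufferMath {
    // Assumption: 4 ints of per-record metadata, 4 bytes each.
    static final int METASIZE = 16;

    /** Mirrors the sanity check (sortmb & 0x7FF) != sortmb: only 0..2047 MB pass. */
    static boolean isValidSortMb(int sortmb) {
        return (sortmb & 0x7FF) == sortmb;
    }

    /** Buffer size in bytes: megabytes shifted to bytes, rounded down to a METASIZE multiple. */
    static int bufferBytes(int sortmb) {
        int maxMemUsage = sortmb << 20;
        return maxMemUsage - maxMemUsage % METASIZE;
    }

    /** Number of bytes that may be filled before a spill is triggered. */
    static int softLimit(int sortmb, float spillper) {
        return (int) (bufferBytes(sortmb) * spillper);
    }

    public static void main(String[] args) {
        System.out.println(bufferBytes(100));       // prints 104857600
        System.out.println(softLimit(100, 0.8f));   // prints 83886080
        System.out.println(isValidSortMb(2048));    // prints false
    }
}
```

The 0x7FF mask explains the upper bound: the buffer is a single byte array indexed by int, so sortmb << 20 must stay below 2^31, which caps the setting at 2047 MB.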
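Be careful with the combiner: it must not change the final result. Computing an average is the classic trap, because the average of partial averages is generally not the overall average.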
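A small plain-Java sketch of the averaging trap follows. It does not use the Hadoop combiner API; AverageCombinerSketch and SumCount are hypothetical names. The idea it illustrates is a common workaround: have the combiner emit a (sum, count) pair, which can be merged safely, and divide only at the very end.

```java
// Hypothetical illustration class; not a Hadoop Reducer or combiner.
class AverageCombinerSketch {

    /** A partial aggregate that is safe to combine: merging is plain addition. */
    static final class SumCount {
        final long sum;
        final long count;

        SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        double average() {
            return (double) sum / count;
        }
    }

    /** What a combiner could emit for the values of one key within one spill. */
    static SumCount combine(int... values) {
        long sum = 0;
        for (int v : values) {
            sum += v;
        }
        return new SumCount(sum, values.length);
    }

    /** The incorrect approach: average each spill, then average the averages. */
    static double averageOfAverages(SumCount a, SumCount b) {
        return (a.average() + b.average()) / 2;
    }

    public static void main(String[] args) {
        SumCount spillA = combine(1, 2, 3); // three values in one spill
        SumCount spillB = combine(10);      // a single value in another
        System.out.println(spillA.merge(spillB).average());     // prints 4.0
        System.out.println(averageOfAverages(spillA, spillB));  // prints 6.0
    }
}
```

The true average of 1, 2, 3, 10 is 4.0; averaging the two partial averages gives 6.0 because it ignores how many values each spill contributed.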
Analysis 1.2.1.1. How the sort comparator is chosen
public RawComparator getOutputKeyComparator() {
  Class<? extends RawComparator> theClass = getClass(
    JobContext.KEY_COMPARATOR, null, RawComparator.class);
  if (theClass != null)
    return ReflectionUtils.newInstance(theClass, this);
  // No comparator configured by the user: fall back to the key type's own
  // comparator (dictionary order for keys such as Text). This is why a key
  // type must implement serialization, deserialization, and comparison.
  return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}
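Summary: by default the framework sorts with the key type's own comparator (dictionary order for text keys). The user can override it with a custom comparator.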
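The same "user's comparator if set, otherwise the key's own ordering" decision can be sketched with java.util.Comparator. This is a plain-Java analogy, not the Hadoop RawComparator API; KeyComparatorSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration class; an analogy for getOutputKeyComparator().
class KeyComparatorSketch {

    /** Returns the user's comparator when one is given, else the key's natural order. */
    static <K extends Comparable<K>> Comparator<K> outputKeyComparator(Comparator<K> userComparator) {
        if (userComparator != null) {
            return userComparator;
        }
        return Comparator.naturalOrder();
    }

    /** Sorts a copy of the keys with whichever comparator was selected. */
    static List<String> sortKeys(List<String> keys, Comparator<String> userComparator) {
        List<String> sorted = new ArrayList<>(keys);
        sorted.sort(outputKeyComparator(userComparator));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("9", "10", "2");
        // Default: dictionary order on the string form of the key.
        System.out.println(sortKeys(keys, null)); // prints [10, 2, 9]
        // Override: compare the keys as numbers instead.
        System.out.println(sortKeys(keys, Comparator.comparingInt(Integer::parseInt))); // prints [2, 9, 10]
    }
}
```

The example shows why the override matters: numeric data stored as text sorts as "10", "2", "9" under dictionary order, which is rarely what a job wants.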
Analysis 1.2.1.2. What the spill thread does
protected class SpillThread extends Thread {

  @Override
  public void run() {
    spillLock.lock();
    spillThreadRunning = true;
    try {
      while (true) {
        spillDone.signal();
        while (!spillInProgress) {
          spillReady.await();
        }
        try {
          spillLock.unlock();
          sortAndSpill(); // sort, then spill to disk
        } catch (Throwable t) {
          sortSpillException = t;
        } finally {
          spillLock.lock();
          if (bufend < bufstart) {
            bufvoid = kvbuffer.length;
          }
          kvstart = kvend;
          bufstart = bufend;
          spillInProgress = false;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      spillLock.unlock();
      spillThreadRunning = false;
    }
  }
}
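Summary: the map task writes records into the buffer, and a separate thread spills the buffer's contents, sorting them first with quicksort. The buffer, the sort, and the combiner all work in memory inside the map task. The combiner runs for the first time when the buffer is spilled into small files. It runs a second time when those small files are merged, but only under a condition: there must be at least 3 spill files (minSpillsForCombine).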
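The lock-and-condition handshake between the map thread and the spill thread is easier to follow in a reduced form. The sketch here keeps the names spillLock, spillReady, spillDone, and spillInProgress from the source, but everything else is a simplifying assumption: records are plain ints, a "spill file" is a sorted list in memory, and the producer hands over the whole buffer instead of continuing to write into the remaining 20% as the real circular buffer does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, heavily simplified model of MapOutputBuffer's spill handshake.
class SpillHandshakeSketch {
    private final ReentrantLock spillLock = new ReentrantLock();
    private final Condition spillReady = spillLock.newCondition(); // producer -> spill thread
    private final Condition spillDone = spillLock.newCondition();  // spill thread -> producer
    private boolean spillInProgress = false;
    private List<Integer> buffer = new ArrayList<>();
    private List<Integer> pending;                         // batch handed to the spill thread
    final List<List<Integer>> spills = new ArrayList<>();  // stand-in for spill files
    private final int softLimit;

    SpillHandshakeSketch(int softLimit) {
        this.softLimit = softLimit;
        Thread spillThread = new Thread(this::spillLoop, "SpillThread");
        spillThread.setDaemon(true);
        spillThread.start();
    }

    private void spillLoop() {
        spillLock.lock();
        try {
            while (true) {
                spillDone.signal();               // tell the producer the last spill finished
                while (!spillInProgress) {
                    spillReady.await();           // releases the lock while waiting
                }
                List<Integer> batch = pending;
                spillLock.unlock();               // sort without holding the lock
                try {
                    Collections.sort(batch);
                } finally {
                    spillLock.lock();
                    spills.add(batch);
                    spillInProgress = false;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            spillLock.unlock();
        }
    }

    /** Called by the producer (the map side) for every record. */
    void collect(int record) {
        buffer.add(record);
        if (buffer.size() >= softLimit) {
            startSpill();
        }
    }

    private void startSpill() {
        spillLock.lock();
        try {
            while (spillInProgress) {
                spillDone.awaitUninterruptibly(); // wait for the previous spill
            }
            pending = buffer;
            buffer = new ArrayList<>();
            spillInProgress = true;
            spillReady.signal();
        } finally {
            spillLock.unlock();
        }
    }

    /** Spills whatever is left and waits until the spill thread is idle. */
    void flush() {
        if (!buffer.isEmpty()) {
            startSpill();
        }
        spillLock.lock();
        try {
            while (spillInProgress) {
                spillDone.awaitUninterruptibly();
            }
        } finally {
            spillLock.unlock();
        }
    }

    public static void main(String[] args) {
        SpillHandshakeSketch sketch = new SpillHandshakeSketch(3);
        for (int record : new int[] {5, 3, 1, 4, 2}) {
            sketch.collect(record);
        }
        sketch.flush();
        System.out.println(sketch.spills); // prints [[1, 3, 5], [2, 4]]
    }
}
```

As in the source, the spill thread drops the lock while it sorts, so the producer is blocked only when it needs to start a new spill before the previous one has finished.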
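Analysis 1.1.1
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
Important: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks is the idiom for picking a partition. Masking with Integer.MAX_VALUE clears the sign bit, so the result always falls in the range 0 to numReduceTasks - 1, even when hashCode() is negative.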
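Why the mask rather than Math.abs? A short plain-Java sketch shows the corner case. HashPartitionSketch and its two methods are hypothetical names; only the masking expression itself comes from the source.

```java
// Hypothetical illustration class comparing two ways to map a hash to a partition.
class HashPartitionSketch {

    /** The idiom from HashPartitioner: clear the sign bit, then take the remainder. */
    static int partitionFor(int hash, int numReduceTasks) {
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** A tempting alternative that fails for Integer.MIN_VALUE. */
    static int naivePartitionFor(int hash, int numReduceTasks) {
        return Math.abs(hash) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, so the remainder is negative.
        System.out.println(naivePartitionFor(Integer.MIN_VALUE, 3)); // prints -2
        System.out.println(partitionFor(Integer.MIN_VALUE, 3));      // prints 0
        System.out.println(partitionFor(-1, 3));                     // prints 1
    }
}
```

A negative partition number would point at a reducer that does not exist, so the masked form is the safe one for every possible hash value.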
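Summary 1. All of the source above is reached from output = new NewOutputCollector(taskContext, job, umbilical, reporter); so constructing the output also constructs a partitioner. It is one of three: the fixed partitioner that always returns 0 (single reducer), the one configured by the user, or the default HashPartitioner.
Summary 2. Constructing the output also sets up the buffer.
Summary 3. All of the methods above belong to the initialization of the output.
Summary 4. Each (K, V) pair emitted by map becomes (K, V, P) and is written to the circular buffer in memory. When the buffer is 80% full, its contents are sorted and spilled (sorted by partition first, then by key within each partition), producing small files. The small files are then merged with a merge sort: each file is already sorted internally, so the merge needs only a single pass of I/O.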
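The sort order and the single-pass merge from Summary 4 can be sketched in plain Java. Nothing here is Hadoop code: SpillMergeSketch and Rec are hypothetical names, spill files are modeled as in-memory lists, and the merge uses a PriorityQueue as one possible way to do a k-way merge.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration class: sort by (partition, key), then k-way merge.
class SpillMergeSketch {

    /** A map output record after partitioning; the value is omitted for brevity. */
    static final class Rec {
        final int partition;
        final String key;

        Rec(int partition, String key) {
            this.partition = partition;
            this.key = key;
        }

        @Override
        public String toString() {
            return partition + ":" + key;
        }
    }

    /** Partition first, then key within the partition. */
    static final Comparator<Rec> ORDER =
        Comparator.<Rec>comparingInt(r -> r.partition).thenComparing(r -> r.key);

    /** Models one spill: a sorted copy of the buffer's contents. */
    static List<Rec> spill(List<Rec> buffer) {
        List<Rec> sorted = new ArrayList<>(buffer);
        sorted.sort(ORDER);
        return sorted;
    }

    /** Merges already-sorted spills, reading each record exactly once. */
    static List<Rec> merge(List<List<Rec>> spills) {
        // Each heap entry is {spill index, position within that spill}.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            (a, b) -> ORDER.compare(spills.get(a[0]).get(a[1]), spills.get(b[0]).get(b[1])));
        for (int i = 0; i < spills.size(); i++) {
            if (!spills.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<Rec> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<Rec> run = spills.get(top[0]);
            merged.add(run.get(top[1]));
            if (top[1] + 1 < run.size()) {
                heap.add(new int[] {top[0], top[1] + 1}); // advance within the same spill
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Rec> first = spill(List.of(new Rec(1, "b"), new Rec(0, "c"), new Rec(0, "a")));
        List<Rec> second = spill(List.of(new Rec(1, "a"), new Rec(0, "b")));
        System.out.println(merge(List.of(first, second))); // prints [0:a, 0:b, 0:c, 1:a, 1:b]
    }
}
```

Because every spill is already sorted, the merge only ever compares the current head of each spill, which is what lets it finish in one sequential pass over the files.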
To be continued. Follow my WeChat official account, LHWorld, for updates.