
I. Preface

The previous post covered MapReduce input; this one moves on to MapReduce output. Keep the core MapReduce primitive in mind:

Keys that are "the same" form one group; the reduce method is called once per group, and inside that call the group's values are iterated over to do the computation!
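To make that primitive concrete, here is a minimal word-count-style reducer sketch (not part of the source analyzed below): all values sharing one key arrive in a single reduce() call, and the method iterates over that group.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// One reduce() call per distinct key; the call iterates that key's whole group of values.
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {  // iterate every value in this key's group
      sum += v.get();
    }
    context.write(key, new IntWritable(sum));
  }
}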

II. The Code

Let's continue with the MapTask.

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter)
    throws IOException, ClassNotFoundException, InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter);
  // make a mapper
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
                          splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);

  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext);

  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;

  // get an output object
  if (job.getNumReduceTasks() == 0) {
    output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    output = new NewOutputCollector(taskContext, job, umbilical, reporter); // see Analysis 1 below
  }

  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
    new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
      job, getTaskID(), input, output, committer, reporter, split);

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext =
    new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);

  try {
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}

Analysis 1: constructing the Output object

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter); // see Analysis 1.2
  partitions = jobContext.getNumReduceTasks(); // the number of partitions equals the number of reduce tasks; a partition is a coarser-grained concept than a group
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); // see Analysis 1.1
  } else {
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1; // when the user configures nothing, the framework defaults to a single reduce task, so the partition number is 0
      }
    };
  }
}

@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions)); // the record written through the context object is placed into the collect buffer, together with its partition number
}


Analysis 1.1

public Class<? extends Partitioner<?,?>> getPartitionerClass()
    throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  // if the user has set a partitioner, use it; otherwise default to HashPartitioner -- see Analysis 1.1.1
}
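If the user does set PARTITIONER_CLASS_ATTR (normally via Job.setPartitionerClass in the driver), this lookup returns that class instead of HashPartitioner. A hedged sketch of a custom partitioner follows; FirstCharPartitioner is an illustrative name, not part of Hadoop.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: route keys to reducers by their first character.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numReduceTasks) {
    String s = key.toString();
    int c = s.isEmpty() ? 0 : s.charAt(0);  // a char code is never negative
    return c % numReduceTasks;              // always a valid partition in [0, numReduceTasks)
  }
}

In the driver this would be registered with job.setPartitionerClass(FirstCharPartitioner.class), which sets PARTITIONER_CLASS_ATTR so the lookup above picks it up instead of HashPartitioner.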

Analysis 1.2: the implementation of createSortingCollector

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);

  Class<?>[] collectorClasses = job.getClasses(
    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  for (Class clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
        throw new IOException("Invalid output collector class: " + clazz.getName() +
          " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
        clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      MapOutputCollector<KEY, VALUE> collector =
        ReflectionUtils.newInstance(subclazz, job);
      collector.init(context); // see Analysis 1.2.1
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      LOG.warn(msg, e);
    }
  }
  throw new IOException("Unable to initialize any output collector");
}

Analysis 1.2.1: initialization of the collect buffer

public void init(MapOutputCollector.Context context)
    throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  // sanity checks
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); // spill threshold of the buffer
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); // buffer size, 100 MB by default
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
      "\": " + spillper);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException("Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
    QuickSort.class, IndexedSorter.class), job); // records are sorted before being spilled from the buffer to disk; QuickSort by default

  // buffers and accounting
  int maxMemUsage = sortmb << 20;
  maxMemUsage -= maxMemUsage % METASIZE;
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  kvmeta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;
  maxRec = kvmeta.capacity() / NMETA;
  softLimit = (int)(kvbuffer.length * spillper);
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  comparator = job.getOutputKeyComparator(); // comparator used for sorting -- see Analysis 1.2.1.1
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  // combiner
  final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(), // map-side combine
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }

  spillInProgress = false;
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3); // the combiner runs again at merge time only when there are at least this many spill files
  spillThread.setDaemon(true); // spilling is done by a separate thread -- see Analysis 1.2.1.2
  spillThread.setName("SpillThread");
  spillLock.lock();
  // (excerpt truncated)

Summary: the Mapper writes its output into a buffer that is 100 MB by default, and a spill is triggered when the buffer reaches 80% full. This is a tuning knob: adjust it by bisecting back and forth, e.g. first try 50%, then come back up toward 80% with 70%, then 60%, narrowing in on a good value.
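As a sketch of that tuning, both values can be set per job through the configuration. The property names below are the Hadoop 2.x keys backing JobContext.IO_SORT_MB and JobContext.MAP_SORT_SPILL_PERCENT; the job name and concrete numbers are made up for illustration, so verify the keys against your Hadoop version.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpillTuning {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.task.io.sort.mb", 200);            // in-memory sort buffer, default 100 MB
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.70f); // spill threshold, default 0.80
    Job job = Job.getInstance(conf, "spill-tuning-demo");     // hypothetical job name
    // ... set mapper/reducer/input/output paths here ...
  }
}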

Be careful with the Combiner: not every computation can safely be combined on the map side, averaging being the classic example.
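Here is a tiny self-contained illustration (with made-up numbers) of why a combiner must not pre-average: the average of partial averages is not the overall average, so the usual fix is to have the combiner emit (sum, count) pairs and let the reducer do the final division.

public class CombinerAverageCaveat {
  public static void main(String[] args) {
    // values for the same key arriving from two different map tasks
    double avg1 = (1 + 2 + 3) / 3.0;               // partial average from map task 1 -> 2.0
    double avg2 = (10 + 20) / 2.0;                 // partial average from map task 2 -> 15.0
    double avgOfAvgs = (avg1 + avg2) / 2.0;        // 8.5 -- what a naive combiner would produce
    double trueAvg = (1 + 2 + 3 + 10 + 20) / 5.0;  // 7.2 -- the correct overall average
    System.out.println(avgOfAvgs + " != " + trueAvg);
    // Safe pattern: the combiner emits (sum, count) pairs and the reducer divides at the end.
  }
}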

Analysis 1.2.1.1: the implementation of the sort comparator

 

public RawComparator getOutputKeyComparator() {
  Class<? extends RawComparator> theClass =
    getClass(JobContext.KEY_COMPARATOR, null, RawComparator.class); // user-defined comparator, if any (default sort order is dictionary order)
  if (theClass != null)
    return ReflectionUtils.newInstance(theClass, this);
  return WritableComparator.get(
    getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  // if the user sets no comparator, the key type's own comparator is used,
  // which is why the key class must support serialization, deserialization, and comparison
}

 

Summary: by default the framework uses the key's own comparator (dictionary/lexicographic order); the user can also override it with a custom comparator.
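As a hedged sketch of such an override, a WritableComparator subclass can be registered with Job.setSortComparatorClass, which sets JobContext.KEY_COMPARATOR, the very property getOutputKeyComparator reads above. DescendingIntComparator is an illustrative name, not part of Hadoop.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical comparator: sort IntWritable map-output keys in descending order
// instead of the key class's natural ascending order.
public class DescendingIntComparator extends WritableComparator {
  public DescendingIntComparator() {
    super(IntWritable.class, true);  // true => deserialize keys so compare() receives objects
  }

  @Override
  @SuppressWarnings({"rawtypes", "unchecked"})
  public int compare(WritableComparable a, WritableComparable b) {
    return b.compareTo(a);  // reverse the natural order
  }
}

In the driver: job.setSortComparatorClass(DescendingIntComparator.class);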

 

Analysis 1.2.1.2: what the spill thread does
protected class SpillThread extends Thread {
  @Override
  public void run() {
    spillLock.lock();
    spillThreadRunning = true;
    try {
      while (true) {
        spillDone.signal();
        while (!spillInProgress) {
          spillReady.await();
        }
        try {
          spillLock.unlock();
          sortAndSpill(); // sort, then spill to disk
        } catch (Throwable t) {
          sortSpillException = t;
        } finally {
          spillLock.lock();
          if (bufend < bufstart) {
            bufvoid = kvbuffer.length;
          }
          kvstart = kvend;
          bufstart = bufend;
          spillInProgress = false;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      spillLock.unlock();
      spillThreadRunning = false;
    }
  }
}

Summary: the map side writes records into the buffer while a separate thread spills the buffer's contents to disk, sorting them first with QuickSort. The Combiner, the buffer, and the sorting all run in memory, just like the map method itself. The Combiner runs a first time while the buffer is spilled into a batch of small files; it may run again when those spill files are merged, but that second run is conditional (only when the number of spill files is at least 3, the default minSpillsForCombine).

 

 

 

Analysis 1.1.1

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; // the important idiom for computing a partition number
  }
}
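The & Integer.MAX_VALUE mask is what keeps the partition number non-negative: hashCode() can legitimately return a negative value, and Java's % would then yield a negative (invalid) partition index. A tiny demonstration with a made-up hash value:

public class PartitionDemo {
  public static void main(String[] args) {
    int numReduceTasks = 4;
    int hash = -7;  // hashCode() may be negative
    System.out.println(hash % numReduceTasks);                        // -3: not a usable partition index
    System.out.println((hash & Integer.MAX_VALUE) % numReduceTasks);  // 1: sign bit masked off, always in [0, numReduceTasks)
  }
}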

Summary 1: the code above is reached from output = new NewOutputCollector(taskContext, job, umbilical, reporter); so constructing the output also constructs a partitioner: either the constant-0 one, a user-defined one, or the default HashPartitioner.
Summary 2: the output construction also sets up the in-memory buffer.
Summary 3: all of the methods above make up the initialization of the Output.
Summary 4: each (K,V) pair emitted by the map becomes (K,V,P) and is written into the circular in-memory buffer; when the buffer reaches 80% it is sorted and spilled (first by partition, then by key within each partition), producing small spill files; those files are then merged with merge sort. Since each spill file is already internally sorted, the merge needs only one pass of I/O.

 

More to come... feel free to follow my WeChat public account, LHWorld.

 

 

 

Reposted from: https://www.cnblogs.com/LHWorldBlog/p/8252953.html
