当前位置: 首页 > news >正文

柳州住房和城乡建设部网站/良品铺子网络营销策划书

柳州住房和城乡建设部网站,良品铺子网络营销策划书,主流网站开发软件,临沂网站制作企业1)spark通常把shuffle操作定义为划分stage的边界,其实stage的边界有两种:ShuffleMapTask和ResultTask。ResultTask就是输出结果,输出结果的称为ResultTask,都为引起stage的划分,比如以下代码: r…

1)spark通常把shuffle操作定义为划分stage的边界,其实stage的边界有两种:ShuffleMapTask和ResultTask。ResultTask就是输出结果,输出结果的称为ResultTask,都为引起stage的划分,比如以下代码:

rdd.parallize(1 to 10).foreach(println)

每个stage内部,一定有一个ShuffleMapTask或者是ResultTask,因为这两者是划分stage的依据,是stage之间的边界。一个stage中的所有task最后会以taskSet的形式提交给TaskScheduler去执行,Spark实现了三种不同的TaskScheduler,包括LocalShedulerClusterSchedulerMesosScheduler

2)actions(动作)会生成一个job,触发job的提交,所以我们从客户端提交的一个作业可能会被划分为多个job。但是,如果一个action后没有其他操作,也就是这个action是最后一个操作的话,这个action就独立为一个stage,而非提交一个job。(参考0)

3)task分为ShuffleMapTask和ResultTask(参考1)。

4)广播变量应该广播RDD对应的值,也就是广播RDD.collect()而不是RDD本身,可以使用spark-shell测试。(参考2)

5)使用mapPartitions以分区为单位处理RDD时,定义的函数返回值必须是Iterator类型,同样的,mapPartitionsWithIndex,也必须返回Iterator类型。mapPartitions(iter:Iterator[])有一个Iterator类型的参数,使用以下方式使用mapPartitions:

 1 val rdd1=....
 2 val rdd2=rdd1.mapPartitions(processPartitions)//如果我们定义的函数除了Iterator之外,没有别的参数,就可以以这种方式来使用mapPartitions
 3 
 4 val rdd3=rdd1.mapPartitions((x,y)=>processPartitions2(x,y))//其中x是Iterator类型,y是broad
 5 
 6 //定义processPartitions
 7 .mapPartitions只有一个Iterator参数,所以我们自定义的函数也必须包含这个参数
 8 def processPartitions(iter:Iterator[Int]){//假设分区中每个元素类型是Int
 9 ...
10 
11 }
12 
13 def processPartitions2(iter:Iterator[Int],broad:Array[Double]){//假设分区中每个元素类型是Int
14 ...
15 
16 }

   mapPartitionsIndex的使用与mapPartitions类似,只是mapPartitionsIndex(Idx:Int,iter:Iterator[U])多了一个参数Idx,Idx就是分区的编号。参考3

 6)使用rdd1.zipPartitions(rdd2)(substractTwoPatitionsValue),在iter.hasNext之前,不能使用调用iter变量做任何的操作,这会导致iter变为空,等到使用iter.hasNext时取不到值,错误代码如下:

def substractTwoPatitionsValue(iter1:Iterator[(Int,Array[Double])],iter2:Iterator[(Int,Array[Double])]): Iterator[(Int,Array[Double])] ={val matRows=iter1.length//这里在hasNext之前使用Iter1,可以得到正确值println("matRows:"+matRows)//522println("iter1 length:"+iter1.length)//第二次使用iter就不能取到值了,结果是0//println("iter2 length:"+iter2.length)val resMatrixColumns=21025//21025val u=Array.ofDim[Double](matRows,resMatrixColumns)val keyIndexArray=new Array[Int](matRows)//分区中的key值var keyCounter=(-1)//var keyIndex=0var Value=new Array[Double](resMatrixColumns)var cur:(Int,Array[Double])=(0,new Array[Double](resMatrixColumns))println("before iter1")while(iter1.hasNext){keyCounter+=1println("in iter1")cur=iter1.next()keyIndexArray(keyCounter)=cur._1//这里每个分区里的key/value是按照key逆序排列Value=cur._2for(k<-0 until resMatrixColumns){//right matrix colsu(keyCounter)(k)=Value(k)//将顺序读取到的分区里的值,存储到u中
      }//println(cur._2(1))
    }/************************以上就完成了读取第1个rdd中1个分区的数据的任务*************************************************************//************************接下来读取第2个rdd中1个分区的数据,并根据key和第1个rdd中分区值相加************************************************************///var cc:(Int,Array[Double])=(0,new Array[Double](resMatrixColumns))var keyIndex=0var positionInMatrixu=0println("before iter2")while(iter2.hasNext){println("in iter2")cur=iter2.next()keyIndex=cur._1Value=cur._2positionInMatrixu=keyIndexArray.indexOf(keyIndex)//存储数据时,keyIndexArray和u的索引值通过keyCounter一样对应for(k<-0 until resMatrixColumns){//right matrix colsu(positionInMatrixu)(k)-=Value(k)//完成两个分区值的相加
      }}val newIter=keyIndexArray.zip(u).iterator //结果是有序的,不像SRdd中key是无序的
    newIter/* 使用以上方式产生迭代子keyCounter=(-1)val newIter=u.map(x=>{keyCounter+=1//println("keyIndex:"+keyIndex)(keyIndexArray(keyCounter),x)}).toIteratornewIter*/}

也就是说,在zipPartitions里,各个RDD的迭代子iter1以下面这种形式使用,并且只能使用一次:

while(iter1.hasNext){
.....
}

 7)运行spark作业的时候有时候会出现“Task deserialization Time”过长的问题,如下所示:

也就是“任务反序列化时间”过长,原因是调用rdd的转换操作时,使用了“外部变量”(driver中的一个数组),解决方法是使用“广播变量”将此“外部变量”进行广播。

8)同步、异步、阻塞、非阻赛:参考

 

转载于:https://www.cnblogs.com/lz3018/p/5428250.html

http://www.jmfq.cn/news/4945609.html

相关文章:

  • 临安网站设计/网站seo文章该怎么写
  • 惠州做网站的/百度搜索推广操作简要流程
  • 建设部网站官网 下载规范/品牌词优化
  • 宁波免费做网站/合肥建站公司seo
  • 网上做任务的网站有哪些/资阳市网站seo
  • 深圳做营销网站的公司简介/靖江seo要多少钱
  • 做导航网站用多大的空间/如何免费搭建自己的网站
  • 全国建设部网站/开源crm系统
  • 网站图片怎么做优化/nba录像回放
  • wordpress文章列分页/廊坊推广seo霸屏
  • 深圳宝安高端网站建设报价/站长工具seo排名查询
  • 石家庄做网站哪家公司好/指数函数图像
  • 怎么制作网站客服/阿里云免费域名
  • 网站实施要求/网络营销推广要求
  • 赌博假网站这么做/软文公司代写
  • 丹徒网站建设多少钱/爱站网备案查询
  • 营销型企业网站的建设方案/百度一下你就知道原版
  • 营销型企业网站建设案例/做网络推广工作怎么样
  • 南和邢台网站制作/目前搜索引擎排名
  • 如何查看网站是否开启gzip/seo算法是什么
  • 西安网站建设怎样/福州百度快速优化
  • 如果网站不备案/搜索引擎关键词优化有哪些技巧
  • 公司注册网站需要什么条件/郴州网络推广外包公司
  • 物联网平台层/宁波网站关键词优化排名
  • 常见的网站开发工具/接app推广接单平台
  • wordpress首页循环/seo关键词优化排名公司
  • 晋江网站建设价格多少/超级优化
  • 如何做企业网站建设/网店代运营诈骗
  • 网站业务流程设计/百度云建站
  • seo 能提高网站速度吗/淘宝网站的推广与优化