当前位置: 首页 > news >正文

唐山网站建设服务/seo优化外包公司

唐山网站建设服务,seo优化外包公司,wordpress+论坛偷笑,sns社交网站本期内容: Direct Access Kafka 前面有几期我们讲了带Receiver的Spark Streaming 应用的相关源码解读。但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势&#x…

本期内容:

  • Direct Access

  • Kafka

前面有几期我们讲了带Receiver的Spark Streaming 应用的相关源码解读。但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 

1. 更强的控制自由度 

2. 语义一致性 

其实No Receivers的方式更符合我们读取数据,操作数据的思路的。因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receivers,我们直接操作数据来源,这其实是一种更自然的方式。 如果要操作数据来源,肯定要有一个封装器,这个封装器一定是RDD类型。 以直接访问Kafka中的数据为例:

object DirectKafkaWordCount {  def main(args: Array[String]) {    val Array(brokers, topics) = args    // Create context with 2 second batch intervalval sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(2))    // Create direct kafka stream with brokers and topicsval topicsSet = topics.split(",").toSet    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)    // Get the lines, split them into words, count the words and printval lines = messages.map(_._2)    val words = lines.flatMap(_.split(" "))    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print()    // Start the computationssc.start()ssc.awaitTermination()}
}

Spark Streaming会封装一个KafkaRDD:

/** * A batch-oriented interface for consuming from Kafka. * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD * @param messageHandler function for translating each message into the desired type */private[kafka]class KafkaRDD[  K: ClassTag,  V: ClassTag,  U <: Decoder[_]: ClassTag,  T <: Decoder[_]: ClassTag,  R: ClassTag] private[spark] (    sc: SparkContext,    kafkaParams: Map[String, String],    val offsetRanges: Array[OffsetRange],    leaders: Map[TopicAndPartition, (String, Int)],    messageHandler: MessageAndMetadata[K, V] => R) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {  override def getPartitions: Array[Partition] = {offsetRanges.zipWithIndex.map { case (o, i) =>val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)}.toArray}
...  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {    val part = thePart.asInstanceOf[KafkaRDDPartition]assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))    if (part.fromOffset == part.untilOffset) {log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +s"skipping ${part.topic} ${part.partition}")      Iterator.empty} else {      new KafkaRDDIterator(part, context)}}

RDD中重要的方法 getPartitions 和 compute 其中compute中返回了一个 KafkaRDDIterator:

private class KafkaRDDIterator(      part: KafkaRDDPartition,      context: TaskContext) extends NextIterator[R] {    val kc = new KafkaCluster(kafkaParams)...    private def fetchBatch: Iterator[MessageAndOffset] = {      val req = new FetchRequestBuilder().addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes).build()      val resp = consumer.fetch(req)handleFetchErr(resp)      // kafka may return a batch that starts before the requested offsetresp.messageSet(part.topic, part.partition).iterator.dropWhile(_.offset < requestOffset)}    override def close(): Unit = {      if (consumer != null) {consumer.close()}}    override def getNext(): R = {      if (iter == null || !iter.hasNext) {iter = fetchBatch}      if (!iter.hasNext) {assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))finished = truenull.asInstanceOf[R]} else {        val item = iter.next()        if (item.offset >= part.untilOffset) {assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))finished = truenull.asInstanceOf[R]} else {requestOffset = item.nextOffsetmessageHandler(new MessageAndMetadata(part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))}}}}

其中会调用KafkaCluster的connect方法:

org/apache/spark/streaming/kafka/KafkaCluster.scala  def connect(host: String, port: Int): SimpleConsumer =new SimpleConsumer(host, port, config.socketTimeoutMs,config.socketReceiveBufferBytes, config.clientId)

KafkaCluster的connect方法返回了一个 SimpleConsumer,如果想自定义控制kafka消息的消费,则可自定义Kafka的consumer。

我们再回过头看看:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

实际生成了什么:

 def createDirectStream[    K: ClassTag,    V: ClassTag,    KD <: Decoder[K]: ClassTag,    VD <: Decoder[V]: ClassTag,    R: ClassTag] (      ssc: StreamingContext,      kafkaParams: Map[String, String],      fromOffsets: Map[TopicAndPartition, Long],      messageHandler: MessageAndMetadata[K, V] => R): InputDStream[R] = {    val cleanedHandler = ssc.sc.clean(messageHandler)    new DirectKafkaInputDStream[K, V, KD, VD, R]ssc, kafkaParams, fromOffsets, cleanedHandler)}

生成了一个DirectKafkaInputDStream:

org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))    val rdd = KafkaRDD[K, V, U, T, R](context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    // Report the record number and metadata of this batch interval to InputInfoTracker.val offsetRanges = currentOffsets.map { case (tp, fo) =>val uo = untilOffsets(tp)      OffsetRange(tp.topic, tp.partition, fo, uo.offset)}    val description = offsetRanges.filter { offsetRange =>// Don't display empty ranges.offsetRange.fromOffset != offsetRange.untilOffset}.map { offsetRange =>s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"}.mkString("\n")    // Copy offsetRanges to immutable.List to prevent from being modified by the userval metadata = Map(      "offsets" -> offsetRanges.toList,      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)    val inputInfo = StreamInputInfo(id, rdd.count, metadata)ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)    Some(rdd)}

这里面即产生了KafkaRDD实例,一些相关的北京Java培训基本都会讲一些这方面的知识。

我们再重新思考有Receiver和No Receiver的Spark Streaming应用 Direct访问的好处:

 1. 不需要缓存,不会出现OOM等问题(数据缓存在Kafka中) 

 2. 如果采用Receiver的方式,Receiver和Worker上Executor绑定了,不方便做分布式(配置一下也可以做)。如果采用Direct的方式,直接是RDD操作,数据默认分布在多个Executor上,天然就是分布式的。 

 3. 数据消费的问题,在实际操作的时候,如果采用Receiver的方式,如果数据操作来不及消费,Delay多次之后,Spark Streaming程序有可能崩溃。如果是Direct的方式,就不会。

 4. 完全的语义一致性,不会重复消费,且只被消费一次。


http://www.jmfq.cn/news/5177017.html

相关文章:

  • 企业做网站有什么好处坏处/短视频优化
  • WordPress更改网站地址/seo商城
  • 音乐网站建设规划书/搜索引擎关键词怎么优化
  • 一起做网店网站靠谱么/长沙seo关键词
  • 网络推广运营的技巧/南宁seo外包要求
  • 网站推广策划的思路/天津seo霸屏
  • 做网站有2个前提条件_一个是网站/唐山网站建设方案优化
  • 做资料上哪个网站好/手游代理加盟哪个平台最强大
  • 安吉做网站/百度正版下载并安装
  • 忻府区最新疫情/360优化大师最新版的功能
  • 设计自己的名字图画/seoul怎么读
  • 为什么做独立站的人都不止一个网站/汕头网站建设
  • 企业网站制作教程视频/营销型网站的公司
  • 上海做网站推荐/谷歌海外推广
  • 做网站做推广/百度seo优化怎么做
  • 网站建站案例/南阳网站seo
  • 专业营销的网站建设公司/软文写作案例
  • 做招聘长图用什么网站/免费顶级域名注册
  • 北京酒店团购网站建设/营销网站有哪些
  • 里水九江网站建设/郑州网络推广公司排名
  • php 做视频网站/百度账号登录官网
  • wordpress一键拨号/沈阳关键字优化公司
  • 可以用AI做网站上的图吗/滁州网站seo
  • win系统安装wordpress/百度seo查询
  • 深圳福田大型商城网站建设/厦门网站推广公司哪家好
  • 某俄文网站/必应搜索引擎下载
  • 福建泉州网站建设公司哪家好/seo推广任务小结
  • 新乡百度网站推广工具/百度刷排名优化软件
  • 三亚文明城市建设服务中心报名网站/苏州seo优化
  • 营销型网站策划建设/哪里可以做