Direct Access to Kafka
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
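Because each batch produced by the direct stream carries its own Kafka offsets, the driver can read them back for bookkeeping. Below is a minimal sketch (not part of the original example) that assumes the same `messages` stream as above and simply prints the per-partition offset ranges via `HasOffsetRanges`.

// Minimal sketch, assuming the `messages` stream created in the example above.
import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD { rdd =>
  // Every RDD produced directly by the direct stream implements HasOffsetRanges.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}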
/**
 * A batch-oriented interface for consuming from Kafka.
 * Starting and ending offsets are specified in advance,
 * so that you can control exactly-once semantics.
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
 * @param messageHandler function for translating each message into the desired type
 */
private[kafka]
class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange],
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }

  ...

  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
    if (part.fromOffset == part.untilOffset) {
      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")
      Iterator.empty
    } else {
      new KafkaRDDIterator(part, context)
    }
  }
  private class KafkaRDDIterator(
      part: KafkaRDDPartition,
      context: TaskContext) extends NextIterator[R] {

    val kc = new KafkaCluster(kafkaParams)
    ...

    private def fetchBatch: Iterator[MessageAndOffset] = {
      val req = new FetchRequestBuilder()
        .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
        .build()
      val resp = consumer.fetch(req)
      handleFetchErr(resp)
      // kafka may return a batch that starts before the requested offset
      resp.messageSet(part.topic, part.partition)
        .iterator
        .dropWhile(_.offset < requestOffset)
    }

    override def close(): Unit = {
      if (consumer != null) {
        consumer.close()
      }
    }

    override def getNext(): R = {
      if (iter == null || !iter.hasNext) {
        iter = fetchBatch
      }
      if (!iter.hasNext) {
        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
        finished = true
        null.asInstanceOf[R]
      } else {
        val item = iter.next()
        if (item.offset >= part.untilOffset) {
          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
          finished = true
          null.asInstanceOf[R]
        } else {
          requestOffset = item.nextOffset
          messageHandler(new MessageAndMetadata(
            part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
        }
      }
    }
  }
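The scaladoc above calls KafkaRDD a batch-oriented interface: each Kafka partition in the given offset ranges becomes exactly one RDD partition, read with a simple consumer from that partition's leader. A hedged sketch of how such an RDD is normally obtained through the public KafkaUtils.createRDD helper follows; the topic name, broker address, and offsets are made up for illustration.

// Sketch only: exact offsets would come from the application's own offset store.
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val offsetRanges = Array(
  OffsetRange("my-topic", 0, 0L, 100L),   // topic, partition, fromOffset, untilOffset
  OffsetRange("my-topic", 1, 0L, 100L))

def valuesFromKafka(sc: SparkContext) =
  KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, Map("metadata.broker.list" -> "broker1:9092"), offsetRanges)
    .map(_._2)   // createRDD yields (key, value) pairs; keep only the values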
org/apache/spark/streaming/kafka/KafkaCluster.scala

def connect(host: String, port: Int): SimpleConsumer =
  new SimpleConsumer(host, port, config.socketTimeoutMs,
    config.socketReceiveBufferBytes, config.clientId)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
  val cleanedHandler = ssc.sc.clean(messageHandler)
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, cleanedHandler)
}
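This overload lets the application choose the exact starting offsets and shape each record through a message handler. A minimal sketch of a call to it, assuming the `ssc` and `kafkaParams` from the word-count example and made-up starting offsets:

// Sketch: start the direct stream from explicit offsets and emit (key, value) pairs.
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical starting offsets, e.g. restored from an external store.
val fromOffsets = Map(
  TopicAndPartition("my-topic", 0) -> 1000L,
  TopicAndPartition("my-topic", 1) -> 1000L)

val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))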
org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")

  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
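The clamp() call above is what bounds how far untilOffsets may run ahead of currentOffsets in a single batch. If the direct stream needs to be rate-limited, the per-partition cap can be set on the SparkConf; a small sketch (the rate value is arbitrary):

// Sketch: cap the records read per second from each Kafka partition; clamp()
// uses this limit when computing each batch's untilOffsets.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("DirectKafkaWordCount")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")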