当前位置: 首页 > news >正文

定制app开发哪家比较好/seo网站排名优化价格

定制app开发哪家比较好,seo网站排名优化价格,写好网页怎么建成网站,四川招标投标网这个问题是最近一个朋友问我的,用sparkstreaming消费kafka的多个topic,怎么获取topic的信息,然后根据不同topic的数据做不同的逻辑处理. 其实这个问题非常简单,最容易想到的办法就是直接对Dstream的数据进行filter过滤操作,类似于下面的写法 val data stream.map(_.value()).c…

这个问题是最近一个朋友问我的,用sparkstreaming消费kafka的多个topic,怎么获取topic的信息,然后根据不同topic的数据做不同的逻辑处理.

其实这个问题非常简单,最容易想到的办法就是直接对Dstream的数据进行filter过滤操作,类似于下面的写法

val data = stream.map(_.value()).cache()// a topic的数据data.filter(_.contains("a"))// b topic的数据data.filter(_.contains("b"))

但是这种写法的前提是你需要提前知道数据的格式,然后根据两个topic里面数据的不同字段或者不同点去过滤数据然后再分别处理,如果提前不知道数据的类型该怎么办呢? 其实也非常简单,我们先来看下相关的源码.

def createDirectStream[K, V](ssc: StreamingContext,locationStrategy: LocationStrategy,consumerStrategy: ConsumerStrategy[K, V]): InputDStream[ConsumerRecord[K, V]] = {val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
}

从kafka里面获取数据我们常用的是createDirectStream低阶API,也就是Direct方式,可以看到这个方法的返回值是一个InputDStream[ConsumerRecord[K, V]]类型的数据,InputDStream里面的泛型是ConsumerRecord类型的,我们主要来看下ConsumerRecord类里面都有什么信息

public class ConsumerRecord<K, V> {public static final long NO_TIMESTAMP = Record.NO_TIMESTAMP;public static final int NULL_SIZE = -1;public static final int NULL_CHECKSUM = -1;private final String topic;private final int partition;private final long offset;private final long timestamp;private final TimestampType timestampType;private final long checksum;private final int serializedKeySize;private final int serializedValueSize;private final K key;private final V value;
}

这个类的作用是从kafka接收的键/值对。这也包括一个主题名称和从中接收记录的分区号,它指向一个偏移量记录到Kafka分区中的记录,以及相应的ProducerRecord标记的时间戳 ,也就是说所有的信息都是在这里面封装的,比如key,value,topic,offset等信息都在里面,那就好办了,直接从这里面获取topic信息不就行了吗,其实所有的信息都在这里,只是很多人不知道,一上来就只获取了value的值,到下面foreach里在想获取别的信息已经拿不到了.

代码如下:

stream.map(m => (m.topic(),m.value()))

这里一个map操作把topic和value转换成一个Tuple2,这样做的好处是过滤掉了其他不需要的信息,只保留有用的数据,然后就可以获取到topic的信息了,后面做什么样的处理就看自己的逻辑了,当然这里的数据类型可以定义为任何类型,只要包含有topic的信息即可,我这里只是为了演示.

完整的代码如下:

package sparkimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._object moreTopic {private var stream: InputDStream[ConsumerRecord[String, String]] = nullprivate var offsetRanges: Array[OffsetRange] = nulldef main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.ERROR)val spark = SparkSession.builder().appName("Spark Jason").master("local[*]").getOrCreate()val sc = spark.sparkContextval scc = new StreamingContext(sc, Seconds(2))val kafkaParams = Map[String, Object]("auto.offset.reset" -> "latest","value.deserializer" -> classOf[StringDeserializer], "key.deserializer" -> classOf[StringDeserializer], "bootstrap.servers" -> "master:9092,storm1:9092,storm2:9092", "group.id" -> "jason_spark", "enable.auto.commit" -> (true: java.lang.Boolean))val topics = Array("jason_flink-1", "jason_flink-2")stream = KafkaUtils.createDirectStream[String, String](scc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))stream.map(m => (m.topic(),m.value()))stream.transform(t => {offsetRanges = t.asInstanceOf[HasOffsetRanges].offsetRangest}).map(m => (m.topic(),m.value())).foreachRDD(rdd => {rdd.foreachPartition(fp => {val o = offsetRanges(TaskContext.get.partitionId)fp.foreach(f => {println(s"topic: ${o.topic} partition: ${o.partition} fromOffset: ${o.fromOffset} untilOffset: ${o.untilOffset}")println(f)})})})scc.start()scc.awaitTermination()}
}

为了更加清楚的看到打印的结果,我在代码里把日志级别改成了ERROR级别,运行程序打印的结果如下:

topic: jason_flink-1 partition: 0 fromOffset: 598 untilOffset: 599
(jason_flink-1,{"address":"北京市海淀区","money":100.5,"city":"beijing","name":"jason50","topic":"jason_flink-1","age":25,"ts":1591005156780})
topic: jason_flink-2 partition: 0 fromOffset: 570 untilOffset: 571
(jason_flink-2,{"address":"北京市海淀区","money":100.5,"city":"beijing","name":"jason49","topic":"jason_flink-2","age":26,"ts":1591005155779})
topic: jason_flink-2 partition: 2 fromOffset: 448 untilOffset: 449
(jason_flink-2,{"address":"北京市海淀区","money":100.5,"city":"beijing","name":"jason51","topic":"jason_flink-2","age":26,"ts":1591005157780})

可以看到打印的数据里面第一个元素就是topic的信息,如果你还想获取别的信息也可以通过这种方式获取.

有人可能会觉得多此一举,不加上面那段map转换的代码也可以获取到topic,确实是这样的,我把这行代码删了,到下面的foreach里可以获取到所有的信息.

但是本着不需要的数据提前过滤掉的原则,我习惯在上面加一行转换操作,只留下有用的信息.大家可以自己选择.

推荐阅读:

Flink1.10.0 SQL DDL 把计算结果写入kafka

JasonLee,公众号:JasonLee的博客Flink1.10.0 SQL DDL 把计算结果写入kafka

Flink 1.10.0 SQL DDL中如何定义watermark和计算列

JasonLee,公众号:JasonLee的博客Flink 1.10.0 SQL DDL中如何定义watermark和计算列

FlinkSQL使用DDL语句创建kafka源表

JasonLee,公众号:JasonLee的博客FlinkSQL使用DDL语句创建kafka源表

Flink 状态清除的演进之路

JasonLee,公众号:JasonLee的博客Flink 状态清除的演进之路

Flink动态写入kafka的多个topic

JasonLee,公众号:JasonLee的博客Flink动态写入kafka的多个topic

Flink源码分析: 窗口机制的执行流程

JasonLee,公众号:JasonLee的博客Flink源码分析: 窗口机制的执行流程

更多spark和flink的内容可以关注下面的公众号

http://www.jmfq.cn/news/4871251.html

相关文章:

  • 天津做企业网站公司/亚马逊开店流程及费用
  • 新城建站/西安百度框架户
  • 做网站设计的论文中摘要怎么写/电商入门基础知识
  • 建设网站公司/国际军事新闻最新消息视频
  • 在自己的网站做外链/如何做外贸网站的推广
  • 中山高端网站建设/关键词搜索热度
  • 泸州高端网站建设公司/alexa排名查询统计
  • 网站建设上市公司/整站优化seo公司哪家好
  • 郴州网站制作设计/重庆seo代理计费
  • 青岛网络服务公司/seo 网站优化推广排名教程
  • 回收那个网站做推广好/怎样做推广更有效
  • 装修公司网站开发/今日头条搜索引擎
  • 哪家公司做网站建设比较好/网络广告策划的内容
  • 郑州市做网站的公/优质友情链接
  • 五合一网站做优化好用吗/企业推广网站
  • 网站建设中 模版/什么是百度快照
  • 全网推广开户/seo tdk
  • 做网站需要哪些审核/搜索引擎营销方案例子
  • 关于网站建设live2500/站长工具高清无吗
  • 国内做网站建设好的/良品铺子网络营销策划书
  • php除了做网站/公司网站免费自建
  • 网站设计网站维护/推广seo网站
  • 建设小说网站用什么软件/搜什么关键词你都懂的
  • 泉州网站建设解决方案/网络营销建议
  • 什么是动态网页和静态网页/哈尔滨seo优化软件
  • 免费crm软件/优化关键词排名工具
  • 网站建设的方向和任务/国内免费建网站
  • 个人网站做公司网站/新公司怎么做网络推广
  • 宝鸡外贸网站建设/免费入驻的卖货平台
  • 建站主机/成人技术培训班有哪些种类