Spark Streaming: Reading Data from Kafka and Writing It Back to Kafka
Contents
I. Overview
1. Environment
2. Test scenario
3. Goal
II. Background
1. Inefficient code
2. How to improve efficiency and flexibility
(1) Wrap KafkaProducer so it can be broadcast
(2) Initialize the KafkaSink and broadcast it
(3) Send data to Kafka via the broadcast variable
III. The complete program
1. Configuration file sparkStreaming.properties
2. The LoadParameter class for dynamically loading the configuration
3. Main program Kafka2SparkStreaming2Kafka
IV. Running and testing
1. Create the Kafka topic
2. Produce data to the program through Kafka
3. Read the data back from the output topic
I. Overview
1. Environment
Spark: 2.2.1
Kafka: kafka_2.10-0.10.2.1
2. Test scenario
This post walks through a simple pipeline: Spark Streaming reads data from Kafka and, to keep things simple, writes it straight back into Kafka without any processing in between.
3. Goal
Understand how Spark Streaming and Kafka work together, and the points to watch out for, so the pattern can be applied with confidence in larger projects.
II. Background
1. Inefficient code
input.foreachRDD(rdd =>
  // the KafkaProducer cannot be created here
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    })
)
The drawback of this approach is obvious: for every record in every partition, a new KafkaProducer is created and then used to send the output. KafkaProducer is not serializable, so the instance cannot simply be created once outside foreachPartition and captured in the closure.
Creating a producer, and therefore a new connection, for every single record is both inflexible and inefficient. A common first improvement, creating one producer per partition, is sketched below, but the broadcast-based approach in the next section goes further.
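For context, here is a minimal sketch of that intermediate step (it reuses the input DStream and brokers value from the snippet above). It removes the per-record connection cost, but it still builds and tears down a producer for every partition of every micro-batch, which is exactly what broadcasting a lazily created producer avoids:

input.foreachRDD(rdd =>
  rdd.foreachPartition { partition =>
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // one producer per partition, per batch: better than per record, but still wasteful
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { x =>
      producer.send(new ProducerRecord[String, String]("output", null, x))
    }
    producer.close()
  }
)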
2. How to improve efficiency and flexibility
(1) Wrap KafkaProducer so it can be broadcast
package com.cn.util

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] = {
    producer.send(new ProducerRecord[K, V](topic, key, value))
  }

  def send(topic: String, value: V): Future[RecordMetadata] = {
    producer.send(new ProducerRecord[K, V](topic, value))
  }
}

object KafkaSink {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
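What makes this broadcast-safe is that KafkaSink only carries the createProducer function; the KafkaProducer itself is a lazy val, so it is instantiated once per executor JVM on the first send and then reused. A minimal standalone sketch of that behaviour (the broker address below is a placeholder, not part of the original setup):

import org.apache.kafka.common.serialization.StringSerializer

val sink = KafkaSink[String, String](Map[String, Object](
  "bootstrap.servers" -> "localhost:9092", // placeholder broker, adjust to your cluster
  "key.serializer"    -> classOf[StringSerializer].getName,
  "value.serializer"  -> classOf[StringSerializer].getName))

// no KafkaProducer exists yet; it is created lazily inside the first call to send()
sink.send("output", "hello")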
(2) Initialize the KafkaSink and broadcast it
// initialize the KafkaSink and broadcast it
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", LoadParameter.getParameter("kafka.brokers"))
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
(3) Send data to Kafka via the broadcast variable
records.foreach(line => {
  // sp2kafka is the output topic
  kafkaProducer.value.send("sp2kafka", line.value())
})
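If the records need to keep their original keys (for example so that Kafka's partitioner routes them consistently), the keyed overload of send defined in KafkaSink can be used instead; a small sketch:

records.foreach(line =>
  // keyed variant: Kafka uses the key to pick the target partition of sp2kafka
  kafkaProducer.value.send("sp2kafka", line.key(), line.value()))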
III. The complete program
1. Configuration file sparkStreaming.properties
kafka.brokers=192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667
kafka.topics=testSparkStreaming
kafka.groupId=t02
2. The LoadParameter class for dynamically loading the configuration
package com.cn.util

import java.io.FileInputStream
import java.util.Properties

import scala.collection.mutable

object LoadParameter {
  // read configurable parameters from the properties file at runtime
  val props = new Properties()

  def getParameter(s: String): String = {
    props.load(new FileInputStream("sparkStreaming.properties"))
    props.getProperty(s)
    //new String(props.getProperty(s).getBytes("ISO-8859-1"), "utf-8")
  }
}
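Usage is a single call per key. Note that the path is relative, so sparkStreaming.properties has to sit in the working directory of the driver process, and the file is re-read on every call, which is what makes the parameters "dynamic". A quick sketch:

// assumes sparkStreaming.properties is present in the driver's working directory
val brokers = LoadParameter.getParameter("kafka.brokers")
val topic   = LoadParameter.getParameter("kafka.topics")
println(s"brokers=$brokers, topic=$topic")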
3. Main program Kafka2SparkStreaming2Kafka
package com.cn.sparkStreaming

import java.util.Properties

import com.cn.util.{KafkaSink, LoadParameter}
import com.google.gson.Gson
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._

object Kafka2SparkStreaming2Kafka {
  /**
    * Flow:
    * Spark Streaming receives data from Kafka,
    * processes it, and sends the result back to Kafka.
    * @param args
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ThiefCall").setMaster("local[1]")
    val ssc = new StreamingContext(conf, Seconds(7))
    // control the log level
    ssc.sparkContext.setLogLevel("WARN") // WARN, INFO, DEBUG

    val topic = LoadParameter.getParameter("kafka.topics")
    val groupId = LoadParameter.getParameter("kafka.groupId")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> LoadParameter.getParameter("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest", // on first start, consume from the earliest offset
      "enable.auto.commit" -> (false: java.lang.Boolean) // auto-commit set to false
    )
    val topics = Array(topic)
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // distribute partitions evenly across executors
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // initialize the KafkaSink and broadcast it
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", LoadParameter.getParameter("kafka.brokers"))
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    stream.foreachRDD(rdd => {
      // get the consumed offsets of each partition
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // business-processing area
      rdd.foreachPartition(records => {
        // two ways to read the Kafka records directly:
        // option 1:
        // if (records.hasNext) {
        //   println("########:" + records.next().value())
        // }
        // option 2:
        // records.foreach(t => println("###:" + t.value()))
        records.foreach(line => {
          // sp2kafka is the output topic
          kafkaProducer.value.send("sp2kafka", line.value())
        })
      })
      // commit the offsets manually
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
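The block marked "business-processing area" is where real logic would go; the program above simply forwards every value. A hedged sketch of how a transformation could slot in (the filter and upper-casing steps are made up for illustration and are not part of the original program):

rdd.foreachPartition(records => {
  records
    .map(_.value())        // extract the message payload
    .filter(_.nonEmpty)    // hypothetical filter step
    .map(_.toUpperCase)    // hypothetical transformation
    .foreach(v => kafkaProducer.value.send("sp2kafka", v))
})

Because the records are sent before commitAsync runs, the pipeline is at-least-once: if a batch fails after some sends but before the offsets are committed, those records will be written to sp2kafka again when the batch is replayed.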
IV. Running and testing
1. Create the Kafka topic
[root@master bin]# ./kafka-topics.sh --create --zookeeper master:2181,slaves1:2181,slaves2:2181 --replication-factor 3 --partitions 1 --topic sp2kafka
Created topic "sp2kafka"
2. Produce data to the program through Kafka
[root@master bin]# ./kafka-console-producer.sh --broker-list master:6667 --topic testSparkStreaming
1,11111,Bb,2019-03-12 04:25:22
2,22222,Cc,2019-04-14 05:26:33
3,33333,Dd,2019-05-15 06:29:44
3. Read the data back from the output topic
[root@slaves1 bin]# ./kafka-console-consumer.sh --zookeeper master:2181,slaves1:2181,slaves2:2181 --topic sp2kafka
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
1,11111,Bb,2019-03-12 04:25:22
2,22222,Cc,2019-04-14 05:26:33
3,33333,Dd,2019-05-15 06:29:44