当前位置: 首页 > news >正文

做网站需学什么条件/宁波seo托管公司

做网站需学什么条件,宁波seo托管公司,网站建设高端网站,自己如何做黑客网站目录 一、说明 1、环境介绍 2、测试业务介绍 3、目的 二、理论基础 1、低效率的代码 2、这样提升效率和灵活性 (1)对KafkaProducer进行封装便于广播 (2)初始化KafkaSink,并广播 (3)使用广播变量发送数据到Kafka 三、完整业务程序展示 1、配置文件sparkStreaming.pr…

目录

一、说明

1、环境介绍

2、测试业务介绍

3、目的

二、理论基础

1、低效率的代码

2、这样提升效率和灵活性

(1)对KafkaProducer进行封装便于广播

(2)初始化KafkaSink,并广播

(3)使用广播变量发送数据到Kafka

三、完整业务程序展示

1、配置文件sparkStreaming.properties

2、动态加载配置文件LoadParameter类

3、主程序Kafka2SparkStreaming2Kafka

四、启动程序测试

1、创建相关kafka主题

2、kafka生产数据到程序

3、从写入到的kafka主题中读取数据


一、说明

1、环境介绍

spark:2.2.1

kafka:kafka_2.10-0.10.2.1

2、测试业务介绍

此篇博客的简单逻辑主要是利用sparkStreaming技术从kafka中读取数据,为了方便暂不进行数据处理,然后将读取的数据直接再存入kafka。

3、目的

掌握sparkStreaming+kafka的运用原理及注意事项,以便在大型项目中运用的游刃有余;

二、理论基础

1、低效率的代码

input.foreachRDD(rdd =>// 不能在这里创建KafkaProducerrdd.foreachPartition(partition =>partition.foreach {case x: String => {val props = new HashMap[String, Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")println(x)val producer = new KafkaProducer[String, String](props)val message = new ProducerRecord[String, String]("output", null, x)producer.send(message)}})
)

这种方式缺点很明显,对于每个partition的每条记录,我们都需要创建 KafkaProducer,然后利用 KafkaProducer 进行输出操作,但是由于 KafkaProducer 是不可序列化的,所以不能将 KafkaProducer 实例放在 foreachPartition 外边创建。
上面这种做法会为每条记录都建立一次连接,这种方法是不灵活且低效的。

2、这样提升效率和灵活性

(1)对KafkaProducer进行封装便于广播

package com.cn.utilimport java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {/* This is the key idea that allows us to work around running intoNotSerializableExceptions. */lazy val producer = createProducer()def send(topic: String, key: K, value: V): Future[RecordMetadata] = {producer.send(new ProducerRecord[K, V](topic, key, value))}def send(topic: String, value: V): Future[RecordMetadata] = {producer.send(new ProducerRecord[K, V](topic, value))}
}object KafkaSink {import scala.collection.JavaConversions._def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {val createProducerFunc = () => {val producer = new KafkaProducer[K, V](config)sys.addShutdownHook {// Ensure that, on executor JVM shutdown, the Kafka producer sends// any buffered messages to Kafka before shutting down.producer.close()}producer}new KafkaSink(createProducerFunc)}def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}

(2)初始化KafkaSink,并广播

   // 初始化KafkaSink,并广播val kafkaProducer: Broadcast[KafkaSink[String, String]] = {val kafkaProducerConfig = {val p = new Properties()p.setProperty("bootstrap.servers", LoadParameter.getParameter("kafka.brokers"))p.setProperty("key.serializer", classOf[StringSerializer].getName)p.setProperty("value.serializer", classOf[StringSerializer].getName)p}ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))}

(3)使用广播变量发送数据到Kafka

 records.foreach(line=>{//sp2kafka为topickafkaProducer.value.send("sp2kafka",line.value())})

三、完整业务程序展示

1、配置文件sparkStreaming.properties

kafka.brokers=192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667
kafka.topics=testSparkStreaming
kafka.groupId=t02

2、动态加载配置文件LoadParameter类

package com.cn.utilimport java.io.FileInputStream
import java.util.Propertiesimport scala.collection.mutableobject LoadParameter {//动态获取properties文件可配置参数val props = new Properties()def getParameter(s:String): String ={props.load(new FileInputStream("sparkStreaming.properties"))props.getProperty(s)//new String( props.getProperty(s).getBytes("ISO-8859-1"), "utf-8")}
}

3、主程序Kafka2SparkStreaming2Kafka

package com.cn.sparkStreamingimport java.util.Propertiesimport com.cn.util.{KafkaSink, LoadParameter}
import com.google.gson.Gson
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._object Kafka2SparkStreaming2Kafka {/*** 流程:* sparkStreaming从kafka数据接受数据。* 经过分析处理,将结果发送的kafka。* @param args*/def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("ThiefCall").setMaster("local[1]")val ssc = new StreamingContext(conf,Seconds(7))//控制日志输出级别ssc.sparkContext.setLogLevel("WARN") //WARN,INFO,DEBUGval topic=LoadParameter.getParameter("kafka.topics")val groupId=LoadParameter.getParameter("kafka.groupId")val kafkaParams = Map[String, Object]("bootstrap.servers" -> LoadParameter.getParameter("kafka.brokers"),"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> groupId,"auto.offset.reset" -> "earliest", // 初次启动从最开始的位置开始消费"enable.auto.commit" -> (false: java.lang.Boolean) // 自动提交设置为 false)val topics=Array(topic)val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent, //均匀分发到executorConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 初始化KafkaSink,并广播val kafkaProducer: Broadcast[KafkaSink[String, String]] = {val kafkaProducerConfig = {val p = new Properties()p.setProperty("bootstrap.servers", LoadParameter.getParameter("kafka.brokers"))p.setProperty("key.serializer", classOf[StringSerializer].getName)p.setProperty("value.serializer", classOf[StringSerializer].getName)p}ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))}stream.foreachRDD(rdd=>{// 获取每一个分区的消费的偏移量val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges//业务处理区域rdd.foreachPartition(records=>{//直接获取kafka中数据的两种形式//第一种://        if(records.hasNext){//          println("########:"+records.next().value())//        }//第二种://records.foreach(t=>println("###:"+t.value()))records.foreach(line=>{//sp2kafka为topickafkaProducer.value.send("sp2kafka",line.value())})})// 手动提交偏移量stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)})ssc.start()ssc.awaitTermination()}
}

四、启动程序测试

1、创建相关kafka主题

[root@master bin]# ./kafka-topics.sh --create --zookeeper master:2181,slaves1:2181,slaves2:2181 --replication-factor 3 --partitions 1 --topic sp2kafka
Created topic "sp2kafka"

2、kafka生产数据到程序

[root@master bin]# ./kafka-console-producer.sh --broker-list master:6667 --topic testSparkStreaming
1,11111,Bb,2019-03-12 04:25:22
2,22222,Cc,2019-04-14 05:26:33
3,33333,Dd,2019-05-15 06:29:44

3、从写入到的kafka主题中读取数据

[root@slaves1 bin]# ./kafka-console-consumer.sh --zookeeper master:2181,slaves1:2181,slaves2:2181 --topic sp2kafka
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
1,11111,Bb,2019-03-12 04:25:22
2,22222,Cc,2019-04-14 05:26:33
3,33333,Dd,2019-05-15 06:29:44

 

http://www.jmfq.cn/news/4748905.html

相关文章:

  • 什么人适合做服装设计师/搜索引擎推广和优化方案
  • 我想给别人做网站/独立站优化
  • 做韩国护的网站/广东今日最新疫情通报
  • 网站建设评价/论坛营销
  • 青岛网站开发/上海网站seo诊断
  • 周口做网站公司/定西seo排名
  • 西部数码网站管理助手 ftp密码/搜索引擎推广一般包括哪些
  • 信息发布网站模板下载/网络营销推广工具
  • 重庆 网站建设大全福利/站内优化
  • 手机怎么做钓鱼网站/企业网站建设哪家好
  • 视频类网站怎么做/凡科建站收费价目表
  • 网站建设挣钱/seo排名优化资源
  • 怎么做网页下载链接/网站手机版排名seo
  • 网站开发趋势/网站开发月薪多少钱
  • python可以做网站吗/汕头网站建设方案外包
  • 如何做网站联盟/如何开一个自己的网站
  • 深圳外贸建网站/佛山关键词排名工具
  • 网站开发与客户交流/常德seo公司
  • 网站怎么做可以合法让别人充钱/2024近期新闻
  • 贵州省住房建设部网站/上海搜索引擎关键词优化
  • 设计找版面网站/火星培训机构收费明细
  • 虎门做网站/360竞价推广登录入口
  • 做零售外贸网站有哪些/seo搜索优化待遇
  • wordpress 做大网站/龙华百度快速排名
  • Wordpress 商城主题过于臃肿/重庆seo网站运营
  • acg wordpress模板/重庆百度整站优化
  • 找方案的网站/seo平台有哪些
  • 网站程序更换/刷粉网站推广马上刷
  • 数码庄园的网站建设公司/哪个浏览器看黄页最快夸克浏览器
  • 什么程序做网站安全/今天最新的新闻头条新闻