Writing data to ES from Spark, with x-pack support
Published: 2018-08-06 17:52 | Tags: spark, es, x-pack
Enough talk, straight to the code.
1. The pom file
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>5.6.5</version>
</dependency>
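If you build with sbt instead of Maven, the equivalent dependencies would look roughly like this (a sketch, not from the original post; the %% operator appends the _2.11 Scala suffix for you):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"              % "2.2.0",
  "org.apache.spark"  %% "spark-sql"               % "2.2.0",
  "org.elasticsearch" %% "elasticsearch-spark-20"  % "5.6.5"
)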
2. Scala code
package com.test.demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

object SparkToES {
  def main(args: Array[String]): Unit = {
    // Running in local mode here; drop setMaster("local[5]") on a cluster
    val sconf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[5]")
    // ES settings must go on the conf before the SparkSession is created,
    // otherwise they will not take effect
    sconf.set("es.nodes", "localhost1,localhost2,localhost3")
    sconf.set("es.port", "9200")
    sconf.set("es.index.auto.create", "true")
    sconf.set("es.write.operation", "index")
    sconf.set("es.mapping.id", "id")
    // If x-pack is installed, pass the credentials like this
    sconf.set("es.net.http.auth.user", "username")
    sconf.set("es.net.http.auth.pass", "password")

    val spark = SparkSession.builder().config(sconf).getOrCreate()

    // For convenience, turn a JSON string directly into a DataFrame;
    // swap in your own data source as needed
    val json = """[{"id":"1","test1":"1","test2":"2","test3":"3"},{"id":"2","test1":"11","test2":"22","test3":"33"}]"""
    import spark.implicits._
    val dataset = spark.createDataset(json :: Nil)
    val df = spark.read.json(dataset)

    // First argument: the DataFrame; second: the target in _index/_type form
    EsSparkSQL.saveToEs(df, "testsparkes/testsparkes")
  }
}
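To sanity-check the write, you can read the index back through the connector's DataSource API. This is a minimal sketch, not part of the original post; it assumes the same SparkSession and conf as above, and that "testsparkes/testsparkes" is the _index/_type passed to saveToEs:

// Read the documents back from ES as a DataFrame and print them
val readBack = spark.read
  .format("org.elasticsearch.spark.sql")
  .load("testsparkes/testsparkes")
readBack.show()

Because es.mapping.id is set to "id", each row is indexed under an explicit _id, so re-running the job overwrites the same two documents instead of creating duplicates.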