国家部委政府网站群建设工作/网站优化助手
文章目录
- 一、读取JSON文件简介
- 二、读取JSON文件案例演示
- (一)创建JSON文件并上传到HDFS
- (二)读取JSON文件,创建临时表,进行关联查询
- 1、读取user.json文件,创建临时表t_user
- 2、读取score.json文件,创建临时表t_score
- 3、关联查询生成新的数据帧
- (三)利用json()方法将数据集转成数据帧
- 1、在Spark Shell里交互式完成任务
- 2、在IDEA里编写Scala程序完成任务
一、读取JSON文件简介
Spark SQL可以自动推断JSON文件的Schema,并将其加载为DataFrame。在加载和写入JSON文件时,除了可以使用load()方法和save()方法外,还可以直接使用Spark SQL内置的json()方法。该方法不仅可以读写JSON文件,还可以将Dataset[String]类型的数据集转为DataFrame。
需要注意的是,要想成功地将一个JSON文件加载为DataFrame,JSON文件的每一行必须包含一个独立有效的JSON对象,而不能将一个JSON对象分散在多行。
二、读取JSON文件案例演示
(一)创建JSON文件并上传到HDFS
创建user.json文件
{"name": "张三", "gender": "女", "age": 18}
{"name": "李四", "gender": "男", "age": 35}
{"name": "王五", "gender": "女", "age": 24}
上传到HDFS的/input目录
创建score.json文件
{"name": "张三", "score": 98}
{"name": "李四", "score": 88}
{"name": "王五", "score": 91}
上传到HDFS的/input目录
(二)读取JSON文件,创建临时表,进行关联查询
1、读取user.json文件,创建临时表t_user
执行命令:val userdf = spark.read.json(“hdfs://master:9000/input/user.json”)
查看用户数据帧的内容,执行命令:userdf.show()
创建临时表t_user,执行命令:userdf.createTempView(“t_user”)
2、读取score.json文件,创建临时表t_score
执行命令:val scoredf = spark.read.json(“hdfs://master:9000/input/score.json”)
查看成绩数据帧的内容,执行命令:scoredf.show()
创建临时表t_score,执行命令:scoredf.createTempView(“t_score”)
3、关联查询生成新的数据帧
执行命令:val resultdf = spark.sql(“select u.name, u.age, s.score from t_user u inner join t_score s on u.name = s.name”)
查看结果数据帧的内容,执行命令:resultdf.show()
(三)利用json()方法将数据集转成数据帧
1、在Spark Shell里交互式完成任务
创建用户数组:执行命令:val userarr = Array(“{‘name’: ‘Mike’, ‘age’: 18}”, “{‘name’: ‘Alice’, ‘age’: 30}”, “{‘name’: ‘Brown’, ‘age’: 38}”)
基于用户数组创建用户数据集,执行命令:val userds = spark.createDataset(userarr)
将用户数据集转成用户数据帧,执行命令:val userdf = spark.read.json(userds.rdd)(注意要将数据集转成RDD才能作为json()方法的参数)
显示用户数据帧的内容,执行命令:userdf.show()
2、在IDEA里编写Scala程序完成任务
创建Dataset2DataFrame单例对象
package net.army.sql.day01import org.apache.spark.sql.{Dataset, SparkSession}/*** 功能:利用json()方法将数据集转成数据帧* 日期:2023年06月14日* 作者:梁辰兴*/
object Dataset2DataFrame {def main(args: Array[String]): Unit = {// 设置HADOOP用户名属性,否则本地运行访问会被拒绝System.setProperty("HADOOP_USER_NAME", "root")// 创建或得到SparkSessionval spark = SparkSession.builder().appName("SparkSQLDataSource").master("local[*]").getOrCreate()// 导入隐式转换import spark.implicits._// 创建用户数组val userarr = Array("{'name': 'Mike', 'age': 18}","{'name': 'Alice', 'age': 30}","{'name': 'Brown', 'age': 38}")// 基于用户数组创建用户数据集val userds: Dataset[String] = spark.createDataset(userarr)// 将用户数据集转成用户数据帧val userdf = spark.read.json(userds.rdd)// 显示用户数据帧内容userdf.show()}
}
运行程序,查看结果