Pyspark
Note: if you find this blog useful, don't forget to like and bookmark it. I post weekly on artificial intelligence and big data, mostly original content: Python, Java, Scala, and SQL code; CV, NLP, and recommender systems; plus Spark, Flink, Kafka, HBase, Hive, Flume, and more. It's all hands-on material, including walkthroughs of papers from the top conferences. Let's make progress together.
Today I'm continuing the series with Pyspark_SQL5.
Table of Contents
- Pyspark
- Preface
- 1. Pandas-based UDF Functions
- 2. Pandas-based Custom UDAF Functions
- 3. A Combined UDF and UDAF Example Based on Pandas
- Summary
Preface
Today we continue sharing Pyspark_SQL, part 5.
1. Pandas-based UDF Functions
Requirement for the custom Python function: Series → Series (the function receives whole `pd.Series` batches and must return a `pd.Series` of the same length).
```python
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    print("spark pandas udf")
    spark = SparkSession.builder.appName("spark pandas udf").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 200) \
        .getOrCreate()
    # Enable Arrow-based data transfer between the JVM and Python
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

    schema = StructType().add("a", IntegerType()) \
        .add("b", IntegerType())
    df = spark.createDataFrame([
        (1, 5),
        (2, 6),
        (3, 7),
        (4, 8),
    ], schema=schema)
    df.createTempView("t1")

    # For the DSL style
    @F.pandas_udf(returnType=IntegerType())
    def sum_ab(a: pd.Series, b: pd.Series) -> pd.Series:
        return a + b

    # For the SQL style: register the same function
    spark.udf.register("sum_ab", sum_ab)

    spark.sql("""select *, sum_ab(a, b) as sum_ab from t1""").show()
    df.select("*", sum_ab("a", "b").alias("sum_ab")).show()

    spark.stop()
```
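For comparison, here is what the same addition would look like as an ordinary row-at-a-time Python UDF. This is a hypothetical sketch, not part of the original example (`sum_ab_plain` is a name introduced here): the pandas UDF above processes whole `pd.Series` batches via Arrow, while a plain UDF is invoked once per row, which is typically much slower.

```python
# Hypothetical comparison (not in the original post): a row-at-a-time UDF.
# Each call receives single Python ints instead of pd.Series batches.
sum_ab_plain = F.udf(lambda a, b: a + b, IntegerType())
df.select("*", sum_ab_plain("a", "b").alias("sum_ab")).show()
```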
2. Pandas-based Custom UDAF Functions
Requirement for the custom Python function: Series → scalar (the function receives a `pd.Series` and returns a single aggregated value).
```python
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    print("spark pandas udaf")
    spark = SparkSession.builder.appName("spark pandas udaf").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 200) \
        .getOrCreate()
    # Enable Arrow-based data transfer between the JVM and Python
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

    schema = StructType().add("a", IntegerType()) \
        .add("b", IntegerType())
    df = spark.createDataFrame([
        (1, 5),
        (2, 6),
        (3, 7),
        (4, 8),
    ], schema=schema)
    df.createTempView("t1")
    df.show()

    # For the DSL style
    @F.pandas_udf(returnType=FloatType())
    def avg_column(a: pd.Series) -> float:
        return a.mean()

    # For the SQL style: register the same function
    spark.udf.register("avg_column", avg_column)

    spark.sql("""
        select *,
               avg_column(a) over(order by a) as a_avg,
               avg_column(b) over(order by b) as b_avg
        from t1
    """).show()

    df.select(
        "*",
        avg_column("a").over(win.orderBy("a")).alias("a_avg"),
        avg_column("b").over(win.orderBy("b")).alias("b_avg")
    ).show()

    spark.stop()
```
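A Series-to-scalar pandas UDF is a full aggregate function, so besides window expressions it can also be used with `agg`. A minimal sketch, assuming the same `df` and `avg_column` as above:

```python
# Hypothetical usage (not in the original post): the same pandas UDAF
# applied as a whole-DataFrame aggregate instead of a window function.
df.agg(
    avg_column("a").alias("a_avg"),
    avg_column("b").alias("b_avg")
).show()
```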
3. A Combined UDF and UDAF Example Based on Pandas
Data (the contents of `data.csv`):
```
_c0,对手,胜负,主客场,命中,投篮数,投篮命中率,3分命中率,篮板,助攻,得分
0,勇士,胜,客,10,23,0.435,0.444,6,11,27
1,国王,胜,客,8,21,0.381,0.286,3,9,28
2,小牛,胜,主,10,19,0.526,0.462,3,7,29
3,火箭,负,客,8,19,0.526,0.462,7,9,20
4,快船,胜,主,8,21,0.526,0.462,7,9,28
5,热火,负,客,8,19,0.435,0.444,6,11,18
6,骑士,负,客,8,21,0.435,0.444,6,11,28
7,灰熊,负,主,10,20,0.435,0.444,6,11,27
8,活塞,胜,主,8,19,0.526,0.462,7,9,16
9,76人,胜,主,10,21,0.526,0.462,7,9,28
```
# Requirement 1: add 10 to the 助攻 (assists) column
# Requirement 2: sum 篮板 (rebounds) and 助攻 (assists)
# Requirement 3: average 得分 (points) for wins vs. losses (胜负)
```python
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    print("spark pandas udf example")
    spark = SparkSession.builder.appName("spark pandas udf example").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 200) \
        .getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

    df = spark.read.format("csv") \
        .option("header", True).option("inferSchema", True) \
        .load("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/data.csv")
    df.createTempView("t1")
    df.printSchema()
    df.show()

    # Requirement 1: add 10 to the 助攻 (assists) column
    @F.pandas_udf(returnType=IntegerType())
    def method01(score: pd.Series) -> pd.Series:
        return score + 10

    # Requirement 2: sum 篮板 (rebounds) and 助攻 (assists)
    @F.pandas_udf(returnType=IntegerType())
    def method02(score1: pd.Series, score2: pd.Series) -> pd.Series:
        return score1 + score2

    # Requirement 3: average 得分 (points) as a Series-to-scalar UDAF
    @F.pandas_udf(returnType=FloatType())
    def method03(score: pd.Series) -> float:
        return score.mean()

    # Register all three for the SQL style
    spark.udf.register("method01", method01)
    spark.udf.register("method02", method02)
    spark.udf.register("method03", method03)

    spark.sql("""
        select *,
               method01(`助攻`) as z_10,
               method02(`助攻`, `篮板`) as z_l_plus
        from t1
    """).show()

    spark.sql("""
        select `胜负`,
               method03(`得分`) as avg_score
        from t1
        group by `胜负`
    """).show()

    df.select(
        "*",
        method01("助攻").alias("z_10"),
        method02("助攻", "篮板").alias("z_l_plus")
    ).show()

    df.select("胜负", "得分").groupBy("胜负") \
        .agg(method03("得分").alias("avg_score")).show()

    spark.stop()
```
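As a quick sanity check on requirement 3, the averages can be recomputed locally with plain pandas. From the ten rows listed above, wins (胜) average (27+28+29+28+16+28)/6 = 26.0 points and losses (负) average (20+18+28+27)/4 = 23.25. A hypothetical verification snippet, assuming `data.csv` is the same file the Spark job reads:

```python
import pandas as pd

# Hypothetical local check (not in the original post) of requirement 3.
pdf = pd.read_csv("data.csv")  # assumes the same data.csv as above
print(pdf.groupby("胜负")["得分"].mean())  # expected: 胜 -> 26.0, 负 -> 23.25
```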
Summary
Today we mainly covered pandas UDFs and UDAFs.