网络营销方案分析/seo项目经理
默认策略-重新计算:如果RDD依赖关系仅是窄依赖,只需要重新计算一次故障分区就可以了;如果RDD依赖关系既有窄依赖又有宽依赖,需要重新计算故障分区之前的所有的分区数据,仅保留故障分区的数据。
RDD进行持久化:将指定RDD的内容持久化自盘或者内存中;在重复计算或者结果复用时,直接从内存或者磁盘中恢复RDD的数据。
RDD持久化Checkpoint:将某个计算过程数据作为检查点持久化保存到共享的分布式文件系统通常是HDFS。
缓存实现RDD的复用:
缓存是一种RDD计算容错的一种手段,程序在RDD数据丢失的时候,可以通过缓存快速计算当前的RDD的值,需要反推出所有的RDD重新计算,因此Spark在需要对某个RDD多次使用的时候,为了提高程序的执行效率,用户可以考虑使用RDD的cache。
测试前需要导入的依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.4</version>
</dependency>
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}object RDDCacheExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("word-count").setMaster("local[2]")val sc = new SparkContext(conf)val value: RDD[String] = sc.textFile("file:///Users/gaozhy/Downloads/184/spark-day5/src/main/resources/access.log")//.cache() //将RDD的数据持久化 内存//.persist(StorageLevel.DISK_ONLY).persist(StorageLevel.MEMORY_AND_DISK)value.count() //统计文本行var begin = System.currentTimeMillis()value.count()var end = System.currentTimeMillis()println("耗时:" + (end - begin)) //耗时:30ms//失效缓存value.unpersist()begin = System.currentTimeMillis()value.count()end = System.currentTimeMillis()println("不使用缓存耗时:" + (end - begin)) //150mssc.stop()}
}
除了调用cache之外,Spark提供了更细粒度的RDD缓存方案,用户可以根据集群的内存状态根据合适的内存状态选择合适的缓存策略。用户可以使用persist方法指定缓存级别。
RDD#persist(StorageLevel.MEMORY_ONLY)//设置缓存级别
存储级别中的
_SER RDD元素序序列化存储
_2 RDD元素备份存储
小结:
内存充足,又要求性能,建议使用MEMORY_ONLY > MEMORY_ONLY_SER
内存不太充足,又想提供性能和容错:MEMORY_AND_DISK > MEMORY_AND_DISK _SER (常用方式)
其中_2系统会备份缓存数据。
Check Point 机制
除了使用缓存机制可以有效的保证RDD的故障恢复,但是如果缓存失效还是会在导致系统重新计算RDD的结果,所以对于一些RDD的lineage较长的场景,计算比较耗时,用户可以尝试使用checkpoint机制存储RDD的计算结果,该种机制和缓存最大的不同在于,使用checkpoint之后被checkpoint的RDD数据直接持久化在文件系统中,一般推荐将结果写在hdfs中,这种checpoint并不会自动清空。注意checkpoint在计算的过程中先是对RDD做mark,在任务执行结束后,再对mark的RDD实行checkpoint,也就是要重新计算被Mark之后的rdd的依赖和结果,因此为了避免Mark RDD重复计算,推荐使用策略。
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}object RDDCheckpointExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("word-count").setMaster("local[2]")val sc = new SparkContext(conf)// 设定一个检查点数据的存放pathsc.setCheckpointDir("hdfs://Spark:9000/184/ck")val value: RDD[String] = sc.textFile("file:///Users/gaozhy/Downloads/184/spark-day5/src/main/resources/access.log")// 将RDD标记为检查点RDDvalue.checkpoint()value.count() //统计文本行var begin = System.currentTimeMillis()value.count()var end = System.currentTimeMillis()println("耗时:" + (end - begin)) //耗时:30ms//失效缓存value.unpersist()begin = System.currentTimeMillis()value.count()end = System.currentTimeMillis()println("不使用缓存耗时:" + (end - begin)) //150mssc.stop()}
}