Spark 分区知识梳理
Spark 分区知识梳理
Spark的Task数目可以说跟分区是一一对应的,分区数目决定了Spark执行Task的并行度,与性能息息相关.
1. 创建RDD、DataFrame时默认的分区数设置
主要由 spark.default.parallelism 和 spark.sql.shuffle.partitions 这两个参数设置分区数.
- spark.default.parallelism只有在处理RDD时才会起作用,对Spark SQL的无效。
- spark.sql.shuffle.partitions则是对Spark SQL专用的设置.
还有一个参数,defaultMinPartitions表示最小分区数.defaultMinPartitions=min(sc.defaultParallelism,2)也就是sc.defaultMinPartitions只有两个值1和2,当sc.defaultParallelism>1时值为2,当sc.defaultParallelism=1时,值为1
上面的公式是在源码里定义的(均在类SparkContext里):
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}
2. 如果没有配置spark.default.parallelism时,默认的分区数目
- 当master=local或local[*]时,默认分区数为CPU的核心数
- 当master=local[n],默认为n
- 当master=yarn时, 为分配的所有的Executor的cpu核数的总和或者2,取两者的最大值
3. 读取HDFS文件时的默认分区
使用Spark读取HDFS文件是最常用的,下面记录两种Spark读取HDFS文件的默认分区情况.
3.1. sc.textFile()
3.2. 当HDFS文件很大,有多个分区时, rdd的分区数 = max(hdfs文件的block数目, 调用textFile()传递的第二个参数值)
通常HDFS文件每个block大小为128M(默认配置,具体还要看HDFS的实际配置)
这种方式无论是sc.defaultParallelism(即spark.default.parallelism配置的分区说)大于block数还是sc.defaultParallelism小于block数,
rdd的默认分区数都为block数.
注:之所以说是默认分区,因为textFile可以指定分区数,sc.textFile(path, minPartitions),通过第二个参数可以指定分区数
当用参数指定分区数时,有两种情况,当参数大于block数时,则rdd的分区数为指定的参数值,否则分区数为block数.
也就是说,sc.textFile()读取HDFS文件,不受spark.default.parallelism影响,仅受HDFSblock数量和指定参数控制分区数目.
3.3. 当HDFS时小文件,只有个block时, rdd的分区数 = max(sc.defaultMinPartitions, 调用textFile()传递的第二个参数值)
4. spark.read.csv()
4.1. 大文件(block较多):df的分区数 = max(hdfs文件的block数目, sc.defaultParallelism)
4.2. 小文件(本次测试的block为):df的分区数=,也就是和sc.defaultParallelism无关(一般小文件也没必要用很多分区,一个分区很快就可以处理完成)
赞 赏 微信赞赏
支付宝赞赏