大家好,我是不溫卜火,是一名計算機學院大資料專業大二的學生,暱稱來源於成語—
不溫不火
,本意是希望自己性情溫和
。作為一名網際網路行業的小白,博主寫部落格一方面是為了記錄自己的學習過程,另一方面是總結自己所犯的錯誤希望能夠幫助到很多和自己一樣處於起步階段的萌新。但由於水平有限,部落格中難免會有一些錯誤出現,有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,部落格主頁:https://buwenbuhuo.blog.csdn.net/
本片博文為大家帶來的是Spark Shuffle 解析。
在所有的 MapReduce 框架中, Shuffle 是連線 map 任務和 reduce 任務的橋樑. map 任務的中間輸出要作為 reduce 任務的輸入, 就必須經過 Shuffle, 所以 Shuffle 的效能的優劣直接決定了整個計算引擎的效能和吞吐量.
相比於 Hadoop 的 MapReduce, 我們將看到 Spark 提供了多種結算結果處理的方式及對 Shuffle 過程進行的多種最佳化.
Shuffle 是所有 MapReduce 計算框架必須面臨的執行階段, Shuffle 用於打通 map 任務的輸出與reduce 任務的輸入.
map 任務的中間輸出結果按照指定的分割槽策略(例如, 按照 key 的雜湊值)分配給處理某一個分割槽的 reduce 任務.
通用的 MapReduce 框架
:
Shuffle 的核心要點
在劃分 Stage 時,最後一個 Stage 稱為finalStage(變數名),它本質上是一個ResultStage型別的物件,前面的所有 Stage 被稱為 ShuffleMapStage。
ShuffleMapStage 的結束伴隨著 shuffle 檔案的寫磁碟。
ResultStage 基本上對應程式碼中的 action 運算元,即將一個函式應用在 RDD的各個partition的資料集上,意味著一個job的執行結束。
一. Shuffle 流程原始碼分析
我們從CoarseGrainedExecutorBackend
開始分析
啟動任務
override def receive: PartialFunction[Any, Unit] = {
case LaunchTask(data) =>
if (executor == null) {
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
// 啟動任務
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}
}
Executor.launchTask 方法
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
// Runnable 介面的物件.
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
runningTasks.put(taskId, tr)
// 線上程池中執行 task
threadPool.execute(tr)
}
tr.run方法
override def run(): Unit = {
// 更新 task 的狀態
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
try {
// 把任務相關的資料反序列化出來
val (taskFiles, taskJars, taskProps, taskBytes) =
Task.deserializeWithDependencies(serializedTask)
val value = try {
// 開始執行 Task
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
res
} finally {
}
} catch {
} finally {
}
}
Task.run 方法
final def run(
taskAttemptId: Long,
attemptNumber: Int,
metricsSystem: MetricsSystem): T = {
context = new TaskContextImpl(
stageId,
partitionId,
taskAttemptId,
attemptNumber,
taskMemoryManager,
localProperties,
metricsSystem,
metrics)
try {
// 執行任務
runTask(context)
} catch {
} finally {
}
}
Task.runTask 方法
Task.runTask是一個抽象方法.
Task 有兩個實現類, 分別執行不同階段的Task
-
ShuffleMapTask
原始碼分析
ShuffleMapTask.runTask 方法
override def runTask(context: TaskContext): MapStatus = {
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
// 獲取 ShuffleWriter
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// 寫出 RDD 中的資料. rdd.iterator 是讀(計算)資料的操作.
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
} catch {
}
}
具體如何把資料寫入到磁碟, 是由ShuffleWriter.write方法來完成.
ShuffleWriter是一個抽象類, 有 3 個實現:
根據在manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)中的dep.shuffleHandle由manager來決定選使用哪種ShuffleWriter.
-
ShuffleManager
ShuffleManage 是一個Trait, 從2.0.0開始就只有一個實現類了: SortShuffleManager
registerShuffle 方法: 匹配出來使用哪種ShuffleHandle
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
getWriter 方法
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
// 根據不同的 Handle, 建立不同的 ShuffleWriter
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K@unchecked, V@unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K@unchecked, V@unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K@unchecked, V@unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
二. HashShuffle 解析
Spark-1.6 之前預設的shuffle方式是hash. 在 spark-1.6版本之後使用Sort-Base Shuffle,因為HashShuffle存在的不足所以就替換了HashShuffle. Spark2.0之後, 從原始碼中完全移除了HashShuffle.
未最佳化的HashShuffle
為了方便分析假設前提:每個 Executor 只有 1 個CPU core,也就是說,無論這個 Executor 上分配多少個 task 執行緒,同一時間都只能執行一個 task 執行緒。
如下圖中有 3個 Reducer,從 Task 開始那邊各自把自己進行 Hash 計算(分割槽器:hash/numreduce取模),分類出3個不同的類別,每個 Task 都分成3種類別的資料,想把不同的資料匯聚然後計算出最終的結果,所以Reducer 會在每個 Task 中把屬於自己類別的資料收集過來,匯聚成一個同類別的大集合,每1個 Task 輸出3份本地檔案,這裡有4個 Mapper Tasks,所以總共輸出了4個 Tasks x 3個分類檔案 = 12個本地小檔案。
缺點:
-
map 任務的中間結果首先存入記憶體(快取), 然後才寫入磁碟. 這對於記憶體的開銷很大, 當一個節點上 map 任務的輸出結果集很大時, 很容易導致記憶體緊張, 發生 OOM
-
生成很多的小檔案. 假設有 M 個 MapTask, 有 N 個 ReduceTask, 則會建立 M * n 個小檔案, 磁碟 I/O 將成為效能瓶頸.
最佳化的HashShuffle
最佳化的 HashShuffle 過程就是啟用合併機制,合併機制就是複用buffer,開啟合併機制的配置是spark.shuffle.consolidateFiles。該引數預設值為false,將其設定為true即可開啟最佳化機制。通常來說,如果我們使用HashShuffleManager,那麼都建議開啟這個選項。
這裡還是有 4 個Tasks,資料類別還是分成 3 種類型,因為Hash演算法會根據你的 Key 進行分類,在同一個程序中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer裡,然後把Buffer中的資料寫入以Core數量為單位的本地檔案中,(一個Core只有一種型別的Key的資料),每1個Task所在的程序中,分別寫入共同程序中的3份本地檔案,這裡有4個Mapper Tasks,所以總共輸出是 2個Cores x 3個分類檔案 = 6個本地小檔案。
三. SortShuffle 解析
- 1. 普通 SortShuffle
在該模式下,資料會先寫入一個數據結構,reduceByKey 寫入 Map,一邊透過 Map 區域性聚合,一遍寫入記憶體。Join 運算元寫入 ArrayList 直接寫入記憶體中。然後需要判斷是否達到閾值,如果達到就會將記憶體資料結構的資料寫入到磁碟,清空記憶體資料結構。
在溢寫磁碟前,先根據 key 進行排序,排序過後的資料,會分批寫入到磁碟檔案中。預設批次為 10000 條,資料會以每批一萬條寫入到磁碟檔案。寫入磁碟檔案透過緩衝區溢寫的方式,每次溢寫都會產生一個磁碟檔案,也就是說一個 Task 過程會產生多個臨時檔案。
最後在每個 Task 中,將所有的臨時檔案合併,這就是merge過程,此過程將所有臨時檔案讀取出來,一次寫入到最終檔案。意味著一個Task的所有資料都在這一個檔案中。同時單獨寫一份索引檔案,標識下游各個Task的資料在檔案中的索引,start offset和end offset。
- 2.普通 SortShuffle 原始碼解析
write 方法
override def write(records: Iterator[Product2[K, V]]): Unit = {
// 排序器
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
// 將 Map 任務的輸出記錄插入到快取中
sorter.insertAll(records)
// 資料 shuffle 資料檔案
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
try { // 將 map 端快取的資料寫入到磁碟中, 並生成 Block 檔案對應的索引檔案.
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
// 記錄各個分割槽資料的長度
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
// 生成 Block 檔案對應的索引檔案. 此索引檔案用於記錄各個分割槽在 Block檔案中的偏移量, 以便於
// Reduce 任務拉取時使用
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
}
}
- 3 bypassSortShuffle
bypass執行機制的觸發條件如下(必須同時滿足
):
- shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold引數的值,預設為200。
- 不是聚合類的shuffle運算元(沒有預聚合)(比如groupByKey)。
此時 task 會為每個 reduce 端的 task 都建立一個臨時磁碟檔案,並將資料按 key 進行 hash 然後根據key 的 hash 值,將 key 寫入對應的磁碟檔案之中。當然,寫入磁碟檔案時也是先寫入記憶體緩衝,緩衝寫滿之後再溢寫到磁碟檔案的。最後,同樣會將所有臨時磁碟檔案都合併成一個磁碟檔案,並建立一個單獨的索引檔案。
該過程的磁碟寫機制其實跟未經最佳化的 HashShuffleManager 是一模一樣的,因為都要建立數量驚人的磁碟檔案,只是在最後會做一個磁碟檔案的合併而已。因此少量的最終磁碟檔案,也讓該機制相對未經最佳化的HashShuffleManager來說,shuffle read的效能會更好。 而該機制與普通SortShuffleManager執行機制的不同在於:不會進行排序。也就是說,啟用該機制的最大好處在於,shuffle write過程中,不需要進行資料的排序操作,也就節省掉了這部分的效能開銷。
- 4. bypass SortShuffle 原始碼解析
有時候, map 端不需要在持久化資料之前進行排序等操作, 那麼 ShuffleWriter的實現類之一BypassMergeSortShuffleWriter 就可以派上用場了.
觸發 BypassMergeSort
private[spark] object SortShuffleWriter {
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
// 如果 map 端有聚合, 則不能繞過排序
if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
false
} else {
val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
// 分割槽數不能超過200 預設值
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}
}
本次的分享就到這裡了,
好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持透過學習來獲取更多知識,用知識改變命運,用部落格見證成長,用行動證明我在努力。
如果我的部落格對你有幫助、如果你喜歡我的部落格內容,請“點贊” “評論”“收藏”
一鍵三連哦!聽說點讚的人運氣不會太差,每一天都會元氣滿滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我部落格看看。
碼字不易,大家的支援就是我堅持下去的動力。點贊後不要忘了關注
我哦!
本文章已修改原文用詞符合繁體字使用者習慣使其容易閱讀
版權宣告:此處為CSDN博主「不溫卜火」的原創文章,依據CC 4.0 BY-SA版權協議,轉載請附上原文出處連結及本宣告。
原文連結:https://blog.csdn.net/qq_16146103/article/details/108111101