當使用randomSplit在DataFrame上,您可能會觀察到不一致的行為。這裏有一個例子:
%python df = spark.read.format('inconsistent_data_source').load() a,b = df. randomsplit ([0.5, 0.5]) a.join(broadcast(b), on='id', how='inner').count()
通常這個查詢返回0.但是,根據基礎數據源或輸入DataFrame的不同,在某些情況下,查詢可能導致超過0條記錄。
這種意外的行為是由這樣一個事實解釋的:跨RDD分區的數據分布不是冪等的,並且可以在查詢執行期間重新排列或更新,從而影響的輸出randomSplit方法。
解決方案
做以下其中一件事:
- 使用顯式的Apache Spark RDD緩存
%python df = inputDF.cache() a,b = df. randomsplit ([0.5, 0.5])
- 按一列或一組列重新分區
%python df = inputDF。重分區(100,'col1') a,b = df.randomSplit([0.5, 0.5])
- 應用聚合函數
%python df = inputDF.groupBy('col1').count() a,b = df. randomsplit ([0.5, 0.5])
這些操作會對數據進行持久化或洗牌,從而使Spark作業的各個分區之間的數據分布保持一致。