動態建立集群時出現問題……在互動式集群上則沒有問題
代碼如下:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Location of the raw Wikipedia clickstream JSON in the Databricks sample
# datasets (the file name suggests the 2015-02 export -- TODO confirm).
# Read by clickstream_raw() below.
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
@dlt.table(
    comment="原始維基百科點擊流數據集,從/ databricks-datasets攝取。"
)
def clickstream_raw():
    """Delta Live Tables table holding the raw clickstream records.

    Loads the JSON data at the module-level ``json_path`` with
    ``spark.read.format("json")`` and returns the DataFrame unchanged. The
    ``@dlt.table`` decorator registers it as ``clickstream_raw``, the name
    that downstream tables pass to ``dlt.read``.

    NOTE(review): ``spark`` is not defined in this file -- presumably the
    SparkSession provided by the Databricks notebook/pipeline runtime; confirm.
    """
    # The format name must be a string literal; the bare name `json` is
    # undefined here and would raise NameError.
    return spark.read.format("json").load(json_path)
@dlt.table(
    comment="維基百科點擊流數據清洗和準備分析。"
)
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
    """Cleaned clickstream table derived from ``clickstream_raw``.

    Transformations:
      * casts raw column ``n`` to INT and exposes it as ``click_count``;
      * renames ``curr_title`` -> ``current_page_title`` and
        ``prev_title`` -> ``previous_page_title``;
      * keeps only ``current_page_title``, ``click_count`` and
        ``previous_page_title``.

    Data-quality expectations (per the DLT docs):
      * ``valid_current_page_title`` (``@dlt.expect``) records rows with a null
        ``current_page_title`` but does not drop them;
      * ``valid_count`` (``@dlt.expect_or_fail``) fails the pipeline update if
        any row has ``click_count`` not greater than 0.
    """
    return (
        dlt.read("clickstream_raw")
        # Cast so the `click_count > 0` expectation compares numerically.
        .withColumn("click_count", expr("CAST(n AS INT)"))
        .withColumnRenamed("curr_title", "current_page_title")
        .withColumnRenamed("prev_title", "previous_page_title")
        .select("current_page_title", "click_count", "previous_page_title")
    )
@dlt.table(
    comment="這個表包含頁麵鏈接到Apache火花頁麵頂部。"
)
def top_spark_referrers():
    """Top 10 referring pages into the ``Apache_Spark`` page.

    Reads ``clickstream_prepared`` and keeps only rows whose
    ``current_page_title`` equals ``'Apache_Spark'``. It then renames
    ``previous_page_title`` to ``referrer`` and sorts by ``click_count``
    descending. Finally it returns ``referrer`` and ``click_count`` for the
    first 10 rows.
    """
    return (
        dlt.read("clickstream_prepared")
        # Quote the title: unquoted, Spark SQL would resolve Apache_Spark as a
        # column name and fail analysis. `= =` is also not a valid operator.
        .filter(expr("current_page_title == 'Apache_Spark'"))
        .withColumnRenamed("previous_page_title", "referrer")
        .sort(desc("click_count"))
        .select("referrer", "click_count")
        .limit(10)
    )