以編程方式創建多個表
您可以使用Python與達美住表以編程方式創建多個表來減少代碼冗餘。
你可能會包含多個流的管道或數據集定義,隻有少量的參數不同。這種冗餘導致管道容易出錯且難以維護。例如,下圖顯示的圖形管道使用消防部門數據集來發現社區最快的響應時間為不同類別的緊急呼叫。在這個例子中,並行流隻相差幾個參數。
三角洲生活與Python示例表元編程
請注意
這個例子中讀取包含在示例數據磚的數據集。因為磚與管道發布數據集不支持統一目錄,這個例子隻能與一個蜂巢metastore管道配置發布。然而,這種模式也適用於統一目錄啟用管道,但你必須讀取數據外部位置。了解更多關於使用統一目錄與達美住表,看看使用統一的目錄與三角洲住表管道。
您可以使用元編程模式來減少生成的開銷和維護冗餘流的定義。元編程在三角洲住表是使用Python內部函數完成的。因為這些功能是懶洋洋地評估,您可以使用它們來創建流,除了輸入參數是相同的。每次調用可以包括一組不同的參數,控製每個表應該如何生成,如以下示例所示。
重要的
因為Python函數與達美住表調用decorator懶洋洋地,當創建數據集在一個循環中你必須調用另一個函數來創建數據集,以確保使用正確的參數值。未能創建數據集在一個單獨的函數結果在多個表,使用參數的最終執行循環。
下麵的示例調用create_table ()
函數在一個循環來創建表t1
和t2
:
defcreate_table(的名字):@dlt。表(的名字=的名字)deft():返回火花。讀。表(的名字)表=(“t1”,“t2”]為t在表:create_table(t)
進口dlt從pyspark.sql.functions進口*@dlt。表(的名字=“raw_fire_department”,評論=“原始表消防部門反應”)@dlt。expect_or_drop(“valid_received”,“收到NOT NULL”)@dlt。expect_or_drop(“valid_response”,“回應不是零”)@dlt。expect_or_drop(“valid_neighborhood”,“社區! =‘沒有’”)defget_raw_fire_department():返回(火花。讀。格式(“csv”)。選項(“頭”,“真正的”)。選項(“多行”,“真正的”)。負載(' / databricks-datasets / timeseries /火災/ Fire_Department_Calls_for_Service.csv ')。withColumnRenamed(“調用類型”,“call_type”)。withColumnRenamed(“收到DtTm”,“收到”)。withColumnRenamed(“響應DtTm”,“回應”)。withColumnRenamed(“Neighborhooods分析邊界”,“社區”)。選擇(“call_type”,“收到”,“回應”,“社區”))all_tables=[]defgenerate_tables(call_table,response_table,過濾器):@dlt。表(的名字=call_table,評論=“頂級表調用類型”)defcreate_call_table():返回(火花。sql(”“”選擇unix_timestamp (, ' M / d / yyyy h: M:年代”)作為ts_received,unix_timestamp(回答說,' M / d / yyyy h: M:年代”)作為ts_responded,社區從LIVE.raw_fire_department在call_type = '{過濾器}””“”。格式(過濾器=過濾器)))@dlt。表(的名字=response_table,評論=“十大社區最快響應時間”)defcreate_response_table():返回(火花。sql(”“”選擇社區,AVG response_time ((ts_received - ts_responded))從生活。{call_table}組1ORDER BY response_time限製10”“”。格式(call_table=call_table)))all_tables。附加(response_table)generate_tables(“alarms_table”,“alarms_response”,“警報”)generate_tables(“fire_table”,“fire_response”,“結構火”)generate_tables(“medical_table”,“medical_response”,“醫療事故”)@dlt。表(的名字=“best_neighborhoods”,評論=”鄰居出現在最好的響應時間列表最“)def總結():target_tables=(dlt。讀(t)為t在all_tables]聯合=functools。減少(λx,y:x。聯盟(y),target_tables)返回(聯合。groupBy(上校(“社區”))。gg(數(“*”)。別名(“分數”))。orderBy(desc(“分數”)))