嗨
我有問題,我的“模塊”不知道在一個用戶定義的函數。下麵的精確的消息發布。我有一個回購結構如下:
analytics_pipelines│├──__init__。py│├──coordinate_transformation。py│├──data_quality_checks。py│├──管道。py│└──轉換。py├──delta_live_tables│├──配置││└──data_ingestion。json│└──data_ingestion。py├──dist├──筆記本│├──local_example。ipynb│├──testdata││├──配置。csv││├──輸入。avro││└──測試。鑲木地板├──詩歌。鎖├──pyproject。toml├──README。md├──│測試├──__init__。py│└──test_transformations.py
delta_live_tables文件夾中我有做類似的筆記本
進口sys sys.path.append (' / Workspace /回購/ <用戶> / analytics-data-pipelines analytics_pipelines”)進口管道配置=管道。setup_config(模式,avro_raw_data) pipeline.define_ingestion_pipeline(火花,配置)pipeline.define_summary_tables(火花,配置)
pipeline.define_ingestion_pipeline我定義一群三角洲生活表通過python api。我也導入轉換。py內部管道。py定義必要的數據轉換。
從轉換從coordinate_transformation進口進口*進口dlt apply_coordinate_transform # .....def define_ingestion_pipeline(火花,配置):....@dlt。表(= "發表評論,路徑=…)def table_name (): data = dlt.read_stream(“其他”)返回transform_data(數據)……
一切工作,除非我用python用戶定義函數的轉換。相應的轉換類似:
def coordinate_transform (group_keys pdf) - > pd。DataFrame: trafo = get_coordinate_transformation (group_keys [0])……做一些熊貓代碼返回pdf def apply_coordinate_transform(數據):……模式=數據。模式數據= data.groupBy (serialnumber,…) \ .applyInPandas (coordinate_transform模式=)返回數據
顯然coordinate_transformation。py是不可用的,但是為什麼呢?
錯誤信息:文件“/磚/火花/ python / pyspark /序列化器。py”, 188行,在_read_with_length返回self.loads (obj)文件“/磚/火花/ python / pyspark /序列化器。py”, 540行,在加載cloudpickle返回。負載(obj,編碼=編碼)ModuleNotFoundError:沒有模塊命名為“coordinate_transformation”
(不相關:
可能有人告訴我如何用一個3 * 3矩陣到Nx3大dataframe ?(導致Nx3 dataframe)
問候和感謝
大衛