We are building a DLT pipeline, and Auto Loader is handling schema evolution fine. However, further downstream in the pipeline we load the data into a new table with the apply_changes() function, and from the looks of it, it does not handle row updates that arrive with a new schema. During "Setting up tables" it throws an "org.apache.spark.sql.catalyst.parser.ParseException" error. The only explanation I can come up with is that it does not like replacing a column field type of "null" with a "struct".
Here is the code:
import dlt
from pyspark.sql import functions as F

@dlt.view(name="authenticators_stream")
@dlt.expect_all_or_drop({"valid_doc": "doc IS NOT NULL"})
def stream_table():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.useNotifications", "true")
        .option("cloudFiles.queueUrl", "https://sqs.us-east-1.amazonaws.com/********/mongo-data-queue-testing")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("multiline", "false")
        .option(
            "cloudFiles.schemaHints",
            "_id STRING, ot STRING, ts TIMESTAMP, year INT, month INT, day INT",
        )
        .load(json_path)
    )

dlt.create_streaming_live_table(
    name="authenticators_raw",
    spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "true"},
)

dlt.apply_changes(
    target="authenticators_raw",
    source="authenticators_stream",
    keys=["_id"],
    sequence_by=F.col("ts"),
    stored_as_scd_type=2,
)
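As a quick sanity check of the null-to-struct theory, a plain batch read over the same files (outside DLT) will show what Spark infers for each column. This is just a sketch using the same json_path, not part of the pipeline, and assumes a Databricks notebook where spark is predefined:

# Sketch: inspect the inferred schema of the raw JSON outside of DLT.
# If the suspect nested column is inferred as "void"/"null" from older files
# and as a struct once newer files arrive, that matches the error below.
df = spark.read.json(json_path)
df.printSchema()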
Here is the full error message:
org.apache.spark.sql.catalyst.parser.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 6)

== SQL ==
struct<__v:bigint,_id:string,buttonlabel:string,company:string,config:struct<params:struct<company-id:string,cyberarkurl:string,duo-sso-url:string,email:string,google-oauth-url:string,login-success-text:string,login-url:string,microsofturl:string,okta-url:string,oktasubdomain:string,onelogin-url:string,password:string,payroll-cookies-wait-for-url:string,payroll-provider-selector:string,ping-identity-url:string,request-id:string,secureid-url:string,subdomain:string,target-computing-resources-url:string,username:string,usersname:string,wait-for-milliseconds-param-key:string,wait-for-xpath-after-navigate:string,workday-organization-group-name:string>>,connector:string,createdat:string,optional:boolean,updatedat:string>
------^^^
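If the null-to-struct flip really is what the parser is choking on, the workaround I'm considering is to pin that column with a schema hint so Auto Loader never changes its type. A minimal sketch of the idea, assuming the offending column is config (substitute the real one); the nested document could then be unpacked downstream with from_json() once its struct settles:

# Workaround sketch (untested): hint the unstable column as STRING so the
# inferred schema can never flip from null to struct. "config STRING" is an
# assumption -- replace it with whichever column actually changes type.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option(
        "cloudFiles.schemaHints",
        "_id STRING, ot STRING, ts TIMESTAMP, year INT, month INT, day INT, config STRING",
    )
    .load(json_path)
)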