我已經配置了一個三角洲湖沉連接器從AVRO話題中讀取數據,並將其寫入三角洲湖。我跟著文檔和配置下麵的樣子。
{
“名稱”:“dev_test_delta_connector”,
"配置":{
“主題”:“dl_test_avro”,
“input.data。形式at": "AVRO",
”連接器。類”:“io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkConnector”,
“名稱”:“dev_test_delta_connector”,
“kafka.auth。模式”:“SERVICE_ACCOUNT”,
“kafka.service.account.id”:“* * * *”,
“delta.lake.host.name”:“* * * * * *”,
“delta.lake.http。路徑”:“* * * * * * * * *”,
:“delta.lake.database dl_test_db”,
“delta.lake。令牌”:“* * * * * * * * *”,
“delta.lake.table.auto。創造”:“真正的”,
主題:“delta.lake.table.format kafka_ $ {}”,
:“staging.bucket.name dl-test-bucket”,
:“s3.region eu-west-2”,
“staging.s3.access.key.id”:“* * * * * * * * * * * * *”,
“staging.s3.secret.access.key”:“* * * * * * * * * *”,
“confluent.topic.bootstrap.servers”:“* * * * * * * * * * * * * * * *”,
“flush.interval.ms”:“100”,
”任務。馬克斯”:“1”
}
}
我期待連接器自動創建表,但它錯誤為:
“跟蹤”:“org.apache.kafka.connect.errors。ConnectException:退出WorkerSinkTask由於不可恢複的異常。\ n \乙org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages (WorkerSinkTask.java: 568) \ n \乙org.apache.kafka.connect.runtime.WorkerSinkTask.poll (WorkerSinkTask.java: 326) \ n \乙org.apache.kafka.connect.runtime.WorkerSinkTask.iteration (WorkerSinkTask.java: 228) \ n \乙org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java: 196) \ n \乙org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java: 184) \ n \乙org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java: 234) \ n \乙java.util.concurrent.Executors RunnableAdapter.call美元(Executors.java: 511) \ n \乙java.util.concurrent.FutureTask.run (FutureTask.java: 266) \ n \乙java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1149) \ n \乙java.util.concurrent.ThreadPoolExecutor Worker.run美元(ThreadPoolExecutor.java: 624) \ n \乙java.lang.Thread.run (Thread.java: 750) \ nCaused: org.apache.kafka.connect.errors。ConnectException:無效的字段模式選擇提供\ n \乙io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.convertFieldSchemaToTableSchema (DatabricksDeltaLakeSinkTask.java: 368) \ n \乙io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.autoCreateTable (DatabricksDeltaLakeSinkTask.java: 309) \ n \乙io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.recordsToPutToS3 (DatabricksDeltaLakeSinkTask.java: 146) \ n \乙io.confluent.connect.databricks.deltalake.DatabricksDeltaLakeSinkTask.put (DatabricksDeltaLakeSinkTask.java: 98) \ n \乙org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages (WorkerSinkTask.java: 546) \ n \ t…10 \ n”
任何幫助將不勝感激,謝謝