開始
加載和管理數據
處理數據
政府
參考和資源
2023年2月27日更新
給我們反饋
Apache Avro是流媒體世界中常用的數據序列化係統。一個典型的解決方案是把Avro格式的數據放在Apache Kafka中,元數據放在合並模式注冊表,然後使用連接到Kafka和Schema Registry的流框架運行查詢。
Databricks支持from_avro而且to_avro功能用Kafka中的Avro數據和Schema Registry中的元數據構建流媒體管道。這個函數to_avro將列編碼為Avro格式的二進製和from_avro將Avro二進製數據解碼為一列。這兩個函數都將一列轉換為另一列,並且輸入/輸出SQL數據類型可以是複雜類型或基本類型。
from_avro
to_avro
請注意
的from_avro而且to_avro功能:
可在Python、Scala和Java。
可以在批處理和流查詢中傳遞給SQL函數。
也看到Avro文件數據源.
類似於from_json而且to_json,你可以使用from_avro而且to_avro使用任何二進製列,但必須手動指定Avro模式。
進口org.apache.火花.sql.avro.功能._進口org.apache.avro.SchemaBuilder//當讀取一個Kafka主題的鍵和值時,解碼//二進製(Avro)數據轉換為結構化數據。//結果DataFrame的模式是:瓦爾df=火花.readStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“訂閱”,“t”).負載().選擇(from_avro($“關鍵”,SchemaBuilder.構建器().stringType())。作為(“關鍵”),from_avro($“價值”,SchemaBuilder.構建器().intType())。作為(“價值”))//將結構化數據從字符串(鍵列)轉換為二進製// int(值列)並保存到Kafka主題。dataDF.選擇(to_avro($“關鍵”).作為(“關鍵”),to_avro($“價值”).作為(“價值”)).writeStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“主題”,“t”).開始()
還可以將模式指定為JSON字符串。例如,如果/ tmp / user.avsc是:
/ tmp / user.avsc
{“名稱”:“example.avro”,“類型”:“記錄”,“名稱”:“用戶”,“字段”:[{“名稱”:“名稱”,“類型”:“字符串”},{“名稱”:“favorite_color”,“類型”:[“字符串”,“零”]}]}
你可以創建一個JSON字符串:
從pyspark.sql.avro.functions進口from_avro,to_avrojsonFormatSchema=開放(“/ tmp / user.avsc”,“r”).讀()
然後使用中的模式from_avro:
# 1。將Avro數據解碼為一個結構。# 2。過濾列“favorite_color”。# 3。將列“name”編碼為Avro格式。輸出=df\.選擇(from_avro(“價值”,jsonFormatSchema).別名(“用戶”))\.在哪裏(的用戶。favorite_color == "red"')\.選擇(to_avro(“user.name”).別名(“價值”))
如果集群有架構注冊中心服務,from_avro可以使用它,因此您不需要手動指定Avro模式。
下麵的例子演示了讀取Kafka主題“t”,假設鍵和值已經在Schema Registry中注冊為類型的主題“t-key”和“t-value”字符串而且INT:
字符串
INT
進口org.apache.火花.sql.avro.功能._瓦爾schemaRegistryAddr=“https://myhost: 8081”瓦爾df=火花.readStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“訂閱”,“t”).負載().選擇(from_avro($“關鍵”,“t鍵”,schemaRegistryAddr).作為(“關鍵”),from_avro($“價值”,“值”,schemaRegistryAddr).作為(“價值”))
為to_avro,默認輸出Avro模式可能與模式注冊服務中目標主題的模式不匹配,原因如下:
Spark SQL類型到Avro模式的映射不是一對一的。看到Spark SQL -> Avro轉換支持的類型.
如果轉換後的輸出Avro模式為記錄類型,則記錄名稱為topLevelRecord默認情況下沒有命名空間。
topLevelRecord
的默認輸出模式to_avro匹配目標主題的模式,您可以執行以下操作:
//轉換後的數據以Kafka主題“t”的形式保存到Kafka。dataDF.選擇(to_avro($“關鍵”,點燃(“t鍵”),schemaRegistryAddr).作為(“關鍵”),to_avro($“價值”,點燃(“值”),schemaRegistryAddr).作為(“價值”)).writeStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“主題”,“t”).開始()
方法中提供目標主題的模式to_avro功能:
// JSON字符串格式的主題“t-value”的Avro模式。瓦爾avroSchema=...//轉換後的數據以Kafka主題“t”的形式保存到Kafka。dataDF.選擇(to_avro($“關鍵”,點燃(“t鍵”),schemaRegistryAddr).作為(“關鍵”),to_avro($“價值”,點燃(“值”),schemaRegistryAddr,avroSchema).作為(“價值”)).writeStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“主題”,“t”).開始()
在Databricks Runtime 12.1及以上版本中,您可以向外部合流模式注冊中心進行身份驗證。下麵的示例演示如何配置模式注冊表選項,以包括身份驗證憑據和API密鑰。
進口org.apache.火花.sql.avro.功能._進口scala.集合.JavaConverters._瓦爾schemaRegistryAddr=“https://confluent-schema-registry-endpoint”瓦爾schemaRegistryOptions=地圖(“confluent.schema.registry.basic.auth.credentials.source”->“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”->“confluentApiKey: confluentApiSecret”)瓦爾df=火花.readStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“訂閱”,“t”).負載().選擇(from_avro($“關鍵”,“t鍵”,schemaRegistryAddr,schemaRegistryOptions.asJava).作為(“關鍵”),from_avro($“價值”,“值”,schemaRegistryAddr,schemaRegistryOptions.asJava).作為(“價值”))//轉換後的數據以Kafka主題“t”的形式保存到Kafka。dataDF.選擇(to_avro($“關鍵”,點燃(“t鍵”),schemaRegistryAddr,schemaRegistryOptions.asJava).作為(“關鍵”),to_avro($“價值”,點燃(“值”),schemaRegistryAddr,schemaRegistryOptions.asJava).作為(“價值”)).writeStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“主題”,“t”).保存()// JSON字符串格式的主題“t-value”的Avro模式。瓦爾avroSchema=...//轉換後的數據以Kafka主題“t”的形式保存到Kafka。dataDF.選擇(to_avro($“關鍵”,點燃(“t鍵”),schemaRegistryAddr,schemaRegistryOptions.asJava).作為(“關鍵”),to_avro($“價值”,點燃(“值”),schemaRegistryAddr,schemaRegistryOptions.asJava,avroSchema).作為(“價值”)).writeStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“主題”,“t”).保存()
從pyspark.sql.functions進口上校,點燃從pyspark.sql.avro.functions進口from_avro,to_avroschema_registry_address=“https://confluent-schema-registry-endpoint”schema_registry_options={“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:f"{關鍵}:{秘密}"}df=(火花.readStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“訂閱”,“t”).負載().選擇(from_avro(上校(“關鍵”),上校(“t鍵”),schema_registry_address,schema_registry_options).別名(“關鍵”),from_avro(上校(“價值”),上校(“值”),schema_registry_address,schema_registry_options).別名(“價值”)))#轉換後的數據保存到Kafka作為Kafka主題“t”。data_df.選擇(to_avro(上校(“關鍵”),上校(“t鍵”),schema_registry_address,schema_registry_options).別名(“關鍵”),to_avro(上校(“價值”),上校(“值”),schema_registry_address,schema_registry_options).別名(“價值”)).writeStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“主題”,“t”).保存()#主題“t-value”的Avro模式,JSON字符串格式。avro_schema=...#轉換後的數據保存到Kafka作為Kafka主題“t”。data_df.選擇(to_avro(上校(“關鍵”),點燃(“t鍵”),schema_registry_address,schema_registry_options).別名(“關鍵”),to_avro(上校(“價值”),點燃(“值”),schema_registry_address,schema_registry_options,avro_schema).別名(“價值”)).writeStream.格式(“卡夫卡”).選項(“kafka.bootstrap.servers”,服務器).選項(“主題”,“t”).保存()