讀和寫流Avro數據

Apache Avro是一種常用的數據序列化係統流的世界。一個典型的解決方案是將數據在Apache卡夫卡Avro格式,元數據融合性的模式注冊表,然後運行查詢的流媒體框架連接到卡夫卡和模式注冊表。

磚支持from_avroto_avro功能在卡夫卡建立與Avro數據流管道在注冊表模式和元數據。這個函數to_avro在Avro二進製格式和編碼專欄from_avro解碼Avro二進製數據列。函數變換到另一列一列,和輸入/輸出的SQL數據類型可以是一個複雜類型或一個原始類型。

請注意

from_avroto_avro功能:

  • 可在PythonScala和Java。

  • 可以傳遞給SQL函數在兩個批處理和流媒體查詢。

也看到Avro文件數據源

基本的例子

類似於from_jsonto_json,你可以使用from_avroto_avro與任何二進製列,但你必須指定Avro手動模式。

進口orgapache火花sqlavro功能_進口orgapacheavroSchemaBuilder/ /當閱讀卡夫卡的鍵和值的話題,解碼/ /二進製(Avro)數據結構化數據。/ /結果DataFrame的模式是:<關鍵:字符串,價值:int >瓦爾df=火花readStream格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,服務器)選項(“訂閱”,“t”)負載()選擇(from_avro(美元“關鍵”,SchemaBuilder構建器()。stringType())。作為(“關鍵”),from_avro(美元“價值”,SchemaBuilder構建器()。intType())。作為(“價值”))/ /將結構化數據轉換為二進製字符串(鍵列)和/ / int(值列)並保存到卡夫卡的主題。dataDF選擇(to_avro(美元“關鍵”)。作為(“關鍵”),to_avro(美元“價值”)。作為(“價值”))writeStream格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,服務器)選項(“主題”,“t”)保存()

jsonFormatSchema例子

您還可以指定一個模式作為一個JSON字符串。例如,如果/ tmp / user.avsc是:

{“名稱”:“example.avro”,“類型”:“記錄”,“名稱”:“用戶”,“字段”:({“名稱”:“名稱”,“類型”:“字符串”},{“名稱”:“favorite_color”,“類型”:(“字符串”,“零”]}]}

您可以創建一個JSON字符串:

pyspark.sql.avro.functions進口from_avro,to_avrojsonFormatSchema=開放(“/ tmp / user.avsc”,“r”)()

然後使用模式from_avro:

# 1。解碼Avro數據結構。# 2。過濾器由列“favorite_color”。# 3。以Avro格式編碼的“名稱”列。輸出=df\選擇(from_avro(“價值”,jsonFormatSchema)別名(“用戶”))\在哪裏(的用戶。favorite_color = =“紅色”)\選擇(to_avro(“user.name”)別名(“價值”))

例子與模式注冊表

如果您的集群模式注冊表服務,from_avro可以使用它,這樣你不需要指定Avro手動模式。

請注意

集成模式隻在Scala和Java注冊表是可用的。

進口orgapache火花sqlavro功能_/ /讀卡夫卡的主題“t”,假設鍵和值已經/ /注冊在注冊表模式主題“t鍵”和“值”的類型/ /字符串和整數。二進製轉化為字符串鍵和值列/ /注冊表和int類型Avro和模式。結果DataFrame的模式/ /是:<關鍵:字符串,價值:int >。瓦爾schemaRegistryAddr=“https://myhost: 8081”瓦爾df=火花readStream格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,服務器)選項(“訂閱”,“t”)負載()選擇(from_avro(美元“關鍵”,“t鍵”,schemaRegistryAddr)。作為(“關鍵”),from_avro(美元“價值”,“值”,schemaRegistryAddr)。作為(“價值”))

to_avro,默認輸出Avro模式可能不匹配模式的目標主題注冊表服務有以下原因:

  • 從火花SQL類型映射到Avro模式不是一對一的。看到支持類型的火花SQL - > Avro轉換

  • 如果轉換輸出Avro模式記錄類型,記錄的名字是topLevelRecord沒有默認名稱空間。

如果默認輸出模式to_avro匹配模式的目標主題,您可以執行以下操作:

/ /轉換後的數據保存到卡夫卡作為卡夫卡主題“t”。dataDF選擇(to_avro(美元“關鍵”,點燃(“t鍵”),schemaRegistryAddr)。作為(“關鍵”),to_avro(美元“價值”,點燃(“值”),schemaRegistryAddr)。作為(“價值”))writeStream格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,服務器)選項(“主題”,“t”)保存()

否則,你必須提供模式的目標對象to_avro功能:

/ / Avro模式的JSON字符串格式的“值”。瓦爾avroSchema=/ /轉換後的數據保存到卡夫卡作為卡夫卡主題“t”。dataDF選擇(to_avro(美元“關鍵”,點燃(“t鍵”),schemaRegistryAddr)。作為(“關鍵”),to_avro(美元“價值”,點燃(“值”),schemaRegistryAddr,avroSchema)。作為(“價值”))writeStream格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,服務器)選項(“主題”,“t”)保存()