讀和寫流Avro數據
Apache Avro是一種常用的數據序列化係統流的世界。一個典型的解決方案是將數據在Apache卡夫卡Avro格式,元數據融合性的模式注冊表,然後運行查詢的流媒體框架連接到卡夫卡和模式注冊表。
磚支持from_avro
和to_avro
功能在卡夫卡建立與Avro數據流管道在注冊表模式和元數據。這個函數to_avro
在Avro二進製格式和編碼專欄from_avro
解碼Avro二進製數據列。函數變換到另一列一列,和輸入/輸出的SQL數據類型可以是一個複雜類型或一個原始類型。
也看到Avro文件數據源。
基本的例子
類似於from_json和to_json,你可以使用from_avro
和to_avro
與任何二進製列,但你必須指定Avro手動模式。
進口org。apache。火花。sql。avro。功能。_進口org。apache。avro。SchemaBuilder/ /當閱讀卡夫卡的鍵和值的話題,解碼/ /二進製(Avro)數據結構化數據。/ /結果DataFrame的模式是:<關鍵:字符串,價值:int >瓦爾df=火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“訂閱”,“t”)。負載()。選擇(from_avro(美元“關鍵”,SchemaBuilder。構建器()。stringType())。作為(“關鍵”),from_avro(美元“價值”,SchemaBuilder。構建器()。intType())。作為(“價值”))/ /將結構化數據轉換為二進製字符串(鍵列)和/ / int(值列)並保存到卡夫卡的主題。dataDF。選擇(to_avro(美元“關鍵”)。作為(“關鍵”),to_avro(美元“價值”)。作為(“價值”))。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“主題”,“t”)。保存()
jsonFormatSchema例子
您還可以指定一個模式作為一個JSON字符串。例如,如果/ tmp / user.avsc
是:
{“名稱”:“example.avro”,“類型”:“記錄”,“名稱”:“用戶”,“字段”:({“名稱”:“名稱”,“類型”:“字符串”},{“名稱”:“favorite_color”,“類型”:(“字符串”,“零”]}]}
您可以創建一個JSON字符串:
從pyspark.sql.avro.functions進口from_avro,to_avrojsonFormatSchema=開放(“/ tmp / user.avsc”,“r”)。讀()
然後使用模式from_avro
:
# 1。解碼Avro數據結構。# 2。過濾器由列“favorite_color”。# 3。以Avro格式編碼的“名稱”列。輸出=df\。選擇(from_avro(“價值”,jsonFormatSchema)。別名(“用戶”))\。在哪裏(的用戶。favorite_color = =“紅色”)\。選擇(to_avro(“user.name”)。別名(“價值”))
例子與模式注冊表
如果您的集群模式注冊表服務,from_avro
可以使用它,這樣你不需要指定Avro手動模式。
請注意
集成模式隻在Scala和Java注冊表是可用的。
進口org。apache。火花。sql。avro。功能。_/ /讀卡夫卡的主題“t”,假設鍵和值已經/ /注冊在注冊表模式主題“t鍵”和“值”的類型/ /字符串和整數。二進製轉化為字符串鍵和值列/ /注冊表和int類型Avro和模式。結果DataFrame的模式/ /是:<關鍵:字符串,價值:int >。瓦爾schemaRegistryAddr=“https://myhost: 8081”瓦爾df=火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“訂閱”,“t”)。負載()。選擇(from_avro(美元“關鍵”,“t鍵”,schemaRegistryAddr)。作為(“關鍵”),from_avro(美元“價值”,“值”,schemaRegistryAddr)。作為(“價值”))
為to_avro
,默認輸出Avro模式可能不匹配模式的目標主題注冊表服務有以下原因:
從火花SQL類型映射到Avro模式不是一對一的。看到支持類型的火花SQL - > Avro轉換。
如果轉換輸出Avro模式記錄類型,記錄的名字是
topLevelRecord
沒有默認名稱空間。
如果默認輸出模式to_avro
匹配模式的目標主題,您可以執行以下操作:
/ /轉換後的數據保存到卡夫卡作為卡夫卡主題“t”。dataDF。選擇(to_avro(美元“關鍵”,點燃(“t鍵”),schemaRegistryAddr)。作為(“關鍵”),to_avro(美元“價值”,點燃(“值”),schemaRegistryAddr)。作為(“價值”))。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“主題”,“t”)。保存()
否則,你必須提供模式的目標對象to_avro
功能:
/ / Avro模式的JSON字符串格式的“值”。瓦爾avroSchema=…/ /轉換後的數據保存到卡夫卡作為卡夫卡主題“t”。dataDF。選擇(to_avro(美元“關鍵”,點燃(“t鍵”),schemaRegistryAddr)。作為(“關鍵”),to_avro(美元“價值”,點燃(“值”),schemaRegistryAddr,avroSchema)。作為(“價值”))。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“主題”,“t”)。保存()