讀和寫協議緩衝區
磚之間提供本機支持序列化和反序列化的Apache火花結構和協議緩衝區(protobuf)。Protobuf支持被實現為一個Apache火花DataFrame變壓器,可以使用結構化流或批處理操作。
如何反序列化和序列化協議緩衝區
在磚運行時12.1及以上的,你可以使用from_protobuf
和to_protobuf
函數來進行序列化和反序列化數據。Protobuf序列化流中常用的工作負載。
protobuf的基本語法功能是類似的讀寫功能。你在使用前必須進口這些函數。
from_protobuf
投一個二進製列結構,to_protobuf
將一個struct列轉換為二進製。你必須提供一個模式指定的注冊表選項
確定的參數或描述符文件descFilePath
論點。
from_protobuf(數據:“ColumnOrName”,messageName:可選(str]=沒有一個,descFilePath:可選(str]=沒有一個,選項:可選(Dict(str,str]]=沒有一個)to_protobuf(數據:“ColumnOrName”,messageName:可選(str]=沒有一個,descFilePath:可選(str]=沒有一個,選項:可選(Dict(str,str]]=沒有一個)
/ /在使用模式注冊表:from_protobuf(數據:列,選項:地圖(字符串,字符串])/ /或者Protobuf描述符文件:from_protobuf(數據:列,messageName:字符串,descFilePath:字符串,選項:地圖(字符串,字符串])/ /在使用模式注冊表:to_protobuf(數據:列,選項:地圖(字符串,字符串])/ /或者Protobuf描述符文件:to_protobuf(數據:列,messageName:字符串,descFilePath:字符串,選項:地圖(字符串,字符串])
下麵的示例說明了如何處理二進製protobuf記錄from_protobuf ()
和將火花SQL結構轉換為二進製protobufto_protobuf ()
。
注冊表使用protobuf彙合的模式
磚支持使用融合性的模式注冊表定義Protobuf。
從pyspark.sql.protobuf.functions進口to_protobuf,from_protobufschema_registry_options={“schema.registry.subject”:“app-events-value”,“schema.registry.address”:“https://schema-registry: 8081 /”}#二進製Protobuf轉換為SQL結構與from_protobuf ():proto_events_df=(input_df。選擇(from_protobuf(“proto_bytes”,選項=schema_registry_options)。別名(“proto_event”)))# SQL結構轉換為二進製與to_protobuf Protobuf ():protobuf_binary_df=(proto_events_df。selectExpr(“結構(名稱、id上下文)事件”)。選擇(to_protobuf(“事件”,選項=schema_registry_options)。別名(“proto_bytes”)))
進口org。apache。火花。sql。protobuf。功能。_進口scala。集合。JavaConverters。_瓦爾schemaRegistryOptions=地圖(“schema.registry.subject”- >“app-events-value”,“schema.registry.address”- >“https://schema-registry: 8081 /”)/ /二進製Protobuf轉換為SQL結構與from_protobuf ():瓦爾protoEventsDF=inputDF。選擇(from_protobuf(美元“proto_bytes”,選項=schemaRegistryOptions。asJava)。作為(“proto_event”))/ / SQL結構轉換為二進製Protobuf to_protobuf ():瓦爾protobufBinaryDF=protoEventsDF。selectExpr(“結構(名稱、id上下文)事件”)。選擇(to_protobuf(美元“事件”,選項=schemaRegistryOptions。asJava)。作為(“proto_bytes”))
注冊驗證外部彙合的模式
外部彙合的模式驗證注冊表,更新您的注冊表模式選項包括身份驗證憑據和API密鑰。
schema_registry_options={“schema.registry.subject”:“app-events-value”,“schema.registry.address”:“https://remote-schema-registry-endpoint”,“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:“confluentApiKey: confluentApiSecret”}
瓦爾schemaRegistryOptions=地圖(“schema.registry.subject”- >“app-events-value”,“schema.registry.address”- >“https://remote-schema-registry-endpoint”,“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:“confluentApiKey: confluentApiSecret”)
使用Protobuf描述符文件
你也可以參考protobuf描述符文件可用來計算集群。確保你有適當的權限來讀取文件,這取決於它的位置。
從pyspark.sql.protobuf.functions進口to_protobuf,from_protobufdescriptor_file=“/道路/ / proto_descriptor.desc”proto_events_df=(input_df。選擇(from_protobuf(input_df。價值,“BasicMessage”,descFilePath=descriptor_file)。別名(“原型”)))proto_binary_df=(proto_events_df。選擇(to_protobuf(proto_events_df。原型,“BasicMessage”,descriptor_file)。別名(“字節”)))
進口org。apache。火花。sql。protobuf。功能。_瓦爾descriptorFile=“/道路/ / proto_descriptor.desc”瓦爾protoEventsDF=inputDF。選擇(from_protobuf(美元“價值”,“BasicMessage”,descFilePath=descriptorFile)。作為(“原型”))瓦爾protoBytesDF=protoEventsDF。選擇(to_protobuf(美元“原型”,“BasicMessage”,descriptorFile)。作為(“字節”))