讀和寫協議緩衝區

磚之間提供本機支持序列化和反序列化的Apache火花結構和協議緩衝區(protobuf)。Protobuf支持被實現為一個Apache火花DataFrame變壓器,可以使用結構化流或批處理操作。

如何反序列化和序列化協議緩衝區

在磚運行時12.1及以上的,你可以使用from_protobufto_protobuf函數來進行序列化和反序列化數據。Protobuf序列化流中常用的工作負載。

protobuf的基本語法功能是類似的讀寫功能。你在使用前必須進口這些函數。

from_protobuf投一個二進製列結構,to_protobuf將一個struct列轉換為二進製。你必須提供一個模式指定的注冊表選項確定的參數或描述符文件descFilePath論點。

from_protobuf(數據:“ColumnOrName”,messageName:可選(str]=沒有一個,descFilePath:可選(str]=沒有一個,選項:可選(Dict(str,str]]=沒有一個)to_protobuf(數據:“ColumnOrName”,messageName:可選(str]=沒有一個,descFilePath:可選(str]=沒有一個,選項:可選(Dict(str,str]]=沒有一個)
/ /在使用模式注冊表:from_protobuf(數據:,選項:地圖(字符串,字符串])/ /或者Protobuf描述符文件:from_protobuf(數據:,messageName:字符串,descFilePath:字符串,選項:地圖(字符串,字符串])/ /在使用模式注冊表:to_protobuf(數據:,選項:地圖(字符串,字符串])/ /或者Protobuf描述符文件:to_protobuf(數據:,messageName:字符串,descFilePath:字符串,選項:地圖(字符串,字符串])

下麵的示例說明了如何處理二進製protobuf記錄from_protobuf ()和將火花SQL結構轉換為二進製protobufto_protobuf ()

注冊表使用protobuf彙合的模式

磚支持使用融合性的模式注冊表定義Protobuf。

pyspark.sql.protobuf.functions進口to_protobuf,from_protobufschema_registry_options={“schema.registry.subject”:“app-events-value”,“schema.registry.address”:“https://schema-registry: 8081 /”}#二進製Protobuf轉換為SQL結構與from_protobuf ():proto_events_df=(input_df選擇(from_protobuf(“proto_bytes”,選項=schema_registry_options)別名(“proto_event”)))# SQL結構轉換為二進製與to_protobuf Protobuf ():protobuf_binary_df=(proto_events_dfselectExpr(“結構(名稱、id上下文)事件”)選擇(to_protobuf(“事件”,選項=schema_registry_options)別名(“proto_bytes”)))
進口orgapache火花sqlprotobuf功能_進口scala集合JavaConverters_瓦爾schemaRegistryOptions=地圖(“schema.registry.subject”- >“app-events-value”,“schema.registry.address”- >“https://schema-registry: 8081 /”)/ /二進製Protobuf轉換為SQL結構與from_protobuf ():瓦爾protoEventsDF=inputDF選擇(from_protobuf(美元“proto_bytes”,選項=schemaRegistryOptionsasJava)作為(“proto_event”))/ / SQL結構轉換為二進製Protobuf to_protobuf ():瓦爾protobufBinaryDF=protoEventsDFselectExpr(“結構(名稱、id上下文)事件”)選擇(to_protobuf(美元“事件”,選項=schemaRegistryOptionsasJava)作為(“proto_bytes”))

注冊驗證外部彙合的模式

外部彙合的模式驗證注冊表,更新您的注冊表模式選項包括身份驗證憑據和API密鑰。

schema_registry_options={“schema.registry.subject”:“app-events-value”,“schema.registry.address”:“https://remote-schema-registry-endpoint”,“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:“confluentApiKey: confluentApiSecret”}
瓦爾schemaRegistryOptions=地圖(“schema.registry.subject”- >“app-events-value”,“schema.registry.address”- >“https://remote-schema-registry-endpoint”,“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:“confluentApiKey: confluentApiSecret”)

使用Protobuf描述符文件

你也可以參考protobuf描述符文件可用來計算集群。確保你有適當的權限來讀取文件,這取決於它的位置。

pyspark.sql.protobuf.functions進口to_protobuf,from_protobufdescriptor_file=“/道路/ / proto_descriptor.desc”proto_events_df=(input_df選擇(from_protobuf(input_df價值,“BasicMessage”,descFilePath=descriptor_file)別名(“原型”)))proto_binary_df=(proto_events_df選擇(to_protobuf(proto_events_df原型,“BasicMessage”,descriptor_file)別名(“字節”)))
進口orgapache火花sqlprotobuf功能_瓦爾descriptorFile=“/道路/ / proto_descriptor.desc”瓦爾protoEventsDF=inputDF選擇(from_protobuf(美元“價值”,“BasicMessage”,descFilePath=descriptorFile)。作為(“原型”))瓦爾protoBytesDF=protoEventsDF選擇(to_protobuf(美元“原型”,“BasicMessage”,descriptorFile)。作為(“字節”))