取消
顯示的結果
而不是尋找
你的意思是:

Protobuf反序列化的磚

sourander
新的貢獻者三世

你好,

假設我有這些東西:

  • 二進製列包含protobuf-serialized數據
  • .proto文件包括消息定義

不同的方法有磚用戶選擇反序列化數據?Python是我主要熟悉的編程語言,因此,任何可以通過使用pyspark就好了。

在本地,我的方法是使用grpc_tool。protoc生成pb2文件(消息。原型- > message_pb2.python)。我後來在進口這些類並使用approariate消息反序列化的二進製數據。示例代碼如下:

進口操作係統從grpc_tools進口進口pkg_resources protoc #詳細打印(f”(建築){protopath}”) #出於某種原因,grpc_tools。protoc不包括_proto模塊導致錯誤:#“google.protobuf。時間戳”沒有定義。path_to_module = pkg_resources。resource_filename (“grpc_tools”、“_proto”) #旗幟args = (grpc_tools。protoc”, #參數不需要0 f”——proto_path = {proto_path}”, f”——python_out = {OUTPUT_PATH}”, f - i {path_to_module}, # f”——grpc_python_out = {OUTPUT_PATH}”, protopath.split (“/”) [1]) protoc.main (args)

我目前的想法是:

  • 執行上麵的代碼使用一個外部的機器。“my_message_derializer創建一個包。輪”,以此作為一個依賴庫的工作/任務/集群。這將需要更新每次原型文件更改使用如git人。
  • 或者,磚,安裝grpcio grpcio-tools,司機上運行和上麵類似的代碼。然後導入創建pb2類並使用消息像往常一樣。

還有其他的方式使用反序列化器和火花?少一點手冊嗎?

14日回複14

sourander
新的貢獻者三世

這可以幫助別人,我將離開這裏的代碼供參考,我使用磚在一個UDF的運行方法。

從輸入迭代器進口@pandas_udf(“字符串”)def my_test_function (blob:迭代器[pd.Series]) (pd - >迭代器。係列:d = ProtoFetcher blob (blob my_message_name):收益率blob.apply new_df = df (d.blob_to_json)。withColumn (“col_as_json my_test_function (“original_col”))

對於小數據集,這正如non-Pandas UDF執行,但我認為這將改變當我規模數據集。

Dan_Z
尊敬的貢獻者

由於@Jani Sourander,你可以這個標記為“最佳答案”這樣的問題是解決,未來用戶的答案很容易找到。

匿名
不適用

@Jani Sourander你介意發你ProtoFetched類中使用的邏輯,我跑到相同的不能泡菜問題protobuf pb2。py文件並試圖重建ProtoFetched類和my_test_function但我仍然收到錯誤。

謝謝你!

sourander
新的貢獻者三世

你好,

ProtoFetcher隻是一個包裝器pb2文件(s)使用protoc創建的。到目前為止,我使用Scala做同樣的把戲(大約快5倍)。

為了避免酸洗的問題,您需要構建ProtoFetcher圖書館到一個輪子文件non-Databricks集群上(比如筆記本電腦)。我個人更喜歡Python詩歌來管理庫和構建過程。然後將這個輪子上傳到S3使用集群圖書館數據磚。

類本身隻是一個包裝器,用於pb2文件:

從google.protobuf.pyext。從google.protobuf cpp_message GeneratedProtocolMessageType進口。從ProtoFetcher json_format MessageToJson進口。PROTO_OUT進口example_message_pb2 #硬編碼實現消息的列表。一個原型可以包含許多有用的消息。known_messages = {" example_message ": {“proto_source”:“example_message。原型”、“階級”:example_message_pb2 proto_messagename”:“ExampleMessageList”、“proto_sub_messagename”:“ExampleMessageValues}}類ProtoFetcher:”“一個類代表一個原型消息。google.protobuf的包裝器。參數- - - - - - - - - - - messagename: str共同服務的名稱。例子- - - - - - - - > > >服務= ProtoFetcher (example_message) > > > serde = service.get_deserializer () > > >“some_field_that_is_part_of_the_proto_message”serde.DESCRIPTOR.fields_by_name.keys()真實”““def __init__(自我,messagename: str):“”“構造原型消息包裝。”" " #輸入檢查messagename不在known_messages.keys():提高NameError (f”服務必須之一:{self.list_message_names()}”) #設置/信息從配置messagename自我。msg_info = known_messages [messagename] #初始化消息self.deserializer = self._init_deserializer_class () def _init_deserializer_class(自我)- > GeneratedProtocolMessageType:““Protobuf的協議消息類型描述符(_pb2。py文件)。 It can be used for serializing and deserializing the messages. """ # Get message = getattr(self.msg_info["class"], self.msg_info["proto_messagename"]) # Some messages include a submessage. This happens at least when # a blob field description is a list of lists. if self.msg_info["proto_sub_messagename"]: message = getattr(message, self.msg_info["proto_sub_messagename"]) return message def get_deserializer(self) -> GeneratedProtocolMessageType: """Getter for the message that can be used for serializing and deserializing the messages.""" return self.deserializer def blob_to_json(self, blob) -> str: """Converts the bytes-like BLOB object to a the human-readable form. Parameters ---------- blob : str/bytes bytes-like object or ASCII string including the contents of the blob field. Returns ------- j : str Stringified JSON containing the deserialized message data. """ # Deserialize the proto deserializer = self.get_deserializer() message = deserializer.FromString(blob) # Convert to JSON and clean it my_message_as_json = MessageToJson(message) j = json.dumps(json.loads(my_message_as_json)) return j

匿名
不適用

你的文章內容是非常迷人的。我非常好奇你的帖子。我渴望得到更多不可思議的帖子。

直接2小時

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map