你好,
假設我有這些東西:
不同的方法有磚用戶選擇反序列化數據?Python是我主要熟悉的編程語言,因此,任何可以通過使用pyspark就好了。
在本地,我的方法是使用grpc_tool。protoc生成pb2文件(消息。原型- > message_pb2.python)。我後來在進口這些類並使用approariate消息反序列化的二進製數據。示例代碼如下:
進口操作係統從grpc_tools進口進口pkg_resources protoc #詳細打印(f”(建築){protopath}”) #出於某種原因,grpc_tools。protoc不包括_proto模塊導致錯誤:#“google.protobuf。時間戳”沒有定義。path_to_module = pkg_resources。resource_filename (“grpc_tools”、“_proto”) #旗幟args = (grpc_tools。protoc”, #參數不需要0 f”——proto_path = {proto_path}”, f”——python_out = {OUTPUT_PATH}”, f - i {path_to_module}, # f”——grpc_python_out = {OUTPUT_PATH}”, protopath.split (“/”) [1]) protoc.main (args)
我目前的想法是:
還有其他的方式使用反序列化器和火花?少一點手冊嗎?
這可以幫助別人,我將離開這裏的代碼供參考,我使用磚在一個UDF的運行方法。
從輸入迭代器進口@pandas_udf(“字符串”)def my_test_function (blob:迭代器[pd.Series]) (pd - >迭代器。係列:d = ProtoFetcher blob (blob my_message_name):收益率blob.apply new_df = df (d.blob_to_json)。withColumn (“col_as_json my_test_function (“original_col”))
對於小數據集,這正如non-Pandas UDF執行,但我認為這將改變當我規模數據集。
你好,
ProtoFetcher隻是一個包裝器pb2文件(s)使用protoc創建的。到目前為止,我使用Scala做同樣的把戲(大約快5倍)。
為了避免酸洗的問題,您需要構建ProtoFetcher圖書館到一個輪子文件non-Databricks集群上(比如筆記本電腦)。我個人更喜歡Python詩歌來管理庫和構建過程。然後將這個輪子上傳到S3使用集群圖書館數據磚。
類本身隻是一個包裝器,用於pb2文件:
從google.protobuf.pyext。從google.protobuf cpp_message GeneratedProtocolMessageType進口。從ProtoFetcher json_format MessageToJson進口。PROTO_OUT進口example_message_pb2 #硬編碼實現消息的列表。一個原型可以包含許多有用的消息。known_messages = {" example_message ": {“proto_source”:“example_message。原型”、“階級”:example_message_pb2 proto_messagename”:“ExampleMessageList”、“proto_sub_messagename”:“ExampleMessageValues}}類ProtoFetcher:”“一個類代表一個原型消息。google.protobuf的包裝器。參數- - - - - - - - - - - messagename: str共同服務的名稱。例子- - - - - - - - > > >服務= ProtoFetcher (example_message) > > > serde = service.get_deserializer () > > >“some_field_that_is_part_of_the_proto_message”serde.DESCRIPTOR.fields_by_name.keys()真實”““def __init__(自我,messagename: str):“”“構造原型消息包裝。”" " #輸入檢查messagename不在known_messages.keys():提高NameError (f”服務必須之一:{self.list_message_names()}”) #設置/信息從配置messagename自我。msg_info = known_messages [messagename] #初始化消息self.deserializer = self._init_deserializer_class () def _init_deserializer_class(自我)- > GeneratedProtocolMessageType:““Protobuf的協議消息類型描述符(_pb2。py文件)。 It can be used for serializing and deserializing the messages. """ # Get message = getattr(self.msg_info["class"], self.msg_info["proto_messagename"]) # Some messages include a submessage. This happens at least when # a blob field description is a list of lists. if self.msg_info["proto_sub_messagename"]: message = getattr(message, self.msg_info["proto_sub_messagename"]) return message def get_deserializer(self) -> GeneratedProtocolMessageType: """Getter for the message that can be used for serializing and deserializing the messages.""" return self.deserializer def blob_to_json(self, blob) -> str: """Converts the bytes-like BLOB object to a the human-readable form. Parameters ---------- blob : str/bytes bytes-like object or ASCII string including the contents of the blob field. Returns ------- j : str Stringified JSON containing the deserialized message data. """ # Deserialize the proto deserializer = self.get_deserializer() message = deserializer.FromString(blob) # Convert to JSON and clean it my_message_as_json = MessageToJson(message) j = json.dumps(json.loads(my_message_as_json)) return j