如果你有AWS CloudWatch訂閱日誌寫出AWS運動,運動流是base64編碼和監測日誌GZIP壓縮。我們麵臨的挑戰是如何解決,在pyspark能夠讀取數據。
我們可以創建一個UDF處理數據是在使用火花結構化流
# #創建UDF來壓縮數據
def
decompress_func (x):
試一試
:
返回
zlib.decompress (x, zlib。MAX_WBITS |
32
).decode (
“utf - 8”
)
除了
異常
作為
艾凡:
返回
str
(e)
#返回錯誤消息,而不是沒有
#注冊UDF
udf_decompress = udf (decompress_func)
DataFrame從運動
運動=火花。readStream \
。
格式
(
“運動”
)\
.option (
“streamName”
stream_name) \
.option (
“地區”
,
地區
)\
.option (
“roleArn”
,角色)\
.option (
“initialPosition”
,
“最早”
)\
.load ()
#應用減壓
運動= kinesis.withColumn (
“uncompressed_data”
,udf_decompress(坳(
“數據”
))).drop (
“partitionKey”
,
“數據”
,
“shardId”
,
“sequenceNumber”
).withColumn (
“LOG_DATE”
to_date (
“approximateArrivalTimestamp”
))
請確認取代stream_name、地區和角色代碼中實際值特定於您的AWS環境。提供的代碼使用火花從AWS運動結構化流中讀取數據,解壓縮使用decompress_func UDF GZIP壓縮數據,並添加一個新列命名uncompressed_data DataFrame包含解壓數據。不必要的列是下降,添加一個新列LOG_DATE approximateArrivalTimestamp通過提取日期