使用Apache Flink三角洲湖
與所有的部分我們的平台,我們不斷和添加新功能增強開Beplay体育安卓版本發人員構建的應用程序的能力,這將使他們的Lakehouse現實。建築實時應用程序在磚也不例外。等功能異步的檢查點,會話窗口,三角洲生活表允許組織建立更加強大,實時管道在磚使用三角洲湖作為基礎為所有流經lakehouse的數據。
然而,對於組織利用Flink實時轉換,看起來,他們無法利用大三角洲湖和磚的一些特性,但事實並非如此。在這個博客中,我們將探討Flink開發人員可以構建管道Flink應用程序整合到更廣泛的Lakehouse架構。
有狀態Flink應用程序
讓我們使用信用卡公司探索如何做到這一點。
信用卡公司,防止欺詐性交易是賭注對於一個成功的業務。信用卡欺詐帶來的聲譽和收入風險的金融機構,因此,信用卡公司必須有完備的係統來保持不斷警惕防止欺詐性交易。這些組織可能實現監控係統使用Apache Flink,分布式event-at-a-time處理引擎和細粒度的控製流媒體應用程序狀態和時間。
下麵是一個簡單的例子在Flink欺詐檢測應用程序的。它監視事務數量隨著時間的推移和發送警報,如果一個小交易立即緊隨其後的是一個大交易在一分鍾對於任何給定的信用卡帳戶。通過利用Flink的ValueState數據類型和KeyedProcessFunction在一起,開發人員可以實現業務邏輯觸發下遊警報基於事件和時間。
進口org.apache.flink.api.common.state。{ValueState, ValueStateDescriptor}進口org.apache.flink.api.scala.typeutils.Types進口org.apache.flink.configuration.Configuration進口org.apache.flink.streaming.api.functions.KeyedProcessFunction進口org.apache.flink.util.Collector進口org.apache.flink.walkthrough.common.entity.Alert進口org.apache.flink.walkthrough.common.entity.Transaction
對象FraudDetector {val SMALL_AMOUNT:雙=1.00val LARGE_AMOUNT:雙=500.00val ONE_MINUTE:長=60*1000升}@SerialVersionUID(1升)類FraudDetector擴展KeyedProcessFunction(長,事務,警報]{@transient私人varflagState: ValueState [. lang。布爾]= _@transient私人vartimerState: ValueState [. lang。長]= _@throws(例外)覆蓋def開放(參數配置):單元= {val flagDescriptor =新ValueStateDescriptor (“國旗”Types.BOOLEAN)flagState = getRuntimeContext.getState (flagDescriptor)val timerDescriptor =新ValueStateDescriptor (“timer-state”Types.LONG)timerState = getRuntimeContext.getState (timerDescriptor)}覆蓋defprocessElement(事務:事務,背景:KeyedProcessFunction長,交易,提醒#上下文,收集器:收集器[警告]):單元= {/ /獲得當前關鍵的當前狀態val lastTransactionWasSmall = flagState.value/ /檢查是否設置國旗如果(lastTransactionWasSmall ! =零){如果(transaction.getAmount>FraudDetector。LARGE_AMOUNT) {< /span>/ /輸出一個警告下遊val警報=新警報alert.setId (transaction.getAccountId)
collector.collect(警報)}/ /清理我們的狀態清理(上下文)}如果(transaction.getAmount除了發送警報,大多數組織都希望能夠對所有交易進行分析的過程。騙子不斷進化的希望不被對方發現他們使用的技術,所以它很有可能是一個簡單的heuristic-based欺詐檢測應用程序,如上圖,不充分的為防止欺詐活動。組織利用Flink為提醒還需要結合不同的數據集來創建先進的欺詐檢測模型,分析不僅僅是事務性數據,但包括數據點,如帳戶持有人的人口統計信息,以前的采購曆史,交易時間和地點等等。
整合使用雲Flink應用程序對象存儲與三角洲湖下沉
之間有一個權衡非常低延遲操作用例和大數據集上運行性能OLAP。以滿足操作sla和防止欺詐性交易,記錄需要由Flink近盡快收到事件,導致小文件(KBs)的順序Flink應用的下沉。這個“小文件問題”可以導致很差的性能在下遊的查詢,如清單目錄和執行引擎花更多的時間把文件從雲存儲實際上比他們處理這些文件中的數據。考慮拚花一樣的欺詐檢測應用程序寫入事務文件以下模式:
根| - dt:時間戳(可空=真正的)|——accountId:字符串(可空=真正的)|——數量:雙(可空=真正的)| - - -提醒:布爾(可空=真正的)
幸運的是,磚自動加載程序很容易流數據從Flink降落到對象存儲應用程序為下遊三角洲湖表毫升和BI數據。
從pyspark.sql.functions進口上校,date_formatdata_path =“/演示/ flink_delta_blog /交易”delta_silver_table_path =“/演示/ flink_delta_blog / silver_transactions”checkpoint_path =“/演示/ flink_delta_blog /檢查點/ delta_silver”
flink_parquet_schema = spark.read.parquet (data_path) . schema#啟用自動優化處理小文件的問題spark.conf。集(“spark.databricks.delta.optimizeWrite.enabled”,“真正的”)spark.conf。集(“spark.databricks.delta.autoCompact.enabled”,“真正的”)flink_parquet_to_delta_silver = (spark.readStream。格式(“cloudFiles”).option (“cloudFiles.format”,“鋪”). schema (flink_parquet_schema).load (data_path).withColumn (“日期”,date_format(坳(“dt”),“yyyy-MM-dd”))#使用分區下遊三角洲表.withColumnRenamed (“dt”,“時間戳”).writeStream。格式(“δ”).option (“checkpointLocation”checkpoint_path).partitionBy (“日期”).start (delta_silver_table_path))
三角洲湖表自動優化在雲存儲通過壓縮數據的物理布局和索引來減輕下遊小文件的問題,使性能分析。
——進一步優化使用ZORDER表的物理布局。優化delta.”/演示/flink_delta_blog/silver_transactions”ZORDER通過(accountId)
就像裝載器可以將靜態源像雲存儲轉換為流數據源,三角洲湖表也功能流源盡管被存儲在對象存儲。這意味著使用Flink操作用例的組織可以利用這個體係結構模式為流分析在不犧牲他們的實時要求。
streaming_delta_silver_table = (spark.readStream。格式(“δ”).load (delta_silver_table_path)#……額外的流ETL和/或分析……)
使用Apache卡夫卡和三角洲湖Flink集成應用程序
假設信用卡公司希望利用他們的欺詐檢測模型,在磚建造,並在實時模型得分數據。推動文件雲存儲可能不足夠快一些sla欺詐檢測,所以他們可以從他們的Flink寫入數據應用程序消息總線係統就像卡夫卡,AWS動作,或Azure事件中心。一旦數據被寫入卡夫卡,磚的工作可以讀卡夫卡和寫三角洲湖。
對於Flink開發人員,有一個卡夫卡連接器可以結合Flink項目允許DataStream數據API和基於表API的流媒體工作結果寫出到一個組織的卡夫卡集群。注意,寫這個博客,這個連接器Flink不打包,所以您將需要包括卡夫卡連接器JAR項目的構建文件中(即砰的一聲。xml構建。sbt等)。
這裏是如何寫的一個例子的結果在集群Flink主題在卡夫卡DataStream數據:
包spendreport;進口org.apache.flink.streaming.api.datastream.DataStream;進口org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;進口org.apache.flink.walkthrough.common.entity.Transaction;進口org.apache.flink.walkthrough.common.source.TransactionSource;公共類FraudDetectionJob{公共靜態無效主要(String[]參數)拋出異常{StreamExecutionEnvironmentenv=StreamExecutionEnvironment。getExecutionEnvironment();數據流<事務>交易=env。addSource(新TransactionSource ())。的名字(“交易”);字符串經紀人= "輸入- - - - - -代理- - - - - -信息- - - - - -在這裏”KafkaSink<字符串>水槽=KafkaSink<。字符串>構建器()。setBootstrapServers(經紀人)。setRecordSerializer(KafkaRecordSerializationSchema.builder ().setTopic (“交易”).setValueSerializationSchema (新TransactionSchema ()).build ())。構建();交易。sinkTo(水槽)env。執行(“欺詐檢測”);}}< /字符串> < /字符串> < /事務>
現在你可以很容易地利用磚來編寫一個結構化的流媒體應用程序讀卡夫卡的話題Flink DataStream數據的結果寫出來。建立從卡夫卡……
卡夫卡=(spark.readStream.format(“卡夫卡”).option (“kafka.bootstrap。服務器”,kafka_bootstrap_servers_plaintext).option(“訂閱”,“欺詐- - - - - -事件”).option (“startingOffsets”、“最新”).load ())kafkaTransformed=kafka.select (from_json(坳(“價值”)。投(“字符串),模式)\…額外的轉換
一旦數據被圖示,我們可以加載模型和分數microbatch每次觸發後引發的數據流程。機器學習的更詳細的示例模型和結構化流,檢查這篇文章在我們的文檔。
進口pyspark.ml.PipelinepipelineModel=Pipeline.load (“/路徑/來/訓練有素的/模型)streamingPredictions=(pipelineModel.transform (kafkaTransformed).groupBy (" id ").agg ((總和(當(“預測= = =”標簽,1))/數(的標簽)。別名(“真正的預測率”),數('標簽).alias(“計數”)))
現在我們可以寫通過配置writeStreamδ,它指向我們fraud_predictions三角洲湖表。這將使我們能夠構建重要報告我們如何跟蹤和處理欺詐性交易為我們的客戶;beplay体育app下载地址我們甚至可以使用輸出了解隨著時間的推移我們的模型正在做它輸出多少假陽性或準確的評估。
streamingPredictions。writeStream \。格式(“δ”)\.outputMode \(“追加”).option (“checkpointLocation”、“/位置/在雲計算/存儲”)\.table (“fraud_predictions”)
結論
這兩個選項,Flink和自動裝卸機或Flink和卡夫卡,組織仍然可以利用三角洲湖的特性,確保他們將Flink應用程序集成到更廣泛的Lakehouse架構。磚也正在同Flink社區建立一個直接Flink三角洲湖連接器,您可以閱讀更多關於在這裏。