跳轉到主要內容
公司博客上

構建一個實時歸因與磚三角洲的管道

通過<一個數據- - - - - -external-link="true" href="//www.eheci.com/blog/author/caryl-yuhas" aria-label="Caryl Yuhas">Caryl Yuhas和<一個數據- - - - - -external-link="true" href="//www.eheci.com/blog/author/denny-lee" aria-label="Denny Lee">丹尼李

2018年8月9日 公司博客上

得到一個O ' reilly的新電子書的早期預覽一步一步的指導你需要開始使用三角洲湖。


在磚試試這個筆記本

在數字廣告,最重要的事情之一,能夠提供給客戶的信息是他們的廣告支出推動的結果。很快我們可以提供這個,越多越好。將轉換或活動印象在一場廣告宣傳活動,公司必須進行歸因。屬性可以是一個相當昂貴的過程,對不斷更新數據集運行歸因是沒有正確的技術挑戰。傳統上,這不是一個容易解決的問題,有很多事情要思考:

  • 我們如何確保數據可以在低延遲寫讀位置沒有損壞記錄嗎?
  • 我們怎樣才能不斷追加很大,query-able數據集沒有爆炸的成本或損失的性能嗎?
  • 和我應該在哪裏以及如何引入歸因的加入嗎?

實時歸因與磚三角洲的管道

幸運的是,與結構化流和磚磚使這項任務變得很容易的三角洲。在這個博客(和相關的筆記本),我們將看看如何使用DataFrame API之上構建結構化的流媒體應用程序動作(對於那些使用Azure磚,可以使用<一個href="https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/streaming-event-hubs" target="_blank">Azure EventHubs,<一個href="https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/kafka" target="_blank">Apache卡夫卡在HDInsight,或<一個href="https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/cosmosdb-connector" target="_blank">Azure宇宙DB集成),並使用磚三角洲近實時查詢流。我們還將向您展示如何使用BI工具您所選擇的回顧你的歸因實時數據。

定義流

我們需要做的第一件事是建立的印象和轉換數據流。數據流的印象給我們提供了一個實時的視圖屬性與這些客戶服務相關的數字廣告(印象),而轉換流表示客戶已經執行一個動作(例如點擊廣告,購買一個項目,等)根據廣告。beplay体育app下载地址

磚結構的流,您可以快速插入流磚支持直接連接到卡夫卡(<一個href="https://docs.www.eheci.com/spark/latest/structured-streaming/kafka.html" target="_blank">Apache卡夫卡,<一個href="https://aws.amazon.com/msk/" target="_blank">Apache卡夫卡在AWS,<一個href="https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/kafka" target="_blank">Apache卡夫卡在HDInsight),<一個href="https://docs.www.eheci.com/spark/latest/structured-streaming/kinesis.html" target="_blank">運動正如下麵的代碼片段(這是印象,重複這個步驟轉換)

/ /讀印象流val運動= spark.readStream.format (“運動”).option (“streamName”kinesisStreamName).option (“地區”kinesisRegion).option (“initialPosition”,“最新”).option (“awsAccessKey”awsAccessKeyId美元).option (“awsSecretKey”awsSecretKey美元).load ()

接下來,創建數據流模式正如下麵的代碼片段。

/ /定義印象流模式val模式= StructType (Seq (StructField (“uid”StringType,真正的),StructField (“impTimestamp”TimestampType,真正的),StructField (“exchangeID”IntegerType,真正的),StructField (“出版社”StringType,真正的),StructField (“creativeID”IntegerType,真正的),StructField (“點擊”StringType,真正的),StructField (“advertiserID”IntegerType,真正的),StructField (“瀏覽器”StringType,真正的),StructField (“地理”StringType,真正的),StructField (“bidAmount”倍增式,真正的)))

最後,我們想要創造我們的流<我>印象DataFrame。磚顯示的命令,我們將看到數據和輸入/實時處理速度與我們的數據。

//定義流媒體印象DataFrameval小鬼=kinesis.select (from_json (data.cast(“字符串”)、模式)作為“字段”)。選擇(“。*”美元)//視圖印象真正的- - - - - -時間數據顯示器(imp)

同步流磚δ

的印象(小鬼)和轉換(conv)流可以直接同步到磚三角洲允許我們更大程度的靈活性和可伸縮性的實時歸因的用例。t允許您快速寫這些實時數據流為拚花格式S3 / Blob存儲,同時允許用戶同時讀取相同的目錄中沒有的開銷管理一致性、事務性和表現自己。正如下麵的代碼片段,我們從單一來源獲取的原始記錄和寫作成自己的磚三角洲的表。

進口org.apache.spark.sql.SparkSession進口org.apache.spark.sql.expressions.Window/ /印象“小淘氣”數據持久化到磚三角洲imp.withWatermark (“impTimestamp”,“一分鍾”).repartition (1).writeStream.format (“δ”).option (“路徑”,“/ tmp /到場/印象”).option (“checkpointLocation”,“/ tmp /到場/ impressions-checkpoints”).trigger (org.apache.spark.sql.streaming.Trigger.ProcessingTime (“10秒”).start ()

重要的是要注意,在磚δ,您還可以:

  • 應用額外的ETL、分析和/或濃縮步驟
  • 寫不同的溪流或批處理和不同來源的數據在同一個表

為專門報告磚三角洲的視圖

現在我們已經創建了印象和轉換數據磚三角洲表,我們將創建命名的觀點我們可以很容易地執行連接在火花SQL以及使這些數據可用於查詢從您最喜愛的BI工具。我們先開始創建我們的磚三角洲視圖。

%sql使用到場;創建取代視圖impressionsDelta作為選擇*delta.”/tmp/到場/印象”;創建取代視圖conversionsDelta作為選擇*delta.”/tmp/到場/轉換”;

計算實時歸因

現在我們已經建立了我們的磚三角洲的觀點,我們可以計算出最後碰歸因視圖,然後計算<我>加權歸因在視圖中。

計算最後碰歸因視圖

計算實時窗口歸因,指出在前麵的部分中,我們將需要連接兩個不同的三角洲的數據流:<我>印象和<我>轉換。正如下麵的代碼片段中,我們將首先定義我們<我>磚delta-based印象<我>和轉換。我們還將定義窗口規範將由以下使用<我>dense_rank()聲明。窗口和等級定義我們的歸因邏輯。

//定義數據需要的印象val小鬼=火花。sql("select uid as impUid, advertiserID as impAdv, * from sparksummit.imps").下降(“advertiserID”)//定義需要轉換的數據val conv=火花。sql("select * from sparksummit.convs")//定義火花SQL窗口命令通過印象時間戳//分區通過印象Uid印象Advertisorval windowSpec=Window.partitionBy (“impUid”、“impAdv”) .orderBy (desc(“impTimestamp”))//計算真正的- - - - - -時間歸因通過//加入impression.impUid==conversion.uid//確保印象時間之前發生轉換時間//通過dense_rank過濾val windowedAttribution=conv。加入(小鬼,imps.col (“impUid”)===convs.col (“uid”)& &imps.col (“impTimestamp”)
              
              現在我們可以計算真正的- - - - - -時間窗口歸因通過加入的印象轉換數據一起但過濾最近的印象用戶id (作為定義通過impUid)。最後,我們可以創建一個<>全球臨時視圖</>這(.createGlobalTempView)真正的- - - - - -時間視圖可訪問的通過下遊係統。例子中,您可以視圖你的真正的- - - - - -時間數據使用火花SQL下麵的代碼片段。
              
              
%sql選擇*global_temp.realTimeAttribution

,你可以穿上你最喜歡的BI工具等<一個href="https://docs.www.eheci.com/integrations/bi/tableau.html" target="_blank">表進行特別分析你的數據。

計算加權屬性視圖

在前麵的例子中,我們演示了一個簡單模型——隔離所有用戶的印象在轉換之前,選擇最近的,將隻在轉換之前最近的印象。更複雜的模型可以應用歸因windows或體重的印象時間如以下代碼片段。

//定義歸因窗口印象val attrWindow=窗口.partitionBy (“uid”).orderBy ($“impTimestamp”。desc)val attrRank=dense_rank()。(attrWindow)//定義排名窗口通過在歸因窗口//分區通過conversionIDval rankedWindow=窗口.partitionBy (“conversionID”)val numAttrImps=馬克斯(坳(“attrRank”))。(rankedWindow)//參考的印象val小鬼=火花。sql("select * from sparksummit.imps").withColumn("date", $"impTimestamp".(DateType))。下降(“advertiserID”)//引用轉換val conv=火花。sql("select * from sparksummit.convs").withColumnRenamed("uid","cuid")//加入印象轉換val加入=小鬼。加入(conv imps.col (“uid”)===convs.col (“cuid”)& &imps.col (“impTimestamp”)
              
              

討論

在這個博客中,我們回顧了如何磚三角洲實時歸因管道提供了一個簡化的解決方案。使用磚的優點三角洲同步並保存您的數據流包括(但不限於)的能力:

  • 保存並堅持你的實時流數據數據倉庫,因為磚三角洲維護一個事務日誌,有效跟蹤更改表。
  • ,但仍有能力運行您的查詢和實時執行計算,並在幾秒內完成
  • 與磚三角洲,可以有多個作家仍然可以同時修改一個數據集和看到一致的觀點。
  • 作家可以修改一個數據集不幹擾工作讀數據集。
  • 磚三角洲的一個重要的優化是避免了“許多小文件”的問題許多大數據項目,因為它的典型特性自動數據文件管理組織到大文件,這樣就可以有效地閱讀。
  • 統計數據啟用加速讀取10 - 100 x和數據不避免閱讀無關的信息。

流媒體框架和磚一起統一的分析平台,您可以快速構建和使用實時歸因與磚三角洲管道解決你複雜的廣告問題Beplay体育安卓版本<我>在實時

看到所有公司博客上的帖子
Baidu
map