在磚結構化流模式

這包含常見的筆記本和代碼示例模式處理結構化流磚。

開始使用結構化的流

如果你是全新的結構化流,明白了第一個結構化流負載運行

寫卡珊德拉在Python作為結構化流水槽

Apache Cassandra是一個分布式的、低延遲、可伸縮的高度可用的OLTP數據庫。

結構化流與卡桑德拉通過火花卡桑德拉連接器。這個連接器支持抽樣和DataFrame api,它有原生支持編寫流數據。*重要*您必須使用相應的版本的spark-cassandra-connector-assembly

下麵的示例連接到一個或多個主機在卡桑德拉數據庫集群。它還指定了連接配置如檢查點位置和具體用於和表名:

火花相依(“spark.cassandra.connection.host”,“host1 host2”)dfwriteStream\格式(“org.apache.spark.sql.cassandra”)\outputMode(“添加”)\選項(“checkpointLocation”,“/道路/ /檢查站”)\選項(“用於”,“keyspace_name”)\選項(“表”,“table_name”)\開始()

寫Azure突觸分析使用foreachBatch ()在Python中

streamingDF.writeStream.foreachBatch ()允許重用現有的一批作家寫的輸出數據流查詢Azure突觸分析。看到foreachBatch文檔獲取詳細信息。

要運行這個示例,您需要Azure突觸分析連接器。在Azure突觸分析連接器的詳細信息,請參見在Azure突觸分析查詢數據

pyspark.sql.functions進口*pyspark.sql進口*defwriteToSQLWarehouse(df,epochId):df\格式(“com.databricks.spark.sqldw”)\模式(“覆蓋”)\選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\選項(“forward_spark_azure_storage_credentials”,“真正的”)\選項(“數據表”,“my_table_in_dw_copy”)\選項(“tempdir”,“wasbs: / / < your-container-name > @ < your-storage-account-name >.blob.core.windows.net/ < your-directory-name >”)\保存()火花相依(“spark.sql.shuffle.partitions”,“1”)查詢=(火花readStream格式(“速度”)負載()selectExpr(“值% 10鍵”)groupBy(“關鍵”)()toDF(“關鍵”,“數”)writeStreamforeachBatch(writeToSQLWarehouse)outputMode(“更新”)開始())

寫信給亞馬遜DynamoDB使用foreach ()在Scala中,Python

streamingDF.writeStream.foreach ()允許您編寫的輸出流查詢到任意位置。

使用Python

這個例子展示了如何使用streamingDataFrame.writeStream.foreach ()在Python中給DynamoDB寫信。第一步得到DynamoDB寶途資源。這個例子是用寫的access_keysecret_key,但是磚建議你使用S3訪問配置實例配置文件

  1. 定義一些輔助方法來創建DynamoDB表運行的例子。

    table_name=“PythonForeachTest”defget_dynamodb():進口boto3access_key=“<訪問密鑰>”secret_key=“<秘密密鑰>”地區=“<地區名稱>”返回boto3資源(“dynamodb”,aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=地區)defcreateTableIfNotExists():“‘創建一個DynamoDB表如果它不存在。這一定是火花驅動程序上運行,而不是在foreach。“‘dynamodb=get_dynamodb()existing_tables=dynamodb客戶端list_tables()(的表名]如果table_nameexisting_tables:打印(“創建表% s%table_name)=dynamodbcreate_table(的表=table_name,KeySchema=({“AttributeName”:“關鍵”,“KeyType”:“希”}),AttributeDefinitions=({“AttributeName”:“關鍵”,“AttributeType”:“年代”}),ProvisionedThroughput={“ReadCapacityUnits”:5,“WriteCapacityUnits”:5})打印(“等待表做好準備”)客戶端get_waiter(“table_exists”)等待(的表=table_name)
  2. 定義的類和方法,寫入DynamoDB並調用它們foreach。有兩種方法來指定您的自定義邏輯foreach

    • 使用一個函數:這是最簡單的方法,可以用來寫一行。然而,客戶端/連接初始化寫一行將在每次調用完成。

      defsendToDynamoDB_simple():“‘DynamoDB函數發送一行。當使用foreach,調用這個方法是遺囑執行人與生成的輸出行。“‘#創建客戶端對象的執行者,#不使用客戶端在驅動程序中創建的對象dynamodb=get_dynamodb()dynamodb(table_name)put_item(={“關鍵”:str((“關鍵”]),“數”:(“數”]})
    • 使用一個類開放,過程,關閉方法:這允許一個更高效的實現,客戶端/連接初始化,可以編寫多行。

      SendToDynamoDB_ForeachWriter:“‘類發送一組行DynamoDB。當使用foreach,這個類的副本將被用來寫字多行執行人。看到“DataStreamWriter.foreach”的python文檔為更多的細節。“‘def開放(自我,partition_id,epoch_id):#這叫做第一次當準備發送多個行。#把所有的初始化代碼在open(),以便新鮮#複製這個類的初始化在開放的執行人()#將調用。自我dynamodb=get_dynamodb()返回真正的def過程(自我,):#這是要求每一行()被稱為後開放。#這個實現發送一次一行。#為進一步增強,接觸火花+ DynamoDB連接器#團隊:https://github.com/audienceproject/spark-dynamodb自我dynamodb(table_name)put_item(={“關鍵”:str((“關鍵”]),“數”:(“數”]})def關閉(自我,犯錯):#這叫做畢竟行處理。如果犯錯:提高犯錯
  3. 調用foreach在你流查詢使用上麵的函數或對象。

    pyspark.sql.functions進口*火花相依(“spark.sql.shuffle.partitions”,“1”)查詢=(火花readStream格式(“速度”)負載()selectExpr(“值% 10鍵”)groupBy(“關鍵”)()toDF(“關鍵”,“數”)writeStreamforeach(SendToDynamoDB_ForeachWriter())# .foreach (sendToDynamoDB_simple) / /選擇,使用一個或另一個outputMode(“更新”)開始())

使用Scala

這個例子展示了如何使用streamingDataFrame.writeStream.foreach ()在Scala中給DynamoDB寫信。

這個你需要創建一個運行DynamoDB表都有一個單獨的字符串鍵命名為“價值”。

  1. 定義的實現ForeachWriter接口執行寫。

    進口orgapache火花sql。{ForeachWriter,}進口comamazonawsAmazonServiceException進口comamazonaws身份驗證_進口comamazonaws服務dynamodbv2AmazonDynamoDB進口comamazonaws服務dynamodbv2AmazonDynamoDBClientBuilder進口comamazonaws服務dynamodbv2模型AttributeValue進口comamazonaws服務dynamodbv2模型ResourceNotFoundException進口java跑龍套ArrayList進口scala集合JavaConverters_DynamoDbWriter擴展ForeachWriter(]{私人瓦爾的表=“<表名稱>”私人瓦爾accessKey=“< aws訪問密鑰>”私人瓦爾secretKey=“< aws密鑰>”私人瓦爾regionName=“<地區>”/ /這個懶洋洋地將隻有當初始化()開放懶惰的瓦爾ddb=AmazonDynamoDBClientBuilder標準()withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(accessKey,secretKey)))withRegion(regionName)構建()/ // /這叫做第一次當準備發送多個行。/ /把所有的初始化代碼內部開放的(),這樣一個新鮮的/ /複製這個類的初始化在開放的執行人()/ /將被調用。/ /def開放(partitionId:,epochId:)={ddb/ /客戶端的初始化真正的}/ // /這是要求每一行()被稱為後開放。/ /這個實現發送一次一行。/ /一個更有效的實現可以發送一次批次的行。/ /def過程(:)={瓦爾rowAsMap=getValuesMap(模式字段名)瓦爾dynamoItem=rowAsMapmapValues{v:任何= >AttributeValue(vtoString)}。asJavaddbputItem(的表,dynamoItem)}/ // /這叫做畢竟行處理。/ /def關閉(errorOrNull:Throwable)={ddb關閉()}}
  2. 使用DynamoDbWriter寫速度流進DynamoDB。

    火花readStream格式(“速度”)負載()選擇(“價值”)writeStreamforeach(DynamoDbWriter)開始()

亞馬遜CloudTrail ETL

以下筆記本顯示您可以很容易地改變你的亞馬遜CloudTrail日誌從JSON拚花特別的高效查詢。看到實時流ETL和結構化流獲取詳細信息。

ETL亞馬遜CloudTrail日誌使用結構化流Python筆記本

ETL亞馬遜CloudTrail日誌使用結構化流Scala筆記本

Stream-Stream連接

這兩個筆記本展示如何在Python和Scala使用stream-stream連接。

Stream-Stream加入Python筆記本

Scala Stream-Stream連接筆記本電腦