開始
用戶指南
管理指南
參考指南
資源
2022年9月20日更新
給我們反饋
這包含了在Databricks上使用結構化流的常用模式的筆記和代碼示例。
這兩本筆記展示了如何使用DataFrame API在Python和Scala中構建結構化流應用程序。
在新選項卡中打開筆記本
foreachBatch ()
streamingDF.writeStream.foreachBatch ()允許您重用現有的批處理數據寫入器,將流查詢的輸出寫入Cassandra。下麵的筆記本演示了這一點,它使用Scala中的Spark Cassandra連接器將聚合查詢的鍵值輸出寫入Cassandra。看到foreachBatch文檔獲取詳細信息。
streamingDF.writeStream.foreachBatch ()
要運行此示例,您需要安裝適當的卡桑德拉火花連接器為您的Spark版本Maven庫.
在本例中,我們創建了一個表,然後啟動結構化流查詢來寫入該表。然後,我們使用foreachBatch ()使用批處理DataFrame連接器寫入流輸出。
進口org.apache.火花.sql._進口org.apache.火花.sql.卡珊德拉._進口com.datastax.火花.連接器.cql.CassandraConnectorConf進口com.datastax.火花.連接器.抽樣.ReadConf進口com.datastax.火花.連接器._瓦爾宿主=“< ip地址>”瓦爾clusterName=“<集群名稱>”瓦爾用於=“<用於>”瓦爾的表=“<表名>”火花.setCassandraConf(clusterName,CassandraConnectorConf.ConnectionHostParam.選項(宿主))火花.readStream.格式(“速度”).負載().selectExpr("值% 10為鍵").groupBy(“關鍵”).數().toDF(“關鍵”,“價值”).writeStream.foreachBatch{(batchDF:DataFrame,batchId:長)= >batchDF.寫//使用Cassandra批處理數據源寫入流.cassandraFormat(的表,用於).選項(“集群”,clusterName).模式(“添加”).保存()}.outputMode(“更新”).開始()
streamingDF.writeStream.foreachBatch ()允許您重用現有的批處理數據寫入器,將流查詢的輸出寫入Azure Synapse Analytics。看到foreachBatch文檔獲取詳細信息。
要運行這個示例,您需要Azure Synapse Analytics連接器。有關Azure Synapse Analytics連接器的詳細信息,請參見Azure突觸分析.
從pyspark.sql.functions進口*從pyspark.sql進口*defwriteToSQLWarehouse(df,epochId):df.寫\.格式(“com.databricks.spark.sqldw”)\.模式(“覆蓋”)\.選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\.選項(“forward_spark_azure_storage_credentials”,“真正的”)\.選項(“數據表”,“my_table_in_dw_copy”)\.選項(“tempdir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)\.保存()火花.相依.集(“spark.sql.shuffle.partitions”,“1”)查詢=(火花.readStream.格式(“速度”).負載().selectExpr("值% 10為鍵").groupBy(“關鍵”).數().toDF(“關鍵”,“數”).writeStream.foreachBatch(writeToSQLWarehouse).outputMode(“更新”).開始())
foreach ()
streamingDF.writeStream.foreach ()允許您將流查詢的輸出寫入任意位置。
streamingDF.writeStream.foreach ()
這個例子展示了如何使用streamingDataFrame.writeStream.foreach ()用Python寫到DynamoDB。第一步獲取DynamoDB boto資源。編寫此示例是為了使用access_key和secret_key,但Databricks建議您使用使用實例配置文件安全訪問S3桶.
streamingDataFrame.writeStream.foreach ()
access_key
secret_key
定義一些幫助器方法來創建用於運行示例的DynamoDB表。
table_name=“PythonForeachTest”defget_dynamodb():進口boto3access_key=“< >訪問關鍵”secret_key=“<密鑰>”地區=“<地區名稱>”返回boto3.資源(“dynamodb”,aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=地區)defcreateTableIfNotExists():“‘如果DynamoDB表不存在,則創建它。這必須在Spark驅動程序上運行,而不是在foreach內部。“‘dynamodb=get_dynamodb()existing_tables=dynamodb.元.客戶端.list_tables() (的表名]如果table_name不在existing_tables:打印(“創建表% s"%table_name)表格=dynamodb.create_table(的表=table_name,KeySchema=[{“AttributeName”:“關鍵”,“KeyType”:“希”}),AttributeDefinitions=[{“AttributeName”:“關鍵”,“AttributeType”:“年代”}),ProvisionedThroughput={“ReadCapacityUnits”:5,“WriteCapacityUnits”:5})打印(“等待餐桌準備好”)表格.元.客戶端.get_waiter(“table_exists”).等待(的表=table_name)
定義寫入DynamoDB並調用它們的類和方法foreach.中指定自定義邏輯有兩種方法foreach.
foreach
使用函數:這是一種簡單的方法,可用於每次寫入一行。但是,每次調用都會初始化客戶端/連接來寫入一行。
defsendToDynamoDB_simple(行):“‘函數將一行發送到DynamoDB。當與' foreach '一起使用時,這個方法將在執行器中被調用使用生成的輸出行。“‘#在執行器中創建客戶端對象#不要使用在驅動中創建的客戶端對象dynamodb=get_dynamodb()dynamodb.表格(table_name).put_item(項={“關鍵”:str(行[“關鍵”]),“數”:行[“數”]})
使用帶有開放,過程,關閉方法:這允許更有效的實現,其中客戶端/連接被初始化並可以寫入多行。
開放
過程
關閉
類SendToDynamoDB_ForeachWriter:“‘類向DynamoDB發送一組行。當與' foreach '一起使用時,這個類的副本將被用於寫入執行器中的多行。參見' DataStreamWriter.foreach '的python文檔為更多的細節。“‘def開放(自我,partition_id,epoch_id):#在準備發送多行時首先調用。#將所有初始化代碼放在open()中,這樣就可以創建一個fresh在執行器中初始化這個類的副本#將被調用。自我.dynamodb=get_dynamodb()返回真正的def過程(自我,行):在open()被調用之後,每一行都會調用這個函數。這個實現一次發送一行。#要進一步增強,請聯係Spark+DynamoDB連接器#團隊:https://github.com/audienceproject/spark-dynamodb自我.dynamodb.表格(table_name).put_item(項={“關鍵”:str(行[“關鍵”]),“數”:行[“數”]})def關閉(自我,犯錯):在處理完所有行之後調用。如果犯錯:提高犯錯
調用foreach在使用上述函數或對象的流查詢中。
從pyspark.sql.functions進口*火花.相依.集(“spark.sql.shuffle.partitions”,“1”)查詢=(火花.readStream.格式(“速度”).負載().selectExpr("值% 10為鍵").groupBy(“關鍵”).數().toDF(“關鍵”,“數”).writeStream.foreach(SendToDynamoDB_ForeachWriter())#.foreach(sendToDynamoDB_simple) //選擇一個或另一個.outputMode(“更新”).開始())
這個例子展示了如何使用streamingDataFrame.writeStream.foreach ()在Scala中寫入DynamoDB。
要運行這個程序,您必須創建一個DynamoDB表,它隻有一個名為“value”的字符串鍵。
類的實現ForeachWriter執行寫操作的接口。
ForeachWriter
進口org.apache.火花.sql.{ForeachWriter,行}進口com.amazonaws.AmazonServiceException進口com.amazonaws.身份驗證._進口com.amazonaws.服務.dynamodbv2.AmazonDynamoDB進口com.amazonaws.服務.dynamodbv2.AmazonDynamoDBClientBuilder進口com.amazonaws.服務.dynamodbv2.模型.AttributeValue進口com.amazonaws.服務.dynamodbv2.模型.ResourceNotFoundException進口java.跑龍套.ArrayList進口scala.集合.JavaConverters._類DynamoDbWriter擴展ForeachWriter[行]{私人瓦爾的表=“<表名>”私人瓦爾accessKey=aws訪問密鑰“< >”私人瓦爾secretKey=“< > aws密鑰”私人瓦爾regionName=地區“< >”//隻有在調用open()時才會惰性地初始化懶惰的瓦爾ddb=AmazonDynamoDBClientBuilder.標準().withCredentials(新AWSStaticCredentialsProvider(新BasicAWSCredentials(accessKey,secretKey))).withRegion(regionName).構建()////在準備發送多行時首先調用。//把所有的初始化代碼放在open()中,這樣一個fresh//在執行器中初始化這個類的副本//將被調用。//def開放(partitionId:長,epochId:長)={ddb//強製客戶端初始化真正的}////在open()被調用之後,每一行都調用這個函數。//此實現每次發送一行。//一個更有效的實現是一次發送多個行。//def過程(行:行)={瓦爾rowAsMap=行.getValuesMap(行.模式.字段名)瓦爾dynamoItem=rowAsMap.mapValues{v:任何= >新AttributeValue(v.toString)}。asJavaddb.putItem(的表,dynamoItem)}////在處理完所有行之後調用。//def關閉(errorOrNull:Throwable)={ddb.關閉()}}
使用DynamoDbWriter將速率流寫入DynamoDB。
DynamoDbWriter
火花.readStream.格式(“速度”).負載().選擇(“價值”).writeStream.foreach(新DynamoDbWriter).開始()
下麵的筆記本展示了如何輕鬆地將Amazon CloudTrail日誌從JSON轉換為Parquet,以實現高效的特別查詢。看到具有結構化流的實時流ETL獲取詳細信息。
這兩本筆記展示了如何在Python和Scala中使用流-流連接。