開始
用戶指南
管理指南
參考指南
資源
2022年12月22日更新
給我們反饋
它包含筆記本和代碼示例,用於在數據庫裏使用結構化流的常見模式。
這兩本筆記本展示了如何使用DataFrame API在Python和Scala中構建結構化流應用程序。
在新標簽頁打開筆記本
Apache Cassandra是一個分布式、低延遲、可伸縮、高可用的OLTP數據庫。
結構化流與Cassandra通過Spark Cassandra連接器.該連接器同時支持RDD和DataFrame api,並且具有寫入流數據的本機支持。*重要的對應版本spark-cassandra-connector-assembly.
下麵的示例連接到Cassandra數據庫集群中的一個或多個主機。它還指定連接配置,如檢查點位置和特定的鍵空間和表名:
火花.相依.集(“spark.cassandra.connection.host”,“host1 host2”)df.writeStream\.格式(“org.apache.spark.sql.cassandra”)\.outputMode(“添加”)\.選項(“checkpointLocation”,“/道路/ /檢查站”)\.選項(“用於”,“keyspace_name”)\.選項(“表”,“table_name”)\.開始()
foreachBatch ()
streamingDF.writeStream.foreachBatch ()允許您重用現有的批處理數據寫入器,將流查詢的輸出寫入Azure Synapse Analytics。看到foreachBatch文檔獲取詳細信息。
streamingDF.writeStream.foreachBatch ()
要運行此示例,您需要Azure Synapse Analytics連接器。有關Azure Synapse Analytics連接器的詳細信息,請參見在Azure Synapse Analytics中查詢數據.
從pyspark.sql.functions進口*從pyspark.sql進口*defwriteToSQLWarehouse(df,epochId):df.寫\.格式(“com.databricks.spark.sqldw”)\.模式(“覆蓋”)\.選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\.選項(“forward_spark_azure_storage_credentials”,“真正的”)\.選項(“數據表”,“my_table_in_dw_copy”)\.選項(“tempdir”,“wasbs: / / < your-container-name > @ < your-storage-account-name > .blob.core.windows.net/ < your-directory-name >”)\.保存()火花.相依.集(“spark.sql.shuffle.partitions”,“1”)查詢=(火花.readStream.格式(“速度”).負載().selectExpr("值% 10為鍵").groupBy(“關鍵”).數().toDF(“關鍵”,“數”).writeStream.foreachBatch(writeToSQLWarehouse).outputMode(“更新”).開始())
foreach ()
streamingDF.writeStream.foreach ()允許您將流查詢的輸出寫入任意位置。
streamingDF.writeStream.foreach ()
這個例子展示了如何使用streamingDataFrame.writeStream.foreach ()在Python中寫入DynamoDB。第一步獲取DynamoDB boto資源。這個例子是為使用而編寫的access_key而且secret_key,但Databricks建議您使用使用實例概要配置S3訪問.
streamingDataFrame.writeStream.foreach ()
access_key
secret_key
定義幾個輔助方法來創建運行示例的DynamoDB表。
table_name=“PythonForeachTest”defget_dynamodb():進口boto3access_key=“< >訪問關鍵”secret_key=“<密鑰>”地區=“<地區名稱>”返回boto3.資源(“dynamodb”,aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=地區)defcreateTableIfNotExists():“‘如果DynamoDB表不存在,則創建它。這必須在Spark驅動程序上運行,而不是在foreach內部。“‘dynamodb=get_dynamodb()existing_tables=dynamodb.元.客戶端.list_tables() (的表名]如果table_name不在existing_tables:打印(“創建表% s"%table_name)表格=dynamodb.create_table(的表=table_name,KeySchema=[{“AttributeName”:“關鍵”,“KeyType”:“希”}],AttributeDefinitions=[{“AttributeName”:“關鍵”,“AttributeType”:“年代”}],ProvisionedThroughput={“ReadCapacityUnits”:5,“WriteCapacityUnits”:5})打印(“等待桌子準備好”)表格.元.客戶端.get_waiter(“table_exists”).等待(的表=table_name)
定義寫入DynamoDB並調用它們的類和方法foreach.有兩種方法來指定自定義邏輯foreach.
foreach
使用函數:這是一種簡單的方法,可以用來一次寫一行。然而,客戶端/連接初始化寫入一行將在每次調用中完成。
defsendToDynamoDB_simple(行):“‘函數將一行發送到DynamoDB。當與' foreach '一起使用時,這個方法將在執行器中被調用使用生成的輸出行。“‘#在執行器中創建客戶端對象#不要使用在驅動程序中創建的客戶端對象dynamodb=get_dynamodb()dynamodb.表格(table_name).put_item(項={“關鍵”:str(行[“關鍵”]),“數”:行[“數”]})
使用類開放,過程,關閉方法:這允許一個更有效的實現,其中一個客戶端/連接被初始化,多行可以被寫入。
開放
過程
關閉
類SendToDynamoDB_ForeachWriter:“‘類將一組行發送到DynamoDB。當與' foreach '一起使用時,這個類的副本將用於寫入執行程序中的多行。參見python文檔中的“DataStreamWriter.foreach”欲知詳情。“‘def開放(自我,partition_id,epoch_id):#在準備發送多行時首先調用。#把所有的初始化代碼放在open()裏麵,這樣就可以這個類的拷貝在執行器中初始化#將被調用。自我.dynamodb=get_dynamodb()返回真正的def過程(自我,行):在open()被調用後的每一行都被調用。#這個實現一次發送一行。#要進一步增強,請聯係Spark+DynamoDB連接器#團隊:https://github.com/audienceproject/spark-dynamodb自我.dynamodb.表格(table_name).put_item(項={“關鍵”:str(行[“關鍵”]),“數”:行[“數”]})def關閉(自我,犯錯):在處理完所有行之後調用。如果犯錯:提高犯錯
調用foreach在您的流查詢與上述函數或對象。
從pyspark.sql.functions進口*火花.相依.集(“spark.sql.shuffle.partitions”,“1”)查詢=(火花.readStream.格式(“速度”).負載().selectExpr("值% 10為鍵").groupBy(“關鍵”).數().toDF(“關鍵”,“數”).writeStream.foreach(SendToDynamoDB_ForeachWriter())#.foreach(sendToDynamoDB_simple) //選項,使用其中一個.outputMode(“更新”).開始())
這個例子展示了如何使用streamingDataFrame.writeStream.foreach ()在Scala中寫入DynamoDB。
要運行這個程序,你必須創建一個DynamoDB表,它有一個名為“value”的字符串鍵。
方法的實現ForeachWriter執行寫操作的接口。
ForeachWriter
進口org.apache.火花.sql.{ForeachWriter,行}進口com.amazonaws.AmazonServiceException進口com.amazonaws.身份驗證._進口com.amazonaws.服務.dynamodbv2.AmazonDynamoDB進口com.amazonaws.服務.dynamodbv2.AmazonDynamoDBClientBuilder進口com.amazonaws.服務.dynamodbv2.模型.AttributeValue進口com.amazonaws.服務.dynamodbv2.模型.ResourceNotFoundException進口java.跑龍套.ArrayList進口scala.集合.JavaConverters._類DynamoDbWriter擴展ForeachWriter[行]{私人瓦爾的表=“<表名>”私人瓦爾accessKey=""私人瓦爾secretKey=""私人瓦爾regionName=地區“< >”//隻有在調用open()時才會被惰性地初始化懶惰的瓦爾ddb=AmazonDynamoDBClientBuilder.標準().withCredentials(新AWSStaticCredentialsProvider(新BasicAWSCredentials(accessKey,secretKey))).withRegion(regionName).構建()////在準備發送多行時首先調用。//把所有的初始化代碼放在open()裏麵,這樣就可以//在執行程序中初始化這個類的副本,其中open()//將被調用。//def開放(partitionId:長,epochId:長)={ddb//強製客戶端初始化真正的}////在open()被調用後的每一行都被調用。//每次發送一行。//一個更有效的實現是一次發送多個行。//def過程(行:行)={瓦爾rowAsMap=行.getValuesMap(行.模式.字段名)瓦爾dynamoItem=rowAsMap.mapValues{v:任何= >新AttributeValue(v.toString)}。asJavaddb.putItem(的表,dynamoItem)}////在所有行處理完畢後調用。//def關閉(errorOrNull:Throwable)={ddb.關閉()}}
使用DynamoDbWriter將一個速率流寫入DynamoDB。
DynamoDbWriter
火花.readStream.格式(“速度”).負載().選擇(“價值”).writeStream.foreach(新DynamoDbWriter).開始()
下麵的筆記本演示了如何輕鬆地將Amazon CloudTrail日誌從JSON轉換為Parquet,以便進行高效的特別查詢。看到實時流ETL與結構化流獲取詳細信息。
這兩本筆記本展示了如何在Python和Scala中使用流-流連接。