在磚結構化流模式

這包含常見的筆記本和代碼示例模式處理結構化流磚。

開始使用結構化的流

如果你是全新的結構化流,明白了第一個結構化流負載運行

寫卡珊德拉在Python作為結構化流水槽

Apache Cassandra是一個分布式的、低延遲、可伸縮的高度可用的OLTP數據庫。

結構化流與卡桑德拉通過火花卡桑德拉連接器。這個連接器支持抽樣和DataFrame api,它有原生支持編寫流數據。*重要*您必須使用相應的版本的spark-cassandra-connector-assembly

下麵的示例連接到一個或多個主機在卡桑德拉數據庫集群。它還指定了連接配置如檢查點位置和具體用於和表名:

火花相依(“spark.cassandra.connection.host”,“host1 host2”)dfwriteStream\格式(“org.apache.spark.sql.cassandra”)\outputMode(“添加”)\選項(“checkpointLocation”,“/道路/ /檢查站”)\選項(“用於”,“keyspace_name”)\選項(“表”,“table_name”)\開始()

寫Azure突觸分析使用foreachBatch ()在Python中

streamingDF.writeStream.foreachBatch ()允許重用現有的一批作家寫的輸出數據流查詢Azure突觸分析。看到foreachBatch文檔獲取詳細信息。

要運行這個示例,您需要Azure突觸分析連接器。在Azure突觸分析連接器的詳細信息,請參見在Azure突觸分析查詢數據

pyspark.sql.functions進口*pyspark.sql進口*defwriteToSQLWarehouse(df,epochId):df\格式(“com.databricks.spark.sqldw”)\模式(“覆蓋”)\選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\選項(“forward_spark_azure_storage_credentials”,“真正的”)\選項(“數據表”,“my_table_in_dw_copy”)\選項(“tempdir”,“wasbs: / / < your-container-name > @ < your-storage-account-name >.blob.core.windows.net/ < your-directory-name >”)\保存()火花相依(“spark.sql.shuffle.partitions”,“1”)查詢=(火花readStream格式(“速度”)負載()selectExpr(“值% 10鍵”)groupBy(“關鍵”)()toDF(“關鍵”,“數”)writeStreamforeachBatch(writeToSQLWarehouse)outputMode(“更新”)開始())

Stream-Stream連接

這兩個筆記本展示如何在Python和Scala使用stream-stream連接。

Stream-Stream加入Python筆記本

在新標簽頁打開筆記本

Scala Stream-Stream連接筆記本電腦

在新標簽頁打開筆記本