從pyspark。sql進口SparkSession scala_version = ' 2.12 ' spark_version = = [f 'org.apache 3.3.0的包。火花:spark-sql-kafka-0-10_ {scala_version}: {spark_version}”、“org.apache.kafka: kafka-clients: 3.2.1 '] = SparkSession火花。構建器\部分(“當地”)\ .appName (kafka-spark) \ config (“spark.jars。包”、“,”. join(包))\ config (“spark.dynamicAllocation。啟用”、“假”)\ config (“spark.streaming.kafka。maxRatePerPartition", 100)\ .config("spark.sql.shuffle.partitions", 2)\ .getOrCreate() kafka_df = spark.readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", BROKER) \ .option("failOnDataLoss", "false") \ .option("subscribe", TOPIC) \ .option("includeHeaders", "true") \ .option("spark.streaming.kafka.maxRatePerPartition", 100)\ .load() display(kafka_df)
一個重要的參數是spark.streaming.kafka.maxRatePerPartition最大速度(每秒消息),每個卡夫卡分區將閱讀。
代碼片段包括兩種方式配置maxRatePerPartition參數,但不幸的是,沒有人工作。
有一個合適的方法來配置限製嗎?(限製引發閱讀)