安裝和編譯Cython

學習如何使用Databricks安裝和編譯Cython。

寫的亞當Pavlacka

最後發布日期:2022年5月19日

本文檔解釋了如何使用已編譯的Cython代碼運行Spark代碼。具體步驟如下:

  1. 在DBFS上創建示例Cython模塊(AWS|Azure).
  2. 將文件添加到Spark會話。
  3. 創建包裝器方法以在執行器上加載模塊。
  4. 在示例數據集上運行映射器。
  5. 生成一個更大的數據集,並將性能與本機Python示例進行比較。
刪除

信息

缺省情況下,路徑使用dbfs: /如果沒有引用協議。

寫一個cython模塊示例到/example/cython/fib目錄。DBFS中的pyx。dbutils.fs.put(“/ / cython /無傷大雅的例子。pyx", """ def fib_mapper_cython(n): " '返回第一個fibonnaci數字> n。" ' cdef int a = 0 cdef int b = 1 cdef int j = int(n) while b
         

將Cython源文件添加到Spark

為了使Cython源文件在整個集群中可用,我們將使用sc.addPyFile將這些文件添加到Spark。例如,

% python sc.addPyFile(“dbfs: / / cython / fib.pyx例子”)

在驅動程序節點上測試Cython編譯

這段代碼將首先在驅動程序節點上測試編譯。

%python import pyximport import OS pyximport.install() import fib

定義wapper函數來編譯和導入模塊

打印語句將在執行器節點上執行。您可以查看stdout日誌消息以跟蹤模塊的進度。

%python import sys, os, shutil, cython def spark_cython(模塊,方法):def wrapped(*args, **kwargs): print '輸入函數帶有:%s' % args全局cython_function_ try:返回cython_function_(*args, **kwargs) except: import pyximport pyximport.install() print ' cython_function_ = getattr(__import__(模塊),方法)print '已定義函數:%s' % cython_function_返回cython_function_(*args, **kwargs)返回wrapped

運行Cython示例

下麵的代碼段在幾個數據點上運行fibonacci示例。

使用CSV閱讀器生成一個Spark DataFrame。從DataFrames回滾到rdd,並從GenericRowObject中獲取單個元素lines = spark.read.csv("/example/cython_input/").rdd。Map (lambda y: y.__getitem__(0)) mapper = spark_cython('fib', 'fib_mapper_cython') fib_frequency = lines.map(mapper)。reduceByKey(lambda a, b: a+b).collect()打印fib_frequency

性能比較

下麵我們將測試兩種實現之間的速度差異。我們將使用spark.range ()api使用50個Spark分區生成從10,000到100,000,000的數據點。我們將把這個輸出作為CSV寫入DBFS。

對於此測試,禁用自動縮放(AWS|Azure),以確保集群有固定數量的Spark執行器。

python dbutils.fs %。rm("/tmp/cython_input/", True)Range (10000, 100000000, 1,50).write.csv("/tmp/cython_input/")

普通PySpark代碼

%python def fib_mapper_python(n): a = 0 b = 1 print "Trying: %s" % n while b < int(n): a, b = b, a+b return (b, 1) print fib_mapper_python(2000) lines = spark.read.csv("/tmp/cython_input/").rdd。Map (lambda y: y.__getitem__(0)) fib_frequency = lines。Map (lambda x: fib_mapper_python(x))。reduceByKey(lambda a, b: a+b).collect()打印fib_frequency

測試Cython代碼

現在測試已編譯的Cython代碼。

%python lines = spark.read.csv("/tmp/cython_input/").rdd. csv。Map (lambda y: y.__getitem__(0)) mapper = spark_cython('fib', 'fib_mapper_cython') fib_frequency = lines.map(mapper)。reduceByKey(lambda a, b: a+b).collect()打印fib_frequency

我們生成的測試數據集有50個Spark分區,它創建了50個csv文件,如下所示。您可以使用dbutils.fs.ls(“/ tmp / cython_input /”)