安裝和編譯Cython

學習如何使用Databricks安裝和編譯Cython。

寫的亞當Pavlacka

最後發布時間:2022年5月19日

本文檔解釋了如何用編譯的Cython代碼運行Spark代碼。具體步驟如下:

  1. 在DBFS上創建一個示例Cython模塊(AWS|Azure).
  2. 將文件添加到Spark會話中。
  3. 創建包裝器方法以在執行程序上加載模塊。
  4. 在示例數據集上運行映射程序。
  5. 生成更大的數據集,並將性能與原生Python示例進行比較。
刪除

信息

默認情況下,路徑使用dbfs: /如果沒有引用協議。

寫一個示例cython模塊到/example/cython/fib。pyx DBFS。dbutils.fs.put(“/ / cython /無傷大雅的例子。pyx", """ def fib_mapper_cython(n): " '返回第一個fibonnaci數> n。'' cdef int a = 0 cdef int b = 1 cdef int j = int(n) while b
         

添加Cython源文件到Spark

要使Cython源文件在整個集群中可用,我們將使用sc.addPyFile將這些文件添加到Spark中。例如,

% python sc.addPyFile(“dbfs: / / cython / fib.pyx例子”)

在驅動程序節點上測試Cython編譯

這段代碼將首先在驅動程序節點上測試編譯。

%python import pyximport import OS pyximport.install() import fib . import

定義wapper函數來編譯和導入模塊

打印語句將在執行器節點上執行。您可以查看stdout日誌消息,以跟蹤模塊的進度。

%python import sys, os, shutil, cython def spark_cython(模塊,方法):def wrapped(*args, **kwargs): print 'Entered function with: %s' % args global cython_function_ try: return cython_function_(*args, **kwargs) except: import pyximport pyximport.install() print ' cython_function_(*args, **kwargs)編譯完成' cython_function_ = getattr(__import__(模塊),方法)print '定義的函數:%s' % cython_function_(*args, **kwargs)返回包裝

運行Cython示例

下麵的代碼片段在幾個數據點上運行fibonacci示例。

使用CSV讀取器生成Spark DataFrame。從DataFrames回滾到rdd,並從GenericRowObject中獲取單個元素= spark.read.csv("/example/cython_input/").rdd。Map (lambda y: y.__getitem__(0)) mapper = spark_cython('fib', 'fib_mapper_cython') fib_frequency = lines.map(mapper)。reduceByKey(lambda a, b: a+b).collect()打印fib_frequency

性能比較

下麵我們將測試兩種實現之間的速度差異。我們將使用spark.range ()api在50個Spark分區中生成從10,000到100,000,000的數據點。我們將把這個輸出作為CSV寫入DBFS。

對於此測試,禁用自動縮放(AWS|Azure),以確保集群中有固定數量的Spark執行程序。

python dbutils.fs %。rm(“/ tmp / cython_input /”,真的)火花。Range (10000, 100000000, 1,50).write.csv("/tmp/cython_input/")

正常PySpark代碼

%python def fib_mapper_python(n): a = 0 b = 1 print "Trying: %s" % n while b < int(n): a, b = b, a+b return (b, 1) print fib_mapper_python(2000) lines = spark.read.csv("/tmp/cython_input/").rdd. print "Trying: %s" % n while b < int(n): a, b = b, a+b return (b, 1)映射(lambda y: y.__getitem__(0)) fib_frequency = lines。地圖(λx: fib_mapper_python (x))。reduceByKey(lambda a, b: a+b).collect()打印fib_frequency

測試Cython代碼

現在測試編譯後的Cython代碼。

%python lines = spark.read.csv("/tmp/cython_input/").rdd. csv("/tmp/cython_input/")。Map (lambda y: y.__getitem__(0)) mapper = spark_cython('fib', 'fib_mapper_cython') fib_frequency = lines.map(mapper)。reduceByKey(lambda a, b: a+b).collect()打印fib_frequency

我們生成的測試數據集有50個Spark分區,它創建了50個csv文件,如下所示。您可以使用dbutils.fs.ls(“/ tmp / cython_input /”)