嗨,團隊,
我的python dataframe如下。
原始數據是相當長的係列大約5000的數字。我的要求是經過RawData列中的每一行並計算2個指標。我已經創建了一個函數在Python中,效果絕對好。
Python函數
def calculate_metrics (value_dict):
df = value_dict.copy ()
df [' Metric1 '] = pd.Series (dtype =“浮動”)
df [' Metric2 '] = pd.Series (dtype =“浮動”)
指數,行value_dict.iterrows ():
df。loc (df (“Id”) = =行(“Id”), ' Metric1 '] = Function1(行[' RawData '])
df。loc (df (“Id”) = =行(“Id”), ' Metric2 '] = Function2(行[' RawData '])
返回df
我通過一個數據幀value_dict與“標識符和RawData”兩列。
我叫如下
value_dict [' RawData '] = value_dict [' RawData ']。應用(λx: np.array (x))
df_Fullagg = calculate_metrics (value_dict)
這個計算所有我需要的指標dataframe並返回。
這裏的羊皮紙書卷的數據是相當高的。我想用火花azure的突觸框架在這裏工作。我怎麼能使用pandas_Udf寫相同的功能。
我在找一些這樣的實現。
@pandas_udf (int, PandasUDFType.SCALAR)
def calculate_metrics (value_dict):
df = value_dict.copy ()
df [' Metric1 '] = pd.Series (dtype =“浮動”)
df [' Metric2 '] = pd.Series (dtype =“浮動”)
指數,行value_dict.iterrows ():
df。loc (df (“Id”) = =行(“Id”), ' Metric1 '] = Function1(行[' RawData '])
df。loc (df (“Id”) = =行(“Id”), ' Metric2 '] = Function2(行[' RawData '])
返回df
任何幫助將非常感激。
你好,看來你不需要一個熊貓udf。試試以下:
進口numpy pyspark.sql np。從pyspark進口FloatType類型。sql導入函數f數據=[{“標識符”:123年,“RawData”:“1、2、4、2、34歲,6,7,8”},{“標識符”:456年,“RawData”:“4、5、7、8、9、3、4、7、8”}] df = spark.createDataFrame series_mean = f(數據)。udf(λx:浮動(np.mean (x)), FloatType()) #取代Metric1邏輯series_max = f。udf(λx:浮動(np.max (x)), FloatType()) #取代Metric2邏輯df = (df .withColumn (“series_int f.split (f.col (RawData) ', ') .cast(數組< int >)) .withColumn(“的意思是”,series_mean (“series_int”)) .withColumn (“max”, series_max (“series_int”)))顯示(df)