磚FeatureStoreClient
-
類
databricks.feature_store.client。
FeatureStoreClient
( feature_store_uri:可選(str) =沒有,model_registry_uri:可選(str) =沒有 ) -
基地:
對象
客戶端與磚交互功能。
-
create_table
( 名稱:str primary_keys:聯盟[str、列表(str)], [pyspark.sql.dataframe df:可選。DataFrame]=沒有一個,*,timestamp_keys: Union[str, List[str], None] = None, partition_columns: Union[str, List[str], None] = None, schema: Optional[pyspark.sql.types.StructType] = None, description: Optional[str] = None, tags: Optional[Dict[str, str]] = None, **kwargs )→databricks.feature_store.entities.feature_table.FeatureTable -
創建並返回一個功能表與給定名稱和主鍵。
返回的功能表有名字和主鍵。使用所提供的
模式
或推斷模式所提供的df
。如果df
提供,此數據將被保存在一個三角洲表。支持的數據類型特點是:IntegerType
,LongType
,FloatType
,倍增式
,StringType
,BooleanType
,DateType
,TimestampType
,ShortType
,ArrayType
,MapType
,BinaryType
,DecimalType
。- 參數
-
的名字——一個表單的功能表名
< database_name >。< table_name >
例如,dev.user_features
。primary_keys——功能表的主鍵。如果需要多個列,例如,指定一個列名列表
[' customer_id ',“地區”)
。df-數據插入到此功能表。的模式
df
將被用作功能表模式。timestamp_keys- - - - - -
包含事件時間相關特性值的列。時間戳鍵和功能表的主鍵唯一地標識一個實體的特征值在一個時間點。
請注意
實驗:這個論點在將來發布的版本中可能會改變或刪除沒有警告。
partition_columns- - - - - -
用於分區功能表列。如果提供了列表,列表中的列排序將用於分區。
模式——功能表模式。要麼
模式
或df
必須提供。描述——功能表的描述。
標簽- - - - - -
標簽與功能表。
請注意
從磚開始運行時可用10.5毫升。
- 其他參數
-
路徑(
可選(str)
)——在一個受支持的文件係統路徑。默認數據庫的位置。
-
register_table
( *,delta_table: str, primary_keys:聯盟[str、列表(str)], timestamp_keys:聯盟(str,列表(str),沒有一個)=沒有描述:可選(str) =沒有標簽:可選[Dict [str, str]] =沒有 )→databricks.feature_store.entities.feature_table.FeatureTable -
注冊一個現有三角洲與給定的表作為一個功能表主鍵。
返回的功能表具有相同的名稱作為三角洲表。
請注意
從磚開始運行時可用10.4毫升。
- 參數
-
的名字-δ表單的表名
< database_name >。< table_name >
例如,dev.user_features
。表必須存在於metastore。primary_keys——三角洲表的主鍵。如果需要多個列,例如,指定一個列名列表
[' customer_id ',“地區”)
。timestamp_keys——列包含事件時間與特征值有關。在一起,功能鍵和主鍵唯一地標識的時間戳值在一個時間點。
描述——功能表的描述。
標簽- - - - - -
標簽與功能表。
請注意
從磚開始運行時可用10.5毫升。
- 返回
-
一個
FeatureTable
對象。
-
get_table
( 名稱:str )→databricks.feature_store.entities.feature_table.FeatureTable -
得到一個功能表的元數據。
- 參數
-
的名字——一個表單的功能表名
< database_name >。< table_name >
例如,dev.user_features
。
-
drop_table
( 名稱:str )→沒有 -
刪除指定的功能表。這個API還滴底層δ表。
請注意
從磚開始運行時可用10.5毫升。
- 參數
-
的名字——表單的功能表名
< database_name >。< table_name >
例如,dev.user_features
。
請注意
刪除功能表可以導致上遊生產商和下遊消費者意想不到的失敗(模型、端點和安排工作)。你必須刪除任何現有的在線商店單獨發表。
-
read_table
( 名稱:str,* * kwargs )→pyspark.sql.dataframe.DataFrame -
閱讀的內容功能表。
- 參數
-
的名字——一個表單的功能表名
< database_name >。< table_name >
例如,dev.user_features
。 - 返回
-
功能表內容,或將拋出一個異常,如果這個特性表不存在。
-
write_table
( 名稱:str,df: pyspark.sql.dataframe.DataFrame,模式:str =“合並”,checkpoint_location:可選(str) =沒有,觸發:Dict (str,任何]= {“processingTime”:“5秒”} )→可選(pyspark.sql.streaming.StreamingQuery) -
寫一個功能表。
如果輸入
DataFrame
流,將創建一個寫流。- 參數
-
的名字——一個表單的功能表名
< database_name >。< table_name >
例如,dev.user_features
。提出了一個異常如果這個特性表不存在。df——火花
DataFrame
特性數據。提出了一個異常,如果模式不匹配的功能表。模式- - - - - -
兩個支持寫模式:
“覆蓋”
更新整個表。“合並”
插入的行嗎df
進入功能表。如果df
包含功能表中的列不存在,這些列將被添加新特性。
checkpoint_location——設置結構化流
checkpointLocation
選擇。通過設置一個checkpoint_location
,火花結構化流將存儲的進展信息和中間狀態,使故障後恢複。這個參數時僅支持論點df
是一個流DataFrame
。觸發——如果
df.isStreaming
,觸發
字典定義了流數據處理的時機,將打開並通過DataStreamWriter.trigger
作為參數。例如,觸發={“一旦”:真正的}
將導致調用嗎DataStreamWriter.trigger(一旦= True)
。
- 返回
-
如果
df.isStreaming
,返回一個PySparkStreamingQuery
。沒有一個
否則。
-
add_data_sources
( *,feature_table_name: str, source_names:聯盟[str、列表(str)], source_type: str =“定製” )→沒有 -
將數據源添加到功能表。
- 參數
-
feature_table_name——功能表名。
source_names——數據源名稱。對於多個源,指定一個列表。如果一個數據源名稱已經存在,它將被忽略。
source_type- - - - - -
下列之一:
“表”
:表中格式< database_name >。< table_name >和存儲在metastore(如蜂巢)。“路徑”
:磚的路徑,如文件係統(DBFS)。“自定義”
:手動添加數據源,一張桌子和一個路徑。
-
delete_data_sources
( *,feature_table_name: str, source_names:聯盟(str、列表(str)) )→沒有 -
從功能表刪除數據源。
請注意
所有類型的數據源(表、路徑、自定義)匹配源名稱將被刪除。
- 參數
-
feature_table_name——功能表名。
source_names——數據源名稱。對於多個源,指定一個列表。如果一個數據源名稱不存在,它將被忽略。
-
publish_table
( 名稱:str, online_store: databricks.feature_store.online_store_spec.online_store_spec。*,OnlineStoreSpec filter_condition:可選(str) = None,模式:str =“合並”,流:bool = False, checkpoint_location:可選(str) = None,觸發:Dict [str,任何]= {“processingTime”:“5分鍾”},特點:聯盟(str,列表(str),沒有一個)=沒有 )→可選(pyspark.sql.streaming.StreamingQuery) -
發布功能表在線商店。
- 參數
-
的名字——功能表的名稱。
online_store——規範的在線商店。
filter_condition——一個SQL表達式使用功能表列,過濾器功能行之前發布的在線商店。例如,
“dt>“2020-09-10”
。這類似於跑步df.filter
或者一個在哪裏
條件在SQL特性表之前出版。模式- - - - - -
指定的行為當數據已經存在於這個特性表在網上商店。如果
“覆蓋”
使用模式,現有的數據被替換的新數據。如果“合並”
使用模式,新的數據將被合並,在這些條件下:如果存在一個關鍵在線表而不是離線表,行在線表修改的。
離線表中是否存在一個重要但不是在線表,在線離線表行插入到表。
如果存在一個關鍵的離線和在線表,在線表行將被更新。
流媒體——如果
真正的
流數據的在線商店。checkpoint_location——設置結構化流
checkpointLocation
選擇。通過設置一個checkpoint_location
,火花結構化流將存儲的進展信息和中間狀態,使故障後恢複。這個參數時僅支持流= True
。觸發——如果
流= True
,觸發
定義了流數據處理的時機。字典將打開並通過DataStreamWriter.trigger
作為參數。例如,觸發={“一旦”:真正的}
將導致調用嗎DataStreamWriter.trigger(一旦= True)
。特性- - - - - -
指定特性列(s)發布到網上商店。所選特征必須現有在線商店功能的超集。主鍵列和時間戳鍵列總是會出版。
請注意
這個參數時僅支持
模式=“合並”
。當特性
沒有設置,整個功能表將發表。
- 返回
-
如果
流= True
,返回一個PySparkStreamingQuery
,沒有一個
否則。
-
create_training_set
( df: pyspark.sql.dataframe.DataFrame,feature_lookups: List[databricks.feature_store.entities.feature_lookup.FeatureLookup], label: Union[str, List[str], None], exclude_columns: List[str] = [] )→databricks.feature_store.training_set.TrainingSet -
創建一個
TrainingSet
。- 參數
-
df- - -
DataFrame
用於連接特性。feature_lookups——功能加入到列表
DataFrame
。標簽- - - - -列(s)的名字
DataFrame
包含訓練集的標簽。沒有標號字段創建一個訓練集,即為無監督訓練集,指定標簽=沒有。exclude_columns——名稱的列下降
TrainingSet
DataFrame
。
- 返回
-
一個
TrainingSet
對象。
-
log_model
( 模型:任何,artifact_path: str,*,味道:模塊,training_set: databricks.feature_store.training_set.TrainingSet,registered_model_name:可選(str) =沒有,await_registration_for: int = mlflow.tracking._model_registry.DEFAULT_AWAIT_MAX_SLEEP_SECONDS,* * kwargs ) -
日誌MLflow模型打包功能查找信息。
請注意
的
DataFrame
返回的TrainingSet.load_df ()
必須被用來訓練模型。如果它已經被修改(例如數據歸一化,添加一個列,和類似的),這些修改將不會應用在推理時,導致training-serving傾斜。- 參數
-
模型——模型得救。這個模型必須能夠被拯救了
flavor.save_model
。看到MLflow模型API。artifact_path——Run-relative工件路徑。
味道MLflow模塊用於記錄模型。
味道
應該有類型ModuleType
。該模塊必須有一個方法save_model
,必須支持python_function
味道。例如,mlflow.sklearn
,mlflow.xgboost
,以及類似的。training_set- - -
TrainingSet
用來訓練這個模型。registered_model_name- - - - - -
請注意
實驗:這個論點在將來發布的版本中可能會改變或刪除沒有警告。
如果有,創建一個模型版本
registered_model_name
,也創建一個模型如果一個注冊名字是不存在的。await_registration_for——等待的秒數模型版本完成創建,
準備好了
的地位。默認情況下,函數等待五分鍾。指定0
或沒有一個
跳過等待。
- 返回
-
score_batch
( model_uri: str,df: pyspark.sql.dataframe.DataFrame,result_type: str =“雙” )→pyspark.sql.dataframe.DataFrame -
評估模型
DataFrame
。模型評價所需的附加功能會自動檢索
功能商店
。模型必須被記錄
FeatureStoreClient.log_model ()
,包元數據模型與特征。除非出現在df
,這些特性將會抬頭功能商店
和與df
前評分模型。如果一個特性是包含在
df
將使用提供的特性值,而不是那些存儲在功能商店
。例如,如果一個模型訓練的兩個特點
account_creation_date
和num_lifetime_purchases
,如:feature_lookups=(FeatureLookup(table_name=“trust_and_safety.customer_features”,feature_name=“account_creation_date”,lookup_key=“customer_id”,),FeatureLookup(table_name=“trust_and_safety.customer_features”,feature_name=“num_lifetime_purchases”,lookup_key=“customer_id”),]與mlflow。start_run():training_set=fs。create_training_set(df,feature_lookups=feature_lookups,標簽=“is_banned”,exclude_columns=(“customer_id”])…fs。log_model(模型,“模型”,味道=mlflow。sklearn,training_set=training_set,registered_model_name=“example_model”)
然後在推理時,調用者的
FeatureStoreClient.score_batch ()
必須通過一個DataFrame
包括customer_id
,lookup_key
中指定的FeatureLookups
的training_set
。如果DataFrame
包含一個列account_creation_date
,這一列的值將用於代替的功能商店
。如:# batch_df已列(“customer_id”、“account_creation_date”)預測=fs。score_batch(“模型:/ example_model / 1”,batch_df)
- 參數
-
model_uri- - - - - -
的位置,URI的格式,MLflow模型的使用記錄
FeatureStoreClient.log_model ()
。之一::/ < mlflow_run_id > / run-relative /道路/ /模型
模型:/ < model_name > / < model_version >
模型:/ < model_name > / <階段>
URI模式的更多信息,請參閱引用工件。
df- - - - - -
的
DataFrame
評分模型。功能商店
將與特性df
前評分模型。df
必須:1。包含查找鍵列必須加入特征的數據特征
商店中指定
feature_spec.yaml
工件。2。包含所有源鍵列必須得分模型,作為中指定
的
feature_spec.yaml
工件。3所示。不包含一個列
預測
,這是預留給模型的預測。df
可能包含附加列。result_type——模型的返回類型。看到
mlflow.pyfunc.spark_udf ()
result_type。
- 返回
-
一個
DataFrame
包含:所有列的
df
。所有特征值從特征檢索存儲。
一個列
預測
包含的輸出模型。
-