跳到主要內容
工程的博客

利用真實世界數據檢測高危患者

如何在健康Delta Lake上使用機器學習運行時和MLflow來預測患者疾病
分享這篇文章

隨著低成本基因組測序和人工智能醫學成像的興起,人們對精準醫療產生了濃厚的興趣。在精準醫療領域,我們的目標是利用數據和人工智能為疾病提出最佳治療方案。雖然精準醫療改善了被診斷患有罕見疾病和癌症的患者的預後,但精準醫療是被動的:患者必須生病才能部署精準醫療。

當我們審視醫療保健支出和結果時,我們有巨大的機會通過以下方式來提高醫療成本和生活質量防止慢性疾病,如糖尿病、心髒病或物質使用障礙。在美國,十分之七的死亡和85%的醫療支出是由慢性疾病引起的,發現了類似的趨勢在歐洲而且東南亞.非傳染性疾病通常可以通過對患者的教育和解決導致慢性疾病的根本問題來預防。這些問題可能包括潛在的生物風險因素,如已知的導致神經係統疾病的遺傳風險,社會經濟因素,比如環境汙染或缺乏獲得健康的途徑食品/預防保健,行為風險,如吸煙、飲酒或久坐不動的生活方式

精密的預防重點是利用數據確定有患病風險的患者群體,然後提供降低疾病風險的幹預措施。幹預措施可能包括一款數字應用程序,可以遠程監控高危患者,並提供生活方式和治療建議加強對疾病狀況的監測,或提供補充預防性護理.然而,部署這些幹預措施首先取決於確定有風險的患者。

識別風險患者最強大的工具之一是使用真實世界數據(RWD),這個術語統稱為醫療保健生態係統生成的數據,例如來自住院、臨床實踐、藥房、醫療保健提供者的電子病曆(EMR)和健康記錄(EHR),以及越來越多地從基因組學、社交媒體和可穿戴設備等其他來源收集的數據。在我們的上一篇博文我們演示了如何從EHR數據建立臨床數據湖。在這篇博客中,我們通過使用Databricks統一數據分析平台來跟蹤患者的旅程,並創建一個機器學習模型。Beplay体育安卓版本使用這個模型,給定患者的病史和人口統計信息,我們可以在給定的時間窗口內評估患者特定疾病的風險。在這個例子中,我們將看看藥物濫用,這是一個重要的話題物質使用障礙導致的廣泛不良健康後果.通過跟蹤我們的模型使用MLflow我們可以很容易地跟蹤模型隨時間的變化,他補充道對將模型應用於病人護理的過程的信心

在Databricks上使用機器學習進行疾病預測

從電子病曆數據預測疾病風險的參考架構

數據準備

為了訓練一個模型在給定時間預測風險,我們需要一個數據集,該數據集捕獲有關患者的相關人口統計信息(如遇到患者時的年齡、種族等),以及關於患者診斷史的時間序列數據。然後,我們可以使用這些數據來訓練一個模型,該模型可以了解影響患者在即將到來的時間段內被診斷出疾病的可能性的診斷和人口風險。

顯示從EHR中提取的表的模式和表之間關係的圖表。
圖1:從EHR中提取的數據模式和表之間的關係

為了訓練這個模型,我們可以利用患者的遭遇記錄和人口統計信息,就像電子健康記錄(EHR)中提供的那樣。圖1描述了我們將在工作流程中使用的表。這些表格是用我們之前博客裏的筆記本.我們將繼續從Delta Lake加載遭遇、組織和患者數據(帶有模糊的PII信息),並創建一個包含所有患者遭遇以及患者人口統計信息的數據框架。

patient_encounters遇到加入(病人,“病人”])加入(組織、“組織”]))顯示器(patient_encounters.filter (' reasondescription不是null ') .limit (10))

基於目標條件,我們還選擇了一組符合訓練數據的患者。也就是說,我們包括病例,通過他們的接觸史至少被診斷出患有這種疾病的患者,以及同等數量的對照組,沒有任何疾病史的患者。

positive_patientspatient_encounters選擇“病人”)在哪裏較低的(“REASONDESCRIPTION”)。就像(“%{}%”.format (條件))).dropDuplicates ().withColumn (“is_positive”點燃(真正的)))negative_patientsall_patients加入(positive_patients“病人”),如何“left_anti”).limit (positive_patients。()).withColumn (“is_positive”點燃()))patients_to_studypositive_patients.union (negative_patients)

現在我們將我們的接觸範圍限製在研究中包括的患者。

qualified_patient_encounters_dfpatient_encounters加入(patients_to_study“病人”])過濾器("DESCRIPTION is not NUll"))

現在我們有了感興趣的記錄,下一步是添加特性。在這個預測任務中,除了人口統計信息外,我們還選擇了被診斷患有這種疾病或任何已知共存疾病(共病)的總次數和以前的遭遇次數作為給定遭遇的曆史背景。

雖然對於大多數疾病,有大量關於共病病症的文獻,但我們也可以利用現實世界數據集中的數據來確定與目標病症相關的共病。

comorbid_conditionspositive_patients。加入(patient_encounters,[“病人”])在哪裏(坳(“REASONDESCRIPTION”) .isNotNull ()).dropDuplicates ([“病人”“REASONDESCRIPTION”]).groupBy (“REASONDESCRIPTION”).().orderBy (“數”,提升).limit (num_conditions))

在我們的代碼中,我們使用筆記本電腦部件指定要包括的共病的數量,以及時間的長度(以天為單位)來查看遭遇。使用MLflow記錄這些參數的跟蹤API

使用MLflow記錄參數,比如我們正在研究的條件。

現在我們需要為每次遭遇添加共病特征。與每一種共病相對應,我們添加一列,表示在過去觀察到感興趣的條件的次數,即。

在給定c的條件下,i / t-w≤i < t,對指示器函數xi,c求和。

在哪裏

指標函數xi,c的定義,如果患者在第i時刻被診斷為c病症,xi,c為1,否則為0。

我們分兩個步驟添加這些特性。首先,我們定義一個函數,添加共病指標函數(即。x我,c):

def add_comorbidities (qualified_patient_encounters_df comorbidity_list):output_dfqualified_patient_encounters_dfidx0伴隨疾病comorbidity_list:output_dfoutput_df.withColumn(“comorbidity_ % d”%idx (output_df [“REASONDESCRIPTION”]。就像“%”+伴隨疾病(“REASONDESCRIPTION”+“%”))。“int”)).withColumn(“comorbidity_ % d”%idx,合並(坳(“comorbidity_ % d”%idx),點燃(0)) #替換0.cache ())idx+1返回(output_df)

然後我們將這些指標函數在連續幾天內相加使用Spark SQL對窗口函數的強大支持

def add_recent_encounters (encounter_features):lowest_dateencounter_features選擇“START_TIME”).orderBy (“START_TIME”).limit (1).withColumnRenamed (“START_TIME”“EARLIEST_TIME”))output_dfencounter_features.crossJoin (lowest_date).withColumn(“天”,datediff(坳(“START_TIME”)、坳(“EARLIEST_TIME”))).withColumn(“patient_age”,datediff(坳(“START_TIME”)、坳(“生日”))))wWindow.orderBy (output_df [“天”]).partitionBy (output_df [“病人”]).rangeBetween (-int(num_days),-1))comorbidity_idx範圍(num_conditions):col_name“recent_ % d”%comorbidity_idxoutput_dfoutput_df.withColumn (col_name總和(坳(“comorbidity_ % d”%comorbidity_idx))。(w)).withColumn (col_name合並(坳(col_name),點燃(0))))返回(output_df)

在添加共病特征之後,我們需要添加目標變量,它表明患者是否在未來給定的時間窗口(例如當前遭遇後的一個月)被診斷為目標疾病。此操作的邏輯與前一步非常相似,不同之處在於時間窗口涵蓋了未來的事件。我們隻使用二進製標簽,表示我們感興趣的診斷是否會在未來發生。

def add_label (encounter_features num_days_future):wWindow.orderBy (encounter_features [“天”]).partitionBy (encounter_features [“病人”]).rangeBetween (0num_days_future))output_dfencounter_features.withColumn (“標簽”馬克斯(坳(“comorbidity_0”))。(w)).withColumn (“標簽”合並(坳(“標簽”),點燃(0))))返回(output_df)

現在我們將這些特性寫入Delta Lake中的特性存儲中。為了確保重現性,我們將mlflow實驗ID和運行ID作為一列添加到特征存儲中.這種方法的優點是我們可以接收更多的數據,我們可以向featurestore中添加新的特性,以便將來可以重複使用。

控製數據的質量問題

在我們繼續訓練任務之前,我們看一下數據,看看不同的標簽在類之間是如何分布的。在二元分類的許多應用中,一個類別可能是罕見的,例如在疾病預測中。這種階層失衡會對學習過程產生負麵影響。在估計過程中,模型傾向於以犧牲罕見事件為代價關注大多數類。此外,評估過程也受到了損害。例如,在一個0/1標簽分別分布為95%和%5的不平衡數據集中,一個總是預測0的模型的準確率將為95%。如果標簽是不平衡的,那麼我們需要應用這是一種常見的技術用於校正不平衡數據。

在我們的訓練集中,隻有4%的遭遇在疾病診斷之前。

查看我們的訓練數據,我們看到(圖2)這是一個非常不平衡的數據集:超過95%的觀察時間窗口沒有顯示診斷的證據。為了調整不平衡,我們可以降低控製類的樣本或生成合成樣本。這個選擇取決於數據集的大小和特征的數量。在這個例子中,我們對majority類進行了低采樣,以獲得一個平衡的數據集。注意,在實踐中,您可以選擇方法的組合,例如對majority類和also進行下采樣在你的訓練算法中分配類權重

df1dataset_df.filter (“標簽= = 1”)n_df1df1。()df2dataset_df.filter (“標簽= = 0”)采樣(0.9) .limit (n_df1)training_dataset_dfdf1.union (df2)采樣(1.0)顯示器(training_dataset_df.groupBy (“標簽”).())

使用采樣重新平衡我們的數據集。

模型訓練

為了訓練模型,我們用人口統計和共病特征的子集來增強我們的條件,對每個觀察結果應用標簽,並將這些數據傳遞給下遊訓練的模型。例如,在這裏,我們增加了我們最近診斷的共病與遭遇類(例如,這次預約是為了預防保健還是去急診室?,)和就診費用,對於人口統計信息,我們選擇種族、性別、Zip和患者就診時的年齡。

大多數情況下,盡管原始臨床數據加起來可能高達tb,但在根據納入/排除標準執行過濾和限製記錄之後,我們最終得到的數據集可以在一台機器上訓練。我們可以很容易地將spark數據幀轉換為熊貓dataframes並根據選擇的任何算法訓練一個模型。當使用Databricks ML運行時在美國,我們可以使用各種開放的ML庫。

任何機器學習算法都有一組參數(超參數),根據輸入參數的不同,分數可能會發生變化。此外,在某些情況下,錯誤的參數或算法會導致過擬合。為了確保模型性能良好,我們使用hyperparameter調優選擇最好的模型架構,然後我們將通過指定從這一步獲得的參數來訓練最終的模型。

要執行模型調優,我們首先需要對數據進行預處理。在這個數據集中,除了數字特征(例如,最近共病的計數),我們還有我們想要使用的分類人口統計數據。對於分類數據,最好的方法是使用one-hot-encoding.這主要有兩個原因:首先,大多數分類器(在這種情況下是邏輯回歸)都是基於數字特征的。其次,如果我們簡單地將分類變量轉換為數值索引,它將在我們的數據中引入序數,這可能會誤導分類器:例如,如果我們將州名轉換為索引,例如加利福尼亞為5,紐約為23,那麼紐約就變得比加利福尼亞“更大”。雖然這反映了按字母順序排列的列表中每個州名的索引,但在我們的模型上下文中,這種排序並不意味著什麼。一次性熱編碼消除了這種影響。

在這種情況下,預處理步驟不需要任何輸入參數,超參數隻影響分類器而不影響預處理部分。因此,我們分別執行預處理,然後使用結果數據集進行模型調優:

sklearn.preprocessing進口OneHotEncoder進口numpy作為npdefpre_processtraining_dataset_pdf):X_pdf = training_dataset_pdf.drop (“標簽”軸=1)y_pdf = training_dataset_pdf [“標簽”onehotencoder = onehotencoder (handle_unknown=“忽略”)one_hot_model = onehotencode .fit(X_pdf.values)X = one_hot_model.transform (X_pdf)y = y_pdf.values返回(X, y)

接下來,我們要選擇模型的最佳參數。對於這種分類,我們使用LogisticRegression彈性淨罰.請注意,在應用one-hot編碼後,根據所討論的分類變量的基數,我們最終可以得到許多可以超過樣本數量的特征。為了避免這類問題的過擬合,對目標函數施加懲罰。彈性網絡正則化的優點是它結合了兩種懲罰技術(套索而且嶺回歸),混合的程度可以由一個變量控製,在超參數調諧。

為了改進該模型,我們使用hyperopt找到最好的參數。此外,我們使用SparkTrialshyperopt模式,以並行執行超參數搜索。這個過程利用Databricks的托管MLflow來自動記錄與每個超參數運行對應的參數和度量。為了驗證每一組參數,我們使用k-fold交叉驗證撇除使用F1的分數作為評估模型的指標。請注意,由於k-fold交叉驗證生成多個值,我們選擇分數的最小值(最壞的情況),並在使用hyperopt時盡量使其最大化。

預處理功能在訓練數據幀中的應用。

數學導入expdef params_to_lr (params):返回“懲罰”“elasticnet”“multi_class”“表達”“random_state”43“n_jobs”-1“規劃求解”“傳奇”“托爾”經驗值(params [“托爾”]), #經驗值()這裏是因為超參數日誌空間“C”經驗值(params [“C”]),“l1_ratio”經驗值(params [“l1_ratio”])def tune_model (params):mlflow.start_run (run_name“tunning-logistic-regression”、嵌套真正的)作為運行:clfLogisticRegression (params_to_lr (params))。fit (X, y)損失-cross_val_score(clf, X, y,n_jobs-1,得分“f1”).最小值()返回“狀態”: STATUS_OK,“損失”:損失}

為了改進對空間的搜索,我們在logspace中選擇參數網格,並定義一個轉換函數,通過超opt對建議的參數進行轉換。為了更好地了解這種方法以及為什麼我們選擇這樣定義超參數空間,請查看以下內容這個演講涵蓋了如何在Databricks上管理端到端ML生命周期。

導入fmin, hp, tpe, SparkTrials, STATUS_OKSearch_space = {在這裏使用uniform而不是loguniform隻是為了讓指標在mlflow比較中更好地顯示在logspace中“托爾”: hp.uniform (“托爾”,3.0),“C”: hp.uniform (“C”,20),“l1_ratio”: hp.uniform (“l1_ratio”,3.,1),spark_trials = SparkTrials(並行度=2)Best_params = fmin(fntune_model空間search_space算法山丘建議max_evals= 32,rstatenp隨機RandomState43),試用spark_trials)

該測試的結果是基於交叉驗證的f1評分評估的最佳參數。

params_to_lr (best_params)46]: {“懲罰”“elasticnet”“multi_class”“表達”“random_state”43“n_jobs”-1“規劃求解”“傳奇”“托爾”0.06555920596441883“C”0.17868321158011416“l1_ratio”0.27598949120226646

現在讓我們看一看MLflow儀表板。MLflow自動將超opt的所有運行分組在一起,我們可以使用各種圖來檢查每個超參數對損失函數的影響,如圖3所示。這對於更好地理解模型的行為和超參數的影響尤為重要。例如,我們注意到較低的C值,即正則化強度的倒數,會導致較高的F1值。

MLflow中模型的平行坐標圖。
圖3所示。MLflow中模型的平行坐標圖。

在找到最優參數組合後,我們用最優超參數訓練一個二元分類器,並使用MLflow記錄模型。MLflow的模型api可以輕鬆地將模型存儲為稍後在模型評分期間調用的python函數,而不管用於訓練的底層庫是什麼。為了幫助模型的可發現性,我們使用與目標條件相關的名稱記錄模型(例如,在本例中為“藥物過量”)。

進口mlflow.sklearn進口matplotlib.pyplot作為pltsklearn.pipeline進口管道mlflow.models.signature進口infer_signature因為我們希望模型輸出概率(風險)而不是預測的標簽,所以我們重寫了# # mlflow。Pyfun的預測方法:SklearnModelWrappermlflow.pyfunc.PythonModel):def__init__自我,模型):自我。Model =模型def預測Self context model_input):返回self.model.predict_proba (model_input) [:,1def火車參數個數):mlflow.start_run (run_name =“training-logistic-regression”嵌套=真正的)作為運行:mlflow.log_params (params_to_lr (params))X_arr = training_dataset_pdf.drop (“標簽”軸=1) . valuesy_arr = training_dataset_pdf [“標簽”) . valuesohe = OneHotEncoder(handle_unknown=“忽略”)clf = LogisticRegression(**params_to_lr(params))。fit (X, y)管道([(在一個炎熱的, ohe), (“clf”, clf)))
              Lr_model = pipe。fit (X_arr y_arr)score=cross_val_score(clf, ohe.transform(X_arr), y_arr,n_jobs=-1,得分=“準確性”) .mean ()wrapped_lr_model = SklearnModelWrapper(lr_model)model_name =“- - -”. join (condition.split ())mlflow.log_metric (“準確性”,得分)mlflow.pyfunc.log_model (model_name python_model = wrapped_lr_model)displayHTML (模型精度為: %s '%(分數))返回(mlflow.active_run () . info)

現在,我們可以通過傳遞從上一步獲得的最佳參數來訓練模型。

注意,對於模型訓練,我們已經將預處理(一個熱編碼)作為sklearn管道的一部分,並將編碼器和分類器記錄為一個模型。在下一步中,我們可以簡單地調用患者數據模型並評估他們的風險。

模型部署和生產

在訓練模型並將其記錄到MLflow之後,下一步是使用模型對新數據進行評分。MLflow的特性之一是可以根據不同的標簽搜索實驗。例如,在本例中,我們使用在模型訓練期間指定的運行名來檢索已訓練模型的工件URI。然後,我們可以根據關鍵指標對檢索到的實驗進行排序。

進口mlflowbest_run = mlflow.search_runs (filter_string ="tags.mlflow.runName = 'training- logistics -regression'"order_by = (的指標。準確性DESC”]) .iloc [0model_name =藥物過量的clf = mlflow.pyfunc.load_model (model_uri =“% s / % s”% (best_run.artifact_uri model_name))clf_udf = mlflow.pyfunc。spark_udf(火花,model_uri =“% s / % s”% (best_run.artifact_uri model_name))

一旦我們選擇了一個特定的模型,我們就可以通過指定模型URI和名稱來加載模型:

加載特性。

將MLflow加載模型應用到特征數據框架中。

我們還可以使用Databricks的模型注冊表來管理模型版本、生產生命周期和簡單的模型服務。

將疾病預測轉化為精準預防

在這篇博客中,我們探討了對精確預防係統的需求,該係統可以識別驅動慢性疾病發作的臨床和人口統計學協變量。然後,我們研究了一個端到端機器學習工作流,該工作流使用來自電子病曆的模擬數據來識別有藥物過量風險的患者。在這個工作流的最後,我們能夠導出我們從MLflow訓練的ML模型,並將其應用於新的患者數據流。

雖然這個模型提供了信息,但在轉化為實踐之前,它不會產生影響。在現實世界的實踐中,我們已經與許多客戶合作,將這些和類似的係統部署到生產中。beplay体育app下载地址例如,在南卡羅來納醫科大學,他們能夠部署實時流媒體管道,處理電子病曆數據,以識別有敗血症風險的患者。這可以提前8小時檢測到膿毒症相關患者的衰退。在INTEGRIS Health的一個類似係統中,監測EHR數據以發現壓瘡發展的新跡象。在這兩種情況下,隻要患者被確認,護理團隊就會被提醒他們的病情。在醫療保險環境中,我們與Optum合作部署了類似的模型。他們能夠開發一種疾病預測引擎,在長期短期架構中使用循環神經網絡來識別疾病進展,並在九個不同的疾病領域具有良好的泛化能力。該模型用於使患者與預防保健途徑保持一致,從而改善慢性病患者的結果和護理成本。

雖然我們的大部分博客都專注於在醫療保健環境中使用疾病預測算法,但在製藥環境中也有很強的機會構建和部署這些模型。疾病預測模型可以深入了解藥物在上市後的使用情況,甚至可以發現以前未被發現的保護作用告知標簽擴展工作.此外,疾病預測模型在觀察罕見疾病(或其他診斷不足的疾病)的臨床試驗登記時是有用的.通過創建一個模型,觀察在接受罕見疾病診斷之前被誤診的患者,我們可以創建教育材料,教育臨床醫生常見的誤診模式,並希望創建試驗納入標準,從而增加試驗登記人數和提高療效。

在健康的三角洲湖上進行精確的預防

在這篇博客中,我們演示了如何在真實世界的數據上使用機器學習來識別有患慢性疾病風險的患者。要了解更多關於使用Delta Lake存儲和處理臨床數據集,請下載我們的關於使用真實世界臨床數據集的免費電子書.您也可以開始免費試用今天使用患者風險評分記錄本來自這個博客。

免費試用Databricks

相關的帖子

看到所有工程的博客的帖子
Baidu
map