歡迎來到 PocketFlow 的世界!在本章節中,我們將一起探索 PocketFlow 中最核心、最基礎的建構模塊——節點 (Node)。
想像一下,您正在建造一座複雜的樂高城堡。如果試圖一次性將所有樂高積木拼湊起來,那將會是一項艱鉅且容易出錯的任務。更聰明的方法是將城堡拆分成幾個部分,例如城牆、塔樓、大門等,分別建造完成後再組合起來。
在軟體開發中,特別是處理像呼叫大型語言模型 (LLM) 這類可能包含多個步驟或容易出錯的任務時,我們也希望能將複雜的任務拆解成一個個獨立、專注的小單元。這就是「節點」誕生的緣由。
節點 (Node) 就像工廠流水線上的一位工人,專注於一道特定的工序。它負責執行單一、明確的任務,讓整個工作流程更加清晰、穩定且易於管理。
讓我們以一個常見的任務為例:將一段文字內容進行摘要。這個任務看似簡單,但如果我們希望它能穩定運行,可能需要考慮:
一個「節點」就能優雅地處理上述所有環節。
prep
-> exec
-> post
)每個節點都遵循一個清晰的三步驟工作流程:準備 (prepare)、執行 (execute)、以及善後 (post-process)。
讓我們更詳細地了解這三個步驟:
prep(shared)
- 準備階段:
prep_res
,這個結果會被接下來的 exec()
和 post()
步驟使用。exec(prep_res)
- 執行階段:
exec_res
,這個結果會被傳遞給 post()
步驟。post(shared, prep_res, exec_res)
- 善後階段:
exec
階段的成果,並將結果寫回「共享儲存 (Shared Store)」。同時,它還會決定下一步的行動。"default"
。為何是三個步驟? 這是為了實踐「關注點分離 (separation of concerns)」原則。數據的存取(
prep
和post
)與數據的處理(exec
)被明確分開,使得程式碼更清晰、更易於維護和測試。值得一提的是,這三個步驟都是可選的。例如,如果您只需要處理數據而不需要執行複雜計算,可以只實作
prep
和post
。 {: .note }
讓我們來建立一個名為 SummarizeFile
的節點,它會讀取一段文字,呼叫(模擬的)大型語言模型來產生摘要,然後儲存摘要結果。
首先,我們需要定義一個繼承自 Node
的類別:
from pocketflow import Node # 假設 pocketflow 已安裝
# 模擬呼叫大型語言模型
def call_llm(prompt):
print(f"正在呼叫 LLM,提示:'{prompt[:30]}...'")
if "錯誤" in prompt: # 模擬錯誤情況
raise Exception("LLM 呼叫失敗!")
return f"這是對 '{prompt[prompt.find(':')+2:prompt.find(':')+22]}...' 的摘要。"
class SummarizeFile(Node):
# 我們將逐步填寫 prep, exec, post 方法
pass
這段程式碼定義了一個名為 SummarizeFile
的新類別,它繼承了 PocketFlow
提供的 Node
類別。我們還定義了一個 call_llm
函數來模擬呼叫語言模型的行為。
prep
)prep
方法負責從共享儲存中獲取需要處理的資料。在這個例子中,我們假設要摘要的文本存放在共享儲存的 "data"
鍵中。
class SummarizeFile(Node):
def prep(self, shared):
print("步驟 1: prep - 正在準備資料...")
text_to_summarize = shared.get("data", "") # 從 shared store 獲取 "data"
if not text_to_summarize:
print("警告:沒有找到要摘要的文本。")
return text_to_summarize # 將文本傳遞給 exec
prep
方法接收一個名為 shared
的參數,這就是我們的共享儲存 (Shared Store)。它會嘗試讀取 shared["data"]
的內容。如果找不到,則返回空字串。
exec
)exec
方法接收 prep
方法的回傳結果 (即 prep_res
),並執行核心任務——呼叫 LLM 進行摘要。
class SummarizeFile(Node):
# ... (prep 方法如上) ...
def exec(self, prep_res): # prep_res 是從 prep 方法返回的文本
print(f"步驟 2: exec - 正在執行摘要...")
if not prep_res: # 如果文本為空
return "輸入文件內容為空。"
# 建立提示語句
prompt = f"請將以下文本摘要成10個字:{prep_res}"
summary = call_llm(prompt) # 呼叫 LLM,這可能會失敗
return summary # 返回摘要結果
這裡,exec
方法首先檢查傳入的 prep_res
(文本內容) 是否為空。如果非空,它會建立一個提示 (prompt),然後呼叫 call_llm
函數來獲得摘要。
post
)post
方法接收 shared
(共享儲存)、prep_res
(準備階段的結果) 和 exec_res
(執行階段的結果,即摘要)。它的任務是將摘要儲存回共享儲存,並決定下一步的行動。
class SummarizeFile(Node):
# ... (prep 和 exec 方法如上) ...
def post(self, shared, prep_res, exec_res):
print(f"步驟 3: post - 正在儲存結果...")
shared["summary"] = exec_res # 將摘要 (exec_res) 存到 shared store 的 "summary"
print(f"摘要已儲存:'{exec_res}'")
# 沒有明確 return,預設返回 "default" 行動指令
post
方法將 exec_res
(摘要結果) 儲存到 shared
字典的 "summary"
鍵下。因為我們沒有明確 return
一個行動指令,所以這個節點在執行完畢後會預設返回 "default"
。
現實世界的任務,尤其是與外部服務(如 LLM API)互動時,並非總是一帆風順。網路問題、服務暫時不可用或 API 速率限制都可能導致失敗。PocketFlow 的節點內建了錯誤處理和重試機制,讓您的工作流程更加強韌。
max_retries
和 wait
)在定義節點時,您可以指定兩個參數來控制 exec()
方法的重試行為:
max_retries
(整數):exec()
方法最多執行的次數。預設值是 1
,表示不重試。如果設為 3
,則在第一次執行失敗後,最多再重試 2
次。wait
(整數):每次重試前等待的秒數。預設值是 0
(不等待)。當遇到 API 速率限制或配額錯誤時,設定等待時間進行退避 (back off) 會很有幫助。例如,我們可以這樣初始化我們的 SummarizeFile
節點,使其在失敗時最多重試3次,每次重試前等待10秒:
summarize_node = SummarizeFile(max_retries=3, wait=10)
當 exec()
方法中發生異常 (exception) 時,節點會自動重試,直到:
max_retries - 1
次,並且在最後一次嘗試時仍然失敗。您可以透過 self.cur_retry
(從0開始計數) 在 exec()
方法內部獲取當前的重試次數。
class RetryNode(Node):
def exec(self, prep_res):
print(f"這是第 {self.cur_retry} 次重試 (從0開始計)") # 了解目前是第幾次嘗試
raise Exception("模擬執行失敗")
# node = RetryNode(max_retries=3)
# node.run({}) # 執行時會看到打印3次,然後拋出異常
exec_fallback
)在所有重試都失敗後,如果我們不希望整個流程因異常而崩潰,而是提供一個備用的「降級」結果,可以覆寫 exec_fallback
方法。
def exec_fallback(self, prep_res, exc): # exc 是導致失敗的異常對象
raise exc # 預設行為是重新拋出異常
預設情況下,exec_fallback
只是簡單地重新拋出它接收到的異常。但是,您可以覆寫它來返回一個備用結果。這個備用結果隨後會成為 exec_res
傳遞給 post()
方法。
讓我們為 SummarizeFile
節點添加一個 exec_fallback
方法:
class SummarizeFile(Node):
# ... (prep, exec, post 方法如上) ...
def exec_fallback(self, prep_res, exc):
print(f"錯誤:執行摘要時發生問題 ({exc})。提供備用方案。")
# 提供一個簡單的備用回應,而不是讓程式崩潰
return "處理您的請求時發生錯誤,請稍後再試。"
現在,如果 call_llm
在 exec
中多次失敗(達到 max_retries
),exec_fallback
將被調用,並返回一個友好的錯誤訊息,而不是讓整個程式停止。
萬事俱備!現在我們可以創建 SummarizeFile
節點的實例,並運行它。
# 假設 SummarizeFile 類別已完整定義 (包含 prep, exec, post, exec_fallback)
summarize_node = SummarizeFile(max_retries=3, wait=2) # 最多嘗試3次,每次等待2秒
# 準備共享數據
shared_data = {"data": "PocketFlow 是一個輕量級的工作流程編排框架。"}
# 運行節點
# node.run() 會依次調用 prep -> exec (可能重試/fallback) -> post
print("--- 開始運行節點 (正常情況) ---")
action_result = summarize_node.run(shared_data)
print(f"\n節點返回的行動指令: {action_result}") # 應該是 "default"
print(f"共享儲存中的摘要: {shared_data.get('summary')}")
# 模擬一個會導致 LLM 呼叫失敗的情況
print("\n--- 開始運行節點 (模擬 LLM 失敗) ---")
shared_data_will_fail = {"data": "這段文字包含“錯誤”關鍵字,會導致模擬的LLM失敗。"}
action_result_fail = summarize_node.run(shared_data_will_fail)
print(f"\n節點返回的行動指令 (失敗情況): {action_result_fail}")
print(f"共享儲存中的摘要 (失敗情況): {shared_data_will_fail.get('summary')}")
執行上述程式碼,您將會看到:
正常情況下:
--- 開始運行節點 (正常情況) ---
步驟 1: prep - 正在準備資料...
步驟 2: exec - 正在執行摘要...
正在呼叫 LLM,提示:'請將以下文本摘要成10個字:PocketFlow...'
步驟 3: post - 正在儲存結果...
摘要已儲存:'這是對 'PocketFlow 是一個輕量級的...' 的摘要。'
節點返回的行動指令: default
共享儲存中的摘要: 這是對 'PocketFlow 是一個輕量級的...' 的摘要。
模擬 LLM 失敗情況:
--- 開始運行節點 (模擬 LLM 失敗) ---
步驟 1: prep - 正在準備資料...
步驟 2: exec - 正在執行摘要...
正在呼叫 LLM,提示:'請將以下文本摘要成10個字:這段文字包含“...'
步驟 2: exec - 正在執行摘要... (重試第1次)
正在呼叫 LLM,提示:'請將以下文本摘要成10個字:這段文字包含“...'
步驟 2: exec - 正在執行摘要... (重試第2次)
正在呼叫 LLM,提示:'請將以下文本摘要成10個字:這段文字包含“...'
錯誤:執行摘要時發生問題 (LLM 呼叫失敗!)。提供備用方案。
步驟 3: post - 正在儲存結果...
摘要已儲存:'處理您的請求時發生錯誤,請稍後再試。'
節點返回的行動指令 (失敗情況): default
共享儲存中的摘要 (失敗情況): 處理您的請求時發生錯誤,請稍後再試。
正如您所見,當 call_llm
模擬失敗時,節點會自動重試。在所有重試都用盡後,exec_fallback
方法被調用,確保流程能夠優雅地處理錯誤,並將備用結果儲存到共享數據中。
當您調用一個節點的 run(shared)
方法時,PocketFlow 在內部會按照一定的順序執行定義好的步驟。
非程式碼的逐步解析:
my_node.run(shared_data)
。prep
):節點首先執行 prep(shared_data)
方法。此方法可以讀取 shared_data
,進行一些預處理,然後返回結果 (我們稱之為 prep_result
)。_exec
):prep_result
來執行核心邏輯。這一步驟實際上是由一個內部方法 _exec(prep_result)
來管理的,它包含了重試邏輯。_exec
會在一個迴圈中嘗試執行您定義的 exec(prep_result)
方法,最多 max_retries
次。exec()
成功:它返回的結果 (我們稱之為 exec_result
) 會被用於下一步。exec()
拋出異常:max_retries - 1
:節點會等待 wait
秒(如果 wait > 0
),然後再次嘗試執行 exec()
。cur_retry == max_retries - 1
)並且仍然失敗:節點會調用 exec_fallback(prep_result, exception_object)
。exec_fallback
返回的結果將作為 exec_result
。如果 exec_fallback
本身也拋出異常(或預設行為),則該異常會向上拋出,中斷流程。post
):最後,節點執行 post(shared_data, prep_result, exec_result)
方法。此方法可以使用執行結果來更新 shared_data
,並返回一個行動指令字串。run()
方法最終返回 post()
方法所返回的行動指令。序列圖 (Sequence Diagram) 概覽:
相關程式碼片段 (簡化版):
在 pocketflow/__init__.py
檔案中,Node
類別的相關實作大致如下:
# 位於 pocketflow/__init__.py (為教學目的簡化)
class BaseNode:
def run(self,shared):
# ... (省略部分警告檢查)
return self._run(shared) # 實際執行交給 _run
def _run(self,shared):
p = self.prep(shared) # 1. 執行 prep
e = self._exec(p) # 2. 執行 _exec (包含重試和 fallback)
return self.post(shared,p,e)# 3. 執行 post
class Node(BaseNode):
def __init__(self,max_retries=1,wait=0):
super().__init__()
self.max_retries = max_retries
self.wait = wait
self.cur_retry = 0 # 當前重試次數 (從0開始)
def exec_fallback(self,prep_res,exc): # 如果 exec 最終失敗,會呼叫這個
raise exc # 預設是重新拋出錯誤
def _exec(self,prep_res): # 內部執行方法,處理重試邏輯
for self.cur_retry in range(self.max_retries): # 最多嘗試 max_retries 次
try:
return self.exec(prep_res) # 嘗試執行使用者定義的 exec
except Exception as e:
if self.cur_retry == self.max_retries - 1: # 如果是最後一次嘗試
return self.exec_fallback(prep_res,e) # 執行 fallback
if self.wait > 0:
time.sleep(self.wait) # 等待指定時間後重試
這段簡化的程式碼展示了 run
方法如何協調 prep
、_exec
和 post
的調用。而 _exec
方法則負責實現重試邏輯:它在一個迴圈中調用您定義的 exec
方法,如果發生異常,它會檢查是否還有重試機會。如果所有重試都失敗了,它會調用 exec_fallback
。
在本章中,我們學習了 PocketFlow 的基礎建構單元——節點 (Node)。
prep
(準備資料)、exec
(執行核心邏輯)、post
(善後處理並儲存結果) 的三步驟工作流程。max_retries
和 wait
參數設定自動重試機制。exec_fallback
方法,可以為節點的執行失敗提供優雅的降級處理。run(shared_data)
方法執行,並與共享儲存 (Shared Store)互動來讀取輸入和儲存輸出。節點是構建更複雜工作流程的基石。它們使我們能夠將大問題分解為小而可管理的部分,並為每個部分提供健壯的執行保證。
在下一章中,我們將深入探討節點之間交換數據的橋樑——共享儲存 (Shared Store)。