Chapter 1: 節點 (Node)

歡迎來到 PocketFlow 的世界!在本章節中,我們將一起探索 PocketFlow 中最核心、最基礎的建構模塊——節點 (Node)

為何需要節點?——化繁為簡的魔法

想像一下,您正在建造一座複雜的樂高城堡。如果試圖一次性將所有樂高積木拼湊起來,那將會是一項艱鉅且容易出錯的任務。更聰明的方法是將城堡拆分成幾個部分,例如城牆、塔樓、大門等,分別建造完成後再組合起來。

在軟體開發中,特別是處理像呼叫大型語言模型 (LLM) 這類可能包含多個步驟或容易出錯的任務時,我們也希望能將複雜的任務拆解成一個個獨立、專注的小單元。這就是「節點」誕生的緣由。

節點 (Node) 就像工廠流水線上的一位工人,專注於一道特定的工序。它負責執行單一、明確的任務,讓整個工作流程更加清晰、穩定且易於管理。

讓我們以一個常見的任務為例:將一段文字內容進行摘要。這個任務看似簡單,但如果我們希望它能穩定運行,可能需要考慮:

  1. 如何獲取待摘要的文字?
  2. 如何呼叫語言模型進行摘要?
  3. 如果語言模型呼叫失敗了怎麼辦?是否需要重試?
  4. 摘要完成後,結果要存放到哪裡?

一個「節點」就能優雅地處理上述所有環節。

節點的核心:三步工作法 (prep -> exec -> post)

每個節點都遵循一個清晰的三步驟工作流程:準備 (prepare)、執行 (execute)、以及善後 (post-process)。

讓我們更詳細地了解這三個步驟:

  1. prep(shared) - 準備階段

    • 任務:從一個叫做「共享儲存 (Shared Store)」的公共空間讀取並預處理所需的數據。
    • 比喻:工人在開始工作前,從倉庫領取今天所需的原材料(例如文字檔案、數據庫記錄)。
    • 範例:讀取檔案內容、從資料庫查詢資訊、將數據序列化成字串。
    • 回傳prep_res,這個結果會被接下來的 exec()post() 步驟使用。
  2. exec(prep_res) - 執行階段

    • 任務:執行節點的核心計算邏輯。這是真正「做事」的階段。
    • 比喻:工人使用領來的材料,專心完成自己的加工任務(例如:使用機器打磨零件、呼叫大型語言模型分析文本)。
    • 範例:呼叫大型語言模型 (LLM)、調用遠端 API、使用特定工具。
    • ⚠️ 注意:此步驟應專注於計算,不應直接存取「共享儲存 (Shared Store)」。
    • ⚠️ 注意:如果啟用了重試機制(稍後會介紹),請確保此處的實作是冪等 (idempotent) 的,意味著多次執行相同的操作應產生相同的結果,不會產生副作用。
    • 回傳exec_res,這個結果會被傳遞給 post() 步驟。
  3. post(shared, prep_res, exec_res) - 善後階段

    • 任務:後續處理 exec 階段的成果,並將結果寫回「共享儲存 (Shared Store)」。同時,它還會決定下一步的行動。
    • 比喻:工人將加工完成的零件放到傳送帶上,送往下一道工序或成品區,並告知控制中心接下來的安排。
    • 範例:更新資料庫、改變狀態、記錄日誌。
    • 回傳:一個表示下一步行動的字串。如果沒有明確指定,則預設為 "default"

為何是三個步驟? 這是為了實踐「關注點分離 (separation of concerns)」原則。數據的存取(preppost)與數據的處理(exec)被明確分開,使得程式碼更清晰、更易於維護和測試。

值得一提的是,這三個步驟都是可選的。例如,如果您只需要處理數據而不需要執行複雜計算,可以只實作 preppost。 {: .note }

小試身手:建立一個「文字摘要節點」

讓我們來建立一個名為 SummarizeFile 的節點,它會讀取一段文字,呼叫(模擬的)大型語言模型來產生摘要,然後儲存摘要結果。

首先,我們需要定義一個繼承自 Node 的類別:

from pocketflow import Node # 假設 pocketflow 已安裝

# 模擬呼叫大型語言模型
def call_llm(prompt):
    print(f"正在呼叫 LLM,提示:'{prompt[:30]}...'")
    if "錯誤" in prompt: # 模擬錯誤情況
        raise Exception("LLM 呼叫失敗!")
    return f"這是對 '{prompt[prompt.find(':')+2:prompt.find(':')+22]}...' 的摘要。"

class SummarizeFile(Node):
    # 我們將逐步填寫 prep, exec, post 方法
    pass

這段程式碼定義了一個名為 SummarizeFile 的新類別,它繼承了 PocketFlow 提供的 Node 類別。我們還定義了一個 call_llm 函數來模擬呼叫語言模型的行為。

1. 準備資料 (prep)

prep 方法負責從共享儲存中獲取需要處理的資料。在這個例子中,我們假設要摘要的文本存放在共享儲存的 "data" 鍵中。

class SummarizeFile(Node):
    def prep(self, shared):
        print("步驟 1: prep - 正在準備資料...")
        text_to_summarize = shared.get("data", "") # 從 shared store 獲取 "data"
        if not text_to_summarize:
            print("警告:沒有找到要摘要的文本。")
        return text_to_summarize # 將文本傳遞給 exec

prep 方法接收一個名為 shared 的參數,這就是我們的共享儲存 (Shared Store)。它會嘗試讀取 shared["data"] 的內容。如果找不到,則返回空字串。

2. 執行核心邏輯 (exec)

exec 方法接收 prep 方法的回傳結果 (即 prep_res),並執行核心任務——呼叫 LLM 進行摘要。

class SummarizeFile(Node):
    # ... (prep 方法如上) ...

    def exec(self, prep_res): # prep_res 是從 prep 方法返回的文本
        print(f"步驟 2: exec - 正在執行摘要...")
        if not prep_res: # 如果文本為空
            return "輸入文件內容為空。"
        
        # 建立提示語句
        prompt = f"請將以下文本摘要成10個字:{prep_res}"
        summary = call_llm(prompt) # 呼叫 LLM,這可能會失敗
        return summary # 返回摘要結果

這裡,exec 方法首先檢查傳入的 prep_res (文本內容) 是否為空。如果非空,它會建立一個提示 (prompt),然後呼叫 call_llm 函數來獲得摘要。

3. 善後處理與儲存 (post)

post 方法接收 shared (共享儲存)、prep_res (準備階段的結果) 和 exec_res (執行階段的結果,即摘要)。它的任務是將摘要儲存回共享儲存,並決定下一步的行動。

class SummarizeFile(Node):
    # ... (prep 和 exec 方法如上) ...

    def post(self, shared, prep_res, exec_res):
        print(f"步驟 3: post - 正在儲存結果...")
        shared["summary"] = exec_res # 將摘要 (exec_res) 存到 shared store 的 "summary"
        print(f"摘要已儲存:'{exec_res}'")
        # 沒有明確 return,預設返回 "default" 行動指令

post 方法將 exec_res (摘要結果) 儲存到 shared 字典的 "summary" 鍵下。因為我們沒有明確 return 一個行動指令,所以這個節點在執行完畢後會預設返回 "default"

提升節點的強韌性:錯誤處理與重試

現實世界的任務,尤其是與外部服務(如 LLM API)互動時,並非總是一帆風順。網路問題、服務暫時不可用或 API 速率限制都可能導致失敗。PocketFlow 的節點內建了錯誤處理和重試機制,讓您的工作流程更加強韌。

自動重試 (max_retrieswait)

在定義節點時,您可以指定兩個參數來控制 exec() 方法的重試行為:

例如,我們可以這樣初始化我們的 SummarizeFile 節點,使其在失敗時最多重試3次,每次重試前等待10秒:

summarize_node = SummarizeFile(max_retries=3, wait=10)

exec() 方法中發生異常 (exception) 時,節點會自動重試,直到:

您可以透過 self.cur_retry (從0開始計數) 在 exec() 方法內部獲取當前的重試次數。

class RetryNode(Node):
    def exec(self, prep_res):
        print(f"這是第 {self.cur_retry} 次重試 (從0開始計)") # 了解目前是第幾次嘗試
        raise Exception("模擬執行失敗")

# node = RetryNode(max_retries=3)
# node.run({}) # 執行時會看到打印3次,然後拋出異常

優雅降級 (exec_fallback)

在所有重試都失敗後,如果我們不希望整個流程因異常而崩潰,而是提供一個備用的「降級」結果,可以覆寫 exec_fallback 方法。

def exec_fallback(self, prep_res, exc): # exc 是導致失敗的異常對象
    raise exc # 預設行為是重新拋出異常

預設情況下,exec_fallback 只是簡單地重新拋出它接收到的異常。但是,您可以覆寫它來返回一個備用結果。這個備用結果隨後會成為 exec_res 傳遞給 post() 方法。

讓我們為 SummarizeFile 節點添加一個 exec_fallback 方法:

class SummarizeFile(Node):
    # ... (prep, exec, post 方法如上) ...

    def exec_fallback(self, prep_res, exc):
        print(f"錯誤:執行摘要時發生問題 ({exc})。提供備用方案。")
        # 提供一個簡單的備用回應,而不是讓程式崩潰
        return "處理您的請求時發生錯誤,請稍後再試。"

現在,如果 call_llmexec 中多次失敗(達到 max_retries),exec_fallback 將被調用,並返回一個友好的錯誤訊息,而不是讓整個程式停止。

實際運行節點

萬事俱備!現在我們可以創建 SummarizeFile 節點的實例,並運行它。

# 假設 SummarizeFile 類別已完整定義 (包含 prep, exec, post, exec_fallback)
summarize_node = SummarizeFile(max_retries=3, wait=2) # 最多嘗試3次,每次等待2秒

# 準備共享數據
shared_data = {"data": "PocketFlow 是一個輕量級的工作流程編排框架。"}

# 運行節點
# node.run() 會依次調用 prep -> exec (可能重試/fallback) -> post
print("--- 開始運行節點 (正常情況) ---")
action_result = summarize_node.run(shared_data)

print(f"\n節點返回的行動指令: {action_result}") # 應該是 "default"
print(f"共享儲存中的摘要: {shared_data.get('summary')}")

# 模擬一個會導致 LLM 呼叫失敗的情況
print("\n--- 開始運行節點 (模擬 LLM 失敗) ---")
shared_data_will_fail = {"data": "這段文字包含“錯誤”關鍵字,會導致模擬的LLM失敗。"}
action_result_fail = summarize_node.run(shared_data_will_fail)

print(f"\n節點返回的行動指令 (失敗情況): {action_result_fail}")
print(f"共享儲存中的摘要 (失敗情況): {shared_data_will_fail.get('summary')}")

執行上述程式碼,您將會看到:

正常情況下:

--- 開始運行節點 (正常情況) ---
步驟 1: prep - 正在準備資料...
步驟 2: exec - 正在執行摘要...
正在呼叫 LLM,提示:'請將以下文本摘要成10個字:PocketFlow...'
步驟 3: post - 正在儲存結果...
摘要已儲存:'這是對 'PocketFlow 是一個輕量級的...' 的摘要。'

節點返回的行動指令: default
共享儲存中的摘要: 這是對 'PocketFlow 是一個輕量級的...' 的摘要。

模擬 LLM 失敗情況:

--- 開始運行節點 (模擬 LLM 失敗) ---
步驟 1: prep - 正在準備資料...
步驟 2: exec - 正在執行摘要...
正在呼叫 LLM,提示:'請將以下文本摘要成10個字:這段文字包含“...'
步驟 2: exec - 正在執行摘要... (重試第1次)
正在呼叫 LLM,提示:'請將以下文本摘要成10個字:這段文字包含“...'
步驟 2: exec - 正在執行摘要... (重試第2次)
正在呼叫 LLM,提示:'請將以下文本摘要成10個字:這段文字包含“...'
錯誤:執行摘要時發生問題 (LLM 呼叫失敗!)。提供備用方案。
步驟 3: post - 正在儲存結果...
摘要已儲存:'處理您的請求時發生錯誤,請稍後再試。'

節點返回的行動指令 (失敗情況): default
共享儲存中的摘要 (失敗情況): 處理您的請求時發生錯誤,請稍後再試。

正如您所見,當 call_llm 模擬失敗時,節點會自動重試。在所有重試都用盡後,exec_fallback 方法被調用,確保流程能夠優雅地處理錯誤,並將備用結果儲存到共享數據中。

幕後揭秘:節點是如何運作的?

當您調用一個節點的 run(shared) 方法時,PocketFlow 在內部會按照一定的順序執行定義好的步驟。

非程式碼的逐步解析:

  1. 接收指令:您呼叫 my_node.run(shared_data)
  2. 準備階段 (prep):節點首先執行 prep(shared_data) 方法。此方法可以讀取 shared_data,進行一些預處理,然後返回結果 (我們稱之為 prep_result)。
  3. 執行階段 (_exec)
    • 節點接著使用 prep_result 來執行核心邏輯。這一步驟實際上是由一個內部方法 _exec(prep_result) 來管理的,它包含了重試邏輯。
    • _exec 會在一個迴圈中嘗試執行您定義的 exec(prep_result) 方法,最多 max_retries 次。
    • 如果 exec() 成功:它返回的結果 (我們稱之為 exec_result) 會被用於下一步。
    • 如果 exec() 拋出異常
      • 如果當前重試次數小於 max_retries - 1:節點會等待 wait 秒(如果 wait > 0),然後再次嘗試執行 exec()
      • 如果已經達到最大重試次數(即 cur_retry == max_retries - 1)並且仍然失敗:節點會調用 exec_fallback(prep_result, exception_object)exec_fallback 返回的結果將作為 exec_result。如果 exec_fallback 本身也拋出異常(或預設行為),則該異常會向上拋出,中斷流程。
  4. 善後階段 (post):最後,節點執行 post(shared_data, prep_result, exec_result) 方法。此方法可以使用執行結果來更新 shared_data,並返回一個行動指令字串。
  5. 返回行動run() 方法最終返回 post() 方法所返回的行動指令。

序列圖 (Sequence Diagram) 概覽:

sequenceDiagram participant 使用者 participant MyNode as 摘要節點 participant SharedStore as 共享儲存 使用者->>摘要節點: run(shared_data) 摘要節點->>摘要節點: prep(shared_data) Note right of 摘要節點: 從共享儲存讀取數據 摘要節點-->>SharedStore: 讀取 "data" SharedStore-->>摘要節點: 文本內容 摘要節點->>摘要節點: _exec(prep_result) Note right of 摘要節點: 執行核心邏輯 (例如: 呼叫 LLM) alt 執行成功 摘要節點-->>摘要節點: exec_result (摘要) else 執行失敗 (且重試次數未用盡) Note over 摘要節點: 呼叫 LLM 失敗 摘要節點->>摘要節點: 等待 (wait) 摘要節點->>摘要節點: 重試 exec() Note over 摘要節點: 再次呼叫 LLM alt 重試成功 摘要節點-->>摘要節點: exec_result (摘要) else 重試依然失敗 (達到 max_retries) 摘要節點->>摘要節點: exec_fallback(prep_result, exception) 摘要節點-->>摘要節點: fallback_result (錯誤訊息) end end 摘要節點->>摘要節點: post(shared_data, prep_result, exec_result) Note right of 摘要節點: 將結果寫回共享儲存 摘要節點-->>SharedStore: 寫入 "summary" SharedStore-->>摘要節點: 確認寫入 摘要節點-->>使用者: action ("default")

相關程式碼片段 (簡化版):

pocketflow/__init__.py 檔案中,Node 類別的相關實作大致如下:

# 位於 pocketflow/__init__.py (為教學目的簡化)
class BaseNode:
    def run(self,shared): 
        # ... (省略部分警告檢查)
        return self._run(shared) # 實際執行交給 _run

    def _run(self,shared):
        p = self.prep(shared)       # 1. 執行 prep
        e = self._exec(p)           # 2. 執行 _exec (包含重試和 fallback)
        return self.post(shared,p,e)# 3. 執行 post

class Node(BaseNode):
    def __init__(self,max_retries=1,wait=0):
        super().__init__()
        self.max_retries = max_retries
        self.wait = wait
        self.cur_retry = 0 # 當前重試次數 (從0開始)

    def exec_fallback(self,prep_res,exc): # 如果 exec 最終失敗,會呼叫這個
        raise exc # 預設是重新拋出錯誤

    def _exec(self,prep_res): # 內部執行方法,處理重試邏輯
        for self.cur_retry in range(self.max_retries): # 最多嘗試 max_retries 次
            try:
                return self.exec(prep_res) # 嘗試執行使用者定義的 exec
            except Exception as e:
                if self.cur_retry == self.max_retries - 1: # 如果是最後一次嘗試
                    return self.exec_fallback(prep_res,e) # 執行 fallback
                if self.wait > 0:
                    time.sleep(self.wait) # 等待指定時間後重試

這段簡化的程式碼展示了 run 方法如何協調 prep_execpost 的調用。而 _exec 方法則負責實現重試邏輯:它在一個迴圈中調用您定義的 exec 方法,如果發生異常,它會檢查是否還有重試機會。如果所有重試都失敗了,它會調用 exec_fallback

總結

在本章中,我們學習了 PocketFlow 的基礎建構單元——節點 (Node)

節點是構建更複雜工作流程的基石。它們使我們能夠將大問題分解為小而可管理的部分,並為每個部分提供健壯的執行保證。

在下一章中,我們將深入探討節點之間交換數據的橋樑——共享儲存 (Shared Store)