Chapter 3: 流程 (Flow)

在前一章 共享儲存 (Shared Store) 中,我們探討了節點之間如何透過「共享儲存」來交換資料。我們知道了節點可以在 prep 階段從共享儲存讀取資訊,並在 post 階段將成果寫回。這就像團隊成員將他們的成果放在共享的白板上。

但是,如果我們有多個節點 (Node),例如一個節點負責下載資料,另一個節點負責分析資料,第三個節點負責產生報告,我們如何確保它們按照正確的順序執行,並且根據前一個節點的結果決定下一步該做什麼呢?這就需要一個「生產線經理」來協調指揮。這就是本章的主角——流程 (Flow)

為何需要流程?——指揮節點的交響樂

想像一下,您正在指揮一個小型樂隊。每個樂手(節點)都知道如何演奏自己的樂器(執行任務),但他們需要一位指揮(流程)來告訴他們何時開始、何時停止,以及如何與其他樂手協調,才能演奏出一首和諧的樂曲。

流程 (Flow) 的核心任務就是編排和協調多個節點的執行順序與邏輯。它像一位工廠的生產線經理,根據每個工人(節點)完成任務後的成果(post 方法返回的行動指令),決定下一個應該執行哪個工序(節點),從而引導整個生產流程。

例如,一個簡單的客戶服務流程可能包含:

  1. 接收客戶請求 (節點 A)
  2. 分析請求類型 (節點 B)
  3. 如果是一般查詢,則 提供標準答案 (節點 C)
  4. 如果是技術問題,則 轉交技術支援 (節點 D)

流程可以讓這些步驟順暢地串聯起來,並根據情況做出不同的選擇。

流程的核心概念

1. 基於行動的轉換 (Action-based Transitions)

我們在節點 (Node) 章節中提到,每個節點的 post() 方法會回傳一個「行動指令」(Action) 字串。這個行動指令非常重要,因為它告訴流程接下來應該做什麼。

流程會根據這個行動指令,以及您預先定義好的「轉換規則」(transitions),來決定下一個要執行的節點。

您可以使用非常直觀的語法來定義轉換規則:

  1. 基本預設轉換node_a >> node_b

    • 這表示如果 node_a.post() 回傳 "default",則流程會接著執行 node_b
    • 這等同於 node_a - "default" >> node_b
  2. 指定行動轉換node_a - "action_name" >> node_b

    • 這表示如果 node_a.post() 回傳 "action_name",則流程會接著執行 node_b

透過這些規則,您可以建立簡單的線性流程、包含分支邏輯的流程,甚至是包含循環的流程。

2. 建立流程 (Creating a Flow)

建立一個流程很簡單。您只需要指定流程的起始節點 (start node)

from pocketflow import Flow, Node # 假設 pocketflow 已安裝

# 先定義一些節點 (稍後會詳細說明)
class NodeA(Node):
    def post(self, shared, prep_res, exec_res):
        print("節點 A 完成")
        return "go_to_b" # 回傳一個自訂行動

class NodeB(Node):
    def post(self, shared, prep_res, exec_res):
        print("節點 B 完成")
        # 沒有回傳,預設是 "default"

# 建立節點實例
node_a = NodeA()
node_b = NodeB()

# 定義轉換規則
node_a - "go_to_b" >> node_b # 如果 node_a 回傳 "go_to_b",則執行 node_b

# 建立流程,並指定 node_a 為起始節點
my_flow = Flow(start=node_a)

在這段程式碼中,我們首先定義了兩個簡單的節點 NodeANodeB。然後,我們建立了它們的實例,並設定了一個轉換規則:如果 node_apost 方法回傳 "go_to_b",則下一個執行 node_b。最後,我們用 Flow(start=node_a) 創建了一個流程。

3. 運行流程 (Running a Flow)

一旦流程建立好了,您就可以使用 flow.run(shared) 來執行它。它會從起始節點開始執行,然後根據每個節點回傳的行動指令和您定義的轉換規則,依序執行後續的節點,直到沒有下一個節點可執行為止。

# 假設 node_a, node_b, my_flow 已如上定義
shared_data = {} # 準備共享儲存

print("--- 開始運行流程 ---")
my_flow.run(shared_data)
print("--- 流程運行結束 ---")

當您運行這段程式碼時,會發生:

  1. my_flownode_a 開始執行。
  2. node_a 執行完畢,其 post 方法回傳 "go_to_b"
  3. 流程根據轉換規則 node_a - "go_to_b" >> node_b,找到下一個節點是 node_b
  4. node_b 開始執行。
  5. node_b 執行完畢,其 post 方法回傳預設的 "default"
  6. 流程查找 node_b 在行動 "default" 下是否有後續節點。在這個例子中,我們沒有定義,所以流程在此結束。

輸出將會是:

--- 開始運行流程 ---
節點 A 完成
節點 B 完成
--- 流程運行結束 ---

小試身手:建立一個使用者問候流程

讓我們用之前在 共享儲存 (Shared Store) 章節中見過的 GetUserNameNodeGreetUserNode 來建立一個完整的流程。

首先,我們需要確保節點的 post 方法會適當地回傳行動指令,以便流程可以串聯它們。

from pocketflow import Node, Flow

class GetUserNameNode(Node):
    def exec(self, prep_res):
        user_name = input("請輸入您的名字:") # 實際獲取使用者名稱
        return user_name

    def post(self, shared, prep_res, exec_res):
        shared["user_name_key"] = exec_res # 將名稱存入共享儲存
        print(f"GetUserNameNode: 已儲存名稱 '{exec_res}'。")
        return "name_acquired" # 回傳行動,表示名稱已獲取

class GreetUserNode(Node):
    def prep(self, shared):
        name = shared.get("user_name_key", "訪客")
        return name

    def exec(self, prep_res):
        greeting = f"你好,{prep_res}!歡迎探索 PocketFlow 流程。"
        return greeting

    def post(self, shared, prep_res, exec_res):
        shared["greeting_message_key"] = exec_res
        print(f"GreetUserNode: {exec_res}")
        # 沒有明確 return,預設行動 "default",流程在此結束 (如果沒有後續)

現在,我們可以連接這些節點並建立流程:

# 建立節點實例
get_name_node = GetUserNameNode()
greet_user_node = GreetUserNode()

# 定義轉換規則:
# 如果 get_name_node 的 post 回傳 "name_acquired",則執行 greet_user_node
get_name_node - "name_acquired" >> greet_user_node

# 建立流程,以 get_name_node 為起始點
greeting_flow = Flow(start=get_name_node)

# 準備共享儲存並運行流程
shared_data = {}
greeting_flow.run(shared_data)

print(f"共享儲存最終內容:{shared_data}")

當您運行這段程式碼時:

  1. 程式會提示您輸入名字。假設您輸入「小華」。
  2. GetUserNameNode 將「小華」存入 shared_data 並回傳 "name_acquired"
  3. 流程根據規則,執行 GreetUserNode
  4. GreetUserNodeshared_data 讀取「小華」,產生問候語並印出。
  5. GreetUserNodepost 方法回傳預設的 "default"。由於沒有為此行動定義下一個節點,流程結束。

輸出範例 (假設輸入「小華」):

請輸入您的名字:小華
GetUserNameNode: 已儲存名稱 '小華'。
GreetUserNode: 你好,小華!歡迎探索 PocketFlow 流程。
共享儲存最終內容:{'user_name_key': '小華', 'greeting_message_key': '你好,小華!歡迎探索 PocketFlow 流程。'}

實例:具備分支與循環的費用報銷流程

流程不僅僅是線性的。它們可以包含分支和循環,使其能夠處理更複雜的邏輯。 讓我們看看一個簡化的費用報銷流程範例。ReviewExpense 節點可以回傳三種可能的行動:

# 假設我們已經定義了以下節點類別:
# ReviewExpenseNode, PaymentNode, ReviseNode, FinishNode
# 並且它們的 post 方法會回傳相應的行動指令

review_node = ReviewExpenseNode()
payment_node = PaymentNode()
revise_node = ReviseNode()
finish_node = FinishNode()

# 定義流程轉換規則
review_node - "approved" >> payment_node       # 核准則付款
review_node - "needs_revision" >> revise_node  # 需修改則退回修改
review_node - "rejected" >> finish_node        # 拒絕則結束

revise_node >> review_node     # 修改後,回到審核 (預設行動)
payment_node >> finish_node    # 付款後,結束 (預設行動)

# 建立流程
expense_flow = Flow(start=review_node)

# expense_flow.run(shared_data) # 運行流程

這個流程的運作方式如下:

  1. 如果 review_node.post() 回傳 "approved",費用將移至 payment_node
  2. 如果 review_node.post() 回傳 "needs_revision",則移至 revise_node,後者完成後會循環回到 review_node 進行再次審核。
  3. 如果 review_node.post() 回傳 "rejected",則移至 finish_node 並停止。

以下是此流程的圖示:

flowchart TD A[審核費用 ReviewExpense] -->|核准 approved| B[處理付款 Payment] A -->|需修改 needs_revision| C[修改報告 ReviseReport] A -->|拒絕 rejected| D[結束流程 Finish] C --> A B --> D

單獨運行節點 vs. 運行流程

您可能記得,在節點 (Node) 章節中,我們使用 node.run(shared) 來執行單個節點。

node.run(shared) 不會 自動執行後續節點。 這主要用於調試或測試單個節點。

在生產環境中,請始終使用 flow.run(...) 以確保完整的流程按預期運行。 {: .warning }

巢狀流程 (Nested Flows)

PocketFlow 的一個強大功能是流程本身也可以被視為一個節點。這使得我們可以進行強大的組合:

  1. 將一個流程(子流程)作為另一個流程(父流程)中的一個節點來使用。
  2. 將多個較小的、專注的流程組合成一個更大的、可重用的流程。
  3. 子流程中的節點參數 (params) 將是所有父流程 params 的合併。

流程的節點特性

由於一個流程 (Flow) 也繼承自 BaseNode (與 Node 相同的基礎類別),所以它也有 prep()post() 方法。但是:

基本的流程巢狀

以下是如何將一個子流程連接到另一個節點:

# 假設 NodeA, NodeB, NodeC 已定義
node_a = NodeA()
node_b = NodeB()
node_c = NodeC()

# 建立一個子流程
node_a >> node_b  # node_a 的預設行動導向 node_b
sub_flow = Flow(start=node_a) # 子流程從 node_a 開始

# 將子流程連接到另一個節點 node_c
# sub_flow 的預設行動 (即其內部最後一個節點的預設行動) 將導向 node_c
sub_flow >> node_c

# 建立父流程
parent_flow = Flow(start=sub_flow)

# shared_data = {}
# parent_flow.run(shared_data)

parent_flow.run() 執行時:

  1. 它啟動 sub_flow
  2. sub_flow 執行其內部的節點(例如 node_a -> node_b)。
  3. sub_flow 完成後,其 post() 方法會回傳一個行動指令 (通常是其內部最後一個節點的行動指令)。
  4. 父流程 parent_flow 根據這個行動指令和 sub_flow >> node_c 的規則,繼續執行到 node_c

實例:訂單處理流程

這是一個將訂單處理分解為巢狀流程的實際範例:

# 假設以下節點已定義:
# ValidatePaymentNode, ProcessPaymentNode, PaymentConfirmationNode
# CheckStockNode, ReserveItemsNode, UpdateInventoryNode
# CreateLabelNode, AssignCarrierNode, SchedulePickupNode

# 付款處理子流程
validate_payment = ValidatePaymentNode()
process_payment = ProcessPaymentNode()
payment_confirmation = PaymentConfirmationNode()
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)

#庫存管理子流程
check_stock = CheckStockNode()
reserve_items = ReserveItemsNode()
update_inventory = UpdateInventoryNode()
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)

# 運送安排子流程
create_label = CreateLabelNode()
assign_carrier = AssignCarrierNode()
schedule_pickup = SchedulePickupNode()
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)

# 將子流程串聯成主訂單流程
# 假設每個子流程完成後都回傳預設行動 "default"
payment_flow >> inventory_flow
inventory_flow >> shipping_flow

# 建立主流程
order_pipeline = Flow(start=payment_flow)

# shared_data = {}
# order_pipeline.run(shared_data) # 運行整個訂單處理流程

這種結構實現了關注點分離,同時保持了清晰的執行路徑:

flowchart LR subgraph order_pipeline[訂單處理總流程] subgraph paymentFlow["付款流程"] A[驗證付款] --> B[處理付款] --> C[付款確認] end subgraph inventoryFlow["庫存流程"] D[檢查庫存] --> E[預留商品] --> F[更新庫存] end subgraph shippingFlow["運送流程"] G[建立標籤] --> H[指派貨運] --> I[安排取件] end paymentFlow --> inventoryFlow inventoryFlow --> shippingFlow end

幕後揭秘:流程是如何運作的?

當您呼叫一個流程的 flow.run(shared) 方法時,PocketFlow 內部會發生什麼事呢?

非程式碼的逐步解析:

  1. 啟動流程:您呼叫 my_flow.run(shared_data)
  2. 準備階段 (Flow prep):如果 my_flow 覆寫了 prep() 方法,則首先執行它。
  3. 進入點:流程從其 start_node(起始節點)開始。我們稱當前執行的節點為 current_node
  4. 執行當前節點
    • 流程內部呼叫 current_node._run(shared_data) (這會依序執行該節點的 prep, _exec (包含重試和回退), post)。
    • current_nodepost() 方法回傳一個行動指令 (例如 action_returned)。
  5. 查找下一個節點
    • 流程查看 current_node 的轉換規則中,是否有針對 action_returned 的後續節點。
    • 例如,如果規則是 current_node - "action_returned" >> next_node_instance,那麼 next_node_instance 就是下一個要執行的節點。
    • 如果 action_returnedNone"default",且有 current_node >> next_node_instance 這樣的規則,則也會找到 next_node_instance
  6. 流程推進
    • 如果找到了下一個節點,則將該節點設為新的 current_node,並重複步驟 4。
    • 如果沒有為 action_returned 找到對應的後續節點,流程就會在此結束。
  7. 善後階段 (Flow post):流程執行完其內部所有節點後(或提前結束),如果 my_flow 覆寫了 post() 方法,則執行它。_orch (內部協調方法) 的回傳值 (通常是內部流程最後一個節點的行動指令) 會作為 exec_res 傳給流程的 post 方法。流程的 post 方法本身也可以回傳一個行動指令,這使得流程可以被巢狀使用。

序列圖 (Sequence Diagram) 概覽:

sequenceDiagram participant 使用者 participant MyFlow as 流程實例 participant NodeA as 節點A participant NodeB as 節點B participant SharedStore as 共享儲存 使用者->>MyFlow: run(shared_data) MyFlow->>MyFlow: prep(shared_data) (流程自身的準備) MyFlow->>NodeA: (設定為當前節點, 從 start_node 開始) NodeA->>NodeA: _run(shared_data) (執行 節點A 的 prep, exec, post) NodeA-->>SharedStore: (讀取/寫入共享儲存) SharedStore-->>NodeA: (資料) NodeA->>MyFlow: 回傳行動 "action_X" MyFlow->>MyFlow: 根據 "action_X" 查找 NodeA 的下一個節點 (假設是 NodeB) MyFlow->>NodeB: (設定 NodeB 為當前節點) NodeB->>NodeB: _run(shared_data) (執行 節點B 的 prep, exec, post) NodeB-->>SharedStore: (讀取/寫入共享儲存) SharedStore-->>NodeB: (資料) NodeB->>MyFlow: 回傳行動 "action_Y" MyFlow->>MyFlow: 根據 "action_Y" 查找 NodeB 的下一個節點 (假設無) Note right of MyFlow: 內部流程結束,"action_Y" 作為 _orch 的結果 MyFlow->>MyFlow: post(shared_data, prep_res_flow, "action_Y") (流程自身的善後) MyFlow-->>使用者: (流程結束,回傳 Flow.post 的結果)

相關程式碼片段 (簡化版):

pocketflow/__init__.py 檔案中,Flow 類別的相關實作大致如下:

# 位於 pocketflow/__init__.py (為教學目的簡化)
class Flow(BaseNode): # Flow 繼承自 BaseNode,所以也有 prep, post
    def __init__(self, start=None):
        super().__init__()
        self.start_node = start # 設定起始節點

    def get_next_node(self, curr_node, action):
        # 根據當前節點和行動指令,獲取下一個節點
        # 如果行動是 None 或空字串,則預設為 "default"
        next_node_candidate = curr_node.successors.get(action or "default")
        # ... (省略警告部分)
        return next_node_candidate

    def _orch(self, shared, params=None): # 內部協調方法
        current_node = copy.copy(self.start_node) # 從起始節點開始
        last_action_returned = None
        
        while current_node:
            # current_node.set_params(params) # 設定節點參數 (如果有的話)
            last_action_returned = current_node._run(shared) # 執行當前節點
            # 獲取下一個節點,注意這裡複製了下一個節點,避免修改原始流程定義
            current_node = copy.copy(self.get_next_node(current_node, last_action_returned))
        
        return last_action_returned # 回傳內部流程最後一個節點的行動

    def _run(self, shared): # Flow 執行時呼叫
        prep_result_for_flow = self.prep(shared)    # 1. 執行 Flow 自己的 prep
        # 2. 執行內部節點的編排 (_orch)
        # _orch 的回傳結果 (內部最後一個節點的行動) 會作為 exec_res 傳給 Flow 的 post
        orchestration_result = self._orch(shared)
        # 3. 執行 Flow 自己的 post
        return self.post(shared, prep_result_for_flow, orchestration_result)

    # Flow 的 post 方法預設會回傳 orchestration_result (內部流程的最後行動)
    # 這使得 Flow 本身也可以回傳行動,用於巢狀流程
    def post(self, shared, prep_res, exec_res):
        return exec_res

# 以下是定義轉換規則的語法糖部分:
# class BaseNode:
#     def __rshift__(self, other_node): # 處理 >> 運算子 (預設轉換)
#         return self.next(other_node, "default")
#
#     def __sub__(self, action_string): # 處理 - "action" 部分
#         if isinstance(action_string, str):
#             # 回傳一個臨時對象,該對象可以再接收 >> other_node
#             return _ConditionalTransition(self, action_string)
#         raise TypeError("Action must be a string")

# class _ConditionalTransition: # 臨時對象
#     def __init__(self, source_node, action_name):
#         self.source_node = source_node
#         self.action_name = action_name
#
#     def __rshift__(self, target_node): # 處理 - "action" >> target_node
#         return self.source_node.next(target_node, self.action_name)

這段簡化的程式碼展示了 Flow 如何透過 _orch 方法來迭代執行其內部的節點。_orch 方法會追蹤當前節點,執行它,然後使用 get_next_node 來根據節點回傳的行動找到下一個節點,直到沒有更多節點可執行。 Flow 自身的 _run 方法會先執行自己的 prep,然後呼叫 _orch 來執行內部流程,最後執行自己的 postFlow.post 預設會回傳內部流程最後一個節點的行動,這使得流程可以自然地嵌入到其他流程中。 __rshift__ (對應 >>) 和 __sub__ (對應 -) 的特殊方法以及 _ConditionalTransition 類別,共同實現了定義節點轉換時的簡潔語法。

總結

在本章中,我們學習了 PocketFlow 中負責協調節點執行的核心概念——流程 (Flow)

節點是工人,共享儲存是他們的共享工作台和材料,而流程則是那位指揮若定的生產線經理或樂隊指揮。它們共同構成了 PocketFlow 強大而靈活的工作流程編排能力。

到目前為止,我們已經了解了單個節點、節點間的數據交換,以及如何將節點組合成一個可執行的流程。在下一章中,我們將介紹一個更高層次的抽象概念——工作流 (Workflow),它允許我們將流程與其他元數據(如名稱、版本)打包在一起,以便更好地管理和部署。