在前一章 共享儲存 (Shared Store) 中,我們探討了節點之間如何透過「共享儲存」來交換資料。我們知道了節點可以在 prep
階段從共享儲存讀取資訊,並在 post
階段將成果寫回。這就像團隊成員將他們的成果放在共享的白板上。
但是,如果我們有多個節點 (Node),例如一個節點負責下載資料,另一個節點負責分析資料,第三個節點負責產生報告,我們如何確保它們按照正確的順序執行,並且根據前一個節點的結果決定下一步該做什麼呢?這就需要一個「生產線經理」來協調指揮。這就是本章的主角——流程 (Flow)。
想像一下,您正在指揮一個小型樂隊。每個樂手(節點)都知道如何演奏自己的樂器(執行任務),但他們需要一位指揮(流程)來告訴他們何時開始、何時停止,以及如何與其他樂手協調,才能演奏出一首和諧的樂曲。
流程 (Flow) 的核心任務就是編排和協調多個節點的執行順序與邏輯。它像一位工廠的生產線經理,根據每個工人(節點)完成任務後的成果(post
方法返回的行動指令),決定下一個應該執行哪個工序(節點),從而引導整個生產流程。
例如,一個簡單的客戶服務流程可能包含:
流程可以讓這些步驟順暢地串聯起來,並根據情況做出不同的選擇。
我們在節點 (Node) 章節中提到,每個節點的 post()
方法會回傳一個「行動指令」(Action) 字串。這個行動指令非常重要,因為它告訴流程接下來應該做什麼。
"default"
):如果 post()
方法沒有明確回傳任何字串,PocketFlow 會將其視為回傳了 "default"
。post()
回傳任何您定義的字串,例如 "approved"
、"needs_revision"
、"error_occurred"
等。流程會根據這個行動指令,以及您預先定義好的「轉換規則」(transitions),來決定下一個要執行的節點。
您可以使用非常直觀的語法來定義轉換規則:
基本預設轉換:node_a >> node_b
node_a.post()
回傳 "default"
,則流程會接著執行 node_b
。node_a - "default" >> node_b
。指定行動轉換:node_a - "action_name" >> node_b
node_a.post()
回傳 "action_name"
,則流程會接著執行 node_b
。透過這些規則,您可以建立簡單的線性流程、包含分支邏輯的流程,甚至是包含循環的流程。
建立一個流程很簡單。您只需要指定流程的起始節點 (start node)。
from pocketflow import Flow, Node # 假設 pocketflow 已安裝
# 先定義一些節點 (稍後會詳細說明)
class NodeA(Node):
def post(self, shared, prep_res, exec_res):
print("節點 A 完成")
return "go_to_b" # 回傳一個自訂行動
class NodeB(Node):
def post(self, shared, prep_res, exec_res):
print("節點 B 完成")
# 沒有回傳,預設是 "default"
# 建立節點實例
node_a = NodeA()
node_b = NodeB()
# 定義轉換規則
node_a - "go_to_b" >> node_b # 如果 node_a 回傳 "go_to_b",則執行 node_b
# 建立流程,並指定 node_a 為起始節點
my_flow = Flow(start=node_a)
在這段程式碼中,我們首先定義了兩個簡單的節點 NodeA
和 NodeB
。然後,我們建立了它們的實例,並設定了一個轉換規則:如果 node_a
的 post
方法回傳 "go_to_b"
,則下一個執行 node_b
。最後,我們用 Flow(start=node_a)
創建了一個流程。
一旦流程建立好了,您就可以使用 flow.run(shared)
來執行它。它會從起始節點開始執行,然後根據每個節點回傳的行動指令和您定義的轉換規則,依序執行後續的節點,直到沒有下一個節點可執行為止。
# 假設 node_a, node_b, my_flow 已如上定義
shared_data = {} # 準備共享儲存
print("--- 開始運行流程 ---")
my_flow.run(shared_data)
print("--- 流程運行結束 ---")
當您運行這段程式碼時,會發生:
my_flow
從 node_a
開始執行。node_a
執行完畢,其 post
方法回傳 "go_to_b"
。node_a - "go_to_b" >> node_b
,找到下一個節點是 node_b
。node_b
開始執行。node_b
執行完畢,其 post
方法回傳預設的 "default"
。node_b
在行動 "default"
下是否有後續節點。在這個例子中,我們沒有定義,所以流程在此結束。輸出將會是:
--- 開始運行流程 ---
節點 A 完成
節點 B 完成
--- 流程運行結束 ---
讓我們用之前在 共享儲存 (Shared Store) 章節中見過的 GetUserNameNode
和 GreetUserNode
來建立一個完整的流程。
首先,我們需要確保節點的 post
方法會適當地回傳行動指令,以便流程可以串聯它們。
from pocketflow import Node, Flow
class GetUserNameNode(Node):
def exec(self, prep_res):
user_name = input("請輸入您的名字:") # 實際獲取使用者名稱
return user_name
def post(self, shared, prep_res, exec_res):
shared["user_name_key"] = exec_res # 將名稱存入共享儲存
print(f"GetUserNameNode: 已儲存名稱 '{exec_res}'。")
return "name_acquired" # 回傳行動,表示名稱已獲取
class GreetUserNode(Node):
def prep(self, shared):
name = shared.get("user_name_key", "訪客")
return name
def exec(self, prep_res):
greeting = f"你好,{prep_res}!歡迎探索 PocketFlow 流程。"
return greeting
def post(self, shared, prep_res, exec_res):
shared["greeting_message_key"] = exec_res
print(f"GreetUserNode: {exec_res}")
# 沒有明確 return,預設行動 "default",流程在此結束 (如果沒有後續)
現在,我們可以連接這些節點並建立流程:
# 建立節點實例
get_name_node = GetUserNameNode()
greet_user_node = GreetUserNode()
# 定義轉換規則:
# 如果 get_name_node 的 post 回傳 "name_acquired",則執行 greet_user_node
get_name_node - "name_acquired" >> greet_user_node
# 建立流程,以 get_name_node 為起始點
greeting_flow = Flow(start=get_name_node)
# 準備共享儲存並運行流程
shared_data = {}
greeting_flow.run(shared_data)
print(f"共享儲存最終內容:{shared_data}")
當您運行這段程式碼時:
GetUserNameNode
將「小華」存入 shared_data
並回傳 "name_acquired"
。GreetUserNode
。GreetUserNode
從 shared_data
讀取「小華」,產生問候語並印出。GreetUserNode
的 post
方法回傳預設的 "default"
。由於沒有為此行動定義下一個節點,流程結束。輸出範例 (假設輸入「小華」):
請輸入您的名字:小華
GetUserNameNode: 已儲存名稱 '小華'。
GreetUserNode: 你好,小華!歡迎探索 PocketFlow 流程。
共享儲存最終內容:{'user_name_key': '小華', 'greeting_message_key': '你好,小華!歡迎探索 PocketFlow 流程。'}
流程不僅僅是線性的。它們可以包含分支和循環,使其能夠處理更複雜的邏輯。
讓我們看看一個簡化的費用報銷流程範例。ReviewExpense
節點可以回傳三種可能的行動:
"approved"
:費用已核准,移至付款處理。"needs_revision"
:費用需要修改,退回修改。"rejected"
:費用被拒絕,結束流程。# 假設我們已經定義了以下節點類別:
# ReviewExpenseNode, PaymentNode, ReviseNode, FinishNode
# 並且它們的 post 方法會回傳相應的行動指令
review_node = ReviewExpenseNode()
payment_node = PaymentNode()
revise_node = ReviseNode()
finish_node = FinishNode()
# 定義流程轉換規則
review_node - "approved" >> payment_node # 核准則付款
review_node - "needs_revision" >> revise_node # 需修改則退回修改
review_node - "rejected" >> finish_node # 拒絕則結束
revise_node >> review_node # 修改後,回到審核 (預設行動)
payment_node >> finish_node # 付款後,結束 (預設行動)
# 建立流程
expense_flow = Flow(start=review_node)
# expense_flow.run(shared_data) # 運行流程
這個流程的運作方式如下:
review_node.post()
回傳 "approved"
,費用將移至 payment_node
。review_node.post()
回傳 "needs_revision"
,則移至 revise_node
,後者完成後會循環回到 review_node
進行再次審核。review_node.post()
回傳 "rejected"
,則移至 finish_node
並停止。以下是此流程的圖示:
您可能記得,在節點 (Node) 章節中,我們使用 node.run(shared)
來執行單個節點。
node.run(shared)
:僅執行該節點本身(呼叫其 prep
-> exec
-> post()
),並回傳其行動指令。它不會繼續執行流程中定義的下一個節點。flow.run(shared)
:從起始節點開始執行,並根據行動指令和轉換規則,自動執行後續的節點,直到流程無法繼續為止。
node.run(shared)
不會 自動執行後續節點。 這主要用於調試或測試單個節點。在生產環境中,請始終使用
flow.run(...)
以確保完整的流程按預期運行。 {: .warning }
PocketFlow 的一個強大功能是流程本身也可以被視為一個節點。這使得我們可以進行強大的組合:
params
) 將是所有父流程 params
的合併。由於一個流程 (Flow) 也繼承自 BaseNode
(與 Node
相同的基礎類別),所以它也有 prep()
和 post()
方法。但是:
exec()
方法,因為它的主要邏輯是編排其內部的節點。post()
方法接收到的 exec_res
通常是其內部最後一個節點返回的行動指令 (或者 None
,如果內部流程結束時沒有明確的下一個節點)。它應該從共享儲存 (Shared Store)中獲取流程執行的實際結果。以下是如何將一個子流程連接到另一個節點:
# 假設 NodeA, NodeB, NodeC 已定義
node_a = NodeA()
node_b = NodeB()
node_c = NodeC()
# 建立一個子流程
node_a >> node_b # node_a 的預設行動導向 node_b
sub_flow = Flow(start=node_a) # 子流程從 node_a 開始
# 將子流程連接到另一個節點 node_c
# sub_flow 的預設行動 (即其內部最後一個節點的預設行動) 將導向 node_c
sub_flow >> node_c
# 建立父流程
parent_flow = Flow(start=sub_flow)
# shared_data = {}
# parent_flow.run(shared_data)
當 parent_flow.run()
執行時:
sub_flow
。sub_flow
執行其內部的節點(例如 node_a
-> node_b
)。sub_flow
完成後,其 post()
方法會回傳一個行動指令 (通常是其內部最後一個節點的行動指令)。parent_flow
根據這個行動指令和 sub_flow >> node_c
的規則,繼續執行到 node_c
。這是一個將訂單處理分解為巢狀流程的實際範例:
# 假設以下節點已定義:
# ValidatePaymentNode, ProcessPaymentNode, PaymentConfirmationNode
# CheckStockNode, ReserveItemsNode, UpdateInventoryNode
# CreateLabelNode, AssignCarrierNode, SchedulePickupNode
# 付款處理子流程
validate_payment = ValidatePaymentNode()
process_payment = ProcessPaymentNode()
payment_confirmation = PaymentConfirmationNode()
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
#庫存管理子流程
check_stock = CheckStockNode()
reserve_items = ReserveItemsNode()
update_inventory = UpdateInventoryNode()
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# 運送安排子流程
create_label = CreateLabelNode()
assign_carrier = AssignCarrierNode()
schedule_pickup = SchedulePickupNode()
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# 將子流程串聯成主訂單流程
# 假設每個子流程完成後都回傳預設行動 "default"
payment_flow >> inventory_flow
inventory_flow >> shipping_flow
# 建立主流程
order_pipeline = Flow(start=payment_flow)
# shared_data = {}
# order_pipeline.run(shared_data) # 運行整個訂單處理流程
這種結構實現了關注點分離,同時保持了清晰的執行路徑:
當您呼叫一個流程的 flow.run(shared)
方法時,PocketFlow 內部會發生什麼事呢?
非程式碼的逐步解析:
my_flow.run(shared_data)
。prep
):如果 my_flow
覆寫了 prep()
方法,則首先執行它。start_node
(起始節點)開始。我們稱當前執行的節點為 current_node
。current_node._run(shared_data)
(這會依序執行該節點的 prep
, _exec
(包含重試和回退), post
)。current_node
的 post()
方法回傳一個行動指令 (例如 action_returned
)。current_node
的轉換規則中,是否有針對 action_returned
的後續節點。current_node - "action_returned" >> next_node_instance
,那麼 next_node_instance
就是下一個要執行的節點。action_returned
是 None
或 "default"
,且有 current_node >> next_node_instance
這樣的規則,則也會找到 next_node_instance
。current_node
,並重複步驟 4。action_returned
找到對應的後續節點,流程就會在此結束。post
):流程執行完其內部所有節點後(或提前結束),如果 my_flow
覆寫了 post()
方法,則執行它。_orch
(內部協調方法) 的回傳值 (通常是內部流程最後一個節點的行動指令) 會作為 exec_res
傳給流程的 post
方法。流程的 post
方法本身也可以回傳一個行動指令,這使得流程可以被巢狀使用。序列圖 (Sequence Diagram) 概覽:
相關程式碼片段 (簡化版):
在 pocketflow/__init__.py
檔案中,Flow
類別的相關實作大致如下:
# 位於 pocketflow/__init__.py (為教學目的簡化)
class Flow(BaseNode): # Flow 繼承自 BaseNode,所以也有 prep, post
def __init__(self, start=None):
super().__init__()
self.start_node = start # 設定起始節點
def get_next_node(self, curr_node, action):
# 根據當前節點和行動指令,獲取下一個節點
# 如果行動是 None 或空字串,則預設為 "default"
next_node_candidate = curr_node.successors.get(action or "default")
# ... (省略警告部分)
return next_node_candidate
def _orch(self, shared, params=None): # 內部協調方法
current_node = copy.copy(self.start_node) # 從起始節點開始
last_action_returned = None
while current_node:
# current_node.set_params(params) # 設定節點參數 (如果有的話)
last_action_returned = current_node._run(shared) # 執行當前節點
# 獲取下一個節點,注意這裡複製了下一個節點,避免修改原始流程定義
current_node = copy.copy(self.get_next_node(current_node, last_action_returned))
return last_action_returned # 回傳內部流程最後一個節點的行動
def _run(self, shared): # Flow 執行時呼叫
prep_result_for_flow = self.prep(shared) # 1. 執行 Flow 自己的 prep
# 2. 執行內部節點的編排 (_orch)
# _orch 的回傳結果 (內部最後一個節點的行動) 會作為 exec_res 傳給 Flow 的 post
orchestration_result = self._orch(shared)
# 3. 執行 Flow 自己的 post
return self.post(shared, prep_result_for_flow, orchestration_result)
# Flow 的 post 方法預設會回傳 orchestration_result (內部流程的最後行動)
# 這使得 Flow 本身也可以回傳行動,用於巢狀流程
def post(self, shared, prep_res, exec_res):
return exec_res
# 以下是定義轉換規則的語法糖部分:
# class BaseNode:
# def __rshift__(self, other_node): # 處理 >> 運算子 (預設轉換)
# return self.next(other_node, "default")
#
# def __sub__(self, action_string): # 處理 - "action" 部分
# if isinstance(action_string, str):
# # 回傳一個臨時對象,該對象可以再接收 >> other_node
# return _ConditionalTransition(self, action_string)
# raise TypeError("Action must be a string")
# class _ConditionalTransition: # 臨時對象
# def __init__(self, source_node, action_name):
# self.source_node = source_node
# self.action_name = action_name
#
# def __rshift__(self, target_node): # 處理 - "action" >> target_node
# return self.source_node.next(target_node, self.action_name)
這段簡化的程式碼展示了 Flow
如何透過 _orch
方法來迭代執行其內部的節點。_orch
方法會追蹤當前節點,執行它,然後使用 get_next_node
來根據節點回傳的行動找到下一個節點,直到沒有更多節點可執行。
Flow
自身的 _run
方法會先執行自己的 prep
,然後呼叫 _orch
來執行內部流程,最後執行自己的 post
。Flow.post
預設會回傳內部流程最後一個節點的行動,這使得流程可以自然地嵌入到其他流程中。
__rshift__
(對應 >>
) 和 __sub__
(對應 -
) 的特殊方法以及 _ConditionalTransition
類別,共同實現了定義節點轉換時的簡潔語法。
在本章中,我們學習了 PocketFlow 中負責協調節點執行的核心概念——流程 (Flow)。
post()
方法回傳的行動指令 (Action),來決定下一個執行的節點。>>
(預設行動) 和 - "行動名稱" >>
的語法來定義節點之間的轉換規則。Flow(start=起始節點)
創建,並透過 flow.run(shared_data)
執行。_orch
方法來協調的,它會迭代執行流程中的節點。節點是工人,共享儲存是他們的共享工作台和材料,而流程則是那位指揮若定的生產線經理或樂隊指揮。它們共同構成了 PocketFlow 強大而靈活的工作流程編排能力。
到目前為止,我們已經了解了單個節點、節點間的數據交換,以及如何將節點組合成一個可執行的流程。在下一章中,我們將介紹一個更高層次的抽象概念——工作流 (Workflow),它允許我們將流程與其他元數據(如名稱、版本)打包在一起,以便更好地管理和部署。