在上一章 流程 (Flow) 中,我們學習了如何將多個節點 (Node) 串聯起來,形成一個有順序、有邏輯的執行路徑。我們看到了「流程」就像一位生產線經理,指揮著各個節點按部就班地工作。
現在,我們將更進一步,探討如何運用這些知識來解決更大型、更複雜的真實世界任務。這就是「工作流 (Workflow)」概念的用武之地。想像一下,您不再只是管理一條生產線,而是要設計並運營整個工廠的生產藍圖。
許多真實世界的任務都非常複雜,遠非單一的指令或一個簡單的流程 (Flow) 就能完成。例如,要撰寫一本完整的書籍,涉及的步驟可能包括:
如果試圖一次性完成所有這些,不僅容易出錯,而且難以管理進度。更明智的方法是將這個龐大的專案「分解」成一系列較小、可管理且相互關聯的階段。
工作流 (Workflow) 正是這樣一種理念與實踐:它指將一個複雜任務分解為一系列較小、可管理的步驟(在 PocketFlow 中通常是節點 (Node)),並將它們串聯起來形成一個有序的執行流程。PocketFlow 的核心設計便是支持這種任務分解,通過節點間的鏈接和行動指令來定義工作流的具體執行路徑,確保任務按部就班地完成。
圖示:工作流將複雜任務分解為節點鏈
在本章中,我們將學習如何運用 PocketFlow 的組件來設計和實現這樣的工作流。
您可能會問,PocketFlow 中是否有一個叫做 Workflow
的特定類別呢?
實際上,在 PocketFlow 目前的設計中,「工作流 (Workflow)」更多的是一種設計模式或一種方法論,而不是一個獨立的類別。我們通過組合使用已經學過的 節點 (Node) 和 流程 (Flow) 來構建和實現工作流。
因此,當我們談論在 PocketFlow 中創建一個「工作流」時,我們實質上是在設計一個結構良好、目標明確的 流程 (Flow),用以解決一個特定的複雜問題。
讓我們以一個常見的任務為例:自動撰寫一篇關於特定主題的文章。這個任務可以分解為以下幾個主要步驟:
我們將為每個步驟創建一個節點 (Node),然後將它們組合成一個流程 (Flow),這個流程就構成了我們的「文章撰寫工作流」。
首先,我們定義三個節點。為了簡化,我們將用模擬函數 call_llm_for_task
來代表對大型語言模型的調用。
from pocketflow import Node, Flow
# 模擬呼叫大型語言模型
def call_llm_for_task(task_description, input_data):
print(f"🤖 正在為任務 '{task_description}' 呼叫 LLM,輸入:'{str(input_data)[:50]}...'")
if task_description == "產生大綱":
return f"主題 '{input_data}' 的大綱:1. 引言 2. 主要論點 3. 結論"
elif task_description == "撰寫內容":
return f"基於大綱 '{input_data[:30]}...' 的詳細內容。"
elif task_description == "風格潤飾":
return f"已潤飾的文章:'{input_data[:30]}...' (風格:引人入勝)"
return "未知任務的 LLM 結果"
class GenerateOutlineNode(Node):
def prep(self, shared):
return shared.get("topic", "未知主題") # 從共享儲存讀取主題
def exec(self, prep_res): # prep_res 是主題
return call_llm_for_task("產生大綱", prep_res)
def post(self, shared, prep_res, exec_res): # exec_res 是大綱
shared["outline"] = exec_res
print(f"✅ 大綱已產生並儲存:{exec_res}")
class WriteContentNode(Node):
def prep(self, shared):
return shared.get("outline", "沒有大綱") # 從共享儲存讀取大綱
def exec(self, prep_res): # prep_res 是大綱
return call_llm_for_task("撰寫內容", prep_res)
def post(self, shared, prep_res, exec_res): # exec_res 是初稿
shared["draft_content"] = exec_res
print(f"✅ 初稿已撰寫並儲存:{str(exec_res)[:60]}...")
class ApplyStyleNode(Node):
def prep(self, shared):
return shared.get("draft_content", "沒有初稿") # 從共享儲存讀取初稿
def exec(self, prep_res): # prep_res 是初稿
return call_llm_for_task("風格潤飾", prep_res)
def post(self, shared, prep_res, exec_res): # exec_res 是最終文章
shared["final_article"] = exec_res
print(f"🎉 最終文章已完成並儲存:{str(exec_res)[:60]}...")
GenerateOutlineNode
:接收一個「主題」(topic),輸出一個「大綱」(outline)。WriteContentNode
:接收「大綱」,輸出「初稿內容」(draft_content)。ApplyStyleNode
:接收「初稿內容」,輸出「最終文章」(final_article)。每個節點都遵循 prep
-> exec
-> post
的模式,並使用共享儲存 (Shared Store)來讀取輸入和儲存輸出。
現在,我們將這些節點實例化,並將它們連接成一個流程 (Flow)。由於這是一個簡單的線性序列,我們使用 >>
運算符進行連接。
# 建立節點實例
outline_node = GenerateOutlineNode()
write_node = WriteContentNode()
style_node = ApplyStyleNode()
# 將節點連接起來形成流程
# 假設每個節點成功後都執行下一個 (使用預設的 "default" 行動)
outline_node >> write_node
write_node >> style_node
# 建立流程,將 outline_node 設為起始節點
article_workflow = Flow(start=outline_node)
這段程式碼定義了節點的執行順序:outline_node
完成後執行 write_node
,write_node
完成後執行 style_node
。這個 article_workflow
就是我們設計的「文章撰寫工作流」。
最後,我們可以準備初始的共享儲存 (Shared Store)(包含文章主題),然後運行整個工作流。
# 準備共享儲存,放入初始資料
shared_data = {"topic": "AI 的未來"}
print(f"=== 🚀 開始文章撰寫工作流,主題:{shared_data['topic']} ===")
# 運行工作流
article_workflow.run(shared_data)
print("\n=== ✨ 工作流執行完畢 ===")
print(f"最終文章預覽:{shared_data.get('final_article')}")
當您運行這段程式碼時,您會看到類似以下的輸出(順序和具體內容可能因模擬函數而略有不同):
=== 🚀 開始文章撰寫工作流,主題:AI 的未來 ===
🤖 正在為任務 '產生大綱' 呼叫 LLM,輸入:'AI 的未來...'
✅ 大綱已產生並儲存:主題 'AI 的未來' 的大綱:1. 引言 2. 主要論點 3. 結論
🤖 正在為任務 '撰寫內容' 呼叫 LLM,輸入:'主題 'AI 的未來' 的大綱:1. 引言 2. 主...'
✅ 初稿已撰寫並儲存:基於大綱 '主題 'AI 的未來' 的大綱:1. 引言 2. 主...' 的詳細內容。
🤖 正在為任務 '風格潤飾' 呼叫 LLM,輸入:'基於大綱 '主題 'AI 的未來' 的大綱:1. 引言 2. 主...'
🎉 最終文章已完成並儲存:已潤飾的文章:'基於大綱 '主題 'AI 的未來' 的大綱:1. 引言 2. 主...' (風格:引人入勝)
=== ✨ 工作流執行完畢 ===
最終文章預覽:已潤飾的文章:'基於大綱 '主題 'AI 的未來' 的大綱:1. 引言 2. 主...' (風格:引人入勝)
正如您所見,我們的「文章撰寫工作流」成功地按照預期順序執行了所有步驟,並將最終結果儲存在了共享儲存 (Shared Store)中。我們通過將複雜任務分解為簡單節點,並用流程將它們串聯起來,有效地實現了一個自動化工作流。
在設計自己的工作流時,有幾個重要的原則可以幫助您:
任務分解的粒度 (Granularity of Task Decomposition):
明確的職責 (Clear Responsibilities):
數據流的設計 (Data Flow Design):
錯誤處理與容錯 (Error Handling and Fault Tolerance):
max_retries
, wait
) 和回退邏輯 (exec_fallback
) 來增強工作流的強韌性。可重用性 (Reusability):
最佳實踐提醒:
- 任務劃分不宜過粗,否則單個 LLM 調用可能難以處理。
- 任務劃分不宜過細,否則 LLM 可能缺乏足夠上下文,導致節點間結果不一致。
- 通常需要多次迭代才能找到最佳平衡點。
- 如果任務包含大量動態分支或需要根據上下文靈活決策的邊緣情況,可以考慮使用更高級的抽象,例如我們將在後續章節介紹的 代理人/智能體 (Agent)。 {: .best-practice }
由於我們在 PocketFlow 中使用 流程 (Flow) 來實現工作流,因此工作流的內部運作機制與流程的運作機制是相同的。這裡我們快速回顧一下:
workflow.run(shared_data)
(其中 workflow
是一個 Flow
物件實例)時,流程從其指定的起始節點開始。prep
、exec
(可能包含重試或回退)和 post
方法。post
方法回傳一個「行動指令」。下面的序列圖展示了我們的「文章撰寫工作流」執行時的簡化交互過程:
這個圖清晰地展示了控制流(節點如何被依次調用)和數據流(數據如何在節點之間通過共享儲存傳遞)。更詳細的關於流程 (Flow) 的內部機制,您可以參考上一章的內容。
在本章中,我們探討了「工作流 (Workflow)」的概念及其在 PocketFlow 中的實現:
掌握了工作流的設計理念,您就能夠運用 PocketFlow 來解決更廣泛、更複雜的自動化問題,將宏大的想法一步步變為現實。
在我們設計好單個工作流之後,經常會遇到需要對大量不同的輸入數據重複執行同一個工作流的情況。例如,我們可能需要為 100 個不同的主題都生成一篇文章。如何高效地處理這種情況呢?這就是我們下一章將要介紹的內容:批次處理 (Batch)。