Chapter 5: 批次處理 (Batch)

在上一章 工作流 (Workflow) 中,我們學習了如何將複雜的任務分解成一系列的節點 (Node),並使用流程 (Flow) 將它們串聯起來,形成一個完整的工作流。這就像是設計了一條生產線的藍圖。

但是,如果我們需要讓這條生產線處理大量的相似任務,例如,我們設計了一個「文章撰寫工作流」,現在需要為 100 個不同的主題都生成一篇文章;或者,我們有一份非常長的文檔,需要將它分塊進行摘要。這時,逐一處理顯然效率低下。「批次處理 (Batch)」機制正是為了解決這類問題而生的,它能幫助我們高效地處理大量輸入數據或重複執行流程。

批次處理的兩種主要方式

PocketFlow 提供了兩種主要的批次處理方式:

  1. BatchNode:當您需要在單個節點內處理一系列項目時使用。例如,將一個大文件分割成許多小塊,然後對每個小塊執行相同的操作。
  2. BatchFlow:當您需要將整個流程針對不同的參數集重複執行多次時使用。例如,對一系列不同的文件名,重複執行「讀取文件 -> 摘要文件」的流程。

讓我們逐一了解它們。

1. BatchNode:節點內的批次專家

BatchNode 繼承自普通的節點 (Node),但它對 prep()exec()post() 方法的行為做了一些調整,使其特別適合處理一批項目。

想像一位工人 (BatchNode) 拿到一疊文件。

圖示:BatchNode 就像一位工人,逐一處理一疊文件,最後匯總報告。

BatchNode 的核心方法

小試身手:使用 BatchNode 摘要大型文件

假設我們有一個非常大的文本文件,無法一次性載入記憶體或傳遞給語言模型進行摘要。我們可以使用 BatchNode 將其分塊,逐塊摘要,最後合併摘要結果。

首先,我們定義一個 ChunkAndSummarizeNode

from pocketflow import BatchNode, Flow # 假設 pocketflow 已安裝

# 模擬呼叫大型語言模型
def call_llm_for_summary(text_chunk):
    print(f"📚 正在為文本塊 '{text_chunk[:20]}...' 產生摘要...")
    return f"'{text_chunk[:10]}...'的摘要"

class ChunkAndSummarizeNode(BatchNode):
    def prep(self, shared):
        large_text = shared.get("large_document", "")
        # 將長文本分割成較小的文本塊,例如每塊100字元
        chunk_size = 100 
        chunks = [large_text[i:i+chunk_size] for i in range(0, len(large_text), chunk_size)]
        if not chunks:
            print("📄 文件內容為空,沒有可供處理的文本塊。")
        else:
            print(f"🔪 文件已分割成 {len(chunks)} 個文本塊。")
        return chunks # 回傳文本塊列表

prep 方法中,我們從共享儲存 (Shared Store)中獲取長文本,並將其分割成多個小文本塊 (chunks)。

接下來是 exec 方法,它會為每一個文本塊產生摘要:

# (續上)
class ChunkAndSummarizeNode(BatchNode):
    # ... (prep 方法如上) ...

    def exec(self, chunk): # 參數 chunk 是 prep 回傳列表中的一個文本塊
        summary_of_chunk = call_llm_for_summary(chunk)
        return summary_of_chunk # 回傳這個文本塊的摘要

exec 方法接收單個 chunk 作為輸入,並返回該 chunk 的摘要。

最後是 post 方法,它會收集所有文本塊的摘要並合併它們:

# (續上)
class ChunkAndSummarizeNode(BatchNode):
    # ... (prep 和 exec 方法如上) ...

    def post(self, shared, prep_res, exec_res_list):
        # prep_res 是文本塊列表
        # exec_res_list 是每個文本塊摘要組成的列表
        print(f"📝 正在合併 {len(exec_res_list)} 個摘要...")
        combined_summary = "\n".join(exec_res_list)
        shared["final_summary"] = combined_summary # 將最終摘要存入共享儲存
        print("🎉 最終摘要已生成並儲存!")
        return "default" # 預設行動指令

post 方法接收 exec_res_list(所有文本塊摘要的列表),將它們合併,並存儲最終結果。

現在,我們可以運行這個 BatchNode

# 準備共享數據
shared_data = {
    "large_document": "這是一份非常非常長的示範文件,需要被分割成多個小塊來進行處理。" * 3
}

# 建立節點實例
chunk_summarizer = ChunkAndSummarizeNode()

# 建立一個簡單流程來運行它
summarization_flow = Flow(start=chunk_summarizer)

# 運行流程
print("--- 🚀 開始運行 BatchNode ---")
summarization_flow.run(shared_data)
print("--- ✨ BatchNode 運行結束 ---")
print(f"最終摘要:\n{shared_data.get('final_summary')}")

執行上述程式碼,您會看到類似以下的輸出:

--- 🚀 開始運行 BatchNode ---
🔪 文件已分割成 5 個文本塊。
📚 正在為文本塊 '這是一份非常非常長的示範文件,需要被分...' 產生摘要...
📚 正在為文本塊 '割成多個小塊來進行處理。這是一份非常非...' 產生摘要...
📚 正在為文本塊 '常長的示範文件,需要被分割成多個小塊來...' 產生摘要...
📚 正在為文本塊 '進行處理。這是一份非常非常長的示範文件...' 產生摘要...
📚 正在為文本塊 ',需要被分割成多個小塊來進行處理。...' 產生摘要...
📝 正在合併 5 個摘要...
🎉 最終摘要已生成並儲存!
--- ✨ BatchNode 運行結束 ---
最終摘要:
'這是一份非常非...'的摘要
'割成多個小塊來...'的摘要
'常長的示範文件...'的摘要
'進行處理。這是...'的摘要
',需要被分割成...'的摘要

正如所見,ChunkAndSummarizeNodeexec 方法針對每個文本塊被調用了一次,而 post 方法則接收到了所有這些摘要的列表,並成功將它們合併。

2. BatchFlow:流程的批次指揮家

有時候,我們不是要在一個節點內處理一批項目,而是需要將整個流程 (Flow) 重複執行多次,每次執行時使用不同的配置或輸入。這就是 BatchFlow 的用武之地。

想像一位生產線主管 (BatchFlow),他需要讓整條生產線(一個子流程 (Flow))針對不同規格的產品(不同的參數集)重複運作多次,以提高整體效率。

圖示:BatchFlow 像一位生產線主管,讓整條生產線針對不同規格重複運作。

BatchFlowBatchNode 的關鍵區別

理解 BatchFlowBatchNode 的不同非常重要:

  1. prep 的回傳值

    • BatchNode.prep():回傳數據項目本身的可迭代對象(例如文本塊列表)。
    • BatchFlow.prep():回傳一個參數字典 (parameter dictionaries) 的列表。每個字典定義了一次子流程執行時所需的特定參數。
  2. 參數的存取

    • BatchNodeexec(item) 中,item 是數據本身。
    • BatchFlow 所管理的子流程的節點中,是通過 self.params 來存取由 BatchFlow.prep() 提供的當前參數集。這些參數不是共享儲存 (Shared Store)中讀取的。
  3. 執行方式

    • BatchNode:其 exec 方法對每個數據項目執行一次,結果匯總到 post
    • BatchFlow:其內部的子流程 (Flow) 會為 BatchFlow.prep() 返回的每個參數字典獨立運行一次
  4. 子節點類型

    • BatchNode 本身就是一個執行批次操作的節點。
    • BatchFlow 管理的子流程可以由普通的 Node 組成(批次處理發生在流程層級)。

小試身手:使用 BatchFlow 摘要多個文件

假設我們有多個文本文件,需要為每個文件生成摘要。我們可以創建一個處理單個文件的子流程,然後用 BatchFlow 來為每個文件重複執行這個子流程。

首先,定義處理單個文件的子流程中的節點:

LoadSingleFileNode (普通 Node)

from pocketflow import Node, Flow, BatchFlow
import os # 用於模擬文件操作

# 模擬的文件系統
mock_file_system = {
    "file1.txt": "這是文件一的內容,關於貓咪。",
    "file2.txt": "這是文件二的內容,關於狗狗。",
    "file3.txt": "這是文件三的內容,關於小鳥。"
}

class LoadSingleFileNode(Node):
    def prep(self, shared):
        # 從 self.params 獲取當前要處理的文件名
        # 這是由 BatchFlow 傳遞過來的!
        filename = self.params.get("current_filename")
        if not filename:
            print("⚠️ 警告:LoadSingleFileNode 未在 params 中找到 'current_filename'")
            return None
        print(f"📄 正在準備加載文件:{filename}")
        return filename

    def exec(self, filename_to_load): # filename_to_load 是從 prep 返回的文件名
        if filename_to_load is None: return "錯誤:文件名為空"
        # 模擬讀取文件內容
        return mock_file_system.get(filename_to_load, "錯誤:文件未找到")

    def post(self, shared, prep_res, exec_res):
        # 將當前文件內容存入共享儲存,供後續節點使用
        shared["current_document_content"] = exec_res
        print(f"💾 文件 '{prep_res}' 的內容已加載。")
        return "default"

注意 prep 方法中,filename = self.params.get("current_filename") 這一行。文件名是從 self.params 中獲取的,這正是 BatchFlow 傳遞參數的方式。

SummarizeSingleFileNode (普通 Node)

# (續上)
# 模擬 LLM 摘要函數
def call_llm_for_doc_summary(content, filename):
    print(f"📝 正在為文件 '{filename}' 的內容 '{content[:15]}...' 產生摘要...")
    return f"文件 '{filename}' 的摘要:內容精華..."

class SummarizeSingleFileNode(Node):
    def prep(self, shared):
        # 從共享儲存獲取當前文件內容
        return shared.get("current_document_content", "內容為空")

    def exec(self, content_to_summarize):
        # 從 self.params 獲取當前文件名,用於日誌或標記摘要
        filename = self.params.get("current_filename", "未知文件")
        return call_llm_for_doc_summary(content_to_summarize, filename)

    def post(self, shared, prep_res, exec_res):
        # 將摘要結果存入共享儲存
        # 使用 self.params 中的文件名作為鍵,將各文件摘要分開存放
        filename = self.params.get("current_filename", "未知文件")
        if "all_summaries" not in shared:
            shared["all_summaries"] = {}
        shared["all_summaries"][filename] = exec_res
        print(f"🎉 文件 '{filename}' 的摘要已生成:'{str(exec_res)[:30]}...'")
        return "default"

這個節點從共享儲存 (Shared Store)讀取由 LoadSingleFileNode 加載的文件內容,然後進行摘要。在 post 方法中,它也使用了 self.params["current_filename"] 來將不同文件的摘要結果分別儲存。

現在,將這兩個節點組成一個處理單個文件的子流程

# 建立子流程的節點實例
load_node = LoadSingleFileNode()
summarize_node = SummarizeSingleFileNode()

# 連接節點形成子流程
load_node >> summarize_node
single_file_processing_flow = Flow(start=load_node)

接下來,定義 BatchFlow,它的 prep 方法會返回要處理的文件名列表(作為參數字典):

class ProcessMultipleFilesBatchFlow(BatchFlow):
    def prep(self, shared):
        # 假設要處理的文件名列表存儲在共享儲存的 "filenames_to_process" 中
        # 或直接在此處定義
        filenames = shared.get("filenames_to_process", ["file1.txt", "file2.txt", "file3.txt"])
        print(f"🗂️ BatchFlow 準備處理以下文件:{filenames}")
        # IMPORTANT: 返回一個參數字典的列表
        # 每個字典包含 'current_filename' 鍵,供子流程中的節點使用
        return [{"current_filename": fname} for fname in filenames]

    def post(self, shared, prep_res, exec_res_list):
        # BatchFlow 的 post 通常用於批次任務完成後的總體清理或日誌記錄
        # 注意:對於 BatchFlow,exec_res_list 通常是 None
        print("📦 所有文件已通過 BatchFlow 處理完畢!")
        return "default"

ProcessMultipleFilesBatchFlowprep 方法返回了一個列表,例如 [{"current_filename": "file1.txt"}, {"current_filename": "file2.txt"}, ...]

最後,將子流程 single_file_processing_flow 設置為 BatchFlow 的起始點,並運行它:

# 建立 BatchFlow 實例,並將子流程設為其起始點
batch_processor_flow = ProcessMultipleFilesBatchFlow(start=single_file_processing_flow)

# 準備共享數據 (可以為空,因為文件名列表在 BatchFlow 的 prep 中定義)
shared_data_for_batch_flow = {
    "filenames_to_process": ["file1.txt", "file2.txt", "file3.txt", "non_existent_file.txt"]
}

# 運行 BatchFlow
print("--- 🚀 開始運行 BatchFlow ---")
batch_processor_flow.run(shared_data_for_batch_flow)
print("--- ✨ BatchFlow 運行結束 ---")

print("\n📑 所有文件的摘要結果:")
for filename, summary in shared_data_for_batch_flow.get("all_summaries", {}).items():
    print(f"- {filename}: {summary}")

執行上述程式碼,您將看到子流程 single_file_processing_flow 對每個文件名都執行了一次:

--- 🚀 開始運行 BatchFlow ---
🗂️ BatchFlow 準備處理以下文件:['file1.txt', 'file2.txt', 'file3.txt', 'non_existent_file.txt']
📄 正在準備加載文件:file1.txt
💾 文件 'file1.txt' 的內容已加載。
📝 正在為文件 'file1.txt' 的內容 '這是文件一的內容,關...' 產生摘要...
🎉 文件 'file1.txt' 的摘要已生成:'文件 'file1.txt' 的摘要:內容精...'
📄 正在準備加載文件:file2.txt
💾 文件 'file2.txt' 的內容已加載。
📝 正在為文件 'file2.txt' 的內容 '這是文件二的內容,關...' 產生摘要...
🎉 文件 'file2.txt' 的摘要已生成:'文件 'file2.txt' 的摘要:內容精...'
📄 正在準備加載文件:file3.txt
💾 文件 'file3.txt' 的內容已加載。
📝 正在為文件 'file3.txt' 的內容 '這是文件三的內容,關...' 產生摘要...
🎉 文件 'file3.txt' 的摘要已生成:'文件 'file3.txt' 的摘要:內容精...'
📄 正在準備加載文件:non_existent_file.txt
💾 文件 'non_existent_file.txt' 的內容已加載。
📝 正在為文件 'non_existent_file.txt' 的內容 '錯誤:文件未找到...' 產生摘要...
🎉 文件 'non_existent_file.txt' 的摘要已生成:'文件 'non_existent_file.txt' 的摘...'
📦 所有文件已通過 BatchFlow 處理完畢!
--- ✨ BatchFlow 運行結束 ---

📑 所有文件的摘要結果:
- file1.txt: 文件 'file1.txt' 的摘要:內容精華...
- file2.txt: 文件 'file2.txt' 的摘要:內容精華...
- file3.txt: 文件 'file3.txt' 的摘要:內容精華...
- non_existent_file.txt: 文件 'non_existent_file.txt' 的摘要:內容精華...

這個例子清楚地展示了 BatchFlow 如何為每個參數集(這裡是指每個 {"current_filename": ...} 字典)重複運行子流程。子流程中的節點通過 self.params 獲取當前迭代的特定文件名。

3. 巢狀批次處理 (Nested Batches)

PocketFlow 還支持將 BatchFlow 進行巢狀。例如:

在每一層,BatchFlow 都會將其自身的參數字典與父層的參數字典合併。因此,最內層的節點可以通過 self.params 獲取到整個巢狀結構中所有層級的參數(例如,同時獲取 self.params["directory"]self.params["filename"])。

簡化範例:處理多個目錄下的文件

# 假設 ProcessSingleFileNode 是一個處理單個文件路徑的普通 Node
# class ProcessSingleFileNode(Node):
#     def prep(self, shared):
#         # 從 params 獲取完整路徑
#         dir_path = self.params.get("current_dir")
#         file_name = self.params.get("current_file")
#         full_path = os.path.join(dir_path, file_name) # 模擬路徑組合
#         print(f"⚙️ 準備處理文件: {full_path}")
#         return full_path
#     def exec(self, path_to_process):
#         return f"已處理 {path_to_process}"
#     def post(self, shared, prep_res, exec_res):
#         print(f"✅ {exec_res}")

# 模擬的文件系統結構
mock_dirs = {
    "docs": ["guide.txt", "notes.txt"],
    "data": ["report.txt"]
}

class FilesInDirectoryBatchFlow(BatchFlow): # 內部 BatchFlow
    def prep(self, shared):
        current_dir_name = self.params.get("current_dir") # 從外部 BatchFlow 獲取
        # 模擬列出目錄下的文件
        files_in_dir = mock_dirs.get(current_dir_name, [])
        print(f"  📁 內部 BatchFlow:在 '{current_dir_name}' 中找到文件:{files_in_dir}")
        return [{"current_file": f} for f in files_in_dir] # 為每個文件生成參數

class DirectoryBatchFlow(BatchFlow): # 外部 BatchFlow
    def prep(self, shared):
        dir_names = list(mock_dirs.keys()) # ["docs", "data"]
        print(f"🗂️ 外部 BatchFlow:準備處理目錄:{dir_names}")
        return [{"current_dir": d} for d in dir_names] # 為每個目錄生成參數

# 假設 ProcessSingleFileNode 已定義,它會使用 self.params["current_dir"] 和 self.params["current_file"]
process_node = ProcessSingleFileNode() # 這是一個普通節點

# 建立巢狀結構
inner_batch_flow = FilesInDirectoryBatchFlow(start=process_node) # 普通節點作為內部批次的起始
outer_batch_flow = DirectoryBatchFlow(start=inner_batch_flow) # 內部批次作為外部批次的起始

# 運行
shared_data_nested = {}
print("--- 🚀 開始運行巢狀 BatchFlow ---")
outer_batch_flow.run(shared_data_nested)
print("--- ✨ 巢狀 BatchFlow 運行結束 ---")

當運行此程式碼時,ProcessSingleFileNode 在其 prep 方法中將能同時訪問到 self.params["current_dir"] (來自 DirectoryBatchFlow) 和 self.params["current_file"] (來自 FilesInDirectoryBatchFlow)。輸出會類似:

--- 🚀 開始運行巢狀 BatchFlow ---
🗂️ 外部 BatchFlow:準備處理目錄:['docs', 'data']
  📁 內部 BatchFlow:在 'docs' 中找到文件:['guide.txt', 'notes.txt']
📄 正在準備加載文件:guide.txt  // (假設 ProcessSingleFileNode 就是 LoadSingleFileNode)
💾 文件 'guide.txt' 的內容已加載。
📝 正在為文件 'guide.txt' 的內容 '錯誤:文件未找到...' 產生摘要...
🎉 文件 'guide.txt' 的摘要已生成:'文件 'guide.txt' 的摘要:內容精...'
📄 正在準備加載文件:notes.txt
💾 文件 'notes.txt' 的內容已加載。
📝 正在為文件 'notes.txt' 的內容 '錯誤:文件未找到...' 產生摘要...
🎉 文件 'notes.txt' 的摘要已生成:'文件 'notes.txt' 的摘要:內容精...'
📦 所有文件已通過 BatchFlow 處理完畢! // FilesInDirectoryBatchFlow 的 post
  📁 內部 BatchFlow:在 'data' 中找到文件:['report.txt']
📄 正在準備加載文件:report.txt
💾 文件 'report.txt' 的內容已加載。
📝 正在為文件 'report.txt' 的內容 '錯誤:文件未找到...' 產生摘要...
🎉 文件 'report.txt' 的摘要已生成:'文件 'report.txt' 的摘要:內容精...'
📦 所有文件已通過 BatchFlow 處理完畢! // FilesInDirectoryBatchFlow 的 post
📦 所有文件已通過 BatchFlow 處理完畢! // DirectoryBatchFlow 的 post
--- ✨ 巢狀 BatchFlow 運行結束 ---

(這裡的輸出假設 ProcessSingleFileNode 包含了加載和摘要的邏輯,且每個 BatchFlow 都有自己的 post 打印消息。實際輸出會根據 ProcessSingleFileNode 的實現而定。)

幕後揭秘:BatchNode 如何運作?

當您調用 batch_node.run(shared) 時:

  1. batch_node.prep(shared) 被調用,它返回一個可迭代對象 (例如 items_list = [item1, item2, item3])。
  2. batch_node 內部有一個特殊的 _exec 方法。它會遍歷 items_list
  3. 對於 items_list 中的每一個 item
    • 它會調用 super(BatchNode, self)._exec(item)。這實際上是調用了節點 (Node) 的標準 _exec 過程來處理這個 item(包括執行您在 BatchNode 中定義的 exec(item) 方法,以及處理重試和回退邏輯)。
    • 收集該 item 的處理結果。
  4. 所有 item 都處理完畢後,會得到一個結果列表 (例如 results_list = [res1, res2, res3])。
  5. 最後,batch_node.post(shared, items_list, results_list) 被調用。

序列圖概覽 (BatchNode):

sequenceDiagram participant User as 使用者 participant MyBatchNode as 批次節點實例 participant SharedStore as 共享儲存 User->>MyBatchNode: run(shared_data) MyBatchNode->>MyBatchNode: prep(shared_data) Note right of MyBatchNode: 返回 items_list = [itemA, itemB] MyBatchNode->>MyBatchNode: _exec(items_list) Note right of MyBatchNode: 迭代 items_list MyBatchNode->>MyBatchNode: 處理 itemA (調用 exec(itemA)) MyBatchNode-->>MyBatchNode: resultA MyBatchNode->>MyBatchNode: 處理 itemB (調用 exec(itemB)) MyBatchNode-->>MyBatchNode: resultB Note right of MyBatchNode: 收集結果 exec_res_list = [resultA, resultB] MyBatchNode->>MyBatchNode: post(shared_data, items_list, exec_res_list) MyBatchNode-->>SharedStore: (寫入共享儲存) MyBatchNode-->>User: (行動指令) end

相關程式碼片段 (簡化版):pocketflow/__init__.py 檔案中,BatchNode_exec 方法大致如下:

# 位於 pocketflow/__init__.py (為教學目的簡化)
class BatchNode(Node):
    def _exec(self, items): # items 是從 prep 返回的可迭代對象
        # 對 items 中的每個 i,調用父類 Node 的 _exec 方法來處理它
        # 並將所有結果收集到一個列表中返回
        return [super(BatchNode, self)._exec(i) for i in (items or [])]

這段程式碼清晰地顯示了 BatchNode 如何迭代處理 prep 返回的每個項目。

幕後揭秘:BatchFlow 如何運作?

當您調用 batch_flow.run(shared) 時:

  1. batch_flow.prep(shared) 被調用,它返回一個參數字典的列表 (例如 params_list = [param_dict1, param_dict2])。
  2. batch_flow 內部有一個特殊的 _run 方法 (繼承自 Flow 但有修改,或者它自己的 _orch 邏輯會被其 _run 方法多次調用)。它會遍歷 params_list
  3. 對於 params_list 中的每一個 param_dict
    • batch_flow 將當前 param_dictbatch_flow 自身可能擁有的 self.params 合併。
    • 然後,batch_flow 會使用這個合併後的參數集,執行其內部的子流程 (self.start_node 所代表的流程)。這通常是通過調用類似 self._orch(shared, merged_params) 的方法來完成的,該方法會完整地運行一次子流程。
  4. 所有參數字典都對應執行完子流程後。
  5. 最後,batch_flow.post(shared, params_list, None) 被調用。(注意:BatchFlowpost 方法接收的第三個參數 exec_res 通常是 None,因為主要結果是通過多次運行子流程並修改共享儲存 (Shared Store)來產生的)。

序列圖概覽 (BatchFlow):

sequenceDiagram participant User as 使用者 participant MyBatchFlow as 批次流程實例 participant ChildFlow as 子流程實例 participant SharedStore as 共享儲存 User->>MyBatchFlow: run(shared_data) MyBatchFlow->>MyBatchFlow: prep(shared_data) Note right of MyBatchFlow: 返回 params_list = [paramsA, paramsB] MyBatchFlow->>MyBatchFlow: (迭代 params_list) Note over MyBatchFlow, ChildFlow: 對於 paramsA: MyBatchFlow->>ChildFlow: _orch(shared_data, paramsA) (運行子流程) ChildFlow->>SharedStore: (讀/寫共享儲存,使用 paramsA) ChildFlow-->>MyBatchFlow: (子流程完成) Note over MyBatchFlow, ChildFlow: 對於 paramsB: MyBatchFlow->>ChildFlow: _orch(shared_data, paramsB) (再次運行子流程) ChildFlow->>SharedStore: (讀/寫共享儲存,使用 paramsB) ChildFlow-->>MyBatchFlow: (子流程完成) MyBatchFlow->>MyBatchFlow: post(shared_data, params_list, None) MyBatchFlow-->>User: (行動指令) end

相關程式碼片段 (簡化版):pocketflow/__init__.py 檔案中,BatchFlow_run 方法大致如下:

# 位於 pocketflow/__init__.py (為教學目的簡化)
class BatchFlow(Flow):
    def _run(self, shared):
        # 1. 執行 BatchFlow 自己的 prep,獲取參數字典列表 (pr)
        pr = self.prep(shared) or [] 
        
        # 2. 遍歷每個參數字典 (bp)
        for bp in pr:
            # 對於每個參數字典 bp,
            # 合併 BatchFlow 自身的 params 和當前的 bp,
            # 然後調用 _orch 方法來執行一次子流程
            self._orch(shared, {**self.params, **bp}) 
            
        # 3. 執行 BatchFlow 自己的 post
        #    exec_res 傳遞為 None,因為結果通常寫在 shared store
        return self.post(shared, pr, None)

這段程式碼顯示了 BatchFlow 如何迭代從其 prep 方法獲取的參數集,並為每個參數集調用其 _orch 方法(該方法負責執行子流程)。

總結

在本章中,我們學習了 PocketFlow 中用於高效處理大量數據或重複任務的「批次處理 (Batch)」機制:

批次處理是構建可擴展和高效工作流的關鍵工具。當您面對大量相似的處理任務時,BatchNodeBatchFlow 將是您的得力助手。

雖然批次處理提高了吞吐量,但對於某些 I/O 密集型(例如大量網路請求)的批次任務,逐個順序執行仍然可能很慢。如果這些任務彼此獨立,我們是否有辦法讓它們並行執行以進一步節省時間呢?這就是我們下一章將要探討的內容:異步處理 (Async)