Chapter 6: 異步處理 (Async)

歡迎來到 PocketFlow 教學的第六章!在上一章 批次處理 (Batch) 中,我們學習了如何高效地處理大量相似的任務,例如對一堆文件進行摘要,或者將長文件分塊處理。批次處理能讓我們的生產線(流程)一次消化更多工作。

但是,想像一下,如果生產線上的每個工人(節點)在處理每個零件時,都需要打一個耗時的電話(例如呼叫外部 API、讀取大型檔案或等待使用者回應),然後呆呆地等待電話接通和回應,才能繼續下一個零件。即使是批次處理,如果這些等待是「同步」的,即一個接一個地等待,那麼整體效率依然會受到限制。

這就是「異步處理 (Async)」登場的時刻!異步處理就像給我們的工人配備了多線電話和處理多任務的能力。當一個請求發出後,他們不必原地等待,而是可以先去處理其他事務,直到請求完成後再回來繼續。這使得整個應用程式在面對耗時的 I/O 操作時,不會被阻塞,能夠更流暢地運行。

為何需要異步處理?——化解等待的魔法

許多真實世界的應用程式都需要與外部世界互動,例如:

這些操作通常比 CPU 的計算速度慢得多。在傳統的「同步」程式設計中,當程式執行到這類操作時,它會停下來等待操作完成,才能繼續執行後續的程式碼。如果這樣的操作很多,或者某個操作特別耗時,整個應用程式就會顯得卡頓,使用者體驗會很差。

異步處理 (Asynchronous Processing) 解決了這個問題。它允許程式在發起一個耗時操作後,不必等待其完成,而是可以立即轉去執行其他任務。當該耗時操作最終完成時,程式會得到通知,然後再回過頭來處理該操作的結果。

核心概念:不阻塞,更流暢。

想像一位餐廳服務生:

在 PocketFlow 中,AsyncNode 就是那位能幹的異步服務生,而 AsyncFlow 則是管理這些異步服務生團隊的經理。

PocketFlow 中的異步核心:AsyncNodeAsyncFlow

PocketFlow 透過 AsyncNodeAsyncFlow 兩個主要組件來支援異步處理,它們底層依賴 Python 的 asyncio 函式庫。

1. AsyncNode:多任務處理的節點員工

AsyncNode 繼承自普通的節點 (Node),但它定義了一套對應的異步方法。如果您的節點需要執行異步操作,就應該讓它繼承 AsyncNode 並實作這些以 _async 結尾的方法:

注意 await 關鍵字!async def 方法內部,當您呼叫另一個異步函數時,通常需要在其前面加上 await。這表示「暫停」當前方法的執行,讓事件循環可以去處理其他任務,直到 await 的異步操作完成後再回來繼續。

2. AsyncFlow:異步團隊的流程經理

AsyncNode 不能像普通節點 (Node) 那樣直接透過 node.run() 執行。它們必須被包裹在一個 AsyncFlow 中運行。

小試身手:打造一個異步食譜查找器

讓我們來建立一個簡單的異步食譜查找器。它會:

  1. 根據使用者提供的食材(儲存在共享儲存 (Shared Store)中)。
  2. 異步呼叫一個模擬的 API 來獲取相關食譜列表。
  3. 異步呼叫一個模擬的 LLM 來從列表中推薦一個食譜。
  4. 異步等待使用者確認是否接受該建議。
  5. 如果使用者拒絕,則嘗試建議另一個食譜(如果還有)。

首先,我們需要一些模擬的異步輔助函數:

import asyncio

# 模擬異步 API 呼叫
async def simulate_fetch_recipes_api(ingredient):
    print(f"🍳 正在異步搜尋 '{ingredient}' 的食譜...")
    await asyncio.sleep(1) # 模擬網路延遲
    if ingredient == "雞肉":
        return ["香烤雞腿", "宮保雞丁", "白斬雞"]
    return ["查無食譜"]

# 模擬異步 LLM 呼叫
async def simulate_llm_suggestion(recipes, used_suggestions):
    print("🧠 LLM 正在異步思考建議...")
    await asyncio.sleep(0.5) # 模擬 LLM 處理時間
    for recipe in recipes:
        if recipe not in used_suggestions:
            return recipe
    return "沒有其他建議了"

# 模擬異步獲取使用者輸入
async def simulate_user_input_async(prompt):
    print(f"💬 {prompt}", end="")
    await asyncio.sleep(0.1) # 模擬等待輸入的間隙
    # 在真實應用中,這裡會使用像 aioconsole.ainput 這樣的異步輸入方法
    # 為簡化,我們仍使用同步 input,但假裝它是異步的
    return input()

這些函數使用 await asyncio.sleep() 來模擬耗時的 I/O 操作。

步驟 1:定義 AsyncNode

FetchRecipesNode (獲取食譜節點)

from pocketflow import AsyncNode, AsyncFlow, Node # 引入所需類別

class FetchRecipesNode(AsyncNode):
    async def prep_async(self, shared):
        ingredient = shared.get("ingredient", "未知食材")
        print(f"🔍 (準備) 食材:{ingredient}")
        return ingredient # 將食材傳遞給 exec_async

    async def exec_async(self, ingredient): # ingredient 來自 prep_async
        recipes = await simulate_fetch_recipes_api(ingredient)
        return recipes # 返回獲取的食譜列表

    async def post_async(self, shared, prep_res, exec_res): # exec_res 是食譜列表
        shared["all_recipes"] = exec_res
        shared.setdefault("used_suggestions", set()) # 初始化已建議列表
        print(f"📚 (善後) 找到食譜:{exec_res}")
        return "recipes_fetched"

SuggestRecipeNode (建議食譜節點)

class SuggestRecipeNode(AsyncNode):
    async def prep_async(self, shared):
        recipes = shared.get("all_recipes", [])
        used = shared.get("used_suggestions", set())
        return recipes, used # 將食譜和已建議列表傳給 exec_async

    async def exec_async(self, prep_res): # prep_res 是 (recipes, used)
        recipes, used = prep_res
        suggestion = await simulate_llm_suggestion(recipes, used)
        return suggestion # 返回 LLM 的建議

    async def post_async(self, shared, prep_res, exec_res): # exec_res 是建議
        if exec_res == "沒有其他建議了":
            print("🤷 (善後) 沒有更多食譜可以建議了。")
            return "no_more_suggestions"
        
        shared["current_suggestion"] = exec_res
        shared["used_suggestions"].add(exec_res) # 記錄此建議已被使用
        print(f"💡 (善後) LLM 建議:{exec_res}")
        return "suggestion_made"

UserApprovalNode (使用者確認節點)

class UserApprovalNode(AsyncNode):
    async def prep_async(self, shared):
        return shared.get("current_suggestion", "未知建議")

    async def exec_async(self, suggestion): # suggestion 來自 prep_async
        prompt = f"您是否接受建議 '{suggestion}'? (y/n): "
        answer = await simulate_user_input_async(prompt)
        return answer.lower() # 返回使用者答案 (小寫)

    async def post_async(self, shared, prep_res, exec_res): # exec_res 是 'y' 或 'n'
        if exec_res == "y":
            shared["final_choice"] = shared.get("current_suggestion")
            print(f"👍 (善後) 使用者接受了 '{shared['final_choice']}'。")
            return "accept"
        else:
            print("👎 (善後) 使用者拒絕了建議。")
            return "retry" # 讓流程嘗試下一個建議

FinalMessageNode (結束訊息節點 - 同步範例) 為了展示 AsyncFlow 可以包含同步節點,我們加入一個簡單的同步節點。

class FinalMessageNode(Node): # 注意:這是普通的 Node
    def exec(self, prep_res): # 普通的 exec
        final_choice = self.shared_store_proxy.get("final_choice") # 假設 shared_store_proxy 可用
        if final_choice:
            message = f"🎉 太棒了!您選擇了:{final_choice}。祝您用餐愉快!"
        else:
            message = "🤔 可惜,這次沒有找到您滿意的食譜。"
        print(message)
        # 由於 exec 不直接存取 shared, 假設結果存在 shared_store_proxy 供 post 使用
        return message # 返回訊息本身

    def post(self, shared, prep_res, exec_res):
        shared["final_message"] = exec_res # 將最終訊息存起來
        # 沒有明確 return,預設 "default"

注意:在普通 Nodeexec 方法中,我們通常不直接存取 shared。此處為了簡化範例,我們假設有一個 self.shared_store_proxy 可以間接取得 shared 中的值,或者更好的做法是將 final_choiceprep 中讀取並傳給 exec。這裡我們重點是展示異步流程中可以有同步節點。在實際使用時,FinalMessageNodeprep 會從 shared 讀取 final_choice,並將其傳給 exec

為了更貼近 PocketFlow 設計,我們修改 FinalMessageNode 如下:

class FinalMessageNode(Node):
    def prep(self, shared):
        return shared.get("final_choice") # 從 shared 獲取最終選擇

    def exec(self, final_choice): # final_choice 來自 prep
        if final_choice:
            message = f"🎉 太棒了!您選擇了:{final_choice}。祝您用餐愉快!"
        else:
            message = "🤔 可惜,這次沒有找到您滿意的食譜。"
        print(message)
        return message

    def post(self, shared, prep_res, exec_res):
        shared["final_message"] = exec_res

步驟 2:建立 AsyncFlow 並連接節點

# 建立節點實例
fetch_node = FetchRecipesNode()
suggest_node = SuggestRecipeNode()
approval_node = UserApprovalNode()
final_node = FinalMessageNode() # 同步節點

# 定義轉換規則
fetch_node - "recipes_fetched" >> suggest_node
suggest_node - "suggestion_made" >> approval_node
suggest_node - "no_more_suggestions" >> final_node # 沒有建議了,直接結束

approval_node - "accept" >> final_node
approval_node - "retry" >> suggest_node # 使用者拒絕,回到建議節點

# 建立 AsyncFlow
recipe_flow = AsyncFlow(start=fetch_node)

這裡我們定義了節點之間的轉換邏輯,包括了使用者拒絕建議時的重試循環。

步驟 3:運行 AsyncFlow

async def main():
    shared_data = {"ingredient": "雞肉"} # 設定初始食材
    print(f"--- 🏁 開始異步食譜查找流程 (食材:{shared_data['ingredient']}) ---")
    
    await recipe_flow.run_async(shared_data) # 異步運行流程
    
    print("--- ✨ 流程執行完畢 ---")
    if "final_message" in shared_data:
        print(f"最終訊息:{shared_data['final_message']}")
    if "final_choice" in shared_data:
        print(f"您的選擇是:{shared_data['final_choice']}")

# 執行主異步函數
if __name__ == "__main__":
    asyncio.run(main())

我們定義了一個 async def main() 函數,在其中設定初始的共享儲存 (Shared Store)數據,然後使用 await recipe_flow.run_async(shared_data) 來啟動流程。最後,使用 asyncio.run(main()) 來執行整個異步程式。

當您運行此程式碼時,您會看到程式逐步執行,並在模擬的 API 呼叫、LLM 建議和使用者輸入時「等待」,但應用程式本身並未凍結。您可以嘗試輸入 'n' 來拒絕建議,看看流程是否會如預期般嘗試下一個建議。

範例互動過程可能如下:

--- 🏁 開始異步食譜查找流程 (食材:雞肉) ---
🔍 (準備) 食材:雞肉
🍳 正在異步搜尋 '雞肉' 的食譜...
📚 (善後) 找到食譜:['香烤雞腿', '宮保雞丁', '白斬雞']
🧠 LLM 正在異步思考建議...
💡 (善後) LLM 建議:香烤雞腿
💬 您是否接受建議 '香烤雞腿'? (y/n): n
👎 (善後) 使用者拒絕了建議。
🧠 LLM 正在異步思考建議...
💡 (善後) LLM 建議:宮保雞丁
💬 您是否接受建議 '宮保雞丁'? (y/n): y
👍 (善後) 使用者接受了 '宮保雞丁'。
🎉 太棒了!您選擇了:宮保雞丁。祝您用餐愉快!
--- ✨ 流程執行完畢 ---
最終訊息:🎉 太棒了!您選擇了:宮保雞丁。祝您用餐愉快!
您的選擇是:宮保雞丁

幕後揭秘:異步處理是如何運作的?

當您呼叫 await async_flow.run_async(shared) 時,PocketFlow 內部發生了什麼?

非程式碼的逐步解析:

  1. 啟動異步流程async_flow.run_async(shared) 被呼叫。
  2. 流程自身的準備 (Flow prep_async):如果 async_flow 覆寫了 prep_async() 方法,則首先 await 執行它。
  3. 進入異步協調 (_orch_async):流程開始執行其內部的異步協調邏輯,從其 start_node 開始。
  4. 執行當前節點
    • 如果當前節點是 AsyncNode:流程會 await current_node._run_async(shared)。這會依序 await 執行該 AsyncNodeprep_async_exec(內部會 await exec_asyncawait exec_fallback_async)、以及 post_async
    • 如果當前節點是普通的同步 Node:流程會直接呼叫 current_node._run(shared),這個執行是同步的,會阻塞直到完成。
    • 節點(無論異步或同步)的 postpost_async 方法回傳一個行動指令。
  5. 查找下一個節點:流程根據回傳的行動指令和轉換規則,找到下一個要執行的節點。
  6. 流程推進:如果找到了下一個節點,則將該節點設為新的當前節點,並重複步驟 4。如果沒有找到,則內部流程結束。
  7. 流程自身的善後 (Flow post_async):內部流程執行完畢後,如果 async_flow 覆寫了 post_async() 方法,則 await 執行它。內部流程最後一個節點的行動指令會作為 exec_res 傳給流程的 post_async 方法。
  8. 回傳結果run_async 方法最終回傳 async_flowpost_async 方法的回傳結果。

序列圖 (Sequence Diagram) 概覽:

sequenceDiagram participant 使用者 participant MyAsyncFlow as 異步流程 participant AsyncNodeA as 異步節點A participant SyncNodeB as 同步節點B (可選) participant SharedStore as 共享儲存 使用者->>MyAsyncFlow: await run_async(shared_data) MyAsyncFlow->>MyAsyncFlow: await prep_async(shared_data) (流程自身準備) MyAsyncFlow->>AsyncNodeA: (設為當前節點, 從 start_node 開始) AsyncNodeA->>AsyncNodeA: await _run_async(shared_data) (執行 A 的 prep_async, exec_async, post_async) Note right of AsyncNodeA: 期間可能有 await api_call() AsyncNodeA-->>SharedStore: (讀/寫共享儲存) AsyncNodeA->>MyAsyncFlow: 回傳行動 "action_X" MyAsyncFlow->>MyAsyncFlow: 根據 "action_X" 查找下一個節點 (假設是 SyncNodeB) MyAsyncFlow->>SyncNodeB: (設定 SyncNodeB 為當前節點) SyncNodeB->>SyncNodeB: _run(shared_data) (同步執行 B 的 prep, exec, post) SyncNodeB-->>SharedStore: (讀/寫共享儲存) SyncNodeB->>MyAsyncFlow: 回傳行動 "action_Y" MyAsyncFlow->>MyAsyncFlow: 根據 "action_Y" 查找下一個節點 (假設無) Note right of MyAsyncFlow: 內部流程結束,"action_Y" 作為 _orch_async 的結果 MyAsyncFlow->>MyAsyncFlow: await post_async(shared_data, prep_res_flow, "action_Y") (流程自身善後) MyAsyncFlow-->>使用者: (流程結束,回傳 Flow.post_async 的結果) end

相關程式碼片段 (簡化版):

參考 pocketflow/__init__.py 中的實作:

AsyncNode 的執行:

# 位於 pocketflow/__init__.py (為教學目的簡化)
class AsyncNode(Node):
    # ... (prep_async, exec_async, post_async 定義) ...
    async def _exec(self,prep_res): # 異步的 _exec
        for i in range(self.max_retries): # 重試邏輯
            try: 
                return await self.exec_async(prep_res) # 注意 await
            except Exception as e:
                if i==self.max_retries-1: 
                    return await self.exec_fallback_async(prep_res,e) # 注意 await
                if self.wait>0: 
                    await asyncio.sleep(self.wait) # 異步等待

    async def _run_async(self,shared): # 異步的 _run
        p = await self.prep_async(shared)    # 1. await prep_async
        e = await self._exec(p)              # 2. await _exec (內部 await exec_async)
        return await self.post_async(shared,p,e) # 3. await post_async

這顯示了 AsyncNode 如何在其生命週期中正確地使用 await

AsyncFlow 的協調邏輯:

# 位於 pocketflow/__init__.py (為教學目的簡化)
class AsyncFlow(Flow, AsyncNode): # AsyncFlow 同時繼承 Flow 和 AsyncNode
    async def _orch_async(self,shared,params=None): # 異步協調方法
        curr = copy.copy(self.start_node)
        p = (params or {**self.params})
        last_action = None
        
        while curr:
            curr.set_params(p)
            if isinstance(curr, AsyncNode): # 如果是 AsyncNode
                last_action = await curr._run_async(shared) # 用 await _run_async
            else: # 如果是普通同步 Node
                last_action = curr._run(shared) # 直接 _run
            curr = copy.copy(self.get_next_node(curr, last_action))
        return last_action

    async def _run_async(self,shared): # AsyncFlow 的主執行方法
        prep_result_for_flow = await self.prep_async(shared) # Flow 自身的 prep_async
        # Flow 自身的 _orch_async,其結果作為 exec_res 傳給 post_async
        orchestration_result = await self._orch_async(shared) 
        return await self.post_async(shared, prep_result_for_flow, orchestration_result)

    async def post_async(self,shared,prep_res,exec_res): # Flow 自身的 post_async
        return exec_res # 預設回傳內部流程最後一個行動

_orch_async 方法是關鍵,它能判斷當前節點是異步還是同步,並相應地調用 await curr._run_async()curr._run()。這使得 AsyncFlow 可以無縫混合和執行兩種類型的節點。

總結

在本章中,我們探索了 PocketFlow 中的「異步處理 (Async)」:

異步處理是現代應用程式開發中不可或缺的一環,特別是在涉及大量網路通訊或與外部服務互動的場景中。掌握了 PocketFlow 的異步能力,您就能建構出反應更迅速、使用者體驗更佳的自動化流程。

在前面的章節中,我們的節點和流程大多是預先定義好行為的。但如果我們希望節點能更「智能」地根據上下文做出決策,甚至像一個小型助手一樣與使用者或其他服務互動呢?這就引出了我們下一章的主題:代理人/智能體 (Agent)