Chapter 4: 內容處理流程 (Content Processing Graph)

歡迎來到 open-notebook 教學系列的第四章!在上一章 使用者介面 (Streamlit UI) 中,我們探索了如何使用 Streamlit 建立一個互動式的網頁介面,讓使用者可以瀏覽筆記本、新增內容,就像汽車的駕駛艙一樣。

現在,想像一下,您透過那個漂亮的介面,興奮地將一篇精彩的網路文章網址,或一份重要的 PDF 報告,甚至一段錄音檔加入到您的筆記本中。但是,這些原始資料並不是我們最終想要的格式。我們希望 open-notebook 能自動幫我們從這些五花八門的來源中,提取出最重要的純文字內容,方便我們日後搜尋、摘要或進行 AI 問答。

如果每次加入不同類型的來源,我們都需要手動選擇不同的工具來處理(例如,用 PDF 閱讀器複製文字、用線上工具下載網頁內容、用語音轉文字軟體處理音檔),那將會非常繁瑣且耗時。

這就是「內容處理流程 (Content Processing Graph)」派上用場的地方!

為什麼需要內容處理流程?

我們的目標是讓 open-notebook 變得智慧且自動化。當您提供任何來源(網址、PDF、音檔、影片、Office 文件等)時,系統應該:

  1. 自動辨識 這是哪種類型的資料。
  2. 根據類型,自動選擇 並執行正確的處理步驟。
  3. 最終提取出純文字內容。

如果沒有一個統一的流程來處理這件事,我們就必須在程式碼的不同地方寫一大堆 if...else... 判斷式來處理不同的來源類型。這會讓程式碼變得混亂、難以維護,而且每次要支援新的來源類型時,都得修改很多地方。

「內容處理流程」就是為了解決這個問題而設計的,它提供了一個標準化、自動化的工作流程。

什麼是內容處理流程 (Content Processing Graph)?

內容處理流程 (Content Processing Graph) 是一個自動化的工作流程(您可以想像成一個圖形或流程圖),負責接收不同類型的原始資料,並從中提取出純文字內容。它會根據來源類型,自動選擇並執行相應的處理步驟。

您可以把它想像成一條多功能的食品加工生產線

這個流程被設計成一個「圖」(Graph),其中包含:

open-notebook 使用一個名為 LangGraph 的函式庫來建立和執行這個處理流程。我們將在下一章 LangGraph 狀態機 (Graph Workflows) 中更深入地了解 LangGraph。目前,您只需要知道有這樣一個自動化的流程存在。

這個流程的定義主要位於 open_notebook/graphs/content_processing/__init__.py 檔案中。

如何使用內容處理流程

對使用者或應用程式的其他部分來說,使用這個流程非常簡單。通常是在建立一個新的 Source 物件(代表一個資料來源,繼承自 物件模型 (ObjectModel))之後,觸發這個處理流程。

例如,在 使用者介面 (Streamlit UI) 中,當使用者上傳了一個 PDF 檔案並點擊「新增來源」時,後端程式碼可能會執行類似下面的步驟:

  1. 儲存檔案: 將上傳的 PDF 檔案暫時儲存到伺服器上,取得檔案路徑。
  2. 建立 Source 物件: 建立一個 Source 物件,記錄這個來源的基本資訊(例如,檔案路徑、所屬筆記本 ID)。
  3. 觸發流程: 呼叫內容處理流程,並將包含檔案路徑的資訊傳遞給它。
# 假設這是 Streamlit UI 後端處理檔案上傳的部分 (簡化示意)
import streamlit as st
from open_notebook.domain.source import Source
# 匯入我們定義好的圖 (graph)
from open_notebook.graphs.content_processing import graph 
import asyncio # 因為圖的執行是異步的

async def process_uploaded_file(uploaded_file, notebook_id):
    # 1. 儲存檔案 (此處省略細節,假設得到檔案路徑)
    temp_file_path = f"/tmp/{uploaded_file.name}" 
    with open(temp_file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())
    
    # 2. 建立 Source 物件 (尚未包含提取出的 content)
    new_source = Source(
        notebook_id=notebook_id, 
        file_path=temp_file_path, 
        original_filename=uploaded_file.name,
        source_type="file" # 標示為檔案來源
    )
    new_source.save() # 先儲存基本資訊

    # 3. 觸發內容處理流程
    # 準備初始狀態,包含檔案路徑和指示處理完畢後刪除暫存檔
    initial_state = {
        "file_path": temp_file_path, 
        "delete_source": True # 指示圖在完成後刪除暫存檔
    }
    
    try:
        # 執行圖 (Graph),這是一個非同步操作
        final_state = await graph.ainvoke(initial_state)
        
        # 4. 更新 Source 物件,填入提取出的內容和標題
        extracted_content = final_state.get("content")
        extracted_title = final_state.get("title")
        
        if extracted_content:
            new_source.content = extracted_content
        if extracted_title:
            new_source.title = extracted_title
        # 可能還有 metadata 等其他資訊
        new_source.metadata = final_state.get("metadata", {})
            
        new_source.save() # 再次儲存,更新內容
        st.success(f"來源 '{new_source.title or new_source.original_filename}' 處理完成!")
        
    except Exception as e:
        st.error(f"處理來源時發生錯誤:{e}")
        # 可能需要刪除已建立的 Source 物件或標示為失敗
        # 此處也應確保暫存檔被刪除
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

# --- 在 Streamlit UI 中呼叫 ---
# uploaded_file = st.file_uploader("上傳檔案", type=["pdf", "docx", "mp3", ...])
# current_notebook_id = st.session_state.get("current_notebook_id")
# if uploaded_file and current_notebook_id:
#     if st.button("新增來源"):
#         asyncio.run(process_uploaded_file(uploaded_file, current_notebook_id)) 

程式碼解釋:

這個例子展示了應用程式如何簡單地呼叫 graph.ainvoke 來觸發複雜的、自動化的內容處理,而不需要關心內部的具體步驟和路由邏輯。

深入探索:內容處理流程的內部機制

我們已經知道如何使用這個流程,現在讓我們稍微深入了解它是如何被定義和執行的。

流程執行概覽(以處理 URL 為例)

假設使用者提供了一個 YouTube 網址。當 graph.ainvoke({"url": "youtube_url"}) 被呼叫時:

  1. 啟動: 流程從 START 節點開始。
  2. 辨識來源 (source_identification): 此節點看到輸入中有 url,於是將 source_type 設定為 "url"
  3. 來源類型路由 (source_type_router): 根據 source_type"url",流程被導向 url_provider 節點。
  4. 辨識 URL 提供者 (url_provider): 此節點檢查 URL,發現包含 "youtube.com",於是將 identified_type 設定為 "youtube"
  5. URL 類型路由 (url_type_router): 根據 identified_type"youtube",流程被導向 extract_youtube_transcript 節點。
  6. 提取 YouTube 字幕 (extract_youtube_transcript): 此節點(定義在 youtube.py)會:
    • 從 URL 中提取影片 ID。
    • 使用 youtube-transcript-api 函式庫獲取影片的最佳可用字幕。
    • (可選)獲取影片標題。
    • 將格式化後的字幕文字設定到狀態的 content 欄位,標題設定到 title 欄位。
  7. 結束: extract_youtube_transcript 節點完成後,流程直接導向 END 節點。
  8. 返回結果: 包含 content(字幕文字)和 title(影片標題)的最終狀態被返回。

以下是一個簡化的序列圖,展示了處理 URL 的過程:

sequenceDiagram participant UserCode as 使用者程式碼 participant ContentGraph as 內容處理圖 (Graph) participant SourceIdNode as 辨識來源節點 participant UrlProviderNode as URL提供者節點 participant ExtractYTNode as 提取YT字幕節點 UserCode->>ContentGraph: ainvoke({"url": "youtube_url", ...}) ContentGraph->>SourceIdNode: 執行 source_identification SourceIdNode-->>ContentGraph: 返回狀態 (source_type="url") ContentGraph->>UrlProviderNode: 執行 url_provider (根據路由) UrlProviderNode-->>ContentGraph: 返回狀態 (identified_type="youtube") ContentGraph->>ExtractYTNode: 執行 extract_youtube_transcript (根據路由) ExtractYTNode-->>ContentGraph: 返回狀態 (包含 content, title) ContentGraph-->>UserCode: 返回最終狀態

圖 (Graph) 的定義

這個流程圖是使用 LangGraph 在 open_notebook/graphs/content_processing/__init__.py 中定義的。讓我們看看簡化的關鍵部分:

# open_notebook/graphs/content_processing/__init__.py (簡化)
from langgraph.graph import END, START, StateGraph
from open_notebook.graphs.content_processing.state import ContentState # 狀態定義
# 匯入各個節點函式
from .pdf import extract_pdf
from .url import extract_url, url_provider
from .youtube import extract_youtube_transcript
from .audio import extract_audio
# ... 匯入其他節點函式 ...

# 1. 建立工作流程物件,指定狀態的結構 (ContentState)
workflow = StateGraph(ContentState)

# 2. 新增節點 (Nodes)
#    每個節點是一個名稱和對應的 Python 函式
workflow.add_node("source", source_identification) # 起始辨識節點
workflow.add_node("file_type", file_type)           # 檔案類型辨識
workflow.add_node("url_provider", url_provider)     # URL 提供者辨識
workflow.add_node("extract_pdf", extract_pdf)       # PDF 提取
workflow.add_node("extract_url", extract_url)       # 通用 URL 提取
workflow.add_node("extract_youtube_transcript", extract_youtube_transcript) # YT 提取
workflow.add_node("extract_audio", extract_audio)   # 音檔提取 (轉錄)
workflow.add_node("delete_file", delete_file)       # 刪除暫存檔
# ... 新增其他節點 ...

# 3. 新增邊 (Edges) - 連接節點

# 設定起始點
workflow.add_edge(START, "source") # 流程從 START 指向 "source" 節點

# 新增條件式邊 (Conditional Edges)
# 從 "source" 節點出發,根據 source_type_router 函式的回傳值決定去向
workflow.add_conditional_edges(
    "source", # 起始節點
    source_type_router, # 判斷函式 (返回 "url", "file", 或 "text")
    { # 可能的去向映射
        "url": "url_provider", # 如果是 "url",去 "url_provider"
        "file": "file_type",   # 如果是 "file",去 "file_type"
        "text": END,           # 如果是 "text",直接結束 (END)
    },
)

# 從 "file_type" 節點出發,根據 file_type_edge 函式的回傳值決定去向
workflow.add_conditional_edges(
    "file_type", # 起始節點
    file_type_edge, # 判斷函式 (返回 "extract_txt", "extract_pdf", etc.)
    # 注意:這裡省略了映射字典,LangGraph 會直接使用回傳的字串作為目標節點名稱
)

# 從 "url_provider" 節點出發,根據 url_type_router 函式的回傳值決定去向
workflow.add_conditional_edges(
    "url_provider", # 起始節點
    url_type_router, # 判斷函式 (返回 "article" 或 "youtube")
    { # 可能的去向映射
        "article": "extract_url", # 如果是 "article",去 "extract_url"
        "youtube": "extract_youtube_transcript", # 如果是 "youtube",去 "extract_youtube_transcript"
    },
)

# 新增固定邊
workflow.add_edge("extract_pdf", "delete_file") # PDF 提取完後,去刪除檔案
workflow.add_edge("extract_audio", "delete_file") # 音檔提取完後,去刪除檔案
workflow.add_edge("delete_file", END)           # 刪除檔案後,結束流程
workflow.add_edge("extract_url", END)           # URL 提取完後,直接結束
workflow.add_edge("extract_youtube_transcript", END) # YT 提取完後,直接結束
# ... 新增其他邊 ...

# 4. 編譯圖 (Compile Graph)
graph = workflow.compile() # 將定義好的流程編譯成可執行的物件

程式碼解釋:

透過這種方式,我們將複雜的內容處理邏輯拆分成一個個獨立的節點函式,並用清晰的邊定義它們之間的流程和條件,使得整個工作流程易於理解、修改和擴展。

節點函式範例

每個節點對應的函式(例如 extract_pdf, extract_url)都位於 open_notebook/graphs/content_processing/ 目錄下的獨立檔案中(如 pdf.py, url.py)。這些函式通常是 async 異步函式,因為它們可能需要執行 I/O 操作(讀檔案、網路請求)或 CPU 密集型任務(PDF 解析、音訊轉錄),使用異步可以提高效率。

它們接收當前的 ContentState 字典作為輸入,執行其特定任務(例如讀取檔案、呼叫外部 API),然後返回一個字典,其中包含要更新到狀態中的欄位和值。

# open_notebook/graphs/content_processing/pdf.py (極簡化示意)
from open_notebook.graphs.content_processing.state import ContentState
# ... (匯入 fitz 等) ...

async def extract_pdf(state: ContentState):
    """
    從 PDF 檔案提取文字內容 (節點函式)。
    """
    file_path = state.get("file_path")
    if not file_path:
        raise ValueError("缺少檔案路徑")

    try:
        # --- 執行核心任務 ---
        # (這裡會呼叫 PyMuPDF 的函式來讀取和清理文字)
        # text = await _extract_text_from_pdf(file_path) 
        text = "這是從 PDF 提取的模擬文字內容。" # 簡化示意
        
        # --- 返回要更新的狀態 ---
        return {"content": text} 
    except Exception as e:
        # 處理錯誤...
        raise Exception(f"提取 PDF 時發生錯誤: {e}")

總結

在本章中,我們認識了 open-notebook 的「內容處理流程 (Content Processing Graph)」,這是一個自動化的工作流程,用於從各種來源提取純文字內容。

這個內容處理流程確保了無論使用者新增什麼類型的來源,open-notebook 都能一致且自動地處理它們,提取出有用的文字資訊。但是,這個流程本身是如何被 LangGraph 這個工具執行的呢?LangGraph 如何管理狀態並在節點之間導航?

在下一章,我們將深入探討 LangGraph 狀態機 (Graph Workflows),更詳細地了解 LangGraph 的工作原理以及它如何驅動 open-notebook 中的這些自動化流程。