歡迎來到 open-notebook
教學系列的第四章!在上一章 使用者介面 (Streamlit UI) 中,我們探索了如何使用 Streamlit 建立一個互動式的網頁介面,讓使用者可以瀏覽筆記本、新增內容,就像汽車的駕駛艙一樣。
現在,想像一下,您透過那個漂亮的介面,興奮地將一篇精彩的網路文章網址,或一份重要的 PDF 報告,甚至一段錄音檔加入到您的筆記本中。但是,這些原始資料並不是我們最終想要的格式。我們希望 open-notebook
能自動幫我們從這些五花八門的來源中,提取出最重要的純文字內容,方便我們日後搜尋、摘要或進行 AI 問答。
如果每次加入不同類型的來源,我們都需要手動選擇不同的工具來處理(例如,用 PDF 閱讀器複製文字、用線上工具下載網頁內容、用語音轉文字軟體處理音檔),那將會非常繁瑣且耗時。
這就是「內容處理流程 (Content Processing Graph)」派上用場的地方!
我們的目標是讓 open-notebook
變得智慧且自動化。當您提供任何來源(網址、PDF、音檔、影片、Office 文件等)時,系統應該:
如果沒有一個統一的流程來處理這件事,我們就必須在程式碼的不同地方寫一大堆 if...else...
判斷式來處理不同的來源類型。這會讓程式碼變得混亂、難以維護,而且每次要支援新的來源類型時,都得修改很多地方。
「內容處理流程」就是為了解決這個問題而設計的,它提供了一個標準化、自動化的工作流程。
內容處理流程 (Content Processing Graph) 是一個自動化的工作流程(您可以想像成一個圖形或流程圖),負責接收不同類型的原始資料,並從中提取出純文字內容。它會根據來源類型,自動選擇並執行相應的處理步驟。
您可以把它想像成一條多功能的食品加工生產線:
source_identification
節點)會先辨識這是哪種原料。url_provider
節點,再細分到 extract_url
或 extract_youtube_transcript
)。file_type
節點,再到 extract_pdf
)。file_type
節點,可能先到 extract_best_audio_from_video
再到 extract_audio
)。file_type
節點,再到 extract_office_content
)。這個流程被設計成一個「圖」(Graph),其中包含:
open-notebook
使用一個名為 LangGraph 的函式庫來建立和執行這個處理流程。我們將在下一章 LangGraph 狀態機 (Graph Workflows) 中更深入地了解 LangGraph。目前,您只需要知道有這樣一個自動化的流程存在。
這個流程的定義主要位於 open_notebook/graphs/content_processing/__init__.py
檔案中。
對使用者或應用程式的其他部分來說,使用這個流程非常簡單。通常是在建立一個新的 Source
物件(代表一個資料來源,繼承自 物件模型 (ObjectModel))之後,觸發這個處理流程。
例如,在 使用者介面 (Streamlit UI) 中,當使用者上傳了一個 PDF 檔案並點擊「新增來源」時,後端程式碼可能會執行類似下面的步驟:
Source
物件,記錄這個來源的基本資訊(例如,檔案路徑、所屬筆記本 ID)。# 假設這是 Streamlit UI 後端處理檔案上傳的部分 (簡化示意)
import streamlit as st
from open_notebook.domain.source import Source
# 匯入我們定義好的圖 (graph)
from open_notebook.graphs.content_processing import graph
import asyncio # 因為圖的執行是異步的
async def process_uploaded_file(uploaded_file, notebook_id):
# 1. 儲存檔案 (此處省略細節,假設得到檔案路徑)
temp_file_path = f"/tmp/{uploaded_file.name}"
with open(temp_file_path, "wb") as f:
f.write(uploaded_file.getbuffer())
# 2. 建立 Source 物件 (尚未包含提取出的 content)
new_source = Source(
notebook_id=notebook_id,
file_path=temp_file_path,
original_filename=uploaded_file.name,
source_type="file" # 標示為檔案來源
)
new_source.save() # 先儲存基本資訊
# 3. 觸發內容處理流程
# 準備初始狀態,包含檔案路徑和指示處理完畢後刪除暫存檔
initial_state = {
"file_path": temp_file_path,
"delete_source": True # 指示圖在完成後刪除暫存檔
}
try:
# 執行圖 (Graph),這是一個非同步操作
final_state = await graph.ainvoke(initial_state)
# 4. 更新 Source 物件,填入提取出的內容和標題
extracted_content = final_state.get("content")
extracted_title = final_state.get("title")
if extracted_content:
new_source.content = extracted_content
if extracted_title:
new_source.title = extracted_title
# 可能還有 metadata 等其他資訊
new_source.metadata = final_state.get("metadata", {})
new_source.save() # 再次儲存,更新內容
st.success(f"來源 '{new_source.title or new_source.original_filename}' 處理完成!")
except Exception as e:
st.error(f"處理來源時發生錯誤:{e}")
# 可能需要刪除已建立的 Source 物件或標示為失敗
# 此處也應確保暫存檔被刪除
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
# --- 在 Streamlit UI 中呼叫 ---
# uploaded_file = st.file_uploader("上傳檔案", type=["pdf", "docx", "mp3", ...])
# current_notebook_id = st.session_state.get("current_notebook_id")
# if uploaded_file and current_notebook_id:
# if st.button("新增來源"):
# asyncio.run(process_uploaded_file(uploaded_file, current_notebook_id))
程式碼解釋:
initial_state
): 我們準備一個包含 file_path
(指向剛上傳的 PDF)和 delete_source: True
(指示流程結束後刪除這個暫存檔案)的字典。graph.ainvoke(initial_state)
):graph
物件的 ainvoke
方法(a
代表異步 async)。這會啟動我們定義的內容處理流程。START
開始,進入 source_identification
節點。由於 initial_state
提供了 file_path
,這個節點會判斷 source_type
為 "file"
。source_type
為 "file"
,條件式邊將流程導向 file_type
節點。file_type
節點會使用 python-magic
函式庫檢查檔案內容,辨識出 MIME 類型(例如 "application/pdf"
)。它也會設定檔案的原始名稱作為標題 (title
)。file_type_edge
) 將流程導向對應的處理節點,例如 extract_pdf
。extract_pdf
節點(位於 pdf.py
)會執行,使用 PyMuPDF (fitz)
函式庫讀取 PDF 檔案,提取文字內容,並進行清理。它將提取出的文字放入狀態的 content
欄位。extract_pdf
節點完成後,邊將流程導向 delete_file
節點。delete_file
節點檢查狀態中的 delete_source
是否為 True
。如果是,它會刪除 file_path
指向的暫存 PDF 檔案。END
狀態。final_state
): graph.ainvoke
會返回一個包含最終狀態的字典。對於 PDF 檔案,這個字典可能包含 content
(提取出的文字)、title
(檔案名稱)、identified_type
("application/pdf")以及其他中繼資料。file_path
可能已被設為 None
(如果 delete_source
為 True
)。final_state
取出 content
, title
等資訊,更新之前建立的 Source
物件,並再次儲存到資料庫。這個例子展示了應用程式如何簡單地呼叫 graph.ainvoke
來觸發複雜的、自動化的內容處理,而不需要關心內部的具體步驟和路由邏輯。
我們已經知道如何使用這個流程,現在讓我們稍微深入了解它是如何被定義和執行的。
假設使用者提供了一個 YouTube 網址。當 graph.ainvoke({"url": "youtube_url"})
被呼叫時:
START
節點開始。source_identification
): 此節點看到輸入中有 url
,於是將 source_type
設定為 "url"
。source_type_router
): 根據 source_type
為 "url"
,流程被導向 url_provider
節點。url_provider
): 此節點檢查 URL,發現包含 "youtube.com"
,於是將 identified_type
設定為 "youtube"
。url_type_router
): 根據 identified_type
為 "youtube"
,流程被導向 extract_youtube_transcript
節點。extract_youtube_transcript
): 此節點(定義在 youtube.py
)會:youtube-transcript-api
函式庫獲取影片的最佳可用字幕。content
欄位,標題設定到 title
欄位。extract_youtube_transcript
節點完成後,流程直接導向 END
節點。content
(字幕文字)和 title
(影片標題)的最終狀態被返回。以下是一個簡化的序列圖,展示了處理 URL 的過程:
這個流程圖是使用 LangGraph 在 open_notebook/graphs/content_processing/__init__.py
中定義的。讓我們看看簡化的關鍵部分:
# open_notebook/graphs/content_processing/__init__.py (簡化)
from langgraph.graph import END, START, StateGraph
from open_notebook.graphs.content_processing.state import ContentState # 狀態定義
# 匯入各個節點函式
from .pdf import extract_pdf
from .url import extract_url, url_provider
from .youtube import extract_youtube_transcript
from .audio import extract_audio
# ... 匯入其他節點函式 ...
# 1. 建立工作流程物件,指定狀態的結構 (ContentState)
workflow = StateGraph(ContentState)
# 2. 新增節點 (Nodes)
# 每個節點是一個名稱和對應的 Python 函式
workflow.add_node("source", source_identification) # 起始辨識節點
workflow.add_node("file_type", file_type) # 檔案類型辨識
workflow.add_node("url_provider", url_provider) # URL 提供者辨識
workflow.add_node("extract_pdf", extract_pdf) # PDF 提取
workflow.add_node("extract_url", extract_url) # 通用 URL 提取
workflow.add_node("extract_youtube_transcript", extract_youtube_transcript) # YT 提取
workflow.add_node("extract_audio", extract_audio) # 音檔提取 (轉錄)
workflow.add_node("delete_file", delete_file) # 刪除暫存檔
# ... 新增其他節點 ...
# 3. 新增邊 (Edges) - 連接節點
# 設定起始點
workflow.add_edge(START, "source") # 流程從 START 指向 "source" 節點
# 新增條件式邊 (Conditional Edges)
# 從 "source" 節點出發,根據 source_type_router 函式的回傳值決定去向
workflow.add_conditional_edges(
"source", # 起始節點
source_type_router, # 判斷函式 (返回 "url", "file", 或 "text")
{ # 可能的去向映射
"url": "url_provider", # 如果是 "url",去 "url_provider"
"file": "file_type", # 如果是 "file",去 "file_type"
"text": END, # 如果是 "text",直接結束 (END)
},
)
# 從 "file_type" 節點出發,根據 file_type_edge 函式的回傳值決定去向
workflow.add_conditional_edges(
"file_type", # 起始節點
file_type_edge, # 判斷函式 (返回 "extract_txt", "extract_pdf", etc.)
# 注意:這裡省略了映射字典,LangGraph 會直接使用回傳的字串作為目標節點名稱
)
# 從 "url_provider" 節點出發,根據 url_type_router 函式的回傳值決定去向
workflow.add_conditional_edges(
"url_provider", # 起始節點
url_type_router, # 判斷函式 (返回 "article" 或 "youtube")
{ # 可能的去向映射
"article": "extract_url", # 如果是 "article",去 "extract_url"
"youtube": "extract_youtube_transcript", # 如果是 "youtube",去 "extract_youtube_transcript"
},
)
# 新增固定邊
workflow.add_edge("extract_pdf", "delete_file") # PDF 提取完後,去刪除檔案
workflow.add_edge("extract_audio", "delete_file") # 音檔提取完後,去刪除檔案
workflow.add_edge("delete_file", END) # 刪除檔案後,結束流程
workflow.add_edge("extract_url", END) # URL 提取完後,直接結束
workflow.add_edge("extract_youtube_transcript", END) # YT 提取完後,直接結束
# ... 新增其他邊 ...
# 4. 編譯圖 (Compile Graph)
graph = workflow.compile() # 將定義好的流程編譯成可執行的物件
程式碼解釋:
StateGraph(ContentState)
: 建立一個新的狀態圖工作流程。ContentState
是一個 TypedDict
,定義了在整個流程中傳遞的狀態資料包含哪些欄位(如 content
, file_path
, url
, source_type
, identified_type
等)。workflow.add_node("節點名稱", 處理函式)
: 定義流程中的一個步驟。第一個參數是節點的唯一名稱(字串),第二個參數是對應的 Python 函式(通常是 async
函式),這個函式會接收目前的狀態,執行任務,並返回一個字典來更新狀態。workflow.add_edge(起始節點名稱, 目標節點名稱)
: 定義一個固定的連接。表示當 起始節點名稱
完成後,流程總是會流向 目標節點名稱
。START
和 END
是 LangGraph 提供的特殊節點名稱。workflow.add_conditional_edges(起始節點名稱, 判斷函式, 映射字典)
: 定義一個條件式的連接。起始節點名稱
:從哪個節點出發。判斷函式
:一個 Python 函式,它會接收目前的狀態,並返回一個字串,這個字串決定了下一步要去哪個分支。映射字典
:一個字典,鍵是 判斷函式
可能返回的字串,值是對應的目標節點名稱。workflow.compile()
: 將所有定義好的節點和邊組裝起來,產生一個可執行的 graph
物件。這個物件就是我們在前面使用範例中呼叫 graph.ainvoke()
的對象。透過這種方式,我們將複雜的內容處理邏輯拆分成一個個獨立的節點函式,並用清晰的邊定義它們之間的流程和條件,使得整個工作流程易於理解、修改和擴展。
每個節點對應的函式(例如 extract_pdf
, extract_url
)都位於 open_notebook/graphs/content_processing/
目錄下的獨立檔案中(如 pdf.py
, url.py
)。這些函式通常是 async
異步函式,因為它們可能需要執行 I/O 操作(讀檔案、網路請求)或 CPU 密集型任務(PDF 解析、音訊轉錄),使用異步可以提高效率。
它們接收當前的 ContentState
字典作為輸入,執行其特定任務(例如讀取檔案、呼叫外部 API),然後返回一個字典,其中包含要更新到狀態中的欄位和值。
# open_notebook/graphs/content_processing/pdf.py (極簡化示意)
from open_notebook.graphs.content_processing.state import ContentState
# ... (匯入 fitz 等) ...
async def extract_pdf(state: ContentState):
"""
從 PDF 檔案提取文字內容 (節點函式)。
"""
file_path = state.get("file_path")
if not file_path:
raise ValueError("缺少檔案路徑")
try:
# --- 執行核心任務 ---
# (這裡會呼叫 PyMuPDF 的函式來讀取和清理文字)
# text = await _extract_text_from_pdf(file_path)
text = "這是從 PDF 提取的模擬文字內容。" # 簡化示意
# --- 返回要更新的狀態 ---
return {"content": text}
except Exception as e:
# 處理錯誤...
raise Exception(f"提取 PDF 時發生錯誤: {e}")
在本章中,我們認識了 open-notebook
的「內容處理流程 (Content Processing Graph)」,這是一個自動化的工作流程,用於從各種來源提取純文字內容。
graph.ainvoke()
來觸發這個自動化流程,而無需關心內部的複雜性。這個內容處理流程確保了無論使用者新增什麼類型的來源,open-notebook
都能一致且自動地處理它們,提取出有用的文字資訊。但是,這個流程本身是如何被 LangGraph 這個工具執行的呢?LangGraph 如何管理狀態並在節點之間導航?
在下一章,我們將深入探討 LangGraph 狀態機 (Graph Workflows),更詳細地了解 LangGraph 的工作原理以及它如何驅動 open-notebook
中的這些自動化流程。