LangGraph 是如何让LLM产生确定性输出的?

像经常用 LLM 的同学都知道现在最头疼的问题就是幻觉问题,在金融或精密计算领域,不确定性意味着风险。 如果 Agent 负责分析 NVDA 或 TSLA 的财报,开发者希望它在处理相同数据时,逻辑推导链条是严密的,而不是在不同时间给出自相矛盾的结论。或是需要 LLM 输出 JSON 来触发一个 API,我们不会希望 LLM 在 JSON 里多加了一个逗号或改变了字段名。

最后我还尝试用 LangGraph 的理念自己写了一个 smallest-LangGraph

LangGraph 可以做什么?

传统的 LangChain 核心逻辑是 DAG(有向无环图)。我们可以轻松定义 A -> B -> C 的步骤,但如果你想让 AI 在 B 步骤发现结果不满意,自动跳回 A 重新执行,LangChain 的普通 Chain 很难优雅地实现。并且在复杂的长对话或多步骤任务中,维护一个全局的、可持久化的“状态快照”非常困难。

所以为了解决这些问题,LangGraph 就诞生了。LangGraph 的主要有这些核心优势:

  1. 支持“循环(Cycles)”与“迭代”

    思考 -> 2. 行动 -> 3. 观察结果 -> 4. 如果不满意,回到第1步。 LangGraph 允许你定义这种闭环逻辑,这在长任务、自我修正代码、多轮调研场景下是刚需。

  2. 状态管理

    LangGraph 引入了 State 的概念,所有节点共享同一个 TypedDict,你可以精确定义哪些数据是追加的(operator.add),哪些是覆盖的。并且它可以自动保存每一步的状态。即使程序崩溃或需要人工审核,你也可以从特定的“存档点”恢复,而不需要从头运行。

  3. 人机协作

    LangGraph 允许你将流程设计为“在某处强制停下”,等待人类信号后再继续。这在 LangChain 的线性模型中极难实现,但在 LangGraph 的状态机模型中只是一个节点属性。

  4. 高度可控

    “如果工具返回报错,必须走 A 路径。” 这种确定性对于生产环境的后端服务至关重要。不能让模型乱输出,在生产环境上严格把控输出结果是很重要的。

LangGraph 结构

由于 LangGraph 的核心思想是将 Agent 的工作流建模为一张有向图(Directed Graph)。所以 LangGraph 有如下几个结构组成

  • 全局状态(State)

    这个状态通常被定义为一个 Python 的 TypedDict,它可以包含任何你需要追踪的信息,如对话历史、中间结果、迭代次数等,所有的节点都能读取和更新这个中心状态。

  • 节点(Nodes)

    每个节点都是一个接收当前状态作为输入、并返回一个更新后的状态作为输出的 Python 函数。

  • 边(Edges)

    边负责连接节点,定义工作流的方向。最简单的边是常规边,它指定了一个节点的输出总是流向另一个固定的节点。而 LangGraph 最强大的功能在于条件边(Conditional Edges)。它通过一个函数来判断当前的状态,然后动态地决定下一步应该跳转到哪个节点。

基于上面的概念,我们来做一个例子,假设我们要开发一个 Agent:它先翻译一段话,然后自己检查是否有语法错误,如果有,就打回重新翻译;如果没有,就结束。

首先,我们先定义状态 (State):

from typing import TypedDict, List

class AgentState(TypedDict):
    # 原始文本
    input_text: str
    # 翻译后的文本
    translated_text: str
    # 反思反馈
    feedback: str
    # 循环次数(防止死循环)
    iterations: int

定义节点逻辑 (Nodes):

def translator_node(state: AgentState):
    print("--- 正在翻译 ---")
    # 这里通常会调用 LLM
    new_text = f"Translated: {state['input_text']}" 
    return {"translated_text": new_text, "iterations": state.get("iterations", 0) + 1}

def critic_node(state: AgentState):
    print("--- 正在自检 ---")
    # 模拟检查逻辑,如果包含 'bad' 字符就认为不合格
    if "bad" in state['translated_text']:
        return {"feedback": "发现不当词汇,请重试"}
    return {"feedback": "OK"}

定义路由逻辑 (Conditional Edges):

def should_continue(state: AgentState):
    if state["feedback"] == "OK" or state["iterations"] > 3:
        return "end"
    else:
        return "rephrase"

构建图 (Graph Construction):

from langgraph.graph import StateGraph, END

# 1. 初始化图
workflow = StateGraph(AgentState)

# 2. 添加节点
workflow.add_node("translator", translator_node)
workflow.add_node("critic", critic_node)

# 3. 设置入口点
workflow.set_entry_point("translator")

# 4. 连接节点
workflow.add_edge("translator", "critic")

# 5. 添加条件边 (根据 critic 的反馈决定去向)
workflow.add_conditional_edges(
    "critic",
    should_continue,
    {
        "rephrase": "translator", # 如果不 OK,回到翻译节点
        "end": END                # 如果 OK,结束
    }
)

# 6. 编译成可执行应用
app = workflow.compile()

通过上面这种编排方式,可以让 LLM 概率性输出产生确定性的输出,通过各种限制节点,很好的控制了 LLM 的访问的节点。

下面我给出完整的例子,大家可以用这个例子去尝试一下:

from typing import TypedDict, List
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langchain_core.messages import SystemMessage, HumanMessage

llm = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6V",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)

class AgentState(TypedDict):
    # 原始文本
    input_text: str
    # 翻译后的文本
    translated_text: str
    # 反思反馈
    feedback: str
    # 循环次数(防止死循环)
    iterations: int

def translator_node(state: AgentState):
    """翻译节点:负责将中文翻译成英文"""
    print(f"\n--- [节点:翻译器] 第 {state.get('iterations', 0) + 1} 次尝试 ---")

    iters = state.get("iterations", 0)
    feedback = state.get("feedback", "无")

    # 构建提示词:如果是重试,带上反馈建议
    system_prompt = "你是一个专业的翻译官。请将用户的中文翻译成地道、优雅的英文。"
    if iters > 0:
        system_prompt += f" 注意:这是第二次尝试,请参考之前的反馈进行改进:{feedback}"

    response = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=state["input_text"])
    ])

    return {
        "translated_text": response.content,
        "iterations": iters + 1
    }

def critic_node(state: AgentState):
    """评审节点:检查翻译质量"""
    print("--- [节点:评审员] 正在检查翻译质量... ---")

    system_prompt = (
        "你是一个严苛的英文编辑。请评价以下翻译是否准确、地道。"
        "如果翻译得很好,请只回复关键词:【PASS】。"
        "如果翻译有改进空间,请直接指出问题并给出改进建议。"
    )

    user_content = f"原文:{state['input_text']}\n译文:{state['translated_text']}"

    response = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_content)
    ])

    return {"feedback": response.content}

# 4. 定义路由逻辑
def should_continue(state: AgentState):
    """判断是继续修改还是直接结束"""
    if "【PASS】" in state["feedback"] or state["iterations"] >= 3:
        if state["iterations"] >= 3:
            print("!!! 达到最大尝试次数,停止优化。")
        return "end"
    else:
        print(f">>> 反馈建议:{state['feedback']}")
        return "rephrase"

# 1. 初始化图
workflow = StateGraph(AgentState)

# 2. 添加节点
workflow.add_node("translator", translator_node)
workflow.add_node("critic", critic_node)

# 3. 设置入口点
workflow.set_entry_point("translator")

# 4. 连接节点
workflow.add_edge("translator", "critic")

# 5. 添加条件边 (根据 critic 的反馈决定去向)
workflow.add_conditional_edges(
    "critic",
    should_continue,
    {
        "rephrase": "translator", # 如果不 OK,回到翻译节点
        "end": END                # 如果 OK,结束
    }
)

# 6. 编译成可执行应用
app = workflow.compile()

# 7. 运行时交互
if __name__ == "__main__":
    print("=== LangGraph 智能翻译 Agent (输入 'exit' 退出) ===")
    while True:
        user_input = input("\n请输入想要翻译的中文内容: ")
        if user_input.lower() == 'exit':
            break

        # 初始状态
        initial_state = {
            "input_text": user_input,
            "iterations": 0
        }

        # 运行图并获取最终状态
        final_state = app.invoke(initial_state)

        print("\n" + "=" * 30)
        print(f"最终翻译结果:\n{final_state['translated_text']}")
        print("=" * 30)

LangGraph 是如何管理状态的?

State Reducer 自动合并 state

Reducer 在 LangGraph 中就是一种更新状态的处理逻辑,如果没有指定默认行为是 用新值覆盖旧值。想要指定 Reducer 只需要通过 typing.Annotated 字段绑定一个 Reducer 函数即可。

比如使用 operator.add 定义这是一个“追加型”字段:

from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, END
import operator

# 定义状态结构 (类似 Go 的 Struct)
class AgentState(TypedDict):
    # 使用 Annotated 和 operator.add 定义这是一个“追加型”字段
    # 每次节点返回消息,都会 append 到这个列表,而不是覆盖它
    messages: Annotated[list[str], operator.add]

    # 普通字段,默认行为是 Overwrite (覆盖)
    # 适合存储状态机当前的步骤或分析结论
    current_status: str

    # 计数器,也可以使用 operator.add 实现增量累加
    retry_count: Annotated[int, operator.add]

Checkpointer + Thread 持久化状态

在 LangGraph 中,Checkpointer 是一个持久化层接口,这意味着历史的对话记录,可以被自动持久化到数据库(如 SQLite 或其他外部数据库)中。这使得即使应用程序重启或用户断开连接,对话历史也能被保存和恢复,从而实现“真正的多轮记忆”。

LangGraph 提供了多种 Checkpointer 以便应对不同的使用场景:

  • MemorySaver 保存在内存,适用开发调试、单元测试;

  • SqliteSaver 保存在本地的.db文件,轻量级应用、边缘计算适合单机部署;

  • PostgresSaver 保存在 PostgreSQL,适合用在生产环境、多实例部署;

  • RedisSaver 适合处理高频、短时会话;

LangGraph 通过 thread_id 会话的唯一标识,结合 Checkpointer 就可以实现状态的隔离:

首先指定一个 指定一个 thread_id,所有相关的状态都会被保存到这个线程中。

config = {"configurable": {"thread_id": "conversation_1"}}
graph.invoke(input_data, config)

编译的时候传入 Checkpointer 即可。

# 创建 checkpointer
checkpointer = InMemorySaver()
# 编译图时传入 checkpointer
graph = builder.compile(checkpointer=checkpointer)

完整示例:

from langgraph.checkpoint.memory import InMemorySaver
from langchain_openai import ChatOpenAI 
from langgraph.graph import StateGraph, START, END, MessagesState 

llm = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)

# 定义节点函数
def call_model(state: MessagesState):
    response = llm.invoke(state["messages"])
    return {"messages": response}

# 构建图
builder = StateGraph(MessagesState)
builder.add_node("agent", call_model)
builder.add_edge(START, "agent")

# 创建 checkpointer
checkpointer = InMemorySaver()
# 编译图时传入 checkpointer
graph = builder.compile(checkpointer=checkpointer)

# 第一次对话
config = {"configurable": {"thread_id": "user_123"}}
response1 = graph.invoke(
    {"messages": [{"role": "user", "content": "你好,我的名字是张三"}]},
    config
)
print(f"AI: {response1['messages'][-1].content}")

# 第二次对话(相同 thread_id)
response2 = graph.invoke(
    {"messages": [{"role": "user", "content": "我的名字是什么?"}]},
    config  # 使用相同的 thread_id
)
print(f"AI: {response2['messages'][-1].content}")

# 获取当前的状态信息
print(f"AI: {graph.get_state(config)}")

除此之外,可以 graph.get_state() / graph.get_state_history() 拿到当前/历史状态;也可以基于 checkpoint 做 replay、update_state(时间旅行能力通常要求启用 checkpointer)。

Super-step 原子循环单元

由于一个 node 也可以连接多个 node,多个 node 也可以连接到 一个 node,所以 LangGraph 设计了 Super-step 来作为原子循环单元。比如下面的例子:

  graph.set_entry_point("n1")
  graph.add_edge("n1", "n2")
  graph.add_edge("n1", "n3")
  graph.add_edge("n2", "n4")
  graph.add_edge("n3", "n4")
  graph.add_edge("n4", END)

LangGraph 只分了三步就执行完了该循环。如下图,第二步的时候会 n2、n3 节点并行执行。

graph active nodes in each superstep

并且每个 super-step 都会自动保存一个 checkpoint,这就是持久化机制的基础。即使程序中断,也能从最后一个 super-step 的 checkpoint 恢复执行。

Human-in-the-loop 人机协同

Human-in-the-loop 本质上就是让 agent “关键时刻”暂停,它的底层靠的是 interrupt + 持久化(checkpoint):暂停时把状态存起来,恢复时从存档续跑。

比如我们想要是线一个场景就是让 AI 去判断是否应该要人工审核,如过需要人工审核,那么就 interrupt 进行中断,然后等人工输入之后根据执行逻辑进行恢复,然后配合Command(resume=...) 恢复。

基本流程可以是这样:

import uuid
from langgraph.types import interrupt, Command

def ask_human(state):
    answer = interrupt("Do you approve?")
    return {"approved": answer}

config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# 第一次跑:会中断,返回 __interrupt__
graph.invoke({"input": "x"}, config=config)

# 人给了答复后:用 Command(resume=...) 恢复
graph.invoke(Command(resume=True), config=config)

这个例子中interrupt()暂停图执行,把一个值(必须可 JSON 序列化)抛给调用方,并依赖 checkpointer 持久化状态;然后你用同一个 thread_id 重新调用图,并传入 Command(resume=...) 来继续。

接下来我们看一个完整的例子,设计一个常见的场景,当模型觉得需要“找专家/找人类”时,会调用一个工具 human_assistance,而这个工具会用 interrupt() 把流程暂停下来,等你在命令行里输入专家建议后,再用 Command(resume=...) 把图唤醒继续跑。

from langgraph.checkpoint.memory import InMemorySaver
from langchain_openai import ChatOpenAI
from typing_extensions import TypedDict
from typing import Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode, tools_condition 
from langchain_core.messages import HumanMessage

llm = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)

class State(TypedDict):
    messages: Annotated[list, add_messages]

@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt({"query": query})
    return human_response["data"]

tools = [human_assistance]
llm_with_tools = llm.bind_tools(tools)

def chatbot(state: State):
    message = llm_with_tools.invoke(state["messages"])
    return {"messages": [message]}

tool_node = ToolNode(tools=tools)

graph_builder = StateGraph(State)
graph_builder.add_node("tools", tool_node)

graph_builder.add_conditional_edges(
    "chatbot",
    tools_condition,
)

graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

memory = InMemorySaver()
graph = graph_builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "test_thread_123"}}

# 第一步:用户提出一个需要“人工协助”的问题
print("--- 第一阶段:AI 运行并遇到 interrupt ---")
initial_input = HumanMessage(content="你好,帮我找个专家回答我的问题")

for event in graph.stream({"messages": [initial_input]}, config, stream_mode="values"):
    if "messages" in event:
        event["messages"][-1].pretty_print()

# 此时,你会发现程序停止了,因为它卡在 `human_assistance` 的 `interrupt` 处。

# 第二步:模拟人类(你)在一段时间后看到了请求并回复
print("\n--- 第二阶段:模拟人类介入并提供答案 ---")

# 我们构造一个 Command 对象来“唤醒”它
# resume 里的内容会直接成为 interrupt() 函数的返回值
expert_input = input("专家建议: ")
human_feedback = {"data": expert_input}

for event in graph.stream(
    Command(resume=human_feedback), # 这里是恢复运行的关键
    config,
    stream_mode="values"
):
    if "messages" in event:
        event["messages"][-1].pretty_print()

snapshot = graph.get_state(config)

print(snapshot.values)

LangGraph如何轻松实现 Agent 多种执行范式

ReAct

ReAct由Shunyu Yao于2022年提出[1],其核心思想是模仿人类解决问题的方式,将推理 (Reasoning)行动 (Acting) 显式地结合起来,形成一个“思考-行动-观察”的循环。

img

ReAct范式通过一种特殊的提示工程来引导模型,使其每一步的输出都遵循一个固定的轨迹:

  • Thought (思考): 这是智能体的“内心独白”。它会分析当前情况、分解任务、制定下一步计划,或者反思上一步的结果。
  • Action (行动): 这是智能体决定采取的具体动作,通常是调用一个外部工具API 。
  • Observation (观察): 这是执行Action后从外部工具返回的结果,例如或API的返回值。

智能体将不断重复这个 Thought -> Action -> Observation 的循环,将新的观察结果追加到历史记录中,形成一个不断增长的上下文,直到它在Thought中认为已经找到了最终答案,然后输出结果。

from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

# --- 1. 双手:定义查天气的工具 ---
@tool
def get_weather(city: str):
    """查询指定城市的天气"""
    # 这里模拟后端 API 返回数据
    if "北京" in city:
        return "晴天,25度"
    return "阴天,20度"

tools = [get_weather]
tool_node = ToolNode(tools)

# --- 2. 记忆:定义存储对话的状态 ---
class State(TypedDict):
    messages: Annotated[list, add_messages]

# --- 3. 大脑:定义思考逻辑 ---
model = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
).bind_tools(tools)

def call_model(state: State):
    # 大脑看一眼目前的对话,决定是直接说话还是去用手拿工具
    return {"messages": [model.invoke(state["messages"])]}

# --- 4. 路由:判断下一步是干活还是结束 ---
def should_continue(state: State):
    last_message = state["messages"][-1]
    # 如果大脑发出的指令包含“调用工具”,就去 tools 节点
    if last_message.tool_calls:
        return "tools"
    # 如果大脑直接说话了,就结束
    return END

# --- 5. 编排图(把脑和手连起来) ---
workflow = StateGraph(State)

workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)

workflow.set_entry_point("agent")

# 条件边:agent 运行完,判断是去 tools 还是结束
workflow.add_conditional_edges("agent", should_continue)

# 普通边:tools 运行完(干完活了),必须把结果拿回给 agent 看
workflow.add_edge("tools", "agent")

app = workflow.compile()

# --- 6. 执行测试 ---
for chunk in app.stream({"messages": [("user", "北京今天天气怎么样?")]}):
    print(chunk)

Plan-and-Solve

Plan-and-Solve 顾名思义,这种范式将任务处理明确地分为两个阶段:先规划 (Plan),后执行 (Solve)。Plan-and-Solve Prompting 由 Lei Wang 在2023年提出。其核心动机是为了解决思维链在处理多步骤、复杂问题时容易“偏离轨道”的问题。

Plan-and-Solve 将整个流程解耦为两个核心阶段:

  1. 规划阶段 (Planning Phase): 首先,智能体会接收用户的完整问题。它的第一个任务不是直接去解决问题或调用工具,而是将问题分解,并制定出一个清晰、分步骤的行动计划
  2. 执行阶段 (Solving Phase): 在获得完整的计划后,智能体进入执行阶段。它会严格按照计划中的步骤,逐一执行。每一步的执行都可能是一次独立的 LLM 调用,或者是对上一步结果的加工处理,直到计划中的所有步骤都完成,最终得出答案。

img

import operator
from typing import Annotated, List, Tuple, Union
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

# 1. 定义状态 (State)
class PlanExecuteState(TypedDict):
    input: str            # 原始问题
    plan: List[str]       # 当前待办清单
    past_steps: Annotated[List[Tuple], operator.add] # 已完成的步骤和结果
    response: str         # 最终答案

# 2. 定义结构化输出模型 (用于 Planner)
class Plan(BaseModel):
    """步骤清单"""
    steps: List[str] = Field(description="为了回答问题需要执行的步骤")

# 3. 定义节点逻辑
model = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)

planner_model = model.with_structured_output(Plan, method="function_calling")

# --- 节点 A: 规划者 ---
def planner_node(state: PlanExecuteState):
    plan = planner_model.invoke(f"针对以下问题制定计划: {state['input']}")
    return {"plan": plan.steps}

# --- 节点 B: 执行者 (这里简化了工具调用) ---
def executor_node(state: PlanExecuteState):
    step = state["plan"][0] # 取当前第一步
    print(f"--- 正在执行: {step} ---")
    # 模拟工具执行结果
    result = f"已完成 {step} 的查询,结果为: [模拟数据]"
    return {"past_steps": [(step, result)], "plan": state["plan"][1:]}

# --- 节点 C: 重规划者 (决定是继续还是结束) ---
def replanner_node(state: PlanExecuteState):
    if not state["plan"]:  # 如果清单空了,让 AI 生成最终总结
        summary = model.invoke(
            f"请基于已完成的步骤和结果给出最终答案:{state['past_steps']}"
        )
        return {"response": summary.content}
    return {"response": None}

# 4. 路由逻辑
def should_continue(state: PlanExecuteState):
    if state["response"]:
        return END
    return "executor"

# 5. 编排图
workflow = StateGraph(PlanExecuteState)

workflow.add_node("planner", planner_node)
workflow.add_node("executor", executor_node)
workflow.add_node("re-planner", replanner_node)

workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "re-planner")

# 循环逻辑:根据 re-planner 的判断决定是否回 executor
workflow.add_conditional_edges("re-planner", should_continue)

app = workflow.compile()

# 6. 测试
input_query = {"input": "对比北京和上海的天气,哪个更热?"}
for event in app.stream(input_query):
    print(event)

Reflection

Reflection 机制的核心思想,正是为智能体引入一种事后(post-hoc)的自我校正循环,使其能够像人类一样,审视自己的工作,发现不足,并进行迭代优化。 Reflection 框架是Shinn, Noah 在2023年提出,其核心工作流程可以概括为一个简洁的三步循环:执行 -> 反思 -> 优化

  1. 执行 (Execution):首先,智能体使用我们熟悉的方法(如 ReAct 或 Plan-and-Solve)尝试完成任务,生成一个初步的解决方案或行动轨迹。这可以看作是“初稿”。
  2. 反思 (Reflection):接着,智能体进入反思阶段。它会调用一个独立的、或者带有特殊提示词的大语言模型实例,来扮演一个“评审员”的角色。这个“评审员”会审视第一步生成的“初稿”,并从多个维度进行评估,例如:
    • 事实性错误:是否存在与常识或已知事实相悖的内容?
    • 逻辑漏洞:推理过程是否存在不连贯或矛盾之处?
    • 效率问题:是否有更直接、更简洁的路径来完成任务?
    • 遗漏信息:是否忽略了问题的某些关键约束或方面? 根据评估,它会生成一段结构化的反馈 (Feedback),指出具体的问题所在和改进建议。
  3. 优化 (Refinement):最后,智能体将“初稿”和“反馈”作为新的上下文,再次调用大语言模型,要求它根据反馈内容对初稿进行修正,生成一个更完善的“修订稿”。

img

from typing import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END

class ReflectionState(TypedDict):
    prompt: str
    draft: str
    critique: str
    final: str
    iteration: int

llm = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/",
)

MAX_ITERS = 2

def generate_draft(state: ReflectionState):
    msg = llm.invoke(f"请写一段简短答案:{state['prompt']}")
    return {"draft": msg.content, "iteration": 0}

def reflect_on_draft(state: ReflectionState):
    prompt = (
        "你是严格的审稿人。请指出这段答案的问题并给出改进建议。"
        "如果没有明显问题,请只输出 NO_ISSUES。\n\n"
        f"答案:\n{state['draft']}"
    )
    critique = llm.invoke(prompt)
    print(f"--- 正在执行: reflect,critique:\n {critique.content} ---")
    return {"critique": critique.content}

def revise_draft(state: ReflectionState):
    prompt = (
        "请根据以下反馈重写答案,保持简短清晰:\n\n"
        f"反馈:\n{state['critique']}\n\n"
        f"原答案:\n{state['draft']}"
    )
    revision = llm.invoke(prompt)
    print(f"--- 正在执行: revise,原答案:\n{state['draft']},改进后:\n{revision.content} ---")
    return {"draft": revision.content, "iteration": state["iteration"] + 1}

def finalize(state: ReflectionState):
    return {"final": state["draft"]}

def should_reflect(state: ReflectionState):
    if state["critique"].strip() == "NO_ISSUES":
        return "finalize"
    if state["iteration"] >= MAX_ITERS:
        return "finalize"
    return "revise"

workflow = StateGraph(ReflectionState)
workflow.add_node("generate", generate_draft)
workflow.add_node("reflect", reflect_on_draft)
workflow.add_node("revise", revise_draft)
workflow.add_node("finalize", finalize)

workflow.add_edge(START, "generate")
workflow.add_edge("generate", "reflect")
workflow.add_conditional_edges("reflect", should_reflect)
workflow.add_edge("revise", "reflect")
workflow.add_edge("finalize", END)

app = workflow.compile()

if __name__ == "__main__":
    input_data = {"prompt": "用三句话解释什么是 LangGraph。"}
    result = app.invoke(input_data)
    print("最终答案:")
    print(result["final"])

大致流程就是,首先里面需要有两个角色:写稿人和审稿人,然后用 should_reflect 来判断是否需要重写,然后用 MAX_ITERS 来限制一下最大撰写次数。

  [START]
     |
     v
  (generate_draft)
     |
     v
  (reflect_on_draft) -- NO_ISSUES --> (finalize) --> [END]
           |
           | iteration >= MAX_ITERS
           +-----------------------> (finalize) --> [END]
           |
           +-- else --> (revise_draft) --+
                                         |
                                         v
                                 (reflect_on_draft)

Multi-Agent Pattern

Multi-Agent 模式是将复杂的任务拆解为多个专门化、独立且可协同的微服务,每个服务(Agent)只负责一个特定的领域。

因为单个 Prompt 包含太多工具和指令会导致 LLM “迷失”,模型表现下降。所以通过使用doge Agent 进行职责分离,不同的 Agent 可以使用不同的 Prompt、不同的模型(如 GPT-4o 负责决策,Llama-3 负责写代码),甚至不同的工具集。

比如下面的例子中:一个“PM” Agent 负责拆解任务,并将子任务分发给不同的“员工(Workers)”。

img

img

from typing import Annotated, Literal
from typing_extensions import TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# Multi-agent pattern: a supervisor routes work between specialist agents.

llm = ChatOpenAI(
    temperature=0.4,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/",
)

class State(TypedDict):
    messages: Annotated[list, add_messages]
    next: str
    turn: int

MAX_TURNS = 6

def _call_agent(system_prompt: str, messages: list, name: str):
    response = llm.invoke([{"role": "system", "content": system_prompt}] + messages)
    return {
        "messages": [
            {"role": "assistant", "name": name, "content": response.content}
        ],
        "turn": 1,
    }

def supervisor(state: State):
    if state["turn"] >= MAX_TURNS:
        return {"next": "finish"}

    system = (
        "You are a supervisor managing a team: researcher, writer, critic. "
        "Choose who should act next or finish. "
        "Respond with exactly one word: researcher, writer, critic, finish."
    )
    response = llm.invoke([{"role": "system", "content": system}] + state["messages"])
    decision = response.content.strip().lower()
    for option in ("researcher", "writer", "critic", "finish"):
        if option in decision:
            return {"next": option}
    return {"next": "finish"}

def researcher(state: State):
    system = (
        "You are a researcher. Gather key facts and constraints for the task. "
        "Be concise and list only essential points."
    )
    return _call_agent(system, state["messages"], "researcher")

def writer(state: State):
    system = (
        "You are a writer. Produce a clear, structured response using the context. "
        "If facts are missing, note assumptions."
    )
    return _call_agent(system, state["messages"], "writer")

def critic(state: State):
    system = (
        "You are a critic. Identify gaps, risks, or unclear parts in the draft, "
        "then suggest improvements."
    )
    return _call_agent(system, state["messages"], "critic")

def route_next(state: State) -> Literal["researcher", "writer", "critic", "finish"]:
    return state["next"]

builder = StateGraph(State)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_node("critic", critic)

builder.add_edge(START, "supervisor")
builder.add_conditional_edges(
    "supervisor",
    route_next,
    {
        "researcher": "researcher",
        "writer": "writer",
        "critic": "critic",
        "finish": END,
    },
)
builder.add_edge("researcher", "supervisor")
builder.add_edge("writer", "supervisor")
builder.add_edge("critic", "supervisor")

app = builder.compile()

if __name__ == "__main__":
    user_input = "创建一款广告招商的帖子"
    initial_state = {
        "messages": [{"role": "user", "content": user_input}],
        "next": "supervisor",
        "turn": 0,
    }
    for event in app.stream(initial_state):
        for value in event.values():
            if "messages" in value:
                msg = value["messages"][-1]
                name = msg.get("name", "assistant")
                print(f"[{name}] {msg['content']}")

Reference

https://datawhalechina.github.io/

https://www.philschmid.de/agentic-pattern

https://blog.dailydoseofds.com/p/5-agentic-ai-design-patterns

https://zhuanlan.zhihu.com/p/1972437682400519404

https://www.zhihu.com/people/yuan-chelsea

https://ywctech.net/ml-ai/langchain-langgraph-agent-part2