타임트리

[LangGraph] Subgraph State(상태) 본문

LLM/LangGraph

[LangGraph] Subgraph State(상태)

sean_j 2025. 1. 2. 01:33

subgraph를 추가할 때, 메모리를 추가하면 언제든지 subgraph의 state를 확인하고 업데이트할 수 있다. 이를 활용하면 human-in-the-loop 패턴도 다양하게 사용 가능하다.

  • 그래프를 중단하고 사용자에게 state를 보여준 뒤, 사용자가 작업 수행 여부 결정
  • subgraph를 rewind 해서 문제 재현 또는 방지
  • 사용자가 subgraph 작업 전 state 수정을 통해 제어

1. subgraph 정의

먼저 특정 도시의 날씨를 가져올 수 있는 간단한 subgraph를 정의하자. 그리고, weather_node 전에 그래프를 중단하도록 compile하자.

with_structured_output 메서드는 LLM의 출력을 사전 정의된 pydantic 모델 타입으로 변환
(pydantic이 아닌) 함수/tool을 인자로 넣은 경우, LLM에 tool을 binding 하고 dict type으로 input을 반환되며 보장되지는 않음

 

import random
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool


@tool
def get_weather(city: str):
    """Get the weather for a spectific city"""
    weather_list = ["맑음", "흐림", "비", "눈", "태풍", "안개"]
    return f"{city}의 날씨는 현재 {random.choice(weather_list)}입니다."


llm = ChatOpenAI(model="gpt-4o-mini")
llm_with_structred_output = llm.with_structured_output(get_weather)


# 상태 정의
class SubGraphState(TypedDict):
    messages: Annotated[list, add_messages]
    city: str


def model(state: SubGraphState) -> SubGraphState:
    response = llm_with_structred_output.invoke(state["messages"])
    return {"city": response["city"]}


def weather_node(state: SubGraphState) -> SubGraphState:
    response = get_weather.invoke({"city": state["city"]})
    return {"messages": [("assistant", response)]}


subgraph_flow = StateGraph(SubGraphState)
subgraph_flow.add_node("model", model)
subgraph_flow.add_node("weather_node", weather_node)
subgraph_flow.add_edge(START, "model")
subgraph_flow.add_edge("model", "weather_node")
subgraph_flow.add_edge("weather_node", END)

subgraph = subgraph_flow.compile(interrupt_before=["weather_node"])

2. parent graph 정의

이제 전체 그래프를 구축하자. 이 그래프는 날씨를 가져와야 하는 경우에는 subgraph로 라우팅 되고, 그렇지 않으면 그냥 chatbot 노드로 라우팅 된다.

from typing import Annotated, Literal, TypedDict
from langchain_core.messages import SystemMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph.message import add_messages


# 상태 정의
class RouterState(TypedDict):
    messages: Annotated[list, add_messages]
    route: Literal["weather", "other"]


# router model의 응답 정의
class Router(TypedDict):
    route: Literal["weather", "other"]


# 노드 정의
router_model = llm.with_structured_output(Router)


def router_node(state: RouterState) -> RouterState:
    messages = [
        SystemMessage(content="아래 내용에 대해 weather인지 other인지 분류해주세요.")
    ] + state["messages"]
    response = router_model.invoke(messages)
    return {"route": response["route"]}


def chatbot(state: RouterState) -> RouterState:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}


def route_after_pred(state: RouterState) -> Literal["chatbot", "weather_graph"]:
    if state["route"] == "weather":
        return "weather_graph"
    return "chatbot"


flow = StateGraph(RouterState)
flow.add_node("router", router_node)
flow.add_node("chatbot", chatbot)
flow.add_node("weather_graph", subgraph)
flow.add_edge(START, "router")
flow.add_conditional_edges(
    "router",
    route_after_pred,
    # {"chatbot": "chatbot",
    #  "weather_graph": "weather_graph"}
)
flow.add_edge("weather_graph", END)
flow.add_edge("chatbot", END)

memory = MemorySaver()
graph = flow.compile(checkpointer=memory)

from IPython.display import display, Image

display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

 

의도한 대로 작동하는지 일반 쿼리로 테스트해 보자.

 

날씨에 대해 물어보지 않았기 때문에, router 노드에서 route key에 other를 업데이트했고, chatbot 노드로 이동해 정상적으로 응답을 받았다.

input = {"messages": [("user", "안녕하세요!")]}
config = {"configurable": {"thread_id": "1"}}
events = graph.stream(input, config, stream_mode="updates")

for event in events:
    print(event)
{'router': {'route': 'other'}}
{'chatbot': {'messages': [AIMessage(content='안녕하세요! 어떻게 도와드릴까요?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 11, 'prompt_tokens': 10, 'total_tokens': 21, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_d02d531b47', 'finish_reason': 'stop', 'logprobs': None}, id='run-f457e0c4-6cf7-463c-9cb1-7bc759ec763d-0', usage_metadata={'input_tokens': 10, 'output_tokens': 11, 'total_tokens': 21, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}}

3. 중단된 위치부터 재개

stream 메서드 호출 시 subgraphs=True 인자로 subgraph 내부의 상태값 접근 가능하다.
구체적으로 반환값의 tasks로 subgroup이 걸려있다!

 

이제 중단점을 살펴보자. subgraph weather_graph를 compile 할 때, interrupt_before=["weather_node"]를 선언했기 때문에 weather_graph로 라우팅 하는 쿼리를 사용해 보자.

config = {"configurable": {"thread_id": "12"}}
input = {"messages": [("user", "서울의 날씨는 지금 어떤가요?")]}
events = graph.stream(input, config, stream_mode="updates")

for event in events:
    print(event)
{'router': {'route': 'weather'}}
{'__interrupt__': ()}

 

위 결과를 보면 weatherroute key의 값으로 들어가고, weather_graph 경로로 들어가 weather_node 전에 중단이 발생했다. 그런데, 단순히 graph.stream() 메서드로 호출했을 때는 subgraph 내부의 과정이 보이지 않는다(subgraph까지 streaming 되지 않음!).

 

subgraph도 streaming 하기 위해서는 인자로 subgraphs=True를 넘겨줘야 한다.

config = {"configurable": {"thread_id": "19"}}
input = {"messages": [("user", "서울의 날씨는 지금 어떤가요?")]}
events = graph.stream(input, config, stream_mode="updates", subgraphs=True)

for event in events:
    print(event)
((), {'router': {'route': 'weather'}})
(('weather_graph:862ec5db-cc9d-0a02-ee1e-faa3b060cf2e',), {'model': {'city': '서울'}})
((), {'__interrupt__': ()})

 

지금 graph의 state를 snapshot으로 가져와 확인해 보면, weather_graph에서 일시 중지되었음을 알 수 있다.

snapshot = graph.get_state(config)
print(snapshot.values)
print(snapshot.next)
{'messages': [HumanMessage(content='서울의 날씨는 지금 어떤가요?', additional_kwargs={}, response_metadata={}, id='c1c59f18-7f5d-48c5-9855-f9610f3a210e'), HumanMessage(content='서울의 날씨는 지금 어떤가요?', additional_kwargs={}, response_metadata={}, id='3a72f0f7-e660-44ba-b4fd-e429815ef5f9')], 'route': 'weather'}
('weather_graph',)

 

현재 State에서 pending중인 task를 살펴보기 위해서 .task 메서드를 사용할 수 있다. 살펴보면, subgraph task에 해당하는 weather_graph 하나가 남아 있다는 것을 확인할 수 있다.

print(snapshot.tasks)
(PregelTask(id='c9443402-3fb2-c628-2391-2c131b0da54f', name='weather_graph', path=('__pregel_pull', 'weather_graph'), error=None, interrupts=(), state={'configurable': {'thread_id': '12', 'checkpoint_ns': 'weather_graph:c9443402-3fb2-c628-2391-2c131b0da54f'}}, result=None),)

 

그런데, parent graph의 config로 state를 얻은 것이기 때문에 subgraph의 state에 접근할 수 없다(또한 앞선 parent state에는 city key가 존재하지 않는다).

Subgraph의 state에 접근하기 위해서는 subgraphs=True 인자를 get_state 메서드를 사용할 때 넘겨주어야 한다!

  • weather_graph의 state에는 messagescity key가 있다.
  • model 노드에 city key를 업데이트한다.
  • 아래 결과를 보면, city key가 보이고 subgraph의 state에 접근 가능한 것을 확인 가능!

PregelTask의 state 값을 보면 다음 노드(weather_node) 및 현재 state 값(예: city)과 같이 필요한 모든 정보가 포함되어 있다.

state = graph.get_state(config, subgraphs=True)

print(state.tasks[0])
PregelTask(id='c9443402-3fb2-c628-2391-2c131b0da54f', name='weather_graph', path=('__pregel_pull', 'weather_graph'), error=None, interrupts=(), state=StateSnapshot(values={'messages': [HumanMessage(content='서울의 날씨는 지금 어떤가요?', additional_kwargs={}, response_metadata={}, id='c1c59f18-7f5d-48c5-9855-f9610f3a210e'), HumanMessage(content='서울의 날씨는 지금 어떤가요?', additional_kwargs={}, response_metadata={}, id='3a72f0f7-e660-44ba-b4fd-e429815ef5f9')], 'city': '서울'}, next=('weather_node',), config={'configurable': {'thread_id': '12', 'checkpoint_ns': 'weather_graph:c9443402-3fb2-c628-2391-2c131b0da54f', 'checkpoint_id': '1efc84eb-69a0-636d-8001-a0e290d4222c', 'checkpoint_map': {'': '1efc84eb-62c6-6dd2-8004-ec2cc9bea8d4', 'weather_graph:c9443402-3fb2-c628-2391-2c131b0da54f': '1efc84eb-69a0-636d-8001-a0e290d4222c'}}}, metadata={'source': 'loop', 'writes': {'model': {'city': '서울'}}, 'thread_id': '12', 'langgraph_step': 5, 'langgraph_node': 'weather_graph', 'langgraph_triggers': ['branch:router:route_after_pred:weather_graph'], 'langgraph_path': ['__pregel_pull', 'weather_graph'], 'langgraph_checkpoint_ns': 'weather_graph:c9443402-3fb2-c628-2391-2c131b0da54f', 'checkpoint_ns': 'weather_graph:c9443402-3fb2-c628-2391-2c131b0da54f', 'step': 1, 'parents': {'': '1efc84eb-62c6-6dd2-8004-ec2cc9bea8d4'}}, created_at='2025-01-01T14:43:06.831563+00:00', parent_config={'configurable': {'thread_id': '12', 'checkpoint_ns': 'weather_graph:c9443402-3fb2-c628-2391-2c131b0da54f', 'checkpoint_id': '1efc84eb-62cb-6bd5-8000-c78d563c032e', 'checkpoint_map': {'': '1efc84eb-62c6-6dd2-8004-ec2cc9bea8d4', 'weather_graph:c9443402-3fb2-c628-2391-2c131b0da54f': '1efc84eb-62cb-6bd5-8000-c78d563c032e'}}}, tasks=(PregelTask(id='be5a389f-9c5a-59fa-5813-d69188270e70', name='weather_node', path=('__pregel_pull', 'weather_node'), error=None, interrupts=(), state=None, result=None),)), result=None)

 

중단된 위치에서 다시 graph를 재개하려면 기존과 똑같이 input=None을 주고 실행하면 된다.

for event in graph.stream(None, config, stream_mode="values", subgraphs=True):
    print(event)

4. 특정 subgraph node에서 리플레이

위에서는 parent graph에서 재생을 시작했다. 이 경우 subgraph는 weather_node 이전에 멈추고, 이전 state에 상관없이 자동으로 리플레이했다.

 

이와 반대로, subgraph 내부에서 리플레이를 하는 것도 가능하다. 이를 위해 리플레이하려는 정확한 subgraph의 config를 가져와야 한다.

 

subgraph에서 특정 노드(model 노드 등) 이전 state로 돌아가려면 subgraph 상태 기록(state history) 을 탐색해야 한다.

4.1 Subgraph의 state history에서 필요한 state 찾기

Subgraph의 state history를 탐색하여, 특정 노드 이전 state를 추출하는 과정은 다음과 같다.

  1. Subgraph의 state history 가져오기
    특정 subgraph state로 이동하려면 해당 graph의 state history에서 .next 파라미터를 기준으로 필터링
# subgraph에 들어가기 직전의 snapshot을 .next 파라미터로 필터링
parent_graph_state_before_subgraph = next(
    h for h in graph.get_state_history(config) if h.next == ("weather_graph",)
)

# model 노드 이전 snapshot 필터링 (tasks로 subgraph config 접근)
subgraph_state_before_model_node = next(
    h
    for h in graph.get_state_history(parent_graph_state_before_subgraph.tasks[0].state)
    if h.next == ("model",)
)

 

올바른 상태를 선택했는지 .next 값을 출력하여 검증해보자.

subgraph_state_before_model_node.next
('model',)

 

결과를 보면 subgraph의 model 노드에 가기 직전 snapshot을 올바르게 가져왔다. 이제 subgraph 내부의 model노드부터 다시 리플레이해 보자.

for event in graph.stream(None, config=subgraph_state_before_model_node.config, stream_mode="values", subgraphs=True):
    print(event)
((), {'messages': [HumanMessage(content='서울의 날씨는 지금 어떤가요?', additional_kwargs={}, response_metadata={}, id='c1c59f18-7f5d-48c5-9855-f9610f3a210e'), HumanMessage(content='서울의 날씨는 지금 어떤가요?', additional_kwargs={}, response_metadata={}, id='3a72f0f7-e660-44ba-b4fd-e429815ef5f9')], 'route': 'weather'})
(('weather_graph:c9443402-3fb2-c628-2391-2c131b0da54f',), {'messages': [HumanMessage(content='서울의 날씨는 지금 어떤가요?', additional_kwargs={}, response_metadata={}, id='c1c59f18-7f5d-48c5-9855-f9610f3a210e'), HumanMessage(content='서울의 날씨는 지금 어떤가요?', additional_kwargs={}, response_metadata={}, id='3a72f0f7-e660-44ba-b4fd-e429815ef5f9')]})
(('weather_graph:c9443402-3fb2-c628-2391-2c131b0da54f',), {'messages': [HumanMessage(content='서울의 날씨는 지금 어떤가요?', additional_kwargs={}, response_metadata={}, id='c1c59f18-7f5d-48c5-9855-f9610f3a210e'), HumanMessage(content='서울의 날씨는 지금 어떤가요?', additional_kwargs={}, response_metadata={}, id='3a72f0f7-e660-44ba-b4fd-e429815ef5f9')], 'city': '서울'})

5. Subgraph state 업데이트

5.1 subgraph의 state 업데이트

subgraph의 state를 업데이트하려면, 일반 graph의 state를 업데이트하는 방법과 유사하게 가능하다. 즉, config만 잘 가져와서 .update_state 메서드를 사용하면 된다.

# 새로운 thread 시작
config = {"configurable": {"thread_id": "123"}}
input = {"messages": [("user", "부산의 날씨는 지금 어때요?")]}
for update in graph.stream(input, config, stream_mode="updates"):
    print(update)
{'router': {'route': 'weather'}}
{'__interrupt__': ()}
state = graph.get_state(config, subgraphs=True)
print(state.values)
{'messages': [HumanMessage(content='부산의 날씨는 지금 어때요?', additional_kwargs={}, response_metadata={}, id='8f2b5444-280b-4acf-b6a9-18293280752c')], 'route': 'weather'}

 

subgraph의 state를 업데이트하려면 subgraph에 대한 config를 전달해야 하고, 앞서 살펴본 것처럼 state.tasks[0].state를 사용해 얻을 수 있다. subgraph 내부에서 중단되었기 때문에 task의 상태는 subgraph의 state다.

graph.update_state(state.tasks[0].state, {"city": "울산"})
{'configurable': {'thread_id': '123',
  'checkpoint_ns': 'weather_graph:288abb0a-6790-92fd-9ad2-fda23b6ce94c',
  'checkpoint_id': '1efc858b-3d2c-6e3a-8002-b61c58880090',
  'checkpoint_map': {'': '1efc8582-892c-6a71-8001-3b45a3acee7d',
   'weather_graph:288abb0a-6790-92fd-9ad2-fda23b6ce94c': '1efc858b-3d2c-6e3a-8002-b61c58880090'}}}

 

이제 parent graph의 스트리밍을 재개하자(그래도 subgraph가 재개된다). "부산" 대신 "울산"을 city로 잘 사용했는지 확인해 보자.

for update in graph.stream(None, config, stream_mode="updates", subgraphs=True):
    print(update)
(('weather_graph:288abb0a-6790-92fd-9ad2-fda23b6ce94c',), {'weather_node': {'messages': [('assistant', '울산의 날씨는 현재 눈입니다.')]}})
((), {'weather_graph': {'messages': [HumanMessage(content='부산의 날씨는 지금 어때요?', additional_kwargs={}, response_metadata={}, id='8f2b5444-280b-4acf-b6a9-18293280752c'), AIMessage(content='울산의 날씨는 현재 눈입니다.', additional_kwargs={}, response_metadata={}, id='53b7c70c-8903-4dc3-a258-95ffdbd5c700')]}})

5.2 subgraph node 역할 대체

이번에는 subgraph의 node인 것처럼 update_state를 사용해 보자.

즉, as_node="weather_node"옵션으로 model의 output을 변경하지 말고 weather_node의 output을 직접 만들어보자.

config = {"configurable": {"thread_id": "1212"}}
input = {"messages": [("user", "인천의 날씨는 어때요?")]}
for update in graph.stream(input, config, stream_mode="updates", subgraphs=True):
    print(update)

print("=============== 중단! ===============")

state = graph.get_state(config, subgraphs=True)

#  weather_node에서 반환할 메시지를 전달하여 state 업데이트
graph.update_state(
    state.tasks[0].state.config,
    {"messages": [("assistant", "인천은 날씨가 매우 화창합니다🔥")]},
    as_node="weather_node"
)

for update in graph.stream(None, config, stream_mode="updates", subgraphs=True):
    print(update)
((), {'router': {'route': 'weather'}})
(('weather_graph:55b4eb97-8e84-a65b-0304-707bc332fc47',), {'model': {'city': '인천'}})
((), {'__interrupt__': ()})
=============== 중단! ===============
((), {'weather_graph': {'messages': [HumanMessage(content='인천의 날씨는 어때요?', additional_kwargs={}, response_metadata={}, id='43279a22-2f22-46cc-b13a-d81a794a0cef'), AIMessage(content='인천은 날씨가 매우 화창합니다🔥', additional_kwargs={}, response_metadata={}, id='978803e4-48fd-4b13-b526-b92fe5e3d067')]}})

5.3 subgraph 전체 대체

subgraph의 state를 수정하는 예제의 마지막으로, subgraph 전체를 마치 동작한 것처럼 state 업데이트가 가능하다. 위에서 살펴본 subgraph node 역할 대체와 유사하다. 대신 update_state 메서드에 인자로 parent graph의 config를 전달한다.

config = {"configurable": {"thread_id": "121212"}}
input = {"messages": [("user", "인천의 날씨는 어때요?")]}
for update in graph.stream(input, config, stream_mode="updates", subgraphs=True):
    print(update)

print("=============== 중단! ===============")

#  weather_graph에서 반환할 메시지를 전달하여 state 업데이트
graph.update_state(
    config,
    {"messages": [("assistant", "인천은 날씨가 매우 화창합니다🔥")]},
    as_node="weather_graph"    # as_node로 subgraph
)

for update in graph.stream(None, config, stream_mode="updates"):
    print(update)
((), {'router': {'route': 'weather'}}) (('weather_graph:61758531-f1f8-aaa9-5347-5455ce8f0713',), {'model': {'city': '인천'}}) ((), {'__interrupt__': ()}) =============== 중단! ===============
print(graph.get_state(config).values["messages"])
[HumanMessage(content='인천의 날씨는 어때요?', additional_kwargs={}, response_metadata={}, id='be2b9fb8-15f4-44ff-abbc-f8c65370397a'), AIMessage(content='인천은 날씨가 매우 화창합니다🔥', additional_kwargs={}, response_metadata={}, id='c2947cb6-e675-4c71-8c5d-c936da6e02a0'), HumanMessage(content='인천의 날씨는 어때요?', additional_kwargs={}, response_metadata={}, id='d92daf81-6d58-4183-8167-4266c7380ccc'), AIMessage(content='인천은 날씨가 매우 화창합니다🔥', additional_kwargs={}, response_metadata={}, id='0bfe673f-4a21-48ab-a0a6-0698a711161e')]

 

 

---

출처

LangGraph. "How to view and update state in subgraphs". https://langchain-ai.github.io/langgraph/how-tos/subgraphs-manage-state/