타임트리

[LangGraph] Branches for parallel node execution(병렬 노드 실행) 본문

LLM/LangGraph

[LangGraph] Branches for parallel node execution(병렬 노드 실행)

sean_j 2024. 12. 31. 04:43

병렬 노드 실행을 위한 브랜치 생성

앞서 포스팅까지는 LangGrpah에서 하나의 노드에서 다른 여러 노드로 조건부로 분기하는 conditional_edge까지 다루고 있다. conditional_edge는 여러 노드 중 하나만 선택한다. 그런데, 만약 여러 프로세스를 동시에 처리가 필요한 경우를 생각해보자. 예를 들어, 리포트를 생성하고자 할 때 여러 관점(우호적/중립적/부정적)에서 각 LLM이 특정 주제에 대해 조사하도록 시키는 작업을 만들고자 한다.

 

 
이때 주황 박스로 표시한 부분은 순차적으로 처리하기보다는 병렬 처리를 통해 전체 그래프 작업의 속도 향상이 가능하다.
다행히도 LangGraph는 Node의 병렬 실행을 기본적으로 지원한다. LangGraph에서 병렬 처리는 fan-out과 fan-in 메커니즘을 통해 이루어진다.

Fan-out과 fan-in

여기서 Fan-out은 병렬 처리를 위해 흩어지는 부분, Fan-in은 병렬 처리가 끝난 뒤 합쳐지는 구간을 말한다. 위 그림에서는 3개의 노드로 확장되는 부분인 fan-out, Generate_comprehensive_reports 노드로 합쳐지는 부분을 fan-in이라고 부른다.

우선 간단히 아래와 같은 그래프를 생성해보자. 아래 그래프는 A 노드에서 B, C, D 노드로 fan-out 한 다음, E 노드로 fan-in 한다.

 

상태 관점에서는 상태에 reducer add operation operator.add를 지정한다. 이렇게 하면 State의 특정 key에 대한 값이 덮어쓰는 것이 아니라 결합되거나 누적된다. list의 경우 새로운 list를 기존 liist에 concat한다. (add_messages와 유사)

 

 

import operator
from typing import Annotated, Any, TypedDict

from langgraph.graph import StateGraph, START, END

# 상태 정의
class State(TypedDict):
    # operator.add 리듀서 함수가 기존 list에 새로운 list를 concat하도록 함
    aggregate: Annotated[list, operator.add]


# 호출 시 node의 값(node_secret)을 "aggregate" 키에 리스트 값으로 반환하는 클래스
class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}


flow = StateGraph(State)

flow.add_node("a", ReturnNodeValue("Start research with this topic!"))
flow.add_node("b", ReturnNodeValue("negative perspective"))
flow.add_node("c", ReturnNodeValue("neutral perspective"))
flow.add_node("d", ReturnNodeValue("positive perspective"))
flow.add_node("e", ReturnNodeValue("I'm writing a report"))

flow.add_edge(START, "a")
# fan-out
flow.add_edge("a", "b")
flow.add_edge("a", "c")
flow.add_edge("a", "d")
# fan-in
flow.add_edge("b", "e")
flow.add_edge("c", "e")
flow.add_edge("d", "e")
flow.add_edge("e", END)

graph = flow.compile()

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

 

이제 그래프를 호출하면서 input 값으로 빈 리스트를 aggregate 키에 전달하면, 각 노드의 output이 누적되는 것을 확인할 수 있다!

  1. a 노드 실행
  2. b, c, d 노드 실행 (같은 시점에 실행)
  3. e 노드 실행
input = {"aggregate": []}
config = {"configurable": {"thread_id": "dummy"}}
graph.invoke(input, config)
Adding Start research with this topic! to [] 
Adding negative perspective to ['Start research with this topic!'] 
Adding neutral perspective to ['Start research with this topic!'] 
Adding positive perspective to ['Start research with this topic!'] 
Adding I'm writing a report to ['Start research with this topic!', 'negative perspective', 'neutral perspective', 'positive perspective']
{'aggregate': ['Start research with this topic!',
  'negative perspective',
  'neutral perspective',
  'positive perspective',
  "I'm writing a report"]}

LangGraph는 superstep 내에서 노드를 수행한다. 여기서 superstep이란, 여러 노드가 동시점에 실행되는 스텝이라고 보면 된다. 즉, 병렬 처리가 일어나는 스텝은 같은 superstep이 된다.
따라서, 위 B, C, D 노드 중 하나라도 에러가 발생한다면 해당 superstep 오류로 간주되어서 A 노드 이후 상태 업데이트가 일어나지 않는다!

 

fan-out 후 추가 단계가 있는 경우

 
앞서 만든 그래프는 fan-out 후 모두 한 단계의 경로만 가지고 있다. 그런데 만약 fan-out 후 특정 경로에서는 추가 단계가 있다면 해당 노드끼리 edge를 추가하면 된다.
 

 

또, fan-in 단계에서 보듯 list 형태로도 노드를 묶어 전달해 코드의 가독성을 높일 수 있다.

import operator
from typing import Annotated, Any, TypedDict

from langgraph.graph import StateGraph, START, END


# 상태 정의
class State(TypedDict):
    # operator.add 리듀서 함수가 기존 list에 새로운 list를 concat하도록 함
    aggregate: Annotated[list, operator.add]


# 호출 시 node의 값(node_secret)을 "aggregate" 키에 리스트 값으로 반환하는 클래스
class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}


flow = StateGraph(State)

flow.add_node("a", ReturnNodeValue("Start research with this topic!"))
flow.add_node("b", ReturnNodeValue("negative perspective"))
flow.add_node("c", ReturnNodeValue("neutral perspective"))
flow.add_node("d1", ReturnNodeValue("positive perspective1"))
flow.add_node("d2", ReturnNodeValue("positive perspective2"))
flow.add_node("e", ReturnNodeValue("I'm writing a report"))

flow.add_edge(START, "a")
# fan-out
flow.add_edge("a", "b")
flow.add_edge("a", "c")
flow.add_edge("a", "d1")
# additional steps
flow.add_edge("d1", "d2")
# fan-in
# flow.add_edge("b", "e")
# flow.add_edge("c", "e")
# flow.add_edge("d2", "e")
flow.add_edge(["b", "c", "d2"], "e")
flow.add_edge("e", END)

graph = flow.compile()

 
마찬가지로 그래프를 호출해보면, 각 노드의 output이 누적되는 것을 확인할 수 있다!

Adding Start research with this topic! to []
Adding negative perspective to ['Start research with this topic!']
Adding neutral perspective to ['Start research with this topic!']
Adding positive perspective1 to ['Start research with this topic!']
Adding positive perspective2 to ['Start research with this topic!', 'negative perspective', 'neutral perspective', 'positive perspective1']
Adding I'm writing a report to ['Start research with this topic!', 'negative perspective', 'neutral perspective', 'positive perspective1', 'positive perspective2']
{'aggregate': ['Start research with this topic!',
  'negative perspective',
  'neutral perspective',
  'positive perspective1',
  'positive perspective2',
  "I'm writing a report"]}

조건부 분기 (Conditional Branching)

fan-out이 결정되어있지 않은 경우, add_conditional_edges를 직접 사용할 수 있다. 즉, A 노드에서 B, C, D로 모두 가는 것이 아니라, 조건에 따라 BC 노드만 혹은 CD 노드로만 이동하도록 조건을 둘 수 있다.

 

 

이를 위해, add_conditional_edges 메서드를 사용할 때, path 함수의 반환값을 Node의 list로 반환하도록 작성하면 된다. 코드를 통해 확인해보자.

import operator
from typing import Annotated, Any, TypedDict

from langgraph.graph import StateGraph, START, END


# 상태 정의
class State(TypedDict):
    # operator.add 리듀서 함수가 기존 list에 새로운 list를 concat하도록 함
    aggregate: Annotated[list, operator.add]
    # 조건부 분기를 위함
    which: str


# 호출 시 node의 값(node_secret)을 "aggregate" 키에 리스트 값으로 반환하는 클래스
class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}


flow = StateGraph(State)

flow.add_node("a", ReturnNodeValue("Start research with this topic!"))
flow.add_node("b", ReturnNodeValue("negative perspective"))
flow.add_node("c", ReturnNodeValue("neutral perspective"))
flow.add_node("d", ReturnNodeValue("positive perspective"))
flow.add_node("e", ReturnNodeValue("I'm writing a report"))

def route_bc_or_cd(state: State):
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]

flow.add_edge(START, "a")
# fan-out
flow.add_conditional_edges(
    "a",
    route_bc_or_cd,
    ["b", "c", "d"]
)

# fan-in
flow.add_edge("b", "e")
flow.add_edge("c", "e")
flow.add_edge("d", "e")
flow.add_edge("e", END)

graph = flow.compile()

 

의도한대로 병렬 처리가 조건부로 수행되었다. which='bc' 인자로 D 노드는 수행되지 않고 B, C 노드만 수행되고 E 노드로 이동했다.

input = {"aggregate": [], "which": "bc"}
config = {"configurable": {"thread_id": "42"}}

graph.invoke(input, config)

 

Adding Start research with this topic! to []
Adding negative perspective to ['Start research with this topic!']
Adding neutral perspective to ['Start research with this topic!']
Adding I'm writing a report to ['Start research with this topic!', 'negative perspective', 'neutral perspective']
{'aggregate': ['Start research with this topic!',
  'negative perspective',
  'neutral perspective',
  "I'm writing a report"],
 'which': 'bc'}

Sorting

앞서 말했듯 fan-out 된 후의 결과는 단일의 superstep으로 처리한다. 즉, 이 단계의 상태 업데이트는 superstep 내 모든 단계가 완료되고 난 뒤 일어난다.

또한, superstep은 논리적으로는 병렬 처리를 수행하지만 실제 내부에서는 일종의 순서가 존재한다. 따라서, 각 fan-out된 노드에서의 처리 결과를 모을 때 순서가 생기게 된다. 앞에 예제에서도 A -> (B, C, D) -> E 순으로 aggregate에 합쳐졌다.

근데 만약 B, C, D 순서가 아니라, fan-in할 때 C, D, B 순서대로 결과를 정렬하고 싶다면 어떻게 해야할까?

 

만약 병렬 superstep에서 사전에 정의한 순서대로 결과물을 합치고 싶다면, State의 별도 Field에 (식별 키와 함께) 출력을 기록한 다음 마지막 노드에서 결합해야 한다.

 

즉, 병렬 처리되는 노드들의 결과를 모아두는 fanout_values 를 State에 추가하고, fan-in이 되는 노드에서 해당 fanout_values를 가중치에 해당하는 weight로 정렬하고 aggregate에 추가하는 로직이다.

import operator
from typing import Annotated, Any, TypedDict, Sequence

from langgraph.graph import StateGraph, START, END

# fan-in 후, fanout_values를 초기화하기 위함
def reduce_fanouts(left, right):
    if left is None:
        left = []
    if not right:
        return []    # overwrite
    return left + right


# 상태 정의
class State(TypedDict):
    # operator.add 리듀서 함수가 기존 list에 새로운 list를 concat하도록 함
    aggregate: Annotated[list, operator.add]
    # 병렬 처리 노드의 결과를 모아두는 필드드
    fanout_values: Annotated[list, reduce_fanouts]
    which: str


# 호출 시 node의 값(node_secret)을 "aggregate" 키에 리스트 값으로 반환하는 클래스
class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}

# 병렬 처리 노드 별도 정의
class ParallelReturnNodeValue:
    def __init__(self, node_secret: str, weight: float):
        self._value = node_secret
        self._weight = weight

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']} in parallel.")
        return {
            "fanout_values": [
                {
                    "value": self._value,
                    "weight": self._weight
                }
            ]
        }

# fan-in 후의 sink node 정의 (기존의 e 노드)
def aggregate_fanout_values(state: State) -> Any:
    # 가중치로 정렬
    ranked_values = sorted(
        state["fanout_values"], key=lambda x: x["weight"], reverse=True
    )
    return {
        "aggregate": [x["value"] for x in ranked_values] + ["I'm writing a report"],
        "fanout_values": []
    }


flow = StateGraph(State)

flow.add_node("a", ReturnNodeValue("Start research with this topic!"))
flow.add_node("b", ParallelReturnNodeValue("negative perspective", weight=0.1))
flow.add_node("c", ParallelReturnNodeValue("neutral perspective", weight=0.9))
flow.add_node("d", ParallelReturnNodeValue("positive perspective", weight=0.5))
flow.add_node("e", aggregate_fanout_values)

def route_bc_or_cd(state: State):
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]

flow.add_edge(START, "a")
# fan-out
flow.add_conditional_edges(
    "a",
    route_bc_or_cd,
    ["b", "c", "d"]
)

# fan-in
flow.add_edge("b", "e")
flow.add_edge("c", "e")
flow.add_edge("d", "e")
flow.add_edge("e", END)

graph = flow.compile()

 

의도한대로 병렬 처리가 조건부로 수행되면서, 이전과는 달리 weight를 더 높게 준 C 노드의 결과가 먼저 aggregate에 누적되는 것을 확인 할 수 있다.

Adding Start research with this topic! to []
Adding negative perspective to ['Start research with this topic!'] in parallel.
Adding neutral perspective to ['Start research with this topic!'] in parallel.
{'aggregate': ['Start research with this topic!',
  'neutral perspective',
  'negative perspective',
  "I'm writing a report"],
 'fanout_values': [],
 'which': 'bc'}

 
 
---
출처:
LangGraph. "How to create branches for parallel node execution". https://langchain-ai.github.io/langgraph/how-tos/branching/
위키독스 - <랭체인LangChain 노트> - LangChain 한국어 튜토리얼🇰🇷  (https://wikidocs.net/book/14314)

'LLM > LangGraph' 카테고리의 다른 글

[LangGraph] - Subgraph(서브그래프) 1  (0) 2025.01.01
[LangGraph] 과거 대화 이력의 요약  (0) 2025.01.01
[LangGraph] ToolNode  (0) 2024.12.31
[LangGraph] Delete Messages  (0) 2024.12.31
[LangGraph] Human Node (LLM이 판단)  (0) 2024.12.30