일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- toolnode
- summarize_chat_history
- tool_call_chunks
- langgrpah
- update_state
- 추천시스템
- agenticrag
- Ai
- RecSys
- add_subgraph
- tool_calls
- humannode
- human-in-the-loop
- removemessage
- 밑바닥부터 시작하는 딥러닝
- 강화학습의 수학적 기초와 알고리듬 이해
- conditional_edges
- Python
- langgraph
- LangChain
- adaptive_rag
- rl
- 강화학습의 수학적 기초와 알고리듬의 이해
- 밑바닥부터시작하는딥러닝 #딥러닝 #머신러닝 #신경망
- subgraph
- pinecone
- REACT
- 강화학습
- chat_history
- rag
- Today
- Total
타임트리
[LangGraph] Agentic RAG 본문
Agentic RAG
일반적인 RAG (흔히 말하는 Naive RAG 혹은 Advanced RAG)의 경우, 개발자가 전체적인 흐름을 정의하게 된다. 그리고 해당 흐름은 고정되어 있다.
아래와 같은 그래프의 경우 에이전트를 사용하지 않은 RAG의 흐름이다. 아래 흐름에서는 우선 문서를 검색해서 가져온다. 그리고 문서의 연관성을 판단한 뒤, 만약 질문과 연관있는 문서가 retrieve되지 않았다면, 쿼리를 재작성하고 새로 작성된 쿼리로 문서를 가져온다.
다시 생각해보면, 위 흐름은 고정되어 있기 때문에 질문이 들어오면 해당 질문을 토대로 무조건 문서를 가져오게 된다. 즉, 벡터스토어로부터 문서를 검색하는 선택지 이외의 web search가 필요한지 여부 혹은 바로 LLM의 답변이 필요한지 여부는 고려하지 않는다. 이럴 경우, 불필요한 과정을 거치게 되므로 시간 그리고 임베딩 비용 등 불필요한 비용이 발생한다.
이런 경우, 좀 더 유연하고 효율적인 해결책은 두 가지로 생각해볼 수 있다.
- Agent를 사용하지 않고, 라우터를 구현
- Agent를 사용해서 LLM이 도구 사용 여부를 결정하도록 구현
이번에는 2번에 해당하는 Agentic RAG를 랭그래프로 구현해보자! (다음 글에서는 query의 복잡도를 판별해 task를 라우팅하는 Adaptive RAG를 다룬다)
즉, Agent가 스스로 생각하고 도구를 사용할지 여부를 결정하도록 해보자. 위 그림을 다시 보면서 생각해보면, Retrieve tool
을 갖고 있는 검색 에이전트를 구현하고, 이 에이전트가 도구 호출이 필요하다면 해당 tool로 문서를 검색하고 relevance check를 통해 연관성을 살펴보는 구조를 생각해볼 수 있다.
먼저 우리가 그리고자하는 flow는 아래와 같이 도식화 할 수 있다.
- Retrieve tool을 가진 LLM Agent가 사용자의 query를 바탕으로 tool 사용 여부 결정
tool_calls
반환 여부로 분기tool_calls
를 반환하지 않는다면, 바로 답변 반환 (END node)
tool_calls
를 반환한다면 문서 검색 수행 (Tool Node)- 검색한 문서와 사용자 query와의 관련성 검사 수행 (relevance check)
- 관련성이 있다면, 답변 생성 후 종료 (Generate node -> END node)
- 관련성이 없다면, 사용자 쿼리 재작성 후 1번으로 이동 (Agent node)
단, 주의해야할 점은 이번에는 이 flow로 구현하겠지만, 이 flow는 무한 루프에 빠질 가능성이 존재한다. 만약 LLM이 바로 답변할 수 없고 검색이 필요하지만, vector store에 해당 정보가 없는 경우 Agent - Tool Node - Rewrite 가 무한으로 순회하게 된다. 따라서, 이러한 무한 루프를 방지하기 위해서는 graph를 호출할 때 config에 recursion_limit
인자로 최대 방문 노드를 제한하거나, Web 검색 tool 등을 추가로 LLM에게 바인딩하거나 하는 처리가 필요하다.
1. retriever tool
여기서는 단순한 구조의 retriever를 사용하자. WebBaseLoader로 내 블로그의 langgraph 관련 글 2개를 가져올 예정이다. 2개 글은 langgrpah에 멀티턴을 사용한 예제와 병렬 노드 처리에 대한 글이다.
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
urls = [
"https://sean-j.tistory.com/entry/LangGraph-Add-summary-of-the-conversation-history", # 멀티턴 글
"https://sean-j.tistory.com/entry/LangGraph-Branches-for-parallel-node-execution" # 병렬 노드 글
]
# load documents
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
# split documents
text_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50)
doc_splits = text_splitter.split_documents(docs_list)
# add to vector db
vectorstore = Chroma.from_documents(
doc_splits,
OpenAIEmbeddings(),
collection_name="langgraph_tistory",
)
# init retriever
retriever = vectorstore.as_retriever()
그리고 LLM에게 쥐어줄 도구로 retriever tool을 만든다. decription을 통해 이 도구는 langgraph 관련 블로그 글 검색이 필요할 때 사용할 수 있다고 알려주자.
from typing import List
from langchain.schema import Document
from langchain_core.tools import tool
def format_docs(docs: List[Document]) -> str:
return "\n".join(
[
f"<document><content>{doc.page_content}</content><source>{doc.metadata['source']}</source></document>"
for doc in docs
]
)
@tool
def retrieve_from_blog(query: str) -> str:
"""
Search and return blogs on langgraph multiturn and parallel node execution. Run this function if you need to search for langgraph.
"""
docs = retriever.invoke(query)
formatted_docs = format_docs(docs)
return formatted_docs
tools = [retrieve_from_blog]
2. 상태 정의
그래프를 정의하기 전에, 노드 간 공유할 State를 정의하자. 여기서는 message list만을 상태로 가져간다.
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], add_messages]
3. Node, Edge 정의
먼저 Node를 정의하자. 위 그림대로라면 구현이 필요한 node는 총 4개가 있다.
- Agent Node: 바인딩된 tool을 가진 노드
- Retriever Node (Tool Node): LLM의 tool_calls를 받아, tool을 실행하는 노드
- Rewrite Node: 사용자 query를 재작성하는 노드
- Generate Node: 검색된 문서 context와 사용자 query를 입력받아 답변을 생성하는 노드
차례대로 하나씩 구현해보자.
from langchain import hub
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
# Agent Node: binding tool node
def agent(state: AgentState):
"""
현재 State를 기반으로 agent 호출. 도구 사용이 필요한지 결정
Args:
state: The current state
Returns:
dict: the updated state with the agent response appended to messages
"""
print("=== [CALL AGENT] ===")
model = ChatOpenAI(model="gpt-4o")
model_with_tools = model.bind_tools(tools)
messages = state["messages"]
response = model_with_tools.invoke(messages)
return {"messages": [response]}
# Retreive Node: tool을 실행하는 노드
retrieve = ToolNode(tools=tools)
# Rewrite Node: 사용자 query를 재작성하는 노드
def rewrite(state: AgentState):
print("=== [REWRITE QUERY] ===")
messages = state["messages"]
question = messages[0].content # 가장 처음 사용자 입력
msg = [
HumanMessage(
f""" \n
주어진 initial question을 보고 사용자의 의도 혹은 의미에 대해 추론해 보세요. 그리고, 더 나은 question으로 변환하고 변환한 question만 출력해주세요.
이때 변화한 question을 위해 영어로도 변환해보세요.
\n ------- \n
{question}
\n ------- \n
변경한 question: """
)
]
model = ChatOpenAI(model="gpt-4o")
response = model.invoke(msg)
return {"messages": [response]}
# Generate Node: 주어진 context와 사용자 query를 입력 받아 답변을 생성하는 노드
def generate(state: AgentState):
print("=== [GENERATE] ===")
messages = state["messages"]
question = messages[0].content
docs = messages[-1].content
template = """You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.
When answering, be sure to credit which retrieved context you used. Answer in Korean.
Question: {question}
Context: {context}
Answer:"""
prompt = ChatPromptTemplate([("human", template)])
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
chain = prompt | llm | StrOutputParser()
response = chain.invoke({"question": question, "context": docs})
return {"messages": [response]}
이번에는 conditional edge에 필요한 함수를 정의하자.
tools_condition
: tool 사용이 필요한지 여부 판단grade_documents
: user query와 retrieved documents간 연관성 확인
tools_condition은 state의 messages 내 가장 최근 message를 가져와 tool_calls가 존재하는지 여부로 분기하는 함수로 langgraph에 사전 정의된 tools_condition
함수를 사용하자. 따라서 아래에서는 grade_document
만 구현하면 된다!
이때, LLM이 판단하도록 하기 위해 pydantic의 BaseModel과 Field를 사용해 llm의 output을 제한하자.
from langchain_core.prompts import PromptTemplate
from langgraph.prebuilt import tools_condition
from pydantic import BaseModel, Field
class grade(BaseModel):
"""Binary score for relevance check."""
binary_score: str = Field(description="Relevance score 'yes' or 'no'")
def relevance_check(state: AgentState):
"""
retrieved documents가 주어진 question과 연관성 있는지 결정하는 함수
Args:
state: The current state
Returns:
str: A decision for whether the documents are relevant or not
"""
print("=== [RELEVANCE CHECK] ===")
model = ChatOpenAI(model="gpt-4o", temperature=0)
llm_with_structured_output = model.with_structured_output(grade)
prompt = PromptTemplate(
template="""You are a grader assessing relevance of a retrieved document to a user question. \n
Here is the retrieved document: \n\n {context} \n\n
Here is the user question: {question} \n
If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.""",
input_variables=["context", "question"],
)
chain = prompt | llm_with_structured_output
# get context and question
messages = state["messages"]
question = messages[0].content
docs = messages[-1].content
score_result = chain.invoke({"question": question, "context": docs})
score = score_result.binary_score
if score == "yes":
print("=== [DECISION: RELEVANT] ===")
return "generate"
else:
print("=== [DECISION: NOT RELEVANT] ===")
return "rewrite"
4. Graph 정의
이제 모든 Node와 condition edge를 위한 분기 함수가 준비되었다. 의도한대로 잘 조립해서 그래프를 만들자.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
flow = StateGraph(AgentState)
# Node 추가
flow.add_node("agent", agent)
flow.add_node("retrieve", retrieve)
flow.add_node("rewrite", rewrite)
flow.add_node("generate", generate)
# Edge 연결
flow.add_edge(START, "agent")
flow.add_conditional_edges(
"agent",
tools_condition,
{"tools": "retrieve", END: END}
)
flow.add_conditional_edges(
"retrieve",
relevance_check,
{"generate": "generate", "rewrite": "rewrite"}
)
flow.add_edge("rewrite", "agent")
flow.add_edge("generate", END)
memory = MemorySaver()
graph = flow.compile(checkpointer=memory)
컴파일한 그래프를 시각화하면 깔끔하게 나오진 않지만, 의도한 흐름대로 그래프가 구성되었다!
의도한대로 잘 작동하는지 그래프를 실행해보자.
사용자 입력이 들어왔을 때 이 그래프는 크게 3가지 루트로 END에 도달할 수 있다.
- retrieve tool이 필요하지 않고, LLM의 사전 지식으로 대답할 수 있는 경우
- agent → END
- retrieve tool이 필요하고, 정보를 검색해서 대답하는 경우
- agent → retrieve → generate → END
- retrieve tool이 필요하고, 정보를 검색했으나 사용자 쿼리와 관련된 문서가 검색되지 않아 사용자 쿼리를 개선하는 경우
- agent → retrieve → rewrite → agent → retrieve → ... → generate
먼저 graph를 스트림하고, 각 노드에서 업데이트된 key, value 쌍을 출력하는 함수를 정의하자.
import pprint
def stream_graph(inputs, config, exclude_node=[]):
for output in graph.stream(inputs, config, stream_mode="updates"):
for k, v in output.items():
if k not in exclude_node:
pprint.pprint(f"Output from node '{k}':")
pprint.pprint("---")
pprint.pprint(v, indent=2, width=80, depth=None)
pprint.pprint("\n---\n")
# Case 1.
config = {"configurable": {"thread_id": "1"}}
inputs = {"messages": [("user", "안녕하세요, 제 이름은 Sean입니다!")]}
stream_graph(inputs, config)
=== [CALL AGENT] ===
"Output from node 'agent':"
'---'
{ 'messages': [ AIMessage(content='안녕하세요, Sean! 만나서 반갑습니다. 어떻게 도와드릴까요?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 20, 'prompt_tokens': 67, 'total_tokens': 87, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_50cad350e4', 'finish_reason': 'stop', 'logprobs': None}, id='run-4b166e9c-5292-4044-bc6f-8efb8891dbe2-0', usage_metadata={'input_tokens': 67, 'output_tokens': 20, 'total_tokens': 87, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}
'\n---\n'
# Case 2.
config = {"configurable": {"thread_id": "2"}}
inputs = {"messages": [("user", "Langgraph에서 fan-out과 fan-in이 뭐에요?")]}
stream_graph(inputs, config, exclude_node=["retrieve"])
=== [CALL AGENT] ===
"Output from node 'agent':"
'---'
{ 'messages': [ AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_SwmeoO6GSW6s2wD7h6wNiDQb', 'function': {'arguments': '{"query": "fan-out"}', 'name': 'retrieve_from_blog'}, 'type': 'function'}, {'id': 'call_SQ2vW40QSGCUcG20zFpiD6BM', 'function': {'arguments': '{"query": "fan-in"}', 'name': 'retrieve_from_blog'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 49, 'prompt_tokens': 71, 'total_tokens': 120, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_50cad350e4', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-7adc1db5-084c-489c-9ed0-582289b17cad-0', tool_calls=[{'name': 'retrieve_from_blog', 'args': {'query': 'fan-out'}, 'id': 'call_SwmeoO6GSW6s2wD7h6wNiDQb', 'type': 'tool_call'}, {'name': 'retrieve_from_blog', 'args': {'query': 'fan-in'}, 'id': 'call_SQ2vW40QSGCUcG20zFpiD6BM', 'type': 'tool_call'}], usage_metadata={'input_tokens': 71, 'output_tokens': 49, 'total_tokens': 120, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}
'\n---\n'
=== [RELEVANCE CHECK] ===
=== [DECISION: RELEVANT] ===
'\n---\n'
=== [GENERATE] ===
"Output from node 'generate':"
'---'
{ 'messages': [ 'Langgraph에서 fan-out은 하나의 노드가 여러 다른 노드로 분기되는 것을 의미하며, fan-in은 '
'여러 노드가 하나의 노드로 집합되는 것을 의미합니다. 예를 들어, A 노드가 B, C, D로 분기되면 '
'fan-out이고, B, C, D가 E로 집합되면 fan-in입니다. 이 정보는 '
'[Tistory](https://sean-j.tistory.com/entry/LangGraph-Branches-for-parallel-node-execution)에서 '
'확인할 수 있습니다.']}
'\n---\n'
# Case 3.
config = {"configurable": {"thread_id": "6"}}
inputs = {"messages": [("user", "랭그래프 슈퍼스텝")]}
stream_graph(inputs, config, exclude_node=["retrieve"])
=== [CALL AGENT] ===
"Output from node 'agent':"
'---'
{ 'messages': [ AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_BoiVkbHMoDuR5I31zIdL5Wbg', 'function': {'arguments': '{"query":"랭그래프 슈퍼스텝"}', 'name': 'retrieve_from_blog'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 25, 'prompt_tokens': 68, 'total_tokens': 93, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_50cad350e4', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-931d6390-f05e-4493-8be3-c0288ec89f09-0', tool_calls=[{'name': 'retrieve_from_blog', 'args': {'query': '랭그래프 슈퍼스텝'}, 'id': 'call_BoiVkbHMoDuR5I31zIdL5Wbg', 'type': 'tool_call'}], usage_metadata={'input_tokens': 68, 'output_tokens': 25, 'total_tokens': 93, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}
'\n---\n'
=== [RELEVANCE CHECK] ===
=== [DECISION: NOT RELEVANT] ===
'\n---\n'
=== [REWRITE QUERY] ===
"Output from node 'rewrite':"
'---'
{ 'messages': [ AIMessage(content='랭크그래프의 슈퍼스텝이란 무엇인가요?\nWhat is a superstep in RankGraph?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 28, 'prompt_tokens': 88, 'total_tokens': 116, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_50cad350e4', 'finish_reason': 'stop', 'logprobs': None}, id='run-3a75f78b-6cac-4a49-83c7-c99ded69f2ba-0', usage_metadata={'input_tokens': 88, 'output_tokens': 28, 'total_tokens': 116, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}
'\n---\n'
=== [CALL AGENT] ===
"Output from node 'agent':"
'---'
{ 'messages': [ AIMessage(content='랭크그래프(LangGraph)의 슈퍼스텝(superstep)은 그래프 상에서 노드의 병렬 처리를 지원하는 기능입니다. 이는 대화 히스토리 요약 및 서브그래프 생성 등 다양한 작업에서 사용되며, 여러 노드를 효과적으로 병렬 처리함으로써 성능을 향상시킵니다. 이는 특정 노드들이 독립적으로 실행될 수 있도록 하여, 복잡한 작업을 보다 빠르게 수행할 수 있도록 돕는다.\n\n랭크그래프와 관련된 좀 더 상세한 정보는 [여기](https://sean-j.tistory.com/entry/LangGraph-Branches-for-parallel-node-execution)를 참조할 수 있습니다.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 157, 'prompt_tokens': 1077, 'total_tokens': 1234, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_50cad350e4', 'finish_reason': 'stop', 'logprobs': None}, id='run-9cdd0183-73fa-4a7d-9dd1-f08426dd5e87-0', usage_metadata={'input_tokens': 1077, 'output_tokens': 157, 'total_tokens': 1234, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}
'\n---\n'
그런데 3번 케이스에서 무한 루프에 빠질 수 있다는 것을 주의해야 한다. (agent → retrieve → rewrite → agent → retrieve → rewrite → agent → ...)
이 경우는 retrieve tool이 필요하고, 정보를 검색했으나 검색 대상인 vector store에 해당 정보가 들어있지 않은 경우 발생할 수 있다. 따라서, 위 그래프를 개선시켜 이러한 오류를 방지해야 한다. 이 글에서는 다루지 않지만 다음과 같은 방법으로 어느정도 해결할 수 있을 것 같다.
recursion_limit
인자로 최대 방문 노드를 제한하고 최대 한도에 도달한 경우에 대한 예외처리- vector store에 없는 정보이므로, Web 검색 tool 등을 추가로 LLM에게 바인딩
출처:
LangGraph. "Agentic RAG". https://langchain-ai.github.io/langgraph/tutorials/rag/langgraph_agentic_rag/
'LLM > LangGraph' 카테고리의 다른 글
[LangGraph] 요구사항 연속적으로 수집하기 (prompt generation) (0) | 2025.01.29 |
---|---|
[LangGraph] Adaptive RAG (1) | 2025.01.28 |
[LangGraph] Subgraph State(상태) (0) | 2025.01.02 |
[LangGraph] - Subgraph(서브그래프) 1 (0) | 2025.01.01 |
[LangGraph] 과거 대화 이력의 요약 (0) | 2025.01.01 |