타임트리

[LangChain] Tool Calls 스트리밍 하는 법 본문

LLM/LangChain

[LangChain] Tool Calls 스트리밍 하는 법

sean_j 2025. 1. 29. 20:18

LangGraph 글 중 [LangGraph] 요구사항 연속적으로 수집하기 (prompt generation) 를 FastAPI로 스트리밍 방식으로 서빙해보려다 문제가 발생해서 한참 헤맸다.

 

 

위 구조에서 information_gather 노드는 Pydantic model로 정의된 tool의 4가지 변수를 모두 채울 때까지 사용자에게 재질문하고, 모든 정보를 획득했다면 도구를 호출한다.

 

즉, 해당 노드는 일반적인 답변(content)을 뱉을 수도 있고, tool_calls를 반환할 수도 있는 상황이다. 먼저 일반적인 답변을 뱉는 상황을 살펴보자. (gather_chain은 이전 글 참조)

from langchain_core.messages import HumanMessage

messages = [HumanMessage(content='목표는 RAG, 변수는 user_input')]

response = []
async for chunk in gather_chain.astream({"messages": messages}):
    response.append(chunk.content)
    print(chunk.content, end="", flush=True)
출력에 대한 제약 조건이나 반드시 따라야 할 요구 사항이 있나요?

 

그런데 만약 tool 호출이 일어나는 상황이라면, 반환되는 chunk의 content가 비어있으므로 아래의 코드처럼 마치 invoke 메서드로 호출하듯 예외처리를 하면 될 것이라고 생각했다.

from langchain_core.messages import HumanMessage

messages = [HumanMessage(content='목표는 RAG, 변수는 user_input, 요구 사항이나 제약사항 없음')]

response = []
async for chunk in gather_chain.astream({"messages": messages}):
    if hasattr(chunk, 'tool_calls'):
        response.append(chunk)
    else:
        response.append(chunk.content)
    print(chunk.content, end="", flush=True)

response
[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_h1uxhv3VSOvVCHukFctoPE6q', 'function': {'arguments': '', 'name': 'PromptInstructions'}, 'type': 'function'}]}, response_metadata={}, id='run-a3551328-45a9-42b1-8bc2-4af81f13791b', tool_calls=[{'name': 'PromptInstructions', 'args': {}, 'id': 'call_h1uxhv3VSOvVCHukFctoPE6q', 'type': 'tool_call'}], tool_call_chunks=[{'name': 'PromptInstructions', 'args': '', 'id': 'call_h1uxhv3VSOvVCHukFctoPE6q', 'index': 0, 'type': 'tool_call_chunk'}]),
 AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '{"', 'name': None}, 'type': None}]}, response_metadata={}, id='run-a3551328-45a9-42b1-8bc2-4af81f13791b', tool_calls=[{'name': '', 'args': {}, 'id': None, 'type': 'tool_call'}], tool_call_chunks=[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]),
 AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': 'objective', 'name': None}, 'type': None}]}, response_metadata={}, id='run-a3551328-45a9-42b1-8bc2-4af81f13791b', invalid_tool_calls=[{'name': None, 'args': 'objective', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': 'objective', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]),
 AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '":"', 'name': None}, 'type': None}]}, response_metadata={}, id='run-a3551328-45a9-42b1-8bc2-4af81f13791b', invalid_tool_calls=[{'name': None, 'args': '":"', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]),

...

 AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '[]}', 'name': None}, 'type': None}]}, response_metadata={}, id='run-a3551328-45a9-42b1-8bc2-4af81f13791b', invalid_tool_calls=[{'name': None, 'args': '[]}', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '[]}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]),
 AIMessageChunk(content='', additional_kwargs={}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_4691090a87'}, id='run-a3551328-45a9-42b1-8bc2-4af81f13791b')]

 

그런데 위 결과에서 보듯, tool_calls도 한 번에 반환되는 것이 아니라, tool_call_chunk를 포함한 AIMesageChunk 타입으로 반환된다.

tool_call_chunks

AIMessageChunk에는 되게 좋은 기능이 구현되어 있는데 바로 + 연산자로 두 개의 AIMessageChunk를 더하면, tool_call_chunks가 각 key값에 매칭되는 values끼리 오른쪽으로 concat된다.

 

만약 첫 번째 AIMessageChunk의 tool_call_chunks와 두 번째 AIMessageChunk의 tool_call_chunks가 각각 [{'name': 'Multiply', 'args': '', 'id': 'call_3aQwTP9CYlFxwOvQZPHDu6wL', 'index': 0}][{'name': None, 'args': '{"a"', 'id': None, 'index': 0}] 라면, 이 둘을 더한 결과는 다음과 같다.

[{'name': 'Multiply', 'args': '{"a"', 'id': 'call_AkL3dVeCjjiqvjv8ckLxL3gP', 'index': 0}]

 

따라서, 아래처럼 코드를 작성해서 일반적인 답변의 경우와 tool_calls인 경우를 분기처리할 수 있다.

response = []
tool_calls = None

first = True

async for chunk in gather_chain.astream({"messages": messages}):
    if chunk.tool_call_chunks:
        if first:
            tool_calls = chunk
            first = False
        else:
            tool_calls += chunk
    elif chunk.content:
        response.append(chunk.content)
    else:
        continue

if response:
    response = AIMessage(content="".join(response))

 

---

참고: LangChain. "How to stream tool calls". https://python.langchain.com/docs/how_to/tool_streaming/

'LLM > LangChain' 카테고리의 다른 글

[LangChain] RAG with Pinecone (LCEL)  (0) 2024.06.21
[LangChain] RAG with Pinecone  (0) 2024.06.17
[LangChain] Document Loaders  (0) 2024.06.14
[LangChain] AgentExecutor와 ReAct  (0) 2024.06.12