원재와 알아보는 LangGraph
서론
안녕하세요. 백엔드 개발자 허원재입니다.
이 글은 LangGraph 개발 경험이 없는 백엔드 개발자들을 대상으로 합니다.
LangChain
LangGraph를 알아보기 전에 먼저 LangChain에 대해서 알아야 합니다.
LangChain은 LLM을 편하게 사용할 수 있게 해주는 프레임워크입니다. LangChain을 통해 프롬프트를 구성하고, LLM을 호출하고, JSON등 원하는 구조로 파싱된 응답을 받을 수 있습니다.
LangGraph
LangGraph는 LangChain을 만든 팀에서 개발한 라이브러리입니다.
LangChain만을 이용해도 간단한 분기처리는 가능하지만 복잡한 분기 처리나 반복, LLM 호출 간의 상태 공유 등을 처리하기 어렵습니다.
LangGraph는 이를 그래프 구조와 모든 노드에서 공유하는 상태를 통해 해결합니다. 그래프 구조를 도입함으로써 복잡한 분기 처리나 반복을 쉽게 구현할 수 있게 되고, 모든 노드에서 공유하는 상태를 두어 노드 간에 데이터를 공유하고 누적시킬 수 있습니다.
LangGraph의 구성 요소는 다음과 같습니다.
state: 그래프 전체에서 공유하는 데이터 구조입니다.
node: 실제로 로직을 수행하는 곳입니다. 노드들은 현재 상태를 입력으로 받고 로직을 수행한 후 변경된 상태를 반환합니다.
edge: 노드 간의 연결을 나타냅니다. 조건부 엣지를 통해 분기 처리를 할 수 있습니다.
펫닥터 예시
전체 요청 흐름
AI 요청의 흐름은 그림과 같습니다.
클라이언트가 스프링 백엔드로 요청을 보냅니다.
스프링 백엔드가 필요한 정보(반려동물 정보, 영양 분석 정보 등)을 포함해서 AI 서버로 전달합니다.
FastAPI로 구현된 AI 서버가 요청을 받아 LangGraph를 실행합니다.
LangGraph에서 LLM을 호출하며 응답을 생성합니다.
각 노드 실행이 끝나면 checkpointer가 현재 상태를 자동으로 MongoDB에 저장합니다.
생성된 응답은 SSE를 통해 스프링 백엔드를 거쳐서 클라이언트에게 실시간으로 전달됩니다
스프링 백엔드에서 대화 내역을 조회할 수 있도록 별도로 MongoDB에 저장한 후 요청이 종료됩니다.
LangGraph에는 checkpointer라는 기능이 있습니다. checkpointer는 노드의 실행이 완료될 때마다 상태를 자동으로 저장해줍니다. 서버 시작 시 checkpointer를 생성하고, 그래프에 연결합니다.
# server_config.py
# AsyncMongoDBSaver 초기화 (context manager로 관리)
async with AsyncMongoDBSaver.from_conn_string(
settings.MONGODB_URI, db_name=settings.MONGODB_DB_NAME
) as checkpointer:
# 전역 그래프 인스턴스 초기화
graph_builder.vet_chatbot_graph = graph_builder.create_vet_chatbot_graph(checkpointer)
logger.info("수의사 챗봇 그래프가 MongoDB 체크포인터와 함께 초기화되었습니다")
# graph_builder.py
def create_vet_chatbot_graph(checkpointer: BaseCheckpointSaver | None = None) -> CompiledStateGraph:
....
compiled = graph.compile(checkpointer=checkpointer)
....
# 전역 그래프 인스턴스 (FastAPI lifespan에서 초기화)
vet_chatbot_graph: CompiledStateGraph | None = None
이후 요청마다 thread_id를 전달해주면 자동으로 대화 상태를 복원해서 대화를 이어갑니다.
# doctor_chat_service.py
async def process_chat(self, request: DoctorChatRequestDTO) -> DoctorChatResponseDTO:
....
# LangGraph 설정 (세션 관리 + 스트리밍 모드)
config = {
"configurable": {
"thread_id": request.session_id,
"streaming": True,
}
}
# 실제로는 astream_events를 사용하지만, 설명의 간결함을 위해 ainvoke로 표현
result = await current_graph.ainvoke(initial_state, config=config)
....
그래프 구조
펫닥터에는 2가지의 챗봇(수의사, 영양사)이 존재합니다. 영양사 챗봇은 수의사 챗봇에 영양 관련 노드가 하나 추가된 형태입니다. 이 문서에서는 수의사 챗봇을 예시로 설명하겠습니다.
trim_messages: 너무 많은 대화 내역을 가지지 않도록 오래된 대화 내역부터 자르는 역할을 합니다. 이후 이미지/동영상이 존재하면 image_node, 존재하지 않는다면 detection_node로 분기합니다.
image_node: 이미지/동영상이 첨부된 경우 분석하고 결과를 저장합니다.
detection_node: 휴먼 메세지에서 증상/특이사항을 추출해서 상태에 저장합니다.
generate_response: 상태 값을 기반으로 적절한 응답을 생성합니다.
다음은 노드를 연결하고 그래프를 빌드하는 코드입니다.
def create_vet_chatbot_graph(checkpointer: BaseCheckpointSaver | None = None) -> CompiledStateGraph:
# 노드 인스턴스 생성
trim_node = TrimMessagesNode()
image_node = AnalyzeImageNode()
detect_node = DetectSymptomNode()
generate_node = GenerateResponseNode()
# 그래프 생성
graph = StateGraph(VetChatbotState)
# 노드 추가
graph.add_node("trim_messages", trim_node)
graph.add_node("image_node", image_node)
graph.add_node("detection_node", detect_node)
graph.add_node("generate_response", generate_node)
# 진입점: trim
graph.set_entry_point("trim_messages")
# trim → 조건부 라우팅 (미디어 유무)
graph.add_conditional_edges(
"trim_messages",
route_by_media_presence,
{
"image_node": "image_node",
"detection_node": "detection_node",
},
)
# image → detection
graph.add_edge("image_node", "detection_node")
# detection → generate
graph.add_edge("detection_node", "generate_response")
# generate → END
graph.add_edge("generate_response", END)
# 컴파일
compiled = graph.compile(checkpointer=checkpointer)
다음은 노드에서 사용하는 상태입니다.
class BaseAgentState(TypedDict):
"""기본 에이전트 상태"""
messages: Annotated[Sequence[BaseMessage], add_messages]
# 토큰 사용량 (매 턴 시작 시 0으로 초기화, 각 노드에서 누적)
tokens_used: int
class VetChatbotState(BaseAgentState):
"""수의사 챗봇 상태"""
# 펫 정보 (백엔드에서 전달)
dog_info: dict
# 분석 결과
symptoms: list[dict] # 현재 감지된 일시적 증상 목록 (최대 3개, SymptomInfo dict 형태)
special_notes: list[str] # 영구적 건강 특성 목록 (평생 가지고 가는 것들)
recommend_hospital: bool
# 미디어 링크 (이미지/동영상)
image_url: str | None
video_url: str | None
BaseAgentState에는 영양사 챗봇과 공유하는 값인 메세지와 토큰 사용량 등의 필드가 들어갑니다.
메세지 필드에는 SystemMessage + 대화 내역(HumanMessage + AIMessage)을 저장합니다.
SystemMessage: AI의 역할과 규칙 등을 정의하는 시스템 프롬프트입니다. ex) "당신은 반려동물 건강 상담 전문가 ‘펫닥터’입니다. ……”
HumanMessage: 사용자가 입력한 메세지입니다. ex) “갑자기 배탈이 났어”
AIMessage: AI의 응답 메세지입니다. ex) “걱정이 많으시겠어요. 강아지의 배탈은 ……”
메세지 필드에는 add_messages 리듀서가 적용되어 있습니다. 일반 필드는 상태값을 반환하면 덮어씌워지는 방식으로 동작합니다. 리듀서가 적용된 필드는 정해진 규칙에 맞게 값을 처리합니다. add_messages 리듀서는 새 메세지는 리스트에 추가하고, 기존에 존재하는 메세지인 경우에는 대체해서 저장합니다.
VetChatbotState에는 전달받은 펫에 대한 정보와 detection 노드에서 분석한 결과를 저장하는 필드, image 노드에서 분석할 이미지의 경로를 저장하는 필드가 존재합니다.
trim_messages
trim 노드는 메세지를 자르는 역할을 하는 노드입니다.
챗봇은 이전 대화 컨텍스트를 유지해야 하기 때문에 대화 내역을 전부 LLM으로 전송해야 합니다. 하지만 대화가 지속되면 휴먼 메세지와 AI 메세지가 계속 쌓이게 됩니다. 이를 처리해주지 않으면 토큰 사용량이 계속 늘어나게 됩니다.
이를 해결하기 위해서 trim 노드에서는 4000 토큰을 기준으로 메세지를 자르게 됩니다. 메세지의 토큰 사용량이 4000이 넘어가게 되면 가장 오래된 메세지부터 제거합니다. 4000 토큰이 넘지 않으면 노드 실행을 스킵합니다.
# trim_node.py
async def execute(self, state: dict, _config: RunnableConfig | None = None) -> dict:
"""
메시지 트리밍 실행 (전처리 단계)
- 토큰이 4000 이하면 그대로 통과
- 토큰이 4000 초과면 3500으로 트리밍
"""
try:
messages = state["messages"]
tokens = chat_model.get_num_tokens_from_messages(messages)
# 4000 토큰 이하면 트리밍 불필요
if tokens <= self.max_tokens:
return state # 빈 dict 반환 - state를 변경하지 않음
# 4000 초과 시 트리밍 실행
trimmed = trim_messages(
messages,
max_tokens=3500, # 안전 여유를 위해 3500으로 트리밍
strategy="last",
token_counter=chat_model,
include_system=True,
start_on="human",
)
# IMPORTANT: add_messages reducer를 위한 RemoveMessage 사용
# 1. 제거할 메시지들 식별 (trimmed에 없는 메시지들)
trimmed_ids = {msg.id for msg in trimmed if hasattr(msg, "id") and msg.id}
messages_to_remove = []
for msg in messages:
if hasattr(msg, "id") and msg.id:
if msg.id not in trimmed_ids:
messages_to_remove.append(RemoveMessage(id=msg.id))
# 2. RemoveMessage 반환
return {"messages": messages_to_remove}
except Exception as e:
logger.error(f"메시지 트리밍 실패: {e}", exc_info=True)
return state
다음은 자세한 작동 방식입니다.
4000토큰이 넘지 않으면 스킵합니다
4000 토큰이 넘으면 LangChain에서 제공하는 trim_messages 함수를 통해 살아남을 메세지를 선택합니다.
trimmed에 있는 id값과 상태의 messages에 있는 id를 비교합니다. trimmed에 존재하지 않는 id를 가진 메세지를 messages_to_remove에 저장합니다.
RemoveMessage를 이용해 삭제 대상 메세지를 삭제합니다. add_messages 리듀서가 자동으로 messages_to_remove에 포함된 메세지를 삭제해줍니다.
image_node
image 노드는 이미지/동영상을 분석하는 노드입니다.
image 노드는 구조화된 출력을 사용합니다. 구조화된 출력은 LLM을 호출할 때 이런 형식으로 출력하라고 명시해주는 역할을 합니다. docstring과 각 필드의 description 필드의 설명을 읽고 LLM이 어떻게 응답을 출력해야 하는지 판단해서 생성해줍니다. 구조화된 출력을 사용하면 LLM의 응답을 파이썬 객체로 바로 파싱해서 받을 수 있습니다.
# image_schemas.py
class ImageFinding(BaseModel):
"""개별 이상 징후"""
body_part: str = Field(description="발견된 부위 (눈, 귀, 피부, 발, 구강, 전신, 털)")
symptom: str = Field(description="증상 설명 (심각도 포함). 예: '눈이 빨간색으로 충혈됨, 경미한 증상으로 판단됨'")
class PetImageAnalysis(BaseModel):
"""반려동물 이미지 분석 결과 (단순화된 스키마)"""
findings: list[ImageFinding] = Field(
default_factory=list,
description="발견된 이상 징후 목록 (정상이면 빈 리스트)",
)
additional_photos: list[str] = Field(
default_factory=list,
description="추가 사진 제안 (부위와 이유 포함). 예: '귀 안쪽 - 털에 가려져 확인 어려움'",
)
summary_message: str = Field(description="사용자에게 보여줄 분석 결과 요약 메시지")
# image_node.py
async def _analyze_media(
self, media_urls: list[tuple[str, str]], state: VetChatbotState
) -> tuple[PetImageAnalysis | None, int]:
# include_raw=True로 원본 AIMessage도 함께 반환받아 토큰 추출 가능
structured_llm = chat_model.with_structured_output(PetImageAnalysis, method="json_mode", include_raw=True)
dog_info = state.get("dog_info", {})
system_prompt = self.prompt_provider.get_image_analysis_prompt(dog_info=dog_info)
# 사용자 메시지 추출 (질문 컨텍스트 포함)
user_message = self.extract_user_message(state)
# 멀티모달 콘텐츠 구성 (이미지/동영상 + 텍스트)
content: list[dict] = []
for url, media_type in media_urls:
content.append({"type": "image_url", "image_url": {"url": url}})
if user_message:
content.append({"type": "text", "text": f"사용자 질문: {user_message}"})
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=content),
]
response = await structured_llm.ainvoke(messages)
# 토큰 사용량 추출
tokens_used = extract_tokens_from_raw_response(response)
# 구조화 출력 추출
result = response.get("parsed") if isinstance(response, dict) else response
return result, tokens_used
출력을 받으면 그 분석 결과를 휴먼 메세지에 덧붙여서 저장합니다.
휴먼 메세지에 저장하는 이유는 LLM이 이미지 분석 결과를 계속해서 참조할 수 있게 하기 위함입니다. 상태에만 저장하게 되면 다음 요청이 들어오면 초기 상태가 입력되면서 컨텍스트를 잃어버리게 됩니다. 이를 방지하기 위해서 휴먼 메세지에 분석 결과를 덧붙여서 저장해서 LLM이 이후의 대화에서도 이미지를 분석한 결과를 참조할 수 있도록 합니다.
#image_node.py
async def execute(self, state: VetChatbotState, _config: RunnableConfig | None = None) -> VetChatbotState:
....
# 마지막 HumanMessage에 분석 결과 텍스트 추가 (이후 대화 맥락 유지용)
appended_msg = self._append_analysis_to_message(state, analysis, media_label)
if appended_msg:
return {
"messages": [appended_msg],
"tokens_used": accumulated_tokens,
}
return {"tokens_used": accumulated_tokens}
detection_node
detection 노드는 휴먼 메세지에서 증상/특이사항을 추출하는 노드입니다. 이전 노드인 이미지 노드가 실행된 경우 휴먼 메세지에 이미지 분석 결과가 포함되어 있습니다.
LLM을 호출할 때 dog_info 값도 시스템 프롬프트에 같이 넣어서 호출합니다. 그 이유는 이미 등록된 증상들을 다시 추출하지 않도록 제한하기 위해서입니다. 프롬프트에 해당 내용을 명시해서 LLM이 중복된 증상을 추출하지 않도록 제한합니다.
# vet_prompts.py
"""
....
## 이미 알고 있는 정보 (추출하지 말 것)
{existing_info}
- **이미 등록된 질병/증상/특이사항은 다시 추출하면 절대 안 됨**
....
"""
detection 노드 또한 구조화된 출력을 사용합니다. 해당 출력을 상태에 포함시켜 응답 생성에 사용하도록 합니다.
# detection_schemas.py
class SymptomInfo(BaseModel):
"""개별 증상 정보"""
symptom: str = Field(..., description="증상 설명")
occurred_at: str | None = Field(None, description="증상 발생일 (YYYY-MM-DD)")
class SymptomAnalysis(BaseModel):
"""증상 분석 결과 스키마 (다중 증상 지원)"""
symptoms: list[SymptomInfo] = Field(
default_factory=list,
description="현재 발견된 일시적 증상 목록 (최대 3개). 각 증상은 symptom(설명)과 occurred_at(발생일 YYYY-MM-DD) 포함",
)
special_notes: list[str] = Field(
default_factory=list,
description="영구적 건강 특성 목록 (평생 가지고 가는 것들: 선천성 슬개골 탈구, 만성 심장병, 알러지 등)",
)
recommend_hospital: bool = Field(
default=False,
description="병원 방문 권장 여부",
)
# detection_node.py
....
recommend_hospital = result.recommend_hospital
# 다중 증상 리스트를 dict 리스트로 변환 (최대 3개)
symptom_list = result.to_symptom_info_list()
symptoms_dict_list = [s.model_dump() for s in symptom_list]
logger.info(
f"증상 분석 완료 - 증상: {len(symptoms_dict_list)}개, "
f"특이사항: {len(result.special_notes)}개, 병원 추천: {recommend_hospital}, "
f"토큰: {tokens_used}"
)
return {
"symptoms": symptoms_dict_list,
"special_notes": result.special_notes,
"recommend_hospital": recommend_hospital,
"tokens_used": accumulated_tokens,
}
generate_response
generate_response 노드는 사용자에게 전달될 최종 응답을 생성하는 노드입니다.
이전 detection 노드에서 분석한 결과인 recommend_hospital의 정보를 시스템 메세지로 추가해서 LLM이 답변 생성에 참고할 수 있도록 합니다. 지금까지의 대화 기록은 messages를 넣고 LLM을 호출해서 응답을 생성합니다.
실제로는 astream_events를 사용하여 SSE로 토큰 단위 스트리밍을 하지만, 설명의 간결함을 위해 단순 호출 방식으로 표현했습니다.
# generate_response_node.py
....
messages = list(state["messages"])
# AI 분석 컨텍스트 구성 (이전 노드들의 분석 결과)
ai_context_lines = ["[AI 분석 컨텍스트 - 이전 노드에서 생성된 분석 결과입니다. 응답 생성 시 참고하세요.]"]
# 증상 분석 결과 (recommend_hospital)
recommend_hospital = state.get("recommend_hospital", False)
if recommend_hospital:
ai_context_lines.append("- 병원 방문 권장: 예 (응답에서 병원 방문을 권장하세요)")
else:
ai_context_lines.append("- 병원 방문 권장: 아니오 (집에서 관찰하고 필요시 병원 방문 언급)")
# SystemMessage로 AI 컨텍스트 추가 (사용자 메시지가 아님을 명시)
context_msg = SystemMessage(content="\\n".join(ai_context_lines))
messages.append(context_msg)
response = await chat_model.ainvoke(messages)
return {
"messages": [response],
"tokens_used": accumulated_tokens,
}
....
요약 기능 추가
대화를 요약해서 보여달라는 요구가 있었습니다.
처음에는 단순히 응답 노드 뒤에 요약을 생성하는 노드를 두어 응답 생성 후 그 내용으로 요약을 생성했습니다.
... → generate_response → summarize_node → END하지만 이 방법에는 여러가지 문제가 있었습니다.
응답 지연
사용자는 이미 AI 응답 메세지를 스트림을 통해 전달받았습니다. 하지만 이후에 요약이 끝나기 전까지는 처리가 계속되고 있기 때문에 요청이 끝나지 않은 상황입니다. 따라서 응답을 전달받았음에도 불구하고 추가 메세지를 입력하지 못하는 문제가 생겼습니다.
타임아웃
스프링 백엔드에서 다음 청크가 10초 이내로 오지 않으면 AI 서버 호출이 실패했다고 판단하는 로직이 있었습니다. 요약도 결국 LLM을 호출하기 때문에 거의 모든 경우에서 타임아웃에 걸렸습니다.
모델 최적화
채팅에는 gemini 3.0 flash 모델을 사용하고 있습니다. 단순히 질문&답변 한 세트를 받아 요약하는 기능에 쓰기에는 너무 과한 모델입니다.
이 문제들을 해결하기 위해서 요약 기능을 그래프에서 떼어내고 별도의 API로 분리했습니다.
# summary_service.py
async def generate_summary(self, request: SummaryRequestDTO) -> SummaryResponseDTO:
prompt = f"""다음 상담을 10~20자로 요약하세요. 핵심 내용만 추출하고 명사형으로 끝내세요.
사용자: {request.user_message[:100]}
AI: {request.ai_response[:200]}
요약:"""
response = await report_model.ainvoke([HumanMessage(content=prompt)])
tokens_used = extract_token_usage(response)
summary = response.content.strip().strip('"').strip("'")
logger.info(f"요약 생성 완료: {summary} (토큰: {tokens_used})")
return SummaryResponseDTO(conversation_summary=summary, tokens_used=tokens_used)
그리고 스프링 백엔드에서 채팅 요청이 마무리된 이후에 비동기로 호출하도록 코드를 수정했습니다.
private void saveCompleteAiResponse(
Long consultationId, AiChatResponse aiResponse, Long userId, SignalType signal, String userMessageText) {
aiMessageSaveRetryTemplate.execute(retryContext -> {
transactionService.saveAiMessageInSeparateTransaction(consultationId, aiResponse);
return null;
});
recordTokenUsage(userId, aiResponse);
// 비동기로 대화 요약 생성 (첫 대화인 경우에만 실제 실행)
summaryService.requestSummaryAsync(consultationId, userId, userMessageText, aiResponse.message());
}
요약이 채팅 API와 분리되어 비동기로 실행되기 때문에 스트리밍이 끝나면 바로 요청이 종료됩니다. 따라서 사용자는 응답을 받자마자 곧바로 다음 채팅을 보낼 수 있고 타임아웃도 발생하지 않습니다.
영양 분석 리포트 생성에 사용되던 더 저렴한 모델을 사용하여 비용을 절감할 수 있었습니다.
마무리
스프링 백엔드 개발자로서 파이썬과 LangGraph 모두 처음이었지만, 그래프라는 익숙한 구조 덕분에 전체 흐름을 파악하는 데 큰 어려움은 없었습니다. 각 노드가 하나의 책임만 가지고, 상태를 통해 데이터를 주고받는 구조는 스프링에서 서비스 레이어를 나누는 것과 크게 다르지 않았습니다.
다만 요약 기능을 구현하면서 느낀 점은, 모든 로직을 그래프 안에 넣으려 하면 안 된다는 것입니다. 그래프의 실행 흐름과 맞지 않는 기능은 과감히 분리하는 것이 오히려 전체 구조를 깔끔하게 유지하는 방법이었습니다.
LangGraph는 LLM 호출이 여러 단계로 나뉘고, 단계 간에 상태를 공유해야 하는 상황에서 특히 유용했습니다. 반대로 단순한 질문-응답 구조라면 LangChain만으로도 충분할 것 같습니다. 도입을 고려하고 계신 분들께 이 글이 도움이 되었으면 합니다.