🌏 인트로
서비스를 개발하다 보면 생각보다 비동기적으로 처리해야 하는 작업들이 많아진다. LLM 호출한다던지, 외부 API 연동, S3에 저장한다던지 등등 외부와 함께 해야하는 작업이 이어질 때, 응답 지연이 길고, 순차적으로 처리하면 전체 서비스가 느려질 수 있다.
그래서 우리는 보통 “비동기 처리”로 이런 작업을 분리한다. 하지만 문제는, 비동기 처리 과정에서 데이터가 섞이거나 순서가 깨지는 상황이 생긴다는 점이다. 동시에 여러 이벤트가 발행될 때 Race Condition이 발생하는 것이다.
이 글에서는 Spring 환경에서 이벤트 기반 아키텍처를 사용하면서 응답 순서를 보장하기 위해 이벤트를 직렬로 처리하는 방식을 설계한 경험을 공유하고자 한다.
🌏 문제 상황: 비동기 이벤트 처리 시의 순서 불일치 + 스레드풀 제어의 불확실성
처음엔 Spring의 @Async와 @EventListener 조합으로 다음과 같은 구조를 만들었다:
@Async
@EventListener
fun handle(event: NodeCreatedEvent) {
callLLM() // 비동기 작업 1
fetchExternalInfo() // 비동기 작업 2
saveToDB() // 비동기 작업 3
postProcess()
}
| 시간 | 스레드 | 이벤트 | 상태 |
|---|---|---|---|
| t=0ms | Thread-1 | NodeCreatedEvent(1) | DB에서 node_id=1 읽음 |
| t=1ms | Thread-2 | NodeCreatedEvent(1) | 같은 row 다시 읽음 |
| t=50ms | Thread-1 | LLM 호출 끝나고 결과 A 저장 | |
| t=80ms | Thread-2 | 외부 API 호출 끝나고 결과 B 저장 (A를 덮어씀) |
결과적으로 데이터 무결성 문제와 예측할 수 없는 흐름이 생기기 시작했다.
이렇게 보면 간단하지만 실제로는 왜 덮어씌우기가 되는지 발견하기 위해 꽤 힘들었다 ㅎㅎ,,
데이터의 문제 외에도 더 치명적인 문제가 존재하는데 스레드 개수와 실행 시점이 예측 불가능하다는 것이다 ㅋㅎㅋㅎ
Spring의 @Async는 내부적으로 스레드풀(ThreadPoolTaskExecutor) 을 사용하지만, 명시적으로 설정하지 않으면 SimpleAsyncTaskExecutor가 기본값이다. 이 경우에는 스레드 수 제한이 없고, 호출마다 새로운 스레드를 생성합니다. (=커넥션 풀 고갈의 공포 ㄷㄷ)
🌏 해결 전략
✅ 해결 전략(1): DB Lock으로 막기
처음에는 단순하게 생각했다.
“같은 리소스를 동시에 수정하려고 하면 DB Lock을 걸면 되지 않을까?”
@Transactional
public void handleEvent(Event event) {
var data = repository.findByIdForUpdate(event.getTargetId()); // SELECT ... FOR UPDATE
data.process(event);
repository.save(data);
}
이렇게 하면 한 트랜잭션이 끝날 때까지 다른 트랜잭션이 접근하지 못하므로 일시적으로는 Race Condition이 사라진다. 하지만 이 방식에는 한계가 있다.
- 순서 보장 불가: 비동기로 발행된 이벤트가 1, 2, 3 순서여도 Lock이 걸리는 타이밍에 따라 2 → 1 → 3 순서로 처리될 수 있다.
- 성능 저하: Lock을 걸고 대기하는 트랜잭션이 많아질수록 DB 커넥션이 묶인다.
- 트랜잭션 유지 시간 증가: LLM 호출처럼 외부 연동이 포함된 작업은 수 초 단위로 트랜잭션이 열려 있는 상태가 되어버린다.
DB Lock은 정합성을 강제할 수는 있지만 순서까지 보장하지는 못한다.
우리 서비스에서는 앞선 이벤트에서 얻은 메타 데이터를 바탕으로 이후 이벤트에서 llm을 호출해야 하는 경우도 다수 있었기 때문에 이 방법으로는 몇몇 이벤트 처리에서 제대로 된 llm 응답을 받을 수 없다는 문제가 있었다.
✅ 해결 전략(2): 이벤트를 직렬 처리 (전용 스레드 1개)
이벤트를 발행하자마자 실행하지 않고, 하나의 큐(Queue)에 넣고, 전용 스레드 하나가 순서대로 꺼내 처리하자.
@Component
public class EventQueueProcessor {
private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
private final Thread worker;
public EventQueueProcessor() {
this.worker = new Thread(this::processEvents);
this.worker.setName("event-processor-thread");
this.worker.start();
}
public void enqueue(Event event) {
queue.add(event);
}
private void processEvents() {
while (true) {
try {
Event event = queue.take(); // 큐에서 하나씩 꺼냄
handleEvent(event);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void handleEvent(Event event) {
// 실제 이벤트 처리 로직
System.out.println("Processing event: " + event);
}
}
이 구조에서, 이벤트는 모두 큐로 들어간다. 그리고 빈 내에서 초기화 시 지정된 단 하나의 스레드만 이 큐를 소비(consume) 한다. 따라서 같은 리소스에 대해 동시에 접근할 일이 없고, 처리 순서도 큐에 넣은 순서대로 유지된다.
덕분에 순서 보장 + Race Condition 제거를 동시에 달성할 수 있었다.
우리 서비스는 기본적으로 비동기에는 급한 일은 넣고 있지 않았고, 트래픽 자체가 스레드 2개 이상이 들러붙어 처리해야 할 만큼은 아니었기 때문에 이정도로 끝냈지만, 블로그 포스팅에서는 더 확장을 해보려고 한다.
🌏 확장: 다중 큐와 큐 Manager
✅ 서비스가 커진다면?
하지만 서비스가 커지고 이벤트 종류가 많아지면 큐가 하나라는 것이 오히려 병목이 될 수 있다.
예를 들어, LLM 결과 저장 이벤트, 이메일 발송 이벤트, 알림 전송 이벤트 이 세 가지가 한 큐에 들어가면, 알림 하나 보내려고 10초 걸리는 LLM 처리가 끝날 때까지 기다려야 한다.
그래서 큐를 여러 개로 분리하고, 각 큐를 중앙에서 관리하는 QueueManager 구조로 확장해 보았다.
public interface EventQueue {
void enqueue(Event event);
}
@Component
public class SimpleEventQueue implements EventQueue {
private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
private final ExecutorService workerPool = Executors.newFixedThreadPool(4);
public SimpleEventQueue() {
for (int i = 0; i < 4; i++) {
workerPool.submit(this::process);
}
}
@Override
public void enqueue(Event event) {
queue.add(event);
}
private void process() {
while (true) {
try {
Event event = queue.take();
handle(event);
} catch (Exception e) { e.printStackTrace(); }
}
}
private void handle(Event event) {
// 이벤트별 처리 로직
}
}
@Component
public class QueueManager {
private final List<EventQueue> queues = new ArrayList<>();
public void register(EventQueue queue) {
queues.add(queue);
}
public EventQueue getQueue(int index) {
return queues.get(index);
}
}
LinkedBlockingQueue는 자체적으로 동기화(synchronized) 메커니즘이 내장된 Thread-safe 큐로, 여러 스레드가 동시에 put(), take(), add() 등을 호출해도 데이터 손상이나 race condition이 발생하지 않는다.
✅ 활용 예시
쉽게 상황 예시를 들어보겠당
사용자가 회원가입할 때 동시에 3가지 비동기 작업을 처리해야 한다.
- LLM을 이용한 환영 메시지 생성
- 이메일 발송
- 가입 기록 로깅
이 세 가지는 서로 관련은 있지만, 결과 순서를 보장해야 하는 작업도 있고, 서로 다른 큐에서 병렬로 처리 가능한 작업도 있다.
그럼 위의 코드를 이렇게 활용할 수 있다.
// 1. 이벤트 정의
public class UserCreatedEvent {
public final String userId;
public UserCreatedEvent(String userId) { this.userId = userId; }
}
// 2. 이벤트 처리기 (핸들러)
@Component
public class UserCreatedEventHandler {
private final QueueManager queueManager;
public UserCreatedEventHandler(QueueManager queueManager) {
this.queueManager = queueManager;
}
public void handle(UserCreatedEvent event) {
// 서로 다른 큐에 분배
queueManager.getQueue(0).enqueue(() -> sendWelcomeEmail(event));
queueManager.getQueue(1).enqueue(() -> generateWelcomeMessage(event));
queueManager.getQueue(2).enqueue(() -> saveUserLog(event));
}
private void sendWelcomeEmail(UserCreatedEvent event) {
System.out.println("📧 이메일 발송 중... (" + event.userId + ")");
sleep(1000);
System.out.println("✅ 이메일 발송 완료 (" + event.userId + ")");
}
private void generateWelcomeMessage(UserCreatedEvent event) {
System.out.println("🤖 LLM 환영 메시지 생성 중... (" + event.userId + ")");
sleep(2000);
System.out.println("✅ 메시지 생성 완료 (" + event.userId + ")");
}
private void saveUserLog(UserCreatedEvent event) {
System.out.println("📝 가입 로그 저장 중... (" + event.userId + ")");
sleep(500);
System.out.println("✅ 로그 저장 완료 (" + event.userId + ")");
}
private void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
}
}
// 3. 큐 매니저 구성 (여기가 중요!!!!)
@Component
public class QueueManager {
// 리스트가 아니라 이메일, LLM, 로그용 이런 이름의 Map으로 관리할 수도 있겠쥬?
private final List<SimpleEventQueue> queues = new ArrayList<>();
public QueueManager() {
// 큐 3개 생성 (이메일, LLM, 로그용)
for (int i = 0; i < 3; i++) {
queues.add(new SimpleEventQueue()); // 여기가 중요!!!!!
}
}
public SimpleEventQueue getQueue(int index) {
return queues.get(index);
}
}
// 4. 큐 자체
public class SimpleEventQueue {
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
private final ExecutorService workerPool = Executors.newFixedThreadPool(2);
public SimpleEventQueue() {
for (int i = 0; i < 2; i++) { // 일하는 스레드 수가 2개라는 뜻 (설정 가능)
workerPool.submit(this::process);
}
}
public void enqueue(Runnable task) {
queue.add(task);
}
private void process() {
while (true) {
try {
Runnable task = queue.take();
task.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
처리 관련 시각화를 해보면???
사용자: "회원가입 완료"
│
▼
UserCreatedEvent 발행
│
▼
┌──────────────────────────────┐
│ QueueManager │
└──────────────────────────────┘
│ │ │
▼ ▼ ▼
[Queue0] [Queue1] [Queue2]
이메일 LLM생성 로그저장
│ │ │
▼ ▼ ▼
📧▶✅ 🤖▶✅ 📝▶✅
모두 병렬로 처리되지만, 각 큐 내부에서는 직렬로 순서가 유지된다!
✅ 메세징 큐
이 구조는 사실 Spring 내부에 작은 메시징 시스템을 구현한 것이다.
각 큐는 토픽처럼 독립적으로 동작하며, QueueManager는 브로커 역할을 한다.
나중에 Kafka나 RabbitMQ로 교체할 때도, 이 구조를 그대로 확장할 수 있다.
https://engineerinsight.tistory.com/435
[아키텍처/Kafka] 비동기 메시징 시스템: 개념 + Consumer Group을 통한 병렬 처리의 원리를 간단히 알
🌏 비동기 메시징 시스템이란?✅ 먼저, 비동기(Asynchronous)란?동기: 요청을 보내고 응답을 받을 때까지 기다림비동기: 요청을 보내고 응답을 기다리지 않고 다른 일 진행메시징 시스템은 일단 메
engineerinsight.tistory.com


도움이 되었다면, 공감/댓글을 달아주면 깃짱에게 큰 힘이 됩니다!🌟
비밀댓글과 메일을 통해 오는 개인적인 질문은 받지 않고 있습니다. 꼭 공개댓글로 남겨주세요!
'PROJECT > AIGOYA LABS' 카테고리의 다른 글
| [AIGOYA LABS] LLM 코드 생성 파이프라인 설계: 중복 코드 방지를 위한 RAG, 코드 품질을 위한 LLM Evaluation (0) | 2025.10.21 |
|---|---|
| [AIGOYA LABS] 비동기 이벤트에서 데이터가 덮어씌워진다고요?: 비동기 이벤트를 직렬화하기 (5) | 2025.05.27 |
| [AIGOYA LABS] 트랜잭션 내에서 외부 API 호출을 하겠다고요?!!! (9) | 2025.04.21 |