개요
악보 생성 단계는 총 3단계로 나뉘어져있었습니다. 1) 음원 분리 2) 코드 추출 3) 악보 배치 단계로 이루어져있었는데요 이러한 단계가 완료될때마다 해당 음원을 요청한 사용자에게 악보 생성정도를 전파해줘야 했습니다.
특정 악보가 처음 요청 될 경우 N 개의 추론서버중 1개의 유효한 추론서버가 메세지를 가져와서 처리하면 되었기 때문에 Topic 기반 이벤트 전파 방식이 아닌 메세지 큐를 사용하고 Long Polling 방식으로 메세지를 하나의 서버에서 한번만 가져와서 처리할 수 있도록 하였습니다.
Redis 를 통한 API 서버에 이벤트 전달
1) 악보 생성 단계를 실시간으로 전파해야하고, 2) 동일한 음원에 대한 악보 생성 요청이 동시에 혹은 아주 짧은 시간의 간격을 두고 요청이 올 경우에 대해 모든 서버에 상태를 전파해줘야 했기에 Group 개념을 사용하는 Kafka 보다 Redis 가 이벤트 전달에 적합하다고 생각했습니다.
또한 Redis Publish/Subscribe 는 웹 소켓에 비해 추가적인 네트워크 통신 비용이 덜 들어가고 인메모리 방식이라 Read/Write 연산이 빠르기 때문에 데이터 전달 과정에서 장점이 있다고 생각했습니다.
악보 생성 요청 단계
state 0: 음원 분리 전
state 1: 음원 분리 완료
state 2: 코드 생성 완료
state 3: 악보 생성 완료
- 클라이언트는 서버와 SSE (Server sent event) 로 통신한다.
- API 서버는 악보 생성 요청을 통해 {video_id} 로 메세지를 생성하여 SQS 에 보낸다
- 이후 API 서버는 Redis 의 {video_id} 채널을 구독한다.
- 3단계까지 완료된 정보를 받은 이후 Unsubscribe 를 통해 채널에 대한 구독을 해지한다.
- 추론 Controller 서버는 SQS 에서 메세지를 받아온 이후 각 단계가 끝날때마다 Redis Publish 를 수행한다.
- API 서버는 Subscribe 한 {video_id} 토픽에 대한 데이터를 받고 SSE 채널을 통해 클라이언트에 상태를 전파한다.
추론 서버 (Controller) 서버 코드
메세지 Long polling 이후 악보 생성 요청 Progress
async startMessagePolling() {
while (true) {
// Long polling
const sqsMessage = await this.sqsService.receiveMessage();
if (sqsMessage.Messages === undefined) continue;
const message = sqsMessage.Messages[0];
const videoId = message.Body;
const receiptHandle = message.ReceiptHandle;
console.log('Start Polling', message, videoId, receiptHandle);
try {
await this.startCreateSheet({ videoId: videoId });
} catch (e) {
Logger.log('Error occur', e);
} finally {
await this.sqsService.deleteMessage(receiptHandle);
}
}
}
- SQS 에서 메세지를 가져오고 악보 생성 비즈니스 로직을 실행합니다.
- 예기치 못한 에러로 비즈니스 로직이 실패했을때 큐에서 메세지를 지워줍니다.
각 단계가 완료 될때마다 Redis Publish
async startCreateSheet(sheetDto: PostSheetDto): Promise<PostSheetResponseDto> {
const stageDoneHandler = async (message: CreateAISheetMessage) => await this.redisService.renderUserProgressBar(message, sheetDto.videoId);
const { videoId, accompanimentPath }: Separate = await this.getWav(sheetDto.videoId, stageDoneHandler);
const wavPath = convertPath(accompanimentPath);
const beatInfo: BeatInfo = await this.getBeatInfo(wavPath);
const chord: Chord = await this.getChord(wavPath, stageDoneHandler);
const sheet: Sheet = await this.getSheet(
videoId,
{
csvPath: convertPath(chord.csvPath),
midiPath: convertPath(chord.midiPath),
},
beatInfo,
stageDoneHandler,
);
const response: PostSheetResponseDto = {
success: true,
payload: sheet,
};
return response;
}
CheckAndSet 방식을 통한 Race Condition 방지
@Injectable()
export class RedisService {
constructor(
@Inject(PUBLISH_PROGRESS_CONNECTION) private connection: RedisClientType,
@Inject(SET_PROGRESS_CONNECTION) private setConnection: RedisClientType,
) {}
private async publishMessage(message: CreateAISheetMessage, channel: string) {
await this.connection.publish(channel, message.status.toString());
}
private async setProgressStatus(message: CreateAISheetMessage, channel: string) {
await this.setConnection.set(channel, message.status.toString());
}
private async checkAndSetProgressStatus(message: CreateAISheetMessage, channel: string) {
Logger.log('Check and set progress status: ', message.status);
// make it atomic prevent race condition
await this.setConnection.watch(channel);
const multi = this.setConnection.multi();
const status: number = Number(await this.setConnection.get(channel));
if (status >= message.status) {
return;
}
await multi.set(channel, message.status.toString()).exec();
}
async renderUserProgressBar(message: CreateAISheetMessage, channel: string) {
await this.checkAndSetProgressStatus(message, channel);
await this.publishMessage(message, channel);
}
}
- SQS 에서 추론서버 N 개가 메세지를 동시에 수신하고 처리할경우 악보처리 상태정보가 원치않게 변할 수 있습니다.
- ex) 추론서버 A, 추론서버 B 가 존재할때 A,B 가 동시에 요청을 받고 A가 2단계까지 처리할때 B가 1단계까지 처리하여 2단계가 1단계로 잘못 갱신되는 경우
- 이러한 경우를 방지하기 위해 트랜잭션을 통해 채널에 대한 락을 걸고 status 정보를 가져와서 만약 현재 정보보다 이전 단계가 더 높은 단계일 경우 이벤트를 publish 하지 않습니다.
참고
'프로젝트 > 소프트웨어 마에스트로' 카테고리의 다른 글
[소프트웨어 마에스트로] - 백엔드 아키텍처 설계 (0) | 2024.01.12 |
---|---|
[소프트웨어 마에스트로] - 프로젝트 개요 (1) | 2024.01.08 |