SSE를 이용한 간단한 채팅 프로그램의 원리 알아보기
MongoDB(NoSQL)를 이용한 이유
User와 Board 테이블이 있다고 했을때, RDBMS는 Board 테이블에서 한가지를 select하려면, 글을 작성한 User가 있기 때문에 두 가지의 테이블에 모두 접근해야한다.
NoSQL은 컬렉션(List) 형식이기 때문에 중복이 허용되어 한 가지 테이블에만 접근하면 되기 떄문에 데이터를 찾을 때 접근성이 좋다. 하지만 한가지의 데이터를 수정하려면 다른 곳에 있는 데이터까지 수정해야하기 때문에 일관성을 유지하기 힘들다는 단점이 있다.
Chat 도큐먼트와 레포지토리
@Data
@Document(collection = "chat")
public class Chat {
@Id
private String id;
private String msg;
private String sender;
private String receiver;
private Integer roomNum;
private LocalDateTime createdAt;
}
@Document(collection = “chat”)
Spring Data MongoDB의 어노테이션으로, 이 클래스가 MongoDB의 도큐먼트를 나타낸다는 것을 명시한다. collection = "chat"은 이 도큐먼트가 MongoDB의 chat 컬렉션에 저장된다는 것을 의미한다.
@Id
Spring Data MongoDB의 어노테이션으로, 해당 필드가 도큐먼트의 Primary Key임을 나타낸다.
@Repository
public interface ChatRepository extends ReactiveMongoRepository<Chat, String> {
@Tailable
@Query("{sender:?0, receiver:?1}")
Flux<Chat> mFindBySender(String sender, String receiver);
@Tailable
@Query("{roomNum:?0}")
Flux<Chat> mFindByRoomNum(Integer roomNum);
}
@Tailable
Spring Data MongoDB의 어노테이션으로, MongoDB의 tailable cursor 기능을 사용하여 커서를 닫지 않고 계속 유지한다. 즉, 새로운 데이터가 추가될 때마다 계속해서 데이터를 스트리밍할 수 있다. 예를 들어, 위의 Repository에서 @Tailable 어노테이션을 통하여 ssar라는 쿼리를 요청을 하면, sender가 ssar인 데이터는 Controller로 계속 전달된다.
@Query
MongoDB 쿼리를 정의할 수 있는 Spring Data MongoDB의 어노테이션이다. 이 어노테이션을 통해 커스터마이즈된 쿼리를 작성할 수 있다.
ReactiveMongoRepository<Chat, String>
Spring Data MongoDB의 인터페이스로, Reactive 프로그래밍 모델을 사용하여 MongoDB와의 비동기적인 CRUD 연산을 제공한다.
mFindBySender(String sender, String receiver)
sender와 receiver 필드를 기준으로 채팅 메시지를 찾는다. @Tailable 어어노테이션을 통해 tailable cursor를 사용하여 실시간으로 메시지를 스트리밍한다.
mFindByRoomNum(Integer roomNum)
roomNum 필드를 기준으로 채팅 메시지를 찾는다. @Tailable 어노테이션을 통해 tailable cursor를 사용하여 실시간으로 메시지를 스트리밍한다.
여기서 중요한 것은, 만약 http프로토콜을 사용하게 된다면 Flux를 통해 흘러들어오는 데이터들이 동시에 한번에 처리된 뒤 응답이 끊어지기 때문에 사용할 수 없다. 따라서 SSE프로토콜을 사용한다. SSE프로토콜은 클라이언트가 서버로 요청을 하게 되면, 데이터를 응답을 한 뒤 요청이 끊어지지만 응답은 유지하면서 지속적으로 데이터를 흘러보낼 수 있다.
ChatController
@RequiredArgsConstructor
@RestController
public class ChatController {
private final ChatRepository chatRepository;
@CrossOrigin
@GetMapping(value = "/sender/{sender}/receiver/{receiver}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Chat> getMsg(@PathVariable String sender, @PathVariable String receiver){
return chatRepository.mFindBySender(sender, receiver)
.subscribeOn(Schedulers.boundedElastic());
}
@CrossOrigin
@PostMapping("/chat")
public Mono<Chat> setMsg(@RequestBody Chat chat){
chat.setCreatedAt(LocalDateTime.now());
return chatRepository.save(chat);
}
@CrossOrigin
@GetMapping(value = "/chat/roomNum/{roomNum}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Chat> findByRoomNum(@PathVariable Integer roomNum){
return chatRepository.mFindByRoomNum(roomNum)
.subscribeOn(Schedulers.boundedElastic());
}
}
getMsg(String sender, String receiver)
@CrossOrigin
- CORS(Cross-Origin Resource Sharing) 설정을 통해 다른 도메인에서의 요청을 허용한다.
@GetMapping
- value 속성으로 엔드포인트 URL을 정의하고, produces 속성으로 응답 형식을 지정한다. 여기서는
MediaType.TEXT_EVENT_STREAM_VALUE로 SSE(Server-Sent Events)를 사용하여 실시간 데이터 스트리밍을 설정한다.
메서드 내용
-
chatRepository.mFindBySender(sender, receiver)를 호출하여 특정 발신자와 수신자 간의 메시지를 실시간 스트리밍한다.
-
subscribeOn(Schedulers.boundedElastic())를 사용하여 작업을 별도의 스레드 풀에서 비동기로 처리한다.
setMsg(Chat chat)
@PostMapping
- value 속성으로 엔드포인트 URL을 정의한다.
메서드 내용
-
요청 본문으로 받은 Chat 객체에 현재 시간을 설정한다.
-
chatRepository.save(chat)를 호출하여 새로운 채팅 메시지를 데이터베이스에 저장한다.
findByRoomNum(Integer roomNum)
@GetMapping
- value 속성으로 엔드포인트 URL을 정의하고, produces 속성으로 응답 형식을 지정한다. 여기서도 마찬가지로
MediaType.TEXT_EVENT_STREAM_VALUE로 SSE를 사용한다.
메서드 내용
-
chatRepository.mFindByRoomNum(roomNum)를 호출하여 특정 채팅 방의 메시지를 실시간 스트리밍한다.
-
subscribeOn(Schedulers.boundedElastic())를 사용하여 작업을 별도의 스레드 풀에서 비동기로 처리한다.
아래는 이 프로그램의 결과물이다
