가장 많이 나오는 용어가 바로 Non-Blocking으로, Non-Blocking은 리액티브 프로그래밍의 핵심적인 특징이며, Reactor 역시 완전한 Non-Blocking 통신을 지원함
Reactor는 Publisher 타입으로 Mono[0|1]와 Flux[N]이라는 두 가지 타입을 제공
Mono[0|1]에서 0과 1의 의미는 0건 또는 1건의 데이터를 emit 할 수 있음을 의미함
Flux[N]에서 N의 의미는 여러 건의 데이터를 emit할 수 있음을 의미함
서비스들 간의 통신이 잦은 MSA(Microservice Architecture) 기반 애플리케이션들은 요청 스레드가 차단되는 Blocking 통신을 사용하기에는 무리가 있으므로, 기본적으로 Non-Blocking 통신을 완벽하게 지원하는 Reactor는 MSA 구조에 적합한 라이브러리라고 볼 수 있음
Backpressure란 Subscriber의 처리 속도가 Publihser의 emit 속도를 따라가지 못할 때 적절하게 제어하는 전략을 의미함
Publisher에서 끊임없이 들어오는 데이터를 emit하는 것과 달리 Subscriber의 처리 속도가 느리면 오버플로우가 발생하게 되고 급기야는 시스템이 다운될 수 있음
(1)은 Reactor Sequence의 시작점으로, (1)에서 Flux로 시작한다는 것은 Reactor Sequence가 여러 건의 데이터를 처리함을 의미함
(2) just() Operator는 원본 데이터 소스로부터(Original Data Source) 데이터를 emit하는 Publisher의 역할을 함
(3) map() Operator는 Publisher로부터 전달받은 데이터를 가공하는 Operator로, (3)에서는 just() Operator에서 emit 된 영문 문자열을 대문자로 변환하고 있음
Reactor에서는 map() Operator처럼 데이터를 가공 처리할 수 있는 수많은 Operator를 지원함
(4) publishOn() Operator는 Reactor Sequence에서 스레드 관리자 역할을 하는 Scheduler를 지정하는 Operator
(4)와 같이 publishOn() Operator에 Scheduler를 지정하면 publishOn()을 기준으로 Downstream의 실행 스레드가 Scheduler에서 지정한 유형의 스레드로 변경함
위 코드에서는 Reactor Sequence 상에서 두 개의 스레드가 실행됨
subscribe()는 파라미터로 총 세 개의 람다 표현식을 가지는데 (5)의 첫 번째 파라미터는 Publisher가 emit한 데이터를 전달받아서 처리하는 역할을 함
(6)의 두 번째 파라미터는 Reqctor Sequence 상에서 에러가 발생할 경우, 해당 에러를 전달받아서 처리하는 역할을 함
(7)의 세 번째 파라미터는 Reactor Sequence가 종료된 후 어떤 후 처리를 하는 역할을 함
출력 결과를 보면 두 개의 문자열이 map() Operator를 거쳐 대문자로 변환된 후, subscribe()의 첫 번째 람다 표현식으로 전달되어, 출력됨
“# onComplete” 문자열이 출력되었다는 것은 Reactor의 Sequence가 정상적으로 종료되었다는 의미함
subscribe()의 세 번째 파라미터인 람다 표현식은 Reactor Sequence가 정상적으로 종료되면 동작을 수행한다는 사실을 알 수 있음
Reactor Sequence에 Scheduler를 지정하면 main 스레드 이외에 별도의 스레드가 하나 더 생기고, Reactor에서 Scheduler로 지정한 스레드는 모두 데몬 스레드이기 때문에 주 스레드인 main 스레드가 종료되면 동시에 종료됨
main 스레드를 Thread.sleep(100L)을 통해 0.1초 정도 동작을 지연시키면 그 0.1초 사이에 Scheduler로 지정한 데몬 스레드를 통해 Reactor Sequence가 정상 동작을 하게 됨
마블 다이어그램(Marble Diagram)
마블 다이어그램이란?
마블(Marble)은 실제로 구슬이라는 뜻으로, 구슬 모양의 알록달록한 동그라미는 하나의 데이터를 의미하며, 다이어그램 상에서 시간의 흐름에 따라 변화하는 데이터의 흐름을 표현함
리액티브 프로그래밍에서는 어떤 Operator를 사용하느냐에 따라서 데이터의 흐름이 다양하게 변화할 수 있으며, 이러한 복잡한 데이터의 흐름을 마블 다이어그램을 통해 좀 더 쉽게 이해할 수 있음
Reactor에서 마블 다이어그램을 보는 제일 중요한 이유는 Reactor에서 제공하는 수많은 Operator의 내부 동작을 조금 더 쉽게 이해하고, 해당 Operator를 적절하게 사용하기 위함
마블 다이어그램(Marble Diagram)으로 Mono와 Flux 이해하기
Mono의 마블 다이어그램
마블 다이어그램에는 아래위로 두 개의 타임 라인이 있는데 모두 데이터가 흘러가는 시간의 흐름을 표현하고 있으며, 시간은 왼쪽에서 오른쪽으로 흘러가기 때문에 시간 상으로는 왼쪽이 빠른 시간을 의미함
(1)은 원본 Mono(Original Mono)에서 Sequence가 시작되는 것을 타임라인으로 표현한 것
(2)는 Mono의 Sequence에서 데이터가 emit 되는 것을 표현하고 있음
그림 상에서 구슬 모양의 데이터 하나가 emit되는 것을 확인할 수 있는데, 단순히 그냥 구슬 모양의 데이터 하나만 표시한 게 아니라 Mono는 0건 또는 1건의 데이터만 emit하는 Reactor 타입이기 때문에 마블 다이어그램에서 이를 표현하고 있는 것
(3)의 수직 막대 바는 Mono의 Sequence가 정상 종료됨을 의미함
(4)는 Mono에서 지원하는 어떤 Operator에서 입력으로 들어오는 구슬 모양의 데이터를 가공 처리되는 것을 표현하고 있음
(5)는 Operator에서 가공 처리된 데이터가 Downstream으로 전달될 때의 타임라인이며, 시간의 흐름은 위쪽에 있는 타임라인과 마찬가지로 왼쪽이 빠른 시간
만약 Mono에서 emit 된 데이터가 처리되는 과정에 에러가 발생한다면 (6)과 같이 ‘X’로 표시함
‘|’는 정상 종료, ‘X’는 에러로 인한 비정상 종료를 의미함
Flux의 마블 다이어그램
Flux의 마블 다이어그램은 앞에서 살펴본 Mono의 마블 다이어그램과 큰 차이가 없으나 (2)와 같이 emit되는 데이터가 Mono와는 달리 구슬이 하나가 아니라 여러 개의 구슬이 있음
Mono가 0 또는 1개의 데이터만 emit하는 것과는 달리 Flux는 여러 개(0 … N)의 데이터를 emit하는 Reactor 타입임을 표현하는 것
Operator의 마블 다이어그램 예시
map() Operator는 입력으로 들어오는 데이터를 개발자가 구현하는 동작대로 변환해서 Downstream으로 전달하는 역할을 함
색깔을 가진 동그라미 데이터를 동일한 색의 네모로 변환해서 Downstream으로 전달하고 있음
// map() Operator의 사용 예
import reactor.core.publisher.Flux;
public class MarbleDiagramExample {
public static void main(String[] args) {
Flux
.just("Green-Circle", "Orange-Circle", "Blue-Circle") // (1)
.map(figure -> figure.replace("Circle", "Rectangle")) // (2)
.subscribe(System.out::println); // (3)
}
}
(1)에서 세 개의 문자열(색깔을 가지는 동그라미를 문자열로 표현)을 emit함
(2)를 보면 map() Operator 내부에서 동그라미를 네모로 바꾸고 있음
map() Operator 내부에서 변환된 문자열을 (3)에서 출력함
스케줄러(Scheduler)
스케줄러란?
Scheduler는 한마디로 Reactor Sequence 상에서 처리되는 동작들을 하나 이상의 스레드에서 동작하도록 별도의 스레드를 제공해준다고 할 수 있음
Reactor는 기본적으로 Non-Blocking 통신을 위한 비동기프로그래밍을 위해 탄생했기 때문에 여러 스레드를 손쉽게 관리해 주는 Scheduler는 Reactor에서 중요한 역할을 함
Java의 멀티스레딩 프로그래밍은 굉장히 복잡하데 Reactor의 Scheduler를 사용하면 멀티스레딩에서 발생할 수 있는 여러 문제점들에 대비하기 위한 복잡한 처리를 직접 할 필요가 없음
Reactor의 Scheduler는 복잡한 멀티스레딩 프로세스를 단순하게 해 줌
Scheduler 전용 Operator
Reactor에서는 Scheduler를 위한 별도의 Operator를 제공함
적절한 상황에 맞는 스레드를 추가로 생성하는 Operator
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* Scheduler를 추가하지 않을 경우
*/
@Slf4j
public class SchedulersExample01 {
public static void main(String[] args) {
Flux
.range(1, 10)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(data -> log.info("# onNext: {}", data));
}
}
range() Operator를 이용해 1부터 10개의 숫자를 emit한 후, emit 된 숫자 데이터 중에서 filter() Opertor를 이용해 짝수만 필터링 한 뒤, 필터링된 데이터를 map() Operator에서 2를 곱해서 Subscriber에게 전달하는 간단한 예제
Scheduler를 지정하지 않았기 때문에 실행 결과를 보면 main 스레드에서 실행이 되는 것을 확인할 수 있음
subscribeOn() Operator
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
* subscribeOn() Operator를 이용해서 Scheduler를 추가할 경우
*/
@Slf4j
public class SchedulersExample02 {
public static void main(String[] args) throws InterruptedException {
Flux
.range(1, 10)
.doOnSubscribe(subscription -> log.info("# doOnSubscribe")) // (1)
.subscribeOn(Schedulers.boundedElastic()) // (2)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
subscribeOn()이라는 Operator를 추가해서 스레드를 하나 더 생성함
(2)와 같이 subscribeOn() Operator 내부에 Schedulers.boundedElastic() 같은 Scheduler를 지정하면 구독 직후에 실행되는 스레드가 main 스레드에서 Scheduler로 지정한 스레드로 바뀌게 됨
subscribeOn()의 경우, 구독 직후 실행되는 Operator 체인의 실행 스레드를 Scheduler에서 지정한 스레드로 변경함
doOnSubscribe() Operator는 구독 발생 직후에 트리거 되는 Operator로써 구독 직후에 실행되는 스레드와 동일한 스레드에서 실행됨
만약 구독 직후에 어떤 동작을 수행하고 싶다면 doOnSubscribe()에 로직을 작성하면 됨