반응형
CreateTest
public class CreateTest {
@Test
public void createMono() {
Mono<String> mono = Mono.just("Hello World");
// Mono<String> mono = Mono.defer(() -> Mono.just("Hello World"));
// Mono<String> mono = Mono.fromSupplier(() -> "Hello World"); // CheckedException를 try/catch로 묶어 처리해야함
// Mono<String> mono = Mono.fromCallable(() -> "Hello World"); // CheckedException를 try/catch로 묶지 않고 그냥 사용 가능함
// Mono<String> mono = Mono.create(sink -> sink.success("Hello World"));
StepVerifier.create(mono)
.expectNext("Hello World")
.verifyComplete();
}
@Test
public void createFlux() {
Flux<Integer> flux = Flux.just(1, 2, 3);
// Flux<Integer> flux = Flux.range(1, 3);
// Flux<Integer> flux = Flux.fromArray(new Integer[]{1, 2, 3});
// Flux<Integer> flux = Flux.fromIterable(Arrays.asList(1, 2, 3));
// Flux<Integer> flux = Flux.fromStream(Arrays.asList(1, 2, 3).stream());
// Flux<Integer> flux = Flux.fromStream(() -> Arrays.asList(1, 2, 3).stream());
// Flux<Integer> flux = Flux.create(sink -> {
// sink.next(1);
// sink.next(2);
// sink.next(3);
// sink.complete();
// });
StepVerifier.create(flux)
.expectNext(1, 2, 3)
.verifyComplete();
}
}
FilterTest
public class FilterTest {
@Test
public void filter() {
Flux<String> flux = Flux.just("Hello", "World")
.filter(value -> value.startsWith("W"));
StepVerifier.create(flux)
.expectNext("World")
.verifyComplete();
}
@Test
public void first() {
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth").delaySubscription(Duration.ofMillis(100));
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
Flux<String> firstFlux = Flux.firstWithValue(slowFlux, fastFlux);
StepVerifier.create(firstFlux)
.expectNext("hare")
.expectNext("cheetah")
.expectNext("squirrel")
.verifyComplete();
}
@Test
public void take() {
Flux<Integer> flux = Flux.range(1, 5).take(2);
StepVerifier.create(flux)
.expectNext(1, 2)
.verifyComplete();
}
@Test
public void take2() {
Flux<Integer> flux = Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
StepVerifier.create(flux)
.expectNext(1, 2, 3)
.verifyComplete();
}
@Test
public void skip() {
Flux<Integer> flux = Flux.range(1, 5).skip(2);
StepVerifier.create(flux)
.expectNext(3, 4, 5)
.verifyComplete();
}
@Test
public void skip2() {
Flux<Integer> flux = Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1))
.skip(Duration.ofSeconds(3));
StepVerifier.create(flux)
.expectNext(3, 4, 5)
.verifyComplete();
}
}
TransformTest
public class TransformTest {
@Test
public void map() {
Flux<String> flux = Flux.just("Hello", "World")
.map(value -> value + "2");
StepVerifier.create(flux)
.expectNext("Hello2", "World2")
.verifyComplete();
}
@Test
public void flatMap() {
Flux<String> flux = Flux.just("Hello", "World")
.flatMap(value -> Mono.just(value + "2"));
StepVerifier.create(flux)
.expectNext("Hello2", "World2")
.verifyComplete();
}
@Test
public void distinct() {
Flux<String> flux = Flux.just("dog", "cat", "bird", "dog").distinct();
StepVerifier.create(flux)
.expectNext("dog", "cat", "bird")
.verifyComplete();
}
@Test
public void buffer() {
Flux<String> flux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");
Flux<List<String>> bufferedFlux = flux.buffer(3);
StepVerifier.create(bufferedFlux)
.expectNext(Arrays.asList("apple", "orange", "banana"))
.expectNext(Arrays.asList("kiwi", "strawberry"))
.verifyComplete();
}
@Test
public void collectList() {
Flux<String> flux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");
Mono<List<String>> mono = flux.collectList();
StepVerifier.create(mono)
.expectNext(Arrays.asList("apple", "orange", "banana", "kiwi", "strawberry"))
.verifyComplete();
}
@Test
public void collectMap() {
Flux<String> flux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> mono = flux.collectMap(s -> s.charAt(0));
StepVerifier.create(mono)
.expectNextMatches(map ->
map.size() == 3
&& map.get('a').equals("aardvark")
&& map.get('e').equals("eagle")
&& map.get('k').equals("kangaroo")
)
.verifyComplete();
}
@Test
public void all() {
Flux<String> flux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> mono = flux.all(s -> s.contains("a"));
StepVerifier.create(mono)
.expectNext(true)
.verifyComplete();
}
@Test
public void any() {
Flux<String> flux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> mono = flux.any(s -> s.contains("t"));
StepVerifier.create(mono)
.expectNext(true)
.verifyComplete();
}
}
CombineTest
public class CombineTest {
@Test
public void merge() {
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(700));
Flux<String> flux2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(300));
// flux1, flux2를 동시에 구독&처리
// flux1, flux2의 순서에 상관 없이 먼저 끝나는 데이터부터 병합하는 방식
// Publisher를 동시에 실행하고, 그 결과 병합할 데이터 순서가 중요하지 않을 경우 유용
// 가장 마지막에 끝나는 C 시간에 맞춰 총 처리시간은 대략 2100ms
// Flux<String> flux3 = flux1.mergeWith(flux2);
Flux<String> flux3 = Flux.merge(flux1, flux2);
StepVerifier.create(flux3)
.expectNext("X") // 300ms
.expectNext("Y") // 600ms
.expectNext("A") // 700ms
.expectNext("Z") // 900ms
.expectNext("B") // 1400ms
.expectNext("C") // 2100ms
.verifyComplete();
}
@Test
public void mergeSequential() {
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(700));
Flux<String> flux2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(300));
// flux1, flux2를 동시에 구독&처리
// flux1의 마지막 데이터 이후에 flux2의 첫 데이터부터 병합하는 방식
// Publisher를 동시에 실행하고, 그 결과 병합할 데이터는 순서가 보장되어야할 경우 유용
// 가장 마지막에 끝나는 C 시간에 맞춰 총 처리시간은 대략 2100ms
Flux<String> flux3 = Flux.mergeSequential(flux1, flux2);
StepVerifier.create(flux3.log())
.expectNext("A") // 700ms
.expectNext("B") // 1400ms
.expectNext("C") // 2100ms
.expectNext("X") // 300ms
.expectNext("Y") // 600ms
.expectNext("Z") // 900ms
.verifyComplete();
}
@Test
public void concat() {
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(700));
Flux<String> flux2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(300));
// flux1을 구독&처리한 이후에 flux2를 구독&처리
// flux1을 처리한 이후 순서대로 flux2를 처리하여 합치는 방식
// Publisher도 순서대로 실행되어야 하고, 그 결과 병합할 데이터도 순서가 보장되어야할 경우 유용
// 가장 마지막에 끝나는 Z 시간에 맞춰 총 처리시간은 3000ms
// Flux<String> flux3 = flux1.concatWith(flux2);
Flux<String> flux3 = Flux.concat(flux1, flux2);
StepVerifier.create(flux3.log())
.expectNext("A") // 700ms
.expectNext("B") // 1400ms
.expectNext("C") // 2100ms
.expectNext("X") // 2400ms
.expectNext("Y") // 2700ms
.expectNext("Z") // 3000ms
.verifyComplete();
}
@Test
public void zip() {
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(700));
Flux<String> flux2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(300));
// flux1, flux2를 동시에 구독&처리
// Flux<Tuple2<String, String>> flux3 = flux1.zipWith(flux2);
Flux<Tuple2<String, String>> flux3 = Flux.zip(flux1, flux2);
StepVerifier.create(flux3.map(tuple -> tuple.getT1() + tuple.getT2()).log())
.expectNext("AX") // A : 700ms, X : 300ms -> 시간이 더 오래 걸린 A 시간에 맞춰 700ms
.expectNext("BY") // B : 1400ms, X : 600ms -> 시간이 더 오래 걸린 B 시간에 맞춰 1400ms
.expectNext("CZ") // C : 2100ms, Z : 900ms -> 시간이 더 오래 걸린 C 시간에 맞춰 2100ms
.verifyComplete();
}
@Test
public void zip2() {
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(700));
Flux<String> flux2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(300));
Flux<String> flux3 = flux1.zipWith(flux2, (s1, s2) -> s1 + s2);
StepVerifier.create(flux3)
.expectNext("AX", "BY", "CZ")
.verifyComplete();
}
}
SchedulerTest
- Schedulers.newSingle("TEST")
- 새 스레드 생성하여 실행
- Schedulers.immediate()
- 현재 스레드로 실행
- Schedulers.single()
- 단일의 재사용 가능한 스레드를 활용하여 실행.
- 호출시마다 새로운 스레드를 원하면 Schedulers.newSingle()을 사용해야함.
- Schedulers.parallel()
- CPU 코어 수 만큼 워커 스레드를 생성하여 실행(Fixed Thread Pool).
- 생명주기가 짧은 태스크일 경우 사용.
- Schedulers.elastic()
- 필요한 만큼 새 스레드풀을 생성하고, 놀고 있는 스레드를 재사용하여 실행.
- 너무 오래동안(기본값 60초) 놀고 있는 스레드풀은 폐기된다.
- 생명주기가 긴 태스크일 경우 사용.
- Schedulers.boundedElastic()
- Schedulers.elastic()와 동일.
- 스레드풀 개수가 어느정도 정해져있다는 점이 다르다.
- 생명주기가 긴 태스크일 경우 사용.
- Schedulers.fromExecutorService(Executors.newFixedThreadPool(10))
- 지정된 Executor를 활용하여 실행
public class SchedulerTest {
@Test
public void test1() {
Flux<Integer> flux = Flux.just(1, 2, 3)
.publishOn(Schedulers.newSingle("PUB")) // onNext(), onComplete()를 지정된 스레드에서 실행
.log()
.map(value -> value * 2)
.subscribeOn(Schedulers.newSingle("SUB")) // onSubscribe(), request()를 지정된 스레드에서 실행
.log();
StepVerifier.create(flux)
.expectNext(2, 4, 6)
.verifyComplete();
}
@Test
public void test2() {
Flux<Integer> flux = Flux.just(1, 2, 3)
.map(value -> value * 2)
.subscribeOn(Schedulers.single());
StepVerifier.create(flux.log())
.expectNext(2, 4, 6)
.verifyComplete();
}
@Test
public void test3() {
ParallelFlux<Integer> flux = Flux.range(1, 3)
.parallel()
.runOn(Schedulers.parallel())
.map(value -> value * 2);
StepVerifier.create(flux.log())
.expectNextCount(3)
.verifyComplete();
}
}
ColdVsHotTest
- Cold
- subscribe 하지 않으면 아무 일도 일어나지 않는다.
- subscribe 할 때마다 매번 독립적인 데이터를 생성하고 동작한다.
- Hot
- subscribe 하기 전에 데이터를 생성할 수 있다.
- subscribe 하기 시작한 시점의 데이터부터 활용할 수 있다. (이전 데이터는 활용 불가)
public class ColdVsHotTest {
@SneakyThrows
@Test
public void cold() {
Flux<String> flux = Flux.just("A", "B", "C", "D")
.delayElements(Duration.ofSeconds(1));
flux.subscribe(value -> System.out.println("subscriber1 : " + value));
Thread.sleep(2000);
flux.subscribe(value -> System.out.println("subscriber2 : " + value));
Thread.sleep(2000);
/*
subscriber1 : A
subscriber1 : B
subscriber2 : A
subscriber1 : C
subscriber2 : B
*/
}
@SneakyThrows
@Test
public void hot() {
Flux<String> flux = Flux.just("A", "B", "C", "D")
.delayElements(Duration.ofSeconds(1));
ConnectableFlux<String> connectableFlux = flux.publish();
connectableFlux.connect();
connectableFlux.subscribe(value -> System.out.println("subscriber1 : " + value));
Thread.sleep(2000);
connectableFlux.subscribe(value -> System.out.println("subscriber2 : " + value));
Thread.sleep(2000);
/*
subscriber1 : A
subscriber1 : B
subscriber2 : B
subscriber1 : C
subscriber2 : C
*/
}
@Test
public void cold2() {
Mono<String> name = Mono.create(sink -> sink.success(getName()));
name.subscribe(System.out::println);
name.subscribe(System.out::println);
name.subscribe(System.out::println);
/*
getName
john
getName
john
getName
john
*/
}
@Test
public void hot2() {
Mono<String> name = Mono.just(getName());
name.subscribe(System.out::println);
name.subscribe(System.out::println);
name.subscribe(System.out::println);
/*
getName
john
john
john
*/
}
@SneakyThrows
private String getName() {
System.out.println("getName");
Thread.sleep(1000);
return "john";
}
}
참고
반응형
'Development > WebFlux' 카테고리의 다른 글
[WebFlux] Spring Data Reactive (0) | 2021.07.17 |
---|---|
[WebFlux] WebClient (0) | 2021.07.17 |
[WebFlux] Test (0) | 2021.07.17 |
[WebFlux] Controller (0) | 2021.07.17 |
[WebFlux] Introduction (0) | 2021.07.13 |