반응형

CreateTest

public class CreateTest {
    @Test
    public void createMono() {
        Mono<String> mono = Mono.just("Hello World");
        // Mono<String> mono = Mono.defer(() -> Mono.just("Hello World"));
        // Mono<String> mono = Mono.fromSupplier(() -> "Hello World"); // CheckedException를 try/catch로 묶어 처리해야함
        // Mono<String> mono = Mono.fromCallable(() -> "Hello World"); // CheckedException를 try/catch로 묶지 않고 그냥 사용 가능함
        // Mono<String> mono = Mono.create(sink -> sink.success("Hello World"));

        StepVerifier.create(mono)
            .expectNext("Hello World")
            .verifyComplete();
    }

    @Test
    public void createFlux() {
        Flux<Integer> flux = Flux.just(1, 2, 3);
        // Flux<Integer> flux = Flux.range(1, 3);
        // Flux<Integer> flux = Flux.fromArray(new Integer[]{1, 2, 3});
        // Flux<Integer> flux = Flux.fromIterable(Arrays.asList(1, 2, 3));
        // Flux<Integer> flux = Flux.fromStream(Arrays.asList(1, 2, 3).stream());
        // Flux<Integer> flux = Flux.fromStream(() -> Arrays.asList(1, 2, 3).stream());
        // Flux<Integer> flux = Flux.create(sink -> {
        //     sink.next(1);
        //     sink.next(2);
        //     sink.next(3);
        //     sink.complete();
        // });

        StepVerifier.create(flux)
            .expectNext(1, 2, 3)
            .verifyComplete();
    }
}

FilterTest

public class FilterTest {
    @Test
    public void filter() {
        Flux<String> flux = Flux.just("Hello", "World")
            .filter(value -> value.startsWith("W"));

        StepVerifier.create(flux)
            .expectNext("World")
            .verifyComplete();
    }

    @Test
    public void first() {
        Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth").delaySubscription(Duration.ofMillis(100));
        Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");

        Flux<String> firstFlux = Flux.firstWithValue(slowFlux, fastFlux);

        StepVerifier.create(firstFlux)
            .expectNext("hare")
            .expectNext("cheetah")
            .expectNext("squirrel")
            .verifyComplete();
    }

    @Test
    public void take() {
        Flux<Integer> flux = Flux.range(1, 5).take(2);

        StepVerifier.create(flux)
            .expectNext(1, 2)
            .verifyComplete();
    }

    @Test
    public void take2() {
        Flux<Integer> flux = Flux.range(1, 5)
            .delayElements(Duration.ofSeconds(1))
            .take(Duration.ofMillis(3500));

        StepVerifier.create(flux)
            .expectNext(1, 2, 3)
            .verifyComplete();
    }

    @Test
    public void skip() {
        Flux<Integer> flux = Flux.range(1, 5).skip(2);

        StepVerifier.create(flux)
            .expectNext(3, 4, 5)
            .verifyComplete();
    }

    @Test
    public void skip2() {
        Flux<Integer> flux = Flux.range(1, 5)
            .delayElements(Duration.ofSeconds(1))
            .skip(Duration.ofSeconds(3));

        StepVerifier.create(flux)
            .expectNext(3, 4, 5)
            .verifyComplete();
    }
}

TransformTest

public class TransformTest {
    @Test
    public void map() {
        Flux<String> flux = Flux.just("Hello", "World")
            .map(value -> value + "2");

        StepVerifier.create(flux)
            .expectNext("Hello2", "World2")
            .verifyComplete();
    }

    @Test
    public void flatMap() {
        Flux<String> flux = Flux.just("Hello", "World")
            .flatMap(value -> Mono.just(value + "2"));

        StepVerifier.create(flux)
            .expectNext("Hello2", "World2")
            .verifyComplete();
    }

    @Test
    public void distinct() {
        Flux<String> flux = Flux.just("dog", "cat", "bird", "dog").distinct();

        StepVerifier.create(flux)
            .expectNext("dog", "cat", "bird")
            .verifyComplete();
    }

    @Test
    public void buffer() {
        Flux<String> flux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");
        Flux<List<String>> bufferedFlux = flux.buffer(3);

        StepVerifier.create(bufferedFlux)
            .expectNext(Arrays.asList("apple", "orange", "banana"))
            .expectNext(Arrays.asList("kiwi", "strawberry"))
            .verifyComplete();
    }

    @Test
    public void collectList() {
        Flux<String> flux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");
        Mono<List<String>> mono = flux.collectList();

        StepVerifier.create(mono)
            .expectNext(Arrays.asList("apple", "orange", "banana", "kiwi", "strawberry"))
            .verifyComplete();
    }

    @Test
    public void collectMap() {
        Flux<String> flux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
        Mono<Map<Character, String>> mono = flux.collectMap(s -> s.charAt(0));

        StepVerifier.create(mono)
            .expectNextMatches(map ->
                map.size() == 3
                    && map.get('a').equals("aardvark")
                    && map.get('e').equals("eagle")
                    && map.get('k').equals("kangaroo")
            )
            .verifyComplete();
    }

    @Test
    public void all() {
        Flux<String> flux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
        Mono<Boolean> mono = flux.all(s -> s.contains("a"));

        StepVerifier.create(mono)
            .expectNext(true)
            .verifyComplete();
    }

    @Test
    public void any() {
        Flux<String> flux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
        Mono<Boolean> mono = flux.any(s -> s.contains("t"));

        StepVerifier.create(mono)
            .expectNext(true)
            .verifyComplete();
    }
}

CombineTest

public class CombineTest {
    @Test
    public void merge() {
        Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(700));
        Flux<String> flux2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(300));

        // flux1, flux2를 동시에 구독&처리
        // flux1, flux2의 순서에 상관 없이 먼저 끝나는 데이터부터 병합하는 방식
        // Publisher를 동시에 실행하고, 그 결과 병합할 데이터 순서가 중요하지 않을 경우 유용
        // 가장 마지막에 끝나는 C 시간에 맞춰 총 처리시간은 대략 2100ms
        // Flux<String> flux3 = flux1.mergeWith(flux2);
        Flux<String> flux3 = Flux.merge(flux1, flux2);

        StepVerifier.create(flux3)
            .expectNext("X") // 300ms
            .expectNext("Y") // 600ms
            .expectNext("A") // 700ms
            .expectNext("Z") // 900ms
            .expectNext("B") // 1400ms
            .expectNext("C") // 2100ms
            .verifyComplete();
    }

    @Test
    public void mergeSequential() {
        Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(700));
        Flux<String> flux2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(300));

        // flux1, flux2를 동시에 구독&처리
        // flux1의 마지막 데이터 이후에 flux2의 첫 데이터부터 병합하는 방식
        // Publisher를 동시에 실행하고, 그 결과 병합할 데이터는 순서가 보장되어야할 경우 유용
        // 가장 마지막에 끝나는 C 시간에 맞춰 총 처리시간은 대략 2100ms
        Flux<String> flux3 = Flux.mergeSequential(flux1, flux2);

        StepVerifier.create(flux3.log())
            .expectNext("A") // 700ms
            .expectNext("B") // 1400ms
            .expectNext("C") // 2100ms
            .expectNext("X") // 300ms
            .expectNext("Y") // 600ms
            .expectNext("Z") // 900ms
            .verifyComplete();
    }

    @Test
    public void concat() {
        Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(700));
        Flux<String> flux2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(300));

        // flux1을 구독&처리한 이후에 flux2를 구독&처리
        // flux1을 처리한 이후 순서대로 flux2를 처리하여 합치는 방식
        // Publisher도 순서대로 실행되어야 하고, 그 결과 병합할 데이터도 순서가 보장되어야할 경우 유용
        // 가장 마지막에 끝나는 Z 시간에 맞춰 총 처리시간은 3000ms
        // Flux<String> flux3 = flux1.concatWith(flux2);
        Flux<String> flux3 = Flux.concat(flux1, flux2);

        StepVerifier.create(flux3.log())
            .expectNext("A") // 700ms
            .expectNext("B") // 1400ms
            .expectNext("C") // 2100ms
            .expectNext("X") // 2400ms
            .expectNext("Y") // 2700ms
            .expectNext("Z") // 3000ms
            .verifyComplete();
    }

    @Test
    public void zip() {
        Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(700));
        Flux<String> flux2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(300));

        // flux1, flux2를 동시에 구독&처리
        // Flux<Tuple2<String, String>> flux3 = flux1.zipWith(flux2);
        Flux<Tuple2<String, String>> flux3 = Flux.zip(flux1, flux2);

        StepVerifier.create(flux3.map(tuple -> tuple.getT1() + tuple.getT2()).log())
            .expectNext("AX") // A : 700ms, X : 300ms -> 시간이 더 오래 걸린 A 시간에 맞춰 700ms
            .expectNext("BY") // B : 1400ms, X : 600ms -> 시간이 더 오래 걸린 B 시간에 맞춰 1400ms
            .expectNext("CZ") // C : 2100ms, Z : 900ms -> 시간이 더 오래 걸린 C 시간에 맞춰 2100ms
            .verifyComplete();
    }

    @Test
    public void zip2() {
        Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(700));
        Flux<String> flux2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofMillis(300));

        Flux<String> flux3 = flux1.zipWith(flux2, (s1, s2) -> s1 + s2);

        StepVerifier.create(flux3)
            .expectNext("AX", "BY", "CZ")
            .verifyComplete();
    }
}

SchedulerTest

  • Schedulers.newSingle("TEST")
    • 새 스레드 생성하여 실행
  • Schedulers.immediate()
    • 현재 스레드로 실행
  • Schedulers.single()
    • 단일의 재사용 가능한 스레드를 활용하여 실행.
    • 호출시마다 새로운 스레드를 원하면 Schedulers.newSingle()을 사용해야함.
  • Schedulers.parallel()
    • CPU 코어 수 만큼 워커 스레드를 생성하여 실행(Fixed Thread Pool).
    • 생명주기가 짧은 태스크일 경우 사용.
  • Schedulers.elastic()
    • 필요한 만큼 새 스레드풀을 생성하고, 놀고 있는 스레드를 재사용하여 실행.
    • 너무 오래동안(기본값 60초) 놀고 있는 스레드풀은 폐기된다.
    • 생명주기가 긴 태스크일 경우 사용.
  • Schedulers.boundedElastic()
    • Schedulers.elastic()와 동일.
    • 스레드풀 개수가 어느정도 정해져있다는 점이 다르다.
    • 생명주기가 긴 태스크일 경우 사용.
  • Schedulers.fromExecutorService(Executors.newFixedThreadPool(10))
    • 지정된 Executor를 활용하여 실행
public class SchedulerTest {
    @Test
    public void test1() {
        Flux<Integer> flux = Flux.just(1, 2, 3)
            .publishOn(Schedulers.newSingle("PUB")) // onNext(), onComplete()를 지정된 스레드에서 실행
            .log()
            .map(value -> value * 2)
            .subscribeOn(Schedulers.newSingle("SUB")) // onSubscribe(), request()를 지정된 스레드에서 실행
            .log();

        StepVerifier.create(flux)
            .expectNext(2, 4, 6)
            .verifyComplete();
    }

    @Test
    public void test2() {
        Flux<Integer> flux = Flux.just(1, 2, 3)
            .map(value -> value * 2)
            .subscribeOn(Schedulers.single());

        StepVerifier.create(flux.log())
            .expectNext(2, 4, 6)
            .verifyComplete();
    }

    @Test
    public void test3() {
        ParallelFlux<Integer> flux = Flux.range(1, 3)
            .parallel()
            .runOn(Schedulers.parallel())
            .map(value -> value * 2);

        StepVerifier.create(flux.log())
            .expectNextCount(3)
            .verifyComplete();
    }
}

ColdVsHotTest

  • Cold
    • subscribe 하지 않으면 아무 일도 일어나지 않는다.
    • subscribe 할 때마다 매번 독립적인 데이터를 생성하고 동작한다.
  • Hot
    • subscribe 하기 전에 데이터를 생성할 수 있다.
    • subscribe 하기 시작한 시점의 데이터부터 활용할 수 있다. (이전 데이터는 활용 불가)
public class ColdVsHotTest {
    @SneakyThrows
    @Test
    public void cold() {
        Flux<String> flux = Flux.just("A", "B", "C", "D")
            .delayElements(Duration.ofSeconds(1));

        flux.subscribe(value -> System.out.println("subscriber1 : " + value));

        Thread.sleep(2000);

        flux.subscribe(value -> System.out.println("subscriber2 : " + value));

        Thread.sleep(2000);

        /*
            subscriber1 : A
            subscriber1 : B
            subscriber2 : A
            subscriber1 : C
            subscriber2 : B
         */
    }

    @SneakyThrows
    @Test
    public void hot() {
        Flux<String> flux = Flux.just("A", "B", "C", "D")
            .delayElements(Duration.ofSeconds(1));

        ConnectableFlux<String> connectableFlux = flux.publish();
        connectableFlux.connect();

        connectableFlux.subscribe(value -> System.out.println("subscriber1 : " + value));

        Thread.sleep(2000);

        connectableFlux.subscribe(value -> System.out.println("subscriber2 : " + value));

        Thread.sleep(2000);

        /*
            subscriber1 : A
            subscriber1 : B
            subscriber2 : B
            subscriber1 : C
            subscriber2 : C
         */
    }

    @Test
    public void cold2() {
        Mono<String> name = Mono.create(sink -> sink.success(getName()));

        name.subscribe(System.out::println);
        name.subscribe(System.out::println);
        name.subscribe(System.out::println);

        /*
            getName
            john
            getName
            john
            getName
            john
         */
    }

    @Test
    public void hot2() {
        Mono<String> name = Mono.just(getName());

        name.subscribe(System.out::println);
        name.subscribe(System.out::println);
        name.subscribe(System.out::println);

        /*
            getName
            john
            john
            john
         */
    }

    @SneakyThrows
    private String getName() {
        System.out.println("getName");
        Thread.sleep(1000);
        return "john";
    }
}

참고

반응형

'Development > WebFlux' 카테고리의 다른 글

[WebFlux] Spring Data Reactive  (0) 2021.07.17
[WebFlux] WebClient  (0) 2021.07.17
[WebFlux] Test  (0) 2021.07.17
[WebFlux] Controller  (0) 2021.07.17
[WebFlux] Introduction  (0) 2021.07.13

+ Recent posts