반응형

RestClient를 활용한 예제

build.gradle.kts

dependencies {
    // for HttpInterface + RestClient
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.apache.httpcomponents.client5:httpclient5:5.4.1")
}

TodoClient

interface TodoClient {
    @GetExchange("/todos/{id}")
    fun getTodo(@PathVariable id: Long): Todo

    @PostExchange("/posts")
    fun savePosts(@RequestBody request: Post): Post

    data class Todo(
        val userId: Long,
        val id: Long,
        val title: String,
        val completed: Boolean,
    )

    data class Post(
        val title: String,
        val body: String,
        val userId: Long,
    )
}

TodoClientTest

@SpringBootTest
class TodoClientTest {
    @Autowired
    private lateinit var todoClient: TodoClient

    @Test
    fun test() {
        val result1 = todoClient.getTodo(1)
        val result2 = todoClient.savePosts(TodoClient.Post("title", "body", 1))

        assertEquals(1, result1.id)
        assertEquals("title", result2.title)
    }
}

TodoClientConfig

@Configuration
class TodoClientConfig {
    private val log = LoggerFactory.getLogger(this::class.java)

    @Bean
    fun todoClient(objectMapper: ObjectMapper): TodoClient {
        val client = RestClient.builder()
            .baseUrl("https://jsonplaceholder.typicode.com")
            .messageConverters {
                it[0] = MappingJackson2HttpMessageConverter(objectMapper)
            }
            .requestFactory(
                HttpComponentsClientHttpRequestFactory(
                    HttpClientBuilder.create()
                        .setDefaultRequestConfig(
                            RequestConfig.custom()
                                .setConnectionRequestTimeout(Timeout.ofSeconds(5))
                                .setResponseTimeout(Timeout.ofSeconds(30))
                                .build()
                        )
                        .setConnectionManager(
                            PoolingHttpClientConnectionManagerBuilder.create()
                                .setMaxConnPerRoute(30) // 동일한 호스트와의 연결 커넥션 수
                                .setMaxConnTotal(30)    // 전체 연결 커넥션 수
                                .build()
                        )
                        .build()
                )
            )
            .requestInterceptor { request, body, execution ->
                val response = execution.execute(request, body)
                val bodyBytes = response.body.use { it.readBytes() }

                log.info("requestBody : {}", String(body))
                log.info("responseBody : {}", String(bodyBytes))

                object : ClientHttpResponse by response {
                    override fun getBody() = bodyBytes.inputStream()
                }
            }
            .defaultStatusHandler(
                { it.is4xxClientError },
                { request, response -> log.warn("url: {}, status: {}", request.uri, response.statusCode) }
            )
            .build()

        return HttpServiceProxyFactory
            .builderFor(RestClientAdapter.create(client))
            .build()
            .createClient(TodoClient::class.java)
    }
}

WebClient를 활용한 예제

build.gradle.kts

dependencies {
    // for HttpInterface
    implementation("org.springframework.boot:spring-boot-starter-web")

    // for WebClient
    implementation("org.springframework:spring-webflux:6.2.0")
    implementation("io.projectreactor.netty:reactor-netty:1.2.0")
}

TodoClient

  • 위 예제와 동일

TodoClientTest

  • 위 예제와 동일

TodoClientConfig

@Configuration
class TodoClientConfig {
    private val log = LoggerFactory.getLogger(this::class.java)

    @Bean
    fun todoClient(objectMapper: ObjectMapper): TodoClient {
        val client = WebClient.builder()
            .baseUrl("https://jsonplaceholder.typicode.com")
            .clientConnector(
                ReactorClientHttpConnector(
                    HttpClient
                        .create(
                            ConnectionProvider.builder("custom-pool")
                                .maxConnections(200) // 최대 연결 수(동시 접속이 100건 초과될 경우 오류 발생. 충분히 큰 숫자로 설정 필요)
                                .pendingAcquireTimeout(Duration.ofSeconds(5)) // 연결 풀에서 대기 시간
                                .maxIdleTime(Duration.ofSeconds(30)) // 최대 유휴 시간
                                .build()
                        )
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 연결 타임아웃
                        .responseTimeout(Duration.ofSeconds(15)) // 응답 타임아웃
                        .keepAlive(true)
                )
            )
            .exchangeStrategies(
                ExchangeStrategies.builder()
                    .codecs {
                        it.defaultCodecs().jackson2JsonEncoder(Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON))
                        it.defaultCodecs().jackson2JsonDecoder(Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON))
                    }
                    .build()
            )
            .defaultStatusHandler(
                { status -> status.is4xxClientError },
                { response ->
                    log.warn("url: {}, status: {}", response.request().uri, response.statusCode())
                    Mono.empty()
                }
            )
            .build()

        return HttpServiceProxyFactory
            .builderFor(WebClientAdapter.create(client))
            .build()
            .createClient(TodoClient::class.java)
    }
}

WebClient 요청/응답 로깅하기 - Filter

WebClientLoggingFilter

class WebClientLoggingFilter : ExchangeFilterFunction {
    private val log = LoggerFactory.getLogger(this::class.java)

    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
        var requestBody = ""
        var responseBody = ""

        return next
            .exchange(
                ClientRequest.from(request)
                    .body { outputMessage: ClientHttpRequest, context: BodyInserter.Context ->
                        request.body()
                            .insert(object : ClientHttpRequestDecorator(outputMessage) {
                                override fun writeWith(bodyPublisher: Publisher<out DataBuffer>): Mono<Void> {
                                    return super.writeWith(Mono.from(bodyPublisher).doOnNext { buffer ->
                                        requestBody += buffer.toString(Charsets.UTF_8)
                                    })
                                }
                            }, context)
                            .doOnEach {
                                log.info("[REQ] url: {}, method: {}, body: {}", request.url(), request.method(), requestBody)
                            }
                    }
                    .build()
            )
            .map { response ->
                response
                    .mutate().body { body ->
                        body
                            .doOnNext { buffer ->
                                responseBody += buffer.toString(Charsets.UTF_8)
                            }
                            .doOnComplete {
                                log.info("[RES] url: {}, method: {}, status: {}, body: {}", request.url(), request.method(), response.statusCode(), responseBody)
                            }
                    }
                    .build()
            }
    }
}

TodoClientConfig

val client = WebClient.builder()
    // ...
    .filter(WebClientLoggingFilter()) // 추가
    .build()

WebClient 요청/응답 로깅하기 - Handler

WebClientLoggingHandler

class WebClientLoggingHandler : ChannelDuplexHandler() {
    private val log = LoggerFactory.getLogger(this::class.java)

    private lateinit var request: DefaultHttpRequest
    private lateinit var response: DefaultHttpResponse

    private var requestBody = ""
    private var responseBody = ""

    override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
        if (msg is DefaultHttpRequest) {
            request = msg
        }

        if (msg is LastHttpContent) {
            requestBody = msg.content().toString(Charsets.UTF_8)

            val uri = URI(request.uri())
            val requestLog = HttpLog(
                url = uri.path,
                method = request.method().name(),
                status = null,
                headers = request.headers().entries().groupBy({ it.key }, { it.value }),
                params = UriComponentsBuilder.fromUri(uri).build().queryParams,
                body = requestBody,
            )

            log.info("### [REQUEST] {}", requestLog)
        }

        super.write(ctx, msg, promise)
    }

    override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
        if (msg is DefaultHttpResponse) {
            response = msg
        }

        if (msg is DefaultHttpContent) {
            responseBody += msg.content().toString(Charsets.UTF_8)
        }

        if (msg is LastHttpContent) {
            responseBody += msg.content().toString(Charsets.UTF_8)

            val uri = URI(request.uri())
            val responseLog = HttpLog(
                url = uri.path,
                method = request.method().name(),
                status = response.status().code(),
                headers = response.headers().entries().groupBy({ it.key }, { it.value }),
                params = null,
                body = responseBody,
            )

            log.info("### [RESPONSE] {}", responseLog)
        }

        super.channelRead(ctx, msg)
    }

    data class HttpLog(
        val url: String,
        val method: String,
        val status: Int?,
        val headers: Map<String, Collection<String>>?,
        val params: Map<String, Collection<String>>?,
        val body: String,
    )
}

TodoClientConfig

HttpClient
    .create()
    // ...
    .doOnRequest { request, connection -> connection.addHandlerFirst(WebClientLoggingHandler()) } // 추가

WebClient 요청/응답 로깅하기 - Handler + Context

AppContext

data class AppContext(
    val rootGuid: String,
)

AppContextHolder

object AppContextHolder {
    private val threadLocal = ThreadLocal<AppContext>()

    fun set(context: AppContext) {
        threadLocal.set(context)
    }

    fun get(): AppContext {
        return threadLocal.get()
    }

    fun clear() {
        threadLocal.remove()
    }

    fun asContextElement(context: AppContext = threadLocal.get()): ThreadContextElement<AppContext> {
        return threadLocal.asContextElement(context)
    }
}

AppContextFilter

@Component
class AppContextFilter : OncePerRequestFilter() {
    override fun doFilterInternal(request: HttpServletRequest, response: HttpServletResponse, filterChain: FilterChain) {
        try {
            AppContextHolder.set(AppContext(rootGuid = UUID.randomUUID().toString()))
            filterChain.doFilter(request, response)
        } finally {
            AppContextHolder.clear()
        }
    }
}

TaskExecutorConfig

@Configuration
class TaskExecutorConfig {
    @Bean
    fun webClientTaskExecutor(): TaskExecutor {
        val executor = ThreadPoolTaskExecutor()
        executor.setThreadNamePrefix("web-client-task-")
        executor.corePoolSize = Runtime.getRuntime().availableProcessors() * 2
        executor.maxPoolSize = Runtime.getRuntime().availableProcessors() * 2
        executor.setWaitForTasksToCompleteOnShutdown(true)
        executor.setTaskDecorator { runnable ->
            val context = AppContextHolder.get()
            Runnable {
                try {
                    AppContextHolder.set(context)
                    runnable.run()
                } finally {
                    AppContextHolder.clear()
                }
            }
        }
        executor.initialize()
        return executor
    }
}

WebClientLoggingHandler

override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
    val rootGuid = AppContextHolder.get().rootGuid

    // ...
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
    val rootGuid = AppContextHolder.get().rootGuid

    // ...
}

TodoClientConfig

HttpClient
    // ...
    .runOn(NioEventLoopGroup(0, webClientTaskExecutor))

WebClient + Coroutine 조합으로 병렬 요청하기

build.gradle.kts

dependencies {
    // for HttpInterface
    implementation("org.springframework.boot:spring-boot-starter-web")

    // for WebClient
    implementation("org.springframework:spring-webflux:6.2.0")
    implementation("io.projectreactor.netty:reactor-netty:1.2.0")
    
    // for Coroutine
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.9.0")
}

TodoClient

interface TodoClient {
    @GetExchange("/todos/{id}")
    suspend fun getTodo(@PathVariable id: Long): Todo // suspend 추가

    data class Todo(
        val userId: Long,
        val id: Long,
        val title: String,
        val completed: Boolean,
    )
}

TodoService

@Service
class TodoService(
    private val todoClient: TodoClient,
) {
    fun getTodos(): List<TodoClient.Todo> {
        return runBlocking {
                async {
                    val todo1 = async { todoClient.getTodo(1) }
                    val todo2 = async { todoClient.getTodo(2) }
                    val todo3 = async { todoClient.getTodo(3) }
                    listOf(
                        todo1.await(),
                        todo2.await(),
                        todo3.await(),
                    )
                }
                .await()
        }
    }

    fun getTodosV2(): List<TodoClient.Todo> {
        return runBlocking {
                async {
                    (1..3)
                        .map {
                            async { todoClient.getTodo(it.toLong()) }
                        }
                        .awaitAll()
                }
                .await()
        }
    }
}

TodoServiceTest

@SpringBootTest
class TodoServiceTest {
    @Autowired
    private lateinit var todoService: TodoService

    @Test
    fun test() {
        val todos1 = todoService.getTodos()
        val todos2 = todoService.getTodosV2()
        assertEquals(todos1, todos2)
    }
}

참고

반응형

'Development > Spring' 카테고리의 다른 글

[Spring] WebSocket  (1) 2024.12.09
[Spring] Partitioning  (2) 2024.09.13
[Spring] Circuit Breaker(with resilience4j)  (0) 2024.07.15
[Spring] Sharding  (0) 2024.07.12
[Spring] multi-module 프로젝트 구성하기(kotlin, gradle)  (0) 2024.07.07

+ Recent posts