如何在使用事件流体的同时从 webflux 中提取标头?

How to extract header from webflux while consuming the event stream body?

提问人:Kartik 提问时间:11/7/2023 更新时间:11/8/2023 访问量:57

问:

我正在使用 WebClient 来使用无限的事件流。这段代码在测试中似乎工作正常:

webClient.get()
    .uri("/events")
    .accept(MediaType.TEXT_EVENT_STREAM)
    .retrieve()
    .bodyToFlux(EventStreamBatch::class.java)
    .retry()
    .subscribe(
        { batches ->
            eventsConsumer.accept(batches.events.map { it.toInternalEvent() })
        },
        { error ->
            log.error(error)
        }
    )

现在,我还想从响应中提取一个标头,在使用事件时需要使用该标头。我试图更改为这样我就可以同时获得标题和正文。但是我很困惑如何使用正文流,同时获取标头值。bodyToFlux()toEntityFlux()

...
    .retrieve()
    .toEntityFlux(EventStreamBatch::class.java)
    .subscribe { entity: ResponseEntity<Flux<EventStreamBatch>> ->

        val header = entity.headers["my-header"]?.firstOrNull()

        // How to consume the `entity.body` which is of type Flux<EventStreamBatch> ?
        // Create another `.subscribe()` here? Nested subscribe() seems incorrect, there must be a better way.
    }
spring-boot kotlin 事件 spring-webflux spring-webclient

评论


答:

0赞 Kartik 11/8/2023 #1

不确定这是否是一个好的解决方案,但我最终创建了一对主体和接头的通量,使用:flatMapMany()

webClient.get()
    .uri("/events")
    .accept(MediaType.TEXT_EVENT_STREAM)
    .retrieve()
    .toEntityFlux(EventStreamBatch::class.java)
    .flatMapMany { entity ->
        val header = entity.headers["my-header"]?.firstOrNull()
        return@flatMapMany when (entity.body) {
            null -> Flux.empty<Pair<EventStreamBatch, String>>()
            else -> entity.body!!.map { Pair(it, header) }
        }
    }
    .subscribe { (batch, header) ->
        // have both the body and the header here now
    }