提问人:Kartik 提问时间:11/7/2023 更新时间:11/8/2023 访问量:57
如何在使用事件流体的同时从 webflux 中提取标头?
How to extract header from webflux while consuming the event stream body?
问:
我正在使用 WebClient 来使用无限的事件流。这段代码在测试中似乎工作正常:
webClient.get()
.uri("/events")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(EventStreamBatch::class.java)
.retry()
.subscribe(
{ batches ->
eventsConsumer.accept(batches.events.map { it.toInternalEvent() })
},
{ error ->
log.error(error)
}
)
现在,我还想从响应中提取一个标头,在使用事件时需要使用该标头。我试图更改为这样我就可以同时获得标题和正文。但是我很困惑如何使用正文流,同时获取标头值。bodyToFlux()
toEntityFlux()
...
.retrieve()
.toEntityFlux(EventStreamBatch::class.java)
.subscribe { entity: ResponseEntity<Flux<EventStreamBatch>> ->
val header = entity.headers["my-header"]?.firstOrNull()
// How to consume the `entity.body` which is of type Flux<EventStreamBatch> ?
// Create another `.subscribe()` here? Nested subscribe() seems incorrect, there must be a better way.
}
答:
0赞
Kartik
11/8/2023
#1
不确定这是否是一个好的解决方案,但我最终创建了一对主体和接头的通量,使用:flatMapMany()
webClient.get()
.uri("/events")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.toEntityFlux(EventStreamBatch::class.java)
.flatMapMany { entity ->
val header = entity.headers["my-header"]?.firstOrNull()
return@flatMapMany when (entity.body) {
null -> Flux.empty<Pair<EventStreamBatch, String>>()
else -> entity.body!!.map { Pair(it, header) }
}
}
.subscribe { (batch, header) ->
// have both the body and the header here now
}
评论