提问人:T. Franz 提问时间:11/13/2023 更新时间:11/13/2023 访问量:11
返回带有 reactive-pg-client 的分页多<>
Return a paginated Multi<> with reactive-pg-client
问:
我有一个问题。 我必须实现一个 Quarkus 服务,该服务使用一些过滤器设置的 REST 请求,这些设置用于从 TimescaleDB/PostgreSQL 请求大型数据集(可能是 >10.000.000 个条目)。
REST 请求只有三个字段(短class_id、OffsetDateTime time_min、OffsetDateTime time_max)。
My Resource 类如下所示:
@Path( "/export" )
public class DataExporterResource {
@Inject
io.vertx.mutiny.pgclient.PgPool client;
@Inject
RequestValidator helper;
private static final Logger LOGGER = Logger.getLogger( DataExporterResource.class );
@Path( "/test-data" )
@POST
@Consumes( MediaType.APPLICATION_JSON )
@Produces( MediaType.APPLICATION_JSON )
@APIResponses( value = { @APIResponse( responseCode = "200", description = "Success", content = @Content( mediaType = "application/json" ) ),
@APIResponse( responseCode = "400", description = "Bad Request", content = @Content( mediaType = "application/json" ) ) } )
public Multi< TestData > exportTestData( TestRequest request ) {
LOGGER.info( "Test Request received" );
LOGGER.debug( request.toString() );
try {
LOGGER.debug( "Validating request" );
helper.validateRequest( request );
ResponseBuilder.create( RestResponse.Status.ACCEPTED );
} catch( ConstraintViolationException e ) {
LOGGER.warn( e.getConstraintViolations() );
ResponseBuilder.create( RestResponse.Status.BAD_REQUEST );
}
TestExporter exporter = new TestExporter( client, request );
Multi< TestData > result = exporter.process();
return result;
}
}
执行数据库查询等操作的 Exporter 类如下所示:
public class TestExporter {
private static final Logger LOGGER = Logger.getLogger( TestExporter.class );
private final TestRequest request;
private PreparedQuery< RowSet< Row > > preparedQueryStatement;
private List< Object > tupleList;
public TestExporter( PgPool client, TestRequest request ) {
this.request = request;
this.tupleList = new ArrayList<>();
this.preparedQueryStatement = client.preparedQuery( preparePreparedStatement() );
}
public Multi< TestData > process() {
LOGGER.info( "Processing Test export request" );
Multi< TestData > result =
this.preparedQueryStatement.execute( Tuple.tuple( tupleList ) ).onItem().transformToMulti( set -> Multi.createFrom().iterable( set ) )
.onItem().transform( TestExporter::from );
return result;
}
private String preparePreparedStatement() {
String selectSql = "SELECT * FROM test WHERE class_id = $1 AND timestamp >= $2 and timestamp <= $3 ORDER BY timestamp ASC;";
tupleList.add( request.class_id );
tupleList.add( request.time_min );
tupleList.add( request.time_max );
return selectSql;
}
private static TestData from( Row row ) {
TestData test = new TestData();
// set values
return test;
}
现在我遇到了一个问题,当我尝试查询大量数据时,内存会上升到很多,并且在某个时候会出现堆空间问题。还出现了一些警告:“(vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-6,5,main] 已被阻止 3533 毫秒,时间限制为 2000 毫秒:io.vertx.core.VertxException:线程被阻止”。
现在我正在寻找实现某种分页的可能性。我的首选方法是通过获取 time_min 和 time_max 值来实现分页,将这个时间范围缩短到例如每小时。每个小时将比要处理的“页面”还要多。
reactive-pg-client 将 Multi 返回给我直接返回给客户端。因此,我的目标是,重新处理每个页面的所有内容,将 Multi 或类似的东西发送回客户端,释放资源并处理下一页(小时)。但我不知道我怎么能实现这样的行为。我在互联网上没有找到这方面的任何内容。
至少我最大的问题是如何重新设计应用程序,以便它使用REST请求信息,查询例如一个小时的数据,将这些数据流式传输回客户端,最好是在需要时释放资源并继续下一个小时(页面)。这样我就不会炸毁堆空间,而且回复更像是一个流,而不是一个巨大的一次性回复。我认为这应该主要影响 TestExporter 类中的进程方法。
当然,我可以轻松地向 REST API 添加一些 LIMIT 和 OFFSET 并让客户端处理此问题,但首先它很慢,尤其是当数据集很大而且也不是很方便时。
如果有人能帮助我,或者至少提出一个更好的解决方案,那就太好了。 谢谢:-)
答: 暂无答案
评论