返回带有 reactive-pg-client 的分页多<>

Return a paginated Multi<> with reactive-pg-client

提问人:T. Franz 提问时间:11/13/2023 更新时间:11/13/2023 访问量:11

问:

我有一个问题。 我必须实现一个 Quarkus 服务,该服务使用一些过滤器设置的 REST 请求,这些设置用于从 TimescaleDB/PostgreSQL 请求大型数据集(可能是 >10.000.000 个条目)。

REST 请求只有三个字段(短class_id、OffsetDateTime time_min、OffsetDateTime time_max)。

My Resource 类如下所示:

@Path( "/export" )
public class DataExporterResource {

  @Inject
  io.vertx.mutiny.pgclient.PgPool client;

  @Inject
  RequestValidator helper;

  private static final Logger LOGGER = Logger.getLogger( DataExporterResource.class );

  @Path( "/test-data" )
  @POST
  @Consumes( MediaType.APPLICATION_JSON )
  @Produces( MediaType.APPLICATION_JSON )
  @APIResponses( value = { @APIResponse( responseCode = "200", description = "Success", content = @Content( mediaType = "application/json" ) ),
                           @APIResponse( responseCode = "400", description = "Bad Request", content = @Content( mediaType = "application/json" ) ) } )
  public Multi< TestData > exportTestData( TestRequest request ) {
    LOGGER.info( "Test Request received" );
    LOGGER.debug( request.toString() );
    try {
      LOGGER.debug( "Validating request" );
      helper.validateRequest( request );
      ResponseBuilder.create( RestResponse.Status.ACCEPTED );
    } catch( ConstraintViolationException e ) {
      LOGGER.warn( e.getConstraintViolations() );
      ResponseBuilder.create( RestResponse.Status.BAD_REQUEST );
    }
    TestExporter exporter = new TestExporter( client, request );
    Multi< TestData > result = exporter.process();
    return result;
  }
}

执行数据库查询等操作的 Exporter 类如下所示:

public class TestExporter {

  private static final Logger LOGGER = Logger.getLogger( TestExporter.class );

  private final TestRequest request;

  private PreparedQuery< RowSet< Row > > preparedQueryStatement;

  private List< Object > tupleList;

  public TestExporter( PgPool client, TestRequest request ) {
    this.request = request;
    this.tupleList = new ArrayList<>();
    this.preparedQueryStatement = client.preparedQuery( preparePreparedStatement() );
  }

  public Multi< TestData > process() {
    LOGGER.info( "Processing Test export request" );
    Multi< TestData > result =
        this.preparedQueryStatement.execute( Tuple.tuple( tupleList ) ).onItem().transformToMulti( set -> Multi.createFrom().iterable( set ) )
                                   .onItem().transform( TestExporter::from );
    return result;
  }

  private String preparePreparedStatement() {
    String selectSql = "SELECT * FROM test WHERE class_id = $1 AND timestamp >= $2 and timestamp <= $3 ORDER BY timestamp ASC;";
    tupleList.add( request.class_id );
    tupleList.add( request.time_min );
    tupleList.add( request.time_max );
    return selectSql;
  }

  private static TestData from( Row row ) {
    TestData test = new TestData();
    // set values
    return test;
  }

现在我遇到了一个问题,当我尝试查询大量数据时,内存会上升到很多,并且在某个时候会出现堆空间问题。还出现了一些警告:“(vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-6,5,main] 已被阻止 3533 毫秒,时间限制为 2000 毫秒:io.vertx.core.VertxException:线程被阻止”。

现在我正在寻找实现某种分页的可能性。我的首选方法是通过获取 time_min 和 time_max 值来实现分页,将这个时间范围缩短到例如每小时。每个小时将比要处理的“页面”还要多。

reactive-pg-client 将 Multi 返回给我直接返回给客户端。因此,我的目标是,重新处理每个页面的所有内容,将 Multi 或类似的东西发送回客户端,释放资源并处理下一页(小时)。但我不知道我怎么能实现这样的行为。我在互联网上没有找到这方面的任何内容。

至少我最大的问题是如何重新设计应用程序,以便它使用REST请求信息,查询例如一个小时的数据,将这些数据流式传输回客户端,最好是在需要时释放资源并继续下一个小时(页面)。这样我就不会炸毁堆空间,而且回复更像是一个流,而不是一个巨大的一次性回复。我认为这应该主要影响 TestExporter 类中的进程方法。

当然,我可以轻松地向 REST API 添加一些 LIMIT 和 OFFSET 并让客户端处理此问题,但首先它很慢,尤其是当数据集很大而且也不是很方便时。

如果有人能帮助我,或者至少提出一个更好的解决方案,那就太好了。 谢谢:-)

Quarkus Vert.X 反应性 叛变 pgpool

评论

0赞 tsegismont 11/14/2023
这可能是很好的起点 quarkus.io/blog/mutiny-pagination

答: 暂无答案