在多个线程上运行时出现 Quarkus 事务错误

Quarkus Transaction error when running on multiple threads

提问人:szg12345 提问时间:11/14/2023 更新时间:11/19/2023 访问量:44

问:

当我运行post到 http://localhost:80/poc/copyOne 时,代码运行良好,但是当我卷曲 http://localhost:80/poc/streamHPO/2 时,它失败并显示以下消息:

无法使用 EntityManager/Session,因为事务和 CDI 请求上下文都处于活动状态。请考虑将@Transactional添加到方法中以自动激活事务,或者@ActivateRequestContext如果您有正当理由不使用事务

我正在使用MSSQL服务器(不确定它是否相关),以及HighPriorityOpenEntity和ComPocTtEntity的PanacheEntityBase-s

@GET
    @Path("/streamHPO/{maxt}")
    public Response streamHPO(@PathParam("maxt") int maxt) throws IOException {
        //get the largest id
        //start on a new thread
        new Thread(() ->
        {
            try {
                streamHPOThread(maxt);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).start();
        return Response.ok().build();
    }

    @POST
    @Transactional
    @Path("/copyOne")
    public void copyOne(long id)
    {
        ComPocTtEntity ComPocTtEntityn = ComPocTtEntity.findById(id);
        System.out.println("Id: " + ComPocTtEntityn.getId());
        //Copy item to HighPriorityOpenEntity
        HighPriorityOpenEntity highPriorityOpenEntityn = new HighPriorityOpenEntity();
        highPriorityOpenEntityn.setId(ComPocTtEntityn.getId());
        highPriorityOpenEntityn.setName(ComPocTtEntityn.getName());
        highPriorityOpenEntityn.setDescription(ComPocTtEntityn.getDescription());
        highPriorityOpenEntityn.setType(ComPocTtEntityn.getType());
        highPriorityOpenEntityn.setOwner(ComPocTtEntityn.getOwner());
        highPriorityOpenEntityn.setCreated(ComPocTtEntityn.getCreated());
        highPriorityOpenEntityn.setUpdated(ComPocTtEntityn.getUpdated());
        highPriorityOpenEntityn.setDue(ComPocTtEntityn.getDue());
        highPriorityOpenEntityn.setResolved(ComPocTtEntityn.getResolved());
        highPriorityOpenEntityn.setClosed(ComPocTtEntityn.getClosed());
        highPriorityOpenEntityn.setSeverity(ComPocTtEntityn.getSeverity());
        //Persist item and flush
        highPriorityOpenEntityn.persistAndFlush();
    }
    public void streamHPOThread(int maxt) throws IOException {
        //Get all HIGH priority OPEN status items
        List<ComPocTtEntity> ComPocTtEntityList = ComPocTtEntity.listAll();
        List<Long> HPOids = ComPocTtEntityList.stream()
                .filter(ComPocTtEntity -> ComPocTtEntity.getPriority().equals("HIGH"))
                .filter(ComPocTtEntity -> ComPocTtEntity.getStatus().equals("OPEN"))
                .toList().stream().map(ComPocTtEntity::getId).toList();
        System.out.println("HPO size: " + HPOids.size());
        int s = HPOids.size();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < s; c++) {
            while (threads.size() >= maxt)
                for (int t = 0; t < threads.size(); t++)
                    if (!threads.get(t).isAlive())
                        threads.remove(t);
            //Get first item
            long id = HPOids.get(c);
            //Start copyone on a new thread
            threads.add(new Thread(() ->
                copyOne(id)
            ));
            threads.get(c).start();

            System.out.println("HPO: " + c + " of " + s);
        }
    }
java 多线程 quarkus quarkus-panache

评论

0赞 Sagar 11/14/2023
持久性上下文是用新线程开始清除的。您可以参考此处以获取类似的问题

答:

0赞 CodingSamples 11/17/2023 #1

您不应该自己创建线程。要将 CDI 和 JTA 用于多个线程,您可以注入或创建受管执行程序。此外,为了获得事务,您应该将事务方法移动到另一个 bean。

@ApplicationScope
public class Service {

    @Inject
    Logger logger;

    @Transactional
    public void copyOne(long id)
    {
        ComPocTtEntity comPocTtEntityn = ComPocTtEntity.findById(id);
        logger.info("Id: " + comPocTtEntityn.getId());
        //Copy item to HighPriorityOpenEntity
        HighPriorityOpenEntity highPriorityOpenEntityn = new HighPriorityOpenEntity();
        highPriorityOpenEntityn.setId(comPocTtEntityn.getId());
        highPriorityOpenEntityn.setName(comPocTtEntityn.getName());
        highPriorityOpenEntityn.setDescription(comPocTtEntityn.getDescription());
        highPriorityOpenEntityn.setType(comPocTtEntityn.getType());
        highPriorityOpenEntityn.setOwner(comPocTtEntityn.getOwner());
        highPriorityOpenEntityn.setCreated(comPocTtEntityn.getCreated());
        highPriorityOpenEntityn.setUpdated(comPocTtEntityn.getUpdated());
        highPriorityOpenEntityn.setDue(comPocTtEntityn.getDue());
        highPriorityOpenEntityn.setResolved(comPocTtEntityn.getResolved());
        highPriorityOpenEntityn.setClosed(comPocTtEntityn.getClosed());
        highPriorityOpenEntityn.setSeverity(comPocTtEntityn.getSeverity());
        //Persist item and flush
        highPriorityOpenEntityn.persistAndFlush();
    }
}
@Path("/")
public class Endpoint {

    @Inject
    Logger logger;

    @Inject
    Service service;

    @POST
    @Path("/copyOne")
    public void copyOne(long id) {
        service.copyOne(id);
    }

    @Path("/streamHPO/{maxt}")
    public Response streamHPO(@PathParam("maxt") int maxt) throws IOException {
        
        ManagedExecutor executor = ManagedExecutor.builder()
            .maxAsync(maxt)
            .propagated(ThreadContext.CDI, ThreadContext.TRANSACTION)
            .build();

        //Get all HIGH priority OPEN status items
        List<ComPocTtEntity> comPocTtEntityList = ComPocTtEntity.listAll();
        List<Long> hpoIdList = comPocTtEntityList.stream()
                .filter(e -> e.getPriority().equals("HIGH"))
                .filter(e -> e.getStatus().equals("OPEN"))
                .toList().stream().map(ComPocTtEntity::getId).toList();
        int s = hpoIdList.size();
        int i = 0;
        logger.info("HPO size: " + s);
        for (long id : hpoIdList) {
            executor.runAsync(() => {
                service.copyOne(id);
            });

            logger.info("HPO: " + i + " of " + s);
            ++i;
        }
        return Response.ok().build();
    }
}

我不完全确定,但我认为传播 CDI 上下文应该足够了,因为另一个 CDI bean 具有事务上下文。

https://github.com/eclipse/microprofile-context-propagation/tree/main

ManagedExecutor executor = ManagedExecutor.builder()
    .maxAsync(maxt)
    .propagated(ThreadContext.CDI)
    .build();