Asked by: szg12345 Asked: 11/14/2023 Updated: 11/19/2023 Views: 44
Quarkus Transaction error when running on multiple threads
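Q:
When I POST to http://localhost:80/poc/copyOne the code runs fine, but when I curl http://localhost:80/poc/streamHPO/2 it fails with the following message:
Cannot use the EntityManager/Session because neither a transaction nor a CDI request context is active. Consider adding @Transactional to your method to automatically activate a transaction, or @ActivateRequestContext if you have valid reasons not to use transactions.
I am using MSSQL Server (not sure whether that is relevant), and HighPriorityOpenEntity and ComPocTtEntity are both PanacheEntityBase subclasses.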
@GET
@Path("/streamHPO/{maxt}")
public Response streamHPO(@PathParam("maxt") int maxt) throws IOException {
    //get the largest id
    //start on a new thread
    new Thread(() ->
    {
        try {
            streamHPOThread(maxt);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }).start();
    return Response.ok().build();
}
@POST
@Transactional
@Path("/copyOne")
public void copyOne(long id)
{
    ComPocTtEntity ComPocTtEntityn = ComPocTtEntity.findById(id);
    System.out.println("Id: " + ComPocTtEntityn.getId());
    //Copy item to HighPriorityOpenEntity
    HighPriorityOpenEntity highPriorityOpenEntityn = new HighPriorityOpenEntity();
    highPriorityOpenEntityn.setId(ComPocTtEntityn.getId());
    highPriorityOpenEntityn.setName(ComPocTtEntityn.getName());
    highPriorityOpenEntityn.setDescription(ComPocTtEntityn.getDescription());
    highPriorityOpenEntityn.setType(ComPocTtEntityn.getType());
    highPriorityOpenEntityn.setOwner(ComPocTtEntityn.getOwner());
    highPriorityOpenEntityn.setCreated(ComPocTtEntityn.getCreated());
    highPriorityOpenEntityn.setUpdated(ComPocTtEntityn.getUpdated());
    highPriorityOpenEntityn.setDue(ComPocTtEntityn.getDue());
    highPriorityOpenEntityn.setResolved(ComPocTtEntityn.getResolved());
    highPriorityOpenEntityn.setClosed(ComPocTtEntityn.getClosed());
    highPriorityOpenEntityn.setSeverity(ComPocTtEntityn.getSeverity());
    //Persist item and flush
    highPriorityOpenEntityn.persistAndFlush();
}
public void streamHPOThread(int maxt) throws IOException {
    //Get all HIGH priority OPEN status items
    List<ComPocTtEntity> ComPocTtEntityList = ComPocTtEntity.listAll();
    List<Long> HPOids = ComPocTtEntityList.stream()
            .filter(ComPocTtEntity -> ComPocTtEntity.getPriority().equals("HIGH"))
            .filter(ComPocTtEntity -> ComPocTtEntity.getStatus().equals("OPEN"))
            .toList().stream().map(ComPocTtEntity::getId).toList();
    System.out.println("HPO size: " + HPOids.size());
    int s = HPOids.size();
    List<Thread> threads = new ArrayList<>();
    for (int c = 0; c < s; c++) {
        while (threads.size() >= maxt)
            for (int t = 0; t < threads.size(); t++)
                if (!threads.get(t).isAlive())
                    threads.remove(t);
        //Get first item
        long id = HPOids.get(c);
        //Start copyone on a new thread
        threads.add(new Thread(() ->
                copyOne(id)
        ));
        threads.get(c).start();
        System.out.println("HPO: " + c + " of " + s);
    }
}
A:
0 votes
CodingSamples
11/17/2023
#1
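You should not create threads yourself. To use CDI and JTA across multiple threads, you can inject or create a managed executor. Also, to get a transaction, you should move the transactional method to another bean.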
// Imports assume Quarkus 3 (jakarta.*); on Quarkus 2 use javax.* instead.
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import org.jboss.logging.Logger;

@ApplicationScoped
public class Service {
    @Inject
    Logger logger;

    // Runs in its own transaction each time it is called through the injected bean
    @Transactional
    public void copyOne(long id)
    {
        ComPocTtEntity comPocTtEntityn = ComPocTtEntity.findById(id);
        logger.info("Id: " + comPocTtEntityn.getId());
        //Copy item to HighPriorityOpenEntity
        HighPriorityOpenEntity highPriorityOpenEntityn = new HighPriorityOpenEntity();
        highPriorityOpenEntityn.setId(comPocTtEntityn.getId());
        highPriorityOpenEntityn.setName(comPocTtEntityn.getName());
        highPriorityOpenEntityn.setDescription(comPocTtEntityn.getDescription());
        highPriorityOpenEntityn.setType(comPocTtEntityn.getType());
        highPriorityOpenEntityn.setOwner(comPocTtEntityn.getOwner());
        highPriorityOpenEntityn.setCreated(comPocTtEntityn.getCreated());
        highPriorityOpenEntityn.setUpdated(comPocTtEntityn.getUpdated());
        highPriorityOpenEntityn.setDue(comPocTtEntityn.getDue());
        highPriorityOpenEntityn.setResolved(comPocTtEntityn.getResolved());
        highPriorityOpenEntityn.setClosed(comPocTtEntityn.getClosed());
        highPriorityOpenEntityn.setSeverity(comPocTtEntityn.getSeverity());
        //Persist item and flush
        highPriorityOpenEntityn.persistAndFlush();
    }
}
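// Imports assume Quarkus 3 (jakarta.*); on Quarkus 2 use javax.* instead.
import java.io.IOException;
import java.util.List;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.core.Response;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.context.ThreadContext;
import org.jboss.logging.Logger;

@Path("/")
public class Endpoint {
    @Inject
    Logger logger;
    @Inject
    Service service;

    @POST
    @Path("/copyOne")
    public void copyOne(long id) {
        service.copyOne(id);
    }

    @GET
    @Path("/streamHPO/{maxt}")
    public Response streamHPO(@PathParam("maxt") int maxt) throws IOException {
        // At most maxt tasks run at once; CDI and transaction contexts are propagated to them
        ManagedExecutor executor = ManagedExecutor.builder()
                .maxAsync(maxt)
                .propagated(ThreadContext.CDI, ThreadContext.TRANSACTION)
                .build();
        //Get all HIGH priority OPEN status items (on the request thread)
        List<ComPocTtEntity> comPocTtEntityList = ComPocTtEntity.listAll();
        List<Long> hpoIdList = comPocTtEntityList.stream()
                .filter(e -> e.getPriority().equals("HIGH"))
                .filter(e -> e.getStatus().equals("OPEN"))
                .map(ComPocTtEntity::getId)
                .toList();
        int s = hpoIdList.size();
        int i = 0;
        logger.info("HPO size: " + s);
        for (long id : hpoIdList) {
            // Each copy is submitted to the managed executor instead of a hand-made thread
            executor.runAsync(() -> {
                service.copyOne(id);
            });
            logger.info("HPO: " + i + " of " + s);
            ++i;
        }
        return Response.ok().build();
    }
}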
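To show the shape of "hand the work to a bounded executor instead of tracking threads in a list", here is a minimal JDK-only sketch. It is not Quarkus code: `BoundedCopySketch`, `runAll` and the `work` callback are hypothetical names, and a plain `ExecutorService` does not propagate CDI or transaction contexts, so inside Quarkus the `ManagedExecutor` would take its place. Unlike the endpoint, the sketch also waits for every task to finish before returning.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongConsumer;

// Hypothetical, JDK-only illustration; names are not taken from the answer.
class BoundedCopySketch {

    // Runs `work` once per id on at most `maxThreads` threads, waits for all
    // tasks, and returns how many of them completed normally.
    static int runAll(List<Long> ids, int maxThreads, LongConsumer work) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        AtomicInteger completed = new AtomicInteger();
        try {
            CompletableFuture<?>[] futures = ids.stream()
                    .map(id -> CompletableFuture.runAsync(() -> {
                        work.accept(id);            // stand-in for service.copyOne(id)
                        completed.incrementAndGet();
                    }, pool))
                    .toArray(CompletableFuture[]::new);
            // join() rethrows (wrapped) if any task failed
            CompletableFuture.allOf(futures).join();
        } finally {
            pool.shutdown();                        // release the worker threads
        }
        return completed.get();
    }

    public static void main(String[] args) {
        int done = runAll(List.of(1L, 2L, 3L, 4L, 5L), 2,
                id -> System.out.println("copying " + id));
        System.out.println("completed: " + done);
    }
}
```

The pool size plays the role of `maxt`: the executor queues the remaining ids itself, so there is no need to poll `isAlive()` or remove finished threads from a list.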
I am not entirely sure, but I think propagating the CDI context should be enough, because the other CDI bean provides the transactional context.
https://github.com/eclipse/microprofile-context-propagation/tree/main
ManagedExecutor executor = ManagedExecutor.builder()
        .maxAsync(maxt)
        .propagated(ThreadContext.CDI)
        .build();
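As a rough mental model of what "propagating" a context means, the sketch below uses a plain `ThreadLocal` as a stand-in for a thread-bound context such as the CDI request context. This is an assumption-laden simplification, not how MicroProfile Context Propagation is implemented; `ContextPropagationSketch`, `REQUEST_CONTEXT` and `withCapturedContext` are hypothetical names. It shows that a bare new thread sees no context, while a task wrapped at submission time sees the caller's.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of context propagation; not the MicroProfile implementation.
class ContextPropagationSketch {

    // Stand-in for a thread-bound context (for example the CDI request context).
    static final ThreadLocal<String> REQUEST_CONTEXT = new ThreadLocal<>();

    // Captures the caller's context now and installs it around the task
    // on whichever thread later runs it.
    static Runnable withCapturedContext(Runnable task) {
        String captured = REQUEST_CONTEXT.get();
        return () -> {
            String previous = REQUEST_CONTEXT.get();
            REQUEST_CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                // Put the worker thread back the way it was
                if (previous == null) {
                    REQUEST_CONTEXT.remove();
                } else {
                    REQUEST_CONTEXT.set(previous);
                }
            }
        };
    }

    // Runs a task on a new thread and returns the context value that thread saw.
    static String seenOnNewThread(boolean propagate) {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(REQUEST_CONTEXT.get());
        Thread worker = new Thread(propagate ? withCapturedContext(task) : task);
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        REQUEST_CONTEXT.set("request-1");
        System.out.println("bare thread sees: " + seenOnNewThread(false));  // null
        System.out.println("wrapped task sees: " + seenOnNewThread(true));  // request-1
    }
}
```

The bare thread corresponds to `new Thread(...)` in the question, where neither a transaction nor a request context is active; the wrapped task corresponds to what a managed executor configured with `propagated(...)` is meant to provide.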