runnable 中最终变量的空指针异常

Null pointer exception from final variable within runnable

提问人:newmanne 提问时间:6/30/2015 最后编辑:newmanne 更新时间:11/17/2023 访问量:1423

问:

我有一段代码,可以对给定的问题运行一系列算法,然后一旦一个算法找到问题的答案,程序就会继续运行。投资组合中的其他算法获得自愿终止的信号,并且执行的主线程继续进行。

此代码的一个用户向我发送了一个堆栈跟踪,该行带有 NullPointerException “结果参考.set(solverResult);” 从下面的代码中可以看出,resultReference 是最后一个变量,会立即初始化。我看不出它怎么可能变成空。我花了很长时间试图在我这边重现这个问题,但无济于事。用户堆栈跟踪上的行号与我的代码上的行号匹配。用户报告说在 3 个不同的场合看到过错误,但很少见(这不会在每次解决问题时都发生),所以可能是某种竞争条件。这是在 jdk 1.8_25 中。

我假设这个错误应该是不可能的,因为变量是最终的,这是对的吗?我不确定该如何看待此堆栈跟踪,并希望得到一些保证,这应该是不可能的。

public class ParallelSolver {

private final ListeningExecutorService executorService;
private final AtomicReference<Throwable> error;
private final List<Solver> solvers;
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(ParallelSolver.class);

public ParallelSolver(int threadPoolSize, List<Solvers> solvers) {
    executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize));
    error = new AtomicReference<>();
    this.solvers = solvers;
}

public SolverResult solve(Problem p) {
    final AtomicReference<SolverResult> resultReference = new AtomicReference<>();
    final List<Future> futures = new ArrayList<>();
    final Semaphore workDone = new Semaphore(0);
    try {
        // Submit one job per each solver in the portfolio
        solvers.forEach(solver -> {
            final ListenableFuture<Void> future = executorService.submit(() -> {
                SolverResult solverResult = solver.solve(p);
                if (solverResult.isConclusive()) {
                    log.debug("Signalling the blocked thread to wake up!");
                     // NPE HERE ON THIS LINE
                    resultReference.set(solverResult);
                    workDone.release(solvers.size());
                }
                log.debug("Releasing a single permit as the work for this thread is done.");
                workDone.release(1);
                log.debug("Job ending...");
                return null;
            });
            futures.add(future);
            Futures.addCallback(future, new FutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {

                }

                @Override
                public void onFailure(Throwable t) {
                    if (t instanceof CancellationException) {
                        return;
                    }
                    error.compareAndSet(null, t);
                    // Wake up the main thread (if it's still sleeping)
                    workDone.release(solvers.size());
                }
            });
        });
        // Wait for a thread to complete solving and signal you, or all threads to timeout
        log.debug("Main thread going to sleep");
        workDone.acquire(solvers.size());
        log.debug("Main thread waking up, checking for errors then cancelling futures");
        checkForErrors();
        // cancel any still to be launched futures
        futures.forEach(future -> future.cancel(false));
        log.debug("Returning now");
        return resultReference.get() == null ? SolverResult.createTimeoutResult() : resultReference.get();
    } catch (InterruptedException e) {
        throw new RuntimeException("Interrupted while running parallel job", e);
    }
}

/**
 * We want a fail-fast policy, but java executors aren't going to throw the exception on the main thread.
 * We can't call Future.get() and check for errors, because that might block.
 * So we set a variable when an error occurs, and check it here.
 */
private void checkForErrors() {
    if (error.get() != null) {
        log.error("Error occured while executing a task", error.get());
        throw new RuntimeException("Error occurred while executing a task", error.get());
    }
}
java nullpointerexception java-8

评论

0赞 FDinoff 6/30/2015
编写的代码永远不会初始化,因此它始终是 .我不知道是什么。solversnullworkDone
0赞 newmanne 6/30/2015
谢谢,更正 - 复制和粘贴代码时出现问题。workDone 是一个信号量,用于确保主线程阻塞,直到每个人都超时或某个算法找到答案。
1赞 Stuart Marks 6/30/2015
你能发布完整的堆栈跟踪吗?一个问题是 NPE 是起源于标记行还是代码中的某个位置。AtomicReference

答:

1赞 Derrops 6/30/2015 #1

这几乎是您使用 akka 想要的:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import scala.concurrent.Await;
import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import akka.util.Timeout;


public class AlgorithmTester extends UntypedActor
{
    public AlgorithmTester(){}
    
    public static class RegisterResultListener
    {
        
    }
    
    public static class Result
    {
        final double result;
        public Result(double result)
        {
            this.result = result;
        }
    }
    
    public static interface Algorithmable
    {
        public Result solve();
    }
    
    @SuppressWarnings("serial")
    public static class AlgorithmsToTest extends ArrayList<Algorithmable> {
    }
    
    public static class AlgorithmRunner extends UntypedActor
    {
        
        public AlgorithmRunner(){}

        @Override
        public void onReceive(Object msg) throws Exception
        {
            if (msg instanceof Algorithmable)
            {
                Algorithmable alg = (Algorithmable) msg;
                getSender().tell(alg.solve(), getSelf());
            }
        }
    }
    
    List<ActorRef> runners = new ArrayList<ActorRef>();
    List<ActorRef> resultListeners = new ArrayList<ActorRef>();
    
    @Override
    public void onReceive(Object msg) throws Exception
    {
        
        if (msg instanceof RegisterResultListener)
        {
            resultListeners.add(getSender());
        }
        else if (msg instanceof AlgorithmsToTest)
        {
            AlgorithmsToTest algorithms = (AlgorithmsToTest) msg;
            for (Algorithmable algorithm : algorithms)
            {
                ActorRef runner = getContext().actorOf(Props.create(AlgorithmRunner.class));
                runners.add(runner);
                runner.tell(algorithm, getSelf());
            }
            getSelf().tell(new RegisterResultListener(), getSender());
        }
        else if (msg instanceof Result)
        {
            for (ActorRef runner : runners)
            {
                getContext().stop(runner);
            }
            runners.clear();
            
            for (ActorRef l : resultListeners)
            {
                l.tell(msg, getSelf());
            }
        }
    }
    
    
    public static void main(String[] args)
    {
        ActorSystem system = ActorSystem.create("AlogrithmTest");
        ActorRef tester = system.actorOf(Props.create(AlgorithmTester.class), "algorithmTest");
        
        Algorithmable a1 = new Algorithmable()
       {

            public Result solve() {
                try {
                    Thread.sleep(7000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return new Result(1100.0);
                }
        };
        
        Algorithmable a2 = new Algorithmable()
       {

            public Result solve() {
                try {
                    Thread.sleep(6000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return new Result(330.0);
                }
        };
        
       Algorithmable a3 = new Algorithmable()
       {

            public Result solve() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return new Result(1000);
                }
        };
        
        AlgorithmsToTest algorithmsToTest = new AlgorithmsToTest();
        algorithmsToTest.add(a1);
        algorithmsToTest.add(a2);
        algorithmsToTest.add(a3);
        
        Timeout t = new Timeout(5, TimeUnit.SECONDS);
        
        Future<Object> future = Patterns.ask(tester, algorithmsToTest, 100000);
        try {
            Result response = (Result)Await.result(future, t.duration());
            System.out.println(response.result);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("Continuing on");
        system.terminate();
        System.out.println("Terminated");
    }
}

然而,在 akka 中,当参与者处理消息时,他们没有办法杀死他们,你会注意到,当参与者正在处理其他算法时,这个程序会继续执行,即使已经找到了第一个答案。 杀死线程从来都不是一件好事,所以你的问题没有很好的解决方案。我猜你可以在 main 方法的末尾标记 System.exit(0),或者在你的算法中的某个地方有一个可怕的原子变量,如果它们正在迭代并抛出异常,或者将它们作为线程并杀死它们,都不是很好): 就我个人而言,如果您可以摆脱它,我会使用 System.exit(0)。

看,这是你想做的替代代码,没有原子变量、原子变量和锁,所有其他东西都是非常危险和容易出错的,这是一个更干净的答案。更改此代码以匹配您想要的代码所需的只是 Result 或 Algorithmable 接口,并提供您想要的所有实现,这完全符合您的要求。如果这个反对票推断出你认为原子变量比基于演员的模型更可取,那么我建议你做一些阅读。 Asker 甚至没有给出空指针异常的堆栈跟踪,因此无法直接解决此问题。