Java 阻止列表实现

Java Blocking List Implementation

提问人:Daniel Gerber 提问时间:4/27/2015 最后编辑:MarounDaniel Gerber 更新时间:4/27/2015 访问量:6797

问:

我在 SO 和 Google 上搜索了这个问题的答案,但到目前为止还没有找到合适的解决方案。

我目前正在处理图形路由问题中的 LayerManager。经理负责提供和重置一组固定的层。

我想通过阻止列表实现消费者-生产者模式,这样只要没有可用的空闲层,传入的路由请求就会被阻止。到目前为止,我只找到了一个阻塞队列,但由于我们不需要 FIFO、LIFO 而是随机访问队列,因此队列并不真正起作用。更准确地说,这样的事情应该是可能的:

/* this should be blocking until a layer becomes available */
public Layer getLayer(){ 

    for ( Layer layer : layers ) {
        if ( layer.isUnused() && layer.matches(request) )
            return layers.pop(layer);
    }
}

有什么办法可以做到这一点吗?

Java 列表 阻止

评论

0赞 StanislavL 4/27/2015
java.util.concurrent.PriorityBlockingQueue 和你自己的比较器呢?
0赞 Daniel Gerber 4/27/2015
谢谢。好吧,在我看来,层并没有真正的可比性。它们仅与给定的请求匹配。
0赞 Guillaume 6/13/2019
@Daniel,我知道这是一个老问题,但是您实施了什么解决方案来解决这个问题?

答:

0赞 user4624062 4/27/2015 #1

你要找的叫做“信号量”。

  1. 创建 Semaphore 类
  2. 将其作为字段添加到 Layer 类


 public class Semaphore 
{
    private boolean signal = false;

    public synchronized boolean take() 
    {
       if(this.signal==true)
            return false;  //already in use
       this.signal = true;
       this.notify();
       return true;
    }

     public synchronized void release() throws InterruptedException
     {
        while(!this.signal) wait();
        this.signal = false;
     }


     public boolean isUnused()
     {
         return !signal ;
     }

}


//2.
class Layer
{
    Semaphore sem =null;

    /*your code*/
     /*sem = new Semaphore(); in constructors*/
    public boolean take()
    {
        return this.sem.take();
    }

    public void release()
    {
        this.sem.release();
    }

    public Layer getLayer()
    { 

        for ( Layer layer : layers ) 
        {
         if ( layer.matches(request) && layer.take())
             return layer;
        }

         return null;
    }
}


同步方法处理访问并发

3.在 getLayer 上循环直到

Layer l=null;
while(l==null)
{
    l= getlayer();
    Thread.sleep(100); //set time
}
 // continue
 // do not forget to release the layer when you are done

评论

1赞 Daniel Gerber 4/27/2015
这就是我现在所做的,我不喜欢这种通知/等待模式。它比阻止更容易出错
0赞 dieter 4/27/2015 #2

尝试使用 .这个想法是在 .每个请求都有自己的队列。Map<String, BlockingQueue<Layer>>BlockingQueue

public class LayerQueue {

    Map<String, BlockingQueue<Layer>> freeLayers = Collections.synchronizedMap(new HashMap<String, BlockingQueue<Layer>>());

    public LayerQueue() {
        //init QUEUEs
        freeLayers.put("request-1", new ArrayBlockingQueue<Layer>(1)); // one to one...
        freeLayers.put("request-2", new ArrayBlockingQueue<Layer>(1));
        [...] 
    }

    public void addUnusedLayer(Layer layer, String request) {
        BlockingQueue<Layer> freeLayersForRequest = freeLayers.get(request);
        freeLayersForRequest.add(layer);
    }

    public Layer getLayer(String request) {

        BlockingQueue<Layer> freeLayersForRequest = freeLayers.get(request);

        try {
            return freeLayersForRequest.take(); // blocks until a layer becomes available
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}

评论

0赞 Daniel Gerber 4/27/2015
1 每个请求有一个图层,而且地图大小不固定
0赞 dieter 4/27/2015
@DanielGerber为什么它应该是一个问题呢?
0赞 Daniel Gerber 4/27/2015
因为 1 层可能需要高达 500mb 的 RAM。
0赞 dieter 4/27/2015
@DanielGerber 即使它需要 4GB,它仍然可以工作。您正在使用引用。
0赞 Atuos 4/27/2015 #3

我不太确定我是否正确理解您的需求,但您可以使用阻塞队列并将结果放入列表中。如果在列表中找不到合适的图层,请调用 wait() 并在从队列中将新项目添加到列表中时再次检查。这听起来像是在概念上可以工作,即使下面的代码没有正确处理(我很确定这没有完全同步)

public class PredicateBlockingQueue<Product> {

private final List<Product> products = new LinkedList<Product>();
private final BlockingQueue<Product> queue;
private final Thread consumer;

public PredicateBlockingQueue(int capacity) {
    queue = new ArrayBlockingQueue<Product>(capacity);

    consumer = new Thread() {
        @Override
        public void run() {
            while(!Thread.interrupted()) {
                try {
                    products.add(queue.take());
                    synchronized(queue) {
                        queue.notifyAll();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    };

    consumer.start();
}

public void put(Product product) throws InterruptedException {
    queue.put(product);
}

public Product take(Predicate<Product> predicate) throws InterruptedException {
    Product product;
    while((product=find(predicate))==null) {
        synchronized(queue) {
            queue.wait();
        }
    }
    return product;
}

private synchronized Product find(Predicate<Product> predicate) {
    Iterator<Product> it = products.iterator();
    while(it.hasNext()) {
        Product product = it.next();
        if(predicate.test(product)) {
            it.remove();
            return product;
        }
    }
    return null;
}