我想模拟以下场景:多个使用者,生产者线程正在修改一些数据
建立
BlockingQueue<String> q1 = new SynchronousQueue<String>(); BlockingQueue<String> q2 = new SynchronousQueue<String>(); Producer dataProducer = new Producer(q1); // publish to q1 Filter1 filter1 = new Filter1(q1,q2); // read from q1,publish to q2 Filter2 filter2 = new Filter2(q2); // read from q2 new Thread(dataProducer,"Producer-Thread").start(); new Thread(filter1,"Filter1-Thread").start(); new Thread(filter2,"Filter2-Thread").start();
制片人
public void run() { try { while (true) { this.q.put(saySomething()); } } catch (InterruptedException ex) { ex.printStackTrace(); } } public String saySomething() { return "Something"; }
过滤1
public void run() { try { while (true) { consume(qIn.take()); } } catch (InterruptedException ex) { ex.printStackTrace(); } } private void consume(String take) { //Modify data according to some rules String newData = take.replace("m","-"); produce(newData); } private void produce(String newData) { // put new data in queue out qOut.add(newData); // <-- Stacktrace points here }
过滤2
public void run() { try { while (true) { consume(qIn.take()); } } catch (InterruptedException ex) { ex.printStackTrace(); } } private void consume(String s) { System.out.println("Something became: " + s); }
因此,回顾一下:Producer将一些内容放入Filter 1读取的队列中.它修改数据并将其发布到Filter 2读取的另一个队列.过滤器2打印最终数据.
此代码失败
Exception in thread "Thread-2" java.lang.IllegalStateException: Queue full
你能帮我理解为什么吗?
解决方法
你应该使用put()而不是add()一个SynchronousQueue总是满和空,它没有深度. put()将告诉队列暂停此线程,直到另一个线程进入以从队列中取出该元素.
add()方法只有在线程等待时才会成功,如果没有线程在等待,您将获得Queue Full异常.