java BlockingQueue 阻塞队列版多线程消费生产实例
程序员文章站
2022-04-21 09:23:21
...
BlockingQueue 为阻塞队列,它的实现形式有许多种,有固定容量的(数组实现)、链表实现的等等。机制都一样:队列已满时,执行 put 的线程会阻塞等待;队列为空时,执行 take 的线程会阻塞等待。
以下是BlockingQueue 阻塞队列版多线程消费生产实例:
/**
 * Producer side of the demo: publishes segment names to a shared
 * {@link BlockingQueue} in two batches separated by a long pause.
 *
 * <p>{@link BlockingQueue#put(Object)} blocks while the queue is full, so this
 * task is throttled by whatever consumers drain the queue.
 */
public class Fetcher implements Runnable {

    /** Number of segments published before the pause. */
    private static final int FIRST_BATCH_SIZE = 5;

    /** Number of segments published after the pause. */
    private static final int SECOND_BATCH_SIZE = 25;

    /** Pause between the two batches, in milliseconds. */
    private static final long PAUSE_MILLIS = 50000;

    private final BlockingQueue<String> queue;

    /**
     * @param queue the queue that receives the produced segment names
     */
    public Fetcher(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /**
     * Publishes segments 0..4, sleeps, then publishes segments 5..29 and prints
     * the total count. If the thread is interrupted at any point, production
     * stops and the interrupt status is restored for the caller.
     */
    @Override
    public void run() {
        try {
            publish(0, FIRST_BATCH_SIZE);
            Thread.sleep(PAUSE_MILLIS);
            publish(FIRST_BATCH_SIZE, SECOND_BATCH_SIZE);
            System.out.println(Thread.currentThread().getName() + "thread--------"
                    + (FIRST_BATCH_SIZE + SECOND_BATCH_SIZE));
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; set it again
            // so the code that owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Puts {@code count} consecutive segment names on the queue, starting at
     * index {@code firstIndex}, logging each one.
     */
    private void publish(int firstIndex, int count) throws InterruptedException {
        for (int offset = 0; offset < count; offset++) {
            queue.put("segment-name-" + (firstIndex + offset));
            System.out.println("ThreadName : " + Thread.currentThread().getName() + "抓取完成");
        }
    }
}
生产者
/**
 * Consumer side of the demo: takes a fixed number of segment names from a
 * shared {@link BlockingQueue}, pausing before each take.
 *
 * <p>{@link BlockingQueue#take()} blocks while the queue is empty.
 */
public class Indexer implements Runnable {

    /** Number of items this consumer takes before finishing. */
    private static final int ITEMS_TO_CONSUME = 10;

    /** Delay before each take, in milliseconds. */
    private static final long DELAY_MILLIS = 1000;

    private final BlockingQueue<String> queue;

    /**
     * @param queue the queue to consume segment names from
     */
    public Indexer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /**
     * Consumes {@value #ITEMS_TO_CONSUME} items, printing each one. If the
     * thread is interrupted, consumption stops and the interrupt status is
     * restored for the caller.
     */
    @Override
    public void run() {
        try {
            for (int consumed = 0; consumed < ITEMS_TO_CONSUME; consumed++) {
                Thread.sleep(DELAY_MILLIS);
                String name = queue.take();
                System.out.println("ThreadName : " + Thread.currentThread().getName()
                        + " 索引创建完成 " + name);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; set it again
            // so the code that owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (RuntimeException e) {
            // Keep unexpected failures visible on stderr instead of letting them
            // escape run() unreported.
            e.printStackTrace();
        }
    }
}
消费者
package com.test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestConsumer { private static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10); public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); Fetcher producer = new Fetcher(queue); Indexer consumer = new Indexer(queue); Indexer consumerSecond = new Indexer(queue); service.submit(producer); service.submit(consumer); service.submit(consumerSecond); try{ Thread.sleep(5000); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } }
主函数