欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

java BlockingQueue 阻塞队列版多线程消费生产实例

程序员文章站 2022-04-21 09:23:21
...

BlockingQueue 为阻塞队列,它的实现形式有许多种,有固定容量的(如 ArrayBlockingQueue)、链表的(如 LinkedBlockingQueue)等等。机制都一样:队列已满时,调用 put 的线程会阻塞等待;队列为空时,调用 take 的线程会阻塞等待。

 

以下是BlockingQueue 阻塞队列版多线程消费生产实例:

 

/**
 * Producer of the demo: publishes segment names onto a shared
 * {@link BlockingQueue} in two batches separated by a long pause.
 *
 * <p>{@link BlockingQueue#put(Object)} blocks while the queue is full, so the
 * producer is throttled by how fast the consumers drain the queue.
 */
public class Fetcher implements Runnable {

	/** Number of segments published before the pause. */
	private static final int FIRST_BATCH_SIZE = 5;

	/** Number of segments published after the pause. */
	private static final int SECOND_BATCH_SIZE = 25;

	/** Pause between the two batches, in milliseconds. */
	private static final long PAUSE_MILLIS = 50000L;

	private final BlockingQueue<String> queue;

	/**
	 * @param queue the queue the produced segment names are put on
	 */
	public Fetcher(BlockingQueue<String> queue)
	{
		this.queue = queue;
	}

	/**
	 * Publishes {@code segment-name-0 .. segment-name-4}, sleeps, then
	 * publishes {@code segment-name-5 .. segment-name-29} and prints the total.
	 *
	 * <p>If the thread is interrupted while blocked in {@code put} or
	 * {@code sleep}, production stops and the interrupt status is restored so
	 * that the owner of the thread (e.g. an executor) can still observe it.
	 */
	@Override
	public void run() {
		try
		{
			int produced = 0;
			while (produced < FIRST_BATCH_SIZE)
			{
				publish(produced);
				produced++;
			}

			Thread.sleep(PAUSE_MILLIS);

			final int total = FIRST_BATCH_SIZE + SECOND_BATCH_SIZE;
			while (produced < total)
			{
				publish(produced);
				produced++;
			}
			System.out.println(Thread.currentThread().getName()+"thread--------"+produced);
		}
		catch (InterruptedException e) {
			// Catching InterruptedException clears the flag; set it again
			// instead of silently swallowing the cancellation request.
			Thread.currentThread().interrupt();
			System.err.println("ThreadName : "+Thread.currentThread().getName()+" interrupted, producer stopped");
		}
	}

	/** Puts one segment name on the queue (blocking while it is full) and reports it. */
	private void publish(int index) throws InterruptedException {
		queue.put("segment-name-"+index);
		System.out.println("ThreadName : "+Thread.currentThread().getName()+"抓取完成");
	}

}

 

   生产者

 

 

 

/**
 * Consumer of the demo: takes a fixed number of segment names from a shared
 * {@link BlockingQueue} and reports each one as indexed.
 *
 * <p>{@link BlockingQueue#take()} blocks while the queue is empty, so the
 * consumer simply waits whenever the producer has not published anything yet.
 */
public class Indexer implements Runnable {

	/** Number of elements one consumer takes before it finishes. */
	private static final int ITEMS_TO_CONSUME = 10;

	/** Pause before each take, in milliseconds. */
	private static final long PAUSE_MILLIS = 1000L;

	private final BlockingQueue<String> queue;

	/**
	 * @param queue the queue the segment names are taken from
	 */
	public Indexer(BlockingQueue<String> queue)
	{
		this.queue = queue;
	}

	/**
	 * Ten times: sleeps one second, takes the next element and prints it.
	 *
	 * <p>If the thread is interrupted while sleeping or while blocked in
	 * {@code take}, consumption stops and the interrupt status is restored so
	 * that the owner of the thread (e.g. an executor) can still observe it.
	 */
	@Override
	public void run() {
		try {
			for (int consumed = 0; consumed < ITEMS_TO_CONSUME; consumed++)
			{
				Thread.sleep(PAUSE_MILLIS);
				String name = queue.take();
				System.out.println("ThreadName : " +Thread.currentThread().getName()+ " 索引创建完成 " +name);
			}
		} catch (InterruptedException e) {
			// Only interruption is expected here; catching it clears the
			// flag, so set it again rather than hiding the cancellation.
			Thread.currentThread().interrupt();
			System.err.println("ThreadName : " +Thread.currentThread().getName()+ " interrupted, consumer stopped");
		}
	}

}

   消费者

 

 

package com.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Entry point of the demo: wires one {@code Fetcher} (producer) and two
 * {@code Indexer}s (consumers) to the same bounded queue and runs them on a
 * thread pool.
 */
public class TestConsumer {

	/** Bounded queue shared by the producer and both consumers (capacity 10). */
	private static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);

	/**
	 * Submits the producer and the two consumers, then requests an orderly
	 * shutdown of the pool.
	 *
	 * @param args unused
	 */
	public static void main(String[] args)
	{
		ExecutorService service = Executors.newCachedThreadPool();
		try {
			Fetcher producer = new Fetcher(queue);
			Indexer consumer = new Indexer(queue);
			Indexer consumerSecond = new Indexer(queue);
			service.submit(producer);
			service.submit(consumer);
			service.submit(consumerSecond);
		}
		finally {
			// shutdown() lets the already submitted tasks run to completion but
			// releases the worker threads afterwards; without it the idle
			// non-daemon workers keep the JVM alive after the work is done.
			service.shutdown();
		}
	}
}

   主函数