欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

生产者消费者模式,基于阻塞队列 javaBlockingQueue 

程序员文章站 2022-04-21 10:32:27
...
基于阻塞队列可以分容易实现生产者消费者模式

基本思路
生产者:负责生产对象,并放入阻塞队列
消费者:while true线程,阻塞的从阻塞队列中获取对象 并处理。


应用场景
服务器段分发器的处理、消息队列实现等等

核心组件
核心组件为JDK提供的阻塞队列,LinkedBlockingQueue


下面一个简单的例子
生产者
package com.gbcom.java.blockqueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 生产者线程。
 * 
 * @author SYZ
 * @date 2016-12-8 下午02:07:34
 * @version 1.0.0
 * @see com.gbcom.java.blockqueue.Producer
 */
public class Producer implements Runnable {

	private volatile boolean isRunning = true;
	private BlockingQueue queue;
	private static AtomicInteger count = new AtomicInteger();
	private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

	public Producer(BlockingQueue queue) {
		this.queue = queue;
	}

	public void run() {
		String data = null;
		Random r = new Random();

		System.out.println("启动生产者线程!");
		try {
			while (isRunning) {
				System.out.println("正在生产数据...");
				Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

				data = "data:" + count.incrementAndGet();
				System.out.println("将数据:" + data + "放入队列...");
				if (queue.size() >= 5) {
					System.out
							.println("/***************** clear**********************/");
					queue.clear();
				}
				queue.put(data);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		} finally {
			System.out.println("退出生产者线程!");
		}
	}

	public void stop() {
		isRunning = false;
	}

}





消费者
package com.gbcom.java.blockqueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 消费线程
 * 
 * @author SYZ
 * @date 2016-12-8 下午02:07:24
 * @version 1.0.0
 * @see com.gbcom.java.blockqueue.Consumer
 */
public class Consumer implements Runnable {
	private BlockingQueue<String> queue;
	private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
	private static AtomicInteger count = new AtomicInteger();

	public Consumer(BlockingQueue<String> queue) {
		this.queue = queue;
	}

	public void run() {
		System.out.println("启动消费者线程!");
		Random r = new Random();
		boolean isRunning = true;
		try {
			while (isRunning) {
				System.out.println("正从队列获取数据...");
				String data = queue.take();
				if (null != data) {
					System.out.println("拿到数据:" + data + "  : queue size = "
							+ queue.size());
					System.out.println(Thread.currentThread().getName()
							+ " - 正在消费数据:" + data + "::::consumer times="
							+ count.incrementAndGet());
					Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
				} else {
					isRunning = false;
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		} finally {
			System.out.println("退出消费者线程!");
		}
	}

}



客户端
package com.gbcom.java.blockqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 生产者消费者客户端
 * 
 * @author syz
 * @date 2014-7-2
 * @version v1.0.0
 * @see com.gbcom.java.blockqueue.BlockingQueueClient
 */
public class BlockingQueueClient {

	public static void main(String[] args) throws InterruptedException {
		// 声明一个容量为10的缓存队列
		BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

		Producer producer1 = new Producer(queue);
		Producer producer2 = new Producer(queue);
		Producer producer3 = new Producer(queue);
		Consumer consumer = new Consumer(queue);
		// Consumer consumer2 = new Consumer(queue);
		// Consumer consumer3 = new Consumer(queue);

		// 借助Executors
		ThreadPoolExecutor service = (ThreadPoolExecutor) Executors
				.newCachedThreadPool();
		// 启动线程
		service.execute(producer1);
		service.execute(producer2);
		service.execute(producer3);
		// service.execute(consumer2);
		// service.execute(consumer3);
		service.execute(consumer);

		// 执行10s
		Thread.sleep(10 * 1000);

		System.out.println("active count = " + service.getActiveCount());
		// producer1.stop();
		// producer2.stop();
		// producer3.stop();
		Thread.sleep(2000);
		// 退出Executor
		service.shutdown();
	}
}
相关标签: java BlockingQueue