生产者消费者模式,基于阻塞队列 javaBlockingQueue
程序员文章站
2022-04-21 10:32:27
...
基于阻塞队列可以分容易实现生产者消费者模式
基本思路
生产者:负责生产对象,并放入阻塞队列
消费者:while true线程,阻塞的从阻塞队列中获取对象 并处理。
应用场景
服务器段分发器的处理、消息队列实现等等
核心组件
核心组件为JDK提供的阻塞队列,LinkedBlockingQueue
下面一个简单的例子
生产者
消费者
客户端
基本思路
生产者:负责生产对象,并放入阻塞队列
消费者:while true线程,阻塞的从阻塞队列中获取对象 并处理。
应用场景
服务器段分发器的处理、消息队列实现等等
核心组件
核心组件为JDK提供的阻塞队列,LinkedBlockingQueue
下面一个简单的例子
生产者
package com.gbcom.java.blockqueue; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 生产者线程。 * * @author SYZ * @date 2016-12-8 下午02:07:34 * @version 1.0.0 * @see com.gbcom.java.blockqueue.Producer */ public class Producer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue queue; private static AtomicInteger count = new AtomicInteger(); private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { String data = null; Random r = new Random(); System.out.println("启动生产者线程!"); try { while (isRunning) { System.out.println("正在生产数据..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet(); System.out.println("将数据:" + data + "放入队列..."); if (queue.size() >= 5) { System.out .println("/***************** clear**********************/"); queue.clear(); } queue.put(data); } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出生产者线程!"); } } public void stop() { isRunning = false; } }
消费者
package com.gbcom.java.blockqueue; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 消费线程 * * @author SYZ * @date 2016-12-8 下午02:07:24 * @version 1.0.0 * @see com.gbcom.java.blockqueue.Consumer */ public class Consumer implements Runnable { private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; private static AtomicInteger count = new AtomicInteger(); public Consumer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { System.out.println("启动消费者线程!"); Random r = new Random(); boolean isRunning = true; try { while (isRunning) { System.out.println("正从队列获取数据..."); String data = queue.take(); if (null != data) { System.out.println("拿到数据:" + data + " : queue size = " + queue.size()); System.out.println(Thread.currentThread().getName() + " - 正在消费数据:" + data + "::::consumer times=" + count.incrementAndGet()); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消费者线程!"); } } }
客户端
package com.gbcom.java.blockqueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; /** * 生产者消费者客户端 * * @author syz * @date 2014-7-2 * @version v1.0.0 * @see com.gbcom.java.blockqueue.BlockingQueueClient */ public class BlockingQueueClient { public static void main(String[] args) throws InterruptedException { // 声明一个容量为10的缓存队列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // Consumer consumer2 = new Consumer(queue); // Consumer consumer3 = new Consumer(queue); // 借助Executors ThreadPoolExecutor service = (ThreadPoolExecutor) Executors .newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); // service.execute(consumer2); // service.execute(consumer3); service.execute(consumer); // 执行10s Thread.sleep(10 * 1000); System.out.println("active count = " + service.getActiveCount()); // producer1.stop(); // producer2.stop(); // producer3.stop(); Thread.sleep(2000); // 退出Executor service.shutdown(); } }
上一篇: 券池重构
下一篇: BlockingQueue 源码分析
推荐阅读