任务处理队列简单demo2
程序员文章站
2024-01-05 10:33:22
...
本篇在上篇的基础上进行扩展。可以用来处理一些单线程基本任务。比如发邮件,发短信等。 具体业务请实现IHandler接口。
package com;
import java.util.concurrent.ConcurrentLinkedQueue;
public class TaskConsumer<T> implements Runnable {
private ConcurrentLinkedQueue<T> queue;
private IHandler<T> handler;
private boolean stop;
public boolean isStop() {
return stop;
}
public void setStop(boolean stop) {
this.stop = stop;
}
public IHandler<T> getHandler() {
return handler;
}
public void setHandler(IHandler<T> handler) {
this.handler = handler;
}
public ConcurrentLinkedQueue<T> getQueue() {
return queue;
}
public void setQueue(ConcurrentLinkedQueue<T> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!stop) {
T poll = queue.poll();
if (poll != null) {
handler.handle(poll);
}
}
}
}
package com;
import java.util.concurrent.ConcurrentLinkedQueue;
public class TaskQueue<T> {
private ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private TaskConsumer<T> consumer = new TaskConsumer<>();
private boolean isStart;
public Thread start(IHandler<T> handler) {
if (isStart) {
throw new RuntimeException("has started");
}
isStart = true;
consumer.setQueue(queue);
consumer.setHandler(handler);
Thread thread = new Thread(consumer);
thread.start();
return thread;
}
public void put(T t) {
queue.add(t);
}
public int size() {
return queue.size();
}
public void stop() {
consumer.setStop(true);
}
}
测试:
public static void main(String[] args) throws InterruptedException {
TaskQueue<String> taskQueue = new TaskQueue<>();
taskQueue.start(new IHandler<String>() {
@Override
public void handle(String t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(t);
}
});
for (int i = 0; i < 20; i++) {
taskQueue.put("来来来"+i);
}
Thread.currentThread().sleep(3000);
}