欢迎您访问程序员文章站,本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

ThreadPool定时重试

程序员文章站 2022-04-04 16:50:26
...
项目需要在某事件触发时执行 HTTP 请求任务;请求失败时需要有重试机制,并且重试间隔随失败次数的增加而相应增加。多个任务可能并发执行。
由于是耗时任务,首先考虑用线程来实现;为了节约资源,选择了线程池。
为了实现间隔不固定的重试,选择用 Timer 和 TimerTask 来完成。

package threadpool;

/**
 * Demo entry point: starts a pool manager with three worker threads, submits
 * seven tasks, lets them run for a fixed time and then stops the manager.
 */
public class ThreadPoolTest {

	/** Tags of the demo tasks, submitted in this order. */
	private static final String[] TASK_TAGS = { "A", "B", "C", "D", "E", "F", "G" };

	/** How long the demo lets the tasks run before stopping, in milliseconds. */
	private static final long RUN_TIME_MS = 30000;

	/**
	 * Runs the demo.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {

		System.out.println("start");

		ThreadPoolManager poolManager = new ThreadPoolManager(3);
		poolManager.start();

		try {
			MyTaskList list = new MyTaskList(poolManager);
			for (String tag : TASK_TAGS) {
				new MyTask(list, tag).start();
			}

			Thread.sleep(RUN_TIME_MS);
		} catch (InterruptedException e) {
			// An interrupt just ends the demo early; keep the flag set so
			// anything running after main's body can still observe it.
			Thread.currentThread().interrupt();
		} finally {
			// Always stop the manager, even if task submission failed.
			poolManager.stop();
		}

		System.out.println("stop");

	}

}

package threadpool;

import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Moves queued {@link ThreadPoolTask}s into a fixed-size thread pool from a
 * dedicated polling thread.
 * <p>
 * Life cycle: {@link #start()} launches the polling thread, {@link #stop()}
 * interrupts it, and the polling thread shuts the pool down on its way out.
 * A stopped manager cannot be started again because its pool is gone.
 */
public class ThreadPoolManager {

	private static final int MIN_POOL_SIZE = 1;
	private static final int MAX_POOL_SIZE = 10;
	/** Time the polling thread sleeps when the queue is empty, in milliseconds. */
	private static final int SLEEP_TIME = 200;

	/** Number of worker threads, clamped to [MIN_POOL_SIZE, MAX_POOL_SIZE]. */
	private final int poolSize;
	/** Pool that actually runs the tasks. */
	private final ExecutorService threadPool;
	/** Pending tasks; every access is guarded by synchronizing on the list. */
	private final LinkedList<ThreadPoolTask> asyncTasks;
	/** Polling thread, or null while not running; guarded by {@code this}. */
	private Thread poolThread;
	/** Set once {@link #stop()} has been called; guarded by {@code this}. */
	private boolean stopped;

	/**
	 * Creates a manager with its own fixed-size pool.
	 *
	 * @param poolSize requested number of worker threads; values outside
	 *                 [1, 10] are clamped to the nearest bound
	 */
	public ThreadPoolManager(int poolSize) {
		if (poolSize < MIN_POOL_SIZE)
			poolSize = MIN_POOL_SIZE;
		if (poolSize > MAX_POOL_SIZE)
			poolSize = MAX_POOL_SIZE;
		this.poolSize = poolSize;
		threadPool = Executors.newFixedThreadPool(this.poolSize);
		asyncTasks = new LinkedList<ThreadPoolTask>();
	}

	/**
	 * Appends a task to the end of the pending queue.
	 *
	 * @param task the task to enqueue
	 */
	public void addAsyncTask(ThreadPoolTask task) {
		synchronized (asyncTasks) {
			asyncTasks.addLast(task);
		}
	}

	/**
	 * Removes and returns the oldest pending task.
	 *
	 * @return the head of the queue, or null when the queue is empty
	 */
	private ThreadPoolTask getAsyncTask() {
		synchronized (asyncTasks) {
			if (asyncTasks.size() > 0) {
				return asyncTasks.removeFirst();
			}
		}
		return null;
	}

	/**
	 * Starts the polling thread. Calling this while the poller is already
	 * running has no effect.
	 *
	 * @throws IllegalStateException if {@link #stop()} has already been called;
	 *                               the pool is shut down at that point, so a
	 *                               new poller could not execute anything
	 */
	public synchronized void start() {
		if (stopped) {
			throw new IllegalStateException(
					"ThreadPoolManager has been stopped and cannot be restarted");
		}
		if (poolThread == null) {
			poolThread = new Thread(new PoolRunnable());
			poolThread.start();
		}
	}

	/**
	 * Stops polling and shuts the pool down. Safe to call before
	 * {@link #start()} and safe to call more than once. Tasks still waiting
	 * in the queue are not executed.
	 */
	public synchronized void stop() {
		if (stopped) {
			return;
		}
		stopped = true;
		if (poolThread != null) {
			// The poller shuts the pool down itself once it sees the
			// interrupt, so it can never submit to an already-closed pool.
			poolThread.interrupt();
			poolThread = null;
		} else {
			// Never started: nobody else will release the pool.
			threadPool.shutdown();
		}
	}

	/**
	 * Polling loop: hands queued tasks to the pool until the thread is
	 * interrupted, then shuts the pool down.
	 *
	 * @author carrey
	 */
	private class PoolRunnable implements Runnable {
		@Override
		public void run() {
			try {
				while (!Thread.currentThread().isInterrupted()) {
					ThreadPoolTask task = getAsyncTask();
					if (task == null) {
						try {
							Thread.sleep(SLEEP_TIME);
						} catch (InterruptedException e) {
							// sleep() cleared the flag; set it again so the
							// loop condition ends the polling.
							Thread.currentThread().interrupt();
						}
						continue;
					}
					threadPool.execute(task);
				}
			} finally {
				threadPool.shutdown();
			}
		}
	}
}

package threadpool;


/**
 * A simulated long-running job. After the simulated work completes it
 * notifies its {@link Callback}, if one was supplied.
 */
public class ThreadPoolTask implements Runnable {

	/** Duration of the simulated work, in milliseconds. */
	private static final long WORK_TIME_MS = 700;

	/** Label printed when the task runs. */
	private final String tag;

	/** Notified after the work completes; may be null. */
	private final Callback callback;

	/**
	 * @param tag      label used in the console output
	 * @param callback notified after each completed run, or null for none
	 */
	public ThreadPoolTask(String tag, Callback callback) {
		this.tag = tag;
		this.callback = callback;
	}

	/**
	 * Prints the tag and current thread, sleeps to simulate work, then calls
	 * {@link Callback#onRetry()}.
	 * <p>
	 * If the thread is interrupted during the work, the interrupt flag is
	 * restored and the method returns without calling the callback: the run
	 * was cancelled, so no retry is requested for it.
	 */
	@Override
	public void run() {
		System.out.println(tag + " is running on " + Thread.currentThread());

		try {
			// Simulate a time-consuming job.
			Thread.sleep(WORK_TIME_MS);
		} catch (InterruptedException e) {
			// sleep() cleared the flag; restore it so the owner of this
			// thread can still see the cancellation.
			Thread.currentThread().interrupt();
			return;
		}

		if (callback != null) {
			callback.onRetry();
		}
	}

	/** Receives a notification when a run of the task has completed. */
	public interface Callback {
		/** Called on the thread that ran the task, after the work is done. */
		public void onRetry();
	}

}

package threadpool;

import java.lang.reflect.Field;
import java.util.Timer;
import java.util.TimerTask;

/**
 * Front door for submitting work: immediate tasks go to the
 * {@link ThreadPoolManager} queue, delayed tasks go to a shared {@link Timer}.
 */
public class MyTaskList {

	/** Receives tasks that should run as soon as a worker is free. */
	private final ThreadPoolManager poolManager;

	/** Runs delayed tasks on its single background thread. */
	private final Timer timer;

	/**
	 * @param poolManager manager that receives the immediate tasks
	 */
	public MyTaskList(ThreadPoolManager poolManager) {
		this.poolManager = poolManager;

		timer = new Timer();
	}

	/**
	 * Queues a task on the pool manager. A null task is ignored.
	 *
	 * @param task the task to queue, may be null
	 */
	public void addTask(ThreadPoolTask task) {
		if (task != null)
			poolManager.addAsyncTask(task);
	}

	/**
	 * Runs {@code task} once on the timer thread after {@code delay}
	 * milliseconds. The same {@code TimerTask} instance may be passed again
	 * for later calls.
	 * <p>
	 * A {@code TimerTask} can be scheduled only once, so each call schedules a
	 * fresh wrapper that delegates to {@code task.run()}. This replaces the
	 * earlier approach of resetting the private {@code TimerTask.state} field
	 * by reflection, which silently ignored failures. One consequence is that
	 * calling {@code cancel()} on the {@code task} passed in does not cancel
	 * the scheduled run.
	 *
	 * @param task  the work to run; must not be null
	 * @param delay delay in milliseconds before the run
	 * @throws NullPointerException     if {@code task} is null
	 * @throws IllegalArgumentException if {@code delay} is rejected by
	 *                                  {@link Timer#schedule(TimerTask, long)}
	 * @throws IllegalStateException    if the timer has been cancelled
	 */
	public void addTask(final TimerTask task, long delay) {
		if (task == null) {
			// Fail in the caller; a null dereference inside the wrapper
			// would terminate the shared timer thread instead.
			throw new NullPointerException("task");
		}

		timer.schedule(new TimerTask() {
			@Override
			public void run() {
				task.run();
			}
		}, delay);
	}

	/**
	 * Cancels the timer, discarding delayed tasks that have not run yet and
	 * letting the timer thread terminate. Delayed tasks can no longer be added
	 * afterwards.
	 */
	public void shutdown() {
		timer.cancel();
	}

}

package threadpool;

import java.util.TimerTask;

import threadpool.ThreadPoolTask.Callback;

/**
 * A job that is run through a {@link MyTaskList} and, after each run, is
 * scheduled again after a delay until the attempt limit is reached.
 */
public class MyTask implements Callback {

	/** Total number of runs, counting the initial one. */
	private static final int MAX_ATTEMPTS = 3;

	/** Delay between the end of one run and the next attempt, in milliseconds. */
	private static final long RETRY_DELAY_MS = 1000;

	private final MyTaskList list;
	private final String tag;

	/** The pool task that is re-queued on every attempt; created by start(). */
	private ThreadPoolTask task;

	/** Zero-based index of the current attempt. */
	private int retry = 0;

	/**
	 * @param list list used to queue both the runs and the delayed retries
	 * @param tag  label used in the console output
	 */
	public MyTask(MyTaskList list, String tag) {
		this.list = list;
		this.tag = tag;
	}

	/** Creates the pool task and queues the first attempt. */
	public void start() {
		task = new ThreadPoolTask(tag, this);
		start(0);
	}

	/**
	 * Queues attempt number {@code retry}, or prints the "finished" line when
	 * the attempt limit has been reached.
	 *
	 * @param retry zero-based index of the attempt to run
	 */
	private void start(int retry) {
		// The task runs at most MAX_ATTEMPTS times in total.
		if (retry >= MAX_ATTEMPTS) {
			System.out.println(tag + " finished " + Thread.currentThread());
			return;
		}

		// Record the attempt number before logging. Logging first printed the
		// previous attempt's number (0, 0, 1 instead of 0, 1, 2).
		this.retry = retry;

		doSomething();

		list.addTask(task);
	}

	/**
	 * Called after a run of the pool task; schedules the next attempt after
	 * {@link #RETRY_DELAY_MS}.
	 */
	@Override
	public void onRetry() {
		// A TimerTask can be scheduled only once, so build a new one for every
		// retry instead of reusing a single instance.
		list.addTask(new TimerTask() {

			@Override
			public void run() {
				start(retry + 1);
			}

		}, RETRY_DELAY_MS);
	}

	/** Prints the current attempt number, the tag and the current thread. */
	private void doSomething() {
		System.out.println("Retry[" + retry + "] " + tag + " on "
				+ Thread.currentThread());
	}
}