ThreadPool定时重试
程序员文章站
2022-04-04 16:50:26
...
项目需要当某事件触发时,执行http请求任务,失败时需要有重试机制,并根据失败次数的增加,重试间隔也相应增加,任务可能并发。
由于是耗时任务,首先考虑的就是用线程来实现,并且为了节约资源,因而选择线程池。
为了解决不定间隔的重试,选择Timer和TimerTask来完成
由于是耗时任务,首先考虑的就是用线程来实现,并且为了节约资源,因而选择线程池。
为了解决不定间隔的重试,选择Timer和TimerTask来完成
package threadpool; public class ThreadPoolTest { /** * @param args */ public static void main(String[] args) { System.out.println("start"); ThreadPoolManager poolManager = new ThreadPoolManager(3); poolManager.start(); MyTaskList list = new MyTaskList(poolManager); new MyTask(list, "A").start(); new MyTask(list, "B").start(); new MyTask(list, "C").start(); new MyTask(list, "D").start(); new MyTask(list, "E").start(); new MyTask(list, "F").start(); new MyTask(list, "G").start(); try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } poolManager.stop(); System.out.println("stop"); } }
package threadpool; import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolManager { /** 线程池的大小 */ private int poolSize; private static final int MIN_POOL_SIZE = 1; private static final int MAX_POOL_SIZE = 10; /** 线程池 */ private ExecutorService threadPool; /** 请求队列 */ private LinkedList<ThreadPoolTask> asyncTasks; /** 轮询线程 */ private Thread poolThread; /** 轮询时间 */ private static final int SLEEP_TIME = 200; public ThreadPoolManager(int poolSize) { if (poolSize < MIN_POOL_SIZE) poolSize = MIN_POOL_SIZE; if (poolSize > MAX_POOL_SIZE) poolSize = MAX_POOL_SIZE; this.poolSize = poolSize; threadPool = Executors.newFixedThreadPool(this.poolSize); asyncTasks = new LinkedList<ThreadPoolTask>(); } /** * 向任务队列中添加任务 * * @param task */ public void addAsyncTask(ThreadPoolTask task) { synchronized (asyncTasks) { // Log.i(TAG, "add task: " + task.getURL()); asyncTasks.addLast(task); } } /** * 从任务队列中提取任务 * * @return */ private ThreadPoolTask getAsyncTask() { synchronized (asyncTasks) { if (asyncTasks.size() > 0) { ThreadPoolTask task = asyncTasks.removeFirst(); // Log.i(TAG, "remove task: " + task.getURL()); return task; } } return null; } /** * 开启线程池轮询 * * @return */ public void start() { if (poolThread == null) { poolThread = new Thread(new PoolRunnable()); poolThread.start(); } } /** * 结束轮询,关闭线程池 */ public void stop() { poolThread.interrupt(); poolThread = null; } /** * 实现轮询的Runnable * * @author carrey * */ private class PoolRunnable implements Runnable { @Override public void run() { // Log.i(TAG, "开始轮询"); try { while (!Thread.currentThread().isInterrupted()) { ThreadPoolTask task = getAsyncTask(); if (task == null) { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } continue; } threadPool.execute(task); } } finally { threadPool.shutdown(); } // Log.i(TAG, "结束轮询"); } } }
package threadpool; public class ThreadPoolTask implements Runnable { private String tag; private Callback callback; public ThreadPoolTask(String tag, Callback callback) { this.tag = tag; this.callback = callback; } @Override public void run() { System.out.println(tag + " is running on " + Thread.currentThread()); try { // 模拟耗时任务 Thread.sleep(700); } catch (InterruptedException e) { e.printStackTrace(); } if (callback != null) callback.onRetry(); } public interface Callback { public void onRetry(); } }
package threadpool; import java.lang.reflect.Field; import java.util.Timer; import java.util.TimerTask; public class MyTaskList { private ThreadPoolManager poolManager; private Timer timer; public MyTaskList(ThreadPoolManager poolManager) { this.poolManager = poolManager; timer = new Timer(); } public void addTask(ThreadPoolTask task) { if (task != null) poolManager.addAsyncTask(task); } public void addTask(TimerTask task, long delay) { // 重置TimerTask,不然会发生Exception try { Class<?> clazz = TimerTask.class; Field field = clazz.getDeclaredField("state"); field.setAccessible(true); field.set(task, 0); } catch (Exception e) { } timer.schedule(task, delay); } }
package threadpool; import java.util.TimerTask; import threadpool.ThreadPoolTask.Callback; public class MyTask implements Callback { private MyTaskList list; private ThreadPoolTask task; private String tag; private int retry = 0; public MyTask(MyTaskList list, String tag) { this.list = list; this.tag = tag; } public void start() { task = new ThreadPoolTask(tag, this); start(0); } private void start(int retry) { // 最多重试3次 if (retry >= 3) { System.out.println(tag + " finished " + Thread.currentThread()); return; } doSomething(); this.retry = retry; list.addTask(task); } @Override public void onRetry() { // 重试间隔 list.addTask(timertask, 1000); } private TimerTask timertask = new TimerTask() { @Override public void run() { start(retry + 1); } }; private void doSomething() { System.out.println("Retry[" + retry + "] " + tag + " on " + Thread.currentThread()); } }
上一篇: 定时器的使用
下一篇: (转)[IOS]Timer定时器