Java实现FIFO任务调度队列策略
前言
在工作中,很多高并发的场景中,我们会用到队列来实现大量的任务请求。当任务需要某些特殊资源的时候,我们还需要合理的分配资源,让队列中的任务高效且有序完成任务。熟悉分布式的话,应该了解yarn的任务调度算法。本文主要用java实现一个fifo(先进先出调度器),这也是常见的一种调度方式。
fifo任务调度器架构
主要实现的逻辑可以归纳为:
1、任务队列主要是单队列,所有任务按照顺序进入队列后,也会按照顺序执行。
2、如果任务无法获得资源,则将任务塞回队列原位置。
示例代码
maven依赖如下:
<dependency> <groupid>org.projectlombok</groupid> <artifactid>lombok</artifactid> <optional>true</optional> </dependency> <dependency> <groupid>cn.hutool</groupid> <artifactid>hutool-all</artifactid> <version>5.5.2</version> </dependency>
具体的原理就不细说了,通过代码我们看看fifo任务调度策略是什么玩的吧。下面的代码也可以作为参考。我们会使用到一个双向阻塞队列linkedblockingdeque。后面的代码说明会提到。
package ai.guiji.csdn.dispatch; import cn.hutool.core.thread.threadutil; import lombok.builder; import lombok.data; import lombok.extern.slf4j.slf4j; import org.springframework.scheduling.concurrent.customizablethreadfactory; import java.util.random; import java.util.concurrent.*; import java.util.concurrent.atomic.atomicinteger; import java.util.stream.intstream; /** * @program: csdn @classname: fifodemo @author: 剑客阿良_aliang @date: 2021-12-24 21:21 @description: * fifo队列 @version: v1.0 */ @slf4j public class fifodemo { private static final linkedblockingdeque<task> task_queue = new linkedblockingdeque<>(); private static final concurrenthashmap<integer, linkedblockingqueue<resource>> resource_map = new concurrenthashmap<>(); private static final executorservice task_pool = new threadpoolexecutor( 8, 16, 0l, timeunit.milliseconds, new linkedblockingqueue<>(), new customizablethreadfactory("task-thread-"), new threadpoolexecutor.abortpolicy()); private static final scheduledexecutorservice engine_pool = executors.newsinglethreadscheduledexecutor(new customizablethreadfactory("engine-")); private static final atomicinteger code_builder = new atomicinteger(0); @data @builder private static class resource { private integer rid; private type type; } @data @builder private static class task implements runnable { private integer tid; private runnable work; private type type; private resource resource; @override public void run() { log.info("[{}]任务,使用资源编号:[{}]", tid, resource.getrid()); try { work.run(); } catch (exception exception) { exception.printstacktrace(); } finally { log.info("[{}]任务结束,回归资源", tid); returnresource(resource); } } } private enum type { /** 资源类型 */ a("a资源", 1), b("b资源", 2), c("c资源", 3); private final string desc; private final integer code; type(string desc, integer code) { this.desc = desc; this.code = code; } public string getdesc() { return desc; } public integer getcode() { return code; } } public static void initresource() { random random = new random(); int acount = random.nextint(10) + 1; int bcount = random.nextint(10) + 1; int ccount = random.nextint(10) + 1; resource_map.put(type.a.getcode(), new linkedblockingqueue<>()); resource_map.put(type.b.getcode(), new linkedblockingqueue<>()); resource_map.put(type.c.getcode(), new linkedblockingqueue<>()); intstream.rangeclosed(1, acount) .foreach( a -> resource_map .get(type.a.getcode()) .add(resource.builder().rid(a).type(type.a).build())); intstream.rangeclosed(1, bcount) .foreach( a -> resource_map .get(type.b.getcode()) .add(resource.builder().rid(a).type(type.b).build())); intstream.rangeclosed(1, ccount) .foreach( a -> resource_map .get(type.c.getcode()) .add(resource.builder().rid(a).type(type.c).build())); log.info("初始化资源a数量:{},资源b数量:{},资源c数量:{}", acount, bcount, ccount); } public static resource extractresource(type type) { return resource_map.get(type.getcode()).poll(); } public static void returnresource(resource resource) { log.info("开始归还资源,rid:{},资源类型:{}", resource.getrid(), resource.gettype().getdesc()); resource_map.get(resource.gettype().code).add(resource); log.info("归还资源完成,rid:{},资源类型:{}", resource.getrid(), resource.gettype().getdesc()); } public static void engindo() { engine_pool.scheduleatfixedrate( () -> { task task = task_queue.poll(); if (task == null) { log.info("任务队列为空,无需要执行的任务"); } else { resource resource = extractresource(task.gettype()); if (resource == null) { log.info("[{}]任务无法获取[{}],返回队列", task.gettid(), task.gettype().getdesc()); task_queue.addfirst(task); } else { task.setresource(resource); task_pool.submit(task); } } }, 0, 1, timeunit.seconds); } public static void addtask(runnable runnable, type type) { integer tid = code_builder.incrementandget(); task task = task.builder().tid(tid).type(type).work(runnable).build(); log.info("提交任务[{}]到任务队列", tid); task_queue.add(task); } public static void main(string[] args) { initresource(); engindo(); random random = new random(); threadutil.sleep(5000); intstream.range(0, 10) .foreach( a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.seconds), type.a)); intstream.range(0, 10) .foreach( a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.seconds), type.b)); intstream.range(0, 10) .foreach( a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.seconds), type.c)); } }
代码说明:
1、首先我们构造了任务队列,使用的是linkedblockingdeque,使用双向队列的原因是如果任务无法获取资源,还需要塞到队首,保证任务的有序性。
2、使用concurrenthashmap作为资源映射表,为了保证资源队列使用的均衡性,一旦使用完成的资源会塞到对应资源的队尾处。
3、其中实现了添加任务、提取资源、回归资源几个方法。
4、initresource方法可以初始化资源队列,这里面只是简单的随机了几个资源到a、b、c三种资源,塞入各类别队列。
5、任务私有类有自己的任务标识以及执行完后调用回归资源方法。
6、main方法中会分别提交需要3中资源的10个任务,看看调度情况。
执行结果
我们可以通过结果发现任务有序调度,使用完任务后回归队列。
以上就是java实现fifo任务调度队列策略的详细内容,更多关于java fifo任务调度的资料请关注其它相关文章!
上一篇: Java中的递归方法示例介绍
下一篇: python面向对象之类的继承详解