欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java实现FIFO任务调度队列策略

程序员文章站 2022-06-23 22:52:19
目录前言fifo任务调度器架构示例代码前言在工作中,很多高并发的场景中,我们会用到队列来实现大量的任务请求。当任务需要某些特殊资源的时候,我们还需要合理的分配资源,让队列中的任务高效且有序完成任务。熟...

前言

在工作中,很多高并发的场景中,我们会用到队列来实现大量的任务请求。当任务需要某些特殊资源的时候,我们还需要合理的分配资源,让队列中的任务高效且有序完成任务。熟悉分布式的话,应该了解yarn的任务调度算法。本文主要用java实现一个fifo(先进先出调度器),这也是常见的一种调度方式。

fifo任务调度器架构

主要实现的逻辑可以归纳为:

1、任务队列主要是单队列,所有任务按照顺序进入队列后,也会按照顺序执行。

2、如果任务无法获得资源,则将任务塞回队列原位置。

示例代码

maven依赖如下:

      	<dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
            <optional>true</optional>
        </dependency>
                <dependency>
            <groupid>cn.hutool</groupid>
            <artifactid>hutool-all</artifactid>
            <version>5.5.2</version>
        </dependency>

具体的原理就不细说了,通过代码我们看看fifo任务调度策略是什么玩的吧。下面的代码也可以作为参考。我们会使用到一个双向阻塞队列linkedblockingdeque。后面的代码说明会提到。

package ai.guiji.csdn.dispatch;

import cn.hutool.core.thread.threadutil;
import lombok.builder;
import lombok.data;
import lombok.extern.slf4j.slf4j;
import org.springframework.scheduling.concurrent.customizablethreadfactory;

import java.util.random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.atomicinteger;
import java.util.stream.intstream;

/**
 * @program: csdn @classname: fifodemo @author: 剑客阿良_aliang @date: 2021-12-24 21:21 @description:
 * fifo队列 @version: v1.0
 */
@slf4j
public class fifodemo {
  private static final linkedblockingdeque<task> task_queue = new linkedblockingdeque<>();
  private static final concurrenthashmap<integer, linkedblockingqueue<resource>> resource_map =
      new concurrenthashmap<>();
  private static final executorservice task_pool =
      new threadpoolexecutor(
          8,
          16,
          0l,
          timeunit.milliseconds,
          new linkedblockingqueue<>(),
          new customizablethreadfactory("task-thread-"),
          new threadpoolexecutor.abortpolicy());
  private static final scheduledexecutorservice engine_pool =
      executors.newsinglethreadscheduledexecutor(new customizablethreadfactory("engine-"));
  private static final atomicinteger code_builder = new atomicinteger(0);

  @data
  @builder
  private static class resource {
    private integer rid;
    private type type;
  }

  @data
  @builder
  private static class task implements runnable {
    private integer tid;
    private runnable work;
    private type type;
    private resource resource;

    @override
    public void run() {
      log.info("[{}]任务,使用资源编号:[{}]", tid, resource.getrid());
      try {
        work.run();
      } catch (exception exception) {
        exception.printstacktrace();
      } finally {
        log.info("[{}]任务结束,回归资源", tid);
        returnresource(resource);
      }
    }
  }

  private enum type {
    /** 资源类型 */
    a("a资源", 1),
    b("b资源", 2),
    c("c资源", 3);

    private final string desc;
    private final integer code;

    type(string desc, integer code) {
      this.desc = desc;
      this.code = code;
    }

    public string getdesc() {
      return desc;
    }

    public integer getcode() {
      return code;
    }
  }

  public static void initresource() {
    random random = new random();
    int acount = random.nextint(10) + 1;
    int bcount = random.nextint(10) + 1;
    int ccount = random.nextint(10) + 1;
    resource_map.put(type.a.getcode(), new linkedblockingqueue<>());
    resource_map.put(type.b.getcode(), new linkedblockingqueue<>());
    resource_map.put(type.c.getcode(), new linkedblockingqueue<>());
    intstream.rangeclosed(1, acount)
        .foreach(
            a ->
                resource_map
                    .get(type.a.getcode())
                    .add(resource.builder().rid(a).type(type.a).build()));
    intstream.rangeclosed(1, bcount)
        .foreach(
            a ->
                resource_map
                    .get(type.b.getcode())
                    .add(resource.builder().rid(a).type(type.b).build()));
    intstream.rangeclosed(1, ccount)
        .foreach(
            a ->
                resource_map
                    .get(type.c.getcode())
                    .add(resource.builder().rid(a).type(type.c).build()));
    log.info("初始化资源a数量:{},资源b数量:{},资源c数量:{}", acount, bcount, ccount);
  }

  public static resource extractresource(type type) {
    return resource_map.get(type.getcode()).poll();
  }

  public static void returnresource(resource resource) {
    log.info("开始归还资源,rid:{},资源类型:{}", resource.getrid(), resource.gettype().getdesc());
    resource_map.get(resource.gettype().code).add(resource);
    log.info("归还资源完成,rid:{},资源类型:{}", resource.getrid(), resource.gettype().getdesc());
  }

  public static void engindo() {
    engine_pool.scheduleatfixedrate(
        () -> {
          task task = task_queue.poll();
          if (task == null) {
            log.info("任务队列为空,无需要执行的任务");
          } else {
            resource resource = extractresource(task.gettype());
            if (resource == null) {
              log.info("[{}]任务无法获取[{}],返回队列", task.gettid(), task.gettype().getdesc());
              task_queue.addfirst(task);
            } else {
              task.setresource(resource);
              task_pool.submit(task);
            }
          }
        },
        0,
        1,
        timeunit.seconds);
  }

  public static void addtask(runnable runnable, type type) {
    integer tid = code_builder.incrementandget();
    task task = task.builder().tid(tid).type(type).work(runnable).build();
    log.info("提交任务[{}]到任务队列", tid);
    task_queue.add(task);
  }

  public static void main(string[] args) {
    initresource();
    engindo();
    random random = new random();
    threadutil.sleep(5000);
    intstream.range(0, 10)
        .foreach(
            a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.seconds), type.a));
    intstream.range(0, 10)
        .foreach(
            a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.seconds), type.b));
    intstream.range(0, 10)
        .foreach(
            a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.seconds), type.c));
  }
}

代码说明:

1、首先我们构造了任务队列,使用的是linkedblockingdeque,使用双向队列的原因是如果任务无法获取资源,还需要塞到队首,保证任务的有序性。

2、使用concurrenthashmap作为资源映射表,为了保证资源队列使用的均衡性,一旦使用完成的资源会塞到对应资源的队尾处。

3、其中实现了添加任务、提取资源、回归资源几个方法。

4、initresource方法可以初始化资源队列,这里面只是简单的随机了几个资源到a、b、c三种资源,塞入各类别队列。

5、任务私有类有自己的任务标识以及执行完后调用回归资源方法。

6、main方法中会分别提交需要3中资源的10个任务,看看调度情况。

执行结果

Java实现FIFO任务调度队列策略

Java实现FIFO任务调度队列策略

Java实现FIFO任务调度队列策略

我们可以通过结果发现任务有序调度,使用完任务后回归队列。 

以上就是java实现fifo任务调度队列策略的详细内容,更多关于java fifo任务调度的资料请关注其它相关文章!

相关标签: Java FIFO