欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

用netty Promise机制完成异步IO

程序员文章站 2022-07-02 21:21:00
...

| | |____java
| | | |____org
| | | | |____example
| | | | | |____Main.java(测试)
| | | | | |____GameEventExecutorGroup.java(线程池)
| | | | | |____AsyncIO.java(异步IO的实现)
 

 

| | |____java
| | | |____org
| | | | |____example
| | | | | |____Main.java

package org.example;

import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Demo entry point: hands slow "DB" work to {@link AsyncIO} and receives each result
 * through a promise bound to a separate business executor.
 */
public class Main {
    private static final Logger LOG = LoggerFactory.getLogger(Main.class);

    /**
     * Starts four simulated DB tasks (sleeping 2 to 5 seconds) plus one ordinary business task.
     *
     * <p>NOTE(review): neither the business executor nor the AsyncIO pool is shut down here —
     * confirm whether that is intended for this demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // More than one business thread could be configured; one is enough for this demo.
        DefaultEventExecutor businessThread = new DefaultEventExecutor();

        for (int i = 2; i <= 5; i++) {
            // The promise is bound to the business executor, so (per the sample output that
            // accompanies this example) the listener below runs back on the business thread.
            DefaultPromise<Integer> promise = new DefaultPromise<>(businessThread);

            promise.addListener((val) -> {
                // Check the outcome first: reading the result of a failed promise would throw
                // inside the listener instead of reporting the real cause.
                if (val.isSuccess()) {
                    LOG.info("DB耗时任务执行结束,获得结果{}", val.getNow());
                } else {
                    LOG.error("DB耗时任务执行失败", val.cause());
                }
            });

            // Stands in for a slow DB call executed on one of the AsyncIO threads.
            int finalI = i;
            AsyncIO.execute(i, promise, () -> {
                LOG.info("DB耗时任务执行中");
                try {
                    Thread.sleep(finalI * 1000L);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and report the failure rather than
                    // pretending the work finished successfully.
                    Thread.currentThread().interrupt();
                    promise.setFailure(e);
                    return;
                }

                promise.setSuccess(finalI);
            });
        }

        // Ordinary, non-blocking business logic submitted to the business thread.
        businessThread.execute(() -> {
            LOG.info("普通的执行非耗时任务");
        });
    }
}

/*
[2021-05-08 14:40:17,198] [gameEventExecutorGroup-3-3] INFO org.example.Main.lambda$main$1(Main.java:25): DB耗时任务执行中
[2021-05-08 14:40:17,199] [defaultEventExecutor-1-1] INFO org.example.Main.lambda$main$2(Main.java:38): 普通的执行非耗时任务
[2021-05-08 14:40:17,198] [gameEventExecutorGroup-3-2] INFO org.example.Main.lambda$main$1(Main.java:25): DB耗时任务执行中
[2021-05-08 14:40:17,198] [gameEventExecutorGroup-3-4] INFO org.example.Main.lambda$main$1(Main.java:25): DB耗时任务执行中
[2021-05-08 14:40:17,198] [gameEventExecutorGroup-3-1] INFO org.example.Main.lambda$main$1(Main.java:25): DB耗时任务执行中
[2021-05-08 14:40:19,210] [defaultEventExecutor-1-1] INFO org.example.Main.lambda$main$0(Main.java:19): DB耗时任务执行结束,获得结果2
[2021-05-08 14:40:20,207] [defaultEventExecutor-1-1] INFO org.example.Main.lambda$main$0(Main.java:19): DB耗时任务执行结束,获得结果3
[2021-05-08 14:40:21,206] [defaultEventExecutor-1-1] INFO org.example.Main.lambda$main$0(Main.java:19): DB耗时任务执行结束,获得结果4
[2021-05-08 14:40:22,206] [defaultEventExecutor-1-1] INFO org.example.Main.lambda$main$0(Main.java:19): DB耗时任务执行结束,获得结果5
 */

| | | | | |____GameEventExecutorGroup.java

package org.example;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.util.concurrent.AbstractEventExecutorGroup;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.SystemPropertyUtil;


/**
 * A fixed-size group of {@link EventExecutor}s that, in addition to the round-robin
 * {@link #next()}, can pick an executor from the hash code of a caller-supplied key
 * (see {@link #select(Object)}). Keys with the same hash code are always mapped to the
 * same child executor.
 */
public class GameEventExecutorGroup extends AbstractEventExecutorGroup {
    /** The child executors; the array is filled once in the constructor. */
    private final EventExecutor[] children;
    /** Counter backing the round-robin selection in {@link #next()}. */
    private final AtomicInteger childIndex = new AtomicInteger();
    /** Number of children whose termination future has completed so far. */
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    /** Completed once every child has reported termination. */
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
    /**
     * Default pending-task limit: the {@code io.netty.eventexecutor.maxPendingTasks} system
     * property (falling back to {@link Integer#MAX_VALUE}), but never less than 16.
     */
    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));


    /**
     * Create a new instance with the default thread factory.
     *
     * @param nThreads the number of threads that will be used by this instance.
     */
    public GameEventExecutorGroup(int nThreads) {
        this(nThreads, null);
    }

    /**
     * Create a new instance with the default pending-task limit and a rejecting handler.
     *
     * @param nThreads the number of threads that will be used by this instance.
     * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
     */
    public GameEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
    }

    /**
     * Create a new instance.
     *
     * @param nThreads the number of threads that will be used by this instance.
     * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
     * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
     * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
     * @throws IllegalArgumentException if {@code nThreads} is not positive.
     * @throws IllegalStateException if a child executor cannot be created.
     */
    public GameEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        if (threadFactory == null) {
            threadFactory = newDefaultThreadFactory();
        }
        // The array element type must match newChild's declared return type; a narrower
        // array type would throw ArrayStoreException for overrides returning another executor.
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                children[i] = newChild(threadFactory, maxPendingTasks, rejectedHandler);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    // Creation failed part-way: shut down the children created so far ...
                    for (int j = 0; j < i; j++) {
                        children[j].shutdownGracefully();
                    }

                    // ... and wait for each of them to terminate before the exception propagates.
                    for (int j = 0; j < i; j++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Preserve the interrupt status and stop waiting.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        // Complete the group's termination future once the last child has terminated.
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        for (EventExecutor e : children) {
            e.terminationFuture().addListener(terminationListener);
        }
    }

    /**
     * Create the thread factory used when the caller passes {@code null}.
     *
     * @return a {@link DefaultThreadFactory} built from this instance's runtime class.
     */
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }

    /**
     * Return the next child executor in round-robin order.
     */
    @Override
    public EventExecutor next() {
        return this.getEventExecutor(childIndex.getAndIncrement());
    }

    /**
     * Pick the child executor that corresponds to the hash code of {@code selectKey}.
     *
     * @param selectKey the key whose {@code hashCode()} decides the executor; must not be {@code null}.
     * @return the executor mapped to the key.
     * @throws IllegalArgumentException if {@code selectKey} is {@code null}.
     */
    public EventExecutor select(Object selectKey) {
        if (selectKey == null) {
            throw new IllegalArgumentException("selectKey不能为空");
        }
        int hashCode = selectKey.hashCode();
        return this.getEventExecutor(hashCode);
    }

    /** Submit {@code task} to the executor selected by {@code selectKey}. */
    public <T> Future<T> submit(Object selectKey, Callable<T> task) {
        return this.select(selectKey).submit(task);
    }

    /** Submit {@code task} to the executor selected by {@code selectKey}. */
    public Future<?> submit(Object selectKey, Runnable task) {
        return this.select(selectKey).submit(task);
    }

    /** Execute {@code command} on the executor selected by {@code selectKey}. */
    public void execute(Object selectKey,Runnable command) {
        this.select(selectKey).execute(command);
    }

    /** Schedule {@code callable} after {@code delay} on the executor selected by {@code selectKey}. */
    public <V> ScheduledFuture<V> schedule(Object selectKey,Callable<V> callable, long delay, TimeUnit unit) {
        return this.select(selectKey).schedule(callable, delay, unit);
    }

    /** Schedule {@code command} after {@code delay} on the executor selected by {@code selectKey}. */
    public ScheduledFuture<?> schedule(Object selectKey,Runnable command, long delay, TimeUnit unit) {
        return this.select(selectKey).schedule(command, delay, unit);
    }

    /** Schedule {@code command} at a fixed rate on the executor selected by {@code selectKey}. */
    public ScheduledFuture<?> scheduleAtFixedRate(Object selectKey,Runnable command, long initialDelay, long period, TimeUnit unit) {
        return this.select(selectKey).scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    /** Schedule {@code command} with a fixed delay on the executor selected by {@code selectKey}. */
    public ScheduledFuture<?> scheduleWithFixedDelay(Object selectKey,Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return this.select(selectKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

    /**
     * Map an arbitrary (possibly negative) int onto a child index.
     *
     * @param value a counter value or hash code.
     * @return the child at the derived index.
     */
    private EventExecutor getEventExecutor(int value) {
        if (isPowerOfTwo(this.children.length)) {
            // Masking with (length - 1) always yields a non-negative index.
            return children[value & children.length - 1];
        } else {
            // The remainder lies strictly between -length and length, so abs() cannot overflow.
            return children[Math.abs(value % children.length)];
        }
    }

    /**
     * Iterate over a copy of the children, in creation order.
     */
    @Override
    public Iterator<EventExecutor> iterator() {
        return children().iterator();
    }

    /**
     * Return the number of {@link EventExecutor}s this implementation uses. This number maps 1:1
     * to the threads it uses.
     */
    public final int executorCount() {
        return children.length;
    }

    /**
     * Return a safe-copy of all of the children of this group.
     */
    protected Set<EventExecutor> children() {
        Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
        Collections.addAll(children, this.children);
        return children;
    }

    /**
     * Create a new EventExecutor which will later be accessible via the {@link #next()} and
     * {@link #select(Object)} methods. This method is called once for each thread that will
     * serve this {@link GameEventExecutorGroup}.
     *
     * <p>Note: it is invoked from the constructor, so overrides must not rely on subclass state.
     */
    protected EventExecutor newChild(ThreadFactory threadFactory, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) throws Exception {
        return new DefaultEventExecutor(this, threadFactory, maxPendingTasks, rejectedHandler);
    }

    /**
     * Ask every child to shut down gracefully.
     *
     * @return the group's termination future.
     */
    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (EventExecutor l : children) {
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        return terminationFuture();
    }

    /**
     * Return the future that completes once all children have terminated.
     */
    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }

    /**
     * Return {@code true} only if every child is shutting down.
     */
    @Override
    public boolean isShuttingDown() {
        for (EventExecutor l : children) {
            if (!l.isShuttingDown()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Return {@code true} only if every child is shut down.
     */
    @Override
    public boolean isShutdown() {
        for (EventExecutor l : children) {
            if (!l.isShutdown()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Return {@code true} only if every child is terminated.
     */
    @Override
    public boolean isTerminated() {
        for (EventExecutor l : children) {
            if (!l.isTerminated()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Wait for the children to terminate, one after another, sharing a single deadline.
     *
     * @return {@code true} if all children are terminated when this method returns.
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        loop: for (EventExecutor l : children) {
            for (;;) {
                long timeLeft = deadline - System.nanoTime();
                if (timeLeft <= 0) {
                    // Deadline reached: stop waiting on the remaining children as well.
                    break loop;
                }
                if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
                    break;
                }
            }
        }
        return isTerminated();
    }

    /** Return whether {@code val} equals its lowest set bit, i.e. has at most one bit set. */
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /**
     * Delegates to {@link #shutdownGracefully()}.
     */
    @Override
    public void shutdown() {
        this.shutdownGracefully();
    }
}

| | | | | |____AsyncIO.java
 

package org.example;

import io.netty.util.concurrent.EventExecutor;

import io.netty.util.concurrent.Promise;

/**
 * Runs blocking work on a dedicated {@link GameEventExecutorGroup} so that the caller's
 * own thread is not blocked.
 */
public class AsyncIO {
    /** Shared worker pool, sized to the number of available processors. */
    private static final GameEventExecutorGroup eventExecutorGroup = new GameEventExecutorGroup(Runtime.getRuntime().availableProcessors());

    /**
     * Executes {@code task} on the worker selected by {@code uniqueKey}; equal keys are
     * always routed to the same worker.
     *
     * <p>The task is expected to complete {@code promise} itself on success. If the task
     * throws, the promise is failed with the thrown cause.
     *
     * @param uniqueKey key used to choose the worker executor.
     * @param promise the promise to fail if {@code task} throws.
     * @param task the blocking work to run.
     * @param <T> the promise's result type.
     */
    public static <T> void execute(int uniqueKey, Promise<T> promise, Runnable task) {
        EventExecutor eventExecutor = eventExecutorGroup.select(uniqueKey);
        eventExecutor.execute(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Catch Throwable, not just Exception: an Error escaping the task would
                // otherwise leave the promise uncompleted and its listeners would never fire.
                promise.setFailure(t);
            }
        });
    }
}

可以看出,这样就轻松地将耗时的 DB 业务分离到其它线程执行,从而不阻塞业务线程;任务完成后,结果再通过 Promise 的回调回到业务线程处理。

 

 

 

 

相关标签: # IO异步处理