用 Netty 的 Promise 机制完成异步 IO
程序员文章站
2022-07-02 21:21:00
...
| | |____java
| | | |____org
| | | | |____example
| | | | | |____Main.java(测试)
| | | | | |____GameEventExecutorGroup.java(线程池)
| | | | | |____AsyncIO.java(异步IO的实现)
| | |____java
| | | |____org
| | | | |____example
| | | | | |____Main.java
package org.example;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo entry point: hands slow "DB" work to {@link AsyncIO} and receives each
 * result through a Netty promise whose listener is notified on the business
 * executor, so the business thread itself never blocks on the slow work.
 */
public class Main {
    private static final Logger LOG = LoggerFactory.getLogger(Main.class);

    /**
     * Submits four simulated DB tasks (keys 2..5, each sleeping that many
     * seconds) and then one ordinary task on the business executor.
     *
     * <p>NOTE(review): neither the business executor nor the AsyncIO group is
     * shut down here, so the JVM presumably stays alive after the demo
     * finishes — confirm whether that is intended.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // More than one business thread could be configured; the demo uses one.
        DefaultEventExecutor businessThread = new DefaultEventExecutor();
        for (int i = 2; i <= 5; i++) {
            DefaultPromise<Integer> promise = new DefaultPromise<>(businessThread);
            // This callback brings the flow back onto the business thread.
            promise.addListener((val) -> {
                if (val.isSuccess()) {
                    LOG.info("DB耗时任务执行结束,获得结果{}", val.getNow());
                } else {
                    // A failed promise must not be read with get(): it would throw
                    // inside the listener instead of reporting the real cause.
                    LOG.error("DB耗时任务执行失败", val.cause());
                }
            });
            // Stands in for the slow DB thread.
            int finalI = i;
            AsyncIO.execute(i, promise, () -> {
                LOG.info("DB耗时任务执行中");
                try {
                    Thread.sleep(finalI * 1000L);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and report the failure rather than
                    // pretending the interrupted work completed successfully.
                    Thread.currentThread().interrupt();
                    promise.tryFailure(e);
                    return;
                }
                promise.setSuccess(finalI);
            });
        }
        // Ordinary, non-blocking business logic runs on the business thread as usual.
        businessThread.execute(() -> {
            LOG.info("普通的执行非耗时任务");
        });
    }
}
/*
[2021-05-08 14:40:17,198] [gameEventExecutorGroup-3-3] INFO org.example.Main.lambda$main$1(Main.java:25): DB耗时任务执行中
[2021-05-08 14:40:17,199] [defaultEventExecutor-1-1] INFO org.example.Main.lambda$main$2(Main.java:38): 普通的执行非耗时任务
[2021-05-08 14:40:17,198] [gameEventExecutorGroup-3-2] INFO org.example.Main.lambda$main$1(Main.java:25): DB耗时任务执行中
[2021-05-08 14:40:17,198] [gameEventExecutorGroup-3-4] INFO org.example.Main.lambda$main$1(Main.java:25): DB耗时任务执行中
[2021-05-08 14:40:17,198] [gameEventExecutorGroup-3-1] INFO org.example.Main.lambda$main$1(Main.java:25): DB耗时任务执行中
[2021-05-08 14:40:19,210] [defaultEventExecutor-1-1] INFO org.example.Main.lambda$main$0(Main.java:19): DB耗时任务执行结束,获得结果2
[2021-05-08 14:40:20,207] [defaultEventExecutor-1-1] INFO org.example.Main.lambda$main$0(Main.java:19): DB耗时任务执行结束,获得结果3
[2021-05-08 14:40:21,206] [defaultEventExecutor-1-1] INFO org.example.Main.lambda$main$0(Main.java:19): DB耗时任务执行结束,获得结果4
[2021-05-08 14:40:22,206] [defaultEventExecutor-1-1] INFO org.example.Main.lambda$main$0(Main.java:19): DB耗时任务执行结束,获得结果5
*/
| | | | | |____GameEventExecutorGroup.java
package org.example;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.util.concurrent.AbstractEventExecutorGroup;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.SystemPropertyUtil;
/**
 * An executor group made of a fixed number of child {@link EventExecutor}s.
 *
 * <p>Besides the round-robin {@link #next()}, callers can route work by key:
 * {@link #select(Object)} picks a child from the key's {@code hashCode()}, so
 * equal keys (with a stable hash) always land on the same child executor.
 */
public class GameEventExecutorGroup extends AbstractEventExecutorGroup {
    private final EventExecutor[] children;
    private final AtomicInteger childIndex = new AtomicInteger();
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    // Completed once every child's termination future has fired.
    private final Promise<?> terminationFuture = new DefaultPromise<Object>(GlobalEventExecutor.INSTANCE);
    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    /**
     * Create a new instance using the default thread factory.
     *
     * @param nThreads the number of threads that will be used by this instance.
     */
    public GameEventExecutorGroup(int nThreads) {
        this(nThreads, null);
    }

    /**
     * Create a new instance.
     *
     * @param nThreads the number of threads that will be used by this instance.
     * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
     */
    public GameEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
    }

    /**
     * Create a new instance.
     *
     * @param nThreads the number of threads that will be used by this instance.
     * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
     * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
     * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
     * @throws IllegalArgumentException if {@code nThreads} is not positive.
     * @throws IllegalStateException if a child executor could not be created.
     */
    public GameEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        if (threadFactory == null) {
            threadFactory = newDefaultThreadFactory();
        }
        // Allocate with the declared element type: newChild() is overridable and may
        // return any EventExecutor, which a SingleThreadEventExecutor[] would reject
        // with an ArrayStoreException.
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                children[i] = newChild(threadFactory, maxPendingTasks, rejectedHandler);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    // Creation failed part-way: shut down the children created so far
                    // and wait for them so no threads are left behind.
                    for (int j = 0; j < i; j++) {
                        children[j].shutdownGracefully();
                    }
                    for (int j = 0; j < i; j++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Preserve the interrupt for the caller and stop waiting.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        // Complete the group's termination future when the last child has terminated.
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        for (EventExecutor e : children) {
            e.terminationFuture().addListener(terminationListener);
        }
    }

    /**
     * Returns the thread factory used when the caller supplied none.
     */
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }

    /**
     * Returns the next child executor in round-robin order.
     */
    @Override
    public EventExecutor next() {
        return this.getEventExecutor(childIndex.getAndIncrement());
    }

    /**
     * Returns the child executor that the given key maps to, based on the key's
     * {@code hashCode()}.
     *
     * @param selectKey the routing key; must not be {@code null}.
     * @throws IllegalArgumentException if {@code selectKey} is {@code null}.
     */
    public EventExecutor select(Object selectKey) {
        if (selectKey == null) {
            throw new IllegalArgumentException("selectKey不能为空");
        }
        int hashCode = selectKey.hashCode();
        return this.getEventExecutor(hashCode);
    }

    /** Submits {@code task} to the child executor selected by {@code selectKey}. */
    public <T> Future<T> submit(Object selectKey, Callable<T> task) {
        return this.select(selectKey).submit(task);
    }

    /** Submits {@code task} to the child executor selected by {@code selectKey}. */
    public Future<?> submit(Object selectKey, Runnable task) {
        return this.select(selectKey).submit(task);
    }

    /** Executes {@code command} on the child executor selected by {@code selectKey}. */
    public void execute(Object selectKey, Runnable command) {
        this.select(selectKey).execute(command);
    }

    /** Schedules {@code callable} on the child executor selected by {@code selectKey}. */
    public <V> ScheduledFuture<V> schedule(Object selectKey, Callable<V> callable, long delay, TimeUnit unit) {
        return this.select(selectKey).schedule(callable, delay, unit);
    }

    /** Schedules {@code command} on the child executor selected by {@code selectKey}. */
    public ScheduledFuture<?> schedule(Object selectKey, Runnable command, long delay, TimeUnit unit) {
        return this.select(selectKey).schedule(command, delay, unit);
    }

    /** Schedules {@code command} at a fixed rate on the child executor selected by {@code selectKey}. */
    public ScheduledFuture<?> scheduleAtFixedRate(Object selectKey, Runnable command, long initialDelay, long period, TimeUnit unit) {
        return this.select(selectKey).scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    /** Schedules {@code command} with a fixed delay on the child executor selected by {@code selectKey}. */
    public ScheduledFuture<?> scheduleWithFixedDelay(Object selectKey, Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return this.select(selectKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

    /**
     * Maps an arbitrary int (possibly negative) onto a valid child index.
     */
    private EventExecutor getEventExecutor(int value) {
        if (isPowerOfTwo(this.children.length)) {
            // Masking yields a non-negative index even for negative values.
            return children[value & (children.length - 1)];
        } else {
            // The remainder lies strictly between -length and length, so abs() is safe.
            return children[Math.abs(value % children.length)];
        }
    }

    @Override
    public Iterator<EventExecutor> iterator() {
        return children().iterator();
    }

    /**
     * Return the number of {@link EventExecutor} this implementation uses. This number maps 1:1
     * to the threads it uses.
     */
    public final int executorCount() {
        return children.length;
    }

    /**
     * Return a safe-copy of all of the children of this group.
     */
    protected Set<EventExecutor> children() {
        Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
        Collections.addAll(children, this.children);
        return children;
    }

    /**
     * Create a new EventExecutor which will later be accessible via the {@link #next()} and
     * {@link #select(Object)} methods. This method is called once for each thread that will
     * serve this group.
     */
    protected EventExecutor newChild(ThreadFactory threadFactory, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) throws Exception {
        return new DefaultEventExecutor(this, threadFactory, maxPendingTasks, rejectedHandler);
    }

    /**
     * Starts a graceful shutdown of every child and returns the group's termination future.
     */
    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (EventExecutor l : children) {
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        return terminationFuture();
    }

    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }

    /** Returns {@code true} only if every child is shutting down. */
    @Override
    public boolean isShuttingDown() {
        for (EventExecutor l : children) {
            if (!l.isShuttingDown()) {
                return false;
            }
        }
        return true;
    }

    /** Returns {@code true} only if every child is shut down. */
    @Override
    public boolean isShutdown() {
        for (EventExecutor l : children) {
            if (!l.isShutdown()) {
                return false;
            }
        }
        return true;
    }

    /** Returns {@code true} only if every child is terminated. */
    @Override
    public boolean isTerminated() {
        for (EventExecutor l : children) {
            if (!l.isTerminated()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Waits for all children to terminate, sharing one overall deadline across them.
     *
     * @return {@code true} if every child terminated before the deadline elapsed.
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        loop: for (EventExecutor l : children) {
            for (;;) {
                long timeLeft = deadline - System.nanoTime();
                if (timeLeft <= 0) {
                    // Out of time: stop waiting on the remaining children as well.
                    break loop;
                }
                if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
                    break;
                }
            }
        }
        return isTerminated();
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /** Delegates to the no-argument {@code shutdownGracefully()}. */
    @Override
    public void shutdown() {
        this.shutdownGracefully();
    }
}
| | | | | |____AsyncIO.java
package org.example;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;
/**
 * Runs blocking tasks on a dedicated {@link GameEventExecutorGroup} so the
 * caller's thread is not blocked; the outcome is reported through a promise.
 */
public class AsyncIO {
    // One child executor per available processor; tasks are routed by key.
    private static final GameEventExecutorGroup EXECUTOR_GROUP =
            new GameEventExecutorGroup(Runtime.getRuntime().availableProcessors());

    /**
     * Executes {@code task} on the child executor selected by {@code uniqueKey}.
     *
     * <p>The task itself is responsible for completing {@code promise} on
     * success. If the task throws, the promise is failed with the thrown
     * cause unless it has already been completed.
     *
     * @param uniqueKey routing key; equal keys run on the same child executor
     * @param promise   the promise to fail if {@code task} throws
     * @param task      the blocking work to run
     * @param <T>       the promise's result type
     */
    public static <T> void execute(int uniqueKey, Promise<T> promise, Runnable task) {
        EventExecutor eventExecutor = EXECUTOR_GROUP.select(uniqueKey);
        eventExecutor.execute(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Catch Throwable so an Error cannot leave the promise pending forever,
                // and use tryFailure so a promise the task already completed does not
                // raise IllegalStateException here.
                promise.tryFailure(t);
            }
        });
    }
}
可以看出,借助 Promise 可以轻松地将耗时的 DB 操作分离到其它线程执行,执行结束后再通过回调回到业务线程处理结果,从而不阻塞业务线程。
上一篇: 超级简单的利用javascript实现文件拖拽事件
下一篇: vue2 动态劫持代理复习