深刻理解Netty异步原理：高仿Promise机制
程序员文章站
2022-07-02 21:20:48
...
Callback.java
package async;
/**
 * Single-method callback that accepts one value.
 *
 * @param <T> type of the value passed to {@link #call(Object)}
 */
@FunctionalInterface
public interface Callback<T> {

    /**
     * Hands a value to this callback.
     *
     * @param t the value being delivered
     */
    void call(T t);
}
ChannelFuture.java
package async;
/**
 * Contract for submitting a {@link Task} together with its input and a
 * {@link Callback} that receives the task's output.
 *
 * @param <IN>  type of the input given to the task
 * @param <OUT> type of the output produced by the task
 */
public interface ChannelFuture<IN, OUT> {

    /**
     * Creates a new {@code ChannelFuture} backed by {@code ChannelFutureImpl}.
     *
     * @param <T> input type of the returned instance
     * @param <R> output type of the returned instance
     * @return a freshly constructed implementation
     */
    static <T, R> ChannelFuture<T, R> newPromise() {
        return new ChannelFutureImpl<>();
    }

    /**
     * Registers a task with its input and the callback to receive the task's output.
     *
     * @param hashId   integer key supplied by the caller; how it is used is up to the implementation
     * @param task     the computation to perform
     * @param input    the value passed to the task
     * @param callback receiver of the task's output
     */
    void addListener(int hashId, Task<IN, OUT> task, IN input, Callback<OUT> callback);
}
ChannelFutureImpl.java
package async;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * {@link ChannelFuture} implementation that runs each submitted task on one of a fixed set of
 * single-threaded executors and then invokes the callback on that same worker thread.
 *
 * <p>There is one executor per available processor, shared by all instances. Each executor has
 * at most one worker thread (named {@code Processor_<index>}), so tasks submitted with the same
 * index run one at a time in submission order.
 *
 * <p>The worker threads are not daemon threads. To avoid keeping the JVM alive forever, an idle
 * worker is allowed to time out after {@link #IDLE_KEEP_ALIVE_SECONDS} seconds; a new worker with
 * the same name is created when the next task for that index arrives.
 *
 * @param <IN>  type of the input given to the task
 * @param <OUT> type of the output produced by the task
 */
public class ChannelFutureImpl<IN, OUT> implements ChannelFuture<IN, OUT> {

    /** How long an idle worker thread is kept before it is allowed to terminate. */
    private static final long IDLE_KEEP_ALIVE_SECONDS = 60L;

    /** One single-threaded executor per available processor, indexed by (clamped) hash id. */
    private static final ExecutorService[] EXECUTORS =
            new ExecutorService[Runtime.getRuntime().availableProcessors()];

    static {
        for (int i = 0; i < EXECUTORS.length; i++) {
            final String threadName = "Processor_" + i;
            // core == max == 1 keeps the one-task-at-a-time, FIFO behaviour of a
            // single-thread executor; the unbounded queue means submissions never block.
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    1, 1,
                    IDLE_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    runnable -> {
                        Thread worker = new Thread(runnable);
                        worker.setName(threadName);
                        return worker;
                    });
            // Without this the non-daemon core thread would live forever and, because the
            // executors are never shut down, prevent the JVM from exiting.
            executor.allowCoreThreadTimeOut(true);
            EXECUTORS[i] = executor;
        }
    }

    /**
     * Schedules {@code task} on the executor selected by {@code hashId} and, once it has
     * produced a result, passes that result to {@code callback} on the same worker thread.
     *
     * <p>If the task throws, the exception propagates out of the worker's runnable and the
     * callback is not invoked.
     *
     * @param hashId   index of the executor to use; values below {@code 0} are treated as
     *                 {@code 0} and values past the last index are treated as the last index
     * @param task     the computation to run; must not be {@code null}
     * @param input    the value passed to {@code task}
     * @param callback receiver of the result; may be {@code null}, in which case the result
     *                 is discarded
     * @throws NullPointerException if {@code task} is {@code null}
     */
    @Override
    public void addListener(int hashId, Task<IN, OUT> task, IN input, Callback<OUT> callback) {
        // Fail on the caller's thread instead of as an uncaught NPE on a worker thread.
        if (task == null) {
            throw new NullPointerException("task");
        }
        // Clamp out-of-range ids into [0, EXECUTORS.length - 1].
        final int index = Math.max(0, Math.min(hashId, EXECUTORS.length - 1));
        EXECUTORS[index].execute(() -> {
            // Runs (and possibly blocks) on the background worker thread until the task finishes.
            OUT result = task.get(input);
            // Deliver the result to the caller-supplied callback, if any.
            if (callback != null) {
                callback.call(result);
            }
        });
    }
}
Future.java
package async;
/**
 * A handle from which a value of type {@code T} can be obtained.
 *
 * @param <T> type of the value returned by {@link #get()}
 */
public interface Future<T> {

    /**
     * Returns the value held by this future.
     *
     * @return the value
     * @throws InterruptedException if the calling thread is interrupted while obtaining the value
     */
    T get() throws InterruptedException;
}
FutureTask.java
package async;
/**
 * Blocking {@link Future} with a single result slot that can be filled once.
 *
 * <p>All state is read and written while holding a private monitor object.
 *
 * @param <T> type of the stored result
 */
public class FutureTask<T> implements Future<T> {

    /** Monitor guarding {@code completed} and {@code value}; also used for wait/notify. */
    private final Object monitor = new Object();
    /** Becomes {@code true} the first time {@link #setSuccess(Object)} is called. */
    private boolean completed = false;
    /** The stored result; meaningful only once {@code completed} is {@code true}. */
    private T value;

    /**
     * Blocks the calling thread until a result has been set, then returns it.
     *
     * @return the value passed to the first {@link #setSuccess(Object)} call
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public T get() throws InterruptedException {
        synchronized (monitor) {
            // Re-check after every wake-up before returning.
            for (;;) {
                if (completed) {
                    return value;
                }
                monitor.wait();
            }
        }
    }

    /**
     * Stores the result and wakes every thread waiting in {@link #get()}.
     * Only the first call has an effect; later calls are ignored.
     *
     * @param result the value to publish
     */
    protected void setSuccess(T result) {
        synchronized (monitor) {
            if (!completed) {
                value = result;
                completed = true;
                monitor.notifyAll();
            }
        }
    }
}
Task.java
package async;
/**
 * A computation that maps one input value to one output value.
 *
 * @param <IN>  type of the input
 * @param <OUT> type of the output
 */
public interface Task<IN, OUT> {

    /**
     * Computes the output for the given input.
     *
     * @param input the value to process
     * @return the computed output
     */
    OUT get(IN input);
}
测试
package async;
import java.util.concurrent.TimeUnit;
/**
 * Demo driver: submits 30 tasks through {@link ChannelFuture}, each of which sleeps for three
 * seconds and then appends {@code " World"} to its input; the callback prints the result
 * together with the name of the thread it ran on.
 */
public class FutureTest {

    /**
     * Submits the demo tasks and returns without waiting for them to finish.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility; not thrown by this method's own code
     */
    public static void main(String[] args) throws InterruptedException {
        // Query once instead of on every iteration.
        final int processors = Runtime.getRuntime().availableProcessors();
        for (int i = 0; i < 30; i++) {
            ChannelFuture<String, String> future = ChannelFuture.newPromise();
            // Spread the submissions across hash ids 0 .. processors - 1.
            future.addListener(i % processors, input -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                }
                return input + " World";
            }, "Hello", ret -> System.out.println(ret + ":" + Thread.currentThread().getName()));
        }
    }
}
/*
Hello World:Processor_7
Hello World:Processor_1
Hello World:Processor_4
Hello World:Processor_2
Hello World:Processor_6
Hello World:Processor_3
Hello World:Processor_0
Hello World:Processor_5
Hello World:Processor_3
Hello World:Processor_0
Hello World:Processor_7
Hello World:Processor_6
Hello World:Processor_1
Hello World:Processor_5
Hello World:Processor_2
Hello World:Processor_4
Hello World:Processor_3
Hello World:Processor_2
Hello World:Processor_6
Hello World:Processor_5
Hello World:Processor_4
Hello World:Processor_7
Hello World:Processor_1
Hello World:Processor_0
Hello World:Processor_3
Hello World:Processor_0
Hello World:Processor_5
Hello World:Processor_4
Hello World:Processor_2
Hello World:Processor_1
*/