欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识。
您现在的位置是: 首页

深刻理解Netty异步原理:高仿Promise机制

程序员文章站 2022-07-02 21:20:48
...

Callback.java

package async;

/**
 * Single-method callback that receives one value.
 *
 * @param <T> type of the value handed to {@link #call(Object)}
 */
@FunctionalInterface
public interface Callback<T> {
    /**
     * Receives the value.
     *
     * @param t the value passed to this callback
     */
    void call(T t);
}
ChannelFuture.java
package async;

/**
 * Entry point for submitting a {@link Task} together with its input and a
 * {@link Callback} that receives the task's output.
 *
 * @param <IN>  type of the input given to the task
 * @param <OUT> type of the result produced by the task and passed to the callback
 */
public interface ChannelFuture<IN, OUT> {
    /**
     * Registers a task, the input it is applied to, and a callback for its result.
     *
     * @param hashId   integer used by the implementation to choose where the task runs
     * @param task     computation applied to {@code input}
     * @param input    value handed to {@code task}
     * @param callback receiver of the task's result
     */
    void addListener(int hashId, Task<IN, OUT> task, IN input, Callback<OUT> callback);

    /**
     * Creates a new {@link ChannelFutureImpl}.
     *
     * @param <T> input type
     * @param <R> result type
     * @return a fresh {@code ChannelFutureImpl} instance
     */
    static <T, R> ChannelFuture<T, R> newPromise() {
        return new ChannelFutureImpl<>();
    }
}
ChannelFutureImpl.java
package async;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * {@link ChannelFuture} implementation that runs each task on one of a fixed set of
 * single-threaded executors and then hands the result to the callback on that same thread.
 *
 * <p>There is one executor per available processor, shared by all instances. Tasks submitted
 * with the same {@code hashId} go to the same executor and therefore run one after another
 * in submission order.
 *
 * @param <IN>  type of the input given to the task
 * @param <OUT> type of the result produced by the task
 */
public class ChannelFutureImpl<IN, OUT> implements ChannelFuture<IN, OUT> {

    /** How long an idle worker thread is kept before it is allowed to terminate. */
    private static final long IDLE_KEEP_ALIVE_SECONDS = 10L;

    /** One serial executor per available processor, indexed by the (clamped) hash id. */
    private static final ExecutorService[] EXECUTORS =
            new ExecutorService[Runtime.getRuntime().availableProcessors()];

    static {
        for (int i = 0; i < EXECUTORS.length; i++) {
            EXECUTORS[i] = newSerialExecutor("Processor_" + i);
        }
    }

    /**
     * Builds an executor that runs at most one task at a time on a thread with the given name.
     *
     * <p>The worker is non-daemon so that queued tasks finish even after {@code main} returns,
     * but it is allowed to time out when idle. Without the timeout the worker would live
     * forever and keep the JVM from exiting, because the executors are never shut down.
     *
     * @param threadName name given to the worker thread
     * @return a single-threaded executor whose worker terminates after being idle
     */
    private static ExecutorService newSerialExecutor(String threadName) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1,
                IDLE_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                runnable -> {
                    Thread worker = new Thread(runnable);
                    worker.setName(threadName);
                    return worker;
                });
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    /**
     * Schedules {@code task} on the executor selected by {@code hashId} and, once it has
     * produced a result, passes that result to {@code callback} on the same worker thread.
     *
     * @param hashId   executor index; values below zero use the first executor and values
     *                 past the end use the last one
     * @param task     computation applied to {@code input}; must not be {@code null}
     * @param input    value handed to {@code task}
     * @param callback receiver of the result; may be {@code null}, in which case the result
     *                 is discarded
     * @throws NullPointerException if {@code task} is {@code null}
     */
    @Override
    public void addListener(int hashId, Task<IN, OUT> task, IN input, Callback<OUT> callback) {
        // Fail in the caller's thread rather than later inside a worker.
        Objects.requireNonNull(task, "task");

        // Clamp out-of-range ids into the valid index range.
        final int slot = Math.min(Math.max(hashId, 0), EXECUTORS.length - 1);

        EXECUTORS[slot].execute(() -> {
            // The (possibly blocking) task runs on the background worker thread.
            OUT result = task.get(input);

            // Deliver the result to the caller-supplied callback, if any.
            if (callback != null) {
                callback.call(result);
            }
        });
    }
}
Future.java
package async;

/**
 * Handle through which a result of type {@code T} can be obtained.
 *
 * @param <T> type of the result
 */
public interface Future<T>
{
    /**
     * Returns the result.
     *
     * @return the result value
     * @throws InterruptedException if the calling thread is interrupted while obtaining it
     */
    T get() throws InterruptedException;
}

FutureTask.java

package async;

/**
 * {@link Future} whose result is supplied once through {@link #setSuccess(Object)}.
 * Callers of {@link #get()} block until that has happened.
 *
 * @param <T> type of the result
 */
public class FutureTask<T> implements Future<T> {
    /** Guards {@code completed} and {@code value}; also used for wait/notify. */
    private final Object monitor = new Object();
    private boolean completed;
    private T value;

    /**
     * Waits until a result has been set and returns it.
     *
     * @return the value passed to the first {@code setSuccess} call
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public T get() throws InterruptedException {
        synchronized (monitor) {
            // Loop so that a wake-up without completion goes back to waiting.
            for (;;) {
                if (completed) {
                    return value;
                }
                monitor.wait();
            }
        }
    }

    /**
     * Stores the result and wakes every thread blocked in {@link #get()}.
     * Only the first call has any effect; later calls are ignored.
     *
     * @param result the value to publish
     */
    protected void setSuccess(T result) {
        synchronized (monitor) {
            if (!completed) {
                value = result;
                completed = true;
                monitor.notifyAll();
            }
        }
    }
}
Task.java
package async;

/**
 * A computation that maps an input of type {@code IN} to an output of type {@code OUT}.
 *
 * @param <IN>  type of the input
 * @param <OUT> type of the output
 */
public interface Task<IN, OUT>
{
    /**
     * Produces the output for the given input.
     *
     * @param input the value to process
     * @return the computed output
     */
    OUT get(IN input);
}

测试

package async;

import java.util.concurrent.TimeUnit;

/**
 * Demo driver: submits 30 tasks spread round-robin over the available processors.
 * Each task sleeps for three seconds and appends {@code " World"} to its input; the
 * callback prints the result together with the name of the thread it ran on.
 */
public class FutureTest {
    /**
     * Submits the demo tasks and returns immediately; the results are printed by the
     * callbacks as the tasks complete.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility; not thrown by this method itself
     */
    public static void main(String[] args) throws InterruptedException {
        // The processor count does not change between iterations, so read it once.
        final int processors = Runtime.getRuntime().availableProcessors();

        for (int i = 0; i < 30; i++) {
            ChannelFuture<String, String> future = ChannelFuture.newPromise();
            future.addListener(i % processors, input -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the interruption.
                    Thread.currentThread().interrupt();
                }
                return input + " World";
            }, "Hello", ret -> System.out.println(ret + ":" + Thread.currentThread().getName()));
        }
    }
}

/*
Hello World:Processor_7
Hello World:Processor_1
Hello World:Processor_4
Hello World:Processor_2
Hello World:Processor_6
Hello World:Processor_3
Hello World:Processor_0
Hello World:Processor_5
Hello World:Processor_3
Hello World:Processor_0
Hello World:Processor_7
Hello World:Processor_6
Hello World:Processor_1
Hello World:Processor_5
Hello World:Processor_2
Hello World:Processor_4
Hello World:Processor_3
Hello World:Processor_2
Hello World:Processor_6
Hello World:Processor_5
Hello World:Processor_4
Hello World:Processor_7
Hello World:Processor_1
Hello World:Processor_0
Hello World:Processor_3
Hello World:Processor_0
Hello World:Processor_5
Hello World:Processor_4
Hello World:Processor_2
Hello World:Processor_1
 */

相关标签: # java并发编程

上一篇: axios使用

下一篇: axios使用