Redis【有与无】【Lettuce】L3.使用Reactive
本文章基于Redis 6.0.9版本,Lettuce 6.0.1.RELEASE版本
目录
13.调度程序(Schedulers)和线程(threads)
1.动机
异步和响应式方法使你可以利用更好的系统资源,而不是浪费线程等待网络或磁盘I/O。 可以充分利用线程来执行其他工作。
存在广泛的技术来促进这种编程风格,从非常有限且不易使用的java.util.concurrent.Future
到完整的库和运行时(如Akka)。Project Reactor具有非常丰富的运算符集,可以组成异步工作流,它对其他框架没有更多的依赖关系,并且支持非常成熟的Reactive Streams模型。
2.了解Reactive Streams
Reactive Streams是一项主动,旨在为具有无阻塞背压的异步流处理提供标准。 这包括针对运行时环境(JVM和JavaScript)以及网络协议的工作。
响应式流的范围是找到最小的接口,方法和协议集,这些接口,方法和协议将描述实现目标所必需的操作和实体-具有无阻塞背压(back pressure)的异步数据流。
它是多个反应性合成库之间的互操作性标准,允许交互而无需在应用程序代码中的库之间进行桥接。
Reactive Streams的集成通常伴随使用组成库,该库将易于使用的API掩盖了Publisher<T>
和Subscriber<T>
类型的复杂性。 Lettuce使用Project Reactor将其发布者公开为Mono
和Flux
。
有关Reactive Streams的更多信息,请参见http://reactive-streams.org。
3.了解Publishers
异步处理将I/O或计算与调用该操作的线程进行解耦。 返回结果的句柄,通常是java.util.concurrent.Future
或类似的东西,它返回单个对象,集合或异常。 检索结果(异步获取)通常不会结束处理一个流。 一旦获得数据,就可以始终或有条件地发出进一步的请求。 使用Java 8或Promise模式,可以设置future的线性链接,以便发出后续的异步请求。 一旦需要条件处理,就必须中断并同步异步流。 尽管这种方法是可行的,但它并未充分利用异步处理的优势。
与前面的示例相比,Publisher<T>
对象以不同的方式回答了多重性和异步问题:通过将Pull
模式转换为Push
模式。
Publisher 是asynchronous/push的synchronous/pull “dual”
event | Iterable (pull) | Publisher (push) |
---|---|---|
retrieve data |
T next() |
onNext(T) |
discover error |
throws Exception |
onError(Exception) |
complete |
!hasNext() |
onCompleted() |
Publisher<T>
支持值甚至是无限流的发射序列,而不仅是单个标量值的发射(如Future那样)。 一旦开始处理流而不是单个值,你将非常感谢这个事实。 Project Reactor的词汇表使用两种类型:Mono
和Flux
,它们都是发布者。
Mono
可以发出0到1个事件,而Flux
可以发出0到N个事件。
Publisher<T>
不会偏向某些特定的并发性或异步性来源,也不会偏向于在ThreadPool
中运行基础代码的执行方式-同步还是异步。 作为Publisher<T>
的消费者,你将实际的实现留给了生产者,生产者可以在以后修改它而无需修改代码。
Publisher<T>
的最后一个关键点是,底层处理不是在获取Publisher<T>
时开始的,而是在观察者订阅或向 Publisher<T>
发出信号的那一刻开始的。 这与java.util.concurrent.Future
至关重要,后者在创建/获取(created/obtained)时在某个地方启动。 因此,如果没有观察者订阅Publisher<T>
,则将不会发生任何事情。
4.Lettuce Reactive API
所有命令都返回订阅者可以订阅的Flux<T>, Mono<T>或Mono<Void>。 该订阅者对Publisher <T>发出的任何项目或项目序列做出反应。 此模式有助于并发操作,因为在等待Publisher<T>发出对象时不需要阻塞。 相反,它以订阅者的形式创建一个哨兵,随时准备在Publisher<T>以后的任何时间做出适当的反应。
5.消费者Publisher <T>
与发布者合作时,你要做的第一件事就是消费它们。 消费发布者意味着订阅它。 这是一个订阅并打印所有发出的项目的示例:
Flux.just("Ben", "Michael", "Mark").subscribe(new Subscriber<String>() {
public void onSubscribe(Subscription s) {
s.request(3);
}
public void onNext(String s) {
System.out.println("Hello " + s + "!");
}
public void onError(Throwable t) {
}
public void onComplete() {
System.out.println("Completed");
}
});
该示例打印以下行:
Hello Ben
Hello Michael
Hello Mark
Completed
你可以看到订阅者(或观察者)收到每个事件的通知,并且还接收到已完成的事件。 Publisher<T>
会发出项目(items),直到引发异常或Publisher<T>
完成调用onCompleted
的发出为止。 在那之后不再发出其他元素。
对subscribe
的调用会注册一个允许取消的Subscription
,因此不会接收其他事件。 一旦订阅者从Publisher<T>
中取消订阅,发布者便可以与取消订阅和免费资源进行互操作。
实现Subscriber<T>
需要实现多种方法,因此让我们将代码重写为更简单的形式:
Flux.just("Ben", "Michael", "Mark").doOnNext(new Consumer<String>() {
public void accept(String s) {
System.out.println("Hello " + s + "!");
}
}).doOnComplete(new Runnable() {
public void run() {
System.out.println("Completed");
}
}).subscribe();
或者,使用Java 8 Lambdas甚至更简单:
Flux.just("Ben", "Michael", "Mark")
.doOnNext(s -> System.out.println("Hello " + s + "!"))
.doOnComplete(() -> System.out.println("Completed"))
.subscribe();
你可以使用运算符控制Subscriber
处理的元素。 如果仅对前N
个元素感兴趣,take()
运算符将限制发射项目的数量。
Flux.just("Ben", "Michael", "Mark") //
.doOnNext(s -> System.out.println("Hello " + s + "!"))
.doOnComplete(() -> System.out.println("Completed"))
.take(2)
.subscribe();
该示例打印以下行:
Hello Ben
Hello Michael
Completed
请注意,一旦发出预期的元素计数,take操作符就会从Publisher<T>隐式取消其订阅。
可以通过另一个Flux或Subscriber来完成对Publisher<T>的订阅。 除非要实现自定义Publisher,否则请始终使用Subscriber。 上例中使用的订阅者Consumer不处理异常,因此一旦引发异常,你将看到如下堆栈跟踪:
Exception in thread "main" reactor.core.Exceptions$BubblingException: java.lang.RuntimeException: Example exception
at reactor.core.Exceptions.bubble(Exceptions.java:96)
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:296)
at reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:117)
...
Caused by: java.lang.RuntimeException: Example exception
at demos.lambda$example3Lambda$4(demos.java:87)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:157)
... 23 more
始终建议从一开始就实施错误处理程序。 在某些时候,事情可能并且会出错。
完全实现的订阅者声明onCompleted
和onError
方法,使你可以对以下事件作出反应:
Flux.just("Ben", "Michael", "Mark").subscribe(new Subscriber<String>() {
public void onSubscribe(Subscription s) {
s.request(3);
}
public void onNext(String s) {
System.out.println("Hello " + s + "!");
}
public void onError(Throwable t) {
System.out.println("onError: " + e);
}
public void onComplete() {
System.out.println("Completed");
}
});
6.从push到pull
上面的示例说明了如何以一种非阻塞式或非阻塞式执行的方式设置发布者。 Flux<T>
可以显式转换为Iterable<T>
或与block()
同步。 开始表达代码内部执行的本质时,请避免在代码中调用block()
。 调用block()
消除了应用程序反应链(reactive chain)的所有非阻塞优势。
String last = Flux.just("Ben", "Michael", "Mark").last().block();
System.out.println(last);
该示例打印以下行:
Mark
阻塞调用可用于同步发布者链(chain),并找到进入普通且众所周知的Pull
模式的方法。
List<String> list = Flux.just("Ben", "Michael", "Mark").collectList().block();
System.out.println(list);
toList
运算符收集所有发出的元素,并将列表通过BlockingPublisher<T>
传递。
该示例打印以下行:
[Ben, Michael, Mark]
7.使用Lettuce创建Flux
和Mono
建立发布者的方法有很多。 你已经看过just()
,take()
和collectList()
。 有关可用于创建Flux
和Mono
的更多方法,请参考Project Reactor文档。
Lettuce发布者可用于初始和链接(chaining)操作。 使用Lettuce发布者时,你会注意到非阻塞行为。 这是因为所有的I/O和命令处理都是使用netty EventLoop异步处理的。
连接到Redis非常简单:
RedisClient client = RedisClient.create("redis://localhost");
RedisStringReactiveCommands<String, String> commands = client.connect().reactive();
下一步,从键获取值需要GET操作:
commands.get("key").subscribe(new Consumer<String>() {
public void accept(String value) {
System.out.println(value);
}
});
或者,用Java 8 lambdas编写:
commands
.get("key")
.subscribe(value -> System.out.println(value));
执行是异步处理的,并且在Netty EventLoop线程上完成操作时,可以使用调用线程在处理中进行处理。 由于其解耦性质,可以在完成Publisher<T>
的执行之前保留调用方法。
可以在链接(chaining)的上下文中使用Lettuce发布者来异步加载多个键:
Flux.just("Ben", "Michael", "Mark").
flatMap(key -> commands.get(key)).
subscribe(value -> System.out.println("Got value: " + value));
8.冷热发布者
尚未涵盖的发布者之间存在区别:
- 一个冷的发布者会等待订阅,直到它发出值并为每个订阅者重新执行该操作。
- 热的发布者开始预先发出价值,然后将其呈现给每个订阅者。
从Redis Standalone,Redis Cluster和Redis Sentinel API返回的所有发布者都是冷,这意味着在订阅它们之前不会发生任何I/O。 这样保证了订阅者从一开始就可以看到整个序列。 因此,仅创建发布者将不会导致任何网络I/O,因此创建和丢弃发布者很便宜的。 订阅后,为发布/订阅创建的发布者将发出PatternMessage
s和ChannelMessage
s。 发布者保证从头到尾发出所有物品。 尽管这对于发布/订阅发布者是正确的,但是订阅频道/模式的性质由于其订阅性质而不是发布者的热/冷(Hot/Cold)区别,因此允许错过消息。
9.转换发布者
发布者可以以各种方式转换发出的值。 最基本的转换之一是flatMap()
,你可以从上面的示例中看到该转换将输入值转换为另一个值。 另一个是 map()
。 map()
和flatMap()
之间的区别在于,flatMap()
允许你使用Publisher<T>
调用进行这些转换。
Flux.just("Ben", "Michael", "Mark")
.flatMap(commands::get)
.flatMap(value -> commands.rpush("result", value))
.subscribe();
第一个flatMap()
函数用于检索值,第二个flatMap()
函数将值追加到名为result
的Redis列表中。flatMap()
函数返回一个Publisher,而普通map 仅返回<T>
。 在处理这样的流程时,你将大量使用flatMap()
,你将成为好朋友。
可以使用reduce()
转换实现值的聚合。 它对Publisher<T>
发出的每个值依次应用一个函数,并依次发出每个后续值。 我们可以使用它来聚合值,以计算多个Redis集中的元素数量:
Flux.just("Ben", "Michael", "Mark")
.flatMap(commands::scard)
.reduce((sum, current) -> sum + current)
.subscribe(result -> System.out.println("Number of elements in sets: " + result));
将reduce()
的聚合函数应用于每个发出的值,因此在上面的示例中是三次。 如果要获取最后一个值(表示包含所有Redis集中元素数量的最终结果),请应用last()
转换:
Flux.just("Ben", "Michael", "Mark")
.flatMap(commands::scard)
.reduce((sum, current) -> sum + current)
.last()
.subscribe(result -> System.out.println("Number of elements in sets: " + result));
现在,让我们看一下对发出的项目进行分组的方法。 下面的示例发出三个项目,并按开始字符对其进行分组。
Flux.just("Ben", "Michael", "Mark")
.groupBy(key -> key.substring(0, 1))
.subscribe(
groupedFlux -> {
groupedFlux.collectList().subscribe(list -> {
System.out.println("First character: " + groupedFlux.key() + ", elements: " + list);
});
}
);
该示例打印以下行:
First character: B, elements: [Ben]
First character: M, elements: [Michael, Mark]
10.缺失值
值的存在与否是反应式编程的重要组成部分。 传统方法将null
视为缺少特定值。 在Java 8中,引入了Optional<T>
来封装可空性。 Reactive Streams禁止使用null
值。
在Redis的范围内,缺少的值是空列表,不存在的键或任何其他空数据结构。 反应性(Reactive)编程不鼓励使用null作为值。 由于Publisher<T>
从0到N的性质,对缺少值的反应性答案只是不发出任何可能的值。
假设我们有键Ben
和Michael
分别将其设置为值value
。 我们使用以下代码查询那些和另一个缺少的键:
Flux.just("Ben", "Michael", "Mark")
.flatMap(commands::get)
.doOnNext(value -> System.out.println(value))
.subscribe();
该示例打印以下行:
value
value
输出只是两个值。 缺少键Mark
的GET
不发出值。
当需要一个值时,响应式API可为操作员提供空结果。 你可以使用以下运算符之一:
-
defaultIfEmpty
: 如果Publisher<T>
根本不发出任何值,则发出默认值 -
switchIfEmpty
: 切换到回调Publisher<T>
以发出值 -
Flux.hasElements
/Flux.hasElement
: 发出包含标志的Mono<Boolean>
是否原始Publisher<T>
为空 -
next
/last
/elementAt
: 位置运算符检索第一个/最后一个/第N个元素或发出默认值
11.筛选(Filtering)项目(items)
如果只需要特定结果,则可以过滤Publisher<T>
发出的值。 过滤不会更改发射的值本身。 筛选器(Filtering )会影响发出多少项目(items)以及在什么时候(如果有的话)发出。
Flux.just("Ben", "Michael", "Mark")
.filter(s -> s.startsWith("M"))
.flatMap(commands::get)
.subscribe(value -> System.out.println("Got value: " + value));
该代码将仅获取Michael
和Mark
的键,而不获取Ben
的键。 筛选条件是key
是否以M
开头。
你已经遇到了last()
过滤器来检索最后一个值:
Flux.just("Ben", "Michael", "Mark")
.last()
.subscribe(value -> System.out.println("Got value: " + value));
last()
的扩展变体允许你采用最后N个值:
Flux.just("Ben", "Michael", "Mark")
.takeLast(3)
.subscribe(value -> System.out.println("Got value: " + value));
上面的示例采用最后两个值。
与next()
相反的是 first()
过滤器,用于检索下一个值:
Flux.just("Ben", "Michael", "Mark")
.next()
.subscribe(value -> System.out.println("Got value: " + value));
12.错误处理
错误处理是每个现实应用程序中必不可少的组件,应从一开始就加以考虑。 Project Reactor提供了几种处理错误的机制。
通常,你希望通过以下方式做出反应:
- 返回默认值
- 使用备份发布者
- 重试发布者(立即或延迟)
在第一个发出的项目引发异常之后,以下代码将恢复为默认值:
Flux.just("Ben", "Michael", "Mark")
.doOnNext(value -> {
throw new IllegalStateException("Takes way too long");
})
.onErrorReturn("Default value")
.subscribe();
你可以使用第一个失败的备份Publisher<T>
。
Flux.just("Ben", "Michael", "Mark")
.doOnNext(value -> {
throw new IllegalStateException("Takes way too long");
})
.switchOnError(commands.get("Default Key"))
.subscribe();
可以通过重新订阅重试发布者(re-subscribing)。 可以尽快或以等待间隔完成重新订阅(Re-subscribing),这在涉及外部资源时是首选的。
Flux.just("Ben", "Michael", "Mark")
.flatMap(commands::get)
.retry()
.subscribe();
如果你想使用backoff重试,请使用以下代码:
Flux.just("Ben", "Michael", "Mark")
.doOnNext(v -> {
if (new Random().nextInt(10) + 1 == 5) {
throw new RuntimeException("Boo!");
}
})
.doOnSubscribe(subscription ->
{
System.out.println(subscription);
})
.retryWhen(throwableFlux -> Flux.range(1, 5)
.flatMap(i -> {
System.out.println(i);
return Flux.just(i)
.delay(Duration.of(i, ChronoUnit.SECONDS));
}))
.blockLast();
尝试被传递到retryWhen()
方法中,延迟了要等待的秒数。 一旦计时器完成,就会使用delay方法。
13.调度程序(Schedulers)和线程(threads)
Project Reactor中的调度程序(Schedulers)用于指示多线程(multi-threading)。 某些运算符具有将Scheduler作为参数的变体。 这些指示操作员在特定的调度程序上执行其部分或全部工作。
Project Reactor附带了一组预配置的调度程序(Schedulers),都可以通过Schedulers
类进行访问:
- Schedulers.parallel(): 执行诸如事件循环和回调处理之类的计算工作。
- Schedulers.immediate(): 在当前线程中立即执行工作
- Schedulers.elastic(): 执行I/O绑定的工作,例如阻塞I/O的异步性能,此调度程序由线程池支持,该线程池将根据需要增长
- Schedulers.newSingle(): 在新线程上执行工作
- Schedulers.fromExecutor():
从java.util.concurrent.Executor创建调度程序
- Schedulers.timer(): 创建或重新使用分辨率为50ms的基于hash-wheel的TimedScheduler。
不要将计算调度程序用于I/O。
调度程序可以通过以下几种不同的方式执行发布者:
- 使用利用调度程序的运算符
- 明确地通过将调度程序传递给这样的运算符
- 通过使用
subscribeOn(Scheduler)
- 通过使用
publishOn(Scheduler)
如果没有其他说明,默认情况下,诸如buffer
, replay
, skip
, delay
, parallel
等操作符将使用调度程序。
如果需要,所有列出的运算符都允许你传入自定义调度程序。 大多数时候都使用默认值是一个好主意。
如果希望在特定的调度程序上执行订阅链,请使用subscribeOn()
运算符。 该代码在未设置调度程序的情况下在主线程上执行:
Flux.just("Ben", "Michael", "Mark").flatMap(key -> {
System.out.println("Map 1: " + key + " (" + Thread.currentThread().getName() + ")");
return Flux.just(key);
}
).flatMap(value -> {
System.out.println("Map 2: " + value + " (" + Thread.currentThread().getName() + ")");
return Flux.just(value);
}
).subscribe();
该示例打印以下行:
Map 1: Ben (main)
Map 2: Ben (main)
Map 1: Michael (main)
Map 2: Michael (main)
Map 1: Mark (main)
Map 2: Mark (main)
此示例显示了添加到流中的subscribeOn()
方法(在哪里添加都无所谓):
Flux.just("Ben", "Michael", "Mark").flatMap(key -> {
System.out.println("Map 1: " + key + " (" + Thread.currentThread().getName() + ")");
return Flux.just(key);
}
).flatMap(value -> {
System.out.println("Map 2: " + value + " (" + Thread.currentThread().getName() + ")");
return Flux.just(value);
}
).subscribeOn(Schedulers.parallel()).subscribe();
该示例的输出显示了subscribeOn()
的效果。 你可以看到Publisher在同一线程上执行,但在计算线程池上执行:
Map 1: Ben (parallel-1)
Map 2: Ben (parallel-1)
Map 1: Michael (parallel-1)
Map 2: Michael (parallel-1)
Map 1: Mark (parallel-1)
Map 2: Mark (parallel-1)
如果将相同的代码应用于Lettuce,你将注意到执行第二个flatMap()
的线程有所不同:
Flux.just("Ben", "Michael", "Mark").flatMap(key -> {
System.out.println("Map 1: " + key + " (" + Thread.currentThread().getName() + ")");
return commands.set(key, key);
}).flatMap(value -> {
System.out.println("Map 2: " + value + " (" + Thread.currentThread().getName() + ")");
return Flux.just(value);
}).subscribeOn(Schedulers.parallel()).subscribe();
该示例打印以下行:
Map 1: Ben (parallel-1)
Map 1: Michael (parallel-1)
Map 1: Mark (parallel-1)
Map 2: OK (lettuce-nioEventLoop-3-1)
Map 2: OK (lettuce-nioEventLoop-3-1)
Map 2: OK (lettuce-nioEventLoop-3-1)
与独立示例有两点不同:
- 这些值是同时设置的,而不是顺序设置的
- 第二个
flatMap()
转换输出netty EventLoop线程名称
这是因为默认情况下,Lettuce发布者是在netty EventLoop线程上执行和完成的。
publishOn
指示发布者在特定的Scheduler上调用其观察者的nNext
, onError
和onCompleted
方法。 在这里,顺序很重要:
Flux.just("Ben", "Michael", "Mark").flatMap(key -> {
System.out.println("Map 1: " + key + " (" + Thread.currentThread().getName() + ")");
return commands.set(key, key);
}).publishOn(Schedulers.parallel()).flatMap(value -> {
System.out.println("Map 2: " + value + " (" + Thread.currentThread().getName() + ")");
return Flux.just(value);
}).subscribe();
publishOn()
调用之前的所有操作均在main中执行,而调度器中的以下所有操作均在其中执行:
Map 1: Ben (main)
Map 1: Michael (main)
Map 1: Mark (main)
Map 2: OK (parallel-1)
Map 2: OK (parallel-1)
Map 2: OK (parallel-1)
调度程序(Schedulers)允许直接调度操作。 有关更多信息,请参考Project Reactor文档。
14.Redis 事务(Transactions)
Lettuce提供了一种以反应方式使用Redis Transactions的reactive 方法。 可以在执行MULTI
命令之后执行应在事务内执行的命令。 功能链允许在闭包内执行命令,并且每个命令都会收到其适当的响应。 TransactionResult
还返回累积响应,以响应EXEC
。
有关更多详细信息,请参见事务。
15.其他例子
15.1.阻塞例子
RedisStringReactiveCommands<String, String> reactive = client.connect().reactive();
Mono<String> set = reactive.set("key", "value");
set.block();
15.2.非阻塞例子
RedisStringReactiveCommands<String, String> reactive = client.connect().reactive();
Mono<String> set = reactive.set("key", "value");
set.subscribe();
15.3.功能链(Functional chaining)
RedisStringReactiveCommands<String, String> reactive = client.connect().reactive();
Flux.just("Ben", "Michael", "Mark")
.flatMap(key -> commands.sadd("seen", key))
.flatMap(value -> commands.randomkey())
.flatMap(commands::type)
.doOnNext(System.out::println).subscribe();
15.4.Redis Transaction
RedisReactiveCommands<String, String> reactive = client.connect().reactive();
reactive.multi().doOnSuccess(s -> {
reactive.set("key", "1").doOnNext(s1 -> System.out.println(s1)).subscribe();
reactive.incr("key").doOnNext(s1 -> System.out.println(s1)).subscribe();
}).flatMap(s -> reactive.exec())
.doOnNext(transactionResults -> System.out.println(transactionResults.wasRolledBack()))
.subscribe();