|
此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Data MongoDB 5.0.4! |
可尾随游标
默认情况下,当客户端耗尽游标提供的所有结果后,MongoDB 会自动关闭该游标。 在结果耗尽时关闭游标会将流转换为有限流。对于固定集合(capped collections), 您可以使用可尾随游标(Tailable Cursor),它在客户端消费完最初返回的所有数据后仍保持打开状态。
可以使用 MongoOperations.createCollection 创建固定集合(Capped collections)。为此,请提供所需的 CollectionOptions.empty().capped()…。 |
尾随游标(Tailable cursors)既可以与命令式(imperative)MongoDB API 配合使用,也可以与响应式(reactive)MongoDB API 配合使用。强烈建议使用响应式变体,因为它对资源的消耗更少。然而,如果您无法使用响应式 API,仍然可以采用 Spring 生态系统中已广泛使用的消息传递机制。
带有尾随游标的MessageListener
使用同步驱动程序监听一个有上限的集合(capped collection)会创建一个长时间运行且阻塞的任务,该任务需要委派给一个独立的组件来处理。在这种情况下,我们首先需要创建一个 MessageListenerContainer,它将成为执行特定 SubscriptionRequest 的主要入口点。Spring Data MongoDB 已经提供了一个默认实现,该实现基于 MongoTemplate,并能够为 Task 创建和运行 TailableCursorRequest 实例。
以下示例展示了如何在 MessageListener 实例中使用可尾随游标(tailable cursors):
MessageListener 实例的可尾随游标(Tailable Cursors)MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); (1)
MessageListener<Document, User> listener = System.out::println; (2)
TailableCursorRequest request = TailableCursorRequest.builder()
.collection("orders") (3)
.filter(query(where("value").lt(100))) (4)
.publishTo(listener) (5)
.build();
container.register(request, User.class); (6)
// ...
container.stop(); (7)
| 1 | 启动容器会初始化资源,并为已注册的 Task 实例启动相应的 SubscriptionRequest 实例。在启动之后添加的请求将立即执行。 |
| 2 | 定义在接收到 Message 时调用的监听器。Message#getBody() 将被转换为所请求的领域类型。使用 Document 可以接收未经转换的原始结果。 |
| 3 | 设置要监听的集合。 |
| 4 | 提供一个可选的过滤器,用于接收文档。 |
| 5 | 设置消息监听器,用于发布传入的Message。 |
| 6 | 注册该请求。返回的 Subscription 可用于检查当前 Task 的状态,并可取消该任务以释放资源。 |
| 7 | 一旦确定不再需要该容器,请务必将其停止。这样做会停止容器内所有正在运行的Task实例。 |
响应式可滚动游标
将可追踪游标(tailable cursors)与响应式数据类型结合使用,可以构建无限数据流。可追踪游标会一直保持打开状态,直到被外部显式关闭。当新的文档进入固定集合(capped collection)时,它会发出这些数据。
如果查询未返回任何匹配结果,或者游标返回了集合“末尾”的文档后应用程序又删除了该文档,则可追踪游标(tailable cursors)可能会变为失效或无效。以下示例展示了如何创建并使用无限流查询:
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);
Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();
Spring Data MongoDB 的响应式仓库通过使用 @Tailable 注解查询方法来支持无限流。这种方式适用于返回 Flux 以及其他能够发射多个元素的响应式类型的查询方法,如下例所示:
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
@Tailable
Flux<Person> findByFirstname(String firstname);
}
Flux<Person> stream = repository.findByFirstname("Joe");
Disposable subscription = stream.doOnNext(System.out::println).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();