此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Data MongoDB 5.0.4spring-doc.cadn.net.cn

变更流

从 MongoDB 3.6 开始,变更流(Change Streams) 允许应用程序在无需跟踪操作日志(oplog)的情况下接收变更通知。spring-doc.cadn.net.cn

变更流(Change Stream)支持仅适用于副本集(replica sets)或分片集群(sharded cluster)。

变更流(Change Streams)既可以使用 MongoDB Java 驱动程序的命令式(imperative)方式,也可以使用响应式(reactive)方式来消费。强烈建议使用响应式变体,因为它对资源的消耗更少。然而,如果您无法使用响应式 API,仍然可以通过 Spring 生态系统中已广泛采用的消息传递机制来获取变更事件。spring-doc.cadn.net.cn

既可以监听集合级别的变更,也可以监听数据库级别的变更,其中数据库级别的变体会发布该数据库内所有集合的变更。在订阅数据库变更流时,请确保为事件类型使用合适的类型,因为跨不同实体类型时,转换可能无法正确应用。 如有疑问,请使用 Documentspring-doc.cadn.net.cn

使用变更流MessageListener

监听 使用同步驱动程序更改流 会创建一个需要委托给独立组件的长时间运行的阻塞任务。 在这种情况下,我们首先需要创建一个 MessageListenerContainer,它将作为运行特定 SubscriptionRequest 任务的主要入口点。 Spring Data MongoDB 已自带一个默认实现,该实现在 MongoTemplate 上运行,并能够为 ChangeStreamRequest 创建和运行 Task 实例。spring-doc.cadn.net.cn

以下示例展示了如何将变更流(Change Streams)与 MessageListener 实例一起使用:spring-doc.cadn.net.cn

示例 1. 使用 MessageListener 实例的变更流
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                                              (1)

MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;                           (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); (3)

Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class);       (4)

// ...

container.stop();                                                                                               (5)
1 启动容器会初始化资源,并为已注册的 Task 实例启动相应的 SubscriptionRequest 实例。在容器启动后添加的请求将立即执行。
2 定义在接收到 Message 时调用的监听器。Message#getBody() 将被转换为所请求的领域类型。使用 Document 可以接收未经转换的原始结果。
3 设置要监听的集合,并通过 ChangeStreamOptions 提供额外的选项。
4 注册该请求。返回的 Subscription 可用于检查当前 Task 的状态,并可取消该任务以释放资源。
5 一旦确定不再需要该容器,请务必将其停止。这样做会停止容器内所有正在运行的Task实例。

DefaultMessageListenerContainer 实现了 SmartLifecycle 接口,默认情况下,当其作为 Bean 注册到应用上下文中时会自动启动。spring-doc.cadn.net.cn

处理过程中出现的错误会被传递到org.springframework.util.ErrorHandler. 除非另有说明,否则为日志追加模式ErrorHandler默认应用。
请使用register(request, body, errorHandler)提供额外功能。spring-doc.cadn.net.cn

响应式变更流

使用响应式 API 订阅变更流(Change Streams)是处理流数据更为自然的方式。不过,诸如 ChangeStreamOptions 等核心构建模块仍然保持不变。以下示例展示了如何使用变更流来发出 ChangeStreamEvent 事件:spring-doc.cadn.net.cn

示例 2. 发射 ChangeStreamEvent 的变更流
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) (1)
    .watchCollection("people")
    .filter(where("age").gte(38))                                              (2)
    .listen();                                                                 (3)
1 事件目标类型,用于指定底层文档应转换成的类型。若省略此项,则接收未经转换的原始结果。
2 使用聚合管道或仅使用查询 Criteria 来过滤事件。
3 获取一个变更流事件的 Flux。其中,ChangeStreamEvent#getBody() 会从 (2) 转换为所请求的领域类型。

恢复变更流

更改流可以恢复,并从您离开的地方继续发出事件。要恢复流,您需要提供恢复Tokens或最后已知的服务器时间(UTC)。使用 ChangeStreamOptions 来相应地设置值。spring-doc.cadn.net.cn

以下示例展示了如何使用服务器时间设置恢复偏移量:spring-doc.cadn.net.cn

示例 3. 恢复变更流
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
    .watchCollection("people")
    .resumeAt(Instant.now().minusSeconds(1)) (1)
    .listen();
1 您可以通过 ChangeStreamEvent 方法获取 getTimestamp 的服务器时间,或使用通过 resumeToken 暴露的 getResumeToken
在某些情况下,使用 Instant 可能不足以精确地恢复变更流(Change Stream)。此时应使用 MongoDB 原生的 BsonTimestamp