更改流

从 MongoDB 3.6 开始,Change Streams 允许应用程序获得有关更改的通知,而无需跟踪 oplog。spring-doc.cadn.net.cn

更改流支持仅适用于副本集或分片集群。

Change Streams 可以与命令式和响应式 MongoDB Java 驱动程序一起使用。强烈建议使用响应式变体,因为它对资源的占用较少。但是,如果您无法使用响应式 API,您仍然可以使用 Spring 生态系统中已经流行的消息传递概念来获取更改事件。spring-doc.cadn.net.cn

可以在集合级别和数据库级别监视,而数据库级别变体发布 数据库内所有集合的更改。订阅数据库更改流时,请确保使用 适合事件类型的类型,因为转换可能无法正确应用于不同的实体类型。 如有疑问,请使用Document.spring-doc.cadn.net.cn

更改流MessageListener

使用同步驱动程序侦听更改流会创建一个长时间运行的阻塞任务,该任务需要委托给单独的组件。 在这种情况下,我们需要首先创建一个MessageListenerContainer这将是运行特定SubscriptionRequest任务。 Spring Data MongoDB 已经附带了一个默认实现,该实现在MongoTemplate并且能够创建和运行Task实例ChangeStreamRequest.spring-doc.cadn.net.cn

以下示例演示如何将更改流与MessageListener实例:spring-doc.cadn.net.cn

示例 1.更改流MessageListener实例
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                                              (1)

MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;                           (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); (3)

Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class);       (4)

// ...

container.stop();                                                                                               (5)
1 启动容器会初始化资源并启动Task已注册的实例SubscriptionRequest实例。启动后添加的请求会立即运行。
2 定义当Message收到。这Message#getBody()转换为请求的域类型。用Document接收原始结果而不进行转换。
3 设置要收听的集合,并通过以下方式提供附加选项ChangeStreamOptions.
4 注册请求。返回的Subscription可用于检查电流Task状态并取消它以释放资源。
5 一旦确定不再需要容器,请不要忘记停止容器。这样做会停止所有运行Task容器内的实例。

处理过程中的错误将传递给org.springframework.util.ErrorHandler.如果未说明,则日志附加ErrorHandler默认应用。
请使用
register(request, body, errorHandler)以提供附加功能。spring-doc.cadn.net.cn

响应式更改流

使用响应式 API 订阅 Change Streams 是一种更自然的流处理方法。尽管如此,基本的构建块,例如ChangeStreamOptions,保持不变。以下示例演示如何使用 Change Streams 发出ChangeStreamEvents:spring-doc.cadn.net.cn

示例 2.更改流发出ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) (1)
    .watchCollection("people")
    .filter(where("age").gte(38))                                              (2)
    .listen();                                                                 (3)
1 基础文档应转换为的事件目标类型。将此选项排除在外,无需转换即可接收原始结果。
2 使用聚合管道或仅使用查询Criteria以过滤事件。
3 获取一个Flux的变化流事件。这ChangeStreamEvent#getBody()从 (2) 转换为请求的域类型。

恢复更改流

可以恢复更改流,并在离开的地方恢复发出事件。要恢复流,您需要提供简历 Tokens或最后已知的服务器时间(以 UTC 为单位)。用ChangeStreamOptions以相应地设置值。spring-doc.cadn.net.cn

以下示例演示如何使用服务器时间设置恢复偏移量:spring-doc.cadn.net.cn

示例 3.恢复更改流
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
    .watchCollection("people")
    .resumeAt(Instant.now().minusSeconds(1)) (1)
    .listen();
1 您可以获取ChangeStreamEvent通过getTimestamp方法或使用resumeToken通过getResumeToken.
在某些情况下,一个Instant在恢复更改流时可能不够精确。为此,请使用 MongoDB 原生 BsonTimestamp