此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Data MongoDB 5.0.4spring-doc.cadn.net.cn

会话与事务

从 3.6 版本起,MongoDB 支持会话(sessions)的概念。 使用会话可启用 MongoDB 的因果一致性(Causal Consistency)模型,该模型保证以符合操作之间因果关系的顺序执行操作。 这些会话分为 ServerSession 实例和 ClientSession 实例。 在本节中,当我们提到会话时,指的是 ClientSessionspring-doc.cadn.net.cn

客户端会话内的操作不会与会话外的操作相互隔离。

MongoOperationsReactiveMongoOperations 均提供了网关方法,用于将 ClientSession 绑定到操作上。 MongoCollectionMongoDatabase 使用会话代理对象,这些代理对象实现了 MongoDB 的集合和数据库接口,因此您无需在每次调用时都显式添加会话。 这意味着对 MongoCollection#find() 的潜在调用会被委托给 MongoCollection#find(ClientSession)spring-doc.cadn.net.cn

诸如 (Reactive)MongoOperations#getCollection 之类的方法会返回原生的 MongoDB Java 驱动程序网关对象(例如 MongoCollection),这些对象自身提供了专门用于 ClientSession 的方法。 这些方法不会被会话代理(session-proxied)。 当您直接与 ClientSessionMongoCollection 交互(而不是通过 MongoDatabase 上的某个 #execute 回调)时,应在需要的地方显式提供 MongoOperations

ClientSession 支持

以下示例展示了会话的用法:spring-doc.cadn.net.cn

ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build();

ClientSession session = client.startSession(sessionOptions); (1)

template.withSession(() -> session)
    .execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        Person durzo = action.findOne(query, Person.class);  (2)

        Person azoth = new Person("Kylar Stern");
        azoth.setMaster(durzo);

        action.insert(azoth);                                (3)

        return azoth;
    });

session.close()                                              (4)
1 从服务器获取一个新会话。
2 像以前一样使用 MongoOperation 方法。 ClientSession 会自动应用。
3 请务必关闭 ClientSession
4 关闭会话。
在处理 DBRef 实例时,尤其是延迟加载的实例,切勿在所有数据加载完成之前关闭 ClientSession。 否则,延迟加载将失败。
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();

Publisher<ClientSession> session = client.startSession(sessionOptions); (1)

template.withSession(session)
.execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        return action.findOne(query, Person.class)
            .flatMap(durzo -> {

                Person azoth = new Person("Kylar Stern");
                azoth.setMaster(durzo);

                return action.insert(azoth);                            (2)
            });
    }, ClientSession::close)                                            (3)
    .subscribe();                                                       (4)
1 获取用于检索新会话的 Publisher
2 像以前一样使用 ReactiveMongoOperation 方法。 ClientSession 会被自动获取并应用。
3 请务必关闭 ClientSession
4 在你订阅之前,什么都不会发生。 详情请参阅Project Reactor 参考指南

通过使用一个提供实际会话的 Publisher,您可以将会话获取推迟到实际订阅时再进行。 然而,您在使用完毕后仍需关闭会话,以免服务器中残留过期会话造成污染。 请在 doFinally 上使用 execute 钩子,在不再需要该会话时调用 ClientSession#close()。 如果您希望对会话本身拥有更多控制权,也可以通过驱动程序获取 ClientSession,并通过 Supplier 提供它。spring-doc.cadn.net.cn

ClientSession 的响应式使用仅限于模板 API 的使用。 目前响应式仓库尚未集成会话功能。

MongoDB 事务

从 4 版本开始,MongoDB 支持事务(Transactions)。 事务基于会话(Sessions)构建,因此需要一个活跃的ClientSessionspring-doc.cadn.net.cn

除非你在应用程序上下文中指定一个 MongoTransactionManager,否则事务支持将被禁用。 你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非原生 MongoDB 事务。

若要对事务进行完全的编程控制,您可以使用 MongoOperations 上的会话回调。spring-doc.cadn.net.cn

以下示例展示了编程式事务控制:spring-doc.cadn.net.cn

编程式事务
ClientSession session = client.startSession(options);                   (1)

template.withSession(session)
    .execute(action -> {

        session.startTransaction();                                     (2)

        try {

            Step step = // ...;
            action.insert(step);

            process(step);

            action.update(Step.class).apply(Update.set("state", // ...

            session.commitTransaction();                                (3)

        } catch (RuntimeException e) {
            session.abortTransaction();                                 (4)
        }
    }, ClientSession::close)                                            (5)
1 获取一个新的ClientSession
2 开始事务。
3 如果一切按预期正常运行,请提交更改。
4 出了问题,因此回滚所有操作。
5 完成操作后不要忘记关闭会话。

前面的示例允许您完全控制事务行为,同时在回调中使用会话作用域的 MongoOperations 实例,以确保会话被传递到每个服务器调用。 为了避免这种方法带来的一些开销,您可以使用 TransactionTemplate 来减少手动事务流程中的冗余代码。spring-doc.cadn.net.cn

Mono<DeleteResult> result = Mono
    .from(client.startSession())                                                             (1)

    .flatMap(session -> {
        session.startTransaction();                                                          (2)

        return Mono.from(collection.deleteMany(session, ...))                                (3)

            .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))   (4)

            .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))     (5)

            .doFinally(signal -> session.close());                                           (6)
      });
1 首先,我们显然需要初始化会话。
2 一旦我们手头有了ClientSession,就开始事务。
3 通过将 ClientSession 传递给操作,在事务中执行操作。
4 如果操作异常完成,我们需要停止事务并保留错误。
5 或者,当然也可以在成功时提交更改。 同时仍保留操作结果。
6 最后,我们需要确保关闭会话。

上述操作的问题在于保留了主流程的 DeleteResult,而不是通过 commitTransaction()abortTransaction() 发布的事务结果,这导致了相当复杂的设置。spring-doc.cadn.net.cn

除非你在应用程序上下文中指定一个 ReactiveMongoTransactionManager,否则事务支持将被禁用。 你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非原生 MongoDB 事务。

使用 TransactionTemplate / TransactionalOperator 进行事务管理

Spring Data MongoDB 事务同时支持 TransactionTemplateTransactionalOperatorspring-doc.cadn.net.cn

使用 TransactionTemplate / TransactionalOperator 进行事务处理
template.setSessionSynchronization(ALWAYS);                                     (1)

// ...

TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);         (2)

txTemplate.execute(new TransactionCallbackWithoutResult() {

    @Override
    protected void doInTransactionWithoutResult(TransactionStatus status) {     (3)

        Step step = // ...;
        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    }
});
1 在模板 API 配置期间启用事务同步。
2 使用提供的 TransactionTemplate 创建 PlatformTransactionManager
3 在回调中,ClientSession 和事务已经被注册。
在运行时更改MongoTemplate的状态(如你可能认为在前面列表的第1项中是可行的)可能会导致线程安全和可见性问题。
template.setSessionSynchronization(ALWAYS);                                          (1)

// ...

TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
                                   new DefaultTransactionDefinition());              (2)


Step step = // ...;
template.insert(step);

Mono<Void> process(step)
    .then(template.update(Step.class).apply(Update.set("state", …))
    .as(rxtx::transactional)                                                         (3)
    .then();
1 为事务参与启用事务同步。
2 使用提供的 TransactionalOperator 创建 ReactiveTransactionManager
3 TransactionalOperator.transactional(…) 为所有上游操作提供事务管理。

使用 MongoTransactionManager 和 ReactiveMongoTransactionManager 进行事务处理

MongoTransactionManager / ReactiveMongoTransactionManager 是广为人知的 Spring 事务支持的入口。 它允许应用程序使用 Spring 的托管事务功能MongoTransactionManagerClientSession 绑定到线程,而 ReactiveMongoTransactionManager 则使用 ReactorContext 来实现这一功能。 MongoTemplate 检测会话,并相应地操作与事务关联的这些资源。 MongoTemplate 也可以参与其他正在进行的事务。 以下示例展示了如何使用 MongoTransactionManager 创建和使用事务:spring-doc.cadn.net.cn

使用 MongoTransactionManager / ReactiveMongoTransactionManager 进行事务处理
@Configuration
static class Config extends AbstractMongoClientConfiguration {

    @Bean
    MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  (1)
        return new MongoTransactionManager(dbFactory);
    }

    @Bean
    MongoTemplate mongoTemplate(MongoDatabaseFactory dbFactory) {                 (1)
        return new MongoTemplate(dbFactory);
    }

    // ...
}

@Component
public class StateService {

    @Transactional
    void someBusinessFunction(Step step) {                                        (2)

        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    };
});
1 在应用程序上下文中注册 MongoTransactionManager。 此外,在创建 MongoDatabaseFactory 时,请确保使用相同的 MongoTemplate,以便在同一 MongoDatabaseFactory 范围内参与事务。
2 将方法标记为事务性的。
@Transactional(readOnly = true) 会通知 MongoTransactionManager 同样启动一个事务,将 ClientSession 添加到发出的请求中。
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {  (1)
        return new ReactiveMongoTransactionManager(factory);
    }

    @Bean
    ReactiveMongoTemplate reactiveMongoTemplate(ReactiveMongoDatabaseFactory dbFactory) {       (1)
        return new ReactiveMongoTemplate(dbFactory);
    }

    // ...
}

@Service
public class StateService {

    @Transactional
    Mono<UpdateResult> someBusinessFunction(Step step) {                                  (2)

        return template.insert(step)
            .then(process(step))
            .then(template.update(Step.class).apply(Update.set("state", …));
    };
});
1 在应用程序上下文中注册 ReactiveMongoTransactionManager。 此外,在创建 ReactiveMongoDatabaseFactory 时,请确保使用相同的 ReactiveMongoTemplate,以便在同一 ReactiveMongoDatabaseFactory 范围内参与事务。
2 将方法标记为事务性的。
@Transactional(readOnly = true) 会通知 ReactiveMongoTransactionManager 同样启动一个事务,将 ClientSession 添加到发出的请求中。

控制 MongoDB 特定的事务选项

事务性服务方法可能需要特定的事务选项来运行事务。 Spring Data MongoDB 的事务管理器支持对事务标签(例如 @Transactional(label = { "mongo:readConcern=available" }))进行求值。spring-doc.cadn.net.cn

默认情况下,使用 mongo: 前缀的标签命名空间由默认配置的 MongoTransactionOptionsResolver 进行解析。 事务标签由 TransactionAttribute 提供,并可通过 TransactionTemplateTransactionalOperator 用于编程式事务控制。 由于其声明式特性,@Transactional(label = …) 提供了一个良好的起点,同时也可作为文档使用。spring-doc.cadn.net.cn

目前,支持以下选项:spring-doc.cadn.net.cn

最大提交时间

控制服务器上 commitTransaction 操作的最大执行时间。 该值的格式遵循 ISO-8601 持续时间格式,与 Duration.parse(…) 中使用的格式一致。spring-doc.cadn.net.cn

用法: mongo:maxCommitTime=PT1Sspring-doc.cadn.net.cn

读关注

设置事务的读关注(read concern)。spring-doc.cadn.net.cn

用法: mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLEspring-doc.cadn.net.cn

读取偏好

设置事务的读取偏好。spring-doc.cadn.net.cn

用法: mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEARESTspring-doc.cadn.net.cn

写关注

设置事务的写关注(write concern)。spring-doc.cadn.net.cn

用法: mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITYspring-doc.cadn.net.cn

加入外部事务的嵌套事务不会影响初始事务的选项,因为事务已经启动。 事务选项仅在启动新事务时才会被应用。

事务内的特殊行为

在事务内部,MongoDB 服务器的行为略有不同。spring-doc.cadn.net.cn

MongoDB 驱动程序提供了一个专用的副本集名称配置选项,可将驱动程序置于自动检测模式。 该选项有助于在事务期间识别主副本集节点并进行命令路由。spring-doc.cadn.net.cn

请确保在 MongoDB URI 中添加 replicaSet。 更多详细信息,请参阅连接字符串选项

MongoDB 支持在事务内执行集合操作,例如创建集合。 这也会影响首次使用时自动创建集合的行为。 因此,请确保所有必需的结构已预先创建好。spring-doc.cadn.net.cn

MongoDB 可以在事务操作期间引发的错误上添加特殊标签。 这些标签可能表示暂时性故障,只需重试操作就可能会消失。 我们强烈推荐为此类场景使用 Spring Retry。 不过,也可以通过重写 MongoTransactionManager#doCommit(MongoTransactionObject) 来实现 MongoDB 参考手册中所述的重试提交操作行为。spring-doc.cadn.net.cn

MongoDB 的 count 操作基于集合统计信息,这些信息在事务中可能无法反映实际情况。 当在多文档事务中执行 count 命令时,服务器会返回错误 50851。 一旦 MongoTemplate 检测到存在活跃的事务,所有公开的 count() 方法都会被转换并委托给聚合框架,使用 $match$count 操作符来执行,同时保留 Query 的设置(例如 collation)。spring-doc.cadn.net.cn

在聚合计数辅助方法中使用地理命令时存在限制。 以下操作符不能使用,必须替换为其他操作符:spring-doc.cadn.net.cn

使用 Criteria.near(…)Criteria.nearSphere(…) 的查询必须重写为分别使用 Criteria.within(…)Criteria.withinSphere(…)。 同样,仓库查询方法中的 near 查询关键字也必须更改为 within。 更多详情请参见 MongoDB JIRA 工单 DRIVERS-518spring-doc.cadn.net.cn

以下代码片段展示了在会话绑定的闭包中使用 count 的方式:spring-doc.cadn.net.cn

session.startTransaction();

template.withSession(session)
    .execute(ops -> {
        return ops.count(query(where("state").is("active")), Step.class)
        });

上面的代码片段会生成如下命令:spring-doc.cadn.net.cn

db.collection.aggregate(
   [
      { $match: { state: "active" } },
      { $count: "totalEntityCount" }
   ]
)
db.collection.find( { state: "active" } ).count()