更改流

更改流 允许您监听给定模型集合中文档的更新,甚至整个数据库中的文档。与 中间件 不同,更改流是 MongoDB 服务器构造,这意味着它们可以从任何地方获取更改。即使您从 MongoDB GUI 更新文档,您的 Mongoose 更改流也会收到通知。

watch() 函数创建一个更改流。当文档更新时,更改流会发出 'data' 事件。

const Person = mongoose.model('Person', new mongoose.Schema({ name: String }));

// Create a change stream. The 'change' event gets emitted when there's a
// change in the database. Print what the change stream emits.
Person.watch().
  on('change', data => console.log(data));

// Insert a doc, will trigger the change stream handler above
await Person.create({ name: 'Axl Rose' });

上面的脚本将打印类似于以下的输出

{ _id: { _data: '8262408DAC000000012B022C0100296E5A10042890851837DB4792BE6B235E8B85489F46645F6964006462408DAC6F5C42FF5EE087A20004' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1648397740, i: 1 }), fullDocument: { _id: new ObjectId("62408dac6f5c42ff5ee087a2"), name: 'Axl Rose', __v: 0 }, ns: { db: 'test', coll: 'people' }, documentKey: { _id: new ObjectId("62408dac6f5c42ff5ee087a2") } }

请注意,您 **必须** 连接到 MongoDB 副本集或分片集群才能使用更改流。如果您尝试在连接到独立的 MongoDB 服务器时调用 watch(),您将收到以下错误。

MongoServerError: $changeStream 阶段仅在副本集上支持

如果您在生产环境中使用 watch(),我们建议您使用 MongoDB Atlas。对于本地开发,我们建议您使用 mongodb-memory-serverrun-rs 在本地启动副本集。

使用 next() 迭代

如果您想在 AWS Lambda 函数 中迭代更改流,**不要** 使用事件发射器来监听更改流。您需要确保在 Lambda 函数执行完毕时关闭更改流,因为如果 Lambda 在更改流从 MongoDB 拉取数据时停止容器,您的更改流可能会处于不一致状态。

更改流还具有一个 next() 函数,允许您显式等待下一个更改到来。使用 resumeAfter 来跟踪上一个更改流停止的位置,并添加超时以确保您的处理程序不会在没有更改的情况下无限期等待。

let resumeAfter = undefined;

exports.handler = async(event, context) => {
  // add this so that we can re-use any static/global variables between function calls if Lambda
  // happens to re-use existing containers for the invocation.
  context.callbackWaitsForEmptyEventLoop = false;

  await connectToDatabase();

  const changeStream = await Country.watch([], { resumeAfter });

  // Change stream `next()` will wait forever if there are no changes. So make sure to
  // stop listening to the change stream after a fixed period of time.
  const timeoutPromise = new Promise(resolve => setTimeout(() => resolve(false), 1000));
  let doc = null;
  while (doc = await Promise.race([changeStream.next(), timeoutPromise])) {
    console.log('Got', doc);
  }

  // `resumeToken` tells you where the change stream left off, so next function instance
  // can pick up any changes that happened in the meantime.
  resumeAfter = changeStream.resumeToken;
  await changeStream.close();
};