Skip to content

Commit

Permalink
feat: introduce AbstractCursor and its concrete subclasses (#2619)
Browse files Browse the repository at this point in the history
This change introduces a fundamental redesign of the cursor types
in the driver. The first change is to add a new `AbstractCursor`
type, which is only concerned with iterating a cursor (using
`getMore`) once it has been initialized. The `_initialize` method
must be implemented by subclasses. The concrete subclasses are
generally builders for `find` and `aggregate` commands, each
providing their own custom initialization method.

NODE-2809
  • Loading branch information
mbroadst authored Nov 25, 2020
1 parent 7efbba0 commit a2d78b2
Show file tree
Hide file tree
Showing 54 changed files with 2,087 additions and 3,060 deletions.
9 changes: 7 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"dependencies": {
"bl": "^2.2.1",
"bson": "^4.0.4",
"denque": "^1.4.1"
"denque": "^1.4.1",
"lodash": "^4.17.20"
},
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.1",
Expand All @@ -38,6 +39,7 @@
"@types/bl": "^2.1.0",
"@types/bson": "^4.0.2",
"@types/kerberos": "^1.1.0",
"@types/lodash": "^4.14.164",
"@types/node": "^14.6.4",
"@types/saslprep": "^1.0.0",
"@typescript-eslint/eslint-plugin": "^3.10.0",
Expand Down
149 changes: 87 additions & 62 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError } from './error';
import { Cursor, CursorOptions, CursorStream, CursorStreamOptions } from './cursor/cursor';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import {
relayEvents,
Expand All @@ -21,9 +20,18 @@ import type { CollationOptions } from './cmap/wire_protocol/write_command';
import { MongoClient } from './mongo_client';
import { Db } from './db';
import { Collection } from './collection';
import type { Readable } from 'stream';
import {
AbstractCursor,
AbstractCursorOptions,
CursorStreamOptions
} from './cursor/abstract_cursor';
import type { ClientSession } from './sessions';
import { executeOperation, ExecutionResult } from './operations/execute_operation';

const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');
const kClosed = Symbol('closed');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
Expand Down Expand Up @@ -162,13 +170,6 @@ interface UpdateDescription {
removedFields: string[];
}

/** @internal */
export class ChangeStreamStream extends CursorStream {
constructor(cursor: ChangeStreamCursor) {
super(cursor);
}
}

/**
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
* @public
Expand All @@ -180,10 +181,10 @@ export class ChangeStream extends EventEmitter {
namespace: MongoDBNamespace;
type: symbol;
cursor?: ChangeStreamCursor;
closed: boolean;
streamOptions?: CursorStreamOptions;
[kResumeQueue]: Denque;
[kCursorStream]?: CursorStream;
[kCursorStream]?: Readable;
[kClosed]: boolean;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -241,7 +242,7 @@ export class ChangeStream extends EventEmitter {
// Create contained Change Stream cursor
this.cursor = createChangeStreamCursor(this, options);

this.closed = false;
this[kClosed] = false;

// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
Expand All @@ -252,13 +253,13 @@ export class ChangeStream extends EventEmitter {

this.on('removeListener', eventName => {
if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
this[kCursorStream]?.removeAllListeners(CursorStream.DATA);
this[kCursorStream]?.removeAllListeners('data');
}
});
}

/** @internal */
get cursorStream(): CursorStream | undefined {
get cursorStream(): Readable | undefined {
return this[kCursorStream];
}

Expand Down Expand Up @@ -296,23 +297,20 @@ export class ChangeStream extends EventEmitter {
}

/** Is the cursor closed */
isClosed(): boolean {
return this.closed || (this.cursor?.isClosed() ?? false);
get closed(): boolean {
return this[kClosed] || (this.cursor?.closed ?? false);
}

/** Close the Change Stream */
close(callback?: Callback): Promise<void> | void {
return maybePromise(callback, cb => {
if (this.closed) return cb();
this[kClosed] = true;

// flag the change stream as explicitly closed
this.closed = true;

if (!this.cursor) return cb();
return maybePromise(callback, cb => {
if (!this.cursor) {
return cb();
}

// Tidy up the existing cursor
const cursor = this.cursor;

return cursor.close(err => {
endStream(this);
this.cursor = undefined;
Expand All @@ -325,7 +323,7 @@ export class ChangeStream extends EventEmitter {
* Return a modified Readable stream including a possible transform method.
* @throws MongoError if this.cursor is undefined
*/
stream(options?: CursorStreamOptions): ChangeStreamStream {
stream(options?: CursorStreamOptions): Readable {
this.streamOptions = options;
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to stream');
Expand All @@ -335,28 +333,34 @@ export class ChangeStream extends EventEmitter {
}

/** @public */
export interface ChangeStreamCursorOptions extends CursorOptions {
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
startAtOperationTime?: OperationTime;
resumeAfter?: ResumeToken;
startAfter?: boolean;
}

/** @internal */
export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamCursorOptions> {
export class ChangeStreamCursor extends AbstractCursor {
_resumeToken: ResumeToken;
startAtOperationTime?: OperationTime;
hasReceived?: boolean;
resumeAfter: ResumeToken;
startAfter: ResumeToken;
options: ChangeStreamCursorOptions;

postBatchResumeToken?: ResumeToken;
pipeline: Document[];

constructor(
topology: Topology,
operation: AggregateOperation,
options: ChangeStreamCursorOptions
namespace: MongoDBNamespace,
pipeline: Document[] = [],
options: ChangeStreamCursorOptions = {}
) {
super(topology, operation, options);
super(topology, namespace, options);

options = options || {};
this.pipeline = pipeline;
this.options = options;
this._resumeToken = null;
this.startAtOperationTime = options.startAtOperationTime;

Expand Down Expand Up @@ -421,18 +425,28 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
}
}

_initializeCursor(callback: Callback): void {
super._initializeCursor((err, response) => {
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
const aggregateOperation = new AggregateOperation(
{ s: { namespace: this.namespace } },
this.pipeline,
{
...this.cursorOptions,
...this.options,
session
}
);

executeOperation(this.topology, aggregateOperation, (err, response) => {
if (err || response == null) {
callback(err, response);
return;
return callback(err);
}

const server = aggregateOperation.server;
if (
this.startAtOperationTime == null &&
this.resumeAfter == null &&
this.startAfter == null &&
maxWireVersion(this.server) >= 7
maxWireVersion(server) >= 7
) {
this.startAtOperationTime = response.operationTime;
}
Expand All @@ -441,15 +455,16 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC

this.emit('init', response);
this.emit('response');
callback(err, response);

// TODO: NODE-2882
callback(undefined, { server, session, response });
});
}

_getMore(callback: Callback): void {
super._getMore((err, response) => {
_getMore(batchSize: number, callback: Callback): void {
super._getMore(batchSize, (err, response) => {
if (err) {
callback(err);
return;
return callback(err);
}

this._processBatch('nextBatch', response);
Expand All @@ -466,26 +481,32 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
* @internal
*/
function createChangeStreamCursor(
self: ChangeStream,
changeStream: ChangeStream,
options: ChangeStreamOptions
): ChangeStreamCursor {
const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}

const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(
changeStream.pipeline
);

const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const changeStreamCursor = new ChangeStreamCursor(
getTopology(self.parent),
new AggregateOperation(self.parent, pipeline, options),
getTopology(changeStream.parent),
changeStream.namespace,
pipeline,
cursorOptions
);

relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);
relayEvents(changeStreamCursor, changeStream, ['resumeTokenChanged', 'end', 'close']);
if (changeStream.listenerCount(ChangeStream.CHANGE) > 0) {
streamEvents(changeStream, changeStreamCursor);
}

if (self.listenerCount(ChangeStream.CHANGE) > 0) streamEvents(self, changeStreamCursor);
return changeStreamCursor;
}

Expand Down Expand Up @@ -532,24 +553,24 @@ function waitForTopologyConnected(
}

function closeWithError(changeStream: ChangeStream, error: AnyError, callback?: Callback): void {
if (!callback) changeStream.emit(ChangeStream.ERROR, error);
if (!callback) {
changeStream.emit(ChangeStream.ERROR, error);
}

changeStream.close(() => callback && callback(error));
}

function streamEvents(changeStream: ChangeStream, cursor: ChangeStreamCursor): void {
const stream = changeStream[kCursorStream] || cursor.stream();
changeStream[kCursorStream] = stream;
stream.on(CursorStream.DATA, change => processNewChange(changeStream, change));
stream.on(CursorStream.ERROR, error => processError(changeStream, error));
stream.on('data', change => processNewChange(changeStream, change));
stream.on('error', error => processError(changeStream, error));
}

function endStream(changeStream: ChangeStream): void {
const cursorStream = changeStream[kCursorStream];
if (cursorStream) {
[CursorStream.DATA, CursorStream.CLOSE, CursorStream.END, CursorStream.ERROR].forEach(event =>
cursorStream.removeAllListeners(event)
);

['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event));
cursorStream.destroy();
}

Expand All @@ -561,7 +582,7 @@ function processNewChange(
change: ChangeStreamDocument,
callback?: Callback
) {
if (changeStream.closed) {
if (changeStream[kClosed]) {
if (callback) callback(CHANGESTREAM_CLOSED_ERROR);
return;
}
Expand Down Expand Up @@ -591,8 +612,8 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
const cursor = changeStream.cursor;

// If the change stream has been closed explicitly, do not process error.
if (changeStream.closed) {
if (callback) callback(new MongoError('ChangeStream is closed'));
if (changeStream[kClosed]) {
if (callback) callback(CHANGESTREAM_CLOSED_ERROR);
return;
}

Expand All @@ -604,7 +625,10 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca

// otherwise, raise an error and close the change stream
function unresumableError(err: AnyError) {
if (!callback) changeStream.emit(ChangeStream.ERROR, err);
if (!callback) {
changeStream.emit(ChangeStream.ERROR, err);
}

changeStream.close(() => processResumeQueue(changeStream, err));
}

Expand Down Expand Up @@ -648,8 +672,8 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
* @param changeStream - the parent ChangeStream
*/
function getCursor(changeStream: ChangeStream, callback: Callback<ChangeStreamCursor>) {
if (changeStream.isClosed()) {
callback(new MongoError('ChangeStream is closed.'));
if (changeStream[kClosed]) {
callback(CHANGESTREAM_CLOSED_ERROR);
return;
}

Expand All @@ -672,10 +696,11 @@ function getCursor(changeStream: ChangeStream, callback: Callback<ChangeStreamCu
function processResumeQueue(changeStream: ChangeStream, err?: Error) {
while (changeStream[kResumeQueue].length) {
const request = changeStream[kResumeQueue].pop();
if (changeStream.isClosed() && !err) {
request(new MongoError('Change Stream is not open.'));
if (changeStream[kClosed] && !err) {
request(CHANGESTREAM_CLOSED_ERROR);
return;
}

request(err, changeStream.cursor);
}
}
Loading

0 comments on commit a2d78b2

Please sign in to comment.