This commit is contained in:
2019-01-25 01:39:48 +01:00
commit 41ac3fb32a
3269 changed files with 396937 additions and 0 deletions

415
node_modules/mongodb/lib/gridfs-stream/download.js generated vendored Normal file
View File

@@ -0,0 +1,415 @@
'use strict';
var stream = require('stream'),
util = require('util');
module.exports = GridFSBucketReadStream;
/**
* A readable stream that enables you to read buffers from GridFS.
*
* Do not instantiate this class directly. Use `openDownloadStream()` instead.
*
* @class
* @param {Collection} chunks Handle for chunks collection
* @param {Collection} files Handle for files collection
* @param {Object} readPreference The read preference to use
* @param {Object} filter The query to use to find the file document
* @param {Object} [options] Optional settings.
* @param {Number} [options.sort] Optional sort for the file find query
* @param {Number} [options.skip] Optional skip for the file find query
* @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from
* @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before
* @fires GridFSBucketReadStream#error
* @fires GridFSBucketReadStream#file
* @return {GridFSBucketReadStream} a GridFSBucketReadStream instance.
*/
function GridFSBucketReadStream(chunks, files, readPreference, filter, options) {
this.s = {
bytesRead: 0,
chunks: chunks,
cursor: null,
expected: 0,
files: files,
filter: filter,
init: false,
expectedEnd: 0,
file: null,
options: options,
readPreference: readPreference
};
stream.Readable.call(this);
}
util.inherits(GridFSBucketReadStream, stream.Readable);
/**
* An error occurred
*
* @event GridFSBucketReadStream#error
* @type {Error}
*/
/**
* Fires when the stream loaded the file document corresponding to the
* provided id.
*
* @event GridFSBucketReadStream#file
* @type {object}
*/
/**
* Emitted when a chunk of data is available to be consumed.
*
* @event GridFSBucketReadStream#data
* @type {object}
*/
/**
* Fired when the stream is exhausted (no more data events).
*
* @event GridFSBucketReadStream#end
* @type {object}
*/
/**
* Fired when the stream is exhausted and the underlying cursor is killed
*
* @event GridFSBucketReadStream#close
* @type {object}
*/
/**
* Reads from the cursor and pushes to the stream.
* @method
*/
GridFSBucketReadStream.prototype._read = function() {
var _this = this;
if (this.destroyed) {
return;
}
waitForFile(_this, function() {
doRead(_this);
});
};
/**
* Sets the 0-based offset in bytes to start streaming from. Throws
* an error if this stream has entered flowing mode
* (e.g. if you've already called `on('data')`)
* @method
* @param {Number} start Offset in bytes to start reading at
* @return {GridFSBucketReadStream}
*/
GridFSBucketReadStream.prototype.start = function(start) {
throwIfInitialized(this);
this.s.options.start = start;
return this;
};
/**
* Sets the 0-based offset in bytes to start streaming from. Throws
* an error if this stream has entered flowing mode
* (e.g. if you've already called `on('data')`)
* @method
* @param {Number} end Offset in bytes to stop reading at
* @return {GridFSBucketReadStream}
*/
GridFSBucketReadStream.prototype.end = function(end) {
throwIfInitialized(this);
this.s.options.end = end;
return this;
};
/**
* Marks this stream as aborted (will never push another `data` event)
* and kills the underlying cursor. Will emit the 'end' event, and then
* the 'close' event once the cursor is successfully killed.
*
* @method
* @param {GridFSBucket~errorCallback} [callback] called when the cursor is successfully closed or an error occurred.
* @fires GridFSBucketWriteStream#close
* @fires GridFSBucketWriteStream#end
*/
GridFSBucketReadStream.prototype.abort = function(callback) {
var _this = this;
this.push(null);
this.destroyed = true;
if (this.s.cursor) {
this.s.cursor.close(function(error) {
_this.emit('close');
callback && callback(error);
});
} else {
if (!this.s.init) {
// If not initialized, fire close event because we will never
// get a cursor
_this.emit('close');
}
callback && callback();
}
};
/**
* @ignore
*/
function throwIfInitialized(self) {
if (self.s.init) {
throw new Error('You cannot change options after the stream has entered' + 'flowing mode!');
}
}
/**
* @ignore
*/
function doRead(_this) {
if (_this.destroyed) {
return;
}
_this.s.cursor.next(function(error, doc) {
if (_this.destroyed) {
return;
}
if (error) {
return __handleError(_this, error);
}
if (!doc) {
_this.push(null);
return _this.s.cursor.close(function(error) {
if (error) {
return __handleError(_this, error);
}
_this.emit('close');
});
}
var bytesRemaining = _this.s.file.length - _this.s.bytesRead;
var expectedN = _this.s.expected++;
var expectedLength = Math.min(_this.s.file.chunkSize, bytesRemaining);
if (doc.n > expectedN) {
var errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
return __handleError(_this, new Error(errmsg));
}
if (doc.n < expectedN) {
errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
return __handleError(_this, new Error(errmsg));
}
var buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;
if (buf.length !== expectedLength) {
if (bytesRemaining <= 0) {
errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n;
return __handleError(_this, new Error(errmsg));
}
errmsg =
'ChunkIsWrongSize: Got unexpected length: ' + buf.length + ', expected: ' + expectedLength;
return __handleError(_this, new Error(errmsg));
}
_this.s.bytesRead += buf.length;
if (buf.length === 0) {
return _this.push(null);
}
var sliceStart = null;
var sliceEnd = null;
if (_this.s.bytesToSkip != null) {
sliceStart = _this.s.bytesToSkip;
_this.s.bytesToSkip = 0;
}
if (expectedN === _this.s.expectedEnd && _this.s.bytesToTrim != null) {
sliceEnd = _this.s.bytesToTrim;
}
// If the remaining amount of data left is < chunkSize read the right amount of data
if (_this.s.options.end && _this.s.options.end - _this.s.bytesToSkip < buf.length) {
sliceEnd = _this.s.options.end - _this.s.bytesToSkip;
}
if (sliceStart != null || sliceEnd != null) {
buf = buf.slice(sliceStart || 0, sliceEnd || buf.length);
}
_this.push(buf);
});
}
/**
* @ignore
*/
function init(self) {
var findOneOptions = {};
if (self.s.readPreference) {
findOneOptions.readPreference = self.s.readPreference;
}
if (self.s.options && self.s.options.sort) {
findOneOptions.sort = self.s.options.sort;
}
if (self.s.options && self.s.options.skip) {
findOneOptions.skip = self.s.options.skip;
}
self.s.files.findOne(self.s.filter, findOneOptions, function(error, doc) {
if (error) {
return __handleError(self, error);
}
if (!doc) {
var identifier = self.s.filter._id ? self.s.filter._id.toString() : self.s.filter.filename;
var errmsg = 'FileNotFound: file ' + identifier + ' was not found';
var err = new Error(errmsg);
err.code = 'ENOENT';
return __handleError(self, err);
}
// If document is empty, kill the stream immediately and don't
// execute any reads
if (doc.length <= 0) {
self.push(null);
return;
}
if (self.destroyed) {
// If user destroys the stream before we have a cursor, wait
// until the query is done to say we're 'closed' because we can't
// cancel a query.
self.emit('close');
return;
}
self.s.bytesToSkip = handleStartOption(self, doc, self.s.options);
var filter = { files_id: doc._id };
// Currently (MongoDB 3.4.4) skip function does not support the index,
// it needs to retrieve all the documents first and then skip them. (CS-25811)
// As work around we use $gte on the "n" field.
if (self.s.options && self.s.options.start != null) {
var skip = Math.floor(self.s.options.start / doc.chunkSize);
if (skip > 0) {
filter['n'] = { $gte: skip };
}
}
self.s.cursor = self.s.chunks.find(filter).sort({ n: 1 });
if (self.s.readPreference) {
self.s.cursor.setReadPreference(self.s.readPreference);
}
self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
self.s.file = doc;
self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor, self.s.options);
self.emit('file', doc);
});
}
/**
* @ignore
*/
function waitForFile(_this, callback) {
if (_this.s.file) {
return callback();
}
if (!_this.s.init) {
init(_this);
_this.s.init = true;
}
_this.once('file', function() {
callback();
});
}
/**
* @ignore
*/
function handleStartOption(stream, doc, options) {
if (options && options.start != null) {
if (options.start > doc.length) {
throw new Error(
'Stream start (' +
options.start +
') must not be ' +
'more than the length of the file (' +
doc.length +
')'
);
}
if (options.start < 0) {
throw new Error('Stream start (' + options.start + ') must not be ' + 'negative');
}
if (options.end != null && options.end < options.start) {
throw new Error(
'Stream start (' +
options.start +
') must not be ' +
'greater than stream end (' +
options.end +
')'
);
}
stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) * doc.chunkSize;
stream.s.expected = Math.floor(options.start / doc.chunkSize);
return options.start - stream.s.bytesRead;
}
}
/**
* @ignore
*/
function handleEndOption(stream, doc, cursor, options) {
if (options && options.end != null) {
if (options.end > doc.length) {
throw new Error(
'Stream end (' +
options.end +
') must not be ' +
'more than the length of the file (' +
doc.length +
')'
);
}
if (options.start < 0) {
throw new Error('Stream end (' + options.end + ') must not be ' + 'negative');
}
var start = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0;
cursor.limit(Math.ceil(options.end / doc.chunkSize) - start);
stream.s.expectedEnd = Math.ceil(options.end / doc.chunkSize);
return Math.ceil(options.end / doc.chunkSize) * doc.chunkSize - options.end;
}
}
/**
* @ignore
*/
function __handleError(_this, error) {
_this.emit('error', error);
}

358
node_modules/mongodb/lib/gridfs-stream/index.js generated vendored Normal file
View File

@@ -0,0 +1,358 @@
'use strict';
var Emitter = require('events').EventEmitter;
var GridFSBucketReadStream = require('./download');
var GridFSBucketWriteStream = require('./upload');
var shallowClone = require('../utils').shallowClone;
var toError = require('../utils').toError;
var util = require('util');
var executeOperation = require('../utils').executeOperation;
var DEFAULT_GRIDFS_BUCKET_OPTIONS = {
bucketName: 'fs',
chunkSizeBytes: 255 * 1024
};
module.exports = GridFSBucket;
/**
* Constructor for a streaming GridFS interface
* @class
* @param {Db} db A db handle
* @param {object} [options] Optional settings.
* @param {string} [options.bucketName="fs"] The 'files' and 'chunks' collections will be prefixed with the bucket name followed by a dot.
* @param {number} [options.chunkSizeBytes=255 * 1024] Number of bytes stored in each chunk. Defaults to 255KB
* @param {object} [options.writeConcern] Optional write concern to be passed to write operations, for instance `{ w: 1 }`
* @param {object} [options.readPreference] Optional read preference to be passed to read operations
* @fires GridFSBucketWriteStream#index
* @return {GridFSBucket}
*/
function GridFSBucket(db, options) {
Emitter.apply(this);
this.setMaxListeners(0);
if (options && typeof options === 'object') {
options = shallowClone(options);
var keys = Object.keys(DEFAULT_GRIDFS_BUCKET_OPTIONS);
for (var i = 0; i < keys.length; ++i) {
if (!options[keys[i]]) {
options[keys[i]] = DEFAULT_GRIDFS_BUCKET_OPTIONS[keys[i]];
}
}
} else {
options = DEFAULT_GRIDFS_BUCKET_OPTIONS;
}
this.s = {
db: db,
options: options,
_chunksCollection: db.collection(options.bucketName + '.chunks'),
_filesCollection: db.collection(options.bucketName + '.files'),
checkedIndexes: false,
calledOpenUploadStream: false,
promiseLibrary: db.s.promiseLibrary || Promise
};
}
util.inherits(GridFSBucket, Emitter);
/**
* When the first call to openUploadStream is made, the upload stream will
* check to see if it needs to create the proper indexes on the chunks and
* files collections. This event is fired either when 1) it determines that
* no index creation is necessary, 2) when it successfully creates the
* necessary indexes.
*
* @event GridFSBucket#index
* @type {Error}
*/
/**
* Returns a writable stream (GridFSBucketWriteStream) for writing
* buffers to GridFS. The stream's 'id' property contains the resulting
* file's id.
* @method
* @param {string} filename The value of the 'filename' key in the files doc
* @param {object} [options] Optional settings.
* @param {number} [options.chunkSizeBytes] Optional overwrite this bucket's chunkSizeBytes for this file
* @param {object} [options.metadata] Optional object to store in the file document's `metadata` field
* @param {string} [options.contentType] Optional string to store in the file document's `contentType` field
* @param {array} [options.aliases] Optional array of strings to store in the file document's `aliases` field
* @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data
* @return {GridFSBucketWriteStream}
*/
GridFSBucket.prototype.openUploadStream = function(filename, options) {
if (options) {
options = shallowClone(options);
} else {
options = {};
}
if (!options.chunkSizeBytes) {
options.chunkSizeBytes = this.s.options.chunkSizeBytes;
}
return new GridFSBucketWriteStream(this, filename, options);
};
/**
* Returns a writable stream (GridFSBucketWriteStream) for writing
* buffers to GridFS for a custom file id. The stream's 'id' property contains the resulting
* file's id.
* @method
* @param {string|number|object} id A custom id used to identify the file
* @param {string} filename The value of the 'filename' key in the files doc
* @param {object} [options] Optional settings.
* @param {number} [options.chunkSizeBytes] Optional overwrite this bucket's chunkSizeBytes for this file
* @param {object} [options.metadata] Optional object to store in the file document's `metadata` field
* @param {string} [options.contentType] Optional string to store in the file document's `contentType` field
* @param {array} [options.aliases] Optional array of strings to store in the file document's `aliases` field
* @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data
* @return {GridFSBucketWriteStream}
*/
GridFSBucket.prototype.openUploadStreamWithId = function(id, filename, options) {
if (options) {
options = shallowClone(options);
} else {
options = {};
}
if (!options.chunkSizeBytes) {
options.chunkSizeBytes = this.s.options.chunkSizeBytes;
}
options.id = id;
return new GridFSBucketWriteStream(this, filename, options);
};
/**
* Returns a readable stream (GridFSBucketReadStream) for streaming file
* data from GridFS.
* @method
* @param {ObjectId} id The id of the file doc
* @param {Object} [options] Optional settings.
* @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from
* @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before
* @return {GridFSBucketReadStream}
*/
GridFSBucket.prototype.openDownloadStream = function(id, options) {
var filter = { _id: id };
options = {
start: options && options.start,
end: options && options.end
};
return new GridFSBucketReadStream(
this.s._chunksCollection,
this.s._filesCollection,
this.s.options.readPreference,
filter,
options
);
};
/**
* Deletes a file with the given id
* @method
* @param {ObjectId} id The id of the file doc
* @param {GridFSBucket~errorCallback} [callback]
*/
GridFSBucket.prototype.delete = function(id, callback) {
return executeOperation(this.s.db.s.topology, _delete, [this, id, callback], {
skipSessions: true
});
};
/**
* @ignore
*/
function _delete(_this, id, callback) {
_this.s._filesCollection.deleteOne({ _id: id }, function(error, res) {
if (error) {
return callback(error);
}
_this.s._chunksCollection.deleteMany({ files_id: id }, function(error) {
if (error) {
return callback(error);
}
// Delete orphaned chunks before returning FileNotFound
if (!res.result.n) {
var errmsg = 'FileNotFound: no file with id ' + id + ' found';
return callback(new Error(errmsg));
}
callback();
});
});
}
/**
* Convenience wrapper around find on the files collection
* @method
* @param {Object} filter
* @param {Object} [options] Optional settings for cursor
* @param {number} [options.batchSize] Optional batch size for cursor
* @param {number} [options.limit] Optional limit for cursor
* @param {number} [options.maxTimeMS] Optional maxTimeMS for cursor
* @param {boolean} [options.noCursorTimeout] Optionally set cursor's `noCursorTimeout` flag
* @param {number} [options.skip] Optional skip for cursor
* @param {object} [options.sort] Optional sort for cursor
* @return {Cursor}
*/
GridFSBucket.prototype.find = function(filter, options) {
filter = filter || {};
options = options || {};
var cursor = this.s._filesCollection.find(filter);
if (options.batchSize != null) {
cursor.batchSize(options.batchSize);
}
if (options.limit != null) {
cursor.limit(options.limit);
}
if (options.maxTimeMS != null) {
cursor.maxTimeMS(options.maxTimeMS);
}
if (options.noCursorTimeout != null) {
cursor.addCursorFlag('noCursorTimeout', options.noCursorTimeout);
}
if (options.skip != null) {
cursor.skip(options.skip);
}
if (options.sort != null) {
cursor.sort(options.sort);
}
return cursor;
};
/**
* Returns a readable stream (GridFSBucketReadStream) for streaming the
* file with the given name from GridFS. If there are multiple files with
* the same name, this will stream the most recent file with the given name
* (as determined by the `uploadDate` field). You can set the `revision`
* option to change this behavior.
* @method
* @param {String} filename The name of the file to stream
* @param {Object} [options] Optional settings
* @param {number} [options.revision=-1] The revision number relative to the oldest file with the given filename. 0 gets you the oldest file, 1 gets you the 2nd oldest, -1 gets you the newest.
* @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from
* @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before
* @return {GridFSBucketReadStream}
*/
GridFSBucket.prototype.openDownloadStreamByName = function(filename, options) {
var sort = { uploadDate: -1 };
var skip = null;
if (options && options.revision != null) {
if (options.revision >= 0) {
sort = { uploadDate: 1 };
skip = options.revision;
} else {
skip = -options.revision - 1;
}
}
var filter = { filename: filename };
options = {
sort: sort,
skip: skip,
start: options && options.start,
end: options && options.end
};
return new GridFSBucketReadStream(
this.s._chunksCollection,
this.s._filesCollection,
this.s.options.readPreference,
filter,
options
);
};
/**
* Renames the file with the given _id to the given string
* @method
* @param {ObjectId} id the id of the file to rename
* @param {String} filename new name for the file
* @param {GridFSBucket~errorCallback} [callback]
*/
GridFSBucket.prototype.rename = function(id, filename, callback) {
return executeOperation(this.s.db.s.topology, _rename, [this, id, filename, callback], {
skipSessions: true
});
};
/**
* @ignore
*/
function _rename(_this, id, filename, callback) {
var filter = { _id: id };
var update = { $set: { filename: filename } };
_this.s._filesCollection.updateOne(filter, update, function(error, res) {
if (error) {
return callback(error);
}
if (!res.result.n) {
return callback(toError('File with id ' + id + ' not found'));
}
callback();
});
}
/**
* Removes this bucket's files collection, followed by its chunks collection.
* @method
* @param {GridFSBucket~errorCallback} [callback]
*/
GridFSBucket.prototype.drop = function(callback) {
return executeOperation(this.s.db.s.topology, _drop, [this, callback], {
skipSessions: true
});
};
/**
* Return the db logger
* @method
* @return {Logger} return the db logger
* @ignore
*/
GridFSBucket.prototype.getLogger = function() {
return this.s.db.s.logger;
};
/**
* @ignore
*/
function _drop(_this, callback) {
_this.s._filesCollection.drop(function(error) {
if (error) {
return callback(error);
}
_this.s._chunksCollection.drop(function(error) {
if (error) {
return callback(error);
}
return callback();
});
});
}
/**
* Callback format for all GridFSBucket methods that can accept a callback.
* @callback GridFSBucket~errorCallback
* @param {MongoError} error An error instance representing any errors that occurred
*/

538
node_modules/mongodb/lib/gridfs-stream/upload.js generated vendored Normal file
View File

@@ -0,0 +1,538 @@
'use strict';
var core = require('mongodb-core');
var crypto = require('crypto');
var stream = require('stream');
var util = require('util');
var Buffer = require('safe-buffer').Buffer;
var ERROR_NAMESPACE_NOT_FOUND = 26;
module.exports = GridFSBucketWriteStream;
/**
* A writable stream that enables you to write buffers to GridFS.
*
* Do not instantiate this class directly. Use `openUploadStream()` instead.
*
* @class
* @param {GridFSBucket} bucket Handle for this stream's corresponding bucket
* @param {string} filename The value of the 'filename' key in the files doc
* @param {object} [options] Optional settings.
* @param {string|number|object} [options.id] Custom file id for the GridFS file.
* @param {number} [options.chunkSizeBytes] The chunk size to use, in bytes
* @param {number} [options.w] The write concern
* @param {number} [options.wtimeout] The write concern timeout
* @param {number} [options.j] The journal write concern
* @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data
* @fires GridFSBucketWriteStream#error
* @fires GridFSBucketWriteStream#finish
* @return {GridFSBucketWriteStream} a GridFSBucketWriteStream instance.
*/
function GridFSBucketWriteStream(bucket, filename, options) {
options = options || {};
this.bucket = bucket;
this.chunks = bucket.s._chunksCollection;
this.filename = filename;
this.files = bucket.s._filesCollection;
this.options = options;
// Signals the write is all done
this.done = false;
this.id = options.id ? options.id : core.BSON.ObjectId();
this.chunkSizeBytes = this.options.chunkSizeBytes;
this.bufToStore = Buffer.alloc(this.chunkSizeBytes);
this.length = 0;
this.md5 = !options.disableMD5 && crypto.createHash('md5');
this.n = 0;
this.pos = 0;
this.state = {
streamEnd: false,
outstandingRequests: 0,
errored: false,
aborted: false,
promiseLibrary: this.bucket.s.promiseLibrary
};
if (!this.bucket.s.calledOpenUploadStream) {
this.bucket.s.calledOpenUploadStream = true;
var _this = this;
checkIndexes(this, function() {
_this.bucket.s.checkedIndexes = true;
_this.bucket.emit('index');
});
}
}
util.inherits(GridFSBucketWriteStream, stream.Writable);
/**
* An error occurred
*
* @event GridFSBucketWriteStream#error
* @type {Error}
*/
/**
* `end()` was called and the write stream successfully wrote the file
* metadata and all the chunks to MongoDB.
*
* @event GridFSBucketWriteStream#finish
* @type {object}
*/
/**
* Write a buffer to the stream.
*
* @method
* @param {Buffer} chunk Buffer to write
* @param {String} encoding Optional encoding for the buffer
* @param {Function} callback Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
* @return {Boolean} False if this write required flushing a chunk to MongoDB. True otherwise.
*/
GridFSBucketWriteStream.prototype.write = function(chunk, encoding, callback) {
var _this = this;
return waitForIndexes(this, function() {
return doWrite(_this, chunk, encoding, callback);
});
};
/**
* Places this write stream into an aborted state (all future writes fail)
* and deletes all chunks that have already been written.
*
* @method
* @param {GridFSBucket~errorCallback} callback called when chunks are successfully removed or error occurred
* @return {Promise} if no callback specified
*/
GridFSBucketWriteStream.prototype.abort = function(callback) {
if (this.state.streamEnd) {
var error = new Error('Cannot abort a stream that has already completed');
if (typeof callback === 'function') {
return callback(error);
}
return this.state.promiseLibrary.reject(error);
}
if (this.state.aborted) {
error = new Error('Cannot call abort() on a stream twice');
if (typeof callback === 'function') {
return callback(error);
}
return this.state.promiseLibrary.reject(error);
}
this.state.aborted = true;
this.chunks.deleteMany({ files_id: this.id }, function(error) {
if (typeof callback === 'function') callback(error);
});
};
/**
* Tells the stream that no more data will be coming in. The stream will
* persist the remaining data to MongoDB, write the files document, and
* then emit a 'finish' event.
*
* @method
* @param {Buffer} chunk Buffer to write
* @param {String} encoding Optional encoding for the buffer
* @param {Function} callback Function to call when all files and chunks have been persisted to MongoDB
*/
GridFSBucketWriteStream.prototype.end = function(chunk, encoding, callback) {
var _this = this;
if (typeof chunk === 'function') {
(callback = chunk), (chunk = null), (encoding = null);
} else if (typeof encoding === 'function') {
(callback = encoding), (encoding = null);
}
if (checkAborted(this, callback)) {
return;
}
this.state.streamEnd = true;
if (callback) {
this.once('finish', function(result) {
callback(null, result);
});
}
if (!chunk) {
waitForIndexes(this, function() {
writeRemnant(_this);
});
return;
}
this.write(chunk, encoding, function() {
writeRemnant(_this);
});
};
/**
* @ignore
*/
function __handleError(_this, error, callback) {
if (_this.state.errored) {
return;
}
_this.state.errored = true;
if (callback) {
return callback(error);
}
_this.emit('error', error);
}
/**
* @ignore
*/
function createChunkDoc(filesId, n, data) {
return {
_id: core.BSON.ObjectId(),
files_id: filesId,
n: n,
data: data
};
}
/**
* @ignore
*/
function checkChunksIndex(_this, callback) {
_this.chunks.listIndexes().toArray(function(error, indexes) {
if (error) {
// Collection doesn't exist so create index
if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
var index = { files_id: 1, n: 1 };
_this.chunks.createIndex(index, { background: false, unique: true }, function(error) {
if (error) {
return callback(error);
}
callback();
});
return;
}
return callback(error);
}
var hasChunksIndex = false;
indexes.forEach(function(index) {
if (index.key) {
var keys = Object.keys(index.key);
if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
hasChunksIndex = true;
}
}
});
if (hasChunksIndex) {
callback();
} else {
index = { files_id: 1, n: 1 };
var indexOptions = getWriteOptions(_this);
indexOptions.background = false;
indexOptions.unique = true;
_this.chunks.createIndex(index, indexOptions, function(error) {
if (error) {
return callback(error);
}
callback();
});
}
});
}
/**
* @ignore
*/
function checkDone(_this, callback) {
if (_this.done) return true;
if (_this.state.streamEnd && _this.state.outstandingRequests === 0 && !_this.state.errored) {
// Set done so we dont' trigger duplicate createFilesDoc
_this.done = true;
// Create a new files doc
var filesDoc = createFilesDoc(
_this.id,
_this.length,
_this.chunkSizeBytes,
_this.md5 && _this.md5.digest('hex'),
_this.filename,
_this.options.contentType,
_this.options.aliases,
_this.options.metadata
);
if (checkAborted(_this, callback)) {
return false;
}
_this.files.insertOne(filesDoc, getWriteOptions(_this), function(error) {
if (error) {
return __handleError(_this, error, callback);
}
_this.emit('finish', filesDoc);
});
return true;
}
return false;
}
/**
* @ignore
*/
function checkIndexes(_this, callback) {
_this.files.findOne({}, { _id: 1 }, function(error, doc) {
if (error) {
return callback(error);
}
if (doc) {
return callback();
}
_this.files.listIndexes().toArray(function(error, indexes) {
if (error) {
// Collection doesn't exist so create index
if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
var index = { filename: 1, uploadDate: 1 };
_this.files.createIndex(index, { background: false }, function(error) {
if (error) {
return callback(error);
}
checkChunksIndex(_this, callback);
});
return;
}
return callback(error);
}
var hasFileIndex = false;
indexes.forEach(function(index) {
var keys = Object.keys(index.key);
if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
hasFileIndex = true;
}
});
if (hasFileIndex) {
checkChunksIndex(_this, callback);
} else {
index = { filename: 1, uploadDate: 1 };
var indexOptions = getWriteOptions(_this);
indexOptions.background = false;
_this.files.createIndex(index, indexOptions, function(error) {
if (error) {
return callback(error);
}
checkChunksIndex(_this, callback);
});
}
});
});
}
/**
* @ignore
*/
function createFilesDoc(_id, length, chunkSize, md5, filename, contentType, aliases, metadata) {
var ret = {
_id: _id,
length: length,
chunkSize: chunkSize,
uploadDate: new Date(),
filename: filename
};
if (md5) {
ret.md5 = md5;
}
if (contentType) {
ret.contentType = contentType;
}
if (aliases) {
ret.aliases = aliases;
}
if (metadata) {
ret.metadata = metadata;
}
return ret;
}
/**
* @ignore
*/
function doWrite(_this, chunk, encoding, callback) {
if (checkAborted(_this, callback)) {
return false;
}
var inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
_this.length += inputBuf.length;
// Input is small enough to fit in our buffer
if (_this.pos + inputBuf.length < _this.chunkSizeBytes) {
inputBuf.copy(_this.bufToStore, _this.pos);
_this.pos += inputBuf.length;
callback && callback();
// Note that we reverse the typical semantics of write's return value
// to be compatible with node's `.pipe()` function.
// True means client can keep writing.
return true;
}
// Otherwise, buffer is too big for current chunk, so we need to flush
// to MongoDB.
var inputBufRemaining = inputBuf.length;
var spaceRemaining = _this.chunkSizeBytes - _this.pos;
var numToCopy = Math.min(spaceRemaining, inputBuf.length);
var outstandingRequests = 0;
while (inputBufRemaining > 0) {
var inputBufPos = inputBuf.length - inputBufRemaining;
inputBuf.copy(_this.bufToStore, _this.pos, inputBufPos, inputBufPos + numToCopy);
_this.pos += numToCopy;
spaceRemaining -= numToCopy;
if (spaceRemaining === 0) {
if (_this.md5) {
_this.md5.update(_this.bufToStore);
}
var doc = createChunkDoc(_this.id, _this.n, _this.bufToStore);
++_this.state.outstandingRequests;
++outstandingRequests;
if (checkAborted(_this, callback)) {
return false;
}
_this.chunks.insertOne(doc, getWriteOptions(_this), function(error) {
if (error) {
return __handleError(_this, error);
}
--_this.state.outstandingRequests;
--outstandingRequests;
if (!outstandingRequests) {
_this.emit('drain', doc);
callback && callback();
checkDone(_this);
}
});
spaceRemaining = _this.chunkSizeBytes;
_this.pos = 0;
++_this.n;
}
inputBufRemaining -= numToCopy;
numToCopy = Math.min(spaceRemaining, inputBufRemaining);
}
// Note that we reverse the typical semantics of write's return value
// to be compatible with node's `.pipe()` function.
// False means the client should wait for the 'drain' event.
return false;
}
/**
* @ignore
*/
function getWriteOptions(_this) {
var obj = {};
if (_this.options.writeConcern) {
obj.w = _this.options.writeConcern.w;
obj.wtimeout = _this.options.writeConcern.wtimeout;
obj.j = _this.options.writeConcern.j;
}
return obj;
}
/**
* @ignore
*/
function waitForIndexes(_this, callback) {
if (_this.bucket.s.checkedIndexes) {
return callback(false);
}
_this.bucket.once('index', function() {
callback(true);
});
return true;
}
/**
* @ignore
*/
function writeRemnant(_this, callback) {
// Buffer is empty, so don't bother to insert
if (_this.pos === 0) {
return checkDone(_this, callback);
}
++_this.state.outstandingRequests;
// Create a new buffer to make sure the buffer isn't bigger than it needs
// to be.
var remnant = Buffer.alloc(_this.pos);
_this.bufToStore.copy(remnant, 0, 0, _this.pos);
if (_this.md5) {
_this.md5.update(remnant);
}
var doc = createChunkDoc(_this.id, _this.n, remnant);
// If the stream was aborted, do not write remnant
if (checkAborted(_this, callback)) {
return false;
}
_this.chunks.insertOne(doc, getWriteOptions(_this), function(error) {
if (error) {
return __handleError(_this, error);
}
--_this.state.outstandingRequests;
checkDone(_this);
});
}
/**
* @ignore
*/
function checkAborted(_this, callback) {
if (_this.state.aborted) {
if (typeof callback === 'function') {
callback(new Error('this stream has been aborted'));
}
return true;
}
return false;
}