Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 129 additions & 30 deletions lib/mongodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ exports.initialize = function initializeDataSource(dataSource, callback) {

s.safe = s.safe !== false;
s.w = s.w || 1;
s.writeConcern = s.writeConcern || {
w: s.w,
wtimeout: s.wtimeout || null,
j: s.j || null,
journal: s.journal || null,
fsync: s.fsync || null,
};
s.url = s.url || generateMongoDBURL(s);
s.useNewUrlParser = s.useNewUrlParser !== false;
s.useUnifiedTopology = s.useUnifiedTopology !== false;
Expand Down Expand Up @@ -251,9 +258,6 @@ MongoDB.prototype.connect = function(callback) {
'acceptableLatencyMS',
'connectWithNoPrimary',
'authSource',
'w',
'wtimeout',
'j',
'forceServerObjectId',
'serializeFunctions',
'ignoreUndefined',
Expand All @@ -278,13 +282,13 @@ MongoDB.prototype.connect = function(callback) {
'password',
'authMechanism',
'compression',
'fsync',
'readPreferenceTags',
'numberOfRetries',
'auto_reconnect',
'minSize',
'useNewUrlParser',
'useUnifiedTopology',
'writeConcern',
// Ignored options
'native_parser',
// Legacy options
Expand All @@ -293,6 +297,11 @@ MongoDB.prototype.connect = function(callback) {
'replSet',
'mongos',
'db',
'w',
'wtimeout',
'j',
'journal',
'fsync',
];

const lbOptions = Object.keys(self.settings);
Expand Down Expand Up @@ -683,7 +692,7 @@ MongoDB.prototype.exists = function(modelName, id, options, callback) {
debug('exists', modelName, id);
}
id = self.coerceId(modelName, id, options);
this.execute(modelName, 'findOne', {_id: id}, function(err, data) {
this.execute(modelName, 'findOne', {_id: id}, buildOptions({}, options), function(err, data) {
if (self.debug) {
debug('exists.callback', modelName, id, err, data);
}
Expand All @@ -704,7 +713,7 @@ MongoDB.prototype.find = function find(modelName, id, options, callback) {
}
const idName = self.idName(modelName);
const oid = self.coerceId(modelName, id, options);
this.execute(modelName, 'findOne', {_id: oid}, function(err, data) {
this.execute(modelName, 'findOne', {_id: oid}, buildOptions({}, options), function(err, data) {
if (self.debug) {
debug('find.callback', modelName, id, err, data);
}
Expand Down Expand Up @@ -893,7 +902,7 @@ MongoDB.prototype.destroy = function destroy(modelName, id, options, callback) {
debug('delete', modelName, id);
}
id = self.coerceId(modelName, id, options);
this.execute(modelName, 'deleteOne', {_id: id}, function(err, result) {
this.execute(modelName, 'deleteOne', {_id: id}, buildOptions({}, options), function(err, result) {
if (self.debug) {
debug('delete.callback', modelName, id, err, result);
}
Expand Down Expand Up @@ -1034,6 +1043,16 @@ MongoDB.prototype.buildWhere = function(modelName, where, options) {

query[k] = {$regex: cond};
} else {
if (isObjectIDProperty(modelCtor, propDef, cond, options)) {
if (Array.isArray(cond)) {
cond = cond.map(function(c) {
return ObjectID(c);
});
} else {
cond = ObjectID(cond);
}
}

query[k] = {};
query[k]['$' + spec] = cond;
}
Expand All @@ -1044,8 +1063,15 @@ MongoDB.prototype.buildWhere = function(modelName, where, options) {
query[k] = {$type: 10};
} else {
if (isObjectIDProperty(modelCtor, propDef, cond, options)) {
cond = ObjectID(cond);
if (Array.isArray(cond)) {
cond = cond.map(function(c) {
return ObjectID(c);
});
} else {
cond = ObjectID(cond);
}
}

query[k] = cond;
}
}
Expand Down Expand Up @@ -1312,8 +1338,17 @@ MongoDB.prototype.convertColumnNames = function(model, data, direction) {
}

if (direction === 'database') {
data[columnName] = data[propName];
delete data[propName];
// Handle data is Array object - in case of fields filter
if (Array.isArray(data)) {
const idx = data.indexOf(propName);
if (idx !== -1) {
data.push(columnName);
delete data[idx];
}
} else { // Handle data as Object - in case to create / update
data[columnName] = data[propName];
delete data[propName];
}
}

if (direction === 'property') {
Expand Down Expand Up @@ -1351,17 +1386,23 @@ MongoDB.prototype.all = function all(modelName, filter, options, callback) {
if (filter.where) {
query = self.buildWhere(modelName, filter.where, options);
}
let fields = filter.fields;
// Use Object.assign to avoid change filter.fields
// which will cause error when create model from data
let fields = undefined;
if (typeof filter.fields !== 'undefined') {
fields = [];
Object.assign(fields, filter.fields);
}

// Convert custom column names
fields = self.fromPropertyToDatabaseNames(modelName, fields);

options = buildOptions({}, options);

if (fields) {
const findOpts = {projection: fieldsArrayToObj(fields)};
this.execute(modelName, 'find', query, findOpts, processResponse);
} else {
this.execute(modelName, 'find', query, processResponse);
options.projection = fieldsArrayToObj(fields);
}
this.execute(modelName, 'find', query, options, processResponse);

function processResponse(err, cursor) {
if (err) {
Expand Down Expand Up @@ -1461,7 +1502,7 @@ MongoDB.prototype.destroyAll = function destroyAll(
where = self.buildWhere(modelName, where, options);
if (debug.enabled) debug('destroyAll where %s', util.inspect(where));

this.execute(modelName, 'deleteMany', where || {}, function(err, info) {
this.execute(modelName, 'deleteMany', where || {}, buildOptions({}, options), function(err, info) {
if (err) return callback && callback(err);

if (self.debug) debug('destroyAll.callback', modelName, where, err, info);
Expand All @@ -1488,15 +1529,26 @@ MongoDB.prototype.count = function count(modelName, where, options, callback) {
debug('count', modelName, where);
}
where = self.buildWhere(modelName, where, options) || {};
const method = Object.keys(where).length === 0 ? 'estimatedDocumentCount' : 'countDocuments';
this.execute(modelName, method, where, function(err, count) {
if (self.debug) {
debug('count.callback', modelName, err, count);
}
if (callback) {
callback(err, count);
}
});
options = buildOptions({}, options);
if (Object.keys(where).length === 0 && !options.session) {
this.execute(modelName, 'estimatedDocumentCount', function(err, count) {
if (self.debug) {
debug('count.callback', modelName, err, count);
}
if (callback) {
callback(err, count);
}
});
} else {
this.execute(modelName, 'countDocuments', where, options, function(err, count) {
if (self.debug) {
debug('count.callback', modelName, err, count);
}
if (callback) {
callback(err, count);
}
});
}
};

/**
Expand Down Expand Up @@ -1538,7 +1590,7 @@ MongoDB.prototype.replaceWithOptions = function(modelName, id, data, options, cb
const idName = self.idName(modelName);
delete data[idName];
data = self.toDatabase(modelName, data);
this.execute(modelName, 'replaceOne', {_id: id}, data, options, function(
this.execute(modelName, 'replaceOne', {_id: id}, data, buildOptions({}, options), function(
err,
info,
) {
Expand Down Expand Up @@ -1735,11 +1787,11 @@ MongoDB.prototype.upsertWithWhere = function upsertWithWhere(
'findOneAndUpdate',
where,
updateData,
{
buildOptions({
upsert: true,
returnOriginal: false,
sort: [['_id', 'asc']],
},
}, options),
function(err, result) {
if (err) return cb && cb(err);

Expand Down Expand Up @@ -2015,6 +2067,48 @@ MongoDB.prototype.ping = function(cb) {
}
};

MongoDB.prototype.beginTransaction = function(isolationLevel, cb) {
// TODO: think about how to convert READ_COMMITED, etc. to transactionOptions
const transactionOptions = {
readPreference: 'primary',
readConcern: {level: 'local'},
writeConcern: {w: 'majority'},
};
if (isolationLevel instanceof Object) {
Object.assign(transactionOptions, isolationLevel || {});
}
const session = this.client.startSession();
session.startTransaction(transactionOptions);
cb(null, session);
};

MongoDB.prototype.commit = function(tx, cb) {
tx.commitTransaction(function(err) {
tx.endSession(null, function(error) {
if (err) return cb(err);
if (error) return cb(error);
cb();
});
});
};

MongoDB.prototype.rollback = function(tx, cb) {
tx.abortTransaction(function(err) {
tx.endSession(null, function(error) {
if (err) return cb(err);
if (error) return cb(error);
cb();
});
});
};

function isInTransation(options) {
const ops = {};
if (options && options.transaction && options.transaction.isInTransation)
ops.session = options.transaction.session;
return ops;
}

// Case insensitive check if a string looks like "ObjectID"
function typeIsObjectId(input) {
if (!input) return false;
Expand Down Expand Up @@ -2072,7 +2166,8 @@ function coerceToObjectId(modelCtor, propDef, propValue) {
function isObjectIDProperty(modelCtor, propDef, value, options) {
if (!propDef) return false;

if (typeof value === 'string' && value.match(ObjectIdValueRegex)) {
if ((typeof value === 'string' && value.match(ObjectIdValueRegex)) ||
(Array.isArray(value) && value.every((v) => v.match(ObjectIdValueRegex)))) {
if (isStoredAsObjectID(propDef)) return true;
else return !isStrictObjectIDCoercionEnabled(modelCtor, options);
} else if (value instanceof mongodb.ObjectID) {
Expand Down Expand Up @@ -2306,5 +2401,9 @@ function hasDataType(dataType, propertyDef) {
* @param {*} connectorOptions User specified Options
*/
function buildOptions(requiredOptions, connectorOptions) {
return Object.assign({}, connectorOptions, requiredOptions);
if (connectorOptions && connectorOptions.transaction && connectorOptions.transaction.isActive()) {
return Object.assign({session: connectorOptions.transaction.connection}, connectorOptions, requiredOptions);
} else {
return Object.assign({}, connectorOptions, requiredOptions);
}
}
Loading