From 91c865bce828d505ccb8493666a494b3e439fba2 Mon Sep 17 00:00:00 2001 From: Sergey Nosenko Date: Thu, 29 Jul 2021 20:15:36 +0800 Subject: [PATCH] feat: add transaction support Signed-off by: Sergey Nosenko Signed-off-by: Rifa Achrinza <25147899+achrinza@users.noreply.github.com> --- lib/mongodb.js | 159 +++++++++++++++++++++----- test/transaction.test.js | 234 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 363 insertions(+), 30 deletions(-) create mode 100644 test/transaction.test.js diff --git a/lib/mongodb.js b/lib/mongodb.js index 520cdd0a8..66bea5c9e 100644 --- a/lib/mongodb.js +++ b/lib/mongodb.js @@ -105,6 +105,13 @@ exports.initialize = function initializeDataSource(dataSource, callback) { s.safe = s.safe !== false; s.w = s.w || 1; + s.writeConcern = s.writeConcern || { + w: s.w, + wtimeout: s.wtimeout || null, + j: s.j || null, + journal: s.journal || null, + fsync: s.fsync || null, + }; s.url = s.url || generateMongoDBURL(s); s.useNewUrlParser = s.useNewUrlParser !== false; s.useUnifiedTopology = s.useUnifiedTopology !== false; @@ -251,9 +258,6 @@ MongoDB.prototype.connect = function(callback) { 'acceptableLatencyMS', 'connectWithNoPrimary', 'authSource', - 'w', - 'wtimeout', - 'j', 'forceServerObjectId', 'serializeFunctions', 'ignoreUndefined', @@ -278,13 +282,13 @@ MongoDB.prototype.connect = function(callback) { 'password', 'authMechanism', 'compression', - 'fsync', 'readPreferenceTags', 'numberOfRetries', 'auto_reconnect', 'minSize', 'useNewUrlParser', 'useUnifiedTopology', + 'writeConcern', // Ignored options 'native_parser', // Legacy options @@ -293,6 +297,11 @@ MongoDB.prototype.connect = function(callback) { 'replSet', 'mongos', 'db', + 'w', + 'wtimeout', + 'j', + 'journal', + 'fsync', ]; const lbOptions = Object.keys(self.settings); @@ -683,7 +692,7 @@ MongoDB.prototype.exists = function(modelName, id, options, callback) { debug('exists', modelName, id); } id = self.coerceId(modelName, id, options); - this.execute(modelName, 'findOne', {_id: id}, function(err, data) { + this.execute(modelName, 'findOne', {_id: id}, buildOptions({}, options), function(err, data) { if (self.debug) { debug('exists.callback', modelName, id, err, data); } @@ -704,7 +713,7 @@ MongoDB.prototype.find = function find(modelName, id, options, callback) { } const idName = self.idName(modelName); const oid = self.coerceId(modelName, id, options); - this.execute(modelName, 'findOne', {_id: oid}, function(err, data) { + this.execute(modelName, 'findOne', {_id: oid}, buildOptions({}, options), function(err, data) { if (self.debug) { debug('find.callback', modelName, id, err, data); } @@ -893,7 +902,7 @@ MongoDB.prototype.destroy = function destroy(modelName, id, options, callback) { debug('delete', modelName, id); } id = self.coerceId(modelName, id, options); - this.execute(modelName, 'deleteOne', {_id: id}, function(err, result) { + this.execute(modelName, 'deleteOne', {_id: id}, buildOptions({}, options), function(err, result) { if (self.debug) { debug('delete.callback', modelName, id, err, result); } @@ -1034,6 +1043,16 @@ MongoDB.prototype.buildWhere = function(modelName, where, options) { query[k] = {$regex: cond}; } else { + if (isObjectIDProperty(modelCtor, propDef, cond, options)) { + if (Array.isArray(cond)) { + cond = cond.map(function(c) { + return ObjectID(c); + }); + } else { + cond = ObjectID(cond); + } + } + query[k] = {}; query[k]['$' + spec] = cond; } @@ -1044,8 +1063,15 @@ MongoDB.prototype.buildWhere = function(modelName, where, options) { query[k] = {$type: 10}; } else { if (isObjectIDProperty(modelCtor, propDef, cond, options)) { - cond = ObjectID(cond); + if (Array.isArray(cond)) { + cond = cond.map(function(c) { + return ObjectID(c); + }); + } else { + cond = ObjectID(cond); + } } + query[k] = cond; } } @@ -1312,8 +1338,17 @@ MongoDB.prototype.convertColumnNames = function(model, data, direction) { } if (direction === 'database') { - data[columnName] = data[propName]; - delete data[propName]; + // Handle data is Array object - in case of fields filter + if (Array.isArray(data)) { + const idx = data.indexOf(propName); + if (idx !== -1) { + data.push(columnName); + delete data[idx]; + } + } else { // Handle data as Object - in case to create / update + data[columnName] = data[propName]; + delete data[propName]; + } } if (direction === 'property') { @@ -1351,17 +1386,23 @@ MongoDB.prototype.all = function all(modelName, filter, options, callback) { if (filter.where) { query = self.buildWhere(modelName, filter.where, options); } - let fields = filter.fields; + // Use Object.assign to avoid change filter.fields + // which will cause error when create model from data + let fields = undefined; + if (typeof filter.fields !== 'undefined') { + fields = []; + Object.assign(fields, filter.fields); + } // Convert custom column names fields = self.fromPropertyToDatabaseNames(modelName, fields); + options = buildOptions({}, options); + if (fields) { - const findOpts = {projection: fieldsArrayToObj(fields)}; - this.execute(modelName, 'find', query, findOpts, processResponse); - } else { - this.execute(modelName, 'find', query, processResponse); + options.projection = fieldsArrayToObj(fields); } + this.execute(modelName, 'find', query, options, processResponse); function processResponse(err, cursor) { if (err) { @@ -1461,7 +1502,7 @@ MongoDB.prototype.destroyAll = function destroyAll( where = self.buildWhere(modelName, where, options); if (debug.enabled) debug('destroyAll where %s', util.inspect(where)); - this.execute(modelName, 'deleteMany', where || {}, function(err, info) { + this.execute(modelName, 'deleteMany', where || {}, buildOptions({}, options), function(err, info) { if (err) return callback && callback(err); if (self.debug) debug('destroyAll.callback', modelName, where, err, info); @@ -1488,15 +1529,26 @@ MongoDB.prototype.count = function count(modelName, where, options, callback) { debug('count', modelName, where); } where = self.buildWhere(modelName, where, options) || {}; - const method = Object.keys(where).length === 0 ? 'estimatedDocumentCount' : 'countDocuments'; - this.execute(modelName, method, where, function(err, count) { - if (self.debug) { - debug('count.callback', modelName, err, count); - } - if (callback) { - callback(err, count); - } - }); + options = buildOptions({}, options); + if (Object.keys(where).length === 0 && !options.session) { + this.execute(modelName, 'estimatedDocumentCount', function(err, count) { + if (self.debug) { + debug('count.callback', modelName, err, count); + } + if (callback) { + callback(err, count); + } + }); + } else { + this.execute(modelName, 'countDocuments', where, options, function(err, count) { + if (self.debug) { + debug('count.callback', modelName, err, count); + } + if (callback) { + callback(err, count); + } + }); + } }; /** @@ -1538,7 +1590,7 @@ MongoDB.prototype.replaceWithOptions = function(modelName, id, data, options, cb const idName = self.idName(modelName); delete data[idName]; data = self.toDatabase(modelName, data); - this.execute(modelName, 'replaceOne', {_id: id}, data, options, function( + this.execute(modelName, 'replaceOne', {_id: id}, data, buildOptions({}, options), function( err, info, ) { @@ -1735,11 +1787,11 @@ MongoDB.prototype.upsertWithWhere = function upsertWithWhere( 'findOneAndUpdate', where, updateData, - { + buildOptions({ upsert: true, returnOriginal: false, sort: [['_id', 'asc']], - }, + }, options), function(err, result) { if (err) return cb && cb(err); @@ -2015,6 +2067,48 @@ MongoDB.prototype.ping = function(cb) { } }; +MongoDB.prototype.beginTransaction = function(isolationLevel, cb) { + // TODO: think about how to convert READ_COMMITED, etc. to transactionOptions + const transactionOptions = { + readPreference: 'primary', + readConcern: {level: 'local'}, + writeConcern: {w: 'majority'}, + }; + if (isolationLevel instanceof Object) { + Object.assign(transactionOptions, isolationLevel || {}); + } + const session = this.client.startSession(); + session.startTransaction(transactionOptions); + cb(null, session); +}; + +MongoDB.prototype.commit = function(tx, cb) { + tx.commitTransaction(function(err) { + tx.endSession(null, function(error) { + if (err) return cb(err); + if (error) return cb(error); + cb(); + }); + }); +}; + +MongoDB.prototype.rollback = function(tx, cb) { + tx.abortTransaction(function(err) { + tx.endSession(null, function(error) { + if (err) return cb(err); + if (error) return cb(error); + cb(); + }); + }); +}; + +function isInTransation(options) { + const ops = {}; + if (options && options.transaction && options.transaction.isInTransation) + ops.session = options.transaction.session; + return ops; +} + // Case insensitive check if a string looks like "ObjectID" function typeIsObjectId(input) { if (!input) return false; @@ -2072,7 +2166,8 @@ function coerceToObjectId(modelCtor, propDef, propValue) { function isObjectIDProperty(modelCtor, propDef, value, options) { if (!propDef) return false; - if (typeof value === 'string' && value.match(ObjectIdValueRegex)) { + if ((typeof value === 'string' && value.match(ObjectIdValueRegex)) || + (Array.isArray(value) && value.every((v) => v.match(ObjectIdValueRegex)))) { if (isStoredAsObjectID(propDef)) return true; else return !isStrictObjectIDCoercionEnabled(modelCtor, options); } else if (value instanceof mongodb.ObjectID) { @@ -2306,5 +2401,9 @@ function hasDataType(dataType, propertyDef) { * @param {*} connectorOptions User specified Options */ function buildOptions(requiredOptions, connectorOptions) { - return Object.assign({}, connectorOptions, requiredOptions); + if (connectorOptions && connectorOptions.transaction && connectorOptions.transaction.isActive()) { + return Object.assign({session: connectorOptions.transaction.connection}, connectorOptions, requiredOptions); + } else { + return Object.assign({}, connectorOptions, requiredOptions); + } } diff --git a/test/transaction.test.js b/test/transaction.test.js new file mode 100644 index 000000000..9055c2664 --- /dev/null +++ b/test/transaction.test.js @@ -0,0 +1,234 @@ +// Copyright IBM Corp. 2013,2019. All Rights Reserved. +// Node module: loopback-connector-mongodb +// This file is licensed under the MIT License. +// License text available at https://opensource.org/licenses/MIT + +'use strict'; +const should = require('./init.js'); +const Transaction = require('loopback-connector/lib/transaction'); + +const juggler = require('loopback-datasource-juggler'); +let db, Post, Review; +describe.skip('transactions', function() { + before(function(done) { + // use run-rs -v 4.2.0 --host localhost --portStart 27000 to start replicaset for transaction testing + db = global.getDataSource({ + url: 'mongodb://localhost:27000,localhost:27001,localhost:27002/testdb?replicaSet=rs', + retryWrites: false, + }); + db.once('connected', function() { + Post = db.define('PostTX', { + title: {type: String, length: 255, index: true}, + content: {type: String}, + }); + Review = db.define('ReviewTX', { + author: String, + content: {type: String}, + }); + Post.hasMany(Review, {as: 'reviews', foreignKey: 'postId'}); + db.automigrate(done); + }); + }); + + let currentTx; + let hooks = []; + // Return an async function to start a transaction and create a post + function createPostInTx(post, timeout) { + return function(done) { + // Transaction.begin(db.connector, Transaction.READ_COMMITTED, + Post.beginTransaction({ + isolationLevel: Transaction.READ_COMMITTED, + timeout: timeout, + }, + function(err, tx) { + if (err) return done(err); + tx.should.have.property('id').and.be.a.String(); + hooks = []; + tx.observe('before commit', function(context, next) { + hooks.push('before commit'); + next(); + }); + tx.observe('after commit', function(context, next) { + hooks.push('after commit'); + next(); + }); + tx.observe('before rollback', function(context, next) { + hooks.push('before rollback'); + next(); + }); + tx.observe('after rollback', function(context, next) { + hooks.push('after rollback'); + next(); + }); + currentTx = tx; + Post.create(post, {transaction: tx, model: 'Post'}, + function(err, p) { + if (err) { + done(err); + } else { + p.reviews.create({ + author: 'John', + content: 'Review for ' + p.title, + }, {transaction: tx, model: 'Review'}, + function(err, c) { + done(err); + }); + } + }); + }); + }; + } + + // Return an async function to find matching posts and assert number of + // records to equal to the count + function expectToFindPosts(where, count, inTx) { + return function(done) { + const options = {model: 'Post'}; + if (inTx) { + options.transaction = currentTx; + } + Post.find({where: where}, options, + function(err, posts) { + if (err) return done(err); + posts.length.should.equal(count); + // Make sure both find() and count() behave the same way + Post.count(where, options, + function(err, result) { + if (err) return done(err); + result.should.equal(count); + if (count) { + // Find related reviews + options.model = 'Review'; + // Please note the empty {} is required, otherwise, the options + // will be treated as a filter + posts[0].reviews({}, options, function(err, reviews) { + if (err) return done(err); + reviews.length.should.equal(count); + done(); + }); + } else { + done(); + } + }); + }); + }; + } + + describe('commit', function() { + const post = {title: 't1', content: 'c1'}; + before(createPostInTx(post)); + + it('should not see the uncommitted insert', expectToFindPosts(post, 0)); + + it('should see the uncommitted insert from the same transaction', + expectToFindPosts(post, 1, true)); + + it('should commit a transaction', function(done) { + currentTx.commit(function(err) { + hooks.should.eql(['before commit', 'after commit']); + done(err); + }); + }); + + it('should see the committed insert', expectToFindPosts(post, 1)); + + it('should report error if the transaction is not active', function(done) { + currentTx.commit(function(err) { + err.should.be.instanceof(Error); + done(); + }); + }); + }); + + describe('rollback', function() { + before(function() { + // Reset the collection + db.connector.data = {}; + }); + + const post = {title: 't2', content: 'c2'}; + before(createPostInTx(post)); + + it('should not see the uncommitted insert', expectToFindPosts(post, 0)); + + it('should see the uncommitted insert from the same transaction', + expectToFindPosts(post, 1, true)); + + it('should rollback a transaction', function(done) { + currentTx.rollback(function(err) { + hooks.should.eql(['before rollback', 'after rollback']); + done(err); + }); + }); + + it('should not see the rolledback insert', expectToFindPosts(post, 0)); + + it('should report error if the transaction is not active', function(done) { + currentTx.rollback(function(err) { + err.should.be.instanceof(Error); + done(); + }); + }); + }); + + describe('timeout', function() { + const TIMEOUT = 50; + before(function() { + // Reset the collection + db.connector.data = {}; + }); + + const post = {title: 't3', content: 'c3'}; + beforeEach(createPostInTx(post, TIMEOUT)); + + it('should report timeout', function(done) { + // wait until the "create post" transaction times out + setTimeout(runTheTest, TIMEOUT * 3); + + function runTheTest() { + Post.find({where: {title: 't3'}}, {transaction: currentTx}, + function(err, posts) { + err.should.match(/transaction.*not active/); + done(); + }); + } + }); + + it('should invoke the timeout hook', function(done) { + currentTx.observe('timeout', function(context, next) { + next(); + done(); + }); + + // If the event is not fired quickly enough, then the test can + // quickly fail - no need to wait full two seconds (Mocha's default) + this.timeout(TIMEOUT * 3); + }); + }); + + describe('isActive', function() { + it('returns true when connection is active', function(done) { + Post.beginTransaction({ + isolationLevel: Transaction.READ_COMMITTED, + timeout: 1000, + }, + function(err, tx) { + if (err) return done(err); + tx.isActive().should.equal(true); + return done(); + }); + }); + it('returns false when connection is not active', function(done) { + Post.beginTransaction({ + isolationLevel: Transaction.READ_COMMITTED, + timeout: 1000, + }, + function(err, tx) { + if (err) return done(err); + delete tx.connection; + tx.isActive().should.equal(false); + return done(); + }); + }); + }); +});