From 595b56475667b62ba114ccd11c56bc3d07172030 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sun, 3 May 2015 09:05:26 -0700 Subject: [PATCH 1/4] insert-performance: minor tweaks to get script working again --- scripts/insert-performance.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/insert-performance.js b/scripts/insert-performance.js index eb64b31..bd3b14b 100644 --- a/scripts/insert-performance.js +++ b/scripts/insert-performance.js @@ -1,7 +1,7 @@ var Promise = require('bluebird'); -var TestClient = require('./test-client'); +var TestClient = require('../test/test-client'); var expect = require('chai').expect; -var ks = 'prepared_test'; +var ks = 'insert_performance'; var table = 'test'; var _ = require('underscore'); var util = require('util'); From d826332ff6d83066ee9fcda1eed1937f6a89503f Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Mon, 4 May 2015 10:04:32 -0700 Subject: [PATCH 2/4] type-mapper: add BIGINT support --- src/type-mapper.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/type-mapper.cc b/src/type-mapper.cc index 5779924..46e6145 100644 --- a/src/type-mapper.cc +++ b/src/type-mapper.cc @@ -224,6 +224,7 @@ TypeMapper::v8_from_cassandra(v8::Local* result, CassValueType type, *result = NanNew(intValue); return true; } + case CASS_VALUE_TYPE_BIGINT: case CASS_VALUE_TYPE_COUNTER: case CASS_VALUE_TYPE_TIMESTAMP: { cass_int64_t intValue; @@ -270,7 +271,6 @@ TypeMapper::v8_from_cassandra(v8::Local* result, CassValueType type, case CASS_VALUE_TYPE_UNKNOWN: case CASS_VALUE_TYPE_CUSTOM: case CASS_VALUE_TYPE_ASCII: - case CASS_VALUE_TYPE_BIGINT: case CASS_VALUE_TYPE_DECIMAL: case CASS_VALUE_TYPE_FLOAT: case CASS_VALUE_TYPE_TEXT: From 4635fc23cdc48112b9967de74c17ae81aebcebad Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Mon, 4 May 2015 22:06:45 -0700 Subject: [PATCH 3/4] bulk-prepared: add first cut at a bulk prepared interface --- binding.gyp | 1 + lib/client.js | 6 +- src/bulk-prepared.cc | 236 +++++++++++++++++++++++++++++++++++++ src/bulk-prepared.h | 70 +++++++++++ src/cassandra-driver.cc | 2 + src/client.cc | 16 +++ src/client.h | 1 + src/query.cc | 4 +- test/bulk-prepared.spec.js | 138 ++++++++++++++++++++++ test/test-client.js | 69 +++++++++++ 10 files changed, 540 insertions(+), 3 deletions(-) create mode 100644 src/bulk-prepared.cc create mode 100644 src/bulk-prepared.h create mode 100644 test/bulk-prepared.spec.js diff --git a/binding.gyp b/binding.gyp index 0cad196..7bf69b5 100644 --- a/binding.gyp +++ b/binding.gyp @@ -65,6 +65,7 @@ "src/async-future.cc", "src/batch.cc", "src/buffer-pool.cc", + "src/bulk-prepared.cc", "src/cassandra-driver.cc", "src/client.cc", "src/logging.cc", diff --git a/lib/client.js b/lib/client.js index 25970cf..58a5075 100644 --- a/lib/client.js +++ b/lib/client.js @@ -50,7 +50,7 @@ function parseArgs() { Client.prototype._execute = function(query, params, options, callback, endCallback) { // Args should have been parsed previously if (!query || !params || !options || !callback || !endCallback) { - throw new Error("invalid arguments"); + throw new Error("execute: invalid arguments"); } var self = this; @@ -169,4 +169,8 @@ Client.prototype.new_batch = function(type) { return this.client.new_batch(type); }; +Client.prototype.new_bulk_prepared = function() { + return this.client.new_bulk_prepared(); +}; + module.exports = Client; diff --git a/src/bulk-prepared.cc b/src/bulk-prepared.cc new file mode 100644 index 0000000..68074ba --- /dev/null +++ b/src/bulk-prepared.cc @@ -0,0 +1,236 @@ +#include + +#include "bulk-prepared.h" +#include "client.h" +#include "persistent-string.h" +#include "prepared-query.h" +#include "query.h" +#include "type-mapper.h" + +#define dprintf(...) +//#define dprintf printf + +Persistent BulkPrepared::constructor; + +void BulkPrepared::Init() { + NanScope(); + + // Prepare constructor template + Local tpl = NanNew(New); + tpl->SetClassName(NanNew("BulkPrepared")); + tpl->InstanceTemplate()->SetInternalFieldCount(1); + + NODE_SET_PROTOTYPE_METHOD(tpl, "add", WRAPPED_METHOD_NAME(Add)); + NODE_SET_PROTOTYPE_METHOD(tpl, "done", WRAPPED_METHOD_NAME(Done)); + + NanAssignPersistent(constructor, tpl->GetFunction()); +} + +Local BulkPrepared::NewInstance() { + NanEscapableScope(); + + const unsigned argc = 0; + Local argv[argc] = {}; + Local cons = NanNew(constructor); + Local instance = cons->NewInstance(argc, argv); + + return NanEscapeScope(instance); +} + +NAN_METHOD(BulkPrepared::New) { + NanEscapableScope(); + + BulkPrepared* obj = new BulkPrepared(); + obj->Wrap(args.This()); + + NanReturnValue(args.This()); +} + +BulkPrepared::BulkPrepared() + : pending_(0), + success_(0), + callback_(NULL) +{ + + uv_mutex_init(&lock_); + + async_ = new uv_async_t(); + uv_async_init(uv_default_loop(), async_, on_async_ready); + async_->data = this; +} + +static void +async_destroy(uv_handle_t* handle) +{ + uv_async_t* async = (uv_async_t*)handle; + delete async; +} + +BulkPrepared::~BulkPrepared() +{ + uv_mutex_destroy(&lock_); + uv_close((uv_handle_t*)async_, async_destroy); +} + +void +BulkPrepared::set_client(const Local& client) +{ + static PersistentString client_str("client"); + handle_->Set(client_str, client); + + Client* c = node::ObjectWrap::Unwrap(client); + session_ = c->get_session(); +} + +WRAPPED_METHOD(BulkPrepared, Add) +{ + if (args.Length() < 2) { + return NanThrowError("add_prepared requires prepared and vals"); + } + + if (callback_) { + return NanThrowError("cannot add a query after done is called"); + } + + Local prepared_obj = args[0].As(); + if (! prepared_obj->IsObject() || prepared_obj->InternalFieldCount() == 0) { + return NanThrowError("add_prepared requires a valid prepared object"); + } + + PreparedQuery* prepared = node::ObjectWrap::Unwrap(prepared_obj->ToObject()); + Local params = args[1].As(); + Local hints; + + if (args.Length() > 2) { + static PersistentString hints_str("hints"); + Local options = args[2].As(); + if (options->Has(hints_str)) { + hints = options->Get(hints_str).As(); + } + } + + CassStatement* statement = prepared->prepare_statement(); + int bindingStatus = TypeMapper::bind_statement_params(statement, params, hints); + + if (bindingStatus != -1) { + char err[1024]; + sprintf(err, "error binding statement argument %d", bindingStatus); + return NanThrowError(err); + } + + uv_mutex_lock(&lock_); + pending_++; + uv_mutex_unlock(&lock_); + + CassFuture* future = cass_session_execute(session_, statement); + cass_future_set_callback(future, on_future_ready, this); + + NanReturnUndefined(); +} + +void +BulkPrepared::on_future_ready(CassFuture* future, void* data) +{ + BulkPrepared* self = (BulkPrepared*)data; + self->future_ready(future); +} + +void +BulkPrepared::future_ready(CassFuture* future) +{ + uv_mutex_lock(&lock_); + + CassError code = cass_future_error_code(future); + if (code != CASS_OK) { + CassString error = cass_future_error_message(future); + std::string error_str = std::string(error.data, error.length); + errors_.push_back(error_str); + } else { + success_++; + } + pending_--; + + // NOTE: check_if_done assumes the lock is held + check_if_done(); + + uv_mutex_unlock(&lock_); + + cass_future_free(future); +} + +WRAPPED_METHOD(BulkPrepared, Done) +{ + NanScope(); + + if (args.Length() != 1) { + return NanThrowError("done requires 1 arguments: callback"); + } + + if (session_ == NULL) { + return NanThrowError("client must be connected"); + } + + if (callback_) { + return NanThrowError("cannot call done a second time"); + } + + // Need a reference while waiting + Ref(); + + uv_mutex_lock(&lock_); + callback_ = new NanCallback(args[0].As()); + + // NOTE: check_if_done assumes the lock is held + check_if_done(); + + uv_mutex_unlock(&lock_); + + NanReturnUndefined(); +} + +void +BulkPrepared::check_if_done() +{ + // NOTE: lock_ is already held + if (callback_ == NULL || pending_ > 0) { + return; + } + + uv_async_send(async_); +} + +void +BulkPrepared::on_async_ready(uv_async_t* handle, int status) +{ + BulkPrepared* self = (BulkPrepared*)handle->data; + self->async_ready(); +} + +void +BulkPrepared::async_ready() +{ + NanScope(); + + static PersistentString success_str("success"); + static PersistentString errors_str("errors"); + + Local res = NanNew(); + res->Set(success_str, NanNew(success_)); + + if (errors_.size() != 0) { + Local errs = NanNew(); + for (size_t i = 0; i < errors_.size(); ++i) { + errs->Set(i, NanNew(errors_[i])); + } + res->Set(errors_str, errs); + } + + Handle argv[] = { + NanNull(), + res + }; + + callback_->Call(2, argv); + + Unref(); +} diff --git a/src/bulk-prepared.h b/src/bulk-prepared.h new file mode 100644 index 0000000..cd08fdd --- /dev/null +++ b/src/bulk-prepared.h @@ -0,0 +1,70 @@ +#ifndef __CASS_DRIVER_BULK_PREPARED_H__ +#define __CASS_DRIVER_BULK_PREPARED_H__ + +#include "node.h" +#include "nan.h" +#include "result.h" +#include "wrapped-method.h" +#include + +using namespace v8; + +class AsyncFuture; +class Client; + +// For cases where multiple rows are being inserted or updated as a group but +// without wrapping them in a batch operation, the caller can create a bulk +// query to wrap a bunch of operations without requiring a separate +// javascript-exposed query object for each operation. This is only suitable for +// INSERT / UPDATE queries since the results are not gathered individually but +// instead the whole operation can either succeed or fail. +class BulkPrepared: public node::ObjectWrap { +public: + // Initialize the class constructor. + static void Init(); + + // Create a new instance of the class. + static v8::Local NewInstance(); + + // Stash the reference to the parent client object and extract the pointer + // to the session. + void set_client(const v8::Local& client); + +private: + BulkPrepared(); + virtual ~BulkPrepared(); + + // The actual implementation of the constructor + static NAN_METHOD(New); + + // Start executing the given prepared query, adding it to the queue of + // pending queries. + WRAPPED_METHOD_DECL(Add); + + // Mark the end of this bulk operation. Waits for all outstanding queries + // and then issues the given callback. + WRAPPED_METHOD_DECL(Done); + + void check_if_done(); + + static void on_future_ready(CassFuture* future, void* data); + void future_ready(CassFuture* future); + + static void on_async_ready(uv_async_t* handle, int status); + void async_ready(); + + CassSession* session_; + + uv_mutex_t lock_; + + size_t pending_; // count of outstanding queries + size_t success_; // count of successful queries + std::vector errors_; // error strings from failed queries + + NanCallback* callback_; + uv_async_t* async_; + + static v8::Persistent constructor; +}; + +#endif diff --git a/src/cassandra-driver.cc b/src/cassandra-driver.cc index 40d0844..33d3596 100644 --- a/src/cassandra-driver.cc +++ b/src/cassandra-driver.cc @@ -4,6 +4,7 @@ #include #include "batch.h" +#include "bulk-prepared.h" #include "client.h" #include "logging.h" #include "prepared-query.h" @@ -20,6 +21,7 @@ void InitAll(Handle exports) { NanScope(); Batch::Init(); + BulkPrepared::Init(); Client::Init(); PreparedQuery::Init(); Query::Init(); diff --git a/src/client.cc b/src/client.cc index 6e3cac3..eb20f87 100644 --- a/src/client.cc +++ b/src/client.cc @@ -1,6 +1,7 @@ #include "client.h" #include "batch.h" +#include "bulk-prepared.h" #include "persistent-string.h" #include "prepared-query.h" #include "query.h" @@ -36,6 +37,7 @@ void Client::Init() { NODE_SET_PROTOTYPE_METHOD(tpl, "new_query", WRAPPED_METHOD_NAME(NewQuery)); NODE_SET_PROTOTYPE_METHOD(tpl, "new_prepared_query", WRAPPED_METHOD_NAME(NewPreparedQuery)); NODE_SET_PROTOTYPE_METHOD(tpl, "new_batch", WRAPPED_METHOD_NAME(NewBatch)); + NODE_SET_PROTOTYPE_METHOD(tpl, "new_bulk_prepared", WRAPPED_METHOD_NAME(NewBulkPrepared)); NanAssignPersistent(constructor, tpl->GetFunction()); } @@ -238,3 +240,17 @@ WRAPPED_METHOD(Client, NewBatch) { NanReturnValue(val); } + +WRAPPED_METHOD(Client, NewBulkPrepared) { + NanScope(); + + Local val = BulkPrepared::NewInstance(); + if (! val.IsEmpty()) { + BulkPrepared* bulk = node::ObjectWrap::Unwrap(val->ToObject()); + + Local self = Local::New(handle_); + bulk->set_client(self); + } + + NanReturnValue(val); +} diff --git a/src/client.h b/src/client.h index 65bee50..59815b5 100644 --- a/src/client.h +++ b/src/client.h @@ -33,6 +33,7 @@ class Client : public node::ObjectWrap { WRAPPED_METHOD_DECL(NewQuery); WRAPPED_METHOD_DECL(NewPreparedQuery); WRAPPED_METHOD_DECL(NewBatch); + WRAPPED_METHOD_DECL(NewBulkPrepared); void configure(v8::Local opts); diff --git a/src/query.cc b/src/query.cc index f7ac53f..1ea9f18 100644 --- a/src/query.cc +++ b/src/query.cc @@ -93,7 +93,7 @@ WRAPPED_METHOD(Query, Parse) NanScope(); if (args.Length() < 1) { - return NanThrowError("invalid arguments"); + return NanThrowError("parse: invalid arguments"); } Local query = args[0].As(); @@ -123,7 +123,7 @@ WRAPPED_METHOD(Query, Parse) WRAPPED_METHOD(Query, Bind) { if (args.Length() != 2) { - return NanThrowError("invalid arguments"); + return NanThrowError("bind: invalid arguments"); } if (statement_ == NULL || prepared_ == false) { diff --git a/test/bulk-prepared.spec.js b/test/bulk-prepared.spec.js new file mode 100644 index 0000000..71cea96 --- /dev/null +++ b/test/bulk-prepared.spec.js @@ -0,0 +1,138 @@ +var TestClient = require('./test-client'); +var Promise = require('bluebird'); +var expect = require('chai').expect; +var ks = 'bulk_test'; +var table = 'test'; +var table2 = 'test2'; +var _ = require('underscore'); +var util = require('util'); +var test_utils = require('./test-utils'); + +var fields = { + 'row': 'varchar', + 'col': 'int', + 'val': 'int' +}; + +var key = 'row, col'; +var data = test_utils.generate(100); +var client; + +describe('bulk queries', function() { + before(function() { + client = new TestClient(); + return test_utils.setup_environment(client) + .then(function() { + return client.createTable(table, fields, key); + }) + .then(function() { + return client.createTable(table2, fields, key); + }); + }); + + it('inserts data using prepared queries', function() { + var rows = _.times(10, function (i) { return 'row-' + i; }); + + var cql = util.format('INSERT INTO %s (row, col, val) VALUES (?, ?, ?)', table); + return client.prepare(cql) + .then(function(prepared) { + var bulk = client.new_bulk_prepared(); + _.each(rows, function(row) { + bulk.add(prepared, [row, 10, 1000000]); + }); + + Promise.promisifyAll(bulk); + return bulk.doneAsync(); + }) + .then(function(results) { + expect(Object.keys(results)).deep.equal(['success']); + expect(results.success).equal(10); + return client.execute('SELECT * FROM ' + table, [], {}); + }) + .then(function(results) { + expect(results.rows.length).equal(10); + }); + }); + + it('supports different prepared queries in the same bulk operation', function() { + var rows = _.times(10, function (i) { return 'row-' + i; }); + + var cqls = [ + util.format('INSERT INTO %s (row, col, val) VALUES (?, ?, ?)', table), + util.format('INSERT INTO %s (row, col, val) VALUES (?, ?, ?)', table2) + ]; + + return Promise.map(cqls, function(cql) { + return client.prepare(cql); + }) + .then(function(prepareds) { + var bulk = client.new_bulk_prepared(); + _.each(rows, function(row, i) { + bulk.add(prepareds[i % 2], [row, 10, 1000000]); + }); + + Promise.promisifyAll(bulk); + return bulk.doneAsync(); + }) + .then(function(results) { + expect(Object.keys(results)).deep.equal(['success']); + expect(results.success).equal(10); + return Promise.map([table, table2], function(table) { + return client.execute('SELECT * FROM ' + table, [], {}); + }) + }) + .then(function(results) { + expect(results.length).equal(2); + expect(results[0].rows.length).equal(10); + expect(results[1].rows.length).equal(5); + }); + }); + + it('inserts data using a bulk with a bad prepared query', function() { + var rows = _.times(10, function (i) { return 'row-' + i; }); + + var cql = util.format('INSERT INTO %s (row, col, val) VALUES (?, ?, ?)', table); + return client.prepare(cql) + .then(function(prepared) { + var bulk = client.new_bulk_prepared(); + bulk.add(prepared, ["foo", "bar", "baz"]); + Promise.promisifyAll(bulk); + return bulk.doneAsync(); + }) + .then(function(results) { + expect(results.success).equal(0); + expect(results.errors[0]).match(/Expected 4 or 0 byte int/); + }); + }); + + it('errors properly if a bogus prepared is added to a bulk', function() { + var bogus = [null, undefined, {}, 1, "foo"]; + return Promise.map(bogus, function(bad) { + return Promise.try(function() { + var bulk = client.new_bulk_prepared(); + bulk.add(bad, []); + }) + .then(function() { + throw new Error('unexpected success'); + }) + .catch(function(err) { + expect(err).match(/requires a valid prepared object/); + }); + }); + }); + + it('handles errors in a prepared query with a bogus table', function() { + var cql = util.format('SELECT * FROM %s where ROW = ? and col < ?', 'bogus'); + return client.prepare(cql) + .then(function() { + throw new Error('unexpected success'); + }) + .catch(function(err) { + expect(err).match(/unconfigured columnfamily/); + }); + }); + + after(function() { + return client.cleanup(); + }); +}); diff --git a/test/test-client.js b/test/test-client.js index 26ff433..a8b94cb 100644 --- a/test/test-client.js +++ b/test/test-client.js @@ -33,6 +33,10 @@ var TestClient = Base.extend({ return this.client.new_batch(style); }, + new_bulk_prepared: function() { + return this.client.new_bulk_prepared(); + }, + createKeyspace: function(name, replication) { replication = replication || 1; this.keyspaces.push(name); @@ -200,6 +204,71 @@ var TestClient = Base.extend({ return prepare() .then(function() { return Promise.map(batches, insert_batch, {concurrency: options.concurrency}); + }) + .then(function(results) { + return results; + }); + }, + + insertRowsBulkPrepared: function(table, data, options) { + var self = this; + var prepared; + + var count = data.length; + var keys = _.keys(data[0]); + var bulk_size = options.bulk_size || 1; + + var query = this._getInsertQuery(table, data, options); + function prepare() { + return self.client.prepareAsync(query.cql) + .then(function(p) { + prepared = p; + }); + } + + function _insert_bulk(x, bulk_i, n, cb) { + var bulk = self.client.new_bulk_prepared(); + + _.times(bulk_size, function(i) { + var d = data[(bulk_i * bulk_size) + i]; + + // The last bulk might not be full + if (d === undefined) { + return; + } + + var vals = _.map(keys, function(k) { return d[k]; }); + if (options.timestamp && options.ttl) { + vals.push(options.timestamp); + vals.push(options.ttl); + } + bulk.add(prepared, vals, {hints: query.hints}); + }); + bulk.done(cb); + } + + // Create an array with a dummy entry for each bulk just to be able to + // use Promise.map + var bulks = _.times(Math.ceil(count / bulk_size), _.noop); + + var insert_bulk = Promise.promisify(_insert_bulk); + return prepare() + .then(function() { + return Promise.map(bulks, insert_bulk, {concurrency: options.concurrency}); + }) + .then(function(results) { + _.each(results, function(result) { + var errs = result.errors || []; + var expected = Math.min(bulk_size, count); + if (expected != result.success + errs.length) { + throw new Error('result counting error: ', expected, result.success, errs.length); + } + + if (result.success != expected) { + var err = "error: inserted " + result.success + " out of " + expected; + console.error(err); + } + }); }); } }); From 9f07fc75b92c9608a5d22014310c8f23d7e7aa8e Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Mon, 4 May 2015 22:07:13 -0700 Subject: [PATCH 4/4] insert-performance: add bulk-prepared support --- scripts/insert-performance.js | 47 +++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/scripts/insert-performance.js b/scripts/insert-performance.js index bd3b14b..88519ef 100644 --- a/scripts/insert-performance.js +++ b/scripts/insert-performance.js @@ -15,7 +15,7 @@ var mode = process.argv[4]; var check_results = false; -var prepared, batch; +var prepared, batch, bulk; if (mode === "prepared") { prepared = true; batch = false; @@ -26,11 +26,20 @@ else if (mode === "batch") { if (! batch) { throw new Error('invalid batch size'); } } +else if (mode === "bulk") { + prepared = true; + bulk = parseInt(process.argv[5]); + + if (! bulk) { throw new Error('invalid bulk size'); } +} else { prepared = false; batch = false; + bulk = false; } +console.log('prepared:', prepared, 'batch:', batch, 'bulk:', bulk); + var fields = { "row": "varchar", "col": "int", @@ -57,7 +66,13 @@ function generate() { var data = _.times(count, generate); var start, end; -var client = new TestClient(); +var client = new TestClient({ + queue_size_io: 100000, + pending_requests_high_water_mark: 10000, + pending_requests_low_water_mark: 10000, + max_requests_per_flush: 1024 +}); + client.connect({address: '127.0.0.1'}) .then(function() { return client.cleanKeyspace(ks); @@ -76,6 +91,8 @@ client.connect({address: '127.0.0.1'}) start = new Date(); if (batch) { return client.insertRowsPreparedBatch(table, data, {concurrency: concurrency, batch_size: batch}); + } else if (bulk) { + return client.insertRowsBulkPrepared(table, data, {concurrency: concurrency, bulk_size: bulk}); } else if (prepared) { return client.insertRowsPrepared(table, data, {concurrency: concurrency}); } else { @@ -88,6 +105,32 @@ client.connect({address: '127.0.0.1'}) var N = count; console.log(util.format('inserted %d rows in %d ms (%d us / pt, %d points per second)', N, elapsed, 1000 * elapsed / N, Math.floor(1000 * N / elapsed))); + +}) +.then(function() { + start = new Date(); + + var N = 0; + + return client.client.queryAsync('select count(*) from ' + table, [], {autoPage: true}, + function(results) { + N = results.rows[0].count + } + ) + .then(function() { + end = new Date(); + var elapsed = end - start; + console.log(util.format('counted %d rows in %d ms (%d us / pt, %d points per second)', + N, elapsed, 1000 * elapsed / N, Math.floor(1000 * N / elapsed))); + + expect(N).equal(count); + if (check_results) { + cols = _.sortBy(cols, function(c) { return parseInt(c); }); + for (i = 0; i < cols.length; ++i) { + expect(cols[i]).equal(i); + } + } + }); }) .then(function() { start = new Date();