Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"src/async-future.cc",
"src/batch.cc",
"src/buffer-pool.cc",
"src/bulk-prepared.cc",
"src/cassandra-driver.cc",
"src/client.cc",
"src/logging.cc",
Expand Down
6 changes: 5 additions & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ function parseArgs() {
Client.prototype._execute = function(query, params, options, callback, endCallback) {
// Args should have been parsed previously
if (!query || !params || !options || !callback || !endCallback) {
throw new Error("invalid arguments");
throw new Error("execute: invalid arguments");
}

var self = this;
Expand Down Expand Up @@ -169,4 +169,8 @@ Client.prototype.new_batch = function(type) {
return this.client.new_batch(type);
};

Client.prototype.new_bulk_prepared = function() {
return this.client.new_bulk_prepared();
};

module.exports = Client;
51 changes: 47 additions & 4 deletions scripts/insert-performance.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
var Promise = require('bluebird');
var TestClient = require('./test-client');
var TestClient = require('../test/test-client');
var expect = require('chai').expect;
var ks = 'prepared_test';
var ks = 'insert_performance';
var table = 'test';
var _ = require('underscore');
var util = require('util');
Expand All @@ -15,7 +15,7 @@ var mode = process.argv[4];

var check_results = false;

var prepared, batch;
var prepared, batch, bulk;
if (mode === "prepared") {
prepared = true;
batch = false;
Expand All @@ -26,11 +26,20 @@ else if (mode === "batch") {

if (! batch) { throw new Error('invalid batch size'); }
}
else if (mode === "bulk") {
prepared = true;
bulk = parseInt(process.argv[5]);

if (! bulk) { throw new Error('invalid bulk size'); }
}
else {
prepared = false;
batch = false;
bulk = false;
}

console.log('prepared:', prepared, 'batch:', batch, 'bulk:', bulk);

var fields = {
"row": "varchar",
"col": "int",
Expand All @@ -57,7 +66,13 @@ function generate() {
var data = _.times(count, generate);

var start, end;
var client = new TestClient();
var client = new TestClient({
queue_size_io: 100000,
pending_requests_high_water_mark: 10000,
pending_requests_low_water_mark: 10000,
max_requests_per_flush: 1024
});

client.connect({address: '127.0.0.1'})
.then(function() {
return client.cleanKeyspace(ks);
Expand All @@ -76,6 +91,8 @@ client.connect({address: '127.0.0.1'})
start = new Date();
if (batch) {
return client.insertRowsPreparedBatch(table, data, {concurrency: concurrency, batch_size: batch});
} else if (bulk) {
return client.insertRowsBulkPrepared(table, data, {concurrency: concurrency, bulk_size: bulk});
} else if (prepared) {
return client.insertRowsPrepared(table, data, {concurrency: concurrency});
} else {
Expand All @@ -88,6 +105,32 @@ client.connect({address: '127.0.0.1'})
var N = count;
console.log(util.format('inserted %d rows in %d ms (%d us / pt, %d points per second)',
N, elapsed, 1000 * elapsed / N, Math.floor(1000 * N / elapsed)));

})
.then(function() {
start = new Date();

var N = 0;

return client.client.queryAsync('select count(*) from ' + table, [], {autoPage: true},
function(results) {
N = results.rows[0].count
}
)
.then(function() {
end = new Date();
var elapsed = end - start;
console.log(util.format('counted %d rows in %d ms (%d us / pt, %d points per second)',
N, elapsed, 1000 * elapsed / N, Math.floor(1000 * N / elapsed)));

expect(N).equal(count);
if (check_results) {
cols = _.sortBy(cols, function(c) { return parseInt(c); });
for (i = 0; i < cols.length; ++i) {
expect(cols[i]).equal(i);
}
}
});
})
.then(function() {
start = new Date();
Expand Down
236 changes: 236 additions & 0 deletions src/bulk-prepared.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
#include <cassandra.h>

#include "bulk-prepared.h"
#include "client.h"
#include "persistent-string.h"
#include "prepared-query.h"
#include "query.h"
#include "type-mapper.h"

#define dprintf(...)
//#define dprintf printf

Persistent<Function> BulkPrepared::constructor;

void BulkPrepared::Init() {
NanScope();

// Prepare constructor template
Local<FunctionTemplate> tpl = NanNew<FunctionTemplate>(New);
tpl->SetClassName(NanNew("BulkPrepared"));
tpl->InstanceTemplate()->SetInternalFieldCount(1);

NODE_SET_PROTOTYPE_METHOD(tpl, "add", WRAPPED_METHOD_NAME(Add));
NODE_SET_PROTOTYPE_METHOD(tpl, "done", WRAPPED_METHOD_NAME(Done));

NanAssignPersistent(constructor, tpl->GetFunction());
}

Local<Object> BulkPrepared::NewInstance() {
NanEscapableScope();

const unsigned argc = 0;
Local<Value> argv[argc] = {};
Local<Function> cons = NanNew<Function>(constructor);
Local<Object> instance = cons->NewInstance(argc, argv);

return NanEscapeScope(instance);
}

NAN_METHOD(BulkPrepared::New) {
NanEscapableScope();

BulkPrepared* obj = new BulkPrepared();
obj->Wrap(args.This());

NanReturnValue(args.This());
}

BulkPrepared::BulkPrepared()
: pending_(0),
success_(0),
callback_(NULL)
{

uv_mutex_init(&lock_);

async_ = new uv_async_t();
uv_async_init(uv_default_loop(), async_, on_async_ready);
async_->data = this;
}

static void
async_destroy(uv_handle_t* handle)
{
uv_async_t* async = (uv_async_t*)handle;
delete async;
}

BulkPrepared::~BulkPrepared()
{
uv_mutex_destroy(&lock_);
uv_close((uv_handle_t*)async_, async_destroy);
}

void
BulkPrepared::set_client(const Local<Object>& client)
{
static PersistentString client_str("client");
handle_->Set(client_str, client);

Client* c = node::ObjectWrap::Unwrap<Client>(client);
session_ = c->get_session();
}

WRAPPED_METHOD(BulkPrepared, Add)
{
if (args.Length() < 2) {
return NanThrowError("add_prepared requires prepared and vals");
}

if (callback_) {
return NanThrowError("cannot add a query after done is called");
}

Local<Object> prepared_obj = args[0].As<Object>();
if (! prepared_obj->IsObject() || prepared_obj->InternalFieldCount() == 0) {
return NanThrowError("add_prepared requires a valid prepared object");
}

PreparedQuery* prepared = node::ObjectWrap::Unwrap<PreparedQuery>(prepared_obj->ToObject());
Local<Array> params = args[1].As<Array>();
Local<Array> hints;

if (args.Length() > 2) {
static PersistentString hints_str("hints");
Local<Object> options = args[2].As<Object>();
if (options->Has(hints_str)) {
hints = options->Get(hints_str).As<Array>();
}
}

CassStatement* statement = prepared->prepare_statement();
int bindingStatus = TypeMapper::bind_statement_params(statement, params, hints);

if (bindingStatus != -1) {
char err[1024];
sprintf(err, "error binding statement argument %d", bindingStatus);
return NanThrowError(err);
}

uv_mutex_lock(&lock_);
pending_++;
uv_mutex_unlock(&lock_);

CassFuture* future = cass_session_execute(session_, statement);
cass_future_set_callback(future, on_future_ready, this);

NanReturnUndefined();
}

void
BulkPrepared::on_future_ready(CassFuture* future, void* data)
{
BulkPrepared* self = (BulkPrepared*)data;
self->future_ready(future);
}

void
BulkPrepared::future_ready(CassFuture* future)
{
uv_mutex_lock(&lock_);

CassError code = cass_future_error_code(future);
if (code != CASS_OK) {
CassString error = cass_future_error_message(future);
std::string error_str = std::string(error.data, error.length);
errors_.push_back(error_str);
} else {
success_++;
}
pending_--;

// NOTE: check_if_done assumes the lock is held
check_if_done();

uv_mutex_unlock(&lock_);

cass_future_free(future);
}

WRAPPED_METHOD(BulkPrepared, Done)
{
NanScope();

if (args.Length() != 1) {
return NanThrowError("done requires 1 arguments: callback");
}

if (session_ == NULL) {
return NanThrowError("client must be connected");
}

if (callback_) {
return NanThrowError("cannot call done a second time");
}

// Need a reference while waiting
Ref();

uv_mutex_lock(&lock_);
callback_ = new NanCallback(args[0].As<Function>());

// NOTE: check_if_done assumes the lock is held
check_if_done();

uv_mutex_unlock(&lock_);

NanReturnUndefined();
}

void
BulkPrepared::check_if_done()
{
// NOTE: lock_ is already held
if (callback_ == NULL || pending_ > 0) {
return;
}

uv_async_send(async_);
}

void
BulkPrepared::on_async_ready(uv_async_t* handle, int status)
{
BulkPrepared* self = (BulkPrepared*)handle->data;
self->async_ready();
}

void
BulkPrepared::async_ready()
{
NanScope();

static PersistentString success_str("success");
static PersistentString errors_str("errors");

Local<Object> res = NanNew<Object>();
res->Set(success_str, NanNew<Number>(success_));

if (errors_.size() != 0) {
Local<Array> errs = NanNew<Array>();
for (size_t i = 0; i < errors_.size(); ++i) {
errs->Set(i, NanNew<String>(errors_[i]));
}
res->Set(errors_str, errs);
}

Handle<Value> argv[] = {
NanNull(),
res
};

callback_->Call(2, argv);

Unref();
}
Loading