diff --git a/module.jai b/module.jai index 5b31fa0..1d1a3a8 100644 --- a/module.jai +++ b/module.jai @@ -12,9 +12,9 @@ connect :: (conn_str: string) -> *PGconn, success: bool { result := PQconnectPoll(conn); while result != PostgresPollingStatusType.PGRES_POLLING_OK && result != PostgresPollingStatusType.PGRES_POLLING_FAILED { newResult := PQconnectPoll(conn); - // if (result != newResult) { - // print("Connecting… %\n", result); - // } + if (result != newResult) { + // print("Connecting… %\n", result); + } result = newResult; } @@ -31,6 +31,276 @@ disconnect :: (conn: *PGconn) { } } +binary_copy_start :: (conn: *PGconn, schema: string, table: string) -> bool { + command := sprint("COPY %.% FROM STDIN (FORMAT binary)", schema, table); + c_command := to_c_string(command); + defer { + free(command); + free(c_command); + } + + res := PQexec(conn, c_command); + + if (PQresultStatus(res) != .PGRES_COPY_IN) { + PQclear(res); + log_error("Failed to open copy stream.\n"); + return false; + } +} + +binary_copy_put :: (conn: *PGconn, data: string) -> bool { + if PQputCopyData(conn, data.data, cast(s32) data.count) != 1 { + log_error("Failed to put data (% bytes)\n", data.count); + return false; + } + + return true; +} + +binary_copy_end :: (conn: *PGconn) -> bool { + res := PQputCopyEnd(conn, null); + print("Ending binary copy : %\n...", res); + + while PQflush(conn) != 0 { + sleep_milliseconds(1000); + print("Sleeping...\n"); + } + print("Flushed!\n"); + + result := get_last_query_result(conn); + PQclear(result); + + return true; +} + +prepare_query :: (conn: *PGconn, name: string, command: string, args: [..] Type) -> PreparedStatement, bool { + statement : PreparedStatement; + set_allocators(*statement.pool); + push_allocator(pool_allocator_proc, *statement.pool); + + c_command := to_c_string(command); + + statement.name = to_c_string(name); + array_resize(*statement.param_types, args.count, initialize = false); + array_resize(*statement.param_values, args.count, initialize = false); + array_resize(*statement.param_lengths, args.count, initialize = false); + array_resize(*statement.param_formats, args.count, initialize = false); + + for arg, index: args { + type := cast(*Type_Info) arg; + + if type.type == { + case .INTEGER; + info := cast(*Type_Info_Integer) type; + // @ToDo: implement unsigned ints + // assert(info.signed, "Unsigned not yet supported"); + be_value := cast(*u8) alloc(info.runtime_size); + if info.runtime_size == { + case 2; + statement.param_types[index] = cast(Oid) Pq_Type.INT2; + case 4; + statement.param_types[index] = cast(Oid) Pq_Type.INT4; + case 8; + statement.param_types[index] = cast(Oid) Pq_Type.INT8; + case; + assert(false); + } + statement.param_values[index] = be_value; + statement.param_lengths[index] = cast(s32) info.runtime_size; + statement.param_formats[index] = 1; + case .FLOAT; + be_value := cast(*u8) alloc(type.runtime_size); + if type.runtime_size == 4 { + statement.param_types[index] = cast(Oid) Pq_Type.FLOAT4; + } + if type.runtime_size == 8 { + statement.param_types[index] = cast(Oid) Pq_Type.FLOAT8; + } + statement.param_values[index] = be_value; + statement.param_lengths[index] = cast(s32) type.runtime_size; + statement.param_formats[index] = 1; + + case .STRING; + statement.param_types[index] = cast(Oid) Pq_Type.VARCHAR; + statement.param_formats[index] = 1; + + case; + // @ToDo: Implement + log_error("Unsupported param type: %", arg); + return statement, false; + } + } + + result := PQprepare(conn, statement.name, c_command, xx statement.param_types.count, statement.param_types.data); + has_results, success := check_query_result(result); + + if success { + statement.pg_result = result; + } + return statement, success; +} + +prepare_query :: (conn: *PGconn, name: string, command: string, args: .. Type) -> PreparedStatement, bool { + statement : PreparedStatement; + set_allocators(*statement.pool); + push_allocator(pool_allocator_proc, *statement.pool); + + c_command := to_c_string(command); + + statement.name = to_c_string(name); + array_resize(*statement.param_types, args.count, initialize = false); + array_resize(*statement.param_values, args.count, initialize = false); + array_resize(*statement.param_lengths, args.count, initialize = false); + array_resize(*statement.param_formats, args.count, initialize = false); + + for arg, index: args { + type := cast(*Type_Info) arg; + + if type.type == { + case .INTEGER; + info := cast(*Type_Info_Integer) type; + // @ToDo: implement unsigned ints + //assert(info.signed, "Unsigned not yet supported"); + be_value := cast(*u8) alloc(info.runtime_size); + if info.runtime_size == { + case 2; + statement.param_types[index] = cast(Oid) Pq_Type.INT2; + case 4; + statement.param_types[index] = cast(Oid) Pq_Type.INT4; + case 8; + statement.param_types[index] = cast(Oid) Pq_Type.INT8; + case; + assert(false); + } + statement.param_values[index] = be_value; + statement.param_lengths[index] = cast(s32) info.runtime_size; + statement.param_formats[index] = 1; + case .FLOAT; + be_value := cast(*u8) alloc(type.runtime_size); + if type.runtime_size == 4 { + statement.param_types[index] = cast(Oid) Pq_Type.FLOAT4; + } + if type.runtime_size == 8 { + statement.param_types[index] = cast(Oid) Pq_Type.FLOAT8; + } + statement.param_values[index] = be_value; + statement.param_lengths[index] = cast(s32) type.runtime_size; + statement.param_formats[index] = 1; + + case .STRING; + statement.param_types[index] = cast(Oid) Pq_Type.VARCHAR; + statement.param_formats[index] = 1; + + case; + // @ToDo: Implement + log_error("Unsupported param type: %", arg); + return statement, false; + } + } + + result := PQprepare(conn, statement.name, c_command, xx statement.param_types.count, statement.param_types.data); + has_results, success := check_query_result(result); + + if success { + statement.pg_result = result; + } + return statement, success; +} + +set_prepared_arg :: (statement: PreparedStatement, arg: Any, index: u32) { + if arg.type.type == { + case .INTEGER; + info := cast(*Type_Info_Integer) arg.type; + // @ToDo: implement unsigned ints + //assert(info.signed, "Unsigned not yet supported"); + if info.runtime_size == { + case 1; + // Postgres doesn't have 8 bit integers, but sometimes people use char for this. + be_val := hton(<<(cast(*s8) arg.value_pointer)); + memcpy(statement.param_values[index], *be_val, info.runtime_size); + case 2; + be_val := hton(<<(cast(*s16) arg.value_pointer)); + memcpy(statement.param_values[index], *be_val, info.runtime_size); + case 4; + be_val := hton(<<(cast(*s32) arg.value_pointer)); + memcpy(statement.param_values[index], *be_val, info.runtime_size); + case 8; + be_val := hton(<<(cast(*s64) arg.value_pointer)); + memcpy(statement.param_values[index], *be_val, info.runtime_size); + case; + assert(false); + } + + case .FLOAT;; + if arg.type.runtime_size == 4 { + be_val := hton(<<(cast(*float) arg.value_pointer)); + memcpy(statement.param_values[index], *be_val, arg.type.runtime_size); + } + if arg.type.runtime_size == 8 { + be_val := hton(<<(cast(*float64) arg.value_pointer)); + memcpy(statement.param_values[index], *be_val, arg.type.runtime_size); + } + + case .STRING; + str := cast(*string) arg.value_pointer; + statement.param_values[index] = str.data; + statement.param_lengths[index] = cast(s32) str.count; + + + // case .ARRAY; + // @ToDo: Implement binary format according to array_recv format. + + // ToDo: Struct as Jsonb? + + case; + // @ToDo: Implement + log_error("Unsupported param type: %", arg.type.type); + return; + } +} + + +execute_prepared :: (conn: *PGconn, statement: PreparedStatement) { + if statement.pg_result == null { + return; + } + PQexecPrepared(conn, statement.name, xx statement.param_values.count, statement.param_values.data, statement.param_lengths.data, statement.param_formats.data, 1); +} + +execute_prepared_with_args :: (conn: *PGconn, statement: PreparedStatement, args: .. Any) { + if statement.pg_result == null { + return; + } + + for arg, index: args { + set_prepared_arg(statement, arg, index); + } + + PQexecPrepared(conn, statement.name, xx args.count, statement.param_values.data, statement.param_lengths.data, statement.param_formats.data, 1); +} + +PreparedStatement :: struct { + pool : Pool; + + // All memory allocated for these comes from the above pool, + name : *u8; + param_types : [..] Oid; + param_values : [..] *u8; + param_lengths : [..] s32; + param_formats : [..] s32; + + pg_result : *PGresult; +} + +free_statement :: (statement: PreparedStatement, conn: *PGconn) { + push_allocator(pool_allocator_proc, *statement.pool); + + dealloc := sprint("DEALLOCATE %", to_string(statement.name)); + result, success := execute(conn, dealloc); + + release(*statement.pool); +} + // Execute a statement and parse the result execute :: (conn: *PGconn, $T: Type, command: string, args: .. Any, $ignore_unknown := false) -> [] T, success: bool { success := send_query(conn, command, ..args); @@ -82,7 +352,7 @@ send_query :: (conn: *PGconn, command: string, args: .. Any) -> success: bool { case .INTEGER; info := cast(*Type_Info_Integer) arg.type; // @ToDo: implement unsigned ints - assert(info.signed, "Unsigned not yet supported"); + //assert(info.signed, "Unsigned not yet supported"); be_value := cast(*u8) alloc(info.runtime_size); if info.runtime_size == { case 2; @@ -116,21 +386,25 @@ send_query :: (conn: *PGconn, command: string, args: .. Any) -> success: bool { be_val := hton(<<(cast(*float64) arg.value_pointer)); memcpy(be_value, *be_val, arg.type.runtime_size); } - param_values[index] = be_value; + param_values[index] = be_value; param_lengths[index] = cast(s32) arg.type.runtime_size; param_formats[index] = 1; case .STRING; str := cast(*string) arg.value_pointer; - param_types[index] = cast(Oid) Pq_Type.TEXT; - param_values[index] = ifx str.data else ""; // This handles empty strings. Sending no data means null string + param_types[index] = cast(Oid) Pq_Type.TEXT; + param_values[index] = ifx str.data else ""; // This handles empty strings. Sending no data means null string; param_lengths[index] = cast(s32) str.count; param_formats[index] = 1; case .BOOL; - val := cast(*bool) arg.value_pointer; - param_types[index] = cast(Oid) Pq_Type.BOOL; - param_values[index] = ifx val.* then *BOOL_TRUE_VALUE else *BOOL_FALSE_VALUE; + bool_as_num := ifx arg.value_pointer.(*bool).* then cast(u8) 1 else cast(u8) 0; + be_value := alloc(1).(*u8); + network_val := hton(bool_as_num); + memcpy(be_value, *network_val, 1); + + param_types[index] = Pq_Type.BOOL.(Oid); + param_values[index] = be_value; param_lengths[index] = 1; param_formats[index] = 1; @@ -360,6 +634,7 @@ assign_member :: (name: string, info: *Type_Info, slot: *u8, col_type: Pq_Type, case .BPCHAR; #through; case .VARCHAR; #through; case .NAME; #through; + case .JSON; #through; case .TEXT; val: string; val.data = data; @@ -367,6 +642,16 @@ assign_member :: (name: string, info: *Type_Info, slot: *u8, col_type: Pq_Type, write_string_value(col, name, info, slot, col_type, val, is_custom = false); return true; + // JSONB is the same response format as JSON, except the first byte is a version number. + case .JSONB; + val: string; + // version := << cast(*u8) data; + // Here we could optionally do different things depending on the version. + val.data = data + 1; + val.count = len - 1; + write_string_value(col, name, info, slot, col_type, val, is_custom = false); + return true; + case .DATE; assert(len == 4); days_since_2000_01_01 := ntoh(<< cast(*s32) data); @@ -495,7 +780,7 @@ write_string_value :: (col: int, name: string, info: *Type_Info, slot: *void, co } write_integer :: (col: int, name: string, info: *Type_Info, pointer: *void, col_type: Pq_Type, value: s64) -> bool { - if info.type != Type_Info_Tag.INTEGER { + if info.type != Type_Info_Tag.INTEGER && info.type != Type_Info_Tag.ENUM { log_error("Error: Trying to write column % of type % as an integer into member field \"%\" of type %", col, col_type, name, info.type); return false; }