-
Notifications
You must be signed in to change notification settings - Fork 7
Add binary copy, prepared statements, and handling for JSON, JSONB an… #13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,9 +12,9 @@ connect :: (conn_str: string) -> *PGconn, success: bool { | |
| result := PQconnectPoll(conn); | ||
| while result != PostgresPollingStatusType.PGRES_POLLING_OK && result != PostgresPollingStatusType.PGRES_POLLING_FAILED { | ||
| newResult := PQconnectPoll(conn); | ||
| // if (result != newResult) { | ||
| // print("Connecting… %\n", result); | ||
| // } | ||
| if (result != newResult) { | ||
| // print("Connecting… %\n", result); | ||
| } | ||
| result = newResult; | ||
| } | ||
|
|
||
|
|
@@ -31,6 +31,276 @@ disconnect :: (conn: *PGconn) { | |
| } | ||
| } | ||
|
|
||
| binary_copy_start :: (conn: *PGconn, schema: string, table: string) -> bool { | ||
| command := sprint("COPY %.% FROM STDIN (FORMAT binary)", schema, table); | ||
| c_command := to_c_string(command); | ||
| defer { | ||
| free(command); | ||
| free(c_command); | ||
| } | ||
|
|
||
| res := PQexec(conn, c_command); | ||
|
|
||
| if (PQresultStatus(res) != .PGRES_COPY_IN) { | ||
| PQclear(res); | ||
| log_error("Failed to open copy stream.\n"); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| binary_copy_put :: (conn: *PGconn, data: string) -> bool { | ||
| if PQputCopyData(conn, data.data, cast(s32) data.count) != 1 { | ||
| log_error("Failed to put data (% bytes)\n", data.count); | ||
| return false; | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| binary_copy_end :: (conn: *PGconn) -> bool { | ||
| res := PQputCopyEnd(conn, null); | ||
| print("Ending binary copy : %\n...", res); | ||
|
|
||
| while PQflush(conn) != 0 { | ||
| sleep_milliseconds(1000); | ||
| print("Sleeping...\n"); | ||
| } | ||
| print("Flushed!\n"); | ||
|
|
||
| result := get_last_query_result(conn); | ||
| PQclear(result); | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| prepare_query :: (conn: *PGconn, name: string, command: string, args: [..] Type) -> PreparedStatement, bool { | ||
| statement : PreparedStatement; | ||
| set_allocators(*statement.pool); | ||
| push_allocator(pool_allocator_proc, *statement.pool); | ||
|
|
||
| c_command := to_c_string(command); | ||
|
|
||
| statement.name = to_c_string(name); | ||
| array_resize(*statement.param_types, args.count, initialize = false); | ||
| array_resize(*statement.param_values, args.count, initialize = false); | ||
| array_resize(*statement.param_lengths, args.count, initialize = false); | ||
| array_resize(*statement.param_formats, args.count, initialize = false); | ||
|
|
||
| for arg, index: args { | ||
| type := cast(*Type_Info) arg; | ||
|
|
||
| if type.type == { | ||
| case .INTEGER; | ||
| info := cast(*Type_Info_Integer) type; | ||
| // @ToDo: implement unsigned ints | ||
| // assert(info.signed, "Unsigned not yet supported"); | ||
| be_value := cast(*u8) alloc(info.runtime_size); | ||
| if info.runtime_size == { | ||
| case 2; | ||
| statement.param_types[index] = cast(Oid) Pq_Type.INT2; | ||
| case 4; | ||
| statement.param_types[index] = cast(Oid) Pq_Type.INT4; | ||
| case 8; | ||
| statement.param_types[index] = cast(Oid) Pq_Type.INT8; | ||
| case; | ||
| assert(false); | ||
| } | ||
| statement.param_values[index] = be_value; | ||
| statement.param_lengths[index] = cast(s32) info.runtime_size; | ||
| statement.param_formats[index] = 1; | ||
| case .FLOAT; | ||
| be_value := cast(*u8) alloc(type.runtime_size); | ||
| if type.runtime_size == 4 { | ||
| statement.param_types[index] = cast(Oid) Pq_Type.FLOAT4; | ||
| } | ||
| if type.runtime_size == 8 { | ||
| statement.param_types[index] = cast(Oid) Pq_Type.FLOAT8; | ||
| } | ||
| statement.param_values[index] = be_value; | ||
| statement.param_lengths[index] = cast(s32) type.runtime_size; | ||
| statement.param_formats[index] = 1; | ||
|
|
||
| case .STRING; | ||
| statement.param_types[index] = cast(Oid) Pq_Type.VARCHAR; | ||
| statement.param_formats[index] = 1; | ||
|
|
||
| case; | ||
| // @ToDo: Implement | ||
| log_error("Unsupported param type: %", arg); | ||
| return statement, false; | ||
| } | ||
| } | ||
|
|
||
| result := PQprepare(conn, statement.name, c_command, xx statement.param_types.count, statement.param_types.data); | ||
| has_results, success := check_query_result(result); | ||
|
|
||
| if success { | ||
| statement.pg_result = result; | ||
| } | ||
| return statement, success; | ||
| } | ||
|
|
||
| prepare_query :: (conn: *PGconn, name: string, command: string, args: .. Type) -> PreparedStatement, bool { | ||
| statement : PreparedStatement; | ||
| set_allocators(*statement.pool); | ||
| push_allocator(pool_allocator_proc, *statement.pool); | ||
|
|
||
| c_command := to_c_string(command); | ||
|
|
||
| statement.name = to_c_string(name); | ||
| array_resize(*statement.param_types, args.count, initialize = false); | ||
| array_resize(*statement.param_values, args.count, initialize = false); | ||
| array_resize(*statement.param_lengths, args.count, initialize = false); | ||
| array_resize(*statement.param_formats, args.count, initialize = false); | ||
|
|
||
| for arg, index: args { | ||
| type := cast(*Type_Info) arg; | ||
|
|
||
| if type.type == { | ||
| case .INTEGER; | ||
| info := cast(*Type_Info_Integer) type; | ||
| // @ToDo: implement unsigned ints | ||
| //assert(info.signed, "Unsigned not yet supported"); | ||
| be_value := cast(*u8) alloc(info.runtime_size); | ||
| if info.runtime_size == { | ||
| case 2; | ||
| statement.param_types[index] = cast(Oid) Pq_Type.INT2; | ||
| case 4; | ||
| statement.param_types[index] = cast(Oid) Pq_Type.INT4; | ||
| case 8; | ||
| statement.param_types[index] = cast(Oid) Pq_Type.INT8; | ||
| case; | ||
| assert(false); | ||
| } | ||
| statement.param_values[index] = be_value; | ||
| statement.param_lengths[index] = cast(s32) info.runtime_size; | ||
| statement.param_formats[index] = 1; | ||
| case .FLOAT; | ||
| be_value := cast(*u8) alloc(type.runtime_size); | ||
| if type.runtime_size == 4 { | ||
| statement.param_types[index] = cast(Oid) Pq_Type.FLOAT4; | ||
| } | ||
| if type.runtime_size == 8 { | ||
| statement.param_types[index] = cast(Oid) Pq_Type.FLOAT8; | ||
| } | ||
| statement.param_values[index] = be_value; | ||
| statement.param_lengths[index] = cast(s32) type.runtime_size; | ||
| statement.param_formats[index] = 1; | ||
|
|
||
| case .STRING; | ||
| statement.param_types[index] = cast(Oid) Pq_Type.VARCHAR; | ||
| statement.param_formats[index] = 1; | ||
|
|
||
| case; | ||
| // @ToDo: Implement | ||
| log_error("Unsupported param type: %", arg); | ||
| return statement, false; | ||
| } | ||
| } | ||
|
|
||
| result := PQprepare(conn, statement.name, c_command, xx statement.param_types.count, statement.param_types.data); | ||
| has_results, success := check_query_result(result); | ||
|
|
||
| if success { | ||
| statement.pg_result = result; | ||
| } | ||
| return statement, success; | ||
| } | ||
|
|
||
| set_prepared_arg :: (statement: PreparedStatement, arg: Any, index: u32) { | ||
| if arg.type.type == { | ||
| case .INTEGER; | ||
| info := cast(*Type_Info_Integer) arg.type; | ||
| // @ToDo: implement unsigned ints | ||
| //assert(info.signed, "Unsigned not yet supported"); | ||
| if info.runtime_size == { | ||
| case 1; | ||
| // Postgres doesn't have 8 bit integers, but sometimes people use char for this. | ||
| be_val := hton(<<(cast(*s8) arg.value_pointer)); | ||
| memcpy(statement.param_values[index], *be_val, info.runtime_size); | ||
| case 2; | ||
| be_val := hton(<<(cast(*s16) arg.value_pointer)); | ||
| memcpy(statement.param_values[index], *be_val, info.runtime_size); | ||
| case 4; | ||
| be_val := hton(<<(cast(*s32) arg.value_pointer)); | ||
| memcpy(statement.param_values[index], *be_val, info.runtime_size); | ||
| case 8; | ||
| be_val := hton(<<(cast(*s64) arg.value_pointer)); | ||
| memcpy(statement.param_values[index], *be_val, info.runtime_size); | ||
| case; | ||
| assert(false); | ||
| } | ||
|
|
||
| case .FLOAT;; | ||
| if arg.type.runtime_size == 4 { | ||
| be_val := hton(<<(cast(*float) arg.value_pointer)); | ||
| memcpy(statement.param_values[index], *be_val, arg.type.runtime_size); | ||
| } | ||
| if arg.type.runtime_size == 8 { | ||
| be_val := hton(<<(cast(*float64) arg.value_pointer)); | ||
| memcpy(statement.param_values[index], *be_val, arg.type.runtime_size); | ||
| } | ||
|
|
||
| case .STRING; | ||
| str := cast(*string) arg.value_pointer; | ||
| statement.param_values[index] = str.data; | ||
| statement.param_lengths[index] = cast(s32) str.count; | ||
|
|
||
|
|
||
| // case .ARRAY; | ||
| // @ToDo: Implement binary format according to array_recv format. | ||
|
|
||
| // ToDo: Struct as Jsonb? | ||
|
|
||
| case; | ||
| // @ToDo: Implement | ||
| log_error("Unsupported param type: %", arg.type.type); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
|
|
||
| execute_prepared :: (conn: *PGconn, statement: PreparedStatement) { | ||
| if statement.pg_result == null { | ||
| return; | ||
| } | ||
| PQexecPrepared(conn, statement.name, xx statement.param_values.count, statement.param_values.data, statement.param_lengths.data, statement.param_formats.data, 1); | ||
| } | ||
|
|
||
| execute_prepared_with_args :: (conn: *PGconn, statement: PreparedStatement, args: .. Any) { | ||
| if statement.pg_result == null { | ||
| return; | ||
| } | ||
|
|
||
| for arg, index: args { | ||
| set_prepared_arg(statement, arg, index); | ||
| } | ||
|
|
||
| PQexecPrepared(conn, statement.name, xx args.count, statement.param_values.data, statement.param_lengths.data, statement.param_formats.data, 1); | ||
| } | ||
|
|
||
| PreparedStatement :: struct { | ||
| pool : Pool; | ||
|
|
||
| // All memory allocated for these comes from the above pool, | ||
| name : *u8; | ||
| param_types : [..] Oid; | ||
| param_values : [..] *u8; | ||
| param_lengths : [..] s32; | ||
| param_formats : [..] s32; | ||
|
|
||
| pg_result : *PGresult; | ||
| } | ||
|
|
||
| free_statement :: (statement: PreparedStatement, conn: *PGconn) { | ||
| push_allocator(pool_allocator_proc, *statement.pool); | ||
|
|
||
| dealloc := sprint("DEALLOCATE %", to_string(statement.name)); | ||
| result, success := execute(conn, dealloc); | ||
|
|
||
| release(*statement.pool); | ||
| } | ||
|
|
||
| // Execute a statement and parse the result | ||
| execute :: (conn: *PGconn, $T: Type, command: string, args: .. Any, $ignore_unknown := false) -> [] T, success: bool { | ||
| success := send_query(conn, command, ..args); | ||
|
|
@@ -82,7 +352,7 @@ send_query :: (conn: *PGconn, command: string, args: .. Any) -> success: bool { | |
| case .INTEGER; | ||
| info := cast(*Type_Info_Integer) arg.type; | ||
| // @ToDo: implement unsigned ints | ||
| assert(info.signed, "Unsigned not yet supported"); | ||
| //assert(info.signed, "Unsigned not yet supported"); | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you disable this? |
||
| be_value := cast(*u8) alloc(info.runtime_size); | ||
| if info.runtime_size == { | ||
| case 2; | ||
|
|
@@ -116,21 +386,25 @@ send_query :: (conn: *PGconn, command: string, args: .. Any) -> success: bool { | |
| be_val := hton(<<(cast(*float64) arg.value_pointer)); | ||
| memcpy(be_value, *be_val, arg.type.runtime_size); | ||
| } | ||
| param_values[index] = be_value; | ||
| param_values[index] = be_value; | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please don’t reformat existing code without a good reason. |
||
| param_lengths[index] = cast(s32) arg.type.runtime_size; | ||
| param_formats[index] = 1; | ||
|
|
||
| case .STRING; | ||
| str := cast(*string) arg.value_pointer; | ||
| param_types[index] = cast(Oid) Pq_Type.TEXT; | ||
| param_values[index] = ifx str.data else ""; // This handles empty strings. Sending no data means null string | ||
| param_types[index] = cast(Oid) Pq_Type.TEXT; | ||
| param_values[index] = ifx str.data else ""; // This handles empty strings. Sending no data means null string; | ||
| param_lengths[index] = cast(s32) str.count; | ||
| param_formats[index] = 1; | ||
|
|
||
| case .BOOL; | ||
| val := cast(*bool) arg.value_pointer; | ||
| param_types[index] = cast(Oid) Pq_Type.BOOL; | ||
| param_values[index] = ifx val.* then *BOOL_TRUE_VALUE else *BOOL_FALSE_VALUE; | ||
| bool_as_num := ifx arg.value_pointer.(*bool).* then cast(u8) 1 else cast(u8) 0; | ||
| be_value := alloc(1).(*u8); | ||
| network_val := hton(bool_as_num); | ||
| memcpy(be_value, *network_val, 1); | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you replace "pointer to constant" with "allocation for every bool"? |
||
|
|
||
| param_types[index] = Pq_Type.BOOL.(Oid); | ||
| param_values[index] = be_value; | ||
| param_lengths[index] = 1; | ||
| param_formats[index] = 1; | ||
|
|
||
|
|
@@ -360,13 +634,24 @@ assign_member :: (name: string, info: *Type_Info, slot: *u8, col_type: Pq_Type, | |
| case .BPCHAR; #through; | ||
| case .VARCHAR; #through; | ||
| case .NAME; #through; | ||
| case .JSON; #through; | ||
| case .TEXT; | ||
| val: string; | ||
| val.data = data; | ||
| val.count = len; | ||
| write_string_value(col, name, info, slot, col_type, val, is_custom = false); | ||
| return true; | ||
|
|
||
| // JSONB is the same response format as JSON, except the first byte is a version number. | ||
| case .JSONB; | ||
| val: string; | ||
| // version := << cast(*u8) data; | ||
| // Here we could optionally do different things depending on the version. | ||
| val.data = data + 1; | ||
| val.count = len - 1; | ||
| write_string_value(col, name, info, slot, col_type, val, is_custom = false); | ||
| return true; | ||
|
|
||
| case .DATE; | ||
| assert(len == 4); | ||
| days_since_2000_01_01 := ntoh(<< cast(*s32) data); | ||
|
|
@@ -495,7 +780,7 @@ write_string_value :: (col: int, name: string, info: *Type_Info, slot: *void, co | |
| } | ||
|
|
||
| write_integer :: (col: int, name: string, info: *Type_Info, pointer: *void, col_type: Pq_Type, value: s64) -> bool { | ||
| if info.type != Type_Info_Tag.INTEGER { | ||
| if info.type != Type_Info_Tag.INTEGER && info.type != Type_Info_Tag.ENUM { | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not correct! |
||
| log_error("Error: Trying to write column % of type % as an integer into member field \"%\" of type %", col, col_type, name, info.type); | ||
| return false; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That change should be reverted.