diff --git a/.bumpversion.toml b/.bumpversion.toml index afd41c23..92d08eaa 100644 --- a/.bumpversion.toml +++ b/.bumpversion.toml @@ -1,5 +1,5 @@ [tool.bumpversion] -current_version = "3.0.0" +current_version = "4.0.0" commit = false tag = false diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 656b4177..af23930c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,6 +5,48 @@ Changelog ========= +4.0.0 (2025-10-17) +------------------ + +New Breaking Change Feature +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +From QuestDB 9.1.0 onwards you can use ``CREATE TABLE`` SQL statements with +``TIMESTAMP_NS`` column types, or rely on column auto-creation. + +This client release adds support for sending nanoseconds timestamps to the +server without loss of precision. + +This release does not introduce new APIs, instead enhancing the sender/buffer's +``.row()`` API to additionally accept nanosecond precision. + +.. code-block:: python + + conf = 'http::addr=localhost:9000;' + # or `conf = 'tcp::addr=localhost:9009;protocol_version=2;'` + with Sender.from_conf(conf) as sender: + sender.row( + 'trade_executions', + symbols={ + 'product': 'VOD.L', + 'parent_order': '65d1ba36-390e-49a2-93e3-a05ef004b5ff', + 'side': 'buy'}, + columns={ + 'order_sent': TimestampNanos(1759246702031355012)}, + at=TimestampNanos(1759246702909423071)) + +If you're using dataframes, nanosecond timestamps are now also transferred with +full precision. + +The change is backwards compatible with older QuestDB releases which will simply +continue using the ``TIMESTAMP`` column, even when nanoseconds are specified in +the client. + +This is a breaking change because it introduces new breaking timestamp +`column auto-creation ` +behaviour. For full details and upgrade advice, see the +`nanosecond PR on GitHub `_. 
+ 3.0.0 (2025-07-07) ------------------ diff --git a/README.rst b/README.rst index 11a989d4..97352457 100644 --- a/README.rst +++ b/README.rst @@ -18,7 +18,7 @@ and full-connection encryption with Install ======= -The latest version of the library is **3.0.0** (`changelog `_). +The latest version of the library is **4.0.0** (`changelog `_). :: diff --git a/c-questdb-client b/c-questdb-client index 924bc390..05d9ada6 160000 --- a/c-questdb-client +++ b/c-questdb-client @@ -1 +1 @@ -Subproject commit 924bc3905388d24dbebb31dfe326fd64123cf52f +Subproject commit 05d9ada6d3c9ad48e16d66a4ba1cf37ed4a80f72 diff --git a/ci/run_tests_pipeline.yaml b/ci/run_tests_pipeline.yaml index 713816e2..98842056 100644 --- a/ci/run_tests_pipeline.yaml +++ b/ci/run_tests_pipeline.yaml @@ -28,7 +28,7 @@ stages: pool: name: $(poolName) vmImage: $(imageName) - timeoutInMinutes: 45 + timeoutInMinutes: 90 steps: - checkout: self fetchDepth: 1 @@ -63,18 +63,18 @@ stages: - script: python3 proj.py test 1 displayName: "Test vs released" env: - JAVA_HOME: $(JAVA_HOME_11_X64) + JAVA_HOME: $(JAVA_HOME_17_X64) - script: python3 proj.py test 1 displayName: "Test vs master" env: - JAVA_HOME: $(JAVA_HOME_11_X64) + JAVA_HOME: $(JAVA_HOME_17_X64) QDB_REPO_PATH: './questdb' condition: eq(variables.vsQuestDbMaster, true) - job: TestsAgainstVariousNumpyVersion1x pool: name: "Azure Pipelines" vmImage: "ubuntu-latest" - timeoutInMinutes: 45 + timeoutInMinutes: 90 steps: - checkout: self fetchDepth: 1 @@ -98,7 +98,7 @@ stages: pool: name: "Azure Pipelines" vmImage: "ubuntu-latest" - timeoutInMinutes: 45 + timeoutInMinutes: 90 steps: - checkout: self fetchDepth: 1 diff --git a/docs/conf.py b/docs/conf.py index 3fce6413..bb645be2 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -28,7 +28,7 @@ year = '2024' author = 'QuestDB' copyright = '{0}, {1}'.format(year, author) -version = release = '3.0.0' +version = release = '4.0.0' github_repo_url = 'https://github.com/questdb/py-questdb-client' diff --git a/pyproject.toml 
b/pyproject.toml index 72487a56..64821079 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ # See: https://packaging.python.org/en/latest/specifications/declaring-project-metadata/ name = "questdb" requires-python = ">=3.9" -version = "3.0.0" +version = "4.0.0" description = "QuestDB client library for Python" readme = "README.rst" classifiers = [ @@ -65,6 +65,10 @@ skip = [ # Skip all 32-bit builds, except for Windows. # Those builds are named `*win32*` in cibuildwheel. "*i686*", + + # Skip Python 3.14 builds until the dependencies catch up + "cp314-*", + "cp314t-*" ] # [tool.cibuildwheel.windows] diff --git a/setup.py b/setup.py index da7b9ecb..2605e773 100755 --- a/setup.py +++ b/setup.py @@ -171,7 +171,7 @@ def readme(): setup( name='questdb', - version='3.0.0', + version='4.0.0', platforms=['any'], python_requires='>=3.8', install_requires=[], diff --git a/src/questdb/__init__.py b/src/questdb/__init__.py index 4eb28e38..d6497a81 100644 --- a/src/questdb/__init__.py +++ b/src/questdb/__init__.py @@ -1 +1 @@ -__version__ = '3.0.0' +__version__ = '4.0.0' diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index 0c620f17..d9139de8 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -104,7 +104,7 @@ cnp.import_array() # This value is automatically updated by the `bump2version` tool. # If you need to update it, also update the search definition in # .bumpversion.cfg. 
-VERSION = '3.0.0' +VERSION = '4.0.0' WARN_HIGH_RECONNECTS = True @@ -648,7 +648,7 @@ cdef class SenderTransaction: symbols: Optional[Dict[str, Optional[str]]]=None, columns: Optional[Dict[ str, - Union[None, bool, int, float, str, TimestampMicros, datetime.datetime, numpy.ndarray]] + Union[None, bool, int, float, str, TimestampMicros, TimestampNanos, datetime.datetime, numpy.ndarray]] ]=None, at: Union[ServerTimestampType, TimestampNanos, datetime.datetime]): """ @@ -962,12 +962,18 @@ cdef class Buffer: if not line_sender_buffer_column_str(self._impl, c_name, c_value, &err): raise c_err_to_py(err) - cdef inline void_int _column_ts( + cdef inline void_int _column_ts_micros( self, line_sender_column_name c_name, TimestampMicros ts) except -1: cdef line_sender_error* err = NULL if not line_sender_buffer_column_ts_micros(self._impl, c_name, ts._value, &err): raise c_err_to_py(err) + cdef inline void_int _column_ts_nanos( + self, line_sender_column_name c_name, TimestampNanos ts) except -1: + cdef line_sender_error* err = NULL + if not line_sender_buffer_column_ts_nanos(self._impl, c_name, ts._value, &err): + raise c_err_to_py(err) + cdef inline void_int _column_numpy( self, line_sender_column_name c_name, cnp.ndarray arr) except -1: if cnp.PyArray_TYPE(arr) != cnp.NPY_FLOAT64: @@ -1004,6 +1010,8 @@ cdef class Buffer: cdef inline void_int _column_dt( self, line_sender_column_name c_name, cp_datetime dt) except -1: cdef line_sender_error* err = NULL + # We limit ourselves to micros, since this is the maximum precision + # exposed by the datetime library in Python. 
if not line_sender_buffer_column_ts_micros( self._impl, c_name, datetime_to_micros(dt), &err): raise c_err_to_py(err) @@ -1020,7 +1028,9 @@ cdef class Buffer: elif PyUnicode_CheckExact(value): self._column_str(c_name, value) elif isinstance(value, TimestampMicros): - self._column_ts(c_name, value) + self._column_ts_micros(c_name, value) + elif isinstance(value, TimestampNanos): + self._column_ts_nanos(c_name, value) elif PyArray_CheckExact( value): self._column_numpy(c_name, value) elif isinstance(value, cp_datetime): @@ -1045,15 +1055,20 @@ cdef class Buffer: if sender != NULL: may_flush_on_row_complete(self, sender) - cdef inline void_int _at_ts(self, TimestampNanos ts) except -1: + cdef inline void_int _at_ts_us(self, TimestampMicros ts) except -1: + cdef line_sender_error* err = NULL + if not line_sender_buffer_at_micros(self._impl, ts._value, &err): + raise c_err_to_py(err) + + cdef inline void_int _at_ts_ns(self, TimestampNanos ts) except -1: cdef line_sender_error* err = NULL if not line_sender_buffer_at_nanos(self._impl, ts._value, &err): raise c_err_to_py(err) cdef inline void_int _at_dt(self, cp_datetime dt) except -1: - cdef int64_t value = datetime_to_nanos(dt) + cdef int64_t value = datetime_to_micros(dt) cdef line_sender_error* err = NULL - if not line_sender_buffer_at_nanos(self._impl, value, &err): + if not line_sender_buffer_at_micros(self._impl, value, &err): raise c_err_to_py(err) cdef inline void_int _at_now(self) except -1: @@ -1064,8 +1079,10 @@ cdef class Buffer: cdef inline void_int _at(self, object ts) except -1: if ts is None: self._at_now() + elif isinstance(ts, TimestampMicros): + self._at_ts_us(ts) elif isinstance(ts, TimestampNanos): - self._at_ts(ts) + self._at_ts_ns(ts) elif isinstance(ts, cp_datetime): self._at_dt(ts) else: @@ -1115,7 +1132,7 @@ cdef class Buffer: symbols: Optional[Dict[str, Optional[str]]]=None, columns: Optional[Dict[ str, - Union[None, bool, int, float, str, TimestampMicros, datetime.datetime, numpy.ndarray]] + 
Union[None, bool, int, float, str, TimestampMicros, TimestampNanos, datetime.datetime, numpy.ndarray]] ]=None, at: Union[ServerTimestampType, TimestampNanos, datetime.datetime]): """ diff --git a/test/system_test.py b/test/system_test.py index 4de3254f..76805205 100755 --- a/test/system_test.py +++ b/test/system_test.py @@ -27,7 +27,7 @@ import questdb.ingress as qi -QUESTDB_VERSION = '8.3.2' +QUESTDB_VERSION = '9.1.0' QUESTDB_PLAIN_INSTALL_PATH = None QUESTDB_AUTH_INSTALL_PATH = None FIRST_ARRAY_RELEASE = (8, 4, 0) @@ -211,13 +211,18 @@ def test_http(self): return resp = self.qdb_plain.retry_check_table(table_name, min_rows=3) + + # Re-enable the line below once https://github.com/questdb/questdb/pull/6220 is merged + # exp_ts_type = 'TIMESTAMP' if self.qdb_plain.version <= (9, 1, 0) else 'TIMESTAMP_NS' + exp_ts_type = 'TIMESTAMP' + exp_columns = [ {'name': 'name_a', 'type': 'SYMBOL'}, {'name': 'name_b', 'type': 'BOOLEAN'}, {'name': 'name_c', 'type': 'LONG'}, {'name': 'name_d', 'type': 'DOUBLE'}, {'name': 'name_e', 'type': 'VARCHAR'}, - {'name': 'timestamp', 'type': 'TIMESTAMP'}] + {'name': 'timestamp', 'type': exp_ts_type}] self.assertEqual(resp['columns'], exp_columns) exp_dataset = [ # Comparison excludes timestamp column. diff --git a/test/test.py b/test/test.py index cb0267aa..1ef5c05a 100755 --- a/test/test.py +++ b/test/test.py @@ -14,7 +14,10 @@ import patch_path -from test_tools import _float_binary_bytes, _array_binary_bytes +from test_tools import ( + _float_binary_bytes, + _array_binary_bytes, + TimestampEncodingMixin) PROJ_ROOT = patch_path.PROJ_ROOT sys.path.append(str(PROJ_ROOT / 'c-questdb-client' / 'system_test')) @@ -72,7 +75,24 @@ class TestBases: The discoverable subclasses can drive extra parameters. 
""" - class TestBuffer(unittest.TestCase): + class TestBuffer(unittest.TestCase, TimestampEncodingMixin): + def _test_buffer_row_ts(self, ts): + buffer = qi.Buffer(protocol_version=self.version) + buffer.row('trades', columns={'t': ts}, at=ts) + ec = self.enc_ts + ed = self.enc_des_ts + exp = f'trades t={ec(ts)} {ed(ts)}\n'.encode() + self.assertEqual(bytes(buffer), exp) + + def test_buffer_row_ts_micros(self): + self._test_buffer_row_ts(qi.TimestampMicros(10001)) + + def test_buffer_row_ts_nanos(self): + self._test_buffer_row_ts(qi.TimestampNanos(10000333)) + + def test_buffer_row_ts_datetime(self): + self._test_buffer_row_ts(datetime.datetime.now()) + def test_buffer_row_at_disallows_none(self): with self.assertRaisesRegex( qi.IngressError, @@ -149,11 +169,14 @@ def test_column(self): 'col4': 0.5, 'col5': 'val', 'col6': qi.TimestampMicros(12345), - 'col7': two_h_after_epoch, - 'col8': None}, at=qi.ServerTimestamp) + 'col7': qi.TimestampNanos(12345678), + 'col8': two_h_after_epoch, + 'col9': None}, at=qi.ServerTimestamp) + et = self.enc_ts_t + en = self.enc_ts_n exp = ( b'tbl1 col1=t,col2=f,col3=-1i,col4' + _float_binary_bytes(0.5, self.version == 1) + - b',col5="val",col6=12345t,col7=7200000000t\n') + f',col5="val",col6={et(12345)},col7={en(12345678)},col8={et(7200000000)}\n'.encode()) self.assertEqual(bytes(buf), exp) def test_none_symbol(self): @@ -300,7 +323,7 @@ def test_int_range(self): with self.assertRaises(OverflowError): buf.row('tbl1', columns={'num': -2 ** 63 - 1}, at=qi.ServerTimestamp) - class TestSender(unittest.TestCase): + class TestSender(unittest.TestCase, TimestampEncodingMixin): def test_transaction_row_at_disallows_none(self): with HttpServer() as server, self.builder('http', '127.0.0.1', server.port) as sender: with self.assertRaisesRegex( @@ -384,8 +407,8 @@ def test_basic(self): msgs = server.recv() self.assertEqual(msgs, [ (b'tab1,t1=val1,t2=val2 ' - b'f1=t,f2=12345i,f3' + _float_binary_bytes(10.75) + b',f4="val3" ' - b'111222233333'), + 
b'f1=t,f2=12345i,f3' + _float_binary_bytes(10.75) + b',f4="val3" ' + + self.enc_des_ts_n(111222233333, v=2).encode()), b'tab1,tag3=value\\ 3,tag4=value:4 field5=f']) def test_bad_protocol_versions(self): @@ -513,8 +536,8 @@ def test_two_rows_explicit_buffer(self): columns={'price': '111222233343i', 'qty': 2.5}, at=qi.TimestampNanos(111222233343)) exp = ( - b'line_sender_buffer_example2,id=Hola price="111222233333i",qty' + _float_binary_bytes(3.5) + b' 111222233333\n' - b'line_sender_example,id=Adios price="111222233343i",qty' + _float_binary_bytes(2.5) + b' 111222233343\n') + b'line_sender_buffer_example2,id=Hola price="111222233333i",qty' + _float_binary_bytes(3.5) + b' 111222233333n\n' + b'line_sender_example,id=Adios price="111222233343i",qty' + _float_binary_bytes(2.5) + b' 111222233343n\n') self.assertEqual(bytes(buffer), exp) sender.flush(buffer) msgs = server.recv() @@ -755,9 +778,10 @@ def test_transaction_over_tcp(self): def test_transaction_basic(self): ts = qi.TimestampNanos.now() + e = lambda ts: self.enc_des_ts(ts, v=2) expected = ( - f'table_name,sym1=val1 {ts.value}\n' + - f'table_name,sym2=val2 {ts.value}\n').encode('utf-8') + f'table_name,sym1=val1 {e(ts)}\n' + + f'table_name,sym2=val2 {e(ts)}\n').encode('utf-8') with HttpServer() as server, self.builder('http', '127.0.0.1', server.port) as sender: with sender.transaction('table_name') as txn: self.assertIs(txn.row(symbols={'sym1': 'val1'}, at=ts), txn) @@ -768,9 +792,10 @@ def test_transaction_basic(self): @unittest.skipIf(not pd, 'pandas not installed') def test_transaction_basic_df(self): ts = qi.TimestampNanos.now() + e = lambda num: self.enc_des_ts(num, v=2) expected = ( - f'table_name,sym1=val1 {ts.value}\n' + - f'table_name,sym2=val2 {ts.value}\n').encode('utf-8') + f'table_name,sym1=val1 {e(ts)}\n' + + f'table_name,sym2=val2 {e(ts)}\n').encode('utf-8') with HttpServer() as server, self.builder('http', '127.0.0.1', server.port) as sender: with sender.transaction('table_name') as txn: df = 
pd.DataFrame({'sym1': ['val1', None], 'sym2': [None, 'val2']}) @@ -780,9 +805,10 @@ def test_transaction_basic_df(self): def test_transaction_no_auto_flush(self): ts = qi.TimestampNanos.now() + e = lambda ts: self.enc_des_ts(ts, v=2) expected = ( - f'table_name,sym1=val1 {ts.value}\n' + - f'table_name,sym2=val2 {ts.value}\n').encode('utf-8') + f'table_name,sym1=val1 {e(ts)}\n' + + f'table_name,sym2=val2 {e(ts)}\n').encode('utf-8') with HttpServer() as server, self.builder('http', '127.0.0.1', server.port, auto_flush=False) as sender: with sender.transaction('table_name') as txn: txn.row(symbols={'sym1': 'val1'}, at=ts) @@ -793,9 +819,10 @@ def test_transaction_no_auto_flush(self): @unittest.skipIf(not pd, 'pandas not installed') def test_transaction_no_auto_flush_df(self): ts = qi.TimestampNanos.now() + e = lambda ts: self.enc_des_ts(ts, v=2) expected = ( - f'table_name,sym1=val1 {ts.value}\n' + - f'table_name,sym2=val2 {ts.value}\n').encode('utf-8') + f'table_name,sym1=val1 {e(ts)}\n' + + f'table_name,sym2=val2 {e(ts)}\n').encode('utf-8') with HttpServer() as server, self.builder('http', '127.0.0.1', server.port, auto_flush=False) as sender: with sender.transaction('table_name') as txn: df = pd.DataFrame({'sym1': ['val1', None], 'sym2': [None, 'val2']}) @@ -805,12 +832,13 @@ def test_transaction_no_auto_flush_df(self): def test_transaction_auto_flush_pending_buf(self): ts = qi.TimestampNanos.now() + e = lambda ts: self.enc_des_ts(ts, v=2) expected1 = ( - f'tbl1,sym1=val1 {ts.value}\n' + - f'tbl1,sym2=val2 {ts.value}\n').encode('utf-8') + f'tbl1,sym1=val1 {e(ts)}\n' + + f'tbl1,sym2=val2 {e(ts)}\n').encode('utf-8') expected2 = ( - f'tbl2,sym3=val3 {ts.value}\n' + - f'tbl2,sym4=val4 {ts.value}\n').encode('utf-8') + f'tbl2,sym3=val3 {e(ts)}\n' + + f'tbl2,sym4=val4 {e(ts)}\n').encode('utf-8') with HttpServer() as server, self.builder('http', '127.0.0.1', server.port, auto_flush=True) as sender: self.assertIs(sender.row('tbl1', symbols={'sym1': 'val1'}, at=ts), sender) 
self.assertIs(sender.row('tbl1', symbols={'sym2': 'val2'}, at=ts), sender) @@ -835,11 +863,12 @@ def test_transaction_no_auto_flush_pending_buf(self): def test_transaction_immediate_auto_flush(self): ts = qi.TimestampNanos.now() - expected1 = f'tbl1,sym1=val1 {ts.value}\n'.encode('utf-8') - expected2 = f'tbl2,sym2=val2 {ts.value}\n'.encode('utf-8') + e = lambda num: self.enc_des_ts(num, v=2) + expected1 = f'tbl1,sym1=val1 {e(ts)}\n'.encode('utf-8') + expected2 = f'tbl2,sym2=val2 {e(ts)}\n'.encode('utf-8') expected3 = ( - f'tbl3,sym3=val3 {ts.value}\n' + - f'tbl3,sym4=val4 {ts.value}\n').encode('utf-8') + f'tbl3,sym3=val3 {e(ts)}\n' + + f'tbl3,sym4=val4 {e(ts)}\n').encode('utf-8') with HttpServer() as server, self.builder('http', '127.0.0.1', server.port, auto_flush_rows=1) as sender: self.assertIs(sender.row('tbl1', symbols={'sym1': 'val1'}, at=ts), sender) self.assertIs(sender.row('tbl2', symbols={'sym2': 'val2'}, at=ts), sender) @@ -855,11 +884,12 @@ def test_transaction_immediate_auto_flush(self): @unittest.skipIf(not pd, 'pandas not installed') def test_transaction_immediate_auto_flush_df(self): ts = qi.TimestampNanos.now() - expected1 = f'tbl1,sym1=val1 {ts.value}\n'.encode('utf-8') - expected2 = f'tbl2,sym2=val2 {ts.value}\n'.encode('utf-8') + e = lambda ts: self.enc_des_ts(ts, v=2) + expected1 = f'tbl1,sym1=val1 {e(ts)}\n'.encode('utf-8') + expected2 = f'tbl2,sym2=val2 {e(ts)}\n'.encode('utf-8') expected3 = ( - f'tbl3,sym3=val3 {ts.value}\n' + - f'tbl3,sym4=val4 {ts.value}\n').encode('utf-8') + f'tbl3,sym3=val3 {e(ts)}\n' + + f'tbl3,sym4=val4 {e(ts)}\n').encode('utf-8') with HttpServer() as server, self.builder('http', '127.0.0.1', server.port, auto_flush_rows=1) as sender: self.assertIs(sender.row('tbl1', symbols={'sym1': 'val1'}, at=ts), sender) self.assertIs(sender.row('tbl2', symbols={'sym2': 'val2'}, at=ts), sender) @@ -1132,8 +1162,9 @@ def _test_sender_http_auto_protocol_version(self, settings, expected_version: in symbols={'id': 'Hola'}, 
columns={'price': '111222233333i', 'qty': 3.5}, at=qi.TimestampNanos(111222233333)) + e = lambda num: self.enc_des_ts_n(num, v=expected_version) exp = b'line_sender_buffer_old_server2,id=Hola price="111222233333i",qty' + _float_binary_bytes( - 3.5, expected_version == 1) + b' 111222233333\n' + 3.5, expected_version == 1) + f' {e(111222233333)}\n'.encode() self.assertEqual(bytes(buffer), exp) sender.flush(buffer) self.assertEqual(len(server.requests), 1) @@ -1183,7 +1214,7 @@ def test_line_protocol_version_on_tcp(self): symbols={'id': 'Hola'}, columns={'qty': 3.5}, at=qi.TimestampNanos(111222233333)) - exp = b'line_sender_buffer_tcp_v1,id=Hola qty' + _float_binary_bytes(3.5) + b' 111222233333\n' + exp = b'line_sender_buffer_tcp_v1,id=Hola qty' + _float_binary_bytes(3.5) + b' 111222233333n\n' self.assertEqual(bytes(buffer), exp) sender.flush(buffer) self.assertEqual(server.recv()[0] + b'\n', exp) @@ -1209,7 +1240,7 @@ def _test_array_basic(self, arr: np.ndarray): 'array_test', columns={'array': arr}, at=qi.TimestampNanos(11111)) - exp = b'array_test array=' + _array_binary_bytes(arr) + b' 11111\n' + exp = b'array_test array=' + _array_binary_bytes(arr) + b' 11111n\n' sender.flush() self.assertEqual(len(server.requests), 1) self.assertEqual(server.requests[0], exp) @@ -1222,7 +1253,7 @@ def _test_array_basic(self, arr: np.ndarray): 'array_test', columns={'array': arr}, at=qi.TimestampNanos(11111)) - exp = b'array_test array=' + _array_binary_bytes(arr) + b' 11111\n' + exp = b'array_test array=' + _array_binary_bytes(arr) + b' 11111n\n' self.assertEqual(bytes(sender), exp) sender.flush() self.assertEqual(server.recv()[0] + b'\n', exp) diff --git a/test/test_dataframe.py b/test/test_dataframe.py index df1822e2..66bbd71d 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -8,7 +8,7 @@ import functools import tempfile import pathlib -from test_tools import _float_binary_bytes, _array_binary_bytes +from test_tools import _float_binary_bytes, 
_array_binary_bytes, TimestampEncodingMixin BROKEN_TIMEZONES = True @@ -89,7 +89,7 @@ def wrapper(self, *args, **kwargs): return wrapper class TestPandasBase: - class TestPandas(unittest.TestCase): + class TestPandas(unittest.TestCase, TimestampEncodingMixin): def test_mandatory_at_dataframe(self): with self.assertRaisesRegex(TypeError, "needs keyword-only argument at"): _dataframe(self.version, []) @@ -204,11 +204,12 @@ def test_basic(self): table_name_col='T', symbols=['A', 'B', 'C', 'D'], at=-1) - self.assertEqual( - buf, - b't1,A=a1,B=b1,C=b1,D=a1 E' + _float_binary_bytes(1.0, self.version == 1) + b',F=1i 1520640000000000000\n' + - b't2,A=a2,D=a2 E' + _float_binary_bytes(2.0, self.version == 1) + b',F=2i 1520726400000000000\n' + - b't1,A=a3,B=b3,C=b3,D=a3 E' + _float_binary_bytes(3.0, self.version == 1) + b',F=3i 1520812800000000000\n') + e = self.enc_des_ts_n + exp = ( + b't1,A=a1,B=b1,C=b1,D=a1 E' + _float_binary_bytes(1.0, self.version == 1) + f',F=1i {e(1520640000000000000)}\n'.encode() + + b't2,A=a2,D=a2 E' + _float_binary_bytes(2.0, self.version == 1) + f',F=2i {e(1520726400000000000)}\n'.encode() + + b't1,A=a3,B=b3,C=b3,D=a3 E' + _float_binary_bytes(3.0, self.version == 1) + f',F=3i {e(1520812800000000000)}\n'.encode()) + self.assertEqual(buf, exp) def test_basic_with_arrays(self): if self.version == 1: @@ -219,11 +220,12 @@ def test_basic_with_arrays(self): table_name_col='T', symbols=['A', 'B', 'C', 'D'], at=-1) - self.assertEqual( - buf, - b't1,A=a1,B=b1,C=b1,D=a1 E' + _float_binary_bytes(1.0, self.version == 1) + b',F=1i,G=' + _array_binary_bytes(np.array([1.0])) + b' 1520640000000000000\n' + - b't2,A=a2,D=a2 E' + _float_binary_bytes(2.0, self.version == 1) + b',F=2i,G=' + _array_binary_bytes(np.array([10.0])) + b' 1520726400000000000\n' + - b't1,A=a3,B=b3,C=b3,D=a3 E' + _float_binary_bytes(3.0, self.version == 1) + b',F=3i,G=' + _array_binary_bytes(np.array([100.0])) + b' 1520812800000000000\n') + e = self.enc_des_ts_n + exp = ( + 
b't1,A=a1,B=b1,C=b1,D=a1 E' + _float_binary_bytes(1.0, self.version == 1) + b',F=1i,G=' + _array_binary_bytes(np.array([1.0])) + f' {e(1520640000000000000)}\n'.encode() + + b't2,A=a2,D=a2 E' + _float_binary_bytes(2.0, self.version == 1) + b',F=2i,G=' + _array_binary_bytes(np.array([10.0])) + f' {e(1520726400000000000)}\n'.encode() + + b't1,A=a3,B=b3,C=b3,D=a3 E' + _float_binary_bytes(3.0, self.version == 1) + b',F=3i,G=' + _array_binary_bytes(np.array([100.0])) + f' {e(1520812800000000000)}\n'.encode()) + self.assertEqual(buf, exp) def test_named_dataframe(self): df = pd.DataFrame({ @@ -276,13 +278,14 @@ def test_at_good(self): t6 = qi.TimestampNanos.from_datetime(t2) t7 = qi.TimestampNanos.from_datetime(t3) timestamps = [t1, t2, t3, t4, t5, t6, t7] + e = self.enc_des_ts_n for ts in timestamps: buf = _dataframe(self.version, df, table_name='tbl1', at=ts) self.assertEqual( buf, - b'tbl1 a=1i,b="a" 1520640000000000000\n' + - b'tbl1 a=2i,b="b" 1520640000000000000\n' + - b'tbl1 a=3i,b="c" 1520640000000000000\n') + f'tbl1 a=1i,b="a" {e(1520640000000000000)}\n'.encode() + + f'tbl1 a=2i,b="b" {e(1520640000000000000)}\n'.encode() + + f'tbl1 a=3i,b="c" {e(1520640000000000000)}\n'.encode()) @unittest.skipIf(BROKEN_TIMEZONES, 'requires accurate timezones') def test_at_neg(self): @@ -313,13 +316,14 @@ def test_at_ts_0(self): e7 = qi.TimestampNanos.from_datetime(e3) edge_timestamps = [e1, e2, e3, e4, e5, e6, e7] + e = self.enc_des_ts_n for ts in edge_timestamps: buf = _dataframe(self.version, df, table_name='tbl1', at=ts) self.assertEqual( buf, - b'tbl1 a=1i,b="a" 0\n' + - b'tbl1 a=2i,b="b" 0\n' + - b'tbl1 a=3i,b="c" 0\n') + f'tbl1 a=1i,b="a" {e(0)}\n'.encode() + + f'tbl1 a=2i,b="b" {e(0)}\n'.encode() + + f'tbl1 a=3i,b="c" {e(0)}\n'.encode()) def test_single_at_col(self): df = pd.DataFrame({'timestamp': pd.to_datetime(['2023-01-01'])}) @@ -837,17 +841,18 @@ def test_datetime64_numpy_col(self): dtype='datetime64[ns]'), 'b': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']}) buf = 
_dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) - self.assertEqual( - buf, - b'tbl1 a=1546300800000000t,b="a"\n' + - b'tbl1 a=1546300801000000t,b="b"\n' + - b'tbl1 a=1546300802000000t,b="c"\n' + - b'tbl1 a=1546300803000000t,b="d"\n' + - b'tbl1 a=1546300804000000t,b="e"\n' + - b'tbl1 a=1546300805000000t,b="f"\n' + + e = self.enc_ts_n + exp = ( + f'tbl1 a={e(1546300800000000000)},b="a"\n'.encode() + + f'tbl1 a={e(1546300801000000000)},b="b"\n'.encode() + + f'tbl1 a={e(1546300802000000000)},b="c"\n'.encode() + + f'tbl1 a={e(1546300803000000000)},b="d"\n'.encode() + + f'tbl1 a={e(1546300804000000000)},b="e"\n'.encode() + + f'tbl1 a={e(1546300805000000000)},b="f"\n'.encode() + b'tbl1 b="g"\n' + b'tbl1 b="h"\n' + b'tbl1 b="i"\n') + self.assertEqual(buf, exp) df = pd.DataFrame({'a': pd.Series([ pd.Timestamp('1970-01-01 00:00:00'), @@ -856,9 +861,9 @@ def test_datetime64_numpy_col(self): buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) self.assertEqual( buf, - b'tbl1 a=0t\n' + - b'tbl1 a=1000000t\n' + - b'tbl1 a=2000000t\n') + f'tbl1 a={e(0)}\n'.encode() + + f'tbl1 a={e(1000000000)}\n'.encode() + + f'tbl1 a={e(2000000000)}\n'.encode()) def test_datetime64_tz_arrow_col(self): df = pd.DataFrame({ @@ -875,13 +880,14 @@ def test_datetime64_tz_arrow_col(self): hour=0, minute=0, second=3, tz=_TZ)], 'b': ['sym1', 'sym2', 'sym3', 'sym4']}) buf = _dataframe(self.version, df, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) + e = self.enc_ts_n self.assertEqual( buf, # Note how these are 5hr offset from `test_datetime64_numpy_col`. - b'tbl1,b=sym1 a=1546318800000000t\n' + - b'tbl1,b=sym2 a=1546318801000000t\n' + + f'tbl1,b=sym1 a={e(1546318800000000000)}\n'.encode() + + f'tbl1,b=sym2 a={e(1546318801000000000)}\n'.encode() + b'tbl1,b=sym3\n' + - b'tbl1,b=sym4 a=1546318803000000t\n') + f'tbl1,b=sym4 a={e(1546318803000000000)}\n'.encode()) # Not epoch 0. 
df = pd.DataFrame({ @@ -900,9 +906,9 @@ def test_datetime64_tz_arrow_col(self): self.assertEqual( buf, # Note how these are 5hr offset from `test_datetime64_numpy_col`. - b'tbl1,b=sym1 a=18000000000t\n' + - b'tbl1,b=sym2 a=18001000000t\n' + - b'tbl1,b=sym3 a=18002000000t\n') + f'tbl1,b=sym1 a={e(18000000000000)}\n'.encode() + + f'tbl1,b=sym2 a={e(18001000000000)}\n'.encode() + + f'tbl1,b=sym3 a={e(18002000000000)}\n'.encode()) # Actual epoch 0. df = pd.DataFrame({ @@ -920,9 +926,9 @@ def test_datetime64_tz_arrow_col(self): buf = _dataframe(self.version, df, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) self.assertEqual( buf, - b'tbl1,b=sym1 a=0t\n' + - b'tbl1,b=sym2 a=1000000t\n' + - b'tbl1,b=sym3 a=2000000t\n') + f'tbl1,b=sym1 a={e(0)}\n'.encode() + + f'tbl1,b=sym2 a={e(1000000000)}\n'.encode() + + f'tbl1,b=sym3 a={e(2000000000)}\n'.encode()) df2 = pd.DataFrame({ 'a': [ @@ -936,8 +942,8 @@ def test_datetime64_tz_arrow_col(self): # Mostly, here assert that negative timestamps are allowed. 
self.assertIn( buf, - [b'tbl1,b=sym1 a=-2208970800000000t\n', - b'tbl1,b=sym1 a=-2208971040000000t\n']) + [f'tbl1,b=sym1 a={e(-2208970800000000000)}\n'.encode(), + f'tbl1,b=sym1 a={e(-2208971040000000000)}\n'.encode()]) def test_datetime64_numpy_at(self): df = pd.DataFrame({ @@ -954,18 +960,18 @@ def test_datetime64_numpy_at(self): dtype='datetime64[ns]'), 'b': [1, 2, 3, 4, 5, 6, 7, 8, 9]}) buf = _dataframe(self.version, df, table_name='tbl1', at='a') - self.assertEqual( - buf, - b'tbl1 b=1i 1546300800000000000\n' + - b'tbl1 b=2i 1546300801000000000\n' + - b'tbl1 b=3i 1546300802000000000\n' + - b'tbl1 b=4i 1546300803000000000\n' + - b'tbl1 b=5i 1546300804000000000\n' + - b'tbl1 b=6i 1546300805000000000\n' + + e = self.enc_des_ts_n + exp = ( + f'tbl1 b=1i {e(1546300800000000000)}\n'.encode() + + f'tbl1 b=2i {e(1546300801000000000)}\n'.encode() + + f'tbl1 b=3i {e(1546300802000000000)}\n'.encode() + + f'tbl1 b=4i {e(1546300803000000000)}\n'.encode() + + f'tbl1 b=5i {e(1546300804000000000)}\n'.encode() + + f'tbl1 b=6i {e(1546300805000000000)}\n'.encode() + b'tbl1 b=7i\n' + b'tbl1 b=8i\n' + b'tbl1 b=9i\n') - + self.assertEqual(buf, exp) df = pd.DataFrame({ 'a': pd.Series([ pd.Timestamp('1970-01-01 00:00:00'), @@ -976,9 +982,9 @@ def test_datetime64_numpy_at(self): buf = _dataframe(self.version, df, table_name='tbl1', at='a') self.assertEqual( buf, - b'tbl1 b=1i 0\n' + - b'tbl1 b=2i 1000000000\n' + - b'tbl1 b=3i 2000000000\n') + f'tbl1 b=1i {e(0)}\n'.encode() + + f'tbl1 b=2i {e(1000000000)}\n'.encode() + + f'tbl1 b=3i {e(2000000000)}\n'.encode()) def test_datetime64_tz_arrow_at(self): df = pd.DataFrame({ @@ -995,13 +1001,14 @@ def test_datetime64_tz_arrow_at(self): hour=0, minute=0, second=3, tz=_TZ)], 'b': ['sym1', 'sym2', 'sym3', 'sym4']}) buf = _dataframe(self.version, df, table_name='tbl1', symbols=['b'], at='a') - self.assertEqual( - buf, + e = self.enc_des_ts_n + exp = ( # Note how these are 5hr offset from `test_datetime64_numpy_col`. 
- b'tbl1,b=sym1 1546318800000000000\n' + - b'tbl1,b=sym2 1546318801000000000\n' + + f'tbl1,b=sym1 {e(1546318800000000000)}\n'.encode() + + f'tbl1,b=sym2 {e(1546318801000000000)}\n'.encode() + b'tbl1,b=sym3\n' + - b'tbl1,b=sym4 1546318803000000000\n') + f'tbl1,b=sym4 {e(1546318803000000000)}\n'.encode()) + self.assertEqual(buf, exp) df2 = pd.DataFrame({ 'a': [ diff --git a/test/test_tools.py b/test/test_tools.py index 69da3ae2..09857c4a 100644 --- a/test/test_tools.py +++ b/test/test_tools.py @@ -1,6 +1,8 @@ import struct +import datetime import numpy as np +import questdb.ingress as qi ARRAY_TYPE_TAGS = { np.float64: 10, @@ -50,4 +52,59 @@ def _array_binary_bytes(value: np.ndarray) -> bytes: ndim + shape_bytes + data_body - ) \ No newline at end of file + ) + + +class TimestampEncodingMixin: + def enc_ts_t(self, num): + return f'{num}t' + + def enc_ts_n(self, num, v=None): + protocol_version = v or self.version + if protocol_version == 1: + num = num // 1000 + suffix = 't' + else: + suffix = 'n' + return f'{num}{suffix}' + + def enc_ts(self, ts, v=None): + """encode a non-designated timestamp in ILP""" + if isinstance(ts, datetime.datetime): + return self.enc_ts_t( + qi.TimestampMicros.from_datetime(ts).value) + elif isinstance(ts, qi.TimestampMicros): + return self.enc_ts_t(ts.value) + elif isinstance(ts, qi.TimestampNanos): + return self.enc_ts_n(ts.value, v=v) + else: + raise ValueError(f'unsupported ts {ts!r}') + + def enc_des_ts_t(self, num, v=None): + protocol_version = v or self.version + if protocol_version == 1: + num = num * 1000 + suffix = '' + else: + suffix = 't' + return f'{num}{suffix}' + + def enc_des_ts_n(self, num, v=None): + protocol_version = v or self.version + if protocol_version == 1: + suffix = '' + else: + suffix = 'n' + return f'{num}{suffix}' + + def enc_des_ts(self, ts, v=None): + """encode a designated timestamp in ILP""" + if isinstance(ts, datetime.datetime): + return self.enc_des_ts_t( + qi.TimestampMicros.from_datetime(ts).value, 
v=v) + elif isinstance(ts, qi.TimestampMicros): + return self.enc_des_ts_t(ts.value, v=v) + elif isinstance(ts, qi.TimestampNanos): + return self.enc_des_ts_n(ts.value, v=v) + else: + raise ValueError(f'unsupported ts {ts!r}')