diff --git a/.github/workflows/reusable_buildtest.yml b/.github/workflows/reusable_buildtest.yml index 2c84c26..9a00e53 100644 --- a/.github/workflows/reusable_buildtest.yml +++ b/.github/workflows/reusable_buildtest.yml @@ -11,11 +11,11 @@ on: test_windows: type: boolean default: true - description: "Run Windows Build and Test" + description: "Run Windows Build and Test" test_macintosh: type: boolean default: true - description: "Run Mac OS Build" + description: "Run Mac OS Build" upload_artifacts: type: boolean default: true @@ -63,6 +63,57 @@ jobs: --bolt-cert-file=/etc/memgraph/ssl/cert.pem \ --bolt-key-file=/etc/memgraph/ssl/key.pem + - name: Run Memgraph HA Cluster + env: + MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }} + MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }} + run: | + # High availability is a Memgraph Enterprise feature. The license + # secrets are unavailable on PRs from forks, so skip the cluster (and + # the HA test) with a warning rather than failing the whole build. + if [ -z "$MEMGRAPH_ENTERPRISE_LICENSE" ] || [ -z "$MEMGRAPH_ORGANIZATION_NAME" ]; then + echo "::warning::Memgraph enterprise license secrets are not set; skipping the HA cluster and the get_routing_table HA test." + exit 0 + fi + + # Two data instances... + for name in mg-data1 mg-data2; do + docker run -d --rm --name "$name" --network pymgclient-network \ + -e MEMGRAPH_ENTERPRISE_LICENSE="$MEMGRAPH_ENTERPRISE_LICENSE" \ + -e MEMGRAPH_ORGANIZATION_NAME="$MEMGRAPH_ORGANIZATION_NAME" \ + memgraph/memgraph:latest \ + --bolt-port=7687 \ + --management-port=13011 \ + --telemetry-enabled=false + done + + # ...and three coordinators for a Raft quorum. mg-coord1 is the + # bootstrap leader; the test harness adds the other two coordinators, + # registers the instances and elects a main once the containers are up. + for id in 1 2 3; do + docker run -d --rm --name "mg-coord${id}" --network pymgclient-network \ + -e MEMGRAPH_ENTERPRISE_LICENSE="$MEMGRAPH_ENTERPRISE_LICENSE" \ + -e MEMGRAPH_ORGANIZATION_NAME="$MEMGRAPH_ORGANIZATION_NAME" \ + memgraph/memgraph:latest \ + --bolt-port=7687 \ + --coordinator-id="${id}" \ + --coordinator-port=12121 \ + --coordinator-hostname="mg-coord${id}" \ + --management-port=13011 \ + --telemetry-enabled=false + done + + # Fail with a clear message if the cluster didn't come up, rather than + # as an opaque DNS error when the test connects to the coordinator. + sleep 10 + for name in mg-data1 mg-data2 mg-coord1 mg-coord2 mg-coord3; do + if [ "$(docker inspect -f '{{.State.Running}}' "$name" 2>/dev/null)" != "true" ]; then + echo "::error::Memgraph HA container $name is not running" + docker logs "$name" || true + exit 1 + fi + done + - name: Build Docker Image run: | docker buildx build \ @@ -76,9 +127,9 @@ jobs: run: | if [[ "${{ inputs.version }}" != "" ]]; then echo "Building version ${{ inputs.version }}" - docker run -d --rm -e PYMGCLIENT_OVERRIDE_VERSION=${{ inputs.version }} --name testcontainer --network pymgclient-network pymgclient-builder:latest + docker run -d --rm -e PYMGCLIENT_OVERRIDE_VERSION=${{ inputs.version }} --name testcontainer --network pymgclient-network pymgclient-builder:latest else - docker run -d --rm --name testcontainer --network pymgclient-network pymgclient-builder:latest + docker run -d --rm --name testcontainer --network pymgclient-network pymgclient-builder:latest fi - name: Build Python Wheels @@ -108,6 +159,12 @@ jobs: bash -c "export MEMGRAPH_HOST=memgraph && cd /home/memgraph/pymgclient && python$python_version -m pytest -v" docker exec -i testcontainer \ bash -c "export MEMGRAPH_STARTED_WITH_SSL=true && export MEMGRAPH_HOST=memgraph-ssl && cd /home/memgraph/pymgclient && python$python_version -m pytest -v" + # Run the HA routing test only when the cluster is up (it is skipped + # when the enterprise license secrets are unavailable, e.g. forks). + if [ "$(docker inspect -f '{{.State.Running}}' mg-coord1 2>/dev/null)" = "true" ]; then + docker exec -i testcontainer \ + bash -c "export MEMGRAPH_HA_COORDINATOR_HOST=mg-coord1 && cd /home/memgraph/pymgclient && python$python_version -m pytest -v test/test_connection.py -k get_routing_table_ha" + fi done - name: Build docs @@ -135,10 +192,13 @@ jobs: docker wait memgraph || echo "Memgraph container does not exist" docker stop memgraph-ssl || echo "Memgraph container with SSL does not exist" docker wait memgraph-ssl || echo "Memgraph container with SSL does not exist" + for name in mg-coord1 mg-coord2 mg-coord3 mg-data1 mg-data2; do + docker stop "$name" || echo "$name does not exist" + done docker rmi pymgclient-builder:latest || echo "Image does not exist" docker network rm pymgclient-network || echo "Network does not exist" - + build_and_test_windows: if: ${{ inputs.test_windows }} name: Build and Test on Windows @@ -172,13 +232,13 @@ jobs: mingw-w64-x86_64-cmake mingw-w64-x86_64-make mingw-w64-x86_64-openssl - + - name: Add MSYS2 mingw64/bin to PATH shell: msys2 {0} run: | echo "/mingw64/bin" >> $GITHUB_PATH - - + + - name: Set up Windows Python uses: actions/setup-python@v5 with: @@ -191,7 +251,7 @@ jobs: echo "$pythonLocation" >> $GITHUB_PATH env: pythonLocation: ${{ env.pythonLocation }} - + - name: Install Python build tools shell: msys2 {0} @@ -200,7 +260,7 @@ jobs: python -m pip install --upgrade pip setuptools wheel pyopenssl pytest tzdata build env: pythonLocation: ${{ env.pythonLocation }} - + - name: Build pymgclient Wheel shell: msys2 {0} run: | @@ -239,25 +299,25 @@ jobs: sudo apt install -y ./memgraph.deb openssl req -x509 -newkey rsa:4096 -days 3650 -nodes -keyout key.pem -out cert.pem -subj "/C=GB/ST=London/L=London/O=Testing Corp./CN=PymgclientTest" nohup /usr/lib/memgraph/memgraph --bolt-port 7687 --bolt-cert-file="cert.pem" --bolt-key-file="key.pem" --data-directory="~/memgraph/data" --storage-properties-on-edges=true --storage-snapshot-interval-sec=0 --storage-wal-enabled=false --storage-snapshot-on-exit=false --telemetry-enabled=false --log-file='' & - + # sleep here instead of using script because it just doesn't work in Windows sleep 3 # sed $'s/\r$//' ./tools/wait_for_memgraph.sh > ./tools/wait_for_memgraph.unix.sh # bash ./tools/wait_for_memgraph.unix.sh localhost - + - name: Run Tests shell: msys2 {0} run: | export PATH="$(cygpath -u "$pythonLocation"):/mingw64/bin:$PATH" echo $PATH - python -m pytest -v + python -m pytest -v env: pythonLocation: ${{ env.pythonLocation }} MEMGRAPH_HOST: localhost MEMGRAPH_STARTED_WITH_SSL: - + - name: Upload Wheel Artifact if: ${{ inputs.upload_artifacts && matrix.os == 'windows-2025' }} @@ -289,7 +349,7 @@ jobs: - uses: actions/checkout@v2 with: submodules: true - + - name: Set override version if provided if: ${{ inputs.version != '' }} run: | @@ -346,4 +406,3 @@ jobs: with: name: pymgclient-${{ matrix.platform[0] || matrix.platform }}-${{ matrix.python_version }} path: dist/*.whl - diff --git a/mgclient b/mgclient index 55cc7f5..6c6ba96 160000 --- a/mgclient +++ b/mgclient @@ -1 +1 @@ -Subproject commit 55cc7f5fa291d649e4965a08751a4e05ed75696a +Subproject commit 6c6ba9604060d8cecc6ddb472132fcf56007d6a1 diff --git a/src/connection.c b/src/connection.c index 8618333..7a96198 100644 --- a/src/connection.c +++ b/src/connection.c @@ -18,6 +18,7 @@ #include "cursor.h" #include "exceptions.h" +#include "glue.h" static void connection_dealloc(ConnectionObject *conn) { mg_session_destroy(conn->session); @@ -338,6 +339,177 @@ int connection_autocommit_set(ConnectionObject *conn, PyObject *value, return 0; } +// clang-format off +PyDoc_STRVAR(connection_get_routing_table_doc, +"get_routing_table(routing_context=None, bookmarks=None, extra=None)\n\ +--\n\ +\n\ +Fetch the client-side routing table from a Memgraph coordinator.\n\ +\n\ +Sends a Bolt ``ROUTE`` message to the server this connection is attached to\n\ +(which must be a coordinator of a high-availability cluster) and returns the\n\ +current cluster topology. This is a low-level primitive intended for building\n\ +client-side routing on top of pymgclient: it only retrieves the table and does\n\ +not open any further connections or dispatch queries.\n\ +\n\ +The negotiated Bolt protocol version must be at least 4.3, and the connection\n\ +must be idle (no query in progress and no open transaction).\n\ +\n\ + * :obj:`routing_context`\n\ +\n\ + Optional :class:`dict` with routing context (for example the address\n\ + used to contact the coordinator). Defaults to an empty map.\n\ +\n\ + * :obj:`bookmarks`\n\ +\n\ + Optional iterable of bookmark strings, or :obj:`None` for none.\n\ +\n\ + * :obj:`extra`\n\ +\n\ + Optional :class:`dict` with extra information. On Bolt 4.4 it is sent\n\ + verbatim; on Bolt 4.3 only its ``\"db\"`` string entry (if present) is\n\ + used to populate the database-name field.\n\ +\n\ +Returns a :class:`dict` of the form::\n\ +\n\ + {\n\ + \"ttl\": ,\n\ + \"servers\": [\n\ + {\"addresses\": [\"host:port\", ...], \"role\": \"READ\"|\"WRITE\"|\"ROUTE\"},\n\ + ...\n\ + ]\n\ + }"); +// clang-format on + +static PyObject *connection_get_routing_table(ConnectionObject *conn, + PyObject *args, + PyObject *kwargs) { + static char *kwlist[] = {"routing_context", "bookmarks", "extra", NULL}; + PyObject *routing_context = NULL; + PyObject *bookmarks = NULL; + PyObject *extra = NULL; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kwlist, + &routing_context, &bookmarks, &extra)) { + return NULL; + } + + if (connection_raise_if_bad_status(conn) < 0) { + return NULL; + } + + if (conn->status != CONN_STATUS_READY) { + PyErr_SetString(InterfaceError, + "cannot get routing table while a query is in progress or " + "a transaction is open"); + return NULL; + } + + mg_map *mg_routing = NULL; + mg_map *mg_extra = NULL; + mg_list *mg_bookmarks = NULL; + + if (routing_context && routing_context != Py_None) { + if (!PyDict_Check(routing_context)) { + PyErr_SetString(PyExc_TypeError, "routing_context must be a dict"); + return NULL; + } + mg_routing = py_dict_to_mg_map(routing_context); + if (!mg_routing) { + return NULL; + } + } else { + mg_routing = mg_map_make_empty(0); + if (!mg_routing) { + PyErr_SetString(PyExc_RuntimeError, "couldn't allocate routing map"); + return NULL; + } + } + + if (extra && extra != Py_None) { + if (!PyDict_Check(extra)) { + PyErr_SetString(PyExc_TypeError, "extra must be a dict"); + goto cleanup_error; + } + mg_extra = py_dict_to_mg_map(extra); + if (!mg_extra) { + goto cleanup_error; + } + } + + if (bookmarks && bookmarks != Py_None) { + // A str/bytes is itself a sequence, so without this check it would be + // silently iterated into single-character bookmarks. + if (PyUnicode_Check(bookmarks) || PyBytes_Check(bookmarks)) { + PyErr_SetString(PyExc_TypeError, + "bookmarks must be an iterable of str, not a single " + "str or bytes"); + goto cleanup_error; + } + PyObject *seq = + PySequence_Fast(bookmarks, "bookmarks must be an iterable of str"); + if (!seq) { + goto cleanup_error; + } + Py_ssize_t size = PySequence_Fast_GET_SIZE(seq); + if (size > UINT32_MAX) { + Py_DECREF(seq); + PyErr_SetString(PyExc_ValueError, "bookmarks size exceeded"); + goto cleanup_error; + } + mg_bookmarks = mg_list_make_empty((uint32_t)size); + if (!mg_bookmarks) { + Py_DECREF(seq); + PyErr_SetString(PyExc_RuntimeError, "couldn't allocate bookmarks list"); + goto cleanup_error; + } + for (Py_ssize_t i = 0; i < size; ++i) { + PyObject *item = PySequence_Fast_GET_ITEM(seq, i); // borrowed reference + if (!PyUnicode_Check(item)) { + Py_DECREF(seq); + PyErr_SetString(PyExc_TypeError, "bookmarks must be str"); + goto cleanup_error; + } + const char *bookmark = PyUnicode_AsUTF8(item); + if (!bookmark) { + Py_DECREF(seq); + goto cleanup_error; + } + mg_value *value = mg_value_make_string(bookmark); + if (!value || mg_list_append(mg_bookmarks, value) != 0) { + mg_value_destroy(value); + Py_DECREF(seq); + PyErr_SetString(PyExc_RuntimeError, "couldn't build bookmarks list"); + goto cleanup_error; + } + } + Py_DECREF(seq); + } + + mg_map *routing_table = NULL; + int status = mg_session_route(conn->session, mg_routing, mg_bookmarks, + mg_extra, &routing_table); + + mg_map_destroy(mg_routing); + mg_map_destroy(mg_extra); + mg_list_destroy(mg_bookmarks); + + if (status != 0) { + connection_handle_error(conn, status); + return NULL; + } + + PyObject *result = mg_map_to_py_dict(routing_table); + mg_map_destroy(routing_table); + return result; + +cleanup_error: + mg_map_destroy(mg_routing); + mg_map_destroy(mg_extra); + mg_list_destroy(mg_bookmarks); + return NULL; +} + static PyMethodDef connection_methods[] = { {"close", (PyCFunction)connection_close, METH_NOARGS, connection_close_doc}, {"commit", (PyCFunction)connection_commit, METH_NOARGS, @@ -346,6 +518,8 @@ static PyMethodDef connection_methods[] = { connection_rollback_doc}, {"cursor", (PyCFunction)connection_cursor, METH_NOARGS, connection_cursor_doc}, + {"get_routing_table", (PyCFunction)connection_get_routing_table, + METH_VARARGS | METH_KEYWORDS, connection_get_routing_table_doc}, {NULL, NULL, 0, NULL}}; // clang-format off diff --git a/src/connection.h b/src/connection.h index 9b9bdd8..8cb78b6 100644 --- a/src/connection.h +++ b/src/connection.h @@ -42,6 +42,8 @@ extern PyTypeObject ConnectionType; int connection_raise_if_bad_status(const ConnectionObject *conn); +void connection_handle_error(ConnectionObject *conn, int error); + int connection_run_without_results(ConnectionObject *conn, const char *query); int connection_run(ConnectionObject *conn, const char *query, PyObject *params, diff --git a/src/glue.h b/src/glue.h index 1fe2db4..5003a66 100644 --- a/src/glue.h +++ b/src/glue.h @@ -25,6 +25,8 @@ PyObject *mg_list_to_py_list(const mg_list *list); PyObject *mg_value_to_py_object(const mg_value *value); +PyObject *mg_map_to_py_dict(const mg_map *map); + mg_map *py_dict_to_mg_map(PyObject *dict); mg_value *py_object_to_mg_value(PyObject *object); diff --git a/test/common.py b/test/common.py index 4206493..b46419c 100644 --- a/test/common.py +++ b/test/common.py @@ -24,6 +24,11 @@ MEMGRAPH_PORT = int(os.getenv("MEMGRAPH_PORT", 7687)) MEMGRAPH_HOST = os.getenv("MEMGRAPH_HOST", None) MEMGRAPH_STARTED_WITH_SSL = os.getenv("MEMGRAPH_STARTED_WITH_SSL", None) + +# Host/port of a coordinator in a Memgraph high-availability cluster. When set, +# the HA routing tests run against it; otherwise they are skipped. +MEMGRAPH_HA_COORDINATOR_HOST = os.getenv("MEMGRAPH_HA_COORDINATOR_HOST", None) +MEMGRAPH_HA_COORDINATOR_PORT = int(os.getenv("MEMGRAPH_HA_COORDINATOR_PORT", 7687)) DURABILITY_DIR = tempfile.TemporaryDirectory() PYTHON_VERSION = sys.version_info[:2] @@ -54,6 +59,11 @@ def wait_for_server(port): reason="requires insecure connection", ) +requires_ha_cluster = pytest.mark.skipif( + MEMGRAPH_HA_COORDINATOR_HOST is None, + reason="requires a Memgraph HA cluster (set MEMGRAPH_HA_COORDINATOR_HOST)", +) + class Memgraph: def __init__(self, host, port, use_ssl, process): diff --git a/test/test_connection.py b/test/test_connection.py index adfb8a1..282c1f2 100644 --- a/test/test_connection.py +++ b/test/test_connection.py @@ -15,8 +15,17 @@ import mgclient import pytest import tempfile - -from common import start_memgraph, Memgraph, requires_ssl_enabled, requires_ssl_disabled +import time + +from common import ( + start_memgraph, + Memgraph, + requires_ssl_enabled, + requires_ssl_disabled, + requires_ha_cluster, + MEMGRAPH_HA_COORDINATOR_HOST, + MEMGRAPH_HA_COORDINATOR_PORT, +) from OpenSSL import crypto @@ -82,6 +91,134 @@ def test_connect_args_validation(): ) +@requires_ssl_disabled +def test_get_routing_table_args_validation(memgraph_server): + host, port, sslmode, _ = memgraph_server + conn = mgclient.connect(host=host, port=port, sslmode=sslmode) + + # routing_context must be a dict + with pytest.raises(TypeError): + conn.get_routing_table(routing_context=["not", "a", "dict"]) + + # extra must be a dict + with pytest.raises(TypeError): + conn.get_routing_table(extra=42) + + # bookmarks must be an iterable of str + with pytest.raises(TypeError): + conn.get_routing_table(bookmarks=[1, 2, 3]) + + # a single str/bytes must be rejected rather than iterated char by char + with pytest.raises(TypeError): + conn.get_routing_table(bookmarks="single-bookmark") + with pytest.raises(TypeError): + conn.get_routing_table(bookmarks=b"single-bookmark") + + +@requires_ssl_disabled +def test_get_routing_table_closed_connection(memgraph_server): + host, port, sslmode, _ = memgraph_server + conn = mgclient.connect(host=host, port=port, sslmode=sslmode) + conn.close() + + with pytest.raises(mgclient.InterfaceError): + conn.get_routing_table() + + +# Topology created by the "Run Memgraph HA Cluster" CI step. The fixture below +# must agree with the container names and ports used there. +HA_COORDINATORS = ["mg-coord1", "mg-coord2", "mg-coord3"] +HA_DATA_INSTANCES = [("instance_1", "mg-data1"), ("instance_2", "mg-data2")] +HA_MAIN = "instance_1" +HA_BOLT_PORT = 7687 +HA_COORDINATOR_PORT = 12121 +HA_MANAGEMENT_PORT = 13011 +HA_REPLICATION_PORT = 10000 + + +def _ha_admin(conn, query): + """Run a coordinator admin query, tolerating idempotent re-runs.""" + cursor = conn.cursor() + try: + cursor.execute(query) + try: + cursor.fetchall() + except mgclient.Error: + pass + except mgclient.DatabaseError as exc: + # Re-running setup against an already-configured cluster is fine. + print(f"HA setup query ignored error: {exc}") + + +@pytest.fixture(scope="module") +def ha_cluster(): + host = MEMGRAPH_HA_COORDINATOR_HOST + port = MEMGRAPH_HA_COORDINATOR_PORT + + conn = mgclient.connect(host=host, port=port) + conn.autocommit = True + + # mg-coord1 is the bootstrap coordinator; add the remaining ones. + for cid, name in enumerate(HA_COORDINATORS, start=1): + if cid == 1: + continue + _ha_admin( + conn, + f'ADD COORDINATOR {cid} WITH CONFIG ' + f'{{"bolt_server": "{name}:{HA_BOLT_PORT}", ' + f'"coordinator_server": "{name}:{HA_COORDINATOR_PORT}", ' + f'"management_server": "{name}:{HA_MANAGEMENT_PORT}"}}', + ) + + for name, data_host in HA_DATA_INSTANCES: + _ha_admin( + conn, + f'REGISTER INSTANCE {name} WITH CONFIG ' + f'{{"bolt_server": "{data_host}:{HA_BOLT_PORT}", ' + f'"management_server": "{data_host}:{HA_MANAGEMENT_PORT}", ' + f'"replication_server": "{data_host}:{HA_REPLICATION_PORT}"}}', + ) + + _ha_admin(conn, f"SET INSTANCE {HA_MAIN} TO MAIN") + + # Wait for the cluster to converge and advertise all roles: the main + # (WRITE), the replica (READ) and the coordinators (ROUTE). + timeout = 60 + for _ in range(timeout): + table = conn.get_routing_table() + roles = {server["role"] for server in table["servers"]} + if {"READ", "WRITE", "ROUTE"} <= roles: + break + time.sleep(1) + else: + conn.close() + raise RuntimeError(f"HA cluster did not converge: {table}") + + yield host, port + + conn.close() + + +@requires_ha_cluster +def test_get_routing_table_ha(ha_cluster): + host, port = ha_cluster + conn = mgclient.connect(host=host, port=port) + + table = conn.get_routing_table() + + assert isinstance(table["ttl"], int) + assert table["servers"] + + # The cluster has a main (WRITE), a replica (READ) and coordinators (ROUTE), + # so all three roles must be present. + roles = {server["role"] for server in table["servers"]} + assert roles == {"READ", "WRITE", "ROUTE"} + + for server in table["servers"]: + assert isinstance(server["addresses"], list) + assert server["addresses"] + + @requires_ssl_disabled def test_connect_insecure_success(memgraph_server): host, port, sslmode, _ = memgraph_server