Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 75 additions & 16 deletions .github/workflows/reusable_buildtest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ on:
test_windows:
type: boolean
default: true
description: "Run Windows Build and Test"
description: "Run Windows Build and Test"
test_macintosh:
type: boolean
default: true
description: "Run Mac OS Build"
description: "Run Mac OS Build"
upload_artifacts:
type: boolean
default: true
Expand Down Expand Up @@ -63,6 +63,57 @@ jobs:
--bolt-cert-file=/etc/memgraph/ssl/cert.pem \
--bolt-key-file=/etc/memgraph/ssl/key.pem

- name: Run Memgraph HA Cluster
env:
MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}
run: |
Comment thread
mattkjames7 marked this conversation as resolved.
# High availability is a Memgraph Enterprise feature. The license
# secrets are unavailable on PRs from forks, so skip the cluster (and
# the HA test) with a warning rather than failing the whole build.
if [ -z "$MEMGRAPH_ENTERPRISE_LICENSE" ] || [ -z "$MEMGRAPH_ORGANIZATION_NAME" ]; then
echo "::warning::Memgraph enterprise license secrets are not set; skipping the HA cluster and the get_routing_table HA test."
exit 0
fi

# Two data instances...
for name in mg-data1 mg-data2; do
docker run -d --rm --name "$name" --network pymgclient-network \
-e MEMGRAPH_ENTERPRISE_LICENSE="$MEMGRAPH_ENTERPRISE_LICENSE" \
-e MEMGRAPH_ORGANIZATION_NAME="$MEMGRAPH_ORGANIZATION_NAME" \
memgraph/memgraph:latest \
--bolt-port=7687 \
--management-port=13011 \
--telemetry-enabled=false
done

# ...and three coordinators for a Raft quorum. mg-coord1 is the
# bootstrap leader; the test harness adds the other two coordinators,
# registers the instances and elects a main once the containers are up.
for id in 1 2 3; do
docker run -d --rm --name "mg-coord${id}" --network pymgclient-network \
-e MEMGRAPH_ENTERPRISE_LICENSE="$MEMGRAPH_ENTERPRISE_LICENSE" \
-e MEMGRAPH_ORGANIZATION_NAME="$MEMGRAPH_ORGANIZATION_NAME" \
memgraph/memgraph:latest \
--bolt-port=7687 \
--coordinator-id="${id}" \
--coordinator-port=12121 \
--coordinator-hostname="mg-coord${id}" \
--management-port=13011 \
--telemetry-enabled=false
done

# Fail with a clear message if the cluster didn't come up, rather than
# as an opaque DNS error when the test connects to the coordinator.
sleep 10
for name in mg-data1 mg-data2 mg-coord1 mg-coord2 mg-coord3; do
if [ "$(docker inspect -f '{{.State.Running}}' "$name" 2>/dev/null)" != "true" ]; then
echo "::error::Memgraph HA container $name is not running"
docker logs "$name" || true
exit 1
fi
done

- name: Build Docker Image
run: |
docker buildx build \
Expand All @@ -76,9 +127,9 @@ jobs:
run: |
if [[ "${{ inputs.version }}" != "" ]]; then
echo "Building version ${{ inputs.version }}"
docker run -d --rm -e PYMGCLIENT_OVERRIDE_VERSION=${{ inputs.version }} --name testcontainer --network pymgclient-network pymgclient-builder:latest
docker run -d --rm -e PYMGCLIENT_OVERRIDE_VERSION=${{ inputs.version }} --name testcontainer --network pymgclient-network pymgclient-builder:latest
else
docker run -d --rm --name testcontainer --network pymgclient-network pymgclient-builder:latest
docker run -d --rm --name testcontainer --network pymgclient-network pymgclient-builder:latest
fi

- name: Build Python Wheels
Expand Down Expand Up @@ -108,6 +159,12 @@ jobs:
bash -c "export MEMGRAPH_HOST=memgraph && cd /home/memgraph/pymgclient && python$python_version -m pytest -v"
docker exec -i testcontainer \
bash -c "export MEMGRAPH_STARTED_WITH_SSL=true && export MEMGRAPH_HOST=memgraph-ssl && cd /home/memgraph/pymgclient && python$python_version -m pytest -v"
# Run the HA routing test only when the cluster is up (it is skipped
# when the enterprise license secrets are unavailable, e.g. forks).
if [ "$(docker inspect -f '{{.State.Running}}' mg-coord1 2>/dev/null)" = "true" ]; then
docker exec -i testcontainer \
bash -c "export MEMGRAPH_HA_COORDINATOR_HOST=mg-coord1 && cd /home/memgraph/pymgclient && python$python_version -m pytest -v test/test_connection.py -k get_routing_table_ha"
fi
done

- name: Build docs
Expand Down Expand Up @@ -135,10 +192,13 @@ jobs:
docker wait memgraph || echo "Memgraph container does not exist"
docker stop memgraph-ssl || echo "Memgraph container with SSL does not exist"
docker wait memgraph-ssl || echo "Memgraph container with SSL does not exist"
for name in mg-coord1 mg-coord2 mg-coord3 mg-data1 mg-data2; do
docker stop "$name" || echo "$name does not exist"
done
docker rmi pymgclient-builder:latest || echo "Image does not exist"
docker network rm pymgclient-network || echo "Network does not exist"


build_and_test_windows:
if: ${{ inputs.test_windows }}
name: Build and Test on Windows
Expand Down Expand Up @@ -172,13 +232,13 @@ jobs:
mingw-w64-x86_64-cmake
mingw-w64-x86_64-make
mingw-w64-x86_64-openssl

- name: Add MSYS2 mingw64/bin to PATH
shell: msys2 {0}
run: |
echo "/mingw64/bin" >> $GITHUB_PATH


- name: Set up Windows Python
uses: actions/setup-python@v5
with:
Expand All @@ -191,7 +251,7 @@ jobs:
echo "$pythonLocation" >> $GITHUB_PATH
env:
pythonLocation: ${{ env.pythonLocation }}


- name: Install Python build tools
shell: msys2 {0}
Expand All @@ -200,7 +260,7 @@ jobs:
python -m pip install --upgrade pip setuptools wheel pyopenssl pytest tzdata build
env:
pythonLocation: ${{ env.pythonLocation }}

- name: Build pymgclient Wheel
shell: msys2 {0}
run: |
Expand Down Expand Up @@ -239,25 +299,25 @@ jobs:
sudo apt install -y ./memgraph.deb
openssl req -x509 -newkey rsa:4096 -days 3650 -nodes -keyout key.pem -out cert.pem -subj "/C=GB/ST=London/L=London/O=Testing Corp./CN=PymgclientTest"
nohup /usr/lib/memgraph/memgraph --bolt-port 7687 --bolt-cert-file="cert.pem" --bolt-key-file="key.pem" --data-directory="~/memgraph/data" --storage-properties-on-edges=true --storage-snapshot-interval-sec=0 --storage-wal-enabled=false --storage-snapshot-on-exit=false --telemetry-enabled=false --log-file='' &

# sleep here instead of using script because it just doesn't work in Windows
sleep 3
# sed $'s/\r$//' ./tools/wait_for_memgraph.sh > ./tools/wait_for_memgraph.unix.sh
# bash ./tools/wait_for_memgraph.unix.sh localhost


- name: Run Tests
shell: msys2 {0}
run: |
export PATH="$(cygpath -u "$pythonLocation"):/mingw64/bin:$PATH"
echo $PATH
python -m pytest -v
python -m pytest -v

env:
pythonLocation: ${{ env.pythonLocation }}
MEMGRAPH_HOST: localhost
MEMGRAPH_STARTED_WITH_SSL:


- name: Upload Wheel Artifact
if: ${{ inputs.upload_artifacts && matrix.os == 'windows-2025' }}
Expand Down Expand Up @@ -289,7 +349,7 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true

- name: Set override version if provided
if: ${{ inputs.version != '' }}
run: |
Expand Down Expand Up @@ -346,4 +406,3 @@ jobs:
with:
name: pymgclient-${{ matrix.platform[0] || matrix.platform }}-${{ matrix.python_version }}
path: dist/*.whl

174 changes: 174 additions & 0 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "cursor.h"
#include "exceptions.h"
#include "glue.h"

static void connection_dealloc(ConnectionObject *conn) {
mg_session_destroy(conn->session);
Expand Down Expand Up @@ -338,6 +339,177 @@ int connection_autocommit_set(ConnectionObject *conn, PyObject *value,
return 0;
}

// clang-format off
PyDoc_STRVAR(connection_get_routing_table_doc,
"get_routing_table(routing_context=None, bookmarks=None, extra=None)\n\
--\n\
\n\
Fetch the client-side routing table from a Memgraph coordinator.\n\
\n\
Sends a Bolt ``ROUTE`` message to the server this connection is attached to\n\
(which must be a coordinator of a high-availability cluster) and returns the\n\
current cluster topology. This is a low-level primitive intended for building\n\
client-side routing on top of pymgclient: it only retrieves the table and does\n\
not open any further connections or dispatch queries.\n\
\n\
The negotiated Bolt protocol version must be at least 4.3, and the connection\n\
must be idle (no query in progress and no open transaction).\n\
\n\
* :obj:`routing_context`\n\
\n\
Optional :class:`dict` with routing context (for example the address\n\
used to contact the coordinator). Defaults to an empty map.\n\
\n\
* :obj:`bookmarks`\n\
\n\
Optional iterable of bookmark strings, or :obj:`None` for none.\n\
\n\
* :obj:`extra`\n\
\n\
Optional :class:`dict` with extra information. On Bolt 4.4 it is sent\n\
verbatim; on Bolt 4.3 only its ``\"db\"`` string entry (if present) is\n\
used to populate the database-name field.\n\
\n\
Returns a :class:`dict` of the form::\n\
\n\
{\n\
\"ttl\": <int seconds>,\n\
\"servers\": [\n\
{\"addresses\": [\"host:port\", ...], \"role\": \"READ\"|\"WRITE\"|\"ROUTE\"},\n\
...\n\
]\n\
}");
// clang-format on

static PyObject *connection_get_routing_table(ConnectionObject *conn,
PyObject *args,
PyObject *kwargs) {
static char *kwlist[] = {"routing_context", "bookmarks", "extra", NULL};
PyObject *routing_context = NULL;
PyObject *bookmarks = NULL;
PyObject *extra = NULL;

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kwlist,
&routing_context, &bookmarks, &extra)) {
return NULL;
}

if (connection_raise_if_bad_status(conn) < 0) {
return NULL;
}

if (conn->status != CONN_STATUS_READY) {
PyErr_SetString(InterfaceError,
"cannot get routing table while a query is in progress or "
"a transaction is open");
return NULL;
}

mg_map *mg_routing = NULL;
mg_map *mg_extra = NULL;
mg_list *mg_bookmarks = NULL;

if (routing_context && routing_context != Py_None) {
if (!PyDict_Check(routing_context)) {
PyErr_SetString(PyExc_TypeError, "routing_context must be a dict");
return NULL;
}
mg_routing = py_dict_to_mg_map(routing_context);
if (!mg_routing) {
return NULL;
}
} else {
mg_routing = mg_map_make_empty(0);
if (!mg_routing) {
PyErr_SetString(PyExc_RuntimeError, "couldn't allocate routing map");
return NULL;
}
}

if (extra && extra != Py_None) {
if (!PyDict_Check(extra)) {
PyErr_SetString(PyExc_TypeError, "extra must be a dict");
goto cleanup_error;
}
mg_extra = py_dict_to_mg_map(extra);
if (!mg_extra) {
goto cleanup_error;
}
}

if (bookmarks && bookmarks != Py_None) {
// A str/bytes is itself a sequence, so without this check it would be
// silently iterated into single-character bookmarks.
if (PyUnicode_Check(bookmarks) || PyBytes_Check(bookmarks)) {
PyErr_SetString(PyExc_TypeError,
"bookmarks must be an iterable of str, not a single "
"str or bytes");
goto cleanup_error;
}
PyObject *seq =
PySequence_Fast(bookmarks, "bookmarks must be an iterable of str");
if (!seq) {
goto cleanup_error;
}
Py_ssize_t size = PySequence_Fast_GET_SIZE(seq);
if (size > UINT32_MAX) {
Py_DECREF(seq);
PyErr_SetString(PyExc_ValueError, "bookmarks size exceeded");
goto cleanup_error;
}
mg_bookmarks = mg_list_make_empty((uint32_t)size);
if (!mg_bookmarks) {
Comment thread
mattkjames7 marked this conversation as resolved.
Py_DECREF(seq);
PyErr_SetString(PyExc_RuntimeError, "couldn't allocate bookmarks list");
goto cleanup_error;
}
for (Py_ssize_t i = 0; i < size; ++i) {
PyObject *item = PySequence_Fast_GET_ITEM(seq, i); // borrowed reference
if (!PyUnicode_Check(item)) {
Py_DECREF(seq);
PyErr_SetString(PyExc_TypeError, "bookmarks must be str");
goto cleanup_error;
}
const char *bookmark = PyUnicode_AsUTF8(item);
if (!bookmark) {
Py_DECREF(seq);
goto cleanup_error;
}
mg_value *value = mg_value_make_string(bookmark);
if (!value || mg_list_append(mg_bookmarks, value) != 0) {
mg_value_destroy(value);
Py_DECREF(seq);
PyErr_SetString(PyExc_RuntimeError, "couldn't build bookmarks list");
goto cleanup_error;
}
}
Py_DECREF(seq);
}

mg_map *routing_table = NULL;
int status = mg_session_route(conn->session, mg_routing, mg_bookmarks,
mg_extra, &routing_table);

mg_map_destroy(mg_routing);
mg_map_destroy(mg_extra);
mg_list_destroy(mg_bookmarks);

if (status != 0) {
connection_handle_error(conn, status);
return NULL;
}

PyObject *result = mg_map_to_py_dict(routing_table);
mg_map_destroy(routing_table);
return result;

cleanup_error:
mg_map_destroy(mg_routing);
mg_map_destroy(mg_extra);
mg_list_destroy(mg_bookmarks);
return NULL;
}

static PyMethodDef connection_methods[] = {
{"close", (PyCFunction)connection_close, METH_NOARGS, connection_close_doc},
{"commit", (PyCFunction)connection_commit, METH_NOARGS,
Expand All @@ -346,6 +518,8 @@ static PyMethodDef connection_methods[] = {
connection_rollback_doc},
{"cursor", (PyCFunction)connection_cursor, METH_NOARGS,
connection_cursor_doc},
{"get_routing_table", (PyCFunction)connection_get_routing_table,
METH_VARARGS | METH_KEYWORDS, connection_get_routing_table_doc},
{NULL, NULL, 0, NULL}};

// clang-format off
Expand Down
2 changes: 2 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ extern PyTypeObject ConnectionType;

int connection_raise_if_bad_status(const ConnectionObject *conn);

void connection_handle_error(ConnectionObject *conn, int error);

int connection_run_without_results(ConnectionObject *conn, const char *query);

int connection_run(ConnectionObject *conn, const char *query, PyObject *params,
Expand Down
Loading
Loading