Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions tests/integration/container/test_aurora_failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

from __future__ import annotations

import gc
from time import sleep
from typing import TYPE_CHECKING, List

import pytest

from aws_advanced_python_wrapper.errors import (
FailoverSuccessError, TransactionResolutionUnknownError)
from aws_advanced_python_wrapper.host_monitoring_plugin import \
MonitoringThreadContainer
from aws_advanced_python_wrapper.utils.properties import (Properties,
WrapperProperties)
from .utils.conditions import (disable_on_features, enable_on_deployments,
Expand Down Expand Up @@ -56,6 +59,8 @@ def setup_method(self, request):
self.logger.info(f"Starting test: {request.node.name}")
yield
self.logger.info(f"Ending test: {request.node.name}")
MonitoringThreadContainer.clean_up()
gc.collect()

@pytest.fixture(scope='class')
def aurora_utility(self):
Expand Down Expand Up @@ -132,6 +137,7 @@ def test_fail_from_writer_to_new_writer_fail_on_connection_bound_object_invocati
assert aurora_utility.is_db_instance_writer(current_connection_id) is True
assert current_connection_id != initial_writer_id

@pytest.mark.parametrize("plugins", ["failover,host_monitoring", "failover,host_monitoring_v2"])
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
def test_fail_from_reader_to_writer(
Expand All @@ -140,12 +146,13 @@ def test_fail_from_reader_to_writer(
test_driver: TestDriver,
conn_utils,
proxied_props,
aurora_utility):
aurora_utility,
plugins):
target_driver_connect = DriverHelper.get_connect_func(test_driver)
reader: TestInstanceInfo = test_environment.get_proxy_instances()[1]
writer_id: str = test_environment.get_proxy_writer().get_instance_id()

proxied_props["plugins"] = "failover,host_monitoring"
proxied_props["plugins"] = plugins
with AwsWrapperConnection.connect(
target_driver_connect,
**conn_utils.get_proxy_connect_params(reader.get_host()),
Expand Down
8 changes: 7 additions & 1 deletion tests/integration/container/test_basic_connectivity.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

from typing import TYPE_CHECKING

from aws_advanced_python_wrapper.host_monitoring_plugin import \
MonitoringThreadContainer

if TYPE_CHECKING:
from .utils.test_instance_info import TestInstanceInfo
from .utils.test_driver import TestDriver
Expand Down Expand Up @@ -126,12 +129,13 @@ def test_proxied_wrapper_connection_failed(
# That is expected exception. Test pass.
assert True

@pytest.mark.parametrize("plugins", ["failover,host_monitoring", "failover,host_monitoring_v2"])
@enable_on_num_instances(min_instances=2)
@enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER])
@enable_on_features([TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: TestDriver, conn_utils, plugins):
props: Properties = Properties({
WrapperProperties.PLUGINS.name: "host_monitoring",
WrapperProperties.PLUGINS.name: plugins,
"socket_timeout": 5,
"connect_timeout": 5,
"monitoring-connect_timeout": 3,
Expand All @@ -145,3 +149,5 @@ def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: T
assert 1 == result[0]

conn.close()

MonitoringThreadContainer.clean_up()
18 changes: 15 additions & 3 deletions tests/integration/container/test_read_write_splitting.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
AwsWrapperError, FailoverFailedError, FailoverSuccessError,
ReadWriteSplittingError, TransactionResolutionUnknownError)
from aws_advanced_python_wrapper.host_list_provider import RdsHostListProvider
from aws_advanced_python_wrapper.host_monitoring_plugin import \
MonitoringThreadContainer
from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \
SqlAlchemyPooledConnectionProvider
from aws_advanced_python_wrapper.utils.log import Logger
Expand Down Expand Up @@ -61,6 +63,9 @@ def setup_method(self, request):
yield
self.logger.info(f"Ending test: {request.node.name}")

MonitoringThreadContainer.clean_up()
gc.collect()

@pytest.fixture(scope='class')
def rds_utils(self):
region: str = TestEnvironment.get_current().get_info().get_region()
Expand Down Expand Up @@ -357,15 +362,18 @@ def test_failover_to_new_writer__switch_read_only(
current_id = rds_utils.query_instance_id(conn)
assert new_writer_id == current_id

@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring", "read_write_splitting,failover,host_monitoring_v2"])
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
@enable_on_num_instances(min_instances=3)
@disable_on_engines([DatabaseEngine.MYSQL])
def test_failover_to_new_reader__switch_read_only(
self, test_environment: TestEnvironment, test_driver: TestDriver,
proxied_failover_props, conn_utils, rds_utils):
proxied_failover_props, conn_utils, rds_utils, plugins):
WrapperProperties.FAILOVER_MODE.set(proxied_failover_props, "reader-or-writer")

WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)

target_driver_connect = DriverHelper.get_connect_func(test_driver)
with AwsWrapperConnection.connect(
target_driver_connect, **conn_utils.get_proxy_connect_params(), **proxied_failover_props) as conn:
Expand Down Expand Up @@ -404,13 +412,15 @@ def test_failover_to_new_reader__switch_read_only(
current_id = rds_utils.query_instance_id(conn)
assert other_reader_id == current_id

@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring", "read_write_splitting,failover,host_monitoring_v2"])
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
@enable_on_num_instances(min_instances=3)
@disable_on_engines([DatabaseEngine.MYSQL])
def test_failover_reader_to_writer__switch_read_only(
self, test_environment: TestEnvironment, test_driver: TestDriver,
proxied_failover_props, conn_utils, rds_utils):
proxied_failover_props, conn_utils, rds_utils, plugins):
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
target_driver_connect = DriverHelper.get_connect_func(test_driver)
with AwsWrapperConnection.connect(
target_driver_connect, **conn_utils.get_proxy_connect_params(), **proxied_failover_props) as conn:
Expand Down Expand Up @@ -522,19 +532,21 @@ def test_pooled_connection__cluster_url_failover(
new_driver_conn = conn.target_connection
assert initial_driver_conn is not new_driver_conn

@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring", "read_write_splitting,failover,host_monitoring_v2"])
@enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED, TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
@disable_on_engines([DatabaseEngine.MYSQL])
def test_pooled_connection__failover_failed(
self, test_environment: TestEnvironment, test_driver: TestDriver,
rds_utils, conn_utils, proxied_failover_props):
rds_utils, conn_utils, proxied_failover_props, plugins):
writer_host = test_environment.get_writer().get_host()
provider = SqlAlchemyPooledConnectionProvider(lambda _, __: {"pool_size": 1}, None, lambda host_info, props: writer_host in host_info.host)
ConnectionProviderManager.set_connection_provider(provider)

WrapperProperties.FAILOVER_TIMEOUT_SEC.set(proxied_failover_props, "1")
WrapperProperties.FAILURE_DETECTION_TIME_MS.set(proxied_failover_props, "1000")
WrapperProperties.FAILURE_DETECTION_COUNT.set(proxied_failover_props, "1")
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)

target_driver_connect = DriverHelper.get_connect_func(test_driver)
with AwsWrapperConnection.connect(
Expand Down