diff --git a/tests/integration/container/test_aurora_failover.py b/tests/integration/container/test_aurora_failover.py index fa6bb2e0..5ddf2a69 100644 --- a/tests/integration/container/test_aurora_failover.py +++ b/tests/integration/container/test_aurora_failover.py @@ -14,6 +14,7 @@ from __future__ import annotations +import gc from time import sleep from typing import TYPE_CHECKING, List @@ -21,6 +22,8 @@ from aws_advanced_python_wrapper.errors import ( FailoverSuccessError, TransactionResolutionUnknownError) +from aws_advanced_python_wrapper.host_monitoring_plugin import \ + MonitoringThreadContainer from aws_advanced_python_wrapper.utils.properties import (Properties, WrapperProperties) from .utils.conditions import (disable_on_features, enable_on_deployments, @@ -56,6 +59,8 @@ def setup_method(self, request): self.logger.info(f"Starting test: {request.node.name}") yield self.logger.info(f"Ending test: {request.node.name}") + MonitoringThreadContainer.clean_up() + gc.collect() @pytest.fixture(scope='class') def aurora_utility(self): @@ -132,6 +137,7 @@ def test_fail_from_writer_to_new_writer_fail_on_connection_bound_object_invocati assert aurora_utility.is_db_instance_writer(current_connection_id) is True assert current_connection_id != initial_writer_id + @pytest.mark.parametrize("plugins", ["failover,host_monitoring", "failover,host_monitoring_v2"]) @enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED, TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED]) def test_fail_from_reader_to_writer( @@ -140,12 +146,13 @@ def test_fail_from_reader_to_writer( test_driver: TestDriver, conn_utils, proxied_props, - aurora_utility): + aurora_utility, + plugins): target_driver_connect = DriverHelper.get_connect_func(test_driver) reader: TestInstanceInfo = test_environment.get_proxy_instances()[1] writer_id: str = test_environment.get_proxy_writer().get_instance_id() - proxied_props["plugins"] = "failover,host_monitoring" + proxied_props["plugins"] = plugins with AwsWrapperConnection.connect( target_driver_connect, **conn_utils.get_proxy_connect_params(reader.get_host()), diff --git a/tests/integration/container/test_basic_connectivity.py b/tests/integration/container/test_basic_connectivity.py index e0c5504f..4de78f57 100644 --- a/tests/integration/container/test_basic_connectivity.py +++ b/tests/integration/container/test_basic_connectivity.py @@ -16,6 +16,9 @@ from typing import TYPE_CHECKING +from aws_advanced_python_wrapper.host_monitoring_plugin import \ + MonitoringThreadContainer + if TYPE_CHECKING: from .utils.test_instance_info import TestInstanceInfo from .utils.test_driver import TestDriver @@ -126,12 +129,13 @@ def test_proxied_wrapper_connection_failed( # That is expected exception. Test pass. assert True + @pytest.mark.parametrize("plugins", ["failover,host_monitoring", "failover,host_monitoring_v2"]) @enable_on_num_instances(min_instances=2) @enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER]) @enable_on_features([TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED]) def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: TestDriver, conn_utils, plugins): props: Properties = Properties({ - WrapperProperties.PLUGINS.name: "host_monitoring", + WrapperProperties.PLUGINS.name: plugins, "socket_timeout": 5, "connect_timeout": 5, "monitoring-connect_timeout": 3, @@ -145,3 +149,5 @@ def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: T assert 1 == result[0] conn.close() + + MonitoringThreadContainer.clean_up() diff --git a/tests/integration/container/test_read_write_splitting.py b/tests/integration/container/test_read_write_splitting.py index d73dabb6..6cf94137 100644 --- a/tests/integration/container/test_read_write_splitting.py +++ b/tests/integration/container/test_read_write_splitting.py @@ -24,6 +24,8 @@ AwsWrapperError, FailoverFailedError, FailoverSuccessError, ReadWriteSplittingError, TransactionResolutionUnknownError) from aws_advanced_python_wrapper.host_list_provider import RdsHostListProvider +from aws_advanced_python_wrapper.host_monitoring_plugin import \ + MonitoringThreadContainer from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider from aws_advanced_python_wrapper.utils.log import Logger @@ -61,6 +63,9 @@ def setup_method(self, request): yield self.logger.info(f"Ending test: {request.node.name}") + MonitoringThreadContainer.clean_up() + gc.collect() + @pytest.fixture(scope='class') def rds_utils(self): region: str = TestEnvironment.get_current().get_info().get_region() @@ -357,15 +362,18 @@ def test_failover_to_new_writer__switch_read_only( current_id = rds_utils.query_instance_id(conn) assert new_writer_id == current_id + @pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring", "read_write_splitting,failover,host_monitoring_v2"]) @enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED, TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED]) @enable_on_num_instances(min_instances=3) @disable_on_engines([DatabaseEngine.MYSQL]) def test_failover_to_new_reader__switch_read_only( self, test_environment: TestEnvironment, test_driver: TestDriver, - proxied_failover_props, conn_utils, rds_utils): + proxied_failover_props, conn_utils, rds_utils, plugins): WrapperProperties.FAILOVER_MODE.set(proxied_failover_props, "reader-or-writer") + WrapperProperties.PLUGINS.set(proxied_failover_props, plugins) + target_driver_connect = DriverHelper.get_connect_func(test_driver) with AwsWrapperConnection.connect( target_driver_connect, **conn_utils.get_proxy_connect_params(), **proxied_failover_props) as conn: @@ -404,13 +412,15 @@ def test_failover_to_new_reader__switch_read_only( current_id = rds_utils.query_instance_id(conn) assert other_reader_id == current_id + @pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring", "read_write_splitting,failover,host_monitoring_v2"]) @enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED, TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED]) @enable_on_num_instances(min_instances=3) @disable_on_engines([DatabaseEngine.MYSQL]) def test_failover_reader_to_writer__switch_read_only( self, test_environment: TestEnvironment, test_driver: TestDriver, - proxied_failover_props, conn_utils, rds_utils): + proxied_failover_props, conn_utils, rds_utils, plugins): + WrapperProperties.PLUGINS.set(proxied_failover_props, plugins) target_driver_connect = DriverHelper.get_connect_func(test_driver) with AwsWrapperConnection.connect( target_driver_connect, **conn_utils.get_proxy_connect_params(), **proxied_failover_props) as conn: @@ -522,12 +532,13 @@ def test_pooled_connection__cluster_url_failover( new_driver_conn = conn.target_connection assert initial_driver_conn is not new_driver_conn + @pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring", "read_write_splitting,failover,host_monitoring_v2"]) @enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED, TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED, TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED]) @disable_on_engines([DatabaseEngine.MYSQL]) def test_pooled_connection__failover_failed( self, test_environment: TestEnvironment, test_driver: TestDriver, - rds_utils, conn_utils, proxied_failover_props): + rds_utils, conn_utils, proxied_failover_props, plugins): writer_host = test_environment.get_writer().get_host() provider = SqlAlchemyPooledConnectionProvider(lambda _, __: {"pool_size": 1}, None, lambda host_info, props: writer_host in host_info.host) ConnectionProviderManager.set_connection_provider(provider) @@ -535,6 +546,7 @@ def test_pooled_connection__failover_failed( WrapperProperties.FAILOVER_TIMEOUT_SEC.set(proxied_failover_props, "1") WrapperProperties.FAILURE_DETECTION_TIME_MS.set(proxied_failover_props, "1000") WrapperProperties.FAILURE_DETECTION_COUNT.set(proxied_failover_props, "1") + WrapperProperties.PLUGINS.set(proxied_failover_props, plugins) target_driver_connect = DriverHelper.get_connect_func(test_driver) with AwsWrapperConnection.connect(