|  | 
|  | 1 | +from testcontainers.core.container import DockerContainer | 
|  | 2 | +from testcontainers.core.waiting_utils import wait_for_logs, wait_container_is_ready | 
|  | 3 | +import os | 
|  | 4 | +import ssl | 
|  | 5 | +import socket | 
|  | 6 | +from typing import Iterable, Callable | 
|  | 7 | +from typing_extensions import Self | 
|  | 8 | +from azure.cosmos import CosmosClient as SyncCosmosClient | 
|  | 9 | +from azure.cosmos.aio import CosmosClient as AsyncCosmosClient | 
|  | 10 | +from azure.core.exceptions import ServiceRequestError | 
|  | 11 | + | 
|  | 12 | +from urllib.request import urlopen | 
|  | 13 | +from urllib.error import HTTPError, URLError | 
|  | 14 | + | 
|  | 15 | +from enum import Enum, auto | 
|  | 16 | + | 
|  | 17 | +__all__ = ["CosmosDBEmulatorContainer", "Endpoints"] | 
|  | 18 | + | 
|  | 19 | +class Endpoints(Enum): | 
|  | 20 | +	Direct    = auto() | 
|  | 21 | +	Gremlin   = auto() | 
|  | 22 | +	Table     = auto() | 
|  | 23 | +	MongoDB   = auto() | 
|  | 24 | +	Cassandra = auto() | 
|  | 25 | + | 
|  | 26 | +ALL_ENDPOINTS = { e for e in Endpoints } | 
|  | 27 | + | 
|  | 28 | +# Ports mostly derived from https://docs.microsoft.com/en-us/azure/cosmos-db/emulator-command-line-parameters | 
|  | 29 | +EMULATOR_PORT = 8081 | 
|  | 30 | +endpoint_ports = { | 
|  | 31 | +	Endpoints.Direct   : frozenset([10251, 10252, 10253, 10254]), | 
|  | 32 | +	Endpoints.Gremlin  : frozenset([8901]), | 
|  | 33 | +	Endpoints.Table    : frozenset([8902]), | 
|  | 34 | +	Endpoints.MongoDB  : frozenset([10255]), | 
|  | 35 | +	Endpoints.Cassandra: frozenset([10350]), | 
|  | 36 | +} | 
|  | 37 | + | 
|  | 38 | +def is_truthy_string(s: str): | 
|  | 39 | +	return s.lower().strip() in {"true", "yes", "y", "1"} | 
|  | 40 | + | 
|  | 41 | +class CosmosDBEmulatorContainer(DockerContainer): | 
|  | 42 | +	""" | 
|  | 43 | +    CosmosDB Emulator container. | 
|  | 44 | +
 | 
|  | 45 | +    Example: | 
|  | 46 | +
 | 
|  | 47 | +			.. doctest:: | 
|  | 48 | +				>>> from testcontainers.cosmosdb import CosmosDBEmulatorContainer | 
|  | 49 | +				>>> with CosmosDBEmulatorContainer() as cosmosdb: | 
|  | 50 | +				...    db = cosmosdb.sync_client().create_database_if_not_exists("test") | 
|  | 51 | +
 | 
|  | 52 | +			.. doctest:: | 
|  | 53 | +				>>> from testcontainers.cosmosdb import CosmosDBEmulatorContainer | 
|  | 54 | +				>>> with CosmosDBEmulatorContainer() as emulator: | 
|  | 55 | +				...    cosmosdb = CosmosClient(url=emulator.url, credential=emulator.key, connection_verify=False) | 
|  | 56 | +				...    db = cosmosdb.create_database_if_not_exists("test") | 
|  | 57 | +
 | 
|  | 58 | +			.. doctest:: | 
|  | 59 | +				>>> from testcontainers.cosmosdb import CosmosDBEmulatorContainer, Endpoints | 
|  | 60 | +				>>> with CosmosDBEmulatorContainer(endpoints=[Endpoints.MongoDB]) as emulator: | 
|  | 61 | +				...    print(f"Point yout MongoDB client to {emulator.host}:{emulator.ports(Endpoints.MongoDB)[0]}") | 
|  | 62 | +	""" | 
|  | 63 | +	def __init__( | 
|  | 64 | +			self, | 
|  | 65 | +			image: str                     = os.getenv("AZURE_COSMOS_EMULATOR_IMAGE", "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest"), | 
|  | 66 | +			partition_count: int           = os.getenv("AZURE_COSMOS_EMULATOR_PARTITION_COUNT", None), | 
|  | 67 | +			enable_data_persistence: bool  = is_truthy_string(os.getenv("AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE", "false")), | 
|  | 68 | +			bind_ports: bool               = is_truthy_string(os.getenv("AZURE_COSMOS_EMULATOR_BIND_PORTS", "true")), | 
|  | 69 | +			key: str                       = os.getenv("AZURE_COSMOS_EMULATOR_KEY", "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="), | 
|  | 70 | +			endpoints: Iterable[Endpoints] = ALL_ENDPOINTS, # the emulator image does not support host-container port mapping | 
|  | 71 | +			**docker_client_kw, | 
|  | 72 | +		): | 
|  | 73 | +		super().__init__(image=image, **docker_client_kw) | 
|  | 74 | + | 
|  | 75 | +		self.partition_count = partition_count | 
|  | 76 | +		self.key = key | 
|  | 77 | +		self.enable_data_persistence = enable_data_persistence | 
|  | 78 | +		self.endpoints = frozenset(endpoints) | 
|  | 79 | +		 | 
|  | 80 | +		self.with_bind_ports(EMULATOR_PORT, EMULATOR_PORT) | 
|  | 81 | + | 
|  | 82 | +		endpoints_ports = [] | 
|  | 83 | +		for endpoint in self.endpoints: | 
|  | 84 | +			endpoints_ports.extend(endpoint_ports[endpoint]) | 
|  | 85 | + | 
|  | 86 | +		if bind_ports: | 
|  | 87 | +			[ self.with_bind_ports(port, port) for port in endpoints_ports ] | 
|  | 88 | +		else: | 
|  | 89 | +			self.with_exposed_ports(*endpoints_ports) | 
|  | 90 | + | 
|  | 91 | +	def start(self) -> Self: | 
|  | 92 | +		self._configure() | 
|  | 93 | +		super().start() | 
|  | 94 | +		self._wait_until_ready() | 
|  | 95 | +		return self | 
|  | 96 | + | 
|  | 97 | +	@property | 
|  | 98 | +	def url(self) -> str: | 
|  | 99 | +		""" | 
|  | 100 | +		Returns the url to interact with the emulator | 
|  | 101 | +		""" | 
|  | 102 | +		return f"https://{self.host}:{self.get_exposed_port(EMULATOR_PORT)}" | 
|  | 103 | + | 
|  | 104 | +	@property | 
|  | 105 | +	def host(self) -> str: | 
|  | 106 | +		return self.get_container_host_ip() | 
|  | 107 | + | 
|  | 108 | +	def ports(self, endpoint: Endpoints) -> Iterable[int]: | 
|  | 109 | +		assert endpoint in self.endpoints, f"Endpoint {endpoint} is not exposed" | 
|  | 110 | +		return { self.get_exposed_port(p) for p in endpoint_ports[endpoint] } | 
|  | 111 | +	 | 
|  | 112 | +	def async_client(self) -> AsyncCosmosClient: | 
|  | 113 | +		""" | 
|  | 114 | +		Returns an asynchronous CosmosClient instance to interact with the CosmosDB server | 
|  | 115 | +		""" | 
|  | 116 | +		return AsyncCosmosClient(url=self.url, credential=self.key, connection_verify=False) | 
|  | 117 | + | 
|  | 118 | +	def sync_client(self) -> SyncCosmosClient: | 
|  | 119 | +		""" | 
|  | 120 | +		Returns a synchronous CosmosClient instance to interact with the CosmosDB server | 
|  | 121 | +		""" | 
|  | 122 | +		return SyncCosmosClient(url=self.url, credential=self.key, connection_verify=False) | 
|  | 123 | + | 
|  | 124 | +	def _configure(self) -> None: | 
|  | 125 | +		( | 
|  | 126 | +			self | 
|  | 127 | +				.with_env("AZURE_COSMOS_EMULATOR_PARTITION_COUNT", str(self.partition_count)) | 
|  | 128 | +				.with_env("AZURE_COSMOS_EMULATOR_IP_ADDRESS_OVERRIDE", socket.gethostbyname(socket.gethostname())) | 
|  | 129 | +				.with_env("AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE", str(self.enable_data_persistence)) | 
|  | 130 | +				.with_env("AZURE_COSMOS_EMULATOR_KEY", str(self.key)) | 
|  | 131 | +		) | 
|  | 132 | + | 
|  | 133 | +	@wait_container_is_ready(HTTPError, URLError, ServiceRequestError) | 
|  | 134 | +	def _wait_until_ready(self) -> Self: | 
|  | 135 | +		""" | 
|  | 136 | +		Waits until the CosmosDB Emulator image is ready to be used. | 
|  | 137 | +		""" | 
|  | 138 | +		( | 
|  | 139 | +			self | 
|  | 140 | +				._wait_for_logs(container=self, predicate="Started\\s*$") | 
|  | 141 | +				._wait_for_url(f"{self.url}/_explorer/index.html") | 
|  | 142 | +				._wait_for_query_success(lambda sync_client: list(sync_client.list_databases())) | 
|  | 143 | +		) | 
|  | 144 | +		return self | 
|  | 145 | + | 
|  | 146 | +	def _wait_for_url(self, url: str) -> Self: | 
|  | 147 | +		with urlopen(url, context=ssl._create_unverified_context()) as response: | 
|  | 148 | +			response.read() | 
|  | 149 | +		return self | 
|  | 150 | + | 
|  | 151 | +	def _wait_for_logs(self, *args, **kwargs) -> Self: | 
|  | 152 | +		wait_for_logs(*args, **kwargs) | 
|  | 153 | +		return self | 
|  | 154 | + | 
|  | 155 | +	def _wait_for_query_success(self, query: Callable[[SyncCosmosClient], None]) -> Self: | 
|  | 156 | +		with self.sync_client() as c: | 
|  | 157 | +			query(c) | 
|  | 158 | +		return self | 
0 commit comments