Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/dl_temporal/dl_temporal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
from .client import (
AlreadyExists,
EmptyMetadataProvider,
EmptyMetadataProviderSettings,
MetadataProvider,
MetadataProviderSettings,
PermissionDenied,
TemporalClient,
TemporalClientDependencies,
TemporalClientError,
TemporalClientSettings,
)
Expand All @@ -33,10 +36,13 @@
"WorkflowProtocol",
"define_activity",
"define_workflow",
"MetadataProvider",
"EmptyMetadataProvider",
"EmptyMetadataProviderSettings",
"MetadataProvider",
"MetadataProviderSettings",
"TemporalClientError",
"TemporalClient",
"TemporalClientDependencies",
"TemporalClientSettings",
"AlreadyExists",
"PermissionDenied",
Expand Down
32 changes: 32 additions & 0 deletions lib/dl_temporal/dl_temporal/app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from .aiohttp import (
HttpServerAppFactoryMixin,
HttpServerAppMixin,
HttpServerAppSettingsMixin,
HttpServerSettings,
)
from .base import (
BaseTemporalWorkerApp,
BaseTemporalWorkerAppFactory,
BaseTemporalWorkerAppSettings,
)
from .temporal import (
TemporalWorkerAppFactoryMixin,
TemporalWorkerAppMixin,
TemporalWorkerAppSettingsMixin,
TemporalWorkerSettings,
)


__all__ = [
"HttpServerSettings",
"HttpServerAppFactoryMixin",
"HttpServerAppMixin",
"HttpServerAppSettingsMixin",
"TemporalWorkerSettings",
"TemporalWorkerAppFactoryMixin",
"TemporalWorkerAppMixin",
"TemporalWorkerAppSettingsMixin",
"BaseTemporalWorkerApp",
"BaseTemporalWorkerAppFactory",
"BaseTemporalWorkerAppSettings",
]
118 changes: 118 additions & 0 deletions lib/dl_temporal/dl_temporal/app/aiohttp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from typing import (
Generic,
TypeVar,
)

import aiohttp.web
import attr
from typing_extensions import override

import dl_settings
import dl_temporal.utils.aiohttp.handlers as aiohttp_handlers
import dl_temporal.utils.aiohttp.printer as aiohttp_printer
import dl_temporal.utils.app as app_utils
import dl_temporal.utils.singleton as singleton_utils


class HttpServerSettings(dl_settings.BaseSettings):
host: str
port: int


class HttpServerAppSettingsMixin(app_utils.BaseAppSettings):
http_server: HttpServerSettings = NotImplemented


@attr.define(frozen=True, kw_only=True)
class HttpServerAppMixin(app_utils.BaseApp):
...


AppType = TypeVar("AppType", bound=HttpServerAppMixin)


@attr.define(kw_only=True, slots=False)
class HttpServerAppFactoryMixin(
app_utils.BaseAppFactory[AppType],
Generic[AppType],
):
settings: HttpServerAppSettingsMixin

@override
@singleton_utils.singleton_class_method_result
async def _get_main_callbacks(
self,
) -> list[app_utils.Callback]:
result = await super()._get_main_callbacks()

result.append(
app_utils.Callback(
coroutine=aiohttp.web._run_app(
app=await self._get_aiohttp_app(),
host=self.settings.http_server.host,
port=self.settings.http_server.port,
print=aiohttp_printer.PrintLogger(),
),
name="run_http_server",
),
)

return result

@singleton_utils.singleton_class_method_result
async def _get_aiohttp_app(
self,
) -> aiohttp.web.Application:
app = aiohttp.web.Application()
app.add_routes(
routes=await self._get_aiohttp_app_routes(),
)
return app

@singleton_utils.singleton_class_method_result
async def _get_aiohttp_app_routes(
self,
) -> list[aiohttp.web.RouteDef]:
result: list[aiohttp.web.RouteDef] = []

aiohttp_liveness_probe_handler = await self._get_aiohttp_liveness_probe_handler()
result.append(
aiohttp.web.route(
method="GET",
path="/api/v1/health/liveness",
handler=aiohttp_liveness_probe_handler.process,
),
)

aiohttp_readiness_probe_handler = await self._get_aiohttp_readiness_probe_handler()
result.append(
aiohttp.web.route(
method="GET",
path="/api/v1/health/readiness",
handler=aiohttp_readiness_probe_handler.process,
),
)

return result

@singleton_utils.singleton_class_method_result
async def _get_aiohttp_liveness_probe_handler(
self,
) -> aiohttp_handlers.LivenessProbeHandler:
return aiohttp_handlers.LivenessProbeHandler()

@singleton_utils.singleton_class_method_result
async def _get_aiohttp_readiness_probe_handler(
self,
) -> aiohttp_handlers.ReadinessProbeHandler:
subsystems = await self._get_aiohttp_subsystem_readiness_callbacks()

return aiohttp_handlers.ReadinessProbeHandler(
subsystems=subsystems,
)

@singleton_utils.singleton_class_method_result
async def _get_aiohttp_subsystem_readiness_callbacks(
self,
) -> list[aiohttp_handlers.SubsystemReadinessCallback]:
return []
68 changes: 68 additions & 0 deletions lib/dl_temporal/dl_temporal/app/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from typing import (
Generic,
TypeVar,
)

import attr
from typing_extensions import override

import dl_temporal.app.aiohttp as aiohttp_app
import dl_temporal.app.temporal as temporal_app
import dl_temporal.utils.aiohttp as aiohttp_utils
import dl_temporal.utils.app as app_utils
import dl_temporal.utils.singleton as singleton_utils


class BaseTemporalWorkerAppSettings(
temporal_app.TemporalWorkerAppSettingsMixin,
aiohttp_app.HttpServerAppSettingsMixin,
app_utils.BaseAppSettings,
):
...


@attr.define(frozen=True, kw_only=True)
class BaseTemporalWorkerApp(
temporal_app.TemporalWorkerAppMixin,
aiohttp_app.HttpServerAppMixin,
app_utils.BaseApp,
):
...


AppType = TypeVar("AppType", bound=BaseTemporalWorkerApp)


@attr.define(kw_only=True, slots=False)
class BaseTemporalWorkerAppFactory(
temporal_app.TemporalWorkerAppFactoryMixin[AppType],
aiohttp_app.HttpServerAppFactoryMixin[AppType],
app_utils.BaseAppFactory[AppType],
Generic[AppType],
):
settings: BaseTemporalWorkerAppSettings

@override
@singleton_utils.singleton_class_method_result
async def _get_aiohttp_subsystem_readiness_callbacks(
self,
) -> list[aiohttp_utils.SubsystemReadinessCallback]:
result = await super()._get_aiohttp_subsystem_readiness_callbacks()

temporal_client = await self._get_temporal_client()
result.append(
aiohttp_utils.SubsystemReadinessAsyncCallback(
name="temporal_client.check_health",
is_ready=temporal_client.check_health,
),
)

temporal_worker = await self._get_temporal_worker()
result.append(
aiohttp_utils.SubsystemReadinessSyncCallback(
name="temporal_worker.is_running",
is_ready=lambda: temporal_worker.is_running,
),
)

return result
100 changes: 100 additions & 0 deletions lib/dl_temporal/dl_temporal/app/temporal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import abc
from typing import (
Generic,
TypeVar,
)

import attr
import temporalio.worker
from typing_extensions import override

import dl_settings
import dl_temporal.base as base
import dl_temporal.client as client
import dl_temporal.utils.app as app_utils
import dl_temporal.utils.singleton as singleton_utils
import dl_temporal.worker as worker


class TemporalWorkerSettings(dl_settings.BaseSettings):
task_queue: str


class TemporalWorkerAppSettingsMixin(app_utils.BaseAppSettings):
temporal_client: client.TemporalClientSettings = NotImplemented
temporal_worker: TemporalWorkerSettings = NotImplemented


@attr.define(frozen=True, kw_only=True)
class TemporalWorkerAppMixin(app_utils.BaseApp):
...


AppType = TypeVar("AppType", bound=TemporalWorkerAppMixin)


@attr.define(kw_only=True, slots=False)
class TemporalWorkerAppFactoryMixin(
app_utils.BaseAppFactory[AppType],
Generic[AppType],
):
settings: TemporalWorkerAppSettingsMixin

@override
@singleton_utils.singleton_class_method_result
async def _get_main_callbacks(
self,
) -> list[app_utils.Callback]:
result = await super()._get_main_callbacks()

temporal_worker = await self._get_temporal_worker()
result.append(app_utils.Callback(name="temporal_worker", coroutine=temporal_worker.run()))

return result

@singleton_utils.singleton_class_method_result
async def _get_temporal_client(
self,
) -> client.TemporalClient:
return await client.TemporalClient.from_dependencies(
dependencies=client.TemporalClientDependencies(
namespace=self.settings.temporal_client.namespace,
host=self.settings.temporal_client.host,
port=self.settings.temporal_client.port,
tls=self.settings.temporal_client.tls,
lazy=False,
metadata_provider=await self._get_temporal_client_metadata_provider(),
),
)

@singleton_utils.singleton_class_method_result
async def _get_temporal_worker(
self,
) -> temporalio.worker.Worker:
return worker.create_worker(
task_queue=self.settings.temporal_worker.task_queue,
client=await self._get_temporal_client(),
workflows=await self._get_temporal_workflows(),
activities=await self._get_temporal_activities(),
)

@abc.abstractmethod
@singleton_utils.singleton_class_method_result
async def _get_temporal_workflows(
self,
) -> list[type[base.WorkflowProtocol]]:
...

@abc.abstractmethod
@singleton_utils.singleton_class_method_result
async def _get_temporal_activities(
self,
) -> list[base.ActivityProtocol]:
...

@abc.abstractmethod
@singleton_utils.singleton_class_method_result
async def _get_temporal_client_metadata_provider(
self,
) -> client.MetadataProvider:
...
8 changes: 7 additions & 1 deletion lib/dl_temporal/dl_temporal/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .client import (
TemporalClient,
TemporalClientDependencies,
TemporalClientSettings,
)
from .exc import (
Expand All @@ -9,14 +10,19 @@
)
from .metadata import (
EmptyMetadataProvider,
EmptyMetadataProviderSettings,
MetadataProvider,
MetadataProviderSettings,
)


__all__ = [
"MetadataProvider",
"EmptyMetadataProvider",
"EmptyMetadataProviderSettings",
"MetadataProvider",
"MetadataProviderSettings",
"TemporalClient",
"TemporalClientDependencies",
"TemporalClientSettings",
"TemporalClientError",
"PermissionDenied",
Expand Down
Loading
Loading