diff --git a/docs/docs.json b/docs/docs.json index ff281e47..ce020d58 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -229,7 +229,8 @@ ] }, "router/event-driven-federated-subscriptions-edfs/kafka", - "router/event-driven-federated-subscriptions-edfs/redis" + "router/event-driven-federated-subscriptions-edfs/redis", + "router/event-driven-federated-subscriptions-edfs/custom-modules" ] }, "router/compliance-and-data-management", diff --git a/docs/router/event-driven-federated-subscriptions-edfs/custom-modules.mdx b/docs/router/event-driven-federated-subscriptions-edfs/custom-modules.mdx new file mode 100644 index 00000000..176aa69e --- /dev/null +++ b/docs/router/event-driven-federated-subscriptions-edfs/custom-modules.mdx @@ -0,0 +1,393 @@ +--- +title: "Custom Modules" +description: "Customize Streams behavior with powerful hooks for subscription lifecycle, event processing, and data transformation." +icon: "cubes" +--- + +Cosmo Router provides powerful hooks for customizing Custom Streams (a.k.a. Event-Driven Federated Subscriptions, or Cosmo Streams) behavior. These hooks allow you to implement custom logic for subscription lifecycle management, event processing, and data transformation. + +## Available Hooks + +The Cosmo Streams system provides three main hook interfaces that you can implement in your custom modules: + +- `SubscriptionOnStartHandler`: Called once at subscription start +- `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery +- `StreamPublishEventHandler`: Called each time a batch of events is going to be sent to the provider + +## Hook Interfaces + +### SubscriptionOnStartHandler + +This hook is called once when a subscription starts, allowing you to implement custom logic such as authorization checks or initial message sending. + +```go +type SubscriptionOnStartHandler interface { + // OnSubscriptionOnStart is called once at subscription start + // Returning an error will result in a GraphQL error being returned to the client + SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error +} +``` + +**Use cases:** +- Authorization checks at subscription start +- Sending initial messages to clients +- Validating subscription parameters + +### StreamReceiveEventHandler + +This hook is triggered for each client/subscription when a batch of events is received from the provider, before delivering them to the client. + +```go +type StreamReceiveEventHandler interface { + // OnReceiveEvents is called each time a batch of events is received from the provider before delivering them to the client + // So for a single batch of events received from the provider, this hook will be called one time for each active subscription. + // It is important to optimize the logic inside this hook to avoid performance issues. + // Returning an error will result in a GraphQL error being returned to the client + OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events []StreamEvent) ([]StreamEvent, error) +} +``` + +**Use cases:** +- Event filtering based on client permissions +- Data transformation and mapping +- Event validation and sanitization + + +The `StreamReceiveEventHandler` is called for each active subscription when events are received, so optimize your logic to avoid performance issues. Even small inefficiencies can lead to significant delays when many subscriptions are active. + + +### StreamPublishEventHandler + +This hook is called each time a batch of events is going to be sent to the provider. + +```go +type StreamPublishEventHandler interface { + // OnPublishEvents is called each time a batch of events is going to be sent to the provider + // Returning an error will result in an error being returned and the client will see the mutation failing + OnPublishEvents(ctx StreamPublishEventHandlerContext, events []StreamEvent) ([]StreamEvent, error) +} +``` + +**Use cases:** +- Data transformation before publishing +- Event validation +- Adding metadata to events + +## Context Interfaces + +Each hook provides a rich context interface that gives you access to request information, authentication, and configuration: + +### SubscriptionOnStartHandlerContext + +```go +type SubscriptionOnStartHandlerContext interface { + // Request is the original request received by the router. + Request() *http.Request + // Logger is the logger for the request + Logger() *zap.Logger + // Operation is the GraphQL operation + Operation() OperationContext + // Authentication is the authentication for the request + Authentication() authentication.Authentication + // SubscriptionEventConfiguration is the subscription event configuration + SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration + // WriteEvent writes an event to the stream of the current subscription + // It returns true if the event was written to the stream, false if the event was dropped + WriteEvent(event datasource.StreamEvent) bool +} +``` + +### StreamReceiveEventHandlerContext + +```go +type StreamReceiveEventHandlerContext interface { + // Request is the initial client request that started the subscription + Request() *http.Request + // Logger is the logger for the request + Logger() *zap.Logger + // Operation is the GraphQL operation + Operation() OperationContext + // Authentication is the authentication for the request + Authentication() authentication.Authentication + // SubscriptionEventConfiguration is the subscription event configuration + SubscriptionEventConfiguration() SubscriptionEventConfiguration +} +``` + +### StreamPublishEventHandlerContext + +```go +type StreamPublishEventHandlerContext interface { + // Request is the original request received by the router. + Request() *http.Request + // Logger is the logger for the request + Logger() *zap.Logger + // Operation is the GraphQL operation + Operation() OperationContext + // Authentication is the authentication for the request + Authentication() authentication.Authentication + // PublishEventConfiguration is the publish event configuration + PublishEventConfiguration() PublishEventConfiguration +} +``` + +## Core Types + +### StreamEvent Interface + +The `StreamEvent` interface allows the hooks system to be provider-agnostic: + +```go +type StreamEvent interface { + GetData() []byte +} +``` + +Each provider (NATS, Kafka, Redis) will have its own event type with custom fields, but they all implement this common interface. + +### OperationContext + +The `OperationContext` provides access to GraphQL operation information: + +```go +type OperationContext interface { + Name() string + // the variables are currently not available, so we need to expose them here + Variables() *astjson.Value +} +``` + +### Configuration Interfaces + +#### SubscriptionEventConfiguration + +```go +type SubscriptionEventConfiguration interface { + ProviderID() string + ProviderType() string + // the root field name of the subscription in the schema + RootFieldName() string +} +``` + +#### PublishEventConfiguration + +```go +type PublishEventConfiguration interface { + ProviderID() string + ProviderType() string + // the root field name of the mutation in the schema + RootFieldName() string +} +``` + +## Example: Authorization and Event Filtering + +Here's a complete example that demonstrates how to implement authorization checks and event filtering: + +```go +package mymodule + +import ( + "encoding/json" + "slices" + "github.com/wundergraph/cosmo/router/core" + "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" + "github.com/wundergraph/cosmo/router/pkg/pubsub/nats" +) + +func init() { + // Register your module here and it will be loaded at router start + core.RegisterModule(&MyModule{}) +} + +type MyModule struct {} + +// Implement SubscriptionOnStartHandler for authorization +func (m *MyModule) SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error { + // Check if the provider is NATS + if ctx.SubscriptionEventConfiguration().ProviderType() != datasource.ProviderTypeNats { + return nil + } + + // Check if the provider ID matches + if ctx.SubscriptionEventConfiguration().ProviderID() != "my-nats" { + return nil + } + + // Check if the subscription is the expected one + if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdates" { + return nil + } + + // Check if the client is authenticated + if ctx.Authentication() == nil { + return core.NewHttpGraphqlError("client is not authenticated", http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + } + + // Check if the client has the required permissions + clientAllowedEntitiesIds, found := ctx.Authentication().Claims()["readEmployee"] + if !found { + return core.NewHttpGraphqlError( + "client is not allowed to read employees", + http.StatusText(http.StatusForbidden), + http.StatusForbidden + ) + } + + return nil +} + +// Implement StreamReceiveEventHandler for event filtering and transformation +func (m *MyModule) OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events []core.StreamEvent) ([]core.StreamEvent, error) { + // Check if the provider is NATS + if ctx.SubscriptionEventConfiguration().ProviderType() != datasource.ProviderTypeNats { + return events, nil + } + + // Check if the provider ID matches + if ctx.SubscriptionEventConfiguration().ProviderID() != "my-nats" { + return events, nil + } + + // Check if the subscription is the expected one + if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdates" { + return events, nil + } + + newEvents := make([]core.StreamEvent, 0, len(events)) + + // Check if the client is authenticated + if ctx.Authentication() == nil { + // If the client is not authenticated, return no events + return newEvents, nil + } + + // Get client's allowed entity IDs + clientAllowedEntitiesIds, found := ctx.Authentication().Claims()["allowedEntitiesIds"] + if !found { + return newEvents, fmt.Errorf("client is not allowed to subscribe to the stream") + } + + for _, evt := range events { + natsEvent, ok := evt.(*nats.NatsEvent) + if !ok { + newEvents = append(newEvents, evt) + continue + } + + // Decode the event data coming from the provider + var dataReceived struct { + EmployeeId string `json:"EmployeeId"` + OtherField string `json:"OtherField"` + } + err := json.Unmarshal(natsEvent.Data, &dataReceived) + if err != nil { + return events, fmt.Errorf("error unmarshalling data: %w", err) + } + + // Filter events based on client's permissions + if !slices.Contains(clientAllowedEntitiesIds, dataReceived.EmployeeId) { + continue + } + + // Transform the data to match the expected GraphQL schema + var dataToSend struct { + Id string `json:"id"` + TypeName string `json:"__typename"` + } + dataToSend.Id = dataReceived.EmployeeId + dataToSend.TypeName = "Employee" + + // Marshal the transformed data + dataToSendMarshalled, err := json.Marshal(dataToSend) + if err != nil { + return events, fmt.Errorf("error marshalling data: %w", err) + } + + // Create the new event + newEvent := &nats.NatsEvent{ + Data: dataToSendMarshalled, + Metadata: natsEvent.Metadata, + } + newEvents = append(newEvents, newEvent) + } + return newEvents, nil +} + +func (m *MyModule) Module() core.ModuleInfo { + return core.ModuleInfo{ + ID: "myModule", + Priority: 1, + New: func() core.Module { + return &MyModule{} + }, + } +} + +// Interface guards +var ( + _ core.SubscriptionOnStartHandler = (*MyModule)(nil) + _ core.StreamReceiveEventHandler = (*MyModule)(nil) +) +``` + +## Example: Event Publishing with Transformation + +Here's an example of how to transform events before publishing: + +```go +// Implement StreamPublishEventHandler for event transformation +func (m *MyModule) OnPublishEvents(ctx StreamPublishEventHandlerContext, events []StreamEvent) ([]StreamEvent, error) { + // Check if the provider is NATS + if ctx.PublishEventConfiguration().ProviderType() != datasource.ProviderTypeNats { + return events, nil + } + + // Check if the provider ID matches + if ctx.PublishEventConfiguration().ProviderID() != "my-nats" { + return events, nil + } + + // Check if the mutation is the expected one + if ctx.PublishEventConfiguration().RootFieldName() != "updateEmployee" { + return events, nil + } + + transformedEvents := make([]StreamEvent, 0, len(events)) + + for _, evt := range events { + natsEvent, ok := evt.(*nats.NatsEvent) + if !ok { + transformedEvents = append(transformedEvents, evt) + continue + } + + // Decode the original event data + var originalData map[string]interface{} + err := json.Unmarshal(natsEvent.Data, &originalData) + if err != nil { + return events, fmt.Errorf("error unmarshalling data: %w", err) + } + + // Add metadata or transform the data + originalData["timestamp"] = time.Now().Unix() + originalData["source"] = "cosmo-router" + + // Marshal the transformed data + transformedData, err := json.Marshal(originalData) + if err != nil { + return events, fmt.Errorf("error marshalling transformed data: %w", err) + } + + // Create the transformed event + transformedEvent := &nats.NatsEvent{ + Data: transformedData, + Metadata: natsEvent.Metadata, + } + transformedEvents = append(transformedEvents, transformedEvent) + } + + return transformedEvents, nil +} +```