Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/restate-e2e-services/src/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ function rawCall(
ctx: restate.Context,
request: ProxyRequest
): Promise<Uint8Array> {
return ctx.genericCall({
return ctx.call({
service: request.serviceName,
method: request.handlerName,
key: request.virtualObjectKey,
Expand All @@ -44,7 +44,7 @@ function rawCall(
}

function rawSend(ctx: restate.Context, request: ProxyRequest): Promise<string> {
const handle = ctx.genericSend({
const handle = ctx.send({
service: request.serviceName,
method: request.handlerName,
key: request.virtualObjectKey,
Expand Down
2 changes: 2 additions & 0 deletions packages/restate-sdk/src/common_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ export type {
WorkflowContext,
WorkflowSharedContext,
Rand,
Call,
GenericCall,
Send,
GenericSend,
InvocationId,
InvocationHandle,
Expand Down
59 changes: 55 additions & 4 deletions packages/restate-sdk/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export type RunOptions<T> = {
* This is a generic mechanism to invoke handlers directly by only knowing
* the service and handler name, (or key in the case of objects or workflows)
*/
export type GenericCall<REQ, RES> = {
export type Call<REQ, RES> = {
service: string;
method: string;
parameter: REQ;
Expand All @@ -245,12 +245,17 @@ export type GenericCall<REQ, RES> = {
idempotencyKey?: string;
};

/**
* @deprecated use {@link Call}
*/
export type GenericCall<REQ, RES> = Call<REQ, RES>;

/**
* Send a message to an handler directly avoiding restate's type safety checks.
* This is a generic mechanism to invoke handlers directly by only knowing
* the service and handler name, (or key in the case of objects or workflows)
*/
export type GenericSend<REQ> = {
export type Send<REQ> = {
service: string;
method: string;
parameter: REQ;
Expand All @@ -261,6 +266,11 @@ export type GenericSend<REQ> = {
idempotencyKey?: string;
};

/**
* @deprecated use {@link Send}
*/
export type GenericSend<REQ> = Send<REQ>;

/**
* The context that gives access to all Restate-backed operations, for example
* - sending reliable messages / RPC through Restate
Expand Down Expand Up @@ -569,11 +579,52 @@ export interface Context extends RestateContext {
opts?: SendOptions
): SendClient<VirtualObject<D>>;

/**
* @deprecated use {@link call} instead
*/
genericCall<REQ = Uint8Array, RES = Uint8Array>(
call: GenericCall<REQ, RES>
call: Call<REQ, RES>
): InvocationPromise<RES>;

genericSend<REQ = Uint8Array>(call: GenericSend<REQ>): InvocationHandle;
/**
* @deprecated use {@link send} instead
*/
genericSend<REQ = Uint8Array>(call: Send<REQ>): InvocationHandle;

/**
* Make a request/response RPC to the specified target service.
*
* The RPC goes through Restate and is guaranteed to be reliably delivered. The RPC is also
* journaled for durable execution and will thus not be duplicated when the handler is re-invoked
* for retries or after suspending.
*
* This call will return the result produced by the target handler, or the TerminalError, if the target
* handler finishes with a Terminal Error.
*
* This call is a suspension point: The handler might suspend while awaiting the response and
* resume once the response is available.
*
* @param call send target and options
*/
call<REQ, RES>(call: Call<REQ, RES>): InvocationPromise<RES>;

/**
* Send a request to the specified target service. This method effectively behaves
* like enqueuing the message in a message queue.
*
* The message goes through Restate and is guaranteed to be reliably delivered. The RPC is also
* journaled for durable execution and will thus not be duplicated when the handler is re-invoked
* for retries or after suspending.
*
* This returns immediately; the message sending happens asynchronously in the background.
* Despite that, the message is guaranteed to be sent, because the completion of the invocation that
* triggers the send (calls this function) happens logically after the sending. That means that any
* failure where the message does not reach Restate also cannot complete this invocation, and will
* hence recover this handler and (through the durable execution) recover the message to be sent.
*
* @param send send target and options
*/
send<REQ>(send: Send<REQ>): InvocationHandle;

/**
* Returns the raw request that triggered that handler.
Expand Down
54 changes: 32 additions & 22 deletions packages/restate-sdk/src/context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import type {
ContextDate,
DurablePromise,
GenericCall,
GenericSend,
Call,
Send,
InvocationHandle,
InvocationId,
InvocationPromise,
Expand Down Expand Up @@ -217,13 +217,11 @@ export class ContextImpl implements ObjectContext, WorkflowContext {

// --- Calls, background calls, etc
//
public genericCall<REQ = Uint8Array, RES = Uint8Array>(
call: GenericCall<REQ, RES>
): InvocationPromise<RES> {
public call<REQ, RES>(call: Call<REQ, RES>): InvocationPromise<RES> {
const requestSerde: Serde<REQ> =
call.inputSerde ?? (serde.binary as Serde<REQ>);
call.inputSerde ?? (this.defaultSerde as Serde<REQ>);
const responseSerde: Serde<RES> =
call.outputSerde ?? (serde.binary as Serde<RES>);
call.outputSerde ?? (this.defaultSerde as Serde<RES>);

let parameter: Uint8Array;
try {
Expand Down Expand Up @@ -284,10 +282,8 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
}
}

public genericSend<REQ = Uint8Array>(
send: GenericSend<REQ>
): InvocationHandle {
const requestSerde = send.inputSerde ?? (serde.binary as Serde<REQ>);
public send<REQ>(send: Send<REQ>): InvocationHandle {
const requestSerde = send.inputSerde ?? (this.defaultSerde as Serde<REQ>);

let parameter: Uint8Array;
try {
Expand Down Expand Up @@ -345,21 +341,35 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
}
}

serviceClient<D>({ name }: ServiceDefinitionFrom<D>): Client<Service<D>> {
return makeRpcCallProxy(
(call) => this.genericCall(call),
this.defaultSerde,
public genericCall<REQ = Uint8Array, RES = Uint8Array>(
call: Call<REQ, RES>
): InvocationPromise<RES> {
return this.call({
...call,
// Just different defaults here.
inputSerde: call.inputSerde ?? (serde.binary as Serde<REQ>),
outputSerde: call.outputSerde ?? (serde.binary as Serde<RES>),
});
}

name
);
public genericSend<REQ = Uint8Array>(send: Send<REQ>): InvocationHandle {
return this.send({
...send,
// Just different defaults here.
inputSerde: send.inputSerde ?? (serde.binary as Serde<REQ>),
});
}

serviceClient<D>({ name }: ServiceDefinitionFrom<D>): Client<Service<D>> {
return makeRpcCallProxy((call) => this.call(call), this.defaultSerde, name);
}

objectClient<D>(
{ name }: VirtualObjectDefinitionFrom<D>,
key: string
): Client<VirtualObject<D>> {
return makeRpcCallProxy(
(call) => this.genericCall(call),
(call) => this.call(call),
this.defaultSerde,
name,
key
Expand All @@ -371,7 +381,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
key: string
): Client<Workflow<D>> {
return makeRpcCallProxy(
(call) => this.genericCall(call),
(call) => this.call(call),
this.defaultSerde,
name,
key
Expand All @@ -383,7 +393,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
opts?: SendOptions
): SendClient<Service<D>> {
return makeRpcSendProxy(
(send) => this.genericSend(send),
(send) => this.send(send),
this.defaultSerde,
name,
undefined,
Expand All @@ -397,7 +407,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
opts?: SendOptions
): SendClient<VirtualObject<D>> {
return makeRpcSendProxy(
(send) => this.genericSend(send),
(send) => this.send(send),
this.defaultSerde,
name,
key,
Expand All @@ -411,7 +421,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
opts?: SendOptions
): SendClient<Workflow<D>> {
return makeRpcSendProxy(
(send) => this.genericSend(send),
(send) => this.send(send),
this.defaultSerde,
name,
key,
Expand Down
8 changes: 4 additions & 4 deletions packages/restate-sdk/src/types/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
/* eslint-disable @typescript-eslint/ban-types */
import type {
Context,
GenericCall,
GenericSend,
Call,
Send,
InvocationHandle,
InvocationPromise,
ObjectContext,
Expand Down Expand Up @@ -168,7 +168,7 @@ function optsFromArgs(args: unknown[]): {
}

export const makeRpcCallProxy = <T>(
genericCall: (call: GenericCall<unknown, unknown>) => Promise<unknown>,
genericCall: (call: Call<unknown, unknown>) => Promise<unknown>,
defaultSerde: Serde<any>,
service: string,
key?: string
Expand Down Expand Up @@ -203,7 +203,7 @@ export const makeRpcCallProxy = <T>(
};

export const makeRpcSendProxy = <T>(
genericSend: (send: GenericSend<unknown>) => void,
genericSend: (send: Send<unknown>) => void,
defaultSerde: Serde<any>,
service: string,
key?: string,
Expand Down
Loading