diff --git a/cmd/workflow/purge.go b/cmd/workflow/purge.go index 5ddac021e..3197fafb4 100644 --- a/cmd/workflow/purge.go +++ b/cmd/workflow/purge.go @@ -25,12 +25,14 @@ var ( flagPurgeOlderThan string flagPurgeAll bool flagPurgeConn *connFlag + flagPurgeForce bool schedulerNamespace string ) var PurgeCmd = &cobra.Command{ Use: "purge", - Short: "Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. Also deletes all associated scheduler jobs.", + Short: "Purge workflow instances with a terminal state.", + Long: "Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. Also deletes all associated scheduler jobs.", Args: func(cmd *cobra.Command, args []string) error { switch { case cmd.Flags().Changed("all-older-than"), @@ -63,6 +65,7 @@ var PurgeCmd = &cobra.Command{ All: flagPurgeAll, ConnectionString: flagPurgeConn.connectionString, TableName: flagPurgeConn.tableName, + Force: flagPurgeForce, } if cmd.Flags().Changed("all-older-than") { @@ -80,6 +83,7 @@ func init() { PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.") PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.") PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all") + PurgeCmd.Flags().BoolVar(&flagPurgeForce, "force", false, "force will force a purge of a workflow, regardless of its current runtime state, or whether an active worker can process it, the backend will attempt to delete it anyway. This necessarily means the purging is executed out side of the workflow state machine, and therefore, can lead to corrupt state or broken workflow execution. Usage of this should _only_ be used when you know the workflow is not being currently processed. It is highly recommended to avoid using this flag unless absolutely necessary.") PurgeCmd.Flags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set") diff --git a/cmd/workflow/raiseevent.go b/cmd/workflow/raiseevent.go index cc533a0a1..6a40cc9fc 100644 --- a/cmd/workflow/raiseevent.go +++ b/cmd/workflow/raiseevent.go @@ -30,7 +30,8 @@ var ( var RaiseEventCmd = &cobra.Command{ Use: "raise-event", - Short: "Raise an event for a workflow waiting for an external event. Expects a single argument '/'.", + Short: "Raise an event for a workflow waiting for an external event.", + Long: "Raise an event for a workflow waiting for an external event. Expects a single argument '/'.", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { ctx := signals.Context() diff --git a/cmd/workflow/rerun.go b/cmd/workflow/rerun.go index 29b85ba7a..2d45290ba 100644 --- a/cmd/workflow/rerun.go +++ b/cmd/workflow/rerun.go @@ -32,7 +32,8 @@ var ( var ReRunCmd = &cobra.Command{ Use: "rerun [instance ID]", - Short: "ReRun a workflow instance from the beginning or a specific event. Optionally, a new instance ID and input to the starting event can be provided.", + Short: "Re-run a workflow instance.", + Long: "ReRun a workflow instance from the beginning or a specific event. Optionally, a new instance ID and input to the starting event can be provided.", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { ctx := signals.Context() diff --git a/cmd/workflow/run.go b/cmd/workflow/run.go index 2536def0f..6a26b6ed3 100644 --- a/cmd/workflow/run.go +++ b/cmd/workflow/run.go @@ -31,7 +31,8 @@ var ( var RunCmd = &cobra.Command{ Use: "run", - Short: "Run a workflow instance based on a given workflow name. Accepts a single argument, the workflow name.", + Short: "Run a workflow instance.", + Long: "Run a workflow instance based on a given workflow name. Accepts a single argument, the workflow name.", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { ctx := signals.Context() diff --git a/cmd/workflow/workflow.go b/cmd/workflow/workflow.go index 3174792ed..bbfb91ff5 100644 --- a/cmd/workflow/workflow.go +++ b/cmd/workflow/workflow.go @@ -205,7 +205,7 @@ func connectionCmd(cmd *cobra.Command) *connFlag { flagTableName string ) - cmd.Flags().StringVarP(&flagConnectionString, "connection-string", "c", "", "The connection string used to connect and authenticate to the actor state store") + cmd.Flags().StringVarP(&flagConnectionString, "connection-string", "c", "", "Only used for Dapr runtime versions 1.16. The connection string used to connect and authenticate to the actor state store") cmd.Flags().StringVarP(&flagTableName, "table-name", "t", "", "The name of the table or collection which is used as the actor state store") var cflag connFlag diff --git a/go.mod b/go.mod index 3b3bef80a..49b5afda3 100644 --- a/go.mod +++ b/go.mod @@ -1,20 +1,23 @@ module github.com/dapr/cli -go 1.24.7 +go 1.24.9 + +toolchain go1.24.10 require ( github.com/Masterminds/semver v1.5.0 github.com/Masterminds/semver/v3 v3.3.0 github.com/Pallinder/sillyname-go v0.0.0-20130730142914-97aeae9e6ba1 github.com/briandowns/spinner v1.19.0 - github.com/dapr/dapr v1.16.0 - github.com/dapr/durabletask-go v0.10.0 + github.com/dapr/dapr v1.16.1-rc.3.0.20251118161632-84fbd05c585c + github.com/dapr/durabletask-go v0.10.2-0.20251113171253-87ecdf8f0547 github.com/dapr/go-sdk v1.13.0 - github.com/dapr/kit v0.16.1 - github.com/diagridio/go-etcd-cron v0.9.1 + github.com/dapr/kit v0.16.2-0.20251117143824-2fd5d0c93524 + github.com/diagridio/go-etcd-cron v0.10.0 github.com/docker/docker v25.0.6+incompatible github.com/evanphx/json-patch/v5 v5.9.0 github.com/fatih/color v1.17.0 + github.com/go-sql-driver/mysql v1.8.1 github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25 github.com/hashicorp/go-retryablehttp v0.7.7 github.com/hashicorp/go-version v1.6.0 @@ -38,6 +41,7 @@ require ( go.mongodb.org/mongo-driver v1.14.0 golang.org/x/mod v0.25.0 golang.org/x/sys v0.33.0 + google.golang.org/grpc v1.73.0 google.golang.org/protobuf v1.36.6 gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.17.1 @@ -54,6 +58,7 @@ require ( cel.dev/expr v0.23.0 // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect dario.cat/mergo v1.0.1 // indirect + filippo.io/edwards25519 v1.1.0 // indirect github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/BurntSushi/toml v1.4.0 // indirect @@ -85,7 +90,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cyphar/filepath-securejoin v0.3.6 // indirect - github.com/dapr/components-contrib v1.16.0 // indirect + github.com/dapr/components-contrib v1.16.2-0.20251113171451-b78f056c8491 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect @@ -250,7 +255,6 @@ require ( golang.org/x/time v0.11.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250512202823-5a2f75b736a9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect - google.golang.org/grpc v1.73.0 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index b9fec0821..8f0375bfd 100644 --- a/go.sum +++ b/go.sum @@ -169,16 +169,16 @@ github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/cyphar/filepath-securejoin v0.3.6 h1:4d9N5ykBnSp5Xn2JkhocYDkOpURL/18CYMpo6xB9uWM= github.com/cyphar/filepath-securejoin v0.3.6/go.mod h1:Sdj7gXlvMcPZsbhwhQ33GguGLDGQL7h7bg04C/+u9jI= -github.com/dapr/components-contrib v1.16.0 h1:kUif6UyxtRz6tXnkuIjbx6z+VLMfc6y+SIYa9T7J3eA= -github.com/dapr/components-contrib v1.16.0/go.mod h1:1AufCWqZwBj//UkyS7FesOEmp5/E6Xgy1tyCn8peiR4= -github.com/dapr/dapr v1.16.0 h1:la2WLZM8Myr2Pq3cyrFjHKWDSPYLzGZCs3p502TwBjI= -github.com/dapr/dapr v1.16.0/go.mod h1:ln/mxvNOeqklaDmic4ppsxmnjl2D/oZGKaJy24IwaEY= -github.com/dapr/durabletask-go v0.10.0 h1:vfIivPl4JYd55xZTslDwhA6p6F8ipcNxBtMaupxArr8= -github.com/dapr/durabletask-go v0.10.0/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= +github.com/dapr/components-contrib v1.16.2-0.20251113171451-b78f056c8491 h1:ms0IhiGK6Mow+e1DZgIxD3qo7xqOPjpxeFkhahBo+Tg= +github.com/dapr/components-contrib v1.16.2-0.20251113171451-b78f056c8491/go.mod h1:1AufCWqZwBj//UkyS7FesOEmp5/E6Xgy1tyCn8peiR4= +github.com/dapr/dapr v1.16.1-rc.3.0.20251118161632-84fbd05c585c h1:JIIlb/Qe01bv13lTGwBt9DMdY/hcguBYtQOo+1pquvk= +github.com/dapr/dapr v1.16.1-rc.3.0.20251118161632-84fbd05c585c/go.mod h1:xgEItLT6gtIv/OP8KVb31DhLIZddLZbp0LTAczR8buQ= +github.com/dapr/durabletask-go v0.10.2-0.20251113171253-87ecdf8f0547 h1:bD4JBlXDHURsgvhIB1HQ1q0k8kYfVo/iNSBi0guSoe0= +github.com/dapr/durabletask-go v0.10.2-0.20251113171253-87ecdf8f0547/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= github.com/dapr/go-sdk v1.13.0 h1:Qw2BmUonClQ9yK/rrEEaFL1PyDgq616RrvYj0CT67Lk= github.com/dapr/go-sdk v1.13.0/go.mod h1:RsffVNZitDApmQqoS68tNKGMXDZUjTviAbKZupJSzts= -github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4= -github.com/dapr/kit v0.16.1/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM= +github.com/dapr/kit v0.16.2-0.20251117143824-2fd5d0c93524 h1:SQ7VeWGnypENpGjsL94wN2IgH+oYHx9ULpYrpFoRExQ= +github.com/dapr/kit v0.16.2-0.20251117143824-2fd5d0c93524/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -187,8 +187,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etly github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/diagridio/go-etcd-cron v0.9.1 h1:KUfcceDtypL8s3hL0jD2ZoiIzjjXY6xDQ4kT1DJF4Ws= -github.com/diagridio/go-etcd-cron v0.9.1/go.mod h1:CSzuxoCDFu+Gbds0RO73GE8CnmL5t85axiPLptsej3I= +github.com/diagridio/go-etcd-cron v0.10.0 h1:PG1ptpdToeyBjhlINR1Ark5Ne3uAGnBRsGK8hRu+J70= +github.com/diagridio/go-etcd-cron v0.10.0/go.mod h1:CSzuxoCDFu+Gbds0RO73GE8CnmL5t85axiPLptsej3I= github.com/distribution/distribution/v3 v3.0.0-20221208165359-362910506bc2 h1:aBfCb7iqHmDEIp6fBvC/hQUddQfg+3qdYjwzaiP9Hnc= github.com/distribution/distribution/v3 v3.0.0-20221208165359-362910506bc2/go.mod h1:WHNsWjnIn2V1LYOrME7e8KxSeKunYHsxEm4am0BUtcI= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= diff --git a/pkg/workflow/db/sql.go b/pkg/workflow/db/sql.go index fd1b30dcc..0c0d2d4af 100644 --- a/pkg/workflow/db/sql.go +++ b/pkg/workflow/db/sql.go @@ -18,6 +18,7 @@ import ( "database/sql" "fmt" + _ "github.com/go-sql-driver/mysql" _ "github.com/jackc/pgx/v5/stdlib" _ "github.com/mattn/go-sqlite3" _ "github.com/microsoft/go-mssqldb" @@ -35,8 +36,8 @@ func SQL(ctx context.Context, driver, connString string) (*sql.DB, error) { return db, nil } -func ListSQL(ctx context.Context, db *sql.DB, table string, opts ListOptions) ([]string, error) { - query := fmt.Sprintf(`SELECT key FROM "%s" WHERE key LIKE ?;`, table) +func ListSQL(ctx context.Context, db *sql.DB, table, key string, opts ListOptions) ([]string, error) { + query := fmt.Sprintf(`SELECT "%s" FROM "%s" WHERE key LIKE ?;`, key, table) like := opts.AppID + "||dapr.internal." + opts.Namespace + "." + opts.AppID + ".workflow||%||metadata" rows, err := db.QueryContext(ctx, query, like) diff --git a/pkg/workflow/dclient/dclient.go b/pkg/workflow/dclient/dclient.go index f345fad4f..318a9ca14 100644 --- a/pkg/workflow/dclient/dclient.go +++ b/pkg/workflow/dclient/dclient.go @@ -15,31 +15,52 @@ package dclient import ( "context" + "encoding/json" + "errors" "fmt" "slices" + "sort" "strconv" + "strings" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" "github.com/dapr/cli/pkg/kubernetes" "github.com/dapr/cli/pkg/standalone" + "github.com/dapr/cli/pkg/workflow/db" "github.com/dapr/dapr/pkg/apis/components/v1alpha1" "github.com/dapr/dapr/pkg/components/loader" + "github.com/dapr/durabletask-go/api/protos" + "github.com/dapr/durabletask-go/workflow" "github.com/dapr/go-sdk/client" "github.com/dapr/kit/ptr" ) +const maxHistoryEntries = 1000 + type Options struct { - KubernetesMode bool - Namespace string - AppID string - RuntimePath string + KubernetesMode bool + Namespace string + AppID string + RuntimePath string + DBConnectionString *string } type Client struct { - Dapr client.Client - Cancel context.CancelFunc - StateStoreDriver string - ConnectionString *string - TableName *string + Dapr client.Client + WF *workflow.Client + Cancel context.CancelFunc + + kubernetesMode bool + resourcePaths []string + appID string + ns string + dbConnString *string } func DaprClient(ctx context.Context, opts Options) (*Client, error) { @@ -48,7 +69,7 @@ func DaprClient(ctx context.Context, opts Options) (*Client, error) { var client *Client var err error if opts.KubernetesMode { - client, err = kube(opts) + client, err = kube(ctx, opts) } else { client, err = stand(ctx, opts) } @@ -74,31 +95,44 @@ func stand(ctx context.Context, opts Options) (*Client, error) { return nil, fmt.Errorf("Dapr app with id '%s' not found", opts.AppID) } - if len(proc.ResourcePaths) == 0 { + resourcePaths := proc.ResourcePaths + if len(resourcePaths) == 0 { var daprDirPath string daprDirPath, err = standalone.GetDaprRuntimePath(opts.RuntimePath) if err != nil { return nil, err } - proc.ResourcePaths = []string{standalone.GetDaprComponentsPath(daprDirPath)} + resourcePaths = []string{standalone.GetDaprComponentsPath(daprDirPath)} } - comps, err := loader.NewLocalLoader(opts.AppID, proc.ResourcePaths).Load(ctx) + client, err := client.NewClientWithAddress("localhost:" + strconv.Itoa(proc.GRPCPort)) if err != nil { return nil, err } - c, err := clientFromComponents(comps, opts.AppID, strconv.Itoa(proc.GRPCPort)) + //nolint:staticcheck + conn, err := grpc.DialContext(ctx, "localhost:"+strconv.Itoa(proc.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) if err != nil { return nil, err } - c.Cancel = func() {} - return c, nil + return &Client{ + Dapr: client, + WF: workflow.NewClient(conn), + Cancel: func() { conn.Close() }, + kubernetesMode: false, + resourcePaths: resourcePaths, + appID: opts.AppID, + ns: opts.Namespace, + dbConnString: opts.DBConnectionString, + }, nil } -func kube(opts Options) (*Client, error) { +func kube(ctx context.Context, opts Options) (*Client, error) { list, err := kubernetes.List(opts.Namespace) if err != nil { return nil, err @@ -143,27 +177,134 @@ func kube(opts Options) (*Client, error) { return nil, err } - kclient, err := kubernetes.DaprClient() + client, err := client.NewClientWithAddress("localhost:" + strconv.Itoa(port)) if err != nil { return nil, err } - comps, err := kubernetes.ListComponents(kclient, pod.Namespace) + //nolint:staticcheck + conn, err := grpc.DialContext(ctx, "localhost:"+strconv.Itoa(port), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) if err != nil { return nil, err } - c, err := clientFromComponents(comps.Items, opts.AppID, pod.DaprGRPCPort) + return &Client{ + WF: workflow.NewClient(conn), + Dapr: client, + Cancel: func() { conn.Close(); portForward.Stop() }, + kubernetesMode: true, + appID: opts.AppID, + ns: opts.Namespace, + dbConnString: opts.DBConnectionString, + }, nil +} + +func (c *Client) InstanceIDs(ctx context.Context) ([]string, error) { + resp, err := c.WF.ListInstanceIDs(ctx) if err != nil { - portForward.Stop() + code, ok := status.FromError(err) + if !ok || (code.Code() != codes.Unimplemented && code.Code() != codes.Unknown) { + return nil, err + } + + // Dapr is pre v1.17, so fall back to reading from the state store + // directly. + var metaKeys []string + metaKeys, err = c.metaKeysFromDB(ctx) + if err != nil { + return nil, err + } + + instanceIDs := make([]string, 0, len(metaKeys)) + for _, key := range metaKeys { + split := strings.Split(key, "||") + if len(split) != 4 { + continue + } + + instanceIDs = append(instanceIDs, split[2]) + } + + return instanceIDs, err } - c.Cancel = portForward.Stop + ids := resp.InstanceIds - return c, nil + for resp.ContinuationToken != nil { + resp, err = c.WF.ListInstanceIDs(ctx, workflow.WithListInstanceIDsContinuationToken(*resp.ContinuationToken)) + if err != nil { + return nil, err + } + + ids = append(ids, resp.InstanceIds...) + } + + return ids, nil } -func clientFromComponents(comps []v1alpha1.Component, appID string, port string) (*Client, error) { +func (c *Client) InstanceHistory(ctx context.Context, instanceID string) ([]*protos.HistoryEvent, error) { + var history []*protos.HistoryEvent + resp, err := c.WF.GetInstanceHistory(ctx, instanceID) + if err != nil { + code, ok := status.FromError(err) + if !ok || (code.Code() != codes.Unimplemented && code.Code() != codes.Unknown) { + return nil, err + } + + // Dapr is pre v1.17, so fall back to reading from the state store + // directly. + history, err = c.fetchHistory(ctx, instanceID) + if err != nil { + return nil, err + } + } else { + history = resp.Events + } + + // Sort: EventId if both present, else Timestamp + sort.SliceStable(history, func(i, j int) bool { + ei, ej := history[i], history[j] + if ei.EventId > 0 && ej.EventId > 0 { + return ei.EventId < ej.EventId + } + ti, tj := ei.GetTimestamp().AsTime(), ej.GetTimestamp().AsTime() + if !ti.Equal(tj) { + return ti.Before(tj) + } + return ei.EventId < ej.EventId + }) + + return history, nil +} + +func (c *Client) metaKeysFromDB(ctx context.Context) ([]string, error) { + if c.dbConnString == nil { + return nil, fmt.Errorf("connection string is required for all database drivers for Dapr pre v1.17") + } + + var comps []v1alpha1.Component + if c.kubernetesMode { + kclient, err := kubernetes.DaprClient() + if err != nil { + return nil, err + } + + kcomps, err := kubernetes.ListComponents(kclient, c.ns) + if err != nil { + return nil, err + } + comps = kcomps.Items + } else { + var err error + comps, err = loader.NewLocalLoader(c.appID, c.resourcePaths).Load(ctx) + if err != nil { + return nil, err + } + } + var comp *v1alpha1.Component for _, c := range comps { for _, meta := range c.Spec.Metadata { @@ -175,7 +316,7 @@ func clientFromComponents(comps []v1alpha1.Component, appID string, port string) } if comp == nil { - return nil, fmt.Errorf("no state store configured for app id %s", appID) + return nil, fmt.Errorf("no actor state store configured for app id %s", c.appID) } driver, err := driverFromType(comp.Spec.Type) @@ -183,11 +324,6 @@ func clientFromComponents(comps []v1alpha1.Component, appID string, port string) return nil, err } - client, err := client.NewClientWithAddress("localhost:" + port) - if err != nil { - return nil, err - } - var tableName *string for _, meta := range comp.Spec.Metadata { switch meta.Name { @@ -196,11 +332,58 @@ func clientFromComponents(comps []v1alpha1.Component, appID string, port string) } } - return &Client{ - Dapr: client, - StateStoreDriver: driver, - TableName: tableName, - }, nil + switch { + case isSQLDriver(driver): + if tableName == nil { + tableName = ptr.Of("state") + } + + sqldb, err := db.SQL(ctx, driver, *c.dbConnString) + if err != nil { + return nil, err + } + defer sqldb.Close() + + key := "key" + if driver == "mysql" { + key = "id" + } + + return db.ListSQL(ctx, sqldb, *tableName, key, db.ListOptions{ + Namespace: c.ns, + AppID: c.appID, + }) + + case driver == "redis": + client, err := db.Redis(ctx, *c.dbConnString) + if err != nil { + return nil, err + } + + return db.ListRedis(ctx, client, db.ListOptions{ + Namespace: c.ns, + AppID: c.appID, + }) + + case driver == "mongodb": + client, err := db.Mongo(ctx, *c.dbConnString) + if err != nil { + return nil, err + } + + collectionName := "daprCollection" + if tableName != nil { + collectionName = *tableName + } + + return db.ListMongo(ctx, client.Database("daprStore"), collectionName, db.ListOptions{ + Namespace: c.ns, + AppID: c.appID, + }) + + default: + return nil, fmt.Errorf("unsupported driver: %s", driver) + } } func driverFromType(v string) (string, error) { @@ -226,7 +409,7 @@ func driverFromType(v string) (string, error) { } } -func IsSQLDriver(driver string) bool { +func isSQLDriver(driver string) bool { return slices.Contains([]string{ "mysql", "pgx", @@ -235,3 +418,74 @@ func IsSQLDriver(driver string) bool { "oracle", }, driver) } + +func (c *Client) fetchHistory(ctx context.Context, instanceID string) ([]*protos.HistoryEvent, error) { + + actorType := "dapr.internal." + c.ns + "." + c.appID + ".workflow" + + var events []*protos.HistoryEvent + for startIndex := 0; startIndex <= 1; startIndex++ { + if len(events) > 0 { + break + } + + for i := startIndex; i < maxHistoryEntries; i++ { + key := fmt.Sprintf("history-%06d", i) + + resp, err := c.Dapr.GetActorState(ctx, &client.GetActorStateRequest{ + ActorType: actorType, + ActorID: instanceID, + KeyName: key, + }) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return nil, err + } + break + } + + if resp == nil || len(resp.Data) == 0 { + break + } + + var event protos.HistoryEvent + if err = decodeKey(resp.Data, &event); err != nil { + return nil, fmt.Errorf("failed to decode history event %s: %w", key, err) + } + + events = append(events, &event) + } + } + + return events, nil +} + +func decodeKey(data []byte, item proto.Message) error { + if len(data) == 0 { + return fmt.Errorf("empty value") + } + + if err := protojson.Unmarshal(data, item); err == nil { + return nil + } + + if unquoted, err := UnquoteJSON(data); err == nil { + if err := protojson.Unmarshal([]byte(unquoted), item); err == nil { + return nil + } + } + + if err := proto.Unmarshal(data, item); err == nil { + return nil + } + + return fmt.Errorf("unable to decode history event (len=%d)", len(data)) +} + +func UnquoteJSON(data []byte) (string, error) { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return "", err + } + return s, nil +} diff --git a/pkg/workflow/events.go b/pkg/workflow/events.go index 4080f7fcc..a21eae847 100644 --- a/pkg/workflow/events.go +++ b/pkg/workflow/events.go @@ -42,14 +42,12 @@ func RaiseEvent(ctx context.Context, opts RaiseEventOptions) error { } defer cli.Cancel() - wf := workflow.NewClient(cli.Dapr.GrpcClientConn()) - var wopts []workflow.RaiseEventOptions if opts.Input != nil { wopts = append(wopts, workflow.WithEventPayload(*opts.Input)) } - return wf.RaiseEvent(ctx, opts.InstanceID, opts.Name, wopts...) + return cli.WF.RaiseEvent(ctx, opts.InstanceID, opts.Name, wopts...) } type SuspendOptions struct { diff --git a/pkg/workflow/history.go b/pkg/workflow/history.go index b8a4f8f2b..348650e90 100644 --- a/pkg/workflow/history.go +++ b/pkg/workflow/history.go @@ -15,15 +15,11 @@ package workflow import ( "context" - "encoding/json" - "errors" "fmt" "sort" "strings" "time" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/wrapperspb" "github.com/dapr/cli/cmd/runtime" @@ -31,12 +27,9 @@ import ( "github.com/dapr/cli/utils" "github.com/dapr/durabletask-go/api/protos" "github.com/dapr/durabletask-go/workflow" - "github.com/dapr/go-sdk/client" "github.com/dapr/kit/ptr" ) -const maxHistoryEntries = 100 - type HistoryOptions struct { KubernetesMode bool Namespace string @@ -116,28 +109,11 @@ func HistoryWide(ctx context.Context, opts HistoryOptions) ([]*HistoryOutputWide } defer cli.Cancel() - history, err := fetchHistory(ctx, - cli.Dapr, - "dapr.internal."+opts.Namespace+"."+opts.AppID+".workflow", - opts.InstanceID, - ) + history, err := cli.InstanceHistory(ctx, opts.InstanceID) if err != nil { return nil, err } - // Sort: EventId if both present, else Timestamp - sort.SliceStable(history, func(i, j int) bool { - ei, ej := history[i], history[j] - if ei.EventId > 0 && ej.EventId > 0 { - return ei.EventId < ej.EventId - } - ti, tj := ei.GetTimestamp().AsTime(), ej.GetTimestamp().AsTime() - if !ti.Equal(tj) { - return ti.Before(tj) - } - return ei.EventId < ej.EventId - }) - var rows []*HistoryOutputWide var prevTs time.Time replay := 0 @@ -265,74 +241,6 @@ func HistoryWide(ctx context.Context, opts HistoryOptions) ([]*HistoryOutputWide return rows, nil } -func fetchHistory(ctx context.Context, cl client.Client, actorType, instanceID string) ([]*protos.HistoryEvent, error) { - var events []*protos.HistoryEvent - for startIndex := 0; startIndex <= 1; startIndex++ { - if len(events) > 0 { - break - } - - for i := startIndex; i < maxHistoryEntries; i++ { - key := fmt.Sprintf("history-%06d", i) - - resp, err := cl.GetActorState(ctx, &client.GetActorStateRequest{ - ActorType: actorType, - ActorID: instanceID, - KeyName: key, - }) - if err != nil { - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - return nil, err - } - break - } - - if resp == nil || len(resp.Data) == 0 { - break - } - - var event protos.HistoryEvent - if err = decodeKey(resp.Data, &event); err != nil { - return nil, fmt.Errorf("failed to decode history event %s: %w", key, err) - } - - events = append(events, &event) - } - } - - return events, nil -} - -func decodeKey(data []byte, item proto.Message) error { - if len(data) == 0 { - return fmt.Errorf("empty value") - } - - if err := protojson.Unmarshal(data, item); err == nil { - return nil - } - - if unquoted, err := unquoteJSON(data); err == nil { - if err := protojson.Unmarshal([]byte(unquoted), item); err == nil { - return nil - } - } - - if err := proto.Unmarshal(data, item); err == nil { - return nil - } - - return fmt.Errorf("unable to decode history event (len=%d)", len(data)) -} - -func unquoteJSON(data []byte) (string, error) { - var s string - if err := json.Unmarshal(data, &s); err != nil { - return "", err - } - return s, nil -} - func eventTypeName(h *protos.HistoryEvent) string { switch h.GetEventType().(type) { case *protos.HistoryEvent_ExecutionStarted: @@ -506,7 +414,7 @@ func trim(ww *wrapperspb.StringValue, limit int) string { return "" } - s, err := unquoteJSON([]byte(ww.Value)) + s, err := dclient.UnquoteJSON([]byte(ww.Value)) if err != nil { s = ww.Value } diff --git a/pkg/workflow/list.go b/pkg/workflow/list.go index 44f40fec0..477512e3c 100644 --- a/pkg/workflow/list.go +++ b/pkg/workflow/list.go @@ -24,8 +24,6 @@ import ( "github.com/dapr/cli/pkg/workflow/dclient" "github.com/dapr/durabletask-go/api" "github.com/dapr/durabletask-go/api/protos" - "github.com/dapr/durabletask-go/workflow" - "github.com/dapr/go-sdk/client" "github.com/dapr/kit/ptr" "k8s.io/apimachinery/pkg/util/duration" ) @@ -94,52 +92,29 @@ func ListShort(ctx context.Context, opts ListOptions) ([]*ListOutputShort, error func ListWide(ctx context.Context, opts ListOptions) ([]*ListOutputWide, error) { dclient, err := dclient.DaprClient(ctx, dclient.Options{ - KubernetesMode: opts.KubernetesMode, - Namespace: opts.Namespace, - AppID: opts.AppID, - RuntimePath: runtime.GetDaprRuntimePath(), + KubernetesMode: opts.KubernetesMode, + Namespace: opts.Namespace, + AppID: opts.AppID, + RuntimePath: runtime.GetDaprRuntimePath(), + DBConnectionString: opts.ConnectionString, }) if err != nil { return nil, fmt.Errorf("failed to create Dapr client: %w", err) } defer dclient.Cancel() - connString := opts.ConnectionString - if connString == nil { - connString = dclient.ConnectionString - } - tableName := opts.TableName - if tableName == nil { - tableName = dclient.TableName - } - - metaKeys, err := metakeys(ctx, DBOptions{ - Namespace: opts.Namespace, - AppID: opts.AppID, - Driver: dclient.StateStoreDriver, - ConnectionString: connString, - TableName: tableName, - }) + instanceIDs, err := dclient.InstanceIDs(ctx) if err != nil { return nil, err } - return list(ctx, metaKeys, dclient.Dapr, opts) + return list(ctx, instanceIDs, dclient, opts) } -func list(ctx context.Context, metaKeys []string, cl client.Client, opts ListOptions) ([]*ListOutputWide, error) { - wf := workflow.NewClient(cl.GrpcClientConn()) - +func list(ctx context.Context, instanceIDs []string, cl *dclient.Client, opts ListOptions) ([]*ListOutputWide, error) { var listOutput []*ListOutputWide - for _, key := range metaKeys { - split := strings.Split(key, "||") - if len(split) != 4 { - continue - } - - instanceID := split[2] - - resp, err := wf.FetchWorkflowMetadata(ctx, instanceID) + for _, instanceID := range instanceIDs { + resp, err := cl.WF.FetchWorkflowMetadata(ctx, instanceID) if err != nil { return nil, err } @@ -153,6 +128,7 @@ func list(ctx context.Context, metaKeys []string, cl client.Client, opts ListOpt if opts.Filter.MaxAge != nil && resp.CreatedAt.AsTime().Before(*opts.Filter.MaxAge) { continue } + // TODO: @joshvanl: add `WorkflowIsCompleted` func to workflow package. //nolint:govet if opts.Filter.Terminal && !api.OrchestrationMetadataIsComplete(ptr.Of(protos.OrchestrationMetadata(*resp))) { diff --git a/pkg/workflow/purge.go b/pkg/workflow/purge.go index 9f84484d3..711473009 100644 --- a/pkg/workflow/purge.go +++ b/pkg/workflow/purge.go @@ -19,11 +19,8 @@ import ( "os" "time" - clientv3 "go.etcd.io/etcd/client/v3" - "github.com/dapr/cli/cmd/runtime" "github.com/dapr/cli/pkg/print" - "github.com/dapr/cli/pkg/scheduler" "github.com/dapr/cli/pkg/workflow/dclient" "github.com/dapr/durabletask-go/workflow" ) @@ -36,29 +33,20 @@ type PurgeOptions struct { InstanceIDs []string AllOlderThan *time.Time All bool + Force bool ConnectionString *string TableName *string } func Purge(ctx context.Context, opts PurgeOptions) error { - cli, err := dclient.DaprClient(ctx, dclient.Options{ - KubernetesMode: opts.KubernetesMode, - Namespace: opts.Namespace, - AppID: opts.AppID, - RuntimePath: runtime.GetDaprRuntimePath(), - }) - if err != nil { - return err - } - defer cli.Cancel() - var toPurge []string if len(opts.InstanceIDs) > 0 { toPurge = opts.InstanceIDs } else { var list []*ListOutputWide + var err error list, err = ListWide(ctx, ListOptions{ KubernetesMode: opts.KubernetesMode, Namespace: opts.Namespace, @@ -88,41 +76,25 @@ func Purge(ctx context.Context, opts PurgeOptions) error { } } - wf := workflow.NewClient(cli.Dapr.GrpcClientConn()) - - etcdClient, cancel, err := scheduler.EtcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + cli, err := dclient.DaprClient(ctx, dclient.Options{ + KubernetesMode: opts.KubernetesMode, + Namespace: opts.Namespace, + AppID: opts.AppID, + RuntimePath: runtime.GetDaprRuntimePath(), + DBConnectionString: opts.ConnectionString, + }) if err != nil { return err } - defer cancel() + defer cli.Cancel() print.InfoStatusEvent(os.Stdout, "Purging %d workflow instance(s)", len(toPurge)) for _, id := range toPurge { - if err = wf.PurgeWorkflowState(ctx, id); err != nil { + if err = cli.WF.PurgeWorkflowState(ctx, id, workflow.WithForcePurge(opts.Force)); err != nil { return fmt.Errorf("%s: %w", id, err) } - paths := []string{ - fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.%s.workflow||%s||", opts.Namespace, opts.Namespace, opts.AppID, id), - fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.%s.activity||%s::", opts.Namespace, opts.Namespace, opts.AppID, id), - fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.%s.workflow||%s||", opts.Namespace, opts.Namespace, opts.AppID, id), - fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.%s.activity||%s::", opts.Namespace, opts.Namespace, opts.AppID, id), - } - - oopts := make([]clientv3.Op, 0, len(paths)) - for _, path := range paths { - oopts = append(oopts, clientv3.OpDelete(path, - clientv3.WithPrefix(), - clientv3.WithPrevKV(), - clientv3.WithKeysOnly(), - )) - } - - if _, err = etcdClient.Txn(ctx).Then(oopts...).Commit(); err != nil { - return err - } - print.SuccessStatusEvent(os.Stdout, "Purged workflow instance %q", id) } diff --git a/pkg/workflow/rerun.go b/pkg/workflow/rerun.go index 02abc2555..b262d0383 100644 --- a/pkg/workflow/rerun.go +++ b/pkg/workflow/rerun.go @@ -43,8 +43,6 @@ func ReRun(ctx context.Context, opts ReRunOptions) (string, error) { } defer cli.Cancel() - wf := workflow.NewClient(cli.Dapr.GrpcClientConn()) - var wopts []workflow.RerunOptions if opts.NewInstanceID != nil { wopts = append(wopts, workflow.WithRerunNewInstanceID(*opts.NewInstanceID)) @@ -53,5 +51,5 @@ func ReRun(ctx context.Context, opts ReRunOptions) (string, error) { wopts = append(wopts, workflow.WithRerunInput(*opts.Input)) } - return wf.RerunWorkflowFromEvent(ctx, opts.InstanceID, opts.EventID, wopts...) + return cli.WF.RerunWorkflowFromEvent(ctx, opts.InstanceID, opts.EventID, wopts...) } diff --git a/pkg/workflow/workflow.go b/pkg/workflow/workflow.go deleted file mode 100644 index 033b89d58..000000000 --- a/pkg/workflow/workflow.go +++ /dev/null @@ -1,85 +0,0 @@ -/* -Copyright 2025 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package workflow - -import ( - "context" - "fmt" - - "github.com/dapr/cli/pkg/workflow/db" - "github.com/dapr/cli/pkg/workflow/dclient" -) - -type DBOptions struct { - Namespace string - AppID string - Driver string - ConnectionString *string - TableName *string -} - -func metakeys(ctx context.Context, opts DBOptions) ([]string, error) { - if opts.ConnectionString == nil { - return nil, fmt.Errorf("connection string is required for all drivers") - } - - switch { - case dclient.IsSQLDriver(opts.Driver): - tableName := "state" - if opts.TableName != nil { - tableName = *opts.TableName - } - - sqldb, err := db.SQL(ctx, opts.Driver, *opts.ConnectionString) - if err != nil { - return nil, err - } - defer sqldb.Close() - - return db.ListSQL(ctx, sqldb, tableName, db.ListOptions{ - Namespace: opts.Namespace, - AppID: opts.AppID, - }) - - case opts.Driver == "redis": - client, err := db.Redis(ctx, *opts.ConnectionString) - if err != nil { - return nil, err - } - - return db.ListRedis(ctx, client, db.ListOptions{ - Namespace: opts.Namespace, - AppID: opts.AppID, - }) - - case opts.Driver == "mongodb": - client, err := db.Mongo(ctx, *opts.ConnectionString) - if err != nil { - return nil, err - } - - collectionName := "daprCollection" - if opts.TableName != nil { - collectionName = *opts.TableName - } - - return db.ListMongo(ctx, client.Database("daprStore"), collectionName, db.ListOptions{ - Namespace: opts.Namespace, - AppID: opts.AppID, - }) - - default: - return nil, fmt.Errorf("unsupported driver: %s", opts.Driver) - } -}