diff --git a/apps/workspace-engine/pkg/events/events.go b/apps/workspace-engine/pkg/events/events.go index a44fcdcff..713f55a55 100644 --- a/apps/workspace-engine/pkg/events/events.go +++ b/apps/workspace-engine/pkg/events/events.go @@ -18,6 +18,7 @@ import ( "workspace-engine/pkg/events/handler/system" "workspace-engine/pkg/events/handler/tick" "workspace-engine/pkg/events/handler/userapprovalrecords" + "workspace-engine/pkg/events/handler/workflows" ) var handlers = handler.HandlerRegistry{ @@ -87,6 +88,9 @@ var handlers = handler.HandlerRegistry{ handler.WorkspaceTick: tick.HandleWorkspaceTick, handler.ReleaseTargetDeploy: redeploy.HandleReleaseTargetDeploy, + + handler.WorkflowTemplateCreate: workflows.HandleWorkflowTemplateCreated, + handler.WorkflowCreate: workflows.HandleWorkflowCreated, } func NewEventHandler() *handler.EventListener { diff --git a/apps/workspace-engine/pkg/events/handler/handler.go b/apps/workspace-engine/pkg/events/handler/handler.go index 18b0b2d82..d8d9a159a 100644 --- a/apps/workspace-engine/pkg/events/handler/handler.go +++ b/apps/workspace-engine/pkg/events/handler/handler.go @@ -90,6 +90,9 @@ const ( WorkspaceSave EventType = "workspace.save" ReleaseTargetDeploy EventType = "release-target.deploy" + + WorkflowTemplateCreate EventType = "workflow-template.created" + WorkflowCreate EventType = "workflow.created" ) // RawEvent represents the raw event data received from Kafka messages diff --git a/apps/workspace-engine/pkg/events/handler/jobs/jobs.go b/apps/workspace-engine/pkg/events/handler/jobs/jobs.go index 93019aea6..8d717f21c 100644 --- a/apps/workspace-engine/pkg/events/handler/jobs/jobs.go +++ b/apps/workspace-engine/pkg/events/handler/jobs/jobs.go @@ -111,6 +111,17 @@ func triggerActionsOnStatusChange(ctx context.Context, ws *workspace.Workspace, if err != nil { log.Error("error triggering actions on status change", "job_id", job.Id, "from", previousStatus, "to", job.Status, "error", err.Error()) } + + if job.Status != oapi.JobStatusSuccessful { + return + } + + err = ws. + WorkflowActionOrchestrator(). + OnJobSuccess(ctx, job) + if err != nil { + log.Error("error triggering actions on job success", "job_id", job.Id, "error", err.Error()) + } } // invalidateCacheForJob invalidates the release target state cache for the job's release target. diff --git a/apps/workspace-engine/pkg/events/handler/workflows/workflows.go b/apps/workspace-engine/pkg/events/handler/workflows/workflows.go new file mode 100644 index 000000000..6bd57c1a0 --- /dev/null +++ b/apps/workspace-engine/pkg/events/handler/workflows/workflows.go @@ -0,0 +1,38 @@ +package workflows + +import ( + "context" + "encoding/json" + "workspace-engine/pkg/events/handler" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/workspace" +) + +func HandleWorkflowTemplateCreated( + ctx context.Context, + ws *workspace.Workspace, + event handler.RawEvent, +) error { + workflowTemplate := &oapi.WorkflowTemplate{} + if err := json.Unmarshal(event.Data, workflowTemplate); err != nil { + return err + } + ws.WorkflowTemplates().Upsert(ctx, workflowTemplate) + return nil +} + +func HandleWorkflowCreated( + ctx context.Context, + ws *workspace.Workspace, + event handler.RawEvent, +) error { + workflow := &oapi.Workflow{} + if err := json.Unmarshal(event.Data, workflow); err != nil { + return err + } + + if _, err := ws.WorkflowManager().CreateWorkflow(ctx, workflow.WorkflowTemplateId, workflow.Inputs); err != nil { + return err + } + return nil +} diff --git a/apps/workspace-engine/pkg/workspace/store/workflows.go b/apps/workspace-engine/pkg/workspace/store/workflows.go index 99a3b52e7..f5cd3224b 100644 --- a/apps/workspace-engine/pkg/workspace/store/workflows.go +++ b/apps/workspace-engine/pkg/workspace/store/workflows.go @@ -26,6 +26,16 @@ func (w *Workflows) Get(id string) (*oapi.Workflow, bool) { return w.repo.Workflows.Get(id) } +func (w *Workflows) GetByTemplateID(templateID string) map[string]*oapi.Workflow { + workflows := make(map[string]*oapi.Workflow) + for _, workflow := range w.repo.Workflows.Items() { + if workflow.WorkflowTemplateId == templateID { + workflows[workflow.Id] = workflow + } + } + return workflows +} + func (w *Workflows) Upsert(ctx context.Context, workflow *oapi.Workflow) { w.repo.Workflows.Set(workflow.Id, workflow) w.store.changeset.RecordUpsert(workflow) diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/action.go b/apps/workspace-engine/pkg/workspace/workflowmanager/action.go new file mode 100644 index 000000000..cd611d595 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/action.go @@ -0,0 +1,42 @@ +package workflowmanager + +import ( + "context" + "fmt" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/workspace/store" +) + +type WorkflowManagerAction struct { + store *store.Store + manager *Manager +} + +func NewWorkflowManagerAction(store *store.Store, manager *Manager) *WorkflowManagerAction { + return &WorkflowManagerAction{ + store: store, + manager: manager, + } +} + +func (w *WorkflowManagerAction) Name() string { + return "workflowmanager" +} + +func (w *WorkflowManagerAction) Execute(ctx context.Context, trigger ActionTrigger, job *oapi.Job) error { + if trigger != ActionTriggerJobSuccess { + return nil + } + + workflowStep, ok := w.store.WorkflowSteps.Get(job.WorkflowStepId) + if !ok { + return nil + } + + workflow, ok := w.store.Workflows.Get(workflowStep.WorkflowId) + if !ok { + return fmt.Errorf("workflow %s not found for step %s", workflowStep.WorkflowId, workflowStep.Id) + } + + return w.manager.ReconcileWorkflow(ctx, workflow) +} diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/workflow_orchestrator.go b/apps/workspace-engine/pkg/workspace/workflowmanager/workflow_orchestrator.go new file mode 100644 index 000000000..ec83ea1a2 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/workflow_orchestrator.go @@ -0,0 +1,44 @@ +package workflowmanager + +import ( + "context" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/workspace/store" +) + +type ActionTrigger string + +const ( + ActionTriggerJobSuccess ActionTrigger = "job.success" +) + +type WorkflowAction interface { + Name() string + Execute(ctx context.Context, trigger ActionTrigger, job *oapi.Job) error +} + +type WorkflowActionOrchestrator struct { + store *store.Store + actions []WorkflowAction +} + +func NewWorkflowActionOrchestrator(store *store.Store) *WorkflowActionOrchestrator { + return &WorkflowActionOrchestrator{ + store: store, + actions: make([]WorkflowAction, 0), + } +} + +func (w *WorkflowActionOrchestrator) RegisterAction(action WorkflowAction) *WorkflowActionOrchestrator { + w.actions = append(w.actions, action) + return w +} + +func (w *WorkflowActionOrchestrator) OnJobSuccess(ctx context.Context, job *oapi.Job) error { + for _, action := range w.actions { + if err := action.Execute(ctx, ActionTriggerJobSuccess, job); err != nil { + return err + } + } + return nil +} diff --git a/apps/workspace-engine/pkg/workspace/workspace.go b/apps/workspace-engine/pkg/workspace/workspace.go index 75381b198..727e42c00 100644 --- a/apps/workspace-engine/pkg/workspace/workspace.go +++ b/apps/workspace-engine/pkg/workspace/workspace.go @@ -15,6 +15,7 @@ import ( "workspace-engine/pkg/workspace/releasemanager/trace/spanstore" "workspace-engine/pkg/workspace/releasemanager/verification" "workspace-engine/pkg/workspace/store" + "workspace-engine/pkg/workspace/workflowmanager" ) func New(ctx context.Context, id string, options ...WorkspaceOption) *Workspace { @@ -38,6 +39,7 @@ func New(ctx context.Context, id string, options ...WorkspaceOption) *Workspace // Create release manager with trace store (will panic if nil) ws.releasemanager = releasemanager.New(s, ws.traceStore, ws.verificationManager, ws.jobAgentRegistry) + ws.workflowManager = workflowmanager.NewWorkflowManager(s, ws.jobAgentRegistry) reconcileFn := func(ctx context.Context, targets []*oapi.ReleaseTarget) error { return ws.releasemanager.ReconcileTargets(ctx, targets, releasemanager.WithTrigger(trace.TriggerJobSuccess)) @@ -50,25 +52,35 @@ func New(ctx context.Context, id string, options ...WorkspaceOption) *Workspace RegisterAction(environmentprogression.NewEnvironmentProgressionAction(s, reconcileFn)). RegisterAction(rollback.NewRollbackAction(s, ws.jobAgentRegistry)) + ws.workflowActionOrchestrator = workflowmanager. + NewWorkflowActionOrchestrator(s). + RegisterAction(workflowmanager.NewWorkflowManagerAction(s, ws.workflowManager)) + return ws } type Workspace struct { ID string - changeset *statechange.ChangeSet[any] - store *store.Store - verificationManager *verification.Manager - releasemanager *releasemanager.Manager - traceStore releasemanager.PersistenceStore - actionOrchestrator *action.Orchestrator - jobAgentRegistry *jobagents.Registry + changeset *statechange.ChangeSet[any] + store *store.Store + verificationManager *verification.Manager + workflowManager *workflowmanager.Manager + releasemanager *releasemanager.Manager + traceStore releasemanager.PersistenceStore + actionOrchestrator *action.Orchestrator + workflowActionOrchestrator *workflowmanager.WorkflowActionOrchestrator + jobAgentRegistry *jobagents.Registry } func (w *Workspace) ActionOrchestrator() *action.Orchestrator { return w.actionOrchestrator } +func (w *Workspace) WorkflowActionOrchestrator() *workflowmanager.WorkflowActionOrchestrator { + return w.workflowActionOrchestrator +} + func (w *Workspace) Store() *store.Store { return w.store } @@ -85,6 +97,10 @@ func (w *Workspace) VerificationManager() *verification.Manager { return w.verificationManager } +func (w *Workspace) WorkflowManager() *workflowmanager.Manager { + return w.workflowManager +} + func (w *Workspace) JobAgentRegistry() *jobagents.Registry { return w.jobAgentRegistry } @@ -164,3 +180,19 @@ func (w *Workspace) DeploymentVariableValues() *store.DeploymentVariableValues { func (w *Workspace) Relations() *store.Relations { return w.store.Relations } + +func (w *Workspace) WorkflowTemplates() *store.WorkflowTemplates { + return w.store.WorkflowTemplates +} + +func (w *Workspace) WorkflowStepTemplates() *store.WorkflowStepTemplates { + return w.store.WorkflowStepTemplates +} + +func (w *Workspace) Workflows() *store.Workflows { + return w.store.Workflows +} + +func (w *Workspace) WorkflowSteps() *store.WorkflowSteps { + return w.store.WorkflowSteps +} diff --git a/apps/workspace-engine/test/e2e/engine_workflow_test.go b/apps/workspace-engine/test/e2e/engine_workflow_test.go new file mode 100644 index 000000000..cbe1ea9aa --- /dev/null +++ b/apps/workspace-engine/test/e2e/engine_workflow_test.go @@ -0,0 +1,407 @@ +package e2e + +import ( + "context" + "testing" + "time" + "workspace-engine/pkg/events/handler" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/workspace/workflowmanager" + "workspace-engine/test/integration" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestEngine_Workflow_BasicFlow(t *testing.T) { + jobAgentID := uuid.New().String() + workflowTemplateID := uuid.New().String() + workflowStepTemplateID := uuid.New().String() + + engine := integration.NewTestWorkspace(t, + integration.WithJobAgent( + integration.JobAgentID(jobAgentID), + integration.JobAgentName("Test Agent"), + ), + integration.WithWorkflowTemplate( + integration.WorkflowTemplateID(workflowTemplateID), + integration.WithWorkflowStringInput( + integration.WorkflowStringInputName("input-1"), + integration.WorkflowStringInputDefault("default-1"), + ), + integration.WithWorkflowStepTemplate( + integration.WorkflowStepTemplateID(workflowStepTemplateID), + integration.WorkflowStepTemplateJobAgentID(jobAgentID), + integration.WorkflowStepTemplateJobAgentConfig(map[string]any{ + "delaySeconds": 10, + }), + integration.WorkflowStepTemplateName("Test Step 1"), + ), + ), + ) + + ctx := context.Background() + + workflowCreate := &oapi.Workflow{ + WorkflowTemplateId: workflowTemplateID, + Inputs: map[string]any{ + "input-1": "custom-1", + }, + } + engine.PushEvent(ctx, handler.WorkflowCreate, workflowCreate) + + workflows := engine.Workspace().Workflows().GetByTemplateID(workflowTemplateID) + assert.Len(t, workflows, 1) + workflowsSlice := make([]*oapi.Workflow, 0) + for _, workflow := range workflows { + workflowsSlice = append(workflowsSlice, workflow) + } + workflow := workflowsSlice[0] + assert.NotNil(t, workflow) + assert.Equal(t, workflowTemplateID, workflow.WorkflowTemplateId) + assert.Equal(t, map[string]any{ + "input-1": "custom-1", + }, workflow.Inputs) + + workflowSteps := engine.Workspace().WorkflowSteps().GetByWorkflowId(workflow.Id) + assert.Len(t, workflowSteps, 1) + assert.Equal(t, 0, workflowSteps[0].Index) + + jobs := engine.Workspace().Jobs().GetByWorkflowStepId(workflowSteps[0].Id) + assert.Len(t, jobs, 1) + assert.Equal(t, oapi.JobStatusPending, jobs[0].Status) + assert.Equal(t, workflowSteps[0].Id, jobs[0].WorkflowStepId) + assert.Equal(t, jobAgentID, jobs[0].JobAgentId) + assert.Equal(t, oapi.JobAgentConfig{ + "delaySeconds": float64(10), + }, jobs[0].JobAgentConfig) +} + +func TestEngine_Workflow_MultipleInputs(t *testing.T) { + jobAgentID := uuid.New().String() + workflowTemplateID := uuid.New().String() + workflowStepTemplateID := uuid.New().String() + + engine := integration.NewTestWorkspace(t, + integration.WithJobAgent( + integration.JobAgentID(jobAgentID), + integration.JobAgentName("Test Agent"), + ), + integration.WithWorkflowTemplate( + integration.WorkflowTemplateID(workflowTemplateID), + integration.WithWorkflowStringInput( + integration.WorkflowStringInputName("input-1"), + integration.WorkflowStringInputDefault("default-1"), + ), + integration.WithWorkflowNumberInput( + integration.WorkflowNumberInputName("input-2"), + integration.WorkflowNumberInputDefault(2), + ), + integration.WithWorkflowBooleanInput( + integration.WorkflowBooleanInputName("input-3"), + integration.WorkflowBooleanInputDefault(true), + ), + integration.WithWorkflowStepTemplate( + integration.WorkflowStepTemplateID(workflowStepTemplateID), + integration.WorkflowStepTemplateJobAgentID(jobAgentID), + integration.WorkflowStepTemplateJobAgentConfig(map[string]any{ + "delaySeconds": 10, + }), + integration.WorkflowStepTemplateName("Test Step 1"), + ), + ), + ) + + ctx := context.Background() + + workflowCreate := &oapi.Workflow{ + WorkflowTemplateId: workflowTemplateID, + Inputs: map[string]any{ + "input-1": "custom-1", + "input-2": 5, + "input-3": false, + }, + } + engine.PushEvent(ctx, handler.WorkflowCreate, workflowCreate) + + workflows := engine.Workspace().Workflows().GetByTemplateID(workflowTemplateID) + assert.Len(t, workflows, 1) + workflowsSlice := make([]*oapi.Workflow, 0) + for _, workflow := range workflows { + workflowsSlice = append(workflowsSlice, workflow) + } + workflow := workflowsSlice[0] + assert.NotNil(t, workflow) + assert.Equal(t, workflowTemplateID, workflow.WorkflowTemplateId) + assert.Equal(t, map[string]any{ + "input-1": "custom-1", + "input-2": float64(5), + "input-3": false, + }, workflow.Inputs) + + workflowSteps := engine.Workspace().WorkflowSteps().GetByWorkflowId(workflow.Id) + assert.Len(t, workflowSteps, 1) + assert.Equal(t, 0, workflowSteps[0].Index) + + jobs := engine.Workspace().Jobs().GetByWorkflowStepId(workflowSteps[0].Id) + assert.Len(t, jobs, 1) + assert.Equal(t, oapi.JobStatusPending, jobs[0].Status) + assert.Equal(t, workflowSteps[0].Id, jobs[0].WorkflowStepId) + assert.Equal(t, jobAgentID, jobs[0].JobAgentId) + assert.Equal(t, oapi.JobAgentConfig{ + "delaySeconds": float64(10), + }, jobs[0].JobAgentConfig) +} + +func TestEngine_Workflow_MultipleSteps(t *testing.T) { + jobAgentID1 := uuid.New().String() + jobAgentID2 := uuid.New().String() + workflowTemplateID := uuid.New().String() + workflowStepTemplateID1 := uuid.New().String() + workflowStepTemplateID2 := uuid.New().String() + + engine := integration.NewTestWorkspace(t, + integration.WithJobAgent( + integration.JobAgentID(jobAgentID1), + integration.JobAgentName("Test Agent 1"), + ), + integration.WithJobAgent( + integration.JobAgentID(jobAgentID2), + integration.JobAgentName("Test Agent 2"), + ), + integration.WithWorkflowTemplate( + integration.WorkflowTemplateID(workflowTemplateID), + integration.WithWorkflowStepTemplate( + integration.WorkflowStepTemplateID(workflowStepTemplateID1), + integration.WorkflowStepTemplateJobAgentID(jobAgentID1), + integration.WorkflowStepTemplateJobAgentConfig(map[string]any{ + "delaySeconds": 10, + }), + integration.WorkflowStepTemplateName("Test Step 1"), + ), + integration.WithWorkflowStepTemplate( + integration.WorkflowStepTemplateID(workflowStepTemplateID2), + integration.WorkflowStepTemplateJobAgentID(jobAgentID2), + integration.WorkflowStepTemplateJobAgentConfig(map[string]any{ + "delaySeconds": 20, + }), + integration.WorkflowStepTemplateName("Test Step 2"), + ), + ), + ) + + ctx := context.Background() + + workflowCreate := &oapi.Workflow{ + WorkflowTemplateId: workflowTemplateID, + } + engine.PushEvent(ctx, handler.WorkflowCreate, workflowCreate) + + workflows := engine.Workspace().Workflows().GetByTemplateID(workflowTemplateID) + assert.Len(t, workflows, 1) + workflowsSlice := make([]*oapi.Workflow, 0) + for _, workflow := range workflows { + workflowsSlice = append(workflowsSlice, workflow) + } + workflow := workflowsSlice[0] + assert.NotNil(t, workflow) + assert.Equal(t, workflowTemplateID, workflow.WorkflowTemplateId) + + workflowSteps := engine.Workspace().WorkflowSteps().GetByWorkflowId(workflow.Id) + assert.Len(t, workflowSteps, 2) + assert.Equal(t, 0, workflowSteps[0].Index) + assert.Equal(t, 1, workflowSteps[1].Index) + + step1jobs := engine.Workspace().Jobs().GetByWorkflowStepId(workflowSteps[0].Id) + assert.Len(t, step1jobs, 1) + assert.Equal(t, oapi.JobStatusPending, step1jobs[0].Status) + assert.Equal(t, workflowSteps[0].Id, step1jobs[0].WorkflowStepId) + assert.Equal(t, jobAgentID1, step1jobs[0].JobAgentId) + assert.Equal(t, oapi.JobAgentConfig{ + "delaySeconds": float64(10), + }, step1jobs[0].JobAgentConfig) + + step2jobs := engine.Workspace().Jobs().GetByWorkflowStepId(workflowSteps[1].Id) + assert.Len(t, step2jobs, 0) + + completedAt := time.Now() + jobUpdateEvent := &oapi.JobUpdateEvent{ + Id: &step1jobs[0].Id, + Job: oapi.Job{ + Id: step1jobs[0].Id, + Status: oapi.JobStatusSuccessful, + CompletedAt: &completedAt, + }, + FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateCompletedAt, + }, + } + engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent) + + step2jobs = engine.Workspace().Jobs().GetByWorkflowStepId(workflowSteps[1].Id) + assert.Len(t, step2jobs, 1) + assert.Equal(t, oapi.JobStatusPending, step2jobs[0].Status) + assert.Equal(t, workflowSteps[1].Id, step2jobs[0].WorkflowStepId) + assert.Equal(t, jobAgentID2, step2jobs[0].JobAgentId) + assert.Equal(t, oapi.JobAgentConfig{ + "delaySeconds": float64(20), + }, step2jobs[0].JobAgentConfig) + + wfv, err := workflowmanager.NewWorkflowView(engine.Workspace().Store(), workflow.Id) + assert.NoError(t, err) + assert.NotNil(t, wfv) + assert.False(t, wfv.IsComplete()) + + completedAt2 := time.Now() + jobUpdateEvent2 := &oapi.JobUpdateEvent{ + Id: &step2jobs[0].Id, + Job: oapi.Job{ + Id: step2jobs[0].Id, + Status: oapi.JobStatusSuccessful, + CompletedAt: &completedAt2, + }, + FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateCompletedAt, + }, + } + engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent2) + + wfv, err = workflowmanager.NewWorkflowView(engine.Workspace().Store(), workflow.Id) + assert.NoError(t, err) + assert.NotNil(t, wfv) + assert.True(t, wfv.IsComplete()) +} + +func TestEngine_Workflow_ConcurrentWorkflows(t *testing.T) { + jobAgentID := uuid.New().String() + workflowTemplateID := uuid.New().String() + workflowStepTemplateID := uuid.New().String() + + engine := integration.NewTestWorkspace(t, + integration.WithJobAgent( + integration.JobAgentID(jobAgentID), + integration.JobAgentName("Test Agent"), + ), + integration.WithWorkflowTemplate( + integration.WorkflowTemplateID(workflowTemplateID), + integration.WithWorkflowStringInput( + integration.WorkflowStringInputName("input-1"), + integration.WorkflowStringInputDefault("default-1"), + ), + integration.WithWorkflowStepTemplate( + integration.WorkflowStepTemplateID(workflowStepTemplateID), + integration.WorkflowStepTemplateJobAgentID(jobAgentID), + integration.WorkflowStepTemplateJobAgentConfig(map[string]any{ + "delaySeconds": 10, + }), + integration.WorkflowStepTemplateName("Test Step 1"), + ), + ), + ) + + ctx := context.Background() + + workflow1Create := &oapi.Workflow{ + WorkflowTemplateId: workflowTemplateID, + Inputs: map[string]any{ + "input-1": "custom-1", + }, + } + engine.PushEvent(ctx, handler.WorkflowCreate, workflow1Create) + + workflow2Create := &oapi.Workflow{ + WorkflowTemplateId: workflowTemplateID, + Inputs: map[string]any{ + "input-1": "custom-2", + }, + } + engine.PushEvent(ctx, handler.WorkflowCreate, workflow2Create) + + workflows := engine.Workspace().Workflows().GetByTemplateID(workflowTemplateID) + assert.Len(t, workflows, 2) + assert.Equal(t, 2, len(workflows)) + + workflowsSlice := make([]*oapi.Workflow, 0) + for _, workflow := range workflows { + workflowsSlice = append(workflowsSlice, workflow) + } + + var workflow1 *oapi.Workflow + var workflow2 *oapi.Workflow + + for _, workflow := range workflowsSlice { + if workflow.Inputs["input-1"] == "custom-1" { + workflow1 = workflow + } else { + workflow2 = workflow + } + } + + workflow1Steps := engine.Workspace().WorkflowSteps().GetByWorkflowId(workflow1.Id) + assert.Len(t, workflow1Steps, 1) + assert.Equal(t, 0, workflow1Steps[0].Index) + + workflow2Steps := engine.Workspace().WorkflowSteps().GetByWorkflowId(workflow2.Id) + assert.Len(t, workflow2Steps, 1) + assert.Equal(t, 0, workflow2Steps[0].Index) + + workflow1Jobs := engine.Workspace().Jobs().GetByWorkflowStepId(workflow1Steps[0].Id) + assert.Len(t, workflow1Jobs, 1) + assert.Equal(t, oapi.JobStatusPending, workflow1Jobs[0].Status) + assert.Equal(t, workflow1Steps[0].Id, workflow1Jobs[0].WorkflowStepId) + assert.Equal(t, jobAgentID, workflow1Jobs[0].JobAgentId) + assert.Equal(t, oapi.JobAgentConfig{ + "delaySeconds": float64(10), + }, workflow1Jobs[0].JobAgentConfig) + + completedAt1 := time.Now() + jobUpdateEvent1 := &oapi.JobUpdateEvent{ + Id: &workflow1Jobs[0].Id, + Job: oapi.Job{ + Id: workflow1Jobs[0].Id, + Status: oapi.JobStatusSuccessful, + CompletedAt: &completedAt1, + }, + FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateCompletedAt, + }, + } + engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent1) + + workflow2Jobs := engine.Workspace().Jobs().GetByWorkflowStepId(workflow2Steps[0].Id) + assert.Len(t, workflow2Jobs, 1) + assert.Equal(t, oapi.JobStatusPending, workflow2Jobs[0].Status) + assert.Equal(t, workflow2Steps[0].Id, workflow2Jobs[0].WorkflowStepId) + assert.Equal(t, jobAgentID, workflow2Jobs[0].JobAgentId) + assert.Equal(t, oapi.JobAgentConfig{ + "delaySeconds": float64(10), + }, workflow2Jobs[0].JobAgentConfig) + + completedAt2 := time.Now() + jobUpdateEvent2 := &oapi.JobUpdateEvent{ + Id: &workflow2Jobs[0].Id, + Job: oapi.Job{ + Id: workflow2Jobs[0].Id, + Status: oapi.JobStatusSuccessful, + CompletedAt: &completedAt2, + }, + FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateCompletedAt, + }, + } + engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent2) + + wfv1, err := workflowmanager.NewWorkflowView(engine.Workspace().Store(), workflow1.Id) + assert.NoError(t, err) + assert.NotNil(t, wfv1) + assert.True(t, wfv1.IsComplete()) + + wfv2, err := workflowmanager.NewWorkflowView(engine.Workspace().Store(), workflow2.Id) + assert.NoError(t, err) + assert.NotNil(t, wfv2) + assert.True(t, wfv2.IsComplete()) +} diff --git a/apps/workspace-engine/test/integration/creators/workflow.go b/apps/workspace-engine/test/integration/creators/workflow.go new file mode 100644 index 000000000..619ceeca2 --- /dev/null +++ b/apps/workspace-engine/test/integration/creators/workflow.go @@ -0,0 +1,69 @@ +package creators + +import ( + "fmt" + "workspace-engine/pkg/oapi" + + "github.com/google/uuid" +) + +func NewWorkflowTemplate(workspaceID string) *oapi.WorkflowTemplate { + id := uuid.New().String() + idSubstring := id[:8] + + workflowTemplate := &oapi.WorkflowTemplate{ + Id: id, + Name: fmt.Sprintf("workflow-template-%s", idSubstring), + Inputs: []oapi.WorkflowInput{}, + Steps: []oapi.WorkflowStepTemplate{}, + } + + return workflowTemplate +} + +func NewStringWorkflowInput(workflowTemplateID string) *oapi.WorkflowInput { + input := &oapi.WorkflowInput{} + name := fmt.Sprintf("test-input-%s", uuid.New().String()[:8]) + _ = input.FromWorkflowStringInput(oapi.WorkflowStringInput{ + Name: name, + Type: oapi.String, + Default: "", + }) + return input +} + +func NewNumberWorkflowInput(workflowTemplateID string) *oapi.WorkflowInput { + input := &oapi.WorkflowInput{} + name := fmt.Sprintf("test-input-%s", uuid.New().String()[:8]) + _ = input.FromWorkflowNumberInput(oapi.WorkflowNumberInput{ + Name: name, + Type: oapi.Number, + Default: 0, + }) + return input +} + +func NewBooleanWorkflowInput(workflowTemplateID string) *oapi.WorkflowInput { + input := &oapi.WorkflowInput{} + name := fmt.Sprintf("test-input-%s", uuid.New().String()[:8]) + _ = input.FromWorkflowBooleanInput(oapi.WorkflowBooleanInput{ + Name: name, + Type: oapi.Boolean, + Default: false, + }) + return input +} + +func NewWorkflowStepTemplate(workflowTemplateID string) *oapi.WorkflowStepTemplate { + id := uuid.New().String() + idSubstring := id[:8] + stepTemplate := &oapi.WorkflowStepTemplate{ + Id: id, + Name: fmt.Sprintf("test-step-%s", idSubstring), + JobAgent: oapi.WorkflowJobAgentConfig{ + Id: "", + Config: make(map[string]any), + }, + } + return stepTemplate +} diff --git a/apps/workspace-engine/test/integration/opts.go b/apps/workspace-engine/test/integration/opts.go index f98a75f32..955de7663 100644 --- a/apps/workspace-engine/test/integration/opts.go +++ b/apps/workspace-engine/test/integration/opts.go @@ -2,6 +2,7 @@ package integration import ( "context" + "encoding/json" "fmt" "time" @@ -1322,3 +1323,164 @@ func DeploymentVariableValueReferenceValue(reference string, path []string) Depl dvv.Value = *c.NewValueFromReference(reference, path) } } + +// ===== Workflow Options ===== + +type WorkflowTemplateOption func(*TestWorkspace, *oapi.WorkflowTemplate) + +func WithWorkflowTemplate(options ...WorkflowTemplateOption) WorkspaceOption { + return func(ws *TestWorkspace) error { + wft := c.NewWorkflowTemplate(ws.workspace.ID) + for _, option := range options { + option(ws, wft) + } + ws.PushEvent(context.Background(), handler.WorkflowTemplateCreate, wft) + return nil + } +} + +func WorkflowTemplateName(name string) WorkflowTemplateOption { + return func(_ *TestWorkspace, wft *oapi.WorkflowTemplate) { + wft.Name = name + } +} + +func WorkflowTemplateID(id string) WorkflowTemplateOption { + return func(_ *TestWorkspace, wft *oapi.WorkflowTemplate) { + wft.Id = id + } +} + +type WorkflowInputOption func(*TestWorkspace, *oapi.WorkflowInput) + +func WithWorkflowStringInput(options ...WorkflowInputOption) WorkflowTemplateOption { + return func(ws *TestWorkspace, wft *oapi.WorkflowTemplate) { + input := c.NewStringWorkflowInput(wft.Id) + for _, option := range options { + option(ws, input) + } + wft.Inputs = append(wft.Inputs, *input) + } +} + +func WorkflowStringInputName(name string) WorkflowInputOption { + return func(_ *TestWorkspace, input *oapi.WorkflowInput) { + curr, _ := input.MarshalJSON() + var cfg map[string]any + json.Unmarshal(curr, &cfg) + cfg["name"] = name + new, _ := json.Marshal(cfg) + _ = input.UnmarshalJSON(new) + } +} + +func WorkflowStringInputDefault(defaultValue string) WorkflowInputOption { + return func(_ *TestWorkspace, input *oapi.WorkflowInput) { + curr, _ := input.MarshalJSON() + var cfg map[string]any + json.Unmarshal(curr, &cfg) + cfg["default"] = defaultValue + new, _ := json.Marshal(cfg) + _ = input.UnmarshalJSON(new) + } +} + +func WithWorkflowNumberInput(options ...WorkflowInputOption) WorkflowTemplateOption { + return func(ws *TestWorkspace, wft *oapi.WorkflowTemplate) { + input := c.NewNumberWorkflowInput(wft.Id) + for _, option := range options { + option(ws, input) + } + wft.Inputs = append(wft.Inputs, *input) + } +} + +func WorkflowNumberInputName(name string) WorkflowInputOption { + return func(_ *TestWorkspace, input *oapi.WorkflowInput) { + curr, _ := input.MarshalJSON() + var cfg map[string]any + json.Unmarshal(curr, &cfg) + cfg["name"] = name + new, _ := json.Marshal(cfg) + _ = input.UnmarshalJSON(new) + } +} + +func WorkflowNumberInputDefault(defaultValue float32) WorkflowInputOption { + return func(_ *TestWorkspace, input *oapi.WorkflowInput) { + curr, _ := input.MarshalJSON() + var cfg map[string]any + json.Unmarshal(curr, &cfg) + cfg["default"] = defaultValue + new, _ := json.Marshal(cfg) + _ = input.UnmarshalJSON(new) + } +} + +func WithWorkflowBooleanInput(options ...WorkflowInputOption) WorkflowTemplateOption { + return func(ws *TestWorkspace, wft *oapi.WorkflowTemplate) { + input := c.NewBooleanWorkflowInput(wft.Id) + for _, option := range options { + option(ws, input) + } + wft.Inputs = append(wft.Inputs, *input) + } +} + +func WorkflowBooleanInputName(name string) WorkflowInputOption { + return func(_ *TestWorkspace, input *oapi.WorkflowInput) { + curr, _ := input.MarshalJSON() + var cfg map[string]any + json.Unmarshal(curr, &cfg) + cfg["name"] = name + new, _ := json.Marshal(cfg) + _ = input.UnmarshalJSON(new) + } +} + +func WorkflowBooleanInputDefault(defaultValue bool) WorkflowInputOption { + return func(_ *TestWorkspace, input *oapi.WorkflowInput) { + curr, _ := input.MarshalJSON() + var cfg map[string]any + json.Unmarshal(curr, &cfg) + cfg["default"] = defaultValue + new, _ := json.Marshal(cfg) + _ = input.UnmarshalJSON(new) + } +} + +type WorkflowStepTemplateOption func(*TestWorkspace, *oapi.WorkflowStepTemplate) + +func WithWorkflowStepTemplate(options ...WorkflowStepTemplateOption) WorkflowTemplateOption { + return func(ws *TestWorkspace, wft *oapi.WorkflowTemplate) { + stepTemplate := c.NewWorkflowStepTemplate(wft.Id) + for _, option := range options { + option(ws, stepTemplate) + } + wft.Steps = append(wft.Steps, *stepTemplate) + } +} + +func WorkflowStepTemplateID(id string) WorkflowStepTemplateOption { + return func(_ *TestWorkspace, stepTemplate *oapi.WorkflowStepTemplate) { + stepTemplate.Id = id + } +} + +func WorkflowStepTemplateJobAgentID(id string) WorkflowStepTemplateOption { + return func(_ *TestWorkspace, stepTemplate *oapi.WorkflowStepTemplate) { + stepTemplate.JobAgent.Id = id + } +} + +func WorkflowStepTemplateJobAgentConfig(config map[string]any) WorkflowStepTemplateOption { + return func(_ *TestWorkspace, stepTemplate *oapi.WorkflowStepTemplate) { + stepTemplate.JobAgent.Config = config + } +} + +func WorkflowStepTemplateName(name string) WorkflowStepTemplateOption { + return func(_ *TestWorkspace, stepTemplate *oapi.WorkflowStepTemplate) { + stepTemplate.Name = name + } +}