diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/action.go b/apps/workspace-engine/pkg/workspace/workflowmanager/action.go index 00ed89981..1b3c7af38 100644 --- a/apps/workspace-engine/pkg/workspace/workflowmanager/action.go +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/action.go @@ -2,7 +2,6 @@ package workflowmanager import ( "context" - "fmt" "workspace-engine/pkg/oapi" "workspace-engine/pkg/workspace/store" ) @@ -24,19 +23,5 @@ func (w *WorkflowManagerAction) Name() string { } func (w *WorkflowManagerAction) Execute(ctx context.Context, trigger ActionTrigger, job *oapi.Job) error { - if trigger != ActionTriggerJobSuccess { - return nil - } - - workflowJob, ok := w.store.WorkflowJobs.Get(job.WorkflowJobId) - if !ok { - return nil - } - - workflow, ok := w.store.Workflows.Get(workflowJob.WorkflowId) - if !ok { - return fmt.Errorf("workflow %s not found for job %s", workflowJob.WorkflowId, workflowJob.Id) - } - - return w.manager.ReconcileWorkflow(ctx, workflow) + return nil } diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go b/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go index 22e2ab686..46b15c08d 100644 --- a/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go @@ -24,8 +24,8 @@ func NewWorkflowManager(store *store.Store, jobAgentRegistry *jobagents.Registry } } -func (w *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string, inputs map[string]any) (*oapi.Workflow, error) { - workflowTemplate, ok := w.store.WorkflowTemplates.Get(workflowTemplateId) +func (m *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string, inputs map[string]any) (*oapi.Workflow, error) { + workflowTemplate, ok := m.store.WorkflowTemplates.Get(workflowTemplateId) if !ok { return nil, fmt.Errorf("workflow template %s not found", workflowTemplateId) } @@ -36,34 +36,37 @@ func (w *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string, Inputs: maps.Clone(inputs), } + workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs)) for idx, jobTemplate := range workflowTemplate.Jobs { - job := &oapi.WorkflowJob{ + wfJob := &oapi.WorkflowJob{ Id: uuid.New().String(), WorkflowId: workflow.Id, Index: idx, Ref: jobTemplate.Ref, Config: maps.Clone(jobTemplate.Config), } - w.store.WorkflowJobs.Upsert(ctx, job) + m.store.WorkflowJobs.Upsert(ctx, wfJob) + workflowJobs = append(workflowJobs, wfJob) } - w.store.Workflows.Upsert(ctx, workflow) + m.store.Workflows.Upsert(ctx, workflow) + + for _, wfJob := range workflowJobs { + if err := m.dispatchJob(ctx, wfJob); err != nil { + return workflow, err + } + } - w.ReconcileWorkflow(ctx, workflow) return workflow, nil } -// dispatchJobForStep dispatches a job for the given step func (m *Manager) dispatchJob(ctx context.Context, wfJob *oapi.WorkflowJob) error { jobAgent, ok := m.store.JobAgents.Get(wfJob.Ref) if !ok { return fmt.Errorf("job agent %s not found", wfJob.Ref) } - mergedConfig, err := mergeJobAgentConfig( - jobAgent.Config, - wfJob.Config, - ) + mergedConfig, err := mergeJobAgentConfig(jobAgent.Config, wfJob.Config) if err != nil { return fmt.Errorf("failed to merge job agent config: %w", err) } @@ -87,28 +90,6 @@ func (m *Manager) dispatchJob(ctx context.Context, wfJob *oapi.WorkflowJob) erro return nil } -// ReconcileWorkflow reconciles a workflow, advancing to the next step if ready. -func (m *Manager) ReconcileWorkflow(ctx context.Context, workflow *oapi.Workflow) error { - wfv, err := NewWorkflowView(m.store, workflow.Id) - if err != nil { - return fmt.Errorf("failed to create workflow view: %w", err) - } - - if wfv.IsComplete() { - return nil - } - if wfv.HasActiveJobs() { - return nil - } - - nextJob := wfv.GetNextJob() - if nextJob == nil { - return nil - } - - return m.dispatchJob(ctx, nextJob) -} - func mergeJobAgentConfig(configs ...oapi.JobAgentConfig) (oapi.JobAgentConfig, error) { mergedConfig := make(map[string]any) for _, config := range configs { diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go b/apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go index c61b308b9..4cb1c88a2 100644 --- a/apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go @@ -36,16 +36,6 @@ func TestWorkflowManager_CreatesNewWorkflow(t *testing.T) { } store.JobAgents.Upsert(ctx, jobAgent1) - jobAgent2 := &oapi.JobAgent{ - Id: "test-job-agent-2", - Name: "test-job-agent-2", - Type: "test-runner", - Config: map[string]any{ - "test-config": "test-value-2", - }, - } - store.JobAgents.Upsert(ctx, jobAgent2) - workflowTemplate := &oapi.WorkflowTemplate{ Id: "test-workflow-template", Name: "test-workflow-template", @@ -93,7 +83,7 @@ func TestWorkflowManager_CreatesNewWorkflow(t *testing.T) { }, job.JobAgentConfig) } -func TestWorkflowManager_ContinuesWorkflowAfterJobComplete(t *testing.T) { +func TestWorkflowManager_DispatchesAllJobsConcurrently(t *testing.T) { ctx := context.Background() store := store.New("test-workspace", statechange.NewChangeSet[any]()) jobAgentRegistry := jobagents.NewRegistry(store, verification.NewManager(store)) @@ -157,65 +147,41 @@ func TestWorkflowManager_ContinuesWorkflowAfterJobComplete(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, wf) assert.Equal(t, "test-workflow-template", wf.WorkflowTemplateId) - assert.Equal(t, map[string]any{ - "test-input": "test-value", - }, wf.Inputs) wfJobs := store.WorkflowJobs.GetByWorkflowId(wf.Id) assert.Len(t, wfJobs, 2) assert.Equal(t, 0, wfJobs[0].Index) assert.Equal(t, 1, wfJobs[1].Index) - now := time.Now().UTC() - job1 := store.Jobs.GetByWorkflowJobId(wfJobs[0].Id)[0] - job1.CompletedAt = &now - job1.Status = oapi.JobStatusSuccessful - store.Jobs.Upsert(ctx, job1) - - wfv, err := NewWorkflowView(store, wf.Id) - assert.NoError(t, err) - assert.False(t, wfv.IsComplete()) - assert.Equal(t, 1, wfv.GetNextJob().Index) - - err = manager.ReconcileWorkflow(ctx, wf) - assert.NoError(t, err) - - wfJobs = store.WorkflowJobs.GetByWorkflowId(wf.Id) - assert.Len(t, wfJobs, 2) - assert.Equal(t, 0, wfJobs[0].Index) - assert.Equal(t, 1, wfJobs[1].Index) + jobs1 := store.Jobs.GetByWorkflowJobId(wfJobs[0].Id) + assert.Len(t, jobs1, 1) + assert.Equal(t, oapi.JobStatusPending, jobs1[0].Status) + assert.Equal(t, jobAgent1.Id, jobs1[0].JobAgentId) + assert.Equal(t, oapi.JobAgentConfig{ + "test-config": "test-value-1", + "delaySeconds": 10, + }, jobs1[0].JobAgentConfig) - jobs := store.Jobs.GetByWorkflowJobId(wfJobs[1].Id) - assert.Len(t, jobs, 1) - assert.Equal(t, oapi.JobStatusPending, jobs[0].Status) - assert.Equal(t, wfJobs[1].Id, jobs[0].WorkflowJobId) - assert.Equal(t, jobAgent2.Id, jobs[0].JobAgentId) + jobs2 := store.Jobs.GetByWorkflowJobId(wfJobs[1].Id) + assert.Len(t, jobs2, 1) + assert.Equal(t, oapi.JobStatusPending, jobs2[0].Status) + assert.Equal(t, jobAgent2.Id, jobs2[0].JobAgentId) assert.Equal(t, oapi.JobAgentConfig{ "test-config": "test-value-2", "delaySeconds": 20, - }, jobs[0].JobAgentConfig) + }, jobs2[0].JobAgentConfig) } -func TestWorkflowManager_DoesNotContinueIfJobIsInProgress(t *testing.T) { +func TestWorkflowView_IsComplete(t *testing.T) { ctx := context.Background() store := store.New("test-workspace", statechange.NewChangeSet[any]()) jobAgentRegistry := jobagents.NewRegistry(store, verification.NewManager(store)) manager := NewWorkflowManager(store, jobAgentRegistry) - var stringInput oapi.WorkflowInput - _ = stringInput.FromWorkflowStringInput(oapi.WorkflowStringInput{ - Name: "test-input", - Type: oapi.String, - Default: "test-default", - }) - jobAgent1 := &oapi.JobAgent{ Id: "test-job-agent-1", Name: "test-job-agent-1", Type: "test-runner", - Config: map[string]any{ - "test-config": "test-value-1", - }, } store.JobAgents.Upsert(ctx, jobAgent1) @@ -223,75 +189,41 @@ func TestWorkflowManager_DoesNotContinueIfJobIsInProgress(t *testing.T) { Id: "test-job-agent-2", Name: "test-job-agent-2", Type: "test-runner", - Config: map[string]any{ - "test-config": "test-value-2", - }, } store.JobAgents.Upsert(ctx, jobAgent2) workflowTemplate := &oapi.WorkflowTemplate{ - Id: "test-workflow-template", - Name: "test-workflow-template", - Inputs: []oapi.WorkflowInput{stringInput}, + Id: "test-workflow-template", + Name: "test-workflow-template", Jobs: []oapi.WorkflowJobTemplate{ - { - Id: "test-job-1", - Name: "test-job-1", - Ref: "test-job-agent-1", - Config: map[string]any{ - "delaySeconds": 10, - }, - }, - { - Id: "test-job-2", - Name: "test-job-2", - Ref: "test-job-agent-2", - Config: map[string]any{ - "delaySeconds": 20, - }, - }, + {Id: "test-job-1", Name: "test-job-1", Ref: "test-job-agent-1"}, + {Id: "test-job-2", Name: "test-job-2", Ref: "test-job-agent-2"}, }, } store.WorkflowTemplates.Upsert(ctx, workflowTemplate) - wf, err := manager.CreateWorkflow(ctx, "test-workflow-template", map[string]any{ - "test-input": "test-value", - }) + wf, _ := manager.CreateWorkflow(ctx, "test-workflow-template", nil) + + wfv, err := NewWorkflowView(store, wf.Id) assert.NoError(t, err) - assert.NotNil(t, wf) - assert.Equal(t, "test-workflow-template", wf.WorkflowTemplateId) - assert.Equal(t, map[string]any{ - "test-input": "test-value", - }, wf.Inputs) + assert.False(t, wfv.IsComplete()) wfJobs := store.WorkflowJobs.GetByWorkflowId(wf.Id) - assert.Len(t, wfJobs, 2) - assert.Equal(t, 0, wfJobs[0].Index) - assert.Equal(t, 1, wfJobs[1].Index) - now := time.Now().UTC() + job1 := store.Jobs.GetByWorkflowJobId(wfJobs[0].Id)[0] job1.CompletedAt = &now - job1.Status = oapi.JobStatusInProgress + job1.Status = oapi.JobStatusSuccessful store.Jobs.Upsert(ctx, job1) - wfv, err := NewWorkflowView(store, wf.Id) - assert.NoError(t, err) + wfv, _ = NewWorkflowView(store, wf.Id) assert.False(t, wfv.IsComplete()) - assert.Nil(t, wfv.GetNextJob()) - assert.True(t, wfv.HasActiveJobs()) - - wfJob0Jobs := store.Jobs.GetByWorkflowJobId(wfJobs[0].Id) - assert.Len(t, wfJob0Jobs, 1) - err = manager.ReconcileWorkflow(ctx, wf) - assert.NoError(t, err) - - wfJobs = store.WorkflowJobs.GetByWorkflowId(wf.Id) - assert.Len(t, wfJobs, 2) - assert.Equal(t, 0, wfJobs[0].Index) - assert.Equal(t, 1, wfJobs[1].Index) + job2 := store.Jobs.GetByWorkflowJobId(wfJobs[1].Id)[0] + job2.CompletedAt = &now + job2.Status = oapi.JobStatusSuccessful + store.Jobs.Upsert(ctx, job2) - jobs := store.Jobs.GetByWorkflowJobId(wfJobs[1].Id) - assert.Len(t, jobs, 0) + wfv, _ = NewWorkflowView(store, wf.Id) + assert.True(t, wfv.IsComplete()) } diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/workflow_view.go b/apps/workspace-engine/pkg/workspace/workflowmanager/workflow_view.go index 1f1a67cf6..522e9bbc8 100644 --- a/apps/workspace-engine/pkg/workspace/workflowmanager/workflow_view.go +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/workflow_view.go @@ -11,8 +11,7 @@ type WorkflowView struct { store *store.Store workflow *oapi.Workflow workflowJobs []*oapi.WorkflowJob - - jobs map[string][]*oapi.Job + jobs map[string][]*oapi.Job } func NewWorkflowView(store *store.Store, workflowId string) (*WorkflowView, error) { @@ -26,8 +25,8 @@ func NewWorkflowView(store *store.Store, workflowId string) (*WorkflowView, erro }) jobs := make(map[string][]*oapi.Job) - for _, job := range workflowJobs { - jobs[job.Id] = store.Jobs.GetByWorkflowJobId(job.Id) + for _, wfJob := range workflowJobs { + jobs[wfJob.Id] = store.Jobs.GetByWorkflowJobId(wfJob.Id) } return &WorkflowView{ @@ -39,57 +38,18 @@ func NewWorkflowView(store *store.Store, workflowId string) (*WorkflowView, erro } func (w *WorkflowView) IsComplete() bool { - for _, job := range w.workflowJobs { - if !w.isJobComplete(job.Id) { - return false - } - } - return true -} - -func (w *WorkflowView) isJobComplete(jobId string) bool { - if len(w.jobs[jobId]) == 0 { - return false - } - - for _, job := range w.jobs[jobId] { - if !job.IsInTerminalState() { + for _, wfJob := range w.workflowJobs { + jobs := w.jobs[wfJob.Id] + if len(jobs) == 0 { return false } - } - return true -} - -func (w *WorkflowView) isJobInProgress(jobId string) bool { - for _, job := range w.jobs[jobId] { - if !job.IsInTerminalState() { - return true - } - } - return false -} - -func (w *WorkflowView) HasActiveJobs() bool { - for _, jobs := range w.jobs { for _, job := range jobs { if !job.IsInTerminalState() { - return true + return false } } } - return false -} - -func (w *WorkflowView) GetNextJob() *oapi.WorkflowJob { - for _, job := range w.workflowJobs { - if !w.isJobComplete(job.Id) { - if w.isJobInProgress(job.Id) { - return nil - } - return job - } - } - return nil + return true } func (w *WorkflowView) GetWorkflow() *oapi.Workflow { diff --git a/apps/workspace-engine/test/e2e/engine_workflow_test.go b/apps/workspace-engine/test/e2e/engine_workflow_test.go index 7f9055f3c..76991b610 100644 --- a/apps/workspace-engine/test/e2e/engine_workflow_test.go +++ b/apps/workspace-engine/test/e2e/engine_workflow_test.go @@ -153,7 +153,7 @@ func TestEngine_Workflow_MultipleInputs(t *testing.T) { }, jobs[0].JobAgentConfig) } -func TestEngine_Workflow_MultipleSteps(t *testing.T) { +func TestEngine_Workflow_MultipleJobsConcurrent(t *testing.T) { jobAgentID1 := uuid.New().String() jobAgentID2 := uuid.New().String() workflowTemplateID := uuid.New().String() @@ -222,7 +222,18 @@ func TestEngine_Workflow_MultipleSteps(t *testing.T) { }, wfJob1jobs[0].JobAgentConfig) wfJob2jobs := engine.Workspace().Jobs().GetByWorkflowJobId(workflowJobs[1].Id) - assert.Len(t, wfJob2jobs, 0) + assert.Len(t, wfJob2jobs, 1) + assert.Equal(t, oapi.JobStatusPending, wfJob2jobs[0].Status) + assert.Equal(t, workflowJobs[1].Id, wfJob2jobs[0].WorkflowJobId) + assert.Equal(t, jobAgentID2, wfJob2jobs[0].JobAgentId) + assert.Equal(t, oapi.JobAgentConfig{ + "delaySeconds": float64(20), + }, wfJob2jobs[0].JobAgentConfig) + + wfv, err := workflowmanager.NewWorkflowView(engine.Workspace().Store(), workflow.Id) + assert.NoError(t, err) + assert.NotNil(t, wfv) + assert.False(t, wfv.IsComplete()) completedAt := time.Now() jobUpdateEvent := &oapi.JobUpdateEvent{ @@ -239,18 +250,8 @@ func TestEngine_Workflow_MultipleSteps(t *testing.T) { } engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent) - wfJob2jobs = engine.Workspace().Jobs().GetByWorkflowJobId(workflowJobs[1].Id) - assert.Len(t, wfJob2jobs, 1) - assert.Equal(t, oapi.JobStatusPending, wfJob2jobs[0].Status) - assert.Equal(t, workflowJobs[1].Id, wfJob2jobs[0].WorkflowJobId) - assert.Equal(t, jobAgentID2, wfJob2jobs[0].JobAgentId) - assert.Equal(t, oapi.JobAgentConfig{ - "delaySeconds": float64(20), - }, wfJob2jobs[0].JobAgentConfig) - - wfv, err := workflowmanager.NewWorkflowView(engine.Workspace().Store(), workflow.Id) + wfv, err = workflowmanager.NewWorkflowView(engine.Workspace().Store(), workflow.Id) assert.NoError(t, err) - assert.NotNil(t, wfv) assert.False(t, wfv.IsComplete()) completedAt2 := time.Now()