⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions apps/workspace-engine/pkg/workspace/workflowmanager/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package workflowmanager

import (
"context"
"fmt"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/workspace/store"
)
Expand All @@ -24,19 +23,5 @@ func (w *WorkflowManagerAction) Name() string {
}

func (w *WorkflowManagerAction) Execute(ctx context.Context, trigger ActionTrigger, job *oapi.Job) error {
if trigger != ActionTriggerJobSuccess {
return nil
}

workflowJob, ok := w.store.WorkflowJobs.Get(job.WorkflowJobId)
if !ok {
return nil
}

workflow, ok := w.store.Workflows.Get(workflowJob.WorkflowId)
if !ok {
return fmt.Errorf("workflow %s not found for job %s", workflowJob.WorkflowId, workflowJob.Id)
}

return w.manager.ReconcileWorkflow(ctx, workflow)
return nil
}
47 changes: 14 additions & 33 deletions apps/workspace-engine/pkg/workspace/workflowmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func NewWorkflowManager(store *store.Store, jobAgentRegistry *jobagents.Registry
}
}

func (w *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string, inputs map[string]any) (*oapi.Workflow, error) {
workflowTemplate, ok := w.store.WorkflowTemplates.Get(workflowTemplateId)
func (m *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string, inputs map[string]any) (*oapi.Workflow, error) {
workflowTemplate, ok := m.store.WorkflowTemplates.Get(workflowTemplateId)
if !ok {
return nil, fmt.Errorf("workflow template %s not found", workflowTemplateId)
}
Expand All @@ -36,34 +36,37 @@ func (w *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string,
Inputs: maps.Clone(inputs),
}

workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs))
for idx, jobTemplate := range workflowTemplate.Jobs {
job := &oapi.WorkflowJob{
wfJob := &oapi.WorkflowJob{
Id: uuid.New().String(),
WorkflowId: workflow.Id,
Index: idx,
Ref: jobTemplate.Ref,
Config: maps.Clone(jobTemplate.Config),
}
w.store.WorkflowJobs.Upsert(ctx, job)
m.store.WorkflowJobs.Upsert(ctx, wfJob)
workflowJobs = append(workflowJobs, wfJob)
}

w.store.Workflows.Upsert(ctx, workflow)
m.store.Workflows.Upsert(ctx, workflow)

for _, wfJob := range workflowJobs {
if err := m.dispatchJob(ctx, wfJob); err != nil {
return workflow, err
}
}
Comment on lines +39 to +58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Workflow jobs are upserted before the workflow entity exists.

Lines 48-49 upsert WorkflowJob records referencing workflow.Id before the Workflow itself is persisted at line 52. If a failure occurs between these operations, orphaned workflow jobs could exist without their parent workflow.

Additionally, if dispatchJob fails partway through the loop (lines 54-58), some jobs will be dispatched while others won't, potentially leaving the workflow in a partially-dispatched state.

Consider upserting the workflow first, or wrapping the entire operation in a transaction-like pattern if atomicity is required.

🔧 Proposed fix to upsert workflow before its jobs
 	workflow := &oapi.Workflow{
 		Id:                 uuid.New().String(),
 		WorkflowTemplateId: workflowTemplateId,
 		Inputs:             maps.Clone(inputs),
 	}
+	m.store.Workflows.Upsert(ctx, workflow)

 	workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs))
 	for idx, jobTemplate := range workflowTemplate.Jobs {
 		wfJob := &oapi.WorkflowJob{
 			Id:         uuid.New().String(),
 			WorkflowId: workflow.Id,
 			Index:      idx,
 			Ref:        jobTemplate.Ref,
 			Config:     maps.Clone(jobTemplate.Config),
 		}
 		m.store.WorkflowJobs.Upsert(ctx, wfJob)
 		workflowJobs = append(workflowJobs, wfJob)
 	}

-	m.store.Workflows.Upsert(ctx, workflow)
-
 	for _, wfJob := range workflowJobs {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs))
for idx, jobTemplate := range workflowTemplate.Jobs {
job := &oapi.WorkflowJob{
wfJob := &oapi.WorkflowJob{
Id: uuid.New().String(),
WorkflowId: workflow.Id,
Index: idx,
Ref: jobTemplate.Ref,
Config: maps.Clone(jobTemplate.Config),
}
w.store.WorkflowJobs.Upsert(ctx, job)
m.store.WorkflowJobs.Upsert(ctx, wfJob)
workflowJobs = append(workflowJobs, wfJob)
}
w.store.Workflows.Upsert(ctx, workflow)
m.store.Workflows.Upsert(ctx, workflow)
for _, wfJob := range workflowJobs {
if err := m.dispatchJob(ctx, wfJob); err != nil {
return workflow, err
}
}
workflow := &oapi.Workflow{
Id: uuid.New().String(),
WorkflowTemplateId: workflowTemplateId,
Inputs: maps.Clone(inputs),
}
m.store.Workflows.Upsert(ctx, workflow)
workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs))
for idx, jobTemplate := range workflowTemplate.Jobs {
wfJob := &oapi.WorkflowJob{
Id: uuid.New().String(),
WorkflowId: workflow.Id,
Index: idx,
Ref: jobTemplate.Ref,
Config: maps.Clone(jobTemplate.Config),
}
m.store.WorkflowJobs.Upsert(ctx, wfJob)
workflowJobs = append(workflowJobs, wfJob)
}
for _, wfJob := range workflowJobs {
if err := m.dispatchJob(ctx, wfJob); err != nil {
return workflow, err
}
}
🤖 Prompt for AI Agents
In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go` around lines
39 - 58, The code upserts WorkflowJob records (m.store.WorkflowJobs.Upsert
inside the loop over workflowTemplate.Jobs) before the parent Workflow is
persisted (m.store.Workflows.Upsert), and then calls m.dispatchJob which may
partially dispatch jobs on error; change the flow to persist the Workflow first
by calling m.store.Workflows.Upsert(workflow) before creating/upserting any
WorkflowJob, then insert all WorkflowJob records, and only after all upserts
succeed call m.dispatchJob for each job; alternatively implement a
transaction-like pattern (wrap the Upsert of Workflow and all WorkflowJobs in a
single transactional operation or add compensating rollback logic if dispatch
fails) so the operations are atomic and you avoid orphaned jobs and
partially-dispatched workflows.


w.ReconcileWorkflow(ctx, workflow)
return workflow, nil
}

// dispatchJobForStep dispatches a job for the given step
func (m *Manager) dispatchJob(ctx context.Context, wfJob *oapi.WorkflowJob) error {
jobAgent, ok := m.store.JobAgents.Get(wfJob.Ref)
if !ok {
return fmt.Errorf("job agent %s not found", wfJob.Ref)
}

mergedConfig, err := mergeJobAgentConfig(
jobAgent.Config,
wfJob.Config,
)
mergedConfig, err := mergeJobAgentConfig(jobAgent.Config, wfJob.Config)
if err != nil {
return fmt.Errorf("failed to merge job agent config: %w", err)
}
Expand All @@ -87,28 +90,6 @@ func (m *Manager) dispatchJob(ctx context.Context, wfJob *oapi.WorkflowJob) erro
return nil
}

// ReconcileWorkflow reconciles a workflow, advancing to the next step if ready.
func (m *Manager) ReconcileWorkflow(ctx context.Context, workflow *oapi.Workflow) error {
wfv, err := NewWorkflowView(m.store, workflow.Id)
if err != nil {
return fmt.Errorf("failed to create workflow view: %w", err)
}

if wfv.IsComplete() {
return nil
}
if wfv.HasActiveJobs() {
return nil
}

nextJob := wfv.GetNextJob()
if nextJob == nil {
return nil
}

return m.dispatchJob(ctx, nextJob)
}

func mergeJobAgentConfig(configs ...oapi.JobAgentConfig) (oapi.JobAgentConfig, error) {
mergedConfig := make(map[string]any)
for _, config := range configs {
Expand Down
132 changes: 32 additions & 100 deletions apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ func TestWorkflowManager_CreatesNewWorkflow(t *testing.T) {
}
store.JobAgents.Upsert(ctx, jobAgent1)

jobAgent2 := &oapi.JobAgent{
Id: "test-job-agent-2",
Name: "test-job-agent-2",
Type: "test-runner",
Config: map[string]any{
"test-config": "test-value-2",
},
}
store.JobAgents.Upsert(ctx, jobAgent2)

workflowTemplate := &oapi.WorkflowTemplate{
Id: "test-workflow-template",
Name: "test-workflow-template",
Expand Down Expand Up @@ -93,7 +83,7 @@ func TestWorkflowManager_CreatesNewWorkflow(t *testing.T) {
}, job.JobAgentConfig)
}

func TestWorkflowManager_ContinuesWorkflowAfterJobComplete(t *testing.T) {
func TestWorkflowManager_DispatchesAllJobsConcurrently(t *testing.T) {
ctx := context.Background()
store := store.New("test-workspace", statechange.NewChangeSet[any]())
jobAgentRegistry := jobagents.NewRegistry(store, verification.NewManager(store))
Expand Down Expand Up @@ -157,141 +147,83 @@ func TestWorkflowManager_ContinuesWorkflowAfterJobComplete(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, wf)
assert.Equal(t, "test-workflow-template", wf.WorkflowTemplateId)
assert.Equal(t, map[string]any{
"test-input": "test-value",
}, wf.Inputs)

wfJobs := store.WorkflowJobs.GetByWorkflowId(wf.Id)
assert.Len(t, wfJobs, 2)
assert.Equal(t, 0, wfJobs[0].Index)
assert.Equal(t, 1, wfJobs[1].Index)

now := time.Now().UTC()
job1 := store.Jobs.GetByWorkflowJobId(wfJobs[0].Id)[0]
job1.CompletedAt = &now
job1.Status = oapi.JobStatusSuccessful
store.Jobs.Upsert(ctx, job1)

wfv, err := NewWorkflowView(store, wf.Id)
assert.NoError(t, err)
assert.False(t, wfv.IsComplete())
assert.Equal(t, 1, wfv.GetNextJob().Index)

err = manager.ReconcileWorkflow(ctx, wf)
assert.NoError(t, err)

wfJobs = store.WorkflowJobs.GetByWorkflowId(wf.Id)
assert.Len(t, wfJobs, 2)
assert.Equal(t, 0, wfJobs[0].Index)
assert.Equal(t, 1, wfJobs[1].Index)
jobs1 := store.Jobs.GetByWorkflowJobId(wfJobs[0].Id)
assert.Len(t, jobs1, 1)
assert.Equal(t, oapi.JobStatusPending, jobs1[0].Status)
assert.Equal(t, jobAgent1.Id, jobs1[0].JobAgentId)
assert.Equal(t, oapi.JobAgentConfig{
"test-config": "test-value-1",
"delaySeconds": 10,
}, jobs1[0].JobAgentConfig)

jobs := store.Jobs.GetByWorkflowJobId(wfJobs[1].Id)
assert.Len(t, jobs, 1)
assert.Equal(t, oapi.JobStatusPending, jobs[0].Status)
assert.Equal(t, wfJobs[1].Id, jobs[0].WorkflowJobId)
assert.Equal(t, jobAgent2.Id, jobs[0].JobAgentId)
jobs2 := store.Jobs.GetByWorkflowJobId(wfJobs[1].Id)
assert.Len(t, jobs2, 1)
assert.Equal(t, oapi.JobStatusPending, jobs2[0].Status)
assert.Equal(t, jobAgent2.Id, jobs2[0].JobAgentId)
assert.Equal(t, oapi.JobAgentConfig{
"test-config": "test-value-2",
"delaySeconds": 20,
}, jobs[0].JobAgentConfig)
}, jobs2[0].JobAgentConfig)
}

func TestWorkflowManager_DoesNotContinueIfJobIsInProgress(t *testing.T) {
func TestWorkflowView_IsComplete(t *testing.T) {
ctx := context.Background()
store := store.New("test-workspace", statechange.NewChangeSet[any]())
jobAgentRegistry := jobagents.NewRegistry(store, verification.NewManager(store))
manager := NewWorkflowManager(store, jobAgentRegistry)

var stringInput oapi.WorkflowInput
_ = stringInput.FromWorkflowStringInput(oapi.WorkflowStringInput{
Name: "test-input",
Type: oapi.String,
Default: "test-default",
})

jobAgent1 := &oapi.JobAgent{
Id: "test-job-agent-1",
Name: "test-job-agent-1",
Type: "test-runner",
Config: map[string]any{
"test-config": "test-value-1",
},
}
store.JobAgents.Upsert(ctx, jobAgent1)

jobAgent2 := &oapi.JobAgent{
Id: "test-job-agent-2",
Name: "test-job-agent-2",
Type: "test-runner",
Config: map[string]any{
"test-config": "test-value-2",
},
}
store.JobAgents.Upsert(ctx, jobAgent2)

workflowTemplate := &oapi.WorkflowTemplate{
Id: "test-workflow-template",
Name: "test-workflow-template",
Inputs: []oapi.WorkflowInput{stringInput},
Id: "test-workflow-template",
Name: "test-workflow-template",
Jobs: []oapi.WorkflowJobTemplate{
{
Id: "test-job-1",
Name: "test-job-1",
Ref: "test-job-agent-1",
Config: map[string]any{
"delaySeconds": 10,
},
},
{
Id: "test-job-2",
Name: "test-job-2",
Ref: "test-job-agent-2",
Config: map[string]any{
"delaySeconds": 20,
},
},
{Id: "test-job-1", Name: "test-job-1", Ref: "test-job-agent-1"},
{Id: "test-job-2", Name: "test-job-2", Ref: "test-job-agent-2"},
},
}
store.WorkflowTemplates.Upsert(ctx, workflowTemplate)

wf, err := manager.CreateWorkflow(ctx, "test-workflow-template", map[string]any{
"test-input": "test-value",
})
wf, _ := manager.CreateWorkflow(ctx, "test-workflow-template", nil)

wfv, err := NewWorkflowView(store, wf.Id)
assert.NoError(t, err)
assert.NotNil(t, wf)
assert.Equal(t, "test-workflow-template", wf.WorkflowTemplateId)
assert.Equal(t, map[string]any{
"test-input": "test-value",
}, wf.Inputs)
assert.False(t, wfv.IsComplete())

wfJobs := store.WorkflowJobs.GetByWorkflowId(wf.Id)
assert.Len(t, wfJobs, 2)
assert.Equal(t, 0, wfJobs[0].Index)
assert.Equal(t, 1, wfJobs[1].Index)

now := time.Now().UTC()

job1 := store.Jobs.GetByWorkflowJobId(wfJobs[0].Id)[0]
job1.CompletedAt = &now
job1.Status = oapi.JobStatusInProgress
job1.Status = oapi.JobStatusSuccessful
store.Jobs.Upsert(ctx, job1)

wfv, err := NewWorkflowView(store, wf.Id)
assert.NoError(t, err)
wfv, _ = NewWorkflowView(store, wf.Id)
assert.False(t, wfv.IsComplete())
assert.Nil(t, wfv.GetNextJob())
assert.True(t, wfv.HasActiveJobs())

wfJob0Jobs := store.Jobs.GetByWorkflowJobId(wfJobs[0].Id)
assert.Len(t, wfJob0Jobs, 1)

err = manager.ReconcileWorkflow(ctx, wf)
assert.NoError(t, err)

wfJobs = store.WorkflowJobs.GetByWorkflowId(wf.Id)
assert.Len(t, wfJobs, 2)
assert.Equal(t, 0, wfJobs[0].Index)
assert.Equal(t, 1, wfJobs[1].Index)
job2 := store.Jobs.GetByWorkflowJobId(wfJobs[1].Id)[0]
job2.CompletedAt = &now
job2.Status = oapi.JobStatusSuccessful
store.Jobs.Upsert(ctx, job2)

jobs := store.Jobs.GetByWorkflowJobId(wfJobs[1].Id)
assert.Len(t, jobs, 0)
wfv, _ = NewWorkflowView(store, wf.Id)
assert.True(t, wfv.IsComplete())
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ type WorkflowView struct {
store *store.Store
workflow *oapi.Workflow
workflowJobs []*oapi.WorkflowJob

jobs map[string][]*oapi.Job
jobs map[string][]*oapi.Job
}

func NewWorkflowView(store *store.Store, workflowId string) (*WorkflowView, error) {
Expand All @@ -26,8 +25,8 @@ func NewWorkflowView(store *store.Store, workflowId string) (*WorkflowView, erro
})

jobs := make(map[string][]*oapi.Job)
for _, job := range workflowJobs {
jobs[job.Id] = store.Jobs.GetByWorkflowJobId(job.Id)
for _, wfJob := range workflowJobs {
jobs[wfJob.Id] = store.Jobs.GetByWorkflowJobId(wfJob.Id)
}

return &WorkflowView{
Expand All @@ -39,57 +38,18 @@ func NewWorkflowView(store *store.Store, workflowId string) (*WorkflowView, erro
}

func (w *WorkflowView) IsComplete() bool {
for _, job := range w.workflowJobs {
if !w.isJobComplete(job.Id) {
return false
}
}
return true
}

func (w *WorkflowView) isJobComplete(jobId string) bool {
if len(w.jobs[jobId]) == 0 {
return false
}

for _, job := range w.jobs[jobId] {
if !job.IsInTerminalState() {
for _, wfJob := range w.workflowJobs {
jobs := w.jobs[wfJob.Id]
if len(jobs) == 0 {
return false
}
}
return true
}

func (w *WorkflowView) isJobInProgress(jobId string) bool {
for _, job := range w.jobs[jobId] {
if !job.IsInTerminalState() {
return true
}
}
return false
}

func (w *WorkflowView) HasActiveJobs() bool {
for _, jobs := range w.jobs {
for _, job := range jobs {
if !job.IsInTerminalState() {
return true
return false
}
}
}
return false
}

func (w *WorkflowView) GetNextJob() *oapi.WorkflowJob {
for _, job := range w.workflowJobs {
if !w.isJobComplete(job.Id) {
if w.isJobInProgress(job.Id) {
return nil
}
return job
}
}
return nil
return true
}

func (w *WorkflowView) GetWorkflow() *oapi.Workflow {
Expand Down
Loading
Loading