chore: make workflow jobs run concurrently#773
Conversation
📝 WalkthroughWalkthroughThe pull request shifts the workflow orchestration model from sequential reconciliation to immediate concurrent job dispatch. The WorkflowManagerAction no longer invokes ReconcileWorkflow; instead, CreateWorkflow now constructs all workflow jobs and dispatches them immediately via dispatchJob. WorkflowView is simplified by removing GetNextJob and per-job completion helpers. Tests are updated to reflect concurrent dispatch expectations. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Manager
participant Store
participant JobAgentRegistry
Client->>Manager: CreateWorkflow(workflowTemplateId, inputs)
Manager->>Store: Retrieve WorkflowTemplate
Store-->>Manager: WorkflowTemplate
Manager->>Manager: Construct Workflow object
Manager->>Store: Upsert Workflow
loop For each jobTemplate in workflow
Manager->>Manager: Create WorkflowJob
Manager->>Store: Upsert WorkflowJob
Manager->>Manager: dispatchJob(workflowJob)
Manager->>Store: Create Job entity
Manager->>Store: Upsert Job
Manager->>JobAgentRegistry: Dispatch Job
JobAgentRegistry-->>Manager: Dispatch acknowledgment
end
Manager-->>Client: Workflow + all jobs dispatched
sequenceDiagram
participant WorkflowView
participant Store
Note over WorkflowView: IsComplete() check
WorkflowView->>Store: Retrieve workflowJobs for workflow
loop For each workflowJob
WorkflowView->>Store: Get job list via wfJob.Id
alt Job list empty
WorkflowView-->>WorkflowView: Return false (incomplete)
else Job list exists
WorkflowView->>WorkflowView: Check completion status
end
end
WorkflowView-->>WorkflowView: Return true if all jobs complete
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Important Action Needed: IP Allowlist UpdateIf your organization protects your Git platform with IP whitelisting, please add the new CodeRabbit IP address to your allowlist:
Reviews will stop working after February 8, 2026 if the new IP is not added to your allowlist. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
apps/workspace-engine/pkg/workspace/workflowmanager/workflow_view.go (1)
63-65:⚠️ Potential issue | 🟡 MinorMissing bounds check in
GetJobcould cause panic.
GetJob(index)accessesw.workflowJobs[index]directly without validating thatindexis within bounds. This could cause a panic if called with an out-of-range index.🛡️ Proposed fix to add bounds check
func (w *WorkflowView) GetJob(index int) *oapi.WorkflowJob { + if index < 0 || index >= len(w.workflowJobs) { + return nil + } return w.workflowJobs[index] }apps/workspace-engine/pkg/workspace/workflowmanager/action.go (1)
9-27:⚠️ Potential issue | 🟡 MinorRemove unused struct fields or clarify their purpose.
The
storeandmanagerfields are instantiated but unused in theExecutemethod, which returnsnilas an intentional no-op. If these fields are not needed for future functionality, remove them to reduce confusion. If they are placeholders for upcoming logic, add a comment explaining the intent.
🤖 Fix all issues with AI agents
In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go`:
- Around line 39-58: The code upserts WorkflowJob records
(m.store.WorkflowJobs.Upsert inside the loop over workflowTemplate.Jobs) before
the parent Workflow is persisted (m.store.Workflows.Upsert), and then calls
m.dispatchJob which may partially dispatch jobs on error; change the flow to
persist the Workflow first by calling m.store.Workflows.Upsert(workflow) before
creating/upserting any WorkflowJob, then insert all WorkflowJob records, and
only after all upserts succeed call m.dispatchJob for each job; alternatively
implement a transaction-like pattern (wrap the Upsert of Workflow and all
WorkflowJobs in a single transactional operation or add compensating rollback
logic if dispatch fails) so the operations are atomic and you avoid orphaned
jobs and partially-dispatched workflows.
| workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs)) | ||
| for idx, jobTemplate := range workflowTemplate.Jobs { | ||
| job := &oapi.WorkflowJob{ | ||
| wfJob := &oapi.WorkflowJob{ | ||
| Id: uuid.New().String(), | ||
| WorkflowId: workflow.Id, | ||
| Index: idx, | ||
| Ref: jobTemplate.Ref, | ||
| Config: maps.Clone(jobTemplate.Config), | ||
| } | ||
| w.store.WorkflowJobs.Upsert(ctx, job) | ||
| m.store.WorkflowJobs.Upsert(ctx, wfJob) | ||
| workflowJobs = append(workflowJobs, wfJob) | ||
| } | ||
|
|
||
| w.store.Workflows.Upsert(ctx, workflow) | ||
| m.store.Workflows.Upsert(ctx, workflow) | ||
|
|
||
| for _, wfJob := range workflowJobs { | ||
| if err := m.dispatchJob(ctx, wfJob); err != nil { | ||
| return workflow, err | ||
| } | ||
| } |
There was a problem hiding this comment.
Workflow jobs are upserted before the workflow entity exists.
Lines 48-49 upsert WorkflowJob records referencing workflow.Id before the Workflow itself is persisted at line 52. If a failure occurs between these operations, orphaned workflow jobs could exist without their parent workflow.
Additionally, if dispatchJob fails partway through the loop (lines 54-58), some jobs will be dispatched while others won't, potentially leaving the workflow in a partially-dispatched state.
Consider upserting the workflow first, or wrapping the entire operation in a transaction-like pattern if atomicity is required.
🔧 Proposed fix to upsert workflow before its jobs
workflow := &oapi.Workflow{
Id: uuid.New().String(),
WorkflowTemplateId: workflowTemplateId,
Inputs: maps.Clone(inputs),
}
+ m.store.Workflows.Upsert(ctx, workflow)
workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs))
for idx, jobTemplate := range workflowTemplate.Jobs {
wfJob := &oapi.WorkflowJob{
Id: uuid.New().String(),
WorkflowId: workflow.Id,
Index: idx,
Ref: jobTemplate.Ref,
Config: maps.Clone(jobTemplate.Config),
}
m.store.WorkflowJobs.Upsert(ctx, wfJob)
workflowJobs = append(workflowJobs, wfJob)
}
- m.store.Workflows.Upsert(ctx, workflow)
-
for _, wfJob := range workflowJobs {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs)) | |
| for idx, jobTemplate := range workflowTemplate.Jobs { | |
| job := &oapi.WorkflowJob{ | |
| wfJob := &oapi.WorkflowJob{ | |
| Id: uuid.New().String(), | |
| WorkflowId: workflow.Id, | |
| Index: idx, | |
| Ref: jobTemplate.Ref, | |
| Config: maps.Clone(jobTemplate.Config), | |
| } | |
| w.store.WorkflowJobs.Upsert(ctx, job) | |
| m.store.WorkflowJobs.Upsert(ctx, wfJob) | |
| workflowJobs = append(workflowJobs, wfJob) | |
| } | |
| w.store.Workflows.Upsert(ctx, workflow) | |
| m.store.Workflows.Upsert(ctx, workflow) | |
| for _, wfJob := range workflowJobs { | |
| if err := m.dispatchJob(ctx, wfJob); err != nil { | |
| return workflow, err | |
| } | |
| } | |
| workflow := &oapi.Workflow{ | |
| Id: uuid.New().String(), | |
| WorkflowTemplateId: workflowTemplateId, | |
| Inputs: maps.Clone(inputs), | |
| } | |
| m.store.Workflows.Upsert(ctx, workflow) | |
| workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs)) | |
| for idx, jobTemplate := range workflowTemplate.Jobs { | |
| wfJob := &oapi.WorkflowJob{ | |
| Id: uuid.New().String(), | |
| WorkflowId: workflow.Id, | |
| Index: idx, | |
| Ref: jobTemplate.Ref, | |
| Config: maps.Clone(jobTemplate.Config), | |
| } | |
| m.store.WorkflowJobs.Upsert(ctx, wfJob) | |
| workflowJobs = append(workflowJobs, wfJob) | |
| } | |
| for _, wfJob := range workflowJobs { | |
| if err := m.dispatchJob(ctx, wfJob); err != nil { | |
| return workflow, err | |
| } | |
| } |
🤖 Prompt for AI Agents
In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go` around lines
39 - 58, The code upserts WorkflowJob records (m.store.WorkflowJobs.Upsert
inside the loop over workflowTemplate.Jobs) before the parent Workflow is
persisted (m.store.Workflows.Upsert), and then calls m.dispatchJob which may
partially dispatch jobs on error; change the flow to persist the Workflow first
by calling m.store.Workflows.Upsert(workflow) before creating/upserting any
WorkflowJob, then insert all WorkflowJob records, and only after all upserts
succeed call m.dispatchJob for each job; alternatively implement a
transaction-like pattern (wrap the Upsert of Workflow and all WorkflowJobs in a
single transactional operation or add compensating rollback logic if dispatch
fails) so the operations are atomic and you avoid orphaned jobs and
partially-dispatched workflows.
Summary by CodeRabbit