⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content

chore: make workflow jobs run concurrently#773

Merged
adityachoudhari26 merged 1 commit intomainfrom
workflow-jobs-concurrent
Feb 4, 2026
Merged

chore: make workflow jobs run concurrently#773
adityachoudhari26 merged 1 commit intomainfrom
workflow-jobs-concurrent

Conversation

@adityachoudhari26
Copy link
Member

@adityachoudhari26 adityachoudhari26 commented Feb 4, 2026

Summary by CodeRabbit

  • Refactor
    • Workflow execution model updated: jobs are now dispatched concurrently during workflow creation instead of through sequential step-by-step reconciliation.
    • Eliminated the runtime reconciliation process for improved performance and streamlined execution flow.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 4, 2026

📝 Walkthrough

Walkthrough

The pull request shifts the workflow orchestration model from sequential reconciliation to immediate concurrent job dispatch. The WorkflowManagerAction no longer invokes ReconcileWorkflow; instead, CreateWorkflow now constructs all workflow jobs and dispatches them immediately via dispatchJob. WorkflowView is simplified by removing GetNextJob and per-job completion helpers. Tests are updated to reflect concurrent dispatch expectations.

Changes

Cohort / File(s) Summary
Workflow Dispatch Refactoring
apps/workspace-engine/pkg/workspace/workflowmanager/manager.go, apps/workspace-engine/pkg/workspace/workflowmanager/action.go
Removes runtime workflow reconciliation logic and ReconcileWorkflow method. CreateWorkflow now creates and immediately dispatches all workflow jobs via dispatchJob in a single pass. Execute method in WorkflowManagerAction returns nil instead of reconciling. Receiver name in CreateWorkflow changed from w to m.
Workflow View Simplification
apps/workspace-engine/pkg/workspace/workflowmanager/workflow_view.go
Removes GetNextJob and per-job helper methods (isJobComplete, isJobInProgress, HasActiveJobs). IsComplete now performs a single linear check over workflowJobs instead of multi-step decision logic. NewWorkflowView populates jobs map using wfJob.Id as key.
Test Updates
apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go, apps/workspace-engine/test/e2e/engine_workflow_test.go
Renames tests to reflect concurrent dispatch (TestWorkflowManager_ContinuesWorkflowAfterJobComplete → TestWorkflowManager_DispatchesAllJobsConcurrently). Updates expectations: workflow jobs are now created as pending in a single step and dispatched immediately. Removes assertions related to sequential reconciliation flow. E2E test validates second workflow job exists immediately after creation.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Manager
    participant Store
    participant JobAgentRegistry

    Client->>Manager: CreateWorkflow(workflowTemplateId, inputs)
    
    Manager->>Store: Retrieve WorkflowTemplate
    Store-->>Manager: WorkflowTemplate
    
    Manager->>Manager: Construct Workflow object
    Manager->>Store: Upsert Workflow
    
    loop For each jobTemplate in workflow
        Manager->>Manager: Create WorkflowJob
        Manager->>Store: Upsert WorkflowJob
        Manager->>Manager: dispatchJob(workflowJob)
        Manager->>Store: Create Job entity
        Manager->>Store: Upsert Job
        Manager->>JobAgentRegistry: Dispatch Job
        JobAgentRegistry-->>Manager: Dispatch acknowledgment
    end
    
    Manager-->>Client: Workflow + all jobs dispatched
Loading
sequenceDiagram
    participant WorkflowView
    participant Store
    
    Note over WorkflowView: IsComplete() check
    WorkflowView->>Store: Retrieve workflowJobs for workflow
    
    loop For each workflowJob
        WorkflowView->>Store: Get job list via wfJob.Id
        alt Job list empty
            WorkflowView-->>WorkflowView: Return false (incomplete)
        else Job list exists
            WorkflowView->>WorkflowView: Check completion status
        end
    end
    
    WorkflowView-->>WorkflowView: Return true if all jobs complete
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🐰 Once reconciliation ruled the day,
Jobs queued up in sequential sway,
But now they leap forth, wild and free,
Dispatched concurrent—what a spree!
No GetNextJob, no waiting queue,
All workflows bloom at once—hippity-hoo! 🎉

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and accurately reflects the main architectural change: refactoring workflow jobs to run concurrently instead of sequentially through reconciliation.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch workflow-jobs-concurrent

Important

Action Needed: IP Allowlist Update

If your organization protects your Git platform with IP whitelisting, please add the new CodeRabbit IP address to your allowlist:

  • 136.113.208.247/32 (new)
  • 34.170.211.100/32
  • 35.222.179.152/32

Reviews will stop working after February 8, 2026 if the new IP is not added to your allowlist.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
apps/workspace-engine/pkg/workspace/workflowmanager/workflow_view.go (1)

63-65: ⚠️ Potential issue | 🟡 Minor

Missing bounds check in GetJob could cause panic.

GetJob(index) accesses w.workflowJobs[index] directly without validating that index is within bounds. This could cause a panic if called with an out-of-range index.

🛡️ Proposed fix to add bounds check
 func (w *WorkflowView) GetJob(index int) *oapi.WorkflowJob {
+	if index < 0 || index >= len(w.workflowJobs) {
+		return nil
+	}
 	return w.workflowJobs[index]
 }
apps/workspace-engine/pkg/workspace/workflowmanager/action.go (1)

9-27: ⚠️ Potential issue | 🟡 Minor

Remove unused struct fields or clarify their purpose.

The store and manager fields are instantiated but unused in the Execute method, which returns nil as an intentional no-op. If these fields are not needed for future functionality, remove them to reduce confusion. If they are placeholders for upcoming logic, add a comment explaining the intent.

🤖 Fix all issues with AI agents
In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go`:
- Around line 39-58: The code upserts WorkflowJob records
(m.store.WorkflowJobs.Upsert inside the loop over workflowTemplate.Jobs) before
the parent Workflow is persisted (m.store.Workflows.Upsert), and then calls
m.dispatchJob which may partially dispatch jobs on error; change the flow to
persist the Workflow first by calling m.store.Workflows.Upsert(workflow) before
creating/upserting any WorkflowJob, then insert all WorkflowJob records, and
only after all upserts succeed call m.dispatchJob for each job; alternatively
implement a transaction-like pattern (wrap the Upsert of Workflow and all
WorkflowJobs in a single transactional operation or add compensating rollback
logic if dispatch fails) so the operations are atomic and you avoid orphaned
jobs and partially-dispatched workflows.

Comment on lines +39 to +58
workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs))
for idx, jobTemplate := range workflowTemplate.Jobs {
job := &oapi.WorkflowJob{
wfJob := &oapi.WorkflowJob{
Id: uuid.New().String(),
WorkflowId: workflow.Id,
Index: idx,
Ref: jobTemplate.Ref,
Config: maps.Clone(jobTemplate.Config),
}
w.store.WorkflowJobs.Upsert(ctx, job)
m.store.WorkflowJobs.Upsert(ctx, wfJob)
workflowJobs = append(workflowJobs, wfJob)
}

w.store.Workflows.Upsert(ctx, workflow)
m.store.Workflows.Upsert(ctx, workflow)

for _, wfJob := range workflowJobs {
if err := m.dispatchJob(ctx, wfJob); err != nil {
return workflow, err
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Workflow jobs are upserted before the workflow entity exists.

Lines 48-49 upsert WorkflowJob records referencing workflow.Id before the Workflow itself is persisted at line 52. If a failure occurs between these operations, orphaned workflow jobs could exist without their parent workflow.

Additionally, if dispatchJob fails partway through the loop (lines 54-58), some jobs will be dispatched while others won't, potentially leaving the workflow in a partially-dispatched state.

Consider upserting the workflow first, or wrapping the entire operation in a transaction-like pattern if atomicity is required.

🔧 Proposed fix to upsert workflow before its jobs
 	workflow := &oapi.Workflow{
 		Id:                 uuid.New().String(),
 		WorkflowTemplateId: workflowTemplateId,
 		Inputs:             maps.Clone(inputs),
 	}
+	m.store.Workflows.Upsert(ctx, workflow)

 	workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs))
 	for idx, jobTemplate := range workflowTemplate.Jobs {
 		wfJob := &oapi.WorkflowJob{
 			Id:         uuid.New().String(),
 			WorkflowId: workflow.Id,
 			Index:      idx,
 			Ref:        jobTemplate.Ref,
 			Config:     maps.Clone(jobTemplate.Config),
 		}
 		m.store.WorkflowJobs.Upsert(ctx, wfJob)
 		workflowJobs = append(workflowJobs, wfJob)
 	}

-	m.store.Workflows.Upsert(ctx, workflow)
-
 	for _, wfJob := range workflowJobs {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs))
for idx, jobTemplate := range workflowTemplate.Jobs {
job := &oapi.WorkflowJob{
wfJob := &oapi.WorkflowJob{
Id: uuid.New().String(),
WorkflowId: workflow.Id,
Index: idx,
Ref: jobTemplate.Ref,
Config: maps.Clone(jobTemplate.Config),
}
w.store.WorkflowJobs.Upsert(ctx, job)
m.store.WorkflowJobs.Upsert(ctx, wfJob)
workflowJobs = append(workflowJobs, wfJob)
}
w.store.Workflows.Upsert(ctx, workflow)
m.store.Workflows.Upsert(ctx, workflow)
for _, wfJob := range workflowJobs {
if err := m.dispatchJob(ctx, wfJob); err != nil {
return workflow, err
}
}
workflow := &oapi.Workflow{
Id: uuid.New().String(),
WorkflowTemplateId: workflowTemplateId,
Inputs: maps.Clone(inputs),
}
m.store.Workflows.Upsert(ctx, workflow)
workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs))
for idx, jobTemplate := range workflowTemplate.Jobs {
wfJob := &oapi.WorkflowJob{
Id: uuid.New().String(),
WorkflowId: workflow.Id,
Index: idx,
Ref: jobTemplate.Ref,
Config: maps.Clone(jobTemplate.Config),
}
m.store.WorkflowJobs.Upsert(ctx, wfJob)
workflowJobs = append(workflowJobs, wfJob)
}
for _, wfJob := range workflowJobs {
if err := m.dispatchJob(ctx, wfJob); err != nil {
return workflow, err
}
}
🤖 Prompt for AI Agents
In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go` around lines
39 - 58, The code upserts WorkflowJob records (m.store.WorkflowJobs.Upsert
inside the loop over workflowTemplate.Jobs) before the parent Workflow is
persisted (m.store.Workflows.Upsert), and then calls m.dispatchJob which may
partially dispatch jobs on error; change the flow to persist the Workflow first
by calling m.store.Workflows.Upsert(workflow) before creating/upserting any
WorkflowJob, then insert all WorkflowJob records, and only after all upserts
succeed call m.dispatchJob for each job; alternatively implement a
transaction-like pattern (wrap the Upsert of Workflow and all WorkflowJobs in a
single transactional operation or add compensating rollback logic if dispatch
fails) so the operations are atomic and you avoid orphaned jobs and
partially-dispatched workflows.

@adityachoudhari26 adityachoudhari26 merged commit 291f702 into main Feb 4, 2026
9 checks passed
@adityachoudhari26 adityachoudhari26 deleted the workflow-jobs-concurrent branch February 4, 2026 16:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant