⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/workspace-engine/pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"workspace-engine/pkg/events/handler/system"
"workspace-engine/pkg/events/handler/tick"
"workspace-engine/pkg/events/handler/userapprovalrecords"
"workspace-engine/pkg/events/handler/workflows"
)

var handlers = handler.HandlerRegistry{
Expand Down Expand Up @@ -87,6 +88,9 @@ var handlers = handler.HandlerRegistry{
handler.WorkspaceTick: tick.HandleWorkspaceTick,

handler.ReleaseTargetDeploy: redeploy.HandleReleaseTargetDeploy,

handler.WorkflowTemplateCreate: workflows.HandleWorkflowTemplateCreated,
handler.WorkflowCreate: workflows.HandleWorkflowCreated,
}

func NewEventHandler() *handler.EventListener {
Expand Down
3 changes: 3 additions & 0 deletions apps/workspace-engine/pkg/events/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ const (
WorkspaceSave EventType = "workspace.save"

ReleaseTargetDeploy EventType = "release-target.deploy"

WorkflowTemplateCreate EventType = "workflow-template.created"
WorkflowCreate EventType = "workflow.created"
)

// RawEvent represents the raw event data received from Kafka messages
Expand Down
11 changes: 11 additions & 0 deletions apps/workspace-engine/pkg/events/handler/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ func triggerActionsOnStatusChange(ctx context.Context, ws *workspace.Workspace,
if err != nil {
log.Error("error triggering actions on status change", "job_id", job.Id, "from", previousStatus, "to", job.Status, "error", err.Error())
}

if job.Status != oapi.JobStatusSuccessful {
return
}

err = ws.
WorkflowActionOrchestrator().
OnJobSuccess(ctx, job)
if err != nil {
log.Error("error triggering actions on job success", "job_id", job.Id, "error", err.Error())
}
}

// invalidateCacheForJob invalidates the release target state cache for the job's release target.
Expand Down
38 changes: 38 additions & 0 deletions apps/workspace-engine/pkg/events/handler/workflows/workflows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package workflows

import (
"context"
"encoding/json"
"workspace-engine/pkg/events/handler"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/workspace"
)

func HandleWorkflowTemplateCreated(
ctx context.Context,
ws *workspace.Workspace,
event handler.RawEvent,
) error {
workflowTemplate := &oapi.WorkflowTemplate{}
if err := json.Unmarshal(event.Data, workflowTemplate); err != nil {
return err
}
ws.WorkflowTemplates().Upsert(ctx, workflowTemplate)
return nil
}

func HandleWorkflowCreated(
ctx context.Context,
ws *workspace.Workspace,
event handler.RawEvent,
) error {
workflow := &oapi.Workflow{}
if err := json.Unmarshal(event.Data, workflow); err != nil {
return err
}

if _, err := ws.WorkflowManager().CreateWorkflow(ctx, workflow.WorkflowTemplateId, workflow.Inputs); err != nil {
return err
}
return nil
}
10 changes: 10 additions & 0 deletions apps/workspace-engine/pkg/workspace/store/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ func (w *Workflows) Get(id string) (*oapi.Workflow, bool) {
return w.repo.Workflows.Get(id)
}

func (w *Workflows) GetByTemplateID(templateID string) map[string]*oapi.Workflow {
workflows := make(map[string]*oapi.Workflow)
for _, workflow := range w.repo.Workflows.Items() {
if workflow.WorkflowTemplateId == templateID {
workflows[workflow.Id] = workflow
}
}
return workflows
}

func (w *Workflows) Upsert(ctx context.Context, workflow *oapi.Workflow) {
w.repo.Workflows.Set(workflow.Id, workflow)
w.store.changeset.RecordUpsert(workflow)
Expand Down
42 changes: 42 additions & 0 deletions apps/workspace-engine/pkg/workspace/workflowmanager/action.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package workflowmanager

import (
"context"
"fmt"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/workspace/store"
)

type WorkflowManagerAction struct {
store *store.Store
manager *Manager
}

func NewWorkflowManagerAction(store *store.Store, manager *Manager) *WorkflowManagerAction {
return &WorkflowManagerAction{
store: store,
manager: manager,
}
}

func (w *WorkflowManagerAction) Name() string {
return "workflowmanager"
}

func (w *WorkflowManagerAction) Execute(ctx context.Context, trigger ActionTrigger, job *oapi.Job) error {
if trigger != ActionTriggerJobSuccess {
return nil
}

workflowStep, ok := w.store.WorkflowSteps.Get(job.WorkflowStepId)
if !ok {
return nil
}

workflow, ok := w.store.Workflows.Get(workflowStep.WorkflowId)
if !ok {
return fmt.Errorf("workflow %s not found for step %s", workflowStep.WorkflowId, workflowStep.Id)
}

return w.manager.ReconcileWorkflow(ctx, workflow)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package workflowmanager

import (
"context"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/workspace/store"
)

type ActionTrigger string

const (
ActionTriggerJobSuccess ActionTrigger = "job.success"
)

type WorkflowAction interface {
Name() string
Execute(ctx context.Context, trigger ActionTrigger, job *oapi.Job) error
}

type WorkflowActionOrchestrator struct {
store *store.Store
actions []WorkflowAction
}

func NewWorkflowActionOrchestrator(store *store.Store) *WorkflowActionOrchestrator {
return &WorkflowActionOrchestrator{
store: store,
actions: make([]WorkflowAction, 0),
}
}

func (w *WorkflowActionOrchestrator) RegisterAction(action WorkflowAction) *WorkflowActionOrchestrator {
w.actions = append(w.actions, action)
return w
}

func (w *WorkflowActionOrchestrator) OnJobSuccess(ctx context.Context, job *oapi.Job) error {
for _, action := range w.actions {
if err := action.Execute(ctx, ActionTriggerJobSuccess, job); err != nil {
return err
}
}
return nil
}
46 changes: 39 additions & 7 deletions apps/workspace-engine/pkg/workspace/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"workspace-engine/pkg/workspace/releasemanager/trace/spanstore"
"workspace-engine/pkg/workspace/releasemanager/verification"
"workspace-engine/pkg/workspace/store"
"workspace-engine/pkg/workspace/workflowmanager"
)

func New(ctx context.Context, id string, options ...WorkspaceOption) *Workspace {
Expand All @@ -38,6 +39,7 @@ func New(ctx context.Context, id string, options ...WorkspaceOption) *Workspace

// Create release manager with trace store (will panic if nil)
ws.releasemanager = releasemanager.New(s, ws.traceStore, ws.verificationManager, ws.jobAgentRegistry)
ws.workflowManager = workflowmanager.NewWorkflowManager(s, ws.jobAgentRegistry)

reconcileFn := func(ctx context.Context, targets []*oapi.ReleaseTarget) error {
return ws.releasemanager.ReconcileTargets(ctx, targets, releasemanager.WithTrigger(trace.TriggerJobSuccess))
Expand All @@ -50,25 +52,35 @@ func New(ctx context.Context, id string, options ...WorkspaceOption) *Workspace
RegisterAction(environmentprogression.NewEnvironmentProgressionAction(s, reconcileFn)).
RegisterAction(rollback.NewRollbackAction(s, ws.jobAgentRegistry))

ws.workflowActionOrchestrator = workflowmanager.
NewWorkflowActionOrchestrator(s).
RegisterAction(workflowmanager.NewWorkflowManagerAction(s, ws.workflowManager))

return ws
}

type Workspace struct {
ID string

changeset *statechange.ChangeSet[any]
store *store.Store
verificationManager *verification.Manager
releasemanager *releasemanager.Manager
traceStore releasemanager.PersistenceStore
actionOrchestrator *action.Orchestrator
jobAgentRegistry *jobagents.Registry
changeset *statechange.ChangeSet[any]
store *store.Store
verificationManager *verification.Manager
workflowManager *workflowmanager.Manager
releasemanager *releasemanager.Manager
traceStore releasemanager.PersistenceStore
actionOrchestrator *action.Orchestrator
workflowActionOrchestrator *workflowmanager.WorkflowActionOrchestrator
jobAgentRegistry *jobagents.Registry
}

func (w *Workspace) ActionOrchestrator() *action.Orchestrator {
return w.actionOrchestrator
}

func (w *Workspace) WorkflowActionOrchestrator() *workflowmanager.WorkflowActionOrchestrator {
return w.workflowActionOrchestrator
}

func (w *Workspace) Store() *store.Store {
return w.store
}
Expand All @@ -85,6 +97,10 @@ func (w *Workspace) VerificationManager() *verification.Manager {
return w.verificationManager
}

func (w *Workspace) WorkflowManager() *workflowmanager.Manager {
return w.workflowManager
}

func (w *Workspace) JobAgentRegistry() *jobagents.Registry {
return w.jobAgentRegistry
}
Expand Down Expand Up @@ -164,3 +180,19 @@ func (w *Workspace) DeploymentVariableValues() *store.DeploymentVariableValues {
func (w *Workspace) Relations() *store.Relations {
return w.store.Relations
}

func (w *Workspace) WorkflowTemplates() *store.WorkflowTemplates {
return w.store.WorkflowTemplates
}

func (w *Workspace) WorkflowStepTemplates() *store.WorkflowStepTemplates {
return w.store.WorkflowStepTemplates
}

func (w *Workspace) Workflows() *store.Workflows {
return w.store.Workflows
}

func (w *Workspace) WorkflowSteps() *store.WorkflowSteps {
return w.store.WorkflowSteps
}
Loading
Loading