diff --git a/durabletask-client/pom.xml b/durabletask-client/pom.xml index f91647cb66..42765d1bb4 100644 --- a/durabletask-client/pom.xml +++ b/durabletask-client/pom.xml @@ -74,6 +74,18 @@ testcontainers test + + io.micrometer + micrometer-observation + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-context + diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java index b0fa24a5e9..881b9e9586 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java @@ -28,6 +28,8 @@ import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyChannelBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Tracer; import javax.annotation.Nullable; import java.io.FileInputStream; @@ -41,6 +43,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; /** @@ -57,6 +60,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { private final DataConverter dataConverter; private final ManagedChannel managedSidecarChannel; private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; + private final Tracer tracer; DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); @@ -130,6 +134,13 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { sidecarGrpcChannel = this.managedSidecarChannel; } + if (builder.tracer != null) { + this.tracer = builder.tracer; + } else { + //this.tracer = OpenTelemetry.noop().getTracer("DurableTaskGrpcClient"); + this.tracer = GlobalOpenTelemetry.getTracer("dapr-workflow"); + } + this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); } @@ -188,9 +199,12 @@ public String scheduleNewOrchestrationInstance( builder.setScheduledStartTimestamp(ts); } + AtomicReference response = new AtomicReference<>(); + OrchestratorService.CreateInstanceRequest request = builder.build(); - OrchestratorService.CreateInstanceResponse response = this.sidecarClient.startInstance(request); - return response.getInstanceId(); + response.set(this.sidecarClient.startInstance(request)); + + return response.get().getInstanceId(); } @Override @@ -208,6 +222,7 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload) OrchestratorService.RaiseEventRequest request = builder.build(); this.sidecarClient.raiseEvent(request); + } @Override diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java index f3ba1cd82a..b934fc9713 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java @@ -14,6 +14,7 @@ package io.dapr.durabletask; import io.grpc.Channel; +import io.opentelemetry.api.trace.Tracer; /** * Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process @@ -27,6 +28,7 @@ public final class DurableTaskGrpcClientBuilder { String tlsCertPath; String tlsKeyPath; boolean insecure; + Tracer tracer; /** * Sets the {@link DataConverter} to use for converting serializable data payloads. @@ -57,6 +59,17 @@ public DurableTaskGrpcClientBuilder grpcChannel(Channel channel) { return this; } + /** + * Sets the Tracer object to be used by DurableTaskClient to emit traces. + * + * @param tracer to be used by the DurableTaskClient + * @return this builder object + */ + public DurableTaskGrpcClientBuilder tracer(Tracer tracer) { + this.tracer = tracer; + return this; + } + /** * Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used. * diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index eb3be6bb9a..f594226a1f 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -13,19 +13,25 @@ package io.dapr.durabletask; -import com.google.protobuf.StringValue; import io.dapr.durabletask.implementation.protobuf.OrchestratorService; -import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; +import io.dapr.durabletask.runner.ActivityRunner; +import io.dapr.durabletask.runner.OrchestratorRunner; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; import java.time.Duration; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -50,6 +56,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final Duration maximumTimerInterval; private final ExecutorService workerPool; private final String appId; // App ID for cross-app routing + private final Tracer tracer; private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; private final boolean isExecutorServiceManaged; @@ -81,12 +88,20 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { sidecarGrpcChannel = this.managedSidecarChannel; } + this.tracer = GlobalOpenTelemetry.getTracer("dapr-workflow"); + this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; - this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); + + ExecutorService rawExecutor = builder.executorService != null + ? builder.executorService : Executors.newCachedThreadPool(); + this.workerPool = Context.taskWrapping(rawExecutor); + this.isExecutorServiceManaged = builder.executorService == null; + + } /** @@ -159,102 +174,25 @@ public void startAndBlock() { while (workItemStream.hasNext()) { OrchestratorService.WorkItem workItem = workItemStream.next(); OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase(); + if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) { OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); - logger.log(Level.FINEST, + logger.log(Level.INFO, String.format("Processing orchestrator request for instance: {0}", orchestratorRequest.getInstanceId())); - // TODO: Error handling - this.workerPool.submit(() -> { - TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( - orchestratorRequest.getPastEventsList(), - orchestratorRequest.getNewEventsList()); - - OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder() - .setInstanceId(orchestratorRequest.getInstanceId()) - .addAllActions(taskOrchestratorResult.getActions()) - .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) - .setCompletionToken(workItem.getCompletionToken()) - .build(); - - try { - this.sidecarClient.completeOrchestratorTask(response); - logger.log(Level.FINEST, - "Completed orchestrator request for instance: {0}", - orchestratorRequest.getInstanceId()); - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { - logger.log(Level.WARNING, - "The sidecar at address {0} is unavailable while completing the orchestrator task.", - this.getSidecarAddress()); - } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { - logger.log(Level.WARNING, - "Durable Task worker has disconnected from {0} while completing the orchestrator task.", - this.getSidecarAddress()); - } else { - logger.log(Level.WARNING, - "Unexpected failure completing the orchestrator task at {0}.", - this.getSidecarAddress()); - } - } - }); + this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer)); } else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) { OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest(); - logger.log(Level.FINEST, - String.format("Processing activity request: %s for instance: %s}", - activityRequest.getName(), - activityRequest.getOrchestrationInstance().getInstanceId())); - - // TODO: Error handling - this.workerPool.submit(() -> { - String output = null; - TaskFailureDetails failureDetails = null; - try { - output = taskActivityExecutor.execute( + + logger.log(Level.INFO, + String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s", activityRequest.getName(), - activityRequest.getInput().getValue(), - activityRequest.getTaskExecutionId(), - activityRequest.getTaskId()); - } catch (Throwable e) { - failureDetails = TaskFailureDetails.newBuilder() - .setErrorType(e.getClass().getName()) - .setErrorMessage(e.getMessage()) - .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) - .build(); - } - - OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse - .newBuilder() - .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) - .setTaskId(activityRequest.getTaskId()) - .setCompletionToken(workItem.getCompletionToken()); - - if (output != null) { - responseBuilder.setResult(StringValue.of(output)); - } - - if (failureDetails != null) { - responseBuilder.setFailureDetails(failureDetails); - } - - try { - this.sidecarClient.completeActivityTask(responseBuilder.build()); - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { - logger.log(Level.WARNING, - "The sidecar at address {0} is unavailable while completing the activity task.", - this.getSidecarAddress()); - } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { - logger.log(Level.WARNING, - "Durable Task worker has disconnected from {0} while completing the activity task.", - this.getSidecarAddress()); - } else { - logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", - this.getSidecarAddress()); - } - } - }); + activityRequest.getOrchestrationInstance().getInstanceId(), + Context.current())); + + this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer)); + } else if (requestType == OrchestratorService.WorkItem.RequestCase.HEALTHPING) { // No-op } else { @@ -325,4 +263,57 @@ private void shutDownWorkerPool() { private String getSidecarAddress() { return this.sidecarClient.getChannel().authority(); } + + /** + * Extracts trace context from the ActivityRequest's ParentTraceContext field + * and creates an OpenTelemetry Context with the parent span set. + * + * @param activityRequest The activity request containing the parent trace context + * @return A Context with the parent span set, or the current context if no trace context is present + */ + private Context extractTraceContext(OrchestratorService.ActivityRequest activityRequest) { + if (!activityRequest.hasParentTraceContext()) { + logger.log(Level.FINE, "No parent trace context in activity request"); + return Context.current(); + } + + OrchestratorService.TraceContext traceContext = activityRequest.getParentTraceContext(); + String traceParent = traceContext.getTraceParent(); + + if (traceParent.isEmpty()) { + logger.log(Level.FINE, "Empty traceparent in activity request"); + return Context.current(); + } + + logger.log(Level.INFO, + String.format("Extracting trace context from ActivityRequest: traceparent=%s", traceParent)); + + // Use W3CTraceContextPropagator to extract the trace context + Map carrier = new HashMap<>(); + carrier.put("traceparent", traceParent); + if (traceContext.hasTraceState()) { + carrier.put("tracestate", traceContext.getTraceState().getValue()); + } + + TextMapGetter> getter = new TextMapGetter>() { + @Override + public Iterable keys(Map carrier) { + return carrier.keySet(); + } + + @Override + public String get(Map carrier, String key) { + return carrier.get(key); + } + }; + + + Context extractedContext = W3CTraceContextPropagator.getInstance() + .extract(Context.current(), carrier, getter); + + logger.log(Level.INFO, + String.format("Extracted trace context: %s", extractedContext)); + + return extractedContext; + } } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java b/durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java index f5d9d834ea..357946ab31 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java @@ -124,7 +124,13 @@ public boolean isCausedBy(Class exceptionClass) { } } - static String getFullStackTrace(Throwable e) { + /** + * Gets the full stack trace of the specified exception. + * + * @param e the exception + * @return the full stack trace of the exception + */ + public static String getFullStackTrace(Throwable e) { StackTraceElement[] elements = e.getStackTrace(); // Plan for 256 characters per stack frame (which is likely on the high-end) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java index b2043b51ee..2c39820a7f 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java @@ -48,4 +48,10 @@ public interface TaskActivityContext { * @return the task id of the current task activity */ int getTaskId(); + + /** + * Gets the trace parent id for the current workflow execution. + * @return trace parent id + */ + String getTraceParent(); } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java index a8ef6c67e0..c4c66e892a 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java @@ -16,11 +16,18 @@ import java.util.HashMap; import java.util.logging.Logger; -final class TaskActivityExecutor { +public final class TaskActivityExecutor { private final HashMap activityFactories; private final DataConverter dataConverter; private final Logger logger; + /** + * Constructor. + * + * @param activityFactories the activity factories to use for creating activities + * @param dataConverter the data converter to use for serializing and deserializing activity inputs and outputs + * @param logger the logger to use for logging + */ public TaskActivityExecutor( HashMap activityFactories, DataConverter dataConverter, @@ -30,7 +37,19 @@ public TaskActivityExecutor( this.logger = logger; } - public String execute(String taskName, String input, String taskExecutionId, int taskId) throws Throwable { + /** + * Executes an activity task. + * + * @param taskName the name of the activity task to execute + * @param input the serialized input payload for the activity task + * @param taskExecutionId Unique ID for the task execution. + * @param taskId Auto-incrementing ID for the task. + * @param traceParent The traceparent header value. + * @return the serialized output payload for the activity task, or null if the activity task returned null. + * @throws Throwable if an unhandled exception occurs during activity task execution. + */ + public String execute(String taskName, String input, + String taskExecutionId, int taskId, String traceParent) throws Throwable { TaskActivityFactory factory = this.activityFactories.get(taskName); if (factory == null) { throw new IllegalStateException( @@ -43,7 +62,8 @@ public String execute(String taskName, String input, String taskExecutionId, int String.format("The task factory '%s' returned a null TaskActivity object.", taskName)); } - TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, taskId); + TaskActivityContextImpl context = new TaskActivityContextImpl( + taskName, input, taskExecutionId, taskId, traceParent); // Unhandled exceptions are allowed to escape Object output = activity.run(context); @@ -59,14 +79,17 @@ private class TaskActivityContextImpl implements TaskActivityContext { private final String rawInput; private final String taskExecutionId; private final int taskId; + private final String traceParent; private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter; - public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, int taskId) { + public TaskActivityContextImpl(String activityName, String rawInput, + String taskExecutionId, int taskId, String traceParent) { this.name = activityName; this.rawInput = rawInput; this.taskExecutionId = taskExecutionId; this.taskId = taskId; + this.traceParent = traceParent; } @Override @@ -92,5 +115,10 @@ public String getTaskExecutionId() { public int getTaskId() { return this.taskId; } + + @Override + public String getTraceParent() { + return this.traceParent; + } } } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java index df0c95ec82..63f852dc6e 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java @@ -411,7 +411,7 @@ default Task callSubOrchestrator(String name) { * @return a new {@link Task} that completes when the sub-orchestration completes or fails */ default Task callSubOrchestrator(String name, Object input) { - return this.callSubOrchestrator(name, input, null); + return this.callSubOrchestrator(name, input, Void.class); } /** diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 7a3436b036..e4bd7959a8 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -44,7 +44,7 @@ import java.util.function.IntFunction; import java.util.logging.Logger; -final class TaskOrchestrationExecutor { +public final class TaskOrchestrationExecutor { private static final String EMPTY_STRING = ""; private final HashMap orchestrationFactories; @@ -53,6 +53,15 @@ final class TaskOrchestrationExecutor { private final Duration maximumTimerInterval; private final String appId; + /** + * Creates a new TaskOrchestrationExecutor. + * + * @param orchestrationFactories map of orchestration names to their factories + * @param dataConverter converter for serializing/deserializing data + * @param maximumTimerInterval maximum duration for timer intervals + * @param logger logger for orchestration execution + * @param appId application ID for cross-app routing + */ public TaskOrchestrationExecutor( HashMap orchestrationFactories, DataConverter dataConverter, @@ -66,6 +75,13 @@ public TaskOrchestrationExecutor( this.appId = appId; // extracted from router } + /** + * Executes the orchestration with the given past and new events. + * + * @param pastEvents list of past history events + * @param newEvents list of new history events + * @return the result of the orchestrator execution + */ public TaskOrchestratorResult execute(List pastEvents, List newEvents) { ContextImplTask context = new ContextImplTask(pastEvents, newEvents); @@ -87,7 +103,7 @@ public TaskOrchestratorResult execute(List pas } catch (Exception e) { // The orchestrator threw an unhandled exception - fail it // TODO: What's the right way to log this? - logger.warning("The orchestrator failed with an unhandled exception: " + e.toString()); + logger.warning("The orchestrator failed with an unhandled exception: " + e); context.fail(new FailureDetails(e)); } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java index 705a41d5c0..c67ad9cc8c 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java @@ -18,12 +18,18 @@ import java.util.Collection; import java.util.Collections; -final class TaskOrchestratorResult { +public final class TaskOrchestratorResult { private final Collection actions; private final String customStatus; + /** + * Creates a new TaskOrchestratorResult. + * + * @param actions the collection of orchestrator actions + * @param customStatus the custom status of the orchestrator + */ public TaskOrchestratorResult(Collection actions, String customStatus) { this.actions = Collections.unmodifiableCollection(actions); ; diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java new file mode 100644 index 0000000000..c9c0baa434 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java @@ -0,0 +1,182 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.runner; + +import com.google.protobuf.StringValue; +import io.dapr.durabletask.FailureDetails; +import io.dapr.durabletask.TaskActivityExecutor; +import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; +import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ActivityRunner extends DurableRunner { + private static final Logger logger = Logger.getLogger(ActivityRunner.class.getPackage().getName()); + + private final OrchestratorService.ActivityRequest activityRequest; + private final TaskActivityExecutor taskActivityExecutor; + + /** + * Constructor. + * + *

This class executes the activity requests

+ * + * @param workItem work item to be executed + * @param taskActivityExecutor executor for the activity + * @param sidecarClient sidecar client to communicate with the sidecar + * @param tracer tracer to be used for tracing + */ + public ActivityRunner( + OrchestratorService.WorkItem workItem, + TaskActivityExecutor taskActivityExecutor, + TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient, + @Nullable Tracer tracer) { + super(workItem, sidecarClient, tracer); + this.activityRequest = workItem.getActivityRequest(); + this.taskActivityExecutor = taskActivityExecutor; + } + + @Override + public void run() { + if (tracer != null) { + runWithTracing(); + } else { + runWithoutTracing(); + } + } + + private void runWithTracing() { + Context parentContext = extractTraceContext(); + + Span span = tracer.spanBuilder("activity:" + activityRequest.getName()) + .setParent(parentContext) + .setSpanKind(SpanKind.INTERNAL) + .setAttribute("durabletask.task.instance_id", + activityRequest.getOrchestrationInstance().getInstanceId()) + .setAttribute("durabletask.task.id", activityRequest.getTaskId()) + .setAttribute("durabletask.activity.name", activityRequest.getName()) + .startSpan(); + + try (Scope scope = span.makeCurrent()) { + executeActivity(); + } catch (Throwable e) { + logger.log(Level.WARNING, "Failed to complete activity task.", e); + span.setStatus(StatusCode.ERROR, "Failed to complete activity task"); + span.recordException(e); + } finally { + span.end(); + } + } + + private void runWithoutTracing() { + try { + executeActivity(); + } catch (Throwable e) { + logger.log(Level.WARNING, "Failed to complete activity task.", e); + } + } + + private void executeActivity() throws Throwable { + String output = null; + OrchestratorService.TaskFailureDetails failureDetails = null; + Throwable failureException = null; + try { + output = taskActivityExecutor.execute( + activityRequest.getName(), + activityRequest.getInput().getValue(), + activityRequest.getTaskExecutionId(), + activityRequest.getTaskId(), + activityRequest.getParentTraceContext().getTraceParent()); + } catch (Throwable e) { + failureDetails = OrchestratorService.TaskFailureDetails.newBuilder() + .setErrorType(e.getClass().getName()) + .setErrorMessage(e.getMessage()) + .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) + .build(); + failureException = e; + } + + OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse + .newBuilder() + .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) + .setTaskId(activityRequest.getTaskId()) + .setCompletionToken(workItem.getCompletionToken()); + + if (output != null) { + responseBuilder.setResult(StringValue.of(output)); + } + + if (failureDetails != null) { + responseBuilder.setFailureDetails(failureDetails); + } + + try { + this.sidecarClient.completeActivityTask(responseBuilder.build()); + } catch (StatusRuntimeException e) { + logException(e); + throw e; + } + + if (failureException != null) { + throw failureException; + } + } + + private Context extractTraceContext() { + if (!activityRequest.hasParentTraceContext()) { + return Context.current(); + } + + OrchestratorService.TraceContext traceContext = activityRequest.getParentTraceContext(); + String traceParent = traceContext.getTraceParent(); + + if (traceParent.isEmpty()) { + return Context.current(); + } + + Map carrier = new HashMap<>(); + carrier.put("traceparent", traceParent); + if (traceContext.hasTraceState()) { + carrier.put("tracestate", traceContext.getTraceState().getValue()); + } + + TextMapGetter> getter = new TextMapGetter<>() { + @Override + public Iterable keys(Map carrier) { + return carrier.keySet(); + } + + @Override + public String get(Map carrier, String key) { + return carrier.get(key); + } + }; + + return W3CTraceContextPropagator.getInstance() + .extract(Context.current(), carrier, getter); + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/runner/DurableRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/runner/DurableRunner.java new file mode 100644 index 0000000000..b59aa58046 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/runner/DurableRunner.java @@ -0,0 +1,66 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.runner; + +import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.trace.Tracer; + +import javax.annotation.Nullable; +import java.util.logging.Level; +import java.util.logging.Logger; + +public abstract class DurableRunner implements Runnable { + private static final Logger logger = Logger.getLogger(DurableRunner.class.getPackage().getName()); + public final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; + public final OrchestratorService.WorkItem workItem; + @Nullable + public final Tracer tracer; + + /** + * Constructs a new instance of the DurableRunner. + * + * @param workItem the work item to be executed + * @param sidecarClient the sidecar client used to communicate with the durable task sidecar + * @param tracer the tracer used for tracing operations; can be null if tracing is not required + */ + public DurableRunner(OrchestratorService.WorkItem workItem, + TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient, + @Nullable Tracer tracer) { + this.workItem = workItem; + this.sidecarClient = sidecarClient; + this.tracer = tracer; + } + + protected String getSidecarAddress() { + return this.sidecarClient.getChannel().authority(); + } + + protected void logException(StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + logger.log(Level.WARNING, + "The sidecar at address {0} is unavailable while completing the activity task.", + this.sidecarClient.getChannel().authority()); + } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { + logger.log(Level.WARNING, + "Durable Task worker has disconnected from {0} while completing the activity task.", + this.sidecarClient.getChannel().authority()); + } else { + logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", + this.sidecarClient.getChannel().authority()); + } + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java new file mode 100644 index 0000000000..56eb47f670 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java @@ -0,0 +1,75 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.runner; + +import com.google.protobuf.StringValue; +import io.dapr.durabletask.TaskOrchestrationExecutor; +import io.dapr.durabletask.TaskOrchestratorResult; +import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; +import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.trace.Tracer; + +import javax.annotation.Nullable; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class OrchestratorRunner extends DurableRunner { + private static final Logger logger = Logger.getLogger(OrchestratorRunner.class.getPackage().getName()); + + private final OrchestratorService.OrchestratorRequest orchestratorRequest; + private final TaskOrchestrationExecutor taskOrchestrationExecutor; + + /** + * Constructs a new instance of the OrchestratorRunner class. + * + * @param workItem The work item containing details about the orchestrator task to be executed. + * @param taskOrchestrationExecutor The executor responsible for running task orchestration logic. + * @param sidecarClient The gRPC stub for communication with the Task Hub sidecar service. + * @param tracer An optional tracer used for distributed tracing, can be null. + */ + public OrchestratorRunner( + OrchestratorService.WorkItem workItem, + TaskOrchestrationExecutor taskOrchestrationExecutor, + TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient, + @Nullable Tracer tracer) { + + super(workItem, sidecarClient, tracer); + this.orchestratorRequest = workItem.getOrchestratorRequest(); + this.taskOrchestrationExecutor = taskOrchestrationExecutor; + } + + @Override + public void run() { + TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( + orchestratorRequest.getPastEventsList(), + orchestratorRequest.getNewEventsList()); + + OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder() + .setInstanceId(orchestratorRequest.getInstanceId()) + .addAllActions(taskOrchestratorResult.getActions()) + .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) + .setCompletionToken(workItem.getCompletionToken()) + .build(); + + try { + this.sidecarClient.completeOrchestratorTask(response); + logger.log(Level.FINEST, + "Completed orchestrator request for instance: {0}", + orchestratorRequest.getInstanceId()); + } catch (StatusRuntimeException e) { + this.logException(e); + } + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java index 2391b8f635..229caaf88c 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java @@ -25,4 +25,5 @@ public interface WorkflowActivityContext { T getInput(Class targetType); + String getTraceParent(); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java index 8608e96937..be3280a992 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -20,7 +20,6 @@ import org.slf4j.Logger; import javax.annotation.Nullable; - import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; @@ -383,7 +382,7 @@ default Task callChildWorkflow(String name) { * @return a new {@link Task} that completes when the child-workflow completes or fails */ default Task callChildWorkflow(String name, Object input) { - return this.callChildWorkflow(name, input, null); + return this.callChildWorkflow(name, input, Void.class); } /** diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java index d6f935e8f8..e5d648ff57 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java @@ -97,6 +97,11 @@ public T getInput(Class targetType) { return this.innerContext.getInput(targetType); } + @Override + public String getTraceParent() { + return this.innerContext.getTraceParent(); + } + @Override public String getTaskExecutionId() { return this.innerContext.getTaskExecutionId(); diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index b6ca38ecbc..feb2025f3a 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -38,7 +38,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class DefaultWorkflowContextTest { private DefaultWorkflowContext context; @@ -284,7 +288,7 @@ public void callChildWorkflowWithName() { String expectedName = "TestActivity"; context.callChildWorkflow(expectedName); - verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, null, null, null, null); + verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, null, null, null, Void.class); } @Test diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContextTest.java index 13cf3b6c6a..ceb6648e6c 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContextTest.java @@ -5,7 +5,9 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -21,6 +23,7 @@ void shouldSuccessfullyCreateContextAndReturnCorrectValuesForAllMethods() { when(mockInnerContext.getName()).thenReturn("TestActivity"); when(mockInnerContext.getInput(any())).thenReturn("TestInput"); when(mockInnerContext.getTaskExecutionId()).thenReturn("TestExecutionId"); + when(mockInnerContext.getTraceParent()).thenReturn("00244654132154564654"); assertNotNull(context.getLogger()); assertEquals("TestActivity", context.getName()); @@ -29,6 +32,7 @@ void shouldSuccessfullyCreateContextAndReturnCorrectValuesForAllMethods() { assertEquals("TestInput", input); assertEquals("TestExecutionId", context.getTaskExecutionId()); + assertEquals("00244654132154564654", context.getTraceParent()); } @Test