⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions durabletask-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-observation</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;

import javax.annotation.Nullable;
import java.io.FileInputStream;
Expand All @@ -41,6 +43,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

/**
Expand All @@ -57,6 +60,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
private final DataConverter dataConverter;
private final ManagedChannel managedSidecarChannel;
private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
private final Tracer tracer;

DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) {
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
Expand Down Expand Up @@ -130,6 +134,13 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
sidecarGrpcChannel = this.managedSidecarChannel;
}

if (builder.tracer != null) {
this.tracer = builder.tracer;
} else {
//this.tracer = OpenTelemetry.noop().getTracer("DurableTaskGrpcClient");
this.tracer = GlobalOpenTelemetry.getTracer("dapr-workflow");
}

this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
}

Expand Down Expand Up @@ -188,9 +199,12 @@ public String scheduleNewOrchestrationInstance(
builder.setScheduledStartTimestamp(ts);
}

AtomicReference<OrchestratorService.CreateInstanceResponse> response = new AtomicReference<>();

OrchestratorService.CreateInstanceRequest request = builder.build();
OrchestratorService.CreateInstanceResponse response = this.sidecarClient.startInstance(request);
return response.getInstanceId();
response.set(this.sidecarClient.startInstance(request));

return response.get().getInstanceId();
}

@Override
Expand All @@ -208,6 +222,7 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload)

OrchestratorService.RaiseEventRequest request = builder.build();
this.sidecarClient.raiseEvent(request);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.dapr.durabletask;

import io.grpc.Channel;
import io.opentelemetry.api.trace.Tracer;

/**
* Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process
Expand All @@ -27,6 +28,7 @@ public final class DurableTaskGrpcClientBuilder {
String tlsCertPath;
String tlsKeyPath;
boolean insecure;
Tracer tracer;

/**
* Sets the {@link DataConverter} to use for converting serializable data payloads.
Expand Down Expand Up @@ -57,6 +59,17 @@ public DurableTaskGrpcClientBuilder grpcChannel(Channel channel) {
return this;
}

/**
* Sets the Tracer object to be used by DurableTaskClient to emit traces.
*
* @param tracer to be used by the DurableTaskClient
* @return this builder object
*/
public DurableTaskGrpcClientBuilder tracer(Tracer tracer) {
this.tracer = tracer;
return this;
}

/**
* Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,25 @@

package io.dapr.durabletask;

import com.google.protobuf.StringValue;
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails;
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
import io.dapr.durabletask.runner.ActivityRunner;
import io.dapr.durabletask.runner.OrchestratorRunner;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -50,6 +56,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final Duration maximumTimerInterval;
private final ExecutorService workerPool;
private final String appId; // App ID for cross-app routing
private final Tracer tracer;

private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
private final boolean isExecutorServiceManaged;
Expand Down Expand Up @@ -81,12 +88,20 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
sidecarGrpcChannel = this.managedSidecarChannel;
}

this.tracer = GlobalOpenTelemetry.getTracer("dapr-workflow");

this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
: DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();

ExecutorService rawExecutor = builder.executorService != null
? builder.executorService : Executors.newCachedThreadPool();
this.workerPool = Context.taskWrapping(rawExecutor);

this.isExecutorServiceManaged = builder.executorService == null;


}

/**
Expand Down Expand Up @@ -159,102 +174,25 @@ public void startAndBlock() {
while (workItemStream.hasNext()) {
OrchestratorService.WorkItem workItem = workItemStream.next();
OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase();

if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) {
OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
logger.log(Level.FINEST,
logger.log(Level.INFO,
String.format("Processing orchestrator request for instance: {0}",
orchestratorRequest.getInstanceId()));

// TODO: Error handling
this.workerPool.submit(() -> {
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());

OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.setCompletionToken(workItem.getCompletionToken())
.build();

try {
this.sidecarClient.completeOrchestratorTask(response);
logger.log(Level.FINEST,
"Completed orchestrator request for instance: {0}",
orchestratorRequest.getInstanceId());
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.WARNING,
"The sidecar at address {0} is unavailable while completing the orchestrator task.",
this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.WARNING,
"Durable Task worker has disconnected from {0} while completing the orchestrator task.",
this.getSidecarAddress());
} else {
logger.log(Level.WARNING,
"Unexpected failure completing the orchestrator task at {0}.",
this.getSidecarAddress());
}
}
});
this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer));
} else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) {
OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest();
logger.log(Level.FINEST,
String.format("Processing activity request: %s for instance: %s}",
activityRequest.getName(),
activityRequest.getOrchestrationInstance().getInstanceId()));

// TODO: Error handling
this.workerPool.submit(() -> {
String output = null;
TaskFailureDetails failureDetails = null;
try {
output = taskActivityExecutor.execute(

logger.log(Level.INFO,
String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s",
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskExecutionId(),
activityRequest.getTaskId());
} catch (Throwable e) {
failureDetails = TaskFailureDetails.newBuilder()
.setErrorType(e.getClass().getName())
.setErrorMessage(e.getMessage())
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
.build();
}

OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse
.newBuilder()
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
.setTaskId(activityRequest.getTaskId())
.setCompletionToken(workItem.getCompletionToken());

if (output != null) {
responseBuilder.setResult(StringValue.of(output));
}

if (failureDetails != null) {
responseBuilder.setFailureDetails(failureDetails);
}

try {
this.sidecarClient.completeActivityTask(responseBuilder.build());
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.WARNING,
"The sidecar at address {0} is unavailable while completing the activity task.",
this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.WARNING,
"Durable Task worker has disconnected from {0} while completing the activity task.",
this.getSidecarAddress());
} else {
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
this.getSidecarAddress());
}
}
});
activityRequest.getOrchestrationInstance().getInstanceId(),
Context.current()));

this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer));

} else if (requestType == OrchestratorService.WorkItem.RequestCase.HEALTHPING) {
// No-op
} else {
Expand Down Expand Up @@ -325,4 +263,57 @@ private void shutDownWorkerPool() {
private String getSidecarAddress() {
return this.sidecarClient.getChannel().authority();
}

/**
* Extracts trace context from the ActivityRequest's ParentTraceContext field
* and creates an OpenTelemetry Context with the parent span set.
*
* @param activityRequest The activity request containing the parent trace context
* @return A Context with the parent span set, or the current context if no trace context is present
*/
private Context extractTraceContext(OrchestratorService.ActivityRequest activityRequest) {
if (!activityRequest.hasParentTraceContext()) {
logger.log(Level.FINE, "No parent trace context in activity request");
return Context.current();
}

OrchestratorService.TraceContext traceContext = activityRequest.getParentTraceContext();
String traceParent = traceContext.getTraceParent();

if (traceParent.isEmpty()) {
logger.log(Level.FINE, "Empty traceparent in activity request");
return Context.current();
}

logger.log(Level.INFO,
String.format("Extracting trace context from ActivityRequest: traceparent=%s", traceParent));

// Use W3CTraceContextPropagator to extract the trace context
Map<String, String> carrier = new HashMap<>();
carrier.put("traceparent", traceParent);
if (traceContext.hasTraceState()) {
carrier.put("tracestate", traceContext.getTraceState().getValue());
}

TextMapGetter<Map<String, String>> getter = new TextMapGetter<Map<String, String>>() {
@Override
public Iterable<String> keys(Map<String, String> carrier) {
return carrier.keySet();
}

@Override
public String get(Map<String, String> carrier, String key) {
return carrier.get(key);
}
};


Context extractedContext = W3CTraceContextPropagator.getInstance()
.extract(Context.current(), carrier, getter);

logger.log(Level.INFO,
String.format("Extracted trace context: %s", extractedContext));

return extractedContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,13 @@ public boolean isCausedBy(Class<? extends Exception> exceptionClass) {
}
}

static String getFullStackTrace(Throwable e) {
/**
* Gets the full stack trace of the specified exception.
*
* @param e the exception
* @return the full stack trace of the exception
*/
public static String getFullStackTrace(Throwable e) {
StackTraceElement[] elements = e.getStackTrace();

// Plan for 256 characters per stack frame (which is likely on the high-end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ public interface TaskActivityContext {
* @return the task id of the current task activity
*/
int getTaskId();

/**
* Gets the trace parent id for the current workflow execution.
* @return trace parent id
*/
String getTraceParent();
}
Loading
Loading