diff --git a/impl/grpc/pom.xml b/impl/grpc/pom.xml new file mode 100644 index 000000000..620584030 --- /dev/null +++ b/impl/grpc/pom.xml @@ -0,0 +1,48 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-grpc + Serverless Workflow :: Impl :: gRPC + + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + io.serverlessworkflow + serverlessworkflow-api + + + io.serverlessworkflow + serverlessworkflow-impl-jackson + + + io.grpc + grpc-stub + + + com.google.protobuf + protobuf-java + 3.25.8 + + + com.google.protobuf + protobuf-java-util + 3.25.8 + + + com.github.os72 + protoc-jar + ${version.com.github.os72.protoc.jar} + + + io.grpc + grpc-protobuf + + + \ No newline at end of file diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java new file mode 100644 index 000000000..a111e9dd0 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.DescriptorProtos; + +public record FileDescriptorContext( + DescriptorProtos.FileDescriptorSet fileDescriptorSet, String inputProto) {} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContextSupplier.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContextSupplier.java new file mode 100644 index 000000000..1f416a9a5 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContextSupplier.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; + +@FunctionalInterface +public interface FileDescriptorContextSupplier { + + FileDescriptorContext get( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input); +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java new file mode 100644 index 000000000..638d5c513 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java @@ -0,0 +1,111 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.github.os72.protocjar.Protoc; +import com.google.protobuf.DescriptorProtos; +import io.serverlessworkflow.impl.resources.ExternalResourceHandler; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.Optional; + +public class FileDescriptorReader { + + public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) { + Path grpcDir = + tryCreateTempGrpcDir() + .orElseThrow( + () -> new IllegalStateException("Could not create temporary gRPC directory")); + + try (InputStream inputStream = externalResourceHandler.open()) { + + Path protoFile = grpcDir.resolve(externalResourceHandler.name()); + if (!Files.exists(protoFile)) { + Files.createDirectories(protoFile); + } + + Files.copy(inputStream, protoFile, StandardCopyOption.REPLACE_EXISTING); + + Path descriptorOutput = grpcDir.resolve("descriptor.protobin"); + + try { + + generateFileDescriptor(grpcDir, protoFile, descriptorOutput); + + DescriptorProtos.FileDescriptorSet fileDescriptorSet = + DescriptorProtos.FileDescriptorSet.newBuilder() + .mergeFrom(Files.readAllBytes(descriptorOutput)) + .build(); + + return new FileDescriptorContext(fileDescriptorSet, externalResourceHandler.name()); + + } catch (IOException e) { + throw new UncheckedIOException( + "Unable to read external resource handler: " + externalResourceHandler.name(), e); + } + } catch (IOException e) { + throw new UncheckedIOException("Unable to read descriptor file", e); + } + } + + private Optional tryCreateTempGrpcDir() { + try { + return Optional.of(Files.createTempDirectory("serverless-workflow-")); + } catch (IOException e) { + throw new UncheckedIOException("Error while creating temporary gRPC directory", e); + } + } + + /** + * Calls protoc binary with --descriptor_set_out= option set. + * + * @param grpcDir a temporary directory + * @param protoFile the .proto file used by protoc to generate the file descriptor + * @param descriptorOutput the output directory where the descriptor file will be generated + */ + private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path descriptorOutput) { + String[] protocArgs = + new String[] { + "--include_imports", + "--descriptor_set_out=" + descriptorOutput.toAbsolutePath(), + "-I", + grpcDir.toAbsolutePath().toString(), + protoFile.toAbsolutePath().toString() + }; + + try { + + int status = Protoc.runProtoc(protocArgs); + + // ProcessBuilder processBuilder = new ProcessBuilder(protocArgs); + // int status = ScriptUtils.uncheckedStart(processBuilder).waitFor(); + + if (status != 0) { + throw new RuntimeException( + "Unable to generate file descriptor, 'protoc' execution failed with status " + status); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Unable to generate file descriptor", e); + } catch (IOException e) { + throw new UncheckedIOException("Unable to generate file descriptor", e); + } + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcCallExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcCallExecutor.java new file mode 100644 index 000000000..8de978253 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcCallExecutor.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Map; + +@FunctionalInterface +public interface GrpcCallExecutor { + + WorkflowModel apply( + GrpcRequestContext requestContext, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model, + Map arguments); +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java new file mode 100644 index 000000000..c13f5de5a --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.grpc.Channel; +import io.grpc.ManagedChannelBuilder; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; + +public class GrpcChannelResolver { + + public static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider"; + + public static Channel channel( + WorkflowContext workflowContext, + TaskContext taskContext, + GrpcRequestContext grpcRequestContext) { + WorkflowApplication appl = workflowContext.definition().application(); + return appl.additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext) + .orElseGet( + () -> + ManagedChannelBuilder.forAddress( + grpcRequestContext.address(), grpcRequestContext.port()) + .usePlaintext() + .build()); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java new file mode 100644 index 000000000..36055c476 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class GrpcExecutor implements CallableTask { + + private final GrpcRequestContext requestContext; + private final GrpcCallExecutor grpcCallExecutor; + private final WorkflowValueResolver> arguments; + + public GrpcExecutor( + GrpcRequestContext builder, + GrpcCallExecutor grpcCallExecutor, + WorkflowValueResolver> arguments) { + this.requestContext = builder; + this.grpcCallExecutor = grpcCallExecutor; + this.arguments = arguments; + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + + Map arguments = this.arguments.apply(workflowContext, taskContext, input); + + return CompletableFuture.supplyAsync( + () -> + this.grpcCallExecutor.apply( + requestContext, workflowContext, taskContext, input, arguments)); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java new file mode 100644 index 000000000..7960bbcad --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java @@ -0,0 +1,230 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.ServiceDescriptor; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.protobuf.ProtoUtils; +import io.grpc.stub.ClientCalls; +import io.serverlessworkflow.api.types.CallGRPC; +import io.serverlessworkflow.api.types.ExternalResource; +import io.serverlessworkflow.api.types.GRPCArguments; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.WithGRPCService; +import io.serverlessworkflow.impl.*; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class GrpcExecutorBuilder implements CallableTaskBuilder { + + private ExternalResource proto; + private GrpcRequestContext grpcRequestContext; + private GrpcCallExecutor callExecutor; + private WorkflowValueResolver> arguments; + + @Override + public boolean accept(Class clazz) { + return clazz.equals(CallGRPC.class); + } + + @Override + public void init(CallGRPC task, WorkflowDefinition definition, WorkflowMutablePosition position) { + + GRPCArguments with = task.getWith(); + WithGRPCService service = with.getService(); + this.proto = with.getProto(); + + this.arguments = + WorkflowUtils.buildMapResolver( + definition.application(), + with.getArguments() != null ? with.getArguments().getAdditionalProperties() : Map.of()); + + this.grpcRequestContext = + new GrpcRequestContext( + service.getHost(), service.getPort(), with.getMethod(), service.getName()); + + FileDescriptorReader fileDescriptorReader = new FileDescriptorReader(); + FileDescriptorContext fileDescriptorContext = + definition + .resourceLoader() + .loadStatic(with.getProto().getEndpoint(), fileDescriptorReader::readDescriptor); + + this.callExecutor = + (requestContext, workflowContext, taskContext, model, arguments) -> { + Channel channel = + GrpcChannelResolver.channel(workflowContext, taskContext, this.grpcRequestContext); + String protoName = fileDescriptorContext.inputProto(); + + DescriptorProtos.FileDescriptorProto fileDescriptorProto = + fileDescriptorContext.fileDescriptorSet().getFileList().stream() + .filter( + file -> + file.getName() + .equals( + this.proto.getName() != null ? this.proto.getName() : protoName)) + .findFirst() + .orElseThrow( + () -> new IllegalStateException("Proto file not found in descriptor set")); + + try { + Descriptors.FileDescriptor fileDescriptor = + Descriptors.FileDescriptor.buildFrom( + fileDescriptorProto, new Descriptors.FileDescriptor[] {}); + + ServiceDescriptor serviceDescriptor = + fileDescriptor.findServiceByName(this.grpcRequestContext.service()); + + Objects.requireNonNull( + serviceDescriptor, "Service not found: " + this.grpcRequestContext.service()); + + Descriptors.MethodDescriptor methodDescriptor = + serviceDescriptor.findMethodByName(this.grpcRequestContext.method()); + + Objects.requireNonNull( + methodDescriptor, "Method not found: " + this.grpcRequestContext.method()); + + MethodType methodType = ProtobufMessageUtils.getMethodType(methodDescriptor); + + ClientCall call = + buildClientCall(channel, methodType, serviceDescriptor, methodDescriptor); + + return switch (methodType) { + case CLIENT_STREAMING -> + handleClientStreaming(workflowContext, arguments, methodDescriptor, call); + case BIDI_STREAMING -> + handleBidiStreaming(workflowContext, arguments, methodDescriptor, call); + case SERVER_STREAMING -> + handleServerStreaming(workflowContext, methodDescriptor, arguments, call); + case UNARY, UNKNOWN -> + handleBlockingUnary(workflowContext, methodDescriptor, arguments, call); + }; + + } catch (Descriptors.DescriptorValidationException + | InvalidProtocolBufferException + | JsonProcessingException e) { + throw new WorkflowException(WorkflowError.runtime(taskContext, e).build()); + } + }; + } + + private static ClientCall buildClientCall( + Channel channel, + MethodType methodType, + ServiceDescriptor serviceDescriptor, + Descriptors.MethodDescriptor methodDescriptor) { + return channel.newCall( + MethodDescriptor.newBuilder() + .setType(methodType) + .setFullMethodName( + MethodDescriptor.generateFullMethodName( + serviceDescriptor.getFullName(), methodDescriptor.getName())) + .setRequestMarshaller( + ProtoUtils.marshaller( + DynamicMessage.newBuilder(methodDescriptor.getInputType()).buildPartial())) + .setResponseMarshaller( + ProtoUtils.marshaller( + DynamicMessage.newBuilder(methodDescriptor.getOutputType()).buildPartial())) + .build(), + CallOptions.DEFAULT.withWaitForReady()); + } + + private static WorkflowModel handleClientStreaming( + WorkflowContext workflowContext, + Map parameters, + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call) { + JsonNode jsonNode = + ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), + nodes -> nodes.isEmpty() ? NullNode.instance : nodes.get(0)); + return workflowContext.definition().application().modelFactory().fromAny(jsonNode); + } + + private static WorkflowModel handleServerStreaming( + WorkflowContext workflowContext, + Descriptors.MethodDescriptor methodDescriptor, + Map parameters, + ClientCall call) + throws InvalidProtocolBufferException, JsonProcessingException { + Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters); + List nodes = new ArrayList<>(); + ClientCalls.blockingServerStreamingCall(call, builder.build()) + .forEachRemaining(message -> nodes.add(ProtobufMessageUtils.convert(message))); + return workflowContext.definition().application().modelFactory().fromAny(nodes); + } + + private static WorkflowModel handleBlockingUnary( + WorkflowContext workflowContext, + Descriptors.MethodDescriptor methodDescriptor, + Map parameters, + ClientCall call) + throws InvalidProtocolBufferException, JsonProcessingException { + Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters); + + Message message = ClientCalls.blockingUnaryCall(call, builder.build()); + return workflowContext + .definition() + .application() + .modelFactory() + .fromAny(ProtobufMessageUtils.convert(message)); + } + + private static WorkflowModel handleBidiStreaming( + WorkflowContext workflowContext, + Map parameters, + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call) { + return workflowContext + .definition() + .application() + .modelFactory() + .fromAny( + ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncBidiStreamingCall(call, responseObserver), + v -> { + Collection nodes = v; + List list = new ArrayList<>(nodes); + return JsonUtils.fromValue(list); + })); + } + + @Override + public CallableTask build() { + return new GrpcExecutor(this.grpcRequestContext, this.callExecutor, this.arguments); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java new file mode 100644 index 000000000..2abab6464 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java @@ -0,0 +1,18 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +public record GrpcRequestContext(String address, int port, String method, String service) {} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java new file mode 100644 index 000000000..384b449c9 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -0,0 +1,104 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import io.grpc.MethodDescriptor; +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.api.WorkflowFormat; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; + +public interface ProtobufMessageUtils { + + static JsonNode convert(Message message) { + StringBuilder str = new StringBuilder(); + try { + JsonFormat.printer().appendTo(message, str); + return WorkflowFormat.JSON.mapper().readTree(str.toString()); + } catch (IOException e) { + throw new UncheckedIOException("Error converting protobuf message to JSON", e); + } + } + + static MethodDescriptor.MethodType getMethodType( + com.google.protobuf.Descriptors.MethodDescriptor methodDesc) { + DescriptorProtos.MethodDescriptorProto methodDescProto = methodDesc.toProto(); + if (methodDescProto.getClientStreaming()) { + if (methodDescProto.getServerStreaming()) { + return MethodDescriptor.MethodType.BIDI_STREAMING; + } + return MethodDescriptor.MethodType.CLIENT_STREAMING; + } else if (methodDescProto.getServerStreaming()) { + return MethodDescriptor.MethodType.SERVER_STREAMING; + } else { + return MethodDescriptor.MethodType.UNARY; + } + } + + static JsonNode asyncStreamingCall( + Map parameters, + com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, + UnaryOperator> streamObserverFunction, + Function, JsonNode> nodesFunction) { + WaitingStreamObserver responseObserver = new WaitingStreamObserver(); + StreamObserver requestObserver = streamObserverFunction.apply(responseObserver); + + for (var entry : parameters.entrySet()) { + try { + Message message = + buildMessage(entry, DynamicMessage.newBuilder(methodDescriptor.getInputType())).build(); + requestObserver.onNext(message); + } catch (Exception e) { + requestObserver.onError(e); + throw new RuntimeException(e); + } + responseObserver.checkForServerStreamErrors(); + } + requestObserver.onCompleted(); + + return nodesFunction.apply( + responseObserver.get().stream() + .map(ProtobufMessageUtils::convert) + .collect(Collectors.toList())); + } + + static Message.Builder buildMessage(Object object, Message.Builder builder) + throws InvalidProtocolBufferException, JsonProcessingException { + JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(object), builder); + return builder; + } + + static Message.Builder buildMessage( + Descriptors.MethodDescriptor methodDescriptor, Map parameters) + throws InvalidProtocolBufferException, JsonProcessingException { + DynamicMessage.Builder builder = DynamicMessage.newBuilder(methodDescriptor.getInputType()); + JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(parameters), builder); + return builder; + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java new file mode 100644 index 000000000..fe73d5cd3 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.Message; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class WaitingStreamObserver implements StreamObserver { + + List responses = new ArrayList<>(); + CompletableFuture> responsesFuture = new CompletableFuture<>(); + + @Override + public void onNext(Message messageReply) { + responses.add(messageReply); + } + + @Override + public void onError(Throwable throwable) { + responsesFuture.completeExceptionally(throwable); + } + + @Override + public void onCompleted() { + responsesFuture.complete(responses); + } + + public List get() { + int defaultTimeout = 10000; + + try { + return responsesFuture.get(defaultTimeout, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (TimeoutException e) { + throw new IllegalStateException( + String.format("gRPC call timed out after %d seconds", defaultTimeout), e); + } catch (ExecutionException e) { + throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause()); + } + } + + public void checkForServerStreamErrors() { + if (responsesFuture.isCompletedExceptionally()) { + try { + responsesFuture.join(); + } catch (CompletionException e) { + throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause()); + } + } + } + + private String getServerStreamErrorMessage(Throwable throwable) { + return String.format( + "Received an error through gRPC server stream with status: %s", + Status.fromThrowable(throwable)); + } +} diff --git a/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder b/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder new file mode 100644 index 000000000..6acd9ba63 --- /dev/null +++ b/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder @@ -0,0 +1 @@ +io.serverlessworkflow.impl.executors.grpc.GrpcExecutorBuilder \ No newline at end of file diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java index fc40b2576..c26ea3eee 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java @@ -20,10 +20,13 @@ import io.serverlessworkflow.impl.WorkflowContext; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; +import jakarta.ws.rs.client.ClientRequestFilter; +import java.util.Optional; public class HttpClientResolver { public static final String HTTP_CLIENT_PROVIDER = "httpClientProvider"; + public static final String HTTP_CLIENT_FILTER_PROVIDER = "httpClientFilterProvider"; private static class DefaultHolder { private static final Client client = ClientBuilder.newClient(); @@ -35,5 +38,12 @@ public static Client client(WorkflowContext workflowContext, TaskContext taskCon .orElseGet(() -> DefaultHolder.client); } + public static Optional clientRequestFilter( + WorkflowContext workflowContext, TaskContext taskContext) { + WorkflowApplication application = workflowContext.definition().application(); + return application.additionalObject( + HTTP_CLIENT_FILTER_PROVIDER, workflowContext, taskContext); + } + private HttpClientResolver() {} } diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java index c787cd2bd..1413e582e 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java @@ -21,6 +21,7 @@ import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.executors.CallableTask; +import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.Invocation.Builder; import jakarta.ws.rs.client.WebTarget; import java.net.URI; @@ -69,7 +70,12 @@ public CompletableFuture apply( p.apply(workflow, taskContext, input))) .orElse(uriSupplier.apply(workflow, taskContext, input)); - WebTarget target = HttpClientResolver.client(workflow, taskContext).target(uri); + Client client = HttpClientResolver.client(workflow, taskContext); + + HttpClientResolver.clientRequestFilter(workflow, taskContext).ifPresent(client::register); + + WebTarget target = client.target(uri); + for (Entry entry : queryMap.map(q -> q.apply(workflow, taskContext, input)).orElse(Map.of()).entrySet()) { target = target.queryParam(entry.getKey(), entry.getValue()); diff --git a/impl/pom.xml b/impl/pom.xml index 7c4f17133..166f96f0f 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -17,6 +17,7 @@ 9.2.1 3.7.0 25.0.1 + 3.11.4 @@ -105,6 +106,11 @@ serverlessworkflow-impl-container ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-grpc + ${project.version} + net.thisptr jackson-jq @@ -193,5 +199,6 @@ test javascript python + grpc diff --git a/impl/test/pom.xml b/impl/test/pom.xml index fd47aae9c..2363c8d72 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -53,6 +53,10 @@ io.serverlessworkflow serverlessworkflow-impl-container + + io.serverlessworkflow + serverlessworkflow-impl-grpc + org.glassfish.jersey.core jersey-client @@ -85,6 +89,12 @@ org.awaitility awaitility + + + io.grpc + grpc-netty + test + @@ -95,7 +105,32 @@ + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.25.8:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${version.io.grpc.java}:exe:${os.detected.classifier} + + + + + test-compile + test-compile-custom + + + + maven-jar-plugin diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java new file mode 100644 index 000000000..aa34497aa --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorBiDiStreamingHandler; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcBiDirectionalStreamingTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorBiDiStreamingHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcContributors() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-bidi-stream-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto") + .getFile(); + + WorkflowModel model = + workflowDefinition.instance(Map.of("protoFilePath", "file://" + filename)).start().join(); + + Collection collection = model.asCollection(); + + Assertions.assertThat(collection).hasSize(5); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java new file mode 100644 index 000000000..52ffc88aa --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorClientStreamingHandler; +import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcClientStreamingTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorClientStreamingHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-client-stream-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + Map output = + workflowDefinition.instance(Map.of()).start().join().asMap().orElseThrow(); + + Assertions.assertThat(output) + .contains(Map.entry("message", "dependabot[bot] has 1 contributions")); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java new file mode 100644 index 000000000..78ac67ca2 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorServerStreamingHandler; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcServerStreamingTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorServerStreamingHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcContributors() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-server-stream-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto") + .getFile(); + + WorkflowModel model = + workflowDefinition.instance(Map.of("protoFilePath", "file://" + filename)).start().join(); + + Collection collection = model.asCollection(); + + Assertions.assertThat(collection).hasSize(5); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java new file mode 100644 index 000000000..dab0fbd39 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorUnaryArgsExprHandler; +import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcUnaryArgsExprTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorUnaryArgsExprHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-unary-args-expr-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + // + // String filename = + // getClass() + // .getClassLoader() + // .getResource("workflows-samples/grpc/proto/contributors.proto") + // .getFile(); + + Map output = + workflowDefinition + .instance(Map.of("github", "bootable[origin]")) + .start() + .join() + .asMap() + .orElseThrow(); + + Assertions.assertThat(output).contains(Map.entry("message", "Success with bootable[origin]")); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java new file mode 100644 index 000000000..6205ff27e --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.test.grpc.handlers.PersonUnaryHandler; +import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcUnaryTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = ServerBuilder.forPort(PORT_FOR_EXAMPLES).addService(new PersonUnaryHandler()).build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath("workflows-samples/grpc/get-person-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/person.proto") + .getFile(); + + Map output = + workflowDefinition + .instance(Map.of("protoFilePath", "file://" + filename)) + .start() + .join() + .asMap() + .orElseThrow(); + + Assertions.assertThat(output).contains(Map.entry("name", "John Doe"), Map.entry("id", 891182)); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorBiDiStreamingHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorBiDiStreamingHandler.java new file mode 100644 index 000000000..7ee446fe8 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorBiDiStreamingHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.BiDirectionalStreamingGrpc.BiDirectionalStreamingImplBase; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; + +public class ContributorBiDiStreamingHandler extends BiDirectionalStreamingImplBase { + + @Override + public StreamObserver createContributor( + StreamObserver responseObserver) { + + return new StreamObserver() { + @Override + public void onNext(Contributors.AddContributionRequest value) { + for (int i = 0; i < 5; i++) { + Contributors.AddContributionResponse addContributionResponse = + Contributors.AddContributionResponse.newBuilder() + .setMessage("Contribution " + i + 1 + "added successfully") + .build(); + responseObserver.onNext(addContributionResponse); + } + } + + @Override + public void onError(Throwable t) { + // no-op + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorClientStreamingHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorClientStreamingHandler.java new file mode 100644 index 000000000..cdf5900f3 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorClientStreamingHandler.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.ClientStreamingGrpc.ClientStreamingImplBase; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAdder; + +public class ContributorClientStreamingHandler extends ClientStreamingImplBase { + + private static final Map GITHUBS = new ConcurrentHashMap<>(); + + @Override + public StreamObserver createContributor( + StreamObserver responseObserver) { + + return new StreamObserver<>() { + @Override + public void onNext(Contributors.AddContributionRequest value) { + String github = value.getGithub(); + GITHUBS.compute( + github, + (key, counter) -> { + if (counter == null) { + LongAdder longAdder = new LongAdder(); + longAdder.increment(); + return longAdder; + } + counter.increment(); + return counter; + }); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() { + StringBuilder stringBuilder = new StringBuilder(); + Set> entries = GITHUBS.entrySet(); + for (Map.Entry entry : entries) { + stringBuilder + .append(entry.getKey()) + .append(" has ") + .append(entry.getValue()) + .append(" contributions"); + } + responseObserver.onNext( + Contributors.AddContributionResponse.newBuilder() + .setMessage(stringBuilder.toString()) + .build()); + responseObserver.onCompleted(); + } + }; + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorServerStreamingHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorServerStreamingHandler.java new file mode 100644 index 000000000..53979739c --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorServerStreamingHandler.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; +import io.serverlessworkflow.impl.executors.grpc.contributors.ServerStreamingGrpc.ServerStreamingImplBase; + +public class ContributorServerStreamingHandler extends ServerStreamingImplBase { + + @Override + public void createContributor( + Contributors.AddContributionRequest request, + StreamObserver responseObserver) { + + for (int i = 0; i < 5; i++) { + Contributors.AddContributionResponse addContributionResponse = + Contributors.AddContributionResponse.newBuilder() + .setMessage("Success: (" + (i + 1) + ")") + .build(); + responseObserver.onNext(addContributionResponse); + } + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorUnaryArgsExprHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorUnaryArgsExprHandler.java new file mode 100644 index 000000000..1ac2f2ebe --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorUnaryArgsExprHandler.java @@ -0,0 +1,35 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import static io.serverlessworkflow.impl.executors.grpc.contributors.UnaryArgsExprGrpc.UnaryArgsExprImplBase; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; + +public class ContributorUnaryArgsExprHandler extends UnaryArgsExprImplBase { + + @Override + public void createContributor( + Contributors.AddContributionRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + Contributors.AddContributionResponse.newBuilder() + .setMessage("Success with " + request.getGithub()) + .build()); + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonUnaryHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonUnaryHandler.java new file mode 100644 index 000000000..e72f46138 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonUnaryHandler.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import person.PersonGrpc.PersonImplBase; +import person.PersonOuterClass; + +public class PersonUnaryHandler extends PersonImplBase { + + @Override + public void getPerson( + PersonOuterClass.GetPersonRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + PersonOuterClass.GetPersonResponse.newBuilder().setId(891182).setName("John Doe").build()); + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/proto/contributors.proto b/impl/test/src/test/proto/contributors.proto new file mode 100644 index 000000000..a7cb71e09 --- /dev/null +++ b/impl/test/src/test/proto/contributors.proto @@ -0,0 +1,27 @@ +syntax = "proto2"; + +package io.serverlessworkflow.impl.executors.grpc.contributors; + +message AddContributionRequest { + required string github = 1; +} + +message AddContributionResponse { + required string message = 1; +} + +service ClientStreaming { + rpc CreateContributor(stream AddContributionRequest) returns (AddContributionResponse) {} +} + +service ServerStreaming { + rpc CreateContributor(AddContributionRequest) returns (stream AddContributionResponse) {} +} + +service BiDirectionalStreaming { + rpc CreateContributor(stream AddContributionRequest) returns (stream AddContributionResponse) {} +} + +service UnaryArgsExpr { + rpc CreateContributor(AddContributionRequest) returns (AddContributionResponse) {} +} \ No newline at end of file diff --git a/impl/test/src/test/proto/person.proto b/impl/test/src/test/proto/person.proto new file mode 100644 index 000000000..b86dd98f4 --- /dev/null +++ b/impl/test/src/test/proto/person.proto @@ -0,0 +1,25 @@ +syntax = "proto2"; + +package person; + +message GetPersonRequest {} + +message GetPersonResponse { + required string name = 1; + required int32 id = 2; +} + +message GetCarRequest {} + +message GetCarResponse { + required string name = 1; + required string brand = 2; +} + +service Person { + rpc GetPerson(GetPersonRequest) returns (GetPersonResponse); +} + +service Car { + rpc GetCar(GetCarRequest) returns (GetCarResponse); +} \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-bidi-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-bidi-stream-call.yaml new file mode 100644 index 000000000..e5ab493ae --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-bidi-stream-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: BiDirectionalStreaming + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: dependabot[bot] \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml new file mode 100644 index 000000000..e7d089fc5 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: ClientStreaming + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: dependabot[bot] \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-server-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-server-stream-call.yaml new file mode 100644 index 000000000..c74c68b4b --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-server-stream-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: ServerStreaming + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: dependabot[bot] \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-unary-args-expr-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-unary-args-expr-call.yaml new file mode 100644 index 000000000..5d4ec2c9b --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-unary-args-expr-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: UnaryArgsExpr + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: ${ .github } \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/get-person-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/get-person-call.yaml new file mode 100644 index 000000000..221c373ec --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/get-person-call.yaml @@ -0,0 +1,16 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/person.proto + service: + name: Person + host: localhost + port: 5011 + method: GetPerson \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto b/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto new file mode 100644 index 000000000..a7cb71e09 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto @@ -0,0 +1,27 @@ +syntax = "proto2"; + +package io.serverlessworkflow.impl.executors.grpc.contributors; + +message AddContributionRequest { + required string github = 1; +} + +message AddContributionResponse { + required string message = 1; +} + +service ClientStreaming { + rpc CreateContributor(stream AddContributionRequest) returns (AddContributionResponse) {} +} + +service ServerStreaming { + rpc CreateContributor(AddContributionRequest) returns (stream AddContributionResponse) {} +} + +service BiDirectionalStreaming { + rpc CreateContributor(stream AddContributionRequest) returns (stream AddContributionResponse) {} +} + +service UnaryArgsExpr { + rpc CreateContributor(AddContributionRequest) returns (AddContributionResponse) {} +} \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto b/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto new file mode 100644 index 000000000..b86dd98f4 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto @@ -0,0 +1,25 @@ +syntax = "proto2"; + +package person; + +message GetPersonRequest {} + +message GetPersonResponse { + required string name = 1; + required int32 id = 2; +} + +message GetCarRequest {} + +message GetCarResponse { + required string name = 1; + required string brand = 2; +} + +service Person { + rpc GetPerson(GetPersonRequest) returns (GetPersonResponse); +} + +service Car { + rpc GetCar(GetCarRequest) returns (GetCarResponse); +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1cc54e0dc..540f384b4 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,7 @@ 3.6.2 3.5.4 3.2.8 + 1.78.0 3.5.0 ${java.version} 1.2.2 @@ -80,6 +81,7 @@ 4.3.0 + 4.32.1 1.5.25 2.21.0 2.21 @@ -203,6 +205,27 @@ jakarta.validation-api ${version.jakarta.validation} + + io.grpc + grpc-netty + ${version.io.grpc.java} + runtime + + + io.grpc + grpc-protobuf + ${version.io.grpc.java} + + + io.grpc + grpc-stub + ${version.io.grpc.java} + + + com.google.protobuf + protobuf-java + ${version.com.google.protobuf.java} +