⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 61 additions & 20 deletions examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages of a specific topic, while a subscriber will listen for messages of a specific topic via a bi-directional stream. All is abstracted by the SDK. See the [Dapr Pub-Sub docs](https://docs.dapr.io/developing-applications/building-blocks/pubsub/) to understand when this pattern might be a good choice for your software architecture.

Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub.

## Pub-Sub Sample using the Java-SDK

This sample shows how the subscription to events no longer requires the application to listen to an HTTP or gRPC port. This example uses Redis Streams (enabled in Redis versions => 5).
Expand Down Expand Up @@ -41,45 +41,80 @@ cd examples

Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized.

### Running the subscriber

The subscriber uses the `DaprPreviewClient` interface to use a new feature where events are subscribed via a streaming and processed via a callback interface.
## Running the Subscriber

The subscriber uses the `DaprPreviewClient` interface to subscribe to events via streaming and process them using reactive operators.

The SDK provides two ways to subscribe to events:

The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.
### Option 1: Raw Data Subscription

In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux<CloudEvent<T>>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.
Use `TypeRef.STRING` (or any other type) to receive the deserialized message data directly:

```java
public class Subscriber {

// ...
public static void main(String[] args) throws Exception {
try (var client = new DaprClientBuilder().buildPreviewClient()) {
// Subscribe to events - receives raw String data directly
client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING)
.doOnNext(message -> {
System.out.println("Subscriber got: " + message);
})
.doOnError(throwable -> {
System.out.println("Subscriber got exception: " + throwable.getMessage());
})
.blockLast();
}
}
}
```

### Option 2: CloudEvent Subscription

Use `TypeRef<CloudEvent<String>>` to receive the full CloudEvent with metadata (ID, source, type, etc.):

```java
public class SubscriberCloudEvent {

public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
// Subscribe to events using the Flux-based reactive API
// The stream will emit CloudEvent<String> objects as they arrive
client.subscribeToEvents(
PUBSUB_NAME,
topicName,
TypeRef.STRING)
.doOnNext(event -> {
System.out.println("Subscriber got: " + event.getData());
// Subscribe to events - receives CloudEvent<String> with full metadata
client.subscribeToEvents(PUBSUB_NAME, topicName, new TypeRef<CloudEvent<String>>() {})
.doOnNext(cloudEvent -> {
System.out.println("Received CloudEvent:");
System.out.println(" ID: " + cloudEvent.getId());
System.out.println(" Type: " + cloudEvent.getType());
System.out.println(" Data: " + cloudEvent.getData());
})
.doOnError(throwable -> {
System.out.println("Subscriber got exception: " + throwable.getMessage());
})
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
.blockLast();
}
}

// ...
}
```

Execute the following command to run the Subscriber example:
### Subscription with Metadata

You can also pass metadata to the subscription, for example to enable raw payload mode:

```java
client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPayload", "true"))
.doOnNext(message -> {
System.out.println("Subscriber got: " + message);
})
.blockLast();
```

### Subscription Lifecycle

The examples use `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.

## Running the Examples

Execute the following command to run the raw data Subscriber example:

<!-- STEP
name: Run Subscriber
Expand All @@ -97,6 +132,12 @@ dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar t

<!-- END_STEP -->

Or run the CloudEvent Subscriber example:

```bash
dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.SubscriberCloudEvent
```

Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side:

<!-- STEP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,47 @@

/**
* Subscriber using bi-directional gRPC streaming, which does not require an app port.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the subscriber:
* dapr run --resources-path ./components/pubsub --app-id subscriber -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber
*
* <p>This example demonstrates subscribing to raw message data directly.
* For CloudEvent subscription with metadata, see {@link SubscriberCloudEvent}.
*
* <p>Usage:
* <ol>
* <li>Build and install jars: {@code mvn clean install}
* <li>cd [repo root]/examples
* <li>Run the subscriber:
* {@code dapr run --resources-path ./components/pubsub --app-id subscriber -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber}
* </ol>
*/
public class Subscriber {

//The title of the topic to be used for publishing
private static final String DEFAULT_TOPIC_NAME = "testingtopic";

//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";

/**
* This is the entry point for this example app, which subscribes to a topic.
* Main entry point for the raw data subscriber example.
*
* @param args Used to optionally pass a topic name.
* @throws Exception An Exception on startup.
*/
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
// Subscribe to events using the Flux-based reactive API
// The stream will emit CloudEvent<String> objects as they arrive
client.subscribeToEvents(
PUBSUB_NAME,
topicName,
TypeRef.STRING)
.doOnNext(event -> {
System.out.println("Subscriber got: " + event.getData());
System.out.println("Subscribing to topic: " + topicName);

// Subscribe to events - receives raw String data directly
client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING)
.doOnNext(message -> {
System.out.println("Subscriber got: " + message);
})
.doOnError(throwable -> {
System.out.println("Subscriber got exception: " + throwable.getMessage());
})
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
.blockLast();
}
}

/**
* If a topic is specified in args, use that.
* Else, fallback to the default topic.
* @param args program arguments
* @return name of the topic to publish messages to.
*/
private static String getTopicName(String[] args) {
if (args.length >= 1) {
return args[0];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub.stream;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;

/**
* Subscriber using bi-directional gRPC streaming with CloudEvent metadata.
*
* <p>This example demonstrates subscribing to CloudEvent objects which include
* metadata like event ID, source, type, and other CloudEvents specification fields.
* For raw message data subscription, see {@link Subscriber}.
*
* <p>Usage:
* <ol>
* <li>Build and install jars: {@code mvn clean install}
* <li>cd [repo root]/examples
* <li>Run the subscriber:
* {@code dapr run --resources-path ./components/pubsub --app-id subscriber -- \
* java -jar target/dapr-java-sdk-examples-exec.jar \
* io.dapr.examples.pubsub.stream.SubscriberCloudEvent}
* </ol>
*/
public class SubscriberCloudEvent {

private static final String DEFAULT_TOPIC_NAME = "testingtopic";
private static final String PUBSUB_NAME = "messagebus";

/**
* Main entry point for the CloudEvent subscriber example.
*
* @param args Used to optionally pass a topic name.
* @throws Exception An Exception on startup.
*/
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
System.out.println("Subscribing to topic: " + topicName + " (CloudEvent mode)");

// Subscribe to events - receives CloudEvent<String> with full metadata
// Use TypeRef<CloudEvent<String>> to get CloudEvent wrapper with metadata
client.subscribeToEvents(PUBSUB_NAME, topicName, new TypeRef<CloudEvent<String>>() {})
.doOnNext(cloudEvent -> {
System.out.println("Received CloudEvent:");
System.out.println(" ID: " + cloudEvent.getId());
System.out.println(" Source: " + cloudEvent.getSource());
System.out.println(" Type: " + cloudEvent.getType());
System.out.println(" Topic: " + cloudEvent.getTopic());
System.out.println(" PubSub: " + cloudEvent.getPubsubName());
System.out.println(" Data: " + cloudEvent.getData());
})
.doOnError(throwable -> {
System.out.println("Subscriber got exception: " + throwable.getMessage());
})
.blockLast();
}
}

private static String getTopicName(String[] args) {
if (args.length >= 1) {
return args[0];
}
return DEFAULT_TOPIC_NAME;
}
}
Loading