⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion rust/crates/sift_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,24 @@ flate2 = "1.1.2"
indicatif = "0.18.0"
parquet = "56.2.0"
pbjson-types = { workspace = true }
prost = "0.13.5"
reqwest = "0.12.23"
sift_pbfs = { workspace = true }
sift_rs = { workspace = true }
tokio = { version = "1.47.1", features = ["full", "net", "time"] }
sift_stream = { workspace = true }
tokio = { version = "1.47.1", features = ["full", "net", "time", "macros", "rt-multi-thread", "signal"] }
tokio-stream = "0.1.17"
tonic = { workspace = true }
tonic-reflection = "0.12"
toml = "0.8.23"
zip = "6.0.0"

[dev-dependencies]
indoc = "2.0.6"

[dependencies.uuid]
version = "1.19.0"
features = ["v4"]

[build-dependencies]
tonic-build = "0.12"
29 changes: 29 additions & 0 deletions rust/crates/sift_cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ pub enum Cmd {
/// Import time series files into Sift
#[command(subcommand)]
Import(ImportCmd),

/// Start a test gRPC server for streaming.
#[command(subcommand)]
TestServer(TestServerCmd),
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -164,6 +168,31 @@ pub enum ConfigCmd {
Update(ConfigUpdateArgs),
}

#[derive(Subcommand)]
pub enum TestServerCmd {
/// Start a test ingestion server.
Run(TestServerArgs),
}

#[derive(clap::Args)]
pub struct TestServerArgs {
/// The address to serve gRPC server.
#[arg(short, long, default_value_t = String::from("0.0.0.0:50051"))]
pub local_address: String,

/// Whether to stream metrics to Sift.
#[arg(short, long)]
pub stream_metrics: bool,

/// The asset name to use when streaming server ingestion metrics.
#[arg(short, long)]
pub metrics_asset_name: Option<String>,

/// Include to use plain output. Use this option in scripts or when saving logs.
#[arg(short, long)]
pub plain_output: bool,
}

#[derive(clap::Args)]
pub struct ConfigUpdateArgs {
/// Edit or create a profile interactively (ignores other flags)
Expand Down
4 changes: 2 additions & 2 deletions rust/crates/sift_cli/src/cmd/import/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,14 @@ fn create_data_import_request<R: io::Read>(
let mut enum_configs = Vec::new();
let mut bit_field_configs = Vec::new();

if data_type == ChannelDataType::Enum.into() {
if data_type == ChannelDataType::Enum as i32 {
let Some(configs) = enum_configs_iter.next() else {
return Err(anyhow!(
"'{name}' was declared as type enum but --enum-config was not specified"
));
};
enum_configs = configs;
} else if data_type == ChannelDataType::BitField.into() {
} else if data_type == ChannelDataType::BitField as i32 {
let Some(configs) = bit_field_configs_iter.next() else {
return Err(anyhow!(
"'{name}' was declared as type bit-field but --bit-field-config was not specified"
Expand Down
1 change: 1 addition & 0 deletions rust/crates/sift_cli/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod completions;
pub mod config;
pub mod export;
pub mod import;
pub mod test_server;

pub struct Context {
pub grpc_uri: String,
Expand Down
127 changes: 127 additions & 0 deletions rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use super::Context;
use anyhow::{Ok, anyhow};
use crossterm::style::Stylize;
use sift_stream::{
ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowConfig,
IngestionConfigEncoder, IngestionConfigForm, RecoveryStrategy, RetryPolicy, SiftStream,
SiftStreamBuilder, TimeValue,
};

/// Streams metrics to Sift.
pub struct MetricsStreamingClient {
ctx: Context,
asset_name: String,
sift_stream: Option<SiftStream<IngestionConfigEncoder>>,
}

impl MetricsStreamingClient {
pub fn build(
ctx: Context,
stream_metrics: &bool,
asset_name: &Option<String>,
) -> Result<Option<MetricsStreamingClient>, anyhow::Error> {
if !stream_metrics {
return Ok(None);
}

let Some(asset_name) = asset_name else {
return Err(anyhow!(
"must specify {} with streaming enabled",
"--metrics_asset_name".cyan()
));
};

Ok(Some(MetricsStreamingClient {
ctx,
asset_name: asset_name.clone(),
sift_stream: None,
}))
}

/// Initialize SiftStream and create ingestion config.
pub async fn initialize(&mut self) -> Result<(), anyhow::Error> {
let credentials = Credentials::Config {
apikey: self.ctx.api_key.clone(),
uri: self.ctx.grpc_uri.clone(),
};

let ingestion_config = IngestionConfigForm {
asset_name: self.asset_name.to_string(),
client_key: "stress-test-ingestion-config-test".into(),
flows: vec![FlowConfig {
name: "metrics".into(),
channels: vec![
ChannelConfig {
name: "total_num_streams".into(),
description: "Total number of streams created".into(),
data_type: ChannelDataType::Uint32.into(),
..Default::default()
},
ChannelConfig {
name: "total_num_bytes_read".into(),
description: "Total number of bytes read".into(),
unit: "B".into(),
data_type: ChannelDataType::Uint64.into(),
..Default::default()
},
ChannelConfig {
name: "total_num_messages".into(),
description: "Total number of messages received".into(),
unit: "message".into(),
data_type: ChannelDataType::Uint64.into(),
..Default::default()
},
ChannelConfig {
name: "bytes_per_s".into(),
description: "Number of bytes received per second".into(),
data_type: ChannelDataType::Double.into(),
unit: "B/s".into(),
..Default::default()
},
ChannelConfig {
name: "messages_per_s".into(),
description: "Number of messages received per second".into(),
unit: "message/s".into(),
data_type: ChannelDataType::Double.into(),
..Default::default()
},
],
}],
};

let sift_stream = SiftStreamBuilder::new(credentials)
.ingestion_config(ingestion_config)
.recovery_strategy(RecoveryStrategy::RetryOnly(RetryPolicy::default()))
.build()
.await?;

self.sift_stream = Some(sift_stream);

Ok(())
}

/// Send metrics to Sift.
pub async fn ingest(&mut self, metrics: Metrics) {
let flow = Flow::new(
"metrics",
TimeValue::now(),
&[
ChannelValue::new("total_num_streams", metrics.total_num_streams),
ChannelValue::new("total_num_bytes_read", metrics.total_num_bytes_read),
ChannelValue::new("total_num_messages", metrics.total_num_messages),
ChannelValue::new("bytes_per_s", metrics.bytes_per_s),
ChannelValue::new("messages_per_s", metrics.messages_per_s),
],
);

self.sift_stream.as_mut().unwrap().send(flow).await.unwrap();
}
}

pub struct Metrics {
pub total_num_streams: u32,
pub total_num_bytes_read: u64,
pub total_num_messages: u64,
pub bytes_per_s: f64,
pub messages_per_s: f64,
}
115 changes: 115 additions & 0 deletions rust/crates/sift_cli/src/cmd/test_server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use super::Context;
use crate::cmd::test_server::metrics_streaming_client::MetricsStreamingClient;
use crate::{cli::TestServerArgs, util::tty::Output};
use anyhow::{Context as AnyhowContext, Result};
use server::TestServer;
use sift_rs::assets::v1::asset_service_server::AssetServiceServer;
use sift_rs::ingest::v1::ingest_service_server::IngestServiceServer;
use sift_rs::ingestion_configs::v2::ingestion_config_service_server::IngestionConfigServiceServer;
use sift_rs::ping::v1::ping_service_server::PingServiceServer;
use std::process::ExitCode;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tonic::transport::Server;
use tonic_reflection::server::Builder;

pub mod metrics_streaming_client;
pub mod server;
use crate::cmd::test_server::metrics_streaming_client::Metrics;

pub async fn run(ctx: Context, args: TestServerArgs) -> Result<ExitCode> {
let addr = args.local_address.parse().context(format!(
"failed to parse local_address: {}",
args.local_address
))?;

// Initialize streaming client.
let mut streaming_client =
MetricsStreamingClient::build(ctx, &args.stream_metrics, &args.metrics_asset_name)
.context("failed to create metrics streaming client")?;

if let Some(client) = streaming_client.as_mut() {
client
.initialize()
.await
.context("failed to initialize streaming client")?;
}

// Channel to signal program exit.
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
let mut shutdown_rx2 = shutdown_rx.clone();

// Channel to send metrics.
let (metrics_tx, mut metrics_rx) = mpsc::channel::<Metrics>(1024);

// Initialize gRPC server.
let server = Arc::new(TestServer::default());

// Start task to calculate ingestion metrics.
let server_arc = Arc::clone(&server);
let calc_stats_task = tokio::spawn(async move {
server_arc
.calculate_metrics(
&mut shutdown_rx,
metrics_tx,
args.stream_metrics,
args.plain_output,
)
.await
.context("calculate metrics task failed")
.unwrap();
});

// Start task to ingest metrics to Sift.
let ingest_metrics_task = tokio::spawn(async move {
if let Some(client) = streaming_client.as_mut() {
loop {
tokio::select! {
_ = shutdown_rx2.changed() => {
Output::new().line("Ingest task shutting down").print();
break;
}
Some(metrics) = metrics_rx.recv() => {
client.ingest(metrics).await;
}
};
}
}
});

let reflection_service = Builder::configure()
.register_encoded_file_descriptor_set(sift_rs::assets::v1::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(sift_rs::ingest::v1::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(sift_rs::ingestion_configs::v2::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(sift_rs::ping::v1::FILE_DESCRIPTOR_SET)
.build_v1alpha()
.context("failed to create gRPC reflection service")?;

Output::new()
.line(format!("Server listening on {addr}"))
.print();

Server::builder()
.add_service(reflection_service)
.add_service(PingServiceServer::from_arc(server.clone()))
.add_service(IngestServiceServer::from_arc(server.clone()))
.add_service(IngestionConfigServiceServer::from_arc(server.clone()))
.add_service(AssetServiceServer::from_arc(server.clone()))
.serve_with_shutdown(addr, async move {
tokio::signal::ctrl_c().await.unwrap();
let _ = shutdown_tx.send(true);
})
.await?;

calc_stats_task
.await
.context("failed to await calculation task")?;
ingest_metrics_task
.await
.context("failed to await ingestion task")?;

Output::new().line("Exiting.").print();

Ok(ExitCode::SUCCESS)
}
Loading
Loading