diff --git a/Cargo.lock b/Cargo.lock index d7fb578e722..1011044fad5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -1385,6 +1400,20 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-compression" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddb939d66e4ae03cee6091612804ba446b12878410cfa17f785f4dd67d4014e8" +dependencies = [ + "brotli", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-graphql" version = "7.2.1" @@ -1857,6 +1886,27 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "brotli" +version = "8.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bs58" version = "0.4.0" @@ -6566,6 +6616,7 @@ version = "0.12.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ + "async-compression", "base64 0.22.1", "bytes", "encoding_rs", diff --git a/chain/ethereum/src/lib.rs b/chain/ethereum/src/lib.rs index 8850764d63b..509d239bf01 100644 --- a/chain/ethereum/src/lib.rs +++ b/chain/ethereum/src/lib.rs @@ -14,7 +14,7 @@ mod transport; pub use self::capabilities::NodeCapabilities; pub use self::ethereum_adapter::EthereumAdapter; pub use self::runtime::RuntimeAdapter; -pub use self::transport::Transport; +pub use self::transport::{Compression, Transport}; pub use env::ENV_VARS; pub use buffered_call_cache::BufferedCallCache; diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index 882720e55f1..98240ddea07 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -314,6 +314,7 @@ mod tests { use graph::components::network_provider::ProviderManager; use graph::components::network_provider::ProviderName; use graph::data::value::Word; + use graph::http::HeaderMap; use graph::{ endpoint::EndpointMetrics, @@ -324,7 +325,9 @@ mod tests { }; use std::sync::Arc; - use crate::{EthereumAdapter, EthereumAdapterTrait, ProviderEthRpcMetrics, Transport}; + use crate::{ + Compression, EthereumAdapter, EthereumAdapterTrait, ProviderEthRpcMetrics, Transport, + }; use super::{EthereumNetworkAdapter, EthereumNetworkAdapters, NodeCapabilities}; @@ -395,6 +398,7 @@ mod tests { metrics.clone(), "", false, + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -499,6 +503,7 @@ mod tests { metrics.clone(), "", false, + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -571,6 +576,7 @@ mod tests { metrics.clone(), "", false, + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -636,6 +642,7 @@ mod tests { metrics.clone(), "", false, + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -924,6 +931,7 @@ mod tests { endpoint_metrics.clone(), "", false, + Compression::None, ); Arc::new( diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index 4923e9c7b82..3d96473e14c 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -10,6 +10,27 @@ use std::sync::Arc; use std::task::{Context, Poll}; use tower::Service; +/// Compression method for RPC requests. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum Compression { + #[default] + None, + Gzip, + Brotli, + Deflate, +} + +impl std::fmt::Display for Compression { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Compression::None => write!(f, "none"), + Compression::Gzip => write!(f, "gzip"), + Compression::Brotli => write!(f, "brotli"), + Compression::Deflate => write!(f, "deflate"), + } + } +} + /// Abstraction over different transport types for Alloy providers. #[derive(Clone, Debug)] pub enum Transport { @@ -49,11 +70,24 @@ impl Transport { metrics: Arc, provider: impl AsRef, no_eip2718: bool, + compression: Compression, ) -> Self { - let client = reqwest::Client::builder() - .default_headers(headers) - .build() - .expect("Failed to build HTTP client"); + let mut client_builder = reqwest::Client::builder().default_headers(headers); + + match compression { + Compression::None => {} + Compression::Gzip => { + client_builder = client_builder.gzip(true); + } + Compression::Brotli => { + client_builder = client_builder.brotli(true); + } + Compression::Deflate => { + client_builder = client_builder.deflate(true); + } + } + + let client = client_builder.build().expect("Failed to build HTTP client"); let patching_transport = PatchingHttp::new(client, rpc, no_eip2718); let metrics_transport = diff --git a/graph/Cargo.toml b/graph/Cargo.toml index f5346202dbc..4934839cb57 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -26,7 +26,7 @@ diesel_derives = { workspace = true } chrono = "0.4.43" envconfig = { workspace = true } Inflector = "0.11.3" -reqwest = { version = "0.12.23", features = ["json", "stream", "multipart"] } +reqwest = { version = "0.12.23", features = ["json", "stream", "multipart", "gzip", "brotli", "deflate"] } ethabi = "17.2" hex = "0.4.3" http0 = { version = "0", package = "http" } diff --git a/graph/src/components/link_resolver/arweave.rs b/graph/src/components/link_resolver/arweave.rs index abd3d80a503..a9b66516dee 100644 --- a/graph/src/components/link_resolver/arweave.rs +++ b/graph/src/components/link_resolver/arweave.rs @@ -42,7 +42,7 @@ impl Default for ArweaveClient { Self { base_url: "https://arweave.net".parse().unwrap(), - client: Client::default(), + client: Client::builder().gzip(false).build().unwrap(), logger: Logger::root(slog::Discard, o!()), } } @@ -53,7 +53,7 @@ impl ArweaveClient { Self { base_url, logger, - client: Client::default(), + client: Client::builder().gzip(false).build().unwrap(), } } } diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 057e774d93e..69cc0bbe02f 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -48,6 +48,7 @@ shard = "primary" provider = [ { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, { label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }}, + { label = "mainnet-2", details = { type = "web3", url = "http://rpc.mainnet.io", features = ["archive", "compression/gzip"] }}, { label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }}, ] diff --git a/node/src/chain.rs b/node/src/chain.rs index e417ad48e6f..7d851918c70 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -206,11 +206,13 @@ pub async fn create_ethereum_networks_for_chain( } let logger = logger.new(o!("provider" => provider.label.clone())); + let compression = web3.compression(); info!( logger, "Creating transport"; "url" => &web3.url, - "capabilities" => capabilities + "capabilities" => capabilities, + "compression" => compression.to_string() ); use crate::config::Transport::*; @@ -223,6 +225,7 @@ pub async fn create_ethereum_networks_for_chain( endpoint_metrics.cheap_clone(), &provider.label, no_eip2718, + compression, ), Ipc => Transport::new_ipc(&web3.url).await, Ws => Transport::new_ws(&web3.url).await, diff --git a/node/src/config.rs b/node/src/config.rs index c06b5298ac0..9751844535f 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -17,7 +17,7 @@ use graph::{ }, }; use graph_chain_ethereum as ethereum; -use graph_chain_ethereum::NodeCapabilities; +use graph_chain_ethereum::{Compression, NodeCapabilities}; use graph_store_postgres::{DeploymentPlacer, Shard as ShardName, PRIMARY_SHARD}; use graph::http::{HeaderMap, Uri}; @@ -704,6 +704,18 @@ impl Web3Provider { } } + pub fn compression(&self) -> Compression { + if self.features.contains("compression/gzip") { + Compression::Gzip + } else if self.features.contains("compression/brotli") { + Compression::Brotli + } else if self.features.contains("compression/deflate") { + Compression::Deflate + } else { + Compression::None + } + } + pub fn limit_for(&self, node: &NodeId) -> SubgraphLimit { self.rules.limit_for(node) } @@ -715,7 +727,15 @@ impl Web3Provider { /// - `no_eip1898`: Provider doesn't support EIP-1898 (block parameter by hash/number object) /// - `no_eip2718`: Provider doesn't return the `type` field in transaction receipts. /// When set, receipts are patched to add `"type": "0x0"` for legacy transaction compatibility. -const PROVIDER_FEATURES: [&str; 4] = ["traces", "archive", "no_eip1898", "no_eip2718"]; +const PROVIDER_FEATURES: [&str; 7] = [ + "traces", + "archive", + "no_eip1898", + "no_eip2718", + "compression/gzip", + "compression/brotli", + "compression/deflate", +]; const DEFAULT_PROVIDER_FEATURES: [&str; 2] = ["traces", "archive"]; impl Provider { @@ -776,6 +796,18 @@ impl Provider { } } + let compression_count = web3 + .features + .iter() + .filter(|f| f.starts_with("compression/")) + .count(); + if compression_count > 1 { + return Err(anyhow!( + "at most one compression method allowed for provider {}", + self.label + )); + } + web3.url = shellexpand::env(&web3.url)?.into_owned(); let label = &self.label; @@ -1207,6 +1239,7 @@ mod tests { use graph::http::{HeaderMap, HeaderValue}; use graph::prelude::regex::Regex; use graph::prelude::{toml, NodeId}; + use graph_chain_ethereum::Compression; use std::collections::BTreeSet; use std::fs::read_to_string; use std::path::{Path, PathBuf}; @@ -1625,6 +1658,117 @@ mod tests { assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled); } + #[test] + fn it_parses_web3_provider_with_gzip_compression_feature() { + let actual: Provider = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive", "compression/gzip"] } + "#, + ) + .unwrap(); + + let mut features = BTreeSet::new(); + features.insert("archive".to_string()); + features.insert("compression/gzip".to_string()); + + assert_eq!( + Provider { + label: "compressed".to_owned(), + details: ProviderDetails::Web3(Web3Provider { + transport: Transport::Rpc, + url: "http://localhost:8545".to_owned(), + features, + headers: HeaderMap::new(), + rules: Vec::new(), + }), + }, + actual + ); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::Gzip); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_parses_web3_provider_with_brotli_compression_feature() { + let actual: Provider = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive", "compression/brotli"] } + "#, + ) + .unwrap(); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::Brotli); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_parses_web3_provider_with_deflate_compression_feature() { + let actual: Provider = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive", "compression/deflate"] } + "#, + ) + .unwrap(); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::Deflate); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_parses_web3_provider_without_compression_feature() { + let actual: Provider = toml::from_str( + r#" + label = "uncompressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive"] } + "#, + ) + .unwrap(); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::None); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_rejects_multiple_compression_features() { + let mut actual: Provider = toml::from_str( + r#" + label = "multi-comp" + details = { type = "web3", url = "http://localhost:8545", features = ["compression/gzip", "compression/brotli"] } + "#, + ) + .unwrap(); + + let err = actual.validate(); + assert!(err.is_err()); + let err = err.unwrap_err(); + assert!( + err.to_string() + .contains("at most one compression method allowed"), + "result: {:?}", + err + ); + } + #[test] fn duplicated_labels_are_not_allowed_within_chain() { let mut actual = toml::from_str::(