diff --git a/Cargo.lock b/Cargo.lock index 888f952a26..8696daebea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1809,6 +1809,12 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "crc32fast" version = "1.5.0" @@ -7029,12 +7035,16 @@ dependencies = [ "bytes", "cfg-if", "combine", + "crc16", + "futures-sink", "futures-util", "itoa", + "log", "num-bigint", "percent-encoding", "pin-project-lite", "r2d2", + "rand 0.9.2", "ryu", "sha1_smol", "socket2 0.6.1", diff --git a/Cargo.toml b/Cargo.toml index f49a75fb4b..b04c0ddf0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,7 @@ const_format = "0.2.34" daedalus = { path = "packages/daedalus" } dashmap = "6.1.0" data-url = "0.3.2" -deadpool-redis = "0.22.0" +deadpool-redis = { version ="0.22.0", features = ["cluster-async"] } derive_more = "2.0.1" directories = "6.0.0" dirs = "6.0.0" diff --git a/apps/labrinth/src/database/models/notification_item.rs b/apps/labrinth/src/database/models/notification_item.rs index f3cedf4771..a2fbd06aca 100644 --- a/apps/labrinth/src/database/models/notification_item.rs +++ b/apps/labrinth/src/database/models/notification_item.rs @@ -557,9 +557,10 @@ impl DBNotification { let mut redis = redis.connect().await?; redis - .delete_many(user_ids.into_iter().map(|id| { - (USER_NOTIFICATIONS_NAMESPACE, Some(id.0.to_string())) - })) + .delete_many( + USER_NOTIFICATIONS_NAMESPACE, + user_ids.into_iter().map(|id| Some(id.0.to_string())), + ) .await?; Ok(()) diff --git a/apps/labrinth/src/database/models/organization_item.rs b/apps/labrinth/src/database/models/organization_item.rs index 56637d01f3..88b77bff42 100644 --- a/apps/labrinth/src/database/models/organization_item.rs +++ 
b/apps/labrinth/src/database/models/organization_item.rs @@ -256,15 +256,16 @@ impl DBOrganization { ) -> Result<(), super::DatabaseError> { let mut redis = redis.connect().await?; + if let Some(slug) = slug { + redis + .delete(ORGANIZATIONS_TITLES_NAMESPACE, slug.to_lowercase()) + .await?; + } + redis - .delete_many([ - (ORGANIZATIONS_NAMESPACE, Some(id.0.to_string())), - ( - ORGANIZATIONS_TITLES_NAMESPACE, - slug.map(|x| x.to_lowercase()), - ), - ]) + .delete(ORGANIZATIONS_NAMESPACE, id.0.to_string()) .await?; + Ok(()) } } diff --git a/apps/labrinth/src/database/models/pat_item.rs b/apps/labrinth/src/database/models/pat_item.rs index 735f7efd76..f6f4aef854 100644 --- a/apps/labrinth/src/database/models/pat_item.rs +++ b/apps/labrinth/src/database/models/pat_item.rs @@ -209,18 +209,26 @@ impl DBPersonalAccessToken { } redis - .delete_many(clear_pats.into_iter().flat_map( - |(id, token, user_id)| { - [ - (PATS_NAMESPACE, id.map(|i| i.0.to_string())), - (PATS_TOKENS_NAMESPACE, token), - ( - PATS_USERS_NAMESPACE, - user_id.map(|i| i.0.to_string()), - ), - ] - }, - )) + .delete_many( + PATS_NAMESPACE, + clear_pats + .iter() + .map(|(x, _, _)| x.map(|i| i.0.to_string())), + ) + .await?; + redis + .delete_many( + PATS_TOKENS_NAMESPACE, + clear_pats.iter().map(|(_, token, _)| token.clone()), + ) + .await?; + redis + .delete_many( + PATS_USERS_NAMESPACE, + clear_pats + .iter() + .map(|(_, _, x)| x.map(|i| i.0.to_string())), + ) .await?; Ok(()) diff --git a/apps/labrinth/src/database/models/project_item.rs b/apps/labrinth/src/database/models/project_item.rs index 717ebf7e47..fbcb3fb241 100644 --- a/apps/labrinth/src/database/models/project_item.rs +++ b/apps/labrinth/src/database/models/project_item.rs @@ -953,20 +953,20 @@ impl DBProject { ) -> Result<(), DatabaseError> { let mut redis = redis.connect().await?; - redis - .delete_many([ - (PROJECTS_NAMESPACE, Some(id.0.to_string())), - (PROJECTS_SLUGS_NAMESPACE, slug.map(|x| x.to_lowercase())), - ( - 
PROJECTS_DEPENDENCIES_NAMESPACE, - if clear_dependencies.unwrap_or(false) { - Some(id.0.to_string()) - } else { - None - }, - ), - ]) - .await?; + redis.delete(PROJECTS_NAMESPACE, id.0.to_string()).await?; + + if let Some(slug) = slug { + redis + .delete(PROJECTS_SLUGS_NAMESPACE, slug.to_lowercase()) + .await?; + } + + if clear_dependencies.unwrap_or(false) { + redis + .delete(PROJECTS_DEPENDENCIES_NAMESPACE, id.0.to_string()) + .await?; + } + Ok(()) } } diff --git a/apps/labrinth/src/database/models/session_item.rs b/apps/labrinth/src/database/models/session_item.rs index 2b1bac1cb7..b3f4dee822 100644 --- a/apps/labrinth/src/database/models/session_item.rs +++ b/apps/labrinth/src/database/models/session_item.rs @@ -268,19 +268,28 @@ impl DBSession { } redis - .delete_many(clear_sessions.into_iter().flat_map( - |(id, session, user_id)| { - [ - (SESSIONS_NAMESPACE, id.map(|i| i.0.to_string())), - (SESSIONS_IDS_NAMESPACE, session), - ( - SESSIONS_USERS_NAMESPACE, - user_id.map(|i| i.0.to_string()), - ), - ] - }, - )) + .delete_many( + SESSIONS_NAMESPACE, + clear_sessions + .iter() + .map(|(x, _, _)| x.map(|x| x.0.to_string())), + ) + .await?; + redis + .delete_many( + SESSIONS_IDS_NAMESPACE, + clear_sessions.iter().map(|(_, session, _)| session.clone()), + ) .await?; + redis + .delete_many( + SESSIONS_USERS_NAMESPACE, + clear_sessions + .iter() + .map(|(_, _, x)| x.map(|x| x.0.to_string())), + ) + .await?; + Ok(()) } diff --git a/apps/labrinth/src/database/models/user_item.rs b/apps/labrinth/src/database/models/user_item.rs index b28e005c84..7032c2bbc5 100644 --- a/apps/labrinth/src/database/models/user_item.rs +++ b/apps/labrinth/src/database/models/user_item.rs @@ -470,15 +470,16 @@ impl DBUser { let mut redis = redis.connect().await?; redis - .delete_many(user_ids.iter().flat_map(|(id, username)| { - [ - (USERS_NAMESPACE, Some(id.0.to_string())), - ( - USER_USERNAMES_NAMESPACE, - username.clone().map(|i| i.to_lowercase()), - ), - ] - })) + .delete_many( + 
USERS_NAMESPACE, + user_ids.iter().map(|(id, _)| Some(id.0.to_string())), + ) + .await?; + redis + .delete_many( + USER_USERNAMES_NAMESPACE, + user_ids.iter().map(|(_, username)| username.clone()), + ) .await?; Ok(()) } @@ -491,9 +492,8 @@ impl DBUser { redis .delete_many( - user_ids.iter().map(|id| { - (USERS_PROJECTS_NAMESPACE, Some(id.0.to_string())) - }), + USERS_PROJECTS_NAMESPACE, + user_ids.iter().map(|id| Some(id.0.to_string())), ) .await?; diff --git a/apps/labrinth/src/database/models/version_item.rs b/apps/labrinth/src/database/models/version_item.rs index d0ba9c0b97..065631a878 100644 --- a/apps/labrinth/src/database/models/version_item.rs +++ b/apps/labrinth/src/database/models/version_item.rs @@ -14,7 +14,6 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use std::cmp::Ordering; use std::collections::HashMap; -use std::iter; pub const VERSIONS_NAMESPACE: &str = "versions"; const VERSION_FILES_NAMESPACE: &str = "versions_files"; @@ -914,24 +913,21 @@ impl DBVersion { ) -> Result<(), DatabaseError> { let mut redis = redis.connect().await?; + redis + .delete(VERSIONS_NAMESPACE, version.inner.id.0.to_string()) + .await?; + redis .delete_many( - iter::once(( - VERSIONS_NAMESPACE, - Some(version.inner.id.0.to_string()), - )) - .chain(version.files.iter().flat_map( - |file| { - file.hashes.iter().map(|(algo, hash)| { - ( - VERSION_FILES_NAMESPACE, - Some(format!("{algo}_{hash}")), - ) - }) - }, - )), + VERSION_FILES_NAMESPACE, + version.files.iter().flat_map(|file| { + file.hashes + .iter() + .map(|(algo, hash)| Some(format!("{algo}_{hash}"))) + }), ) .await?; + Ok(()) } } diff --git a/apps/labrinth/src/database/redis/mod.rs b/apps/labrinth/src/database/redis/mod.rs index dd4e11e10a..ecba1e5658 100644 --- a/apps/labrinth/src/database/redis/mod.rs +++ b/apps/labrinth/src/database/redis/mod.rs @@ -2,7 +2,7 @@ use super::models::DatabaseError; use ariadne::ids::base62_impl::{parse_base62, to_base62}; use chrono::{TimeZone, Utc}; use 
dashmap::DashMap; -use deadpool_redis::{Config, Runtime}; +use deadpool_redis::cluster::{Config, Runtime}; use futures::future::Either; use prometheus::{IntGauge, Registry}; use redis::{ExistenceCheck, SetExpiry, SetOptions, ToRedisArgs}; @@ -24,13 +24,13 @@ const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes #[derive(Clone)] pub struct RedisPool { - pub url: String, + pub urls: Vec, pub pool: util::InstrumentedPool, meta_namespace: String, } pub struct RedisConnection { - pub connection: deadpool_redis::Connection, + pub connection: deadpool_redis::cluster::Connection, meta_namespace: String, } @@ -51,8 +51,13 @@ impl RedisPool { }, ); - let url = dotenvy::var("REDIS_URL").expect("Redis URL not set"); - let pool = Config::from_url(url.clone()) + let urls = dotenvy::var("REDIS_URLS") + .expect("Redis URL not set") + .split(',') + .map(String::from) + .collect::>(); + + let pool = Config::from_urls(&*urls) .builder() .expect("Error building Redis pool") .max_size( @@ -67,7 +72,7 @@ impl RedisPool { .expect("Redis connection failed"); let pool = RedisPool { - url, + urls, pool: util::InstrumentedPool::new(pool), meta_namespace: meta_namespace.unwrap_or("".to_string()), }; @@ -280,7 +285,7 @@ impl RedisPool { .iter() .map(|x| { format!( - "{}_{slug_namespace}:{}", + "{}_{slug_namespace}:{{ns:{namespace}}}:{}", self.meta_namespace, if case_sensitive { x.value().to_string() @@ -316,7 +321,12 @@ impl RedisPool { .map(|x| x.to_string()) })) .chain(slug_ids) - .map(|x| format!("{}_{namespace}:{x}", self.meta_namespace)) + .map(|x| { + format!( + "{}_{namespace}:{{ns:{namespace}}}:{x}", + self.meta_namespace + ) + }) .collect::>(); let cached_values = cmd("MGET") @@ -378,10 +388,10 @@ impl RedisPool { ids.iter().map(|x| x.key().clone()).collect::>(); fetch_ids.iter().for_each(|key| { - pipe.atomic().set_options( + pipe.set_options( // We store locks in lowercase because they are case insensitive format!( - "{}_{namespace}:{}/lock", + 
"{}_{namespace}:{{ns:{namespace}}}:{}/lock", self.meta_namespace, key.to_lowercase() ), @@ -445,9 +455,9 @@ impl RedisPool { alias: slug.clone(), }; - pipe.atomic().set_ex( + pipe.set_ex( format!( - "{}_{namespace}:{key}", + "{}_{namespace}:{{ns:{namespace}}}:{key}", self.meta_namespace ), serde_json::to_string(&value)?, @@ -464,17 +474,17 @@ impl RedisPool { slug.to_string().to_lowercase() }; - pipe.atomic().set_ex( + pipe.set_ex( format!( - "{}_{slug_namespace}:{}", + "{}_{slug_namespace}:{{ns:{namespace}}}:{}", self.meta_namespace, actual_slug ), key.to_string(), DEFAULT_EXPIRY as u64, ); - pipe.atomic().del(format!( - "{}_{namespace}:{}/lock", + pipe.del(format!( + "{}_{namespace}:{{ns:{namespace}}}:{}/lock", // Locks are stored in lowercase self.meta_namespace, actual_slug.to_lowercase() @@ -489,16 +499,16 @@ impl RedisPool { let base62 = to_base62(value); ids.remove(&base62); - pipe.atomic().del(format!( - "{}_{namespace}:{}/lock", + pipe.del(format!( + "{}_{namespace}:{{ns:{namespace}}}:{}/lock", self.meta_namespace, // Locks are stored in lowercase base62.to_lowercase() )); } - pipe.atomic().del(format!( - "{}_{namespace}:{key}/lock", + pipe.del(format!( + "{}_{namespace}:{{ns:{namespace}}}:{key}/lock", self.meta_namespace )); @@ -507,13 +517,13 @@ impl RedisPool { } for (key, _) in ids { - pipe.atomic().del(format!( - "{}_{namespace}:{}/lock", + pipe.del(format!( + "{}_{namespace}:{{ns:{namespace}}}:{}/lock", self.meta_namespace, key.to_lowercase() )); - pipe.atomic().del(format!( - "{}_{namespace}:{key}/lock", + pipe.del(format!( + "{}_{namespace}:{{ns:{namespace}}}:{key}/lock", self.meta_namespace )); } @@ -539,7 +549,7 @@ impl RedisPool { .iter() .map(|x| { format!( - "{}_{namespace}:{}/lock", + "{}_{namespace}:{{ns:{namespace}}}:{}/lock", self.meta_namespace, // We lowercase key because locks are stored in lowercase x.key().to_lowercase() @@ -613,7 +623,10 @@ impl RedisConnection { redis_args( &mut cmd, vec![ - format!("{}_{}:{}", self.meta_namespace, 
namespace, id), + format!( + "{}_{}:{{ns:{namespace}}}:{}", + self.meta_namespace, namespace, id + ), data.to_string(), "EX".to_string(), expiry.unwrap_or(DEFAULT_EXPIRY).to_string(), @@ -654,8 +667,11 @@ impl RedisConnection { let mut cmd = cmd("GET"); redis_args( &mut cmd, - vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)] - .as_slice(), + vec![format!( + "{}_{}:{{ns:{namespace}}}:{}", + self.meta_namespace, namespace, id + )] + .as_slice(), ); let res = redis_execute(&mut cmd, &mut self.connection).await?; Ok(res) @@ -671,7 +687,12 @@ impl RedisConnection { redis_args( &mut cmd, ids.iter() - .map(|x| format!("{}_{}:{}", self.meta_namespace, namespace, x)) + .map(|x| { + format!( + "{}_{}:{{ns:{namespace}}}:{}", + self.meta_namespace, namespace, x + ) + }) .collect::>() .as_slice(), ); @@ -723,8 +744,11 @@ impl RedisConnection { let mut cmd = cmd("DEL"); redis_args( &mut cmd, - vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)] - .as_slice(), + vec![format!( + "{}_{}:{{ns:{namespace}}}:{}", + self.meta_namespace, namespace, id + )] + .as_slice(), ); redis_execute::<()>(&mut cmd, &mut self.connection).await?; Ok(()) @@ -733,16 +757,20 @@ impl RedisConnection { #[tracing::instrument(skip(self, iter))] pub async fn delete_many( &mut self, - iter: impl IntoIterator)>, + namespace: &str, + iter: impl IntoIterator>, ) -> Result<(), DatabaseError> { let mut cmd = cmd("DEL"); let mut any = false; - for (namespace, id) in iter { + for id in iter { if let Some(id) = id { redis_args( &mut cmd, - [format!("{}_{}:{}", self.meta_namespace, namespace, id)] - .as_slice(), + [format!( + "{}_{}:{{ns:{namespace}}}:{}", + self.meta_namespace, namespace, id + )] + .as_slice(), ); any = true; } @@ -762,7 +790,10 @@ impl RedisConnection { key: &str, value: impl ToRedisArgs + Send + Sync + Debug, ) -> Result<(), DatabaseError> { - let key = format!("{}_{namespace}:{key}", self.meta_namespace); + let key = format!( + "{}_{namespace}:{{ns:{namespace}}}:{key}", + 
self.meta_namespace + ); cmd("LPUSH") .arg(key) .arg(value) @@ -778,7 +809,10 @@ impl RedisConnection { key: &str, timeout: Option, ) -> Result, DatabaseError> { - let key = format!("{}_{namespace}:{key}", self.meta_namespace); + let key = format!( + "{}_{namespace}:{{ns:{namespace}}}:{key}", + self.meta_namespace + ); // a timeout of 0 is infinite let timeout = timeout.unwrap_or(0.0); let values = cmd("BRPOP") @@ -807,7 +841,7 @@ pub fn redis_args(cmd: &mut util::InstrumentedCmd, args: &[String]) { pub async fn redis_execute( cmd: &mut util::InstrumentedCmd, - redis: &mut deadpool_redis::Connection, + redis: &mut deadpool_redis::cluster::Connection, ) -> Result where T: redis::FromRedisValue, diff --git a/apps/labrinth/src/database/redis/util.rs b/apps/labrinth/src/database/redis/util.rs index 1980bc7b77..7d4fe032b8 100644 --- a/apps/labrinth/src/database/redis/util.rs +++ b/apps/labrinth/src/database/redis/util.rs @@ -5,17 +5,19 @@ use derive_more::{Deref, DerefMut}; use redis::{FromRedisValue, RedisResult, ToRedisArgs}; use tracing::{Instrument, info_span}; -#[derive(Debug, Clone, Deref, DerefMut)] +#[derive(/*Debug, */ Clone, Deref, DerefMut)] pub struct InstrumentedPool { - inner: deadpool_redis::Pool, + inner: deadpool_redis::cluster::Pool, } impl InstrumentedPool { - pub fn new(inner: deadpool_redis::Pool) -> Self { + pub fn new(inner: deadpool_redis::cluster::Pool) -> Self { Self { inner } } - pub async fn get(&self) -> Result { + pub async fn get( + &self, + ) -> Result { self.inner .get() .instrument(info_span!("get redis connection")) diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 4376f2c20e..0f1252154b 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -260,7 +260,8 @@ pub fn app_setup( { let pool = pool.clone(); - let redis_client = redis::Client::open(redis_pool.url.clone()).unwrap(); + let redis_client = + redis::Client::open(redis_pool.urls[0].clone()).unwrap(); let sockets = active_sockets.clone(); 
        actix_rt::spawn(async move {
            let pubsub = redis_client.get_async_pubsub().await.unwrap();
@@ -386,7 +387,7 @@ pub fn check_env_vars() -> bool {
     failed |= check_var::<String>("MEILISEARCH_READ_ADDR");
     failed |= check_var::<String>("MEILISEARCH_WRITE_ADDRS");
     failed |= check_var::<String>("MEILISEARCH_KEY");
-    failed |= check_var::<String>("REDIS_URL");
+    failed |= check_var::<String>("REDIS_URLS");
     failed |= check_var::<String>("BIND_ADDR");
     failed |= check_var::<String>("SELF_ADDR");
diff --git a/apps/labrinth/src/queue/analytics.rs b/apps/labrinth/src/queue/analytics.rs
index bb0373cb44..2c4bdaeeea 100644
--- a/apps/labrinth/src/queue/analytics.rs
+++ b/apps/labrinth/src/queue/analytics.rs
@@ -120,7 +120,12 @@ impl AnalyticsQueue {
             .arg(
                 views_keys
                     .iter()
-                    .map(|x| format!("{}:{}-{}", VIEWS_NAMESPACE, x.0, x.1))
+                    .map(|x| {
+                        format!(
+                            "{}:{{ns:{VIEWS_NAMESPACE}}}:{}-{}",
+                            VIEWS_NAMESPACE, x.0, x.1
+                        )
+                    })
                     .collect::<Vec<_>>(),
             )
             .query_async::<Vec<Option<u32>>>(&mut redis)
@@ -152,7 +157,10 @@ impl AnalyticsQueue {
             };
 
             pipe.atomic().set_ex(
-                format!("{}:{}-{}", VIEWS_NAMESPACE, key.0, key.1),
+                format!(
+                    "{}:{{ns:{VIEWS_NAMESPACE}}}:{}-{}",
+                    VIEWS_NAMESPACE, key.0, key.1
+                ),
                 new_count,
                 6 * 60 * 60,
             );
@@ -195,7 +203,10 @@ impl AnalyticsQueue {
                 downloads_keys
                     .iter()
                     .map(|x| {
-                        format!("{}:{}-{}", DOWNLOADS_NAMESPACE, x.0, x.1)
+                        format!(
+                            "{}:{{ns:{DOWNLOADS_NAMESPACE}}}:{}-{}",
+                            DOWNLOADS_NAMESPACE, x.0, x.1
+                        )
                    })
                    .collect::<Vec<_>>(),
            )
@@ -219,7 +230,10 @@ impl AnalyticsQueue {
             };
 
             pipe.atomic().set_ex(
-                format!("{}:{}-{}", DOWNLOADS_NAMESPACE, key.0, key.1),
+                format!(
+                    "{}:{{ns:{DOWNLOADS_NAMESPACE}}}:{}-{}",
+                    DOWNLOADS_NAMESPACE, key.0, key.1
+                ),
                 new_count,
                 6 * 60 * 60,
             );