diff --git a/Cargo.lock b/Cargo.lock index 2264090..635a7cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + [[package]] name = "bumpalo" version = "3.19.0" @@ -50,11 +56,12 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "datapath" -version = "0.0.6" +version = "0.0.8" dependencies = [ "datapath-macro", "itertools", "regex", + "smartstring", "tokio", "tracing", "trie-rs", @@ -63,7 +70,7 @@ dependencies = [ [[package]] name = "datapath-macro" -version = "0.0.6" +version = "0.0.8" dependencies = [ "proc-macro2", "quote", @@ -228,6 +235,23 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "smartstring" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" +dependencies = [ + "autocfg", + "static_assertions", + "version_check", +] + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "syn" version = "2.0.111" @@ -305,6 +329,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "wasip2" version = "1.0.1+wasi-0.2.4" diff --git a/Cargo.toml b/Cargo.toml index 779eb8b..0b17142 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ readme = "README.md" authors = ["rm-dr"] # Don't forget to bump datapath-macro below! -version = "0.0.6" +version = "0.0.8" [workspace.lints.rust] unused_import_braces = "deny" @@ -70,7 +70,7 @@ cargo_common_metadata = "deny" # [workspace.dependencies] -datapath-macro = { path = "crates/datapath-macro", version = "0.0.6" } +datapath-macro = { path = "crates/datapath-macro", version = "0.0.8" } datapath = { path = "crates/datapath" } chrono = "0.4.42" @@ -83,3 +83,4 @@ tracing = "0.1" trie-rs = "0.4.2" uuid = "1.19.0" tokio = { version = "1.48.0", features = ["sync"] } +smartstring = "1.0.1" diff --git a/crates/datapath/Cargo.toml b/crates/datapath/Cargo.toml index 4e626a0..7ecc0a2 100644 --- a/crates/datapath/Cargo.toml +++ b/crates/datapath/Cargo.toml @@ -16,17 +16,18 @@ workspace = true [dependencies] datapath-macro = { workspace = true } +regex = { workspace = true } +tracing = { workspace = true } -regex = { workspace = true, optional = true } -tracing = { workspace = true, optional = true } trie-rs = { workspace = true, optional = true } -itertools = { workspace = true, optional = true } tokio = { workspace = true, optional = true } +smartstring = { workspace = true, optional = true } +itertools = { workspace = true, optional = true } [dev-dependencies] uuid = { version = "1", features = ["v4"] } [features] default = [] -index = ["dep:regex", "dep:trie-rs", "dep:tracing", "dep:itertools"] +index = ["dep:trie-rs", "dep:smartstring", "dep:itertools"] tokio = ["dep:tokio"] diff --git a/crates/datapath/src/index/mod.rs b/crates/datapath/src/index/mod.rs index e3faf5c..e49c30a 100644 --- a/crates/datapath/src/index/mod.rs +++ b/crates/datapath/src/index/mod.rs @@ -1,56 +1,13 @@ use itertools::Itertools; use std::{ collections::{HashMap, HashSet}, - fmt::Display, str::FromStr, sync::Arc, }; use tracing::trace; use trie_rs::map::{Trie, TrieBuilder}; -mod rule; -pub use rule::Rule; - -/// A path segment in an [`AnyDatapath`] -#[derive(Debug, Clone, Hash, PartialEq, Eq)] -enum PathSegment { - /// A constant value, like `web` - Constant(String), - - /// A key=value partition, like `domain=gouletpens.com` - Value { key: String, value: String }, -} - -impl Display for PathSegment { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - PathSegment::Constant(x) => write!(f, "{x}"), - PathSegment::Value { key, value } => write!(f, "{key}={value}"), - } - } -} - -impl FromStr for PathSegment { - type Err = (); - fn from_str(s: &str) -> Result { - if s.contains("\n") { - return Err(()); - } - - if s.is_empty() { - return Err(()); - } - - return Ok(if s.contains("=") { - let mut s = s.split("="); - let key = s.next().ok_or(())?.to_owned(); - let value = s.join("="); - Self::Value { key, value } - } else { - Self::Constant(s.to_owned()) - }); - } -} +use crate::{Rule, segment::PathSegment}; // // MARK: index @@ -234,7 +191,7 @@ impl DatapathIndex { query: impl Into, ) -> Option> + '_> { let query: String = query.into(); - let regex = rule::Rule::new(query.clone())?; + let regex = Rule::new(query.clone())?; let key = Self::query_to_key(&query); trace!("DatapathIndex key is {key}"); @@ -247,10 +204,7 @@ impl DatapathIndex { } /// Like [Self::query], but with a precompiled rule - pub fn query_rule<'a>( - &'a self, - rule: &'a rule::Rule, - ) -> impl Iterator> + 'a { + pub fn query_rule<'a>(&'a self, rule: &'a Rule) -> impl Iterator> + 'a { let key = Self::query_to_key(rule.pattern()); trace!("DatapathIndex key is {key}"); @@ -263,7 +217,7 @@ impl DatapathIndex { /// Like [Self::query], but returns `true` if any paths match pub fn query_match(&self, query: impl Into) -> Option { let query: String = query.into(); - let regex = rule::Rule::new(query.clone())?; + let regex = Rule::new(query.clone())?; let key = Self::query_to_key(&query); trace!("DatapathIndex key is {key}"); @@ -279,7 +233,7 @@ impl DatapathIndex { } /// Like [Self::query_match], but with a precompiled rule - pub fn query_rule_match<'a>(&'a self, rule: &'a rule::Rule) -> bool { + pub fn query_rule_match<'a>(&'a self, rule: &'a Rule) -> bool { let key = Self::query_to_key(&rule.pattern()); trace!("DatapathIndex key is {key}"); diff --git a/crates/datapath/src/lib.rs b/crates/datapath/src/lib.rs index ade3bed..f39a0d6 100644 --- a/crates/datapath/src/lib.rs +++ b/crates/datapath/src/lib.rs @@ -26,10 +26,22 @@ pub use wildcardable::*; mod arcsubstr; pub use arcsubstr::*; +mod rule; +pub use rule::*; + +#[cfg(feature = "index")] +pub(crate) mod segment; + #[cfg(feature = "index")] mod index; #[cfg(feature = "index")] pub use index::*; +#[cfg(feature = "index")] +mod vec_index; + +#[cfg(feature = "index")] +pub use vec_index::*; + pub use datapath_macro::datapath; diff --git a/crates/datapath/src/index/rule.rs b/crates/datapath/src/rule.rs similarity index 100% rename from crates/datapath/src/index/rule.rs rename to crates/datapath/src/rule.rs diff --git a/crates/datapath/src/segment.rs b/crates/datapath/src/segment.rs new file mode 100644 index 0000000..d08a90f --- /dev/null +++ b/crates/datapath/src/segment.rs @@ -0,0 +1,43 @@ +use itertools::Itertools; +use std::{fmt::Display, str::FromStr}; + +/// A path segment in an [`AnyDatapath`] +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub enum PathSegment { + /// A constant value, like `web` + Constant(String), + + /// A key=value partition, like `domain=gouletpens.com` + Value { key: String, value: String }, +} + +impl Display for PathSegment { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PathSegment::Constant(x) => write!(f, "{x}"), + PathSegment::Value { key, value } => write!(f, "{key}={value}"), + } + } +} + +impl FromStr for PathSegment { + type Err = (); + fn from_str(s: &str) -> Result { + if s.contains("\n") { + return Err(()); + } + + if s.is_empty() { + return Err(()); + } + + return Ok(if s.contains("=") { + let mut s = s.split("="); + let key = s.next().ok_or(())?.to_owned(); + let value = s.join("="); + Self::Value { key, value } + } else { + Self::Constant(s.to_owned()) + }); + } +} diff --git a/crates/datapath/src/vec_index/mod.rs b/crates/datapath/src/vec_index/mod.rs new file mode 100644 index 0000000..6516709 --- /dev/null +++ b/crates/datapath/src/vec_index/mod.rs @@ -0,0 +1,390 @@ +use itertools::Itertools; +use smartstring::{LazyCompact, SmartString}; +use std::{collections::HashMap, sync::Arc}; + +use crate::Rule; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +enum SchemaSegments { + /// A const path segment like `web` + Const(SmartString), + + /// A prefix path segment like `domain=`, + /// without the equals sign + Prefix(SmartString), +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct Schema { + segments: Vec, +} + +impl Schema { + pub fn from_path(path: &str, max_len: usize) -> Self { + let segments = path + .split('/') + .take(max_len) + .map(|x| { + if x == "*" || x == "**" { + return None; + } + + let is_prefix = x.contains('='); + Some(match is_prefix { + true => SchemaSegments::Prefix( + x.split('=') + .next() + .expect("split must return at least one item") + .into(), + ), + false => SchemaSegments::Const(x.into()), + }) + }) + .while_some() + .collect::>(); + + Self { segments } + } + + pub fn exemplar(&self) -> String { + self.segments + .iter() + .map(|x| match x { + SchemaSegments::Const(x) => x.to_string(), + SchemaSegments::Prefix(x) => format!("{x}=*"), + }) + .join("/") + } +} + +/// An in-memory cache of s3 paths. +#[derive(Debug)] +pub struct DatapathVecIndex { + /// Array of (schema, paths with that schema) + /// - all paths belong to exactly one schema + /// - we use the fact that order in both vecs is constant + paths: Vec<(Schema, Vec>)>, + + len: usize, + schema_len: usize, +} + +impl DatapathVecIndex { + pub fn new_empty(schema_len: usize) -> Self { + Self { + paths: Vec::new(), + len: 0, + schema_len, + } + } + + pub fn schema_len(&self) -> usize { + self.schema_len + } + + pub fn new, I: Iterator>(schema_len: usize, paths: I) -> Self { + let mut len = 0; + let mut map: HashMap>> = HashMap::new(); + for p in paths { + let p = Arc::new(p.into()); + let schema = Schema::from_path(&p, schema_len); + map.entry(schema).or_default().push(p); + len += 1; + } + + Self { + schema_len, + len, + paths: map.into_iter().collect(), + } + } + + #[cfg(feature = "tokio")] + pub async fn async_new>( + schema_len: usize, + mut stream: tokio::sync::mpsc::Receiver, + ) -> Self { + let mut len = 0; + let mut map: HashMap>> = HashMap::new(); + while let Some(p) = stream.recv().await { + let p = Arc::new(p.into()); + let schema = Schema::from_path(&p, schema_len); + map.entry(schema).or_default().push(p); + len += 1; + } + + Self { + schema_len, + len, + paths: map.into_iter().collect(), + } + } + + #[inline(always)] + pub fn len(&self) -> usize { + self.len + } + + #[inline(always)] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Given a datapath (that may contain wildcards) as a query, + /// return all known datapaths that match it. + /// + /// Returns an empty iterator if no paths match. + /// Returns `None` if the query was invalid. + pub fn query(&self, query: impl Into) -> Option + '_> { + let query: String = query.into(); + let regex = Rule::new(query.clone())?; + + // This is not a bug, we want all segments from query. + let query_schema = Schema::from_path(&query, query.len()).exemplar(); + + Some( + self.paths + .iter() + .filter(move |(schema, _)| { + let schema = schema.exemplar(); + schema.starts_with(&query_schema) || query_schema.starts_with(&schema) + }) + .flat_map(|(_, paths)| paths.iter()) + .filter(move |path| regex.is_match(path.as_str())) + .map(|arc_str| arc_str.as_ref().clone()), + ) + } + + /// Like [Self::query], but with a precompiled rule + pub fn query_rule<'a>(&'a self, rule: &'a Rule) -> impl Iterator + 'a { + let query = rule.pattern(); + let query_schema = Schema::from_path(&query, query.len()).exemplar(); + + self.paths + .iter() + .filter(move |(schema, _)| { + let schema = schema.exemplar(); + schema.starts_with(&query_schema) || query_schema.starts_with(&schema) + }) + .flat_map(|(_, paths)| paths.iter()) + .filter(move |path| rule.is_match(path.as_str())) + .map(|arc_str| arc_str.as_ref().clone()) + } + + /// Like [Self::query], but returns `true` if any paths match + pub fn query_match(&self, query: impl Into) -> Option { + let query: String = query.into(); + let regex = Rule::new(query.clone())?; + let query_schema = Schema::from_path(&query, query.len()).exemplar(); + + Some( + self.paths + .iter() + .filter(move |(schema, _)| { + let schema = schema.exemplar(); + schema.starts_with(&query_schema) || query_schema.starts_with(&schema) + }) + .flat_map(|(_, paths)| paths.iter()) + .any(|path| regex.is_match(path.as_str())), + ) + } + + /// Like [Self::query_match], but with a precompiled rule + pub fn query_rule_match<'a>(&'a self, rule: &'a Rule) -> bool { + let query = rule.pattern(); + let query_schema = Schema::from_path(&query, query.len()).exemplar(); + + self.paths + .iter() + .filter(move |(schema, _)| { + let schema = schema.exemplar(); + schema.starts_with(&query_schema) || query_schema.starts_with(&schema) + }) + .flat_map(|(_, paths)| paths.iter()) + .any(|path| rule.is_match(path.as_str())) + } +} + +// MARK: index tests + +#[cfg(test)] +#[expect(clippy::unwrap_used)] +mod index_tests { + use super::*; + + #[test] + fn datapath_index_empty() { + let idx = DatapathVecIndex::new(3, std::iter::empty::()); + let query = "web/domain=example.com"; + assert_eq!(idx.query(query).unwrap().count(), 0); + assert!(idx.is_empty()); + assert_eq!(idx.len(), 0); + } + + #[test] + fn insert_and_lookup_exact_match() { + let paths = vec!["web/domain=example.com/ts=1234"]; + let idx = DatapathVecIndex::new(3, paths.into_iter()); + + // Exact match + let results: Vec<_> = idx + .query("web/domain=example.com/ts=1234") + .unwrap() + .collect(); + assert_eq!(results.len(), 1); + assert_eq!(results[0], "web/domain=example.com/ts=1234"); + + // No match + let results: Vec<_> = idx.query("web/domain=other.com/ts=1234").unwrap().collect(); + assert_eq!(results.len(), 0); + + assert_eq!(idx.len(), 1); + } + + #[test] + fn wildcard_constant_match() { + let paths = vec![ + "web/domain=example.com/ts=1234", + "api/domain=example.com/ts=1234", + ]; + let idx = DatapathVecIndex::new(3, paths.into_iter()); + + // Wildcard first segment + let results: Vec<_> = idx.query("*/domain=example.com/ts=1234").unwrap().collect(); + assert_eq!(results.len(), 2); + + assert_eq!(idx.len(), 2); + } + + #[test] + fn wildcard_value_match() { + let paths = vec![ + "web/domain=example.com/ts=1234", + "web/domain=other.com/ts=1234", + ]; + let idx = DatapathVecIndex::new(3, paths.into_iter()); + + // Wildcard domain + let results: Vec<_> = idx.query("web/domain=*/ts=1234").unwrap().collect(); + assert_eq!(results.len(), 2); + } + + #[test] + fn multiple_datapaths() { + let paths = vec![ + "web/domain=example.com/ts=1234", + "web/domain=other.com/ts=1234", + "api/domain=example.com/ts=5678", + ]; + let idx = DatapathVecIndex::new(3, paths.into_iter()); + + // Specific lookup + let results: Vec<_> = idx + .query("web/domain=example.com/ts=1234") + .unwrap() + .collect(); + assert_eq!(results.len(), 1); + assert_eq!(results[0], "web/domain=example.com/ts=1234"); + + // Wildcard time lookup + let results: Vec<_> = idx.query("web/domain=example.com/ts=*").unwrap().collect(); + assert_eq!(results.len(), 1); + assert_eq!(results[0], "web/domain=example.com/ts=1234"); + + // Double wildcard lookup + let results: Vec<_> = idx.query("web/domain=*/ts=*").unwrap().collect(); + assert_eq!(results.len(), 2); + + assert_eq!(idx.len(), 3); + } + + #[test] + fn nested_wildcards() { + let paths = vec![ + "web/domain=example.com/ts=1234/crawl/2.5", + "web/domain=other.com/ts=5678/crawl/2.5", + "web/domain=example.com/ts=9999/crawl/3.0", + ]; + let idx = DatapathVecIndex::new(3, paths.into_iter()); + + // Multiple wildcards in path + let results: Vec<_> = idx.query("web/domain=*/ts=*/crawl/*").unwrap().collect(); + assert_eq!(results.len(), 3); + + // Selective wildcards + let results: Vec<_> = idx + .query("web/domain=example.com/ts=*/crawl/*") + .unwrap() + .collect(); + assert_eq!(results.len(), 2); + } + + #[test] + fn partial_path_query() { + let paths = vec!["web/domain=example.com/ts=1234/crawl/2.5"]; + let idx = DatapathVecIndex::new(3, paths.into_iter()); + + // Query with fewer segments than the stored path + let results: Vec<_> = idx.query("web/domain=example.com").unwrap().collect(); + assert_eq!(results.len(), 0); + } + + #[test] + fn longer_path_query() { + let paths = vec!["web/domain=example.com"]; + let idx = DatapathVecIndex::new(3, paths.into_iter()); + + // Query with more segments than the stored path + let results: Vec<_> = idx + .query("web/domain=example.com/ts=1234/crawl/2.5") + .unwrap() + .collect(); + assert_eq!(results.len(), 0); + } + + #[test] + fn query_match() { + let paths = vec![ + "web/domain=example.com/ts=1234", + "web/domain=other.com/ts=5678", + ]; + let idx = DatapathVecIndex::new(3, paths.into_iter()); + + // Match exists + assert_eq!( + idx.query_match("web/domain=example.com/ts=1234").unwrap(), + true + ); + assert_eq!(idx.query_match("web/domain=*/ts=*").unwrap(), true); + + // No match + assert_eq!( + idx.query_match("api/domain=example.com/ts=1234").unwrap(), + false + ); + assert_eq!( + idx.query_match("web/domain=missing.com/ts=9999").unwrap(), + false + ); + } + + #[test] + fn suffix_wildcard() { + let paths = vec![ + "web/domain=example.com/ts=1234/file1.json", + "web/domain=example.com/ts=1234/file2.json", + "web/domain=example.com/ts=5678/file3.json", + ]; + let idx = DatapathVecIndex::new(3, paths.into_iter()); + + // Query with suffix wildcard + let results: Vec<_> = idx.query("web/domain=example.com/**").unwrap().collect(); + assert_eq!(results.len(), 3); + + let results: Vec<_> = idx + .query("web/domain=example.com/ts=1234/**") + .unwrap() + .collect(); + assert_eq!(results.len(), 2); + } +}