1
0
mirror of https://github.com/rm-dr/datapath.git synced 2026-05-16 00:08:59 -07:00

Add VecIndex
CI / Check typos (push) Successful in 1m30s
CI / Check links (push) Successful in 1m26s
CI / Clippy (push) Successful in 2m0s
CI / Build and test (push) Successful in 2m30s

This commit is contained in:
2026-02-23 10:11:03 -08:00
committed by Mark
parent ad7ee03812
commit 3407c45dcb
8 changed files with 490 additions and 59 deletions
Generated
+32 -2
View File
@@ -11,6 +11,12 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "autocfg"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.19.0" version = "3.19.0"
@@ -50,11 +56,12 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]] [[package]]
name = "datapath" name = "datapath"
version = "0.0.6" version = "0.0.8"
dependencies = [ dependencies = [
"datapath-macro", "datapath-macro",
"itertools", "itertools",
"regex", "regex",
"smartstring",
"tokio", "tokio",
"tracing", "tracing",
"trie-rs", "trie-rs",
@@ -63,7 +70,7 @@ dependencies = [
[[package]] [[package]]
name = "datapath-macro" name = "datapath-macro"
version = "0.0.6" version = "0.0.8"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -228,6 +235,23 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "smartstring"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29"
dependencies = [
"autocfg",
"static_assertions",
"version_check",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.111" version = "2.0.111"
@@ -305,6 +329,12 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]] [[package]]
name = "wasip2" name = "wasip2"
version = "1.0.1+wasi-0.2.4" version = "1.0.1+wasi-0.2.4"
+3 -2
View File
@@ -11,7 +11,7 @@ readme = "README.md"
authors = ["rm-dr"] authors = ["rm-dr"]
# Don't forget to bump datapath-macro below! # Don't forget to bump datapath-macro below!
version = "0.0.6" version = "0.0.8"
[workspace.lints.rust] [workspace.lints.rust]
unused_import_braces = "deny" unused_import_braces = "deny"
@@ -70,7 +70,7 @@ cargo_common_metadata = "deny"
# #
[workspace.dependencies] [workspace.dependencies]
datapath-macro = { path = "crates/datapath-macro", version = "0.0.6" } datapath-macro = { path = "crates/datapath-macro", version = "0.0.8" }
datapath = { path = "crates/datapath" } datapath = { path = "crates/datapath" }
chrono = "0.4.42" chrono = "0.4.42"
@@ -83,3 +83,4 @@ tracing = "0.1"
trie-rs = "0.4.2" trie-rs = "0.4.2"
uuid = "1.19.0" uuid = "1.19.0"
tokio = { version = "1.48.0", features = ["sync"] } tokio = { version = "1.48.0", features = ["sync"] }
smartstring = "1.0.1"
+5 -4
View File
@@ -16,17 +16,18 @@ workspace = true
[dependencies] [dependencies]
datapath-macro = { workspace = true } datapath-macro = { workspace = true }
regex = { workspace = true }
tracing = { workspace = true }
regex = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
trie-rs = { workspace = true, optional = true } trie-rs = { workspace = true, optional = true }
itertools = { workspace = true, optional = true }
tokio = { workspace = true, optional = true } tokio = { workspace = true, optional = true }
smartstring = { workspace = true, optional = true }
itertools = { workspace = true, optional = true }
[dev-dependencies] [dev-dependencies]
uuid = { version = "1", features = ["v4"] } uuid = { version = "1", features = ["v4"] }
[features] [features]
default = [] default = []
index = ["dep:regex", "dep:trie-rs", "dep:tracing", "dep:itertools"] index = ["dep:trie-rs", "dep:smartstring", "dep:itertools"]
tokio = ["dep:tokio"] tokio = ["dep:tokio"]
+5 -51
View File
@@ -1,56 +1,13 @@
use itertools::Itertools; use itertools::Itertools;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
fmt::Display,
str::FromStr, str::FromStr,
sync::Arc, sync::Arc,
}; };
use tracing::trace; use tracing::trace;
use trie_rs::map::{Trie, TrieBuilder}; use trie_rs::map::{Trie, TrieBuilder};
mod rule; use crate::{Rule, segment::PathSegment};
pub use rule::Rule;
/// A path segment in an [`AnyDatapath`]
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum PathSegment {
/// A constant value, like `web`
Constant(String),
/// A key=value partition, like `domain=gouletpens.com`
Value { key: String, value: String },
}
impl Display for PathSegment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PathSegment::Constant(x) => write!(f, "{x}"),
PathSegment::Value { key, value } => write!(f, "{key}={value}"),
}
}
}
impl FromStr for PathSegment {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.contains("\n") {
return Err(());
}
if s.is_empty() {
return Err(());
}
return Ok(if s.contains("=") {
let mut s = s.split("=");
let key = s.next().ok_or(())?.to_owned();
let value = s.join("=");
Self::Value { key, value }
} else {
Self::Constant(s.to_owned())
});
}
}
// //
// MARK: index // MARK: index
@@ -234,7 +191,7 @@ impl DatapathIndex {
query: impl Into<String>, query: impl Into<String>,
) -> Option<impl Iterator<Item = &Arc<String>> + '_> { ) -> Option<impl Iterator<Item = &Arc<String>> + '_> {
let query: String = query.into(); let query: String = query.into();
let regex = rule::Rule::new(query.clone())?; let regex = Rule::new(query.clone())?;
let key = Self::query_to_key(&query); let key = Self::query_to_key(&query);
trace!("DatapathIndex key is {key}"); trace!("DatapathIndex key is {key}");
@@ -247,10 +204,7 @@ impl DatapathIndex {
} }
/// Like [Self::query], but with a precompiled rule /// Like [Self::query], but with a precompiled rule
pub fn query_rule<'a>( pub fn query_rule<'a>(&'a self, rule: &'a Rule) -> impl Iterator<Item = &'a Arc<String>> + 'a {
&'a self,
rule: &'a rule::Rule,
) -> impl Iterator<Item = &'a Arc<String>> + 'a {
let key = Self::query_to_key(rule.pattern()); let key = Self::query_to_key(rule.pattern());
trace!("DatapathIndex key is {key}"); trace!("DatapathIndex key is {key}");
@@ -263,7 +217,7 @@ impl DatapathIndex {
/// Like [Self::query], but returns `true` if any paths match /// Like [Self::query], but returns `true` if any paths match
pub fn query_match(&self, query: impl Into<String>) -> Option<bool> { pub fn query_match(&self, query: impl Into<String>) -> Option<bool> {
let query: String = query.into(); let query: String = query.into();
let regex = rule::Rule::new(query.clone())?; let regex = Rule::new(query.clone())?;
let key = Self::query_to_key(&query); let key = Self::query_to_key(&query);
trace!("DatapathIndex key is {key}"); trace!("DatapathIndex key is {key}");
@@ -279,7 +233,7 @@ impl DatapathIndex {
} }
/// Like [Self::query_match], but with a precompiled rule /// Like [Self::query_match], but with a precompiled rule
pub fn query_rule_match<'a>(&'a self, rule: &'a rule::Rule) -> bool { pub fn query_rule_match<'a>(&'a self, rule: &'a Rule) -> bool {
let key = Self::query_to_key(&rule.pattern()); let key = Self::query_to_key(&rule.pattern());
trace!("DatapathIndex key is {key}"); trace!("DatapathIndex key is {key}");
+12
View File
@@ -26,10 +26,22 @@ pub use wildcardable::*;
mod arcsubstr; mod arcsubstr;
pub use arcsubstr::*; pub use arcsubstr::*;
mod rule;
pub use rule::*;
#[cfg(feature = "index")]
pub(crate) mod segment;
#[cfg(feature = "index")] #[cfg(feature = "index")]
mod index; mod index;
#[cfg(feature = "index")] #[cfg(feature = "index")]
pub use index::*; pub use index::*;
#[cfg(feature = "index")]
mod vec_index;
#[cfg(feature = "index")]
pub use vec_index::*;
pub use datapath_macro::datapath; pub use datapath_macro::datapath;
+43
View File
@@ -0,0 +1,43 @@
use itertools::Itertools;
use std::{fmt::Display, str::FromStr};
/// A path segment in an [`AnyDatapath`]
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum PathSegment {
/// A constant value, like `web`
Constant(String),
/// A key=value partition, like `domain=gouletpens.com`
Value { key: String, value: String },
}
impl Display for PathSegment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PathSegment::Constant(x) => write!(f, "{x}"),
PathSegment::Value { key, value } => write!(f, "{key}={value}"),
}
}
}
impl FromStr for PathSegment {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.contains("\n") {
return Err(());
}
if s.is_empty() {
return Err(());
}
return Ok(if s.contains("=") {
let mut s = s.split("=");
let key = s.next().ok_or(())?.to_owned();
let value = s.join("=");
Self::Value { key, value }
} else {
Self::Constant(s.to_owned())
});
}
}
+390
View File
@@ -0,0 +1,390 @@
use itertools::Itertools;
use smartstring::{LazyCompact, SmartString};
use std::{collections::HashMap, sync::Arc};
use crate::Rule;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum SchemaSegments {
/// A const path segment like `web`
Const(SmartString<LazyCompact>),
/// A prefix path segment like `domain=`,
/// without the equals sign
Prefix(SmartString<LazyCompact>),
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Schema {
segments: Vec<SchemaSegments>,
}
impl Schema {
pub fn from_path(path: &str, max_len: usize) -> Self {
let segments = path
.split('/')
.take(max_len)
.map(|x| {
if x == "*" || x == "**" {
return None;
}
let is_prefix = x.contains('=');
Some(match is_prefix {
true => SchemaSegments::Prefix(
x.split('=')
.next()
.expect("split must return at least one item")
.into(),
),
false => SchemaSegments::Const(x.into()),
})
})
.while_some()
.collect::<Vec<_>>();
Self { segments }
}
pub fn exemplar(&self) -> String {
self.segments
.iter()
.map(|x| match x {
SchemaSegments::Const(x) => x.to_string(),
SchemaSegments::Prefix(x) => format!("{x}=*"),
})
.join("/")
}
}
/// An in-memory cache of s3 paths.
#[derive(Debug)]
pub struct DatapathVecIndex {
/// Array of (schema, paths with that schema)
/// - all paths belong to exactly one schema
/// - we use the fact that order in both vecs is constant
paths: Vec<(Schema, Vec<Arc<String>>)>,
len: usize,
schema_len: usize,
}
impl DatapathVecIndex {
pub fn new_empty(schema_len: usize) -> Self {
Self {
paths: Vec::new(),
len: 0,
schema_len,
}
}
pub fn schema_len(&self) -> usize {
self.schema_len
}
pub fn new<S: Into<String>, I: Iterator<Item = S>>(schema_len: usize, paths: I) -> Self {
let mut len = 0;
let mut map: HashMap<Schema, Vec<Arc<String>>> = HashMap::new();
for p in paths {
let p = Arc::new(p.into());
let schema = Schema::from_path(&p, schema_len);
map.entry(schema).or_default().push(p);
len += 1;
}
Self {
schema_len,
len,
paths: map.into_iter().collect(),
}
}
#[cfg(feature = "tokio")]
pub async fn async_new<S: Into<String>>(
schema_len: usize,
mut stream: tokio::sync::mpsc::Receiver<S>,
) -> Self {
let mut len = 0;
let mut map: HashMap<Schema, Vec<Arc<String>>> = HashMap::new();
while let Some(p) = stream.recv().await {
let p = Arc::new(p.into());
let schema = Schema::from_path(&p, schema_len);
map.entry(schema).or_default().push(p);
len += 1;
}
Self {
schema_len,
len,
paths: map.into_iter().collect(),
}
}
#[inline(always)]
pub fn len(&self) -> usize {
self.len
}
#[inline(always)]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Given a datapath (that may contain wildcards) as a query,
/// return all known datapaths that match it.
///
/// Returns an empty iterator if no paths match.
/// Returns `None` if the query was invalid.
pub fn query(&self, query: impl Into<String>) -> Option<impl Iterator<Item = String> + '_> {
let query: String = query.into();
let regex = Rule::new(query.clone())?;
// This is not a bug, we want all segments from query.
let query_schema = Schema::from_path(&query, query.len()).exemplar();
Some(
self.paths
.iter()
.filter(move |(schema, _)| {
let schema = schema.exemplar();
schema.starts_with(&query_schema) || query_schema.starts_with(&schema)
})
.flat_map(|(_, paths)| paths.iter())
.filter(move |path| regex.is_match(path.as_str()))
.map(|arc_str| arc_str.as_ref().clone()),
)
}
/// Like [Self::query], but with a precompiled rule
pub fn query_rule<'a>(&'a self, rule: &'a Rule) -> impl Iterator<Item = String> + 'a {
let query = rule.pattern();
let query_schema = Schema::from_path(&query, query.len()).exemplar();
self.paths
.iter()
.filter(move |(schema, _)| {
let schema = schema.exemplar();
schema.starts_with(&query_schema) || query_schema.starts_with(&schema)
})
.flat_map(|(_, paths)| paths.iter())
.filter(move |path| rule.is_match(path.as_str()))
.map(|arc_str| arc_str.as_ref().clone())
}
/// Like [Self::query], but returns `true` if any paths match
pub fn query_match(&self, query: impl Into<String>) -> Option<bool> {
let query: String = query.into();
let regex = Rule::new(query.clone())?;
let query_schema = Schema::from_path(&query, query.len()).exemplar();
Some(
self.paths
.iter()
.filter(move |(schema, _)| {
let schema = schema.exemplar();
schema.starts_with(&query_schema) || query_schema.starts_with(&schema)
})
.flat_map(|(_, paths)| paths.iter())
.any(|path| regex.is_match(path.as_str())),
)
}
/// Like [Self::query_match], but with a precompiled rule
pub fn query_rule_match<'a>(&'a self, rule: &'a Rule) -> bool {
let query = rule.pattern();
let query_schema = Schema::from_path(&query, query.len()).exemplar();
self.paths
.iter()
.filter(move |(schema, _)| {
let schema = schema.exemplar();
schema.starts_with(&query_schema) || query_schema.starts_with(&schema)
})
.flat_map(|(_, paths)| paths.iter())
.any(|path| rule.is_match(path.as_str()))
}
}
// MARK: index tests
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod index_tests {
use super::*;
#[test]
fn datapath_index_empty() {
let idx = DatapathVecIndex::new(3, std::iter::empty::<String>());
let query = "web/domain=example.com";
assert_eq!(idx.query(query).unwrap().count(), 0);
assert!(idx.is_empty());
assert_eq!(idx.len(), 0);
}
#[test]
fn insert_and_lookup_exact_match() {
let paths = vec!["web/domain=example.com/ts=1234"];
let idx = DatapathVecIndex::new(3, paths.into_iter());
// Exact match
let results: Vec<_> = idx
.query("web/domain=example.com/ts=1234")
.unwrap()
.collect();
assert_eq!(results.len(), 1);
assert_eq!(results[0], "web/domain=example.com/ts=1234");
// No match
let results: Vec<_> = idx.query("web/domain=other.com/ts=1234").unwrap().collect();
assert_eq!(results.len(), 0);
assert_eq!(idx.len(), 1);
}
#[test]
fn wildcard_constant_match() {
let paths = vec![
"web/domain=example.com/ts=1234",
"api/domain=example.com/ts=1234",
];
let idx = DatapathVecIndex::new(3, paths.into_iter());
// Wildcard first segment
let results: Vec<_> = idx.query("*/domain=example.com/ts=1234").unwrap().collect();
assert_eq!(results.len(), 2);
assert_eq!(idx.len(), 2);
}
#[test]
fn wildcard_value_match() {
let paths = vec![
"web/domain=example.com/ts=1234",
"web/domain=other.com/ts=1234",
];
let idx = DatapathVecIndex::new(3, paths.into_iter());
// Wildcard domain
let results: Vec<_> = idx.query("web/domain=*/ts=1234").unwrap().collect();
assert_eq!(results.len(), 2);
}
#[test]
fn multiple_datapaths() {
let paths = vec![
"web/domain=example.com/ts=1234",
"web/domain=other.com/ts=1234",
"api/domain=example.com/ts=5678",
];
let idx = DatapathVecIndex::new(3, paths.into_iter());
// Specific lookup
let results: Vec<_> = idx
.query("web/domain=example.com/ts=1234")
.unwrap()
.collect();
assert_eq!(results.len(), 1);
assert_eq!(results[0], "web/domain=example.com/ts=1234");
// Wildcard time lookup
let results: Vec<_> = idx.query("web/domain=example.com/ts=*").unwrap().collect();
assert_eq!(results.len(), 1);
assert_eq!(results[0], "web/domain=example.com/ts=1234");
// Double wildcard lookup
let results: Vec<_> = idx.query("web/domain=*/ts=*").unwrap().collect();
assert_eq!(results.len(), 2);
assert_eq!(idx.len(), 3);
}
#[test]
fn nested_wildcards() {
let paths = vec![
"web/domain=example.com/ts=1234/crawl/2.5",
"web/domain=other.com/ts=5678/crawl/2.5",
"web/domain=example.com/ts=9999/crawl/3.0",
];
let idx = DatapathVecIndex::new(3, paths.into_iter());
// Multiple wildcards in path
let results: Vec<_> = idx.query("web/domain=*/ts=*/crawl/*").unwrap().collect();
assert_eq!(results.len(), 3);
// Selective wildcards
let results: Vec<_> = idx
.query("web/domain=example.com/ts=*/crawl/*")
.unwrap()
.collect();
assert_eq!(results.len(), 2);
}
#[test]
fn partial_path_query() {
let paths = vec!["web/domain=example.com/ts=1234/crawl/2.5"];
let idx = DatapathVecIndex::new(3, paths.into_iter());
// Query with fewer segments than the stored path
let results: Vec<_> = idx.query("web/domain=example.com").unwrap().collect();
assert_eq!(results.len(), 0);
}
#[test]
fn longer_path_query() {
let paths = vec!["web/domain=example.com"];
let idx = DatapathVecIndex::new(3, paths.into_iter());
// Query with more segments than the stored path
let results: Vec<_> = idx
.query("web/domain=example.com/ts=1234/crawl/2.5")
.unwrap()
.collect();
assert_eq!(results.len(), 0);
}
#[test]
fn query_match() {
let paths = vec![
"web/domain=example.com/ts=1234",
"web/domain=other.com/ts=5678",
];
let idx = DatapathVecIndex::new(3, paths.into_iter());
// Match exists
assert_eq!(
idx.query_match("web/domain=example.com/ts=1234").unwrap(),
true
);
assert_eq!(idx.query_match("web/domain=*/ts=*").unwrap(), true);
// No match
assert_eq!(
idx.query_match("api/domain=example.com/ts=1234").unwrap(),
false
);
assert_eq!(
idx.query_match("web/domain=missing.com/ts=9999").unwrap(),
false
);
}
#[test]
fn suffix_wildcard() {
let paths = vec![
"web/domain=example.com/ts=1234/file1.json",
"web/domain=example.com/ts=1234/file2.json",
"web/domain=example.com/ts=5678/file3.json",
];
let idx = DatapathVecIndex::new(3, paths.into_iter());
// Query with suffix wildcard
let results: Vec<_> = idx.query("web/domain=example.com/**").unwrap().collect();
assert_eq!(results.len(), 3);
let results: Vec<_> = idx
.query("web/domain=example.com/ts=1234/**")
.unwrap()
.collect();
assert_eq!(results.len(), 2);
}
}