diff --git a/crates/datapath/src/arcsubstr.rs b/crates/datapath/src/arcsubstr.rs index 5266d6b..c5be98c 100644 --- a/crates/datapath/src/arcsubstr.rs +++ b/crates/datapath/src/arcsubstr.rs @@ -1,3 +1,5 @@ +use std::borrow::Borrow; +use std::ops::Deref; use std::sync::Arc; // @@ -5,6 +7,7 @@ use std::sync::Arc; // /// A reference to a substring of an [Arc] +#[derive(Copy, Clone)] pub struct ArcSubstr<'a> { pub string: &'a Arc, pub start: usize, @@ -23,6 +26,14 @@ impl<'a> ArcSubstr<'a> { string, } } + + pub fn to_owned(&self) -> ArcSubstring { + ArcSubstring { + string: self.string.clone(), + start: self.start, + end: self.end, + } + } } impl PartialEq for ArcSubstr<'_> { @@ -63,11 +74,50 @@ impl std::fmt::Display for ArcSubstr<'_> { } } +impl Deref for ArcSubstr<'_> { + type Target = str; + + fn deref(&self) -> &Self::Target { + self.as_str() + } +} + +impl AsRef for ArcSubstr<'_> { + fn as_ref(&self) -> &str { + self.as_str() + } +} + +impl Borrow for ArcSubstr<'_> { + fn borrow(&self) -> &str { + self.as_str() + } +} + +impl PartialEq for ArcSubstr<'_> { + fn eq(&self, other: &str) -> bool { + self.as_str() == other + } +} + +impl PartialEq<&str> for ArcSubstr<'_> { + fn eq(&self, other: &&str) -> bool { + self.as_str() == *other + } +} + +impl PartialEq for ArcSubstr<'_> { + fn eq(&self, other: &String) -> bool { + self.as_str() == other.as_str() + } +} + // // MARK: string // /// An owned [ArcSubstr] +#[derive(Clone)] pub struct ArcSubstring { pub string: Arc, pub start: usize, @@ -125,3 +175,65 @@ impl std::fmt::Display for ArcSubstring { self.as_str().fmt(f) } } + +impl Deref for ArcSubstring { + type Target = str; + + fn deref(&self) -> &Self::Target { + self.as_str() + } +} + +impl AsRef for ArcSubstring { + fn as_ref(&self) -> &str { + self.as_str() + } +} + +impl Borrow for ArcSubstring { + fn borrow(&self) -> &str { + self.as_str() + } +} + +impl PartialEq for ArcSubstring { + fn eq(&self, other: &str) -> bool { + self.as_str() == other + } +} + +impl PartialEq<&str> for ArcSubstring { + fn eq(&self, other: &&str) -> bool { + self.as_str() == *other + } +} + +impl PartialEq for ArcSubstring { + fn eq(&self, other: &String) -> bool { + self.as_str() == other.as_str() + } +} + +impl From for ArcSubstring { + fn from(s: String) -> Self { + Self::from_string(Arc::new(s)) + } +} + +impl From> for ArcSubstring { + fn from(s: Arc) -> Self { + Self::from_string(s) + } +} + +impl<'a> From<&'a ArcSubstr<'a>> for ArcSubstring { + fn from(s: &'a ArcSubstr<'a>) -> Self { + s.to_owned() + } +} + +impl<'a> From> for ArcSubstring { + fn from(s: ArcSubstr<'a>) -> Self { + s.to_owned() + } +} diff --git a/crates/datapath/src/index/mod.rs b/crates/datapath/src/index/mod.rs index d6ae56d..e3faf5c 100644 --- a/crates/datapath/src/index/mod.rs +++ b/crates/datapath/src/index/mod.rs @@ -1,5 +1,10 @@ use itertools::Itertools; -use std::{collections::HashMap, fmt::Display, str::FromStr, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + fmt::Display, + str::FromStr, + sync::Arc, +}; use tracing::trace; use trie_rs::map::{Trie, TrieBuilder}; @@ -55,6 +60,7 @@ impl FromStr for PathSegment { #[derive(Debug)] pub struct DatapathIndex { patterns: Trie>>, + paths: HashSet>, len: usize, } @@ -90,16 +96,33 @@ impl DatapathIndex { Self { patterns: TrieBuilder::new().build(), len: 0, + paths: HashSet::new(), } } - pub fn new, I: Iterator>(paths: I) -> Self { + pub fn new, I: Iterator>(sources: I, old: Option<&Self>) -> Self { let mut len = 0; let mut patterns = HashMap::new(); + let mut paths = HashSet::new(); - for s in paths { + for s in sources { let s: String = s.into(); - let s = Arc::new(s); + + let s = { + let mut s = Arc::new(s); + + // Reuse existing Arc, if they are available. + // This GREATLY reduces memory usage when updating a dpi + // while keeping an older "snapshot" around. + if let Some(o) = old { + if let Some(existing) = o.paths.get(&s) { + s = existing.clone(); + } + } + + paths.insert(s.clone()); + s + }; let mut segments = Vec::new(); for seg in s.split('/') { @@ -128,17 +151,37 @@ impl DatapathIndex { Self { len, patterns: builder.build(), + paths, } } #[cfg(feature = "tokio")] - pub async fn async_new>(mut paths: tokio::sync::mpsc::Receiver) -> Self { + pub async fn async_new>( + mut sources: tokio::sync::mpsc::Receiver, + old: Option<&Self>, + ) -> Self { let mut len = 0; let mut patterns = HashMap::new(); + let mut paths = HashSet::new(); - while let Some(s) = paths.recv().await { + while let Some(s) = sources.recv().await { let s: String = s.into(); - let s = Arc::new(s); + + let s = { + let mut s = Arc::new(s); + + // Reuse existing Arc, if they are available. + // This GREATLY reduces memory usage when updating a dpi + // while keeping an older "snapshot" around. + if let Some(o) = old { + if let Some(existing) = o.paths.get(&s) { + s = existing.clone(); + } + } + + paths.insert(s.clone()); + s + }; let mut segments = Vec::new(); for seg in s.split('/') { @@ -167,6 +210,7 @@ impl DatapathIndex { Self { len, patterns: builder.build(), + paths, } } @@ -260,7 +304,7 @@ mod index_tests { #[test] fn datapath_index_empty() { - let idx = DatapathIndex::new(std::iter::empty::()); + let idx = DatapathIndex::new(std::iter::empty::(), None); let query = "web/domain=example.com"; assert_eq!(idx.query(query).unwrap().count(), 0); assert!(idx.is_empty()); @@ -270,7 +314,7 @@ mod index_tests { #[test] fn insert_and_lookup_exact_match() { let paths = vec!["web/domain=example.com/ts=1234"]; - let idx = DatapathIndex::new(paths.into_iter()); + let idx = DatapathIndex::new(paths.into_iter(), None); // Exact match let results: Vec<_> = idx @@ -293,7 +337,7 @@ mod index_tests { "web/domain=example.com/ts=1234", "api/domain=example.com/ts=1234", ]; - let idx = DatapathIndex::new(paths.into_iter()); + let idx = DatapathIndex::new(paths.into_iter(), None); // Wildcard first segment let results: Vec<_> = idx.query("*/domain=example.com/ts=1234").unwrap().collect(); @@ -308,7 +352,7 @@ mod index_tests { "web/domain=example.com/ts=1234", "web/domain=other.com/ts=1234", ]; - let idx = DatapathIndex::new(paths.into_iter()); + let idx = DatapathIndex::new(paths.into_iter(), None); // Wildcard domain let results: Vec<_> = idx.query("web/domain=*/ts=1234").unwrap().collect(); @@ -322,7 +366,7 @@ mod index_tests { "web/domain=other.com/ts=1234", "api/domain=example.com/ts=5678", ]; - let idx = DatapathIndex::new(paths.into_iter()); + let idx = DatapathIndex::new(paths.into_iter(), None); // Specific lookup let results: Vec<_> = idx @@ -351,7 +395,7 @@ mod index_tests { "web/domain=other.com/ts=5678/crawl/2.5", "web/domain=example.com/ts=9999/crawl/3.0", ]; - let idx = DatapathIndex::new(paths.into_iter()); + let idx = DatapathIndex::new(paths.into_iter(), None); // Multiple wildcards in path let results: Vec<_> = idx.query("web/domain=*/ts=*/crawl/*").unwrap().collect(); @@ -368,7 +412,7 @@ mod index_tests { #[test] fn partial_path_query() { let paths = vec!["web/domain=example.com/ts=1234/crawl/2.5"]; - let idx = DatapathIndex::new(paths.into_iter()); + let idx = DatapathIndex::new(paths.into_iter(), None); // Query with fewer segments than the stored path let results: Vec<_> = idx.query("web/domain=example.com").unwrap().collect(); @@ -378,7 +422,7 @@ mod index_tests { #[test] fn longer_path_query() { let paths = vec!["web/domain=example.com"]; - let idx = DatapathIndex::new(paths.into_iter()); + let idx = DatapathIndex::new(paths.into_iter(), None); // Query with more segments than the stored path let results: Vec<_> = idx @@ -394,7 +438,7 @@ mod index_tests { "web/domain=example.com/ts=1234", "web/domain=other.com/ts=5678", ]; - let idx = DatapathIndex::new(paths.into_iter()); + let idx = DatapathIndex::new(paths.into_iter(), None); // Match exists assert_eq!( @@ -421,7 +465,7 @@ mod index_tests { "web/domain=example.com/ts=1234/file2.json", "web/domain=example.com/ts=5678/file3.json", ]; - let idx = DatapathIndex::new(paths.into_iter()); + let idx = DatapathIndex::new(paths.into_iter(), None); // Query with suffix wildcard let results: Vec<_> = idx.query("web/domain=example.com/**").unwrap().collect();