diff --git a/Cargo.lock b/Cargo.lock index cb363dc..909351a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2496,7 +2496,6 @@ dependencies = [ "serde", "serde_json", "tokio", - "tokio-stream", "toml", "tracing", "tracing-indicatif", @@ -2578,7 +2577,6 @@ dependencies = [ "serde_json", "smartstring", "tokio", - "tokio-stream", "toml", "tracing", "walkdir", diff --git a/crates/pile-config/src/lib.rs b/crates/pile-config/src/lib.rs index 678de58..2e0a9e4 100644 --- a/crates/pile-config/src/lib.rs +++ b/crates/pile-config/src/lib.rs @@ -1,12 +1,13 @@ use serde::Deserialize; use std::{collections::HashMap, fmt::Debug, path::PathBuf}; +use crate::{objectpath::ObjectPath, pattern::GroupPattern}; + mod misc; pub use misc::*; -use crate::objectpath::ObjectPath; - pub mod objectpath; +pub mod pattern; pub static INIT_DB_TOML: &str = include_str!("./config.toml"); @@ -59,13 +60,9 @@ pub enum Source { /// Must be relative. path: PathBuf, - /// If true, all toml files are ignored. - /// Metadata can be added to any file using a {filename}.toml. - /// - /// If false, toml files are treated as regular files - /// and sidecar metadata is disabled. - #[serde(default = "default_true")] - sidecars: bool, + /// How to group files into items in this source + #[serde(default)] + pattern: GroupPattern, }, /// An S3-compatible object store bucket @@ -84,9 +81,9 @@ pub enum Source { credentials: S3Credentials, - /// If true, all .toml objects are treated as sidecar metadata files. - #[serde(default = "default_true")] - sidecars: bool, + /// How to group files into items in this source + #[serde(default)] + pattern: GroupPattern, }, } diff --git a/crates/pile-config/src/objectpath/mod.rs b/crates/pile-config/src/objectpath/mod.rs index 668518a..f812896 100644 --- a/crates/pile-config/src/objectpath/mod.rs +++ b/crates/pile-config/src/objectpath/mod.rs @@ -58,7 +58,7 @@ pub enum PathSegment { /// - `$` refers to the root object /// - `.` selects aPathSegment::Field of an object /// - `[n]` selects an item of an array -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct ObjectPath { pub segments: Vec, } diff --git a/crates/pile-config/src/pattern/mod.rs b/crates/pile-config/src/pattern/mod.rs new file mode 100644 index 0000000..e618165 --- /dev/null +++ b/crates/pile-config/src/pattern/mod.rs @@ -0,0 +1,49 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Deserializer, de}; +use smartstring::{LazyCompact, SmartString}; +use thiserror::Error; + +use crate::{Label, objectpath::PathParseError as ObjectPathError}; + +mod parser; +pub use parser::GroupSegment; + +#[derive(Debug, Error, PartialEq)] +pub enum GroupPatternParseError { + /// A `{` or `}` appeared in an invalid position, or a `{` was never closed. + #[error("syntax error at index {position}")] + Syntax { position: usize }, + + /// The contents of a `{...}` block could not be parsed as an object path. + #[error("invalid object path {path:?}: {source}")] + InvalidObjectPath { + start: usize, + end: usize, + path: SmartString, + source: ObjectPathError, + }, +} + +#[derive(Debug, Clone, Default)] +pub struct GroupPattern { + pub pattern: HashMap>, +} + +impl<'de> Deserialize<'de> for GroupPattern { + fn deserialize>(deserializer: D) -> Result { + let raw = HashMap::::deserialize(deserializer)?; + let mut parts = HashMap::with_capacity(raw.len()); + for (key, value) in raw { + let label = Label::try_from(key.as_str()).map_err(de::Error::custom)?; + let segments = parser::Parser::new() + .parse(&value) + .map_err(de::Error::custom)? + .into_iter() + .map(|(_, seg)| seg) + .collect(); + parts.insert(label, segments); + } + Ok(GroupPattern { pattern: parts }) + } +} diff --git a/crates/pile-config/src/pattern/parser.rs b/crates/pile-config/src/pattern/parser.rs new file mode 100644 index 0000000..eefddf7 --- /dev/null +++ b/crates/pile-config/src/pattern/parser.rs @@ -0,0 +1,195 @@ +use smartstring::{LazyCompact, SmartString}; + +use crate::{objectpath::ObjectPath, pattern::GroupPatternParseError}; + +#[cfg_attr(test, derive(PartialEq))] +#[derive(Debug, Clone)] +pub enum GroupSegment { + Path(ObjectPath), + Literal(SmartString), +} + +pub struct Parser {} + +impl Parser { + pub fn new() -> Self { + Self {} + } + + /// Parse a pattern string of the form `{path}.literal{path}...`. + /// + /// - `{...}` delimiters are parsed as [`ObjectPath`] expressions. + /// Nested `{}` inside a path are allowed; depth is tracked to find the + /// matching closing brace. + /// - Everything outside `{...}` is a `Literal` segment. + /// - A bare `}` in literal position (depth == 0) is a syntax error. + /// - An unclosed `{` is a syntax error. + pub fn parse(self, source: &str) -> Result, GroupPatternParseError> { + let mut tokens = Vec::new(); + + // `depth` > 0 means we are currently inside a `{...}` path expression. + let mut depth: usize = 0; + // Start of the current segment (literal text or path content). + let mut window_start: usize = 0; + // Source position of the opening `{` for the current path (used for error reporting). + let mut open_brace: usize = 0; + + for (i, c) in source.char_indices() { + match c { + '{' => { + if depth == 0 { + // Emit any accumulated literal. + if i > window_start { + tokens.push(( + window_start, + GroupSegment::Literal(source[window_start..i].into()), + )); + } + open_brace = i; + // Path content starts after the opening brace. + window_start = i + 1; + depth = 1; + } else { + // Nested brace inside a path — keep counting. + depth += 1; + } + } + + '}' => { + if depth == 0 { + // Unmatched `}` outside any path. + return Err(GroupPatternParseError::Syntax { position: i }); + } + depth -= 1; + if depth == 0 { + // Closing brace of the outermost path expression — parse as ObjectPath. + let path_str = &source[window_start..i]; + let path = path_str.parse::().map_err(|e| { + GroupPatternParseError::InvalidObjectPath { + start: open_brace, + end: i + 1, + path: path_str.into(), + source: e, + } + })?; + tokens.push((open_brace, GroupSegment::Path(path))); + // Literal content (if any) starts after this `}`. + window_start = i + 1; + } + } + + _ => {} + } + } + + // Unclosed `{`. + if depth > 0 { + return Err(GroupPatternParseError::Syntax { + position: open_brace, + }); + } + + // Emit any trailing literal. + if window_start < source.len() { + tokens.push(( + window_start, + GroupSegment::Literal(source[window_start..].into()), + )); + } + + Ok(tokens) + } +} + +// +// MARK: tests +// + +#[expect(clippy::unwrap_used)] +#[cfg(test)] +mod tests { + use super::*; + + fn parse(source: &str) -> Result, GroupPatternParseError> { + Parser::new().parse(source) + } + + fn path(s: &str) -> GroupSegment { + GroupSegment::Path(s.parse().unwrap()) + } + + fn lit(s: &str) -> GroupSegment { + GroupSegment::Literal(s.into()) + } + + #[test] + fn single_path() { + assert_eq!(parse("{$.foo}").unwrap(), vec![(0, path("$.foo"))]); + } + + #[test] + fn single_literal() { + assert_eq!(parse("hello").unwrap(), vec![(0, lit("hello"))]); + } + + #[test] + fn path_then_literal() { + assert_eq!( + parse("{$.foo}.txt").unwrap(), + vec![(0, path("$.foo")), (7, lit(".txt"))] + ); + } + + #[test] + fn literal_then_path() { + assert_eq!( + parse("prefix/{$.foo}").unwrap(), + vec![(0, lit("prefix/")), (7, path("$.foo"))] + ); + } + + #[test] + fn interleaved() { + assert_eq!( + parse("{$.a}.sep.{$.b}").unwrap(), + vec![(0, path("$.a")), (5, lit(".sep.")), (10, path("$.b")),] + ); + } + + #[test] + fn unmatched_open_brace_error() { + assert_eq!( + parse("{$.foo"), + Err(GroupPatternParseError::Syntax { position: 0 }) + ); + } + + #[test] + fn unmatched_close_brace_in_literal_error() { + assert_eq!( + parse("foo}bar"), + Err(GroupPatternParseError::Syntax { position: 3 }) + ); + } + + #[test] + fn invalid_path_error() { + assert_eq!( + parse("{not-a-path}"), + Err(GroupPatternParseError::InvalidObjectPath { + start: 0, + end: 12, + path: "not-a-path".into(), + source: crate::objectpath::PathParseError::MustStartWithRoot { position: 0 }, + }) + ); + } + + #[test] + fn literal_between_paths() { + assert_eq!( + parse("foo{$.x}bar").unwrap(), + vec![(0, lit("foo")), (3, path("$.x")), (8, lit("bar")),] + ); + } +} diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index 616f775..b149e09 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -11,7 +11,6 @@ use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::I use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; use thiserror::Error; use tokio::task::JoinSet; -use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use tracing::{debug, info, trace, warn}; use crate::index::{DbFtsIndex, FtsLookupResult}; @@ -46,10 +45,10 @@ impl Dataset { } } - pub fn iter(&self) -> ReceiverStream> { + pub fn iter(&self) -> Box + Send + '_> { match self { - Self::Dir(ds) => ds.iter(), - Self::S3(ds) => ds.iter(), + Self::Dir(ds) => Box::new(ds.iter()), + Self::S3(ds) => Box::new(ds.iter()), } } @@ -76,7 +75,7 @@ pub struct Datasets { } impl Datasets { - pub fn open(config: impl Into) -> Result { + pub async fn open(config: impl Into) -> Result { let path_config = config.into(); let path_parent = path_config .parent() @@ -118,7 +117,7 @@ impl Datasets { Source::Filesystem { enabled, path, - sidecars, + pattern, } => { if !enabled { continue; @@ -126,11 +125,10 @@ impl Datasets { sources.insert( label.clone(), - Dataset::Dir(Arc::new(DirDataSource::new( - label, - path_parent.join(path), - *sidecars, - ))), + Dataset::Dir( + DirDataSource::new(label, path_parent.join(path), pattern.clone()) + .await?, + ), ); } @@ -141,7 +139,7 @@ impl Datasets { endpoint, region, credentials, - sidecars, + pattern, } => { if !enabled { continue; @@ -154,10 +152,12 @@ impl Datasets { endpoint.clone(), region.clone(), credentials, - *sidecars, - ) { + pattern.clone(), + ) + .await + { Ok(ds) => { - sources.insert(label.clone(), Dataset::S3(Arc::new(ds))); + sources.insert(label.clone(), Dataset::S3(ds)); } Err(err) => { warn!("Could not open S3 source {label}: {err}"); @@ -258,17 +258,17 @@ impl Datasets { for (name, dataset) in &self.sources { info!("Loading source {name}"); - let mut stream = dataset.iter(); - while let Some(item_result) = stream.next().await { + let stream = dataset.iter(); + for item in stream { if let Some(flag) = &flag && flag.is_cancelled() { return Err(CancelableTaskError::Cancelled); } - let item = item_result.map_err(DatasetError::from)?; let db = Arc::clone(&db_index); let state = state.clone(); + let item = item.clone(); join_set.spawn(async move { let key = item.key(); let result = db.entry_to_document(&state, &item).await; diff --git a/crates/pile-value/Cargo.toml b/crates/pile-value/Cargo.toml index 4263ad7..c079484 100644 --- a/crates/pile-value/Cargo.toml +++ b/crates/pile-value/Cargo.toml @@ -26,7 +26,6 @@ pdfium-render = { workspace = true, optional = true } image = { workspace = true, optional = true } id3 = { workspace = true } tokio = { workspace = true } -tokio-stream = { workspace = true } async-trait = { workspace = true } aws-sdk-s3 = { workspace = true } mime = { workspace = true } diff --git a/crates/pile-value/src/extract/item/group.rs b/crates/pile-value/src/extract/item/group.rs new file mode 100644 index 0000000..294c2a2 --- /dev/null +++ b/crates/pile-value/src/extract/item/group.rs @@ -0,0 +1,56 @@ +use std::sync::Arc; + +use pile_config::Label; + +use crate::{ + extract::traits::{ExtractState, ObjectExtractor}, + value::{Item, PileValue}, +}; + +pub struct GroupExtractor { + item: Item, +} + +impl GroupExtractor { + pub fn new(item: &Item) -> Self { + Self { item: item.clone() } + } +} + +#[async_trait::async_trait] +impl ObjectExtractor for GroupExtractor { + async fn field( + &self, + _state: &ExtractState, + name: &Label, + args: Option<&str>, + ) -> Result, std::io::Error> { + if args.is_some() { + return Ok(None); + } + Ok(self + .item + .group() + .get(name) + .map(|item| PileValue::ObjectExtractor(Arc::new(super::ItemExtractor::new(item))))) + } + + async fn fields(&self) -> Result, std::io::Error> { + Ok(self.item.group().keys().cloned().collect()) + } + + async fn to_json(&self, _state: &ExtractState) -> Result { + Ok(serde_json::Value::Object( + self.item + .group() + .iter() + .map(|(k, v)| { + ( + k.to_string(), + serde_json::Value::String(format!("", v.key())), + ) + }) + .collect(), + )) + } +} diff --git a/crates/pile-value/src/extract/item/mod.rs b/crates/pile-value/src/extract/item/mod.rs index ffb2be3..907d9d3 100644 --- a/crates/pile-value/src/extract/item/mod.rs +++ b/crates/pile-value/src/extract/item/mod.rs @@ -25,8 +25,8 @@ mod toml; use pile_config::Label; pub use toml::*; -mod sidecar; -pub use sidecar::*; +mod group; +pub use group::*; use crate::{ extract::{ @@ -78,8 +78,8 @@ impl ItemExtractor { PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))), ), ( - Label::new("sidecar").unwrap(), - PileValue::ObjectExtractor(Arc::new(SidecarExtractor::new(item))), + Label::new("groups").unwrap(), + PileValue::ObjectExtractor(Arc::new(GroupExtractor::new(item))), ), ]), }; @@ -109,7 +109,8 @@ impl ObjectExtractor for ItemExtractor { Label::new("exif").unwrap(), Label::new("pdf").unwrap(), Label::new("json").unwrap(), - Label::new("sidecar").unwrap(), + Label::new("toml").unwrap(), + Label::new("groups").unwrap(), ]); } } diff --git a/crates/pile-value/src/extract/item/sidecar.rs b/crates/pile-value/src/extract/item/sidecar.rs deleted file mode 100644 index cdac3b2..0000000 --- a/crates/pile-value/src/extract/item/sidecar.rs +++ /dev/null @@ -1,57 +0,0 @@ -use pile_config::Label; -use std::sync::OnceLock; -use tracing::trace; - -use super::TomlExtractor; -use crate::{ - extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, -}; - -pub struct SidecarExtractor { - item: Item, - output: OnceLock>, -} - -impl SidecarExtractor { - pub fn new(item: &Item) -> Self { - Self { - item: item.clone(), - output: OnceLock::new(), - } - } -} - -#[async_trait::async_trait] -impl ObjectExtractor for SidecarExtractor { - async fn field( - &self, - state: &ExtractState, - name: &Label, - args: Option<&str>, - ) -> Result, std::io::Error> { - trace!( - ?args, - key = self.item.key().as_str(), - "Getting field {name:?} from SidecarExtractor", - ); - - match self - .output - .get_or_init(|| self.item.sidecar().map(TomlExtractor::new)) - { - Some(x) => Ok(x.field(state, name, args).await?), - None => Ok(Some(PileValue::Null)), - } - } - - async fn fields(&self) -> Result, std::io::Error> { - match self - .output - .get_or_init(|| self.item.sidecar().map(TomlExtractor::new)) - { - Some(x) => Ok(x.fields().await?), - None => Ok(Vec::new()), - } - } -} diff --git a/crates/pile-value/src/source/dir.rs b/crates/pile-value/src/source/dir.rs index fde1156..06e8505 100644 --- a/crates/pile-value/src/source/dir.rs +++ b/crates/pile-value/src/source/dir.rs @@ -1,143 +1,176 @@ use chrono::{DateTime, Utc}; -use pile_config::Label; -use std::{path::PathBuf, sync::Arc}; -use tokio_stream::wrappers::ReceiverStream; +use pile_config::{ + Label, + pattern::{GroupPattern, GroupSegment}, +}; +use smartstring::{LazyCompact, SmartString}; +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, + sync::{Arc, OnceLock}, +}; use walkdir::WalkDir; use crate::{ + extract::traits::ExtractState, source::{DataSource, misc::path_ts_latest}, - value::Item, + value::{Item, PileValue}, }; #[derive(Debug)] pub struct DirDataSource { pub name: Label, pub dir: PathBuf, - - pub sidecars: bool, + pub pattern: GroupPattern, + pub index: OnceLock, Item>>, } impl DirDataSource { - pub fn new(name: &Label, dir: PathBuf, sidecars: bool) -> Self { - Self { + pub async fn new( + name: &Label, + dir: PathBuf, + pattern: GroupPattern, + ) -> Result, std::io::Error> { + let source = Arc::new(Self { name: name.clone(), dir, - sidecars, + pattern, + index: OnceLock::new(), + }); + + // + // MARK: list paths + // + + let mut paths_items = HashSet::new(); + let mut paths_grouped_items = HashSet::new(); + 'entry: for entry in WalkDir::new(&source.dir) { + let entry = match entry { + Err(e) => { + let msg = format!("walkdir error: {e:?}"); + let err = e.into_io_error().unwrap_or(std::io::Error::other(msg)); + return Err(err); + } + Ok(e) => e, + }; + + if entry.file_type().is_dir() { + continue; + } + + let path = entry.into_path(); + let path_str = match path.to_str() { + Some(x) => x, + None => continue 'entry, + }; + + let groups = resolve_groups(&source.pattern, path_str).await; + paths_grouped_items.extend(groups.into_values()); + paths_items.insert(path); } + + // + // MARK: resolve groups + // + + let mut index = HashMap::new(); + 'entry: for path in paths_items.difference(&paths_grouped_items) { + let path_str = match path.to_str() { + Some(x) => x, + None => continue 'entry, + }; + + let group = resolve_groups(&source.pattern, path_str).await; + let group = group + .into_iter() + .map(|(k, group_path)| { + ( + k, + Box::new(Item::File { + source: Arc::clone(&source), + mime: mime_guess::from_path(&group_path).first_or_octet_stream(), + path: group_path.clone(), + group: Arc::new(HashMap::new()), + }), + ) + }) + .collect::>(); + + let item = Item::File { + source: Arc::clone(&source), + mime: mime_guess::from_path(path).first_or_octet_stream(), + path: path.into(), + group: Arc::new(group), + }; + + index.insert(item.key(), item); + } + + source.index.get_or_init(|| index); + Ok(source) } } impl DataSource for Arc { + #[expect(clippy::expect_used)] async fn get(&self, key: &str) -> Result, std::io::Error> { - let key = match key.parse::() { - Ok(x) => self.dir.join(x), - Err(_) => return Ok(None), - }; - - if !key.is_file() { - return Ok(None); - } - - // Ignore toml files if sidecars are enabled - if self.sidecars && key.extension().and_then(|x| x.to_str()) == Some("toml") { - return Ok(None); - } - - return Ok(Some(Item::File { - source: Arc::clone(self), - mime: mime_guess::from_path(&key).first_or_octet_stream(), - path: key.clone(), - sidecar: self - .sidecars - .then(|| { - let sidecar_path = key.with_extension("toml"); - sidecar_path.is_file().then(|| { - Box::new(Item::File { - source: Arc::clone(self), - mime: mime_guess::from_path(&sidecar_path).first_or_octet_stream(), - path: sidecar_path, - sidecar: None, - }) - }) - }) - .flatten(), - })); + return Ok(self + .index + .get() + .expect("index should be initialized") + .get(key) + .cloned()); } - fn iter(&self) -> ReceiverStream> { - let (tx, rx) = tokio::sync::mpsc::channel(64); - let source = Arc::clone(self); - - let dir = self.dir.clone(); - tokio::task::spawn_blocking(move || { - for entry in WalkDir::new(dir) { - let entry = match entry { - Err(e) => { - let msg = format!("walkdir error: {e:?}"); - let err = e.into_io_error().unwrap_or(std::io::Error::other(msg)); - if tx.blocking_send(Err(err)).is_err() { - return; - } - continue; - } - Ok(e) => e, - }; - - if entry.file_type().is_dir() { - continue; - } - - let path = entry.into_path(); - - let item = match path.extension().and_then(|x| x.to_str()) { - None => continue, - Some("toml") if source.sidecars => continue, - Some(_) => Item::File { - source: Arc::clone(&source), - mime: mime_guess::from_path(&path).first_or_octet_stream(), - path: path.clone(), - - sidecar: source - .sidecars - .then(|| { - let sidecar_path = path.with_extension("toml"); - sidecar_path.is_file().then(|| { - Box::new(Item::File { - source: Arc::clone(&source), - mime: mime_guess::from_path(&sidecar_path) - .first_or_octet_stream(), - path: sidecar_path, - sidecar: None, - }) - }) - }) - .flatten(), - }, - }; - - if tx.blocking_send(Ok(item)).is_err() { - return; - } - } - }); - - ReceiverStream::new(rx) + #[expect(clippy::expect_used)] + fn iter(&self) -> impl Iterator { + self.index + .get() + .expect("index should be initialized") + .values() } async fn latest_change(&self) -> Result>, std::io::Error> { - let mut ts: Option> = None; - - if !self.dir.exists() { - return Ok(None); - } - - let new = path_ts_latest(&self.dir)?; - match (ts, new) { - (_, None) => {} - (None, Some(new)) => ts = Some(new), - (Some(old), Some(new)) => ts = Some(old.max(new)), - }; - - return Ok(ts); + path_ts_latest(&self.dir) } } + +async fn resolve_groups(pattern: &GroupPattern, path_str: &str) -> HashMap { + let state = ExtractState { ignore_mime: false }; + let mut group = HashMap::new(); + 'pattern: for (l, pat) in &pattern.pattern { + let item = PileValue::String(Arc::new(path_str.into())); + let mut target = String::new(); + for p in pat { + match p { + GroupSegment::Literal(x) => target.push_str(x), + GroupSegment::Path(op) => { + let res = match item.query(&state, op).await { + Ok(Some(x)) => x, + _ => continue 'pattern, + }; + + let res = match res.as_str() { + Some(x) => x, + None => continue 'pattern, + }; + + target.push_str(res); + } + } + } + + let group_path: PathBuf = match target.parse() { + Ok(x) => x, + Err(_) => continue 'pattern, + }; + + if !group_path.exists() { + continue; + } + + group.insert(l.clone(), group_path); + } + + return group; +} diff --git a/crates/pile-value/src/source/mod.rs b/crates/pile-value/src/source/mod.rs index bf89bfd..86894d3 100644 --- a/crates/pile-value/src/source/mod.rs +++ b/crates/pile-value/src/source/mod.rs @@ -6,9 +6,6 @@ pub use s3::*; pub mod misc; -use chrono::{DateTime, Utc}; -use tokio_stream::wrappers::ReceiverStream; - /// A read-only set of [Item]s. pub trait DataSource { /// Get an item from this datasource @@ -18,10 +15,10 @@ pub trait DataSource { ) -> impl Future, std::io::Error>> + Send; /// Iterate over all items in this source in an arbitrary order - fn iter(&self) -> ReceiverStream>; + fn iter(&self) -> impl Iterator; /// Return the time of the latest change to the data in this source fn latest_change( &self, - ) -> impl Future>, std::io::Error>> + Send; + ) -> impl Future>, std::io::Error>> + Send; } diff --git a/crates/pile-value/src/source/s3.rs b/crates/pile-value/src/source/s3.rs index b4eaf55..613b77f 100644 --- a/crates/pile-value/src/source/s3.rs +++ b/crates/pile-value/src/source/s3.rs @@ -1,31 +1,41 @@ use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region}; use chrono::{DateTime, Utc}; -use pile_config::{Label, S3Credentials}; +use pile_config::{ + Label, S3Credentials, + pattern::{GroupPattern, GroupSegment}, +}; use smartstring::{LazyCompact, SmartString}; -use std::sync::Arc; -use tokio_stream::wrappers::ReceiverStream; +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, OnceLock}, +}; -use crate::{source::DataSource, value::Item}; +use crate::{ + extract::traits::ExtractState, + source::DataSource, + value::{Item, PileValue}, +}; #[derive(Debug)] pub struct S3DataSource { pub name: Label, pub bucket: SmartString, pub prefix: Option>, - pub sidecars: bool, pub client: Arc, + pub pattern: GroupPattern, + pub index: OnceLock, Item>>, } impl S3DataSource { - pub fn new( + pub async fn new( name: &Label, bucket: String, prefix: Option, endpoint: Option, region: String, credentials: &S3Credentials, - sidecars: bool, - ) -> Result { + pattern: GroupPattern, + ) -> Result, std::io::Error> { let client = { let creds = Credentials::new( &credentials.access_key_id, @@ -47,174 +57,118 @@ impl S3DataSource { aws_sdk_s3::Client::from_conf(s3_config.build()) }; - Ok(Self { + let source = Arc::new(Self { name: name.clone(), bucket: bucket.into(), prefix: prefix.map(|x| x.into()), - sidecars, client: Arc::new(client), - }) - } + pattern, + index: OnceLock::new(), + }); - async fn find_sidecar_key(&self, key: &str) -> Option> { - // First try {key}.toml - let full_toml = format!("{key}.toml"); - if self - .client - .head_object() - .bucket(self.bucket.as_str()) - .key(&full_toml) - .send() - .await - .is_ok() - { - return Some(full_toml.into()); - } + // + // MARK: list keys + // - // Then try {key-with-extension-stripped}.toml - let stripped = std::path::Path::new(key).with_extension("toml"); - if let Some(stripped_str) = stripped.to_str() - && stripped_str != full_toml.as_str() - && self + let mut all_keys: HashSet> = HashSet::new(); + let mut continuation_token: Option = None; + + loop { + let mut req = source .client - .head_object() - .bucket(self.bucket.as_str()) - .key(stripped_str) - .send() - .await - .is_ok() - { - return Some(stripped_str.into()); + .list_objects_v2() + .bucket(source.bucket.as_str()); + + if let Some(prefix) = &source.prefix { + req = req.prefix(prefix.as_str()); + } + + if let Some(token) = continuation_token { + req = req.continuation_token(token); + } + + let resp = req.send().await.map_err(std::io::Error::other)?; + + let next_token = resp.next_continuation_token().map(ToOwned::to_owned); + let is_truncated = resp.is_truncated().unwrap_or(false); + + for obj in resp.contents() { + let Some(full_key) = obj.key() else { continue }; + let key = strip_prefix(full_key, source.prefix.as_deref()); + all_keys.insert(key.into()); + } + + if !is_truncated { + break; + } + continuation_token = next_token; } - None - } + // + // MARK: resolve groups + // - async fn make_item(self: &Arc, key: impl Into>) -> Item { - let key: SmartString = key.into(); - let object_path = match &self.prefix { - Some(x) => format!("{x}/{key}").into(), - None => key.clone(), - }; + let mut keys_grouped: HashSet> = HashSet::new(); + for key in &all_keys { + let groups = resolve_groups(&source.pattern, key).await; + for group_key in groups.into_values() { + if all_keys.contains(&group_key) { + keys_grouped.insert(group_key); + } + } + } - let mime = mime_guess::from_path(object_path.as_str()).first_or_octet_stream(); - - let sidecar = if self.sidecars { - self.find_sidecar_key(object_path.as_str()) - .await - .map(|sidecar_key| { - Box::new(Item::S3 { - source: Arc::clone(self), - mime: mime_guess::from_path(sidecar_key.as_str()).first_or_octet_stream(), - key: sidecar_key, - sidecar: None, - }) + let mut index = HashMap::new(); + for key in all_keys.difference(&keys_grouped) { + let groups = resolve_groups(&source.pattern, key).await; + let group = groups + .into_iter() + .filter(|(_, gk)| all_keys.contains(gk)) + .map(|(label, gk)| { + ( + label, + Box::new(Item::S3 { + source: Arc::clone(&source), + mime: mime_guess::from_path(gk.as_str()).first_or_octet_stream(), + key: gk, + group: Arc::new(HashMap::new()), + }), + ) }) - } else { - None - }; + .collect::>(); - Item::S3 { - source: Arc::clone(self), - mime, - key, - sidecar, + let item = Item::S3 { + source: Arc::clone(&source), + mime: mime_guess::from_path(key.as_str()).first_or_octet_stream(), + key: key.clone(), + group: Arc::new(group), + }; + + index.insert(item.key(), item); } + + source.index.get_or_init(|| index); + Ok(source) } } impl DataSource for Arc { + #[expect(clippy::expect_used)] async fn get(&self, key: &str) -> Result, std::io::Error> { - if self.sidecars && key.ends_with(".toml") { - return Ok(None); - } - - let key: SmartString = key.into(); - let key = match &self.prefix { - Some(x) => format!("{x}/{key}").into(), - None => key, - }; - - let result = self - .client - .head_object() - .bucket(self.bucket.as_str()) - .key(key.as_str()) - .send() - .await; - - match result { - Err(sdk_err) => { - let not_found = sdk_err - .as_service_error() - .map(|e| e.is_not_found()) - .unwrap_or(false); - if not_found { - return Ok(None); - } - Err(std::io::Error::other(sdk_err)) - } - Ok(_) => Ok(Some(self.make_item(key).await)), - } + return Ok(self + .index + .get() + .expect("index should be initialized") + .get(key) + .cloned()); } - fn iter(&self) -> ReceiverStream> { - let (tx, rx) = tokio::sync::mpsc::channel(64); - let source = Arc::clone(self); - - tokio::spawn(async move { - let mut continuation_token: Option = None; - - loop { - let mut req = source - .client - .list_objects_v2() - .bucket(source.bucket.as_str()); - - if let Some(prefix) = &source.prefix { - req = req.prefix(prefix.as_str()); - } - - if let Some(token) = continuation_token { - req = req.continuation_token(token); - } - - let resp = match req.send().await { - Err(e) => { - let _ = tx.send(Err(std::io::Error::other(e))).await; - break; - } - Ok(resp) => resp, - }; - - let next_token = resp.next_continuation_token().map(ToOwned::to_owned); - let is_truncated = resp.is_truncated().unwrap_or(false); - - for obj in resp.contents() { - let key = match obj.key() { - Some(k) => k.to_owned(), - None => continue, - }; - - if source.sidecars && key.ends_with(".toml") { - continue; - } - - let item = source.make_item(key).await; - - if tx.send(Ok(item)).await.is_err() { - return; - } - } - - if !is_truncated { - break; - } - continuation_token = next_token; - } - }); - - ReceiverStream::new(rx) + #[expect(clippy::expect_used)] + fn iter(&self) -> impl Iterator { + self.index + .get() + .expect("index should be initialized") + .values() } async fn latest_change(&self) -> Result>, std::io::Error> { @@ -264,3 +218,51 @@ impl DataSource for Arc { Ok(ts) } } + +fn strip_prefix<'a>(key: &'a str, prefix: Option<&str>) -> &'a str { + match prefix { + None => key, + Some(p) => { + let with_slash = if p.ends_with('/') { + key.strip_prefix(p) + } else { + key.strip_prefix(&format!("{p}/")) + }; + with_slash.unwrap_or(key) + } + } +} + +async fn resolve_groups( + pattern: &GroupPattern, + key: &str, +) -> HashMap> { + let state = ExtractState { ignore_mime: false }; + let mut group = HashMap::new(); + 'pattern: for (l, pat) in &pattern.pattern { + let item = PileValue::String(Arc::new(key.into())); + let mut target = String::new(); + for p in pat { + match p { + GroupSegment::Literal(x) => target.push_str(x), + GroupSegment::Path(op) => { + let res = match item.query(&state, op).await { + Ok(Some(x)) => x, + _ => continue 'pattern, + }; + + let res = match res.as_str() { + Some(x) => x, + None => continue 'pattern, + }; + + target.push_str(res); + } + } + } + + group.insert(l.clone(), target.into()); + } + + return group; +} diff --git a/crates/pile-value/src/value/item.rs b/crates/pile-value/src/value/item.rs index d4398c3..0087e69 100644 --- a/crates/pile-value/src/value/item.rs +++ b/crates/pile-value/src/value/item.rs @@ -1,6 +1,7 @@ use mime::Mime; +use pile_config::Label; use smartstring::{LazyCompact, SmartString}; -use std::{fs::File, path::PathBuf, sync::Arc}; +use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc}; use crate::{ source::{DirDataSource, S3DataSource}, @@ -19,7 +20,7 @@ pub enum Item { mime: Mime, path: PathBuf, - sidecar: Option>, + group: Arc>>, }, S3 { @@ -27,7 +28,7 @@ pub enum Item { mime: Mime, key: SmartString, - sidecar: Option>, + group: Arc>>, }, } @@ -71,7 +72,12 @@ impl Item { #[expect(clippy::expect_used)] pub fn key(&self) -> SmartString { match self { - Self::File { path, .. } => path.to_str().expect("path is not utf-8").into(), + Self::File { source, path, .. } => path + .strip_prefix(&source.dir) + .unwrap() + .to_str() + .expect("path is not utf-8") + .into(), Self::S3 { key, .. } => key.clone(), } } @@ -96,10 +102,10 @@ impl Item { } } - pub fn sidecar(&self) -> Option<&Self> { + pub fn group(&self) -> &HashMap> { match self { - Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x), - Self::S3 { sidecar, .. } => sidecar.as_ref().map(|x| &**x), + Self::File { group, .. } => group, + Self::S3 { group, .. } => group, } } } diff --git a/crates/pile/Cargo.toml b/crates/pile/Cargo.toml index 10f26cc..d842fa6 100644 --- a/crates/pile/Cargo.toml +++ b/crates/pile/Cargo.toml @@ -17,7 +17,6 @@ aws-sdk-s3 = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } tokio = { workspace = true } -tokio-stream = { workspace = true } clap = { workspace = true } #clap_complete = { workspace = true } serde = { workspace = true } diff --git a/crates/pile/src/command/annotate.rs b/crates/pile/src/command/annotate.rs deleted file mode 100644 index 9f95fb5..0000000 --- a/crates/pile/src/command/annotate.rs +++ /dev/null @@ -1,108 +0,0 @@ -use anyhow::{Context, Result}; -use clap::Args; -use pile_config::{Label, Source}; -use pile_dataset::{Datasets, index::DbFtsIndex}; -use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; -use pile_value::{ - extract::traits::ExtractState, - source::{DataSource, DirDataSource}, - value::{Item, PileValue}, -}; -use std::{path::PathBuf, sync::Arc}; -use tokio_stream::StreamExt; -use tracing::{info, warn}; - -use crate::{CliCmd, GlobalContext}; - -#[derive(Debug, Args)] -pub struct AnnotateCommand { - /// The schema field to read (must be defined in pile.toml) - field: String, - - /// Sidecar path to write to (e.g. meta.title) - dest: String, - - /// Path to dataset config - #[arg(long, short = 'c', default_value = "./pile.toml")] - config: PathBuf, -} - -impl AnnotateCommand { - fn parse_dest(dest: &str) -> Result> { - dest.split('.') - .map(|s| { - Label::new(s).ok_or_else(|| anyhow::anyhow!("invalid label {s:?} in dest path")) - }) - .collect() - } -} - -impl CliCmd for AnnotateCommand { - async fn run( - self, - _ctx: GlobalContext, - _flag: CancelFlag, - ) -> Result> { - let field = Label::new(&self.field) - .ok_or_else(|| anyhow::anyhow!("invalid field name {:?}", self.field))?; - let dest_path = Self::parse_dest(&self.dest)?; - - let state = ExtractState { ignore_mime: false }; - let ds = Datasets::open(&self.config) - .with_context(|| format!("while opening dataset for {}", self.config.display()))?; - - if !ds.config.schema.contains_key(&field) { - return Err(anyhow::anyhow!("field {:?} is not defined in schema", self.field).into()); - } - - let index = DbFtsIndex::new(&ds.path_workdir, &ds.config); - let count = 0u64; - - for (name, source) in &ds.config.dataset.source { - match source { - Source::Filesystem { path, sidecars, .. } => { - if !sidecars { - warn!("Source {name} does not have sidecars enabled, skipping"); - continue; - } - - let source = Arc::new(DirDataSource::new(name, path.clone(), *sidecars)); - - let mut stream = source.iter(); - while let Some(res) = stream.next().await { - let item = res.with_context(|| format!("while reading source {name}"))?; - - let Item::File { path, .. } = &item else { - continue; - }; - - let item = PileValue::Item(item.clone()); - let vals = - index - .get_field(&state, &item, &field) - .await - .with_context(|| { - format!("while extracting field from {}", path.display()) - })?; - - // TODO: implement sidecar writing - let _ = (&dest_path, &vals); - todo!("write_sidecar not yet implemented"); - - #[expect(unreachable_code)] - { - count += 1; - } - } - } - Source::S3 { .. } => { - warn!("Source {name} is an S3 source; sidecar annotation is not yet supported"); - } - } - } - - info!("Annotated {count} items"); - - return Ok(0); - } -} diff --git a/crates/pile/src/command/check.rs b/crates/pile/src/command/check.rs index 11de82e..6d71d9f 100644 --- a/crates/pile/src/command/check.rs +++ b/crates/pile/src/command/check.rs @@ -44,6 +44,7 @@ impl CliCmd for CheckCommand { } let ds = Datasets::open(&self.config) + .await .with_context(|| format!("while opening dataset for {}", self.config.display()))?; let ts_fts = ds.ts_fts().context("while determining fts age")?; diff --git a/crates/pile/src/command/fields.rs b/crates/pile/src/command/fields.rs index 0938fa3..3ff61d0 100644 --- a/crates/pile/src/command/fields.rs +++ b/crates/pile/src/command/fields.rs @@ -6,7 +6,6 @@ use pile_value::{extract::traits::ExtractState, value::PileValue}; use serde_json::{Map, Value}; use std::{path::PathBuf, time::Instant}; use tokio::task::JoinSet; -use tokio_stream::StreamExt; use tracing::info; use crate::{CliCmd, GlobalContext}; @@ -55,6 +54,7 @@ impl CliCmd for FieldsCommand { flag: CancelFlag, ) -> Result> { let ds = Datasets::open(&self.config) + .await .with_context(|| format!("while opening dataset for {}", self.config.display()))?; let start = Instant::now(); @@ -88,11 +88,10 @@ impl CliCmd for FieldsCommand { return Err(CancelableTaskError::Cancelled); } - match stream.next().await { + match stream.next() { None => break, - Some(item_result) => { - let item = - item_result.with_context(|| format!("while reading source {name}"))?; + Some(item) => { + let item = item.clone(); let name = name.clone(); let state = state.clone(); join_set.spawn(async move { diff --git a/crates/pile/src/command/index.rs b/crates/pile/src/command/index.rs index 61d9272..9e53070 100644 --- a/crates/pile/src/command/index.rs +++ b/crates/pile/src/command/index.rs @@ -25,6 +25,7 @@ impl CliCmd for IndexCommand { flag: CancelFlag, ) -> Result> { let ds = Datasets::open(&self.config) + .await .with_context(|| format!("while opening dataset for {}", self.config.display()))?; let state = ExtractState { ignore_mime: false }; diff --git a/crates/pile/src/command/list.rs b/crates/pile/src/command/list.rs index 7e8bd22..b2943a1 100644 --- a/crates/pile/src/command/list.rs +++ b/crates/pile/src/command/list.rs @@ -6,7 +6,6 @@ use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::{extract::traits::ExtractState, value::PileValue}; use std::{path::PathBuf, str::FromStr, sync::Arc}; use tokio::task::JoinSet; -use tokio_stream::StreamExt; use tracing::info; use crate::{CliCmd, GlobalContext}; @@ -14,6 +13,7 @@ use crate::{CliCmd, GlobalContext}; #[derive(Debug, Args)] pub struct ListCommand { /// Path to query, e.g. $.flac.artist + #[clap(default_value = "$")] path: String, /// Only print items where the value is null (inverse of default) @@ -45,6 +45,7 @@ impl CliCmd for ListCommand { let path = Arc::new(path); let ds = Datasets::open(&self.config) + .await .with_context(|| format!("while opening dataset for {}", self.config.display()))?; let jobs = self.jobs.max(1); @@ -70,11 +71,10 @@ impl CliCmd for ListCommand { return Err(CancelableTaskError::Cancelled); } - match stream.next().await { + match stream.next() { None => break, - Some(item_result) => { - let item = - item_result.with_context(|| format!("while reading source {name}"))?; + Some(item) => { + let item = item.clone(); let source_name = name.to_string(); let key = item.key().to_string(); let path = path.clone(); diff --git a/crates/pile/src/command/lookup.rs b/crates/pile/src/command/lookup.rs index 413d91d..e2e341a 100644 --- a/crates/pile/src/command/lookup.rs +++ b/crates/pile/src/command/lookup.rs @@ -41,6 +41,7 @@ impl CliCmd for LookupCommand { flag: CancelFlag, ) -> Result> { let ds = Datasets::open(&self.config) + .await .with_context(|| format!("while opening dataset for {}", self.config.display()))?; let state = ExtractState { ignore_mime: false }; diff --git a/crates/pile/src/command/mod.rs b/crates/pile/src/command/mod.rs index 050ff24..586519d 100644 --- a/crates/pile/src/command/mod.rs +++ b/crates/pile/src/command/mod.rs @@ -4,7 +4,6 @@ use pile_toolbox::cancelabletask::{ CancelFlag, CancelableTask, CancelableTaskError, CancelableTaskResult, }; -mod annotate; mod check; mod fields; mod index; @@ -23,12 +22,6 @@ pub enum SubCommand { #[clap(alias = "doc")] Docs {}, - /// Annotate all items with a field, writing it to a sidecar path - Annotate { - #[command(flatten)] - cmd: annotate::AnnotateCommand, - }, - /// Create an empty dataset Init { #[command(flatten)] @@ -88,7 +81,6 @@ pub enum SubCommand { impl CliCmdDispatch for SubCommand { fn start(self, ctx: GlobalContext) -> Result>> { match self { - Self::Annotate { cmd } => cmd.start(ctx), Self::Init { cmd } => cmd.start(ctx), Self::Check { cmd } => cmd.start(ctx), Self::Index { cmd } => cmd.start(ctx), diff --git a/crates/pile/src/command/probe.rs b/crates/pile/src/command/probe.rs index a42272b..b29ea00 100644 --- a/crates/pile/src/command/probe.rs +++ b/crates/pile/src/command/probe.rs @@ -37,6 +37,7 @@ impl CliCmd for ProbeCommand { .ok_or_else(|| anyhow::anyhow!("invalid source name {:?}", self.source))?; let ds = Datasets::open(&self.config) + .await .with_context(|| format!("while opening dataset for {}", self.config.display()))?; let state = ExtractState { ignore_mime: false }; diff --git a/crates/pile/src/command/serve.rs b/crates/pile/src/command/serve.rs index 1886ed8..4c14c92 100644 --- a/crates/pile/src/command/serve.rs +++ b/crates/pile/src/command/serve.rs @@ -34,6 +34,7 @@ impl CliCmd for ServeCommand { flag: CancelFlag, ) -> Result> { let ds = Datasets::open(&self.config) + .await .with_context(|| format!("while opening dataset for {}", self.config.display()))?; let state = ExtractState { ignore_mime: false }; diff --git a/crates/pile/src/command/upload.rs b/crates/pile/src/command/upload.rs index d4d97dd..1211171 100644 --- a/crates/pile/src/command/upload.rs +++ b/crates/pile/src/command/upload.rs @@ -8,7 +8,6 @@ use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::source::{DataSource, DirDataSource, S3DataSource}; use std::{path::PathBuf, sync::Arc, time::Duration}; use tokio::task::JoinSet; -use tokio_stream::StreamExt; use tracing::info; use crate::{CliCmd, GlobalContext, cli::progress_big}; @@ -52,6 +51,7 @@ impl CliCmd for UploadCommand { flag: CancelFlag, ) -> Result> { let ds = Datasets::open(&self.config) + .await .with_context(|| format!("while opening dataset for {}", self.config.display()))?; let dir_label = Label::new(&self.dir_source) @@ -104,20 +104,12 @@ impl CliCmd for UploadCommand { } // Count total files before uploading so we can show accurate progress - let total = { - let mut count = 0u64; - let mut count_stream = Arc::clone(&dir_ds).iter(); - while let Some(result) = count_stream.next().await { - result.context("while counting filesystem source")?; - count += 1; - } - count - }; + let total = dir_ds.iter().count() as u64; // Walk filesystem source and upload files in parallel let jobs = self.jobs.max(1); let mut uploaded: u64 = 0; - let mut stream = Arc::clone(&dir_ds).iter(); + let mut stream = dir_ds.iter(); let mut join_set: JoinSet> = JoinSet::new(); let pb = ctx.mp.add(ProgressBar::new(total)); @@ -146,24 +138,13 @@ impl CliCmd for UploadCommand { return Err(CancelableTaskError::Cancelled); } - let item = match stream.next().await { + let item = match stream.next() { None => break, - Some(Err(e)) => { - return Err(anyhow::Error::from(e) - .context("while iterating filesystem source") - .into()); - } - Some(Ok(item)) => item, + Some(item) => item.clone(), }; - let item_path = PathBuf::from(item.key().as_str()); - let relative = item_path.strip_prefix(&dir_ds.dir).with_context(|| { - format!("path '{}' is not under source root", item_path.display()) - })?; - let relative_str = relative - .to_str() - .ok_or_else(|| anyhow::anyhow!("non-UTF-8 path: {}", item_path.display()))? - .to_owned(); + let relative_str = item.key().to_string(); + let item_path = dir_ds.dir.join(&relative_str); let key = format!("{full_prefix}/{relative_str}"); let mime = item.mime().to_string();