From f466e1c1e38bdfa5f3cd242a8a728ca5136c0511 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Sun, 22 Feb 2026 09:23:57 -0800 Subject: [PATCH] lazily-evaluated extractors --- Cargo.lock | 123 +--------------- Cargo.toml | 1 - crates/pile-config/Cargo.toml | 2 - crates/pile-config/src/post.rs | 103 ------------- crates/pile-dataset/Cargo.toml | 2 - crates/pile-dataset/src/dataset.rs | 22 ++- crates/pile-dataset/src/extract/flac.rs | 70 +++++++++ crates/pile-dataset/src/extract/fs.rs | 80 +++++++++++ crates/pile-dataset/src/extract/map.rs | 21 +++ crates/pile-dataset/src/extract/mod.rs | 136 ++++++++++++++++++ crates/pile-dataset/src/index/index_fts.rs | 159 +++++++++++++++------ crates/pile-dataset/src/item.rs | 62 ++++++++ crates/pile-dataset/src/item/flac.rs | 66 --------- crates/pile-dataset/src/item/mod.rs | 2 - crates/pile-dataset/src/lib.rs | 5 +- crates/pile-dataset/src/source/dir.rs | 6 +- crates/pile-dataset/src/traits.rs | 38 +---- 17 files changed, 512 insertions(+), 386 deletions(-) create mode 100644 crates/pile-dataset/src/extract/flac.rs create mode 100644 crates/pile-dataset/src/extract/fs.rs create mode 100644 crates/pile-dataset/src/extract/map.rs create mode 100644 crates/pile-dataset/src/extract/mod.rs create mode 100644 crates/pile-dataset/src/item.rs delete mode 100644 crates/pile-dataset/src/item/flac.rs delete mode 100644 crates/pile-dataset/src/item/mod.rs diff --git a/Cargo.lock b/Cargo.lock index ef263e9..009d46d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,15 +135,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "block-buffer" -version = "0.10.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" -dependencies = [ - "generic-array", -] - [[package]] name = "block-buffer" version = "0.11.0" @@ -362,16 +353,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" -[[package]] -name = "crypto-common" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" -dependencies = [ - "generic-array", - "typenum", -] - [[package]] name = "crypto-common" version = "0.2.0" @@ -425,25 +406,15 @@ dependencies = [ "serde_core", ] -[[package]] -name = "digest" -version = "0.10.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" -dependencies = [ - "block-buffer 0.10.4", - "crypto-common 0.1.7", -] - [[package]] name = "digest" version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8bf3682cdec91817be507e4aa104314898b95b84d74f3d43882210101a545b6" dependencies = [ - "block-buffer 0.11.0", + "block-buffer", "const-oid", - "crypto-common 0.2.0", + "crypto-common", ] [[package]] @@ -556,16 +527,6 @@ dependencies = [ "slab", ] -[[package]] -name = "generic-array" -version = "0.14.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" -dependencies = [ - "typenum", - "version_check", -] - [[package]] name = "getrandom" version = "0.2.17" @@ -753,19 +714,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "jsonpath-rust" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "633a7320c4bb672863a3782e89b9094ad70285e097ff6832cddd0ec615beadfa" -dependencies = [ - "pest", - "pest_derive", - "regex", - "serde_json", - "thiserror", -] - [[package]] name = "lazy_static" version = "1.5.0" @@ -985,49 +933,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" -[[package]] -name = "pest" -version = "2.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0848c601009d37dfa3430c4666e147e49cdcf1b92ecd3e63657d8a5f19da662" -dependencies = [ - "memchr", - "ucd-trie", -] - -[[package]] -name = "pest_derive" -version = "2.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11f486f1ea21e6c10ed15d5a7c77165d0ee443402f0780849d1768e7d9d6fe77" -dependencies = [ - "pest", - "pest_generator", -] - -[[package]] -name = "pest_generator" -version = "2.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8040c4647b13b210a963c1ed407c1ff4fdfa01c31d6d2a098218702e6664f94f" -dependencies = [ - "pest", - "pest_meta", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "pest_meta" -version = "2.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89815c69d36021a140146f26659a81d6c2afa33d216d736dd4be5381a7362220" -dependencies = [ - "pest", - "sha2 0.10.9", -] - [[package]] name = "pile" version = "0.0.1" @@ -1052,9 +957,7 @@ dependencies = [ name = "pile-config" version = "0.0.1" dependencies = [ - "itertools", "serde", - "serde_json", "smartstring", "toml", ] @@ -1065,7 +968,6 @@ version = "0.0.1" dependencies = [ "chrono", "itertools", - "jsonpath-rust", "pile-config", "pile-flac", "pile-toolbox", @@ -1086,7 +988,7 @@ dependencies = [ "itertools", "mime", "paste", - "sha2 0.11.0-rc.5", + "sha2", "smartstring", "strum", "thiserror", @@ -1373,17 +1275,6 @@ dependencies = [ "serde_core", ] -[[package]] -name = "sha2" -version = "0.10.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest 0.10.7", -] - [[package]] name = "sha2" version = "0.11.0-rc.5" @@ -1392,7 +1283,7 @@ checksum = "7c5f3b1e2dc8aad28310d8410bd4d7e180eca65fca176c52ab00d364475d0024" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.11.0", + "digest", ] [[package]] @@ -1900,12 +1791,6 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" -[[package]] -name = "ucd-trie" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" - [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/Cargo.toml b/Cargo.toml index 2408663..9e6ea34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,6 @@ serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" base64 = "0.22.1" toml = "1.0.3" -jsonpath-rust = "1.0.4" sha2 = "0.11.0-rc.5" # Misc helpers diff --git a/crates/pile-config/Cargo.toml b/crates/pile-config/Cargo.toml index 67b39ac..d2e66ec 100644 --- a/crates/pile-config/Cargo.toml +++ b/crates/pile-config/Cargo.toml @@ -9,8 +9,6 @@ workspace = true [dependencies] serde = { workspace = true } -itertools = { workspace = true } -serde_json = { workspace = true } smartstring = { workspace = true } [dev-dependencies] diff --git a/crates/pile-config/src/post.rs b/crates/pile-config/src/post.rs index 326acb1..aea31be 100644 --- a/crates/pile-config/src/post.rs +++ b/crates/pile-config/src/post.rs @@ -1,6 +1,4 @@ -use itertools::Itertools; use serde::Deserialize; -use serde_json::Value; #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] #[serde(untagged)] @@ -12,107 +10,6 @@ pub enum FieldSpecPost { NotEmpty { notempty: bool }, } -impl FieldSpecPost { - pub fn apply(&self, val: &Value) -> Option { - Some(match self { - Self::NotEmpty { notempty: false } => val.clone(), - Self::NotEmpty { notempty: true } => match val { - Value::Null => return None, - Value::String(x) if x.is_empty() => return None, - Value::Array(x) if x.is_empty() => return None, - x => x.clone(), - }, - - Self::SetCase { case: Case::Lower } => match val { - Value::Null => return None, - Value::Bool(_) | Value::Number(_) => val.clone(), - Value::String(x) => Value::String(x.to_lowercase()), - - Value::Array(x) => { - Value::Array(x.iter().map(|x| self.apply(x)).collect::>()?) - } - - Value::Object(x) => Value::Object( - x.iter() - .map(|x| (x.0.to_lowercase(), self.apply(x.1))) - .map(|x| x.1.map(|y| (x.0, y))) - .collect::>()?, - ), - }, - - Self::SetCase { case: Case::Upper } => match val { - Value::Null => return None, - Value::Bool(_) | Value::Number(_) => val.clone(), - Value::String(x) => Value::String(x.to_uppercase()), - - Value::Array(x) => { - Value::Array(x.iter().map(|x| self.apply(x)).collect::>()?) - } - - Value::Object(x) => Value::Object( - x.iter() - .map(|x| (x.0.to_uppercase(), self.apply(x.1))) - .map(|x| x.1.map(|y| (x.0, y))) - .collect::>()?, - ), - }, - - Self::TrimSuffix { trim_suffix } => match val { - Value::Null => return None, - Value::Bool(_) | Value::Number(_) => Value::String(val.to_string()), - - Value::String(x) => { - Value::String(x.strip_suffix(trim_suffix).unwrap_or(x).to_owned()) - } - - Value::Array(x) => { - Value::Array(x.iter().map(|x| self.apply(x)).collect::>()?) - } - - Value::Object(x) => Value::Object( - x.iter() - .map(|x| { - ( - x.0.strip_suffix(trim_suffix).unwrap_or(x.0).to_owned(), - self.apply(x.1), - ) - }) - .map(|x| x.1.map(|y| (x.0, y))) - .collect::>()?, - ), - }, - - Self::TrimPrefix { trim_prefix } => match val { - Value::Null => return None, - Value::Object(_) => return None, - Value::Bool(_) | Value::Number(_) => Value::String(val.to_string()), - - Value::String(x) => { - Value::String(x.strip_prefix(trim_prefix).unwrap_or(x).to_owned()) - } - - Value::Array(x) => { - Value::Array(x.iter().map(|x| self.apply(x)).collect::>()?) - } - }, - - Self::Join { join } => match val { - Value::Null => return None, - Value::Object(_) => return None, - Value::Bool(_) | Value::Number(_) => Value::String(val.to_string()), - Value::String(x) => Value::String(x.clone()), - Value::Array(x) => Value::String( - x.iter() - .map(|x| self.apply(x)) - .collect::>>()? - .into_iter() - .join(join), - ), - }, - }) - } -} - #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum Case { diff --git a/crates/pile-dataset/Cargo.toml b/crates/pile-dataset/Cargo.toml index 72039d1..fb5115d 100644 --- a/crates/pile-dataset/Cargo.toml +++ b/crates/pile-dataset/Cargo.toml @@ -12,13 +12,11 @@ pile-config = { workspace = true } pile-toolbox = { workspace = true } pile-flac = { workspace = true } - serde_json = { workspace = true } itertools = { workspace = true } walkdir = { workspace = true } tantivy = { workspace = true } tracing = { workspace = true } -jsonpath-rust = { workspace = true } chrono = { workspace = true } toml = { workspace = true } thiserror = { workspace = true } diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index 35e5378..de3206e 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -8,7 +8,11 @@ use rayon::{ use std::{ io::ErrorKind, path::PathBuf, - sync::{Arc, mpsc::Receiver}, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + mpsc::Receiver, + }, thread::JoinHandle, time::Instant, }; @@ -145,14 +149,13 @@ impl Dataset { let mut total = 0u64; while let Ok(batch) = read_rx.recv() { let batch = batch.map_err(DatasetError::from)?; - let len = batch.len() as u64; - if let Some(flag) = &flag && flag.is_cancelled() { return Err(CancelableTaskError::Cancelled); } + let this = AtomicU64::new(0); let start = Instant::now(); write_pool .install(|| { @@ -170,6 +173,7 @@ impl Dataset { } }) .map(|(key, doc)| { + this.fetch_add(1, Ordering::Relaxed); index_writer .add_document(doc) .map_err(|err| (key, err)) @@ -180,9 +184,10 @@ impl Dataset { }) .map_err(|(_key, err)| DatasetError::from(err))?; - total += len; + let this = this.load(Ordering::Relaxed); + total += this; let time_ms = start.elapsed().as_millis(); - debug!("Added a batch of {len} in {time_ms} ms ({total} total)"); + debug!("Added a batch of {this} in {time_ms} ms ({total} total)"); } if let Some(flag) = flag.as_ref() @@ -334,6 +339,13 @@ fn start_read_task( } } } + + if !batch.is_empty() { + match read_tx.send(Ok(batch)) { + Ok(()) => {} + Err(_) => return, + }; + } }); return (read_task, read_rx); diff --git a/crates/pile-dataset/src/extract/flac.rs b/crates/pile-dataset/src/extract/flac.rs new file mode 100644 index 0000000..6b8c938 --- /dev/null +++ b/crates/pile-dataset/src/extract/flac.rs @@ -0,0 +1,70 @@ +use pile_config::Label; +use pile_flac::{FlacBlock, FlacReader}; +use std::{collections::HashMap, fs::File, io::BufReader, sync::OnceLock}; + +use crate::{ + FileItem, + extract::{Extractor, PileValue}, +}; + +pub struct FlacExtractor<'a> { + item: &'a FileItem, + output: OnceLock>>, +} + +impl<'a> FlacExtractor<'a> { + pub fn new(item: &'a FileItem) -> Self { + Self { + item, + output: OnceLock::new(), + } + } + + fn get_inner(&self) -> Result<&HashMap>, std::io::Error> { + if let Some(x) = self.output.get() { + return Ok(x); + } + + let file = File::open(&self.item.path)?; + let reader = FlacReader::new(BufReader::new(file)); + + let mut output: HashMap> = HashMap::new(); + for block in reader { + if let FlacBlock::VorbisComment(comment) = block.unwrap() { + for (k, v) in comment.comment.comments { + match Label::new(k.to_string().to_lowercase()) { + Some(k) => output + .entry(k) + .or_default() + .push(PileValue::String(v.to_string())), + None => continue, + } + } + + // We should only have one comment block, + // stop reading when we find it + break; + } + } + + let output = output + .into_iter() + .map(|(k, v)| (k, PileValue::Array(v))) + .collect(); + + return Ok(self.output.get_or_init(|| output)); + } +} + +impl Extractor for FlacExtractor<'_> { + fn field<'a>( + &'a self, + name: &Label, + ) -> Result>, std::io::Error> { + Ok(self.get_inner()?.get(&name)) + } + + fn fields(&self) -> Result, std::io::Error> { + Ok(self.get_inner()?.keys().cloned().collect()) + } +} diff --git a/crates/pile-dataset/src/extract/fs.rs b/crates/pile-dataset/src/extract/fs.rs new file mode 100644 index 0000000..73bfb90 --- /dev/null +++ b/crates/pile-dataset/src/extract/fs.rs @@ -0,0 +1,80 @@ +use pile_config::Label; +use std::{collections::HashMap, path::Component, sync::OnceLock}; + +use crate::{ + FileItem, Key, + extract::{Extractor, PileValue}, +}; + +pub struct FsExtractor<'a> { + item: &'a FileItem, + output: OnceLock>>, +} + +impl<'a> FsExtractor<'a> { + pub fn new(item: &'a FileItem) -> Self { + Self { + item, + output: OnceLock::new(), + } + } + + fn get_inner(&self) -> Result<&HashMap>, std::io::Error> { + if let Some(x) = self.output.get() { + return Ok(x); + } + + let output = HashMap::from([ + ( + Label::new("extension").unwrap(), + self.item + .path + .extension() + .map(|x| x.to_str()) + .flatten() + .map(|x| PileValue::String(x.to_owned())) + .unwrap_or(PileValue::Null), + ), + ( + Label::new("path").unwrap(), + self.item + .path + .to_string() + .map(|x| PileValue::String(x.to_owned())) + .unwrap_or(PileValue::Null), + ), + ( + Label::new("segments").unwrap(), + self.item + .path + .components() + .map(|x| match x { + Component::CurDir => Some(".".to_owned()), + Component::Normal(x) => x.to_str().map(|x| x.to_owned()), + Component::ParentDir => Some("..".to_owned()), + Component::RootDir => Some("/".to_owned()), + Component::Prefix(x) => x.as_os_str().to_str().map(|x| x.to_owned()), + }) + .map(|x| x.map(PileValue::String)) + .collect::>>() + .map(|x| PileValue::Array(x)) + .unwrap_or(PileValue::Null), + ), + ]); + + return Ok(self.output.get_or_init(|| output)); + } +} + +impl Extractor for FsExtractor<'_> { + fn field<'a>( + &'a self, + name: &Label, + ) -> Result>, std::io::Error> { + Ok(self.get_inner()?.get(&name)) + } + + fn fields(&self) -> Result, std::io::Error> { + Ok(self.get_inner()?.keys().cloned().collect()) + } +} diff --git a/crates/pile-dataset/src/extract/map.rs b/crates/pile-dataset/src/extract/map.rs new file mode 100644 index 0000000..80f7acf --- /dev/null +++ b/crates/pile-dataset/src/extract/map.rs @@ -0,0 +1,21 @@ +use pile_config::Label; +use std::collections::HashMap; + +use crate::{ + Item, + extract::{Extractor, PileValue}, +}; + +pub struct MapExtractor<'a, I: Item> { + pub(super) inner: HashMap>, +} + +impl Extractor for MapExtractor<'_, I> { + fn field<'a>(&'a self, name: &Label) -> Result>, std::io::Error> { + Ok(self.inner.get(&name)) + } + + fn fields(&self) -> Result, std::io::Error> { + Ok(self.inner.keys().cloned().collect()) + } +} diff --git a/crates/pile-dataset/src/extract/mod.rs b/crates/pile-dataset/src/extract/mod.rs new file mode 100644 index 0000000..d5639ce --- /dev/null +++ b/crates/pile-dataset/src/extract/mod.rs @@ -0,0 +1,136 @@ +mod flac; +use std::{collections::HashMap, rc::Rc}; + +pub use flac::*; + +mod fs; +pub use fs::*; + +mod map; +pub use map::*; +use pile_config::Label; +use serde_json::{Map, Value}; + +use crate::{FileItem, Item}; + +pub trait Extractor { + /// Get the field at `name` from `item`. + /// - returns `None` if `name` is not a valid field + /// - returns `Some(Null)` if `name` is not available + fn field<'a>( + &'a self, + name: &pile_config::Label, + ) -> Result>, std::io::Error>; + + /// Return all fields in this extrator. + /// `Self::field` must return [Some] for all these keys + /// and [None] for all others. + fn fields(&self) -> Result, std::io::Error>; +} + +pub enum PileValue<'a, I: crate::Item> { + Null, + String(String), + Array(Vec>), + Extractor(Rc + 'a>), +} + +impl Clone for PileValue<'_, I> { + fn clone(&self) -> Self { + match self { + Self::Null => Self::Null, + Self::String(x) => Self::String(x.clone()), + Self::Array(x) => Self::Array(x.clone()), + Self::Extractor(x) => Self::Extractor(x.clone()), + } + } +} + +impl<'a, I: Item> PileValue<'a, I> { + pub fn query(&'a self, query: &[Label]) -> Result, std::io::Error> { + let mut out = Some(self); + + for q in query { + out = match &out { + None => return Ok(None), + Some(Self::Null) => None, + Some(Self::Array(_)) => None, + Some(Self::String(_)) => None, + Some(Self::Extractor(e)) => e.field(&q)?, + }; + } + + return Ok(out); + } + + pub fn as_str(&self) -> Option<&str> { + match self { + Self::String(x) => Some(&x), + _ => None, + } + } + + pub fn to_json(&self) -> Result { + Ok(match self { + Self::Null => Value::Null, + Self::String(x) => Value::String(x.clone()), + + Self::Array(x) => Value::Array( + x.iter() + .map(|x| x.to_json()) + .collect::, _>>()?, + ), + + Self::Extractor(e) => { + let keys = e.fields()?; + let map = keys + .iter() + .map(|k| { + let v = e.field(k)?.unwrap(); + let v = v.to_json()?; + Ok((k.to_string(), v)) + }) + .collect::, std::io::Error>>()?; + + Value::Object(map) + } + }) + } +} + +pub struct MetaExtractor<'a, I: crate::Item> { + inner: MapExtractor<'a, I>, +} + +impl<'a> MetaExtractor<'a, FileItem> { + pub fn new(item: &'a FileItem) -> Self { + let inner = MapExtractor { + inner: HashMap::from([ + ( + Label::new("flac").unwrap(), + PileValue::Extractor(Rc::new(FlacExtractor::new(item))), + ), + ( + Label::new("fs").unwrap(), + PileValue::Extractor(Rc::new(FsExtractor::new(item))), + ), + ]), + }; + + Self { inner } + } +} + +impl Extractor for MetaExtractor<'_, crate::FileItem> { + fn field<'a>( + &'a self, + name: &pile_config::Label, + ) -> Result>, std::io::Error> { + self.inner.field(name) + } + + #[expect(clippy::unwrap_used)] + fn fields(&self) -> Result, std::io::Error> { + return Ok(vec![Label::new("flac").unwrap(), Label::new("fs").unwrap()]); + } +} diff --git a/crates/pile-dataset/src/index/index_fts.rs b/crates/pile-dataset/src/index/index_fts.rs index dce31e2..7e9ceb9 100644 --- a/crates/pile-dataset/src/index/index_fts.rs +++ b/crates/pile-dataset/src/index/index_fts.rs @@ -1,7 +1,6 @@ -use jsonpath_rust::JsonPath; -use pile_config::{ConfigToml, DatasetFts, Label}; -use serde_json::Value; -use std::{path::PathBuf, sync::LazyLock}; +use itertools::Itertools; +use pile_config::{Case, ConfigToml, DatasetFts, FieldSpecPost, Label}; +use std::{path::PathBuf, rc::Rc, sync::LazyLock}; use tantivy::{ DocAddress, Index, ReloadPolicy, TantivyDocument, TantivyError, collector::Collector, @@ -10,7 +9,10 @@ use tantivy::{ }; use tracing::{debug, trace, warn}; -use crate::{Item, Key}; +use crate::{ + Item, Key, + extract::{MetaExtractor, PileValue}, +}; #[derive(Debug, Clone)] pub struct FtsLookupResult { @@ -84,10 +86,17 @@ impl DbFtsIndex { doc.add_text(self.schema.get_field("_meta_source")?, item.source_name()); doc.add_text(self.schema.get_field("_meta_key")?, key); - let json = item.json()?; + let item = match item.as_file() { + Some(x) => x, + None => return Ok(None), + }; + + let extractor = MetaExtractor::new(item); + let extractor = PileValue::Extractor(Rc::new(extractor)); + let mut empty = true; for name in self.fts_cfg().fields.keys() { - let x = self.get_field(&json, name)?; + let x = self.get_field(&extractor, name)?; let val = match x { Some(x) => x, @@ -109,9 +118,9 @@ impl DbFtsIndex { // MARK: read // - pub fn get_field( + pub fn get_field( &self, - json: &Value, + extractor: &PileValue<'_, I>, field_name: &Label, ) -> Result, std::io::Error> { let field = match self.cfg.schema.get(field_name) { @@ -124,41 +133,23 @@ impl DbFtsIndex { // Try paths in order, using the first value we find 'outer: for path in field.path.as_slice() { - let val = match json.query(path) { - Ok(mut x) => { - if x.len() > 1 { - warn!( - message = "Path returned more than one value, this is not supported. Skipping.", - ?path, - field = field_name.to_string() - ); - continue; - } + let segments = path + .split('.') + .map(|x| Label::new(x).expect(&format!("wtf {x}"))) + .collect::>(); - match x.pop() { - Some(x) => x, - None => continue, - } - } - - Err(error) => { - warn!( - message = "Invalid path, skipping", - ?path, - field = field_name.to_string(), - ?error - ); - continue; - } + let val = match extractor.query(&segments)? { + Some(x) => x, + None => return Ok(None), }; let mut val = match val { - Value::Null => { + PileValue::Null => { trace!( message = "Skipping field, is null", field = field_name.to_string(), path, - value = ?val + // value = ?val ); continue; } @@ -166,7 +157,7 @@ impl DbFtsIndex { }; for post in &field.post { - val = match post.apply(&val) { + val = match apply(&post, &val) { Some(x) => x, None => return Ok(None), }; @@ -175,7 +166,7 @@ impl DbFtsIndex { loop { val = match val { #[expect(clippy::unwrap_used)] - Value::Array(ref mut x) => { + PileValue::Array(ref mut x) => { if x.len() == 1 { x.pop().unwrap() } else if x.len() > 1 { @@ -183,7 +174,7 @@ impl DbFtsIndex { message = "Skipping field, is array with more than one element", field = field_name.to_string(), path, - value = ?val + //value = ?val ); continue 'outer; } else { @@ -191,32 +182,30 @@ impl DbFtsIndex { message = "Skipping field, is empty array", field = field_name.to_string(), path, - value = ?val + //value = ?val ); continue 'outer; } } - Value::Null => { + PileValue::Null => { trace!( message = "Skipping field, is null", field = field_name.to_string(), path, - value = ?val + //value = ?val ); continue 'outer; } - Value::Object(_) => { + PileValue::Extractor(_) => { trace!( message = "Skipping field, is object", field = field_name.to_string(), path, - value = ?val + //value = ?val ); continue 'outer; } - Value::Bool(x) => return Ok(Some(x.to_string())), - Value::Number(x) => return Ok(Some(x.to_string())), - Value::String(x) => return Ok(Some(x)), + PileValue::String(x) => return Ok(Some(x)), } } } @@ -310,3 +299,79 @@ impl DbFtsIndex { return Ok(out); } } + +pub fn apply<'a, I: Item>( + post: &FieldSpecPost, + val: &PileValue<'a, I>, +) -> Option> { + Some(match post { + FieldSpecPost::NotEmpty { notempty: false } => val.clone(), + FieldSpecPost::NotEmpty { notempty: true } => match val { + PileValue::Null => return None, + PileValue::String(x) if x.is_empty() => return None, + PileValue::Array(x) if x.is_empty() => return None, + x => x.clone(), + }, + + FieldSpecPost::SetCase { case: Case::Lower } => match val { + PileValue::Null => return None, + PileValue::Extractor(_) => return None, + PileValue::String(x) => PileValue::String(x.to_lowercase()), + + PileValue::Array(x) => { + PileValue::Array(x.iter().map(|x| apply(post, x)).collect::>()?) + } + }, + + FieldSpecPost::SetCase { case: Case::Upper } => match val { + PileValue::Null => return None, + PileValue::Extractor(_) => return None, + PileValue::String(x) => PileValue::String(x.to_uppercase()), + + PileValue::Array(x) => { + PileValue::Array(x.iter().map(|x| apply(post, x)).collect::>()?) + } + }, + + FieldSpecPost::TrimSuffix { trim_suffix } => match val { + PileValue::Null => return None, + PileValue::Extractor(_) => return None, + + PileValue::String(x) => { + PileValue::String(x.strip_suffix(trim_suffix).unwrap_or(x).to_owned()) + } + + PileValue::Array(x) => { + PileValue::Array(x.iter().map(|x| apply(post, x)).collect::>()?) + } + }, + + FieldSpecPost::TrimPrefix { trim_prefix } => match val { + PileValue::Null => return None, + PileValue::Extractor(_) => return None, + + PileValue::String(x) => { + PileValue::String(x.strip_prefix(trim_prefix).unwrap_or(x).to_owned()) + } + + PileValue::Array(x) => { + PileValue::Array(x.iter().map(|x| apply(post, x)).collect::>()?) + } + }, + + FieldSpecPost::Join { join } => match val { + PileValue::Null => return None, + PileValue::Extractor(_) => return None, + + PileValue::String(x) => PileValue::String(x.clone()), + PileValue::Array(x) => PileValue::String( + x.iter() + .map(|x| apply(post, x)) + .map(|x| x.map(|x| x.as_str().map(|x| x.to_owned())).flatten()) + .collect::>>()? + .into_iter() + .join(join), + ), + }, + }) +} diff --git a/crates/pile-dataset/src/item.rs b/crates/pile-dataset/src/item.rs new file mode 100644 index 0000000..fc23449 --- /dev/null +++ b/crates/pile-dataset/src/item.rs @@ -0,0 +1,62 @@ +use pile_config::Label; +use std::{fmt::Debug, path::PathBuf}; + +// +// MARK: key +// + +pub trait Key: Debug + Clone + Send + Sync + 'static { + /// Convert this key to a string, returning `None` + /// if we encounter any kind of error. + fn to_string(&self) -> Option; + + fn from_string(str: &str) -> Option; +} + +impl Key for PathBuf { + fn from_string(str: &str) -> Option { + str.parse().ok() + } + + fn to_string(&self) -> Option { + self.to_str().map(|x| x.to_owned()) + } +} + +// +// MARK: item +// + +/// A pointer to raw data +pub trait Item: Debug + Send + Sync + 'static { + type Key: Key; + + fn source_name(&self) -> &str; + fn key(&self) -> &Self::Key; + + fn as_file(&self) -> Option<&FileItem>; +} + +#[derive(Clone, Debug)] +pub struct FileItem { + /// Path to this file. + /// Must be relative to source root dir. + pub path: PathBuf, + pub source_name: Label, +} + +impl Item for FileItem { + type Key = PathBuf; + + fn source_name(&self) -> &str { + &self.source_name + } + + fn key(&self) -> &Self::Key { + &self.path + } + + fn as_file(&self) -> Option<&FileItem> { + Some(&self) + } +} diff --git a/crates/pile-dataset/src/item/flac.rs b/crates/pile-dataset/src/item/flac.rs deleted file mode 100644 index 475c836..0000000 --- a/crates/pile-dataset/src/item/flac.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::{fmt::Debug, fs::File, io::BufReader, path::PathBuf}; - -use pile_config::Label; -use pile_flac::{FlacBlock, FlacReader}; -use serde_json::{Map, Value}; - -use crate::Item; - -pub struct FlacItem { - pub(crate) path: PathBuf, - pub(crate) source_name: Label, -} - -impl Debug for FlacItem { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FlacItem") - .field("path", &self.path) - .finish() - } -} - -impl Item for FlacItem { - type Key = PathBuf; - - fn source_name(&self) -> &str { - &self.source_name - } - - fn key(&self) -> &Self::Key { - &self.path - } - - fn json(&self) -> Result { - let file = File::open(&self.path)?; - let reader = FlacReader::new(BufReader::new(file)); - - let mut output = Map::new(); - - for block in reader { - if let FlacBlock::VorbisComment(comment) = block.unwrap() { - for (k, v) in comment.comment.comments { - let k = k.to_string(); - let v = Value::String(v.into()); - let e = output.get_mut(&k); - - match e { - None => { - output.insert(k.clone(), Value::Array(vec![v])); - } - Some(e) => { - // We always insert an array - #[expect(clippy::unwrap_used)] - e.as_array_mut().unwrap().push(v); - } - } - } - - // We should only have one comment block, - // stop reading when we find it - break; - } - } - - return Ok(serde_json::Value::Object(output)); - } -} diff --git a/crates/pile-dataset/src/item/mod.rs b/crates/pile-dataset/src/item/mod.rs deleted file mode 100644 index db29339..0000000 --- a/crates/pile-dataset/src/item/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod flac; -pub use flac::*; diff --git a/crates/pile-dataset/src/lib.rs b/crates/pile-dataset/src/lib.rs index c0a9666..ed4e6c4 100644 --- a/crates/pile-dataset/src/lib.rs +++ b/crates/pile-dataset/src/lib.rs @@ -7,6 +7,9 @@ pub use misc::*; mod dataset; pub use dataset::*; +mod item; +pub use item::*; + +pub mod extract; pub mod index; -pub mod item; pub mod source; diff --git a/crates/pile-dataset/src/source/dir.rs b/crates/pile-dataset/src/source/dir.rs index fe65270..3910b11 100644 --- a/crates/pile-dataset/src/source/dir.rs +++ b/crates/pile-dataset/src/source/dir.rs @@ -4,7 +4,7 @@ use pile_config::Label; use std::path::PathBuf; use walkdir::WalkDir; -use crate::{DataSource, Item, item::FlacItem, path_ts_latest}; +use crate::{DataSource, Item, item::FileItem, path_ts_latest}; #[derive(Debug)] pub struct DirDataSource { @@ -33,7 +33,7 @@ impl DataSource for DirDataSource { return Ok(None); } - return Ok(Some(Box::new(FlacItem { + return Ok(Some(Box::new(FileItem { source_name: self.name.clone(), path: key.to_owned(), }))); @@ -61,7 +61,7 @@ impl DataSource for DirDataSource { let item: Box> = match path.extension().map(|x| x.to_str()).flatten() { None => return None, - Some("flac") => Box::new(FlacItem { + Some("flac") => Box::new(FileItem { source_name: self.name.clone(), path: path.clone(), }), diff --git a/crates/pile-dataset/src/traits.rs b/crates/pile-dataset/src/traits.rs index aac2dd6..4359e52 100644 --- a/crates/pile-dataset/src/traits.rs +++ b/crates/pile-dataset/src/traits.rs @@ -1,5 +1,7 @@ use chrono::{DateTime, Utc}; -use std::{error::Error, fmt::Debug, path::PathBuf}; +use std::error::Error; + +use crate::{Item, Key}; /// A read-only set of [Item]s. pub trait DataSource { @@ -23,37 +25,3 @@ pub trait DataSource { /// Return the time of the latest change to the data in this source fn latest_change(&self) -> Result>, Self::Error>; } - -pub trait Item: Debug + Send + Sync + 'static { - type Key: Key; - - /// Get this item's unstructured schema - /// - /// TODO: don't use json, use a lazily-evaluated type that supports binary - fn json(&self) -> Result; - - fn source_name(&self) -> &str; - fn key(&self) -> &Self::Key; -} - -// -// MARK: key -// - -pub trait Key: Debug + Clone + Send + Sync + 'static { - /// Convert this key to a string, returning `None` - /// if we encounter any kind of error. - fn to_string(&self) -> Option; - - fn from_string(str: &str) -> Option; -} - -impl Key for PathBuf { - fn from_string(str: &str) -> Option { - str.parse().ok() - } - - fn to_string(&self) -> Option { - self.to_str().map(|x| x.to_owned()) - } -}