From af3140d1ab495f7d46551bbfaaaa007a7d09d601 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Tue, 6 Jan 2026 23:05:58 -0800 Subject: [PATCH] Initial `pile-dataset` --- crates/pile-dataset/Cargo.toml | 20 ++ crates/pile-dataset/src/index/index_fts.rs | 304 +++++++++++++++++++++ crates/pile-dataset/src/index/mod.rs | 2 + crates/pile-dataset/src/item/flac.rs | 89 ++++++ crates/pile-dataset/src/item/mod.rs | 2 + crates/pile-dataset/src/lib.rs | 6 + crates/pile-dataset/src/source/dir.rs | 66 +++++ crates/pile-dataset/src/source/mod.rs | 2 + crates/pile-dataset/src/traits.rs | 49 ++++ 9 files changed, 540 insertions(+) create mode 100644 crates/pile-dataset/Cargo.toml create mode 100644 crates/pile-dataset/src/index/index_fts.rs create mode 100644 crates/pile-dataset/src/index/mod.rs create mode 100644 crates/pile-dataset/src/item/flac.rs create mode 100644 crates/pile-dataset/src/item/mod.rs create mode 100644 crates/pile-dataset/src/lib.rs create mode 100644 crates/pile-dataset/src/source/dir.rs create mode 100644 crates/pile-dataset/src/source/mod.rs create mode 100644 crates/pile-dataset/src/traits.rs diff --git a/crates/pile-dataset/Cargo.toml b/crates/pile-dataset/Cargo.toml new file mode 100644 index 0000000..b148fe4 --- /dev/null +++ b/crates/pile-dataset/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "pile-dataset" +version = { workspace = true } +rust-version = { workspace = true } +edition = { workspace = true } + +[lints] +workspace = true + +[dependencies] +pile-config = { workspace = true } +pile-audio = { workspace = true } + + +serde_json = { workspace = true } +itertools = { workspace = true } +walkdir = { workspace = true } +tantivy = { workspace = true } +tracing = { workspace = true } +jsonpath-rust = { workspace = true } diff --git a/crates/pile-dataset/src/index/index_fts.rs b/crates/pile-dataset/src/index/index_fts.rs new file mode 100644 index 0000000..683704e --- /dev/null +++ b/crates/pile-dataset/src/index/index_fts.rs @@ -0,0 +1,304 @@ +use jsonpath_rust::JsonPath; +use pile_config::{ConfigToml, DatasetFts}; +use serde_json::Value; +use std::{path::PathBuf, sync::LazyLock}; +use tantivy::{ + DocAddress, Index, ReloadPolicy, TantivyDocument, TantivyError, + collector::Collector, + query::QueryParser, + schema::{self, Schema, Value as TantivyValue}, +}; +use tracing::{debug, trace, warn}; + +use crate::{Item, Key}; + +#[derive(Debug, Clone)] +pub struct FtsLookupResult { + pub score: f32, + pub source_name: String, + pub key: String, +} + +pub struct DbFtsIndex { + pub path: PathBuf, + pub cfg: ConfigToml, + pub schema: Schema, +} + +impl DbFtsIndex { + fn fts_cfg(&self) -> &DatasetFts { + static DEFAULT: LazyLock = LazyLock::new(|| DatasetFts::default()); + &self.cfg.fts.as_ref().unwrap_or(&DEFAULT) + } +} + +impl DbFtsIndex { + pub fn new(path: impl Into, cfg: &ConfigToml) -> Self { + let path = path.into(); + let cfg = cfg.clone(); + + let schema = { + let mut schema_builder = Schema::builder(); + + schema_builder.add_text_field("_meta_source", schema::STORED); + schema_builder.add_text_field("_meta_key", schema::STORED); + + let default = DatasetFts::default(); + let fields = &cfg.fts.as_ref().unwrap_or(&default).fields; + for (name, field) in fields { + if field.tokenize { + schema_builder.add_text_field(&name, schema::TEXT); + } else { + schema_builder.add_text_field(&name, schema::STRING); + } + } + + schema_builder.build() + }; + + Self { path, cfg, schema } + } + + // + // MARK: write + // + + /// Turn an entry into a tantivy document + pub fn entry_to_document( + &self, + item: &dyn Item, + ) -> Result, TantivyError> { + let mut doc = TantivyDocument::default(); + + { + let f_ptr = self.schema.get_field("_meta_source")?; + doc.add_text(f_ptr, item.source_name()); + } + + { + let f_ptr = self.schema.get_field("_meta_key")?; + doc.add_text(f_ptr, item.key().to_string()); + } + + let json = item.json().unwrap(); + let mut empty = true; + for (name, _field) in &self.fts_cfg().fields { + let val = match self.get_field(&json, name)? { + Some(x) => x, + None => continue, + }; + + empty = false; + + let field = self.schema.get_field(name); + if let Ok(field) = field { + doc.add_text(field, val); + } + } + + return Ok((!empty).then_some(doc)); + } + + // + // MARK: read + // + + pub fn get_field( + &self, + json: &Value, + field_name: &str, + ) -> Result, std::io::Error> { + let field = match self.cfg.schema.get(field_name) { + Some(x) => x, + None => { + warn!("Unknown field {field_name:?}"); + return Ok(None); + } + }; + + // Try paths in order, using the first value we find + 'outer: for path in field.path.as_slice() { + let val = match json.query(&path) { + Ok(mut x) => { + if x.len() > 1 { + warn!( + message = "Path returned more than one value, this is not supported. Skipping.", + ?path, + field = field_name + ); + continue; + } + + match x.pop() { + Some(x) => x, + None => continue, + } + } + + Err(error) => { + warn!( + message = "Invalid path, skipping", + ?path, + field = field_name, + ?error + ); + continue; + } + }; + + let mut val = match val { + Value::Null => { + trace!( + message = "Skipping field, is null", + field = field_name, + path, + value = ?val + ); + continue; + } + x => x.clone(), + }; + + for post in &field.post { + val = match post.apply(&val) { + Some(x) => x, + None => return Ok(None), + }; + } + + loop { + val = match val { + Value::Array(ref mut x) => { + if x.len() == 1 { + x.pop().unwrap() + } else if x.len() > 1 { + debug!( + message = "Skipping field, is array with more than one element", + field = field_name, + path, + value = ?val + ); + continue 'outer; + } else { + debug!( + message = "Skipping field, is empty array", + field = field_name, + path, + value = ?val + ); + continue 'outer; + } + } + Value::Null => { + trace!( + message = "Skipping field, is null", + field = field_name, + path, + value = ?val + ); + continue 'outer; + } + Value::Object(_) => { + trace!( + message = "Skipping field, is object", + field = field_name, + path, + value = ?val + ); + continue 'outer; + } + Value::Bool(x) => return Ok(Some(x.to_string())), + Value::Number(x) => return Ok(Some(x.to_string())), + Value::String(x) => return Ok(Some(x)), + } + } + } + + return Ok(None); + } + + /// Run the given query on this table's ftx index. + /// Returns `None` if this table does not have an fts index. + /// + /// Note that concurrent writes to the table may invalidate `Entry::Id`. + /// When you use [Table::get_meta] on this method's results, there is + /// NO GUARANTEE that the entries returned by this method will exist. + pub fn lookup( + &self, + query: impl Into, + collector: impl AsRef + Send + 'static, + ) -> Result, TantivyError> + where + C: Collector, + C::Fruit: IntoIterator, + { + if !self.path.exists() { + return Ok(Vec::new()); + } + + if !self.path.is_dir() { + warn!("fts index at {} is not a directory?!", self.path.display()); + return Ok(Vec::new()); + } + + let query: String = query.into(); + let index = Index::open_in_dir(&self.path)?; + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::OnCommitWithDelay) + .try_into()?; + + let schema = index.schema(); + + let mut fields = Vec::new(); + for f in self.schema.fields() { + fields.push(f.0) + } + + let searcher = reader.searcher(); + let query_parser = QueryParser::for_index(&index, fields); + let query = query_parser.parse_query(&query)?; + let res = searcher.search(&query, collector.as_ref())?; + + let mut out = Vec::new(); + for (score, doc) in res { + let retrieved_doc: TantivyDocument = searcher.doc(doc)?; + + #[expect(clippy::unwrap_used)] + let source_name = { + let f_source = schema.get_field("_meta_source").unwrap(); + + let mut source = None; + for v in retrieved_doc.get_all(f_source) { + assert!(source.is_none()); // Must only exist once + let v = v.as_str().unwrap().to_owned(); + source = Some(v) + } + + source.unwrap() + }; + + #[expect(clippy::unwrap_used)] + let key = { + let f_source = schema.get_field("_meta_key").unwrap(); + + let mut source = None; + for v in retrieved_doc.get_all(f_source) { + assert!(source.is_none()); // Must only exist once + let v = v.as_str().unwrap().to_owned(); + source = Some(v) + } + + source.unwrap() + }; + + out.push(FtsLookupResult { + score, + source_name, + key, + }); + } + + return Ok(out); + } +} diff --git a/crates/pile-dataset/src/index/mod.rs b/crates/pile-dataset/src/index/mod.rs new file mode 100644 index 0000000..e6b7cc8 --- /dev/null +++ b/crates/pile-dataset/src/index/mod.rs @@ -0,0 +1,2 @@ +mod index_fts; +pub use index_fts::*; diff --git a/crates/pile-dataset/src/item/flac.rs b/crates/pile-dataset/src/item/flac.rs new file mode 100644 index 0000000..3185c9c --- /dev/null +++ b/crates/pile-dataset/src/item/flac.rs @@ -0,0 +1,89 @@ +use std::{ + fmt::Debug, + fs::File, + io::{Read, Seek}, + path::PathBuf, +}; + +use pile_audio::flac::blockread::{FlacBlock, FlacBlockReader, FlacBlockSelector}; +use serde_json::{Map, Value}; + +use crate::Item; + +pub struct FlacItem { + pub(crate) path: PathBuf, + pub(crate) source_name: String, +} + +impl Debug for FlacItem { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FlacItem") + .field("path", &self.path) + .finish() + } +} + +impl Item for FlacItem { + type Key = PathBuf; + + fn source_name(&self) -> &str { + &self.source_name + } + + fn key(&self) -> &Self::Key { + &self.path + } + + fn json(&self) -> Result { + let mut block_reader = FlacBlockReader::new(FlacBlockSelector { + pick_vorbiscomment: true, + ..Default::default() + }); + + let mut file = File::open(&self.path)?; + + // TODO: do not read the whole file + file.rewind()?; + let mut data = Vec::new(); + file.read_to_end(&mut data)?; + + block_reader.push_data(&data).unwrap(); + block_reader.finish().unwrap(); + + // + // Return tags + // + + let mut output = Map::new(); + + while block_reader.has_block() { + let b = block_reader.pop_block().unwrap(); + match b { + FlacBlock::VorbisComment(comment) => { + for (k, v) in comment.comment.comments { + let k = k.to_string(); + let v = Value::String(v.into()); + let e = output.get_mut(&k); + + match e { + None => { + output.insert(k.to_string(), Value::Array(vec![v])); + } + Some(e) => { + e.as_array_mut().unwrap().push(v); + } + } + } + } + + // `reader` filters blocks for us + _ => unreachable!(), + } + + // We should only have one comment block + assert!(!block_reader.has_block()); + } + + return Ok(serde_json::Value::Object(output)); + } +} diff --git a/crates/pile-dataset/src/item/mod.rs b/crates/pile-dataset/src/item/mod.rs new file mode 100644 index 0000000..db29339 --- /dev/null +++ b/crates/pile-dataset/src/item/mod.rs @@ -0,0 +1,2 @@ +mod flac; +pub use flac::*; diff --git a/crates/pile-dataset/src/lib.rs b/crates/pile-dataset/src/lib.rs new file mode 100644 index 0000000..6c04bd9 --- /dev/null +++ b/crates/pile-dataset/src/lib.rs @@ -0,0 +1,6 @@ +mod traits; +pub use traits::*; + +pub mod index; +pub mod item; +pub mod source; diff --git a/crates/pile-dataset/src/source/dir.rs b/crates/pile-dataset/src/source/dir.rs new file mode 100644 index 0000000..0fb6101 --- /dev/null +++ b/crates/pile-dataset/src/source/dir.rs @@ -0,0 +1,66 @@ +use itertools::Itertools; +use std::{io::ErrorKind, path::PathBuf}; +use walkdir::WalkDir; + +use crate::{DataSource, Item, item::FlacItem}; + +#[derive(Debug)] +pub struct DirDataSource { + pub name: String, + pub dirs: Vec, +} + +impl DirDataSource { + pub fn new(name: impl Into, dirs: Vec) -> Self { + Self { + name: name.into(), + dirs, + } + } +} + +impl DataSource for DirDataSource { + type Key = PathBuf; + type Error = std::io::Error; + + fn get(&self, key: &Self::Key) -> Result>>, Self::Error> { + if !key.is_file() { + return Ok(None); + } + + return Ok(Some(Box::new(FlacItem { + source_name: self.name.clone(), + path: key.to_owned(), + }))); + } + + fn iter( + &self, + ) -> impl Iterator>), Self::Error>> { + return self + .dirs + .iter() + .map(|x| WalkDir::new(x).into_iter().map_ok(move |d| (x, d))) + .flatten() + .into_iter() + .filter_ok(|(_, entry)| entry.file_type().is_file()) + .map(|x| match x { + Err(err) => { + let msg = format!("other walkdir error: {err:?}"); + Err(err + .into_io_error() + .unwrap_or(std::io::Error::new(ErrorKind::Other, msg))) + } + Ok((_, entry)) => { + let path = entry.into_path(); + let item = FlacItem { + source_name: self.name.clone(), + path: path.clone(), + }; + + let item: Box> = Box::new(item); + Ok((path, item)) + } + }); + } +} diff --git a/crates/pile-dataset/src/source/mod.rs b/crates/pile-dataset/src/source/mod.rs new file mode 100644 index 0000000..fa148b9 --- /dev/null +++ b/crates/pile-dataset/src/source/mod.rs @@ -0,0 +1,2 @@ +mod dir; +pub use dir::*; diff --git a/crates/pile-dataset/src/traits.rs b/crates/pile-dataset/src/traits.rs new file mode 100644 index 0000000..a71a116 --- /dev/null +++ b/crates/pile-dataset/src/traits.rs @@ -0,0 +1,49 @@ +use std::{error::Error, fmt::Debug, path::PathBuf}; + +/// A read-only set of [Item]s. +pub trait DataSource { + /// The type used to retrieve items from this source + /// (e.g, a PathBuf or a primary key) + type Key: Key; + + type Error: Error; + + /// Get an item from this datasource + fn get(&self, key: &Self::Key) -> Result>>, Self::Error>; + + /// Iterate over all items in this source in an arbitrary order + fn iter( + &self, + ) -> impl Iterator>), Self::Error>>; +} + +pub trait Item: Debug + Send + Sync + 'static { + type Key: Key; + + /// Get this item's unstructured schema + /// + /// TODO: don't use json, use a lazily-evaluated type that supports binary + fn json(&self) -> Result; + + fn source_name(&self) -> &str; + fn key(&self) -> &Self::Key; +} + +// +// MARK: key +// + +pub trait Key: Debug + Clone + Send + Sync + 'static { + fn to_string(&self) -> String; + fn from_string(str: &str) -> Option; +} + +impl Key for PathBuf { + fn from_string(str: &str) -> Option { + str.parse().ok() + } + + fn to_string(&self) -> String { + self.to_str().expect("path is not a string").to_owned() + } +}