diff --git a/Cargo.lock b/Cargo.lock index bb0ea99..60c0214 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2491,6 +2491,7 @@ dependencies = [ "pile-config", "pile-dataset", "pile-toolbox", + "pile-value", "serde", "serde_json", "tokio", @@ -2515,26 +2516,13 @@ dependencies = [ name = "pile-dataset" version = "0.0.1" dependencies = [ - "async-trait", - "aws-sdk-s3", "axum", - "blake3", "chrono", - "epub", - "id3", - "image", - "itertools 0.14.0", - "kamadak-exif", - "mime", - "mime_guess", - "pdf", - "pdfium-render", "pile-config", - "pile-flac", "pile-toolbox", + "pile-value", "serde", "serde_json", - "smartstring", "tantivy", "thiserror", "tokio", @@ -2543,7 +2531,6 @@ dependencies = [ "tracing", "utoipa", "utoipa-swagger-ui", - "walkdir", ] [[package]] @@ -2568,6 +2555,33 @@ dependencies = [ "tokio", ] +[[package]] +name = "pile-value" +version = "0.0.1" +dependencies = [ + "async-trait", + "aws-sdk-s3", + "blake3", + "chrono", + "epub", + "id3", + "image", + "kamadak-exif", + "mime", + "mime_guess", + "pdf", + "pdfium-render", + "pile-config", + "pile-flac", + "serde_json", + "smartstring", + "tokio", + "tokio-stream", + "toml", + "tracing", + "walkdir", +] + [[package]] name = "pin-project-lite" version = "0.2.16" diff --git a/Cargo.toml b/Cargo.toml index 2889a85..4f5418e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,7 @@ pile-toolbox = { path = "crates/pile-toolbox" } pile-config = { path = "crates/pile-config" } pile-flac = { path = "crates/pile-flac" } pile-dataset = { path = "crates/pile-dataset" } +pile-value = { path = "crates/pile-value" } # Clients & servers tantivy = "0.25.0" diff --git a/crates/pile-config/src/lib.rs b/crates/pile-config/src/lib.rs index 944a2e6..76ef44f 100644 --- a/crates/pile-config/src/lib.rs +++ b/crates/pile-config/src/lib.rs @@ -1,9 +1,6 @@ use serde::Deserialize; use std::{collections::HashMap, fmt::Debug, path::PathBuf}; -mod post; -pub use post::*; - mod misc; pub use misc::*; @@ -40,10 +37,6 @@ pub struct 
DatasetConfig { /// Where to find this field pub source: HashMap, - - /// How to post-process this field - #[serde(default)] - pub post: Vec, } #[derive(Debug, Clone, Deserialize)] @@ -100,10 +93,6 @@ pub struct FieldSpec { /// How to find this field in a data entry pub path: Vec, - - /// How to post-process this field - #[serde(default)] - pub post: Vec, } #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] diff --git a/crates/pile-config/src/post.rs b/crates/pile-config/src/post.rs deleted file mode 100644 index aea31be..0000000 --- a/crates/pile-config/src/post.rs +++ /dev/null @@ -1,18 +0,0 @@ -use serde::Deserialize; - -#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -#[serde(untagged)] -pub enum FieldSpecPost { - TrimSuffix { trim_suffix: String }, - TrimPrefix { trim_prefix: String }, - SetCase { case: Case }, - Join { join: String }, - NotEmpty { notempty: bool }, -} - -#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] -#[serde(rename_all = "lowercase")] -pub enum Case { - Lower, - Upper, -} diff --git a/crates/pile-dataset/Cargo.toml b/crates/pile-dataset/Cargo.toml index 113f6fa..bf9e532 100644 --- a/crates/pile-dataset/Cargo.toml +++ b/crates/pile-dataset/Cargo.toml @@ -10,37 +10,23 @@ workspace = true [dependencies] pile-config = { workspace = true } pile-toolbox = { workspace = true } -pile-flac = { workspace = true } +pile-value = { workspace = true } serde_json = { workspace = true } -itertools = { workspace = true } -walkdir = { workspace = true } tantivy = { workspace = true } tracing = { workspace = true } chrono = { workspace = true } toml = { workspace = true } thiserror = { workspace = true } -smartstring = { workspace = true } -blake3 = { workspace = true } -epub = { workspace = true } -kamadak-exif = { workspace = true } -pdf = { workspace = true } -pdfium-render = { workspace = true, optional = true } -image = { workspace = true, optional = true } -id3 = { workspace = true } tokio = { workspace = true } tokio-stream = { 
workspace = true } -async-trait = { workspace = true } -aws-sdk-s3 = { workspace = true } -mime = { workspace = true } -mime_guess = { workspace = true } -serde = { workspace = true } +serde = { workspace = true, optional = true } axum = { workspace = true, optional = true } utoipa = { workspace = true, optional = true } utoipa-swagger-ui = { workspace = true, optional = true } [features] default = [] -pdfium = ["dep:pdfium-render", "dep:image"] -axum = ["dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui"] +pdfium = ["pile-value/pdfium"] +axum = ["dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui", "dep:serde"] diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index d8696f2..c3fc64d 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -1,6 +1,10 @@ use chrono::{DateTime, Utc}; use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use pile_value::{ + source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest}, + value::{Item, PileValue}, +}; use serde_json::Value; use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant}; use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; @@ -9,13 +13,7 @@ use tokio::task::JoinSet; use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use tracing::{debug, info, trace, warn}; -use crate::{ - DataSource, Item, PileValue, - extract::MetaExtractor, - index::{DbFtsIndex, FtsLookupResult}, - path_ts_earliest, - source::{DirDataSource, S3DataSource}, -}; +use crate::index::{DbFtsIndex, FtsLookupResult}; #[derive(Debug, Error)] pub enum DatasetError { @@ -183,11 +181,12 @@ impl Datasets { let Some(item) = self.get(source, key).await else { return Ok(None); }; - let extractor = MetaExtractor::new(&item); - let root = PileValue::ObjectExtractor(Arc::new(extractor)); - let Some(value) = root.query(path).await? 
else { + + let item = PileValue::Item(item); + let Some(value) = item.query(path).await? else { return Ok(None); }; + Ok(Some(value.to_json().await?)) } diff --git a/crates/pile-dataset/src/extract/mod.rs b/crates/pile-dataset/src/extract/mod.rs deleted file mode 100644 index 2482e7f..0000000 --- a/crates/pile-dataset/src/extract/mod.rs +++ /dev/null @@ -1,165 +0,0 @@ -use pile_config::Label; -use std::{collections::HashMap, sync::Arc}; - -mod flac; -pub use flac::*; - -mod id3; -pub use id3::*; - -mod fs; -pub use fs::*; - -mod epub; -pub use epub::*; - -mod exif; -pub use exif::*; - -mod pdf; -pub use pdf::*; - -mod toml; -pub use toml::*; - -mod map; -pub use map::*; - -mod sidecar; -pub use sidecar::*; - -use crate::{Item, PileValue}; - -/// An attachment that extracts metadata from an [Item]. -/// -/// Metadata is exposed as an immutable map of {label: value}, -/// much like a json object. -#[async_trait::async_trait] -pub trait ObjectExtractor: Send + Sync { - /// Get the field at `name` from `item`. - /// - returns `None` if `name` is not a valid field - /// - returns `Some(Null)` if `name` is not available - async fn field(&self, name: &pile_config::Label) -> Result, std::io::Error>; - - /// Return all fields in this extractor. - /// `Self::field` must return [Some] for all these keys - /// and [None] for all others. - async fn fields(&self) -> Result, std::io::Error>; - - /// Convert this to a JSON value. - async fn to_json(&self) -> Result { - let keys = self.fields().await?; - let mut map = serde_json::Map::new(); - for k in &keys { - let v = match self.field(k).await? { - Some(x) => x, - None => continue, - }; - map.insert(k.to_string(), Box::pin(v.to_json()).await?); - } - - Ok(serde_json::Value::Object(map)) - } -} - -/// An attachment that extracts metadata from an [Item]. -/// -/// Metadata is exposed as an immutable list of values. -#[async_trait::async_trait] -pub trait ListExtractor: Send + Sync { - /// Get the item at index `idx`. 
- /// Indices start at zero, and must be consecutive. - /// - returns `None` if `idx` is out of range - /// - returns `Some(Null)` if `None` is at `idx` - async fn get(&self, idx: usize) -> Result, std::io::Error>; - - async fn len(&self) -> Result; - - async fn is_empty(&self) -> Result { - Ok(self.len().await? == 0) - } - - /// Convert this list to a JSON value. - async fn to_json(&self) -> Result { - let len = self.len().await?; - let mut list = Vec::with_capacity(len); - for i in 0..len { - #[expect(clippy::expect_used)] - let v = self - .get(i) - .await? - .expect("value must be present according to length"); - list.push(Box::pin(v.to_json()).await?); - } - - Ok(serde_json::Value::Array(list)) - } -} - -pub struct MetaExtractor { - inner: MapExtractor, -} - -impl MetaExtractor { - #[expect(clippy::unwrap_used)] - pub fn new(item: &Item) -> Self { - let inner = MapExtractor { - inner: HashMap::from([ - ( - Label::new("flac").unwrap(), - crate::PileValue::ObjectExtractor(Arc::new(FlacExtractor::new(item))), - ), - ( - Label::new("id3").unwrap(), - crate::PileValue::ObjectExtractor(Arc::new(Id3Extractor::new(item))), - ), - ( - Label::new("fs").unwrap(), - crate::PileValue::ObjectExtractor(Arc::new(FsExtractor::new(item))), - ), - ( - Label::new("epub").unwrap(), - crate::PileValue::ObjectExtractor(Arc::new(EpubExtractor::new(item))), - ), - ( - Label::new("exif").unwrap(), - crate::PileValue::ObjectExtractor(Arc::new(ExifExtractor::new(item))), - ), - ( - Label::new("pdf").unwrap(), - crate::PileValue::ObjectExtractor(Arc::new(PdfExtractor::new(item))), - ), - ( - Label::new("toml").unwrap(), - crate::PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))), - ), - ( - Label::new("sidecar").unwrap(), - crate::PileValue::ObjectExtractor(Arc::new(SidecarExtractor::new(item))), - ), - ]), - }; - - Self { inner } - } -} - -#[async_trait::async_trait] -impl ObjectExtractor for MetaExtractor { - async fn field(&self, name: &pile_config::Label) -> Result, 
std::io::Error> { - self.inner.field(name).await - } - - #[expect(clippy::unwrap_used)] - async fn fields(&self) -> Result, std::io::Error> { - return Ok(vec![ - Label::new("flac").unwrap(), - Label::new("id3").unwrap(), - Label::new("fs").unwrap(), - Label::new("epub").unwrap(), - Label::new("exif").unwrap(), - Label::new("pdf").unwrap(), - Label::new("sidecar").unwrap(), - ]); - } -} diff --git a/crates/pile-dataset/src/extract/pdf/pdf_cover.rs b/crates/pile-dataset/src/extract/pdf/pdf_cover.rs deleted file mode 100644 index 18a8832..0000000 --- a/crates/pile-dataset/src/extract/pdf/pdf_cover.rs +++ /dev/null @@ -1,95 +0,0 @@ -use image::ImageFormat; -use pdfium_render::prelude::*; -use pile_config::Label; -use std::{ - collections::HashMap, - io::{BufReader, Cursor}, - sync::{Arc, OnceLock}, -}; -use tracing::trace; - -use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor}; - -pub struct PdfCoverExtractor { - item: Item, - output: OnceLock>, -} - -impl PdfCoverExtractor { - pub fn new(item: &Item) -> Self { - Self { - item: item.clone(), - output: OnceLock::new(), - } - } - - async fn get_inner(&self) -> Result<&HashMap, std::io::Error> { - if let Some(x) = self.output.get() { - return Ok(x); - } - - let reader = SyncReadBridge::new_current(self.item.read().await?); - let cover = tokio::task::spawn_blocking(move || { - let mut bytes = Vec::new(); - std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?; - - let pdfium = Pdfium::default(); - - let document = pdfium - .load_pdf_from_byte_slice(&bytes, None) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?; - - let render_config = PdfRenderConfig::new().set_target_width(1024); - - let page = document - .pages() - .get(0) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?; - - let image = page - .render_with_config(&render_config) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))? 
- .as_image(); - - let mut png_bytes = Vec::new(); - image - .write_to(&mut Cursor::new(&mut png_bytes), ImageFormat::Png) - .map_err(|e| std::io::Error::other(e.to_string()))?; - - Ok::<_, std::io::Error>(png_bytes) - }) - .await - .map_err(std::io::Error::other)?; - - let output = match cover { - Ok(data) => { - #[expect(clippy::unwrap_used)] - let label = Label::new("cover").unwrap(); - HashMap::from([( - label, - PileValue::Blob { - mime: mime::IMAGE_PNG, - bytes: Arc::new(data), - }, - )]) - } - Err(error) => { - trace!(message = "Could not render pdf cover", ?error, key = ?self.item.key()); - HashMap::new() - } - }; - - return Ok(self.output.get_or_init(|| output)); - } -} - -#[async_trait::async_trait] -impl ObjectExtractor for PdfCoverExtractor { - async fn field(&self, name: &Label) -> Result, std::io::Error> { - Ok(self.get_inner().await?.get(name).cloned()) - } - - async fn fields(&self) -> Result, std::io::Error> { - Ok(self.get_inner().await?.keys().cloned().collect()) - } -} diff --git a/crates/pile-dataset/src/index/index_fts.rs b/crates/pile-dataset/src/index/index_fts.rs index 182e92a..bf70551 100644 --- a/crates/pile-dataset/src/index/index_fts.rs +++ b/crates/pile-dataset/src/index/index_fts.rs @@ -1,9 +1,6 @@ -use itertools::Itertools; -use pile_config::{Case, ConfigToml, DatasetFts, FieldSpecPost, Label}; -use std::{ - path::PathBuf, - sync::{Arc, LazyLock}, -}; +use pile_config::{ConfigToml, DatasetFts, Label}; +use pile_value::value::{Item, PileValue}; +use std::{path::PathBuf, sync::LazyLock}; use tantivy::{ DocAddress, Index, ReloadPolicy, TantivyDocument, TantivyError, collector::Collector, @@ -12,8 +9,6 @@ use tantivy::{ }; use tracing::{debug, trace, warn}; -use crate::{Item, PileValue, extract::MetaExtractor}; - #[derive(Debug, Clone)] pub struct FtsLookupResult { pub score: f32, @@ -76,11 +71,11 @@ impl DbFtsIndex { doc.add_text(self.schema.get_field("_meta_source")?, item.source_name()); 
doc.add_text(self.schema.get_field("_meta_key")?, key); - let extractor = PileValue::ObjectExtractor(Arc::new(MetaExtractor::new(item))); + let item = PileValue::Item(item.clone()); let mut empty = true; for name in self.fts_cfg().fields.keys() { - let x = self.get_field(&extractor, name).await?; + let x = self.get_field(&item, name).await?; let val = match x { Some(x) => x, @@ -135,13 +130,6 @@ impl DbFtsIndex { x => x.clone(), }; - for post in &field.post { - val = match apply(post, &val) { - Some(x) => x, - None => return Ok(None), - }; - } - loop { val = match val { PileValue::String(x) => return Ok(Some(x.to_string())), @@ -186,6 +174,15 @@ impl DbFtsIndex { continue 'outer; } + PileValue::Item(_) => { + trace!( + message = "Skipping field, is item", + field = field_name.to_string(), + ?path, + ); + continue 'outer; + } + PileValue::ListExtractor(_) => { trace!( message = "Skipping field, is ListExtractor", @@ -296,104 +293,3 @@ impl DbFtsIndex { return Ok(out); } } - -pub fn apply(post: &FieldSpecPost, val: &PileValue) -> Option { - Some(match post { - FieldSpecPost::NotEmpty { notempty: false } => val.clone(), - FieldSpecPost::NotEmpty { notempty: true } => match val { - PileValue::Null => return None, - PileValue::String(x) if x.is_empty() => return None, - PileValue::Array(x) if x.is_empty() => return None, - x => x.clone(), - }, - - FieldSpecPost::SetCase { case: Case::Lower } => match val { - PileValue::Null => return None, - PileValue::U64(_) => return None, - PileValue::I64(_) => return None, - PileValue::Blob { .. 
} => return None, - PileValue::ObjectExtractor(_) => return None, - PileValue::ListExtractor(_) => return None, - PileValue::String(x) => PileValue::String(Arc::new(x.as_str().to_lowercase().into())), - - PileValue::Array(x) => PileValue::Array(Arc::new( - x.iter().map(|x| apply(post, x)).collect::>()?, - )), - }, - - FieldSpecPost::SetCase { case: Case::Upper } => match val { - PileValue::Null => return None, - PileValue::U64(_) => return None, - PileValue::I64(_) => return None, - PileValue::Blob { .. } => return None, - PileValue::ObjectExtractor(_) => return None, - PileValue::ListExtractor(_) => return None, - PileValue::String(x) => PileValue::String(Arc::new(x.as_str().to_uppercase().into())), - - PileValue::Array(x) => PileValue::Array(Arc::new( - x.iter() - .map(|x| apply(post, x)) - .collect::>>()?, - )), - }, - - FieldSpecPost::TrimSuffix { trim_suffix } => match val { - PileValue::Null => return None, - PileValue::U64(_) => return None, - PileValue::I64(_) => return None, - PileValue::Blob { .. } => return None, - PileValue::ObjectExtractor(_) => return None, - PileValue::ListExtractor(_) => return None, - - PileValue::String(x) => PileValue::String(Arc::new( - x.strip_suffix(trim_suffix).unwrap_or(x.as_str()).into(), - )), - - PileValue::Array(x) => PileValue::Array(Arc::new( - x.iter() - .map(|x| apply(post, x)) - .collect::>>()?, - )), - }, - - FieldSpecPost::TrimPrefix { trim_prefix } => match val { - PileValue::Null => return None, - PileValue::U64(_) => return None, - PileValue::I64(_) => return None, - PileValue::Blob { .. 
} => return None, - PileValue::ObjectExtractor(_) => return None, - PileValue::ListExtractor(_) => return None, - - PileValue::String(x) => PileValue::String(Arc::new( - x.strip_prefix(trim_prefix).unwrap_or(x.as_str()).into(), - )), - - PileValue::Array(x) => PileValue::Array(Arc::new( - x.iter() - .map(|x| apply(post, x)) - .collect::>>()?, - )), - }, - - FieldSpecPost::Join { join } => match val { - PileValue::Null => return None, - PileValue::U64(_) => return None, - PileValue::I64(_) => return None, - PileValue::Blob { .. } => return None, - PileValue::ObjectExtractor(_) => return None, - PileValue::ListExtractor(_) => return None, - - PileValue::String(x) => PileValue::String(x.clone()), - - PileValue::Array(x) => PileValue::String(Arc::new( - x.iter() - .map(|x| apply(post, x)) - .map(|x| x.and_then(|x| x.as_str().map(|x| x.to_owned()))) - .collect::>>()? - .into_iter() - .join(join) - .into(), - )), - }, - }) -} diff --git a/crates/pile-dataset/src/lib.rs b/crates/pile-dataset/src/lib.rs index a7295a5..b27410c 100644 --- a/crates/pile-dataset/src/lib.rs +++ b/crates/pile-dataset/src/lib.rs @@ -1,21 +1,7 @@ -mod traits; -pub use traits::*; - -mod misc; -pub use misc::*; - mod dataset; pub use dataset::{Dataset, DatasetError, Datasets}; -mod item; -pub use item::*; - -mod value; -pub use value::*; - -pub mod extract; pub mod index; -pub mod source; #[cfg(feature = "axum")] pub mod serve; diff --git a/crates/pile-dataset/src/serve/field.rs b/crates/pile-dataset/src/serve/field.rs index 85dfcb6..eab1dff 100644 --- a/crates/pile-dataset/src/serve/field.rs +++ b/crates/pile-dataset/src/serve/field.rs @@ -5,12 +5,13 @@ use axum::{ response::{IntoResponse, Response}, }; use pile_config::{Label, objectpath::ObjectPath}; +use pile_value::value::PileValue; use serde::Deserialize; use std::{sync::Arc, time::Instant}; use tracing::debug; use utoipa::ToSchema; -use crate::{Datasets, PileValue, extract::MetaExtractor}; +use crate::Datasets; #[derive(Deserialize, 
ToSchema)] pub struct FieldQuery { @@ -61,10 +62,8 @@ pub async fn get_field( return StatusCode::NOT_FOUND.into_response(); }; - let extractor = MetaExtractor::new(&item); - let root: PileValue = PileValue::ObjectExtractor(Arc::new(extractor)); - - let value = match root.query(&path).await { + let item = PileValue::Item(item); + let value = match item.query(&path).await { Ok(Some(v)) => v, Ok(None) => return StatusCode::NOT_FOUND.into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), diff --git a/crates/pile-dataset/src/serve/item.rs b/crates/pile-dataset/src/serve/item.rs index 7ee6325..83bbc1e 100644 --- a/crates/pile-dataset/src/serve/item.rs +++ b/crates/pile-dataset/src/serve/item.rs @@ -4,12 +4,13 @@ use axum::{ response::{IntoResponse, Response}, }; use pile_config::Label; +use pile_value::value::AsyncReader; use serde::Deserialize; use std::{sync::Arc, time::Instant}; use tracing::debug; use utoipa::ToSchema; -use crate::{AsyncReader, Datasets}; +use crate::Datasets; #[derive(Deserialize, ToSchema)] pub struct ItemQuery { diff --git a/crates/pile-dataset/src/source/mod.rs b/crates/pile-dataset/src/source/mod.rs deleted file mode 100644 index b12895e..0000000 --- a/crates/pile-dataset/src/source/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod dir; -pub use dir::*; - -mod s3; -pub use s3::*; diff --git a/crates/pile-value/Cargo.toml b/crates/pile-value/Cargo.toml new file mode 100644 index 0000000..66394f1 --- /dev/null +++ b/crates/pile-value/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "pile-value" +version = { workspace = true } +rust-version = { workspace = true } +edition = { workspace = true } + +[lints] +workspace = true + +[dependencies] +pile-config = { workspace = true } +pile-flac = { workspace = true } + +serde_json = { workspace = true } +walkdir = { workspace = true } +tracing = { workspace = true } +chrono = { workspace = true } +toml = { workspace = true } +smartstring = { workspace = true } 
+blake3 = { workspace = true } +epub = { workspace = true } +kamadak-exif = { workspace = true } +pdf = { workspace = true } +pdfium-render = { workspace = true, optional = true } +image = { workspace = true, optional = true } +id3 = { workspace = true } +tokio = { workspace = true } +tokio-stream = { workspace = true } +async-trait = { workspace = true } +aws-sdk-s3 = { workspace = true } +mime = { workspace = true } +mime_guess = { workspace = true } + +[features] +default = [] +pdfium = ["dep:pdfium-render", "dep:image"] diff --git a/crates/pile-dataset/build.rs b/crates/pile-value/build.rs similarity index 100% rename from crates/pile-dataset/build.rs rename to crates/pile-value/build.rs diff --git a/crates/pile-dataset/src/extract/epub/epub_meta.rs b/crates/pile-value/src/extract/item/epub/epub_meta.rs similarity index 95% rename from crates/pile-dataset/src/extract/epub/epub_meta.rs rename to crates/pile-value/src/extract/item/epub/epub_meta.rs index 49b218b..31d2d47 100644 --- a/crates/pile-dataset/src/extract/epub/epub_meta.rs +++ b/crates/pile-value/src/extract/item/epub/epub_meta.rs @@ -6,7 +6,10 @@ use std::{ }; use tracing::trace; -use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor}; +use crate::{ + extract::traits::ObjectExtractor, + value::{Item, PileValue, SyncReadBridge}, +}; pub struct EpubMetaExtractor { item: Item, diff --git a/crates/pile-dataset/src/extract/epub/epub_text.rs b/crates/pile-value/src/extract/item/epub/epub_text.rs similarity index 96% rename from crates/pile-dataset/src/extract/epub/epub_text.rs rename to crates/pile-value/src/extract/item/epub/epub_text.rs index 5c47b15..e04b83c 100644 --- a/crates/pile-dataset/src/extract/epub/epub_text.rs +++ b/crates/pile-value/src/extract/item/epub/epub_text.rs @@ -6,7 +6,10 @@ use std::{ }; use tracing::debug; -use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor}; +use crate::{ + extract::traits::ObjectExtractor, + value::{Item, PileValue, 
SyncReadBridge}, +}; pub struct EpubTextExtractor { item: Item, diff --git a/crates/pile-dataset/src/extract/epub/mod.rs b/crates/pile-value/src/extract/item/epub/mod.rs similarity index 91% rename from crates/pile-dataset/src/extract/epub/mod.rs rename to crates/pile-value/src/extract/item/epub/mod.rs index ddfdf35..f742dc0 100644 --- a/crates/pile-dataset/src/extract/epub/mod.rs +++ b/crates/pile-value/src/extract/item/epub/mod.rs @@ -7,7 +7,10 @@ pub use epub_meta::*; mod epub_text; pub use epub_text::*; -use crate::{Item, PileValue, extract::ObjectExtractor}; +use crate::{ + extract::traits::ObjectExtractor, + value::{Item, PileValue}, +}; pub struct EpubExtractor { text: Arc, diff --git a/crates/pile-dataset/src/extract/exif.rs b/crates/pile-value/src/extract/item/exif.rs similarity index 95% rename from crates/pile-dataset/src/extract/exif.rs rename to crates/pile-value/src/extract/item/exif.rs index aa1c54e..56a5f8d 100644 --- a/crates/pile-dataset/src/extract/exif.rs +++ b/crates/pile-value/src/extract/item/exif.rs @@ -6,7 +6,10 @@ use std::{ }; use tracing::trace; -use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor}; +use crate::{ + extract::traits::ObjectExtractor, + value::{Item, PileValue, SyncReadBridge}, +}; pub struct ExifExtractor { item: Item, diff --git a/crates/pile-dataset/src/extract/flac.rs b/crates/pile-value/src/extract/item/flac.rs similarity index 97% rename from crates/pile-dataset/src/extract/flac.rs rename to crates/pile-value/src/extract/item/flac.rs index 531bc5f..4d00a88 100644 --- a/crates/pile-dataset/src/extract/flac.rs +++ b/crates/pile-value/src/extract/item/flac.rs @@ -8,8 +8,8 @@ use std::{ }; use crate::{ - Item, PileValue, SyncReadBridge, - extract::{ListExtractor, ObjectExtractor}, + extract::traits::{ListExtractor, ObjectExtractor}, + value::{Item, PileValue, SyncReadBridge}, }; pub struct FlacImagesExtractor { diff --git a/crates/pile-dataset/src/extract/fs.rs b/crates/pile-value/src/extract/item/fs.rs 
similarity index 96% rename from crates/pile-dataset/src/extract/fs.rs rename to crates/pile-value/src/extract/item/fs.rs index ae396cd..a1cbe40 100644 --- a/crates/pile-dataset/src/extract/fs.rs +++ b/crates/pile-value/src/extract/item/fs.rs @@ -5,7 +5,10 @@ use std::{ sync::{Arc, OnceLock}, }; -use crate::{Item, PileValue, extract::ObjectExtractor}; +use crate::{ + extract::traits::ObjectExtractor, + value::{Item, PileValue}, +}; pub struct FsExtractor { item: Item, diff --git a/crates/pile-dataset/src/extract/id3.rs b/crates/pile-value/src/extract/item/id3.rs similarity index 97% rename from crates/pile-dataset/src/extract/id3.rs rename to crates/pile-value/src/extract/item/id3.rs index 64cdd47..8b787de 100644 --- a/crates/pile-dataset/src/extract/id3.rs +++ b/crates/pile-value/src/extract/item/id3.rs @@ -7,7 +7,10 @@ use std::{ sync::{Arc, OnceLock}, }; -use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor}; +use crate::{ + extract::traits::ObjectExtractor, + value::{Item, PileValue, SyncReadBridge}, +}; pub struct Id3Extractor { item: Item, diff --git a/crates/pile-value/src/extract/item/mod.rs b/crates/pile-value/src/extract/item/mod.rs new file mode 100644 index 0000000..0bea6f1 --- /dev/null +++ b/crates/pile-value/src/extract/item/mod.rs @@ -0,0 +1,99 @@ +mod flac; +use std::{collections::HashMap, sync::Arc}; + +pub use flac::*; + +mod id3; +pub use id3::*; + +mod fs; +pub use fs::*; + +mod epub; +pub use epub::*; + +mod exif; +pub use exif::*; + +mod pdf; +pub use pdf::*; + +mod toml; +use pile_config::Label; +pub use toml::*; + +mod sidecar; +pub use sidecar::*; + +use crate::{ + extract::{misc::MapExtractor, traits::ObjectExtractor}, + value::{Item, PileValue}, +}; + +pub struct ItemExtractor { + inner: MapExtractor, +} + +impl ItemExtractor { + #[expect(clippy::unwrap_used)] + pub fn new(item: &Item) -> Self { + let inner = MapExtractor { + inner: HashMap::from([ + ( + Label::new("flac").unwrap(), + 
PileValue::ObjectExtractor(Arc::new(FlacExtractor::new(item))), + ), + ( + Label::new("id3").unwrap(), + PileValue::ObjectExtractor(Arc::new(Id3Extractor::new(item))), + ), + ( + Label::new("fs").unwrap(), + PileValue::ObjectExtractor(Arc::new(FsExtractor::new(item))), + ), + ( + Label::new("epub").unwrap(), + PileValue::ObjectExtractor(Arc::new(EpubExtractor::new(item))), + ), + ( + Label::new("exif").unwrap(), + PileValue::ObjectExtractor(Arc::new(ExifExtractor::new(item))), + ), + ( + Label::new("pdf").unwrap(), + PileValue::ObjectExtractor(Arc::new(PdfExtractor::new(item))), + ), + ( + Label::new("toml").unwrap(), + PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))), + ), + ( + Label::new("sidecar").unwrap(), + PileValue::ObjectExtractor(Arc::new(SidecarExtractor::new(item))), + ), + ]), + }; + + Self { inner } + } +} + +#[async_trait::async_trait] +impl ObjectExtractor for ItemExtractor { + async fn field(&self, name: &pile_config::Label) -> Result, std::io::Error> { + self.inner.field(name).await + } + + #[expect(clippy::unwrap_used)] + async fn fields(&self) -> Result, std::io::Error> { + return Ok(vec![ + Label::new("flac").unwrap(), + Label::new("id3").unwrap(), + Label::new("fs").unwrap(), + Label::new("epub").unwrap(), + Label::new("exif").unwrap(), + Label::new("pdf").unwrap(), + Label::new("sidecar").unwrap(), + ]); + } +} diff --git a/crates/pile-dataset/src/extract/pdf/mod.rs b/crates/pile-value/src/extract/item/pdf/mod.rs similarity index 78% rename from crates/pile-dataset/src/extract/pdf/mod.rs rename to crates/pile-value/src/extract/item/pdf/mod.rs index 816bb35..2a5f6e3 100644 --- a/crates/pile-dataset/src/extract/pdf/mod.rs +++ b/crates/pile-value/src/extract/item/pdf/mod.rs @@ -1,11 +1,6 @@ use pile_config::Label; use std::sync::Arc; -#[cfg(feature = "pdfium")] -mod pdf_cover; -#[cfg(feature = "pdfium")] -pub use pdf_cover::*; - #[cfg(feature = "pdfium")] mod pdf_pages; #[cfg(feature = "pdfium")] @@ -17,14 +12,15 @@ pub use 
pdf_meta::*; mod pdf_text; pub use pdf_text::*; -use crate::{Item, PileValue, extract::ObjectExtractor}; +use crate::{ + extract::traits::ObjectExtractor, + value::{Item, PileValue}, +}; pub struct PdfExtractor { text: Arc, meta: Arc, #[cfg(feature = "pdfium")] - cover: Arc, - #[cfg(feature = "pdfium")] pages: Arc, } @@ -34,8 +30,6 @@ impl PdfExtractor { text: Arc::new(PdfTextExtractor::new(item)), meta: Arc::new(PdfMetaExtractor::new(item)), #[cfg(feature = "pdfium")] - cover: Arc::new(PdfCoverExtractor::new(item)), - #[cfg(feature = "pdfium")] pages: Arc::new(PdfPagesExtractor::new(item)), } } @@ -48,8 +42,6 @@ impl ObjectExtractor for PdfExtractor { "text" => self.text.field(name).await, "meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))), #[cfg(feature = "pdfium")] - "cover" => self.cover.field(name).await, - #[cfg(feature = "pdfium")] "pages" => Ok(Some(PileValue::ListExtractor(self.pages.clone()))), _ => Ok(None), } diff --git a/crates/pile-dataset/src/extract/pdf/pdf_meta.rs b/crates/pile-value/src/extract/item/pdf/pdf_meta.rs similarity index 97% rename from crates/pile-dataset/src/extract/pdf/pdf_meta.rs rename to crates/pile-value/src/extract/item/pdf/pdf_meta.rs index 4c9ba5d..9c07599 100644 --- a/crates/pile-dataset/src/extract/pdf/pdf_meta.rs +++ b/crates/pile-value/src/extract/item/pdf/pdf_meta.rs @@ -8,8 +8,10 @@ use std::{ }; use tracing::trace; -use crate::extract::ObjectExtractor; -use crate::{Item, PileValue, SyncReadBridge}; +use crate::{ + extract::traits::ObjectExtractor, + value::{Item, PileValue, SyncReadBridge}, +}; pub struct PdfMetaExtractor { item: Item, diff --git a/crates/pile-dataset/src/extract/pdf/pdf_pages.rs b/crates/pile-value/src/extract/item/pdf/pdf_pages.rs similarity index 96% rename from crates/pile-dataset/src/extract/pdf/pdf_pages.rs rename to crates/pile-value/src/extract/item/pdf/pdf_pages.rs index de38e0b..66eb751 100644 --- a/crates/pile-dataset/src/extract/pdf/pdf_pages.rs +++ 
b/crates/pile-value/src/extract/item/pdf/pdf_pages.rs @@ -6,7 +6,10 @@ use std::{ }; use tracing::trace; -use crate::{Item, PileValue, SyncReadBridge, extract::ListExtractor}; +use crate::{ + extract::traits::ListExtractor, + value::{Item, PileValue, SyncReadBridge}, +}; pub struct PdfPagesExtractor { item: Item, diff --git a/crates/pile-dataset/src/extract/pdf/pdf_text.rs b/crates/pile-value/src/extract/item/pdf/pdf_text.rs similarity index 96% rename from crates/pile-dataset/src/extract/pdf/pdf_text.rs rename to crates/pile-value/src/extract/item/pdf/pdf_text.rs index 2bf33a3..f8130e7 100644 --- a/crates/pile-dataset/src/extract/pdf/pdf_text.rs +++ b/crates/pile-value/src/extract/item/pdf/pdf_text.rs @@ -8,8 +8,10 @@ use std::{ }; use tracing::trace; -use crate::extract::ObjectExtractor; -use crate::{Item, PileValue, SyncReadBridge}; +use crate::{ + extract::traits::ObjectExtractor, + value::{Item, PileValue, SyncReadBridge}, +}; pub struct PdfTextExtractor { item: Item, diff --git a/crates/pile-dataset/src/extract/sidecar.rs b/crates/pile-value/src/extract/item/sidecar.rs similarity index 90% rename from crates/pile-dataset/src/extract/sidecar.rs rename to crates/pile-value/src/extract/item/sidecar.rs index 663afe0..be80951 100644 --- a/crates/pile-dataset/src/extract/sidecar.rs +++ b/crates/pile-value/src/extract/item/sidecar.rs @@ -1,9 +1,10 @@ use pile_config::Label; use std::sync::OnceLock; +use super::TomlExtractor; use crate::{ - Item, PileValue, - extract::{ObjectExtractor, TomlExtractor}, + extract::traits::ObjectExtractor, + value::{Item, PileValue}, }; pub struct SidecarExtractor { diff --git a/crates/pile-dataset/src/extract/toml.rs b/crates/pile-value/src/extract/item/toml.rs similarity index 95% rename from crates/pile-dataset/src/extract/toml.rs rename to crates/pile-value/src/extract/item/toml.rs index abdde13..91056ff 100644 --- a/crates/pile-dataset/src/extract/toml.rs +++ b/crates/pile-value/src/extract/item/toml.rs @@ -4,7 +4,10 @@ use std::{ 
sync::{Arc, OnceLock}, }; -use crate::{AsyncReader, Item, PileValue, extract::ObjectExtractor}; +use crate::{ + extract::traits::ObjectExtractor, + value::{AsyncReader, Item, PileValue}, +}; fn toml_to_pile(value: toml::Value) -> PileValue { match value { diff --git a/crates/pile-value/src/extract/misc/list.rs b/crates/pile-value/src/extract/misc/list.rs new file mode 100644 index 0000000..65ceba3 --- /dev/null +++ b/crates/pile-value/src/extract/misc/list.rs @@ -0,0 +1,24 @@ +use std::sync::Arc; + +use crate::{extract::traits::ListExtractor, value::PileValue}; + +pub struct ArrayExtractor { + inner: Arc>, +} + +impl ArrayExtractor { + pub fn new(inner: Arc>) -> Self { + Self { inner } + } +} + +#[async_trait::async_trait] +impl ListExtractor for ArrayExtractor { + async fn get(&self, idx: usize) -> Result, std::io::Error> { + Ok(self.inner.get(idx).cloned()) + } + + async fn len(&self) -> Result { + Ok(self.inner.len()) + } +} diff --git a/crates/pile-dataset/src/extract/map.rs b/crates/pile-value/src/extract/misc/map.rs similarity index 76% rename from crates/pile-dataset/src/extract/map.rs rename to crates/pile-value/src/extract/misc/map.rs index 420a911..8005678 100644 --- a/crates/pile-dataset/src/extract/map.rs +++ b/crates/pile-value/src/extract/misc/map.rs @@ -1,10 +1,11 @@ use pile_config::Label; use std::collections::HashMap; -use crate::{PileValue, extract::ObjectExtractor}; +use crate::{extract::traits::ObjectExtractor, value::PileValue}; +#[derive(Default)] pub struct MapExtractor { - pub(crate) inner: HashMap, + pub inner: HashMap, } #[async_trait::async_trait] diff --git a/crates/pile-value/src/extract/misc/mod.rs b/crates/pile-value/src/extract/misc/mod.rs new file mode 100644 index 0000000..766d291 --- /dev/null +++ b/crates/pile-value/src/extract/misc/mod.rs @@ -0,0 +1,8 @@ +mod list; +pub use list::*; + +mod vec; +pub use vec::*; + +mod map; +pub use map::*; diff --git a/crates/pile-value/src/extract/misc/vec.rs 
b/crates/pile-value/src/extract/misc/vec.rs new file mode 100644 index 0000000..ef1a5c0 --- /dev/null +++ b/crates/pile-value/src/extract/misc/vec.rs @@ -0,0 +1,17 @@ +use crate::{extract::traits::ListExtractor, value::PileValue}; + +#[derive(Default)] +pub struct VecExtractor { + pub inner: Vec, +} + +#[async_trait::async_trait] +impl ListExtractor for VecExtractor { + async fn get(&self, idx: usize) -> Result, std::io::Error> { + Ok(self.inner.get(idx).cloned()) + } + + async fn len(&self) -> Result { + Ok(self.inner.len()) + } +} diff --git a/crates/pile-value/src/extract/mod.rs b/crates/pile-value/src/extract/mod.rs new file mode 100644 index 0000000..2e91ea1 --- /dev/null +++ b/crates/pile-value/src/extract/mod.rs @@ -0,0 +1,4 @@ +pub mod item; +pub mod misc; +pub mod string; +pub mod traits; diff --git a/crates/pile-value/src/extract/string.rs b/crates/pile-value/src/extract/string.rs new file mode 100644 index 0000000..34dafe2 --- /dev/null +++ b/crates/pile-value/src/extract/string.rs @@ -0,0 +1,51 @@ +use pile_config::Label; +use smartstring::{LazyCompact, SmartString}; +use std::sync::Arc; + +use crate::{extract::traits::ObjectExtractor, value::PileValue}; + +pub struct StringExtractor { + item: Arc>, +} + +impl StringExtractor { + pub fn new(item: &Arc>) -> Self { + Self { item: item.clone() } + } +} + +#[async_trait::async_trait] +impl ObjectExtractor for StringExtractor { + async fn field(&self, name: &Label) -> Result, std::io::Error> { + Ok(match name.as_str() { + "trim" => Some(PileValue::String(Arc::new( + self.item.as_str().trim().into(), + ))), + + "upper" => Some(PileValue::String(Arc::new( + self.item.as_str().to_uppercase().into(), + ))), + + "lower" => Some(PileValue::String(Arc::new( + self.item.as_str().to_lowercase().into(), + ))), + + "nonempty" => Some(match self.item.is_empty() { + true => PileValue::Null, + false => PileValue::String(self.item.clone()), + }), + + _ => None, + }) + } + + #[expect(clippy::unwrap_used)] + async fn 
fields(&self) -> Result, std::io::Error> { + return Ok(vec![ + Label::new("trim").unwrap(), + Label::new("upper").unwrap(), + Label::new("lower").unwrap(), + Label::new("nonempty").unwrap(), + ]); + } +} diff --git a/crates/pile-value/src/extract/traits.rs b/crates/pile-value/src/extract/traits.rs new file mode 100644 index 0000000..4ca0f17 --- /dev/null +++ b/crates/pile-value/src/extract/traits.rs @@ -0,0 +1,68 @@ +/// An attachment that extracts metadata from an [Item]. +/// +/// Metadata is exposed as an immutable map of {label: value}, +/// much like a json object. +#[async_trait::async_trait] +pub trait ObjectExtractor: Send + Sync { + /// Get the field at `name` from `item`. + /// - returns `None` if `name` is not a valid field + /// - returns `Some(Null)` if `name` is not available + async fn field( + &self, + name: &pile_config::Label, + ) -> Result, std::io::Error>; + + /// Return all fields in this extractor. + /// `Self::field` must return [Some] for all these keys + /// and [None] for all others. + async fn fields(&self) -> Result, std::io::Error>; + + /// Convert this to a JSON value. + async fn to_json(&self) -> Result { + let keys = self.fields().await?; + let mut map = serde_json::Map::new(); + for k in &keys { + let v = match self.field(k).await? { + Some(x) => x, + None => continue, + }; + map.insert(k.to_string(), Box::pin(v.to_json()).await?); + } + + Ok(serde_json::Value::Object(map)) + } +} + +/// An attachment that extracts metadata from an [Item]. +/// +/// Metadata is exposed as an immutable list of values. +#[async_trait::async_trait] +pub trait ListExtractor: Send + Sync { + /// Get the item at index `idx`. + /// Indices start at zero, and must be consecutive. + /// - returns `None` if `idx` is out of range + /// - returns `Some(Null)` if `None` is at `idx` + async fn get(&self, idx: usize) -> Result, std::io::Error>; + + async fn len(&self) -> Result; + + async fn is_empty(&self) -> Result { + Ok(self.len().await? 
== 0) + } + + /// Convert this list to a JSON value. + async fn to_json(&self) -> Result { + let len = self.len().await?; + let mut list = Vec::with_capacity(len); + for i in 0..len { + #[expect(clippy::expect_used)] + let v = self + .get(i) + .await? + .expect("value must be present according to length"); + list.push(Box::pin(v.to_json()).await?); + } + + Ok(serde_json::Value::Array(list)) + } +} diff --git a/crates/pile-value/src/lib.rs b/crates/pile-value/src/lib.rs new file mode 100644 index 0000000..1025418 --- /dev/null +++ b/crates/pile-value/src/lib.rs @@ -0,0 +1,3 @@ +pub mod extract; +pub mod source; +pub mod value; diff --git a/crates/pile-dataset/src/source/dir.rs b/crates/pile-value/src/source/dir.rs similarity index 97% rename from crates/pile-dataset/src/source/dir.rs rename to crates/pile-value/src/source/dir.rs index 8df2f71..aa7361d 100644 --- a/crates/pile-dataset/src/source/dir.rs +++ b/crates/pile-value/src/source/dir.rs @@ -4,7 +4,10 @@ use std::{path::PathBuf, sync::Arc}; use tokio_stream::wrappers::ReceiverStream; use walkdir::WalkDir; -use crate::{DataSource, Item, path_ts_latest}; +use crate::{ + source::{DataSource, misc::path_ts_latest}, + value::Item, +}; #[derive(Debug)] pub struct DirDataSource { diff --git a/crates/pile-dataset/src/misc.rs b/crates/pile-value/src/source/misc.rs similarity index 100% rename from crates/pile-dataset/src/misc.rs rename to crates/pile-value/src/source/misc.rs diff --git a/crates/pile-dataset/src/traits.rs b/crates/pile-value/src/source/mod.rs similarity index 61% rename from crates/pile-dataset/src/traits.rs rename to crates/pile-value/src/source/mod.rs index 8a2b4ea..bf89bfd 100644 --- a/crates/pile-dataset/src/traits.rs +++ b/crates/pile-value/src/source/mod.rs @@ -1,15 +1,24 @@ +mod dir; +pub use dir::*; + +mod s3; +pub use s3::*; + +pub mod misc; + use chrono::{DateTime, Utc}; use tokio_stream::wrappers::ReceiverStream; -use crate::Item; - /// A read-only set of [Item]s. 
pub trait DataSource { /// Get an item from this datasource - fn get(&self, key: &str) -> impl Future, std::io::Error>> + Send; + fn get( + &self, + key: &str, + ) -> impl Future, std::io::Error>> + Send; /// Iterate over all items in this source in an arbitrary order - fn iter(&self) -> ReceiverStream>; + fn iter(&self) -> ReceiverStream>; /// Return the time of the latest change to the data in this source fn latest_change( diff --git a/crates/pile-dataset/src/source/s3.rs b/crates/pile-value/src/source/s3.rs similarity index 99% rename from crates/pile-dataset/src/source/s3.rs rename to crates/pile-value/src/source/s3.rs index 8525128..e635580 100644 --- a/crates/pile-dataset/src/source/s3.rs +++ b/crates/pile-value/src/source/s3.rs @@ -5,7 +5,7 @@ use smartstring::{LazyCompact, SmartString}; use std::sync::Arc; use tokio_stream::wrappers::ReceiverStream; -use crate::{DataSource, Item}; +use crate::{source::DataSource, value::Item}; #[derive(Debug)] pub struct S3DataSource { diff --git a/crates/pile-dataset/src/source/s3reader.rs b/crates/pile-value/src/source/s3reader.rs similarity index 100% rename from crates/pile-dataset/src/source/s3reader.rs rename to crates/pile-value/src/source/s3reader.rs diff --git a/crates/pile-value/src/value/item.rs b/crates/pile-value/src/value/item.rs new file mode 100644 index 0000000..dadede7 --- /dev/null +++ b/crates/pile-value/src/value/item.rs @@ -0,0 +1,105 @@ +use mime::Mime; +use smartstring::{LazyCompact, SmartString}; +use std::{fs::File, path::PathBuf, sync::Arc}; + +use crate::{ + source::{DirDataSource, S3DataSource}, + value::{ItemReader, S3Reader}, +}; + +// +// MARK: item +// + +/// A cheaply-clonable pointer to an item in a dataset +#[derive(Debug, Clone)] +pub enum Item { + File { + source: Arc, + mime: Mime, + + path: PathBuf, + sidecar: Option>, + }, + + S3 { + source: Arc, + mime: Mime, + + key: SmartString, + sidecar: Option>, + }, +} + +impl Item { + /// Open the item for reading. 
For S3, performs a HEAD request to determine + /// the object size. + pub async fn read(&self) -> Result { + Ok(match self { + Self::File { path, .. } => ItemReader::File(File::open(path)?), + + Self::S3 { source, key, .. } => { + let head = source + .client + .head_object() + .bucket(source.bucket.as_str()) + .key(key.as_str()) + .send() + .await + .map_err(std::io::Error::other)?; + + let size = head.content_length().unwrap_or(0) as u64; + + ItemReader::S3(S3Reader { + client: source.client.clone(), + bucket: source.bucket.clone(), + key: key.to_owned(), + cursor: 0, + size, + }) + } + }) + } + + pub fn source_name(&self) -> &pile_config::Label { + match self { + Self::File { source, .. } => &source.name, + Self::S3 { source, .. } => &source.name, + } + } + + #[expect(clippy::expect_used)] + pub fn key(&self) -> SmartString { + match self { + Self::File { path, .. } => path.to_str().expect("path is not utf-8").into(), + Self::S3 { key, .. } => key.clone(), + } + } + + pub fn hash(&self) -> Result { + match self { + Self::File { path, .. } => { + let mut hasher = blake3::Hasher::new(); + let mut file = std::fs::File::open(path)?; + std::io::copy(&mut file, &mut hasher)?; + return Ok(hasher.finalize()); + } + + Self::S3 { .. } => todo!(), + } + } + + pub fn mime(&self) -> &Mime { + match self { + Self::File { mime, .. } => mime, + Self::S3 { mime, .. } => mime, + } + } + + pub fn sidecar(&self) -> Option<&Self> { + match self { + Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x), + Self::S3 { sidecar, .. 
} => sidecar.as_ref().map(|x| &**x), + } + } +} diff --git a/crates/pile-value/src/value/mod.rs b/crates/pile-value/src/value/mod.rs new file mode 100644 index 0000000..cd64909 --- /dev/null +++ b/crates/pile-value/src/value/mod.rs @@ -0,0 +1,9 @@ +mod item; +pub use item::*; + +mod readers; +pub use readers::*; + +#[expect(clippy::module_inception)] +mod value; +pub use value::*; diff --git a/crates/pile-dataset/src/item.rs b/crates/pile-value/src/value/readers.rs similarity index 64% rename from crates/pile-dataset/src/item.rs rename to crates/pile-value/src/value/readers.rs index a2ebd4c..953c2e4 100644 --- a/crates/pile-dataset/src/item.rs +++ b/crates/pile-value/src/value/readers.rs @@ -1,114 +1,13 @@ -use mime::Mime; use smartstring::{LazyCompact, SmartString}; use std::{ fs::File, io::{Read, Seek, SeekFrom}, - path::PathBuf, sync::Arc, }; use tokio::runtime::Handle; -use crate::source::{DirDataSource, S3DataSource}; - // -// MARK: item -// - -/// A cheaply-clonable pointer to an item in a dataset -#[derive(Debug, Clone)] -pub enum Item { - File { - source: Arc, - mime: Mime, - - path: PathBuf, - sidecar: Option>, - }, - - S3 { - source: Arc, - mime: Mime, - - key: SmartString, - sidecar: Option>, - }, -} - -impl Item { - /// Open the item for reading. For S3, performs a HEAD request to determine - /// the object size. - pub async fn read(&self) -> Result { - Ok(match self { - Self::File { path, .. } => ItemReader::File(File::open(path)?), - - Self::S3 { source, key, .. } => { - let head = source - .client - .head_object() - .bucket(source.bucket.as_str()) - .key(key.as_str()) - .send() - .await - .map_err(std::io::Error::other)?; - - let size = head.content_length().unwrap_or(0) as u64; - - ItemReader::S3(S3Reader { - client: source.client.clone(), - bucket: source.bucket.clone(), - key: key.to_owned(), - cursor: 0, - size, - }) - } - }) - } - - pub fn source_name(&self) -> &pile_config::Label { - match self { - Self::File { source, .. 
} => &source.name, - Self::S3 { source, .. } => &source.name, - } - } - - #[expect(clippy::expect_used)] - pub fn key(&self) -> SmartString { - match self { - Self::File { path, .. } => path.to_str().expect("path is not utf-8").into(), - Self::S3 { key, .. } => key.clone(), - } - } - - pub fn hash(&self) -> Result { - match self { - Self::File { path, .. } => { - let mut hasher = blake3::Hasher::new(); - let mut file = std::fs::File::open(path)?; - std::io::copy(&mut file, &mut hasher)?; - return Ok(hasher.finalize()); - } - - Self::S3 { .. } => todo!(), - } - } - - pub fn mime(&self) -> &Mime { - match self { - Self::File { mime, .. } => mime, - Self::S3 { mime, .. } => mime, - } - } - - pub fn sidecar(&self) -> Option<&Self> { - match self { - Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x), - Self::S3 { sidecar, .. } => sidecar.as_ref().map(|x| &**x), - } - } -} - -// -// MARK: reader +// MARK: traits // pub trait AsyncReader: Send { @@ -210,11 +109,11 @@ impl AsyncSeekReader for ItemReader { // pub struct S3Reader { - client: Arc, - bucket: SmartString, - key: SmartString, - cursor: u64, - size: u64, + pub client: Arc, + pub bucket: SmartString, + pub key: SmartString, + pub cursor: u64, + pub size: u64, } impl AsyncReader for S3Reader { diff --git a/crates/pile-dataset/src/value.rs b/crates/pile-value/src/value/value.rs similarity index 58% rename from crates/pile-dataset/src/value.rs rename to crates/pile-value/src/value/value.rs index b34c376..fe6a58d 100644 --- a/crates/pile-dataset/src/value.rs +++ b/crates/pile-value/src/value/value.rs @@ -4,7 +4,15 @@ use serde_json::{Map, Value}; use smartstring::{LazyCompact, SmartString}; use std::sync::Arc; -use crate::extract::{ListExtractor, ObjectExtractor}; +use crate::{ + extract::{ + item::ItemExtractor, + misc::{ArrayExtractor, MapExtractor, VecExtractor}, + string::StringExtractor, + traits::{ListExtractor, ObjectExtractor}, + }, + value::Item, +}; /// An immutable, cheaply-clonable, 
lazily-computed value. /// Very similar to [serde_json::Value]. @@ -30,6 +38,9 @@ pub enum PileValue { /// A lazily-computed array ListExtractor(Arc), + + /// An pointer to an item in this dataset + Item(Item), } impl Clone for PileValue { @@ -46,11 +57,40 @@ impl Clone for PileValue { mime: mime.clone(), bytes: bytes.clone(), }, + Self::Item(i) => Self::Item(i.clone()), } } } impl PileValue { + pub fn object_extractor(&self) -> Arc { + match self { + Self::Null => Arc::new(MapExtractor::default()), + Self::U64(_) => Arc::new(MapExtractor::default()), + Self::I64(_) => Arc::new(MapExtractor::default()), + Self::Array(_) => Arc::new(MapExtractor::default()), + Self::String(s) => Arc::new(StringExtractor::new(s)), + Self::Blob { .. } => Arc::new(MapExtractor::default()), + Self::ListExtractor(_) => Arc::new(MapExtractor::default()), + Self::ObjectExtractor(e) => e.clone(), + Self::Item(i) => Arc::new(ItemExtractor::new(i)), + } + } + + pub fn list_extractor(&self) -> Arc { + match self { + Self::Null => Arc::new(VecExtractor::default()), + Self::U64(_) => Arc::new(VecExtractor::default()), + Self::I64(_) => Arc::new(VecExtractor::default()), + Self::Array(a) => Arc::new(ArrayExtractor::new(a.clone())), + Self::String(_) => Arc::new(VecExtractor::default()), + Self::Blob { .. } => Arc::new(VecExtractor::default()), + Self::ListExtractor(e) => e.clone(), + Self::ObjectExtractor(_) => Arc::new(VecExtractor::default()), + Self::Item(_) => Arc::new(VecExtractor::default()), + } + } + pub async fn query(&self, query: &ObjectPath) -> Result, std::io::Error> { let mut out: Option = Some(self.clone()); @@ -58,50 +98,41 @@ impl PileValue { match s { PathSegment::Root => out = Some(self.clone()), PathSegment::Field(field) => { - out = match out { - None => return Ok(None), - Some(Self::Null) => None, - Some(Self::U64(_)) => None, - Some(Self::I64(_)) => None, - Some(Self::Array(_)) => None, - Some(Self::String(_)) => None, - Some(Self::Blob { .. 
}) => None, - Some(Self::ListExtractor(_)) => None, - Some(Self::ObjectExtractor(e)) => e.field(field).await?, - } + let e = match out.map(|x| x.object_extractor()) { + Some(e) => e, + None => { + out = None; + continue; + } + }; + + out = e.field(field).await?; } PathSegment::Index(idx) => { - out = match &out { - None => return Ok(None), - Some(Self::Null) => None, - Some(Self::U64(_)) => None, - Some(Self::I64(_)) => None, - Some(Self::Blob { .. }) => None, - Some(Self::Array(v)) => { - let idx = if *idx >= 0 { - usize::try_from(*idx).ok() - } else { - usize::try_from(v.len() as i64 - idx).ok() - }; - - idx.and_then(|idx| v.get(idx)).cloned() + let e = match out.map(|x| x.list_extractor()) { + Some(e) => e, + None => { + out = None; + continue; } - Some(Self::String(_)) => None, - Some(Self::ObjectExtractor(_)) => None, - Some(Self::ListExtractor(e)) => { - let idx = if *idx >= 0 { - usize::try_from(*idx).ok() - } else { - usize::try_from(e.len().await? as i64 - idx).ok() - }; + }; - match idx { - Some(idx) => e.get(idx).await?, - None => None, - } + let idx = if *idx >= 0 { + usize::try_from(*idx).ok() + } else { + usize::try_from(e.len().await? as i64 - idx).ok() + }; + + let idx = match idx { + Some(idx) => idx, + None => { + out = None; + continue; } - } + }; + + out = e.get(idx).await?; } } } @@ -127,7 +158,8 @@ impl PileValue { Self::Array(x) => (!x.is_empty()).then(|| Value::Number(1u64.into())), Self::ListExtractor(x) => (!x.is_empty().await?).then(|| Value::Number(1u64.into())), - Self::ObjectExtractor(e) => { + Self::ObjectExtractor(_) | Self::Item(_) => { + let e = self.object_extractor(); let keys = e.fields().await?; let mut map = Map::new(); for k in &keys { @@ -160,22 +192,27 @@ impl PileValue { Self::Null => Value::Null, Self::U64(x) => Value::Number((*x).into()), Self::I64(x) => Value::Number((*x).into()), + Self::String(x) => Value::String(x.to_string()), - // TODO: replace with something meaningful + // TODO: replace with something meaningful? 
Self::Blob { mime, bytes } => { Value::String(format!("", bytes.len())) } - Self::String(x) => Value::String(x.to_string()), - Self::Array(x) => { + #[expect(clippy::expect_used)] + Self::Array(_) | Self::ListExtractor(_) => { + let e = self.list_extractor(); + let len = e.len().await?; let mut arr = Vec::new(); - for item in &**x { - arr.push(Box::pin(item.to_json()).await?); + for i in 0..len { + let v = e.get(i).await?.expect("item must be present"); + arr.push(Box::pin(v.to_json()).await?); } Value::Array(arr) } - Self::ObjectExtractor(e) => { + Self::ObjectExtractor(_) | Self::Item(_) => { + let e = self.object_extractor(); let keys = e.fields().await?; let mut map = Map::new(); for k in &keys { @@ -187,8 +224,6 @@ impl PileValue { } Value::Object(map) } - - Self::ListExtractor(e) => e.to_json().await?, }) } } diff --git a/crates/pile/Cargo.toml b/crates/pile/Cargo.toml index 3e70249..e8a9e89 100644 --- a/crates/pile/Cargo.toml +++ b/crates/pile/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] pile-toolbox = { workspace = true } pile-dataset = { workspace = true, features = ["axum", "pdfium"] } +pile-value = { workspace = true, features = ["pdfium"] } pile-config = { workspace = true } tracing = { workspace = true } diff --git a/crates/pile/src/command/annotate.rs b/crates/pile/src/command/annotate.rs index c46bdbc..90912b3 100644 --- a/crates/pile/src/command/annotate.rs +++ b/crates/pile/src/command/annotate.rs @@ -1,10 +1,12 @@ use anyhow::{Context, Result}; use clap::Args; use pile_config::{Label, Source}; -use pile_dataset::index::DbFtsIndex; -use pile_dataset::source::DirDataSource; -use pile_dataset::{DataSource, Datasets, Item, PileValue, extract::MetaExtractor}; +use pile_dataset::{Datasets, index::DbFtsIndex}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use pile_value::{ + source::{DataSource, DirDataSource}, + value::{Item, PileValue}, +}; use std::{path::PathBuf, sync::Arc}; use tokio_stream::StreamExt; use 
tracing::{info, warn}; @@ -72,11 +74,9 @@ impl CliCmd for AnnotateCommand { continue; }; - let meta = MetaExtractor::new(&item); - let extractor = PileValue::ObjectExtractor(Arc::new(meta)); - + let item = PileValue::Item(item.clone()); let Some(value) = - index.get_field(&extractor, &field).await.with_context(|| { + index.get_field(&item, &field).await.with_context(|| { format!("while extracting field from {}", path.display()) })? else { diff --git a/crates/pile/src/command/fields.rs b/crates/pile/src/command/fields.rs index c61f1dd..45f80f6 100644 --- a/crates/pile/src/command/fields.rs +++ b/crates/pile/src/command/fields.rs @@ -1,9 +1,10 @@ use anyhow::{Context, Result}; use clap::Args; -use pile_dataset::{Datasets, PileValue, extract::MetaExtractor}; +use pile_dataset::Datasets; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use pile_value::value::PileValue; use serde_json::{Map, Value}; -use std::{path::PathBuf, sync::Arc, time::Instant}; +use std::{path::PathBuf, time::Instant}; use tokio::task::JoinSet; use tokio_stream::StreamExt; use tracing::info; @@ -93,9 +94,8 @@ impl CliCmd for FieldsCommand { item_result.with_context(|| format!("while reading source {name}"))?; let name = name.clone(); join_set.spawn(async move { - let meta = MetaExtractor::new(&item); - let value = PileValue::ObjectExtractor(Arc::new(meta)); - let result = value.count_fields().await.with_context(|| { + let item = PileValue::Item(item); + let result = item.count_fields().await.with_context(|| { format!("while counting fields in source {name}") })?; Ok(result.and_then(|v| { diff --git a/crates/pile/src/command/list.rs b/crates/pile/src/command/list.rs index fbcd48b..a99b542 100644 --- a/crates/pile/src/command/list.rs +++ b/crates/pile/src/command/list.rs @@ -1,8 +1,9 @@ use anyhow::{Context, Result}; use clap::Args; use pile_config::objectpath::ObjectPath; -use pile_dataset::{Datasets, PileValue, extract::MetaExtractor}; +use pile_dataset::Datasets; use 
pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use pile_value::value::PileValue; use std::{path::PathBuf, str::FromStr, sync::Arc}; use tokio::task::JoinSet; use tokio_stream::StreamExt; @@ -79,9 +80,8 @@ impl CliCmd for ListCommand { let invert = self.invert; join_set.spawn(async move { - let meta = MetaExtractor::new(&item); - let root = PileValue::ObjectExtractor(Arc::new(meta)); - let value = root.query(&path).await?; + let item = PileValue::Item(item); + let value = item.query(&path).await?; let is_present = matches!(value, Some(v) if !matches!(v, PileValue::Null)); diff --git a/crates/pile/src/command/probe.rs b/crates/pile/src/command/probe.rs index dc1d565..174d2d5 100644 --- a/crates/pile/src/command/probe.rs +++ b/crates/pile/src/command/probe.rs @@ -1,9 +1,10 @@ use anyhow::{Context, Result}; use clap::Args; use pile_config::{Label, objectpath::ObjectPath}; -use pile_dataset::{Datasets, PileValue, extract::MetaExtractor}; +use pile_dataset::Datasets; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; -use std::{path::PathBuf, sync::Arc}; +use pile_value::value::PileValue; +use std::path::PathBuf; use crate::{CliCmd, GlobalContext}; @@ -54,9 +55,8 @@ impl CliCmd for ProbeCommand { anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) })?; - let value = PileValue::ObjectExtractor(Arc::new(MetaExtractor::new(&item))); - value - .to_json() + let item = PileValue::Item(item); + item.to_json() .await .with_context(|| format!("while extracting {}", self.key))? };