diff --git a/Cargo.lock b/Cargo.lock index 60e8d84..257dd2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2042,15 +2042,15 @@ dependencies = [ "chrono", "percent-encoding", "pile-config", - "pile-io", "pile-toolbox", "pile-value", + "regex", "serde", "serde_json", "tantivy", "thiserror", "tokio", - "tokio-stream", + "tokio-util", "toml", "tracing", "utoipa", diff --git a/crates/pile-config/src/lib.rs b/crates/pile-config/src/lib.rs index 715af84..c2e3351 100644 --- a/crates/pile-config/src/lib.rs +++ b/crates/pile-config/src/lib.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use std::{collections::HashMap, fmt::Debug, path::PathBuf}; -use crate::{objectpath::ObjectPath, pattern::GroupPattern}; +use crate::objectpath::ObjectPath; mod misc; pub use misc::*; @@ -15,6 +15,15 @@ fn default_true() -> bool { true } +pub fn default_base() -> String { + "(.*)".to_owned() +} + +#[expect(clippy::unwrap_used)] +pub fn default_files() -> HashMap<Label, String> { + [(Label::new("item").unwrap(), "{base}".to_owned())].into() +} + #[test] #[expect(clippy::expect_used)] fn init_db_toml_valid() { @@ -51,9 +60,17 @@ pub enum Source { /// Must be relative. path: PathBuf, - /// How to group files into items in this source - #[serde(default)] - pattern: GroupPattern, + /// Regex that extracts an item key from a file path. + /// - File paths are relative to `path`. + /// - The first group in this regex is the file's item key. + #[serde(default = "default_base")] + base_pattern: String, + + /// Map of files included in each item. + /// `{base}` is replaced with the string extracted by `base_pattern`.
+ /// Default is `{ item: "{base}" }` + #[serde(default = "default_files")] + files: HashMap, }, } diff --git a/crates/pile-dataset/Cargo.toml b/crates/pile-dataset/Cargo.toml index 67534b9..8655072 100644 --- a/crates/pile-dataset/Cargo.toml +++ b/crates/pile-dataset/Cargo.toml @@ -11,8 +11,8 @@ workspace = true pile-config = { workspace = true } pile-toolbox = { workspace = true } pile-value = { workspace = true } -pile-io = { workspace = true } +regex = { workspace = true } serde_json = { workspace = true } tantivy = { workspace = true } tracing = { workspace = true } @@ -20,7 +20,7 @@ chrono = { workspace = true } toml = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } -tokio-stream = { workspace = true } +tokio-util = { version = "0.7", features = ["io"] } serde = { workspace = true, optional = true } axum = { workspace = true, optional = true } diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index 0ba2855..f23d615 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; use pile_config::{ - ConfigToml, DatasetConfig, Label, Source, objectpath::ObjectPath, pattern::GroupPattern, + ConfigToml, DatasetConfig, Label, Source, default_base, default_files, objectpath::ObjectPath, }; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::{ @@ -8,6 +8,7 @@ use pile_value::{ source::{DataSource, DirDataSource, misc::path_ts_earliest}, value::{Item, PileValue}, }; +use regex::Regex; use serde_json::Value; use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant}; use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; @@ -107,7 +108,8 @@ impl Datasets { Source::Filesystem { enabled: true, path: path_parent.clone(), - pattern: GroupPattern::default(), + base_pattern: default_base(), + files: default_files(), }, )] .into_iter() @@ -125,18 
+127,37 @@ impl Datasets { Source::Filesystem { enabled, path, - pattern, + base_pattern, + files, } => { let target = match enabled { true => &mut sources, false => &mut disabled_sources, }; + let base_regex = Regex::new(base_pattern).map_err(|e| { + std::io::Error::new( + ErrorKind::InvalidInput, + format!("invalid base_pattern: {e}"), + ) + })?; + if base_regex.captures_len() != 2 { + return Err(std::io::Error::new( + ErrorKind::InvalidInput, + "base_pattern must have exactly one capture group", + )); + } + target.insert( label.clone(), Dataset::Dir( - DirDataSource::new(label, path_parent.join(path), pattern.clone()) - .await?, + DirDataSource::new( + label, + path_parent.join(path), + base_regex, + files.clone(), + ) + .await?, ), ); } @@ -194,18 +215,37 @@ impl Datasets { Source::Filesystem { enabled, path, - pattern, + base_pattern, + files, } => { let target = match enabled { true => &mut sources, false => &mut disabled_sources, }; + let base_regex = Regex::new(base_pattern).map_err(|e| { + std::io::Error::new( + ErrorKind::InvalidInput, + format!("invalid base_pattern: {e}"), + ) + })?; + if base_regex.captures_len() != 2 { + return Err(std::io::Error::new( + ErrorKind::InvalidInput, + "base_pattern must have exactly one capture group", + )); + } + target.insert( label.clone(), Dataset::Dir( - DirDataSource::new(label, path_parent.join(path), pattern.clone()) - .await?, + DirDataSource::new( + label, + path_parent.join(path), + base_regex, + files.clone(), + ) + .await?, ), ); } diff --git a/crates/pile-dataset/src/index/index_fts.rs b/crates/pile-dataset/src/index/index_fts.rs index 0bf0369..edf4459 100644 --- a/crates/pile-dataset/src/index/index_fts.rs +++ b/crates/pile-dataset/src/index/index_fts.rs @@ -245,7 +245,7 @@ async fn val_to_string( PileValue::Null => {} PileValue::ObjectExtractor(_) => {} PileValue::Item(_) => {} - PileValue::Blob { .. 
} => {} + PileValue::Binary(_) => {} } return Ok(Vec::new()); diff --git a/crates/pile-dataset/src/serve/extract.rs b/crates/pile-dataset/src/serve/extract.rs index aa1cdd3..82615c9 100644 --- a/crates/pile-dataset/src/serve/extract.rs +++ b/crates/pile-dataset/src/serve/extract.rs @@ -1,14 +1,19 @@ use axum::{ Json, + body::Body, extract::{Query, RawQuery, State}, http::{StatusCode, header}, response::{IntoResponse, Response}, }; use percent_encoding::percent_decode_str; use pile_config::{Label, objectpath::ObjectPath}; -use pile_value::{extract::traits::ExtractState, value::PileValue}; +use pile_value::{ + extract::traits::ExtractState, + value::{BinaryPileValue, PileValue}, +}; use serde::Deserialize; use std::{sync::Arc, time::Instant}; +use tokio_util::io::ReaderStream; use tracing::debug; use utoipa::ToSchema; @@ -141,15 +146,30 @@ pub async fn get_extract( s.to_string(), ) .into_response(), - PileValue::Blob { mime, bytes } => ( - StatusCode::OK, - [ - (header::CONTENT_TYPE, mime.to_string()), - (header::CONTENT_DISPOSITION, disposition), - ], - bytes.as_ref().clone(), - ) - .into_response(), + + PileValue::Binary(binary) => { + let mime = binary.mime().to_string(); + let body = match binary { + BinaryPileValue::Blob { bytes, .. } => Body::from(bytes.0.to_vec()), + BinaryPileValue::File { path, .. 
} => match tokio::fs::File::open(&path).await { + Ok(file) => Body::from_stream(ReaderStream::new(file)), + Err(e) => { + return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")) + .into_response(); + } + }, + }; + ( + StatusCode::OK, + [ + (header::CONTENT_TYPE, mime), + (header::CONTENT_DISPOSITION, disposition), + ], + body, + ) + .into_response() + } + _ => match value.to_json(&extract_state).await { Ok(json) => ( StatusCode::OK, diff --git a/crates/pile-dataset/src/serve/item.rs b/crates/pile-dataset/src/serve/item.rs deleted file mode 100644 index eacec63..0000000 --- a/crates/pile-dataset/src/serve/item.rs +++ /dev/null @@ -1,202 +0,0 @@ -use axum::{ - body::Body, - extract::{Query, State}, - http::{HeaderMap, StatusCode, header}, - response::{IntoResponse, Response}, -}; -use pile_config::Label; -use pile_io::{AsyncReader, AsyncSeekReader}; -use serde::Deserialize; -use std::{io::SeekFrom, sync::Arc, time::Instant}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tracing::debug; -use utoipa::ToSchema; - -use crate::Datasets; - -#[derive(Deserialize, ToSchema)] -pub struct ItemQuery { - source: String, - key: String, - - #[serde(default)] - download: bool, - name: Option, -} - -/// Parse a `Range: bytes=...` header value. -/// Returns `(start, end)` where either may be `None` (suffix form has `None` start). 
-fn parse_byte_range(s: &str) -> Option<(Option, Option)> { - let spec = s.strip_prefix("bytes=")?; - if spec.contains(',') { - return None; // multiple ranges not supported - } - if let Some(suffix) = spec.strip_prefix('-') { - return Some((None, Some(suffix.parse().ok()?))); - } - let mut parts = spec.splitn(2, '-'); - let start: u64 = parts.next()?.parse().ok()?; - let end = parts - .next() - .and_then(|e| if e.is_empty() { None } else { e.parse().ok() }); - Some((Some(start), end)) -} - -/// Fetch the raw bytes of an item by source and key -#[utoipa::path( - get, - path = "/item", - params( - ("source" = String, Query, description = "Source label"), - ("key" = String, Query, description = "Item key"), - ("name" = Option, Query, description = "Downloaded filename; defaults to the last segment of the key"), - ), - responses( - (status = 200, description = "Raw item bytes"), - (status = 206, description = "Partial content"), - (status = 400, description = "Invalid source label"), - (status = 404, description = "Item not found"), - (status = 416, description = "Range not satisfiable"), - (status = 500, description = "Internal server error"), - ) -)] -pub async fn item_get( - State(state): State>, - Query(params): Query, - headers: HeaderMap, -) -> Response { - let start = Instant::now(); - debug!( - message = "Serving /item", - source = params.source, - key = params.key - ); - - let label = match Label::try_from(params.source.clone()) { - Ok(l) => l, - Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), - }; - - let Some(item) = state.get(&label, ¶ms.key).await else { - return StatusCode::NOT_FOUND.into_response(); - }; - - let mime = item.mime().to_string(); - - let mut reader = match item.read().await { - Ok(r) => r, - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), - }; - - let total = match reader.seek(SeekFrom::End(0)).await { - Ok(n) => n, - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, 
format!("{e:?}")).into_response(), - }; - - let range = headers - .get(header::RANGE) - .and_then(|v| v.to_str().ok()) - .and_then(parse_byte_range); - - // Resolve (byte_start, byte_end, content_length, is_range) - let (byte_start, byte_end, length, is_range) = match range { - Some((Some(s), e)) => { - let e = e - .unwrap_or(total.saturating_sub(1)) - .min(total.saturating_sub(1)); - if s >= total || s > e { - return ( - StatusCode::RANGE_NOT_SATISFIABLE, - [(header::CONTENT_RANGE, format!("bytes */{total}"))], - ) - .into_response(); - } - (s, e, e - s + 1, true) - } - Some((None, Some(suffix))) => { - let s = total.saturating_sub(suffix); - let e = total.saturating_sub(1); - (s, e, total.saturating_sub(s), true) - } - _ => (0, total.saturating_sub(1), total, false), - }; - - if let Err(e) = reader.seek(SeekFrom::Start(byte_start)).await { - return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(); - } - - debug!( - message = "Served /item", - source = params.source, - key = params.key, - time_ms = start.elapsed().as_millis() - ); - - let (tx, rx) = mpsc::channel::, std::io::Error>>(8); - - tokio::spawn(async move { - let mut buf = vec![0u8; 65536]; - let mut remaining = length; - loop { - if remaining == 0 { - break; - } - let to_read = (buf.len() as u64).min(remaining) as usize; - match reader.read(&mut buf[..to_read]).await { - Ok(0) => break, - Ok(n) => { - remaining -= n as u64; - if tx.send(Ok(buf[..n].to_vec())).await.is_err() { - break; - } - } - Err(e) => { - let _ = tx.send(Err(e)).await; - break; - } - } - } - }); - - let body = Body::from_stream(ReceiverStream::new(rx)); - let status = if is_range { - StatusCode::PARTIAL_CONTENT - } else { - StatusCode::OK - }; - - let disposition_type = if params.download { - "attachment" - } else { - "inline" - }; - let file_name = params.name.unwrap_or_else(|| { - params - .key - .rsplit('/') - .next() - .unwrap_or(¶ms.key) - .to_owned() - }); - let disposition = format!("{disposition_type}; 
filename=\"{file_name}\""); - - let mut builder = axum::http::Response::builder() - .status(status) - .header(header::CONTENT_TYPE, mime) - .header(header::ACCEPT_RANGES, "bytes") - .header(header::CONTENT_LENGTH, length) - .header(header::CONTENT_DISPOSITION, disposition); - - if is_range { - builder = builder.header( - header::CONTENT_RANGE, - format!("bytes {byte_start}-{byte_end}/{total}"), - ); - } - - builder - .body(body) - .map(IntoResponse::into_response) - .unwrap_or_else(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response()) -} diff --git a/crates/pile-dataset/src/serve/mod.rs b/crates/pile-dataset/src/serve/mod.rs index db4b093..a3740a9 100644 --- a/crates/pile-dataset/src/serve/mod.rs +++ b/crates/pile-dataset/src/serve/mod.rs @@ -11,9 +11,6 @@ use crate::Datasets; mod lookup; pub use lookup::*; -mod item; -pub use item::*; - mod extract; pub use extract::*; @@ -34,7 +31,6 @@ pub use schema::*; tags(), paths( lookup, - item_get, get_extract, items_list, config_schema, @@ -45,7 +41,6 @@ pub use schema::*; LookupRequest, LookupResponse, LookupResult, - ItemQuery, ExtractQuery, ItemsQuery, ItemsResponse, @@ -64,7 +59,6 @@ impl Datasets { pub fn router_prefix(self: Arc, with_docs: bool, prefix: Option<&str>) -> Router<()> { let mut router = Router::new() .route("/lookup", post(lookup)) - .route("/item", get(item_get)) .route("/extract", get(get_extract)) .route("/items", get(items_list)) .route("/config/schema", get(config_schema)) diff --git a/crates/pile-dataset/src/serve/schema.rs b/crates/pile-dataset/src/serve/schema.rs index 3283018..ed7e6c8 100644 --- a/crates/pile-dataset/src/serve/schema.rs +++ b/crates/pile-dataset/src/serve/schema.rs @@ -44,9 +44,9 @@ async fn pile_value_to_api( PileValue::I64(n) => Ok(ApiValue::Number(n.into())), PileValue::Null => Ok(ApiValue::Null), - PileValue::Blob { mime, .. 
} => Ok(ApiValue::Binary { + PileValue::Binary(x) => Ok(ApiValue::Binary { binary: true, - mime: mime.to_string(), + mime: x.mime().to_string(), }), PileValue::Array(arr) => { diff --git a/crates/pile-dataset/src/serve/schema_field.rs b/crates/pile-dataset/src/serve/schema_field.rs index cd1b5bf..14255da 100644 --- a/crates/pile-dataset/src/serve/schema_field.rs +++ b/crates/pile-dataset/src/serve/schema_field.rs @@ -1,13 +1,18 @@ use axum::{ Json, + body::Body, extract::{Path, Query, State}, http::{StatusCode, header}, response::{IntoResponse, Response}, }; use pile_config::Label; -use pile_value::{extract::traits::ExtractState, value::PileValue}; +use pile_value::{ + extract::traits::ExtractState, + value::{BinaryPileValue, PileValue}, +}; use serde::Deserialize; use std::{sync::Arc, time::Instant}; +use tokio_util::io::ReaderStream; use tracing::debug; use utoipa::IntoParams; @@ -125,15 +130,30 @@ pub async fn schema_field( s.to_string(), ) .into_response(), - PileValue::Blob { mime, bytes } => ( - StatusCode::OK, - [ - (header::CONTENT_TYPE, mime.to_string()), - (header::CONTENT_DISPOSITION, disposition), - ], - bytes.as_ref().clone(), - ) - .into_response(), + + PileValue::Binary(binary) => { + let mime = binary.mime().to_string(); + let body = match binary { + BinaryPileValue::Blob { bytes, .. } => Body::from(bytes.0.to_vec()), + BinaryPileValue::File { path, .. 
} => match tokio::fs::File::open(&path).await { + Ok(file) => Body::from_stream(ReaderStream::new(file)), + Err(e) => { + return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")) + .into_response(); + } + }, + }; + ( + StatusCode::OK, + [ + (header::CONTENT_TYPE, mime), + (header::CONTENT_DISPOSITION, disposition), + ], + body, + ) + .into_response() + } + _ => match value.to_json(&extract_state).await { Ok(json) => ( StatusCode::OK, diff --git a/crates/pile-value/src/extract/item/epub/epub_cover.rs b/crates/pile-value/src/extract/blob/epub/epub_cover.rs similarity index 80% rename from crates/pile-value/src/extract/item/epub/epub_cover.rs rename to crates/pile-value/src/extract/blob/epub/epub_cover.rs index 2623309..5bdcd8c 100644 --- a/crates/pile-value/src/extract/item/epub/epub_cover.rs +++ b/crates/pile-value/src/extract/blob/epub/epub_cover.rs @@ -6,16 +6,16 @@ use tracing::trace; use crate::{ extract::traits::ExtractState, - value::{Item, PileValue}, + value::{ArcBytes, BinaryPileValue, PileValue}, }; pub struct EpubCoverExtractor { - item: Item, + item: BinaryPileValue, output: OnceLock)>>, } impl EpubCoverExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), @@ -51,7 +51,7 @@ impl EpubCoverExtractor { Err(error) => match error.downcast::() { Ok(x) => return Err(x), Err(error) => { - trace!(message = "Could not extract epub cover", ?error, key = ?self.item.key()); + trace!(message = "Could not extract epub cover", ?error, item = ?self.item); None } }, @@ -65,12 +65,11 @@ impl EpubCoverExtractor { return Ok(None); } - Ok(self - .get_inner() - .await? 
- .map(|(mime, bytes)| PileValue::Blob { + Ok(self.get_inner().await?.map(|(mime, bytes)| { + PileValue::Binary(BinaryPileValue::Blob { mime: mime.clone(), - bytes: Arc::new(bytes.clone()), - })) + bytes: ArcBytes(Arc::new(bytes.clone())), + }) + })) } } diff --git a/crates/pile-value/src/extract/item/epub/epub_meta.rs b/crates/pile-value/src/extract/blob/epub/epub_meta.rs similarity index 92% rename from crates/pile-value/src/extract/item/epub/epub_meta.rs rename to crates/pile-value/src/extract/blob/epub/epub_meta.rs index a8d1887..2ca4d3e 100644 --- a/crates/pile-value/src/extract/item/epub/epub_meta.rs +++ b/crates/pile-value/src/extract/blob/epub/epub_meta.rs @@ -9,16 +9,16 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{BinaryPileValue, PileValue}, }; pub struct EpubMetaExtractor { - item: Item, + item: BinaryPileValue, output: OnceLock>, } impl EpubMetaExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), @@ -57,7 +57,7 @@ impl EpubMetaExtractor { Err(error) => match error.downcast::() { Ok(x) => return Err(x), Err(error) => { - trace!(message = "Could not process epub", ?error, key = ?self.item.key()); + trace!(message = "Could not process epub", ?error, item = ?self.item); return Ok(self.output.get_or_init(HashMap::new)); } }, diff --git a/crates/pile-value/src/extract/item/epub/epub_text.rs b/crates/pile-value/src/extract/blob/epub/epub_text.rs similarity index 92% rename from crates/pile-value/src/extract/item/epub/epub_text.rs rename to crates/pile-value/src/extract/blob/epub/epub_text.rs index 91292bb..cf2b53a 100644 --- a/crates/pile-value/src/extract/item/epub/epub_text.rs +++ b/crates/pile-value/src/extract/blob/epub/epub_text.rs @@ -9,16 +9,16 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{BinaryPileValue, 
PileValue}, }; pub struct EpubTextExtractor { - item: Item, + item: BinaryPileValue, output: OnceLock>, } impl EpubTextExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), @@ -54,7 +54,7 @@ impl EpubTextExtractor { Err(error) => match error.downcast::() { Ok(x) => return Err(x), Err(error) => { - trace!(message = "Could not process epub", ?error, key = ?self.item.key()); + trace!(message = "Could not process epub", ?error, item = ?self.item); return Ok(self.output.get_or_init(HashMap::new)); } }, diff --git a/crates/pile-value/src/extract/item/epub/mod.rs b/crates/pile-value/src/extract/blob/epub/mod.rs similarity index 87% rename from crates/pile-value/src/extract/item/epub/mod.rs rename to crates/pile-value/src/extract/blob/epub/mod.rs index d81e129..4a7a22b 100644 --- a/crates/pile-value/src/extract/item/epub/mod.rs +++ b/crates/pile-value/src/extract/blob/epub/mod.rs @@ -12,7 +12,7 @@ pub use epub_text::*; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{BinaryPileValue, PileValue}, }; pub struct EpubExtractor { @@ -22,7 +22,7 @@ pub struct EpubExtractor { } impl EpubExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { text: Arc::new(EpubTextExtractor::new(item)), meta: Arc::new(EpubMetaExtractor::new(item)), @@ -87,9 +87,13 @@ impl ObjectExtractor for EpubExtractor { if k.as_str() == "cover" { let summary = match &v { - PileValue::Blob { mime, bytes } => { - format!("", mime, bytes.len()) + PileValue::Binary(BinaryPileValue::Blob { mime, bytes }) => { + format!("", bytes.0.len()) } + PileValue::Binary(BinaryPileValue::File { mime, .. 
}) => { + format!("") + } + PileValue::Null => "".to_owned(), _ => "".to_owned(), }; diff --git a/crates/pile-value/src/extract/item/exif.rs b/crates/pile-value/src/extract/blob/exif.rs similarity index 92% rename from crates/pile-value/src/extract/item/exif.rs rename to crates/pile-value/src/extract/blob/exif.rs index 5c95487..650f316 100644 --- a/crates/pile-value/src/extract/item/exif.rs +++ b/crates/pile-value/src/extract/blob/exif.rs @@ -9,16 +9,16 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{BinaryPileValue, PileValue}, }; pub struct ExifExtractor { - item: Item, + item: BinaryPileValue, output: OnceLock>, } impl ExifExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), @@ -53,7 +53,7 @@ impl ExifExtractor { Ok(x) => x, Err(exif::Error::Io(x)) => return Err(x), Err(error) => { - trace!(message = "Could not process exif", ?error, key = ?self.item.key()); + trace!(message = "Could not process exif", ?error, item = ?self.item); return Ok(self.output.get_or_init(HashMap::new)); } }; @@ -94,7 +94,7 @@ impl ObjectExtractor for ExifExtractor { ) -> Result, std::io::Error> { trace!( ?args, - key = self.item.key().as_str(), + item = ?self.item, "Getting field {name:?} from ExifExtractor", ); diff --git a/crates/pile-value/src/extract/item/flac.rs b/crates/pile-value/src/extract/blob/flac.rs similarity index 89% rename from crates/pile-value/src/extract/item/flac.rs rename to crates/pile-value/src/extract/blob/flac.rs index 21845f9..c56f272 100644 --- a/crates/pile-value/src/extract/item/flac.rs +++ b/crates/pile-value/src/extract/blob/flac.rs @@ -11,16 +11,16 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ListExtractor, ObjectExtractor}, - value::{Item, PileValue}, + value::{ArcBytes, BinaryPileValue, PileValue}, }; pub struct FlacImagesExtractor { - item: Item, + item: 
BinaryPileValue, cached_count: OnceLock, } impl FlacImagesExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), cached_count: OnceLock::new(), @@ -65,7 +65,7 @@ impl ListExtractor for FlacImagesExtractor { mut idx: usize, ) -> Result, std::io::Error> { trace!( - key = self.item.key().as_str(), + item = ?self.item, "Getting index {idx} from FlacImagesExtractor", ); @@ -73,7 +73,7 @@ impl ListExtractor for FlacImagesExtractor { return Ok(None); } - let key = self.item.key(); + let item = self.item.clone(); let reader = SyncReadBridge::new_current(self.item.read().await?); let image = tokio::task::spawn_blocking(move || { let reader = FlacReader::new(BufReader::new(reader)); @@ -93,11 +93,7 @@ impl ListExtractor for FlacImagesExtractor { Err(FlacDecodeError::IoError(err)) => return Err(err), Err(error) => { - trace!( - message = "Could not parse FLAC images", - key = key.as_str(), - ?error - ); + trace!(message = "Could not parse FLAC images", ?item, ?error); return Ok(None); } @@ -109,9 +105,11 @@ impl ListExtractor for FlacImagesExtractor { .await .map_err(std::io::Error::other)??; - Ok(image.map(|(mime, data)| PileValue::Blob { - mime, - bytes: Arc::new(data), + Ok(image.map(|(mime, data)| { + PileValue::Binary(BinaryPileValue::Blob { + mime, + bytes: ArcBytes(Arc::new(data)), + }) })) } @@ -130,13 +128,13 @@ impl ListExtractor for FlacImagesExtractor { } pub struct FlacExtractor { - item: Item, + item: BinaryPileValue, output: OnceLock>, images: PileValue, } impl FlacExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), @@ -149,12 +147,9 @@ impl FlacExtractor { return Ok(x); } - trace!( - message = "Reading FLAC tags", - key = self.item.key().as_str() - ); + trace!(message = "Reading FLAC tags", item = ?self.item); - let key = self.item.key(); + let item = self.item.clone(); let reader = 
SyncReadBridge::new_current(self.item.read().await?); let output = tokio::task::spawn_blocking(move || { let reader = FlacReader::new(BufReader::new(reader)); @@ -176,11 +171,7 @@ impl FlacExtractor { Err(FlacDecodeError::IoError(err)) => return Err(err), Err(error) => { - trace!( - message = "Could not parse FLAC metadata", - key = key.as_str(), - ?error - ); + trace!(message = "Could not parse FLAC metadata", ?item, ?error); return Ok(HashMap::new()); } diff --git a/crates/pile-value/src/extract/item/fs.rs b/crates/pile-value/src/extract/blob/fs.rs similarity index 89% rename from crates/pile-value/src/extract/item/fs.rs rename to crates/pile-value/src/extract/blob/fs.rs index 65afa76..7c8695e 100644 --- a/crates/pile-value/src/extract/item/fs.rs +++ b/crates/pile-value/src/extract/blob/fs.rs @@ -1,21 +1,21 @@ use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{BinaryPileValue, PileValue}, }; use pile_config::Label; use std::{ collections::HashMap, - path::{Component, PathBuf}, + path::Component, sync::{Arc, OnceLock}, }; pub struct FsExtractor { - item: Item, + item: BinaryPileValue, output: OnceLock>, } impl FsExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), @@ -27,7 +27,10 @@ impl FsExtractor { return Ok(x); } - let path = PathBuf::from(self.item.key().as_str()); + let path = match &self.item { + BinaryPileValue::File { path, .. 
} => path, + _ => return Ok(self.output.get_or_init(HashMap::new)), + }; let mut root = false; let components = path diff --git a/crates/pile-value/src/extract/item/id3.rs b/crates/pile-value/src/extract/blob/id3.rs similarity index 92% rename from crates/pile-value/src/extract/item/id3.rs rename to crates/pile-value/src/extract/blob/id3.rs index 653ba06..eab6b2f 100644 --- a/crates/pile-value/src/extract/item/id3.rs +++ b/crates/pile-value/src/extract/blob/id3.rs @@ -11,16 +11,16 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{BinaryPileValue, PileValue}, }; pub struct Id3Extractor { - item: Item, + item: BinaryPileValue, output: OnceLock>, } impl Id3Extractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), @@ -32,9 +32,9 @@ impl Id3Extractor { return Ok(x); } - trace!(message = "Reading id3 tags", key = self.item.key().as_str()); + trace!(message = "Reading id3 tags", key = ?self.item); - let key = self.item.key(); + let item = self.item.clone(); let reader = SyncReadBridge::new_current(self.item.read().await?); let tag = match tokio::task::spawn_blocking(move || Tag::read_from2(BufReader::new(reader))) .await @@ -48,11 +48,7 @@ impl Id3Extractor { })) => return Err(e), Ok(Err(error)) => { - trace!( - message = "Could not parse id3 tags", - key = key.as_str(), - ?error - ); + trace!(message = "Could not parse id3 tags", ?item, ?error); return Ok(self.output.get_or_init(HashMap::new)); } }; diff --git a/crates/pile-value/src/extract/item/image/mod.rs b/crates/pile-value/src/extract/blob/image/mod.rs similarity index 59% rename from crates/pile-value/src/extract/item/image/mod.rs rename to crates/pile-value/src/extract/blob/image/mod.rs index 526b093..42a5a9f 100644 --- a/crates/pile-value/src/extract/item/image/mod.rs +++ b/crates/pile-value/src/extract/blob/image/mod.rs @@ -1,63 +1,25 @@ -mod 
transform; -use transform::{CropTransformer, ImageTransformer, MaxDimTransformer}; - use image::ImageFormat; use mime::Mime; use pile_config::Label; use pile_io::AsyncReader; -use std::{ - io::Cursor, - str::FromStr, - sync::{Arc, OnceLock}, -}; +use std::{io::Cursor, str::FromStr, sync::Arc}; use tracing::trace; +use transform::{CropTransformer, ImageTransformer, MaxDimTransformer}; + +mod transform; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{ArcBytes, BinaryPileValue, PileValue}, }; -enum ImageSource { - Item(Item, OnceLock>>), - Blob(Arc>, Mime), -} - pub struct ImageExtractor { - source: ImageSource, + item: BinaryPileValue, } impl ImageExtractor { - pub fn new(item: &Item) -> Self { - Self { - source: ImageSource::Item(item.clone(), OnceLock::new()), - } - } - - pub fn from_blob(bytes: Arc>, mime: Mime) -> Self { - Self { - source: ImageSource::Blob(bytes, mime), - } - } - - fn mime(&self) -> &Mime { - match &self.source { - ImageSource::Item(item, _) => item.mime(), - ImageSource::Blob(_, mime) => mime, - } - } - - async fn read_bytes(&self) -> Result>, std::io::Error> { - match &self.source { - ImageSource::Blob(bytes, _) => Ok(bytes.clone()), - ImageSource::Item(item, cache) => { - if let Some(x) = cache.get() { - return Ok(x.clone()); - } - let mut reader = item.read().await?; - let bytes = reader.read_to_end().await?; - Ok(cache.get_or_init(|| Arc::new(bytes)).clone()) - } - } + pub fn new(item: &BinaryPileValue) -> Self { + Self { item: item.clone() } } async fn apply( @@ -69,11 +31,14 @@ impl ImageExtractor { Err(_) => return Ok(None), }; - let mime = self.mime().clone(); - let bytes = self.read_bytes().await?; + let mime = self.item.mime().clone(); + let bytes = self.item.read().await?.read_to_end().await?; let Some(format) = ImageFormat::from_mime_type(&mime) else { - return Ok(Some(PileValue::Blob { mime, bytes })); + return Ok(Some(PileValue::Binary(BinaryPileValue::Blob { + mime, + bytes: 
ArcBytes(Arc::new(bytes)), + }))); }; let bytes_for_closure = bytes.clone(); @@ -91,11 +56,15 @@ impl ImageExtractor { .await?; match result { - Ok((out_mime, out_bytes)) => Ok(Some(PileValue::Blob { + Ok((out_mime, out_bytes)) => Ok(Some(PileValue::Binary(BinaryPileValue::Blob { mime: out_mime, - bytes: Arc::new(out_bytes), - })), - Err(_) => Ok(Some(PileValue::Blob { mime, bytes })), + bytes: ArcBytes(Arc::new(out_bytes)), + }))), + + Err(_) => Ok(Some(PileValue::Binary(BinaryPileValue::Blob { + mime, + bytes: ArcBytes(Arc::new(bytes)), + }))), } } } diff --git a/crates/pile-value/src/extract/item/image/transform/mod.rs b/crates/pile-value/src/extract/blob/image/transform/mod.rs similarity index 100% rename from crates/pile-value/src/extract/item/image/transform/mod.rs rename to crates/pile-value/src/extract/blob/image/transform/mod.rs diff --git a/crates/pile-value/src/extract/item/image/transform/pixeldim.rs b/crates/pile-value/src/extract/blob/image/transform/pixeldim.rs similarity index 100% rename from crates/pile-value/src/extract/item/image/transform/pixeldim.rs rename to crates/pile-value/src/extract/blob/image/transform/pixeldim.rs diff --git a/crates/pile-value/src/extract/item/image/transform/transformers/crop.rs b/crates/pile-value/src/extract/blob/image/transform/transformers/crop.rs similarity index 100% rename from crates/pile-value/src/extract/item/image/transform/transformers/crop.rs rename to crates/pile-value/src/extract/blob/image/transform/transformers/crop.rs diff --git a/crates/pile-value/src/extract/item/image/transform/transformers/maxdim.rs b/crates/pile-value/src/extract/blob/image/transform/transformers/maxdim.rs similarity index 100% rename from crates/pile-value/src/extract/item/image/transform/transformers/maxdim.rs rename to crates/pile-value/src/extract/blob/image/transform/transformers/maxdim.rs diff --git a/crates/pile-value/src/extract/item/image/transform/transformers/mod.rs 
b/crates/pile-value/src/extract/blob/image/transform/transformers/mod.rs similarity index 100% rename from crates/pile-value/src/extract/item/image/transform/transformers/mod.rs rename to crates/pile-value/src/extract/blob/image/transform/transformers/mod.rs diff --git a/crates/pile-value/src/extract/item/json.rs b/crates/pile-value/src/extract/blob/json.rs similarity index 95% rename from crates/pile-value/src/extract/item/json.rs rename to crates/pile-value/src/extract/blob/json.rs index a9898ba..e4af3e2 100644 --- a/crates/pile-value/src/extract/item/json.rs +++ b/crates/pile-value/src/extract/blob/json.rs @@ -7,7 +7,7 @@ use std::{ use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{BinaryPileValue, PileValue}, }; fn json_to_pile(value: serde_json::Value) -> PileValue { @@ -24,12 +24,12 @@ fn json_to_pile(value: serde_json::Value) -> PileValue { } pub struct JsonExtractor { - item: Item, + item: BinaryPileValue, output: OnceLock>, } impl JsonExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), diff --git a/crates/pile-value/src/extract/item/mod.rs b/crates/pile-value/src/extract/blob/mod.rs similarity index 88% rename from crates/pile-value/src/extract/item/mod.rs rename to crates/pile-value/src/extract/blob/mod.rs index a69b698..5430813 100644 --- a/crates/pile-value/src/extract/item/mod.rs +++ b/crates/pile-value/src/extract/blob/mod.rs @@ -25,9 +25,6 @@ mod toml; use pile_config::Label; pub use toml::*; -mod group; -pub use group::*; - mod text; pub use text::*; @@ -39,17 +36,17 @@ use crate::{ misc::MapExtractor, traits::{ExtractState, ObjectExtractor}, }, - value::{Item, PileValue}, + value::{BinaryPileValue, PileValue}, }; -pub struct ItemExtractor { +pub struct BinaryExtractor { inner: MapExtractor, image: Arc, } -impl ItemExtractor { +impl BinaryExtractor { #[expect(clippy::unwrap_used)] - pub fn new(item: &Item) 
-> Self { + pub fn new(item: &BinaryPileValue) -> Self { let inner = MapExtractor { inner: HashMap::from([ ( @@ -88,10 +85,6 @@ impl ItemExtractor { Label::new("text").unwrap(), PileValue::ObjectExtractor(Arc::new(TextExtractor::new(item))), ), - ( - Label::new("groups").unwrap(), - PileValue::ObjectExtractor(Arc::new(GroupExtractor::new(item))), - ), ]), }; @@ -103,7 +96,7 @@ impl ItemExtractor { } #[async_trait::async_trait] -impl ObjectExtractor for ItemExtractor { +impl ObjectExtractor for BinaryExtractor { async fn field( &self, state: &ExtractState, diff --git a/crates/pile-value/src/extract/item/pdf/mod.rs b/crates/pile-value/src/extract/blob/pdf/mod.rs similarity index 95% rename from crates/pile-value/src/extract/item/pdf/mod.rs rename to crates/pile-value/src/extract/blob/pdf/mod.rs index cdb23a4..f569542 100644 --- a/crates/pile-value/src/extract/item/pdf/mod.rs +++ b/crates/pile-value/src/extract/blob/pdf/mod.rs @@ -15,7 +15,7 @@ pub use pdf_text::*; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{BinaryPileValue, PileValue}, }; pub struct PdfExtractor { @@ -26,7 +26,7 @@ pub struct PdfExtractor { } impl PdfExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { text: Arc::new(PdfTextExtractor::new(item)), meta: Arc::new(PdfMetaExtractor::new(item)), @@ -46,7 +46,7 @@ impl ObjectExtractor for PdfExtractor { ) -> Result, std::io::Error> { trace!( ?args, - key = self.text.item.key().as_str(), + item = ?self.text.item, "Getting field {name:?} from PdfExtractor", ); diff --git a/crates/pile-value/src/extract/item/pdf/pdf_meta.rs b/crates/pile-value/src/extract/blob/pdf/pdf_meta.rs similarity index 94% rename from crates/pile-value/src/extract/item/pdf/pdf_meta.rs rename to crates/pile-value/src/extract/blob/pdf/pdf_meta.rs index 221033a..09a1188 100644 --- a/crates/pile-value/src/extract/item/pdf/pdf_meta.rs +++ 
b/crates/pile-value/src/extract/blob/pdf/pdf_meta.rs @@ -9,18 +9,19 @@ use std::{ }; use tracing::trace; +use crate::value::BinaryPileValue; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::PileValue, }; pub struct PdfMetaExtractor { - item: Item, + item: BinaryPileValue, output: OnceLock>, } impl PdfMetaExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), @@ -82,7 +83,7 @@ impl PdfMetaExtractor { let (page_count, raw_meta) = match raw_meta { Ok(x) => x, Err(error) => { - trace!(message = "Could not process pdf", ?error, key = ?self.item.key()); + trace!(message = "Could not process pdf", ?error, item = ?self.item); return Ok(self.output.get_or_init(HashMap::new)); } }; diff --git a/crates/pile-value/src/extract/item/pdf/pdf_pages.rs b/crates/pile-value/src/extract/blob/pdf/pdf_pages.rs similarity index 88% rename from crates/pile-value/src/extract/item/pdf/pdf_pages.rs rename to crates/pile-value/src/extract/blob/pdf/pdf_pages.rs index 6c9101e..f067716 100644 --- a/crates/pile-value/src/extract/item/pdf/pdf_pages.rs +++ b/crates/pile-value/src/extract/blob/pdf/pdf_pages.rs @@ -9,15 +9,15 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ListExtractor}, - value::{Item, PileValue}, + value::{ArcBytes, BinaryPileValue, PileValue}, }; pub struct PdfPagesExtractor { - item: Item, + item: BinaryPileValue, } impl PdfPagesExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone() } } @@ -41,7 +41,7 @@ impl ListExtractor for PdfPagesExtractor { idx: usize, ) -> Result, std::io::Error> { trace!( - key = self.item.key().as_str(), + item = ?self.item, "Getting index {idx} from PdfPagesExtractor", ); @@ -78,12 +78,12 @@ impl ListExtractor for PdfPagesExtractor { let value = match png { Ok(None) => return Ok(None), - Ok(Some(bytes)) => PileValue::Blob { + 
Ok(Some(bytes)) => PileValue::Binary(BinaryPileValue::Blob { mime: mime::IMAGE_PNG, - bytes: Arc::new(bytes), - }, + bytes: ArcBytes(Arc::new(bytes)), + }), Err(error) => { - trace!(message = "Could not render pdf page", ?error, idx, key = ?self.item.key()); + trace!(message = "Could not render pdf page", ?error, idx, item = ?self.item); PileValue::Null } }; @@ -108,7 +108,7 @@ impl ListExtractor for PdfPagesExtractor { match count { Ok(n) => Ok(n), Err(error) => { - trace!(message = "Could not read pdf page count", ?error, key = ?self.item.key()); + trace!(message = "Could not read pdf page count", ?error, item = ?self.item); Ok(0) } } diff --git a/crates/pile-value/src/extract/item/pdf/pdf_text.rs b/crates/pile-value/src/extract/blob/pdf/pdf_text.rs similarity index 93% rename from crates/pile-value/src/extract/item/pdf/pdf_text.rs rename to crates/pile-value/src/extract/blob/pdf/pdf_text.rs index b90f244..d6e5648 100644 --- a/crates/pile-value/src/extract/item/pdf/pdf_text.rs +++ b/crates/pile-value/src/extract/blob/pdf/pdf_text.rs @@ -9,18 +9,19 @@ use std::{ }; use tracing::trace; +use crate::value::BinaryPileValue; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::PileValue, }; pub struct PdfTextExtractor { - pub(super) item: Item, + pub(super) item: BinaryPileValue, output: OnceLock>, } impl PdfTextExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), @@ -86,7 +87,7 @@ impl PdfTextExtractor { let raw_text = match raw_text { Ok(x) => x, Err(error) => { - trace!(message = "Could not process pdf", ?error, key = ?self.item.key()); + trace!(message = "Could not process pdf", ?error, item = ?self.item); return Ok(self.output.get_or_init(HashMap::new)); } }; diff --git a/crates/pile-value/src/extract/item/text.rs b/crates/pile-value/src/extract/blob/text.rs similarity index 92% rename from 
crates/pile-value/src/extract/item/text.rs rename to crates/pile-value/src/extract/blob/text.rs index 772646e..bf362b5 100644 --- a/crates/pile-value/src/extract/item/text.rs +++ b/crates/pile-value/src/extract/blob/text.rs @@ -4,16 +4,16 @@ use std::sync::{Arc, OnceLock}; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{BinaryPileValue, PileValue}, }; pub struct TextExtractor { - item: Item, + item: BinaryPileValue, output: OnceLock, } impl TextExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), diff --git a/crates/pile-value/src/extract/item/toml.rs b/crates/pile-value/src/extract/blob/toml.rs similarity index 95% rename from crates/pile-value/src/extract/item/toml.rs rename to crates/pile-value/src/extract/blob/toml.rs index a11c1ae..26ce152 100644 --- a/crates/pile-value/src/extract/item/toml.rs +++ b/crates/pile-value/src/extract/blob/toml.rs @@ -7,7 +7,7 @@ use std::{ use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, + value::{BinaryPileValue, PileValue}, }; fn toml_to_pile(value: toml::Value) -> PileValue { @@ -25,12 +25,12 @@ fn toml_to_pile(value: toml::Value) -> PileValue { } pub struct TomlExtractor { - item: Item, + item: BinaryPileValue, output: OnceLock>, } impl TomlExtractor { - pub fn new(item: &Item) -> Self { + pub fn new(item: &BinaryPileValue) -> Self { Self { item: item.clone(), output: OnceLock::new(), diff --git a/crates/pile-value/src/extract/item.rs b/crates/pile-value/src/extract/item.rs new file mode 100644 index 0000000..ed180b5 --- /dev/null +++ b/crates/pile-value/src/extract/item.rs @@ -0,0 +1,58 @@ +use std::{collections::HashMap, sync::Arc}; + +use pile_config::Label; + +use crate::{ + extract::{ + misc::MapExtractor, + traits::{ExtractState, ObjectExtractor}, + }, + value::{Item, PileValue}, +}; + +pub struct ItemExtractor { + inner: MapExtractor, 
+} + +impl ItemExtractor { + pub fn new(item: &Item) -> Self { + let files = { + let Item::File { files, .. } = &item; + let mut inner = HashMap::new(); + for f in files { + inner.insert(f.0.clone(), f.1.clone()); + } + PileValue::ObjectExtractor(Arc::new(MapExtractor { inner })) + }; + + #[expect(clippy::unwrap_used)] + let inner = MapExtractor { + inner: HashMap::from([ + (Label::new("files").unwrap(), files), + ( + Label::new("key").unwrap(), + PileValue::String(Arc::new(item.key())), + ), + ]), + }; + + Self { inner } + } +} + +#[async_trait::async_trait] +impl ObjectExtractor for ItemExtractor { + async fn field( + &self, + state: &ExtractState, + name: &pile_config::Label, + args: Option<&str>, + ) -> Result, std::io::Error> { + self.inner.field(state, name, args).await + } + + async fn fields(&self) -> Result, std::io::Error> { + let fields = self.inner.fields().await?; + Ok(fields) + } +} diff --git a/crates/pile-value/src/extract/item/group.rs b/crates/pile-value/src/extract/item/group.rs deleted file mode 100644 index 294c2a2..0000000 --- a/crates/pile-value/src/extract/item/group.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::sync::Arc; - -use pile_config::Label; - -use crate::{ - extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue}, -}; - -pub struct GroupExtractor { - item: Item, -} - -impl GroupExtractor { - pub fn new(item: &Item) -> Self { - Self { item: item.clone() } - } -} - -#[async_trait::async_trait] -impl ObjectExtractor for GroupExtractor { - async fn field( - &self, - _state: &ExtractState, - name: &Label, - args: Option<&str>, - ) -> Result, std::io::Error> { - if args.is_some() { - return Ok(None); - } - Ok(self - .item - .group() - .get(name) - .map(|item| PileValue::ObjectExtractor(Arc::new(super::ItemExtractor::new(item))))) - } - - async fn fields(&self) -> Result, std::io::Error> { - Ok(self.item.group().keys().cloned().collect()) - } - - async fn to_json(&self, _state: &ExtractState) -> Result { - 
Ok(serde_json::Value::Object( - self.item - .group() - .iter() - .map(|(k, v)| { - ( - k.to_string(), - serde_json::Value::String(format!("", v.key())), - ) - }) - .collect(), - )) - } -} diff --git a/crates/pile-value/src/extract/mod.rs b/crates/pile-value/src/extract/mod.rs index be4e103..4f3efbc 100644 --- a/crates/pile-value/src/extract/mod.rs +++ b/crates/pile-value/src/extract/mod.rs @@ -1,3 +1,4 @@ +pub mod blob; pub mod item; pub mod misc; pub mod regex; diff --git a/crates/pile-value/src/source/dir.rs b/crates/pile-value/src/source/dir.rs index db253bc..146b2ca 100644 --- a/crates/pile-value/src/source/dir.rs +++ b/crates/pile-value/src/source/dir.rs @@ -1,27 +1,25 @@ use chrono::{DateTime, Utc}; -use pile_config::{ - Label, - pattern::{GroupPattern, GroupSegment}, -}; +use pile_config::Label; +use regex::Regex; use smartstring::{LazyCompact, SmartString}; use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, HashMap}, path::PathBuf, sync::{Arc, OnceLock}, }; use walkdir::WalkDir; use crate::{ - extract::traits::ExtractState, source::{DataSource, misc::path_ts_latest}, - value::{Item, PileValue}, + value::{BinaryPileValue, Item, PileValue}, }; #[derive(Debug)] pub struct DirDataSource { pub name: Label, pub dir: PathBuf, - pub pattern: GroupPattern, + pub base_pattern: Regex, + pub files: HashMap, pub index: OnceLock, Item>>, } @@ -29,21 +27,18 @@ impl DirDataSource { pub async fn new( name: &Label, dir: PathBuf, - pattern: GroupPattern, + base_pattern: Regex, + files: HashMap, ) -> Result, std::io::Error> { let source = Arc::new(Self { name: name.clone(), dir, - pattern, + base_pattern, + files, index: OnceLock::new(), }); - // - // MARK: list paths - // - - let mut paths_items = HashSet::new(); - let mut paths_grouped_items = HashSet::new(); + let mut index = BTreeMap::new(); 'entry: for entry in WalkDir::new(&source.dir) { let entry = match entry { Err(e) => { @@ -59,51 +54,52 @@ impl DirDataSource { } let path = 
entry.into_path(); - let path_str = match path.to_str() { + let rel_path = match path.strip_prefix(&source.dir) { + Ok(p) => p, + Err(_) => continue 'entry, + }; + let path_str = match rel_path.to_str() { Some(x) => x, None => continue 'entry, }; - let groups = resolve_groups(&source.pattern, path_str).await; - paths_grouped_items.extend(groups.into_values()); - paths_items.insert(path); - } - - // - // MARK: resolve groups - // - - let mut index = BTreeMap::new(); - 'entry: for path in paths_items.difference(&paths_grouped_items) { - let path_str = match path.to_str() { - Some(x) => x, + let captures = match source.base_pattern.captures(path_str) { + Some(c) => c, + None => continue 'entry, + }; + let base = match captures.get(1) { + Some(m) => m.as_str(), None => continue 'entry, }; - let group = resolve_groups(&source.pattern, path_str).await; - let group = group - .into_iter() - .map(|(k, group_path)| { - ( - k, - Box::new(Item::File { - source: Arc::clone(&source), - mime: mime_guess::from_path(&group_path).first_or_octet_stream(), - path: group_path.clone(), - group: Arc::new(HashMap::new()), + let key: SmartString = base.into(); + if index.contains_key(&key) { + continue 'entry; + } + + let mut item_files = HashMap::new(); + for (label, template) in &source.files { + let file_path = source.dir.join(template.replace("{base}", base)); + if file_path.exists() { + let mime = mime_guess::from_path(&file_path).first_or_octet_stream(); + item_files.insert( + label.clone(), + PileValue::Binary(BinaryPileValue::File { + mime, + path: file_path, }), - ) - }) - .collect::>(); + ); + } + } - let item = Item::File { - source: Arc::clone(&source), - mime: mime_guess::from_path(path).first_or_octet_stream(), - path: path.into(), - group: Arc::new(group), - }; - - index.insert(item.key(), item); + index.insert( + key.clone(), + Item::File { + key, + source: Arc::clone(&source), + files: item_files, + }, + ); } source.index.get_or_init(|| index); @@ -139,43 +135,3 @@ impl 
DataSource for Arc { path_ts_latest(&self.dir) } } - -async fn resolve_groups(pattern: &GroupPattern, path_str: &str) -> HashMap { - let state = ExtractState { ignore_mime: false }; - let mut group = HashMap::new(); - 'pattern: for (l, pat) in &pattern.pattern { - let item = PileValue::String(Arc::new(path_str.into())); - let mut target = String::new(); - for p in pat { - match p { - GroupSegment::Literal(x) => target.push_str(x), - GroupSegment::Path(op) => { - let res = match item.query(&state, op).await { - Ok(Some(x)) => x, - _ => continue 'pattern, - }; - - let res = match res.as_str() { - Some(x) => x, - None => continue 'pattern, - }; - - target.push_str(res); - } - } - } - - let group_path: PathBuf = match target.parse() { - Ok(x) => x, - Err(_) => continue 'pattern, - }; - - if !group_path.exists() { - continue; - } - - group.insert(l.clone(), group_path); - } - - return group; -} diff --git a/crates/pile-value/src/value/item.rs b/crates/pile-value/src/value/item.rs index af4cbad..330ace4 100644 --- a/crates/pile-value/src/value/item.rs +++ b/crates/pile-value/src/value/item.rs @@ -1,74 +1,45 @@ -use mime::Mime; use pile_config::Label; -use pile_io::SyncReadBridge; use smartstring::{LazyCompact, SmartString}; -use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc}; +use std::{collections::HashMap, sync::Arc}; -use crate::{source::DirDataSource, value::ItemReader}; +use crate::{source::DirDataSource, value::PileValue}; // // MARK: item // /// A cheaply-cloneable pointer to an item in a dataset -#[derive(Debug, Clone)] +#[derive(Clone)] pub enum Item { File { + key: SmartString, source: Arc, - mime: Mime, - - path: PathBuf, - group: Arc>>, + files: HashMap, }, } -impl Item { - /// Open the item for reading. - pub async fn read(&self) -> Result { - Ok(match self { - Self::File { path, .. 
} => ItemReader::File(File::open(path)?), - }) +impl std::fmt::Debug for Item { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::File { key, files, .. } => f + .debug_struct("Item::File") + .field("key", key) + .field("files", &files.keys().collect::>()) + .finish(), + } } +} +impl Item { pub fn source_name(&self) -> &pile_config::Label { match self { Self::File { source, .. } => &source.name, } } - #[expect(clippy::expect_used)] pub fn key(&self) -> SmartString { match self { - Self::File { source, path, .. } => path - .strip_prefix(&source.dir) - .expect("item must be inside source") - .to_str() - .expect("path is not utf-8") - .into(), - } - } - - pub async fn hash(&self) -> Result { - let read = self.read().await?; - let mut read = SyncReadBridge::new_current(read); - let out = tokio::task::spawn_blocking(move || { - let mut hasher = blake3::Hasher::new(); - std::io::copy(&mut read, &mut hasher)?; - return Ok::<_, std::io::Error>(hasher.finalize()); - }) - .await??; - return Ok(out); - } - - pub fn mime(&self) -> &Mime { - match self { - Self::File { mime, .. } => mime, - } - } - - pub fn group(&self) -> &HashMap> { - match self { - Self::File { group, .. } => group, + Self::File { key, .. 
} => key.clone(), } } } diff --git a/crates/pile-value/src/value/readers.rs b/crates/pile-value/src/value/readers.rs index b7dac78..81cb39d 100644 --- a/crates/pile-value/src/value/readers.rs +++ b/crates/pile-value/src/value/readers.rs @@ -1,5 +1,10 @@ use pile_io::{AsyncReader, AsyncSeekReader}; -use std::{fs::File, io::Seek}; +use std::{ + fs::File, + io::{Cursor, Seek}, +}; + +use crate::value::ArcBytes; // // MARK: itemreader @@ -7,12 +12,14 @@ use std::{fs::File, io::Seek}; pub enum ItemReader { File(File), + Vec(Cursor), } impl AsyncReader for ItemReader { async fn read(&mut self, buf: &mut [u8]) -> Result { match self { Self::File(x) => std::io::Read::read(x, buf), + Self::Vec(x) => std::io::Read::read(x, buf), } } } @@ -21,6 +28,7 @@ impl AsyncSeekReader for ItemReader { async fn seek(&mut self, pos: std::io::SeekFrom) -> Result { match self { Self::File(x) => x.seek(pos), + Self::Vec(x) => x.seek(pos), } } } diff --git a/crates/pile-value/src/value/value.rs b/crates/pile-value/src/value/value.rs index 3e7f0d4..b08c64c 100644 --- a/crates/pile-value/src/value/value.rs +++ b/crates/pile-value/src/value/value.rs @@ -1,19 +1,75 @@ use mime::Mime; use pile_config::objectpath::{ObjectPath, PathSegment}; +use pile_io::SyncReadBridge; use serde_json::{Map, Value}; use smartstring::{LazyCompact, SmartString}; -use std::sync::Arc; +use std::{fmt::Debug, fs::File, io::Cursor, path::PathBuf, sync::Arc}; use crate::{ extract::{ - item::{ImageExtractor, ItemExtractor}, + blob::BinaryExtractor, + item::ItemExtractor, misc::{ArrayExtractor, MapExtractor, VecExtractor}, string::StringExtractor, traits::{ExtractState, ListExtractor, ObjectExtractor}, }, - value::Item, + value::{Item, ItemReader}, }; +#[derive(Clone)] +pub struct ArcBytes(pub Arc>); +impl Debug for ArcBytes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ArcBytes") + .field("len()", &self.0.len()) + .finish() + } +} + +impl AsRef<[u8]> for ArcBytes { + fn 
as_ref(&self) -> &[u8] { + &self.0 + } +} + +#[derive(Debug, Clone)] +pub enum BinaryPileValue { + /// A binary blob + Blob { mime: Mime, bytes: ArcBytes }, + + /// A pointer to a file + File { mime: Mime, path: PathBuf }, +} + +impl BinaryPileValue { + /// Open the item for reading. + pub async fn read(&self) -> Result { + match self { + Self::File { path, .. } => Ok(ItemReader::File(File::open(path)?)), + Self::Blob { bytes, .. } => Ok(ItemReader::Vec(Cursor::new(bytes.clone()))), + } + } + + pub async fn hash(&self) -> Result { + let read = self.read().await?; + let mut read = SyncReadBridge::new_current(read); + let out = tokio::task::spawn_blocking(move || { + let mut hasher = blake3::Hasher::new(); + std::io::copy(&mut read, &mut hasher)?; + return Ok::<_, std::io::Error>(hasher.finalize()); + }) + .await??; + return Ok(out); + } + + pub fn mime(&self) -> &Mime { + match self { + Self::Blob { mime, .. } => mime, + Self::File { mime, .. } => mime, + } + } +} + +/// An immutable, cheaply-cloneable, lazily-computed value. /// Very similar to [serde_json::Value].
pub enum PileValue { @@ -27,12 +83,6 @@ pub enum PileValue { /// An array of values Array(Arc>), - /// A binary blob - Blob { - mime: Mime, - bytes: Arc>, - }, - /// A lazily-computed map of {label: value} ObjectExtractor(Arc), @@ -41,6 +91,9 @@ pub enum PileValue { /// An pointer to an item in this dataset Item(Item), + + /// Binary data + Binary(BinaryPileValue), } impl Clone for PileValue { @@ -53,11 +106,8 @@ impl Clone for PileValue { Self::Array(x) => Self::Array(x.clone()), Self::ObjectExtractor(x) => Self::ObjectExtractor(x.clone()), Self::ListExtractor(x) => Self::ListExtractor(x.clone()), - Self::Blob { mime, bytes } => Self::Blob { - mime: mime.clone(), - bytes: bytes.clone(), - }, Self::Item(i) => Self::Item(i.clone()), + Self::Binary(b) => Self::Binary(b.clone()), } } } @@ -70,13 +120,10 @@ impl PileValue { Self::I64(_) => Arc::new(MapExtractor::default()), Self::Array(_) => Arc::new(MapExtractor::default()), Self::String(s) => Arc::new(StringExtractor::new(s)), - Self::Blob { mime, bytes } => { - // TODO: make a blobextractor (with pdf, epub, etc; like item) - Arc::new(ImageExtractor::from_blob(bytes.clone(), mime.clone())) - } Self::ListExtractor(_) => Arc::new(MapExtractor::default()), Self::ObjectExtractor(e) => e.clone(), Self::Item(i) => Arc::new(ItemExtractor::new(i)), + Self::Binary(b) => Arc::new(BinaryExtractor::new(b)), } } @@ -87,12 +134,12 @@ impl PileValue { Self::I64(_) => Arc::new(VecExtractor::default()), Self::Array(a) => Arc::new(ArrayExtractor::new(a.clone())), Self::String(_) => Arc::new(VecExtractor::default()), - Self::Blob { .. 
} => Arc::new(VecExtractor::default()), Self::ListExtractor(e) => e.clone(), Self::ObjectExtractor(e) => e .as_list() .unwrap_or_else(|| Arc::new(VecExtractor::default())), Self::Item(_) => Arc::new(VecExtractor::default()), + Self::Binary(_) => Arc::new(VecExtractor::default()), } } @@ -197,14 +244,17 @@ impl PileValue { Ok(match self { Self::Null => None, - Self::U64(_) | Self::I64(_) | Self::String(_) | Self::Blob { .. } => { - Some(Value::Number(1u64.into())) - } + Self::U64(_) + | Self::I64(_) + | Self::String(_) + | Self::Binary(BinaryPileValue::Blob { .. }) => Some(Value::Number(1u64.into())), Self::Array(x) => (!x.is_empty()).then(|| Value::Number(1u64.into())), Self::ListExtractor(x) => (x.len(state).await? > 0).then(|| Value::Number(1u64.into())), - Self::ObjectExtractor(_) | Self::Item(_) => { + Self::ObjectExtractor(_) + | Self::Item(_) + | Self::Binary(BinaryPileValue::File { .. }) => { let e = self.object_extractor(); let keys = e.fields().await?; let mut map = Map::new(); @@ -241,8 +291,8 @@ impl PileValue { Self::String(x) => Value::String(x.to_string()), // TODO: replace with something meaningful? - Self::Blob { mime, bytes } => { - Value::String(format!("", bytes.len())) + Self::Binary(BinaryPileValue::Blob { mime, bytes }) => { + Value::String(format!("", bytes.0.len())) } Self::Array(_) | Self::ListExtractor(_) => { @@ -250,7 +300,9 @@ impl PileValue { return e.to_json(state).await; } - Self::ObjectExtractor(_) | Self::Item(_) => { + Self::ObjectExtractor(_) + | Self::Item(_) + | Self::Binary(BinaryPileValue::File { .. 
}) => { let e = self.object_extractor(); return e.to_json(state).await; } diff --git a/crates/pile/src/command/item.rs b/crates/pile/src/command/item.rs index 17e1e1a..1ada4df 100644 --- a/crates/pile/src/command/item.rs +++ b/crates/pile/src/command/item.rs @@ -22,7 +22,7 @@ pub struct ItemCommand { /// If present, print the schema fields instead of item data #[arg(long)] - fields: bool, + schema: bool, #[arg(long, short = 'x')] exclude: Vec, @@ -58,7 +58,7 @@ impl CliCmd for ItemCommand { })?; let pv = PileValue::Item(item); - if self.fields { + if self.schema { let mut map = serde_json::Map::new(); for (name, spec) in &ds.config.schema { if self.exclude.contains(&name.to_string()) {