diff --git a/Cargo.lock b/Cargo.lock index 3653aec..cbc72e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2418,6 +2418,8 @@ dependencies = [ "id3", "itertools 0.14.0", "kamadak-exif", + "mime", + "mime_guess", "pdf", "pile-config", "pile-flac", diff --git a/Cargo.toml b/Cargo.toml index 43bac9b..fa52637 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,6 +122,7 @@ rand = "0.10.0" strum = { version = "0.27.2", features = ["derive"] } walkdir = "2.5.0" mime = "0.3.17" +mime_guess = "2.0.5" paste = "1.0.15" smartstring = "1.0.1" chrono = "0.4.43" diff --git a/crates/pile-dataset/Cargo.toml b/crates/pile-dataset/Cargo.toml index f52fcde..8e55456 100644 --- a/crates/pile-dataset/Cargo.toml +++ b/crates/pile-dataset/Cargo.toml @@ -30,3 +30,5 @@ tokio = { workspace = true } tokio-stream = { workspace = true } async-trait = { workspace = true } aws-sdk-s3 = { workspace = true } +mime = { workspace = true } +mime_guess = { workspace = true } diff --git a/crates/pile-dataset/src/extract/flac.rs b/crates/pile-dataset/src/extract/flac.rs index 0a42f01..bb3d872 100644 --- a/crates/pile-dataset/src/extract/flac.rs +++ b/crates/pile-dataset/src/extract/flac.rs @@ -1,6 +1,11 @@ +use mime::Mime; use pile_config::Label; use pile_flac::{FlacBlock, FlacReader}; -use std::{collections::HashMap, io::BufReader, sync::OnceLock}; +use std::{ + collections::HashMap, + io::BufReader, + sync::{Arc, OnceLock}, +}; use crate::{Item, PileValue, SyncReadBridge, extract::Extractor}; @@ -34,20 +39,25 @@ impl<'a> FlacExtractor<'a> { } let reader = SyncReadBridge::new_current(self.item.read().await?); - let raw_tags = tokio::task::spawn_blocking(move || { + let (raw_tags, raw_images) = tokio::task::spawn_blocking(move || { let reader = FlacReader::new(BufReader::new(reader)); let mut tags: Vec<(String, String)> = Vec::new(); + let mut images: Vec<(Mime, Vec)> = Vec::new(); for block in reader { - if let FlacBlock::VorbisComment(comment) = - block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? - { - for (k, v) in comment.comment.comments { - tags.push((k.to_string().to_lowercase(), v.into())); + match block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? { + FlacBlock::VorbisComment(comment) => { + for (k, v) in comment.comment.comments { + tags.push((k.to_string().to_lowercase(), v.into())); + } } - break; + FlacBlock::Picture(picture) => { + images.push((picture.mime, picture.img_data)); + } + FlacBlock::AudioFrame(_) => break, + _ => {} } } - Ok::<_, std::io::Error>(tags) + Ok::<_, std::io::Error>((tags, images)) }) .await .map_err(std::io::Error::other)??; @@ -61,12 +71,24 @@ impl<'a> FlacExtractor<'a> { .push(PileValue::String(v.into())); } } - - let output = output + let mut output: HashMap> = output .into_iter() .map(|(k, v)| (k, PileValue::Array(v))) .collect(); + if !raw_images.is_empty() { + if let Some(label) = Label::new("images".to_string()) { + let images = raw_images + .into_iter() + .map(|(mime, data)| PileValue::Blob { + mime, + bytes: Arc::new(data), + }) + .collect(); + output.insert(label, PileValue::Array(images)); + } + } + let _ = self.output.set(output); #[expect(clippy::unwrap_used)] return Ok(self.output.get().unwrap()); diff --git a/crates/pile-dataset/src/extract/toml.rs b/crates/pile-dataset/src/extract/toml.rs index 4713084..6447441 100644 --- a/crates/pile-dataset/src/extract/toml.rs +++ b/crates/pile-dataset/src/extract/toml.rs @@ -1,7 +1,7 @@ use pile_config::Label; use std::{collections::HashMap, sync::OnceLock}; -use crate::{Item, PileValue, Reader, extract::Extractor}; +use crate::{AsyncReader, Item, PileValue, extract::Extractor}; fn toml_to_pile(value: toml::Value) -> PileValue<'static> { match value { diff --git a/crates/pile-dataset/src/index/index_fts.rs b/crates/pile-dataset/src/index/index_fts.rs index 2d5a03c..a5a7517 100644 --- a/crates/pile-dataset/src/index/index_fts.rs +++ b/crates/pile-dataset/src/index/index_fts.rs @@ -144,6 +144,8 @@ impl DbFtsIndex { loop { val = match val { + PileValue::String(x) => return Ok(Some(x.to_string())), + #[expect(clippy::unwrap_used)] PileValue::Array(ref mut x) => { if x.len() == 1 { @@ -161,30 +163,37 @@ impl DbFtsIndex { message = "Skipping field, is empty array", field = field_name.to_string(), ?path, - //value = ?val ); continue 'outer; } } + PileValue::Null => { trace!( message = "Skipping field, is null", field = field_name.to_string(), ?path, - //value = ?val ); continue 'outer; } + PileValue::Extractor(_) => { trace!( message = "Skipping field, is object", field = field_name.to_string(), ?path, - //value = ?val ); continue 'outer; } - PileValue::String(x) => return Ok(Some(x.to_string())), + + PileValue::Blob { .. } => { + trace!( + message = "Skipping field, is blob", + field = field_name.to_string(), + ?path, + ); + continue 'outer; + } } } } @@ -291,6 +300,7 @@ pub fn apply<'a>(post: &FieldSpecPost, val: &PileValue<'a>) -> Option match val { PileValue::Null => return None, + PileValue::Blob { .. } => return None, PileValue::Extractor(_) => return None, PileValue::String(x) => PileValue::String(x.to_lowercase().into()), @@ -301,6 +311,7 @@ pub fn apply<'a>(post: &FieldSpecPost, val: &PileValue<'a>) -> Option match val { PileValue::Null => return None, + PileValue::Blob { .. } => return None, PileValue::Extractor(_) => return None, PileValue::String(x) => PileValue::String(x.to_uppercase().into()), @@ -311,6 +322,7 @@ pub fn apply<'a>(post: &FieldSpecPost, val: &PileValue<'a>) -> Option match val { PileValue::Null => return None, + PileValue::Blob { .. } => return None, PileValue::Extractor(_) => return None, PileValue::String(x) => { @@ -324,6 +336,7 @@ pub fn apply<'a>(post: &FieldSpecPost, val: &PileValue<'a>) -> Option match val { PileValue::Null => return None, + PileValue::Blob { .. } => return None, PileValue::Extractor(_) => return None, PileValue::String(x) => { @@ -337,6 +350,7 @@ pub fn apply<'a>(post: &FieldSpecPost, val: &PileValue<'a>) -> Option match val { PileValue::Null => return None, + PileValue::Blob { .. } => return None, PileValue::Extractor(_) => return None, PileValue::String(x) => PileValue::String(x.clone()), diff --git a/crates/pile-dataset/src/item.rs b/crates/pile-dataset/src/item.rs index acaa664..778b9cd 100644 --- a/crates/pile-dataset/src/item.rs +++ b/crates/pile-dataset/src/item.rs @@ -1,3 +1,4 @@ +use mime::Mime; use smartstring::{LazyCompact, SmartString}; use std::{ fs::File, @@ -17,6 +18,7 @@ use crate::source::{DirDataSource, S3DataSource}; pub enum Item { File { source: Arc, + mime: Mime, path: PathBuf, sidecar: Option>, @@ -24,6 +26,7 @@ pub enum Item { S3 { source: Arc, + mime: Mime, key: SmartString, sidecar: Option>, @@ -88,6 +91,13 @@ impl Item { } } + pub fn mime(&self) -> &Mime { + match self { + Self::File { mime, .. } => mime, + Self::S3 { mime, .. } => mime, + } + } + pub fn sidecar(&self) -> Option<&Self> { match self { Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x), @@ -100,15 +110,13 @@ impl Item { // MARK: reader // -pub trait Reader: Send { +pub trait AsyncReader: Send { /// Read a chunk of bytes. fn read( &mut self, buf: &mut [u8], ) -> impl Future> + Send; - fn seek(&mut self, pos: SeekFrom) -> impl Future> + Send; - /// Read all remaining bytes into a `Vec`. fn read_to_end(&mut self) -> impl Future, std::io::Error>> + Send { async { @@ -126,6 +134,10 @@ pub trait Reader: Send { } } +pub trait AsyncSeekReader: AsyncReader { + fn seek(&mut self, pos: SeekFrom) -> impl Future> + Send; +} + // // MARK: sync bridge // @@ -135,12 +147,12 @@ pub trait Reader: Send { /// Never use this outside of [tokio::task::spawn_blocking], /// the async runtime will deadlock if this struct blocks /// the runtime. -pub struct SyncReadBridge { +pub struct SyncReadBridge { inner: R, handle: Handle, } -impl SyncReadBridge { +impl SyncReadBridge { /// Creates a new adapter using a handle to the current runtime. /// Panics if called outside of tokio pub fn new_current(inner: R) -> Self { @@ -153,13 +165,13 @@ impl SyncReadBridge { } } -impl Read for SyncReadBridge { +impl Read for SyncReadBridge { fn read(&mut self, buf: &mut [u8]) -> Result { self.handle.block_on(self.inner.read(buf)) } } -impl Seek for SyncReadBridge { +impl Seek for SyncReadBridge { fn seek(&mut self, pos: SeekFrom) -> Result { self.handle.block_on(self.inner.seek(pos)) } @@ -174,14 +186,16 @@ pub enum ItemReader { S3(S3Reader), } -impl Reader for ItemReader { +impl AsyncReader for ItemReader { async fn read(&mut self, buf: &mut [u8]) -> Result { match self { Self::File(x) => std::io::Read::read(x, buf), Self::S3(x) => x.read(buf).await, } } +} +impl AsyncSeekReader for ItemReader { async fn seek(&mut self, pos: std::io::SeekFrom) -> Result { match self { Self::File(x) => x.seek(pos), @@ -202,7 +216,7 @@ pub struct S3Reader { size: u64, } -impl Reader for S3Reader { +impl AsyncReader for S3Reader { async fn read(&mut self, buf: &mut [u8]) -> Result { let len_left = self.size.saturating_sub(self.cursor); if len_left == 0 || buf.is_empty() { @@ -235,7 +249,9 @@ impl Reader for S3Reader { self.cursor += n as u64; Ok(n) } +} +impl AsyncSeekReader for S3Reader { async fn seek(&mut self, pos: SeekFrom) -> Result { match pos { SeekFrom::Start(x) => self.cursor = x.min(self.size), diff --git a/crates/pile-dataset/src/source/dir.rs b/crates/pile-dataset/src/source/dir.rs index 7906586..8df2f71 100644 --- a/crates/pile-dataset/src/source/dir.rs +++ b/crates/pile-dataset/src/source/dir.rs @@ -42,10 +42,12 @@ impl DataSource for Arc { return Ok(Some(Item::File { source: Arc::clone(self), + mime: mime_guess::from_path(&key).first_or_octet_stream(), path: key.clone(), sidecar: self.sidecars.then(|| { Box::new(Item::File { source: Arc::clone(self), + mime: mime_guess::from_path(key.with_extension("toml")).first_or_octet_stream(), path: key.with_extension("toml"), sidecar: None, }) @@ -83,11 +85,14 @@ impl DataSource for Arc { Some("toml") if source.sidecars => continue, Some(_) => Item::File { source: Arc::clone(&source), + mime: mime_guess::from_path(&path).first_or_octet_stream(), path: path.clone(), sidecar: source.sidecars.then(|| { Box::new(Item::File { source: Arc::clone(&source), + mime: mime_guess::from_path(path.with_extension("toml")) + .first_or_octet_stream(), path: path.with_extension("toml"), sidecar: None, }) diff --git a/crates/pile-dataset/src/source/s3.rs b/crates/pile-dataset/src/source/s3.rs index 889bb2a..8525128 100644 --- a/crates/pile-dataset/src/source/s3.rs +++ b/crates/pile-dataset/src/source/s3.rs @@ -92,6 +92,7 @@ impl S3DataSource { async fn make_item(self: &Arc, key: impl Into>) -> Item { let key: SmartString = key.into(); + let mime = mime_guess::from_path(key.as_str()).first_or_octet_stream(); let sidecar = if self.sidecars { self.find_sidecar_key(key.as_str()) @@ -99,6 +100,7 @@ impl S3DataSource { .map(|sidecar_key| { Box::new(Item::S3 { source: Arc::clone(self), + mime: mime_guess::from_path(sidecar_key.as_str()).first_or_octet_stream(), key: sidecar_key, sidecar: None, }) @@ -109,6 +111,7 @@ impl S3DataSource { Item::S3 { source: Arc::clone(self), + mime, key, sidecar, } diff --git a/crates/pile-dataset/src/value.rs b/crates/pile-dataset/src/value.rs index 25d6bba..fef3cda 100644 --- a/crates/pile-dataset/src/value.rs +++ b/crates/pile-dataset/src/value.rs @@ -1,3 +1,4 @@ +use mime::Mime; use pile_config::objectpath::{ObjectPath, PathSegment}; use serde_json::{Map, Value}; use smartstring::{LazyCompact, SmartString}; @@ -15,6 +16,12 @@ pub enum PileValue<'a> { /// An array of values Array(Vec>), + /// A binary blob + Blob { + mime: Mime, + bytes: Arc>, + }, + /// A lazily-computed map of {label: value} Extractor(Arc), } @@ -26,6 +33,10 @@ impl Clone for PileValue<'_> { Self::String(x) => Self::String(x.clone()), Self::Array(x) => Self::Array(x.clone()), Self::Extractor(x) => Self::Extractor(x.clone()), + Self::Blob { mime, bytes } => Self::Blob { + mime: mime.clone(), + bytes: bytes.clone(), + }, } } } @@ -43,6 +54,7 @@ impl<'a> PileValue<'a> { Some(Self::Null) => None, Some(Self::Array(_)) => None, Some(Self::String(_)) => None, + Some(Self::Blob { .. }) => None, Some(Self::Extractor(e)) => e.field(field).await?, } } @@ -51,6 +63,7 @@ impl<'a> PileValue<'a> { out = match &out { None => return Ok(None), Some(Self::Null) => None, + Some(Self::Blob { .. }) => None, Some(Self::Array(v)) => { let idx = if *idx >= 0 { usize::try_from(*idx).ok() @@ -80,6 +93,11 @@ impl<'a> PileValue<'a> { pub async fn to_json(&self) -> Result { Ok(match self { Self::Null => Value::Null, + + // TODO: replace with something meaningful + Self::Blob { mime, bytes } => { + Value::String(format!("", bytes.len())) + } Self::String(x) => Value::String(x.to_string()), Self::Array(x) => {