Item types and Blob values

This commit is contained in:
2026-03-10 17:00:16 -07:00
parent 350f3e3dc9
commit a19c597531
10 changed files with 108 additions and 25 deletions

2
Cargo.lock generated
View File

@@ -2418,6 +2418,8 @@ dependencies = [
"id3", "id3",
"itertools 0.14.0", "itertools 0.14.0",
"kamadak-exif", "kamadak-exif",
"mime",
"mime_guess",
"pdf", "pdf",
"pile-config", "pile-config",
"pile-flac", "pile-flac",

View File

@@ -122,6 +122,7 @@ rand = "0.10.0"
strum = { version = "0.27.2", features = ["derive"] } strum = { version = "0.27.2", features = ["derive"] }
walkdir = "2.5.0" walkdir = "2.5.0"
mime = "0.3.17" mime = "0.3.17"
mime_guess = "2.0.5"
paste = "1.0.15" paste = "1.0.15"
smartstring = "1.0.1" smartstring = "1.0.1"
chrono = "0.4.43" chrono = "0.4.43"

View File

@@ -30,3 +30,5 @@ tokio = { workspace = true }
tokio-stream = { workspace = true } tokio-stream = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }
aws-sdk-s3 = { workspace = true } aws-sdk-s3 = { workspace = true }
mime = { workspace = true }
mime_guess = { workspace = true }

View File

@@ -1,6 +1,11 @@
use mime::Mime;
use pile_config::Label; use pile_config::Label;
use pile_flac::{FlacBlock, FlacReader}; use pile_flac::{FlacBlock, FlacReader};
use std::{collections::HashMap, io::BufReader, sync::OnceLock}; use std::{
collections::HashMap,
io::BufReader,
sync::{Arc, OnceLock},
};
use crate::{Item, PileValue, SyncReadBridge, extract::Extractor}; use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
@@ -34,20 +39,25 @@ impl<'a> FlacExtractor<'a> {
} }
let reader = SyncReadBridge::new_current(self.item.read().await?); let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_tags = tokio::task::spawn_blocking(move || { let (raw_tags, raw_images) = tokio::task::spawn_blocking(move || {
let reader = FlacReader::new(BufReader::new(reader)); let reader = FlacReader::new(BufReader::new(reader));
let mut tags: Vec<(String, String)> = Vec::new(); let mut tags: Vec<(String, String)> = Vec::new();
let mut images: Vec<(Mime, Vec<u8>)> = Vec::new();
for block in reader { for block in reader {
if let FlacBlock::VorbisComment(comment) = match block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? {
block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? FlacBlock::VorbisComment(comment) => {
{
for (k, v) in comment.comment.comments { for (k, v) in comment.comment.comments {
tags.push((k.to_string().to_lowercase(), v.into())); tags.push((k.to_string().to_lowercase(), v.into()));
} }
break; }
FlacBlock::Picture(picture) => {
images.push((picture.mime, picture.img_data));
}
FlacBlock::AudioFrame(_) => break,
_ => {}
} }
} }
Ok::<_, std::io::Error>(tags) Ok::<_, std::io::Error>((tags, images))
}) })
.await .await
.map_err(std::io::Error::other)??; .map_err(std::io::Error::other)??;
@@ -61,12 +71,24 @@ impl<'a> FlacExtractor<'a> {
.push(PileValue::String(v.into())); .push(PileValue::String(v.into()));
} }
} }
let mut output: HashMap<Label, PileValue<'a>> = output
let output = output
.into_iter() .into_iter()
.map(|(k, v)| (k, PileValue::Array(v))) .map(|(k, v)| (k, PileValue::Array(v)))
.collect(); .collect();
if !raw_images.is_empty() {
if let Some(label) = Label::new("images".to_string()) {
let images = raw_images
.into_iter()
.map(|(mime, data)| PileValue::Blob {
mime,
bytes: Arc::new(data),
})
.collect();
output.insert(label, PileValue::Array(images));
}
}
let _ = self.output.set(output); let _ = self.output.set(output);
#[expect(clippy::unwrap_used)] #[expect(clippy::unwrap_used)]
return Ok(self.output.get().unwrap()); return Ok(self.output.get().unwrap());

View File

@@ -1,7 +1,7 @@
use pile_config::Label; use pile_config::Label;
use std::{collections::HashMap, sync::OnceLock}; use std::{collections::HashMap, sync::OnceLock};
use crate::{Item, PileValue, Reader, extract::Extractor}; use crate::{AsyncReader, Item, PileValue, extract::Extractor};
fn toml_to_pile(value: toml::Value) -> PileValue<'static> { fn toml_to_pile(value: toml::Value) -> PileValue<'static> {
match value { match value {

View File

@@ -144,6 +144,8 @@ impl DbFtsIndex {
loop { loop {
val = match val { val = match val {
PileValue::String(x) => return Ok(Some(x.to_string())),
#[expect(clippy::unwrap_used)] #[expect(clippy::unwrap_used)]
PileValue::Array(ref mut x) => { PileValue::Array(ref mut x) => {
if x.len() == 1 { if x.len() == 1 {
@@ -161,30 +163,37 @@ impl DbFtsIndex {
message = "Skipping field, is empty array", message = "Skipping field, is empty array",
field = field_name.to_string(), field = field_name.to_string(),
?path, ?path,
//value = ?val
); );
continue 'outer; continue 'outer;
} }
} }
PileValue::Null => { PileValue::Null => {
trace!( trace!(
message = "Skipping field, is null", message = "Skipping field, is null",
field = field_name.to_string(), field = field_name.to_string(),
?path, ?path,
//value = ?val
); );
continue 'outer; continue 'outer;
} }
PileValue::Extractor(_) => { PileValue::Extractor(_) => {
trace!( trace!(
message = "Skipping field, is object", message = "Skipping field, is object",
field = field_name.to_string(), field = field_name.to_string(),
?path, ?path,
//value = ?val
); );
continue 'outer; continue 'outer;
} }
PileValue::String(x) => return Ok(Some(x.to_string())),
PileValue::Blob { .. } => {
trace!(
message = "Skipping field, is blob",
field = field_name.to_string(),
?path,
);
continue 'outer;
}
} }
} }
} }
@@ -291,6 +300,7 @@ pub fn apply<'a>(post: &FieldSpecPost, val: &PileValue<'a>) -> Option<PileValue<
FieldSpecPost::SetCase { case: Case::Lower } => match val { FieldSpecPost::SetCase { case: Case::Lower } => match val {
PileValue::Null => return None, PileValue::Null => return None,
PileValue::Blob { .. } => return None,
PileValue::Extractor(_) => return None, PileValue::Extractor(_) => return None,
PileValue::String(x) => PileValue::String(x.to_lowercase().into()), PileValue::String(x) => PileValue::String(x.to_lowercase().into()),
@@ -301,6 +311,7 @@ pub fn apply<'a>(post: &FieldSpecPost, val: &PileValue<'a>) -> Option<PileValue<
FieldSpecPost::SetCase { case: Case::Upper } => match val { FieldSpecPost::SetCase { case: Case::Upper } => match val {
PileValue::Null => return None, PileValue::Null => return None,
PileValue::Blob { .. } => return None,
PileValue::Extractor(_) => return None, PileValue::Extractor(_) => return None,
PileValue::String(x) => PileValue::String(x.to_uppercase().into()), PileValue::String(x) => PileValue::String(x.to_uppercase().into()),
@@ -311,6 +322,7 @@ pub fn apply<'a>(post: &FieldSpecPost, val: &PileValue<'a>) -> Option<PileValue<
FieldSpecPost::TrimSuffix { trim_suffix } => match val { FieldSpecPost::TrimSuffix { trim_suffix } => match val {
PileValue::Null => return None, PileValue::Null => return None,
PileValue::Blob { .. } => return None,
PileValue::Extractor(_) => return None, PileValue::Extractor(_) => return None,
PileValue::String(x) => { PileValue::String(x) => {
@@ -324,6 +336,7 @@ pub fn apply<'a>(post: &FieldSpecPost, val: &PileValue<'a>) -> Option<PileValue<
FieldSpecPost::TrimPrefix { trim_prefix } => match val { FieldSpecPost::TrimPrefix { trim_prefix } => match val {
PileValue::Null => return None, PileValue::Null => return None,
PileValue::Blob { .. } => return None,
PileValue::Extractor(_) => return None, PileValue::Extractor(_) => return None,
PileValue::String(x) => { PileValue::String(x) => {
@@ -337,6 +350,7 @@ pub fn apply<'a>(post: &FieldSpecPost, val: &PileValue<'a>) -> Option<PileValue<
FieldSpecPost::Join { join } => match val { FieldSpecPost::Join { join } => match val {
PileValue::Null => return None, PileValue::Null => return None,
PileValue::Blob { .. } => return None,
PileValue::Extractor(_) => return None, PileValue::Extractor(_) => return None,
PileValue::String(x) => PileValue::String(x.clone()), PileValue::String(x) => PileValue::String(x.clone()),

View File

@@ -1,3 +1,4 @@
use mime::Mime;
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
use std::{ use std::{
fs::File, fs::File,
@@ -17,6 +18,7 @@ use crate::source::{DirDataSource, S3DataSource};
pub enum Item { pub enum Item {
File { File {
source: Arc<DirDataSource>, source: Arc<DirDataSource>,
mime: Mime,
path: PathBuf, path: PathBuf,
sidecar: Option<Box<Item>>, sidecar: Option<Box<Item>>,
@@ -24,6 +26,7 @@ pub enum Item {
S3 { S3 {
source: Arc<S3DataSource>, source: Arc<S3DataSource>,
mime: Mime,
key: SmartString<LazyCompact>, key: SmartString<LazyCompact>,
sidecar: Option<Box<Item>>, sidecar: Option<Box<Item>>,
@@ -88,6 +91,13 @@ impl Item {
} }
} }
pub fn mime(&self) -> &Mime {
match self {
Self::File { mime, .. } => mime,
Self::S3 { mime, .. } => mime,
}
}
pub fn sidecar(&self) -> Option<&Self> { pub fn sidecar(&self) -> Option<&Self> {
match self { match self {
Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x), Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
@@ -100,15 +110,13 @@ impl Item {
// MARK: reader // MARK: reader
// //
pub trait Reader: Send { pub trait AsyncReader: Send {
/// Read a chunk of bytes. /// Read a chunk of bytes.
fn read( fn read(
&mut self, &mut self,
buf: &mut [u8], buf: &mut [u8],
) -> impl Future<Output = Result<usize, std::io::Error>> + Send; ) -> impl Future<Output = Result<usize, std::io::Error>> + Send;
fn seek(&mut self, pos: SeekFrom) -> impl Future<Output = Result<u64, std::io::Error>> + Send;
/// Read all remaining bytes into a `Vec`. /// Read all remaining bytes into a `Vec`.
fn read_to_end(&mut self) -> impl Future<Output = Result<Vec<u8>, std::io::Error>> + Send { fn read_to_end(&mut self) -> impl Future<Output = Result<Vec<u8>, std::io::Error>> + Send {
async { async {
@@ -126,6 +134,10 @@ pub trait Reader: Send {
} }
} }
pub trait AsyncSeekReader: AsyncReader {
fn seek(&mut self, pos: SeekFrom) -> impl Future<Output = Result<u64, std::io::Error>> + Send;
}
// //
// MARK: sync bridge // MARK: sync bridge
// //
@@ -135,12 +147,12 @@ pub trait Reader: Send {
/// Never use this outside of [tokio::task::spawn_blocking], /// Never use this outside of [tokio::task::spawn_blocking],
/// the async runtime will deadlock if this struct blocks /// the async runtime will deadlock if this struct blocks
/// the runtime. /// the runtime.
pub struct SyncReadBridge<R: Reader> { pub struct SyncReadBridge<R: AsyncReader> {
inner: R, inner: R,
handle: Handle, handle: Handle,
} }
impl<R: Reader> SyncReadBridge<R> { impl<R: AsyncReader> SyncReadBridge<R> {
/// Creates a new adapter using a handle to the current runtime. /// Creates a new adapter using a handle to the current runtime.
/// Panics if called outside of tokio /// Panics if called outside of tokio
pub fn new_current(inner: R) -> Self { pub fn new_current(inner: R) -> Self {
@@ -153,13 +165,13 @@ impl<R: Reader> SyncReadBridge<R> {
} }
} }
impl<R: Reader> Read for SyncReadBridge<R> { impl<R: AsyncReader> Read for SyncReadBridge<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.handle.block_on(self.inner.read(buf)) self.handle.block_on(self.inner.read(buf))
} }
} }
impl<R: Reader> Seek for SyncReadBridge<R> { impl<R: AsyncReader + AsyncSeekReader> Seek for SyncReadBridge<R> {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> { fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
self.handle.block_on(self.inner.seek(pos)) self.handle.block_on(self.inner.seek(pos))
} }
@@ -174,14 +186,16 @@ pub enum ItemReader {
S3(S3Reader), S3(S3Reader),
} }
impl Reader for ItemReader { impl AsyncReader for ItemReader {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
match self { match self {
Self::File(x) => std::io::Read::read(x, buf), Self::File(x) => std::io::Read::read(x, buf),
Self::S3(x) => x.read(buf).await, Self::S3(x) => x.read(buf).await,
} }
} }
}
impl AsyncSeekReader for ItemReader {
async fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> { async fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
match self { match self {
Self::File(x) => x.seek(pos), Self::File(x) => x.seek(pos),
@@ -202,7 +216,7 @@ pub struct S3Reader {
size: u64, size: u64,
} }
impl Reader for S3Reader { impl AsyncReader for S3Reader {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
let len_left = self.size.saturating_sub(self.cursor); let len_left = self.size.saturating_sub(self.cursor);
if len_left == 0 || buf.is_empty() { if len_left == 0 || buf.is_empty() {
@@ -235,7 +249,9 @@ impl Reader for S3Reader {
self.cursor += n as u64; self.cursor += n as u64;
Ok(n) Ok(n)
} }
}
impl AsyncSeekReader for S3Reader {
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> { async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
match pos { match pos {
SeekFrom::Start(x) => self.cursor = x.min(self.size), SeekFrom::Start(x) => self.cursor = x.min(self.size),

View File

@@ -42,10 +42,12 @@ impl DataSource for Arc<DirDataSource> {
return Ok(Some(Item::File { return Ok(Some(Item::File {
source: Arc::clone(self), source: Arc::clone(self),
mime: mime_guess::from_path(&key).first_or_octet_stream(),
path: key.clone(), path: key.clone(),
sidecar: self.sidecars.then(|| { sidecar: self.sidecars.then(|| {
Box::new(Item::File { Box::new(Item::File {
source: Arc::clone(self), source: Arc::clone(self),
mime: mime_guess::from_path(key.with_extension("toml")).first_or_octet_stream(),
path: key.with_extension("toml"), path: key.with_extension("toml"),
sidecar: None, sidecar: None,
}) })
@@ -83,11 +85,14 @@ impl DataSource for Arc<DirDataSource> {
Some("toml") if source.sidecars => continue, Some("toml") if source.sidecars => continue,
Some(_) => Item::File { Some(_) => Item::File {
source: Arc::clone(&source), source: Arc::clone(&source),
mime: mime_guess::from_path(&path).first_or_octet_stream(),
path: path.clone(), path: path.clone(),
sidecar: source.sidecars.then(|| { sidecar: source.sidecars.then(|| {
Box::new(Item::File { Box::new(Item::File {
source: Arc::clone(&source), source: Arc::clone(&source),
mime: mime_guess::from_path(path.with_extension("toml"))
.first_or_octet_stream(),
path: path.with_extension("toml"), path: path.with_extension("toml"),
sidecar: None, sidecar: None,
}) })

View File

@@ -92,6 +92,7 @@ impl S3DataSource {
async fn make_item(self: &Arc<Self>, key: impl Into<SmartString<LazyCompact>>) -> Item { async fn make_item(self: &Arc<Self>, key: impl Into<SmartString<LazyCompact>>) -> Item {
let key: SmartString<LazyCompact> = key.into(); let key: SmartString<LazyCompact> = key.into();
let mime = mime_guess::from_path(key.as_str()).first_or_octet_stream();
let sidecar = if self.sidecars { let sidecar = if self.sidecars {
self.find_sidecar_key(key.as_str()) self.find_sidecar_key(key.as_str())
@@ -99,6 +100,7 @@ impl S3DataSource {
.map(|sidecar_key| { .map(|sidecar_key| {
Box::new(Item::S3 { Box::new(Item::S3 {
source: Arc::clone(self), source: Arc::clone(self),
mime: mime_guess::from_path(sidecar_key.as_str()).first_or_octet_stream(),
key: sidecar_key, key: sidecar_key,
sidecar: None, sidecar: None,
}) })
@@ -109,6 +111,7 @@ impl S3DataSource {
Item::S3 { Item::S3 {
source: Arc::clone(self), source: Arc::clone(self),
mime,
key, key,
sidecar, sidecar,
} }

View File

@@ -1,3 +1,4 @@
use mime::Mime;
use pile_config::objectpath::{ObjectPath, PathSegment}; use pile_config::objectpath::{ObjectPath, PathSegment};
use serde_json::{Map, Value}; use serde_json::{Map, Value};
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
@@ -15,6 +16,12 @@ pub enum PileValue<'a> {
/// An array of values /// An array of values
Array(Vec<PileValue<'a>>), Array(Vec<PileValue<'a>>),
/// A binary blob
Blob {
mime: Mime,
bytes: Arc<Vec<u8>>,
},
/// A lazily-computed map of {label: value} /// A lazily-computed map of {label: value}
Extractor(Arc<dyn Extractor + 'a>), Extractor(Arc<dyn Extractor + 'a>),
} }
@@ -26,6 +33,10 @@ impl Clone for PileValue<'_> {
Self::String(x) => Self::String(x.clone()), Self::String(x) => Self::String(x.clone()),
Self::Array(x) => Self::Array(x.clone()), Self::Array(x) => Self::Array(x.clone()),
Self::Extractor(x) => Self::Extractor(x.clone()), Self::Extractor(x) => Self::Extractor(x.clone()),
Self::Blob { mime, bytes } => Self::Blob {
mime: mime.clone(),
bytes: bytes.clone(),
},
} }
} }
} }
@@ -43,6 +54,7 @@ impl<'a> PileValue<'a> {
Some(Self::Null) => None, Some(Self::Null) => None,
Some(Self::Array(_)) => None, Some(Self::Array(_)) => None,
Some(Self::String(_)) => None, Some(Self::String(_)) => None,
Some(Self::Blob { .. }) => None,
Some(Self::Extractor(e)) => e.field(field).await?, Some(Self::Extractor(e)) => e.field(field).await?,
} }
} }
@@ -51,6 +63,7 @@ impl<'a> PileValue<'a> {
out = match &out { out = match &out {
None => return Ok(None), None => return Ok(None),
Some(Self::Null) => None, Some(Self::Null) => None,
Some(Self::Blob { .. }) => None,
Some(Self::Array(v)) => { Some(Self::Array(v)) => {
let idx = if *idx >= 0 { let idx = if *idx >= 0 {
usize::try_from(*idx).ok() usize::try_from(*idx).ok()
@@ -80,6 +93,11 @@ impl<'a> PileValue<'a> {
pub async fn to_json(&self) -> Result<Value, std::io::Error> { pub async fn to_json(&self) -> Result<Value, std::io::Error> {
Ok(match self { Ok(match self {
Self::Null => Value::Null, Self::Null => Value::Null,
// TODO: replace with something meaningful
Self::Blob { mime, bytes } => {
Value::String(format!("<Blob ({mime}, {} bytes)>", bytes.len()))
}
Self::String(x) => Value::String(x.to_string()), Self::String(x) => Value::String(x.to_string()),
Self::Array(x) => { Self::Array(x) => {