Compare commits

..

16 Commits

Author SHA1 Message Date
583a1aa6b1 Flatten arrays for FTS index
Some checks failed
CI / Typos (push) Successful in 31s
CI / Clippy (push) Failing after 1m13s
CI / Build and test (all features) (push) Successful in 4m22s
CI / Build and test (push) Successful in 6m2s
2026-03-16 09:56:48 -07:00
2a2d5af36c Sidecar fixes 2026-03-16 09:56:48 -07:00
1d90306408 Add json extractor 2026-03-16 09:56:48 -07:00
979fbb9b0d Filter by mime 2026-03-16 09:56:48 -07:00
8041fc7531 upload subcommand 2026-03-16 09:56:48 -07:00
4ce563ae80 Consistent paths, disable sources 2026-03-16 09:56:48 -07:00
26a428dedc Refactor errors 2026-03-16 09:56:48 -07:00
60483dd53d FLAC image extractor tweak 2026-03-16 09:56:48 -07:00
eea01616a3 Improve arg parsing 2026-03-16 09:56:48 -07:00
2af318c0ec Fix tests 2026-03-16 09:56:48 -07:00
8dd617a24d Bump versions 2026-03-16 09:56:48 -07:00
6c7b23a9e3 More string extractors 2026-03-16 09:56:48 -07:00
08578c7655 Add nix files 2026-03-16 09:56:48 -07:00
d138b6ac95 /item range requests 2026-03-16 09:56:48 -07:00
24428f956c Stream items in /item 2026-03-16 09:56:48 -07:00
078801be40 Extractor rewrite 2026-03-16 09:56:46 -07:00
6 changed files with 191 additions and 132 deletions

View File

@@ -1,4 +1,4 @@
use pile_config::{ConfigToml, DatasetFts, Label}; use pile_config::{ConfigToml, DatasetFts, Label, objectpath::ObjectPath};
use pile_value::{ use pile_value::{
extract::traits::ExtractState, extract::traits::ExtractState,
value::{Item, PileValue}, value::{Item, PileValue},
@@ -10,7 +10,7 @@ use tantivy::{
query::QueryParser, query::QueryParser,
schema::{self, Schema, Value as TantivyValue}, schema::{self, Schema, Value as TantivyValue},
}; };
use tracing::{debug, trace, warn}; use tracing::warn;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct FtsLookupResult { pub struct FtsLookupResult {
@@ -79,18 +79,12 @@ impl DbFtsIndex {
let mut empty = true; let mut empty = true;
for name in self.fts_cfg().fields.keys() { for name in self.fts_cfg().fields.keys() {
let x = self.get_field(state, &item, name).await?; let vals = self.get_field(state, &item, name).await?;
let field = self.schema.get_field(name)?;
let val = match x { for v in vals {
Some(x) => x, empty = false;
None => continue, doc.add_text(field, v);
};
empty = false;
let field = self.schema.get_field(name);
if let Ok(field) = field {
doc.add_text(field, val);
} }
} }
@@ -106,110 +100,29 @@ impl DbFtsIndex {
state: &ExtractState, state: &ExtractState,
extractor: &PileValue, extractor: &PileValue,
field_name: &Label, field_name: &Label,
) -> Result<Option<String>, std::io::Error> { ) -> Result<Vec<String>, std::io::Error> {
let field = match self.cfg.schema.get(field_name) { let field = match self.cfg.schema.get(field_name) {
Some(x) => x, Some(x) => x,
None => { None => {
warn!("Unknown field {field_name:?}"); warn!("Unknown field {field_name:?}");
return Ok(None); return Ok(Vec::new());
} }
}; };
// Try paths in order, using the first value we find // Try paths in order, using the first value we find
'outer: for path in field.path.as_slice() { for path in field.path.as_slice() {
let val = match extractor.query(state, path).await? { let val = match extractor.query(state, path).await? {
Some(x) => x, Some(x) => x,
None => return Ok(None), None => continue,
}; };
let mut val = match val { let val = val_to_string(state, &val, path, field_name).await?;
PileValue::Null => { if !val.is_empty() {
trace!( return Ok(val);
message = "Skipping field, is null",
field = field_name.to_string(),
?path,
// value = ?val
);
continue;
}
x => x.clone(),
};
loop {
val = match val {
PileValue::String(x) => return Ok(Some(x.to_string())),
PileValue::U64(x) => return Ok(Some(x.to_string())),
PileValue::I64(x) => return Ok(Some(x.to_string())),
PileValue::Array(x) => {
if x.len() == 1 {
x[0].clone()
} else if x.len() > 1 {
debug!(
message = "Skipping field, is array with more than one element",
field = field_name.to_string(),
?path,
);
continue 'outer;
} else {
debug!(
message = "Skipping field, is empty array",
field = field_name.to_string(),
?path,
);
continue 'outer;
}
}
PileValue::Null => {
trace!(
message = "Skipping field, is null",
field = field_name.to_string(),
?path,
);
continue 'outer;
}
PileValue::ObjectExtractor(_) => {
trace!(
message = "Skipping field, is object",
field = field_name.to_string(),
?path,
);
continue 'outer;
}
PileValue::Item(_) => {
trace!(
message = "Skipping field, is item",
field = field_name.to_string(),
?path,
);
continue 'outer;
}
PileValue::ListExtractor(_) => {
trace!(
message = "Skipping field, is ListExtractor",
field = field_name.to_string(),
?path,
);
continue 'outer;
}
PileValue::Blob { .. } => {
trace!(
message = "Skipping field, is blob",
field = field_name.to_string(),
?path,
);
continue 'outer;
}
}
} }
} }
return Ok(None); return Ok(Vec::new());
} }
/// Run the given query on this table's ftx index. /// Run the given query on this table's ftx index.
@@ -298,3 +211,42 @@ impl DbFtsIndex {
return Ok(out); return Ok(out);
} }
} }
async fn val_to_string(
state: &ExtractState,
val: &PileValue,
path: &ObjectPath,
field_name: &str,
) -> Result<Vec<String>, std::io::Error> {
match val {
PileValue::String(x) => return Ok(vec![x.to_string()]),
PileValue::U64(x) => return Ok(vec![x.to_string()]),
PileValue::I64(x) => return Ok(vec![x.to_string()]),
PileValue::Array(x) => {
let mut out = Vec::new();
for x in x.iter() {
out.extend(Box::pin(val_to_string(state, x, path, field_name)).await?);
}
return Ok(out);
}
#[expect(clippy::unwrap_used)]
PileValue::ListExtractor(x) => {
let mut out = Vec::new();
let len = x.len(state).await?;
for i in 0..len {
let v = x.get(state, i).await?;
out.extend(Box::pin(val_to_string(state, &v.unwrap(), path, field_name)).await?);
}
return Ok(out);
}
PileValue::Null => {}
PileValue::ObjectExtractor(_) => {}
PileValue::Item(_) => {}
PileValue::Blob { .. } => {}
}
return Ok(Vec::new());
}

View File

@@ -58,13 +58,15 @@ impl Id3Extractor {
let mut output: HashMap<Label, Vec<PileValue>> = HashMap::new(); let mut output: HashMap<Label, Vec<PileValue>> = HashMap::new();
for frame in tag.frames() { for frame in tag.frames() {
if let Some(text) = frame.content().text() { if let Some(texts) = frame.content().text_values() {
let name = frame_id_to_field(frame.id()); let name = frame_id_to_field(frame.id());
if let Some(key) = Label::new(name) { if let Some(key) = Label::new(name) {
output for text in texts {
.entry(key) output
.or_default() .entry(key.clone())
.push(PileValue::String(Arc::new(text.into()))); .or_default()
.push(PileValue::String(Arc::new(text.into())));
}
} }
} }
} }

View File

@@ -0,0 +1,87 @@
use pile_config::Label;
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
};
use crate::{
extract::traits::{ExtractState, ObjectExtractor},
value::{AsyncReader, Item, PileValue},
};
fn json_to_pile(value: serde_json::Value) -> PileValue {
match value {
serde_json::Value::Null => PileValue::Null,
serde_json::Value::Bool(b) => PileValue::String(Arc::new(b.to_string().into())),
serde_json::Value::Number(n) => PileValue::String(Arc::new(n.to_string().into())),
serde_json::Value::String(s) => PileValue::String(Arc::new(s.into())),
serde_json::Value::Array(a) => {
PileValue::Array(Arc::new(a.into_iter().map(json_to_pile).collect()))
}
serde_json::Value::Object(_) => PileValue::Null,
}
}
pub struct JsonExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl JsonExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let mut reader = self.item.read().await?;
let bytes = reader.read_to_end().await?;
let json: serde_json::Value = match serde_json::from_slice(&bytes) {
Ok(x) => x,
Err(_) => return Ok(self.output.get_or_init(HashMap::new)),
};
let output: HashMap<Label, PileValue> = match json {
serde_json::Value::Object(map) => map
.into_iter()
.filter_map(|(k, v)| Label::new(&k).map(|label| (label, json_to_pile(v))))
.collect(),
_ => HashMap::new(),
};
return Ok(self.output.get_or_init(|| output));
}
}
#[async_trait::async_trait]
impl ObjectExtractor for JsonExtractor {
async fn field(
&self,
state: &ExtractState,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
if !state.ignore_mime
&& (self.item.mime().type_() != mime::APPLICATION
&& self.item.mime().type_() != mime::TEXT)
{
return Ok(None);
}
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner().await?.keys().cloned().collect())
}
}

View File

@@ -18,6 +18,9 @@ pub use exif::*;
mod pdf; mod pdf;
pub use pdf::*; pub use pdf::*;
mod json;
pub use json::*;
mod toml; mod toml;
use pile_config::Label; use pile_config::Label;
pub use toml::*; pub use toml::*;
@@ -66,6 +69,10 @@ impl ItemExtractor {
Label::new("pdf").unwrap(), Label::new("pdf").unwrap(),
PileValue::ObjectExtractor(Arc::new(PdfExtractor::new(item))), PileValue::ObjectExtractor(Arc::new(PdfExtractor::new(item))),
), ),
(
Label::new("json").unwrap(),
PileValue::ObjectExtractor(Arc::new(JsonExtractor::new(item))),
),
( (
Label::new("toml").unwrap(), Label::new("toml").unwrap(),
PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))), PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))),
@@ -101,6 +108,7 @@ impl ObjectExtractor for ItemExtractor {
Label::new("epub").unwrap(), Label::new("epub").unwrap(),
Label::new("exif").unwrap(), Label::new("exif").unwrap(),
Label::new("pdf").unwrap(), Label::new("pdf").unwrap(),
Label::new("json").unwrap(),
Label::new("sidecar").unwrap(), Label::new("sidecar").unwrap(),
]); ]);
} }

View File

@@ -47,14 +47,20 @@ impl DataSource for Arc<DirDataSource> {
source: Arc::clone(self), source: Arc::clone(self),
mime: mime_guess::from_path(&key).first_or_octet_stream(), mime: mime_guess::from_path(&key).first_or_octet_stream(),
path: key.clone(), path: key.clone(),
sidecar: self.sidecars.then(|| { sidecar: self
Box::new(Item::File { .sidecars
source: Arc::clone(self), .then(|| {
mime: mime_guess::from_path(key.with_extension("toml")).first_or_octet_stream(), let sidecar_path = key.with_extension("toml");
path: key.with_extension("toml"), sidecar_path.is_file().then(|| {
sidecar: None, Box::new(Item::File {
source: Arc::clone(self),
mime: mime_guess::from_path(&sidecar_path).first_or_octet_stream(),
path: sidecar_path,
sidecar: None,
})
})
}) })
}), .flatten(),
})); }));
} }
@@ -91,15 +97,21 @@ impl DataSource for Arc<DirDataSource> {
mime: mime_guess::from_path(&path).first_or_octet_stream(), mime: mime_guess::from_path(&path).first_or_octet_stream(),
path: path.clone(), path: path.clone(),
sidecar: source.sidecars.then(|| { sidecar: source
Box::new(Item::File { .sidecars
source: Arc::clone(&source), .then(|| {
mime: mime_guess::from_path(path.with_extension("toml")) let sidecar_path = path.with_extension("toml");
.first_or_octet_stream(), sidecar_path.is_file().then(|| {
path: path.with_extension("toml"), Box::new(Item::File {
sidecar: None, source: Arc::clone(&source),
mime: mime_guess::from_path(&sidecar_path)
.first_or_octet_stream(),
path: sidecar_path,
sidecar: None,
})
})
}) })
}), .flatten(),
}, },
}; };

View File

@@ -77,18 +77,16 @@ impl CliCmd for AnnotateCommand {
}; };
let item = PileValue::Item(item.clone()); let item = PileValue::Item(item.clone());
let Some(value) = index let vals =
.get_field(&state, &item, &field) index
.await .get_field(&state, &item, &field)
.with_context(|| { .await
format!("while extracting field from {}", path.display()) .with_context(|| {
})? format!("while extracting field from {}", path.display())
else { })?;
continue;
};
// TODO: implement sidecar writing // TODO: implement sidecar writing
let _ = (&dest_path, &value); let _ = (&dest_path, &vals);
todo!("write_sidecar not yet implemented"); todo!("write_sidecar not yet implemented");
#[expect(unreachable_code)] #[expect(unreachable_code)]