From adcf46314f4c7fb999dc407247376d7308604af8 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:50:09 -0700 Subject: [PATCH] Add discovery subcommands `list` and `fields` --- crates/pile-dataset/src/extract/epub/mod.rs | 35 ++- crates/pile-dataset/src/extract/exif.rs | 4 +- crates/pile-dataset/src/extract/pdf/mod.rs | 52 ++--- crates/pile-dataset/src/extract/toml.rs | 9 +- crates/pile-dataset/src/value.rs | 39 ++++ crates/pile/src/command/fields.rs | 228 ++++++++++++++++++++ crates/pile/src/command/list.rs | 107 +++++++++ crates/pile/src/command/mod.rs | 17 ++ 8 files changed, 437 insertions(+), 54 deletions(-) create mode 100644 crates/pile/src/command/fields.rs create mode 100644 crates/pile/src/command/list.rs diff --git a/crates/pile-dataset/src/extract/epub/mod.rs b/crates/pile-dataset/src/extract/epub/mod.rs index eb814d7..ddfdf35 100644 --- a/crates/pile-dataset/src/extract/epub/mod.rs +++ b/crates/pile-dataset/src/extract/epub/mod.rs @@ -1,5 +1,5 @@ use pile_config::Label; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; mod epub_meta; pub use epub_meta::*; @@ -7,39 +7,30 @@ pub use epub_meta::*; mod epub_text; pub use epub_text::*; -use crate::{ - Item, PileValue, - extract::{MapExtractor, ObjectExtractor}, -}; +use crate::{Item, PileValue, extract::ObjectExtractor}; pub struct EpubExtractor { - inner: MapExtractor, + text: Arc, + meta: Arc, } impl EpubExtractor { - #[expect(clippy::unwrap_used)] pub fn new(item: &Item) -> Self { - let inner = MapExtractor { - inner: HashMap::from([ - ( - Label::new("text").unwrap(), - PileValue::ObjectExtractor(Arc::new(EpubTextExtractor::new(item))), - ), - ( - Label::new("meta").unwrap(), - PileValue::ObjectExtractor(Arc::new(EpubMetaExtractor::new(item))), - ), - ]), - }; - - Self { inner } + Self { + text: Arc::new(EpubTextExtractor::new(item)), + meta: Arc::new(EpubMetaExtractor::new(item)), + } } } #[async_trait::async_trait] impl ObjectExtractor for EpubExtractor { async fn field(&self, name: &pile_config::Label) -> Result, std::io::Error> { - self.inner.field(name).await + match name.as_str() { + "text" => self.text.field(name).await, + "meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))), + _ => Ok(None), + } } #[expect(clippy::unwrap_used)] diff --git a/crates/pile-dataset/src/extract/exif.rs b/crates/pile-dataset/src/extract/exif.rs index fef16c1..aa1c54e 100644 --- a/crates/pile-dataset/src/extract/exif.rs +++ b/crates/pile-dataset/src/extract/exif.rs @@ -4,7 +4,7 @@ use std::{ io::BufReader, sync::{Arc, OnceLock}, }; -use tracing::debug; +use tracing::trace; use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor}; @@ -51,7 +51,7 @@ impl ExifExtractor { let raw_fields = match raw_fields { Ok(x) => x, Err(error) => { - debug!(message = "Could not process exif", ?error, key = ?self.item.key()); + trace!(message = "Could not process exif", ?error, key = ?self.item.key()); return Ok(self.output.get_or_init(HashMap::new)); } }; diff --git a/crates/pile-dataset/src/extract/pdf/mod.rs b/crates/pile-dataset/src/extract/pdf/mod.rs index 5934cec..816bb35 100644 --- a/crates/pile-dataset/src/extract/pdf/mod.rs +++ b/crates/pile-dataset/src/extract/pdf/mod.rs @@ -1,5 +1,5 @@ use pile_config::Label; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; #[cfg(feature = "pdfium")] mod pdf_cover; @@ -17,40 +17,26 @@ pub use pdf_meta::*; mod pdf_text; pub use pdf_text::*; -use crate::{ - Item, PileValue, - extract::{MapExtractor, ObjectExtractor}, -}; +use crate::{Item, PileValue, extract::ObjectExtractor}; pub struct PdfExtractor { - inner: MapExtractor, + text: Arc, + meta: Arc, + #[cfg(feature = "pdfium")] + cover: Arc, + #[cfg(feature = "pdfium")] + pages: Arc, } impl PdfExtractor { - #[expect(clippy::unwrap_used)] pub fn new(item: &Item) -> Self { - let mut inner_map = HashMap::new(); - inner_map.insert( - Label::new("text").unwrap(), - PileValue::ObjectExtractor(Arc::new(PdfTextExtractor::new(item))), - ); - inner_map.insert( - Label::new("meta").unwrap(), - PileValue::ObjectExtractor(Arc::new(PdfMetaExtractor::new(item))), - ); - #[cfg(feature = "pdfium")] - inner_map.insert( - Label::new("cover").unwrap(), - PileValue::ObjectExtractor(Arc::new(PdfCoverExtractor::new(item))), - ); - #[cfg(feature = "pdfium")] - inner_map.insert( - Label::new("pages").unwrap(), - PileValue::ListExtractor(Arc::new(PdfPagesExtractor::new(item))), - ); - Self { - inner: MapExtractor { inner: inner_map }, + text: Arc::new(PdfTextExtractor::new(item)), + meta: Arc::new(PdfMetaExtractor::new(item)), + #[cfg(feature = "pdfium")] + cover: Arc::new(PdfCoverExtractor::new(item)), + #[cfg(feature = "pdfium")] + pages: Arc::new(PdfPagesExtractor::new(item)), } } } @@ -58,7 +44,15 @@ impl PdfExtractor { #[async_trait::async_trait] impl ObjectExtractor for PdfExtractor { async fn field(&self, name: &pile_config::Label) -> Result, std::io::Error> { - self.inner.field(name).await + match name.as_str() { + "text" => self.text.field(name).await, + "meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))), + #[cfg(feature = "pdfium")] + "cover" => self.cover.field(name).await, + #[cfg(feature = "pdfium")] + "pages" => Ok(Some(PileValue::ListExtractor(self.pages.clone()))), + _ => Ok(None), + } } #[expect(clippy::unwrap_used)] diff --git a/crates/pile-dataset/src/extract/toml.rs b/crates/pile-dataset/src/extract/toml.rs index 7260668..abdde13 100644 --- a/crates/pile-dataset/src/extract/toml.rs +++ b/crates/pile-dataset/src/extract/toml.rs @@ -38,7 +38,14 @@ impl TomlExtractor { return Ok(x); } - let bytes = self.item.read().await?.read_to_end().await?; + let mut reader = match self.item.read().await { + Ok(r) => r, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return Ok(self.output.get_or_init(HashMap::new)); + } + Err(e) => return Err(e), + }; + let bytes = reader.read_to_end().await?; let toml: toml::Value = match toml::from_slice(&bytes) { Ok(x) => x, Err(_) => return Ok(self.output.get_or_init(HashMap::new)), diff --git a/crates/pile-dataset/src/value.rs b/crates/pile-dataset/src/value.rs index 61cb94c..9d99f22 100644 --- a/crates/pile-dataset/src/value.rs +++ b/crates/pile-dataset/src/value.rs @@ -109,6 +109,45 @@ impl PileValue { return Ok(out.clone()); } + /// Like `to_json`, but counts populated fields instead of collecting values. + /// + /// - Leaf values (non-null scalars, arrays, blobs) contribute `Some(1)`. + /// - `Null` contributes `None`. + /// - `ObjectExtractor` is recursed into; returns `Some(Object(map))` with + /// only the fields that had data, or `None` if all fields were absent. + /// - `Array` / `ListExtractor` are treated as opaque leaf values (not descended into). + pub async fn count_fields(&self) -> Result, std::io::Error> { + Ok(match self { + Self::Null => None, + + Self::U64(_) | Self::I64(_) | Self::String(_) | Self::Blob { .. } => { + Some(Value::Number(1u64.into())) + } + + Self::Array(x) => (!x.is_empty()).then(|| Value::Number(1u64.into())), + Self::ListExtractor(x) => (!x.is_empty().await?).then(|| Value::Number(1u64.into())), + + Self::ObjectExtractor(e) => { + let keys = e.fields().await?; + let mut map = Map::new(); + for k in &keys { + let v = match e.field(k).await? { + Some(x) => x, + None => continue, + }; + if let Some(counted) = Box::pin(v.count_fields()).await? { + map.insert(k.to_string(), counted); + } + } + if map.is_empty() { + None + } else { + Some(Value::Object(map)) + } + } + }) + } + pub fn as_str(&self) -> Option<&str> { match self { Self::String(x) => Some(x), diff --git a/crates/pile/src/command/fields.rs b/crates/pile/src/command/fields.rs new file mode 100644 index 0000000..c61f1dd --- /dev/null +++ b/crates/pile/src/command/fields.rs @@ -0,0 +1,228 @@ +use anyhow::{Context, Result}; +use clap::Args; +use pile_dataset::{Datasets, PileValue, extract::MetaExtractor}; +use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use serde_json::{Map, Value}; +use std::{path::PathBuf, sync::Arc, time::Instant}; +use tokio::task::JoinSet; +use tokio_stream::StreamExt; +use tracing::info; + +use crate::{CliCmd, GlobalContext}; + +#[derive(Debug, Args)] +pub struct FieldsCommand { + /// Path to dataset config + #[arg(long, short = 'c', default_value = "./pile.toml")] + config: PathBuf, + + /// Number of parallel jobs + #[arg(long, short = 'j', default_value = "5")] + jobs: usize, + + /// Print percentages instead of counts + #[arg(long)] + percent: bool, + + /// Do not print fields with a count smaller than this + #[arg(long)] + min_count: Option, + + /// Do not print fields with a count larger than this + #[arg(long)] + max_count: Option, + + /// Do not print fields with a percentage smaller than this + #[arg(long)] + min_percent: Option, + + /// Do not print fields with a percentage larger than this + #[arg(long)] + max_percent: Option, + + /// Restrict to these sources (all sources if empty) + #[arg(long, short = 's')] + source: Vec, +} + +impl CliCmd for FieldsCommand { + #[expect(clippy::print_stdout)] + #[expect(clippy::unwrap_used)] + async fn run( + self, + _ctx: GlobalContext, + flag: CancelFlag, + ) -> Result> { + let ds = Datasets::open(&self.config) + .with_context(|| format!("while opening dataset for {}", self.config.display()))?; + + let start = Instant::now(); + let mut total_counts: Map = Map::new(); + let mut total_items = 0u64; + let jobs = self.jobs.max(1); + + for (name, dataset) in ds.sources.iter().filter(|(name, _)| { + self.source.is_empty() || self.source.iter().any(|s| s == name.as_str()) + }) { + info!("Scanning source {name}"); + let mut stream = dataset.iter(); + let mut join_set: JoinSet>, anyhow::Error>> = + JoinSet::new(); + + loop { + // Drain one completed task when at capacity + if join_set.len() >= jobs + && let Some(result) = join_set.join_next().await + { + let counts = result.context("task panicked")??; + if let Some(delta) = counts { + merge_field_counts(&mut total_counts, delta); + } + total_items += 1; + } + + if flag.is_cancelled() { + join_set.abort_all(); + return Err(CancelableTaskError::Cancelled); + } + + match stream.next().await { + None => break, + Some(item_result) => { + let item = + item_result.with_context(|| format!("while reading source {name}"))?; + let name = name.clone(); + join_set.spawn(async move { + let meta = MetaExtractor::new(&item); + let value = PileValue::ObjectExtractor(Arc::new(meta)); + let result = value.count_fields().await.with_context(|| { + format!("while counting fields in source {name}") + })?; + Ok(result.and_then(|v| { + if let Value::Object(m) = v { + Some(m) + } else { + None + } + })) + }); + } + } + } + + // Drain remaining tasks + while let Some(result) = join_set.join_next().await { + let counts = result.context("task panicked")??; + if let Some(delta) = counts { + merge_field_counts(&mut total_counts, delta); + } + total_items += 1; + } + } + + let time_ms = start.elapsed().as_millis(); + info!("Scanned {total_items} items in {time_ms} ms"); + + let output = if self.percent { + let converted = convert_to_percent( + &total_counts, + total_items, + self.min_percent, + self.max_percent, + ); + Value::Object(converted) + } else { + let filtered = filter_by_count(&total_counts, self.min_count, self.max_count); + Value::Object(filtered) + }; + + let json = serde_json::to_string_pretty(&output).unwrap(); + println!("{json}"); + Ok(0) + } +} + +fn filter_by_count( + map: &Map, + min_count: Option, + max_count: Option, +) -> Map { + let mut out = Map::new(); + for (key, val) in map { + match val { + Value::Number(n) => { + let count = n.as_u64().unwrap_or(0); + if min_count.is_none_or(|m| count >= m) && max_count.is_none_or(|m| count <= m) { + out.insert(key.clone(), val.clone()); + } + } + Value::Object(inner) => { + let filtered = filter_by_count(inner, min_count, max_count); + if !filtered.is_empty() { + out.insert(key.clone(), Value::Object(filtered)); + } + } + _ => {} + } + } + out +} + +fn convert_to_percent( + map: &Map, + total: u64, + min_percent: Option, + max_percent: Option, +) -> Map { + let mut out = Map::new(); + for (key, val) in map { + match val { + Value::Number(n) => { + let count = n.as_u64().unwrap_or(0); + let pct = if total == 0 { + 0.0 + } else { + (count as f64 / total as f64 * 1000.0).round() / 1000.0 + }; + if min_percent.is_none_or(|m| pct >= m) && max_percent.is_none_or(|m| pct <= m) { + out.insert( + key.clone(), + serde_json::Number::from_f64(pct) + .map(Value::Number) + .unwrap_or(Value::Null), + ); + } + } + Value::Object(inner) => { + let converted = convert_to_percent(inner, total, min_percent, max_percent); + if !converted.is_empty() { + out.insert(key.clone(), Value::Object(converted)); + } + } + _ => {} + } + } + out +} + +/// Merge `delta` field counts into `acc`, summing leaf counts and recursing into objects. +fn merge_field_counts(acc: &mut Map, delta: Map) { + for (key, delta_val) in delta { + match delta_val { + Value::Number(n) => { + let entry = acc.entry(key).or_insert_with(|| Value::Number(0u64.into())); + if let Value::Number(cur) = entry { + let new = cur.as_u64().unwrap_or(0) + n.as_u64().unwrap_or(0); + *cur = new.into(); + } + } + Value::Object(inner_delta) => { + let entry = acc.entry(key).or_insert_with(|| Value::Object(Map::new())); + if let Value::Object(inner_acc) = entry { + merge_field_counts(inner_acc, inner_delta); + } + } + _ => {} + } + } +} diff --git a/crates/pile/src/command/list.rs b/crates/pile/src/command/list.rs new file mode 100644 index 0000000..fbcd48b --- /dev/null +++ b/crates/pile/src/command/list.rs @@ -0,0 +1,107 @@ +use anyhow::{Context, Result}; +use clap::Args; +use pile_config::objectpath::ObjectPath; +use pile_dataset::{Datasets, PileValue, extract::MetaExtractor}; +use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use std::{path::PathBuf, str::FromStr, sync::Arc}; +use tokio::task::JoinSet; +use tokio_stream::StreamExt; +use tracing::info; + +use crate::{CliCmd, GlobalContext}; + +#[derive(Debug, Args)] +pub struct ListCommand { + /// Path to query, e.g. $.flac.artist + path: String, + + /// Only print items where the value is null (inverse of default) + #[arg(long)] + invert: bool, + + /// Path to dataset config + #[arg(long, short = 'c', default_value = "./pile.toml")] + config: PathBuf, + + /// Number of parallel jobs + #[arg(long, short = 'j', default_value = "4")] + jobs: usize, + + /// Restrict to these sources (all sources if empty) + #[arg(long, short = 's')] + source: Vec, +} + +impl CliCmd for ListCommand { + #[expect(clippy::print_stdout)] + async fn run( + self, + _ctx: GlobalContext, + flag: CancelFlag, + ) -> Result> { + let path = ObjectPath::from_str(&self.path) + .with_context(|| format!("invalid path {:?}", self.path))?; + let path = Arc::new(path); + + let ds = Datasets::open(&self.config) + .with_context(|| format!("while opening dataset for {}", self.config.display()))?; + + let jobs = self.jobs.max(1); + + for (name, dataset) in ds.sources.iter().filter(|(name, _)| { + self.source.is_empty() || self.source.iter().any(|s| s == name.as_str()) + }) { + info!("Scanning source {name}"); + let mut stream = dataset.iter(); + let mut join_set: JoinSet, anyhow::Error>> = JoinSet::new(); + + loop { + if join_set.len() >= jobs + && let Some(result) = join_set.join_next().await + && let Some(line) = result.context("task panicked")?? + { + println!("{line}"); + } + + if flag.is_cancelled() { + join_set.abort_all(); + return Err(CancelableTaskError::Cancelled); + } + + match stream.next().await { + None => break, + Some(item_result) => { + let item = + item_result.with_context(|| format!("while reading source {name}"))?; + let source_name = name.to_string(); + let key = item.key().to_string(); + let path = path.clone(); + let invert = self.invert; + + join_set.spawn(async move { + let meta = MetaExtractor::new(&item); + let root = PileValue::ObjectExtractor(Arc::new(meta)); + let value = root.query(&path).await?; + + let is_present = + matches!(value, Some(v) if !matches!(v, PileValue::Null)); + + let should_print = if invert { !is_present } else { is_present }; + + Ok(should_print.then(|| format!("{source_name}\t{key}"))) + }); + } + } + } + + // Drain remaining tasks + while let Some(result) = join_set.join_next().await { + if let Some(line) = result.context("task panicked")?? { + println!("{line}"); + } + } + } + + Ok(0) + } +} diff --git a/crates/pile/src/command/mod.rs b/crates/pile/src/command/mod.rs index 7043857..7f1ab46 100644 --- a/crates/pile/src/command/mod.rs +++ b/crates/pile/src/command/mod.rs @@ -6,8 +6,10 @@ use pile_toolbox::cancelabletask::{ mod annotate; mod check; +mod fields; mod index; mod init; +mod list; mod lookup; mod probe; mod serve; @@ -44,12 +46,25 @@ pub enum SubCommand { cmd: index::IndexCommand, }, + /// List all items that have certain field + List { + #[command(flatten)] + cmd: list::ListCommand, + }, + /// Search all sources + #[clap(alias = "search")] Lookup { #[command(flatten)] cmd: lookup::LookupCommand, }, + /// Print an overview of all fields present in this dataset + Overview { + #[command(flatten)] + cmd: fields::FieldsCommand, + }, + /// Print all metadata from an item Probe { #[command(flatten)] @@ -70,7 +85,9 @@ impl CliCmdDispatch for SubCommand { Self::Init { cmd } => cmd.start(ctx), Self::Check { cmd } => cmd.start(ctx), Self::Index { cmd } => cmd.start(ctx), + Self::List { cmd } => cmd.start(ctx), Self::Lookup { cmd } => cmd.start(ctx), + Self::Overview { cmd } => cmd.start(ctx), Self::Probe { cmd } => cmd.start(ctx), Self::Serve { cmd } => cmd.start(ctx),