Add discovery subcommands list and fields
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
use pile_config::Label;
|
use pile_config::Label;
|
||||||
use std::{collections::HashMap, sync::Arc};
|
use std::sync::Arc;
|
||||||
|
|
||||||
mod epub_meta;
|
mod epub_meta;
|
||||||
pub use epub_meta::*;
|
pub use epub_meta::*;
|
||||||
@@ -7,39 +7,30 @@ pub use epub_meta::*;
|
|||||||
mod epub_text;
|
mod epub_text;
|
||||||
pub use epub_text::*;
|
pub use epub_text::*;
|
||||||
|
|
||||||
use crate::{
|
use crate::{Item, PileValue, extract::ObjectExtractor};
|
||||||
Item, PileValue,
|
|
||||||
extract::{MapExtractor, ObjectExtractor},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub struct EpubExtractor {
|
pub struct EpubExtractor {
|
||||||
inner: MapExtractor,
|
text: Arc<EpubTextExtractor>,
|
||||||
|
meta: Arc<EpubMetaExtractor>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EpubExtractor {
|
impl EpubExtractor {
|
||||||
#[expect(clippy::unwrap_used)]
|
|
||||||
pub fn new(item: &Item) -> Self {
|
pub fn new(item: &Item) -> Self {
|
||||||
let inner = MapExtractor {
|
Self {
|
||||||
inner: HashMap::from([
|
text: Arc::new(EpubTextExtractor::new(item)),
|
||||||
(
|
meta: Arc::new(EpubMetaExtractor::new(item)),
|
||||||
Label::new("text").unwrap(),
|
}
|
||||||
PileValue::ObjectExtractor(Arc::new(EpubTextExtractor::new(item))),
|
|
||||||
),
|
|
||||||
(
|
|
||||||
Label::new("meta").unwrap(),
|
|
||||||
PileValue::ObjectExtractor(Arc::new(EpubMetaExtractor::new(item))),
|
|
||||||
),
|
|
||||||
]),
|
|
||||||
};
|
|
||||||
|
|
||||||
Self { inner }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl ObjectExtractor for EpubExtractor {
|
impl ObjectExtractor for EpubExtractor {
|
||||||
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
|
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
self.inner.field(name).await
|
match name.as_str() {
|
||||||
|
"text" => self.text.field(name).await,
|
||||||
|
"meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
|
||||||
|
_ => Ok(None),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[expect(clippy::unwrap_used)]
|
#[expect(clippy::unwrap_used)]
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use std::{
|
|||||||
io::BufReader,
|
io::BufReader,
|
||||||
sync::{Arc, OnceLock},
|
sync::{Arc, OnceLock},
|
||||||
};
|
};
|
||||||
use tracing::debug;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
|
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
|
||||||
|
|
||||||
@@ -51,7 +51,7 @@ impl ExifExtractor {
|
|||||||
let raw_fields = match raw_fields {
|
let raw_fields = match raw_fields {
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
debug!(message = "Could not process exif", ?error, key = ?self.item.key());
|
trace!(message = "Could not process exif", ?error, key = ?self.item.key());
|
||||||
return Ok(self.output.get_or_init(HashMap::new));
|
return Ok(self.output.get_or_init(HashMap::new));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use pile_config::Label;
|
use pile_config::Label;
|
||||||
use std::{collections::HashMap, sync::Arc};
|
use std::sync::Arc;
|
||||||
|
|
||||||
#[cfg(feature = "pdfium")]
|
#[cfg(feature = "pdfium")]
|
||||||
mod pdf_cover;
|
mod pdf_cover;
|
||||||
@@ -17,40 +17,26 @@ pub use pdf_meta::*;
|
|||||||
mod pdf_text;
|
mod pdf_text;
|
||||||
pub use pdf_text::*;
|
pub use pdf_text::*;
|
||||||
|
|
||||||
use crate::{
|
use crate::{Item, PileValue, extract::ObjectExtractor};
|
||||||
Item, PileValue,
|
|
||||||
extract::{MapExtractor, ObjectExtractor},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub struct PdfExtractor {
|
pub struct PdfExtractor {
|
||||||
inner: MapExtractor,
|
text: Arc<PdfTextExtractor>,
|
||||||
|
meta: Arc<PdfMetaExtractor>,
|
||||||
|
#[cfg(feature = "pdfium")]
|
||||||
|
cover: Arc<PdfCoverExtractor>,
|
||||||
|
#[cfg(feature = "pdfium")]
|
||||||
|
pages: Arc<PdfPagesExtractor>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PdfExtractor {
|
impl PdfExtractor {
|
||||||
#[expect(clippy::unwrap_used)]
|
|
||||||
pub fn new(item: &Item) -> Self {
|
pub fn new(item: &Item) -> Self {
|
||||||
let mut inner_map = HashMap::new();
|
|
||||||
inner_map.insert(
|
|
||||||
Label::new("text").unwrap(),
|
|
||||||
PileValue::ObjectExtractor(Arc::new(PdfTextExtractor::new(item))),
|
|
||||||
);
|
|
||||||
inner_map.insert(
|
|
||||||
Label::new("meta").unwrap(),
|
|
||||||
PileValue::ObjectExtractor(Arc::new(PdfMetaExtractor::new(item))),
|
|
||||||
);
|
|
||||||
#[cfg(feature = "pdfium")]
|
|
||||||
inner_map.insert(
|
|
||||||
Label::new("cover").unwrap(),
|
|
||||||
PileValue::ObjectExtractor(Arc::new(PdfCoverExtractor::new(item))),
|
|
||||||
);
|
|
||||||
#[cfg(feature = "pdfium")]
|
|
||||||
inner_map.insert(
|
|
||||||
Label::new("pages").unwrap(),
|
|
||||||
PileValue::ListExtractor(Arc::new(PdfPagesExtractor::new(item))),
|
|
||||||
);
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
inner: MapExtractor { inner: inner_map },
|
text: Arc::new(PdfTextExtractor::new(item)),
|
||||||
|
meta: Arc::new(PdfMetaExtractor::new(item)),
|
||||||
|
#[cfg(feature = "pdfium")]
|
||||||
|
cover: Arc::new(PdfCoverExtractor::new(item)),
|
||||||
|
#[cfg(feature = "pdfium")]
|
||||||
|
pages: Arc::new(PdfPagesExtractor::new(item)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -58,7 +44,15 @@ impl PdfExtractor {
|
|||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl ObjectExtractor for PdfExtractor {
|
impl ObjectExtractor for PdfExtractor {
|
||||||
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
|
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
self.inner.field(name).await
|
match name.as_str() {
|
||||||
|
"text" => self.text.field(name).await,
|
||||||
|
"meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
|
||||||
|
#[cfg(feature = "pdfium")]
|
||||||
|
"cover" => self.cover.field(name).await,
|
||||||
|
#[cfg(feature = "pdfium")]
|
||||||
|
"pages" => Ok(Some(PileValue::ListExtractor(self.pages.clone()))),
|
||||||
|
_ => Ok(None),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[expect(clippy::unwrap_used)]
|
#[expect(clippy::unwrap_used)]
|
||||||
|
|||||||
@@ -38,7 +38,14 @@ impl TomlExtractor {
|
|||||||
return Ok(x);
|
return Ok(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
let bytes = self.item.read().await?.read_to_end().await?;
|
let mut reader = match self.item.read().await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||||
|
return Ok(self.output.get_or_init(HashMap::new));
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
};
|
||||||
|
let bytes = reader.read_to_end().await?;
|
||||||
let toml: toml::Value = match toml::from_slice(&bytes) {
|
let toml: toml::Value = match toml::from_slice(&bytes) {
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(_) => return Ok(self.output.get_or_init(HashMap::new)),
|
Err(_) => return Ok(self.output.get_or_init(HashMap::new)),
|
||||||
|
|||||||
@@ -109,6 +109,45 @@ impl PileValue {
|
|||||||
return Ok(out.clone());
|
return Ok(out.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Like `to_json`, but counts populated fields instead of collecting values.
|
||||||
|
///
|
||||||
|
/// - Leaf values (non-null scalars, arrays, blobs) contribute `Some(1)`.
|
||||||
|
/// - `Null` contributes `None`.
|
||||||
|
/// - `ObjectExtractor` is recursed into; returns `Some(Object(map))` with
|
||||||
|
/// only the fields that had data, or `None` if all fields were absent.
|
||||||
|
/// - `Array` / `ListExtractor` are treated as opaque leaf values (not descended into).
|
||||||
|
pub async fn count_fields(&self) -> Result<Option<Value>, std::io::Error> {
|
||||||
|
Ok(match self {
|
||||||
|
Self::Null => None,
|
||||||
|
|
||||||
|
Self::U64(_) | Self::I64(_) | Self::String(_) | Self::Blob { .. } => {
|
||||||
|
Some(Value::Number(1u64.into()))
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::Array(x) => (!x.is_empty()).then(|| Value::Number(1u64.into())),
|
||||||
|
Self::ListExtractor(x) => (!x.is_empty().await?).then(|| Value::Number(1u64.into())),
|
||||||
|
|
||||||
|
Self::ObjectExtractor(e) => {
|
||||||
|
let keys = e.fields().await?;
|
||||||
|
let mut map = Map::new();
|
||||||
|
for k in &keys {
|
||||||
|
let v = match e.field(k).await? {
|
||||||
|
Some(x) => x,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
if let Some(counted) = Box::pin(v.count_fields()).await? {
|
||||||
|
map.insert(k.to_string(), counted);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if map.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(Value::Object(map))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub fn as_str(&self) -> Option<&str> {
|
pub fn as_str(&self) -> Option<&str> {
|
||||||
match self {
|
match self {
|
||||||
Self::String(x) => Some(x),
|
Self::String(x) => Some(x),
|
||||||
|
|||||||
228
crates/pile/src/command/fields.rs
Normal file
228
crates/pile/src/command/fields.rs
Normal file
@@ -0,0 +1,228 @@
|
|||||||
|
use anyhow::{Context, Result};
|
||||||
|
use clap::Args;
|
||||||
|
use pile_dataset::{Datasets, PileValue, extract::MetaExtractor};
|
||||||
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
|
use serde_json::{Map, Value};
|
||||||
|
use std::{path::PathBuf, sync::Arc, time::Instant};
|
||||||
|
use tokio::task::JoinSet;
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
use crate::{CliCmd, GlobalContext};
|
||||||
|
|
||||||
|
// Arguments for the field-overview scan: `run` walks the selected sources and
// reports, per metadata field, how many items populate it.
//
// NOTE(review): maintainer notes in this struct are plain `//` comments on
// purpose. clap's derive turns `///` doc comments into user-facing help text,
// so the existing `///` lines are left exactly as they are.
#[derive(Debug, Args)]
pub struct FieldsCommand {
    /// Path to dataset config
    #[arg(long, short = 'c', default_value = "./pile.toml")]
    // Handed to `Datasets::open` at the start of `run`.
    config: PathBuf,

    /// Number of parallel jobs
    #[arg(long, short = 'j', default_value = "5")]
    // `run` clamps this to at least 1; it bounds the number of in-flight
    // counting tasks per source.
    jobs: usize,

    /// Print percentages instead of counts
    #[arg(long)]
    // Selects `convert_to_percent` (true) or `filter_by_count` (false) for the
    // final output.
    percent: bool,

    /// Do not print fields with a count smaller than this
    #[arg(long)]
    // Inclusive lower bound; only consulted when `percent` is false.
    min_count: Option<u64>,

    /// Do not print fields with a count larger than this
    #[arg(long)]
    // Inclusive upper bound; only consulted when `percent` is false.
    max_count: Option<u64>,

    /// Do not print fields with a percentage smaller than this
    #[arg(long)]
    // Inclusive lower bound; only consulted when `percent` is true. It is
    // compared against count / total rounded to three decimals, i.e. a
    // fraction such as 0.5, not 50.
    min_percent: Option<f64>,

    /// Do not print fields with a percentage larger than this
    #[arg(long)]
    // Inclusive upper bound; only consulted when `percent` is true. Same
    // fractional scale as `min_percent`.
    max_percent: Option<f64>,

    /// Restrict to these sources (all sources if empty)
    #[arg(long, short = 's')]
    // Matched by exact string equality against each source name.
    source: Vec<String>,
}
|
||||||
|
|
||||||
|
impl CliCmd for FieldsCommand {
|
||||||
|
#[expect(clippy::print_stdout)]
|
||||||
|
#[expect(clippy::unwrap_used)]
|
||||||
|
async fn run(
|
||||||
|
self,
|
||||||
|
_ctx: GlobalContext,
|
||||||
|
flag: CancelFlag,
|
||||||
|
) -> Result<i32, CancelableTaskError<anyhow::Error>> {
|
||||||
|
let ds = Datasets::open(&self.config)
|
||||||
|
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||||
|
|
||||||
|
let start = Instant::now();
|
||||||
|
let mut total_counts: Map<String, Value> = Map::new();
|
||||||
|
let mut total_items = 0u64;
|
||||||
|
let jobs = self.jobs.max(1);
|
||||||
|
|
||||||
|
for (name, dataset) in ds.sources.iter().filter(|(name, _)| {
|
||||||
|
self.source.is_empty() || self.source.iter().any(|s| s == name.as_str())
|
||||||
|
}) {
|
||||||
|
info!("Scanning source {name}");
|
||||||
|
let mut stream = dataset.iter();
|
||||||
|
let mut join_set: JoinSet<Result<Option<Map<String, Value>>, anyhow::Error>> =
|
||||||
|
JoinSet::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// Drain one completed task when at capacity
|
||||||
|
if join_set.len() >= jobs
|
||||||
|
&& let Some(result) = join_set.join_next().await
|
||||||
|
{
|
||||||
|
let counts = result.context("task panicked")??;
|
||||||
|
if let Some(delta) = counts {
|
||||||
|
merge_field_counts(&mut total_counts, delta);
|
||||||
|
}
|
||||||
|
total_items += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if flag.is_cancelled() {
|
||||||
|
join_set.abort_all();
|
||||||
|
return Err(CancelableTaskError::Cancelled);
|
||||||
|
}
|
||||||
|
|
||||||
|
match stream.next().await {
|
||||||
|
None => break,
|
||||||
|
Some(item_result) => {
|
||||||
|
let item =
|
||||||
|
item_result.with_context(|| format!("while reading source {name}"))?;
|
||||||
|
let name = name.clone();
|
||||||
|
join_set.spawn(async move {
|
||||||
|
let meta = MetaExtractor::new(&item);
|
||||||
|
let value = PileValue::ObjectExtractor(Arc::new(meta));
|
||||||
|
let result = value.count_fields().await.with_context(|| {
|
||||||
|
format!("while counting fields in source {name}")
|
||||||
|
})?;
|
||||||
|
Ok(result.and_then(|v| {
|
||||||
|
if let Value::Object(m) = v {
|
||||||
|
Some(m)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drain remaining tasks
|
||||||
|
while let Some(result) = join_set.join_next().await {
|
||||||
|
let counts = result.context("task panicked")??;
|
||||||
|
if let Some(delta) = counts {
|
||||||
|
merge_field_counts(&mut total_counts, delta);
|
||||||
|
}
|
||||||
|
total_items += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let time_ms = start.elapsed().as_millis();
|
||||||
|
info!("Scanned {total_items} items in {time_ms} ms");
|
||||||
|
|
||||||
|
let output = if self.percent {
|
||||||
|
let converted = convert_to_percent(
|
||||||
|
&total_counts,
|
||||||
|
total_items,
|
||||||
|
self.min_percent,
|
||||||
|
self.max_percent,
|
||||||
|
);
|
||||||
|
Value::Object(converted)
|
||||||
|
} else {
|
||||||
|
let filtered = filter_by_count(&total_counts, self.min_count, self.max_count);
|
||||||
|
Value::Object(filtered)
|
||||||
|
};
|
||||||
|
|
||||||
|
let json = serde_json::to_string_pretty(&output).unwrap();
|
||||||
|
println!("{json}");
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn filter_by_count(
|
||||||
|
map: &Map<String, Value>,
|
||||||
|
min_count: Option<u64>,
|
||||||
|
max_count: Option<u64>,
|
||||||
|
) -> Map<String, Value> {
|
||||||
|
let mut out = Map::new();
|
||||||
|
for (key, val) in map {
|
||||||
|
match val {
|
||||||
|
Value::Number(n) => {
|
||||||
|
let count = n.as_u64().unwrap_or(0);
|
||||||
|
if min_count.is_none_or(|m| count >= m) && max_count.is_none_or(|m| count <= m) {
|
||||||
|
out.insert(key.clone(), val.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Value::Object(inner) => {
|
||||||
|
let filtered = filter_by_count(inner, min_count, max_count);
|
||||||
|
if !filtered.is_empty() {
|
||||||
|
out.insert(key.clone(), Value::Object(filtered));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
fn convert_to_percent(
|
||||||
|
map: &Map<String, Value>,
|
||||||
|
total: u64,
|
||||||
|
min_percent: Option<f64>,
|
||||||
|
max_percent: Option<f64>,
|
||||||
|
) -> Map<String, Value> {
|
||||||
|
let mut out = Map::new();
|
||||||
|
for (key, val) in map {
|
||||||
|
match val {
|
||||||
|
Value::Number(n) => {
|
||||||
|
let count = n.as_u64().unwrap_or(0);
|
||||||
|
let pct = if total == 0 {
|
||||||
|
0.0
|
||||||
|
} else {
|
||||||
|
(count as f64 / total as f64 * 1000.0).round() / 1000.0
|
||||||
|
};
|
||||||
|
if min_percent.is_none_or(|m| pct >= m) && max_percent.is_none_or(|m| pct <= m) {
|
||||||
|
out.insert(
|
||||||
|
key.clone(),
|
||||||
|
serde_json::Number::from_f64(pct)
|
||||||
|
.map(Value::Number)
|
||||||
|
.unwrap_or(Value::Null),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Value::Object(inner) => {
|
||||||
|
let converted = convert_to_percent(inner, total, min_percent, max_percent);
|
||||||
|
if !converted.is_empty() {
|
||||||
|
out.insert(key.clone(), Value::Object(converted));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Merge `delta` field counts into `acc`, summing leaf counts and recursing into objects.
|
||||||
|
fn merge_field_counts(acc: &mut Map<String, Value>, delta: Map<String, Value>) {
|
||||||
|
for (key, delta_val) in delta {
|
||||||
|
match delta_val {
|
||||||
|
Value::Number(n) => {
|
||||||
|
let entry = acc.entry(key).or_insert_with(|| Value::Number(0u64.into()));
|
||||||
|
if let Value::Number(cur) = entry {
|
||||||
|
let new = cur.as_u64().unwrap_or(0) + n.as_u64().unwrap_or(0);
|
||||||
|
*cur = new.into();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Value::Object(inner_delta) => {
|
||||||
|
let entry = acc.entry(key).or_insert_with(|| Value::Object(Map::new()));
|
||||||
|
if let Value::Object(inner_acc) = entry {
|
||||||
|
merge_field_counts(inner_acc, inner_delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
107
crates/pile/src/command/list.rs
Normal file
107
crates/pile/src/command/list.rs
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
use anyhow::{Context, Result};
|
||||||
|
use clap::Args;
|
||||||
|
use pile_config::objectpath::ObjectPath;
|
||||||
|
use pile_dataset::{Datasets, PileValue, extract::MetaExtractor};
|
||||||
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
|
use std::{path::PathBuf, str::FromStr, sync::Arc};
|
||||||
|
use tokio::task::JoinSet;
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
use crate::{CliCmd, GlobalContext};
|
||||||
|
|
||||||
|
// Arguments for the item-listing scan: `run` evaluates `path` against every
// item of the selected sources and prints the items that match.
//
// NOTE(review): maintainer notes in this struct are plain `//` comments on
// purpose. clap's derive turns `///` doc comments into user-facing help text,
// so the existing `///` lines are left exactly as they are.
#[derive(Debug, Args)]
pub struct ListCommand {
    /// Path to query, e.g. $.flac.artist
    // Parsed with `ObjectPath::from_str` in `run`; a parse failure aborts the
    // command with an "invalid path" error.
    path: String,

    /// Only print items where the value is null (inverse of default)
    #[arg(long)]
    // With this set, `run` prints items whose query result is absent or
    // `PileValue::Null`.
    invert: bool,

    /// Path to dataset config
    #[arg(long, short = 'c', default_value = "./pile.toml")]
    // Handed to `Datasets::open` in `run`.
    config: PathBuf,

    /// Number of parallel jobs
    #[arg(long, short = 'j', default_value = "4")]
    // `run` clamps this to at least 1; it bounds the number of in-flight query
    // tasks per source.
    jobs: usize,

    /// Restrict to these sources (all sources if empty)
    #[arg(long, short = 's')]
    // Matched by exact string equality against each source name.
    source: Vec<String>,
}
|
||||||
|
|
||||||
|
impl CliCmd for ListCommand {
|
||||||
|
#[expect(clippy::print_stdout)]
|
||||||
|
async fn run(
|
||||||
|
self,
|
||||||
|
_ctx: GlobalContext,
|
||||||
|
flag: CancelFlag,
|
||||||
|
) -> Result<i32, CancelableTaskError<anyhow::Error>> {
|
||||||
|
let path = ObjectPath::from_str(&self.path)
|
||||||
|
.with_context(|| format!("invalid path {:?}", self.path))?;
|
||||||
|
let path = Arc::new(path);
|
||||||
|
|
||||||
|
let ds = Datasets::open(&self.config)
|
||||||
|
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||||
|
|
||||||
|
let jobs = self.jobs.max(1);
|
||||||
|
|
||||||
|
for (name, dataset) in ds.sources.iter().filter(|(name, _)| {
|
||||||
|
self.source.is_empty() || self.source.iter().any(|s| s == name.as_str())
|
||||||
|
}) {
|
||||||
|
info!("Scanning source {name}");
|
||||||
|
let mut stream = dataset.iter();
|
||||||
|
let mut join_set: JoinSet<Result<Option<String>, anyhow::Error>> = JoinSet::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if join_set.len() >= jobs
|
||||||
|
&& let Some(result) = join_set.join_next().await
|
||||||
|
&& let Some(line) = result.context("task panicked")??
|
||||||
|
{
|
||||||
|
println!("{line}");
|
||||||
|
}
|
||||||
|
|
||||||
|
if flag.is_cancelled() {
|
||||||
|
join_set.abort_all();
|
||||||
|
return Err(CancelableTaskError::Cancelled);
|
||||||
|
}
|
||||||
|
|
||||||
|
match stream.next().await {
|
||||||
|
None => break,
|
||||||
|
Some(item_result) => {
|
||||||
|
let item =
|
||||||
|
item_result.with_context(|| format!("while reading source {name}"))?;
|
||||||
|
let source_name = name.to_string();
|
||||||
|
let key = item.key().to_string();
|
||||||
|
let path = path.clone();
|
||||||
|
let invert = self.invert;
|
||||||
|
|
||||||
|
join_set.spawn(async move {
|
||||||
|
let meta = MetaExtractor::new(&item);
|
||||||
|
let root = PileValue::ObjectExtractor(Arc::new(meta));
|
||||||
|
let value = root.query(&path).await?;
|
||||||
|
|
||||||
|
let is_present =
|
||||||
|
matches!(value, Some(v) if !matches!(v, PileValue::Null));
|
||||||
|
|
||||||
|
let should_print = if invert { !is_present } else { is_present };
|
||||||
|
|
||||||
|
Ok(should_print.then(|| format!("{source_name}\t{key}")))
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drain remaining tasks
|
||||||
|
while let Some(result) = join_set.join_next().await {
|
||||||
|
if let Some(line) = result.context("task panicked")?? {
|
||||||
|
println!("{line}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,8 +6,10 @@ use pile_toolbox::cancelabletask::{
|
|||||||
|
|
||||||
mod annotate;
|
mod annotate;
|
||||||
mod check;
|
mod check;
|
||||||
|
mod fields;
|
||||||
mod index;
|
mod index;
|
||||||
mod init;
|
mod init;
|
||||||
|
mod list;
|
||||||
mod lookup;
|
mod lookup;
|
||||||
mod probe;
|
mod probe;
|
||||||
mod serve;
|
mod serve;
|
||||||
@@ -44,12 +46,25 @@ pub enum SubCommand {
|
|||||||
cmd: index::IndexCommand,
|
cmd: index::IndexCommand,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// List all items that have certain field
|
||||||
|
List {
|
||||||
|
#[command(flatten)]
|
||||||
|
cmd: list::ListCommand,
|
||||||
|
},
|
||||||
|
|
||||||
/// Search all sources
|
/// Search all sources
|
||||||
|
#[clap(alias = "search")]
|
||||||
Lookup {
|
Lookup {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
cmd: lookup::LookupCommand,
|
cmd: lookup::LookupCommand,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Print an overview of all fields present in this dataset
|
||||||
|
Overview {
|
||||||
|
#[command(flatten)]
|
||||||
|
cmd: fields::FieldsCommand,
|
||||||
|
},
|
||||||
|
|
||||||
/// Print all metadata from an item
|
/// Print all metadata from an item
|
||||||
Probe {
|
Probe {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
@@ -70,7 +85,9 @@ impl CliCmdDispatch for SubCommand {
|
|||||||
Self::Init { cmd } => cmd.start(ctx),
|
Self::Init { cmd } => cmd.start(ctx),
|
||||||
Self::Check { cmd } => cmd.start(ctx),
|
Self::Check { cmd } => cmd.start(ctx),
|
||||||
Self::Index { cmd } => cmd.start(ctx),
|
Self::Index { cmd } => cmd.start(ctx),
|
||||||
|
Self::List { cmd } => cmd.start(ctx),
|
||||||
Self::Lookup { cmd } => cmd.start(ctx),
|
Self::Lookup { cmd } => cmd.start(ctx),
|
||||||
|
Self::Overview { cmd } => cmd.start(ctx),
|
||||||
Self::Probe { cmd } => cmd.start(ctx),
|
Self::Probe { cmd } => cmd.start(ctx),
|
||||||
Self::Serve { cmd } => cmd.start(ctx),
|
Self::Serve { cmd } => cmd.start(ctx),
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user