Add --schema arg to fields command
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
|
use pile_config::objectpath::ObjectPath;
|
||||||
use pile_dataset::Datasets;
|
use pile_dataset::Datasets;
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||||
@@ -40,6 +41,10 @@ pub struct FieldsCommand {
|
|||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
max_percent: Option<f64>,
|
max_percent: Option<f64>,
|
||||||
|
|
||||||
|
/// Print counts of non-null schema fields instead of raw fields
|
||||||
|
#[arg(long)]
|
||||||
|
schema: bool,
|
||||||
|
|
||||||
/// Restrict to these sources (all sources if empty)
|
/// Restrict to these sources (all sources if empty)
|
||||||
#[arg(long, short = 's')]
|
#[arg(long, short = 's')]
|
||||||
source: Vec<String>,
|
source: Vec<String>,
|
||||||
@@ -67,6 +72,17 @@ impl CliCmd for FieldsCommand {
|
|||||||
let jobs = self.jobs.max(1);
|
let jobs = self.jobs.max(1);
|
||||||
let state = ExtractState { ignore_mime: false };
|
let state = ExtractState { ignore_mime: false };
|
||||||
|
|
||||||
|
// Pre-collect schema fields for the --schema mode
|
||||||
|
let schema_fields: Vec<(String, Vec<ObjectPath>)> = if self.schema {
|
||||||
|
ds.config
|
||||||
|
.schema
|
||||||
|
.iter()
|
||||||
|
.map(|(name, spec)| (name.to_string(), spec.path.clone()))
|
||||||
|
.collect()
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
};
|
||||||
|
|
||||||
for (name, dataset) in ds.sources.iter().filter(|(name, _)| {
|
for (name, dataset) in ds.sources.iter().filter(|(name, _)| {
|
||||||
self.source.is_empty() || self.source.iter().any(|s| s == name.as_str())
|
self.source.is_empty() || self.source.iter().any(|s| s == name.as_str())
|
||||||
}) {
|
}) {
|
||||||
@@ -98,19 +114,50 @@ impl CliCmd for FieldsCommand {
|
|||||||
let item = item.clone();
|
let item = item.clone();
|
||||||
let name = name.clone();
|
let name = name.clone();
|
||||||
let state = state.clone();
|
let state = state.clone();
|
||||||
join_set.spawn(async move {
|
if self.schema {
|
||||||
let item = PileValue::Item(item);
|
let schema_fields = schema_fields.clone();
|
||||||
let result = item.count_fields(&state).await.with_context(|| {
|
join_set.spawn(async move {
|
||||||
format!("while counting fields in source {name}")
|
let pv = PileValue::Item(item);
|
||||||
})?;
|
let mut counts = Map::new();
|
||||||
Ok(result.and_then(|v| {
|
for (field_name, paths) in &schema_fields {
|
||||||
if let Value::Object(m) = v {
|
let mut present = false;
|
||||||
Some(m)
|
for path in paths {
|
||||||
} else {
|
let v =
|
||||||
None
|
pv.query(&state, path).await.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"while extracting field {field_name} in source {name}"
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
if let Some(v) = v
|
||||||
|
&& !matches!(v, PileValue::Null)
|
||||||
|
{
|
||||||
|
present = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
counts.insert(
|
||||||
|
field_name.clone(),
|
||||||
|
Value::Number((present as u64).into()),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}))
|
Ok(Some(counts))
|
||||||
});
|
});
|
||||||
|
} else {
|
||||||
|
join_set.spawn(async move {
|
||||||
|
let item = PileValue::Item(item);
|
||||||
|
let result =
|
||||||
|
item.count_fields(&state).await.with_context(|| {
|
||||||
|
format!("while counting fields in source {name}")
|
||||||
|
})?;
|
||||||
|
Ok(result.and_then(|v| {
|
||||||
|
if let Value::Object(m) = v {
|
||||||
|
Some(m)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user