Files
pile/crates/pile/src/command/annotate.rs
2026-03-16 09:56:48 -07:00

111 lines
2.9 KiB
Rust

use anyhow::{Context, Result};
use clap::Args;
use pile_config::{Label, Source};
use pile_dataset::{Datasets, index::DbFtsIndex};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{
extract::traits::ExtractState,
source::{DataSource, DirDataSource},
value::{Item, PileValue},
};
use std::{path::PathBuf, sync::Arc};
use tokio_stream::StreamExt;
use tracing::{info, warn};
use crate::{CliCmd, GlobalContext};
// Command-line arguments for `AnnotateCommand`, parsed by clap through `#[derive(Args)]`.
//
// NOTE: clap's derive uses the `///` comments on the fields below as the help text
// of the corresponding arguments, so rewording them changes user-facing output.
// Explanatory notes are therefore written as plain `//` comments.
#[derive(Debug, Args)]
pub struct AnnotateCommand {
// `run` converts this into a `Label` and rejects it when the dataset schema has
// no entry under that key.
/// The schema field to read (must be defined in pile.toml)
field: String,
// Split on `.` by `parse_dest`; every segment has to be a valid `Label`.
/// Sidecar path to write to (e.g. meta.title)
dest: String,
// Passed to `Datasets::open` by `run`. Exposed as `--config` / `-c`.
/// Path to dataset config
#[arg(long, short = 'c', default_value = "./pile.toml")]
config: PathBuf,
}
impl AnnotateCommand {
    /// Splits a dotted destination path (e.g. `meta.title`) into its labels.
    ///
    /// The input is cut at every `.` and each piece is handed to [`Label::new`].
    ///
    /// # Errors
    ///
    /// Fails on the first piece that [`Label::new`] rejects; the error message
    /// quotes the offending piece.
    fn parse_dest(dest: &str) -> Result<Vec<Label>> {
        let mut labels = Vec::new();
        for s in dest.split('.') {
            let Some(label) = Label::new(s) else {
                return Err(anyhow::anyhow!("invalid label {s:?} in dest path"));
            };
            labels.push(label);
        }
        Ok(labels)
    }
}
impl CliCmd for AnnotateCommand {
    /// Extracts `field` from every file of each sidecar-enabled filesystem source.
    ///
    /// Inputs are validated up front, in this order: `field` must be a valid
    /// [`Label`], `dest` must split into valid labels, the dataset at `config`
    /// must open, and `field` must be a key of the dataset schema.
    ///
    /// Sources are then visited one by one. Filesystem sources without sidecars
    /// and S3 sources are skipped with a warning. Items that are not
    /// `Item::File`, and files for which no field value is found, are skipped
    /// silently.
    ///
    /// `_ctx` and `_flag` are accepted but not used, so the cancel flag is
    /// currently never consulted.
    ///
    /// Returns `Ok(0)` once every source has been visited.
    ///
    /// # Errors
    ///
    /// Any of the validation steps above, a failure while reading a source, or a
    /// failure while extracting the field from a file.
    ///
    /// # Panics
    ///
    /// Sidecar writing is still a stub: the first file that yields a field value
    /// reaches `todo!()`.
    async fn run(
        self,
        _ctx: GlobalContext,
        _flag: CancelFlag,
    ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
        let Some(field) = Label::new(&self.field) else {
            return Err(anyhow::anyhow!("invalid field name {:?}", self.field).into());
        };
        let dest_path = Self::parse_dest(&self.dest)?;
        let state = ExtractState { ignore_mime: false };

        let ds = Datasets::open(&self.config)
            .with_context(|| format!("while opening dataset for {}", self.config.display()))?;

        let field_in_schema = ds.config.schema.contains_key(&field);
        if !field_in_schema {
            return Err(anyhow::anyhow!("field {:?} is not defined in schema", self.field).into());
        }

        let index = DbFtsIndex::new(&ds.path_workdir, &ds.config);

        // NOTE(review): `count` is not declared `mut`; its only write is the
        // increment placed after `todo!()` below, inside the block marked
        // `#[expect(unreachable_code)]`. It will need to become `let mut` once
        // sidecar writing replaces the `todo!()`.
        let count = 0u64;

        for (name, source) in &ds.config.dataset.source {
            // Pick out what a filesystem source needs; every other kind is skipped.
            let (root, sidecars) = match source {
                Source::Filesystem { path, sidecars, .. } => (path, *sidecars),
                Source::S3 { .. } => {
                    warn!("Source {name} is an S3 source; sidecar annotation is not yet supported");
                    continue;
                }
            };
            if !sidecars {
                warn!("Source {name} does not have sidecars enabled, skipping");
                continue;
            }

            let dir_source = Arc::new(DirDataSource::new(name, root.clone(), sidecars));
            let mut stream = dir_source.iter();
            while let Some(res) = stream.next().await {
                let entry = res.with_context(|| format!("while reading source {name}"))?;
                let Item::File { path: file_path, .. } = &entry else {
                    continue;
                };

                // `file_path` still borrows `entry`, so the value handed to the
                // index is built from a clone.
                let pile_item = PileValue::Item(entry.clone());
                let extracted = index
                    .get_field(&state, &pile_item, &field)
                    .await
                    .with_context(|| {
                        format!("while extracting field from {}", file_path.display())
                    })?;
                let Some(value) = extracted else {
                    continue;
                };

                // TODO: implement sidecar writing
                let _ = (&dest_path, &value);
                todo!("write_sidecar not yet implemented");
                #[expect(unreachable_code)]
                {
                    count += 1;
                }
            }
        }

        info!("Annotated {count} items");
        Ok(0)
    }
}