Add item subcommand

This commit is contained in:
2026-03-21 08:49:48 -07:00
parent 2f2eb323d5
commit ed169b3ab4
4 changed files with 226 additions and 35 deletions

View File

@@ -1,5 +1,7 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath}; use pile_config::{
ConfigToml, DatasetConfig, Label, Source, objectpath::ObjectPath, pattern::GroupPattern,
};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{ use pile_value::{
extract::traits::ExtractState, extract::traits::ExtractState,
@@ -66,15 +68,109 @@ impl Dataset {
/// An opened dataset: config, working directory, and all opened sources. /// An opened dataset: config, working directory, and all opened sources.
pub struct Datasets { pub struct Datasets {
pub path_config: PathBuf, pub path_config: Option<PathBuf>,
pub path_parent: PathBuf, pub path_parent: PathBuf,
pub path_workdir: PathBuf, pub path_workdir: Option<PathBuf>,
pub config: ConfigToml, pub config: ConfigToml,
pub sources: HashMap<Label, Dataset>, pub sources: HashMap<Label, Dataset>,
} }
impl Datasets { impl Datasets {
/// Label under which the virtual dataset registers its single filesystem source.
#[expect(clippy::unwrap_used)]
pub fn virt_source() -> Label {
    // The literal is a statically-known valid label, so this cannot fail.
    let name = "virtual-source";
    Label::new(name).unwrap()
}
/// Build an in-memory "virtual" dataset rooted at `parent`, with no config
/// file on disk and no working directory.
///
/// The synthesized [`ConfigToml`] holds exactly one enabled filesystem source
/// (named by [`Self::virt_source`]) pointing at `parent`, an empty schema and
/// no fts section. Since `path_workdir` is `None`, fts-related operations on
/// the returned value are skipped / unavailable.
///
/// # Errors
/// Returns `std::io::Error` if the filesystem source cannot be opened.
#[expect(clippy::unwrap_used)]
pub async fn virt(parent: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
    let path_parent = parent.into();
    let config = ConfigToml {
        dataset: DatasetConfig {
            // Static literal; always a valid label.
            name: Label::new("virtual-dataset").unwrap(),
            working_dir: None,
            source: [(
                Self::virt_source(),
                Source::Filesystem {
                    enabled: true,
                    path: path_parent.clone(),
                    pattern: GroupPattern::default(),
                },
            )]
            .into_iter()
            .collect(),
        },
        schema: HashMap::new(),
        fts: None,
    };
    let mut sources = HashMap::new();
    for (label, source) in &config.dataset.source {
        match source {
            Source::Filesystem {
                enabled,
                path,
                pattern,
            } => {
                if !enabled {
                    continue;
                }
                // BUGFIX: `path` was stored above as the already-resolved
                // parent directory, so use it directly. The previous
                // `path_parent.join(path)` double-applied a relative
                // `parent` (e.g. "data" -> "data/data"); it only worked by
                // accident for "." or absolute paths (where `join` replaces
                // the base).
                sources.insert(
                    label.clone(),
                    Dataset::Dir(
                        DirDataSource::new(label, path.clone(), pattern.clone()).await?,
                    ),
                );
            }
            // Unreachable for the config built above (it only contains a
            // filesystem source), but kept so the match stays exhaustive and
            // defensive if the virtual config ever grows.
            Source::S3 {
                enabled,
                bucket,
                prefix,
                endpoint,
                region,
                credentials,
                pattern,
            } => {
                if !enabled {
                    continue;
                }
                match S3DataSource::new(
                    label,
                    bucket.clone(),
                    prefix.clone(),
                    endpoint.clone(),
                    region.clone(),
                    credentials,
                    pattern.clone(),
                )
                .await
                {
                    Ok(ds) => {
                        sources.insert(label.clone(), Dataset::S3(ds));
                    }
                    Err(err) => {
                        // Best-effort: a broken S3 source is logged, not fatal.
                        warn!("Could not open S3 source {label}: {err}");
                    }
                }
            }
        }
    }
    return Ok(Self {
        path_config: None,
        path_workdir: None,
        path_parent,
        config,
        sources,
    });
}
pub async fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> { pub async fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
let path_config = config.into(); let path_config = config.into();
let path_parent = path_config let path_parent = path_config
@@ -168,9 +264,9 @@ impl Datasets {
} }
return Ok(Self { return Ok(Self {
path_config, path_config: Some(path_config),
path_workdir: Some(path_workdir),
path_parent, path_parent,
path_workdir,
config, config,
sources, sources,
}); });
@@ -216,8 +312,16 @@ impl Datasets {
_threads: usize, _threads: usize,
flag: Option<CancelFlag>, flag: Option<CancelFlag>,
) -> Result<(), CancelableTaskError<DatasetError>> { ) -> Result<(), CancelableTaskError<DatasetError>> {
let fts_tmp_dir = self.path_workdir.join(".tmp-fts"); let workdir = match self.path_workdir.as_ref() {
let fts_dir = self.path_workdir.join("fts"); Some(x) => x,
None => {
warn!("Skipping fts_refresh, no workdir");
return Ok(());
}
};
let fts_tmp_dir = workdir.join(".tmp-fts");
let fts_dir = workdir.join("fts");
if fts_tmp_dir.is_dir() { if fts_tmp_dir.is_dir() {
warn!("Removing temporary index in {}", fts_dir.display()); warn!("Removing temporary index in {}", fts_dir.display());
@@ -315,7 +419,15 @@ impl Datasets {
query: &str, query: &str,
top_n: usize, top_n: usize,
) -> Result<Vec<FtsLookupResult>, DatasetError> { ) -> Result<Vec<FtsLookupResult>, DatasetError> {
let fts_dir = self.path_workdir.join("fts"); let workdir = match self.path_workdir.as_ref() {
Some(x) => x,
None => {
warn!("Skipping fts_lookup, no workdir");
return Ok(Vec::new());
}
};
let fts_dir = workdir.join("fts");
if !fts_dir.exists() { if !fts_dir.exists() {
return Err(DatasetError::NoFtsIndex); return Err(DatasetError::NoFtsIndex);
@@ -335,7 +447,12 @@ impl Datasets {
/// Time at which fts was created /// Time at which fts was created
pub fn ts_fts(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> { pub fn ts_fts(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let fts_dir = self.path_workdir.join("fts"); let workdir = match self.path_workdir.as_ref() {
Some(x) => x,
None => return Ok(None),
};
let fts_dir = workdir.join("fts");
if !fts_dir.exists() { if !fts_dir.exists() {
return Ok(None); return Ok(None);

View File

@@ -0,0 +1,71 @@
use anyhow::{Context, Result};
use clap::Args;
use pile_config::{Label, objectpath::ObjectPath};
use pile_dataset::Datasets;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{extract::traits::ExtractState, value::PileValue};
use std::path::PathBuf;
use crate::{CliCmd, GlobalContext};
// CLI arguments for the `item` subcommand: fetch one item (or one field of
// it) from a source of a configured dataset and print it as pretty JSON.
//
// NOTE(review): the `///` doc comments below double as clap help text at
// runtime, and `source`/`key` are positional (order-sensitive), so both are
// left byte-identical here.
#[derive(Debug, Args)]
pub struct ItemCommand {
    /// Source name (as defined in pile.toml)
    source: String,
    /// Item key within the source
    key: String,
    /// If present, extract a specific field
    #[arg(long, short = 'p')]
    path: Option<String>,
    /// Path to dataset config
    #[arg(long, short = 'c', default_value = "./pile.toml")]
    config: PathBuf,
}
impl CliCmd for ItemCommand {
    /// Open the dataset named in `--config`, look up `key` inside `source`
    /// (optionally extracting just the field at `--path`), and print the
    /// result as pretty-printed JSON. Returns exit code 0 on success.
    #[expect(clippy::print_stdout)]
    #[expect(clippy::unwrap_used)]
    async fn run(
        self,
        _ctx: GlobalContext,
        _flag: CancelFlag,
    ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
        let source = Label::new(&self.source)
            .ok_or_else(|| anyhow::anyhow!("invalid source name {:?}", self.source))?;
        let ds = Datasets::open(&self.config)
            .await
            .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
        let state = ExtractState { ignore_mime: false };
        let value = match self.path {
            // A field path was given: extract only that field of the item.
            Some(path_str) => {
                let field: ObjectPath = path_str
                    .parse()
                    .with_context(|| format!("invalid path {path_str:?}"))?;
                ds.get_field(&state, &source, &self.key, &field)
                    .await
                    .with_context(|| format!("while extracting {}", self.key))?
                    .ok_or_else(|| {
                        anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
                    })?
            }
            // No path: fetch the whole item and serialize it.
            None => {
                let item = ds.get(&source, &self.key).await.ok_or_else(|| {
                    anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
                })?;
                PileValue::Item(item)
                    .to_json(&state)
                    .await
                    .with_context(|| format!("while extracting {}", self.key))?
            }
        };
        // Serializing an already-built JSON value cannot fail.
        println!("{}", serde_json::to_string_pretty(&value).unwrap());
        return Ok(0);
    }
}

View File

@@ -8,6 +8,7 @@ mod check;
mod fields; mod fields;
mod index; mod index;
mod init; mod init;
mod item;
mod list; mod list;
mod lookup; mod lookup;
mod probe; mod probe;
@@ -59,12 +60,18 @@ pub enum SubCommand {
cmd: fields::FieldsCommand, cmd: fields::FieldsCommand,
}, },
/// Print all metadata from an item /// Print all metadata from a file
Probe { Probe {
#[command(flatten)] #[command(flatten)]
cmd: probe::ProbeCommand, cmd: probe::ProbeCommand,
}, },
/// Print all metadata from an item
Item {
#[command(flatten)]
cmd: item::ItemCommand,
},
/// Expose a dataset via an http api /// Expose a dataset via an http api
Serve { Serve {
#[command(flatten)] #[command(flatten)]
@@ -88,6 +95,7 @@ impl CliCmdDispatch for SubCommand {
Self::Lookup { cmd } => cmd.start(ctx), Self::Lookup { cmd } => cmd.start(ctx),
Self::Fields { cmd } => cmd.start(ctx), Self::Fields { cmd } => cmd.start(ctx),
Self::Probe { cmd } => cmd.start(ctx), Self::Probe { cmd } => cmd.start(ctx),
Self::Item { cmd } => cmd.start(ctx),
Self::Serve { cmd } => cmd.start(ctx), Self::Serve { cmd } => cmd.start(ctx),
Self::Upload { cmd } => cmd.start(ctx), Self::Upload { cmd } => cmd.start(ctx),

View File

@@ -1,6 +1,6 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use clap::Args; use clap::Args;
use pile_config::{Label, objectpath::ObjectPath}; use pile_config::objectpath::ObjectPath;
use pile_dataset::Datasets; use pile_dataset::Datasets;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{extract::traits::ExtractState, value::PileValue}; use pile_value::{extract::traits::ExtractState, value::PileValue};
@@ -10,19 +10,12 @@ use crate::{CliCmd, GlobalContext};
#[derive(Debug, Args)] #[derive(Debug, Args)]
pub struct ProbeCommand { pub struct ProbeCommand {
/// Source name (as defined in pile.toml) /// The file to probe
source: String, file: PathBuf,
/// Item key within the source
key: String,
/// If present, extract a specific field /// If present, extract a specific field
#[arg(long, short = 'p')] #[arg(long, short = 'p')]
path: Option<String>, path: Option<String>,
/// Path to dataset config
#[arg(long, short = 'c', default_value = "./pile.toml")]
config: PathBuf,
} }
impl CliCmd for ProbeCommand { impl CliCmd for ProbeCommand {
@@ -33,35 +26,37 @@ impl CliCmd for ProbeCommand {
_ctx: GlobalContext, _ctx: GlobalContext,
_flag: CancelFlag, _flag: CancelFlag,
) -> Result<i32, CancelableTaskError<anyhow::Error>> { ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
let source = Label::new(&self.source) let ds = Datasets::virt(".")
.ok_or_else(|| anyhow::anyhow!("invalid source name {:?}", self.source))?;
let ds = Datasets::open(&self.config)
.await .await
.with_context(|| format!("while opening dataset for {}", self.config.display()))?; .with_context(|| "while opening virtual dataset".to_owned())?;
let state = ExtractState { ignore_mime: false }; let state = ExtractState { ignore_mime: false };
let key = self.file.to_str().context("path is not utf-8")?;
let json = if let Some(path_str) = self.path { let json = if let Some(path_str) = self.path {
let path: ObjectPath = path_str let path: ObjectPath = path_str
.parse() .parse()
.with_context(|| format!("invalid path {path_str:?}"))?; .with_context(|| format!("invalid path {path_str:?}"))?;
ds.get_field(&state, &source, &self.key, &path) ds.get_field(
&state,
&Datasets::virt_source(),
self.file.to_str().context("path is not utf-8")?,
&path,
)
.await .await
.with_context(|| format!("while extracting {}", self.key))? .with_context(|| format!("while extracting {key}"))?
.ok_or_else(|| { .ok_or_else(|| anyhow::anyhow!("{key:?} not found"))?
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
})?
} else { } else {
let item = ds.get(&source, &self.key).await.ok_or_else(|| { let item = ds
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) .get(&Datasets::virt_source(), key)
})?; .await
.ok_or_else(|| anyhow::anyhow!("{key:?} not found"))?;
let item = PileValue::Item(item); let item = PileValue::Item(item);
item.to_json(&state) item.to_json(&state)
.await .await
.with_context(|| format!("while extracting {}", self.key))? .with_context(|| format!("while extracting {key}"))?
}; };
let json = serde_json::to_string_pretty(&json).unwrap(); let json = serde_json::to_string_pretty(&json).unwrap();