diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index b149e09..ccdb38d 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -1,5 +1,7 @@ use chrono::{DateTime, Utc}; -use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath}; +use pile_config::{ + ConfigToml, DatasetConfig, Label, Source, objectpath::ObjectPath, pattern::GroupPattern, +}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::{ extract::traits::ExtractState, @@ -66,15 +68,109 @@ impl Dataset { /// An opened dataset: config, working directory, and all opened sources. pub struct Datasets { - pub path_config: PathBuf, + pub path_config: Option, pub path_parent: PathBuf, - pub path_workdir: PathBuf, + pub path_workdir: Option, pub config: ConfigToml, pub sources: HashMap, } impl Datasets { + #[expect(clippy::unwrap_used)] + pub fn virt_source() -> Label { + Label::new("virtual-source").unwrap() + } + + #[expect(clippy::unwrap_used)] + pub async fn virt(parent: impl Into) -> Result { + let path_parent = parent.into(); + + let config = ConfigToml { + dataset: DatasetConfig { + name: Label::new("virtual-dataset").unwrap(), + working_dir: None, + + source: [( + Self::virt_source(), + Source::Filesystem { + enabled: true, + path: path_parent.clone(), + pattern: GroupPattern::default(), + }, + )] + .into_iter() + .collect(), + }, + schema: HashMap::new(), + fts: None, + }; + + let mut sources = HashMap::new(); + for (label, source) in &config.dataset.source { + match source { + Source::Filesystem { + enabled, + path, + pattern, + } => { + if !enabled { + continue; + } + + sources.insert( + label.clone(), + Dataset::Dir( + DirDataSource::new(label, path_parent.join(path), pattern.clone()) + .await?, + ), + ); + } + + Source::S3 { + enabled, + bucket, + prefix, + endpoint, + region, + credentials, + pattern, + } => { + if !enabled { + continue; + } + + match S3DataSource::new( + label, + bucket.clone(), + prefix.clone(), + endpoint.clone(), + region.clone(), + credentials, + pattern.clone(), + ) + .await + { + Ok(ds) => { + sources.insert(label.clone(), Dataset::S3(ds)); + } + Err(err) => { + warn!("Could not open S3 source {label}: {err}"); + } + } + } + } + } + + return Ok(Self { + path_config: None, + path_workdir: None, + path_parent, + config, + sources, + }); + } + pub async fn open(config: impl Into) -> Result { let path_config = config.into(); let path_parent = path_config @@ -168,9 +264,9 @@ impl Datasets { } return Ok(Self { - path_config, + path_config: Some(path_config), + path_workdir: Some(path_workdir), path_parent, - path_workdir, config, sources, }); @@ -216,8 +312,16 @@ impl Datasets { _threads: usize, flag: Option, ) -> Result<(), CancelableTaskError> { - let fts_tmp_dir = self.path_workdir.join(".tmp-fts"); - let fts_dir = self.path_workdir.join("fts"); + let workdir = match self.path_workdir.as_ref() { + Some(x) => x, + None => { + warn!("Skipping fts_refresh, no workdir"); + return Ok(()); + } + }; + + let fts_tmp_dir = workdir.join(".tmp-fts"); + let fts_dir = workdir.join("fts"); if fts_tmp_dir.is_dir() { warn!("Removing temporary index in {}", fts_dir.display()); @@ -315,7 +419,15 @@ impl Datasets { query: &str, top_n: usize, ) -> Result, DatasetError> { - let fts_dir = self.path_workdir.join("fts"); + let workdir = match self.path_workdir.as_ref() { + Some(x) => x, + None => { + warn!("Skipping fts_lookup, no workdir"); + return Ok(Vec::new()); + } + }; + + let fts_dir = workdir.join("fts"); if !fts_dir.exists() { return Err(DatasetError::NoFtsIndex); @@ -335,7 +447,12 @@ impl Datasets { /// Time at which fts was created pub fn ts_fts(&self) -> Result>, std::io::Error> { - let fts_dir = self.path_workdir.join("fts"); + let workdir = match self.path_workdir.as_ref() { + Some(x) => x, + None => return Ok(None), + }; + + let fts_dir = workdir.join("fts"); if !fts_dir.exists() { return Ok(None); diff --git a/crates/pile/src/command/item.rs b/crates/pile/src/command/item.rs new file mode 100644 index 0000000..5c9ba9c --- /dev/null +++ b/crates/pile/src/command/item.rs @@ -0,0 +1,71 @@ +use anyhow::{Context, Result}; +use clap::Args; +use pile_config::{Label, objectpath::ObjectPath}; +use pile_dataset::Datasets; +use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use pile_value::{extract::traits::ExtractState, value::PileValue}; +use std::path::PathBuf; + +use crate::{CliCmd, GlobalContext}; + +#[derive(Debug, Args)] +pub struct ItemCommand { + /// Source name (as defined in pile.toml) + source: String, + + /// Item key within the source + key: String, + + /// If present, extract a specific field + #[arg(long, short = 'p')] + path: Option, + + /// Path to dataset config + #[arg(long, short = 'c', default_value = "./pile.toml")] + config: PathBuf, +} + +impl CliCmd for ItemCommand { + #[expect(clippy::print_stdout)] + #[expect(clippy::unwrap_used)] + async fn run( + self, + _ctx: GlobalContext, + _flag: CancelFlag, + ) -> Result> { + let source = Label::new(&self.source) + .ok_or_else(|| anyhow::anyhow!("invalid source name {:?}", self.source))?; + + let ds = Datasets::open(&self.config) + .await + .with_context(|| format!("while opening dataset for {}", self.config.display()))?; + + let state = ExtractState { ignore_mime: false }; + + let json = if let Some(path_str) = self.path { + let path: ObjectPath = path_str + .parse() + .with_context(|| format!("invalid path {path_str:?}"))?; + + ds.get_field(&state, &source, &self.key, &path) + .await + .with_context(|| format!("while extracting {}", self.key))? + .ok_or_else(|| { + anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) + })? + } else { + let item = ds.get(&source, &self.key).await.ok_or_else(|| { + anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) + })?; + + let item = PileValue::Item(item); + item.to_json(&state) + .await + .with_context(|| format!("while extracting {}", self.key))? + }; + + let json = serde_json::to_string_pretty(&json).unwrap(); + println!("{json}"); + return Ok(0); + } +} diff --git a/crates/pile/src/command/mod.rs b/crates/pile/src/command/mod.rs index 586519d..e0fe6e6 100644 --- a/crates/pile/src/command/mod.rs +++ b/crates/pile/src/command/mod.rs @@ -8,6 +8,7 @@ mod check; mod fields; mod index; mod init; +mod item; mod list; mod lookup; mod probe; @@ -59,12 +60,18 @@ pub enum SubCommand { cmd: fields::FieldsCommand, }, - /// Print all metadata from an item + /// Print all metadata from a file Probe { #[command(flatten)] cmd: probe::ProbeCommand, }, + /// Print all metadata from an item + Item { + #[command(flatten)] + cmd: item::ItemCommand, + }, + /// Expose a dataset via an http api Serve { #[command(flatten)] @@ -88,6 +95,7 @@ impl CliCmdDispatch for SubCommand { Self::Lookup { cmd } => cmd.start(ctx), Self::Fields { cmd } => cmd.start(ctx), Self::Probe { cmd } => cmd.start(ctx), + Self::Item { cmd } => cmd.start(ctx), Self::Serve { cmd } => cmd.start(ctx), Self::Upload { cmd } => cmd.start(ctx), diff --git a/crates/pile/src/command/probe.rs b/crates/pile/src/command/probe.rs index b29ea00..b14aa76 100644 --- a/crates/pile/src/command/probe.rs +++ b/crates/pile/src/command/probe.rs @@ -1,6 +1,6 @@ use anyhow::{Context, Result}; use clap::Args; -use pile_config::{Label, objectpath::ObjectPath}; +use pile_config::objectpath::ObjectPath; use pile_dataset::Datasets; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::{extract::traits::ExtractState, value::PileValue}; @@ -10,19 +10,12 @@ use crate::{CliCmd, GlobalContext}; #[derive(Debug, Args)] pub struct ProbeCommand { - /// Source name (as defined in pile.toml) - source: String, - - /// Item key within the source - key: String, + /// The file to probe + file: PathBuf, /// If present, extract a specific field #[arg(long, short = 'p')] path: Option, - - /// Path to dataset config - #[arg(long, short = 'c', default_value = "./pile.toml")] - config: PathBuf, } impl CliCmd for ProbeCommand { @@ -33,35 +26,37 @@ impl CliCmd for ProbeCommand { _ctx: GlobalContext, _flag: CancelFlag, ) -> Result> { - let source = Label::new(&self.source) - .ok_or_else(|| anyhow::anyhow!("invalid source name {:?}", self.source))?; - - let ds = Datasets::open(&self.config) + let ds = Datasets::virt(".") .await - .with_context(|| format!("while opening dataset for {}", self.config.display()))?; + .with_context(|| "while opening virtual dataset".to_owned())?; let state = ExtractState { ignore_mime: false }; + let key = self.file.to_str().context("path is not utf-8")?; let json = if let Some(path_str) = self.path { let path: ObjectPath = path_str .parse() .with_context(|| format!("invalid path {path_str:?}"))?; - ds.get_field(&state, &source, &self.key, &path) - .await - .with_context(|| format!("while extracting {}", self.key))? - .ok_or_else(|| { - anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) - })? + ds.get_field( + &state, + &Datasets::virt_source(), + self.file.to_str().context("path is not utf-8")?, + &path, + ) + .await + .with_context(|| format!("while extracting {key}"))? + .ok_or_else(|| anyhow::anyhow!("{key:?} not found"))? } else { - let item = ds.get(&source, &self.key).await.ok_or_else(|| { - anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) - })?; + let item = ds + .get(&Datasets::virt_source(), key) + .await + .ok_or_else(|| anyhow::anyhow!("{key:?} not found"))?; let item = PileValue::Item(item); item.to_json(&state) .await - .with_context(|| format!("while extracting {}", self.key))? + .with_context(|| format!("while extracting {key}"))? }; let json = serde_json::to_string_pretty(&json).unwrap();