Compare commits

...

9 Commits

Author SHA1 Message Date
8a8e0a2770 Fix s3 source
All checks were successful
CI / Typos (push) Successful in 29s
CI / Clippy (push) Successful in 1m19s
CI / Build and test (all features) (push) Successful in 4m42s
CI / Build and test (push) Successful in 5m57s
2026-03-21 10:50:04 -07:00
c2f4b12e35 Transparent Nulls 2026-03-21 10:29:01 -07:00
302d2acef3 Slice arrays 2026-03-21 10:20:41 -07:00
7caf2553bc Get fields in item cmd 2026-03-21 10:20:41 -07:00
44466f16cf Tweak fs extractor 2026-03-21 10:20:39 -07:00
48262bab48 Exclude large strings 2026-03-21 09:32:22 -07:00
b6cb5870b4 Add regex extractor 2026-03-21 09:27:12 -07:00
ed169b3ab4 Add item subcommand 2026-03-21 08:49:48 -07:00
2f2eb323d5 Add text extractor 2026-03-18 20:47:29 -07:00
23 changed files with 811 additions and 56 deletions

1
Cargo.lock generated
View File

@@ -2574,6 +2574,7 @@ dependencies = [
"pdfium-render", "pdfium-render",
"pile-config", "pile-config",
"pile-flac", "pile-flac",
"regex",
"serde_json", "serde_json",
"smartstring", "smartstring",
"tokio", "tokio",

View File

@@ -127,6 +127,7 @@ mime = "0.3.17"
mime_guess = "2.0.5" mime_guess = "2.0.5"
paste = "1.0.15" paste = "1.0.15"
smartstring = "1.0.1" smartstring = "1.0.1"
regex = "1"
chrono = "0.4.43" chrono = "0.4.43"
parking_lot = "0.12.5" parking_lot = "0.12.5"
rayon = "1.11.0" rayon = "1.11.0"

View File

@@ -1,7 +1,7 @@
use std::{fmt, str::FromStr}; use std::{fmt, str::FromStr};
use serde::{ use serde::{
Deserialize, Deserializer, Deserialize, Deserializer, Serialize, Serializer,
de::{self, Visitor}, de::{self, Visitor},
}; };
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
@@ -49,6 +49,13 @@ pub enum PathSegment {
/// Go to an element of the current list /// Go to an element of the current list
Index(i64), Index(i64),
/// Go to a slice of the current list
Range {
start: i64,
end: i64,
inclusive: bool,
},
} }
/// A path to a PathSegment::Field inside a nested object, /// A path to a PathSegment::Field inside a nested object,
@@ -63,6 +70,39 @@ pub struct ObjectPath {
pub segments: Vec<PathSegment>, pub segments: Vec<PathSegment>,
} }
impl fmt::Display for ObjectPath {
    /// Renders the path in its canonical textual form, e.g. `$.a(args)[0..=2]`.
    ///
    /// This is the inverse of the `FromStr`/`Deserialize` parser, so a path
    /// round-trips through its string representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.segments.iter().try_for_each(|segment| match segment {
            PathSegment::Root => write!(f, "$"),
            PathSegment::Field { name, args: None } => write!(f, ".{name}"),
            PathSegment::Field {
                name,
                args: Some(args),
            } => write!(f, ".{name}({args})"),
            PathSegment::Index(idx) => write!(f, "[{idx}]"),
            PathSegment::Range {
                start,
                end,
                inclusive,
            } => {
                // `..=` for inclusive ranges, `..` otherwise — mirrors Rust syntax.
                let op = if *inclusive { "..=" } else { ".." };
                write!(f, "[{start}{op}{end}]")
            }
        })
    }
}
impl Serialize for ObjectPath {
    /// Serializes the path as its canonical string form (see `Display`),
    /// which round-trips with the string-based `Deserialize` impl below.
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let rendered = self.to_string();
        serializer.serialize_str(rendered.as_str())
    }
}
impl<'de> Deserialize<'de> for ObjectPath { impl<'de> Deserialize<'de> for ObjectPath {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> { fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
struct PathVisitor; struct PathVisitor;

View File

@@ -87,6 +87,15 @@ enum State {
/// We are indexing an array, waiting for a number /// We are indexing an array, waiting for a number
Index, Index,
/// We parsed the start index, waiting for `]` or the first `.` of `..`
IndexAfterStart(i64),
/// We saw one `.` after the start index, waiting for the second `.`
IndexRangeDot1(i64),
/// We saw `..`, waiting for the end index (optionally prefixed with `=`)
IndexRangeDot2(i64),
/// We are indexing an array, waiting for a close-bracket /// We are indexing an array, waiting for a close-bracket
IndexClose, IndexClose,
} }
@@ -164,8 +173,7 @@ impl Parser {
} }
})?; })?;
self.segments.push(PathSegment::Index(idx)); self.state = State::IndexAfterStart(idx);
self.state = State::IndexClose;
} }
(State::Index, (p, Token::Root)) (State::Index, (p, Token::Root))
@@ -175,6 +183,49 @@ impl Parser {
return Err(PathParseError::Syntax { position: *p }); return Err(PathParseError::Syntax { position: *p });
} }
(State::IndexAfterStart(idx), (_, Token::SqbClose)) => {
self.segments.push(PathSegment::Index(idx));
self.state = State::Selected;
}
(State::IndexAfterStart(idx), (_, Token::Dot)) => {
self.state = State::IndexRangeDot1(idx);
}
(State::IndexAfterStart(_), (p, _)) => {
return Err(PathParseError::Syntax { position: *p });
}
(State::IndexRangeDot1(idx), (_, Token::Dot)) => {
self.state = State::IndexRangeDot2(idx);
}
(State::IndexRangeDot1(_), (p, _)) => {
return Err(PathParseError::Syntax { position: *p });
}
(State::IndexRangeDot2(start), (p, Token::Ident(ident))) => {
let (end_str, inclusive) = if let Some(stripped) = ident.strip_prefix('=') {
(stripped, true)
} else {
(*ident, false)
};
let end: i64 = i64::from_str(end_str).map_err(|_err| {
PathParseError::InvalidIndexString {
position: *p,
str: (*ident).into(),
}
})?;
self.segments.push(PathSegment::Range {
start,
end,
inclusive,
});
self.state = State::IndexClose;
}
(State::IndexRangeDot2(_), (p, _)) => {
return Err(PathParseError::Syntax { position: *p });
}
(State::IndexClose, (_, Token::SqbClose)) => self.state = State::Selected, (State::IndexClose, (_, Token::SqbClose)) => self.state = State::Selected,
(State::IndexClose, (p, _)) => { (State::IndexClose, (p, _)) => {
return Err(PathParseError::Syntax { position: *p }); return Err(PathParseError::Syntax { position: *p });
@@ -187,6 +238,9 @@ impl Parser {
State::Start => Err(PathParseError::Syntax { position: 0 }), State::Start => Err(PathParseError::Syntax { position: 0 }),
State::Dot => Err(PathParseError::Syntax { position }), State::Dot => Err(PathParseError::Syntax { position }),
State::Index => Err(PathParseError::Syntax { position }), State::Index => Err(PathParseError::Syntax { position }),
State::IndexAfterStart(_) => Err(PathParseError::Syntax { position }),
State::IndexRangeDot1(_) => Err(PathParseError::Syntax { position }),
State::IndexRangeDot2(_) => Err(PathParseError::Syntax { position }),
State::IndexClose => Err(PathParseError::Syntax { position }), State::IndexClose => Err(PathParseError::Syntax { position }),
State::Selected => Ok(()), State::Selected => Ok(()),
}?; }?;
@@ -387,4 +441,46 @@ mod tests {
}), }),
); );
} }
// MARK: range
fn range(start: i64, end: i64, inclusive: bool) -> PathSegment {
PathSegment::Range {
start,
end,
inclusive,
}
}
#[test]
fn exclusive_range() {
parse_test(
"$.a[0..5]",
Ok(&[PathSegment::Root, field("a"), range(0, 5, false)]),
);
}
#[test]
fn inclusive_range() {
parse_test(
"$.a[1..=2]",
Ok(&[PathSegment::Root, field("a"), range(1, 2, true)]),
);
}
#[test]
fn range_with_negative_end() {
parse_test(
"$.a[0..-1]",
Ok(&[PathSegment::Root, field("a"), range(0, -1, false)]),
);
}
#[test]
fn range_with_negative_start() {
parse_test(
"$.a[-3..-1]",
Ok(&[PathSegment::Root, field("a"), range(-3, -1, false)]),
);
}
} }

View File

@@ -21,7 +21,52 @@ impl Tokenizer {
let mut tokens = Vec::new(); let mut tokens = Vec::new();
let mut window_start = None; let mut window_start = None;
// Paren depth: while > 0, `.` / `[` / `]` / `$` are part of the ident.
let mut paren_depth: usize = 0;
// When true, the current char is escaped by a preceding `\` and is
// treated as a plain ident character with no special meaning.
let mut skip_next = false;
for (i, c) in source.char_indices() { for (i, c) in source.char_indices() {
if skip_next {
skip_next = false;
// Escaped char: just extend the ident window (already opened by `\`).
continue;
}
if c == '\\' {
if window_start.is_none() {
window_start = Some(i);
}
skip_next = true;
continue;
}
if paren_depth > 0 {
// Inside parens: only track depth changes, everything else is ident.
match c {
'(' => {
if window_start.is_none() {
window_start = Some(i);
}
paren_depth += 1;
}
')' => {
if window_start.is_none() {
window_start = Some(i);
}
paren_depth -= 1;
}
x if x.is_ascii() => {
if window_start.is_none() {
window_start = Some(i);
}
}
char => return Err(PathParseError::NonAsciiChar { position: i, char }),
}
continue;
}
match c { match c {
'$' => { '$' => {
if let Some(s) = window_start.take() { if let Some(s) = window_start.take() {
@@ -51,10 +96,26 @@ impl Tokenizer {
tokens.push((i, Token::SqbClose)); tokens.push((i, Token::SqbClose));
} }
x if x.is_ascii() => match window_start { '(' => {
None => window_start = Some(i), if window_start.is_none() {
Some(_) => continue, window_start = Some(i);
}, }
paren_depth += 1;
}
')' => {
if window_start.is_none() {
window_start = Some(i);
}
// paren_depth is 0 here — stray `)` is an ident char and
// parse_field will surface the error later.
}
x if x.is_ascii() => {
if window_start.is_none() {
window_start = Some(i);
}
}
char => return Err(PathParseError::NonAsciiChar { position: i, char }), char => return Err(PathParseError::NonAsciiChar { position: i, char }),
} }

View File

@@ -122,6 +122,14 @@ mod tests {
GroupSegment::Literal(s.into()) GroupSegment::Literal(s.into())
} }
#[test]
fn regex() {
assert_eq!(
parse("{$.split(/)[-1].regex((.*).pub \\((.*)\\).pdf)[0]}").unwrap(),
vec![(0, path("$.split(/)[-1].regex((.*).pub \\((.*)\\).pdf)[0]"))]
);
}
#[test] #[test]
fn single_path() { fn single_path() {
assert_eq!(parse("{$.foo}").unwrap(), vec![(0, path("$.foo"))]); assert_eq!(parse("{$.foo}").unwrap(), vec![(0, path("$.foo"))]);

View File

@@ -1,5 +1,7 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath}; use pile_config::{
ConfigToml, DatasetConfig, Label, Source, objectpath::ObjectPath, pattern::GroupPattern,
};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{ use pile_value::{
extract::traits::ExtractState, extract::traits::ExtractState,
@@ -66,15 +68,109 @@ impl Dataset {
/// An opened dataset: config, working directory, and all opened sources. /// An opened dataset: config, working directory, and all opened sources.
pub struct Datasets { pub struct Datasets {
pub path_config: PathBuf, pub path_config: Option<PathBuf>,
pub path_parent: PathBuf, pub path_parent: PathBuf,
pub path_workdir: PathBuf, pub path_workdir: Option<PathBuf>,
pub config: ConfigToml, pub config: ConfigToml,
pub sources: HashMap<Label, Dataset>, pub sources: HashMap<Label, Dataset>,
} }
impl Datasets { impl Datasets {
#[expect(clippy::unwrap_used)]
// Fixed label naming the single synthetic source created by `virt()`.
// The literal is a valid label, so the `unwrap` cannot fail.
pub fn virt_source() -> Label {
    Label::new("virtual-source").unwrap()
}
#[expect(clippy::unwrap_used)]
/// Builds an in-memory ("virtual") dataset rooted at `parent` without reading
/// any config file: a synthetic `ConfigToml` with a single enabled filesystem
/// source pointing at `parent`, an empty schema, and no fts.
///
/// `path_config` and `path_workdir` are `None`, so workdir-dependent features
/// (fts refresh/lookup) are skipped by their callers.
pub async fn virt(parent: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
    let path_parent = parent.into();
    // Synthetic config: exactly one filesystem source covering `path_parent`.
    // Label literals are valid, so the `unwrap`s cannot fail.
    let config = ConfigToml {
        dataset: DatasetConfig {
            name: Label::new("virtual-dataset").unwrap(),
            working_dir: None,
            source: [(
                Self::virt_source(),
                Source::Filesystem {
                    enabled: true,
                    path: path_parent.clone(),
                    pattern: GroupPattern::default(),
                },
            )]
            .into_iter()
            .collect(),
        },
        schema: HashMap::new(),
        fts: None,
    };
    // Open each configured source, mirroring `open()`: disabled sources are
    // skipped and S3 open failures are logged rather than aborting.
    // NOTE(review): only the Filesystem arm is reachable here — the synthetic
    // config above contains a single filesystem source. The S3 arm keeps the
    // match exhaustive and in sync with `open()`; confirm that is intentional.
    let mut sources = HashMap::new();
    for (label, source) in &config.dataset.source {
        match source {
            Source::Filesystem {
                enabled,
                path,
                pattern,
            } => {
                if !enabled {
                    continue;
                }
                sources.insert(
                    label.clone(),
                    Dataset::Dir(
                        DirDataSource::new(label, path_parent.join(path), pattern.clone())
                            .await?,
                    ),
                );
            }
            Source::S3 {
                enabled,
                bucket,
                prefix,
                endpoint,
                region,
                credentials,
                pattern,
            } => {
                if !enabled {
                    continue;
                }
                // Best-effort: an unreachable S3 source is logged and skipped.
                match S3DataSource::new(
                    label,
                    bucket.clone(),
                    prefix.clone(),
                    endpoint.clone(),
                    region.clone(),
                    credentials,
                    pattern.clone(),
                )
                .await
                {
                    Ok(ds) => {
                        sources.insert(label.clone(), Dataset::S3(ds));
                    }
                    Err(err) => {
                        warn!("Could not open S3 source {label}: {err}");
                    }
                }
            }
        }
    }
    return Ok(Self {
        path_config: None,
        path_workdir: None,
        path_parent,
        config,
        sources,
    });
}
pub async fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> { pub async fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
let path_config = config.into(); let path_config = config.into();
let path_parent = path_config let path_parent = path_config
@@ -168,9 +264,9 @@ impl Datasets {
} }
return Ok(Self { return Ok(Self {
path_config, path_config: Some(path_config),
path_workdir: Some(path_workdir),
path_parent, path_parent,
path_workdir,
config, config,
sources, sources,
}); });
@@ -216,8 +312,16 @@ impl Datasets {
_threads: usize, _threads: usize,
flag: Option<CancelFlag>, flag: Option<CancelFlag>,
) -> Result<(), CancelableTaskError<DatasetError>> { ) -> Result<(), CancelableTaskError<DatasetError>> {
let fts_tmp_dir = self.path_workdir.join(".tmp-fts"); let workdir = match self.path_workdir.as_ref() {
let fts_dir = self.path_workdir.join("fts"); Some(x) => x,
None => {
warn!("Skipping fts_refresh, no workdir");
return Ok(());
}
};
let fts_tmp_dir = workdir.join(".tmp-fts");
let fts_dir = workdir.join("fts");
if fts_tmp_dir.is_dir() { if fts_tmp_dir.is_dir() {
warn!("Removing temporary index in {}", fts_dir.display()); warn!("Removing temporary index in {}", fts_dir.display());
@@ -315,7 +419,15 @@ impl Datasets {
query: &str, query: &str,
top_n: usize, top_n: usize,
) -> Result<Vec<FtsLookupResult>, DatasetError> { ) -> Result<Vec<FtsLookupResult>, DatasetError> {
let fts_dir = self.path_workdir.join("fts"); let workdir = match self.path_workdir.as_ref() {
Some(x) => x,
None => {
warn!("Skipping fts_lookup, no workdir");
return Ok(Vec::new());
}
};
let fts_dir = workdir.join("fts");
if !fts_dir.exists() { if !fts_dir.exists() {
return Err(DatasetError::NoFtsIndex); return Err(DatasetError::NoFtsIndex);
@@ -335,7 +447,12 @@ impl Datasets {
/// Time at which fts was created /// Time at which fts was created
pub fn ts_fts(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> { pub fn ts_fts(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let fts_dir = self.path_workdir.join("fts"); let workdir = match self.path_workdir.as_ref() {
Some(x) => x,
None => return Ok(None),
};
let fts_dir = workdir.join("fts");
if !fts_dir.exists() { if !fts_dir.exists() {
return Ok(None); return Ok(None);

View File

@@ -112,8 +112,8 @@ impl DbFtsIndex {
// Try paths in order, using the first value we find // Try paths in order, using the first value we find
for path in field.path.as_slice() { for path in field.path.as_slice() {
let val = match extractor.query(state, path).await? { let val = match extractor.query(state, path).await? {
Some(PileValue::Null) | None => continue,
Some(x) => x, Some(x) => x,
None => continue,
}; };
let val = val_to_string(state, &val, path, field_name).await?; let val = val_to_string(state, &val, path, field_name).await?;

View File

@@ -18,6 +18,7 @@ tracing = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }
toml = { workspace = true } toml = { workspace = true }
smartstring = { workspace = true } smartstring = { workspace = true }
regex = { workspace = true }
blake3 = { workspace = true } blake3 = { workspace = true }
epub = { workspace = true } epub = { workspace = true }
kamadak-exif = { workspace = true } kamadak-exif = { workspace = true }

View File

@@ -54,4 +54,33 @@ impl ObjectExtractor for EpubExtractor {
Label::new("meta").unwrap(), Label::new("meta").unwrap(),
]) ])
} }
/// Serializes every field to JSON, replacing the potentially huge `text`
/// field with a short `"<String (N bytes)>"` placeholder so dumps stay small.
///
/// Fields that yield `None` are omitted from the resulting object.
async fn to_json(&self, state: &ExtractState) -> Result<serde_json::Value, std::io::Error> {
    let keys = self.fields().await?;
    let mut map = serde_json::Map::new();
    for k in &keys {
        let Some(v) = self.field(state, k, None).await? else {
            continue;
        };
        if k.as_str() == "text" {
            // Don't embed the full document text; report only its size.
            let len = match &v {
                PileValue::String(x) => x.len(),
                _ => 0,
            };
            // Fixed: the placeholder previously read "<String (N bytes)" with
            // an unbalanced '<'; the closing '>' was missing.
            map.insert(
                k.to_string(),
                serde_json::Value::String(format!("<String ({len} bytes)>")),
            );
            continue;
        }
        // `Box::pin` keeps the recursive async call object-safe/sized.
        map.insert(k.to_string(), Box::pin(v.to_json(state)).await?);
    }
    Ok(serde_json::Value::Object(map))
}
} }

View File

@@ -5,7 +5,7 @@ use crate::{
use pile_config::Label; use pile_config::Label;
use std::{ use std::{
collections::HashMap, collections::HashMap,
path::Component, path::{Component, PathBuf},
sync::{Arc, OnceLock}, sync::{Arc, OnceLock},
}; };
@@ -27,22 +27,20 @@ impl FsExtractor {
return Ok(x); return Ok(x);
} }
let Item::File { path, .. } = &self.item else { let path = PathBuf::from(self.item.key().as_str());
return Ok(self.output.get_or_init(HashMap::new));
};
let mut root = false; let mut root = false;
let components = path let components = path
.components() .components()
.map(|x| match x { .filter_map(|x| match x {
Component::CurDir => None, Component::CurDir => None,
Component::Normal(x) => x.to_str().map(|x| x.to_owned()), Component::Normal(x) => Some(x.to_str().map(|x| x.to_owned())),
Component::ParentDir => Some("..".to_owned()), Component::ParentDir => Some(Some("..".to_owned())),
Component::RootDir => { Component::RootDir => {
root = true; root = true;
None Some(None)
} }
Component::Prefix(x) => x.as_os_str().to_str().map(|x| x.to_owned()), Component::Prefix(x) => Some(x.as_os_str().to_str().map(|x| x.to_owned())),
}) })
.collect::<Option<Vec<_>>>(); .collect::<Option<Vec<_>>>();
@@ -69,6 +67,7 @@ impl FsExtractor {
( (
Label::new("segments").unwrap(), Label::new("segments").unwrap(),
components components
.clone()
.map(|x| { .map(|x| {
PileValue::Array(Arc::new( PileValue::Array(Arc::new(
x.iter() x.iter()
@@ -78,6 +77,12 @@ impl FsExtractor {
}) })
.unwrap_or(PileValue::Null), .unwrap_or(PileValue::Null),
), ),
(
Label::new("name").unwrap(),
components
.and_then(|x| x.last().map(|x| PileValue::String(Arc::new(x.into()))))
.unwrap_or(PileValue::Null),
),
]); ]);
return Ok(self.output.get_or_init(|| output)); return Ok(self.output.get_or_init(|| output));

View File

@@ -28,6 +28,9 @@ pub use toml::*;
mod group; mod group;
pub use group::*; pub use group::*;
mod text;
pub use text::*;
use crate::{ use crate::{
extract::{ extract::{
misc::MapExtractor, misc::MapExtractor,
@@ -77,6 +80,10 @@ impl ItemExtractor {
Label::new("toml").unwrap(), Label::new("toml").unwrap(),
PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))), PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))),
), ),
(
Label::new("text").unwrap(),
PileValue::ObjectExtractor(Arc::new(TextExtractor::new(item))),
),
( (
Label::new("groups").unwrap(), Label::new("groups").unwrap(),
PileValue::ObjectExtractor(Arc::new(GroupExtractor::new(item))), PileValue::ObjectExtractor(Arc::new(GroupExtractor::new(item))),
@@ -110,6 +117,7 @@ impl ObjectExtractor for ItemExtractor {
Label::new("pdf").unwrap(), Label::new("pdf").unwrap(),
Label::new("json").unwrap(), Label::new("json").unwrap(),
Label::new("toml").unwrap(), Label::new("toml").unwrap(),
Label::new("text").unwrap(),
Label::new("groups").unwrap(), Label::new("groups").unwrap(),
]); ]);
} }

View File

@@ -68,4 +68,33 @@ impl ObjectExtractor for PdfExtractor {
Label::new("pages").unwrap(), Label::new("pages").unwrap(),
]) ])
} }
/// Serializes every field to JSON, replacing the potentially huge `text`
/// field with a short `"<String (N bytes)>"` placeholder so dumps stay small.
///
/// Fields that yield `None` are omitted from the resulting object.
async fn to_json(&self, state: &ExtractState) -> Result<serde_json::Value, std::io::Error> {
    let keys = self.fields().await?;
    let mut map = serde_json::Map::new();
    for k in &keys {
        let Some(v) = self.field(state, k, None).await? else {
            continue;
        };
        if k.as_str() == "text" {
            // Don't embed the full document text; report only its size.
            let len = match &v {
                PileValue::String(x) => x.len(),
                _ => 0,
            };
            // Fixed: the placeholder previously read "<String (N bytes)" with
            // an unbalanced '<'; the closing '>' was missing.
            map.insert(
                k.to_string(),
                serde_json::Value::String(format!("<String ({len} bytes)>")),
            );
            continue;
        }
        // `Box::pin` keeps the recursive async call object-safe/sized.
        map.insert(k.to_string(), Box::pin(v.to_json(state)).await?);
    }
    Ok(serde_json::Value::Object(map))
}
} }

View File

@@ -0,0 +1,67 @@
use pile_config::Label;
use std::sync::{Arc, OnceLock};
use crate::{
extract::traits::{ExtractState, ObjectExtractor},
value::{AsyncReader, Item, PileValue},
};
/// Lazily reads an item's raw bytes and exposes them as a single `text` field.
pub struct TextExtractor {
    // The item whose contents are read on first access.
    item: Item,
    // Cached extraction result; computed at most once per extractor instance.
    output: OnceLock<PileValue>,
}
impl TextExtractor {
    /// Creates an extractor for `item`; no bytes are read until `field` is called.
    pub fn new(item: &Item) -> Self {
        let item = item.clone();
        let output = OnceLock::new();
        Self { item, output }
    }
}
#[async_trait::async_trait]
impl ObjectExtractor for TextExtractor {
    /// Returns the item's full contents as a string for the `text` field.
    ///
    /// Yields `None` when `args` are given, when `name` is not `text`, or when
    /// the item's mime top-level type is neither `text` nor `application`
    /// (unless `state.ignore_mime` is set). Returns `PileValue::Null` when the
    /// bytes are not valid UTF-8. The read result is cached in `self.output`,
    /// so the item is read at most once per extractor instance.
    async fn field(
        &self,
        state: &ExtractState,
        name: &Label,
        args: Option<&str>,
    ) -> Result<Option<PileValue>, std::io::Error> {
        if args.is_some() {
            return Ok(None);
        }
        // Mime gate: skip binary-looking items unless explicitly overridden.
        if !state.ignore_mime
            && (self.item.mime().type_() != mime::TEXT
                && self.item.mime().type_() != mime::APPLICATION)
        {
            return Ok(None);
        }
        if name.as_str() != "text" {
            return Ok(None);
        }
        {
            // Fast path: a previous call already read and decoded the item.
            if let Some(x) = self.output.get() {
                return Ok(Some(x.clone()));
            }
            let mut reader = self.item.read().await?;
            let bytes = reader.read_to_end().await?;
            // Non-UTF-8 content is cached as Null rather than an error.
            let string = String::from_utf8(bytes).ok();
            let value = match string {
                Some(x) => PileValue::String(Arc::new(x.into())),
                None => PileValue::Null,
            };
            // NOTE(review): concurrent first calls may both read the item; only
            // one result wins the cache — presumably acceptable, confirm.
            return Ok(Some(self.output.get_or_init(|| value).clone()));
        }
    }

    #[expect(clippy::unwrap_used)]
    /// The only field this extractor offers is `text`; the literal is a valid
    /// label, so the `unwrap` cannot fail.
    async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
        Ok(vec![Label::new("text").unwrap()])
    }
}

View File

@@ -1,4 +1,5 @@
pub mod item; pub mod item;
pub mod misc; pub mod misc;
pub mod regex;
pub mod string; pub mod string;
pub mod traits; pub mod traits;

View File

@@ -0,0 +1,104 @@
use std::sync::Arc;
use pile_config::Label;
use regex::Regex;
use smartstring::{LazyCompact, SmartString};
use crate::{
extract::traits::{ExtractState, ListExtractor, ObjectExtractor},
value::PileValue,
};
/// One regex run: the compiled pattern plus everything it captured.
struct RegexData {
    regex: Arc<Regex>,
    /// Captured substrings indexed by group index (0 = whole match).
    captures: Vec<Option<Arc<SmartString<LazyCompact>>>>,
}

impl RegexData {
    /// Runs `regex` once against `input`; `None` when nothing matches.
    /// Non-participating groups are stored as `None` slots.
    fn new(regex: Arc<Regex>, input: &str) -> Option<Self> {
        let caps = regex.captures(input)?;
        let mut captures = Vec::with_capacity(caps.len());
        for group in caps.iter() {
            captures.push(group.map(|m| Arc::new(m.as_str().into())));
        }
        Some(Self { regex, captures })
    }
}
/// Exposes named capture groups as object fields.
pub struct RegexExtractor(Arc<RegexData>);

impl RegexExtractor {
    /// Run `regex` against `input`. Returns `None` if there is no match.
    pub fn new(regex: Arc<Regex>, input: &str) -> Option<Self> {
        let data = RegexData::new(regex, input)?;
        Some(Self(Arc::new(data)))
    }
}
#[async_trait::async_trait]
impl ObjectExtractor for RegexExtractor {
async fn field(
&self,
_state: &ExtractState,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
let Some(idx) = self
.0
.regex
.capture_names()
.position(|n| n == Some(name.as_str()))
else {
return Ok(None);
};
Ok(Some(
match self.0.captures.get(idx).and_then(|v| v.as_ref()) {
Some(s) => PileValue::String(s.clone()),
None => PileValue::Null,
},
))
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
#[expect(clippy::unwrap_used)]
Ok(self
.0
.regex
.capture_names()
.flatten()
.map(|n| Label::new(n).unwrap())
.collect())
}
fn as_list(&self) -> Option<Arc<dyn ListExtractor>> {
Some(Arc::new(RegexExtractor(self.0.clone())))
}
}
#[async_trait::async_trait]
impl ListExtractor for RegexExtractor {
    /// Returns capture group `idx + 1` — group 0 (the whole match) is not part
    /// of the list view. `Null` marks a group that did not participate;
    /// `None` means the index is past the last group.
    async fn get(
        &self,
        _state: &ExtractState,
        idx: usize,
    ) -> Result<Option<PileValue>, std::io::Error> {
        let value = self.0.captures.get(idx + 1).map(|slot| match slot {
            Some(s) => PileValue::String(s.clone()),
            None => PileValue::Null,
        });
        Ok(value)
    }

    /// Number of capture groups, excluding group 0.
    async fn len(&self, _state: &ExtractState) -> Result<usize, std::io::Error> {
        Ok(self.0.captures.len().saturating_sub(1))
    }
}

View File

@@ -1,9 +1,13 @@
use pile_config::Label; use pile_config::Label;
use regex::Regex;
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
use std::sync::Arc; use std::sync::Arc;
use crate::{ use crate::{
extract::traits::{ExtractState, ObjectExtractor}, extract::{
regex::RegexExtractor,
traits::{ExtractState, ObjectExtractor},
},
value::PileValue, value::PileValue,
}; };
@@ -67,6 +71,18 @@ impl ObjectExtractor for StringExtractor {
.collect(), .collect(),
))), ))),
("regex", Some(pattern)) => {
let Ok(re) = Regex::new(pattern) else {
return Ok(None);
};
Some(
match RegexExtractor::new(Arc::new(re), self.item.as_str()) {
Some(ext) => PileValue::ObjectExtractor(Arc::new(ext)),
None => PileValue::Null,
},
)
}
_ => None, _ => None,
}) })
} }
@@ -78,6 +94,9 @@ impl ObjectExtractor for StringExtractor {
Label::new("upper").unwrap(), Label::new("upper").unwrap(),
Label::new("lower").unwrap(), Label::new("lower").unwrap(),
Label::new("nonempty").unwrap(), Label::new("nonempty").unwrap(),
Label::new("trimprefix").unwrap(),
Label::new("trimsuffix").unwrap(),
Label::new("split").unwrap(),
]); ]);
} }
} }

View File

@@ -35,6 +35,11 @@ pub trait ObjectExtractor: Send + Sync {
/// and [None] for all others. /// and [None] for all others.
async fn fields(&self) -> Result<Vec<pile_config::Label>, std::io::Error>; async fn fields(&self) -> Result<Vec<pile_config::Label>, std::io::Error>;
/// Return a list view of this extractor, if supported.
fn as_list(&self) -> Option<std::sync::Arc<dyn ListExtractor>> {
None
}
/// Convert this to a JSON value. /// Convert this to a JSON value.
async fn to_json(&self, state: &ExtractState) -> Result<serde_json::Value, std::io::Error> { async fn to_json(&self, state: &ExtractState) -> Result<serde_json::Value, std::io::Error> {
let keys = self.fields().await?; let keys = self.fields().await?;

View File

@@ -40,11 +40,22 @@ impl Item {
Self::File { path, .. } => ItemReader::File(File::open(path)?), Self::File { path, .. } => ItemReader::File(File::open(path)?),
Self::S3 { source, key, .. } => { Self::S3 { source, key, .. } => {
let full_key: SmartString<LazyCompact> = match &source.prefix {
None => key.clone(),
Some(p) => {
if p.ends_with('/') {
format!("{p}{key}").into()
} else {
format!("{p}/{key}").into()
}
}
};
let head = source let head = source
.client .client
.head_object() .head_object()
.bucket(source.bucket.as_str()) .bucket(source.bucket.as_str())
.key(key.as_str()) .key(full_key.as_str())
.send() .send()
.await .await
.map_err(std::io::Error::other)?; .map_err(std::io::Error::other)?;
@@ -54,7 +65,7 @@ impl Item {
ItemReader::S3(S3Reader { ItemReader::S3(S3Reader {
client: source.client.clone(), client: source.client.clone(),
bucket: source.bucket.clone(), bucket: source.bucket.clone(),
key: key.to_owned(), key: full_key,
cursor: 0, cursor: 0,
size, size,
}) })

View File

@@ -86,7 +86,9 @@ impl PileValue {
Self::String(_) => Arc::new(VecExtractor::default()), Self::String(_) => Arc::new(VecExtractor::default()),
Self::Blob { .. } => Arc::new(VecExtractor::default()), Self::Blob { .. } => Arc::new(VecExtractor::default()),
Self::ListExtractor(e) => e.clone(), Self::ListExtractor(e) => e.clone(),
Self::ObjectExtractor(_) => Arc::new(VecExtractor::default()), Self::ObjectExtractor(e) => e
.as_list()
.unwrap_or_else(|| Arc::new(VecExtractor::default())),
Self::Item(_) => Arc::new(VecExtractor::default()), Self::Item(_) => Arc::new(VecExtractor::default()),
} }
} }
@@ -138,6 +140,40 @@ impl PileValue {
out = e.get(state, idx).await?; out = e.get(state, idx).await?;
} }
PathSegment::Range {
start,
end,
inclusive,
} => {
let e = match out.map(|x| x.list_extractor()) {
Some(e) => e,
None => {
out = None;
continue;
}
};
let len = e.len(state).await? as i64;
let start_idx = if *start >= 0 { *start } else { len + start };
let end_idx = if *end >= 0 { *end } else { len + end };
let end_idx = if *inclusive { end_idx + 1 } else { end_idx };
let start_idx = start_idx.max(0) as usize;
let end_idx = (end_idx.max(0) as usize).min(len as usize);
let mut items = Vec::new();
for i in start_idx..end_idx {
match e.get(state, i).await? {
Some(v) => items.push(v),
None => break,
}
}
// TODO: lazy view?
out = Some(PileValue::Array(Arc::new(items)));
}
} }
} }

View File

@@ -0,0 +1,113 @@
use anyhow::{Context, Result};
use clap::Args;
use pile_config::{Label, objectpath::ObjectPath};
use pile_dataset::Datasets;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{extract::traits::ExtractState, value::PileValue};
use std::path::PathBuf;
use crate::{CliCmd, GlobalContext};
// CLI arguments for `pile item`: fetch one item from a configured source and
// print it (or one field of it) as pretty JSON.
#[derive(Debug, Args)]
pub struct ItemCommand {
    /// Source name (as defined in pile.toml)
    source: String,
    /// Item key within the source
    key: String,
    /// If present, extract a specific field
    #[arg(long, short = 'p')]
    path: Option<String>,
    /// If present, print the schema fields instead of item data
    #[arg(long)]
    fields: bool,
    // Schema field names to omit in --fields mode. (Plain `//` comment on
    // purpose: a `///` doc comment here would become clap help text and
    // change the CLI's visible behavior.)
    #[arg(long, short = 'x')]
    exclude: Vec<String>,
    /// Path to dataset config
    #[arg(long, short = 'c', default_value = "./pile.toml")]
    config: PathBuf,
}
impl CliCmd for ItemCommand {
    #[expect(clippy::print_stdout)]
    #[expect(clippy::unwrap_used)]
    // Runs `pile item`:
    //   --fields : evaluate every schema field against the item (first path in
    //              each field spec that yields a non-Null value wins) and print
    //              a {field: value} JSON object, honoring --exclude.
    //   -p PATH  : extract just that path from the item and print it.
    //   default  : print the item's full JSON representation.
    // Returns exit code 0 on success; errors surface via `?` with context.
    async fn run(
        self,
        _ctx: GlobalContext,
        _flag: CancelFlag,
    ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
        let source = Label::new(&self.source)
            .ok_or_else(|| anyhow::anyhow!("invalid source name {:?}", self.source))?;
        let ds = Datasets::open(&self.config)
            .await
            .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
        let state = ExtractState { ignore_mime: false };
        let item = ds.get(&source, &self.key).await.ok_or_else(|| {
            anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
        })?;
        let pv = PileValue::Item(item);
        if self.fields {
            let mut map = serde_json::Map::new();
            for (name, spec) in &ds.config.schema {
                if self.exclude.contains(&name.to_string()) {
                    continue;
                }
                // Try the field's paths in order; first non-Null hit wins.
                let mut value = None;
                for path in &spec.path {
                    let v = pv
                        .query(&state, path)
                        .await
                        .with_context(|| format!("while extracting field {name}"))?;
                    if let Some(v) = v
                        && !matches!(v, PileValue::Null)
                    {
                        let j = v
                            .to_json(&state)
                            .await
                            .with_context(|| format!("while extracting field {name}"))?;
                        value = Some(j);
                        break;
                    }
                }
                // Fields with no usable path still appear, as JSON null.
                map.insert(name.to_string(), value.unwrap_or(serde_json::Value::Null));
            }
            // `unwrap`: serializing a serde_json::Value cannot fail.
            let json = serde_json::to_string_pretty(&serde_json::Value::Object(map)).unwrap();
            println!("{json}");
            return Ok(0);
        }
        let json = if let Some(path_str) = self.path {
            let path: ObjectPath = path_str
                .parse()
                .with_context(|| format!("invalid path {path_str:?}"))?;
            let v = pv
                .query(&state, &path)
                .await
                .with_context(|| format!("while extracting {}", self.key))?
                .ok_or_else(|| {
                    anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
                })?;
            v.to_json(&state)
                .await
                .with_context(|| format!("while extracting {}", self.key))?
        } else {
            pv.to_json(&state)
                .await
                .with_context(|| format!("while extracting {}", self.key))?
        };
        // `unwrap`: serializing a serde_json::Value cannot fail.
        let json = serde_json::to_string_pretty(&json).unwrap();
        println!("{json}");
        return Ok(0);
    }
}

View File

@@ -8,6 +8,7 @@ mod check;
mod fields; mod fields;
mod index; mod index;
mod init; mod init;
mod item;
mod list; mod list;
mod lookup; mod lookup;
mod probe; mod probe;
@@ -59,12 +60,18 @@ pub enum SubCommand {
cmd: fields::FieldsCommand, cmd: fields::FieldsCommand,
}, },
/// Print all metadata from an item /// Print all metadata from a file
Probe { Probe {
#[command(flatten)] #[command(flatten)]
cmd: probe::ProbeCommand, cmd: probe::ProbeCommand,
}, },
/// Print all metadata from an item
Item {
#[command(flatten)]
cmd: item::ItemCommand,
},
/// Expose a dataset via an http api /// Expose a dataset via an http api
Serve { Serve {
#[command(flatten)] #[command(flatten)]
@@ -88,6 +95,7 @@ impl CliCmdDispatch for SubCommand {
Self::Lookup { cmd } => cmd.start(ctx), Self::Lookup { cmd } => cmd.start(ctx),
Self::Fields { cmd } => cmd.start(ctx), Self::Fields { cmd } => cmd.start(ctx),
Self::Probe { cmd } => cmd.start(ctx), Self::Probe { cmd } => cmd.start(ctx),
Self::Item { cmd } => cmd.start(ctx),
Self::Serve { cmd } => cmd.start(ctx), Self::Serve { cmd } => cmd.start(ctx),
Self::Upload { cmd } => cmd.start(ctx), Self::Upload { cmd } => cmd.start(ctx),

View File

@@ -1,6 +1,6 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use clap::Args; use clap::Args;
use pile_config::{Label, objectpath::ObjectPath}; use pile_config::objectpath::ObjectPath;
use pile_dataset::Datasets; use pile_dataset::Datasets;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{extract::traits::ExtractState, value::PileValue}; use pile_value::{extract::traits::ExtractState, value::PileValue};
@@ -10,19 +10,12 @@ use crate::{CliCmd, GlobalContext};
#[derive(Debug, Args)] #[derive(Debug, Args)]
pub struct ProbeCommand { pub struct ProbeCommand {
/// Source name (as defined in pile.toml) /// The file to probe
source: String, file: PathBuf,
/// Item key within the source
key: String,
/// If present, extract a specific field /// If present, extract a specific field
#[arg(long, short = 'p')] #[arg(long, short = 'p')]
path: Option<String>, path: Option<String>,
/// Path to dataset config
#[arg(long, short = 'c', default_value = "./pile.toml")]
config: PathBuf,
} }
impl CliCmd for ProbeCommand { impl CliCmd for ProbeCommand {
@@ -33,35 +26,37 @@ impl CliCmd for ProbeCommand {
_ctx: GlobalContext, _ctx: GlobalContext,
_flag: CancelFlag, _flag: CancelFlag,
) -> Result<i32, CancelableTaskError<anyhow::Error>> { ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
let source = Label::new(&self.source) let ds = Datasets::virt(".")
.ok_or_else(|| anyhow::anyhow!("invalid source name {:?}", self.source))?;
let ds = Datasets::open(&self.config)
.await .await
.with_context(|| format!("while opening dataset for {}", self.config.display()))?; .with_context(|| "while opening virtual dataset".to_owned())?;
let state = ExtractState { ignore_mime: false }; let state = ExtractState { ignore_mime: false };
let key = self.file.to_str().context("path is not utf-8")?;
let json = if let Some(path_str) = self.path { let json = if let Some(path_str) = self.path {
let path: ObjectPath = path_str let path: ObjectPath = path_str
.parse() .parse()
.with_context(|| format!("invalid path {path_str:?}"))?; .with_context(|| format!("invalid path {path_str:?}"))?;
ds.get_field(&state, &source, &self.key, &path) ds.get_field(
&state,
&Datasets::virt_source(),
self.file.to_str().context("path is not utf-8")?,
&path,
)
.await .await
.with_context(|| format!("while extracting {}", self.key))? .with_context(|| format!("while extracting {key}"))?
.ok_or_else(|| { .ok_or_else(|| anyhow::anyhow!("{key:?} not found"))?
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
})?
} else { } else {
let item = ds.get(&source, &self.key).await.ok_or_else(|| { let item = ds
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) .get(&Datasets::virt_source(), key)
})?; .await
.ok_or_else(|| anyhow::anyhow!("{key:?} not found"))?;
let item = PileValue::Item(item); let item = PileValue::Item(item);
item.to_json(&state) item.to_json(&state)
.await .await
.with_context(|| format!("while extracting {}", self.key))? .with_context(|| format!("while extracting {key}"))?
}; };
let json = serde_json::to_string_pretty(&json).unwrap(); let json = serde_json::to_string_pretty(&json).unwrap();