Compare commits
9 Commits
e7afca3010
...
8a8e0a2770
| Author | SHA1 | Date | |
|---|---|---|---|
| 8a8e0a2770 | |||
| c2f4b12e35 | |||
| 302d2acef3 | |||
| 7caf2553bc | |||
| 44466f16cf | |||
| 48262bab48 | |||
| b6cb5870b4 | |||
| ed169b3ab4 | |||
| 2f2eb323d5 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2574,6 +2574,7 @@ dependencies = [
|
||||
"pdfium-render",
|
||||
"pile-config",
|
||||
"pile-flac",
|
||||
"regex",
|
||||
"serde_json",
|
||||
"smartstring",
|
||||
"tokio",
|
||||
|
||||
@@ -127,6 +127,7 @@ mime = "0.3.17"
|
||||
mime_guess = "2.0.5"
|
||||
paste = "1.0.15"
|
||||
smartstring = "1.0.1"
|
||||
regex = "1"
|
||||
chrono = "0.4.43"
|
||||
parking_lot = "0.12.5"
|
||||
rayon = "1.11.0"
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{fmt, str::FromStr};
|
||||
|
||||
use serde::{
|
||||
Deserialize, Deserializer,
|
||||
Deserialize, Deserializer, Serialize, Serializer,
|
||||
de::{self, Visitor},
|
||||
};
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
@@ -49,6 +49,13 @@ pub enum PathSegment {
|
||||
|
||||
/// Go to an element of the current list
|
||||
Index(i64),
|
||||
|
||||
/// Go to a slice of the current list
|
||||
Range {
|
||||
start: i64,
|
||||
end: i64,
|
||||
inclusive: bool,
|
||||
},
|
||||
}
|
||||
|
||||
/// A path to aPathSegment::Field inside a nested object,
|
||||
@@ -63,6 +70,39 @@ pub struct ObjectPath {
|
||||
pub segments: Vec<PathSegment>,
|
||||
}
|
||||
|
||||
impl fmt::Display for ObjectPath {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
for seg in &self.segments {
|
||||
match seg {
|
||||
PathSegment::Root => write!(f, "$")?,
|
||||
PathSegment::Field { name, args: None } => write!(f, ".{name}")?,
|
||||
PathSegment::Field {
|
||||
name,
|
||||
args: Some(a),
|
||||
} => write!(f, ".{name}({a})")?,
|
||||
PathSegment::Index(i) => write!(f, "[{i}]")?,
|
||||
PathSegment::Range {
|
||||
start,
|
||||
end,
|
||||
inclusive: false,
|
||||
} => write!(f, "[{start}..{end}]")?,
|
||||
PathSegment::Range {
|
||||
start,
|
||||
end,
|
||||
inclusive: true,
|
||||
} => write!(f, "[{start}..={end}]")?,
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for ObjectPath {
|
||||
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
serializer.serialize_str(&self.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for ObjectPath {
|
||||
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
|
||||
struct PathVisitor;
|
||||
|
||||
@@ -87,6 +87,15 @@ enum State {
|
||||
/// We are indexing an array, waiting for a number
|
||||
Index,
|
||||
|
||||
/// We parsed the start index, waiting for `]` or the first `.` of `..`
|
||||
IndexAfterStart(i64),
|
||||
|
||||
/// We saw one `.` after the start index, waiting for the second `.`
|
||||
IndexRangeDot1(i64),
|
||||
|
||||
/// We saw `..`, waiting for the end index (optionally prefixed with `=`)
|
||||
IndexRangeDot2(i64),
|
||||
|
||||
/// We are indexing an array, waiting for a close-bracket
|
||||
IndexClose,
|
||||
}
|
||||
@@ -164,8 +173,7 @@ impl Parser {
|
||||
}
|
||||
})?;
|
||||
|
||||
self.segments.push(PathSegment::Index(idx));
|
||||
self.state = State::IndexClose;
|
||||
self.state = State::IndexAfterStart(idx);
|
||||
}
|
||||
|
||||
(State::Index, (p, Token::Root))
|
||||
@@ -175,6 +183,49 @@ impl Parser {
|
||||
return Err(PathParseError::Syntax { position: *p });
|
||||
}
|
||||
|
||||
(State::IndexAfterStart(idx), (_, Token::SqbClose)) => {
|
||||
self.segments.push(PathSegment::Index(idx));
|
||||
self.state = State::Selected;
|
||||
}
|
||||
(State::IndexAfterStart(idx), (_, Token::Dot)) => {
|
||||
self.state = State::IndexRangeDot1(idx);
|
||||
}
|
||||
(State::IndexAfterStart(_), (p, _)) => {
|
||||
return Err(PathParseError::Syntax { position: *p });
|
||||
}
|
||||
|
||||
(State::IndexRangeDot1(idx), (_, Token::Dot)) => {
|
||||
self.state = State::IndexRangeDot2(idx);
|
||||
}
|
||||
(State::IndexRangeDot1(_), (p, _)) => {
|
||||
return Err(PathParseError::Syntax { position: *p });
|
||||
}
|
||||
|
||||
(State::IndexRangeDot2(start), (p, Token::Ident(ident))) => {
|
||||
let (end_str, inclusive) = if let Some(stripped) = ident.strip_prefix('=') {
|
||||
(stripped, true)
|
||||
} else {
|
||||
(*ident, false)
|
||||
};
|
||||
|
||||
let end: i64 = i64::from_str(end_str).map_err(|_err| {
|
||||
PathParseError::InvalidIndexString {
|
||||
position: *p,
|
||||
str: (*ident).into(),
|
||||
}
|
||||
})?;
|
||||
|
||||
self.segments.push(PathSegment::Range {
|
||||
start,
|
||||
end,
|
||||
inclusive,
|
||||
});
|
||||
self.state = State::IndexClose;
|
||||
}
|
||||
(State::IndexRangeDot2(_), (p, _)) => {
|
||||
return Err(PathParseError::Syntax { position: *p });
|
||||
}
|
||||
|
||||
(State::IndexClose, (_, Token::SqbClose)) => self.state = State::Selected,
|
||||
(State::IndexClose, (p, _)) => {
|
||||
return Err(PathParseError::Syntax { position: *p });
|
||||
@@ -187,6 +238,9 @@ impl Parser {
|
||||
State::Start => Err(PathParseError::Syntax { position: 0 }),
|
||||
State::Dot => Err(PathParseError::Syntax { position }),
|
||||
State::Index => Err(PathParseError::Syntax { position }),
|
||||
State::IndexAfterStart(_) => Err(PathParseError::Syntax { position }),
|
||||
State::IndexRangeDot1(_) => Err(PathParseError::Syntax { position }),
|
||||
State::IndexRangeDot2(_) => Err(PathParseError::Syntax { position }),
|
||||
State::IndexClose => Err(PathParseError::Syntax { position }),
|
||||
State::Selected => Ok(()),
|
||||
}?;
|
||||
@@ -387,4 +441,46 @@ mod tests {
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
// MARK: range
|
||||
|
||||
fn range(start: i64, end: i64, inclusive: bool) -> PathSegment {
|
||||
PathSegment::Range {
|
||||
start,
|
||||
end,
|
||||
inclusive,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exclusive_range() {
|
||||
parse_test(
|
||||
"$.a[0..5]",
|
||||
Ok(&[PathSegment::Root, field("a"), range(0, 5, false)]),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn inclusive_range() {
|
||||
parse_test(
|
||||
"$.a[1..=2]",
|
||||
Ok(&[PathSegment::Root, field("a"), range(1, 2, true)]),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn range_with_negative_end() {
|
||||
parse_test(
|
||||
"$.a[0..-1]",
|
||||
Ok(&[PathSegment::Root, field("a"), range(0, -1, false)]),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn range_with_negative_start() {
|
||||
parse_test(
|
||||
"$.a[-3..-1]",
|
||||
Ok(&[PathSegment::Root, field("a"), range(-3, -1, false)]),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,52 @@ impl Tokenizer {
|
||||
let mut tokens = Vec::new();
|
||||
|
||||
let mut window_start = None;
|
||||
// Paren depth: while > 0, `.` / `[` / `]` / `$` are part of the ident.
|
||||
let mut paren_depth: usize = 0;
|
||||
// When true, the current char is escaped by a preceding `\` and is
|
||||
// treated as a plain ident character with no special meaning.
|
||||
let mut skip_next = false;
|
||||
|
||||
for (i, c) in source.char_indices() {
|
||||
if skip_next {
|
||||
skip_next = false;
|
||||
// Escaped char: just extend the ident window (already opened by `\`).
|
||||
continue;
|
||||
}
|
||||
|
||||
if c == '\\' {
|
||||
if window_start.is_none() {
|
||||
window_start = Some(i);
|
||||
}
|
||||
skip_next = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if paren_depth > 0 {
|
||||
// Inside parens: only track depth changes, everything else is ident.
|
||||
match c {
|
||||
'(' => {
|
||||
if window_start.is_none() {
|
||||
window_start = Some(i);
|
||||
}
|
||||
paren_depth += 1;
|
||||
}
|
||||
')' => {
|
||||
if window_start.is_none() {
|
||||
window_start = Some(i);
|
||||
}
|
||||
paren_depth -= 1;
|
||||
}
|
||||
x if x.is_ascii() => {
|
||||
if window_start.is_none() {
|
||||
window_start = Some(i);
|
||||
}
|
||||
}
|
||||
char => return Err(PathParseError::NonAsciiChar { position: i, char }),
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
match c {
|
||||
'$' => {
|
||||
if let Some(s) = window_start.take() {
|
||||
@@ -51,10 +96,26 @@ impl Tokenizer {
|
||||
tokens.push((i, Token::SqbClose));
|
||||
}
|
||||
|
||||
x if x.is_ascii() => match window_start {
|
||||
None => window_start = Some(i),
|
||||
Some(_) => continue,
|
||||
},
|
||||
'(' => {
|
||||
if window_start.is_none() {
|
||||
window_start = Some(i);
|
||||
}
|
||||
paren_depth += 1;
|
||||
}
|
||||
|
||||
')' => {
|
||||
if window_start.is_none() {
|
||||
window_start = Some(i);
|
||||
}
|
||||
// paren_depth is 0 here — stray `)` is an ident char and
|
||||
// parse_field will surface the error later.
|
||||
}
|
||||
|
||||
x if x.is_ascii() => {
|
||||
if window_start.is_none() {
|
||||
window_start = Some(i);
|
||||
}
|
||||
}
|
||||
|
||||
char => return Err(PathParseError::NonAsciiChar { position: i, char }),
|
||||
}
|
||||
|
||||
@@ -122,6 +122,14 @@ mod tests {
|
||||
GroupSegment::Literal(s.into())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn regex() {
|
||||
assert_eq!(
|
||||
parse("{$.split(/)[-1].regex((.*).pub \\((.*)\\).pdf)[0]}").unwrap(),
|
||||
vec![(0, path("$.split(/)[-1].regex((.*).pub \\((.*)\\).pdf)[0]"))]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn single_path() {
|
||||
assert_eq!(parse("{$.foo}").unwrap(), vec![(0, path("$.foo"))]);
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath};
|
||||
use pile_config::{
|
||||
ConfigToml, DatasetConfig, Label, Source, objectpath::ObjectPath, pattern::GroupPattern,
|
||||
};
|
||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||
use pile_value::{
|
||||
extract::traits::ExtractState,
|
||||
@@ -66,15 +68,109 @@ impl Dataset {
|
||||
|
||||
/// An opened dataset: config, working directory, and all opened sources.
|
||||
pub struct Datasets {
|
||||
pub path_config: PathBuf,
|
||||
pub path_config: Option<PathBuf>,
|
||||
pub path_parent: PathBuf,
|
||||
pub path_workdir: PathBuf,
|
||||
pub path_workdir: Option<PathBuf>,
|
||||
|
||||
pub config: ConfigToml,
|
||||
pub sources: HashMap<Label, Dataset>,
|
||||
}
|
||||
|
||||
impl Datasets {
|
||||
#[expect(clippy::unwrap_used)]
|
||||
pub fn virt_source() -> Label {
|
||||
Label::new("virtual-source").unwrap()
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
pub async fn virt(parent: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
|
||||
let path_parent = parent.into();
|
||||
|
||||
let config = ConfigToml {
|
||||
dataset: DatasetConfig {
|
||||
name: Label::new("virtual-dataset").unwrap(),
|
||||
working_dir: None,
|
||||
|
||||
source: [(
|
||||
Self::virt_source(),
|
||||
Source::Filesystem {
|
||||
enabled: true,
|
||||
path: path_parent.clone(),
|
||||
pattern: GroupPattern::default(),
|
||||
},
|
||||
)]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
},
|
||||
schema: HashMap::new(),
|
||||
fts: None,
|
||||
};
|
||||
|
||||
let mut sources = HashMap::new();
|
||||
for (label, source) in &config.dataset.source {
|
||||
match source {
|
||||
Source::Filesystem {
|
||||
enabled,
|
||||
path,
|
||||
pattern,
|
||||
} => {
|
||||
if !enabled {
|
||||
continue;
|
||||
}
|
||||
|
||||
sources.insert(
|
||||
label.clone(),
|
||||
Dataset::Dir(
|
||||
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
||||
.await?,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
Source::S3 {
|
||||
enabled,
|
||||
bucket,
|
||||
prefix,
|
||||
endpoint,
|
||||
region,
|
||||
credentials,
|
||||
pattern,
|
||||
} => {
|
||||
if !enabled {
|
||||
continue;
|
||||
}
|
||||
|
||||
match S3DataSource::new(
|
||||
label,
|
||||
bucket.clone(),
|
||||
prefix.clone(),
|
||||
endpoint.clone(),
|
||||
region.clone(),
|
||||
credentials,
|
||||
pattern.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(ds) => {
|
||||
sources.insert(label.clone(), Dataset::S3(ds));
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Could not open S3 source {label}: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(Self {
|
||||
path_config: None,
|
||||
path_workdir: None,
|
||||
path_parent,
|
||||
config,
|
||||
sources,
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
|
||||
let path_config = config.into();
|
||||
let path_parent = path_config
|
||||
@@ -168,9 +264,9 @@ impl Datasets {
|
||||
}
|
||||
|
||||
return Ok(Self {
|
||||
path_config,
|
||||
path_config: Some(path_config),
|
||||
path_workdir: Some(path_workdir),
|
||||
path_parent,
|
||||
path_workdir,
|
||||
config,
|
||||
sources,
|
||||
});
|
||||
@@ -216,8 +312,16 @@ impl Datasets {
|
||||
_threads: usize,
|
||||
flag: Option<CancelFlag>,
|
||||
) -> Result<(), CancelableTaskError<DatasetError>> {
|
||||
let fts_tmp_dir = self.path_workdir.join(".tmp-fts");
|
||||
let fts_dir = self.path_workdir.join("fts");
|
||||
let workdir = match self.path_workdir.as_ref() {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
warn!("Skipping fts_refresh, no workdir");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let fts_tmp_dir = workdir.join(".tmp-fts");
|
||||
let fts_dir = workdir.join("fts");
|
||||
|
||||
if fts_tmp_dir.is_dir() {
|
||||
warn!("Removing temporary index in {}", fts_dir.display());
|
||||
@@ -315,7 +419,15 @@ impl Datasets {
|
||||
query: &str,
|
||||
top_n: usize,
|
||||
) -> Result<Vec<FtsLookupResult>, DatasetError> {
|
||||
let fts_dir = self.path_workdir.join("fts");
|
||||
let workdir = match self.path_workdir.as_ref() {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
warn!("Skipping fts_lookup, no workdir");
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
};
|
||||
|
||||
let fts_dir = workdir.join("fts");
|
||||
|
||||
if !fts_dir.exists() {
|
||||
return Err(DatasetError::NoFtsIndex);
|
||||
@@ -335,7 +447,12 @@ impl Datasets {
|
||||
|
||||
/// Time at which fts was created
|
||||
pub fn ts_fts(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
let fts_dir = self.path_workdir.join("fts");
|
||||
let workdir = match self.path_workdir.as_ref() {
|
||||
Some(x) => x,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
let fts_dir = workdir.join("fts");
|
||||
|
||||
if !fts_dir.exists() {
|
||||
return Ok(None);
|
||||
|
||||
@@ -112,8 +112,8 @@ impl DbFtsIndex {
|
||||
// Try paths in order, using the first value we find
|
||||
for path in field.path.as_slice() {
|
||||
let val = match extractor.query(state, path).await? {
|
||||
Some(PileValue::Null) | None => continue,
|
||||
Some(x) => x,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let val = val_to_string(state, &val, path, field_name).await?;
|
||||
|
||||
@@ -18,6 +18,7 @@ tracing = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
smartstring = { workspace = true }
|
||||
regex = { workspace = true }
|
||||
blake3 = { workspace = true }
|
||||
epub = { workspace = true }
|
||||
kamadak-exif = { workspace = true }
|
||||
|
||||
@@ -54,4 +54,33 @@ impl ObjectExtractor for EpubExtractor {
|
||||
Label::new("meta").unwrap(),
|
||||
])
|
||||
}
|
||||
|
||||
async fn to_json(&self, state: &ExtractState) -> Result<serde_json::Value, std::io::Error> {
|
||||
let keys = self.fields().await?;
|
||||
let mut map = serde_json::Map::new();
|
||||
for k in &keys {
|
||||
let v = match self.field(state, k, None).await? {
|
||||
Some(x) => x,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
if k.as_str() == "text" {
|
||||
map.insert(
|
||||
k.to_string(),
|
||||
serde_json::Value::String(format!(
|
||||
"<String ({} bytes)",
|
||||
match v {
|
||||
PileValue::String(x) => x.len(),
|
||||
_ => 0,
|
||||
}
|
||||
)),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
map.insert(k.to_string(), Box::pin(v.to_json(state)).await?);
|
||||
}
|
||||
|
||||
Ok(serde_json::Value::Object(map))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::{
|
||||
use pile_config::Label;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::Component,
|
||||
path::{Component, PathBuf},
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
@@ -27,22 +27,20 @@ impl FsExtractor {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let Item::File { path, .. } = &self.item else {
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
};
|
||||
let path = PathBuf::from(self.item.key().as_str());
|
||||
|
||||
let mut root = false;
|
||||
let components = path
|
||||
.components()
|
||||
.map(|x| match x {
|
||||
.filter_map(|x| match x {
|
||||
Component::CurDir => None,
|
||||
Component::Normal(x) => x.to_str().map(|x| x.to_owned()),
|
||||
Component::ParentDir => Some("..".to_owned()),
|
||||
Component::Normal(x) => Some(x.to_str().map(|x| x.to_owned())),
|
||||
Component::ParentDir => Some(Some("..".to_owned())),
|
||||
Component::RootDir => {
|
||||
root = true;
|
||||
None
|
||||
Some(None)
|
||||
}
|
||||
Component::Prefix(x) => x.as_os_str().to_str().map(|x| x.to_owned()),
|
||||
Component::Prefix(x) => Some(x.as_os_str().to_str().map(|x| x.to_owned())),
|
||||
})
|
||||
.collect::<Option<Vec<_>>>();
|
||||
|
||||
@@ -69,6 +67,7 @@ impl FsExtractor {
|
||||
(
|
||||
Label::new("segments").unwrap(),
|
||||
components
|
||||
.clone()
|
||||
.map(|x| {
|
||||
PileValue::Array(Arc::new(
|
||||
x.iter()
|
||||
@@ -78,6 +77,12 @@ impl FsExtractor {
|
||||
})
|
||||
.unwrap_or(PileValue::Null),
|
||||
),
|
||||
(
|
||||
Label::new("name").unwrap(),
|
||||
components
|
||||
.and_then(|x| x.last().map(|x| PileValue::String(Arc::new(x.into()))))
|
||||
.unwrap_or(PileValue::Null),
|
||||
),
|
||||
]);
|
||||
|
||||
return Ok(self.output.get_or_init(|| output));
|
||||
|
||||
@@ -28,6 +28,9 @@ pub use toml::*;
|
||||
mod group;
|
||||
pub use group::*;
|
||||
|
||||
mod text;
|
||||
pub use text::*;
|
||||
|
||||
use crate::{
|
||||
extract::{
|
||||
misc::MapExtractor,
|
||||
@@ -77,6 +80,10 @@ impl ItemExtractor {
|
||||
Label::new("toml").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("text").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(TextExtractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("groups").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(GroupExtractor::new(item))),
|
||||
@@ -110,6 +117,7 @@ impl ObjectExtractor for ItemExtractor {
|
||||
Label::new("pdf").unwrap(),
|
||||
Label::new("json").unwrap(),
|
||||
Label::new("toml").unwrap(),
|
||||
Label::new("text").unwrap(),
|
||||
Label::new("groups").unwrap(),
|
||||
]);
|
||||
}
|
||||
|
||||
@@ -68,4 +68,33 @@ impl ObjectExtractor for PdfExtractor {
|
||||
Label::new("pages").unwrap(),
|
||||
])
|
||||
}
|
||||
|
||||
async fn to_json(&self, state: &ExtractState) -> Result<serde_json::Value, std::io::Error> {
|
||||
let keys = self.fields().await?;
|
||||
let mut map = serde_json::Map::new();
|
||||
for k in &keys {
|
||||
let v = match self.field(state, k, None).await? {
|
||||
Some(x) => x,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
if k.as_str() == "text" {
|
||||
map.insert(
|
||||
k.to_string(),
|
||||
serde_json::Value::String(format!(
|
||||
"<String ({} bytes)",
|
||||
match v {
|
||||
PileValue::String(x) => x.len(),
|
||||
_ => 0,
|
||||
}
|
||||
)),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
map.insert(k.to_string(), Box::pin(v.to_json(state)).await?);
|
||||
}
|
||||
|
||||
Ok(serde_json::Value::Object(map))
|
||||
}
|
||||
}
|
||||
|
||||
67
crates/pile-value/src/extract/item/text.rs
Normal file
67
crates/pile-value/src/extract/item/text.rs
Normal file
@@ -0,0 +1,67 @@
|
||||
use pile_config::Label;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
|
||||
use crate::{
|
||||
extract::traits::{ExtractState, ObjectExtractor},
|
||||
value::{AsyncReader, Item, PileValue},
|
||||
};
|
||||
|
||||
pub struct TextExtractor {
|
||||
item: Item,
|
||||
output: OnceLock<PileValue>,
|
||||
}
|
||||
|
||||
impl TextExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self {
|
||||
item: item.clone(),
|
||||
output: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for TextExtractor {
|
||||
async fn field(
|
||||
&self,
|
||||
state: &ExtractState,
|
||||
name: &Label,
|
||||
args: Option<&str>,
|
||||
) -> Result<Option<PileValue>, std::io::Error> {
|
||||
if args.is_some() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if !state.ignore_mime
|
||||
&& (self.item.mime().type_() != mime::TEXT
|
||||
&& self.item.mime().type_() != mime::APPLICATION)
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if name.as_str() != "text" {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
{
|
||||
if let Some(x) = self.output.get() {
|
||||
return Ok(Some(x.clone()));
|
||||
}
|
||||
|
||||
let mut reader = self.item.read().await?;
|
||||
let bytes = reader.read_to_end().await?;
|
||||
let string = String::from_utf8(bytes).ok();
|
||||
let value = match string {
|
||||
Some(x) => PileValue::String(Arc::new(x.into())),
|
||||
None => PileValue::Null,
|
||||
};
|
||||
|
||||
return Ok(Some(self.output.get_or_init(|| value).clone()));
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(vec![Label::new("text").unwrap()])
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod item;
|
||||
pub mod misc;
|
||||
pub mod regex;
|
||||
pub mod string;
|
||||
pub mod traits;
|
||||
|
||||
104
crates/pile-value/src/extract/regex.rs
Normal file
104
crates/pile-value/src/extract/regex.rs
Normal file
@@ -0,0 +1,104 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use pile_config::Label;
|
||||
use regex::Regex;
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
|
||||
use crate::{
|
||||
extract::traits::{ExtractState, ListExtractor, ObjectExtractor},
|
||||
value::PileValue,
|
||||
};
|
||||
|
||||
struct RegexData {
|
||||
regex: Arc<Regex>,
|
||||
/// Captured substrings indexed by group index (0 = whole match).
|
||||
captures: Vec<Option<Arc<SmartString<LazyCompact>>>>,
|
||||
}
|
||||
|
||||
impl RegexData {
|
||||
fn new(regex: Arc<Regex>, input: &str) -> Option<Self> {
|
||||
let caps = regex.captures(input)?;
|
||||
let captures = caps
|
||||
.iter()
|
||||
.map(|m| m.map(|m| Arc::new(m.as_str().into())))
|
||||
.collect();
|
||||
Some(Self { regex, captures })
|
||||
}
|
||||
}
|
||||
|
||||
/// Exposes named capture groups as object fields.
|
||||
pub struct RegexExtractor(Arc<RegexData>);
|
||||
|
||||
impl RegexExtractor {
|
||||
/// Run `regex` against `input`. Returns `None` if there is no match.
|
||||
pub fn new(regex: Arc<Regex>, input: &str) -> Option<Self> {
|
||||
Some(Self(Arc::new(RegexData::new(regex, input)?)))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for RegexExtractor {
|
||||
async fn field(
|
||||
&self,
|
||||
_state: &ExtractState,
|
||||
name: &Label,
|
||||
args: Option<&str>,
|
||||
) -> Result<Option<PileValue>, std::io::Error> {
|
||||
if args.is_some() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let Some(idx) = self
|
||||
.0
|
||||
.regex
|
||||
.capture_names()
|
||||
.position(|n| n == Some(name.as_str()))
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
Ok(Some(
|
||||
match self.0.captures.get(idx).and_then(|v| v.as_ref()) {
|
||||
Some(s) => PileValue::String(s.clone()),
|
||||
None => PileValue::Null,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
#[expect(clippy::unwrap_used)]
|
||||
Ok(self
|
||||
.0
|
||||
.regex
|
||||
.capture_names()
|
||||
.flatten()
|
||||
.map(|n| Label::new(n).unwrap())
|
||||
.collect())
|
||||
}
|
||||
|
||||
fn as_list(&self) -> Option<Arc<dyn ListExtractor>> {
|
||||
Some(Arc::new(RegexExtractor(self.0.clone())))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ListExtractor for RegexExtractor {
|
||||
async fn get(
|
||||
&self,
|
||||
_state: &ExtractState,
|
||||
idx: usize,
|
||||
) -> Result<Option<PileValue>, std::io::Error> {
|
||||
let raw_idx = idx + 1;
|
||||
let Some(slot) = self.0.captures.get(raw_idx) else {
|
||||
return Ok(None);
|
||||
};
|
||||
Ok(Some(match slot {
|
||||
Some(s) => PileValue::String(s.clone()),
|
||||
None => PileValue::Null,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn len(&self, _state: &ExtractState) -> Result<usize, std::io::Error> {
|
||||
Ok(self.0.captures.len().saturating_sub(1))
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,13 @@
|
||||
use pile_config::Label;
|
||||
use regex::Regex;
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{
|
||||
extract::traits::{ExtractState, ObjectExtractor},
|
||||
extract::{
|
||||
regex::RegexExtractor,
|
||||
traits::{ExtractState, ObjectExtractor},
|
||||
},
|
||||
value::PileValue,
|
||||
};
|
||||
|
||||
@@ -67,6 +71,18 @@ impl ObjectExtractor for StringExtractor {
|
||||
.collect(),
|
||||
))),
|
||||
|
||||
("regex", Some(pattern)) => {
|
||||
let Ok(re) = Regex::new(pattern) else {
|
||||
return Ok(None);
|
||||
};
|
||||
Some(
|
||||
match RegexExtractor::new(Arc::new(re), self.item.as_str()) {
|
||||
Some(ext) => PileValue::ObjectExtractor(Arc::new(ext)),
|
||||
None => PileValue::Null,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
@@ -78,6 +94,9 @@ impl ObjectExtractor for StringExtractor {
|
||||
Label::new("upper").unwrap(),
|
||||
Label::new("lower").unwrap(),
|
||||
Label::new("nonempty").unwrap(),
|
||||
Label::new("trimprefix").unwrap(),
|
||||
Label::new("trimsuffix").unwrap(),
|
||||
Label::new("split").unwrap(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,11 @@ pub trait ObjectExtractor: Send + Sync {
|
||||
/// and [None] for all others.
|
||||
async fn fields(&self) -> Result<Vec<pile_config::Label>, std::io::Error>;
|
||||
|
||||
/// Return a list view of this extractor, if supported.
|
||||
fn as_list(&self) -> Option<std::sync::Arc<dyn ListExtractor>> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Convert this to a JSON value.
|
||||
async fn to_json(&self, state: &ExtractState) -> Result<serde_json::Value, std::io::Error> {
|
||||
let keys = self.fields().await?;
|
||||
|
||||
@@ -40,11 +40,22 @@ impl Item {
|
||||
Self::File { path, .. } => ItemReader::File(File::open(path)?),
|
||||
|
||||
Self::S3 { source, key, .. } => {
|
||||
let full_key: SmartString<LazyCompact> = match &source.prefix {
|
||||
None => key.clone(),
|
||||
Some(p) => {
|
||||
if p.ends_with('/') {
|
||||
format!("{p}{key}").into()
|
||||
} else {
|
||||
format!("{p}/{key}").into()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let head = source
|
||||
.client
|
||||
.head_object()
|
||||
.bucket(source.bucket.as_str())
|
||||
.key(key.as_str())
|
||||
.key(full_key.as_str())
|
||||
.send()
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
@@ -54,7 +65,7 @@ impl Item {
|
||||
ItemReader::S3(S3Reader {
|
||||
client: source.client.clone(),
|
||||
bucket: source.bucket.clone(),
|
||||
key: key.to_owned(),
|
||||
key: full_key,
|
||||
cursor: 0,
|
||||
size,
|
||||
})
|
||||
|
||||
@@ -86,7 +86,9 @@ impl PileValue {
|
||||
Self::String(_) => Arc::new(VecExtractor::default()),
|
||||
Self::Blob { .. } => Arc::new(VecExtractor::default()),
|
||||
Self::ListExtractor(e) => e.clone(),
|
||||
Self::ObjectExtractor(_) => Arc::new(VecExtractor::default()),
|
||||
Self::ObjectExtractor(e) => e
|
||||
.as_list()
|
||||
.unwrap_or_else(|| Arc::new(VecExtractor::default())),
|
||||
Self::Item(_) => Arc::new(VecExtractor::default()),
|
||||
}
|
||||
}
|
||||
@@ -138,6 +140,40 @@ impl PileValue {
|
||||
|
||||
out = e.get(state, idx).await?;
|
||||
}
|
||||
|
||||
PathSegment::Range {
|
||||
start,
|
||||
end,
|
||||
inclusive,
|
||||
} => {
|
||||
let e = match out.map(|x| x.list_extractor()) {
|
||||
Some(e) => e,
|
||||
None => {
|
||||
out = None;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let len = e.len(state).await? as i64;
|
||||
|
||||
let start_idx = if *start >= 0 { *start } else { len + start };
|
||||
let end_idx = if *end >= 0 { *end } else { len + end };
|
||||
let end_idx = if *inclusive { end_idx + 1 } else { end_idx };
|
||||
|
||||
let start_idx = start_idx.max(0) as usize;
|
||||
let end_idx = (end_idx.max(0) as usize).min(len as usize);
|
||||
|
||||
let mut items = Vec::new();
|
||||
for i in start_idx..end_idx {
|
||||
match e.get(state, i).await? {
|
||||
Some(v) => items.push(v),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: lazy view?
|
||||
out = Some(PileValue::Array(Arc::new(items)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
113
crates/pile/src/command/item.rs
Normal file
113
crates/pile/src/command/item.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
use anyhow::{Context, Result};
|
||||
use clap::Args;
|
||||
use pile_config::{Label, objectpath::ObjectPath};
|
||||
use pile_dataset::Datasets;
|
||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::{CliCmd, GlobalContext};
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
pub struct ItemCommand {
|
||||
/// Source name (as defined in pile.toml)
|
||||
source: String,
|
||||
|
||||
/// Item key within the source
|
||||
key: String,
|
||||
|
||||
/// If present, extract a specific field
|
||||
#[arg(long, short = 'p')]
|
||||
path: Option<String>,
|
||||
|
||||
/// If present, print the schema fields instead of item data
|
||||
#[arg(long)]
|
||||
fields: bool,
|
||||
|
||||
#[arg(long, short = 'x')]
|
||||
exclude: Vec<String>,
|
||||
|
||||
/// Path to dataset config
|
||||
#[arg(long, short = 'c', default_value = "./pile.toml")]
|
||||
config: PathBuf,
|
||||
}
|
||||
|
||||
impl CliCmd for ItemCommand {
|
||||
#[expect(clippy::print_stdout)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
async fn run(
|
||||
self,
|
||||
_ctx: GlobalContext,
|
||||
_flag: CancelFlag,
|
||||
) -> Result<i32, CancelableTaskError<anyhow::Error>> {
|
||||
let source = Label::new(&self.source)
|
||||
.ok_or_else(|| anyhow::anyhow!("invalid source name {:?}", self.source))?;
|
||||
|
||||
let ds = Datasets::open(&self.config)
|
||||
.await
|
||||
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||
|
||||
let state = ExtractState { ignore_mime: false };
|
||||
|
||||
let item = ds.get(&source, &self.key).await.ok_or_else(|| {
|
||||
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
|
||||
})?;
|
||||
let pv = PileValue::Item(item);
|
||||
|
||||
if self.fields {
|
||||
let mut map = serde_json::Map::new();
|
||||
for (name, spec) in &ds.config.schema {
|
||||
if self.exclude.contains(&name.to_string()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut value = None;
|
||||
for path in &spec.path {
|
||||
let v = pv
|
||||
.query(&state, path)
|
||||
.await
|
||||
.with_context(|| format!("while extracting field {name}"))?;
|
||||
if let Some(v) = v
|
||||
&& !matches!(v, PileValue::Null)
|
||||
{
|
||||
let j = v
|
||||
.to_json(&state)
|
||||
.await
|
||||
.with_context(|| format!("while extracting field {name}"))?;
|
||||
value = Some(j);
|
||||
break;
|
||||
}
|
||||
}
|
||||
map.insert(name.to_string(), value.unwrap_or(serde_json::Value::Null));
|
||||
}
|
||||
let json = serde_json::to_string_pretty(&serde_json::Value::Object(map)).unwrap();
|
||||
println!("{json}");
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let json = if let Some(path_str) = self.path {
|
||||
let path: ObjectPath = path_str
|
||||
.parse()
|
||||
.with_context(|| format!("invalid path {path_str:?}"))?;
|
||||
|
||||
let v = pv
|
||||
.query(&state, &path)
|
||||
.await
|
||||
.with_context(|| format!("while extracting {}", self.key))?
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
|
||||
})?;
|
||||
v.to_json(&state)
|
||||
.await
|
||||
.with_context(|| format!("while extracting {}", self.key))?
|
||||
} else {
|
||||
pv.to_json(&state)
|
||||
.await
|
||||
.with_context(|| format!("while extracting {}", self.key))?
|
||||
};
|
||||
|
||||
let json = serde_json::to_string_pretty(&json).unwrap();
|
||||
println!("{json}");
|
||||
return Ok(0);
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ mod check;
|
||||
mod fields;
|
||||
mod index;
|
||||
mod init;
|
||||
mod item;
|
||||
mod list;
|
||||
mod lookup;
|
||||
mod probe;
|
||||
@@ -59,12 +60,18 @@ pub enum SubCommand {
|
||||
cmd: fields::FieldsCommand,
|
||||
},
|
||||
|
||||
/// Print all metadata from an item
|
||||
/// Print all metadata from a file
|
||||
Probe {
|
||||
#[command(flatten)]
|
||||
cmd: probe::ProbeCommand,
|
||||
},
|
||||
|
||||
/// Print all metadata from an item
|
||||
Item {
|
||||
#[command(flatten)]
|
||||
cmd: item::ItemCommand,
|
||||
},
|
||||
|
||||
/// Expose a dataset via an http api
|
||||
Serve {
|
||||
#[command(flatten)]
|
||||
@@ -88,6 +95,7 @@ impl CliCmdDispatch for SubCommand {
|
||||
Self::Lookup { cmd } => cmd.start(ctx),
|
||||
Self::Fields { cmd } => cmd.start(ctx),
|
||||
Self::Probe { cmd } => cmd.start(ctx),
|
||||
Self::Item { cmd } => cmd.start(ctx),
|
||||
Self::Serve { cmd } => cmd.start(ctx),
|
||||
Self::Upload { cmd } => cmd.start(ctx),
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use anyhow::{Context, Result};
|
||||
use clap::Args;
|
||||
use pile_config::{Label, objectpath::ObjectPath};
|
||||
use pile_config::objectpath::ObjectPath;
|
||||
use pile_dataset::Datasets;
|
||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||
@@ -10,19 +10,12 @@ use crate::{CliCmd, GlobalContext};
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
pub struct ProbeCommand {
|
||||
/// Source name (as defined in pile.toml)
|
||||
source: String,
|
||||
|
||||
/// Item key within the source
|
||||
key: String,
|
||||
/// The file to probe
|
||||
file: PathBuf,
|
||||
|
||||
/// If present, extract a specific field
|
||||
#[arg(long, short = 'p')]
|
||||
path: Option<String>,
|
||||
|
||||
/// Path to dataset config
|
||||
#[arg(long, short = 'c', default_value = "./pile.toml")]
|
||||
config: PathBuf,
|
||||
}
|
||||
|
||||
impl CliCmd for ProbeCommand {
|
||||
@@ -33,35 +26,37 @@ impl CliCmd for ProbeCommand {
|
||||
_ctx: GlobalContext,
|
||||
_flag: CancelFlag,
|
||||
) -> Result<i32, CancelableTaskError<anyhow::Error>> {
|
||||
let source = Label::new(&self.source)
|
||||
.ok_or_else(|| anyhow::anyhow!("invalid source name {:?}", self.source))?;
|
||||
|
||||
let ds = Datasets::open(&self.config)
|
||||
let ds = Datasets::virt(".")
|
||||
.await
|
||||
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||
.with_context(|| "while opening virtual dataset".to_owned())?;
|
||||
|
||||
let state = ExtractState { ignore_mime: false };
|
||||
let key = self.file.to_str().context("path is not utf-8")?;
|
||||
|
||||
let json = if let Some(path_str) = self.path {
|
||||
let path: ObjectPath = path_str
|
||||
.parse()
|
||||
.with_context(|| format!("invalid path {path_str:?}"))?;
|
||||
|
||||
ds.get_field(&state, &source, &self.key, &path)
|
||||
ds.get_field(
|
||||
&state,
|
||||
&Datasets::virt_source(),
|
||||
self.file.to_str().context("path is not utf-8")?,
|
||||
&path,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("while extracting {}", self.key))?
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
|
||||
})?
|
||||
.with_context(|| format!("while extracting {key}"))?
|
||||
.ok_or_else(|| anyhow::anyhow!("{key:?} not found"))?
|
||||
} else {
|
||||
let item = ds.get(&source, &self.key).await.ok_or_else(|| {
|
||||
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
|
||||
})?;
|
||||
let item = ds
|
||||
.get(&Datasets::virt_source(), key)
|
||||
.await
|
||||
.ok_or_else(|| anyhow::anyhow!("{key:?} not found"))?;
|
||||
|
||||
let item = PileValue::Item(item);
|
||||
item.to_json(&state)
|
||||
.await
|
||||
.with_context(|| format!("while extracting {}", self.key))?
|
||||
.with_context(|| format!("while extracting {key}"))?
|
||||
};
|
||||
|
||||
let json = serde_json::to_string_pretty(&json).unwrap();
|
||||
|
||||
Reference in New Issue
Block a user