TMP sidecars
This commit is contained in:
@@ -1,12 +1,13 @@
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::{collections::HashMap, fmt::Debug, path::PathBuf};
|
use std::{collections::HashMap, fmt::Debug, path::PathBuf};
|
||||||
|
|
||||||
|
use crate::{objectpath::ObjectPath, pattern::GroupPattern};
|
||||||
|
|
||||||
mod misc;
|
mod misc;
|
||||||
pub use misc::*;
|
pub use misc::*;
|
||||||
|
|
||||||
use crate::objectpath::ObjectPath;
|
|
||||||
|
|
||||||
pub mod objectpath;
|
pub mod objectpath;
|
||||||
|
pub mod pattern;
|
||||||
|
|
||||||
pub static INIT_DB_TOML: &str = include_str!("./config.toml");
|
pub static INIT_DB_TOML: &str = include_str!("./config.toml");
|
||||||
|
|
||||||
@@ -59,13 +60,9 @@ pub enum Source {
|
|||||||
/// Must be relative.
|
/// Must be relative.
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
|
|
||||||
/// If true, all toml files are ignored.
|
/// How to group files into items in this source
|
||||||
/// Metadata can be added to any file using a {filename}.toml.
|
#[serde(default)]
|
||||||
///
|
pattern: GroupPattern,
|
||||||
/// If false, toml files are treated as regular files
|
|
||||||
/// and sidecar metadata is disabled.
|
|
||||||
#[serde(default = "default_true")]
|
|
||||||
sidecars: bool,
|
|
||||||
},
|
},
|
||||||
|
|
||||||
/// An S3-compatible object store bucket
|
/// An S3-compatible object store bucket
|
||||||
@@ -84,9 +81,9 @@ pub enum Source {
|
|||||||
|
|
||||||
credentials: S3Credentials,
|
credentials: S3Credentials,
|
||||||
|
|
||||||
/// If true, all .toml objects are treated as sidecar metadata files.
|
/// How to group files into items in this source
|
||||||
#[serde(default = "default_true")]
|
#[serde(default)]
|
||||||
sidecars: bool,
|
pattern: GroupPattern,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ pub enum PathSegment {
|
|||||||
/// - `$` refers to the root object
|
/// - `$` refers to the root object
|
||||||
/// - `.<name>` selects aPathSegment::Field of an object
|
/// - `.<name>` selects aPathSegment::Field of an object
|
||||||
/// - `[n]` selects an item of an array
|
/// - `[n]` selects an item of an array
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub struct ObjectPath {
|
pub struct ObjectPath {
|
||||||
pub segments: Vec<PathSegment>,
|
pub segments: Vec<PathSegment>,
|
||||||
}
|
}
|
||||||
|
|||||||
49
crates/pile-config/src/pattern/mod.rs
Normal file
49
crates/pile-config/src/pattern/mod.rs
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use serde::{Deserialize, Deserializer, de};
|
||||||
|
use smartstring::{LazyCompact, SmartString};
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
use crate::{Label, objectpath::PathParseError as ObjectPathError};
|
||||||
|
|
||||||
|
mod parser;
|
||||||
|
pub use parser::GroupSegment;
|
||||||
|
|
||||||
|
#[derive(Debug, Error, PartialEq)]
|
||||||
|
pub enum GroupPatternParseError {
|
||||||
|
/// A `{` or `}` appeared in an invalid position, or a `{` was never closed.
|
||||||
|
#[error("syntax error at index {position}")]
|
||||||
|
Syntax { position: usize },
|
||||||
|
|
||||||
|
/// The contents of a `{...}` block could not be parsed as an object path.
|
||||||
|
#[error("invalid object path {path:?}: {source}")]
|
||||||
|
InvalidObjectPath {
|
||||||
|
start: usize,
|
||||||
|
end: usize,
|
||||||
|
path: SmartString<LazyCompact>,
|
||||||
|
source: ObjectPathError,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
pub struct GroupPattern {
|
||||||
|
pub parts: HashMap<Label, Vec<GroupSegment>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'de> Deserialize<'de> for GroupPattern {
|
||||||
|
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
|
||||||
|
let raw = HashMap::<String, String>::deserialize(deserializer)?;
|
||||||
|
let mut parts = HashMap::with_capacity(raw.len());
|
||||||
|
for (key, value) in raw {
|
||||||
|
let label = Label::try_from(key.as_str()).map_err(de::Error::custom)?;
|
||||||
|
let segments = parser::Parser::new()
|
||||||
|
.parse(&value)
|
||||||
|
.map_err(de::Error::custom)?
|
||||||
|
.into_iter()
|
||||||
|
.map(|(_, seg)| seg)
|
||||||
|
.collect();
|
||||||
|
parts.insert(label, segments);
|
||||||
|
}
|
||||||
|
Ok(GroupPattern { parts })
|
||||||
|
}
|
||||||
|
}
|
||||||
195
crates/pile-config/src/pattern/parser.rs
Normal file
195
crates/pile-config/src/pattern/parser.rs
Normal file
@@ -0,0 +1,195 @@
|
|||||||
|
use smartstring::{LazyCompact, SmartString};
|
||||||
|
|
||||||
|
use crate::{objectpath::ObjectPath, pattern::GroupPatternParseError};
|
||||||
|
|
||||||
|
#[cfg_attr(test, derive(PartialEq))]
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum GroupSegment {
|
||||||
|
Path(ObjectPath),
|
||||||
|
Literal(SmartString<LazyCompact>),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Parser {}
|
||||||
|
|
||||||
|
impl Parser {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse a pattern string of the form `{path}.literal{path}...`.
|
||||||
|
///
|
||||||
|
/// - `{...}` delimiters are parsed as [`ObjectPath`] expressions.
|
||||||
|
/// Nested `{}` inside a path are allowed; depth is tracked to find the
|
||||||
|
/// matching closing brace.
|
||||||
|
/// - Everything outside `{...}` is a `Literal` segment.
|
||||||
|
/// - A bare `}` in literal position (depth == 0) is a syntax error.
|
||||||
|
/// - An unclosed `{` is a syntax error.
|
||||||
|
pub fn parse(self, source: &str) -> Result<Vec<(usize, GroupSegment)>, GroupPatternParseError> {
|
||||||
|
let mut tokens = Vec::new();
|
||||||
|
|
||||||
|
// `depth` > 0 means we are currently inside a `{...}` path expression.
|
||||||
|
let mut depth: usize = 0;
|
||||||
|
// Start of the current segment (literal text or path content).
|
||||||
|
let mut window_start: usize = 0;
|
||||||
|
// Source position of the opening `{` for the current path (used for error reporting).
|
||||||
|
let mut open_brace: usize = 0;
|
||||||
|
|
||||||
|
for (i, c) in source.char_indices() {
|
||||||
|
match c {
|
||||||
|
'{' => {
|
||||||
|
if depth == 0 {
|
||||||
|
// Emit any accumulated literal.
|
||||||
|
if i > window_start {
|
||||||
|
tokens.push((
|
||||||
|
window_start,
|
||||||
|
GroupSegment::Literal(source[window_start..i].into()),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
open_brace = i;
|
||||||
|
// Path content starts after the opening brace.
|
||||||
|
window_start = i + 1;
|
||||||
|
depth = 1;
|
||||||
|
} else {
|
||||||
|
// Nested brace inside a path — keep counting.
|
||||||
|
depth += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
'}' => {
|
||||||
|
if depth == 0 {
|
||||||
|
// Unmatched `}` outside any path.
|
||||||
|
return Err(GroupPatternParseError::Syntax { position: i });
|
||||||
|
}
|
||||||
|
depth -= 1;
|
||||||
|
if depth == 0 {
|
||||||
|
// Closing brace of the outermost path expression — parse as ObjectPath.
|
||||||
|
let path_str = &source[window_start..i];
|
||||||
|
let path = path_str.parse::<ObjectPath>().map_err(|e| {
|
||||||
|
GroupPatternParseError::InvalidObjectPath {
|
||||||
|
start: open_brace,
|
||||||
|
end: i + 1,
|
||||||
|
path: path_str.into(),
|
||||||
|
source: e,
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
tokens.push((open_brace, GroupSegment::Path(path)));
|
||||||
|
// Literal content (if any) starts after this `}`.
|
||||||
|
window_start = i + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unclosed `{`.
|
||||||
|
if depth > 0 {
|
||||||
|
return Err(GroupPatternParseError::Syntax {
|
||||||
|
position: open_brace,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit any trailing literal.
|
||||||
|
if window_start < source.len() {
|
||||||
|
tokens.push((
|
||||||
|
window_start,
|
||||||
|
GroupSegment::Literal(source[window_start..].into()),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(tokens)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: tests
|
||||||
|
//
|
||||||
|
|
||||||
|
#[expect(clippy::unwrap_used)]
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
fn parse(source: &str) -> Result<Vec<(usize, GroupSegment)>, GroupPatternParseError> {
|
||||||
|
Parser::new().parse(source)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn path(s: &str) -> GroupSegment {
|
||||||
|
GroupSegment::Path(s.parse().unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn lit(s: &str) -> GroupSegment {
|
||||||
|
GroupSegment::Literal(s.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn single_path() {
|
||||||
|
assert_eq!(parse("{$.foo}").unwrap(), vec![(0, path("$.foo"))]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn single_literal() {
|
||||||
|
assert_eq!(parse("hello").unwrap(), vec![(0, lit("hello"))]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn path_then_literal() {
|
||||||
|
assert_eq!(
|
||||||
|
parse("{$.foo}.txt").unwrap(),
|
||||||
|
vec![(0, path("$.foo")), (7, lit(".txt"))]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn literal_then_path() {
|
||||||
|
assert_eq!(
|
||||||
|
parse("prefix/{$.foo}").unwrap(),
|
||||||
|
vec![(0, lit("prefix/")), (7, path("$.foo"))]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn interleaved() {
|
||||||
|
assert_eq!(
|
||||||
|
parse("{$.a}.sep.{$.b}").unwrap(),
|
||||||
|
vec![(0, path("$.a")), (5, lit(".sep.")), (10, path("$.b")),]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn unmatched_open_brace_error() {
|
||||||
|
assert_eq!(
|
||||||
|
parse("{$.foo"),
|
||||||
|
Err(GroupPatternParseError::Syntax { position: 0 })
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn unmatched_close_brace_in_literal_error() {
|
||||||
|
assert_eq!(
|
||||||
|
parse("foo}bar"),
|
||||||
|
Err(GroupPatternParseError::Syntax { position: 3 })
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn invalid_path_error() {
|
||||||
|
assert_eq!(
|
||||||
|
parse("{not-a-path}"),
|
||||||
|
Err(GroupPatternParseError::InvalidObjectPath {
|
||||||
|
start: 0,
|
||||||
|
end: 12,
|
||||||
|
path: "not-a-path".into(),
|
||||||
|
source: crate::objectpath::PathParseError::MustStartWithRoot { position: 0 },
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn literal_between_paths() {
|
||||||
|
assert_eq!(
|
||||||
|
parse("foo{$.x}bar").unwrap(),
|
||||||
|
vec![(0, lit("foo")), (3, path("$.x")), (8, lit("bar")),]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -118,7 +118,7 @@ impl Datasets {
|
|||||||
Source::Filesystem {
|
Source::Filesystem {
|
||||||
enabled,
|
enabled,
|
||||||
path,
|
path,
|
||||||
sidecars,
|
pattern,
|
||||||
} => {
|
} => {
|
||||||
if !enabled {
|
if !enabled {
|
||||||
continue;
|
continue;
|
||||||
@@ -129,7 +129,7 @@ impl Datasets {
|
|||||||
Dataset::Dir(Arc::new(DirDataSource::new(
|
Dataset::Dir(Arc::new(DirDataSource::new(
|
||||||
label,
|
label,
|
||||||
path_parent.join(path),
|
path_parent.join(path),
|
||||||
*sidecars,
|
pattern.clone(),
|
||||||
))),
|
))),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -141,7 +141,7 @@ impl Datasets {
|
|||||||
endpoint,
|
endpoint,
|
||||||
region,
|
region,
|
||||||
credentials,
|
credentials,
|
||||||
sidecars,
|
pattern,
|
||||||
} => {
|
} => {
|
||||||
if !enabled {
|
if !enabled {
|
||||||
continue;
|
continue;
|
||||||
@@ -154,7 +154,7 @@ impl Datasets {
|
|||||||
endpoint.clone(),
|
endpoint.clone(),
|
||||||
region.clone(),
|
region.clone(),
|
||||||
credentials,
|
credentials,
|
||||||
*sidecars,
|
pattern.clone(),
|
||||||
) {
|
) {
|
||||||
Ok(ds) => {
|
Ok(ds) => {
|
||||||
sources.insert(label.clone(), Dataset::S3(Arc::new(ds)));
|
sources.insert(label.clone(), Dataset::S3(Arc::new(ds)));
|
||||||
|
|||||||
@@ -25,9 +25,6 @@ mod toml;
|
|||||||
use pile_config::Label;
|
use pile_config::Label;
|
||||||
pub use toml::*;
|
pub use toml::*;
|
||||||
|
|
||||||
mod sidecar;
|
|
||||||
pub use sidecar::*;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::{
|
extract::{
|
||||||
misc::MapExtractor,
|
misc::MapExtractor,
|
||||||
@@ -77,10 +74,6 @@ impl ItemExtractor {
|
|||||||
Label::new("toml").unwrap(),
|
Label::new("toml").unwrap(),
|
||||||
PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))),
|
PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))),
|
||||||
),
|
),
|
||||||
(
|
|
||||||
Label::new("sidecar").unwrap(),
|
|
||||||
PileValue::ObjectExtractor(Arc::new(SidecarExtractor::new(item))),
|
|
||||||
),
|
|
||||||
]),
|
]),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -109,7 +102,6 @@ impl ObjectExtractor for ItemExtractor {
|
|||||||
Label::new("exif").unwrap(),
|
Label::new("exif").unwrap(),
|
||||||
Label::new("pdf").unwrap(),
|
Label::new("pdf").unwrap(),
|
||||||
Label::new("json").unwrap(),
|
Label::new("json").unwrap(),
|
||||||
Label::new("sidecar").unwrap(),
|
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,57 +0,0 @@
|
|||||||
use pile_config::Label;
|
|
||||||
use std::sync::OnceLock;
|
|
||||||
use tracing::trace;
|
|
||||||
|
|
||||||
use super::TomlExtractor;
|
|
||||||
use crate::{
|
|
||||||
extract::traits::{ExtractState, ObjectExtractor},
|
|
||||||
value::{Item, PileValue},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub struct SidecarExtractor {
|
|
||||||
item: Item,
|
|
||||||
output: OnceLock<Option<TomlExtractor>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SidecarExtractor {
|
|
||||||
pub fn new(item: &Item) -> Self {
|
|
||||||
Self {
|
|
||||||
item: item.clone(),
|
|
||||||
output: OnceLock::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl ObjectExtractor for SidecarExtractor {
|
|
||||||
async fn field(
|
|
||||||
&self,
|
|
||||||
state: &ExtractState,
|
|
||||||
name: &Label,
|
|
||||||
args: Option<&str>,
|
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
|
||||||
trace!(
|
|
||||||
?args,
|
|
||||||
key = self.item.key().as_str(),
|
|
||||||
"Getting field {name:?} from SidecarExtractor",
|
|
||||||
);
|
|
||||||
|
|
||||||
match self
|
|
||||||
.output
|
|
||||||
.get_or_init(|| self.item.sidecar().map(TomlExtractor::new))
|
|
||||||
{
|
|
||||||
Some(x) => Ok(x.field(state, name, args).await?),
|
|
||||||
None => Ok(Some(PileValue::Null)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
|
||||||
match self
|
|
||||||
.output
|
|
||||||
.get_or_init(|| self.item.sidecar().map(TomlExtractor::new))
|
|
||||||
{
|
|
||||||
Some(x) => Ok(x.fields().await?),
|
|
||||||
None => Ok(Vec::new()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use pile_config::Label;
|
use pile_config::{Label, pattern::GroupPattern};
|
||||||
use std::{path::PathBuf, sync::Arc};
|
use std::{path::PathBuf, sync::Arc};
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
@@ -13,16 +13,15 @@ use crate::{
|
|||||||
pub struct DirDataSource {
|
pub struct DirDataSource {
|
||||||
pub name: Label,
|
pub name: Label,
|
||||||
pub dir: PathBuf,
|
pub dir: PathBuf,
|
||||||
|
pub pattern: GroupPattern,
|
||||||
pub sidecars: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DirDataSource {
|
impl DirDataSource {
|
||||||
pub fn new(name: &Label, dir: PathBuf, sidecars: bool) -> Self {
|
pub fn new(name: &Label, dir: PathBuf, pattern: GroupPattern) -> Self {
|
||||||
Self {
|
Self {
|
||||||
name: name.clone(),
|
name: name.clone(),
|
||||||
dir,
|
dir,
|
||||||
sidecars,
|
pattern,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -38,29 +37,11 @@ impl DataSource for Arc<DirDataSource> {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ignore toml files if sidecars are enabled
|
|
||||||
if self.sidecars && key.extension().and_then(|x| x.to_str()) == Some("toml") {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(Some(Item::File {
|
return Ok(Some(Item::File {
|
||||||
source: Arc::clone(self),
|
source: Arc::clone(self),
|
||||||
mime: mime_guess::from_path(&key).first_or_octet_stream(),
|
mime: mime_guess::from_path(&key).first_or_octet_stream(),
|
||||||
path: key.clone(),
|
path: key.clone(),
|
||||||
sidecar: self
|
group: todo!(),
|
||||||
.sidecars
|
|
||||||
.then(|| {
|
|
||||||
let sidecar_path = key.with_extension("toml");
|
|
||||||
sidecar_path.is_file().then(|| {
|
|
||||||
Box::new(Item::File {
|
|
||||||
source: Arc::clone(self),
|
|
||||||
mime: mime_guess::from_path(&sidecar_path).first_or_octet_stream(),
|
|
||||||
path: sidecar_path,
|
|
||||||
sidecar: None,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.flatten(),
|
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -91,27 +72,12 @@ impl DataSource for Arc<DirDataSource> {
|
|||||||
|
|
||||||
let item = match path.extension().and_then(|x| x.to_str()) {
|
let item = match path.extension().and_then(|x| x.to_str()) {
|
||||||
None => continue,
|
None => continue,
|
||||||
Some("toml") if source.sidecars => continue,
|
|
||||||
Some(_) => Item::File {
|
Some(_) => Item::File {
|
||||||
source: Arc::clone(&source),
|
source: Arc::clone(&source),
|
||||||
mime: mime_guess::from_path(&path).first_or_octet_stream(),
|
mime: mime_guess::from_path(&path).first_or_octet_stream(),
|
||||||
path: path.clone(),
|
path: path.clone(),
|
||||||
|
|
||||||
sidecar: source
|
group: todo!(),
|
||||||
.sidecars
|
|
||||||
.then(|| {
|
|
||||||
let sidecar_path = path.with_extension("toml");
|
|
||||||
sidecar_path.is_file().then(|| {
|
|
||||||
Box::new(Item::File {
|
|
||||||
source: Arc::clone(&source),
|
|
||||||
mime: mime_guess::from_path(&sidecar_path)
|
|
||||||
.first_or_octet_stream(),
|
|
||||||
path: sidecar_path,
|
|
||||||
sidecar: None,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.flatten(),
|
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
|
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use pile_config::{Label, S3Credentials};
|
use pile_config::{Label, S3Credentials, pattern::GroupPattern};
|
||||||
use smartstring::{LazyCompact, SmartString};
|
use smartstring::{LazyCompact, SmartString};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
@@ -12,8 +12,8 @@ pub struct S3DataSource {
|
|||||||
pub name: Label,
|
pub name: Label,
|
||||||
pub bucket: SmartString<LazyCompact>,
|
pub bucket: SmartString<LazyCompact>,
|
||||||
pub prefix: Option<SmartString<LazyCompact>>,
|
pub prefix: Option<SmartString<LazyCompact>>,
|
||||||
pub sidecars: bool,
|
|
||||||
pub client: Arc<aws_sdk_s3::Client>,
|
pub client: Arc<aws_sdk_s3::Client>,
|
||||||
|
pub pattern: GroupPattern,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl S3DataSource {
|
impl S3DataSource {
|
||||||
@@ -24,7 +24,7 @@ impl S3DataSource {
|
|||||||
endpoint: Option<String>,
|
endpoint: Option<String>,
|
||||||
region: String,
|
region: String,
|
||||||
credentials: &S3Credentials,
|
credentials: &S3Credentials,
|
||||||
sidecars: bool,
|
pattern: GroupPattern,
|
||||||
) -> Result<Self, std::io::Error> {
|
) -> Result<Self, std::io::Error> {
|
||||||
let client = {
|
let client = {
|
||||||
let creds = Credentials::new(
|
let creds = Credentials::new(
|
||||||
@@ -51,8 +51,8 @@ impl S3DataSource {
|
|||||||
name: name.clone(),
|
name: name.clone(),
|
||||||
bucket: bucket.into(),
|
bucket: bucket.into(),
|
||||||
prefix: prefix.map(|x| x.into()),
|
prefix: prefix.map(|x| x.into()),
|
||||||
sidecars,
|
|
||||||
client: Arc::new(client),
|
client: Arc::new(client),
|
||||||
|
pattern,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -99,36 +99,17 @@ impl S3DataSource {
|
|||||||
|
|
||||||
let mime = mime_guess::from_path(object_path.as_str()).first_or_octet_stream();
|
let mime = mime_guess::from_path(object_path.as_str()).first_or_octet_stream();
|
||||||
|
|
||||||
let sidecar = if self.sidecars {
|
|
||||||
self.find_sidecar_key(object_path.as_str())
|
|
||||||
.await
|
|
||||||
.map(|sidecar_key| {
|
|
||||||
Box::new(Item::S3 {
|
|
||||||
source: Arc::clone(self),
|
|
||||||
mime: mime_guess::from_path(sidecar_key.as_str()).first_or_octet_stream(),
|
|
||||||
key: sidecar_key,
|
|
||||||
sidecar: None,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
Item::S3 {
|
Item::S3 {
|
||||||
source: Arc::clone(self),
|
source: Arc::clone(self),
|
||||||
mime,
|
mime,
|
||||||
key,
|
key,
|
||||||
sidecar,
|
group: todo!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DataSource for Arc<S3DataSource> {
|
impl DataSource for Arc<S3DataSource> {
|
||||||
async fn get(&self, key: &str) -> Result<Option<Item>, std::io::Error> {
|
async fn get(&self, key: &str) -> Result<Option<Item>, std::io::Error> {
|
||||||
if self.sidecars && key.ends_with(".toml") {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
let key: SmartString<LazyCompact> = key.into();
|
let key: SmartString<LazyCompact> = key.into();
|
||||||
let key = match &self.prefix {
|
let key = match &self.prefix {
|
||||||
Some(x) => format!("{x}/{key}").into(),
|
Some(x) => format!("{x}/{key}").into(),
|
||||||
@@ -196,10 +177,6 @@ impl DataSource for Arc<S3DataSource> {
|
|||||||
None => continue,
|
None => continue,
|
||||||
};
|
};
|
||||||
|
|
||||||
if source.sidecars && key.ends_with(".toml") {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let item = source.make_item(key).await;
|
let item = source.make_item(key).await;
|
||||||
|
|
||||||
if tx.send(Ok(item)).await.is_err() {
|
if tx.send(Ok(item)).await.is_err() {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use mime::Mime;
|
use mime::Mime;
|
||||||
|
use pile_config::Label;
|
||||||
use smartstring::{LazyCompact, SmartString};
|
use smartstring::{LazyCompact, SmartString};
|
||||||
use std::{fs::File, path::PathBuf, sync::Arc};
|
use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
source::{DirDataSource, S3DataSource},
|
source::{DirDataSource, S3DataSource},
|
||||||
@@ -19,7 +20,7 @@ pub enum Item {
|
|||||||
mime: Mime,
|
mime: Mime,
|
||||||
|
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
sidecar: Option<Box<Item>>,
|
group: Arc<HashMap<Label, Box<Item>>>,
|
||||||
},
|
},
|
||||||
|
|
||||||
S3 {
|
S3 {
|
||||||
@@ -27,7 +28,7 @@ pub enum Item {
|
|||||||
mime: Mime,
|
mime: Mime,
|
||||||
|
|
||||||
key: SmartString<LazyCompact>,
|
key: SmartString<LazyCompact>,
|
||||||
sidecar: Option<Box<Item>>,
|
group: Arc<HashMap<Label, Box<Item>>>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,10 +97,10 @@ impl Item {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn sidecar(&self) -> Option<&Self> {
|
pub fn group(&self) -> &HashMap<Label, Box<Self>> {
|
||||||
match self {
|
match self {
|
||||||
Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
|
Self::File { group, .. } => group,
|
||||||
Self::S3 { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
|
Self::S3 { group, .. } => group,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,108 +0,0 @@
|
|||||||
use anyhow::{Context, Result};
|
|
||||||
use clap::Args;
|
|
||||||
use pile_config::{Label, Source};
|
|
||||||
use pile_dataset::{Datasets, index::DbFtsIndex};
|
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
|
||||||
use pile_value::{
|
|
||||||
extract::traits::ExtractState,
|
|
||||||
source::{DataSource, DirDataSource},
|
|
||||||
value::{Item, PileValue},
|
|
||||||
};
|
|
||||||
use std::{path::PathBuf, sync::Arc};
|
|
||||||
use tokio_stream::StreamExt;
|
|
||||||
use tracing::{info, warn};
|
|
||||||
|
|
||||||
use crate::{CliCmd, GlobalContext};
|
|
||||||
|
|
||||||
#[derive(Debug, Args)]
|
|
||||||
pub struct AnnotateCommand {
|
|
||||||
/// The schema field to read (must be defined in pile.toml)
|
|
||||||
field: String,
|
|
||||||
|
|
||||||
/// Sidecar path to write to (e.g. meta.title)
|
|
||||||
dest: String,
|
|
||||||
|
|
||||||
/// Path to dataset config
|
|
||||||
#[arg(long, short = 'c', default_value = "./pile.toml")]
|
|
||||||
config: PathBuf,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AnnotateCommand {
|
|
||||||
fn parse_dest(dest: &str) -> Result<Vec<Label>> {
|
|
||||||
dest.split('.')
|
|
||||||
.map(|s| {
|
|
||||||
Label::new(s).ok_or_else(|| anyhow::anyhow!("invalid label {s:?} in dest path"))
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CliCmd for AnnotateCommand {
|
|
||||||
async fn run(
|
|
||||||
self,
|
|
||||||
_ctx: GlobalContext,
|
|
||||||
_flag: CancelFlag,
|
|
||||||
) -> Result<i32, CancelableTaskError<anyhow::Error>> {
|
|
||||||
let field = Label::new(&self.field)
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("invalid field name {:?}", self.field))?;
|
|
||||||
let dest_path = Self::parse_dest(&self.dest)?;
|
|
||||||
|
|
||||||
let state = ExtractState { ignore_mime: false };
|
|
||||||
let ds = Datasets::open(&self.config)
|
|
||||||
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
|
||||||
|
|
||||||
if !ds.config.schema.contains_key(&field) {
|
|
||||||
return Err(anyhow::anyhow!("field {:?} is not defined in schema", self.field).into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let index = DbFtsIndex::new(&ds.path_workdir, &ds.config);
|
|
||||||
let count = 0u64;
|
|
||||||
|
|
||||||
for (name, source) in &ds.config.dataset.source {
|
|
||||||
match source {
|
|
||||||
Source::Filesystem { path, sidecars, .. } => {
|
|
||||||
if !sidecars {
|
|
||||||
warn!("Source {name} does not have sidecars enabled, skipping");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let source = Arc::new(DirDataSource::new(name, path.clone(), *sidecars));
|
|
||||||
|
|
||||||
let mut stream = source.iter();
|
|
||||||
while let Some(res) = stream.next().await {
|
|
||||||
let item = res.with_context(|| format!("while reading source {name}"))?;
|
|
||||||
|
|
||||||
let Item::File { path, .. } = &item else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
let item = PileValue::Item(item.clone());
|
|
||||||
let vals =
|
|
||||||
index
|
|
||||||
.get_field(&state, &item, &field)
|
|
||||||
.await
|
|
||||||
.with_context(|| {
|
|
||||||
format!("while extracting field from {}", path.display())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// TODO: implement sidecar writing
|
|
||||||
let _ = (&dest_path, &vals);
|
|
||||||
todo!("write_sidecar not yet implemented");
|
|
||||||
|
|
||||||
#[expect(unreachable_code)]
|
|
||||||
{
|
|
||||||
count += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Source::S3 { .. } => {
|
|
||||||
warn!("Source {name} is an S3 source; sidecar annotation is not yet supported");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Annotated {count} items");
|
|
||||||
|
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -4,7 +4,6 @@ use pile_toolbox::cancelabletask::{
|
|||||||
CancelFlag, CancelableTask, CancelableTaskError, CancelableTaskResult,
|
CancelFlag, CancelableTask, CancelableTaskError, CancelableTaskResult,
|
||||||
};
|
};
|
||||||
|
|
||||||
mod annotate;
|
|
||||||
mod check;
|
mod check;
|
||||||
mod fields;
|
mod fields;
|
||||||
mod index;
|
mod index;
|
||||||
@@ -23,12 +22,6 @@ pub enum SubCommand {
|
|||||||
#[clap(alias = "doc")]
|
#[clap(alias = "doc")]
|
||||||
Docs {},
|
Docs {},
|
||||||
|
|
||||||
/// Annotate all items with a field, writing it to a sidecar path
|
|
||||||
Annotate {
|
|
||||||
#[command(flatten)]
|
|
||||||
cmd: annotate::AnnotateCommand,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// Create an empty dataset
|
/// Create an empty dataset
|
||||||
Init {
|
Init {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
@@ -88,7 +81,6 @@ pub enum SubCommand {
|
|||||||
impl CliCmdDispatch for SubCommand {
|
impl CliCmdDispatch for SubCommand {
|
||||||
fn start(self, ctx: GlobalContext) -> Result<CancelableTask<Result<i32>>> {
|
fn start(self, ctx: GlobalContext) -> Result<CancelableTask<Result<i32>>> {
|
||||||
match self {
|
match self {
|
||||||
Self::Annotate { cmd } => cmd.start(ctx),
|
|
||||||
Self::Init { cmd } => cmd.start(ctx),
|
Self::Init { cmd } => cmd.start(ctx),
|
||||||
Self::Check { cmd } => cmd.start(ctx),
|
Self::Check { cmd } => cmd.start(ctx),
|
||||||
Self::Index { cmd } => cmd.start(ctx),
|
Self::Index { cmd } => cmd.start(ctx),
|
||||||
|
|||||||
Reference in New Issue
Block a user