Compare commits

...

10 Commits

Author SHA1 Message Date
95a547045d Refactor errors
Some checks failed
CI / Typos (push) Successful in 28s
CI / Clippy (push) Failing after 1m21s
CI / Build and test (all features) (push) Successful in 4m18s
CI / Build and test (push) Successful in 6m10s
2026-03-12 23:04:59 -07:00
15e56d895c FLAC image extractor tweak 2026-03-12 11:50:02 -07:00
f3bb1a265e Improve arg parsing
Some checks failed
CI / Typos (push) Successful in 20s
CI / Build and test (push) Successful in 2m28s
CI / Clippy (push) Failing after 2m50s
CI / Build and test (all features) (push) Successful in 7m27s
2026-03-11 12:54:02 -07:00
8a9388020c Fix tests
Some checks are pending
CI / Typos (push) Successful in 22s
CI / Clippy (push) Has started running
CI / Build and test (push) Has started running
CI / Build and test (all features) (push) Has started running
2026-03-11 12:24:45 -07:00
82dcdbaa27 Bump versions
Some checks failed
CI / Build and test (all features) (push) Waiting to run
CI / Typos (push) Successful in 41s
CI / Clippy (push) Failing after 1m28s
CI / Build and test (push) Has been cancelled
2026-03-11 12:22:02 -07:00
42f186d77f More string extractors 2026-03-11 12:22:02 -07:00
b36b62150c Add nix files 2026-03-11 12:21:58 -07:00
c03fac0e37 /item range requests 2026-03-11 11:05:32 -07:00
c9d99e8719 Stream items in /item 2026-03-11 11:05:32 -07:00
4546a85bd3 Extractor rewrite 2026-03-11 11:05:30 -07:00
60 changed files with 1704 additions and 1017 deletions

13
.editorconfig Normal file
View File

@@ -0,0 +1,13 @@
root = true
[*]
indent_style = tab
indent_size = 4
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
[*.yml]
indent_style = space
indent_size = 2

55
Cargo.lock generated
View File

@@ -2481,7 +2481,7 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "pile"
version = "0.0.1"
version = "0.0.2"
dependencies = [
"anstyle",
"anyhow",
@@ -2491,6 +2491,7 @@ dependencies = [
"pile-config",
"pile-dataset",
"pile-toolbox",
"pile-value",
"serde",
"serde_json",
"tokio",
@@ -2503,7 +2504,7 @@ dependencies = [
[[package]]
name = "pile-config"
version = "0.0.1"
version = "0.0.2"
dependencies = [
"serde",
"smartstring",
@@ -2513,28 +2514,15 @@ dependencies = [
[[package]]
name = "pile-dataset"
version = "0.0.1"
version = "0.0.2"
dependencies = [
"async-trait",
"aws-sdk-s3",
"axum",
"blake3",
"chrono",
"epub",
"id3",
"image",
"itertools 0.14.0",
"kamadak-exif",
"mime",
"mime_guess",
"pdf",
"pdfium-render",
"pile-config",
"pile-flac",
"pile-toolbox",
"pile-value",
"serde",
"serde_json",
"smartstring",
"tantivy",
"thiserror",
"tokio",
@@ -2543,12 +2531,11 @@ dependencies = [
"tracing",
"utoipa",
"utoipa-swagger-ui",
"walkdir",
]
[[package]]
name = "pile-flac"
version = "0.0.1"
version = "0.0.2"
dependencies = [
"base64",
"itertools 0.14.0",
@@ -2562,12 +2549,40 @@ dependencies = [
[[package]]
name = "pile-toolbox"
version = "0.0.1"
version = "0.0.2"
dependencies = [
"thiserror",
"tokio",
]
[[package]]
name = "pile-value"
version = "0.0.2"
dependencies = [
"anyhow",
"async-trait",
"aws-sdk-s3",
"blake3",
"chrono",
"epub",
"id3",
"image",
"kamadak-exif",
"mime",
"mime_guess",
"pdf",
"pdfium-render",
"pile-config",
"pile-flac",
"serde_json",
"smartstring",
"tokio",
"tokio-stream",
"toml",
"tracing",
"walkdir",
]
[[package]]
name = "pin-project-lite"
version = "0.2.16"

View File

@@ -5,7 +5,7 @@ resolver = "2"
[workspace.package]
rust-version = "1.94.0"
edition = "2024"
version = "0.0.1"
version = "0.0.2"
[workspace.lints.rust]
unused_import_braces = "deny"
@@ -67,20 +67,21 @@ pile-toolbox = { path = "crates/pile-toolbox" }
pile-config = { path = "crates/pile-config" }
pile-flac = { path = "crates/pile-flac" }
pile-dataset = { path = "crates/pile-dataset" }
pile-value = { path = "crates/pile-value" }
# Clients & servers
tantivy = "0.25.0"
axum = { version = "0.8.8", features = ["macros", "multipart"] }
utoipa = { version = "5.4.0", features = [
"axum_extras",
"chrono",
"url",
"uuid",
"axum_extras",
"chrono",
"url",
"uuid",
] }
utoipa-swagger-ui = { version = "9.0.2", features = [
"axum",
"debug-embed",
"vendored",
"axum",
"debug-embed",
"vendored",
] }
# Async & Parallelism

26
bump-version.sh Executable file
View File

@@ -0,0 +1,26 @@
#!/usr/bin/env bash
# Interactively bump the workspace version in Cargo.toml and default.nix.
set -euo pipefail

CARGO_TOML="$(dirname "$0")/Cargo.toml"
DEFAULT_NIX="$(dirname "$0")/default.nix"

# Fail early, before editing anything, if either target file is missing.
for f in "$CARGO_TOML" "$DEFAULT_NIX"; do
    if [[ ! -f "$f" ]]; then
        echo "Missing file: $f" >&2
        exit 1
    fi
done

# Read current version from workspace Cargo.toml (first `version = "..."` line).
# sed -n/p never fails the pipeline, so the empty-check below is reachable
# even under `set -o pipefail`.
current=$(sed -n 's/^version = "\(.*\)"/\1/p' "$CARGO_TOML" | head -1)
if [[ -z "$current" ]]; then
    echo "Could not find a version line in $CARGO_TOML" >&2
    exit 1
fi
echo "Current version: $current"

read -rp "New version: " new
if [[ -z "$new" ]]; then
    echo "No version entered. Aborting." >&2
    exit 1
fi

# Escape BRE metacharacters in the current version (the dots in "0.0.1" would
# otherwise match any character) so sed replaces the literal string only.
current_re=$(printf '%s\n' "$current" | sed 's/[][\.*^$]/\\&/g')

# Update Cargo.toml workspace version
sed -i "s/^version = \"$current_re\"/version = \"$new\"/" "$CARGO_TOML"
# Update default.nix version field
sed -i "s/version = \"$current_re\";/version = \"$new\";/" "$DEFAULT_NIX"

echo "Bumped $current -> $new in:"
echo "  $CARGO_TOML"
echo "  $DEFAULT_NIX"

View File

@@ -9,7 +9,7 @@ name = "dataset"
# working_dir = ".pile"
# Data sources available in this dataset
source."music" = { type = "filesystem", path = "music" }
source."music" = { type = "filesystem", path = "library" }
# This dataset's schema.
# Defines normalized fields that are extracted from source entries on-demand.
@@ -21,18 +21,18 @@ source."music" = { type = "filesystem", path = "music" }
# # only text is supported in this version.
# type = "text",
#
# # An array of jsonpaths (rfc9535) used to extract this field from each source entry.
# # How to extract this field from each source entry.
# # These are evaluated in order, the first non-null value is used.
# # A single string is equivalent to an array with one element.
# path = "$.json.path"
# path = [ "$.json.path" ]
# }
[schema]
album = { type = "text", path = "$.Album" }
isrc = { type = "text", path = "$.Isrc" }
artist = { type = "text", path = ["$.Artist", "$.TrackArtist"] }
lyrics = { type = "text", path = "$.Lyrics" }
genre = { type = "text", path = "$.Genre" }
title = { type = "text", path = ["$.Title", "$.TrackTitle"] }
album = { type = "text", path = ["$.flac.album"] }
isrc = { type = "text", path = ["$.flac.isrc"] }
artist = { type = "text", path = ["$.flac.artist", "$.flac.trackartist"] }
lyrics = { type = "text", path = ["$.flac.lyrics"] }
genre = { type = "text", path = ["$.flac.genre"] }
title = { type = "text", path = ["$.flac.tracktitle", "$.flac.title"] }
# Fts configuration.
# Determines which fields (defined in `schema`) are included in the fts index.

View File

@@ -1,9 +1,6 @@
use serde::Deserialize;
use std::{collections::HashMap, fmt::Debug, path::PathBuf};
mod post;
pub use post::*;
mod misc;
pub use misc::*;
@@ -40,10 +37,6 @@ pub struct DatasetConfig {
/// Where to find this field
pub source: HashMap<Label, Source>,
/// How to post-process this field
#[serde(default)]
pub post: Vec<FieldSpecPost>,
}
#[derive(Debug, Clone, Deserialize)]
@@ -100,10 +93,6 @@ pub struct FieldSpec {
/// How to find this field in a data entry
pub path: Vec<ObjectPath>,
/// How to post-process this field
#[serde(default)]
pub post: Vec<FieldSpecPost>,
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]

View File

@@ -41,8 +41,11 @@ pub enum PathSegment {
/// Go to root node (`$` identifier)
Root,
/// Go to a child of the current object
Field(Label),
/// Go to a child of the current object.
Field {
name: Label,
args: Option<SmartString<LazyCompact>>,
},
/// Go to an element of the current list
Index(i64),

View File

@@ -1,10 +1,80 @@
use std::str::FromStr;
use smartstring::{LazyCompact, SmartString};
use crate::{
Label,
objectpath::{PathParseError, PathSegment, tokenizer::Token},
};
/// Parse an ident token into a `PathSegment::Field`, handling optional args of
/// the form `name(args)`. Parens inside args may be nested; `\(` and `\)` are
/// escaped and do not affect depth counting.
///
/// * `ident` — the raw ident token text, e.g. `foo` or `foo(a(b)c)`.
/// * `position` — the token's offset in the source path; error positions are
///   reported relative to it.
///
/// Errors:
/// * `PathParseError::InvalidField` if the name part is not a valid `Label`.
/// * `PathParseError::Syntax` if the `(` is never closed, or if anything
///   follows the closing `)`.
fn parse_field(ident: &str, position: usize) -> Result<PathSegment, PathParseError> {
    let bytes = ident.as_bytes();
    let mut i = 0;
    // Find the first unescaped '(' — everything before it is the name.
    // Byte-wise scanning is safe here: we only compare against ASCII bytes,
    // which never appear inside a multi-byte UTF-8 sequence.
    let open_paren: Option<usize> = loop {
        if i >= bytes.len() {
            break None;
        }
        match bytes[i] {
            // Skip escaped character. May step past the end on a trailing
            // backslash; the `i >= bytes.len()` check above handles that.
            b'\\' => i += 2,
            b'(' => break Some(i),
            _ => i += 1,
        }
    };
    // Slicing at an ASCII '(' index is always a valid char boundary.
    let name_str = &ident[..open_paren.unwrap_or(bytes.len())];
    let name = Label::new(name_str).ok_or_else(|| PathParseError::InvalidField {
        position,
        str: name_str.into(),
    })?;
    // No '(' at all: a plain field with no args.
    let Some(open_pos) = open_paren else {
        return Ok(PathSegment::Field { name, args: None });
    };
    // Scan args, tracking paren depth. Depth starts at 1 for the '(' we
    // already consumed; the matching ')' brings it back to 0.
    let args_start = open_pos + 1;
    let mut depth: usize = 1;
    let mut j = args_start;
    while j < bytes.len() {
        match bytes[j] {
            b'\\' => j += 2, // skip escaped character
            b'(' => {
                depth += 1;
                j += 1;
            }
            b')' => {
                depth -= 1;
                if depth == 0 {
                    // Closing paren must be the last character.
                    if j + 1 != bytes.len() {
                        return Err(PathParseError::Syntax {
                            position: position + j + 1,
                        });
                    }
                    // Args are kept verbatim, escapes included; unescaping is
                    // left to whoever consumes them.
                    let args: SmartString<LazyCompact> = ident[args_start..j].into();
                    return Ok(PathSegment::Field {
                        name,
                        args: Some(args),
                    });
                }
                j += 1;
            }
            _ => j += 1,
        }
    }
    // Reached end of ident without finding the matching ')'.
    Err(PathParseError::Syntax {
        position: position + ident.len(),
    })
}
enum State {
Start,
@@ -72,14 +142,7 @@ impl Parser {
// MARK: dot
//
(State::Dot, (p, Token::Ident(ident))) => {
self.segments
.push(PathSegment::Field(Label::new(*ident).ok_or_else(|| {
PathParseError::InvalidField {
position: *p,
str: (*ident).into(),
}
})?));
self.segments.push(parse_field(ident, *p)?);
self.state = State::Selected;
}
@@ -161,27 +224,30 @@ mod tests {
parse_test("$", Ok(&[PathSegment::Root]));
}
/// Test helper: build a `PathSegment::Field` with no args.
/// Panics (unwrap) if `name` is not a valid `Label` — fine in tests.
fn field(name: &str) -> PathSegment {
    PathSegment::Field {
        name: Label::new(name).unwrap(),
        args: None,
    }
}

/// Test helper: build a `PathSegment::Field` carrying the given args string
/// (the verbatim text between the parens, escapes included).
fn field_args(name: &str, args: &str) -> PathSegment {
    PathSegment::Field {
        name: Label::new(name).unwrap(),
        args: Some(args.into()),
    }
}
#[test]
fn single_field() {
parse_test(
"$.foo",
Ok(&[
PathSegment::Root,
PathSegment::Field(Label::new("foo").unwrap()),
]),
);
parse_test("$.foo", Ok(&[PathSegment::Root, field("foo")]));
}
#[test]
fn nested_fields() {
parse_test(
"$.foo.bar.baz",
Ok(&[
PathSegment::Root,
PathSegment::Field(Label::new("foo").unwrap()),
PathSegment::Field(Label::new("bar").unwrap()),
PathSegment::Field(Label::new("baz").unwrap()),
]),
Ok(&[PathSegment::Root, field("foo"), field("bar"), field("baz")]),
);
}
@@ -189,11 +255,7 @@ mod tests {
fn array_index() {
parse_test(
"$.items[0]",
Ok(&[
PathSegment::Root,
PathSegment::Field(Label::new("items").unwrap()),
PathSegment::Index(0),
]),
Ok(&[PathSegment::Root, field("items"), PathSegment::Index(0)]),
);
}
@@ -203,7 +265,7 @@ mod tests {
"$.a[1][2]",
Ok(&[
PathSegment::Root,
PathSegment::Field(Label::new("a").unwrap()),
field("a"),
PathSegment::Index(1),
PathSegment::Index(2),
]),
@@ -216,9 +278,9 @@ mod tests {
"$.a[0].b",
Ok(&[
PathSegment::Root,
PathSegment::Field(Label::new("a").unwrap()),
field("a"),
PathSegment::Index(0),
PathSegment::Field(Label::new("b").unwrap()),
field("b"),
]),
);
}
@@ -227,14 +289,94 @@ mod tests {
fn negative_index() {
parse_test(
"$.a[-1]",
Ok(&[PathSegment::Root, field("a"), PathSegment::Index(-1)]),
);
}
// MARK: args
#[test]
fn field_with_simple_args() {
parse_test(
"$.foo(bar)",
Ok(&[PathSegment::Root, field_args("foo", "bar")]),
);
}
#[test]
fn field_with_empty_args() {
parse_test("$.foo()", Ok(&[PathSegment::Root, field_args("foo", "")]));
}
#[test]
fn field_with_nested_parens_in_args() {
parse_test(
"$.foo(a(b)c)",
Ok(&[PathSegment::Root, field_args("foo", "a(b)c")]),
);
}
#[test]
fn field_with_deeply_nested_parens_in_args() {
parse_test(
"$.foo(a(b(c))d)",
Ok(&[PathSegment::Root, field_args("foo", "a(b(c))d")]),
);
}
#[test]
fn field_with_escaped_open_paren_in_args() {
// "$.foo(a\(b)" — '\(' is escaped, so depth never rises above 1; ')' closes it
parse_test(
r"$.foo(a\(b)",
Ok(&[PathSegment::Root, field_args("foo", r"a\(b")]),
);
}
#[test]
fn field_with_escaped_close_paren_in_args() {
// "$.foo(a\)b)" — '\)' is escaped, the second ')' closes at depth 0
parse_test(
r"$.foo(a\)b)",
Ok(&[PathSegment::Root, field_args("foo", r"a\)b")]),
);
}
#[test]
fn field_with_both_escaped_parens_in_args() {
parse_test(
r"$.foo(a\(b\)c)",
Ok(&[PathSegment::Root, field_args("foo", r"a\(b\)c")]),
);
}
#[test]
fn field_args_with_multiple_segments() {
    // Each dotted segment may carry its own args. The expected list must
    // contain exactly the segments of "$.foo(x).bar(y)" — no more: the
    // previous expectation included stray `a` / `[-1]` segments (using the
    // removed tuple-variant `PathSegment::Field(..)`) that correspond to no
    // part of this input.
    parse_test(
        "$.foo(x).bar(y)",
        Ok(&[
            PathSegment::Root,
            field_args("foo", "x"),
            field_args("bar", "y"),
        ]),
    );
}
#[test]
fn field_args_unclosed_paren_error() {
// Missing closing ')' → Syntax error at end of source
parse_test("$.foo(bar", Err(PathParseError::Syntax { position: 9 }));
}
#[test]
fn field_args_trailing_chars_after_close_error() {
// Closing ')' is not the last char → Syntax error at the trailing char
parse_test(
"$.foo(bar)baz",
Err(PathParseError::Syntax { position: 10 }),
);
}
#[test]
fn non_ascii_error() {
parse_test(

View File

@@ -1,18 +0,0 @@
use serde::Deserialize;
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub enum FieldSpecPost {
TrimSuffix { trim_suffix: String },
TrimPrefix { trim_prefix: String },
SetCase { case: Case },
Join { join: String },
NotEmpty { notempty: bool },
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Case {
Lower,
Upper,
}

View File

@@ -10,37 +10,23 @@ workspace = true
[dependencies]
pile-config = { workspace = true }
pile-toolbox = { workspace = true }
pile-flac = { workspace = true }
pile-value = { workspace = true }
serde_json = { workspace = true }
itertools = { workspace = true }
walkdir = { workspace = true }
tantivy = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }
toml = { workspace = true }
thiserror = { workspace = true }
smartstring = { workspace = true }
blake3 = { workspace = true }
epub = { workspace = true }
kamadak-exif = { workspace = true }
pdf = { workspace = true }
pdfium-render = { workspace = true, optional = true }
image = { workspace = true, optional = true }
id3 = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
async-trait = { workspace = true }
aws-sdk-s3 = { workspace = true }
mime = { workspace = true }
mime_guess = { workspace = true }
serde = { workspace = true }
serde = { workspace = true, optional = true }
axum = { workspace = true, optional = true }
utoipa = { workspace = true, optional = true }
utoipa-swagger-ui = { workspace = true, optional = true }
[features]
default = []
pdfium = ["dep:pdfium-render", "dep:image"]
axum = ["dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui"]
pdfium = ["pile-value/pdfium"]
axum = ["dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui", "dep:serde"]

View File

@@ -1,6 +1,10 @@
use chrono::{DateTime, Utc};
use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{
source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest},
value::{Item, PileValue},
};
use serde_json::Value;
use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant};
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
@@ -9,13 +13,7 @@ use tokio::task::JoinSet;
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tracing::{debug, info, trace, warn};
use crate::{
DataSource, Item, PileValue,
extract::MetaExtractor,
index::{DbFtsIndex, FtsLookupResult},
path_ts_earliest,
source::{DirDataSource, S3DataSource},
};
use crate::index::{DbFtsIndex, FtsLookupResult};
#[derive(Debug, Error)]
pub enum DatasetError {
@@ -183,11 +181,12 @@ impl Datasets {
let Some(item) = self.get(source, key).await else {
return Ok(None);
};
let extractor = MetaExtractor::new(&item);
let root = PileValue::ObjectExtractor(Arc::new(extractor));
let Some(value) = root.query(path).await? else {
let item = PileValue::Item(item);
let Some(value) = item.query(path).await? else {
return Ok(None);
};
Ok(Some(value.to_json().await?))
}

View File

@@ -1,162 +0,0 @@
use mime::Mime;
use pile_config::Label;
use pile_flac::{FlacBlock, FlacReader};
use std::{
collections::HashMap,
io::BufReader,
sync::{Arc, OnceLock},
};
use crate::{
Item, PileValue, SyncReadBridge,
extract::{ListExtractor, ObjectExtractor},
};
pub struct FlacImagesExtractor {
item: Item,
}
impl FlacImagesExtractor {
pub fn new(item: &Item) -> Self {
Self { item: item.clone() }
}
async fn get_images(&self) -> Result<Vec<PileValue>, std::io::Error> {
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_images = tokio::task::spawn_blocking(move || {
let reader = FlacReader::new(BufReader::new(reader));
let mut images: Vec<(Mime, Vec<u8>)> = Vec::new();
for block in reader {
match block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? {
FlacBlock::Picture(picture) => {
images.push((picture.mime, picture.img_data));
}
FlacBlock::AudioFrame(_) => break,
_ => {}
}
}
Ok::<_, std::io::Error>(images)
})
.await
.map_err(std::io::Error::other)??;
Ok(raw_images
.into_iter()
.map(|(mime, data)| PileValue::Blob {
mime,
bytes: Arc::new(data),
})
.collect())
}
}
#[async_trait::async_trait]
impl ListExtractor for FlacImagesExtractor {
async fn get<'a>(&'a self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_images().await?.into_iter().nth(idx))
}
async fn len(&self) -> Result<usize, std::io::Error> {
Ok(self.get_images().await?.len())
}
}
pub struct FlacExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
images: Option<PileValue>,
}
impl FlacExtractor {
pub fn new(item: &Item) -> Self {
let is_flac = match item {
Item::File { path, .. } => path.to_str().unwrap_or_default().ends_with(".flac"),
Item::S3 { key, .. } => key.ends_with(".flac"),
};
let images =
is_flac.then(|| PileValue::ListExtractor(Arc::new(FlacImagesExtractor::new(item))));
Self {
item: item.clone(),
output: OnceLock::new(),
images,
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let key = match &self.item {
Item::File { path, .. } => path.to_str().unwrap_or_default().to_owned(),
Item::S3 { key, .. } => key.to_string(),
};
if !key.ends_with(".flac") {
let _ = self.output.set(HashMap::new());
#[expect(clippy::unwrap_used)]
return Ok(self.output.get().unwrap());
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_tags = tokio::task::spawn_blocking(move || {
let reader = FlacReader::new(BufReader::new(reader));
let mut tags: Vec<(String, String)> = Vec::new();
for block in reader {
match block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? {
FlacBlock::VorbisComment(comment) => {
for (k, v) in comment.comment.comments {
tags.push((k.to_string().to_lowercase(), v.into()));
}
}
FlacBlock::AudioFrame(_) => break,
_ => {}
}
}
Ok::<_, std::io::Error>(tags)
})
.await
.map_err(std::io::Error::other)??;
let mut output: HashMap<Label, Vec<PileValue>> = HashMap::new();
for (k, v) in raw_tags {
if let Some(label) = Label::new(k) {
output
.entry(label)
.or_default()
.push(PileValue::String(Arc::new(v.into())));
}
}
let output: HashMap<Label, PileValue> = output
.into_iter()
.map(|(k, v)| (k, PileValue::Array(Arc::new(v))))
.collect();
let _ = self.output.set(output);
#[expect(clippy::unwrap_used)]
return Ok(self.output.get().unwrap());
}
}
#[async_trait::async_trait]
impl ObjectExtractor for FlacExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
if name.as_str() == "images"
&& let Some(ref images) = self.images
{
return Ok(Some(images.clone()));
}
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
let mut fields = self.get_inner().await?.keys().cloned().collect::<Vec<_>>();
if self.images.is_some() {
#[expect(clippy::unwrap_used)]
fields.push(Label::new("images").unwrap());
}
Ok(fields)
}
}

View File

@@ -1,165 +0,0 @@
use pile_config::Label;
use std::{collections::HashMap, sync::Arc};
mod flac;
pub use flac::*;
mod id3;
pub use id3::*;
mod fs;
pub use fs::*;
mod epub;
pub use epub::*;
mod exif;
pub use exif::*;
mod pdf;
pub use pdf::*;
mod toml;
pub use toml::*;
mod map;
pub use map::*;
mod sidecar;
pub use sidecar::*;
use crate::{Item, PileValue};
/// An attachment that extracts metadata from an [Item].
///
/// Metadata is exposed as an immutable map of {label: value},
/// much like a json object.
#[async_trait::async_trait]
pub trait ObjectExtractor: Send + Sync {
/// Get the field at `name` from `item`.
/// - returns `None` if `name` is not a valid field
/// - returns `Some(Null)` if `name` is not available
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error>;
/// Return all fields in this extractor.
/// `Self::field` must return [Some] for all these keys
/// and [None] for all others.
async fn fields(&self) -> Result<Vec<Label>, std::io::Error>;
/// Convert this to a JSON value.
async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
let keys = self.fields().await?;
let mut map = serde_json::Map::new();
for k in &keys {
let v = match self.field(k).await? {
Some(x) => x,
None => continue,
};
map.insert(k.to_string(), Box::pin(v.to_json()).await?);
}
Ok(serde_json::Value::Object(map))
}
}
/// An attachment that extracts metadata from an [Item].
///
/// Metadata is exposed as an immutable list of values.
#[async_trait::async_trait]
pub trait ListExtractor: Send + Sync {
/// Get the item at index `idx`.
/// Indices start at zero, and must be consecutive.
/// - returns `None` if `idx` is out of range
/// - returns `Some(Null)` if `None` is at `idx`
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error>;
async fn len(&self) -> Result<usize, std::io::Error>;
async fn is_empty(&self) -> Result<bool, std::io::Error> {
Ok(self.len().await? == 0)
}
/// Convert this list to a JSON value.
async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
let len = self.len().await?;
let mut list = Vec::with_capacity(len);
for i in 0..len {
#[expect(clippy::expect_used)]
let v = self
.get(i)
.await?
.expect("value must be present according to length");
list.push(Box::pin(v.to_json()).await?);
}
Ok(serde_json::Value::Array(list))
}
}
pub struct MetaExtractor {
inner: MapExtractor,
}
impl MetaExtractor {
#[expect(clippy::unwrap_used)]
pub fn new(item: &Item) -> Self {
let inner = MapExtractor {
inner: HashMap::from([
(
Label::new("flac").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(FlacExtractor::new(item))),
),
(
Label::new("id3").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(Id3Extractor::new(item))),
),
(
Label::new("fs").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(FsExtractor::new(item))),
),
(
Label::new("epub").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(EpubExtractor::new(item))),
),
(
Label::new("exif").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(ExifExtractor::new(item))),
),
(
Label::new("pdf").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(PdfExtractor::new(item))),
),
(
Label::new("toml").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))),
),
(
Label::new("sidecar").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(SidecarExtractor::new(item))),
),
]),
};
Self { inner }
}
}
#[async_trait::async_trait]
impl ObjectExtractor for MetaExtractor {
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
self.inner.field(name).await
}
#[expect(clippy::unwrap_used)]
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
return Ok(vec![
Label::new("flac").unwrap(),
Label::new("id3").unwrap(),
Label::new("fs").unwrap(),
Label::new("epub").unwrap(),
Label::new("exif").unwrap(),
Label::new("pdf").unwrap(),
Label::new("sidecar").unwrap(),
]);
}
}

View File

@@ -1,95 +0,0 @@
use image::ImageFormat;
use pdfium_render::prelude::*;
use pile_config::Label;
use std::{
collections::HashMap,
io::{BufReader, Cursor},
sync::{Arc, OnceLock},
};
use tracing::trace;
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
pub struct PdfCoverExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl PdfCoverExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let cover = tokio::task::spawn_blocking(move || {
let mut bytes = Vec::new();
std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?;
let pdfium = Pdfium::default();
let document = pdfium
.load_pdf_from_byte_slice(&bytes, None)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let render_config = PdfRenderConfig::new().set_target_width(1024);
let page = document
.pages()
.get(0)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let image = page
.render_with_config(&render_config)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?
.as_image();
let mut png_bytes = Vec::new();
image
.write_to(&mut Cursor::new(&mut png_bytes), ImageFormat::Png)
.map_err(|e| std::io::Error::other(e.to_string()))?;
Ok::<_, std::io::Error>(png_bytes)
})
.await
.map_err(std::io::Error::other)?;
let output = match cover {
Ok(data) => {
#[expect(clippy::unwrap_used)]
let label = Label::new("cover").unwrap();
HashMap::from([(
label,
PileValue::Blob {
mime: mime::IMAGE_PNG,
bytes: Arc::new(data),
},
)])
}
Err(error) => {
trace!(message = "Could not render pdf cover", ?error, key = ?self.item.key());
HashMap::new()
}
};
return Ok(self.output.get_or_init(|| output));
}
}
#[async_trait::async_trait]
impl ObjectExtractor for PdfCoverExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner().await?.keys().cloned().collect())
}
}

View File

@@ -1,9 +1,6 @@
use itertools::Itertools;
use pile_config::{Case, ConfigToml, DatasetFts, FieldSpecPost, Label};
use std::{
path::PathBuf,
sync::{Arc, LazyLock},
};
use pile_config::{ConfigToml, DatasetFts, Label};
use pile_value::value::{Item, PileValue};
use std::{path::PathBuf, sync::LazyLock};
use tantivy::{
DocAddress, Index, ReloadPolicy, TantivyDocument, TantivyError,
collector::Collector,
@@ -12,8 +9,6 @@ use tantivy::{
};
use tracing::{debug, trace, warn};
use crate::{Item, PileValue, extract::MetaExtractor};
#[derive(Debug, Clone)]
pub struct FtsLookupResult {
pub score: f32,
@@ -76,11 +71,11 @@ impl DbFtsIndex {
doc.add_text(self.schema.get_field("_meta_source")?, item.source_name());
doc.add_text(self.schema.get_field("_meta_key")?, key);
let extractor = PileValue::ObjectExtractor(Arc::new(MetaExtractor::new(item)));
let item = PileValue::Item(item.clone());
let mut empty = true;
for name in self.fts_cfg().fields.keys() {
let x = self.get_field(&extractor, name).await?;
let x = self.get_field(&item, name).await?;
let val = match x {
Some(x) => x,
@@ -135,13 +130,6 @@ impl DbFtsIndex {
x => x.clone(),
};
for post in &field.post {
val = match apply(post, &val) {
Some(x) => x,
None => return Ok(None),
};
}
loop {
val = match val {
PileValue::String(x) => return Ok(Some(x.to_string())),
@@ -186,6 +174,15 @@ impl DbFtsIndex {
continue 'outer;
}
PileValue::Item(_) => {
trace!(
message = "Skipping field, is item",
field = field_name.to_string(),
?path,
);
continue 'outer;
}
PileValue::ListExtractor(_) => {
trace!(
message = "Skipping field, is ListExtractor",
@@ -296,104 +293,3 @@ impl DbFtsIndex {
return Ok(out);
}
}
pub fn apply(post: &FieldSpecPost, val: &PileValue) -> Option<PileValue> {
Some(match post {
FieldSpecPost::NotEmpty { notempty: false } => val.clone(),
FieldSpecPost::NotEmpty { notempty: true } => match val {
PileValue::Null => return None,
PileValue::String(x) if x.is_empty() => return None,
PileValue::Array(x) if x.is_empty() => return None,
x => x.clone(),
},
FieldSpecPost::SetCase { case: Case::Lower } => match val {
PileValue::Null => return None,
PileValue::U64(_) => return None,
PileValue::I64(_) => return None,
PileValue::Blob { .. } => return None,
PileValue::ObjectExtractor(_) => return None,
PileValue::ListExtractor(_) => return None,
PileValue::String(x) => PileValue::String(Arc::new(x.as_str().to_lowercase().into())),
PileValue::Array(x) => PileValue::Array(Arc::new(
x.iter().map(|x| apply(post, x)).collect::<Option<_>>()?,
)),
},
FieldSpecPost::SetCase { case: Case::Upper } => match val {
PileValue::Null => return None,
PileValue::U64(_) => return None,
PileValue::I64(_) => return None,
PileValue::Blob { .. } => return None,
PileValue::ObjectExtractor(_) => return None,
PileValue::ListExtractor(_) => return None,
PileValue::String(x) => PileValue::String(Arc::new(x.as_str().to_uppercase().into())),
PileValue::Array(x) => PileValue::Array(Arc::new(
x.iter()
.map(|x| apply(post, x))
.collect::<Option<Vec<_>>>()?,
)),
},
FieldSpecPost::TrimSuffix { trim_suffix } => match val {
PileValue::Null => return None,
PileValue::U64(_) => return None,
PileValue::I64(_) => return None,
PileValue::Blob { .. } => return None,
PileValue::ObjectExtractor(_) => return None,
PileValue::ListExtractor(_) => return None,
PileValue::String(x) => PileValue::String(Arc::new(
x.strip_suffix(trim_suffix).unwrap_or(x.as_str()).into(),
)),
PileValue::Array(x) => PileValue::Array(Arc::new(
x.iter()
.map(|x| apply(post, x))
.collect::<Option<Vec<_>>>()?,
)),
},
FieldSpecPost::TrimPrefix { trim_prefix } => match val {
PileValue::Null => return None,
PileValue::U64(_) => return None,
PileValue::I64(_) => return None,
PileValue::Blob { .. } => return None,
PileValue::ObjectExtractor(_) => return None,
PileValue::ListExtractor(_) => return None,
PileValue::String(x) => PileValue::String(Arc::new(
x.strip_prefix(trim_prefix).unwrap_or(x.as_str()).into(),
)),
PileValue::Array(x) => PileValue::Array(Arc::new(
x.iter()
.map(|x| apply(post, x))
.collect::<Option<Vec<_>>>()?,
)),
},
FieldSpecPost::Join { join } => match val {
PileValue::Null => return None,
PileValue::U64(_) => return None,
PileValue::I64(_) => return None,
PileValue::Blob { .. } => return None,
PileValue::ObjectExtractor(_) => return None,
PileValue::ListExtractor(_) => return None,
PileValue::String(x) => PileValue::String(x.clone()),
PileValue::Array(x) => PileValue::String(Arc::new(
x.iter()
.map(|x| apply(post, x))
.map(|x| x.and_then(|x| x.as_str().map(|x| x.to_owned())))
.collect::<Option<Vec<_>>>()?
.into_iter()
.join(join)
.into(),
)),
},
})
}

View File

@@ -1,21 +1,7 @@
mod traits;
pub use traits::*;
mod misc;
pub use misc::*;
mod dataset;
pub use dataset::{Dataset, DatasetError, Datasets};
mod item;
pub use item::*;
mod value;
pub use value::*;
pub mod extract;
pub mod index;
pub mod source;
#[cfg(feature = "axum")]
pub mod serve;

View File

@@ -5,12 +5,13 @@ use axum::{
response::{IntoResponse, Response},
};
use pile_config::{Label, objectpath::ObjectPath};
use pile_value::value::PileValue;
use serde::Deserialize;
use std::{sync::Arc, time::Instant};
use tracing::debug;
use utoipa::ToSchema;
use crate::{Datasets, PileValue, extract::MetaExtractor};
use crate::Datasets;
#[derive(Deserialize, ToSchema)]
pub struct FieldQuery {
@@ -61,10 +62,8 @@ pub async fn get_field(
return StatusCode::NOT_FOUND.into_response();
};
let extractor = MetaExtractor::new(&item);
let root: PileValue = PileValue::ObjectExtractor(Arc::new(extractor));
let value = match root.query(&path).await {
let item = PileValue::Item(item);
let value = match item.query(&path).await {
Ok(Some(v)) => v,
Ok(None) => return StatusCode::NOT_FOUND.into_response(),
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),

View File

@@ -1,15 +1,19 @@
use axum::{
body::Body,
extract::{Query, State},
http::{StatusCode, header},
http::{HeaderMap, StatusCode, header},
response::{IntoResponse, Response},
};
use pile_config::Label;
use pile_value::value::{AsyncReader, AsyncSeekReader};
use serde::Deserialize;
use std::{sync::Arc, time::Instant};
use std::{io::SeekFrom, sync::Arc, time::Instant};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use utoipa::ToSchema;
use crate::{AsyncReader, Datasets};
use crate::Datasets;
#[derive(Deserialize, ToSchema)]
pub struct ItemQuery {
@@ -17,6 +21,24 @@ pub struct ItemQuery {
key: String,
}
/// Parse a `Range: bytes=...` header value.
///
/// Returns `(start, end)` where either bound may be `None`:
/// * `bytes=A-B` -> `(Some(A), Some(B))`
/// * `bytes=A-`  -> `(Some(A), None)`
/// * `bytes=-N`  -> `(None, Some(N))` (suffix form: the final `N` bytes)
///
/// Returns `None` for anything else — wrong unit, multiple ranges, a missing
/// dash, a malformed bound, or a zero-length suffix (never satisfiable).
/// Callers treat `None` as "ignore the header and serve the whole
/// representation", which is the RFC 9110 §14.2 requirement for an invalid
/// Range field.
fn parse_byte_range(s: &str) -> Option<(Option<u64>, Option<u64>)> {
	let spec = s.strip_prefix("bytes=")?;
	if spec.contains(',') {
		return None; // multiple ranges not supported
	}
	if let Some(suffix) = spec.strip_prefix('-') {
		let n: u64 = suffix.parse().ok()?;
		// `bytes=-0` selects zero bytes and can never be satisfied.
		return if n == 0 { None } else { Some((None, Some(n))) };
	}
	// A byte-range-spec must contain a dash; `bytes=5` is invalid.
	let (start, end) = spec.split_once('-')?;
	let start: u64 = start.parse().ok()?;
	let end = if end.is_empty() {
		None
	} else {
		// A malformed end bound invalidates the whole header (previously it
		// was silently treated as an open-ended range).
		Some(end.parse().ok()?)
	};
	Some((Some(start), end))
}
/// Fetch the raw bytes of an item by source and key
#[utoipa::path(
get,
@@ -27,14 +49,17 @@ pub struct ItemQuery {
),
responses(
(status = 200, description = "Raw item bytes"),
(status = 206, description = "Partial content"),
(status = 400, description = "Invalid source label"),
(status = 404, description = "Item not found"),
(status = 416, description = "Range not satisfiable"),
(status = 500, description = "Internal server error"),
)
)]
pub async fn item_get(
State(state): State<Arc<Datasets>>,
Query(params): Query<ItemQuery>,
headers: HeaderMap,
) -> Response {
let start = Instant::now();
debug!(
@@ -59,6 +84,43 @@ pub async fn item_get(
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
};
let total = match reader.seek(SeekFrom::End(0)).await {
Ok(n) => n,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
};
let range = headers
.get(header::RANGE)
.and_then(|v| v.to_str().ok())
.and_then(parse_byte_range);
// Resolve (byte_start, byte_end, content_length, is_range)
let (byte_start, byte_end, length, is_range) = match range {
Some((Some(s), e)) => {
let e = e
.unwrap_or(total.saturating_sub(1))
.min(total.saturating_sub(1));
if s >= total || s > e {
return (
StatusCode::RANGE_NOT_SATISFIABLE,
[(header::CONTENT_RANGE, format!("bytes */{total}"))],
)
.into_response();
}
(s, e, e - s + 1, true)
}
Some((None, Some(suffix))) => {
let s = total.saturating_sub(suffix);
let e = total.saturating_sub(1);
(s, e, total.saturating_sub(s), true)
}
_ => (0, total.saturating_sub(1), total, false),
};
if let Err(e) = reader.seek(SeekFrom::Start(byte_start)).await {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response();
}
debug!(
message = "Served /item",
source = params.source,
@@ -66,8 +128,54 @@ pub async fn item_get(
time_ms = start.elapsed().as_millis()
);
match reader.read_to_end().await {
Ok(bytes) => (StatusCode::OK, [(header::CONTENT_TYPE, mime)], bytes).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(8);
tokio::spawn(async move {
let mut buf = vec![0u8; 65536];
let mut remaining = length;
loop {
if remaining == 0 {
break;
}
let to_read = (buf.len() as u64).min(remaining) as usize;
match reader.read(&mut buf[..to_read]).await {
Ok(0) => break,
Ok(n) => {
remaining -= n as u64;
if tx.send(Ok(buf[..n].to_vec())).await.is_err() {
break;
}
}
Err(e) => {
let _ = tx.send(Err(e)).await;
break;
}
}
}
});
let body = Body::from_stream(ReceiverStream::new(rx));
let status = if is_range {
StatusCode::PARTIAL_CONTENT
} else {
StatusCode::OK
};
let mut builder = axum::http::Response::builder()
.status(status)
.header(header::CONTENT_TYPE, mime)
.header(header::ACCEPT_RANGES, "bytes")
.header(header::CONTENT_LENGTH, length);
if is_range {
builder = builder.header(
header::CONTENT_RANGE,
format!("bytes {byte_start}-{byte_end}/{total}"),
);
}
builder
.body(body)
.map(IntoResponse::into_response)
.unwrap_or_else(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response())
}

View File

@@ -1,5 +0,0 @@
mod dir;
pub use dir::*;
mod s3;
pub use s3::*;

View File

@@ -0,0 +1,37 @@
[package]
name = "pile-value"
version = { workspace = true }
rust-version = { workspace = true }
edition = { workspace = true }
[lints]
workspace = true
[dependencies]
pile-config = { workspace = true }
pile-flac = { workspace = true }
anyhow = { workspace = true }
serde_json = { workspace = true }
walkdir = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }
toml = { workspace = true }
smartstring = { workspace = true }
blake3 = { workspace = true }
epub = { workspace = true }
kamadak-exif = { workspace = true }
pdf = { workspace = true }
pdfium-render = { workspace = true, optional = true }
image = { workspace = true, optional = true }
id3 = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
async-trait = { workspace = true }
aws-sdk-s3 = { workspace = true }
mime = { workspace = true }
mime_guess = { workspace = true }
[features]
default = []
pdfium = ["dep:pdfium-render", "dep:image"]

View File

@@ -23,6 +23,13 @@ fn main() {
.expect("unexpected OUT_DIR structure")
.to_path_buf();
// If PDFIUM_LIB_DIR is set (e.g. by Nix), use the pre-installed library directly.
if let Ok(lib_dir) = env::var("PDFIUM_LIB_DIR") {
println!("cargo:rustc-link-search=native={lib_dir}");
println!("cargo:rustc-link-lib=dylib=pdfium");
return;
}
let lib_path = profile_dir.join("libpdfium.so");
if !lib_path.exists() {

View File

@@ -6,7 +6,10 @@ use std::{
};
use tracing::trace;
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
use crate::{
extract::traits::ObjectExtractor,
value::{Item, PileValue, SyncReadBridge},
};
pub struct EpubMetaExtractor {
item: Item,
@@ -26,16 +29,9 @@ impl EpubMetaExtractor {
return Ok(x);
}
let key = self.item.key();
let ext = key.as_str().rsplit('.').next();
if !matches!(ext, Some("epub")) {
return Ok(self.output.get_or_init(HashMap::new));
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_meta = tokio::task::spawn_blocking(move || {
let doc = EpubDoc::from_reader(reader)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let doc = EpubDoc::from_reader(reader)?;
let fields: &[&'static str] = &[
"title",
@@ -51,17 +47,19 @@ impl EpubMetaExtractor {
let meta: Vec<(&'static str, Option<String>)> =
fields.iter().map(|&key| (key, doc.mdata(key))).collect();
Ok::<_, std::io::Error>(meta)
Ok::<_, anyhow::Error>(meta)
})
.await
.map_err(std::io::Error::other)?;
.await?;
let raw_meta = match raw_meta {
Ok(x) => x,
Err(error) => {
trace!(message = "Could not process epub", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
Err(error) => match error.downcast::<std::io::Error>() {
Ok(x) => return Err(x),
Err(error) => {
trace!(message = "Could not process epub", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
},
};
let mut output: HashMap<Label, PileValue> = HashMap::new();
@@ -82,7 +80,15 @@ impl EpubMetaExtractor {
#[async_trait::async_trait]
impl ObjectExtractor for EpubMetaExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
async fn field(
&self,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self.get_inner().await?.get(name).cloned())
}

View File

@@ -4,9 +4,12 @@ use std::{
collections::HashMap,
sync::{Arc, OnceLock},
};
use tracing::debug;
use tracing::trace;
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
use crate::{
extract::traits::ObjectExtractor,
value::{Item, PileValue, SyncReadBridge},
};
pub struct EpubTextExtractor {
item: Item,
@@ -26,16 +29,9 @@ impl EpubTextExtractor {
return Ok(x);
}
let key = self.item.key();
let ext = key.as_str().rsplit('.').next();
if !matches!(ext, Some("epub")) {
return Ok(self.output.get_or_init(HashMap::new));
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_text = tokio::task::spawn_blocking(move || {
let mut doc = EpubDoc::from_reader(reader)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let mut doc = EpubDoc::from_reader(reader)?;
let mut text_parts: Vec<String> = Vec::new();
@@ -48,17 +44,19 @@ impl EpubTextExtractor {
}
}
Ok::<_, std::io::Error>(text_parts.join(" "))
Ok::<_, anyhow::Error>(text_parts.join(" "))
})
.await
.map_err(std::io::Error::other)?;
.await?;
let raw_text = match raw_text {
Ok(x) => x,
Err(error) => {
debug!(message = "Could not process epub", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
Err(error) => match error.downcast::<std::io::Error>() {
Ok(x) => return Err(x),
Err(error) => {
trace!(message = "Could not process epub", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
},
};
#[expect(clippy::unwrap_used)]
@@ -92,7 +90,15 @@ fn strip_html(html: &str) -> String {
#[async_trait::async_trait]
impl ObjectExtractor for EpubTextExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
async fn field(
&self,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self.get_inner().await?.get(name).cloned())
}

View File

@@ -7,7 +7,10 @@ pub use epub_meta::*;
mod epub_text;
pub use epub_text::*;
use crate::{Item, PileValue, extract::ObjectExtractor};
use crate::{
extract::traits::ObjectExtractor,
value::{Item, PileValue},
};
pub struct EpubExtractor {
text: Arc<EpubTextExtractor>,
@@ -25,10 +28,20 @@ impl EpubExtractor {
#[async_trait::async_trait]
impl ObjectExtractor for EpubExtractor {
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
match name.as_str() {
"text" => self.text.field(name).await,
"meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
async fn field(
&self,
name: &pile_config::Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
match (name.as_str(), args) {
("text", args) => Ok(Some(
self.text
.field(name, args)
.await
.map(|x| x.unwrap_or(PileValue::Null))?,
)),
("meta", None) => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
_ => Ok(None),
}
}

View File

@@ -6,7 +6,10 @@ use std::{
};
use tracing::trace;
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
use crate::{
extract::traits::ObjectExtractor,
value::{Item, PileValue, SyncReadBridge},
};
pub struct ExifExtractor {
item: Item,
@@ -29,9 +32,7 @@ impl ExifExtractor {
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_fields = tokio::task::spawn_blocking(move || {
let mut br = BufReader::new(reader);
let exif = exif::Reader::new()
.read_from_container(&mut br)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let exif = exif::Reader::new().read_from_container(&mut br)?;
let fields: Vec<(String, String)> = exif
.fields()
@@ -43,13 +44,13 @@ impl ExifExtractor {
})
.collect();
Ok::<_, std::io::Error>(fields)
Ok::<_, exif::Error>(fields)
})
.await
.map_err(std::io::Error::other)?;
.await?;
let raw_fields = match raw_fields {
Ok(x) => x,
Err(exif::Error::Io(x)) => return Err(x),
Err(error) => {
trace!(message = "Could not process exif", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
@@ -62,6 +63,7 @@ impl ExifExtractor {
let Some(label) = tag_to_label(&tag_name) else {
continue;
};
// First occurrence wins (PRIMARY IFD comes before THUMBNAIL)
output
.entry(label)
@@ -83,7 +85,21 @@ fn tag_to_label(tag: &str) -> Option<Label> {
#[async_trait::async_trait]
impl ObjectExtractor for ExifExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
async fn field(
&self,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
trace!(
?args,
key = self.item.key().as_str(),
"Getting field {name:?} from ExifExtractor",
);
if args.is_some() {
return Ok(None);
}
Ok(self.get_inner().await?.get(name).cloned())
}

View File

@@ -0,0 +1,211 @@
use mime::Mime;
use pile_config::Label;
use pile_flac::{FlacBlock, FlacDecodeError, FlacReader};
use std::{
collections::HashMap,
io::BufReader,
sync::{Arc, OnceLock},
};
use tracing::trace;
use crate::{
extract::traits::{ListExtractor, ObjectExtractor},
value::{Item, PileValue, SyncReadBridge},
};
pub struct FlacImagesExtractor {
	// Handle to the FLAC item; cheap to clone, read lazily.
	item: Item,
	// Picture count, computed at most once by `len`.
	cached_count: OnceLock<usize>,
}

impl FlacImagesExtractor {
	/// Create an extractor over the pictures embedded in a FLAC item.
	pub fn new(item: &Item) -> Self {
		Self {
			item: item.clone(),
			cached_count: OnceLock::new(),
		}
	}

	/// Count PICTURE metadata blocks appearing before the first audio frame.
	/// Decode errors yield a count of 0; I/O errors are propagated.
	async fn get_count(&self) -> Result<usize, std::io::Error> {
		let bridge = SyncReadBridge::new_current(self.item.read().await?);
		tokio::task::spawn_blocking(move || {
			let mut pictures = 0usize;
			for block in FlacReader::new(BufReader::new(bridge)) {
				match block {
					Ok(FlacBlock::Picture(_)) => pictures += 1,
					// Metadata precedes audio; stop at the first frame.
					Ok(FlacBlock::AudioFrame(_)) => break,
					Ok(_) => {}
					Err(FlacDecodeError::IoError(err)) => return Err(err),
					Err(_) => return Ok(0),
				}
			}
			Ok(pictures)
		})
		.await?
	}
}
#[async_trait::async_trait]
impl ListExtractor for FlacImagesExtractor {
	/// Return the `idx`-th embedded picture as a blob, or `None` when the
	/// file has fewer pictures (or fails to decode as FLAC).
	async fn get<'a>(&'a self, mut idx: usize) -> Result<Option<PileValue>, std::io::Error> {
		trace!(
			key = self.item.key().as_str(),
			"Getting index {idx} from FlacImagesExtractor",
		);
		let key = self.item.key();
		let bridge = SyncReadBridge::new_current(self.item.read().await?);
		let found = tokio::task::spawn_blocking(move || {
			for block in FlacReader::new(BufReader::new(bridge)) {
				match block {
					Ok(FlacBlock::Picture(picture)) => {
						if idx == 0 {
							return Ok(Some((picture.mime, picture.img_data)));
						}
						idx -= 1;
					}
					// Pictures live in the metadata section only.
					Ok(FlacBlock::AudioFrame(_)) => break,
					Ok(_) => {}
					Err(FlacDecodeError::IoError(err)) => return Err(err),
					Err(error) => {
						trace!(
							message = "Could not parse FLAC images",
							key = key.as_str(),
							?error
						);
						return Ok(None);
					}
				}
			}
			Ok(None)
		})
		.await
		.map_err(std::io::Error::other)??;
		Ok(found.map(|(mime, bytes)| PileValue::Blob {
			mime,
			bytes: Arc::new(bytes),
		}))
	}

	/// Number of embedded pictures; computed once, then served from cache.
	async fn len(&self) -> Result<usize, std::io::Error> {
		if let Some(cached) = self.cached_count.get() {
			return Ok(*cached);
		}
		let count = self.get_count().await?;
		Ok(*self.cached_count.get_or_init(|| count))
	}
}
pub struct FlacExtractor {
	/// Item whose FLAC Vorbis comments are read lazily.
	item: Item,
	/// Comment map, populated at most once by `get_inner`.
	output: OnceLock<HashMap<Label, PileValue>>,
	/// Pre-built list extractor exposing embedded pictures as `images`.
	images: PileValue,
}

impl FlacExtractor {
	/// Create a lazy extractor over the FLAC metadata of `item`.
	/// No I/O happens until a field is actually queried.
	pub fn new(item: &Item) -> Self {
		Self {
			item: item.clone(),
			output: OnceLock::new(),
			images: PileValue::ListExtractor(Arc::new(FlacImagesExtractor::new(item))),
		}
	}

	/// Read all Vorbis comment blocks before the first audio frame and
	/// group their values by lowercased key into `PileValue::Array`s.
	///
	/// Errors: I/O errors from reading or decoding are propagated; any
	/// other decode error logs at trace level and caches an empty map.
	async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
		if let Some(x) = self.output.get() {
			return Ok(x);
		}
		trace!(
			message = "Reading FLAC tags",
			key = self.item.key().as_str()
		);
		let key = self.item.key();
		let reader = SyncReadBridge::new_current(self.item.read().await?);
		let output = tokio::task::spawn_blocking(move || {
			let reader = FlacReader::new(BufReader::new(reader));
			// Collect repeated keys into per-key vectors first.
			let mut output: HashMap<Label, Vec<PileValue>> = HashMap::new();
			for block in reader {
				match block {
					// Metadata precedes audio; stop at the first frame.
					Ok(FlacBlock::AudioFrame(_)) => break,
					Ok(FlacBlock::VorbisComment(comment)) => {
						for (k, v) in comment.comment.comments {
							// Keys that don't form a valid Label are dropped.
							if let Some(label) = Label::new(k.to_string().to_lowercase()) {
								output
									.entry(label)
									.or_default()
									.push(PileValue::String(Arc::new(v)));
							}
						}
					}
					Err(FlacDecodeError::IoError(err)) => return Err(err),
					Err(error) => {
						trace!(
							message = "Could not parse FLAC metadata",
							key = key.as_str(),
							?error
						);
						// Non-I/O decode failure: treat as "no metadata".
						return Ok(HashMap::new());
					}
					_ => {}
				}
			}
			// Freeze each per-key vector into an immutable array value.
			let output: HashMap<Label, PileValue> = output
				.into_iter()
				.map(|(k, v)| (k, PileValue::Array(Arc::new(v))))
				.collect();
			Ok::<HashMap<Label, PileValue>, std::io::Error>(output)
		})
		.await??;
		return Ok(self.output.get_or_init(|| output));
	}
}
#[async_trait::async_trait]
impl ObjectExtractor for FlacExtractor {
	/// Look up a Vorbis-comment tag by name. The synthetic `images` field
	/// resolves to the embedded-picture list. Field arguments are not
	/// supported and yield `Ok(None)`.
	async fn field(
		&self,
		name: &Label,
		args: Option<&str>,
	) -> Result<Option<PileValue>, std::io::Error> {
		if args.is_some() {
			return Ok(None);
		}
		match name.as_str() {
			"images" => Ok(Some(self.images.clone())),
			_ => Ok(self.get_inner().await?.get(name).cloned()),
		}
	}

	/// All Vorbis-comment labels, followed by the synthetic `images` field.
	#[expect(clippy::unwrap_used)]
	async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
		let mut labels: Vec<Label> = self.get_inner().await?.keys().cloned().collect();
		labels.push(Label::new("images").unwrap());
		Ok(labels)
	}
}

View File

@@ -1,3 +1,7 @@
use crate::{
extract::traits::ObjectExtractor,
value::{Item, PileValue},
};
use pile_config::Label;
use std::{
collections::HashMap,
@@ -5,8 +9,6 @@ use std::{
sync::{Arc, OnceLock},
};
use crate::{Item, PileValue, extract::ObjectExtractor};
pub struct FsExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
@@ -29,6 +31,26 @@ impl FsExtractor {
return Ok(self.output.get_or_init(HashMap::new));
};
let mut root = false;
let components = path
.components()
.map(|x| match x {
Component::CurDir => None,
Component::Normal(x) => x.to_str().map(|x| x.to_owned()),
Component::ParentDir => Some("..".to_owned()),
Component::RootDir => {
root = true;
None
}
Component::Prefix(x) => x.as_os_str().to_str().map(|x| x.to_owned()),
})
.collect::<Option<Vec<_>>>();
let mut path_str = components.as_ref().map(|x| x.join("/"));
if root {
path_str = path_str.map(|x| format!("/{x}"));
}
#[expect(clippy::unwrap_used)]
let output = HashMap::from([
(
@@ -40,23 +62,20 @@ impl FsExtractor {
),
(
Label::new("path").unwrap(),
path.to_str()
path_str
.map(|x| PileValue::String(Arc::new(x.into())))
.unwrap_or(PileValue::Null),
),
(
Label::new("segments").unwrap(),
path.components()
.map(|x| match x {
Component::CurDir => Some(".".to_owned()),
Component::Normal(x) => x.to_str().map(|x| x.to_owned()),
Component::ParentDir => Some("..".to_owned()),
Component::RootDir => Some("/".to_owned()),
Component::Prefix(x) => x.as_os_str().to_str().map(|x| x.to_owned()),
components
.map(|x| {
PileValue::Array(Arc::new(
x.iter()
.map(|x| PileValue::String(Arc::new(x.into())))
.collect(),
))
})
.map(|x| x.map(|x| PileValue::String(Arc::new(x.into()))))
.collect::<Option<Vec<_>>>()
.map(|v| PileValue::Array(Arc::new(v)))
.unwrap_or(PileValue::Null),
),
]);
@@ -67,7 +86,15 @@ impl FsExtractor {
#[async_trait::async_trait]
impl ObjectExtractor for FsExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
async fn field(
&self,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self.get_inner()?.get(name).cloned())
}

View File

@@ -6,8 +6,12 @@ use std::{
io::BufReader,
sync::{Arc, OnceLock},
};
use tracing::trace;
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
use crate::{
extract::traits::ObjectExtractor,
value::{Item, PileValue, SyncReadBridge},
};
pub struct Id3Extractor {
item: Item,
@@ -27,32 +31,29 @@ impl Id3Extractor {
return Ok(x);
}
let key = self.item.key();
let ext = key.as_str().rsplit('.').next();
if !matches!(ext, Some("mp3") | Some("aiff") | Some("aif") | Some("wav")) {
return Ok(self.output.get_or_init(HashMap::new));
}
trace!(message = "Reading id3 tags", key = self.item.key().as_str());
let key = self.item.key();
let reader = SyncReadBridge::new_current(self.item.read().await?);
let tag = match tokio::task::spawn_blocking(move || Tag::read_from2(BufReader::new(reader)))
.await
{
Ok(Ok(tag)) => tag,
Ok(Err(id3::Error {
kind: id3::ErrorKind::NoTag,
..
})) => {
return Ok(self.output.get_or_init(HashMap::new));
}
Err(e) => return Err(e.into()),
Ok(Err(id3::Error {
kind: id3::ErrorKind::Io(e),
..
})) => return Err(e),
Ok(Err(e)) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
Err(e) => return Err(e.into()),
Ok(Err(error)) => {
trace!(
message = "Could not parse id3 tags",
key = key.as_str(),
?error
);
return Ok(self.output.get_or_init(HashMap::new));
}
};
let mut output: HashMap<Label, Vec<PileValue>> = HashMap::new();
@@ -120,7 +121,15 @@ fn frame_id_to_field(id: &str) -> Cow<'static, str> {
#[async_trait::async_trait]
impl ObjectExtractor for Id3Extractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
async fn field(
&self,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self.get_inner().await?.get(name).cloned())
}

View File

@@ -0,0 +1,103 @@
mod flac;
use std::{collections::HashMap, sync::Arc};
pub use flac::*;
mod id3;
pub use id3::*;
mod fs;
pub use fs::*;
mod epub;
pub use epub::*;
mod exif;
pub use exif::*;
mod pdf;
pub use pdf::*;
mod toml;
use pile_config::Label;
pub use toml::*;
mod sidecar;
pub use sidecar::*;
use crate::{
extract::{misc::MapExtractor, traits::ObjectExtractor},
value::{Item, PileValue},
};
pub struct ItemExtractor {
inner: MapExtractor,
}
impl ItemExtractor {
#[expect(clippy::unwrap_used)]
pub fn new(item: &Item) -> Self {
let inner = MapExtractor {
inner: HashMap::from([
(
Label::new("flac").unwrap(),
PileValue::ObjectExtractor(Arc::new(FlacExtractor::new(item))),
),
(
Label::new("id3").unwrap(),
PileValue::ObjectExtractor(Arc::new(Id3Extractor::new(item))),
),
(
Label::new("fs").unwrap(),
PileValue::ObjectExtractor(Arc::new(FsExtractor::new(item))),
),
(
Label::new("epub").unwrap(),
PileValue::ObjectExtractor(Arc::new(EpubExtractor::new(item))),
),
(
Label::new("exif").unwrap(),
PileValue::ObjectExtractor(Arc::new(ExifExtractor::new(item))),
),
(
Label::new("pdf").unwrap(),
PileValue::ObjectExtractor(Arc::new(PdfExtractor::new(item))),
),
(
Label::new("toml").unwrap(),
PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))),
),
(
Label::new("sidecar").unwrap(),
PileValue::ObjectExtractor(Arc::new(SidecarExtractor::new(item))),
),
]),
};
Self { inner }
}
}
#[async_trait::async_trait]
impl ObjectExtractor for ItemExtractor {
	/// Delegate field lookup to the per-format sub-extractor map.
	async fn field(
		&self,
		name: &pile_config::Label,
		args: Option<&str>,
	) -> Result<Option<PileValue>, std::io::Error> {
		self.inner.field(name, args).await
	}

	/// Names of all registered sub-extractors.
	///
	/// NOTE: keep this list in sync with the map built in
	/// `ItemExtractor::new`. "toml" was registered there but previously
	/// missing here, so the field was invisible to enumeration.
	#[expect(clippy::unwrap_used)]
	async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
		Ok(vec![
			Label::new("flac").unwrap(),
			Label::new("id3").unwrap(),
			Label::new("fs").unwrap(),
			Label::new("epub").unwrap(),
			Label::new("exif").unwrap(),
			Label::new("pdf").unwrap(),
			Label::new("toml").unwrap(),
			Label::new("sidecar").unwrap(),
		])
	}
}

View File

@@ -1,10 +1,6 @@
use pile_config::Label;
use std::sync::Arc;
#[cfg(feature = "pdfium")]
mod pdf_cover;
#[cfg(feature = "pdfium")]
pub use pdf_cover::*;
use tracing::trace;
#[cfg(feature = "pdfium")]
mod pdf_pages;
@@ -17,14 +13,15 @@ pub use pdf_meta::*;
mod pdf_text;
pub use pdf_text::*;
use crate::{Item, PileValue, extract::ObjectExtractor};
use crate::{
extract::traits::ObjectExtractor,
value::{Item, PileValue},
};
pub struct PdfExtractor {
text: Arc<PdfTextExtractor>,
meta: Arc<PdfMetaExtractor>,
#[cfg(feature = "pdfium")]
cover: Arc<PdfCoverExtractor>,
#[cfg(feature = "pdfium")]
pages: Arc<PdfPagesExtractor>,
}
@@ -34,8 +31,6 @@ impl PdfExtractor {
text: Arc::new(PdfTextExtractor::new(item)),
meta: Arc::new(PdfMetaExtractor::new(item)),
#[cfg(feature = "pdfium")]
cover: Arc::new(PdfCoverExtractor::new(item)),
#[cfg(feature = "pdfium")]
pages: Arc::new(PdfPagesExtractor::new(item)),
}
}
@@ -43,14 +38,22 @@ impl PdfExtractor {
#[async_trait::async_trait]
impl ObjectExtractor for PdfExtractor {
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
match name.as_str() {
"text" => self.text.field(name).await,
"meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
async fn field(
&self,
name: &pile_config::Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
trace!(
?args,
key = self.text.item.key().as_str(),
"Getting field {name:?} from PdfExtractor",
);
match (name.as_str(), args) {
("text", args) => self.text.field(name, args).await,
("meta", None) => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
#[cfg(feature = "pdfium")]
"cover" => self.cover.field(name).await,
#[cfg(feature = "pdfium")]
"pages" => Ok(Some(PileValue::ListExtractor(self.pages.clone()))),
("pages", None) => Ok(Some(PileValue::ListExtractor(self.pages.clone()))),
_ => Ok(None),
}
}
@@ -61,8 +64,6 @@ impl ObjectExtractor for PdfExtractor {
Label::new("text").unwrap(),
Label::new("meta").unwrap(),
#[cfg(feature = "pdfium")]
Label::new("cover").unwrap(),
#[cfg(feature = "pdfium")]
Label::new("pages").unwrap(),
])
}

View File

@@ -8,8 +8,10 @@ use std::{
};
use tracing::trace;
use crate::extract::ObjectExtractor;
use crate::{Item, PileValue, SyncReadBridge};
use crate::{
extract::traits::ObjectExtractor,
value::{Item, PileValue, SyncReadBridge},
};
pub struct PdfMetaExtractor {
item: Item,
@@ -120,7 +122,14 @@ fn format_date(d: &Date) -> String {
#[async_trait::async_trait]
impl ObjectExtractor for PdfMetaExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
async fn field(
&self,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self.get_inner().await?.get(name).cloned())
}

View File

@@ -6,7 +6,10 @@ use std::{
};
use tracing::trace;
use crate::{Item, PileValue, SyncReadBridge, extract::ListExtractor};
use crate::{
extract::traits::ListExtractor,
value::{Item, PileValue, SyncReadBridge},
};
pub struct PdfPagesExtractor {
item: Item,
@@ -32,6 +35,11 @@ impl PdfPagesExtractor {
#[async_trait::async_trait]
impl ListExtractor for PdfPagesExtractor {
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
trace!(
key = self.item.key().as_str(),
"Getting index {idx} from PdfPagesExtractor",
);
let bytes = self.get_bytes().await?;
let png = tokio::task::spawn_blocking(move || {
let pdfium = Pdfium::default();

View File

@@ -8,11 +8,13 @@ use std::{
};
use tracing::trace;
use crate::extract::ObjectExtractor;
use crate::{Item, PileValue, SyncReadBridge};
use crate::{
extract::traits::ObjectExtractor,
value::{Item, PileValue, SyncReadBridge},
};
pub struct PdfTextExtractor {
item: Item,
pub(super) item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
@@ -100,7 +102,15 @@ impl PdfTextExtractor {
#[async_trait::async_trait]
impl ObjectExtractor for PdfTextExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
async fn field(
&self,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self.get_inner().await?.get(name).cloned())
}

View File

@@ -1,9 +1,11 @@
use pile_config::Label;
use std::sync::OnceLock;
use tracing::trace;
use super::TomlExtractor;
use crate::{
Item, PileValue,
extract::{ObjectExtractor, TomlExtractor},
extract::traits::ObjectExtractor,
value::{Item, PileValue},
};
pub struct SidecarExtractor {
@@ -22,12 +24,22 @@ impl SidecarExtractor {
#[async_trait::async_trait]
impl ObjectExtractor for SidecarExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
async fn field(
&self,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
trace!(
?args,
key = self.item.key().as_str(),
"Getting field {name:?} from SidecarExtractor",
);
match self
.output
.get_or_init(|| self.item.sidecar().map(TomlExtractor::new))
{
Some(x) => Ok(x.field(name).await?),
Some(x) => Ok(x.field(name, args).await?),
None => Ok(Some(PileValue::Null)),
}
}

View File

@@ -4,7 +4,10 @@ use std::{
sync::{Arc, OnceLock},
};
use crate::{AsyncReader, Item, PileValue, extract::ObjectExtractor};
use crate::{
extract::traits::ObjectExtractor,
value::{AsyncReader, Item, PileValue},
};
fn toml_to_pile(value: toml::Value) -> PileValue {
match value {
@@ -38,13 +41,7 @@ impl TomlExtractor {
return Ok(x);
}
let mut reader = match self.item.read().await {
Ok(r) => r,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
return Ok(self.output.get_or_init(HashMap::new));
}
Err(e) => return Err(e),
};
let mut reader = self.item.read().await?;
let bytes = reader.read_to_end().await?;
let toml: toml::Value = match toml::from_slice(&bytes) {
Ok(x) => x,
@@ -65,7 +62,15 @@ impl TomlExtractor {
#[async_trait::async_trait]
impl ObjectExtractor for TomlExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
async fn field(
&self,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self.get_inner().await?.get(name).cloned())
}

View File

@@ -0,0 +1,24 @@
use std::sync::Arc;
use crate::{extract::traits::ListExtractor, value::PileValue};
pub struct ArrayExtractor {
inner: Arc<Vec<PileValue>>,
}
impl ArrayExtractor {
pub fn new(inner: Arc<Vec<PileValue>>) -> Self {
Self { inner }
}
}
#[async_trait::async_trait]
impl ListExtractor for ArrayExtractor {
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.inner.get(idx).cloned())
}
async fn len(&self) -> Result<usize, std::io::Error> {
Ok(self.inner.len())
}
}

View File

@@ -1,15 +1,24 @@
use pile_config::Label;
use std::collections::HashMap;
use crate::{PileValue, extract::ObjectExtractor};
use crate::{extract::traits::ObjectExtractor, value::PileValue};
#[derive(Default)]
pub struct MapExtractor {
pub(crate) inner: HashMap<Label, PileValue>,
pub inner: HashMap<Label, PileValue>,
}
#[async_trait::async_trait]
impl ObjectExtractor for MapExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
async fn field(
&self,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self.inner.get(name).cloned())
}

View File

@@ -0,0 +1,8 @@
mod list;
pub use list::*;
mod vec;
pub use vec::*;
mod map;
pub use map::*;

View File

@@ -0,0 +1,17 @@
use crate::{extract::traits::ListExtractor, value::PileValue};
#[derive(Default)]
pub struct VecExtractor {
pub inner: Vec<PileValue>,
}
#[async_trait::async_trait]
impl ListExtractor for VecExtractor {
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.inner.get(idx).cloned())
}
async fn len(&self) -> Result<usize, std::io::Error> {
Ok(self.inner.len())
}
}

View File

@@ -0,0 +1,4 @@
pub mod item;
pub mod misc;
pub mod string;
pub mod traits;

View File

@@ -0,0 +1,210 @@
use pile_config::Label;
use smartstring::{LazyCompact, SmartString};
use std::sync::Arc;
use crate::{extract::traits::ObjectExtractor, value::PileValue};
/// Exposes string transformations (`trim`, `upper`, `split`, ...) as
/// pseudo-fields over a shared string value.
pub struct StringExtractor {
	/// The string being operated on; shared, never mutated.
	item: Arc<SmartString<LazyCompact>>,
}

impl StringExtractor {
	/// Wrap a shared string for field-based transformation.
	pub fn new(item: &Arc<SmartString<LazyCompact>>) -> Self {
		Self { item: item.clone() }
	}
}
#[async_trait::async_trait]
impl ObjectExtractor for StringExtractor {
	/// String transformations addressed as fields.
	///
	/// Argument-less fields: `trim`, `upper`, `lower`, `nonempty`.
	/// Argument-taking fields: `trimprefix`, `trimsuffix`, `split`.
	/// Unknown names or a wrong argument arity yield `Ok(None)`.
	async fn field(
		&self,
		name: &Label,
		args: Option<&str>,
	) -> Result<Option<PileValue>, std::io::Error> {
		Ok(match (name.as_str(), args) {
			("trim", None) => Some(PileValue::String(Arc::new(
				self.item.as_str().trim().into(),
			))),
			// BUG FIX: `upper` previously returned `to_lowercase()` and
			// `lower` returned `to_uppercase()` — the two were swapped.
			("upper", None) => Some(PileValue::String(Arc::new(
				self.item.as_str().to_uppercase().into(),
			))),
			("lower", None) => Some(PileValue::String(Arc::new(
				self.item.as_str().to_lowercase().into(),
			))),
			("nonempty", None) => Some(match self.item.is_empty() {
				true => PileValue::Null,
				false => PileValue::String(self.item.clone()),
			}),
			("trimprefix", Some(prefix)) => Some(PileValue::String(Arc::new(
				self.item
					.as_str()
					.strip_prefix(prefix)
					.unwrap_or(self.item.as_str())
					.into(),
			))),
			("trimsuffix", Some(suffix)) => Some(PileValue::String(Arc::new(
				self.item
					.as_str()
					.strip_suffix(suffix)
					.unwrap_or(self.item.as_str())
					.into(),
			))),
			("split", Some(by)) => Some(PileValue::Array(Arc::new(
				self.item
					.as_str()
					.split(by)
					.map(|s| PileValue::String(Arc::new(s.into())))
					.collect(),
			))),
			_ => None,
		})
	}

	/// Argument-less field names only; argument-taking fields
	/// (`trimprefix`, `trimsuffix`, `split`) are not enumerable.
	#[expect(clippy::unwrap_used)]
	async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
		Ok(vec![
			Label::new("trim").unwrap(),
			Label::new("upper").unwrap(),
			Label::new("lower").unwrap(),
			Label::new("nonempty").unwrap(),
		])
	}
}
#[cfg(test)]
#[expect(clippy::expect_used)]
mod tests {
	use super::*;

	/// Build a [StringExtractor] over `s`.
	fn extractor(s: &str) -> StringExtractor {
		StringExtractor::new(&Arc::new(s.into()))
	}

	/// Look up field `name` (with optional `args`) on `ext`,
	/// unwrapping label construction and the io::Result.
	#[expect(clippy::unwrap_used)]
	async fn field(ext: &StringExtractor, name: &str, args: Option<&str>) -> Option<PileValue> {
		ext.field(&Label::new(name).unwrap(), args).await.unwrap()
	}

	/// Extract the owned string from a `PileValue::String`;
	/// `None` passes through, any other variant panics.
	fn string(v: Option<PileValue>) -> Option<String> {
		match v? {
			PileValue::String(s) => Some(s.as_str().to_owned()),
			_ => panic!("expected string"),
		}
	}

	/// Extract a `PileValue::Array` of strings; panics on a missing
	/// value, a non-array, or a non-string element.
	fn array(v: Option<PileValue>) -> Vec<String> {
		match v.expect("expected Some") {
			PileValue::Array(arr) => arr
				.iter()
				.map(|v| match v {
					PileValue::String(s) => s.as_str().to_owned(),
					_ => panic!("expected string element"),
				})
				.collect(),
			_ => panic!("expected array"),
		}
	}

	#[tokio::test]
	async fn trim() {
		assert_eq!(
			string(field(&extractor(" hi "), "trim", None).await),
			Some("hi".into())
		);
	}

	// `trim` takes no argument, so supplying one must not match.
	// NOTE(review): despite the name, this test passes args — consider
	// renaming to `trim_with_args`.
	#[tokio::test]
	async fn trim_no_args() {
		assert!(field(&extractor("x"), "trim", Some("foo")).await.is_none());
	}

	#[tokio::test]
	async fn nonempty_with_content() {
		assert!(matches!(
			field(&extractor("hello"), "nonempty", None).await,
			Some(PileValue::String(_))
		));
	}

	// `nonempty` maps an empty string to `Some(Null)`, not `None`.
	#[tokio::test]
	async fn nonempty_empty_string() {
		assert!(matches!(
			field(&extractor(""), "nonempty", None).await,
			Some(PileValue::Null)
		));
	}

	#[tokio::test]
	async fn trimprefix_present() {
		assert_eq!(
			string(field(&extractor("foobar"), "trimprefix", Some("foo")).await),
			Some("bar".into())
		);
	}

	// A prefix that doesn't match leaves the string unchanged.
	#[tokio::test]
	async fn trimprefix_absent() {
		assert_eq!(
			string(field(&extractor("foobar"), "trimprefix", Some("baz")).await),
			Some("foobar".into())
		);
	}

	// `trimprefix` requires an argument; without one it is not a valid field.
	#[tokio::test]
	async fn trimprefix_no_args() {
		assert!(
			field(&extractor("foobar"), "trimprefix", None)
				.await
				.is_none()
		);
	}

	#[tokio::test]
	async fn trimsuffix_present() {
		assert_eq!(
			string(field(&extractor("foobar"), "trimsuffix", Some("bar")).await),
			Some("foo".into())
		);
	}

	#[tokio::test]
	async fn trimsuffix_absent() {
		assert_eq!(
			string(field(&extractor("foobar"), "trimsuffix", Some("baz")).await),
			Some("foobar".into())
		);
	}

	#[tokio::test]
	async fn split_basic() {
		assert_eq!(
			array(field(&extractor("a,b,c"), "split", Some(",")).await),
			vec!["a", "b", "c"]
		);
	}

	// No separator present: `str::split` yields the whole string once.
	#[tokio::test]
	async fn split_no_match() {
		assert_eq!(
			array(field(&extractor("abc"), "split", Some(",")).await),
			vec!["abc"]
		);
	}

	// `split` requires an argument; without one it is not a valid field.
	#[tokio::test]
	async fn split_no_args() {
		assert!(field(&extractor("abc"), "split", None).await.is_none());
	}

	#[tokio::test]
	async fn unknown_field() {
		assert!(field(&extractor("abc"), "bogus", None).await.is_none());
	}
}

View File

@@ -0,0 +1,75 @@
/// An attachment that extracts metadata from an [Item].
///
/// Metadata is exposed as an immutable map of {label: value},
/// much like a json object.
#[async_trait::async_trait]
pub trait ObjectExtractor: Send + Sync {
	/// Get the field at `name` from `item`.
	/// - returns `None` if `name` is not a valid field
	/// - returns `Some(Null)` if `name` is not available
	///
	/// For extractors that parse binary, this fn should return
	/// an error only if we failed to obtain the data we need (permission denied, etc).
	///
	/// If the underlying data has an invalid format (e.g, running a pdf extractor on a non-pdf file),
	/// this fn should return `Ok(Some(PileValue::Null))`.
	async fn field(
		&self,
		name: &pile_config::Label,
		args: Option<&str>,
	) -> Result<Option<crate::value::PileValue>, std::io::Error>;

	/// Return all fields in this extractor.
	/// `Self::field` must return [Some] for all these keys
	/// and [None] for all others.
	async fn fields(&self) -> Result<Vec<pile_config::Label>, std::io::Error>;

	/// Convert this to a JSON value.
	///
	/// Keys whose argument-free lookup (`field(k, None)`) yields `None`
	/// are skipped rather than serialized.
	async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
		let keys = self.fields().await?;
		let mut map = serde_json::Map::new();
		for k in &keys {
			let v = match self.field(k, None).await? {
				Some(x) => x,
				None => continue,
			};
			// Box::pin: the recursive async `to_json` call needs a
			// boxed future to have a statically known size.
			map.insert(k.to_string(), Box::pin(v.to_json()).await?);
		}
		Ok(serde_json::Value::Object(map))
	}
}
/// An attachment that extracts metadata from an [Item].
///
/// Metadata is exposed as an immutable list of values.
#[async_trait::async_trait]
pub trait ListExtractor: Send + Sync {
	/// Get the item at index `idx`.
	/// Indices start at zero, and must be consecutive.
	/// - returns `None` if `idx` is out of range
	/// - returns `Some(Null)` if `None` is at `idx`
	async fn get(&self, idx: usize) -> Result<Option<crate::value::PileValue>, std::io::Error>;

	/// Number of items in this list.
	async fn len(&self) -> Result<usize, std::io::Error>;

	/// Whether this list has no items.
	async fn is_empty(&self) -> Result<bool, std::io::Error> {
		Ok(self.len().await? == 0)
	}

	/// Convert this list to a JSON value.
	///
	/// Returns an [std::io::ErrorKind::InvalidData] error if an extractor
	/// violates its contract by reporting a `len()` it cannot back with
	/// values (previously this panicked via `expect`).
	async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
		let len = self.len().await?;
		let mut list = Vec::with_capacity(len);
		for i in 0..len {
			let v = self.get(i).await?.ok_or_else(|| {
				std::io::Error::new(
					std::io::ErrorKind::InvalidData,
					format!("list extractor reported len {len} but has no value at {i}"),
				)
			})?;
			// Box::pin: the recursive async `to_json` call needs a
			// boxed future to have a statically known size.
			list.push(Box::pin(v.to_json()).await?);
		}
		Ok(serde_json::Value::Array(list))
	}
}

View File

@@ -0,0 +1,3 @@
/// Extractors that derive metadata values from items.
pub mod extract;
/// Datasources (local directory, S3) that items are read from.
pub mod source;
/// Core value types: items, readers, and [value::PileValue].
pub mod value;

View File

@@ -4,7 +4,10 @@ use std::{path::PathBuf, sync::Arc};
use tokio_stream::wrappers::ReceiverStream;
use walkdir::WalkDir;
use crate::{DataSource, Item, path_ts_latest};
use crate::{
source::{DataSource, misc::path_ts_latest},
value::Item,
};
#[derive(Debug)]
pub struct DirDataSource {

View File

@@ -1,15 +1,24 @@
mod dir;
pub use dir::*;
mod s3;
pub use s3::*;
pub mod misc;
use chrono::{DateTime, Utc};
use tokio_stream::wrappers::ReceiverStream;
use crate::Item;
/// A read-only set of [Item]s.
pub trait DataSource {
/// Get an item from this datasource
fn get(&self, key: &str) -> impl Future<Output = Result<Option<Item>, std::io::Error>> + Send;
fn get(
&self,
key: &str,
) -> impl Future<Output = Result<Option<crate::value::Item>, std::io::Error>> + Send;
/// Iterate over all items in this source in an arbitrary order
fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>>;
fn iter(&self) -> ReceiverStream<Result<crate::value::Item, std::io::Error>>;
/// Return the time of the latest change to the data in this source
fn latest_change(

View File

@@ -5,7 +5,7 @@ use smartstring::{LazyCompact, SmartString};
use std::sync::Arc;
use tokio_stream::wrappers::ReceiverStream;
use crate::{DataSource, Item};
use crate::{source::DataSource, value::Item};
#[derive(Debug)]
pub struct S3DataSource {

View File

@@ -0,0 +1,105 @@
use mime::Mime;
use smartstring::{LazyCompact, SmartString};
use std::{fs::File, path::PathBuf, sync::Arc};
use crate::{
source::{DirDataSource, S3DataSource},
value::{ItemReader, S3Reader},
};
//
// MARK: item
//
/// A cheaply-cloneable pointer to an item in a dataset
#[derive(Debug, Clone)]
pub enum Item {
	/// An item on the local filesystem.
	File {
		source: Arc<DirDataSource>,
		mime: Mime,
		path: PathBuf,
		// Optional companion item; boxed to keep the enum small.
		// NOTE(review): sidecar semantics are defined by the datasource —
		// confirm against the source implementations.
		sidecar: Option<Box<Item>>,
	},
	/// An object in an S3 bucket.
	S3 {
		source: Arc<S3DataSource>,
		mime: Mime,
		key: SmartString<LazyCompact>,
		sidecar: Option<Box<Item>>,
	},
}
impl Item {
	/// Open the item for reading. For S3, performs a HEAD request to determine
	/// the object size.
	pub async fn read(&self) -> Result<ItemReader, std::io::Error> {
		Ok(match self {
			Self::File { path, .. } => ItemReader::File(File::open(path)?),
			Self::S3 { source, key, .. } => {
				let head = source
					.client
					.head_object()
					.bucket(source.bucket.as_str())
					.key(key.as_str())
					.send()
					.await
					.map_err(std::io::Error::other)?;
				// `content_length` is an `Option<i64>`; clamp missing or
				// negative values to zero instead of sign-wrapping with `as`.
				let size = head
					.content_length()
					.and_then(|n| u64::try_from(n).ok())
					.unwrap_or(0);
				ItemReader::S3(S3Reader {
					client: source.client.clone(),
					bucket: source.bucket.clone(),
					key: key.to_owned(),
					cursor: 0,
					size,
				})
			}
		})
	}

	/// Name of the datasource this item belongs to.
	pub fn source_name(&self) -> &pile_config::Label {
		match self {
			Self::File { source, .. } => &source.name,
			Self::S3 { source, .. } => &source.name,
		}
	}

	/// Key identifying this item within its source: the filesystem path
	/// for files, the object key for S3.
	#[expect(clippy::expect_used)]
	pub fn key(&self) -> SmartString<LazyCompact> {
		match self {
			Self::File { path, .. } => path.to_str().expect("path is not utf-8").into(),
			Self::S3 { key, .. } => key.clone(),
		}
	}

	/// Compute the blake3 hash of this item's contents.
	///
	/// Streams the whole file through the hasher. S3 hashing is not
	/// implemented yet and panics.
	pub fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
		match self {
			Self::File { path, .. } => {
				let mut hasher = blake3::Hasher::new();
				let mut file = File::open(path)?;
				std::io::copy(&mut file, &mut hasher)?;
				Ok(hasher.finalize())
			}
			// TODO: stream the object and hash it, or use a stored checksum.
			Self::S3 { .. } => todo!(),
		}
	}

	/// MIME type recorded for this item.
	pub fn mime(&self) -> &Mime {
		match self {
			Self::File { mime, .. } => mime,
			Self::S3 { mime, .. } => mime,
		}
	}

	/// The sidecar item attached to this one, if any.
	pub fn sidecar(&self) -> Option<&Self> {
		match self {
			Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
			Self::S3 { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
		}
	}
}

View File

@@ -0,0 +1,9 @@
// Item pointers ([Item]) and their accessors.
mod item;
pub use item::*;
// Sync/async readers over item contents.
mod readers;
pub use readers::*;
// `value` submodule inside the `value` module — intentional.
#[expect(clippy::module_inception)]
mod value;
pub use value::*;

View File

@@ -1,114 +1,13 @@
use mime::Mime;
use smartstring::{LazyCompact, SmartString};
use std::{
fs::File,
io::{Read, Seek, SeekFrom},
path::PathBuf,
sync::Arc,
};
use tokio::runtime::Handle;
use crate::source::{DirDataSource, S3DataSource};
//
// MARK: item
//
/// A cheaply-clonable pointer to an item in a dataset
#[derive(Debug, Clone)]
pub enum Item {
File {
source: Arc<DirDataSource>,
mime: Mime,
path: PathBuf,
sidecar: Option<Box<Item>>,
},
S3 {
source: Arc<S3DataSource>,
mime: Mime,
key: SmartString<LazyCompact>,
sidecar: Option<Box<Item>>,
},
}
impl Item {
/// Open the item for reading. For S3, performs a HEAD request to determine
/// the object size.
pub async fn read(&self) -> Result<ItemReader, std::io::Error> {
Ok(match self {
Self::File { path, .. } => ItemReader::File(File::open(path)?),
Self::S3 { source, key, .. } => {
let head = source
.client
.head_object()
.bucket(source.bucket.as_str())
.key(key.as_str())
.send()
.await
.map_err(std::io::Error::other)?;
let size = head.content_length().unwrap_or(0) as u64;
ItemReader::S3(S3Reader {
client: source.client.clone(),
bucket: source.bucket.clone(),
key: key.to_owned(),
cursor: 0,
size,
})
}
})
}
pub fn source_name(&self) -> &pile_config::Label {
match self {
Self::File { source, .. } => &source.name,
Self::S3 { source, .. } => &source.name,
}
}
#[expect(clippy::expect_used)]
pub fn key(&self) -> SmartString<LazyCompact> {
match self {
Self::File { path, .. } => path.to_str().expect("path is not utf-8").into(),
Self::S3 { key, .. } => key.clone(),
}
}
pub fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
match self {
Self::File { path, .. } => {
let mut hasher = blake3::Hasher::new();
let mut file = std::fs::File::open(path)?;
std::io::copy(&mut file, &mut hasher)?;
return Ok(hasher.finalize());
}
Self::S3 { .. } => todo!(),
}
}
pub fn mime(&self) -> &Mime {
match self {
Self::File { mime, .. } => mime,
Self::S3 { mime, .. } => mime,
}
}
pub fn sidecar(&self) -> Option<&Self> {
match self {
Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
Self::S3 { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
}
}
}
//
// MARK: reader
// MARK: traits
//
pub trait AsyncReader: Send {
@@ -210,11 +109,11 @@ impl AsyncSeekReader for ItemReader {
//
pub struct S3Reader {
client: Arc<aws_sdk_s3::Client>,
bucket: SmartString<LazyCompact>,
key: SmartString<LazyCompact>,
cursor: u64,
size: u64,
pub client: Arc<aws_sdk_s3::Client>,
pub bucket: SmartString<LazyCompact>,
pub key: SmartString<LazyCompact>,
pub cursor: u64,
pub size: u64,
}
impl AsyncReader for S3Reader {

View File

@@ -4,9 +4,17 @@ use serde_json::{Map, Value};
use smartstring::{LazyCompact, SmartString};
use std::sync::Arc;
use crate::extract::{ListExtractor, ObjectExtractor};
use crate::{
extract::{
item::ItemExtractor,
misc::{ArrayExtractor, MapExtractor, VecExtractor},
string::StringExtractor,
traits::{ListExtractor, ObjectExtractor},
},
value::Item,
};
/// An immutable, cheaply-clonable, lazily-computed value.
/// An immutable, cheaply-cloneable, lazily-computed value.
/// Very similar to [serde_json::Value].
pub enum PileValue {
Null,
@@ -30,6 +38,9 @@ pub enum PileValue {
/// A lazily-computed array
ListExtractor(Arc<dyn ListExtractor>),
/// A pointer to an item in this dataset
Item(Item),
}
impl Clone for PileValue {
@@ -46,62 +57,82 @@ impl Clone for PileValue {
mime: mime.clone(),
bytes: bytes.clone(),
},
Self::Item(i) => Self::Item(i.clone()),
}
}
}
impl PileValue {
pub fn object_extractor(&self) -> Arc<dyn ObjectExtractor> {
match self {
Self::Null => Arc::new(MapExtractor::default()),
Self::U64(_) => Arc::new(MapExtractor::default()),
Self::I64(_) => Arc::new(MapExtractor::default()),
Self::Array(_) => Arc::new(MapExtractor::default()),
Self::String(s) => Arc::new(StringExtractor::new(s)),
Self::Blob { .. } => Arc::new(MapExtractor::default()),
Self::ListExtractor(_) => Arc::new(MapExtractor::default()),
Self::ObjectExtractor(e) => e.clone(),
Self::Item(i) => Arc::new(ItemExtractor::new(i)),
}
}
pub fn list_extractor(&self) -> Arc<dyn ListExtractor> {
match self {
Self::Null => Arc::new(VecExtractor::default()),
Self::U64(_) => Arc::new(VecExtractor::default()),
Self::I64(_) => Arc::new(VecExtractor::default()),
Self::Array(a) => Arc::new(ArrayExtractor::new(a.clone())),
Self::String(_) => Arc::new(VecExtractor::default()),
Self::Blob { .. } => Arc::new(VecExtractor::default()),
Self::ListExtractor(e) => e.clone(),
Self::ObjectExtractor(_) => Arc::new(VecExtractor::default()),
Self::Item(_) => Arc::new(VecExtractor::default()),
}
}
pub async fn query(&self, query: &ObjectPath) -> Result<Option<Self>, std::io::Error> {
let mut out: Option<PileValue> = Some(self.clone());
for s in &query.segments {
match s {
PathSegment::Root => out = Some(self.clone()),
PathSegment::Field(field) => {
out = match out {
None => return Ok(None),
Some(Self::Null) => None,
Some(Self::U64(_)) => None,
Some(Self::I64(_)) => None,
Some(Self::Array(_)) => None,
Some(Self::String(_)) => None,
Some(Self::Blob { .. }) => None,
Some(Self::ListExtractor(_)) => None,
Some(Self::ObjectExtractor(e)) => e.field(field).await?,
}
PathSegment::Field { name, args } => {
let e = match out.map(|x| x.object_extractor()) {
Some(e) => e,
None => {
out = None;
continue;
}
};
out = e.field(name, args.as_deref()).await?;
}
PathSegment::Index(idx) => {
out = match &out {
None => return Ok(None),
Some(Self::Null) => None,
Some(Self::U64(_)) => None,
Some(Self::I64(_)) => None,
Some(Self::Blob { .. }) => None,
Some(Self::Array(v)) => {
let idx = if *idx >= 0 {
usize::try_from(*idx).ok()
} else {
usize::try_from(v.len() as i64 - idx).ok()
};
idx.and_then(|idx| v.get(idx)).cloned()
let e = match out.map(|x| x.list_extractor()) {
Some(e) => e,
None => {
out = None;
continue;
}
Some(Self::String(_)) => None,
Some(Self::ObjectExtractor(_)) => None,
Some(Self::ListExtractor(e)) => {
let idx = if *idx >= 0 {
usize::try_from(*idx).ok()
} else {
usize::try_from(e.len().await? as i64 - idx).ok()
};
};
match idx {
Some(idx) => e.get(idx).await?,
None => None,
}
let idx = if *idx >= 0 {
usize::try_from(*idx).ok()
} else {
usize::try_from(e.len().await? as i64 - idx).ok()
};
let idx = match idx {
Some(idx) => idx,
None => {
out = None;
continue;
}
}
};
out = e.get(idx).await?;
}
}
}
@@ -127,11 +158,12 @@ impl PileValue {
Self::Array(x) => (!x.is_empty()).then(|| Value::Number(1u64.into())),
Self::ListExtractor(x) => (!x.is_empty().await?).then(|| Value::Number(1u64.into())),
Self::ObjectExtractor(e) => {
Self::ObjectExtractor(_) | Self::Item(_) => {
let e = self.object_extractor();
let keys = e.fields().await?;
let mut map = Map::new();
for k in &keys {
let v = match e.field(k).await? {
let v = match e.field(k, None).await? {
Some(x) => x,
None => continue,
};
@@ -160,35 +192,22 @@ impl PileValue {
Self::Null => Value::Null,
Self::U64(x) => Value::Number((*x).into()),
Self::I64(x) => Value::Number((*x).into()),
Self::String(x) => Value::String(x.to_string()),
// TODO: replace with something meaningful
// TODO: replace with something meaningful?
Self::Blob { mime, bytes } => {
Value::String(format!("<Blob ({mime}, {} bytes)>", bytes.len()))
}
Self::String(x) => Value::String(x.to_string()),
Self::Array(x) => {
let mut arr = Vec::new();
for item in &**x {
arr.push(Box::pin(item.to_json()).await?);
}
Value::Array(arr)
Self::Array(_) | Self::ListExtractor(_) => {
let e = self.list_extractor();
return e.to_json().await;
}
Self::ObjectExtractor(e) => {
let keys = e.fields().await?;
let mut map = Map::new();
for k in &keys {
let v = match e.field(k).await? {
Some(x) => x,
None => continue,
};
map.insert(k.to_string(), Box::pin(v.to_json()).await?);
}
Value::Object(map)
Self::ObjectExtractor(_) | Self::Item(_) => {
let e = self.object_extractor();
return e.to_json().await;
}
Self::ListExtractor(e) => e.to_json().await?,
})
}
}

View File

@@ -10,6 +10,7 @@ workspace = true
[dependencies]
pile-toolbox = { workspace = true }
pile-dataset = { workspace = true, features = ["axum", "pdfium"] }
pile-value = { workspace = true, features = ["pdfium"] }
pile-config = { workspace = true }
tracing = { workspace = true }

View File

@@ -1,10 +1,12 @@
use anyhow::{Context, Result};
use clap::Args;
use pile_config::{Label, Source};
use pile_dataset::index::DbFtsIndex;
use pile_dataset::source::DirDataSource;
use pile_dataset::{DataSource, Datasets, Item, PileValue, extract::MetaExtractor};
use pile_dataset::{Datasets, index::DbFtsIndex};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{
source::{DataSource, DirDataSource},
value::{Item, PileValue},
};
use std::{path::PathBuf, sync::Arc};
use tokio_stream::StreamExt;
use tracing::{info, warn};
@@ -72,11 +74,9 @@ impl CliCmd for AnnotateCommand {
continue;
};
let meta = MetaExtractor::new(&item);
let extractor = PileValue::ObjectExtractor(Arc::new(meta));
let item = PileValue::Item(item.clone());
let Some(value) =
index.get_field(&extractor, &field).await.with_context(|| {
index.get_field(&item, &field).await.with_context(|| {
format!("while extracting field from {}", path.display())
})?
else {

View File

@@ -1,9 +1,10 @@
use anyhow::{Context, Result};
use clap::Args;
use pile_dataset::{Datasets, PileValue, extract::MetaExtractor};
use pile_dataset::Datasets;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::value::PileValue;
use serde_json::{Map, Value};
use std::{path::PathBuf, sync::Arc, time::Instant};
use std::{path::PathBuf, time::Instant};
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tracing::info;
@@ -93,9 +94,8 @@ impl CliCmd for FieldsCommand {
item_result.with_context(|| format!("while reading source {name}"))?;
let name = name.clone();
join_set.spawn(async move {
let meta = MetaExtractor::new(&item);
let value = PileValue::ObjectExtractor(Arc::new(meta));
let result = value.count_fields().await.with_context(|| {
let item = PileValue::Item(item);
let result = item.count_fields().await.with_context(|| {
format!("while counting fields in source {name}")
})?;
Ok(result.and_then(|v| {

View File

@@ -1,8 +1,9 @@
use anyhow::{Context, Result};
use clap::Args;
use pile_config::objectpath::ObjectPath;
use pile_dataset::{Datasets, PileValue, extract::MetaExtractor};
use pile_dataset::Datasets;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::value::PileValue;
use std::{path::PathBuf, str::FromStr, sync::Arc};
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
@@ -79,9 +80,8 @@ impl CliCmd for ListCommand {
let invert = self.invert;
join_set.spawn(async move {
let meta = MetaExtractor::new(&item);
let root = PileValue::ObjectExtractor(Arc::new(meta));
let value = root.query(&path).await?;
let item = PileValue::Item(item);
let value = item.query(&path).await?;
let is_present =
matches!(value, Some(v) if !matches!(v, PileValue::Null));

View File

@@ -1,9 +1,10 @@
use anyhow::{Context, Result};
use clap::Args;
use pile_config::{Label, objectpath::ObjectPath};
use pile_dataset::{Datasets, PileValue, extract::MetaExtractor};
use pile_dataset::Datasets;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use std::{path::PathBuf, sync::Arc};
use pile_value::value::PileValue;
use std::path::PathBuf;
use crate::{CliCmd, GlobalContext};
@@ -54,9 +55,8 @@ impl CliCmd for ProbeCommand {
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
})?;
let value = PileValue::ObjectExtractor(Arc::new(MetaExtractor::new(&item)));
value
.to_json()
let item = PileValue::Item(item);
item.to_json()
.await
.with_context(|| format!("while extracting {}", self.key))?
};

52
default.nix Normal file
View File

@@ -0,0 +1,52 @@
# Standalone (non-flake) build entry point for the `pile` package.
let
	# Pull in rust-overlay so the build can use the exact toolchain
	# pinned in rust-toolchain.toml.
	rustOverlay = import (
		builtins.fetchTarball {
			url = "https://github.com/oxalica/rust-overlay/archive/master.tar.gz";
			sha256 = "0qgrkgc695a7gja83dngxrcx4gdg9056gvg5325i5yyjxg0ni6c9";
		}
	);
	pkgsDefault = import <nixpkgs> { overlays = [ rustOverlay ]; };
	rustToolchain = pkgsDefault.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
	rustPlatformDefault = pkgsDefault.makeRustPlatform {
		cargo = rustToolchain;
		rustc = rustToolchain;
	};
in
# Both arguments may be overridden by a caller (e.g. flake.nix passes
# its own pkgs and rust platform).
{
	pkgs ? pkgsDefault,
	pileRustPlatform ? rustPlatformDefault,
}:
pileRustPlatform.buildRustPackage {
	pname = "pile";
	version = "0.0.2";
	src = ./.;
	cargoLock.lockFile = ./Cargo.lock;
	# Tells the pdfium-sys build where to find the prebuilt library.
	PDFIUM_LIB_DIR = "${pkgs.pdfium-binaries}/lib";
	buildInputs = [
		pkgs.pdfium-binaries
		pkgs.openssl
	]
	++ pkgs.lib.optionals pkgs.stdenv.isDarwin [
		pkgs.darwin.apple_sdk.frameworks.Security
		pkgs.darwin.apple_sdk.frameworks.SystemConfiguration
	];
	nativeBuildInputs = [
		pkgs.pkg-config
		pkgs.makeWrapper
	];
	# pdfium is also needed at runtime: bake its library path into the
	# installed binary's environment.
	postInstall = ''
		wrapProgram $out/bin/pile \
			--prefix LD_LIBRARY_PATH : ${pkgs.lib.makeLibraryPath [ pkgs.pdfium-binaries ]}
	'';
	meta = {
		description = "pile - flexible file indexing";
		mainProgram = "pile";
	};
}

40
flake.nix Normal file
View File

@@ -0,0 +1,40 @@
{
	description = "pile - personal data indexer";
	inputs = {
		nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
		# Provides rust-bin.fromRustupToolchainFile so the flake builds
		# with the toolchain pinned in rust-toolchain.toml.
		rust-overlay = {
			url = "github:oxalica/rust-overlay";
			inputs.nixpkgs.follows = "nixpkgs";
		};
		flake-utils.url = "github:numtide/flake-utils";
	};
	outputs = { self, nixpkgs, rust-overlay, flake-utils }:
		flake-utils.lib.eachDefaultSystem (system:
			let
				overlays = [ (import rust-overlay) ];
				pkgs = import nixpkgs { inherit system overlays; };
				rustToolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
				rustPlatform = pkgs.makeRustPlatform {
					cargo = rustToolchain;
					rustc = rustToolchain;
				};
			in {
				# The package itself lives in default.nix; pass our pinned
				# rust platform so flake and non-flake builds match.
				packages.default = import ./default.nix { inherit pkgs; pileRustPlatform = rustPlatform; };
				# Dev shell: pinned toolchain plus the native deps the
				# pdfium/openssl sys crates link against.
				devShells.default = pkgs.mkShell {
					buildInputs = [
						rustToolchain
						pkgs.pdfium-binaries
						pkgs.openssl
						pkgs.pkg-config
					];
					LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath [ pkgs.pdfium-binaries ];
				};
			});
}

2
rust-toolchain.toml Normal file
View File

@@ -0,0 +1,2 @@
# Pins the Rust toolchain for reproducible builds; read by rustup and
# by the nix packaging (default.nix / flake.nix).
[toolchain]
channel = "1.94.0"