Extractor rewrite

This commit is contained in:
2026-03-11 10:12:36 -07:00
parent b789255ea9
commit 4546a85bd3
51 changed files with 669 additions and 688 deletions

View File

@@ -10,37 +10,23 @@ workspace = true
[dependencies]
pile-config = { workspace = true }
pile-toolbox = { workspace = true }
pile-flac = { workspace = true }
pile-value = { workspace = true }
serde_json = { workspace = true }
itertools = { workspace = true }
walkdir = { workspace = true }
tantivy = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }
toml = { workspace = true }
thiserror = { workspace = true }
smartstring = { workspace = true }
blake3 = { workspace = true }
epub = { workspace = true }
kamadak-exif = { workspace = true }
pdf = { workspace = true }
pdfium-render = { workspace = true, optional = true }
image = { workspace = true, optional = true }
id3 = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
async-trait = { workspace = true }
aws-sdk-s3 = { workspace = true }
mime = { workspace = true }
mime_guess = { workspace = true }
serde = { workspace = true }
serde = { workspace = true, optional = true }
axum = { workspace = true, optional = true }
utoipa = { workspace = true, optional = true }
utoipa-swagger-ui = { workspace = true, optional = true }
[features]
default = []
pdfium = ["dep:pdfium-render", "dep:image"]
axum = ["dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui"]
pdfium = ["pile-value/pdfium"]
axum = ["dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui", "dep:serde"]

View File

@@ -1,57 +0,0 @@
use std::env;
use std::path::PathBuf;
const PDFIUM_URL: &str = "https://github.com/bblanchon/pdfium-binaries/releases/download/chromium%2F7725/pdfium-linux-x64.tgz";
#[expect(clippy::expect_used)]
#[expect(clippy::unwrap_used)]
#[expect(clippy::print_stderr)]
fn main() {
println!("cargo:rerun-if-changed=build.rs");
if env::var("CARGO_FEATURE_PDFIUM").is_err() {
return;
}
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
// OUT_DIR is target/<profile>/build/<pkg>-<hash>/out
// Go up 3 levels to reach target/<profile>/
let profile_dir = out_dir
.ancestors()
.nth(3)
.expect("unexpected OUT_DIR structure")
.to_path_buf();
let lib_path = profile_dir.join("libpdfium.so");
if !lib_path.exists() {
let tgz_path = out_dir.join("pdfium.tgz");
eprintln!("cargo:warning=Downloading PDFium from {PDFIUM_URL}");
let status = std::process::Command::new("curl")
.args(["-L", "--fail", "-o", tgz_path.to_str().unwrap(), PDFIUM_URL])
.status()
.expect("failed to run curl");
assert!(status.success(), "curl failed to download PDFium");
let status = std::process::Command::new("tar")
.args([
"-xzf",
tgz_path.to_str().unwrap(),
"-C",
out_dir.to_str().unwrap(),
])
.status()
.expect("failed to run tar");
assert!(status.success(), "tar failed to extract PDFium");
std::fs::copy(out_dir.join("lib").join("libpdfium.so"), &lib_path)
.expect("failed to copy libpdfium.so");
}
println!("cargo:rustc-link-search=native={}", profile_dir.display());
println!("cargo:rustc-link-lib=dylib=pdfium");
println!("cargo:rustc-link-arg=-Wl,-rpath,$ORIGIN");
}

View File

@@ -1,6 +1,10 @@
use chrono::{DateTime, Utc};
use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{
source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest},
value::{Item, PileValue},
};
use serde_json::Value;
use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant};
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
@@ -9,13 +13,7 @@ use tokio::task::JoinSet;
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tracing::{debug, info, trace, warn};
use crate::{
DataSource, Item, PileValue,
extract::MetaExtractor,
index::{DbFtsIndex, FtsLookupResult},
path_ts_earliest,
source::{DirDataSource, S3DataSource},
};
use crate::index::{DbFtsIndex, FtsLookupResult};
#[derive(Debug, Error)]
pub enum DatasetError {
@@ -183,11 +181,12 @@ impl Datasets {
let Some(item) = self.get(source, key).await else {
return Ok(None);
};
let extractor = MetaExtractor::new(&item);
let root = PileValue::ObjectExtractor(Arc::new(extractor));
let Some(value) = root.query(path).await? else {
let item = PileValue::Item(item);
let Some(value) = item.query(path).await? else {
return Ok(None);
};
Ok(Some(value.to_json().await?))
}

View File

@@ -1,92 +0,0 @@
use epub::doc::EpubDoc;
use pile_config::Label;
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
};
use tracing::trace;
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
pub struct EpubMetaExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl EpubMetaExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let key = self.item.key();
let ext = key.as_str().rsplit('.').next();
if !matches!(ext, Some("epub")) {
return Ok(self.output.get_or_init(HashMap::new));
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_meta = tokio::task::spawn_blocking(move || {
let doc = EpubDoc::from_reader(reader)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let fields: &[&'static str] = &[
"title",
"creator",
"description",
"language",
"publisher",
"date",
"subject",
"identifier",
];
let meta: Vec<(&'static str, Option<String>)> =
fields.iter().map(|&key| (key, doc.mdata(key))).collect();
Ok::<_, std::io::Error>(meta)
})
.await
.map_err(std::io::Error::other)?;
let raw_meta = match raw_meta {
Ok(x) => x,
Err(error) => {
trace!(message = "Could not process epub", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
};
let mut output: HashMap<Label, PileValue> = HashMap::new();
#[expect(clippy::unwrap_used)]
for (key, val) in raw_meta {
let label = Label::new(key).unwrap();
let value = match val {
Some(s) => PileValue::String(Arc::new(s.into())),
None => PileValue::Null,
};
output.insert(label, value);
}
return Ok(self.output.get_or_init(|| output));
}
}
#[async_trait::async_trait]
impl ObjectExtractor for EpubMetaExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner().await?.keys().cloned().collect())
}
}

View File

@@ -1,102 +0,0 @@
use epub::doc::EpubDoc;
use pile_config::Label;
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
};
use tracing::debug;
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
pub struct EpubTextExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl EpubTextExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let key = self.item.key();
let ext = key.as_str().rsplit('.').next();
if !matches!(ext, Some("epub")) {
return Ok(self.output.get_or_init(HashMap::new));
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_text = tokio::task::spawn_blocking(move || {
let mut doc = EpubDoc::from_reader(reader)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let mut text_parts: Vec<String> = Vec::new();
loop {
if let Ok(content) = doc.get_current_str() {
text_parts.push(strip_html(&content));
}
if doc.go_next().is_err() {
break;
}
}
Ok::<_, std::io::Error>(text_parts.join(" "))
})
.await
.map_err(std::io::Error::other)?;
let raw_text = match raw_text {
Ok(x) => x,
Err(error) => {
debug!(message = "Could not process epub", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
};
#[expect(clippy::unwrap_used)]
let output = HashMap::from([(
Label::new("text").unwrap(),
PileValue::String(Arc::new(raw_text.into())),
)]);
let _ = self.output.set(output);
#[expect(clippy::unwrap_used)]
return Ok(self.output.get().unwrap());
}
}
/// Strip HTML/XHTML tags from a string, leaving only text nodes.
fn strip_html(html: &str) -> String {
let mut result = String::with_capacity(html.len());
let mut in_tag = false;
for c in html.chars() {
match c {
'<' => in_tag = true,
'>' => in_tag = false,
_ if !in_tag => result.push(c),
_ => {}
}
}
result
}
#[async_trait::async_trait]
impl ObjectExtractor for EpubTextExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner().await?.keys().cloned().collect())
}
}

View File

@@ -1,43 +0,0 @@
use pile_config::Label;
use std::sync::Arc;
mod epub_meta;
pub use epub_meta::*;
mod epub_text;
pub use epub_text::*;
use crate::{Item, PileValue, extract::ObjectExtractor};
pub struct EpubExtractor {
text: Arc<EpubTextExtractor>,
meta: Arc<EpubMetaExtractor>,
}
impl EpubExtractor {
pub fn new(item: &Item) -> Self {
Self {
text: Arc::new(EpubTextExtractor::new(item)),
meta: Arc::new(EpubMetaExtractor::new(item)),
}
}
}
#[async_trait::async_trait]
impl ObjectExtractor for EpubExtractor {
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
match name.as_str() {
"text" => self.text.field(name).await,
"meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
_ => Ok(None),
}
}
#[expect(clippy::unwrap_used)]
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(vec![
Label::new("text").unwrap(),
Label::new("meta").unwrap(),
])
}
}

View File

@@ -1,93 +0,0 @@
use pile_config::Label;
use std::{
collections::HashMap,
io::BufReader,
sync::{Arc, OnceLock},
};
use tracing::trace;
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
pub struct ExifExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl ExifExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_fields = tokio::task::spawn_blocking(move || {
let mut br = BufReader::new(reader);
let exif = exif::Reader::new()
.read_from_container(&mut br)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let fields: Vec<(String, String)> = exif
.fields()
.map(|f| {
(
f.tag.to_string(),
f.display_value().with_unit(&exif).to_string(),
)
})
.collect();
Ok::<_, std::io::Error>(fields)
})
.await
.map_err(std::io::Error::other)?;
let raw_fields = match raw_fields {
Ok(x) => x,
Err(error) => {
trace!(message = "Could not process exif", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
};
let mut output: HashMap<Label, PileValue> = HashMap::new();
for (tag_name, value) in raw_fields {
let Some(label) = tag_to_label(&tag_name) else {
continue;
};
// First occurrence wins (PRIMARY IFD comes before THUMBNAIL)
output
.entry(label)
.or_insert_with(|| PileValue::String(Arc::new(value.into())));
}
return Ok(self.output.get_or_init(|| output));
}
}
fn tag_to_label(tag: &str) -> Option<Label> {
let sanitized: String = tag
.chars()
.map(|c| if c == ' ' { '_' } else { c })
.filter(|c| Label::VALID_CHARS.contains(*c))
.collect();
Label::new(sanitized)
}
#[async_trait::async_trait]
impl ObjectExtractor for ExifExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner().await?.keys().cloned().collect())
}
}

View File

@@ -1,162 +0,0 @@
use mime::Mime;
use pile_config::Label;
use pile_flac::{FlacBlock, FlacReader};
use std::{
collections::HashMap,
io::BufReader,
sync::{Arc, OnceLock},
};
use crate::{
Item, PileValue, SyncReadBridge,
extract::{ListExtractor, ObjectExtractor},
};
pub struct FlacImagesExtractor {
item: Item,
}
impl FlacImagesExtractor {
pub fn new(item: &Item) -> Self {
Self { item: item.clone() }
}
async fn get_images(&self) -> Result<Vec<PileValue>, std::io::Error> {
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_images = tokio::task::spawn_blocking(move || {
let reader = FlacReader::new(BufReader::new(reader));
let mut images: Vec<(Mime, Vec<u8>)> = Vec::new();
for block in reader {
match block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? {
FlacBlock::Picture(picture) => {
images.push((picture.mime, picture.img_data));
}
FlacBlock::AudioFrame(_) => break,
_ => {}
}
}
Ok::<_, std::io::Error>(images)
})
.await
.map_err(std::io::Error::other)??;
Ok(raw_images
.into_iter()
.map(|(mime, data)| PileValue::Blob {
mime,
bytes: Arc::new(data),
})
.collect())
}
}
#[async_trait::async_trait]
impl ListExtractor for FlacImagesExtractor {
async fn get<'a>(&'a self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_images().await?.into_iter().nth(idx))
}
async fn len(&self) -> Result<usize, std::io::Error> {
Ok(self.get_images().await?.len())
}
}
pub struct FlacExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
images: Option<PileValue>,
}
impl FlacExtractor {
pub fn new(item: &Item) -> Self {
let is_flac = match item {
Item::File { path, .. } => path.to_str().unwrap_or_default().ends_with(".flac"),
Item::S3 { key, .. } => key.ends_with(".flac"),
};
let images =
is_flac.then(|| PileValue::ListExtractor(Arc::new(FlacImagesExtractor::new(item))));
Self {
item: item.clone(),
output: OnceLock::new(),
images,
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let key = match &self.item {
Item::File { path, .. } => path.to_str().unwrap_or_default().to_owned(),
Item::S3 { key, .. } => key.to_string(),
};
if !key.ends_with(".flac") {
let _ = self.output.set(HashMap::new());
#[expect(clippy::unwrap_used)]
return Ok(self.output.get().unwrap());
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_tags = tokio::task::spawn_blocking(move || {
let reader = FlacReader::new(BufReader::new(reader));
let mut tags: Vec<(String, String)> = Vec::new();
for block in reader {
match block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? {
FlacBlock::VorbisComment(comment) => {
for (k, v) in comment.comment.comments {
tags.push((k.to_string().to_lowercase(), v.into()));
}
}
FlacBlock::AudioFrame(_) => break,
_ => {}
}
}
Ok::<_, std::io::Error>(tags)
})
.await
.map_err(std::io::Error::other)??;
let mut output: HashMap<Label, Vec<PileValue>> = HashMap::new();
for (k, v) in raw_tags {
if let Some(label) = Label::new(k) {
output
.entry(label)
.or_default()
.push(PileValue::String(Arc::new(v.into())));
}
}
let output: HashMap<Label, PileValue> = output
.into_iter()
.map(|(k, v)| (k, PileValue::Array(Arc::new(v))))
.collect();
let _ = self.output.set(output);
#[expect(clippy::unwrap_used)]
return Ok(self.output.get().unwrap());
}
}
#[async_trait::async_trait]
impl ObjectExtractor for FlacExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
if name.as_str() == "images"
&& let Some(ref images) = self.images
{
return Ok(Some(images.clone()));
}
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
let mut fields = self.get_inner().await?.keys().cloned().collect::<Vec<_>>();
if self.images.is_some() {
#[expect(clippy::unwrap_used)]
fields.push(Label::new("images").unwrap());
}
Ok(fields)
}
}

View File

@@ -1,77 +0,0 @@
use pile_config::Label;
use std::{
collections::HashMap,
path::Component,
sync::{Arc, OnceLock},
};
use crate::{Item, PileValue, extract::ObjectExtractor};
pub struct FsExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl FsExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let Item::File { path, .. } = &self.item else {
return Ok(self.output.get_or_init(HashMap::new));
};
#[expect(clippy::unwrap_used)]
let output = HashMap::from([
(
Label::new("extension").unwrap(),
path.extension()
.and_then(|x| x.to_str())
.map(|x| PileValue::String(Arc::new(x.into())))
.unwrap_or(PileValue::Null),
),
(
Label::new("path").unwrap(),
path.to_str()
.map(|x| PileValue::String(Arc::new(x.into())))
.unwrap_or(PileValue::Null),
),
(
Label::new("segments").unwrap(),
path.components()
.map(|x| match x {
Component::CurDir => Some(".".to_owned()),
Component::Normal(x) => x.to_str().map(|x| x.to_owned()),
Component::ParentDir => Some("..".to_owned()),
Component::RootDir => Some("/".to_owned()),
Component::Prefix(x) => x.as_os_str().to_str().map(|x| x.to_owned()),
})
.map(|x| x.map(|x| PileValue::String(Arc::new(x.into()))))
.collect::<Option<Vec<_>>>()
.map(|v| PileValue::Array(Arc::new(v)))
.unwrap_or(PileValue::Null),
),
]);
return Ok(self.output.get_or_init(|| output));
}
}
#[async_trait::async_trait]
impl ObjectExtractor for FsExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_inner()?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner()?.keys().cloned().collect())
}
}

View File

@@ -1,130 +0,0 @@
use id3::Tag;
use pile_config::Label;
use std::{
borrow::Cow,
collections::HashMap,
io::BufReader,
sync::{Arc, OnceLock},
};
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
pub struct Id3Extractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl Id3Extractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let key = self.item.key();
let ext = key.as_str().rsplit('.').next();
if !matches!(ext, Some("mp3") | Some("aiff") | Some("aif") | Some("wav")) {
return Ok(self.output.get_or_init(HashMap::new));
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let tag = match tokio::task::spawn_blocking(move || Tag::read_from2(BufReader::new(reader)))
.await
{
Ok(Ok(tag)) => tag,
Ok(Err(id3::Error {
kind: id3::ErrorKind::NoTag,
..
})) => {
return Ok(self.output.get_or_init(HashMap::new));
}
Ok(Err(id3::Error {
kind: id3::ErrorKind::Io(e),
..
})) => return Err(e),
Ok(Err(e)) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
Err(e) => return Err(e.into()),
};
let mut output: HashMap<Label, Vec<PileValue>> = HashMap::new();
for frame in tag.frames() {
if let Some(text) = frame.content().text() {
let name = frame_id_to_field(frame.id());
if let Some(key) = Label::new(name) {
output
.entry(key)
.or_default()
.push(PileValue::String(Arc::new(text.into())));
}
}
}
let output = output
.into_iter()
.map(|(k, v)| (k, PileValue::Array(Arc::new(v))))
.collect();
return Ok(self.output.get_or_init(|| output));
}
}
/// Map an ID3 frame ID to the equivalent Vorbis Comment field name.
/// Falls back to the lowercased frame ID if no mapping exists.
fn frame_id_to_field(id: &str) -> Cow<'static, str> {
match id {
// spell:off
"TIT2" => Cow::Borrowed("title"),
"TIT1" => Cow::Borrowed("grouping"),
"TIT3" => Cow::Borrowed("subtitle"),
"TPE1" => Cow::Borrowed("artist"),
"TPE2" => Cow::Borrowed("albumartist"),
"TPE3" => Cow::Borrowed("conductor"),
"TOPE" => Cow::Borrowed("originalartist"),
"TALB" => Cow::Borrowed("album"),
"TOAL" => Cow::Borrowed("originalalbum"),
"TRCK" => Cow::Borrowed("tracknumber"),
"TPOS" => Cow::Borrowed("discnumber"),
"TSST" => Cow::Borrowed("discsubtitle"),
"TDRC" | "TYER" => Cow::Borrowed("date"),
"TDOR" | "TORY" => Cow::Borrowed("originaldate"),
"TCON" => Cow::Borrowed("genre"),
"TCOM" => Cow::Borrowed("composer"),
"TEXT" => Cow::Borrowed("lyricist"),
"TPUB" => Cow::Borrowed("label"),
"TSRC" => Cow::Borrowed("isrc"),
"TBPM" => Cow::Borrowed("bpm"),
"TLAN" => Cow::Borrowed("language"),
"TMED" => Cow::Borrowed("media"),
"TMOO" => Cow::Borrowed("mood"),
"TCOP" => Cow::Borrowed("copyright"),
"TENC" => Cow::Borrowed("encodedby"),
"TSSE" => Cow::Borrowed("encodersettings"),
"TSOA" => Cow::Borrowed("albumsort"),
"TSOP" => Cow::Borrowed("artistsort"),
"TSOT" => Cow::Borrowed("titlesort"),
"MVNM" => Cow::Borrowed("movement"),
"MVIN" => Cow::Borrowed("movementnumber"),
_ => Cow::Owned(id.to_lowercase()),
// spell:on
}
}
#[async_trait::async_trait]
impl ObjectExtractor for Id3Extractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner().await?.keys().cloned().collect())
}
}

View File

@@ -1,19 +0,0 @@
use pile_config::Label;
use std::collections::HashMap;
use crate::{PileValue, extract::ObjectExtractor};
pub struct MapExtractor {
pub(crate) inner: HashMap<Label, PileValue>,
}
#[async_trait::async_trait]
impl ObjectExtractor for MapExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.inner.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.inner.keys().cloned().collect())
}
}

View File

@@ -1,165 +0,0 @@
use pile_config::Label;
use std::{collections::HashMap, sync::Arc};
mod flac;
pub use flac::*;
mod id3;
pub use id3::*;
mod fs;
pub use fs::*;
mod epub;
pub use epub::*;
mod exif;
pub use exif::*;
mod pdf;
pub use pdf::*;
mod toml;
pub use toml::*;
mod map;
pub use map::*;
mod sidecar;
pub use sidecar::*;
use crate::{Item, PileValue};
/// An attachment that extracts metadata from an [Item].
///
/// Metadata is exposed as an immutable map of {label: value},
/// much like a json object.
#[async_trait::async_trait]
pub trait ObjectExtractor: Send + Sync {
/// Get the field at `name` from `item`.
/// - returns `None` if `name` is not a valid field
/// - returns `Some(Null)` if `name` is not available
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error>;
/// Return all fields in this extractor.
/// `Self::field` must return [Some] for all these keys
/// and [None] for all others.
async fn fields(&self) -> Result<Vec<Label>, std::io::Error>;
/// Convert this to a JSON value.
async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
let keys = self.fields().await?;
let mut map = serde_json::Map::new();
for k in &keys {
let v = match self.field(k).await? {
Some(x) => x,
None => continue,
};
map.insert(k.to_string(), Box::pin(v.to_json()).await?);
}
Ok(serde_json::Value::Object(map))
}
}
/// An attachment that extracts metadata from an [Item].
///
/// Metadata is exposed as an immutable list of values.
#[async_trait::async_trait]
pub trait ListExtractor: Send + Sync {
/// Get the item at index `idx`.
/// Indices start at zero, and must be consecutive.
/// - returns `None` if `idx` is out of range
/// - returns `Some(Null)` if `None` is at `idx`
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error>;
async fn len(&self) -> Result<usize, std::io::Error>;
async fn is_empty(&self) -> Result<bool, std::io::Error> {
Ok(self.len().await? == 0)
}
/// Convert this list to a JSON value.
async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
let len = self.len().await?;
let mut list = Vec::with_capacity(len);
for i in 0..len {
#[expect(clippy::expect_used)]
let v = self
.get(i)
.await?
.expect("value must be present according to length");
list.push(Box::pin(v.to_json()).await?);
}
Ok(serde_json::Value::Array(list))
}
}
pub struct MetaExtractor {
inner: MapExtractor,
}
impl MetaExtractor {
#[expect(clippy::unwrap_used)]
pub fn new(item: &Item) -> Self {
let inner = MapExtractor {
inner: HashMap::from([
(
Label::new("flac").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(FlacExtractor::new(item))),
),
(
Label::new("id3").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(Id3Extractor::new(item))),
),
(
Label::new("fs").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(FsExtractor::new(item))),
),
(
Label::new("epub").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(EpubExtractor::new(item))),
),
(
Label::new("exif").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(ExifExtractor::new(item))),
),
(
Label::new("pdf").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(PdfExtractor::new(item))),
),
(
Label::new("toml").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))),
),
(
Label::new("sidecar").unwrap(),
crate::PileValue::ObjectExtractor(Arc::new(SidecarExtractor::new(item))),
),
]),
};
Self { inner }
}
}
#[async_trait::async_trait]
impl ObjectExtractor for MetaExtractor {
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
self.inner.field(name).await
}
#[expect(clippy::unwrap_used)]
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
return Ok(vec![
Label::new("flac").unwrap(),
Label::new("id3").unwrap(),
Label::new("fs").unwrap(),
Label::new("epub").unwrap(),
Label::new("exif").unwrap(),
Label::new("pdf").unwrap(),
Label::new("sidecar").unwrap(),
]);
}
}

View File

@@ -1,69 +0,0 @@
use pile_config::Label;
use std::sync::Arc;
#[cfg(feature = "pdfium")]
mod pdf_cover;
#[cfg(feature = "pdfium")]
pub use pdf_cover::*;
#[cfg(feature = "pdfium")]
mod pdf_pages;
#[cfg(feature = "pdfium")]
pub use pdf_pages::*;
mod pdf_meta;
pub use pdf_meta::*;
mod pdf_text;
pub use pdf_text::*;
use crate::{Item, PileValue, extract::ObjectExtractor};
pub struct PdfExtractor {
text: Arc<PdfTextExtractor>,
meta: Arc<PdfMetaExtractor>,
#[cfg(feature = "pdfium")]
cover: Arc<PdfCoverExtractor>,
#[cfg(feature = "pdfium")]
pages: Arc<PdfPagesExtractor>,
}
impl PdfExtractor {
pub fn new(item: &Item) -> Self {
Self {
text: Arc::new(PdfTextExtractor::new(item)),
meta: Arc::new(PdfMetaExtractor::new(item)),
#[cfg(feature = "pdfium")]
cover: Arc::new(PdfCoverExtractor::new(item)),
#[cfg(feature = "pdfium")]
pages: Arc::new(PdfPagesExtractor::new(item)),
}
}
}
#[async_trait::async_trait]
impl ObjectExtractor for PdfExtractor {
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
match name.as_str() {
"text" => self.text.field(name).await,
"meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
#[cfg(feature = "pdfium")]
"cover" => self.cover.field(name).await,
#[cfg(feature = "pdfium")]
"pages" => Ok(Some(PileValue::ListExtractor(self.pages.clone()))),
_ => Ok(None),
}
}
#[expect(clippy::unwrap_used)]
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(vec![
Label::new("text").unwrap(),
Label::new("meta").unwrap(),
#[cfg(feature = "pdfium")]
Label::new("cover").unwrap(),
#[cfg(feature = "pdfium")]
Label::new("pages").unwrap(),
])
}
}

View File

@@ -1,95 +0,0 @@
use image::ImageFormat;
use pdfium_render::prelude::*;
use pile_config::Label;
use std::{
collections::HashMap,
io::{BufReader, Cursor},
sync::{Arc, OnceLock},
};
use tracing::trace;
use crate::{Item, PileValue, SyncReadBridge, extract::ObjectExtractor};
pub struct PdfCoverExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl PdfCoverExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let cover = tokio::task::spawn_blocking(move || {
let mut bytes = Vec::new();
std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?;
let pdfium = Pdfium::default();
let document = pdfium
.load_pdf_from_byte_slice(&bytes, None)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let render_config = PdfRenderConfig::new().set_target_width(1024);
let page = document
.pages()
.get(0)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let image = page
.render_with_config(&render_config)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?
.as_image();
let mut png_bytes = Vec::new();
image
.write_to(&mut Cursor::new(&mut png_bytes), ImageFormat::Png)
.map_err(|e| std::io::Error::other(e.to_string()))?;
Ok::<_, std::io::Error>(png_bytes)
})
.await
.map_err(std::io::Error::other)?;
let output = match cover {
Ok(data) => {
#[expect(clippy::unwrap_used)]
let label = Label::new("cover").unwrap();
HashMap::from([(
label,
PileValue::Blob {
mime: mime::IMAGE_PNG,
bytes: Arc::new(data),
},
)])
}
Err(error) => {
trace!(message = "Could not render pdf cover", ?error, key = ?self.item.key());
HashMap::new()
}
};
return Ok(self.output.get_or_init(|| output));
}
}
#[async_trait::async_trait]
impl ObjectExtractor for PdfCoverExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner().await?.keys().cloned().collect())
}
}

View File

@@ -1,130 +0,0 @@
use pdf::file::FileOptions;
use pdf::primitive::{Date, TimeRel};
use pile_config::Label;
use std::{
collections::HashMap,
io::BufReader,
sync::{Arc, OnceLock},
};
use tracing::trace;
use crate::extract::ObjectExtractor;
use crate::{Item, PileValue, SyncReadBridge};
pub struct PdfMetaExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl PdfMetaExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_meta = tokio::task::spawn_blocking(move || {
let mut bytes = Vec::new();
std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?;
let file = match FileOptions::cached().load(bytes) {
Ok(x) => x,
Err(pdf::PdfError::Io { source }) => return Err(source),
Err(error) => {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
error.to_string(),
));
}
};
let page_count = file.num_pages();
let mut meta: Vec<(&'static str, Option<String>)> = Vec::new();
if let Some(info) = &file.trailer.info_dict {
use pdf::primitive::PdfString;
let fields: &[(&'static str, Option<&PdfString>)] = &[
("title", info.title.as_ref()),
("author", info.author.as_ref()),
("subject", info.subject.as_ref()),
("keywords", info.keywords.as_ref()),
("creator", info.creator.as_ref()),
("producer", info.producer.as_ref()),
];
for (key, val) in fields {
meta.push((key, val.map(|s| s.to_string_lossy())));
}
meta.push((
"creation_date",
info.creation_date.as_ref().map(format_date),
));
meta.push(("mod_date", info.mod_date.as_ref().map(format_date)));
}
Ok::<_, std::io::Error>((page_count, meta))
})
.await
.map_err(std::io::Error::other)?;
let (page_count, raw_meta) = match raw_meta {
Ok(x) => x,
Err(error) => {
trace!(message = "Could not process pdf", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
};
let mut output: HashMap<Label, PileValue> = HashMap::new();
#[expect(clippy::unwrap_used)]
output.insert(
Label::new("pages").unwrap(),
PileValue::U64(page_count as u64),
);
#[expect(clippy::unwrap_used)]
for (key, val) in raw_meta {
let label = Label::new(key).unwrap();
let value = match val {
Some(s) => PileValue::String(Arc::new(s.into())),
None => PileValue::Null,
};
output.insert(label, value);
}
return Ok(self.output.get_or_init(|| output));
}
}
fn format_date(d: &Date) -> String {
let tz = match d.rel {
TimeRel::Universal => "Z".to_owned(),
TimeRel::Later => format!("+{:02}:{:02}", d.tz_hour, d.tz_minute),
TimeRel::Earlier => format!("-{:02}:{:02}", d.tz_hour, d.tz_minute),
};
format!(
"{:04}-{:02}-{:02}T{:02}:{:02}:{:02}{}",
d.year, d.month, d.day, d.hour, d.minute, d.second, tz
)
}
#[async_trait::async_trait]
impl ObjectExtractor for PdfMetaExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner().await?.keys().cloned().collect())
}
}

View File

@@ -1,104 +0,0 @@
use image::ImageFormat;
use pdfium_render::prelude::*;
use std::{
io::{BufReader, Cursor},
sync::Arc,
};
use tracing::trace;
use crate::{Item, PileValue, SyncReadBridge, extract::ListExtractor};
pub struct PdfPagesExtractor {
item: Item,
}
impl PdfPagesExtractor {
pub fn new(item: &Item) -> Self {
Self { item: item.clone() }
}
async fn get_bytes(&self) -> Result<Vec<u8>, std::io::Error> {
let reader = SyncReadBridge::new_current(self.item.read().await?);
tokio::task::spawn_blocking(move || {
let mut b = Vec::new();
std::io::Read::read_to_end(&mut BufReader::new(reader), &mut b)?;
Ok::<_, std::io::Error>(b)
})
.await
.map_err(std::io::Error::other)?
}
}
#[async_trait::async_trait]
impl ListExtractor for PdfPagesExtractor {
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
let bytes = self.get_bytes().await?;
let png = tokio::task::spawn_blocking(move || {
let pdfium = Pdfium::default();
let doc = pdfium
.load_pdf_from_byte_slice(&bytes, None)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
if idx >= doc.pages().len() as usize {
return Ok::<_, std::io::Error>(None);
}
let render_config = PdfRenderConfig::new().set_target_width(1024);
let page = doc
.pages()
.get(idx as u16)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let image = page
.render_with_config(&render_config)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?
.as_image();
let mut png_bytes = Vec::new();
image
.write_to(&mut Cursor::new(&mut png_bytes), ImageFormat::Png)
.map_err(|e| std::io::Error::other(e.to_string()))?;
Ok(Some(png_bytes))
})
.await
.map_err(std::io::Error::other)?;
let value = match png {
Ok(None) => return Ok(None),
Ok(Some(bytes)) => PileValue::Blob {
mime: mime::IMAGE_PNG,
bytes: Arc::new(bytes),
},
Err(error) => {
trace!(message = "Could not render pdf page", ?error, idx, key = ?self.item.key());
PileValue::Null
}
};
Ok(Some(value))
}
async fn len(&self) -> Result<usize, std::io::Error> {
let bytes = self.get_bytes().await?;
let count = tokio::task::spawn_blocking(move || {
let pdfium = Pdfium::default();
let doc = pdfium
.load_pdf_from_byte_slice(&bytes, None)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
Ok::<_, std::io::Error>(doc.pages().len() as usize)
})
.await
.map_err(std::io::Error::other)?;
match count {
Ok(n) => Ok(n),
Err(error) => {
trace!(message = "Could not read pdf page count", ?error, key = ?self.item.key());
Ok(0)
}
}
}
// Override, extracting all pages is very slow,
// and we can't display binary in json anyway
async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
Ok(serde_json::Value::String(format!(
"<PdfPages ({} pages)>",
self.len().await?
)))
}
}

View File

@@ -1,110 +0,0 @@
use pdf::content::{Op, TextDrawAdjusted};
use pdf::file::FileOptions;
use pile_config::Label;
use std::{
collections::HashMap,
io::BufReader,
sync::{Arc, OnceLock},
};
use tracing::trace;
use crate::extract::ObjectExtractor;
use crate::{Item, PileValue, SyncReadBridge};
pub struct PdfTextExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl PdfTextExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_text = tokio::task::spawn_blocking(move || {
let mut bytes = Vec::new();
std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?;
let file = match FileOptions::cached().load(bytes) {
Ok(x) => x,
Err(pdf::PdfError::Io { source }) => return Err(source),
Err(error) => {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
error.to_string(),
));
}
};
let mut text_parts: Vec<String> = Vec::new();
for page in file.pages() {
let page = page.map_err(|e| {
std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
})?;
if let Some(content) = &page.contents {
let ops = content.operations(&file.resolver()).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
})?;
for op in ops {
match op {
Op::TextDraw { text } => {
text_parts.push(text.to_string_lossy());
}
Op::TextDrawAdjusted { array } => {
for item in array {
if let TextDrawAdjusted::Text(text) = item {
text_parts.push(text.to_string_lossy());
}
}
}
_ => {}
}
}
}
}
Ok::<_, std::io::Error>(text_parts.join(" "))
})
.await
.map_err(std::io::Error::other)?;
let raw_text = match raw_text {
Ok(x) => x,
Err(error) => {
trace!(message = "Could not process pdf", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
};
#[expect(clippy::unwrap_used)]
let output = HashMap::from([(
Label::new("text").unwrap(),
PileValue::String(Arc::new(raw_text.into())),
)]);
return Ok(self.output.get_or_init(|| output));
}
}
#[async_trait::async_trait]
impl ObjectExtractor for PdfTextExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner().await?.keys().cloned().collect())
}
}

View File

@@ -1,44 +0,0 @@
use pile_config::Label;
use std::sync::OnceLock;
use crate::{
Item, PileValue,
extract::{ObjectExtractor, TomlExtractor},
};
pub struct SidecarExtractor {
item: Item,
output: OnceLock<Option<TomlExtractor>>,
}
impl SidecarExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
}
#[async_trait::async_trait]
impl ObjectExtractor for SidecarExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
match self
.output
.get_or_init(|| self.item.sidecar().map(TomlExtractor::new))
{
Some(x) => Ok(x.field(name).await?),
None => Ok(Some(PileValue::Null)),
}
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
match self
.output
.get_or_init(|| self.item.sidecar().map(TomlExtractor::new))
{
Some(x) => Ok(x.fields().await?),
None => Ok(Vec::new()),
}
}
}

View File

@@ -1,75 +0,0 @@
use pile_config::Label;
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
};
use crate::{AsyncReader, Item, PileValue, extract::ObjectExtractor};
fn toml_to_pile(value: toml::Value) -> PileValue {
match value {
toml::Value::String(s) => PileValue::String(Arc::new(s.into())),
toml::Value::Integer(i) => PileValue::String(Arc::new(i.to_string().into())),
toml::Value::Float(f) => PileValue::String(Arc::new(f.to_string().into())),
toml::Value::Boolean(b) => PileValue::String(Arc::new(b.to_string().into())),
toml::Value::Datetime(d) => PileValue::String(Arc::new(d.to_string().into())),
toml::Value::Array(a) => {
PileValue::Array(Arc::new(a.into_iter().map(toml_to_pile).collect()))
}
toml::Value::Table(_) => PileValue::Null,
}
}
pub struct TomlExtractor {
item: Item,
output: OnceLock<HashMap<Label, PileValue>>,
}
impl TomlExtractor {
pub fn new(item: &Item) -> Self {
Self {
item: item.clone(),
output: OnceLock::new(),
}
}
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
if let Some(x) = self.output.get() {
return Ok(x);
}
let mut reader = match self.item.read().await {
Ok(r) => r,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
return Ok(self.output.get_or_init(HashMap::new));
}
Err(e) => return Err(e),
};
let bytes = reader.read_to_end().await?;
let toml: toml::Value = match toml::from_slice(&bytes) {
Ok(x) => x,
Err(_) => return Ok(self.output.get_or_init(HashMap::new)),
};
let output: HashMap<Label, PileValue> = match toml {
toml::Value::Table(t) => t
.into_iter()
.filter_map(|(k, v)| Label::new(&k).map(|label| (label, toml_to_pile(v))))
.collect(),
_ => HashMap::new(),
};
return Ok(self.output.get_or_init(|| output));
}
}
#[async_trait::async_trait]
impl ObjectExtractor for TomlExtractor {
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
Ok(self.get_inner().await?.get(name).cloned())
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.get_inner().await?.keys().cloned().collect())
}
}

View File

@@ -1,9 +1,6 @@
use itertools::Itertools;
use pile_config::{Case, ConfigToml, DatasetFts, FieldSpecPost, Label};
use std::{
path::PathBuf,
sync::{Arc, LazyLock},
};
use pile_config::{ConfigToml, DatasetFts, Label};
use pile_value::value::{Item, PileValue};
use std::{path::PathBuf, sync::LazyLock};
use tantivy::{
DocAddress, Index, ReloadPolicy, TantivyDocument, TantivyError,
collector::Collector,
@@ -12,8 +9,6 @@ use tantivy::{
};
use tracing::{debug, trace, warn};
use crate::{Item, PileValue, extract::MetaExtractor};
#[derive(Debug, Clone)]
pub struct FtsLookupResult {
pub score: f32,
@@ -76,11 +71,11 @@ impl DbFtsIndex {
doc.add_text(self.schema.get_field("_meta_source")?, item.source_name());
doc.add_text(self.schema.get_field("_meta_key")?, key);
let extractor = PileValue::ObjectExtractor(Arc::new(MetaExtractor::new(item)));
let item = PileValue::Item(item.clone());
let mut empty = true;
for name in self.fts_cfg().fields.keys() {
let x = self.get_field(&extractor, name).await?;
let x = self.get_field(&item, name).await?;
let val = match x {
Some(x) => x,
@@ -135,13 +130,6 @@ impl DbFtsIndex {
x => x.clone(),
};
for post in &field.post {
val = match apply(post, &val) {
Some(x) => x,
None => return Ok(None),
};
}
loop {
val = match val {
PileValue::String(x) => return Ok(Some(x.to_string())),
@@ -186,6 +174,15 @@ impl DbFtsIndex {
continue 'outer;
}
PileValue::Item(_) => {
trace!(
message = "Skipping field, is item",
field = field_name.to_string(),
?path,
);
continue 'outer;
}
PileValue::ListExtractor(_) => {
trace!(
message = "Skipping field, is ListExtractor",
@@ -296,104 +293,3 @@ impl DbFtsIndex {
return Ok(out);
}
}
pub fn apply(post: &FieldSpecPost, val: &PileValue) -> Option<PileValue> {
Some(match post {
FieldSpecPost::NotEmpty { notempty: false } => val.clone(),
FieldSpecPost::NotEmpty { notempty: true } => match val {
PileValue::Null => return None,
PileValue::String(x) if x.is_empty() => return None,
PileValue::Array(x) if x.is_empty() => return None,
x => x.clone(),
},
FieldSpecPost::SetCase { case: Case::Lower } => match val {
PileValue::Null => return None,
PileValue::U64(_) => return None,
PileValue::I64(_) => return None,
PileValue::Blob { .. } => return None,
PileValue::ObjectExtractor(_) => return None,
PileValue::ListExtractor(_) => return None,
PileValue::String(x) => PileValue::String(Arc::new(x.as_str().to_lowercase().into())),
PileValue::Array(x) => PileValue::Array(Arc::new(
x.iter().map(|x| apply(post, x)).collect::<Option<_>>()?,
)),
},
FieldSpecPost::SetCase { case: Case::Upper } => match val {
PileValue::Null => return None,
PileValue::U64(_) => return None,
PileValue::I64(_) => return None,
PileValue::Blob { .. } => return None,
PileValue::ObjectExtractor(_) => return None,
PileValue::ListExtractor(_) => return None,
PileValue::String(x) => PileValue::String(Arc::new(x.as_str().to_uppercase().into())),
PileValue::Array(x) => PileValue::Array(Arc::new(
x.iter()
.map(|x| apply(post, x))
.collect::<Option<Vec<_>>>()?,
)),
},
FieldSpecPost::TrimSuffix { trim_suffix } => match val {
PileValue::Null => return None,
PileValue::U64(_) => return None,
PileValue::I64(_) => return None,
PileValue::Blob { .. } => return None,
PileValue::ObjectExtractor(_) => return None,
PileValue::ListExtractor(_) => return None,
PileValue::String(x) => PileValue::String(Arc::new(
x.strip_suffix(trim_suffix).unwrap_or(x.as_str()).into(),
)),
PileValue::Array(x) => PileValue::Array(Arc::new(
x.iter()
.map(|x| apply(post, x))
.collect::<Option<Vec<_>>>()?,
)),
},
FieldSpecPost::TrimPrefix { trim_prefix } => match val {
PileValue::Null => return None,
PileValue::U64(_) => return None,
PileValue::I64(_) => return None,
PileValue::Blob { .. } => return None,
PileValue::ObjectExtractor(_) => return None,
PileValue::ListExtractor(_) => return None,
PileValue::String(x) => PileValue::String(Arc::new(
x.strip_prefix(trim_prefix).unwrap_or(x.as_str()).into(),
)),
PileValue::Array(x) => PileValue::Array(Arc::new(
x.iter()
.map(|x| apply(post, x))
.collect::<Option<Vec<_>>>()?,
)),
},
FieldSpecPost::Join { join } => match val {
PileValue::Null => return None,
PileValue::U64(_) => return None,
PileValue::I64(_) => return None,
PileValue::Blob { .. } => return None,
PileValue::ObjectExtractor(_) => return None,
PileValue::ListExtractor(_) => return None,
PileValue::String(x) => PileValue::String(x.clone()),
PileValue::Array(x) => PileValue::String(Arc::new(
x.iter()
.map(|x| apply(post, x))
.map(|x| x.and_then(|x| x.as_str().map(|x| x.to_owned())))
.collect::<Option<Vec<_>>>()?
.into_iter()
.join(join)
.into(),
)),
},
})
}

View File

@@ -1,294 +0,0 @@
use mime::Mime;
use smartstring::{LazyCompact, SmartString};
use std::{
fs::File,
io::{Read, Seek, SeekFrom},
path::PathBuf,
sync::Arc,
};
use tokio::runtime::Handle;
use crate::source::{DirDataSource, S3DataSource};
//
// MARK: item
//
/// A cheaply-clonable pointer to an item in a dataset
#[derive(Debug, Clone)]
pub enum Item {
File {
source: Arc<DirDataSource>,
mime: Mime,
path: PathBuf,
sidecar: Option<Box<Item>>,
},
S3 {
source: Arc<S3DataSource>,
mime: Mime,
key: SmartString<LazyCompact>,
sidecar: Option<Box<Item>>,
},
}
impl Item {
/// Open the item for reading. For S3, performs a HEAD request to determine
/// the object size.
pub async fn read(&self) -> Result<ItemReader, std::io::Error> {
Ok(match self {
Self::File { path, .. } => ItemReader::File(File::open(path)?),
Self::S3 { source, key, .. } => {
let head = source
.client
.head_object()
.bucket(source.bucket.as_str())
.key(key.as_str())
.send()
.await
.map_err(std::io::Error::other)?;
let size = head.content_length().unwrap_or(0) as u64;
ItemReader::S3(S3Reader {
client: source.client.clone(),
bucket: source.bucket.clone(),
key: key.to_owned(),
cursor: 0,
size,
})
}
})
}
pub fn source_name(&self) -> &pile_config::Label {
match self {
Self::File { source, .. } => &source.name,
Self::S3 { source, .. } => &source.name,
}
}
#[expect(clippy::expect_used)]
pub fn key(&self) -> SmartString<LazyCompact> {
match self {
Self::File { path, .. } => path.to_str().expect("path is not utf-8").into(),
Self::S3 { key, .. } => key.clone(),
}
}
pub fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
match self {
Self::File { path, .. } => {
let mut hasher = blake3::Hasher::new();
let mut file = std::fs::File::open(path)?;
std::io::copy(&mut file, &mut hasher)?;
return Ok(hasher.finalize());
}
Self::S3 { .. } => todo!(),
}
}
pub fn mime(&self) -> &Mime {
match self {
Self::File { mime, .. } => mime,
Self::S3 { mime, .. } => mime,
}
}
pub fn sidecar(&self) -> Option<&Self> {
match self {
Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
Self::S3 { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
}
}
}
//
// MARK: reader
//
pub trait AsyncReader: Send {
/// Read a chunk of bytes.
fn read(
&mut self,
buf: &mut [u8],
) -> impl Future<Output = Result<usize, std::io::Error>> + Send;
/// Read all remaining bytes into a `Vec`.
fn read_to_end(&mut self) -> impl Future<Output = Result<Vec<u8>, std::io::Error>> + Send {
async {
let mut buf = Vec::new();
let mut chunk = vec![0u8; 65536];
loop {
let n = self.read(&mut chunk).await?;
if n == 0 {
break;
}
buf.extend_from_slice(&chunk[..n]);
}
Ok(buf)
}
}
}
pub trait AsyncSeekReader: AsyncReader {
fn seek(&mut self, pos: SeekFrom) -> impl Future<Output = Result<u64, std::io::Error>> + Send;
}
//
// MARK: sync bridge
//
/// Turn an async [Reader] into a sync [Read] + [Seek].
///
/// Never use this outside of [tokio::task::spawn_blocking],
/// the async runtime will deadlock if this struct blocks
/// the runtime.
pub struct SyncReadBridge<R: AsyncReader> {
inner: R,
handle: Handle,
}
impl<R: AsyncReader> SyncReadBridge<R> {
/// Creates a new adapter using a handle to the current runtime.
/// Panics if called outside of tokio
pub fn new_current(inner: R) -> Self {
Self::new(inner, Handle::current())
}
/// Creates a new adapter using a handle to an existing runtime.
pub fn new(inner: R, handle: Handle) -> Self {
Self { inner, handle }
}
}
impl<R: AsyncReader> Read for SyncReadBridge<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.handle.block_on(self.inner.read(buf))
}
}
impl<R: AsyncReader + AsyncSeekReader> Seek for SyncReadBridge<R> {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
self.handle.block_on(self.inner.seek(pos))
}
}
//
// MARK: itemreader
//
pub enum ItemReader {
File(File),
S3(S3Reader),
}
impl AsyncReader for ItemReader {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
match self {
Self::File(x) => std::io::Read::read(x, buf),
Self::S3(x) => x.read(buf).await,
}
}
}
impl AsyncSeekReader for ItemReader {
async fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
match self {
Self::File(x) => x.seek(pos),
Self::S3(x) => x.seek(pos).await,
}
}
}
//
// MARK: S3Reader
//
pub struct S3Reader {
client: Arc<aws_sdk_s3::Client>,
bucket: SmartString<LazyCompact>,
key: SmartString<LazyCompact>,
cursor: u64,
size: u64,
}
impl AsyncReader for S3Reader {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
let len_left = self.size.saturating_sub(self.cursor);
if len_left == 0 || buf.is_empty() {
return Ok(0);
}
let start_byte = self.cursor;
let len_to_read = (buf.len() as u64).min(len_left);
let end_byte = start_byte + len_to_read - 1;
let resp = self
.client
.get_object()
.bucket(self.bucket.as_str())
.key(self.key.as_str())
.range(format!("bytes={start_byte}-{end_byte}"))
.send()
.await
.map_err(std::io::Error::other)?;
let bytes = resp
.body
.collect()
.await
.map(|x| x.into_bytes())
.map_err(std::io::Error::other)?;
let n = bytes.len().min(buf.len());
buf[..n].copy_from_slice(&bytes[..n]);
self.cursor += n as u64;
Ok(n)
}
}
impl AsyncSeekReader for S3Reader {
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
match pos {
SeekFrom::Start(x) => self.cursor = x.min(self.size),
SeekFrom::Current(x) => {
if x < 0 {
let abs = x.unsigned_abs();
if abs > self.cursor {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"cannot seek past start",
));
}
self.cursor -= abs;
} else {
self.cursor += x as u64;
}
}
std::io::SeekFrom::End(x) => {
if x < 0 {
let abs = x.unsigned_abs();
if abs > self.size {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"cannot seek past start",
));
}
self.cursor = self.size - abs;
} else {
self.cursor = self.size + x as u64;
}
}
}
self.cursor = self.cursor.min(self.size);
Ok(self.cursor)
}
}

View File

@@ -1,21 +1,7 @@
mod traits;
pub use traits::*;
mod misc;
pub use misc::*;
mod dataset;
pub use dataset::{Dataset, DatasetError, Datasets};
mod item;
pub use item::*;
mod value;
pub use value::*;
pub mod extract;
pub mod index;
pub mod source;
#[cfg(feature = "axum")]
pub mod serve;

View File

@@ -1,121 +0,0 @@
use chrono::{DateTime, Utc};
use std::fs;
use std::path::Path;
/// Returns the age of a path as a [DateTime].
/// - If the path doesn't exist, returns [None]
/// - If it's a file, returns the modified time
/// - If it's a directory, returns the LATEST modified time of all files within
pub fn path_ts_latest(path: impl AsRef<Path>) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let path = path.as_ref();
if !path.exists() {
return Ok(None);
}
let metadata = fs::metadata(path)?;
if metadata.is_file() {
let modified = metadata.modified()?;
Ok(Some(modified.into()))
} else if metadata.is_dir() {
find_latest_modified(path)
} else {
Ok(None)
}
}
/// Returns the age of a path as a [DateTime].
/// - If the path doesn't exist, returns [None]
/// - If it's a file, returns the modified time
/// - If it's a directory, returns the EARLIEST modified time of all files within
pub fn path_ts_earliest(path: impl AsRef<Path>) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let path = path.as_ref();
if !path.exists() {
return Ok(None);
}
let metadata = fs::metadata(path)?;
if metadata.is_file() {
let modified = metadata.modified()?;
Ok(Some(modified.into()))
} else if metadata.is_dir() {
find_earliest_modified(path)
} else {
Ok(None)
}
}
fn find_latest_modified(dir: &Path) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let mut latest: Option<DateTime<Utc>> = None;
// Include the directory's own modification time
let dir_metadata = fs::metadata(dir)?;
if let Ok(modified) = dir_metadata.modified() {
let dt: DateTime<Utc> = modified.into();
latest = Some(dt);
}
let entries = fs::read_dir(dir)?;
for entry in entries.flatten() {
let path = entry.path();
let metadata = entry.metadata()?;
if metadata.is_file() {
if let Ok(modified) = metadata.modified() {
let dt: DateTime<Utc> = modified.into();
latest = Some(match latest {
Some(prev) if prev > dt => prev,
_ => dt,
});
}
} else if metadata.is_dir()
&& let Some(dir_latest) = find_latest_modified(&path)?
{
latest = Some(match latest {
Some(prev) if prev > dir_latest => prev,
_ => dir_latest,
});
}
}
return Ok(latest);
}
fn find_earliest_modified(dir: &Path) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let mut earliest: Option<DateTime<Utc>> = None;
// Include the directory's own modification time
let dir_metadata = fs::metadata(dir)?;
if let Ok(modified) = dir_metadata.modified() {
let dt: DateTime<Utc> = modified.into();
earliest = Some(dt);
}
let entries = fs::read_dir(dir)?;
for entry in entries.flatten() {
let path = entry.path();
let metadata = entry.metadata()?;
if metadata.is_file() {
if let Ok(modified) = metadata.modified() {
let dt: DateTime<Utc> = modified.into();
earliest = Some(match earliest {
Some(prev) if prev < dt => prev,
_ => dt,
});
}
} else if metadata.is_dir()
&& let Some(dir_earliest) = find_earliest_modified(&path)?
{
earliest = Some(match earliest {
Some(prev) if prev < dir_earliest => prev,
_ => dir_earliest,
});
}
}
return Ok(earliest);
}

View File

@@ -5,12 +5,13 @@ use axum::{
response::{IntoResponse, Response},
};
use pile_config::{Label, objectpath::ObjectPath};
use pile_value::value::PileValue;
use serde::Deserialize;
use std::{sync::Arc, time::Instant};
use tracing::debug;
use utoipa::ToSchema;
use crate::{Datasets, PileValue, extract::MetaExtractor};
use crate::Datasets;
#[derive(Deserialize, ToSchema)]
pub struct FieldQuery {
@@ -61,10 +62,8 @@ pub async fn get_field(
return StatusCode::NOT_FOUND.into_response();
};
let extractor = MetaExtractor::new(&item);
let root: PileValue = PileValue::ObjectExtractor(Arc::new(extractor));
let value = match root.query(&path).await {
let item = PileValue::Item(item);
let value = match item.query(&path).await {
Ok(Some(v)) => v,
Ok(None) => return StatusCode::NOT_FOUND.into_response(),
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),

View File

@@ -4,12 +4,13 @@ use axum::{
response::{IntoResponse, Response},
};
use pile_config::Label;
use pile_value::value::AsyncReader;
use serde::Deserialize;
use std::{sync::Arc, time::Instant};
use tracing::debug;
use utoipa::ToSchema;
use crate::{AsyncReader, Datasets};
use crate::Datasets;
#[derive(Deserialize, ToSchema)]
pub struct ItemQuery {

View File

@@ -1,128 +0,0 @@
use chrono::{DateTime, Utc};
use pile_config::Label;
use std::{path::PathBuf, sync::Arc};
use tokio_stream::wrappers::ReceiverStream;
use walkdir::WalkDir;
use crate::{DataSource, Item, path_ts_latest};
#[derive(Debug)]
pub struct DirDataSource {
pub name: Label,
pub dir: PathBuf,
pub sidecars: bool,
}
impl DirDataSource {
pub fn new(name: &Label, dir: PathBuf, sidecars: bool) -> Self {
Self {
name: name.clone(),
dir,
sidecars,
}
}
}
impl DataSource for Arc<DirDataSource> {
async fn get(&self, key: &str) -> Result<Option<Item>, std::io::Error> {
let key = match key.parse::<PathBuf>() {
Ok(x) => self.dir.join(x),
Err(_) => return Ok(None),
};
if !key.is_file() {
return Ok(None);
}
// Ignore toml files if sidecars are enabled
if self.sidecars && key.extension().and_then(|x| x.to_str()) == Some("toml") {
return Ok(None);
}
return Ok(Some(Item::File {
source: Arc::clone(self),
mime: mime_guess::from_path(&key).first_or_octet_stream(),
path: key.clone(),
sidecar: self.sidecars.then(|| {
Box::new(Item::File {
source: Arc::clone(self),
mime: mime_guess::from_path(key.with_extension("toml")).first_or_octet_stream(),
path: key.with_extension("toml"),
sidecar: None,
})
}),
}));
}
fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>> {
let (tx, rx) = tokio::sync::mpsc::channel(64);
let source = Arc::clone(self);
let dir = self.dir.clone();
tokio::task::spawn_blocking(move || {
for entry in WalkDir::new(dir) {
let entry = match entry {
Err(e) => {
let msg = format!("walkdir error: {e:?}");
let err = e.into_io_error().unwrap_or(std::io::Error::other(msg));
if tx.blocking_send(Err(err)).is_err() {
return;
}
continue;
}
Ok(e) => e,
};
if entry.file_type().is_dir() {
continue;
}
let path = entry.into_path();
let item = match path.extension().and_then(|x| x.to_str()) {
None => continue,
Some("toml") if source.sidecars => continue,
Some(_) => Item::File {
source: Arc::clone(&source),
mime: mime_guess::from_path(&path).first_or_octet_stream(),
path: path.clone(),
sidecar: source.sidecars.then(|| {
Box::new(Item::File {
source: Arc::clone(&source),
mime: mime_guess::from_path(path.with_extension("toml"))
.first_or_octet_stream(),
path: path.with_extension("toml"),
sidecar: None,
})
}),
},
};
if tx.blocking_send(Ok(item)).is_err() {
return;
}
}
});
ReceiverStream::new(rx)
}
async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let mut ts: Option<DateTime<Utc>> = None;
if !self.dir.exists() {
return Ok(None);
}
let new = path_ts_latest(&self.dir)?;
match (ts, new) {
(_, None) => {}
(None, Some(new)) => ts = Some(new),
(Some(old), Some(new)) => ts = Some(old.max(new)),
};
return Ok(ts);
}
}

View File

@@ -1,5 +0,0 @@
mod dir;
pub use dir::*;
mod s3;
pub use s3::*;

View File

@@ -1,255 +0,0 @@
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use chrono::{DateTime, Utc};
use pile_config::{Label, S3Credentials};
use smartstring::{LazyCompact, SmartString};
use std::sync::Arc;
use tokio_stream::wrappers::ReceiverStream;
use crate::{DataSource, Item};
#[derive(Debug)]
pub struct S3DataSource {
pub name: Label,
pub bucket: SmartString<LazyCompact>,
pub prefix: Option<SmartString<LazyCompact>>,
pub sidecars: bool,
pub client: Arc<aws_sdk_s3::Client>,
}
impl S3DataSource {
pub fn new(
name: &Label,
bucket: String,
prefix: Option<String>,
endpoint: Option<String>,
region: String,
credentials: &S3Credentials,
sidecars: bool,
) -> Result<Self, std::io::Error> {
let client = {
let creds = Credentials::new(
&credentials.access_key_id,
&credentials.secret_access_key,
None,
None,
"pile",
);
let mut s3_config = aws_sdk_s3::config::Builder::new()
.behavior_version(BehaviorVersion::latest())
.region(Region::new(region))
.credentials_provider(creds);
if let Some(ep) = endpoint {
s3_config = s3_config.endpoint_url(ep).force_path_style(true);
}
aws_sdk_s3::Client::from_conf(s3_config.build())
};
Ok(Self {
name: name.clone(),
bucket: bucket.into(),
prefix: prefix.map(|x| x.into()),
sidecars,
client: Arc::new(client),
})
}
async fn find_sidecar_key(&self, key: &str) -> Option<SmartString<LazyCompact>> {
// First try {key}.toml
let full_toml = format!("{key}.toml");
if self
.client
.head_object()
.bucket(self.bucket.as_str())
.key(&full_toml)
.send()
.await
.is_ok()
{
return Some(full_toml.into());
}
// Then try {key-with-extension-stripped}.toml
let stripped = std::path::Path::new(key).with_extension("toml");
if let Some(stripped_str) = stripped.to_str()
&& stripped_str != full_toml.as_str()
&& self
.client
.head_object()
.bucket(self.bucket.as_str())
.key(stripped_str)
.send()
.await
.is_ok()
{
return Some(stripped_str.into());
}
None
}
async fn make_item(self: &Arc<Self>, key: impl Into<SmartString<LazyCompact>>) -> Item {
let key: SmartString<LazyCompact> = key.into();
let mime = mime_guess::from_path(key.as_str()).first_or_octet_stream();
let sidecar = if self.sidecars {
self.find_sidecar_key(key.as_str())
.await
.map(|sidecar_key| {
Box::new(Item::S3 {
source: Arc::clone(self),
mime: mime_guess::from_path(sidecar_key.as_str()).first_or_octet_stream(),
key: sidecar_key,
sidecar: None,
})
})
} else {
None
};
Item::S3 {
source: Arc::clone(self),
mime,
key,
sidecar,
}
}
}
impl DataSource for Arc<S3DataSource> {
async fn get(&self, key: &str) -> Result<Option<Item>, std::io::Error> {
if self.sidecars && key.ends_with(".toml") {
return Ok(None);
}
let result = self
.client
.head_object()
.bucket(self.bucket.as_str())
.key(key)
.send()
.await;
match result {
Err(sdk_err) => {
let not_found = sdk_err
.as_service_error()
.map(|e| e.is_not_found())
.unwrap_or(false);
if not_found {
return Ok(None);
}
Err(std::io::Error::other(sdk_err))
}
Ok(_) => Ok(Some(self.make_item(key).await)),
}
}
fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>> {
let (tx, rx) = tokio::sync::mpsc::channel(64);
let source = Arc::clone(self);
tokio::spawn(async move {
let mut continuation_token: Option<String> = None;
loop {
let mut req = source
.client
.list_objects_v2()
.bucket(source.bucket.as_str());
if let Some(prefix) = &source.prefix {
req = req.prefix(prefix.as_str());
}
if let Some(token) = continuation_token {
req = req.continuation_token(token);
}
let resp = match req.send().await {
Err(e) => {
let _ = tx.send(Err(std::io::Error::other(e))).await;
break;
}
Ok(resp) => resp,
};
let next_token = resp.next_continuation_token().map(ToOwned::to_owned);
let is_truncated = resp.is_truncated().unwrap_or(false);
for obj in resp.contents() {
let key = match obj.key() {
Some(k) => k.to_owned(),
None => continue,
};
if source.sidecars && key.ends_with(".toml") {
continue;
}
let item = source.make_item(key).await;
if tx.send(Ok(item)).await.is_err() {
return;
}
}
if !is_truncated {
break;
}
continuation_token = next_token;
}
});
ReceiverStream::new(rx)
}
async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
let mut ts: Option<DateTime<Utc>> = None;
let mut continuation_token: Option<String> = None;
loop {
let mut req = self.client.list_objects_v2().bucket(self.bucket.as_str());
if let Some(prefix) = &self.prefix {
req = req.prefix(prefix.as_str());
}
if let Some(token) = continuation_token {
req = req.continuation_token(token);
}
let resp = match req.send().await {
Err(_) => return Ok(None),
Ok(resp) => resp,
};
let next_token = resp.next_continuation_token().map(ToOwned::to_owned);
let is_truncated = resp.is_truncated().unwrap_or(false);
for obj in resp.contents() {
if let Some(last_modified) = obj.last_modified() {
let dt = DateTime::from_timestamp(
last_modified.secs(),
last_modified.subsec_nanos(),
);
if let Some(dt) = dt {
ts = Some(match ts {
None => dt,
Some(prev) => prev.max(dt),
});
}
}
}
if !is_truncated {
break;
}
continuation_token = next_token;
}
Ok(ts)
}
}

View File

@@ -1,158 +0,0 @@
use aws_sdk_s3::{error::SdkError, operation::get_object::GetObjectError};
use mime::Mime;
use std::io::{Error as IoError, Seek, SeekFrom, Write};
use thiserror::Error;
use super::S3Client;
use crate::retry;
#[derive(Debug, Error)]
#[expect(clippy::large_enum_variant)]
pub enum S3ReaderError {
#[error("sdk error")]
SdkError(#[from] SdkError<GetObjectError>),
#[error("byte stream error")]
ByteStreamError(#[from] aws_sdk_s3::primitives::ByteStreamError),
#[error("i/o error")]
IoError(#[from] IoError),
}
/// Provides a [`std::io::Read`]-like interface to an S3 object. \
/// This doesn't actually implement [`std::io::Read`] because Read isn't async.
///
/// Also implements [`std::io::Seek`]
pub struct S3Reader {
pub(super) client: S3Client,
pub(super) bucket: String,
pub(super) key: String,
pub(super) cursor: u64,
pub(super) size: u64,
pub(super) mime: Mime,
}
impl S3Reader {
pub async fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, S3ReaderError> {
let len_left = self.size - self.cursor;
if len_left == 0 || buf.is_empty() {
return Ok(0);
}
#[expect(clippy::unwrap_used)] // TODO: probably fits?
let start_byte = usize::try_from(self.cursor).unwrap();
#[expect(clippy::unwrap_used)] // usize fits in u64
let len_to_read = u64::try_from(buf.len()).unwrap().min(len_left);
#[expect(clippy::unwrap_used)] // must fit, we called min()
let len_to_read = usize::try_from(len_to_read).unwrap();
let end_byte = start_byte + len_to_read - 1;
let b = retry!(
self.client.retries,
self.client
.client
.get_object()
.bucket(self.bucket.as_str())
.key(self.key.as_str())
.range(format!("bytes={start_byte}-{end_byte}"))
.send()
.await
)?;
// Looks like `bytes 31000000-31999999/33921176``
// println!("{:?}", b.content_range);
let mut bytes = b.body.collect().await?.into_bytes();
bytes.truncate(len_to_read);
let l = bytes.len();
// Memory to memory writes are infallible
#[expect(clippy::unwrap_used)]
buf.write_all(&bytes).unwrap();
// Cannot fail, usize should always fit into u64
#[expect(clippy::unwrap_used)]
{
self.cursor += u64::try_from(l).unwrap();
}
return Ok(len_to_read);
}
pub fn is_done(&self) -> bool {
return self.cursor == self.size;
}
pub fn mime(&self) -> &Mime {
&self.mime
}
/// Write the entire contents of this reader to `r`.
///
/// This method always downloads the whole object,
/// and always preserves `self.cursor`.
pub async fn download<W: Write>(&mut self, r: &mut W) -> Result<(), S3ReaderError> {
let pos = self.stream_position()?;
const BUF_LEN: usize = 10_000_000;
#[expect(clippy::unwrap_used)] // Cannot fail
let mut buf: Box<[u8; BUF_LEN]> = vec![0u8; BUF_LEN].try_into().unwrap();
while !self.is_done() {
let b = self.read(&mut buf[..]).await?;
r.write_all(&buf[0..b])?;
}
self.seek(SeekFrom::Start(pos))?;
Ok(())
}
}
impl Seek for S3Reader {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
match pos {
SeekFrom::Start(x) => self.cursor = x.min(self.size - 1),
// Cannot panic, we handle all cases
#[expect(clippy::unwrap_used)]
SeekFrom::Current(x) => {
if x < 0 {
if u64::try_from(x.abs()).unwrap() > self.cursor {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"cannot seek past start",
));
}
self.cursor -= u64::try_from(x.abs()).unwrap();
} else {
self.cursor += u64::try_from(x).unwrap();
}
}
// Cannot panic, we handle all cases
#[expect(clippy::unwrap_used)]
SeekFrom::End(x) => {
if x < 0 {
if u64::try_from(x.abs()).unwrap() > self.size {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"cannot seek past start",
));
}
// Cannot fail, is abs
self.cursor = self.size - u64::try_from(x.abs()).unwrap();
} else {
// Cannot fail, is positive
self.cursor = self.size + u64::try_from(x).unwrap();
}
}
}
self.cursor = self.cursor.min(self.size - 1);
return Ok(self.cursor);
}
}

View File

@@ -1,18 +0,0 @@
use chrono::{DateTime, Utc};
use tokio_stream::wrappers::ReceiverStream;
use crate::Item;
/// A read-only set of [Item]s.
pub trait DataSource {
/// Get an item from this datasource
fn get(&self, key: &str) -> impl Future<Output = Result<Option<Item>, std::io::Error>> + Send;
/// Iterate over all items in this source in an arbitrary order
fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>>;
/// Return the time of the latest change to the data in this source
fn latest_change(
&self,
) -> impl Future<Output = Result<Option<DateTime<Utc>>, std::io::Error>> + Send;
}

View File

@@ -1,194 +0,0 @@
use mime::Mime;
use pile_config::objectpath::{ObjectPath, PathSegment};
use serde_json::{Map, Value};
use smartstring::{LazyCompact, SmartString};
use std::sync::Arc;
use crate::extract::{ListExtractor, ObjectExtractor};
/// An immutable, cheaply-clonable, lazily-computed value.
/// Very similar to [serde_json::Value].
pub enum PileValue {
Null,
U64(u64),
I64(i64),
/// A string
String(Arc<SmartString<LazyCompact>>),
/// An array of values
Array(Arc<Vec<PileValue>>),
/// A binary blob
Blob {
mime: Mime,
bytes: Arc<Vec<u8>>,
},
/// A lazily-computed map of {label: value}
ObjectExtractor(Arc<dyn ObjectExtractor>),
/// A lazily-computed array
ListExtractor(Arc<dyn ListExtractor>),
}
impl Clone for PileValue {
fn clone(&self) -> Self {
match self {
Self::Null => Self::Null,
Self::U64(x) => Self::U64(*x),
Self::I64(x) => Self::I64(*x),
Self::String(x) => Self::String(x.clone()),
Self::Array(x) => Self::Array(x.clone()),
Self::ObjectExtractor(x) => Self::ObjectExtractor(x.clone()),
Self::ListExtractor(x) => Self::ListExtractor(x.clone()),
Self::Blob { mime, bytes } => Self::Blob {
mime: mime.clone(),
bytes: bytes.clone(),
},
}
}
}
impl PileValue {
pub async fn query(&self, query: &ObjectPath) -> Result<Option<Self>, std::io::Error> {
let mut out: Option<PileValue> = Some(self.clone());
for s in &query.segments {
match s {
PathSegment::Root => out = Some(self.clone()),
PathSegment::Field(field) => {
out = match out {
None => return Ok(None),
Some(Self::Null) => None,
Some(Self::U64(_)) => None,
Some(Self::I64(_)) => None,
Some(Self::Array(_)) => None,
Some(Self::String(_)) => None,
Some(Self::Blob { .. }) => None,
Some(Self::ListExtractor(_)) => None,
Some(Self::ObjectExtractor(e)) => e.field(field).await?,
}
}
PathSegment::Index(idx) => {
out = match &out {
None => return Ok(None),
Some(Self::Null) => None,
Some(Self::U64(_)) => None,
Some(Self::I64(_)) => None,
Some(Self::Blob { .. }) => None,
Some(Self::Array(v)) => {
let idx = if *idx >= 0 {
usize::try_from(*idx).ok()
} else {
usize::try_from(v.len() as i64 - idx).ok()
};
idx.and_then(|idx| v.get(idx)).cloned()
}
Some(Self::String(_)) => None,
Some(Self::ObjectExtractor(_)) => None,
Some(Self::ListExtractor(e)) => {
let idx = if *idx >= 0 {
usize::try_from(*idx).ok()
} else {
usize::try_from(e.len().await? as i64 - idx).ok()
};
match idx {
Some(idx) => e.get(idx).await?,
None => None,
}
}
}
}
}
}
return Ok(out.clone());
}
/// Like `to_json`, but counts populated fields instead of collecting values.
///
/// - Leaf values (non-null scalars, arrays, blobs) contribute `Some(1)`.
/// - `Null` contributes `None`.
/// - `ObjectExtractor` is recursed into; returns `Some(Object(map))` with
/// only the fields that had data, or `None` if all fields were absent.
/// - `Array` / `ListExtractor` are treated as opaque leaf values (not descended into).
pub async fn count_fields(&self) -> Result<Option<Value>, std::io::Error> {
Ok(match self {
Self::Null => None,
Self::U64(_) | Self::I64(_) | Self::String(_) | Self::Blob { .. } => {
Some(Value::Number(1u64.into()))
}
Self::Array(x) => (!x.is_empty()).then(|| Value::Number(1u64.into())),
Self::ListExtractor(x) => (!x.is_empty().await?).then(|| Value::Number(1u64.into())),
Self::ObjectExtractor(e) => {
let keys = e.fields().await?;
let mut map = Map::new();
for k in &keys {
let v = match e.field(k).await? {
Some(x) => x,
None => continue,
};
if let Some(counted) = Box::pin(v.count_fields()).await? {
map.insert(k.to_string(), counted);
}
}
if map.is_empty() {
None
} else {
Some(Value::Object(map))
}
}
})
}
pub fn as_str(&self) -> Option<&str> {
match self {
Self::String(x) => Some(x),
_ => None,
}
}
pub async fn to_json(&self) -> Result<Value, std::io::Error> {
Ok(match self {
Self::Null => Value::Null,
Self::U64(x) => Value::Number((*x).into()),
Self::I64(x) => Value::Number((*x).into()),
// TODO: replace with something meaningful
Self::Blob { mime, bytes } => {
Value::String(format!("<Blob ({mime}, {} bytes)>", bytes.len()))
}
Self::String(x) => Value::String(x.to_string()),
Self::Array(x) => {
let mut arr = Vec::new();
for item in &**x {
arr.push(Box::pin(item.to_json()).await?);
}
Value::Array(arr)
}
Self::ObjectExtractor(e) => {
let keys = e.fields().await?;
let mut map = Map::new();
for k in &keys {
let v = match e.field(k).await? {
Some(x) => x,
None => continue,
};
map.insert(k.to_string(), Box::pin(v.to_json()).await?);
}
Value::Object(map)
}
Self::ListExtractor(e) => e.to_json().await?,
})
}
}