x
This commit is contained in:
36
crates/pile-value/Cargo.toml
Normal file
36
crates/pile-value/Cargo.toml
Normal file
@@ -0,0 +1,36 @@
|
||||
[package]
|
||||
name = "pile-value"
|
||||
version = { workspace = true }
|
||||
rust-version = { workspace = true }
|
||||
edition = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
pile-config = { workspace = true }
|
||||
pile-flac = { workspace = true }
|
||||
|
||||
serde_json = { workspace = true }
|
||||
walkdir = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
smartstring = { workspace = true }
|
||||
blake3 = { workspace = true }
|
||||
epub = { workspace = true }
|
||||
kamadak-exif = { workspace = true }
|
||||
pdf = { workspace = true }
|
||||
pdfium-render = { workspace = true, optional = true }
|
||||
image = { workspace = true, optional = true }
|
||||
id3 = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-stream = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
aws-sdk-s3 = { workspace = true }
|
||||
mime = { workspace = true }
|
||||
mime_guess = { workspace = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
pdfium = ["dep:pdfium-render", "dep:image"]
|
||||
57
crates/pile-value/build.rs
Normal file
57
crates/pile-value/build.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use std::env;
|
||||
use std::path::PathBuf;
|
||||
|
||||
const PDFIUM_URL: &str = "https://github.com/bblanchon/pdfium-binaries/releases/download/chromium%2F7725/pdfium-linux-x64.tgz";
|
||||
|
||||
#[expect(clippy::expect_used)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
#[expect(clippy::print_stderr)]
|
||||
fn main() {
|
||||
println!("cargo:rerun-if-changed=build.rs");
|
||||
|
||||
if env::var("CARGO_FEATURE_PDFIUM").is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
|
||||
|
||||
// OUT_DIR is target/<profile>/build/<pkg>-<hash>/out
|
||||
// Go up 3 levels to reach target/<profile>/
|
||||
let profile_dir = out_dir
|
||||
.ancestors()
|
||||
.nth(3)
|
||||
.expect("unexpected OUT_DIR structure")
|
||||
.to_path_buf();
|
||||
|
||||
let lib_path = profile_dir.join("libpdfium.so");
|
||||
|
||||
if !lib_path.exists() {
|
||||
let tgz_path = out_dir.join("pdfium.tgz");
|
||||
|
||||
eprintln!("cargo:warning=Downloading PDFium from {PDFIUM_URL}");
|
||||
|
||||
let status = std::process::Command::new("curl")
|
||||
.args(["-L", "--fail", "-o", tgz_path.to_str().unwrap(), PDFIUM_URL])
|
||||
.status()
|
||||
.expect("failed to run curl");
|
||||
assert!(status.success(), "curl failed to download PDFium");
|
||||
|
||||
let status = std::process::Command::new("tar")
|
||||
.args([
|
||||
"-xzf",
|
||||
tgz_path.to_str().unwrap(),
|
||||
"-C",
|
||||
out_dir.to_str().unwrap(),
|
||||
])
|
||||
.status()
|
||||
.expect("failed to run tar");
|
||||
assert!(status.success(), "tar failed to extract PDFium");
|
||||
|
||||
std::fs::copy(out_dir.join("lib").join("libpdfium.so"), &lib_path)
|
||||
.expect("failed to copy libpdfium.so");
|
||||
}
|
||||
|
||||
println!("cargo:rustc-link-search=native={}", profile_dir.display());
|
||||
println!("cargo:rustc-link-lib=dylib=pdfium");
|
||||
println!("cargo:rustc-link-arg=-Wl,-rpath,$ORIGIN");
|
||||
}
|
||||
95
crates/pile-value/src/extract/item/epub/epub_meta.rs
Normal file
95
crates/pile-value/src/extract/item/epub/epub_meta.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
use epub::doc::EpubDoc;
|
||||
use pile_config::Label;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
use tracing::trace;
|
||||
|
||||
use crate::{
|
||||
extract::traits::ObjectExtractor,
|
||||
value::{Item, PileValue, SyncReadBridge},
|
||||
};
|
||||
|
||||
pub struct EpubMetaExtractor {
|
||||
item: Item,
|
||||
output: OnceLock<HashMap<Label, PileValue>>,
|
||||
}
|
||||
|
||||
impl EpubMetaExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self {
|
||||
item: item.clone(),
|
||||
output: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
|
||||
if let Some(x) = self.output.get() {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let key = self.item.key();
|
||||
let ext = key.as_str().rsplit('.').next();
|
||||
if !matches!(ext, Some("epub")) {
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_meta = tokio::task::spawn_blocking(move || {
|
||||
let doc = EpubDoc::from_reader(reader)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
|
||||
|
||||
let fields: &[&'static str] = &[
|
||||
"title",
|
||||
"creator",
|
||||
"description",
|
||||
"language",
|
||||
"publisher",
|
||||
"date",
|
||||
"subject",
|
||||
"identifier",
|
||||
];
|
||||
|
||||
let meta: Vec<(&'static str, Option<String>)> =
|
||||
fields.iter().map(|&key| (key, doc.mdata(key))).collect();
|
||||
|
||||
Ok::<_, std::io::Error>(meta)
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
|
||||
let raw_meta = match raw_meta {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
trace!(message = "Could not process epub", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
};
|
||||
|
||||
let mut output: HashMap<Label, PileValue> = HashMap::new();
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
for (key, val) in raw_meta {
|
||||
let label = Label::new(key).unwrap();
|
||||
let value = match val {
|
||||
Some(s) => PileValue::String(Arc::new(s.into())),
|
||||
None => PileValue::Null,
|
||||
};
|
||||
output.insert(label, value);
|
||||
}
|
||||
|
||||
return Ok(self.output.get_or_init(|| output));
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for EpubMetaExtractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.get(name).cloned())
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
102
crates/pile-value/src/extract/item/epub/epub_text.rs
Normal file
102
crates/pile-value/src/extract/item/epub/epub_text.rs
Normal file
@@ -0,0 +1,102 @@
|
||||
use epub::doc::EpubDoc;
|
||||
use pile_config::Label;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{value::{Item, PileValue, SyncReadBridge}, extract::traits::ObjectExtractor};
|
||||
|
||||
pub struct EpubTextExtractor {
|
||||
item: Item,
|
||||
output: OnceLock<HashMap<Label, PileValue>>,
|
||||
}
|
||||
|
||||
impl EpubTextExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self {
|
||||
item: item.clone(),
|
||||
output: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
|
||||
if let Some(x) = self.output.get() {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let key = self.item.key();
|
||||
let ext = key.as_str().rsplit('.').next();
|
||||
if !matches!(ext, Some("epub")) {
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_text = tokio::task::spawn_blocking(move || {
|
||||
let mut doc = EpubDoc::from_reader(reader)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
|
||||
|
||||
let mut text_parts: Vec<String> = Vec::new();
|
||||
|
||||
loop {
|
||||
if let Ok(content) = doc.get_current_str() {
|
||||
text_parts.push(strip_html(&content));
|
||||
}
|
||||
if doc.go_next().is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<_, std::io::Error>(text_parts.join(" "))
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
|
||||
let raw_text = match raw_text {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
debug!(message = "Could not process epub", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
};
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
let output = HashMap::from([(
|
||||
Label::new("text").unwrap(),
|
||||
PileValue::String(Arc::new(raw_text.into())),
|
||||
)]);
|
||||
|
||||
let _ = self.output.set(output);
|
||||
#[expect(clippy::unwrap_used)]
|
||||
return Ok(self.output.get().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
/// Strip HTML/XHTML tags from a string, leaving only text nodes.
|
||||
fn strip_html(html: &str) -> String {
|
||||
let mut result = String::with_capacity(html.len());
|
||||
let mut in_tag = false;
|
||||
|
||||
for c in html.chars() {
|
||||
match c {
|
||||
'<' => in_tag = true,
|
||||
'>' => in_tag = false,
|
||||
_ if !in_tag => result.push(c),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for EpubTextExtractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.get(name).cloned())
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
46
crates/pile-value/src/extract/item/epub/mod.rs
Normal file
46
crates/pile-value/src/extract/item/epub/mod.rs
Normal file
@@ -0,0 +1,46 @@
|
||||
use pile_config::Label;
|
||||
use std::sync::Arc;
|
||||
|
||||
mod epub_meta;
|
||||
pub use epub_meta::*;
|
||||
|
||||
mod epub_text;
|
||||
pub use epub_text::*;
|
||||
|
||||
use crate::{
|
||||
extract::traits::ObjectExtractor,
|
||||
value::{Item, PileValue},
|
||||
};
|
||||
|
||||
pub struct EpubExtractor {
|
||||
text: Arc<EpubTextExtractor>,
|
||||
meta: Arc<EpubMetaExtractor>,
|
||||
}
|
||||
|
||||
impl EpubExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self {
|
||||
text: Arc::new(EpubTextExtractor::new(item)),
|
||||
meta: Arc::new(EpubMetaExtractor::new(item)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for EpubExtractor {
|
||||
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
match name.as_str() {
|
||||
"text" => self.text.field(name).await,
|
||||
"meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(vec![
|
||||
Label::new("text").unwrap(),
|
||||
Label::new("meta").unwrap(),
|
||||
])
|
||||
}
|
||||
}
|
||||
93
crates/pile-value/src/extract/item/exif.rs
Normal file
93
crates/pile-value/src/extract/item/exif.rs
Normal file
@@ -0,0 +1,93 @@
|
||||
use pile_config::Label;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io::BufReader,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
use tracing::trace;
|
||||
|
||||
use crate::{value::{Item, PileValue, SyncReadBridge}, extract::traits::ObjectExtractor};
|
||||
|
||||
pub struct ExifExtractor {
|
||||
item: Item,
|
||||
output: OnceLock<HashMap<Label, PileValue>>,
|
||||
}
|
||||
|
||||
impl ExifExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self {
|
||||
item: item.clone(),
|
||||
output: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
|
||||
if let Some(x) = self.output.get() {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_fields = tokio::task::spawn_blocking(move || {
|
||||
let mut br = BufReader::new(reader);
|
||||
let exif = exif::Reader::new()
|
||||
.read_from_container(&mut br)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
|
||||
|
||||
let fields: Vec<(String, String)> = exif
|
||||
.fields()
|
||||
.map(|f| {
|
||||
(
|
||||
f.tag.to_string(),
|
||||
f.display_value().with_unit(&exif).to_string(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok::<_, std::io::Error>(fields)
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
|
||||
let raw_fields = match raw_fields {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
trace!(message = "Could not process exif", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
};
|
||||
|
||||
let mut output: HashMap<Label, PileValue> = HashMap::new();
|
||||
|
||||
for (tag_name, value) in raw_fields {
|
||||
let Some(label) = tag_to_label(&tag_name) else {
|
||||
continue;
|
||||
};
|
||||
// First occurrence wins (PRIMARY IFD comes before THUMBNAIL)
|
||||
output
|
||||
.entry(label)
|
||||
.or_insert_with(|| PileValue::String(Arc::new(value.into())));
|
||||
}
|
||||
|
||||
return Ok(self.output.get_or_init(|| output));
|
||||
}
|
||||
}
|
||||
|
||||
fn tag_to_label(tag: &str) -> Option<Label> {
|
||||
let sanitized: String = tag
|
||||
.chars()
|
||||
.map(|c| if c == ' ' { '_' } else { c })
|
||||
.filter(|c| Label::VALID_CHARS.contains(*c))
|
||||
.collect();
|
||||
Label::new(sanitized)
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for ExifExtractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.get(name).cloned())
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
162
crates/pile-value/src/extract/item/flac.rs
Normal file
162
crates/pile-value/src/extract/item/flac.rs
Normal file
@@ -0,0 +1,162 @@
|
||||
use mime::Mime;
|
||||
use pile_config::Label;
|
||||
use pile_flac::{FlacBlock, FlacReader};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io::BufReader,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
value::{Item, PileValue, SyncReadBridge},
|
||||
extract::traits::{ListExtractor, ObjectExtractor},
|
||||
};
|
||||
|
||||
pub struct FlacImagesExtractor {
|
||||
item: Item,
|
||||
}
|
||||
|
||||
impl FlacImagesExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self { item: item.clone() }
|
||||
}
|
||||
|
||||
async fn get_images(&self) -> Result<Vec<PileValue>, std::io::Error> {
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_images = tokio::task::spawn_blocking(move || {
|
||||
let reader = FlacReader::new(BufReader::new(reader));
|
||||
let mut images: Vec<(Mime, Vec<u8>)> = Vec::new();
|
||||
for block in reader {
|
||||
match block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? {
|
||||
FlacBlock::Picture(picture) => {
|
||||
images.push((picture.mime, picture.img_data));
|
||||
}
|
||||
FlacBlock::AudioFrame(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok::<_, std::io::Error>(images)
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)??;
|
||||
|
||||
Ok(raw_images
|
||||
.into_iter()
|
||||
.map(|(mime, data)| PileValue::Blob {
|
||||
mime,
|
||||
bytes: Arc::new(data),
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ListExtractor for FlacImagesExtractor {
|
||||
async fn get<'a>(&'a self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.get_images().await?.into_iter().nth(idx))
|
||||
}
|
||||
|
||||
async fn len(&self) -> Result<usize, std::io::Error> {
|
||||
Ok(self.get_images().await?.len())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FlacExtractor {
|
||||
item: Item,
|
||||
output: OnceLock<HashMap<Label, PileValue>>,
|
||||
images: Option<PileValue>,
|
||||
}
|
||||
|
||||
impl FlacExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
let is_flac = match item {
|
||||
Item::File { path, .. } => path.to_str().unwrap_or_default().ends_with(".flac"),
|
||||
Item::S3 { key, .. } => key.ends_with(".flac"),
|
||||
};
|
||||
|
||||
let images =
|
||||
is_flac.then(|| PileValue::ListExtractor(Arc::new(FlacImagesExtractor::new(item))));
|
||||
|
||||
Self {
|
||||
item: item.clone(),
|
||||
output: OnceLock::new(),
|
||||
images,
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
|
||||
if let Some(x) = self.output.get() {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let key = match &self.item {
|
||||
Item::File { path, .. } => path.to_str().unwrap_or_default().to_owned(),
|
||||
Item::S3 { key, .. } => key.to_string(),
|
||||
};
|
||||
|
||||
if !key.ends_with(".flac") {
|
||||
let _ = self.output.set(HashMap::new());
|
||||
#[expect(clippy::unwrap_used)]
|
||||
return Ok(self.output.get().unwrap());
|
||||
}
|
||||
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_tags = tokio::task::spawn_blocking(move || {
|
||||
let reader = FlacReader::new(BufReader::new(reader));
|
||||
let mut tags: Vec<(String, String)> = Vec::new();
|
||||
for block in reader {
|
||||
match block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? {
|
||||
FlacBlock::VorbisComment(comment) => {
|
||||
for (k, v) in comment.comment.comments {
|
||||
tags.push((k.to_string().to_lowercase(), v.into()));
|
||||
}
|
||||
}
|
||||
FlacBlock::AudioFrame(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok::<_, std::io::Error>(tags)
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)??;
|
||||
|
||||
let mut output: HashMap<Label, Vec<PileValue>> = HashMap::new();
|
||||
for (k, v) in raw_tags {
|
||||
if let Some(label) = Label::new(k) {
|
||||
output
|
||||
.entry(label)
|
||||
.or_default()
|
||||
.push(PileValue::String(Arc::new(v.into())));
|
||||
}
|
||||
}
|
||||
let output: HashMap<Label, PileValue> = output
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, PileValue::Array(Arc::new(v))))
|
||||
.collect();
|
||||
|
||||
let _ = self.output.set(output);
|
||||
#[expect(clippy::unwrap_used)]
|
||||
return Ok(self.output.get().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for FlacExtractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
if name.as_str() == "images"
|
||||
&& let Some(ref images) = self.images
|
||||
{
|
||||
return Ok(Some(images.clone()));
|
||||
}
|
||||
Ok(self.get_inner().await?.get(name).cloned())
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
let mut fields = self.get_inner().await?.keys().cloned().collect::<Vec<_>>();
|
||||
if self.images.is_some() {
|
||||
#[expect(clippy::unwrap_used)]
|
||||
fields.push(Label::new("images").unwrap());
|
||||
}
|
||||
Ok(fields)
|
||||
}
|
||||
}
|
||||
77
crates/pile-value/src/extract/item/fs.rs
Normal file
77
crates/pile-value/src/extract/item/fs.rs
Normal file
@@ -0,0 +1,77 @@
|
||||
use pile_config::Label;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::Component,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
use crate::{value::{Item, PileValue}, extract::traits::ObjectExtractor};
|
||||
|
||||
pub struct FsExtractor {
|
||||
item: Item,
|
||||
output: OnceLock<HashMap<Label, PileValue>>,
|
||||
}
|
||||
|
||||
impl FsExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self {
|
||||
item: item.clone(),
|
||||
output: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
|
||||
if let Some(x) = self.output.get() {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let Item::File { path, .. } = &self.item else {
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
};
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
let output = HashMap::from([
|
||||
(
|
||||
Label::new("extension").unwrap(),
|
||||
path.extension()
|
||||
.and_then(|x| x.to_str())
|
||||
.map(|x| PileValue::String(Arc::new(x.into())))
|
||||
.unwrap_or(PileValue::Null),
|
||||
),
|
||||
(
|
||||
Label::new("path").unwrap(),
|
||||
path.to_str()
|
||||
.map(|x| PileValue::String(Arc::new(x.into())))
|
||||
.unwrap_or(PileValue::Null),
|
||||
),
|
||||
(
|
||||
Label::new("segments").unwrap(),
|
||||
path.components()
|
||||
.map(|x| match x {
|
||||
Component::CurDir => Some(".".to_owned()),
|
||||
Component::Normal(x) => x.to_str().map(|x| x.to_owned()),
|
||||
Component::ParentDir => Some("..".to_owned()),
|
||||
Component::RootDir => Some("/".to_owned()),
|
||||
Component::Prefix(x) => x.as_os_str().to_str().map(|x| x.to_owned()),
|
||||
})
|
||||
.map(|x| x.map(|x| PileValue::String(Arc::new(x.into()))))
|
||||
.collect::<Option<Vec<_>>>()
|
||||
.map(|v| PileValue::Array(Arc::new(v)))
|
||||
.unwrap_or(PileValue::Null),
|
||||
),
|
||||
]);
|
||||
|
||||
return Ok(self.output.get_or_init(|| output));
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for FsExtractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.get_inner()?.get(name).cloned())
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.get_inner()?.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
133
crates/pile-value/src/extract/item/id3.rs
Normal file
133
crates/pile-value/src/extract/item/id3.rs
Normal file
@@ -0,0 +1,133 @@
|
||||
use id3::Tag;
|
||||
use pile_config::Label;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::HashMap,
|
||||
io::BufReader,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
extract::traits::ObjectExtractor,
|
||||
value::{Item, PileValue, SyncReadBridge},
|
||||
};
|
||||
|
||||
pub struct Id3Extractor {
|
||||
item: Item,
|
||||
output: OnceLock<HashMap<Label, PileValue>>,
|
||||
}
|
||||
|
||||
impl Id3Extractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self {
|
||||
item: item.clone(),
|
||||
output: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
|
||||
if let Some(x) = self.output.get() {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let key = self.item.key();
|
||||
let ext = key.as_str().rsplit('.').next();
|
||||
if !matches!(ext, Some("mp3") | Some("aiff") | Some("aif") | Some("wav")) {
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let tag = match tokio::task::spawn_blocking(move || Tag::read_from2(BufReader::new(reader)))
|
||||
.await
|
||||
{
|
||||
Ok(Ok(tag)) => tag,
|
||||
|
||||
Ok(Err(id3::Error {
|
||||
kind: id3::ErrorKind::NoTag,
|
||||
..
|
||||
})) => {
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
|
||||
Ok(Err(id3::Error {
|
||||
kind: id3::ErrorKind::Io(e),
|
||||
..
|
||||
})) => return Err(e),
|
||||
|
||||
Ok(Err(e)) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
let mut output: HashMap<Label, Vec<PileValue>> = HashMap::new();
|
||||
for frame in tag.frames() {
|
||||
if let Some(text) = frame.content().text() {
|
||||
let name = frame_id_to_field(frame.id());
|
||||
if let Some(key) = Label::new(name) {
|
||||
output
|
||||
.entry(key)
|
||||
.or_default()
|
||||
.push(PileValue::String(Arc::new(text.into())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let output = output
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, PileValue::Array(Arc::new(v))))
|
||||
.collect();
|
||||
|
||||
return Ok(self.output.get_or_init(|| output));
|
||||
}
|
||||
}
|
||||
|
||||
/// Map an ID3 frame ID to the equivalent Vorbis Comment field name.
|
||||
/// Falls back to the lowercased frame ID if no mapping exists.
|
||||
fn frame_id_to_field(id: &str) -> Cow<'static, str> {
|
||||
match id {
|
||||
// spell:off
|
||||
"TIT2" => Cow::Borrowed("title"),
|
||||
"TIT1" => Cow::Borrowed("grouping"),
|
||||
"TIT3" => Cow::Borrowed("subtitle"),
|
||||
"TPE1" => Cow::Borrowed("artist"),
|
||||
"TPE2" => Cow::Borrowed("albumartist"),
|
||||
"TPE3" => Cow::Borrowed("conductor"),
|
||||
"TOPE" => Cow::Borrowed("originalartist"),
|
||||
"TALB" => Cow::Borrowed("album"),
|
||||
"TOAL" => Cow::Borrowed("originalalbum"),
|
||||
"TRCK" => Cow::Borrowed("tracknumber"),
|
||||
"TPOS" => Cow::Borrowed("discnumber"),
|
||||
"TSST" => Cow::Borrowed("discsubtitle"),
|
||||
"TDRC" | "TYER" => Cow::Borrowed("date"),
|
||||
"TDOR" | "TORY" => Cow::Borrowed("originaldate"),
|
||||
"TCON" => Cow::Borrowed("genre"),
|
||||
"TCOM" => Cow::Borrowed("composer"),
|
||||
"TEXT" => Cow::Borrowed("lyricist"),
|
||||
"TPUB" => Cow::Borrowed("label"),
|
||||
"TSRC" => Cow::Borrowed("isrc"),
|
||||
"TBPM" => Cow::Borrowed("bpm"),
|
||||
"TLAN" => Cow::Borrowed("language"),
|
||||
"TMED" => Cow::Borrowed("media"),
|
||||
"TMOO" => Cow::Borrowed("mood"),
|
||||
"TCOP" => Cow::Borrowed("copyright"),
|
||||
"TENC" => Cow::Borrowed("encodedby"),
|
||||
"TSSE" => Cow::Borrowed("encodersettings"),
|
||||
"TSOA" => Cow::Borrowed("albumsort"),
|
||||
"TSOP" => Cow::Borrowed("artistsort"),
|
||||
"TSOT" => Cow::Borrowed("titlesort"),
|
||||
"MVNM" => Cow::Borrowed("movement"),
|
||||
"MVIN" => Cow::Borrowed("movementnumber"),
|
||||
_ => Cow::Owned(id.to_lowercase()),
|
||||
// spell:on
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for Id3Extractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.get(name).cloned())
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
99
crates/pile-value/src/extract/item/mod.rs
Normal file
99
crates/pile-value/src/extract/item/mod.rs
Normal file
@@ -0,0 +1,99 @@
|
||||
mod flac;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
pub use flac::*;
|
||||
|
||||
mod id3;
|
||||
pub use id3::*;
|
||||
|
||||
mod fs;
|
||||
pub use fs::*;
|
||||
|
||||
mod epub;
|
||||
pub use epub::*;
|
||||
|
||||
mod exif;
|
||||
pub use exif::*;
|
||||
|
||||
mod pdf;
|
||||
pub use pdf::*;
|
||||
|
||||
mod toml;
|
||||
use pile_config::Label;
|
||||
pub use toml::*;
|
||||
|
||||
mod sidecar;
|
||||
pub use sidecar::*;
|
||||
|
||||
use crate::{
|
||||
extract::{misc::MapExtractor, traits::ObjectExtractor},
|
||||
value::{Item, PileValue},
|
||||
};
|
||||
|
||||
pub struct ItemExtractor {
|
||||
inner: MapExtractor,
|
||||
}
|
||||
|
||||
impl ItemExtractor {
|
||||
#[expect(clippy::unwrap_used)]
|
||||
pub fn new(item: &Item) -> Self {
|
||||
let inner = MapExtractor {
|
||||
inner: HashMap::from([
|
||||
(
|
||||
Label::new("flac").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(FlacExtractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("id3").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(Id3Extractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("fs").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(FsExtractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("epub").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(EpubExtractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("exif").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(ExifExtractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("pdf").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(PdfExtractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("toml").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("sidecar").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(SidecarExtractor::new(item))),
|
||||
),
|
||||
]),
|
||||
};
|
||||
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for ItemExtractor {
|
||||
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
self.inner.field(name).await
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
return Ok(vec![
|
||||
Label::new("flac").unwrap(),
|
||||
Label::new("id3").unwrap(),
|
||||
Label::new("fs").unwrap(),
|
||||
Label::new("epub").unwrap(),
|
||||
Label::new("exif").unwrap(),
|
||||
Label::new("pdf").unwrap(),
|
||||
Label::new("sidecar").unwrap(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
58
crates/pile-value/src/extract/item/pdf/mod.rs
Normal file
58
crates/pile-value/src/extract/item/pdf/mod.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
use pile_config::Label;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(feature = "pdfium")]
|
||||
mod pdf_pages;
|
||||
#[cfg(feature = "pdfium")]
|
||||
pub use pdf_pages::*;
|
||||
|
||||
mod pdf_meta;
|
||||
pub use pdf_meta::*;
|
||||
|
||||
mod pdf_text;
|
||||
pub use pdf_text::*;
|
||||
|
||||
use crate::{value::{Item, PileValue}, extract::traits::ObjectExtractor};
|
||||
|
||||
pub struct PdfExtractor {
|
||||
text: Arc<PdfTextExtractor>,
|
||||
meta: Arc<PdfMetaExtractor>,
|
||||
#[cfg(feature = "pdfium")]
|
||||
pages: Arc<PdfPagesExtractor>,
|
||||
}
|
||||
|
||||
impl PdfExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self {
|
||||
text: Arc::new(PdfTextExtractor::new(item)),
|
||||
meta: Arc::new(PdfMetaExtractor::new(item)),
|
||||
#[cfg(feature = "pdfium")]
|
||||
pages: Arc::new(PdfPagesExtractor::new(item)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for PdfExtractor {
|
||||
async fn field(&self, name: &pile_config::Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
match name.as_str() {
|
||||
"text" => self.text.field(name).await,
|
||||
"meta" => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
|
||||
#[cfg(feature = "pdfium")]
|
||||
"pages" => Ok(Some(PileValue::ListExtractor(self.pages.clone()))),
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(vec![
|
||||
Label::new("text").unwrap(),
|
||||
Label::new("meta").unwrap(),
|
||||
#[cfg(feature = "pdfium")]
|
||||
Label::new("cover").unwrap(),
|
||||
#[cfg(feature = "pdfium")]
|
||||
Label::new("pages").unwrap(),
|
||||
])
|
||||
}
|
||||
}
|
||||
129
crates/pile-value/src/extract/item/pdf/pdf_meta.rs
Normal file
129
crates/pile-value/src/extract/item/pdf/pdf_meta.rs
Normal file
@@ -0,0 +1,129 @@
|
||||
use pdf::file::FileOptions;
|
||||
use pdf::primitive::{Date, TimeRel};
|
||||
use pile_config::Label;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io::BufReader,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
use tracing::trace;
|
||||
|
||||
use crate::{extract::traits::ObjectExtractor, value::{Item, PileValue, SyncReadBridge}};
|
||||
|
||||
pub struct PdfMetaExtractor {
|
||||
item: Item,
|
||||
output: OnceLock<HashMap<Label, PileValue>>,
|
||||
}
|
||||
|
||||
impl PdfMetaExtractor {
    /// Create an extractor for `item`. Parsing is deferred until first access.
    pub fn new(item: &Item) -> Self {
        Self {
            item: item.clone(),
            output: OnceLock::new(),
        }
    }

    /// Parse the PDF once and cache the resulting metadata map.
    ///
    /// On parse failure the error is logged at `trace` level and an EMPTY
    /// map is cached, so later calls will not retry. I/O errors from
    /// reading the item itself are still returned as `Err`.
    async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
        // Fast path: already computed.
        if let Some(x) = self.output.get() {
            return Ok(x);
        }

        // PDF parsing is synchronous and CPU-bound: bridge the async reader
        // and run the whole parse on tokio's blocking pool.
        let reader = SyncReadBridge::new_current(self.item.read().await?);
        let raw_meta = tokio::task::spawn_blocking(move || {
            let mut bytes = Vec::new();
            std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?;

            let file = match FileOptions::cached().load(bytes) {
                Ok(x) => x,
                // Preserve real I/O errors as-is; wrap all other pdf errors.
                Err(pdf::PdfError::Io { source }) => return Err(source),
                Err(error) => {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        error.to_string(),
                    ));
                }
            };

            let page_count = file.num_pages();

            // (key, value) pairs; `None` marks a field absent from the PDF
            // (it still becomes a `Null` entry below).
            let mut meta: Vec<(&'static str, Option<String>)> = Vec::new();

            if let Some(info) = &file.trailer.info_dict {
                use pdf::primitive::PdfString;
                let fields: &[(&'static str, Option<&PdfString>)] = &[
                    ("title", info.title.as_ref()),
                    ("author", info.author.as_ref()),
                    ("subject", info.subject.as_ref()),
                    ("keywords", info.keywords.as_ref()),
                    ("creator", info.creator.as_ref()),
                    ("producer", info.producer.as_ref()),
                ];

                for (key, val) in fields {
                    meta.push((key, val.map(|s| s.to_string_lossy())));
                }

                meta.push((
                    "creation_date",
                    info.creation_date.as_ref().map(format_date),
                ));
                meta.push(("mod_date", info.mod_date.as_ref().map(format_date)));
            }

            Ok::<_, std::io::Error>((page_count, meta))
        })
        .await
        .map_err(std::io::Error::other)?;

        // A failed parse is not fatal: log and cache an empty map.
        let (page_count, raw_meta) = match raw_meta {
            Ok(x) => x,
            Err(error) => {
                trace!(message = "Could not process pdf", ?error, key = ?self.item.key());
                return Ok(self.output.get_or_init(HashMap::new));
            }
        };

        let mut output: HashMap<Label, PileValue> = HashMap::new();

        // "pages" and the document-info keys are static, valid labels.
        #[expect(clippy::unwrap_used)]
        output.insert(
            Label::new("pages").unwrap(),
            PileValue::U64(page_count as u64),
        );

        #[expect(clippy::unwrap_used)]
        for (key, val) in raw_meta {
            let label = Label::new(key).unwrap();
            let value = match val {
                Some(s) => PileValue::String(Arc::new(s.into())),
                None => PileValue::Null,
            };
            output.insert(label, value);
        }

        // If two tasks race here, `get_or_init` keeps the first result; both
        // computed the same data, so the loser is only duplicated work.
        return Ok(self.output.get_or_init(|| output));
    }
}
|
||||
|
||||
fn format_date(d: &Date) -> String {
|
||||
let tz = match d.rel {
|
||||
TimeRel::Universal => "Z".to_owned(),
|
||||
TimeRel::Later => format!("+{:02}:{:02}", d.tz_hour, d.tz_minute),
|
||||
TimeRel::Earlier => format!("-{:02}:{:02}", d.tz_hour, d.tz_minute),
|
||||
};
|
||||
format!(
|
||||
"{:04}-{:02}-{:02}T{:02}:{:02}:{:02}{}",
|
||||
d.year, d.month, d.day, d.hour, d.minute, d.second, tz
|
||||
)
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for PdfMetaExtractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.get(name).cloned())
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
107
crates/pile-value/src/extract/item/pdf/pdf_pages.rs
Normal file
107
crates/pile-value/src/extract/item/pdf/pdf_pages.rs
Normal file
@@ -0,0 +1,107 @@
|
||||
use image::ImageFormat;
|
||||
use pdfium_render::prelude::*;
|
||||
use std::{
|
||||
io::{BufReader, Cursor},
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::trace;
|
||||
|
||||
use crate::{
|
||||
extract::traits::ListExtractor,
|
||||
value::{Item, PileValue, SyncReadBridge},
|
||||
};
|
||||
|
||||
/// Renders PDF pages to PNG images via pdfium.
pub struct PdfPagesExtractor {
    // The PDF to render; re-read and re-parsed on every access (no caching).
    item: Item,
}
|
||||
|
||||
impl PdfPagesExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self { item: item.clone() }
|
||||
}
|
||||
|
||||
async fn get_bytes(&self) -> Result<Vec<u8>, std::io::Error> {
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let mut b = Vec::new();
|
||||
std::io::Read::read_to_end(&mut BufReader::new(reader), &mut b)?;
|
||||
Ok::<_, std::io::Error>(b)
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)?
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
impl ListExtractor for PdfPagesExtractor {
    /// Render page `idx` to a 1024px-wide PNG blob.
    ///
    /// Returns `Ok(None)` past the last page, and `Some(Null)` when the
    /// page fails to render. Note the whole document is re-read and
    /// re-parsed on every call.
    async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
        let bytes = self.get_bytes().await?;
        // pdfium is synchronous; run the render on the blocking pool.
        let png = tokio::task::spawn_blocking(move || {
            let pdfium = Pdfium::default();
            let doc = pdfium
                .load_pdf_from_byte_slice(&bytes, None)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
            if idx >= doc.pages().len() as usize {
                return Ok::<_, std::io::Error>(None);
            }
            let render_config = PdfRenderConfig::new().set_target_width(1024);
            // The bounds check above keeps this `as u16` cast from truncating.
            let page = doc
                .pages()
                .get(idx as u16)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
            let image = page
                .render_with_config(&render_config)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?
                .as_image();
            let mut png_bytes = Vec::new();
            image
                .write_to(&mut Cursor::new(&mut png_bytes), ImageFormat::Png)
                .map_err(|e| std::io::Error::other(e.to_string()))?;
            Ok(Some(png_bytes))
        })
        .await
        .map_err(std::io::Error::other)?;

        // Render failures degrade to Null instead of erroring the caller.
        let value = match png {
            Ok(None) => return Ok(None),
            Ok(Some(bytes)) => PileValue::Blob {
                mime: mime::IMAGE_PNG,
                bytes: Arc::new(bytes),
            },
            Err(error) => {
                trace!(message = "Could not render pdf page", ?error, idx, key = ?self.item.key());
                PileValue::Null
            }
        };
        Ok(Some(value))
    }

    /// Number of pages; an unparsable document is logged and reported as 0.
    async fn len(&self) -> Result<usize, std::io::Error> {
        let bytes = self.get_bytes().await?;
        let count = tokio::task::spawn_blocking(move || {
            let pdfium = Pdfium::default();
            let doc = pdfium
                .load_pdf_from_byte_slice(&bytes, None)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
            Ok::<_, std::io::Error>(doc.pages().len() as usize)
        })
        .await
        .map_err(std::io::Error::other)?;
        match count {
            Ok(n) => Ok(n),
            Err(error) => {
                trace!(message = "Could not read pdf page count", ?error, key = ?self.item.key());
                Ok(0)
            }
        }
    }

    // Override, extracting all pages is very slow,
    // and we can't display binary in json anyway
    async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
        Ok(serde_json::Value::String(format!(
            "<PdfPages ({} pages)>",
            self.len().await?
        )))
    }
}
|
||||
109
crates/pile-value/src/extract/item/pdf/pdf_text.rs
Normal file
109
crates/pile-value/src/extract/item/pdf/pdf_text.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
use pdf::content::{Op, TextDrawAdjusted};
|
||||
use pdf::file::FileOptions;
|
||||
use pile_config::Label;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io::BufReader,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
use tracing::trace;
|
||||
|
||||
use crate::{extract::traits::ObjectExtractor, value::{Item, PileValue, SyncReadBridge}};
|
||||
|
||||
/// Lazily extracts the concatenated text content of a PDF [Item].
pub struct PdfTextExtractor {
    // The PDF to read from.
    item: Item,
    // Cache holding a single "text" entry once extraction has run.
    output: OnceLock<HashMap<Label, PileValue>>,
}
|
||||
|
||||
impl PdfTextExtractor {
    /// Create an extractor for `item`. Text extraction is deferred until
    /// first access.
    pub fn new(item: &Item) -> Self {
        Self {
            item: item.clone(),
            output: OnceLock::new(),
        }
    }

    /// Extract the document text once and cache it under the "text" label.
    ///
    /// On parse failure the error is logged at `trace` level and an EMPTY
    /// map is cached, so later calls will not retry. I/O errors from
    /// reading the item itself are still returned as `Err`.
    async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
        // Fast path: already computed.
        if let Some(x) = self.output.get() {
            return Ok(x);
        }

        // Parsing is synchronous and CPU-bound; run it on the blocking pool.
        let reader = SyncReadBridge::new_current(self.item.read().await?);
        let raw_text = tokio::task::spawn_blocking(move || {
            let mut bytes = Vec::new();
            std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?;

            let file = match FileOptions::cached().load(bytes) {
                Ok(x) => x,
                // Preserve real I/O errors as-is; wrap all other pdf errors.
                Err(pdf::PdfError::Io { source }) => return Err(source),
                Err(error) => {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        error.to_string(),
                    ));
                }
            };

            let mut text_parts: Vec<String> = Vec::new();

            // Collect the text-drawing operators from every page's content
            // stream; all other operators are ignored.
            for page in file.pages() {
                let page = page.map_err(|e| {
                    std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
                })?;

                if let Some(content) = &page.contents {
                    let ops = content.operations(&file.resolver()).map_err(|e| {
                        std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
                    })?;

                    for op in ops {
                        match op {
                            Op::TextDraw { text } => {
                                text_parts.push(text.to_string_lossy());
                            }
                            Op::TextDrawAdjusted { array } => {
                                // Adjusted draws interleave text with kerning
                                // values; keep only the text runs.
                                for item in array {
                                    if let TextDrawAdjusted::Text(text) = item {
                                        text_parts.push(text.to_string_lossy());
                                    }
                                }
                            }
                            _ => {}
                        }
                    }
                }
            }

            Ok::<_, std::io::Error>(text_parts.join(" "))
        })
        .await
        .map_err(std::io::Error::other)?;

        // A failed parse is not fatal: log and cache an empty map.
        let raw_text = match raw_text {
            Ok(x) => x,
            Err(error) => {
                trace!(message = "Could not process pdf", ?error, key = ?self.item.key());
                return Ok(self.output.get_or_init(HashMap::new));
            }
        };

        // "text" is a static, valid label.
        #[expect(clippy::unwrap_used)]
        let output = HashMap::from([(
            Label::new("text").unwrap(),
            PileValue::String(Arc::new(raw_text.into())),
        )]);

        return Ok(self.output.get_or_init(|| output));
    }
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for PdfTextExtractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.get(name).cloned())
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
45
crates/pile-value/src/extract/item/sidecar.rs
Normal file
45
crates/pile-value/src/extract/item/sidecar.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
use pile_config::Label;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use crate::{
|
||||
value::{Item, PileValue},
|
||||
extract::traits::ObjectExtractor,
|
||||
};
|
||||
use super::TomlExtractor;
|
||||
|
||||
/// Exposes an item's TOML sidecar (if any) as an object.
pub struct SidecarExtractor {
    // The item whose sidecar is consulted.
    item: Item,
    // Lazily-resolved sidecar extractor; a cached `None` means the item
    // has no sidecar.
    output: OnceLock<Option<TomlExtractor>>,
}
|
||||
|
||||
impl SidecarExtractor {
|
||||
pub fn new(item: &Item) -> Self {
|
||||
Self {
|
||||
item: item.clone(),
|
||||
output: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
impl ObjectExtractor for SidecarExtractor {
    /// Look up `name` in the sidecar TOML, if one exists.
    ///
    /// NOTE(review): when the item has no sidecar this returns `Some(Null)`
    /// for EVERY name while `fields()` returns an empty list, which seems
    /// to conflict with the ObjectExtractor contract ("field must return
    /// Some for all these keys and None for all others") — confirm intent.
    async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
        match self
            .output
            .get_or_init(|| self.item.sidecar().map(TomlExtractor::new))
        {
            Some(x) => Ok(x.field(name).await?),
            None => Ok(Some(PileValue::Null)),
        }
    }

    /// All keys present in the sidecar TOML; empty if there is no sidecar.
    async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
        match self
            .output
            .get_or_init(|| self.item.sidecar().map(TomlExtractor::new))
        {
            Some(x) => Ok(x.fields().await?),
            None => Ok(Vec::new()),
        }
    }
}
|
||||
78
crates/pile-value/src/extract/item/toml.rs
Normal file
78
crates/pile-value/src/extract/item/toml.rs
Normal file
@@ -0,0 +1,78 @@
|
||||
use pile_config::Label;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
extract::traits::ObjectExtractor,
|
||||
value::{AsyncReader, Item, PileValue},
|
||||
};
|
||||
|
||||
fn toml_to_pile(value: toml::Value) -> PileValue {
|
||||
match value {
|
||||
toml::Value::String(s) => PileValue::String(Arc::new(s.into())),
|
||||
toml::Value::Integer(i) => PileValue::String(Arc::new(i.to_string().into())),
|
||||
toml::Value::Float(f) => PileValue::String(Arc::new(f.to_string().into())),
|
||||
toml::Value::Boolean(b) => PileValue::String(Arc::new(b.to_string().into())),
|
||||
toml::Value::Datetime(d) => PileValue::String(Arc::new(d.to_string().into())),
|
||||
toml::Value::Array(a) => {
|
||||
PileValue::Array(Arc::new(a.into_iter().map(toml_to_pile).collect()))
|
||||
}
|
||||
toml::Value::Table(_) => PileValue::Null,
|
||||
}
|
||||
}
|
||||
|
||||
/// Lazily parses a TOML item into a flat {label: value} map.
pub struct TomlExtractor {
    // The TOML document to read.
    item: Item,
    // Cached top-level table; empty if the file is missing or unparsable.
    output: OnceLock<HashMap<Label, PileValue>>,
}
|
||||
|
||||
impl TomlExtractor {
    /// Create an extractor for `item`. Parsing is deferred until first access.
    pub fn new(item: &Item) -> Self {
        Self {
            item: item.clone(),
            output: OnceLock::new(),
        }
    }

    /// Read and parse the TOML document once, caching its top-level table.
    ///
    /// A missing file or unparsable document caches an EMPTY map (no
    /// retry); other read errors are returned as `Err`. Keys that are not
    /// valid labels, and non-table top-level documents, are silently
    /// dropped.
    async fn get_inner(&self) -> Result<&HashMap<Label, PileValue>, std::io::Error> {
        // Fast path: already computed.
        if let Some(x) = self.output.get() {
            return Ok(x);
        }

        let mut reader = match self.item.read().await {
            Ok(r) => r,
            // A missing document simply yields no fields.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                return Ok(self.output.get_or_init(HashMap::new));
            }
            Err(e) => return Err(e),
        };
        let bytes = reader.read_to_end().await?;
        let toml: toml::Value = match toml::from_slice(&bytes) {
            Ok(x) => x,
            Err(_) => return Ok(self.output.get_or_init(HashMap::new)),
        };

        let output: HashMap<Label, PileValue> = match toml {
            toml::Value::Table(t) => t
                .into_iter()
                // Drop keys that do not form valid labels.
                .filter_map(|(k, v)| Label::new(&k).map(|label| (label, toml_to_pile(v))))
                .collect(),
            _ => HashMap::new(),
        };

        return Ok(self.output.get_or_init(|| output));
    }
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for TomlExtractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.get(name).cloned())
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.get_inner().await?.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
24
crates/pile-value/src/extract/misc/list.rs
Normal file
24
crates/pile-value/src/extract/misc/list.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{extract::traits::ListExtractor, value::PileValue};
|
||||
|
||||
/// A [ListExtractor] over a shared, immutable array of values.
pub struct ArrayExtractor {
    // Shared backing storage; cloning the Arc is cheap.
    inner: Arc<Vec<PileValue>>,
}

impl ArrayExtractor {
    /// Wrap an existing shared array.
    pub fn new(inner: Arc<Vec<PileValue>>) -> Self {
        Self { inner }
    }
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ListExtractor for ArrayExtractor {
|
||||
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.inner.get(idx).cloned())
|
||||
}
|
||||
|
||||
async fn len(&self) -> Result<usize, std::io::Error> {
|
||||
Ok(self.inner.len())
|
||||
}
|
||||
}
|
||||
27
crates/pile-value/src/extract/misc/map.rs
Normal file
27
crates/pile-value/src/extract/misc/map.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
use pile_config::Label;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{extract::traits::ObjectExtractor, value::PileValue};
|
||||
|
||||
/// An [ObjectExtractor] over an in-memory map of values.
pub struct MapExtractor {
    // Backing map; public so callers can populate it directly.
    pub inner: HashMap<Label, PileValue>,
}
|
||||
|
||||
impl Default for MapExtractor {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
inner: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for MapExtractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.inner.get(name).cloned())
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(self.inner.keys().cloned().collect())
|
||||
}
|
||||
}
|
||||
8
crates/pile-value/src/extract/misc/mod.rs
Normal file
8
crates/pile-value/src/extract/misc/mod.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
mod list;
|
||||
pub use list::*;
|
||||
|
||||
mod vec;
|
||||
pub use vec::*;
|
||||
|
||||
mod map;
|
||||
pub use map::*;
|
||||
22
crates/pile-value/src/extract/misc/vec.rs
Normal file
22
crates/pile-value/src/extract/misc/vec.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
use crate::{extract::traits::ListExtractor, value::PileValue};
|
||||
|
||||
/// A [ListExtractor] over an owned, in-memory vector of values.
pub struct VecExtractor {
    // Backing vector; public so callers can populate it directly.
    pub inner: Vec<PileValue>,
}
|
||||
|
||||
impl Default for VecExtractor {
|
||||
fn default() -> Self {
|
||||
Self { inner: Vec::new() }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ListExtractor for VecExtractor {
|
||||
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(self.inner.get(idx).cloned())
|
||||
}
|
||||
|
||||
async fn len(&self) -> Result<usize, std::io::Error> {
|
||||
Ok(self.inner.len())
|
||||
}
|
||||
}
|
||||
4
crates/pile-value/src/extract/mod.rs
Normal file
4
crates/pile-value/src/extract/mod.rs
Normal file
@@ -0,0 +1,4 @@
|
||||
pub mod item;
|
||||
pub mod misc;
|
||||
pub mod string;
|
||||
pub mod traits;
|
||||
51
crates/pile-value/src/extract/string.rs
Normal file
51
crates/pile-value/src/extract/string.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
use pile_config::Label;
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{extract::traits::ObjectExtractor, value::PileValue};
|
||||
|
||||
/// Exposes derived views (`trim`/`upper`/`lower`/`nonempty`) of a string
/// value.
pub struct StringExtractor {
    // The shared string being viewed.
    item: Arc<SmartString<LazyCompact>>,
}
|
||||
|
||||
impl StringExtractor {
    /// Wrap a shared string for field-style access.
    pub fn new(item: &Arc<SmartString<LazyCompact>>) -> Self {
        // Arc clone: cheap refcount bump, no string copy.
        Self { item: item.clone() }
    }
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for StringExtractor {
|
||||
async fn field(&self, name: &Label) -> Result<Option<PileValue>, std::io::Error> {
|
||||
Ok(match name.as_str() {
|
||||
"trim" => Some(PileValue::String(Arc::new(
|
||||
self.item.as_str().trim().into(),
|
||||
))),
|
||||
|
||||
"upper" => Some(PileValue::String(Arc::new(
|
||||
self.item.as_str().to_lowercase().into(),
|
||||
))),
|
||||
|
||||
"lower" => Some(PileValue::String(Arc::new(
|
||||
self.item.as_str().to_uppercase().into(),
|
||||
))),
|
||||
|
||||
"nonempty" => Some(match self.item.is_empty() {
|
||||
true => PileValue::Null,
|
||||
false => PileValue::String(self.item.clone()),
|
||||
}),
|
||||
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
return Ok(vec![
|
||||
Label::new("trim").unwrap(),
|
||||
Label::new("upper").unwrap(),
|
||||
Label::new("lower").unwrap(),
|
||||
Label::new("nonempty").unwrap(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
68
crates/pile-value/src/extract/traits.rs
Normal file
68
crates/pile-value/src/extract/traits.rs
Normal file
@@ -0,0 +1,68 @@
|
||||
/// An attachment that extracts metadata from an [Item].
|
||||
///
|
||||
/// Metadata is exposed as an immutable map of {label: value},
|
||||
/// much like a json object.
|
||||
#[async_trait::async_trait]
|
||||
pub trait ObjectExtractor: Send + Sync {
|
||||
/// Get the field at `name` from `item`.
|
||||
/// - returns `None` if `name` is not a valid field
|
||||
/// - returns `Some(Null)` if `name` is not available
|
||||
async fn field(
|
||||
&self,
|
||||
name: &pile_config::Label,
|
||||
) -> Result<Option<crate::value::PileValue>, std::io::Error>;
|
||||
|
||||
/// Return all fields in this extractor.
|
||||
/// `Self::field` must return [Some] for all these keys
|
||||
/// and [None] for all others.
|
||||
async fn fields(&self) -> Result<Vec<pile_config::Label>, std::io::Error>;
|
||||
|
||||
/// Convert this to a JSON value.
|
||||
async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
|
||||
let keys = self.fields().await?;
|
||||
let mut map = serde_json::Map::new();
|
||||
for k in &keys {
|
||||
let v = match self.field(k).await? {
|
||||
Some(x) => x,
|
||||
None => continue,
|
||||
};
|
||||
map.insert(k.to_string(), Box::pin(v.to_json()).await?);
|
||||
}
|
||||
|
||||
Ok(serde_json::Value::Object(map))
|
||||
}
|
||||
}
|
||||
|
||||
/// An attachment that extracts metadata from an [Item].
|
||||
///
|
||||
/// Metadata is exposed as an immutable list of values.
|
||||
#[async_trait::async_trait]
|
||||
pub trait ListExtractor: Send + Sync {
|
||||
/// Get the item at index `idx`.
|
||||
/// Indices start at zero, and must be consecutive.
|
||||
/// - returns `None` if `idx` is out of range
|
||||
/// - returns `Some(Null)` if `None` is at `idx`
|
||||
async fn get(&self, idx: usize) -> Result<Option<crate::value::PileValue>, std::io::Error>;
|
||||
|
||||
async fn len(&self) -> Result<usize, std::io::Error>;
|
||||
|
||||
async fn is_empty(&self) -> Result<bool, std::io::Error> {
|
||||
Ok(self.len().await? == 0)
|
||||
}
|
||||
|
||||
/// Convert this list to a JSON value.
|
||||
async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
|
||||
let len = self.len().await?;
|
||||
let mut list = Vec::with_capacity(len);
|
||||
for i in 0..len {
|
||||
#[expect(clippy::expect_used)]
|
||||
let v = self
|
||||
.get(i)
|
||||
.await?
|
||||
.expect("value must be present according to length");
|
||||
list.push(Box::pin(v.to_json()).await?);
|
||||
}
|
||||
|
||||
Ok(serde_json::Value::Array(list))
|
||||
}
|
||||
}
|
||||
3
crates/pile-value/src/lib.rs
Normal file
3
crates/pile-value/src/lib.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
pub mod extract;
|
||||
pub mod source;
|
||||
pub mod value;
|
||||
131
crates/pile-value/src/source/dir.rs
Normal file
131
crates/pile-value/src/source/dir.rs
Normal file
@@ -0,0 +1,131 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use pile_config::Label;
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::{
|
||||
source::{DataSource, misc::path_ts_latest},
|
||||
value::Item,
|
||||
};
|
||||
|
||||
/// A [DataSource] backed by a local directory tree.
#[derive(Debug)]
pub struct DirDataSource {
    // Source name. NOTE(review): not read in this file's visible code —
    // presumably used by callers for identification.
    pub name: Label,
    // Root directory all item keys are resolved against.
    pub dir: PathBuf,

    // When true, `.toml` files are treated as metadata sidecars of the
    // like-named file and hidden from `get`/`iter`.
    pub sidecars: bool,
}
|
||||
|
||||
impl DirDataSource {
    /// Create a directory-backed source rooted at `dir`.
    pub fn new(name: &Label, dir: PathBuf, sidecars: bool) -> Self {
        Self {
            name: name.clone(),
            dir,
            sidecars,
        }
    }
}
|
||||
|
||||
impl DataSource for Arc<DirDataSource> {
    /// Resolve `key` (a path relative to `self.dir`) to an [Item].
    ///
    /// Returns `Ok(None)` for anything that is not a regular file and —
    /// when sidecars are enabled — for `.toml` files themselves.
    ///
    /// NOTE(review): `key` is joined onto `self.dir` without
    /// normalization, so a key containing `..` can resolve outside the
    /// root — confirm keys are trusted here.
    async fn get(&self, key: &str) -> Result<Option<Item>, std::io::Error> {
        // `PathBuf`'s FromStr is infallible, so the Err arm never fires.
        let key = match key.parse::<PathBuf>() {
            Ok(x) => self.dir.join(x),
            Err(_) => return Ok(None),
        };

        if !key.is_file() {
            return Ok(None);
        }

        // Ignore toml files if sidecars are enabled
        if self.sidecars && key.extension().and_then(|x| x.to_str()) == Some("toml") {
            return Ok(None);
        }

        return Ok(Some(Item::File {
            source: Arc::clone(self),
            mime: mime_guess::from_path(&key).first_or_octet_stream(),
            path: key.clone(),
            // Sidecar item: same path with the extension swapped to `.toml`.
            sidecar: self.sidecars.then(|| {
                Box::new(Item::File {
                    source: Arc::clone(self),
                    mime: mime_guess::from_path(key.with_extension("toml")).first_or_octet_stream(),
                    path: key.with_extension("toml"),
                    sidecar: None,
                })
            }),
        }));
    }

    /// Walk the directory tree on a blocking thread, streaming items over
    /// a bounded channel (backpressure at 64 in-flight results). The walk
    /// stops early if the receiver is dropped.
    fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>> {
        let (tx, rx) = tokio::sync::mpsc::channel(64);
        let source = Arc::clone(self);

        let dir = self.dir.clone();
        tokio::task::spawn_blocking(move || {
            for entry in WalkDir::new(dir) {
                let entry = match entry {
                    Err(e) => {
                        // Forward walk errors to the consumer instead of
                        // aborting the whole iteration.
                        let msg = format!("walkdir error: {e:?}");
                        let err = e.into_io_error().unwrap_or(std::io::Error::other(msg));
                        if tx.blocking_send(Err(err)).is_err() {
                            return;
                        }
                        continue;
                    }
                    Ok(e) => e,
                };

                if entry.file_type().is_dir() {
                    continue;
                }

                let path = entry.into_path();

                // NOTE(review): files WITHOUT an extension are skipped
                // here, but `get` serves them — confirm the asymmetry is
                // intended.
                let item = match path.extension().and_then(|x| x.to_str()) {
                    None => continue,
                    Some("toml") if source.sidecars => continue,
                    Some(_) => Item::File {
                        source: Arc::clone(&source),
                        mime: mime_guess::from_path(&path).first_or_octet_stream(),
                        path: path.clone(),

                        sidecar: source.sidecars.then(|| {
                            Box::new(Item::File {
                                source: Arc::clone(&source),
                                mime: mime_guess::from_path(path.with_extension("toml"))
                                    .first_or_octet_stream(),
                                path: path.with_extension("toml"),
                                sidecar: None,
                            })
                        }),
                    },
                };

                if tx.blocking_send(Ok(item)).is_err() {
                    return;
                }
            }
        });

        ReceiverStream::new(rx)
    }

    /// Latest modification time anywhere under the root directory.
    async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
        // NOTE(review): `ts` is still `None` when the match below runs, so
        // only the first two arms can fire; this reduces to
        // `ts = path_ts_latest(&self.dir)?`.
        let mut ts: Option<DateTime<Utc>> = None;

        if !self.dir.exists() {
            return Ok(None);
        }

        let new = path_ts_latest(&self.dir)?;
        match (ts, new) {
            (_, None) => {}
            (None, Some(new)) => ts = Some(new),
            (Some(old), Some(new)) => ts = Some(old.max(new)),
        };

        return Ok(ts);
    }
}
|
||||
121
crates/pile-value/src/source/misc.rs
Normal file
121
crates/pile-value/src/source/misc.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
|
||||
/// Returns the age of a path as a [DateTime].
|
||||
/// - If the path doesn't exist, returns [None]
|
||||
/// - If it's a file, returns the modified time
|
||||
/// - If it's a directory, returns the LATEST modified time of all files within
|
||||
pub fn path_ts_latest(path: impl AsRef<Path>) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
let path = path.as_ref();
|
||||
if !path.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let metadata = fs::metadata(path)?;
|
||||
|
||||
if metadata.is_file() {
|
||||
let modified = metadata.modified()?;
|
||||
Ok(Some(modified.into()))
|
||||
} else if metadata.is_dir() {
|
||||
find_latest_modified(path)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the age of a path as a [DateTime].
|
||||
/// - If the path doesn't exist, returns [None]
|
||||
/// - If it's a file, returns the modified time
|
||||
/// - If it's a directory, returns the EARLIEST modified time of all files within
|
||||
pub fn path_ts_earliest(path: impl AsRef<Path>) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
let path = path.as_ref();
|
||||
if !path.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let metadata = fs::metadata(path)?;
|
||||
|
||||
if metadata.is_file() {
|
||||
let modified = metadata.modified()?;
|
||||
Ok(Some(modified.into()))
|
||||
} else if metadata.is_dir() {
|
||||
find_earliest_modified(path)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn find_latest_modified(dir: &Path) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
let mut latest: Option<DateTime<Utc>> = None;
|
||||
|
||||
// Include the directory's own modification time
|
||||
let dir_metadata = fs::metadata(dir)?;
|
||||
if let Ok(modified) = dir_metadata.modified() {
|
||||
let dt: DateTime<Utc> = modified.into();
|
||||
latest = Some(dt);
|
||||
}
|
||||
|
||||
let entries = fs::read_dir(dir)?;
|
||||
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
let metadata = entry.metadata()?;
|
||||
|
||||
if metadata.is_file() {
|
||||
if let Ok(modified) = metadata.modified() {
|
||||
let dt: DateTime<Utc> = modified.into();
|
||||
latest = Some(match latest {
|
||||
Some(prev) if prev > dt => prev,
|
||||
_ => dt,
|
||||
});
|
||||
}
|
||||
} else if metadata.is_dir()
|
||||
&& let Some(dir_latest) = find_latest_modified(&path)?
|
||||
{
|
||||
latest = Some(match latest {
|
||||
Some(prev) if prev > dir_latest => prev,
|
||||
_ => dir_latest,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(latest);
|
||||
}
|
||||
|
||||
fn find_earliest_modified(dir: &Path) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
let mut earliest: Option<DateTime<Utc>> = None;
|
||||
|
||||
// Include the directory's own modification time
|
||||
let dir_metadata = fs::metadata(dir)?;
|
||||
if let Ok(modified) = dir_metadata.modified() {
|
||||
let dt: DateTime<Utc> = modified.into();
|
||||
earliest = Some(dt);
|
||||
}
|
||||
|
||||
let entries = fs::read_dir(dir)?;
|
||||
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
let metadata = entry.metadata()?;
|
||||
|
||||
if metadata.is_file() {
|
||||
if let Ok(modified) = metadata.modified() {
|
||||
let dt: DateTime<Utc> = modified.into();
|
||||
earliest = Some(match earliest {
|
||||
Some(prev) if prev < dt => prev,
|
||||
_ => dt,
|
||||
});
|
||||
}
|
||||
} else if metadata.is_dir()
|
||||
&& let Some(dir_earliest) = find_earliest_modified(&path)?
|
||||
{
|
||||
earliest = Some(match earliest {
|
||||
Some(prev) if prev < dir_earliest => prev,
|
||||
_ => dir_earliest,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(earliest);
|
||||
}
|
||||
27
crates/pile-value/src/source/mod.rs
Normal file
27
crates/pile-value/src/source/mod.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
mod dir;
|
||||
pub use dir::*;
|
||||
|
||||
mod s3;
|
||||
pub use s3::*;
|
||||
|
||||
pub mod misc;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
/// A read-only set of [Item]s.
pub trait DataSource {
    /// Get an item from this datasource.
    /// Returns `Ok(None)` when `key` does not name an item.
    fn get(
        &self,
        key: &str,
    ) -> impl Future<Output = Result<Option<crate::value::Item>, std::io::Error>> + Send;

    /// Iterate over all items in this source in an arbitrary order
    fn iter(&self) -> ReceiverStream<Result<crate::value::Item, std::io::Error>>;

    /// Return the time of the latest change to the data in this source.
    /// `Ok(None)` means the age could not be determined.
    fn latest_change(
        &self,
    ) -> impl Future<Output = Result<Option<DateTime<Utc>>, std::io::Error>> + Send;
}
|
||||
255
crates/pile-value/src/source/s3.rs
Normal file
255
crates/pile-value/src/source/s3.rs
Normal file
@@ -0,0 +1,255 @@
|
||||
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
|
||||
use chrono::{DateTime, Utc};
|
||||
use pile_config::{Label, S3Credentials};
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::sync::Arc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use crate::{source::DataSource, value::Item};
|
||||
|
||||
/// A [DataSource] backed by an S3 (or S3-compatible) bucket.
#[derive(Debug)]
pub struct S3DataSource {
    // Source name.
    pub name: Label,
    // Bucket all keys live in.
    pub bucket: SmartString<LazyCompact>,
    // Optional key prefix. NOTE(review): stored but not applied anywhere
    // in this file's visible code — confirm listing honors it.
    pub prefix: Option<SmartString<LazyCompact>>,
    // When true, `.toml` objects act as metadata sidecars and are hidden.
    pub sidecars: bool,
    // Configured S3 client, shared across clones.
    pub client: Arc<aws_sdk_s3::Client>,
}
|
||||
|
||||
impl S3DataSource {
    /// Build a source for `bucket`, constructing a dedicated S3 client.
    ///
    /// `endpoint` enables S3-compatible services; when set, path-style
    /// addressing is forced. Note this constructor currently never
    /// returns `Err` — the Result is part of the interface.
    pub fn new(
        name: &Label,
        bucket: String,
        prefix: Option<String>,
        endpoint: Option<String>,
        region: String,
        credentials: &S3Credentials,
        sidecars: bool,
    ) -> Result<Self, std::io::Error> {
        let client = {
            let creds = Credentials::new(
                &credentials.access_key_id,
                &credentials.secret_access_key,
                None, // no session token
                None, // no expiry
                "pile",
            );

            let mut s3_config = aws_sdk_s3::config::Builder::new()
                .behavior_version(BehaviorVersion::latest())
                .region(Region::new(region))
                .credentials_provider(creds);

            // Custom endpoints (minio etc.) typically require path-style
            // URLs rather than virtual-hosted buckets.
            if let Some(ep) = endpoint {
                s3_config = s3_config.endpoint_url(ep).force_path_style(true);
            }

            aws_sdk_s3::Client::from_conf(s3_config.build())
        };

        Ok(Self {
            name: name.clone(),
            bucket: bucket.into(),
            prefix: prefix.map(|x| x.into()),
            sidecars,
            client: Arc::new(client),
        })
    }

    /// Find the sidecar key for `key` by probing with HEAD requests.
    ///
    /// Tries `{key}.toml` first, then `{key with extension replaced}.toml`.
    /// NOTE(review): ANY HEAD failure (auth/network errors, not just a
    /// missing object) is treated as "no sidecar" — confirm that's
    /// acceptable.
    async fn find_sidecar_key(&self, key: &str) -> Option<SmartString<LazyCompact>> {
        // First try {key}.toml
        let full_toml = format!("{key}.toml");
        if self
            .client
            .head_object()
            .bucket(self.bucket.as_str())
            .key(&full_toml)
            .send()
            .await
            .is_ok()
        {
            return Some(full_toml.into());
        }

        // Then try {key-with-extension-stripped}.toml
        let stripped = std::path::Path::new(key).with_extension("toml");
        if let Some(stripped_str) = stripped.to_str()
            // Skip the second probe when both candidates are identical.
            && stripped_str != full_toml.as_str()
            && self
                .client
                .head_object()
                .bucket(self.bucket.as_str())
                .key(stripped_str)
                .send()
                .await
                .is_ok()
        {
            return Some(stripped_str.into());
        }

        None
    }

    /// Build an [Item] for `key`, resolving its sidecar (via HEAD
    /// requests) when sidecars are enabled.
    async fn make_item(self: &Arc<Self>, key: impl Into<SmartString<LazyCompact>>) -> Item {
        let key: SmartString<LazyCompact> = key.into();
        let mime = mime_guess::from_path(key.as_str()).first_or_octet_stream();

        let sidecar = if self.sidecars {
            self.find_sidecar_key(key.as_str())
                .await
                .map(|sidecar_key| {
                    Box::new(Item::S3 {
                        source: Arc::clone(self),
                        mime: mime_guess::from_path(sidecar_key.as_str()).first_or_octet_stream(),
                        key: sidecar_key,
                        sidecar: None,
                    })
                })
        } else {
            None
        };

        Item::S3 {
            source: Arc::clone(self),
            mime,
            key,
            sidecar,
        }
    }
}
|
||||
|
||||
impl DataSource for Arc<S3DataSource> {
    /// Look up a single object by key.
    ///
    /// Returns `Ok(None)` when the object does not exist, or when sidecars
    /// are enabled and `key` names a `.toml` sidecar (sidecars are not
    /// addressable items themselves).
    async fn get(&self, key: &str) -> Result<Option<Item>, std::io::Error> {
        if self.sidecars && key.ends_with(".toml") {
            return Ok(None);
        }

        // HEAD, not GET: only existence matters here, the item's
        // contents are read lazily later.
        let result = self
            .client
            .head_object()
            .bucket(self.bucket.as_str())
            .key(key)
            .send()
            .await;

        match result {
            Err(sdk_err) => {
                // Distinguish "no such key" from real failures
                let not_found = sdk_err
                    .as_service_error()
                    .map(|e| e.is_not_found())
                    .unwrap_or(false);
                if not_found {
                    return Ok(None);
                }
                Err(std::io::Error::other(sdk_err))
            }
            Ok(_) => Ok(Some(self.make_item(key).await)),
        }
    }

    /// Stream every item in the bucket (under `prefix`, if set).
    ///
    /// Listing runs on a spawned task and pages through `list_objects_v2`
    /// via continuation tokens. Sidecar `.toml` objects are skipped when
    /// sidecars are enabled. A listing error is forwarded once on the
    /// stream, then the stream ends.
    fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>> {
        // Bounded channel: listing pauses when the consumer falls behind
        let (tx, rx) = tokio::sync::mpsc::channel(64);
        let source = Arc::clone(self);

        tokio::spawn(async move {
            let mut continuation_token: Option<String> = None;

            loop {
                let mut req = source
                    .client
                    .list_objects_v2()
                    .bucket(source.bucket.as_str());

                if let Some(prefix) = &source.prefix {
                    req = req.prefix(prefix.as_str());
                }

                if let Some(token) = continuation_token {
                    req = req.continuation_token(token);
                }

                let resp = match req.send().await {
                    Err(e) => {
                        // Receiver may already be gone; either way we stop
                        let _ = tx.send(Err(std::io::Error::other(e))).await;
                        break;
                    }
                    Ok(resp) => resp,
                };

                let next_token = resp.next_continuation_token().map(ToOwned::to_owned);
                let is_truncated = resp.is_truncated().unwrap_or(false);

                for obj in resp.contents() {
                    let key = match obj.key() {
                        Some(k) => k.to_owned(),
                        None => continue,
                    };

                    // Sidecars are attached to their items, not listed
                    if source.sidecars && key.ends_with(".toml") {
                        continue;
                    }

                    let item = source.make_item(key).await;

                    // Send fails only when the receiver was dropped:
                    // stop listing entirely
                    if tx.send(Ok(item)).await.is_err() {
                        return;
                    }
                }

                if !is_truncated {
                    break;
                }
                continuation_token = next_token;
            }
        });

        ReceiverStream::new(rx)
    }

    /// Most recent `LastModified` timestamp across all (prefixed) objects,
    /// found by listing the whole bucket.
    ///
    /// NOTE(review): listing errors are swallowed and reported as
    /// `Ok(None)` — presumably "unknown" is acceptable to callers; confirm.
    async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
        let mut ts: Option<DateTime<Utc>> = None;
        let mut continuation_token: Option<String> = None;

        loop {
            let mut req = self.client.list_objects_v2().bucket(self.bucket.as_str());

            if let Some(prefix) = &self.prefix {
                req = req.prefix(prefix.as_str());
            }

            if let Some(token) = continuation_token {
                req = req.continuation_token(token);
            }

            let resp = match req.send().await {
                // Best-effort: a failed listing yields "unknown"
                Err(_) => return Ok(None),
                Ok(resp) => resp,
            };

            let next_token = resp.next_continuation_token().map(ToOwned::to_owned);
            let is_truncated = resp.is_truncated().unwrap_or(false);

            for obj in resp.contents() {
                if let Some(last_modified) = obj.last_modified() {
                    let dt = DateTime::from_timestamp(
                        last_modified.secs(),
                        last_modified.subsec_nanos(),
                    );
                    // Keep the maximum timestamp seen so far
                    if let Some(dt) = dt {
                        ts = Some(match ts {
                            None => dt,
                            Some(prev) => prev.max(dt),
                        });
                    }
                }
            }

            if !is_truncated {
                break;
            }
            continuation_token = next_token;
        }

        Ok(ts)
    }
}
|
||||
158
crates/pile-value/src/source/s3reader.rs
Normal file
158
crates/pile-value/src/source/s3reader.rs
Normal file
@@ -0,0 +1,158 @@
|
||||
use aws_sdk_s3::{error::SdkError, operation::get_object::GetObjectError};
|
||||
use mime::Mime;
|
||||
use std::io::{Error as IoError, Seek, SeekFrom, Write};
|
||||
use thiserror::Error;
|
||||
|
||||
use super::S3Client;
|
||||
use crate::retry;
|
||||
|
||||
/// Errors that can occur while reading an S3 object through [`S3Reader`].
#[derive(Debug, Error)]
#[expect(clippy::large_enum_variant)]
pub enum S3ReaderError {
    /// The `get_object` request itself failed (after retries)
    #[error("sdk error")]
    SdkError(#[from] SdkError<GetObjectError>),

    /// Collecting the response body stream failed
    #[error("byte stream error")]
    ByteStreamError(#[from] aws_sdk_s3::primitives::ByteStreamError),

    /// A local I/O operation (e.g. seeking) failed
    #[error("i/o error")]
    IoError(#[from] IoError),
}
|
||||
|
||||
/// Provides a [`std::io::Read`]-like interface to an S3 object. \
/// This doesn't actually implement [`std::io::Read`] because Read isn't async.
///
/// Also implements [`std::io::Seek`]
pub struct S3Reader {
    // Client wrapper (carries the retry count used by `read`)
    pub(super) client: S3Client,
    // Bucket the object lives in
    pub(super) bucket: String,
    // Key of the object being read
    pub(super) key: String,

    // Current read position, in bytes from the start of the object
    pub(super) cursor: u64,
    // Total object size in bytes, determined when the reader was built
    pub(super) size: u64,
    // MIME type associated with the object
    pub(super) mime: Mime,
}
|
||||
|
||||
impl S3Reader {
|
||||
pub async fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, S3ReaderError> {
|
||||
let len_left = self.size - self.cursor;
|
||||
if len_left == 0 || buf.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)] // TODO: probably fits?
|
||||
let start_byte = usize::try_from(self.cursor).unwrap();
|
||||
|
||||
#[expect(clippy::unwrap_used)] // usize fits in u64
|
||||
let len_to_read = u64::try_from(buf.len()).unwrap().min(len_left);
|
||||
|
||||
#[expect(clippy::unwrap_used)] // must fit, we called min()
|
||||
let len_to_read = usize::try_from(len_to_read).unwrap();
|
||||
|
||||
let end_byte = start_byte + len_to_read - 1;
|
||||
|
||||
let b = retry!(
|
||||
self.client.retries,
|
||||
self.client
|
||||
.client
|
||||
.get_object()
|
||||
.bucket(self.bucket.as_str())
|
||||
.key(self.key.as_str())
|
||||
.range(format!("bytes={start_byte}-{end_byte}"))
|
||||
.send()
|
||||
.await
|
||||
)?;
|
||||
|
||||
// Looks like `bytes 31000000-31999999/33921176``
|
||||
// println!("{:?}", b.content_range);
|
||||
|
||||
let mut bytes = b.body.collect().await?.into_bytes();
|
||||
bytes.truncate(len_to_read);
|
||||
let l = bytes.len();
|
||||
|
||||
// Memory to memory writes are infallible
|
||||
#[expect(clippy::unwrap_used)]
|
||||
buf.write_all(&bytes).unwrap();
|
||||
|
||||
// Cannot fail, usize should always fit into u64
|
||||
#[expect(clippy::unwrap_used)]
|
||||
{
|
||||
self.cursor += u64::try_from(l).unwrap();
|
||||
}
|
||||
|
||||
return Ok(len_to_read);
|
||||
}
|
||||
|
||||
pub fn is_done(&self) -> bool {
|
||||
return self.cursor == self.size;
|
||||
}
|
||||
|
||||
pub fn mime(&self) -> &Mime {
|
||||
&self.mime
|
||||
}
|
||||
|
||||
/// Write the entire contents of this reader to `r`.
|
||||
///
|
||||
/// This method always downloads the whole object,
|
||||
/// and always preserves `self.cursor`.
|
||||
pub async fn download<W: Write>(&mut self, r: &mut W) -> Result<(), S3ReaderError> {
|
||||
let pos = self.stream_position()?;
|
||||
|
||||
const BUF_LEN: usize = 10_000_000;
|
||||
#[expect(clippy::unwrap_used)] // Cannot fail
|
||||
let mut buf: Box<[u8; BUF_LEN]> = vec![0u8; BUF_LEN].try_into().unwrap();
|
||||
|
||||
while !self.is_done() {
|
||||
let b = self.read(&mut buf[..]).await?;
|
||||
r.write_all(&buf[0..b])?;
|
||||
}
|
||||
|
||||
self.seek(SeekFrom::Start(pos))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Seek for S3Reader {
|
||||
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
|
||||
match pos {
|
||||
SeekFrom::Start(x) => self.cursor = x.min(self.size - 1),
|
||||
|
||||
// Cannot panic, we handle all cases
|
||||
#[expect(clippy::unwrap_used)]
|
||||
SeekFrom::Current(x) => {
|
||||
if x < 0 {
|
||||
if u64::try_from(x.abs()).unwrap() > self.cursor {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
"cannot seek past start",
|
||||
));
|
||||
}
|
||||
self.cursor -= u64::try_from(x.abs()).unwrap();
|
||||
} else {
|
||||
self.cursor += u64::try_from(x).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Cannot panic, we handle all cases
|
||||
#[expect(clippy::unwrap_used)]
|
||||
SeekFrom::End(x) => {
|
||||
if x < 0 {
|
||||
if u64::try_from(x.abs()).unwrap() > self.size {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
"cannot seek past start",
|
||||
));
|
||||
}
|
||||
// Cannot fail, is abs
|
||||
self.cursor = self.size - u64::try_from(x.abs()).unwrap();
|
||||
} else {
|
||||
// Cannot fail, is positive
|
||||
self.cursor = self.size + u64::try_from(x).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.cursor = self.cursor.min(self.size - 1);
|
||||
return Ok(self.cursor);
|
||||
}
|
||||
}
|
||||
105
crates/pile-value/src/value/item.rs
Normal file
105
crates/pile-value/src/value/item.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
use mime::Mime;
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::{fs::File, path::PathBuf, sync::Arc};
|
||||
|
||||
use crate::{
|
||||
source::{DirDataSource, S3DataSource},
|
||||
value::{ItemReader, S3Reader},
|
||||
};
|
||||
|
||||
//
|
||||
// MARK: item
|
||||
//
|
||||
|
||||
/// A cheaply-clonable pointer to an item in a dataset
#[derive(Debug, Clone)]
pub enum Item {
    /// An item backed by a file on the local filesystem
    File {
        /// The source this item was produced by
        source: Arc<DirDataSource>,
        /// MIME type associated with this item
        mime: Mime,

        /// Path of the file on disk
        path: PathBuf,
        /// Optional `.toml` sidecar attached to this item
        sidecar: Option<Box<Item>>,
    },

    /// An item backed by an object in an S3 bucket
    S3 {
        /// The source this item was produced by
        source: Arc<S3DataSource>,
        /// MIME type associated with this item
        mime: Mime,

        /// Object key within the source's bucket
        key: SmartString<LazyCompact>,
        /// Optional `.toml` sidecar attached to this item
        sidecar: Option<Box<Item>>,
    },
}
|
||||
|
||||
impl Item {
|
||||
/// Open the item for reading. For S3, performs a HEAD request to determine
|
||||
/// the object size.
|
||||
pub async fn read(&self) -> Result<ItemReader, std::io::Error> {
|
||||
Ok(match self {
|
||||
Self::File { path, .. } => ItemReader::File(File::open(path)?),
|
||||
|
||||
Self::S3 { source, key, .. } => {
|
||||
let head = source
|
||||
.client
|
||||
.head_object()
|
||||
.bucket(source.bucket.as_str())
|
||||
.key(key.as_str())
|
||||
.send()
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
|
||||
let size = head.content_length().unwrap_or(0) as u64;
|
||||
|
||||
ItemReader::S3(S3Reader {
|
||||
client: source.client.clone(),
|
||||
bucket: source.bucket.clone(),
|
||||
key: key.to_owned(),
|
||||
cursor: 0,
|
||||
size,
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn source_name(&self) -> &pile_config::Label {
|
||||
match self {
|
||||
Self::File { source, .. } => &source.name,
|
||||
Self::S3 { source, .. } => &source.name,
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(clippy::expect_used)]
|
||||
pub fn key(&self) -> SmartString<LazyCompact> {
|
||||
match self {
|
||||
Self::File { path, .. } => path.to_str().expect("path is not utf-8").into(),
|
||||
Self::S3 { key, .. } => key.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
|
||||
match self {
|
||||
Self::File { path, .. } => {
|
||||
let mut hasher = blake3::Hasher::new();
|
||||
let mut file = std::fs::File::open(path)?;
|
||||
std::io::copy(&mut file, &mut hasher)?;
|
||||
return Ok(hasher.finalize());
|
||||
}
|
||||
|
||||
Self::S3 { .. } => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mime(&self) -> &Mime {
|
||||
match self {
|
||||
Self::File { mime, .. } => mime,
|
||||
Self::S3 { mime, .. } => mime,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sidecar(&self) -> Option<&Self> {
|
||||
match self {
|
||||
Self::File { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
|
||||
Self::S3 { sidecar, .. } => sidecar.as_ref().map(|x| &**x),
|
||||
}
|
||||
}
|
||||
}
|
||||
8
crates/pile-value/src/value/mod.rs
Normal file
8
crates/pile-value/src/value/mod.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
// Cheaply-clonable pointers to items in a dataset
mod item;
pub use item::*;

// Async readers and the sync `Read`/`Seek` bridge
mod readers;
pub use readers::*;

// The lazily-computed `PileValue` type
mod value;
pub use value::*;
|
||||
193
crates/pile-value/src/value/readers.rs
Normal file
193
crates/pile-value/src/value/readers.rs
Normal file
@@ -0,0 +1,193 @@
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{Read, Seek, SeekFrom},
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
//
|
||||
// MARK: traits
|
||||
//
|
||||
|
||||
/// A minimal, executor-agnostic async byte reader.
pub trait AsyncReader: Send {
    /// Read a chunk of bytes.
    ///
    /// Returns the number of bytes read; `0` signals end of input.
    fn read(
        &mut self,
        buf: &mut [u8],
    ) -> impl Future<Output = Result<usize, std::io::Error>> + Send;

    /// Read all remaining bytes into a `Vec`.
    ///
    /// The default implementation repeatedly pulls 64 KiB chunks through
    /// [`AsyncReader::read`] until it reports end of input.
    fn read_to_end(&mut self) -> impl Future<Output = Result<Vec<u8>, std::io::Error>> + Send {
        async {
            let mut collected = Vec::new();
            let mut scratch = vec![0u8; 65536];
            loop {
                let read_now = self.read(&mut scratch).await?;
                if read_now == 0 {
                    return Ok(collected);
                }
                collected.extend_from_slice(&scratch[..read_now]);
            }
        }
    }
}
|
||||
|
||||
/// An [`AsyncReader`] that additionally supports seeking.
pub trait AsyncSeekReader: AsyncReader {
    /// Move the read cursor, returning the new position.
    fn seek(&mut self, pos: SeekFrom) -> impl Future<Output = Result<u64, std::io::Error>> + Send;
}
|
||||
|
||||
//
|
||||
// MARK: sync bridge
|
||||
//
|
||||
|
||||
/// Turn an async [Reader] into a sync [Read] + [Seek].
///
/// Never use this outside of [tokio::task::spawn_blocking],
/// the async runtime will deadlock if this struct blocks
/// the runtime.
pub struct SyncReadBridge<R: AsyncReader> {
    // The async reader being adapted
    inner: R,
    // Runtime handle used to `block_on` the inner futures
    handle: Handle,
}
|
||||
|
||||
impl<R: AsyncReader> SyncReadBridge<R> {
    /// Creates a new adapter using a handle to the current runtime.
    /// Panics if called outside of tokio
    pub fn new_current(inner: R) -> Self {
        Self::new(inner, Handle::current())
    }

    /// Creates a new adapter using a handle to an existing runtime.
    pub fn new(inner: R, handle: Handle) -> Self {
        Self { inner, handle }
    }
}
|
||||
|
||||
impl<R: AsyncReader> Read for SyncReadBridge<R> {
    /// Blocks the current thread on the inner async read.
    /// Only safe to use from a blocking context (see the struct docs).
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        self.handle.block_on(self.inner.read(buf))
    }
}
|
||||
|
||||
impl<R: AsyncReader + AsyncSeekReader> Seek for SyncReadBridge<R> {
    /// Blocks the current thread on the inner async seek.
    /// Only safe to use from a blocking context (see the struct docs).
    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
        self.handle.block_on(self.inner.seek(pos))
    }
}
|
||||
|
||||
//
|
||||
// MARK: itemreader
|
||||
//
|
||||
|
||||
/// A reader over an [`Item`](super::Item)'s contents,
/// dispatching to the appropriate backend.
pub enum ItemReader {
    /// Reads from a local file
    File(File),
    /// Reads from an S3 object via ranged GETs
    S3(S3Reader),
}
|
||||
|
||||
impl AsyncReader for ItemReader {
    /// Read a chunk from the underlying file or S3 object.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        match self {
            // NOTE(review): this performs blocking file I/O directly on the
            // async executor; presumably callers wrap this in
            // `spawn_blocking` (see `SyncReadBridge`) — confirm.
            Self::File(x) => std::io::Read::read(x, buf),
            Self::S3(x) => x.read(buf).await,
        }
    }
}
|
||||
|
||||
impl AsyncSeekReader for ItemReader {
    /// Seek within the underlying file or S3 object.
    async fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
        match self {
            // NOTE(review): blocking seek on the async executor — same
            // caveat as `read` above.
            Self::File(x) => x.seek(pos),
            Self::S3(x) => x.seek(pos).await,
        }
    }
}
|
||||
|
||||
//
|
||||
// MARK: S3Reader
|
||||
//
|
||||
|
||||
/// Async reader over a single S3 object, fetching data with ranged GETs.
pub struct S3Reader {
    /// Shared S3 client used for all requests
    pub client: Arc<aws_sdk_s3::Client>,
    /// Bucket the object lives in
    pub bucket: SmartString<LazyCompact>,
    /// Key of the object being read
    pub key: SmartString<LazyCompact>,
    /// Current read position, in bytes from the start of the object
    pub cursor: u64,
    /// Total object size in bytes (from the HEAD request that built this reader)
    pub size: u64,
}
|
||||
|
||||
impl AsyncReader for S3Reader {
    /// Read up to `buf.len()` bytes at the cursor via a ranged GET,
    /// advancing the cursor by the number of bytes actually received.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        let len_left = self.size.saturating_sub(self.cursor);
        if len_left == 0 || buf.is_empty() {
            return Ok(0);
        }

        let start_byte = self.cursor;
        // `as u64` is lossless: usize is at most 64 bits on supported targets
        let len_to_read = (buf.len() as u64).min(len_left);
        // HTTP byte ranges are inclusive, hence the -1
        let end_byte = start_byte + len_to_read - 1;

        let resp = self
            .client
            .get_object()
            .bucket(self.bucket.as_str())
            .key(self.key.as_str())
            .range(format!("bytes={start_byte}-{end_byte}"))
            .send()
            .await
            .map_err(std::io::Error::other)?;

        let bytes = resp
            .body
            .collect()
            .await
            .map(|x| x.into_bytes())
            .map_err(std::io::Error::other)?;

        // The server may return a different number of bytes than requested;
        // copy only what fits in `buf` and report that count.
        let n = bytes.len().min(buf.len());
        buf[..n].copy_from_slice(&bytes[..n]);
        self.cursor += n as u64;
        Ok(n)
    }
}
|
||||
|
||||
impl AsyncSeekReader for S3Reader {
|
||||
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
||||
match pos {
|
||||
SeekFrom::Start(x) => self.cursor = x.min(self.size),
|
||||
|
||||
SeekFrom::Current(x) => {
|
||||
if x < 0 {
|
||||
let abs = x.unsigned_abs();
|
||||
if abs > self.cursor {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
"cannot seek past start",
|
||||
));
|
||||
}
|
||||
self.cursor -= abs;
|
||||
} else {
|
||||
self.cursor += x as u64;
|
||||
}
|
||||
}
|
||||
|
||||
std::io::SeekFrom::End(x) => {
|
||||
if x < 0 {
|
||||
let abs = x.unsigned_abs();
|
||||
if abs > self.size {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
"cannot seek past start",
|
||||
));
|
||||
}
|
||||
self.cursor = self.size - abs;
|
||||
} else {
|
||||
self.cursor = self.size + x as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.cursor = self.cursor.min(self.size);
|
||||
Ok(self.cursor)
|
||||
}
|
||||
}
|
||||
229
crates/pile-value/src/value/value.rs
Normal file
229
crates/pile-value/src/value/value.rs
Normal file
@@ -0,0 +1,229 @@
|
||||
use mime::Mime;
|
||||
use pile_config::objectpath::{ObjectPath, PathSegment};
|
||||
use serde_json::{Map, Value};
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{
|
||||
extract::{
|
||||
item::ItemExtractor,
|
||||
misc::{ArrayExtractor, MapExtractor, VecExtractor},
|
||||
string::StringExtractor,
|
||||
traits::{ListExtractor, ObjectExtractor},
|
||||
},
|
||||
value::Item,
|
||||
};
|
||||
|
||||
/// An immutable, cheaply-clonable, lazily-computed value.
/// Very similar to [serde_json::Value].
pub enum PileValue {
    /// The absence of a value
    Null,
    /// An unsigned integer
    U64(u64),
    /// A signed integer
    I64(i64),

    /// A string
    String(Arc<SmartString<LazyCompact>>),

    /// An array of values
    Array(Arc<Vec<PileValue>>),

    /// A binary blob
    Blob {
        /// MIME type describing the blob's contents
        mime: Mime,
        /// The raw bytes
        bytes: Arc<Vec<u8>>,
    },

    /// A lazily-computed map of {label: value}
    ObjectExtractor(Arc<dyn ObjectExtractor>),

    /// A lazily-computed array
    ListExtractor(Arc<dyn ListExtractor>),

    /// An pointer to an item in this dataset
    Item(Item),
}
|
||||
|
||||
impl Clone for PileValue {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
Self::Null => Self::Null,
|
||||
Self::U64(x) => Self::U64(*x),
|
||||
Self::I64(x) => Self::I64(*x),
|
||||
Self::String(x) => Self::String(x.clone()),
|
||||
Self::Array(x) => Self::Array(x.clone()),
|
||||
Self::ObjectExtractor(x) => Self::ObjectExtractor(x.clone()),
|
||||
Self::ListExtractor(x) => Self::ListExtractor(x.clone()),
|
||||
Self::Blob { mime, bytes } => Self::Blob {
|
||||
mime: mime.clone(),
|
||||
bytes: bytes.clone(),
|
||||
},
|
||||
Self::Item(i) => Self::Item(i.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PileValue {
|
||||
pub fn object_extractor(&self) -> Arc<dyn ObjectExtractor> {
|
||||
match self {
|
||||
Self::Null => Arc::new(MapExtractor::default()),
|
||||
Self::U64(_) => Arc::new(MapExtractor::default()),
|
||||
Self::I64(_) => Arc::new(MapExtractor::default()),
|
||||
Self::Array(_) => Arc::new(MapExtractor::default()),
|
||||
Self::String(s) => Arc::new(StringExtractor::new(&s)),
|
||||
Self::Blob { .. } => Arc::new(MapExtractor::default()),
|
||||
Self::ListExtractor(_) => Arc::new(MapExtractor::default()),
|
||||
Self::ObjectExtractor(e) => e.clone(),
|
||||
Self::Item(i) => Arc::new(ItemExtractor::new(i)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn list_extractor(&self) -> Arc<dyn ListExtractor> {
|
||||
match self {
|
||||
Self::Null => Arc::new(VecExtractor::default()),
|
||||
Self::U64(_) => Arc::new(VecExtractor::default()),
|
||||
Self::I64(_) => Arc::new(VecExtractor::default()),
|
||||
Self::Array(a) => Arc::new(ArrayExtractor::new(a.clone())),
|
||||
Self::String(_) => Arc::new(VecExtractor::default()),
|
||||
Self::Blob { .. } => Arc::new(VecExtractor::default()),
|
||||
Self::ListExtractor(e) => e.clone(),
|
||||
Self::ObjectExtractor(_) => Arc::new(VecExtractor::default()),
|
||||
Self::Item(_) => Arc::new(VecExtractor::default()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn query(&self, query: &ObjectPath) -> Result<Option<Self>, std::io::Error> {
|
||||
let mut out: Option<PileValue> = Some(self.clone());
|
||||
|
||||
for s in &query.segments {
|
||||
match s {
|
||||
PathSegment::Root => out = Some(self.clone()),
|
||||
PathSegment::Field(field) => {
|
||||
let e = match out.map(|x| x.object_extractor()) {
|
||||
Some(e) => e,
|
||||
None => {
|
||||
out = None;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
out = e.field(&field).await?;
|
||||
}
|
||||
|
||||
PathSegment::Index(idx) => {
|
||||
let e = match out.map(|x| x.list_extractor()) {
|
||||
Some(e) => e,
|
||||
None => {
|
||||
out = None;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let idx = if *idx >= 0 {
|
||||
usize::try_from(*idx).ok()
|
||||
} else {
|
||||
usize::try_from(e.len().await? as i64 - idx).ok()
|
||||
};
|
||||
|
||||
let idx = match idx {
|
||||
Some(idx) => idx,
|
||||
None => {
|
||||
out = None;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
out = e.get(idx).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(out.clone());
|
||||
}
|
||||
|
||||
/// Like `to_json`, but counts populated fields instead of collecting values.
|
||||
///
|
||||
/// - Leaf values (non-null scalars, arrays, blobs) contribute `Some(1)`.
|
||||
/// - `Null` contributes `None`.
|
||||
/// - `ObjectExtractor` is recursed into; returns `Some(Object(map))` with
|
||||
/// only the fields that had data, or `None` if all fields were absent.
|
||||
/// - `Array` / `ListExtractor` are treated as opaque leaf values (not descended into).
|
||||
pub async fn count_fields(&self) -> Result<Option<Value>, std::io::Error> {
|
||||
Ok(match self {
|
||||
Self::Null => None,
|
||||
|
||||
Self::U64(_) | Self::I64(_) | Self::String(_) | Self::Blob { .. } => {
|
||||
Some(Value::Number(1u64.into()))
|
||||
}
|
||||
|
||||
Self::Array(x) => (!x.is_empty()).then(|| Value::Number(1u64.into())),
|
||||
Self::ListExtractor(x) => (!x.is_empty().await?).then(|| Value::Number(1u64.into())),
|
||||
|
||||
Self::ObjectExtractor(_) | Self::Item(_) => {
|
||||
let e = self.object_extractor();
|
||||
let keys = e.fields().await?;
|
||||
let mut map = Map::new();
|
||||
for k in &keys {
|
||||
let v = match e.field(k).await? {
|
||||
Some(x) => x,
|
||||
None => continue,
|
||||
};
|
||||
if let Some(counted) = Box::pin(v.count_fields()).await? {
|
||||
map.insert(k.to_string(), counted);
|
||||
}
|
||||
}
|
||||
if map.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Value::Object(map))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn as_str(&self) -> Option<&str> {
|
||||
match self {
|
||||
Self::String(x) => Some(x),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn to_json(&self) -> Result<Value, std::io::Error> {
|
||||
Ok(match self {
|
||||
Self::Null => Value::Null,
|
||||
Self::U64(x) => Value::Number((*x).into()),
|
||||
Self::I64(x) => Value::Number((*x).into()),
|
||||
Self::String(x) => Value::String(x.to_string()),
|
||||
|
||||
// TODO: replace with something meaningful?
|
||||
Self::Blob { mime, bytes } => {
|
||||
Value::String(format!("<Blob ({mime}, {} bytes)>", bytes.len()))
|
||||
}
|
||||
|
||||
#[expect(clippy::expect_used)]
|
||||
Self::Array(_) | Self::ListExtractor(_) => {
|
||||
let e = self.list_extractor();
|
||||
let len = e.len().await?;
|
||||
let mut arr = Vec::new();
|
||||
for i in 0..len {
|
||||
let v = e.get(i).await?.expect("item must be present");
|
||||
arr.push(Box::pin(v.to_json()).await?);
|
||||
}
|
||||
Value::Array(arr)
|
||||
}
|
||||
|
||||
Self::ObjectExtractor(_) | Self::Item(_) => {
|
||||
let e = self.object_extractor();
|
||||
let keys = e.fields().await?;
|
||||
let mut map = Map::new();
|
||||
for k in &keys {
|
||||
let v = match e.field(k).await? {
|
||||
Some(x) => x,
|
||||
None => continue,
|
||||
};
|
||||
map.insert(k.to_string(), Box::pin(v.to_json()).await?);
|
||||
}
|
||||
Value::Object(map)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user