Add hash extractor

This commit is contained in:
2026-04-03 08:57:37 -07:00
parent e6e340d082
commit 4d4e9c93a2
9 changed files with 139 additions and 39 deletions

View File

@@ -11,7 +11,6 @@ workspace = true
pile-dataset = { workspace = true, features = ["axum"] }
reqwest = { version = "0.12", features = ["json", "stream"] }
futures-core = "0.3"
serde = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true }

View File

@@ -3,10 +3,8 @@ use axum::{
routing::any,
};
use bytes::Bytes;
use futures_core::Stream;
use reqwest::{Client, StatusCode, header};
use serde::Deserialize;
use std::pin::Pin;
use thiserror::Error;
use tracing::{trace, warn};
@@ -120,26 +118,6 @@ impl DatasetClient {
check_status(resp).await?.json().await.map_err(Into::into)
}
/// `GET /item` — stream the raw bytes of an item.
///
/// The returned stream yields chunks as they arrive from the server.
pub async fn get_item(
&self,
source: &str,
key: &str,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>, ClientError> {
let url = format!("{}/item", self.base_url);
trace!(url, source, key, "GET /item");
let resp = self
.client
.get(url)
.query(&[("source", source), ("key", key)])
.send()
.await?;
Ok(Box::pin(check_status(resp).await?.bytes_stream()))
}
/// `GET /extract` — extract a field from an item by object path (e.g. `$.flac.title`).
pub async fn get_extract(
&self,

View File

@@ -21,6 +21,9 @@ toml = { workspace = true }
smartstring = { workspace = true }
regex = { workspace = true }
blake3 = { workspace = true }
sha2 = { workspace = true }
sha1 = { workspace = true }
md5 = { workspace = true }
epub = { workspace = true }
kamadak-exif = { workspace = true }
pdf = { workspace = true }

View File

@@ -0,0 +1,111 @@
use crate::{
extract::traits::{ExtractState, ObjectExtractor},
value::{BinaryPileValue, PileValue},
};
use pile_config::Label;
use pile_io::SyncReadBridge;
use std::{io::Read, sync::Arc};
use tokio::sync::OnceCell;
fn to_hex(bytes: &[u8]) -> String {
bytes.iter().map(|b| format!("{b:02x}")).collect()
}
macro_rules! hash_algos {
($($name:ident),* $(,)?) => {
pub struct HashExtractor {
item: BinaryPileValue,
$($name: OnceCell<String>,)*
}
impl HashExtractor {
pub fn new(item: &BinaryPileValue) -> Self {
Self {
item: item.clone(),
$($name: OnceCell::new(),)*
}
}
}
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
vec![$(Label::new(stringify!($name)).unwrap()),*]
});
};
}
hash_algos!(blake3, md5, sha1, sha224, sha256, sha384, sha512);
impl HashExtractor {
async fn compute(&self, name: &Label) -> Result<Option<String>, std::io::Error> {
let name_str = name.as_ref();
macro_rules! algo {
($cell:ident, $compute:expr) => {
if name_str == stringify!($cell) {
return Ok(Some(
self.$cell
.get_or_try_init(|| async {
let read = self.item.read().await?;
let mut read = SyncReadBridge::new_current(read);
tokio::task::spawn_blocking(move || {
let mut bytes = Vec::new();
read.read_to_end(&mut bytes)?;
Ok::<String, std::io::Error>($compute(&bytes))
})
.await?
})
.await?
.clone(),
));
}
};
}
algo!(blake3, |b: &Vec<u8>| blake3::hash(b).to_hex().to_string());
algo!(md5, |b: &Vec<u8>| format!("{:x}", md5::compute(b)));
algo!(sha1, |b: &Vec<u8>| {
use sha1::Digest;
to_hex(sha1::Sha1::digest(b).as_ref())
});
algo!(sha224, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha224::digest(b).as_ref())
});
algo!(sha256, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha256::digest(b).as_ref())
});
algo!(sha384, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha384::digest(b).as_ref())
});
algo!(sha512, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha512::digest(b).as_ref())
});
Ok(None)
}
}
#[async_trait::async_trait]
impl ObjectExtractor for HashExtractor {
async fn field(
&self,
_state: &ExtractState,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self
.compute(name)
.await?
.map(|s| PileValue::String(Arc::new(s.into()))))
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(LABELS.clone())
}
}

View File

@@ -31,6 +31,9 @@ pub use text::*;
mod image;
pub use image::*;
mod hash;
pub use hash::*;
use crate::{
extract::{
misc::MapExtractor,
@@ -85,6 +88,10 @@ impl BinaryExtractor {
Label::new("text").unwrap(),
PileValue::ObjectExtractor(Arc::new(TextExtractor::new(item))),
),
(
Label::new("hash").unwrap(),
PileValue::ObjectExtractor(Arc::new(HashExtractor::new(item))),
),
]),
};

View File

@@ -1,6 +1,5 @@
use mime::Mime;
use pile_config::objectpath::{ObjectPath, PathSegment};
use pile_io::SyncReadBridge;
use serde_json::{Map, Value};
use smartstring::{LazyCompact, SmartString};
use std::{fmt::Debug, fs::File, io::Cursor, path::PathBuf, sync::Arc};
@@ -50,18 +49,6 @@ impl BinaryPileValue {
}
}
pub async fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
let read = self.read().await?;
let mut read = SyncReadBridge::new_current(read);
let out = tokio::task::spawn_blocking(move || {
let mut hasher = blake3::Hasher::new();
std::io::copy(&mut read, &mut hasher)?;
return Ok::<_, std::io::Error>(hasher.finalize());
})
.await??;
return Ok(out);
}
pub fn mime(&self) -> &Mime {
match self {
Self::Blob { mime, .. } => mime,

View File

@@ -1,6 +1,6 @@
use serde::Deserialize;
use std::{num::NonZeroUsize, path::PathBuf};
use tracing::info;
use tracing::debug;
use crate::config::{
env::load_env,
@@ -89,7 +89,7 @@ impl PileServerConfig {
}
}
info!(message = "Config loaded", ?config);
debug!(message = "Config loaded", ?config);
return config;
}