Add hash extractor
Some checks failed
CI / Typos (push) Failing after 18s
CI / Build and test (push) Successful in 1m43s
CI / Clippy (push) Successful in 2m31s
Docker / build-and-push (push) Successful in 4m36s
CI / Build and test (all features) (push) Successful in 6m52s

This commit is contained in:
2026-04-02 22:56:31 -07:00
parent e6e340d082
commit 9b8f825667
9 changed files with 140 additions and 39 deletions

15
Cargo.lock generated
View File

@@ -2016,7 +2016,6 @@ version = "0.0.2"
dependencies = [
"axum",
"bytes",
"futures-core",
"pile-dataset",
"reqwest",
"serde",
@@ -2098,6 +2097,7 @@ dependencies = [
"id3",
"image",
"kamadak-exif",
"md5",
"mime",
"mime_guess",
"pdf",
@@ -2109,6 +2109,8 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"sha1",
"sha2 0.11.0-rc.5",
"smartstring",
"strum",
"tokio",
@@ -2654,6 +2656,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.7",
]
[[package]]
name = "sha2"
version = "0.10.9"

View File

@@ -111,6 +111,8 @@ bytes = "1"
toml = "1.0.3"
toml_edit = "0.25.4"
sha2 = "0.11.0-rc.5"
sha1 = "0.10"
md5 = "0.7"
blake3 = "1.8.3"
dotenvy = "0.15.7"
envy = "0.4.2"

View File

@@ -11,7 +11,6 @@ workspace = true
pile-dataset = { workspace = true, features = ["axum"] }
reqwest = { version = "0.12", features = ["json", "stream"] }
futures-core = "0.3"
serde = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true }

View File

@@ -3,10 +3,8 @@ use axum::{
routing::any,
};
use bytes::Bytes;
use futures_core::Stream;
use reqwest::{Client, StatusCode, header};
use serde::Deserialize;
use std::pin::Pin;
use thiserror::Error;
use tracing::{trace, warn};
@@ -120,26 +118,6 @@ impl DatasetClient {
check_status(resp).await?.json().await.map_err(Into::into)
}
/// `GET /item` — stream the raw bytes of an item.
///
/// The returned stream yields chunks as they arrive from the server.
pub async fn get_item(
&self,
source: &str,
key: &str,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>, ClientError> {
let url = format!("{}/item", self.base_url);
trace!(url, source, key, "GET /item");
let resp = self
.client
.get(url)
.query(&[("source", source), ("key", key)])
.send()
.await?;
Ok(Box::pin(check_status(resp).await?.bytes_stream()))
}
/// `GET /extract` — extract a field from an item by object path (e.g. `$.flac.title`).
pub async fn get_extract(
&self,

View File

@@ -21,6 +21,9 @@ toml = { workspace = true }
smartstring = { workspace = true }
regex = { workspace = true }
blake3 = { workspace = true }
sha2 = { workspace = true }
sha1 = { workspace = true }
md5 = { workspace = true }
epub = { workspace = true }
kamadak-exif = { workspace = true }
pdf = { workspace = true }

View File

@@ -0,0 +1,112 @@
use crate::{
extract::traits::{ExtractState, ObjectExtractor},
value::{BinaryPileValue, PileValue},
};
use pile_config::Label;
use pile_io::SyncReadBridge;
use std::{io::Read, sync::Arc};
use tokio::sync::OnceCell;
fn to_hex(bytes: &[u8]) -> String {
bytes.iter().map(|b| format!("{b:02x}")).collect()
}
macro_rules! hash_algos {
($($name:ident),* $(,)?) => {
pub struct HashExtractor {
item: BinaryPileValue,
$($name: OnceCell<String>,)*
}
impl HashExtractor {
pub fn new(item: &BinaryPileValue) -> Self {
Self {
item: item.clone(),
$($name: OnceCell::new(),)*
}
}
}
#[expect(clippy::unwrap_used)]
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
vec![$(Label::new(stringify!($name)).unwrap()),*]
});
};
}
hash_algos!(blake3, md5, sha1, sha224, sha256, sha384, sha512);
impl HashExtractor {
async fn compute(&self, name: &Label) -> Result<Option<String>, std::io::Error> {
let name_str = name.as_ref();
macro_rules! algo {
($cell:ident, $compute:expr) => {
if name_str == stringify!($cell) {
return Ok(Some(
self.$cell
.get_or_try_init(|| async {
let read = self.item.read().await?;
let mut read = SyncReadBridge::new_current(read);
tokio::task::spawn_blocking(move || {
let mut bytes = Vec::new();
read.read_to_end(&mut bytes)?;
Ok::<String, std::io::Error>($compute(&bytes))
})
.await?
})
.await?
.clone(),
));
}
};
}
algo!(blake3, |b: &Vec<u8>| blake3::hash(b).to_hex().to_string());
algo!(md5, |b: &Vec<u8>| format!("{:x}", md5::compute(b)));
algo!(sha1, |b: &Vec<u8>| {
use sha1::Digest;
to_hex(sha1::Sha1::digest(b).as_ref())
});
algo!(sha224, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha224::digest(b).as_ref())
});
algo!(sha256, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha256::digest(b).as_ref())
});
algo!(sha384, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha384::digest(b).as_ref())
});
algo!(sha512, |b: &Vec<u8>| {
use sha2::Digest;
to_hex(sha2::Sha512::digest(b).as_ref())
});
Ok(None)
}
}
#[async_trait::async_trait]
impl ObjectExtractor for HashExtractor {
async fn field(
&self,
_state: &ExtractState,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self
.compute(name)
.await?
.map(|s| PileValue::String(Arc::new(s.into()))))
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(LABELS.clone())
}
}

View File

@@ -31,6 +31,9 @@ pub use text::*;
mod image;
pub use image::*;
mod hash;
pub use hash::*;
use crate::{
extract::{
misc::MapExtractor,
@@ -85,6 +88,10 @@ impl BinaryExtractor {
Label::new("text").unwrap(),
PileValue::ObjectExtractor(Arc::new(TextExtractor::new(item))),
),
(
Label::new("hash").unwrap(),
PileValue::ObjectExtractor(Arc::new(HashExtractor::new(item))),
),
]),
};

View File

@@ -1,6 +1,5 @@
use mime::Mime;
use pile_config::objectpath::{ObjectPath, PathSegment};
use pile_io::SyncReadBridge;
use serde_json::{Map, Value};
use smartstring::{LazyCompact, SmartString};
use std::{fmt::Debug, fs::File, io::Cursor, path::PathBuf, sync::Arc};
@@ -50,18 +49,6 @@ impl BinaryPileValue {
}
}
pub async fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
let read = self.read().await?;
let mut read = SyncReadBridge::new_current(read);
let out = tokio::task::spawn_blocking(move || {
let mut hasher = blake3::Hasher::new();
std::io::copy(&mut read, &mut hasher)?;
return Ok::<_, std::io::Error>(hasher.finalize());
})
.await??;
return Ok(out);
}
pub fn mime(&self) -> &Mime {
match self {
Self::Blob { mime, .. } => mime,

View File

@@ -1,6 +1,6 @@
use serde::Deserialize;
use std::{num::NonZeroUsize, path::PathBuf};
use tracing::info;
use tracing::debug;
use crate::config::{
env::load_env,
@@ -89,7 +89,7 @@ impl PileServerConfig {
}
}
info!(message = "Config loaded", ?config);
debug!(message = "Config loaded", ?config);
return config;
}