From 4d4e9c93a2dabe9cb1664517a46c6465b77b4310 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Fri, 3 Apr 2026 08:57:37 -0700 Subject: [PATCH] Add hash extractor --- Cargo.lock | 15 ++- Cargo.toml | 2 + crates/pile-client/Cargo.toml | 1 - crates/pile-client/src/lib.rs | 22 ---- crates/pile-value/Cargo.toml | 3 + crates/pile-value/src/extract/blob/hash.rs | 111 +++++++++++++++++++++ crates/pile-value/src/extract/blob/mod.rs | 7 ++ crates/pile-value/src/value/value.rs | 13 --- crates/pile/src/config/config.rs | 4 +- 9 files changed, 139 insertions(+), 39 deletions(-) create mode 100644 crates/pile-value/src/extract/blob/hash.rs diff --git a/Cargo.lock b/Cargo.lock index 257dd2a..ad16b74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2016,7 +2016,6 @@ version = "0.0.2" dependencies = [ "axum", "bytes", - "futures-core", "pile-dataset", "reqwest", "serde", @@ -2098,6 +2097,7 @@ dependencies = [ "id3", "image", "kamadak-exif", + "md5", "mime", "mime_guess", "pdf", @@ -2109,6 +2109,8 @@ dependencies = [ "reqwest", "serde", "serde_json", + "sha1", + "sha2 0.11.0-rc.5", "smartstring", "strum", "tokio", @@ -2654,6 +2656,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.10.9" diff --git a/Cargo.toml b/Cargo.toml index 47f5eeb..859af9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,6 +111,8 @@ bytes = "1" toml = "1.0.3" toml_edit = "0.25.4" sha2 = "0.11.0-rc.5" +sha1 = "0.10" +md5 = "0.7" blake3 = "1.8.3" dotenvy = "0.15.7" envy = "0.4.2" diff --git a/crates/pile-client/Cargo.toml b/crates/pile-client/Cargo.toml index f35817c..474c222 100644 --- a/crates/pile-client/Cargo.toml +++ b/crates/pile-client/Cargo.toml @@ -11,7 +11,6 @@ workspace = true pile-dataset = { workspace = true, features = ["axum"] } reqwest = { version = "0.12", features = ["json", "stream"] } -futures-core = "0.3" serde = { workspace = true } thiserror = { workspace = true } bytes = { workspace = true } diff --git a/crates/pile-client/src/lib.rs b/crates/pile-client/src/lib.rs index 57d9acf..74d35c2 100644 --- a/crates/pile-client/src/lib.rs +++ b/crates/pile-client/src/lib.rs @@ -3,10 +3,8 @@ use axum::{ routing::any, }; use bytes::Bytes; -use futures_core::Stream; use reqwest::{Client, StatusCode, header}; use serde::Deserialize; -use std::pin::Pin; use thiserror::Error; use tracing::{trace, warn}; @@ -120,26 +118,6 @@ impl DatasetClient { check_status(resp).await?.json().await.map_err(Into::into) } - /// `GET /item` — stream the raw bytes of an item. - /// - /// The returned stream yields chunks as they arrive from the server. - pub async fn get_item( - &self, - source: &str, - key: &str, - ) -> Result> + Send>>, ClientError> { - let url = format!("{}/item", self.base_url); - trace!(url, source, key, "GET /item"); - let resp = self - .client - .get(url) - .query(&[("source", source), ("key", key)]) - .send() - .await?; - - Ok(Box::pin(check_status(resp).await?.bytes_stream())) - } - /// `GET /extract` — extract a field from an item by object path (e.g. `$.flac.title`). pub async fn get_extract( &self, diff --git a/crates/pile-value/Cargo.toml b/crates/pile-value/Cargo.toml index c76fb7b..019b9ce 100644 --- a/crates/pile-value/Cargo.toml +++ b/crates/pile-value/Cargo.toml @@ -21,6 +21,9 @@ toml = { workspace = true } smartstring = { workspace = true } regex = { workspace = true } blake3 = { workspace = true } +sha2 = { workspace = true } +sha1 = { workspace = true } +md5 = { workspace = true } epub = { workspace = true } kamadak-exif = { workspace = true } pdf = { workspace = true } diff --git a/crates/pile-value/src/extract/blob/hash.rs b/crates/pile-value/src/extract/blob/hash.rs new file mode 100644 index 0000000..193636e --- /dev/null +++ b/crates/pile-value/src/extract/blob/hash.rs @@ -0,0 +1,111 @@ +use crate::{ + extract::traits::{ExtractState, ObjectExtractor}, + value::{BinaryPileValue, PileValue}, +}; +use pile_config::Label; +use pile_io::SyncReadBridge; +use std::{io::Read, sync::Arc}; +use tokio::sync::OnceCell; + +fn to_hex(bytes: &[u8]) -> String { + bytes.iter().map(|b| format!("{b:02x}")).collect() +} + +macro_rules! hash_algos { + ($($name:ident),* $(,)?) => { + pub struct HashExtractor { + item: BinaryPileValue, + $($name: OnceCell,)* + } + + impl HashExtractor { + pub fn new(item: &BinaryPileValue) -> Self { + Self { + item: item.clone(), + $($name: OnceCell::new(),)* + } + } + } + + static LABELS: std::sync::LazyLock> = std::sync::LazyLock::new(|| { + vec![$(Label::new(stringify!($name)).unwrap()),*] + }); + }; +} + +hash_algos!(blake3, md5, sha1, sha224, sha256, sha384, sha512); + +impl HashExtractor { + async fn compute(&self, name: &Label) -> Result, std::io::Error> { + let name_str = name.as_ref(); + + macro_rules! algo { + ($cell:ident, $compute:expr) => { + if name_str == stringify!($cell) { + return Ok(Some( + self.$cell + .get_or_try_init(|| async { + let read = self.item.read().await?; + let mut read = SyncReadBridge::new_current(read); + tokio::task::spawn_blocking(move || { + let mut bytes = Vec::new(); + read.read_to_end(&mut bytes)?; + Ok::($compute(&bytes)) + }) + .await? + }) + .await? + .clone(), + )); + } + }; + } + + algo!(blake3, |b: &Vec| blake3::hash(b).to_hex().to_string()); + algo!(md5, |b: &Vec| format!("{:x}", md5::compute(b))); + algo!(sha1, |b: &Vec| { + use sha1::Digest; + to_hex(sha1::Sha1::digest(b).as_ref()) + }); + algo!(sha224, |b: &Vec| { + use sha2::Digest; + to_hex(sha2::Sha224::digest(b).as_ref()) + }); + algo!(sha256, |b: &Vec| { + use sha2::Digest; + to_hex(sha2::Sha256::digest(b).as_ref()) + }); + algo!(sha384, |b: &Vec| { + use sha2::Digest; + to_hex(sha2::Sha384::digest(b).as_ref()) + }); + algo!(sha512, |b: &Vec| { + use sha2::Digest; + to_hex(sha2::Sha512::digest(b).as_ref()) + }); + + Ok(None) + } +} + +#[async_trait::async_trait] +impl ObjectExtractor for HashExtractor { + async fn field( + &self, + _state: &ExtractState, + name: &Label, + args: Option<&str>, + ) -> Result, std::io::Error> { + if args.is_some() { + return Ok(None); + } + Ok(self + .compute(name) + .await? + .map(|s| PileValue::String(Arc::new(s.into())))) + } + + async fn fields(&self) -> Result, std::io::Error> { + Ok(LABELS.clone()) + } +} diff --git a/crates/pile-value/src/extract/blob/mod.rs b/crates/pile-value/src/extract/blob/mod.rs index 5430813..514ebc7 100644 --- a/crates/pile-value/src/extract/blob/mod.rs +++ b/crates/pile-value/src/extract/blob/mod.rs @@ -31,6 +31,9 @@ pub use text::*; mod image; pub use image::*; +mod hash; +pub use hash::*; + use crate::{ extract::{ misc::MapExtractor, @@ -85,6 +88,10 @@ impl BinaryExtractor { Label::new("text").unwrap(), PileValue::ObjectExtractor(Arc::new(TextExtractor::new(item))), ), + ( + Label::new("hash").unwrap(), + PileValue::ObjectExtractor(Arc::new(HashExtractor::new(item))), + ), ]), }; diff --git a/crates/pile-value/src/value/value.rs b/crates/pile-value/src/value/value.rs index b08c64c..6683c8e 100644 --- a/crates/pile-value/src/value/value.rs +++ b/crates/pile-value/src/value/value.rs @@ -1,6 +1,5 @@ use mime::Mime; use pile_config::objectpath::{ObjectPath, PathSegment}; -use pile_io::SyncReadBridge; use serde_json::{Map, Value}; use smartstring::{LazyCompact, SmartString}; use std::{fmt::Debug, fs::File, io::Cursor, path::PathBuf, sync::Arc}; @@ -50,18 +49,6 @@ impl BinaryPileValue { } } - pub async fn hash(&self) -> Result { - let read = self.read().await?; - let mut read = SyncReadBridge::new_current(read); - let out = tokio::task::spawn_blocking(move || { - let mut hasher = blake3::Hasher::new(); - std::io::copy(&mut read, &mut hasher)?; - return Ok::<_, std::io::Error>(hasher.finalize()); - }) - .await??; - return Ok(out); - } - pub fn mime(&self) -> &Mime { match self { Self::Blob { mime, .. } => mime, diff --git a/crates/pile/src/config/config.rs b/crates/pile/src/config/config.rs index 0c1c73f..5dbf5f7 100644 --- a/crates/pile/src/config/config.rs +++ b/crates/pile/src/config/config.rs @@ -1,6 +1,6 @@ use serde::Deserialize; use std::{num::NonZeroUsize, path::PathBuf}; -use tracing::info; +use tracing::debug; use crate::config::{ env::load_env, @@ -89,7 +89,7 @@ impl PileServerConfig { } } - info!(message = "Config loaded", ?config); + debug!(message = "Config loaded", ?config); return config; }