From 4737acbcf40c51e74ffa6a16b66f61007c185bcc Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Sat, 21 Mar 2026 21:03:52 -0700 Subject: [PATCH] Add S3 encryption --- Cargo.lock | 116 ++++++++ Cargo.toml | 4 + crates/pile-config/src/lib.rs | 3 + crates/pile-dataset/Cargo.toml | 1 + crates/pile-dataset/src/dataset.rs | 12 +- crates/pile-dataset/src/serve/item.rs | 2 +- crates/pile-io/Cargo.toml | 15 + crates/pile-io/src/asyncreader.rs | 75 +++++ crates/pile-io/src/chachareader.rs | 205 ++++++++++++++ crates/pile-io/src/chachareader_async.rs | 163 +++++++++++ crates/pile-io/src/chachawriter.rs | 264 ++++++++++++++++++ crates/pile-io/src/chachawriter_async.rs | 95 +++++++ crates/pile-io/src/lib.rs | 17 ++ crates/pile-io/src/s3reader.rs | 88 ++++++ crates/pile-value/Cargo.toml | 3 + .../src/extract/item/epub/epub_meta.rs | 3 +- .../src/extract/item/epub/epub_text.rs | 3 +- crates/pile-value/src/extract/item/exif.rs | 3 +- crates/pile-value/src/extract/item/flac.rs | 3 +- crates/pile-value/src/extract/item/id3.rs | 3 +- crates/pile-value/src/extract/item/json.rs | 3 +- .../src/extract/item/pdf/pdf_meta.rs | 3 +- .../src/extract/item/pdf/pdf_pages.rs | 3 +- .../src/extract/item/pdf/pdf_text.rs | 3 +- crates/pile-value/src/extract/item/text.rs | 3 +- crates/pile-value/src/extract/item/toml.rs | 3 +- crates/pile-value/src/source/s3.rs | 58 +++- crates/pile-value/src/value/item.rs | 48 +++- crates/pile-value/src/value/readers.rs | 171 +----------- crates/pile/Cargo.toml | 2 + crates/pile/src/command/encrypt.rs | 74 +++++ crates/pile/src/command/mod.rs | 15 + crates/pile/src/command/upload.rs | 45 ++- 33 files changed, 1307 insertions(+), 202 deletions(-) create mode 100644 crates/pile-io/Cargo.toml create mode 100644 crates/pile-io/src/asyncreader.rs create mode 100644 crates/pile-io/src/chachareader.rs create mode 100644 crates/pile-io/src/chachareader_async.rs create mode 100644 crates/pile-io/src/chachawriter.rs create mode 100644 
crates/pile-io/src/chachawriter_async.rs create mode 100644 crates/pile-io/src/lib.rs create mode 100644 crates/pile-io/src/s3reader.rs create mode 100644 crates/pile/src/command/encrypt.rs diff --git a/Cargo.lock b/Cargo.lock index 28a5767..18086f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,6 +14,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "aead" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" +dependencies = [ + "crypto-common 0.1.7", + "generic-array", +] + [[package]] name = "aes" version = "0.8.4" @@ -123,6 +133,12 @@ dependencies = [ "rustversion", ] +[[package]] +name = "array-init" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" + [[package]] name = "arrayref" version = "0.3.9" @@ -580,6 +596,30 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" +[[package]] +name = "binrw" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53195f985e88ab94d1cc87e80049dd2929fd39e4a772c5ae96a7e5c4aad3642" +dependencies = [ + "array-init", + "binrw_derive", + "bytemuck", +] + +[[package]] +name = "binrw_derive" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5910da05ee556b789032c8ff5a61fb99239580aa3fd0bfaa8f4d094b2aee00ad" +dependencies = [ + "either", + "owo-colors", + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "bitflags" version = "2.11.0" @@ -734,6 +774,30 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "chacha20" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3613f74bd2eac03dad61bd53dbe620703d4371614fe0bc3b9f04dd36fe4e818" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + +[[package]] +name = "chacha20poly1305" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35" +dependencies = [ + "aead", + "chacha20", + "cipher", + "poly1305", + "zeroize", +] + [[package]] name = "chrono" version = "0.4.43" @@ -755,6 +819,7 @@ checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" dependencies = [ "crypto-common 0.1.7", "inout", + "zeroize", ] [[package]] @@ -1002,6 +1067,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" dependencies = [ "generic-array", + "rand_core", "typenum", ] @@ -2347,6 +2413,12 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "269bca4c2591a28585d6bf10d9ed0332b7d76900a1b02bec41bdc3a2cdcda107" +[[package]] +name = "opaque-debug" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" + [[package]] name = "openssl-probe" version = "0.2.1" @@ -2368,6 +2440,12 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "owo-colors" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d211803b9b6b570f68772237e415a029d5a50c65d382910b879fb19d3271f94d" + [[package]] name = "p256" version = "0.11.1" @@ -2487,10 +2565,12 @@ dependencies = [ "anyhow", "aws-sdk-s3", "axum", + "bytes", "clap", "indicatif", "pile-config", "pile-dataset", + "pile-io", "pile-toolbox", "pile-value", 
"serde", @@ -2519,6 +2599,7 @@ dependencies = [ "axum", "chrono", "pile-config", + "pile-io", "pile-toolbox", "pile-value", "serde", @@ -2547,6 +2628,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "pile-io" +version = "0.0.2" +dependencies = [ + "aws-sdk-s3", + "binrw", + "chacha20poly1305", + "smartstring", + "tokio", +] + [[package]] name = "pile-toolbox" version = "0.0.2" @@ -2562,7 +2654,9 @@ dependencies = [ "anyhow", "async-trait", "aws-sdk-s3", + "base64", "blake3", + "chacha20poly1305", "chrono", "epub", "id3", @@ -2574,6 +2668,7 @@ dependencies = [ "pdfium-render", "pile-config", "pile-flac", + "pile-io", "regex", "serde_json", "smartstring", @@ -2630,6 +2725,17 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "poly1305" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf" +dependencies = [ + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "portable-atomic" version = "1.13.0" @@ -3953,6 +4059,16 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81e544489bf3d8ef66c953931f56617f423cd4b5494be343d9b9d3dda037b9a3" +[[package]] +name = "universal-hash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" +dependencies = [ + "crypto-common 0.1.7", + "subtle", +] + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 70ba53b..880021f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ pile-config = { path = "crates/pile-config" } pile-flac = { path = "crates/pile-flac" } pile-dataset = { path = "crates/pile-dataset" } pile-value = { path = "crates/pile-value" } +pile-io = { path = "crates/pile-io" } # Clients & servers tantivy = "0.25.0" @@ -103,10 +104,13 @@ clap = { version = "4.5.60", features = 
["derive"] } serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" base64 = "0.22.1" +bytes = "1" toml = "1.0.3" toml_edit = "0.25.4" sha2 = "0.11.0-rc.5" blake3 = "1.8.3" +chacha20poly1305 = "0.10.0" +binrw = "0.15.1" # Extractors pdf = "0.10.0" diff --git a/crates/pile-config/src/lib.rs b/crates/pile-config/src/lib.rs index 2e0a9e4..e07eb8d 100644 --- a/crates/pile-config/src/lib.rs +++ b/crates/pile-config/src/lib.rs @@ -84,6 +84,9 @@ pub enum Source { /// How to group files into items in this source #[serde(default)] pattern: GroupPattern, + + /// If provided, assume objects are encrypted with this secret key. + encryption_key: Option, }, } diff --git a/crates/pile-dataset/Cargo.toml b/crates/pile-dataset/Cargo.toml index bf9e532..6e5914c 100644 --- a/crates/pile-dataset/Cargo.toml +++ b/crates/pile-dataset/Cargo.toml @@ -11,6 +11,7 @@ workspace = true pile-config = { workspace = true } pile-toolbox = { workspace = true } pile-value = { workspace = true } +pile-io = { workspace = true } serde_json = { workspace = true } tantivy = { workspace = true } diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index ccdb38d..e54720f 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -5,7 +5,7 @@ use pile_config::{ use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::{ extract::traits::ExtractState, - source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest}, + source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest, string_to_key}, value::{Item, PileValue}, }; use serde_json::Value; @@ -135,11 +135,14 @@ impl Datasets { region, credentials, pattern, + encryption_key, } => { if !enabled { continue; } + let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x)); + match S3DataSource::new( label, bucket.clone(), @@ -148,6 +151,7 @@ impl Datasets { region.clone(), credentials, pattern.clone(), + 
encryption_key, ) .await { @@ -236,11 +240,14 @@ impl Datasets { region, credentials, pattern, + encryption_key, } => { if !enabled { continue; } + let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x)); + match S3DataSource::new( label, bucket.clone(), @@ -249,6 +256,7 @@ impl Datasets { region.clone(), credentials, pattern.clone(), + encryption_key, ) .await { @@ -349,7 +357,7 @@ impl Datasets { index_writer.add_document(doc).map_err(DatasetError::from)?; total += 1; if logged_at.elapsed().as_secs() >= 5 { - debug!("Indexed {total} documents so far"); + debug!("Indexed {total} documents"); logged_at = Instant::now(); } } diff --git a/crates/pile-dataset/src/serve/item.rs b/crates/pile-dataset/src/serve/item.rs index df50488..1505dfd 100644 --- a/crates/pile-dataset/src/serve/item.rs +++ b/crates/pile-dataset/src/serve/item.rs @@ -5,7 +5,7 @@ use axum::{ response::{IntoResponse, Response}, }; use pile_config::Label; -use pile_value::value::{AsyncReader, AsyncSeekReader}; +use pile_io::{AsyncReader, AsyncSeekReader}; use serde::Deserialize; use std::{io::SeekFrom, sync::Arc, time::Instant}; use tokio::sync::mpsc; diff --git a/crates/pile-io/Cargo.toml b/crates/pile-io/Cargo.toml new file mode 100644 index 0000000..e1160e9 --- /dev/null +++ b/crates/pile-io/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "pile-io" +version = { workspace = true } +rust-version = { workspace = true } +edition = { workspace = true } + +[lints] +workspace = true + +[dependencies] +tokio = { workspace = true } +smartstring = { workspace = true } +aws-sdk-s3 = { workspace = true } +chacha20poly1305 = { workspace = true } +binrw = { workspace = true } diff --git a/crates/pile-io/src/asyncreader.rs b/crates/pile-io/src/asyncreader.rs new file mode 100644 index 0000000..de92a3c --- /dev/null +++ b/crates/pile-io/src/asyncreader.rs @@ -0,0 +1,75 @@ +use std::io::{Read, Seek, SeekFrom}; +use tokio::runtime::Handle; + +// +// MARK: asyncreader +// + +/// An `async` equivalent of 
[std::io::Read]. +pub trait AsyncReader: Send { + /// Read a chunk of bytes. + fn read( + &mut self, + buf: &mut [u8], + ) -> impl Future> + Send; + + /// Read all remaining bytes into a `Vec`. + fn read_to_end(&mut self) -> impl Future, std::io::Error>> + Send { + async { + let mut buf = Vec::new(); + let mut chunk = vec![0u8; 65536]; + loop { + let n = self.read(&mut chunk).await?; + if n == 0 { + break; + } + buf.extend_from_slice(&chunk[..n]); + } + Ok(buf) + } + } +} + +/// An `async` equivalent of [std::io::Read] + [std::io::Seek]. +pub trait AsyncSeekReader: AsyncReader { + fn seek(&mut self, pos: SeekFrom) -> impl Future> + Send; +} + +// +// MARK: sync bridge +// + +/// Turn an async [Reader] into a sync [Read] + [Seek]. +/// +/// Never use this outside of [tokio::task::spawn_blocking], +/// the async runtime will deadlock if this struct blocks +/// the runtime. +pub struct SyncReadBridge { + inner: R, + handle: Handle, +} + +impl SyncReadBridge { + /// Creates a new adapter using a handle to the current runtime. + /// Panics if called outside of a tokio context. + pub fn new_current(inner: R) -> Self { + Self::new(inner, Handle::current()) + } + + /// Creates a new adapter using a handle to an existing runtime. 
+ pub fn new(inner: R, handle: Handle) -> Self { + Self { inner, handle } + } +} + +impl Read for SyncReadBridge { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.handle.block_on(self.inner.read(buf)) + } +} + +impl Seek for SyncReadBridge { + fn seek(&mut self, pos: SeekFrom) -> Result { + self.handle.block_on(self.inner.seek(pos)) + } +} diff --git a/crates/pile-io/src/chachareader.rs b/crates/pile-io/src/chachareader.rs new file mode 100644 index 0000000..121626d --- /dev/null +++ b/crates/pile-io/src/chachareader.rs @@ -0,0 +1,205 @@ +use std::io::{Read, Seek, SeekFrom}; + +use binrw::binrw; + +use crate::{AsyncReader, AsyncSeekReader}; + +// +// MARK: header +// + +/// Serialized size of [`ChaChaHeader`] in bytes: 12 magic + 3×8 config + 8 plaintext_size. +pub const HEADER_SIZE: usize = 44; + +#[binrw] +#[brw(little, magic = b"PileChaChav1")] +#[derive(Debug, Clone, Copy)] +pub struct ChaChaHeader { + pub chunk_size: u64, + pub nonce_size: u64, + pub tag_size: u64, + pub plaintext_size: u64, +} + +// +// MARK: config +// + +#[derive(Debug, Clone, Copy)] +pub struct ChaChaReaderConfig { + pub chunk_size: u64, + pub nonce_size: u64, + pub tag_size: u64, +} + +impl Default for ChaChaReaderConfig { + fn default() -> Self { + Self { + chunk_size: 1_048_576, // 1MiB + nonce_size: 24, + tag_size: 16, + } + } +} + +impl ChaChaReaderConfig { + pub(crate) fn enc_chunk_size(&self) -> u64 { + self.chunk_size + self.nonce_size + self.tag_size + } +} + +impl From for ChaChaReaderConfig { + fn from(h: ChaChaHeader) -> Self { + Self { + chunk_size: h.chunk_size, + nonce_size: h.nonce_size, + tag_size: h.tag_size, + } + } +} + +// +// MARK: reader +// + +pub struct ChaChaReader { + inner: R, + config: ChaChaReaderConfig, + data_offset: u64, + encryption_key: [u8; 32], + cursor: u64, + plaintext_size: u64, + cached_chunk: Option<(u64, Vec)>, +} + +impl ChaChaReader { + pub fn new(mut inner: R, encryption_key: [u8; 32]) -> Result { + use binrw::BinReaderExt; + + 
inner.seek(SeekFrom::Start(0))?; + let header: ChaChaHeader = inner.read_le().map_err(std::io::Error::other)?; + let data_offset = inner.stream_position()?; + + Ok(Self { + inner, + config: header.into(), + data_offset, + encryption_key, + cursor: 0, + plaintext_size: header.plaintext_size, + cached_chunk: None, + }) + } + + fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> { + use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead}; + + let enc_start = self.data_offset + chunk_index * self.config.enc_chunk_size(); + self.inner.seek(SeekFrom::Start(enc_start))?; + + let mut encrypted = vec![0u8; self.config.enc_chunk_size() as usize]; + let n = self.read_exact_or_eof(&mut encrypted)?; + encrypted.truncate(n); + + if encrypted.len() < (self.config.nonce_size + self.config.tag_size) as usize { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "encrypted chunk too short", + )); + } + + let (nonce_bytes, ciphertext) = encrypted.split_at(self.config.nonce_size as usize); + let nonce = XNonce::from_slice(nonce_bytes); + let key = chacha20poly1305::Key::from_slice(&self.encryption_key); + let cipher = XChaCha20Poly1305::new(key); + let plaintext = cipher.decrypt(nonce, ciphertext).map_err(|_| { + std::io::Error::new(std::io::ErrorKind::InvalidData, "decryption failed") + })?; + + self.cached_chunk = Some((chunk_index, plaintext)); + Ok(()) + } + + fn read_exact_or_eof(&mut self, buf: &mut [u8]) -> Result { + let mut total = 0; + while total < buf.len() { + match self.inner.read(&mut buf[total..])? 
{ + 0 => break, + n => total += n, + } + } + Ok(total) + } +} + +impl AsyncReader for ChaChaReader { + async fn read(&mut self, buf: &mut [u8]) -> Result { + let remaining = self.plaintext_size.saturating_sub(self.cursor); + if remaining == 0 || buf.is_empty() { + return Ok(0); + } + + let chunk_index = self.cursor / self.config.chunk_size; + + let need_fetch = match &self.cached_chunk { + None => true, + Some((idx, _)) => *idx != chunk_index, + }; + + if need_fetch { + self.fetch_chunk(chunk_index)?; + } + + #[expect(clippy::unwrap_used)] + let (_, chunk_data) = self.cached_chunk.as_ref().unwrap(); + + let offset_in_chunk = (self.cursor % self.config.chunk_size) as usize; + let available = chunk_data.len() - offset_in_chunk; + let to_copy = available.min(buf.len()); + + buf[..to_copy].copy_from_slice(&chunk_data[offset_in_chunk..offset_in_chunk + to_copy]); + self.cursor += to_copy as u64; + Ok(to_copy) + } +} + +impl AsyncSeekReader for ChaChaReader { + async fn seek(&mut self, pos: SeekFrom) -> Result { + match pos { + SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size), + + SeekFrom::Current(x) => { + if x < 0 { + let abs = x.unsigned_abs(); + if abs > self.cursor { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "cannot seek past start", + )); + } + self.cursor -= abs; + } else { + self.cursor += x as u64; + } + } + + SeekFrom::End(x) => { + if x < 0 { + let abs = x.unsigned_abs(); + if abs > self.plaintext_size { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "cannot seek past start", + )); + } + self.cursor = self.plaintext_size - abs; + } else { + self.cursor = self.plaintext_size + x as u64; + } + } + } + + self.cursor = self.cursor.min(self.plaintext_size); + Ok(self.cursor) + } +} diff --git a/crates/pile-io/src/chachareader_async.rs b/crates/pile-io/src/chachareader_async.rs new file mode 100644 index 0000000..4c1c206 --- /dev/null +++ b/crates/pile-io/src/chachareader_async.rs @@ -0,0 +1,163 
@@ +use std::io::SeekFrom; + +use crate::{AsyncReader, AsyncSeekReader, ChaChaHeader, ChaChaReaderConfig, HEADER_SIZE}; + +pub struct ChaChaReaderAsync { + inner: R, + config: ChaChaReaderConfig, + data_offset: u64, + encryption_key: [u8; 32], + cursor: u64, + plaintext_size: u64, + cached_chunk: Option<(u64, Vec)>, +} + +impl ChaChaReaderAsync { + pub async fn new(mut inner: R, encryption_key: [u8; 32]) -> Result { + use binrw::BinReaderExt; + use std::io::Cursor; + + inner.seek(SeekFrom::Start(0)).await?; + let mut buf = [0u8; HEADER_SIZE]; + read_exact(&mut inner, &mut buf).await?; + let header: ChaChaHeader = Cursor::new(&buf[..]) + .read_le() + .map_err(std::io::Error::other)?; + + Ok(Self { + inner, + config: header.into(), + data_offset: HEADER_SIZE as u64, + encryption_key, + cursor: 0, + plaintext_size: header.plaintext_size, + cached_chunk: None, + }) + } + + async fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> { + use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead}; + + let enc_start = self.data_offset + chunk_index * self.config.enc_chunk_size(); + self.inner.seek(SeekFrom::Start(enc_start)).await?; + + let mut encrypted = vec![0u8; self.config.enc_chunk_size() as usize]; + let n = read_exact_or_eof(&mut self.inner, &mut encrypted).await?; + encrypted.truncate(n); + + if encrypted.len() < (self.config.nonce_size + self.config.tag_size) as usize { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "encrypted chunk too short", + )); + } + + let (nonce_bytes, ciphertext) = encrypted.split_at(self.config.nonce_size as usize); + let nonce = XNonce::from_slice(nonce_bytes); + let key = chacha20poly1305::Key::from_slice(&self.encryption_key); + let cipher = XChaCha20Poly1305::new(key); + let plaintext = cipher.decrypt(nonce, ciphertext).map_err(|_| { + std::io::Error::new(std::io::ErrorKind::InvalidData, "decryption failed") + })?; + + self.cached_chunk = Some((chunk_index, plaintext)); + Ok(()) 
+ } +} + +async fn read_exact(inner: &mut R, buf: &mut [u8]) -> Result<(), std::io::Error> { + let n = read_exact_or_eof(inner, buf).await?; + if n < buf.len() { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "unexpected EOF reading header", + )); + } + Ok(()) +} + +async fn read_exact_or_eof( + inner: &mut R, + buf: &mut [u8], +) -> Result { + let mut total = 0; + while total < buf.len() { + match inner.read(&mut buf[total..]).await? { + 0 => break, + n => total += n, + } + } + Ok(total) +} + +impl AsyncReader for ChaChaReaderAsync { + async fn read(&mut self, buf: &mut [u8]) -> Result { + let remaining = self.plaintext_size.saturating_sub(self.cursor); + if remaining == 0 || buf.is_empty() { + return Ok(0); + } + + let chunk_index = self.cursor / self.config.chunk_size; + + let need_fetch = match &self.cached_chunk { + None => true, + Some((idx, _)) => *idx != chunk_index, + }; + + if need_fetch { + self.fetch_chunk(chunk_index).await?; + } + + #[expect(clippy::unwrap_used)] + let (_, chunk_data) = self.cached_chunk.as_ref().unwrap(); + + let offset_in_chunk = (self.cursor % self.config.chunk_size) as usize; + let available = chunk_data.len() - offset_in_chunk; + let to_copy = available.min(buf.len()); + + buf[..to_copy].copy_from_slice(&chunk_data[offset_in_chunk..offset_in_chunk + to_copy]); + self.cursor += to_copy as u64; + Ok(to_copy) + } +} + +impl AsyncSeekReader for ChaChaReaderAsync { + async fn seek(&mut self, pos: SeekFrom) -> Result { + match pos { + SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size), + + SeekFrom::Current(x) => { + if x < 0 { + let abs = x.unsigned_abs(); + if abs > self.cursor { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "cannot seek past start", + )); + } + self.cursor -= abs; + } else { + self.cursor += x as u64; + } + } + + SeekFrom::End(x) => { + if x < 0 { + let abs = x.unsigned_abs(); + if abs > self.plaintext_size { + return Err(std::io::Error::new( + 
std::io::ErrorKind::InvalidInput, + "cannot seek past start", + )); + } + self.cursor = self.plaintext_size - abs; + } else { + self.cursor = self.plaintext_size + x as u64; + } + } + } + + self.cursor = self.cursor.min(self.plaintext_size); + Ok(self.cursor) + } +} diff --git a/crates/pile-io/src/chachawriter.rs b/crates/pile-io/src/chachawriter.rs new file mode 100644 index 0000000..8b30e4e --- /dev/null +++ b/crates/pile-io/src/chachawriter.rs @@ -0,0 +1,264 @@ +use std::io::{Seek, SeekFrom, Write}; + +use crate::{ChaChaHeader, ChaChaReaderConfig}; + +/// Generate a random 32-byte encryption key suitable for use with [`ChaChaWriter`]. +pub fn generate_key() -> [u8; 32] { + use chacha20poly1305::aead::OsRng; + use chacha20poly1305::{KeyInit, XChaCha20Poly1305}; + XChaCha20Poly1305::generate_key(&mut OsRng).into() +} + +pub struct ChaChaWriter { + inner: W, + config: ChaChaReaderConfig, + encryption_key: [u8; 32], + buffer: Vec, + plaintext_bytes_written: u64, +} + +impl ChaChaWriter { + pub fn new(mut inner: W, encryption_key: [u8; 32]) -> Result { + use binrw::BinWriterExt; + + let config = ChaChaReaderConfig::default(); + let header = ChaChaHeader { + chunk_size: config.chunk_size, + nonce_size: config.nonce_size, + tag_size: config.tag_size, + plaintext_size: 0, + }; + inner.write_le(&header).map_err(std::io::Error::other)?; + + Ok(Self { + inner, + config, + encryption_key, + buffer: Vec::new(), + plaintext_bytes_written: 0, + }) + } + + /// Encrypt and write any buffered plaintext, patch the header with the + /// final `plaintext_size`, then return the inner writer. 
+ pub fn finish(mut self) -> Result { + use binrw::BinWriterExt; + + self.flush_buffer()?; + + self.inner.seek(SeekFrom::Start(0))?; + let header = ChaChaHeader { + chunk_size: self.config.chunk_size, + nonce_size: self.config.nonce_size, + tag_size: self.config.tag_size, + plaintext_size: self.plaintext_bytes_written, + }; + self.inner + .write_le(&header) + .map_err(std::io::Error::other)?; + + Ok(self.inner) + } + + fn encrypt_chunk(&self, plaintext: &[u8]) -> Result, std::io::Error> { + use chacha20poly1305::{ + XChaCha20Poly1305, + aead::{Aead, AeadCore, KeyInit, OsRng}, + }; + + let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng); + let key = chacha20poly1305::Key::from_slice(&self.encryption_key); + let cipher = XChaCha20Poly1305::new(key); + let ciphertext = cipher + .encrypt(&nonce, plaintext) + .map_err(|_| std::io::Error::other("encryption failed"))?; + + let mut output = Vec::with_capacity(nonce.len() + ciphertext.len()); + output.extend_from_slice(&nonce); + output.extend_from_slice(&ciphertext); + Ok(output) + } + + fn flush_buffer(&mut self) -> Result<(), std::io::Error> { + if !self.buffer.is_empty() { + let encrypted = self.encrypt_chunk(&self.buffer)?; + self.inner.write_all(&encrypted)?; + self.buffer.clear(); + } + Ok(()) + } +} + +impl Write for ChaChaWriter { + fn write(&mut self, buf: &[u8]) -> Result { + self.buffer.extend_from_slice(buf); + self.plaintext_bytes_written += buf.len() as u64; + + let chunk_size = self.config.chunk_size as usize; + while self.buffer.len() >= chunk_size { + let encrypted = self.encrypt_chunk(&self.buffer[..chunk_size])?; + self.inner.write_all(&encrypted)?; + self.buffer.drain(..chunk_size); + } + + Ok(buf.len()) + } + + /// Encrypts and flushes any buffered plaintext as a partial chunk. + /// + /// Prefer [`finish`](Self::finish) to retrieve the inner writer after + /// all data has been written. Calling `flush` multiple times will produce + /// multiple small encrypted chunks for the same partial data. 
+ fn flush(&mut self) -> Result<(), std::io::Error> { + self.flush_buffer()?; + self.inner.flush() + } +} + +#[cfg(test)] +#[expect(clippy::unwrap_used)] +mod tests { + use std::io::{Cursor, SeekFrom, Write}; + + use super::ChaChaWriter; + use crate::{AsyncReader, AsyncSeekReader, ChaChaReader}; + + const KEY: [u8; 32] = [42u8; 32]; + + fn encrypt(data: &[u8]) -> Cursor> { + let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), KEY).unwrap(); + writer.write_all(data).unwrap(); + let mut buf = writer.finish().unwrap(); + buf.set_position(0); + buf + } + + async fn decrypt_all(buf: Cursor>) -> Vec { + let mut reader = ChaChaReader::new(buf, KEY).unwrap(); + reader.read_to_end().await.unwrap() + } + + #[tokio::test] + async fn roundtrip_empty() { + let buf = encrypt(&[]); + // Header present but no chunks + assert!(!buf.get_ref().is_empty()); + assert!(decrypt_all(buf).await.is_empty()); + } + + #[tokio::test] + async fn roundtrip_small() { + let data = b"hello, world!"; + assert_eq!(decrypt_all(encrypt(data)).await, data); + } + + #[tokio::test] + async fn roundtrip_exact_chunk() { + let data = vec![0xABu8; 65536]; + assert_eq!(decrypt_all(encrypt(&data)).await, data); + } + + #[tokio::test] + async fn roundtrip_multi_chunk() { + // 2.5 chunks + let data: Vec = (0u8..=255).cycle().take(65536 * 2 + 1000).collect(); + assert_eq!(decrypt_all(encrypt(&data)).await, data); + } + + #[tokio::test] + async fn roundtrip_incremental_writes() { + // Write one byte at a time + let data: Vec = (0u8..200).collect(); + let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), KEY).unwrap(); + for byte in &data { + writer.write_all(&[*byte]).unwrap(); + } + let mut buf = writer.finish().unwrap(); + buf.set_position(0); + assert_eq!(decrypt_all(buf).await, data); + } + + #[tokio::test] + async fn wrong_key_fails() { + let buf = encrypt(b"secret data"); + let mut reader = ChaChaReader::new(buf, [0u8; 32]).unwrap(); + assert!(reader.read_to_end().await.is_err()); + } + + 
#[tokio::test] + async fn header_magic_checked() { + // Corrupt the magic bytes — reader should fail + let mut buf = encrypt(b"data"); + buf.get_mut()[0] = 0xFF; + buf.set_position(0); + assert!(ChaChaReader::new(buf, KEY).is_err()); + } + + #[tokio::test] + async fn seek_from_start() { + let data: Vec = (0u8..100).collect(); + let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap(); + + reader.seek(SeekFrom::Start(50)).await.unwrap(); + let mut buf = [0u8; 10]; + let mut read = 0; + while read < buf.len() { + read += reader.read(&mut buf[read..]).await.unwrap(); + } + assert_eq!(buf, data[50..60]); + } + + #[tokio::test] + async fn seek_from_end() { + let data: Vec = (0u8..100).collect(); + let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap(); + + reader.seek(SeekFrom::End(-10)).await.unwrap(); + assert_eq!(reader.read_to_end().await.unwrap(), &data[90..]); + } + + #[tokio::test] + async fn seek_across_chunk_boundary() { + // Seek to 6 bytes before the end of chunk 0, read 12 bytes spanning into chunk 1 + let data: Vec = (0u8..=255).cycle().take(65536 + 500).collect(); + let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap(); + + reader.seek(SeekFrom::Start(65530)).await.unwrap(); + let mut buf = vec![0u8; 12]; + let mut read = 0; + while read < buf.len() { + read += reader.read(&mut buf[read..]).await.unwrap(); + } + assert_eq!(buf, data[65530..65542]); + } + + #[tokio::test] + async fn seek_current() { + let data: Vec = (0u8..=255).cycle().take(200).collect(); + let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap(); + + // Read 10, seek back 5, read 5 — should get bytes 5..10 + let mut first = [0u8; 10]; + let mut n = 0; + while n < first.len() { + n += reader.read(&mut first[n..]).await.unwrap(); + } + reader.seek(SeekFrom::Current(-5)).await.unwrap(); + let mut second = [0u8; 5]; + n = 0; + while n < second.len() { + n += reader.read(&mut second[n..]).await.unwrap(); + } + assert_eq!(second, data[5..10]); + } + + 
#[tokio::test]
async fn seek_past_end_clamps() {
    let data = b"hello";
    let mut reader = ChaChaReader::new(encrypt(data), KEY).unwrap();

    // Seeking past EOF clamps the cursor to the plaintext length…
    let pos = reader.seek(SeekFrom::Start(9999)).await.unwrap();
    assert_eq!(pos, data.len() as u64);
    // …so a subsequent read sees no bytes.
    assert_eq!(reader.read_to_end().await.unwrap(), b"");
}
}

// ---------------------------------------------------------------------------
// file: crates/pile-io/src/chachawriter_async.rs
// ---------------------------------------------------------------------------

use std::io::SeekFrom;

use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};

use crate::{ChaChaHeader, ChaChaReaderConfig};

/// Asynchronous, chunked XChaCha20-Poly1305 encryptor.
///
/// Plaintext passed to [`Self::write`] is buffered and encrypted in
/// `chunk_size` pieces; each encrypted chunk is written as
/// `nonce || ciphertext+tag` after a fixed-size header. Call
/// [`Self::finish`] to flush the final partial chunk and patch the real
/// plaintext size into the header.
///
/// NOTE(review): each chunk gets an independent random nonce, but nothing
/// binds a chunk to its index, so an attacker who can rewrite the stored
/// ciphertext could reorder or duplicate whole chunks undetected; only the
/// `plaintext_size` header field constrains truncation. Confirm the reader
/// rejects length mismatches.
pub struct ChaChaWriterAsync<W> {
    inner: W,
    config: ChaChaReaderConfig,
    encryption_key: [u8; 32],
    // Plaintext awaiting encryption; always shorter than one chunk after a
    // call to `write` returns.
    buffer: Vec<u8>,
    // Total plaintext accepted so far; written into the header by `finish`.
    plaintext_bytes_written: u64,
}

// NOTE(review): the generic bounds below were reconstructed — the original
// `<…>` parameters were stripped in transit; confirm against the repository.
impl<W: AsyncWrite + AsyncSeek + Unpin> ChaChaWriterAsync<W> {
    /// Write a placeholder header (`plaintext_size = 0`) to `inner` and
    /// return a writer. The real size is patched in by [`Self::finish`].
    pub async fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
        let config = ChaChaReaderConfig::default();
        let header_bytes = serialize_header(ChaChaHeader {
            chunk_size: config.chunk_size,
            nonce_size: config.nonce_size,
            tag_size: config.tag_size,
            plaintext_size: 0,
        })?;
        inner.write_all(&header_bytes).await?;

        Ok(Self {
            inner,
            config,
            encryption_key,
            buffer: Vec::new(),
            plaintext_bytes_written: 0,
        })
    }

    /// Buffer `buf`, then encrypt and write every complete chunk it fills.
    pub async fn write(&mut self, buf: &[u8]) -> Result<(), std::io::Error> {
        self.buffer.extend_from_slice(buf);
        self.plaintext_bytes_written += buf.len() as u64;

        let chunk_size = self.config.chunk_size as usize;
        while self.buffer.len() >= chunk_size {
            let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer[..chunk_size])?;
            self.inner.write_all(&encrypted).await?;
            self.buffer.drain(..chunk_size);
        }

        Ok(())
    }

    /// Encrypt and write any buffered plaintext, patch the header with the
    /// final `plaintext_size`, then return the inner writer positioned at
    /// the end of the ciphertext.
    pub async fn finish(mut self) -> Result<W, std::io::Error> {
        if !self.buffer.is_empty() {
            let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer)?;
            self.inner.write_all(&encrypted).await?;
        }

        // Rewrite the header now that the true plaintext size is known.
        self.inner.seek(SeekFrom::Start(0)).await?;
        let header_bytes = serialize_header(ChaChaHeader {
            chunk_size: self.config.chunk_size,
            nonce_size: self.config.nonce_size,
            tag_size: self.config.tag_size,
            plaintext_size: self.plaintext_bytes_written,
        })?;
        self.inner.write_all(&header_bytes).await?;

        // Fix: restore the cursor to the end of the stream. The previous
        // version returned the writer positioned just past the header, so
        // any caller that appended to (or measured) the returned writer
        // would silently corrupt the ciphertext.
        self.inner.seek(SeekFrom::End(0)).await?;

        Ok(self.inner)
    }
}

/// Encrypt one chunk with a fresh random nonce; returns `nonce || ciphertext`.
fn encrypt_chunk(key: &[u8; 32], plaintext: &[u8]) -> Result<Vec<u8>, std::io::Error> {
    use chacha20poly1305::{
        XChaCha20Poly1305,
        aead::{Aead, AeadCore, KeyInit, OsRng},
    };

    let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng);
    let cipher = XChaCha20Poly1305::new(chacha20poly1305::Key::from_slice(key));
    let ciphertext = cipher
        .encrypt(&nonce, plaintext)
        .map_err(|_| std::io::Error::other("encryption failed"))?;

    let mut output = Vec::with_capacity(nonce.len() + ciphertext.len());
    output.extend_from_slice(&nonce);
    output.extend_from_slice(&ciphertext);
    Ok(output)
}

/// Serialize the header with `binrw` (little-endian), matching the layout
/// the readers parse.
fn serialize_header(header: ChaChaHeader) -> Result<Vec<u8>, std::io::Error> {
    use binrw::BinWriterExt;
    use std::io::Cursor;

    let mut buf = Cursor::new(Vec::new());
    buf.write_le(&header).map_err(std::io::Error::other)?;
    Ok(buf.into_inner())
}

// ---------------------------------------------------------------------------
// file: crates/pile-io/src/lib.rs
// ---------------------------------------------------------------------------

mod asyncreader;
pub use asyncreader::*;

mod s3reader;
pub use s3reader::*;

mod chachareader;
pub use chachareader::*;

mod chachawriter;
pub use chachawriter::*;

mod chachareader_async;
pub use chachareader_async::*;

mod chachawriter_async;
pub use chachawriter_async::*;

// (patch continues: crates/pile-io/src/s3reader.rs)
b/crates/pile-io/src/s3reader.rs @@ -0,0 +1,88 @@ +use smartstring::{LazyCompact, SmartString}; +use std::{io::SeekFrom, sync::Arc}; + +use crate::{AsyncReader, AsyncSeekReader}; + +pub struct S3Reader { + pub client: Arc, + pub bucket: SmartString, + pub key: SmartString, + pub cursor: u64, + pub size: u64, +} + +impl AsyncReader for S3Reader { + async fn read(&mut self, buf: &mut [u8]) -> Result { + let len_left = self.size.saturating_sub(self.cursor); + if len_left == 0 || buf.is_empty() { + return Ok(0); + } + + let start_byte = self.cursor; + let len_to_read = (buf.len() as u64).min(len_left); + let end_byte = start_byte + len_to_read - 1; + + let resp = self + .client + .get_object() + .bucket(self.bucket.as_str()) + .key(self.key.as_str()) + .range(format!("bytes={start_byte}-{end_byte}")) + .send() + .await + .map_err(std::io::Error::other)?; + + let bytes = resp + .body + .collect() + .await + .map(|x| x.into_bytes()) + .map_err(std::io::Error::other)?; + + let n = bytes.len().min(buf.len()); + buf[..n].copy_from_slice(&bytes[..n]); + self.cursor += n as u64; + Ok(n) + } +} + +impl AsyncSeekReader for S3Reader { + async fn seek(&mut self, pos: SeekFrom) -> Result { + match pos { + SeekFrom::Start(x) => self.cursor = x.min(self.size), + + SeekFrom::Current(x) => { + if x < 0 { + let abs = x.unsigned_abs(); + if abs > self.cursor { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "cannot seek past start", + )); + } + self.cursor -= abs; + } else { + self.cursor += x as u64; + } + } + + std::io::SeekFrom::End(x) => { + if x < 0 { + let abs = x.unsigned_abs(); + if abs > self.size { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "cannot seek past start", + )); + } + self.cursor = self.size - abs; + } else { + self.cursor = self.size + x as u64; + } + } + } + + self.cursor = self.cursor.min(self.size); + Ok(self.cursor) + } +} diff --git a/crates/pile-value/Cargo.toml b/crates/pile-value/Cargo.toml index 
6f96cd7..25fe015 100644 --- a/crates/pile-value/Cargo.toml +++ b/crates/pile-value/Cargo.toml @@ -8,6 +8,7 @@ edition = { workspace = true } workspace = true [dependencies] +pile-io = { workspace = true } pile-config = { workspace = true } pile-flac = { workspace = true } @@ -20,6 +21,8 @@ toml = { workspace = true } smartstring = { workspace = true } regex = { workspace = true } blake3 = { workspace = true } +chacha20poly1305 = { workspace = true } +base64 = { workspace = true } epub = { workspace = true } kamadak-exif = { workspace = true } pdf = { workspace = true } diff --git a/crates/pile-value/src/extract/item/epub/epub_meta.rs b/crates/pile-value/src/extract/item/epub/epub_meta.rs index 439fc4a..a8d1887 100644 --- a/crates/pile-value/src/extract/item/epub/epub_meta.rs +++ b/crates/pile-value/src/extract/item/epub/epub_meta.rs @@ -1,5 +1,6 @@ use epub::doc::EpubDoc; use pile_config::Label; +use pile_io::SyncReadBridge; use std::{ collections::HashMap, sync::{Arc, OnceLock}, @@ -8,7 +9,7 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue, SyncReadBridge}, + value::{Item, PileValue}, }; pub struct EpubMetaExtractor { diff --git a/crates/pile-value/src/extract/item/epub/epub_text.rs b/crates/pile-value/src/extract/item/epub/epub_text.rs index 3e9c768..91292bb 100644 --- a/crates/pile-value/src/extract/item/epub/epub_text.rs +++ b/crates/pile-value/src/extract/item/epub/epub_text.rs @@ -1,5 +1,6 @@ use epub::doc::EpubDoc; use pile_config::Label; +use pile_io::SyncReadBridge; use std::{ collections::HashMap, sync::{Arc, OnceLock}, @@ -8,7 +9,7 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue, SyncReadBridge}, + value::{Item, PileValue}, }; pub struct EpubTextExtractor { diff --git a/crates/pile-value/src/extract/item/exif.rs b/crates/pile-value/src/extract/item/exif.rs index 78ef3ff..5c95487 100644 --- 
a/crates/pile-value/src/extract/item/exif.rs +++ b/crates/pile-value/src/extract/item/exif.rs @@ -1,4 +1,5 @@ use pile_config::Label; +use pile_io::SyncReadBridge; use std::{ collections::HashMap, io::BufReader, @@ -8,7 +9,7 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue, SyncReadBridge}, + value::{Item, PileValue}, }; pub struct ExifExtractor { diff --git a/crates/pile-value/src/extract/item/flac.rs b/crates/pile-value/src/extract/item/flac.rs index 30fcbb1..21845f9 100644 --- a/crates/pile-value/src/extract/item/flac.rs +++ b/crates/pile-value/src/extract/item/flac.rs @@ -1,6 +1,7 @@ use mime::Mime; use pile_config::Label; use pile_flac::{FlacBlock, FlacDecodeError, FlacReader}; +use pile_io::SyncReadBridge; use std::{ collections::HashMap, io::BufReader, @@ -10,7 +11,7 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ListExtractor, ObjectExtractor}, - value::{Item, PileValue, SyncReadBridge}, + value::{Item, PileValue}, }; pub struct FlacImagesExtractor { diff --git a/crates/pile-value/src/extract/item/id3.rs b/crates/pile-value/src/extract/item/id3.rs index 7de35e6..653ba06 100644 --- a/crates/pile-value/src/extract/item/id3.rs +++ b/crates/pile-value/src/extract/item/id3.rs @@ -1,5 +1,6 @@ use id3::Tag; use pile_config::Label; +use pile_io::SyncReadBridge; use std::{ borrow::Cow, collections::HashMap, @@ -10,7 +11,7 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue, SyncReadBridge}, + value::{Item, PileValue}, }; pub struct Id3Extractor { diff --git a/crates/pile-value/src/extract/item/json.rs b/crates/pile-value/src/extract/item/json.rs index 2485e1e..a9898ba 100644 --- a/crates/pile-value/src/extract/item/json.rs +++ b/crates/pile-value/src/extract/item/json.rs @@ -1,4 +1,5 @@ use pile_config::Label; +use pile_io::AsyncReader; use std::{ collections::HashMap, sync::{Arc, OnceLock}, @@ -6,7 +7,7 @@ use std::{ use 
crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{AsyncReader, Item, PileValue}, + value::{Item, PileValue}, }; fn json_to_pile(value: serde_json::Value) -> PileValue { diff --git a/crates/pile-value/src/extract/item/pdf/pdf_meta.rs b/crates/pile-value/src/extract/item/pdf/pdf_meta.rs index 043fa77..221033a 100644 --- a/crates/pile-value/src/extract/item/pdf/pdf_meta.rs +++ b/crates/pile-value/src/extract/item/pdf/pdf_meta.rs @@ -1,6 +1,7 @@ use pdf::file::FileOptions; use pdf::primitive::{Date, TimeRel}; use pile_config::Label; +use pile_io::SyncReadBridge; use std::{ collections::HashMap, io::BufReader, @@ -10,7 +11,7 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{Item, PileValue, SyncReadBridge}, + value::{Item, PileValue}, }; pub struct PdfMetaExtractor { diff --git a/crates/pile-value/src/extract/item/pdf/pdf_pages.rs b/crates/pile-value/src/extract/item/pdf/pdf_pages.rs index f443bf5..6c9101e 100644 --- a/crates/pile-value/src/extract/item/pdf/pdf_pages.rs +++ b/crates/pile-value/src/extract/item/pdf/pdf_pages.rs @@ -1,5 +1,6 @@ use image::ImageFormat; use pdfium_render::prelude::*; +use pile_io::SyncReadBridge; use std::{ io::{BufReader, Cursor}, sync::Arc, @@ -8,7 +9,7 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, ListExtractor}, - value::{Item, PileValue, SyncReadBridge}, + value::{Item, PileValue}, }; pub struct PdfPagesExtractor { diff --git a/crates/pile-value/src/extract/item/pdf/pdf_text.rs b/crates/pile-value/src/extract/item/pdf/pdf_text.rs index 3ec7bba..b90f244 100644 --- a/crates/pile-value/src/extract/item/pdf/pdf_text.rs +++ b/crates/pile-value/src/extract/item/pdf/pdf_text.rs @@ -1,6 +1,7 @@ use pdf::content::{Op, TextDrawAdjusted}; use pdf::file::FileOptions; use pile_config::Label; +use pile_io::SyncReadBridge; use std::{ collections::HashMap, io::BufReader, @@ -10,7 +11,7 @@ use tracing::trace; use crate::{ extract::traits::{ExtractState, 
ObjectExtractor}, - value::{Item, PileValue, SyncReadBridge}, + value::{Item, PileValue}, }; pub struct PdfTextExtractor { diff --git a/crates/pile-value/src/extract/item/text.rs b/crates/pile-value/src/extract/item/text.rs index c58b342..772646e 100644 --- a/crates/pile-value/src/extract/item/text.rs +++ b/crates/pile-value/src/extract/item/text.rs @@ -1,9 +1,10 @@ use pile_config::Label; +use pile_io::AsyncReader; use std::sync::{Arc, OnceLock}; use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{AsyncReader, Item, PileValue}, + value::{Item, PileValue}, }; pub struct TextExtractor { diff --git a/crates/pile-value/src/extract/item/toml.rs b/crates/pile-value/src/extract/item/toml.rs index e49b3d8..a11c1ae 100644 --- a/crates/pile-value/src/extract/item/toml.rs +++ b/crates/pile-value/src/extract/item/toml.rs @@ -1,4 +1,5 @@ use pile_config::Label; +use pile_io::AsyncReader; use std::{ collections::HashMap, sync::{Arc, OnceLock}, @@ -6,7 +7,7 @@ use std::{ use crate::{ extract::traits::{ExtractState, ObjectExtractor}, - value::{AsyncReader, Item, PileValue}, + value::{Item, PileValue}, }; fn toml_to_pile(value: toml::Value) -> PileValue { diff --git a/crates/pile-value/src/source/s3.rs b/crates/pile-value/src/source/s3.rs index 613b77f..bbe8fa8 100644 --- a/crates/pile-value/src/source/s3.rs +++ b/crates/pile-value/src/source/s3.rs @@ -23,6 +23,7 @@ pub struct S3DataSource { pub prefix: Option>, pub client: Arc, pub pattern: GroupPattern, + pub encryption_key: Option<[u8; 32]>, pub index: OnceLock, Item>>, } @@ -35,6 +36,7 @@ impl S3DataSource { region: String, credentials: &S3Credentials, pattern: GroupPattern, + encryption_key: Option<[u8; 32]>, ) -> Result, std::io::Error> { let client = { let creds = Credentials::new( @@ -63,6 +65,7 @@ impl S3DataSource { prefix: prefix.map(|x| x.into()), client: Arc::new(client), pattern, + encryption_key, index: OnceLock::new(), }); @@ -94,8 +97,15 @@ impl S3DataSource { for obj in resp.contents() { let 
Some(full_key) = obj.key() else { continue }; - let key = strip_prefix(full_key, source.prefix.as_deref()); - all_keys.insert(key.into()); + let raw_key = strip_prefix(full_key, source.prefix.as_deref()); + let key = match &source.encryption_key { + None => raw_key.into(), + Some(enc_key) => match decrypt_path(enc_key, raw_key) { + Some(decrypted) => decrypted.into(), + None => continue, + }, + }; + all_keys.insert(key); } if !is_truncated { @@ -219,6 +229,50 @@ impl DataSource for Arc { } } +/// Derive an encryption key from a password +pub fn string_to_key(password: &str) -> [u8; 32] { + blake3::derive_key("pile s3 encryption", password.as_bytes()) +} + +/// Encrypt a logical path to a base64 S3 key using a deterministic nonce. +pub fn encrypt_path(enc_key: &[u8; 32], path: &str) -> String { + use base64::Engine; + use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead}; + + let hash = blake3::keyed_hash(enc_key, path.as_bytes()); + let nonce_bytes = &hash.as_bytes()[..24]; + let nonce = XNonce::from_slice(nonce_bytes); + let key = chacha20poly1305::Key::from_slice(enc_key); + let cipher = XChaCha20Poly1305::new(key); + #[expect(clippy::expect_used)] + let ciphertext = cipher + .encrypt(nonce, path.as_bytes()) + .expect("path encryption should not fail"); + + let mut result = nonce_bytes.to_vec(); + result.extend_from_slice(&ciphertext); + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(result) +} + +/// Decrypt a base64 S3 key back to its logical path. 
fn decrypt_path(enc_key: &[u8; 32], encrypted: &str) -> Option<String> {
    use base64::Engine;
    use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};

    // Undo the URL-safe base64 framing; anything that does not decode is
    // not one of our keys.
    let framed = base64::engine::general_purpose::URL_SAFE_NO_PAD
        .decode(encrypted)
        .ok()?;

    // A valid key holds at least a 24-byte nonce plus a 16-byte Poly1305 tag.
    if framed.len() < 24 + 16 {
        return None;
    }

    let (nonce_bytes, ciphertext) = framed.split_at(24);
    let cipher = XChaCha20Poly1305::new(chacha20poly1305::Key::from_slice(enc_key));
    let plaintext = cipher
        .decrypt(XNonce::from_slice(nonce_bytes), ciphertext)
        .ok()?;

    // Logical paths are always UTF-8; reject anything else.
    String::from_utf8(plaintext).ok()
}

fn strip_prefix<'a>(key: &'a str, prefix: Option<&str>) -> &'a str {
    match prefix {
        None => key,

// ---------------------------------------------------------------------------
// file: crates/pile-value/src/value/item.rs (hunk)
// ---------------------------------------------------------------------------

use mime::Mime;
use pile_config::Label;
use pile_io::{ChaChaReaderAsync, S3Reader, SyncReadBridge};
use smartstring::{LazyCompact, SmartString};
use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};

use crate::{
    source::{DirDataSource, S3DataSource, encrypt_path},
    value::ItemReader,
};

// … (unchanged lines elided by the patch: impl Item, reader) …

            Self::File { path, .. } => ItemReader::File(File::open(path)?),
            Self::S3 { source, key, ..
} => { + let logical_key = key.as_str(); + + let s3_key_part: SmartString = match &source.encryption_key { + None => logical_key.into(), + Some(enc_key) => encrypt_path(enc_key, logical_key).into(), + }; + let full_key: SmartString = match &source.prefix { - None => key.clone(), + None => s3_key_part, Some(p) => { if p.ends_with('/') { - format!("{p}{key}").into() + format!("{p}{s3_key_part}").into() } else { - format!("{p}/{key}").into() + format!("{p}/{s3_key_part}").into() } } }; @@ -62,13 +70,29 @@ impl Item { let size = head.content_length().unwrap_or(0) as u64; - ItemReader::S3(S3Reader { - client: source.client.clone(), - bucket: source.bucket.clone(), - key: full_key, - cursor: 0, - size, - }) + match source.encryption_key { + None => ItemReader::S3(S3Reader { + client: source.client.clone(), + bucket: source.bucket.clone(), + key: full_key, + cursor: 0, + size, + }), + + Some(enc_key) => ItemReader::EncryptedS3( + ChaChaReaderAsync::new( + S3Reader { + client: source.client.clone(), + bucket: source.bucket.clone(), + key: full_key, + cursor: 0, + size, + }, + enc_key, + ) + .await?, + ), + } } }) } diff --git a/crates/pile-value/src/value/readers.rs b/crates/pile-value/src/value/readers.rs index 953c2e4..4d3e3df 100644 --- a/crates/pile-value/src/value/readers.rs +++ b/crates/pile-value/src/value/readers.rs @@ -1,81 +1,5 @@ -use smartstring::{LazyCompact, SmartString}; -use std::{ - fs::File, - io::{Read, Seek, SeekFrom}, - sync::Arc, -}; -use tokio::runtime::Handle; - -// -// MARK: traits -// - -pub trait AsyncReader: Send { - /// Read a chunk of bytes. - fn read( - &mut self, - buf: &mut [u8], - ) -> impl Future> + Send; - - /// Read all remaining bytes into a `Vec`. 
- fn read_to_end(&mut self) -> impl Future, std::io::Error>> + Send { - async { - let mut buf = Vec::new(); - let mut chunk = vec![0u8; 65536]; - loop { - let n = self.read(&mut chunk).await?; - if n == 0 { - break; - } - buf.extend_from_slice(&chunk[..n]); - } - Ok(buf) - } - } -} - -pub trait AsyncSeekReader: AsyncReader { - fn seek(&mut self, pos: SeekFrom) -> impl Future> + Send; -} - -// -// MARK: sync bridge -// - -/// Turn an async [Reader] into a sync [Read] + [Seek]. -/// -/// Never use this outside of [tokio::task::spawn_blocking], -/// the async runtime will deadlock if this struct blocks -/// the runtime. -pub struct SyncReadBridge { - inner: R, - handle: Handle, -} - -impl SyncReadBridge { - /// Creates a new adapter using a handle to the current runtime. - /// Panics if called outside of tokio - pub fn new_current(inner: R) -> Self { - Self::new(inner, Handle::current()) - } - - /// Creates a new adapter using a handle to an existing runtime. - pub fn new(inner: R, handle: Handle) -> Self { - Self { inner, handle } - } -} - -impl Read for SyncReadBridge { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.handle.block_on(self.inner.read(buf)) - } -} - -impl Seek for SyncReadBridge { - fn seek(&mut self, pos: SeekFrom) -> Result { - self.handle.block_on(self.inner.seek(pos)) - } -} +use pile_io::{AsyncReader, AsyncSeekReader, ChaChaReaderAsync, S3Reader}; +use std::{fs::File, io::Seek}; // // MARK: itemreader @@ -84,6 +8,7 @@ impl Seek for SyncReadBridge { pub enum ItemReader { File(File), S3(S3Reader), + EncryptedS3(ChaChaReaderAsync), } impl AsyncReader for ItemReader { @@ -91,6 +16,7 @@ impl AsyncReader for ItemReader { match self { Self::File(x) => std::io::Read::read(x, buf), Self::S3(x) => x.read(buf).await, + Self::EncryptedS3(x) => x.read(buf).await, } } } @@ -100,94 +26,7 @@ impl AsyncSeekReader for ItemReader { match self { Self::File(x) => x.seek(pos), Self::S3(x) => x.seek(pos).await, + Self::EncryptedS3(x) => x.seek(pos).await, } } } 
- -// -// MARK: S3Reader -// - -pub struct S3Reader { - pub client: Arc, - pub bucket: SmartString, - pub key: SmartString, - pub cursor: u64, - pub size: u64, -} - -impl AsyncReader for S3Reader { - async fn read(&mut self, buf: &mut [u8]) -> Result { - let len_left = self.size.saturating_sub(self.cursor); - if len_left == 0 || buf.is_empty() { - return Ok(0); - } - - let start_byte = self.cursor; - let len_to_read = (buf.len() as u64).min(len_left); - let end_byte = start_byte + len_to_read - 1; - - let resp = self - .client - .get_object() - .bucket(self.bucket.as_str()) - .key(self.key.as_str()) - .range(format!("bytes={start_byte}-{end_byte}")) - .send() - .await - .map_err(std::io::Error::other)?; - - let bytes = resp - .body - .collect() - .await - .map(|x| x.into_bytes()) - .map_err(std::io::Error::other)?; - - let n = bytes.len().min(buf.len()); - buf[..n].copy_from_slice(&bytes[..n]); - self.cursor += n as u64; - Ok(n) - } -} - -impl AsyncSeekReader for S3Reader { - async fn seek(&mut self, pos: SeekFrom) -> Result { - match pos { - SeekFrom::Start(x) => self.cursor = x.min(self.size), - - SeekFrom::Current(x) => { - if x < 0 { - let abs = x.unsigned_abs(); - if abs > self.cursor { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "cannot seek past start", - )); - } - self.cursor -= abs; - } else { - self.cursor += x as u64; - } - } - - std::io::SeekFrom::End(x) => { - if x < 0 { - let abs = x.unsigned_abs(); - if abs > self.size { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "cannot seek past start", - )); - } - self.cursor = self.size - abs; - } else { - self.cursor = self.size + x as u64; - } - } - } - - self.cursor = self.cursor.min(self.size); - Ok(self.cursor) - } -} diff --git a/crates/pile/Cargo.toml b/crates/pile/Cargo.toml index d842fa6..855aab5 100644 --- a/crates/pile/Cargo.toml +++ b/crates/pile/Cargo.toml @@ -12,8 +12,10 @@ pile-toolbox = { workspace = true } pile-dataset = { workspace = true, 
features = ["axum", "pdfium"] }
pile-value = { workspace = true, features = ["pdfium"] }
pile-config = { workspace = true }
pile-io = { workspace = true }

aws-sdk-s3 = { workspace = true }
bytes = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tokio = { workspace = true }

// ---------------------------------------------------------------------------
// file: crates/pile/src/command/encrypt.rs
// ---------------------------------------------------------------------------

use anyhow::{Context, Result};
use clap::Args;
use pile_io::{AsyncReader, ChaChaReader, ChaChaWriter};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::source::string_to_key;
use std::io::{Cursor, Write};
use std::path::PathBuf;

use crate::{CliCmd, GlobalContext};

// `pile encrypt`: encrypt one file in memory and stream the ciphertext to
// stdout. (Comments here are deliberately non-doc so clap help text is
// unchanged.)
#[derive(Debug, Args)]
pub struct EncryptCommand {
    /// File to encrypt
    path: PathBuf,

    /// Encryption password
    // NOTE(review): a positional password is visible in shell history and
    // the process list; consider prompting or reading from the environment.
    password: String,
}

// `pile decrypt`: the inverse of `pile encrypt`.
#[derive(Debug, Args)]
pub struct DecryptCommand {
    /// File to decrypt
    path: PathBuf,

    /// Encryption password
    password: String,
}

impl CliCmd for EncryptCommand {
    // NOTE(review): the return type parameters were reconstructed — the
    // original `<…>` were stripped in transit; confirm against the repo.
    async fn run(
        self,
        _ctx: GlobalContext,
        _flag: CancelFlag,
    ) -> Result<i32, CancelableTaskError> {
        // Derive the symmetric key from the password, then read the whole
        // plaintext into memory (acceptable for CLI-sized files).
        let key = string_to_key(&self.password);
        let plaintext = tokio::fs::read(&self.path)
            .await
            .with_context(|| format!("while reading '{}'", self.path.display()))?;

        let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), key)
            .context("while initializing encryptor")?;
        writer.write_all(&plaintext).context("while encrypting")?;
        let buf = writer.finish().context("while finalizing encryptor")?;

        // Fix: lock stdout once and flush explicitly so a late write error
        // surfaces here instead of being silently swallowed on drop.
        let mut stdout = std::io::stdout().lock();
        stdout
            .write_all(buf.get_ref())
            .context("while writing to stdout")?;
        stdout.flush().context("while flushing stdout")?;

        Ok(0)
    }
}

impl CliCmd for DecryptCommand {
    async fn run(
        self,
        _ctx: GlobalContext,
        _flag: CancelFlag,
    ) -> Result<i32, CancelableTaskError> {
        let key = string_to_key(&self.password);
        let ciphertext = tokio::fs::read(&self.path)
            .await
.with_context(|| format!("while reading '{}'", self.path.display()))?; + + let mut reader = ChaChaReader::new(Cursor::new(ciphertext), key) + .context("while initializing decryptor")?; + let plaintext = reader.read_to_end().await.context("while decrypting")?; + + std::io::stdout() + .write_all(&plaintext) + .context("while writing to stdout")?; + + Ok(0) + } +} diff --git a/crates/pile/src/command/mod.rs b/crates/pile/src/command/mod.rs index e0fe6e6..6731244 100644 --- a/crates/pile/src/command/mod.rs +++ b/crates/pile/src/command/mod.rs @@ -5,6 +5,7 @@ use pile_toolbox::cancelabletask::{ }; mod check; +mod encrypt; mod fields; mod index; mod init; @@ -83,6 +84,18 @@ pub enum SubCommand { #[command(flatten)] cmd: upload::UploadCommand, }, + + /// Encrypt a file to stdout + Encrypt { + #[command(flatten)] + cmd: encrypt::EncryptCommand, + }, + + /// Decrypt a file to stdout + Decrypt { + #[command(flatten)] + cmd: encrypt::DecryptCommand, + }, } impl CliCmdDispatch for SubCommand { @@ -98,6 +111,8 @@ impl CliCmdDispatch for SubCommand { Self::Item { cmd } => cmd.start(ctx), Self::Serve { cmd } => cmd.start(ctx), Self::Upload { cmd } => cmd.start(ctx), + Self::Encrypt { cmd } => cmd.start(ctx), + Self::Decrypt { cmd } => cmd.start(ctx), Self::Docs {} => { print_help_recursively(&mut Cli::command(), None); diff --git a/crates/pile/src/command/upload.rs b/crates/pile/src/command/upload.rs index 1211171..b31320c 100644 --- a/crates/pile/src/command/upload.rs +++ b/crates/pile/src/command/upload.rs @@ -4,9 +4,15 @@ use clap::Args; use indicatif::ProgressBar; use pile_config::Label; use pile_dataset::{Dataset, Datasets}; +use pile_io::ChaChaWriter; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; -use pile_value::source::{DataSource, DirDataSource, S3DataSource}; -use std::{path::PathBuf, sync::Arc, time::Duration}; +use pile_value::source::{DataSource, DirDataSource, S3DataSource, encrypt_path}; +use std::{ + io::{Cursor, Write}, + path::PathBuf, + 
sync::Arc, + time::Duration, +}; use tokio::task::JoinSet; use tracing::info; @@ -146,15 +152,36 @@ impl CliCmd for UploadCommand { let relative_str = item.key().to_string(); let item_path = dir_ds.dir.join(&relative_str); - let key = format!("{full_prefix}/{relative_str}"); + let enc_key_part = match s3_ds.encryption_key { + None => relative_str.clone(), + Some(ref enc_key) => encrypt_path(enc_key, &relative_str), + }; + let key = format!("{full_prefix}/{enc_key_part}"); let mime = item.mime().to_string(); let client = Arc::clone(&s3_ds.client); let bucket = bucket.clone(); + let encryption_key = s3_ds.encryption_key; join_set.spawn(async move { - let body = ByteStream::from_path(&item_path) - .await - .with_context(|| format!("while opening '{}'", item_path.display()))?; + let body = if let Some(enc_key) = encryption_key { + let path = item_path.clone(); + let encrypted = + tokio::task::spawn_blocking(move || -> anyhow::Result> { + let plaintext = std::fs::read(&path) + .with_context(|| format!("while opening '{}'", path.display()))?; + let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), enc_key) + .context("while initializing encryptor")?; + writer.write_all(&plaintext).context("while encrypting")?; + Ok(writer.finish().context("while finalizing")?.into_inner()) + }) + .await + .context("encryptor task panicked")??; + ByteStream::from(bytes::Bytes::from(encrypted)) + } else { + ByteStream::from_path(&item_path) + .await + .with_context(|| format!("while opening '{}'", item_path.display()))? 
+ }; client .put_object() @@ -202,7 +229,9 @@ fn get_dir_source( Some(_) => Err(anyhow::anyhow!( "source '{name}' is not a filesystem source" )), - None => Err(anyhow::anyhow!("source '{name}' not found in config")), + None => Err(anyhow::anyhow!( + "filesystem source '{name}' not found in config" + )), } } @@ -214,7 +243,7 @@ fn get_s3_source( match ds.sources.get(label) { Some(Dataset::S3(s)) => Ok(Arc::clone(s)), Some(_) => Err(anyhow::anyhow!("source '{name}' is not an S3 source")), - None => Err(anyhow::anyhow!("source '{name}' not found in config")), + None => Err(anyhow::anyhow!("s3 source '{name}' not found in config")), } }