From 76d38d48c5d010eaa47e460587e50933eb68edc6 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Mon, 23 Mar 2026 21:09:22 -0700 Subject: [PATCH] Reorganize S3 clients --- Cargo.lock | 1 - crates/pile-dataset/src/dataset.rs | 66 ++++++++----- crates/pile-io/src/chacha/format.rs | 47 +++++++++ crates/pile-io/src/chacha/mod.rs | 9 ++ .../src/{chachareader.rs => chacha/reader.rs} | 86 +++------------- .../reader_async.rs} | 34 ++++--- .../writer.rs} | 28 +++--- .../writer_async.rs} | 52 +++++----- crates/pile-io/src/lib.rs | 12 +-- crates/pile-io/src/s3reader.rs | 97 ++++++++++++++++++- crates/pile-value/Cargo.toml | 1 - crates/pile-value/src/source/s3.rs | 61 ++++++------ crates/pile-value/src/value/item.rs | 38 ++------ crates/pile-value/src/value/readers.rs | 4 +- crates/pile/src/command/encrypt.rs | 7 +- crates/pile/src/command/upload.rs | 14 +-- 16 files changed, 310 insertions(+), 247 deletions(-) create mode 100644 crates/pile-io/src/chacha/format.rs create mode 100644 crates/pile-io/src/chacha/mod.rs rename crates/pile-io/src/{chachareader.rs => chacha/reader.rs} (63%) rename crates/pile-io/src/{chachareader_async.rs => chacha/reader_async.rs} (78%) rename crates/pile-io/src/{chachawriter_async.rs => chacha/writer.rs} (77%) rename crates/pile-io/src/{chachawriter.rs => chacha/writer_async.rs} (82%) diff --git a/Cargo.lock b/Cargo.lock index 18086f5..6999763 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2653,7 +2653,6 @@ version = "0.0.2" dependencies = [ "anyhow", "async-trait", - "aws-sdk-s3", "base64", "blake3", "chacha20poly1305", diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index 0fd9756..499a127 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -81,6 +81,7 @@ pub struct Datasets { pub config: ConfigToml, pub sources: HashMap, + pub disabled_sources: HashMap, } impl Datasets { @@ -114,6 +115,8 @@ impl Datasets { }; let mut sources = HashMap::new(); + let mut disabled_sources = HashMap::new(); + for (label, source) in &config.dataset.source { match source { Source::Filesystem { @@ -121,11 +124,12 @@ impl Datasets { path, pattern, } => { - if !enabled { - continue; - } + let target = match enabled { + true => &mut sources, + false => &mut disabled_sources, + }; - sources.insert( + target.insert( label.clone(), Dataset::Dir( DirDataSource::new(label, path_parent.join(path), pattern.clone()) @@ -144,26 +148,29 @@ impl Datasets { pattern, encryption_key, } => { - if !enabled { - continue; - } + let target = match enabled { + true => &mut sources, + false => &mut disabled_sources, + }; let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x)); match S3DataSource::new( label, - bucket.clone(), - prefix.clone(), - endpoint.clone(), - region.clone(), - credentials, + bucket, + prefix.as_ref().map(|x| x.as_str()), + endpoint.as_ref().map(|x| x.as_str()), + region, + &credentials.access_key_id, + &credentials.secret_access_key, + 10_000_000, pattern.clone(), encryption_key, ) .await { Ok(ds) => { - sources.insert(label.clone(), Dataset::S3(ds)); + target.insert(label.clone(), Dataset::S3(ds)); } Err(err) => { warn!("Could not open S3 source {label}: {err}"); @@ -179,6 +186,7 @@ impl Datasets { path_parent, config, sources, + disabled_sources, }); } @@ -219,6 +227,7 @@ impl Datasets { .join(config.dataset.name.as_str()); let mut sources = HashMap::new(); + let mut disabled_sources = HashMap::new(); for (label, source) in &config.dataset.source { match source { Source::Filesystem { @@ -226,11 +235,12 @@ impl Datasets { path, pattern, } => { - if !enabled { - continue; - } + let target = match enabled { + true => &mut sources, + false => &mut disabled_sources, + }; - sources.insert( + target.insert( label.clone(), Dataset::Dir( DirDataSource::new(label, path_parent.join(path), pattern.clone()) @@ -249,26 +259,29 @@ impl Datasets { pattern, encryption_key, } => { - if !enabled { - continue; - } + let target = match enabled { + true => &mut sources, + false => &mut disabled_sources, + }; let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x)); match S3DataSource::new( label, - bucket.clone(), - prefix.clone(), - endpoint.clone(), - region.clone(), - credentials, + bucket, + prefix.as_ref().map(|x| x.as_str()), + endpoint.as_ref().map(|x| x.as_str()), + region, + &credentials.access_key_id, + &credentials.secret_access_key, + 10_000_000, pattern.clone(), encryption_key, ) .await { Ok(ds) => { - sources.insert(label.clone(), Dataset::S3(ds)); + target.insert(label.clone(), Dataset::S3(ds)); } Err(err) => { warn!("Could not open S3 source {label}: {err}"); @@ -284,6 +297,7 @@ impl Datasets { path_parent, config, sources, + disabled_sources, }); } diff --git a/crates/pile-io/src/chacha/format.rs b/crates/pile-io/src/chacha/format.rs new file mode 100644 index 0000000..f96598a --- /dev/null +++ b/crates/pile-io/src/chacha/format.rs @@ -0,0 +1,47 @@ +use binrw::{binrw, meta::ReadMagic}; + +#[binrw] +#[brw(little, magic = b"PileChaChav1")] +#[derive(Debug, Clone, Copy)] +pub struct ChaChaHeaderv1 { + pub config: ChaChaConfigv1, + pub plaintext_size: u64, +} + +impl ChaChaHeaderv1 { + pub const SIZE: usize = ChaChaHeaderv1::MAGIC.len() + std::mem::size_of::() + 8; +} + +#[test] +fn chachaheader_size() { + assert_eq!(ChaChaHeaderv1::SIZE, std::mem::size_of::()) +} + +// +// MARK: config +// + +#[binrw] +#[brw(little)] +#[derive(Debug, Clone, Copy)] +pub struct ChaChaConfigv1 { + pub chunk_size: u64, + pub nonce_size: u64, + pub tag_size: u64, +} + +impl Default for ChaChaConfigv1 { + fn default() -> Self { + Self { + chunk_size: 64 * 1024, + nonce_size: 24, + tag_size: 16, + } + } +} + +impl ChaChaConfigv1 { + pub(crate) fn enc_chunk_size(&self) -> u64 { + self.chunk_size + self.nonce_size + self.tag_size + } +} diff --git a/crates/pile-io/src/chacha/mod.rs b/crates/pile-io/src/chacha/mod.rs new file mode 100644 index 0000000..8f3f6a1 --- /dev/null +++ b/crates/pile-io/src/chacha/mod.rs @@ -0,0 +1,9 @@ +mod reader; +mod reader_async; +mod writer; +mod writer_async; + +pub use {reader::*, reader_async::*, writer::*, writer_async::*}; + +mod format; +pub use format::*; diff --git a/crates/pile-io/src/chachareader.rs b/crates/pile-io/src/chacha/reader.rs similarity index 63% rename from crates/pile-io/src/chachareader.rs rename to crates/pile-io/src/chacha/reader.rs index 121626d..2353238 100644 --- a/crates/pile-io/src/chachareader.rs +++ b/crates/pile-io/src/chacha/reader.rs @@ -1,70 +1,15 @@ use std::io::{Read, Seek, SeekFrom}; -use binrw::binrw; - -use crate::{AsyncReader, AsyncSeekReader}; - -// -// MARK: header -// - -/// Serialized size of [`ChaChaHeader`] in bytes: 12 magic + 3×8 config + 8 plaintext_size. -pub const HEADER_SIZE: usize = 44; - -#[binrw] -#[brw(little, magic = b"PileChaChav1")] -#[derive(Debug, Clone, Copy)] -pub struct ChaChaHeader { - pub chunk_size: u64, - pub nonce_size: u64, - pub tag_size: u64, - pub plaintext_size: u64, -} - -// -// MARK: config -// - -#[derive(Debug, Clone, Copy)] -pub struct ChaChaReaderConfig { - pub chunk_size: u64, - pub nonce_size: u64, - pub tag_size: u64, -} - -impl Default for ChaChaReaderConfig { - fn default() -> Self { - Self { - chunk_size: 1_048_576, // 1MiB - nonce_size: 24, - tag_size: 16, - } - } -} - -impl ChaChaReaderConfig { - pub(crate) fn enc_chunk_size(&self) -> u64 { - self.chunk_size + self.nonce_size + self.tag_size - } -} - -impl From for ChaChaReaderConfig { - fn from(h: ChaChaHeader) -> Self { - Self { - chunk_size: h.chunk_size, - nonce_size: h.nonce_size, - tag_size: h.tag_size, - } - } -} +use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaHeaderv1}; // // MARK: reader // -pub struct ChaChaReader { +pub struct ChaChaReaderv1 { inner: R, - config: ChaChaReaderConfig, + header: ChaChaHeaderv1, + data_offset: u64, encryption_key: [u8; 32], cursor: u64, @@ -72,17 +17,17 @@ pub struct ChaChaReader { cached_chunk: Option<(u64, Vec)>, } -impl ChaChaReader { +impl ChaChaReaderv1 { pub fn new(mut inner: R, encryption_key: [u8; 32]) -> Result { use binrw::BinReaderExt; inner.seek(SeekFrom::Start(0))?; - let header: ChaChaHeader = inner.read_le().map_err(std::io::Error::other)?; + let header: ChaChaHeaderv1 = inner.read_le().map_err(std::io::Error::other)?; let data_offset = inner.stream_position()?; Ok(Self { inner, - config: header.into(), + header, data_offset, encryption_key, cursor: 0, @@ -94,21 +39,22 @@ impl ChaChaReader { fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> { use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead}; - let enc_start = self.data_offset + chunk_index * self.config.enc_chunk_size(); + let enc_start = self.data_offset + chunk_index * self.header.config.enc_chunk_size(); self.inner.seek(SeekFrom::Start(enc_start))?; - let mut encrypted = vec![0u8; self.config.enc_chunk_size() as usize]; + let mut encrypted = vec![0u8; self.header.config.enc_chunk_size() as usize]; let n = self.read_exact_or_eof(&mut encrypted)?; encrypted.truncate(n); - if encrypted.len() < (self.config.nonce_size + self.config.tag_size) as usize { + if encrypted.len() < (self.header.config.nonce_size + self.header.config.tag_size) as usize + { return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, "encrypted chunk too short", )); } - let (nonce_bytes, ciphertext) = encrypted.split_at(self.config.nonce_size as usize); + let (nonce_bytes, ciphertext) = encrypted.split_at(self.header.config.nonce_size as usize); let nonce = XNonce::from_slice(nonce_bytes); let key = chacha20poly1305::Key::from_slice(&self.encryption_key); let cipher = XChaCha20Poly1305::new(key); @@ -132,14 +78,14 @@ impl ChaChaReader { } } -impl AsyncReader for ChaChaReader { +impl AsyncReader for ChaChaReaderv1 { async fn read(&mut self, buf: &mut [u8]) -> Result { let remaining = self.plaintext_size.saturating_sub(self.cursor); if remaining == 0 || buf.is_empty() { return Ok(0); } - let chunk_index = self.cursor / self.config.chunk_size; + let chunk_index = self.cursor / self.header.config.chunk_size; let need_fetch = match &self.cached_chunk { None => true, @@ -153,7 +99,7 @@ impl AsyncReader for ChaChaReader { #[expect(clippy::unwrap_used)] let (_, chunk_data) = self.cached_chunk.as_ref().unwrap(); - let offset_in_chunk = (self.cursor % self.config.chunk_size) as usize; + let offset_in_chunk = (self.cursor % self.header.config.chunk_size) as usize; let available = chunk_data.len() - offset_in_chunk; let to_copy = available.min(buf.len()); @@ -163,7 +109,7 @@ impl AsyncReader for ChaChaReader { } } -impl AsyncSeekReader for ChaChaReader { +impl AsyncSeekReader for ChaChaReaderv1 { async fn seek(&mut self, pos: SeekFrom) -> Result { match pos { SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size), diff --git a/crates/pile-io/src/chachareader_async.rs b/crates/pile-io/src/chacha/reader_async.rs similarity index 78% rename from crates/pile-io/src/chachareader_async.rs rename to crates/pile-io/src/chacha/reader_async.rs index 4c1c206..4d1aeb3 100644 --- a/crates/pile-io/src/chachareader_async.rs +++ b/crates/pile-io/src/chacha/reader_async.rs @@ -1,10 +1,11 @@ use std::io::SeekFrom; -use crate::{AsyncReader, AsyncSeekReader, ChaChaHeader, ChaChaReaderConfig, HEADER_SIZE}; +use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaHeaderv1}; -pub struct ChaChaReaderAsync { +pub struct ChaChaReaderv1Async { inner: R, - config: ChaChaReaderConfig, + header: ChaChaHeaderv1, + data_offset: u64, encryption_key: [u8; 32], cursor: u64, @@ -12,22 +13,22 @@ pub struct ChaChaReaderAsync { cached_chunk: Option<(u64, Vec)>, } -impl ChaChaReaderAsync { +impl ChaChaReaderv1Async { pub async fn new(mut inner: R, encryption_key: [u8; 32]) -> Result { use binrw::BinReaderExt; use std::io::Cursor; inner.seek(SeekFrom::Start(0)).await?; - let mut buf = [0u8; HEADER_SIZE]; + let mut buf = [0u8; ChaChaHeaderv1::SIZE]; read_exact(&mut inner, &mut buf).await?; - let header: ChaChaHeader = Cursor::new(&buf[..]) + let header: ChaChaHeaderv1 = Cursor::new(&buf[..]) .read_le() .map_err(std::io::Error::other)?; Ok(Self { inner, - config: header.into(), - data_offset: HEADER_SIZE as u64, + header, + data_offset: buf.len() as u64, encryption_key, cursor: 0, plaintext_size: header.plaintext_size, @@ -38,21 +39,22 @@ impl ChaChaReaderAsync { async fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> { use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead}; - let enc_start = self.data_offset + chunk_index * self.config.enc_chunk_size(); + let enc_start = self.data_offset + chunk_index * self.header.config.enc_chunk_size(); self.inner.seek(SeekFrom::Start(enc_start)).await?; - let mut encrypted = vec![0u8; self.config.enc_chunk_size() as usize]; + let mut encrypted = vec![0u8; self.header.config.enc_chunk_size() as usize]; let n = read_exact_or_eof(&mut self.inner, &mut encrypted).await?; encrypted.truncate(n); - if encrypted.len() < (self.config.nonce_size + self.config.tag_size) as usize { + if encrypted.len() < (self.header.config.nonce_size + self.header.config.tag_size) as usize + { return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, "encrypted chunk too short", )); } - let (nonce_bytes, ciphertext) = encrypted.split_at(self.config.nonce_size as usize); + let (nonce_bytes, ciphertext) = encrypted.split_at(self.header.config.nonce_size as usize); let nonce = XNonce::from_slice(nonce_bytes); let key = chacha20poly1305::Key::from_slice(&self.encryption_key); let cipher = XChaCha20Poly1305::new(key); @@ -90,14 +92,14 @@ async fn read_exact_or_eof( Ok(total) } -impl AsyncReader for ChaChaReaderAsync { +impl AsyncReader for ChaChaReaderv1Async { async fn read(&mut self, buf: &mut [u8]) -> Result { let remaining = self.plaintext_size.saturating_sub(self.cursor); if remaining == 0 || buf.is_empty() { return Ok(0); } - let chunk_index = self.cursor / self.config.chunk_size; + let chunk_index = self.cursor / self.header.config.chunk_size; let need_fetch = match &self.cached_chunk { None => true, @@ -111,7 +113,7 @@ impl AsyncReader for ChaChaReaderAsync { #[expect(clippy::unwrap_used)] let (_, chunk_data) = self.cached_chunk.as_ref().unwrap(); - let offset_in_chunk = (self.cursor % self.config.chunk_size) as usize; + let offset_in_chunk = (self.cursor % self.header.config.chunk_size) as usize; let available = chunk_data.len() - offset_in_chunk; let to_copy = available.min(buf.len()); @@ -121,7 +123,7 @@ impl AsyncReader for ChaChaReaderAsync { } } -impl AsyncSeekReader for ChaChaReaderAsync { +impl AsyncSeekReader for ChaChaReaderv1Async { async fn seek(&mut self, pos: SeekFrom) -> Result { match pos { SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size), diff --git a/crates/pile-io/src/chachawriter_async.rs b/crates/pile-io/src/chacha/writer.rs similarity index 77% rename from crates/pile-io/src/chachawriter_async.rs rename to crates/pile-io/src/chacha/writer.rs index 2548231..41e57c3 100644 --- a/crates/pile-io/src/chachawriter_async.rs +++ b/crates/pile-io/src/chacha/writer.rs @@ -2,11 +2,12 @@ use std::io::SeekFrom; use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; -use crate::{ChaChaHeader, ChaChaReaderConfig}; +use crate::chacha::{ChaChaConfigv1, ChaChaHeaderv1}; pub struct ChaChaWriterAsync { inner: W, - config: ChaChaReaderConfig, + header: ChaChaHeaderv1, + encryption_key: [u8; 32], buffer: Vec, plaintext_bytes_written: u64, @@ -14,18 +15,15 @@ pub struct ChaChaWriterAsync { impl ChaChaWriterAsync { pub async fn new(mut inner: W, encryption_key: [u8; 32]) -> Result { - let config = ChaChaReaderConfig::default(); - let header_bytes = serialize_header(ChaChaHeader { - chunk_size: config.chunk_size, - nonce_size: config.nonce_size, - tag_size: config.tag_size, + let header = ChaChaHeaderv1 { + config: ChaChaConfigv1::default(), plaintext_size: 0, - })?; - inner.write_all(&header_bytes).await?; + }; + inner.write_all(&serialize_header(header)?).await?; Ok(Self { inner, - config, + header, encryption_key, buffer: Vec::new(), plaintext_bytes_written: 0, @@ -36,7 +34,7 @@ impl ChaChaWriterAsync { self.buffer.extend_from_slice(buf); self.plaintext_bytes_written += buf.len() as u64; - let chunk_size = self.config.chunk_size as usize; + let chunk_size = self.header.config.chunk_size as usize; while self.buffer.len() >= chunk_size { let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer[..chunk_size])?; self.inner.write_all(&encrypted).await?; @@ -55,10 +53,8 @@ impl ChaChaWriterAsync { } self.inner.seek(SeekFrom::Start(0)).await?; - let header_bytes = serialize_header(ChaChaHeader { - chunk_size: self.config.chunk_size, - nonce_size: self.config.nonce_size, - tag_size: self.config.tag_size, + let header_bytes = serialize_header(ChaChaHeaderv1 { + config: self.header.config, plaintext_size: self.plaintext_bytes_written, })?; self.inner.write_all(&header_bytes).await?; @@ -85,7 +81,7 @@ fn encrypt_chunk(key: &[u8; 32], plaintext: &[u8]) -> Result, std::io::E Ok(output) } -fn serialize_header(header: ChaChaHeader) -> Result, std::io::Error> { +fn serialize_header(header: ChaChaHeaderv1) -> Result, std::io::Error> { use binrw::BinWriterExt; use std::io::Cursor; diff --git a/crates/pile-io/src/chachawriter.rs b/crates/pile-io/src/chacha/writer_async.rs similarity index 82% rename from crates/pile-io/src/chachawriter.rs rename to crates/pile-io/src/chacha/writer_async.rs index 8b30e4e..db6dbf8 100644 --- a/crates/pile-io/src/chachawriter.rs +++ b/crates/pile-io/src/chacha/writer_async.rs @@ -1,6 +1,6 @@ use std::io::{Seek, SeekFrom, Write}; -use crate::{ChaChaHeader, ChaChaReaderConfig}; +use crate::chacha::{ChaChaConfigv1, ChaChaHeaderv1}; /// Generate a random 32-byte encryption key suitable for use with [`ChaChaWriter`]. pub fn generate_key() -> [u8; 32] { @@ -9,30 +9,28 @@ pub fn generate_key() -> [u8; 32] { XChaCha20Poly1305::generate_key(&mut OsRng).into() } -pub struct ChaChaWriter { +pub struct ChaChaWriterv1 { inner: W, - config: ChaChaReaderConfig, + header: ChaChaHeaderv1, + encryption_key: [u8; 32], buffer: Vec, plaintext_bytes_written: u64, } -impl ChaChaWriter { +impl ChaChaWriterv1 { pub fn new(mut inner: W, encryption_key: [u8; 32]) -> Result { use binrw::BinWriterExt; - let config = ChaChaReaderConfig::default(); - let header = ChaChaHeader { - chunk_size: config.chunk_size, - nonce_size: config.nonce_size, - tag_size: config.tag_size, + let header = ChaChaHeaderv1 { + config: ChaChaConfigv1::default(), plaintext_size: 0, }; inner.write_le(&header).map_err(std::io::Error::other)?; Ok(Self { inner, - config, + header, encryption_key, buffer: Vec::new(), plaintext_bytes_written: 0, @@ -47,10 +45,8 @@ impl ChaChaWriter { self.flush_buffer()?; self.inner.seek(SeekFrom::Start(0))?; - let header = ChaChaHeader { - chunk_size: self.config.chunk_size, - nonce_size: self.config.nonce_size, - tag_size: self.config.tag_size, + let header = ChaChaHeaderv1 { + config: self.header.config, plaintext_size: self.plaintext_bytes_written, }; self.inner @@ -89,12 +85,12 @@ impl ChaChaWriter { } } -impl Write for ChaChaWriter { +impl Write for ChaChaWriterv1 { fn write(&mut self, buf: &[u8]) -> Result { self.buffer.extend_from_slice(buf); self.plaintext_bytes_written += buf.len() as u64; - let chunk_size = self.config.chunk_size as usize; + let chunk_size = self.header.config.chunk_size as usize; while self.buffer.len() >= chunk_size { let encrypted = self.encrypt_chunk(&self.buffer[..chunk_size])?; self.inner.write_all(&encrypted)?; @@ -120,13 +116,13 @@ impl Write for ChaChaWriter { mod tests { use std::io::{Cursor, SeekFrom, Write}; - use super::ChaChaWriter; - use crate::{AsyncReader, AsyncSeekReader, ChaChaReader}; + use super::ChaChaWriterv1; + use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaReaderv1}; const KEY: [u8; 32] = [42u8; 32]; fn encrypt(data: &[u8]) -> Cursor> { - let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), KEY).unwrap(); + let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), KEY).unwrap(); writer.write_all(data).unwrap(); let mut buf = writer.finish().unwrap(); buf.set_position(0); @@ -134,7 +130,7 @@ mod tests { } async fn decrypt_all(buf: Cursor>) -> Vec { - let mut reader = ChaChaReader::new(buf, KEY).unwrap(); + let mut reader = ChaChaReaderv1::new(buf, KEY).unwrap(); reader.read_to_end().await.unwrap() } @@ -169,7 +165,7 @@ mod tests { async fn roundtrip_incremental_writes() { // Write one byte at a time let data: Vec = (0u8..200).collect(); - let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), KEY).unwrap(); + let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), KEY).unwrap(); for byte in &data { writer.write_all(&[*byte]).unwrap(); } @@ -181,7 +177,7 @@ mod tests { #[tokio::test] async fn wrong_key_fails() { let buf = encrypt(b"secret data"); - let mut reader = ChaChaReader::new(buf, [0u8; 32]).unwrap(); + let mut reader = ChaChaReaderv1::new(buf, [0u8; 32]).unwrap(); assert!(reader.read_to_end().await.is_err()); } @@ -191,13 +187,13 @@ mod tests { let mut buf = encrypt(b"data"); buf.get_mut()[0] = 0xFF; buf.set_position(0); - assert!(ChaChaReader::new(buf, KEY).is_err()); + assert!(ChaChaReaderv1::new(buf, KEY).is_err()); } #[tokio::test] async fn seek_from_start() { let data: Vec = (0u8..100).collect(); - let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap(); + let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap(); reader.seek(SeekFrom::Start(50)).await.unwrap(); let mut buf = [0u8; 10]; @@ -211,7 +207,7 @@ mod tests { #[tokio::test] async fn seek_from_end() { let data: Vec = (0u8..100).collect(); - let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap(); + let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap(); reader.seek(SeekFrom::End(-10)).await.unwrap(); assert_eq!(reader.read_to_end().await.unwrap(), &data[90..]); @@ -221,7 +217,7 @@ mod tests { async fn seek_across_chunk_boundary() { // Seek to 6 bytes before the end of chunk 0, read 12 bytes spanning into chunk 1 let data: Vec = (0u8..=255).cycle().take(65536 + 500).collect(); - let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap(); + let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap(); reader.seek(SeekFrom::Start(65530)).await.unwrap(); let mut buf = vec![0u8; 12]; @@ -235,7 +231,7 @@ mod tests { #[tokio::test] async fn seek_current() { let data: Vec = (0u8..=255).cycle().take(200).collect(); - let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap(); + let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap(); // Read 10, seek back 5, read 5 — should get bytes 5..10 let mut first = [0u8; 10]; @@ -255,7 +251,7 @@ mod tests { #[tokio::test] async fn seek_past_end_clamps() { let data = b"hello"; - let mut reader = ChaChaReader::new(encrypt(data), KEY).unwrap(); + let mut reader = ChaChaReaderv1::new(encrypt(data), KEY).unwrap(); let pos = reader.seek(SeekFrom::Start(9999)).await.unwrap(); assert_eq!(pos, data.len() as u64); diff --git a/crates/pile-io/src/lib.rs b/crates/pile-io/src/lib.rs index 8baf522..27eae54 100644 --- a/crates/pile-io/src/lib.rs +++ b/crates/pile-io/src/lib.rs @@ -4,14 +4,4 @@ pub use asyncreader::*; mod s3reader; pub use s3reader::*; -mod chachareader; -pub use chachareader::*; - -mod chachawriter; -pub use chachawriter::*; - -mod chachareader_async; -pub use chachareader_async::*; - -mod chachawriter_async; -pub use chachawriter_async::*; +pub mod chacha; diff --git a/crates/pile-io/src/s3reader.rs b/crates/pile-io/src/s3reader.rs index c2ce3e9..77cc871 100644 --- a/crates/pile-io/src/s3reader.rs +++ b/crates/pile-io/src/s3reader.rs @@ -1,10 +1,102 @@ +use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region}; use smartstring::{LazyCompact, SmartString}; -use std::{io::SeekFrom, sync::Arc}; +use std::{fmt::Debug, io::SeekFrom, sync::Arc}; use crate::{AsyncReader, AsyncSeekReader}; +// +// MARK: client +// + +/// An interface to an S3 bucket. +/// +/// TODO: S3 is slow and expensive. Ideally, we'll have this struct cache data +/// so we don't have to download anything twice. This is, however, complicated, +/// and doesn't fully solve the "expensive" problem. +pub struct S3Client { + pub client: aws_sdk_s3::Client, + bucket: SmartString, + + /// maximum number of bytes to use for cached data + cache_limit_bytes: usize, +} + +impl Debug for S3Client { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("S3Client") + .field("bucket", &self.bucket) + .field("cache_limit_bytes", &self.cache_limit_bytes) + .finish() + } +} + +impl S3Client { + pub async fn new( + bucket: &str, + endpoint: Option<&str>, + region: &str, + access_key_id: &str, + secret_access_key: &str, + cache_limit_bytes: usize, + ) -> Arc { + let client = { + let mut s3_config = aws_sdk_s3::config::Builder::new() + .behavior_version(BehaviorVersion::latest()) + .region(Region::new(region.to_owned())) + .credentials_provider(Credentials::new( + access_key_id, + secret_access_key, + None, + None, + "pile", + )); + + if let Some(ep) = endpoint { + s3_config = s3_config.endpoint_url(ep).force_path_style(true); + } + + aws_sdk_s3::Client::from_conf(s3_config.build()) + }; + + return Arc::new(Self { + bucket: bucket.into(), + client, + cache_limit_bytes, + }); + } + + pub fn bucket(&self) -> &str { + &self.bucket + } + + pub async fn get(self: &Arc, key: &str) -> Result { + let head = self + .client + .head_object() + .bucket(self.bucket.as_str()) + .key(key) + .send() + .await + .map_err(std::io::Error::other)?; + + let size = head.content_length().unwrap_or(0) as u64; + + Ok(S3Reader { + client: self.clone(), + bucket: self.bucket.clone(), + key: key.into(), + cursor: 0, + size, + }) + } +} + +// +// MARK: reader +// + pub struct S3Reader { - pub client: Arc, + pub client: Arc, pub bucket: SmartString, pub key: SmartString, pub cursor: u64, @@ -23,6 +115,7 @@ impl AsyncReader for S3Reader { let end_byte = start_byte + len_to_read - 1; let resp = self + .client .client .get_object() .bucket(self.bucket.as_str()) diff --git a/crates/pile-value/Cargo.toml b/crates/pile-value/Cargo.toml index 25fe015..2af5667 100644 --- a/crates/pile-value/Cargo.toml +++ b/crates/pile-value/Cargo.toml @@ -31,7 +31,6 @@ image = { workspace = true, optional = true } id3 = { workspace = true } tokio = { workspace = true } async-trait = { workspace = true } -aws-sdk-s3 = { workspace = true } mime = { workspace = true } mime_guess = { workspace = true } diff --git a/crates/pile-value/src/source/s3.rs b/crates/pile-value/src/source/s3.rs index 58e43d3..886ae67 100644 --- a/crates/pile-value/src/source/s3.rs +++ b/crates/pile-value/src/source/s3.rs @@ -1,9 +1,9 @@ -use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region}; use chrono::{DateTime, Utc}; use pile_config::{ - Label, S3Credentials, + Label, pattern::{GroupPattern, GroupSegment}, }; +use pile_io::S3Client; use smartstring::{LazyCompact, SmartString}; use std::{ collections::{HashMap, HashSet}, @@ -19,9 +19,9 @@ use crate::{ #[derive(Debug)] pub struct S3DataSource { pub name: Label, - pub bucket: SmartString, + pub client: Arc, + pub prefix: Option>, - pub client: Arc, pub pattern: GroupPattern, pub encryption_key: Option<[u8; 32]>, pub index: OnceLock, Item>>, @@ -30,40 +30,30 @@ pub struct S3DataSource { impl S3DataSource { pub async fn new( name: &Label, - bucket: String, - prefix: Option, - endpoint: Option, - region: String, - credentials: &S3Credentials, + bucket: &str, + prefix: Option<&str>, + endpoint: Option<&str>, + region: &str, + access_key_id: &str, + secret_access_key: &str, + cache_limit_bytes: usize, pattern: GroupPattern, encryption_key: Option<[u8; 32]>, ) -> Result, std::io::Error> { - let client = { - let creds = Credentials::new( - &credentials.access_key_id, - &credentials.secret_access_key, - None, - None, - "pile", - ); - - let mut s3_config = aws_sdk_s3::config::Builder::new() - .behavior_version(BehaviorVersion::latest()) - .region(Region::new(region)) - .credentials_provider(creds); - - if let Some(ep) = endpoint { - s3_config = s3_config.endpoint_url(ep).force_path_style(true); - } - - aws_sdk_s3::Client::from_conf(s3_config.build()) - }; + let client = S3Client::new( + bucket, + endpoint, + region, + access_key_id, + secret_access_key, + cache_limit_bytes, + ) + .await; let source = Arc::new(Self { name: name.clone(), - bucket: bucket.into(), + client, prefix: prefix.map(|x| x.into()), - client: Arc::new(client), pattern, encryption_key, index: OnceLock::new(), @@ -78,9 +68,10 @@ impl S3DataSource { loop { let mut req = source + .client .client .list_objects_v2() - .bucket(source.bucket.as_str()); + .bucket(source.client.bucket()); if let Some(prefix) = &source.prefix { req = req.prefix(prefix.as_str()); @@ -191,7 +182,11 @@ impl DataSource for Arc { let mut continuation_token: Option = None; loop { - let mut req = self.client.list_objects_v2().bucket(self.bucket.as_str()); + let mut req = self + .client + .client + .list_objects_v2() + .bucket(self.client.bucket()); if let Some(prefix) = &self.prefix { req = req.prefix(prefix.as_str()); diff --git a/crates/pile-value/src/value/item.rs b/crates/pile-value/src/value/item.rs index a49ecd1..281bdc3 100644 --- a/crates/pile-value/src/value/item.rs +++ b/crates/pile-value/src/value/item.rs @@ -1,6 +1,6 @@ use mime::Mime; use pile_config::Label; -use pile_io::{ChaChaReaderAsync, S3Reader, SyncReadBridge}; +use pile_io::{SyncReadBridge, chacha::ChaChaReaderv1Async}; use smartstring::{LazyCompact, SmartString}; use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc}; @@ -59,39 +59,13 @@ impl Item { } }; - let head = source - .client - .head_object() - .bucket(source.bucket.as_str()) - .key(full_key.as_str()) - .send() - .await - .map_err(std::io::Error::other)?; - - let size = head.content_length().unwrap_or(0) as u64; + let reader = source.client.get(&full_key).await?; match source.encryption_key { - None => ItemReader::S3(S3Reader { - client: source.client.clone(), - bucket: source.bucket.clone(), - key: full_key, - cursor: 0, - size, - }), - - Some(enc_key) => ItemReader::EncryptedS3( - ChaChaReaderAsync::new( - S3Reader { - client: source.client.clone(), - bucket: source.bucket.clone(), - key: full_key, - cursor: 0, - size, - }, - enc_key, - ) - .await?, - ), + None => ItemReader::S3(reader), + Some(enc_key) => { + ItemReader::EncryptedS3(ChaChaReaderv1Async::new(reader, enc_key).await?) + } } } }) diff --git a/crates/pile-value/src/value/readers.rs b/crates/pile-value/src/value/readers.rs index 4d3e3df..fbbe299 100644 --- a/crates/pile-value/src/value/readers.rs +++ b/crates/pile-value/src/value/readers.rs @@ -1,4 +1,4 @@ -use pile_io::{AsyncReader, AsyncSeekReader, ChaChaReaderAsync, S3Reader}; +use pile_io::{AsyncReader, AsyncSeekReader, S3Reader, chacha::ChaChaReaderv1Async}; use std::{fs::File, io::Seek}; // @@ -8,7 +8,7 @@ use std::{fs::File, io::Seek}; pub enum ItemReader { File(File), S3(S3Reader), - EncryptedS3(ChaChaReaderAsync), + EncryptedS3(ChaChaReaderv1Async), } impl AsyncReader for ItemReader { diff --git a/crates/pile/src/command/encrypt.rs b/crates/pile/src/command/encrypt.rs index 1746fb8..bc2a13c 100644 --- a/crates/pile/src/command/encrypt.rs +++ b/crates/pile/src/command/encrypt.rs @@ -1,6 +1,7 @@ use anyhow::{Context, Result}; use clap::Args; -use pile_io::{AsyncReader, ChaChaReader, ChaChaWriter}; +use pile_io::AsyncReader; +use pile_io::chacha::{ChaChaReaderv1, ChaChaWriterv1}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::source::string_to_key; use std::io::{Cursor, Write}; @@ -37,7 +38,7 @@ impl CliCmd for EncryptCommand { .await .with_context(|| format!("while reading '{}'", self.path.display()))?; - let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), key) + let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), key) .context("while initializing encryptor")?; writer.write_all(&plaintext).context("while encrypting")?; let buf = writer.finish().context("while finalizing encryptor")?; @@ -61,7 +62,7 @@ impl CliCmd for DecryptCommand { .await .with_context(|| format!("while reading '{}'", self.path.display()))?; - let mut reader = ChaChaReader::new(Cursor::new(ciphertext), key) + let mut reader = ChaChaReaderv1::new(Cursor::new(ciphertext), key) .context("while initializing decryptor")?; let plaintext = reader.read_to_end().await.context("while decrypting")?; diff --git a/crates/pile/src/command/upload.rs b/crates/pile/src/command/upload.rs index b31320c..a4b501a 100644 --- a/crates/pile/src/command/upload.rs +++ b/crates/pile/src/command/upload.rs @@ -4,7 +4,7 @@ use clap::Args; use indicatif::ProgressBar; use pile_config::Label; use pile_dataset::{Dataset, Datasets}; -use pile_io::ChaChaWriter; +use pile_io::chacha::ChaChaWriterv1; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::source::{DataSource, DirDataSource, S3DataSource, encrypt_path}; use std::{ @@ -71,12 +71,12 @@ impl CliCmd for UploadCommand { let bucket = self .bucket .as_deref() - .unwrap_or(s3_ds.bucket.as_str()) + .unwrap_or(s3_ds.client.bucket()) .to_owned(); let full_prefix = self.prefix.trim_matches('/').to_owned(); // Check for existing objects at the target prefix - let existing_keys = list_prefix(&s3_ds.client, &bucket, &full_prefix) + let existing_keys = list_prefix(&s3_ds.client.client, &bucket, &full_prefix) .await .context("while checking for existing objects at target prefix")?; @@ -89,6 +89,7 @@ impl CliCmd for UploadCommand { ); for key in &existing_keys { s3_ds + .client .client .delete_object() .bucket(&bucket) @@ -169,7 +170,7 @@ impl CliCmd for UploadCommand { tokio::task::spawn_blocking(move || -> anyhow::Result> { let plaintext = std::fs::read(&path) .with_context(|| format!("while opening '{}'", path.display()))?; - let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), enc_key) + let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), enc_key) .context("while initializing encryptor")?; writer.write_all(&plaintext).context("while encrypting")?; Ok(writer.finish().context("while finalizing")?.into_inner()) @@ -184,6 +185,7 @@ impl CliCmd for UploadCommand { }; client + .client .put_object() .bucket(&bucket) .key(&key) @@ -224,7 +226,7 @@ fn get_dir_source( label: &Label, name: &str, ) -> Result, anyhow::Error> { - match ds.sources.get(label) { + match ds.sources.get(label).or(ds.disabled_sources.get(label)) { Some(Dataset::Dir(d)) => Ok(Arc::clone(d)), Some(_) => Err(anyhow::anyhow!( "source '{name}' is not a filesystem source" @@ -240,7 +242,7 @@ fn get_s3_source( label: &Label, name: &str, ) -> Result, anyhow::Error> { - match ds.sources.get(label) { + match ds.sources.get(label).or(ds.disabled_sources.get(label)) { Some(Dataset::S3(s)) => Ok(Arc::clone(s)), Some(_) => Err(anyhow::anyhow!("source '{name}' is not an S3 source")), None => Err(anyhow::anyhow!("s3 source '{name}' not found in config")),