Add S3 encryption
All checks were successful
CI / Typos (push) Successful in 19s
CI / Build and test (push) Successful in 2m36s
CI / Clippy (push) Successful in 3m33s
CI / Build and test (all features) (push) Successful in 8m52s

This commit is contained in:
2026-03-21 21:03:52 -07:00
parent 39f3c7707b
commit 4737acbcf4
33 changed files with 1307 additions and 202 deletions

View File

@@ -1,11 +1,12 @@
use mime::Mime;
use pile_config::Label;
use pile_io::{ChaChaReaderAsync, S3Reader, SyncReadBridge};
use smartstring::{LazyCompact, SmartString};
use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
use crate::{
source::{DirDataSource, S3DataSource},
value::{ItemReader, S3Reader, SyncReadBridge},
source::{DirDataSource, S3DataSource, encrypt_path},
value::ItemReader,
};
//
@@ -40,13 +41,20 @@ impl Item {
Self::File { path, .. } => ItemReader::File(File::open(path)?),
Self::S3 { source, key, .. } => {
let logical_key = key.as_str();
let s3_key_part: SmartString<LazyCompact> = match &source.encryption_key {
None => logical_key.into(),
Some(enc_key) => encrypt_path(enc_key, logical_key).into(),
};
let full_key: SmartString<LazyCompact> = match &source.prefix {
None => key.clone(),
None => s3_key_part,
Some(p) => {
if p.ends_with('/') {
format!("{p}{key}").into()
format!("{p}{s3_key_part}").into()
} else {
format!("{p}/{key}").into()
format!("{p}/{s3_key_part}").into()
}
}
};
@@ -62,13 +70,29 @@ impl Item {
let size = head.content_length().unwrap_or(0) as u64;
ItemReader::S3(S3Reader {
client: source.client.clone(),
bucket: source.bucket.clone(),
key: full_key,
cursor: 0,
size,
})
match source.encryption_key {
None => ItemReader::S3(S3Reader {
client: source.client.clone(),
bucket: source.bucket.clone(),
key: full_key,
cursor: 0,
size,
}),
Some(enc_key) => ItemReader::EncryptedS3(
ChaChaReaderAsync::new(
S3Reader {
client: source.client.clone(),
bucket: source.bucket.clone(),
key: full_key,
cursor: 0,
size,
},
enc_key,
)
.await?,
),
}
}
})
}

View File

@@ -1,81 +1,5 @@
use smartstring::{LazyCompact, SmartString};
use std::{
fs::File,
io::{Read, Seek, SeekFrom},
sync::Arc,
};
use tokio::runtime::Handle;
//
// MARK: traits
//
pub trait AsyncReader: Send {
/// Read a chunk of bytes.
fn read(
&mut self,
buf: &mut [u8],
) -> impl Future<Output = Result<usize, std::io::Error>> + Send;
/// Read all remaining bytes into a `Vec`.
fn read_to_end(&mut self) -> impl Future<Output = Result<Vec<u8>, std::io::Error>> + Send {
async {
let mut buf = Vec::new();
let mut chunk = vec![0u8; 65536];
loop {
let n = self.read(&mut chunk).await?;
if n == 0 {
break;
}
buf.extend_from_slice(&chunk[..n]);
}
Ok(buf)
}
}
}
pub trait AsyncSeekReader: AsyncReader {
fn seek(&mut self, pos: SeekFrom) -> impl Future<Output = Result<u64, std::io::Error>> + Send;
}
//
// MARK: sync bridge
//
/// Turn an async [Reader] into a sync [Read] + [Seek].
///
/// Never use this outside of [tokio::task::spawn_blocking],
/// the async runtime will deadlock if this struct blocks
/// the runtime.
pub struct SyncReadBridge<R: AsyncReader> {
inner: R,
handle: Handle,
}
impl<R: AsyncReader> SyncReadBridge<R> {
/// Creates a new adapter using a handle to the current runtime.
/// Panics if called outside of tokio
pub fn new_current(inner: R) -> Self {
Self::new(inner, Handle::current())
}
/// Creates a new adapter using a handle to an existing runtime.
pub fn new(inner: R, handle: Handle) -> Self {
Self { inner, handle }
}
}
impl<R: AsyncReader> Read for SyncReadBridge<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.handle.block_on(self.inner.read(buf))
}
}
impl<R: AsyncReader + AsyncSeekReader> Seek for SyncReadBridge<R> {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
self.handle.block_on(self.inner.seek(pos))
}
}
use pile_io::{AsyncReader, AsyncSeekReader, ChaChaReaderAsync, S3Reader};
use std::{fs::File, io::Seek};
//
// MARK: itemreader
@@ -84,6 +8,7 @@ impl<R: AsyncReader + AsyncSeekReader> Seek for SyncReadBridge<R> {
pub enum ItemReader {
File(File),
S3(S3Reader),
EncryptedS3(ChaChaReaderAsync<S3Reader>),
}
impl AsyncReader for ItemReader {
@@ -91,6 +16,7 @@ impl AsyncReader for ItemReader {
match self {
Self::File(x) => std::io::Read::read(x, buf),
Self::S3(x) => x.read(buf).await,
Self::EncryptedS3(x) => x.read(buf).await,
}
}
}
@@ -100,94 +26,7 @@ impl AsyncSeekReader for ItemReader {
match self {
Self::File(x) => x.seek(pos),
Self::S3(x) => x.seek(pos).await,
Self::EncryptedS3(x) => x.seek(pos).await,
}
}
}
//
// MARK: S3Reader
//
pub struct S3Reader {
pub client: Arc<aws_sdk_s3::Client>,
pub bucket: SmartString<LazyCompact>,
pub key: SmartString<LazyCompact>,
pub cursor: u64,
pub size: u64,
}
impl AsyncReader for S3Reader {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
let len_left = self.size.saturating_sub(self.cursor);
if len_left == 0 || buf.is_empty() {
return Ok(0);
}
let start_byte = self.cursor;
let len_to_read = (buf.len() as u64).min(len_left);
let end_byte = start_byte + len_to_read - 1;
let resp = self
.client
.get_object()
.bucket(self.bucket.as_str())
.key(self.key.as_str())
.range(format!("bytes={start_byte}-{end_byte}"))
.send()
.await
.map_err(std::io::Error::other)?;
let bytes = resp
.body
.collect()
.await
.map(|x| x.into_bytes())
.map_err(std::io::Error::other)?;
let n = bytes.len().min(buf.len());
buf[..n].copy_from_slice(&bytes[..n]);
self.cursor += n as u64;
Ok(n)
}
}
impl AsyncSeekReader for S3Reader {
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
match pos {
SeekFrom::Start(x) => self.cursor = x.min(self.size),
SeekFrom::Current(x) => {
if x < 0 {
let abs = x.unsigned_abs();
if abs > self.cursor {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"cannot seek past start",
));
}
self.cursor -= abs;
} else {
self.cursor += x as u64;
}
}
std::io::SeekFrom::End(x) => {
if x < 0 {
let abs = x.unsigned_abs();
if abs > self.size {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"cannot seek past start",
));
}
self.cursor = self.size - abs;
} else {
self.cursor = self.size + x as u64;
}
}
}
self.cursor = self.cursor.min(self.size);
Ok(self.cursor)
}
}