Remove S3 + encryption
All checks were successful
CI / Typos (push) Successful in 20s
CI / Clippy (push) Successful in 2m44s
CI / Build and test (push) Successful in 3m10s
Docker / build-and-push (push) Successful in 5m6s
CI / Build and test (all features) (push) Successful in 6m51s

This commit is contained in:
2026-03-26 14:37:18 -07:00
parent ec7326a55e
commit 80f4ebdbe6
24 changed files with 42 additions and 2915 deletions

985
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -92,8 +92,6 @@ utoipa-swagger-ui = { version = "9.0.2", features = [
tokio = { version = "1.49.0", features = ["full"] } tokio = { version = "1.49.0", features = ["full"] }
tokio-stream = "0.1" tokio-stream = "0.1"
async-trait = "0.1" async-trait = "0.1"
aws-sdk-s3 = "1"
aws-config = "1"
# CLI & logging # CLI & logging
tracing = "0.1.44" tracing = "0.1.44"
@@ -112,8 +110,6 @@ toml = "1.0.3"
toml_edit = "0.25.4" toml_edit = "0.25.4"
sha2 = "0.11.0-rc.5" sha2 = "0.11.0-rc.5"
blake3 = "1.8.3" blake3 = "1.8.3"
chacha20poly1305 = "0.10.0"
binrw = "0.15.1"
# Extractors # Extractors
pdf = "0.10.0" pdf = "0.10.0"

View File

@@ -40,12 +40,6 @@ pub struct DatasetConfig {
pub source: HashMap<Label, Source>, pub source: HashMap<Label, Source>,
} }
#[derive(Debug, Clone, Deserialize)]
pub struct S3Credentials {
pub access_key_id: String,
pub secret_access_key: String,
}
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type")] #[serde(tag = "type")]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
@@ -64,30 +58,6 @@ pub enum Source {
#[serde(default)] #[serde(default)]
pattern: GroupPattern, pattern: GroupPattern,
}, },
/// An S3-compatible object store bucket
S3 {
/// If false, ignore this dataset
#[serde(default = "default_true")]
enabled: bool,
bucket: String,
prefix: Option<String>,
/// Custom endpoint URL (for MinIO, etc.)
endpoint: Option<String>,
region: String,
credentials: S3Credentials,
/// How to group files into items in this source
#[serde(default)]
pattern: GroupPattern,
/// If provided, assume objects are encrypted with this secret key.
encryption_key: Option<String>,
},
} }
// //

View File

@@ -5,7 +5,7 @@ use pile_config::{
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::{ use pile_value::{
extract::traits::ExtractState, extract::traits::ExtractState,
source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest, string_to_key}, source::{DataSource, DirDataSource, misc::path_ts_earliest},
value::{Item, PileValue}, value::{Item, PileValue},
}; };
use serde_json::Value; use serde_json::Value;
@@ -33,31 +33,27 @@ pub enum DatasetError {
// MARK: Dataset enum // MARK: Dataset enum
// //
/// An opened data source — either a local filesystem directory or an S3 bucket. /// An opened data source
pub enum Dataset { pub enum Dataset {
Dir(Arc<DirDataSource>), Dir(Arc<DirDataSource>),
S3(Arc<S3DataSource>),
} }
impl Dataset { impl Dataset {
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
match self { match self {
Self::Dir(ds) => ds.len(), Self::Dir(ds) => ds.len(),
Self::S3(ds) => ds.len(),
} }
} }
pub async fn get(&self, key: &str) -> Option<Item> { pub async fn get(&self, key: &str) -> Option<Item> {
match self { match self {
Self::Dir(ds) => ds.get(key).await.ok().flatten(), Self::Dir(ds) => ds.get(key).await.ok().flatten(),
Self::S3(ds) => ds.get(key).await.ok().flatten(),
} }
} }
pub fn iter(&self) -> Box<dyn Iterator<Item = &Item> + Send + '_> { pub fn iter(&self) -> Box<dyn Iterator<Item = &Item> + Send + '_> {
match self { match self {
Self::Dir(ds) => Box::new(ds.iter()), Self::Dir(ds) => Box::new(ds.iter()),
Self::S3(ds) => Box::new(ds.iter()),
} }
} }
@@ -68,14 +64,12 @@ impl Dataset {
) -> Box<dyn Iterator<Item = &Item> + Send + '_> { ) -> Box<dyn Iterator<Item = &Item> + Send + '_> {
match self { match self {
Self::Dir(ds) => Box::new(ds.iter_page(offset, limit)), Self::Dir(ds) => Box::new(ds.iter_page(offset, limit)),
Self::S3(ds) => Box::new(ds.iter_page(offset, limit)),
} }
} }
pub async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> { pub async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
match self { match self {
Self::Dir(ds) => ds.latest_change().await, Self::Dir(ds) => ds.latest_change().await,
Self::S3(ds) => ds.latest_change().await,
} }
} }
} }
@@ -148,46 +142,6 @@ impl Datasets {
), ),
); );
} }
Source::S3 {
enabled,
bucket,
prefix,
endpoint,
region,
credentials,
pattern,
encryption_key,
} => {
let target = match enabled {
true => &mut sources,
false => &mut disabled_sources,
};
let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x));
match S3DataSource::new(
label,
bucket,
prefix.as_ref().map(|x| x.as_str()),
endpoint.as_ref().map(|x| x.as_str()),
region,
&credentials.access_key_id,
&credentials.secret_access_key,
10_000_000,
pattern.clone(),
encryption_key,
)
.await
{
Ok(ds) => {
target.insert(label.clone(), Dataset::S3(ds));
}
Err(err) => {
warn!("Could not open S3 source {label}: {err}");
}
}
}
} }
} }
@@ -259,46 +213,6 @@ impl Datasets {
), ),
); );
} }
Source::S3 {
enabled,
bucket,
prefix,
endpoint,
region,
credentials,
pattern,
encryption_key,
} => {
let target = match enabled {
true => &mut sources,
false => &mut disabled_sources,
};
let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x));
match S3DataSource::new(
label,
bucket,
prefix.as_ref().map(|x| x.as_str()),
endpoint.as_ref().map(|x| x.as_str()),
region,
&credentials.access_key_id,
&credentials.secret_access_key,
10_000_000,
pattern.clone(),
encryption_key,
)
.await
{
Ok(ds) => {
target.insert(label.clone(), Dataset::S3(ds));
}
Err(err) => {
warn!("Could not open S3 source {label}: {err}");
}
}
}
} }
} }

View File

@@ -9,7 +9,3 @@ workspace = true
[dependencies] [dependencies]
tokio = { workspace = true } tokio = { workspace = true }
smartstring = { workspace = true }
aws-sdk-s3 = { workspace = true }
chacha20poly1305 = { workspace = true }
binrw = { workspace = true }

View File

@@ -1,50 +0,0 @@
use binrw::{binrw, meta::ReadMagic};
/// Header at the start of every encrypted file: magic, config, then the
/// total plaintext length (patched in by the writer on `finish`).
#[binrw]
#[brw(little, magic = b"PileChaChav1")]
#[derive(Debug, Clone, Copy)]
pub struct ChaChaHeaderv1 {
    pub config: ChaChaConfigv1,
    pub plaintext_size: u64,
}
impl ChaChaHeaderv1 {
    /// Serialized header size in bytes: magic + config fields + plaintext_size (u64).
    pub const SIZE: usize = ChaChaHeaderv1::MAGIC.len() + std::mem::size_of::<ChaChaConfigv1>() + 8;
}
/// Sanity check: the declared serialized `SIZE` must equal the in-memory
/// struct size plus the magic bytes (all fields are u64s, so there is no
/// padding and `size_of` matches the serialized field bytes).
#[test]
fn chachaheader_size() {
    // Bug fix: the original subtracted MAGIC.len(), but SIZE *includes* the
    // magic (44 = 12 magic + 24 config + 8 size), while size_of::<Header>()
    // excludes it (32) — so the magic length must be added, not subtracted.
    assert_eq!(
        ChaChaHeaderv1::SIZE,
        std::mem::size_of::<ChaChaHeaderv1>() + ChaChaHeaderv1::MAGIC.len()
    )
}
//
// MARK: config
//
/// On-disk parameters of the ChaCha container, serialized little-endian
/// immediately after the magic.
#[binrw]
#[brw(little)]
#[derive(Debug, Clone, Copy)]
pub struct ChaChaConfigv1 {
    /// Plaintext bytes per chunk.
    pub chunk_size: u64,
    /// Nonce length in bytes stored before each encrypted chunk.
    pub nonce_size: u64,
    /// Authentication tag length in bytes appended to each chunk.
    pub tag_size: u64,
}
impl Default for ChaChaConfigv1 {
fn default() -> Self {
Self {
chunk_size: 64 * 1024,
nonce_size: 24,
tag_size: 16,
}
}
}
impl ChaChaConfigv1 {
    /// Size of one encrypted chunk on disk: the plaintext chunk plus its
    /// per-chunk nonce and authentication tag.
    pub(crate) fn enc_chunk_size(&self) -> u64 {
        let per_chunk_overhead = self.nonce_size + self.tag_size;
        self.chunk_size + per_chunk_overhead
    }
}

View File

@@ -1,9 +0,0 @@
// Streaming XChaCha20-Poly1305 container: sync and async readers/writers.
mod reader;
mod reader_async;
mod writer;
mod writer_async;
pub use {reader::*, reader_async::*, writer::*, writer_async::*};
// On-disk header/config types shared by the readers and writers above.
mod format;
pub use format::*;

View File

@@ -1,151 +0,0 @@
use std::io::{Read, Seek, SeekFrom};
use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaHeaderv1};
//
// MARK: reader
//
/// Synchronous, seekable reader that decrypts a ChaCha v1 container on the fly.
pub struct ChaChaReaderv1<R: Read + Seek> {
    /// Underlying encrypted stream: header followed by fixed-size encrypted chunks.
    inner: R,
    /// Parsed container header (chunk/nonce/tag sizes, plaintext length).
    header: ChaChaHeaderv1,
    /// Byte offset in `inner` where the first encrypted chunk starts.
    data_offset: u64,
    /// 32-byte key used to decrypt chunks.
    encryption_key: [u8; 32],
    /// Current read position in *plaintext* coordinates.
    cursor: u64,
    /// Total plaintext length, copied from the header.
    plaintext_size: u64,
    /// Most recently decrypted chunk: (chunk index, plaintext bytes).
    cached_chunk: Option<(u64, Vec<u8>)>,
}
impl<R: Read + Seek> ChaChaReaderv1<R> {
    /// Parse the header at the start of `inner` and build a reader.
    ///
    /// # Errors
    /// Fails on I/O errors or if the magic/header cannot be parsed.
    pub fn new(mut inner: R, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
        use binrw::BinReaderExt;
        inner.seek(SeekFrom::Start(0))?;
        let header: ChaChaHeaderv1 = inner.read_le().map_err(std::io::Error::other)?;
        // Chunks begin immediately after the serialized header.
        let data_offset = inner.stream_position()?;
        Ok(Self {
            inner,
            header,
            data_offset,
            encryption_key,
            cursor: 0,
            plaintext_size: header.plaintext_size,
            cached_chunk: None,
        })
    }
    /// Read and decrypt chunk `chunk_index` into `cached_chunk`.
    fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> {
        use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
        // Encrypted chunks are fixed-size, so chunk offsets are computable.
        let enc_start = self.data_offset + chunk_index * self.header.config.enc_chunk_size();
        self.inner.seek(SeekFrom::Start(enc_start))?;
        let mut encrypted = vec![0u8; self.header.config.enc_chunk_size() as usize];
        // The final chunk may be shorter than enc_chunk_size.
        let n = self.read_exact_or_eof(&mut encrypted)?;
        encrypted.truncate(n);
        // A valid chunk must carry at least a nonce and an auth tag.
        if encrypted.len() < (self.header.config.nonce_size + self.header.config.tag_size) as usize
        {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "encrypted chunk too short",
            ));
        }
        // Chunk layout: nonce || ciphertext+tag.
        let (nonce_bytes, ciphertext) = encrypted.split_at(self.header.config.nonce_size as usize);
        let nonce = XNonce::from_slice(nonce_bytes);
        let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
        let cipher = XChaCha20Poly1305::new(key);
        let plaintext = cipher.decrypt(nonce, ciphertext).map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::InvalidData, "decryption failed")
        })?;
        self.cached_chunk = Some((chunk_index, plaintext));
        Ok(())
    }
    /// Read until `buf` is full or EOF; returns the bytes actually read.
    fn read_exact_or_eof(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        let mut total = 0;
        while total < buf.len() {
            match self.inner.read(&mut buf[total..])? {
                0 => break,
                n => total += n,
            }
        }
        Ok(total)
    }
}
impl<R: Read + Seek + Send> AsyncReader for ChaChaReaderv1<R> {
    /// Decrypt-on-demand read: returns at most the remainder of the chunk
    /// containing the cursor; callers loop to read more.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        let remaining = self.plaintext_size.saturating_sub(self.cursor);
        if remaining == 0 || buf.is_empty() {
            return Ok(0);
        }
        // Decrypt the chunk under the cursor unless it is already cached.
        let chunk_index = self.cursor / self.header.config.chunk_size;
        let need_fetch = match &self.cached_chunk {
            None => true,
            Some((idx, _)) => *idx != chunk_index,
        };
        if need_fetch {
            self.fetch_chunk(chunk_index)?;
        }
        // fetch_chunk just populated the cache (or it already matched).
        #[expect(clippy::unwrap_used)]
        let (_, chunk_data) = self.cached_chunk.as_ref().unwrap();
        let offset_in_chunk = (self.cursor % self.header.config.chunk_size) as usize;
        let available = chunk_data.len() - offset_in_chunk;
        let to_copy = available.min(buf.len());
        buf[..to_copy].copy_from_slice(&chunk_data[offset_in_chunk..offset_in_chunk + to_copy]);
        self.cursor += to_copy as u64;
        Ok(to_copy)
    }
}
impl<R: Read + Seek + Send> AsyncSeekReader for ChaChaReaderv1<R> {
    /// Seek in *plaintext* coordinates. Positions past EOF clamp to EOF;
    /// seeking before the start is an `InvalidInput` error. Only the cursor
    /// moves — no I/O or decryption happens here.
    async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
        match pos {
            SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size),
            SeekFrom::Current(x) => {
                if x < 0 {
                    let abs = x.unsigned_abs();
                    if abs > self.cursor {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::InvalidInput,
                            "cannot seek past start",
                        ));
                    }
                    self.cursor -= abs;
                } else {
                    self.cursor += x as u64;
                }
            }
            SeekFrom::End(x) => {
                if x < 0 {
                    let abs = x.unsigned_abs();
                    if abs > self.plaintext_size {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::InvalidInput,
                            "cannot seek past start",
                        ));
                    }
                    self.cursor = self.plaintext_size - abs;
                } else {
                    self.cursor = self.plaintext_size + x as u64;
                }
            }
        }
        // Clamp forward seeks that went past EOF.
        self.cursor = self.cursor.min(self.plaintext_size);
        Ok(self.cursor)
    }
}

View File

@@ -1,165 +0,0 @@
use std::io::SeekFrom;
use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaHeaderv1};
/// Async, seekable reader that decrypts a ChaCha v1 container on the fly.
pub struct ChaChaReaderv1Async<R: AsyncSeekReader> {
    /// Underlying encrypted stream: header followed by fixed-size encrypted chunks.
    inner: R,
    /// Parsed container header (chunk/nonce/tag sizes, plaintext length).
    header: ChaChaHeaderv1,
    /// Byte offset in `inner` where the first encrypted chunk starts.
    data_offset: u64,
    /// 32-byte key used to decrypt chunks.
    encryption_key: [u8; 32],
    /// Current read position in *plaintext* coordinates.
    cursor: u64,
    /// Total plaintext length, copied from the header.
    plaintext_size: u64,
    /// Most recently decrypted chunk: (chunk index, plaintext bytes).
    cached_chunk: Option<(u64, Vec<u8>)>,
}
impl<R: AsyncSeekReader> ChaChaReaderv1Async<R> {
    /// Parse the fixed-size header at the start of `inner` and build a reader.
    ///
    /// # Errors
    /// Fails on I/O errors, a short header read, or an unparsable header.
    pub async fn new(mut inner: R, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
        use binrw::BinReaderExt;
        use std::io::Cursor;
        inner.seek(SeekFrom::Start(0)).await?;
        // Read exactly ChaChaHeaderv1::SIZE bytes, then parse them in memory.
        let mut buf = [0u8; ChaChaHeaderv1::SIZE];
        read_exact(&mut inner, &mut buf).await?;
        let header: ChaChaHeaderv1 = Cursor::new(&buf[..])
            .read_le()
            .map_err(std::io::Error::other)?;
        Ok(Self {
            inner,
            header,
            // Chunks begin immediately after the fixed-size header.
            data_offset: buf.len() as u64,
            encryption_key,
            cursor: 0,
            plaintext_size: header.plaintext_size,
            cached_chunk: None,
        })
    }
    /// Read and decrypt chunk `chunk_index` into `cached_chunk`.
    async fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> {
        use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
        // Encrypted chunks are fixed-size, so chunk offsets are computable.
        let enc_start = self.data_offset + chunk_index * self.header.config.enc_chunk_size();
        self.inner.seek(SeekFrom::Start(enc_start)).await?;
        let mut encrypted = vec![0u8; self.header.config.enc_chunk_size() as usize];
        // The final chunk may be shorter than enc_chunk_size.
        let n = read_exact_or_eof(&mut self.inner, &mut encrypted).await?;
        encrypted.truncate(n);
        // A valid chunk must carry at least a nonce and an auth tag.
        if encrypted.len() < (self.header.config.nonce_size + self.header.config.tag_size) as usize
        {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "encrypted chunk too short",
            ));
        }
        // Chunk layout: nonce || ciphertext+tag.
        let (nonce_bytes, ciphertext) = encrypted.split_at(self.header.config.nonce_size as usize);
        let nonce = XNonce::from_slice(nonce_bytes);
        let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
        let cipher = XChaCha20Poly1305::new(key);
        let plaintext = cipher.decrypt(nonce, ciphertext).map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::InvalidData, "decryption failed")
        })?;
        self.cached_chunk = Some((chunk_index, plaintext));
        Ok(())
    }
}
/// Fill `buf` completely, failing with `UnexpectedEof` if the stream ends early.
async fn read_exact<R: AsyncReader>(inner: &mut R, buf: &mut [u8]) -> Result<(), std::io::Error> {
    let filled = read_exact_or_eof(inner, buf).await?;
    if filled == buf.len() {
        Ok(())
    } else {
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "unexpected EOF reading header",
        ))
    }
}
/// Read until `buf` is full or the stream hits EOF; returns the number of
/// bytes actually read (which may be less than `buf.len()` only at EOF).
async fn read_exact_or_eof<R: AsyncReader>(
    inner: &mut R,
    buf: &mut [u8],
) -> Result<usize, std::io::Error> {
    let mut filled = 0;
    loop {
        if filled == buf.len() {
            return Ok(filled);
        }
        let n = inner.read(&mut buf[filled..]).await?;
        if n == 0 {
            return Ok(filled);
        }
        filled += n;
    }
}
impl<R: AsyncSeekReader> AsyncReader for ChaChaReaderv1Async<R> {
    /// Decrypt-on-demand read: returns at most the remainder of the chunk
    /// containing the cursor; callers loop to read more.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        let remaining = self.plaintext_size.saturating_sub(self.cursor);
        if remaining == 0 || buf.is_empty() {
            return Ok(0);
        }
        // Decrypt the chunk under the cursor unless it is already cached.
        let chunk_index = self.cursor / self.header.config.chunk_size;
        let need_fetch = match &self.cached_chunk {
            None => true,
            Some((idx, _)) => *idx != chunk_index,
        };
        if need_fetch {
            self.fetch_chunk(chunk_index).await?;
        }
        // fetch_chunk just populated the cache (or it already matched).
        #[expect(clippy::unwrap_used)]
        let (_, chunk_data) = self.cached_chunk.as_ref().unwrap();
        let offset_in_chunk = (self.cursor % self.header.config.chunk_size) as usize;
        let available = chunk_data.len() - offset_in_chunk;
        let to_copy = available.min(buf.len());
        buf[..to_copy].copy_from_slice(&chunk_data[offset_in_chunk..offset_in_chunk + to_copy]);
        self.cursor += to_copy as u64;
        Ok(to_copy)
    }
}
impl<R: AsyncSeekReader> AsyncSeekReader for ChaChaReaderv1Async<R> {
    /// Seek in *plaintext* coordinates. Positions past EOF clamp to EOF;
    /// seeking before the start is an `InvalidInput` error. Only the cursor
    /// moves — no I/O or decryption happens here.
    async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
        match pos {
            SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size),
            SeekFrom::Current(x) => {
                if x < 0 {
                    let abs = x.unsigned_abs();
                    if abs > self.cursor {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::InvalidInput,
                            "cannot seek past start",
                        ));
                    }
                    self.cursor -= abs;
                } else {
                    self.cursor += x as u64;
                }
            }
            SeekFrom::End(x) => {
                if x < 0 {
                    let abs = x.unsigned_abs();
                    if abs > self.plaintext_size {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::InvalidInput,
                            "cannot seek past start",
                        ));
                    }
                    self.cursor = self.plaintext_size - abs;
                } else {
                    self.cursor = self.plaintext_size + x as u64;
                }
            }
        }
        // Clamp forward seeks that went past EOF.
        self.cursor = self.cursor.min(self.plaintext_size);
        Ok(self.cursor)
    }
}

View File

@@ -1,91 +0,0 @@
use std::io::SeekFrom;
use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use crate::chacha::{ChaChaConfigv1, ChaChaHeaderv1};
/// Async streaming encryptor: writes a placeholder header, then fixed-size
/// encrypted chunks; `finish` patches the real plaintext size back in.
pub struct ChaChaWriterAsync<W: AsyncWrite + AsyncSeek + Unpin + Send> {
    /// Destination for the header and encrypted chunks.
    inner: W,
    /// Header written at position 0; `plaintext_size` is patched by `finish`.
    header: ChaChaHeaderv1,
    /// 32-byte key used to encrypt chunks.
    encryption_key: [u8; 32],
    /// Plaintext accepted but not yet encrypted (less than one full chunk).
    buffer: Vec<u8>,
    /// Total plaintext bytes accepted so far.
    plaintext_bytes_written: u64,
}
impl<W: AsyncWrite + AsyncSeek + Unpin + Send> ChaChaWriterAsync<W> {
    /// Write a placeholder header (`plaintext_size = 0`) and start buffering.
    /// The real size is patched in by [`finish`](Self::finish).
    pub async fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
        let header = ChaChaHeaderv1 {
            config: ChaChaConfigv1::default(),
            plaintext_size: 0,
        };
        inner.write_all(&serialize_header(header)?).await?;
        Ok(Self {
            inner,
            header,
            encryption_key,
            buffer: Vec::new(),
            plaintext_bytes_written: 0,
        })
    }
    /// Buffer `buf`, encrypting and writing every completed fixed-size chunk.
    pub async fn write(&mut self, buf: &[u8]) -> Result<(), std::io::Error> {
        self.buffer.extend_from_slice(buf);
        self.plaintext_bytes_written += buf.len() as u64;
        let chunk_size = self.header.config.chunk_size as usize;
        while self.buffer.len() >= chunk_size {
            let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer[..chunk_size])?;
            self.inner.write_all(&encrypted).await?;
            self.buffer.drain(..chunk_size);
        }
        Ok(())
    }
    /// Encrypt and write any buffered plaintext, patch the header with the
    /// final `plaintext_size`, then return the inner writer.
    ///
    /// NOTE(review): the returned writer is left positioned just after the
    /// header and is not flushed here — confirm callers flush/shutdown it.
    pub async fn finish(mut self) -> Result<W, std::io::Error> {
        if !self.buffer.is_empty() {
            // Trailing partial chunk.
            let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer)?;
            self.inner.write_all(&encrypted).await?;
        }
        // Rewind and overwrite the placeholder header with the real size.
        self.inner.seek(SeekFrom::Start(0)).await?;
        let header_bytes = serialize_header(ChaChaHeaderv1 {
            config: self.header.config,
            plaintext_size: self.plaintext_bytes_written,
        })?;
        self.inner.write_all(&header_bytes).await?;
        Ok(self.inner)
    }
}
/// Encrypt one plaintext chunk with a freshly generated random nonce,
/// returning the on-disk frame `nonce || ciphertext+tag`.
fn encrypt_chunk(key: &[u8; 32], plaintext: &[u8]) -> Result<Vec<u8>, std::io::Error> {
    use chacha20poly1305::{
        XChaCha20Poly1305,
        aead::{Aead, AeadCore, KeyInit, OsRng},
    };
    let cipher = XChaCha20Poly1305::new(chacha20poly1305::Key::from_slice(key));
    let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng);
    let sealed = cipher
        .encrypt(&nonce, plaintext)
        .map_err(|_| std::io::Error::other("encryption failed"))?;
    let mut framed = Vec::with_capacity(nonce.len() + sealed.len());
    framed.extend_from_slice(&nonce);
    framed.extend_from_slice(&sealed);
    Ok(framed)
}
fn serialize_header(header: ChaChaHeaderv1) -> Result<Vec<u8>, std::io::Error> {
use binrw::BinWriterExt;
use std::io::Cursor;
let mut buf = Cursor::new(Vec::new());
buf.write_le(&header).map_err(std::io::Error::other)?;
Ok(buf.into_inner())
}

View File

@@ -1,260 +0,0 @@
use std::io::{Seek, SeekFrom, Write};
use crate::chacha::{ChaChaConfigv1, ChaChaHeaderv1};
/// Generate a random 32-byte encryption key suitable for use with [`ChaChaWriter`].
pub fn generate_key() -> [u8; 32] {
    use chacha20poly1305::{KeyInit, XChaCha20Poly1305, aead::OsRng};
    let key = XChaCha20Poly1305::generate_key(&mut OsRng);
    key.into()
}
/// Synchronous streaming encryptor implementing [`Write`]: writes a
/// placeholder header, then fixed-size encrypted chunks; `finish` patches
/// the real plaintext size back in.
pub struct ChaChaWriterv1<W: Write + Seek> {
    /// Destination for the header and encrypted chunks.
    inner: W,
    /// Header written at position 0; `plaintext_size` is patched by `finish`.
    header: ChaChaHeaderv1,
    /// 32-byte key used to encrypt chunks.
    encryption_key: [u8; 32],
    /// Plaintext accepted but not yet encrypted (less than one full chunk).
    buffer: Vec<u8>,
    /// Total plaintext bytes accepted so far.
    plaintext_bytes_written: u64,
}
impl<W: Write + Seek> ChaChaWriterv1<W> {
    /// Write a placeholder header (`plaintext_size = 0`) and start buffering.
    /// The real size is patched in by [`finish`](Self::finish).
    pub fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
        use binrw::BinWriterExt;
        let header = ChaChaHeaderv1 {
            config: ChaChaConfigv1::default(),
            plaintext_size: 0,
        };
        inner.write_le(&header).map_err(std::io::Error::other)?;
        Ok(Self {
            inner,
            header,
            encryption_key,
            buffer: Vec::new(),
            plaintext_bytes_written: 0,
        })
    }
    /// Encrypt and write any buffered plaintext, patch the header with the
    /// final `plaintext_size`, then return the inner writer.
    pub fn finish(mut self) -> Result<W, std::io::Error> {
        use binrw::BinWriterExt;
        self.flush_buffer()?;
        // Rewind and overwrite the placeholder header with the real size.
        self.inner.seek(SeekFrom::Start(0))?;
        let header = ChaChaHeaderv1 {
            config: self.header.config,
            plaintext_size: self.plaintext_bytes_written,
        };
        self.inner
            .write_le(&header)
            .map_err(std::io::Error::other)?;
        Ok(self.inner)
    }
    /// Encrypt one chunk with a fresh random nonce: output is nonce || ciphertext+tag.
    fn encrypt_chunk(&self, plaintext: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        use chacha20poly1305::{
            XChaCha20Poly1305,
            aead::{Aead, AeadCore, KeyInit, OsRng},
        };
        let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng);
        let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
        let cipher = XChaCha20Poly1305::new(key);
        let ciphertext = cipher
            .encrypt(&nonce, plaintext)
            .map_err(|_| std::io::Error::other("encryption failed"))?;
        let mut output = Vec::with_capacity(nonce.len() + ciphertext.len());
        output.extend_from_slice(&nonce);
        output.extend_from_slice(&ciphertext);
        Ok(output)
    }
    /// Encrypt whatever plaintext is buffered (possibly a short chunk) and write it.
    fn flush_buffer(&mut self) -> Result<(), std::io::Error> {
        if !self.buffer.is_empty() {
            let encrypted = self.encrypt_chunk(&self.buffer)?;
            self.inner.write_all(&encrypted)?;
            self.buffer.clear();
        }
        Ok(())
    }
}
impl<W: Write + Seek> Write for ChaChaWriterv1<W> {
    /// Buffer `buf`, encrypting and writing every completed fixed-size chunk.
    /// Always reports the full `buf.len()` as consumed.
    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
        self.buffer.extend_from_slice(buf);
        self.plaintext_bytes_written += buf.len() as u64;
        let chunk_size = self.header.config.chunk_size as usize;
        while self.buffer.len() >= chunk_size {
            let encrypted = self.encrypt_chunk(&self.buffer[..chunk_size])?;
            self.inner.write_all(&encrypted)?;
            self.buffer.drain(..chunk_size);
        }
        Ok(buf.len())
    }
    /// Encrypts and flushes any buffered plaintext as a partial chunk.
    ///
    /// Prefer [`finish`](Self::finish) to retrieve the inner writer after
    /// all data has been written. Calling `flush` multiple times will produce
    /// multiple small encrypted chunks for the same partial data.
    fn flush(&mut self) -> Result<(), std::io::Error> {
        self.flush_buffer()?;
        self.inner.flush()
    }
}
// Round-trip, key-mismatch, and seek tests for the sync writer + reader pair.
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
    use std::io::{Cursor, SeekFrom, Write};
    use super::ChaChaWriterv1;
    use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaReaderv1};
    // Fixed key so tests are deterministic apart from the random nonces.
    const KEY: [u8; 32] = [42u8; 32];
    /// Encrypt `data` into an in-memory buffer, rewound and ready for reading.
    fn encrypt(data: &[u8]) -> Cursor<Vec<u8>> {
        let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), KEY).unwrap();
        writer.write_all(data).unwrap();
        let mut buf = writer.finish().unwrap();
        buf.set_position(0);
        buf
    }
    /// Decrypt the whole buffer back into plaintext.
    async fn decrypt_all(buf: Cursor<Vec<u8>>) -> Vec<u8> {
        let mut reader = ChaChaReaderv1::new(buf, KEY).unwrap();
        reader.read_to_end().await.unwrap()
    }
    #[tokio::test]
    async fn roundtrip_empty() {
        let buf = encrypt(&[]);
        // Header present but no chunks
        assert!(!buf.get_ref().is_empty());
        assert!(decrypt_all(buf).await.is_empty());
    }
    #[tokio::test]
    async fn roundtrip_small() {
        let data = b"hello, world!";
        assert_eq!(decrypt_all(encrypt(data)).await, data);
    }
    #[tokio::test]
    async fn roundtrip_exact_chunk() {
        // Exactly one default-sized chunk (64 KiB).
        let data = vec![0xABu8; 65536];
        assert_eq!(decrypt_all(encrypt(&data)).await, data);
    }
    #[tokio::test]
    async fn roundtrip_multi_chunk() {
        // 2.5 chunks
        let data: Vec<u8> = (0u8..=255).cycle().take(65536 * 2 + 1000).collect();
        assert_eq!(decrypt_all(encrypt(&data)).await, data);
    }
    #[tokio::test]
    async fn roundtrip_incremental_writes() {
        // Write one byte at a time
        let data: Vec<u8> = (0u8..200).collect();
        let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), KEY).unwrap();
        for byte in &data {
            writer.write_all(&[*byte]).unwrap();
        }
        let mut buf = writer.finish().unwrap();
        buf.set_position(0);
        assert_eq!(decrypt_all(buf).await, data);
    }
    #[tokio::test]
    async fn wrong_key_fails() {
        // Poly1305 tag verification must reject a mismatched key.
        let buf = encrypt(b"secret data");
        let mut reader = ChaChaReaderv1::new(buf, [0u8; 32]).unwrap();
        assert!(reader.read_to_end().await.is_err());
    }
    #[tokio::test]
    async fn header_magic_checked() {
        // Corrupt the magic bytes — reader should fail
        let mut buf = encrypt(b"data");
        buf.get_mut()[0] = 0xFF;
        buf.set_position(0);
        assert!(ChaChaReaderv1::new(buf, KEY).is_err());
    }
    #[tokio::test]
    async fn seek_from_start() {
        let data: Vec<u8> = (0u8..100).collect();
        let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
        reader.seek(SeekFrom::Start(50)).await.unwrap();
        let mut buf = [0u8; 10];
        let mut read = 0;
        while read < buf.len() {
            read += reader.read(&mut buf[read..]).await.unwrap();
        }
        assert_eq!(buf, data[50..60]);
    }
    #[tokio::test]
    async fn seek_from_end() {
        let data: Vec<u8> = (0u8..100).collect();
        let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
        reader.seek(SeekFrom::End(-10)).await.unwrap();
        assert_eq!(reader.read_to_end().await.unwrap(), &data[90..]);
    }
    #[tokio::test]
    async fn seek_across_chunk_boundary() {
        // Seek to 6 bytes before the end of chunk 0, read 12 bytes spanning into chunk 1
        let data: Vec<u8> = (0u8..=255).cycle().take(65536 + 500).collect();
        let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
        reader.seek(SeekFrom::Start(65530)).await.unwrap();
        let mut buf = vec![0u8; 12];
        let mut read = 0;
        while read < buf.len() {
            read += reader.read(&mut buf[read..]).await.unwrap();
        }
        assert_eq!(buf, data[65530..65542]);
    }
    #[tokio::test]
    async fn seek_current() {
        let data: Vec<u8> = (0u8..=255).cycle().take(200).collect();
        let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
        // Read 10, seek back 5, read 5 — should get bytes 5..10
        let mut first = [0u8; 10];
        let mut n = 0;
        while n < first.len() {
            n += reader.read(&mut first[n..]).await.unwrap();
        }
        reader.seek(SeekFrom::Current(-5)).await.unwrap();
        let mut second = [0u8; 5];
        n = 0;
        while n < second.len() {
            n += reader.read(&mut second[n..]).await.unwrap();
        }
        assert_eq!(second, data[5..10]);
    }
    #[tokio::test]
    async fn seek_past_end_clamps() {
        // Seeks beyond EOF clamp to EOF rather than erroring.
        let data = b"hello";
        let mut reader = ChaChaReaderv1::new(encrypt(data), KEY).unwrap();
        let pos = reader.seek(SeekFrom::Start(9999)).await.unwrap();
        assert_eq!(pos, data.len() as u64);
        assert_eq!(reader.read_to_end().await.unwrap(), b"");
    }
}

View File

@@ -1,7 +1,2 @@
mod asyncreader; mod asyncreader;
pub use asyncreader::*; pub use asyncreader::*;
mod s3reader;
pub use s3reader::*;
pub mod chacha;

View File

@@ -1,181 +0,0 @@
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use smartstring::{LazyCompact, SmartString};
use std::{fmt::Debug, io::SeekFrom, sync::Arc};
use crate::{AsyncReader, AsyncSeekReader};
//
// MARK: client
//
/// An interface to an S3 bucket.
///
/// TODO: S3 is slow and expensive. Ideally, we'll have this struct cache data
/// so we don't have to download anything twice. This is, however, complicated,
/// and doesn't fully solve the "expensive" problem.
pub struct S3Client {
    /// Underlying AWS SDK client; public so callers can issue raw requests (e.g. listing).
    pub client: aws_sdk_s3::Client,
    /// Bucket this client is bound to.
    bucket: SmartString<LazyCompact>,
    /// maximum number of bytes to use for cached data
    // NOTE(review): only stored, never consulted in visible code — see caching TODO above.
    cache_limit_bytes: usize,
}
impl Debug for S3Client {
    /// Manual Debug: the SDK client is omitted (and with it any credential material).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("S3Client");
        out.field("bucket", &self.bucket);
        out.field("cache_limit_bytes", &self.cache_limit_bytes);
        out.finish()
    }
}
impl S3Client {
    /// Build a client bound to `bucket`.
    ///
    /// When `endpoint` is set (MinIO and other S3-compatible stores),
    /// path-style addressing is forced, since virtual-hosted style requires
    /// per-bucket DNS. `cache_limit_bytes` is stored for the future cache
    /// (see the struct-level TODO).
    pub async fn new(
        bucket: &str,
        endpoint: Option<&str>,
        region: &str,
        access_key_id: &str,
        secret_access_key: &str,
        cache_limit_bytes: usize,
    ) -> Arc<Self> {
        let client = {
            let mut s3_config = aws_sdk_s3::config::Builder::new()
                .behavior_version(BehaviorVersion::latest())
                .region(Region::new(region.to_owned()))
                .credentials_provider(Credentials::new(
                    access_key_id,
                    secret_access_key,
                    None,
                    None,
                    "pile",
                ));
            if let Some(ep) = endpoint {
                s3_config = s3_config.endpoint_url(ep).force_path_style(true);
            }
            aws_sdk_s3::Client::from_conf(s3_config.build())
        };
        // Tail expression instead of explicit `return` (clippy::needless_return).
        Arc::new(Self {
            bucket: bucket.into(),
            client,
            cache_limit_bytes,
        })
    }
    /// Name of the bucket this client reads from.
    pub fn bucket(&self) -> &str {
        &self.bucket
    }
    /// Stat `key` via a HEAD request and return a lazy, range-request-backed reader.
    ///
    /// # Errors
    /// Any SDK error from the HEAD request, wrapped as `std::io::Error`.
    pub async fn get(self: &Arc<Self>, key: &str) -> Result<S3Reader, std::io::Error> {
        let head = self
            .client
            .head_object()
            .bucket(self.bucket.as_str())
            .key(key)
            .send()
            .await
            .map_err(std::io::Error::other)?;
        // NOTE(review): a missing content_length is treated as an empty object — confirm.
        let size = head.content_length().unwrap_or(0) as u64;
        Ok(S3Reader {
            client: self.clone(),
            bucket: self.bucket.clone(),
            key: key.into(),
            cursor: 0,
            size,
        })
    }
}
//
// MARK: reader
//
/// Lazy reader over a single S3 object; each `read` issues one ranged GET.
pub struct S3Reader {
    /// Client used for the GET requests.
    pub client: Arc<S3Client>,
    /// Bucket name (copied from the client at construction).
    pub bucket: SmartString<LazyCompact>,
    /// Object key within the bucket.
    pub key: SmartString<LazyCompact>,
    /// Current read position in bytes.
    pub cursor: u64,
    /// Object size, taken from the HEAD request at construction.
    pub size: u64,
}
impl AsyncReader for S3Reader {
    /// Read via an HTTP range request covering at most `buf.len()` bytes.
    /// Every call is one network round trip — read in large batches.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        let len_left = self.size.saturating_sub(self.cursor);
        if len_left == 0 || buf.is_empty() {
            return Ok(0);
        }
        let start_byte = self.cursor;
        let len_to_read = (buf.len() as u64).min(len_left);
        // HTTP Range is inclusive on both ends.
        let end_byte = start_byte + len_to_read - 1;
        let resp = self
            .client
            .client
            .get_object()
            .bucket(self.bucket.as_str())
            .key(self.key.as_str())
            .range(format!("bytes={start_byte}-{end_byte}"))
            .send()
            .await
            .map_err(std::io::Error::other)?;
        // Buffer the whole (bounded) response body in memory.
        let bytes = resp
            .body
            .collect()
            .await
            .map(|x| x.into_bytes())
            .map_err(std::io::Error::other)?;
        // Defensive: never copy more than the caller's buffer can hold.
        let n = bytes.len().min(buf.len());
        buf[..n].copy_from_slice(&bytes[..n]);
        self.cursor += n as u64;
        Ok(n)
    }
}
impl AsyncSeekReader for S3Reader {
    /// Seek within the object. Positions past EOF clamp to EOF; seeking
    /// before the start is an `InvalidInput` error. No network I/O happens
    /// here — only the local cursor moves.
    async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
        match pos {
            SeekFrom::Start(x) => self.cursor = x.min(self.size),
            SeekFrom::Current(x) => {
                if x < 0 {
                    let abs = x.unsigned_abs();
                    if abs > self.cursor {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::InvalidInput,
                            "cannot seek past start",
                        ));
                    }
                    self.cursor -= abs;
                } else {
                    self.cursor += x as u64;
                }
            }
            // Consistency fix: use the imported `SeekFrom` like the other
            // arms instead of the fully qualified `std::io::SeekFrom::End`.
            SeekFrom::End(x) => {
                if x < 0 {
                    let abs = x.unsigned_abs();
                    if abs > self.size {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::InvalidInput,
                            "cannot seek past start",
                        ));
                    }
                    self.cursor = self.size - abs;
                } else {
                    self.cursor = self.size + x as u64;
                }
            }
        }
        // Clamp forward seeks that went past EOF.
        self.cursor = self.cursor.min(self.size);
        Ok(self.cursor)
    }
}

View File

@@ -21,8 +21,6 @@ toml = { workspace = true }
smartstring = { workspace = true } smartstring = { workspace = true }
regex = { workspace = true } regex = { workspace = true }
blake3 = { workspace = true } blake3 = { workspace = true }
chacha20poly1305 = { workspace = true }
base64 = { workspace = true }
epub = { workspace = true } epub = { workspace = true }
kamadak-exif = { workspace = true } kamadak-exif = { workspace = true }
pdf = { workspace = true } pdf = { workspace = true }

View File

@@ -1,9 +1,6 @@
mod dir; mod dir;
pub use dir::*; pub use dir::*;
mod s3;
pub use s3::*;
pub mod misc; pub mod misc;
/// A read-only set of [Item]s. /// A read-only set of [Item]s.

View File

@@ -1,322 +0,0 @@
use chrono::{DateTime, Utc};
use pile_config::{
Label,
pattern::{GroupPattern, GroupSegment},
};
use pile_io::S3Client;
use smartstring::{LazyCompact, SmartString};
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::{Arc, OnceLock},
};
use crate::{
extract::traits::ExtractState,
source::DataSource,
value::{Item, PileValue},
};
/// A read-only data source backed by an S3 bucket, with an eagerly built
/// in-memory index of all objects (grouped per the configured pattern).
#[derive(Debug)]
pub struct S3DataSource {
    /// Label identifying this source in config and logs.
    pub name: Label,
    /// Shared S3 client bound to the configured bucket.
    pub client: Arc<S3Client>,
    /// Optional key prefix; index keys are relative to it.
    pub prefix: Option<SmartString<LazyCompact>>,
    /// How object keys are grouped into multi-file items.
    pub pattern: GroupPattern,
    /// Key used to decrypt object paths during listing
    /// (presumably also object contents elsewhere — TODO confirm).
    pub encryption_key: Option<[u8; 32]>,
    /// Key → item index, populated exactly once in `new`.
    pub index: OnceLock<BTreeMap<SmartString<LazyCompact>, Item>>,
}
impl S3DataSource {
    /// Open an S3 source: connect, list every object under `prefix`, apply
    /// the group `pattern`, and build the in-memory key → [`Item`] index.
    ///
    /// # Errors
    /// Fails if listing the bucket fails; client construction is infallible.
    pub async fn new(
        name: &Label,
        bucket: &str,
        prefix: Option<&str>,
        endpoint: Option<&str>,
        region: &str,
        access_key_id: &str,
        secret_access_key: &str,
        cache_limit_bytes: usize,
        pattern: GroupPattern,
        encryption_key: Option<[u8; 32]>,
    ) -> Result<Arc<Self>, std::io::Error> {
        let client = S3Client::new(
            bucket,
            endpoint,
            region,
            access_key_id,
            secret_access_key,
            cache_limit_bytes,
        )
        .await;
        let source = Arc::new(Self {
            name: name.clone(),
            client,
            prefix: prefix.map(|x| x.into()),
            pattern,
            encryption_key,
            index: OnceLock::new(),
        });
        //
        // MARK: list keys
        //
        // Paginate list_objects_v2 until the listing is no longer truncated.
        let mut all_keys: HashSet<SmartString<LazyCompact>> = HashSet::new();
        let mut continuation_token: Option<String> = None;
        loop {
            let mut req = source
                .client
                .client
                .list_objects_v2()
                .bucket(source.client.bucket());
            if let Some(prefix) = &source.prefix {
                req = req.prefix(prefix.as_str());
            }
            if let Some(token) = continuation_token {
                req = req.continuation_token(token);
            }
            let resp = req.send().await.map_err(std::io::Error::other)?;
            let next_token = resp.next_continuation_token().map(ToOwned::to_owned);
            let is_truncated = resp.is_truncated().unwrap_or(false);
            for obj in resp.contents() {
                let Some(full_key) = obj.key() else { continue };
                // Index keys are stored relative to the configured prefix.
                let raw_key = strip_prefix(full_key, source.prefix.as_deref());
                let key = match &source.encryption_key {
                    None => raw_key.into(),
                    // Encrypted stores encrypt paths too; undecryptable keys are skipped.
                    Some(enc_key) => match decrypt_path(enc_key, raw_key) {
                        Some(decrypted) => decrypted.into(),
                        None => continue,
                    },
                };
                all_keys.insert(key);
            }
            if !is_truncated {
                break;
            }
            continuation_token = next_token;
        }
        //
        // MARK: resolve groups
        //
        // First pass: collect every key that some key's pattern claims as a group member.
        let mut keys_grouped: HashSet<SmartString<LazyCompact>> = HashSet::new();
        for key in &all_keys {
            let groups = resolve_groups(&source.pattern, key).await;
            for group_key in groups.into_values() {
                if all_keys.contains(&group_key) {
                    keys_grouped.insert(group_key);
                }
            }
        }
        // Second pass: grouped keys become attachments of their primary item,
        // so only ungrouped keys get top-level index entries.
        let mut index = BTreeMap::new();
        for key in all_keys.difference(&keys_grouped) {
            let groups = resolve_groups(&source.pattern, key).await;
            let group = groups
                .into_iter()
                .filter(|(_, gk)| all_keys.contains(gk))
                .map(|(label, gk)| {
                    (
                        label,
                        Box::new(Item::S3 {
                            source: Arc::clone(&source),
                            mime: mime_guess::from_path(gk.as_str()).first_or_octet_stream(),
                            key: gk,
                            // Group members carry no nested groups of their own.
                            group: Arc::new(HashMap::new()),
                        }),
                    )
                })
                .collect::<HashMap<_, _>>();
            let item = Item::S3 {
                source: Arc::clone(&source),
                mime: mime_guess::from_path(key.as_str()).first_or_octet_stream(),
                key: key.clone(),
                group: Arc::new(group),
            };
            index.insert(item.key(), item);
        }
        // `source` was created above with an empty OnceLock, so this always sets it.
        source.index.get_or_init(|| index);
        Ok(source)
    }
}
impl DataSource for Arc<S3DataSource> {
    /// Number of top-level items in the prebuilt index.
    #[expect(clippy::expect_used)]
    fn len(&self) -> usize {
        self.index.get().expect("index should be initialized").len()
    }
    /// Look up a single top-level item by its logical key.
    #[expect(clippy::expect_used)]
    async fn get(&self, key: &str) -> Result<Option<Item>, std::io::Error> {
        return Ok(self
            .index
            .get()
            .expect("index should be initialized")
            .get(key)
            .cloned());
    }
    /// Iterate over all top-level items, in key order (BTreeMap).
    #[expect(clippy::expect_used)]
    fn iter(&self) -> impl Iterator<Item = &Item> {
        self.index
            .get()
            .expect("index should be initialized")
            .values()
    }
    /// Most recent `last_modified` timestamp of any object under the
    /// prefix, or `None` if no object carries a usable timestamp.
    ///
    /// NOTE(review): a failed listing is reported as `Ok(None)` rather
    /// than an error — this looks like a deliberate best-effort
    /// freshness probe; confirm against callers before changing.
    async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
        let mut ts: Option<DateTime<Utc>> = None;
        let mut continuation_token: Option<String> = None;
        // Page through the full listing, keeping the max timestamp seen.
        loop {
            let mut req = self
                .client
                .client
                .list_objects_v2()
                .bucket(self.client.bucket());
            if let Some(prefix) = &self.prefix {
                req = req.prefix(prefix.as_str());
            }
            if let Some(token) = continuation_token {
                req = req.continuation_token(token);
            }
            let resp = match req.send().await {
                Err(_) => return Ok(None),
                Ok(resp) => resp,
            };
            let next_token = resp.next_continuation_token().map(ToOwned::to_owned);
            let is_truncated = resp.is_truncated().unwrap_or(false);
            for obj in resp.contents() {
                if let Some(last_modified) = obj.last_modified() {
                    let dt = DateTime::from_timestamp(
                        last_modified.secs(),
                        last_modified.subsec_nanos(),
                    );
                    if let Some(dt) = dt {
                        ts = Some(match ts {
                            None => dt,
                            Some(prev) => prev.max(dt),
                        });
                    }
                }
            }
            if !is_truncated {
                break;
            }
            continuation_token = next_token;
        }
        Ok(ts)
    }
}
/// Derive an encryption key from a password
///
/// Uses BLAKE3's `derive_key` mode with a fixed context string, so the
/// same password always yields the same 32-byte key.
pub fn string_to_key(password: &str) -> [u8; 32] {
    blake3::derive_key("pile s3 encryption", password.as_bytes())
}
/// Encrypt a logical path into a base64-encoded S3 object name.
///
/// The XChaCha20-Poly1305 nonce is derived from a keyed BLAKE3 hash of
/// the path, so the same (key, path) pair always yields the same object
/// name. The output is `nonce || ciphertext`, base64-encoded with the
/// URL-safe, unpadded alphabet.
pub fn encrypt_path(enc_key: &[u8; 32], path: &str) -> String {
    use base64::Engine;
    use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
    // Deterministic 24-byte nonce: keyed hash of the plaintext path.
    let digest = blake3::keyed_hash(enc_key, path.as_bytes());
    let nonce_slice = &digest.as_bytes()[..24];
    let cipher = XChaCha20Poly1305::new(chacha20poly1305::Key::from_slice(enc_key));
    #[expect(clippy::expect_used)]
    let sealed = cipher
        .encrypt(XNonce::from_slice(nonce_slice), path.as_bytes())
        .expect("path encryption should not fail");
    // Prepend the nonce so decryption does not need to re-derive it.
    let mut blob = Vec::with_capacity(nonce_slice.len() + sealed.len());
    blob.extend_from_slice(nonce_slice);
    blob.extend_from_slice(&sealed);
    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(blob)
}
/// Decrypt a base64 S3 key back to its logical path.
///
/// Returns `None` for anything that is not a well-formed,
/// nonce-prefixed XChaCha20-Poly1305 blob sealed under this key.
fn decrypt_path(enc_key: &[u8; 32], encrypted: &str) -> Option<String> {
    use base64::Engine;
    use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
    let blob = base64::engine::general_purpose::URL_SAFE_NO_PAD
        .decode(encrypted)
        .ok()?;
    // Must hold at least a 24-byte nonce plus a 16-byte Poly1305 tag.
    if blob.len() < 24 + 16 {
        return None;
    }
    let (nonce_part, sealed) = blob.split_at(24);
    let cipher = XChaCha20Poly1305::new(chacha20poly1305::Key::from_slice(enc_key));
    let decrypted = cipher
        .decrypt(XNonce::from_slice(nonce_part), sealed)
        .ok()?;
    String::from_utf8(decrypted).ok()
}
/// Strip the configured source prefix (and its `/` separator) from an
/// S3 object key, returning the logical key.
///
/// The prefix may be configured with or without a trailing slash; both
/// forms match the same objects. If the key does not start with the
/// prefix followed by a `/`, it is returned unchanged.
fn strip_prefix<'a>(key: &'a str, prefix: Option<&str>) -> &'a str {
    match prefix {
        None => key,
        Some(p) => {
            // Normalize the prefix to its slash-less form, then require a
            // `/` separator after it. This avoids allocating a fresh
            // `format!("{p}/")` string on every call — this function runs
            // once per listed object.
            let base = p.strip_suffix('/').unwrap_or(p);
            key.strip_prefix(base)
                .and_then(|rest| rest.strip_prefix('/'))
                .unwrap_or(key)
        }
    }
}
/// Resolve the group pattern against a single item key.
///
/// For each labelled pattern, concatenates its literal and path-query
/// segments into a candidate group key. Labels whose queries fail or
/// yield a non-string value are skipped entirely.
async fn resolve_groups(
    pattern: &GroupPattern,
    key: &str,
) -> HashMap<Label, SmartString<LazyCompact>> {
    let state = ExtractState { ignore_mime: false };
    // The queried value is the key itself and does not depend on the
    // pattern label — build it once instead of re-allocating it for
    // every labelled pattern (previously done inside the loop).
    let item = PileValue::String(Arc::new(key.into()));
    let mut group = HashMap::new();
    'pattern: for (l, pat) in &pattern.pattern {
        let mut target = String::new();
        for p in pat {
            match p {
                GroupSegment::Literal(x) => target.push_str(x),
                GroupSegment::Path(op) => {
                    // A failed or non-string query invalidates this label.
                    let res = match item.query(&state, op).await {
                        Ok(Some(x)) => x,
                        _ => continue 'pattern,
                    };
                    let res = match res.as_str() {
                        Some(x) => x,
                        None => continue 'pattern,
                    };
                    target.push_str(res);
                }
            }
        }
        group.insert(l.clone(), target.into());
    }
    group
}

View File

@@ -1,158 +0,0 @@
use aws_sdk_s3::{error::SdkError, operation::get_object::GetObjectError};
use mime::Mime;
use std::io::{Error as IoError, Seek, SeekFrom, Write};
use thiserror::Error;
use super::S3Client;
use crate::retry;
/// Errors produced by [`S3Reader`] operations.
#[derive(Debug, Error)]
#[expect(clippy::large_enum_variant)]
pub enum S3ReaderError {
    /// The S3 `GetObject` request itself failed.
    #[error("sdk error")]
    SdkError(#[from] SdkError<GetObjectError>),
    /// Reading the S3 response body stream failed.
    #[error("byte stream error")]
    ByteStreamError(#[from] aws_sdk_s3::primitives::ByteStreamError),
    /// A local I/O error (e.g. while writing downloaded bytes).
    #[error("i/o error")]
    IoError(#[from] IoError),
}
/// Provides a [`std::io::Read`]-like interface to an S3 object. \
/// This doesn't actually implement [`std::io::Read`] because Read isn't async.
///
/// Also implements [`std::io::Seek`]
pub struct S3Reader {
    // Shared client used to issue ranged GET requests.
    pub(super) client: S3Client,
    // Bucket holding the object.
    pub(super) bucket: String,
    // Full object key within the bucket.
    pub(super) key: String,
    // Current read position, in bytes from the start of the object.
    pub(super) cursor: u64,
    // Total object size in bytes.
    pub(super) size: u64,
    // Content type of the object.
    pub(super) mime: Mime,
}
impl S3Reader {
    /// Read up to `buf.len()` bytes at the current cursor into `buf`.
    ///
    /// Issues a single ranged `GetObject` request (with retries) and
    /// advances the cursor by the number of bytes read. Returns `Ok(0)`
    /// at end of object or when `buf` is empty.
    pub async fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, S3ReaderError> {
        let len_left = self.size - self.cursor;
        if len_left == 0 || buf.is_empty() {
            return Ok(0);
        }
        #[expect(clippy::unwrap_used)] // TODO: probably fits?
        let start_byte = usize::try_from(self.cursor).unwrap();
        #[expect(clippy::unwrap_used)] // usize fits in u64
        let len_to_read = u64::try_from(buf.len()).unwrap().min(len_left);
        #[expect(clippy::unwrap_used)] // must fit, we called min()
        let len_to_read = usize::try_from(len_to_read).unwrap();
        // HTTP byte ranges are inclusive on both ends.
        let end_byte = start_byte + len_to_read - 1;
        let b = retry!(
            self.client.retries,
            self.client
                .client
                .get_object()
                .bucket(self.bucket.as_str())
                .key(self.key.as_str())
                .range(format!("bytes={start_byte}-{end_byte}"))
                .send()
                .await
        )?;
        // Looks like `bytes 31000000-31999999/33921176``
        // println!("{:?}", b.content_range);
        // Truncate in case the server returned more than requested, so we
        // never hand the caller more bytes than fit in `buf`.
        let mut bytes = b.body.collect().await?.into_bytes();
        bytes.truncate(len_to_read);
        let l = bytes.len();
        // Memory to memory writes are infallible
        #[expect(clippy::unwrap_used)]
        buf.write_all(&bytes).unwrap();
        // Cannot fail, usize should always fit into u64
        #[expect(clippy::unwrap_used)]
        {
            self.cursor += u64::try_from(l).unwrap();
        }
        return Ok(len_to_read);
    }
    /// True once the cursor has reached the end of the object.
    pub fn is_done(&self) -> bool {
        return self.cursor == self.size;
    }
    /// Content type of the object.
    pub fn mime(&self) -> &Mime {
        &self.mime
    }
    /// Write the entire contents of this reader to `r`.
    ///
    /// This method always downloads the whole object,
    /// and always preserves `self.cursor`.
    pub async fn download<W: Write>(&mut self, r: &mut W) -> Result<(), S3ReaderError> {
        let pos = self.stream_position()?;
        // Fetch in 10 MB chunks to bound memory use.
        const BUF_LEN: usize = 10_000_000;
        #[expect(clippy::unwrap_used)] // Cannot fail
        let mut buf: Box<[u8; BUF_LEN]> = vec![0u8; BUF_LEN].try_into().unwrap();
        while !self.is_done() {
            let b = self.read(&mut buf[..]).await?;
            r.write_all(&buf[0..b])?;
        }
        self.seek(SeekFrom::Start(pos))?;
        Ok(())
    }
}
impl Seek for S3Reader {
    /// Reposition the cursor.
    ///
    /// Fixes two defects in the previous implementation:
    /// - positions were clamped to `self.size - 1`, which underflows for
    ///   a zero-length object and made it impossible to seek to EOF —
    ///   so `download()`'s cursor restoration after a full read silently
    ///   moved the cursor back one byte;
    /// - positive `Current`/`End` offsets used unchecked addition; they
    ///   now saturate before clamping.
    ///
    /// Seeking before the start returns `InvalidInput`; seeking at or
    /// past the end clamps to `size` (EOF), where `read` returns 0.
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let past_start = || {
            std::io::Error::new(std::io::ErrorKind::InvalidInput, "cannot seek past start")
        };
        let target = match pos {
            SeekFrom::Start(x) => x,
            // Negative offsets: error if they would go before byte 0.
            SeekFrom::Current(x) if x < 0 => self
                .cursor
                .checked_sub(x.unsigned_abs())
                .ok_or_else(past_start)?,
            SeekFrom::Current(x) => self.cursor.saturating_add(x.unsigned_abs()),
            SeekFrom::End(x) if x < 0 => self
                .size
                .checked_sub(x.unsigned_abs())
                .ok_or_else(past_start)?,
            SeekFrom::End(x) => self.size.saturating_add(x.unsigned_abs()),
        };
        // Clamp to EOF rather than `size - 1`: `size` is a valid cursor
        // position (end of object) and `size - 1` underflows when empty.
        self.cursor = target.min(self.size);
        Ok(self.cursor)
    }
}

View File

@@ -1,13 +1,10 @@
use mime::Mime; use mime::Mime;
use pile_config::Label; use pile_config::Label;
use pile_io::{SyncReadBridge, chacha::ChaChaReaderv1Async}; use pile_io::SyncReadBridge;
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc}; use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
use crate::{ use crate::{source::DirDataSource, value::ItemReader};
source::{DirDataSource, S3DataSource, encrypt_path},
value::ItemReader,
};
// //
// MARK: item // MARK: item
@@ -23,58 +20,19 @@ pub enum Item {
path: PathBuf, path: PathBuf,
group: Arc<HashMap<Label, Box<Item>>>, group: Arc<HashMap<Label, Box<Item>>>,
}, },
S3 {
source: Arc<S3DataSource>,
mime: Mime,
key: SmartString<LazyCompact>,
group: Arc<HashMap<Label, Box<Item>>>,
},
} }
impl Item { impl Item {
/// Open the item for reading. For S3, performs a HEAD request to determine /// Open the item for reading.
/// the object size.
pub async fn read(&self) -> Result<ItemReader, std::io::Error> { pub async fn read(&self) -> Result<ItemReader, std::io::Error> {
Ok(match self { Ok(match self {
Self::File { path, .. } => ItemReader::File(File::open(path)?), Self::File { path, .. } => ItemReader::File(File::open(path)?),
Self::S3 { source, key, .. } => {
let logical_key = key.as_str();
let s3_key_part: SmartString<LazyCompact> = match &source.encryption_key {
None => logical_key.into(),
Some(enc_key) => encrypt_path(enc_key, logical_key).into(),
};
let full_key: SmartString<LazyCompact> = match &source.prefix {
None => s3_key_part,
Some(p) => {
if p.ends_with('/') {
format!("{p}{s3_key_part}").into()
} else {
format!("{p}/{s3_key_part}").into()
}
}
};
let reader = source.client.get(&full_key).await?;
match source.encryption_key {
None => ItemReader::S3(reader),
Some(enc_key) => {
ItemReader::EncryptedS3(ChaChaReaderv1Async::new(reader, enc_key).await?)
}
}
}
}) })
} }
pub fn source_name(&self) -> &pile_config::Label { pub fn source_name(&self) -> &pile_config::Label {
match self { match self {
Self::File { source, .. } => &source.name, Self::File { source, .. } => &source.name,
Self::S3 { source, .. } => &source.name,
} }
} }
@@ -87,7 +45,6 @@ impl Item {
.to_str() .to_str()
.expect("path is not utf-8") .expect("path is not utf-8")
.into(), .into(),
Self::S3 { key, .. } => key.clone(),
} }
} }
@@ -106,14 +63,12 @@ impl Item {
pub fn mime(&self) -> &Mime { pub fn mime(&self) -> &Mime {
match self { match self {
Self::File { mime, .. } => mime, Self::File { mime, .. } => mime,
Self::S3 { mime, .. } => mime,
} }
} }
pub fn group(&self) -> &HashMap<Label, Box<Self>> { pub fn group(&self) -> &HashMap<Label, Box<Self>> {
match self { match self {
Self::File { group, .. } => group, Self::File { group, .. } => group,
Self::S3 { group, .. } => group,
} }
} }
} }

View File

@@ -1,4 +1,4 @@
use pile_io::{AsyncReader, AsyncSeekReader, S3Reader, chacha::ChaChaReaderv1Async}; use pile_io::{AsyncReader, AsyncSeekReader};
use std::{fs::File, io::Seek}; use std::{fs::File, io::Seek};
// //
@@ -7,16 +7,12 @@ use std::{fs::File, io::Seek};
pub enum ItemReader { pub enum ItemReader {
File(File), File(File),
S3(S3Reader),
EncryptedS3(ChaChaReaderv1Async<S3Reader>),
} }
impl AsyncReader for ItemReader { impl AsyncReader for ItemReader {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
match self { match self {
Self::File(x) => std::io::Read::read(x, buf), Self::File(x) => std::io::Read::read(x, buf),
Self::S3(x) => x.read(buf).await,
Self::EncryptedS3(x) => x.read(buf).await,
} }
} }
} }
@@ -25,8 +21,6 @@ impl AsyncSeekReader for ItemReader {
async fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> { async fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
match self { match self {
Self::File(x) => x.seek(pos), Self::File(x) => x.seek(pos),
Self::S3(x) => x.seek(pos).await,
Self::EncryptedS3(x) => x.seek(pos).await,
} }
} }
} }

View File

@@ -12,10 +12,7 @@ pile-toolbox = { workspace = true }
pile-dataset = { workspace = true, features = ["axum", "pdfium"] } pile-dataset = { workspace = true, features = ["axum", "pdfium"] }
pile-value = { workspace = true, features = ["pdfium"] } pile-value = { workspace = true, features = ["pdfium"] }
pile-config = { workspace = true } pile-config = { workspace = true }
pile-io = { workspace = true }
aws-sdk-s3 = { workspace = true }
bytes = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }

View File

@@ -1,5 +1,4 @@
use anstyle::{AnsiColor, Color, Style}; use anstyle::{AnsiColor, Color, Style};
use indicatif::ProgressStyle;
pub fn clap_styles() -> clap::builder::Styles { pub fn clap_styles() -> clap::builder::Styles {
clap::builder::Styles::styled() clap::builder::Styles::styled()
@@ -37,6 +36,7 @@ pub fn clap_styles() -> clap::builder::Styles {
.placeholder(Style::new().fg_color(Some(Color::Ansi(AnsiColor::White)))) .placeholder(Style::new().fg_color(Some(Color::Ansi(AnsiColor::White))))
} }
/*
#[expect(clippy::unwrap_used)] #[expect(clippy::unwrap_used)]
pub fn progress_big() -> ProgressStyle { pub fn progress_big() -> ProgressStyle {
return ProgressStyle::default_bar() return ProgressStyle::default_bar()
@@ -50,7 +50,6 @@ pub fn progress_big() -> ProgressStyle {
]); ]);
} }
/*
#[expect(clippy::unwrap_used)] #[expect(clippy::unwrap_used)]
pub fn spinner_small() -> ProgressStyle { pub fn spinner_small() -> ProgressStyle {
return ProgressStyle::default_bar() return ProgressStyle::default_bar()

View File

@@ -1,75 +0,0 @@
use anyhow::{Context, Result};
use clap::Args;
use pile_io::AsyncReader;
use pile_io::chacha::{ChaChaReaderv1, ChaChaWriterv1};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::source::string_to_key;
use std::io::{Cursor, Write};
use std::path::PathBuf;
use crate::{CliCmd, GlobalContext};
// CLI arguments for the `encrypt` subcommand: encrypt a file and write
// the ciphertext to stdout.
// NOTE: the `///` field docs below double as clap help text, so
// struct-level documentation is kept as plain comments to avoid
// changing the generated --help output.
#[derive(Debug, Args)]
pub struct EncryptCommand {
    /// File to encrypt
    path: PathBuf,
    /// Encryption password
    password: String,
}
// CLI arguments for the `decrypt` subcommand: decrypt a file and write
// the plaintext to stdout.
// NOTE: the `///` field docs below double as clap help text, so
// struct-level documentation is kept as plain comments.
#[derive(Debug, Args)]
pub struct DecryptCommand {
    /// File to decrypt
    path: PathBuf,
    /// Encryption password
    password: String,
}
impl CliCmd for EncryptCommand {
    /// Encrypt `self.path` with a key derived from `self.password` and
    /// stream the resulting ciphertext to stdout. Returns exit code 0.
    async fn run(
        self,
        _ctx: GlobalContext,
        _flag: CancelFlag,
    ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
        // Derive the symmetric key from the password first; reading the
        // input is the only async step.
        let derived = string_to_key(&self.password);
        let contents = tokio::fs::read(&self.path)
            .await
            .with_context(|| format!("while reading '{}'", self.path.display()))?;
        // Encrypt into an in-memory buffer, then dump it to stdout.
        let mut encryptor = ChaChaWriterv1::new(Cursor::new(Vec::new()), derived)
            .context("while initializing encryptor")?;
        encryptor.write_all(&contents).context("while encrypting")?;
        let sealed = encryptor.finish().context("while finalizing encryptor")?;
        std::io::stdout()
            .write_all(sealed.get_ref())
            .context("while writing to stdout")?;
        Ok(0)
    }
}
impl CliCmd for DecryptCommand {
    /// Decrypt `self.path` with a key derived from `self.password` and
    /// stream the recovered plaintext to stdout. Returns exit code 0.
    async fn run(
        self,
        _ctx: GlobalContext,
        _flag: CancelFlag,
    ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
        let derived = string_to_key(&self.password);
        let sealed = tokio::fs::read(&self.path)
            .await
            .with_context(|| format!("while reading '{}'", self.path.display()))?;
        // Wrap the ciphertext in the streaming decryptor and drain it.
        let mut decryptor = ChaChaReaderv1::new(Cursor::new(sealed), derived)
            .context("while initializing decryptor")?;
        let recovered = decryptor.read_to_end().await.context("while decrypting")?;
        std::io::stdout()
            .write_all(&recovered)
            .context("while writing to stdout")?;
        Ok(0)
    }
}

View File

@@ -5,7 +5,6 @@ use pile_toolbox::cancelabletask::{
}; };
mod check; mod check;
mod encrypt;
mod fields; mod fields;
mod index; mod index;
mod init; mod init;
@@ -15,7 +14,6 @@ mod lookup;
mod probe; mod probe;
mod serve; mod serve;
mod server; mod server;
mod upload;
use crate::{Cli, GlobalContext}; use crate::{Cli, GlobalContext};
@@ -85,24 +83,6 @@ pub enum SubCommand {
#[command(flatten)] #[command(flatten)]
cmd: server::ServerCommand, cmd: server::ServerCommand,
}, },
/// Upload a filesystem source to an S3 source
Upload {
#[command(flatten)]
cmd: upload::UploadCommand,
},
/// Encrypt a file to stdout
Encrypt {
#[command(flatten)]
cmd: encrypt::EncryptCommand,
},
/// Decrypt a file to stdout
Decrypt {
#[command(flatten)]
cmd: encrypt::DecryptCommand,
},
} }
impl CliCmdDispatch for SubCommand { impl CliCmdDispatch for SubCommand {
@@ -118,9 +98,6 @@ impl CliCmdDispatch for SubCommand {
Self::Item { cmd } => cmd.start(ctx), Self::Item { cmd } => cmd.start(ctx),
Self::Serve { cmd } => cmd.start(ctx), Self::Serve { cmd } => cmd.start(ctx),
Self::Server { cmd } => cmd.start(ctx), Self::Server { cmd } => cmd.start(ctx),
Self::Upload { cmd } => cmd.start(ctx),
Self::Encrypt { cmd } => cmd.start(ctx),
Self::Decrypt { cmd } => cmd.start(ctx),
Self::Docs {} => { Self::Docs {} => {
print_help_recursively(&mut Cli::command(), None); print_help_recursively(&mut Cli::command(), None);

View File

@@ -1,284 +0,0 @@
use anyhow::{Context, Result};
use aws_sdk_s3::primitives::ByteStream;
use clap::Args;
use indicatif::ProgressBar;
use pile_config::Label;
use pile_dataset::{Dataset, Datasets};
use pile_io::chacha::ChaChaWriterv1;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use pile_value::source::{DataSource, DirDataSource, S3DataSource, encrypt_path};
use std::{
io::{Cursor, Write},
path::PathBuf,
sync::Arc,
time::Duration,
};
use tokio::task::JoinSet;
use tracing::info;
use crate::{CliCmd, GlobalContext, cli::progress_big};
// CLI arguments for the `upload` subcommand: mirror a filesystem source
// into an S3 source under a prefix.
// NOTE: the `///` field docs below double as clap help text, so
// struct-level documentation is kept as plain comments to avoid
// changing the generated --help output.
#[derive(Debug, Args)]
pub struct UploadCommand {
    /// Name of the filesystem source to upload from
    dir_source: String,
    /// Name of the S3 source to upload to
    s3_source: String,
    /// Prefix path under the S3 source to upload files to
    prefix: String,
    /// Path to dataset config
    #[arg(long, short = 'c', default_value = "./pile.toml")]
    config: PathBuf,
    /// Override the S3 bucket from pile.toml
    #[arg(long)]
    bucket: Option<String>,
    /// Allow overwriting files that already exist at the target prefix
    #[arg(long)]
    overwrite: bool,
    /// Delete all files at the target prefix before uploading
    #[arg(long)]
    delete_existing_forever: bool,
    /// Number of parallel upload jobs
    #[arg(long, short = 'j', default_value = "5")]
    jobs: usize,
}
impl CliCmd for UploadCommand {
    /// Upload every item of a filesystem source to an S3 source,
    /// optionally encrypting file contents and key names, with up to
    /// `jobs` parallel uploads and live progress reporting.
    async fn run(
        self,
        ctx: GlobalContext,
        flag: CancelFlag,
    ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
        let ds = Datasets::open(&self.config)
            .await
            .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
        let dir_label = Label::new(&self.dir_source)
            .ok_or_else(|| anyhow::anyhow!("invalid source name: {}", self.dir_source))?;
        let s3_label = Label::new(&self.s3_source)
            .ok_or_else(|| anyhow::anyhow!("invalid source name: {}", self.s3_source))?;
        let dir_ds: Arc<DirDataSource> = get_dir_source(&ds, &dir_label, &self.dir_source)?;
        let s3_ds: Arc<S3DataSource> = get_s3_source(&ds, &s3_label, &self.s3_source)?;
        // --bucket on the command line overrides the configured bucket.
        let bucket = self
            .bucket
            .as_deref()
            .unwrap_or(s3_ds.client.bucket())
            .to_owned();
        let full_prefix = self.prefix.trim_matches('/').to_owned();
        // Check for existing objects at the target prefix
        let existing_keys = list_prefix(&s3_ds.client.client, &bucket, &full_prefix)
            .await
            .context("while checking for existing objects at target prefix")?;
        if !existing_keys.is_empty() {
            if self.delete_existing_forever {
                info!(
                    "Deleting {} existing object(s) at '{}'",
                    existing_keys.len(),
                    full_prefix
                );
                for key in &existing_keys {
                    s3_ds
                        .client
                        .client
                        .delete_object()
                        .bucket(&bucket)
                        .key(key)
                        .send()
                        .await
                        .with_context(|| format!("while deleting existing object '{key}'"))?;
                }
            } else if !self.overwrite {
                // Refuse to clobber existing data unless explicitly allowed.
                return Err(anyhow::anyhow!(
                    "{} file(s) already exist at '{}'. \
                    Pass --overwrite to allow overwriting, \
                    or --delete-existing-forever to delete them first.",
                    existing_keys.len(),
                    full_prefix
                )
                .into());
            }
        }
        // Count total files before uploading so we can show accurate progress
        let total = dir_ds.iter().count() as u64;
        // Walk filesystem source and upload files in parallel
        let jobs = self.jobs.max(1);
        let mut uploaded: u64 = 0;
        let mut stream = dir_ds.iter();
        let mut join_set: JoinSet<Result<String, anyhow::Error>> = JoinSet::new();
        let pb = ctx.mp.add(ProgressBar::new(total));
        pb.set_style(progress_big());
        pb.enable_steady_tick(Duration::from_millis(100));
        pb.set_message(full_prefix.clone());
        loop {
            // Drain completed tasks before checking for cancellation or new work
            // (keeps at most `jobs` uploads in flight).
            while join_set.len() >= jobs {
                match join_set.join_next().await {
                    Some(Ok(Ok(key))) => {
                        info!("Uploaded {key}");
                        pb.set_message(key);
                        pb.inc(1);
                        uploaded += 1;
                    }
                    Some(Ok(Err(e))) => return Err(e.into()),
                    Some(Err(e)) => return Err(anyhow::anyhow!("upload task panicked: {e}").into()),
                    None => break,
                }
            }
            if flag.is_cancelled() {
                join_set.abort_all();
                return Err(CancelableTaskError::Cancelled);
            }
            // Pull the next item; exiting the loop leaves the tail drain
            // below to finish outstanding uploads.
            let item = match stream.next() {
                None => break,
                Some(item) => item.clone(),
            };
            let relative_str = item.key().to_string();
            let item_path = dir_ds.dir.join(&relative_str);
            // When the S3 source is encrypted, object names are encrypted too.
            let enc_key_part = match s3_ds.encryption_key {
                None => relative_str.clone(),
                Some(ref enc_key) => encrypt_path(enc_key, &relative_str),
            };
            let key = format!("{full_prefix}/{enc_key_part}");
            let mime = item.mime().to_string();
            let client = Arc::clone(&s3_ds.client);
            let bucket = bucket.clone();
            let encryption_key = s3_ds.encryption_key;
            join_set.spawn(async move {
                // Encrypt file contents on a blocking thread (CPU/disk
                // bound), or stream the file directly when unencrypted.
                let body = if let Some(enc_key) = encryption_key {
                    let path = item_path.clone();
                    let encrypted =
                        tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<u8>> {
                            let plaintext = std::fs::read(&path)
                                .with_context(|| format!("while opening '{}'", path.display()))?;
                            let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), enc_key)
                                .context("while initializing encryptor")?;
                            writer.write_all(&plaintext).context("while encrypting")?;
                            Ok(writer.finish().context("while finalizing")?.into_inner())
                        })
                        .await
                        .context("encryptor task panicked")??;
                    ByteStream::from(bytes::Bytes::from(encrypted))
                } else {
                    ByteStream::from_path(&item_path)
                        .await
                        .with_context(|| format!("while opening '{}'", item_path.display()))?
                };
                client
                    .client
                    .put_object()
                    .bucket(&bucket)
                    .key(&key)
                    .content_type(&mime)
                    .body(body)
                    .send()
                    .await
                    .with_context(|| {
                        format!("while uploading '{}' to '{key}'", item_path.display())
                    })?;
                Ok(key)
            });
        }
        // Drain remaining tasks
        while let Some(result) = join_set.join_next().await {
            match result {
                Ok(Ok(key)) => {
                    info!("Uploaded {key}");
                    pb.set_message(key);
                    pb.inc(1);
                    uploaded += 1;
                }
                Ok(Err(e)) => return Err(e.into()),
                Err(e) => return Err(anyhow::anyhow!("upload task panicked: {e}").into()),
            }
        }
        pb.finish_and_clear();
        info!("Done: uploaded {uploaded} file(s) to '{full_prefix}'");
        Ok(0)
    }
}
/// Look up `label` among enabled and disabled sources and require it to
/// be a filesystem source; `name` is only used for error messages.
fn get_dir_source(
    ds: &Datasets,
    label: &Label,
    name: &str,
) -> Result<Arc<DirDataSource>, anyhow::Error> {
    let found = ds.sources.get(label).or(ds.disabled_sources.get(label));
    let Some(dataset) = found else {
        return Err(anyhow::anyhow!(
            "filesystem source '{name}' not found in config"
        ));
    };
    if let Dataset::Dir(d) = dataset {
        Ok(Arc::clone(d))
    } else {
        Err(anyhow::anyhow!(
            "source '{name}' is not a filesystem source"
        ))
    }
}
/// Look up `label` among enabled and disabled sources and require it to
/// be an S3 source; `name` is only used for error messages.
fn get_s3_source(
    ds: &Datasets,
    label: &Label,
    name: &str,
) -> Result<Arc<S3DataSource>, anyhow::Error> {
    let found = ds.sources.get(label).or(ds.disabled_sources.get(label));
    let Some(dataset) = found else {
        return Err(anyhow::anyhow!("s3 source '{name}' not found in config"));
    };
    if let Dataset::S3(s) = dataset {
        Ok(Arc::clone(s))
    } else {
        Err(anyhow::anyhow!("source '{name}' is not an S3 source"))
    }
}
/// List all S3 object keys under the given prefix.
///
/// Pages through `list_objects_v2` until the listing is complete.
async fn list_prefix(
    client: &aws_sdk_s3::Client,
    bucket: &str,
    prefix: &str,
) -> Result<Vec<String>> {
    let mut keys = Vec::new();
    let mut token: Option<String> = None;
    loop {
        let mut request = client.list_objects_v2().bucket(bucket).prefix(prefix);
        if let Some(t) = token {
            request = request.continuation_token(t);
        }
        let page = request.send().await.context("list_objects_v2 failed")?;
        // Objects without a key are skipped.
        keys.extend(
            page.contents()
                .iter()
                .filter_map(|obj| obj.key())
                .map(ToOwned::to_owned),
        );
        if !page.is_truncated().unwrap_or(false) {
            break;
        }
        token = page.next_continuation_token().map(ToOwned::to_owned);
    }
    Ok(keys)
}