Reorganize S3 clients
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2653,7 +2653,6 @@ version = "0.0.2"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"aws-sdk-s3",
|
|
||||||
"base64",
|
"base64",
|
||||||
"blake3",
|
"blake3",
|
||||||
"chacha20poly1305",
|
"chacha20poly1305",
|
||||||
|
|||||||
@@ -81,6 +81,7 @@ pub struct Datasets {
|
|||||||
|
|
||||||
pub config: ConfigToml,
|
pub config: ConfigToml,
|
||||||
pub sources: HashMap<Label, Dataset>,
|
pub sources: HashMap<Label, Dataset>,
|
||||||
|
pub disabled_sources: HashMap<Label, Dataset>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Datasets {
|
impl Datasets {
|
||||||
@@ -114,6 +115,8 @@ impl Datasets {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut sources = HashMap::new();
|
let mut sources = HashMap::new();
|
||||||
|
let mut disabled_sources = HashMap::new();
|
||||||
|
|
||||||
for (label, source) in &config.dataset.source {
|
for (label, source) in &config.dataset.source {
|
||||||
match source {
|
match source {
|
||||||
Source::Filesystem {
|
Source::Filesystem {
|
||||||
@@ -121,11 +124,12 @@ impl Datasets {
|
|||||||
path,
|
path,
|
||||||
pattern,
|
pattern,
|
||||||
} => {
|
} => {
|
||||||
if !enabled {
|
let target = match enabled {
|
||||||
continue;
|
true => &mut sources,
|
||||||
}
|
false => &mut disabled_sources,
|
||||||
|
};
|
||||||
|
|
||||||
sources.insert(
|
target.insert(
|
||||||
label.clone(),
|
label.clone(),
|
||||||
Dataset::Dir(
|
Dataset::Dir(
|
||||||
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
||||||
@@ -144,26 +148,29 @@ impl Datasets {
|
|||||||
pattern,
|
pattern,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
} => {
|
} => {
|
||||||
if !enabled {
|
let target = match enabled {
|
||||||
continue;
|
true => &mut sources,
|
||||||
}
|
false => &mut disabled_sources,
|
||||||
|
};
|
||||||
|
|
||||||
let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x));
|
let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x));
|
||||||
|
|
||||||
match S3DataSource::new(
|
match S3DataSource::new(
|
||||||
label,
|
label,
|
||||||
bucket.clone(),
|
bucket,
|
||||||
prefix.clone(),
|
prefix.as_ref().map(|x| x.as_str()),
|
||||||
endpoint.clone(),
|
endpoint.as_ref().map(|x| x.as_str()),
|
||||||
region.clone(),
|
region,
|
||||||
credentials,
|
&credentials.access_key_id,
|
||||||
|
&credentials.secret_access_key,
|
||||||
|
10_000_000,
|
||||||
pattern.clone(),
|
pattern.clone(),
|
||||||
encryption_key,
|
encryption_key,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(ds) => {
|
Ok(ds) => {
|
||||||
sources.insert(label.clone(), Dataset::S3(ds));
|
target.insert(label.clone(), Dataset::S3(ds));
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("Could not open S3 source {label}: {err}");
|
warn!("Could not open S3 source {label}: {err}");
|
||||||
@@ -179,6 +186,7 @@ impl Datasets {
|
|||||||
path_parent,
|
path_parent,
|
||||||
config,
|
config,
|
||||||
sources,
|
sources,
|
||||||
|
disabled_sources,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -219,6 +227,7 @@ impl Datasets {
|
|||||||
.join(config.dataset.name.as_str());
|
.join(config.dataset.name.as_str());
|
||||||
|
|
||||||
let mut sources = HashMap::new();
|
let mut sources = HashMap::new();
|
||||||
|
let mut disabled_sources = HashMap::new();
|
||||||
for (label, source) in &config.dataset.source {
|
for (label, source) in &config.dataset.source {
|
||||||
match source {
|
match source {
|
||||||
Source::Filesystem {
|
Source::Filesystem {
|
||||||
@@ -226,11 +235,12 @@ impl Datasets {
|
|||||||
path,
|
path,
|
||||||
pattern,
|
pattern,
|
||||||
} => {
|
} => {
|
||||||
if !enabled {
|
let target = match enabled {
|
||||||
continue;
|
true => &mut sources,
|
||||||
}
|
false => &mut disabled_sources,
|
||||||
|
};
|
||||||
|
|
||||||
sources.insert(
|
target.insert(
|
||||||
label.clone(),
|
label.clone(),
|
||||||
Dataset::Dir(
|
Dataset::Dir(
|
||||||
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
||||||
@@ -249,26 +259,29 @@ impl Datasets {
|
|||||||
pattern,
|
pattern,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
} => {
|
} => {
|
||||||
if !enabled {
|
let target = match enabled {
|
||||||
continue;
|
true => &mut sources,
|
||||||
}
|
false => &mut disabled_sources,
|
||||||
|
};
|
||||||
|
|
||||||
let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x));
|
let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x));
|
||||||
|
|
||||||
match S3DataSource::new(
|
match S3DataSource::new(
|
||||||
label,
|
label,
|
||||||
bucket.clone(),
|
bucket,
|
||||||
prefix.clone(),
|
prefix.as_ref().map(|x| x.as_str()),
|
||||||
endpoint.clone(),
|
endpoint.as_ref().map(|x| x.as_str()),
|
||||||
region.clone(),
|
region,
|
||||||
credentials,
|
&credentials.access_key_id,
|
||||||
|
&credentials.secret_access_key,
|
||||||
|
10_000_000,
|
||||||
pattern.clone(),
|
pattern.clone(),
|
||||||
encryption_key,
|
encryption_key,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(ds) => {
|
Ok(ds) => {
|
||||||
sources.insert(label.clone(), Dataset::S3(ds));
|
target.insert(label.clone(), Dataset::S3(ds));
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("Could not open S3 source {label}: {err}");
|
warn!("Could not open S3 source {label}: {err}");
|
||||||
@@ -284,6 +297,7 @@ impl Datasets {
|
|||||||
path_parent,
|
path_parent,
|
||||||
config,
|
config,
|
||||||
sources,
|
sources,
|
||||||
|
disabled_sources,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
47
crates/pile-io/src/chacha/format.rs
Normal file
47
crates/pile-io/src/chacha/format.rs
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
use binrw::{binrw, meta::ReadMagic};
|
||||||
|
|
||||||
|
#[binrw]
|
||||||
|
#[brw(little, magic = b"PileChaChav1")]
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct ChaChaHeaderv1 {
|
||||||
|
pub config: ChaChaConfigv1,
|
||||||
|
pub plaintext_size: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChaChaHeaderv1 {
|
||||||
|
pub const SIZE: usize = ChaChaHeaderv1::MAGIC.len() + std::mem::size_of::<ChaChaConfigv1>() + 8;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn chachaheader_size() {
|
||||||
|
assert_eq!(ChaChaHeaderv1::SIZE, std::mem::size_of::<ChaChaHeaderv1>())
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: config
|
||||||
|
//
|
||||||
|
|
||||||
|
#[binrw]
|
||||||
|
#[brw(little)]
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct ChaChaConfigv1 {
|
||||||
|
pub chunk_size: u64,
|
||||||
|
pub nonce_size: u64,
|
||||||
|
pub tag_size: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ChaChaConfigv1 {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
chunk_size: 64 * 1024,
|
||||||
|
nonce_size: 24,
|
||||||
|
tag_size: 16,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChaChaConfigv1 {
|
||||||
|
pub(crate) fn enc_chunk_size(&self) -> u64 {
|
||||||
|
self.chunk_size + self.nonce_size + self.tag_size
|
||||||
|
}
|
||||||
|
}
|
||||||
9
crates/pile-io/src/chacha/mod.rs
Normal file
9
crates/pile-io/src/chacha/mod.rs
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
mod reader;
|
||||||
|
mod reader_async;
|
||||||
|
mod writer;
|
||||||
|
mod writer_async;
|
||||||
|
|
||||||
|
pub use {reader::*, reader_async::*, writer::*, writer_async::*};
|
||||||
|
|
||||||
|
mod format;
|
||||||
|
pub use format::*;
|
||||||
@@ -1,70 +1,15 @@
|
|||||||
use std::io::{Read, Seek, SeekFrom};
|
use std::io::{Read, Seek, SeekFrom};
|
||||||
|
|
||||||
use binrw::binrw;
|
use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaHeaderv1};
|
||||||
|
|
||||||
use crate::{AsyncReader, AsyncSeekReader};
|
|
||||||
|
|
||||||
//
|
|
||||||
// MARK: header
|
|
||||||
//
|
|
||||||
|
|
||||||
/// Serialized size of [`ChaChaHeader`] in bytes: 12 magic + 3×8 config + 8 plaintext_size.
|
|
||||||
pub const HEADER_SIZE: usize = 44;
|
|
||||||
|
|
||||||
#[binrw]
|
|
||||||
#[brw(little, magic = b"PileChaChav1")]
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
|
||||||
pub struct ChaChaHeader {
|
|
||||||
pub chunk_size: u64,
|
|
||||||
pub nonce_size: u64,
|
|
||||||
pub tag_size: u64,
|
|
||||||
pub plaintext_size: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// MARK: config
|
|
||||||
//
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
|
||||||
pub struct ChaChaReaderConfig {
|
|
||||||
pub chunk_size: u64,
|
|
||||||
pub nonce_size: u64,
|
|
||||||
pub tag_size: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for ChaChaReaderConfig {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
chunk_size: 1_048_576, // 1MiB
|
|
||||||
nonce_size: 24,
|
|
||||||
tag_size: 16,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ChaChaReaderConfig {
|
|
||||||
pub(crate) fn enc_chunk_size(&self) -> u64 {
|
|
||||||
self.chunk_size + self.nonce_size + self.tag_size
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<ChaChaHeader> for ChaChaReaderConfig {
|
|
||||||
fn from(h: ChaChaHeader) -> Self {
|
|
||||||
Self {
|
|
||||||
chunk_size: h.chunk_size,
|
|
||||||
nonce_size: h.nonce_size,
|
|
||||||
tag_size: h.tag_size,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// MARK: reader
|
// MARK: reader
|
||||||
//
|
//
|
||||||
|
|
||||||
pub struct ChaChaReader<R: Read + Seek> {
|
pub struct ChaChaReaderv1<R: Read + Seek> {
|
||||||
inner: R,
|
inner: R,
|
||||||
config: ChaChaReaderConfig,
|
header: ChaChaHeaderv1,
|
||||||
|
|
||||||
data_offset: u64,
|
data_offset: u64,
|
||||||
encryption_key: [u8; 32],
|
encryption_key: [u8; 32],
|
||||||
cursor: u64,
|
cursor: u64,
|
||||||
@@ -72,17 +17,17 @@ pub struct ChaChaReader<R: Read + Seek> {
|
|||||||
cached_chunk: Option<(u64, Vec<u8>)>,
|
cached_chunk: Option<(u64, Vec<u8>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read + Seek> ChaChaReader<R> {
|
impl<R: Read + Seek> ChaChaReaderv1<R> {
|
||||||
pub fn new(mut inner: R, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
pub fn new(mut inner: R, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
||||||
use binrw::BinReaderExt;
|
use binrw::BinReaderExt;
|
||||||
|
|
||||||
inner.seek(SeekFrom::Start(0))?;
|
inner.seek(SeekFrom::Start(0))?;
|
||||||
let header: ChaChaHeader = inner.read_le().map_err(std::io::Error::other)?;
|
let header: ChaChaHeaderv1 = inner.read_le().map_err(std::io::Error::other)?;
|
||||||
let data_offset = inner.stream_position()?;
|
let data_offset = inner.stream_position()?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
inner,
|
inner,
|
||||||
config: header.into(),
|
header,
|
||||||
data_offset,
|
data_offset,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
cursor: 0,
|
cursor: 0,
|
||||||
@@ -94,21 +39,22 @@ impl<R: Read + Seek> ChaChaReader<R> {
|
|||||||
fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> {
|
fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> {
|
||||||
use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
|
use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
|
||||||
|
|
||||||
let enc_start = self.data_offset + chunk_index * self.config.enc_chunk_size();
|
let enc_start = self.data_offset + chunk_index * self.header.config.enc_chunk_size();
|
||||||
self.inner.seek(SeekFrom::Start(enc_start))?;
|
self.inner.seek(SeekFrom::Start(enc_start))?;
|
||||||
|
|
||||||
let mut encrypted = vec![0u8; self.config.enc_chunk_size() as usize];
|
let mut encrypted = vec![0u8; self.header.config.enc_chunk_size() as usize];
|
||||||
let n = self.read_exact_or_eof(&mut encrypted)?;
|
let n = self.read_exact_or_eof(&mut encrypted)?;
|
||||||
encrypted.truncate(n);
|
encrypted.truncate(n);
|
||||||
|
|
||||||
if encrypted.len() < (self.config.nonce_size + self.config.tag_size) as usize {
|
if encrypted.len() < (self.header.config.nonce_size + self.header.config.tag_size) as usize
|
||||||
|
{
|
||||||
return Err(std::io::Error::new(
|
return Err(std::io::Error::new(
|
||||||
std::io::ErrorKind::InvalidData,
|
std::io::ErrorKind::InvalidData,
|
||||||
"encrypted chunk too short",
|
"encrypted chunk too short",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let (nonce_bytes, ciphertext) = encrypted.split_at(self.config.nonce_size as usize);
|
let (nonce_bytes, ciphertext) = encrypted.split_at(self.header.config.nonce_size as usize);
|
||||||
let nonce = XNonce::from_slice(nonce_bytes);
|
let nonce = XNonce::from_slice(nonce_bytes);
|
||||||
let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
|
let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
|
||||||
let cipher = XChaCha20Poly1305::new(key);
|
let cipher = XChaCha20Poly1305::new(key);
|
||||||
@@ -132,14 +78,14 @@ impl<R: Read + Seek> ChaChaReader<R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read + Seek + Send> AsyncReader for ChaChaReader<R> {
|
impl<R: Read + Seek + Send> AsyncReader for ChaChaReaderv1<R> {
|
||||||
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||||
let remaining = self.plaintext_size.saturating_sub(self.cursor);
|
let remaining = self.plaintext_size.saturating_sub(self.cursor);
|
||||||
if remaining == 0 || buf.is_empty() {
|
if remaining == 0 || buf.is_empty() {
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk_index = self.cursor / self.config.chunk_size;
|
let chunk_index = self.cursor / self.header.config.chunk_size;
|
||||||
|
|
||||||
let need_fetch = match &self.cached_chunk {
|
let need_fetch = match &self.cached_chunk {
|
||||||
None => true,
|
None => true,
|
||||||
@@ -153,7 +99,7 @@ impl<R: Read + Seek + Send> AsyncReader for ChaChaReader<R> {
|
|||||||
#[expect(clippy::unwrap_used)]
|
#[expect(clippy::unwrap_used)]
|
||||||
let (_, chunk_data) = self.cached_chunk.as_ref().unwrap();
|
let (_, chunk_data) = self.cached_chunk.as_ref().unwrap();
|
||||||
|
|
||||||
let offset_in_chunk = (self.cursor % self.config.chunk_size) as usize;
|
let offset_in_chunk = (self.cursor % self.header.config.chunk_size) as usize;
|
||||||
let available = chunk_data.len() - offset_in_chunk;
|
let available = chunk_data.len() - offset_in_chunk;
|
||||||
let to_copy = available.min(buf.len());
|
let to_copy = available.min(buf.len());
|
||||||
|
|
||||||
@@ -163,7 +109,7 @@ impl<R: Read + Seek + Send> AsyncReader for ChaChaReader<R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read + Seek + Send> AsyncSeekReader for ChaChaReader<R> {
|
impl<R: Read + Seek + Send> AsyncSeekReader for ChaChaReaderv1<R> {
|
||||||
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
||||||
match pos {
|
match pos {
|
||||||
SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size),
|
SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size),
|
||||||
@@ -1,10 +1,11 @@
|
|||||||
use std::io::SeekFrom;
|
use std::io::SeekFrom;
|
||||||
|
|
||||||
use crate::{AsyncReader, AsyncSeekReader, ChaChaHeader, ChaChaReaderConfig, HEADER_SIZE};
|
use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaHeaderv1};
|
||||||
|
|
||||||
pub struct ChaChaReaderAsync<R: AsyncSeekReader> {
|
pub struct ChaChaReaderv1Async<R: AsyncSeekReader> {
|
||||||
inner: R,
|
inner: R,
|
||||||
config: ChaChaReaderConfig,
|
header: ChaChaHeaderv1,
|
||||||
|
|
||||||
data_offset: u64,
|
data_offset: u64,
|
||||||
encryption_key: [u8; 32],
|
encryption_key: [u8; 32],
|
||||||
cursor: u64,
|
cursor: u64,
|
||||||
@@ -12,22 +13,22 @@ pub struct ChaChaReaderAsync<R: AsyncSeekReader> {
|
|||||||
cached_chunk: Option<(u64, Vec<u8>)>,
|
cached_chunk: Option<(u64, Vec<u8>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: AsyncSeekReader> ChaChaReaderAsync<R> {
|
impl<R: AsyncSeekReader> ChaChaReaderv1Async<R> {
|
||||||
pub async fn new(mut inner: R, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
pub async fn new(mut inner: R, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
||||||
use binrw::BinReaderExt;
|
use binrw::BinReaderExt;
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
||||||
inner.seek(SeekFrom::Start(0)).await?;
|
inner.seek(SeekFrom::Start(0)).await?;
|
||||||
let mut buf = [0u8; HEADER_SIZE];
|
let mut buf = [0u8; ChaChaHeaderv1::SIZE];
|
||||||
read_exact(&mut inner, &mut buf).await?;
|
read_exact(&mut inner, &mut buf).await?;
|
||||||
let header: ChaChaHeader = Cursor::new(&buf[..])
|
let header: ChaChaHeaderv1 = Cursor::new(&buf[..])
|
||||||
.read_le()
|
.read_le()
|
||||||
.map_err(std::io::Error::other)?;
|
.map_err(std::io::Error::other)?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
inner,
|
inner,
|
||||||
config: header.into(),
|
header,
|
||||||
data_offset: HEADER_SIZE as u64,
|
data_offset: buf.len() as u64,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
cursor: 0,
|
cursor: 0,
|
||||||
plaintext_size: header.plaintext_size,
|
plaintext_size: header.plaintext_size,
|
||||||
@@ -38,21 +39,22 @@ impl<R: AsyncSeekReader> ChaChaReaderAsync<R> {
|
|||||||
async fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> {
|
async fn fetch_chunk(&mut self, chunk_index: u64) -> Result<(), std::io::Error> {
|
||||||
use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
|
use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce, aead::Aead};
|
||||||
|
|
||||||
let enc_start = self.data_offset + chunk_index * self.config.enc_chunk_size();
|
let enc_start = self.data_offset + chunk_index * self.header.config.enc_chunk_size();
|
||||||
self.inner.seek(SeekFrom::Start(enc_start)).await?;
|
self.inner.seek(SeekFrom::Start(enc_start)).await?;
|
||||||
|
|
||||||
let mut encrypted = vec![0u8; self.config.enc_chunk_size() as usize];
|
let mut encrypted = vec![0u8; self.header.config.enc_chunk_size() as usize];
|
||||||
let n = read_exact_or_eof(&mut self.inner, &mut encrypted).await?;
|
let n = read_exact_or_eof(&mut self.inner, &mut encrypted).await?;
|
||||||
encrypted.truncate(n);
|
encrypted.truncate(n);
|
||||||
|
|
||||||
if encrypted.len() < (self.config.nonce_size + self.config.tag_size) as usize {
|
if encrypted.len() < (self.header.config.nonce_size + self.header.config.tag_size) as usize
|
||||||
|
{
|
||||||
return Err(std::io::Error::new(
|
return Err(std::io::Error::new(
|
||||||
std::io::ErrorKind::InvalidData,
|
std::io::ErrorKind::InvalidData,
|
||||||
"encrypted chunk too short",
|
"encrypted chunk too short",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let (nonce_bytes, ciphertext) = encrypted.split_at(self.config.nonce_size as usize);
|
let (nonce_bytes, ciphertext) = encrypted.split_at(self.header.config.nonce_size as usize);
|
||||||
let nonce = XNonce::from_slice(nonce_bytes);
|
let nonce = XNonce::from_slice(nonce_bytes);
|
||||||
let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
|
let key = chacha20poly1305::Key::from_slice(&self.encryption_key);
|
||||||
let cipher = XChaCha20Poly1305::new(key);
|
let cipher = XChaCha20Poly1305::new(key);
|
||||||
@@ -90,14 +92,14 @@ async fn read_exact_or_eof<R: AsyncReader>(
|
|||||||
Ok(total)
|
Ok(total)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: AsyncSeekReader> AsyncReader for ChaChaReaderAsync<R> {
|
impl<R: AsyncSeekReader> AsyncReader for ChaChaReaderv1Async<R> {
|
||||||
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||||
let remaining = self.plaintext_size.saturating_sub(self.cursor);
|
let remaining = self.plaintext_size.saturating_sub(self.cursor);
|
||||||
if remaining == 0 || buf.is_empty() {
|
if remaining == 0 || buf.is_empty() {
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk_index = self.cursor / self.config.chunk_size;
|
let chunk_index = self.cursor / self.header.config.chunk_size;
|
||||||
|
|
||||||
let need_fetch = match &self.cached_chunk {
|
let need_fetch = match &self.cached_chunk {
|
||||||
None => true,
|
None => true,
|
||||||
@@ -111,7 +113,7 @@ impl<R: AsyncSeekReader> AsyncReader for ChaChaReaderAsync<R> {
|
|||||||
#[expect(clippy::unwrap_used)]
|
#[expect(clippy::unwrap_used)]
|
||||||
let (_, chunk_data) = self.cached_chunk.as_ref().unwrap();
|
let (_, chunk_data) = self.cached_chunk.as_ref().unwrap();
|
||||||
|
|
||||||
let offset_in_chunk = (self.cursor % self.config.chunk_size) as usize;
|
let offset_in_chunk = (self.cursor % self.header.config.chunk_size) as usize;
|
||||||
let available = chunk_data.len() - offset_in_chunk;
|
let available = chunk_data.len() - offset_in_chunk;
|
||||||
let to_copy = available.min(buf.len());
|
let to_copy = available.min(buf.len());
|
||||||
|
|
||||||
@@ -121,7 +123,7 @@ impl<R: AsyncSeekReader> AsyncReader for ChaChaReaderAsync<R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: AsyncSeekReader> AsyncSeekReader for ChaChaReaderAsync<R> {
|
impl<R: AsyncSeekReader> AsyncSeekReader for ChaChaReaderv1Async<R> {
|
||||||
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
||||||
match pos {
|
match pos {
|
||||||
SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size),
|
SeekFrom::Start(x) => self.cursor = x.min(self.plaintext_size),
|
||||||
@@ -2,11 +2,12 @@ use std::io::SeekFrom;
|
|||||||
|
|
||||||
use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
|
use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
|
||||||
|
|
||||||
use crate::{ChaChaHeader, ChaChaReaderConfig};
|
use crate::chacha::{ChaChaConfigv1, ChaChaHeaderv1};
|
||||||
|
|
||||||
pub struct ChaChaWriterAsync<W: AsyncWrite + AsyncSeek + Unpin + Send> {
|
pub struct ChaChaWriterAsync<W: AsyncWrite + AsyncSeek + Unpin + Send> {
|
||||||
inner: W,
|
inner: W,
|
||||||
config: ChaChaReaderConfig,
|
header: ChaChaHeaderv1,
|
||||||
|
|
||||||
encryption_key: [u8; 32],
|
encryption_key: [u8; 32],
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
plaintext_bytes_written: u64,
|
plaintext_bytes_written: u64,
|
||||||
@@ -14,18 +15,15 @@ pub struct ChaChaWriterAsync<W: AsyncWrite + AsyncSeek + Unpin + Send> {
|
|||||||
|
|
||||||
impl<W: AsyncWrite + AsyncSeek + Unpin + Send> ChaChaWriterAsync<W> {
|
impl<W: AsyncWrite + AsyncSeek + Unpin + Send> ChaChaWriterAsync<W> {
|
||||||
pub async fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
pub async fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
||||||
let config = ChaChaReaderConfig::default();
|
let header = ChaChaHeaderv1 {
|
||||||
let header_bytes = serialize_header(ChaChaHeader {
|
config: ChaChaConfigv1::default(),
|
||||||
chunk_size: config.chunk_size,
|
|
||||||
nonce_size: config.nonce_size,
|
|
||||||
tag_size: config.tag_size,
|
|
||||||
plaintext_size: 0,
|
plaintext_size: 0,
|
||||||
})?;
|
};
|
||||||
inner.write_all(&header_bytes).await?;
|
inner.write_all(&serialize_header(header)?).await?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
inner,
|
inner,
|
||||||
config,
|
header,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
buffer: Vec::new(),
|
buffer: Vec::new(),
|
||||||
plaintext_bytes_written: 0,
|
plaintext_bytes_written: 0,
|
||||||
@@ -36,7 +34,7 @@ impl<W: AsyncWrite + AsyncSeek + Unpin + Send> ChaChaWriterAsync<W> {
|
|||||||
self.buffer.extend_from_slice(buf);
|
self.buffer.extend_from_slice(buf);
|
||||||
self.plaintext_bytes_written += buf.len() as u64;
|
self.plaintext_bytes_written += buf.len() as u64;
|
||||||
|
|
||||||
let chunk_size = self.config.chunk_size as usize;
|
let chunk_size = self.header.config.chunk_size as usize;
|
||||||
while self.buffer.len() >= chunk_size {
|
while self.buffer.len() >= chunk_size {
|
||||||
let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer[..chunk_size])?;
|
let encrypted = encrypt_chunk(&self.encryption_key, &self.buffer[..chunk_size])?;
|
||||||
self.inner.write_all(&encrypted).await?;
|
self.inner.write_all(&encrypted).await?;
|
||||||
@@ -55,10 +53,8 @@ impl<W: AsyncWrite + AsyncSeek + Unpin + Send> ChaChaWriterAsync<W> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.inner.seek(SeekFrom::Start(0)).await?;
|
self.inner.seek(SeekFrom::Start(0)).await?;
|
||||||
let header_bytes = serialize_header(ChaChaHeader {
|
let header_bytes = serialize_header(ChaChaHeaderv1 {
|
||||||
chunk_size: self.config.chunk_size,
|
config: self.header.config,
|
||||||
nonce_size: self.config.nonce_size,
|
|
||||||
tag_size: self.config.tag_size,
|
|
||||||
plaintext_size: self.plaintext_bytes_written,
|
plaintext_size: self.plaintext_bytes_written,
|
||||||
})?;
|
})?;
|
||||||
self.inner.write_all(&header_bytes).await?;
|
self.inner.write_all(&header_bytes).await?;
|
||||||
@@ -85,7 +81,7 @@ fn encrypt_chunk(key: &[u8; 32], plaintext: &[u8]) -> Result<Vec<u8>, std::io::E
|
|||||||
Ok(output)
|
Ok(output)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn serialize_header(header: ChaChaHeader) -> Result<Vec<u8>, std::io::Error> {
|
fn serialize_header(header: ChaChaHeaderv1) -> Result<Vec<u8>, std::io::Error> {
|
||||||
use binrw::BinWriterExt;
|
use binrw::BinWriterExt;
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
use std::io::{Seek, SeekFrom, Write};
|
use std::io::{Seek, SeekFrom, Write};
|
||||||
|
|
||||||
use crate::{ChaChaHeader, ChaChaReaderConfig};
|
use crate::chacha::{ChaChaConfigv1, ChaChaHeaderv1};
|
||||||
|
|
||||||
/// Generate a random 32-byte encryption key suitable for use with [`ChaChaWriter`].
|
/// Generate a random 32-byte encryption key suitable for use with [`ChaChaWriter`].
|
||||||
pub fn generate_key() -> [u8; 32] {
|
pub fn generate_key() -> [u8; 32] {
|
||||||
@@ -9,30 +9,28 @@ pub fn generate_key() -> [u8; 32] {
|
|||||||
XChaCha20Poly1305::generate_key(&mut OsRng).into()
|
XChaCha20Poly1305::generate_key(&mut OsRng).into()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ChaChaWriter<W: Write + Seek> {
|
pub struct ChaChaWriterv1<W: Write + Seek> {
|
||||||
inner: W,
|
inner: W,
|
||||||
config: ChaChaReaderConfig,
|
header: ChaChaHeaderv1,
|
||||||
|
|
||||||
encryption_key: [u8; 32],
|
encryption_key: [u8; 32],
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
plaintext_bytes_written: u64,
|
plaintext_bytes_written: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<W: Write + Seek> ChaChaWriter<W> {
|
impl<W: Write + Seek> ChaChaWriterv1<W> {
|
||||||
pub fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
pub fn new(mut inner: W, encryption_key: [u8; 32]) -> Result<Self, std::io::Error> {
|
||||||
use binrw::BinWriterExt;
|
use binrw::BinWriterExt;
|
||||||
|
|
||||||
let config = ChaChaReaderConfig::default();
|
let header = ChaChaHeaderv1 {
|
||||||
let header = ChaChaHeader {
|
config: ChaChaConfigv1::default(),
|
||||||
chunk_size: config.chunk_size,
|
|
||||||
nonce_size: config.nonce_size,
|
|
||||||
tag_size: config.tag_size,
|
|
||||||
plaintext_size: 0,
|
plaintext_size: 0,
|
||||||
};
|
};
|
||||||
inner.write_le(&header).map_err(std::io::Error::other)?;
|
inner.write_le(&header).map_err(std::io::Error::other)?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
inner,
|
inner,
|
||||||
config,
|
header,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
buffer: Vec::new(),
|
buffer: Vec::new(),
|
||||||
plaintext_bytes_written: 0,
|
plaintext_bytes_written: 0,
|
||||||
@@ -47,10 +45,8 @@ impl<W: Write + Seek> ChaChaWriter<W> {
|
|||||||
self.flush_buffer()?;
|
self.flush_buffer()?;
|
||||||
|
|
||||||
self.inner.seek(SeekFrom::Start(0))?;
|
self.inner.seek(SeekFrom::Start(0))?;
|
||||||
let header = ChaChaHeader {
|
let header = ChaChaHeaderv1 {
|
||||||
chunk_size: self.config.chunk_size,
|
config: self.header.config,
|
||||||
nonce_size: self.config.nonce_size,
|
|
||||||
tag_size: self.config.tag_size,
|
|
||||||
plaintext_size: self.plaintext_bytes_written,
|
plaintext_size: self.plaintext_bytes_written,
|
||||||
};
|
};
|
||||||
self.inner
|
self.inner
|
||||||
@@ -89,12 +85,12 @@ impl<W: Write + Seek> ChaChaWriter<W> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<W: Write + Seek> Write for ChaChaWriter<W> {
|
impl<W: Write + Seek> Write for ChaChaWriterv1<W> {
|
||||||
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
||||||
self.buffer.extend_from_slice(buf);
|
self.buffer.extend_from_slice(buf);
|
||||||
self.plaintext_bytes_written += buf.len() as u64;
|
self.plaintext_bytes_written += buf.len() as u64;
|
||||||
|
|
||||||
let chunk_size = self.config.chunk_size as usize;
|
let chunk_size = self.header.config.chunk_size as usize;
|
||||||
while self.buffer.len() >= chunk_size {
|
while self.buffer.len() >= chunk_size {
|
||||||
let encrypted = self.encrypt_chunk(&self.buffer[..chunk_size])?;
|
let encrypted = self.encrypt_chunk(&self.buffer[..chunk_size])?;
|
||||||
self.inner.write_all(&encrypted)?;
|
self.inner.write_all(&encrypted)?;
|
||||||
@@ -120,13 +116,13 @@ impl<W: Write + Seek> Write for ChaChaWriter<W> {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use std::io::{Cursor, SeekFrom, Write};
|
use std::io::{Cursor, SeekFrom, Write};
|
||||||
|
|
||||||
use super::ChaChaWriter;
|
use super::ChaChaWriterv1;
|
||||||
use crate::{AsyncReader, AsyncSeekReader, ChaChaReader};
|
use crate::{AsyncReader, AsyncSeekReader, chacha::ChaChaReaderv1};
|
||||||
|
|
||||||
const KEY: [u8; 32] = [42u8; 32];
|
const KEY: [u8; 32] = [42u8; 32];
|
||||||
|
|
||||||
fn encrypt(data: &[u8]) -> Cursor<Vec<u8>> {
|
fn encrypt(data: &[u8]) -> Cursor<Vec<u8>> {
|
||||||
let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), KEY).unwrap();
|
let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), KEY).unwrap();
|
||||||
writer.write_all(data).unwrap();
|
writer.write_all(data).unwrap();
|
||||||
let mut buf = writer.finish().unwrap();
|
let mut buf = writer.finish().unwrap();
|
||||||
buf.set_position(0);
|
buf.set_position(0);
|
||||||
@@ -134,7 +130,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn decrypt_all(buf: Cursor<Vec<u8>>) -> Vec<u8> {
|
async fn decrypt_all(buf: Cursor<Vec<u8>>) -> Vec<u8> {
|
||||||
let mut reader = ChaChaReader::new(buf, KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(buf, KEY).unwrap();
|
||||||
reader.read_to_end().await.unwrap()
|
reader.read_to_end().await.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -169,7 +165,7 @@ mod tests {
|
|||||||
async fn roundtrip_incremental_writes() {
|
async fn roundtrip_incremental_writes() {
|
||||||
// Write one byte at a time
|
// Write one byte at a time
|
||||||
let data: Vec<u8> = (0u8..200).collect();
|
let data: Vec<u8> = (0u8..200).collect();
|
||||||
let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), KEY).unwrap();
|
let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), KEY).unwrap();
|
||||||
for byte in &data {
|
for byte in &data {
|
||||||
writer.write_all(&[*byte]).unwrap();
|
writer.write_all(&[*byte]).unwrap();
|
||||||
}
|
}
|
||||||
@@ -181,7 +177,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn wrong_key_fails() {
|
async fn wrong_key_fails() {
|
||||||
let buf = encrypt(b"secret data");
|
let buf = encrypt(b"secret data");
|
||||||
let mut reader = ChaChaReader::new(buf, [0u8; 32]).unwrap();
|
let mut reader = ChaChaReaderv1::new(buf, [0u8; 32]).unwrap();
|
||||||
assert!(reader.read_to_end().await.is_err());
|
assert!(reader.read_to_end().await.is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,13 +187,13 @@ mod tests {
|
|||||||
let mut buf = encrypt(b"data");
|
let mut buf = encrypt(b"data");
|
||||||
buf.get_mut()[0] = 0xFF;
|
buf.get_mut()[0] = 0xFF;
|
||||||
buf.set_position(0);
|
buf.set_position(0);
|
||||||
assert!(ChaChaReader::new(buf, KEY).is_err());
|
assert!(ChaChaReaderv1::new(buf, KEY).is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn seek_from_start() {
|
async fn seek_from_start() {
|
||||||
let data: Vec<u8> = (0u8..100).collect();
|
let data: Vec<u8> = (0u8..100).collect();
|
||||||
let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
|
||||||
|
|
||||||
reader.seek(SeekFrom::Start(50)).await.unwrap();
|
reader.seek(SeekFrom::Start(50)).await.unwrap();
|
||||||
let mut buf = [0u8; 10];
|
let mut buf = [0u8; 10];
|
||||||
@@ -211,7 +207,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn seek_from_end() {
|
async fn seek_from_end() {
|
||||||
let data: Vec<u8> = (0u8..100).collect();
|
let data: Vec<u8> = (0u8..100).collect();
|
||||||
let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
|
||||||
|
|
||||||
reader.seek(SeekFrom::End(-10)).await.unwrap();
|
reader.seek(SeekFrom::End(-10)).await.unwrap();
|
||||||
assert_eq!(reader.read_to_end().await.unwrap(), &data[90..]);
|
assert_eq!(reader.read_to_end().await.unwrap(), &data[90..]);
|
||||||
@@ -221,7 +217,7 @@ mod tests {
|
|||||||
async fn seek_across_chunk_boundary() {
|
async fn seek_across_chunk_boundary() {
|
||||||
// Seek to 6 bytes before the end of chunk 0, read 12 bytes spanning into chunk 1
|
// Seek to 6 bytes before the end of chunk 0, read 12 bytes spanning into chunk 1
|
||||||
let data: Vec<u8> = (0u8..=255).cycle().take(65536 + 500).collect();
|
let data: Vec<u8> = (0u8..=255).cycle().take(65536 + 500).collect();
|
||||||
let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
|
||||||
|
|
||||||
reader.seek(SeekFrom::Start(65530)).await.unwrap();
|
reader.seek(SeekFrom::Start(65530)).await.unwrap();
|
||||||
let mut buf = vec![0u8; 12];
|
let mut buf = vec![0u8; 12];
|
||||||
@@ -235,7 +231,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn seek_current() {
|
async fn seek_current() {
|
||||||
let data: Vec<u8> = (0u8..=255).cycle().take(200).collect();
|
let data: Vec<u8> = (0u8..=255).cycle().take(200).collect();
|
||||||
let mut reader = ChaChaReader::new(encrypt(&data), KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(encrypt(&data), KEY).unwrap();
|
||||||
|
|
||||||
// Read 10, seek back 5, read 5 — should get bytes 5..10
|
// Read 10, seek back 5, read 5 — should get bytes 5..10
|
||||||
let mut first = [0u8; 10];
|
let mut first = [0u8; 10];
|
||||||
@@ -255,7 +251,7 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn seek_past_end_clamps() {
|
async fn seek_past_end_clamps() {
|
||||||
let data = b"hello";
|
let data = b"hello";
|
||||||
let mut reader = ChaChaReader::new(encrypt(data), KEY).unwrap();
|
let mut reader = ChaChaReaderv1::new(encrypt(data), KEY).unwrap();
|
||||||
|
|
||||||
let pos = reader.seek(SeekFrom::Start(9999)).await.unwrap();
|
let pos = reader.seek(SeekFrom::Start(9999)).await.unwrap();
|
||||||
assert_eq!(pos, data.len() as u64);
|
assert_eq!(pos, data.len() as u64);
|
||||||
@@ -4,14 +4,4 @@ pub use asyncreader::*;
|
|||||||
mod s3reader;
|
mod s3reader;
|
||||||
pub use s3reader::*;
|
pub use s3reader::*;
|
||||||
|
|
||||||
mod chachareader;
|
pub mod chacha;
|
||||||
pub use chachareader::*;
|
|
||||||
|
|
||||||
mod chachawriter;
|
|
||||||
pub use chachawriter::*;
|
|
||||||
|
|
||||||
mod chachareader_async;
|
|
||||||
pub use chachareader_async::*;
|
|
||||||
|
|
||||||
mod chachawriter_async;
|
|
||||||
pub use chachawriter_async::*;
|
|
||||||
|
|||||||
@@ -1,10 +1,102 @@
|
|||||||
|
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
|
||||||
use smartstring::{LazyCompact, SmartString};
|
use smartstring::{LazyCompact, SmartString};
|
||||||
use std::{io::SeekFrom, sync::Arc};
|
use std::{fmt::Debug, io::SeekFrom, sync::Arc};
|
||||||
|
|
||||||
use crate::{AsyncReader, AsyncSeekReader};
|
use crate::{AsyncReader, AsyncSeekReader};
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: client
|
||||||
|
//
|
||||||
|
|
||||||
|
/// An interface to an S3 bucket.
|
||||||
|
///
|
||||||
|
/// TODO: S3 is slow and expensive. Ideally, we'll have this struct cache data
|
||||||
|
/// so we don't have to download anything twice. This is, however, complicated,
|
||||||
|
/// and doesn't fully solve the "expensive" problem.
|
||||||
|
pub struct S3Client {
|
||||||
|
pub client: aws_sdk_s3::Client,
|
||||||
|
bucket: SmartString<LazyCompact>,
|
||||||
|
|
||||||
|
/// maximum number of bytes to use for cached data
|
||||||
|
cache_limit_bytes: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Debug for S3Client {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.debug_struct("S3Client")
|
||||||
|
.field("bucket", &self.bucket)
|
||||||
|
.field("cache_limit_bytes", &self.cache_limit_bytes)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl S3Client {
|
||||||
|
pub async fn new(
|
||||||
|
bucket: &str,
|
||||||
|
endpoint: Option<&str>,
|
||||||
|
region: &str,
|
||||||
|
access_key_id: &str,
|
||||||
|
secret_access_key: &str,
|
||||||
|
cache_limit_bytes: usize,
|
||||||
|
) -> Arc<Self> {
|
||||||
|
let client = {
|
||||||
|
let mut s3_config = aws_sdk_s3::config::Builder::new()
|
||||||
|
.behavior_version(BehaviorVersion::latest())
|
||||||
|
.region(Region::new(region.to_owned()))
|
||||||
|
.credentials_provider(Credentials::new(
|
||||||
|
access_key_id,
|
||||||
|
secret_access_key,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
"pile",
|
||||||
|
));
|
||||||
|
|
||||||
|
if let Some(ep) = endpoint {
|
||||||
|
s3_config = s3_config.endpoint_url(ep).force_path_style(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
aws_sdk_s3::Client::from_conf(s3_config.build())
|
||||||
|
};
|
||||||
|
|
||||||
|
return Arc::new(Self {
|
||||||
|
bucket: bucket.into(),
|
||||||
|
client,
|
||||||
|
cache_limit_bytes,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bucket(&self) -> &str {
|
||||||
|
&self.bucket
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get(self: &Arc<Self>, key: &str) -> Result<S3Reader, std::io::Error> {
|
||||||
|
let head = self
|
||||||
|
.client
|
||||||
|
.head_object()
|
||||||
|
.bucket(self.bucket.as_str())
|
||||||
|
.key(key)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(std::io::Error::other)?;
|
||||||
|
|
||||||
|
let size = head.content_length().unwrap_or(0) as u64;
|
||||||
|
|
||||||
|
Ok(S3Reader {
|
||||||
|
client: self.clone(),
|
||||||
|
bucket: self.bucket.clone(),
|
||||||
|
key: key.into(),
|
||||||
|
cursor: 0,
|
||||||
|
size,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: reader
|
||||||
|
//
|
||||||
|
|
||||||
pub struct S3Reader {
|
pub struct S3Reader {
|
||||||
pub client: Arc<aws_sdk_s3::Client>,
|
pub client: Arc<S3Client>,
|
||||||
pub bucket: SmartString<LazyCompact>,
|
pub bucket: SmartString<LazyCompact>,
|
||||||
pub key: SmartString<LazyCompact>,
|
pub key: SmartString<LazyCompact>,
|
||||||
pub cursor: u64,
|
pub cursor: u64,
|
||||||
@@ -23,6 +115,7 @@ impl AsyncReader for S3Reader {
|
|||||||
let end_byte = start_byte + len_to_read - 1;
|
let end_byte = start_byte + len_to_read - 1;
|
||||||
|
|
||||||
let resp = self
|
let resp = self
|
||||||
|
.client
|
||||||
.client
|
.client
|
||||||
.get_object()
|
.get_object()
|
||||||
.bucket(self.bucket.as_str())
|
.bucket(self.bucket.as_str())
|
||||||
|
|||||||
@@ -31,7 +31,6 @@ image = { workspace = true, optional = true }
|
|||||||
id3 = { workspace = true }
|
id3 = { workspace = true }
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
aws-sdk-s3 = { workspace = true }
|
|
||||||
mime = { workspace = true }
|
mime = { workspace = true }
|
||||||
mime_guess = { workspace = true }
|
mime_guess = { workspace = true }
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use pile_config::{
|
use pile_config::{
|
||||||
Label, S3Credentials,
|
Label,
|
||||||
pattern::{GroupPattern, GroupSegment},
|
pattern::{GroupPattern, GroupSegment},
|
||||||
};
|
};
|
||||||
|
use pile_io::S3Client;
|
||||||
use smartstring::{LazyCompact, SmartString};
|
use smartstring::{LazyCompact, SmartString};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
@@ -19,9 +19,9 @@ use crate::{
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct S3DataSource {
|
pub struct S3DataSource {
|
||||||
pub name: Label,
|
pub name: Label,
|
||||||
pub bucket: SmartString<LazyCompact>,
|
pub client: Arc<S3Client>,
|
||||||
|
|
||||||
pub prefix: Option<SmartString<LazyCompact>>,
|
pub prefix: Option<SmartString<LazyCompact>>,
|
||||||
pub client: Arc<aws_sdk_s3::Client>,
|
|
||||||
pub pattern: GroupPattern,
|
pub pattern: GroupPattern,
|
||||||
pub encryption_key: Option<[u8; 32]>,
|
pub encryption_key: Option<[u8; 32]>,
|
||||||
pub index: OnceLock<HashMap<SmartString<LazyCompact>, Item>>,
|
pub index: OnceLock<HashMap<SmartString<LazyCompact>, Item>>,
|
||||||
@@ -30,40 +30,30 @@ pub struct S3DataSource {
|
|||||||
impl S3DataSource {
|
impl S3DataSource {
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
name: &Label,
|
name: &Label,
|
||||||
bucket: String,
|
bucket: &str,
|
||||||
prefix: Option<String>,
|
prefix: Option<&str>,
|
||||||
endpoint: Option<String>,
|
endpoint: Option<&str>,
|
||||||
region: String,
|
region: &str,
|
||||||
credentials: &S3Credentials,
|
access_key_id: &str,
|
||||||
|
secret_access_key: &str,
|
||||||
|
cache_limit_bytes: usize,
|
||||||
pattern: GroupPattern,
|
pattern: GroupPattern,
|
||||||
encryption_key: Option<[u8; 32]>,
|
encryption_key: Option<[u8; 32]>,
|
||||||
) -> Result<Arc<Self>, std::io::Error> {
|
) -> Result<Arc<Self>, std::io::Error> {
|
||||||
let client = {
|
let client = S3Client::new(
|
||||||
let creds = Credentials::new(
|
bucket,
|
||||||
&credentials.access_key_id,
|
endpoint,
|
||||||
&credentials.secret_access_key,
|
region,
|
||||||
None,
|
access_key_id,
|
||||||
None,
|
secret_access_key,
|
||||||
"pile",
|
cache_limit_bytes,
|
||||||
);
|
)
|
||||||
|
.await;
|
||||||
let mut s3_config = aws_sdk_s3::config::Builder::new()
|
|
||||||
.behavior_version(BehaviorVersion::latest())
|
|
||||||
.region(Region::new(region))
|
|
||||||
.credentials_provider(creds);
|
|
||||||
|
|
||||||
if let Some(ep) = endpoint {
|
|
||||||
s3_config = s3_config.endpoint_url(ep).force_path_style(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
aws_sdk_s3::Client::from_conf(s3_config.build())
|
|
||||||
};
|
|
||||||
|
|
||||||
let source = Arc::new(Self {
|
let source = Arc::new(Self {
|
||||||
name: name.clone(),
|
name: name.clone(),
|
||||||
bucket: bucket.into(),
|
client,
|
||||||
prefix: prefix.map(|x| x.into()),
|
prefix: prefix.map(|x| x.into()),
|
||||||
client: Arc::new(client),
|
|
||||||
pattern,
|
pattern,
|
||||||
encryption_key,
|
encryption_key,
|
||||||
index: OnceLock::new(),
|
index: OnceLock::new(),
|
||||||
@@ -78,9 +68,10 @@ impl S3DataSource {
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut req = source
|
let mut req = source
|
||||||
|
.client
|
||||||
.client
|
.client
|
||||||
.list_objects_v2()
|
.list_objects_v2()
|
||||||
.bucket(source.bucket.as_str());
|
.bucket(source.client.bucket());
|
||||||
|
|
||||||
if let Some(prefix) = &source.prefix {
|
if let Some(prefix) = &source.prefix {
|
||||||
req = req.prefix(prefix.as_str());
|
req = req.prefix(prefix.as_str());
|
||||||
@@ -191,7 +182,11 @@ impl DataSource for Arc<S3DataSource> {
|
|||||||
let mut continuation_token: Option<String> = None;
|
let mut continuation_token: Option<String> = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut req = self.client.list_objects_v2().bucket(self.bucket.as_str());
|
let mut req = self
|
||||||
|
.client
|
||||||
|
.client
|
||||||
|
.list_objects_v2()
|
||||||
|
.bucket(self.client.bucket());
|
||||||
|
|
||||||
if let Some(prefix) = &self.prefix {
|
if let Some(prefix) = &self.prefix {
|
||||||
req = req.prefix(prefix.as_str());
|
req = req.prefix(prefix.as_str());
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use mime::Mime;
|
use mime::Mime;
|
||||||
use pile_config::Label;
|
use pile_config::Label;
|
||||||
use pile_io::{ChaChaReaderAsync, S3Reader, SyncReadBridge};
|
use pile_io::{SyncReadBridge, chacha::ChaChaReaderv1Async};
|
||||||
use smartstring::{LazyCompact, SmartString};
|
use smartstring::{LazyCompact, SmartString};
|
||||||
use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
|
use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
@@ -59,39 +59,13 @@ impl Item {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let head = source
|
let reader = source.client.get(&full_key).await?;
|
||||||
.client
|
|
||||||
.head_object()
|
|
||||||
.bucket(source.bucket.as_str())
|
|
||||||
.key(full_key.as_str())
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.map_err(std::io::Error::other)?;
|
|
||||||
|
|
||||||
let size = head.content_length().unwrap_or(0) as u64;
|
|
||||||
|
|
||||||
match source.encryption_key {
|
match source.encryption_key {
|
||||||
None => ItemReader::S3(S3Reader {
|
None => ItemReader::S3(reader),
|
||||||
client: source.client.clone(),
|
Some(enc_key) => {
|
||||||
bucket: source.bucket.clone(),
|
ItemReader::EncryptedS3(ChaChaReaderv1Async::new(reader, enc_key).await?)
|
||||||
key: full_key,
|
}
|
||||||
cursor: 0,
|
|
||||||
size,
|
|
||||||
}),
|
|
||||||
|
|
||||||
Some(enc_key) => ItemReader::EncryptedS3(
|
|
||||||
ChaChaReaderAsync::new(
|
|
||||||
S3Reader {
|
|
||||||
client: source.client.clone(),
|
|
||||||
bucket: source.bucket.clone(),
|
|
||||||
key: full_key,
|
|
||||||
cursor: 0,
|
|
||||||
size,
|
|
||||||
},
|
|
||||||
enc_key,
|
|
||||||
)
|
|
||||||
.await?,
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use pile_io::{AsyncReader, AsyncSeekReader, ChaChaReaderAsync, S3Reader};
|
use pile_io::{AsyncReader, AsyncSeekReader, S3Reader, chacha::ChaChaReaderv1Async};
|
||||||
use std::{fs::File, io::Seek};
|
use std::{fs::File, io::Seek};
|
||||||
|
|
||||||
//
|
//
|
||||||
@@ -8,7 +8,7 @@ use std::{fs::File, io::Seek};
|
|||||||
pub enum ItemReader {
|
pub enum ItemReader {
|
||||||
File(File),
|
File(File),
|
||||||
S3(S3Reader),
|
S3(S3Reader),
|
||||||
EncryptedS3(ChaChaReaderAsync<S3Reader>),
|
EncryptedS3(ChaChaReaderv1Async<S3Reader>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncReader for ItemReader {
|
impl AsyncReader for ItemReader {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use pile_io::{AsyncReader, ChaChaReader, ChaChaWriter};
|
use pile_io::AsyncReader;
|
||||||
|
use pile_io::chacha::{ChaChaReaderv1, ChaChaWriterv1};
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::source::string_to_key;
|
use pile_value::source::string_to_key;
|
||||||
use std::io::{Cursor, Write};
|
use std::io::{Cursor, Write};
|
||||||
@@ -37,7 +38,7 @@ impl CliCmd for EncryptCommand {
|
|||||||
.await
|
.await
|
||||||
.with_context(|| format!("while reading '{}'", self.path.display()))?;
|
.with_context(|| format!("while reading '{}'", self.path.display()))?;
|
||||||
|
|
||||||
let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), key)
|
let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), key)
|
||||||
.context("while initializing encryptor")?;
|
.context("while initializing encryptor")?;
|
||||||
writer.write_all(&plaintext).context("while encrypting")?;
|
writer.write_all(&plaintext).context("while encrypting")?;
|
||||||
let buf = writer.finish().context("while finalizing encryptor")?;
|
let buf = writer.finish().context("while finalizing encryptor")?;
|
||||||
@@ -61,7 +62,7 @@ impl CliCmd for DecryptCommand {
|
|||||||
.await
|
.await
|
||||||
.with_context(|| format!("while reading '{}'", self.path.display()))?;
|
.with_context(|| format!("while reading '{}'", self.path.display()))?;
|
||||||
|
|
||||||
let mut reader = ChaChaReader::new(Cursor::new(ciphertext), key)
|
let mut reader = ChaChaReaderv1::new(Cursor::new(ciphertext), key)
|
||||||
.context("while initializing decryptor")?;
|
.context("while initializing decryptor")?;
|
||||||
let plaintext = reader.read_to_end().await.context("while decrypting")?;
|
let plaintext = reader.read_to_end().await.context("while decrypting")?;
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use clap::Args;
|
|||||||
use indicatif::ProgressBar;
|
use indicatif::ProgressBar;
|
||||||
use pile_config::Label;
|
use pile_config::Label;
|
||||||
use pile_dataset::{Dataset, Datasets};
|
use pile_dataset::{Dataset, Datasets};
|
||||||
use pile_io::ChaChaWriter;
|
use pile_io::chacha::ChaChaWriterv1;
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::source::{DataSource, DirDataSource, S3DataSource, encrypt_path};
|
use pile_value::source::{DataSource, DirDataSource, S3DataSource, encrypt_path};
|
||||||
use std::{
|
use std::{
|
||||||
@@ -71,12 +71,12 @@ impl CliCmd for UploadCommand {
|
|||||||
let bucket = self
|
let bucket = self
|
||||||
.bucket
|
.bucket
|
||||||
.as_deref()
|
.as_deref()
|
||||||
.unwrap_or(s3_ds.bucket.as_str())
|
.unwrap_or(s3_ds.client.bucket())
|
||||||
.to_owned();
|
.to_owned();
|
||||||
let full_prefix = self.prefix.trim_matches('/').to_owned();
|
let full_prefix = self.prefix.trim_matches('/').to_owned();
|
||||||
|
|
||||||
// Check for existing objects at the target prefix
|
// Check for existing objects at the target prefix
|
||||||
let existing_keys = list_prefix(&s3_ds.client, &bucket, &full_prefix)
|
let existing_keys = list_prefix(&s3_ds.client.client, &bucket, &full_prefix)
|
||||||
.await
|
.await
|
||||||
.context("while checking for existing objects at target prefix")?;
|
.context("while checking for existing objects at target prefix")?;
|
||||||
|
|
||||||
@@ -89,6 +89,7 @@ impl CliCmd for UploadCommand {
|
|||||||
);
|
);
|
||||||
for key in &existing_keys {
|
for key in &existing_keys {
|
||||||
s3_ds
|
s3_ds
|
||||||
|
.client
|
||||||
.client
|
.client
|
||||||
.delete_object()
|
.delete_object()
|
||||||
.bucket(&bucket)
|
.bucket(&bucket)
|
||||||
@@ -169,7 +170,7 @@ impl CliCmd for UploadCommand {
|
|||||||
tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<u8>> {
|
tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<u8>> {
|
||||||
let plaintext = std::fs::read(&path)
|
let plaintext = std::fs::read(&path)
|
||||||
.with_context(|| format!("while opening '{}'", path.display()))?;
|
.with_context(|| format!("while opening '{}'", path.display()))?;
|
||||||
let mut writer = ChaChaWriter::new(Cursor::new(Vec::new()), enc_key)
|
let mut writer = ChaChaWriterv1::new(Cursor::new(Vec::new()), enc_key)
|
||||||
.context("while initializing encryptor")?;
|
.context("while initializing encryptor")?;
|
||||||
writer.write_all(&plaintext).context("while encrypting")?;
|
writer.write_all(&plaintext).context("while encrypting")?;
|
||||||
Ok(writer.finish().context("while finalizing")?.into_inner())
|
Ok(writer.finish().context("while finalizing")?.into_inner())
|
||||||
@@ -184,6 +185,7 @@ impl CliCmd for UploadCommand {
|
|||||||
};
|
};
|
||||||
|
|
||||||
client
|
client
|
||||||
|
.client
|
||||||
.put_object()
|
.put_object()
|
||||||
.bucket(&bucket)
|
.bucket(&bucket)
|
||||||
.key(&key)
|
.key(&key)
|
||||||
@@ -224,7 +226,7 @@ fn get_dir_source(
|
|||||||
label: &Label,
|
label: &Label,
|
||||||
name: &str,
|
name: &str,
|
||||||
) -> Result<Arc<DirDataSource>, anyhow::Error> {
|
) -> Result<Arc<DirDataSource>, anyhow::Error> {
|
||||||
match ds.sources.get(label) {
|
match ds.sources.get(label).or(ds.disabled_sources.get(label)) {
|
||||||
Some(Dataset::Dir(d)) => Ok(Arc::clone(d)),
|
Some(Dataset::Dir(d)) => Ok(Arc::clone(d)),
|
||||||
Some(_) => Err(anyhow::anyhow!(
|
Some(_) => Err(anyhow::anyhow!(
|
||||||
"source '{name}' is not a filesystem source"
|
"source '{name}' is not a filesystem source"
|
||||||
@@ -240,7 +242,7 @@ fn get_s3_source(
|
|||||||
label: &Label,
|
label: &Label,
|
||||||
name: &str,
|
name: &str,
|
||||||
) -> Result<Arc<S3DataSource>, anyhow::Error> {
|
) -> Result<Arc<S3DataSource>, anyhow::Error> {
|
||||||
match ds.sources.get(label) {
|
match ds.sources.get(label).or(ds.disabled_sources.get(label)) {
|
||||||
Some(Dataset::S3(s)) => Ok(Arc::clone(s)),
|
Some(Dataset::S3(s)) => Ok(Arc::clone(s)),
|
||||||
Some(_) => Err(anyhow::anyhow!("source '{name}' is not an S3 source")),
|
Some(_) => Err(anyhow::anyhow!("source '{name}' is not an S3 source")),
|
||||||
None => Err(anyhow::anyhow!("s3 source '{name}' not found in config")),
|
None => Err(anyhow::anyhow!("s3 source '{name}' not found in config")),
|
||||||
|
|||||||
Reference in New Issue
Block a user