Refactor sidecars
This commit is contained in:
@@ -1,31 +1,41 @@
|
||||
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
|
||||
use chrono::{DateTime, Utc};
|
||||
use pile_config::{Label, S3Credentials};
|
||||
use pile_config::{
|
||||
Label, S3Credentials,
|
||||
pattern::{GroupPattern, GroupSegment},
|
||||
};
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::sync::Arc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
use crate::{source::DataSource, value::Item};
|
||||
use crate::{
|
||||
extract::traits::ExtractState,
|
||||
source::DataSource,
|
||||
value::{Item, PileValue},
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct S3DataSource {
|
||||
pub name: Label,
|
||||
pub bucket: SmartString<LazyCompact>,
|
||||
pub prefix: Option<SmartString<LazyCompact>>,
|
||||
pub sidecars: bool,
|
||||
pub client: Arc<aws_sdk_s3::Client>,
|
||||
pub pattern: GroupPattern,
|
||||
pub index: OnceLock<HashMap<SmartString<LazyCompact>, Item>>,
|
||||
}
|
||||
|
||||
impl S3DataSource {
|
||||
pub fn new(
|
||||
pub async fn new(
|
||||
name: &Label,
|
||||
bucket: String,
|
||||
prefix: Option<String>,
|
||||
endpoint: Option<String>,
|
||||
region: String,
|
||||
credentials: &S3Credentials,
|
||||
sidecars: bool,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
pattern: GroupPattern,
|
||||
) -> Result<Arc<Self>, std::io::Error> {
|
||||
let client = {
|
||||
let creds = Credentials::new(
|
||||
&credentials.access_key_id,
|
||||
@@ -47,174 +57,118 @@ impl S3DataSource {
|
||||
aws_sdk_s3::Client::from_conf(s3_config.build())
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
let source = Arc::new(Self {
|
||||
name: name.clone(),
|
||||
bucket: bucket.into(),
|
||||
prefix: prefix.map(|x| x.into()),
|
||||
sidecars,
|
||||
client: Arc::new(client),
|
||||
})
|
||||
}
|
||||
pattern,
|
||||
index: OnceLock::new(),
|
||||
});
|
||||
|
||||
async fn find_sidecar_key(&self, key: &str) -> Option<SmartString<LazyCompact>> {
|
||||
// First try {key}.toml
|
||||
let full_toml = format!("{key}.toml");
|
||||
if self
|
||||
.client
|
||||
.head_object()
|
||||
.bucket(self.bucket.as_str())
|
||||
.key(&full_toml)
|
||||
.send()
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
return Some(full_toml.into());
|
||||
}
|
||||
//
|
||||
// MARK: list keys
|
||||
//
|
||||
|
||||
// Then try {key-with-extension-stripped}.toml
|
||||
let stripped = std::path::Path::new(key).with_extension("toml");
|
||||
if let Some(stripped_str) = stripped.to_str()
|
||||
&& stripped_str != full_toml.as_str()
|
||||
&& self
|
||||
let mut all_keys: HashSet<SmartString<LazyCompact>> = HashSet::new();
|
||||
let mut continuation_token: Option<String> = None;
|
||||
|
||||
loop {
|
||||
let mut req = source
|
||||
.client
|
||||
.head_object()
|
||||
.bucket(self.bucket.as_str())
|
||||
.key(stripped_str)
|
||||
.send()
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
return Some(stripped_str.into());
|
||||
.list_objects_v2()
|
||||
.bucket(source.bucket.as_str());
|
||||
|
||||
if let Some(prefix) = &source.prefix {
|
||||
req = req.prefix(prefix.as_str());
|
||||
}
|
||||
|
||||
if let Some(token) = continuation_token {
|
||||
req = req.continuation_token(token);
|
||||
}
|
||||
|
||||
let resp = req.send().await.map_err(std::io::Error::other)?;
|
||||
|
||||
let next_token = resp.next_continuation_token().map(ToOwned::to_owned);
|
||||
let is_truncated = resp.is_truncated().unwrap_or(false);
|
||||
|
||||
for obj in resp.contents() {
|
||||
let Some(full_key) = obj.key() else { continue };
|
||||
let key = strip_prefix(full_key, source.prefix.as_deref());
|
||||
all_keys.insert(key.into());
|
||||
}
|
||||
|
||||
if !is_truncated {
|
||||
break;
|
||||
}
|
||||
continuation_token = next_token;
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
//
|
||||
// MARK: resolve groups
|
||||
//
|
||||
|
||||
async fn make_item(self: &Arc<Self>, key: impl Into<SmartString<LazyCompact>>) -> Item {
|
||||
let key: SmartString<LazyCompact> = key.into();
|
||||
let object_path = match &self.prefix {
|
||||
Some(x) => format!("{x}/{key}").into(),
|
||||
None => key.clone(),
|
||||
};
|
||||
let mut keys_grouped: HashSet<SmartString<LazyCompact>> = HashSet::new();
|
||||
for key in &all_keys {
|
||||
let groups = resolve_groups(&source.pattern, key).await;
|
||||
for group_key in groups.into_values() {
|
||||
if all_keys.contains(&group_key) {
|
||||
keys_grouped.insert(group_key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mime = mime_guess::from_path(object_path.as_str()).first_or_octet_stream();
|
||||
|
||||
let sidecar = if self.sidecars {
|
||||
self.find_sidecar_key(object_path.as_str())
|
||||
.await
|
||||
.map(|sidecar_key| {
|
||||
Box::new(Item::S3 {
|
||||
source: Arc::clone(self),
|
||||
mime: mime_guess::from_path(sidecar_key.as_str()).first_or_octet_stream(),
|
||||
key: sidecar_key,
|
||||
sidecar: None,
|
||||
})
|
||||
let mut index = HashMap::new();
|
||||
for key in all_keys.difference(&keys_grouped) {
|
||||
let groups = resolve_groups(&source.pattern, key).await;
|
||||
let group = groups
|
||||
.into_iter()
|
||||
.filter(|(_, gk)| all_keys.contains(gk))
|
||||
.map(|(label, gk)| {
|
||||
(
|
||||
label,
|
||||
Box::new(Item::S3 {
|
||||
source: Arc::clone(&source),
|
||||
mime: mime_guess::from_path(gk.as_str()).first_or_octet_stream(),
|
||||
key: gk,
|
||||
group: Arc::new(HashMap::new()),
|
||||
}),
|
||||
)
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
Item::S3 {
|
||||
source: Arc::clone(self),
|
||||
mime,
|
||||
key,
|
||||
sidecar,
|
||||
let item = Item::S3 {
|
||||
source: Arc::clone(&source),
|
||||
mime: mime_guess::from_path(key.as_str()).first_or_octet_stream(),
|
||||
key: key.clone(),
|
||||
group: Arc::new(group),
|
||||
};
|
||||
|
||||
index.insert(item.key(), item);
|
||||
}
|
||||
|
||||
source.index.get_or_init(|| index);
|
||||
Ok(source)
|
||||
}
|
||||
}
|
||||
|
||||
impl DataSource for Arc<S3DataSource> {
|
||||
#[expect(clippy::expect_used)]
|
||||
async fn get(&self, key: &str) -> Result<Option<Item>, std::io::Error> {
|
||||
if self.sidecars && key.ends_with(".toml") {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let key: SmartString<LazyCompact> = key.into();
|
||||
let key = match &self.prefix {
|
||||
Some(x) => format!("{x}/{key}").into(),
|
||||
None => key,
|
||||
};
|
||||
|
||||
let result = self
|
||||
.client
|
||||
.head_object()
|
||||
.bucket(self.bucket.as_str())
|
||||
.key(key.as_str())
|
||||
.send()
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Err(sdk_err) => {
|
||||
let not_found = sdk_err
|
||||
.as_service_error()
|
||||
.map(|e| e.is_not_found())
|
||||
.unwrap_or(false);
|
||||
if not_found {
|
||||
return Ok(None);
|
||||
}
|
||||
Err(std::io::Error::other(sdk_err))
|
||||
}
|
||||
Ok(_) => Ok(Some(self.make_item(key).await)),
|
||||
}
|
||||
return Ok(self
|
||||
.index
|
||||
.get()
|
||||
.expect("index should be initialized")
|
||||
.get(key)
|
||||
.cloned());
|
||||
}
|
||||
|
||||
fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>> {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(64);
|
||||
let source = Arc::clone(self);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut continuation_token: Option<String> = None;
|
||||
|
||||
loop {
|
||||
let mut req = source
|
||||
.client
|
||||
.list_objects_v2()
|
||||
.bucket(source.bucket.as_str());
|
||||
|
||||
if let Some(prefix) = &source.prefix {
|
||||
req = req.prefix(prefix.as_str());
|
||||
}
|
||||
|
||||
if let Some(token) = continuation_token {
|
||||
req = req.continuation_token(token);
|
||||
}
|
||||
|
||||
let resp = match req.send().await {
|
||||
Err(e) => {
|
||||
let _ = tx.send(Err(std::io::Error::other(e))).await;
|
||||
break;
|
||||
}
|
||||
Ok(resp) => resp,
|
||||
};
|
||||
|
||||
let next_token = resp.next_continuation_token().map(ToOwned::to_owned);
|
||||
let is_truncated = resp.is_truncated().unwrap_or(false);
|
||||
|
||||
for obj in resp.contents() {
|
||||
let key = match obj.key() {
|
||||
Some(k) => k.to_owned(),
|
||||
None => continue,
|
||||
};
|
||||
|
||||
if source.sidecars && key.ends_with(".toml") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let item = source.make_item(key).await;
|
||||
|
||||
if tx.send(Ok(item)).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if !is_truncated {
|
||||
break;
|
||||
}
|
||||
continuation_token = next_token;
|
||||
}
|
||||
});
|
||||
|
||||
ReceiverStream::new(rx)
|
||||
#[expect(clippy::expect_used)]
|
||||
fn iter(&self) -> impl Iterator<Item = &Item> {
|
||||
self.index
|
||||
.get()
|
||||
.expect("index should be initialized")
|
||||
.values()
|
||||
}
|
||||
|
||||
async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
@@ -264,3 +218,51 @@ impl DataSource for Arc<S3DataSource> {
|
||||
Ok(ts)
|
||||
}
|
||||
}
|
||||
|
||||
fn strip_prefix<'a>(key: &'a str, prefix: Option<&str>) -> &'a str {
|
||||
match prefix {
|
||||
None => key,
|
||||
Some(p) => {
|
||||
let with_slash = if p.ends_with('/') {
|
||||
key.strip_prefix(p)
|
||||
} else {
|
||||
key.strip_prefix(&format!("{p}/"))
|
||||
};
|
||||
with_slash.unwrap_or(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn resolve_groups(
|
||||
pattern: &GroupPattern,
|
||||
key: &str,
|
||||
) -> HashMap<Label, SmartString<LazyCompact>> {
|
||||
let state = ExtractState { ignore_mime: false };
|
||||
let mut group = HashMap::new();
|
||||
'pattern: for (l, pat) in &pattern.pattern {
|
||||
let item = PileValue::String(Arc::new(key.into()));
|
||||
let mut target = String::new();
|
||||
for p in pat {
|
||||
match p {
|
||||
GroupSegment::Literal(x) => target.push_str(x),
|
||||
GroupSegment::Path(op) => {
|
||||
let res = match item.query(&state, op).await {
|
||||
Ok(Some(x)) => x,
|
||||
_ => continue 'pattern,
|
||||
};
|
||||
|
||||
let res = match res.as_str() {
|
||||
Some(x) => x,
|
||||
None => continue 'pattern,
|
||||
};
|
||||
|
||||
target.push_str(res);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
group.insert(l.clone(), target.into());
|
||||
}
|
||||
|
||||
return group;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user