use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region}; use chrono::{DateTime, Utc}; use pile_config::{Label, S3Credentials}; use smartstring::{LazyCompact, SmartString}; use std::sync::Arc; use tokio_stream::wrappers::ReceiverStream; use crate::{source::DataSource, value::Item}; #[derive(Debug)] pub struct S3DataSource { pub name: Label, pub bucket: SmartString, pub prefix: Option>, pub sidecars: bool, pub client: Arc, } impl S3DataSource { pub fn new( name: &Label, bucket: String, prefix: Option, endpoint: Option, region: String, credentials: &S3Credentials, sidecars: bool, ) -> Result { let client = { let creds = Credentials::new( &credentials.access_key_id, &credentials.secret_access_key, None, None, "pile", ); let mut s3_config = aws_sdk_s3::config::Builder::new() .behavior_version(BehaviorVersion::latest()) .region(Region::new(region)) .credentials_provider(creds); if let Some(ep) = endpoint { s3_config = s3_config.endpoint_url(ep).force_path_style(true); } aws_sdk_s3::Client::from_conf(s3_config.build()) }; Ok(Self { name: name.clone(), bucket: bucket.into(), prefix: prefix.map(|x| x.into()), sidecars, client: Arc::new(client), }) } async fn find_sidecar_key(&self, key: &str) -> Option> { // First try {key}.toml let full_toml = format!("{key}.toml"); if self .client .head_object() .bucket(self.bucket.as_str()) .key(&full_toml) .send() .await .is_ok() { return Some(full_toml.into()); } // Then try {key-with-extension-stripped}.toml let stripped = std::path::Path::new(key).with_extension("toml"); if let Some(stripped_str) = stripped.to_str() && stripped_str != full_toml.as_str() && self .client .head_object() .bucket(self.bucket.as_str()) .key(stripped_str) .send() .await .is_ok() { return Some(stripped_str.into()); } None } async fn make_item(self: &Arc, key: impl Into>) -> Item { let key: SmartString = key.into(); let object_path = match &self.prefix { Some(x) => format!("{x}/{key}").into(), None => key.clone(), }; let mime = mime_guess::from_path(object_path.as_str()).first_or_octet_stream(); let sidecar = if self.sidecars { self.find_sidecar_key(object_path.as_str()) .await .map(|sidecar_key| { Box::new(Item::S3 { source: Arc::clone(self), mime: mime_guess::from_path(sidecar_key.as_str()).first_or_octet_stream(), key: sidecar_key, sidecar: None, }) }) } else { None }; Item::S3 { source: Arc::clone(self), mime, key, sidecar, } } } impl DataSource for Arc { async fn get(&self, key: &str) -> Result, std::io::Error> { if self.sidecars && key.ends_with(".toml") { return Ok(None); } let key: SmartString = key.into(); let key = match &self.prefix { Some(x) => format!("{x}/{key}").into(), None => key, }; let result = self .client .head_object() .bucket(self.bucket.as_str()) .key(key.as_str()) .send() .await; match result { Err(sdk_err) => { let not_found = sdk_err .as_service_error() .map(|e| e.is_not_found()) .unwrap_or(false); if not_found { return Ok(None); } Err(std::io::Error::other(sdk_err)) } Ok(_) => Ok(Some(self.make_item(key).await)), } } fn iter(&self) -> ReceiverStream> { let (tx, rx) = tokio::sync::mpsc::channel(64); let source = Arc::clone(self); tokio::spawn(async move { let mut continuation_token: Option = None; loop { let mut req = source .client .list_objects_v2() .bucket(source.bucket.as_str()); if let Some(prefix) = &source.prefix { req = req.prefix(prefix.as_str()); } if let Some(token) = continuation_token { req = req.continuation_token(token); } let resp = match req.send().await { Err(e) => { let _ = tx.send(Err(std::io::Error::other(e))).await; break; } Ok(resp) => resp, }; let next_token = resp.next_continuation_token().map(ToOwned::to_owned); let is_truncated = resp.is_truncated().unwrap_or(false); for obj in resp.contents() { let key = match obj.key() { Some(k) => k.to_owned(), None => continue, }; if source.sidecars && key.ends_with(".toml") { continue; } let item = source.make_item(key).await; if tx.send(Ok(item)).await.is_err() { return; } } if !is_truncated { break; } continuation_token = next_token; } }); ReceiverStream::new(rx) } async fn latest_change(&self) -> Result>, std::io::Error> { let mut ts: Option> = None; let mut continuation_token: Option = None; loop { let mut req = self.client.list_objects_v2().bucket(self.bucket.as_str()); if let Some(prefix) = &self.prefix { req = req.prefix(prefix.as_str()); } if let Some(token) = continuation_token { req = req.continuation_token(token); } let resp = match req.send().await { Err(_) => return Ok(None), Ok(resp) => resp, }; let next_token = resp.next_continuation_token().map(ToOwned::to_owned); let is_truncated = resp.is_truncated().unwrap_or(false); for obj in resp.contents() { if let Some(last_modified) = obj.last_modified() { let dt = DateTime::from_timestamp( last_modified.secs(), last_modified.subsec_nanos(), ); if let Some(dt) = dt { ts = Some(match ts { None => dt, Some(prev) => prev.max(dt), }); } } } if !is_truncated { break; } continuation_token = next_token; } Ok(ts) } }