From 54981e60cb09cbdfdfc85808f26b6c6282d02f02 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Tue, 10 Mar 2026 10:53:15 -0700 Subject: [PATCH] S3 sidecars --- crates/pile-dataset/src/source/s3.rs | 64 ++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/crates/pile-dataset/src/source/s3.rs b/crates/pile-dataset/src/source/s3.rs index 1dcc586..889bb2a 100644 --- a/crates/pile-dataset/src/source/s3.rs +++ b/crates/pile-dataset/src/source/s3.rs @@ -56,11 +56,61 @@ impl S3DataSource { }) } - fn make_item(self: &Arc, key: impl Into>) -> Item { + async fn find_sidecar_key(&self, key: &str) -> Option> { + // First try {key}.toml + let full_toml = format!("{key}.toml"); + if self + .client + .head_object() + .bucket(self.bucket.as_str()) + .key(&full_toml) + .send() + .await + .is_ok() + { + return Some(full_toml.into()); + } + + // Then try {key-with-extension-stripped}.toml + let stripped = std::path::Path::new(key).with_extension("toml"); + if let Some(stripped_str) = stripped.to_str() + && stripped_str != full_toml.as_str() + && self + .client + .head_object() + .bucket(self.bucket.as_str()) + .key(stripped_str) + .send() + .await + .is_ok() + { + return Some(stripped_str.into()); + } + + None + } + + async fn make_item(self: &Arc, key: impl Into>) -> Item { + let key: SmartString = key.into(); + + let sidecar = if self.sidecars { + self.find_sidecar_key(key.as_str()) + .await + .map(|sidecar_key| { + Box::new(Item::S3 { + source: Arc::clone(self), + key: sidecar_key, + sidecar: None, + }) + }) + } else { + None + }; + Item::S3 { source: Arc::clone(self), - key: key.into(), - sidecar: None, // TODO: add sidecars + key, + sidecar, } } } @@ -90,7 +140,7 @@ impl DataSource for Arc { } Err(std::io::Error::other(sdk_err)) } - Ok(_) => Ok(Some(self.make_item(key))), + Ok(_) => Ok(Some(self.make_item(key).await)), } } @@ -136,11 +186,7 @@ impl DataSource for Arc { continue; } - let item = Item::S3 { - source: Arc::clone(&source), - key: key.into(), - sidecar: None, // TODO: add sidecars - }; + let item = source.make_item(key).await; if tx.send(Ok(item)).await.is_err() { return;