Fix S3 source
This commit is contained in:
@@ -40,11 +40,22 @@ impl Item {
|
|||||||
Self::File { path, .. } => ItemReader::File(File::open(path)?),
|
Self::File { path, .. } => ItemReader::File(File::open(path)?),
|
||||||
|
|
||||||
Self::S3 { source, key, .. } => {
|
Self::S3 { source, key, .. } => {
|
||||||
|
let full_key: SmartString<LazyCompact> = match &source.prefix {
|
||||||
|
None => key.clone(),
|
||||||
|
Some(p) => {
|
||||||
|
if p.ends_with('/') {
|
||||||
|
format!("{p}{key}").into()
|
||||||
|
} else {
|
||||||
|
format!("{p}/{key}").into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let head = source
|
let head = source
|
||||||
.client
|
.client
|
||||||
.head_object()
|
.head_object()
|
||||||
.bucket(source.bucket.as_str())
|
.bucket(source.bucket.as_str())
|
||||||
.key(key.as_str())
|
.key(full_key.as_str())
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
.map_err(std::io::Error::other)?;
|
.map_err(std::io::Error::other)?;
|
||||||
@@ -54,7 +65,7 @@ impl Item {
|
|||||||
ItemReader::S3(S3Reader {
|
ItemReader::S3(S3Reader {
|
||||||
client: source.client.clone(),
|
client: source.client.clone(),
|
||||||
bucket: source.bucket.clone(),
|
bucket: source.bucket.clone(),
|
||||||
key: key.to_owned(),
|
key: full_key,
|
||||||
cursor: 0,
|
cursor: 0,
|
||||||
size,
|
size,
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user