Read files incrementally

This commit is contained in:
2026-03-10 09:18:26 -07:00
parent c02e92dd72
commit c683939d7c
9 changed files with 352 additions and 214 deletions

View File

@@ -1,9 +1,9 @@
use epub::doc::EpubDoc; use epub::doc::EpubDoc;
use pile_config::Label; use pile_config::Label;
use std::{collections::HashMap, io::Cursor, sync::OnceLock}; use std::{collections::HashMap, sync::OnceLock};
use tracing::debug; use tracing::trace;
use crate::{Item, PileValue, extract::Extractor}; use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
pub struct EpubMetaExtractor<'a> { pub struct EpubMetaExtractor<'a> {
item: &'a Item, item: &'a Item,
@@ -23,20 +23,18 @@ impl<'a> EpubMetaExtractor<'a> {
return Ok(x); return Ok(x);
} }
let bytes = self.item.read().await?.read_to_end().await?; let key = self.item.key();
let ext = key.as_str().rsplit('.').next();
let cursor = Cursor::new(bytes); if !matches!(ext, Some("epub")) {
let doc = match EpubDoc::from_reader(cursor) { return Ok(self.output.get_or_init(|| HashMap::new()));
Ok(x) => x,
Err(error) => {
debug!(message = "Could not process epub", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
} }
};
let mut output: HashMap<Label, PileValue<'a>> = HashMap::new(); let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_meta = tokio::task::spawn_blocking(move || {
let doc = EpubDoc::from_reader(reader)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let fields = &[ let fields: &[&'static str] = &[
"title", "title",
"creator", "creator",
"description", "description",
@@ -47,10 +45,28 @@ impl<'a> EpubMetaExtractor<'a> {
"identifier", "identifier",
]; ];
let meta: Vec<(&'static str, Option<String>)> =
fields.iter().map(|&key| (key, doc.mdata(key))).collect();
Ok::<_, std::io::Error>(meta)
})
.await
.map_err(std::io::Error::other)?;
let raw_meta = match raw_meta {
Ok(x) => x,
Err(error) => {
trace!(message = "Could not process epub", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
};
let mut output: HashMap<Label, PileValue<'a>> = HashMap::new();
#[expect(clippy::unwrap_used)] #[expect(clippy::unwrap_used)]
for key in fields { for (key, val) in raw_meta {
let label = Label::new(*key).unwrap(); let label = Label::new(key).unwrap();
let value = match doc.mdata(key) { let value = match val {
Some(s) => PileValue::String(s.into()), Some(s) => PileValue::String(s.into()),
None => PileValue::Null, None => PileValue::Null,
}; };

View File

@@ -1,9 +1,9 @@
use epub::doc::EpubDoc; use epub::doc::EpubDoc;
use pile_config::Label; use pile_config::Label;
use std::{collections::HashMap, io::Cursor, sync::OnceLock}; use std::{collections::HashMap, sync::OnceLock};
use tracing::debug; use tracing::debug;
use crate::{Item, PileValue, extract::Extractor}; use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
pub struct EpubTextExtractor<'a> { pub struct EpubTextExtractor<'a> {
item: &'a Item, item: &'a Item,
@@ -23,16 +23,16 @@ impl<'a> EpubTextExtractor<'a> {
return Ok(x); return Ok(x);
} }
let bytes = self.item.read().await?.read_to_end().await?; let key = self.item.key();
let ext = key.as_str().rsplit('.').next();
let cursor = Cursor::new(bytes); if !matches!(ext, Some("epub")) {
let mut doc = match EpubDoc::from_reader(cursor) { return Ok(self.output.get_or_init(|| HashMap::new()));
Ok(x) => x,
Err(error) => {
debug!(message = "Could not process epub", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
} }
};
let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_text = tokio::task::spawn_blocking(move || {
let mut doc = EpubDoc::from_reader(reader)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let mut text_parts: Vec<String> = Vec::new(); let mut text_parts: Vec<String> = Vec::new();
@@ -45,10 +45,24 @@ impl<'a> EpubTextExtractor<'a> {
} }
} }
let text = text_parts.join(" "); Ok::<_, std::io::Error>(text_parts.join(" "))
})
.await
.map_err(std::io::Error::other)?;
let raw_text = match raw_text {
Ok(x) => x,
Err(error) => {
debug!(message = "Could not process epub", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
};
#[expect(clippy::unwrap_used)] #[expect(clippy::unwrap_used)]
let output = HashMap::from([(Label::new("text").unwrap(), PileValue::String(text.into()))]); let output = HashMap::from([(
Label::new("text").unwrap(),
PileValue::String(raw_text.into()),
)]);
let _ = self.output.set(output); let _ = self.output.set(output);
#[expect(clippy::unwrap_used)] #[expect(clippy::unwrap_used)]

View File

@@ -1,8 +1,8 @@
use pile_config::Label; use pile_config::Label;
use std::{collections::HashMap, io::Cursor, sync::OnceLock}; use std::{collections::HashMap, io::BufReader, sync::OnceLock};
use tracing::debug; use tracing::debug;
use crate::{Item, PileValue, extract::Extractor}; use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
pub struct ExifExtractor<'a> { pub struct ExifExtractor<'a> {
item: &'a Item, item: &'a Item,
@@ -22,10 +22,29 @@ impl<'a> ExifExtractor<'a> {
return Ok(x); return Ok(x);
} }
let bytes = self.item.read().await?.read_to_end().await?; let reader = SyncReadBridge::new_current(self.item.read().await?);
let mut cursor = Cursor::new(bytes); let raw_fields = tokio::task::spawn_blocking(move || {
let mut br = BufReader::new(reader);
let exif = exif::Reader::new()
.read_from_container(&mut br)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let exif = match exif::Reader::new().read_from_container(&mut cursor) { let fields: Vec<(String, String)> = exif
.fields()
.map(|f| {
(
f.tag.to_string(),
f.display_value().with_unit(&exif).to_string(),
)
})
.collect();
Ok::<_, std::io::Error>(fields)
})
.await
.map_err(std::io::Error::other)?;
let raw_fields = match raw_fields {
Ok(x) => x, Ok(x) => x,
Err(error) => { Err(error) => {
debug!(message = "Could not process exif", ?error, key = ?self.item.key()); debug!(message = "Could not process exif", ?error, key = ?self.item.key());
@@ -35,23 +54,22 @@ impl<'a> ExifExtractor<'a> {
let mut output: HashMap<Label, PileValue<'a>> = HashMap::new(); let mut output: HashMap<Label, PileValue<'a>> = HashMap::new();
for field in exif.fields() { for (tag_name, value) in raw_fields {
let Some(label) = tag_to_label(&field.tag) else { let Some(label) = tag_to_label(&tag_name) else {
continue; continue;
}; };
// First occurrence wins (PRIMARY IFD comes before THUMBNAIL) // First occurrence wins (PRIMARY IFD comes before THUMBNAIL)
output.entry(label).or_insert_with(|| { output
PileValue::String(field.display_value().with_unit(&exif).to_string().into()) .entry(label)
}); .or_insert_with(|| PileValue::String(value.into()));
} }
return Ok(self.output.get_or_init(|| output)); return Ok(self.output.get_or_init(|| output));
} }
} }
fn tag_to_label(tag: &exif::Tag) -> Option<Label> { fn tag_to_label(tag: &str) -> Option<Label> {
let sanitized: String = tag let sanitized: String = tag
.to_string()
.chars() .chars()
.map(|c| if c == ' ' { '_' } else { c }) .map(|c| if c == ' ' { '_' } else { c })
.filter(|c| Label::VALID_CHARS.contains(*c)) .filter(|c| Label::VALID_CHARS.contains(*c))

View File

@@ -2,7 +2,7 @@ use pile_config::Label;
use pile_flac::{FlacBlock, FlacReader}; use pile_flac::{FlacBlock, FlacReader};
use std::{collections::HashMap, io::BufReader, sync::OnceLock}; use std::{collections::HashMap, io::BufReader, sync::OnceLock};
use crate::{Item, PileValue, extract::Extractor}; use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
pub struct FlacExtractor<'a> { pub struct FlacExtractor<'a> {
item: &'a Item, item: &'a Item,
@@ -33,26 +33,34 @@ impl<'a> FlacExtractor<'a> {
return Ok(self.output.get().unwrap()); return Ok(self.output.get().unwrap());
} }
let bytes = self.item.read().await?.read_to_end().await?; let reader = SyncReadBridge::new_current(self.item.read().await?);
let reader = FlacReader::new(BufReader::new(std::io::Cursor::new(bytes))); let raw_tags = tokio::task::spawn_blocking(move || {
let reader = FlacReader::new(BufReader::new(reader));
let mut output: HashMap<Label, Vec<_>> = HashMap::new(); let mut tags: Vec<(String, String)> = Vec::new();
for block in reader { for block in reader {
if let FlacBlock::VorbisComment(comment) = if let FlacBlock::VorbisComment(comment) =
block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?
{ {
for (k, v) in comment.comment.comments { for (k, v) in comment.comment.comments {
match Label::new(k.to_string().to_lowercase()) { tags.push((k.to_string().to_lowercase(), v.into()));
Some(k) => output.entry(k).or_default().push(PileValue::String(v)),
None => continue,
} }
}
// We should only have one comment block,
// stop reading when we find it
break; break;
} }
} }
Ok::<_, std::io::Error>(tags)
})
.await
.map_err(std::io::Error::other)??;
let mut output: HashMap<Label, Vec<PileValue<'a>>> = HashMap::new();
for (k, v) in raw_tags {
if let Some(label) = Label::new(k) {
output
.entry(label)
.or_default()
.push(PileValue::String(v.into()));
}
}
let output = output let output = output
.into_iter() .into_iter()

View File

@@ -2,7 +2,7 @@ use id3::Tag;
use pile_config::Label; use pile_config::Label;
use std::{borrow::Cow, collections::HashMap, io::BufReader, sync::OnceLock}; use std::{borrow::Cow, collections::HashMap, io::BufReader, sync::OnceLock};
use crate::{Item, PileValue, extract::Extractor}; use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
pub struct Id3Extractor<'a> { pub struct Id3Extractor<'a> {
item: &'a Item, item: &'a Item,
@@ -22,34 +22,32 @@ impl<'a> Id3Extractor<'a> {
return Ok(x); return Ok(x);
} }
let key = match self.item { let key = self.item.key();
Item::File { path, .. } => path.to_str().unwrap_or_default().to_owned(), let ext = key.as_str().rsplit('.').next();
Item::S3 { key, .. } => key.to_string(),
};
let ext = key.rsplit('.').next();
if !matches!(ext, Some("mp3") | Some("aiff") | Some("aif") | Some("wav")) { if !matches!(ext, Some("mp3") | Some("aiff") | Some("aif") | Some("wav")) {
let _ = self.output.set(HashMap::new()); return Ok(self.output.get_or_init(|| HashMap::new()));
#[expect(clippy::unwrap_used)]
return Ok(self.output.get().unwrap());
} }
let bytes = self.item.read().await?.read_to_end().await?; let reader = SyncReadBridge::new_current(self.item.read().await?);
let tag = match Tag::read_from2(BufReader::new(std::io::Cursor::new(bytes))) { let tag = match tokio::task::spawn_blocking(move || Tag::read_from2(BufReader::new(reader)))
Ok(tag) => tag, .await
Err(id3::Error { {
Ok(Ok(tag)) => tag,
Ok(Err(id3::Error {
kind: id3::ErrorKind::NoTag, kind: id3::ErrorKind::NoTag,
.. ..
}) => { })) => {
let _ = self.output.set(HashMap::new()); return Ok(self.output.get_or_init(|| HashMap::new()));
#[expect(clippy::unwrap_used)]
return Ok(self.output.get().unwrap());
} }
Err(id3::Error {
Ok(Err(id3::Error {
kind: id3::ErrorKind::Io(e), kind: id3::ErrorKind::Io(e),
.. ..
}) => return Err(e), })) => return Err(e),
Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
Ok(Err(e)) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
Err(e) => return Err(e.into()),
}; };
let mut output: HashMap<Label, Vec<PileValue<'a>>> = HashMap::new(); let mut output: HashMap<Label, Vec<PileValue<'a>>> = HashMap::new();
@@ -70,9 +68,7 @@ impl<'a> Id3Extractor<'a> {
.map(|(k, v)| (k, PileValue::Array(v))) .map(|(k, v)| (k, PileValue::Array(v)))
.collect(); .collect();
let _ = self.output.set(output); return Ok(self.output.get_or_init(|| output));
#[expect(clippy::unwrap_used)]
return Ok(self.output.get().unwrap());
} }
} }

View File

@@ -1,10 +1,10 @@
use pdf::file::FileOptions; use pdf::file::FileOptions;
use pdf::primitive::{Date, PdfString, TimeRel}; use pdf::primitive::{Date, TimeRel};
use pile_config::Label; use pile_config::Label;
use std::{collections::HashMap, sync::OnceLock}; use std::{collections::HashMap, io::BufReader, sync::OnceLock};
use tracing::debug; use tracing::trace;
use crate::{Item, PileValue, extract::Extractor}; use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
pub struct PdfMetaExtractor<'a> { pub struct PdfMetaExtractor<'a> {
item: &'a Item, item: &'a Item,
@@ -24,21 +24,27 @@ impl<'a> PdfMetaExtractor<'a> {
return Ok(x); return Ok(x);
} }
let bytes = self.item.read().await?.read_to_end().await?; let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_meta = tokio::task::spawn_blocking(move || {
let mut bytes = Vec::new();
std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?;
let file = match FileOptions::cached().load(bytes) { let file = match FileOptions::cached().load(bytes) {
Ok(x) => x, Ok(x) => x,
Err(pdf::PdfError::Io { source }) => return Err(source), Err(pdf::PdfError::Io { source }) => return Err(source),
Err(error) => { Err(error) => {
debug!(message = "Could not process pdf", ?error, key = ?self.item.key()); return Err(std::io::Error::new(
return Ok(self.output.get_or_init(HashMap::new)); std::io::ErrorKind::InvalidData,
error.to_string(),
));
} }
}; };
let mut output: HashMap<Label, PileValue<'a>> = HashMap::new(); let mut meta: Vec<(&'static str, Option<String>)> = Vec::new();
if let Some(info) = &file.trailer.info_dict { if let Some(info) = &file.trailer.info_dict {
let fields: &[(&str, Option<&PdfString>)] = &[ use pdf::primitive::PdfString;
let fields: &[(&'static str, Option<&PdfString>)] = &[
("title", info.title.as_ref()), ("title", info.title.as_ref()),
("author", info.author.as_ref()), ("author", info.author.as_ref()),
("subject", info.subject.as_ref()), ("subject", info.subject.as_ref()),
@@ -47,35 +53,42 @@ impl<'a> PdfMetaExtractor<'a> {
("producer", info.producer.as_ref()), ("producer", info.producer.as_ref()),
]; ];
#[expect(clippy::unwrap_used)]
for (key, val) in fields { for (key, val) in fields {
let label = Label::new(*key).unwrap(); meta.push((key, val.map(|s| s.to_string_lossy())));
}
meta.push((
"creation_date",
info.creation_date.as_ref().map(format_date),
));
meta.push(("mod_date", info.mod_date.as_ref().map(format_date)));
}
Ok::<_, std::io::Error>(meta)
})
.await
.map_err(std::io::Error::other)?;
let raw_meta = match raw_meta {
Ok(x) => x,
Err(error) => {
trace!(message = "Could not process pdf", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
};
let mut output: HashMap<Label, PileValue<'a>> = HashMap::new();
#[expect(clippy::unwrap_used)]
for (key, val) in raw_meta {
let label = Label::new(key).unwrap();
let value = match val { let value = match val {
Some(s) => PileValue::String(s.to_string_lossy().into()), Some(s) => PileValue::String(s.into()),
None => PileValue::Null, None => PileValue::Null,
}; };
output.insert(label, value); output.insert(label, value);
} }
#[expect(clippy::unwrap_used)]
{
output.insert(
Label::new("creation_date").unwrap(),
info.creation_date
.as_ref()
.map(|d| PileValue::String(format_date(d).into()))
.unwrap_or(PileValue::Null),
);
output.insert(
Label::new("mod_date").unwrap(),
info.mod_date
.as_ref()
.map(|d| PileValue::String(format_date(d).into()))
.unwrap_or(PileValue::Null),
);
}
}
return Ok(self.output.get_or_init(|| output)); return Ok(self.output.get_or_init(|| output));
} }
} }

View File

@@ -1,10 +1,10 @@
use pdf::content::{Op, TextDrawAdjusted}; use pdf::content::{Op, TextDrawAdjusted};
use pdf::file::FileOptions; use pdf::file::FileOptions;
use pile_config::Label; use pile_config::Label;
use std::{collections::HashMap, sync::OnceLock}; use std::{collections::HashMap, io::BufReader, sync::OnceLock};
use tracing::debug; use tracing::trace;
use crate::{Item, PileValue, extract::Extractor}; use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
pub struct PdfTextExtractor<'a> { pub struct PdfTextExtractor<'a> {
item: &'a Item, item: &'a Item,
@@ -24,22 +24,28 @@ impl<'a> PdfTextExtractor<'a> {
return Ok(x); return Ok(x);
} }
let bytes = self.item.read().await?.read_to_end().await?; let reader = SyncReadBridge::new_current(self.item.read().await?);
let raw_text = tokio::task::spawn_blocking(move || {
let mut bytes = Vec::new();
std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?;
let file = match FileOptions::cached().load(bytes) { let file = match FileOptions::cached().load(bytes) {
Ok(x) => x, Ok(x) => x,
Err(pdf::PdfError::Io { source }) => return Err(source), Err(pdf::PdfError::Io { source }) => return Err(source),
Err(error) => { Err(error) => {
debug!(message = "Could not process pdf", ?error, key = ?self.item.key()); return Err(std::io::Error::new(
return Ok(self.output.get_or_init(HashMap::new)); std::io::ErrorKind::InvalidData,
error.to_string(),
));
} }
}; };
let mut text_parts: Vec<String> = Vec::new(); let mut text_parts: Vec<String> = Vec::new();
for page in file.pages() { for page in file.pages() {
let page = page let page = page.map_err(|e| {
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?; std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
})?;
if let Some(content) = &page.contents { if let Some(content) = &page.contents {
let ops = content.operations(&file.resolver()).map_err(|e| { let ops = content.operations(&file.resolver()).map_err(|e| {
@@ -64,14 +70,26 @@ impl<'a> PdfTextExtractor<'a> {
} }
} }
let text = text_parts.join(" "); Ok::<_, std::io::Error>(text_parts.join(" "))
})
.await
.map_err(std::io::Error::other)?;
let raw_text = match raw_text {
Ok(x) => x,
Err(error) => {
trace!(message = "Could not process pdf", ?error, key = ?self.item.key());
return Ok(self.output.get_or_init(HashMap::new));
}
};
#[expect(clippy::unwrap_used)] #[expect(clippy::unwrap_used)]
let output = HashMap::from([(Label::new("text").unwrap(), PileValue::String(text.into()))]); let output = HashMap::from([(
Label::new("text").unwrap(),
PileValue::String(raw_text.into()),
)]);
let _ = self.output.set(output); return Ok(self.output.get_or_init(|| output));
#[expect(clippy::unwrap_used)]
return Ok(self.output.get().unwrap());
} }
} }

View File

@@ -1,7 +1,7 @@
use pile_config::Label; use pile_config::Label;
use std::{collections::HashMap, sync::OnceLock}; use std::{collections::HashMap, sync::OnceLock};
use crate::{Item, PileValue, extract::Extractor}; use crate::{Item, PileValue, Reader, extract::Extractor};
fn toml_to_pile(value: toml::Value) -> PileValue<'static> { fn toml_to_pile(value: toml::Value) -> PileValue<'static> {
match value { match value {

View File

@@ -1,5 +1,11 @@
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
use std::{fs::File, io::Seek, path::PathBuf, sync::Arc}; use std::{
fs::File,
io::{Read, Seek, SeekFrom},
path::PathBuf,
sync::Arc,
};
use tokio::runtime::Handle;
use crate::source::{DirDataSource, S3DataSource}; use crate::source::{DirDataSource, S3DataSource};
@@ -90,33 +96,26 @@ impl Item {
} }
} }
pub enum ItemReader { //
File(File), // MARK: reader
S3(S3Reader), //
}
impl ItemReader { pub trait Reader: Send {
/// Read a chunk of bytes. /// Read a chunk of bytes.
pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { fn read(
match self { &mut self,
Self::File(x) => std::io::Read::read(x, buf), buf: &mut [u8],
Self::S3(x) => x.read(buf).await, ) -> impl Future<Output = Result<usize, std::io::Error>> + Send;
}
} fn seek(&mut self, pos: SeekFrom) -> impl Future<Output = Result<u64, std::io::Error>> + Send;
/// Read all remaining bytes into a `Vec`. /// Read all remaining bytes into a `Vec`.
pub async fn read_to_end(mut self) -> std::io::Result<Vec<u8>> { fn read_to_end(&mut self) -> impl Future<Output = Result<Vec<u8>, std::io::Error>> + Send {
match self { async {
Self::File(mut f) => {
let mut buf = Vec::new();
std::io::Read::read_to_end(&mut f, &mut buf)?;
Ok(buf)
}
Self::S3(ref mut r) => {
let mut buf = Vec::new(); let mut buf = Vec::new();
let mut chunk = vec![0u8; 65536]; let mut chunk = vec![0u8; 65536];
loop { loop {
let n = r.read(&mut chunk).await?; let n = self.read(&mut chunk).await?;
if n == 0 { if n == 0 {
break; break;
} }
@@ -127,10 +126,66 @@ impl ItemReader {
} }
} }
pub fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> { //
// MARK: sync bridge
//
/// Turn an async [Reader] into a sync [Read] + [Seek].
///
/// Never use this outside of [tokio::task::spawn_blocking],
/// the async runtime will deadlock if this struct blocks
/// the runtime.
pub struct SyncReadBridge<R: Reader> {
inner: R,
handle: Handle,
}
impl<R: Reader> SyncReadBridge<R> {
/// Creates a new adapter using a handle to the current runtime.
/// Panics if called outside of tokio
pub fn new_current(inner: R) -> Self {
Self::new(inner, Handle::current())
}
/// Creates a new adapter using a handle to an existing runtime.
pub fn new(inner: R, handle: Handle) -> Self {
Self { inner, handle }
}
}
impl<R: Reader> Read for SyncReadBridge<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.handle.block_on(self.inner.read(buf))
}
}
impl<R: Reader> Seek for SyncReadBridge<R> {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
self.handle.block_on(self.inner.seek(pos))
}
}
//
// MARK: itemreader
//
pub enum ItemReader {
File(File),
S3(S3Reader),
}
impl Reader for ItemReader {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
match self {
Self::File(x) => std::io::Read::read(x, buf),
Self::S3(x) => x.read(buf).await,
}
}
async fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
match self { match self {
Self::File(x) => x.seek(pos), Self::File(x) => x.seek(pos),
Self::S3(x) => x.seek(pos), Self::S3(x) => x.seek(pos).await,
} }
} }
} }
@@ -147,8 +202,8 @@ pub struct S3Reader {
size: u64, size: u64,
} }
impl S3Reader { impl Reader for S3Reader {
async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
let len_left = self.size.saturating_sub(self.cursor); let len_left = self.size.saturating_sub(self.cursor);
if len_left == 0 || buf.is_empty() { if len_left == 0 || buf.is_empty() {
return Ok(0); return Ok(0);
@@ -181,11 +236,11 @@ impl S3Reader {
Ok(n) Ok(n)
} }
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> { async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
match pos { match pos {
std::io::SeekFrom::Start(x) => self.cursor = x.min(self.size), SeekFrom::Start(x) => self.cursor = x.min(self.size),
std::io::SeekFrom::Current(x) => { SeekFrom::Current(x) => {
if x < 0 { if x < 0 {
let abs = x.unsigned_abs(); let abs = x.unsigned_abs();
if abs > self.cursor { if abs > self.cursor {