Read files incrementally
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
use epub::doc::EpubDoc;
|
||||
use pile_config::Label;
|
||||
use std::{collections::HashMap, io::Cursor, sync::OnceLock};
|
||||
use std::{collections::HashMap, sync::OnceLock};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{Item, PileValue, extract::Extractor};
|
||||
use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
|
||||
|
||||
pub struct EpubMetaExtractor<'a> {
|
||||
item: &'a Item,
|
||||
@@ -23,20 +23,12 @@ impl<'a> EpubMetaExtractor<'a> {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let bytes = self.item.read().await?.read_to_end().await?;
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_meta = tokio::task::spawn_blocking(move || {
|
||||
let doc = EpubDoc::from_reader(reader)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
|
||||
|
||||
let cursor = Cursor::new(bytes);
|
||||
let doc = match EpubDoc::from_reader(cursor) {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
debug!(message = "Could not process epub", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
};
|
||||
|
||||
let mut output: HashMap<Label, PileValue<'a>> = HashMap::new();
|
||||
|
||||
let fields = &[
|
||||
let fields: &[&'static str] = &[
|
||||
"title",
|
||||
"creator",
|
||||
"description",
|
||||
@@ -47,10 +39,28 @@ impl<'a> EpubMetaExtractor<'a> {
|
||||
"identifier",
|
||||
];
|
||||
|
||||
let meta: Vec<(&'static str, Option<String>)> =
|
||||
fields.iter().map(|&key| (key, doc.mdata(key))).collect();
|
||||
|
||||
Ok::<_, std::io::Error>(meta)
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
|
||||
let raw_meta = match raw_meta {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
debug!(message = "Could not process epub", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
};
|
||||
|
||||
let mut output: HashMap<Label, PileValue<'a>> = HashMap::new();
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
for key in fields {
|
||||
let label = Label::new(*key).unwrap();
|
||||
let value = match doc.mdata(key) {
|
||||
for (key, val) in raw_meta {
|
||||
let label = Label::new(key).unwrap();
|
||||
let value = match val {
|
||||
Some(s) => PileValue::String(s.into()),
|
||||
None => PileValue::Null,
|
||||
};
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use epub::doc::EpubDoc;
|
||||
use pile_config::Label;
|
||||
use std::{collections::HashMap, io::Cursor, sync::OnceLock};
|
||||
use std::{collections::HashMap, sync::OnceLock};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{Item, PileValue, extract::Extractor};
|
||||
use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
|
||||
|
||||
pub struct EpubTextExtractor<'a> {
|
||||
item: &'a Item,
|
||||
@@ -23,16 +23,10 @@ impl<'a> EpubTextExtractor<'a> {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let bytes = self.item.read().await?.read_to_end().await?;
|
||||
|
||||
let cursor = Cursor::new(bytes);
|
||||
let mut doc = match EpubDoc::from_reader(cursor) {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
debug!(message = "Could not process epub", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
};
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_text = tokio::task::spawn_blocking(move || {
|
||||
let mut doc = EpubDoc::from_reader(reader)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
|
||||
|
||||
let mut text_parts: Vec<String> = Vec::new();
|
||||
|
||||
@@ -45,10 +39,24 @@ impl<'a> EpubTextExtractor<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
let text = text_parts.join(" ");
|
||||
Ok::<_, std::io::Error>(text_parts.join(" "))
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
|
||||
let raw_text = match raw_text {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
debug!(message = "Could not process epub", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
};
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
let output = HashMap::from([(Label::new("text").unwrap(), PileValue::String(text.into()))]);
|
||||
let output = HashMap::from([(
|
||||
Label::new("text").unwrap(),
|
||||
PileValue::String(raw_text.into()),
|
||||
)]);
|
||||
|
||||
let _ = self.output.set(output);
|
||||
#[expect(clippy::unwrap_used)]
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use pile_config::Label;
|
||||
use std::{collections::HashMap, io::Cursor, sync::OnceLock};
|
||||
use std::{collections::HashMap, io::BufReader, sync::OnceLock};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{Item, PileValue, extract::Extractor};
|
||||
use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
|
||||
|
||||
pub struct ExifExtractor<'a> {
|
||||
item: &'a Item,
|
||||
@@ -22,10 +22,29 @@ impl<'a> ExifExtractor<'a> {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let bytes = self.item.read().await?.read_to_end().await?;
|
||||
let mut cursor = Cursor::new(bytes);
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_fields = tokio::task::spawn_blocking(move || {
|
||||
let mut br = BufReader::new(reader);
|
||||
let exif = exif::Reader::new()
|
||||
.read_from_container(&mut br)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
|
||||
|
||||
let exif = match exif::Reader::new().read_from_container(&mut cursor) {
|
||||
let fields: Vec<(String, String)> = exif
|
||||
.fields()
|
||||
.map(|f| {
|
||||
(
|
||||
f.tag.to_string(),
|
||||
f.display_value().with_unit(&exif).to_string(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok::<_, std::io::Error>(fields)
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
|
||||
let raw_fields = match raw_fields {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
debug!(message = "Could not process exif", ?error, key = ?self.item.key());
|
||||
@@ -35,23 +54,22 @@ impl<'a> ExifExtractor<'a> {
|
||||
|
||||
let mut output: HashMap<Label, PileValue<'a>> = HashMap::new();
|
||||
|
||||
for field in exif.fields() {
|
||||
let Some(label) = tag_to_label(&field.tag) else {
|
||||
for (tag_name, value) in raw_fields {
|
||||
let Some(label) = tag_to_label(&tag_name) else {
|
||||
continue;
|
||||
};
|
||||
// First occurrence wins (PRIMARY IFD comes before THUMBNAIL)
|
||||
output.entry(label).or_insert_with(|| {
|
||||
PileValue::String(field.display_value().with_unit(&exif).to_string().into())
|
||||
});
|
||||
output
|
||||
.entry(label)
|
||||
.or_insert_with(|| PileValue::String(value.into()));
|
||||
}
|
||||
|
||||
return Ok(self.output.get_or_init(|| output));
|
||||
}
|
||||
}
|
||||
|
||||
fn tag_to_label(tag: &exif::Tag) -> Option<Label> {
|
||||
fn tag_to_label(tag: &str) -> Option<Label> {
|
||||
let sanitized: String = tag
|
||||
.to_string()
|
||||
.chars()
|
||||
.map(|c| if c == ' ' { '_' } else { c })
|
||||
.filter(|c| Label::VALID_CHARS.contains(*c))
|
||||
|
||||
@@ -2,7 +2,7 @@ use pile_config::Label;
|
||||
use pile_flac::{FlacBlock, FlacReader};
|
||||
use std::{collections::HashMap, io::BufReader, sync::OnceLock};
|
||||
|
||||
use crate::{Item, PileValue, extract::Extractor};
|
||||
use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
|
||||
|
||||
pub struct FlacExtractor<'a> {
|
||||
item: &'a Item,
|
||||
@@ -33,26 +33,34 @@ impl<'a> FlacExtractor<'a> {
|
||||
return Ok(self.output.get().unwrap());
|
||||
}
|
||||
|
||||
let bytes = self.item.read().await?.read_to_end().await?;
|
||||
let reader = FlacReader::new(BufReader::new(std::io::Cursor::new(bytes)));
|
||||
|
||||
let mut output: HashMap<Label, Vec<_>> = HashMap::new();
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_tags = tokio::task::spawn_blocking(move || {
|
||||
let reader = FlacReader::new(BufReader::new(reader));
|
||||
let mut tags: Vec<(String, String)> = Vec::new();
|
||||
for block in reader {
|
||||
if let FlacBlock::VorbisComment(comment) =
|
||||
block.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?
|
||||
{
|
||||
for (k, v) in comment.comment.comments {
|
||||
match Label::new(k.to_string().to_lowercase()) {
|
||||
Some(k) => output.entry(k).or_default().push(PileValue::String(v)),
|
||||
None => continue,
|
||||
tags.push((k.to_string().to_lowercase(), v.into()));
|
||||
}
|
||||
}
|
||||
|
||||
// We should only have one comment block,
|
||||
// stop reading when we find it
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok::<_, std::io::Error>(tags)
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)??;
|
||||
|
||||
let mut output: HashMap<Label, Vec<PileValue<'a>>> = HashMap::new();
|
||||
for (k, v) in raw_tags {
|
||||
if let Some(label) = Label::new(k) {
|
||||
output
|
||||
.entry(label)
|
||||
.or_default()
|
||||
.push(PileValue::String(v.into()));
|
||||
}
|
||||
}
|
||||
|
||||
let output = output
|
||||
.into_iter()
|
||||
|
||||
@@ -2,7 +2,7 @@ use id3::Tag;
|
||||
use pile_config::Label;
|
||||
use std::{borrow::Cow, collections::HashMap, io::BufReader, sync::OnceLock};
|
||||
|
||||
use crate::{Item, PileValue, extract::Extractor};
|
||||
use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
|
||||
|
||||
pub struct Id3Extractor<'a> {
|
||||
item: &'a Item,
|
||||
@@ -34,22 +34,28 @@ impl<'a> Id3Extractor<'a> {
|
||||
return Ok(self.output.get().unwrap());
|
||||
}
|
||||
|
||||
let bytes = self.item.read().await?.read_to_end().await?;
|
||||
let tag = match Tag::read_from2(BufReader::new(std::io::Cursor::new(bytes))) {
|
||||
Ok(tag) => tag,
|
||||
Err(id3::Error {
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let tag = match tokio::task::spawn_blocking(move || Tag::read_from2(BufReader::new(reader)))
|
||||
.await
|
||||
{
|
||||
Ok(Ok(tag)) => tag,
|
||||
|
||||
Ok(Err(id3::Error {
|
||||
kind: id3::ErrorKind::NoTag,
|
||||
..
|
||||
}) => {
|
||||
})) => {
|
||||
let _ = self.output.set(HashMap::new());
|
||||
#[expect(clippy::unwrap_used)]
|
||||
return Ok(self.output.get().unwrap());
|
||||
}
|
||||
Err(id3::Error {
|
||||
|
||||
Ok(Err(id3::Error {
|
||||
kind: id3::ErrorKind::Io(e),
|
||||
..
|
||||
}) => return Err(e),
|
||||
Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
|
||||
})) => return Err(e),
|
||||
|
||||
Ok(Err(e)) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
let mut output: HashMap<Label, Vec<PileValue<'a>>> = HashMap::new();
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use pdf::file::FileOptions;
|
||||
use pdf::primitive::{Date, PdfString, TimeRel};
|
||||
use pdf::primitive::{Date, TimeRel};
|
||||
use pile_config::Label;
|
||||
use std::{collections::HashMap, sync::OnceLock};
|
||||
use std::{collections::HashMap, io::BufReader, sync::OnceLock};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{Item, PileValue, extract::Extractor};
|
||||
use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
|
||||
|
||||
pub struct PdfMetaExtractor<'a> {
|
||||
item: &'a Item,
|
||||
@@ -24,21 +24,27 @@ impl<'a> PdfMetaExtractor<'a> {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let bytes = self.item.read().await?.read_to_end().await?;
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_meta = tokio::task::spawn_blocking(move || {
|
||||
let mut bytes = Vec::new();
|
||||
std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?;
|
||||
|
||||
let file = match FileOptions::cached().load(bytes) {
|
||||
Ok(x) => x,
|
||||
Err(pdf::PdfError::Io { source }) => return Err(source),
|
||||
Err(error) => {
|
||||
debug!(message = "Could not process pdf", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
error.to_string(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let mut output: HashMap<Label, PileValue<'a>> = HashMap::new();
|
||||
let mut meta: Vec<(&'static str, Option<String>)> = Vec::new();
|
||||
|
||||
if let Some(info) = &file.trailer.info_dict {
|
||||
let fields: &[(&str, Option<&PdfString>)] = &[
|
||||
use pdf::primitive::PdfString;
|
||||
let fields: &[(&'static str, Option<&PdfString>)] = &[
|
||||
("title", info.title.as_ref()),
|
||||
("author", info.author.as_ref()),
|
||||
("subject", info.subject.as_ref()),
|
||||
@@ -47,35 +53,42 @@ impl<'a> PdfMetaExtractor<'a> {
|
||||
("producer", info.producer.as_ref()),
|
||||
];
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
for (key, val) in fields {
|
||||
let label = Label::new(*key).unwrap();
|
||||
meta.push((key, val.map(|s| s.to_string_lossy())));
|
||||
}
|
||||
|
||||
meta.push((
|
||||
"creation_date",
|
||||
info.creation_date.as_ref().map(format_date),
|
||||
));
|
||||
meta.push(("mod_date", info.mod_date.as_ref().map(format_date)));
|
||||
}
|
||||
|
||||
Ok::<_, std::io::Error>(meta)
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
|
||||
let raw_meta = match raw_meta {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
debug!(message = "Could not process pdf", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
};
|
||||
|
||||
let mut output: HashMap<Label, PileValue<'a>> = HashMap::new();
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
for (key, val) in raw_meta {
|
||||
let label = Label::new(key).unwrap();
|
||||
let value = match val {
|
||||
Some(s) => PileValue::String(s.to_string_lossy().into()),
|
||||
Some(s) => PileValue::String(s.into()),
|
||||
None => PileValue::Null,
|
||||
};
|
||||
output.insert(label, value);
|
||||
}
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
{
|
||||
output.insert(
|
||||
Label::new("creation_date").unwrap(),
|
||||
info.creation_date
|
||||
.as_ref()
|
||||
.map(|d| PileValue::String(format_date(d).into()))
|
||||
.unwrap_or(PileValue::Null),
|
||||
);
|
||||
output.insert(
|
||||
Label::new("mod_date").unwrap(),
|
||||
info.mod_date
|
||||
.as_ref()
|
||||
.map(|d| PileValue::String(format_date(d).into()))
|
||||
.unwrap_or(PileValue::Null),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(self.output.get_or_init(|| output));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use pdf::content::{Op, TextDrawAdjusted};
|
||||
use pdf::file::FileOptions;
|
||||
use pile_config::Label;
|
||||
use std::{collections::HashMap, sync::OnceLock};
|
||||
use std::{collections::HashMap, io::BufReader, sync::OnceLock};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{Item, PileValue, extract::Extractor};
|
||||
use crate::{Item, PileValue, SyncReadBridge, extract::Extractor};
|
||||
|
||||
pub struct PdfTextExtractor<'a> {
|
||||
item: &'a Item,
|
||||
@@ -24,22 +24,28 @@ impl<'a> PdfTextExtractor<'a> {
|
||||
return Ok(x);
|
||||
}
|
||||
|
||||
let bytes = self.item.read().await?.read_to_end().await?;
|
||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||
let raw_text = tokio::task::spawn_blocking(move || {
|
||||
let mut bytes = Vec::new();
|
||||
std::io::Read::read_to_end(&mut BufReader::new(reader), &mut bytes)?;
|
||||
|
||||
let file = match FileOptions::cached().load(bytes) {
|
||||
Ok(x) => x,
|
||||
Err(pdf::PdfError::Io { source }) => return Err(source),
|
||||
Err(error) => {
|
||||
debug!(message = "Could not process pdf", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
error.to_string(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let mut text_parts: Vec<String> = Vec::new();
|
||||
|
||||
for page in file.pages() {
|
||||
let page = page
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
|
||||
let page = page.map_err(|e| {
|
||||
std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
|
||||
})?;
|
||||
|
||||
if let Some(content) = &page.contents {
|
||||
let ops = content.operations(&file.resolver()).map_err(|e| {
|
||||
@@ -64,10 +70,24 @@ impl<'a> PdfTextExtractor<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
let text = text_parts.join(" ");
|
||||
Ok::<_, std::io::Error>(text_parts.join(" "))
|
||||
})
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
|
||||
let raw_text = match raw_text {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
debug!(message = "Could not process pdf", ?error, key = ?self.item.key());
|
||||
return Ok(self.output.get_or_init(HashMap::new));
|
||||
}
|
||||
};
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
let output = HashMap::from([(Label::new("text").unwrap(), PileValue::String(text.into()))]);
|
||||
let output = HashMap::from([(
|
||||
Label::new("text").unwrap(),
|
||||
PileValue::String(raw_text.into()),
|
||||
)]);
|
||||
|
||||
let _ = self.output.set(output);
|
||||
#[expect(clippy::unwrap_used)]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use pile_config::Label;
|
||||
use std::{collections::HashMap, sync::OnceLock};
|
||||
|
||||
use crate::{Item, PileValue, extract::Extractor};
|
||||
use crate::{Item, PileValue, Reader, extract::Extractor};
|
||||
|
||||
fn toml_to_pile(value: toml::Value) -> PileValue<'static> {
|
||||
match value {
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::{fs::File, io::Seek, path::PathBuf, sync::Arc};
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{Read, Seek, SeekFrom},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::source::{DirDataSource, S3DataSource};
|
||||
|
||||
@@ -90,33 +96,26 @@ impl Item {
|
||||
}
|
||||
}
|
||||
|
||||
pub enum ItemReader {
|
||||
File(File),
|
||||
S3(S3Reader),
|
||||
}
|
||||
//
|
||||
// MARK: reader
|
||||
//
|
||||
|
||||
impl ItemReader {
|
||||
pub trait Reader: Send {
|
||||
/// Read a chunk of bytes.
|
||||
pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
match self {
|
||||
Self::File(x) => std::io::Read::read(x, buf),
|
||||
Self::S3(x) => x.read(buf).await,
|
||||
}
|
||||
}
|
||||
fn read(
|
||||
&mut self,
|
||||
buf: &mut [u8],
|
||||
) -> impl Future<Output = Result<usize, std::io::Error>> + Send;
|
||||
|
||||
fn seek(&mut self, pos: SeekFrom) -> impl Future<Output = Result<u64, std::io::Error>> + Send;
|
||||
|
||||
/// Read all remaining bytes into a `Vec`.
|
||||
pub async fn read_to_end(mut self) -> std::io::Result<Vec<u8>> {
|
||||
match self {
|
||||
Self::File(mut f) => {
|
||||
let mut buf = Vec::new();
|
||||
std::io::Read::read_to_end(&mut f, &mut buf)?;
|
||||
Ok(buf)
|
||||
}
|
||||
Self::S3(ref mut r) => {
|
||||
fn read_to_end(&mut self) -> impl Future<Output = Result<Vec<u8>, std::io::Error>> + Send {
|
||||
async {
|
||||
let mut buf = Vec::new();
|
||||
let mut chunk = vec![0u8; 65536];
|
||||
loop {
|
||||
let n = r.read(&mut chunk).await?;
|
||||
let n = self.read(&mut chunk).await?;
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
@@ -125,12 +124,68 @@ impl ItemReader {
|
||||
Ok(buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: sync bridge
|
||||
//
|
||||
|
||||
/// Turn an async [Reader] into a sync [Read] + [Seek].
|
||||
///
|
||||
/// Never use this outside of [tokio::task::spawn_blocking],
|
||||
/// the async runtime will deadlock if this struct blocks
|
||||
/// the runtime.
|
||||
pub struct SyncReadBridge<R: Reader> {
|
||||
inner: R,
|
||||
handle: Handle,
|
||||
}
|
||||
|
||||
impl<R: Reader> SyncReadBridge<R> {
|
||||
/// Creates a new adapter using a handle to the current runtime.
|
||||
/// Panics if called outside of tokio
|
||||
pub fn new_current(inner: R) -> Self {
|
||||
Self::new(inner, Handle::current())
|
||||
}
|
||||
|
||||
pub fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
|
||||
/// Creates a new adapter using a handle to an existing runtime.
|
||||
pub fn new(inner: R, handle: Handle) -> Self {
|
||||
Self { inner, handle }
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Reader> Read for SyncReadBridge<R> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||
self.handle.block_on(self.inner.read(buf))
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Reader> Seek for SyncReadBridge<R> {
|
||||
fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
||||
self.handle.block_on(self.inner.seek(pos))
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: itemreader
|
||||
//
|
||||
|
||||
pub enum ItemReader {
|
||||
File(File),
|
||||
S3(S3Reader),
|
||||
}
|
||||
|
||||
impl Reader for ItemReader {
|
||||
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||
match self {
|
||||
Self::File(x) => std::io::Read::read(x, buf),
|
||||
Self::S3(x) => x.read(buf).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
|
||||
match self {
|
||||
Self::File(x) => x.seek(pos),
|
||||
Self::S3(x) => x.seek(pos),
|
||||
Self::S3(x) => x.seek(pos).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -147,8 +202,8 @@ pub struct S3Reader {
|
||||
size: u64,
|
||||
}
|
||||
|
||||
impl S3Reader {
|
||||
async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
impl Reader for S3Reader {
|
||||
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||
let len_left = self.size.saturating_sub(self.cursor);
|
||||
if len_left == 0 || buf.is_empty() {
|
||||
return Ok(0);
|
||||
@@ -181,11 +236,11 @@ impl S3Reader {
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
|
||||
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
|
||||
match pos {
|
||||
std::io::SeekFrom::Start(x) => self.cursor = x.min(self.size),
|
||||
SeekFrom::Start(x) => self.cursor = x.min(self.size),
|
||||
|
||||
std::io::SeekFrom::Current(x) => {
|
||||
SeekFrom::Current(x) => {
|
||||
if x < 0 {
|
||||
let abs = x.unsigned_abs();
|
||||
if abs > self.cursor {
|
||||
|
||||
Reference in New Issue
Block a user