From 20dc30ea182de606aa35242192ca4c8f4eae8955 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Tue, 10 Mar 2026 09:18:26 -0700 Subject: [PATCH] Read files incrementally --- .../src/extract/epub/epub_meta.rs | 58 ++++++--- .../src/extract/epub/epub_text.rs | 52 +++++--- crates/pile-dataset/src/extract/exif.rs | 42 ++++-- crates/pile-dataset/src/extract/flac.rs | 40 +++--- crates/pile-dataset/src/extract/id3.rs | 44 +++---- .../pile-dataset/src/extract/pdf/pdf_meta.rs | 103 ++++++++------- .../pile-dataset/src/extract/pdf/pdf_text.rs | 102 +++++++++------ crates/pile-dataset/src/extract/toml.rs | 2 +- crates/pile-dataset/src/item.rs | 123 +++++++++++++----- 9 files changed, 352 insertions(+), 214 deletions(-) diff --git a/crates/pile-dataset/src/extract/epub/epub_meta.rs b/crates/pile-dataset/src/extract/epub/epub_meta.rs index 4681a67..d6edfa6 100644 --- a/crates/pile-dataset/src/extract/epub/epub_meta.rs +++ b/crates/pile-dataset/src/extract/epub/epub_meta.rs @@ -1,9 +1,9 @@ use epub::doc::EpubDoc; use pile_config::Label; -use std::{collections::HashMap, io::Cursor, sync::OnceLock}; -use tracing::debug; +use std::{collections::HashMap, sync::OnceLock}; +use tracing::trace; -use crate::{Item, PileValue, extract::Extractor}; +use crate::{Item, PileValue, SyncReadBridge, extract::Extractor}; pub struct EpubMetaExtractor<'a> { item: &'a Item, @@ -23,34 +23,50 @@ impl<'a> EpubMetaExtractor<'a> { return Ok(x); } - let bytes = self.item.read().await?.read_to_end().await?; + let key = self.item.key(); + let ext = key.as_str().rsplit('.').next(); + if !matches!(ext, Some("epub")) { + return Ok(self.output.get_or_init(HashMap::new)); + } - let cursor = Cursor::new(bytes); - let doc = match EpubDoc::from_reader(cursor) { + let reader = SyncReadBridge::new_current(self.item.read().await?); + let raw_meta = tokio::task::spawn_blocking(move || { + let doc = EpubDoc::from_reader(reader) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?; + + let fields: &[&'static str] = &[ + "title", + "creator", + "description", + "language", + "publisher", + "date", + "subject", + "identifier", + ]; + + let meta: Vec<(&'static str, Option)> = + fields.iter().map(|&key| (key, doc.mdata(key))).collect(); + + Ok::<_, std::io::Error>(meta) + }) + .await + .map_err(std::io::Error::other)?; + + let raw_meta = match raw_meta { Ok(x) => x, Err(error) => { - debug!(message = "Could not process epub", ?error, key = ?self.item.key()); + trace!(message = "Could not process epub", ?error, key = ?self.item.key()); return Ok(self.output.get_or_init(HashMap::new)); } }; let mut output: HashMap> = HashMap::new(); - let fields = &[ - "title", - "creator", - "description", - "language", - "publisher", - "date", - "subject", - "identifier", - ]; - #[expect(clippy::unwrap_used)] - for key in fields { - let label = Label::new(*key).unwrap(); - let value = match doc.mdata(key) { + for (key, val) in raw_meta { + let label = Label::new(key).unwrap(); + let value = match val { Some(s) => PileValue::String(s.into()), None => PileValue::Null, }; diff --git a/crates/pile-dataset/src/extract/epub/epub_text.rs b/crates/pile-dataset/src/extract/epub/epub_text.rs index 8d91c8b..24dbf1a 100644 --- a/crates/pile-dataset/src/extract/epub/epub_text.rs +++ b/crates/pile-dataset/src/extract/epub/epub_text.rs @@ -1,9 +1,9 @@ use epub::doc::EpubDoc; use pile_config::Label; -use std::{collections::HashMap, io::Cursor, sync::OnceLock}; +use std::{collections::HashMap, sync::OnceLock}; use tracing::debug; -use crate::{Item, PileValue, extract::Extractor}; +use crate::{Item, PileValue, SyncReadBridge, extract::Extractor}; pub struct EpubTextExtractor<'a> { item: &'a Item, @@ -23,10 +23,34 @@ impl<'a> EpubTextExtractor<'a> { return Ok(x); } - let bytes = self.item.read().await?.read_to_end().await?; + let key = self.item.key(); + let ext = key.as_str().rsplit('.').next(); + if !matches!(ext, Some("epub")) { + return Ok(self.output.get_or_init(HashMap::new)); + } - let cursor = Cursor::new(bytes); - let mut doc = match EpubDoc::from_reader(cursor) { + let reader = SyncReadBridge::new_current(self.item.read().await?); + let raw_text = tokio::task::spawn_blocking(move || { + let mut doc = EpubDoc::from_reader(reader) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?; + + let mut text_parts: Vec = Vec::new(); + + loop { + if let Ok(content) = doc.get_current_str() { + text_parts.push(strip_html(&content)); + } + if doc.go_next().is_err() { + break; + } + } + + Ok::<_, std::io::Error>(text_parts.join(" ")) + }) + .await + .map_err(std::io::Error::other)?; + + let raw_text = match raw_text { Ok(x) => x, Err(error) => { debug!(message = "Could not process epub", ?error, key = ?self.item.key()); @@ -34,21 +58,11 @@ impl<'a> EpubTextExtractor<'a> { } }; - let mut text_parts: Vec = Vec::new(); - - loop { - if let Ok(content) = doc.get_current_str() { - text_parts.push(strip_html(&content)); - } - if doc.go_next().is_err() { - break; - } - } - - let text = text_parts.join(" "); - #[expect(clippy::unwrap_used)] - let output = HashMap::from([(Label::new("text").unwrap(), PileValue::String(text.into()))]); + let output = HashMap::from([( + Label::new("text").unwrap(), + PileValue::String(raw_text.into()), + )]); let _ = self.output.set(output); #[expect(clippy::unwrap_used)] diff --git a/crates/pile-dataset/src/extract/exif.rs b/crates/pile-dataset/src/extract/exif.rs index d879b78..d2166f2 100644 --- a/crates/pile-dataset/src/extract/exif.rs +++ b/crates/pile-dataset/src/extract/exif.rs @@ -1,8 +1,8 @@ use pile_config::Label; -use std::{collections::HashMap, io::Cursor, sync::OnceLock}; +use std::{collections::HashMap, io::BufReader, sync::OnceLock}; use tracing::debug; -use crate::{Item, PileValue, extract::Extractor}; +use crate::{Item, PileValue, SyncReadBridge, extract::Extractor}; pub struct ExifExtractor<'a> { item: &'a Item, @@ -22,10 +22,29 @@ impl<'a> ExifExtractor<'a> { return Ok(x); } - let bytes = self.item.read().await?.read_to_end().await?; - let mut cursor = Cursor::new(bytes); + let reader = SyncReadBridge::new_current(self.item.read().await?); + let raw_fields = tokio::task::spawn_blocking(move || { + let mut br = BufReader::new(reader); + let exif = exif::Reader::new() + .read_from_container(&mut br) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?; - let exif = match exif::Reader::new().read_from_container(&mut cursor) { + let fields: Vec<(String, String)> = exif + .fields() + .map(|f| { + ( + f.tag.to_string(), + f.display_value().with_unit(&exif).to_string(), + ) + }) + .collect(); + + Ok::<_, std::io::Error>(fields) + }) + .await + .map_err(std::io::Error::other)?; + + let raw_fields = match raw_fields { Ok(x) => x, Err(error) => { debug!(message = "Could not process exif", ?error, key = ?self.item.key()); @@ -35,23 +54,22 @@ impl<'a> ExifExtractor<'a> { let mut output: HashMap> = HashMap::new(); - for field in exif.fields() { - let Some(label) = tag_to_label(&field.tag) else { + for (tag_name, value) in raw_fields { + let Some(label) = tag_to_label(&tag_name) else { continue; }; // First occurrence wins (PRIMARY IFD comes before THUMBNAIL) - output.entry(label).or_insert_with(|| { - PileValue::String(field.display_value().with_unit(&exif).to_string().into()) - }); + output + .entry(label) + .or_insert_with(|| PileValue::String(value.into())); } return Ok(self.output.get_or_init(|| output)); } } -fn tag_to_label(tag: &exif::Tag) -> Option