Extractor rewrite

This commit is contained in:
2026-03-11 10:12:36 -07:00
parent b789255ea9
commit 078801be40
51 changed files with 676 additions and 693 deletions

View File

@@ -0,0 +1,105 @@
use mime::Mime;
use smartstring::{LazyCompact, SmartString};
use std::{fs::File, path::PathBuf, sync::Arc};
use crate::{
source::{DirDataSource, S3DataSource},
value::{ItemReader, S3Reader},
};
//
// MARK: item
//
/// A cheaply-cloneable pointer to an item in a dataset
#[derive(Debug, Clone)]
pub enum Item {
    /// An item stored on the local filesystem.
    File {
        /// The directory data source this item belongs to.
        source: Arc<DirDataSource>,
        /// The MIME type of this item's contents.
        mime: Mime,
        /// Path to the file on disk.
        path: PathBuf,
        /// Another item attached to this one, if any.
        sidecar: Option<Box<Item>>,
    },
    /// An item stored as an object in an S3 bucket.
    S3 {
        /// The S3 data source this item belongs to.
        source: Arc<S3DataSource>,
        /// The MIME type of this item's contents.
        mime: Mime,
        /// The object's key within the source's bucket.
        key: SmartString<LazyCompact>,
        /// Another item attached to this one, if any.
        sidecar: Option<Box<Item>>,
    },
}
impl Item {
    /// Open the item for reading.
    ///
    /// For `File` items this opens the file synchronously; for `S3` items
    /// this performs a HEAD request to determine the object size before
    /// constructing the reader.
    pub async fn read(&self) -> Result<ItemReader, std::io::Error> {
        Ok(match self {
            Self::File { path, .. } => ItemReader::File(File::open(path)?),
            Self::S3 { source, key, .. } => {
                let head = source
                    .client
                    .head_object()
                    .bucket(source.bucket.as_str())
                    .key(key.as_str())
                    .send()
                    .await
                    .map_err(std::io::Error::other)?;
                // A missing content-length is treated as an empty object.
                let size = head.content_length().unwrap_or(0) as u64;
                ItemReader::S3(S3Reader {
                    client: source.client.clone(),
                    bucket: source.bucket.clone(),
                    key: key.clone(),
                    cursor: 0,
                    size,
                })
            }
        })
    }

    /// The label of the data source this item belongs to.
    pub fn source_name(&self) -> &pile_config::Label {
        match self {
            Self::File { source, .. } => &source.name,
            Self::S3 { source, .. } => &source.name,
        }
    }

    /// A key identifying this item within its source:
    /// the file path for `File` items, the object key for `S3` items.
    ///
    /// # Panics
    /// Panics if a `File` item's path is not valid UTF-8.
    #[expect(clippy::expect_used)]
    pub fn key(&self) -> SmartString<LazyCompact> {
        match self {
            Self::File { path, .. } => path.to_str().expect("path is not utf-8").into(),
            Self::S3 { key, .. } => key.clone(),
        }
    }

    /// Compute the BLAKE3 hash of this item's contents.
    ///
    /// Performs blocking file I/O for `File` items.
    /// `S3` items are not yet supported.
    pub fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
        match self {
            Self::File { path, .. } => {
                let mut hasher = blake3::Hasher::new();
                let mut file = File::open(path)?;
                std::io::copy(&mut file, &mut hasher)?;
                Ok(hasher.finalize())
            }
            // TODO: stream the object body through the hasher.
            Self::S3 { .. } => todo!(),
        }
    }

    /// The MIME type of this item's contents.
    pub fn mime(&self) -> &Mime {
        match self {
            Self::File { mime, .. } => mime,
            Self::S3 { mime, .. } => mime,
        }
    }

    /// The sidecar item attached to this item, if any.
    pub fn sidecar(&self) -> Option<&Self> {
        match self {
            Self::File { sidecar, .. } => sidecar.as_deref(),
            Self::S3 { sidecar, .. } => sidecar.as_deref(),
        }
    }
}

View File

@@ -0,0 +1,9 @@
// Pointers to items in a dataset.
mod item;
pub use item::*;
// Async reader traits, the sync bridge, and file/S3 readers.
mod readers;
pub use readers::*;
// The `PileValue` lazily-computed value type.
#[expect(clippy::module_inception)]
mod value;
pub use value::*;

View File

@@ -0,0 +1,193 @@
use smartstring::{LazyCompact, SmartString};
use std::{
fs::File,
io::{Read, Seek, SeekFrom},
sync::Arc,
};
use tokio::runtime::Handle;
//
// MARK: traits
//
/// A minimal async byte-stream reader.
pub trait AsyncReader: Send {
    /// Read a chunk of bytes into `buf`, returning how many bytes were read.
    /// A return of `0` is treated as end-of-stream by [`AsyncReader::read_to_end`].
    fn read(
        &mut self,
        buf: &mut [u8],
    ) -> impl Future<Output = Result<usize, std::io::Error>> + Send;
    /// Read all remaining bytes into a `Vec`.
    ///
    /// The default implementation repeatedly calls [`AsyncReader::read`]
    /// with a 64 KiB scratch buffer until it reports end-of-stream.
    fn read_to_end(&mut self) -> impl Future<Output = Result<Vec<u8>, std::io::Error>> + Send {
        async {
            let mut out = Vec::new();
            let mut scratch = vec![0u8; 65536];
            loop {
                match self.read(&mut scratch).await? {
                    0 => break Ok(out),
                    n => out.extend_from_slice(&scratch[..n]),
                }
            }
        }
    }
}
/// An [AsyncReader] that also supports seeking.
pub trait AsyncSeekReader: AsyncReader {
    /// Move the read cursor, returning the new position from the start.
    fn seek(&mut self, pos: SeekFrom) -> impl Future<Output = Result<u64, std::io::Error>> + Send;
}
//
// MARK: sync bridge
//
/// Turn an async [Reader] into a sync [Read] + [Seek].
///
/// Never use this outside of [tokio::task::spawn_blocking]:
/// the async runtime will deadlock if this struct blocks
/// the runtime.
pub struct SyncReadBridge<R: AsyncReader> {
    // The async reader being wrapped.
    inner: R,
    // Handle to the runtime that drives `inner`'s futures.
    handle: Handle,
}
impl<R: AsyncReader> SyncReadBridge<R> {
    /// Wrap `inner`, driving its futures on a handle to an existing runtime.
    pub fn new(inner: R, handle: Handle) -> Self {
        Self { inner, handle }
    }
    /// Wrap `inner`, driving its futures on the current tokio runtime.
    ///
    /// # Panics
    /// Panics if called outside of a tokio runtime.
    pub fn new_current(inner: R) -> Self {
        Self::new(inner, Handle::current())
    }
}
impl<R: AsyncReader> Read for SyncReadBridge<R> {
    // Blocks the current thread while the inner async read runs on the runtime.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        self.handle.block_on(self.inner.read(buf))
    }
}
impl<R: AsyncReader + AsyncSeekReader> Seek for SyncReadBridge<R> {
    // Blocks the current thread while the inner async seek runs on the runtime.
    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
        self.handle.block_on(self.inner.seek(pos))
    }
}
//
// MARK: itemreader
//
/// A reader over the contents of an item.
pub enum ItemReader {
    /// A local file, read synchronously.
    File(File),
    /// An S3 object, read via ranged GET requests.
    S3(S3Reader),
}
impl AsyncReader for ItemReader {
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        match self {
            // Local file reads are synchronous and block the calling task.
            Self::File(x) => std::io::Read::read(x, buf),
            Self::S3(x) => x.read(buf).await,
        }
    }
}
impl AsyncSeekReader for ItemReader {
    async fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
        match self {
            // Seeking a local file is synchronous.
            Self::File(x) => x.seek(pos),
            Self::S3(x) => x.seek(pos).await,
        }
    }
}
//
// MARK: S3Reader
//
/// An async reader over an S3 object, fetching data with ranged GET requests.
pub struct S3Reader {
    /// The S3 client used for requests.
    pub client: Arc<aws_sdk_s3::Client>,
    /// The bucket containing the object.
    pub bucket: SmartString<LazyCompact>,
    /// The object's key within the bucket.
    pub key: SmartString<LazyCompact>,
    /// Current read position, in bytes from the start of the object.
    pub cursor: u64,
    /// Total size of the object in bytes.
    pub size: u64,
}
impl AsyncReader for S3Reader {
    /// Fetch up to `buf.len()` bytes at the cursor with a single ranged GET.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        // Bytes remaining between the cursor and the end of the object.
        let len_left = self.size.saturating_sub(self.cursor);
        if len_left == 0 || buf.is_empty() {
            return Ok(0);
        }
        let start_byte = self.cursor;
        let len_to_read = (buf.len() as u64).min(len_left);
        // HTTP byte ranges are inclusive on both ends.
        let end_byte = start_byte + len_to_read - 1;
        let resp = self
            .client
            .get_object()
            .bucket(self.bucket.as_str())
            .key(self.key.as_str())
            .range(format!("bytes={start_byte}-{end_byte}"))
            .send()
            .await
            .map_err(std::io::Error::other)?;
        let bytes = resp
            .body
            .collect()
            .await
            .map(|x| x.into_bytes())
            .map_err(std::io::Error::other)?;
        // The server may return fewer bytes than requested; only advance
        // the cursor by what was actually copied into `buf`.
        let n = bytes.len().min(buf.len());
        buf[..n].copy_from_slice(&bytes[..n]);
        self.cursor += n as u64;
        Ok(n)
    }
}
impl AsyncSeekReader for S3Reader {
    /// Move the cursor without performing any network I/O.
    ///
    /// The cursor is clamped to the object size, so seeking past the end
    /// positions the cursor at `size`. Seeking before the start returns
    /// `InvalidInput`, as with [std::io::Seek].
    async fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
        // Shared error for any seek that would land before byte 0.
        let before_start = || {
            std::io::Error::new(std::io::ErrorKind::InvalidInput, "cannot seek past start")
        };
        self.cursor = match pos {
            SeekFrom::Start(x) => x,
            SeekFrom::Current(x) => {
                if x < 0 {
                    self.cursor
                        .checked_sub(x.unsigned_abs())
                        .ok_or_else(before_start)?
                } else {
                    // Saturating: the clamp below bounds the cursor anyway,
                    // and this avoids an overflow panic in debug builds.
                    self.cursor.saturating_add(x as u64)
                }
            }
            SeekFrom::End(x) => {
                if x < 0 {
                    self.size
                        .checked_sub(x.unsigned_abs())
                        .ok_or_else(before_start)?
                } else {
                    self.size.saturating_add(x as u64)
                }
            }
        }
        .min(self.size);
        Ok(self.cursor)
    }
}

View File

@@ -0,0 +1,229 @@
use mime::Mime;
use pile_config::objectpath::{ObjectPath, PathSegment};
use serde_json::{Map, Value};
use smartstring::{LazyCompact, SmartString};
use std::sync::Arc;
use crate::{
extract::{
item::ItemExtractor,
misc::{ArrayExtractor, MapExtractor, VecExtractor},
string::StringExtractor,
traits::{ListExtractor, ObjectExtractor},
},
value::Item,
};
/// An immutable, cheaply-cloneable, lazily-computed value.
/// Very similar to [serde_json::Value].
pub enum PileValue {
    /// The absence of a value
    Null,
    /// An unsigned integer
    U64(u64),
    /// A signed integer
    I64(i64),
    /// A string
    String(Arc<SmartString<LazyCompact>>),
    /// An array of values
    Array(Arc<Vec<PileValue>>),
    /// A binary blob
    Blob {
        /// The blob's MIME type
        mime: Mime,
        /// The blob's raw bytes
        bytes: Arc<Vec<u8>>,
    },
    /// A lazily-computed map of {label: value}
    ObjectExtractor(Arc<dyn ObjectExtractor>),
    /// A lazily-computed array
    ListExtractor(Arc<dyn ListExtractor>),
    /// A pointer to an item in this dataset
    Item(Item),
}
impl Clone for PileValue {
fn clone(&self) -> Self {
match self {
Self::Null => Self::Null,
Self::U64(x) => Self::U64(*x),
Self::I64(x) => Self::I64(*x),
Self::String(x) => Self::String(x.clone()),
Self::Array(x) => Self::Array(x.clone()),
Self::ObjectExtractor(x) => Self::ObjectExtractor(x.clone()),
Self::ListExtractor(x) => Self::ListExtractor(x.clone()),
Self::Blob { mime, bytes } => Self::Blob {
mime: mime.clone(),
bytes: bytes.clone(),
},
Self::Item(i) => Self::Item(i.clone()),
}
}
}
impl PileValue {
    /// Get an [ObjectExtractor] that resolves named fields of this value.
    ///
    /// Values with no named fields get an empty [MapExtractor].
    pub fn object_extractor(&self) -> Arc<dyn ObjectExtractor> {
        match self {
            Self::Null => Arc::new(MapExtractor::default()),
            Self::U64(_) => Arc::new(MapExtractor::default()),
            Self::I64(_) => Arc::new(MapExtractor::default()),
            Self::Array(_) => Arc::new(MapExtractor::default()),
            Self::String(s) => Arc::new(StringExtractor::new(s)),
            Self::Blob { .. } => Arc::new(MapExtractor::default()),
            Self::ListExtractor(_) => Arc::new(MapExtractor::default()),
            Self::ObjectExtractor(e) => e.clone(),
            Self::Item(i) => Arc::new(ItemExtractor::new(i)),
        }
    }
    /// Get a [ListExtractor] that resolves elements of this value.
    ///
    /// Values that are not list-like get an empty [VecExtractor].
    pub fn list_extractor(&self) -> Arc<dyn ListExtractor> {
        match self {
            Self::Null => Arc::new(VecExtractor::default()),
            Self::U64(_) => Arc::new(VecExtractor::default()),
            Self::I64(_) => Arc::new(VecExtractor::default()),
            Self::Array(a) => Arc::new(ArrayExtractor::new(a.clone())),
            Self::String(_) => Arc::new(VecExtractor::default()),
            Self::Blob { .. } => Arc::new(VecExtractor::default()),
            Self::ListExtractor(e) => e.clone(),
            Self::ObjectExtractor(_) => Arc::new(VecExtractor::default()),
            Self::Item(_) => Arc::new(VecExtractor::default()),
        }
    }
    /// Resolve `query` against this value, segment by segment.
    ///
    /// Returns `Ok(None)` when any segment of the path fails to resolve.
    /// Negative indices count from the end of a list (`-1` is the last
    /// element).
    pub async fn query(&self, query: &ObjectPath) -> Result<Option<Self>, std::io::Error> {
        let mut out: Option<PileValue> = Some(self.clone());
        for s in &query.segments {
            match s {
                // `Root` restarts resolution from this value.
                PathSegment::Root => out = Some(self.clone()),
                PathSegment::Field(field) => {
                    let e = match out.map(|x| x.object_extractor()) {
                        Some(e) => e,
                        None => {
                            out = None;
                            continue;
                        }
                    };
                    out = e.field(field).await?;
                }
                PathSegment::Index(idx) => {
                    let e = match out.map(|x| x.list_extractor()) {
                        Some(e) => e,
                        None => {
                            out = None;
                            continue;
                        }
                    };
                    let idx = if *idx >= 0 {
                        usize::try_from(*idx).ok()
                    } else {
                        // Offset from the end is `len + idx` (idx is negative),
                        // so `-1` maps to the last element.
                        usize::try_from(e.len().await? as i64 + *idx).ok()
                    };
                    let idx = match idx {
                        Some(idx) => idx,
                        None => {
                            out = None;
                            continue;
                        }
                    };
                    out = e.get(idx).await?;
                }
            }
        }
        Ok(out)
    }
    /// Like `to_json`, but counts populated fields instead of collecting values.
    ///
    /// - Leaf values (non-null scalars, arrays, blobs) contribute `Some(1)`.
    /// - `Null` contributes `None`.
    /// - `ObjectExtractor` is recursed into; returns `Some(Object(map))` with
    ///   only the fields that had data, or `None` if all fields were absent.
    /// - `Array` / `ListExtractor` are treated as opaque leaf values (not descended into).
    pub async fn count_fields(&self) -> Result<Option<Value>, std::io::Error> {
        Ok(match self {
            Self::Null => None,
            Self::U64(_) | Self::I64(_) | Self::String(_) | Self::Blob { .. } => {
                Some(Value::Number(1u64.into()))
            }
            // Lists count only when non-empty.
            Self::Array(x) => (!x.is_empty()).then(|| Value::Number(1u64.into())),
            Self::ListExtractor(x) => (!x.is_empty().await?).then(|| Value::Number(1u64.into())),
            Self::ObjectExtractor(_) | Self::Item(_) => {
                let e = self.object_extractor();
                let keys = e.fields().await?;
                let mut map = Map::new();
                for k in &keys {
                    let v = match e.field(k).await? {
                        Some(x) => x,
                        None => continue,
                    };
                    // Box::pin: async recursion requires a boxed future.
                    if let Some(counted) = Box::pin(v.count_fields()).await? {
                        map.insert(k.to_string(), counted);
                    }
                }
                if map.is_empty() {
                    None
                } else {
                    Some(Value::Object(map))
                }
            }
        })
    }
    /// This value as a `&str`, if it is a [PileValue::String].
    pub fn as_str(&self) -> Option<&str> {
        match self {
            Self::String(x) => Some(x),
            _ => None,
        }
    }
    /// Eagerly convert this value into a [serde_json::Value], running any
    /// extractors it contains.
    pub async fn to_json(&self) -> Result<Value, std::io::Error> {
        Ok(match self {
            Self::Null => Value::Null,
            Self::U64(x) => Value::Number((*x).into()),
            Self::I64(x) => Value::Number((*x).into()),
            Self::String(x) => Value::String(x.to_string()),
            // TODO: replace with something meaningful?
            Self::Blob { mime, bytes } => {
                Value::String(format!("<Blob ({mime}, {} bytes)>", bytes.len()))
            }
            #[expect(clippy::expect_used)]
            Self::Array(_) | Self::ListExtractor(_) => {
                let e = self.list_extractor();
                let len = e.len().await?;
                let mut arr = Vec::with_capacity(len);
                for i in 0..len {
                    // Indices below `len` must resolve for a well-behaved extractor.
                    let v = e.get(i).await?.expect("item must be present");
                    arr.push(Box::pin(v.to_json()).await?);
                }
                Value::Array(arr)
            }
            Self::ObjectExtractor(_) | Self::Item(_) => {
                let e = self.object_extractor();
                let keys = e.fields().await?;
                let mut map = Map::new();
                for k in &keys {
                    let v = match e.field(k).await? {
                        Some(x) => x,
                        None => continue,
                    };
                    map.insert(k.to_string(), Box::pin(v.to_json()).await?);
                }
                Value::Object(map)
            }
        })
    }
}