From 614d3273f0f7334ca1c5eb03e04d353193df32e6 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Tue, 10 Mar 2026 17:22:43 -0700 Subject: [PATCH] Serve item bytes & fields --- crates/pile-dataset/src/dataset.rs | 25 ++++- crates/pile/src/command/probe.rs | 35 ++++-- crates/pile/src/command/serve/api.rs | 159 ++++++++++++++++++++++++++- 3 files changed, 203 insertions(+), 16 deletions(-) diff --git a/crates/pile-dataset/src/dataset.rs b/crates/pile-dataset/src/dataset.rs index 2ac2786..322a84e 100644 --- a/crates/pile-dataset/src/dataset.rs +++ b/crates/pile-dataset/src/dataset.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, Utc}; -use pile_config::{ConfigToml, Label, Source}; +use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use serde_json::Value; use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant}; use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; use thiserror::Error; @@ -9,7 +10,8 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use tracing::{debug, info, trace, warn}; use crate::{ - DataSource, Item, + DataSource, Item, PileValue, + extract::MetaExtractor, index::{DbFtsIndex, FtsLookupResult}, path_ts_earliest, source::{DirDataSource, S3DataSource}, @@ -170,6 +172,25 @@ impl Datasets { self.sources.get(source)?.get(key).await } + /// Extract a field from an item by object path. + /// Returns `None` if the item or field is not found. + pub async fn get_field( + &self, + source: &Label, + key: &str, + path: &ObjectPath, + ) -> Result, std::io::Error> { + let Some(item) = self.get(source, key).await else { + return Ok(None); + }; + let extractor = MetaExtractor::new(&item); + let root = PileValue::Extractor(Arc::new(extractor)); + let Some(value) = root.query(path).await? else { + return Ok(None); + }; + Ok(Some(value.to_json().await?)) + } + // // MARK: fts // diff --git a/crates/pile/src/command/probe.rs b/crates/pile/src/command/probe.rs index 3ed33d1..656676b 100644 --- a/crates/pile/src/command/probe.rs +++ b/crates/pile/src/command/probe.rs @@ -1,6 +1,6 @@ use anyhow::{Context, Result}; use clap::Args; -use pile_config::Label; +use pile_config::{Label, objectpath::ObjectPath}; use pile_dataset::{Datasets, PileValue, extract::MetaExtractor}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use std::{path::PathBuf, sync::Arc}; @@ -15,6 +15,10 @@ pub struct ProbeCommand { /// Item key within the source key: String, + /// If present, extract a specific field + #[arg(long, short = 'p')] + path: Option, + /// Path to dataset config #[arg(long, short = 'c', default_value = "./pile.toml")] config: PathBuf, @@ -34,15 +38,28 @@ impl CliCmd for ProbeCommand { let ds = Datasets::open(&self.config) .with_context(|| format!("while opening dataset for {}", self.config.display()))?; - let item = ds.get(&source, &self.key).await.ok_or_else(|| { - anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) - })?; + let json = if let Some(path_str) = self.path { + let path: ObjectPath = path_str + .parse() + .with_context(|| format!("invalid path {path_str:?}"))?; - let value = PileValue::Extractor(Arc::new(MetaExtractor::new(&item))); - let json = value - .to_json() - .await - .with_context(|| format!("while extracting {}", self.key))?; + ds.get_field(&source, &self.key, &path) + .await + .with_context(|| format!("while extracting {}", self.key))? + .ok_or_else(|| { + anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) + })? + } else { + let item = ds.get(&source, &self.key).await.ok_or_else(|| { + anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) + })?; + + let value = PileValue::Extractor(Arc::new(MetaExtractor::new(&item))); + value + .to_json() + .await + .with_context(|| format!("while extracting {}", self.key))? + }; let json = serde_json::to_string_pretty(&json).unwrap(); println!("{json}"); diff --git a/crates/pile/src/command/serve/api.rs b/crates/pile/src/command/serve/api.rs index 9e0e59a..6b6ff90 100644 --- a/crates/pile/src/command/serve/api.rs +++ b/crates/pile/src/command/serve/api.rs @@ -1,12 +1,15 @@ use axum::{ Json, Router, - extract::{DefaultBodyLimit, State}, - http::StatusCode, + extract::{DefaultBodyLimit, Query, State}, + http::{StatusCode, header}, response::{IntoResponse, Response}, - routing::post, + routing::{get, post}, }; +use pile_config::{Label, objectpath::ObjectPath}; +use pile_dataset::{AsyncReader, PileValue, extract::MetaExtractor}; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use tracing::debug; use utoipa::{OpenApi, ToSchema}; use utoipa_swagger_ui::SwaggerUi; @@ -15,8 +18,8 @@ use crate::command::serve::cli::ServeState; #[derive(OpenApi)] #[openapi( tags(), - paths(lookup), - components(schemas(LookupRequest, LookupResponse, LookupResult)) + paths(lookup, item_get, field_get), + components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery)) )] pub(super) struct Api; @@ -27,6 +30,8 @@ pub(super) fn router(state: Arc) -> Router<()> { Router::new() .route("/lookup", post(lookup)) + .route("/item", get(item_get)) + .route("/field", get(field_get)) .merge(docs) .with_state(state) .layer(DefaultBodyLimit::max(32 * 1024 * 1024)) @@ -69,6 +74,12 @@ pub struct LookupResult { ) )] async fn lookup(State(state): State>, Json(body): Json) -> Response { + debug!( + message = "Serving /lookup", + query = body.query, + limit = body.limit.unwrap_or(10) + ); + let results: Vec = match state.ds.fts_lookup(&body.query, body.limit.unwrap_or(10)) { Ok(x) => x @@ -87,3 +98,141 @@ async fn lookup(State(state): State>, Json(body): Json>, + Query(params): Query, +) -> Response { + debug!( + message = "Serving /item", + source = params.source, + key = params.key + ); + + let label = match Label::try_from(params.source) { + Ok(l) => l, + Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), + }; + + let Some(item) = state.ds.get(&label, ¶ms.key).await else { + return StatusCode::NOT_FOUND.into_response(); + }; + + let mime = item.mime().to_string(); + + let mut reader = match item.read().await { + Ok(r) => r, + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + }; + + match reader.read_to_end().await { + Ok(bytes) => (StatusCode::OK, [(header::CONTENT_TYPE, mime)], bytes).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + } +} + +// +// MARK: field +// + +#[derive(Deserialize, ToSchema)] +struct FieldQuery { + source: String, + key: String, + path: String, +} + +/// Extract a specific field from an item's metadata +#[utoipa::path( + get, + path = "/field", + params( + ("source" = String, Query, description = "Source label"), + ("key" = String, Query, description = "Item key"), + ("path" = String, Query, description = "Object path (e.g. $.flac.title)"), + ), + responses( + (status = 200, description = "Field value as JSON"), + (status = 400, description = "Invalid source label or path"), + (status = 404, description = "Item or field not found"), + (status = 500, description = "Internal server error"), + ) +)] +async fn field_get( + State(state): State>, + Query(params): Query, +) -> Response { + debug!( + message = "Serving /field", + source = params.source, + key = params.key, + path = params.path, + ); + + let label = match Label::try_from(params.source) { + Ok(l) => l, + Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), + }; + + let path: ObjectPath = match params.path.parse() { + Ok(p) => p, + Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), + }; + + let Some(item) = state.ds.get(&label, ¶ms.key).await else { + return StatusCode::NOT_FOUND.into_response(); + }; + + let extractor = MetaExtractor::new(&item); + let root: PileValue<'_> = PileValue::Extractor(Arc::new(extractor)); + + let value = match root.query(&path).await { + Ok(Some(v)) => v, + Ok(None) => return StatusCode::NOT_FOUND.into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + }; + + match value { + PileValue::String(s) => ( + StatusCode::OK, + [(header::CONTENT_TYPE, "text/plain")], + s.to_string(), + ) + .into_response(), + PileValue::Blob { mime, bytes } => ( + StatusCode::OK, + [(header::CONTENT_TYPE, mime.to_string())], + bytes.as_ref().clone(), + ) + .into_response(), + _ => match value.to_json().await { + Ok(json) => (StatusCode::OK, Json(json)).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + }, + } +}