From 5b0953d2504b1a61bb924a22865fa60ff5dbae5e Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Tue, 24 Mar 2026 09:44:01 -0700 Subject: [PATCH] Add field endpoint --- crates/pile-client/src/lib.rs | 63 +++++++--- crates/pile-config/src/lib.rs | 9 -- crates/pile-dataset/src/serve/extract.rs | 150 +++++++++++++++++++++++ crates/pile-dataset/src/serve/field.rs | 53 +++----- crates/pile-dataset/src/serve/mod.rs | 7 +- crates/pile/src/command/server.rs | 2 +- 6 files changed, 224 insertions(+), 60 deletions(-) create mode 100644 crates/pile-dataset/src/serve/extract.rs diff --git a/crates/pile-client/src/lib.rs b/crates/pile-client/src/lib.rs index ee16c2a..110a58a 100644 --- a/crates/pile-client/src/lib.rs +++ b/crates/pile-client/src/lib.rs @@ -79,11 +79,9 @@ impl PileClient { /// `GET /datasets` — list all datasets served by this server. pub async fn list_datasets(&self) -> Result, ClientError> { - let resp = self - .client - .get(format!("{}/datasets", self.base_url)) - .send() - .await?; + let url = format!("{}/datasets", self.base_url); + trace!(url, "GET /datasets"); + let resp = self.client.get(url).send().await?; check_status(resp).await?.json().await.map_err(Into::into) } @@ -112,12 +110,9 @@ impl DatasetClient { limit, }; - let resp = self - .client - .post(format!("{}/lookup", self.base_url)) - .json(&body) - .send() - .await?; + let url = format!("{}/lookup", self.base_url); + trace!(url, "POST /lookup"); + let resp = self.client.post(url).json(&body).send().await?; check_status(resp).await?.json().await.map_err(Into::into) } @@ -130,9 +125,11 @@ impl DatasetClient { source: &str, key: &str, ) -> Result> + Send>>, ClientError> { + let url = format!("{}/item", self.base_url); + trace!(url, source, key, "GET /item"); let resp = self .client - .get(format!("{}/item", self.base_url)) + .get(url) .query(&[("source", source), ("key", key)]) .send() .await?; @@ -140,16 +137,18 @@ impl DatasetClient { Ok(Box::pin(check_status(resp).await?.bytes_stream())) } - /// `GET /field` — extract a field from an item by object path (e.g. `$.flac.title`). - pub async fn get_field( + /// `GET /extract` — extract a field from an item by object path (e.g. `$.flac.title`). + pub async fn get_extract( &self, source: &str, key: &str, path: &str, ) -> Result { + let url = format!("{}/extract", self.base_url); + trace!(url, source, key, path, "GET /extract"); let resp = self .client - .get(format!("{}/field", self.base_url)) + .get(url) .query(&[("source", source), ("key", key), ("path", path)]) .send() .await?; @@ -168,15 +167,47 @@ impl DatasetClient { Ok(FieldResponse { content_type, data }) } + /// `GET /field` — get a field from an item's schema + pub async fn get_field( + &self, + source: &str, + key: &str, + field: &str, + ) -> Result { + let url = format!("{}/field", self.base_url); + trace!(url, source, key, field, "GET /field"); + let resp = self + .client + .get(url) + .query(&[("source", source), ("key", key), ("field", field)]) + .send() + .await?; + + let resp = check_status(resp).await?; + + let content_type = resp + .headers() + .get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .unwrap_or("application/octet-stream") + .to_owned(); + + let data = resp.bytes().await?; + + Ok(FieldResponse { content_type, data }) + } + /// `GET /items` — paginate over all items in this dataset, ordered by (source, key). pub async fn list_items( &self, offset: usize, limit: usize, ) -> Result { + let url = format!("{}/items", self.base_url); + trace!(url, offset, limit, "GET /items"); let resp = self .client - .get(format!("{}/items", self.base_url)) + .get(url) .query(&[("offset", offset), ("limit", limit)]) .send() .await?; diff --git a/crates/pile-config/src/lib.rs b/crates/pile-config/src/lib.rs index e07eb8d..e633485 100644 --- a/crates/pile-config/src/lib.rs +++ b/crates/pile-config/src/lib.rs @@ -96,19 +96,10 @@ pub enum Source { #[derive(Debug, Clone, Deserialize)] pub struct FieldSpec { - /// The type of this field - pub r#type: FieldType, - /// How to find this field in a data entry pub path: Vec, } -#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] -#[serde(rename_all = "lowercase")] -pub enum FieldType { - Text, -} - // // MARK: fts // diff --git a/crates/pile-dataset/src/serve/extract.rs b/crates/pile-dataset/src/serve/extract.rs new file mode 100644 index 0000000..718cfdd --- /dev/null +++ b/crates/pile-dataset/src/serve/extract.rs @@ -0,0 +1,150 @@ +use axum::{ + Json, + extract::{Query, RawQuery, State}, + http::{StatusCode, header}, + response::{IntoResponse, Response}, +}; +use pile_config::{Label, objectpath::ObjectPath}; +use pile_value::{extract::traits::ExtractState, value::PileValue}; +use serde::Deserialize; +use std::{sync::Arc, time::Instant}; +use tracing::debug; +use utoipa::ToSchema; + +use crate::Datasets; + +#[derive(Deserialize, ToSchema)] +pub struct ExtractQuery { + source: String, + key: String, + + #[serde(default)] + download: bool, +} + +/// Extract a specific field from an item's metadata. +/// Multiple `path` parameters may be provided; the first non-null result is returned. +#[utoipa::path( + get, + path = "/extract", + params( + ("source" = String, Query, description = "Source label"), + ("key" = String, Query, description = "Item key"), + ("path" = String, Query, description = "Object path (e.g. $.flac.title); repeat for fallbacks"), + ), + responses( + (status = 200, description = "Field value as JSON"), + (status = 400, description = "Invalid source label or path"), + (status = 404, description = "Item or field not found"), + (status = 500, description = "Internal server error"), + ) +)] +pub async fn get_extract( + State(state): State>, + Query(params): Query, + RawQuery(raw_query): RawQuery, +) -> Response { + let start = Instant::now(); + + let label = match Label::try_from(params.source.clone()) { + Ok(l) => l, + Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), + }; + + // Collect all `path` query params in order (supports repeated ?path=...&path=...) + let raw = raw_query.as_deref().unwrap_or(""); + let paths: Vec = { + let mut result = Vec::new(); + for part in raw.split('&') { + if let Some((k, v)) = part.split_once('=') + && k == "path" + { + match v.parse::() { + Ok(p) => result.push(p), + Err(e) => { + return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(); + } + } + } + } + result + }; + + if paths.is_empty() { + return (StatusCode::BAD_REQUEST, "Missing `path` query parameter").into_response(); + } + + debug!( + message = "Serving /extract", + source = params.source, + key = params.key, + paths = paths.len(), + ); + + let Some(item) = state.get(&label, ¶ms.key).await else { + return StatusCode::NOT_FOUND.into_response(); + }; + + let extract_state = ExtractState { ignore_mime: false }; + let item = PileValue::Item(item); + + // Try each path in order, returning the first non-null result + let mut value = None; + for path in &paths { + match item.query(&extract_state, path).await { + Ok(Some(PileValue::Null)) | Ok(None) => continue, + Ok(Some(v)) => { + value = Some(v); + break; + } + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + } + } + + let Some(value) = value else { + return StatusCode::NOT_FOUND.into_response(); + }; + + debug!( + message = "Served /extract", + source = params.source, + key = params.key, + time_ms = start.elapsed().as_millis() + ); + + let disposition = if params.download { + "attachment" + } else { + "inline" + }; + + match value { + PileValue::String(s) => ( + StatusCode::OK, + [ + (header::CONTENT_TYPE, "text/plain".to_owned()), + (header::CONTENT_DISPOSITION, disposition.to_owned()), + ], + s.to_string(), + ) + .into_response(), + PileValue::Blob { mime, bytes } => ( + StatusCode::OK, + [ + (header::CONTENT_TYPE, mime.to_string()), + (header::CONTENT_DISPOSITION, disposition.to_owned()), + ], + bytes.as_ref().clone(), + ) + .into_response(), + _ => match value.to_json(&extract_state).await { + Ok(json) => ( + StatusCode::OK, + [(header::CONTENT_DISPOSITION, disposition.to_owned())], + Json(json), + ) + .into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + }, + } +} diff --git a/crates/pile-dataset/src/serve/field.rs b/crates/pile-dataset/src/serve/field.rs index 0eaac4d..cc8eb4a 100644 --- a/crates/pile-dataset/src/serve/field.rs +++ b/crates/pile-dataset/src/serve/field.rs @@ -1,35 +1,36 @@ use axum::{ Json, - extract::{Query, RawQuery, State}, + extract::{Query, State}, http::{StatusCode, header}, response::{IntoResponse, Response}, }; -use pile_config::{Label, objectpath::ObjectPath}; +use pile_config::Label; use pile_value::{extract::traits::ExtractState, value::PileValue}; use serde::Deserialize; use std::{sync::Arc, time::Instant}; use tracing::debug; -use utoipa::ToSchema; +use utoipa::{IntoParams, ToSchema}; use crate::Datasets; -#[derive(Deserialize, ToSchema)] +#[derive(Deserialize, ToSchema, IntoParams)] pub struct FieldQuery { source: String, key: String, + field: String, + #[serde(default)] download: bool, } /// Extract a specific field from an item's metadata. -/// Multiple `path` parameters may be provided; the first non-null result is returned. #[utoipa::path( get, path = "/field", params( ("source" = String, Query, description = "Source label"), ("key" = String, Query, description = "Item key"), - ("path" = String, Query, description = "Object path (e.g. $.flac.title); repeat for fallbacks"), + ("field" = String, Query, description = "Schema field"), ), responses( (status = 200, description = "Field value as JSON"), @@ -41,7 +42,6 @@ pub struct FieldQuery { pub async fn get_field( State(state): State>, Query(params): Query, - RawQuery(raw_query): RawQuery, ) -> Response { let start = Instant::now(); @@ -50,46 +50,32 @@ pub async fn get_field( Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), }; - // Collect all `path` query params in order (supports repeated ?path=...&path=...) - let raw = raw_query.as_deref().unwrap_or(""); - let paths: Vec = { - let mut result = Vec::new(); - for part in raw.split('&') { - if let Some((k, v)) = part.split_once('=') - && k == "path" - { - match v.parse::() { - Ok(p) => result.push(p), - Err(e) => { - return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(); - } - } - } - } - result - }; - - if paths.is_empty() { - return (StatusCode::BAD_REQUEST, "Missing `path` query parameter").into_response(); - } - debug!( message = "Serving /field", source = params.source, key = params.key, - paths = paths.len(), + field = params.field, ); let Some(item) = state.get(&label, ¶ms.key).await else { return StatusCode::NOT_FOUND.into_response(); }; + let field = match Label::new(¶ms.field) { + Some(x) => x, + None => return StatusCode::NOT_FOUND.into_response(), + }; + + let paths = match state.config.schema.get(&field) { + Some(x) => &x.path, + None => return StatusCode::NOT_FOUND.into_response(), + }; + let extract_state = ExtractState { ignore_mime: false }; let item = PileValue::Item(item); - // Try each path in order, returning the first non-null result let mut value = None; - for path in &paths { + for path in paths { match item.query(&extract_state, path).await { Ok(Some(PileValue::Null)) | Ok(None) => continue, Ok(Some(v)) => { @@ -108,6 +94,7 @@ pub async fn get_field( message = "Served /field", source = params.source, key = params.key, + field = params.field, time_ms = start.elapsed().as_millis() ); diff --git a/crates/pile-dataset/src/serve/mod.rs b/crates/pile-dataset/src/serve/mod.rs index d8e6f96..f1392b2 100644 --- a/crates/pile-dataset/src/serve/mod.rs +++ b/crates/pile-dataset/src/serve/mod.rs @@ -14,6 +14,9 @@ pub use lookup::*; mod item; pub use item::*; +mod extract; +pub use extract::*; + mod field; pub use field::*; @@ -23,12 +26,13 @@ pub use items::*; #[derive(OpenApi)] #[openapi( tags(), - paths(lookup, item_get, get_field, items_list), + paths(lookup, item_get, get_extract, items_list, get_field), components(schemas( LookupRequest, LookupResponse, LookupResult, ItemQuery, + ExtractQuery, FieldQuery, ItemsQuery, ItemsResponse, @@ -48,6 +52,7 @@ impl Datasets { let mut router = Router::new() .route("/lookup", post(lookup)) .route("/item", get(item_get)) + .route("/extract", get(get_extract)) .route("/field", get(get_field)) .route("/items", get(items_list)) .with_state(self.clone()); diff --git a/crates/pile/src/command/server.rs b/crates/pile/src/command/server.rs index 4f9b789..6754acd 100644 --- a/crates/pile/src/command/server.rs +++ b/crates/pile/src/command/server.rs @@ -173,7 +173,7 @@ pub struct ListDatasetsResponse { /// List all datasets served by this server #[utoipa::path( get, - path = "/list_datasets", + path = "/datasets", responses( (status = 200, description = "List of datasets"), (status = 500, description = "Internal server error"),