Add field endpoint
Some checks failed
CI / Typos (push) Successful in 17s
CI / Build and test (push) Failing after 2m37s
CI / Clippy (push) Successful in 3m29s
CI / Build and test (all features) (push) Has been cancelled

This commit is contained in:
2026-03-24 09:44:01 -07:00
parent f7ea25f059
commit 5b0953d250
6 changed files with 224 additions and 60 deletions

View File

@@ -79,11 +79,9 @@ impl PileClient {
/// `GET /datasets` — list all datasets served by this server. /// `GET /datasets` — list all datasets served by this server.
pub async fn list_datasets(&self) -> Result<Vec<DatasetInfo>, ClientError> { pub async fn list_datasets(&self) -> Result<Vec<DatasetInfo>, ClientError> {
let resp = self let url = format!("{}/datasets", self.base_url);
.client trace!(url, "GET /datasets");
.get(format!("{}/datasets", self.base_url)) let resp = self.client.get(url).send().await?;
.send()
.await?;
check_status(resp).await?.json().await.map_err(Into::into) check_status(resp).await?.json().await.map_err(Into::into)
} }
@@ -112,12 +110,9 @@ impl DatasetClient {
limit, limit,
}; };
let resp = self let url = format!("{}/lookup", self.base_url);
.client trace!(url, "POST /lookup");
.post(format!("{}/lookup", self.base_url)) let resp = self.client.post(url).json(&body).send().await?;
.json(&body)
.send()
.await?;
check_status(resp).await?.json().await.map_err(Into::into) check_status(resp).await?.json().await.map_err(Into::into)
} }
@@ -130,9 +125,11 @@ impl DatasetClient {
source: &str, source: &str,
key: &str, key: &str,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>, ClientError> { ) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>, ClientError> {
let url = format!("{}/item", self.base_url);
trace!(url, source, key, "GET /item");
let resp = self let resp = self
.client .client
.get(format!("{}/item", self.base_url)) .get(url)
.query(&[("source", source), ("key", key)]) .query(&[("source", source), ("key", key)])
.send() .send()
.await?; .await?;
@@ -140,16 +137,18 @@ impl DatasetClient {
Ok(Box::pin(check_status(resp).await?.bytes_stream())) Ok(Box::pin(check_status(resp).await?.bytes_stream()))
} }
/// `GET /field` — extract a field from an item by object path (e.g. `$.flac.title`). /// `GET /extract` — extract a field from an item by object path (e.g. `$.flac.title`).
pub async fn get_field( pub async fn get_extract(
&self, &self,
source: &str, source: &str,
key: &str, key: &str,
path: &str, path: &str,
) -> Result<FieldResponse, ClientError> { ) -> Result<FieldResponse, ClientError> {
let url = format!("{}/extract", self.base_url);
trace!(url, source, key, path, "GET /extract");
let resp = self let resp = self
.client .client
.get(format!("{}/field", self.base_url)) .get(url)
.query(&[("source", source), ("key", key), ("path", path)]) .query(&[("source", source), ("key", key), ("path", path)])
.send() .send()
.await?; .await?;
@@ -168,15 +167,47 @@ impl DatasetClient {
Ok(FieldResponse { content_type, data }) Ok(FieldResponse { content_type, data })
} }
/// `GET /field` — get a field from an item's schema
pub async fn get_field(
    &self,
    source: &str,
    key: &str,
    field: &str,
) -> Result<FieldResponse, ClientError> {
    let url = format!("{}/field", self.base_url);
    trace!(url, source, key, field, "GET /field");
    // Build the request first, then send; check_status maps HTTP errors
    // into ClientError before we look at the payload.
    let request = self
        .client
        .get(url)
        .query(&[("source", source), ("key", key), ("field", field)]);
    let resp = check_status(request.send().await?).await?;
    // Fall back to a generic binary content type when the header is
    // absent or not valid visible-ASCII.
    let content_type = match resp.headers().get(header::CONTENT_TYPE) {
        Some(v) => v.to_str().unwrap_or("application/octet-stream").to_owned(),
        None => "application/octet-stream".to_owned(),
    };
    let data = resp.bytes().await?;
    Ok(FieldResponse { content_type, data })
}
/// `GET /items` — paginate over all items in this dataset, ordered by (source, key). /// `GET /items` — paginate over all items in this dataset, ordered by (source, key).
pub async fn list_items( pub async fn list_items(
&self, &self,
offset: usize, offset: usize,
limit: usize, limit: usize,
) -> Result<ItemsResponse, ClientError> { ) -> Result<ItemsResponse, ClientError> {
let url = format!("{}/items", self.base_url);
trace!(url, offset, limit, "GET /items");
let resp = self let resp = self
.client .client
.get(format!("{}/items", self.base_url)) .get(url)
.query(&[("offset", offset), ("limit", limit)]) .query(&[("offset", offset), ("limit", limit)])
.send() .send()
.await?; .await?;

View File

@@ -96,19 +96,10 @@ pub enum Source {
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct FieldSpec { pub struct FieldSpec {
/// The type of this field
pub r#type: FieldType,
/// How to find this field in a data entry /// How to find this field in a data entry
pub path: Vec<ObjectPath>, pub path: Vec<ObjectPath>,
} }
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum FieldType {
Text,
}
// //
// MARK: fts // MARK: fts
// //

View File

@@ -0,0 +1,150 @@
use axum::{
Json,
extract::{Query, RawQuery, State},
http::{StatusCode, header},
response::{IntoResponse, Response},
};
use pile_config::{Label, objectpath::ObjectPath};
use pile_value::{extract::traits::ExtractState, value::PileValue};
use serde::Deserialize;
use std::{sync::Arc, time::Instant};
use tracing::debug;
use utoipa::ToSchema;
use crate::Datasets;
/// Query parameters for `GET /extract` that serde deserializes directly.
/// The repeated `path` parameter is deliberately NOT a field here — it is
/// collected in order from the raw query string inside `get_extract`, so
/// that `?path=a&path=b` fallback chains are preserved.
#[derive(Deserialize, ToSchema)]
pub struct ExtractQuery {
    // Source label identifying which dataset partition the item lives in.
    source: String,
    // Key of the item within that source.
    key: String,
    // When true the response carries `Content-Disposition: attachment`
    // instead of `inline`; omitted params default to false.
    #[serde(default)]
    download: bool,
}
/// Extract a specific field from an item's metadata.
/// Multiple `path` parameters may be provided; the first non-null result is returned.
#[utoipa::path(
    get,
    path = "/extract",
    params(
        ("source" = String, Query, description = "Source label"),
        ("key" = String, Query, description = "Item key"),
        ("path" = String, Query, description = "Object path (e.g. $.flac.title); repeat for fallbacks"),
    ),
    responses(
        (status = 200, description = "Field value (text/plain, the blob's own MIME type, or JSON depending on the value's type)"),
        (status = 400, description = "Invalid source label or path"),
        (status = 404, description = "Item or field not found"),
        (status = 500, description = "Internal server error"),
    )
)]
pub async fn get_extract(
    State(state): State<Arc<Datasets>>,
    Query(params): Query<ExtractQuery>,
    RawQuery(raw_query): RawQuery,
) -> Response {
    let start = Instant::now();
    let label = match Label::try_from(params.source.clone()) {
        Ok(l) => l,
        Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
    };
    // Collect all `path` query params in order (supports repeated ?path=...&path=...).
    // We must read the raw query string because serde's map-based `Query`
    // extractor cannot represent repeated keys.
    let raw = raw_query.as_deref().unwrap_or("");
    let paths: Vec<ObjectPath> = {
        let mut result = Vec::new();
        for part in raw.split('&') {
            if let Some((k, v)) = part.split_once('=')
                && k == "path"
            {
                // The raw query string is still percent-encoded (e.g. reqwest's
                // form-urlencoding sends `$` as `%24`), so decode the component
                // before handing it to the ObjectPath parser.
                let Some(decoded) = decode_query_component(v) else {
                    return (
                        StatusCode::BAD_REQUEST,
                        "Malformed percent-encoding in `path` parameter",
                    )
                        .into_response();
                };
                match decoded.parse::<ObjectPath>() {
                    Ok(p) => result.push(p),
                    Err(e) => {
                        return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response();
                    }
                }
            }
        }
        result
    };
    if paths.is_empty() {
        return (StatusCode::BAD_REQUEST, "Missing `path` query parameter").into_response();
    }
    debug!(
        message = "Serving /extract",
        source = params.source,
        key = params.key,
        paths = paths.len(),
    );
    let Some(item) = state.get(&label, &params.key).await else {
        return StatusCode::NOT_FOUND.into_response();
    };
    let extract_state = ExtractState { ignore_mime: false };
    let item = PileValue::Item(item);
    // Try each path in order, returning the first non-null result.
    let mut value = None;
    for path in &paths {
        match item.query(&extract_state, path).await {
            Ok(Some(PileValue::Null)) | Ok(None) => continue,
            Ok(Some(v)) => {
                value = Some(v);
                break;
            }
            Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
        }
    }
    // Every path exhausted without a non-null hit: the field does not exist.
    let Some(value) = value else {
        return StatusCode::NOT_FOUND.into_response();
    };
    debug!(
        message = "Served /extract",
        source = params.source,
        key = params.key,
        time_ms = start.elapsed().as_millis()
    );
    let disposition = if params.download {
        "attachment"
    } else {
        "inline"
    };
    // Pick the response representation from the value's runtime type:
    // strings go out as text/plain, blobs keep their stored MIME type,
    // and everything else is serialized to JSON.
    match value {
        PileValue::String(s) => (
            StatusCode::OK,
            [
                (header::CONTENT_TYPE, "text/plain".to_owned()),
                (header::CONTENT_DISPOSITION, disposition.to_owned()),
            ],
            s.to_string(),
        )
            .into_response(),
        PileValue::Blob { mime, bytes } => (
            StatusCode::OK,
            [
                (header::CONTENT_TYPE, mime.to_string()),
                (header::CONTENT_DISPOSITION, disposition.to_owned()),
            ],
            bytes.as_ref().clone(),
        )
            .into_response(),
        _ => match value.to_json(&extract_state).await {
            Ok(json) => (
                StatusCode::OK,
                [(header::CONTENT_DISPOSITION, disposition.to_owned())],
                Json(json),
            )
                .into_response(),
            Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
        },
    }
}

/// Decode a single `application/x-www-form-urlencoded` query component:
/// `+` becomes a space and `%XX` becomes the byte `0xXX`.
/// Returns `None` on a truncated/invalid escape or a non-UTF-8 result.
fn decode_query_component(s: &str) -> Option<String> {
    let bytes = s.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'+' => {
                out.push(b' ');
                i += 1;
            }
            b'%' => {
                // Need exactly two hex digits after the '%'.
                let hex = bytes.get(i + 1..i + 3)?;
                let hi = (hex[0] as char).to_digit(16)?;
                let lo = (hex[1] as char).to_digit(16)?;
                out.push((hi * 16 + lo) as u8);
                i += 3;
            }
            b => {
                out.push(b);
                i += 1;
            }
        }
    }
    String::from_utf8(out).ok()
}

View File

@@ -1,35 +1,36 @@
use axum::{ use axum::{
Json, Json,
extract::{Query, RawQuery, State}, extract::{Query, State},
http::{StatusCode, header}, http::{StatusCode, header},
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use pile_config::{Label, objectpath::ObjectPath}; use pile_config::Label;
use pile_value::{extract::traits::ExtractState, value::PileValue}; use pile_value::{extract::traits::ExtractState, value::PileValue};
use serde::Deserialize; use serde::Deserialize;
use std::{sync::Arc, time::Instant}; use std::{sync::Arc, time::Instant};
use tracing::debug; use tracing::debug;
use utoipa::ToSchema; use utoipa::{IntoParams, ToSchema};
use crate::Datasets; use crate::Datasets;
#[derive(Deserialize, ToSchema)] #[derive(Deserialize, ToSchema, IntoParams)]
pub struct FieldQuery { pub struct FieldQuery {
source: String, source: String,
key: String, key: String,
field: String,
#[serde(default)] #[serde(default)]
download: bool, download: bool,
} }
/// Extract a specific field from an item's metadata. /// Extract a specific field from an item's metadata.
/// Multiple `path` parameters may be provided; the first non-null result is returned.
#[utoipa::path( #[utoipa::path(
get, get,
path = "/field", path = "/field",
params( params(
("source" = String, Query, description = "Source label"), ("source" = String, Query, description = "Source label"),
("key" = String, Query, description = "Item key"), ("key" = String, Query, description = "Item key"),
("path" = String, Query, description = "Object path (e.g. $.flac.title); repeat for fallbacks"), ("field" = String, Query, description = "Schema field"),
), ),
responses( responses(
(status = 200, description = "Field value as JSON"), (status = 200, description = "Field value as JSON"),
@@ -41,7 +42,6 @@ pub struct FieldQuery {
pub async fn get_field( pub async fn get_field(
State(state): State<Arc<Datasets>>, State(state): State<Arc<Datasets>>,
Query(params): Query<FieldQuery>, Query(params): Query<FieldQuery>,
RawQuery(raw_query): RawQuery,
) -> Response { ) -> Response {
let start = Instant::now(); let start = Instant::now();
@@ -50,46 +50,32 @@ pub async fn get_field(
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
}; };
// Collect all `path` query params in order (supports repeated ?path=...&path=...)
let raw = raw_query.as_deref().unwrap_or("");
let paths: Vec<ObjectPath> = {
let mut result = Vec::new();
for part in raw.split('&') {
if let Some((k, v)) = part.split_once('=')
&& k == "path"
{
match v.parse::<ObjectPath>() {
Ok(p) => result.push(p),
Err(e) => {
return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response();
}
}
}
}
result
};
if paths.is_empty() {
return (StatusCode::BAD_REQUEST, "Missing `path` query parameter").into_response();
}
debug!( debug!(
message = "Serving /field", message = "Serving /field",
source = params.source, source = params.source,
key = params.key, key = params.key,
paths = paths.len(), field = params.field,
); );
let Some(item) = state.get(&label, &params.key).await else { let Some(item) = state.get(&label, &params.key).await else {
return StatusCode::NOT_FOUND.into_response(); return StatusCode::NOT_FOUND.into_response();
}; };
let field = match Label::new(&params.field) {
Some(x) => x,
None => return StatusCode::NOT_FOUND.into_response(),
};
let paths = match state.config.schema.get(&field) {
Some(x) => &x.path,
None => return StatusCode::NOT_FOUND.into_response(),
};
let extract_state = ExtractState { ignore_mime: false }; let extract_state = ExtractState { ignore_mime: false };
let item = PileValue::Item(item); let item = PileValue::Item(item);
// Try each path in order, returning the first non-null result
let mut value = None; let mut value = None;
for path in &paths { for path in paths {
match item.query(&extract_state, path).await { match item.query(&extract_state, path).await {
Ok(Some(PileValue::Null)) | Ok(None) => continue, Ok(Some(PileValue::Null)) | Ok(None) => continue,
Ok(Some(v)) => { Ok(Some(v)) => {
@@ -108,6 +94,7 @@ pub async fn get_field(
message = "Served /field", message = "Served /field",
source = params.source, source = params.source,
key = params.key, key = params.key,
field = params.field,
time_ms = start.elapsed().as_millis() time_ms = start.elapsed().as_millis()
); );

View File

@@ -14,6 +14,9 @@ pub use lookup::*;
mod item; mod item;
pub use item::*; pub use item::*;
mod extract;
pub use extract::*;
mod field; mod field;
pub use field::*; pub use field::*;
@@ -23,12 +26,13 @@ pub use items::*;
#[derive(OpenApi)] #[derive(OpenApi)]
#[openapi( #[openapi(
tags(), tags(),
paths(lookup, item_get, get_field, items_list), paths(lookup, item_get, get_extract, items_list, get_field),
components(schemas( components(schemas(
LookupRequest, LookupRequest,
LookupResponse, LookupResponse,
LookupResult, LookupResult,
ItemQuery, ItemQuery,
ExtractQuery,
FieldQuery, FieldQuery,
ItemsQuery, ItemsQuery,
ItemsResponse, ItemsResponse,
@@ -48,6 +52,7 @@ impl Datasets {
let mut router = Router::new() let mut router = Router::new()
.route("/lookup", post(lookup)) .route("/lookup", post(lookup))
.route("/item", get(item_get)) .route("/item", get(item_get))
.route("/extract", get(get_extract))
.route("/field", get(get_field)) .route("/field", get(get_field))
.route("/items", get(items_list)) .route("/items", get(items_list))
.with_state(self.clone()); .with_state(self.clone());

View File

@@ -173,7 +173,7 @@ pub struct ListDatasetsResponse {
/// List all datasets served by this server /// List all datasets served by this server
#[utoipa::path( #[utoipa::path(
get, get,
path = "/list_datasets", path = "/datasets",
responses( responses(
(status = 200, description = "List of datasets"), (status = 200, description = "List of datasets"),
(status = 500, description = "Internal server error"), (status = 500, description = "Internal server error"),