Tweak schema api
All checks were successful
CI / Typos (push) Successful in 32s
CI / Clippy (push) Successful in 1m15s
CI / Build and test (push) Successful in 1m42s
CI / Build and test (all features) (push) Successful in 6m9s
Docker / build-and-push (push) Successful in 3m23s

This commit is contained in:
2026-03-27 03:13:15 -07:00
parent 336480469c
commit 9967e066bb
5 changed files with 215 additions and 48 deletions

View File

@@ -11,7 +11,8 @@ use thiserror::Error;
use tracing::{trace, warn}; use tracing::{trace, warn};
pub use pile_dataset::serve::{ pub use pile_dataset::serve::{
FieldSpec, FieldsResponse, ItemsResponse, LookupRequest, LookupResponse, ApiValue, FieldSpec, FieldsResponse, ItemsResponse, LookupRequest, LookupResponse,
SchemaResponse,
}; };
#[derive(Debug, Error)] #[derive(Debug, Error)]
@@ -169,19 +170,19 @@ impl DatasetClient {
Ok(FieldResponse { content_type, data }) Ok(FieldResponse { content_type, data })
} }
/// `GET /field` — get a field from an item's schema /// `GET /schema/{field}` — get a single schema field value from an item.
pub async fn get_field( pub async fn schema_field(
&self, &self,
source: &str, source: &str,
key: &str, key: &str,
field: &str, field: &str,
) -> Result<FieldResponse, ClientError> { ) -> Result<FieldResponse, ClientError> {
let url = format!("{}/field", self.base_url); let url = format!("{}/schema/{field}", self.base_url);
trace!(url, source, key, field, "GET /field"); trace!(url, source, key, field, "GET /schema/{field}");
let resp = self let resp = self
.client .client
.get(url) .get(url)
.query(&[("source", source), ("key", key), ("field", field)]) .query(&[("source", source), ("key", key)])
.send() .send()
.await?; .await?;
@@ -199,10 +200,29 @@ impl DatasetClient {
Ok(FieldResponse { content_type, data }) Ok(FieldResponse { content_type, data })
} }
/// `GET /schema` — retrieve this dataset's schema. /// `GET /schema` — get all schema field values for a single item.
pub async fn schema(&self) -> Result<FieldsResponse, ClientError> { pub async fn schema(
&self,
source: &str,
key: &str,
hidden: bool,
) -> Result<SchemaResponse, ClientError> {
let url = format!("{}/schema", self.base_url); let url = format!("{}/schema", self.base_url);
trace!(url, "GET /schema"); trace!(url, source, key, hidden, "GET /schema");
let resp = self
.client
.get(url)
.query(&[("source", source), ("key", key)])
.query(&[("hidden", hidden)])
.send()
.await?;
check_status(resp).await?.json().await.map_err(Into::into)
}
/// `GET /config/schema` — retrieve this dataset's schema spec.
pub async fn config_schema(&self) -> Result<FieldsResponse, ClientError> {
let url = format!("{}/config/schema", self.base_url);
trace!(url, "GET /config/schema");
let resp = self.client.get(url).send().await?; let resp = self.client.get(url).send().await?;
check_status(resp).await?.json().await.map_err(Into::into) check_status(resp).await?.json().await.map_err(Into::into)
} }

View File

@@ -0,0 +1,31 @@
use axum::{
Json,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
};
use std::{collections::HashMap, sync::Arc};
pub use pile_config::FieldSpec;
use crate::Datasets;
pub type FieldsResponse = HashMap<String, FieldSpec>;
/// Retrieve this dataset's schema.
#[utoipa::path(
get,
path = "/config/schema",
responses(
(status = 200, description = "This dataset's schema"),
)
)]
pub async fn config_schema(State(state): State<Arc<Datasets>>) -> Response {
let fields: FieldsResponse = state
.config
.schema
.iter()
.map(|(k, v)| (k.as_str().to_owned(), v.clone()))
.collect();
(StatusCode::OK, Json(fields)).into_response()
}

View File

@@ -17,26 +17,36 @@ pub use item::*;
mod extract; mod extract;
pub use extract::*; pub use extract::*;
mod field;
pub use field::*;
mod items; mod items;
pub use items::*; pub use items::*;
mod config_schema;
pub use config_schema::*;
mod schema_field;
pub use schema_field::*;
mod schema; mod schema;
pub use schema::*; pub use schema::*;
#[derive(OpenApi)] #[derive(OpenApi)]
#[openapi( #[openapi(
tags(), tags(),
paths(lookup, item_get, get_extract, items_list, get_field, get_schema), paths(
lookup,
item_get,
get_extract,
items_list,
config_schema,
schema_field,
schema_all
),
components(schemas( components(schemas(
LookupRequest, LookupRequest,
LookupResponse, LookupResponse,
LookupResult, LookupResult,
ItemQuery, ItemQuery,
ExtractQuery, ExtractQuery,
FieldQuery,
ItemsQuery, ItemsQuery,
ItemsResponse, ItemsResponse,
ItemRef ItemRef
@@ -56,9 +66,10 @@ impl Datasets {
.route("/lookup", post(lookup)) .route("/lookup", post(lookup))
.route("/item", get(item_get)) .route("/item", get(item_get))
.route("/extract", get(get_extract)) .route("/extract", get(get_extract))
.route("/field", get(get_field))
.route("/items", get(items_list)) .route("/items", get(items_list))
.route("/schema", get(get_schema)) .route("/config/schema", get(config_schema))
.route("/schema", get(schema_all))
.route("/schema/{field}", get(schema_field))
.with_state(self.clone()); .with_state(self.clone());
if let Some(prefix) = prefix { if let Some(prefix) = prefix {
@@ -71,8 +82,14 @@ impl Datasets {
Some(prefix) => format!("{prefix}/docs"), Some(prefix) => format!("{prefix}/docs"),
}; };
let docs = SwaggerUi::new(docs_path.clone()) let api = Api::openapi();
.url(format!("{}/openapi.json", docs_path), Api::openapi()); let api = match prefix {
None => api,
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
};
let docs =
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
router = router.merge(docs); router = router.merge(docs);
} }

View File

@@ -1,31 +1,130 @@
use axum::{ use axum::{
Json, Json,
extract::State, extract::{Query, State},
http::StatusCode, http::StatusCode,
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use pile_config::Label;
use pile_value::{extract::traits::ExtractState, value::PileValue};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use utoipa::IntoParams;
pub use pile_config::FieldSpec;
use crate::Datasets; use crate::Datasets;
pub type FieldsResponse = HashMap<String, FieldSpec>; #[derive(Deserialize, IntoParams)]
pub struct SchemaQuery {
source: String,
key: String,
/// Retrieve this dataset's schema. #[serde(default)]
hidden: bool,
}
/// JSON-friendly representation of a single schema field value.
///
/// The enum is `#[serde(untagged)]`: each variant is written as its bare
/// payload with no variant tag, and on deserialization serde tries the
/// variants in declaration order and keeps the first one that matches.
/// Keep the more specific object shapes (`Binary`, then `Object`) ahead of
/// the scalar variants for that reason.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ApiValue {
    /// Placeholder for binary data: only the MIME type is reported, not the
    /// bytes. The server sets `binary` to `true` when it emits this variant.
    Binary { binary: bool, mime: String },
    /// Placeholder for a value that is not expanded inline (extractors and
    /// nested items). The server sets `object` to `true`.
    Object { object: bool },
    /// A list of values, each converted recursively.
    Array(Vec<ApiValue>),
    /// A string value.
    String(String),
    /// A numeric value (built from signed or unsigned 64-bit integers).
    Number(serde_json::Number),
    /// An explicit null.
    Null,
}
pub type SchemaResponse = HashMap<String, ApiValue>;
/// Convert a [`PileValue`] into its JSON-facing [`ApiValue`] form.
///
/// Scalars map directly; blobs are reduced to a `Binary` marker carrying
/// only the MIME type; extractors and items become an opaque `Object`
/// marker; arrays are converted element by element.
///
/// `state` is only threaded through to the recursive calls made for array
/// elements. Every arm shown here produces a value, so an `Err` can only
/// originate from a nested call.
async fn pile_value_to_api(
    state: &ExtractState,
    value: PileValue,
) -> Result<ApiValue, std::io::Error> {
    let converted = match value {
        PileValue::Null => ApiValue::Null,
        PileValue::String(text) => ApiValue::String(text.to_string()),
        PileValue::I64(signed) => ApiValue::Number(signed.into()),
        PileValue::U64(unsigned) => ApiValue::Number(unsigned.into()),

        PileValue::Blob { mime, .. } => ApiValue::Binary {
            binary: true,
            mime: mime.to_string(),
        },

        PileValue::ObjectExtractor(_) | PileValue::ListExtractor(_) | PileValue::Item(_) => {
            ApiValue::Object { object: true }
        }

        PileValue::Array(elements) => {
            let mut items = Vec::with_capacity(elements.len());
            for element in elements.iter() {
                // The recursive async call is boxed and pinned so that this
                // function's future does not contain itself.
                let nested = Box::pin(pile_value_to_api(state, element.clone())).await?;
                items.push(nested);
            }
            ApiValue::Array(items)
        }
    };

    Ok(converted)
}
/// Get all schema field values for a single item.
#[utoipa::path( #[utoipa::path(
get, get,
path = "/schema", path = "/schema",
params(
("source" = String, Query, description = "Source label"),
("key" = String, Query, description = "Item key"),
("hidden" = bool, Query, description = "Include hidden fields (default: false)"),
),
responses( responses(
(status = 200, description = "This dataset's schema"), (status = 200, description = "Schema field values as a map of label to value"),
(status = 400, description = "Invalid source label"),
(status = 404, description = "Item not found"),
(status = 500, description = "Internal server error"),
) )
)] )]
pub async fn get_schema(State(state): State<Arc<Datasets>>) -> Response { pub async fn schema_all(
let fields: FieldsResponse = state State(state): State<Arc<Datasets>>,
.config Query(params): Query<SchemaQuery>,
.schema ) -> Response {
.iter() let label = match Label::try_from(params.source.clone()) {
.map(|(k, v)| (k.as_str().to_owned(), v.clone())) Ok(l) => l,
.collect(); Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
(StatusCode::OK, Json(fields)).into_response() };
let Some(item) = state.get(&label, &params.key).await else {
return StatusCode::NOT_FOUND.into_response();
};
let extract_state = ExtractState { ignore_mime: false };
let item = PileValue::Item(item);
let mut result: SchemaResponse = HashMap::new();
for (field_label, field_spec) in &state.config.schema {
if field_spec.hidden && !params.hidden {
continue;
}
let mut value = None;
for path in &field_spec.path {
match item.query(&extract_state, path).await {
Ok(Some(PileValue::Null)) | Ok(None) => continue,
Ok(Some(v)) => {
value = Some(v);
break;
}
Err(e) => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response();
}
}
}
let Some(v) = value else { continue };
let api_value = match pile_value_to_api(&extract_state, v).await {
Ok(v) => v,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
};
result.insert(field_label.as_str().to_owned(), api_value);
}
(StatusCode::OK, Json(result)).into_response()
} }

View File

@@ -1,6 +1,6 @@
use axum::{ use axum::{
Json, Json,
extract::{Query, State}, extract::{Path, Query, State},
http::{StatusCode, header}, http::{StatusCode, header},
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
@@ -9,29 +9,28 @@ use pile_value::{extract::traits::ExtractState, value::PileValue};
use serde::Deserialize; use serde::Deserialize;
use std::{sync::Arc, time::Instant}; use std::{sync::Arc, time::Instant};
use tracing::debug; use tracing::debug;
use utoipa::{IntoParams, ToSchema}; use utoipa::IntoParams;
use crate::Datasets; use crate::Datasets;
#[derive(Deserialize, ToSchema, IntoParams)] #[derive(Deserialize, IntoParams)]
pub struct FieldQuery { pub struct SchemaFieldQuery {
source: String, source: String,
key: String, key: String,
field: String,
#[serde(default)] #[serde(default)]
download: bool, download: bool,
name: Option<String>, name: Option<String>,
} }
/// Extract a specific field from an item's metadata. /// Extract a specific schema field from an item's metadata.
#[utoipa::path( #[utoipa::path(
get, get,
path = "/field", path = "/schema/{field}",
params( params(
("field" = String, Path, description = "Schema field"),
("source" = String, Query, description = "Source label"), ("source" = String, Query, description = "Source label"),
("key" = String, Query, description = "Item key"), ("key" = String, Query, description = "Item key"),
("field" = String, Query, description = "Schema field"),
("name" = Option<String>, Query, description = "Downloaded filename; defaults to the last segment of the key"), ("name" = Option<String>, Query, description = "Downloaded filename; defaults to the last segment of the key"),
), ),
responses( responses(
@@ -41,9 +40,10 @@ pub struct FieldQuery {
(status = 500, description = "Internal server error"), (status = 500, description = "Internal server error"),
) )
)] )]
pub async fn get_field( pub async fn schema_field(
State(state): State<Arc<Datasets>>, State(state): State<Arc<Datasets>>,
Query(params): Query<FieldQuery>, Path(field): Path<String>,
Query(params): Query<SchemaFieldQuery>,
) -> Response { ) -> Response {
let start = Instant::now(); let start = Instant::now();
@@ -53,22 +53,22 @@ pub async fn get_field(
}; };
debug!( debug!(
message = "Serving /field", message = "Serving /schema/{field}",
source = params.source, source = params.source,
key = params.key, key = params.key,
field = params.field, field = field,
); );
let Some(item) = state.get(&label, &params.key).await else { let Some(item) = state.get(&label, &params.key).await else {
return StatusCode::NOT_FOUND.into_response(); return StatusCode::NOT_FOUND.into_response();
}; };
let field = match Label::new(&params.field) { let field_label = match Label::new(&field) {
Some(x) => x, Some(x) => x,
None => return StatusCode::NOT_FOUND.into_response(), None => return StatusCode::NOT_FOUND.into_response(),
}; };
let paths = match state.config.schema.get(&field) { let paths = match state.config.schema.get(&field_label) {
Some(x) => &x.path, Some(x) => &x.path,
None => return StatusCode::NOT_FOUND.into_response(), None => return StatusCode::NOT_FOUND.into_response(),
}; };
@@ -93,10 +93,10 @@ pub async fn get_field(
}; };
debug!( debug!(
message = "Served /field", message = "Served /schema/{field}",
source = params.source, source = params.source,
key = params.key, key = params.key,
field = params.field, field = field,
time_ms = start.elapsed().as_millis() time_ms = start.elapsed().as_millis()
); );