Tweak schema api
This commit is contained in:
31
crates/pile-dataset/src/serve/config_schema.rs
Normal file
31
crates/pile-dataset/src/serve/config_schema.rs
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
use axum::{
|
||||||
|
Json,
|
||||||
|
extract::State,
|
||||||
|
http::StatusCode,
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
};
|
||||||
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
|
pub use pile_config::FieldSpec;
|
||||||
|
|
||||||
|
use crate::Datasets;
|
||||||
|
|
||||||
|
/// Body returned by `config_schema`: the dataset's configured schema as a map
/// from each field label (in string form) to a clone of its [`FieldSpec`].
pub type FieldsResponse = HashMap<String, FieldSpec>;
|
||||||
|
|
||||||
|
/// Retrieve this dataset's schema.
|
||||||
|
#[utoipa::path(
|
||||||
|
get,
|
||||||
|
path = "/config/schema",
|
||||||
|
responses(
|
||||||
|
(status = 200, description = "This dataset's schema"),
|
||||||
|
)
|
||||||
|
)]
|
||||||
|
// Handler for `GET /config/schema`. It reads only `state.config.schema` and
// takes no query or path parameters.
// NOTE(review): the `///` line above is presumably picked up by
// `#[utoipa::path]` as the operation description, so rewording it would change
// the generated OpenAPI document — confirm before editing it.
pub async fn config_schema(State(state): State<Arc<Datasets>>) -> Response {
|
||||||
|
// Build an owned copy of the configured schema so it can be moved into the
// JSON response.
let fields: FieldsResponse = state
|
||||||
|
.config
|
||||||
|
.schema
|
||||||
|
.iter()
|
||||||
|
// Keys become plain `String`s (via the label's `as_str()`); specs are cloned.
.map(|(k, v)| (k.as_str().to_owned(), v.clone()))
|
||||||
|
.collect();
|
||||||
|
// No failure path exists in this handler: it always answers 200 with the map.
(StatusCode::OK, Json(fields)).into_response()
|
||||||
|
}
|
||||||
@@ -17,26 +17,36 @@ pub use item::*;
|
|||||||
mod extract;
|
mod extract;
|
||||||
pub use extract::*;
|
pub use extract::*;
|
||||||
|
|
||||||
mod field;
|
|
||||||
pub use field::*;
|
|
||||||
|
|
||||||
mod items;
|
mod items;
|
||||||
pub use items::*;
|
pub use items::*;
|
||||||
|
|
||||||
|
mod config_schema;
|
||||||
|
pub use config_schema::*;
|
||||||
|
|
||||||
|
mod schema_field;
|
||||||
|
pub use schema_field::*;
|
||||||
|
|
||||||
mod schema;
|
mod schema;
|
||||||
pub use schema::*;
|
pub use schema::*;
|
||||||
|
|
||||||
#[derive(OpenApi)]
|
#[derive(OpenApi)]
|
||||||
#[openapi(
|
#[openapi(
|
||||||
tags(),
|
tags(),
|
||||||
paths(lookup, item_get, get_extract, items_list, get_field, get_schema),
|
paths(
|
||||||
|
lookup,
|
||||||
|
item_get,
|
||||||
|
get_extract,
|
||||||
|
items_list,
|
||||||
|
config_schema,
|
||||||
|
schema_field,
|
||||||
|
schema_all
|
||||||
|
),
|
||||||
components(schemas(
|
components(schemas(
|
||||||
LookupRequest,
|
LookupRequest,
|
||||||
LookupResponse,
|
LookupResponse,
|
||||||
LookupResult,
|
LookupResult,
|
||||||
ItemQuery,
|
ItemQuery,
|
||||||
ExtractQuery,
|
ExtractQuery,
|
||||||
FieldQuery,
|
|
||||||
ItemsQuery,
|
ItemsQuery,
|
||||||
ItemsResponse,
|
ItemsResponse,
|
||||||
ItemRef
|
ItemRef
|
||||||
@@ -56,9 +66,10 @@ impl Datasets {
|
|||||||
.route("/lookup", post(lookup))
|
.route("/lookup", post(lookup))
|
||||||
.route("/item", get(item_get))
|
.route("/item", get(item_get))
|
||||||
.route("/extract", get(get_extract))
|
.route("/extract", get(get_extract))
|
||||||
.route("/field", get(get_field))
|
|
||||||
.route("/items", get(items_list))
|
.route("/items", get(items_list))
|
||||||
.route("/schema", get(get_schema))
|
.route("/config/schema", get(config_schema))
|
||||||
|
.route("/schema", get(schema_all))
|
||||||
|
.route("/schema/{field}", get(schema_field))
|
||||||
.with_state(self.clone());
|
.with_state(self.clone());
|
||||||
|
|
||||||
if let Some(prefix) = prefix {
|
if let Some(prefix) = prefix {
|
||||||
@@ -71,8 +82,14 @@ impl Datasets {
|
|||||||
Some(prefix) => format!("{prefix}/docs"),
|
Some(prefix) => format!("{prefix}/docs"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let docs = SwaggerUi::new(docs_path.clone())
|
let api = Api::openapi();
|
||||||
.url(format!("{}/openapi.json", docs_path), Api::openapi());
|
let api = match prefix {
|
||||||
|
None => api,
|
||||||
|
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
|
||||||
|
};
|
||||||
|
|
||||||
|
let docs =
|
||||||
|
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
|
||||||
|
|
||||||
router = router.merge(docs);
|
router = router.merge(docs);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,31 +1,130 @@
|
|||||||
use axum::{
|
use axum::{
|
||||||
Json,
|
Json,
|
||||||
extract::State,
|
extract::{Query, State},
|
||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
response::{IntoResponse, Response},
|
response::{IntoResponse, Response},
|
||||||
};
|
};
|
||||||
|
use pile_config::Label;
|
||||||
|
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use std::{collections::HashMap, sync::Arc};
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
use utoipa::IntoParams;
|
||||||
pub use pile_config::FieldSpec;
|
|
||||||
|
|
||||||
use crate::Datasets;
|
use crate::Datasets;
|
||||||
|
|
||||||
pub type FieldsResponse = HashMap<String, FieldSpec>;
|
#[derive(Deserialize, IntoParams)]
|
||||||
|
pub struct SchemaQuery {
|
||||||
|
source: String,
|
||||||
|
key: String,
|
||||||
|
|
||||||
/// Retrieve this dataset's schema.
|
#[serde(default)]
|
||||||
|
hidden: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// JSON-facing form of a `PileValue`, as produced by `pile_value_to_api`.
///
/// `#[serde(untagged)]` means no variant name is written on serialization:
/// each variant is emitted as just its payload.
#[derive(Serialize)]
|
||||||
|
#[serde(untagged)]
|
||||||
|
pub enum ApiValue {
|
||||||
|
/// Marker for a blob value: carries only the MIME type, not the bytes.
/// `pile_value_to_api` always sets `binary` to `true`.
Binary { binary: bool, mime: String },
|
||||||
|
/// Marker for values that are not expanded (object/list extractors and
/// items). `pile_value_to_api` always sets `object` to `true`.
Object { object: bool },
|
||||||
|
/// Element-by-element conversion of a `PileValue::Array`.
Array(Vec<ApiValue>),
|
||||||
|
/// Text value.
String(String),
|
||||||
|
/// Integer value; `pile_value_to_api` builds it from `U64` and `I64` inputs.
Number(serde_json::Number),
|
||||||
|
/// Null value.
Null,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Body returned by `schema_all`: schema field label (in string form) mapped to
/// the value extracted for that field, converted to an [`ApiValue`].
pub type SchemaResponse = HashMap<String, ApiValue>;
|
||||||
|
|
||||||
|
/// Convert a `PileValue` into its serializable [`ApiValue`] form.
///
/// Scalars are copied, arrays are converted element by element, and blobs,
/// extractors and items are replaced by marker variants instead of being
/// expanded.
///
/// `state` is only forwarded to the recursive call for array elements; no arm
/// here reads it directly.
///
/// # Errors
///
/// No arm constructs an error itself; an `Err` can only propagate from a
/// recursive call on an array element.
async fn pile_value_to_api(
|
||||||
|
state: &ExtractState,
|
||||||
|
value: PileValue,
|
||||||
|
) -> Result<ApiValue, std::io::Error> {
|
||||||
|
match value {
|
||||||
|
PileValue::String(s) => Ok(ApiValue::String(s.to_string())),
|
||||||
|
PileValue::U64(n) => Ok(ApiValue::Number(n.into())),
|
||||||
|
PileValue::I64(n) => Ok(ApiValue::Number(n.into())),
|
||||||
|
PileValue::Null => Ok(ApiValue::Null),
|
||||||
|
|
||||||
|
// Blobs are reported by MIME type only; every other field of the variant
// is ignored (`..`), so the payload is never put in the response.
PileValue::Blob { mime, .. } => Ok(ApiValue::Binary {
|
||||||
|
binary: true,
|
||||||
|
mime: mime.to_string(),
|
||||||
|
}),
|
||||||
|
|
||||||
|
PileValue::Array(arr) => {
|
||||||
|
let mut out = Vec::with_capacity(arr.len());
|
||||||
|
for item in arr.iter() {
|
||||||
|
// The recursive call is wrapped in `Box::pin` — presumably because a
// directly recursive `async fn` would otherwise have an unsized future.
// The first failing element aborts the whole conversion via `?`.
out.push(Box::pin(pile_value_to_api(state, item.clone())).await?);
|
||||||
|
}
|
||||||
|
Ok(ApiValue::Array(out))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Nested structures are not traversed; callers only learn that an object
// is present at this position.
PileValue::ObjectExtractor(_) | PileValue::ListExtractor(_) | PileValue::Item(_) => {
|
||||||
|
Ok(ApiValue::Object { object: true })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get all schema field values for a single item.
|
||||||
#[utoipa::path(
|
#[utoipa::path(
|
||||||
get,
|
get,
|
||||||
path = "/schema",
|
path = "/schema",
|
||||||
|
params(
|
||||||
|
("source" = String, Query, description = "Source label"),
|
||||||
|
("key" = String, Query, description = "Item key"),
|
||||||
|
("hidden" = bool, Query, description = "Include hidden fields (default: false)"),
|
||||||
|
),
|
||||||
responses(
|
responses(
|
||||||
(status = 200, description = "This dataset's schema"),
|
(status = 200, description = "Schema field values as a map of label to value"),
|
||||||
|
(status = 400, description = "Invalid source label"),
|
||||||
|
(status = 404, description = "Item not found"),
|
||||||
|
(status = 500, description = "Internal server error"),
|
||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
pub async fn get_schema(State(state): State<Arc<Datasets>>) -> Response {
|
pub async fn schema_all(
|
||||||
let fields: FieldsResponse = state
|
State(state): State<Arc<Datasets>>,
|
||||||
.config
|
Query(params): Query<SchemaQuery>,
|
||||||
.schema
|
) -> Response {
|
||||||
.iter()
|
let label = match Label::try_from(params.source.clone()) {
|
||||||
.map(|(k, v)| (k.as_str().to_owned(), v.clone()))
|
Ok(l) => l,
|
||||||
.collect();
|
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
|
||||||
(StatusCode::OK, Json(fields)).into_response()
|
};
|
||||||
|
|
||||||
|
let Some(item) = state.get(&label, &params.key).await else {
|
||||||
|
return StatusCode::NOT_FOUND.into_response();
|
||||||
|
};
|
||||||
|
|
||||||
|
let extract_state = ExtractState { ignore_mime: false };
|
||||||
|
let item = PileValue::Item(item);
|
||||||
|
|
||||||
|
let mut result: SchemaResponse = HashMap::new();
|
||||||
|
|
||||||
|
for (field_label, field_spec) in &state.config.schema {
|
||||||
|
if field_spec.hidden && !params.hidden {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut value = None;
|
||||||
|
for path in &field_spec.path {
|
||||||
|
match item.query(&extract_state, path).await {
|
||||||
|
Ok(Some(PileValue::Null)) | Ok(None) => continue,
|
||||||
|
Ok(Some(v)) => {
|
||||||
|
value = Some(v);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(v) = value else { continue };
|
||||||
|
|
||||||
|
let api_value = match pile_value_to_api(&extract_state, v).await {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||||
|
};
|
||||||
|
|
||||||
|
result.insert(field_label.as_str().to_owned(), api_value);
|
||||||
|
}
|
||||||
|
|
||||||
|
(StatusCode::OK, Json(result)).into_response()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use axum::{
|
use axum::{
|
||||||
Json,
|
Json,
|
||||||
extract::{Query, State},
|
extract::{Path, Query, State},
|
||||||
http::{StatusCode, header},
|
http::{StatusCode, header},
|
||||||
response::{IntoResponse, Response},
|
response::{IntoResponse, Response},
|
||||||
};
|
};
|
||||||
@@ -9,29 +9,28 @@ use pile_value::{extract::traits::ExtractState, value::PileValue};
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::{sync::Arc, time::Instant};
|
use std::{sync::Arc, time::Instant};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
use utoipa::{IntoParams, ToSchema};
|
use utoipa::IntoParams;
|
||||||
|
|
||||||
use crate::Datasets;
|
use crate::Datasets;
|
||||||
|
|
||||||
#[derive(Deserialize, ToSchema, IntoParams)]
|
#[derive(Deserialize, IntoParams)]
|
||||||
pub struct FieldQuery {
|
pub struct SchemaFieldQuery {
|
||||||
source: String,
|
source: String,
|
||||||
key: String,
|
key: String,
|
||||||
field: String,
|
|
||||||
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
download: bool,
|
download: bool,
|
||||||
name: Option<String>,
|
name: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract a specific field from an item's metadata.
|
/// Extract a specific schema field from an item's metadata.
|
||||||
#[utoipa::path(
|
#[utoipa::path(
|
||||||
get,
|
get,
|
||||||
path = "/field",
|
path = "/schema/{field}",
|
||||||
params(
|
params(
|
||||||
|
("field" = String, Path, description = "Schema field"),
|
||||||
("source" = String, Query, description = "Source label"),
|
("source" = String, Query, description = "Source label"),
|
||||||
("key" = String, Query, description = "Item key"),
|
("key" = String, Query, description = "Item key"),
|
||||||
("field" = String, Query, description = "Schema field"),
|
|
||||||
("name" = Option<String>, Query, description = "Downloaded filename; defaults to the last segment of the key"),
|
("name" = Option<String>, Query, description = "Downloaded filename; defaults to the last segment of the key"),
|
||||||
),
|
),
|
||||||
responses(
|
responses(
|
||||||
@@ -41,9 +40,10 @@ pub struct FieldQuery {
|
|||||||
(status = 500, description = "Internal server error"),
|
(status = 500, description = "Internal server error"),
|
||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
pub async fn get_field(
|
pub async fn schema_field(
|
||||||
State(state): State<Arc<Datasets>>,
|
State(state): State<Arc<Datasets>>,
|
||||||
Query(params): Query<FieldQuery>,
|
Path(field): Path<String>,
|
||||||
|
Query(params): Query<SchemaFieldQuery>,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
@@ -53,22 +53,22 @@ pub async fn get_field(
|
|||||||
};
|
};
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
message = "Serving /field",
|
message = "Serving /schema/{field}",
|
||||||
source = params.source,
|
source = params.source,
|
||||||
key = params.key,
|
key = params.key,
|
||||||
field = params.field,
|
field = field,
|
||||||
);
|
);
|
||||||
|
|
||||||
let Some(item) = state.get(&label, &params.key).await else {
|
let Some(item) = state.get(&label, &params.key).await else {
|
||||||
return StatusCode::NOT_FOUND.into_response();
|
return StatusCode::NOT_FOUND.into_response();
|
||||||
};
|
};
|
||||||
|
|
||||||
let field = match Label::new(&params.field) {
|
let field_label = match Label::new(&field) {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
None => return StatusCode::NOT_FOUND.into_response(),
|
None => return StatusCode::NOT_FOUND.into_response(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let paths = match state.config.schema.get(&field) {
|
let paths = match state.config.schema.get(&field_label) {
|
||||||
Some(x) => &x.path,
|
Some(x) => &x.path,
|
||||||
None => return StatusCode::NOT_FOUND.into_response(),
|
None => return StatusCode::NOT_FOUND.into_response(),
|
||||||
};
|
};
|
||||||
@@ -93,10 +93,10 @@ pub async fn get_field(
|
|||||||
};
|
};
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
message = "Served /field",
|
message = "Served /schema/{field}",
|
||||||
source = params.source,
|
source = params.source,
|
||||||
key = params.key,
|
key = params.key,
|
||||||
field = params.field,
|
field = field,
|
||||||
time_ms = start.elapsed().as_millis()
|
time_ms = start.elapsed().as_millis()
|
||||||
);
|
);
|
||||||
|
|
||||||
Reference in New Issue
Block a user