Split server into crate
Some checks failed
CI / Typos (push) Failing after 17s
CI / Build and test (push) Successful in 1m41s
CI / Clippy (push) Has been cancelled
CI / Build and test (all features) (push) Has been cancelled
Docker / build-and-push (push) Has been cancelled

This commit is contained in:
2026-04-15 09:23:52 -07:00
parent 251d130987
commit 3bc66ddc48
18 changed files with 161 additions and 137 deletions

View File

@@ -0,0 +1,28 @@
[package]
name = "pile-serve"
version = { workspace = true }
rust-version = { workspace = true }
edition = { workspace = true }
[lints]
workspace = true
[dependencies]
pile-config = { workspace = true }
pile-value = { workspace = true }
pile-dataset = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
tokio-util = { version = "0.7", features = ["io"] }
serde = { workspace = true }
axum = { workspace = true }
percent-encoding = { workspace = true }
utoipa = { workspace = true }
utoipa-swagger-ui = { workspace = true }
[features]
default = []
pdfium = ["pile-value/pdfium"]

View File

@@ -0,0 +1,30 @@
use axum::{
Json,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
};
use pile_dataset::Datasets;
use std::{collections::HashMap, sync::Arc};
pub use pile_config::FieldSpec;
pub type FieldsResponse = HashMap<String, FieldSpec>;
/// Retrieve this dataset's schema.
#[utoipa::path(
get,
path = "/config/schema",
responses(
(status = 200, description = "This dataset's schema"),
)
)]
pub async fn config_schema(State(state): State<Arc<Datasets>>) -> Response {
let fields: FieldsResponse = state
.config
.schema
.iter()
.map(|(k, v)| (k.as_str().to_owned(), v.clone()))
.collect();
(StatusCode::OK, Json(fields)).into_response()
}

View File

@@ -0,0 +1,190 @@
use axum::{
Json,
body::Body,
extract::{Query, RawQuery, State},
http::{StatusCode, header},
response::{IntoResponse, Response},
};
use percent_encoding::percent_decode_str;
use pile_config::{Label, objectpath::ObjectPath};
use pile_dataset::Datasets;
use pile_value::{
extract::traits::ExtractState,
value::{BinaryPileValue, PileValue},
};
use serde::Deserialize;
use std::{sync::Arc, time::Instant};
use tokio_util::io::ReaderStream;
use tracing::debug;
use utoipa::ToSchema;
#[derive(Deserialize, ToSchema)]
pub struct ExtractQuery {
source: String,
key: String,
#[serde(default)]
download: bool,
name: Option<String>,
}
/// Extract a specific field from an item's metadata.
/// Multiple `path` parameters may be provided; the first non-null result is returned.
#[utoipa::path(
get,
path = "/extract",
params(
("source" = String, Query, description = "Source label"),
("key" = String, Query, description = "Item key"),
("path" = String, Query, description = "Object path (e.g. $.flac.title); repeat for fallbacks"),
("name" = Option<String>, Query, description = "Downloaded filename; defaults to the last segment of the key"),
),
responses(
(status = 200, description = "Field value as JSON"),
(status = 400, description = "Invalid source label or path"),
(status = 404, description = "Item or field not found"),
(status = 500, description = "Internal server error"),
)
)]
pub async fn get_extract(
State(state): State<Arc<Datasets>>,
Query(params): Query<ExtractQuery>,
RawQuery(raw_query): RawQuery,
) -> Response {
let start = Instant::now();
let label = match Label::try_from(params.source.clone()) {
Ok(l) => l,
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
};
// Collect all `path` query params in order (supports repeated ?path=...&path=...)
let raw = raw_query.as_deref().unwrap_or("");
let paths: Vec<ObjectPath> = {
let mut result = Vec::new();
for part in raw.split('&') {
if let Some((k, v)) = part.split_once('=')
&& k == "path"
{
let v = percent_decode_str(v).decode_utf8_lossy();
match v.parse::<ObjectPath>() {
Ok(p) => result.push(p),
Err(e) => {
return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response();
}
}
}
}
result
};
if paths.is_empty() {
return (StatusCode::BAD_REQUEST, "Missing `path` query parameter").into_response();
}
debug!(
message = "Serving /extract",
source = params.source,
key = params.key,
paths = paths.len(),
);
let Some(item) = state.get(&label, &params.key).await else {
return StatusCode::NOT_FOUND.into_response();
};
let extract_state = ExtractState { ignore_mime: false };
let item = PileValue::Item(item);
// Try each path in order, returning the first non-null result
let mut value = None;
for path in &paths {
match item.query(&extract_state, path).await {
Ok(None) => continue,
Ok(Some(PileValue::Null)) => {
value = Some(PileValue::Null);
continue;
}
Ok(Some(v)) => {
value = Some(v);
break;
}
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
}
}
let Some(value) = value else {
return (StatusCode::BAD_REQUEST, "no value").into_response();
};
debug!(
message = "Served /extract",
source = params.source,
key = params.key,
time_ms = start.elapsed().as_millis()
);
let disposition_type = if params.download {
"attachment"
} else {
"inline"
};
let file_name = params.name.unwrap_or_else(|| {
params
.key
.rsplit('/')
.next()
.unwrap_or(&params.key)
.to_owned()
});
let disposition = format!("{disposition_type}; filename=\"{file_name}\"");
match value {
PileValue::String(s) => (
StatusCode::OK,
[
(header::CONTENT_TYPE, "text/plain".to_owned()),
(header::CONTENT_DISPOSITION, disposition),
],
s.to_string(),
)
.into_response(),
PileValue::Binary(binary) => {
let mime = binary.mime().to_string();
let body = match binary {
BinaryPileValue::Blob { bytes, .. } => Body::from(bytes.0.to_vec()),
BinaryPileValue::File { path, .. } => match tokio::fs::File::open(&path).await {
Ok(file) => Body::from_stream(ReaderStream::new(file)),
Err(e) => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}"))
.into_response();
}
},
};
(
StatusCode::OK,
[
(header::CONTENT_TYPE, mime),
(header::CONTENT_DISPOSITION, disposition),
],
body,
)
.into_response()
}
_ => match value.to_json(&extract_state).await {
Ok(json) => (
StatusCode::OK,
[(header::CONTENT_DISPOSITION, disposition)],
Json(json),
)
.into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
},
}
}

View File

@@ -0,0 +1,103 @@
use axum::{
Json,
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use pile_dataset::Datasets;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::debug;
use utoipa::ToSchema;
#[derive(Deserialize, ToSchema)]
pub struct ItemsQuery {
#[serde(default)]
offset: usize,
#[serde(default = "default_limit")]
limit: usize,
}
fn default_limit() -> usize {
100
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ItemsResponse {
pub items: Vec<ItemRef>,
pub total: usize,
pub offset: usize,
pub limit: usize,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ItemRef {
pub source: String,
pub key: String,
}
/// List all items across all sources with consistent ordering, paginated by offset and limit
#[utoipa::path(
get,
path = "/items",
params(
("offset" = usize, Query, description = "Number of items to skip"),
("limit" = usize, Query, description = "Maximum number of items to return (max 1000)"),
),
responses(
(status = 200, description = "Paginated list of items", body = ItemsResponse),
)
)]
pub async fn items_list(
State(state): State<Arc<Datasets>>,
Query(params): Query<ItemsQuery>,
) -> Response {
let limit = params.limit.min(1000);
let offset = params.offset;
debug!(message = "Serving /items", offset, limit);
// Sort sources by label for a consistent global order: (source, key)
let mut source_labels: Vec<_> = state.sources.keys().collect();
source_labels.sort();
let mut items: Vec<ItemRef> = Vec::with_capacity(limit);
let mut total = 0usize;
let mut remaining_offset = offset;
for label in source_labels {
let dataset = &state.sources[label];
let source_len = dataset.len();
if remaining_offset >= source_len {
// This entire source is before our window; skip it efficiently
remaining_offset -= source_len;
total += source_len;
continue;
}
let want = (limit - items.len()).min(source_len - remaining_offset);
let source_str = label.as_str().to_owned();
for item in dataset.iter_page(remaining_offset, want) {
items.push(ItemRef {
source: source_str.clone(),
key: item.key().to_string(),
});
}
remaining_offset = 0;
total += source_len;
}
debug!(message = "Served /items", offset, limit, total);
(
StatusCode::OK,
Json(ItemsResponse {
items,
total,
offset,
limit,
}),
)
.into_response()
}

View File

@@ -0,0 +1,89 @@
use axum::{
Router,
routing::{get, post},
};
use pile_dataset::Datasets;
use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
mod lookup;
pub use lookup::*;
mod extract;
pub use extract::*;
mod items;
pub use items::*;
mod config_schema;
pub use config_schema::*;
mod schema_field;
pub use schema_field::*;
mod schema;
pub use schema::*;
#[derive(OpenApi)]
#[openapi(
tags(),
paths(
lookup,
get_extract,
items_list,
config_schema,
schema_field,
schema_all
),
components(schemas(
LookupRequest,
LookupResponse,
LookupResult,
ExtractQuery,
ItemsQuery,
ItemsResponse,
ItemRef
))
)]
pub(crate) struct Api;
#[inline]
pub fn router(ds: Arc<Datasets>, with_docs: bool) -> Router<()> {
router_prefix(ds, with_docs, None)
}
#[inline]
pub fn router_prefix(ds: Arc<Datasets>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
let mut router = Router::new()
.route("/lookup", post(lookup))
.route("/extract", get(get_extract))
.route("/items", get(items_list))
.route("/config/schema", get(config_schema))
.route("/schema", get(schema_all))
.route("/schema/{field}", get(schema_field))
.with_state(ds.clone());
if let Some(prefix) = prefix {
router = Router::new().nest(prefix, router);
}
if with_docs {
let docs_path = match prefix {
None => "/docs".into(),
Some(prefix) => format!("{prefix}/docs"),
};
let api = Api::openapi();
let api = match prefix {
None => api,
Some(prefix) => utoipa::openapi::OpenApi::default().nest(prefix, api),
};
let docs =
SwaggerUi::new(docs_path.clone()).url(format!("{}/openapi.json", docs_path), api);
router = router.merge(docs);
}
router
}

View File

@@ -0,0 +1,79 @@
use axum::{
Json,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
};
use pile_dataset::Datasets;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Instant};
use tracing::debug;
use utoipa::ToSchema;
#[derive(Serialize, Deserialize, ToSchema, Debug)]
pub struct LookupRequest {
pub query: String,
#[serde(default)]
pub limit: Option<usize>,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct LookupResponse {
pub results: Vec<LookupResult>,
pub total: u64,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct LookupResult {
pub score: f32,
pub source: String,
pub key: String,
}
/// Search for an item in this dataset
#[utoipa::path(
post,
path = "/lookup",
responses(
(status = 200, description = "Search results", body = Vec<LookupResponse>),
(status = 400, description = "Invalid request"),
(status = 401, description = "Unauthorized"),
(status = 404, description = "URL not found"),
(status = 500, description = "Internal server error"),
)
)]
pub async fn lookup(
State(state): State<Arc<Datasets>>,
Json(body): Json<LookupRequest>,
) -> Response {
let start = Instant::now();
let limit = body.limit.unwrap_or(128).min(1024);
debug!(message = "Serving /lookup", query = body.query, limit);
let results: Vec<LookupResult> = match state.fts_lookup(&body.query, limit) {
Ok(x) => x
.into_iter()
.map(|x| LookupResult {
key: x.key,
score: x.score,
source: x.source.into(),
})
.collect(),
Err(error) => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{error:?}")).into_response();
}
};
let total: u64 = state.sources.iter().map(|x| x.1.len() as u64).sum();
debug!(
message = "Served /lookup",
query = body.query,
limit = body.limit.unwrap_or(10),
time_ms = start.elapsed().as_millis()
);
return (StatusCode::OK, Json(LookupResponse { results, total })).into_response();
}

View File

@@ -0,0 +1,129 @@
use axum::{
Json,
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use pile_config::Label;
use pile_dataset::Datasets;
use pile_value::{extract::traits::ExtractState, value::PileValue};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use utoipa::IntoParams;
#[derive(Deserialize, IntoParams)]
pub struct SchemaQuery {
source: String,
key: String,
#[serde(default)]
hidden: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ApiValue {
Binary { binary: bool, mime: String },
Object { object: bool },
Array(Vec<ApiValue>),
String(String),
Number(serde_json::Number),
Null,
}
pub type SchemaResponse = HashMap<String, ApiValue>;
async fn pile_value_to_api(
state: &ExtractState,
value: PileValue,
) -> Result<ApiValue, std::io::Error> {
match value {
PileValue::String(s) => Ok(ApiValue::String(s.to_string())),
PileValue::U64(n) => Ok(ApiValue::Number(n.into())),
PileValue::I64(n) => Ok(ApiValue::Number(n.into())),
PileValue::Null => Ok(ApiValue::Null),
PileValue::Binary(x) => Ok(ApiValue::Binary {
binary: true,
mime: x.mime().to_string(),
}),
PileValue::Array(arr) => {
let mut out = Vec::with_capacity(arr.len());
for item in arr.iter() {
out.push(Box::pin(pile_value_to_api(state, item.clone())).await?);
}
Ok(ApiValue::Array(out))
}
PileValue::ObjectExtractor(_) | PileValue::ListExtractor(_) | PileValue::Item(_) => {
Ok(ApiValue::Object { object: true })
}
}
}
/// Get all schema field values for a single item.
#[utoipa::path(
get,
path = "/schema",
params(
("source" = String, Query, description = "Source label"),
("key" = String, Query, description = "Item key"),
("hidden" = bool, Query, description = "Include hidden fields (default: false)"),
),
responses(
(status = 200, description = "Schema field values as a map of label to value"),
(status = 400, description = "Invalid source label"),
(status = 404, description = "Item not found"),
(status = 500, description = "Internal server error"),
)
)]
pub async fn schema_all(
State(state): State<Arc<Datasets>>,
Query(params): Query<SchemaQuery>,
) -> Response {
let label = match Label::try_from(params.source.clone()) {
Ok(l) => l,
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
};
let Some(item) = state.get(&label, &params.key).await else {
return StatusCode::NOT_FOUND.into_response();
};
let extract_state = ExtractState { ignore_mime: false };
let item = PileValue::Item(item);
let mut result: SchemaResponse = HashMap::new();
for (field_label, field_spec) in &state.config.schema {
if field_spec.hidden && !params.hidden {
continue;
}
let mut value = None;
for path in &field_spec.path {
match item.query(&extract_state, path).await {
Ok(Some(PileValue::Null)) | Ok(None) => continue,
Ok(Some(v)) => {
value = Some(v);
break;
}
Err(e) => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response();
}
}
}
let Some(v) = value else { continue };
let api_value = match pile_value_to_api(&extract_state, v).await {
Ok(v) => v,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
};
result.insert(field_label.as_str().to_owned(), api_value);
}
(StatusCode::OK, Json(result)).into_response()
}

View File

@@ -0,0 +1,173 @@
use axum::{
Json,
body::Body,
extract::{Path, Query, State},
http::{StatusCode, header},
response::{IntoResponse, Response},
};
use pile_config::Label;
use pile_dataset::Datasets;
use pile_value::{
extract::traits::ExtractState,
value::{BinaryPileValue, PileValue},
};
use serde::Deserialize;
use std::{sync::Arc, time::Instant};
use tokio_util::io::ReaderStream;
use tracing::debug;
use utoipa::IntoParams;
#[derive(Deserialize, IntoParams)]
pub struct SchemaFieldQuery {
source: String,
key: String,
#[serde(default)]
download: bool,
name: Option<String>,
}
/// Extract a specific schema field from an item's metadata.
#[utoipa::path(
get,
path = "/schema/{field}",
params(
("field" = String, Path, description = "Schema field"),
("source" = String, Query, description = "Source label"),
("key" = String, Query, description = "Item key"),
("name" = Option<String>, Query, description = "Downloaded filename; defaults to the last segment of the key"),
),
responses(
(status = 200, description = "Field value as JSON"),
(status = 400, description = "Invalid source label or path"),
(status = 404, description = "Item or field not found"),
(status = 500, description = "Internal server error"),
)
)]
pub async fn schema_field(
State(state): State<Arc<Datasets>>,
Path(field): Path<String>,
Query(params): Query<SchemaFieldQuery>,
) -> Response {
let start = Instant::now();
let label = match Label::try_from(params.source.clone()) {
Ok(l) => l,
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
};
debug!(
message = "Serving /schema/{field}",
source = params.source,
key = params.key,
field = field,
);
let Some(item) = state.get(&label, &params.key).await else {
return StatusCode::NOT_FOUND.into_response();
};
let field_label = match Label::new(&field) {
Some(x) => x,
None => return StatusCode::NOT_FOUND.into_response(),
};
let paths = match state.config.schema.get(&field_label) {
Some(x) => &x.path,
None => return StatusCode::NOT_FOUND.into_response(),
};
let extract_state = ExtractState { ignore_mime: false };
let item = PileValue::Item(item);
let mut value = None;
for path in paths {
match item.query(&extract_state, path).await {
Ok(None) => continue,
Ok(Some(PileValue::Null)) => {
value = Some(PileValue::Null);
continue;
}
Ok(Some(v)) => {
value = Some(v);
break;
}
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
}
}
let Some(value) = value else {
return (StatusCode::BAD_REQUEST, "no value").into_response();
};
debug!(
message = "Served /schema/{field}",
source = params.source,
key = params.key,
field = field,
time_ms = start.elapsed().as_millis()
);
let disposition_type = if params.download {
"attachment"
} else {
"inline"
};
let file_name = params.name.unwrap_or_else(|| {
params
.key
.rsplit('/')
.next()
.unwrap_or(&params.key)
.to_owned()
});
let disposition = format!("{disposition_type}; filename=\"{file_name}\"");
match value {
PileValue::String(s) => (
StatusCode::OK,
[
(header::CONTENT_TYPE, "text/plain".to_owned()),
(header::CONTENT_DISPOSITION, disposition),
],
s.to_string(),
)
.into_response(),
PileValue::Binary(binary) => {
let mime = binary.mime().to_string();
let body = match binary {
BinaryPileValue::Blob { bytes, .. } => Body::from(bytes.0.to_vec()),
BinaryPileValue::File { path, .. } => match tokio::fs::File::open(&path).await {
Ok(file) => Body::from_stream(ReaderStream::new(file)),
Err(e) => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}"))
.into_response();
}
},
};
(
StatusCode::OK,
[
(header::CONTENT_TYPE, mime),
(header::CONTENT_DISPOSITION, disposition),
],
body,
)
.into_response()
}
_ => match value.to_json(&extract_state).await {
Ok(json) => (
StatusCode::OK,
[(header::CONTENT_DISPOSITION, disposition)],
Json(json),
)
.into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
},
}
}