diff --git a/Cargo.lock b/Cargo.lock index b508c27..ed884a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2391,8 +2391,6 @@ dependencies = [ "tracing", "tracing-indicatif", "tracing-subscriber", - "utoipa", - "utoipa-swagger-ui", ] [[package]] @@ -2411,6 +2409,7 @@ version = "0.0.1" dependencies = [ "async-trait", "aws-sdk-s3", + "axum", "blake3", "chrono", "epub", @@ -2423,6 +2422,7 @@ dependencies = [ "pile-config", "pile-flac", "pile-toolbox", + "serde", "serde_json", "smartstring", "tantivy", @@ -2431,6 +2431,8 @@ dependencies = [ "tokio-stream", "toml", "tracing", + "utoipa", + "utoipa-swagger-ui", "walkdir", ] diff --git a/crates/pile-dataset/Cargo.toml b/crates/pile-dataset/Cargo.toml index 8e55456..2888064 100644 --- a/crates/pile-dataset/Cargo.toml +++ b/crates/pile-dataset/Cargo.toml @@ -32,3 +32,12 @@ async-trait = { workspace = true } aws-sdk-s3 = { workspace = true } mime = { workspace = true } mime_guess = { workspace = true } +serde = { workspace = true } + +axum = { workspace = true, optional = true } +utoipa = { workspace = true, optional = true } +utoipa-swagger-ui = { workspace = true, optional = true } + +[features] +default = [] +axum = ["dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui"] diff --git a/crates/pile-dataset/src/lib.rs b/crates/pile-dataset/src/lib.rs index 01fae03..a7295a5 100644 --- a/crates/pile-dataset/src/lib.rs +++ b/crates/pile-dataset/src/lib.rs @@ -16,3 +16,6 @@ pub use value::*; pub mod extract; pub mod index; pub mod source; + +#[cfg(feature = "axum")] +pub mod serve; diff --git a/crates/pile-dataset/src/serve/field.rs b/crates/pile-dataset/src/serve/field.rs new file mode 100644 index 0000000..4c755f4 --- /dev/null +++ b/crates/pile-dataset/src/serve/field.rs @@ -0,0 +1,90 @@ +use axum::{ + Json, + extract::{Query, State}, + http::{StatusCode, header}, + response::{IntoResponse, Response}, +}; +use pile_config::{Label, objectpath::ObjectPath}; +use serde::Deserialize; +use std::sync::Arc; +use tracing::debug; +use utoipa::ToSchema; + +use crate::{Datasets, PileValue, extract::MetaExtractor}; + +#[derive(Deserialize, ToSchema)] +pub struct FieldQuery { + source: String, + key: String, + path: String, +} + +/// Extract a specific field from an item's metadata +#[utoipa::path( + get, + path = "/field", + params( + ("source" = String, Query, description = "Source label"), + ("key" = String, Query, description = "Item key"), + ("path" = String, Query, description = "Object path (e.g. $.flac.title)"), + ), + responses( + (status = 200, description = "Field value as JSON"), + (status = 400, description = "Invalid source label or path"), + (status = 404, description = "Item or field not found"), + (status = 500, description = "Internal server error"), + ) +)] +pub async fn get_field( + State(state): State>, + Query(params): Query, +) -> Response { + debug!( + message = "Serving /field", + source = params.source, + key = params.key, + path = params.path, + ); + + let label = match Label::try_from(params.source) { + Ok(l) => l, + Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), + }; + + let path: ObjectPath = match params.path.parse() { + Ok(p) => p, + Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), + }; + + let Some(item) = state.get(&label, ¶ms.key).await else { + return StatusCode::NOT_FOUND.into_response(); + }; + + let extractor = MetaExtractor::new(&item); + let root: PileValue<'_> = PileValue::Extractor(Arc::new(extractor)); + + let value = match root.query(&path).await { + Ok(Some(v)) => v, + Ok(None) => return StatusCode::NOT_FOUND.into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + }; + + match value { + PileValue::String(s) => ( + StatusCode::OK, + [(header::CONTENT_TYPE, "text/plain")], + s.to_string(), + ) + .into_response(), + PileValue::Blob { mime, bytes } => ( + StatusCode::OK, + [(header::CONTENT_TYPE, mime.to_string())], + bytes.as_ref().clone(), + ) + .into_response(), + _ => match value.to_json().await { + Ok(json) => (StatusCode::OK, Json(json)).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + }, + } +} diff --git a/crates/pile-dataset/src/serve/item.rs b/crates/pile-dataset/src/serve/item.rs new file mode 100644 index 0000000..d0b4598 --- /dev/null +++ b/crates/pile-dataset/src/serve/item.rs @@ -0,0 +1,65 @@ +use axum::{ + extract::{Query, State}, + http::{StatusCode, header}, + response::{IntoResponse, Response}, +}; +use pile_config::Label; +use serde::Deserialize; +use std::sync::Arc; +use tracing::debug; +use utoipa::ToSchema; + +use crate::{AsyncReader, Datasets}; + +#[derive(Deserialize, ToSchema)] +pub struct ItemQuery { + source: String, + key: String, +} + +/// Fetch the raw bytes of an item by source and key +#[utoipa::path( + get, + path = "/item", + params( + ("source" = String, Query, description = "Source label"), + ("key" = String, Query, description = "Item key"), + ), + responses( + (status = 200, description = "Raw item bytes"), + (status = 400, description = "Invalid source label"), + (status = 404, description = "Item not found"), + (status = 500, description = "Internal server error"), + ) +)] +pub async fn item_get( + State(state): State>, + Query(params): Query, +) -> Response { + debug!( + message = "Serving /item", + source = params.source, + key = params.key + ); + + let label = match Label::try_from(params.source) { + Ok(l) => l, + Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), + }; + + let Some(item) = state.get(&label, ¶ms.key).await else { + return StatusCode::NOT_FOUND.into_response(); + }; + + let mime = item.mime().to_string(); + + let mut reader = match item.read().await { + Ok(r) => r, + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + }; + + match reader.read_to_end().await { + Ok(bytes) => (StatusCode::OK, [(header::CONTENT_TYPE, mime)], bytes).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + } +} diff --git a/crates/pile-dataset/src/serve/lookup.rs b/crates/pile-dataset/src/serve/lookup.rs new file mode 100644 index 0000000..d6840c5 --- /dev/null +++ b/crates/pile-dataset/src/serve/lookup.rs @@ -0,0 +1,72 @@ +use axum::{ + Json, + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tracing::debug; +use utoipa::ToSchema; + +use crate::Datasets; + +#[derive(Serialize, Deserialize, ToSchema, Debug)] +pub struct LookupRequest { + pub query: String, + + #[serde(default)] + pub limit: Option, +} + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct LookupResponse { + pub results: Vec, +} + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct LookupResult { + pub score: f32, + pub source: String, + pub key: String, +} + +/// Search for an item in this dataset +#[utoipa::path( + post, + path = "/lookup", + responses( + (status = 200, description = "Search results", body = Vec), + (status = 400, description = "Invalid request"), + (status = 401, description = "Unauthorized"), + (status = 404, description = "URL not found"), + (status = 500, description = "Internal server error"), + ) +)] +pub async fn lookup( + State(state): State>, + Json(body): Json, +) -> Response { + debug!( + message = "Serving /lookup", + query = body.query, + limit = body.limit.unwrap_or(10) + ); + + let results: Vec = match state.fts_lookup(&body.query, body.limit.unwrap_or(10)) { + Ok(x) => x + .into_iter() + .map(|x| LookupResult { + key: x.key, + score: x.score, + source: x.source.into(), + }) + .collect(), + + Err(error) => { + return (StatusCode::INTERNAL_SERVER_ERROR, format!("{error:?}")).into_response(); + } + }; + + return (StatusCode::OK, Json(LookupResponse { results })).into_response(); +} diff --git a/crates/pile-dataset/src/serve/mod.rs b/crates/pile-dataset/src/serve/mod.rs new file mode 100644 index 0000000..fd0a476 --- /dev/null +++ b/crates/pile-dataset/src/serve/mod.rs @@ -0,0 +1,47 @@ +use axum::{ + Router, + routing::{get, post}, +}; +use std::sync::Arc; +use utoipa::OpenApi; +use utoipa_swagger_ui::SwaggerUi; + +use crate::Datasets; + +mod lookup; +pub use lookup::*; + +mod item; +pub use item::*; + +mod field; +pub use field::*; + +#[derive(OpenApi)] +#[openapi( + tags(), + paths(lookup, item_get, get_field), + components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery)) +)] +pub(crate) struct Api; + +impl Datasets { + #[inline] + pub fn router(self: Arc, with_docs: bool) -> Router<()> { + let mut router = Router::new() + .route("/lookup", post(lookup)) + .route("/item", get(item_get)) + .route("/field", get(get_field)) + .with_state(self.clone()); + + if with_docs { + let docs_path = "/docs"; + let docs = SwaggerUi::new(docs_path) + .url(format!("{}/openapi.json", docs_path), Api::openapi()); + + router = router.merge(docs); + } + + router + } +} diff --git a/crates/pile/Cargo.toml b/crates/pile/Cargo.toml index b0ef0d5..93687fd 100644 --- a/crates/pile/Cargo.toml +++ b/crates/pile/Cargo.toml @@ -9,7 +9,7 @@ workspace = true [dependencies] pile-toolbox = { workspace = true } -pile-dataset = { workspace = true } +pile-dataset = { workspace = true, features = ["axum"] } pile-config = { workspace = true } tracing = { workspace = true } @@ -25,7 +25,4 @@ tracing-indicatif = { workspace = true } anstyle = { workspace = true } toml = { workspace = true } serde_json = { workspace = true } - axum = { workspace = true } -utoipa = { workspace = true } -utoipa-swagger-ui = { workspace = true } diff --git a/crates/pile/src/command/mod.rs b/crates/pile/src/command/mod.rs index 5c4bb26..7043857 100644 --- a/crates/pile/src/command/mod.rs +++ b/crates/pile/src/command/mod.rs @@ -59,7 +59,7 @@ pub enum SubCommand { /// Expose a dataset via an http api Serve { #[command(flatten)] - cmd: serve::cli::ServeCommand, + cmd: serve::ServeCommand, }, } diff --git a/crates/pile/src/command/serve/cli.rs b/crates/pile/src/command/serve.rs similarity index 93% rename from crates/pile/src/command/serve/cli.rs rename to crates/pile/src/command/serve.rs index d1f954b..b076b31 100644 --- a/crates/pile/src/command/serve/cli.rs +++ b/crates/pile/src/command/serve.rs @@ -5,11 +5,7 @@ use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use std::{fmt::Debug, path::PathBuf, sync::Arc}; use tracing::{error, info}; -use crate::{CliCmd, GlobalContext, command::serve::api}; - -pub(super) struct ServeState { - pub ds: Datasets, -} +use crate::{CliCmd, GlobalContext}; #[derive(Debug, Args)] pub struct ServeCommand { @@ -53,7 +49,8 @@ impl CliCmd for ServeCommand { })?; } - let app = api::router(Arc::new(ServeState { ds })) + let app = Arc::new(ds) + .router(true) .into_make_service_with_connect_info::(); let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await { diff --git a/crates/pile/src/command/serve/api.rs b/crates/pile/src/command/serve/api.rs deleted file mode 100644 index 6b6ff90..0000000 --- a/crates/pile/src/command/serve/api.rs +++ /dev/null @@ -1,238 +0,0 @@ -use axum::{ - Json, Router, - extract::{DefaultBodyLimit, Query, State}, - http::{StatusCode, header}, - response::{IntoResponse, Response}, - routing::{get, post}, -}; -use pile_config::{Label, objectpath::ObjectPath}; -use pile_dataset::{AsyncReader, PileValue, extract::MetaExtractor}; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use tracing::debug; -use utoipa::{OpenApi, ToSchema}; -use utoipa_swagger_ui::SwaggerUi; - -use crate::command::serve::cli::ServeState; - -#[derive(OpenApi)] -#[openapi( - tags(), - paths(lookup, item_get, field_get), - components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery)) -)] -pub(super) struct Api; - -#[inline] -pub(super) fn router(state: Arc) -> Router<()> { - let docs_path = "/docs"; - let docs = SwaggerUi::new(docs_path).url(format!("{}/openapi.json", docs_path), Api::openapi()); - - Router::new() - .route("/lookup", post(lookup)) - .route("/item", get(item_get)) - .route("/field", get(field_get)) - .merge(docs) - .with_state(state) - .layer(DefaultBodyLimit::max(32 * 1024 * 1024)) -} - -// -// MARK: lookup -// - -#[derive(Serialize, Deserialize, ToSchema, Debug)] -pub struct LookupRequest { - pub query: String, - - #[serde(default)] - pub limit: Option, -} - -#[derive(Debug, Serialize, Deserialize, ToSchema)] -struct LookupResponse { - pub results: Vec, -} - -#[derive(Debug, Serialize, Deserialize, ToSchema)] -pub struct LookupResult { - pub score: f32, - pub source: String, - pub key: String, -} - -/// Search a user's captures -#[utoipa::path( - post, - path = "/lookup", - responses( - (status = 200, description = "Search results", body = Vec), - (status = 400, description = "Invalid request"), - (status = 401, description = "Unauthorized"), - (status = 404, description = "URL not found"), - (status = 500, description = "Internal server error"), - ) -)] -async fn lookup(State(state): State>, Json(body): Json) -> Response { - debug!( - message = "Serving /lookup", - query = body.query, - limit = body.limit.unwrap_or(10) - ); - - let results: Vec = - match state.ds.fts_lookup(&body.query, body.limit.unwrap_or(10)) { - Ok(x) => x - .into_iter() - .map(|x| LookupResult { - key: x.key, - score: x.score, - source: x.source.into(), - }) - .collect(), - - Err(error) => { - return (StatusCode::INTERNAL_SERVER_ERROR, format!("{error:?}")).into_response(); - } - }; - - return (StatusCode::OK, Json(LookupResponse { results })).into_response(); -} - -// -// MARK: item -// - -#[derive(Deserialize, ToSchema)] -struct ItemQuery { - source: String, - key: String, -} - -/// Fetch the raw bytes of an item by source and key -#[utoipa::path( - get, - path = "/item", - params( - ("source" = String, Query, description = "Source label"), - ("key" = String, Query, description = "Item key"), - ), - responses( - (status = 200, description = "Raw item bytes"), - (status = 400, description = "Invalid source label"), - (status = 404, description = "Item not found"), - (status = 500, description = "Internal server error"), - ) -)] -async fn item_get( - State(state): State>, - Query(params): Query, -) -> Response { - debug!( - message = "Serving /item", - source = params.source, - key = params.key - ); - - let label = match Label::try_from(params.source) { - Ok(l) => l, - Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), - }; - - let Some(item) = state.ds.get(&label, ¶ms.key).await else { - return StatusCode::NOT_FOUND.into_response(); - }; - - let mime = item.mime().to_string(); - - let mut reader = match item.read().await { - Ok(r) => r, - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), - }; - - match reader.read_to_end().await { - Ok(bytes) => (StatusCode::OK, [(header::CONTENT_TYPE, mime)], bytes).into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), - } -} - -// -// MARK: field -// - -#[derive(Deserialize, ToSchema)] -struct FieldQuery { - source: String, - key: String, - path: String, -} - -/// Extract a specific field from an item's metadata -#[utoipa::path( - get, - path = "/field", - params( - ("source" = String, Query, description = "Source label"), - ("key" = String, Query, description = "Item key"), - ("path" = String, Query, description = "Object path (e.g. $.flac.title)"), - ), - responses( - (status = 200, description = "Field value as JSON"), - (status = 400, description = "Invalid source label or path"), - (status = 404, description = "Item or field not found"), - (status = 500, description = "Internal server error"), - ) -)] -async fn field_get( - State(state): State>, - Query(params): Query, -) -> Response { - debug!( - message = "Serving /field", - source = params.source, - key = params.key, - path = params.path, - ); - - let label = match Label::try_from(params.source) { - Ok(l) => l, - Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), - }; - - let path: ObjectPath = match params.path.parse() { - Ok(p) => p, - Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), - }; - - let Some(item) = state.ds.get(&label, ¶ms.key).await else { - return StatusCode::NOT_FOUND.into_response(); - }; - - let extractor = MetaExtractor::new(&item); - let root: PileValue<'_> = PileValue::Extractor(Arc::new(extractor)); - - let value = match root.query(&path).await { - Ok(Some(v)) => v, - Ok(None) => return StatusCode::NOT_FOUND.into_response(), - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), - }; - - match value { - PileValue::String(s) => ( - StatusCode::OK, - [(header::CONTENT_TYPE, "text/plain")], - s.to_string(), - ) - .into_response(), - PileValue::Blob { mime, bytes } => ( - StatusCode::OK, - [(header::CONTENT_TYPE, mime.to_string())], - bytes.as_ref().clone(), - ) - .into_response(), - _ => match value.to_json().await { - Ok(json) => (StatusCode::OK, Json(json)).into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), - }, - } -} diff --git a/crates/pile/src/command/serve/mod.rs b/crates/pile/src/command/serve/mod.rs deleted file mode 100644 index 95f6506..0000000 --- a/crates/pile/src/command/serve/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod api; -pub mod cli; diff --git a/crates/pile/src/main.rs b/crates/pile/src/main.rs index 30515fc..cfedace 100644 --- a/crates/pile/src/main.rs +++ b/crates/pile/src/main.rs @@ -1,10 +1,9 @@ -use std::process::ExitCode; - use anyhow::{Context, Result}; use clap::Parser; use config::LoggingPreset; use indicatif::MultiProgress; use pile_toolbox::cancelabletask::CancelableTaskResult; +use std::process::ExitCode; use tracing::{error, warn}; use tracing_indicatif::{IndicatifWriter, writer::Stderr}; use tracing_subscriber::fmt::MakeWriter;