Move router to pile-dataset

This commit is contained in:
2026-03-10 17:43:25 -07:00
parent 614d3273f0
commit 899b47b169
13 changed files with 321 additions and 255 deletions

6
Cargo.lock generated
View File

@@ -2391,8 +2391,6 @@ dependencies = [
"tracing",
"tracing-indicatif",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
]
[[package]]
@@ -2411,6 +2409,7 @@ version = "0.0.1"
dependencies = [
"async-trait",
"aws-sdk-s3",
"axum",
"blake3",
"chrono",
"epub",
@@ -2423,6 +2422,7 @@ dependencies = [
"pile-config",
"pile-flac",
"pile-toolbox",
"serde",
"serde_json",
"smartstring",
"tantivy",
@@ -2431,6 +2431,8 @@ dependencies = [
"tokio-stream",
"toml",
"tracing",
"utoipa",
"utoipa-swagger-ui",
"walkdir",
]

View File

@@ -32,3 +32,12 @@ async-trait = { workspace = true }
aws-sdk-s3 = { workspace = true }
mime = { workspace = true }
mime_guess = { workspace = true }
serde = { workspace = true }
axum = { workspace = true, optional = true }
utoipa = { workspace = true, optional = true }
utoipa-swagger-ui = { workspace = true, optional = true }
[features]
default = []
axum = ["dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui"]

View File

@@ -16,3 +16,6 @@ pub use value::*;
pub mod extract;
pub mod index;
pub mod source;
#[cfg(feature = "axum")]
pub mod serve;

View File

@@ -0,0 +1,99 @@
use axum::{
Json,
extract::{Query, State},
http::{StatusCode, header},
response::{IntoResponse, Response},
};
use pile_config::{Label, objectpath::ObjectPath};
use serde::Deserialize;
use std::{sync::Arc, time::Instant};
use tracing::debug;
use utoipa::ToSchema;
use crate::{Datasets, PileValue, extract::MetaExtractor};
// Query-string parameters accepted by the `/field` handler (`get_field`).
#[derive(Deserialize, ToSchema)]
pub struct FieldQuery {
// Source label as sent by the client; the handler converts it with `Label::try_from`.
source: String,
// Key of the item to look up within that source.
key: String,
// Object path as text; the handler parses it into an `ObjectPath`.
path: String,
}
/// Extract a specific field from an item's metadata
///
/// String values are returned as `text/plain`, blobs are returned as raw
/// bytes with their own MIME type, and every other value is serialized to JSON.
#[utoipa::path(
    get,
    path = "/field",
    params(
        ("source" = String, Query, description = "Source label"),
        ("key" = String, Query, description = "Item key"),
        ("path" = String, Query, description = "Object path (e.g. $.flac.title)"),
    ),
    responses(
        (status = 200, description = "Field value: text/plain for strings, the blob's MIME type for blobs, JSON otherwise"),
        (status = 400, description = "Invalid source label or path"),
        (status = 404, description = "Item or field not found"),
        (status = 500, description = "Internal server error"),
    )
)]
pub async fn get_field(
    State(state): State<Arc<Datasets>>,
    Query(params): Query<FieldQuery>,
) -> Response {
    let start = Instant::now();

    debug!(
        message = "Serving /field",
        source = params.source,
        key = params.key,
        path = params.path,
    );

    // Validate both user-supplied strings before touching the dataset.
    // `source` is cloned because it is logged again once the request is done.
    let label = match Label::try_from(params.source.clone()) {
        Ok(l) => l,
        Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
    };

    let path: ObjectPath = match params.path.parse() {
        Ok(p) => p,
        Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
    };

    let Some(item) = state.get(&label, &params.key).await else {
        return StatusCode::NOT_FOUND.into_response();
    };

    // Query the path against the item's metadata extractor.
    let extractor = MetaExtractor::new(&item);
    let root: PileValue<'_> = PileValue::Extractor(Arc::new(extractor));

    let value = match root.query(&path).await {
        Ok(Some(v)) => v,
        Ok(None) => return StatusCode::NOT_FOUND.into_response(),
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
    };

    // Build the response first so the completion log below covers the
    // (async, fallible) JSON conversion as well.
    let response = match value {
        PileValue::String(s) => (
            StatusCode::OK,
            [(header::CONTENT_TYPE, "text/plain")],
            s.to_string(),
        )
            .into_response(),

        PileValue::Blob { mime, bytes } => (
            StatusCode::OK,
            [(header::CONTENT_TYPE, mime.to_string())],
            bytes.as_ref().clone(),
        )
            .into_response(),

        _ => match value.to_json().await {
            Ok(json) => (StatusCode::OK, Json(json)).into_response(),
            Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
        },
    };

    // `status` distinguishes a successful response from a failed JSON conversion.
    debug!(
        message = "Served /field",
        source = params.source,
        key = params.key,
        path = params.path,
        status = response.status().as_u16(),
        time_ms = start.elapsed().as_millis()
    );

    response
}

View File

@@ -0,0 +1,73 @@
use axum::{
extract::{Query, State},
http::{StatusCode, header},
response::{IntoResponse, Response},
};
use pile_config::Label;
use serde::Deserialize;
use std::{sync::Arc, time::Instant};
use tracing::debug;
use utoipa::ToSchema;
use crate::{AsyncReader, Datasets};
// Query-string parameters accepted by the `/item` handler (`item_get`).
#[derive(Deserialize, ToSchema)]
pub struct ItemQuery {
// Source label as sent by the client; the handler converts it with `Label::try_from`.
source: String,
// Key of the item to fetch within that source.
key: String,
}
/// Fetch the raw bytes of an item by source and key
#[utoipa::path(
    get,
    path = "/item",
    params(
        ("source" = String, Query, description = "Source label"),
        ("key" = String, Query, description = "Item key"),
    ),
    responses(
        (status = 200, description = "Raw item bytes"),
        (status = 400, description = "Invalid source label"),
        (status = 404, description = "Item not found"),
        (status = 500, description = "Internal server error"),
    )
)]
pub async fn item_get(
    State(state): State<Arc<Datasets>>,
    Query(params): Query<ItemQuery>,
) -> Response {
    let start = Instant::now();

    debug!(
        message = "Serving /item",
        source = params.source,
        key = params.key
    );

    // `source` is cloned because it is logged again once the request is done.
    let label = match Label::try_from(params.source.clone()) {
        Ok(l) => l,
        Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
    };

    let Some(item) = state.get(&label, &params.key).await else {
        return StatusCode::NOT_FOUND.into_response();
    };

    // Capture the MIME type before opening the reader; it becomes the Content-Type.
    let mime = item.mime().to_string();

    let mut reader = match item.read().await {
        Ok(r) => r,
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
    };

    // Read the whole item into memory, then log. Logging after the read means
    // `time_ms` includes the read itself, and a failed read is reported with
    // its 500 status instead of being logged as served.
    let response = match reader.read_to_end().await {
        Ok(bytes) => (StatusCode::OK, [(header::CONTENT_TYPE, mime)], bytes).into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
    };

    debug!(
        message = "Served /item",
        source = params.source,
        key = params.key,
        status = response.status().as_u16(),
        time_ms = start.elapsed().as_millis()
    );

    response
}

View File

@@ -0,0 +1,80 @@
use axum::{
Json,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Instant};
use tracing::debug;
use utoipa::ToSchema;
use crate::Datasets;
// JSON request body accepted by the `/lookup` handler.
#[derive(Serialize, Deserialize, ToSchema, Debug)]
pub struct LookupRequest {
// Query string passed to `Datasets::fts_lookup`.
pub query: String,
// Maximum number of results; the handler falls back to 10 when this is absent.
#[serde(default)]
pub limit: Option<usize>,
}
// JSON response body returned by the `/lookup` handler on success.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct LookupResponse {
// Search hits, in the order `fts_lookup` returned them.
pub results: Vec<LookupResult>,
}
// One search hit inside a `LookupResponse`.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct LookupResult {
// Score copied from the search hit.
pub score: f32,
// Source of the hit, converted to a string by the handler.
pub source: String,
// Key of the matching item.
pub key: String,
}
/// Search for an item in this dataset
#[utoipa::path(
    post,
    path = "/lookup",
    responses(
        (status = 200, description = "Search results", body = LookupResponse),
        (status = 400, description = "Invalid request"),
        (status = 401, description = "Unauthorized"),
        (status = 404, description = "URL not found"),
        (status = 500, description = "Internal server error"),
    )
)]
pub async fn lookup(
    State(state): State<Arc<Datasets>>,
    Json(body): Json<LookupRequest>,
) -> Response {
    // Number of results returned when the request does not specify `limit`.
    const DEFAULT_LIMIT: usize = 10;

    let start = Instant::now();
    let limit = body.limit.unwrap_or(DEFAULT_LIMIT);

    debug!(message = "Serving /lookup", query = body.query, limit);

    // Run the full-text search; any failure is reported to the client as a 500.
    let hits = match state.fts_lookup(&body.query, limit) {
        Ok(hits) => hits,
        Err(error) => {
            return (StatusCode::INTERNAL_SERVER_ERROR, format!("{error:?}")).into_response();
        }
    };

    // Convert hits into the wire format.
    let results: Vec<LookupResult> = hits
        .into_iter()
        .map(|hit| LookupResult {
            key: hit.key,
            score: hit.score,
            source: hit.source.into(),
        })
        .collect();

    debug!(
        message = "Served /lookup",
        query = body.query,
        limit,
        time_ms = start.elapsed().as_millis()
    );

    // The body is a single `LookupResponse` object (not a list of them),
    // which is what the OpenAPI annotation above declares.
    (StatusCode::OK, Json(LookupResponse { results })).into_response()
}

View File

@@ -0,0 +1,47 @@
use axum::{
Router,
routing::{get, post},
};
use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::Datasets;
mod lookup;
pub use lookup::*;
mod item;
pub use item::*;
mod field;
pub use field::*;
// OpenAPI document definition for the dataset HTTP API: it lists the three
// handler paths and the request/response schemas they use.
// `Datasets::router` serves it through Swagger UI when docs are enabled.
#[derive(OpenApi)]
#[openapi(
tags(),
paths(lookup, item_get, get_field),
components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery))
)]
pub(crate) struct Api;
impl Datasets {
    /// Build the HTTP API router for this set of datasets.
    ///
    /// Registers `POST /lookup`, `GET /item` and `GET /field`, with `self`
    /// as the shared handler state. When `with_docs` is true, Swagger UI is
    /// additionally served at `/docs`, with the OpenAPI document at
    /// `/docs/openapi.json`.
    #[inline]
    pub fn router(self: Arc<Self>, with_docs: bool) -> Router<()> {
        // `self` is already an owned `Arc`, so it is moved into the router
        // state directly rather than cloned.
        //
        // NOTE(review): the CLI router this replaces wrapped its routes in
        // `DefaultBodyLimit::max(32 * 1024 * 1024)`; no body-limit layer is
        // applied here. Confirm that falling back to axum's default is intended.
        let router = Router::new()
            .route("/lookup", post(lookup))
            .route("/item", get(item_get))
            .route("/field", get(get_field))
            .with_state(self);

        if with_docs {
            let docs_path = "/docs";
            let docs =
                SwaggerUi::new(docs_path).url(format!("{docs_path}/openapi.json"), Api::openapi());
            router.merge(docs)
        } else {
            router
        }
    }
}

View File

@@ -9,7 +9,7 @@ workspace = true
[dependencies]
pile-toolbox = { workspace = true }
pile-dataset = { workspace = true }
pile-dataset = { workspace = true, features = ["axum"] }
pile-config = { workspace = true }
tracing = { workspace = true }
@@ -25,7 +25,4 @@ tracing-indicatif = { workspace = true }
anstyle = { workspace = true }
toml = { workspace = true }
serde_json = { workspace = true }
axum = { workspace = true }
utoipa = { workspace = true }
utoipa-swagger-ui = { workspace = true }

View File

@@ -59,7 +59,7 @@ pub enum SubCommand {
/// Expose a dataset via an http api
Serve {
#[command(flatten)]
cmd: serve::cli::ServeCommand,
cmd: serve::ServeCommand,
},
}

View File

@@ -5,11 +5,7 @@ use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use std::{fmt::Debug, path::PathBuf, sync::Arc};
use tracing::{error, info};
use crate::{CliCmd, GlobalContext, command::serve::api};
pub(super) struct ServeState {
pub ds: Datasets,
}
use crate::{CliCmd, GlobalContext};
#[derive(Debug, Args)]
pub struct ServeCommand {
@@ -53,7 +49,8 @@ impl CliCmd for ServeCommand {
})?;
}
let app = api::router(Arc::new(ServeState { ds }))
let app = Arc::new(ds)
.router(true)
.into_make_service_with_connect_info::<std::net::SocketAddr>();
let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {

View File

@@ -1,238 +0,0 @@
use axum::{
Json, Router,
extract::{DefaultBodyLimit, Query, State},
http::{StatusCode, header},
response::{IntoResponse, Response},
routing::{get, post},
};
use pile_config::{Label, objectpath::ObjectPath};
use pile_dataset::{AsyncReader, PileValue, extract::MetaExtractor};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::debug;
use utoipa::{OpenApi, ToSchema};
use utoipa_swagger_ui::SwaggerUi;
use crate::command::serve::cli::ServeState;
// OpenAPI document definition for the serve command's HTTP API: it lists the
// three handler paths and the request/response schemas they use.
#[derive(OpenApi)]
#[openapi(
tags(),
paths(lookup, item_get, field_get),
components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery))
)]
pub(super) struct Api;
/// Assemble the serve command's router: the three API routes plus Swagger UI
/// under `/docs`, sharing `state`, with a 32 MiB request body limit.
#[inline]
pub(super) fn router(state: Arc<ServeState>) -> Router<()> {
    const DOCS_PATH: &str = "/docs";

    // Swagger UI, with the generated OpenAPI document served next to it.
    let spec_url = format!("{}/openapi.json", DOCS_PATH);
    let swagger = SwaggerUi::new(DOCS_PATH).url(spec_url, Api::openapi());

    let routes = Router::new()
        .route("/lookup", post(lookup))
        .route("/item", get(item_get))
        .route("/field", get(field_get));

    routes
        .merge(swagger)
        .with_state(state)
        .layer(DefaultBodyLimit::max(32 * 1024 * 1024))
}
//
// MARK: lookup
//
// JSON request body accepted by the `/lookup` handler.
#[derive(Serialize, Deserialize, ToSchema, Debug)]
pub struct LookupRequest {
// Query string passed to `fts_lookup`.
pub query: String,
// Maximum number of results; the handler falls back to 10 when this is absent.
#[serde(default)]
pub limit: Option<usize>,
}
// JSON response body returned by the `/lookup` handler on success.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
struct LookupResponse {
// Search hits, in the order `fts_lookup` returned them.
pub results: Vec<LookupResult>,
}
// One search hit inside a `LookupResponse`.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct LookupResult {
// Score copied from the search hit.
pub score: f32,
// Source of the hit, converted to a string by the handler.
pub source: String,
// Key of the matching item.
pub key: String,
}
/// Search for an item in this dataset
#[utoipa::path(
    post,
    path = "/lookup",
    responses(
        (status = 200, description = "Search results", body = LookupResponse),
        (status = 400, description = "Invalid request"),
        (status = 401, description = "Unauthorized"),
        (status = 404, description = "URL not found"),
        (status = 500, description = "Internal server error"),
    )
)]
async fn lookup(State(state): State<Arc<ServeState>>, Json(body): Json<LookupRequest>) -> Response {
    // Number of results returned when the request does not specify `limit`.
    const DEFAULT_LIMIT: usize = 10;

    let limit = body.limit.unwrap_or(DEFAULT_LIMIT);

    debug!(message = "Serving /lookup", query = body.query, limit);

    // Run the full-text search; any failure is reported to the client as a 500.
    let hits = match state.ds.fts_lookup(&body.query, limit) {
        Ok(hits) => hits,
        Err(error) => {
            return (StatusCode::INTERNAL_SERVER_ERROR, format!("{error:?}")).into_response();
        }
    };

    // Convert hits into the wire format.
    let results: Vec<LookupResult> = hits
        .into_iter()
        .map(|hit| LookupResult {
            key: hit.key,
            score: hit.score,
            source: hit.source.into(),
        })
        .collect();

    // The body is a single `LookupResponse` object (not a list of them),
    // which is what the OpenAPI annotation above declares.
    (StatusCode::OK, Json(LookupResponse { results })).into_response()
}
//
// MARK: item
//
// Query-string parameters accepted by the `/item` handler (`item_get`).
#[derive(Deserialize, ToSchema)]
struct ItemQuery {
// Source label as sent by the client; the handler converts it with `Label::try_from`.
source: String,
// Key of the item to fetch within that source.
key: String,
}
/// Fetch the raw bytes of an item by source and key
#[utoipa::path(
get,
path = "/item",
params(
("source" = String, Query, description = "Source label"),
("key" = String, Query, description = "Item key"),
),
responses(
(status = 200, description = "Raw item bytes"),
(status = 400, description = "Invalid source label"),
(status = 404, description = "Item not found"),
(status = 500, description = "Internal server error"),
)
)]
async fn item_get(
State(state): State<Arc<ServeState>>,
Query(params): Query<ItemQuery>,
) -> Response {
debug!(
message = "Serving /item",
source = params.source,
key = params.key
);
let label = match Label::try_from(params.source) {
Ok(l) => l,
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
};
let Some(item) = state.ds.get(&label, &params.key).await else {
return StatusCode::NOT_FOUND.into_response();
};
let mime = item.mime().to_string();
let mut reader = match item.read().await {
Ok(r) => r,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
};
match reader.read_to_end().await {
Ok(bytes) => (StatusCode::OK, [(header::CONTENT_TYPE, mime)], bytes).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
}
}
//
// MARK: field
//
// Query-string parameters accepted by the `/field` handler (`field_get`).
#[derive(Deserialize, ToSchema)]
struct FieldQuery {
// Source label as sent by the client; the handler converts it with `Label::try_from`.
source: String,
// Key of the item to look up within that source.
key: String,
// Object path as text; the handler parses it into an `ObjectPath`.
path: String,
}
/// Extract a specific field from an item's metadata
#[utoipa::path(
get,
path = "/field",
params(
("source" = String, Query, description = "Source label"),
("key" = String, Query, description = "Item key"),
("path" = String, Query, description = "Object path (e.g. $.flac.title)"),
),
responses(
(status = 200, description = "Field value as JSON"),
(status = 400, description = "Invalid source label or path"),
(status = 404, description = "Item or field not found"),
(status = 500, description = "Internal server error"),
)
)]
async fn field_get(
State(state): State<Arc<ServeState>>,
Query(params): Query<FieldQuery>,
) -> Response {
debug!(
message = "Serving /field",
source = params.source,
key = params.key,
path = params.path,
);
let label = match Label::try_from(params.source) {
Ok(l) => l,
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
};
let path: ObjectPath = match params.path.parse() {
Ok(p) => p,
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
};
let Some(item) = state.ds.get(&label, &params.key).await else {
return StatusCode::NOT_FOUND.into_response();
};
let extractor = MetaExtractor::new(&item);
let root: PileValue<'_> = PileValue::Extractor(Arc::new(extractor));
let value = match root.query(&path).await {
Ok(Some(v)) => v,
Ok(None) => return StatusCode::NOT_FOUND.into_response(),
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
};
match value {
PileValue::String(s) => (
StatusCode::OK,
[(header::CONTENT_TYPE, "text/plain")],
s.to_string(),
)
.into_response(),
PileValue::Blob { mime, bytes } => (
StatusCode::OK,
[(header::CONTENT_TYPE, mime.to_string())],
bytes.as_ref().clone(),
)
.into_response(),
_ => match value.to_json().await {
Ok(json) => (StatusCode::OK, Json(json)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
},
}
}

View File

@@ -1,2 +0,0 @@
pub mod api;
pub mod cli;

View File

@@ -1,10 +1,9 @@
use std::process::ExitCode;
use anyhow::{Context, Result};
use clap::Parser;
use config::LoggingPreset;
use indicatif::MultiProgress;
use pile_toolbox::cancelabletask::CancelableTaskResult;
use std::process::ExitCode;
use tracing::{error, warn};
use tracing_indicatif::{IndicatifWriter, writer::Stderr};
use tracing_subscriber::fmt::MakeWriter;