diff --git a/Cargo.lock b/Cargo.lock index 6999763..dca7ba4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2580,6 +2580,8 @@ dependencies = [ "tracing", "tracing-indicatif", "tracing-subscriber", + "utoipa", + "utoipa-swagger-ui", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 855ca0a..1ee617b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ pile-io = { path = "crates/pile-io" } # Clients & servers tantivy = "0.25.0" +servable = { version = "0.0.7", features = ["image"] } axum = { version = "0.8.8", features = ["macros", "multipart"] } utoipa = { version = "5.4.0", features = [ "axum_extras", diff --git a/crates/pile-dataset/src/serve/lookup.rs b/crates/pile-dataset/src/serve/lookup.rs index 3dd8d8d..005cfbe 100644 --- a/crates/pile-dataset/src/serve/lookup.rs +++ b/crates/pile-dataset/src/serve/lookup.rs @@ -48,13 +48,10 @@ pub async fn lookup( Json(body): Json, ) -> Response { let start = Instant::now(); - debug!( - message = "Serving /lookup", - query = body.query, - limit = body.limit.unwrap_or(10) - ); + let limit = body.limit.unwrap_or(128).min(1024); + debug!(message = "Serving /lookup", query = body.query, limit); - let results: Vec = match state.fts_lookup(&body.query, body.limit.unwrap_or(10)) { + let results: Vec = match state.fts_lookup(&body.query, limit) { Ok(x) => x .into_iter() .map(|x| LookupResult { diff --git a/crates/pile-dataset/src/serve/mod.rs b/crates/pile-dataset/src/serve/mod.rs index fd0a476..03d1159 100644 --- a/crates/pile-dataset/src/serve/mod.rs +++ b/crates/pile-dataset/src/serve/mod.rs @@ -28,20 +28,32 @@ pub(crate) struct Api; impl Datasets { #[inline] pub fn router(self: Arc, with_docs: bool) -> Router<()> { + self.router_prefix(with_docs, None) + } + + #[inline] + pub fn router_prefix(self: Arc, with_docs: bool, prefix: Option<&str>) -> Router<()> { let mut router = Router::new() .route("/lookup", post(lookup)) .route("/item", get(item_get)) .route("/field", get(get_field)) .with_state(self.clone()); + if let Some(prefix) = prefix { + router = Router::new().nest(prefix, router); + } + if with_docs { - let docs_path = "/docs"; - let docs = SwaggerUi::new(docs_path) + let docs_path = match prefix { + None => "/docs".into(), + Some(prefix) => format!("{prefix}/docs"), + }; + + let docs = SwaggerUi::new(docs_path.clone()) .url(format!("{}/openapi.json", docs_path), Api::openapi()); router = router.merge(docs); } - router } } diff --git a/crates/pile/Cargo.toml b/crates/pile/Cargo.toml index 855aab5..f7c3105 100644 --- a/crates/pile/Cargo.toml +++ b/crates/pile/Cargo.toml @@ -29,3 +29,5 @@ anstyle = { workspace = true } toml = { workspace = true } serde_json = { workspace = true } axum = { workspace = true } +utoipa = { workspace = true } +utoipa-swagger-ui = { workspace = true } diff --git a/crates/pile/src/command/mod.rs b/crates/pile/src/command/mod.rs index 6731244..3412b15 100644 --- a/crates/pile/src/command/mod.rs +++ b/crates/pile/src/command/mod.rs @@ -14,6 +14,7 @@ mod list; mod lookup; mod probe; mod serve; +mod server; mod upload; use crate::{Cli, GlobalContext}; @@ -73,12 +74,18 @@ pub enum SubCommand { cmd: item::ItemCommand, }, - /// Expose a dataset via an http api + /// Expose one dataset via a simple http api Serve { #[command(flatten)] cmd: serve::ServeCommand, }, + /// Serve many datasets under an authenticated http api + Server { + #[command(flatten)] + cmd: server::ServerCommand, + }, + /// Upload a filesystem source to an S3 source Upload { #[command(flatten)] @@ -110,6 +117,7 @@ impl CliCmdDispatch for SubCommand { Self::Probe { cmd } => cmd.start(ctx), Self::Item { cmd } => cmd.start(ctx), Self::Serve { cmd } => cmd.start(ctx), + Self::Server { cmd } => cmd.start(ctx), Self::Upload { cmd } => cmd.start(ctx), Self::Encrypt { cmd } => cmd.start(ctx), Self::Decrypt { cmd } => cmd.start(ctx), diff --git a/crates/pile/src/command/server.rs b/crates/pile/src/command/server.rs new file mode 100644 index 0000000..47437bf --- /dev/null +++ b/crates/pile/src/command/server.rs @@ -0,0 +1,152 @@ +use anyhow::{Context, Result}; +use axum::{ + Json, Router, + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, + routing::get, +}; +use clap::Args; +use pile_dataset::Datasets; +use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; +use serde::Serialize; +use std::{fmt::Debug, path::PathBuf, sync::Arc}; +use tracing::{error, info}; +use utoipa::{OpenApi, ToSchema}; +use utoipa_swagger_ui::SwaggerUi; + +use crate::{CliCmd, GlobalContext}; + +#[derive(Debug, Args)] +pub struct ServerCommand { + /// Address to bind to + #[arg(default_value = "0.0.0.0:9000")] + addr: String, + + /// The datasets we should serve. Can be repeated. + #[arg(long, short = 'c')] + config: Vec, + + /// If provided, do not serve docs + #[arg(long)] + no_docs: bool, +} + +impl CliCmd for ServerCommand { + async fn run( + self, + _ctx: GlobalContext, + flag: CancelFlag, + ) -> Result> { + let datasets = { + let mut datasets = Vec::new(); + for c in &self.config { + let ds = Datasets::open(&c) + .await + .with_context(|| format!("while opening dataset for {}", c.display()))?; + datasets.push(Arc::new(ds)); + } + + Arc::new(datasets) + }; + + let mut router = Router::new(); + for d in datasets.iter() { + let prefix = format!("/{}", d.config.dataset.name); + router = router.merge(d.clone().router_prefix(!self.no_docs, Some(&prefix))) + } + + router = router.merge( + Router::new() + .route("/datasets", get(list_datasets)) + .with_state(datasets.clone()), + ); + + if !self.no_docs { + let docs_path = "/docs"; + let docs = SwaggerUi::new(docs_path) + .url(format!("{}/openapi.json", docs_path), Api::openapi()); + + router = router.merge(docs); + } + + let app = router.into_make_service_with_connect_info::(); + + let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await { + Ok(x) => x, + Err(error) => { + match error.kind() { + std::io::ErrorKind::AddrInUse => { + error!( + message = "Cannot bind to address, already in use", + addr = self.addr + ); + } + _ => { + error!(message = "Error while starting server", ?error); + } + } + + std::process::exit(1); + } + }; + + match listener.local_addr() { + Ok(x) => info!("listening on http://{x}"), + Err(error) => { + error!(message = "Could not determine local address", ?error); + return Err(anyhow::Error::from(error).into()); + } + } + + match axum::serve(listener, app) + .with_graceful_shutdown(async move { flag.await_cancel().await }) + .await + { + Ok(_) => {} + Err(error) => { + error!(message = "Error while serving api", ?error); + return Err(anyhow::Error::from(error).into()); + } + } + + return Err(CancelableTaskError::Cancelled); + } +} + +// +// MARK: routes +// + +#[derive(OpenApi)] +#[openapi( + tags(), + paths(list_datasets), + components(schemas(ListDatasetsResponse)) +)] +pub(crate) struct Api; + +#[derive(Serialize, ToSchema)] +pub struct ListDatasetsResponse { + name: String, +} + +/// List all datasets served by this server +#[utoipa::path( + get, + path = "/list_datasets", + responses( + (status = 200, description = "List of datasets"), + (status = 500, description = "Internal server error"), + ) +)] +pub async fn list_datasets(State(state): State>>>) -> Response { + let datasets = state + .iter() + .map(|x| ListDatasetsResponse { + name: x.config.dataset.name.clone().into(), + }) + .collect::>(); + + return (StatusCode::OK, Json(datasets)).into_response(); +}