Add server subcommand

This commit is contained in:
2026-03-23 21:43:18 -07:00
parent 76d38d48c5
commit dfcb4b0a24
7 changed files with 184 additions and 10 deletions

2
Cargo.lock generated
View File

@@ -2580,6 +2580,8 @@ dependencies = [
"tracing",
"tracing-indicatif",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
]
[[package]]

View File

@@ -73,6 +73,7 @@ pile-io = { path = "crates/pile-io" }
# Clients & servers
tantivy = "0.25.0"
servable = { version = "0.0.7", features = ["image"] }
axum = { version = "0.8.8", features = ["macros", "multipart"] }
utoipa = { version = "5.4.0", features = [
"axum_extras",

View File

@@ -48,13 +48,10 @@ pub async fn lookup(
Json(body): Json<LookupRequest>,
) -> Response {
let start = Instant::now();
debug!(
message = "Serving /lookup",
query = body.query,
limit = body.limit.unwrap_or(10)
);
let limit = body.limit.unwrap_or(128).min(1024);
debug!(message = "Serving /lookup", query = body.query, limit);
let results: Vec<LookupResult> = match state.fts_lookup(&body.query, body.limit.unwrap_or(10)) {
let results: Vec<LookupResult> = match state.fts_lookup(&body.query, limit) {
Ok(x) => x
.into_iter()
.map(|x| LookupResult {

View File

@@ -28,20 +28,32 @@ pub(crate) struct Api;
impl Datasets {
#[inline]
pub fn router(self: Arc<Self>, with_docs: bool) -> Router<()> {
self.router_prefix(with_docs, None)
}
#[inline]
pub fn router_prefix(self: Arc<Self>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
let mut router = Router::new()
.route("/lookup", post(lookup))
.route("/item", get(item_get))
.route("/field", get(get_field))
.with_state(self.clone());
if let Some(prefix) = prefix {
router = Router::new().nest(prefix, router);
}
if with_docs {
let docs_path = "/docs";
let docs = SwaggerUi::new(docs_path)
let docs_path = match prefix {
None => "/docs".into(),
Some(prefix) => format!("{prefix}/docs"),
};
let docs = SwaggerUi::new(docs_path.clone())
.url(format!("{}/openapi.json", docs_path), Api::openapi());
router = router.merge(docs);
}
router
}
}

View File

@@ -29,3 +29,5 @@ anstyle = { workspace = true }
toml = { workspace = true }
serde_json = { workspace = true }
axum = { workspace = true }
utoipa = { workspace = true }
utoipa-swagger-ui = { workspace = true }

View File

@@ -14,6 +14,7 @@ mod list;
mod lookup;
mod probe;
mod serve;
mod server;
mod upload;
use crate::{Cli, GlobalContext};
@@ -73,12 +74,18 @@ pub enum SubCommand {
cmd: item::ItemCommand,
},
/// Expose a dataset via an http api
/// Expose one dataset via a simple http api
Serve {
#[command(flatten)]
cmd: serve::ServeCommand,
},
/// Serve many datasets under an authenticated http api
Server {
#[command(flatten)]
cmd: server::ServerCommand,
},
/// Upload a filesystem source to an S3 source
Upload {
#[command(flatten)]
@@ -110,6 +117,7 @@ impl CliCmdDispatch for SubCommand {
Self::Probe { cmd } => cmd.start(ctx),
Self::Item { cmd } => cmd.start(ctx),
Self::Serve { cmd } => cmd.start(ctx),
Self::Server { cmd } => cmd.start(ctx),
Self::Upload { cmd } => cmd.start(ctx),
Self::Encrypt { cmd } => cmd.start(ctx),
Self::Decrypt { cmd } => cmd.start(ctx),

View File

@@ -0,0 +1,152 @@
use anyhow::{Context, Result};
use axum::{
Json, Router,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
};
use clap::Args;
use pile_dataset::Datasets;
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use serde::Serialize;
use std::{fmt::Debug, path::PathBuf, sync::Arc};
use tracing::{error, info};
use utoipa::{OpenApi, ToSchema};
use utoipa_swagger_ui::SwaggerUi;
use crate::{CliCmd, GlobalContext};
#[derive(Debug, Args)]
pub struct ServerCommand {
/// Address to bind to
#[arg(default_value = "0.0.0.0:9000")]
addr: String,
/// The datasets we should serve. Can be repeated.
#[arg(long, short = 'c')]
config: Vec<PathBuf>,
/// If provided, do not serve docs
#[arg(long)]
no_docs: bool,
}
impl CliCmd for ServerCommand {
async fn run(
self,
_ctx: GlobalContext,
flag: CancelFlag,
) -> Result<i32, CancelableTaskError<anyhow::Error>> {
let datasets = {
let mut datasets = Vec::new();
for c in &self.config {
let ds = Datasets::open(&c)
.await
.with_context(|| format!("while opening dataset for {}", c.display()))?;
datasets.push(Arc::new(ds));
}
Arc::new(datasets)
};
let mut router = Router::new();
for d in datasets.iter() {
let prefix = format!("/{}", d.config.dataset.name);
router = router.merge(d.clone().router_prefix(!self.no_docs, Some(&prefix)))
}
router = router.merge(
Router::new()
.route("/datasets", get(list_datasets))
.with_state(datasets.clone()),
);
if !self.no_docs {
let docs_path = "/docs";
let docs = SwaggerUi::new(docs_path)
.url(format!("{}/openapi.json", docs_path), Api::openapi());
router = router.merge(docs);
}
let app = router.into_make_service_with_connect_info::<std::net::SocketAddr>();
let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {
Ok(x) => x,
Err(error) => {
match error.kind() {
std::io::ErrorKind::AddrInUse => {
error!(
message = "Cannot bind to address, already in use",
addr = self.addr
);
}
_ => {
error!(message = "Error while starting server", ?error);
}
}
std::process::exit(1);
}
};
match listener.local_addr() {
Ok(x) => info!("listening on http://{x}"),
Err(error) => {
error!(message = "Could not determine local address", ?error);
return Err(anyhow::Error::from(error).into());
}
}
match axum::serve(listener, app)
.with_graceful_shutdown(async move { flag.await_cancel().await })
.await
{
Ok(_) => {}
Err(error) => {
error!(message = "Error while serving api", ?error);
return Err(anyhow::Error::from(error).into());
}
}
return Err(CancelableTaskError::Cancelled);
}
}
//
// MARK: routes
//
#[derive(OpenApi)]
#[openapi(
tags(),
paths(list_datasets),
components(schemas(ListDatasetsResponse))
)]
pub(crate) struct Api;
#[derive(Serialize, ToSchema)]
pub struct ListDatasetsResponse {
name: String,
}
/// List all datasets served by this server
#[utoipa::path(
get,
path = "/list_datasets",
responses(
(status = 200, description = "List of datasets"),
(status = 500, description = "Internal server error"),
)
)]
pub async fn list_datasets(State(state): State<Arc<Vec<Arc<Datasets>>>>) -> Response {
let datasets = state
.iter()
.map(|x| ListDatasetsResponse {
name: x.config.dataset.name.clone().into(),
})
.collect::<Vec<_>>();
return (StatusCode::OK, Json(datasets)).into_response();
}