Add server client
This commit is contained in:
15
crates/pile-client/Cargo.toml
Normal file
15
crates/pile-client/Cargo.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "pile-client"
|
||||
version = { workspace = true }
|
||||
rust-version = { workspace = true }
|
||||
edition = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
reqwest = { version = "0.12", features = ["json", "stream"] }
|
||||
futures-core = "0.3"
|
||||
serde = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
230
crates/pile-client/src/lib.rs
Normal file
230
crates/pile-client/src/lib.rs
Normal file
@@ -0,0 +1,230 @@
|
||||
use bytes::Bytes;
|
||||
use futures_core::Stream;
|
||||
use reqwest::{Client, StatusCode, header};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::pin::Pin;
|
||||
use thiserror::Error;
|
||||
|
||||
//
|
||||
// MARK: Error
|
||||
//
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ClientError {
|
||||
#[error("invalid bearer token")]
|
||||
InvalidToken,
|
||||
|
||||
#[error("HTTP {status}: {body}")]
|
||||
Http { status: StatusCode, body: String },
|
||||
|
||||
#[error(transparent)]
|
||||
Reqwest(#[from] reqwest::Error),
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: Response types
|
||||
//
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct DatasetInfo {
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct LookupRequest {
|
||||
pub query: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub limit: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct LookupResult {
|
||||
pub score: f32,
|
||||
pub source: String,
|
||||
pub key: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct LookupResponse {
|
||||
pub results: Vec<LookupResult>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ItemRef {
|
||||
pub source: String,
|
||||
pub key: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ItemsResponse {
|
||||
pub items: Vec<ItemRef>,
|
||||
pub total: usize,
|
||||
pub offset: usize,
|
||||
pub limit: usize,
|
||||
}
|
||||
|
||||
/// Raw field response: the content-type and body bytes as returned by the server.
|
||||
pub struct FieldResponse {
|
||||
pub content_type: String,
|
||||
pub data: Bytes,
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: PileClient
|
||||
//
|
||||
|
||||
/// A client for a pile server. Use [`PileClient::dataset`] to get a dataset-scoped client.
|
||||
pub struct PileClient {
|
||||
base_url: String,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl PileClient {
|
||||
pub fn new(base_url: impl Into<String>, token: Option<&str>) -> Result<Self, ClientError> {
|
||||
let mut headers = header::HeaderMap::new();
|
||||
|
||||
if let Some(token) = token {
|
||||
let value = header::HeaderValue::from_str(&format!("Bearer {token}"))
|
||||
.map_err(|_| ClientError::InvalidToken)?;
|
||||
headers.insert(header::AUTHORIZATION, value);
|
||||
}
|
||||
|
||||
let client = Client::builder()
|
||||
.default_headers(headers)
|
||||
.build()
|
||||
.map_err(ClientError::Reqwest)?;
|
||||
|
||||
Ok(Self {
|
||||
base_url: base_url.into(),
|
||||
client,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns a client scoped to a specific dataset (i.e. `/{name}/...`).
|
||||
pub fn dataset(&self, name: &str) -> DatasetClient {
|
||||
DatasetClient {
|
||||
base_url: format!("{}/{name}", self.base_url),
|
||||
client: self.client.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// `GET /datasets` — list all datasets served by this server.
|
||||
pub async fn list_datasets(&self) -> Result<Vec<DatasetInfo>, ClientError> {
|
||||
let resp = self
|
||||
.client
|
||||
.get(format!("{}/datasets", self.base_url))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
check_status(resp).await?.json().await.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: DatasetClient
|
||||
//
|
||||
|
||||
/// A client scoped to a single dataset on the server.
|
||||
pub struct DatasetClient {
|
||||
base_url: String,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl DatasetClient {
|
||||
/// `POST /lookup` — full-text search within this dataset.
|
||||
pub async fn lookup(
|
||||
&self,
|
||||
query: impl Into<String>,
|
||||
limit: Option<usize>,
|
||||
) -> Result<LookupResponse, ClientError> {
|
||||
let body = LookupRequest {
|
||||
query: query.into(),
|
||||
limit,
|
||||
};
|
||||
|
||||
let resp = self
|
||||
.client
|
||||
.post(format!("{}/lookup", self.base_url))
|
||||
.json(&body)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
check_status(resp).await?.json().await.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// `GET /item` — stream the raw bytes of an item.
|
||||
///
|
||||
/// The returned stream yields chunks as they arrive from the server.
|
||||
pub async fn get_item(
|
||||
&self,
|
||||
source: &str,
|
||||
key: &str,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>, ClientError> {
|
||||
let resp = self
|
||||
.client
|
||||
.get(format!("{}/item", self.base_url))
|
||||
.query(&[("source", source), ("key", key)])
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
Ok(Box::pin(check_status(resp).await?.bytes_stream()))
|
||||
}
|
||||
|
||||
/// `GET /field` — extract a field from an item by object path (e.g. `$.flac.title`).
|
||||
pub async fn get_field(
|
||||
&self,
|
||||
source: &str,
|
||||
key: &str,
|
||||
path: &str,
|
||||
) -> Result<FieldResponse, ClientError> {
|
||||
let resp = self
|
||||
.client
|
||||
.get(format!("{}/field", self.base_url))
|
||||
.query(&[("source", source), ("key", key), ("path", path)])
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let resp = check_status(resp).await?;
|
||||
|
||||
let content_type = resp
|
||||
.headers()
|
||||
.get(header::CONTENT_TYPE)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("application/octet-stream")
|
||||
.to_owned();
|
||||
|
||||
let data = resp.bytes().await?;
|
||||
|
||||
Ok(FieldResponse { content_type, data })
|
||||
}
|
||||
|
||||
/// `GET /items` — paginate over all items in this dataset, ordered by (source, key).
|
||||
pub async fn list_items(
|
||||
&self,
|
||||
offset: usize,
|
||||
limit: usize,
|
||||
) -> Result<ItemsResponse, ClientError> {
|
||||
let resp = self
|
||||
.client
|
||||
.get(format!("{}/items", self.base_url))
|
||||
.query(&[("offset", offset), ("limit", limit)])
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
check_status(resp).await?.json().await.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: helpers
|
||||
//
|
||||
|
||||
async fn check_status(resp: reqwest::Response) -> Result<reqwest::Response, ClientError> {
|
||||
let status = resp.status();
|
||||
if status.is_success() {
|
||||
return Ok(resp);
|
||||
}
|
||||
|
||||
let body = resp.text().await.unwrap_or_default();
|
||||
Err(ClientError::Http { status, body })
|
||||
}
|
||||
@@ -61,6 +61,13 @@ impl Dataset {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn iter_page(&self, offset: usize, limit: usize) -> Box<dyn Iterator<Item = &Item> + Send + '_> {
|
||||
match self {
|
||||
Self::Dir(ds) => Box::new(ds.iter_page(offset, limit)),
|
||||
Self::S3(ds) => Box::new(ds.iter_page(offset, limit)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
match self {
|
||||
Self::Dir(ds) => ds.latest_change().await,
|
||||
|
||||
104
crates/pile-dataset/src/serve/items.rs
Normal file
104
crates/pile-dataset/src/serve/items.rs
Normal file
@@ -0,0 +1,104 @@
|
||||
use axum::{
|
||||
Json,
|
||||
extract::{Query, State},
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use tracing::debug;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Deserialize, ToSchema)]
|
||||
pub struct ItemsQuery {
|
||||
#[serde(default)]
|
||||
offset: usize,
|
||||
#[serde(default = "default_limit")]
|
||||
limit: usize,
|
||||
}
|
||||
|
||||
/// Page size used when a request carries no `limit` parameter.
fn default_limit() -> usize {
    const DEFAULT_PAGE_SIZE: usize = 100;
    DEFAULT_PAGE_SIZE
}
|
||||
|
||||
#[derive(Serialize, ToSchema)]
|
||||
pub struct ItemsResponse {
|
||||
pub items: Vec<ItemRef>,
|
||||
pub total: usize,
|
||||
pub offset: usize,
|
||||
pub limit: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize, ToSchema)]
|
||||
pub struct ItemRef {
|
||||
pub source: String,
|
||||
pub key: String,
|
||||
}
|
||||
|
||||
/// List all items across all sources with consistent ordering, paginated by offset and limit
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/items",
|
||||
params(
|
||||
("offset" = usize, Query, description = "Number of items to skip"),
|
||||
("limit" = usize, Query, description = "Maximum number of items to return (max 1000)"),
|
||||
),
|
||||
responses(
|
||||
(status = 200, description = "Paginated list of items", body = ItemsResponse),
|
||||
)
|
||||
)]
|
||||
pub async fn items_list(
|
||||
State(state): State<Arc<Datasets>>,
|
||||
Query(params): Query<ItemsQuery>,
|
||||
) -> Response {
|
||||
let limit = params.limit.min(1000);
|
||||
let offset = params.offset;
|
||||
|
||||
debug!(message = "Serving /items", offset, limit);
|
||||
|
||||
// Sort sources by label for a consistent global order: (source, key)
|
||||
let mut source_labels: Vec<_> = state.sources.keys().collect();
|
||||
source_labels.sort();
|
||||
|
||||
let mut items: Vec<ItemRef> = Vec::with_capacity(limit);
|
||||
let mut total = 0usize;
|
||||
let mut remaining_offset = offset;
|
||||
|
||||
for label in source_labels {
|
||||
let dataset = &state.sources[label];
|
||||
let source_len = dataset.len();
|
||||
|
||||
if remaining_offset >= source_len {
|
||||
// This entire source is before our window; skip it efficiently
|
||||
remaining_offset -= source_len;
|
||||
total += source_len;
|
||||
continue;
|
||||
}
|
||||
|
||||
let want = (limit - items.len()).min(source_len - remaining_offset);
|
||||
let source_str = label.as_str().to_owned();
|
||||
for item in dataset.iter_page(remaining_offset, want) {
|
||||
items.push(ItemRef {
|
||||
source: source_str.clone(),
|
||||
key: item.key().to_string(),
|
||||
});
|
||||
}
|
||||
remaining_offset = 0;
|
||||
total += source_len;
|
||||
}
|
||||
|
||||
debug!(message = "Served /items", offset, limit, total);
|
||||
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(ItemsResponse {
|
||||
items,
|
||||
total,
|
||||
offset,
|
||||
limit,
|
||||
}),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
@@ -17,11 +17,14 @@ pub use item::*;
|
||||
mod field;
|
||||
pub use field::*;
|
||||
|
||||
mod items;
|
||||
pub use items::*;
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
tags(),
|
||||
paths(lookup, item_get, get_field),
|
||||
components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery))
|
||||
paths(lookup, item_get, get_field, items_list),
|
||||
components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery, ItemsQuery, ItemsResponse, ItemRef))
|
||||
)]
|
||||
pub(crate) struct Api;
|
||||
|
||||
@@ -37,6 +40,7 @@ impl Datasets {
|
||||
.route("/lookup", post(lookup))
|
||||
.route("/item", get(item_get))
|
||||
.route("/field", get(get_field))
|
||||
.route("/items", get(items_list))
|
||||
.with_state(self.clone());
|
||||
|
||||
if let Some(prefix) = prefix {
|
||||
|
||||
@@ -5,7 +5,7 @@ use pile_config::{
|
||||
};
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
path::PathBuf,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
@@ -22,7 +22,7 @@ pub struct DirDataSource {
|
||||
pub name: Label,
|
||||
pub dir: PathBuf,
|
||||
pub pattern: GroupPattern,
|
||||
pub index: OnceLock<HashMap<SmartString<LazyCompact>, Item>>,
|
||||
pub index: OnceLock<BTreeMap<SmartString<LazyCompact>, Item>>,
|
||||
}
|
||||
|
||||
impl DirDataSource {
|
||||
@@ -73,7 +73,7 @@ impl DirDataSource {
|
||||
// MARK: resolve groups
|
||||
//
|
||||
|
||||
let mut index = HashMap::new();
|
||||
let mut index = BTreeMap::new();
|
||||
'entry: for path in paths_items.difference(&paths_grouped_items) {
|
||||
let path_str = match path.to_str() {
|
||||
Some(x) => x,
|
||||
|
||||
@@ -17,9 +17,18 @@ pub trait DataSource {
|
||||
key: &str,
|
||||
) -> impl Future<Output = Result<Option<crate::value::Item>, std::io::Error>> + Send;
|
||||
|
||||
/// Iterate over all items in this source in sorted key order
|
||||
fn iter(&self) -> impl Iterator<Item = &crate::value::Item>;
|
||||
|
||||
/// Iterate over a page of items, sorted by key
|
||||
fn iter_page(
|
||||
&self,
|
||||
offset: usize,
|
||||
limit: usize,
|
||||
) -> impl Iterator<Item = &crate::value::Item> {
|
||||
self.iter().skip(offset).take(limit)
|
||||
}
|
||||
|
||||
/// Return the time of the latest change to the data in this source
|
||||
fn latest_change(
|
||||
&self,
|
||||
|
||||
@@ -6,7 +6,7 @@ use pile_config::{
|
||||
use pile_io::S3Client;
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
@@ -24,7 +24,7 @@ pub struct S3DataSource {
|
||||
pub prefix: Option<SmartString<LazyCompact>>,
|
||||
pub pattern: GroupPattern,
|
||||
pub encryption_key: Option<[u8; 32]>,
|
||||
pub index: OnceLock<HashMap<SmartString<LazyCompact>, Item>>,
|
||||
pub index: OnceLock<BTreeMap<SmartString<LazyCompact>, Item>>,
|
||||
}
|
||||
|
||||
impl S3DataSource {
|
||||
@@ -119,7 +119,7 @@ impl S3DataSource {
|
||||
}
|
||||
}
|
||||
|
||||
let mut index = HashMap::new();
|
||||
let mut index = BTreeMap::new();
|
||||
for key in all_keys.difference(&keys_grouped) {
|
||||
let groups = resolve_groups(&source.pattern, key).await;
|
||||
let group = groups
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
use anyhow::{Context, Result};
|
||||
use axum::{
|
||||
Json, Router,
|
||||
extract::State,
|
||||
extract::{Request, State},
|
||||
http::StatusCode,
|
||||
middleware::{Next, from_fn_with_state},
|
||||
response::{IntoResponse, Response},
|
||||
routing::get,
|
||||
};
|
||||
@@ -30,6 +31,10 @@ pub struct ServerCommand {
|
||||
/// If provided, do not serve docs
|
||||
#[arg(long)]
|
||||
no_docs: bool,
|
||||
|
||||
/// If provided, require this bearer token for all requests
|
||||
#[arg(long)]
|
||||
token: Option<String>,
|
||||
}
|
||||
|
||||
impl CliCmd for ServerCommand {
|
||||
@@ -50,6 +55,8 @@ impl CliCmd for ServerCommand {
|
||||
Arc::new(datasets)
|
||||
};
|
||||
|
||||
let bearer = BearerToken(self.token.map(Arc::new));
|
||||
|
||||
let mut router = Router::new();
|
||||
for d in datasets.iter() {
|
||||
let prefix = format!("/{}", d.config.dataset.name);
|
||||
@@ -70,6 +77,8 @@ impl CliCmd for ServerCommand {
|
||||
router = router.merge(docs);
|
||||
}
|
||||
|
||||
router = router.layer(from_fn_with_state(bearer, bearer_auth_middleware));
|
||||
|
||||
let app = router.into_make_service_with_connect_info::<std::net::SocketAddr>();
|
||||
|
||||
let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {
|
||||
@@ -114,6 +123,36 @@ impl CliCmd for ServerCommand {
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: bearer auth middleware
|
||||
//
|
||||
|
||||
/// State handed to [`bearer_auth_middleware`]: the token every request must present,
/// or `None` when requests are let through without authentication.
#[derive(Clone)]
struct BearerToken(Option<Arc<String>>);
|
||||
|
||||
async fn bearer_auth_middleware(
|
||||
State(BearerToken(expected)): State<BearerToken>,
|
||||
request: Request,
|
||||
next: Next,
|
||||
) -> Response {
|
||||
let Some(expected) = expected else {
|
||||
return next.run(request).await;
|
||||
};
|
||||
|
||||
let authorized = request
|
||||
.headers()
|
||||
.get(axum::http::header::AUTHORIZATION)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|v| v.strip_prefix("Bearer "))
|
||||
.is_some_and(|token| token == expected.as_str());
|
||||
|
||||
if authorized {
|
||||
next.run(request).await
|
||||
} else {
|
||||
StatusCode::UNAUTHORIZED.into_response()
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: routes
|
||||
//
|
||||
|
||||
Reference in New Issue
Block a user