Proxy router
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2705,11 +2705,13 @@ dependencies = [
|
|||||||
name = "pile-client"
|
name = "pile-client"
|
||||||
version = "0.0.2"
|
version = "0.0.2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"axum",
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"serde",
|
"serde",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|||||||
@@ -13,3 +13,5 @@ futures-core = "0.3"
|
|||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
thiserror = { workspace = true }
|
thiserror = { workspace = true }
|
||||||
bytes = { workspace = true }
|
bytes = { workspace = true }
|
||||||
|
axum = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
|||||||
@@ -1,9 +1,14 @@
|
|||||||
|
use axum::{
|
||||||
|
Router, body::Body as AxumBody, extract::State, response::Response as AxumResponse,
|
||||||
|
routing::any,
|
||||||
|
};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use reqwest::{Client, StatusCode, header};
|
use reqwest::{Client, StatusCode, header};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
use tracing::{trace, warn};
|
||||||
|
|
||||||
//
|
//
|
||||||
// MARK: Error
|
// MARK: Error
|
||||||
@@ -77,6 +82,7 @@ pub struct FieldResponse {
|
|||||||
pub struct PileClient {
|
pub struct PileClient {
|
||||||
base_url: String,
|
base_url: String,
|
||||||
client: Client,
|
client: Client,
|
||||||
|
token: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PileClient {
|
impl PileClient {
|
||||||
@@ -97,6 +103,7 @@ impl PileClient {
|
|||||||
Ok(Self {
|
Ok(Self {
|
||||||
base_url: base_url.into(),
|
base_url: base_url.into(),
|
||||||
client,
|
client,
|
||||||
|
token: token.map(str::to_owned),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -105,6 +112,7 @@ impl PileClient {
|
|||||||
DatasetClient {
|
DatasetClient {
|
||||||
base_url: format!("{}/{name}", self.base_url),
|
base_url: format!("{}/{name}", self.base_url),
|
||||||
client: self.client.clone(),
|
client: self.client.clone(),
|
||||||
|
token: self.token.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -128,6 +136,7 @@ impl PileClient {
|
|||||||
pub struct DatasetClient {
|
pub struct DatasetClient {
|
||||||
base_url: String,
|
base_url: String,
|
||||||
client: Client,
|
client: Client,
|
||||||
|
token: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DatasetClient {
|
impl DatasetClient {
|
||||||
@@ -213,6 +222,106 @@ impl DatasetClient {
|
|||||||
|
|
||||||
check_status(resp).await?.json().await.map_err(Into::into)
|
check_status(resp).await?.json().await.map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns an axum [`Router`] that proxies all requests to this dataset's
|
||||||
|
/// endpoints on the remote pile server, streaming responses without buffering.
|
||||||
|
/// All headers are forwarded; hop-by-hop headers are stripped.
|
||||||
|
pub fn proxy_router(&self) -> Router {
|
||||||
|
let state = ProxyState {
|
||||||
|
base_url: self.base_url.clone(),
|
||||||
|
client: self.client.clone(),
|
||||||
|
token: self.token.clone(),
|
||||||
|
};
|
||||||
|
Router::new()
|
||||||
|
.route("/", any(proxy_handler))
|
||||||
|
.route("/{*path}", any(proxy_handler))
|
||||||
|
.with_state(state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// MARK: Proxy
|
||||||
|
//
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct ProxyState {
|
||||||
|
base_url: String,
|
||||||
|
client: Client,
|
||||||
|
token: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn proxy_handler(
|
||||||
|
State(state): State<ProxyState>,
|
||||||
|
req: axum::extract::Request,
|
||||||
|
) -> AxumResponse {
|
||||||
|
let path = req.uri().path().to_owned();
|
||||||
|
let query_str = req
|
||||||
|
.uri()
|
||||||
|
.query()
|
||||||
|
.map(|q| format!("?{q}"))
|
||||||
|
.unwrap_or_default();
|
||||||
|
let method = req.method().clone();
|
||||||
|
|
||||||
|
let url = format!("{}{}{}", state.base_url, path, query_str);
|
||||||
|
trace!(method = %method, url, "proxying request");
|
||||||
|
let mut req_builder = state.client.request(method, &url);
|
||||||
|
|
||||||
|
// Forward all request headers except hop-by-hop and Host.
|
||||||
|
// Authorization is skipped so the client's default bearer token is used.
|
||||||
|
for (name, value) in req.headers() {
|
||||||
|
if !is_hop_by_hop(name) && name != header::HOST && name != header::AUTHORIZATION {
|
||||||
|
req_builder = req_builder.header(name, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attach bearer token if present (overrides client default for clarity).
|
||||||
|
if let Some(ref token) = state.token
|
||||||
|
&& let Ok(value) = header::HeaderValue::from_str(&format!("Bearer {token}"))
|
||||||
|
{
|
||||||
|
req_builder = req_builder.header(header::AUTHORIZATION, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stream the request body upstream.
|
||||||
|
let body_stream = req.into_body().into_data_stream();
|
||||||
|
req_builder = req_builder.body(reqwest::Body::wrap_stream(body_stream));
|
||||||
|
|
||||||
|
let upstream = match req_builder.send().await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(error = %e, "upstream request failed");
|
||||||
|
return AxumResponse::builder()
|
||||||
|
.status(StatusCode::BAD_GATEWAY.as_u16())
|
||||||
|
.body(AxumBody::from(e.to_string()))
|
||||||
|
.unwrap_or_else(|_| AxumResponse::new(AxumBody::empty()));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let status = upstream.status().as_u16();
|
||||||
|
trace!(status, "upstream response");
|
||||||
|
let resp_headers = upstream.headers().clone();
|
||||||
|
|
||||||
|
let mut builder = AxumResponse::builder().status(status);
|
||||||
|
for (name, value) in &resp_headers {
|
||||||
|
if !is_hop_by_hop(name) {
|
||||||
|
builder = builder.header(name, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stream the response body without buffering.
|
||||||
|
builder
|
||||||
|
.body(AxumBody::from_stream(upstream.bytes_stream()))
|
||||||
|
.unwrap_or_else(|_| AxumResponse::new(AxumBody::empty()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_hop_by_hop(name: &header::HeaderName) -> bool {
|
||||||
|
name == header::CONNECTION
|
||||||
|
|| name == header::TRANSFER_ENCODING
|
||||||
|
|| name == header::TE
|
||||||
|
|| name == header::UPGRADE
|
||||||
|
|| name == header::PROXY_AUTHORIZATION
|
||||||
|
|| name == header::PROXY_AUTHENTICATE
|
||||||
|
|| name.as_str() == "keep-alive"
|
||||||
|
|| name.as_str() == "trailers"
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -61,7 +61,11 @@ impl Dataset {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn iter_page(&self, offset: usize, limit: usize) -> Box<dyn Iterator<Item = &Item> + Send + '_> {
|
pub fn iter_page(
|
||||||
|
&self,
|
||||||
|
offset: usize,
|
||||||
|
limit: usize,
|
||||||
|
) -> Box<dyn Iterator<Item = &Item> + Send + '_> {
|
||||||
match self {
|
match self {
|
||||||
Self::Dir(ds) => Box::new(ds.iter_page(offset, limit)),
|
Self::Dir(ds) => Box::new(ds.iter_page(offset, limit)),
|
||||||
Self::S3(ds) => Box::new(ds.iter_page(offset, limit)),
|
Self::S3(ds) => Box::new(ds.iter_page(offset, limit)),
|
||||||
|
|||||||
@@ -24,7 +24,16 @@ pub use items::*;
|
|||||||
#[openapi(
|
#[openapi(
|
||||||
tags(),
|
tags(),
|
||||||
paths(lookup, item_get, get_field, items_list),
|
paths(lookup, item_get, get_field, items_list),
|
||||||
components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery, ItemsQuery, ItemsResponse, ItemRef))
|
components(schemas(
|
||||||
|
LookupRequest,
|
||||||
|
LookupResponse,
|
||||||
|
LookupResult,
|
||||||
|
ItemQuery,
|
||||||
|
FieldQuery,
|
||||||
|
ItemsQuery,
|
||||||
|
ItemsResponse,
|
||||||
|
ItemRef
|
||||||
|
))
|
||||||
)]
|
)]
|
||||||
pub(crate) struct Api;
|
pub(crate) struct Api;
|
||||||
|
|
||||||
|
|||||||
@@ -21,11 +21,7 @@ pub trait DataSource {
|
|||||||
fn iter(&self) -> impl Iterator<Item = &crate::value::Item>;
|
fn iter(&self) -> impl Iterator<Item = &crate::value::Item>;
|
||||||
|
|
||||||
/// Iterate over a page of items, sorted by key
|
/// Iterate over a page of items, sorted by key
|
||||||
fn iter_page(
|
fn iter_page(&self, offset: usize, limit: usize) -> impl Iterator<Item = &crate::value::Item> {
|
||||||
&self,
|
|
||||||
offset: usize,
|
|
||||||
limit: usize,
|
|
||||||
) -> impl Iterator<Item = &crate::value::Item> {
|
|
||||||
self.iter().skip(offset).take(limit)
|
self.iter().skip(offset).take(limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user