/item range requests

This commit is contained in:
2026-03-11 10:43:17 -07:00
parent c9d99e8719
commit c03fac0e37

View File

@@ -1,13 +1,13 @@
use axum::{ use axum::{
body::Body, body::Body,
extract::{Query, State}, extract::{Query, State},
http::{StatusCode, header}, http::{HeaderMap, StatusCode, header},
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use pile_config::Label; use pile_config::Label;
use pile_value::value::AsyncReader; use pile_value::value::{AsyncReader, AsyncSeekReader};
use serde::Deserialize; use serde::Deserialize;
use std::{sync::Arc, time::Instant}; use std::{io::SeekFrom, sync::Arc, time::Instant};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tracing::debug; use tracing::debug;
@@ -21,6 +21,24 @@ pub struct ItemQuery {
key: String, key: String,
} }
/// Parse a `Range: bytes=...` header value.
/// Returns `(start, end)` where either may be `None` (suffix form has `None` start).
fn parse_byte_range(s: &str) -> Option<(Option<u64>, Option<u64>)> {
let spec = s.strip_prefix("bytes=")?;
if spec.contains(',') {
return None; // multiple ranges not supported
}
if let Some(suffix) = spec.strip_prefix('-') {
return Some((None, Some(suffix.parse().ok()?)));
}
let mut parts = spec.splitn(2, '-');
let start: u64 = parts.next()?.parse().ok()?;
let end = parts
.next()
.and_then(|e| if e.is_empty() { None } else { e.parse().ok() });
Some((Some(start), end))
}
/// Fetch the raw bytes of an item by source and key /// Fetch the raw bytes of an item by source and key
#[utoipa::path( #[utoipa::path(
get, get,
@@ -31,14 +49,17 @@ pub struct ItemQuery {
), ),
responses( responses(
(status = 200, description = "Raw item bytes"), (status = 200, description = "Raw item bytes"),
(status = 206, description = "Partial content"),
(status = 400, description = "Invalid source label"), (status = 400, description = "Invalid source label"),
(status = 404, description = "Item not found"), (status = 404, description = "Item not found"),
(status = 416, description = "Range not satisfiable"),
(status = 500, description = "Internal server error"), (status = 500, description = "Internal server error"),
) )
)] )]
pub async fn item_get( pub async fn item_get(
State(state): State<Arc<Datasets>>, State(state): State<Arc<Datasets>>,
Query(params): Query<ItemQuery>, Query(params): Query<ItemQuery>,
headers: HeaderMap,
) -> Response { ) -> Response {
let start = Instant::now(); let start = Instant::now();
debug!( debug!(
@@ -63,6 +84,43 @@ pub async fn item_get(
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
}; };
let total = match reader.seek(SeekFrom::End(0)).await {
Ok(n) => n,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
};
let range = headers
.get(header::RANGE)
.and_then(|v| v.to_str().ok())
.and_then(parse_byte_range);
// Resolve (byte_start, byte_end, content_length, is_range)
let (byte_start, byte_end, length, is_range) = match range {
Some((Some(s), e)) => {
let e = e
.unwrap_or(total.saturating_sub(1))
.min(total.saturating_sub(1));
if s >= total || s > e {
return (
StatusCode::RANGE_NOT_SATISFIABLE,
[(header::CONTENT_RANGE, format!("bytes */{total}"))],
)
.into_response();
}
(s, e, e - s + 1, true)
}
Some((None, Some(suffix))) => {
let s = total.saturating_sub(suffix);
let e = total.saturating_sub(1);
(s, e, total.saturating_sub(s), true)
}
_ => (0, total.saturating_sub(1), total, false),
};
if let Err(e) = reader.seek(SeekFrom::Start(byte_start)).await {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response();
}
debug!( debug!(
message = "Served /item", message = "Served /item",
source = params.source, source = params.source,
@@ -73,13 +131,17 @@ pub async fn item_get(
let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(8); let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(8);
tokio::spawn(async move { tokio::spawn(async move {
// 1MiB chunks let mut buf = vec![0u8; 65536];
let mut buf = vec![0u8; 65536 * 8]; let mut remaining = length;
loop { loop {
match reader.read(&mut buf).await { if remaining == 0 {
break;
}
let to_read = (buf.len() as u64).min(remaining) as usize;
match reader.read(&mut buf[..to_read]).await {
Ok(0) => break, Ok(0) => break,
Ok(n) => { Ok(n) => {
remaining -= n as u64;
if tx.send(Ok(buf[..n].to_vec())).await.is_err() { if tx.send(Ok(buf[..n].to_vec())).await.is_err() {
break; break;
} }
@@ -93,5 +155,27 @@ pub async fn item_get(
}); });
let body = Body::from_stream(ReceiverStream::new(rx)); let body = Body::from_stream(ReceiverStream::new(rx));
(StatusCode::OK, [(header::CONTENT_TYPE, mime)], body).into_response() let status = if is_range {
StatusCode::PARTIAL_CONTENT
} else {
StatusCode::OK
};
let mut builder = axum::http::Response::builder()
.status(status)
.header(header::CONTENT_TYPE, mime)
.header(header::ACCEPT_RANGES, "bytes")
.header(header::CONTENT_LENGTH, length);
if is_range {
builder = builder.header(
header::CONTENT_RANGE,
format!("bytes {byte_start}-{byte_end}/{total}"),
);
}
builder
.body(body)
.map(IntoResponse::into_response)
.unwrap_or_else(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response())
} }