From c03fac0e37462112891829c85ad9ea2819b35e18 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Wed, 11 Mar 2026 10:43:17 -0700 Subject: [PATCH] `/item` range requests --- crates/pile-dataset/src/serve/item.rs | 100 +++++++++++++++++++++++--- 1 file changed, 92 insertions(+), 8 deletions(-) diff --git a/crates/pile-dataset/src/serve/item.rs b/crates/pile-dataset/src/serve/item.rs index ae0ab5a..df50488 100644 --- a/crates/pile-dataset/src/serve/item.rs +++ b/crates/pile-dataset/src/serve/item.rs @@ -1,13 +1,13 @@ use axum::{ body::Body, extract::{Query, State}, - http::{StatusCode, header}, + http::{HeaderMap, StatusCode, header}, response::{IntoResponse, Response}, }; use pile_config::Label; -use pile_value::value::AsyncReader; +use pile_value::value::{AsyncReader, AsyncSeekReader}; use serde::Deserialize; -use std::{sync::Arc, time::Instant}; +use std::{io::SeekFrom, sync::Arc, time::Instant}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tracing::debug; @@ -21,6 +21,24 @@ pub struct ItemQuery { key: String, } +/// Parse a `Range: bytes=...` header value. +/// Returns `(start, end)` where either may be `None` (suffix form has `None` start). +fn parse_byte_range(s: &str) -> Option<(Option, Option)> { + let spec = s.strip_prefix("bytes=")?; + if spec.contains(',') { + return None; // multiple ranges not supported + } + if let Some(suffix) = spec.strip_prefix('-') { + return Some((None, Some(suffix.parse().ok()?))); + } + let mut parts = spec.splitn(2, '-'); + let start: u64 = parts.next()?.parse().ok()?; + let end = parts + .next() + .and_then(|e| if e.is_empty() { None } else { e.parse().ok() }); + Some((Some(start), end)) +} + /// Fetch the raw bytes of an item by source and key #[utoipa::path( get, @@ -31,14 +49,17 @@ pub struct ItemQuery { ), responses( (status = 200, description = "Raw item bytes"), + (status = 206, description = "Partial content"), (status = 400, description = "Invalid source label"), (status = 404, description = "Item not found"), + (status = 416, description = "Range not satisfiable"), (status = 500, description = "Internal server error"), ) )] pub async fn item_get( State(state): State>, Query(params): Query, + headers: HeaderMap, ) -> Response { let start = Instant::now(); debug!( @@ -63,6 +84,43 @@ pub async fn item_get( Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), }; + let total = match reader.seek(SeekFrom::End(0)).await { + Ok(n) => n, + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), + }; + + let range = headers + .get(header::RANGE) + .and_then(|v| v.to_str().ok()) + .and_then(parse_byte_range); + + // Resolve (byte_start, byte_end, content_length, is_range) + let (byte_start, byte_end, length, is_range) = match range { + Some((Some(s), e)) => { + let e = e + .unwrap_or(total.saturating_sub(1)) + .min(total.saturating_sub(1)); + if s >= total || s > e { + return ( + StatusCode::RANGE_NOT_SATISFIABLE, + [(header::CONTENT_RANGE, format!("bytes */{total}"))], + ) + .into_response(); + } + (s, e, e - s + 1, true) + } + Some((None, Some(suffix))) => { + let s = total.saturating_sub(suffix); + let e = total.saturating_sub(1); + (s, e, total.saturating_sub(s), true) + } + _ => (0, total.saturating_sub(1), total, false), + }; + + if let Err(e) = reader.seek(SeekFrom::Start(byte_start)).await { + return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(); + } + debug!( message = "Served /item", source = params.source, @@ -73,13 +131,17 @@ pub async fn item_get( let (tx, rx) = mpsc::channel::, std::io::Error>>(8); tokio::spawn(async move { - // 1MiB chunks - let mut buf = vec![0u8; 65536 * 8]; - + let mut buf = vec![0u8; 65536]; + let mut remaining = length; loop { - match reader.read(&mut buf).await { + if remaining == 0 { + break; + } + let to_read = (buf.len() as u64).min(remaining) as usize; + match reader.read(&mut buf[..to_read]).await { Ok(0) => break, Ok(n) => { + remaining -= n as u64; if tx.send(Ok(buf[..n].to_vec())).await.is_err() { break; } @@ -93,5 +155,27 @@ pub async fn item_get( }); let body = Body::from_stream(ReceiverStream::new(rx)); - (StatusCode::OK, [(header::CONTENT_TYPE, mime)], body).into_response() + let status = if is_range { + StatusCode::PARTIAL_CONTENT + } else { + StatusCode::OK + }; + + let mut builder = axum::http::Response::builder() + .status(status) + .header(header::CONTENT_TYPE, mime) + .header(header::ACCEPT_RANGES, "bytes") + .header(header::CONTENT_LENGTH, length); + + if is_range { + builder = builder.header( + header::CONTENT_RANGE, + format!("bytes {byte_start}-{byte_end}/{total}"), + ); + } + + builder + .body(body) + .map(IntoResponse::into_response) + .unwrap_or_else(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response()) }