use axum::{ body::Body, extract::{Query, State}, http::{HeaderMap, StatusCode, header}, response::{IntoResponse, Response}, }; use pile_config::Label; use pile_value::value::{AsyncReader, AsyncSeekReader}; use serde::Deserialize; use std::{io::SeekFrom, sync::Arc, time::Instant}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tracing::debug; use utoipa::ToSchema; use crate::Datasets; #[derive(Deserialize, ToSchema)] pub struct ItemQuery { source: String, key: String, } /// Parse a `Range: bytes=...` header value. /// Returns `(start, end)` where either may be `None` (suffix form has `None` start). fn parse_byte_range(s: &str) -> Option<(Option, Option)> { let spec = s.strip_prefix("bytes=")?; if spec.contains(',') { return None; // multiple ranges not supported } if let Some(suffix) = spec.strip_prefix('-') { return Some((None, Some(suffix.parse().ok()?))); } let mut parts = spec.splitn(2, '-'); let start: u64 = parts.next()?.parse().ok()?; let end = parts .next() .and_then(|e| if e.is_empty() { None } else { e.parse().ok() }); Some((Some(start), end)) } /// Fetch the raw bytes of an item by source and key #[utoipa::path( get, path = "/item", params( ("source" = String, Query, description = "Source label"), ("key" = String, Query, description = "Item key"), ), responses( (status = 200, description = "Raw item bytes"), (status = 206, description = "Partial content"), (status = 400, description = "Invalid source label"), (status = 404, description = "Item not found"), (status = 416, description = "Range not satisfiable"), (status = 500, description = "Internal server error"), ) )] pub async fn item_get( State(state): State>, Query(params): Query, headers: HeaderMap, ) -> Response { let start = Instant::now(); debug!( message = "Serving /item", source = params.source, key = params.key ); let label = match Label::try_from(params.source.clone()) { Ok(l) => l, Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(), }; let Some(item) = state.get(&label, ¶ms.key).await else { return StatusCode::NOT_FOUND.into_response(); }; let mime = item.mime().to_string(); let mut reader = match item.read().await { Ok(r) => r, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), }; let total = match reader.seek(SeekFrom::End(0)).await { Ok(n) => n, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), }; let range = headers .get(header::RANGE) .and_then(|v| v.to_str().ok()) .and_then(parse_byte_range); // Resolve (byte_start, byte_end, content_length, is_range) let (byte_start, byte_end, length, is_range) = match range { Some((Some(s), e)) => { let e = e .unwrap_or(total.saturating_sub(1)) .min(total.saturating_sub(1)); if s >= total || s > e { return ( StatusCode::RANGE_NOT_SATISFIABLE, [(header::CONTENT_RANGE, format!("bytes */{total}"))], ) .into_response(); } (s, e, e - s + 1, true) } Some((None, Some(suffix))) => { let s = total.saturating_sub(suffix); let e = total.saturating_sub(1); (s, e, total.saturating_sub(s), true) } _ => (0, total.saturating_sub(1), total, false), }; if let Err(e) = reader.seek(SeekFrom::Start(byte_start)).await { return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(); } debug!( message = "Served /item", source = params.source, key = params.key, time_ms = start.elapsed().as_millis() ); let (tx, rx) = mpsc::channel::, std::io::Error>>(8); tokio::spawn(async move { let mut buf = vec![0u8; 65536]; let mut remaining = length; loop { if remaining == 0 { break; } let to_read = (buf.len() as u64).min(remaining) as usize; match reader.read(&mut buf[..to_read]).await { Ok(0) => break, Ok(n) => { remaining -= n as u64; if tx.send(Ok(buf[..n].to_vec())).await.is_err() { break; } } Err(e) => { let _ = tx.send(Err(e)).await; break; } } } }); let body = Body::from_stream(ReceiverStream::new(rx)); let status = if is_range { StatusCode::PARTIAL_CONTENT } else { StatusCode::OK }; let mut builder = axum::http::Response::builder() .status(status) .header(header::CONTENT_TYPE, mime) .header(header::ACCEPT_RANGES, "bytes") .header(header::CONTENT_LENGTH, length); if is_range { builder = builder.header( header::CONTENT_RANGE, format!("bytes {byte_start}-{byte_end}/{total}"), ); } builder .body(body) .map(IntoResponse::into_response) .unwrap_or_else(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response()) }