From c9d99e87194a612da2ba414614427a51786e85d9 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Wed, 11 Mar 2026 10:37:19 -0700 Subject: [PATCH] Stream items in `/item` --- crates/pile-dataset/src/serve/item.rs | 31 +++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/crates/pile-dataset/src/serve/item.rs b/crates/pile-dataset/src/serve/item.rs index 83bbc1e..ae0ab5a 100644 --- a/crates/pile-dataset/src/serve/item.rs +++ b/crates/pile-dataset/src/serve/item.rs @@ -1,4 +1,5 @@ use axum::{ + body::Body, extract::{Query, State}, http::{StatusCode, header}, response::{IntoResponse, Response}, @@ -7,6 +8,8 @@ use pile_config::Label; use pile_value::value::AsyncReader; use serde::Deserialize; use std::{sync::Arc, time::Instant}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tracing::debug; use utoipa::ToSchema; @@ -67,8 +70,28 @@ pub async fn item_get( time_ms = start.elapsed().as_millis() ); - match reader.read_to_end().await { - Ok(bytes) => (StatusCode::OK, [(header::CONTENT_TYPE, mime)], bytes).into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), - } + let (tx, rx) = mpsc::channel::, std::io::Error>>(8); + + tokio::spawn(async move { + // 1MiB chunks + let mut buf = vec![0u8; 65536 * 8]; + + loop { + match reader.read(&mut buf).await { + Ok(0) => break, + Ok(n) => { + if tx.send(Ok(buf[..n].to_vec())).await.is_err() { + break; + } + } + Err(e) => { + let _ = tx.send(Err(e)).await; + break; + } + } + } + }); + + let body = Body::from_stream(ReceiverStream::new(rx)); + (StatusCode::OK, [(header::CONTENT_TYPE, mime)], body).into_response() }