Stream items in /item

This commit is contained in:
2026-03-11 10:37:19 -07:00
parent cd3c4b59c0
commit efb31a0af1

View File

@@ -1,4 +1,5 @@
use axum::{ use axum::{
body::Body,
extract::{Query, State}, extract::{Query, State},
http::{StatusCode, header}, http::{StatusCode, header},
response::{IntoResponse, Response}, response::{IntoResponse, Response},
@@ -7,6 +8,8 @@ use pile_config::Label;
use pile_value::value::AsyncReader; use pile_value::value::AsyncReader;
use serde::Deserialize; use serde::Deserialize;
use std::{sync::Arc, time::Instant}; use std::{sync::Arc, time::Instant};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug; use tracing::debug;
use utoipa::ToSchema; use utoipa::ToSchema;
@@ -67,8 +70,28 @@ pub async fn item_get(
time_ms = start.elapsed().as_millis() time_ms = start.elapsed().as_millis()
); );
match reader.read_to_end().await { let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(8);
Ok(bytes) => (StatusCode::OK, [(header::CONTENT_TYPE, mime)], bytes).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(), tokio::spawn(async move {
// 1MiB chunks
let mut buf = vec![0u8; 65536 * 8];
loop {
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
if tx.send(Ok(buf[..n].to_vec())).await.is_err() {
break;
} }
}
Err(e) => {
let _ = tx.send(Err(e)).await;
break;
}
}
}
});
let body = Body::from_stream(ReceiverStream::new(rx));
(StatusCode::OK, [(header::CONTENT_TYPE, mime)], body).into_response()
} }