Stream items in api
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
use axum::{
|
use axum::{
|
||||||
|
body::Body,
|
||||||
extract::{Query, State},
|
extract::{Query, State},
|
||||||
http::{StatusCode, header},
|
http::{StatusCode, header},
|
||||||
response::{IntoResponse, Response},
|
response::{IntoResponse, Response},
|
||||||
@@ -7,6 +8,8 @@ use pile_config::Label;
|
|||||||
use pile_value::value::AsyncReader;
|
use pile_value::value::AsyncReader;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::{sync::Arc, time::Instant};
|
use std::{sync::Arc, time::Instant};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
use utoipa::ToSchema;
|
use utoipa::ToSchema;
|
||||||
|
|
||||||
@@ -67,8 +70,28 @@ pub async fn item_get(
|
|||||||
time_ms = start.elapsed().as_millis()
|
time_ms = start.elapsed().as_millis()
|
||||||
);
|
);
|
||||||
|
|
||||||
match reader.read_to_end().await {
|
let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(8);
|
||||||
Ok(bytes) => (StatusCode::OK, [(header::CONTENT_TYPE, mime)], bytes).into_response(),
|
|
||||||
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
tokio::spawn(async move {
|
||||||
}
|
// 1MiB chunks
|
||||||
|
let mut buf = vec![0u8; 65536 * 8];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match reader.read(&mut buf).await {
|
||||||
|
Ok(0) => break,
|
||||||
|
Ok(n) => {
|
||||||
|
if tx.send(Ok(buf[..n].to_vec())).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let _ = tx.send(Err(e)).await;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let body = Body::from_stream(ReceiverStream::new(rx));
|
||||||
|
(StatusCode::OK, [(header::CONTENT_TYPE, mime)], body).into_response()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user