Refactor grouping
This commit is contained in:
@@ -11,8 +11,8 @@ workspace = true
|
||||
pile-config = { workspace = true }
|
||||
pile-toolbox = { workspace = true }
|
||||
pile-value = { workspace = true }
|
||||
pile-io = { workspace = true }
|
||||
|
||||
regex = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tantivy = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
@@ -20,7 +20,7 @@ chrono = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-stream = { workspace = true }
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
|
||||
serde = { workspace = true, optional = true }
|
||||
axum = { workspace = true, optional = true }
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use pile_config::{
|
||||
ConfigToml, DatasetConfig, Label, Source, objectpath::ObjectPath, pattern::GroupPattern,
|
||||
ConfigToml, DatasetConfig, Label, Source, default_base, default_files, objectpath::ObjectPath,
|
||||
};
|
||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||
use pile_value::{
|
||||
@@ -8,6 +8,7 @@ use pile_value::{
|
||||
source::{DataSource, DirDataSource, misc::path_ts_earliest},
|
||||
value::{Item, PileValue},
|
||||
};
|
||||
use regex::Regex;
|
||||
use serde_json::Value;
|
||||
use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant};
|
||||
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
|
||||
@@ -107,7 +108,8 @@ impl Datasets {
|
||||
Source::Filesystem {
|
||||
enabled: true,
|
||||
path: path_parent.clone(),
|
||||
pattern: GroupPattern::default(),
|
||||
base_pattern: default_base(),
|
||||
files: default_files(),
|
||||
},
|
||||
)]
|
||||
.into_iter()
|
||||
@@ -125,18 +127,37 @@ impl Datasets {
|
||||
Source::Filesystem {
|
||||
enabled,
|
||||
path,
|
||||
pattern,
|
||||
base_pattern,
|
||||
files,
|
||||
} => {
|
||||
let target = match enabled {
|
||||
true => &mut sources,
|
||||
false => &mut disabled_sources,
|
||||
};
|
||||
|
||||
let base_regex = Regex::new(base_pattern).map_err(|e| {
|
||||
std::io::Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid base_pattern: {e}"),
|
||||
)
|
||||
})?;
|
||||
if base_regex.captures_len() != 2 {
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
"base_pattern must have exactly one capture group",
|
||||
));
|
||||
}
|
||||
|
||||
target.insert(
|
||||
label.clone(),
|
||||
Dataset::Dir(
|
||||
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
||||
.await?,
|
||||
DirDataSource::new(
|
||||
label,
|
||||
path_parent.join(path),
|
||||
base_regex,
|
||||
files.clone(),
|
||||
)
|
||||
.await?,
|
||||
),
|
||||
);
|
||||
}
|
||||
@@ -194,18 +215,37 @@ impl Datasets {
|
||||
Source::Filesystem {
|
||||
enabled,
|
||||
path,
|
||||
pattern,
|
||||
base_pattern,
|
||||
files,
|
||||
} => {
|
||||
let target = match enabled {
|
||||
true => &mut sources,
|
||||
false => &mut disabled_sources,
|
||||
};
|
||||
|
||||
let base_regex = Regex::new(base_pattern).map_err(|e| {
|
||||
std::io::Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid base_pattern: {e}"),
|
||||
)
|
||||
})?;
|
||||
if base_regex.captures_len() != 2 {
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
"base_pattern must have exactly one capture group",
|
||||
));
|
||||
}
|
||||
|
||||
target.insert(
|
||||
label.clone(),
|
||||
Dataset::Dir(
|
||||
DirDataSource::new(label, path_parent.join(path), pattern.clone())
|
||||
.await?,
|
||||
DirDataSource::new(
|
||||
label,
|
||||
path_parent.join(path),
|
||||
base_regex,
|
||||
files.clone(),
|
||||
)
|
||||
.await?,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -245,7 +245,7 @@ async fn val_to_string(
|
||||
PileValue::Null => {}
|
||||
PileValue::ObjectExtractor(_) => {}
|
||||
PileValue::Item(_) => {}
|
||||
PileValue::Blob { .. } => {}
|
||||
PileValue::Binary(_) => {}
|
||||
}
|
||||
|
||||
return Ok(Vec::new());
|
||||
|
||||
@@ -1,14 +1,19 @@
|
||||
use axum::{
|
||||
Json,
|
||||
body::Body,
|
||||
extract::{Query, RawQuery, State},
|
||||
http::{StatusCode, header},
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use percent_encoding::percent_decode_str;
|
||||
use pile_config::{Label, objectpath::ObjectPath};
|
||||
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||
use pile_value::{
|
||||
extract::traits::ExtractState,
|
||||
value::{BinaryPileValue, PileValue},
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use std::{sync::Arc, time::Instant};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
@@ -141,15 +146,30 @@ pub async fn get_extract(
|
||||
s.to_string(),
|
||||
)
|
||||
.into_response(),
|
||||
PileValue::Blob { mime, bytes } => (
|
||||
StatusCode::OK,
|
||||
[
|
||||
(header::CONTENT_TYPE, mime.to_string()),
|
||||
(header::CONTENT_DISPOSITION, disposition),
|
||||
],
|
||||
bytes.as_ref().clone(),
|
||||
)
|
||||
.into_response(),
|
||||
|
||||
PileValue::Binary(binary) => {
|
||||
let mime = binary.mime().to_string();
|
||||
let body = match binary {
|
||||
BinaryPileValue::Blob { bytes, .. } => Body::from(bytes.0.to_vec()),
|
||||
BinaryPileValue::File { path, .. } => match tokio::fs::File::open(&path).await {
|
||||
Ok(file) => Body::from_stream(ReaderStream::new(file)),
|
||||
Err(e) => {
|
||||
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}"))
|
||||
.into_response();
|
||||
}
|
||||
},
|
||||
};
|
||||
(
|
||||
StatusCode::OK,
|
||||
[
|
||||
(header::CONTENT_TYPE, mime),
|
||||
(header::CONTENT_DISPOSITION, disposition),
|
||||
],
|
||||
body,
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
_ => match value.to_json(&extract_state).await {
|
||||
Ok(json) => (
|
||||
StatusCode::OK,
|
||||
|
||||
@@ -1,202 +0,0 @@
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::{Query, State},
|
||||
http::{HeaderMap, StatusCode, header},
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_config::Label;
|
||||
use pile_io::{AsyncReader, AsyncSeekReader};
|
||||
use serde::Deserialize;
|
||||
use std::{io::SeekFrom, sync::Arc, time::Instant};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tracing::debug;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::Datasets;
|
||||
|
||||
#[derive(Deserialize, ToSchema)]
|
||||
pub struct ItemQuery {
|
||||
source: String,
|
||||
key: String,
|
||||
|
||||
#[serde(default)]
|
||||
download: bool,
|
||||
name: Option<String>,
|
||||
}
|
||||
|
||||
/// Parse a `Range: bytes=...` header value.
|
||||
/// Returns `(start, end)` where either may be `None` (suffix form has `None` start).
|
||||
fn parse_byte_range(s: &str) -> Option<(Option<u64>, Option<u64>)> {
|
||||
let spec = s.strip_prefix("bytes=")?;
|
||||
if spec.contains(',') {
|
||||
return None; // multiple ranges not supported
|
||||
}
|
||||
if let Some(suffix) = spec.strip_prefix('-') {
|
||||
return Some((None, Some(suffix.parse().ok()?)));
|
||||
}
|
||||
let mut parts = spec.splitn(2, '-');
|
||||
let start: u64 = parts.next()?.parse().ok()?;
|
||||
let end = parts
|
||||
.next()
|
||||
.and_then(|e| if e.is_empty() { None } else { e.parse().ok() });
|
||||
Some((Some(start), end))
|
||||
}
|
||||
|
||||
/// Fetch the raw bytes of an item by source and key
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/item",
|
||||
params(
|
||||
("source" = String, Query, description = "Source label"),
|
||||
("key" = String, Query, description = "Item key"),
|
||||
("name" = Option<String>, Query, description = "Downloaded filename; defaults to the last segment of the key"),
|
||||
),
|
||||
responses(
|
||||
(status = 200, description = "Raw item bytes"),
|
||||
(status = 206, description = "Partial content"),
|
||||
(status = 400, description = "Invalid source label"),
|
||||
(status = 404, description = "Item not found"),
|
||||
(status = 416, description = "Range not satisfiable"),
|
||||
(status = 500, description = "Internal server error"),
|
||||
)
|
||||
)]
|
||||
pub async fn item_get(
|
||||
State(state): State<Arc<Datasets>>,
|
||||
Query(params): Query<ItemQuery>,
|
||||
headers: HeaderMap,
|
||||
) -> Response {
|
||||
let start = Instant::now();
|
||||
debug!(
|
||||
message = "Serving /item",
|
||||
source = params.source,
|
||||
key = params.key
|
||||
);
|
||||
|
||||
let label = match Label::try_from(params.source.clone()) {
|
||||
Ok(l) => l,
|
||||
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
|
||||
};
|
||||
|
||||
let Some(item) = state.get(&label, ¶ms.key).await else {
|
||||
return StatusCode::NOT_FOUND.into_response();
|
||||
};
|
||||
|
||||
let mime = item.mime().to_string();
|
||||
|
||||
let mut reader = match item.read().await {
|
||||
Ok(r) => r,
|
||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||
};
|
||||
|
||||
let total = match reader.seek(SeekFrom::End(0)).await {
|
||||
Ok(n) => n,
|
||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||
};
|
||||
|
||||
let range = headers
|
||||
.get(header::RANGE)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(parse_byte_range);
|
||||
|
||||
// Resolve (byte_start, byte_end, content_length, is_range)
|
||||
let (byte_start, byte_end, length, is_range) = match range {
|
||||
Some((Some(s), e)) => {
|
||||
let e = e
|
||||
.unwrap_or(total.saturating_sub(1))
|
||||
.min(total.saturating_sub(1));
|
||||
if s >= total || s > e {
|
||||
return (
|
||||
StatusCode::RANGE_NOT_SATISFIABLE,
|
||||
[(header::CONTENT_RANGE, format!("bytes */{total}"))],
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
(s, e, e - s + 1, true)
|
||||
}
|
||||
Some((None, Some(suffix))) => {
|
||||
let s = total.saturating_sub(suffix);
|
||||
let e = total.saturating_sub(1);
|
||||
(s, e, total.saturating_sub(s), true)
|
||||
}
|
||||
_ => (0, total.saturating_sub(1), total, false),
|
||||
};
|
||||
|
||||
if let Err(e) = reader.seek(SeekFrom::Start(byte_start)).await {
|
||||
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response();
|
||||
}
|
||||
|
||||
debug!(
|
||||
message = "Served /item",
|
||||
source = params.source,
|
||||
key = params.key,
|
||||
time_ms = start.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(8);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
let mut remaining = length;
|
||||
loop {
|
||||
if remaining == 0 {
|
||||
break;
|
||||
}
|
||||
let to_read = (buf.len() as u64).min(remaining) as usize;
|
||||
match reader.read(&mut buf[..to_read]).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
remaining -= n as u64;
|
||||
if tx.send(Ok(buf[..n].to_vec())).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = tx.send(Err(e)).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let body = Body::from_stream(ReceiverStream::new(rx));
|
||||
let status = if is_range {
|
||||
StatusCode::PARTIAL_CONTENT
|
||||
} else {
|
||||
StatusCode::OK
|
||||
};
|
||||
|
||||
let disposition_type = if params.download {
|
||||
"attachment"
|
||||
} else {
|
||||
"inline"
|
||||
};
|
||||
let file_name = params.name.unwrap_or_else(|| {
|
||||
params
|
||||
.key
|
||||
.rsplit('/')
|
||||
.next()
|
||||
.unwrap_or(¶ms.key)
|
||||
.to_owned()
|
||||
});
|
||||
let disposition = format!("{disposition_type}; filename=\"{file_name}\"");
|
||||
|
||||
let mut builder = axum::http::Response::builder()
|
||||
.status(status)
|
||||
.header(header::CONTENT_TYPE, mime)
|
||||
.header(header::ACCEPT_RANGES, "bytes")
|
||||
.header(header::CONTENT_LENGTH, length)
|
||||
.header(header::CONTENT_DISPOSITION, disposition);
|
||||
|
||||
if is_range {
|
||||
builder = builder.header(
|
||||
header::CONTENT_RANGE,
|
||||
format!("bytes {byte_start}-{byte_end}/{total}"),
|
||||
);
|
||||
}
|
||||
|
||||
builder
|
||||
.body(body)
|
||||
.map(IntoResponse::into_response)
|
||||
.unwrap_or_else(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response())
|
||||
}
|
||||
@@ -11,9 +11,6 @@ use crate::Datasets;
|
||||
mod lookup;
|
||||
pub use lookup::*;
|
||||
|
||||
mod item;
|
||||
pub use item::*;
|
||||
|
||||
mod extract;
|
||||
pub use extract::*;
|
||||
|
||||
@@ -34,7 +31,6 @@ pub use schema::*;
|
||||
tags(),
|
||||
paths(
|
||||
lookup,
|
||||
item_get,
|
||||
get_extract,
|
||||
items_list,
|
||||
config_schema,
|
||||
@@ -45,7 +41,6 @@ pub use schema::*;
|
||||
LookupRequest,
|
||||
LookupResponse,
|
||||
LookupResult,
|
||||
ItemQuery,
|
||||
ExtractQuery,
|
||||
ItemsQuery,
|
||||
ItemsResponse,
|
||||
@@ -64,7 +59,6 @@ impl Datasets {
|
||||
pub fn router_prefix(self: Arc<Self>, with_docs: bool, prefix: Option<&str>) -> Router<()> {
|
||||
let mut router = Router::new()
|
||||
.route("/lookup", post(lookup))
|
||||
.route("/item", get(item_get))
|
||||
.route("/extract", get(get_extract))
|
||||
.route("/items", get(items_list))
|
||||
.route("/config/schema", get(config_schema))
|
||||
|
||||
@@ -44,9 +44,9 @@ async fn pile_value_to_api(
|
||||
PileValue::I64(n) => Ok(ApiValue::Number(n.into())),
|
||||
PileValue::Null => Ok(ApiValue::Null),
|
||||
|
||||
PileValue::Blob { mime, .. } => Ok(ApiValue::Binary {
|
||||
PileValue::Binary(x) => Ok(ApiValue::Binary {
|
||||
binary: true,
|
||||
mime: mime.to_string(),
|
||||
mime: x.mime().to_string(),
|
||||
}),
|
||||
|
||||
PileValue::Array(arr) => {
|
||||
|
||||
@@ -1,13 +1,18 @@
|
||||
use axum::{
|
||||
Json,
|
||||
body::Body,
|
||||
extract::{Path, Query, State},
|
||||
http::{StatusCode, header},
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use pile_config::Label;
|
||||
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||
use pile_value::{
|
||||
extract::traits::ExtractState,
|
||||
value::{BinaryPileValue, PileValue},
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use std::{sync::Arc, time::Instant};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
use utoipa::IntoParams;
|
||||
|
||||
@@ -125,15 +130,30 @@ pub async fn schema_field(
|
||||
s.to_string(),
|
||||
)
|
||||
.into_response(),
|
||||
PileValue::Blob { mime, bytes } => (
|
||||
StatusCode::OK,
|
||||
[
|
||||
(header::CONTENT_TYPE, mime.to_string()),
|
||||
(header::CONTENT_DISPOSITION, disposition),
|
||||
],
|
||||
bytes.as_ref().clone(),
|
||||
)
|
||||
.into_response(),
|
||||
|
||||
PileValue::Binary(binary) => {
|
||||
let mime = binary.mime().to_string();
|
||||
let body = match binary {
|
||||
BinaryPileValue::Blob { bytes, .. } => Body::from(bytes.0.to_vec()),
|
||||
BinaryPileValue::File { path, .. } => match tokio::fs::File::open(&path).await {
|
||||
Ok(file) => Body::from_stream(ReaderStream::new(file)),
|
||||
Err(e) => {
|
||||
return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}"))
|
||||
.into_response();
|
||||
}
|
||||
},
|
||||
};
|
||||
(
|
||||
StatusCode::OK,
|
||||
[
|
||||
(header::CONTENT_TYPE, mime),
|
||||
(header::CONTENT_DISPOSITION, disposition),
|
||||
],
|
||||
body,
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
_ => match value.to_json(&extract_state).await {
|
||||
Ok(json) => (
|
||||
StatusCode::OK,
|
||||
|
||||
Reference in New Issue
Block a user