Serve item bytes & fields
Some checks failed
CI / Typos (push) Successful in 27s
CI / Clippy (push) Failing after 59s
CI / Build and test (push) Failing after 4m45s

This commit is contained in:
2026-03-10 17:22:43 -07:00
parent a05cf9da01
commit 614d3273f0
3 changed files with 203 additions and 16 deletions

View File

@@ -1,6 +1,7 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use pile_config::{ConfigToml, Label, Source}; use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use serde_json::Value;
use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant}; use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant};
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
use thiserror::Error; use thiserror::Error;
@@ -9,7 +10,8 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tracing::{debug, info, trace, warn}; use tracing::{debug, info, trace, warn};
use crate::{ use crate::{
DataSource, Item, DataSource, Item, PileValue,
extract::MetaExtractor,
index::{DbFtsIndex, FtsLookupResult}, index::{DbFtsIndex, FtsLookupResult},
path_ts_earliest, path_ts_earliest,
source::{DirDataSource, S3DataSource}, source::{DirDataSource, S3DataSource},
@@ -170,6 +172,25 @@ impl Datasets {
self.sources.get(source)?.get(key).await self.sources.get(source)?.get(key).await
} }
/// Resolve `path` against the metadata of the item stored under `key` in `source`
/// and return the result as JSON.
///
/// Yields `Ok(None)` when the item cannot be fetched or when the path query
/// produces no value.
///
/// # Errors
///
/// Propagates any error raised while querying the path or while converting
/// the resulting value to JSON.
pub async fn get_field(
    &self,
    source: &Label,
    key: &str,
    path: &ObjectPath,
) -> Result<Option<Value>, std::io::Error> {
    let item = match self.get(source, key).await {
        Some(found) => found,
        None => return Ok(None),
    };

    // The item's metadata extractor is the root value that `path` is evaluated against.
    let root = PileValue::Extractor(Arc::new(MetaExtractor::new(&item)));

    let field = match root.query(path).await? {
        Some(found) => found,
        None => return Ok(None),
    };

    let json = field.to_json().await?;
    Ok(Some(json))
}
// //
// MARK: fts // MARK: fts
// //

View File

@@ -1,6 +1,6 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use clap::Args; use clap::Args;
use pile_config::Label; use pile_config::{Label, objectpath::ObjectPath};
use pile_dataset::{Datasets, PileValue, extract::MetaExtractor}; use pile_dataset::{Datasets, PileValue, extract::MetaExtractor};
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc};
@@ -15,6 +15,10 @@ pub struct ProbeCommand {
/// Item key within the source /// Item key within the source
key: String, key: String,
/// If present, extract a specific field
#[arg(long, short = 'p')]
path: Option<String>,
/// Path to dataset config /// Path to dataset config
#[arg(long, short = 'c', default_value = "./pile.toml")] #[arg(long, short = 'c', default_value = "./pile.toml")]
config: PathBuf, config: PathBuf,
@@ -34,15 +38,28 @@ impl CliCmd for ProbeCommand {
let ds = Datasets::open(&self.config) let ds = Datasets::open(&self.config)
.with_context(|| format!("while opening dataset for {}", self.config.display()))?; .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
let item = ds.get(&source, &self.key).await.ok_or_else(|| { let json = if let Some(path_str) = self.path {
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source) let path: ObjectPath = path_str
})?; .parse()
.with_context(|| format!("invalid path {path_str:?}"))?;
let value = PileValue::Extractor(Arc::new(MetaExtractor::new(&item))); ds.get_field(&source, &self.key, &path)
let json = value .await
.to_json() .with_context(|| format!("while extracting {}", self.key))?
.await .ok_or_else(|| {
.with_context(|| format!("while extracting {}", self.key))?; anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
})?
} else {
let item = ds.get(&source, &self.key).await.ok_or_else(|| {
anyhow::anyhow!("{:?} not found in source {:?}", self.key, self.source)
})?;
let value = PileValue::Extractor(Arc::new(MetaExtractor::new(&item)));
value
.to_json()
.await
.with_context(|| format!("while extracting {}", self.key))?
};
let json = serde_json::to_string_pretty(&json).unwrap(); let json = serde_json::to_string_pretty(&json).unwrap();
println!("{json}"); println!("{json}");

View File

@@ -1,12 +1,15 @@
use axum::{ use axum::{
Json, Router, Json, Router,
extract::{DefaultBodyLimit, State}, extract::{DefaultBodyLimit, Query, State},
http::StatusCode, http::{StatusCode, header},
response::{IntoResponse, Response}, response::{IntoResponse, Response},
routing::post, routing::{get, post},
}; };
use pile_config::{Label, objectpath::ObjectPath};
use pile_dataset::{AsyncReader, PileValue, extract::MetaExtractor};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tracing::debug;
use utoipa::{OpenApi, ToSchema}; use utoipa::{OpenApi, ToSchema};
use utoipa_swagger_ui::SwaggerUi; use utoipa_swagger_ui::SwaggerUi;
@@ -15,8 +18,8 @@ use crate::command::serve::cli::ServeState;
#[derive(OpenApi)] #[derive(OpenApi)]
#[openapi( #[openapi(
tags(), tags(),
paths(lookup), paths(lookup, item_get, field_get),
components(schemas(LookupRequest, LookupResponse, LookupResult)) components(schemas(LookupRequest, LookupResponse, LookupResult, ItemQuery, FieldQuery))
)] )]
pub(super) struct Api; pub(super) struct Api;
@@ -27,6 +30,8 @@ pub(super) fn router(state: Arc<ServeState>) -> Router<()> {
Router::new() Router::new()
.route("/lookup", post(lookup)) .route("/lookup", post(lookup))
.route("/item", get(item_get))
.route("/field", get(field_get))
.merge(docs) .merge(docs)
.with_state(state) .with_state(state)
.layer(DefaultBodyLimit::max(32 * 1024 * 1024)) .layer(DefaultBodyLimit::max(32 * 1024 * 1024))
@@ -69,6 +74,12 @@ pub struct LookupResult {
) )
)] )]
async fn lookup(State(state): State<Arc<ServeState>>, Json(body): Json<LookupRequest>) -> Response { async fn lookup(State(state): State<Arc<ServeState>>, Json(body): Json<LookupRequest>) -> Response {
debug!(
message = "Serving /lookup",
query = body.query,
limit = body.limit.unwrap_or(10)
);
let results: Vec<LookupResult> = let results: Vec<LookupResult> =
match state.ds.fts_lookup(&body.query, body.limit.unwrap_or(10)) { match state.ds.fts_lookup(&body.query, body.limit.unwrap_or(10)) {
Ok(x) => x Ok(x) => x
@@ -87,3 +98,141 @@ async fn lookup(State(state): State<Arc<ServeState>>, Json(body): Json<LookupReq
return (StatusCode::OK, Json(LookupResponse { results })).into_response(); return (StatusCode::OK, Json(LookupResponse { results })).into_response();
} }
//
// MARK: item
//
/// Query-string parameters accepted by `GET /item`.
#[derive(Deserialize, ToSchema)]
struct ItemQuery {
    /// Source label, as a raw string; converted to a `Label` by the handler.
    source: String,
    /// Item key within the source.
    key: String,
}
/// Fetch the raw bytes of an item by source and key
#[utoipa::path(
get,
path = "/item",
params(
("source" = String, Query, description = "Source label"),
("key" = String, Query, description = "Item key"),
),
responses(
(status = 200, description = "Raw item bytes"),
(status = 400, description = "Invalid source label"),
(status = 404, description = "Item not found"),
(status = 500, description = "Internal server error"),
)
)]
async fn item_get(
State(state): State<Arc<ServeState>>,
Query(params): Query<ItemQuery>,
) -> Response {
debug!(
message = "Serving /item",
source = params.source,
key = params.key
);
let label = match Label::try_from(params.source) {
Ok(l) => l,
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
};
let Some(item) = state.ds.get(&label, &params.key).await else {
return StatusCode::NOT_FOUND.into_response();
};
let mime = item.mime().to_string();
let mut reader = match item.read().await {
Ok(r) => r,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
};
match reader.read_to_end().await {
Ok(bytes) => (StatusCode::OK, [(header::CONTENT_TYPE, mime)], bytes).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
}
}
//
// MARK: field
//
/// Query-string parameters accepted by `GET /field`.
#[derive(Deserialize, ToSchema)]
struct FieldQuery {
    /// Source label, as a raw string; converted to a `Label` by the handler.
    source: String,
    /// Item key within the source.
    key: String,
    /// Object path to extract (e.g. `$.flac.title`); parsed into an `ObjectPath` by the handler.
    path: String,
}
/// Extract a specific field from an item's metadata
#[utoipa::path(
get,
path = "/field",
params(
("source" = String, Query, description = "Source label"),
("key" = String, Query, description = "Item key"),
("path" = String, Query, description = "Object path (e.g. $.flac.title)"),
),
responses(
(status = 200, description = "Field value as JSON"),
(status = 400, description = "Invalid source label or path"),
(status = 404, description = "Item or field not found"),
(status = 500, description = "Internal server error"),
)
)]
async fn field_get(
State(state): State<Arc<ServeState>>,
Query(params): Query<FieldQuery>,
) -> Response {
debug!(
message = "Serving /field",
source = params.source,
key = params.key,
path = params.path,
);
let label = match Label::try_from(params.source) {
Ok(l) => l,
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
};
let path: ObjectPath = match params.path.parse() {
Ok(p) => p,
Err(e) => return (StatusCode::BAD_REQUEST, format!("{e:?}")).into_response(),
};
let Some(item) = state.ds.get(&label, &params.key).await else {
return StatusCode::NOT_FOUND.into_response();
};
let extractor = MetaExtractor::new(&item);
let root: PileValue<'_> = PileValue::Extractor(Arc::new(extractor));
let value = match root.query(&path).await {
Ok(Some(v)) => v,
Ok(None) => return StatusCode::NOT_FOUND.into_response(),
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
};
match value {
PileValue::String(s) => (
StatusCode::OK,
[(header::CONTENT_TYPE, "text/plain")],
s.to_string(),
)
.into_response(),
PileValue::Blob { mime, bytes } => (
StatusCode::OK,
[(header::CONTENT_TYPE, mime.to_string())],
bytes.as_ref().clone(),
)
.into_response(),
_ => match value.to_json().await {
Ok(json) => (StatusCode::OK, Json(json)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
},
}
}