Compare commits
3 Commits
95a547045d
...
a2079877fd
| Author | SHA1 | Date | |
|---|---|---|---|
| a2079877fd | |||
| 32aedb9dc1 | |||
| 6dd806b246 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2485,6 +2485,7 @@ version = "0.0.2"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"anstyle",
|
"anstyle",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"aws-sdk-s3",
|
||||||
"axum",
|
"axum",
|
||||||
"clap",
|
"clap",
|
||||||
"indicatif",
|
"indicatif",
|
||||||
|
|||||||
@@ -51,6 +51,10 @@ pub struct S3Credentials {
|
|||||||
pub enum Source {
|
pub enum Source {
|
||||||
/// A directory of files
|
/// A directory of files
|
||||||
Filesystem {
|
Filesystem {
|
||||||
|
/// If false, ignore this dataset
|
||||||
|
#[serde(default = "default_true")]
|
||||||
|
enabled: bool,
|
||||||
|
|
||||||
/// The directories to scan.
|
/// The directories to scan.
|
||||||
/// Must be relative.
|
/// Must be relative.
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
@@ -66,6 +70,10 @@ pub enum Source {
|
|||||||
|
|
||||||
/// An S3-compatible object store bucket
|
/// An S3-compatible object store bucket
|
||||||
S3 {
|
S3 {
|
||||||
|
/// If false, ignore this dataset
|
||||||
|
#[serde(default = "default_true")]
|
||||||
|
enabled: bool,
|
||||||
|
|
||||||
bucket: String,
|
bucket: String,
|
||||||
prefix: Option<String>,
|
prefix: Option<String>,
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
|
|||||||
use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath};
|
use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath};
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::{
|
use pile_value::{
|
||||||
|
extract::traits::ExtractState,
|
||||||
source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest},
|
source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest},
|
||||||
value::{Item, PileValue},
|
value::{Item, PileValue},
|
||||||
};
|
};
|
||||||
@@ -114,7 +115,15 @@ impl Datasets {
|
|||||||
let mut sources = HashMap::new();
|
let mut sources = HashMap::new();
|
||||||
for (label, source) in &config.dataset.source {
|
for (label, source) in &config.dataset.source {
|
||||||
match source {
|
match source {
|
||||||
Source::Filesystem { path, sidecars } => {
|
Source::Filesystem {
|
||||||
|
enabled,
|
||||||
|
path,
|
||||||
|
sidecars,
|
||||||
|
} => {
|
||||||
|
if !enabled {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
sources.insert(
|
sources.insert(
|
||||||
label.clone(),
|
label.clone(),
|
||||||
Dataset::Dir(Arc::new(DirDataSource::new(
|
Dataset::Dir(Arc::new(DirDataSource::new(
|
||||||
@@ -126,6 +135,7 @@ impl Datasets {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Source::S3 {
|
Source::S3 {
|
||||||
|
enabled,
|
||||||
bucket,
|
bucket,
|
||||||
prefix,
|
prefix,
|
||||||
endpoint,
|
endpoint,
|
||||||
@@ -133,6 +143,10 @@ impl Datasets {
|
|||||||
credentials,
|
credentials,
|
||||||
sidecars,
|
sidecars,
|
||||||
} => {
|
} => {
|
||||||
|
if !enabled {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
match S3DataSource::new(
|
match S3DataSource::new(
|
||||||
label,
|
label,
|
||||||
bucket.clone(),
|
bucket.clone(),
|
||||||
@@ -174,6 +188,7 @@ impl Datasets {
|
|||||||
/// Returns `None` if the item or field is not found.
|
/// Returns `None` if the item or field is not found.
|
||||||
pub async fn get_field(
|
pub async fn get_field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
source: &Label,
|
source: &Label,
|
||||||
key: &str,
|
key: &str,
|
||||||
path: &ObjectPath,
|
path: &ObjectPath,
|
||||||
@@ -183,11 +198,11 @@ impl Datasets {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let item = PileValue::Item(item);
|
let item = PileValue::Item(item);
|
||||||
let Some(value) = item.query(path).await? else {
|
let Some(value) = item.query(state, path).await? else {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(Some(value.to_json().await?))
|
Ok(Some(value.to_json(state).await?))
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
@@ -197,6 +212,7 @@ impl Datasets {
|
|||||||
/// Refresh this dataset's fts index.
|
/// Refresh this dataset's fts index.
|
||||||
pub async fn fts_refresh(
|
pub async fn fts_refresh(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
_threads: usize,
|
_threads: usize,
|
||||||
flag: Option<CancelFlag>,
|
flag: Option<CancelFlag>,
|
||||||
) -> Result<(), CancelableTaskError<DatasetError>> {
|
) -> Result<(), CancelableTaskError<DatasetError>> {
|
||||||
@@ -252,9 +268,10 @@ impl Datasets {
|
|||||||
|
|
||||||
let item = item_result.map_err(DatasetError::from)?;
|
let item = item_result.map_err(DatasetError::from)?;
|
||||||
let db = Arc::clone(&db_index);
|
let db = Arc::clone(&db_index);
|
||||||
|
let state = state.clone();
|
||||||
join_set.spawn(async move {
|
join_set.spawn(async move {
|
||||||
let key = item.key();
|
let key = item.key();
|
||||||
let result = db.entry_to_document(&item).await;
|
let result = db.entry_to_document(&state, &item).await;
|
||||||
(key, result)
|
(key, result)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
use pile_config::{ConfigToml, DatasetFts, Label};
|
use pile_config::{ConfigToml, DatasetFts, Label};
|
||||||
use pile_value::value::{Item, PileValue};
|
use pile_value::{
|
||||||
|
extract::traits::ExtractState,
|
||||||
|
value::{Item, PileValue},
|
||||||
|
};
|
||||||
use std::{path::PathBuf, sync::LazyLock};
|
use std::{path::PathBuf, sync::LazyLock};
|
||||||
use tantivy::{
|
use tantivy::{
|
||||||
DocAddress, Index, ReloadPolicy, TantivyDocument, TantivyError,
|
DocAddress, Index, ReloadPolicy, TantivyDocument, TantivyError,
|
||||||
@@ -63,6 +66,7 @@ impl DbFtsIndex {
|
|||||||
/// Turn an entry into a tantivy document
|
/// Turn an entry into a tantivy document
|
||||||
pub async fn entry_to_document(
|
pub async fn entry_to_document(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
item: &Item,
|
item: &Item,
|
||||||
) -> Result<Option<TantivyDocument>, TantivyError> {
|
) -> Result<Option<TantivyDocument>, TantivyError> {
|
||||||
let mut doc = TantivyDocument::default();
|
let mut doc = TantivyDocument::default();
|
||||||
@@ -75,7 +79,7 @@ impl DbFtsIndex {
|
|||||||
|
|
||||||
let mut empty = true;
|
let mut empty = true;
|
||||||
for name in self.fts_cfg().fields.keys() {
|
for name in self.fts_cfg().fields.keys() {
|
||||||
let x = self.get_field(&item, name).await?;
|
let x = self.get_field(state, &item, name).await?;
|
||||||
|
|
||||||
let val = match x {
|
let val = match x {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
@@ -99,6 +103,7 @@ impl DbFtsIndex {
|
|||||||
|
|
||||||
pub async fn get_field(
|
pub async fn get_field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
extractor: &PileValue,
|
extractor: &PileValue,
|
||||||
field_name: &Label,
|
field_name: &Label,
|
||||||
) -> Result<Option<String>, std::io::Error> {
|
) -> Result<Option<String>, std::io::Error> {
|
||||||
@@ -112,7 +117,7 @@ impl DbFtsIndex {
|
|||||||
|
|
||||||
// Try paths in order, using the first value we find
|
// Try paths in order, using the first value we find
|
||||||
'outer: for path in field.path.as_slice() {
|
'outer: for path in field.path.as_slice() {
|
||||||
let val = match extractor.query(path).await? {
|
let val = match extractor.query(state, path).await? {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
None => return Ok(None),
|
None => return Ok(None),
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use axum::{
|
|||||||
response::{IntoResponse, Response},
|
response::{IntoResponse, Response},
|
||||||
};
|
};
|
||||||
use pile_config::{Label, objectpath::ObjectPath};
|
use pile_config::{Label, objectpath::ObjectPath};
|
||||||
use pile_value::value::PileValue;
|
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::{sync::Arc, time::Instant};
|
use std::{sync::Arc, time::Instant};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
@@ -62,8 +62,10 @@ pub async fn get_field(
|
|||||||
return StatusCode::NOT_FOUND.into_response();
|
return StatusCode::NOT_FOUND.into_response();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let state = ExtractState { ignore_mime: false };
|
||||||
|
|
||||||
let item = PileValue::Item(item);
|
let item = PileValue::Item(item);
|
||||||
let value = match item.query(&path).await {
|
let value = match item.query(&state, &path).await {
|
||||||
Ok(Some(v)) => v,
|
Ok(Some(v)) => v,
|
||||||
Ok(None) => return StatusCode::NOT_FOUND.into_response(),
|
Ok(None) => return StatusCode::NOT_FOUND.into_response(),
|
||||||
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||||
@@ -90,7 +92,7 @@ pub async fn get_field(
|
|||||||
bytes.as_ref().clone(),
|
bytes.as_ref().clone(),
|
||||||
)
|
)
|
||||||
.into_response(),
|
.into_response(),
|
||||||
_ => match value.to_json().await {
|
_ => match value.to_json(&state).await {
|
||||||
Ok(json) => (StatusCode::OK, Json(json)).into_response(),
|
Ok(json) => (StatusCode::OK, Json(json)).into_response(),
|
||||||
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")).into_response(),
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use std::{
|
|||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{Item, PileValue, SyncReadBridge},
|
value::{Item, PileValue, SyncReadBridge},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -82,6 +82,7 @@ impl EpubMetaExtractor {
|
|||||||
impl ObjectExtractor for EpubMetaExtractor {
|
impl ObjectExtractor for EpubMetaExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
@@ -89,6 +90,10 @@ impl ObjectExtractor for EpubMetaExtractor {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !state.ignore_mime && self.item.mime().essence_str() != "application/epub+zip" {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(self.get_inner().await?.get(name).cloned())
|
Ok(self.get_inner().await?.get(name).cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use std::{
|
|||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{Item, PileValue, SyncReadBridge},
|
value::{Item, PileValue, SyncReadBridge},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -92,6 +92,7 @@ fn strip_html(html: &str) -> String {
|
|||||||
impl ObjectExtractor for EpubTextExtractor {
|
impl ObjectExtractor for EpubTextExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
@@ -99,6 +100,10 @@ impl ObjectExtractor for EpubTextExtractor {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !state.ignore_mime && self.item.mime().essence_str() != "application/epub+zip" {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(self.get_inner().await?.get(name).cloned())
|
Ok(self.get_inner().await?.get(name).cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ mod epub_text;
|
|||||||
pub use epub_text::*;
|
pub use epub_text::*;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{Item, PileValue},
|
value::{Item, PileValue},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -30,13 +30,14 @@ impl EpubExtractor {
|
|||||||
impl ObjectExtractor for EpubExtractor {
|
impl ObjectExtractor for EpubExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &pile_config::Label,
|
name: &pile_config::Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
match (name.as_str(), args) {
|
match (name.as_str(), args) {
|
||||||
("text", args) => Ok(Some(
|
("text", args) => Ok(Some(
|
||||||
self.text
|
self.text
|
||||||
.field(name, args)
|
.field(state, name, args)
|
||||||
.await
|
.await
|
||||||
.map(|x| x.unwrap_or(PileValue::Null))?,
|
.map(|x| x.unwrap_or(PileValue::Null))?,
|
||||||
)),
|
)),
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use std::{
|
|||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{Item, PileValue, SyncReadBridge},
|
value::{Item, PileValue, SyncReadBridge},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -87,6 +87,7 @@ fn tag_to_label(tag: &str) -> Option<Label> {
|
|||||||
impl ObjectExtractor for ExifExtractor {
|
impl ObjectExtractor for ExifExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
@@ -100,6 +101,10 @@ impl ObjectExtractor for ExifExtractor {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !state.ignore_mime && self.item.mime().type_() != mime::IMAGE {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(self.get_inner().await?.get(name).cloned())
|
Ok(self.get_inner().await?.get(name).cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use std::{
|
|||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::{ListExtractor, ObjectExtractor},
|
extract::traits::{ExtractState, ListExtractor, ObjectExtractor},
|
||||||
value::{Item, PileValue, SyncReadBridge},
|
value::{Item, PileValue, SyncReadBridge},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -46,16 +46,32 @@ impl FlacImagesExtractor {
|
|||||||
|
|
||||||
return Ok(count);
|
return Ok(count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn mime_ok(&self, state: &ExtractState) -> bool {
|
||||||
|
if state.ignore_mime {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
let essence = self.item.mime().essence_str();
|
||||||
|
essence == "audio/flac" || essence == "audio/x-flac"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl ListExtractor for FlacImagesExtractor {
|
impl ListExtractor for FlacImagesExtractor {
|
||||||
async fn get<'a>(&'a self, mut idx: usize) -> Result<Option<PileValue>, std::io::Error> {
|
async fn get(
|
||||||
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
|
mut idx: usize,
|
||||||
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
trace!(
|
trace!(
|
||||||
key = self.item.key().as_str(),
|
key = self.item.key().as_str(),
|
||||||
"Getting index {idx} from FlacImagesExtractor",
|
"Getting index {idx} from FlacImagesExtractor",
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if !self.mime_ok(state) {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
let key = self.item.key();
|
let key = self.item.key();
|
||||||
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
let reader = SyncReadBridge::new_current(self.item.read().await?);
|
||||||
let image = tokio::task::spawn_blocking(move || {
|
let image = tokio::task::spawn_blocking(move || {
|
||||||
@@ -98,7 +114,11 @@ impl ListExtractor for FlacImagesExtractor {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn len(&self) -> Result<usize, std::io::Error> {
|
async fn len(&self, state: &ExtractState) -> Result<usize, std::io::Error> {
|
||||||
|
if !self.mime_ok(state) {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(x) = self.cached_count.get() {
|
if let Some(x) = self.cached_count.get() {
|
||||||
return Ok(*x);
|
return Ok(*x);
|
||||||
}
|
}
|
||||||
@@ -178,12 +198,21 @@ impl FlacExtractor {
|
|||||||
|
|
||||||
return Ok(self.output.get_or_init(|| output));
|
return Ok(self.output.get_or_init(|| output));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn mime_ok(&self, state: &ExtractState) -> bool {
|
||||||
|
if state.ignore_mime {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
let essence = self.item.mime().essence_str();
|
||||||
|
essence == "audio/flac" || essence == "audio/x-flac"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl ObjectExtractor for FlacExtractor {
|
impl ObjectExtractor for FlacExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
@@ -191,6 +220,10 @@ impl ObjectExtractor for FlacExtractor {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !self.mime_ok(state) {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
if name.as_str() == "images" {
|
if name.as_str() == "images" {
|
||||||
return Ok(Some(self.images.clone()));
|
return Ok(Some(self.images.clone()));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{Item, PileValue},
|
value::{Item, PileValue},
|
||||||
};
|
};
|
||||||
use pile_config::Label;
|
use pile_config::Label;
|
||||||
@@ -88,6 +88,7 @@ impl FsExtractor {
|
|||||||
impl ObjectExtractor for FsExtractor {
|
impl ObjectExtractor for FsExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
_state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use std::{
|
|||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{Item, PileValue, SyncReadBridge},
|
value::{Item, PileValue, SyncReadBridge},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -123,6 +123,7 @@ fn frame_id_to_field(id: &str) -> Cow<'static, str> {
|
|||||||
impl ObjectExtractor for Id3Extractor {
|
impl ObjectExtractor for Id3Extractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
@@ -130,6 +131,10 @@ impl ObjectExtractor for Id3Extractor {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !state.ignore_mime && self.item.mime().essence_str() != "audio/mpeg" {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(self.get_inner().await?.get(name).cloned())
|
Ok(self.get_inner().await?.get(name).cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -26,7 +26,10 @@ mod sidecar;
|
|||||||
pub use sidecar::*;
|
pub use sidecar::*;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::{misc::MapExtractor, traits::ObjectExtractor},
|
extract::{
|
||||||
|
misc::MapExtractor,
|
||||||
|
traits::{ExtractState, ObjectExtractor},
|
||||||
|
},
|
||||||
value::{Item, PileValue},
|
value::{Item, PileValue},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -82,10 +85,11 @@ impl ItemExtractor {
|
|||||||
impl ObjectExtractor for ItemExtractor {
|
impl ObjectExtractor for ItemExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &pile_config::Label,
|
name: &pile_config::Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
self.inner.field(name, args).await
|
self.inner.field(state, name, args).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[expect(clippy::unwrap_used)]
|
#[expect(clippy::unwrap_used)]
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ mod pdf_text;
|
|||||||
pub use pdf_text::*;
|
pub use pdf_text::*;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{Item, PileValue},
|
value::{Item, PileValue},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -40,6 +40,7 @@ impl PdfExtractor {
|
|||||||
impl ObjectExtractor for PdfExtractor {
|
impl ObjectExtractor for PdfExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &pile_config::Label,
|
name: &pile_config::Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
@@ -50,7 +51,7 @@ impl ObjectExtractor for PdfExtractor {
|
|||||||
);
|
);
|
||||||
|
|
||||||
match (name.as_str(), args) {
|
match (name.as_str(), args) {
|
||||||
("text", args) => self.text.field(name, args).await,
|
("text", args) => self.text.field(state, name, args).await,
|
||||||
("meta", None) => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
|
("meta", None) => Ok(Some(PileValue::ObjectExtractor(self.meta.clone()))),
|
||||||
#[cfg(feature = "pdfium")]
|
#[cfg(feature = "pdfium")]
|
||||||
("pages", None) => Ok(Some(PileValue::ListExtractor(self.pages.clone()))),
|
("pages", None) => Ok(Some(PileValue::ListExtractor(self.pages.clone()))),
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use std::{
|
|||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{Item, PileValue, SyncReadBridge},
|
value::{Item, PileValue, SyncReadBridge},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -124,12 +124,18 @@ fn format_date(d: &Date) -> String {
|
|||||||
impl ObjectExtractor for PdfMetaExtractor {
|
impl ObjectExtractor for PdfMetaExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
if args.is_some() {
|
if args.is_some() {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !state.ignore_mime && self.item.mime().essence_str() != "application/pdf" {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(self.get_inner().await?.get(name).cloned())
|
Ok(self.get_inner().await?.get(name).cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use std::{
|
|||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ListExtractor,
|
extract::traits::{ExtractState, ListExtractor},
|
||||||
value::{Item, PileValue, SyncReadBridge},
|
value::{Item, PileValue, SyncReadBridge},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -34,12 +34,20 @@ impl PdfPagesExtractor {
|
|||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl ListExtractor for PdfPagesExtractor {
|
impl ListExtractor for PdfPagesExtractor {
|
||||||
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
|
async fn get(
|
||||||
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
|
idx: usize,
|
||||||
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
trace!(
|
trace!(
|
||||||
key = self.item.key().as_str(),
|
key = self.item.key().as_str(),
|
||||||
"Getting index {idx} from PdfPagesExtractor",
|
"Getting index {idx} from PdfPagesExtractor",
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if !state.ignore_mime && self.item.mime().essence_str() != "application/pdf" {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
let bytes = self.get_bytes().await?;
|
let bytes = self.get_bytes().await?;
|
||||||
let png = tokio::task::spawn_blocking(move || {
|
let png = tokio::task::spawn_blocking(move || {
|
||||||
let pdfium = Pdfium::default();
|
let pdfium = Pdfium::default();
|
||||||
@@ -81,7 +89,11 @@ impl ListExtractor for PdfPagesExtractor {
|
|||||||
Ok(Some(value))
|
Ok(Some(value))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn len(&self) -> Result<usize, std::io::Error> {
|
async fn len(&self, state: &ExtractState) -> Result<usize, std::io::Error> {
|
||||||
|
if !state.ignore_mime && self.item.mime().essence_str() != "application/pdf" {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
let bytes = self.get_bytes().await?;
|
let bytes = self.get_bytes().await?;
|
||||||
let count = tokio::task::spawn_blocking(move || {
|
let count = tokio::task::spawn_blocking(move || {
|
||||||
let pdfium = Pdfium::default();
|
let pdfium = Pdfium::default();
|
||||||
@@ -103,10 +115,10 @@ impl ListExtractor for PdfPagesExtractor {
|
|||||||
|
|
||||||
// Override, extracting all pages is very slow,
|
// Override, extracting all pages is very slow,
|
||||||
// and we can't display binary in json anyway
|
// and we can't display binary in json anyway
|
||||||
async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
|
async fn to_json(&self, state: &ExtractState) -> Result<serde_json::Value, std::io::Error> {
|
||||||
Ok(serde_json::Value::String(format!(
|
Ok(serde_json::Value::String(format!(
|
||||||
"<PdfPages ({} pages)>",
|
"<PdfPages ({} pages)>",
|
||||||
self.len().await?
|
self.len(state).await?
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use std::{
|
|||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{Item, PileValue, SyncReadBridge},
|
value::{Item, PileValue, SyncReadBridge},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -104,6 +104,7 @@ impl PdfTextExtractor {
|
|||||||
impl ObjectExtractor for PdfTextExtractor {
|
impl ObjectExtractor for PdfTextExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
@@ -111,6 +112,10 @@ impl ObjectExtractor for PdfTextExtractor {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !state.ignore_mime && self.item.mime().essence_str() != "application/pdf" {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(self.get_inner().await?.get(name).cloned())
|
Ok(self.get_inner().await?.get(name).cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use tracing::trace;
|
|||||||
|
|
||||||
use super::TomlExtractor;
|
use super::TomlExtractor;
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{Item, PileValue},
|
value::{Item, PileValue},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -26,6 +26,7 @@ impl SidecarExtractor {
|
|||||||
impl ObjectExtractor for SidecarExtractor {
|
impl ObjectExtractor for SidecarExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
@@ -39,7 +40,7 @@ impl ObjectExtractor for SidecarExtractor {
|
|||||||
.output
|
.output
|
||||||
.get_or_init(|| self.item.sidecar().map(TomlExtractor::new))
|
.get_or_init(|| self.item.sidecar().map(TomlExtractor::new))
|
||||||
{
|
{
|
||||||
Some(x) => Ok(x.field(name, args).await?),
|
Some(x) => Ok(x.field(state, name, args).await?),
|
||||||
None => Ok(Some(PileValue::Null)),
|
None => Ok(Some(PileValue::Null)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
extract::traits::ObjectExtractor,
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
value::{AsyncReader, Item, PileValue},
|
value::{AsyncReader, Item, PileValue},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -64,6 +64,7 @@ impl TomlExtractor {
|
|||||||
impl ObjectExtractor for TomlExtractor {
|
impl ObjectExtractor for TomlExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
@@ -71,6 +72,10 @@ impl ObjectExtractor for TomlExtractor {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !state.ignore_mime && self.item.mime().type_() != mime::TEXT {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(self.get_inner().await?.get(name).cloned())
|
Ok(self.get_inner().await?.get(name).cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::{extract::traits::ListExtractor, value::PileValue};
|
use crate::{
|
||||||
|
extract::traits::{ExtractState, ListExtractor},
|
||||||
|
value::PileValue,
|
||||||
|
};
|
||||||
|
|
||||||
pub struct ArrayExtractor {
|
pub struct ArrayExtractor {
|
||||||
inner: Arc<Vec<PileValue>>,
|
inner: Arc<Vec<PileValue>>,
|
||||||
@@ -14,11 +17,15 @@ impl ArrayExtractor {
|
|||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl ListExtractor for ArrayExtractor {
|
impl ListExtractor for ArrayExtractor {
|
||||||
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
|
async fn get(
|
||||||
|
&self,
|
||||||
|
_state: &ExtractState,
|
||||||
|
idx: usize,
|
||||||
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
Ok(self.inner.get(idx).cloned())
|
Ok(self.inner.get(idx).cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn len(&self) -> Result<usize, std::io::Error> {
|
async fn len(&self, _state: &ExtractState) -> Result<usize, std::io::Error> {
|
||||||
Ok(self.inner.len())
|
Ok(self.inner.len())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,10 @@
|
|||||||
use pile_config::Label;
|
use pile_config::Label;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use crate::{extract::traits::ObjectExtractor, value::PileValue};
|
use crate::{
|
||||||
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
|
value::PileValue,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct MapExtractor {
|
pub struct MapExtractor {
|
||||||
@@ -12,6 +15,7 @@ pub struct MapExtractor {
|
|||||||
impl ObjectExtractor for MapExtractor {
|
impl ObjectExtractor for MapExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
_state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
|
|||||||
@@ -1,4 +1,7 @@
|
|||||||
use crate::{extract::traits::ListExtractor, value::PileValue};
|
use crate::{
|
||||||
|
extract::traits::{ExtractState, ListExtractor},
|
||||||
|
value::PileValue,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct VecExtractor {
|
pub struct VecExtractor {
|
||||||
@@ -7,11 +10,15 @@ pub struct VecExtractor {
|
|||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl ListExtractor for VecExtractor {
|
impl ListExtractor for VecExtractor {
|
||||||
async fn get(&self, idx: usize) -> Result<Option<PileValue>, std::io::Error> {
|
async fn get(
|
||||||
|
&self,
|
||||||
|
_state: &ExtractState,
|
||||||
|
idx: usize,
|
||||||
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
Ok(self.inner.get(idx).cloned())
|
Ok(self.inner.get(idx).cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn len(&self) -> Result<usize, std::io::Error> {
|
async fn len(&self, _state: &ExtractState) -> Result<usize, std::io::Error> {
|
||||||
Ok(self.inner.len())
|
Ok(self.inner.len())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,10 @@ use pile_config::Label;
|
|||||||
use smartstring::{LazyCompact, SmartString};
|
use smartstring::{LazyCompact, SmartString};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::{extract::traits::ObjectExtractor, value::PileValue};
|
use crate::{
|
||||||
|
extract::traits::{ExtractState, ObjectExtractor},
|
||||||
|
value::PileValue,
|
||||||
|
};
|
||||||
|
|
||||||
pub struct StringExtractor {
|
pub struct StringExtractor {
|
||||||
item: Arc<SmartString<LazyCompact>>,
|
item: Arc<SmartString<LazyCompact>>,
|
||||||
@@ -18,6 +21,7 @@ impl StringExtractor {
|
|||||||
impl ObjectExtractor for StringExtractor {
|
impl ObjectExtractor for StringExtractor {
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
_state: &ExtractState,
|
||||||
name: &Label,
|
name: &Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<PileValue>, std::io::Error> {
|
) -> Result<Option<PileValue>, std::io::Error> {
|
||||||
@@ -89,7 +93,10 @@ mod tests {
|
|||||||
|
|
||||||
#[expect(clippy::unwrap_used)]
|
#[expect(clippy::unwrap_used)]
|
||||||
async fn field(ext: &StringExtractor, name: &str, args: Option<&str>) -> Option<PileValue> {
|
async fn field(ext: &StringExtractor, name: &str, args: Option<&str>) -> Option<PileValue> {
|
||||||
ext.field(&Label::new(name).unwrap(), args).await.unwrap()
|
let state = ExtractState { ignore_mime: false };
|
||||||
|
ext.field(&state, &Label::new(name).unwrap(), args)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn string(v: Option<PileValue>) -> Option<String> {
|
fn string(v: Option<PileValue>) -> Option<String> {
|
||||||
|
|||||||
@@ -1,3 +1,13 @@
|
|||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ExtractState {
|
||||||
|
/// If true, extract all fields from all items.
|
||||||
|
/// Do not pre-filter using mime type.
|
||||||
|
///
|
||||||
|
/// This may detect additional fields, but
|
||||||
|
/// makes extraction take much longer
|
||||||
|
pub ignore_mime: bool,
|
||||||
|
}
|
||||||
|
|
||||||
/// An attachment that extracts metadata from an [Item].
|
/// An attachment that extracts metadata from an [Item].
|
||||||
///
|
///
|
||||||
/// Metadata is exposed as an immutable map of {label: value},
|
/// Metadata is exposed as an immutable map of {label: value},
|
||||||
@@ -15,6 +25,7 @@ pub trait ObjectExtractor: Send + Sync {
|
|||||||
/// this fn should return `Ok(Some(None))`.
|
/// this fn should return `Ok(Some(None))`.
|
||||||
async fn field(
|
async fn field(
|
||||||
&self,
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
name: &pile_config::Label,
|
name: &pile_config::Label,
|
||||||
args: Option<&str>,
|
args: Option<&str>,
|
||||||
) -> Result<Option<crate::value::PileValue>, std::io::Error>;
|
) -> Result<Option<crate::value::PileValue>, std::io::Error>;
|
||||||
@@ -25,15 +36,15 @@ pub trait ObjectExtractor: Send + Sync {
|
|||||||
async fn fields(&self) -> Result<Vec<pile_config::Label>, std::io::Error>;
|
async fn fields(&self) -> Result<Vec<pile_config::Label>, std::io::Error>;
|
||||||
|
|
||||||
/// Convert this to a JSON value.
|
/// Convert this to a JSON value.
|
||||||
async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
|
async fn to_json(&self, state: &ExtractState) -> Result<serde_json::Value, std::io::Error> {
|
||||||
let keys = self.fields().await?;
|
let keys = self.fields().await?;
|
||||||
let mut map = serde_json::Map::new();
|
let mut map = serde_json::Map::new();
|
||||||
for k in &keys {
|
for k in &keys {
|
||||||
let v = match self.field(k, None).await? {
|
let v = match self.field(state, k, None).await? {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
None => continue,
|
None => continue,
|
||||||
};
|
};
|
||||||
map.insert(k.to_string(), Box::pin(v.to_json()).await?);
|
map.insert(k.to_string(), Box::pin(v.to_json(state)).await?);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(serde_json::Value::Object(map))
|
Ok(serde_json::Value::Object(map))
|
||||||
@@ -49,25 +60,25 @@ pub trait ListExtractor: Send + Sync {
|
|||||||
/// Indices start at zero, and must be consecutive.
|
/// Indices start at zero, and must be consecutive.
|
||||||
/// - returns `None` if `idx` is out of range
|
/// - returns `None` if `idx` is out of range
|
||||||
/// - returns `Some(Null)` if `None` is at `idx`
|
/// - returns `Some(Null)` if `None` is at `idx`
|
||||||
async fn get(&self, idx: usize) -> Result<Option<crate::value::PileValue>, std::io::Error>;
|
async fn get(
|
||||||
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
|
idx: usize,
|
||||||
|
) -> Result<Option<crate::value::PileValue>, std::io::Error>;
|
||||||
|
|
||||||
async fn len(&self) -> Result<usize, std::io::Error>;
|
async fn len(&self, state: &ExtractState) -> Result<usize, std::io::Error>;
|
||||||
|
|
||||||
async fn is_empty(&self) -> Result<bool, std::io::Error> {
|
|
||||||
Ok(self.len().await? == 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Convert this list to a JSON value.
|
/// Convert this list to a JSON value.
|
||||||
async fn to_json(&self) -> Result<serde_json::Value, std::io::Error> {
|
async fn to_json(&self, state: &ExtractState) -> Result<serde_json::Value, std::io::Error> {
|
||||||
let len = self.len().await?;
|
let len = self.len(state).await?;
|
||||||
let mut list = Vec::with_capacity(len);
|
let mut list = Vec::with_capacity(len);
|
||||||
for i in 0..len {
|
for i in 0..len {
|
||||||
#[expect(clippy::expect_used)]
|
#[expect(clippy::expect_used)]
|
||||||
let v = self
|
let v = self
|
||||||
.get(i)
|
.get(state, i)
|
||||||
.await?
|
.await?
|
||||||
.expect("value must be present according to length");
|
.expect("value must be present according to length");
|
||||||
list.push(Box::pin(v.to_json()).await?);
|
list.push(Box::pin(v.to_json(state)).await?);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(serde_json::Value::Array(list))
|
Ok(serde_json::Value::Array(list))
|
||||||
|
|||||||
@@ -92,10 +92,15 @@ impl S3DataSource {
|
|||||||
|
|
||||||
async fn make_item(self: &Arc<Self>, key: impl Into<SmartString<LazyCompact>>) -> Item {
|
async fn make_item(self: &Arc<Self>, key: impl Into<SmartString<LazyCompact>>) -> Item {
|
||||||
let key: SmartString<LazyCompact> = key.into();
|
let key: SmartString<LazyCompact> = key.into();
|
||||||
let mime = mime_guess::from_path(key.as_str()).first_or_octet_stream();
|
let object_path = match &self.prefix {
|
||||||
|
Some(x) => format!("{x}/{key}").into(),
|
||||||
|
None => key.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mime = mime_guess::from_path(object_path.as_str()).first_or_octet_stream();
|
||||||
|
|
||||||
let sidecar = if self.sidecars {
|
let sidecar = if self.sidecars {
|
||||||
self.find_sidecar_key(key.as_str())
|
self.find_sidecar_key(object_path.as_str())
|
||||||
.await
|
.await
|
||||||
.map(|sidecar_key| {
|
.map(|sidecar_key| {
|
||||||
Box::new(Item::S3 {
|
Box::new(Item::S3 {
|
||||||
@@ -124,11 +129,17 @@ impl DataSource for Arc<S3DataSource> {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let key: SmartString<LazyCompact> = key.into();
|
||||||
|
let key = match &self.prefix {
|
||||||
|
Some(x) => format!("{x}/{key}").into(),
|
||||||
|
None => key,
|
||||||
|
};
|
||||||
|
|
||||||
let result = self
|
let result = self
|
||||||
.client
|
.client
|
||||||
.head_object()
|
.head_object()
|
||||||
.bucket(self.bucket.as_str())
|
.bucket(self.bucket.as_str())
|
||||||
.key(key)
|
.key(key.as_str())
|
||||||
.send()
|
.send()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use crate::{
|
|||||||
item::ItemExtractor,
|
item::ItemExtractor,
|
||||||
misc::{ArrayExtractor, MapExtractor, VecExtractor},
|
misc::{ArrayExtractor, MapExtractor, VecExtractor},
|
||||||
string::StringExtractor,
|
string::StringExtractor,
|
||||||
traits::{ListExtractor, ObjectExtractor},
|
traits::{ExtractState, ListExtractor, ObjectExtractor},
|
||||||
},
|
},
|
||||||
value::Item,
|
value::Item,
|
||||||
};
|
};
|
||||||
@@ -91,7 +91,11 @@ impl PileValue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn query(&self, query: &ObjectPath) -> Result<Option<Self>, std::io::Error> {
|
pub async fn query(
|
||||||
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
|
query: &ObjectPath,
|
||||||
|
) -> Result<Option<Self>, std::io::Error> {
|
||||||
let mut out: Option<PileValue> = Some(self.clone());
|
let mut out: Option<PileValue> = Some(self.clone());
|
||||||
|
|
||||||
for s in &query.segments {
|
for s in &query.segments {
|
||||||
@@ -106,7 +110,7 @@ impl PileValue {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
out = e.field(name, args.as_deref()).await?;
|
out = e.field(state, name, args.as_deref()).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
PathSegment::Index(idx) => {
|
PathSegment::Index(idx) => {
|
||||||
@@ -121,7 +125,7 @@ impl PileValue {
|
|||||||
let idx = if *idx >= 0 {
|
let idx = if *idx >= 0 {
|
||||||
usize::try_from(*idx).ok()
|
usize::try_from(*idx).ok()
|
||||||
} else {
|
} else {
|
||||||
usize::try_from(e.len().await? as i64 - idx).ok()
|
usize::try_from(e.len(state).await? as i64 - idx).ok()
|
||||||
};
|
};
|
||||||
|
|
||||||
let idx = match idx {
|
let idx = match idx {
|
||||||
@@ -132,7 +136,7 @@ impl PileValue {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
out = e.get(idx).await?;
|
out = e.get(state, idx).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -147,7 +151,10 @@ impl PileValue {
|
|||||||
/// - `ObjectExtractor` is recursed into; returns `Some(Object(map))` with
|
/// - `ObjectExtractor` is recursed into; returns `Some(Object(map))` with
|
||||||
/// only the fields that had data, or `None` if all fields were absent.
|
/// only the fields that had data, or `None` if all fields were absent.
|
||||||
/// - `Array` / `ListExtractor` are treated as opaque leaf values (not descended into).
|
/// - `Array` / `ListExtractor` are treated as opaque leaf values (not descended into).
|
||||||
pub async fn count_fields(&self) -> Result<Option<Value>, std::io::Error> {
|
pub async fn count_fields(
|
||||||
|
&self,
|
||||||
|
state: &ExtractState,
|
||||||
|
) -> Result<Option<Value>, std::io::Error> {
|
||||||
Ok(match self {
|
Ok(match self {
|
||||||
Self::Null => None,
|
Self::Null => None,
|
||||||
|
|
||||||
@@ -156,18 +163,18 @@ impl PileValue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Self::Array(x) => (!x.is_empty()).then(|| Value::Number(1u64.into())),
|
Self::Array(x) => (!x.is_empty()).then(|| Value::Number(1u64.into())),
|
||||||
Self::ListExtractor(x) => (!x.is_empty().await?).then(|| Value::Number(1u64.into())),
|
Self::ListExtractor(x) => (x.len(state).await? > 0).then(|| Value::Number(1u64.into())),
|
||||||
|
|
||||||
Self::ObjectExtractor(_) | Self::Item(_) => {
|
Self::ObjectExtractor(_) | Self::Item(_) => {
|
||||||
let e = self.object_extractor();
|
let e = self.object_extractor();
|
||||||
let keys = e.fields().await?;
|
let keys = e.fields().await?;
|
||||||
let mut map = Map::new();
|
let mut map = Map::new();
|
||||||
for k in &keys {
|
for k in &keys {
|
||||||
let v = match e.field(k, None).await? {
|
let v = match e.field(state, k, None).await? {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
None => continue,
|
None => continue,
|
||||||
};
|
};
|
||||||
if let Some(counted) = Box::pin(v.count_fields()).await? {
|
if let Some(counted) = Box::pin(v.count_fields(state)).await? {
|
||||||
map.insert(k.to_string(), counted);
|
map.insert(k.to_string(), counted);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -187,7 +194,7 @@ impl PileValue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn to_json(&self) -> Result<Value, std::io::Error> {
|
pub async fn to_json(&self, state: &ExtractState) -> Result<Value, std::io::Error> {
|
||||||
Ok(match self {
|
Ok(match self {
|
||||||
Self::Null => Value::Null,
|
Self::Null => Value::Null,
|
||||||
Self::U64(x) => Value::Number((*x).into()),
|
Self::U64(x) => Value::Number((*x).into()),
|
||||||
@@ -201,12 +208,12 @@ impl PileValue {
|
|||||||
|
|
||||||
Self::Array(_) | Self::ListExtractor(_) => {
|
Self::Array(_) | Self::ListExtractor(_) => {
|
||||||
let e = self.list_extractor();
|
let e = self.list_extractor();
|
||||||
return e.to_json().await;
|
return e.to_json(state).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
Self::ObjectExtractor(_) | Self::Item(_) => {
|
Self::ObjectExtractor(_) | Self::Item(_) => {
|
||||||
let e = self.object_extractor();
|
let e = self.object_extractor();
|
||||||
return e.to_json().await;
|
return e.to_json(state).await;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ pile-dataset = { workspace = true, features = ["axum", "pdfium"] }
|
|||||||
pile-value = { workspace = true, features = ["pdfium"] }
|
pile-value = { workspace = true, features = ["pdfium"] }
|
||||||
pile-config = { workspace = true }
|
pile-config = { workspace = true }
|
||||||
|
|
||||||
|
aws-sdk-s3 = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
tracing-subscriber = { workspace = true }
|
tracing-subscriber = { workspace = true }
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use anstyle::{AnsiColor, Color, Style};
|
use anstyle::{AnsiColor, Color, Style};
|
||||||
|
use indicatif::ProgressStyle;
|
||||||
|
|
||||||
pub fn clap_styles() -> clap::builder::Styles {
|
pub fn clap_styles() -> clap::builder::Styles {
|
||||||
clap::builder::Styles::styled()
|
clap::builder::Styles::styled()
|
||||||
@@ -36,7 +37,6 @@ pub fn clap_styles() -> clap::builder::Styles {
|
|||||||
.placeholder(Style::new().fg_color(Some(Color::Ansi(AnsiColor::White))))
|
.placeholder(Style::new().fg_color(Some(Color::Ansi(AnsiColor::White))))
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
#[expect(clippy::unwrap_used)]
|
#[expect(clippy::unwrap_used)]
|
||||||
pub fn progress_big() -> ProgressStyle {
|
pub fn progress_big() -> ProgressStyle {
|
||||||
return ProgressStyle::default_bar()
|
return ProgressStyle::default_bar()
|
||||||
@@ -50,6 +50,7 @@ pub fn progress_big() -> ProgressStyle {
|
|||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
#[expect(clippy::unwrap_used)]
|
#[expect(clippy::unwrap_used)]
|
||||||
pub fn spinner_small() -> ProgressStyle {
|
pub fn spinner_small() -> ProgressStyle {
|
||||||
return ProgressStyle::default_bar()
|
return ProgressStyle::default_bar()
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ use pile_config::{Label, Source};
|
|||||||
use pile_dataset::{Datasets, index::DbFtsIndex};
|
use pile_dataset::{Datasets, index::DbFtsIndex};
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::{
|
use pile_value::{
|
||||||
|
extract::traits::ExtractState,
|
||||||
source::{DataSource, DirDataSource},
|
source::{DataSource, DirDataSource},
|
||||||
value::{Item, PileValue},
|
value::{Item, PileValue},
|
||||||
};
|
};
|
||||||
@@ -46,6 +47,7 @@ impl CliCmd for AnnotateCommand {
|
|||||||
.ok_or_else(|| anyhow::anyhow!("invalid field name {:?}", self.field))?;
|
.ok_or_else(|| anyhow::anyhow!("invalid field name {:?}", self.field))?;
|
||||||
let dest_path = Self::parse_dest(&self.dest)?;
|
let dest_path = Self::parse_dest(&self.dest)?;
|
||||||
|
|
||||||
|
let state = ExtractState { ignore_mime: false };
|
||||||
let ds = Datasets::open(&self.config)
|
let ds = Datasets::open(&self.config)
|
||||||
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||||
|
|
||||||
@@ -58,7 +60,7 @@ impl CliCmd for AnnotateCommand {
|
|||||||
|
|
||||||
for (name, source) in &ds.config.dataset.source {
|
for (name, source) in &ds.config.dataset.source {
|
||||||
match source {
|
match source {
|
||||||
Source::Filesystem { path, sidecars } => {
|
Source::Filesystem { path, sidecars, .. } => {
|
||||||
if !sidecars {
|
if !sidecars {
|
||||||
warn!("Source {name} does not have sidecars enabled, skipping");
|
warn!("Source {name} does not have sidecars enabled, skipping");
|
||||||
continue;
|
continue;
|
||||||
@@ -75,8 +77,10 @@ impl CliCmd for AnnotateCommand {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let item = PileValue::Item(item.clone());
|
let item = PileValue::Item(item.clone());
|
||||||
let Some(value) =
|
let Some(value) = index
|
||||||
index.get_field(&item, &field).await.with_context(|| {
|
.get_field(&state, &item, &field)
|
||||||
|
.await
|
||||||
|
.with_context(|| {
|
||||||
format!("while extracting field from {}", path.display())
|
format!("while extracting field from {}", path.display())
|
||||||
})?
|
})?
|
||||||
else {
|
else {
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ use anyhow::{Context, Result};
|
|||||||
use clap::Args;
|
use clap::Args;
|
||||||
use pile_dataset::Datasets;
|
use pile_dataset::Datasets;
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::value::PileValue;
|
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||||
use serde_json::{Map, Value};
|
use serde_json::{Map, Value};
|
||||||
use std::{path::PathBuf, time::Instant};
|
use std::{path::PathBuf, time::Instant};
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
@@ -61,6 +61,7 @@ impl CliCmd for FieldsCommand {
|
|||||||
let mut total_counts: Map<String, Value> = Map::new();
|
let mut total_counts: Map<String, Value> = Map::new();
|
||||||
let mut total_items = 0u64;
|
let mut total_items = 0u64;
|
||||||
let jobs = self.jobs.max(1);
|
let jobs = self.jobs.max(1);
|
||||||
|
let state = ExtractState { ignore_mime: false };
|
||||||
|
|
||||||
for (name, dataset) in ds.sources.iter().filter(|(name, _)| {
|
for (name, dataset) in ds.sources.iter().filter(|(name, _)| {
|
||||||
self.source.is_empty() || self.source.iter().any(|s| s == name.as_str())
|
self.source.is_empty() || self.source.iter().any(|s| s == name.as_str())
|
||||||
@@ -93,9 +94,10 @@ impl CliCmd for FieldsCommand {
|
|||||||
let item =
|
let item =
|
||||||
item_result.with_context(|| format!("while reading source {name}"))?;
|
item_result.with_context(|| format!("while reading source {name}"))?;
|
||||||
let name = name.clone();
|
let name = name.clone();
|
||||||
|
let state = state.clone();
|
||||||
join_set.spawn(async move {
|
join_set.spawn(async move {
|
||||||
let item = PileValue::Item(item);
|
let item = PileValue::Item(item);
|
||||||
let result = item.count_fields().await.with_context(|| {
|
let result = item.count_fields(&state).await.with_context(|| {
|
||||||
format!("while counting fields in source {name}")
|
format!("while counting fields in source {name}")
|
||||||
})?;
|
})?;
|
||||||
Ok(result.and_then(|v| {
|
Ok(result.and_then(|v| {
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use anyhow::{Context, Result};
|
|||||||
use clap::Args;
|
use clap::Args;
|
||||||
use pile_dataset::Datasets;
|
use pile_dataset::Datasets;
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
|
use pile_value::extract::traits::ExtractState;
|
||||||
use std::{fmt::Debug, path::PathBuf};
|
use std::{fmt::Debug, path::PathBuf};
|
||||||
|
|
||||||
use crate::{CliCmd, GlobalContext};
|
use crate::{CliCmd, GlobalContext};
|
||||||
@@ -26,14 +27,17 @@ impl CliCmd for IndexCommand {
|
|||||||
let ds = Datasets::open(&self.config)
|
let ds = Datasets::open(&self.config)
|
||||||
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||||
|
|
||||||
ds.fts_refresh(self.jobs, Some(flag)).await.map_err(|x| {
|
let state = ExtractState { ignore_mime: false };
|
||||||
x.map_err(|x| {
|
ds.fts_refresh(&state, self.jobs, Some(flag))
|
||||||
anyhow::Error::from(x).context(format!(
|
.await
|
||||||
"while refreshing fts for {}",
|
.map_err(|x| {
|
||||||
self.config.display()
|
x.map_err(|x| {
|
||||||
))
|
anyhow::Error::from(x).context(format!(
|
||||||
})
|
"while refreshing fts for {}",
|
||||||
})?;
|
self.config.display()
|
||||||
|
))
|
||||||
|
})
|
||||||
|
})?;
|
||||||
|
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use clap::Args;
|
|||||||
use pile_config::objectpath::ObjectPath;
|
use pile_config::objectpath::ObjectPath;
|
||||||
use pile_dataset::Datasets;
|
use pile_dataset::Datasets;
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::value::PileValue;
|
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||||
use std::{path::PathBuf, str::FromStr, sync::Arc};
|
use std::{path::PathBuf, str::FromStr, sync::Arc};
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
@@ -48,6 +48,7 @@ impl CliCmd for ListCommand {
|
|||||||
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||||
|
|
||||||
let jobs = self.jobs.max(1);
|
let jobs = self.jobs.max(1);
|
||||||
|
let state = ExtractState { ignore_mime: false };
|
||||||
|
|
||||||
for (name, dataset) in ds.sources.iter().filter(|(name, _)| {
|
for (name, dataset) in ds.sources.iter().filter(|(name, _)| {
|
||||||
self.source.is_empty() || self.source.iter().any(|s| s == name.as_str())
|
self.source.is_empty() || self.source.iter().any(|s| s == name.as_str())
|
||||||
@@ -78,10 +79,11 @@ impl CliCmd for ListCommand {
|
|||||||
let key = item.key().to_string();
|
let key = item.key().to_string();
|
||||||
let path = path.clone();
|
let path = path.clone();
|
||||||
let invert = self.invert;
|
let invert = self.invert;
|
||||||
|
let state = state.clone();
|
||||||
|
|
||||||
join_set.spawn(async move {
|
join_set.spawn(async move {
|
||||||
let item = PileValue::Item(item);
|
let item = PileValue::Item(item);
|
||||||
let value = item.query(&path).await?;
|
let value = item.query(&state, &path).await?;
|
||||||
|
|
||||||
let is_present =
|
let is_present =
|
||||||
matches!(value, Some(v) if !matches!(v, PileValue::Null));
|
matches!(value, Some(v) if !matches!(v, PileValue::Null));
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use anyhow::{Context, Result};
|
|||||||
use clap::Args;
|
use clap::Args;
|
||||||
use pile_dataset::Datasets;
|
use pile_dataset::Datasets;
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
|
use pile_value::extract::traits::ExtractState;
|
||||||
use std::{fmt::Debug, path::PathBuf};
|
use std::{fmt::Debug, path::PathBuf};
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
@@ -42,16 +43,20 @@ impl CliCmd for LookupCommand {
|
|||||||
let ds = Datasets::open(&self.config)
|
let ds = Datasets::open(&self.config)
|
||||||
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||||
|
|
||||||
|
let state = ExtractState { ignore_mime: false };
|
||||||
|
|
||||||
if self.refresh && ds.needs_fts().await.context("while checking dataset fts")? {
|
if self.refresh && ds.needs_fts().await.context("while checking dataset fts")? {
|
||||||
info!("FTS index is missing or out-of-date, regenerating");
|
info!("FTS index is missing or out-of-date, regenerating");
|
||||||
ds.fts_refresh(self.jobs, Some(flag)).await.map_err(|x| {
|
ds.fts_refresh(&state, self.jobs, Some(flag))
|
||||||
x.map_err(|x| {
|
.await
|
||||||
anyhow::Error::from(x).context(format!(
|
.map_err(|x| {
|
||||||
"while refreshing fts for {}",
|
x.map_err(|x| {
|
||||||
self.config.display()
|
anyhow::Error::from(x).context(format!(
|
||||||
))
|
"while refreshing fts for {}",
|
||||||
})
|
self.config.display()
|
||||||
})?;
|
))
|
||||||
|
})
|
||||||
|
})?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let results = ds
|
let results = ds
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ mod list;
|
|||||||
mod lookup;
|
mod lookup;
|
||||||
mod probe;
|
mod probe;
|
||||||
mod serve;
|
mod serve;
|
||||||
|
mod upload;
|
||||||
|
|
||||||
use crate::{Cli, GlobalContext};
|
use crate::{Cli, GlobalContext};
|
||||||
|
|
||||||
@@ -60,7 +61,7 @@ pub enum SubCommand {
|
|||||||
},
|
},
|
||||||
|
|
||||||
/// Print an overview of all fields present in this dataset
|
/// Print an overview of all fields present in this dataset
|
||||||
Overview {
|
Fields {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
cmd: fields::FieldsCommand,
|
cmd: fields::FieldsCommand,
|
||||||
},
|
},
|
||||||
@@ -76,6 +77,12 @@ pub enum SubCommand {
|
|||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
cmd: serve::ServeCommand,
|
cmd: serve::ServeCommand,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Upload a filesystem source to an S3 source
|
||||||
|
Upload {
|
||||||
|
#[command(flatten)]
|
||||||
|
cmd: upload::UploadCommand,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CliCmdDispatch for SubCommand {
|
impl CliCmdDispatch for SubCommand {
|
||||||
@@ -87,9 +94,10 @@ impl CliCmdDispatch for SubCommand {
|
|||||||
Self::Index { cmd } => cmd.start(ctx),
|
Self::Index { cmd } => cmd.start(ctx),
|
||||||
Self::List { cmd } => cmd.start(ctx),
|
Self::List { cmd } => cmd.start(ctx),
|
||||||
Self::Lookup { cmd } => cmd.start(ctx),
|
Self::Lookup { cmd } => cmd.start(ctx),
|
||||||
Self::Overview { cmd } => cmd.start(ctx),
|
Self::Fields { cmd } => cmd.start(ctx),
|
||||||
Self::Probe { cmd } => cmd.start(ctx),
|
Self::Probe { cmd } => cmd.start(ctx),
|
||||||
Self::Serve { cmd } => cmd.start(ctx),
|
Self::Serve { cmd } => cmd.start(ctx),
|
||||||
|
Self::Upload { cmd } => cmd.start(ctx),
|
||||||
|
|
||||||
Self::Docs {} => {
|
Self::Docs {} => {
|
||||||
print_help_recursively(&mut Cli::command(), None);
|
print_help_recursively(&mut Cli::command(), None);
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use clap::Args;
|
|||||||
use pile_config::{Label, objectpath::ObjectPath};
|
use pile_config::{Label, objectpath::ObjectPath};
|
||||||
use pile_dataset::Datasets;
|
use pile_dataset::Datasets;
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
use pile_value::value::PileValue;
|
use pile_value::{extract::traits::ExtractState, value::PileValue};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use crate::{CliCmd, GlobalContext};
|
use crate::{CliCmd, GlobalContext};
|
||||||
@@ -39,12 +39,14 @@ impl CliCmd for ProbeCommand {
|
|||||||
let ds = Datasets::open(&self.config)
|
let ds = Datasets::open(&self.config)
|
||||||
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||||
|
|
||||||
|
let state = ExtractState { ignore_mime: false };
|
||||||
|
|
||||||
let json = if let Some(path_str) = self.path {
|
let json = if let Some(path_str) = self.path {
|
||||||
let path: ObjectPath = path_str
|
let path: ObjectPath = path_str
|
||||||
.parse()
|
.parse()
|
||||||
.with_context(|| format!("invalid path {path_str:?}"))?;
|
.with_context(|| format!("invalid path {path_str:?}"))?;
|
||||||
|
|
||||||
ds.get_field(&source, &self.key, &path)
|
ds.get_field(&state, &source, &self.key, &path)
|
||||||
.await
|
.await
|
||||||
.with_context(|| format!("while extracting {}", self.key))?
|
.with_context(|| format!("while extracting {}", self.key))?
|
||||||
.ok_or_else(|| {
|
.ok_or_else(|| {
|
||||||
@@ -56,7 +58,7 @@ impl CliCmd for ProbeCommand {
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
let item = PileValue::Item(item);
|
let item = PileValue::Item(item);
|
||||||
item.to_json()
|
item.to_json(&state)
|
||||||
.await
|
.await
|
||||||
.with_context(|| format!("while extracting {}", self.key))?
|
.with_context(|| format!("while extracting {}", self.key))?
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use anyhow::{Context, Result};
|
|||||||
use clap::Args;
|
use clap::Args;
|
||||||
use pile_dataset::Datasets;
|
use pile_dataset::Datasets;
|
||||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
|
use pile_value::extract::traits::ExtractState;
|
||||||
use std::{fmt::Debug, path::PathBuf, sync::Arc};
|
use std::{fmt::Debug, path::PathBuf, sync::Arc};
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
|
|
||||||
@@ -35,9 +36,11 @@ impl CliCmd for ServeCommand {
|
|||||||
let ds = Datasets::open(&self.config)
|
let ds = Datasets::open(&self.config)
|
||||||
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
.with_context(|| format!("while opening dataset for {}", self.config.display()))?;
|
||||||
|
|
||||||
|
let state = ExtractState { ignore_mime: false };
|
||||||
|
|
||||||
if self.refresh && ds.needs_fts().await.context("while checking dataset fts")? {
|
if self.refresh && ds.needs_fts().await.context("while checking dataset fts")? {
|
||||||
info!("FTS index is missing or out-of-date, regenerating");
|
info!("FTS index is missing or out-of-date, regenerating");
|
||||||
ds.fts_refresh(self.jobs, Some(flag.clone()))
|
ds.fts_refresh(&state, self.jobs, Some(flag.clone()))
|
||||||
.await
|
.await
|
||||||
.map_err(|x| {
|
.map_err(|x| {
|
||||||
x.map_err(|x| {
|
x.map_err(|x| {
|
||||||
|
|||||||
272
crates/pile/src/command/upload.rs
Normal file
272
crates/pile/src/command/upload.rs
Normal file
@@ -0,0 +1,272 @@
|
|||||||
|
use anyhow::{Context, Result};
|
||||||
|
use aws_sdk_s3::primitives::ByteStream;
|
||||||
|
use clap::Args;
|
||||||
|
use indicatif::ProgressBar;
|
||||||
|
use pile_config::Label;
|
||||||
|
use pile_dataset::{Dataset, Datasets};
|
||||||
|
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||||
|
use pile_value::source::{DataSource, DirDataSource, S3DataSource};
|
||||||
|
use std::{path::PathBuf, sync::Arc, time::Duration};
|
||||||
|
use tokio::task::JoinSet;
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
use crate::{CliCmd, GlobalContext, cli::progress_big};
|
||||||
|
|
||||||
|
// CLI arguments for `pile upload`: mirrors a local filesystem source into an
// S3-compatible source under a caller-chosen key prefix.
//
// NOTE: the `///` doc comments on the fields below double as clap's --help
// text, so they are user-visible strings and must not be reworded casually.
#[derive(Debug, Args)]
pub struct UploadCommand {
    /// Name of the filesystem source to upload from
    dir_source: String,

    /// Name of the S3 source to upload to
    s3_source: String,

    /// Prefix path under the S3 source to upload files to
    prefix: String,

    /// Path to dataset config
    #[arg(long, short = 'c', default_value = "./pile.toml")]
    config: PathBuf,

    /// Override the S3 bucket from pile.toml
    #[arg(long)]
    bucket: Option<String>,

    /// Allow overwriting files that already exist at the target prefix
    #[arg(long)]
    overwrite: bool,

    /// Delete all files at the target prefix before uploading
    #[arg(long)]
    delete_existing_forever: bool,

    /// Number of parallel upload jobs
    #[arg(long, short = 'j', default_value = "5")]
    jobs: usize,
}
|
||||||
|
|
||||||
|
impl CliCmd for UploadCommand {
    /// Upload every file of the configured filesystem source to the S3 source
    /// under `self.prefix`, running up to `self.jobs` uploads concurrently.
    ///
    /// Returns the process exit code (0 on success). Cancellation via `flag`
    /// aborts in-flight upload tasks and surfaces as
    /// `CancelableTaskError::Cancelled`.
    async fn run(
        self,
        ctx: GlobalContext,
        flag: CancelFlag,
    ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
        let ds = Datasets::open(&self.config)
            .with_context(|| format!("while opening dataset for {}", self.config.display()))?;

        // Validate both source names up front so a typo fails before any I/O.
        let dir_label = Label::new(&self.dir_source)
            .ok_or_else(|| anyhow::anyhow!("invalid source name: {}", self.dir_source))?;
        let s3_label = Label::new(&self.s3_source)
            .ok_or_else(|| anyhow::anyhow!("invalid source name: {}", self.s3_source))?;

        let dir_ds: Arc<DirDataSource> = get_dir_source(&ds, &dir_label, &self.dir_source)?;
        let s3_ds: Arc<S3DataSource> = get_s3_source(&ds, &s3_label, &self.s3_source)?;

        // --bucket overrides the bucket configured for the S3 source.
        let bucket = self
            .bucket
            .as_deref()
            .unwrap_or(s3_ds.bucket.as_str())
            .to_owned();
        let full_prefix = self.prefix.trim_matches('/').to_owned();

        // Check for existing objects at the target prefix.
        // NOTE(review): this is a plain string-prefix listing, so a prefix of
        // "data" would also match keys under "database/" — confirm whether a
        // trailing '/' should be appended before listing.
        let existing_keys = list_prefix(&s3_ds.client, &bucket, &full_prefix)
            .await
            .context("while checking for existing objects at target prefix")?;

        if !existing_keys.is_empty() {
            if self.delete_existing_forever {
                info!(
                    "Deleting {} existing object(s) at '{}'",
                    existing_keys.len(),
                    full_prefix
                );
                // Deletions are sequential (one request per key); acceptable
                // for modest prefixes.
                for key in &existing_keys {
                    s3_ds
                        .client
                        .delete_object()
                        .bucket(&bucket)
                        .key(key)
                        .send()
                        .await
                        .with_context(|| format!("while deleting existing object '{key}'"))?;
                }
            } else if !self.overwrite {
                return Err(anyhow::anyhow!(
                    "{} file(s) already exist at '{}'. \
                     Pass --overwrite to allow overwriting, \
                     or --delete-existing-forever to delete them first.",
                    existing_keys.len(),
                    full_prefix
                )
                .into());
            }
            // If --overwrite was passed, fall through and upload over the
            // existing objects.
        }

        // Count total files before uploading so we can show accurate progress.
        // The source is walked twice: once here to count, once below to upload.
        let total = {
            let mut count = 0u64;
            let mut count_stream = Arc::clone(&dir_ds).iter();
            while let Some(result) = count_stream.next().await {
                result.context("while counting filesystem source")?;
                count += 1;
            }
            count
        };

        // Walk filesystem source and upload files in parallel.
        let jobs = self.jobs.max(1);
        let mut uploaded: u64 = 0;
        let mut stream = Arc::clone(&dir_ds).iter();
        let mut join_set: JoinSet<Result<String, anyhow::Error>> = JoinSet::new();

        let pb = ctx.mp.add(ProgressBar::new(total));
        pb.set_style(progress_big());
        pb.enable_steady_tick(Duration::from_millis(100));
        pb.set_message(full_prefix.clone());

        loop {
            // Drain completed tasks before checking for cancellation or new
            // work; this caps in-flight uploads at `jobs`.
            while join_set.len() >= jobs {
                match join_set.join_next().await {
                    Some(Ok(Ok(key))) => {
                        info!("Uploaded {key}");
                        pb.set_message(key);
                        pb.inc(1);
                        uploaded += 1;
                    }
                    Some(Ok(Err(e))) => return Err(e.into()),
                    Some(Err(e)) => return Err(anyhow::anyhow!("upload task panicked: {e}").into()),
                    None => break,
                }
            }

            // Cancellation is only observed between spawns; in-flight uploads
            // are aborted rather than awaited to completion.
            if flag.is_cancelled() {
                join_set.abort_all();
                return Err(CancelableTaskError::Cancelled);
            }

            let item = match stream.next().await {
                None => break,
                Some(Err(e)) => {
                    return Err(anyhow::Error::from(e)
                        .context("while iterating filesystem source")
                        .into());
                }
                Some(Ok(item)) => item,
            };

            // Derive the object key from the file's path relative to the
            // source root, preserving directory structure under the prefix.
            let item_path = PathBuf::from(item.key().as_str());
            let relative = item_path.strip_prefix(&dir_ds.dir).with_context(|| {
                format!("path '{}' is not under source root", item_path.display())
            })?;
            let relative_str = relative
                .to_str()
                .ok_or_else(|| anyhow::anyhow!("non-UTF-8 path: {}", item_path.display()))?
                .to_owned();

            // NOTE(review): if `full_prefix` is empty (prefix was "" or "/"),
            // this produces keys with a leading '/' — confirm that is intended.
            let key = format!("{full_prefix}/{relative_str}");
            let mime = item.mime().to_string();
            let client = Arc::clone(&s3_ds.client);
            let bucket = bucket.clone();

            // Each task owns its path/key/mime and a cloned client handle so
            // it can run independently of this loop.
            join_set.spawn(async move {
                let body = ByteStream::from_path(&item_path)
                    .await
                    .with_context(|| format!("while opening '{}'", item_path.display()))?;

                client
                    .put_object()
                    .bucket(&bucket)
                    .key(&key)
                    .content_type(&mime)
                    .body(body)
                    .send()
                    .await
                    .with_context(|| {
                        format!("while uploading '{}' to '{key}'", item_path.display())
                    })?;

                Ok(key)
            });
        }

        // Drain remaining tasks after the source stream is exhausted.
        while let Some(result) = join_set.join_next().await {
            match result {
                Ok(Ok(key)) => {
                    info!("Uploaded {key}");
                    pb.set_message(key);
                    pb.inc(1);
                    uploaded += 1;
                }
                Ok(Err(e)) => return Err(e.into()),
                Err(e) => return Err(anyhow::anyhow!("upload task panicked: {e}").into()),
            }
        }

        pb.finish_and_clear();
        info!("Done: uploaded {uploaded} file(s) to '{full_prefix}'");
        Ok(0)
    }
}
|
||||||
|
|
||||||
|
fn get_dir_source(
|
||||||
|
ds: &Datasets,
|
||||||
|
label: &Label,
|
||||||
|
name: &str,
|
||||||
|
) -> Result<Arc<DirDataSource>, anyhow::Error> {
|
||||||
|
match ds.sources.get(label) {
|
||||||
|
Some(Dataset::Dir(d)) => Ok(Arc::clone(d)),
|
||||||
|
Some(_) => Err(anyhow::anyhow!(
|
||||||
|
"source '{name}' is not a filesystem source"
|
||||||
|
)),
|
||||||
|
None => Err(anyhow::anyhow!("source '{name}' not found in config")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_s3_source(
|
||||||
|
ds: &Datasets,
|
||||||
|
label: &Label,
|
||||||
|
name: &str,
|
||||||
|
) -> Result<Arc<S3DataSource>, anyhow::Error> {
|
||||||
|
match ds.sources.get(label) {
|
||||||
|
Some(Dataset::S3(s)) => Ok(Arc::clone(s)),
|
||||||
|
Some(_) => Err(anyhow::anyhow!("source '{name}' is not an S3 source")),
|
||||||
|
None => Err(anyhow::anyhow!("source '{name}' not found in config")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all S3 object keys under the given prefix.
|
||||||
|
async fn list_prefix(
|
||||||
|
client: &aws_sdk_s3::Client,
|
||||||
|
bucket: &str,
|
||||||
|
prefix: &str,
|
||||||
|
) -> Result<Vec<String>> {
|
||||||
|
let mut keys = Vec::new();
|
||||||
|
let mut continuation_token: Option<String> = None;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let mut req = client.list_objects_v2().bucket(bucket).prefix(prefix);
|
||||||
|
|
||||||
|
if let Some(token) = continuation_token {
|
||||||
|
req = req.continuation_token(token);
|
||||||
|
}
|
||||||
|
|
||||||
|
let resp = req.send().await.context("list_objects_v2 failed")?;
|
||||||
|
|
||||||
|
for obj in resp.contents() {
|
||||||
|
if let Some(k) = obj.key() {
|
||||||
|
keys.push(k.to_owned());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !resp.is_truncated().unwrap_or(false) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
continuation_token = resp.next_continuation_token().map(ToOwned::to_owned);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(keys)
|
||||||
|
}
|
||||||
@@ -35,8 +35,7 @@ struct Cli {
|
|||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct GlobalContext {
|
pub struct GlobalContext {
|
||||||
#[expect(dead_code)]
|
pub mp: MultiProgress,
|
||||||
mp: MultiProgress,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() -> ExitCode {
|
fn main() -> ExitCode {
|
||||||
|
|||||||
Reference in New Issue
Block a user