Compare commits
2 Commits
9b8f825667
...
d3ab2684f4
| Author | SHA1 | Date | |
|---|---|---|---|
| d3ab2684f4 | |||
| 4d4e9c93a2 |
15
Cargo.lock
generated
15
Cargo.lock
generated
@@ -2016,7 +2016,6 @@ version = "0.0.2"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"pile-dataset",
|
||||
"reqwest",
|
||||
"serde",
|
||||
@@ -2098,6 +2097,7 @@ dependencies = [
|
||||
"id3",
|
||||
"image",
|
||||
"kamadak-exif",
|
||||
"md5",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"pdf",
|
||||
@@ -2109,6 +2109,8 @@ dependencies = [
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha1",
|
||||
"sha2 0.11.0-rc.5",
|
||||
"smartstring",
|
||||
"strum",
|
||||
"tokio",
|
||||
@@ -2654,6 +2656,17 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1"
|
||||
version = "0.10.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures",
|
||||
"digest 0.10.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha2"
|
||||
version = "0.10.9"
|
||||
|
||||
@@ -111,6 +111,8 @@ bytes = "1"
|
||||
toml = "1.0.3"
|
||||
toml_edit = "0.25.4"
|
||||
sha2 = "0.11.0-rc.5"
|
||||
sha1 = "0.10"
|
||||
md5 = "0.7"
|
||||
blake3 = "1.8.3"
|
||||
dotenvy = "0.15.7"
|
||||
envy = "0.4.2"
|
||||
|
||||
@@ -11,7 +11,6 @@ workspace = true
|
||||
pile-dataset = { workspace = true, features = ["axum"] }
|
||||
|
||||
reqwest = { version = "0.12", features = ["json", "stream"] }
|
||||
futures-core = "0.3"
|
||||
serde = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
|
||||
@@ -3,10 +3,8 @@ use axum::{
|
||||
routing::any,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use futures_core::Stream;
|
||||
use reqwest::{Client, StatusCode, header};
|
||||
use serde::Deserialize;
|
||||
use std::pin::Pin;
|
||||
use thiserror::Error;
|
||||
use tracing::{trace, warn};
|
||||
|
||||
@@ -120,26 +118,6 @@ impl DatasetClient {
|
||||
check_status(resp).await?.json().await.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// `GET /item` — stream the raw bytes of an item.
|
||||
///
|
||||
/// The returned stream yields chunks as they arrive from the server.
|
||||
pub async fn get_item(
|
||||
&self,
|
||||
source: &str,
|
||||
key: &str,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>, ClientError> {
|
||||
let url = format!("{}/item", self.base_url);
|
||||
trace!(url, source, key, "GET /item");
|
||||
let resp = self
|
||||
.client
|
||||
.get(url)
|
||||
.query(&[("source", source), ("key", key)])
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
Ok(Box::pin(check_status(resp).await?.bytes_stream()))
|
||||
}
|
||||
|
||||
/// `GET /extract` — extract a field from an item by object path (e.g. `$.flac.title`).
|
||||
pub async fn get_extract(
|
||||
&self,
|
||||
|
||||
@@ -302,6 +302,7 @@ impl Datasets {
|
||||
_threads: usize,
|
||||
flag: Option<CancelFlag>,
|
||||
) -> Result<(), CancelableTaskError<DatasetError>> {
|
||||
let start = Instant::now();
|
||||
let workdir = match self.path_workdir.as_ref() {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
@@ -313,6 +314,14 @@ impl Datasets {
|
||||
let fts_tmp_dir = workdir.join(".tmp-fts");
|
||||
let fts_dir = workdir.join("fts");
|
||||
|
||||
debug!(
|
||||
message = "Rebuilding fts index",
|
||||
dataset = self.config.dataset.name.as_str(),
|
||||
?fts_dir,
|
||||
?fts_tmp_dir,
|
||||
?workdir
|
||||
);
|
||||
|
||||
if fts_tmp_dir.is_dir() {
|
||||
// Report the directory that is actually checked and removed here: the temporary index.
warn!("Removing temporary index in {}", fts_tmp_dir.display());
|
||||
std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?;
|
||||
@@ -392,9 +401,18 @@ impl Datasets {
|
||||
return Err(CancelableTaskError::Cancelled);
|
||||
}
|
||||
|
||||
info!("Committing {total} documents");
|
||||
index_writer.commit().map_err(DatasetError::from)?;
|
||||
|
||||
debug!(
|
||||
message = "Rebuilt fts index",
|
||||
dataset = self.config.dataset.name.as_str(),
|
||||
?fts_dir,
|
||||
?fts_tmp_dir,
|
||||
?workdir,
|
||||
n_docs = total,
|
||||
time_ms = start.elapsed().as_millis()
|
||||
);
|
||||
|
||||
if fts_dir.is_dir() {
|
||||
warn!("Removing existing index in {}", fts_dir.display());
|
||||
std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?;
|
||||
|
||||
@@ -21,6 +21,9 @@ toml = { workspace = true }
|
||||
smartstring = { workspace = true }
|
||||
regex = { workspace = true }
|
||||
blake3 = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
sha1 = { workspace = true }
|
||||
md5 = { workspace = true }
|
||||
epub = { workspace = true }
|
||||
kamadak-exif = { workspace = true }
|
||||
pdf = { workspace = true }
|
||||
|
||||
111
crates/pile-value/src/extract/blob/hash.rs
Normal file
111
crates/pile-value/src/extract/blob/hash.rs
Normal file
@@ -0,0 +1,111 @@
|
||||
use crate::{
|
||||
extract::traits::{ExtractState, ObjectExtractor},
|
||||
value::{BinaryPileValue, PileValue},
|
||||
};
|
||||
use pile_config::Label;
|
||||
use pile_io::SyncReadBridge;
|
||||
use std::{io::Read, sync::Arc};
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
/// Encode `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string.
fn to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    // Output length is known up front, so reserve once instead of building a
    // temporary `String` for every input byte.
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
|
||||
|
||||
macro_rules! hash_algos {
|
||||
($($name:ident),* $(,)?) => {
|
||||
pub struct HashExtractor {
|
||||
item: BinaryPileValue,
|
||||
$($name: OnceCell<String>,)*
|
||||
}
|
||||
|
||||
impl HashExtractor {
|
||||
pub fn new(item: &BinaryPileValue) -> Self {
|
||||
Self {
|
||||
item: item.clone(),
|
||||
$($name: OnceCell::new(),)*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static LABELS: std::sync::LazyLock<Vec<Label>> = std::sync::LazyLock::new(|| {
|
||||
vec![$(Label::new(stringify!($name)).unwrap()),*]
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
hash_algos!(blake3, md5, sha1, sha224, sha256, sha384, sha512);
|
||||
|
||||
impl HashExtractor {
|
||||
async fn compute(&self, name: &Label) -> Result<Option<String>, std::io::Error> {
|
||||
let name_str = name.as_ref();
|
||||
|
||||
macro_rules! algo {
|
||||
($cell:ident, $compute:expr) => {
|
||||
if name_str == stringify!($cell) {
|
||||
return Ok(Some(
|
||||
self.$cell
|
||||
.get_or_try_init(|| async {
|
||||
let read = self.item.read().await?;
|
||||
let mut read = SyncReadBridge::new_current(read);
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let mut bytes = Vec::new();
|
||||
read.read_to_end(&mut bytes)?;
|
||||
Ok::<String, std::io::Error>($compute(&bytes))
|
||||
})
|
||||
.await?
|
||||
})
|
||||
.await?
|
||||
.clone(),
|
||||
));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
algo!(blake3, |b: &Vec<u8>| blake3::hash(b).to_hex().to_string());
|
||||
algo!(md5, |b: &Vec<u8>| format!("{:x}", md5::compute(b)));
|
||||
algo!(sha1, |b: &Vec<u8>| {
|
||||
use sha1::Digest;
|
||||
to_hex(sha1::Sha1::digest(b).as_ref())
|
||||
});
|
||||
algo!(sha224, |b: &Vec<u8>| {
|
||||
use sha2::Digest;
|
||||
to_hex(sha2::Sha224::digest(b).as_ref())
|
||||
});
|
||||
algo!(sha256, |b: &Vec<u8>| {
|
||||
use sha2::Digest;
|
||||
to_hex(sha2::Sha256::digest(b).as_ref())
|
||||
});
|
||||
algo!(sha384, |b: &Vec<u8>| {
|
||||
use sha2::Digest;
|
||||
to_hex(sha2::Sha384::digest(b).as_ref())
|
||||
});
|
||||
algo!(sha512, |b: &Vec<u8>| {
|
||||
use sha2::Digest;
|
||||
to_hex(sha2::Sha512::digest(b).as_ref())
|
||||
});
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectExtractor for HashExtractor {
|
||||
async fn field(
|
||||
&self,
|
||||
_state: &ExtractState,
|
||||
name: &Label,
|
||||
args: Option<&str>,
|
||||
) -> Result<Option<PileValue>, std::io::Error> {
|
||||
if args.is_some() {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(self
|
||||
.compute(name)
|
||||
.await?
|
||||
.map(|s| PileValue::String(Arc::new(s.into()))))
|
||||
}
|
||||
|
||||
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
|
||||
Ok(LABELS.clone())
|
||||
}
|
||||
}
|
||||
@@ -31,6 +31,9 @@ pub use text::*;
|
||||
mod image;
|
||||
pub use image::*;
|
||||
|
||||
mod hash;
|
||||
pub use hash::*;
|
||||
|
||||
use crate::{
|
||||
extract::{
|
||||
misc::MapExtractor,
|
||||
@@ -85,6 +88,10 @@ impl BinaryExtractor {
|
||||
Label::new("text").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(TextExtractor::new(item))),
|
||||
),
|
||||
(
|
||||
Label::new("hash").unwrap(),
|
||||
PileValue::ObjectExtractor(Arc::new(HashExtractor::new(item))),
|
||||
),
|
||||
]),
|
||||
};
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use mime::Mime;
|
||||
use pile_config::objectpath::{ObjectPath, PathSegment};
|
||||
use pile_io::SyncReadBridge;
|
||||
use serde_json::{Map, Value};
|
||||
use smartstring::{LazyCompact, SmartString};
|
||||
use std::{fmt::Debug, fs::File, io::Cursor, path::PathBuf, sync::Arc};
|
||||
@@ -50,18 +49,6 @@ impl BinaryPileValue {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
|
||||
let read = self.read().await?;
|
||||
let mut read = SyncReadBridge::new_current(read);
|
||||
let out = tokio::task::spawn_blocking(move || {
|
||||
let mut hasher = blake3::Hasher::new();
|
||||
std::io::copy(&mut read, &mut hasher)?;
|
||||
return Ok::<_, std::io::Error>(hasher.finalize());
|
||||
})
|
||||
.await??;
|
||||
return Ok(out);
|
||||
}
|
||||
|
||||
pub fn mime(&self) -> &Mime {
|
||||
match self {
|
||||
Self::Blob { mime, .. } => mime,
|
||||
|
||||
@@ -8,10 +8,11 @@ use axum::{
|
||||
routing::get,
|
||||
};
|
||||
use clap::Args;
|
||||
use pile_dataset::Datasets;
|
||||
use pile_dataset::{DatasetError, Datasets};
|
||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||
use pile_value::extract::traits::ExtractState;
|
||||
use serde::Serialize;
|
||||
use std::{fmt::Debug, path::PathBuf, sync::Arc};
|
||||
use std::{fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
|
||||
use tracing::{error, info};
|
||||
use utoipa::{OpenApi, ToSchema};
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
@@ -27,6 +28,18 @@ pub struct ServerCommand {
|
||||
/// If provided, do not serve docs
|
||||
#[arg(long)]
|
||||
no_docs: bool,
|
||||
|
||||
/// If provided, never auto-refresh indices
|
||||
#[arg(long)]
|
||||
no_refresh: bool,
|
||||
|
||||
/// Number of threads to use to refresh indices
|
||||
#[arg(long, default_value = "5")]
|
||||
refresh_jobs: usize,
|
||||
|
||||
/// Refresh indices every `n` seconds
|
||||
#[arg(long, default_value = "300")]
|
||||
refresh_delay: usize,
|
||||
}
|
||||
|
||||
impl CliCmd for ServerCommand {
|
||||
@@ -47,6 +60,47 @@ impl CliCmd for ServerCommand {
|
||||
Arc::new(datasets)
|
||||
};
|
||||
|
||||
// Start auto-refresh task
|
||||
if !self.no_refresh {
|
||||
let datasets = datasets.clone();
|
||||
let jobs = self.refresh_jobs.max(1);
|
||||
let delay = self.refresh_delay.max(1);
|
||||
|
||||
async fn refresh_dataset(ds: &Datasets, jobs: usize) -> Result<(), DatasetError> {
|
||||
if ds.needs_fts().await? {
|
||||
let state = ExtractState { ignore_mime: false };
|
||||
match ds.fts_refresh(&state, jobs, None).await {
|
||||
Ok(()) => {}
|
||||
Err(CancelableTaskError::Error(err)) => return Err(err),
|
||||
Err(CancelableTaskError::Cancelled) => unreachable!(),
|
||||
};
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
for ds in datasets.iter() {
|
||||
match refresh_dataset(ds, jobs).await {
|
||||
Ok(x) => x,
|
||||
Err(error) => {
|
||||
error!(
|
||||
message = "Error while refreshing dataset",
|
||||
dataset = ds.config.dataset.name.as_str(),
|
||||
?error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(delay as u64)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
|
||||
|
||||
let mut router = Router::new();
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use serde::Deserialize;
|
||||
use std::{num::NonZeroUsize, path::PathBuf};
|
||||
use tracing::info;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::config::{
|
||||
env::load_env,
|
||||
@@ -89,7 +89,7 @@ impl PileServerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
info!(message = "Config loaded", ?config);
|
||||
debug!(message = "Config loaded", ?config);
|
||||
|
||||
return config;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user