Compare commits


3 Commits

SHA1        Message                   Date
35c450676e  Implement hash() for S3   2026-03-16 22:24:54 -07:00
e03d3acfde  Cancel fix                2026-03-16 22:24:39 -07:00
12130a0471  Refactor sidecars         2026-03-16 22:24:30 -07:00
21 changed files with 432 additions and 288 deletions

Cargo.lock (generated)
View File

@@ -2496,7 +2496,6 @@ dependencies = [
  "serde",
  "serde_json",
  "tokio",
- "tokio-stream",
  "toml",
  "tracing",
  "tracing-indicatif",
@@ -2578,7 +2577,6 @@ dependencies = [
  "serde_json",
  "smartstring",
  "tokio",
- "tokio-stream",
  "toml",
  "tracing",
  "walkdir",

README.md (new file)
View File

@@ -0,0 +1,3 @@
+- configurable sidecars
+- wasm
+- tantivy index in s3?

View File

@@ -27,7 +27,7 @@ pub enum GroupPatternParseError {
 #[derive(Debug, Clone, Default)]
 pub struct GroupPattern {
-    pub parts: HashMap<Label, Vec<GroupSegment>>,
+    pub pattern: HashMap<Label, Vec<GroupSegment>>,
 }
 impl<'de> Deserialize<'de> for GroupPattern {
@@ -44,6 +44,6 @@ impl<'de> Deserialize<'de> for GroupPattern {
                 .collect();
             parts.insert(label, segments);
         }
-        Ok(GroupPattern { parts })
+        Ok(GroupPattern { pattern: parts })
     }
 }

View File

@@ -11,7 +11,6 @@ use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::I
 use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
 use thiserror::Error;
 use tokio::task::JoinSet;
-use tokio_stream::{StreamExt, wrappers::ReceiverStream};
 use tracing::{debug, info, trace, warn};
 use crate::index::{DbFtsIndex, FtsLookupResult};
@@ -46,10 +45,10 @@ impl Dataset {
         }
     }
-    pub fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>> {
+    pub fn iter(&self) -> Box<dyn Iterator<Item = &Item> + Send + '_> {
         match self {
-            Self::Dir(ds) => ds.iter(),
-            Self::S3(ds) => ds.iter(),
+            Self::Dir(ds) => Box::new(ds.iter()),
+            Self::S3(ds) => Box::new(ds.iter()),
         }
     }
@@ -76,7 +75,7 @@ pub struct Datasets {
 }
 impl Datasets {
-    pub fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
+    pub async fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
         let path_config = config.into();
         let path_parent = path_config
             .parent()
@@ -126,11 +125,10 @@ impl Datasets {
                 sources.insert(
                     label.clone(),
-                    Dataset::Dir(Arc::new(DirDataSource::new(
-                        label,
-                        path_parent.join(path),
-                        pattern.clone(),
-                    ))),
+                    Dataset::Dir(
+                        DirDataSource::new(label, path_parent.join(path), pattern.clone())
+                            .await?,
+                    ),
                 );
             }
@@ -155,9 +153,11 @@ impl Datasets {
                     region.clone(),
                     credentials,
                     pattern.clone(),
-                ) {
+                )
+                .await
+                {
                     Ok(ds) => {
-                        sources.insert(label.clone(), Dataset::S3(Arc::new(ds)));
+                        sources.insert(label.clone(), Dataset::S3(ds));
                     }
                     Err(err) => {
                         warn!("Could not open S3 source {label}: {err}");
@@ -258,17 +258,17 @@ impl Datasets {
         for (name, dataset) in &self.sources {
             info!("Loading source {name}");
-            let mut stream = dataset.iter();
-            while let Some(item_result) = stream.next().await {
+            let stream = dataset.iter();
+            for item in stream {
                 if let Some(flag) = &flag
                     && flag.is_cancelled()
                 {
                     return Err(CancelableTaskError::Cancelled);
                 }
-                let item = item_result.map_err(DatasetError::from)?;
                 let db = Arc::clone(&db_index);
                 let state = state.clone();
+                let item = item.clone();
                 join_set.spawn(async move {
                     let key = item.key();
                     let result = db.entry_to_document(&state, &item).await;

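Since `iter()` now yields borrowed `&Item`s from an in-memory index instead of owned items from a channel, each spawned task clones its item before moving it into the task (the `let item = item.clone();` line above). A minimal sketch of that iterate-clone-spawn shape; here `tokio_util::sync::CancellationToken` stands in for this crate's `CancelFlag`, and `Item` is a toy type:

```rust
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

// Toy stand-in for pile's Item: a cheap, clonable handle.
#[derive(Clone, Debug)]
struct Item {
    key: String,
}

#[tokio::main]
async fn main() {
    let index: Vec<Item> = vec![
        Item { key: "a.flac".into() },
        Item { key: "b.flac".into() },
    ];
    let flag = CancellationToken::new();
    let mut join_set: JoinSet<String> = JoinSet::new();

    // The iterator yields &Item borrowed from the index, so each task
    // clones an owned copy before moving it into the async block.
    for item in index.iter() {
        if flag.is_cancelled() {
            break;
        }
        let item = item.clone();
        join_set.spawn(async move {
            // ... extract fields / write to the document index ...
            item.key
        });
    }

    while let Some(done) = join_set.join_next().await {
        println!("indexed {}", done.expect("task panicked"));
    }
}
```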
View File

@@ -80,10 +80,12 @@ impl CancelFlag {
     #[inline]
     pub async fn await_cancel(&self) {
-        if self.is_cancelled() {
-            return;
+        let notified = self.notify.notified();
+        tokio::pin!(notified);
+        notified.as_mut().enable();
+        if !self.is_cancelled() {
+            notified.await;
         }
-        self.notify.notified().await;
     }
     #[inline]

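The "Cancel fix" commit closes a lost-wakeup race: the old code checked `is_cancelled()` and only then created the `Notified` future, so a `notify_waiters()` fired in between woke nobody and `await_cancel()` could hang forever. `Notified::enable` registers the waiter before the flag is re-checked. A self-contained sketch of the pattern, assuming `CancelFlag` pairs an `AtomicBool` with `tokio::sync::Notify` (which the surrounding code suggests):

```rust
use std::sync::{
    Arc,
    atomic::{AtomicBool, Ordering},
};
use tokio::sync::Notify;

#[derive(Default)]
struct Flag {
    cancelled: AtomicBool,
    notify: Notify,
}

impl Flag {
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
        // notify_waiters() only wakes tasks that are already registered.
        self.notify.notify_waiters();
    }

    async fn await_cancel(&self) {
        // Register interest first: after enable(), a notify_waiters()
        // between the flag check and the .await can no longer be lost.
        let notified = self.notify.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();
        if !self.cancelled.load(Ordering::SeqCst) {
            notified.await;
        }
    }
}

#[tokio::main]
async fn main() {
    let flag = Arc::new(Flag::default());
    let waiter = {
        let flag = Arc::clone(&flag);
        tokio::spawn(async move { flag.await_cancel().await })
    };
    flag.cancel();
    waiter.await.unwrap(); // completes promptly: no lost wakeup
}
```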
View File

@@ -26,7 +26,6 @@ pdfium-render = { workspace = true, optional = true }
 image = { workspace = true, optional = true }
 id3 = { workspace = true }
 tokio = { workspace = true }
-tokio-stream = { workspace = true }
 async-trait = { workspace = true }
 aws-sdk-s3 = { workspace = true }
 mime = { workspace = true }

View File

@@ -0,0 +1,56 @@
use std::sync::Arc;
use pile_config::Label;
use crate::{
extract::traits::{ExtractState, ObjectExtractor},
value::{Item, PileValue},
};
pub struct GroupExtractor {
item: Item,
}
impl GroupExtractor {
pub fn new(item: &Item) -> Self {
Self { item: item.clone() }
}
}
#[async_trait::async_trait]
impl ObjectExtractor for GroupExtractor {
async fn field(
&self,
_state: &ExtractState,
name: &Label,
args: Option<&str>,
) -> Result<Option<PileValue>, std::io::Error> {
if args.is_some() {
return Ok(None);
}
Ok(self
.item
.group()
.get(name)
.map(|item| PileValue::ObjectExtractor(Arc::new(super::ItemExtractor::new(item)))))
}
async fn fields(&self) -> Result<Vec<Label>, std::io::Error> {
Ok(self.item.group().keys().cloned().collect())
}
async fn to_json(&self, _state: &ExtractState) -> Result<serde_json::Value, std::io::Error> {
Ok(serde_json::Value::Object(
self.item
.group()
.iter()
.map(|(k, v)| {
(
k.to_string(),
serde_json::Value::String(format!("<GroupItem ({})>", v.key())),
)
})
.collect(),
))
}
}

View File

@@ -25,6 +25,9 @@ mod toml;
 use pile_config::Label;
 pub use toml::*;
+mod group;
+pub use group::*;
 use crate::{
     extract::{
         misc::MapExtractor,
@@ -74,6 +77,10 @@ impl ItemExtractor {
                     Label::new("toml").unwrap(),
                     PileValue::ObjectExtractor(Arc::new(TomlExtractor::new(item))),
                 ),
+                (
+                    Label::new("groups").unwrap(),
+                    PileValue::ObjectExtractor(Arc::new(GroupExtractor::new(item))),
+                ),
             ]),
         };
@@ -102,6 +109,8 @@ impl ObjectExtractor for ItemExtractor {
             Label::new("exif").unwrap(),
             Label::new("pdf").unwrap(),
             Label::new("json").unwrap(),
+            Label::new("toml").unwrap(),
+            Label::new("groups").unwrap(),
         ]);
     }
 }
}

View File

@@ -1,12 +1,20 @@
 use chrono::{DateTime, Utc};
-use pile_config::{Label, pattern::GroupPattern};
-use std::{path::PathBuf, sync::Arc};
-use tokio_stream::wrappers::ReceiverStream;
+use pile_config::{
+    Label,
+    pattern::{GroupPattern, GroupSegment},
+};
+use smartstring::{LazyCompact, SmartString};
+use std::{
+    collections::{HashMap, HashSet},
+    path::PathBuf,
+    sync::{Arc, OnceLock},
+};
 use walkdir::WalkDir;
 use crate::{
+    extract::traits::ExtractState,
     source::{DataSource, misc::path_ts_latest},
-    value::Item,
+    value::{Item, PileValue},
 };
#[derive(Debug)]
@@ -14,96 +22,155 @@ pub struct DirDataSource {
     pub name: Label,
     pub dir: PathBuf,
     pub pattern: GroupPattern,
+    pub index: OnceLock<HashMap<SmartString<LazyCompact>, Item>>,
 }
 impl DirDataSource {
-    pub fn new(name: &Label, dir: PathBuf, pattern: GroupPattern) -> Self {
-        Self {
+    pub async fn new(
+        name: &Label,
+        dir: PathBuf,
+        pattern: GroupPattern,
+    ) -> Result<Arc<Self>, std::io::Error> {
+        let source = Arc::new(Self {
             name: name.clone(),
             dir,
             pattern,
+            index: OnceLock::new(),
+        });
+        //
+        // MARK: list paths
+        //
+        let mut paths_items = HashSet::new();
+        let mut paths_grouped_items = HashSet::new();
+        'entry: for entry in WalkDir::new(&source.dir) {
+            let entry = match entry {
+                Err(e) => {
+                    let msg = format!("walkdir error: {e:?}");
+                    let err = e.into_io_error().unwrap_or(std::io::Error::other(msg));
+                    return Err(err);
+                }
+                Ok(e) => e,
+            };
+            if entry.file_type().is_dir() {
+                continue;
+            }
+            let path = entry.into_path();
+            let path_str = match path.to_str() {
+                Some(x) => x,
+                None => continue 'entry,
+            };
+            let groups = resolve_groups(&source.pattern, path_str).await;
+            paths_grouped_items.extend(groups.into_values());
+            paths_items.insert(path);
+        }
+        //
+        // MARK: resolve groups
+        //
+        let mut index = HashMap::new();
+        'entry: for path in paths_items.difference(&paths_grouped_items) {
+            let path_str = match path.to_str() {
+                Some(x) => x,
+                None => continue 'entry,
+            };
+            let group = resolve_groups(&source.pattern, path_str).await;
+            let group = group
+                .into_iter()
+                .map(|(k, group_path)| {
+                    (
+                        k,
+                        Box::new(Item::File {
+                            source: Arc::clone(&source),
+                            mime: mime_guess::from_path(&group_path).first_or_octet_stream(),
+                            path: group_path.clone(),
+                            group: Arc::new(HashMap::new()),
+                        }),
+                    )
+                })
+                .collect::<HashMap<_, _>>();
+            let item = Item::File {
+                source: Arc::clone(&source),
+                mime: mime_guess::from_path(path).first_or_octet_stream(),
+                path: path.into(),
+                group: Arc::new(group),
+            };
+            index.insert(item.key(), item);
+        }
+        source.index.get_or_init(|| index);
+        Ok(source)
     }
 }
 impl DataSource for Arc<DirDataSource> {
+    #[expect(clippy::expect_used)]
     async fn get(&self, key: &str) -> Result<Option<Item>, std::io::Error> {
-        let key = match key.parse::<PathBuf>() {
-            Ok(x) => self.dir.join(x),
-            Err(_) => return Ok(None),
-        };
-        if !key.is_file() {
-            return Ok(None);
-        }
-        return Ok(Some(Item::File {
-            source: Arc::clone(self),
-            mime: mime_guess::from_path(&key).first_or_octet_stream(),
-            path: key.clone(),
-            group: todo!(),
-        }));
+        return Ok(self
+            .index
+            .get()
+            .expect("index should be initialized")
+            .get(key)
+            .cloned());
     }
-    fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>> {
-        let (tx, rx) = tokio::sync::mpsc::channel(64);
-        let source = Arc::clone(self);
-        let dir = self.dir.clone();
-        tokio::task::spawn_blocking(move || {
-            for entry in WalkDir::new(dir) {
-                let entry = match entry {
-                    Err(e) => {
-                        let msg = format!("walkdir error: {e:?}");
-                        let err = e.into_io_error().unwrap_or(std::io::Error::other(msg));
-                        if tx.blocking_send(Err(err)).is_err() {
-                            return;
-                        }
-                        continue;
-                    }
-                    Ok(e) => e,
-                };
-                if entry.file_type().is_dir() {
-                    continue;
-                }
-                let path = entry.into_path();
-                let item = match path.extension().and_then(|x| x.to_str()) {
-                    None => continue,
-                    Some(_) => Item::File {
-                        source: Arc::clone(&source),
-                        mime: mime_guess::from_path(&path).first_or_octet_stream(),
-                        path: path.clone(),
-                        group: todo!(),
-                    },
-                };
-                if tx.blocking_send(Ok(item)).is_err() {
-                    return;
-                }
-            }
-        });
-        ReceiverStream::new(rx)
+    #[expect(clippy::expect_used)]
+    fn iter(&self) -> impl Iterator<Item = &Item> {
+        self.index
+            .get()
+            .expect("index should be initialized")
+            .values()
     }
     async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
-        let mut ts: Option<DateTime<Utc>> = None;
         if !self.dir.exists() {
             return Ok(None);
         }
-        let new = path_ts_latest(&self.dir)?;
-        match (ts, new) {
-            (_, None) => {}
-            (None, Some(new)) => ts = Some(new),
-            (Some(old), Some(new)) => ts = Some(old.max(new)),
-        };
-        return Ok(ts);
+        path_ts_latest(&self.dir)
     }
 }
+async fn resolve_groups(pattern: &GroupPattern, path_str: &str) -> HashMap<Label, PathBuf> {
+    let state = ExtractState { ignore_mime: false };
+    let mut group = HashMap::new();
+    'pattern: for (l, pat) in &pattern.pattern {
+        let item = PileValue::String(Arc::new(path_str.into()));
+        let mut target = String::new();
+        for p in pat {
+            match p {
+                GroupSegment::Literal(x) => target.push_str(x),
+                GroupSegment::Path(op) => {
+                    let res = match item.query(&state, op).await {
+                        Ok(Some(x)) => x,
+                        _ => continue 'pattern,
+                    };
+                    let res = match res.as_str() {
+                        Some(x) => x,
+                        None => continue 'pattern,
+                    };
+                    target.push_str(res);
+                }
+            }
+        }
+        let group_path: PathBuf = match target.parse() {
+            Ok(x) => x,
+            Err(_) => continue 'pattern,
+        };
+        if !group_path.exists() {
+            continue;
+        }
+        group.insert(l.clone(), group_path);
+    }
+    return group;
+}

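Both rewritten sources now build their whole index inside `new()` and stash it in a `OnceLock`. The `OnceLock` (rather than a plain field) sidesteps a construction-order problem: each indexed `Item` holds an `Arc` back-reference to its source, so the `Arc<Self>` has to exist before the index can be built. A reduced sketch of the shape with toy types:

```rust
use std::{
    collections::HashMap,
    sync::{Arc, OnceLock},
};

#[derive(Debug)]
struct Entry {
    // Back-reference to the owning source, mirroring Item::File { source, .. }.
    source: Arc<Source>,
    path: String,
}

#[derive(Debug)]
struct Source {
    dir: String,
    // Written exactly once at the end of new(); read lock-free afterwards.
    index: OnceLock<HashMap<String, Arc<Entry>>>,
}

impl Source {
    fn new(dir: &str) -> Arc<Self> {
        // 1. Create the Arc first, with an empty OnceLock.
        let source = Arc::new(Self {
            dir: dir.to_owned(),
            index: OnceLock::new(),
        });
        // 2. Now entries can point back at the source.
        let mut index = HashMap::new();
        let entry = Arc::new(Entry {
            source: Arc::clone(&source),
            path: format!("{}/a.flac", source.dir),
        });
        index.insert("a.flac".to_owned(), entry);
        // 3. Seal the index; readers can expect() it from here on.
        source.index.get_or_init(|| index);
        source
    }

    fn iter(&self) -> impl Iterator<Item = &Arc<Entry>> {
        self.index.get().expect("initialized in new()").values()
    }
}

fn main() {
    let source = Source::new("/music");
    for entry in source.iter() {
        println!("{} -> {}", entry.path, entry.source.dir);
    }
}
```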
View File

@@ -6,9 +6,6 @@ pub use s3::*;
 pub mod misc;
-use chrono::{DateTime, Utc};
-use tokio_stream::wrappers::ReceiverStream;
 /// A read-only set of [Item]s.
 pub trait DataSource {
     /// Get an item from this datasource
@@ -18,10 +15,10 @@ pub trait DataSource {
     ) -> impl Future<Output = Result<Option<crate::value::Item>, std::io::Error>> + Send;
     /// Iterate over all items in this source in an arbitrary order
-    fn iter(&self) -> ReceiverStream<Result<crate::value::Item, std::io::Error>>;
+    fn iter(&self) -> impl Iterator<Item = &crate::value::Item>;
     /// Return the time of the latest change to the data in this source
     fn latest_change(
         &self,
-    ) -> impl Future<Output = Result<Option<DateTime<Utc>>, std::io::Error>> + Send;
+    ) -> impl Future<Output = Result<Option<chrono::DateTime<chrono::Utc>>, std::io::Error>> + Send;
 }

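The trait method now returns `impl Iterator` directly (return-position `impl Trait` in traits, stable since Rust 1.75). Each implementor gets its own concrete iterator type, which is also why the `Dataset` enum earlier in this diff has to box: it cannot name one type covering both variants. A minimal illustration with toy types, not the crate's actual API:

```rust
// RPITIT: the trait promises *some* iterator; each impl picks its own.
trait DataSource {
    fn iter(&self) -> impl Iterator<Item = &str>;
}

struct VecSource(Vec<String>);

impl DataSource for VecSource {
    fn iter(&self) -> impl Iterator<Item = &str> {
        self.0.iter().map(String::as_str)
    }
}

struct PairSource(String, String);

impl DataSource for PairSource {
    fn iter(&self) -> impl Iterator<Item = &str> {
        [self.0.as_str(), self.1.as_str()].into_iter()
    }
}

// An enum over sources can't name the two differing opaque types,
// so it erases them behind a Box, like Dataset::iter above.
enum AnySource {
    Vec(VecSource),
    Pair(PairSource),
}

impl AnySource {
    fn iter(&self) -> Box<dyn Iterator<Item = &str> + '_> {
        match self {
            Self::Vec(s) => Box::new(s.iter()),
            Self::Pair(s) => Box::new(s.iter()),
        }
    }
}
```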
View File

@@ -1,11 +1,20 @@
 use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
 use chrono::{DateTime, Utc};
-use pile_config::{Label, S3Credentials, pattern::GroupPattern};
+use pile_config::{
+    Label, S3Credentials,
+    pattern::{GroupPattern, GroupSegment},
+};
 use smartstring::{LazyCompact, SmartString};
-use std::sync::Arc;
-use tokio_stream::wrappers::ReceiverStream;
+use std::{
+    collections::{HashMap, HashSet},
+    sync::{Arc, OnceLock},
+};
-use crate::{source::DataSource, value::Item};
+use crate::{
+    extract::traits::ExtractState,
+    source::DataSource,
+    value::{Item, PileValue},
+};
@@ -14,10 +23,11 @@ pub struct S3DataSource {
     pub prefix: Option<SmartString<LazyCompact>>,
     pub client: Arc<aws_sdk_s3::Client>,
     pub pattern: GroupPattern,
+    pub index: OnceLock<HashMap<SmartString<LazyCompact>, Item>>,
 }
 impl S3DataSource {
-    pub fn new(
+    pub async fn new(
         name: &Label,
         bucket: String,
         prefix: Option<String>,
@@ -25,7 +35,7 @@ impl S3DataSource {
         region: String,
         credentials: &S3Credentials,
         pattern: GroupPattern,
-    ) -> Result<Self, std::io::Error> {
+    ) -> Result<Arc<Self>, std::io::Error> {
         let client = {
             let creds = Credentials::new(
                 &credentials.access_key_id,
@@ -47,151 +57,118 @@ impl S3DataSource {
             aws_sdk_s3::Client::from_conf(s3_config.build())
         };
-        Ok(Self {
+        let source = Arc::new(Self {
             name: name.clone(),
             bucket: bucket.into(),
             prefix: prefix.map(|x| x.into()),
             client: Arc::new(client),
             pattern,
-        })
-    }
+            index: OnceLock::new(),
+        });
-    async fn find_sidecar_key(&self, key: &str) -> Option<SmartString<LazyCompact>> {
-        // First try {key}.toml
-        let full_toml = format!("{key}.toml");
-        if self
-            .client
-            .head_object()
-            .bucket(self.bucket.as_str())
-            .key(&full_toml)
-            .send()
-            .await
-            .is_ok()
-        {
-            return Some(full_toml.into());
-        }
+        //
+        // MARK: list keys
+        //
-        // Then try {key-with-extension-stripped}.toml
-        let stripped = std::path::Path::new(key).with_extension("toml");
-        if let Some(stripped_str) = stripped.to_str()
-            && stripped_str != full_toml.as_str()
-            && self
+        let mut all_keys: HashSet<SmartString<LazyCompact>> = HashSet::new();
+        let mut continuation_token: Option<String> = None;
+        loop {
+            let mut req = source
                 .client
-                .head_object()
-                .bucket(self.bucket.as_str())
-                .key(stripped_str)
-                .send()
-                .await
-                .is_ok()
-        {
-            return Some(stripped_str.into());
+                .list_objects_v2()
+                .bucket(source.bucket.as_str());
+            if let Some(prefix) = &source.prefix {
+                req = req.prefix(prefix.as_str());
            }
+            if let Some(token) = continuation_token {
+                req = req.continuation_token(token);
+            }
+            let resp = req.send().await.map_err(std::io::Error::other)?;
+            let next_token = resp.next_continuation_token().map(ToOwned::to_owned);
+            let is_truncated = resp.is_truncated().unwrap_or(false);
+            for obj in resp.contents() {
+                let Some(full_key) = obj.key() else { continue };
+                let key = strip_prefix(full_key, source.prefix.as_deref());
+                all_keys.insert(key.into());
+            }
+            if !is_truncated {
+                break;
+            }
+            continuation_token = next_token;
        }
-        None
-    }
+        //
+        // MARK: resolve groups
+        //
-    async fn make_item(self: &Arc<Self>, key: impl Into<SmartString<LazyCompact>>) -> Item {
-        let key: SmartString<LazyCompact> = key.into();
-        let object_path = match &self.prefix {
-            Some(x) => format!("{x}/{key}").into(),
-            None => key.clone(),
-        };
-        let mime = mime_guess::from_path(object_path.as_str()).first_or_octet_stream();
-        Item::S3 {
-            source: Arc::clone(self),
-            mime,
-            key,
-            group: todo!(),
+        let mut keys_grouped: HashSet<SmartString<LazyCompact>> = HashSet::new();
+        for key in &all_keys {
+            let groups = resolve_groups(&source.pattern, key).await;
+            for group_key in groups.into_values() {
+                if all_keys.contains(&group_key) {
+                    keys_grouped.insert(group_key);
+                }
+            }
        }
+        let mut index = HashMap::new();
+        for key in all_keys.difference(&keys_grouped) {
+            let groups = resolve_groups(&source.pattern, key).await;
+            let group = groups
+                .into_iter()
+                .filter(|(_, gk)| all_keys.contains(gk))
+                .map(|(label, gk)| {
+                    (
+                        label,
+                        Box::new(Item::S3 {
+                            source: Arc::clone(&source),
+                            mime: mime_guess::from_path(gk.as_str()).first_or_octet_stream(),
+                            key: gk,
+                            group: Arc::new(HashMap::new()),
+                        }),
+                    )
+                })
+                .collect::<HashMap<_, _>>();
+            let item = Item::S3 {
+                source: Arc::clone(&source),
+                mime: mime_guess::from_path(key.as_str()).first_or_octet_stream(),
+                key: key.clone(),
+                group: Arc::new(group),
+            };
+            index.insert(item.key(), item);
+        }
+        source.index.get_or_init(|| index);
+        Ok(source)
     }
 }
 impl DataSource for Arc<S3DataSource> {
+    #[expect(clippy::expect_used)]
     async fn get(&self, key: &str) -> Result<Option<Item>, std::io::Error> {
-        let key: SmartString<LazyCompact> = key.into();
-        let key = match &self.prefix {
-            Some(x) => format!("{x}/{key}").into(),
-            None => key,
-        };
-        let result = self
-            .client
-            .head_object()
-            .bucket(self.bucket.as_str())
-            .key(key.as_str())
-            .send()
-            .await;
-        match result {
-            Err(sdk_err) => {
-                let not_found = sdk_err
-                    .as_service_error()
-                    .map(|e| e.is_not_found())
-                    .unwrap_or(false);
-                if not_found {
-                    return Ok(None);
-                }
-                Err(std::io::Error::other(sdk_err))
-            }
-            Ok(_) => Ok(Some(self.make_item(key).await)),
-        }
+        return Ok(self
+            .index
+            .get()
+            .expect("index should be initialized")
+            .get(key)
+            .cloned());
     }
-    fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>> {
-        let (tx, rx) = tokio::sync::mpsc::channel(64);
-        let source = Arc::clone(self);
-        tokio::spawn(async move {
-            let mut continuation_token: Option<String> = None;
-            loop {
-                let mut req = source
-                    .client
-                    .list_objects_v2()
-                    .bucket(source.bucket.as_str());
-                if let Some(prefix) = &source.prefix {
-                    req = req.prefix(prefix.as_str());
-                }
-                if let Some(token) = continuation_token {
-                    req = req.continuation_token(token);
-                }
-                let resp = match req.send().await {
-                    Err(e) => {
-                        let _ = tx.send(Err(std::io::Error::other(e))).await;
-                        break;
-                    }
-                    Ok(resp) => resp,
-                };
-                let next_token = resp.next_continuation_token().map(ToOwned::to_owned);
-                let is_truncated = resp.is_truncated().unwrap_or(false);
-                for obj in resp.contents() {
-                    let key = match obj.key() {
-                        Some(k) => k.to_owned(),
-                        None => continue,
-                    };
-                    let item = source.make_item(key).await;
-                    if tx.send(Ok(item)).await.is_err() {
-                        return;
-                    }
-                }
-                if !is_truncated {
-                    break;
-                }
-                continuation_token = next_token;
-            }
-        });
-        ReceiverStream::new(rx)
+    #[expect(clippy::expect_used)]
+    fn iter(&self) -> impl Iterator<Item = &Item> {
+        self.index
+            .get()
+            .expect("index should be initialized")
+            .values()
     }
     async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
@@ -241,3 +218,51 @@ impl DataSource for Arc<S3DataSource> {
         Ok(ts)
     }
 }
+fn strip_prefix<'a>(key: &'a str, prefix: Option<&str>) -> &'a str {
+    match prefix {
+        None => key,
+        Some(p) => {
+            let with_slash = if p.ends_with('/') {
+                key.strip_prefix(p)
+            } else {
+                key.strip_prefix(&format!("{p}/"))
+            };
+            with_slash.unwrap_or(key)
+        }
+    }
+}
+async fn resolve_groups(
+    pattern: &GroupPattern,
+    key: &str,
+) -> HashMap<Label, SmartString<LazyCompact>> {
+    let state = ExtractState { ignore_mime: false };
+    let mut group = HashMap::new();
+    'pattern: for (l, pat) in &pattern.pattern {
+        let item = PileValue::String(Arc::new(key.into()));
+        let mut target = String::new();
+        for p in pat {
+            match p {
+                GroupSegment::Literal(x) => target.push_str(x),
+                GroupSegment::Path(op) => {
+                    let res = match item.query(&state, op).await {
+                        Ok(Some(x)) => x,
+                        _ => continue 'pattern,
+                    };
+                    let res = match res.as_str() {
+                        Some(x) => x,
+                        None => continue 'pattern,
+                    };
+                    target.push_str(res);
+                }
+            }
+        }
+        group.insert(l.clone(), target.into());
+    }
+    return group;
+}

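The new listing loop drives `ListObjectsV2` with continuation tokens by hand, mirroring the deleted streaming `iter()`. For reference, recent versions of aws-sdk-s3 can express the same loop with the built-in paginator; a sketch under that assumption, where `list_all_keys` is a hypothetical helper and not part of this crate:

```rust
use aws_sdk_s3::Client;
use std::collections::HashSet;

// Same result as the manual continuation-token loop above, using the
// SDK's paginator. `bucket`/`prefix` are the caller's values.
async fn list_all_keys(
    client: &Client,
    bucket: &str,
    prefix: Option<&str>,
) -> Result<HashSet<String>, std::io::Error> {
    let mut keys = HashSet::new();
    let mut pages = client
        .list_objects_v2()
        .bucket(bucket)
        .set_prefix(prefix.map(ToOwned::to_owned))
        .into_paginator()
        .send();
    while let Some(page) = pages.next().await {
        let page = page.map_err(std::io::Error::other)?;
        for obj in page.contents() {
            if let Some(key) = obj.key() {
                keys.insert(key.to_owned());
            }
        }
    }
    Ok(keys)
}
```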
View File

@@ -5,7 +5,7 @@ use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
 use crate::{
     source::{DirDataSource, S3DataSource},
-    value::{ItemReader, S3Reader},
+    value::{ItemReader, S3Reader, SyncReadBridge},
 };
 //
@@ -72,22 +72,26 @@ impl Item {
     #[expect(clippy::expect_used)]
     pub fn key(&self) -> SmartString<LazyCompact> {
         match self {
-            Self::File { path, .. } => path.to_str().expect("path is not utf-8").into(),
+            Self::File { source, path, .. } => path
+                .strip_prefix(&source.dir)
+                .expect("item must be inside source")
+                .to_str()
+                .expect("path is not utf-8")
+                .into(),
             Self::S3 { key, .. } => key.clone(),
         }
     }
-    pub fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
-        match self {
-            Self::File { path, .. } => {
-                let mut hasher = blake3::Hasher::new();
-                let mut file = std::fs::File::open(path)?;
-                std::io::copy(&mut file, &mut hasher)?;
-                return Ok(hasher.finalize());
-            }
-            Self::S3 { .. } => todo!(),
-        }
+    pub async fn hash(&self) -> Result<blake3::Hash, std::io::Error> {
+        let read = self.read().await?;
+        let mut read = SyncReadBridge::new_current(read);
+        let out = tokio::task::spawn_blocking(move || {
+            let mut hasher = blake3::Hasher::new();
+            std::io::copy(&mut read, &mut hasher)?;
+            return Ok::<_, std::io::Error>(hasher.finalize());
+        })
+        .await??;
+        return Ok(out);
     }
     pub fn mime(&self) -> &Mime {

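`hash()` now covers both variants by hashing through the generic async reader: the reader is bridged to `std::io::Read` and the copy into blake3 runs on the blocking pool (blake3's `Hasher` implements `std::io::Write`, so `std::io::copy` feeds it directly). The same shape built from stock parts, using `tokio_util::io::SyncIoBridge` in place of this crate's own `SyncReadBridge` (an assumed equivalence):

```rust
use tokio::io::AsyncRead;
use tokio_util::io::SyncIoBridge;

// Hash any AsyncRead with the blocking blake3 API: bridge the reader
// to std::io::Read, then run the copy on the blocking thread pool.
async fn hash_reader<R>(reader: R) -> Result<blake3::Hash, std::io::Error>
where
    R: AsyncRead + Unpin + Send + 'static,
{
    // Capture the runtime handle here, on the async thread, then move
    // the bridged reader into the blocking task.
    let mut reader = SyncIoBridge::new(reader);
    tokio::task::spawn_blocking(move || {
        let mut hasher = blake3::Hasher::new();
        std::io::copy(&mut reader, &mut hasher)?;
        Ok::<_, std::io::Error>(hasher.finalize())
    })
    .await?
}
```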
View File

@@ -17,7 +17,6 @@ aws-sdk-s3 = { workspace = true }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
 tokio = { workspace = true }
-tokio-stream = { workspace = true }
 clap = { workspace = true }
 #clap_complete = { workspace = true }
 serde = { workspace = true }

View File

@@ -44,6 +44,7 @@ impl CliCmd for CheckCommand {
         }
         let ds = Datasets::open(&self.config)
+            .await
             .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
         let ts_fts = ds.ts_fts().context("while determining fts age")?;

View File

@@ -6,7 +6,6 @@ use pile_value::{extract::traits::ExtractState, value::PileValue};
 use serde_json::{Map, Value};
 use std::{path::PathBuf, time::Instant};
 use tokio::task::JoinSet;
-use tokio_stream::StreamExt;
 use tracing::info;
 use crate::{CliCmd, GlobalContext};
@@ -55,6 +54,7 @@ impl CliCmd for FieldsCommand {
         flag: CancelFlag,
     ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
         let ds = Datasets::open(&self.config)
+            .await
             .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
         let start = Instant::now();
@@ -88,11 +88,10 @@ impl CliCmd for FieldsCommand {
                     return Err(CancelableTaskError::Cancelled);
                 }
-                match stream.next().await {
+                match stream.next() {
                     None => break,
-                    Some(item_result) => {
-                        let item =
-                            item_result.with_context(|| format!("while reading source {name}"))?;
+                    Some(item) => {
+                        let item = item.clone();
                         let name = name.clone();
                         let state = state.clone();
                         join_set.spawn(async move {

View File

@@ -25,6 +25,7 @@ impl CliCmd for IndexCommand {
         flag: CancelFlag,
     ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
         let ds = Datasets::open(&self.config)
+            .await
             .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
         let state = ExtractState { ignore_mime: false };

View File

@@ -6,7 +6,6 @@ use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
 use pile_value::{extract::traits::ExtractState, value::PileValue};
 use std::{path::PathBuf, str::FromStr, sync::Arc};
 use tokio::task::JoinSet;
-use tokio_stream::StreamExt;
 use tracing::info;
 use crate::{CliCmd, GlobalContext};
@@ -14,6 +13,7 @@ use crate::{CliCmd, GlobalContext};
 #[derive(Debug, Args)]
 pub struct ListCommand {
     /// Path to query, e.g. $.flac.artist
+    #[clap(default_value = "$")]
     path: String,
     /// Only print items where the value is null (inverse of default)
@@ -45,6 +45,7 @@ impl CliCmd for ListCommand {
         let path = Arc::new(path);
         let ds = Datasets::open(&self.config)
+            .await
             .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
         let jobs = self.jobs.max(1);
@@ -70,11 +71,10 @@ impl CliCmd for ListCommand {
                     return Err(CancelableTaskError::Cancelled);
                 }
-                match stream.next().await {
+                match stream.next() {
                     None => break,
-                    Some(item_result) => {
-                        let item =
-                            item_result.with_context(|| format!("while reading source {name}"))?;
+                    Some(item) => {
+                        let item = item.clone();
                         let source_name = name.to_string();
                         let key = item.key().to_string();
                         let path = path.clone();

View File

@@ -41,6 +41,7 @@ impl CliCmd for LookupCommand {
         flag: CancelFlag,
     ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
         let ds = Datasets::open(&self.config)
+            .await
             .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
         let state = ExtractState { ignore_mime: false };

View File

@@ -37,6 +37,7 @@ impl CliCmd for ProbeCommand {
             .ok_or_else(|| anyhow::anyhow!("invalid source name {:?}", self.source))?;
         let ds = Datasets::open(&self.config)
+            .await
             .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
         let state = ExtractState { ignore_mime: false };

View File

@@ -34,6 +34,7 @@ impl CliCmd for ServeCommand {
         flag: CancelFlag,
     ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
         let ds = Datasets::open(&self.config)
+            .await
             .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
         let state = ExtractState { ignore_mime: false };

View File

@@ -8,7 +8,6 @@ use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
 use pile_value::source::{DataSource, DirDataSource, S3DataSource};
 use std::{path::PathBuf, sync::Arc, time::Duration};
 use tokio::task::JoinSet;
-use tokio_stream::StreamExt;
 use tracing::info;
 use crate::{CliCmd, GlobalContext, cli::progress_big};
@@ -52,6 +51,7 @@ impl CliCmd for UploadCommand {
         flag: CancelFlag,
     ) -> Result<i32, CancelableTaskError<anyhow::Error>> {
         let ds = Datasets::open(&self.config)
+            .await
             .with_context(|| format!("while opening dataset for {}", self.config.display()))?;
         let dir_label = Label::new(&self.dir_source)
@@ -104,20 +104,12 @@ impl CliCmd for UploadCommand {
         }
         // Count total files before uploading so we can show accurate progress
-        let total = {
-            let mut count = 0u64;
-            let mut count_stream = Arc::clone(&dir_ds).iter();
-            while let Some(result) = count_stream.next().await {
-                result.context("while counting filesystem source")?;
-                count += 1;
-            }
-            count
-        };
+        let total = dir_ds.iter().count() as u64;
         // Walk filesystem source and upload files in parallel
         let jobs = self.jobs.max(1);
         let mut uploaded: u64 = 0;
-        let mut stream = Arc::clone(&dir_ds).iter();
+        let mut stream = dir_ds.iter();
         let mut join_set: JoinSet<Result<String, anyhow::Error>> = JoinSet::new();
         let pb = ctx.mp.add(ProgressBar::new(total));
@@ -146,24 +138,13 @@ impl CliCmd for UploadCommand {
                 return Err(CancelableTaskError::Cancelled);
             }
-            let item = match stream.next().await {
+            let item = match stream.next() {
                 None => break,
-                Some(Err(e)) => {
-                    return Err(anyhow::Error::from(e)
-                        .context("while iterating filesystem source")
-                        .into());
-                }
-                Some(Ok(item)) => item,
+                Some(item) => item.clone(),
             };
-            let item_path = PathBuf::from(item.key().as_str());
-            let relative = item_path.strip_prefix(&dir_ds.dir).with_context(|| {
-                format!("path '{}' is not under source root", item_path.display())
-            })?;
-            let relative_str = relative
-                .to_str()
-                .ok_or_else(|| anyhow::anyhow!("non-UTF-8 path: {}", item_path.display()))?
-                .to_owned();
+            let relative_str = item.key().to_string();
+            let item_path = dir_ds.dir.join(&relative_str);
             let key = format!("{full_prefix}/{relative_str}");
             let mime = item.mime().to_string();