Extractor refactor, S3 support
This commit is contained in:
@@ -1,30 +1,17 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use pile_config::{ConfigToml, Label, Source};
|
||||
use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError};
|
||||
use rayon::{
|
||||
ThreadPoolBuilder,
|
||||
iter::{IntoParallelIterator, ParallelIterator},
|
||||
};
|
||||
use std::{
|
||||
io::ErrorKind,
|
||||
path::PathBuf,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
mpsc::Receiver,
|
||||
},
|
||||
thread::JoinHandle,
|
||||
time::Instant,
|
||||
};
|
||||
use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant};
|
||||
use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs};
|
||||
use thiserror::Error;
|
||||
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
|
||||
use tracing::{debug, info, trace, warn};
|
||||
|
||||
use crate::{
|
||||
DataSource, FileItem,
|
||||
DataSource, Item,
|
||||
index::{DbFtsIndex, FtsLookupResult},
|
||||
path_ts_earliest,
|
||||
source::DirDataSource,
|
||||
source::{DirDataSource, S3DataSource},
|
||||
};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@@ -39,15 +26,54 @@ pub enum DatasetError {
|
||||
NoFtsIndex,
|
||||
}
|
||||
|
||||
pub struct Dataset {
|
||||
//
|
||||
// MARK: Dataset enum
|
||||
//
|
||||
|
||||
/// An opened data source — either a local filesystem directory or an S3 bucket.
///
/// Each variant holds its backend behind an `Arc`.
|
||||
pub enum Dataset {
|
||||
/// Source backed by a [`DirDataSource`].
Dir(Arc<DirDataSource>),
|
||||
/// Source backed by an [`S3DataSource`].
S3(Arc<S3DataSource>),
|
||||
}
|
||||
|
||||
impl Dataset {
|
||||
pub async fn get(&self, key: &str) -> Option<Item> {
|
||||
match self {
|
||||
Self::Dir(ds) => ds.get(key).await.ok().flatten(),
|
||||
Self::S3(ds) => ds.get(key).await.ok().flatten(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> ReceiverStream<Result<Item, std::io::Error>> {
|
||||
match self {
|
||||
Self::Dir(ds) => ds.iter(),
|
||||
Self::S3(ds) => ds.iter(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn latest_change(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
match self {
|
||||
Self::Dir(ds) => ds.latest_change().await,
|
||||
Self::S3(ds) => ds.latest_change().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: Datasets collection
|
||||
//
|
||||
|
||||
/// An opened dataset: config, working directory, and all opened sources.
|
||||
pub struct Datasets {
|
||||
/// Path that was handed to `open`.
pub path_config: PathBuf,
|
||||
/// Derived from `path_config` in `open`; filesystem source paths are joined onto it.
pub path_parent: PathBuf,
|
||||
/// Working directory; the `.tmp-fts` directory used by `fts_refresh` lives under it.
pub path_workdir: PathBuf,
|
||||
|
||||
/// The dataset configuration; its `dataset.source` entries determine `sources`.
pub config: ConfigToml,
|
||||
/// Opened sources keyed by label. S3 sources that fail to open are logged
/// with a warning in `open` and left out of this map.
pub sources: HashMap<Label, Dataset>,
|
||||
}
|
||||
|
||||
impl Dataset {
|
||||
impl Datasets {
|
||||
pub fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
|
||||
let path_config = config.into();
|
||||
let path_parent = path_config
|
||||
@@ -84,11 +110,54 @@ impl Dataset {
|
||||
.unwrap_or(path_parent.join(".pile"))
|
||||
.join(config.dataset.name.as_str());
|
||||
|
||||
let mut sources = HashMap::new();
|
||||
for (label, source) in &config.dataset.source {
|
||||
match source {
|
||||
Source::Filesystem { path, sidecars } => {
|
||||
sources.insert(
|
||||
label.clone(),
|
||||
Dataset::Dir(Arc::new(DirDataSource::new(
|
||||
label,
|
||||
path_parent.join(path),
|
||||
*sidecars,
|
||||
))),
|
||||
);
|
||||
}
|
||||
|
||||
Source::S3 {
|
||||
bucket,
|
||||
prefix,
|
||||
endpoint,
|
||||
region,
|
||||
credentials,
|
||||
sidecars,
|
||||
} => {
|
||||
match S3DataSource::new(
|
||||
label,
|
||||
bucket.clone(),
|
||||
prefix.clone(),
|
||||
endpoint.clone(),
|
||||
region.clone(),
|
||||
credentials,
|
||||
*sidecars,
|
||||
) {
|
||||
Ok(ds) => {
|
||||
sources.insert(label.clone(), Dataset::S3(Arc::new(ds)));
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Could not open S3 source {label}: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(Self {
|
||||
path_config,
|
||||
path_parent,
|
||||
path_workdir,
|
||||
config,
|
||||
sources,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -96,15 +165,8 @@ impl Dataset {
|
||||
// MARK: get
|
||||
//
|
||||
|
||||
pub fn get(&self, source: &Label, key: &PathBuf) -> Option<FileItem> {
|
||||
let s = self.config.dataset.source.get(source)?;
|
||||
let s = match s {
|
||||
Source::Filesystem { path, sidecars } => {
|
||||
DirDataSource::new(source, path.clone().to_vec(), *sidecars)
|
||||
}
|
||||
};
|
||||
|
||||
s.get(key).ok().flatten()
|
||||
/// Fetch the item stored under `key` in the source labelled `source`.
///
/// Returns `None` when no opened source has that label; otherwise returns
/// whatever that source's own `get` yields for `key`.
pub async fn get(&self, source: &Label, key: &str) -> Option<Item> {
|
||||
self.sources.get(source)?.get(key).await
|
||||
}
|
||||
|
||||
//
|
||||
@@ -112,9 +174,9 @@ impl Dataset {
|
||||
//
|
||||
|
||||
/// Refresh this dataset's fts index.
|
||||
pub fn fts_refresh(
|
||||
pub async fn fts_refresh(
|
||||
&self,
|
||||
threads: usize,
|
||||
_threads: usize,
|
||||
flag: Option<CancelFlag>,
|
||||
) -> Result<(), CancelableTaskError<DatasetError>> {
|
||||
let fts_tmp_dir = self.path_workdir.join(".tmp-fts");
|
||||
@@ -134,58 +196,40 @@ impl Dataset {
|
||||
let mut index_writer: IndexWriter =
|
||||
index.writer(50 * 1024 * 1024).map_err(DatasetError::from)?;
|
||||
|
||||
let batch_size = 1000;
|
||||
let (_read_task, read_rx) = start_read_task(&self.config, batch_size);
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
let write_pool = ThreadPoolBuilder::new()
|
||||
.num_threads(threads.max(1))
|
||||
.thread_name(|x| format!("fts_refresh_thread_{x}"))
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let mut total = 0u64;
|
||||
while let Ok(batch) = read_rx.recv() {
|
||||
let batch = batch?;
|
||||
if let Some(flag) = &flag
|
||||
&& flag.is_cancelled()
|
||||
{
|
||||
return Err(CancelableTaskError::Cancelled);
|
||||
let mut logged_at = Instant::now();
|
||||
|
||||
for (name, dataset) in &self.sources {
|
||||
info!("Loading source {name}");
|
||||
|
||||
let mut stream = dataset.iter();
|
||||
while let Some(item_result) = stream.next().await {
|
||||
if let Some(flag) = &flag
|
||||
&& flag.is_cancelled()
|
||||
{
|
||||
return Err(CancelableTaskError::Cancelled);
|
||||
}
|
||||
|
||||
let item = item_result.map_err(DatasetError::from)?;
|
||||
let key = item.key();
|
||||
|
||||
match db_index.entry_to_document(&item).await {
|
||||
Ok(Some(doc)) => {
|
||||
index_writer.add_document(doc).map_err(DatasetError::from)?;
|
||||
total += 1;
|
||||
if logged_at.elapsed().as_secs() >= 5 {
|
||||
debug!("Indexed {total} documents so far");
|
||||
logged_at = Instant::now();
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
warn!("Skipping {key:?}, document is empty");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Could not read {key:?}, skipping. {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let this = AtomicU64::new(0);
|
||||
let start = Instant::now();
|
||||
write_pool
|
||||
.install(|| {
|
||||
batch
|
||||
.into_par_iter()
|
||||
.filter_map(|(key, item)| match db_index.entry_to_document(&item) {
|
||||
Ok(Some(doc)) => Some((key, doc)),
|
||||
Ok(None) => {
|
||||
warn!("Skipping {key:?}, document is empty");
|
||||
None
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Could not read {key:?}, skipping. {err}");
|
||||
None
|
||||
}
|
||||
})
|
||||
.map(|(key, doc)| {
|
||||
this.fetch_add(1, Ordering::Relaxed);
|
||||
index_writer
|
||||
.add_document(doc)
|
||||
.map_err(|err| (key, err))
|
||||
.map(|_| ())
|
||||
})
|
||||
.find_first(|x| x.is_err())
|
||||
.unwrap_or(Ok(()))
|
||||
})
|
||||
.map_err(|(_key, err)| DatasetError::from(err))?;
|
||||
|
||||
let this = this.load(Ordering::Relaxed);
|
||||
total += this;
|
||||
let time_ms = start.elapsed().as_millis();
|
||||
debug!("Added a batch of {this} in {time_ms} ms ({total} total)");
|
||||
}
|
||||
|
||||
if let Some(flag) = flag.as_ref()
|
||||
@@ -194,7 +238,7 @@ impl Dataset {
|
||||
return Err(CancelableTaskError::Cancelled);
|
||||
}
|
||||
|
||||
info!("Committing index");
|
||||
info!("Committing {total} documents");
|
||||
index_writer.commit().map_err(DatasetError::from)?;
|
||||
|
||||
if fts_dir.is_dir() {
|
||||
@@ -247,19 +291,14 @@ impl Dataset {
|
||||
}
|
||||
|
||||
/// Time at which data was last modified
|
||||
pub fn ts_data(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
pub async fn ts_data(&self) -> Result<Option<DateTime<Utc>>, std::io::Error> {
|
||||
let mut ts: Option<DateTime<Utc>> = None;
|
||||
|
||||
for (label, source) in &self.config.dataset.source {
|
||||
match source {
|
||||
Source::Filesystem { path, sidecars } => {
|
||||
let s = DirDataSource::new(label, path.clone().to_vec(), *sidecars);
|
||||
match (ts, s.latest_change()?) {
|
||||
(_, None) => continue,
|
||||
(None, Some(new)) => ts = Some(new),
|
||||
(Some(old), Some(new)) => ts = Some(old.max(new)),
|
||||
};
|
||||
}
|
||||
for dataset in self.sources.values() {
|
||||
match (ts, dataset.latest_change().await?) {
|
||||
(_, None) => continue,
|
||||
(None, Some(new)) => ts = Some(new),
|
||||
(Some(old), Some(new)) => ts = Some(old.max(new)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,10 +307,10 @@ impl Dataset {
|
||||
|
||||
/// Returns true if we do not have an fts index,
|
||||
/// or if our fts index is older than our data.
|
||||
pub fn needs_fts(&self) -> Result<bool, std::io::Error> {
|
||||
pub async fn needs_fts(&self) -> Result<bool, std::io::Error> {
|
||||
let start = Instant::now();
|
||||
let ts_fts = self.ts_fts()?;
|
||||
let ts_data = self.ts_data()?;
|
||||
let ts_data = self.ts_data().await?;
|
||||
|
||||
let result = match (ts_fts, ts_data) {
|
||||
(None, Some(_)) => true,
|
||||
@@ -292,59 +331,3 @@ impl Dataset {
|
||||
return Ok(result);
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// MARK: read_task
|
||||
//
|
||||
|
||||
fn start_read_task(
|
||||
config: &ConfigToml,
|
||||
batch_size: usize,
|
||||
) -> (
|
||||
JoinHandle<()>,
|
||||
Receiver<Result<Vec<(PathBuf, FileItem)>, DatasetError>>,
|
||||
) {
|
||||
let config = config.clone();
|
||||
let (read_tx, read_rx) = std::sync::mpsc::sync_channel(2);
|
||||
|
||||
let read_task = std::thread::spawn(move || {
|
||||
let mut batch = Vec::with_capacity(batch_size);
|
||||
for (name, source) in &config.dataset.source {
|
||||
info!("Loading source {name}");
|
||||
|
||||
match source {
|
||||
Source::Filesystem { path, sidecars } => {
|
||||
let source = DirDataSource::new(name, path.clone().to_vec(), *sidecars);
|
||||
for i in source.iter() {
|
||||
match i {
|
||||
Ok(x) => batch.push(x),
|
||||
Err(err) => {
|
||||
let err = Err(DatasetError::from(err));
|
||||
let _ = read_tx.send(err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if batch.len() >= batch_size {
|
||||
let b = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
|
||||
|
||||
match read_tx.send(Ok(b)) {
|
||||
Ok(()) => {}
|
||||
Err(_) => return,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !batch.is_empty() {
|
||||
match read_tx.send(Ok(batch)) {
|
||||
Ok(()) => {}
|
||||
Err(_) => return,
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
return (read_task, read_rx);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user