use chrono::{DateTime, Utc}; use pile_config::{ ConfigToml, DatasetConfig, Label, Source, objectpath::ObjectPath, pattern::GroupPattern, }; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::{ extract::traits::ExtractState, source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest, string_to_key}, value::{Item, PileValue}, }; use serde_json::Value; use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant}; use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; use thiserror::Error; use tokio::task::JoinSet; use tracing::{debug, info, trace, warn}; use crate::index::{DbFtsIndex, FtsLookupResult}; #[derive(Debug, Error)] pub enum DatasetError { #[error("{0}")] IoError(#[from] std::io::Error), #[error("{0}")] TantivyError(#[from] TantivyError), #[error("this dataset does not have an fts index")] NoFtsIndex, } // // MARK: Dataset enum // /// An opened data source — either a local filesystem directory or an S3 bucket. pub enum Dataset { Dir(Arc), S3(Arc), } impl Dataset { pub fn len(&self) -> usize { match self { Self::Dir(ds) => ds.len(), Self::S3(ds) => ds.len(), } } pub async fn get(&self, key: &str) -> Option { match self { Self::Dir(ds) => ds.get(key).await.ok().flatten(), Self::S3(ds) => ds.get(key).await.ok().flatten(), } } pub fn iter(&self) -> Box + Send + '_> { match self { Self::Dir(ds) => Box::new(ds.iter()), Self::S3(ds) => Box::new(ds.iter()), } } pub async fn latest_change(&self) -> Result>, std::io::Error> { match self { Self::Dir(ds) => ds.latest_change().await, Self::S3(ds) => ds.latest_change().await, } } } // // MARK: Datasets collection // /// An opened dataset: config, working directory, and all opened sources. pub struct Datasets { pub path_config: Option, pub path_parent: PathBuf, pub path_workdir: Option, pub config: ConfigToml, pub sources: HashMap, } impl Datasets { #[expect(clippy::unwrap_used)] pub fn virt_source() -> Label { Label::new("virtual-source").unwrap() } #[expect(clippy::unwrap_used)] pub async fn virt(parent: impl Into) -> Result { let path_parent = parent.into(); let config = ConfigToml { dataset: DatasetConfig { name: Label::new("virtual-dataset").unwrap(), working_dir: None, source: [( Self::virt_source(), Source::Filesystem { enabled: true, path: path_parent.clone(), pattern: GroupPattern::default(), }, )] .into_iter() .collect(), }, schema: HashMap::new(), fts: None, }; let mut sources = HashMap::new(); for (label, source) in &config.dataset.source { match source { Source::Filesystem { enabled, path, pattern, } => { if !enabled { continue; } sources.insert( label.clone(), Dataset::Dir( DirDataSource::new(label, path_parent.join(path), pattern.clone()) .await?, ), ); } Source::S3 { enabled, bucket, prefix, endpoint, region, credentials, pattern, encryption_key, } => { if !enabled { continue; } let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x)); match S3DataSource::new( label, bucket.clone(), prefix.clone(), endpoint.clone(), region.clone(), credentials, pattern.clone(), encryption_key, ) .await { Ok(ds) => { sources.insert(label.clone(), Dataset::S3(ds)); } Err(err) => { warn!("Could not open S3 source {label}: {err}"); } } } } } return Ok(Self { path_config: None, path_workdir: None, path_parent, config, sources, }); } pub async fn open(config: impl Into) -> Result { let path_config = config.into(); let path_parent = path_config .parent() .ok_or(std::io::Error::new( ErrorKind::NotADirectory, format!("Config file {} has no parent", path_config.display()), ))? .to_owned(); let config = { let config = std::fs::read_to_string(&path_config)?; let config: Result = toml::from_str(&config); match config { Ok(config) => { trace!(message = "Loaded config", ?config); config } Err(error) => { return Err(std::io::Error::new( ErrorKind::InvalidData, format!("{} is invalid:\n{error}", path_config.display()), )); } } }; let path_workdir = config .dataset .working_dir .clone() .unwrap_or(path_parent.join(".pile")) .join(config.dataset.name.as_str()); let mut sources = HashMap::new(); for (label, source) in &config.dataset.source { match source { Source::Filesystem { enabled, path, pattern, } => { if !enabled { continue; } sources.insert( label.clone(), Dataset::Dir( DirDataSource::new(label, path_parent.join(path), pattern.clone()) .await?, ), ); } Source::S3 { enabled, bucket, prefix, endpoint, region, credentials, pattern, encryption_key, } => { if !enabled { continue; } let encryption_key = encryption_key.as_ref().map(|x| string_to_key(x)); match S3DataSource::new( label, bucket.clone(), prefix.clone(), endpoint.clone(), region.clone(), credentials, pattern.clone(), encryption_key, ) .await { Ok(ds) => { sources.insert(label.clone(), Dataset::S3(ds)); } Err(err) => { warn!("Could not open S3 source {label}: {err}"); } } } } } return Ok(Self { path_config: Some(path_config), path_workdir: Some(path_workdir), path_parent, config, sources, }); } // // MARK: get // pub async fn get(&self, source: &Label, key: &str) -> Option { self.sources.get(source)?.get(key).await } /// Extract a field from an item by object path. /// Returns `None` if the item or field is not found. pub async fn get_field( &self, state: &ExtractState, source: &Label, key: &str, path: &ObjectPath, ) -> Result, std::io::Error> { let Some(item) = self.get(source, key).await else { return Ok(None); }; let item = PileValue::Item(item); let Some(value) = item.query(state, path).await? else { return Ok(None); }; Ok(Some(value.to_json(state).await?)) } // // MARK: fts // /// Refresh this dataset's fts index. pub async fn fts_refresh( &self, state: &ExtractState, _threads: usize, flag: Option, ) -> Result<(), CancelableTaskError> { let workdir = match self.path_workdir.as_ref() { Some(x) => x, None => { warn!("Skipping fts_refresh, no workdir"); return Ok(()); } }; let fts_tmp_dir = workdir.join(".tmp-fts"); let fts_dir = workdir.join("fts"); if fts_tmp_dir.is_dir() { warn!("Removing temporary index in {}", fts_dir.display()); std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; } std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; let db_index = Arc::new(DbFtsIndex::new(&fts_tmp_dir, &self.config)); let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone()) .map_err(DatasetError::from)?; index.set_executor(Executor::multi_thread(10, "build-fts").map_err(DatasetError::from)?); let mut index_writer: IndexWriter = index.writer(50 * 1024 * 1024).map_err(DatasetError::from)?; let mut total = 0u64; let mut logged_at = Instant::now(); const CONCURRENCY: usize = 32; let mut join_set = JoinSet::new(); let mut process = |key, result: Result<_, _>| -> Result<(), DatasetError> { match result { Ok(Some(doc)) => { index_writer.add_document(doc).map_err(DatasetError::from)?; total += 1; if logged_at.elapsed().as_secs() >= 5 { debug!("Indexed {total} documents"); logged_at = Instant::now(); } } Ok(None) => warn!("Skipping {key:?}, document is empty"), Err(err) => warn!("Could not read {key:?}, skipping. {err}"), } Ok(()) }; for (name, dataset) in &self.sources { info!("Loading source {name}"); let stream = dataset.iter(); for item in stream { if let Some(flag) = &flag && flag.is_cancelled() { return Err(CancelableTaskError::Cancelled); } let db = Arc::clone(&db_index); let state = state.clone(); let item = item.clone(); join_set.spawn(async move { let key = item.key(); let result = db.entry_to_document(&state, &item).await; (key, result) }); while join_set.len() >= CONCURRENCY { match join_set.join_next().await { Some(Ok((key, result))) => process(key, result)?, Some(Err(e)) => warn!("Indexing task panicked: {e}"), None => break, } } } } while let Some(join_result) = join_set.join_next().await { match join_result { Ok((key, result)) => process(key, result)?, Err(e) => warn!("Indexing task panicked: {e}"), } } if let Some(flag) = flag.as_ref() && flag.is_cancelled() { return Err(CancelableTaskError::Cancelled); } info!("Committing {total} documents"); index_writer.commit().map_err(DatasetError::from)?; if fts_dir.is_dir() { warn!("Removing existing index in {}", fts_dir.display()); std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?; } std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?; return Ok(()); } pub fn fts_lookup( &self, query: &str, top_n: usize, ) -> Result, DatasetError> { let workdir = match self.path_workdir.as_ref() { Some(x) => x, None => { warn!("Skipping fts_lookup, no workdir"); return Ok(Vec::new()); } }; let fts_dir = workdir.join("fts"); if !fts_dir.exists() { return Err(DatasetError::NoFtsIndex); } if !fts_dir.is_dir() { return Err(std::io::Error::new( ErrorKind::NotADirectory, format!("fts index {} is not a directory", fts_dir.display()), ) .into()); } let db_index = DbFtsIndex::new(&fts_dir, &self.config); let results = db_index.lookup(query, Arc::new(TopDocs::with_limit(top_n)))?; return Ok(results); } /// Time at which fts was created pub fn ts_fts(&self) -> Result>, std::io::Error> { let workdir = match self.path_workdir.as_ref() { Some(x) => x, None => return Ok(None), }; let fts_dir = workdir.join("fts"); if !fts_dir.exists() { return Ok(None); } if !fts_dir.is_dir() { return Err(std::io::Error::new( ErrorKind::NotADirectory, format!("fts index {} is not a directory", fts_dir.display()), )); } return path_ts_earliest(&fts_dir); } /// Time at which data was last modified pub async fn ts_data(&self) -> Result>, std::io::Error> { let mut ts: Option> = None; for dataset in self.sources.values() { match (ts, dataset.latest_change().await?) { (_, None) => continue, (None, Some(new)) => ts = Some(new), (Some(old), Some(new)) => ts = Some(old.max(new)), } } return Ok(ts); } /// Returns true if we do not have an fts index, /// or if our fts index is older than our data. pub async fn needs_fts(&self) -> Result { let start = Instant::now(); let ts_fts = self.ts_fts()?; let ts_data = self.ts_data().await?; let result = match (ts_fts, ts_data) { (None, Some(_)) => true, (None, None) | (Some(_), None) => { warn!("Could not determine data age"); false } (Some(ts_fts), Some(ts_data)) => ts_data > ts_fts, }; debug!( message = "Ran needs_fts", time_ms = start.elapsed().as_millis(), ?result ); return Ok(result); } }