use chrono::{DateTime, Utc}; use pile_config::{ConfigToml, Label, Source, objectpath::ObjectPath}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use pile_value::{ source::{DataSource, DirDataSource, S3DataSource, misc::path_ts_earliest}, value::{Item, PileValue}, }; use serde_json::Value; use std::{collections::HashMap, io::ErrorKind, path::PathBuf, sync::Arc, time::Instant}; use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; use thiserror::Error; use tokio::task::JoinSet; use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use tracing::{debug, info, trace, warn}; use crate::index::{DbFtsIndex, FtsLookupResult}; #[derive(Debug, Error)] pub enum DatasetError { #[error("{0}")] IoError(#[from] std::io::Error), #[error("{0}")] TantivyError(#[from] TantivyError), #[error("this dataset does not have an fts index")] NoFtsIndex, } // // MARK: Dataset enum // /// An opened data source — either a local filesystem directory or an S3 bucket. pub enum Dataset { Dir(Arc), S3(Arc), } impl Dataset { pub async fn get(&self, key: &str) -> Option { match self { Self::Dir(ds) => ds.get(key).await.ok().flatten(), Self::S3(ds) => ds.get(key).await.ok().flatten(), } } pub fn iter(&self) -> ReceiverStream> { match self { Self::Dir(ds) => ds.iter(), Self::S3(ds) => ds.iter(), } } pub async fn latest_change(&self) -> Result>, std::io::Error> { match self { Self::Dir(ds) => ds.latest_change().await, Self::S3(ds) => ds.latest_change().await, } } } // // MARK: Datasets collection // /// An opened dataset: config, working directory, and all opened sources. 
pub struct Datasets {
    // Absolute-ish path of the TOML config file we were opened from.
    pub path_config: PathBuf,
    // Directory containing the config file; relative source paths resolve here.
    pub path_parent: PathBuf,
    // Per-dataset scratch/index directory (working_dir or `<parent>/.pile`,
    // joined with the dataset name).
    pub path_workdir: PathBuf,
    pub config: ConfigToml,
    pub sources: HashMap<Label, Dataset>,
}

impl Datasets {
    /// Open a dataset from its TOML config file.
    ///
    /// Reads and parses the config, derives the working directory, and opens
    /// every configured source. A source that fails to open (currently only
    /// possible for S3) is logged and skipped rather than failing the open.
    ///
    /// # Errors
    /// Returns an `std::io::Error` if the config path has no parent directory,
    /// cannot be read, or does not parse as valid TOML.
    pub fn open(config: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
        let path_config = config.into();
        // `ok_or_else`: only build the error string on the failure path.
        let path_parent = path_config
            .parent()
            .ok_or_else(|| {
                std::io::Error::new(
                    ErrorKind::NotADirectory,
                    format!("Config file {} has no parent", path_config.display()),
                )
            })?
            .to_owned();

        let config = {
            let raw = std::fs::read_to_string(&path_config)?;
            let parsed: Result<ConfigToml, _> = toml::from_str(&raw);
            match parsed {
                Ok(config) => {
                    trace!(message = "Loaded config", ?config);
                    config
                }
                Err(error) => {
                    // Surface the TOML diagnostics verbatim, prefixed with the
                    // offending file path.
                    return Err(std::io::Error::new(
                        ErrorKind::InvalidData,
                        format!("{} is invalid:\n{error}", path_config.display()),
                    ));
                }
            }
        };

        // Default working dir is `<config parent>/.pile`; computed lazily so
        // the success path does not allocate it needlessly.
        let path_workdir = config
            .dataset
            .working_dir
            .clone()
            .unwrap_or_else(|| path_parent.join(".pile"))
            .join(config.dataset.name.as_str());

        let mut sources = HashMap::new();
        for (label, source) in &config.dataset.source {
            match source {
                Source::Filesystem { path, sidecars } => {
                    sources.insert(
                        label.clone(),
                        Dataset::Dir(Arc::new(DirDataSource::new(
                            label,
                            path_parent.join(path),
                            *sidecars,
                        ))),
                    );
                }
                Source::S3 {
                    bucket,
                    prefix,
                    endpoint,
                    region,
                    credentials,
                    sidecars,
                } => {
                    match S3DataSource::new(
                        label,
                        bucket.clone(),
                        prefix.clone(),
                        endpoint.clone(),
                        region.clone(),
                        credentials,
                        *sidecars,
                    ) {
                        Ok(ds) => {
                            sources.insert(label.clone(), Dataset::S3(Arc::new(ds)));
                        }
                        // Best-effort: a broken S3 source is skipped; lookups
                        // against it will simply find nothing.
                        Err(err) => warn!("Could not open S3 source {label}: {err}"),
                    }
                }
            }
        }

        Ok(Self {
            path_config,
            path_parent,
            path_workdir,
            config,
            sources,
        })
    }

    //
    // MARK: get
    //

    /// Fetch item `key` from the source named `source`.
    /// Returns `None` if the source does not exist or the item is not found.
    pub async fn get(&self, source: &Label, key: &str) -> Option<Item> {
        self.sources.get(source)?.get(key).await
    }

    /// Extract a field from an item by object path.
    /// Returns `None` if the item or field is not found.
pub async fn get_field( &self, source: &Label, key: &str, path: &ObjectPath, ) -> Result, std::io::Error> { let Some(item) = self.get(source, key).await else { return Ok(None); }; let item = PileValue::Item(item); let Some(value) = item.query(path).await? else { return Ok(None); }; Ok(Some(value.to_json().await?)) } // // MARK: fts // /// Refresh this dataset's fts index. pub async fn fts_refresh( &self, _threads: usize, flag: Option, ) -> Result<(), CancelableTaskError> { let fts_tmp_dir = self.path_workdir.join(".tmp-fts"); let fts_dir = self.path_workdir.join("fts"); if fts_tmp_dir.is_dir() { warn!("Removing temporary index in {}", fts_dir.display()); std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; } std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; let db_index = Arc::new(DbFtsIndex::new(&fts_tmp_dir, &self.config)); let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone()) .map_err(DatasetError::from)?; index.set_executor(Executor::multi_thread(10, "build-fts").map_err(DatasetError::from)?); let mut index_writer: IndexWriter = index.writer(50 * 1024 * 1024).map_err(DatasetError::from)?; let mut total = 0u64; let mut logged_at = Instant::now(); const CONCURRENCY: usize = 32; let mut join_set = JoinSet::new(); let mut process = |key, result: Result<_, _>| -> Result<(), DatasetError> { match result { Ok(Some(doc)) => { index_writer.add_document(doc).map_err(DatasetError::from)?; total += 1; if logged_at.elapsed().as_secs() >= 5 { debug!("Indexed {total} documents so far"); logged_at = Instant::now(); } } Ok(None) => warn!("Skipping {key:?}, document is empty"), Err(err) => warn!("Could not read {key:?}, skipping. 
{err}"), } Ok(()) }; for (name, dataset) in &self.sources { info!("Loading source {name}"); let mut stream = dataset.iter(); while let Some(item_result) = stream.next().await { if let Some(flag) = &flag && flag.is_cancelled() { return Err(CancelableTaskError::Cancelled); } let item = item_result.map_err(DatasetError::from)?; let db = Arc::clone(&db_index); join_set.spawn(async move { let key = item.key(); let result = db.entry_to_document(&item).await; (key, result) }); while join_set.len() >= CONCURRENCY { match join_set.join_next().await { Some(Ok((key, result))) => process(key, result)?, Some(Err(e)) => warn!("Indexing task panicked: {e}"), None => break, } } } } while let Some(join_result) = join_set.join_next().await { match join_result { Ok((key, result)) => process(key, result)?, Err(e) => warn!("Indexing task panicked: {e}"), } } if let Some(flag) = flag.as_ref() && flag.is_cancelled() { return Err(CancelableTaskError::Cancelled); } info!("Committing {total} documents"); index_writer.commit().map_err(DatasetError::from)?; if fts_dir.is_dir() { warn!("Removing existing index in {}", fts_dir.display()); std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?; } std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?; return Ok(()); } pub fn fts_lookup( &self, query: &str, top_n: usize, ) -> Result, DatasetError> { let fts_dir = self.path_workdir.join("fts"); if !fts_dir.exists() { return Err(DatasetError::NoFtsIndex); } if !fts_dir.is_dir() { return Err(std::io::Error::new( ErrorKind::NotADirectory, format!("fts index {} is not a directory", fts_dir.display()), ) .into()); } let db_index = DbFtsIndex::new(&fts_dir, &self.config); let results = db_index.lookup(query, Arc::new(TopDocs::with_limit(top_n)))?; return Ok(results); } /// Time at which fts was created pub fn ts_fts(&self) -> Result>, std::io::Error> { let fts_dir = self.path_workdir.join("fts"); if !fts_dir.exists() { return Ok(None); } if !fts_dir.is_dir() { return 
Err(std::io::Error::new( ErrorKind::NotADirectory, format!("fts index {} is not a directory", fts_dir.display()), )); } return path_ts_earliest(&fts_dir); } /// Time at which data was last modified pub async fn ts_data(&self) -> Result>, std::io::Error> { let mut ts: Option> = None; for dataset in self.sources.values() { match (ts, dataset.latest_change().await?) { (_, None) => continue, (None, Some(new)) => ts = Some(new), (Some(old), Some(new)) => ts = Some(old.max(new)), } } return Ok(ts); } /// Returns true if we do not have an fts index, /// or if our fts index is older than our data. pub async fn needs_fts(&self) -> Result { let start = Instant::now(); let ts_fts = self.ts_fts()?; let ts_data = self.ts_data().await?; let result = match (ts_fts, ts_data) { (None, Some(_)) => true, (None, None) | (Some(_), None) => { warn!("Could not determine data age"); false } (Some(ts_fts), Some(ts_data)) => ts_data > ts_fts, }; debug!( message = "Ran needs_fts", time_ms = start.elapsed().as_millis(), ?result ); return Ok(result); } }