use chrono::{DateTime, Utc}; use pile_config::{ConfigToml, Label, Source}; use pile_toolbox::cancelabletask::{CancelFlag, CancelableTaskError}; use rayon::{ ThreadPoolBuilder, iter::{IntoParallelIterator, ParallelIterator}, }; use std::{ io::ErrorKind, path::PathBuf, sync::{ Arc, atomic::{AtomicU64, Ordering}, mpsc::Receiver, }, thread::JoinHandle, time::Instant, }; use tantivy::{Executor, Index, IndexWriter, TantivyError, collector::TopDocs}; use thiserror::Error; use tracing::{debug, info, trace, warn}; use crate::{ DataSource, Item, index::{DbFtsIndex, FtsLookupResult}, path_ts_earliest, source::DirDataSource, }; #[derive(Debug, Error)] pub enum DatasetError { #[error("{0}")] IoError(#[from] std::io::Error), #[error("{0}")] TantivyError(#[from] TantivyError), #[error("this dataset does not have an fts index")] NoFtsIndex, } pub struct Dataset { pub path_config: PathBuf, pub path_parent: PathBuf, pub path_workdir: PathBuf, pub config: ConfigToml, } impl Dataset { pub fn open(config: impl Into) -> Result { let path_config = config.into(); let path_parent = path_config .parent() .ok_or(std::io::Error::new( ErrorKind::NotADirectory, format!("Config file {} has no parent", path_config.display()), ))? .to_owned(); let config = { let config = std::fs::read_to_string(&path_config)?; let config: Result = toml::from_str(&config); match config { Ok(config) => { trace!(message = "Loaded config", ?config); config } Err(error) => { return Err(std::io::Error::new( ErrorKind::InvalidData, format!("{} is invalid:\n{error}", path_config.display()), )); } } }; let path_workdir = config .dataset .working_dir .clone() .unwrap_or(path_parent.join(".pile")) .join(config.dataset.name.as_str()); return Ok(Self { path_config, path_parent, path_workdir, config, }); } // // MARK: get // pub fn get( &self, source: &Label, key: &PathBuf, ) -> Option + 'static>> { let s = self.config.dataset.source.get(source)?; let s = match s { Source::Flac { path } => DirDataSource::new(source, path.clone().to_vec()), }; s.get(key).ok().flatten() } // // MARK: fts // /// Refresh this dataset's fts index pub fn fts_refresh( &self, threads: usize, flag: Option, ) -> Result<(), CancelableTaskError> { let fts_tmp_dir = self.path_workdir.join(".tmp-fts"); let fts_dir = self.path_workdir.join("fts"); if fts_tmp_dir.is_dir() { warn!("Removing temporary index in {}", fts_dir.display()); std::fs::remove_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; } std::fs::create_dir_all(&fts_tmp_dir).map_err(DatasetError::from)?; let db_index = DbFtsIndex::new(&fts_tmp_dir, &self.config); let mut index = Index::create_in_dir(&fts_tmp_dir, db_index.schema.clone()) .map_err(DatasetError::from)?; index.set_executor(Executor::multi_thread(10, "build-fts").map_err(DatasetError::from)?); let mut index_writer: IndexWriter = index.writer(50 * 1024 * 1024).map_err(DatasetError::from)?; let batch_size = 1000; let (_read_task, read_rx) = start_read_task(&self.config, batch_size); #[expect(clippy::unwrap_used)] let write_pool = ThreadPoolBuilder::new() .num_threads(threads.max(1)) .thread_name(|x| format!("fts_refresh_thread_{x}")) .build() .unwrap(); let mut total = 0u64; while let Ok(batch) = read_rx.recv() { let batch = batch?; if let Some(flag) = &flag && flag.is_cancelled() { return Err(CancelableTaskError::Cancelled); } let this = AtomicU64::new(0); let start = Instant::now(); write_pool .install(|| { batch .into_par_iter() .filter_map(|(key, item)| match db_index.entry_to_document(&*item) { Ok(Some(doc)) => Some((key, doc)), Ok(None) => { warn!("Skipping {key:?}, document is empty"); None } Err(err) => { warn!("Could not read {key:?}, skipping. {err}"); None } }) .map(|(key, doc)| { this.fetch_add(1, Ordering::Relaxed); index_writer .add_document(doc) .map_err(|err| (key, err)) .map(|_| ()) }) .find_first(|x| x.is_err()) .unwrap_or(Ok(())) }) .map_err(|(_key, err)| DatasetError::from(err))?; let this = this.load(Ordering::Relaxed); total += this; let time_ms = start.elapsed().as_millis(); debug!("Added a batch of {this} in {time_ms} ms ({total} total)"); } if let Some(flag) = flag.as_ref() && flag.is_cancelled() { return Err(CancelableTaskError::Cancelled); } info!("Committing index"); index_writer.commit().map_err(DatasetError::from)?; if fts_dir.is_dir() { warn!("Removing existing index in {}", fts_dir.display()); std::fs::remove_dir_all(&fts_dir).map_err(DatasetError::from)?; } std::fs::rename(&fts_tmp_dir, &fts_dir).map_err(DatasetError::from)?; return Ok(()); } pub fn fts_lookup( &self, query: &str, top_n: usize, ) -> Result, DatasetError> { let fts_dir = self.path_workdir.join("fts"); if !fts_dir.exists() { return Err(DatasetError::NoFtsIndex); } if !fts_dir.is_dir() { return Err(std::io::Error::new( ErrorKind::NotADirectory, format!("fts index {} is not a directory", fts_dir.display()), ) .into()); } let db_index = DbFtsIndex::new(&fts_dir, &self.config); let results = db_index.lookup(query, Arc::new(TopDocs::with_limit(top_n)))?; return Ok(results); } /// Time at which fts was created pub fn ts_fts(&self) -> Result>, std::io::Error> { let fts_dir = self.path_workdir.join("fts"); if !fts_dir.exists() { return Ok(None); } if !fts_dir.is_dir() { return Err(std::io::Error::new( ErrorKind::NotADirectory, format!("fts index {} is not a directory", fts_dir.display()), )); } return path_ts_earliest(&fts_dir); } /// Time at which data was last modified pub fn ts_data(&self) -> Result>, std::io::Error> { let mut ts: Option> = None; for (label, source) in &self.config.dataset.source { match source { Source::Flac { path } => { let s = DirDataSource::new(label, path.clone().to_vec()); match (ts, s.latest_change()?) { (_, None) => continue, (None, Some(new)) => ts = Some(new), (Some(old), Some(new)) => ts = Some(old.max(new)), }; } } } return Ok(ts); } /// Returns true if we do not have an fts index, /// or if our fts index is older than our data. pub fn needs_fts(&self) -> Result { let start = Instant::now(); let ts_fts = self.ts_fts()?; let ts_data = self.ts_data()?; let result = match (ts_fts, ts_data) { (None, Some(_)) => true, (None, None) | (Some(_), None) => { warn!("Could not determine data age"); false } (Some(ts_fts), Some(ts_data)) => ts_data > ts_fts, }; debug!( message = "Ran needs_fts", time_ms = start.elapsed().as_millis(), ?result ); return Ok(result); } } // // MARK: read_task // fn start_read_task( config: &ConfigToml, batch_size: usize, ) -> ( JoinHandle<()>, Receiver>)>, DatasetError>>, ) { let config = config.clone(); let (read_tx, read_rx) = std::sync::mpsc::sync_channel(2); let read_task = std::thread::spawn(move || { let mut batch = Vec::with_capacity(batch_size); for (name, source) in &config.dataset.source { info!("Loading source {name}"); match source { Source::Flac { path: dir } => { let source = DirDataSource::new(name, dir.clone().to_vec()); for i in source.iter() { match i { Ok(x) => batch.push(x), Err(err) => { let err = Err(DatasetError::from(err)); let _ = read_tx.send(err); return; } } if batch.len() >= batch_size { let b = std::mem::replace(&mut batch, Vec::with_capacity(batch_size)); match read_tx.send(Ok(b)) { Ok(()) => {} Err(_) => return, }; } } } } } if !batch.is_empty() { match read_tx.send(Ok(batch)) { Ok(()) => {} Err(_) => return, }; } }); return (read_task, read_rx); }