use pile_config::{ConfigToml, DatasetFts, Label, objectpath::ObjectPath}; use pile_value::{ extract::traits::ExtractState, value::{Item, PileValue}, }; use std::{path::PathBuf, sync::LazyLock}; use tantivy::{ DocAddress, Index, ReloadPolicy, TantivyDocument, TantivyError, collector::Collector, query::QueryParser, schema::{self, Schema, Value as TantivyValue}, }; use tracing::warn; #[derive(Debug, Clone)] pub struct FtsLookupResult { pub score: f32, pub source: Label, pub key: String, } pub struct DbFtsIndex { pub path: PathBuf, pub cfg: ConfigToml, pub schema: Schema, } impl DbFtsIndex { fn fts_cfg(&self) -> &DatasetFts { static DEFAULT: LazyLock = LazyLock::new(DatasetFts::default); self.cfg.fts.as_ref().unwrap_or(&DEFAULT) } } impl DbFtsIndex { pub fn new(path: impl Into, cfg: &ConfigToml) -> Self { let path = path.into(); let cfg = cfg.clone(); let schema = { let mut schema_builder = Schema::builder(); schema_builder.add_text_field("_meta_source", schema::STORED); schema_builder.add_text_field("_meta_key", schema::STORED); let default = DatasetFts::default(); let fields = &cfg.fts.as_ref().unwrap_or(&default).fields; for (name, field) in fields { if field.tokenize { schema_builder.add_text_field(name, schema::TEXT); } else { schema_builder.add_text_field(name, schema::STRING); } } schema_builder.build() }; Self { path, cfg, schema } } // // MARK: write // /// Turn an entry into a tantivy document pub async fn entry_to_document( &self, state: &ExtractState, item: &Item, ) -> Result, TantivyError> { let mut doc = TantivyDocument::default(); let key = item.key(); doc.add_text(self.schema.get_field("_meta_source")?, item.source_name()); doc.add_text(self.schema.get_field("_meta_key")?, key); let item = PileValue::Item(item.clone()); let mut empty = true; for name in self.fts_cfg().fields.keys() { let vals = self.get_field(state, &item, name).await?; let field = self.schema.get_field(name)?; for v in vals { empty = false; doc.add_text(field, v); } } return Ok((!empty).then_some(doc)); } // // MARK: read // pub async fn get_field( &self, state: &ExtractState, extractor: &PileValue, field_name: &Label, ) -> Result, std::io::Error> { let field = match self.cfg.schema.get(field_name) { Some(x) => x, None => { warn!("Unknown field {field_name:?}"); return Ok(Vec::new()); } }; // Try paths in order, using the first value we find for path in field.path.as_slice() { let val = match extractor.query(state, path).await? { Some(PileValue::Null) | None => continue, Some(x) => x, }; let val = val_to_string(state, &val, path, field_name).await?; if !val.is_empty() { return Ok(val); } } return Ok(Vec::new()); } /// Run the given query on this table's ftx index. /// Returns `None` if this table does not have an fts index. /// /// Note that concurrent writes to the table may invalidate `Entry::Id`. /// When you use [Table::get_meta] on this method's results, there is /// NO GUARANTEE that the entries returned by this method will exist. pub fn lookup( &self, query: impl Into, collector: impl AsRef + Send + 'static, ) -> Result, TantivyError> where C: Collector, C::Fruit: IntoIterator, { if !self.path.exists() { return Ok(Vec::new()); } if !self.path.is_dir() { warn!("fts index at {} is not a directory?!", self.path.display()); return Ok(Vec::new()); } let index = Index::open_in_dir(&self.path)?; let reader = index .reader_builder() .reload_policy(ReloadPolicy::OnCommitWithDelay) .try_into()?; let schema = index.schema(); let mut fields = Vec::new(); for f in self.schema.fields() { fields.push(f.0) } let query: String = query.into(); let searcher = reader.searcher(); let query_parser = QueryParser::for_index(&index, fields); let query = query_parser.parse_query(&query)?; let res = searcher.search(&query, collector.as_ref())?; let mut out = Vec::new(); for (score, doc) in res { let retrieved_doc: TantivyDocument = searcher.doc(doc)?; #[expect(clippy::unwrap_used)] let source_name = { let f_source = schema.get_field("_meta_source").unwrap(); let mut source = None; for v in retrieved_doc.get_all(f_source) { assert!(source.is_none()); // Must only exist once let v = v.as_str().unwrap().to_owned(); let v = Label::new(v).unwrap(); source = Some(v) } source.unwrap() }; #[expect(clippy::unwrap_used)] let key = { let f_source = schema.get_field("_meta_key").unwrap(); let mut source = None; for v in retrieved_doc.get_all(f_source) { assert!(source.is_none()); // Must only exist once let v = v.as_str().unwrap().to_owned(); source = Some(v) } source.unwrap() }; out.push(FtsLookupResult { score, source: source_name, key, }); } return Ok(out); } } async fn val_to_string( state: &ExtractState, val: &PileValue, path: &ObjectPath, field_name: &str, ) -> Result, std::io::Error> { match val { PileValue::String(x) => return Ok(vec![x.to_string()]), PileValue::U64(x) => return Ok(vec![x.to_string()]), PileValue::I64(x) => return Ok(vec![x.to_string()]), PileValue::Array(x) => { let mut out = Vec::new(); for x in x.iter() { out.extend(Box::pin(val_to_string(state, x, path, field_name)).await?); } return Ok(out); } #[expect(clippy::unwrap_used)] PileValue::ListExtractor(x) => { let mut out = Vec::new(); let len = x.len(state).await?; for i in 0..len { let v = x.get(state, i).await?; out.extend(Box::pin(val_to_string(state, &v.unwrap(), path, field_name)).await?); } return Ok(out); } PileValue::Null => {} PileValue::ObjectExtractor(_) => {} PileValue::Item(_) => {} PileValue::Binary(_) => {} } return Ok(Vec::new()); }