use jsonpath_rust::JsonPath; use pile_config::{ConfigToml, DatasetFts, Label}; use serde_json::Value; use std::{path::PathBuf, sync::LazyLock}; use tantivy::{ DocAddress, Index, ReloadPolicy, TantivyDocument, TantivyError, collector::Collector, query::QueryParser, schema::{self, Schema, Value as TantivyValue}, }; use tracing::{debug, trace, warn}; use crate::{Item, Key}; #[derive(Debug, Clone)] pub struct FtsLookupResult { pub score: f32, pub source_name: Label, pub key: String, } pub struct DbFtsIndex { pub path: PathBuf, pub cfg: ConfigToml, pub schema: Schema, } impl DbFtsIndex { fn fts_cfg(&self) -> &DatasetFts { static DEFAULT: LazyLock = LazyLock::new(DatasetFts::default); self.cfg.fts.as_ref().unwrap_or(&DEFAULT) } } impl DbFtsIndex { pub fn new(path: impl Into, cfg: &ConfigToml) -> Self { let path = path.into(); let cfg = cfg.clone(); let schema = { let mut schema_builder = Schema::builder(); schema_builder.add_text_field("_meta_source", schema::STORED); schema_builder.add_text_field("_meta_key", schema::STORED); let default = DatasetFts::default(); let fields = &cfg.fts.as_ref().unwrap_or(&default).fields; for (name, field) in fields { if field.tokenize { schema_builder.add_text_field(name, schema::TEXT); } else { schema_builder.add_text_field(name, schema::STRING); } } schema_builder.build() }; Self { path, cfg, schema } } // // MARK: write // /// Turn an entry into a tantivy document pub fn entry_to_document( &self, item: &dyn Item, ) -> Result, TantivyError> { let mut doc = TantivyDocument::default(); let key = match item.key().to_string() { Some(x) => x, None => { warn!( message = "Item key cannot be converted to a string, skipping", key = ?item.key(), ); return Ok(None); } }; doc.add_text(self.schema.get_field("_meta_source")?, item.source_name()); doc.add_text(self.schema.get_field("_meta_key")?, key); let json = item.json()?; let mut empty = true; for name in self.fts_cfg().fields.keys() { let x = self.get_field(&json, name)?; let val = match x { Some(x) => x, None => continue, }; empty = false; let field = self.schema.get_field(name); if let Ok(field) = field { doc.add_text(field, val); } } return Ok((!empty).then_some(doc)); } // // MARK: read // pub fn get_field( &self, json: &Value, field_name: &Label, ) -> Result, std::io::Error> { let field = match self.cfg.schema.get(field_name) { Some(x) => x, None => { warn!("Unknown field {field_name:?}"); return Ok(None); } }; // Try paths in order, using the first value we find 'outer: for path in field.path.as_slice() { let val = match json.query(path) { Ok(mut x) => { if x.len() > 1 { warn!( message = "Path returned more than one value, this is not supported. Skipping.", ?path, field = field_name.to_string() ); continue; } match x.pop() { Some(x) => x, None => continue, } } Err(error) => { warn!( message = "Invalid path, skipping", ?path, field = field_name.to_string(), ?error ); continue; } }; let mut val = match val { Value::Null => { trace!( message = "Skipping field, is null", field = field_name.to_string(), path, value = ?val ); continue; } x => x.clone(), }; for post in &field.post { val = match post.apply(&val) { Some(x) => x, None => return Ok(None), }; } loop { val = match val { #[expect(clippy::unwrap_used)] Value::Array(ref mut x) => { if x.len() == 1 { x.pop().unwrap() } else if x.len() > 1 { debug!( message = "Skipping field, is array with more than one element", field = field_name.to_string(), path, value = ?val ); continue 'outer; } else { debug!( message = "Skipping field, is empty array", field = field_name.to_string(), path, value = ?val ); continue 'outer; } } Value::Null => { trace!( message = "Skipping field, is null", field = field_name.to_string(), path, value = ?val ); continue 'outer; } Value::Object(_) => { trace!( message = "Skipping field, is object", field = field_name.to_string(), path, value = ?val ); continue 'outer; } Value::Bool(x) => return Ok(Some(x.to_string())), Value::Number(x) => return Ok(Some(x.to_string())), Value::String(x) => return Ok(Some(x)), } } } return Ok(None); } /// Run the given query on this table's ftx index. /// Returns `None` if this table does not have an fts index. /// /// Note that concurrent writes to the table may invalidate `Entry::Id`. /// When you use [Table::get_meta] on this method's results, there is /// NO GUARANTEE that the entries returned by this method will exist. pub fn lookup( &self, query: impl Into, collector: impl AsRef + Send + 'static, ) -> Result, TantivyError> where C: Collector, C::Fruit: IntoIterator, { if !self.path.exists() { return Ok(Vec::new()); } if !self.path.is_dir() { warn!("fts index at {} is not a directory?!", self.path.display()); return Ok(Vec::new()); } let index = Index::open_in_dir(&self.path)?; let reader = index .reader_builder() .reload_policy(ReloadPolicy::OnCommitWithDelay) .try_into()?; let schema = index.schema(); let mut fields = Vec::new(); for f in self.schema.fields() { fields.push(f.0) } let query: String = query.into(); let searcher = reader.searcher(); let query_parser = QueryParser::for_index(&index, fields); let query = query_parser.parse_query(&query)?; let res = searcher.search(&query, collector.as_ref())?; let mut out = Vec::new(); for (score, doc) in res { let retrieved_doc: TantivyDocument = searcher.doc(doc)?; #[expect(clippy::unwrap_used)] let source_name = { let f_source = schema.get_field("_meta_source").unwrap(); let mut source = None; for v in retrieved_doc.get_all(f_source) { assert!(source.is_none()); // Must only exist once let v = v.as_str().unwrap().to_owned(); let v = Label::new(v).unwrap(); source = Some(v) } source.unwrap() }; #[expect(clippy::unwrap_used)] let key = { let f_source = schema.get_field("_meta_key").unwrap(); let mut source = None; for v in retrieved_doc.get_all(f_source) { assert!(source.is_none()); // Must only exist once let v = v.as_str().unwrap().to_owned(); source = Some(v) } source.unwrap() }; out.push(FtsLookupResult { score, source_name, key, }); } return Ok(out); } }