use chrono::{DateTime, Utc}; use pile_config::Label; use std::{path::PathBuf, sync::Arc}; use tokio_stream::wrappers::ReceiverStream; use walkdir::WalkDir; use crate::{ source::{DataSource, misc::path_ts_latest}, value::Item, }; #[derive(Debug)] pub struct DirDataSource { pub name: Label, pub dir: PathBuf, pub sidecars: bool, } impl DirDataSource { pub fn new(name: &Label, dir: PathBuf, sidecars: bool) -> Self { Self { name: name.clone(), dir, sidecars, } } } impl DataSource for Arc { async fn get(&self, key: &str) -> Result, std::io::Error> { let key = match key.parse::() { Ok(x) => self.dir.join(x), Err(_) => return Ok(None), }; if !key.is_file() { return Ok(None); } // Ignore toml files if sidecars are enabled if self.sidecars && key.extension().and_then(|x| x.to_str()) == Some("toml") { return Ok(None); } return Ok(Some(Item::File { source: Arc::clone(self), mime: mime_guess::from_path(&key).first_or_octet_stream(), path: key.clone(), sidecar: self.sidecars.then(|| { Box::new(Item::File { source: Arc::clone(self), mime: mime_guess::from_path(key.with_extension("toml")).first_or_octet_stream(), path: key.with_extension("toml"), sidecar: None, }) }), })); } fn iter(&self) -> ReceiverStream> { let (tx, rx) = tokio::sync::mpsc::channel(64); let source = Arc::clone(self); let dir = self.dir.clone(); tokio::task::spawn_blocking(move || { for entry in WalkDir::new(dir) { let entry = match entry { Err(e) => { let msg = format!("walkdir error: {e:?}"); let err = e.into_io_error().unwrap_or(std::io::Error::other(msg)); if tx.blocking_send(Err(err)).is_err() { return; } continue; } Ok(e) => e, }; if entry.file_type().is_dir() { continue; } let path = entry.into_path(); let item = match path.extension().and_then(|x| x.to_str()) { None => continue, Some("toml") if source.sidecars => continue, Some(_) => Item::File { source: Arc::clone(&source), mime: mime_guess::from_path(&path).first_or_octet_stream(), path: path.clone(), sidecar: source.sidecars.then(|| { Box::new(Item::File { source: Arc::clone(&source), mime: mime_guess::from_path(path.with_extension("toml")) .first_or_octet_stream(), path: path.with_extension("toml"), sidecar: None, }) }), }, }; if tx.blocking_send(Ok(item)).is_err() { return; } } }); ReceiverStream::new(rx) } async fn latest_change(&self) -> Result>, std::io::Error> { let mut ts: Option> = None; if !self.dir.exists() { return Ok(None); } let new = path_ts_latest(&self.dir)?; match (ts, new) { (_, None) => {} (None, Some(new)) => ts = Some(new), (Some(old), Some(new)) => ts = Some(old.max(new)), }; return Ok(ts); } }