Configure server through env
All checks were successful
CI / Typos (push) Successful in 21s
CI / Build and test (push) Successful in 2m33s
CI / Clippy (push) Successful in 3m12s
Docker / build-and-push (push) Successful in 5m35s
CI / Build and test (all features) (push) Successful in 7m1s

This commit is contained in:
2026-03-26 20:40:51 -07:00
parent 256af68382
commit 5807733e62
10 changed files with 579 additions and 66 deletions

View File

@@ -28,3 +28,9 @@ serde_json = { workspace = true }
axum = { workspace = true }
utoipa = { workspace = true }
utoipa-swagger-ui = { workspace = true }
url = { workspace = true }
tracing-loki = { workspace = true }
base64 = { workspace = true }
dotenvy = { workspace = true }
envy = { workspace = true }
thiserror = { workspace = true }

View File

@@ -20,10 +20,6 @@ use crate::{CliCmd, GlobalContext};
#[derive(Debug, Args)]
pub struct ServerCommand {
/// Address to bind to
#[arg(default_value = "0.0.0.0:9000")]
addr: String,
/// The datasets we should serve. Can be repeated.
#[arg(long, short = 'c')]
config: Vec<PathBuf>,
@@ -31,26 +27,18 @@ pub struct ServerCommand {
/// If provided, do not serve docs
#[arg(long)]
no_docs: bool,
/// If provided, require this bearer token for all requests
#[arg(long)]
token: Option<String>,
/// Working directory root
#[arg(long, default_value = "./.pile")]
workdir: PathBuf,
}
impl CliCmd for ServerCommand {
async fn run(
self,
_ctx: GlobalContext,
ctx: GlobalContext,
flag: CancelFlag,
) -> Result<i32, CancelableTaskError<anyhow::Error>> {
let datasets = {
let mut datasets = Vec::new();
for c in &self.config {
let ds = Datasets::open(&c, &self.workdir)
let ds = Datasets::open(&c, &ctx.config.workdir_root)
.await
.with_context(|| format!("while opening dataset for {}", c.display()))?;
datasets.push(Arc::new(ds));
@@ -59,7 +47,7 @@ impl CliCmd for ServerCommand {
Arc::new(datasets)
};
let bearer = BearerToken(self.token.map(Arc::new));
let bearer = BearerToken(ctx.config.api_token.clone().map(Arc::new));
let mut router = Router::new();
for d in datasets.iter() {
@@ -85,14 +73,14 @@ impl CliCmd for ServerCommand {
let app = router.into_make_service_with_connect_info::<std::net::SocketAddr>();
let listener = match tokio::net::TcpListener::bind(self.addr.clone()).await {
let listener = match tokio::net::TcpListener::bind(ctx.config.server_addr.clone()).await {
Ok(x) => x,
Err(error) => {
match error.kind() {
std::io::ErrorKind::AddrInUse => {
error!(
message = "Cannot bind to address, already in use",
addr = self.addr
addr = ctx.config.server_addr
);
}
_ => {

View File

@@ -0,0 +1,109 @@
use serde::Deserialize;
use std::{num::NonZeroUsize, path::PathBuf};
use tracing::info;
use crate::config::{
env::load_env,
logging::{LoggingFormat, LoggingInitializer, LoggingPreset, LoggingTarget, LokiConfig},
};
/// Note that the fields of this struct are not capitalized.
/// Envy is case-insensitive, and expects Rust fields to be snake_case.
#[derive(Debug, Deserialize, Clone)]
pub struct PileServerConfig {
    /// Optional Loki log-shipping settings, read from the same flat
    /// set of env vars as the rest of the config (hence the flatten)
    #[serde(flatten)]
    pub loki: Option<LokiConfig>,
    /// The logging level to run with
    #[serde(default)]
    pub loglevel: LoggingPreset,
    /// How log output is formatted (ANSI, no-color ANSI, or JSON)
    #[serde(default)]
    pub logformat: LoggingFormat,
    /// How many worker threads to use
    pub threads: Option<NonZeroUsize>,
    /// IP and port to bind to
    /// Should look like `127.0.0.1:3030`
    pub server_addr: String,
    /// If set, the bearer token required for all server requests
    pub api_token: Option<String>,
    /// Working directory root
    pub workdir_root: PathBuf,
}
impl Default for PileServerConfig {
fn default() -> Self {
Self {
loki: None,
loglevel: LoggingPreset::Debug,
logformat: LoggingFormat::Ansi,
threads: None,
server_addr: "0.0.0.0:3000".into(),
api_token: None,
workdir_root: "./.pile".into(),
}
}
}
impl PileServerConfig {
    /// Load the configuration and initialize global logging.
    ///
    /// When `with_env` is true the config is read from `.env` / process
    /// env vars and the `loglevel` field controls log filtering; otherwise
    /// the compiled-in defaults are used and `cli_log_level` wins.
    /// On a config-load or logging-init failure, the error is printed and
    /// the process exits with status 1.
    pub fn load(with_env: bool, cli_log_level: LoggingPreset) -> Self {
        let config = if with_env {
            match load_env::<Self>() {
                Ok(env) => env.get_config().clone(),
                #[expect(clippy::print_stdout)]
                Err(err) => {
                    println!("Error while loading .env: {err}");
                    std::process::exit(1);
                }
            }
        } else {
            Self::default()
        };
        let res = LoggingInitializer {
            app_name: "pile-server",
            loki: config.loki.clone(),
            // CLI -v/-q verbosity only applies when env config is disabled;
            // with env config the LOGLEVEL variable wins.
            preset: if with_env {
                config.loglevel
            } else {
                cli_log_level
            },
            target: LoggingTarget::Stderr {
                format: config.logformat,
            },
        }
        .initialize();
        if let Err(e) = res {
            #[expect(clippy::print_stderr)]
            for e in e.chain() {
                eprintln!("{e}");
            }
            std::process::exit(1);
        }
        // NOTE(review): this logs the entire config, which includes
        // `api_token` and (if set) `loki_pass` — consider redacting secrets.
        info!(message = "Config loaded", ?config);
        config
    }

    /// Build the multi-threaded tokio runtime, honoring `self.threads`
    /// when set (tokio's default worker count otherwise).
    pub fn make_runtime(&self) -> tokio::runtime::Runtime {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.enable_all();
        if let Some(threads) = self.threads {
            builder.worker_threads(threads.into());
        }
        // Runtime construction only fails on OS-level resource errors;
        // treat that as fatal.
        #[expect(clippy::unwrap_used)]
        let rt = builder.build().unwrap();
        rt
    }
}

View File

@@ -0,0 +1,108 @@
#![expect(dead_code)]
use serde::de::DeserializeOwned;
use std::{
collections::HashMap,
env::VarError,
io::ErrorKind,
path::{Path, PathBuf},
};
use thiserror::Error;
/// An error we might encounter when loading an env
#[derive(Debug, Error)]
pub enum EnvLoadError {
#[error("i/o error")]
IOError(#[from] std::io::Error),
#[error("varerror")]
VarError(#[from] VarError),
#[error("line parse error: `{on_line}` at char {at_char}")]
LineParse { on_line: String, at_char: usize },
#[error("other dotenvy error")]
Other(#[from] dotenvy::Error),
#[error("missing value {0}")]
MissingValue(String),
#[error("parse error: {0}")]
OtherParseError(String),
}
/// The result of loading configuration from the environment.
pub enum LoadedEnv<T> {
    /// We loaded config from `.env` and env vars
    FoundFile { config: T, path: PathBuf },
    /// We could not find `.env` and only loaded env vars
    OnlyVars(T),
}

impl<T> LoadedEnv<T> {
    /// Borrow the parsed config, regardless of where it came from.
    pub fn get_config(&self) -> &T {
        let (Self::FoundFile { config, .. } | Self::OnlyVars(config)) = self;
        config
    }
}
/// Load the configuration type `T` from the current environment,
/// including the `.env` if it exists.
#[expect(clippy::wildcard_enum_match_arm)]
pub fn load_env<T: DeserializeOwned>() -> Result<LoadedEnv<T>, EnvLoadError> {
let env_path = match dotenvy::dotenv() {
Ok(path) => Some(path),
Err(dotenvy::Error::Io(err)) => match err.kind() {
ErrorKind::NotFound => None,
_ => return Err(EnvLoadError::IOError(err)),
},
Err(dotenvy::Error::EnvVar(err)) => {
return Err(EnvLoadError::VarError(err));
}
Err(dotenvy::Error::LineParse(on_line, at_char)) => {
return Err(EnvLoadError::LineParse { on_line, at_char });
}
Err(err) => {
return Err(EnvLoadError::Other(err));
}
};
match envy::from_env::<T>() {
Ok(config) => {
if let Some(path) = env_path {
return Ok(LoadedEnv::FoundFile { path, config });
} else {
return Ok(LoadedEnv::OnlyVars(config));
}
}
Err(envy::Error::MissingValue(value)) => {
return Err(EnvLoadError::MissingValue(value.into()));
}
Err(envy::Error::Custom(message)) => {
return Err(EnvLoadError::OtherParseError(message));
}
};
}
/// Load an .env file to a hashmap.
///
/// This function does not read the current env,
/// only parsing vars explicitly declared in the given file.
pub fn load_env_dict(p: impl AsRef<Path>) -> Result<HashMap<String, String>, EnvLoadError> {
    let mut out = HashMap::new();
    // Each iterator item is itself fallible (per-line parse errors).
    for item in dotenvy::from_filename_iter(p)? {
        let (key, val) = item?;
        out.insert(key, val);
    }
    Ok(out)
}

View File

@@ -1,7 +1,13 @@
use anyhow::Result;
use clap::ValueEnum;
use indicatif::MultiProgress;
use serde::Deserialize;
use std::{fmt::Display, str::FromStr};
use tracing_subscriber::EnvFilter;
use tracing_indicatif::IndicatifWriter;
use tracing_subscriber::{
EnvFilter, Layer, fmt::MakeWriter, layer::SubscriberExt, util::SubscriberInitExt,
};
use url::Url;
#[derive(Debug, Default)]
pub enum LogLevel {
@@ -32,6 +38,7 @@ pub enum LoggingPreset {
Info,
Debug,
Trace,
Loki,
}
pub struct LoggingConfig {
@@ -138,6 +145,203 @@ impl LoggingPreset {
pile_dataset: LogLevel::Trace,
pile_toolbox: LogLevel::Trace,
},
Self::Loki => LoggingConfig {
other: LogLevel::Warn,
extractor: LogLevel::Error,
pile: LogLevel::Trace,
pile_flac: LogLevel::Trace,
pile_config: LogLevel::Trace,
pile_dataset: LogLevel::Trace,
pile_toolbox: LogLevel::Trace,
},
}
}
}
//
// MARK: initializer
//
/// Endpoint and credentials for shipping logs to a Loki instance.
///
/// Deserialized from env vars as part of the server config
/// (flattened into `PileServerConfig`).
#[derive(Debug, Deserialize, Clone)]
pub struct LokiConfig {
    /// Base URL of the Loki server logs are pushed to
    pub loki_host: Url,
    /// Username for HTTP basic auth against Loki
    pub loki_user: String,
    /// Password for HTTP basic auth against Loki
    pub loki_pass: String,
    /// Value of the `node_name` label attached to every shipped log line
    pub loki_node_name: String,
}
/// Where to print logs
#[expect(dead_code)]
pub enum LoggingTarget {
    /// Send logs to stdout
    Stdout { format: LoggingFormat },
    /// Send logs to stderr
    Stderr { format: LoggingFormat },
    /// Send logs to an IndicatifWriter.
    ///
    /// Equivalent to `Stderr { format: LoggingFormat::Ansi }`, but routes
    /// output through an `IndicatifWriter` built from the given
    /// `MultiProgress`, so log lines coordinate with active progress bars.
    Indicatif(MultiProgress),
}
/// How to print logs
#[derive(Debug, Clone, Copy, Deserialize, Default)]
pub enum LoggingFormat {
    /// Human-readable output with ANSI colors (the default)
    #[default]
    Ansi,
    /// Human-readable output with ANSI colors disabled
    AnsiNoColor,
    /// One flattened JSON object per log event
    Json,
}
/// Everything needed to set up the global tracing subscriber;
/// consumed by its `initialize` method.
pub struct LoggingInitializer {
    /// Application name, reported to Loki as the `app` label
    pub app_name: &'static str,
    /// If `Some`, send logs to the given loki server
    pub loki: Option<LokiConfig>,
    /// Log filter for printed logs
    pub preset: LoggingPreset,
    /// Where to print logs
    pub target: LoggingTarget,
}
impl LoggingInitializer {
    /// Install the global tracing subscriber described by `self`.
    ///
    /// Builds exactly one fmt layer (chosen by `self.target`) plus an
    /// optional Loki-shipping layer, then registers them all on the global
    /// registry. Returns an error only if the Loki layer fails to build.
    pub fn initialize(self) -> Result<()> {
        // Each target/format combination produces a fmt layer of a distinct
        // concrete type (e.g. `.json()` changes the type), so each shape
        // gets its own Option slot. Exactly one becomes Some below; the
        // registry accepts the `None`s as no-op layers.
        let mut stderr_ansi_layer = None;
        let mut stderr_json_layer = None;
        let mut stdout_ansi_layer = None;
        let mut stdout_json_layer = None;
        let mut indicatif_layer = None;
        match self.target {
            LoggingTarget::Stderr {
                format: LoggingFormat::Ansi,
            } => {
                stderr_ansi_layer = Some(
                    tracing_subscriber::fmt::Layer::default()
                        .without_time()
                        .with_ansi(true)
                        .with_writer(std::io::stderr)
                        .with_filter::<EnvFilter>(self.preset.get_config().into()),
                )
            }
            LoggingTarget::Stderr {
                format: LoggingFormat::AnsiNoColor,
            } => {
                stderr_ansi_layer = Some(
                    tracing_subscriber::fmt::Layer::default()
                        .without_time()
                        .with_ansi(false)
                        .with_writer(std::io::stderr)
                        .with_filter::<EnvFilter>(self.preset.get_config().into()),
                )
            }
            LoggingTarget::Stderr {
                format: LoggingFormat::Json,
            } => {
                // JSON output: one flattened object per event, no color.
                stderr_json_layer = Some(
                    tracing_subscriber::fmt::Layer::default()
                        .without_time()
                        .with_ansi(false)
                        .json()
                        .flatten_event(true)
                        .with_writer(std::io::stderr)
                        .with_filter::<EnvFilter>(self.preset.get_config().into()),
                )
            }
            LoggingTarget::Stdout {
                format: LoggingFormat::Ansi,
            } => {
                stdout_ansi_layer = Some(
                    tracing_subscriber::fmt::Layer::default()
                        .without_time()
                        .with_ansi(true)
                        .with_writer(std::io::stdout)
                        .with_filter::<EnvFilter>(self.preset.get_config().into()),
                )
            }
            LoggingTarget::Stdout {
                format: LoggingFormat::AnsiNoColor,
            } => {
                stdout_ansi_layer = Some(
                    tracing_subscriber::fmt::Layer::default()
                        .without_time()
                        .with_ansi(false)
                        .with_writer(std::io::stdout)
                        .with_filter::<EnvFilter>(self.preset.get_config().into()),
                )
            }
            LoggingTarget::Stdout {
                format: LoggingFormat::Json,
            } => {
                stdout_json_layer = Some(
                    tracing_subscriber::fmt::Layer::default()
                        .without_time()
                        .with_ansi(false)
                        .json()
                        .flatten_event(true)
                        .with_writer(std::io::stdout)
                        .with_filter::<EnvFilter>(self.preset.get_config().into()),
                )
            }
            LoggingTarget::Indicatif(mp) => {
                // Route log output through the progress-bar manager's writer.
                let writer: IndicatifWriter<tracing_indicatif::writer::Stderr> =
                    IndicatifWriter::new(mp);
                indicatif_layer = Some(
                    tracing_subscriber::fmt::Layer::default()
                        .without_time()
                        .with_ansi(true)
                        .with_writer(writer.make_writer())
                        .with_filter::<EnvFilter>(self.preset.get_config().into()),
                )
            }
        }
        // Optionally build a layer that ships logs to Loki over HTTP.
        let loki_layer = {
            if let Some(cfg) = self.loki {
                use anyhow::Context;
                use base64::{Engine, prelude::BASE64_STANDARD};
                // Loki expects `Authorization: Basic base64(user:pass)`.
                let basic_auth = format!("{}:{}", cfg.loki_user, cfg.loki_pass);
                let encoded_basic_auth = BASE64_STANDARD.encode(basic_auth.as_bytes());
                let (layer, task) = tracing_loki::builder()
                    .label("node_name", cfg.loki_node_name)
                    .context("while building loki node_name label")?
                    .label("app", self.app_name)
                    .context("while building loki app label")?
                    .http_header("Authorization", format!("Basic {encoded_basic_auth}"))
                    .context("while building loki header")?
                    .build_url(cfg.loki_host)
                    .context("while building loki layer")?;
                // NOTE(review): tokio::spawn panics outside a runtime —
                // confirm initialize() is never called with Loki configured
                // before the tokio runtime is running.
                tokio::spawn(task);
                // The Loki layer filters with its own (verbose) preset,
                // independent of what is printed locally.
                Some(layer.with_filter::<EnvFilter>(LoggingPreset::Loki.get_config().into()))
            } else {
                None
            }
        };
        // Register every slot; all but one fmt slot (and possibly loki)
        // are None and act as pass-through layers.
        tracing_subscriber::registry()
            .with(loki_layer)
            .with(stdout_ansi_layer)
            .with(stdout_json_layer)
            .with(stderr_ansi_layer)
            .with(stderr_json_layer)
            .with(indicatif_layer)
            .init();
        Ok(())
    }
}

View File

@@ -1,2 +1,6 @@
mod logging;
pub use logging::*;
pub mod env;
pub mod logging;
#[expect(clippy::module_inception)]
mod config;
pub use config::*;

View File

@@ -1,15 +1,13 @@
use anyhow::{Context, Result};
use clap::Parser;
use config::LoggingPreset;
use indicatif::MultiProgress;
use pile_toolbox::cancelabletask::CancelableTaskResult;
use std::process::ExitCode;
use tracing::{error, warn};
use tracing_indicatif::{IndicatifWriter, writer::Stderr};
use tracing_subscriber::fmt::MakeWriter;
use crate::{
command::{CliCmd, CliCmdDispatch, SubCommand},
config::{PileServerConfig, logging::LoggingPreset},
signal::start_signal_task,
};
@@ -36,17 +34,11 @@ struct Cli {
/// Shared state handed to every subcommand.
#[derive(Clone)]
pub struct GlobalContext {
    /// Shared progress-bar manager for CLI output
    pub mp: MultiProgress,
    /// Server configuration loaded at startup
    pub config: PileServerConfig,
}
fn main() -> ExitCode {
#[expect(clippy::unwrap_used)]
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(10)
.build()
.unwrap();
match rt.block_on(main_inner()) {
match main_inner() {
Ok(code) => {
std::process::exit(code);
}
@@ -59,7 +51,7 @@ fn main() -> ExitCode {
}
}
async fn main_inner() -> Result<i32> {
fn main_inner() -> Result<i32> {
let cli = Cli::parse();
let level_i: i16 = cli.v as i16 - cli.q as i16;
@@ -80,36 +72,34 @@ async fn main_inner() -> Result<i32> {
}
let mp = MultiProgress::new();
let writer: IndicatifWriter<Stderr> = IndicatifWriter::new(mp.clone());
let config = PileServerConfig::load(matches!(cli.cmd, SubCommand::Server { .. }), level);
let rt = config.make_runtime();
tracing_subscriber::fmt()
.with_env_filter(level.get_config())
.without_time()
.with_ansi(true)
.with_writer(writer.make_writer())
.init();
let ctx = GlobalContext { mp, config };
let ctx = GlobalContext { mp };
let res = rt.block_on(async {
let task = cli.cmd.start(ctx).context("while starting task")?;
let signal_task = start_signal_task(task.flag().clone());
let task = cli.cmd.start(ctx).context("while starting task")?;
let signal_task = start_signal_task(task.flag().clone());
match task.join().await {
Ok(CancelableTaskResult::Finished(Ok(code))) => Ok(code),
Ok(CancelableTaskResult::Cancelled) => {
signal_task.abort();
warn!("Task cancelled successfully");
Ok(1)
}
match task.join().await {
Ok(CancelableTaskResult::Finished(Ok(code))) => Ok(code),
Ok(CancelableTaskResult::Cancelled) => {
signal_task.abort();
warn!("Task cancelled successfully");
Ok(1)
Err(err) => {
signal_task.abort();
Err(err).context("while joining task")
}
Ok(CancelableTaskResult::Finished(Err(err))) => {
signal_task.abort();
Err(err).context("while running task")
}
}
});
Err(err) => {
signal_task.abort();
Err(err).context("while joining task")
}
Ok(CancelableTaskResult::Finished(Err(err))) => {
signal_task.abort();
Err(err).context("while running task")
}
}
res
}