feat: Add ntfy score #69

Merged
taha merged 6 commits from ntfy into master 2025-07-02 16:19:41 +00:00
15 changed files with 1039 additions and 239 deletions

774
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -24,14 +24,26 @@ log = "0.4"
env_logger = "0.11" env_logger = "0.11"
derive-new = "0.7" derive-new = "0.7"
async-trait = "0.1" async-trait = "0.1"
tokio = { version = "1.40", features = ["io-std", "fs", "macros", "rt-multi-thread"] } tokio = { version = "1.40", features = [
"io-std",
"fs",
"macros",
"rt-multi-thread",
] }
cidr = { features = ["serde"], version = "0.2" } cidr = { features = ["serde"], version = "0.2" }
russh = "0.45" russh = "0.45"
russh-keys = "0.45" russh-keys = "0.45"
rand = "0.8" rand = "0.8"
url = "2.5" url = "2.5"
kube = "0.98" kube = { version = "1.1.0", features = [
k8s-openapi = { version = "0.24", features = ["v1_30"] } "config",
"client",
"runtime",
"rustls-tls",
"ws",
"jsonpatch",
] }
k8s-openapi = { version = "0.25", features = ["v1_30"] }
serde_yaml = "0.9" serde_yaml = "0.9"
serde-value = "0.7" serde-value = "0.7"
http = "1.2" http = "1.2"

View File

@ -0,0 +1,78 @@
# Architecture Decision Record: Monitoring Notifications
Initial Author: Taha Hawa
Initial Date: 2025-06-26
Last Updated Date: 2025-06-26
## Status
Proposed
## Context
We need to send notifications (typically from AlertManager/Prometheus) and we need to receive said notifications on mobile devices for sure in some way, whether it's push messages, SMS, phone call, email, etc or all of the above.
## Decision
We should go with https://ntfy.sh except host it ourselves.
`ntfy` is an open source solution written in Go that has the features we need.
## Rationale
`ntfy` has pretty much everything we need (push notifications, email forwarding, receives via webhook), and nothing/not much we don't. Good fit, lightweight.
## Consequences
Pros:
- topics, with ACLs
- lightweight
- reliable
- easy to configure
- mobile app
- the mobile app can listen via websocket, poll, or receive via Firebase/GCM on Android, or similar on iOS.
- Forward to email
- Text-to-Speech phone call messages using Twilio integration
- Operates based on simple HTTP requests/Webhooks, easily usable via AlertManager
Cons:
- No SMS pushes
- SQLite DB, makes it harder to HA/scale
## Alternatives considered
[AWS SNS](https://aws.amazon.com/sns/):
Pros:
- highly reliable
- no hosting needed
Cons:
- no control, not self hosted
- costs (per usage)
[Apprise](https://github.com/caronc/apprise):
Pros:
- Way more ways of sending notifications
- Can use ntfy as one of the backends/ways of sending
Cons:
- Way too overkill for what we need in terms of features
[Gotify](https://github.com/gotify/server):
Pros:
- simple, lightweight, golang, etc
Cons:
- Pushes topics are per-user
## Additional Notes

View File

@ -14,8 +14,8 @@ harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true } log = { workspace = true }
env_logger = { workspace = true } env_logger = { workspace = true }
url = { workspace = true } url = { workspace = true }
kube = "0.98.0" kube = "1.1.0"
k8s-openapi = { version = "0.24.0", features = [ "v1_30" ] } k8s-openapi = { version = "0.25.0", features = ["v1_30"] }
http = "1.2.0" http = "1.2.0"
serde_yaml = "0.9.34" serde_yaml = "0.9.34"
inquire.workspace = true inquire.workspace = true

12
examples/ntfy/Cargo.toml Normal file
View File

@ -0,0 +1,12 @@
[package]
name = "example-ntfy"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true
url.workspace = true

19
examples/ntfy/src/main.rs Normal file
View File

@ -0,0 +1,19 @@
use harmony::{
inventory::Inventory, maestro::Maestro, modules::monitoring::ntfy::ntfy::NtfyScore,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let mut maestro = Maestro::<K8sAnywhereTopology>::initialize(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
)
.await
.unwrap();
maestro.register_all(vec![Box::new(NtfyScore {
namespace: "monitoring".to_string(),
})]);
harmony_cli::init(maestro, None).await.unwrap();
}

View File

@ -54,3 +54,6 @@ fqdn = { version = "0.4.6", features = [
temp-dir = "0.1.14" temp-dir = "0.1.14"
dyn-clone = "1.0.19" dyn-clone = "1.0.19"
similar.workspace = true similar.workspace = true
futures-util = "0.3.31"
tokio-util = "0.7.15"
strum = { version = "0.27.1", features = ["derive"] }

View File

@ -124,3 +124,11 @@ impl From<kube::Error> for InterpretError {
} }
} }
} }
impl From<String> for InterpretError {
fn from(value: String) -> Self {
Self {
msg: format!("InterpretError : {value}"),
}
}
}

View File

@ -1,14 +1,21 @@
use derive_new::new; use derive_new::new;
use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope}; use futures_util::StreamExt;
use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope,
api::{apps::v1::Deployment, core::v1::Pod},
};
use kube::runtime::conditions;
use kube::runtime::wait::await_condition;
use kube::{ use kube::{
Api, Client, Config, Error, Resource, Client, Config, Error, Resource,
api::{Patch, PatchParams}, api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig}, config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse, core::ErrorResponse,
runtime::reflector::Lookup,
}; };
use log::{debug, error, trace}; use log::{debug, error, trace};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use similar::TextDiff; use similar::{DiffableStr, TextDiff};
#[derive(new)] #[derive(new)]
pub struct K8sClient { pub struct K8sClient {
@ -22,6 +29,88 @@ impl K8sClient {
}) })
} }
pub async fn wait_until_deployment_ready(
&self,
name: String,
namespace: Option<&str>,
timeout: Option<u64>,
) -> Result<(), String> {
let api: Api<Deployment>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed());
let t = if let Some(t) = timeout { t } else { 300 };
let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await;
if let Ok(r) = res {
return Ok(());
} else {
return Err("timed out while waiting for deployment".to_string());
}
}
/// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
taha marked this conversation as resolved Outdated

rename to exec_app and add a bit of documentation to indicate that it will execute in the first pod matching label app.kubernetes.io/name={name}

rename to exec_app and add a bit of documentation to indicate that it will execute in the first pod matching label `app.kubernetes.io/name={name}`
pub async fn exec_app(
&self,
name: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<(), String> {
let api: Api<Pod>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let pod_list = api
.list(&ListParams::default().labels(format!("app.kubernetes.io/name={name}").as_str()))
.await
.expect("couldn't get list of pods");
let res = api
.exec(
pod_list
.items
.first()
.expect("couldn't get pod")
.name()
.expect("couldn't get pod name")
.into_owned()
.as_str(),
command,
&AttachParams::default(),
)
.await;
match res {
Err(e) => return Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("Couldn't get status")
.await
.expect("Couldn't unwrap status");
if let Some(s) = status.status {
debug!("Status: {}", s);
if s == "Success" {
return Ok(());
} else {
return Err(s);
}
} else {
return Err("Couldn't get inner status of pod exec".to_string());
}
}
}
}
/// Apply a resource in namespace /// Apply a resource in namespace
/// ///
/// See `kubectl apply` for more information on the expected behavior of this function /// See `kubectl apply` for more information on the expected behavior of this function

View File

@ -1,8 +1,7 @@
use serde::Serialize; use serde::Serialize;
use crate::modules::monitoring::{ use crate::modules::monitoring::kube_prometheus::types::{
alert_rule::prometheus_alert_rule::AlertManagerRuleGroup, AlertManagerAdditionalPromRules, AlertManagerChannelConfig,
kube_prometheus::types::{AlertManagerAdditionalPromRules, AlertManagerChannelConfig},
}; };
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]

View File

@ -1,3 +1,4 @@
pub mod alert_channel; pub mod alert_channel;
pub mod alert_rule; pub mod alert_rule;
pub mod kube_prometheus; pub mod kube_prometheus;
pub mod ntfy;

View File

@ -0,0 +1 @@
pub mod ntfy_helm_chart;

View File

@ -0,0 +1,83 @@
use non_blank_string_rs::NonBlankString;
use std::str::FromStr;
use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
pub fn ntfy_helm_chart_score(namespace: String) -> HelmChartScore {
let values = format!(
r#"
replicaCount: 1
image:
repository: binwiederhier/ntfy
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: "v2.12.0"
serviceAccount:
# Specifies whether a service account should be created
create: true
# Annotations to add to the service account
# annotations:
# The name of the service account to use.
# If not set and create is true, a name is generated using the fullname template
# name: ""
service:
type: ClusterIP
port: 80
ingress:
enabled: false
# annotations:
# kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
hosts:
- host: ntfy.host.com
paths:
- path: /
pathType: ImplementationSpecific
tls: []
# - secretName: chart-example-tls
# hosts:
# - chart-example.local
autoscaling:
enabled: false
config:
enabled: true
data:
# base-url: "https://ntfy.something.com"
auth-file: "/var/cache/ntfy/user.db"
auth-default-access: "deny-all"
cache-file: "/var/cache/ntfy/cache.db"
attachment-cache-dir: "/var/cache/ntfy/attachments"
behind-proxy: true
# web-root: "disable"
enable-signup: false
enable-login: "true"
persistence:
enabled: true
size: 200Mi
"#,
);
HelmChartScore {
namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
release_name: NonBlankString::from_str("ntfy").unwrap(),
chart_name: NonBlankString::from_str("sarab97/ntfy").unwrap(),
chart_version: Some(NonBlankString::from_str("0.1.7").unwrap()),
values_overrides: None,
values_yaml: Some(values.to_string()),
create_namespace: true,
install_only: false,
repository: Some(HelmRepository::new(
"sarab97".to_string(),
url::Url::parse("https://charts.sarabsingh.com").unwrap(),
true,
)),
}
}

View File

@ -0,0 +1,2 @@
pub mod helm;
pub mod ntfy;

View File

@ -0,0 +1,169 @@
use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use serde::Serialize;
use strum::{Display, EnumString};
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::monitoring::ntfy::helm::ntfy_helm_chart::ntfy_helm_chart_score,
score::Score,
topology::{HelmCommand, K8sclient, Topology, k8s::K8sClient},
};
#[derive(Debug, Clone, Serialize)]
pub struct NtfyScore {
pub namespace: String,
}
impl<T: Topology + HelmCommand + K8sclient> Score<T> for NtfyScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(NtfyInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
format!("Ntfy")
}
}
#[derive(Debug, Serialize)]
pub struct NtfyInterpret {
pub score: NtfyScore,
}
#[derive(Debug, EnumString, Display)]
enum NtfyAccessMode {
#[strum(serialize = "read-write", serialize = "rw", to_string = "read-write")]
ReadWrite,
#[strum(
serialize = "read-only",
serialize = "ro",
serialize = "read",
to_string = "read-only"
)]
ReadOnly,
#[strum(
serialize = "write-only",
serialize = "wo",
serialize = "write",
to_string = "write-only"
)]
WriteOnly,
#[strum(serialize = "none", to_string = "deny")]
Deny,
}
#[derive(Debug, EnumString, Display)]
enum NtfyRole {
#[strum(serialize = "user", to_string = "user")]
User,
#[strum(serialize = "admin", to_string = "admin")]
Admin,
}
impl NtfyInterpret {
taha marked this conversation as resolved
Review

Explain why you need ntfy interpret : we don't just install it, we also bootstrap the configuration

Explain why you need ntfy interpret : we don't just install it, we also bootstrap the configuration
async fn add_user(
&self,
k8s_client: Arc<K8sClient>,
username: &str,
password: &str,
role: Option<NtfyRole>,
) -> Result<(), String> {
let role = match role {
Some(r) => r,
None => NtfyRole::User,
};
k8s_client
.exec_app(
"ntfy".to_string(),
Some(&self.score.namespace),
vec![
"sh",
"-c",
format!("NTFY_PASSWORD={password} ntfy user add --role={role} {username}")
.as_str(),
],
)
.await?;
Ok(())
}
async fn set_access(
&self,
k8s_client: Arc<K8sClient>,
username: &str,
topic: &str,
mode: NtfyAccessMode,
) -> Result<(), String> {
k8s_client
.exec_app(
"ntfy".to_string(),
Some(&self.score.namespace),
vec![
"sh",
"-c",
format!("ntfy access {username} {topic} {mode}").as_str(),
],
)
.await?;
Ok(())
}
}
/// We need a ntfy interpret to wrap the HelmChartScore in order to run the score, and then bootstrap the config inside ntfy
#[async_trait]
impl<T: Topology + HelmCommand + K8sclient> Interpret<T> for NtfyInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
ntfy_helm_chart_score(self.score.namespace.clone())
.create_interpret()
.execute(inventory, topology)
.await?;
debug!("installed ntfy helm chart");
let client = topology
.k8s_client()
.await
.expect("couldn't get k8s client");
client
.wait_until_deployment_ready(
"ntfy".to_string(),
Some(&self.score.namespace.as_str()),
None,
)
.await?;
debug!("created k8s client");
self.add_user(client, "harmony", "harmony", Some(NtfyRole::Admin))
.await?;
debug!("exec into pod done");
Ok(Outcome::success("installed ntfy".to_string()))
}
fn get_name(&self) -> InterpretName {
todo!()
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}