feat: Add ntfy score #69
774
Cargo.lock
generated
774
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
22
Cargo.toml
22
Cargo.toml
@ -24,19 +24,31 @@ log = "0.4"
|
||||
env_logger = "0.11"
|
||||
derive-new = "0.7"
|
||||
async-trait = "0.1"
|
||||
tokio = { version = "1.40", features = ["io-std", "fs", "macros", "rt-multi-thread"] }
|
||||
tokio = { version = "1.40", features = [
|
||||
"io-std",
|
||||
"fs",
|
||||
"macros",
|
||||
"rt-multi-thread",
|
||||
] }
|
||||
cidr = { features = ["serde"], version = "0.2" }
|
||||
russh = "0.45"
|
||||
russh-keys = "0.45"
|
||||
rand = "0.8"
|
||||
url = "2.5"
|
||||
kube = "0.98"
|
||||
k8s-openapi = { version = "0.24", features = ["v1_30"] }
|
||||
kube = { version = "1.1.0", features = [
|
||||
"config",
|
||||
"client",
|
||||
"runtime",
|
||||
"rustls-tls",
|
||||
"ws",
|
||||
"jsonpatch",
|
||||
] }
|
||||
k8s-openapi = { version = "0.25", features = ["v1_30"] }
|
||||
serde_yaml = "0.9"
|
||||
serde-value = "0.7"
|
||||
http = "1.2"
|
||||
inquire = "0.7"
|
||||
convert_case = "0.8"
|
||||
convert_case = "0.8"
|
||||
chrono = "0.4"
|
||||
similar = "2"
|
||||
uuid = { version = "1.11", features = [ "v4", "fast-rng", "macro-diagnostics" ] }
|
||||
uuid = { version = "1.11", features = ["v4", "fast-rng", "macro-diagnostics"] }
|
||||
|
78
adr/013-monitoring-notifications.md
Normal file
78
adr/013-monitoring-notifications.md
Normal file
@ -0,0 +1,78 @@
|
||||
# Architecture Decision Record: Monitoring Notifications
|
||||
|
||||
Initial Author: Taha Hawa
|
||||
|
||||
Initial Date: 2025-06-26
|
||||
|
||||
Last Updated Date: 2025-06-26
|
||||
|
||||
## Status
|
||||
|
||||
Proposed
|
||||
|
||||
## Context
|
||||
|
||||
We need to send notifications (typically from AlertManager/Prometheus) and we need to receive said notifications on mobile devices for sure in some way, whether it's push messages, SMS, phone call, email, etc or all of the above.
|
||||
|
||||
## Decision
|
||||
|
||||
We should go with https://ntfy.sh except host it ourselves.
|
||||
|
||||
`ntfy` is an open source solution written in Go that has the features we need.
|
||||
|
||||
## Rationale
|
||||
|
||||
`ntfy` has pretty much everything we need (push notifications, email forwarding, receives via webhook), and nothing/not much we don't. Good fit, lightweight.
|
||||
|
||||
## Consequences
|
||||
|
||||
Pros:
|
||||
|
||||
- topics, with ACLs
|
||||
- lightweight
|
||||
- reliable
|
||||
- easy to configure
|
||||
- mobile app
|
||||
- the mobile app can listen via websocket, poll, or receive via Firebase/GCM on Android, or similar on iOS.
|
||||
- Forward to email
|
||||
- Text-to-Speech phone call messages using Twilio integration
|
||||
- Operates based on simple HTTP requests/Webhooks, easily usable via AlertManager
|
||||
|
||||
Cons:
|
||||
|
||||
- No SMS pushes
|
||||
- SQLite DB, makes it harder to HA/scale
|
||||
|
||||
## Alternatives considered
|
||||
|
||||
[AWS SNS](https://aws.amazon.com/sns/):
|
||||
Pros:
|
||||
|
||||
- highly reliable
|
||||
- no hosting needed
|
||||
|
||||
Cons:
|
||||
|
||||
- no control, not self hosted
|
||||
- costs (per usage)
|
||||
|
||||
[Apprise](https://github.com/caronc/apprise):
|
||||
Pros:
|
||||
|
||||
- Way more ways of sending notifications
|
||||
- Can use ntfy as one of the backends/ways of sending
|
||||
|
||||
Cons:
|
||||
|
||||
- Way too overkill for what we need in terms of features
|
||||
|
||||
[Gotify](https://github.com/gotify/server):
|
||||
Pros:
|
||||
|
||||
- simple, lightweight, golang, etc
|
||||
|
||||
Cons:
|
||||
|
||||
- Pushes topics are per-user
|
||||
|
||||
## Additional Notes
|
@ -14,8 +14,8 @@ harmony_macros = { path = "../../harmony_macros" }
|
||||
log = { workspace = true }
|
||||
env_logger = { workspace = true }
|
||||
url = { workspace = true }
|
||||
kube = "0.98.0"
|
||||
k8s-openapi = { version = "0.24.0", features = [ "v1_30" ] }
|
||||
kube = "1.1.0"
|
||||
k8s-openapi = { version = "0.25.0", features = ["v1_30"] }
|
||||
http = "1.2.0"
|
||||
serde_yaml = "0.9.34"
|
||||
inquire.workspace = true
|
||||
|
12
examples/ntfy/Cargo.toml
Normal file
12
examples/ntfy/Cargo.toml
Normal file
@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "example-ntfy"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
harmony = { version = "0.1.0", path = "../../harmony" }
|
||||
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
|
||||
tokio.workspace = true
|
||||
url.workspace = true
|
19
examples/ntfy/src/main.rs
Normal file
19
examples/ntfy/src/main.rs
Normal file
@ -0,0 +1,19 @@
|
||||
use harmony::{
|
||||
inventory::Inventory, maestro::Maestro, modules::monitoring::ntfy::ntfy::NtfyScore,
|
||||
topology::K8sAnywhereTopology,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let mut maestro = Maestro::<K8sAnywhereTopology>::initialize(
|
||||
Inventory::autoload(),
|
||||
K8sAnywhereTopology::from_env(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
maestro.register_all(vec![Box::new(NtfyScore {
|
||||
namespace: "monitoring".to_string(),
|
||||
})]);
|
||||
harmony_cli::init(maestro, None).await.unwrap();
|
||||
}
|
@ -54,3 +54,6 @@ fqdn = { version = "0.4.6", features = [
|
||||
temp-dir = "0.1.14"
|
||||
dyn-clone = "1.0.19"
|
||||
similar.workspace = true
|
||||
futures-util = "0.3.31"
|
||||
tokio-util = "0.7.15"
|
||||
strum = { version = "0.27.1", features = ["derive"] }
|
||||
|
@ -124,3 +124,11 @@ impl From<kube::Error> for InterpretError {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for InterpretError {
|
||||
fn from(value: String) -> Self {
|
||||
Self {
|
||||
msg: format!("InterpretError : {value}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,14 +1,21 @@
|
||||
use derive_new::new;
|
||||
use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope};
|
||||
use futures_util::StreamExt;
|
||||
use k8s_openapi::{
|
||||
ClusterResourceScope, NamespaceResourceScope,
|
||||
api::{apps::v1::Deployment, core::v1::Pod},
|
||||
};
|
||||
use kube::runtime::conditions;
|
||||
use kube::runtime::wait::await_condition;
|
||||
use kube::{
|
||||
Api, Client, Config, Error, Resource,
|
||||
api::{Patch, PatchParams},
|
||||
Client, Config, Error, Resource,
|
||||
api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
|
||||
config::{KubeConfigOptions, Kubeconfig},
|
||||
core::ErrorResponse,
|
||||
runtime::reflector::Lookup,
|
||||
};
|
||||
use log::{debug, error, trace};
|
||||
use serde::de::DeserializeOwned;
|
||||
use similar::TextDiff;
|
||||
use similar::{DiffableStr, TextDiff};
|
||||
|
||||
#[derive(new)]
|
||||
pub struct K8sClient {
|
||||
@ -22,6 +29,88 @@ impl K8sClient {
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn wait_until_deployment_ready(
|
||||
&self,
|
||||
name: String,
|
||||
namespace: Option<&str>,
|
||||
timeout: Option<u64>,
|
||||
) -> Result<(), String> {
|
||||
let api: Api<Deployment>;
|
||||
|
||||
if let Some(ns) = namespace {
|
||||
api = Api::namespaced(self.client.clone(), ns);
|
||||
} else {
|
||||
api = Api::default_namespaced(self.client.clone());
|
||||
}
|
||||
|
||||
let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed());
|
||||
let t = if let Some(t) = timeout { t } else { 300 };
|
||||
let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await;
|
||||
|
||||
if let Ok(r) = res {
|
||||
return Ok(());
|
||||
} else {
|
||||
return Err("timed out while waiting for deployment".to_string());
|
||||
}
|
||||
}
|
||||
|
||||
/// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
|
||||
pub async fn exec_app(
|
||||
&self,
|
||||
name: String,
|
||||
namespace: Option<&str>,
|
||||
command: Vec<&str>,
|
||||
) -> Result<(), String> {
|
||||
let api: Api<Pod>;
|
||||
|
||||
if let Some(ns) = namespace {
|
||||
api = Api::namespaced(self.client.clone(), ns);
|
||||
} else {
|
||||
api = Api::default_namespaced(self.client.clone());
|
||||
}
|
||||
let pod_list = api
|
||||
.list(&ListParams::default().labels(format!("app.kubernetes.io/name={name}").as_str()))
|
||||
.await
|
||||
.expect("couldn't get list of pods");
|
||||
|
||||
let res = api
|
||||
.exec(
|
||||
pod_list
|
||||
.items
|
||||
.first()
|
||||
.expect("couldn't get pod")
|
||||
.name()
|
||||
.expect("couldn't get pod name")
|
||||
.into_owned()
|
||||
.as_str(),
|
||||
command,
|
||||
&AttachParams::default(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Err(e) => return Err(e.to_string()),
|
||||
Ok(mut process) => {
|
||||
let status = process
|
||||
.take_status()
|
||||
.expect("Couldn't get status")
|
||||
.await
|
||||
.expect("Couldn't unwrap status");
|
||||
|
||||
if let Some(s) = status.status {
|
||||
debug!("Status: {}", s);
|
||||
if s == "Success" {
|
||||
return Ok(());
|
||||
} else {
|
||||
return Err(s);
|
||||
}
|
||||
} else {
|
||||
return Err("Couldn't get inner status of pod exec".to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a resource in namespace
|
||||
///
|
||||
/// See `kubectl apply` for more information on the expected behavior of this function
|
||||
|
@ -1,8 +1,7 @@
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::modules::monitoring::{
|
||||
alert_rule::prometheus_alert_rule::AlertManagerRuleGroup,
|
||||
kube_prometheus::types::{AlertManagerAdditionalPromRules, AlertManagerChannelConfig},
|
||||
use crate::modules::monitoring::kube_prometheus::types::{
|
||||
AlertManagerAdditionalPromRules, AlertManagerChannelConfig,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
|
@ -1,3 +1,4 @@
|
||||
pub mod alert_channel;
|
||||
pub mod alert_rule;
|
||||
pub mod kube_prometheus;
|
||||
pub mod ntfy;
|
||||
|
1
harmony/src/modules/monitoring/ntfy/helm/mod.rs
Normal file
1
harmony/src/modules/monitoring/ntfy/helm/mod.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod ntfy_helm_chart;
|
83
harmony/src/modules/monitoring/ntfy/helm/ntfy_helm_chart.rs
Normal file
83
harmony/src/modules/monitoring/ntfy/helm/ntfy_helm_chart.rs
Normal file
@ -0,0 +1,83 @@
|
||||
use non_blank_string_rs::NonBlankString;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
|
||||
|
||||
pub fn ntfy_helm_chart_score(namespace: String) -> HelmChartScore {
|
||||
let values = format!(
|
||||
r#"
|
||||
replicaCount: 1
|
||||
|
||||
image:
|
||||
repository: binwiederhier/ntfy
|
||||
pullPolicy: IfNotPresent
|
||||
# Overrides the image tag whose default is the chart appVersion.
|
||||
tag: "v2.12.0"
|
||||
|
||||
serviceAccount:
|
||||
# Specifies whether a service account should be created
|
||||
create: true
|
||||
# Annotations to add to the service account
|
||||
# annotations:
|
||||
# The name of the service account to use.
|
||||
# If not set and create is true, a name is generated using the fullname template
|
||||
# name: ""
|
||||
|
||||
service:
|
||||
type: ClusterIP
|
||||
port: 80
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
# annotations:
|
||||
# kubernetes.io/ingress.class: nginx
|
||||
# kubernetes.io/tls-acme: "true"
|
||||
hosts:
|
||||
- host: ntfy.host.com
|
||||
paths:
|
||||
- path: /
|
||||
pathType: ImplementationSpecific
|
||||
tls: []
|
||||
# - secretName: chart-example-tls
|
||||
# hosts:
|
||||
# - chart-example.local
|
||||
|
||||
|
||||
autoscaling:
|
||||
enabled: false
|
||||
|
||||
config:
|
||||
enabled: true
|
||||
data:
|
||||
# base-url: "https://ntfy.something.com"
|
||||
auth-file: "/var/cache/ntfy/user.db"
|
||||
auth-default-access: "deny-all"
|
||||
cache-file: "/var/cache/ntfy/cache.db"
|
||||
attachment-cache-dir: "/var/cache/ntfy/attachments"
|
||||
behind-proxy: true
|
||||
# web-root: "disable"
|
||||
enable-signup: false
|
||||
enable-login: "true"
|
||||
|
||||
persistence:
|
||||
enabled: true
|
||||
size: 200Mi
|
||||
"#,
|
||||
);
|
||||
|
||||
HelmChartScore {
|
||||
namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
|
||||
release_name: NonBlankString::from_str("ntfy").unwrap(),
|
||||
chart_name: NonBlankString::from_str("sarab97/ntfy").unwrap(),
|
||||
chart_version: Some(NonBlankString::from_str("0.1.7").unwrap()),
|
||||
values_overrides: None,
|
||||
values_yaml: Some(values.to_string()),
|
||||
create_namespace: true,
|
||||
install_only: false,
|
||||
repository: Some(HelmRepository::new(
|
||||
"sarab97".to_string(),
|
||||
url::Url::parse("https://charts.sarabsingh.com").unwrap(),
|
||||
true,
|
||||
)),
|
||||
}
|
||||
}
|
2
harmony/src/modules/monitoring/ntfy/mod.rs
Normal file
2
harmony/src/modules/monitoring/ntfy/mod.rs
Normal file
@ -0,0 +1,2 @@
|
||||
pub mod helm;
|
||||
pub mod ntfy;
|
169
harmony/src/modules/monitoring/ntfy/ntfy.rs
Normal file
169
harmony/src/modules/monitoring/ntfy/ntfy.rs
Normal file
@ -0,0 +1,169 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::debug;
|
||||
use serde::Serialize;
|
||||
use strum::{Display, EnumString};
|
||||
|
||||
use crate::{
|
||||
data::{Id, Version},
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
modules::monitoring::ntfy::helm::ntfy_helm_chart::ntfy_helm_chart_score,
|
||||
score::Score,
|
||||
topology::{HelmCommand, K8sclient, Topology, k8s::K8sClient},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct NtfyScore {
|
||||
pub namespace: String,
|
||||
}
|
||||
|
||||
impl<T: Topology + HelmCommand + K8sclient> Score<T> for NtfyScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(NtfyInterpret {
|
||||
score: self.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
format!("Ntfy")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct NtfyInterpret {
|
||||
pub score: NtfyScore,
|
||||
}
|
||||
|
||||
#[derive(Debug, EnumString, Display)]
|
||||
enum NtfyAccessMode {
|
||||
#[strum(serialize = "read-write", serialize = "rw", to_string = "read-write")]
|
||||
ReadWrite,
|
||||
#[strum(
|
||||
serialize = "read-only",
|
||||
serialize = "ro",
|
||||
serialize = "read",
|
||||
to_string = "read-only"
|
||||
)]
|
||||
ReadOnly,
|
||||
#[strum(
|
||||
serialize = "write-only",
|
||||
serialize = "wo",
|
||||
serialize = "write",
|
||||
to_string = "write-only"
|
||||
)]
|
||||
WriteOnly,
|
||||
#[strum(serialize = "none", to_string = "deny")]
|
||||
Deny,
|
||||
}
|
||||
|
||||
#[derive(Debug, EnumString, Display)]
|
||||
enum NtfyRole {
|
||||
#[strum(serialize = "user", to_string = "user")]
|
||||
User,
|
||||
#[strum(serialize = "admin", to_string = "admin")]
|
||||
Admin,
|
||||
}
|
||||
|
||||
impl NtfyInterpret {
|
||||
taha marked this conversation as resolved
|
||||
async fn add_user(
|
||||
&self,
|
||||
k8s_client: Arc<K8sClient>,
|
||||
username: &str,
|
||||
password: &str,
|
||||
role: Option<NtfyRole>,
|
||||
) -> Result<(), String> {
|
||||
let role = match role {
|
||||
Some(r) => r,
|
||||
None => NtfyRole::User,
|
||||
};
|
||||
|
||||
k8s_client
|
||||
.exec_app(
|
||||
"ntfy".to_string(),
|
||||
Some(&self.score.namespace),
|
||||
vec![
|
||||
"sh",
|
||||
"-c",
|
||||
format!("NTFY_PASSWORD={password} ntfy user add --role={role} {username}")
|
||||
.as_str(),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn set_access(
|
||||
&self,
|
||||
k8s_client: Arc<K8sClient>,
|
||||
username: &str,
|
||||
topic: &str,
|
||||
mode: NtfyAccessMode,
|
||||
) -> Result<(), String> {
|
||||
k8s_client
|
||||
.exec_app(
|
||||
"ntfy".to_string(),
|
||||
Some(&self.score.namespace),
|
||||
vec![
|
||||
"sh",
|
||||
"-c",
|
||||
format!("ntfy access {username} {topic} {mode}").as_str(),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// We need a ntfy interpret to wrap the HelmChartScore in order to run the score, and then bootstrap the config inside ntfy
|
||||
#[async_trait]
|
||||
impl<T: Topology + HelmCommand + K8sclient> Interpret<T> for NtfyInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
ntfy_helm_chart_score(self.score.namespace.clone())
|
||||
.create_interpret()
|
||||
.execute(inventory, topology)
|
||||
.await?;
|
||||
|
||||
debug!("installed ntfy helm chart");
|
||||
let client = topology
|
||||
.k8s_client()
|
||||
.await
|
||||
.expect("couldn't get k8s client");
|
||||
|
||||
client
|
||||
.wait_until_deployment_ready(
|
||||
"ntfy".to_string(),
|
||||
Some(&self.score.namespace.as_str()),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
debug!("created k8s client");
|
||||
|
||||
self.add_user(client, "harmony", "harmony", Some(NtfyRole::Admin))
|
||||
.await?;
|
||||
|
||||
debug!("exec into pod done");
|
||||
|
||||
Ok(Outcome::success("installed ntfy".to_string()))
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
todo!()
|
||||
}
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user
Explain why you need ntfy interpret : we don't just install it, we also bootstrap the configuration