From 6f9a2c7c321605310785f4cdccac047fa3e5528a Mon Sep 17 00:00:00 2001 From: TwiN Date: Wed, 30 Jul 2025 12:18:10 -0400 Subject: [PATCH] feat(external-endpoint): Implement heartbeat (#1173) Fixes #741 --- README.md | 20 ++++--- api/external_endpoint.go | 4 ++ config/endpoint/external_endpoint.go | 16 ++++++ config/endpoint/heartbeat/heartbeat.go | 11 ++++ storage/store/memory/memory.go | 17 ++++++ storage/store/memory/memory_test.go | 28 +++++++++ storage/store/sql/sql.go | 18 ++++++ storage/store/sql/sql_test.go | 33 +++++++++++ storage/store/store.go | 3 + watchdog/watchdog.go | 78 ++++++++++++++++++++++++++ 10 files changed, 220 insertions(+), 8 deletions(-) create mode 100644 config/endpoint/heartbeat/heartbeat.go diff --git a/README.md b/README.md index 62a54464..32d3332b 100644 --- a/README.md +++ b/README.md @@ -305,14 +305,16 @@ For instance: - You can monitor services that are not supported by Gatus - You can implement your own monitoring system while using Gatus as the dashboard -| Parameter | Description | Default | -|:-------------------------------|:-----------------------------------------------------------------------------------------------------------------------|:--------------| -| `external-endpoints` | List of endpoints to monitor. | `[]` | -| `external-endpoints[].enabled` | Whether to monitor the endpoint. | `true` | -| `external-endpoints[].name` | Name of the endpoint. Can be anything. | Required `""` | -| `external-endpoints[].group` | Group name. Used to group multiple endpoints together on the dashboard.
See [Endpoint groups](#endpoint-groups). | `""` | -| `external-endpoints[].token` | Bearer token required to push status to. | Required `""` | -| `external-endpoints[].alerts` | List of all alerts for a given endpoint.
See [Alerting](#alerting). | `[]` | +| Parameter | Description | Default | +|:------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------|:---------------| +| `external-endpoints` | List of endpoints to monitor. | `[]` | +| `external-endpoints[].enabled` | Whether to monitor the endpoint. | `true` | +| `external-endpoints[].name` | Name of the endpoint. Can be anything. | Required `""` | +| `external-endpoints[].group` | Group name. Used to group multiple endpoints together on the dashboard.
See [Endpoint groups](#endpoint-groups). | `""` | +| `external-endpoints[].token` | Bearer token required to push status to. | Required `""` | +| `external-endpoints[].alerts` | List of all alerts for a given endpoint.
See [Alerting](#alerting). | `[]` | +| `external-endpoints[].heartbeat` | Heartbeat configuration for monitoring when the external endpoint stops sending updates. | `{}` | +| `external-endpoints[].heartbeat.interval` | Expected interval between updates. If no update is received within this interval, alerts will be triggered. Must be at least 10s. | `0` (disabled) | Example: ```yaml @@ -320,6 +322,8 @@ external-endpoints: - name: ext-ep-test group: core token: "potato" + heartbeat: + interval: 30m # Automatically create a failure if no update is received within 30 minutes alerts: - type: discord description: "healthcheck failed" diff --git a/api/external_endpoint.go b/api/external_endpoint.go index 18c08802..c2c0fce6 100644 --- a/api/external_endpoint.go +++ b/api/external_endpoint.go @@ -7,6 +7,7 @@ import ( "github.com/TwiN/gatus/v5/config" "github.com/TwiN/gatus/v5/config/endpoint" + "github.com/TwiN/gatus/v5/metrics" "github.com/TwiN/gatus/v5/storage/store" "github.com/TwiN/gatus/v5/storage/store/common" "github.com/TwiN/gatus/v5/watchdog" @@ -72,6 +73,9 @@ func CreateExternalEndpointResult(cfg *config.Config) fiber.Handler { externalEndpoint.NumberOfSuccessesInARow = convertedEndpoint.NumberOfSuccessesInARow externalEndpoint.NumberOfFailuresInARow = convertedEndpoint.NumberOfFailuresInARow } + if cfg.Metrics { + metrics.PublishMetricsForEndpoint(convertedEndpoint, result) + } // Return the result return c.Status(200).SendString("") } diff --git a/config/endpoint/external_endpoint.go b/config/endpoint/external_endpoint.go index 58f37fed..3563c54e 100644 --- a/config/endpoint/external_endpoint.go +++ b/config/endpoint/external_endpoint.go @@ -2,13 +2,19 @@ package endpoint import ( "errors" + "time" "github.com/TwiN/gatus/v5/alerting/alert" + "github.com/TwiN/gatus/v5/config/endpoint/heartbeat" + "github.com/TwiN/gatus/v5/config/maintenance" ) var ( // ErrExternalEndpointWithNoToken is the error with which Gatus will panic if an external endpoint is configured without a token. ErrExternalEndpointWithNoToken = errors.New("you must specify a token for each external endpoint") + + // ErrExternalEndpointHeartbeatIntervalTooLow is the error with which Gatus will panic if an external endpoint's heartbeat interval is less than 10 seconds. + ErrExternalEndpointHeartbeatIntervalTooLow = errors.New("heartbeat interval must be at least 10 seconds") ) // ExternalEndpoint is an endpoint whose result is pushed from outside Gatus, which means that @@ -30,6 +36,12 @@ type ExternalEndpoint struct { // Alerts is the alerting configuration for the endpoint in case of failure Alerts []*alert.Alert `yaml:"alerts,omitempty"` + // MaintenanceWindow is the configuration for per-endpoint maintenance windows + MaintenanceWindows []*maintenance.Config `yaml:"maintenance-windows,omitempty"` + + // Heartbeat is the configuration that checks if the external endpoint has received new results when it should have. + Heartbeat heartbeat.Config `yaml:"heartbeat,omitempty"` + // NumberOfFailuresInARow is the number of unsuccessful evaluations in a row NumberOfFailuresInARow int `yaml:"-"` @@ -45,6 +57,10 @@ func (externalEndpoint *ExternalEndpoint) ValidateAndSetDefaults() error { if len(externalEndpoint.Token) == 0 { return ErrExternalEndpointWithNoToken } + if externalEndpoint.Heartbeat.Interval != 0 && externalEndpoint.Heartbeat.Interval < 10*time.Second { + // If the heartbeat interval is set (non-0), it must be at least 10 seconds. 
+ return ErrExternalEndpointHeartbeatIntervalTooLow + } return nil } diff --git a/config/endpoint/heartbeat/heartbeat.go b/config/endpoint/heartbeat/heartbeat.go new file mode 100644 index 00000000..c27b5717 --- /dev/null +++ b/config/endpoint/heartbeat/heartbeat.go @@ -0,0 +1,11 @@ +package heartbeat + +import "time" + +// Config used to check if the external endpoint has received new results when it should have. +// This configuration is used to trigger alerts when an external endpoint has no new results for a defined period of time +type Config struct { + // Interval is the time interval at which Gatus verifies whether the external endpoint has received new results + // If no new result is received within the interval, the endpoint is marked as failed and alerts are triggered + Interval time.Duration `yaml:"interval"` +} diff --git a/storage/store/memory/memory.go b/storage/store/memory/memory.go index 41714dcd..e47b9ee8 100644 --- a/storage/store/memory/memory.go +++ b/storage/store/memory/memory.go @@ -211,6 +211,23 @@ func (s *Store) DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(ep *endpoint.En return 0 } +// HasEndpointStatusNewerThan checks whether an endpoint has a status newer than the provided timestamp +func (s *Store) HasEndpointStatusNewerThan(key string, timestamp time.Time) (bool, error) { + s.RLock() + defer s.RUnlock() + endpointStatus := s.cache.GetValue(key) + if endpointStatus == nil { + // If no endpoint exists, there's no newer status, so return false instead of an error + return false, nil + } + for _, result := range endpointStatus.(*endpoint.Status).Results { + if result.Timestamp.After(timestamp) { + return true, nil + } + } + return false, nil +} + // Clear deletes everything from the store func (s *Store) Clear() { s.cache.Clear() diff --git a/storage/store/memory/memory_test.go b/storage/store/memory/memory_test.go index 544577bf..2f21528d 100644 --- a/storage/store/memory/memory_test.go +++ b/storage/store/memory/memory_test.go @@ -84,6 +84,7 @@ var ( // This test is simply an extra sanity check func TestStore_SanityCheck(t *testing.T) { store, _ := NewStore(storage.DefaultMaximumNumberOfResults, storage.DefaultMaximumNumberOfEvents) + defer store.Clear() defer store.Close() store.Insert(&testEndpoint, &testSuccessfulResult) endpointStatuses, _ := store.GetAllEndpointStatuses(paging.NewEndpointStatusParams()) @@ -134,3 +135,30 @@ func TestStore_Save(t *testing.T) { store.Clear() store.Close() } + +func TestStore_HasEndpointStatusNewerThan(t *testing.T) { + store, _ := NewStore(storage.DefaultMaximumNumberOfResults, storage.DefaultMaximumNumberOfEvents) + defer store.Clear() + defer store.Close() + // Insert a result + err := store.Insert(&testEndpoint, &testSuccessfulResult) + if err != nil { + t.Fatalf("expected no error while inserting result, got %v", err) + } + // Check with a timestamp in the past + hasNewerStatus, err := store.HasEndpointStatusNewerThan(testEndpoint.Key(), time.Now().Add(-time.Hour)) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if !hasNewerStatus { + t.Fatal("expected to have a newer status, but didn't") + } + // Check with a timestamp in the future + hasNewerStatus, err = store.HasEndpointStatusNewerThan(testEndpoint.Key(), time.Now().Add(time.Hour)) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if hasNewerStatus { + t.Fatal("expected not to have a newer status, but did") + } +} diff --git a/storage/store/sql/sql.go b/storage/store/sql/sql.go index cb0dd058..8d3312d2 100644 --- 
a/storage/store/sql/sql.go +++ b/storage/store/sql/sql.go @@ -514,6 +514,24 @@ func (s *Store) DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(ep *endpoint.En return int(rowsAffects) } +// HasEndpointStatusNewerThan checks whether an endpoint has a status newer than the provided timestamp +func (s *Store) HasEndpointStatusNewerThan(key string, timestamp time.Time) (bool, error) { + if timestamp.IsZero() { + return false, errors.New("timestamp is zero") + } + var count int + err := s.db.QueryRow( + "SELECT COUNT(*) FROM endpoint_results WHERE endpoint_id = (SELECT endpoint_id FROM endpoints WHERE endpoint_key = $1 LIMIT 1) AND timestamp > $2", + key, + timestamp.UTC(), + ).Scan(&count) + if err != nil { + // If the endpoint doesn't exist, we return false instead of an error + return false, nil + } + return count > 0, nil +} + // Clear deletes everything from the store func (s *Store) Clear() { _, _ = s.db.Exec("DELETE FROM endpoints") diff --git a/storage/store/sql/sql_test.go b/storage/store/sql/sql_test.go index 5dbfb899..a9eb9ed1 100644 --- a/storage/store/sql/sql_test.go +++ b/storage/store/sql/sql_test.go @@ -853,3 +853,36 @@ func TestStore_DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(t *testing.T) { t.Error("expected alert3 to exist for ep2") } } + +func TestStore_HasEndpointStatusNewerThan(t *testing.T) { + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_HasEndpointStatusNewerThan.db", false, storage.DefaultMaximumNumberOfResults, storage.DefaultMaximumNumberOfEvents) + defer store.Close() + // Insert an endpoint status + if err := store.Insert(&testEndpoint, &testSuccessfulResult); err != nil { + t.Fatal("expected no error, got", err.Error()) + } + // Check if it has a status newer than 1 hour ago + hasNewerStatus, err := store.HasEndpointStatusNewerThan(testEndpoint.Key(), time.Now().Add(-time.Hour)) + if err != nil { + t.Fatal("expected no error, got", err.Error()) + } + if !hasNewerStatus { + t.Error("expected to have a newer status") + } + // Check if it has a status newer than 2 days ago + hasNewerStatus, err = store.HasEndpointStatusNewerThan(testEndpoint.Key(), time.Now().Add(-48*time.Hour)) + if err != nil { + t.Fatal("expected no error, got", err.Error()) + } + if !hasNewerStatus { + t.Error("expected to have a newer status") + } + // Check if there's a status newer than 1 hour in the future (silly test, but it should work) + hasNewerStatus, err = store.HasEndpointStatusNewerThan(testEndpoint.Key(), time.Now().Add(time.Hour)) + if err != nil { + t.Fatal("expected no error, got", err.Error()) + } + if hasNewerStatus { + t.Error("expected not to have a newer status in the future") + } +} diff --git a/storage/store/store.go b/storage/store/store.go index f7b581ec..7fb0ae06 100644 --- a/storage/store/store.go +++ b/storage/store/store.go @@ -57,6 +57,9 @@ type Store interface { // This prevents triggered alerts that have been removed or modified from lingering in the database. 
 	DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(ep *endpoint.Endpoint, checksums []string) int
 
+	// HasEndpointStatusNewerThan checks whether an endpoint has a status newer than the provided timestamp
+	HasEndpointStatusNewerThan(key string, timestamp time.Time) (bool, error)
+
 	// Clear deletes everything from the store
 	Clear()
 
diff --git a/watchdog/watchdog.go b/watchdog/watchdog.go
index 7c5a1548..1a8f14c4 100644
--- a/watchdog/watchdog.go
+++ b/watchdog/watchdog.go
@@ -34,6 +34,14 @@ func Monitor(cfg *config.Config) {
 			go monitor(endpoint, cfg.Alerting, cfg.Maintenance, cfg.Connectivity, cfg.DisableMonitoringLock, cfg.Metrics, ctx)
 		}
 	}
+	for _, externalEndpoint := range cfg.ExternalEndpoints {
+		// Check if the external endpoint is enabled and is using heartbeat
+		// If the external endpoint does not use heartbeat, then it does not need to be monitored periodically, because
+		// alerting is checked every time an external endpoint is pushed to Gatus, unlike normal endpoints.
+		if externalEndpoint.IsEnabled() && externalEndpoint.Heartbeat.Interval > 0 {
+			go monitorExternalEndpointHeartbeat(externalEndpoint, cfg.Alerting, cfg.Maintenance, cfg.Connectivity, cfg.DisableMonitoringLock, cfg.Metrics, ctx)
+		}
+	}
 }
 
 // monitor a single endpoint in a loop
@@ -96,6 +104,76 @@ func execute(ep *endpoint.Endpoint, alertingConfig *alerting.Config, maintenance
 	logr.Debugf("[watchdog.execute] Waiting for interval=%s before monitoring group=%s endpoint=%s (key=%s) again", ep.Interval, ep.Group, ep.Name, ep.Key())
 }
 
+func monitorExternalEndpointHeartbeat(ee *endpoint.ExternalEndpoint, alertingConfig *alerting.Config, maintenanceConfig *maintenance.Config, connectivityConfig *connectivity.Config, disableMonitoringLock bool, enabledMetrics bool, ctx context.Context) {
+	ticker := time.NewTicker(ee.Heartbeat.Interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			logr.Warnf("[watchdog.monitorExternalEndpointHeartbeat] Canceling current execution of group=%s; endpoint=%s; key=%s", ee.Group, ee.Name, ee.Key())
+			return
+		case <-ticker.C:
+			executeExternalEndpointHeartbeat(ee, alertingConfig, maintenanceConfig, connectivityConfig, disableMonitoringLock, enabledMetrics)
+		}
+	}
+}
+
+func executeExternalEndpointHeartbeat(ee *endpoint.ExternalEndpoint, alertingConfig *alerting.Config, maintenanceConfig *maintenance.Config, connectivityConfig *connectivity.Config, disableMonitoringLock bool, enabledMetrics bool) {
+	if !disableMonitoringLock {
+		// By placing the lock here, we prevent multiple endpoints from being monitored at the exact same time, which
+		// could cause performance issues and return inaccurate results
+		monitoringMutex.Lock()
+		defer monitoringMutex.Unlock()
+	}
+	// If there's a connectivity checker configured, check if Gatus has internet connectivity
+	if connectivityConfig != nil && connectivityConfig.Checker != nil && !connectivityConfig.Checker.IsConnected() {
+		logr.Infof("[watchdog.monitorExternalEndpointHeartbeat] No connectivity; skipping execution")
+		return
+	}
+	logr.Debugf("[watchdog.monitorExternalEndpointHeartbeat] Checking heartbeat for group=%s; endpoint=%s; key=%s", ee.Group, ee.Name, ee.Key())
+	convertedEndpoint := ee.ToEndpoint()
+	hasReceivedResultWithinHeartbeatInterval, err := store.Get().HasEndpointStatusNewerThan(ee.Key(), time.Now().Add(-ee.Heartbeat.Interval))
+	if err != nil {
+		logr.Errorf("[watchdog.monitorExternalEndpointHeartbeat] Failed to check if endpoint has received a result within the heartbeat interval: %s", err.Error())
+		return
+	}
+	if hasReceivedResultWithinHeartbeatInterval {
+		// If we received a result within the heartbeat interval, we don't want to create a successful result, so we
+		// skip the rest. We don't have to worry about alerting or metrics, because if the previous heartbeat failed
+		// while this one succeeds, it implies that there was a new result pushed, and that result being pushed
+		// should've resolved the alert.
+		logr.Infof("[watchdog.monitorExternalEndpointHeartbeat] Checked heartbeat for group=%s; endpoint=%s; key=%s; success=%v; errors=%d", ee.Group, ee.Name, ee.Key(), hasReceivedResultWithinHeartbeatInterval, 0)
+		return
+	}
+	// All code after this point assumes the heartbeat failed
+	result := &endpoint.Result{
+		Timestamp: time.Now(),
+		Success:   false,
+		Errors:    []string{"heartbeat: no update received within " + ee.Heartbeat.Interval.String()},
+	}
+	if enabledMetrics {
+		metrics.PublishMetricsForEndpoint(convertedEndpoint, result)
+	}
+	UpdateEndpointStatuses(convertedEndpoint, result)
+	logr.Infof("[watchdog.monitorExternalEndpointHeartbeat] Checked heartbeat for group=%s; endpoint=%s; key=%s; success=%v; errors=%d; duration=%s", ee.Group, ee.Name, ee.Key(), result.Success, len(result.Errors), result.Duration.Round(time.Millisecond))
+	inEndpointMaintenanceWindow := false
+	for _, maintenanceWindow := range ee.MaintenanceWindows {
+		if maintenanceWindow.IsUnderMaintenance() {
+			logr.Debug("[watchdog.monitorExternalEndpointHeartbeat] Under endpoint maintenance window")
+			inEndpointMaintenanceWindow = true
+		}
+	}
+	if !maintenanceConfig.IsUnderMaintenance() && !inEndpointMaintenanceWindow {
+		HandleAlerting(convertedEndpoint, result, alertingConfig)
+		// Sync the failure/success counters back to the external endpoint
+		ee.NumberOfSuccessesInARow = convertedEndpoint.NumberOfSuccessesInARow
+		ee.NumberOfFailuresInARow = convertedEndpoint.NumberOfFailuresInARow
+	} else {
+		logr.Debug("[watchdog.monitorExternalEndpointHeartbeat] Not handling alerting because currently in the maintenance window")
+	}
+	logr.Debugf("[watchdog.monitorExternalEndpointHeartbeat] Waiting for interval=%s before checking heartbeat for group=%s endpoint=%s (key=%s) again", ee.Heartbeat.Interval, ee.Group, ee.Name, ee.Key())
+}
+
 // UpdateEndpointStatuses updates the slice of endpoint statuses
 func UpdateEndpointStatuses(ep *endpoint.Endpoint, result *endpoint.Result) {
 	if err := store.Get().Insert(ep, result); err != nil {
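Note: the heartbeat only stays green as long as something keeps pushing results to the external endpoint more often than `heartbeat.interval`. Below is a minimal, hypothetical Go sketch of such a pusher, built around the README example in this patch (group `core`, name `ext-ep-test`, token `potato`, `interval: 30m`). It assumes the push API path documented elsewhere in the Gatus README (`POST /api/v1/endpoints/{key}/external?success=<bool>`), a Gatus instance at `https://gatus.example.org`, and the `<GROUP>_<NAME>` key format; none of these are defined by this patch, so adjust them to your deployment. The handler in `api/external_endpoint.go` above returns HTTP 200 on success.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

const (
	// Hypothetical values matching the README example in this patch; replace with your own.
	gatusBaseURL = "https://gatus.example.org" // assumed Gatus instance URL
	endpointKey  = "core_ext-ep-test"          // assumed key format: <GROUP>_<NAME>
	bearerToken  = "potato"                    // token configured for the external endpoint
	pushInterval = 10 * time.Minute            // must be shorter than heartbeat.interval (30m above)
)

// pushResult reports a single success/failure result for the external endpoint.
func pushResult(success bool) error {
	url := fmt.Sprintf("%s/api/v1/endpoints/%s/external?success=%t", gatusBaseURL, endpointKey, success)
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+bearerToken)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code %d", resp.StatusCode)
	}
	return nil
}

func main() {
	ticker := time.NewTicker(pushInterval)
	defer ticker.Stop()
	for {
		// Push a successful result, then wait for the next tick.
		if err := pushResult(true); err != nil {
			fmt.Println("failed to push result:", err)
		}
		<-ticker.C
	}
}
```

As long as a pusher like this runs at an interval shorter than `heartbeat.interval`, `HasEndpointStatusNewerThan` returns true on every watchdog tick and no result is inserted; once the pusher stops for longer than the interval, the watchdog records a failed result (`heartbeat: no update received within 30m0s`) and triggers the configured alerts.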