feat(external-endpoint): Implement heartbeat (#1173)

Fixes #741
TwiN
2025-07-30 12:18:10 -04:00
committed by GitHub
parent aa08321239
commit 6f9a2c7c32
10 changed files with 220 additions and 8 deletions

View File

@@ -306,13 +306,15 @@ For instance:
- You can implement your own monitoring system while using Gatus as the dashboard
| Parameter | Description | Default |
|:------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------|:---------------|
| `external-endpoints` | List of endpoints to monitor. | `[]` |
| `external-endpoints[].enabled` | Whether to monitor the endpoint. | `true` |
| `external-endpoints[].name` | Name of the endpoint. Can be anything. | Required `""` |
| `external-endpoints[].group` | Group name. Used to group multiple endpoints together on the dashboard. <br />See [Endpoint groups](#endpoint-groups). | `""` |
| `external-endpoints[].token` | Bearer token required to push status to the external endpoint. | Required `""` |
| `external-endpoints[].alerts` | List of all alerts for a given endpoint. <br />See [Alerting](#alerting). | `[]` |
| `external-endpoints[].heartbeat` | Heartbeat configuration for monitoring when the external endpoint stops sending updates. | `{}` |
| `external-endpoints[].heartbeat.interval` | Expected interval between updates. If no update is received within this interval, alerts will be triggered. Must be at least 10s. | `0` (disabled) |
Example:
```yaml
@@ -320,6 +322,8 @@ external-endpoints:
- name: ext-ep-test
group: core
token: "potato"
heartbeat:
interval: 30m # Automatically create a failure if no update is received within 30 minutes
alerts:
- type: discord
description: "healthcheck failed"
```

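The heartbeat only stays healthy as long as results keep being pushed for the external endpoint. Below is a minimal Go sketch of such a push; the base URL, the `/api/v1/endpoints/{key}/external` path, the `group_name` key format and the query parameters are illustrative assumptions rather than something defined in this change:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Hypothetical push for the "core" group / "ext-ep-test" endpoint configured above.
	url := "https://status.example.org/api/v1/endpoints/core_ext-ep-test/external?success=true&error=&duration=10s"
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer potato") // must match external-endpoints[].token
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// A 2xx response means the result was recorded, which also resets the heartbeat window.
	fmt.Println("status:", resp.StatusCode)
}
```

If no such push arrives within `heartbeat.interval`, the watchdog changes below insert a failed result with the error `heartbeat: no update received within <interval>` and run the configured alerting.
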
View File

@@ -7,6 +7,7 @@ import (
"github.com/TwiN/gatus/v5/config"
"github.com/TwiN/gatus/v5/config/endpoint"
"github.com/TwiN/gatus/v5/metrics"
"github.com/TwiN/gatus/v5/storage/store"
"github.com/TwiN/gatus/v5/storage/store/common"
"github.com/TwiN/gatus/v5/watchdog"
@@ -72,6 +73,9 @@ func CreateExternalEndpointResult(cfg *config.Config) fiber.Handler {
externalEndpoint.NumberOfSuccessesInARow = convertedEndpoint.NumberOfSuccessesInARow
externalEndpoint.NumberOfFailuresInARow = convertedEndpoint.NumberOfFailuresInARow
}
if cfg.Metrics {
metrics.PublishMetricsForEndpoint(convertedEndpoint, result)
}
// Return the result
return c.Status(200).SendString("")
}

View File

@@ -2,13 +2,19 @@ package endpoint
import (
"errors"
"time"
"github.com/TwiN/gatus/v5/alerting/alert"
"github.com/TwiN/gatus/v5/config/endpoint/heartbeat"
"github.com/TwiN/gatus/v5/config/maintenance"
)
var (
// ErrExternalEndpointWithNoToken is the error with which Gatus will panic if an external endpoint is configured without a token.
ErrExternalEndpointWithNoToken = errors.New("you must specify a token for each external endpoint")
// ErrExternalEndpointHeartbeatIntervalTooLow is the error with which Gatus will panic if an external endpoint's heartbeat interval is less than 10 seconds.
ErrExternalEndpointHeartbeatIntervalTooLow = errors.New("heartbeat interval must be at least 10 seconds")
)
// ExternalEndpoint is an endpoint whose result is pushed from outside Gatus, which means that
@@ -30,6 +36,12 @@ type ExternalEndpoint struct {
// Alerts is the alerting configuration for the endpoint in case of failure
Alerts []*alert.Alert `yaml:"alerts,omitempty"`
// MaintenanceWindow is the configuration for per-endpoint maintenance windows
MaintenanceWindows []*maintenance.Config `yaml:"maintenance-windows,omitempty"`
// Heartbeat is the configuration that checks if the external endpoint has received new results when it should have.
Heartbeat heartbeat.Config `yaml:"heartbeat,omitempty"`
// NumberOfFailuresInARow is the number of unsuccessful evaluations in a row
NumberOfFailuresInARow int `yaml:"-"`
@@ -45,6 +57,10 @@ func (externalEndpoint *ExternalEndpoint) ValidateAndSetDefaults() error {
if len(externalEndpoint.Token) == 0 {
return ErrExternalEndpointWithNoToken
}
if externalEndpoint.Heartbeat.Interval != 0 && externalEndpoint.Heartbeat.Interval < 10*time.Second {
// If the heartbeat interval is set (non-0), it must be at least 10 seconds.
return ErrExternalEndpointHeartbeatIntervalTooLow
}
return nil
}
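As a usage sketch of the validation above, a test like the following would exercise the new lower bound; it assumes the `ExternalEndpoint` struct also exposes `Name` and `Token` fields, as the README table suggests:

```go
package endpoint_test

import (
	"errors"
	"testing"
	"time"

	"github.com/TwiN/gatus/v5/config/endpoint"
	"github.com/TwiN/gatus/v5/config/endpoint/heartbeat"
)

func TestExternalEndpointHeartbeatIntervalLowerBound(t *testing.T) {
	ee := &endpoint.ExternalEndpoint{
		Name:      "ext-ep-test",
		Token:     "potato",
		Heartbeat: heartbeat.Config{Interval: 5 * time.Second}, // below the 10s minimum
	}
	if err := ee.ValidateAndSetDefaults(); !errors.Is(err, endpoint.ErrExternalEndpointHeartbeatIntervalTooLow) {
		t.Errorf("expected ErrExternalEndpointHeartbeatIntervalTooLow, got %v", err)
	}
	// An interval of 0 leaves the heartbeat disabled and passes validation.
	ee.Heartbeat.Interval = 0
	if err := ee.ValidateAndSetDefaults(); err != nil {
		t.Errorf("expected no error, got %v", err)
	}
}
```

A zero interval skips the check entirely, which is how the heartbeat remains opt-in.
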

View File

@@ -0,0 +1,11 @@
package heartbeat
import "time"
// Config used to check if the external endpoint has received new results when it should have.
// This configuration is used to trigger alerts when an external endpoint has no new results for a defined period of time
type Config struct {
// Interval is the time interval at which Gatus verifies whether the external endpoint has received new results
// If no new result is received within the interval, the endpoint is marked as failed and alerts are triggered
Interval time.Duration `yaml:"interval"`
}
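For reference, the zero value of this struct is what the README's `0` (disabled) default refers to; a small sketch, assuming the `gopkg.in/yaml.v3` package already used by the repository for configuration parsing:

```go
package main

import (
	"fmt"
	"time"

	"github.com/TwiN/gatus/v5/config/endpoint/heartbeat"
	"gopkg.in/yaml.v3"
)

func main() {
	// go-yaml decodes Go duration strings such as "30m" directly into time.Duration fields.
	var cfg heartbeat.Config
	if err := yaml.Unmarshal([]byte("interval: 30m"), &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.Interval == 30*time.Minute) // true

	// An omitted heartbeat section leaves Interval at 0, which keeps the heartbeat disabled.
	var zero heartbeat.Config
	fmt.Println(zero.Interval == 0) // true
}
```
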

View File

@@ -211,6 +211,23 @@ func (s *Store) DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(ep *endpoint.En
return 0
}
// HasEndpointStatusNewerThan checks whether an endpoint has a status newer than the provided timestamp
func (s *Store) HasEndpointStatusNewerThan(key string, timestamp time.Time) (bool, error) {
s.RLock()
defer s.RUnlock()
endpointStatus := s.cache.GetValue(key)
if endpointStatus == nil {
// If no endpoint exists, there's no newer status, so return false instead of an error
return false, nil
}
for _, result := range endpointStatus.(*endpoint.Status).Results {
if result.Timestamp.After(timestamp) {
return true, nil
}
}
return false, nil
}
// Clear deletes everything from the store
func (s *Store) Clear() {
s.cache.Clear()

View File

@@ -84,6 +84,7 @@ var (
// This test is simply an extra sanity check
func TestStore_SanityCheck(t *testing.T) {
store, _ := NewStore(storage.DefaultMaximumNumberOfResults, storage.DefaultMaximumNumberOfEvents)
defer store.Clear()
defer store.Close()
store.Insert(&testEndpoint, &testSuccessfulResult)
endpointStatuses, _ := store.GetAllEndpointStatuses(paging.NewEndpointStatusParams())
@@ -134,3 +135,30 @@ func TestStore_Save(t *testing.T) {
store.Clear()
store.Close()
}
func TestStore_HasEndpointStatusNewerThan(t *testing.T) {
store, _ := NewStore(storage.DefaultMaximumNumberOfResults, storage.DefaultMaximumNumberOfEvents)
defer store.Clear()
defer store.Close()
// Insert a result
err := store.Insert(&testEndpoint, &testSuccessfulResult)
if err != nil {
t.Fatalf("expected no error while inserting result, got %v", err)
}
// Check with a timestamp in the past
hasNewerStatus, err := store.HasEndpointStatusNewerThan(testEndpoint.Key(), time.Now().Add(-time.Hour))
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if !hasNewerStatus {
t.Fatal("expected to have a newer status, but didn't")
}
// Check with a timestamp in the future
hasNewerStatus, err = store.HasEndpointStatusNewerThan(testEndpoint.Key(), time.Now().Add(time.Hour))
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if hasNewerStatus {
t.Fatal("expected not to have a newer status, but did")
}
}

View File

@@ -514,6 +514,24 @@ func (s *Store) DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(ep *endpoint.En
return int(rowsAffects)
}
// HasEndpointStatusNewerThan checks whether an endpoint has a status newer than the provided timestamp
func (s *Store) HasEndpointStatusNewerThan(key string, timestamp time.Time) (bool, error) {
if timestamp.IsZero() {
return false, errors.New("timestamp is zero")
}
var count int
err := s.db.QueryRow(
"SELECT COUNT(*) FROM endpoint_results WHERE endpoint_id = (SELECT endpoint_id FROM endpoints WHERE endpoint_key = $1 LIMIT 1) AND timestamp > $2",
key,
timestamp.UTC(),
).Scan(&count)
if err != nil {
// If the endpoint doesn't exist, we return false instead of an error
return false, nil
}
return count > 0, nil
}
// Clear deletes everything from the store
func (s *Store) Clear() {
_, _ = s.db.Exec("DELETE FROM endpoints")

View File

@@ -853,3 +853,36 @@ func TestStore_DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(t *testing.T) {
t.Error("expected alert3 to exist for ep2")
}
}
func TestStore_HasEndpointStatusNewerThan(t *testing.T) {
store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_HasEndpointStatusNewerThan.db", false, storage.DefaultMaximumNumberOfResults, storage.DefaultMaximumNumberOfEvents)
defer store.Close()
// Insert an endpoint status
if err := store.Insert(&testEndpoint, &testSuccessfulResult); err != nil {
t.Fatal("expected no error, got", err.Error())
}
// Check if it has a status newer than 1 hour ago
hasNewerStatus, err := store.HasEndpointStatusNewerThan(testEndpoint.Key(), time.Now().Add(-time.Hour))
if err != nil {
t.Fatal("expected no error, got", err.Error())
}
if !hasNewerStatus {
t.Error("expected to have a newer status")
}
// Check if it has a status newer than 2 days ago
hasNewerStatus, err = store.HasEndpointStatusNewerThan(testEndpoint.Key(), time.Now().Add(-48*time.Hour))
if err != nil {
t.Fatal("expected no error, got", err.Error())
}
if !hasNewerStatus {
t.Error("expected to have a newer status")
}
// Check if there's a status newer than 1 hour in the future (silly test, but it should work)
hasNewerStatus, err = store.HasEndpointStatusNewerThan(testEndpoint.Key(), time.Now().Add(time.Hour))
if err != nil {
t.Fatal("expected no error, got", err.Error())
}
if hasNewerStatus {
t.Error("expected not to have a newer status in the future")
}
}

View File

@@ -57,6 +57,9 @@ type Store interface {
// This prevents triggered alerts that have been removed or modified from lingering in the database.
DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(ep *endpoint.Endpoint, checksums []string) int
// HasEndpointStatusNewerThan checks whether an endpoint has a status newer than the provided timestamp
HasEndpointStatusNewerThan(key string, timestamp time.Time) (bool, error)
// Clear deletes everything from the store
Clear()

View File

@@ -34,6 +34,14 @@ func Monitor(cfg *config.Config) {
go monitor(endpoint, cfg.Alerting, cfg.Maintenance, cfg.Connectivity, cfg.DisableMonitoringLock, cfg.Metrics, ctx)
}
}
for _, externalEndpoint := range cfg.ExternalEndpoints {
// Check if the external endpoint is enabled and uses a heartbeat.
// If the external endpoint does not use a heartbeat, it does not need to be monitored periodically, because
// alerting is evaluated every time a result is pushed to Gatus for that external endpoint, unlike normal endpoints.
if externalEndpoint.IsEnabled() && externalEndpoint.Heartbeat.Interval > 0 {
go monitorExternalEndpointHeartbeat(externalEndpoint, cfg.Alerting, cfg.Maintenance, cfg.Connectivity, cfg.DisableMonitoringLock, cfg.Metrics, ctx)
}
}
}
// monitor a single endpoint in a loop
@@ -96,6 +104,76 @@ func execute(ep *endpoint.Endpoint, alertingConfig *alerting.Config, maintenance
logr.Debugf("[watchdog.execute] Waiting for interval=%s before monitoring group=%s endpoint=%s (key=%s) again", ep.Interval, ep.Group, ep.Name, ep.Key())
}
func monitorExternalEndpointHeartbeat(ee *endpoint.ExternalEndpoint, alertingConfig *alerting.Config, maintenanceConfig *maintenance.Config, connectivityConfig *connectivity.Config, disableMonitoringLock bool, enabledMetrics bool, ctx context.Context) {
ticker := time.NewTicker(ee.Heartbeat.Interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
logr.Warnf("[watchdog.monitorExternalEndpointHeartbeat] Canceling current execution of group=%s; endpoint=%s; key=%s", ee.Group, ee.Name, ee.Key())
return
case <-ticker.C:
executeExternalEndpointHeartbeat(ee, alertingConfig, maintenanceConfig, connectivityConfig, disableMonitoringLock, enabledMetrics)
}
}
}
func executeExternalEndpointHeartbeat(ee *endpoint.ExternalEndpoint, alertingConfig *alerting.Config, maintenanceConfig *maintenance.Config, connectivityConfig *connectivity.Config, disableMonitoringLock bool, enabledMetrics bool) {
if !disableMonitoringLock {
// By placing the lock here, we prevent multiple endpoints from being monitored at the exact same time, which
// could cause performance issues and return inaccurate results
monitoringMutex.Lock()
defer monitoringMutex.Unlock()
}
// If there's a connectivity checker configured, check if Gatus has internet connectivity
if connectivityConfig != nil && connectivityConfig.Checker != nil && !connectivityConfig.Checker.IsConnected() {
logr.Infof("[watchdog.monitorExternalEndpointHeartbeat] No connectivity; skipping execution")
return
}
logr.Debugf("[watchdog.monitorExternalEndpointHeartbeat] Checking heartbeat for group=%s; endpoint=%s; key=%s", ee.Group, ee.Name, ee.Key())
convertedEndpoint := ee.ToEndpoint()
hasReceivedResultWithinHeartbeatInterval, err := store.Get().HasEndpointStatusNewerThan(ee.Key(), time.Now().Add(-ee.Heartbeat.Interval))
if err != nil {
logr.Errorf("[watchdog.monitorExternalEndpointHeartbeat] Failed to check if endpoint has received a result within the heartbeat interval: %s", err.Error())
return
}
if hasReceivedResultWithinHeartbeatInterval {
// If we received a result within the heartbeat interval, we don't want to create a successful result, so we
// skip the rest. We don't have to worry about alerting or metrics, because if the previous heartbeat failed
// while this one succeeds, it implies that there was a new result pushed, and that result being pushed
// should've resolved the alert.
logr.Infof("[watchdog.monitorExternalEndpointHeartbeat] Checked heartbeat for group=%s; endpoint=%s; key=%s; success=%v; errors=%d", ee.Group, ee.Name, ee.Key(), hasReceivedResultWithinHeartbeatInterval, 0)
return
}
// All code after this point assumes the heartbeat failed
result := &endpoint.Result{
Timestamp: time.Now(),
Success: false,
Errors: []string{"heartbeat: no update received within " + ee.Heartbeat.Interval.String()},
}
if enabledMetrics {
metrics.PublishMetricsForEndpoint(convertedEndpoint, result)
}
UpdateEndpointStatuses(convertedEndpoint, result)
logr.Infof("[watchdog.monitorExternalEndpointHeartbeat] Checked heartbeat for group=%s; endpoint=%s; key=%s; success=%v; errors=%d; duration=%s", ee.Group, ee.Name, ee.Key(), result.Success, len(result.Errors), result.Duration.Round(time.Millisecond))
inEndpointMaintenanceWindow := false
for _, maintenanceWindow := range ee.MaintenanceWindows {
if maintenanceWindow.IsUnderMaintenance() {
logr.Debug("[watchdog.monitorExternalEndpointHeartbeat] Under endpoint maintenance window")
inEndpointMaintenanceWindow = true
}
}
if !maintenanceConfig.IsUnderMaintenance() && !inEndpointMaintenanceWindow {
HandleAlerting(convertedEndpoint, result, alertingConfig)
// Sync the failure/success counters back to the external endpoint
ee.NumberOfSuccessesInARow = convertedEndpoint.NumberOfSuccessesInARow
ee.NumberOfFailuresInARow = convertedEndpoint.NumberOfFailuresInARow
} else {
logr.Debug("[watchdog.monitorExternalEndpointHeartbeat] Not handling alerting because currently in the maintenance window")
}
logr.Debugf("[watchdog.monitorExternalEndpointHeartbeat] Waiting for interval=%s before checking heartbeat for group=%s endpoint=%s (key=%s) again", ee.Heartbeat.Interval, ee.Group, ee.Name, ee.Key())
}
// UpdateEndpointStatuses updates the slice of endpoint statuses
func UpdateEndpointStatuses(ep *endpoint.Endpoint, result *endpoint.Result) {
if err := store.Get().Insert(ep, result); err != nil {