Files
gatus/storage/store/memory/memory.go
TwiN 8238a42a55 Revert "fix(storage): resolve race condition in memory store" (#1271)
Revert "fix(storage): resolve race condition in memory store (#1270)"

This reverts commit 83c4fac217.
2025-09-17 15:10:08 -04:00

338 lines
12 KiB
Go

package memory
import (
"sort"
"sync"
"time"
"github.com/TwiN/gatus/v5/alerting/alert"
"github.com/TwiN/gatus/v5/config/endpoint"
"github.com/TwiN/gatus/v5/config/key"
"github.com/TwiN/gatus/v5/config/suite"
"github.com/TwiN/gatus/v5/storage/store/common"
"github.com/TwiN/gatus/v5/storage/store/common/paging"
"github.com/TwiN/gocache/v2"
"github.com/TwiN/logr"
)
// Store that leverages gocache
//
// Store is an in-memory status store: endpoint statuses and suite statuses are kept in two
// separate gocache caches and are never written to disk.
//
// The embedded RWMutex pairs in-place updates of cached status objects (the insert methods take
// the write lock) with the read methods that copy those objects (which take the read lock).
type Store struct {
sync.RWMutex
endpointCache *gocache.Cache // Cache for endpoint statuses, keyed by endpoint key
suiteCache *gocache.Cache // Cache for suite statuses, keyed by suite key
maximumNumberOfResults int // maximum number of results retained per endpoint and per suite
maximumNumberOfEvents int // maximum number of events that an endpoint can have (passed to AddResult)
}
// NewStore creates a new in-memory Store backed by two gocache caches, one for endpoint
// statuses and one for suite statuses. Neither cache has a size cap.
//
// maximumNumberOfResults and maximumNumberOfEvents are stored on the Store and applied as
// retention limits when results are inserted. This store has no backing file: nothing is
// persisted, so all data is lost when the process exits.
//
// The returned error is always nil; it exists to match the signature of other store constructors.
func NewStore(maximumNumberOfResults, maximumNumberOfEvents int) (*Store, error) {
	return &Store{
		endpointCache:          gocache.NewCache().WithMaxSize(gocache.NoMaxSize),
		suiteCache:             gocache.NewCache().WithMaxSize(gocache.NoMaxSize),
		maximumNumberOfResults: maximumNumberOfResults,
		maximumNumberOfEvents:  maximumNumberOfEvents,
	}, nil
}
// GetAllEndpointStatuses returns a copy of every endpoint.Status in the store, sorted by key.
//
// Each status is copied with ShallowCopyEndpointStatus, and params is forwarded to it to select
// which page of results is included. Cached values that are not *endpoint.Status are skipped.
// The store's read lock is held while copying and sorting. The error is always nil.
func (s *Store) GetAllEndpointStatuses(params *paging.EndpointStatusParams) ([]*endpoint.Status, error) {
	s.RLock()
	defer s.RUnlock()
	cached := s.endpointCache.GetAll()
	statuses := make([]*endpoint.Status, 0, len(cached))
	for _, value := range cached {
		status, isStatus := value.(*endpoint.Status)
		if !isStatus {
			continue
		}
		statuses = append(statuses, ShallowCopyEndpointStatus(status, params))
	}
	byKey := func(i, j int) bool { return statuses[i].Key < statuses[j].Key }
	sort.Slice(statuses, byKey)
	return statuses, nil
}
// GetAllSuiteStatuses returns a copy of every suite.Status in the store, sorted by key.
//
// Each status is copied with ShallowCopySuiteStatus, and params is forwarded to it. Cached values
// that are not *suite.Status are skipped. The store's read lock is held while copying and sorting.
// The error is always nil.
func (s *Store) GetAllSuiteStatuses(params *paging.SuiteStatusParams) ([]*suite.Status, error) {
	s.RLock()
	defer s.RUnlock()
	cached := s.suiteCache.GetAll()
	statuses := make([]*suite.Status, 0, len(cached))
	for _, value := range cached {
		status, isStatus := value.(*suite.Status)
		if !isStatus {
			continue
		}
		statuses = append(statuses, ShallowCopySuiteStatus(status, params))
	}
	byKey := func(i, j int) bool { return statuses[i].Key < statuses[j].Key }
	sort.Slice(statuses, byKey)
	return statuses, nil
}
// GetEndpointStatus looks up the endpoint status identified by groupName and endpointName.
//
// The pair is turned into a store key with key.ConvertGroupAndNameToKey and the lookup is
// delegated to GetEndpointStatusByKey, so its results and errors apply unchanged.
func (s *Store) GetEndpointStatus(groupName, endpointName string, params *paging.EndpointStatusParams) (*endpoint.Status, error) {
	endpointKey := key.ConvertGroupAndNameToKey(groupName, endpointName)
	return s.GetEndpointStatusByKey(endpointKey, params)
}
// GetEndpointStatusByKey returns a shallow copy, built by ShallowCopyEndpointStatus with params,
// of the endpoint status stored under key.
//
// It returns common.ErrEndpointNotFound when nothing is cached under key. The store's read lock
// is held during the lookup and copy.
func (s *Store) GetEndpointStatusByKey(key string, params *paging.EndpointStatusParams) (*endpoint.Status, error) {
	s.RLock()
	defer s.RUnlock()
	if cached := s.endpointCache.GetValue(key); cached != nil {
		return ShallowCopyEndpointStatus(cached.(*endpoint.Status), params), nil
	}
	return nil, common.ErrEndpointNotFound
}
// GetSuiteStatusByKey returns a shallow copy, built by ShallowCopySuiteStatus with params,
// of the suite status stored under key.
//
// It returns common.ErrSuiteNotFound when nothing is cached under key. The store's read lock
// is held during the lookup and copy.
func (s *Store) GetSuiteStatusByKey(key string, params *paging.SuiteStatusParams) (*suite.Status, error) {
	s.RLock()
	defer s.RUnlock()
	if cached := s.suiteCache.GetValue(key); cached != nil {
		return ShallowCopySuiteStatus(cached.(*suite.Status), params), nil
	}
	return nil, common.ErrSuiteNotFound
}
// GetUptimeByKey returns the fraction of successful executions over total executions for the
// endpoint stored under key, aggregated over every hourly bucket that overlaps [from, to].
//
// Both bounds are truncated to the hour, so the buckets containing from and to are always
// included, regardless of the minute offsets of from and to.
//
// Errors:
//   - common.ErrInvalidTimeRange if from is after to.
//   - common.ErrEndpointNotFound if key is unknown or the endpoint has no uptime data.
//
// If no executions were recorded in the range, it returns 0 and a nil error.
func (s *Store) GetUptimeByKey(key string, from, to time.Time) (float64, error) {
	if from.After(to) {
		return 0, common.ErrInvalidTimeRange
	}
	s.RLock()
	defer s.RUnlock()
	status, ok := s.endpointCache.GetValue(key).(*endpoint.Status)
	if !ok || status.Uptime == nil {
		return 0, common.ErrEndpointNotFound
	}
	var successfulExecutions, totalExecutions uint64
	// Walk hour boundaries instead of stepping from `from` in 1h increments. The latter skipped
	// the hour containing `to` whenever to's offset within its hour was smaller than from's.
	lastHour := to.Truncate(time.Hour)
	for hour := from.Truncate(time.Hour); !hour.After(lastHour); hour = hour.Add(time.Hour) {
		hourlyStats := status.Uptime.HourlyStatistics[hour.Unix()]
		if hourlyStats == nil || hourlyStats.TotalExecutions == 0 {
			continue
		}
		successfulExecutions += hourlyStats.SuccessfulExecutions
		totalExecutions += hourlyStats.TotalExecutions
	}
	if totalExecutions == 0 {
		return 0, nil
	}
	return float64(successfulExecutions) / float64(totalExecutions), nil
}
// GetAverageResponseTimeByKey returns the average response time in milliseconds for the endpoint
// stored under key, aggregated over every hourly bucket that overlaps [from, to].
//
// Both bounds are truncated to the hour, so the buckets containing from and to are always
// included, regardless of the minute offsets of from and to. The average is truncated toward
// zero when converted to int.
//
// Errors:
//   - common.ErrInvalidTimeRange if from is after to.
//   - common.ErrEndpointNotFound if key is unknown or the endpoint has no uptime data.
//
// If no executions were recorded in the range, it returns 0 and a nil error.
func (s *Store) GetAverageResponseTimeByKey(key string, from, to time.Time) (int, error) {
	if from.After(to) {
		return 0, common.ErrInvalidTimeRange
	}
	s.RLock()
	defer s.RUnlock()
	status, ok := s.endpointCache.GetValue(key).(*endpoint.Status)
	if !ok || status.Uptime == nil {
		return 0, common.ErrEndpointNotFound
	}
	var totalExecutions, totalResponseTime uint64
	// Walk hour boundaries so the bucket containing `to` is never skipped.
	lastHour := to.Truncate(time.Hour)
	for hour := from.Truncate(time.Hour); !hour.After(lastHour); hour = hour.Add(time.Hour) {
		hourlyStats := status.Uptime.HourlyStatistics[hour.Unix()]
		if hourlyStats == nil || hourlyStats.TotalExecutions == 0 {
			continue
		}
		totalExecutions += hourlyStats.TotalExecutions
		totalResponseTime += hourlyStats.TotalExecutionsResponseTime
	}
	if totalExecutions == 0 {
		return 0, nil
	}
	return int(float64(totalResponseTime) / float64(totalExecutions)), nil
}
// GetHourlyAverageResponseTimeByKey returns a map from hourly Unix timestamp (start of the hour)
// to average response time in milliseconds, for the endpoint stored under key.
//
// Every hourly bucket overlapping [from, to] is considered; both bounds are truncated to the hour.
// Buckets with no recorded executions are omitted from the map.
//
// Errors:
//   - common.ErrInvalidTimeRange if from is after to.
//   - common.ErrEndpointNotFound if key is unknown or the endpoint has no uptime data.
func (s *Store) GetHourlyAverageResponseTimeByKey(key string, from, to time.Time) (map[int64]int, error) {
	if from.After(to) {
		return nil, common.ErrInvalidTimeRange
	}
	s.RLock()
	defer s.RUnlock()
	status, ok := s.endpointCache.GetValue(key).(*endpoint.Status)
	if !ok || status.Uptime == nil {
		return nil, common.ErrEndpointNotFound
	}
	hourlyAverageResponseTimes := make(map[int64]int)
	// Walk hour boundaries so the bucket containing `to` is never skipped.
	lastHour := to.Truncate(time.Hour)
	for hour := from.Truncate(time.Hour); !hour.After(lastHour); hour = hour.Add(time.Hour) {
		hourlyUnixTimestamp := hour.Unix()
		hourlyStats := status.Uptime.HourlyStatistics[hourlyUnixTimestamp]
		if hourlyStats == nil || hourlyStats.TotalExecutions == 0 {
			continue
		}
		hourlyAverageResponseTimes[hourlyUnixTimestamp] = int(float64(hourlyStats.TotalExecutionsResponseTime) / float64(hourlyStats.TotalExecutions))
	}
	return hourlyAverageResponseTimes, nil
}
// InsertEndpointResult records result for endpoint ep.
//
// On the first insert for ep's key, a new status is created and an endpoint.EventStart event is
// appended to it. The result is then applied with AddResult, using the store's result and event
// limits, and the status is written back to the cache.
//
// The whole read-modify-write happens under the store's write lock. The lock is released with
// defer so that a panic inside AddResult cannot leave the store locked forever.
// The returned error is always nil.
func (s *Store) InsertEndpointResult(ep *endpoint.Endpoint, result *endpoint.Result) error {
	endpointKey := ep.Key()
	s.Lock()
	defer s.Unlock()
	var status *endpoint.Status
	if cached, exists := s.endpointCache.Get(endpointKey); exists {
		status = cached.(*endpoint.Status)
	} else {
		status = endpoint.NewStatus(ep.Group, ep.Name)
		status.Events = append(status.Events, &endpoint.Event{
			Type:      endpoint.EventStart,
			Timestamp: time.Now(),
		})
	}
	AddResult(status, result, s.maximumNumberOfResults, s.maximumNumberOfEvents)
	s.endpointCache.Set(endpointKey, status)
	return nil
}
// InsertSuiteResult appends result to the status of suite su, creating that status on first use.
//
// Results are kept oldest-first: the new result goes at the end. When more than
// s.maximumNumberOfResults results are present, the oldest are dropped from the front.
// The whole read-modify-write runs under the store's write lock. The returned error is always nil.
func (s *Store) InsertSuiteResult(su *suite.Suite, result *suite.Result) error {
	s.Lock()
	defer s.Unlock()
	suiteKey := su.Key()
	var status *suite.Status
	if cached := s.suiteCache.GetValue(suiteKey); cached != nil {
		status = cached.(*suite.Status)
	} else {
		status = &suite.Status{
			Name:    su.Name,
			Group:   su.Group,
			Key:     su.Key(),
			Results: []*suite.Result{},
		}
		logr.Debugf("[memory.InsertSuiteResult] Created new suite status for suiteKey=%s", suiteKey)
	}
	status.Results = append(status.Results, result)
	if overflow := len(status.Results) - s.maximumNumberOfResults; overflow > 0 {
		status.Results = status.Results[overflow:]
	}
	s.suiteCache.Set(suiteKey, status)
	logr.Debugf("[memory.InsertSuiteResult] Stored suite result for suiteKey=%s, total results=%d", suiteKey, len(status.Results))
	return nil
}
// DeleteAllEndpointStatusesNotInKeys removes every endpoint status whose key is not listed in keys.
// It returns the value reported by the cache's DeleteAll, presumably the number of entries removed.
//
// keys is first indexed into a set, so the scan costs O(existing + len(keys)) rather than the
// O(existing * len(keys)) of a nested loop.
func (s *Store) DeleteAllEndpointStatusesNotInKeys(keys []string) int {
	keysToKeep := make(map[string]struct{}, len(keys))
	for _, k := range keys {
		keysToKeep[k] = struct{}{}
	}
	var keysToDelete []string
	for _, existingKey := range s.endpointCache.GetKeysByPattern("*", 0) {
		if _, keep := keysToKeep[existingKey]; !keep {
			keysToDelete = append(keysToDelete, existingKey)
		}
	}
	return s.endpointCache.DeleteAll(keysToDelete)
}
// DeleteAllSuiteStatusesNotInKeys removes every suite status whose key is not listed in keys, under
// the store's write lock. It returns the value reported by the cache's DeleteAll.
func (s *Store) DeleteAllSuiteStatusesNotInKeys(keys []string) int {
	s.Lock()
	defer s.Unlock()
	wanted := make(map[string]struct{}, len(keys))
	for _, k := range keys {
		wanted[k] = struct{}{}
	}
	var stale []string
	for existingKey := range s.suiteCache.GetAll() {
		if _, found := wanted[existingKey]; found {
			continue
		}
		stale = append(stale, existingKey)
	}
	return s.suiteCache.DeleteAll(stale)
}
// GetTriggeredEndpointAlert reports whether a triggered alert is stored for ep and alert, together
// with its resolveKey and numberOfSuccessesInARow.
//
// The in-memory store does not persist triggered alerts across restarts, so this always reports
// that no alert exists: false, "", 0 and a nil error.
func (s *Store) GetTriggeredEndpointAlert(ep *endpoint.Endpoint, alert *alert.Alert) (exists bool, resolveKey string, numberOfSuccessesInARow int, err error) {
	// Every named result is still at its zero value.
	return
}

// UpsertTriggeredEndpointAlert would store or update a triggered alert for ep, so it survives
// application restarts.
//
// The in-memory store has no persistence, so this is a no-op that always returns nil.
func (s *Store) UpsertTriggeredEndpointAlert(ep *endpoint.Endpoint, triggeredAlert *alert.Alert) error {
	return nil
}

// DeleteTriggeredEndpointAlert would remove a stored triggered alert for ep.
//
// The in-memory store has no persistence, so this is a no-op that always returns nil.
func (s *Store) DeleteTriggeredEndpointAlert(ep *endpoint.Endpoint, triggeredAlert *alert.Alert) error {
	return nil
}

// DeleteAllTriggeredAlertsNotInChecksumsByEndpoint would remove every triggered alert owned by ep
// whose alert configuration checksum is absent from checksums. This keeps alerts that were removed
// or modified from lingering in persistent storage.
//
// The in-memory store has no persistence, so this is a no-op that always reports 0 deletions.
func (s *Store) DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(ep *endpoint.Endpoint, checksums []string) int {
	return 0
}
// HasEndpointStatusNewerThan reports whether the endpoint stored under key has at least one result
// whose Timestamp is strictly after timestamp.
//
// A missing key, or a cached value that is not an *endpoint.Status, yields false rather than an
// error; the error return is always nil. The store's read lock is held during the scan.
func (s *Store) HasEndpointStatusNewerThan(key string, timestamp time.Time) (bool, error) {
	s.RLock()
	defer s.RUnlock()
	status, isStatus := s.endpointCache.GetValue(key).(*endpoint.Status)
	if !isStatus {
		return false, nil
	}
	for i := range status.Results {
		if status.Results[i].Timestamp.After(timestamp) {
			return true, nil
		}
	}
	return false, nil
}
// Clear removes every endpoint status and then every suite status from the store.
func (s *Store) Clear() {
	for _, cache := range []*gocache.Cache{s.endpointCache, s.suiteCache} {
		cache.Clear()
	}
}
// Save is a no-op for the in-memory store and always returns nil.
// This store has no backing file, so there is nothing to persist.
func (s *Store) Save() error {
	return nil
}
// Close is a no-op: the in-memory store holds no files, connections or other resources to release.
func (s *Store) Close() {}