feat(suite): Implement Suites (#1239)
* feat(suite): Implement Suites

  Fixes #1230

* Update docs
* Fix variable alignment
* Prevent always-run endpoint from running if a context placeholder fails to resolve in the URL
* Return errors when a context placeholder path fails to resolve
* Add a couple of unit tests
* Add a couple of unit tests
* fix(ui): Update group count properly

  Fixes #1233

* refactor: Pass down entire config instead of several sub-configs
* fix: Change default suite interval and timeout
* fix: Deprecate disable-monitoring-lock in favor of concurrency
* fix: Make sure there are no duplicate keys
* Refactor some code
* Update watchdog/watchdog.go
* Update web/app/src/components/StepDetailsModal.vue

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
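For reference, the suite-aware store API introduced by this commit can be exercised roughly as sketched below. This is a minimal, hedged example based only on the signatures visible in this diff (InsertSuiteResult, GetSuiteStatusByKey and the suite/endpoint/paging types they use); the store package import path and alias, the helper function, and all field values are illustrative assumptions, not code from the commit.

// Minimal usage sketch; the import path/alias of the store package and the
// concrete field values below are assumptions for illustration only.
package example

import (
    "time"

    "github.com/TwiN/gatus/v5/config/endpoint"
    "github.com/TwiN/gatus/v5/config/suite"
    "github.com/TwiN/gatus/v5/storage/store/common/paging"

    storesql "github.com/TwiN/gatus/v5/storage/store/sql" // assumed path of the Store shown in this diff
)

// recordSuiteExecution persists one suite run and reads its paginated history back.
func recordSuiteExecution(store *storesql.Store, su *suite.Suite) (*suite.Status, error) {
    result := &suite.Result{
        Success:   true,
        Duration:  250 * time.Millisecond,
        Timestamp: time.Now(),
        // Each endpoint result gets linked to the suite result via suite_result_id
        // (see insertEndpointResultWithSuiteID in the diff below).
        EndpointResults: []*endpoint.Result{
            {Name: "first-step", Success: true, Duration: 120 * time.Millisecond, Timestamp: time.Now()},
        },
    }
    if err := store.InsertSuiteResult(su, result); err != nil {
        return nil, err
    }
    // Page and PageSize mirror the fields read by GetSuiteStatusByKey in the diff.
    return store.GetSuiteStatusByKey(su.Key(), &paging.SuiteStatusParams{Page: 1, PageSize: 20})
}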
@@ -10,6 +10,8 @@ import (

    "github.com/TwiN/gatus/v5/alerting/alert"
    "github.com/TwiN/gatus/v5/config/endpoint"
    "github.com/TwiN/gatus/v5/config/key"
    "github.com/TwiN/gatus/v5/config/suite"
    "github.com/TwiN/gatus/v5/storage/store/common"
    "github.com/TwiN/gatus/v5/storage/store/common/paging"
    "github.com/TwiN/gocache/v2"
@@ -138,7 +140,7 @@ func (s *Store) GetAllEndpointStatuses(params *paging.EndpointStatusParams) ([]*

// GetEndpointStatus returns the endpoint status for a given endpoint name in the given group
func (s *Store) GetEndpointStatus(groupName, endpointName string, params *paging.EndpointStatusParams) (*endpoint.Status, error) {
    return s.GetEndpointStatusByKey(endpoint.ConvertGroupAndEndpointNameToKey(groupName, endpointName), params)
    return s.GetEndpointStatusByKey(key.ConvertGroupAndNameToKey(groupName, endpointName), params)
}

// GetEndpointStatusByKey returns the endpoint status for a given key
@@ -233,8 +235,8 @@ func (s *Store) GetHourlyAverageResponseTimeByKey(key string, from, to time.Time
    return hourlyAverageResponseTimes, nil
}

// Insert adds the observed result for the specified endpoint into the store
func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
// InsertEndpointResult adds the observed result for the specified endpoint into the store
func (s *Store) InsertEndpointResult(ep *endpoint.Endpoint, result *endpoint.Result) error {
    tx, err := s.db.Begin()
    if err != nil {
        return err
@@ -245,12 +247,12 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
            // Endpoint doesn't exist in the database, insert it
            if endpointID, err = s.insertEndpoint(tx, ep); err != nil {
                _ = tx.Rollback()
                logr.Errorf("[sql.Insert] Failed to create endpoint with key=%s: %s", ep.Key(), err.Error())
                logr.Errorf("[sql.InsertEndpointResult] Failed to create endpoint with key=%s: %s", ep.Key(), err.Error())
                return err
            }
        } else {
            _ = tx.Rollback()
            logr.Errorf("[sql.Insert] Failed to retrieve id of endpoint with key=%s: %s", ep.Key(), err.Error())
            logr.Errorf("[sql.InsertEndpointResult] Failed to retrieve id of endpoint with key=%s: %s", ep.Key(), err.Error())
            return err
        }
    }
@@ -266,7 +268,7 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
    numberOfEvents, err := s.getNumberOfEventsByEndpointID(tx, endpointID)
    if err != nil {
        // Silently fail
        logr.Errorf("[sql.Insert] Failed to retrieve total number of events for endpoint with key=%s: %s", ep.Key(), err.Error())
        logr.Errorf("[sql.InsertEndpointResult] Failed to retrieve total number of events for endpoint with key=%s: %s", ep.Key(), err.Error())
    }
    if numberOfEvents == 0 {
        // There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event
@@ -276,18 +278,18 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
        })
        if err != nil {
            // Silently fail
            logr.Errorf("[sql.Insert] Failed to insert event=%s for endpoint with key=%s: %s", endpoint.EventStart, ep.Key(), err.Error())
            logr.Errorf("[sql.InsertEndpointResult] Failed to insert event=%s for endpoint with key=%s: %s", endpoint.EventStart, ep.Key(), err.Error())
        }
        event := endpoint.NewEventFromResult(result)
        if err = s.insertEndpointEvent(tx, endpointID, event); err != nil {
            // Silently fail
            logr.Errorf("[sql.Insert] Failed to insert event=%s for endpoint with key=%s: %s", event.Type, ep.Key(), err.Error())
            logr.Errorf("[sql.InsertEndpointResult] Failed to insert event=%s for endpoint with key=%s: %s", event.Type, ep.Key(), err.Error())
        }
    } else {
        // Get the success value of the previous result
        var lastResultSuccess bool
        if lastResultSuccess, err = s.getLastEndpointResultSuccessValue(tx, endpointID); err != nil {
            logr.Errorf("[sql.Insert] Failed to retrieve outcome of previous result for endpoint with key=%s: %s", ep.Key(), err.Error())
            logr.Errorf("[sql.InsertEndpointResult] Failed to retrieve outcome of previous result for endpoint with key=%s: %s", ep.Key(), err.Error())
        } else {
            // If we managed to retrieve the outcome of the previous result, we'll compare it with the new result.
            // If the final outcome (success or failure) of the previous and the new result aren't the same, it means
@@ -297,7 +299,7 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
                event := endpoint.NewEventFromResult(result)
                if err = s.insertEndpointEvent(tx, endpointID, event); err != nil {
                    // Silently fail
                    logr.Errorf("[sql.Insert] Failed to insert event=%s for endpoint with key=%s: %s", event.Type, ep.Key(), err.Error())
                    logr.Errorf("[sql.InsertEndpointResult] Failed to insert event=%s for endpoint with key=%s: %s", event.Type, ep.Key(), err.Error())
                }
            }
        }
@@ -306,42 +308,42 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
        // (since we're only deleting MaximumNumberOfEvents at a time instead of 1)
        if numberOfEvents > int64(s.maximumNumberOfEvents+eventsAboveMaximumCleanUpThreshold) {
            if err = s.deleteOldEndpointEvents(tx, endpointID); err != nil {
                logr.Errorf("[sql.Insert] Failed to delete old events for endpoint with key=%s: %s", ep.Key(), err.Error())
                logr.Errorf("[sql.InsertEndpointResult] Failed to delete old events for endpoint with key=%s: %s", ep.Key(), err.Error())
            }
        }
    }
    // Second, we need to insert the result.
    if err = s.insertEndpointResult(tx, endpointID, result); err != nil {
        logr.Errorf("[sql.Insert] Failed to insert result for endpoint with key=%s: %s", ep.Key(), err.Error())
        logr.Errorf("[sql.InsertEndpointResult] Failed to insert result for endpoint with key=%s: %s", ep.Key(), err.Error())
        _ = tx.Rollback() // If we can't insert the result, we'll rollback now since there's no point continuing
        return err
    }
    // Clean up old results
    numberOfResults, err := s.getNumberOfResultsByEndpointID(tx, endpointID)
    if err != nil {
        logr.Errorf("[sql.Insert] Failed to retrieve total number of results for endpoint with key=%s: %s", ep.Key(), err.Error())
        logr.Errorf("[sql.InsertEndpointResult] Failed to retrieve total number of results for endpoint with key=%s: %s", ep.Key(), err.Error())
    } else {
        if numberOfResults > int64(s.maximumNumberOfResults+resultsAboveMaximumCleanUpThreshold) {
            if err = s.deleteOldEndpointResults(tx, endpointID); err != nil {
                logr.Errorf("[sql.Insert] Failed to delete old results for endpoint with key=%s: %s", ep.Key(), err.Error())
                logr.Errorf("[sql.InsertEndpointResult] Failed to delete old results for endpoint with key=%s: %s", ep.Key(), err.Error())
            }
        }
    }
    // Finally, we need to insert the uptime data.
    // Because the uptime data significantly outlives the results, we can't rely on the results for determining the uptime
    if err = s.updateEndpointUptime(tx, endpointID, result); err != nil {
        logr.Errorf("[sql.Insert] Failed to update uptime for endpoint with key=%s: %s", ep.Key(), err.Error())
        logr.Errorf("[sql.InsertEndpointResult] Failed to update uptime for endpoint with key=%s: %s", ep.Key(), err.Error())
    }
    // Merge hourly uptime entries that can be merged into daily entries and clean up old uptime entries
    numberOfUptimeEntries, err := s.getNumberOfUptimeEntriesByEndpointID(tx, endpointID)
    if err != nil {
        logr.Errorf("[sql.Insert] Failed to retrieve total number of uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
        logr.Errorf("[sql.InsertEndpointResult] Failed to retrieve total number of uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
    } else {
        // Merge older hourly uptime entries into daily uptime entries if we have more than uptimeTotalEntriesMergeThreshold
        if numberOfUptimeEntries >= uptimeTotalEntriesMergeThreshold {
            logr.Infof("[sql.Insert] Merging hourly uptime entries for endpoint with key=%s; This is a lot of work, it shouldn't happen too often", ep.Key())
            logr.Infof("[sql.InsertEndpointResult] Merging hourly uptime entries for endpoint with key=%s; This is a lot of work, it shouldn't happen too often", ep.Key())
            if err = s.mergeHourlyUptimeEntriesOlderThanMergeThresholdIntoDailyUptimeEntries(tx, endpointID); err != nil {
                logr.Errorf("[sql.Insert] Failed to merge hourly uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
                logr.Errorf("[sql.InsertEndpointResult] Failed to merge hourly uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
            }
        }
    }
@@ -350,11 +352,11 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
    // but if Gatus was temporarily shut down, we might have some old entries that need to be cleaned up
    ageOfOldestUptimeEntry, err := s.getAgeOfOldestEndpointUptimeEntry(tx, endpointID)
    if err != nil {
        logr.Errorf("[sql.Insert] Failed to retrieve oldest endpoint uptime entry for endpoint with key=%s: %s", ep.Key(), err.Error())
        logr.Errorf("[sql.InsertEndpointResult] Failed to retrieve oldest endpoint uptime entry for endpoint with key=%s: %s", ep.Key(), err.Error())
    } else {
        if ageOfOldestUptimeEntry > uptimeAgeCleanUpThreshold {
            if err = s.deleteOldUptimeEntries(tx, endpointID, time.Now().Add(-(uptimeRetention + time.Hour))); err != nil {
                logr.Errorf("[sql.Insert] Failed to delete old uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
                logr.Errorf("[sql.InsertEndpointResult] Failed to delete old uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
            }
        }
    }
@@ -364,7 +366,7 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
            s.writeThroughCache.Delete(cacheKey)
            endpointKey, params, err := extractKeyAndParamsFromCacheKey(cacheKey)
            if err != nil {
                logr.Errorf("[sql.Insert] Silently deleting cache key %s instead of refreshing due to error: %s", cacheKey, err.Error())
                logr.Errorf("[sql.InsertEndpointResult] Silently deleting cache key %s instead of refreshing due to error: %s", cacheKey, err.Error())
                continue
            }
            // Retrieve the endpoint status by key, which will in turn refresh the cache
@@ -379,17 +381,43 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {

// DeleteAllEndpointStatusesNotInKeys removes all rows owned by an endpoint whose key is not within the keys provided
func (s *Store) DeleteAllEndpointStatusesNotInKeys(keys []string) int {
    logr.Debugf("[sql.DeleteAllEndpointStatusesNotInKeys] Called with %d keys", len(keys))
    var err error
    var result sql.Result
    if len(keys) == 0 {
        // Delete everything
        logr.Debugf("[sql.DeleteAllEndpointStatusesNotInKeys] No keys provided, deleting all endpoints")
        result, err = s.db.Exec("DELETE FROM endpoints")
    } else {
        // First check what we're about to delete
        args := make([]interface{}, 0, len(keys))
        checkQuery := "SELECT endpoint_key FROM endpoints WHERE endpoint_key NOT IN ("
        for i := range keys {
            checkQuery += fmt.Sprintf("$%d,", i+1)
            args = append(args, keys[i])
        }
        checkQuery = checkQuery[:len(checkQuery)-1] + ")"

        rows, checkErr := s.db.Query(checkQuery, args...)
        if checkErr == nil {
            defer rows.Close()
            var deletedKeys []string
            for rows.Next() {
                var key string
                if err := rows.Scan(&key); err == nil {
                    deletedKeys = append(deletedKeys, key)
                }
            }
            if len(deletedKeys) > 0 {
                logr.Infof("[sql.DeleteAllEndpointStatusesNotInKeys] Deleting endpoints with keys: %v", deletedKeys)
            } else {
                logr.Debugf("[sql.DeleteAllEndpointStatusesNotInKeys] No endpoints to delete")
            }
        }

        query := "DELETE FROM endpoints WHERE endpoint_key NOT IN ("
        for i := range keys {
            query += fmt.Sprintf("$%d,", i+1)
            args = append(args, keys[i])
        }
        query = query[:len(query)-1] + ")" // Remove the last comma and add the closing parenthesis
        result, err = s.db.Exec(query, args...)
@@ -586,11 +614,16 @@ func (s *Store) insertEndpointEvent(tx *sql.Tx, endpointID int64, event *endpoin

// insertEndpointResult inserts a result in the store
func (s *Store) insertEndpointResult(tx *sql.Tx, endpointID int64, result *endpoint.Result) error {
    return s.insertEndpointResultWithSuiteID(tx, endpointID, result, nil)
}

// insertEndpointResultWithSuiteID inserts a result in the store with optional suite linkage
func (s *Store) insertEndpointResultWithSuiteID(tx *sql.Tx, endpointID int64, result *endpoint.Result, suiteResultID *int64) error {
    var endpointResultID int64
    err := tx.QueryRow(
        `
            INSERT INTO endpoint_results (endpoint_id, success, errors, connected, status, dns_rcode, certificate_expiration, domain_expiration, hostname, ip, duration, timestamp)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
            INSERT INTO endpoint_results (endpoint_id, success, errors, connected, status, dns_rcode, certificate_expiration, domain_expiration, hostname, ip, duration, timestamp, suite_result_id)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
            RETURNING endpoint_result_id
        `,
        endpointID,
@@ -605,6 +638,7 @@ func (s *Store) insertEndpointResult(tx *sql.Tx, endpointID int64, result *endpo
        result.IP,
        result.Duration,
        result.Timestamp.UTC(),
        suiteResultID,
    ).Scan(&endpointResultID)
    if err != nil {
        return err
@@ -652,7 +686,16 @@ func (s *Store) updateEndpointUptime(tx *sql.Tx, endpointID int64, result *endpo
}

func (s *Store) getAllEndpointKeys(tx *sql.Tx) (keys []string, err error) {
    rows, err := tx.Query("SELECT endpoint_key FROM endpoints ORDER BY endpoint_key")
    // Only get endpoints that have at least one result not linked to a suite
    // This excludes endpoints that only exist as part of suites
    // Using JOIN for better performance than EXISTS subquery
    rows, err := tx.Query(`
        SELECT DISTINCT e.endpoint_key
        FROM endpoints e
        INNER JOIN endpoint_results er ON e.endpoint_id = er.endpoint_id
        WHERE er.suite_result_id IS NULL
        ORDER BY e.endpoint_key
    `)
    if err != nil {
        return nil, err
    }
@@ -1108,3 +1151,428 @@ func extractKeyAndParamsFromCacheKey(cacheKey string) (string, *paging.EndpointS
    }
    return strings.Join(parts[:len(parts)-4], "-"), params, nil
}

// GetAllSuiteStatuses returns all monitored suite statuses
func (s *Store) GetAllSuiteStatuses(params *paging.SuiteStatusParams) ([]*suite.Status, error) {
    tx, err := s.db.Begin()
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    // Get all suites
    rows, err := tx.Query(`
        SELECT suite_id, suite_key, suite_name, suite_group
        FROM suites
        ORDER BY suite_key
    `)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var suiteStatuses []*suite.Status
    for rows.Next() {
        var suiteID int64
        var key, name, group string
        if err = rows.Scan(&suiteID, &key, &name, &group); err != nil {
            return nil, err
        }

        status := &suite.Status{
            Name:    name,
            Group:   group,
            Key:     key,
            Results: []*suite.Result{},
        }

        // Get suite results with pagination
        pageSize := 20
        page := 1
        if params != nil {
            if params.PageSize > 0 {
                pageSize = params.PageSize
            }
            if params.Page > 0 {
                page = params.Page
            }
        }

        status.Results, err = s.getSuiteResults(tx, suiteID, page, pageSize)
        if err != nil {
            logr.Errorf("[sql.GetAllSuiteStatuses] Failed to retrieve results for suite_id=%d: %s", suiteID, err.Error())
        }
        // Populate Name and Group fields on each result
        for _, result := range status.Results {
            result.Name = name
            result.Group = group
        }

        suiteStatuses = append(suiteStatuses, status)
    }

    if err = tx.Commit(); err != nil {
        return nil, err
    }
    return suiteStatuses, nil
}

// GetSuiteStatusByKey returns the suite status for a given key
func (s *Store) GetSuiteStatusByKey(key string, params *paging.SuiteStatusParams) (*suite.Status, error) {
    tx, err := s.db.Begin()
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    var suiteID int64
    var name, group string
    err = tx.QueryRow(`
        SELECT suite_id, suite_name, suite_group
        FROM suites
        WHERE suite_key = $1
    `, key).Scan(&suiteID, &name, &group)
    if err != nil {
        if errors.Is(err, sql.ErrNoRows) {
            return nil, nil
        }
        return nil, err
    }

    status := &suite.Status{
        Name:    name,
        Group:   group,
        Key:     key,
        Results: []*suite.Result{},
    }

    // Get suite results with pagination
    pageSize := 20
    page := 1
    if params != nil {
        if params.PageSize > 0 {
            pageSize = params.PageSize
        }
        if params.Page > 0 {
            page = params.Page
        }
    }

    status.Results, err = s.getSuiteResults(tx, suiteID, page, pageSize)
    if err != nil {
        logr.Errorf("[sql.GetSuiteStatusByKey] Failed to retrieve results for suite_id=%d: %s", suiteID, err.Error())
    }
    // Populate Name and Group fields on each result
    for _, result := range status.Results {
        result.Name = name
        result.Group = group
    }

    if err = tx.Commit(); err != nil {
        return nil, err
    }
    return status, nil
}

// InsertSuiteResult adds the observed result for the specified suite into the store
func (s *Store) InsertSuiteResult(su *suite.Suite, result *suite.Result) error {
    tx, err := s.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Get or create suite
    suiteID, err := s.getSuiteID(tx, su)
    if err != nil {
        if errors.Is(err, common.ErrSuiteNotFound) {
            // Suite doesn't exist in the database, insert it
            if suiteID, err = s.insertSuite(tx, su); err != nil {
                logr.Errorf("[sql.InsertSuiteResult] Failed to create suite with key=%s: %s", su.Key(), err.Error())
                return err
            }
        } else {
            logr.Errorf("[sql.InsertSuiteResult] Failed to retrieve id of suite with key=%s: %s", su.Key(), err.Error())
            return err
        }
    }
    // Insert suite result
    var suiteResultID int64
    err = tx.QueryRow(`
        INSERT INTO suite_results (suite_id, success, errors, duration, timestamp)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING suite_result_id
    `,
        suiteID,
        result.Success,
        strings.Join(result.Errors, arraySeparator),
        result.Duration.Nanoseconds(),
        result.Timestamp.UTC(), // timestamp is the start time
    ).Scan(&suiteResultID)
    if err != nil {
        return err
    }
    // For each endpoint result in the suite, we need to store them
    for _, epResult := range result.EndpointResults {
        // Create a temporary endpoint object for storage
        ep := &endpoint.Endpoint{
            Name:  epResult.Name,
            Group: su.Group,
        }
        // Get or create the endpoint (without suite linkage in endpoints table)
        epID, err := s.getEndpointID(tx, ep)
        if err != nil {
            if errors.Is(err, common.ErrEndpointNotFound) {
                // Endpoint doesn't exist, create it
                if epID, err = s.insertEndpoint(tx, ep); err != nil {
                    logr.Errorf("[sql.InsertSuiteResult] Failed to create endpoint %s: %s", epResult.Name, err.Error())
                    continue
                }
            } else {
                logr.Errorf("[sql.InsertSuiteResult] Failed to get endpoint %s: %s", epResult.Name, err.Error())
                continue
            }
        }
        // Insert the endpoint result with suite linkage
        err = s.insertEndpointResultWithSuiteID(tx, epID, epResult, &suiteResultID)
        if err != nil {
            logr.Errorf("[sql.InsertSuiteResult] Failed to insert endpoint result for %s: %s", epResult.Name, err.Error())
        }
    }
    // Clean up old suite results
    numberOfResults, err := s.getNumberOfSuiteResultsByID(tx, suiteID)
    if err != nil {
        logr.Errorf("[sql.InsertSuiteResult] Failed to retrieve total number of results for suite with key=%s: %s", su.Key(), err.Error())
    } else {
        if numberOfResults > int64(s.maximumNumberOfResults+resultsAboveMaximumCleanUpThreshold) {
            if err = s.deleteOldSuiteResults(tx, suiteID); err != nil {
                logr.Errorf("[sql.InsertSuiteResult] Failed to delete old results for suite with key=%s: %s", su.Key(), err.Error())
            }
        }
    }
    if err = tx.Commit(); err != nil {
        return err
    }
    return nil
}

// DeleteAllSuiteStatusesNotInKeys removes all suite statuses that are not within the keys provided
func (s *Store) DeleteAllSuiteStatusesNotInKeys(keys []string) int {
    logr.Debugf("[sql.DeleteAllSuiteStatusesNotInKeys] Called with %d keys", len(keys))
    if len(keys) == 0 {
        // Delete all suites
        logr.Debugf("[sql.DeleteAllSuiteStatusesNotInKeys] No keys provided, deleting all suites")
        result, err := s.db.Exec("DELETE FROM suites")
        if err != nil {
            logr.Errorf("[sql.DeleteAllSuiteStatusesNotInKeys] Failed to delete all suites: %s", err.Error())
            return 0
        }
        rowsAffected, _ := result.RowsAffected()
        return int(rowsAffected)
    }
    args := make([]interface{}, 0, len(keys))
    query := "DELETE FROM suites WHERE suite_key NOT IN ("
    for i := range keys {
        if i > 0 {
            query += ","
        }
        query += fmt.Sprintf("$%d", i+1)
        args = append(args, keys[i])
    }
    query += ")"
    // First, let's see what we're about to delete
    checkQuery := "SELECT suite_key FROM suites WHERE suite_key NOT IN ("
    for i := range keys {
        if i > 0 {
            checkQuery += ","
        }
        checkQuery += fmt.Sprintf("$%d", i+1)
    }
    checkQuery += ")"
    rows, err := s.db.Query(checkQuery, args...)
    if err == nil {
        defer rows.Close()
        var deletedKeys []string
        for rows.Next() {
            var key string
            if err := rows.Scan(&key); err == nil {
                deletedKeys = append(deletedKeys, key)
            }
        }
        if len(deletedKeys) > 0 {
            logr.Infof("[sql.DeleteAllSuiteStatusesNotInKeys] Deleting suites with keys: %v", deletedKeys)
        }
    }
    result, err := s.db.Exec(query, args...)
    if err != nil {
        logr.Errorf("[sql.DeleteAllSuiteStatusesNotInKeys] Failed to delete suites: %s", err.Error())
        return 0
    }
    rowsAffected, _ := result.RowsAffected()
    return int(rowsAffected)
}

// Suite helper methods

// getSuiteID retrieves the suite ID from the database by its key
func (s *Store) getSuiteID(tx *sql.Tx, su *suite.Suite) (int64, error) {
    var id int64
    err := tx.QueryRow("SELECT suite_id FROM suites WHERE suite_key = $1", su.Key()).Scan(&id)
    if err != nil {
        if errors.Is(err, sql.ErrNoRows) {
            return 0, common.ErrSuiteNotFound
        }
        return 0, err
    }
    return id, nil
}

// insertSuite inserts a suite in the store and returns the generated id
func (s *Store) insertSuite(tx *sql.Tx, su *suite.Suite) (int64, error) {
    var id int64
    err := tx.QueryRow(
        "INSERT INTO suites (suite_key, suite_name, suite_group) VALUES ($1, $2, $3) RETURNING suite_id",
        su.Key(),
        su.Name,
        su.Group,
    ).Scan(&id)
    if err != nil {
        return 0, err
    }
    return id, nil
}

// getSuiteResults retrieves paginated suite results
func (s *Store) getSuiteResults(tx *sql.Tx, suiteID int64, page, pageSize int) ([]*suite.Result, error) {
    rows, err := tx.Query(`
        SELECT suite_result_id, success, errors, duration, timestamp
        FROM suite_results
        WHERE suite_id = $1
        ORDER BY suite_result_id DESC
        LIMIT $2 OFFSET $3
    `,
        suiteID,
        pageSize,
        (page-1)*pageSize,
    )
    if err != nil {
        logr.Errorf("[sql.getSuiteResults] Query failed: %v", err)
        return nil, err
    }
    defer rows.Close()
    type suiteResultData struct {
        result *suite.Result
        id     int64
    }
    var resultsData []suiteResultData
    for rows.Next() {
        result := &suite.Result{
            EndpointResults: []*endpoint.Result{},
        }
        var suiteResultID int64
        var joinedErrors string
        var nanoseconds int64
        err = rows.Scan(&suiteResultID, &result.Success, &joinedErrors, &nanoseconds, &result.Timestamp)
        if err != nil {
            logr.Errorf("[sql.getSuiteResults] Failed to scan suite result: %s", err.Error())
            continue
        }
        result.Duration = time.Duration(nanoseconds)
        if len(joinedErrors) > 0 {
            result.Errors = strings.Split(joinedErrors, arraySeparator)
        }
        // Store both result and ID together
        resultsData = append(resultsData, suiteResultData{
            result: result,
            id:     suiteResultID,
        })
    }

    // Reverse the results to get chronological order (oldest to newest)
    for i := len(resultsData)/2 - 1; i >= 0; i-- {
        opp := len(resultsData) - 1 - i
        resultsData[i], resultsData[opp] = resultsData[opp], resultsData[i]
    }
    // Fetch endpoint results for each suite result
    for _, data := range resultsData {
        result := data.result
        resultID := data.id
        // Query endpoint results for this suite result
        epRows, err := tx.Query(`
            SELECT
                e.endpoint_name,
                er.success,
                er.errors,
                er.duration,
                er.timestamp
            FROM endpoint_results er
            JOIN endpoints e ON er.endpoint_id = e.endpoint_id
            WHERE er.suite_result_id = $1
            ORDER BY er.endpoint_result_id
        `, resultID)
        if err != nil {
            logr.Errorf("[sql.getSuiteResults] Failed to get endpoint results for suite_result_id=%d: %s", resultID, err.Error())
            continue
        }
        epCount := 0
        for epRows.Next() {
            epCount++
            var name string
            var success bool
            var joinedErrors string
            var duration int64
            var timestamp time.Time
            err = epRows.Scan(&name, &success, &joinedErrors, &duration, &timestamp)
            if err != nil {
                logr.Errorf("[sql.getSuiteResults] Failed to scan endpoint result: %s", err.Error())
                continue
            }
            epResult := &endpoint.Result{
                Name:      name,
                Success:   success,
                Duration:  time.Duration(duration),
                Timestamp: timestamp,
            }
            if len(joinedErrors) > 0 {
                epResult.Errors = strings.Split(joinedErrors, arraySeparator)
            }
            result.EndpointResults = append(result.EndpointResults, epResult)
        }
        epRows.Close()
        if epCount > 0 {
            logr.Debugf("[sql.getSuiteResults] Found %d endpoint results for suite_result_id=%d", epCount, resultID)
        }
    }
    // Extract just the results for return
    var results []*suite.Result
    for _, data := range resultsData {
        results = append(results, data.result)
    }
    return results, nil
}

// getNumberOfSuiteResultsByID gets the count of results for a suite
func (s *Store) getNumberOfSuiteResultsByID(tx *sql.Tx, suiteID int64) (int64, error) {
    var count int64
    err := tx.QueryRow("SELECT COUNT(1) FROM suite_results WHERE suite_id = $1", suiteID).Scan(&count)
    return count, err
}

// deleteOldSuiteResults deletes old suite results beyond the maximum
func (s *Store) deleteOldSuiteResults(tx *sql.Tx, suiteID int64) error {
    _, err := tx.Exec(`
        DELETE FROM suite_results
        WHERE suite_id = $1
          AND suite_result_id NOT IN (
            SELECT suite_result_id
            FROM suite_results
            WHERE suite_id = $1
            ORDER BY suite_result_id DESC
            LIMIT $2
        )
    `,
        suiteID,
        s.maximumNumberOfResults,
    )
    return err
}