package internal
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"git.mills.io/prologic/observe"
"github.com/NYTimes/gziphandler"
"github.com/gabstv/merger"
"github.com/justinas/nosurf"
"github.com/robfig/cron"
log "github.com/sirupsen/logrus"
"github.com/unrolled/logger"
"git.mills.io/prologic/spyda"
"git.mills.io/prologic/spyda/internal/auth"
"git.mills.io/prologic/spyda/internal/passwords"
"git.mills.io/prologic/spyda/internal/session"
"git.mills.io/prologic/spyda/internal/static"
)
var (
metrics *observe.Metrics
)
func init() {
metrics = observe.NewMetrics("spyda")
}
// Server is the spyda application server. It ties together the HTTP router,
// template manager, data store, indexer, crawler, cron scheduler, task
// dispatcher and the auth, session and password managers.
type Server struct {
bind string
config *Config
tmplman *TemplateManager
router *Router
server *http.Server
// Indexer
indexer Indexer
// Crawler
crawler Crawler
// Data Store
db Store
// Scheduler
cron *cron.Cron
// Dispatcher
tasks *Dispatcher
// Auth
am *auth.Manager
// Sessions
sc *SessionStore
sm *session.Manager
// API
api *API
// Passwords
pm passwords.Passwords
}
func (s *Server) render(name string, w http.ResponseWriter, ctx *Context) {
buf, err := s.tmplman.Exec(name, ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = buf.WriteTo(w)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
// AddRoute registers an additional HTTP handler on the server's router for
// the given method and path.
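// A minimal usage sketch (the "/ping" path and handler below are
// hypothetical, not part of spyda):
//
//	s.AddRoute("GET", "/ping", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//		fmt.Fprintln(w, "pong")
//	}))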
func (s *Server) AddRoute(method, path string, handler http.Handler) {
s.router.Handler(method, path, handler)
}
// AddShutdownHook registers a function to be called when the underlying
// http.Server begins shutting down.
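// A minimal usage sketch (the hook body is illustrative only):
//
//	s.AddShutdownHook(func() {
//		log.Info("server is shutting down, flushing state")
//	})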
func (s *Server) AddShutdownHook(f func()) {
s.server.RegisterOnShutdown(f)
}
// Shutdown gracefully stops the cron scheduler, task dispatcher and crawler,
// shuts down the HTTP server and closes the data store.
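// A minimal usage sketch, assuming the caller wants to bound shutdown time
// (Run itself calls Shutdown with context.Background()):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//	if err := s.Shutdown(ctx); err != nil {
//		log.WithError(err).Error("graceful shutdown failed")
//	}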
func (s *Server) Shutdown(ctx context.Context) error {
s.cron.Stop()
s.tasks.Stop()
s.crawler.Stop()
if err := s.server.Shutdown(ctx); err != nil {
log.WithError(err).Error("error shutting down server")
return err
}
if err := s.db.Close(); err != nil {
log.WithError(err).Error("error closing store")
return err
}
return nil
}
// Run starts the HTTP server and blocks until SIGINT or SIGTERM is received,
// then performs a graceful shutdown.
func (s *Server) Run() (err error) {
idleConnsClosed := make(chan struct{})
go func() {
if err = s.ListenAndServe(); err != http.ErrServerClosed {
// Error starting or closing listener:
log.WithError(err).Fatal("HTTP server ListenAndServe")
}
}()
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigch
log.Infof("Received signal %s", sig)
log.Info("Shutting down...")
// We received an interrupt signal, shut down.
if err = s.Shutdown(context.Background()); err != nil {
// Error from closing listeners, or context timeout:
log.WithError(err).Fatal("Error shutting down HTTP server")
}
close(idleConnsClosed)
<-idleConnsClosed
return
}
// ListenAndServe starts the underlying http.Server listening on the
// configured bind address.
func (s *Server) ListenAndServe() error {
return s.server.ListenAndServe()
}
// AddCronJob adds a job to the server's cron scheduler using the given
// cron spec.
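// A minimal usage sketch; cleanupJob is a hypothetical cron.Job
// implementation and "@hourly" is a standard cron descriptor:
//
//	if err := s.AddCronJob("@hourly", cleanupJob); err != nil {
//		log.WithError(err).Error("error scheduling cleanup job")
//	}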
func (s *Server) AddCronJob(spec string, job cron.Job) error {
return s.cron.AddJob(spec, job)
}
func (s *Server) setupMetrics() {
ctime := time.Now()
// server uptime counter
metrics.NewCounterFunc(
"server", "uptime",
"Number of nanoseconds the server has been running",
func() float64 {
return float64(time.Since(ctime).Nanoseconds())
},
)
// sessions
metrics.NewGaugeFunc(
"server", "sessions",
"Number of active in-memory sessions (non-persistent)",
func() float64 {
return float64(s.sc.Count())
},
)
// database keys
metrics.NewGaugeFunc(
"db", "sessions",
"Number of database /sessions keys",
func() float64 {
return float64(s.db.LenSessions())
},
)
metrics.NewGaugeFunc(
"db", "users",
"Number of database /users keys",
func() float64 {
return float64(s.db.LenUsers())
},
)
metrics.NewGaugeFunc(
"db", "tokens",
"Number of database /tokens keys",
func() float64 {
return float64(s.db.LenTokens())
},
)
metrics.NewGaugeFunc(
"db", "urls",
"Number of database /urls keys",
func() float64 {
return float64(s.db.URLCount())
},
)
// Crawler stats
metrics.NewCounter(
"crawler", "crawled",
"Number of links crawled by the crawler",
)
metrics.NewCounter(
"crawler", "scraped",
"Number of links scraped by the crawler",
)
metrics.NewGauge(
"crawler", "duration",
"Duration of crawler tasks",
)
// Index stats
metrics.NewGaugeFunc(
"index", "size",
"Size of index (number of indexed entries)",
func() float64 {
return float64(s.indexer.Size())
},
)
// server info
metrics.NewGaugeVec(
"server", "info",
"Server information",
[]string{"full_version", "version", "commit"},
)
metrics.GaugeVec("server", "info").
With(map[string]string{
"full_version": spyda.FullVersion(),
"version": spyda.Version,
"commit": spyda.Commit,
}).Set(1)
metrics.NewCounter(
"server", "queries",
"Number of queries processed",
)
s.AddRoute("GET", "/metrics", metrics.Handler())
}
func (s *Server) setupCronJobs() error {
for name, jobSpec := range Jobs {
if jobSpec.Schedule == "" {
continue
}
job := jobSpec.Factory(s.config, s.db)
if err := s.cron.AddJob(jobSpec.Schedule, job); err != nil {
return err
}
log.Infof("Started background job %s (%s)", name, jobSpec.Schedule)
}
return nil
}
func (s *Server) runStartupJobs() {
time.Sleep(time.Second * 5)
log.Info("running startup jobs")
for name, jobSpec := range StartupJobs {
job := jobSpec.Factory(s.config, s.db)
log.Infof("running %s now...", name)
job.Run()
}
// Merge store
if err := s.db.Merge(); err != nil {
log.WithError(err).Error("error merging store")
}
}
func (s *Server) initRoutes() {
if s.config.Debug {
s.router.ServeFiles("/css/*filepath", http.Dir("./internal/static/css"))
s.router.ServeFiles("/img/*filepath", http.Dir("./internal/static/img"))
s.router.ServeFiles("/js/*filepath", http.Dir("./internal/static/js"))
} else {
cssFS := static.GetSubFilesystem("css")
imgFS := static.GetSubFilesystem("img")
jsFS := static.GetSubFilesystem("js")
s.router.ServeFilesWithCacheControl("/css/:commit/*filepath", cssFS)
s.router.ServeFilesWithCacheControl("/img/:commit/*filepath", imgFS)
s.router.ServeFilesWithCacheControl("/js/:commit/*filepath", jsFS)
}
s.router.NotFound = http.HandlerFunc(s.NotFoundHandler)
s.router.GET("/about", s.PageHandler("about"))
s.router.GET("/help", s.PageHandler("help"))
s.router.GET("/privacy", s.PageHandler("privacy"))
s.router.GET("/", s.IndexHandler())
s.router.HEAD("/", s.IndexHandler())
s.router.GET("/opensearch.xml", s.OpenSearchHandler())
s.router.GET("/robots.txt", s.RobotsHandler())
s.router.HEAD("/robots.txt", s.RobotsHandler())
s.router.GET("/search", s.SearchHandler())
s.router.GET("/cache/:hash", s.CacheHandler())
s.router.HEAD("/cache/:hash", s.CacheHandler())
s.router.GET("/login", s.am.HasAuth(s.LoginHandler()))
s.router.POST("/login", s.LoginHandler())
s.router.GET("/logout", s.LogoutHandler())
s.router.POST("/logout", s.LogoutHandler())
// Reset Password
s.router.GET("/pwreset", s.ResetPasswordHandler())
s.router.POST("/pwreset", s.ResetPasswordHandler())
s.router.GET("/chpasswd", s.ResetPasswordMagicLinkHandler())
s.router.POST("/chpasswd", s.NewPasswordHandler())
// Task State
s.router.GET("/tasks", s.TasksHandler())
s.router.GET("/task/:uuid", s.TaskHandler())
s.router.GET("/add", s.AddHandler())
s.router.POST("/add", s.AddHandler())
s.router.GET("/manage", s.ManageHandler())
s.router.POST("/manage", s.ManageHandler())
s.router.GET("/manage/users", s.ManageUsersHandler())
s.router.POST("/manage/adduser", s.AddUserHandler())
s.router.POST("/manage/deluser", s.DelUserHandler())
// Support
s.router.GET("/support", s.SupportHandler())
s.router.POST("/support", s.SupportHandler())
s.router.GET("/_captcha", s.CaptchaHandler())
}
// NewServer creates a new Server bound to bind, applying any options and
// on-disk settings, opening the data store, starting background jobs, the
// task dispatcher and the crawler, and registering all routes.
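// A minimal usage sketch with no options (option values depend on the
// deployment and are omitted here):
//
//	svr, err := NewServer(":8000")
//	if err != nil {
//		log.WithError(err).Fatal("error creating server")
//	}
//	if err := svr.Run(); err != nil {
//		log.WithError(err).Fatal("error running server")
//	}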
func NewServer(bind string, options ...Option) (*Server, error) {
config := NewConfig()
for _, opt := range options {
if err := opt(config); err != nil {
return nil, err
}
}
settings, err := LoadSettings(filepath.Join(config.Data, "settings.yaml"))
if err != nil {
log.Warnf("error loading pod settings: %s", err)
} else {
if err := merger.MergeOverwrite(config, settings); err != nil {
log.WithError(err).Error("error merging pod settings")
return nil, err
}
}
if err := config.Validate(); err != nil {
log.WithError(err).Error("error validating config")
return nil, fmt.Errorf("error validating config: %w", err)
}
db, err := NewStore(config.Store)
if err != nil {
log.WithError(err).Error("error creating store")
return nil, err
}
if err := db.Merge(); err != nil {
log.WithError(err).Error("error merging store")
return nil, err
}
tmplman, err := NewTemplateManager(config)
if err != nil {
log.WithError(err).Error("error creating template manager")
return nil, err
}
router := NewRouter()
am := auth.NewManager(auth.NewOptions("/login", "/register"))
tasks := NewDispatcher(2, 10) // TODO: Make this configurable?
pm := passwords.NewScryptPasswords(nil)
sc := NewSessionStore(db, config.SessionCacheTTL)
sm := session.NewManager(
session.NewOptions(
config.Name,
config.CookieSecret,
config.LocalURL().Scheme == "https",
config.SessionExpiry,
),
sc,
)
indexer, err := NewIndexer(config)
if err != nil {
log.WithError(err).Error("error creating indexer")
return nil, err
}
crawler, err := NewCrawler(config, tasks, db, indexer)
if err != nil {
log.WithError(err).Error("error creating crawler")
return nil, err
}
api := NewAPI(router, config, db, pm)
csrfHandler := nosurf.New(router)
csrfHandler.ExemptGlob("/api/v1/*")
server := &Server{
bind: bind,
config: config,
router: router,
tmplman: tmplman,
server: &http.Server{
Addr: bind,
Handler: logger.New(logger.Options{
Prefix: "spyda",
RemoteAddressHeaders: []string{"X-Forwarded-For"},
}).Handler(
gziphandler.GzipHandler(
sm.Handler(csrfHandler),
),
),
},
// API
api: api,
// Indexer
indexer: indexer,
// Crawler
crawler: crawler,
// Data Store
db: db,
// Scheduler
cron: cron.New(),
// Dispatcher
tasks: tasks,
// Auth Manager
am: am,
// Session Manager
sc: sc,
sm: sm,
// Password Manager
pm: pm,
}
if err := server.setupCronJobs(); err != nil {
log.WithError(err).Error("error setting up background jobs")
return nil, err
}
server.cron.Start()
log.Info("started background jobs")
server.tasks.Start()
log.Info("started task dispatcher")
server.crawler.Start()
log.Infof("started crawler")
server.setupMetrics()
log.Infof("serving metrics endpoint at %s/metrics", server.config.BaseURL)
// Log interesting configuration options
log.Infof("Instance Name: %s", server.config.Name)
log.Infof("Base URL: %s", server.config.BaseURL)
log.Infof("Admin User: %s", server.config.AdminUser)
log.Infof("Admin Name: %s", server.config.AdminName)
log.Infof("Admin Email: %s", server.config.AdminEmail)
log.Infof("SMTP Host: %s", server.config.SMTPHost)
log.Infof("SMTP Port: %d", server.config.SMTPPort)
log.Infof("SMTP User: %s", server.config.SMTPUser)
log.Infof("SMTP From: %s", server.config.SMTPFrom)
log.Infof("API Session Time: %s", server.config.APISessionTime)
server.initRoutes()
api.initRoutes()
go server.runStartupJobs()
return server, nil
}