Files
spyda/internal/bitcask_store.go
2021-02-01 23:46:26 +10:00

361 lines
7.5 KiB
Go

package internal
import (
"fmt"
"strings"
"github.com/prologic/bitcask"
log "github.com/sirupsen/logrus"
"git.mills.io/prologic/spyda/internal/session"
)
const (
feedsKeyPrefix = "/feeds"
sessionsKeyPrefix = "/sessions"
usersKeyPrefix = "/users"
urlsKeyPrefix = "/urls"
tokensKeyPrefix = "/tokens"
)
// BitcaskStore ...
type BitcaskStore struct {
db *bitcask.Bitcask
}
func newBitcaskStore(path string) (*BitcaskStore, error) {
db, err := bitcask.Open(
path,
bitcask.WithMaxKeySize(256),
)
if err != nil {
return nil, err
}
return &BitcaskStore{db: db}, nil
}
// Sync ...
func (bs *BitcaskStore) Sync() error {
return bs.db.Sync()
}
// Close ...
func (bs *BitcaskStore) Close() error {
log.Info("syncing store ...")
if err := bs.db.Sync(); err != nil {
log.WithError(err).Error("error syncing store")
return err
}
log.Info("closing store ...")
if err := bs.db.Close(); err != nil {
log.WithError(err).Error("error closing store")
return err
}
return nil
}
// Merge ...
func (bs *BitcaskStore) Merge() error {
log.Info("merging store ...")
if err := bs.db.Merge(); err != nil {
log.WithError(err).Error("error merging store")
return err
}
return nil
}
func (bs *BitcaskStore) HasUser(username string) bool {
key := []byte(fmt.Sprintf("%s/%s", usersKeyPrefix, username))
return bs.db.Has(key)
}
func (bs *BitcaskStore) DelUser(username string) error {
key := []byte(fmt.Sprintf("%s/%s", usersKeyPrefix, username))
return bs.db.Delete(key)
}
func (bs *BitcaskStore) GetUser(username string) (*User, error) {
key := []byte(fmt.Sprintf("%s/%s", usersKeyPrefix, username))
data, err := bs.db.Get(key)
if err == bitcask.ErrKeyNotFound {
return nil, ErrUserNotFound
}
return LoadUser(data)
}
func (bs *BitcaskStore) SetUser(username string, user *User) error {
data, err := user.Bytes()
if err != nil {
return err
}
key := []byte(fmt.Sprintf("%s/%s", usersKeyPrefix, username))
if err := bs.db.Put(key, data); err != nil {
return err
}
return nil
}
func (bs *BitcaskStore) LenUsers() int64 {
var count int64
if err := bs.db.Scan([]byte(usersKeyPrefix), func(_ []byte) error {
count++
return nil
}); err != nil {
log.WithError(err).Error("error scanning")
}
return count
}
func (bs *BitcaskStore) SearchUsers(prefix string) []string {
var keys []string
if err := bs.db.Scan([]byte(usersKeyPrefix), func(key []byte) error {
if strings.HasPrefix(strings.ToLower(string(key)), prefix) {
keys = append(keys, strings.TrimPrefix(string(key), "/users/"))
}
return nil
}); err != nil {
log.WithError(err).Error("error scanning")
}
return keys
}
func (bs *BitcaskStore) GetAllUsers() ([]*User, error) {
var users []*User
err := bs.db.Scan([]byte(usersKeyPrefix), func(key []byte) error {
data, err := bs.db.Get(key)
if err != nil {
return err
}
user, err := LoadUser(data)
if err != nil {
return err
}
users = append(users, user)
return nil
})
if err != nil {
return nil, err
}
return users, nil
}
func (bs *BitcaskStore) HasURL(hash string) bool {
key := []byte(fmt.Sprintf("%s/%s", urlsKeyPrefix, hash))
return bs.db.Has(key)
}
func (bs *BitcaskStore) DelURL(hash string) error {
key := []byte(fmt.Sprintf("%s/%s", urlsKeyPrefix, hash))
return bs.db.Delete(key)
}
func (bs *BitcaskStore) GetURL(hash string) (*URL, error) {
key := []byte(fmt.Sprintf("%s/%s", urlsKeyPrefix, hash))
data, err := bs.db.Get(key)
if err == bitcask.ErrKeyNotFound {
return nil, ErrURLNotFound
}
return LoadURL(data)
}
func (bs *BitcaskStore) SetURL(hash string, url *URL) error {
data, err := url.Bytes()
if err != nil {
return err
}
key := []byte(fmt.Sprintf("%s/%s", urlsKeyPrefix, hash))
if err := bs.db.Put(key, data); err != nil {
return err
}
return nil
}
func (bs *BitcaskStore) URLCount() int64 {
var count int64
if err := bs.db.Scan([]byte(urlsKeyPrefix), func(_ []byte) error {
count++
return nil
}); err != nil {
log.WithError(err).Error("error scanning")
}
return count
}
func (bs *BitcaskStore) ForEachURL(f func(url *URL) error) error {
err := bs.db.Scan([]byte(urlsKeyPrefix), func(key []byte) error {
data, err := bs.db.Get(key)
if err != nil {
return err
}
url, err := LoadURL(data)
if err != nil {
return err
}
return f(url)
})
if err != nil {
return err
}
return nil
}
func (bs *BitcaskStore) GetSession(sid string) (*session.Session, error) {
key := []byte(fmt.Sprintf("%s/%s", sessionsKeyPrefix, sid))
data, err := bs.db.Get(key)
if err != nil {
if err == bitcask.ErrKeyNotFound {
return nil, session.ErrSessionNotFound
}
return nil, err
}
sess := session.NewSession(bs)
if err := session.LoadSession(data, sess); err != nil {
return nil, err
}
return sess, nil
}
func (bs *BitcaskStore) SetSession(sid string, sess *session.Session) error {
key := []byte(fmt.Sprintf("%s/%s", sessionsKeyPrefix, sid))
data, err := sess.Bytes()
if err != nil {
return err
}
return bs.db.Put(key, data)
}
func (bs *BitcaskStore) HasSession(sid string) bool {
key := []byte(fmt.Sprintf("%s/%s", sessionsKeyPrefix, sid))
return bs.db.Has(key)
}
func (bs *BitcaskStore) DelSession(sid string) error {
key := []byte(fmt.Sprintf("%s/%s", sessionsKeyPrefix, sid))
return bs.db.Delete(key)
}
func (bs *BitcaskStore) SyncSession(sess *session.Session) error {
// Only persist sessions with a logged in user associated with an account
// This saves resources as we don't need to keep session keys around for
// sessions we may never load from the store again.
if sess.Has("username") {
return bs.SetSession(sess.ID, sess)
}
return nil
}
func (bs *BitcaskStore) LenSessions() int64 {
var count int64
if err := bs.db.Scan([]byte(sessionsKeyPrefix), func(_ []byte) error {
count++
return nil
}); err != nil {
log.WithError(err).Error("error scanning")
}
return count
}
func (bs *BitcaskStore) GetAllSessions() ([]*session.Session, error) {
var sessions []*session.Session
err := bs.db.Scan([]byte(sessionsKeyPrefix), func(key []byte) error {
data, err := bs.db.Get(key)
if err != nil {
return err
}
sess := session.NewSession(bs)
if err := session.LoadSession(data, sess); err != nil {
return err
}
sessions = append(sessions, sess)
return nil
})
if err != nil {
return nil, err
}
return sessions, nil
}
func (bs *BitcaskStore) GetUserTokens(user *User) ([]*Token, error) {
tokens := []*Token{}
for _, signature := range user.Tokens {
tkn, err := bs.GetToken(signature)
if err != nil {
return tokens, err
}
tokens = append(tokens, tkn)
}
return tokens, nil
}
func (bs *BitcaskStore) GetToken(signature string) (*Token, error) {
key := []byte(fmt.Sprintf("%s/%s", tokensKeyPrefix, signature))
data, err := bs.db.Get(key)
if err == bitcask.ErrKeyNotFound {
return nil, ErrTokenNotFound
}
tkn, err := LoadToken(data)
if err != nil {
return nil, err
}
return tkn, nil
}
func (bs *BitcaskStore) SetToken(signature string, tkn *Token) error {
data, err := tkn.Bytes()
if err != nil {
return err
}
key := []byte(fmt.Sprintf("%s/%s", tokensKeyPrefix, signature))
if err := bs.db.Put(key, data); err != nil {
return err
}
return nil
}
func (bs *BitcaskStore) DelToken(signature string) error {
key := []byte(fmt.Sprintf("%s/%s", tokensKeyPrefix, signature))
return bs.db.Delete(key)
}
func (bs *BitcaskStore) LenTokens() int64 {
var count int64
if err := bs.db.Scan([]byte(tokensKeyPrefix), func(_ []byte) error {
count++
return nil
}); err != nil {
log.WithError(err).Error("error scanning")
}
return count
}