Persist, renew and delete sessions, refactor storage package, move reusable packages to pkg
This commit is contained in:
parent
121582f72a
commit
24f9553aa5
48 changed files with 1872 additions and 1171 deletions
79
storage/bleve/bleve.go
Normal file
79
storage/bleve/bleve.go
Normal file
|
@ -0,0 +1,79 @@
|
|||
package bleve
|
||||
|
||||
import (
|
||||
"github.com/blevesearch/bleve"
|
||||
"github.com/blevesearch/bleve/analysis/analyzer/keyword"
|
||||
|
||||
"github.com/khlieng/dispatch/storage"
|
||||
)
|
||||
|
||||
// Bleve implements storage.MessageSearchProvider
|
||||
type Bleve struct {
|
||||
index bleve.Index
|
||||
}
|
||||
|
||||
func New(path string) (*Bleve, error) {
|
||||
index, err := bleve.Open(path)
|
||||
if err == bleve.ErrorIndexPathDoesNotExist {
|
||||
keywordMapping := bleve.NewTextFieldMapping()
|
||||
keywordMapping.Analyzer = keyword.Name
|
||||
keywordMapping.Store = false
|
||||
keywordMapping.IncludeTermVectors = false
|
||||
keywordMapping.IncludeInAll = false
|
||||
|
||||
contentMapping := bleve.NewTextFieldMapping()
|
||||
contentMapping.Analyzer = "en"
|
||||
contentMapping.Store = false
|
||||
contentMapping.IncludeTermVectors = false
|
||||
contentMapping.IncludeInAll = false
|
||||
|
||||
messageMapping := bleve.NewDocumentMapping()
|
||||
messageMapping.StructTagKey = "bleve"
|
||||
messageMapping.AddFieldMappingsAt("server", keywordMapping)
|
||||
messageMapping.AddFieldMappingsAt("to", keywordMapping)
|
||||
messageMapping.AddFieldMappingsAt("content", contentMapping)
|
||||
|
||||
mapping := bleve.NewIndexMapping()
|
||||
mapping.AddDocumentMapping("message", messageMapping)
|
||||
|
||||
index, err = bleve.New(path, mapping)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Bleve{index: index}, nil
|
||||
}
|
||||
|
||||
func (b *Bleve) Index(id string, message *storage.Message) error {
|
||||
return b.index.Index(id, message)
|
||||
}
|
||||
|
||||
func (b *Bleve) SearchMessages(server, channel, q string) ([]string, error) {
|
||||
serverQuery := bleve.NewMatchQuery(server)
|
||||
serverQuery.SetField("server")
|
||||
channelQuery := bleve.NewMatchQuery(channel)
|
||||
channelQuery.SetField("to")
|
||||
contentQuery := bleve.NewMatchQuery(q)
|
||||
contentQuery.SetField("content")
|
||||
contentQuery.SetFuzziness(2)
|
||||
|
||||
query := bleve.NewBooleanQuery()
|
||||
query.AddMust(serverQuery, channelQuery, contentQuery)
|
||||
|
||||
search := bleve.NewSearchRequest(query)
|
||||
searchResults, err := b.index.Search(search)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ids := make([]string, len(searchResults.Hits))
|
||||
for i, hit := range searchResults.Hits {
|
||||
ids[i] = hit.ID
|
||||
}
|
||||
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (b *Bleve) Close() {
|
||||
b.index.Close()
|
||||
}
|
364
storage/boltdb/boltdb.go
Normal file
364
storage/boltdb/boltdb.go
Normal file
|
@ -0,0 +1,364 @@
|
|||
package boltdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"strconv"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
|
||||
"github.com/khlieng/dispatch/pkg/session"
|
||||
"github.com/khlieng/dispatch/storage"
|
||||
)
|
||||
|
||||
var (
|
||||
bucketUsers = []byte("Users")
|
||||
bucketServers = []byte("Servers")
|
||||
bucketChannels = []byte("Channels")
|
||||
bucketMessages = []byte("Messages")
|
||||
bucketSessions = []byte("Sessions")
|
||||
)
|
||||
|
||||
// BoltStore implements storage.Store, storage.MessageStore and storage.SessionStore
|
||||
type BoltStore struct {
|
||||
db *bolt.DB
|
||||
}
|
||||
|
||||
func New(path string) (*BoltStore, error) {
|
||||
db, err := bolt.Open(path, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db.Update(func(tx *bolt.Tx) error {
|
||||
tx.CreateBucketIfNotExists(bucketUsers)
|
||||
tx.CreateBucketIfNotExists(bucketServers)
|
||||
tx.CreateBucketIfNotExists(bucketChannels)
|
||||
tx.CreateBucketIfNotExists(bucketMessages)
|
||||
tx.CreateBucketIfNotExists(bucketSessions)
|
||||
return nil
|
||||
})
|
||||
|
||||
return &BoltStore{
|
||||
db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *BoltStore) Close() {
|
||||
s.db.Close()
|
||||
}
|
||||
|
||||
func (s *BoltStore) GetUsers() ([]storage.User, error) {
|
||||
var users []storage.User
|
||||
|
||||
s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketUsers)
|
||||
|
||||
return b.ForEach(func(k, _ []byte) error {
|
||||
id := idFromBytes(k)
|
||||
user := storage.User{
|
||||
ID: id,
|
||||
IDBytes: make([]byte, 8),
|
||||
Username: strconv.FormatUint(id, 10),
|
||||
}
|
||||
copy(user.IDBytes, k)
|
||||
|
||||
users = append(users, user)
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
return users, nil
|
||||
}
|
||||
|
||||
func (s *BoltStore) SaveUser(user *storage.User) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketUsers)
|
||||
|
||||
user.ID, _ = b.NextSequence()
|
||||
user.Username = strconv.FormatUint(user.ID, 10)
|
||||
|
||||
data, err := user.Marshal(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
user.IDBytes = idToBytes(user.ID)
|
||||
return b.Put(user.IDBytes, data)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltStore) DeleteUser(user *storage.User) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketServers)
|
||||
c := b.Cursor()
|
||||
|
||||
for k, _ := c.Seek(user.IDBytes); bytes.HasPrefix(k, user.IDBytes); k, _ = c.Next() {
|
||||
b.Delete(k)
|
||||
}
|
||||
|
||||
b = tx.Bucket(bucketChannels)
|
||||
c = b.Cursor()
|
||||
|
||||
for k, _ := c.Seek(user.IDBytes); bytes.HasPrefix(k, user.IDBytes); k, _ = c.Next() {
|
||||
b.Delete(k)
|
||||
}
|
||||
|
||||
return tx.Bucket(bucketUsers).Delete(user.IDBytes)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltStore) GetServers(user *storage.User) ([]storage.Server, error) {
|
||||
var servers []storage.Server
|
||||
|
||||
s.db.View(func(tx *bolt.Tx) error {
|
||||
c := tx.Bucket(bucketServers).Cursor()
|
||||
|
||||
for k, v := c.Seek(user.IDBytes); bytes.HasPrefix(k, user.IDBytes); k, v = c.Next() {
|
||||
server := storage.Server{}
|
||||
server.Unmarshal(v)
|
||||
servers = append(servers, server)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return servers, nil
|
||||
}
|
||||
|
||||
func (s *BoltStore) AddServer(user *storage.User, server *storage.Server) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketServers)
|
||||
data, _ := server.Marshal(nil)
|
||||
|
||||
return b.Put(serverID(user, server.Host), data)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltStore) RemoveServer(user *storage.User, address string) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
serverID := serverID(user, address)
|
||||
tx.Bucket(bucketServers).Delete(serverID)
|
||||
|
||||
b := tx.Bucket(bucketChannels)
|
||||
c := b.Cursor()
|
||||
|
||||
for k, _ := c.Seek(serverID); bytes.HasPrefix(k, serverID); k, _ = c.Next() {
|
||||
b.Delete(k)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltStore) SetNick(user *storage.User, nick, address string) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketServers)
|
||||
id := serverID(user, address)
|
||||
|
||||
server := storage.Server{}
|
||||
v := b.Get(id)
|
||||
if v != nil {
|
||||
server.Unmarshal(v)
|
||||
server.Nick = nick
|
||||
|
||||
data, _ := server.Marshal(nil)
|
||||
return b.Put(id, data)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltStore) SetServerName(user *storage.User, name, address string) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketServers)
|
||||
id := serverID(user, address)
|
||||
|
||||
server := storage.Server{}
|
||||
v := b.Get(id)
|
||||
if v != nil {
|
||||
server.Unmarshal(v)
|
||||
server.Name = name
|
||||
|
||||
data, _ := server.Marshal(nil)
|
||||
return b.Put(id, data)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltStore) GetChannels(user *storage.User) ([]storage.Channel, error) {
|
||||
var channels []storage.Channel
|
||||
|
||||
s.db.View(func(tx *bolt.Tx) error {
|
||||
c := tx.Bucket(bucketChannels).Cursor()
|
||||
|
||||
for k, v := c.Seek(user.IDBytes); bytes.HasPrefix(k, user.IDBytes); k, v = c.Next() {
|
||||
channel := storage.Channel{}
|
||||
channel.Unmarshal(v)
|
||||
channels = append(channels, channel)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
func (s *BoltStore) AddChannel(user *storage.User, channel *storage.Channel) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketChannels)
|
||||
data, _ := channel.Marshal(nil)
|
||||
|
||||
return b.Put(channelID(user, channel.Server, channel.Name), data)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltStore) RemoveChannel(user *storage.User, server, channel string) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketChannels)
|
||||
id := channelID(user, server, channel)
|
||||
|
||||
return b.Delete(id)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltStore) LogMessage(message *storage.Message) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b, err := tx.Bucket(bucketMessages).CreateBucketIfNotExists([]byte(message.Server + ":" + message.To))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := message.Marshal(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.Put([]byte(message.ID), data)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltStore) GetMessages(server, channel string, count int, fromID string) ([]storage.Message, bool, error) {
|
||||
messages := make([]storage.Message, count)
|
||||
hasMore := false
|
||||
|
||||
s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel))
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c := b.Cursor()
|
||||
|
||||
if fromID != "" {
|
||||
c.Seek([]byte(fromID))
|
||||
|
||||
for k, v := c.Prev(); count > 0 && k != nil; k, v = c.Prev() {
|
||||
count--
|
||||
messages[count].Unmarshal(v)
|
||||
}
|
||||
} else {
|
||||
for k, v := c.Last(); count > 0 && k != nil; k, v = c.Prev() {
|
||||
count--
|
||||
messages[count].Unmarshal(v)
|
||||
}
|
||||
}
|
||||
|
||||
c.Next()
|
||||
k, _ := c.Prev()
|
||||
hasMore = k != nil
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if count == 0 {
|
||||
return messages, hasMore, nil
|
||||
} else if count < len(messages) {
|
||||
return messages[count:], hasMore, nil
|
||||
}
|
||||
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func (s *BoltStore) GetMessagesByID(server, channel string, ids []string) ([]storage.Message, error) {
|
||||
messages := make([]storage.Message, len(ids))
|
||||
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel))
|
||||
|
||||
for i, id := range ids {
|
||||
messages[i].Unmarshal(b.Get([]byte(id)))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return messages, err
|
||||
}
|
||||
|
||||
func (s *BoltStore) GetSessions() ([]session.Session, error) {
|
||||
var sessions []session.Session
|
||||
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketSessions)
|
||||
|
||||
return b.ForEach(func(_ []byte, v []byte) error {
|
||||
session := session.Session{}
|
||||
_, err := session.Unmarshal(v)
|
||||
sessions = append(sessions, session)
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sessions, nil
|
||||
}
|
||||
|
||||
func (s *BoltStore) SaveSession(session *session.Session) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketSessions)
|
||||
|
||||
data, err := session.Marshal(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.Put([]byte(session.Key()), data)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltStore) DeleteSession(key string) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
return tx.Bucket(bucketSessions).Delete([]byte(key))
|
||||
})
|
||||
}
|
||||
|
||||
func serverID(user *storage.User, address string) []byte {
|
||||
id := make([]byte, 8+len(address))
|
||||
copy(id, user.IDBytes)
|
||||
copy(id[8:], address)
|
||||
return id
|
||||
}
|
||||
|
||||
func channelID(user *storage.User, server, channel string) []byte {
|
||||
id := make([]byte, 8+len(server)+1+len(channel))
|
||||
copy(id, user.IDBytes)
|
||||
copy(id[8:], server)
|
||||
copy(id[8+len(server)+1:], channel)
|
||||
return id
|
||||
}
|
||||
|
||||
func idToBytes(i uint64) []byte {
|
||||
b := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(b, i)
|
||||
return b
|
||||
}
|
||||
|
||||
func idFromBytes(b []byte) uint64 {
|
||||
return binary.BigEndian.Uint64(b)
|
||||
}
|
|
@ -1,53 +1,46 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"log"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/khlieng/dispatch/pkg/session"
|
||||
)
|
||||
|
||||
var (
|
||||
Path directory
|
||||
|
||||
db *bolt.DB
|
||||
|
||||
bucketUsers = []byte("Users")
|
||||
bucketServers = []byte("Servers")
|
||||
bucketChannels = []byte("Channels")
|
||||
bucketMessages = []byte("Messages")
|
||||
)
|
||||
var Path directory
|
||||
|
||||
func Initialize(dir string) {
|
||||
Path = directory(dir)
|
||||
}
|
||||
|
||||
func Open() {
|
||||
var err error
|
||||
db, err = bolt.Open(Path.Database(), 0600, nil)
|
||||
if err != nil {
|
||||
log.Fatal("Could not open database:", err)
|
||||
}
|
||||
type Store interface {
|
||||
GetUsers() ([]User, error)
|
||||
SaveUser(*User) error
|
||||
DeleteUser(*User) error
|
||||
|
||||
db.Update(func(tx *bolt.Tx) error {
|
||||
tx.CreateBucketIfNotExists(bucketUsers)
|
||||
tx.CreateBucketIfNotExists(bucketServers)
|
||||
tx.CreateBucketIfNotExists(bucketChannels)
|
||||
GetServers(*User) ([]Server, error)
|
||||
AddServer(*User, *Server) error
|
||||
RemoveServer(*User, string) error
|
||||
SetNick(*User, string, string) error
|
||||
SetServerName(*User, string, string) error
|
||||
|
||||
return nil
|
||||
})
|
||||
GetChannels(*User) ([]Channel, error)
|
||||
AddChannel(*User, *Channel) error
|
||||
RemoveChannel(*User, string, string) error
|
||||
}
|
||||
|
||||
func Close() {
|
||||
db.Close()
|
||||
type SessionStore interface {
|
||||
GetSessions() ([]session.Session, error)
|
||||
SaveSession(session *session.Session) error
|
||||
DeleteSession(key string) error
|
||||
}
|
||||
|
||||
func idToBytes(i uint64) []byte {
|
||||
b := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(b, i)
|
||||
return b
|
||||
type MessageStore interface {
|
||||
LogMessage(message *Message) error
|
||||
GetMessages(server, channel string, count int, fromID string) ([]Message, bool, error)
|
||||
GetMessagesByID(server, channel string, ids []string) ([]Message, error)
|
||||
Close()
|
||||
}
|
||||
|
||||
func idFromBytes(b []byte) uint64 {
|
||||
return binary.BigEndian.Uint64(b)
|
||||
type MessageSearchProvider interface {
|
||||
SearchMessages(server, channel, q string) ([]string, error)
|
||||
Index(id string, message *Message) error
|
||||
Close()
|
||||
}
|
||||
|
|
326
storage/user.go
326
storage/user.go
|
@ -1,67 +1,33 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/blevesearch/bleve"
|
||||
"github.com/boltdb/bolt"
|
||||
"time"
|
||||
)
|
||||
|
||||
type User struct {
|
||||
ID uint64
|
||||
IDBytes []byte
|
||||
Username string
|
||||
|
||||
id []byte
|
||||
messageLog *bolt.DB
|
||||
messageIndex bleve.Index
|
||||
store Store
|
||||
messageLog MessageStore
|
||||
messageIndex MessageSearchProvider
|
||||
certificate *tls.Certificate
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
Name string `json:"name"`
|
||||
Host string `json:"host"`
|
||||
Port string `json:"port"`
|
||||
TLS bool `json:"tls,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
Nick string `json:"nick"`
|
||||
Username string `json:"username,omitempty"`
|
||||
Realname string `json:"realname,omitempty"`
|
||||
}
|
||||
|
||||
type Channel struct {
|
||||
Server string `json:"server"`
|
||||
Name string `json:"name"`
|
||||
Topic string `json:"topic,omitempty"`
|
||||
}
|
||||
|
||||
func NewUser() (*User, error) {
|
||||
user := &User{}
|
||||
|
||||
err := db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketUsers)
|
||||
|
||||
user.ID, _ = b.NextSequence()
|
||||
user.Username = strconv.FormatUint(user.ID, 10)
|
||||
|
||||
data, err := user.Marshal(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
user.id = idToBytes(user.ID)
|
||||
return b.Put(user.id, data)
|
||||
})
|
||||
func NewUser(store Store) (*User, error) {
|
||||
user := &User{store: store}
|
||||
|
||||
err := store.SaveUser(user)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = user.openMessageLog()
|
||||
err = os.MkdirAll(Path.User(user.Username), 0700)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -69,179 +35,131 @@ func NewUser() (*User, error) {
|
|||
return user, nil
|
||||
}
|
||||
|
||||
func LoadUsers() []*User {
|
||||
var users []*User
|
||||
|
||||
db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketUsers)
|
||||
|
||||
b.ForEach(func(k, _ []byte) error {
|
||||
id := idFromBytes(k)
|
||||
user := &User{
|
||||
ID: id,
|
||||
Username: strconv.FormatUint(id, 10),
|
||||
id: make([]byte, 8),
|
||||
}
|
||||
copy(user.id, k)
|
||||
|
||||
users = append(users, user)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
for _, user := range users {
|
||||
user.openMessageLog()
|
||||
user.loadCertificate()
|
||||
func LoadUsers(store Store) ([]User, error) {
|
||||
users, err := store.GetUsers()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return users
|
||||
for i := range users {
|
||||
users[i].store = store
|
||||
users[i].loadCertificate()
|
||||
}
|
||||
|
||||
return users, nil
|
||||
}
|
||||
|
||||
func (u *User) GetServers() []Server {
|
||||
var servers []Server
|
||||
|
||||
db.View(func(tx *bolt.Tx) error {
|
||||
c := tx.Bucket(bucketServers).Cursor()
|
||||
|
||||
for k, v := c.Seek(u.id); bytes.HasPrefix(k, u.id); k, v = c.Next() {
|
||||
server := Server{}
|
||||
server.Unmarshal(v)
|
||||
servers = append(servers, server)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return servers
|
||||
func (u *User) SetMessageStore(store MessageStore) {
|
||||
u.messageLog = store
|
||||
}
|
||||
|
||||
func (u *User) GetChannels() []Channel {
|
||||
var channels []Channel
|
||||
|
||||
db.View(func(tx *bolt.Tx) error {
|
||||
c := tx.Bucket(bucketChannels).Cursor()
|
||||
|
||||
for k, v := c.Seek(u.id); bytes.HasPrefix(k, u.id); k, v = c.Next() {
|
||||
channel := Channel{}
|
||||
channel.Unmarshal(v)
|
||||
channels = append(channels, channel)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return channels
|
||||
}
|
||||
|
||||
func (u *User) AddServer(server Server) {
|
||||
db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketServers)
|
||||
data, _ := server.Marshal(nil)
|
||||
|
||||
b.Put(u.serverID(server.Host), data)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (u *User) AddChannel(channel Channel) {
|
||||
db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketChannels)
|
||||
data, _ := channel.Marshal(nil)
|
||||
|
||||
b.Put(u.channelID(channel.Server, channel.Name), data)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (u *User) SetNick(nick, address string) {
|
||||
db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketServers)
|
||||
id := u.serverID(address)
|
||||
|
||||
server := Server{}
|
||||
v := b.Get(id)
|
||||
if v != nil {
|
||||
server.Unmarshal(v)
|
||||
server.Nick = nick
|
||||
|
||||
data, _ := server.Marshal(nil)
|
||||
b.Put(id, data)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (u *User) SetServerName(name, address string) {
|
||||
db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketServers)
|
||||
id := u.serverID(address)
|
||||
|
||||
server := Server{}
|
||||
v := b.Get(id)
|
||||
if v != nil {
|
||||
server.Unmarshal(v)
|
||||
server.Name = name
|
||||
|
||||
data, _ := server.Marshal(nil)
|
||||
b.Put(id, data)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (u *User) RemoveServer(address string) {
|
||||
db.Batch(func(tx *bolt.Tx) error {
|
||||
serverID := u.serverID(address)
|
||||
tx.Bucket(bucketServers).Delete(serverID)
|
||||
|
||||
b := tx.Bucket(bucketChannels)
|
||||
c := b.Cursor()
|
||||
|
||||
for k, _ := c.Seek(serverID); bytes.HasPrefix(k, serverID); k, _ = c.Next() {
|
||||
b.Delete(k)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (u *User) RemoveChannel(server, channel string) {
|
||||
db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketChannels)
|
||||
id := u.channelID(server, channel)
|
||||
|
||||
b.Delete(id)
|
||||
|
||||
return nil
|
||||
})
|
||||
func (u *User) SetMessageSearchProvider(search MessageSearchProvider) {
|
||||
u.messageIndex = search
|
||||
}
|
||||
|
||||
func (u *User) Remove() {
|
||||
db.Batch(func(tx *bolt.Tx) error {
|
||||
return tx.Bucket(bucketUsers).Delete(u.id)
|
||||
})
|
||||
u.closeMessageLog()
|
||||
u.store.DeleteUser(u)
|
||||
if u.messageLog != nil {
|
||||
u.messageLog.Close()
|
||||
}
|
||||
if u.messageIndex != nil {
|
||||
u.messageIndex.Close()
|
||||
}
|
||||
os.RemoveAll(Path.User(u.Username))
|
||||
}
|
||||
|
||||
func (u *User) serverID(address string) []byte {
|
||||
id := make([]byte, 8+len(address))
|
||||
copy(id, u.id)
|
||||
copy(id[8:], address)
|
||||
return id
|
||||
type Server struct {
|
||||
Name string
|
||||
Host string
|
||||
Port string
|
||||
TLS bool
|
||||
Password string
|
||||
Nick string
|
||||
Username string
|
||||
Realname string
|
||||
}
|
||||
|
||||
func (u *User) channelID(server, channel string) []byte {
|
||||
id := make([]byte, 8+len(server)+1+len(channel))
|
||||
copy(id, u.id)
|
||||
copy(id[8:], server)
|
||||
copy(id[8+len(server)+1:], channel)
|
||||
return id
|
||||
func (u *User) GetServers() ([]Server, error) {
|
||||
return u.store.GetServers(u)
|
||||
}
|
||||
|
||||
func (u *User) AddServer(server *Server) error {
|
||||
return u.store.AddServer(u, server)
|
||||
}
|
||||
|
||||
func (u *User) RemoveServer(address string) error {
|
||||
return u.store.RemoveServer(u, address)
|
||||
}
|
||||
|
||||
func (u *User) SetNick(nick, address string) error {
|
||||
return u.store.SetNick(u, nick, address)
|
||||
}
|
||||
|
||||
func (u *User) SetServerName(name, address string) error {
|
||||
return u.store.SetServerName(u, name, address)
|
||||
}
|
||||
|
||||
type Channel struct {
|
||||
Server string
|
||||
Name string
|
||||
Topic string
|
||||
}
|
||||
|
||||
func (u *User) GetChannels() ([]Channel, error) {
|
||||
return u.store.GetChannels(u)
|
||||
}
|
||||
|
||||
func (u *User) AddChannel(channel *Channel) error {
|
||||
return u.store.AddChannel(u, channel)
|
||||
}
|
||||
|
||||
func (u *User) RemoveChannel(server, channel string) error {
|
||||
return u.store.RemoveChannel(u, server, channel)
|
||||
}
|
||||
|
||||
type Message struct {
|
||||
ID string `json:"-" bleve:"-"`
|
||||
Server string `json:"-" bleve:"server"`
|
||||
From string `bleve:"-"`
|
||||
To string `json:"-" bleve:"to"`
|
||||
Content string `bleve:"content"`
|
||||
Time int64 `bleve:"-"`
|
||||
}
|
||||
|
||||
func (m Message) Type() string {
|
||||
return "message"
|
||||
}
|
||||
|
||||
func (u *User) LogMessage(id, server, from, to, content string) error {
|
||||
message := &Message{
|
||||
ID: id,
|
||||
Server: server,
|
||||
From: from,
|
||||
To: to,
|
||||
Content: content,
|
||||
Time: time.Now().Unix(),
|
||||
}
|
||||
|
||||
err := u.messageLog.LogMessage(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return u.messageIndex.Index(id, message)
|
||||
}
|
||||
|
||||
func (u *User) GetMessages(server, channel string, count int, fromID string) ([]Message, bool, error) {
|
||||
return u.messageLog.GetMessages(server, channel, count, fromID)
|
||||
}
|
||||
|
||||
func (u *User) GetLastMessages(server, channel string, count int) ([]Message, bool, error) {
|
||||
return u.GetMessages(server, channel, count, "")
|
||||
}
|
||||
|
||||
func (u *User) SearchMessages(server, channel, q string) ([]Message, error) {
|
||||
ids, err := u.messageIndex.SearchMessages(server, channel, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return u.messageLog.GetMessagesByID(server, channel, ids)
|
||||
}
|
||||
|
|
|
@ -1,191 +0,0 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/blevesearch/bleve"
|
||||
"github.com/blevesearch/bleve/analysis/analyzer/keyword"
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
ID string `json:"-" bleve:"-"`
|
||||
Server string `json:"-" bleve:"server"`
|
||||
From string `json:"from" bleve:"-"`
|
||||
To string `json:"-" bleve:"to"`
|
||||
Content string `json:"content" bleve:"content"`
|
||||
Time int64 `json:"time" bleve:"-"`
|
||||
}
|
||||
|
||||
func (m Message) Type() string {
|
||||
return "message"
|
||||
}
|
||||
|
||||
func (u *User) LogMessage(id, server, from, to, content string) error {
|
||||
message := Message{
|
||||
ID: id,
|
||||
Server: server,
|
||||
From: from,
|
||||
To: to,
|
||||
Content: content,
|
||||
Time: time.Now().Unix(),
|
||||
}
|
||||
|
||||
err := u.messageLog.Batch(func(tx *bolt.Tx) error {
|
||||
b, err := tx.Bucket(bucketMessages).CreateBucketIfNotExists([]byte(server + ":" + to))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := message.Marshal(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.Put([]byte(id), data)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return u.messageIndex.Index(id, message)
|
||||
}
|
||||
|
||||
func (u *User) GetLastMessages(server, channel string, count int) ([]Message, bool, error) {
|
||||
return u.GetMessages(server, channel, count, "")
|
||||
}
|
||||
|
||||
func (u *User) GetMessages(server, channel string, count int, fromID string) ([]Message, bool, error) {
|
||||
messages := make([]Message, count)
|
||||
hasMore := false
|
||||
|
||||
u.messageLog.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel))
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c := b.Cursor()
|
||||
|
||||
if fromID != "" {
|
||||
c.Seek([]byte(fromID))
|
||||
|
||||
for k, v := c.Prev(); count > 0 && k != nil; k, v = c.Prev() {
|
||||
count--
|
||||
messages[count].Unmarshal(v)
|
||||
}
|
||||
} else {
|
||||
for k, v := c.Last(); count > 0 && k != nil; k, v = c.Prev() {
|
||||
count--
|
||||
messages[count].Unmarshal(v)
|
||||
}
|
||||
}
|
||||
|
||||
c.Next()
|
||||
k, _ := c.Prev()
|
||||
hasMore = k != nil
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if count == 0 {
|
||||
return messages, hasMore, nil
|
||||
} else if count < len(messages) {
|
||||
return messages[count:], hasMore, nil
|
||||
}
|
||||
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func (u *User) SearchMessages(server, channel, q string) ([]Message, error) {
|
||||
serverQuery := bleve.NewMatchQuery(server)
|
||||
serverQuery.SetField("server")
|
||||
channelQuery := bleve.NewMatchQuery(channel)
|
||||
channelQuery.SetField("to")
|
||||
contentQuery := bleve.NewMatchQuery(q)
|
||||
contentQuery.SetField("content")
|
||||
contentQuery.SetFuzziness(2)
|
||||
|
||||
query := bleve.NewBooleanQuery()
|
||||
query.AddMust(serverQuery, channelQuery, contentQuery)
|
||||
|
||||
search := bleve.NewSearchRequest(query)
|
||||
searchResults, err := u.messageIndex.Search(search)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
messages := []Message{}
|
||||
u.messageLog.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel))
|
||||
|
||||
for _, hit := range searchResults.Hits {
|
||||
message := Message{}
|
||||
message.Unmarshal(b.Get([]byte(hit.ID)))
|
||||
messages = append(messages, message)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
func (u *User) openMessageLog() error {
|
||||
err := os.MkdirAll(Path.User(u.Username), 0700)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
u.messageLog, err = bolt.Open(Path.Log(u.Username), 0600, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
u.messageLog.Update(func(tx *bolt.Tx) error {
|
||||
tx.CreateBucketIfNotExists(bucketMessages)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
indexPath := Path.Index(u.Username)
|
||||
u.messageIndex, err = bleve.Open(indexPath)
|
||||
if err == bleve.ErrorIndexPathDoesNotExist {
|
||||
keywordMapping := bleve.NewTextFieldMapping()
|
||||
keywordMapping.Analyzer = keyword.Name
|
||||
keywordMapping.Store = false
|
||||
keywordMapping.IncludeTermVectors = false
|
||||
keywordMapping.IncludeInAll = false
|
||||
|
||||
contentMapping := bleve.NewTextFieldMapping()
|
||||
contentMapping.Analyzer = "en"
|
||||
contentMapping.Store = false
|
||||
contentMapping.IncludeTermVectors = false
|
||||
contentMapping.IncludeInAll = false
|
||||
|
||||
messageMapping := bleve.NewDocumentMapping()
|
||||
messageMapping.StructTagKey = "bleve"
|
||||
messageMapping.AddFieldMappingsAt("server", keywordMapping)
|
||||
messageMapping.AddFieldMappingsAt("to", keywordMapping)
|
||||
messageMapping.AddFieldMappingsAt("content", contentMapping)
|
||||
|
||||
mapping := bleve.NewIndexMapping()
|
||||
mapping.AddDocumentMapping("message", messageMapping)
|
||||
|
||||
u.messageIndex, err = bleve.New(indexPath, mapping)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *User) closeMessageLog() {
|
||||
u.messageLog.Close()
|
||||
u.messageIndex.Close()
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package storage
|
||||
package storage_test
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
|
@ -6,6 +6,9 @@ import (
|
|||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/khlieng/dispatch/storage"
|
||||
"github.com/khlieng/dispatch/storage/bleve"
|
||||
"github.com/khlieng/dispatch/storage/boltdb"
|
||||
"github.com/kjk/betterguid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
@ -16,81 +19,96 @@ func tempdir() string {
|
|||
}
|
||||
|
||||
func TestUser(t *testing.T) {
|
||||
defer func() {
|
||||
r := recover()
|
||||
assert.Nil(t, r)
|
||||
}()
|
||||
storage.Initialize(tempdir())
|
||||
|
||||
Initialize(tempdir())
|
||||
Open()
|
||||
db, err := boltdb.New(storage.Path.Database())
|
||||
assert.Nil(t, err)
|
||||
|
||||
srv := Server{
|
||||
user, err := storage.NewUser(db)
|
||||
assert.Nil(t, err)
|
||||
|
||||
srv := storage.Server{
|
||||
Name: "Freenode",
|
||||
Host: "irc.freenode.net",
|
||||
Nick: "test",
|
||||
}
|
||||
chan1 := Channel{
|
||||
chan1 := storage.Channel{
|
||||
Server: srv.Host,
|
||||
Name: "#test",
|
||||
}
|
||||
chan2 := Channel{
|
||||
chan2 := storage.Channel{
|
||||
Server: srv.Host,
|
||||
Name: "#testing",
|
||||
}
|
||||
|
||||
user, err := NewUser()
|
||||
assert.Nil(t, err)
|
||||
user.AddServer(srv)
|
||||
user.AddChannel(chan1)
|
||||
user.AddChannel(chan2)
|
||||
user.closeMessageLog()
|
||||
user.AddServer(&srv)
|
||||
user.AddChannel(&chan1)
|
||||
user.AddChannel(&chan2)
|
||||
|
||||
users := LoadUsers()
|
||||
users, err := storage.LoadUsers(db)
|
||||
assert.Nil(t, err)
|
||||
assert.Len(t, users, 1)
|
||||
|
||||
user = users[0]
|
||||
user = &users[0]
|
||||
assert.Equal(t, uint64(1), user.ID)
|
||||
|
||||
servers := user.GetServers()
|
||||
servers, err := user.GetServers()
|
||||
assert.Len(t, servers, 1)
|
||||
assert.Equal(t, srv, servers[0])
|
||||
|
||||
channels := user.GetChannels()
|
||||
channels, err := user.GetChannels()
|
||||
assert.Len(t, channels, 2)
|
||||
assert.Equal(t, chan1, channels[0])
|
||||
assert.Equal(t, chan2, channels[1])
|
||||
|
||||
user.SetNick("bob", srv.Host)
|
||||
assert.Equal(t, "bob", user.GetServers()[0].Nick)
|
||||
servers, err = user.GetServers()
|
||||
assert.Equal(t, "bob", servers[0].Nick)
|
||||
|
||||
user.SetServerName("cake", srv.Host)
|
||||
assert.Equal(t, "cake", user.GetServers()[0].Name)
|
||||
servers, err = user.GetServers()
|
||||
assert.Equal(t, "cake", servers[0].Name)
|
||||
|
||||
user.RemoveChannel(srv.Host, chan1.Name)
|
||||
channels = user.GetChannels()
|
||||
channels, err = user.GetChannels()
|
||||
assert.Len(t, channels, 1)
|
||||
assert.Equal(t, chan2, channels[0])
|
||||
|
||||
user.RemoveServer(srv.Host)
|
||||
assert.Len(t, user.GetServers(), 0)
|
||||
assert.Len(t, user.GetChannels(), 0)
|
||||
servers, err = user.GetServers()
|
||||
assert.Len(t, servers, 0)
|
||||
channels, err = user.GetChannels()
|
||||
assert.Len(t, channels, 0)
|
||||
|
||||
user.Remove()
|
||||
_, err = os.Stat(Path.User(user.Username))
|
||||
_, err = os.Stat(storage.Path.User(user.Username))
|
||||
assert.True(t, os.IsNotExist(err))
|
||||
|
||||
for _, storedUser := range LoadUsers() {
|
||||
assert.NotEqual(t, user.ID, storedUser.ID)
|
||||
users, err = storage.LoadUsers(db)
|
||||
assert.Nil(t, err)
|
||||
|
||||
for i := range users {
|
||||
assert.NotEqual(t, user.ID, users[i].ID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMessages(t *testing.T) {
|
||||
Initialize(tempdir())
|
||||
Open()
|
||||
storage.Initialize(tempdir())
|
||||
|
||||
user, err := NewUser()
|
||||
db, err := boltdb.New(storage.Path.Database())
|
||||
assert.Nil(t, err)
|
||||
|
||||
user, err := storage.NewUser(db)
|
||||
assert.Nil(t, err)
|
||||
|
||||
os.MkdirAll(storage.Path.User(user.Username), 0700)
|
||||
|
||||
search, err := bleve.New(storage.Path.Index(user.Username))
|
||||
assert.Nil(t, err)
|
||||
|
||||
user.SetMessageStore(db)
|
||||
user.SetMessageSearchProvider(search)
|
||||
|
||||
messages, hasMore, err := user.GetMessages("irc.freenode.net", "#go-nuts", 10, "6")
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, hasMore)
|
||||
|
@ -152,5 +170,5 @@ func TestMessages(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
assert.True(t, len(messages) > 0)
|
||||
|
||||
Close()
|
||||
db.Close()
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue