Add message logging and search server side
This commit is contained in:
parent
6378131a9d
commit
3365832ce3
738 changed files with 143131 additions and 112 deletions
|
@ -1,36 +0,0 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/user"
|
||||
"path"
|
||||
|
||||
"github.com/khlieng/name_pending/Godeps/_workspace/src/github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
var db *bolt.DB
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
currentUser, _ := user.Current()
|
||||
appDir := path.Join(currentUser.HomeDir, ".name_pending")
|
||||
|
||||
os.Mkdir(appDir, 0777)
|
||||
|
||||
db, err = bolt.Open(path.Join(appDir, "data.db"), 0600, nil)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to open database file")
|
||||
}
|
||||
|
||||
db.Update(func(tx *bolt.Tx) error {
|
||||
tx.CreateBucketIfNotExists([]byte("Users"))
|
||||
tx.CreateBucketIfNotExists([]byte("Servers"))
|
||||
tx.CreateBucketIfNotExists([]byte("Channels"))
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func Cleanup() {
|
||||
db.Close()
|
||||
}
|
53
storage/storage.go
Normal file
53
storage/storage.go
Normal file
|
@ -0,0 +1,53 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/user"
|
||||
"path"
|
||||
|
||||
"github.com/khlieng/name_pending/Godeps/_workspace/src/github.com/boltdb/bolt"
|
||||
|
||||
"github.com/khlieng/name_pending/args"
|
||||
)
|
||||
|
||||
var (
|
||||
appDir string
|
||||
|
||||
db *bolt.DB
|
||||
|
||||
bucketUsers = []byte("Users")
|
||||
bucketServers = []byte("Servers")
|
||||
bucketChannels = []byte("Channels")
|
||||
bucketMessages = []byte("Messages")
|
||||
)
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
currentUser, _ := user.Current()
|
||||
appDir = path.Join(currentUser.HomeDir, ".name_pending")
|
||||
|
||||
if args.Development {
|
||||
os.RemoveAll(appDir)
|
||||
}
|
||||
|
||||
os.Mkdir(appDir, 0777)
|
||||
os.Mkdir(path.Join(appDir, "logs"), 0777)
|
||||
|
||||
db, err = bolt.Open(path.Join(appDir, "data.db"), 0600, nil)
|
||||
if err != nil {
|
||||
log.Fatal("Could not open database file")
|
||||
}
|
||||
|
||||
db.Update(func(tx *bolt.Tx) error {
|
||||
tx.CreateBucketIfNotExists(bucketUsers)
|
||||
tx.CreateBucketIfNotExists(bucketServers)
|
||||
tx.CreateBucketIfNotExists(bucketChannels)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func Cleanup() {
|
||||
db.Close()
|
||||
}
|
156
storage/user.go
156
storage/user.go
|
@ -3,7 +3,12 @@ package storage
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"path"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/khlieng/name_pending/Godeps/_workspace/src/github.com/blevesearch/bleve"
|
||||
"github.com/khlieng/name_pending/Godeps/_workspace/src/github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
|
@ -24,12 +29,24 @@ type Channel struct {
|
|||
Topic string `json:"topic,omitempty"`
|
||||
}
|
||||
|
||||
type User struct {
|
||||
UUID string
|
||||
type Message struct {
|
||||
ID uint64 `json:"id"`
|
||||
Server string `json:"server"`
|
||||
From string `json:"from"`
|
||||
To string `json:"to"`
|
||||
Content string `json:"content"`
|
||||
Time int64 `json:"time"`
|
||||
}
|
||||
|
||||
func NewUser(uuid string) User {
|
||||
user := User{
|
||||
type User struct {
|
||||
UUID string
|
||||
|
||||
messageLog *bolt.DB
|
||||
messageIndex bleve.Index
|
||||
}
|
||||
|
||||
func NewUser(uuid string) *User {
|
||||
user := &User{
|
||||
UUID: uuid,
|
||||
}
|
||||
|
||||
|
@ -42,17 +59,22 @@ func NewUser(uuid string) User {
|
|||
return nil
|
||||
})
|
||||
|
||||
go user.openMessageLog()
|
||||
|
||||
return user
|
||||
}
|
||||
|
||||
func LoadUsers() []User {
|
||||
var users []User
|
||||
func LoadUsers() []*User {
|
||||
var users []*User
|
||||
|
||||
db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("Users"))
|
||||
|
||||
b.ForEach(func(k, v []byte) error {
|
||||
users = append(users, User{string(k)})
|
||||
user := User{UUID: string(k)}
|
||||
go user.openMessageLog()
|
||||
|
||||
users = append(users, &user)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
@ -63,7 +85,7 @@ func LoadUsers() []User {
|
|||
return users
|
||||
}
|
||||
|
||||
func (u User) GetServers() []Server {
|
||||
func (u *User) GetServers() []Server {
|
||||
var servers []Server
|
||||
|
||||
db.View(func(tx *bolt.Tx) error {
|
||||
|
@ -82,7 +104,7 @@ func (u User) GetServers() []Server {
|
|||
return servers
|
||||
}
|
||||
|
||||
func (u User) GetChannels() []Channel {
|
||||
func (u *User) GetChannels() []Channel {
|
||||
var channels []Channel
|
||||
|
||||
db.View(func(tx *bolt.Tx) error {
|
||||
|
@ -102,7 +124,7 @@ func (u User) GetChannels() []Channel {
|
|||
return channels
|
||||
}
|
||||
|
||||
func (u User) AddServer(server Server) {
|
||||
func (u *User) AddServer(server Server) {
|
||||
go db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("Servers"))
|
||||
data, _ := json.Marshal(server)
|
||||
|
@ -113,7 +135,7 @@ func (u User) AddServer(server Server) {
|
|||
})
|
||||
}
|
||||
|
||||
func (u User) AddChannel(channel Channel) {
|
||||
func (u *User) AddChannel(channel Channel) {
|
||||
go db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("Channels"))
|
||||
data, _ := json.Marshal(channel)
|
||||
|
@ -124,7 +146,7 @@ func (u User) AddChannel(channel Channel) {
|
|||
})
|
||||
}
|
||||
|
||||
func (u User) SetNick(nick, address string) {
|
||||
func (u *User) SetNick(nick, address string) {
|
||||
go db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("Servers"))
|
||||
id := []byte(u.UUID + ":" + address)
|
||||
|
@ -140,7 +162,7 @@ func (u User) SetNick(nick, address string) {
|
|||
})
|
||||
}
|
||||
|
||||
func (u User) RemoveServer(address string) {
|
||||
func (u *User) RemoveServer(address string) {
|
||||
go db.Update(func(tx *bolt.Tx) error {
|
||||
serverID := []byte(u.UUID + ":" + address)
|
||||
|
||||
|
@ -157,10 +179,116 @@ func (u User) RemoveServer(address string) {
|
|||
})
|
||||
}
|
||||
|
||||
func (u User) RemoveChannel(server, channel string) {
|
||||
func (u *User) RemoveChannel(server, channel string) {
|
||||
go db.Update(func(tx *bolt.Tx) error {
|
||||
tx.Bucket([]byte("Channels")).Delete([]byte(u.UUID + ":" + server + ":" + channel))
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (u *User) LogMessage(server, from, to, content string) {
|
||||
go u.messageLog.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketMessages)
|
||||
messageID, _ := b.NextSequence()
|
||||
id := server + ":" + to + ":" + strconv.FormatUint(messageID, 10)
|
||||
|
||||
message := Message{
|
||||
ID: messageID,
|
||||
Content: content,
|
||||
Server: server,
|
||||
From: from,
|
||||
To: to,
|
||||
Time: time.Now().Unix(),
|
||||
}
|
||||
|
||||
data, _ := json.Marshal(message)
|
||||
b.Put([]byte(id), data)
|
||||
|
||||
go u.messageIndex.Index(id, message)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (u *User) GetMessages(server, channel string, count int, fromID uint64) ([]Message, error) {
|
||||
messages := make([]Message, count)
|
||||
i := count - 1
|
||||
prefix := []byte(server + ":" + channel + ":" + strconv.FormatUint(fromID, 10))
|
||||
|
||||
u.messageLog.View(func(tx *bolt.Tx) error {
|
||||
c := tx.Bucket(bucketMessages).Cursor()
|
||||
|
||||
for k, v := c.Seek(prefix); i > 0 && bytes.HasPrefix(k, prefix); k, v = c.Prev() {
|
||||
var message Message
|
||||
|
||||
json.Unmarshal(v, &message)
|
||||
messages[i] = message
|
||||
i--
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return messages[i:], nil
|
||||
}
|
||||
|
||||
func (u *User) SearchMessages(server, channel, phrase string) ([]Message, error) {
|
||||
serverQuery := bleve.NewMatchQuery(server)
|
||||
serverQuery.SetField("Server")
|
||||
channelQuery := bleve.NewMatchQuery(channel)
|
||||
channelQuery.SetField("To")
|
||||
contentQuery := bleve.NewMatchQuery(phrase)
|
||||
contentQuery.SetField("Content")
|
||||
|
||||
query := bleve.NewBooleanQuery([]bleve.Query{serverQuery, channelQuery, contentQuery}, nil, nil)
|
||||
|
||||
search := bleve.NewSearchRequest(query)
|
||||
searchResults, err := u.messageIndex.Search(search)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var messages []Message
|
||||
u.messageLog.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketMessages)
|
||||
|
||||
for _, hit := range searchResults.Hits {
|
||||
var message Message
|
||||
|
||||
json.Unmarshal(b.Get([]byte(hit.ID)), &message)
|
||||
messages = append(messages, message)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
func (u *User) openMessageLog() {
|
||||
var err error
|
||||
|
||||
u.messageLog, err = bolt.Open(path.Join(appDir, "logs", u.UUID+"_log"), 0600, nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
u.messageLog.Update(func(tx *bolt.Tx) error {
|
||||
tx.CreateBucketIfNotExists(bucketMessages)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
indexPath := path.Join(appDir, "logs", u.UUID+"_index")
|
||||
u.messageIndex, err = bleve.Open(indexPath)
|
||||
if err == bleve.ErrorIndexPathDoesNotExist {
|
||||
mapping := bleve.NewIndexMapping()
|
||||
u.messageIndex, err = bleve.New(indexPath, mapping)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
} else if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue