dispatch/storage/user_messages.go

203 lines
4.5 KiB
Go
Raw Normal View History

package storage
import (
"os"
"time"
2016-03-01 00:51:26 +00:00
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/analyzer/keyword"
2016-03-01 00:51:26 +00:00
"github.com/boltdb/bolt"
)
2016-01-22 17:30:47 +00:00
type Message struct {
2017-04-20 03:32:22 +00:00
ID string `json:"id" bleve:"-"`
Server string `json:"-" bleve:"server"`
From string `json:"from" bleve:"-"`
To string `json:"-" bleve:"to"`
Content string `json:"content" bleve:"content"`
Time int64 `json:"time" bleve:"-"`
}
func (m Message) Type() string {
return "message"
2016-01-22 17:30:47 +00:00
}
2017-04-20 03:32:22 +00:00
func (u *User) LogMessage(id, server, from, to, content string) error {
2016-01-17 20:15:29 +00:00
message := Message{
2017-04-20 03:32:22 +00:00
ID: id,
2016-01-17 20:15:29 +00:00
Server: server,
From: from,
To: to,
Content: content,
Time: time.Now().Unix(),
}
2016-01-17 20:15:29 +00:00
err := u.messageLog.Batch(func(tx *bolt.Tx) error {
2017-04-20 03:32:22 +00:00
b, err := tx.Bucket(bucketMessages).CreateBucketIfNotExists([]byte(server + ":" + to))
2016-01-17 20:15:29 +00:00
if err != nil {
return err
}
2016-01-22 17:30:47 +00:00
data, err := message.Marshal(nil)
2016-01-17 20:15:29 +00:00
if err != nil {
return err
}
2017-04-20 03:32:22 +00:00
return b.Put([]byte(id), data)
})
2016-01-17 20:15:29 +00:00
if err != nil {
return err
}
2017-04-20 03:32:22 +00:00
return u.messageIndex.Index(id, message)
}
func (u *User) GetLastMessages(server, channel string, count int) ([]Message, error) {
messages := make([]Message, count)
u.messageLog.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel))
if b == nil {
return nil
}
c := b.Cursor()
2016-01-17 20:15:29 +00:00
for _, v := c.Last(); count > 0 && v != nil; _, v = c.Prev() {
count--
2016-01-22 17:30:47 +00:00
messages[count].Unmarshal(v)
}
return nil
})
2016-01-18 02:21:58 +00:00
if count == 0 {
return messages, nil
} else if count < len(messages) {
return messages[count:], nil
}
2016-01-18 02:21:58 +00:00
return nil, nil
}
2017-04-20 03:32:22 +00:00
func (u *User) GetMessages(server, channel string, count int, fromID string) ([]Message, error) {
messages := make([]Message, count)
u.messageLog.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel))
if b == nil {
return nil
}
c := b.Cursor()
2017-04-20 03:32:22 +00:00
c.Seek([]byte(fromID))
for k, v := c.Prev(); count > 0 && k != nil; k, v = c.Prev() {
count--
2016-01-22 17:30:47 +00:00
messages[count].Unmarshal(v)
}
return nil
})
2016-01-18 02:21:58 +00:00
if count == 0 {
return messages, nil
} else if count < len(messages) {
return messages[count:], nil
}
return nil, nil
}
func (u *User) SearchMessages(server, channel, q string) ([]Message, error) {
serverQuery := bleve.NewMatchQuery(server)
serverQuery.SetField("server")
channelQuery := bleve.NewMatchQuery(channel)
channelQuery.SetField("to")
contentQuery := bleve.NewMatchQuery(q)
contentQuery.SetField("content")
contentQuery.SetFuzziness(2)
query := bleve.NewBooleanQuery()
query.AddMust(serverQuery, channelQuery, contentQuery)
search := bleve.NewSearchRequest(query)
searchResults, err := u.messageIndex.Search(search)
if err != nil {
return nil, err
}
messages := []Message{}
u.messageLog.View(func(tx *bolt.Tx) error {
2017-04-20 03:32:22 +00:00
b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel))
for _, hit := range searchResults.Hits {
2016-01-17 22:33:52 +00:00
message := Message{}
2017-04-20 03:32:22 +00:00
message.Unmarshal(b.Get([]byte(hit.ID)))
messages = append(messages, message)
}
return nil
})
return messages, nil
}
func (u *User) openMessageLog() error {
err := os.MkdirAll(Path.User(u.Username), 0700)
if err != nil {
return err
}
u.messageLog, err = bolt.Open(Path.Log(u.Username), 0600, nil)
if err != nil {
return err
}
u.messageLog.Update(func(tx *bolt.Tx) error {
tx.CreateBucketIfNotExists(bucketMessages)
return nil
})
indexPath := Path.Index(u.Username)
u.messageIndex, err = bleve.Open(indexPath)
if err == bleve.ErrorIndexPathDoesNotExist {
keywordMapping := bleve.NewTextFieldMapping()
keywordMapping.Analyzer = keyword.Name
keywordMapping.Store = false
keywordMapping.IncludeTermVectors = false
keywordMapping.IncludeInAll = false
contentMapping := bleve.NewTextFieldMapping()
contentMapping.Analyzer = "en"
contentMapping.Store = false
contentMapping.IncludeTermVectors = false
contentMapping.IncludeInAll = false
messageMapping := bleve.NewDocumentMapping()
messageMapping.StructTagKey = "bleve"
messageMapping.AddFieldMappingsAt("server", keywordMapping)
messageMapping.AddFieldMappingsAt("to", keywordMapping)
messageMapping.AddFieldMappingsAt("content", contentMapping)
mapping := bleve.NewIndexMapping()
mapping.AddDocumentMapping("message", messageMapping)
u.messageIndex, err = bleve.New(indexPath, mapping)
if err != nil {
return err
}
} else if err != nil {
return err
}
return nil
}
func (u *User) closeMessageLog() {
u.messageLog.Close()
u.messageIndex.Close()
}