Add GetLastMessages, log messages to nested buckets

This commit is contained in:
Ken-Håvard Lieng 2015-05-21 09:15:57 +02:00
parent a6d7403eba
commit 6a25c1d6a5
3 changed files with 61 additions and 27 deletions

View File

@ -12,6 +12,7 @@ var messages = Immutable.Map();
var empty = Immutable.List();
var Message = Immutable.Record({
id: null,
server: null,
from: null,
to: null,

File diff suppressed because one or more lines are too long

View File

@ -6,6 +6,7 @@ import (
"log"
"path"
"strconv"
"strings"
"time"
"github.com/khlieng/name_pending/Godeps/_workspace/src/github.com/blevesearch/bleve"
@ -189,12 +190,13 @@ func (u *User) RemoveChannel(server, channel string) {
func (u *User) LogMessage(server, from, to, content string) {
go u.messageLog.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketMessages)
messageID, _ := b.NextSequence()
id := server + ":" + to + ":" + strconv.FormatUint(messageID, 10)
bucketKey := server + ":" + to
b, _ := tx.Bucket(bucketMessages).CreateBucketIfNotExists([]byte(bucketKey))
id, _ := b.NextSequence()
idStr := strconv.FormatUint(id, 10)
message := Message{
ID: messageID,
ID: id,
Content: content,
Server: server,
From: from,
@ -203,34 +205,65 @@ func (u *User) LogMessage(server, from, to, content string) {
}
data, _ := json.Marshal(message)
b.Put([]byte(id), data)
b.Put([]byte(idStr), data)
go u.messageIndex.Index(id, message)
go u.messageIndex.Index(bucketKey+":"+idStr, message)
return nil
})
}
func (u *User) GetMessages(server, channel string, count int, fromID uint64) ([]Message, error) {
func (u *User) GetLastMessages(server, channel string, count int) ([]Message, error) {
messages := make([]Message, count)
i := count - 1
prefix := []byte(server + ":" + channel + ":" + strconv.FormatUint(fromID, 10))
u.messageLog.View(func(tx *bolt.Tx) error {
c := tx.Bucket(bucketMessages).Cursor()
b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel))
if b == nil {
return nil
}
for k, v := c.Seek(prefix); i > 0 && bytes.HasPrefix(k, prefix); k, v = c.Prev() {
var message Message
c := b.Cursor()
json.Unmarshal(v, &message)
messages[i] = message
i--
for k, v := c.Last(); count > 0 && k != nil; k, v = c.Prev() {
count--
json.Unmarshal(v, &messages[count])
}
return nil
})
return messages[i:], nil
if count < len(messages) {
return messages[count:], nil
} else {
return nil, nil
}
}
func (u *User) GetMessages(server, channel string, count int, fromID uint64) ([]Message, error) {
messages := make([]Message, count)
u.messageLog.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel))
if b == nil {
return nil
}
c := b.Cursor()
c.Seek([]byte(strconv.FormatUint(fromID, 10)))
for k, v := c.Prev(); count > 0 && k != nil; k, v = c.Prev() {
count--
json.Unmarshal(v, &messages[count])
}
return nil
})
if count < len(messages) {
return messages[count:], nil
} else {
return nil, nil
}
}
func (u *User) SearchMessages(server, channel, phrase string) ([]Message, error) {
@ -249,16 +282,16 @@ func (u *User) SearchMessages(server, channel, phrase string) ([]Message, error)
return nil, err
}
log.Printf("%.3fms\n", searchResults.Took.Seconds()*1000)
messages := []Message{}
u.messageLog.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketMessages)
for _, hit := range searchResults.Hits {
idx := strings.LastIndex(hit.ID, ":")
bc := b.Bucket([]byte(hit.ID[:idx]))
var message Message
json.Unmarshal(b.Get([]byte(hit.ID)), &message)
json.Unmarshal(bc.Get([]byte(hit.ID[idx+1:])), &message)
messages = append(messages, message)
}