From 24f9553aa5e29308c75f3a158378e4766403ee34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ken-H=C3=A5vard=20Lieng?= Date: Thu, 31 May 2018 23:24:59 +0200 Subject: [PATCH] Persist, renew and delete sessions, refactor storage package, move reusable packages to pkg --- commands/dispatch.go | 23 +- {irc => pkg/irc}/client.go | 0 {irc => pkg/irc}/client_test.go | 0 {irc => pkg/irc}/conn.go | 0 {irc => pkg/irc}/conn_test.go | 0 {irc => pkg/irc}/const.go | 0 {irc => pkg/irc}/message.go | 0 {irc => pkg/irc}/message_test.go | 0 {letsencrypt => pkg/letsencrypt}/directory.go | 0 .../letsencrypt}/letsencrypt.go | 0 {letsencrypt => pkg/letsencrypt}/user.go | 0 {letsencrypt => pkg/letsencrypt}/user_test.go | 0 links/links.go => pkg/linkmeta/linkmeta.go | 2 +- pkg/session/session.go | 106 +++++ pkg/session/session.schema | 5 + pkg/session/session.schema.gen.go | 112 ++++++ server/auth.go | 99 +++-- server/index_data.go | 98 ++--- server/index_data_easyjson.go | 4 +- server/irc.go | 48 +-- server/irc_handler.go | 64 +-- server/irc_handler_test.go | 26 +- server/json.go | 2 +- server/serve_files.go | 20 +- server/server.go | 120 ++++-- server/session.go | 253 ------------ server/state.go | 323 ++++++++++++++++ server/websocket_handler.go | 82 ++-- storage/bleve/bleve.go | 79 ++++ storage/boltdb/boltdb.go | 364 ++++++++++++++++++ storage/storage.go | 61 ++- storage/user.go | 326 ++++++---------- storage/user_messages.go | 191 --------- storage/user_test.go | 82 ++-- .../bleve/index/scorch/introducer.go | 5 + .../blevesearch/bleve/index/scorch/merge.go | 2 + .../bleve/index/scorch/persister.go | 2 + .../blevesearch/bleve/index/scorch/scorch.go | 12 +- .../bleve/index/scorch/segment/zap/dict.go | 13 +- .../index/scorch/segment/zap/docvalues.go | 19 +- .../bleve/index/scorch/segment/zap/merge.go | 24 +- .../bleve/index/scorch/segment/zap/segment.go | 22 +- .../bleve/index/scorch/snapshot_index.go | 192 ++++----- .../bleve/index/scorch/snapshot_segment.go | 48 ++- .../blevesearch/bleve/index/scorch/stats.go | 4 + .../blevesearch/bleve/numeric/prefix_coded.go | 4 + .../blevesearch/bleve/search/sort.go | 40 +- vendor/vendor.json | 166 ++++---- 48 files changed, 1872 insertions(+), 1171 deletions(-) rename {irc => pkg/irc}/client.go (100%) rename {irc => pkg/irc}/client_test.go (100%) rename {irc => pkg/irc}/conn.go (100%) rename {irc => pkg/irc}/conn_test.go (100%) rename {irc => pkg/irc}/const.go (100%) rename {irc => pkg/irc}/message.go (100%) rename {irc => pkg/irc}/message_test.go (100%) rename {letsencrypt => pkg/letsencrypt}/directory.go (100%) rename {letsencrypt => pkg/letsencrypt}/letsencrypt.go (100%) rename {letsencrypt => pkg/letsencrypt}/user.go (100%) rename {letsencrypt => pkg/letsencrypt}/user_test.go (100%) rename links/links.go => pkg/linkmeta/linkmeta.go (99%) create mode 100644 pkg/session/session.go create mode 100644 pkg/session/session.schema create mode 100644 pkg/session/session.schema.gen.go delete mode 100644 server/session.go create mode 100644 server/state.go create mode 100644 storage/bleve/bleve.go create mode 100644 storage/boltdb/boltdb.go delete mode 100644 storage/user_messages.go diff --git a/commands/dispatch.go b/commands/dispatch.go index 14306b6b..fe37c3b5 100644 --- a/commands/dispatch.go +++ b/commands/dispatch.go @@ -14,6 +14,8 @@ import ( "github.com/khlieng/dispatch/assets" "github.com/khlieng/dispatch/server" "github.com/khlieng/dispatch/storage" + "github.com/khlieng/dispatch/storage/bleve" + "github.com/khlieng/dispatch/storage/boltdb" ) const logo = ` @@ -61,10 +63,25 
@@ var rootCmd = &cobra.Command{ } log.Println("Storing data at", storage.Path.Root()) - storage.Open() - defer storage.Close() + db, err := boltdb.New(storage.Path.Database()) + if err != nil { + log.Fatal(err) + } + defer db.Close() - server.Run() + srv := server.Dispatch{ + Store: db, + SessionStore: db, + + GetMessageStore: func(user *storage.User) (storage.MessageStore, error) { + return boltdb.New(storage.Path.Log(user.Username)) + }, + GetMessageSearchProvider: func(user *storage.User) (storage.MessageSearchProvider, error) { + return bleve.New(storage.Path.Index(user.Username)) + }, + } + + srv.Run() }, } diff --git a/irc/client.go b/pkg/irc/client.go similarity index 100% rename from irc/client.go rename to pkg/irc/client.go diff --git a/irc/client_test.go b/pkg/irc/client_test.go similarity index 100% rename from irc/client_test.go rename to pkg/irc/client_test.go diff --git a/irc/conn.go b/pkg/irc/conn.go similarity index 100% rename from irc/conn.go rename to pkg/irc/conn.go diff --git a/irc/conn_test.go b/pkg/irc/conn_test.go similarity index 100% rename from irc/conn_test.go rename to pkg/irc/conn_test.go diff --git a/irc/const.go b/pkg/irc/const.go similarity index 100% rename from irc/const.go rename to pkg/irc/const.go diff --git a/irc/message.go b/pkg/irc/message.go similarity index 100% rename from irc/message.go rename to pkg/irc/message.go diff --git a/irc/message_test.go b/pkg/irc/message_test.go similarity index 100% rename from irc/message_test.go rename to pkg/irc/message_test.go diff --git a/letsencrypt/directory.go b/pkg/letsencrypt/directory.go similarity index 100% rename from letsencrypt/directory.go rename to pkg/letsencrypt/directory.go diff --git a/letsencrypt/letsencrypt.go b/pkg/letsencrypt/letsencrypt.go similarity index 100% rename from letsencrypt/letsencrypt.go rename to pkg/letsencrypt/letsencrypt.go diff --git a/letsencrypt/user.go b/pkg/letsencrypt/user.go similarity index 100% rename from letsencrypt/user.go rename to pkg/letsencrypt/user.go diff --git a/letsencrypt/user_test.go b/pkg/letsencrypt/user_test.go similarity index 100% rename from letsencrypt/user_test.go rename to pkg/letsencrypt/user_test.go diff --git a/links/links.go b/pkg/linkmeta/linkmeta.go similarity index 99% rename from links/links.go rename to pkg/linkmeta/linkmeta.go index 52e48f09..ec840ac9 100644 --- a/links/links.go +++ b/pkg/linkmeta/linkmeta.go @@ -1,4 +1,4 @@ -package links +package linkmeta import ( "errors" diff --git a/pkg/session/session.go b/pkg/session/session.go new file mode 100644 index 00000000..dedce0c2 --- /dev/null +++ b/pkg/session/session.go @@ -0,0 +1,106 @@ +package session + +import ( + "crypto/rand" + "encoding/base64" + "net/http" + "sync" + "time" +) + +var ( + CookieName = "session" + + Expiration = time.Hour * 24 * 7 + RefreshInterval = time.Hour +) + +type Session struct { + UserID uint64 + + key string + createdAt int64 + expiration *time.Timer + lock sync.Mutex +} + +func New(id uint64) (*Session, error) { + key, err := newSessionKey() + if err != nil { + return nil, err + } + + return &Session{ + key: key, + createdAt: time.Now().Unix(), + UserID: id, + expiration: time.NewTimer(Expiration), + }, nil +} + +func (s *Session) Init() { + exp := time.Until(time.Unix(s.createdAt, 0).Add(Expiration)) + s.expiration = time.NewTimer(exp) +} + +func (s *Session) Key() string { + s.lock.Lock() + key := s.key + s.lock.Unlock() + return key +} + +func (s *Session) SetCookie(w http.ResponseWriter, r *http.Request) { + http.SetCookie(w, &http.Cookie{ + 
Name: CookieName, + Value: s.Key(), + Path: "/", + Expires: time.Now().Add(Expiration), + HttpOnly: true, + Secure: r.TLS != nil, + }) +} + +func (s *Session) Expired() bool { + s.lock.Lock() + created := time.Unix(s.createdAt, 0) + s.lock.Unlock() + return time.Since(created) > Expiration +} + +func (s *Session) Refresh() (string, bool, error) { + s.lock.Lock() + created := time.Unix(s.createdAt, 0) + s.lock.Unlock() + + if time.Since(created) > Expiration { + return "", true, nil + } + + if time.Since(created) > RefreshInterval { + key, err := newSessionKey() + if err != nil { + return "", false, err + } + + s.expiration.Reset(Expiration) + + s.lock.Lock() + s.createdAt = time.Now().Unix() + s.key = key + s.lock.Unlock() + return key, false, nil + } + + return "", false, nil +} + +func (s *Session) WaitUntilExpiration() { + <-s.expiration.C +} + +func newSessionKey() (string, error) { + key := make([]byte, 32) + _, err := rand.Read(key) + return base64.RawURLEncoding.EncodeToString(key), err +} diff --git a/pkg/session/session.schema b/pkg/session/session.schema new file mode 100644 index 00000000..a08f1ce7 --- /dev/null +++ b/pkg/session/session.schema @@ -0,0 +1,5 @@ +struct Session { + UserID uint64 + key string + createdAt int64 +} diff --git a/pkg/session/session.schema.gen.go b/pkg/session/session.schema.gen.go new file mode 100644 index 00000000..097ecdbb --- /dev/null +++ b/pkg/session/session.schema.gen.go @@ -0,0 +1,112 @@ +package session + +import ( + "io" + "time" + "unsafe" +) + +var ( + _ = unsafe.Sizeof(0) + _ = io.ReadFull + _ = time.Now() +) + +func (d *Session) Size() (s uint64) { + + { + l := uint64(len(d.key)) + + { + + t := l + for t >= 0x80 { + t >>= 7 + s++ + } + s++ + + } + s += l + } + s += 16 + return +} +func (d *Session) Marshal(buf []byte) ([]byte, error) { + size := d.Size() + { + if uint64(cap(buf)) >= size { + buf = buf[:size] + } else { + buf = make([]byte, size) + } + } + i := uint64(0) + + { + + *(*uint64)(unsafe.Pointer(&buf[0])) = d.UserID + + } + { + l := uint64(len(d.key)) + + { + + t := uint64(l) + + for t >= 0x80 { + buf[i+8] = byte(t) | 0x80 + t >>= 7 + i++ + } + buf[i+8] = byte(t) + i++ + + } + copy(buf[i+8:], d.key) + i += l + } + { + + *(*int64)(unsafe.Pointer(&buf[i+8])) = d.createdAt + + } + return buf[:i+16], nil +} + +func (d *Session) Unmarshal(buf []byte) (uint64, error) { + i := uint64(0) + + { + + d.UserID = *(*uint64)(unsafe.Pointer(&buf[i+0])) + + } + { + l := uint64(0) + + { + + bs := uint8(7) + t := uint64(buf[i+8] & 0x7F) + for buf[i+8]&0x80 == 0x80 { + i++ + t |= uint64(buf[i+8]&0x7F) << bs + bs += 7 + } + i++ + + l = t + + } + d.key = string(buf[i+8 : i+8+l]) + i += l + } + { + + d.createdAt = *(*int64)(unsafe.Pointer(&buf[i+8])) + + } + return i + 16, nil +} diff --git a/server/auth.go b/server/auth.go index c4809329..da32abd8 100644 --- a/server/auth.go +++ b/server/auth.go @@ -3,58 +3,95 @@ package server import ( "log" "net/http" - "time" + "github.com/khlieng/dispatch/pkg/session" "github.com/khlieng/dispatch/storage" ) -const ( - cookieName = "dispatch" -) +func (d *Dispatch) handleAuth(w http.ResponseWriter, r *http.Request, createUser bool) *State { + var state *State -func handleAuth(w http.ResponseWriter, r *http.Request, createUser bool) *Session { - var session *Session - - cookie, err := r.Cookie(cookieName) + cookie, err := r.Cookie(session.CookieName) if err != nil { if createUser { - session = newUser(w, r) + state, err = d.newUser(w, r) + if err != nil { + log.Println(err) + } } } else { - session = 
sessions.get(cookie.Value) + session := d.states.getSession(cookie.Value) if session != nil { - log.Println(r.RemoteAddr, "[Auth] GET", r.URL.Path, "| Valid token | User ID:", session.user.ID) + key := session.Key() + newKey, expired, err := session.Refresh() + if err != nil { + return nil + } + + if !expired { + state = d.states.get(session.UserID) + if newKey != "" { + d.states.setSession(session) + d.states.deleteSession(key) + session.SetCookie(w, r) + } + } + } + + if state != nil { + log.Println(r.RemoteAddr, "[Auth] GET", r.URL.Path, "| Valid token | User ID:", state.user.ID) } else if createUser { - session = newUser(w, r) + state, err = d.newUser(w, r) + if err != nil { + log.Println(err) + } } } - return session + return state } -func newUser(w http.ResponseWriter, r *http.Request) *Session { - user, err := storage.NewUser() +func (d *Dispatch) newUser(w http.ResponseWriter, r *http.Request) (*State, error) { + user, err := storage.NewUser(d.Store) if err != nil { - return nil + return nil, err } - log.Println(r.RemoteAddr, "[Auth] Create session | User ID:", user.ID) - - session, err := NewSession(user) + messageStore, err := d.GetMessageStore(user) if err != nil { - return nil + return nil, err } - sessions.set(session) - go session.run() + user.SetMessageStore(messageStore) - http.SetCookie(w, &http.Cookie{ - Name: cookieName, - Value: session.id, - Path: "/", - Expires: time.Now().AddDate(0, 1, 0), - HttpOnly: true, - Secure: r.TLS != nil, - }) + search, err := d.GetMessageSearchProvider(user) + if err != nil { + return nil, err + } + user.SetMessageSearchProvider(search) - return session + log.Println(r.RemoteAddr, "[Auth] New anonymous user | ID:", user.ID) + + session, err := session.New(user.ID) + if err != nil { + return nil, err + } + d.states.setSession(session) + go d.deleteSessionWhenExpired(session) + + state := NewState(user, d) + d.states.set(state) + go state.run() + + session.SetCookie(w, r) + + return state, nil +} + +func (d *Dispatch) deleteSessionWhenExpired(session *session.Session) { + deleteSessionWhenExpired(session, d.states) +} + +func deleteSessionWhenExpired(session *session.Session, stateStore *stateStore) { + session.WaitUntilExpiration() + stateStore.deleteSession(session.Key()) } diff --git a/server/index_data.go b/server/index_data.go index 74a06e1a..8525ea9d 100644 --- a/server/index_data.go +++ b/server/index_data.go @@ -11,55 +11,29 @@ import ( ) type connectDefaults struct { - Name string `json:"name,omitempty"` - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - Channels []string `json:"channels,omitempty"` - Password bool `json:"password,omitempty"` - SSL bool `json:"ssl,omitempty"` - ReadOnly bool `json:"readonly,omitempty"` - ShowDetails bool `json:"showDetails,omitempty"` + Name string + Host string + Port int + Channels []string + Password bool + SSL bool + ReadOnly bool + ShowDetails bool } type indexData struct { - Defaults connectDefaults `json:"defaults"` - Servers []Server `json:"servers,omitempty"` - Channels []storage.Channel `json:"channels,omitempty"` + Defaults connectDefaults + Servers []Server + Channels []storage.Channel // Users in the selected channel - Users *Userlist `json:"users,omitempty"` + Users *Userlist // Last messages in the selected channel - Messages *Messages `json:"messages,omitempty"` + Messages *Messages } -func (d *indexData) addUsersAndMessages(server, channel string, session *Session) { - users := channelStore.GetUsers(server, channel) - if len(users) > 0 { - d.Users = &Userlist{ 
- Server: server, - Channel: channel, - Users: users, - } - } - - messages, hasMore, err := session.user.GetLastMessages(server, channel, 50) - if err == nil && len(messages) > 0 { - m := Messages{ - Server: server, - To: channel, - Messages: messages, - } - - if hasMore { - m.Next = messages[0].ID - } - - d.Messages = &m - } -} - -func getIndexData(r *http.Request, session *Session) *indexData { +func getIndexData(r *http.Request, state *State) *indexData { data := indexData{} data.Defaults = connectDefaults{ @@ -73,12 +47,15 @@ func getIndexData(r *http.Request, session *Session) *indexData { ShowDetails: viper.GetBool("defaults.show_details"), } - if session == nil { + if state == nil { return &data } - servers := session.user.GetServers() - connections := session.getConnectionStates() + servers, err := state.user.GetServers() + if err != nil { + return nil + } + connections := state.getConnectionStates() for _, server := range servers { server.Password = "" server.Username = "" @@ -90,7 +67,10 @@ func getIndexData(r *http.Request, session *Session) *indexData { }) } - channels := session.user.GetChannels() + channels, err := state.user.GetChannels() + if err != nil { + return nil + } for i, channel := range channels { channels[i].Topic = channelStore.GetTopic(channel.Server, channel.Name) } @@ -98,18 +78,44 @@ func getIndexData(r *http.Request, session *Session) *indexData { server, channel := getTabFromPath(r.URL.EscapedPath()) if isInChannel(channels, server, channel) { - data.addUsersAndMessages(server, channel, session) + data.addUsersAndMessages(server, channel, state) return &data } server, channel = parseTabCookie(r, r.URL.Path) if isInChannel(channels, server, channel) { - data.addUsersAndMessages(server, channel, session) + data.addUsersAndMessages(server, channel, state) } return &data } +func (d *indexData) addUsersAndMessages(server, channel string, state *State) { + users := channelStore.GetUsers(server, channel) + if len(users) > 0 { + d.Users = &Userlist{ + Server: server, + Channel: channel, + Users: users, + } + } + + messages, hasMore, err := state.user.GetLastMessages(server, channel, 50) + if err == nil && len(messages) > 0 { + m := Messages{ + Server: server, + To: channel, + Messages: messages, + } + + if hasMore { + m.Next = messages[0].ID + } + + d.Messages = &m + } +} + func isInChannel(channels []storage.Channel, server, channel string) bool { if channel != "" { for _, ch := range channels { diff --git a/server/index_data_easyjson.go b/server/index_data_easyjson.go index ad94964d..7cb35d4d 100644 --- a/server/index_data_easyjson.go +++ b/server/index_data_easyjson.go @@ -344,7 +344,7 @@ func easyjson7e607aefDecodeGithubComKhliengDispatchServer1(in *jlexer.Lexer, out out.Password = bool(in.Bool()) case "ssl": out.SSL = bool(in.Bool()) - case "readonly": + case "readOnly": out.ReadOnly = bool(in.Bool()) case "showDetails": out.ShowDetails = bool(in.Bool()) @@ -432,7 +432,7 @@ func easyjson7e607aefEncodeGithubComKhliengDispatchServer1(out *jwriter.Writer, out.Bool(bool(in.SSL)) } if in.ReadOnly { - const prefix string = ",\"readonly\":" + const prefix string = ",\"readOnly\":" if first { first = false out.RawString(prefix[1:]) diff --git a/server/irc.go b/server/irc.go index 64c1df21..dd33d743 100644 --- a/server/irc.go +++ b/server/irc.go @@ -2,61 +2,35 @@ package server import ( "crypto/tls" - "log" "net" - "github.com/khlieng/dispatch/irc" - "github.com/khlieng/dispatch/storage" "github.com/spf13/viper" + + "github.com/khlieng/dispatch/pkg/irc" + 
"github.com/khlieng/dispatch/storage" ) -func createNickInUseHandler(i *irc.Client, session *Session) func(string) string { +func createNickInUseHandler(i *irc.Client, state *State) func(string) string { return func(nick string) string { newNick := nick + "_" if newNick == i.GetNick() { - session.sendJSON("nick_fail", NickFail{ + state.sendJSON("nick_fail", NickFail{ Server: i.Host, }) } - session.printError("Nickname", nick, "is already in use, using", newNick, "instead") + state.printError("Nickname", nick, "is already in use, using", newNick, "instead") return newNick } } -func reconnectIRC() { - for _, user := range storage.LoadUsers() { - session, err := NewSession(user) - if err != nil { - log.Println(err) - continue - } - sessions.set(session) - go session.run() - - channels := user.GetChannels() - - for _, server := range user.GetServers() { - i := connectIRC(server, session) - - var joining []string - for _, channel := range channels { - if channel.Server == server.Host { - joining = append(joining, channel.Name) - } - } - i.Join(joining...) - } - } -} - -func connectIRC(server storage.Server, session *Session) *irc.Client { +func connectIRC(server *storage.Server, state *State) *irc.Client { i := irc.NewClient(server.Nick, server.Username) i.TLS = server.TLS i.Realname = server.Realname - i.HandleNickInUse = createNickInUseHandler(i, session) + i.HandleNickInUse = createNickInUseHandler(i, state) address := server.Host if server.Port != "" { @@ -83,14 +57,14 @@ func connectIRC(server storage.Server, session *Session) *irc.Client { InsecureSkipVerify: !viper.GetBool("verify_certificates"), } - if cert := session.user.GetCertificate(); cert != nil { + if cert := state.user.GetCertificate(); cert != nil { i.TLSConfig.Certificates = []tls.Certificate{*cert} } } - session.setIRC(server.Host, i) + state.setIRC(server.Host, i) i.Connect(address) - go newIRCHandler(i, session).run() + go newIRCHandler(i, state).run() return i } diff --git a/server/irc_handler.go b/server/irc_handler.go index 3e7ac711..534930b3 100644 --- a/server/irc_handler.go +++ b/server/irc_handler.go @@ -8,7 +8,7 @@ import ( "github.com/kjk/betterguid" - "github.com/khlieng/dispatch/irc" + "github.com/khlieng/dispatch/pkg/irc" "github.com/khlieng/dispatch/storage" ) @@ -17,8 +17,8 @@ var excludedErrors = []string{ } type ircHandler struct { - client *irc.Client - session *Session + client *irc.Client + state *State whois WhoisReply userBuffers map[string][]string @@ -27,10 +27,10 @@ type ircHandler struct { handlers map[string]func(*irc.Message) } -func newIRCHandler(client *irc.Client, session *Session) *ircHandler { +func newIRCHandler(client *irc.Client, state *State) *ircHandler { i := &ircHandler{ client: client, - session: session, + state: state, userBuffers: make(map[string][]string), } i.initHandlers() @@ -43,15 +43,15 @@ func (i *ircHandler) run() { select { case msg, ok := <-i.client.Messages: if !ok { - i.session.deleteIRC(i.client.Host) + i.state.deleteIRC(i.client.Host) return } i.dispatchMessage(msg) case state := <-i.client.ConnectionChanged: - i.session.sendJSON("connection_update", newConnectionUpdate(i.client.Host, state)) - i.session.setConnectionState(i.client.Host, state) + i.state.sendJSON("connection_update", newConnectionUpdate(i.client.Host, state)) + i.state.setConnectionState(i.client.Host, state) if state.Error != nil && (lastConnErr == nil || state.Error.Error() != lastConnErr.Error()) { @@ -66,7 +66,7 @@ func (i *ircHandler) run() { func (i *ircHandler) dispatchMessage(msg 
*irc.Message) { if msg.Command[0] == '4' && !isExcludedError(msg.Command) { - i.session.printError(formatIRCError(msg)) + i.state.printError(formatIRCError(msg)) } if handler, ok := i.handlers[msg.Command]; ok { @@ -75,7 +75,7 @@ func (i *ircHandler) dispatchMessage(msg *irc.Message) { } func (i *ircHandler) nick(msg *irc.Message) { - i.session.sendJSON("nick", Nick{ + i.state.sendJSON("nick", Nick{ Server: i.client.Host, Old: msg.Nick, New: msg.LastParam(), @@ -84,12 +84,12 @@ func (i *ircHandler) nick(msg *irc.Message) { channelStore.RenameUser(msg.Nick, msg.LastParam(), i.client.Host) if msg.LastParam() == i.client.GetNick() { - go i.session.user.SetNick(msg.LastParam(), i.client.Host) + go i.state.user.SetNick(msg.LastParam(), i.client.Host) } } func (i *ircHandler) join(msg *irc.Message) { - i.session.sendJSON("join", Join{ + i.state.sendJSON("join", Join{ Server: i.client.Host, User: msg.Nick, Channels: msg.Params, @@ -102,9 +102,9 @@ func (i *ircHandler) join(msg *irc.Message) { // Incase no topic is set and theres a cached one that needs to be cleared i.client.Topic(channel) - i.session.sendLastMessages(i.client.Host, channel, 50) + i.state.sendLastMessages(i.client.Host, channel, 50) - go i.session.user.AddChannel(storage.Channel{ + go i.state.user.AddChannel(&storage.Channel{ Server: i.client.Host, Name: channel, }) @@ -122,12 +122,12 @@ func (i *ircHandler) part(msg *irc.Message) { part.Reason = msg.Params[1] } - i.session.sendJSON("part", part) + i.state.sendJSON("part", part) channelStore.RemoveUser(msg.Nick, i.client.Host, part.Channel) if msg.Nick == i.client.GetNick() { - go i.session.user.RemoveChannel(i.client.Host, part.Channel) + go i.state.user.RemoveChannel(i.client.Host, part.Channel) } } @@ -139,7 +139,7 @@ func (i *ircHandler) mode(msg *irc.Message) { mode.Channel = target mode.User = msg.Params[2] - i.session.sendJSON("mode", mode) + i.state.sendJSON("mode", mode) channelStore.SetMode(i.client.Host, target, msg.Params[2], mode.Add, mode.Remove) } @@ -154,20 +154,20 @@ func (i *ircHandler) message(msg *irc.Message) { } if msg.Params[0] == i.client.GetNick() { - i.session.sendJSON("pm", message) + i.state.sendJSON("pm", message) } else { message.To = msg.Params[0] - i.session.sendJSON("message", message) + i.state.sendJSON("message", message) } if msg.Params[0] != "*" { - go i.session.user.LogMessage(message.ID, + go i.state.user.LogMessage(message.ID, i.client.Host, msg.Nick, msg.Params[0], msg.LastParam()) } } func (i *ircHandler) quit(msg *irc.Message) { - i.session.sendJSON("quit", Quit{ + i.state.sendJSON("quit", Quit{ Server: i.client.Host, User: msg.Nick, Reason: msg.LastParam(), @@ -178,15 +178,15 @@ func (i *ircHandler) quit(msg *irc.Message) { func (i *ircHandler) info(msg *irc.Message) { if msg.Command == irc.ReplyWelcome { - i.session.sendJSON("nick", Nick{ + i.state.sendJSON("nick", Nick{ Server: i.client.Host, New: msg.Params[0], }) - go i.session.user.SetNick(msg.Params[0], i.client.Host) + go i.state.user.SetNick(msg.Params[0], i.client.Host) } - i.session.sendJSON("pm", Message{ + i.state.sendJSON("pm", Message{ Server: i.client.Host, From: msg.Nick, Content: strings.Join(msg.Params[1:], " "), @@ -210,7 +210,7 @@ func (i *ircHandler) whoisChannels(msg *irc.Message) { func (i *ircHandler) whoisEnd(msg *irc.Message) { if i.whois.Nick != "" { - i.session.sendJSON("whois", i.whois) + i.state.sendJSON("whois", i.whois) } i.whois = WhoisReply{} } @@ -226,7 +226,7 @@ func (i *ircHandler) topic(msg *irc.Message) { channel = msg.Params[1] } - 
i.session.sendJSON("topic", Topic{ + i.state.sendJSON("topic", Topic{ Server: i.client.Host, Channel: channel, Topic: msg.LastParam(), @@ -239,7 +239,7 @@ func (i *ircHandler) topic(msg *irc.Message) { func (i *ircHandler) noTopic(msg *irc.Message) { channel := msg.Params[1] - i.session.sendJSON("topic", Topic{ + i.state.sendJSON("topic", Topic{ Server: i.client.Host, Channel: channel, }) @@ -257,7 +257,7 @@ func (i *ircHandler) namesEnd(msg *irc.Message) { channel := msg.Params[1] users := i.userBuffers[channel] - i.session.sendJSON("users", Userlist{ + i.state.sendJSON("users", Userlist{ Server: i.client.Host, Channel: channel, Users: users, @@ -277,18 +277,18 @@ func (i *ircHandler) motd(msg *irc.Message) { } func (i *ircHandler) motdEnd(msg *irc.Message) { - i.session.sendJSON("motd", i.motdBuffer) + i.state.sendJSON("motd", i.motdBuffer) i.motdBuffer = MOTD{} } func (i *ircHandler) badNick(msg *irc.Message) { - i.session.sendJSON("nick_fail", NickFail{ + i.state.sendJSON("nick_fail", NickFail{ Server: i.client.Host, }) } func (i *ircHandler) error(msg *irc.Message) { - i.session.printError(msg.LastParam()) + i.state.printError(msg.LastParam()) } func (i *ircHandler) initHandlers() { @@ -327,7 +327,7 @@ func (i *ircHandler) initHandlers() { func (i *ircHandler) log(v ...interface{}) { s := fmt.Sprintln(v...) - log.Println("[IRC]", i.session.user.ID, i.client.Host, s[:len(s)-1]) + log.Println("[IRC]", i.state.user.ID, i.client.Host, s[:len(s)-1]) } func parseMode(mode string) *Mode { diff --git a/server/irc_handler_test.go b/server/irc_handler_test.go index d422f8fb..6fdb3e17 100644 --- a/server/irc_handler_test.go +++ b/server/irc_handler_test.go @@ -8,8 +8,9 @@ import ( "github.com/stretchr/testify/assert" - "github.com/khlieng/dispatch/irc" + "github.com/khlieng/dispatch/pkg/irc" "github.com/khlieng/dispatch/storage" + "github.com/khlieng/dispatch/storage/boltdb" ) var user *storage.User @@ -21,11 +22,18 @@ func TestMain(m *testing.M) { } storage.Initialize(tempdir) - storage.Open() - user, err = storage.NewUser() + + db, err := boltdb.New(storage.Path.Database()) if err != nil { - os.Exit(1) + log.Fatal(err) } + + user, err = storage.NewUser(db) + if err != nil { + log.Fatal(err) + } + user.SetMessageStore(db) + channelStore = storage.NewChannelStore() code := m.Run() @@ -41,7 +49,7 @@ func dispatchMessage(msg *irc.Message) WSResponse { func dispatchMessageMulti(msg *irc.Message) chan WSResponse { c := irc.NewClient("nick", "user") c.Host = "host.com" - s, _ := NewSession(user) + s := NewState(user, nil) newIRCHandler(c, s).dispatchMessage(msg) @@ -187,7 +195,7 @@ func TestHandleIRCWelcome(t *testing.T) { func TestHandleIRCWhois(t *testing.T) { c := irc.NewClient("nick", "user") c.Host = "host.com" - s, _ := NewSession(nil) + s := NewState(nil, nil) i := newIRCHandler(c, s) i.dispatchMessage(&irc.Message{ @@ -255,7 +263,7 @@ func TestHandleIRCNoTopic(t *testing.T) { func TestHandleIRCNames(t *testing.T) { c := irc.NewClient("nick", "user") c.Host = "host.com" - s, _ := NewSession(nil) + s := NewState(nil, nil) i := newIRCHandler(c, s) i.dispatchMessage(&irc.Message{ @@ -281,7 +289,7 @@ func TestHandleIRCNames(t *testing.T) { func TestHandleIRCMotd(t *testing.T) { c := irc.NewClient("nick", "user") c.Host = "host.com" - s, _ := NewSession(nil) + s := NewState(nil, nil) i := newIRCHandler(c, s) i.dispatchMessage(&irc.Message{ @@ -308,7 +316,7 @@ func TestHandleIRCMotd(t *testing.T) { func TestHandleIRCBadNick(t *testing.T) { c := irc.NewClient("nick", "user") c.Host = "host.com" - s, 
_ := NewSession(nil) + s := NewState(nil, nil) i := newIRCHandler(c, s) i.dispatchMessage(&irc.Message{ diff --git a/server/json.go b/server/json.go index dfcf4f25..6be4ccfa 100644 --- a/server/json.go +++ b/server/json.go @@ -5,7 +5,7 @@ import ( "github.com/mailru/easyjson" - "github.com/khlieng/dispatch/irc" + "github.com/khlieng/dispatch/pkg/irc" "github.com/khlieng/dispatch/storage" ) diff --git a/server/serve_files.go b/server/serve_files.go index 4cbdfbfb..4f455289 100644 --- a/server/serve_files.go +++ b/server/serve_files.go @@ -62,7 +62,7 @@ var ( cspEnabled bool ) -func initFileServer() { +func (d *Dispatch) initFileServer() { if !viper.GetBool("dev") { data, err := assets.Asset(files[0].Asset) if err != nil { @@ -154,24 +154,24 @@ func initFileServer() { } } -func serveFiles(w http.ResponseWriter, r *http.Request) { +func (d *Dispatch) serveFiles(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/" { - serveIndex(w, r) + d.serveIndex(w, r) return } for _, file := range files { if strings.HasSuffix(r.URL.Path, file.Path) { - serveFile(w, r, file) + d.serveFile(w, r, file) return } } - serveIndex(w, r) + d.serveIndex(w, r) } -func serveIndex(w http.ResponseWriter, r *http.Request) { - session := handleAuth(w, r, false) +func (d *Dispatch) serveIndex(w http.ResponseWriter, r *http.Request) { + state := d.handleAuth(w, r, false) if cspEnabled { var connectSrc string @@ -228,10 +228,10 @@ func serveIndex(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Encoding", "gzip") gzw := gzip.NewWriter(w) - IndexTemplate(gzw, getIndexData(r, session), files[1].Path, files[0].Path) + IndexTemplate(gzw, getIndexData(r, state), files[1].Path, files[0].Path) gzw.Close() } else { - IndexTemplate(w, getIndexData(r, session), files[1].Path, files[0].Path) + IndexTemplate(w, getIndexData(r, state), files[1].Path, files[0].Path) } } @@ -246,7 +246,7 @@ func setPushCookie(w http.ResponseWriter, r *http.Request) { }) } -func serveFile(w http.ResponseWriter, r *http.Request, file *File) { +func (d *Dispatch) serveFile(w http.ResponseWriter, r *http.Request, file *File) { info, err := assets.AssetInfo(file.Asset) if err != nil { http.Error(w, "", http.StatusInternalServerError) diff --git a/server/server.go b/server/server.go index eecf7ba2..bf38f0a3 100644 --- a/server/server.go +++ b/server/server.go @@ -12,36 +12,99 @@ import ( "github.com/gorilla/websocket" "github.com/spf13/viper" - "github.com/khlieng/dispatch/letsencrypt" + "github.com/khlieng/dispatch/pkg/letsencrypt" + "github.com/khlieng/dispatch/pkg/session" "github.com/khlieng/dispatch/storage" ) -var ( - sessions *sessionStore - channelStore *storage.ChannelStore +var channelStore = storage.NewChannelStore() - upgrader = websocket.Upgrader{ +type Dispatch struct { + Store storage.Store + SessionStore storage.SessionStore + + GetMessageStore func(*storage.User) (storage.MessageStore, error) + GetMessageSearchProvider func(*storage.User) (storage.MessageSearchProvider, error) + + upgrader websocket.Upgrader + states *stateStore +} + +func (d *Dispatch) Run() { + d.upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } -) - -func Run() { - sessions = newSessionStore() - channelStore = storage.NewChannelStore() if viper.GetBool("dev") { - upgrader.CheckOrigin = func(r *http.Request) bool { + d.upgrader.CheckOrigin = func(r *http.Request) bool { return true } } - reconnectIRC() - initFileServer() - startHTTP() + session.CookieName = "dispatch" + + d.states = newStateStore(d.SessionStore) + + 
d.loadUsers() + d.initFileServer() + d.startHTTP() } -func startHTTP() { +func (d *Dispatch) loadUsers() { + users, err := storage.LoadUsers(d.Store) + if err != nil { + log.Fatal(err) + } + + log.Printf("Loading %d user(s)", len(users)) + + for i := range users { + go d.loadUser(&users[i]) + } +} + +func (d *Dispatch) loadUser(user *storage.User) { + messageStore, err := d.GetMessageStore(user) + if err != nil { + log.Fatal(err) + } + user.SetMessageStore(messageStore) + + search, err := d.GetMessageSearchProvider(user) + if err != nil { + log.Fatal(err) + } + user.SetMessageSearchProvider(search) + + state := NewState(user, d) + d.states.set(state) + go state.run() + + channels, err := user.GetChannels() + if err != nil { + log.Fatal(err) + } + + servers, err := user.GetServers() + if err != nil { + log.Fatal(err) + } + + for _, server := range servers { + i := connectIRC(&server, state) + + var joining []string + for _, channel := range channels { + if channel.Server == server.Host { + joining = append(joining, channel.Name) + } + } + i.Join(joining...) + } +} + +func (d *Dispatch) startHTTP() { port := viper.GetString("port") if viper.GetBool("https.enabled") { @@ -55,7 +118,7 @@ func startHTTP() { server := &http.Server{ Addr: ":" + portHTTPS, - Handler: http.HandlerFunc(serve), + Handler: http.HandlerFunc(d.serve), } if certExists() { @@ -71,13 +134,13 @@ func startHTTP() { go http.ListenAndServe(":80", http.HandlerFunc(letsEncryptProxy)) } - letsEncrypt, err := letsencrypt.Run(dir, domain, email, ":"+lePort) + le, err := letsencrypt.Run(dir, domain, email, ":"+lePort) if err != nil { log.Fatal(err) } server.TLSConfig = &tls.Config{ - GetCertificate: letsEncrypt.GetCertificate, + GetCertificate: le.GetCertificate, } log.Println("[HTTPS] Listening on port", portHTTPS) @@ -92,11 +155,11 @@ func startHTTP() { port = "1337" } log.Println("[HTTP] Listening on port", port) - log.Fatal(http.ListenAndServe(":"+port, http.HandlerFunc(serve))) + log.Fatal(http.ListenAndServe(":"+port, http.HandlerFunc(d.serve))) } } -func serve(w http.ResponseWriter, r *http.Request) { +func (d *Dispatch) serve(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { fail(w, http.StatusNotFound) return @@ -108,28 +171,27 @@ func serve(w http.ResponseWriter, r *http.Request) { return } - session := handleAuth(w, r, true) - - if session == nil { - log.Println("[Auth] No session") + state := d.handleAuth(w, r, true) + if state == nil { + log.Println("[Auth] No state") fail(w, http.StatusInternalServerError) return } - upgradeWS(w, r, session) + d.upgradeWS(w, r, state) } else { - serveFiles(w, r) + d.serveFiles(w, r) } } -func upgradeWS(w http.ResponseWriter, r *http.Request, session *Session) { - conn, err := upgrader.Upgrade(w, r, w.Header()) +func (d *Dispatch) upgradeWS(w http.ResponseWriter, r *http.Request, state *State) { + conn, err := d.upgrader.Upgrade(w, r, w.Header()) if err != nil { log.Println(err) return } - newWSHandler(conn, session, r).run() + newWSHandler(conn, state, r).run() } func createHTTPSRedirect(portHTTPS string) http.HandlerFunc { diff --git a/server/session.go b/server/session.go deleted file mode 100644 index b388f15d..00000000 --- a/server/session.go +++ /dev/null @@ -1,253 +0,0 @@ -package server - -import ( - "crypto/rand" - "encoding/base64" - "sync" - "time" - - "fmt" - - "github.com/khlieng/dispatch/irc" - "github.com/khlieng/dispatch/storage" -) - -const ( - AnonymousSessionExpiration = 1 * time.Minute -) - -type Session struct { - irc map[string]*irc.Client - 
connectionState map[string]irc.ConnectionState - ircLock sync.Mutex - - ws map[string]*wsConn - wsLock sync.Mutex - broadcast chan WSResponse - - id string - user *storage.User - expiration *time.Timer - reset chan time.Duration -} - -func NewSession(user *storage.User) (*Session, error) { - id, err := newSessionID() - if err != nil { - return nil, err - } - return &Session{ - irc: make(map[string]*irc.Client), - connectionState: make(map[string]irc.ConnectionState), - ws: make(map[string]*wsConn), - broadcast: make(chan WSResponse, 32), - id: id, - user: user, - expiration: time.NewTimer(AnonymousSessionExpiration), - reset: make(chan time.Duration, 1), - }, nil -} - -func newSessionID() (string, error) { - key := make([]byte, 32) - _, err := rand.Read(key) - return base64.RawURLEncoding.EncodeToString(key), err -} - -func (s *Session) getIRC(server string) (*irc.Client, bool) { - s.ircLock.Lock() - i, ok := s.irc[server] - s.ircLock.Unlock() - - return i, ok -} - -func (s *Session) setIRC(server string, i *irc.Client) { - s.ircLock.Lock() - s.irc[server] = i - s.connectionState[server] = irc.ConnectionState{ - Connected: false, - } - s.ircLock.Unlock() - - s.reset <- 0 -} - -func (s *Session) deleteIRC(server string) { - s.ircLock.Lock() - delete(s.irc, server) - delete(s.connectionState, server) - s.ircLock.Unlock() - - s.resetExpirationIfEmpty() -} - -func (s *Session) numIRC() int { - s.ircLock.Lock() - n := len(s.irc) - s.ircLock.Unlock() - - return n -} - -func (s *Session) getConnectionStates() map[string]irc.ConnectionState { - s.ircLock.Lock() - state := make(map[string]irc.ConnectionState, len(s.connectionState)) - - for k, v := range s.connectionState { - state[k] = v - } - s.ircLock.Unlock() - - return state -} - -func (s *Session) setConnectionState(server string, state irc.ConnectionState) { - s.ircLock.Lock() - s.connectionState[server] = state - s.ircLock.Unlock() -} - -func (s *Session) setWS(addr string, w *wsConn) { - s.wsLock.Lock() - s.ws[addr] = w - s.wsLock.Unlock() - - s.reset <- 0 -} - -func (s *Session) deleteWS(addr string) { - s.wsLock.Lock() - delete(s.ws, addr) - s.wsLock.Unlock() - - s.resetExpirationIfEmpty() -} - -func (s *Session) numWS() int { - s.ircLock.Lock() - n := len(s.ws) - s.ircLock.Unlock() - - return n -} - -func (s *Session) sendJSON(t string, v interface{}) { - s.broadcast <- WSResponse{t, v} -} - -func (s *Session) sendError(err error, server string) { - s.sendJSON("error", Error{ - Server: server, - Message: err.Error(), - }) -} - -func (s *Session) sendLastMessages(server, channel string, count int) { - messages, hasMore, err := s.user.GetLastMessages(server, channel, count) - if err == nil && len(messages) > 0 { - res := Messages{ - Server: server, - To: channel, - Messages: messages, - } - - if hasMore { - res.Next = messages[0].ID - } - - s.sendJSON("messages", res) - } -} - -func (s *Session) sendMessages(server, channel string, count int, fromID string) { - messages, hasMore, err := s.user.GetMessages(server, channel, count, fromID) - if err == nil && len(messages) > 0 { - res := Messages{ - Server: server, - To: channel, - Messages: messages, - Prepend: true, - } - - if hasMore { - res.Next = messages[0].ID - } - - s.sendJSON("messages", res) - } -} - -func (s *Session) print(a ...interface{}) { - s.sendJSON("print", Message{ - Content: fmt.Sprintln(a...), - }) -} - -func (s *Session) printError(a ...interface{}) { - s.sendJSON("print", Message{ - Content: fmt.Sprintln(a...), - Type: "error", - }) -} - -func (s *Session) 
resetExpirationIfEmpty() { - if s.numIRC() == 0 && s.numWS() == 0 { - s.reset <- AnonymousSessionExpiration - } -} - -func (s *Session) run() { - for { - select { - case res := <-s.broadcast: - s.wsLock.Lock() - for _, ws := range s.ws { - ws.out <- res - } - s.wsLock.Unlock() - - case <-s.expiration.C: - sessions.delete(s.id) - s.user.Remove() - return - - case duration := <-s.reset: - if duration == 0 { - s.expiration.Stop() - } else { - s.expiration.Reset(duration) - } - } - } -} - -type sessionStore struct { - sessions map[string]*Session - lock sync.Mutex -} - -func newSessionStore() *sessionStore { - return &sessionStore{ - sessions: make(map[string]*Session), - } -} - -func (s *sessionStore) get(id string) *Session { - s.lock.Lock() - session := s.sessions[id] - s.lock.Unlock() - return session -} - -func (s *sessionStore) set(session *Session) { - s.lock.Lock() - s.sessions[session.id] = session - s.lock.Unlock() -} - -func (s *sessionStore) delete(id string) { - s.lock.Lock() - delete(s.sessions, id) - s.lock.Unlock() -} diff --git a/server/state.go b/server/state.go new file mode 100644 index 00000000..27ca0cf0 --- /dev/null +++ b/server/state.go @@ -0,0 +1,323 @@ +package server + +import ( + "log" + "sync" + "time" + + "fmt" + + "github.com/khlieng/dispatch/pkg/irc" + "github.com/khlieng/dispatch/pkg/session" + "github.com/khlieng/dispatch/storage" +) + +const ( + AnonymousUserExpiration = 1 * time.Minute +) + +type State struct { + irc map[string]*irc.Client + connectionState map[string]irc.ConnectionState + ircLock sync.Mutex + + ws map[string]*wsConn + wsLock sync.Mutex + broadcast chan WSResponse + + srv *Dispatch + user *storage.User + expiration *time.Timer + reset chan time.Duration +} + +func NewState(user *storage.User, srv *Dispatch) *State { + return &State{ + irc: make(map[string]*irc.Client), + connectionState: make(map[string]irc.ConnectionState), + ws: make(map[string]*wsConn), + broadcast: make(chan WSResponse, 32), + srv: srv, + user: user, + expiration: time.NewTimer(AnonymousUserExpiration), + reset: make(chan time.Duration, 1), + } +} + +func (s *State) getIRC(server string) (*irc.Client, bool) { + s.ircLock.Lock() + i, ok := s.irc[server] + s.ircLock.Unlock() + + return i, ok +} + +func (s *State) setIRC(server string, i *irc.Client) { + s.ircLock.Lock() + s.irc[server] = i + s.connectionState[server] = irc.ConnectionState{ + Connected: false, + } + s.ircLock.Unlock() + + s.reset <- 0 +} + +func (s *State) deleteIRC(server string) { + s.ircLock.Lock() + delete(s.irc, server) + delete(s.connectionState, server) + s.ircLock.Unlock() + + s.resetExpirationIfEmpty() +} + +func (s *State) numIRC() int { + s.ircLock.Lock() + n := len(s.irc) + s.ircLock.Unlock() + + return n +} + +func (s *State) getConnectionStates() map[string]irc.ConnectionState { + s.ircLock.Lock() + state := make(map[string]irc.ConnectionState, len(s.connectionState)) + + for k, v := range s.connectionState { + state[k] = v + } + s.ircLock.Unlock() + + return state +} + +func (s *State) setConnectionState(server string, state irc.ConnectionState) { + s.ircLock.Lock() + s.connectionState[server] = state + s.ircLock.Unlock() +} + +func (s *State) setWS(addr string, w *wsConn) { + s.wsLock.Lock() + s.ws[addr] = w + s.wsLock.Unlock() + + s.reset <- 0 +} + +func (s *State) deleteWS(addr string) { + s.wsLock.Lock() + delete(s.ws, addr) + s.wsLock.Unlock() + + s.resetExpirationIfEmpty() +} + +func (s *State) numWS() int { + s.ircLock.Lock() + n := len(s.ws) + s.ircLock.Unlock() + + return n +} + 
+func (s *State) sendJSON(t string, v interface{}) { + s.broadcast <- WSResponse{t, v} +} + +func (s *State) sendError(err error, server string) { + s.sendJSON("error", Error{ + Server: server, + Message: err.Error(), + }) +} + +func (s *State) sendLastMessages(server, channel string, count int) { + messages, hasMore, err := s.user.GetLastMessages(server, channel, count) + if err == nil && len(messages) > 0 { + res := Messages{ + Server: server, + To: channel, + Messages: messages, + } + + if hasMore { + res.Next = messages[0].ID + } + + s.sendJSON("messages", res) + } +} + +func (s *State) sendMessages(server, channel string, count int, fromID string) { + messages, hasMore, err := s.user.GetMessages(server, channel, count, fromID) + if err == nil && len(messages) > 0 { + res := Messages{ + Server: server, + To: channel, + Messages: messages, + Prepend: true, + } + + if hasMore { + res.Next = messages[0].ID + } + + s.sendJSON("messages", res) + } +} + +func (s *State) print(a ...interface{}) { + s.sendJSON("print", Message{ + Content: fmt.Sprintln(a...), + }) +} + +func (s *State) printError(a ...interface{}) { + s.sendJSON("print", Message{ + Content: fmt.Sprintln(a...), + Type: "error", + }) +} + +func (s *State) resetExpirationIfEmpty() { + if s.numIRC() == 0 && s.numWS() == 0 { + s.reset <- AnonymousUserExpiration + } +} + +func (s *State) kill() { + s.wsLock.Lock() + for _, ws := range s.ws { + ws.conn.Close() + } + s.wsLock.Unlock() + s.ircLock.Lock() + for _, i := range s.irc { + i.Quit() + } + s.ircLock.Unlock() +} + +func (s *State) run() { + for { + select { + case res := <-s.broadcast: + s.wsLock.Lock() + for _, ws := range s.ws { + ws.out <- res + } + s.wsLock.Unlock() + + case <-s.expiration.C: + s.srv.states.delete(s.user.ID) + s.user.Remove() + return + + case duration := <-s.reset: + if duration == 0 { + s.expiration.Stop() + } else { + s.expiration.Reset(duration) + } + } + } +} + +type stateStore struct { + states map[uint64]*State + sessions map[string]*session.Session + sessionStore storage.SessionStore + lock sync.Mutex +} + +func newStateStore(sessionStore storage.SessionStore) *stateStore { + store := &stateStore{ + states: make(map[uint64]*State), + sessions: make(map[string]*session.Session), + sessionStore: sessionStore, + } + + sessions, err := sessionStore.GetSessions() + if err != nil { + log.Fatal(err) + } + + for _, session := range sessions { + if !session.Expired() { + session.Init() + store.sessions[session.Key()] = &session + go deleteSessionWhenExpired(&session, store) + } else { + go sessionStore.DeleteSession(session.Key()) + } + } + + return store +} + +func (s *stateStore) get(id uint64) *State { + s.lock.Lock() + state := s.states[id] + s.lock.Unlock() + return state +} + +func (s *stateStore) set(state *State) { + s.lock.Lock() + s.states[state.user.ID] = state + s.lock.Unlock() +} + +func (s *stateStore) delete(id uint64) { + s.lock.Lock() + delete(s.states, id) + for key, session := range s.sessions { + if session.UserID == id { + delete(s.sessions, key) + go s.sessionStore.DeleteSession(key) + } + } + s.lock.Unlock() +} + +func (s *stateStore) getSession(key string) *session.Session { + s.lock.Lock() + session := s.sessions[key] + s.lock.Unlock() + return session +} + +func (s *stateStore) setSession(session *session.Session) { + s.lock.Lock() + s.sessions[session.Key()] = session + s.lock.Unlock() + s.sessionStore.SaveSession(session) +} + +func (s *stateStore) deleteSession(key string) { + s.lock.Lock() + id := s.sessions[key].UserID + 
delete(s.sessions, key) + n := 0 + for _, session := range s.sessions { + if session.UserID == id { + n++ + } + } + state := s.states[id] + if n == 0 { + delete(s.states, id) + } + s.lock.Unlock() + + if n == 0 { + // This anonymous user is not reachable anymore since all sessions have + // expired, so we clean it up + state.kill() + state.user.Remove() + } + + s.sessionStore.DeleteSession(key) +} diff --git a/server/websocket_handler.go b/server/websocket_handler.go index 1ee1081e..7a7fda8b 100644 --- a/server/websocket_handler.go +++ b/server/websocket_handler.go @@ -11,16 +11,16 @@ import ( type wsHandler struct { ws *wsConn - session *Session + state *State addr string handlers map[string]func([]byte) } -func newWSHandler(conn *websocket.Conn, session *Session, r *http.Request) *wsHandler { +func newWSHandler(conn *websocket.Conn, state *State, r *http.Request) *wsHandler { h := &wsHandler{ - ws: newWSConn(conn), - session: session, - addr: conn.RemoteAddr().String(), + ws: newWSConn(conn), + state: state, + addr: conn.RemoteAddr().String(), } h.init(r) h.initHandlers() @@ -35,8 +35,8 @@ func (h *wsHandler) run() { for { req, ok := <-h.ws.in if !ok { - if h.session != nil { - h.session.deleteWS(h.addr) + if h.state != nil { + h.state.deleteWS(h.addr) } return } @@ -52,13 +52,16 @@ func (h *wsHandler) dispatchRequest(req WSRequest) { } func (h *wsHandler) init(r *http.Request) { - h.session.setWS(h.addr, h.ws) + h.state.setWS(h.addr, h.ws) - log.Println(h.addr, "[Session] User ID:", h.session.user.ID, "|", - h.session.numIRC(), "IRC connections |", - h.session.numWS(), "WebSocket connections") + log.Println(h.addr, "[State] User ID:", h.state.user.ID, "|", + h.state.numIRC(), "IRC connections |", + h.state.numWS(), "WebSocket connections") - channels := h.session.user.GetChannels() + channels, err := h.state.user.GetChannels() + if err != nil { + log.Println(err) + } path := r.URL.EscapedPath()[3:] pathServer, pathChannel := getTabFromPath(path) cookieServer, cookieChannel := parseTabCookie(r, path) @@ -66,16 +69,17 @@ func (h *wsHandler) init(r *http.Request) { for _, channel := range channels { if (channel.Server == pathServer && channel.Name == pathChannel) || (channel.Server == cookieServer && channel.Name == cookieChannel) { + // Userlist and messages for this channel gets embedded in the index page continue } - h.session.sendJSON("users", Userlist{ + h.state.sendJSON("users", Userlist{ Server: channel.Server, Channel: channel.Name, Users: channelStore.GetUsers(channel.Server, channel.Name), }) - h.session.sendLastMessages(channel.Server, channel.Name, 50) + h.state.sendLastMessages(channel.Server, channel.Name, 50) } } @@ -83,12 +87,12 @@ func (h *wsHandler) connect(b []byte) { var data Server data.UnmarshalJSON(b) - if _, ok := h.session.getIRC(data.Host); !ok { + if _, ok := h.state.getIRC(data.Host); !ok { log.Println(h.addr, "[IRC] Add server", data.Host) - connectIRC(data.Server, h.session) + connectIRC(&data.Server, h.state) - go h.session.user.AddServer(data.Server) + go h.state.user.AddServer(&data.Server) } else { log.Println(h.addr, "[IRC]", data.Host, "already added") } @@ -98,7 +102,7 @@ func (h *wsHandler) reconnect(b []byte) { var data ReconnectSettings data.UnmarshalJSON(b) - if i, ok := h.session.getIRC(data.Server); ok && !i.Connected() { + if i, ok := h.state.getIRC(data.Server); ok && !i.Connected() { if i.TLS { i.TLSConfig.InsecureSkipVerify = data.SkipVerify } @@ -110,7 +114,7 @@ func (h *wsHandler) join(b []byte) { var data Join data.UnmarshalJSON(b) - if i, 
ok := h.session.getIRC(data.Server); ok { + if i, ok := h.state.getIRC(data.Server); ok { i.Join(data.Channels...) } } @@ -119,7 +123,7 @@ func (h *wsHandler) part(b []byte) { var data Part data.UnmarshalJSON(b) - if i, ok := h.session.getIRC(data.Server); ok { + if i, ok := h.state.getIRC(data.Server); ok { i.Part(data.Channels...) } } @@ -129,22 +133,22 @@ func (h *wsHandler) quit(b []byte) { data.UnmarshalJSON(b) log.Println(h.addr, "[IRC] Remove server", data.Server) - if i, ok := h.session.getIRC(data.Server); ok { - h.session.deleteIRC(data.Server) + if i, ok := h.state.getIRC(data.Server); ok { + h.state.deleteIRC(data.Server) i.Quit() } - go h.session.user.RemoveServer(data.Server) + go h.state.user.RemoveServer(data.Server) } func (h *wsHandler) message(b []byte) { var data Message data.UnmarshalJSON(b) - if i, ok := h.session.getIRC(data.Server); ok { + if i, ok := h.state.getIRC(data.Server); ok { i.Privmsg(data.To, data.Content) - go h.session.user.LogMessage(betterguid.New(), + go h.state.user.LogMessage(betterguid.New(), data.Server, i.GetNick(), data.To, data.Content) } } @@ -153,7 +157,7 @@ func (h *wsHandler) nick(b []byte) { var data Nick data.UnmarshalJSON(b) - if i, ok := h.session.getIRC(data.Server); ok { + if i, ok := h.state.getIRC(data.Server); ok { i.Nick(data.New) } } @@ -162,7 +166,7 @@ func (h *wsHandler) topic(b []byte) { var data Topic data.UnmarshalJSON(b) - if i, ok := h.session.getIRC(data.Server); ok { + if i, ok := h.state.getIRC(data.Server); ok { i.Topic(data.Channel, data.Topic) } } @@ -171,7 +175,7 @@ func (h *wsHandler) invite(b []byte) { var data Invite data.UnmarshalJSON(b) - if i, ok := h.session.getIRC(data.Server); ok { + if i, ok := h.state.getIRC(data.Server); ok { i.Invite(data.User, data.Channel) } } @@ -180,7 +184,7 @@ func (h *wsHandler) kick(b []byte) { var data Invite data.UnmarshalJSON(b) - if i, ok := h.session.getIRC(data.Server); ok { + if i, ok := h.state.getIRC(data.Server); ok { i.Kick(data.Channel, data.User) } } @@ -189,7 +193,7 @@ func (h *wsHandler) whois(b []byte) { var data Whois data.UnmarshalJSON(b) - if i, ok := h.session.getIRC(data.Server); ok { + if i, ok := h.state.getIRC(data.Server); ok { i.Whois(data.User) } } @@ -198,7 +202,7 @@ func (h *wsHandler) away(b []byte) { var data Away data.UnmarshalJSON(b) - if i, ok := h.session.getIRC(data.Server); ok { + if i, ok := h.state.getIRC(data.Server); ok { i.Away(data.Message) } } @@ -207,7 +211,7 @@ func (h *wsHandler) raw(b []byte) { var data Raw data.UnmarshalJSON(b) - if i, ok := h.session.getIRC(data.Server); ok { + if i, ok := h.state.getIRC(data.Server); ok { i.Write(data.Message) } } @@ -217,13 +221,13 @@ func (h *wsHandler) search(b []byte) { var data SearchRequest data.UnmarshalJSON(b) - results, err := h.session.user.SearchMessages(data.Server, data.Channel, data.Phrase) + results, err := h.state.user.SearchMessages(data.Server, data.Channel, data.Phrase) if err != nil { log.Println(err) return } - h.session.sendJSON("search", SearchResult{ + h.state.sendJSON("search", SearchResult{ Server: data.Server, Channel: data.Channel, Results: results, @@ -235,20 +239,20 @@ func (h *wsHandler) cert(b []byte) { var data ClientCert data.UnmarshalJSON(b) - err := h.session.user.SetCertificate(data.Cert, data.Key) + err := h.state.user.SetCertificate(data.Cert, data.Key) if err != nil { - h.session.sendJSON("cert_fail", Error{Message: err.Error()}) + h.state.sendJSON("cert_fail", Error{Message: err.Error()}) return } - h.session.sendJSON("cert_success", nil) + 
h.state.sendJSON("cert_success", nil) } func (h *wsHandler) fetchMessages(b []byte) { var data FetchMessages data.UnmarshalJSON(b) - h.session.sendMessages(data.Server, data.Channel, 200, data.Next) + h.state.sendMessages(data.Server, data.Channel, 200, data.Next) } func (h *wsHandler) setServerName(b []byte) { @@ -256,7 +260,7 @@ func (h *wsHandler) setServerName(b []byte) { data.UnmarshalJSON(b) if isValidServerName(data.Name) { - h.session.user.SetServerName(data.Name, data.Server) + h.state.user.SetServerName(data.Name, data.Server) } } diff --git a/storage/bleve/bleve.go b/storage/bleve/bleve.go new file mode 100644 index 00000000..af325a59 --- /dev/null +++ b/storage/bleve/bleve.go @@ -0,0 +1,79 @@ +package bleve + +import ( + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/analysis/analyzer/keyword" + + "github.com/khlieng/dispatch/storage" +) + +// Bleve implements storage.MessageSearchProvider +type Bleve struct { + index bleve.Index +} + +func New(path string) (*Bleve, error) { + index, err := bleve.Open(path) + if err == bleve.ErrorIndexPathDoesNotExist { + keywordMapping := bleve.NewTextFieldMapping() + keywordMapping.Analyzer = keyword.Name + keywordMapping.Store = false + keywordMapping.IncludeTermVectors = false + keywordMapping.IncludeInAll = false + + contentMapping := bleve.NewTextFieldMapping() + contentMapping.Analyzer = "en" + contentMapping.Store = false + contentMapping.IncludeTermVectors = false + contentMapping.IncludeInAll = false + + messageMapping := bleve.NewDocumentMapping() + messageMapping.StructTagKey = "bleve" + messageMapping.AddFieldMappingsAt("server", keywordMapping) + messageMapping.AddFieldMappingsAt("to", keywordMapping) + messageMapping.AddFieldMappingsAt("content", contentMapping) + + mapping := bleve.NewIndexMapping() + mapping.AddDocumentMapping("message", messageMapping) + + index, err = bleve.New(path, mapping) + } + if err != nil { + return nil, err + } + return &Bleve{index: index}, nil +} + +func (b *Bleve) Index(id string, message *storage.Message) error { + return b.index.Index(id, message) +} + +func (b *Bleve) SearchMessages(server, channel, q string) ([]string, error) { + serverQuery := bleve.NewMatchQuery(server) + serverQuery.SetField("server") + channelQuery := bleve.NewMatchQuery(channel) + channelQuery.SetField("to") + contentQuery := bleve.NewMatchQuery(q) + contentQuery.SetField("content") + contentQuery.SetFuzziness(2) + + query := bleve.NewBooleanQuery() + query.AddMust(serverQuery, channelQuery, contentQuery) + + search := bleve.NewSearchRequest(query) + searchResults, err := b.index.Search(search) + if err != nil { + return nil, err + } + + ids := make([]string, len(searchResults.Hits)) + for i, hit := range searchResults.Hits { + ids[i] = hit.ID + } + + return ids, nil +} + +func (b *Bleve) Close() { + b.index.Close() +} diff --git a/storage/boltdb/boltdb.go b/storage/boltdb/boltdb.go new file mode 100644 index 00000000..30231b05 --- /dev/null +++ b/storage/boltdb/boltdb.go @@ -0,0 +1,364 @@ +package boltdb + +import ( + "bytes" + "encoding/binary" + "strconv" + + "github.com/boltdb/bolt" + + "github.com/khlieng/dispatch/pkg/session" + "github.com/khlieng/dispatch/storage" +) + +var ( + bucketUsers = []byte("Users") + bucketServers = []byte("Servers") + bucketChannels = []byte("Channels") + bucketMessages = []byte("Messages") + bucketSessions = []byte("Sessions") +) + +// BoltStore implements storage.Store, storage.MessageStore and storage.SessionStore +type BoltStore struct { + db *bolt.DB +} + +func 
New(path string) (*BoltStore, error) { + db, err := bolt.Open(path, 0600, nil) + if err != nil { + return nil, err + } + + db.Update(func(tx *bolt.Tx) error { + tx.CreateBucketIfNotExists(bucketUsers) + tx.CreateBucketIfNotExists(bucketServers) + tx.CreateBucketIfNotExists(bucketChannels) + tx.CreateBucketIfNotExists(bucketMessages) + tx.CreateBucketIfNotExists(bucketSessions) + return nil + }) + + return &BoltStore{ + db, + }, nil +} + +func (s *BoltStore) Close() { + s.db.Close() +} + +func (s *BoltStore) GetUsers() ([]storage.User, error) { + var users []storage.User + + s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketUsers) + + return b.ForEach(func(k, _ []byte) error { + id := idFromBytes(k) + user := storage.User{ + ID: id, + IDBytes: make([]byte, 8), + Username: strconv.FormatUint(id, 10), + } + copy(user.IDBytes, k) + + users = append(users, user) + + return nil + }) + }) + + return users, nil +} + +func (s *BoltStore) SaveUser(user *storage.User) error { + return s.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketUsers) + + user.ID, _ = b.NextSequence() + user.Username = strconv.FormatUint(user.ID, 10) + + data, err := user.Marshal(nil) + if err != nil { + return err + } + + user.IDBytes = idToBytes(user.ID) + return b.Put(user.IDBytes, data) + }) +} + +func (s *BoltStore) DeleteUser(user *storage.User) error { + return s.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketServers) + c := b.Cursor() + + for k, _ := c.Seek(user.IDBytes); bytes.HasPrefix(k, user.IDBytes); k, _ = c.Next() { + b.Delete(k) + } + + b = tx.Bucket(bucketChannels) + c = b.Cursor() + + for k, _ := c.Seek(user.IDBytes); bytes.HasPrefix(k, user.IDBytes); k, _ = c.Next() { + b.Delete(k) + } + + return tx.Bucket(bucketUsers).Delete(user.IDBytes) + }) +} + +func (s *BoltStore) GetServers(user *storage.User) ([]storage.Server, error) { + var servers []storage.Server + + s.db.View(func(tx *bolt.Tx) error { + c := tx.Bucket(bucketServers).Cursor() + + for k, v := c.Seek(user.IDBytes); bytes.HasPrefix(k, user.IDBytes); k, v = c.Next() { + server := storage.Server{} + server.Unmarshal(v) + servers = append(servers, server) + } + + return nil + }) + + return servers, nil +} + +func (s *BoltStore) AddServer(user *storage.User, server *storage.Server) error { + return s.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketServers) + data, _ := server.Marshal(nil) + + return b.Put(serverID(user, server.Host), data) + }) +} + +func (s *BoltStore) RemoveServer(user *storage.User, address string) error { + return s.db.Batch(func(tx *bolt.Tx) error { + serverID := serverID(user, address) + tx.Bucket(bucketServers).Delete(serverID) + + b := tx.Bucket(bucketChannels) + c := b.Cursor() + + for k, _ := c.Seek(serverID); bytes.HasPrefix(k, serverID); k, _ = c.Next() { + b.Delete(k) + } + + return nil + }) +} + +func (s *BoltStore) SetNick(user *storage.User, nick, address string) error { + return s.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketServers) + id := serverID(user, address) + + server := storage.Server{} + v := b.Get(id) + if v != nil { + server.Unmarshal(v) + server.Nick = nick + + data, _ := server.Marshal(nil) + return b.Put(id, data) + } + + return nil + }) +} + +func (s *BoltStore) SetServerName(user *storage.User, name, address string) error { + return s.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketServers) + id := serverID(user, address) + + server := storage.Server{} + v := b.Get(id) + if v != nil { + server.Unmarshal(v) + server.Name = name + + data, _ := 
server.Marshal(nil) + return b.Put(id, data) + } + + return nil + }) +} + +func (s *BoltStore) GetChannels(user *storage.User) ([]storage.Channel, error) { + var channels []storage.Channel + + s.db.View(func(tx *bolt.Tx) error { + c := tx.Bucket(bucketChannels).Cursor() + + for k, v := c.Seek(user.IDBytes); bytes.HasPrefix(k, user.IDBytes); k, v = c.Next() { + channel := storage.Channel{} + channel.Unmarshal(v) + channels = append(channels, channel) + } + + return nil + }) + + return channels, nil +} + +func (s *BoltStore) AddChannel(user *storage.User, channel *storage.Channel) error { + return s.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketChannels) + data, _ := channel.Marshal(nil) + + return b.Put(channelID(user, channel.Server, channel.Name), data) + }) +} + +func (s *BoltStore) RemoveChannel(user *storage.User, server, channel string) error { + return s.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketChannels) + id := channelID(user, server, channel) + + return b.Delete(id) + }) +} + +func (s *BoltStore) LogMessage(message *storage.Message) error { + return s.db.Batch(func(tx *bolt.Tx) error { + b, err := tx.Bucket(bucketMessages).CreateBucketIfNotExists([]byte(message.Server + ":" + message.To)) + if err != nil { + return err + } + + data, err := message.Marshal(nil) + if err != nil { + return err + } + + return b.Put([]byte(message.ID), data) + }) +} + +func (s *BoltStore) GetMessages(server, channel string, count int, fromID string) ([]storage.Message, bool, error) { + messages := make([]storage.Message, count) + hasMore := false + + s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel)) + if b == nil { + return nil + } + + c := b.Cursor() + + if fromID != "" { + c.Seek([]byte(fromID)) + + for k, v := c.Prev(); count > 0 && k != nil; k, v = c.Prev() { + count-- + messages[count].Unmarshal(v) + } + } else { + for k, v := c.Last(); count > 0 && k != nil; k, v = c.Prev() { + count-- + messages[count].Unmarshal(v) + } + } + + c.Next() + k, _ := c.Prev() + hasMore = k != nil + + return nil + }) + + if count == 0 { + return messages, hasMore, nil + } else if count < len(messages) { + return messages[count:], hasMore, nil + } + + return nil, false, nil +} + +func (s *BoltStore) GetMessagesByID(server, channel string, ids []string) ([]storage.Message, error) { + messages := make([]storage.Message, len(ids)) + + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel)) + + for i, id := range ids { + messages[i].Unmarshal(b.Get([]byte(id))) + } + return nil + }) + return messages, err +} + +func (s *BoltStore) GetSessions() ([]session.Session, error) { + var sessions []session.Session + + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketSessions) + + return b.ForEach(func(_ []byte, v []byte) error { + session := session.Session{} + _, err := session.Unmarshal(v) + sessions = append(sessions, session) + return err + }) + }) + + if err != nil { + return nil, err + } + + return sessions, nil +} + +func (s *BoltStore) SaveSession(session *session.Session) error { + return s.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketSessions) + + data, err := session.Marshal(nil) + if err != nil { + return err + } + + return b.Put([]byte(session.Key()), data) + }) +} + +func (s *BoltStore) DeleteSession(key string) error { + return s.db.Batch(func(tx *bolt.Tx) error { + return tx.Bucket(bucketSessions).Delete([]byte(key)) + }) +} + +func 
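The GetMessages implementation above pages backwards through a Bolt bucket by allocating a slice of the requested size and filling it from the end while counting down. A slice-only sketch of the same bookkeeping (lastN and its inputs are illustrative, not part of the patch):

    package main

    import "fmt"

    // lastN illustrates the fill-from-the-end pattern GetMessages uses:
    // allocate a slice of the requested size, walk the source backwards while
    // decrementing count, then cut off the unfilled head if fewer items exist.
    func lastN(items []string, count int) (page []string, hasMore bool) {
        buf := make([]string, count)

        i := len(items) - 1
        for ; count > 0 && i >= 0; i-- {
            count--
            buf[count] = items[i]
        }

        // Anything left before the returned window means older entries exist.
        hasMore = i >= 0

        if count == 0 {
            return buf, hasMore
        } else if count < len(buf) {
            return buf[count:], hasMore
        }
        return nil, false
    }

    func main() {
        ids := []string{"1", "2", "3", "4", "5"}
        fmt.Println(lastN(ids, 3)) // [3 4 5] true
        fmt.Println(lastN(ids, 9)) // [1 2 3 4 5] false
    }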
serverID(user *storage.User, address string) []byte { + id := make([]byte, 8+len(address)) + copy(id, user.IDBytes) + copy(id[8:], address) + return id +} + +func channelID(user *storage.User, server, channel string) []byte { + id := make([]byte, 8+len(server)+1+len(channel)) + copy(id, user.IDBytes) + copy(id[8:], server) + copy(id[8+len(server)+1:], channel) + return id +} + +func idToBytes(i uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, i) + return b +} + +func idFromBytes(b []byte) uint64 { + return binary.BigEndian.Uint64(b) +} diff --git a/storage/storage.go b/storage/storage.go index b45a27ae..dbbc03a3 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1,53 +1,46 @@ package storage import ( - "encoding/binary" - "log" - - "github.com/boltdb/bolt" + "github.com/khlieng/dispatch/pkg/session" ) -var ( - Path directory - - db *bolt.DB - - bucketUsers = []byte("Users") - bucketServers = []byte("Servers") - bucketChannels = []byte("Channels") - bucketMessages = []byte("Messages") -) +var Path directory func Initialize(dir string) { Path = directory(dir) } -func Open() { - var err error - db, err = bolt.Open(Path.Database(), 0600, nil) - if err != nil { - log.Fatal("Could not open database:", err) - } +type Store interface { + GetUsers() ([]User, error) + SaveUser(*User) error + DeleteUser(*User) error - db.Update(func(tx *bolt.Tx) error { - tx.CreateBucketIfNotExists(bucketUsers) - tx.CreateBucketIfNotExists(bucketServers) - tx.CreateBucketIfNotExists(bucketChannels) + GetServers(*User) ([]Server, error) + AddServer(*User, *Server) error + RemoveServer(*User, string) error + SetNick(*User, string, string) error + SetServerName(*User, string, string) error - return nil - }) + GetChannels(*User) ([]Channel, error) + AddChannel(*User, *Channel) error + RemoveChannel(*User, string, string) error } -func Close() { - db.Close() +type SessionStore interface { + GetSessions() ([]session.Session, error) + SaveSession(session *session.Session) error + DeleteSession(key string) error } -func idToBytes(i uint64) []byte { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, i) - return b +type MessageStore interface { + LogMessage(message *Message) error + GetMessages(server, channel string, count int, fromID string) ([]Message, bool, error) + GetMessagesByID(server, channel string, ids []string) ([]Message, error) + Close() } -func idFromBytes(b []byte) uint64 { - return binary.BigEndian.Uint64(b) +type MessageSearchProvider interface { + SearchMessages(server, channel, q string) ([]string, error) + Index(id string, message *Message) error + Close() } diff --git a/storage/user.go b/storage/user.go index dce63916..8bb95410 100644 --- a/storage/user.go +++ b/storage/user.go @@ -1,67 +1,33 @@ package storage import ( - "bytes" "crypto/tls" "os" - "strconv" "sync" - - "github.com/blevesearch/bleve" - "github.com/boltdb/bolt" + "time" ) type User struct { ID uint64 + IDBytes []byte Username string - id []byte - messageLog *bolt.DB - messageIndex bleve.Index + store Store + messageLog MessageStore + messageIndex MessageSearchProvider certificate *tls.Certificate lock sync.Mutex } -type Server struct { - Name string `json:"name"` - Host string `json:"host"` - Port string `json:"port"` - TLS bool `json:"tls,omitempty"` - Password string `json:"password,omitempty"` - Nick string `json:"nick"` - Username string `json:"username,omitempty"` - Realname string `json:"realname,omitempty"` -} - -type Channel struct { - Server string `json:"server"` - Name string `json:"name"` - 
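For reference, the SessionStore interface introduced here is small enough that an in-memory implementation fits in a few lines; a sketch like the following could back tests without Bolt, assuming only the session.Session Key() method the Bolt code above relies on and that Session values are safe to copy once loaded:

    package memstore

    import (
        "sync"

        "github.com/khlieng/dispatch/pkg/session"
    )

    // MemorySessionStore satisfies storage.SessionStore with a plain map.
    type MemorySessionStore struct {
        mu       sync.Mutex
        sessions map[string]session.Session
    }

    func New() *MemorySessionStore {
        return &MemorySessionStore{sessions: map[string]session.Session{}}
    }

    func (m *MemorySessionStore) GetSessions() ([]session.Session, error) {
        m.mu.Lock()
        defer m.mu.Unlock()

        sessions := make([]session.Session, 0, len(m.sessions))
        for _, s := range m.sessions {
            sessions = append(sessions, s)
        }
        return sessions, nil
    }

    func (m *MemorySessionStore) SaveSession(s *session.Session) error {
        m.mu.Lock()
        defer m.mu.Unlock()

        m.sessions[s.Key()] = *s
        return nil
    }

    func (m *MemorySessionStore) DeleteSession(key string) error {
        m.mu.Lock()
        defer m.mu.Unlock()

        delete(m.sessions, key)
        return nil
    }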
Topic string `json:"topic,omitempty"` -} - -func NewUser() (*User, error) { - user := &User{} - - err := db.Batch(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketUsers) - - user.ID, _ = b.NextSequence() - user.Username = strconv.FormatUint(user.ID, 10) - - data, err := user.Marshal(nil) - if err != nil { - return err - } - - user.id = idToBytes(user.ID) - return b.Put(user.id, data) - }) +func NewUser(store Store) (*User, error) { + user := &User{store: store} + err := store.SaveUser(user) if err != nil { return nil, err } - err = user.openMessageLog() + err = os.MkdirAll(Path.User(user.Username), 0700) if err != nil { return nil, err } @@ -69,179 +35,131 @@ func NewUser() (*User, error) { return user, nil } -func LoadUsers() []*User { - var users []*User - - db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketUsers) - - b.ForEach(func(k, _ []byte) error { - id := idFromBytes(k) - user := &User{ - ID: id, - Username: strconv.FormatUint(id, 10), - id: make([]byte, 8), - } - copy(user.id, k) - - users = append(users, user) - - return nil - }) - - return nil - }) - - for _, user := range users { - user.openMessageLog() - user.loadCertificate() +func LoadUsers(store Store) ([]User, error) { + users, err := store.GetUsers() + if err != nil { + return nil, err } - return users + for i := range users { + users[i].store = store + users[i].loadCertificate() + } + + return users, nil } -func (u *User) GetServers() []Server { - var servers []Server - - db.View(func(tx *bolt.Tx) error { - c := tx.Bucket(bucketServers).Cursor() - - for k, v := c.Seek(u.id); bytes.HasPrefix(k, u.id); k, v = c.Next() { - server := Server{} - server.Unmarshal(v) - servers = append(servers, server) - } - - return nil - }) - - return servers +func (u *User) SetMessageStore(store MessageStore) { + u.messageLog = store } -func (u *User) GetChannels() []Channel { - var channels []Channel - - db.View(func(tx *bolt.Tx) error { - c := tx.Bucket(bucketChannels).Cursor() - - for k, v := c.Seek(u.id); bytes.HasPrefix(k, u.id); k, v = c.Next() { - channel := Channel{} - channel.Unmarshal(v) - channels = append(channels, channel) - } - - return nil - }) - - return channels -} - -func (u *User) AddServer(server Server) { - db.Batch(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketServers) - data, _ := server.Marshal(nil) - - b.Put(u.serverID(server.Host), data) - - return nil - }) -} - -func (u *User) AddChannel(channel Channel) { - db.Batch(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketChannels) - data, _ := channel.Marshal(nil) - - b.Put(u.channelID(channel.Server, channel.Name), data) - - return nil - }) -} - -func (u *User) SetNick(nick, address string) { - db.Batch(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketServers) - id := u.serverID(address) - - server := Server{} - v := b.Get(id) - if v != nil { - server.Unmarshal(v) - server.Nick = nick - - data, _ := server.Marshal(nil) - b.Put(id, data) - } - - return nil - }) -} - -func (u *User) SetServerName(name, address string) { - db.Batch(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketServers) - id := u.serverID(address) - - server := Server{} - v := b.Get(id) - if v != nil { - server.Unmarshal(v) - server.Name = name - - data, _ := server.Marshal(nil) - b.Put(id, data) - } - - return nil - }) -} - -func (u *User) RemoveServer(address string) { - db.Batch(func(tx *bolt.Tx) error { - serverID := u.serverID(address) - tx.Bucket(bucketServers).Delete(serverID) - - b := tx.Bucket(bucketChannels) - c := b.Cursor() - - for k, _ := c.Seek(serverID); bytes.HasPrefix(k, 
serverID); k, _ = c.Next() { - b.Delete(k) - } - - return nil - }) -} - -func (u *User) RemoveChannel(server, channel string) { - db.Batch(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketChannels) - id := u.channelID(server, channel) - - b.Delete(id) - - return nil - }) +func (u *User) SetMessageSearchProvider(search MessageSearchProvider) { + u.messageIndex = search } func (u *User) Remove() { - db.Batch(func(tx *bolt.Tx) error { - return tx.Bucket(bucketUsers).Delete(u.id) - }) - u.closeMessageLog() + u.store.DeleteUser(u) + if u.messageLog != nil { + u.messageLog.Close() + } + if u.messageIndex != nil { + u.messageIndex.Close() + } os.RemoveAll(Path.User(u.Username)) } -func (u *User) serverID(address string) []byte { - id := make([]byte, 8+len(address)) - copy(id, u.id) - copy(id[8:], address) - return id +type Server struct { + Name string + Host string + Port string + TLS bool + Password string + Nick string + Username string + Realname string } -func (u *User) channelID(server, channel string) []byte { - id := make([]byte, 8+len(server)+1+len(channel)) - copy(id, u.id) - copy(id[8:], server) - copy(id[8+len(server)+1:], channel) - return id +func (u *User) GetServers() ([]Server, error) { + return u.store.GetServers(u) +} + +func (u *User) AddServer(server *Server) error { + return u.store.AddServer(u, server) +} + +func (u *User) RemoveServer(address string) error { + return u.store.RemoveServer(u, address) +} + +func (u *User) SetNick(nick, address string) error { + return u.store.SetNick(u, nick, address) +} + +func (u *User) SetServerName(name, address string) error { + return u.store.SetServerName(u, name, address) +} + +type Channel struct { + Server string + Name string + Topic string +} + +func (u *User) GetChannels() ([]Channel, error) { + return u.store.GetChannels(u) +} + +func (u *User) AddChannel(channel *Channel) error { + return u.store.AddChannel(u, channel) +} + +func (u *User) RemoveChannel(server, channel string) error { + return u.store.RemoveChannel(u, server, channel) +} + +type Message struct { + ID string `json:"-" bleve:"-"` + Server string `json:"-" bleve:"server"` + From string `bleve:"-"` + To string `json:"-" bleve:"to"` + Content string `bleve:"content"` + Time int64 `bleve:"-"` +} + +func (m Message) Type() string { + return "message" +} + +func (u *User) LogMessage(id, server, from, to, content string) error { + message := &Message{ + ID: id, + Server: server, + From: from, + To: to, + Content: content, + Time: time.Now().Unix(), + } + + err := u.messageLog.LogMessage(message) + if err != nil { + return err + } + return u.messageIndex.Index(id, message) +} + +func (u *User) GetMessages(server, channel string, count int, fromID string) ([]Message, bool, error) { + return u.messageLog.GetMessages(server, channel, count, fromID) +} + +func (u *User) GetLastMessages(server, channel string, count int) ([]Message, bool, error) { + return u.GetMessages(server, channel, count, "") +} + +func (u *User) SearchMessages(server, channel, q string) ([]Message, error) { + ids, err := u.messageIndex.SearchMessages(server, channel, q) + if err != nil { + return nil, err + } + + return u.messageLog.GetMessagesByID(server, channel, ids) } diff --git a/storage/user_messages.go b/storage/user_messages.go deleted file mode 100644 index f13f3dc4..00000000 --- a/storage/user_messages.go +++ /dev/null @@ -1,191 +0,0 @@ -package storage - -import ( - "os" - "time" - - "github.com/blevesearch/bleve" - "github.com/blevesearch/bleve/analysis/analyzer/keyword" - 
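Putting the pieces together, the wiring now looks roughly like the updated user_test.go further down: open one Bolt store for users, servers, channels, messages and sessions, then attach a per-user message store and bleve search provider. A sketch under those assumptions (the data directory is a placeholder):

    package main

    import (
        "log"
        "os"

        "github.com/khlieng/dispatch/storage"
        "github.com/khlieng/dispatch/storage/bleve"
        "github.com/khlieng/dispatch/storage/boltdb"
        "github.com/kjk/betterguid"
    )

    func main() {
        dir := "/tmp/dispatch-example" // placeholder data directory
        os.MkdirAll(dir, 0700)
        storage.Initialize(dir)

        // One Bolt database backs users, servers, channels, messages and sessions.
        db, err := boltdb.New(storage.Path.Database())
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        user, err := storage.NewUser(db)
        if err != nil {
            log.Fatal(err)
        }

        // Message logging and search are pluggable per user.
        search, err := bleve.New(storage.Path.Index(user.Username))
        if err != nil {
            log.Fatal(err)
        }
        user.SetMessageStore(db)
        user.SetMessageSearchProvider(search)

        if err := user.LogMessage(betterguid.Next(), "irc.freenode.net", "bob", "#go-nuts", "hello world"); err != nil {
            log.Fatal(err)
        }

        messages, err := user.SearchMessages("irc.freenode.net", "#go-nuts", "hello")
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("found %d message(s)", len(messages))
    }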
"github.com/boltdb/bolt" -) - -type Message struct { - ID string `json:"-" bleve:"-"` - Server string `json:"-" bleve:"server"` - From string `json:"from" bleve:"-"` - To string `json:"-" bleve:"to"` - Content string `json:"content" bleve:"content"` - Time int64 `json:"time" bleve:"-"` -} - -func (m Message) Type() string { - return "message" -} - -func (u *User) LogMessage(id, server, from, to, content string) error { - message := Message{ - ID: id, - Server: server, - From: from, - To: to, - Content: content, - Time: time.Now().Unix(), - } - - err := u.messageLog.Batch(func(tx *bolt.Tx) error { - b, err := tx.Bucket(bucketMessages).CreateBucketIfNotExists([]byte(server + ":" + to)) - if err != nil { - return err - } - - data, err := message.Marshal(nil) - if err != nil { - return err - } - - return b.Put([]byte(id), data) - }) - - if err != nil { - return err - } - - return u.messageIndex.Index(id, message) -} - -func (u *User) GetLastMessages(server, channel string, count int) ([]Message, bool, error) { - return u.GetMessages(server, channel, count, "") -} - -func (u *User) GetMessages(server, channel string, count int, fromID string) ([]Message, bool, error) { - messages := make([]Message, count) - hasMore := false - - u.messageLog.View(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel)) - if b == nil { - return nil - } - - c := b.Cursor() - - if fromID != "" { - c.Seek([]byte(fromID)) - - for k, v := c.Prev(); count > 0 && k != nil; k, v = c.Prev() { - count-- - messages[count].Unmarshal(v) - } - } else { - for k, v := c.Last(); count > 0 && k != nil; k, v = c.Prev() { - count-- - messages[count].Unmarshal(v) - } - } - - c.Next() - k, _ := c.Prev() - hasMore = k != nil - - return nil - }) - - if count == 0 { - return messages, hasMore, nil - } else if count < len(messages) { - return messages[count:], hasMore, nil - } - - return nil, false, nil -} - -func (u *User) SearchMessages(server, channel, q string) ([]Message, error) { - serverQuery := bleve.NewMatchQuery(server) - serverQuery.SetField("server") - channelQuery := bleve.NewMatchQuery(channel) - channelQuery.SetField("to") - contentQuery := bleve.NewMatchQuery(q) - contentQuery.SetField("content") - contentQuery.SetFuzziness(2) - - query := bleve.NewBooleanQuery() - query.AddMust(serverQuery, channelQuery, contentQuery) - - search := bleve.NewSearchRequest(query) - searchResults, err := u.messageIndex.Search(search) - if err != nil { - return nil, err - } - - messages := []Message{} - u.messageLog.View(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketMessages).Bucket([]byte(server + ":" + channel)) - - for _, hit := range searchResults.Hits { - message := Message{} - message.Unmarshal(b.Get([]byte(hit.ID))) - messages = append(messages, message) - } - - return nil - }) - - return messages, nil -} - -func (u *User) openMessageLog() error { - err := os.MkdirAll(Path.User(u.Username), 0700) - if err != nil { - return err - } - - u.messageLog, err = bolt.Open(Path.Log(u.Username), 0600, nil) - if err != nil { - return err - } - - u.messageLog.Update(func(tx *bolt.Tx) error { - tx.CreateBucketIfNotExists(bucketMessages) - - return nil - }) - - indexPath := Path.Index(u.Username) - u.messageIndex, err = bleve.Open(indexPath) - if err == bleve.ErrorIndexPathDoesNotExist { - keywordMapping := bleve.NewTextFieldMapping() - keywordMapping.Analyzer = keyword.Name - keywordMapping.Store = false - keywordMapping.IncludeTermVectors = false - keywordMapping.IncludeInAll = false - - contentMapping 
:= bleve.NewTextFieldMapping() - contentMapping.Analyzer = "en" - contentMapping.Store = false - contentMapping.IncludeTermVectors = false - contentMapping.IncludeInAll = false - - messageMapping := bleve.NewDocumentMapping() - messageMapping.StructTagKey = "bleve" - messageMapping.AddFieldMappingsAt("server", keywordMapping) - messageMapping.AddFieldMappingsAt("to", keywordMapping) - messageMapping.AddFieldMappingsAt("content", contentMapping) - - mapping := bleve.NewIndexMapping() - mapping.AddDocumentMapping("message", messageMapping) - - u.messageIndex, err = bleve.New(indexPath, mapping) - if err != nil { - return err - } - } else if err != nil { - return err - } - - return nil -} - -func (u *User) closeMessageLog() { - u.messageLog.Close() - u.messageIndex.Close() -} diff --git a/storage/user_test.go b/storage/user_test.go index f5f71fb2..c3af91d8 100644 --- a/storage/user_test.go +++ b/storage/user_test.go @@ -1,4 +1,4 @@ -package storage +package storage_test import ( "io/ioutil" @@ -6,6 +6,9 @@ import ( "strconv" "testing" + "github.com/khlieng/dispatch/storage" + "github.com/khlieng/dispatch/storage/bleve" + "github.com/khlieng/dispatch/storage/boltdb" "github.com/kjk/betterguid" "github.com/stretchr/testify/assert" ) @@ -16,81 +19,96 @@ func tempdir() string { } func TestUser(t *testing.T) { - defer func() { - r := recover() - assert.Nil(t, r) - }() + storage.Initialize(tempdir()) - Initialize(tempdir()) - Open() + db, err := boltdb.New(storage.Path.Database()) + assert.Nil(t, err) - srv := Server{ + user, err := storage.NewUser(db) + assert.Nil(t, err) + + srv := storage.Server{ Name: "Freenode", Host: "irc.freenode.net", Nick: "test", } - chan1 := Channel{ + chan1 := storage.Channel{ Server: srv.Host, Name: "#test", } - chan2 := Channel{ + chan2 := storage.Channel{ Server: srv.Host, Name: "#testing", } - user, err := NewUser() - assert.Nil(t, err) - user.AddServer(srv) - user.AddChannel(chan1) - user.AddChannel(chan2) - user.closeMessageLog() + user.AddServer(&srv) + user.AddChannel(&chan1) + user.AddChannel(&chan2) - users := LoadUsers() + users, err := storage.LoadUsers(db) + assert.Nil(t, err) assert.Len(t, users, 1) - user = users[0] + user = &users[0] assert.Equal(t, uint64(1), user.ID) - servers := user.GetServers() + servers, err := user.GetServers() assert.Len(t, servers, 1) assert.Equal(t, srv, servers[0]) - channels := user.GetChannels() + channels, err := user.GetChannels() assert.Len(t, channels, 2) assert.Equal(t, chan1, channels[0]) assert.Equal(t, chan2, channels[1]) user.SetNick("bob", srv.Host) - assert.Equal(t, "bob", user.GetServers()[0].Nick) + servers, err = user.GetServers() + assert.Equal(t, "bob", servers[0].Nick) user.SetServerName("cake", srv.Host) - assert.Equal(t, "cake", user.GetServers()[0].Name) + servers, err = user.GetServers() + assert.Equal(t, "cake", servers[0].Name) user.RemoveChannel(srv.Host, chan1.Name) - channels = user.GetChannels() + channels, err = user.GetChannels() assert.Len(t, channels, 1) assert.Equal(t, chan2, channels[0]) user.RemoveServer(srv.Host) - assert.Len(t, user.GetServers(), 0) - assert.Len(t, user.GetChannels(), 0) + servers, err = user.GetServers() + assert.Len(t, servers, 0) + channels, err = user.GetChannels() + assert.Len(t, channels, 0) user.Remove() - _, err = os.Stat(Path.User(user.Username)) + _, err = os.Stat(storage.Path.User(user.Username)) assert.True(t, os.IsNotExist(err)) - for _, storedUser := range LoadUsers() { - assert.NotEqual(t, user.ID, storedUser.ID) + users, err = storage.LoadUsers(db) + 
assert.Nil(t, err) + + for i := range users { + assert.NotEqual(t, user.ID, users[i].ID) } } func TestMessages(t *testing.T) { - Initialize(tempdir()) - Open() + storage.Initialize(tempdir()) - user, err := NewUser() + db, err := boltdb.New(storage.Path.Database()) assert.Nil(t, err) + user, err := storage.NewUser(db) + assert.Nil(t, err) + + os.MkdirAll(storage.Path.User(user.Username), 0700) + + search, err := bleve.New(storage.Path.Index(user.Username)) + assert.Nil(t, err) + + user.SetMessageStore(db) + user.SetMessageSearchProvider(search) + messages, hasMore, err := user.GetMessages("irc.freenode.net", "#go-nuts", 10, "6") assert.Nil(t, err) assert.False(t, hasMore) @@ -152,5 +170,5 @@ func TestMessages(t *testing.T) { assert.Nil(t, err) assert.True(t, len(messages) > 0) - Close() + db.Close() } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go b/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go index fb6afd5d..6989bbc9 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/introducer.go @@ -202,6 +202,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { s.nextSnapshotEpoch++ rootPrev := s.root s.root = newSnapshot + atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch) // release lock s.rootLock.Unlock() @@ -265,6 +266,7 @@ func (s *Scorch) introducePersist(persist *persistIntroduction) { s.rootLock.Lock() rootPrev := s.root s.root = newIndexSnapshot + atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch) s.rootLock.Unlock() if rootPrev != nil { @@ -369,6 +371,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { s.nextSnapshotEpoch++ rootPrev := s.root s.root = newSnapshot + atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch) // release lock s.rootLock.Unlock() @@ -430,6 +433,8 @@ func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error { // swap in new snapshot rootPrev := s.root s.root = newSnapshot + + atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch) // release lock s.rootLock.Unlock() diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/merge.go b/vendor/github.com/blevesearch/bleve/index/scorch/merge.go index 41b734aa..171f33ae 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/merge.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/merge.go @@ -72,6 +72,8 @@ OUTER: } lastEpochMergePlanned = ourSnapshot.epoch + atomic.StoreUint64(&s.stats.LastMergedEpoch, ourSnapshot.epoch) + s.fireEvent(EventKindMergerProgress, time.Since(startTime)) } _ = ourSnapshot.DecRef() diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/persister.go b/vendor/github.com/blevesearch/bleve/index/scorch/persister.go index cbc24cdb..c822ad0b 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/persister.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/persister.go @@ -109,6 +109,8 @@ OUTER: continue OUTER } + atomic.StoreUint64(&s.stats.LastPersistedEpoch, ourSnapshot.epoch) + lastPersistedEpoch = ourSnapshot.epoch for _, ew := range persistWatchers { close(ew.notifyCh) diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go b/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go index 31d31642..fe4f71b8 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/scorch.go @@ -424,7 +424,9 @@ func (s *Scorch) Reader() (index.IndexReader, error) { func (s *Scorch) currentSnapshot() *IndexSnapshot 
{ s.rootLock.RLock() rv := s.root - rv.AddRef() + if rv != nil { + rv.AddRef() + } s.rootLock.RUnlock() return rv } @@ -508,14 +510,18 @@ func (s *Scorch) AddEligibleForRemoval(epoch uint64) { s.rootLock.Unlock() } -func (s *Scorch) MemoryUsed() uint64 { +func (s *Scorch) MemoryUsed() (memUsed uint64) { indexSnapshot := s.currentSnapshot() + if indexSnapshot == nil { + return + } + defer func() { _ = indexSnapshot.Close() }() // Account for current root snapshot overhead - memUsed := uint64(indexSnapshot.Size()) + memUsed += uint64(indexSnapshot.Size()) // Account for snapshot that the persister may be working on persistEpoch := atomic.LoadUint64(&s.iStats.persistEpoch) diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/dict.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/dict.go index 736fa59f..c73cc6e5 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/dict.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/dict.go @@ -28,11 +28,10 @@ import ( // Dictionary is the zap representation of the term dictionary type Dictionary struct { - sb *SegmentBase - field string - fieldID uint16 - fst *vellum.FST - fstReader *vellum.Reader + sb *SegmentBase + field string + fieldID uint16 + fst *vellum.FST } // PostingsList returns the postings list for the specified term @@ -47,14 +46,14 @@ func (d *Dictionary) PostingsList(term []byte, except *roaring.Bitmap, } func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) { - if d.fstReader == nil { + if d.fst == nil { if rv == nil || rv == emptyPostingsList { return emptyPostingsList, nil } return d.postingsListInit(rv, except), nil } - postingsOffset, exists, err := d.fstReader.Get(term) + postingsOffset, exists, err := d.fst.Get(term) if err != nil { return nil, fmt.Errorf("vellum err: %v", err) } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/docvalues.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/docvalues.go index d2896497..72ce1248 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/docvalues.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/docvalues.go @@ -69,9 +69,9 @@ func (di *docValueReader) cloneInto(rv *docValueReader) *docValueReader { rv.curChunkNum = math.MaxUint64 rv.chunkOffsets = di.chunkOffsets // immutable, so it's sharable rv.dvDataLoc = di.dvDataLoc - rv.curChunkHeader = nil + rv.curChunkHeader = rv.curChunkHeader[:0] rv.curChunkData = nil - rv.uncompressed = nil + rv.uncompressed = rv.uncompressed[:0] return rv } @@ -150,7 +150,11 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error chunkMetaLoc := destChunkDataLoc + uint64(read) offset := uint64(0) - di.curChunkHeader = make([]MetaData, int(numDocs)) + if cap(di.curChunkHeader) < int(numDocs) { + di.curChunkHeader = make([]MetaData, int(numDocs)) + } else { + di.curChunkHeader = di.curChunkHeader[:int(numDocs)] + } for i := 0; i < int(numDocs); i++ { di.curChunkHeader[i].DocNum, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(read) @@ -301,12 +305,5 @@ func (s *Segment) VisitDocumentFieldTerms(localDocNum uint64, fields []string, // persisted doc value terms ready to be visitable using the // VisitDocumentFieldTerms method. 
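The docvalues change above replaces a per-chunk allocation with a reslice-when-capacity-allows pattern; a tiny sketch of that pattern in isolation (reuseSlice is illustrative):

    package main

    import "fmt"

    // reuseSlice grows a scratch buffer only when its capacity is too small,
    // otherwise it reslices in place -- the same pattern docValueReader now
    // uses for curChunkHeader instead of allocating a fresh header per chunk.
    func reuseSlice(buf []int, n int) []int {
        if cap(buf) < n {
            return make([]int, n)
        }
        return buf[:n]
    }

    func main() {
        var scratch []int
        for _, n := range []int{4, 2, 8, 3} {
            scratch = reuseSlice(scratch, n)
            fmt.Println(len(scratch), cap(scratch))
        }
    }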
func (s *Segment) VisitableDocValueFields() ([]string, error) { - rv := make([]string, 0, len(s.fieldDvReaders)) - for fieldID, field := range s.fieldsInv { - if dvIter, ok := s.fieldDvReaders[uint16(fieldID)]; ok && - dvIter != nil { - rv = append(rv, field) - } - } - return rv, nil + return s.fieldDvNames, nil } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/merge.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/merge.go index dab09f6b..c735caad 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/merge.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/merge.go @@ -599,8 +599,13 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, typs := make([][]byte, len(fieldsInv)) poss := make([][][]uint64, len(fieldsInv)) + var posBuf []uint64 + docNumOffsets := make([]uint64, newSegDocCount) + vdc := visitDocumentCtxPool.Get().(*visitDocumentCtx) + defer visitDocumentCtxPool.Put(vdc) + // for each segment for segI, segment := range segments { segNewDocNums := make([]uint64, segment.numDocs) @@ -639,17 +644,32 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, metaBuf.Reset() data = data[:0] + posTemp := posBuf + // collect all the data for i := 0; i < len(fieldsInv); i++ { vals[i] = vals[i][:0] typs[i] = typs[i][:0] poss[i] = poss[i][:0] } - err := segment.VisitDocument(docNum, func(field string, typ byte, value []byte, pos []uint64) bool { + err := segment.visitDocument(vdc, docNum, func(field string, typ byte, value []byte, pos []uint64) bool { fieldID := int(fieldsMap[field]) - 1 vals[fieldID] = append(vals[fieldID], value) typs[fieldID] = append(typs[fieldID], typ) - poss[fieldID] = append(poss[fieldID], pos) + + // copy array positions to preserve them beyond the scope of this callback + var curPos []uint64 + if len(pos) > 0 { + if cap(posTemp) < len(pos) { + posBuf = make([]uint64, len(pos)*len(fieldsInv)) + posTemp = posBuf + } + curPos = posTemp[0:len(pos)] + copy(curPos, pos) + posTemp = posTemp[len(pos):] + } + poss[fieldID] = append(poss[fieldID], curPos) + return true }) if err != nil { diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/segment.go b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/segment.go index 5f9a562f..0fd4e57c 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/segment.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/segment/zap/segment.go @@ -99,6 +99,7 @@ type SegmentBase struct { docValueOffset uint64 dictLocs []uint64 fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field + fieldDvNames []string // field names cached in fieldDvReaders size uint64 } @@ -265,10 +266,6 @@ func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) { if err != nil { return nil, fmt.Errorf("dictionary field %s vellum err: %v", field, err) } - rv.fstReader, err = rv.fst.Reader() - if err != nil { - return nil, fmt.Errorf("dictionary field %s vellum Reader err: %v", field, err) - } } } } @@ -294,10 +291,15 @@ var visitDocumentCtxPool = sync.Pool{ // VisitDocument invokes the DocFieldValueVistor for each stored field // for the specified doc number func (s *SegmentBase) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error { + vdc := visitDocumentCtxPool.Get().(*visitDocumentCtx) + defer visitDocumentCtxPool.Put(vdc) + return s.visitDocument(vdc, num, visitor) +} + +func (s *SegmentBase) visitDocument(vdc *visitDocumentCtx, num 
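The segment change above routes VisitDocument through a sync.Pool-managed visitDocumentCtx so scratch buffers are reused across calls instead of being allocated per document; a generic sketch of that borrow/return pattern (visitCtx is a stand-in, not zap's type):

    package main

    import (
        "fmt"
        "sync"
    )

    // visitCtx stands in for zap's visitDocumentCtx: reusable scratch buffers
    // that would otherwise be allocated on every document visit.
    type visitCtx struct {
        buf []byte
    }

    var visitCtxPool = sync.Pool{
        New: func() interface{} { return &visitCtx{} },
    }

    // visitDocument borrows a context and returns it when done, mirroring how
    // the exported VisitDocument now wraps the internal visitDocument.
    func visitDocument(data []byte) int {
        vdc := visitCtxPool.Get().(*visitCtx)
        defer visitCtxPool.Put(vdc)

        vdc.buf = append(vdc.buf[:0], data...)
        return len(vdc.buf)
    }

    func main() {
        fmt.Println(visitDocument([]byte("stored fields")))
    }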
uint64, + visitor segment.DocumentFieldValueVisitor) error { // first make sure this is a valid number in this segment if num < s.numDocs { - vdc := visitDocumentCtxPool.Get().(*visitDocumentCtx) - meta, compressed := s.getDocStoredMetaAndCompressed(num) vdc.reader.Reset(meta) @@ -367,7 +369,6 @@ func (s *SegmentBase) VisitDocument(num uint64, visitor segment.DocumentFieldVal } vdc.buf = uncompressed - visitDocumentCtxPool.Put(vdc) } return nil } @@ -528,7 +529,12 @@ func (s *SegmentBase) loadDvReaders() error { } read += uint64(n) - s.fieldDvReaders[uint16(fieldID)], _ = s.loadFieldDocValueReader(field, fieldLocStart, fieldLocEnd) + fieldDvReader, _ := s.loadFieldDocValueReader(field, fieldLocStart, fieldLocEnd) + if fieldDvReader != nil { + s.fieldDvReaders[uint16(fieldID)] = fieldDvReader + s.fieldDvNames = append(s.fieldDvNames, field) + } } + return nil } diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go index edb986d0..6b615cd1 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_index.go @@ -15,7 +15,6 @@ package scorch import ( - "bytes" "container/heap" "encoding/binary" "fmt" @@ -314,21 +313,26 @@ func (i *IndexSnapshot) Document(id string) (rv *document.Document, err error) { segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) rv = document.NewDocument(id) - err = i.segment[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, value []byte, pos []uint64) bool { + err = i.segment[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, val []byte, pos []uint64) bool { if name == "_id" { return true } + + // copy value, array positions to preserve them beyond the scope of this callback + value := append([]byte(nil), val...) + arrayPos := append([]uint64(nil), pos...) 
+ switch typ { case 't': - rv.AddField(document.NewTextField(name, pos, value)) + rv.AddField(document.NewTextField(name, arrayPos, value)) case 'n': - rv.AddField(document.NewNumericFieldFromBytes(name, pos, value)) + rv.AddField(document.NewNumericFieldFromBytes(name, arrayPos, value)) case 'd': - rv.AddField(document.NewDateTimeFieldFromBytes(name, pos, value)) + rv.AddField(document.NewDateTimeFieldFromBytes(name, arrayPos, value)) case 'b': - rv.AddField(document.NewBooleanFieldFromBytes(name, pos, value)) + rv.AddField(document.NewBooleanFieldFromBytes(name, arrayPos, value)) case 'g': - rv.AddField(document.NewGeoPointFieldFromBytes(name, pos, value)) + rv.AddField(document.NewGeoPointFieldFromBytes(name, arrayPos, value)) } return true @@ -492,124 +496,117 @@ func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID, } func (i *IndexSnapshot) documentVisitFieldTerms(id index.IndexInternalID, - fields []string, visitor index.DocumentFieldTermVisitor, dvs segment.DocVisitState) ( - segment.DocVisitState, error) { - + fields []string, visitor index.DocumentFieldTermVisitor, + dvs segment.DocVisitState) (segment.DocVisitState, error) { docNum, err := docInternalToNumber(id) if err != nil { return nil, err } + segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) if segmentIndex >= len(i.segment) { return nil, nil } + _, dvs, err = i.documentVisitFieldTermsOnSegment( + segmentIndex, localDocNum, fields, nil, visitor, dvs) + + return dvs, err +} + +func (i *IndexSnapshot) documentVisitFieldTermsOnSegment( + segmentIndex int, localDocNum uint64, fields []string, cFields []string, + visitor index.DocumentFieldTermVisitor, dvs segment.DocVisitState) ( + cFieldsOut []string, dvsOut segment.DocVisitState, err error) { ss := i.segment[segmentIndex] - if zaps, ok := ss.segment.(segment.DocumentFieldTermVisitable); ok { - // get the list of doc value persisted fields - pFields, err := zaps.VisitableDocValueFields() + var vFields []string // fields that are visitable via the segment + + ssv, ssvOk := ss.segment.(segment.DocumentFieldTermVisitable) + if ssvOk && ssv != nil { + vFields, err = ssv.VisitableDocValueFields() if err != nil { - return nil, err + return nil, nil, err } - // assort the fields for which terms look up have to - // be performed runtime - dvPendingFields := extractDvPendingFields(fields, pFields) - // all fields are doc value persisted - if len(dvPendingFields) == 0 { - return zaps.VisitDocumentFieldTerms(localDocNum, fields, visitor, dvs) + } + + var errCh chan error + + // cFields represents the fields that we'll need from the + // cachedDocs, and might be optionally be provided by the caller, + // if the caller happens to know we're on the same segmentIndex + // from a previous invocation + if cFields == nil { + cFields = subtractStrings(fields, vFields) + + if !ss.cachedDocs.hasFields(cFields) { + errCh = make(chan error, 1) + + go func() { + err := ss.cachedDocs.prepareFields(cFields, ss) + if err != nil { + errCh <- err + } + close(errCh) + }() } + } - // concurrently trigger the runtime doc value preparations for - // pending fields as well as the visit of the persisted doc values - errCh := make(chan error, 1) - - go func() { - defer close(errCh) - err := ss.cachedDocs.prepareFields(dvPendingFields, ss) - if err != nil { - errCh <- err - } - }() - - // visit the requested persisted dv while the cache preparation in progress - dvs, err = zaps.VisitDocumentFieldTerms(localDocNum, fields, visitor, dvs) + if ssvOk && ssv != nil && 
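The Document change just above copies val and pos before retaining them, because the visitor callback may be handed buffers that the segment reuses for the next field. A small sketch of why that copy matters (collect and the toy visitor are illustrative):

    package main

    import "fmt"

    // A visitor callback may receive buffers that the caller reuses for the
    // next field, so anything kept past the callback must be copied first --
    // the same reason Document copies val and pos before AddField.
    func collect(visit func(func(name string, val []byte))) map[string][]byte {
        kept := map[string][]byte{}
        visit(func(name string, val []byte) {
            kept[name] = append([]byte(nil), val...) // copy; val is only valid during the call
        })
        return kept
    }

    func main() {
        // A toy visitor that reuses one scratch buffer for every field.
        scratch := make([]byte, 0, 16)
        visit := func(cb func(string, []byte)) {
            for _, f := range [][2]string{{"server", "freenode"}, {"to", "#go-nuts"}} {
                scratch = append(scratch[:0], f[1]...)
                cb(f[0], scratch)
            }
        }

        fmt.Printf("%q\n", collect(visit))
    }

Without the append copy, both map entries would alias the same scratch buffer and the earlier value would be clobbered.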
len(vFields) > 0 { + dvs, err = ssv.VisitDocumentFieldTerms(localDocNum, fields, visitor, dvs) if err != nil { - return nil, err + return nil, nil, err } + } - // err out if fieldCache preparation failed + if errCh != nil { err = <-errCh if err != nil { - return nil, err - } - - visitDocumentFieldCacheTerms(localDocNum, dvPendingFields, ss, visitor) - return dvs, nil - } - - return dvs, prepareCacheVisitDocumentFieldTerms(localDocNum, fields, ss, visitor) -} - -func prepareCacheVisitDocumentFieldTerms(localDocNum uint64, fields []string, - ss *SegmentSnapshot, visitor index.DocumentFieldTermVisitor) error { - err := ss.cachedDocs.prepareFields(fields, ss) - if err != nil { - return err - } - - visitDocumentFieldCacheTerms(localDocNum, fields, ss, visitor) - return nil -} - -func visitDocumentFieldCacheTerms(localDocNum uint64, fields []string, - ss *SegmentSnapshot, visitor index.DocumentFieldTermVisitor) { - - for _, field := range fields { - if cachedFieldDocs, exists := ss.cachedDocs.cache[field]; exists { - if tlist, exists := cachedFieldDocs.docs[localDocNum]; exists { - for { - i := bytes.Index(tlist, TermSeparatorSplitSlice) - if i < 0 { - break - } - visitor(field, tlist[0:i]) - tlist = tlist[i+1:] - } - } + return nil, nil, err } } -} - -func extractDvPendingFields(requestedFields, persistedFields []string) []string { - removeMap := make(map[string]struct{}, len(persistedFields)) - for _, str := range persistedFields { - removeMap[str] = struct{}{} + if len(cFields) > 0 { + ss.cachedDocs.visitDoc(localDocNum, cFields, visitor) } - rv := make([]string, 0, len(requestedFields)) - for _, s := range requestedFields { - if _, ok := removeMap[s]; !ok { - rv = append(rv, s) - } - } - return rv + return cFields, dvs, nil } -func (i *IndexSnapshot) DocValueReader(fields []string) (index.DocValueReader, error) { - return &DocValueReader{i: i, fields: fields}, nil +func (i *IndexSnapshot) DocValueReader(fields []string) ( + index.DocValueReader, error) { + return &DocValueReader{i: i, fields: fields, currSegmentIndex: -1}, nil } type DocValueReader struct { i *IndexSnapshot fields []string dvs segment.DocVisitState + + currSegmentIndex int + currCachedFields []string } func (dvr *DocValueReader) VisitDocValues(id index.IndexInternalID, visitor index.DocumentFieldTermVisitor) (err error) { - dvr.dvs, err = dvr.i.documentVisitFieldTerms(id, dvr.fields, visitor, dvr.dvs) + docNum, err := docInternalToNumber(id) + if err != nil { + return err + } + + segmentIndex, localDocNum := dvr.i.segmentIndexAndLocalDocNumFromGlobal(docNum) + if segmentIndex >= len(dvr.i.segment) { + return nil + } + + if dvr.currSegmentIndex != segmentIndex { + dvr.currSegmentIndex = segmentIndex + dvr.currCachedFields = nil + } + + dvr.currCachedFields, dvr.dvs, err = dvr.i.documentVisitFieldTermsOnSegment( + dvr.currSegmentIndex, localDocNum, dvr.fields, dvr.currCachedFields, visitor, dvr.dvs) + return err } @@ -636,3 +633,22 @@ func (i *IndexSnapshot) DumpFields() chan interface{} { }() return rv } + +// subtractStrings returns set a minus elements of set b. 
+func subtractStrings(a, b []string) []string { + if len(b) <= 0 { + return a + } + + rv := make([]string, 0, len(a)) +OUTER: + for _, as := range a { + for _, bs := range b { + if as == bs { + continue OUTER + } + } + rv = append(rv, as) + } + return rv +} diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_segment.go b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_segment.go index 4053244d..90dbcb49 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_segment.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/snapshot_segment.go @@ -15,10 +15,12 @@ package scorch import ( + "bytes" "sync" "sync/atomic" "github.com/RoaringBitmap/roaring" + "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/scorch/segment" "github.com/blevesearch/bleve/size" ) @@ -106,7 +108,6 @@ func (s *SegmentSnapshot) DocID(num uint64) ([]byte, error) { } func (s *SegmentSnapshot) Count() uint64 { - rv := s.segment.Count() if s.deleted != nil { rv -= s.deleted.GetCardinality() @@ -166,7 +167,7 @@ type cachedFieldDocs struct { size uint64 } -func (cfd *cachedFieldDocs) prepareFields(field string, ss *SegmentSnapshot) { +func (cfd *cachedFieldDocs) prepareField(field string, ss *SegmentSnapshot) { defer close(cfd.readyCh) cfd.size += uint64(size.SizeOfUint64) /* size field */ @@ -222,6 +223,7 @@ type cachedDocs struct { func (c *cachedDocs) prepareFields(wantedFields []string, ss *SegmentSnapshot) error { c.m.Lock() + if c.cache == nil { c.cache = make(map[string]*cachedFieldDocs, len(ss.Fields())) } @@ -234,7 +236,7 @@ func (c *cachedDocs) prepareFields(wantedFields []string, ss *SegmentSnapshot) e docs: make(map[uint64][]byte), } - go c.cache[field].prepareFields(field, ss) + go c.cache[field].prepareField(field, ss) } } @@ -248,12 +250,26 @@ func (c *cachedDocs) prepareFields(wantedFields []string, ss *SegmentSnapshot) e } c.m.Lock() } + c.updateSizeLOCKED() c.m.Unlock() return nil } +// hasFields returns true if the cache has all the given fields +func (c *cachedDocs) hasFields(fields []string) bool { + c.m.Lock() + for _, field := range fields { + if _, exists := c.cache[field]; !exists { + c.m.Unlock() + return false // found a field not in cache + } + } + c.m.Unlock() + return true +} + func (c *cachedDocs) Size() int { return int(atomic.LoadUint64(&c.size)) } @@ -270,3 +286,29 @@ func (c *cachedDocs) updateSizeLOCKED() { } atomic.StoreUint64(&c.size, uint64(sizeInBytes)) } + +func (c *cachedDocs) visitDoc(localDocNum uint64, + fields []string, visitor index.DocumentFieldTermVisitor) { + c.m.Lock() + + for _, field := range fields { + if cachedFieldDocs, exists := c.cache[field]; exists { + c.m.Unlock() + <-cachedFieldDocs.readyCh + c.m.Lock() + + if tlist, exists := cachedFieldDocs.docs[localDocNum]; exists { + for { + i := bytes.Index(tlist, TermSeparatorSplitSlice) + if i < 0 { + break + } + visitor(field, tlist[0:i]) + tlist = tlist[i+1:] + } + } + } + } + + c.m.Unlock() +} diff --git a/vendor/github.com/blevesearch/bleve/index/scorch/stats.go b/vendor/github.com/blevesearch/bleve/index/scorch/stats.go index e9bcd91d..d4e07f6b 100644 --- a/vendor/github.com/blevesearch/bleve/index/scorch/stats.go +++ b/vendor/github.com/blevesearch/bleve/index/scorch/stats.go @@ -33,6 +33,10 @@ type Stats struct { TotBatchIntroTime uint64 MaxBatchIntroTime uint64 + CurRootEpoch uint64 + LastPersistedEpoch uint64 + LastMergedEpoch uint64 + TotOnErrors uint64 TotAnalysisTime uint64 diff --git 
a/vendor/github.com/blevesearch/bleve/numeric/prefix_coded.go b/vendor/github.com/blevesearch/bleve/numeric/prefix_coded.go index 4200c23b..76ea001b 100644 --- a/vendor/github.com/blevesearch/bleve/numeric/prefix_coded.go +++ b/vendor/github.com/blevesearch/bleve/numeric/prefix_coded.go @@ -77,6 +77,10 @@ func (p PrefixCoded) Int64() (int64, error) { } func ValidPrefixCodedTerm(p string) (bool, int) { + return ValidPrefixCodedTermBytes([]byte(p)) +} + +func ValidPrefixCodedTermBytes(p []byte) (bool, int) { if len(p) > 0 { if p[0] < ShiftStartInt64 || p[0] > ShiftStartInt64+63 { return false, 0 diff --git a/vendor/github.com/blevesearch/bleve/search/sort.go b/vendor/github.com/blevesearch/bleve/search/sort.go index 6afc9789..e17f7078 100644 --- a/vendor/github.com/blevesearch/bleve/search/sort.go +++ b/vendor/github.com/blevesearch/bleve/search/sort.go @@ -15,6 +15,7 @@ package search import ( + "bytes" "encoding/json" "fmt" "math" @@ -342,14 +343,15 @@ type SortField struct { Type SortFieldType Mode SortFieldMode Missing SortFieldMissing - values []string + values [][]byte + tmp [][]byte } // UpdateVisitor notifies this sort field that in this document // this field has the specified term func (s *SortField) UpdateVisitor(field string, term []byte) { if field == s.Field { - s.values = append(s.values, string(term)) + s.values = append(s.values, term) } } @@ -359,7 +361,7 @@ func (s *SortField) UpdateVisitor(field string, term []byte) { func (s *SortField) Value(i *DocumentMatch) string { iTerms := s.filterTermsByType(s.values) iTerm := s.filterTermsByMode(iTerms) - s.values = nil + s.values = s.values[:0] return iTerm } @@ -368,17 +370,17 @@ func (s *SortField) Descending() bool { return s.Desc } -func (s *SortField) filterTermsByMode(terms []string) string { +func (s *SortField) filterTermsByMode(terms [][]byte) string { if len(terms) == 1 || (len(terms) > 1 && s.Mode == SortFieldDefault) { - return terms[0] + return string(terms[0]) } else if len(terms) > 1 { switch s.Mode { case SortFieldMin: - sort.Strings(terms) - return terms[0] + sort.Sort(BytesSlice(terms)) + return string(terms[0]) case SortFieldMax: - sort.Strings(terms) - return terms[len(terms)-1] + sort.Sort(BytesSlice(terms)) + return string(terms[len(terms)-1]) } } @@ -400,13 +402,13 @@ func (s *SortField) filterTermsByMode(terms []string) string { // return only the terms which had shift of 0 // if we are in explicit number or date mode, return only valid // prefix coded numbers with shift of 0 -func (s *SortField) filterTermsByType(terms []string) []string { +func (s *SortField) filterTermsByType(terms [][]byte) [][]byte { stype := s.Type if stype == SortFieldAuto { allTermsPrefixCoded := true - var termsWithShiftZero []string + termsWithShiftZero := s.tmp[:0] for _, term := range terms { - valid, shift := numeric.ValidPrefixCodedTerm(term) + valid, shift := numeric.ValidPrefixCodedTermBytes(term) if valid && shift == 0 { termsWithShiftZero = append(termsWithShiftZero, term) } else if !valid { @@ -415,16 +417,18 @@ func (s *SortField) filterTermsByType(terms []string) []string { } if allTermsPrefixCoded { terms = termsWithShiftZero + s.tmp = termsWithShiftZero[:0] } } else if stype == SortFieldAsNumber || stype == SortFieldAsDate { - var termsWithShiftZero []string + termsWithShiftZero := s.tmp[:0] for _, term := range terms { - valid, shift := numeric.ValidPrefixCodedTerm(term) + valid, shift := numeric.ValidPrefixCodedTermBytes(term) if valid && shift == 0 { termsWithShiftZero = append(termsWithShiftZero, term) } } terms 
= termsWithShiftZero + s.tmp = termsWithShiftZero[:0] } return terms } @@ -619,7 +623,7 @@ func (s *SortGeoDistance) UpdateVisitor(field string, term []byte) { func (s *SortGeoDistance) Value(i *DocumentMatch) string { iTerms := s.filterTermsByType(s.values) iTerm := s.filterTermsByMode(iTerms) - s.values = nil + s.values = s.values[:0] if iTerm == "" { return maxDistance @@ -700,3 +704,9 @@ func (s *SortGeoDistance) Copy() SearchSort { rv := *s return &rv } + +type BytesSlice [][]byte + +func (p BytesSlice) Len() int { return len(p) } +func (p BytesSlice) Less(i, j int) bool { return bytes.Compare(p[i], p[j]) < 0 } +func (p BytesSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/vendor/vendor.json b/vendor/vendor.json index 2a437f94..15a66d98 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -11,230 +11,236 @@ { "checksumSHA1": "W+LrvPPrjucuzGEmslEPztRDDOI=", "path": "github.com/blevesearch/bleve", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "XX5+Amhdr+mxVY7iDzanrQrcNyI=", "path": "github.com/blevesearch/bleve/analysis", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "OM2QW7G5DfzaUzCoe23282875TE=", "path": "github.com/blevesearch/bleve/analysis/analyzer/keyword", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "IefDmVwLU3UiILeN35DA25gPFnc=", "path": "github.com/blevesearch/bleve/analysis/analyzer/standard", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "P+ay5l3LO/xoWJXKfyK4Ma1hGvw=", "path": "github.com/blevesearch/bleve/analysis/datetime/flexible", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "uIHCAnZoB7dKDPFc3SkiO1hN4BY=", "path": "github.com/blevesearch/bleve/analysis/datetime/optional", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "AdhWAC/hkZLFXUcihmzhMspNk3w=", "path": "github.com/blevesearch/bleve/analysis/lang/en", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" + }, + { + "checksumSHA1": "5rJgE+eR0dB+cjHkENWqTKfX0T8=", + "path": "github.com/blevesearch/bleve/analysis/token/keyword", + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "3VIPkl12t1ko4y6DkbPcz+MtQjY=", "path": "github.com/blevesearch/bleve/analysis/token/lowercase", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, 
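BytesSlice above lets SortField order raw term bytes directly instead of converting every term to a string first; a usage sketch (the type is duplicated here only for illustration):

    package main

    import (
        "bytes"
        "fmt"
        "sort"
    )

    // BytesSlice mirrors the sort.Interface added in search/sort.go so
    // SortField can pick the min or max term without string conversions.
    type BytesSlice [][]byte

    func (p BytesSlice) Len() int           { return len(p) }
    func (p BytesSlice) Less(i, j int) bool { return bytes.Compare(p[i], p[j]) < 0 }
    func (p BytesSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

    func main() {
        terms := BytesSlice{[]byte("pear"), []byte("apple"), []byte("orange")}
        sort.Sort(terms)

        // SortFieldMin takes the first element, SortFieldMax the last.
        fmt.Printf("min=%s max=%s\n", terms[0], terms[len(terms)-1])
    }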
{ "checksumSHA1": "QOw3ypU4VTmFT8XYS/52P3RILZw=", "path": "github.com/blevesearch/bleve/analysis/token/porter", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "8wCAW8E4SO7gGxt0tsr4NZ4APIg=", "path": "github.com/blevesearch/bleve/analysis/token/stop", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "Lnopn2j55CFd15EBle12dzqQar8=", "path": "github.com/blevesearch/bleve/analysis/tokenizer/single", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "q7C04nlJLxKmemXLop0oyJhfi5M=", "path": "github.com/blevesearch/bleve/analysis/tokenizer/unicode", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "+vKBTffiCd1lsVOahRE1H3/eIuo=", "path": "github.com/blevesearch/bleve/document", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "8+NkVEqldBSg13whAM0Fgk0aIQU=", "path": "github.com/blevesearch/bleve/geo", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "BD1BDYaRaKBUHfeoXr7Om1G/h+k=", "path": "github.com/blevesearch/bleve/index", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { - "checksumSHA1": "lxqhrjo3SYry9yRCfuJmVqSHLAE=", + "checksumSHA1": "ksbZyEYxUW3IJzvHN+l5fDXzbH0=", "path": "github.com/blevesearch/bleve/index/scorch", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "0Ef3ooWYliWUWCa9YdNJ1T3sJFk=", "path": "github.com/blevesearch/bleve/index/scorch/mergeplan", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "gQgYsSMtCzm01zvuI52qGEPAio4=", "path": "github.com/blevesearch/bleve/index/scorch/segment", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { - "checksumSHA1": "ucFyMsvVO6Dw5kkmejVKVHDBA+I=", + "checksumSHA1": "0e/pIoPrfIu5tU511Dxv7WU3ZJk=", "path": "github.com/blevesearch/bleve/index/scorch/segment/zap", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "3ttI5qH9k/gOBaW8FJFVmOh5oIA=", 
"path": "github.com/blevesearch/bleve/index/store", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "9cJS6D7IAwrzK/opywK0ZgAmpTQ=", "path": "github.com/blevesearch/bleve/index/store/boltdb", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "yeAX9ygUYTMbFpL20NJ0MjR7u6M=", "path": "github.com/blevesearch/bleve/index/store/gtreap", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "9HX6569+W5I72PAtzoUkwi2s8xs=", "path": "github.com/blevesearch/bleve/index/upsidedown", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "7/6MZFLZzfBAsuOWTFs79xomnBE=", "path": "github.com/blevesearch/bleve/mapping", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { - "checksumSHA1": "Qyi8BmpvHc83X9J06QB7GV7O+6M=", + "checksumSHA1": "UnotAMIXNVNwOZvPeJAYFhYp9vg=", "path": "github.com/blevesearch/bleve/numeric", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "Qj1wH6TzvIl4OAiPQaFDpkWvwLM=", "path": "github.com/blevesearch/bleve/registry", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { - "checksumSHA1": "zkRYi4evy7/mBB0fGgpeT/F2lfw=", + "checksumSHA1": "1TjupJvROj0OOzdiL5OTe1JbJKg=", "path": "github.com/blevesearch/bleve/search", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "DFJ6M+PN7kH10K9ZaRoO62uMHQU=", "path": "github.com/blevesearch/bleve/search/collector", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "h38ir3/VB/uR5txN0sfk1hBrIaw=", "path": "github.com/blevesearch/bleve/search/facet", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "J/bdoPp+OZ6vSqsXF10484C7asc=", "path": "github.com/blevesearch/bleve/search/highlight", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "rAz4wfq/O/Tx5aYz/6BN09jm0io=", "path": "github.com/blevesearch/bleve/search/highlight/format/html", - "revision": 
"ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "JQCH82+IdGvTtmKn+rDxCDxISxI=", "path": "github.com/blevesearch/bleve/search/highlight/fragmenter/simple", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "/4Q1eosaGj0eU+F4YWQRdaOS5XA=", "path": "github.com/blevesearch/bleve/search/highlight/highlighter/html", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "m4s4+yGUKuSVYHDOQpzSZ8Jdeyg=", "path": "github.com/blevesearch/bleve/search/highlight/highlighter/simple", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "3c9y+4nTwE5+iW4tdAPAk9M181U=", "path": "github.com/blevesearch/bleve/search/query", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "WnfAv5lWULhk5H/DE7roBVQoJOU=", "path": "github.com/blevesearch/bleve/search/scorer", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "Lu0Efd4WmYV5ildYZ88dExUV640=", "path": "github.com/blevesearch/bleve/search/searcher", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "lycEaUs7grxzfMYWTt+p/IniQsE=", "path": "github.com/blevesearch/bleve/size", - "revision": "ecf672f9bf46edfafa0262cbe05cc943b72ff48b", - "revisionTime": "2018-05-03T18:49:31Z" + "revision": "1d6d47ed3ad966075bf9162fee4caa5d8984733c", + "revisionTime": "2018-05-25T17:44:03Z" }, { "checksumSHA1": "F6iBQThfd04TIlxT49zaPRGvlqE=",