Refactor websocket handler

This commit is contained in:
Ken-Håvard Lieng 2015-06-17 00:46:58 +02:00
parent 047027ddec
commit 114bf8201e
8 changed files with 257 additions and 226 deletions

File diff suppressed because one or more lines are too long

View File

@ -59,7 +59,7 @@ class Socket extends EventEmitter {
this.send('pong'); this.send('pong');
} }
this.emit(msg.type, msg.response); this.emit(msg.type, msg.data);
}; };
} }
@ -68,7 +68,7 @@ class Socket extends EventEmitter {
} }
send(type, data) { send(type, data) {
this.ws.send(JSON.stringify({ type, request: data })); this.ws.send(JSON.stringify({ type, data }));
} }
setTimeoutPing() { setTimeoutPing() {

View File

@ -43,9 +43,9 @@ func dispatchMessage(msg *irc.Message) WSResponse {
return <-s.out return <-s.out
} }
func checkResponse(t *testing.T, expectedType string, expectedResponse interface{}, res WSResponse) { func checkResponse(t *testing.T, expectedType string, expectedData interface{}, res WSResponse) {
assert.Equal(t, expectedType, res.Type) assert.Equal(t, expectedType, res.Type)
assert.Equal(t, expectedResponse, res.Response) assert.Equal(t, expectedData, res.Data)
} }
func TestHandleIRCNick(t *testing.T) { func TestHandleIRCNick(t *testing.T) {

View File

@ -8,12 +8,12 @@ import (
type WSRequest struct { type WSRequest struct {
Type string `json:"type"` Type string `json:"type"`
Request json.RawMessage `json:"request"` Data json.RawMessage `json:"data"`
} }
type WSResponse struct { type WSResponse struct {
Type string `json:"type"` Type string `json:"type"`
Response interface{} `json:"response"` Data interface{} `json:"data"`
} }
type Connect struct { type Connect struct {

View File

@ -59,7 +59,7 @@ func upgradeWS(w http.ResponseWriter, r *http.Request) {
return return
} }
handleWS(conn) newWSHandler(conn).run()
} }
func reconnect() { func reconnect() {

View File

@ -11,7 +11,7 @@ type Session struct {
irc map[string]*irc.Client irc map[string]*irc.Client
ircLock sync.Mutex ircLock sync.Mutex
ws map[string]*conn ws map[string]*wsConn
wsLock sync.Mutex wsLock sync.Mutex
out chan WSResponse out chan WSResponse
@ -21,7 +21,7 @@ type Session struct {
func NewSession() *Session { func NewSession() *Session {
return &Session{ return &Session{
irc: make(map[string]*irc.Client), irc: make(map[string]*irc.Client),
ws: make(map[string]*conn), ws: make(map[string]*wsConn),
out: make(chan WSResponse, 32), out: make(chan WSResponse, 32),
} }
} }
@ -54,7 +54,7 @@ func (s *Session) numIRC() int {
return n return n
} }
func (s *Session) setWS(addr string, w *conn) { func (s *Session) setWS(addr string, w *wsConn) {
s.wsLock.Lock() s.wsLock.Lock()
s.ws[addr] = w s.ws[addr] = w
s.wsLock.Unlock() s.wsLock.Unlock()

View File

@ -6,21 +6,21 @@ import (
"github.com/khlieng/name_pending/Godeps/_workspace/src/github.com/gorilla/websocket" "github.com/khlieng/name_pending/Godeps/_workspace/src/github.com/gorilla/websocket"
) )
type conn struct { type wsConn struct {
conn *websocket.Conn conn *websocket.Conn
in chan WSRequest in chan WSRequest
out chan WSResponse out chan WSResponse
} }
func newConn(ws *websocket.Conn) *conn { func newWSConn(conn *websocket.Conn) *wsConn {
return &conn{ return &wsConn{
conn: ws, conn: conn,
in: make(chan WSRequest, 32), in: make(chan WSRequest, 32),
out: make(chan WSResponse, 32), out: make(chan WSResponse, 32),
} }
} }
func (c *conn) send() { func (c *wsConn) send() {
var err error var err error
ping := time.Tick(20 * time.Second) ping := time.Tick(20 * time.Second)
@ -43,7 +43,7 @@ func (c *conn) send() {
} }
} }
func (c *conn) recv() { func (c *wsConn) recv() {
var req WSRequest var req WSRequest
for { for {
@ -57,6 +57,7 @@ func (c *conn) recv() {
} }
} }
func (c *conn) close() { func (c *wsConn) close() {
close(c.out) close(c.out)
c.conn.Close()
} }

View File

@ -11,82 +11,97 @@ import (
"github.com/khlieng/name_pending/storage" "github.com/khlieng/name_pending/storage"
) )
func handleWS(conn *websocket.Conn) { type wsHandler struct {
defer conn.Close() ws *wsConn
session *Session
var session *Session uuid string
var UUID string addr string
addr := conn.RemoteAddr().String() handlers map[string]func([]byte)
ws := newConn(conn)
defer ws.close()
go ws.send()
go ws.recv()
log.Println(addr, "connected")
for {
req, ok := <-ws.in
if !ok {
if session != nil {
session.deleteWS(addr)
} }
log.Println(addr, "disconnected") func newWSHandler(conn *websocket.Conn) *wsHandler {
h := &wsHandler{
ws: newWSConn(conn),
addr: conn.RemoteAddr().String(),
}
h.initHandlers()
return h
}
func (h *wsHandler) run() {
defer h.ws.close()
go h.ws.send()
go h.ws.recv()
for {
req, ok := <-h.ws.in
if !ok {
if h.session != nil {
h.session.deleteWS(h.addr)
}
return return
} }
switch req.Type { h.dispatchRequest(req)
case "uuid": }
json.Unmarshal(req.Request, &UUID) }
log.Println(addr, "set UUID", UUID) func (h *wsHandler) dispatchRequest(req WSRequest) {
if handler, ok := h.handlers[req.Type]; ok {
handler(req.Data)
}
}
func (h *wsHandler) init(b []byte) {
json.Unmarshal(b, &h.uuid)
log.Println(h.addr, "set UUID", h.uuid)
sessionLock.Lock() sessionLock.Lock()
if storedSession, exists := sessions[h.uuid]; exists {
if storedSession, exists := sessions[UUID]; exists {
sessionLock.Unlock() sessionLock.Unlock()
session = storedSession h.session = storedSession
session.setWS(addr, ws) h.session.setWS(h.addr, h.ws)
log.Println(addr, "attached to", session.numIRC(), "existing IRC connections") log.Println(h.addr, "attached to", h.session.numIRC(), "existing IRC connections")
channels := session.user.GetChannels() channels := h.session.user.GetChannels()
for i, channel := range channels { for i, channel := range channels {
channels[i].Topic = channelStore.GetTopic(channel.Server, channel.Name) channels[i].Topic = channelStore.GetTopic(channel.Server, channel.Name)
} }
session.sendJSON("channels", channels) h.session.sendJSON("channels", channels)
session.sendJSON("servers", session.user.GetServers()) h.session.sendJSON("servers", h.session.user.GetServers())
for _, channel := range channels { for _, channel := range channels {
session.sendJSON("users", Userlist{ h.session.sendJSON("users", Userlist{
Server: channel.Server, Server: channel.Server,
Channel: channel.Name, Channel: channel.Name,
Users: channelStore.GetUsers(channel.Server, channel.Name), Users: channelStore.GetUsers(channel.Server, channel.Name),
}) })
} }
} else { } else {
session = NewSession() h.session = NewSession()
session.user = storage.NewUser(UUID) h.session.user = storage.NewUser(h.uuid)
sessions[UUID] = session sessions[h.uuid] = h.session
sessionLock.Unlock() sessionLock.Unlock()
session.setWS(addr, ws) h.session.setWS(h.addr, h.ws)
session.sendJSON("servers", nil) h.session.sendJSON("servers", nil)
go session.write() go h.session.write()
}
} }
case "connect": func (h *wsHandler) connect(b []byte) {
var data Connect var data Connect
json.Unmarshal(b, &data)
json.Unmarshal(req.Request, &data) if _, ok := h.session.getIRC(data.Server); !ok {
log.Println(h.addr, "connecting to", data.Server)
if _, ok := session.getIRC(data.Server); !ok {
log.Println(addr, "connecting to", data.Server)
i := irc.NewClient(data.Nick, data.Username) i := irc.NewClient(data.Nick, data.Username)
i.TLS = data.TLS i.TLS = data.TLS
@ -94,15 +109,15 @@ func handleWS(conn *websocket.Conn) {
i.Realname = data.Realname i.Realname = data.Realname
if idx := strings.Index(data.Server, ":"); idx < 0 { if idx := strings.Index(data.Server, ":"); idx < 0 {
session.setIRC(data.Server, i) h.session.setIRC(data.Server, i)
} else { } else {
session.setIRC(data.Server[:idx], i) h.session.setIRC(data.Server[:idx], i)
} }
i.Connect(data.Server) i.Connect(data.Server)
go newIRCHandler(i, session).run() go newIRCHandler(i, h.session).run()
session.user.AddServer(storage.Server{ h.session.user.AddServer(storage.Server{
Name: data.Name, Name: data.Name,
Address: i.Host, Address: i.Host,
TLS: data.TLS, TLS: data.TLS,
@ -112,112 +127,127 @@ func handleWS(conn *websocket.Conn) {
Realname: data.Realname, Realname: data.Realname,
}) })
} else { } else {
log.Println(addr, "already connected to", data.Server) log.Println(h.addr, "already connected to", data.Server)
}
} }
case "join": func (h *wsHandler) join(b []byte) {
var data Join var data Join
json.Unmarshal(b, &data)
json.Unmarshal(req.Request, &data) if i, ok := h.session.getIRC(data.Server); ok {
if i, ok := session.getIRC(data.Server); ok {
i.Join(data.Channels...) i.Join(data.Channels...)
} }
}
case "part": func (h *wsHandler) part(b []byte) {
var data Part var data Part
json.Unmarshal(b, &data)
json.Unmarshal(req.Request, &data) if i, ok := h.session.getIRC(data.Server); ok {
if i, ok := session.getIRC(data.Server); ok {
i.Part(data.Channels...) i.Part(data.Channels...)
} }
case "quit":
var data Quit
json.Unmarshal(req.Request, &data)
if i, ok := session.getIRC(data.Server); ok {
i.Quit()
session.deleteIRC(data.Server)
channelStore.RemoveUserAll(i.GetNick(), data.Server)
session.user.RemoveServer(data.Server)
} }
case "chat": func (h *wsHandler) quit(b []byte) {
var data Quit
json.Unmarshal(b, &data)
if i, ok := h.session.getIRC(data.Server); ok {
i.Quit()
h.session.deleteIRC(data.Server)
channelStore.RemoveUserAll(i.GetNick(), data.Server)
h.session.user.RemoveServer(data.Server)
}
}
func (h *wsHandler) chat(b []byte) {
var data Chat var data Chat
json.Unmarshal(b, &data)
json.Unmarshal(req.Request, &data) if i, ok := h.session.getIRC(data.Server); ok {
if i, ok := session.getIRC(data.Server); ok {
i.Privmsg(data.To, data.Message) i.Privmsg(data.To, data.Message)
} }
case "nick":
var data Nick
json.Unmarshal(req.Request, &data)
if i, ok := session.getIRC(data.Server); ok {
i.Nick(data.New)
session.user.SetNick(data.New, data.Server)
} }
case "invite": func (h *wsHandler) nick(b []byte) {
var data Nick
json.Unmarshal(b, &data)
if i, ok := h.session.getIRC(data.Server); ok {
i.Nick(data.New)
h.session.user.SetNick(data.New, data.Server)
}
}
func (h *wsHandler) invite(b []byte) {
var data Invite var data Invite
json.Unmarshal(b, &data)
json.Unmarshal(req.Request, &data) if i, ok := h.session.getIRC(data.Server); ok {
if i, ok := session.getIRC(data.Server); ok {
i.Invite(data.User, data.Channel) i.Invite(data.User, data.Channel)
} }
}
case "kick": func (h *wsHandler) kick(b []byte) {
var data Invite var data Invite
json.Unmarshal(b, &data)
json.Unmarshal(req.Request, &data) if i, ok := h.session.getIRC(data.Server); ok {
if i, ok := session.getIRC(data.Server); ok {
i.Kick(data.Channel, data.User) i.Kick(data.Channel, data.User)
} }
}
case "whois": func (h *wsHandler) whois(b []byte) {
var data Whois var data Whois
json.Unmarshal(b, &data)
json.Unmarshal(req.Request, &data) if i, ok := h.session.getIRC(data.Server); ok {
if i, ok := session.getIRC(data.Server); ok {
i.Whois(data.User) i.Whois(data.User)
} }
case "away":
var data Away
json.Unmarshal(req.Request, &data)
if i, ok := session.getIRC(data.Server); ok {
i.Away(data.Message)
} }
case "search": func (h *wsHandler) away(b []byte) {
var data Away
json.Unmarshal(b, &data)
if i, ok := h.session.getIRC(data.Server); ok {
i.Away(data.Message)
}
}
func (h *wsHandler) search(b []byte) {
go func() { go func() {
var data SearchRequest var data SearchRequest
json.Unmarshal(b, &data)
json.Unmarshal(req.Request, &data) results, err := h.session.user.SearchMessages(data.Server, data.Channel, data.Phrase)
results, err := session.user.SearchMessages(data.Server, data.Channel, data.Phrase)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
session.sendJSON("search", SearchResult{ h.session.sendJSON("search", SearchResult{
Server: data.Server, Server: data.Server,
Channel: data.Channel, Channel: data.Channel,
Results: results, Results: results,
}) })
}() }()
} }
func (h *wsHandler) initHandlers() {
h.handlers = map[string]func([]byte){
"uuid": h.init,
"connect": h.connect,
"join": h.join,
"part": h.part,
"quit": h.quit,
"chat": h.chat,
"nick": h.nick,
"invite": h.invite,
"kick": h.kick,
"whois": h.whois,
"away": h.away,
"search": h.search,
} }
} }