Reconnect websockets

This commit is contained in:
Ken-Håvard Lieng 2015-06-04 02:06:17 +02:00
parent 6a1d55c968
commit be695a9881
6 changed files with 103 additions and 26 deletions

File diff suppressed because one or more lines are too long

View File

@ -26,13 +26,14 @@
"yargs": "~1.3.3"
},
"dependencies": {
"autolinker": "khlieng/Autolinker.js",
"backo": "^1.1.0",
"immutable": "~3.7.2",
"lodash": "3.8.0",
"reflux": "0.2.7",
"react-router": "0.13.3",
"react": "0.13.3",
"react-infinite": "0.3.4",
"autolinker": "khlieng/Autolinker.js",
"immutable": "~3.7.2",
"react-pure-render": "~1.0.1"
"react-pure-render": "~1.0.1",
"react-router": "0.13.3",
"reflux": "0.2.7"
}
}

View File

@ -1,23 +1,85 @@
var EventEmitter = require('events').EventEmitter;
var Backoff = require('backo');
class Socket extends EventEmitter {
constructor() {
super();
this.connectTimeout = 20000;
this.pingTimeout = 30000;
this.backoff = new Backoff({
min: 1000,
max: 5000,
jitter: 0.25
});
this.connect();
}
connect() {
this.ws = new WebSocket('ws://' + window.location.host + '/ws');
this.ws.onopen = () => this.emit('connect');
this.ws.onclose = () => this.emit('disconnect');
this.timeoutConnect = setTimeout(() => {
this.ws.close();
this.retry();
}, this.connectTimeout);
this.ws.onopen = () => {
clearTimeout(this.timeoutConnect);
this.backoff.reset();
this.emit('connect');
this.setTimeoutPing();
};
this.ws.onclose = () => {
clearTimeout(this.timeoutConnect);
clearTimeout(this.timeoutPing);
if (!this.closing) {
this.emit('disconnect');
this.retry();
}
this.closing = false;
};
this.ws.onerror = () => {
clearTimeout(this.timeoutConnect);
clearTimeout(this.timeoutPing);
this.closing = true;
this.ws.close();
this.retry();
};
this.ws.onmessage = (e) => {
this.setTimeoutPing();
var msg = JSON.parse(e.data);
if (msg.type === 'ping') {
this.send('pong');
}
this.emit(msg.type, msg.response);
};
}
retry() {
setTimeout(() => this.connect(), this.backoff.duration());
}
send(type, data) {
this.ws.send(JSON.stringify({ type, request: data }));
}
setTimeoutPing() {
clearTimeout(this.timeoutPing);
this.timeoutPing = setTimeout(() => {
this.emit('disconnect');
this.closing = true;
this.ws.close();
this.connect();
}, this.pingTimeout);
}
}
module.exports = new Socket();

View File

@ -4,8 +4,6 @@ import (
"encoding/json"
"sync"
"github.com/khlieng/name_pending/Godeps/_workspace/src/github.com/gorilla/websocket"
"github.com/khlieng/name_pending/storage"
)
@ -56,18 +54,14 @@ func (s *Session) numIRC() int {
return n
}
func (s *Session) setWS(addr string, ws *websocket.Conn) {
socket := NewWebSocket(ws)
go socket.write()
func (s *Session) setWS(addr string, w *WebSocket) {
s.wsLock.Lock()
s.ws[addr] = socket
s.ws[addr] = w
s.wsLock.Unlock()
}
func (s *Session) deleteWS(addr string) {
s.wsLock.Lock()
s.ws[addr].close()
delete(s.ws, addr)
s.wsLock.Unlock()
}

View File

@ -1,6 +1,8 @@
package server
import (
"time"
"github.com/khlieng/name_pending/Godeps/_workspace/src/github.com/gorilla/websocket"
)
@ -18,8 +20,22 @@ func NewWebSocket(ws *websocket.Conn) *WebSocket {
}
func (w *WebSocket) write() {
var err error
ping := time.Tick(20 * time.Second)
for {
err := w.conn.WriteMessage(websocket.TextMessage, <-w.Out)
select {
case msg, ok := <-w.Out:
if !ok {
return
}
err = w.conn.WriteMessage(websocket.TextMessage, msg)
case <-ping:
err = w.conn.WriteJSON(WSResponse{Type: "ping"})
}
if err != nil {
return
}

View File

@ -18,6 +18,8 @@ func handleWS(ws *websocket.Conn) {
var req WSRequest
addr := ws.RemoteAddr().String()
w := NewWebSocket(ws)
go w.write()
log.Println(addr, "connected")
@ -28,6 +30,8 @@ func handleWS(ws *websocket.Conn) {
session.deleteWS(addr)
}
w.close()
log.Println(addr, "disconnected")
return
}
@ -73,7 +77,7 @@ func handleWS(ws *websocket.Conn) {
go session.write()
}
session.setWS(addr, ws)
session.setWS(addr, w)
case "connect":
var data Connect