Handle IRC client closing better
This commit is contained in:
parent
947823ab41
commit
6865bf2832
@ -15,6 +15,7 @@ func testClientSend() (*Client, chan string) {
|
|||||||
c := testClient()
|
c := testClient()
|
||||||
conn := &mockConn{hook: make(chan string, 16)}
|
conn := &mockConn{hook: make(chan string, 16)}
|
||||||
c.conn = conn
|
c.conn = conn
|
||||||
|
c.sendRecv.Add(1)
|
||||||
go c.send()
|
go c.send()
|
||||||
return c, conn.hook
|
return c, conn.hook
|
||||||
}
|
}
|
||||||
|
131
irc/conn.go
131
irc/conn.go
@ -45,6 +45,61 @@ func (c *Client) writef(format string, a ...interface{}) {
|
|||||||
fmt.Fprintf(c.conn, format+"\r\n", a...)
|
fmt.Fprintf(c.conn, format+"\r\n", a...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) run() {
|
||||||
|
c.tryConnect()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.quit:
|
||||||
|
if c.Connected() {
|
||||||
|
c.disconnect()
|
||||||
|
}
|
||||||
|
|
||||||
|
c.sendRecv.Wait()
|
||||||
|
close(c.Messages)
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-c.reconnect:
|
||||||
|
c.disconnect()
|
||||||
|
|
||||||
|
c.sendRecv.Wait()
|
||||||
|
c.reconnect = make(chan struct{})
|
||||||
|
c.once.Reset()
|
||||||
|
|
||||||
|
c.tryConnect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) disconnect() {
|
||||||
|
c.ConnectionChanged <- false
|
||||||
|
c.lock.Lock()
|
||||||
|
c.connected = false
|
||||||
|
c.lock.Unlock()
|
||||||
|
|
||||||
|
c.once.Do(c.ready.Done)
|
||||||
|
c.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) tryConnect() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.quit:
|
||||||
|
return
|
||||||
|
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
err := c.connect()
|
||||||
|
if err == nil {
|
||||||
|
c.backoff.Reset()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(c.backoff.Duration())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) connect() error {
|
func (c *Client) connect() error {
|
||||||
c.lock.Lock()
|
c.lock.Lock()
|
||||||
defer c.lock.Unlock()
|
defer c.lock.Unlock()
|
||||||
@ -72,52 +127,14 @@ func (c *Client) connect() error {
|
|||||||
c.register()
|
c.register()
|
||||||
|
|
||||||
c.ready.Add(1)
|
c.ready.Add(1)
|
||||||
|
c.sendRecv.Add(2)
|
||||||
go c.send()
|
go c.send()
|
||||||
go c.recv()
|
go c.recv()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) tryConnect() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-c.quit:
|
|
||||||
return
|
|
||||||
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
err := c.connect()
|
|
||||||
if err == nil {
|
|
||||||
c.backoff.Reset()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(c.backoff.Duration())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) run() {
|
|
||||||
c.tryConnect()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-c.quit:
|
|
||||||
c.close()
|
|
||||||
return
|
|
||||||
|
|
||||||
case <-c.reconnect:
|
|
||||||
c.sendRecv.Wait()
|
|
||||||
c.reconnect = make(chan struct{})
|
|
||||||
c.once.Reset()
|
|
||||||
|
|
||||||
c.tryConnect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) send() {
|
func (c *Client) send() {
|
||||||
c.sendRecv.Add(1)
|
|
||||||
defer c.sendRecv.Done()
|
defer c.sendRecv.Done()
|
||||||
|
|
||||||
c.ready.Wait()
|
c.ready.Wait()
|
||||||
@ -140,11 +157,6 @@ func (c *Client) send() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) recv() {
|
func (c *Client) recv() {
|
||||||
defer func() {
|
|
||||||
recover()
|
|
||||||
}()
|
|
||||||
|
|
||||||
c.sendRecv.Add(1)
|
|
||||||
defer c.sendRecv.Done()
|
defer c.sendRecv.Done()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -155,21 +167,19 @@ func (c *Client) recv() {
|
|||||||
return
|
return
|
||||||
|
|
||||||
default:
|
default:
|
||||||
c.ConnectionChanged <- false
|
|
||||||
c.lock.Lock()
|
|
||||||
c.connected = false
|
|
||||||
c.lock.Unlock()
|
|
||||||
|
|
||||||
c.once.Do(c.ready.Done)
|
|
||||||
c.conn.Close()
|
|
||||||
|
|
||||||
close(c.reconnect)
|
close(c.reconnect)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := parseMessage(line)
|
msg := parseMessage(line)
|
||||||
c.Messages <- msg
|
|
||||||
|
select {
|
||||||
|
case <-c.quit:
|
||||||
|
return
|
||||||
|
|
||||||
|
case c.Messages <- msg:
|
||||||
|
}
|
||||||
|
|
||||||
switch msg.Command {
|
switch msg.Command {
|
||||||
case Ping:
|
case Ping:
|
||||||
@ -180,18 +190,3 @@ func (c *Client) recv() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) close() {
|
|
||||||
if c.Connected() {
|
|
||||||
c.ConnectionChanged <- false
|
|
||||||
c.lock.Lock()
|
|
||||||
c.connected = false
|
|
||||||
c.lock.Unlock()
|
|
||||||
|
|
||||||
c.once.Do(c.ready.Done)
|
|
||||||
c.conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
close(c.out)
|
|
||||||
close(c.Messages)
|
|
||||||
}
|
|
||||||
|
@ -132,27 +132,14 @@ func TestRecv(t *testing.T) {
|
|||||||
c.reader = bufio.NewReader(buf)
|
c.reader = bufio.NewReader(buf)
|
||||||
|
|
||||||
c.ready.Add(1)
|
c.ready.Add(1)
|
||||||
|
c.sendRecv.Add(2)
|
||||||
go c.send()
|
go c.send()
|
||||||
close(c.quit)
|
|
||||||
go c.recv()
|
go c.recv()
|
||||||
|
|
||||||
assert.Equal(t, "PONG :test\r\n", <-conn.hook)
|
assert.Equal(t, "PONG :test\r\n", <-conn.hook)
|
||||||
assert.Equal(t, &Message{Command: "CMD"}, <-c.Messages)
|
assert.Equal(t, &Message{Command: "CMD"}, <-c.Messages)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRecvRecoversPanic(t *testing.T) {
|
|
||||||
defer func() {
|
|
||||||
assert.Nil(t, recover())
|
|
||||||
}()
|
|
||||||
|
|
||||||
c := testClient()
|
|
||||||
|
|
||||||
buf := bytes.NewBuffer([]byte("CMD\r\n"))
|
|
||||||
c.reader = bufio.NewReader(buf)
|
|
||||||
close(c.Messages)
|
|
||||||
c.recv()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRecvTriggersReconnect(t *testing.T) {
|
func TestRecvTriggersReconnect(t *testing.T) {
|
||||||
c := testClient()
|
c := testClient()
|
||||||
c.conn = &mockConn{}
|
c.conn = &mockConn{}
|
||||||
@ -161,6 +148,7 @@ func TestRecvTriggersReconnect(t *testing.T) {
|
|||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
ok := false
|
ok := false
|
||||||
go func() {
|
go func() {
|
||||||
|
c.sendRecv.Add(1)
|
||||||
c.recv()
|
c.recv()
|
||||||
_, ok = <-c.reconnect
|
_, ok = <-c.reconnect
|
||||||
close(done)
|
close(done)
|
||||||
@ -178,15 +166,16 @@ func TestRecvTriggersReconnect(t *testing.T) {
|
|||||||
|
|
||||||
func TestClose(t *testing.T) {
|
func TestClose(t *testing.T) {
|
||||||
c := testClient()
|
c := testClient()
|
||||||
c.close()
|
close(c.quit)
|
||||||
ok := false
|
ok := false
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
_, ok = <-c.out
|
|
||||||
_, ok = <-c.Messages
|
_, ok = <-c.Messages
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
c.run()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
assert.False(t, ok)
|
assert.False(t, ok)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user