Update ttrpc to d4528379866b0ce7e9d71f3eb96f0582fc

Contains the OnClose method for the client

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2018-02-06 10:27:36 -05:00
parent aa49e704e2
commit 9745a4d448
4 changed files with 23 additions and 12 deletions

View File

@ -40,5 +40,5 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944 google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4 golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4
github.com/dmcgowan/go-tar go1.10 github.com/dmcgowan/go-tar go1.10
github.com/stevvooe/ttrpc d2710463e497617f16f26d1e715a3308609e7982 github.com/stevvooe/ttrpc d4528379866b0ce7e9d71f3eb96f0582fc374577
github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16 github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"io" "io"
"net"
"sync" "sync"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -60,16 +61,18 @@ func writeMessageHeader(w io.Writer, p []byte, mh messageHeader) error {
var buffers sync.Pool var buffers sync.Pool
type channel struct { type channel struct {
conn net.Conn
bw *bufio.Writer bw *bufio.Writer
br *bufio.Reader br *bufio.Reader
hrbuf [messageHeaderLength]byte // avoid alloc when reading header hrbuf [messageHeaderLength]byte // avoid alloc when reading header
hwbuf [messageHeaderLength]byte hwbuf [messageHeaderLength]byte
} }
func newChannel(w io.Writer, r io.Reader) *channel { func newChannel(conn net.Conn) *channel {
return &channel{ return &channel{
bw: bufio.NewWriter(w), conn: conn,
br: bufio.NewReader(r), bw: bufio.NewWriter(conn),
br: bufio.NewReader(conn),
} }
} }

View File

@ -27,6 +27,7 @@ type Client struct {
closed chan struct{} closed chan struct{}
closeOnce sync.Once closeOnce sync.Once
closeFunc func()
done chan struct{} done chan struct{}
err error err error
} }
@ -35,10 +36,11 @@ func NewClient(conn net.Conn) *Client {
c := &Client{ c := &Client{
codec: codec{}, codec: codec{},
conn: conn, conn: conn,
channel: newChannel(conn, conn), channel: newChannel(conn),
calls: make(chan *callRequest), calls: make(chan *callRequest),
closed: make(chan struct{}), closed: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
closeFunc: func() {},
} }
go c.run() go c.run()
@ -113,6 +115,11 @@ func (c *Client) Close() error {
return nil return nil
} }
// OnClose allows a close func to be called when the server is closed
func (c *Client) OnClose(closer func()) {
c.closeFunc = closer
}
type message struct { type message struct {
messageHeader messageHeader
p []byte p []byte
@ -158,6 +165,7 @@ func (c *Client) run() {
defer c.conn.Close() defer c.conn.Close()
defer close(c.done) defer close(c.done)
defer c.closeFunc()
for { for {
select { select {
@ -199,7 +207,7 @@ func (c *Client) run() {
} }
// broadcast the shutdown error to the remaining waiters. // broadcast the shutdown error to the remaining waiters.
for _, waiter := range waiters { for _, waiter := range waiters {
waiter.errs <- shutdownErr waiter.errs <- c.err
} }
return return
} }

View File

@ -281,7 +281,7 @@ func (c *serverConn) run(sctx context.Context) {
) )
var ( var (
ch = newChannel(c.conn, c.conn) ch = newChannel(c.conn)
ctx, cancel = context.WithCancel(sctx) ctx, cancel = context.WithCancel(sctx)
active int active int
state connState = connStateIdle state connState = connStateIdle