update influxdb to v1.1.1 and change client to v2

This commit is contained in:
Karol Kraśkiewicz
2017-03-10 23:09:30 +01:00
parent 0ea3e9a2c1
commit 2a270e8f18
16 changed files with 1527 additions and 301 deletions

View File

@@ -181,7 +181,7 @@ func (c *Client) Query(q Query) (*Response, error) {
}
u.RawQuery = values.Encode()
req, err := http.NewRequest("GET", u.String(), nil)
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}
@@ -387,22 +387,31 @@ func (c *Client) Ping() (time.Duration, string, error) {
// Structs
// Message represents a user message.
type Message struct {
Level string `json:"level,omitempty"`
Text string `json:"text,omitempty"`
}
// Result represents a resultset returned from a single statement.
type Result struct {
Series []models.Row
Err error
Series []models.Row
Messages []*Message
Err error
}
// MarshalJSON encodes the result into JSON.
func (r *Result) MarshalJSON() ([]byte, error) {
// Define a struct that outputs "error" as a string.
var o struct {
Series []models.Row `json:"series,omitempty"`
Err string `json:"error,omitempty"`
Series []models.Row `json:"series,omitempty"`
Messages []*Message `json:"messages,omitempty"`
Err string `json:"error,omitempty"`
}
// Copy fields to output struct.
o.Series = r.Series
o.Messages = r.Messages
if r.Err != nil {
o.Err = r.Err.Error()
}
@@ -413,8 +422,9 @@ func (r *Result) MarshalJSON() ([]byte, error) {
// UnmarshalJSON decodes the data into the Result struct
func (r *Result) UnmarshalJSON(b []byte) error {
var o struct {
Series []models.Row `json:"series,omitempty"`
Err string `json:"error,omitempty"`
Series []models.Row `json:"series,omitempty"`
Messages []*Message `json:"messages,omitempty"`
Err string `json:"error,omitempty"`
}
dec := json.NewDecoder(bytes.NewBuffer(b))
@@ -424,6 +434,7 @@ func (r *Result) UnmarshalJSON(b []byte) error {
return err
}
r.Series = o.Series
r.Messages = o.Messages
if o.Err != "" {
r.Err = errors.New(o.Err)
}
@@ -487,17 +498,36 @@ func (r *Response) Error() error {
return nil
}
// duplexReader reads responses and writes it to another writer while
// satisfying the reader interface.
type duplexReader struct {
r io.Reader
w io.Writer
}
func (r *duplexReader) Read(p []byte) (n int, err error) {
n, err = r.r.Read(p)
if err == nil {
r.w.Write(p[:n])
}
return n, err
}
// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
dec *json.Decoder
duplex *duplexReader
buf bytes.Buffer
}
// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
dec := json.NewDecoder(r)
dec.UseNumber()
return &ChunkedResponse{dec: dec}
resp := &ChunkedResponse{}
resp.duplex = &duplexReader{r: r, w: &resp.buf}
resp.dec = json.NewDecoder(resp.duplex)
resp.dec.UseNumber()
return resp
}
// NextResponse reads the next line of the stream and returns a response.
@@ -507,8 +537,13 @@ func (r *ChunkedResponse) NextResponse() (*Response, error) {
if err == io.EOF {
return nil, nil
}
return nil, err
// A decoding error happened. This probably means the server crashed
// and sent a last-ditch error message to us. Ensure we have read the
// entirety of the connection to get any remaining error text.
io.Copy(ioutil.Discard, r.duplex)
return nil, errors.New(strings.TrimSpace(r.buf.String()))
}
r.buf.Reset()
return &response, nil
}
@@ -551,7 +586,7 @@ func (p *Point) MarshalJSON() ([]byte, error) {
// MarshalString renders string representation of a Point with specified
// precision. The default precision is nanoseconds.
func (p *Point) MarshalString() string {
pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time)
if err != nil {
return "# ERROR: " + err.Error() + " " + p.Measurement
}