Bumped influxdb to 0.12.2 in Godeps

This commit is contained in:
Piotr Szczesniak
2016-08-26 14:28:05 +02:00
parent 2fb43eb68c
commit 323f2aa0db
99 changed files with 1431 additions and 35206 deletions

View File

@@ -2,28 +2,88 @@ package client
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdata/influxdb/models"
)
const (
// DefaultHost is the default host used to connect to an InfluxDB instance
DefaultHost = "localhost"
// DefaultPort is the default port used to connect to an InfluxDB instance
DefaultPort = 8086
// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
DefaultTimeout = 0
)
// Query is used to send a command to the server. Both Command and Database are required.
type Query struct {
Command string
Database string
// Chunked tells the server to send back chunked responses. This places
// less load on the server by sending back chunks of the response rather
// than waiting for the entire response all at once.
Chunked bool
// ChunkSize sets the maximum number of rows that will be returned per
// chunk. Chunks are either divided based on their series or if they hit
// the chunk size limit.
//
// Chunked must be set to true for this option to be used.
ChunkSize int
}
// ParseConnectionString will parse a string to create a valid connection URL
func ParseConnectionString(path string, ssl bool) (url.URL, error) {
var host string
var port int
h, p, err := net.SplitHostPort(path)
if err != nil {
if path == "" {
host = DefaultHost
} else {
host = path
}
// If they didn't specify a port, always use the default port
port = DefaultPort
} else {
host = h
port, err = strconv.Atoi(p)
if err != nil {
return url.URL{}, fmt.Errorf("invalid port number %q: %s\n", path, err)
}
}
u := url.URL{
Scheme: "http",
}
if ssl {
u.Scheme = "https"
}
u.Host = net.JoinHostPort(host, strconv.Itoa(port))
return u, nil
}
// Config is used to specify what server to connect to.
// URL: The URL of the server connecting to.
// Username/Password are optional. They will be passed via basic auth if provided.
// Username/Password are optional. They will be passed via basic auth if provided.
// UserAgent: If not provided, will default "InfluxDBClient",
// Timeout: If not provided, will default to 0 (no timeout)
type Config struct {
@@ -32,6 +92,15 @@ type Config struct {
Password string
UserAgent string
Timeout time.Duration
Precision string
UnsafeSsl bool
}
// NewConfig will create a config to be used in connecting to the client
func NewConfig() Config {
return Config{
Timeout: DefaultTimeout,
}
}
// Client is used to make calls to the server.
@@ -41,23 +110,40 @@ type Client struct {
password string
httpClient *http.Client
userAgent string
precision string
}
const (
ConsistencyOne = "one"
ConsistencyAll = "all"
// ConsistencyOne requires at least one data node acknowledged a write.
ConsistencyOne = "one"
// ConsistencyAll requires all data nodes to acknowledge a write.
ConsistencyAll = "all"
// ConsistencyQuorum requires a quorum of data nodes to acknowledge a write.
ConsistencyQuorum = "quorum"
ConsistencyAny = "any"
// ConsistencyAny allows for hinted hand off, potentially no write happened yet.
ConsistencyAny = "any"
)
// NewClient will instantiate and return a connected client to issue commands to the server.
func NewClient(c Config) (*Client, error) {
tlsConfig := &tls.Config{
InsecureSkipVerify: c.UnsafeSsl,
}
tr := &http.Transport{
TLSClientConfig: tlsConfig,
}
client := Client{
url: c.URL,
username: c.Username,
password: c.Password,
httpClient: &http.Client{Timeout: c.Timeout},
httpClient: &http.Client{Timeout: c.Timeout, Transport: tr},
userAgent: c.UserAgent,
precision: c.Precision,
}
if client.userAgent == "" {
client.userAgent = "InfluxDBClient"
@@ -71,6 +157,11 @@ func (c *Client) SetAuth(u, p string) {
c.password = p
}
// SetPrecision will update the precision
func (c *Client) SetPrecision(precision string) {
c.precision = precision
}
// Query sends a command to the server and returns the Response
func (c *Client) Query(q Query) (*Response, error) {
u := c.url
@@ -79,6 +170,15 @@ func (c *Client) Query(q Query) (*Response, error) {
values := u.Query()
values.Set("q", q.Command)
values.Set("db", q.Database)
if q.Chunked {
values.Set("chunked", "true")
if q.ChunkSize > 0 {
values.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
}
if c.precision != "" {
values.Set("epoch", c.precision)
}
u.RawQuery = values.Encode()
req, err := http.NewRequest("GET", u.String(), nil)
@@ -97,19 +197,38 @@ func (c *Client) Query(q Query) (*Response, error) {
defer resp.Body.Close()
var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)
if q.Chunked {
cr := NewChunkedResponse(resp.Body)
for {
r, err := cr.NextResponse()
if err != nil {
// If we got an error while decoding the response, send that back.
return nil, err
}
// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
if r == nil {
break
}
response.Results = append(response.Results, r.Results...)
if r.Err != nil {
response.Err = r.Err
break
}
}
} else {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
if err := dec.Decode(&response); err != nil {
// Ignore EOF errors if we got an invalid status code.
if !(err == io.EOF && resp.StatusCode != http.StatusOK) {
return nil, err
}
}
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, decErr
}
// If we don't have an error in our json response, and didn't get statusOK, then send back an error
// If we don't have an error in our json response, and didn't get StatusOK,
// then send back an error.
if resp.StatusCode != http.StatusOK && response.Error() == nil {
return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
}
@@ -120,10 +239,15 @@ func (c *Client) Query(q Query) (*Response, error) {
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) Write(bp BatchPoints) (*Response, error) {
c.url.Path = "write"
u := c.url
u.Path = "write"
var b bytes.Buffer
for _, p := range bp.Points {
err := checkPointTypes(p)
if err != nil {
return nil, err
}
if p.Raw != "" {
if _, err := b.WriteString(p.Raw); err != nil {
return nil, err
@@ -146,7 +270,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
}
}
req, err := http.NewRequest("POST", c.url.String(), &b)
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return nil, err
}
@@ -155,11 +279,17 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
precision := bp.Precision
if precision == "" {
precision = c.precision
}
params := req.URL.Query()
params.Add("db", bp.Database)
params.Add("rp", bp.RetentionPolicy)
params.Add("precision", bp.Precision)
params.Add("consistency", bp.WriteConsistency)
params.Set("db", bp.Database)
params.Set("rp", bp.RetentionPolicy)
params.Set("precision", precision)
params.Set("consistency", bp.WriteConsistency)
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
@@ -170,7 +300,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil && err.Error() != "EOF" {
if err != nil {
return nil, err
}
@@ -183,6 +313,52 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
return nil, nil
}
// WriteLineProtocol takes a string with line returns to delimit each write
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) {
u := c.url
u.Path = "write"
r := strings.NewReader(data)
req, err := http.NewRequest("POST", u.String(), r)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", database)
params.Set("rp", retentionPolicy)
params.Set("precision", precision)
params.Set("consistency", writeConsistency)
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
err := fmt.Errorf(string(body))
response.Err = err
return &response, err
}
return nil, nil
}
// Ping will check to see if the server is up
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (c *Client) Ping() (time.Duration, string, error) {
@@ -209,39 +385,11 @@ func (c *Client) Ping() (time.Duration, string, error) {
return time.Since(now), version, nil
}
// Dump connects to server and retrieves all data stored for specified database.
// If successful, Dump returns the entire response body, which is an io.ReadCloser
func (c *Client) Dump(db string) (io.ReadCloser, error) {
u := c.url
u.Path = "dump"
values := u.Query()
values.Set("db", db)
u.RawQuery = values.Encode()
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return resp.Body, fmt.Errorf("HTTP Protocol error %d", resp.StatusCode)
}
return resp.Body, nil
}
// Structs
// Result represents a resultset returned from a single statement.
type Result struct {
Series []influxql.Row
Series []models.Row
Err error
}
@@ -249,8 +397,8 @@ type Result struct {
func (r *Result) MarshalJSON() ([]byte, error) {
// Define a struct that outputs "error" as a string.
var o struct {
Series []influxql.Row `json:"series,omitempty"`
Err string `json:"error,omitempty"`
Series []models.Row `json:"series,omitempty"`
Err string `json:"error,omitempty"`
}
// Copy fields to output struct.
@@ -265,8 +413,8 @@ func (r *Result) MarshalJSON() ([]byte, error) {
// UnmarshalJSON decodes the data into the Result struct
func (r *Result) UnmarshalJSON(b []byte) error {
var o struct {
Series []influxql.Row `json:"series,omitempty"`
Err string `json:"error,omitempty"`
Series []models.Row `json:"series,omitempty"`
Err string `json:"error,omitempty"`
}
dec := json.NewDecoder(bytes.NewBuffer(b))
@@ -327,7 +475,7 @@ func (r *Response) UnmarshalJSON(b []byte) error {
// Error returns the first error from any statement.
// Returns nil if no errors occurred on any statements.
func (r Response) Error() error {
func (r *Response) Error() error {
if r.Err != nil {
return r.Err
}
@@ -339,6 +487,31 @@ func (r Response) Error() error {
return nil
}
// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
}
// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
dec := json.NewDecoder(r)
dec.UseNumber()
return &ChunkedResponse{dec: dec}
}
// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response
if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, nil
}
return nil, err
}
return &response, nil
}
// Point defines the fields that will be written to the database
// Measurement, Time, and Fields are required
// Precision can be specified if the time is in epoch format (integer).
@@ -375,8 +548,17 @@ func (p *Point) MarshalJSON() ([]byte, error) {
return json.Marshal(&point)
}
// MarshalString renders string representation of a Point with specified
// precision. The default precision is nanoseconds.
func (p *Point) MarshalString() string {
return tsdb.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time).String()
pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
if err != nil {
return "# ERROR: " + err.Error() + " " + p.Measurement
}
if p.Precision == "" || p.Precision == "ns" || p.Precision == "n" {
return pt.String()
}
return pt.PrecisionString(p.Precision)
}
// UnmarshalJSON decodes the data into the Point struct
@@ -459,7 +641,7 @@ func normalizeFields(fields map[string]interface{}) map[string]interface{} {
// BatchPoints is used to send batched data in a single write.
// Database and Points are required
// If no retention policy is specified, it will use the databases default retention policy.
// If tags are specified, they will be "merged" with all points. If a point already has that tag, it is ignored.
// If tags are specified, they will be "merged" with all points. If a point already has that tag, it will be ignored.
// If time is specified, it will be applied to any point with an empty time.
// Precision can be specified if the time is in epoch format (integer).
// Valid values for Precision are n, u, ms, s, m, and h
@@ -537,6 +719,19 @@ func (c *Client) Addr() string {
return c.url.String()
}
// checkPointTypes ensures no unsupported types are submitted to influxdb, returning error if they are found.
func checkPointTypes(p Point) error {
for _, v := range p.Fields {
switch v.(type) {
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, float32, float64, bool, string, nil:
return nil
default:
return fmt.Errorf("unsupported point type: %T", v)
}
}
return nil
}
// helper functions
// EpochToTime takes a unix epoch time and uses precision to return back a time.Time