Update vendor dir and Godeps.json with new Godep
This commit is contained in:
29
vendor/github.com/google/cadvisor/storage/bigquery/README.md
generated
vendored
29
vendor/github.com/google/cadvisor/storage/bigquery/README.md
generated
vendored
@@ -1,29 +0,0 @@
|
||||
BigQuery Storage Driver
|
||||
=======
|
||||
|
||||
[EXPERIMENTAL] Support for BigQuery backend as cAdvisor storage driver.
|
||||
The current implementation takes bunch of BigQuery specific flags for authentication.
|
||||
These will be merged into a single backend config.
|
||||
|
||||
To run the current version, following flags need to be specified:
|
||||
```
|
||||
# Storage driver to use.
|
||||
-storage_driver=bigquery
|
||||
|
||||
# Information about server-to-server Oauth token.
|
||||
# These can be obtained by creating a Service Account client id under `Google Developer API`
|
||||
|
||||
# service client id
|
||||
-bq_id="XYZ.apps.googleusercontent.com"
|
||||
|
||||
# service email address
|
||||
-bq_account="ABC@developer.gserviceaccount.com"
|
||||
|
||||
# path to pem key (converted from p12 file)
|
||||
-bq_credentials_file="/path/to/key.pem"
|
||||
|
||||
# project id to use for storing datasets.
|
||||
-bq_project_id="awesome_project"
|
||||
```
|
||||
|
||||
See [Service account Authentication](https://developers.google.com/accounts/docs/OAuth2) for Oauth related details.
|
312
vendor/github.com/google/cadvisor/storage/bigquery/bigquery.go
generated
vendored
312
vendor/github.com/google/cadvisor/storage/bigquery/bigquery.go
generated
vendored
@@ -1,312 +0,0 @@
|
||||
// Copyright 2014 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package bigquery
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
info "github.com/google/cadvisor/info/v1"
|
||||
"github.com/google/cadvisor/storage"
|
||||
"github.com/google/cadvisor/storage/bigquery/client"
|
||||
|
||||
bigquery "google.golang.org/api/bigquery/v2"
|
||||
)
|
||||
|
||||
func init() {
|
||||
storage.RegisterStorageDriver("bigquery", new)
|
||||
}
|
||||
|
||||
type bigqueryStorage struct {
|
||||
client *client.Client
|
||||
machineName string
|
||||
}
|
||||
|
||||
const (
|
||||
// Bigquery schema types
|
||||
typeTimestamp string = "TIMESTAMP"
|
||||
typeString string = "STRING"
|
||||
typeInteger string = "INTEGER"
|
||||
|
||||
colTimestamp string = "timestamp"
|
||||
colMachineName string = "machine"
|
||||
colContainerName string = "container_name"
|
||||
colCpuCumulativeUsage string = "cpu_cumulative_usage"
|
||||
// Cumulative Cpu usage in system and user mode
|
||||
colCpuCumulativeUsageSystem string = "cpu_cumulative_usage_system"
|
||||
colCpuCumulativeUsageUser string = "cpu_cumulative_usage_user"
|
||||
// Memory usage
|
||||
colMemoryUsage string = "memory_usage"
|
||||
// Working set size
|
||||
colMemoryWorkingSet string = "memory_working_set"
|
||||
// Container page fault
|
||||
colMemoryContainerPgfault string = "memory_container_pgfault"
|
||||
// Constainer major page fault
|
||||
colMemoryContainerPgmajfault string = "memory_container_pgmajfault"
|
||||
// Hierarchical page fault
|
||||
colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault"
|
||||
// Hierarchical major page fault
|
||||
colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault"
|
||||
// Cumulative count of bytes received.
|
||||
colRxBytes string = "rx_bytes"
|
||||
// Cumulative count of receive errors encountered.
|
||||
colRxErrors string = "rx_errors"
|
||||
// Cumulative count of bytes transmitted.
|
||||
colTxBytes string = "tx_bytes"
|
||||
// Cumulative count of transmit errors encountered.
|
||||
colTxErrors string = "tx_errors"
|
||||
// Filesystem device.
|
||||
colFsDevice = "fs_device"
|
||||
// Filesystem limit.
|
||||
colFsLimit = "fs_limit"
|
||||
// Filesystem available space.
|
||||
colFsUsage = "fs_usage"
|
||||
)
|
||||
|
||||
func new() (storage.StorageDriver, error) {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newStorage(
|
||||
hostname,
|
||||
*storage.ArgDbTable,
|
||||
*storage.ArgDbName,
|
||||
)
|
||||
}
|
||||
|
||||
// TODO(jnagal): Infer schema through reflection. (See bigquery/client/example)
|
||||
func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema {
|
||||
fields := make([]*bigquery.TableFieldSchema, 19)
|
||||
i := 0
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeTimestamp,
|
||||
Name: colTimestamp,
|
||||
Mode: "REQUIRED",
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeString,
|
||||
Name: colMachineName,
|
||||
Mode: "REQUIRED",
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeString,
|
||||
Name: colContainerName,
|
||||
Mode: "REQUIRED",
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colCpuCumulativeUsage,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colCpuCumulativeUsageSystem,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colCpuCumulativeUsageUser,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryUsage,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryWorkingSet,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryContainerPgfault,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryContainerPgmajfault,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryHierarchicalPgfault,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryHierarchicalPgmajfault,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colRxBytes,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colRxErrors,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colTxBytes,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colTxErrors,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeString,
|
||||
Name: colFsDevice,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colFsLimit,
|
||||
}
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colFsUsage,
|
||||
}
|
||||
return &bigquery.TableSchema{
|
||||
Fields: fields,
|
||||
}
|
||||
}
|
||||
|
||||
func (self *bigqueryStorage) containerStatsToRows(
|
||||
ref info.ContainerReference,
|
||||
stats *info.ContainerStats,
|
||||
) (row map[string]interface{}) {
|
||||
row = make(map[string]interface{})
|
||||
|
||||
// Timestamp
|
||||
row[colTimestamp] = stats.Timestamp
|
||||
|
||||
// Machine name
|
||||
row[colMachineName] = self.machineName
|
||||
|
||||
// Container name
|
||||
name := ref.Name
|
||||
if len(ref.Aliases) > 0 {
|
||||
name = ref.Aliases[0]
|
||||
}
|
||||
row[colContainerName] = name
|
||||
|
||||
// Cumulative Cpu Usage
|
||||
row[colCpuCumulativeUsage] = stats.Cpu.Usage.Total
|
||||
|
||||
// Cumulative Cpu Usage in system mode
|
||||
row[colCpuCumulativeUsageSystem] = stats.Cpu.Usage.System
|
||||
|
||||
// Cumulative Cpu Usage in user mode
|
||||
row[colCpuCumulativeUsageUser] = stats.Cpu.Usage.User
|
||||
|
||||
// Memory Usage
|
||||
row[colMemoryUsage] = stats.Memory.Usage
|
||||
|
||||
// Working set size
|
||||
row[colMemoryWorkingSet] = stats.Memory.WorkingSet
|
||||
|
||||
// container page fault
|
||||
row[colMemoryContainerPgfault] = stats.Memory.ContainerData.Pgfault
|
||||
|
||||
// container major page fault
|
||||
row[colMemoryContainerPgmajfault] = stats.Memory.ContainerData.Pgmajfault
|
||||
|
||||
// hierarchical page fault
|
||||
row[colMemoryHierarchicalPgfault] = stats.Memory.HierarchicalData.Pgfault
|
||||
|
||||
// hierarchical major page fault
|
||||
row[colMemoryHierarchicalPgmajfault] = stats.Memory.HierarchicalData.Pgmajfault
|
||||
|
||||
// Network stats.
|
||||
row[colRxBytes] = stats.Network.RxBytes
|
||||
row[colRxErrors] = stats.Network.RxErrors
|
||||
row[colTxBytes] = stats.Network.TxBytes
|
||||
row[colTxErrors] = stats.Network.TxErrors
|
||||
|
||||
// TODO(jnagal): Handle per-cpu stats.
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (self *bigqueryStorage) containerFilesystemStatsToRows(
|
||||
ref info.ContainerReference,
|
||||
stats *info.ContainerStats,
|
||||
) (rows []map[string]interface{}) {
|
||||
for _, fsStat := range stats.Filesystem {
|
||||
row := make(map[string]interface{}, 0)
|
||||
row[colFsDevice] = fsStat.Device
|
||||
row[colFsLimit] = fsStat.Limit
|
||||
row[colFsUsage] = fsStat.Usage
|
||||
rows = append(rows, row)
|
||||
}
|
||||
return rows
|
||||
}
|
||||
|
||||
func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
if stats == nil {
|
||||
return nil
|
||||
}
|
||||
rows := make([]map[string]interface{}, 0)
|
||||
rows = append(rows, self.containerStatsToRows(ref, stats))
|
||||
rows = append(rows, self.containerFilesystemStatsToRows(ref, stats)...)
|
||||
for _, row := range rows {
|
||||
err := self.client.InsertRow(row)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *bigqueryStorage) Close() error {
|
||||
self.client.Close()
|
||||
self.client = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create a new bigquery storage driver.
|
||||
// machineName: A unique identifier to identify the host that current cAdvisor
|
||||
// instance is running on.
|
||||
// tableName: BigQuery table used for storing stats.
|
||||
func newStorage(machineName, datasetId, tableName string) (storage.StorageDriver, error) {
|
||||
bqClient, err := client.NewClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = bqClient.CreateDataset(datasetId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ret := &bigqueryStorage{
|
||||
client: bqClient,
|
||||
machineName: machineName,
|
||||
}
|
||||
schema := ret.GetSchema()
|
||||
err = bqClient.CreateTable(tableName, schema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret, nil
|
||||
}
|
233
vendor/github.com/google/cadvisor/storage/bigquery/client/client.go
generated
vendored
233
vendor/github.com/google/cadvisor/storage/bigquery/client/client.go
generated
vendored
@@ -1,233 +0,0 @@
|
||||
// Copyright 2014 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/oauth2"
|
||||
"golang.org/x/oauth2/jwt"
|
||||
bigquery "google.golang.org/api/bigquery/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
// TODO(jnagal): Condense all flags to an identity file and a pem key file.
|
||||
clientId = flag.String("bq_id", "", "Client ID")
|
||||
clientSecret = flag.String("bq_secret", "notasecret", "Client Secret")
|
||||
projectId = flag.String("bq_project_id", "", "Bigquery project ID")
|
||||
serviceAccount = flag.String("bq_account", "", "Service account email")
|
||||
pemFile = flag.String("bq_credentials_file", "", "Credential Key file (pem)")
|
||||
)
|
||||
|
||||
const (
|
||||
errAlreadyExists string = "Error 409: Already Exists"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
service *bigquery.Service
|
||||
token *oauth2.Token
|
||||
datasetId string
|
||||
tableId string
|
||||
}
|
||||
|
||||
// Helper method to create an authenticated connection.
|
||||
func connect() (*oauth2.Token, *bigquery.Service, error) {
|
||||
if *clientId == "" {
|
||||
return nil, nil, fmt.Errorf("no client id specified")
|
||||
}
|
||||
if *serviceAccount == "" {
|
||||
return nil, nil, fmt.Errorf("no service account specified")
|
||||
}
|
||||
if *projectId == "" {
|
||||
return nil, nil, fmt.Errorf("no project id specified")
|
||||
}
|
||||
authScope := bigquery.BigqueryScope
|
||||
if *pemFile == "" {
|
||||
return nil, nil, fmt.Errorf("no credentials specified")
|
||||
}
|
||||
pemBytes, err := ioutil.ReadFile(*pemFile)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("could not access credential file %v - %v", pemFile, err)
|
||||
}
|
||||
|
||||
jwtConfig := &jwt.Config{
|
||||
Email: *serviceAccount,
|
||||
Scopes: []string{authScope},
|
||||
PrivateKey: pemBytes,
|
||||
TokenURL: "https://accounts.google.com/o/oauth2/token",
|
||||
}
|
||||
token, err := jwtConfig.TokenSource(oauth2.NoContext).Token()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if !token.Valid() {
|
||||
return nil, nil, fmt.Errorf("invalid token for BigQuery oauth")
|
||||
}
|
||||
|
||||
config := &oauth2.Config{
|
||||
ClientID: *clientId,
|
||||
ClientSecret: *clientSecret,
|
||||
Scopes: []string{authScope},
|
||||
Endpoint: oauth2.Endpoint{
|
||||
AuthURL: "https://accounts.google.com/o/oauth2/auth",
|
||||
TokenURL: "https://accounts.google.com/o/oauth2/token",
|
||||
},
|
||||
}
|
||||
client := config.Client(oauth2.NoContext, token)
|
||||
|
||||
service, err := bigquery.New(client)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to create new service: %v\n", err)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return token, service, nil
|
||||
}
|
||||
|
||||
// Creates a new client instance with an authenticated connection to bigquery.
|
||||
func NewClient() (*Client, error) {
|
||||
token, service, err := connect()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c := &Client{
|
||||
token: token,
|
||||
service: service,
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Client) Close() error {
|
||||
c.service = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Helper method to return the bigquery service connection.
|
||||
// Expired connection is refreshed.
|
||||
func (c *Client) getService() (*bigquery.Service, error) {
|
||||
if c.token == nil || c.service == nil {
|
||||
return nil, fmt.Errorf("service not initialized")
|
||||
}
|
||||
|
||||
// Refresh expired token.
|
||||
if !c.token.Valid() {
|
||||
token, service, err := connect()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.token = token
|
||||
c.service = service
|
||||
return service, nil
|
||||
}
|
||||
return c.service, nil
|
||||
}
|
||||
|
||||
func (c *Client) PrintDatasets() error {
|
||||
datasetList, err := c.service.Datasets.List(*projectId).Do()
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to get list of datasets\n")
|
||||
return err
|
||||
} else {
|
||||
fmt.Printf("Successfully retrieved datasets. Retrieved: %d\n", len(datasetList.Datasets))
|
||||
}
|
||||
|
||||
for _, d := range datasetList.Datasets {
|
||||
fmt.Printf("%s %s\n", d.Id, d.FriendlyName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) CreateDataset(datasetId string) error {
|
||||
if c.service == nil {
|
||||
return fmt.Errorf("no service created")
|
||||
}
|
||||
_, err := c.service.Datasets.Insert(*projectId, &bigquery.Dataset{
|
||||
DatasetReference: &bigquery.DatasetReference{
|
||||
DatasetId: datasetId,
|
||||
ProjectId: *projectId,
|
||||
},
|
||||
}).Do()
|
||||
// TODO(jnagal): Do a Get() to verify dataset already exists.
|
||||
if err != nil && !strings.Contains(err.Error(), errAlreadyExists) {
|
||||
return err
|
||||
}
|
||||
c.datasetId = datasetId
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create a table with provided table ID and schema.
|
||||
// Schema is currently not updated if the table already exists.
|
||||
func (c *Client) CreateTable(tableId string, schema *bigquery.TableSchema) error {
|
||||
if c.service == nil || c.datasetId == "" {
|
||||
return fmt.Errorf("no dataset created")
|
||||
}
|
||||
_, err := c.service.Tables.Get(*projectId, c.datasetId, tableId).Do()
|
||||
if err != nil {
|
||||
// Create a new table.
|
||||
_, err := c.service.Tables.Insert(*projectId, c.datasetId, &bigquery.Table{
|
||||
Schema: schema,
|
||||
TableReference: &bigquery.TableReference{
|
||||
DatasetId: c.datasetId,
|
||||
ProjectId: *projectId,
|
||||
TableId: tableId,
|
||||
},
|
||||
}).Do()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// TODO(jnagal): Update schema if it has changed. We can only extend existing schema.
|
||||
c.tableId = tableId
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add a row to the connected table.
|
||||
func (c *Client) InsertRow(rowData map[string]interface{}) error {
|
||||
service, _ := c.getService()
|
||||
if service == nil || c.datasetId == "" || c.tableId == "" {
|
||||
return fmt.Errorf("table not setup to add rows")
|
||||
}
|
||||
jsonRows := make(map[string]bigquery.JsonValue)
|
||||
for key, value := range rowData {
|
||||
jsonRows[key] = bigquery.JsonValue(value)
|
||||
}
|
||||
rows := []*bigquery.TableDataInsertAllRequestRows{
|
||||
{
|
||||
Json: jsonRows,
|
||||
},
|
||||
}
|
||||
|
||||
// TODO(jnagal): Batch insert requests.
|
||||
insertRequest := &bigquery.TableDataInsertAllRequest{Rows: rows}
|
||||
|
||||
result, err := service.Tabledata.InsertAll(*projectId, c.datasetId, c.tableId, insertRequest).Do()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error inserting row: %v", err)
|
||||
}
|
||||
|
||||
if len(result.InsertErrors) > 0 {
|
||||
errstr := fmt.Sprintf("Insertion for %d rows failed\n", len(result.InsertErrors))
|
||||
for _, errors := range result.InsertErrors {
|
||||
for _, errorproto := range errors.Errors {
|
||||
errstr += fmt.Sprintf("Error inserting row %d: %+v\n", errors.Index, errorproto)
|
||||
}
|
||||
}
|
||||
return fmt.Errorf(errstr)
|
||||
}
|
||||
return nil
|
||||
}
|
87
vendor/github.com/google/cadvisor/storage/bigquery/client/example/example.go
generated
vendored
87
vendor/github.com/google/cadvisor/storage/bigquery/client/example/example.go
generated
vendored
@@ -1,87 +0,0 @@
|
||||
// Copyright 2014 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/SeanDolphin/bqschema"
|
||||
"github.com/google/cadvisor/storage/bigquery/client"
|
||||
)
|
||||
|
||||
type container struct {
|
||||
Name string `json:"name"`
|
||||
CpuUsage uint64 `json:"cpuusage,omitempty"`
|
||||
MemoryUsage uint64 `json:"memoryusage,omitempty"`
|
||||
NetworkUsage uint64 `json:"networkusage,omitempty"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
c, err := client.NewClient()
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to connect to bigquery\n")
|
||||
panic(err)
|
||||
}
|
||||
|
||||
c.PrintDatasets()
|
||||
|
||||
// Create a new dataset.
|
||||
err = c.CreateDataset("sampledataset")
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to create dataset %v\n", err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Create a new table
|
||||
containerData := container{
|
||||
Name: "test_container",
|
||||
CpuUsage: 123456,
|
||||
MemoryUsage: 1024,
|
||||
NetworkUsage: 9046,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
schema, err := bqschema.ToSchema(containerData)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to create schema")
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = c.CreateTable("sampletable", schema)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to create table")
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Add Data
|
||||
m := make(map[string]interface{})
|
||||
t := time.Now()
|
||||
for i := 0; i < 10; i++ {
|
||||
m["Name"] = containerData.Name
|
||||
m["CpuUsage"] = containerData.CpuUsage + uint64(i*100)
|
||||
m["MemoryUsage"] = containerData.MemoryUsage - uint64(i*10)
|
||||
m["NetworkUsage"] = containerData.NetworkUsage + uint64(i*10)
|
||||
m["Timestamp"] = t.Add(time.Duration(i) * time.Second)
|
||||
|
||||
err = c.InsertRow(m)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to insert row")
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
158
vendor/github.com/google/cadvisor/storage/elasticsearch/elasticsearch.go
generated
vendored
158
vendor/github.com/google/cadvisor/storage/elasticsearch/elasticsearch.go
generated
vendored
@@ -1,158 +0,0 @@
|
||||
// Copyright 2015 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
info "github.com/google/cadvisor/info/v1"
|
||||
storage "github.com/google/cadvisor/storage"
|
||||
|
||||
"gopkg.in/olivere/elastic.v2"
|
||||
)
|
||||
|
||||
func init() {
|
||||
storage.RegisterStorageDriver("elasticsearch", new)
|
||||
}
|
||||
|
||||
type elasticStorage struct {
|
||||
client *elastic.Client
|
||||
machineName string
|
||||
indexName string
|
||||
typeName string
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
type detailSpec struct {
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
MachineName string `json:"machine_name,omitempty"`
|
||||
ContainerName string `json:"container_Name,omitempty"`
|
||||
ContainerStats *info.ContainerStats `json:"container_stats,omitempty"`
|
||||
}
|
||||
|
||||
var (
|
||||
argElasticHost = flag.String("storage_driver_es_host", "http://localhost:9200", "ElasticSearch host:port")
|
||||
argIndexName = flag.String("storage_driver_es_index", "cadvisor", "ElasticSearch index name")
|
||||
argTypeName = flag.String("storage_driver_es_type", "stats", "ElasticSearch type name")
|
||||
argEnableSniffer = flag.Bool("storage_driver_es_enable_sniffer", false, "ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically")
|
||||
)
|
||||
|
||||
func new() (storage.StorageDriver, error) {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newStorage(
|
||||
hostname,
|
||||
*argIndexName,
|
||||
*argTypeName,
|
||||
*argElasticHost,
|
||||
*argEnableSniffer,
|
||||
)
|
||||
}
|
||||
|
||||
func (self *elasticStorage) containerStatsAndDefaultValues(
|
||||
ref info.ContainerReference, stats *info.ContainerStats) *detailSpec {
|
||||
timestamp := stats.Timestamp.UnixNano() / 1E3
|
||||
var containerName string
|
||||
if len(ref.Aliases) > 0 {
|
||||
containerName = ref.Aliases[0]
|
||||
} else {
|
||||
containerName = ref.Name
|
||||
}
|
||||
detail := &detailSpec{
|
||||
Timestamp: timestamp,
|
||||
MachineName: self.machineName,
|
||||
ContainerName: containerName,
|
||||
ContainerStats: stats,
|
||||
}
|
||||
return detail
|
||||
}
|
||||
|
||||
func (self *elasticStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
if stats == nil {
|
||||
return nil
|
||||
}
|
||||
func() {
|
||||
// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
|
||||
self.lock.Lock()
|
||||
defer self.lock.Unlock()
|
||||
// Add some default params based on ContainerStats
|
||||
detail := self.containerStatsAndDefaultValues(ref, stats)
|
||||
// Index a cadvisor (using JSON serialization)
|
||||
_, err := self.client.Index().
|
||||
Index(self.indexName).
|
||||
Type(self.typeName).
|
||||
BodyJson(detail).
|
||||
Do()
|
||||
if err != nil {
|
||||
// Handle error
|
||||
fmt.Printf("failed to write stats to ElasticSearch - %s", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *elasticStorage) Close() error {
|
||||
self.client = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// machineName: A unique identifier to identify the host that current cAdvisor
|
||||
// instance is running on.
|
||||
// ElasticHost: The host which runs ElasticSearch.
|
||||
func newStorage(
|
||||
machineName,
|
||||
indexName,
|
||||
typeName,
|
||||
elasticHost string,
|
||||
enableSniffer bool,
|
||||
) (storage.StorageDriver, error) {
|
||||
// Obtain a client and connect to the default Elasticsearch installation
|
||||
// on 127.0.0.1:9200. Of course you can configure your client to connect
|
||||
// to other hosts and configure it in various other ways.
|
||||
client, err := elastic.NewClient(
|
||||
elastic.SetHealthcheck(true),
|
||||
elastic.SetSniff(enableSniffer),
|
||||
elastic.SetHealthcheckInterval(30*time.Second),
|
||||
elastic.SetURL(elasticHost),
|
||||
)
|
||||
if err != nil {
|
||||
// Handle error
|
||||
return nil, fmt.Errorf("failed to create the elasticsearch client - %s", err)
|
||||
}
|
||||
|
||||
// Ping the Elasticsearch server to get e.g. the version number
|
||||
info, code, err := client.Ping().URL(elasticHost).Do()
|
||||
if err != nil {
|
||||
// Handle error
|
||||
return nil, fmt.Errorf("failed to ping the elasticsearch - %s", err)
|
||||
|
||||
}
|
||||
fmt.Printf("Elasticsearch returned with code %d and version %s", code, info.Version.Number)
|
||||
|
||||
ret := &elasticStorage{
|
||||
client: client,
|
||||
machineName: machineName,
|
||||
indexName: indexName,
|
||||
typeName: typeName,
|
||||
}
|
||||
return ret, nil
|
||||
}
|
369
vendor/github.com/google/cadvisor/storage/influxdb/influxdb.go
generated
vendored
369
vendor/github.com/google/cadvisor/storage/influxdb/influxdb.go
generated
vendored
@@ -1,369 +0,0 @@
|
||||
// Copyright 2014 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package influxdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
info "github.com/google/cadvisor/info/v1"
|
||||
"github.com/google/cadvisor/storage"
|
||||
"github.com/google/cadvisor/version"
|
||||
|
||||
influxdb "github.com/influxdb/influxdb/client"
|
||||
)
|
||||
|
||||
func init() {
|
||||
storage.RegisterStorageDriver("influxdb", new)
|
||||
}
|
||||
|
||||
type influxdbStorage struct {
|
||||
client *influxdb.Client
|
||||
machineName string
|
||||
database string
|
||||
retentionPolicy string
|
||||
bufferDuration time.Duration
|
||||
lastWrite time.Time
|
||||
points []*influxdb.Point
|
||||
lock sync.Mutex
|
||||
readyToFlush func() bool
|
||||
}
|
||||
|
||||
// Series names
|
||||
const (
|
||||
// Cumulative CPU usage
|
||||
serCpuUsageTotal string = "cpu_usage_total"
|
||||
serCpuUsageSystem string = "cpu_usage_system"
|
||||
serCpuUsageUser string = "cpu_usage_user"
|
||||
serCpuUsagePerCpu string = "cpu_usage_per_cpu"
|
||||
// Smoothed average of number of runnable threads x 1000.
|
||||
serLoadAverage string = "load_average"
|
||||
// Memory Usage
|
||||
serMemoryUsage string = "memory_usage"
|
||||
// Working set size
|
||||
serMemoryWorkingSet string = "memory_working_set"
|
||||
// Cumulative count of bytes received.
|
||||
serRxBytes string = "rx_bytes"
|
||||
// Cumulative count of receive errors encountered.
|
||||
serRxErrors string = "rx_errors"
|
||||
// Cumulative count of bytes transmitted.
|
||||
serTxBytes string = "tx_bytes"
|
||||
// Cumulative count of transmit errors encountered.
|
||||
serTxErrors string = "tx_errors"
|
||||
// Filesystem device.
|
||||
serFsDevice string = "fs_device"
|
||||
// Filesystem limit.
|
||||
serFsLimit string = "fs_limit"
|
||||
// Filesystem usage.
|
||||
serFsUsage string = "fs_usage"
|
||||
)
|
||||
|
||||
func new() (storage.StorageDriver, error) {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newStorage(
|
||||
hostname,
|
||||
*storage.ArgDbTable,
|
||||
*storage.ArgDbName,
|
||||
*storage.ArgDbUsername,
|
||||
*storage.ArgDbPassword,
|
||||
*storage.ArgDbHost,
|
||||
*storage.ArgDbIsSecure,
|
||||
*storage.ArgDbBufferDuration,
|
||||
)
|
||||
}
|
||||
|
||||
// Field names
|
||||
const (
|
||||
fieldValue string = "value"
|
||||
fieldType string = "type"
|
||||
fieldDevice string = "device"
|
||||
)
|
||||
|
||||
// Tag names
|
||||
const (
|
||||
tagMachineName string = "machine"
|
||||
tagContainerName string = "container_name"
|
||||
)
|
||||
|
||||
func (self *influxdbStorage) containerFilesystemStatsToPoints(
|
||||
ref info.ContainerReference,
|
||||
stats *info.ContainerStats) (points []*influxdb.Point) {
|
||||
if len(stats.Filesystem) == 0 {
|
||||
return points
|
||||
}
|
||||
for _, fsStat := range stats.Filesystem {
|
||||
tagsFsUsage := map[string]string{
|
||||
fieldDevice: fsStat.Device,
|
||||
fieldType: "usage",
|
||||
}
|
||||
fieldsFsUsage := map[string]interface{}{
|
||||
fieldValue: int64(fsStat.Usage),
|
||||
}
|
||||
pointFsUsage := &influxdb.Point{
|
||||
Measurement: serFsUsage,
|
||||
Tags: tagsFsUsage,
|
||||
Fields: fieldsFsUsage,
|
||||
}
|
||||
|
||||
tagsFsLimit := map[string]string{
|
||||
fieldDevice: fsStat.Device,
|
||||
fieldType: "limit",
|
||||
}
|
||||
fieldsFsLimit := map[string]interface{}{
|
||||
fieldValue: int64(fsStat.Limit),
|
||||
}
|
||||
pointFsLimit := &influxdb.Point{
|
||||
Measurement: serFsLimit,
|
||||
Tags: tagsFsLimit,
|
||||
Fields: fieldsFsLimit,
|
||||
}
|
||||
|
||||
points = append(points, pointFsUsage, pointFsLimit)
|
||||
}
|
||||
|
||||
self.tagPoints(ref, stats, points)
|
||||
|
||||
return points
|
||||
}
|
||||
|
||||
// Set tags and timestamp for all points of the batch.
|
||||
// Points should inherit the tags that are set for BatchPoints, but that does not seem to work.
|
||||
func (self *influxdbStorage) tagPoints(ref info.ContainerReference, stats *info.ContainerStats, points []*influxdb.Point) {
|
||||
// Use container alias if possible
|
||||
var containerName string
|
||||
if len(ref.Aliases) > 0 {
|
||||
containerName = ref.Aliases[0]
|
||||
} else {
|
||||
containerName = ref.Name
|
||||
}
|
||||
|
||||
commonTags := map[string]string{
|
||||
tagMachineName: self.machineName,
|
||||
tagContainerName: containerName,
|
||||
}
|
||||
for i := 0; i < len(points); i++ {
|
||||
// merge with existing tags if any
|
||||
addTagsToPoint(points[i], commonTags)
|
||||
points[i].Time = stats.Timestamp
|
||||
}
|
||||
}
|
||||
|
||||
func (self *influxdbStorage) containerStatsToPoints(
|
||||
ref info.ContainerReference,
|
||||
stats *info.ContainerStats,
|
||||
) (points []*influxdb.Point) {
|
||||
// CPU usage: Total usage in nanoseconds
|
||||
points = append(points, makePoint(serCpuUsageTotal, stats.Cpu.Usage.Total))
|
||||
|
||||
// CPU usage: Time spend in system space (in nanoseconds)
|
||||
points = append(points, makePoint(serCpuUsageSystem, stats.Cpu.Usage.System))
|
||||
|
||||
// CPU usage: Time spent in user space (in nanoseconds)
|
||||
points = append(points, makePoint(serCpuUsageUser, stats.Cpu.Usage.User))
|
||||
|
||||
// CPU usage per CPU
|
||||
for i := 0; i < len(stats.Cpu.Usage.PerCpu); i++ {
|
||||
point := makePoint(serCpuUsagePerCpu, stats.Cpu.Usage.PerCpu[i])
|
||||
tags := map[string]string{"instance": fmt.Sprintf("%v", i)}
|
||||
addTagsToPoint(point, tags)
|
||||
|
||||
points = append(points, point)
|
||||
}
|
||||
|
||||
// Load Average
|
||||
points = append(points, makePoint(serLoadAverage, stats.Cpu.LoadAverage))
|
||||
|
||||
// Memory Usage
|
||||
points = append(points, makePoint(serMemoryUsage, stats.Memory.Usage))
|
||||
|
||||
// Working Set Size
|
||||
points = append(points, makePoint(serMemoryWorkingSet, stats.Memory.WorkingSet))
|
||||
|
||||
// Network Stats
|
||||
points = append(points, makePoint(serRxBytes, stats.Network.RxBytes))
|
||||
points = append(points, makePoint(serRxErrors, stats.Network.RxErrors))
|
||||
points = append(points, makePoint(serTxBytes, stats.Network.TxBytes))
|
||||
points = append(points, makePoint(serTxErrors, stats.Network.TxErrors))
|
||||
|
||||
self.tagPoints(ref, stats, points)
|
||||
|
||||
return points
|
||||
}
|
||||
|
||||
func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) {
|
||||
self.readyToFlush = readyToFlush
|
||||
}
|
||||
|
||||
func (self *influxdbStorage) defaultReadyToFlush() bool {
|
||||
return time.Since(self.lastWrite) >= self.bufferDuration
|
||||
}
|
||||
|
||||
func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
if stats == nil {
|
||||
return nil
|
||||
}
|
||||
var pointsToFlush []*influxdb.Point
|
||||
func() {
|
||||
// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
|
||||
self.lock.Lock()
|
||||
defer self.lock.Unlock()
|
||||
|
||||
self.points = append(self.points, self.containerStatsToPoints(ref, stats)...)
|
||||
self.points = append(self.points, self.containerFilesystemStatsToPoints(ref, stats)...)
|
||||
if self.readyToFlush() {
|
||||
pointsToFlush = self.points
|
||||
self.points = make([]*influxdb.Point, 0)
|
||||
self.lastWrite = time.Now()
|
||||
}
|
||||
}()
|
||||
if len(pointsToFlush) > 0 {
|
||||
points := make([]influxdb.Point, len(pointsToFlush))
|
||||
for i, p := range pointsToFlush {
|
||||
points[i] = *p
|
||||
}
|
||||
|
||||
batchTags := map[string]string{tagMachineName: self.machineName}
|
||||
bp := influxdb.BatchPoints{
|
||||
Points: points,
|
||||
Database: self.database,
|
||||
Tags: batchTags,
|
||||
Time: stats.Timestamp,
|
||||
}
|
||||
response, err := self.client.Write(bp)
|
||||
if err != nil || checkResponseForErrors(response) != nil {
|
||||
return fmt.Errorf("failed to write stats to influxDb - %s", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *influxdbStorage) Close() error {
|
||||
self.client = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// machineName: A unique identifier to identify the host that current cAdvisor
|
||||
// instance is running on.
|
||||
// influxdbHost: The host which runs influxdb (host:port)
|
||||
func newStorage(
|
||||
machineName,
|
||||
tablename,
|
||||
database,
|
||||
username,
|
||||
password,
|
||||
influxdbHost string,
|
||||
isSecure bool,
|
||||
bufferDuration time.Duration,
|
||||
) (*influxdbStorage, error) {
|
||||
url := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: influxdbHost,
|
||||
}
|
||||
if isSecure {
|
||||
url.Scheme = "https"
|
||||
}
|
||||
|
||||
config := &influxdb.Config{
|
||||
URL: *url,
|
||||
Username: username,
|
||||
Password: password,
|
||||
UserAgent: fmt.Sprintf("%v/%v", "cAdvisor", version.Info["version"]),
|
||||
}
|
||||
client, err := influxdb.NewClient(*config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ret := &influxdbStorage{
|
||||
client: client,
|
||||
machineName: machineName,
|
||||
database: database,
|
||||
bufferDuration: bufferDuration,
|
||||
lastWrite: time.Now(),
|
||||
points: make([]*influxdb.Point, 0),
|
||||
}
|
||||
ret.readyToFlush = ret.defaultReadyToFlush
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// Creates a measurement point with a single value field
|
||||
func makePoint(name string, value interface{}) *influxdb.Point {
|
||||
fields := map[string]interface{}{
|
||||
fieldValue: toSignedIfUnsigned(value),
|
||||
}
|
||||
|
||||
return &influxdb.Point{
|
||||
Measurement: name,
|
||||
Fields: fields,
|
||||
}
|
||||
}
|
||||
|
||||
// Adds additional tags to the existing tags of a point
|
||||
func addTagsToPoint(point *influxdb.Point, tags map[string]string) {
|
||||
if point.Tags == nil {
|
||||
point.Tags = tags
|
||||
} else {
|
||||
for k, v := range tags {
|
||||
point.Tags[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Checks response for possible errors
|
||||
func checkResponseForErrors(response *influxdb.Response) error {
|
||||
const msg = "failed to write stats to influxDb - %s"
|
||||
|
||||
if response != nil && response.Err != nil {
|
||||
return fmt.Errorf(msg, response.Err)
|
||||
}
|
||||
if response != nil && response.Results != nil {
|
||||
for _, result := range response.Results {
|
||||
if result.Err != nil {
|
||||
return fmt.Errorf(msg, result.Err)
|
||||
}
|
||||
if result.Series != nil {
|
||||
for _, row := range result.Series {
|
||||
if row.Err != nil {
|
||||
return fmt.Errorf(msg, row.Err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Some stats have type unsigned integer, but the InfluxDB client accepts only signed integers.
|
||||
func toSignedIfUnsigned(value interface{}) interface{} {
|
||||
switch v := value.(type) {
|
||||
case uint64:
|
||||
return int64(v)
|
||||
case uint32:
|
||||
return int32(v)
|
||||
case uint16:
|
||||
return int16(v)
|
||||
case uint8:
|
||||
return int8(v)
|
||||
case uint:
|
||||
return int(v)
|
||||
}
|
||||
return value
|
||||
}
|
114
vendor/github.com/google/cadvisor/storage/kafka/kafka.go
generated
vendored
114
vendor/github.com/google/cadvisor/storage/kafka/kafka.go
generated
vendored
@@ -1,114 +0,0 @@
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
info "github.com/google/cadvisor/info/v1"
|
||||
"github.com/google/cadvisor/storage"
|
||||
"github.com/google/cadvisor/utils/container"
|
||||
|
||||
kafka "github.com/Shopify/sarama"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
func init() {
|
||||
storage.RegisterStorageDriver("kafka", new)
|
||||
}
|
||||
|
||||
var (
|
||||
brokers = flag.String("storage_driver_kafka_broker_list", "localhost:9092", "kafka broker(s) csv")
|
||||
topic = flag.String("storage_driver_kafka_topic", "stats", "kafka topic")
|
||||
)
|
||||
|
||||
type kafkaStorage struct {
|
||||
producer kafka.AsyncProducer
|
||||
topic string
|
||||
machineName string
|
||||
}
|
||||
|
||||
type detailSpec struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
MachineName string `json:"machine_name,omitempty"`
|
||||
ContainerName string `json:"container_Name,omitempty"`
|
||||
ContainerID string `json:"container_Id,omitempty"`
|
||||
ContainerLabels map[string]string `json:"container_labels,omitempty"`
|
||||
ContainerStats *info.ContainerStats `json:"container_stats,omitempty"`
|
||||
}
|
||||
|
||||
func (driver *kafkaStorage) infoToDetailSpec(ref info.ContainerReference, stats *info.ContainerStats) *detailSpec {
|
||||
timestamp := time.Now()
|
||||
containerID := ref.Id
|
||||
containerLabels := ref.Labels
|
||||
containerName := container.GetPreferredName(ref)
|
||||
|
||||
detail := &detailSpec{
|
||||
Timestamp: timestamp,
|
||||
MachineName: driver.machineName,
|
||||
ContainerName: containerName,
|
||||
ContainerID: containerID,
|
||||
ContainerLabels: containerLabels,
|
||||
ContainerStats: stats,
|
||||
}
|
||||
return detail
|
||||
}
|
||||
|
||||
func (driver *kafkaStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
detail := driver.infoToDetailSpec(ref, stats)
|
||||
b, err := json.Marshal(detail)
|
||||
|
||||
driver.producer.Input() <- &kafka.ProducerMessage{
|
||||
Topic: driver.topic,
|
||||
Value: kafka.StringEncoder(b),
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (self *kafkaStorage) Close() error {
|
||||
return self.producer.Close()
|
||||
}
|
||||
|
||||
func new() (storage.StorageDriver, error) {
|
||||
machineName, err := os.Hostname()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newStorage(machineName)
|
||||
}
|
||||
|
||||
func newStorage(machineName string) (storage.StorageDriver, error) {
|
||||
config := kafka.NewConfig()
|
||||
config.Producer.RequiredAcks = kafka.WaitForAll
|
||||
|
||||
brokerList := strings.Split(*brokers, ",")
|
||||
glog.V(4).Infof("Kafka brokers:%q", brokers)
|
||||
|
||||
producer, err := kafka.NewAsyncProducer(brokerList, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret := &kafkaStorage{
|
||||
producer: producer,
|
||||
topic: *topic,
|
||||
machineName: machineName,
|
||||
}
|
||||
return ret, nil
|
||||
}
|
139
vendor/github.com/google/cadvisor/storage/redis/redis.go
generated
vendored
139
vendor/github.com/google/cadvisor/storage/redis/redis.go
generated
vendored
@@ -1,139 +0,0 @@
|
||||
// Copyright 2015 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package redis
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
info "github.com/google/cadvisor/info/v1"
|
||||
storage "github.com/google/cadvisor/storage"
|
||||
|
||||
redis "github.com/garyburd/redigo/redis"
|
||||
)
|
||||
|
||||
func init() {
|
||||
storage.RegisterStorageDriver("redis", new)
|
||||
}
|
||||
|
||||
type redisStorage struct {
|
||||
conn redis.Conn
|
||||
machineName string
|
||||
redisKey string
|
||||
bufferDuration time.Duration
|
||||
lastWrite time.Time
|
||||
lock sync.Mutex
|
||||
readyToFlush func() bool
|
||||
}
|
||||
|
||||
type detailSpec struct {
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
MachineName string `json:"machine_name,omitempty"`
|
||||
ContainerName string `json:"container_Name,omitempty"`
|
||||
ContainerStats *info.ContainerStats `json:"container_stats,omitempty"`
|
||||
}
|
||||
|
||||
func new() (storage.StorageDriver, error) {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newStorage(
|
||||
hostname,
|
||||
*storage.ArgDbName,
|
||||
*storage.ArgDbHost,
|
||||
*storage.ArgDbBufferDuration,
|
||||
)
|
||||
}
|
||||
|
||||
func (self *redisStorage) defaultReadyToFlush() bool {
|
||||
return time.Since(self.lastWrite) >= self.bufferDuration
|
||||
}
|
||||
|
||||
//We must add some default params (for example: MachineName,ContainerName...)because containerStats do not include them
|
||||
func (self *redisStorage) containerStatsAndDefaultValues(ref info.ContainerReference, stats *info.ContainerStats) *detailSpec {
|
||||
timestamp := stats.Timestamp.UnixNano() / 1E3
|
||||
var containerName string
|
||||
if len(ref.Aliases) > 0 {
|
||||
containerName = ref.Aliases[0]
|
||||
} else {
|
||||
containerName = ref.Name
|
||||
}
|
||||
detail := &detailSpec{
|
||||
Timestamp: timestamp,
|
||||
MachineName: self.machineName,
|
||||
ContainerName: containerName,
|
||||
ContainerStats: stats,
|
||||
}
|
||||
return detail
|
||||
}
|
||||
|
||||
//Push the data into redis
|
||||
func (self *redisStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
if stats == nil {
|
||||
return nil
|
||||
}
|
||||
var seriesToFlush []byte
|
||||
func() {
|
||||
// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
|
||||
self.lock.Lock()
|
||||
defer self.lock.Unlock()
|
||||
// Add some default params based on containerStats
|
||||
detail := self.containerStatsAndDefaultValues(ref, stats)
|
||||
//To json
|
||||
b, _ := json.Marshal(detail)
|
||||
if self.readyToFlush() {
|
||||
seriesToFlush = b
|
||||
self.lastWrite = time.Now()
|
||||
}
|
||||
}()
|
||||
if len(seriesToFlush) > 0 {
|
||||
//We use redis's "LPUSH" to push the data to the redis
|
||||
self.conn.Send("LPUSH", self.redisKey, seriesToFlush)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *redisStorage) Close() error {
|
||||
return self.conn.Close()
|
||||
}
|
||||
|
||||
// Create a new redis storage driver.
|
||||
// machineName: A unique identifier to identify the host that runs the current cAdvisor
|
||||
// instance is running on.
|
||||
// redisHost: The host which runs redis.
|
||||
// redisKey: The key for the Data that stored in the redis
|
||||
func newStorage(
|
||||
machineName,
|
||||
redisKey,
|
||||
redisHost string,
|
||||
bufferDuration time.Duration,
|
||||
) (storage.StorageDriver, error) {
|
||||
conn, err := redis.Dial("tcp", redisHost)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret := &redisStorage{
|
||||
conn: conn,
|
||||
machineName: machineName,
|
||||
redisKey: redisKey,
|
||||
bufferDuration: bufferDuration,
|
||||
lastWrite: time.Now(),
|
||||
}
|
||||
ret.readyToFlush = ret.defaultReadyToFlush
|
||||
return ret, nil
|
||||
}
|
64
vendor/github.com/google/cadvisor/storage/statsd/client/client.go
generated
vendored
64
vendor/github.com/google/cadvisor/storage/statsd/client/client.go
generated
vendored
@@ -1,64 +0,0 @@
|
||||
// Copyright 2015 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
HostPort string
|
||||
Namespace string
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
func (self *Client) Open() error {
|
||||
conn, err := net.Dial("udp", self.HostPort)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to open udp connection to %q: %v", self.HostPort, err)
|
||||
return err
|
||||
}
|
||||
self.conn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *Client) Close() error {
|
||||
self.conn.Close()
|
||||
self.conn = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Simple send to statsd daemon without sampling.
|
||||
func (self *Client) Send(namespace, containerName, key string, value uint64) error {
|
||||
// only send counter value
|
||||
formatted := fmt.Sprintf("%s.%s.%s:%d|g", namespace, containerName, key, value)
|
||||
_, err := fmt.Fprintf(self.conn, formatted)
|
||||
if err != nil {
|
||||
glog.V(3).Infof("failed to send data %q: %v", formatted, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func New(hostPort string) (*Client, error) {
|
||||
Client := Client{HostPort: hostPort}
|
||||
if err := Client.Open(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Client, nil
|
||||
}
|
136
vendor/github.com/google/cadvisor/storage/statsd/statsd.go
generated
vendored
136
vendor/github.com/google/cadvisor/storage/statsd/statsd.go
generated
vendored
@@ -1,136 +0,0 @@
|
||||
// Copyright 2015 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package statsd
|
||||
|
||||
import (
|
||||
info "github.com/google/cadvisor/info/v1"
|
||||
"github.com/google/cadvisor/storage"
|
||||
client "github.com/google/cadvisor/storage/statsd/client"
|
||||
)
|
||||
|
||||
func init() {
|
||||
storage.RegisterStorageDriver("statsd", new)
|
||||
}
|
||||
|
||||
type statsdStorage struct {
|
||||
client *client.Client
|
||||
Namespace string
|
||||
}
|
||||
|
||||
const (
|
||||
colCpuCumulativeUsage string = "cpu_cumulative_usage"
|
||||
// Memory Usage
|
||||
colMemoryUsage string = "memory_usage"
|
||||
// Working set size
|
||||
colMemoryWorkingSet string = "memory_working_set"
|
||||
// Cumulative count of bytes received.
|
||||
colRxBytes string = "rx_bytes"
|
||||
// Cumulative count of receive errors encountered.
|
||||
colRxErrors string = "rx_errors"
|
||||
// Cumulative count of bytes transmitted.
|
||||
colTxBytes string = "tx_bytes"
|
||||
// Cumulative count of transmit errors encountered.
|
||||
colTxErrors string = "tx_errors"
|
||||
// Filesystem summary
|
||||
colFsSummary = "fs_summary"
|
||||
// Filesystem limit.
|
||||
colFsLimit = "fs_limit"
|
||||
// Filesystem usage.
|
||||
colFsUsage = "fs_usage"
|
||||
)
|
||||
|
||||
func new() (storage.StorageDriver, error) {
|
||||
return newStorage(*storage.ArgDbName, *storage.ArgDbHost)
|
||||
}
|
||||
|
||||
func (self *statsdStorage) containerStatsToValues(
|
||||
stats *info.ContainerStats,
|
||||
) (series map[string]uint64) {
|
||||
series = make(map[string]uint64)
|
||||
|
||||
// Cumulative Cpu Usage
|
||||
series[colCpuCumulativeUsage] = stats.Cpu.Usage.Total
|
||||
|
||||
// Memory Usage
|
||||
series[colMemoryUsage] = stats.Memory.Usage
|
||||
|
||||
// Working set size
|
||||
series[colMemoryWorkingSet] = stats.Memory.WorkingSet
|
||||
|
||||
// Network stats.
|
||||
series[colRxBytes] = stats.Network.RxBytes
|
||||
series[colRxErrors] = stats.Network.RxErrors
|
||||
series[colTxBytes] = stats.Network.TxBytes
|
||||
series[colTxErrors] = stats.Network.TxErrors
|
||||
|
||||
return series
|
||||
}
|
||||
|
||||
func (self *statsdStorage) containerFsStatsToValues(
|
||||
series *map[string]uint64,
|
||||
stats *info.ContainerStats,
|
||||
) {
|
||||
for _, fsStat := range stats.Filesystem {
|
||||
// Summary stats.
|
||||
(*series)[colFsSummary+"."+colFsLimit] += fsStat.Limit
|
||||
(*series)[colFsSummary+"."+colFsUsage] += fsStat.Usage
|
||||
|
||||
// Per device stats.
|
||||
(*series)[fsStat.Device+"."+colFsLimit] = fsStat.Limit
|
||||
(*series)[fsStat.Device+"."+colFsUsage] = fsStat.Usage
|
||||
}
|
||||
}
|
||||
|
||||
//Push the data into redis
|
||||
func (self *statsdStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
if stats == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var containerName string
|
||||
if len(ref.Aliases) > 0 {
|
||||
containerName = ref.Aliases[0]
|
||||
} else {
|
||||
containerName = ref.Name
|
||||
}
|
||||
|
||||
series := self.containerStatsToValues(stats)
|
||||
self.containerFsStatsToValues(&series, stats)
|
||||
for key, value := range series {
|
||||
err := self.client.Send(self.Namespace, containerName, key, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *statsdStorage) Close() error {
|
||||
self.client.Close()
|
||||
self.client = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func newStorage(namespace, hostPort string) (*statsdStorage, error) {
|
||||
statsdClient, err := client.New(hostPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
statsdStorage := &statsdStorage{
|
||||
client: statsdClient,
|
||||
Namespace: namespace,
|
||||
}
|
||||
return statsdStorage, nil
|
||||
}
|
125
vendor/github.com/google/cadvisor/storage/stdout/stdout.go
generated
vendored
125
vendor/github.com/google/cadvisor/storage/stdout/stdout.go
generated
vendored
@@ -1,125 +0,0 @@
|
||||
// Copyright 2015 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package stdout
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
||||
info "github.com/google/cadvisor/info/v1"
|
||||
"github.com/google/cadvisor/storage"
|
||||
)
|
||||
|
||||
func init() {
|
||||
storage.RegisterStorageDriver("stdout", new)
|
||||
}
|
||||
|
||||
type stdoutStorage struct {
|
||||
Namespace string
|
||||
}
|
||||
|
||||
const (
|
||||
colCpuCumulativeUsage = "cpu_cumulative_usage"
|
||||
// Memory Usage
|
||||
colMemoryUsage = "memory_usage"
|
||||
// Working set size
|
||||
colMemoryWorkingSet = "memory_working_set"
|
||||
// Cumulative count of bytes received.
|
||||
colRxBytes = "rx_bytes"
|
||||
// Cumulative count of receive errors encountered.
|
||||
colRxErrors = "rx_errors"
|
||||
// Cumulative count of bytes transmitted.
|
||||
colTxBytes = "tx_bytes"
|
||||
// Cumulative count of transmit errors encountered.
|
||||
colTxErrors = "tx_errors"
|
||||
// Filesystem summary
|
||||
colFsSummary = "fs_summary"
|
||||
// Filesystem limit.
|
||||
colFsLimit = "fs_limit"
|
||||
// Filesystem usage.
|
||||
colFsUsage = "fs_usage"
|
||||
)
|
||||
|
||||
func new() (storage.StorageDriver, error) {
|
||||
return newStorage(*storage.ArgDbHost)
|
||||
}
|
||||
|
||||
func (driver *stdoutStorage) containerStatsToValues(stats *info.ContainerStats) (series map[string]uint64) {
|
||||
series = make(map[string]uint64)
|
||||
|
||||
// Cumulative Cpu Usage
|
||||
series[colCpuCumulativeUsage] = stats.Cpu.Usage.Total
|
||||
|
||||
// Memory Usage
|
||||
series[colMemoryUsage] = stats.Memory.Usage
|
||||
|
||||
// Working set size
|
||||
series[colMemoryWorkingSet] = stats.Memory.WorkingSet
|
||||
|
||||
// Network stats.
|
||||
series[colRxBytes] = stats.Network.RxBytes
|
||||
series[colRxErrors] = stats.Network.RxErrors
|
||||
series[colTxBytes] = stats.Network.TxBytes
|
||||
series[colTxErrors] = stats.Network.TxErrors
|
||||
|
||||
return series
|
||||
}
|
||||
|
||||
func (driver *stdoutStorage) containerFsStatsToValues(series *map[string]uint64, stats *info.ContainerStats) {
|
||||
for _, fsStat := range stats.Filesystem {
|
||||
// Summary stats.
|
||||
(*series)[colFsSummary+"."+colFsLimit] += fsStat.Limit
|
||||
(*series)[colFsSummary+"."+colFsUsage] += fsStat.Usage
|
||||
|
||||
// Per device stats.
|
||||
(*series)[fsStat.Device+"."+colFsLimit] = fsStat.Limit
|
||||
(*series)[fsStat.Device+"."+colFsUsage] = fsStat.Usage
|
||||
}
|
||||
}
|
||||
|
||||
func (driver *stdoutStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
if stats == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
containerName := ref.Name
|
||||
if len(ref.Aliases) > 0 {
|
||||
containerName = ref.Aliases[0]
|
||||
}
|
||||
|
||||
var buffer bytes.Buffer
|
||||
buffer.WriteString(fmt.Sprintf("cName=%s host=%s", containerName, driver.Namespace))
|
||||
|
||||
series := driver.containerStatsToValues(stats)
|
||||
driver.containerFsStatsToValues(&series, stats)
|
||||
for key, value := range series {
|
||||
buffer.WriteString(fmt.Sprintf(" %s=%v", key, value))
|
||||
}
|
||||
|
||||
_, err := fmt.Println(buffer.String())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (driver *stdoutStorage) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newStorage(namespace string) (*stdoutStorage, error) {
|
||||
stdoutStorage := &stdoutStorage{
|
||||
Namespace: namespace,
|
||||
}
|
||||
return stdoutStorage, nil
|
||||
}
|
39
vendor/github.com/google/cadvisor/storage/test/mock.go
generated
vendored
39
vendor/github.com/google/cadvisor/storage/test/mock.go
generated
vendored
@@ -1,39 +0,0 @@
|
||||
// Copyright 2014 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
info "github.com/google/cadvisor/info/v1"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
type MockStorageDriver struct {
|
||||
mock.Mock
|
||||
MockCloseMethod bool
|
||||
}
|
||||
|
||||
func (self *MockStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
args := self.Called(ref, stats)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (self *MockStorageDriver) Close() error {
|
||||
if self.MockCloseMethod {
|
||||
args := self.Called()
|
||||
return args.Error(0)
|
||||
}
|
||||
return nil
|
||||
}
|
138
vendor/github.com/google/cadvisor/storage/test/storagetests.go
generated
vendored
138
vendor/github.com/google/cadvisor/storage/test/storagetests.go
generated
vendored
@@ -1,138 +0,0 @@
|
||||
// Copyright 2014 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
info "github.com/google/cadvisor/info/v1"
|
||||
"github.com/google/cadvisor/storage"
|
||||
)
|
||||
|
||||
type TestStorageDriver interface {
|
||||
StatsEq(a *info.ContainerStats, b *info.ContainerStats) bool
|
||||
storage.StorageDriver
|
||||
}
|
||||
|
||||
func buildTrace(cpu, mem []uint64, duration time.Duration) []*info.ContainerStats {
|
||||
if len(cpu) != len(mem) {
|
||||
panic("len(cpu) != len(mem)")
|
||||
}
|
||||
|
||||
ret := make([]*info.ContainerStats, len(cpu))
|
||||
currentTime := time.Now()
|
||||
|
||||
var cpuTotalUsage uint64 = 0
|
||||
for i, cpuUsage := range cpu {
|
||||
cpuTotalUsage += cpuUsage
|
||||
stats := new(info.ContainerStats)
|
||||
stats.Timestamp = currentTime
|
||||
currentTime = currentTime.Add(duration)
|
||||
|
||||
stats.Cpu.Usage.Total = cpuTotalUsage
|
||||
stats.Cpu.Usage.User = stats.Cpu.Usage.Total
|
||||
stats.Cpu.Usage.System = 0
|
||||
stats.Cpu.Usage.PerCpu = []uint64{cpuTotalUsage}
|
||||
|
||||
stats.Memory.Usage = mem[i]
|
||||
|
||||
stats.Network.RxBytes = uint64(rand.Intn(10000))
|
||||
stats.Network.RxErrors = uint64(rand.Intn(1000))
|
||||
stats.Network.TxBytes = uint64(rand.Intn(100000))
|
||||
stats.Network.TxErrors = uint64(rand.Intn(1000))
|
||||
|
||||
stats.Filesystem = make([]info.FsStats, 1)
|
||||
stats.Filesystem[0].Device = "/dev/sda1"
|
||||
stats.Filesystem[0].Limit = 1024000000
|
||||
stats.Filesystem[0].Usage = 1024000
|
||||
ret[i] = stats
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func TimeEq(t1, t2 time.Time, tolerance time.Duration) bool {
|
||||
// t1 should not be later than t2
|
||||
if t1.After(t2) {
|
||||
t1, t2 = t2, t1
|
||||
}
|
||||
diff := t2.Sub(t1)
|
||||
if diff <= tolerance {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
const (
|
||||
// 10ms, i.e. 0.01s
|
||||
timePrecision time.Duration = 10 * time.Millisecond
|
||||
)
|
||||
|
||||
// This function is useful because we do not require precise time
|
||||
// representation.
|
||||
func DefaultStatsEq(a, b *info.ContainerStats) bool {
|
||||
if !TimeEq(a.Timestamp, b.Timestamp, timePrecision) {
|
||||
return false
|
||||
}
|
||||
if !reflect.DeepEqual(a.Cpu, b.Cpu) {
|
||||
return false
|
||||
}
|
||||
if !reflect.DeepEqual(a.Memory, b.Memory) {
|
||||
return false
|
||||
}
|
||||
if !reflect.DeepEqual(a.Network, b.Network) {
|
||||
return false
|
||||
}
|
||||
if !reflect.DeepEqual(a.Filesystem, b.Filesystem) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// This function will generate random stats and write
|
||||
// them into the storage. The function will not close the driver
|
||||
func StorageDriverFillRandomStatsFunc(
|
||||
containerName string,
|
||||
N int,
|
||||
driver TestStorageDriver,
|
||||
t *testing.T,
|
||||
) {
|
||||
cpuTrace := make([]uint64, 0, N)
|
||||
memTrace := make([]uint64, 0, N)
|
||||
|
||||
// We need N+1 observations to get N samples
|
||||
for i := 0; i < N+1; i++ {
|
||||
cpuTrace = append(cpuTrace, uint64(rand.Intn(1000)))
|
||||
memTrace = append(memTrace, uint64(rand.Intn(1000)))
|
||||
}
|
||||
|
||||
samplePeriod := 1 * time.Second
|
||||
|
||||
ref := info.ContainerReference{
|
||||
Name: containerName,
|
||||
}
|
||||
|
||||
trace := buildTrace(cpuTrace, memTrace, samplePeriod)
|
||||
|
||||
for _, stats := range trace {
|
||||
err := driver.AddStats(ref, stats)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to add stats: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user