First commit

This commit is contained in:
Joe Beda
2014-06-06 16:40:48 -07:00
commit 2c4b3a562c
250 changed files with 47501 additions and 0 deletions

View File

@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -0,0 +1,15 @@
# go-etcd
The official etcd v0.2 client library for Go.
For usage, please refer to [![GoDoc](https://godoc.org/github.com/coreos/go-etcd/etcd?status.png)](https://godoc.org/github.com/coreos/go-etcd/etcd)
## Install
```bash
go get github.com/coreos/go-etcd/etcd
```
## License
See LICENSE file.

View File

@@ -0,0 +1,23 @@
package etcd
// Add a new directory with a random etcd-generated key under the given path.
func (c *Client) AddChildDir(key string, ttl uint64) (*Response, error) {
raw, err := c.post(key, "", ttl)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
// Add a new file with a random etcd-generated key under the given path.
func (c *Client) AddChild(key string, value string, ttl uint64) (*Response, error) {
raw, err := c.post(key, value, ttl)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}

View File

@@ -0,0 +1,73 @@
package etcd
import "testing"
func TestAddChild(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("fooDir", true)
c.Delete("nonexistentDir", true)
}()
c.CreateDir("fooDir", 5)
_, err := c.AddChild("fooDir", "v0", 5)
if err != nil {
t.Fatal(err)
}
_, err = c.AddChild("fooDir", "v1", 5)
if err != nil {
t.Fatal(err)
}
resp, err := c.Get("fooDir", true, false)
// The child with v0 should proceed the child with v1 because it's added
// earlier, so it should have a lower key.
if !(len(resp.Node.Nodes) == 2 && (resp.Node.Nodes[0].Value == "v0" && resp.Node.Nodes[1].Value == "v1")) {
t.Fatalf("AddChild 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+
" The response was: %#v", resp)
}
// Creating a child under a nonexistent directory should succeed.
// The directory should be created.
resp, err = c.AddChild("nonexistentDir", "foo", 5)
if err != nil {
t.Fatal(err)
}
}
func TestAddChildDir(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("fooDir", true)
c.Delete("nonexistentDir", true)
}()
c.CreateDir("fooDir", 5)
_, err := c.AddChildDir("fooDir", 5)
if err != nil {
t.Fatal(err)
}
_, err = c.AddChildDir("fooDir", 5)
if err != nil {
t.Fatal(err)
}
resp, err := c.Get("fooDir", true, false)
// The child with v0 should proceed the child with v1 because it's added
// earlier, so it should have a lower key.
if !(len(resp.Node.Nodes) == 2 && (len(resp.Node.Nodes[0].Nodes) == 0 && len(resp.Node.Nodes[1].Nodes) == 0)) {
t.Fatalf("AddChildDir 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+
" The response was: %#v", resp)
}
// Creating a child under a nonexistent directory should succeed.
// The directory should be created.
resp, err = c.AddChildDir("nonexistentDir", 5)
if err != nil {
t.Fatal(err)
}
}

View File

@@ -0,0 +1,430 @@
package etcd
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path"
"time"
)
// See SetConsistency for how to use these constants.
const (
// Using strings rather than iota because the consistency level
// could be persisted to disk, so it'd be better to use
// human-readable values.
STRONG_CONSISTENCY = "STRONG"
WEAK_CONSISTENCY = "WEAK"
)
const (
defaultBufferSize = 10
)
type Config struct {
CertFile string `json:"certFile"`
KeyFile string `json:"keyFile"`
CaCertFile []string `json:"caCertFiles"`
DialTimeout time.Duration `json:"timeout"`
Consistency string `json:"consistency"`
}
type Client struct {
config Config `json:"config"`
cluster *Cluster `json:"cluster"`
httpClient *http.Client
persistence io.Writer
cURLch chan string
// CheckRetry can be used to control the policy for failed requests
// and modify the cluster if needed.
// The client calls it before sending requests again, and
// stops retrying if CheckRetry returns some error. The cases that
// this function needs to handle include no response and unexpected
// http status code of response.
// If CheckRetry is nil, client will call the default one
// `DefaultCheckRetry`.
// Argument cluster is the etcd.Cluster object that these requests have been made on.
// Argument reqs is all of the http.Requests that have been made so far.
// Argument resps is all of the http.Responses from these requests.
// Argument err is the reason of the failure.
CheckRetry func(cluster *Cluster, reqs []http.Request,
resps []http.Response, err error) error
}
// NewClient create a basic client that is configured to be used
// with the given machine list.
func NewClient(machines []string) *Client {
config := Config{
// default timeout is one second
DialTimeout: time.Second,
// default consistency level is STRONG
Consistency: STRONG_CONSISTENCY,
}
client := &Client{
cluster: NewCluster(machines),
config: config,
}
client.initHTTPClient()
client.saveConfig()
return client
}
// NewTLSClient create a basic client with TLS configuration
func NewTLSClient(machines []string, cert, key, caCert string) (*Client, error) {
// overwrite the default machine to use https
if len(machines) == 0 {
machines = []string{"https://127.0.0.1:4001"}
}
config := Config{
// default timeout is one second
DialTimeout: time.Second,
// default consistency level is STRONG
Consistency: STRONG_CONSISTENCY,
CertFile: cert,
KeyFile: key,
CaCertFile: make([]string, 0),
}
client := &Client{
cluster: NewCluster(machines),
config: config,
}
err := client.initHTTPSClient(cert, key)
if err != nil {
return nil, err
}
err = client.AddRootCA(caCert)
client.saveConfig()
return client, nil
}
// NewClientFromFile creates a client from a given file path.
// The given file is expected to use the JSON format.
func NewClientFromFile(fpath string) (*Client, error) {
fi, err := os.Open(fpath)
if err != nil {
return nil, err
}
defer func() {
if err := fi.Close(); err != nil {
panic(err)
}
}()
return NewClientFromReader(fi)
}
// NewClientFromReader creates a Client configured from a given reader.
// The configuration is expected to use the JSON format.
func NewClientFromReader(reader io.Reader) (*Client, error) {
c := new(Client)
b, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}
err = json.Unmarshal(b, c)
if err != nil {
return nil, err
}
if c.config.CertFile == "" {
c.initHTTPClient()
} else {
err = c.initHTTPSClient(c.config.CertFile, c.config.KeyFile)
}
if err != nil {
return nil, err
}
for _, caCert := range c.config.CaCertFile {
if err := c.AddRootCA(caCert); err != nil {
return nil, err
}
}
return c, nil
}
// Override the Client's HTTP Transport object
func (c *Client) SetTransport(tr *http.Transport) {
c.httpClient.Transport = tr
}
// initHTTPClient initializes a HTTP client for etcd client
func (c *Client) initHTTPClient() {
tr := &http.Transport{
Dial: c.dial,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
c.httpClient = &http.Client{Transport: tr}
}
// initHTTPClient initializes a HTTPS client for etcd client
func (c *Client) initHTTPSClient(cert, key string) error {
if cert == "" || key == "" {
return errors.New("Require both cert and key path")
}
tlsCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return err
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{tlsCert},
InsecureSkipVerify: true,
}
tr := &http.Transport{
TLSClientConfig: tlsConfig,
Dial: c.dial,
}
c.httpClient = &http.Client{Transport: tr}
return nil
}
// SetPersistence sets a writer to which the config will be
// written every time it's changed.
func (c *Client) SetPersistence(writer io.Writer) {
c.persistence = writer
}
// SetConsistency changes the consistency level of the client.
//
// When consistency is set to STRONG_CONSISTENCY, all requests,
// including GET, are sent to the leader. This means that, assuming
// the absence of leader failures, GET requests are guaranteed to see
// the changes made by previous requests.
//
// When consistency is set to WEAK_CONSISTENCY, other requests
// are still sent to the leader, but GET requests are sent to a
// random server from the server pool. This reduces the read
// load on the leader, but it's not guaranteed that the GET requests
// will see changes made by previous requests (they might have not
// yet been committed on non-leader servers).
func (c *Client) SetConsistency(consistency string) error {
if !(consistency == STRONG_CONSISTENCY || consistency == WEAK_CONSISTENCY) {
return errors.New("The argument must be either STRONG_CONSISTENCY or WEAK_CONSISTENCY.")
}
c.config.Consistency = consistency
return nil
}
// AddRootCA adds a root CA cert for the etcd client
func (c *Client) AddRootCA(caCert string) error {
if c.httpClient == nil {
return errors.New("Client has not been initialized yet!")
}
certBytes, err := ioutil.ReadFile(caCert)
if err != nil {
return err
}
tr, ok := c.httpClient.Transport.(*http.Transport)
if !ok {
panic("AddRootCA(): Transport type assert should not fail")
}
if tr.TLSClientConfig.RootCAs == nil {
caCertPool := x509.NewCertPool()
ok = caCertPool.AppendCertsFromPEM(certBytes)
if ok {
tr.TLSClientConfig.RootCAs = caCertPool
}
tr.TLSClientConfig.InsecureSkipVerify = false
} else {
ok = tr.TLSClientConfig.RootCAs.AppendCertsFromPEM(certBytes)
}
if !ok {
err = errors.New("Unable to load caCert")
}
c.config.CaCertFile = append(c.config.CaCertFile, caCert)
c.saveConfig()
return err
}
// SetCluster updates cluster information using the given machine list.
func (c *Client) SetCluster(machines []string) bool {
success := c.internalSyncCluster(machines)
return success
}
func (c *Client) GetCluster() []string {
return c.cluster.Machines
}
// SyncCluster updates the cluster information using the internal machine list.
func (c *Client) SyncCluster() bool {
return c.internalSyncCluster(c.cluster.Machines)
}
// internalSyncCluster syncs cluster information using the given machine list.
func (c *Client) internalSyncCluster(machines []string) bool {
for _, machine := range machines {
httpPath := c.createHttpPath(machine, path.Join(version, "machines"))
resp, err := c.httpClient.Get(httpPath)
if err != nil {
// try another machine in the cluster
continue
} else {
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
// try another machine in the cluster
continue
}
// update Machines List
c.cluster.updateFromStr(string(b))
// update leader
// the first one in the machine list is the leader
c.cluster.switchLeader(0)
logger.Debug("sync.machines ", c.cluster.Machines)
c.saveConfig()
return true
}
}
return false
}
// createHttpPath creates a complete HTTP URL.
// serverName should contain both the host name and a port number, if any.
func (c *Client) createHttpPath(serverName string, _path string) string {
u, err := url.Parse(serverName)
if err != nil {
panic(err)
}
u.Path = path.Join(u.Path, _path)
if u.Scheme == "" {
u.Scheme = "http"
}
return u.String()
}
// dial attempts to open a TCP connection to the provided address, explicitly
// enabling keep-alives with a one-second interval.
func (c *Client) dial(network, addr string) (net.Conn, error) {
conn, err := net.DialTimeout(network, addr, c.config.DialTimeout)
if err != nil {
return nil, err
}
tcpConn, ok := conn.(*net.TCPConn)
if !ok {
return nil, errors.New("Failed type-assertion of net.Conn as *net.TCPConn")
}
// Keep TCP alive to check whether or not the remote machine is down
if err = tcpConn.SetKeepAlive(true); err != nil {
return nil, err
}
if err = tcpConn.SetKeepAlivePeriod(time.Second); err != nil {
return nil, err
}
return tcpConn, nil
}
func (c *Client) OpenCURL() {
c.cURLch = make(chan string, defaultBufferSize)
}
func (c *Client) CloseCURL() {
c.cURLch = nil
}
func (c *Client) sendCURL(command string) {
go func() {
select {
case c.cURLch <- command:
default:
}
}()
}
func (c *Client) RecvCURL() string {
return <-c.cURLch
}
// saveConfig saves the current config using c.persistence.
func (c *Client) saveConfig() error {
if c.persistence != nil {
b, err := json.Marshal(c)
if err != nil {
return err
}
_, err = c.persistence.Write(b)
if err != nil {
return err
}
}
return nil
}
// MarshalJSON implements the Marshaller interface
// as defined by the standard JSON package.
func (c *Client) MarshalJSON() ([]byte, error) {
b, err := json.Marshal(struct {
Config Config `json:"config"`
Cluster *Cluster `json:"cluster"`
}{
Config: c.config,
Cluster: c.cluster,
})
if err != nil {
return nil, err
}
return b, nil
}
// UnmarshalJSON implements the Unmarshaller interface
// as defined by the standard JSON package.
func (c *Client) UnmarshalJSON(b []byte) error {
temp := struct {
Config Config `json:"config"`
Cluster *Cluster `json:"cluster"`
}{}
err := json.Unmarshal(b, &temp)
if err != nil {
return err
}
c.cluster = temp.Cluster
c.config = temp.Config
return nil
}

View File

@@ -0,0 +1,96 @@
package etcd
import (
"encoding/json"
"fmt"
"net"
"net/url"
"os"
"testing"
)
// To pass this test, we need to create a cluster of 3 machines
// The server should be listening on 127.0.0.1:4001, 4002, 4003
func TestSync(t *testing.T) {
fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003")
// Explicit trailing slash to ensure this doesn't reproduce:
// https://github.com/coreos/go-etcd/issues/82
c := NewClient([]string{"http://127.0.0.1:4001/"})
success := c.SyncCluster()
if !success {
t.Fatal("cannot sync machines")
}
for _, m := range c.GetCluster() {
u, err := url.Parse(m)
if err != nil {
t.Fatal(err)
}
if u.Scheme != "http" {
t.Fatal("scheme must be http")
}
host, _, err := net.SplitHostPort(u.Host)
if err != nil {
t.Fatal(err)
}
if host != "127.0.0.1" {
t.Fatal("Host must be 127.0.0.1")
}
}
badMachines := []string{"abc", "edef"}
success = c.SetCluster(badMachines)
if success {
t.Fatal("should not sync on bad machines")
}
goodMachines := []string{"127.0.0.1:4002"}
success = c.SetCluster(goodMachines)
if !success {
t.Fatal("cannot sync machines")
} else {
fmt.Println(c.cluster.Machines)
}
}
func TestPersistence(t *testing.T) {
c := NewClient(nil)
c.SyncCluster()
fo, err := os.Create("config.json")
if err != nil {
t.Fatal(err)
}
defer func() {
if err := fo.Close(); err != nil {
panic(err)
}
}()
c.SetPersistence(fo)
err = c.saveConfig()
if err != nil {
t.Fatal(err)
}
c2, err := NewClientFromFile("config.json")
if err != nil {
t.Fatal(err)
}
// Verify that the two clients have the same config
b1, _ := json.Marshal(c)
b2, _ := json.Marshal(c2)
if string(b1) != string(b2) {
t.Fatalf("The two configs should be equal!")
}
}

View File

@@ -0,0 +1,51 @@
package etcd
import (
"net/url"
"strings"
)
type Cluster struct {
Leader string `json:"leader"`
Machines []string `json:"machines"`
}
func NewCluster(machines []string) *Cluster {
// if an empty slice was sent in then just assume HTTP 4001 on localhost
if len(machines) == 0 {
machines = []string{"http://127.0.0.1:4001"}
}
// default leader and machines
return &Cluster{
Leader: machines[0],
Machines: machines,
}
}
// switchLeader switch the current leader to machines[num]
func (cl *Cluster) switchLeader(num int) {
logger.Debugf("switch.leader[from %v to %v]",
cl.Leader, cl.Machines[num])
cl.Leader = cl.Machines[num]
}
func (cl *Cluster) updateFromStr(machines string) {
cl.Machines = strings.Split(machines, ", ")
}
func (cl *Cluster) updateLeader(leader string) {
logger.Debugf("update.leader[%s,%s]", cl.Leader, leader)
cl.Leader = leader
}
func (cl *Cluster) updateLeaderFromURL(u *url.URL) {
var leader string
if u.Scheme == "" {
leader = "http://" + u.Host
} else {
leader = u.Scheme + "://" + u.Host
}
cl.updateLeader(leader)
}

View File

@@ -0,0 +1,34 @@
package etcd
import "fmt"
func (c *Client) CompareAndDelete(key string, prevValue string, prevIndex uint64) (*Response, error) {
raw, err := c.RawCompareAndDelete(key, prevValue, prevIndex)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
func (c *Client) RawCompareAndDelete(key string, prevValue string, prevIndex uint64) (*RawResponse, error) {
if prevValue == "" && prevIndex == 0 {
return nil, fmt.Errorf("You must give either prevValue or prevIndex.")
}
options := Options{}
if prevValue != "" {
options["prevValue"] = prevValue
}
if prevIndex != 0 {
options["prevIndex"] = prevIndex
}
raw, err := c.delete(key, options)
if err != nil {
return nil, err
}
return raw, err
}

View File

@@ -0,0 +1,46 @@
package etcd
import (
"testing"
)
func TestCompareAndDelete(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("foo", true)
}()
c.Set("foo", "bar", 5)
// This should succeed an correct prevValue
resp, err := c.CompareAndDelete("foo", "bar", 0)
if err != nil {
t.Fatal(err)
}
if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) {
t.Fatalf("CompareAndDelete 1 prevNode failed: %#v", resp)
}
resp, _ = c.Set("foo", "bar", 5)
// This should fail because it gives an incorrect prevValue
_, err = c.CompareAndDelete("foo", "xxx", 0)
if err == nil {
t.Fatalf("CompareAndDelete 2 should have failed. The response is: %#v", resp)
}
// This should succeed because it gives an correct prevIndex
resp, err = c.CompareAndDelete("foo", "", resp.Node.ModifiedIndex)
if err != nil {
t.Fatal(err)
}
if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) {
t.Fatalf("CompareAndSwap 3 prevNode failed: %#v", resp)
}
c.Set("foo", "bar", 5)
// This should fail because it gives an incorrect prevIndex
resp, err = c.CompareAndDelete("foo", "", 29817514)
if err == nil {
t.Fatalf("CompareAndDelete 4 should have failed. The response is: %#v", resp)
}
}

View File

@@ -0,0 +1,36 @@
package etcd
import "fmt"
func (c *Client) CompareAndSwap(key string, value string, ttl uint64,
prevValue string, prevIndex uint64) (*Response, error) {
raw, err := c.RawCompareAndSwap(key, value, ttl, prevValue, prevIndex)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
func (c *Client) RawCompareAndSwap(key string, value string, ttl uint64,
prevValue string, prevIndex uint64) (*RawResponse, error) {
if prevValue == "" && prevIndex == 0 {
return nil, fmt.Errorf("You must give either prevValue or prevIndex.")
}
options := Options{}
if prevValue != "" {
options["prevValue"] = prevValue
}
if prevIndex != 0 {
options["prevIndex"] = prevIndex
}
raw, err := c.put(key, value, ttl, options)
if err != nil {
return nil, err
}
return raw, err
}

View File

@@ -0,0 +1,57 @@
package etcd
import (
"testing"
)
func TestCompareAndSwap(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("foo", true)
}()
c.Set("foo", "bar", 5)
// This should succeed
resp, err := c.CompareAndSwap("foo", "bar2", 5, "bar", 0)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Value == "bar2" && resp.Node.Key == "/foo" && resp.Node.TTL == 5) {
t.Fatalf("CompareAndSwap 1 failed: %#v", resp)
}
if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) {
t.Fatalf("CompareAndSwap 1 prevNode failed: %#v", resp)
}
// This should fail because it gives an incorrect prevValue
resp, err = c.CompareAndSwap("foo", "bar3", 5, "xxx", 0)
if err == nil {
t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp)
}
resp, err = c.Set("foo", "bar", 5)
if err != nil {
t.Fatal(err)
}
// This should succeed
resp, err = c.CompareAndSwap("foo", "bar2", 5, "", resp.Node.ModifiedIndex)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Value == "bar2" && resp.Node.Key == "/foo" && resp.Node.TTL == 5) {
t.Fatalf("CompareAndSwap 3 failed: %#v", resp)
}
if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) {
t.Fatalf("CompareAndSwap 3 prevNode failed: %#v", resp)
}
// This should fail because it gives an incorrect prevIndex
resp, err = c.CompareAndSwap("foo", "bar3", 5, "", 29817514)
if err == nil {
t.Fatalf("CompareAndSwap 4 should have failed. The response is: %#v", resp)
}
}

View File

@@ -0,0 +1,54 @@
package etcd
import (
"io/ioutil"
"log"
"strings"
)
var logger *etcdLogger
func SetLogger(l *log.Logger) {
logger = &etcdLogger{l}
}
func GetLogger() *log.Logger {
return logger.log
}
type etcdLogger struct {
log *log.Logger
}
func (p *etcdLogger) Debug(args ...interface{}) {
args[0] = "DEBUG: " + args[0].(string)
p.log.Println(args)
}
func (p *etcdLogger) Debugf(fmt string, args ...interface{}) {
args[0] = "DEBUG: " + args[0].(string)
// Append newline if necessary
if !strings.HasSuffix(fmt, "\n") {
fmt = fmt + "\n"
}
p.log.Printf(fmt, args)
}
func (p *etcdLogger) Warning(args ...interface{}) {
args[0] = "WARNING: " + args[0].(string)
p.log.Println(args)
}
func (p *etcdLogger) Warningf(fmt string, args ...interface{}) {
// Append newline if necessary
if !strings.HasSuffix(fmt, "\n") {
fmt = fmt + "\n"
}
args[0] = "WARNING: " + args[0].(string)
p.log.Printf(fmt, args)
}
func init() {
// Default logger uses the go default log.
SetLogger(log.New(ioutil.Discard, "go-etcd", log.LstdFlags))
}

View File

@@ -0,0 +1,40 @@
package etcd
// Delete deletes the given key.
//
// When recursive set to false, if the key points to a
// directory the method will fail.
//
// When recursive set to true, if the key points to a file,
// the file will be deleted; if the key points to a directory,
// then everything under the directory (including all child directories)
// will be deleted.
func (c *Client) Delete(key string, recursive bool) (*Response, error) {
raw, err := c.RawDelete(key, recursive, false)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
// DeleteDir deletes an empty directory or a key value pair
func (c *Client) DeleteDir(key string) (*Response, error) {
raw, err := c.RawDelete(key, false, true)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
func (c *Client) RawDelete(key string, recursive bool, dir bool) (*RawResponse, error) {
ops := Options{
"recursive": recursive,
"dir": dir,
}
return c.delete(key, ops)
}

View File

@@ -0,0 +1,81 @@
package etcd
import (
"testing"
)
func TestDelete(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("foo", true)
}()
c.Set("foo", "bar", 5)
resp, err := c.Delete("foo", false)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Value == "") {
t.Fatalf("Delete failed with %s", resp.Node.Value)
}
if !(resp.PrevNode.Value == "bar") {
t.Fatalf("Delete PrevNode failed with %s", resp.Node.Value)
}
resp, err = c.Delete("foo", false)
if err == nil {
t.Fatalf("Delete should have failed because the key foo did not exist. "+
"The response was: %v", resp)
}
}
func TestDeleteAll(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("foo", true)
c.Delete("fooDir", true)
}()
c.SetDir("foo", 5)
// test delete an empty dir
resp, err := c.DeleteDir("foo")
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Value == "") {
t.Fatalf("DeleteAll 1 failed: %#v", resp)
}
if !(resp.PrevNode.Dir == true && resp.PrevNode.Value == "") {
t.Fatalf("DeleteAll 1 PrevNode failed: %#v", resp)
}
c.CreateDir("fooDir", 5)
c.Set("fooDir/foo", "bar", 5)
_, err = c.DeleteDir("fooDir")
if err == nil {
t.Fatal("should not able to delete a non-empty dir with deletedir")
}
resp, err = c.Delete("fooDir", true)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Value == "") {
t.Fatalf("DeleteAll 2 failed: %#v", resp)
}
if !(resp.PrevNode.Dir == true && resp.PrevNode.Value == "") {
t.Fatalf("DeleteAll 2 PrevNode failed: %#v", resp)
}
resp, err = c.Delete("foo", true)
if err == nil {
t.Fatalf("DeleteAll should have failed because the key foo did not exist. "+
"The response was: %v", resp)
}
}

View File

@@ -0,0 +1,48 @@
package etcd
import (
"encoding/json"
"fmt"
)
const (
ErrCodeEtcdNotReachable = 501
)
var (
errorMap = map[int]string{
ErrCodeEtcdNotReachable: "All the given peers are not reachable",
}
)
type EtcdError struct {
ErrorCode int `json:"errorCode"`
Message string `json:"message"`
Cause string `json:"cause,omitempty"`
Index uint64 `json:"index"`
}
func (e EtcdError) Error() string {
return fmt.Sprintf("%v: %v (%v) [%v]", e.ErrorCode, e.Message, e.Cause, e.Index)
}
func newError(errorCode int, cause string, index uint64) *EtcdError {
return &EtcdError{
ErrorCode: errorCode,
Message: errorMap[errorCode],
Cause: cause,
Index: index,
}
}
func handleError(b []byte) error {
etcdErr := new(EtcdError)
err := json.Unmarshal(b, etcdErr)
if err != nil {
logger.Warningf("cannot unmarshal etcd error: %v", err)
return err
}
return etcdErr
}

View File

@@ -0,0 +1,27 @@
package etcd
// Get gets the file or directory associated with the given key.
// If the key points to a directory, files and directories under
// it will be returned in sorted or unsorted order, depending on
// the sort flag.
// If recursive is set to false, contents under child directories
// will not be returned.
// If recursive is set to true, all the contents will be returned.
func (c *Client) Get(key string, sort, recursive bool) (*Response, error) {
raw, err := c.RawGet(key, sort, recursive)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
func (c *Client) RawGet(key string, sort, recursive bool) (*RawResponse, error) {
ops := Options{
"recursive": recursive,
"sorted": sort,
}
return c.get(key, ops)
}

View File

@@ -0,0 +1,131 @@
package etcd
import (
"reflect"
"testing"
)
// cleanNode scrubs Expiration, ModifiedIndex and CreatedIndex of a node.
func cleanNode(n *Node) {
n.Expiration = nil
n.ModifiedIndex = 0
n.CreatedIndex = 0
}
// cleanResult scrubs a result object two levels deep of Expiration,
// ModifiedIndex and CreatedIndex.
func cleanResult(result *Response) {
// TODO(philips): make this recursive.
cleanNode(result.Node)
for i, _ := range result.Node.Nodes {
cleanNode(result.Node.Nodes[i])
for j, _ := range result.Node.Nodes[i].Nodes {
cleanNode(result.Node.Nodes[i].Nodes[j])
}
}
}
func TestGet(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("foo", true)
}()
c.Set("foo", "bar", 5)
result, err := c.Get("foo", false, false)
if err != nil {
t.Fatal(err)
}
if result.Node.Key != "/foo" || result.Node.Value != "bar" {
t.Fatalf("Get failed with %s %s %v", result.Node.Key, result.Node.Value, result.Node.TTL)
}
result, err = c.Get("goo", false, false)
if err == nil {
t.Fatalf("should not be able to get non-exist key")
}
}
func TestGetAll(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("fooDir", true)
}()
c.CreateDir("fooDir", 5)
c.Set("fooDir/k0", "v0", 5)
c.Set("fooDir/k1", "v1", 5)
// Return kv-pairs in sorted order
result, err := c.Get("fooDir", true, false)
if err != nil {
t.Fatal(err)
}
expected := Nodes{
&Node{
Key: "/fooDir/k0",
Value: "v0",
TTL: 5,
},
&Node{
Key: "/fooDir/k1",
Value: "v1",
TTL: 5,
},
}
cleanResult(result)
if !reflect.DeepEqual(result.Node.Nodes, expected) {
t.Fatalf("(actual) %v != (expected) %v", result.Node.Nodes, expected)
}
// Test the `recursive` option
c.CreateDir("fooDir/childDir", 5)
c.Set("fooDir/childDir/k2", "v2", 5)
// Return kv-pairs in sorted order
result, err = c.Get("fooDir", true, true)
cleanResult(result)
if err != nil {
t.Fatal(err)
}
expected = Nodes{
&Node{
Key: "/fooDir/childDir",
Dir: true,
Nodes: Nodes{
&Node{
Key: "/fooDir/childDir/k2",
Value: "v2",
TTL: 5,
},
},
TTL: 5,
},
&Node{
Key: "/fooDir/k0",
Value: "v0",
TTL: 5,
},
&Node{
Key: "/fooDir/k1",
Value: "v1",
TTL: 5,
},
}
cleanResult(result)
if !reflect.DeepEqual(result.Node.Nodes, expected) {
t.Fatalf("(actual) %v != (expected) %v", result.Node.Nodes, expected)
}
}

View File

@@ -0,0 +1,72 @@
package etcd
import (
"fmt"
"net/url"
"reflect"
)
type Options map[string]interface{}
// An internally-used data structure that represents a mapping
// between valid options and their kinds
type validOptions map[string]reflect.Kind
// Valid options for GET, PUT, POST, DELETE
// Using CAPITALIZED_UNDERSCORE to emphasize that these
// values are meant to be used as constants.
var (
VALID_GET_OPTIONS = validOptions{
"recursive": reflect.Bool,
"consistent": reflect.Bool,
"sorted": reflect.Bool,
"wait": reflect.Bool,
"waitIndex": reflect.Uint64,
}
VALID_PUT_OPTIONS = validOptions{
"prevValue": reflect.String,
"prevIndex": reflect.Uint64,
"prevExist": reflect.Bool,
"dir": reflect.Bool,
}
VALID_POST_OPTIONS = validOptions{}
VALID_DELETE_OPTIONS = validOptions{
"recursive": reflect.Bool,
"dir": reflect.Bool,
"prevValue": reflect.String,
"prevIndex": reflect.Uint64,
}
)
// Convert options to a string of HTML parameters
func (ops Options) toParameters(validOps validOptions) (string, error) {
p := "?"
values := url.Values{}
if ops == nil {
return "", nil
}
for k, v := range ops {
// Check if the given option is valid (that it exists)
kind := validOps[k]
if kind == reflect.Invalid {
return "", fmt.Errorf("Invalid option: %v", k)
}
// Check if the given option is of the valid type
t := reflect.TypeOf(v)
if kind != t.Kind() {
return "", fmt.Errorf("Option %s should be of %v kind, not of %v kind.",
k, kind, t.Kind())
}
values.Set(k, fmt.Sprintf("%v", v))
}
p += values.Encode()
return p, nil
}

View File

@@ -0,0 +1,365 @@
package etcd
import (
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"path"
"strings"
"sync"
"time"
)
// Errors introduced by handling requests
var (
ErrRequestCancelled = errors.New("sending request is cancelled")
)
type RawRequest struct {
Method string
RelativePath string
Values url.Values
Cancel <-chan bool
}
// NewRawRequest returns a new RawRequest
func NewRawRequest(method, relativePath string, values url.Values, cancel <-chan bool) *RawRequest {
return &RawRequest{
Method: method,
RelativePath: relativePath,
Values: values,
Cancel: cancel,
}
}
// getCancelable issues a cancelable GET request
func (c *Client) getCancelable(key string, options Options,
cancel <-chan bool) (*RawResponse, error) {
logger.Debugf("get %s [%s]", key, c.cluster.Leader)
p := keyToPath(key)
// If consistency level is set to STRONG, append
// the `consistent` query string.
if c.config.Consistency == STRONG_CONSISTENCY {
options["consistent"] = true
}
str, err := options.toParameters(VALID_GET_OPTIONS)
if err != nil {
return nil, err
}
p += str
req := NewRawRequest("GET", p, nil, cancel)
resp, err := c.SendRequest(req)
if err != nil {
return nil, err
}
return resp, nil
}
// get issues a GET request
func (c *Client) get(key string, options Options) (*RawResponse, error) {
return c.getCancelable(key, options, nil)
}
// put issues a PUT request
func (c *Client) put(key string, value string, ttl uint64,
options Options) (*RawResponse, error) {
logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
p := keyToPath(key)
str, err := options.toParameters(VALID_PUT_OPTIONS)
if err != nil {
return nil, err
}
p += str
req := NewRawRequest("PUT", p, buildValues(value, ttl), nil)
resp, err := c.SendRequest(req)
if err != nil {
return nil, err
}
return resp, nil
}
// post issues a POST request
func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) {
logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
p := keyToPath(key)
req := NewRawRequest("POST", p, buildValues(value, ttl), nil)
resp, err := c.SendRequest(req)
if err != nil {
return nil, err
}
return resp, nil
}
// delete issues a DELETE request
func (c *Client) delete(key string, options Options) (*RawResponse, error) {
logger.Debugf("delete %s [%s]", key, c.cluster.Leader)
p := keyToPath(key)
str, err := options.toParameters(VALID_DELETE_OPTIONS)
if err != nil {
return nil, err
}
p += str
req := NewRawRequest("DELETE", p, nil, nil)
resp, err := c.SendRequest(req)
if err != nil {
return nil, err
}
return resp, nil
}
// SendRequest sends a HTTP request and returns a Response as defined by etcd
func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
var req *http.Request
var resp *http.Response
var httpPath string
var err error
var respBody []byte
reqs := make([]http.Request, 0)
resps := make([]http.Response, 0)
checkRetry := c.CheckRetry
if checkRetry == nil {
checkRetry = DefaultCheckRetry
}
cancelled := make(chan bool, 1)
reqLock := new(sync.Mutex)
if rr.Cancel != nil {
cancelRoutine := make(chan bool)
defer close(cancelRoutine)
go func() {
select {
case <-rr.Cancel:
cancelled <- true
logger.Debug("send.request is cancelled")
case <-cancelRoutine:
return
}
// Repeat canceling request until this thread is stopped
// because we have no idea about whether it succeeds.
for {
reqLock.Lock()
c.httpClient.Transport.(*http.Transport).CancelRequest(req)
reqLock.Unlock()
select {
case <-time.After(100 * time.Millisecond):
case <-cancelRoutine:
return
}
}
}()
}
// if we connect to a follower, we will retry until we find a leader
for attempt := 0; ; attempt++ {
select {
case <-cancelled:
return nil, ErrRequestCancelled
default:
}
logger.Debug("begin attempt", attempt, "for", rr.RelativePath)
if rr.Method == "GET" && c.config.Consistency == WEAK_CONSISTENCY {
// If it's a GET and consistency level is set to WEAK,
// then use a random machine.
httpPath = c.getHttpPath(true, rr.RelativePath)
} else {
// Else use the leader.
httpPath = c.getHttpPath(false, rr.RelativePath)
}
// Return a cURL command if curlChan is set
if c.cURLch != nil {
command := fmt.Sprintf("curl -X %s %s", rr.Method, httpPath)
for key, value := range rr.Values {
command += fmt.Sprintf(" -d %s=%s", key, value[0])
}
c.sendCURL(command)
}
logger.Debug("send.request.to ", httpPath, " | method ", rr.Method)
reqLock.Lock()
if rr.Values == nil {
if req, err = http.NewRequest(rr.Method, httpPath, nil); err != nil {
return nil, err
}
} else {
body := strings.NewReader(rr.Values.Encode())
if req, err = http.NewRequest(rr.Method, httpPath, body); err != nil {
return nil, err
}
req.Header.Set("Content-Type",
"application/x-www-form-urlencoded; param=value")
}
reqLock.Unlock()
resp, err = c.httpClient.Do(req)
// If the request was cancelled, return ErrRequestCancelled directly
select {
case <-cancelled:
return nil, ErrRequestCancelled
default:
}
reqs = append(reqs, *req)
// network error, change a machine!
if err != nil {
logger.Debug("network error:", err.Error())
resps = append(resps, http.Response{})
if checkErr := checkRetry(c.cluster, reqs, resps, err); checkErr != nil {
return nil, checkErr
}
c.cluster.switchLeader(attempt % len(c.cluster.Machines))
continue
}
// if there is no error, it should receive response
resps = append(resps, *resp)
defer resp.Body.Close()
logger.Debug("recv.response.from", httpPath)
if validHttpStatusCode[resp.StatusCode] {
// try to read byte code and break the loop
respBody, err = ioutil.ReadAll(resp.Body)
if err == nil {
logger.Debug("recv.success.", httpPath)
break
}
}
// if resp is TemporaryRedirect, set the new leader and retry
if resp.StatusCode == http.StatusTemporaryRedirect {
u, err := resp.Location()
if err != nil {
logger.Warning(err)
} else {
// Update cluster leader based on redirect location
// because it should point to the leader address
c.cluster.updateLeaderFromURL(u)
logger.Debug("recv.response.relocate", u.String())
}
continue
}
if checkErr := checkRetry(c.cluster, reqs, resps,
errors.New("Unexpected HTTP status code")); checkErr != nil {
return nil, checkErr
}
}
r := &RawResponse{
StatusCode: resp.StatusCode,
Body: respBody,
Header: resp.Header,
}
return r, nil
}
// DefaultCheckRetry checks retry cases
// If it has retried 2 * machine number, stop to retry it anymore
// If resp is nil, sleep for 200ms
// If status code is InternalServerError, sleep for 200ms.
func DefaultCheckRetry(cluster *Cluster, reqs []http.Request,
resps []http.Response, err error) error {
if len(reqs) >= 2*len(cluster.Machines) {
return newError(ErrCodeEtcdNotReachable,
"Tried to connect to each peer twice and failed", 0)
}
resp := &resps[len(resps)-1]
if resp == nil {
time.Sleep(time.Millisecond * 200)
return nil
}
code := resp.StatusCode
if code == http.StatusInternalServerError {
time.Sleep(time.Millisecond * 200)
}
logger.Warning("bad response status code", code)
return nil
}
func (c *Client) getHttpPath(random bool, s ...string) string {
var machine string
if random {
machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))]
} else {
machine = c.cluster.Leader
}
fullPath := machine + "/" + version
for _, seg := range s {
fullPath = fullPath + "/" + seg
}
return fullPath
}
// buildValues builds a url.Values map according to the given value and ttl
func buildValues(value string, ttl uint64) url.Values {
v := url.Values{}
if value != "" {
v.Set("value", value)
}
if ttl > 0 {
v.Set("ttl", fmt.Sprintf("%v", ttl))
}
return v
}
// convert key string to http path exclude version
// for example: key[foo] -> path[keys/foo]
// key[/] -> path[keys/]
func keyToPath(key string) string {
p := path.Join("keys", key)
// corner case: if key is "/" or "//" ect
// path join will clear the tailing "/"
// we need to add it back
if p == "keys" {
p = "keys/"
}
return p
}

View File

@@ -0,0 +1,89 @@
package etcd
import (
"encoding/json"
"net/http"
"strconv"
"time"
)
const (
rawResponse = iota
normalResponse
)
type responseType int
type RawResponse struct {
StatusCode int
Body []byte
Header http.Header
}
var (
validHttpStatusCode = map[int]bool{
http.StatusCreated: true,
http.StatusOK: true,
http.StatusBadRequest: true,
http.StatusNotFound: true,
http.StatusPreconditionFailed: true,
http.StatusForbidden: true,
}
)
// Unmarshal parses RawResponse and stores the result in Response
func (rr *RawResponse) Unmarshal() (*Response, error) {
if rr.StatusCode != http.StatusOK && rr.StatusCode != http.StatusCreated {
return nil, handleError(rr.Body)
}
resp := new(Response)
err := json.Unmarshal(rr.Body, resp)
if err != nil {
return nil, err
}
// attach index and term to response
resp.EtcdIndex, _ = strconv.ParseUint(rr.Header.Get("X-Etcd-Index"), 10, 64)
resp.RaftIndex, _ = strconv.ParseUint(rr.Header.Get("X-Raft-Index"), 10, 64)
resp.RaftTerm, _ = strconv.ParseUint(rr.Header.Get("X-Raft-Term"), 10, 64)
return resp, nil
}
type Response struct {
Action string `json:"action"`
Node *Node `json:"node"`
PrevNode *Node `json:"prevNode,omitempty"`
EtcdIndex uint64 `json:"etcdIndex"`
RaftIndex uint64 `json:"raftIndex"`
RaftTerm uint64 `json:"raftTerm"`
}
type Node struct {
Key string `json:"key, omitempty"`
Value string `json:"value,omitempty"`
Dir bool `json:"dir,omitempty"`
Expiration *time.Time `json:"expiration,omitempty"`
TTL int64 `json:"ttl,omitempty"`
Nodes Nodes `json:"nodes,omitempty"`
ModifiedIndex uint64 `json:"modifiedIndex,omitempty"`
CreatedIndex uint64 `json:"createdIndex,omitempty"`
}
type Nodes []*Node
// interfaces for sorting
func (ns Nodes) Len() int {
return len(ns)
}
func (ns Nodes) Less(i, j int) bool {
return ns[i].Key < ns[j].Key
}
func (ns Nodes) Swap(i, j int) {
ns[i], ns[j] = ns[j], ns[i]
}

View File

@@ -0,0 +1,42 @@
package etcd
import (
"fmt"
"testing"
)
func TestSetCurlChan(t *testing.T) {
c := NewClient(nil)
c.OpenCURL()
defer func() {
c.Delete("foo", true)
}()
_, err := c.Set("foo", "bar", 5)
if err != nil {
t.Fatal(err)
}
expected := fmt.Sprintf("curl -X PUT %s/v2/keys/foo -d value=bar -d ttl=5",
c.cluster.Leader)
actual := c.RecvCURL()
if expected != actual {
t.Fatalf(`Command "%s" is not equal to expected value "%s"`,
actual, expected)
}
c.SetConsistency(STRONG_CONSISTENCY)
_, err = c.Get("foo", false, false)
if err != nil {
t.Fatal(err)
}
expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?consistent=true&recursive=false&sorted=false",
c.cluster.Leader)
actual = c.RecvCURL()
if expected != actual {
t.Fatalf(`Command "%s" is not equal to expected value "%s"`,
actual, expected)
}
}

View File

@@ -0,0 +1,137 @@
package etcd
// Set sets the given key to the given value.
// It will create a new key value pair or replace the old one.
// It will not replace a existing directory.
func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) {
raw, err := c.RawSet(key, value, ttl)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
// Set sets the given key to a directory.
// It will create a new directory or replace the old key value pair by a directory.
// It will not replace a existing directory.
func (c *Client) SetDir(key string, ttl uint64) (*Response, error) {
raw, err := c.RawSetDir(key, ttl)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
// CreateDir creates a directory. It succeeds only if
// the given key does not yet exist.
func (c *Client) CreateDir(key string, ttl uint64) (*Response, error) {
raw, err := c.RawCreateDir(key, ttl)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
// UpdateDir updates the given directory. It succeeds only if the
// given key already exists.
func (c *Client) UpdateDir(key string, ttl uint64) (*Response, error) {
raw, err := c.RawUpdateDir(key, ttl)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
// Create creates a file with the given value under the given key. It succeeds
// only if the given key does not yet exist.
func (c *Client) Create(key string, value string, ttl uint64) (*Response, error) {
raw, err := c.RawCreate(key, value, ttl)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
// CreateInOrder creates a file with a key that's guaranteed to be higher than other
// keys in the given directory. It is useful for creating queues.
func (c *Client) CreateInOrder(dir string, value string, ttl uint64) (*Response, error) {
raw, err := c.RawCreateInOrder(dir, value, ttl)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
// Update updates the given key to the given value. It succeeds only if the
// given key already exists.
func (c *Client) Update(key string, value string, ttl uint64) (*Response, error) {
raw, err := c.RawUpdate(key, value, ttl)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
func (c *Client) RawUpdateDir(key string, ttl uint64) (*RawResponse, error) {
ops := Options{
"prevExist": true,
"dir": true,
}
return c.put(key, "", ttl, ops)
}
func (c *Client) RawCreateDir(key string, ttl uint64) (*RawResponse, error) {
ops := Options{
"prevExist": false,
"dir": true,
}
return c.put(key, "", ttl, ops)
}
func (c *Client) RawSet(key string, value string, ttl uint64) (*RawResponse, error) {
return c.put(key, value, ttl, nil)
}
func (c *Client) RawSetDir(key string, ttl uint64) (*RawResponse, error) {
ops := Options{
"dir": true,
}
return c.put(key, "", ttl, ops)
}
func (c *Client) RawUpdate(key string, value string, ttl uint64) (*RawResponse, error) {
ops := Options{
"prevExist": true,
}
return c.put(key, value, ttl, ops)
}
func (c *Client) RawCreate(key string, value string, ttl uint64) (*RawResponse, error) {
ops := Options{
"prevExist": false,
}
return c.put(key, value, ttl, ops)
}
func (c *Client) RawCreateInOrder(dir string, value string, ttl uint64) (*RawResponse, error) {
return c.post(dir, value, ttl)
}

View File

@@ -0,0 +1,241 @@
package etcd
import (
"testing"
)
func TestSet(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("foo", true)
}()
resp, err := c.Set("foo", "bar", 5)
if err != nil {
t.Fatal(err)
}
if resp.Node.Key != "/foo" || resp.Node.Value != "bar" || resp.Node.TTL != 5 {
t.Fatalf("Set 1 failed: %#v", resp)
}
if resp.PrevNode != nil {
t.Fatalf("Set 1 PrevNode failed: %#v", resp)
}
resp, err = c.Set("foo", "bar2", 5)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Key == "/foo" && resp.Node.Value == "bar2" && resp.Node.TTL == 5) {
t.Fatalf("Set 2 failed: %#v", resp)
}
if resp.PrevNode.Key != "/foo" || resp.PrevNode.Value != "bar" || resp.Node.TTL != 5 {
t.Fatalf("Set 2 PrevNode failed: %#v", resp)
}
}
func TestUpdate(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("foo", true)
c.Delete("nonexistent", true)
}()
resp, err := c.Set("foo", "bar", 5)
if err != nil {
t.Fatal(err)
}
// This should succeed.
resp, err = c.Update("foo", "wakawaka", 5)
if err != nil {
t.Fatal(err)
}
if !(resp.Action == "update" && resp.Node.Key == "/foo" && resp.Node.TTL == 5) {
t.Fatalf("Update 1 failed: %#v", resp)
}
if !(resp.PrevNode.Key == "/foo" && resp.PrevNode.Value == "bar" && resp.Node.TTL == 5) {
t.Fatalf("Update 1 prevValue failed: %#v", resp)
}
// This should fail because the key does not exist.
resp, err = c.Update("nonexistent", "whatever", 5)
if err == nil {
t.Fatalf("The key %v did not exist, so the update should have failed."+
"The response was: %#v", resp.Node.Key, resp)
}
}
func TestCreate(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("newKey", true)
}()
newKey := "/newKey"
newValue := "/newValue"
// This should succeed
resp, err := c.Create(newKey, newValue, 5)
if err != nil {
t.Fatal(err)
}
if !(resp.Action == "create" && resp.Node.Key == newKey &&
resp.Node.Value == newValue && resp.Node.TTL == 5) {
t.Fatalf("Create 1 failed: %#v", resp)
}
if resp.PrevNode != nil {
t.Fatalf("Create 1 PrevNode failed: %#v", resp)
}
// This should fail, because the key is already there
resp, err = c.Create(newKey, newValue, 5)
if err == nil {
t.Fatalf("The key %v did exist, so the creation should have failed."+
"The response was: %#v", resp.Node.Key, resp)
}
}
func TestCreateInOrder(t *testing.T) {
c := NewClient(nil)
dir := "/queue"
defer func() {
c.DeleteDir(dir)
}()
var firstKey, secondKey string
resp, err := c.CreateInOrder(dir, "1", 5)
if err != nil {
t.Fatal(err)
}
if !(resp.Action == "create" && resp.Node.Value == "1" && resp.Node.TTL == 5) {
t.Fatalf("Create 1 failed: %#v", resp)
}
firstKey = resp.Node.Key
resp, err = c.CreateInOrder(dir, "2", 5)
if err != nil {
t.Fatal(err)
}
if !(resp.Action == "create" && resp.Node.Value == "2" && resp.Node.TTL == 5) {
t.Fatalf("Create 2 failed: %#v", resp)
}
secondKey = resp.Node.Key
if firstKey >= secondKey {
t.Fatalf("Expected first key to be greater than second key, but %s is not greater than %s",
firstKey, secondKey)
}
}
func TestSetDir(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("foo", true)
c.Delete("fooDir", true)
}()
resp, err := c.CreateDir("fooDir", 5)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Key == "/fooDir" && resp.Node.Value == "" && resp.Node.TTL == 5) {
t.Fatalf("SetDir 1 failed: %#v", resp)
}
if resp.PrevNode != nil {
t.Fatalf("SetDir 1 PrevNode failed: %#v", resp)
}
// This should fail because /fooDir already points to a directory
resp, err = c.CreateDir("/fooDir", 5)
if err == nil {
t.Fatalf("fooDir already points to a directory, so SetDir should have failed."+
"The response was: %#v", resp)
}
_, err = c.Set("foo", "bar", 5)
if err != nil {
t.Fatal(err)
}
// This should succeed
// It should replace the key
resp, err = c.SetDir("foo", 5)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Key == "/foo" && resp.Node.Value == "" && resp.Node.TTL == 5) {
t.Fatalf("SetDir 2 failed: %#v", resp)
}
if !(resp.PrevNode.Key == "/foo" && resp.PrevNode.Value == "bar" && resp.PrevNode.TTL == 5) {
t.Fatalf("SetDir 2 failed: %#v", resp)
}
}
func TestUpdateDir(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("fooDir", true)
}()
resp, err := c.CreateDir("fooDir", 5)
if err != nil {
t.Fatal(err)
}
// This should succeed.
resp, err = c.UpdateDir("fooDir", 5)
if err != nil {
t.Fatal(err)
}
if !(resp.Action == "update" && resp.Node.Key == "/fooDir" &&
resp.Node.Value == "" && resp.Node.TTL == 5) {
t.Fatalf("UpdateDir 1 failed: %#v", resp)
}
if !(resp.PrevNode.Key == "/fooDir" && resp.PrevNode.Dir == true && resp.PrevNode.TTL == 5) {
t.Fatalf("UpdateDir 1 PrevNode failed: %#v", resp)
}
// This should fail because the key does not exist.
resp, err = c.UpdateDir("nonexistentDir", 5)
if err == nil {
t.Fatalf("The key %v did not exist, so the update should have failed."+
"The response was: %#v", resp.Node.Key, resp)
}
}
func TestCreateDir(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("fooDir", true)
}()
// This should succeed
resp, err := c.CreateDir("fooDir", 5)
if err != nil {
t.Fatal(err)
}
if !(resp.Action == "create" && resp.Node.Key == "/fooDir" &&
resp.Node.Value == "" && resp.Node.TTL == 5) {
t.Fatalf("CreateDir 1 failed: %#v", resp)
}
if resp.PrevNode != nil {
t.Fatalf("CreateDir 1 PrevNode failed: %#v", resp)
}
// This should fail, because the key is already there
resp, err = c.CreateDir("fooDir", 5)
if err == nil {
t.Fatalf("The key %v did exist, so the creation should have failed."+
"The response was: %#v", resp.Node.Key, resp)
}
}

View File

@@ -0,0 +1,3 @@
package etcd
const version = "v2"

View File

@@ -0,0 +1,103 @@
package etcd
import (
"errors"
)
// Errors introduced by the Watch command.
var (
ErrWatchStoppedByUser = errors.New("Watch stopped by the user via stop channel")
)
// If recursive is set to true the watch returns the first change under the given
// prefix since the given index.
//
// If recursive is set to false the watch returns the first change to the given key
// since the given index.
//
// To watch for the latest change, set waitIndex = 0.
//
// If a receiver channel is given, it will be a long-term watch. Watch will block at the
//channel. After someone receives the channel, it will go on to watch that
// prefix. If a stop channel is given, the client can close long-term watch using
// the stop channel.
func (c *Client) Watch(prefix string, waitIndex uint64, recursive bool,
receiver chan *Response, stop chan bool) (*Response, error) {
logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader)
if receiver == nil {
raw, err := c.watchOnce(prefix, waitIndex, recursive, stop)
if err != nil {
return nil, err
}
return raw.Unmarshal()
}
defer close(receiver)
for {
raw, err := c.watchOnce(prefix, waitIndex, recursive, stop)
if err != nil {
return nil, err
}
resp, err := raw.Unmarshal()
if err != nil {
return nil, err
}
waitIndex = resp.Node.ModifiedIndex + 1
receiver <- resp
}
}
func (c *Client) RawWatch(prefix string, waitIndex uint64, recursive bool,
receiver chan *RawResponse, stop chan bool) (*RawResponse, error) {
logger.Debugf("rawWatch %s [%s]", prefix, c.cluster.Leader)
if receiver == nil {
return c.watchOnce(prefix, waitIndex, recursive, stop)
}
for {
raw, err := c.watchOnce(prefix, waitIndex, recursive, stop)
if err != nil {
return nil, err
}
resp, err := raw.Unmarshal()
if err != nil {
return nil, err
}
waitIndex = resp.Node.ModifiedIndex + 1
receiver <- raw
}
}
// helper func
// return when there is change under the given prefix
func (c *Client) watchOnce(key string, waitIndex uint64, recursive bool, stop chan bool) (*RawResponse, error) {
options := Options{
"wait": true,
}
if waitIndex > 0 {
options["waitIndex"] = waitIndex
}
if recursive {
options["recursive"] = true
}
resp, err := c.getCancelable(key, options, stop)
if err == ErrRequestCancelled {
return nil, ErrWatchStoppedByUser
}
return resp, err
}

View File

@@ -0,0 +1,119 @@
package etcd
import (
"fmt"
"runtime"
"testing"
"time"
)
func TestWatch(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("watch_foo", true)
}()
go setHelper("watch_foo", "bar", c)
resp, err := c.Watch("watch_foo", 0, false, nil, nil)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Key == "/watch_foo" && resp.Node.Value == "bar") {
t.Fatalf("Watch 1 failed: %#v", resp)
}
go setHelper("watch_foo", "bar", c)
resp, err = c.Watch("watch_foo", resp.Node.ModifiedIndex+1, false, nil, nil)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Key == "/watch_foo" && resp.Node.Value == "bar") {
t.Fatalf("Watch 2 failed: %#v", resp)
}
routineNum := runtime.NumGoroutine()
ch := make(chan *Response, 10)
stop := make(chan bool, 1)
go setLoop("watch_foo", "bar", c)
go receiver(ch, stop)
_, err = c.Watch("watch_foo", 0, false, ch, stop)
if err != ErrWatchStoppedByUser {
t.Fatalf("Watch returned a non-user stop error")
}
if newRoutineNum := runtime.NumGoroutine(); newRoutineNum != routineNum {
t.Fatalf("Routine numbers differ after watch stop: %v, %v", routineNum, newRoutineNum)
}
}
func TestWatchAll(t *testing.T) {
c := NewClient(nil)
defer func() {
c.Delete("watch_foo", true)
}()
go setHelper("watch_foo/foo", "bar", c)
resp, err := c.Watch("watch_foo", 0, true, nil, nil)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Key == "/watch_foo/foo" && resp.Node.Value == "bar") {
t.Fatalf("WatchAll 1 failed: %#v", resp)
}
go setHelper("watch_foo/foo", "bar", c)
resp, err = c.Watch("watch_foo", resp.Node.ModifiedIndex+1, true, nil, nil)
if err != nil {
t.Fatal(err)
}
if !(resp.Node.Key == "/watch_foo/foo" && resp.Node.Value == "bar") {
t.Fatalf("WatchAll 2 failed: %#v", resp)
}
ch := make(chan *Response, 10)
stop := make(chan bool, 1)
routineNum := runtime.NumGoroutine()
go setLoop("watch_foo/foo", "bar", c)
go receiver(ch, stop)
_, err = c.Watch("watch_foo", 0, true, ch, stop)
if err != ErrWatchStoppedByUser {
t.Fatalf("Watch returned a non-user stop error")
}
if newRoutineNum := runtime.NumGoroutine(); newRoutineNum != routineNum {
t.Fatalf("Routine numbers differ after watch stop: %v, %v", routineNum, newRoutineNum)
}
}
func setHelper(key, value string, c *Client) {
time.Sleep(time.Second)
c.Set(key, value, 100)
}
func setLoop(key, value string, c *Client) {
time.Sleep(time.Second)
for i := 0; i < 10; i++ {
newValue := fmt.Sprintf("%s_%v", value, i)
c.Set(key, newValue, 100)
time.Sleep(time.Second / 10)
}
}
func receiver(c chan *Response, stop chan bool) {
for i := 0; i < 10; i++ {
<-c
}
stop <- true
}

View File

@@ -0,0 +1,191 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and
distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright
owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities
that control, are controlled by, or are under common control with that entity.
For the purposes of this definition, "control" means (i) the power, direct or
indirect, to cause the direction or management of such entity, whether by
contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising
permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including
but not limited to software source code, documentation source, and configuration
files.
"Object" form shall mean any form resulting from mechanical transformation or
translation of a Source form, including but not limited to compiled object code,
generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made
available under the License, as indicated by a copyright notice that is included
in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that
is based on (or derived from) the Work and for which the editorial revisions,
annotations, elaborations, or other modifications represent, as a whole, an
original work of authorship. For the purposes of this License, Derivative Works
shall not include works that remain separable from, or merely link (or bind by
name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version
of the Work and any modifications or additions to that Work or Derivative Works
thereof, that is intentionally submitted to Licensor for inclusion in the Work
by the copyright owner or by an individual or Legal Entity authorized to submit
on behalf of the copyright owner. For the purposes of this definition,
"submitted" means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems, and
issue tracking systems that are managed by, or on behalf of, the Licensor for
the purpose of discussing and improving the Work, but excluding communication
that is conspicuously marked or otherwise designated in writing by the copyright
owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
of whom a Contribution has been received by Licensor and subsequently
incorporated within the Work.
2. Grant of Copyright License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the Work and such
Derivative Works in Source or Object form.
3. Grant of Patent License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to make, have
made, use, offer to sell, sell, import, and otherwise transfer the Work, where
such license applies only to those patent claims licensable by such Contributor
that are necessarily infringed by their Contribution(s) alone or by combination
of their Contribution(s) with the Work to which such Contribution(s) was
submitted. If You institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work or a
Contribution incorporated within the Work constitutes direct or contributory
patent infringement, then any patent licenses granted to You under this License
for that Work shall terminate as of the date such litigation is filed.
4. Redistribution.
You may reproduce and distribute copies of the Work or Derivative Works thereof
in any medium, with or without modifications, and in Source or Object form,
provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of
this License; and
You must cause any modified files to carry prominent notices stating that You
changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute,
all copyright, patent, trademark, and attribution notices from the Source form
of the Work, excluding those notices that do not pertain to any part of the
Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any
Derivative Works that You distribute must include a readable copy of the
attribution notices contained within such NOTICE file, excluding those notices
that do not pertain to any part of the Derivative Works, in at least one of the
following places: within a NOTICE text file distributed as part of the
Derivative Works; within the Source form or documentation, if provided along
with the Derivative Works; or, within a display generated by the Derivative
Works, if and wherever such third-party notices normally appear. The contents of
the NOTICE file are for informational purposes only and do not modify the
License. You may add Your own attribution notices within Derivative Works that
You distribute, alongside or as an addendum to the NOTICE text from the Work,
provided that such additional attribution notices cannot be construed as
modifying the License.
You may add Your own copyright statement to Your modifications and may provide
additional or different license terms and conditions for use, reproduction, or
distribution of Your modifications, or for any such Derivative Works as a whole,
provided Your use, reproduction, and distribution of the Work otherwise complies
with the conditions stated in this License.
5. Submission of Contributions.
Unless You explicitly state otherwise, any Contribution intentionally submitted
for inclusion in the Work by You to the Licensor shall be under the terms and
conditions of this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify the terms of
any separate license agreement you may have executed with Licensor regarding
such Contributions.
6. Trademarks.
This License does not grant permission to use the trade names, trademarks,
service marks, or product names of the Licensor, except as required for
reasonable and customary use in describing the origin of the Work and
reproducing the content of the NOTICE file.
7. Disclaimer of Warranty.
Unless required by applicable law or agreed to in writing, Licensor provides the
Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
including, without limitation, any warranties or conditions of TITLE,
NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are
solely responsible for determining the appropriateness of using or
redistributing the Work and assume any risks associated with Your exercise of
permissions under this License.
8. Limitation of Liability.
In no event and under no legal theory, whether in tort (including negligence),
contract, or otherwise, unless required by applicable law (such as deliberate
and grossly negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special, incidental,
or consequential damages of any character arising as a result of this License or
out of the use or inability to use the Work (including but not limited to
damages for loss of goodwill, work stoppage, computer failure or malfunction, or
any and all other commercial damages or losses), even if such Contributor has
been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability.
While redistributing the Work or Derivative Works thereof, You may choose to
offer, and charge a fee for, acceptance of support, warranty, indemnity, or
other liability obligations and/or rights consistent with this License. However,
in accepting such obligations, You may act only on Your own behalf and on Your
sole responsibility, not on behalf of any other Contributor, and only if You
agree to indemnify, defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason of your
accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work
To apply the Apache License to your work, attach the following boilerplate
notice, with the fields enclosed by brackets "[]" replaced with your own
identifying information. (Don't include the brackets!) The text should be
enclosed in the appropriate comment syntax for the file format. We also
recommend that a file or class name and description of purpose be included on
the same "printed page" as the copyright notice for easier identification within
third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -0,0 +1,121 @@
go-log
==========
go-log is a simple logging library for Go which supports logging to
systemd.
### Examples
#### Default
This example uses the default log to log to standard out and (if available) to systemd:
```go
package main
import (
"github.com/coreos/go-log/log"
)
func main() {
log.Info("Hello World.")
log.Error("There's nothing more to this program.")
}
```
#### Using Sinks and Formats
```go
package main
import (
"github.com/coreos/go-log/log"
"os"
)
func main() {
l := log.NewSimple(
log.WriterSink(os.Stderr,
"%s: %s[%d] %s\n",
[]string{"priority", "executable", "pid", "message"}))
l.Info("Here's a differently formatted log message.")
}
```
#### Custom Sink
This example only logs messages with priority `PriErr` and greater.
```go
package main
import (
"github.com/coreos/go-log/log"
"os"
)
func main() {
l := log.NewSimple(
&PriorityFilter{
log.PriErr,
log.WriterSink(os.Stdout, log.BasicFormat, log.BasicFields),
})
l.Info("This will be filtered out")
l.Info("and not printed at all.")
l.Error("This will be printed, though!")
l.Critical("And so will this!")
}
type PriorityFilter struct {
priority log.Priority
target log.Sink
}
func (filter *PriorityFilter) Log(fields log.Fields) {
// lower priority values indicate more important messages
if fields["priority"].(log.Priority) <= filter.priority {
filter.target.Log(fields)
}
}
```
### Fields
The following fields are available for use in all sinks:
```go
"prefix" string // static field available to all sinks
"seq" uint64 // auto-incrementing sequence number
"start_time" time.Time // start time of the log
"time" string // formatted time of log entry
"full_time" time.Time // time of log entry
"rtime" time.Duration // relative time of log entry since started
"pid" int // process id
"executable" string // executable filename
```
In addition, if `verbose=true` is passed to `New()`, the following (somewhat expensive) runtime fields are also available:
```go
"funcname" string // function name where the log function was called
"lineno" int // line number where the log function was called
"pathname" string // full pathname of caller
"filename" string // filename of caller
```
### Logging functions
All these functions can also be called directly to use the default log.
```go
func (*Logger) Log(priority Priority, v ...interface)
func (*Logger) Logf(priority Priority, format string, v ...interface{})
func (*Logger) Emergency(v ...interface)
func (*Logger) Emergencyf(format string, v ...interface{})
func (*Logger) Alert(v ...interface)
func (*Logger) Alertf(format string, v ...interface{})
func (*Logger) Critical(v ...interface)
func (*Logger) Criticalf(format string, v ...interface{})
func (*Logger) Error(v ...interface)
func (*Logger) Errorf(format string, v ...interface{})
func (*Logger) Warning(v ...interface)
func (*Logger) Warningf(format string, v ...interface{})
func (*Logger) Notice(v ...interface)
func (*Logger) Noticef(format string, v ...interface{})
func (*Logger) Info(v ...interface)
func (*Logger) Infof(format string, v ...interface{})
func (*Logger) Debug(v ...interface)
func (*Logger) Debugf(format string, v ...interface{})
```
### Acknowledgements
This package is a mostly-from-scratch rewrite of
[ccding/go-logging](https://github.com/ccding/go-logging) with some features
removed and systemd support added.

View File

@@ -0,0 +1,214 @@
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
import (
"fmt"
"os"
)
var BasicFormat = "%s [%9s] %s- %s\n"
var BasicFields = []string{"time", "priority", "prefix", "message"}
var RichFormat = "%s [%9s] %d %s - %s:%s:%d - %s\n"
var RichFields = []string{"full_time", "priority", "seq", "prefix", "filename", "funcname", "lineno", "message"}
// This function has an unusual name to aid in finding it while walking the
// stack. We need to do some dead reckoning from this function to access the
// caller's stack, so there is a consistent call depth above this function.
func (logger *Logger) Log(priority Priority, v ...interface{}) {
fields := logger.fieldValues()
fields["priority"] = priority
fields["message"] = fmt.Sprint(v...)
for _, sink := range logger.sinks {
sink.Log(fields)
}
}
func (logger *Logger) Logf(priority Priority, format string, v ...interface{}) {
logger.Log(priority, fmt.Sprintf(format, v...))
}
func (logger *Logger) Emergency(v ...interface{}) {
logger.Log(PriEmerg, v...)
}
func (logger *Logger) Emergencyf(format string, v ...interface{}) {
logger.Log(PriEmerg, fmt.Sprintf(format, v...))
}
func (logger *Logger) Alert(v ...interface{}) {
logger.Log(PriAlert, v...)
}
func (logger *Logger) Alertf(format string, v ...interface{}) {
logger.Log(PriAlert, fmt.Sprintf(format, v...))
}
func (logger *Logger) Critical(v ...interface{}) {
logger.Log(PriCrit, v...)
}
func (logger *Logger) Criticalf(format string, v ...interface{}) {
logger.Log(PriCrit, fmt.Sprintf(format, v...))
}
func (logger *Logger) Error(v ...interface{}) {
logger.Log(PriErr, v...)
}
func (logger *Logger) Errorf(format string, v ...interface{}) {
logger.Log(PriErr, fmt.Sprintf(format, v...))
}
func (logger *Logger) Warning(v ...interface{}) {
logger.Log(PriWarning, v...)
}
func (logger *Logger) Warningf(format string, v ...interface{}) {
logger.Log(PriWarning, fmt.Sprintf(format, v...))
}
func (logger *Logger) Notice(v ...interface{}) {
logger.Log(PriNotice, v...)
}
func (logger *Logger) Noticef(format string, v ...interface{}) {
logger.Log(PriNotice, fmt.Sprintf(format, v...))
}
func (logger *Logger) Info(v ...interface{}) {
logger.Log(PriInfo, v...)
}
func (logger *Logger) Infof(format string, v ...interface{}) {
logger.Log(PriInfo, fmt.Sprintf(format, v...))
}
func (logger *Logger) Debug(v ...interface{}) {
logger.Log(PriDebug, v...)
}
func (logger *Logger) Debugf(format string, v ...interface{}) {
logger.Log(PriDebug, fmt.Sprintf(format, v...))
}
func Emergency(v ...interface{}) {
defaultLogger.Log(PriEmerg, v...)
}
func Emergencyf(format string, v ...interface{}) {
defaultLogger.Log(PriEmerg, fmt.Sprintf(format, v...))
}
func Alert(v ...interface{}) {
defaultLogger.Log(PriAlert, v...)
}
func Alertf(format string, v ...interface{}) {
defaultLogger.Log(PriAlert, fmt.Sprintf(format, v...))
}
func Critical(v ...interface{}) {
defaultLogger.Log(PriCrit, v...)
}
func Criticalf(format string, v ...interface{}) {
defaultLogger.Log(PriCrit, fmt.Sprintf(format, v...))
}
func Error(v ...interface{}) {
defaultLogger.Log(PriErr, v...)
}
func Errorf(format string, v ...interface{}) {
defaultLogger.Log(PriErr, fmt.Sprintf(format, v...))
}
func Warning(v ...interface{}) {
defaultLogger.Log(PriWarning, v...)
}
func Warningf(format string, v ...interface{}) {
defaultLogger.Log(PriWarning, fmt.Sprintf(format, v...))
}
func Notice(v ...interface{}) {
defaultLogger.Log(PriNotice, v...)
}
func Noticef(format string, v ...interface{}) {
defaultLogger.Log(PriNotice, fmt.Sprintf(format, v...))
}
func Info(v ...interface{}) {
defaultLogger.Log(PriInfo, v...)
}
func Infof(format string, v ...interface{}) {
defaultLogger.Log(PriInfo, fmt.Sprintf(format, v...))
}
func Debug(v ...interface{}) {
defaultLogger.Log(PriDebug, v...)
}
func Debugf(format string, v ...interface{}) {
defaultLogger.Log(PriDebug, fmt.Sprintf(format, v...))
}
// Standard library log functions
func (logger *Logger)Fatalln (v ...interface{}) {
logger.Log(PriCrit, v...)
os.Exit(1)
}
func (logger *Logger)Fatalf (format string, v ...interface{}) {
logger.Logf(PriCrit, format, v...)
os.Exit(1)
}
func (logger *Logger)Panicln (v ...interface{}) {
s := fmt.Sprint(v...)
logger.Log(PriErr, s)
panic(s)
}
func (logger *Logger)Panicf (format string, v ...interface{}) {
s := fmt.Sprintf(format, v...)
logger.Log(PriErr, s)
panic(s)
}
func (logger *Logger)Println (v ...interface{}) {
logger.Log(PriInfo, v...)
}
func (logger *Logger)Printf (format string, v ...interface{}) {
logger.Logf(PriInfo, format, v...)
}
func Fatalln (v ...interface{}) {
defaultLogger.Log(PriCrit, v...)
os.Exit(1)
}
func Fatalf (format string, v ...interface{}) {
defaultLogger.Logf(PriCrit, format, v...)
os.Exit(1)
}
func Panicln (v ...interface{}) {
s := fmt.Sprint(v...)
defaultLogger.Log(PriErr, s)
panic(s)
}
func Panicf (format string, v ...interface{}) {
s := fmt.Sprintf(format, v...)
defaultLogger.Log(PriErr, s)
panic(s)
}
func Println (v ...interface{}) {
defaultLogger.Log(PriInfo, v...)
}
func Printf (format string, v ...interface{}) {
defaultLogger.Logf(PriInfo, format, v...)
}

View File

@@ -0,0 +1,69 @@
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
import (
"os"
"path"
"runtime"
"strings"
"sync/atomic"
"time"
)
type Fields map[string]interface{}
func (logger *Logger) fieldValues() Fields {
now := time.Now()
fields := Fields{
"prefix": logger.prefix, // static field available to all sinks
"seq": logger.nextSeq(), // auto-incrementing sequence number
"start_time": logger.created, // start time of the logger
"time": now.Format(time.StampMilli), // formatted time of log entry
"full_time": now, // time of log entry
"rtime": time.Since(logger.created), // relative time of log entry since started
"pid": os.Getpid(), // process id
"executable": logger.executable, // executable filename
}
if logger.verbose {
setVerboseFields(fields)
}
return fields
}
func (logger *Logger) nextSeq() uint64 {
return atomic.AddUint64(&logger.seq, 1)
}
func setVerboseFields(fields Fields) {
callers := make([]uintptr, 10)
n := runtime.Callers(3, callers) // starts in (*Logger).Log or similar
callers = callers[:n]
for _, pc := range callers {
f := runtime.FuncForPC(pc)
if !strings.Contains(f.Name(), "logger.(*Logger)") {
fields["funcname"] = f.Name()
pathname, lineno := f.FileLine(pc)
fields["lineno"] = lineno
fields["pathname"] = pathname
fields["filename"] = path.Base(pathname)
return
}
}
}

View File

@@ -0,0 +1,72 @@
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
import (
"bitbucket.org/kardianos/osext"
"os"
"path"
"time"
)
// Logger is user-immutable immutable struct which can log to several outputs
type Logger struct {
sinks []Sink // the sinks this logger will log to
verbose bool // gather expensive logging data?
prefix string // static field available to all log sinks under this logger
created time.Time // time when this logger was created
seq uint64 // sequential number of log message, starting at 1
executable string // executable name
}
// New creates a new Logger which logs to all the supplied sinks. The prefix
// argument is passed to all loggers under the field "prefix" with every log
// message. If verbose is true, more expensive runtime fields will be computed
// and passed to loggers. These fields are funcname, lineno, pathname, and
// filename.
func New(prefix string, verbose bool, sinks ...Sink) *Logger {
return &Logger{
sinks: sinks,
verbose: verbose,
prefix: prefix,
created: time.Now(),
seq: 0,
executable: getExecutableName(),
}
}
func getExecutableName() string {
executablePath, err := osext.Executable()
if err != nil {
return "(UNKNOWN)"
} else {
return path.Base(executablePath)
}
}
// NewSimple(sinks...) is equivalent to New("", false, sinks...)
func NewSimple(sinks ...Sink) *Logger {
return New("", false, sinks...)
}
var defaultLogger *Logger
func init() {
defaultLogger = NewSimple(CombinedSink(os.Stdout, BasicFormat, BasicFields))
}

View File

@@ -0,0 +1,54 @@
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
type Priority int
const (
PriEmerg Priority = iota
PriAlert
PriCrit
PriErr
PriWarning
PriNotice
PriInfo
PriDebug
)
func (priority Priority) String() string {
switch priority {
case PriEmerg:
return "EMERGENCY"
case PriAlert:
return "ALERT"
case PriCrit:
return "CRITICAL"
case PriErr:
return "ERROR"
case PriWarning:
return "WARNING"
case PriNotice:
return "NOTICE"
case PriInfo:
return "INFO"
case PriDebug:
return "DEBUG"
default:
return "UNKNOWN"
}
}

View File

@@ -0,0 +1,97 @@
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
import (
"fmt"
"io"
"sync"
)
const AsyncBuffer = 100
type Sink interface {
Log(Fields)
}
type nullSink struct{}
func (sink *nullSink) Log(fields Fields) {}
func NullSink() Sink {
return &nullSink{}
}
type writerSink struct {
lock sync.Mutex
out io.Writer
format string
fields []string
}
func (sink *writerSink) Log(fields Fields) {
vals := make([]interface{}, len(sink.fields))
for i, field := range sink.fields {
var ok bool
vals[i], ok = fields[field]
if !ok {
vals[i] = "???"
}
}
sink.lock.Lock()
defer sink.lock.Unlock()
fmt.Fprintf(sink.out, sink.format, vals...)
}
func WriterSink(out io.Writer, format string, fields []string) Sink {
return &writerSink{
out: out,
format: format,
fields: fields,
}
}
type combinedSink struct {
sinks []Sink
}
func (sink *combinedSink) Log(fields Fields) {
for _, s := range sink.sinks {
s.Log(fields)
}
}
type priorityFilter struct {
priority Priority
target Sink
}
func (filter *priorityFilter) Log(fields Fields) {
// lower priority values indicate more important messages
if fields["priority"].(Priority) <= filter.priority {
filter.target.Log(fields)
}
}
func PriorityFilter(priority Priority, target Sink) Sink {
return &priorityFilter{
priority: priority,
target: target,
}
}

View File

@@ -0,0 +1,82 @@
// +build !windows
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
import (
"fmt"
"github.com/coreos/go-systemd/journal"
"io"
"strings"
)
type journalSink struct{}
func (sink *journalSink) Log(fields Fields) {
message := fields["message"].(string)
priority := toJournalPriority(fields["priority"].(Priority))
journalFields := make(map[string]string)
for k, v := range fields {
if k == "message" || k == "priority" {
continue
}
journalFields[strings.ToUpper(k)] = fmt.Sprint(v)
}
journal.Send(message, priority, journalFields)
}
func toJournalPriority(priority Priority) journal.Priority {
switch priority {
case PriEmerg:
return journal.PriEmerg
case PriAlert:
return journal.PriAlert
case PriCrit:
return journal.PriCrit
case PriErr:
return journal.PriErr
case PriWarning:
return journal.PriWarning
case PriNotice:
return journal.PriNotice
case PriInfo:
return journal.PriInfo
case PriDebug:
return journal.PriDebug
default:
return journal.PriErr
}
}
func JournalSink() Sink {
return &journalSink{}
}
func CombinedSink(writer io.Writer, format string, fields []string) Sink {
sinks := make([]Sink, 0)
sinks = append(sinks, WriterSink(writer, format, fields))
if journal.Enabled() {
sinks = append(sinks, JournalSink())
}
return &combinedSink{
sinks: sinks,
}
}

View File

@@ -0,0 +1,33 @@
// +build windows
package log
// Copyright 2013, CoreOS, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// author: David Fisher <ddf1991@gmail.com>
// based on previous package by: Cong Ding <dinggnu@gmail.com>
import (
"io"
)
func CombinedSink(writer io.Writer, format string, fields []string) Sink {
sinks := make([]Sink, 0)
sinks = append(sinks, WriterSink(writer, format, fields))
return &combinedSink{
sinks: sinks,
}
}

View File

@@ -0,0 +1,168 @@
/*
Copyright 2013 CoreOS Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package journal provides write bindings to the systemd journal
package journal
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"strconv"
"strings"
"syscall"
)
// Priority of a journal message
type Priority int
const (
PriEmerg Priority = iota
PriAlert
PriCrit
PriErr
PriWarning
PriNotice
PriInfo
PriDebug
)
var conn net.Conn
func init() {
var err error
conn, err = net.Dial("unixgram", "/run/systemd/journal/socket")
if err != nil {
conn = nil
}
}
// Enabled returns true iff the systemd journal is available for logging
func Enabled() bool {
return conn != nil
}
// Send a message to the systemd journal. vars is a map of journald fields to
// values. Fields must be composed of uppercase letters, numbers, and
// underscores, but must not start with an underscore. Within these
// restrictions, any arbitrary field name may be used. Some names have special
// significance: see the journalctl documentation
// (http://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html)
// for more details. vars may be nil.
func Send(message string, priority Priority, vars map[string]string) error {
if conn == nil {
return journalError("could not connect to journald socket")
}
data := new(bytes.Buffer)
appendVariable(data, "PRIORITY", strconv.Itoa(int(priority)))
appendVariable(data, "MESSAGE", message)
for k, v := range vars {
appendVariable(data, k, v)
}
_, err := io.Copy(conn, data)
if err != nil && isSocketSpaceError(err) {
file, err := tempFd()
if err != nil {
return journalError(err.Error())
}
_, err = io.Copy(file, data)
if err != nil {
return journalError(err.Error())
}
rights := syscall.UnixRights(int(file.Fd()))
/* this connection should always be a UnixConn, but better safe than sorry */
unixConn, ok := conn.(*net.UnixConn)
if !ok {
return journalError("can't send file through non-Unix connection")
}
unixConn.WriteMsgUnix([]byte{}, rights, nil)
} else if err != nil {
return journalError(err.Error())
}
return nil
}
func appendVariable(w io.Writer, name, value string) {
if !validVarName(name) {
journalError("variable name contains invalid character, ignoring")
}
if strings.ContainsRune(value, '\n') {
/* When the value contains a newline, we write:
* - the variable name, followed by a newline
* - the size (in 64bit little endian format)
* - the data, followed by a newline
*/
fmt.Fprintln(w, name)
binary.Write(w, binary.LittleEndian, uint64(len(value)))
fmt.Fprintln(w, value)
} else {
/* just write the variable and value all on one line */
fmt.Fprintf(w, "%s=%s\n", name, value)
}
}
func validVarName(name string) bool {
/* The variable name must be in uppercase and consist only of characters,
* numbers and underscores, and may not begin with an underscore. (from the docs)
*/
valid := name[0] != '_'
for _, c := range name {
valid = valid && ('A' <= c && c <= 'Z') || ('0' <= c && c <= '9') || c == '_'
}
return valid
}
func isSocketSpaceError(err error) bool {
opErr, ok := err.(*net.OpError)
if !ok {
return false
}
sysErr, ok := opErr.Err.(syscall.Errno)
if !ok {
return false
}
return sysErr == syscall.EMSGSIZE || sysErr == syscall.ENOBUFS
}
func tempFd() (*os.File, error) {
file, err := ioutil.TempFile("/dev/shm/", "journal.XXXXX")
if err != nil {
return nil, err
}
syscall.Unlink(file.Name())
if err != nil {
return nil, err
}
return file, nil
}
func journalError(s string) error {
s = "journal error: " + s
fmt.Fprintln(os.Stderr, s)
return errors.New(s)
}