Portworx Volume Driver in Kubernetes

- Add a new type PortworxVolumeSource
- Implement the kubernetes volume plugin for Portworx Volumes under pkg/volume/portworx
- The Portworx Volume Driver uses the libopenstorage/openstorage specifications and apis for volume operations.

Changes for k8s configuration and examples for portworx volumes.

- Add PortworxVolume hooks in kubectl, kube-controller-manager and validation.
- Add a README for PortworxVolume usage as PVs, PVCs and StorageClass.
- Add example spec files

Handle code review comments.

- Modified READMEs to incorporate to suggestions.
- Add a test for ReadWriteMany access mode.
- Use util.UnmountPath in TearDown.
- Add ReadOnly flag to PortworxVolumeSource
- Use hostname:port instead of unix sockets
- Delete the mount dir in TearDown.
- Fix link issue in persistentvolumes README
- In unit test check for mountpath after Setup is done.
- Add PVC Claim Name as a Portworx Volume Label

Generated code and documentation.
- Updated swagger spec
- Updated api-reference docs
- Updated generated code under pkg/api/v1

Godeps update for Portworx Volume Driver
- Adds github.com/libopenstorage/openstorage
- Adds go.pedge.io/pb/go/google/protobuf
- Updates Godep Licenses
This commit is contained in:
Aditya Dani
2016-12-19 23:17:11 +00:00
parent dba0af3675
commit 28df55fc31
84 changed files with 14212 additions and 3090 deletions

View File

@@ -0,0 +1,141 @@
package client
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"sync"
"time"
)
var (
httpCache = make(map[string]*http.Client)
cacheLock sync.Mutex
)
// NewClient returns a new REST client for specified server.
func NewClient(host string, version string) (*Client, error) {
baseURL, err := url.Parse(host)
if err != nil {
return nil, err
}
if baseURL.Path == "" {
baseURL.Path = "/"
}
unix2HTTP(baseURL)
c := &Client{
base: baseURL,
version: version,
httpClient: getHttpClient(host),
}
return c, nil
}
func GetUnixServerPath(socketName string, paths ...string) string {
serverPath := "unix://"
for _, path := range paths {
serverPath = serverPath + path
}
serverPath = serverPath + socketName + ".sock"
return serverPath
}
// Client is an HTTP REST wrapper. Use one of Get/Post/Put/Delete to get a request
// object.
type Client struct {
base *url.URL
version string
httpClient *http.Client
}
// Status sends a Status request at the /status REST endpoint.
func (c *Client) Status() (*Status, error) {
status := &Status{}
err := c.Get().UsePath("/status").Do().Unmarshal(status)
return status, err
}
// Version send a request at the /versions REST endpoint.
func (c *Client) Versions(endpoint string) ([]string, error) {
versions := []string{}
err := c.Get().Resource(endpoint + "/versions").Do().Unmarshal(&versions)
return versions, err
}
// Get returns a Request object setup for GET call.
func (c *Client) Get() *Request {
return NewRequest(c.httpClient, c.base, "GET", c.version)
}
// Post returns a Request object setup for POST call.
func (c *Client) Post() *Request {
return NewRequest(c.httpClient, c.base, "POST", c.version)
}
// Put returns a Request object setup for PUT call.
func (c *Client) Put() *Request {
return NewRequest(c.httpClient, c.base, "PUT", c.version)
}
// Put returns a Request object setup for DELETE call.
func (c *Client) Delete() *Request {
return NewRequest(c.httpClient, c.base, "DELETE", c.version)
}
func unix2HTTP(u *url.URL) {
if u.Scheme == "unix" {
// Override the main URL object so the HTTP lib won't complain
u.Scheme = "http"
u.Host = "unix.sock"
u.Path = ""
}
}
func newHTTPClient(u *url.URL, tlsConfig *tls.Config, timeout time.Duration) *http.Client {
httpTransport := &http.Transport{
TLSClientConfig: tlsConfig,
}
switch u.Scheme {
case "unix":
socketPath := u.Path
unixDial := func(proto, addr string) (net.Conn, error) {
ret, err := net.DialTimeout("unix", socketPath, timeout)
return ret, err
}
httpTransport.Dial = unixDial
unix2HTTP(u)
default:
httpTransport.Dial = func(proto, addr string) (net.Conn, error) {
return net.DialTimeout(proto, addr, timeout)
}
}
return &http.Client{Transport: httpTransport}
}
func getHttpClient(host string) *http.Client {
c, ok := httpCache[host]
if !ok {
cacheLock.Lock()
defer cacheLock.Unlock()
c, ok = httpCache[host]
if !ok {
u, err := url.Parse(host)
if err != nil {
// TODO(pedge): clean up
fmt.Println("Failed to parse into url", host)
return nil
}
if u.Path == "" {
u.Path = "/"
}
c = newHTTPClient(u, nil, 10*time.Second)
httpCache[host] = c
}
}
return c
}

View File

@@ -0,0 +1,304 @@
package client
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
)
// Request is contructed iteratively by the client and finally dispatched.
// A REST endpoint is accessed with the following convention:
// base_url/<version>/<resource>/[<instance>]
type Request struct {
client *http.Client
version string
verb string
path string
base *url.URL
params url.Values
headers http.Header
resource string
instance string
err error
body []byte
req *http.Request
resp *http.Response
timeout time.Duration
}
// Response is a representation of HTTP response received from the server.
type Response struct {
status string
statusCode int
err error
body []byte
}
// Status upon error, attempts to parse the body of a response into a meaningful status.
type Status struct {
Message string
ErrorCode int
}
// NewRequest instance
func NewRequest(client *http.Client, base *url.URL, verb string, version string) *Request {
return &Request{
client: client,
verb: verb,
base: base,
path: base.Path,
version: version,
}
}
func checkExists(mustExist string, before string) error {
if len(mustExist) == 0 {
return fmt.Errorf("%q should be set before setting %q", mustExist, before)
}
return nil
}
func checkSet(name string, s *string, newval string) error {
if len(*s) != 0 {
return fmt.Errorf("%q already set to %q, cannot change to %q",
name, *s, newval)
}
*s = newval
return nil
}
// Resource specifies the resource to be accessed.
func (r *Request) Resource(resource string) *Request {
if r.err == nil {
r.err = checkSet("resource", &r.resource, resource)
}
return r
}
// Instance specifies the instance of the resource to be accessed.
func (r *Request) Instance(instance string) *Request {
if r.err == nil {
r.err = checkExists("resource", "instance")
if r.err == nil {
r.err = checkSet("instance", &r.instance, instance)
}
}
return r
}
// UsePath use the specified path and don't build up a request.
func (r *Request) UsePath(path string) *Request {
if r.err == nil {
r.err = checkSet("path", &r.path, path)
}
return r
}
// QueryOption adds specified options to query.
func (r *Request) QueryOption(key string, value string) *Request {
if r.err != nil {
return r
}
if r.params == nil {
r.params = make(url.Values)
}
r.params.Add(string(key), value)
return r
}
// QueryOptionLabel adds specified label to query.
func (r *Request) QueryOptionLabel(key string, labels map[string]string) *Request {
if r.err != nil {
return r
}
if b, err := json.Marshal(labels); err != nil {
r.err = err
} else {
if r.params == nil {
r.params = make(url.Values)
}
r.params.Add(string(key), string(b))
}
return r
}
// SetHeader adds specified header values to query.
func (r *Request) SetHeader(key, value string) *Request {
if r.headers == nil {
r.headers = http.Header{}
}
r.headers.Set(key, value)
return r
}
// Timeout makes the request use the given duration as a timeout. Sets the "timeout"
// parameter.
func (r *Request) Timeout(d time.Duration) *Request {
if r.err != nil {
return r
}
r.timeout = d
return r
}
// Body sets the request Body.
func (r *Request) Body(v interface{}) *Request {
var err error
if r.err != nil {
return r
}
r.body, err = json.Marshal(v)
if err != nil {
r.err = err
return r
}
return r
}
// URL returns the current working URL.
func (r *Request) URL() *url.URL {
u := *r.base
p := r.path
if len(r.version) != 0 {
p = path.Join(p, strings.ToLower(r.version))
}
if len(r.resource) != 0 {
p = path.Join(p, strings.ToLower(r.resource))
if len(r.instance) != 0 {
p = path.Join(p, r.instance)
}
}
u.Path = p
query := url.Values{}
for key, values := range r.params {
for _, value := range values {
query.Add(key, value)
}
}
if r.timeout != 0 {
query.Set("timeout", r.timeout.String())
}
u.RawQuery = query.Encode()
return &u
}
// headerVal for key as an int. Return false if header is not present or valid.
func headerVal(key string, resp *http.Response) (int, bool) {
if h := resp.Header.Get(key); len(h) > 0 {
if i, err := strconv.Atoi(h); err == nil {
return i, true
}
}
return 0, false
}
func parseHTTPStatus(resp *http.Response, body []byte) error {
var (
status *Status
err error
)
httpOK := resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusPartialContent
hasStatus := false
if body != nil {
err = json.Unmarshal(body, status)
if err == nil && status.Message != "" {
hasStatus = true
}
}
// If the status is NG, return an error regardless of HTTP status.
if hasStatus && status.ErrorCode != 0 {
return fmt.Errorf("Error %v : %v", status.ErrorCode, status.Message)
}
// Status is good and HTTP status is good, everything is good
if httpOK {
return nil
}
// If HTTP status is NG, return an error.
return fmt.Errorf("HTTP error %d", resp.StatusCode)
}
// Do executes the request and returns a Response.
func (r *Request) Do() *Response {
var (
err error
req *http.Request
resp *http.Response
url string
body []byte
)
if r.err != nil {
return &Response{err: r.err}
}
url = r.URL().String()
req, err = http.NewRequest(r.verb, url, bytes.NewBuffer(r.body))
if err != nil {
return &Response{err: err}
}
if r.headers == nil {
r.headers = http.Header{}
}
req.Header = r.headers
req.Header.Set("Content-Type", "application/json")
resp, err = r.client.Do(req)
if err != nil {
return &Response{err: err}
}
if resp.Body != nil {
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
}
if err != nil {
return &Response{err: err}
}
return &Response{
status: resp.Status,
statusCode: resp.StatusCode,
body: body,
err: parseHTTPStatus(resp, body),
}
}
// Body return http body, valid only if there is no error
func (r Response) Body() ([]byte, error) {
return r.body, r.err
}
// StatusCode HTTP status code returned.
func (r Response) StatusCode() int {
return r.statusCode
}
// Unmarshal result into obj
func (r Response) Unmarshal(v interface{}) error {
if r.err != nil {
return r.err
}
return json.Unmarshal(r.body, v)
}
// Error executing the request.
func (r Response) Error() error {
return r.err
}
func (r Response) FormatError() error {
if len(r.body) == 0 {
return fmt.Errorf("Error: %v", r.err)
} else {
return fmt.Errorf("HTTP-%d: %s", r.statusCode, string(r.body))
}
}

View File

@@ -0,0 +1,387 @@
package volume
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"strconv"
"github.com/libopenstorage/openstorage/api"
"github.com/libopenstorage/openstorage/api/client"
"github.com/libopenstorage/openstorage/volume"
)
const (
graphPath = "/graph"
volumePath = "/osd-volumes"
snapPath = "/osd-snapshot"
)
type volumeClient struct {
volume.IODriver
c *client.Client
}
func newVolumeClient(c *client.Client) volume.VolumeDriver {
return &volumeClient{volume.IONotSupported, c}
}
// String description of this driver.
func (v *volumeClient) Name() string {
return "VolumeDriver"
}
func (v *volumeClient) Type() api.DriverType {
// Block drivers implement the superset.
return api.DriverType_DRIVER_TYPE_BLOCK
}
func (v *volumeClient) GraphDriverCreate(id string, parent string) error {
response := ""
if err := v.c.Put().Resource(graphPath + "/create").Instance(id).Do().Unmarshal(&response); err != nil {
return err
}
if response != id {
return fmt.Errorf("Invalid response: %s", response)
}
return nil
}
func (v *volumeClient) GraphDriverRemove(id string) error {
response := ""
if err := v.c.Put().Resource(graphPath + "/remove").Instance(id).Do().Unmarshal(&response); err != nil {
return err
}
if response != id {
return fmt.Errorf("Invalid response: %s", response)
}
return nil
}
func (v *volumeClient) GraphDriverGet(id string, mountLabel string) (string, error) {
response := ""
if err := v.c.Get().Resource(graphPath + "/inspect").Instance(id).Do().Unmarshal(&response); err != nil {
return "", err
}
return response, nil
}
func (v *volumeClient) GraphDriverRelease(id string) error {
response := ""
if err := v.c.Put().Resource(graphPath + "/release").Instance(id).Do().Unmarshal(&response); err != nil {
return err
}
if response != id {
return fmt.Errorf("Invalid response: %v", response)
}
return nil
}
func (v *volumeClient) GraphDriverExists(id string) bool {
response := false
v.c.Get().Resource(graphPath + "/exists").Instance(id).Do().Unmarshal(&response)
return response
}
func (v *volumeClient) GraphDriverDiff(id string, parent string) io.Writer {
body, _ := v.c.Get().Resource(graphPath + "/diff?id=" + id + "&parent=" + parent).Do().Body()
return bytes.NewBuffer(body)
}
func (v *volumeClient) GraphDriverChanges(id string, parent string) ([]api.GraphDriverChanges, error) {
var changes []api.GraphDriverChanges
err := v.c.Get().Resource(graphPath + "/changes").Instance(id).Do().Unmarshal(&changes)
return changes, err
}
func (v *volumeClient) GraphDriverApplyDiff(id string, parent string, diff io.Reader) (int, error) {
b, err := ioutil.ReadAll(diff)
if err != nil {
return 0, err
}
response := 0
if err = v.c.Put().Resource(graphPath + "/diff?id=" + id + "&parent=" + parent).Instance(id).Body(b).Do().Unmarshal(&response); err != nil {
return 0, err
}
return response, nil
}
func (v *volumeClient) GraphDriverDiffSize(id string, parent string) (int, error) {
size := 0
err := v.c.Get().Resource(graphPath + "/diffsize").Instance(id).Do().Unmarshal(&size)
return size, err
}
// Create a new Vol for the specific volume spev.c.
// It returns a system generated VolumeID that uniquely identifies the volume
func (v *volumeClient) Create(locator *api.VolumeLocator, source *api.Source,
spec *api.VolumeSpec) (string, error) {
response := &api.VolumeCreateResponse{}
request := &api.VolumeCreateRequest{
Locator: locator,
Source: source,
Spec: spec,
}
if err := v.c.Post().Resource(volumePath).Body(request).Do().Unmarshal(response); err != nil {
return "", err
}
if response.VolumeResponse != nil && response.VolumeResponse.Error != "" {
return "", errors.New(response.VolumeResponse.Error)
}
return response.Id, nil
}
// Status diagnostic information
func (v *volumeClient) Status() [][2]string {
return [][2]string{}
}
// Inspect specified volumes.
// Errors ErrEnoEnt may be returned.
func (v *volumeClient) Inspect(ids []string) ([]*api.Volume, error) {
if len(ids) == 0 {
return nil, nil
}
var volumes []*api.Volume
request := v.c.Get().Resource(volumePath)
for _, id := range ids {
request.QueryOption(api.OptVolumeID, id)
}
if err := request.Do().Unmarshal(&volumes); err != nil {
return nil, err
}
return volumes, nil
}
// Delete volume.
// Errors ErrEnoEnt, ErrVolHasSnaps may be returned.
func (v *volumeClient) Delete(volumeID string) error {
response := &api.VolumeResponse{}
if err := v.c.Delete().Resource(volumePath).Instance(volumeID).Do().Unmarshal(response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// Snap specified volume. IO to the underlying volume should be quiesced before
// calling this function.
// Errors ErrEnoEnt may be returned
func (v *volumeClient) Snapshot(volumeID string, readonly bool,
locator *api.VolumeLocator) (string, error) {
response := &api.SnapCreateResponse{}
request := &api.SnapCreateRequest{
Id: volumeID,
Readonly: readonly,
Locator: locator,
}
if err := v.c.Post().Resource(snapPath).Body(request).Do().Unmarshal(response); err != nil {
return "", err
}
// TODO(pedge): this probably should not be embedded in this way
if response.VolumeCreateResponse != nil &&
response.VolumeCreateResponse.VolumeResponse != nil &&
response.VolumeCreateResponse.VolumeResponse.Error != "" {
return "", errors.New(
response.VolumeCreateResponse.VolumeResponse.Error)
}
if response.VolumeCreateResponse != nil {
return response.VolumeCreateResponse.Id, nil
}
return "", nil
}
// Stats for specified volume.
// Errors ErrEnoEnt may be returned
func (v *volumeClient) Stats(
volumeID string,
cumulative bool,
) (*api.Stats, error) {
stats := &api.Stats{}
req := v.c.Get().Resource(volumePath + "/stats").Instance(volumeID)
req.QueryOption(api.OptCumulative, strconv.FormatBool(cumulative))
if err := req.Do().Unmarshal(stats); err != nil {
return nil, err
}
return stats, nil
}
// Alerts on this volume.
// Errors ErrEnoEnt may be returned
func (v *volumeClient) Alerts(volumeID string) (*api.Alerts, error) {
alerts := &api.Alerts{}
if err := v.c.Get().Resource(volumePath + "/alerts").Instance(volumeID).Do().Unmarshal(alerts); err != nil {
return nil, err
}
return alerts, nil
}
// Active Requests on all volume.
func (v *volumeClient) GetActiveRequests() (*api.ActiveRequests, error) {
requests := &api.ActiveRequests{}
resp := v.c.Get().Resource(volumePath + "/requests").Instance("vol_id").Do()
if resp.Error() != nil {
return nil, resp.FormatError()
}
if err := resp.Unmarshal(requests); err != nil {
return nil, err
}
return requests, nil
}
// Shutdown and cleanup.
func (v *volumeClient) Shutdown() {}
// Enumerate volumes that map to the volumeLocator. Locator fields may be regexp.
// If locator fields are left blank, this will return all volumes.
func (v *volumeClient) Enumerate(locator *api.VolumeLocator,
labels map[string]string) ([]*api.Volume, error) {
var volumes []*api.Volume
req := v.c.Get().Resource(volumePath)
if locator.Name != "" {
req.QueryOption(api.OptName, locator.Name)
}
if len(locator.VolumeLabels) != 0 {
req.QueryOptionLabel(api.OptLabel, locator.VolumeLabels)
}
if len(labels) != 0 {
req.QueryOptionLabel(api.OptConfigLabel, labels)
}
resp := req.Do()
if resp.Error() != nil {
return nil, resp.FormatError()
}
if err := resp.Unmarshal(&volumes); err != nil {
return nil, err
}
return volumes, nil
}
// Enumerate snaps for specified volume
// Count indicates the number of snaps populated.
func (v *volumeClient) SnapEnumerate(ids []string,
snapLabels map[string]string) ([]*api.Volume, error) {
var volumes []*api.Volume
request := v.c.Get().Resource(snapPath)
for _, id := range ids {
request.QueryOption(api.OptVolumeID, id)
}
if len(snapLabels) != 0 {
request.QueryOptionLabel(api.OptLabel, snapLabels)
}
if err := request.Do().Unmarshal(&volumes); err != nil {
return nil, err
}
return volumes, nil
}
// Attach map device to the host.
// On success the devicePath specifies location where the device is exported
// Errors ErrEnoEnt, ErrVolAttached may be returned.
func (v *volumeClient) Attach(volumeID string) (string, error) {
response, err := v.doVolumeSetGetResponse(
volumeID,
&api.VolumeSetRequest{
Action: &api.VolumeStateAction{
Attach: api.VolumeActionParam_VOLUME_ACTION_PARAM_ON,
},
},
)
if err != nil {
return "", err
}
if response.Volume != nil {
if response.Volume.Spec.Encrypted {
return response.Volume.SecureDevicePath, nil
} else {
return response.Volume.DevicePath, nil
}
}
return "", nil
}
// Detach device from the host.
// Errors ErrEnoEnt, ErrVolDetached may be returned.
func (v *volumeClient) Detach(volumeID string) error {
return v.doVolumeSet(
volumeID,
&api.VolumeSetRequest{
Action: &api.VolumeStateAction{
Attach: api.VolumeActionParam_VOLUME_ACTION_PARAM_OFF,
},
},
)
}
func (v *volumeClient) MountedAt(mountPath string) string {
return ""
}
// Mount volume at specified path
// Errors ErrEnoEnt, ErrVolDetached may be returned.
func (v *volumeClient) Mount(volumeID string, mountPath string) error {
return v.doVolumeSet(
volumeID,
&api.VolumeSetRequest{
Action: &api.VolumeStateAction{
Mount: api.VolumeActionParam_VOLUME_ACTION_PARAM_ON,
MountPath: mountPath,
},
},
)
}
// Unmount volume at specified path
// Errors ErrEnoEnt, ErrVolDetached may be returned.
func (v *volumeClient) Unmount(volumeID string, mountPath string) error {
return v.doVolumeSet(
volumeID,
&api.VolumeSetRequest{
Action: &api.VolumeStateAction{
Mount: api.VolumeActionParam_VOLUME_ACTION_PARAM_OFF,
MountPath: mountPath,
},
},
)
}
// Update volume
func (v *volumeClient) Set(volumeID string, locator *api.VolumeLocator,
spec *api.VolumeSpec) error {
return v.doVolumeSet(
volumeID,
&api.VolumeSetRequest{
Locator: locator,
Spec: spec,
},
)
}
func (v *volumeClient) doVolumeSet(volumeID string,
request *api.VolumeSetRequest) error {
_, err := v.doVolumeSetGetResponse(volumeID, request)
return err
}
func (v *volumeClient) doVolumeSetGetResponse(volumeID string,
request *api.VolumeSetRequest) (*api.VolumeSetResponse, error) {
response := &api.VolumeSetResponse{}
if err := v.c.Put().Resource(volumePath).Instance(volumeID).Body(request).Do().Unmarshal(response); err != nil {
return nil, err
}
if response.VolumeResponse != nil && response.VolumeResponse.Error != "" {
return nil, errors.New(response.VolumeResponse.Error)
}
return response, nil
}

View File

@@ -0,0 +1,50 @@
package volume
import (
"fmt"
"github.com/libopenstorage/openstorage/api/client"
"github.com/libopenstorage/openstorage/volume"
"github.com/libopenstorage/openstorage/api"
)
// VolumeDriver returns a REST wrapper for the VolumeDriver interface.
func VolumeDriver(c *client.Client) volume.VolumeDriver {
return newVolumeClient(c)
}
// NewDriver returns a new REST client of the supplied version for specified driver.
// host: REST endpoint [http://<ip>:<port> OR unix://<path-to-unix-socket>]. default: [unix:///var/lib/osd/<driverName>.sock]
// version: Volume API version
func NewDriverClient(host, driverName, version string) (*client.Client, error) {
if driverName == "" {
return nil, fmt.Errorf("Driver Name cannot be empty")
}
if host == "" {
host = client.GetUnixServerPath(driverName, volume.DriverAPIBase)
}
if version == "" {
// Set the default version
version = volume.APIVersion
}
return client.NewClient(host, version)
}
// GetSupportedDriverVersions returns a list of supported versions
// for the provided driver. It uses the given server endpoint or the
// standard unix domain socket
func GetSupportedDriverVersions(driverName, host string) ([]string, error) {
// Get a client handler
if host == "" {
host = client.GetUnixServerPath(driverName, volume.DriverAPIBase)
}
client, err := client.NewClient(host, "")
if err != nil {
return []string{}, err
}
versions, err := client.Versions(api.OsdVolumePath)
if err != nil {
return []string{}, err
}
return versions, nil
}