|
|
|
@@ -153,13 +153,20 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
|
|
|
|
|
|
|
|
|
|
// Create implements storage.Interface.Create.
|
|
|
|
|
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
|
|
|
|
trace := utiltrace.New("Create etcd3",
|
|
|
|
|
utiltrace.Field{"key", key},
|
|
|
|
|
utiltrace.Field{"type", getTypeName(obj)},
|
|
|
|
|
)
|
|
|
|
|
defer trace.LogIfLong(500 * time.Millisecond)
|
|
|
|
|
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
|
|
|
|
return errors.New("resourceVersion should not be set on objects to be created")
|
|
|
|
|
}
|
|
|
|
|
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
|
|
|
|
|
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
trace.Step("About to Encode")
|
|
|
|
|
data, err := runtime.Encode(s.codec, obj)
|
|
|
|
|
trace.Step("Encode finished", utiltrace.Field{"len", len(data)}, utiltrace.Field{"err", err})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@@ -171,6 +178,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
newData, err := s.transformer.TransformToStorage(ctx, data, authenticatedDataString(key))
|
|
|
|
|
trace.Step("TransformToStorage finished", utiltrace.Field{"err", err})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return storage.NewInternalError(err.Error())
|
|
|
|
|
}
|
|
|
|
@@ -182,16 +190,20 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
|
|
|
|
clientv3.OpPut(key, string(newData), opts...),
|
|
|
|
|
).Commit()
|
|
|
|
|
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
|
|
|
|
trace.Step("Txn call finished", utiltrace.Field{"err", err})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !txnResp.Succeeded {
|
|
|
|
|
return storage.NewKeyExistsError(key, 0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if out != nil {
|
|
|
|
|
putResp := txnResp.Responses[0].GetResponsePut()
|
|
|
|
|
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
|
|
|
|
|
err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
|
|
|
|
|
trace.Step("decode finished", utiltrace.Field{"len", len(data)}, utiltrace.Field{"err", err})
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@@ -318,7 +330,9 @@ func (s *store) conditionalDelete(
|
|
|
|
|
func (s *store) GuaranteedUpdate(
|
|
|
|
|
ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
|
|
|
|
|
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
|
|
|
|
|
trace := utiltrace.New("GuaranteedUpdate etcd3", utiltrace.Field{"type", getTypeName(destination)})
|
|
|
|
|
trace := utiltrace.New("GuaranteedUpdate etcd3",
|
|
|
|
|
utiltrace.Field{"key", key},
|
|
|
|
|
utiltrace.Field{"type", getTypeName(destination)})
|
|
|
|
|
defer trace.LogIfLong(500 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
v, err := conversion.EnforcePtr(destination)
|
|
|
|
@@ -397,7 +411,9 @@ func (s *store) GuaranteedUpdate(
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trace.Step("About to Encode")
|
|
|
|
|
data, err := runtime.Encode(s.codec, ret)
|
|
|
|
|
trace.Step("Encode finished", utiltrace.Field{"len", len(data)}, utiltrace.Field{"err", err})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@@ -423,6 +439,7 @@ func (s *store) GuaranteedUpdate(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
newData, err := s.transformer.TransformToStorage(ctx, data, transformContext)
|
|
|
|
|
trace.Step("TransformToStorage finished", utiltrace.Field{"err", err})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return storage.NewInternalError(err.Error())
|
|
|
|
|
}
|
|
|
|
@@ -442,6 +459,7 @@ func (s *store) GuaranteedUpdate(
|
|
|
|
|
clientv3.OpGet(key),
|
|
|
|
|
).Commit()
|
|
|
|
|
metrics.RecordEtcdRequestLatency("update", getTypeName(destination), startTime)
|
|
|
|
|
trace.Step("Txn call finished", utiltrace.Field{"err", err})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@@ -459,7 +477,9 @@ func (s *store) GuaranteedUpdate(
|
|
|
|
|
}
|
|
|
|
|
putResp := txnResp.Responses[0].GetResponsePut()
|
|
|
|
|
|
|
|
|
|
return decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
|
|
|
|
|
err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
|
|
|
|
|
trace.Step("decode finished", utiltrace.Field{"len", len(data)}, utiltrace.Field{"err", err})
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|