Merge pull request #821 from stevvooe/content-unify-provider-ingester
content: unify provider and ingester
This commit is contained in: commit b53105ed25
@@ -14,6 +14,14 @@ service Content {
 	// existence.
 	rpc Info(InfoRequest) returns (InfoResponse);
 
+	// List streams the entire set of content as Info objects and closes the
+	// stream.
+	//
+	// Typically, this will yield a large response, chunked into messages.
+	// Clients should make provisions to ensure they can handle the entire data
+	// set.
+	rpc List(ListContentRequest) returns (stream ListContentResponse);
+
 	// Delete will delete the referenced object.
 	rpc Delete(DeleteContentRequest) returns (google.protobuf.Empty);
 
@@ -25,9 +33,10 @@ service Content {
 	// Status returns the status of ongoing object ingestions, started via
 	// Write.
 	//
-	// For active ingestions, the status will be streamed until the client
-	// closes the connection or all matched ingestions are committed.
-	rpc Status(StatusRequest) returns (stream StatusResponse);
+	// Only those matching the regular expression will be provided in the
+	// response. If the provided regular expression is empty, all ingestions
+	// will be provided.
+	rpc Status(StatusRequest) returns (StatusResponse);
 
 	// Write begins or resumes writes to a resource identified by a unique ref.
 	// Only one active stream may exist at a time for each ref.
@@ -46,13 +55,13 @@ service Content {
 	// When completed, the commit flag should be set to true. If expected size
 	// or digest is set, the content will be validated against those values.
 	rpc Write(stream WriteRequest) returns (stream WriteResponse);
 
+	// Abort cancels the ongoing write named in the request. Any resources
+	// associated with the write will be collected.
+	rpc Abort(AbortRequest) returns (google.protobuf.Empty);
 }
 
-message InfoRequest {
-	string digest = 1 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
-}
-
-message InfoResponse {
+message Info {
 	// Digest is the hash identity of the blob.
 	string digest = 1 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
 
@@ -63,6 +72,20 @@ message InfoResponse {
 	google.protobuf.Timestamp committed_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
 }
 
+message InfoRequest {
+	string digest = 1 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
+}
+
+message InfoResponse {
+	Info info = 1 [(gogoproto.nullable) = false];
+}
+
+message ListContentRequest {}
+
+message ListContentResponse {
+	repeated Info info = 1 [(gogoproto.nullable) = false];
+}
+
 message DeleteContentRequest {
 	// Digest specifies which content to delete.
 	string digest = 1 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
@@ -90,6 +113,22 @@ message ReadResponse {
 	bytes data = 2; // actual data
 }
 
+message StatusRequest {
+	string regexp = 1;
+}
+
+message Status {
+	google.protobuf.Timestamp started_at = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
+	google.protobuf.Timestamp updated_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
+	string ref = 3;
+	int64 offset = 4;
+	int64 total = 5;
+	string expected = 6 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
+}
+
+message StatusResponse {
+	repeated Status statuses = 1 [(gogoproto.nullable) = false];
+}
+
 // WriteAction defines the behavior of a WriteRequest.
 enum WriteAction {
@@ -116,12 +155,6 @@ enum WriteAction {
 	//
 	// This action will always terminate the write.
 	COMMIT = 2 [(gogoproto.enumvalue_customname) = "WriteActionCommit"];
-
-	// WriteActionAbort will release any resources associated with the write
-	// and free up the ref for a completely new set of writes.
-	//
-	// This action will always terminate the write.
-	ABORT = -1 [(gogoproto.enumvalue_customname) = "WriteActionAbort"];
 }
 
 // WriteRequest writes data to the request ref at offset.
@@ -213,20 +246,10 @@ message WriteResponse {
 
 	// Digest, if present, includes the digest up to the currently committed
 	// bytes. If action is commit, this field will be set. It is implementation
-	// defined if this is set for other actions, except abort. On abort, this
-	// will be empty.
+	// defined if this is set for other actions.
 	string digest = 6 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
 }
 
-message StatusRequest {
-	repeated string refs = 1;
-	repeated string prefix = 2;
-}
-
-message StatusResponse {
-	google.protobuf.Timestamp started_at = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
-	google.protobuf.Timestamp updated_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
-	string ref = 3;
-	int64 offset = 4;
-	int64 total = 5;
+message AbortRequest {
+	string ref = 1;
 }
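With Status now unary, a client gets one snapshot of the matching ingestions per call instead of holding a stream open until they commit. A minimal sketch of calling it through the generated client; the dial target and the Go field names (Regexp, Statuses) follow the usual gogo/protobuf mapping of the fields above and are assumptions, not code from this PR:

package main

import (
	"context"
	"fmt"
	"log"

	contentapi "github.com/containerd/containerd/api/services/content"
	"google.golang.org/grpc"
)

func main() {
	// Address and dial options are illustrative.
	conn, err := grpc.Dial("/run/containerd/containerd.sock", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := contentapi.NewContentClient(conn)

	// An empty regexp would match all active ingestions.
	resp, err := client.Status(context.Background(), &contentapi.StatusRequest{
		Regexp: "fetch-.*",
	})
	if err != nil {
		log.Fatal(err)
	}

	for _, st := range resp.Statuses {
		fmt.Printf("%s: %d/%d bytes\n", st.Ref, st.Offset, st.Total)
	}
}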
@@ -3503,7 +3503,7 @@ func init() {
 
 var fileDescriptorExecution = []byte{
 	// 854 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x55, 0x4f, 0x8f, 0xdb, 0x44,
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0x4f, 0x8f, 0xdb, 0x44,
 	0x14, 0x5f, 0xe7, 0x8f, 0x37, 0x7d, 0xd9, 0x94, 0x32, 0x5a, 0xad, 0x8c, 0x91, 0x92, 0xc8, 0xb4,
 	0x25, 0x5c, 0x1c, 0x58, 0x6e, 0x08, 0x90, 0xb2, 0xd9, 0xa8, 0xaa, 0x4a, 0xe9, 0xe2, 0x20, 0x71,
 	0xac, 0xbc, 0xf1, 0x6c, 0x32, 0x92, 0xe3, 0x31, 0x9e, 0xf1, 0xb2, 0xb9, 0xc1, 0x9d, 0x03, 0x5f,
@@ -1335,7 +1335,7 @@ func init() {
 
 var fileDescriptorImages = []byte{
 	// 419 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x52, 0x4d, 0x6f, 0xd3, 0x40,
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x4d, 0x6f, 0xd3, 0x40,
 	0x10, 0xcd, 0x36, 0xa9, 0x25, 0xc6, 0xe4, 0xb2, 0xaa, 0x90, 0x71, 0x91, 0x6b, 0x99, 0x4b, 0xc5,
 	0x61, 0x0d, 0xe6, 0x02, 0x52, 0x29, 0x22, 0x2a, 0x54, 0x48, 0x1c, 0x2a, 0x1f, 0xb9, 0x39, 0xee,
 	0x60, 0x2c, 0xd5, 0x5e, 0xe3, 0x5d, 0x57, 0xca, 0x0d, 0xfe, 0x5d, 0x8e, 0x1c, 0x39, 0x21, 0xe2,
@@ -1103,7 +1103,7 @@ func init() {
 
 var fileDescriptorRootfs = []byte{
 	// 428 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xac, 0x52, 0x4d, 0xab, 0xd3, 0x40,
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x52, 0x4d, 0xab, 0xd3, 0x40,
 	0x14, 0xed, 0xf8, 0x24, 0xad, 0x23, 0x7d, 0x8b, 0xc1, 0x45, 0x08, 0x9a, 0x94, 0xb8, 0x29, 0x82,
 	0x09, 0xd6, 0x85, 0x1b, 0x5d, 0xf8, 0x5e, 0x2c, 0xbe, 0x85, 0x20, 0x11, 0xd1, 0x9d, 0x4c, 0x93,
 	0x31, 0x1d, 0x6c, 0xe7, 0x8e, 0x33, 0xd3, 0x42, 0x77, 0xfe, 0x0e, 0x7f, 0x51, 0x97, 0x2e, 0x45,
@@ -3468,7 +3468,7 @@ func init() {
 
 var fileDescriptorShim = []byte{
 	// 913 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x55, 0x4f, 0x6f, 0xe3, 0x44,
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0x4f, 0x6f, 0xe3, 0x44,
 	0x14, 0xaf, 0x93, 0xd4, 0x4d, 0x5f, 0xd6, 0x5d, 0x18, 0x55, 0x95, 0x37, 0x45, 0x49, 0xb1, 0x84,
 	0xc8, 0x2e, 0x92, 0x03, 0xd9, 0x1b, 0x82, 0x43, 0xbb, 0x2d, 0x62, 0x61, 0x91, 0xa2, 0xe9, 0xde,
 	0x90, 0xa8, 0xdc, 0x78, 0x9a, 0x8c, 0x64, 0x7b, 0x8c, 0x67, 0x5c, 0x36, 0x37, 0xce, 0x9c, 0xb8,
@@ -1364,7 +1364,7 @@ func init() {
 
 var fileDescriptorContainer = []byte{
 	// 576 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x52, 0xcf, 0x6e, 0xd3, 0x4e,
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0xcf, 0x6e, 0xd3, 0x4e,
 	0x18, 0xcc, 0xda, 0xce, 0xbf, 0x2f, 0x6a, 0x7e, 0xd6, 0xfe, 0x10, 0x32, 0x41, 0x4a, 0xa2, 0x08,
 	0x09, 0x0b, 0x09, 0x47, 0xa4, 0x17, 0xae, 0x6e, 0x6d, 0x95, 0x0a, 0xe1, 0x84, 0x8d, 0x23, 0x7a,
 	0x8b, 0xdc, 0x78, 0x31, 0x8b, 0x1a, 0xdb, 0xb2, 0xd7, 0x85, 0xde, 0x78, 0x04, 0x9e, 0x81, 0xa7,
@@ -404,7 +404,7 @@ func init() {
 
 var fileDescriptorDescriptor = []byte{
 	// 229 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x72, 0x4b, 0xcf, 0x2c, 0xc9,
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x72, 0x4b, 0xcf, 0x2c, 0xc9,
 	0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xce, 0xcf, 0x2b, 0x49, 0xcc, 0xcc, 0x4b, 0x2d,
 	0x4a, 0x41, 0x66, 0x26, 0x16, 0x64, 0xea, 0x97, 0x54, 0x16, 0xa4, 0x16, 0xeb, 0xa7, 0xa4, 0x16,
 	0x27, 0x17, 0x65, 0x16, 0x94, 0xe4, 0x17, 0x21, 0x31, 0xf5, 0x0a, 0x8a, 0xf2, 0x4b, 0xf2, 0x85,
@@ -473,7 +473,7 @@ func init() {
 
 var fileDescriptorMount = []byte{
 	// 197 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xb2, 0x49, 0xcf, 0x2c, 0xc9,
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xb2, 0x49, 0xcf, 0x2c, 0xc9,
 	0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xce, 0xcf, 0x2b, 0x49, 0xcc, 0xcc, 0x4b, 0x2d,
 	0x4a, 0x41, 0x66, 0x26, 0x16, 0x64, 0xea, 0x97, 0x54, 0x16, 0xa4, 0x16, 0xeb, 0xe7, 0xe6, 0x97,
 	0xe6, 0x95, 0x40, 0x48, 0xbd, 0x82, 0xa2, 0xfc, 0x92, 0x7c, 0x21, 0x61, 0x84, 0x3a, 0xbd, 0x32,
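In each of these regenerated descriptors the only change is in the first line of the byte array: the four-byte modification-time field of the gzip header (bytes 4-7 of the stream) is now zeroed, so regenerating the protobufs no longer embeds a timestamp and the generated output is deterministic.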
@@ -242,7 +242,7 @@ func serveDebugAPI() error {
 	return nil
 }
 
-func resolveContentStore() (*content.Store, error) {
+func resolveContentStore() (content.Store, error) {
 	cp := filepath.Join(conf.Root, "content")
 	return content.NewStore(cp)
 }
@@ -315,7 +315,7 @@ func loadMonitor() (plugin.ContainerMonitor, error) {
 	return plugin.NewMultiContainerMonitor(monitors...), nil
 }
 
-func loadSnapshotter(store *content.Store) (snapshot.Snapshotter, error) {
+func loadSnapshotter(store content.Store) (snapshot.Snapshotter, error) {
 	for name, sr := range plugin.Registrations() {
 		if sr.Type != plugin.SnapshotPlugin {
 			continue
@@ -356,7 +356,7 @@ func newGRPCServer() *grpc.Server {
 	return s
 }
 
-func loadServices(runtimes map[string]containerd.Runtime, store *content.Store, sn snapshot.Snapshotter, meta *bolt.DB) ([]plugin.Service, error) {
+func loadServices(runtimes map[string]containerd.Runtime, store content.Store, sn snapshot.Snapshotter, meta *bolt.DB) ([]plugin.Service, error) {
 	var o []plugin.Service
 	for name, sr := range plugin.Registrations() {
 		if sr.Type != plugin.GRPCPlugin {
@@ -78,7 +78,7 @@ var runCommand = cli.Command{
 			return err
 		}
 
-		provider, err := getContentProvider(context)
+		content, err := getContentStore(context)
 		if err != nil {
 			return err
 		}
@@ -102,7 +102,7 @@ var runCommand = cli.Command{
 		}
 		// let's close out our db and tx so we don't hold the lock whilst running.
 
-		diffIDs, err := image.RootFS(ctx, provider)
+		diffIDs, err := image.RootFS(ctx, content)
 		if err != nil {
 			return err
 		}
@@ -123,13 +123,13 @@ var runCommand = cli.Command{
 			return err
 		}
 
-		ic, err := image.Config(ctx, provider)
+		ic, err := image.Config(ctx, content)
 		if err != nil {
 			return err
 		}
 		switch ic.MediaType {
 		case ocispec.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
-			r, err := provider.Reader(ctx, ic.Digest)
+			r, err := content.Reader(ctx, ic.Digest)
 			if err != nil {
 				return err
 			}
@@ -35,12 +35,12 @@ func getExecutionService(context *cli.Context) (execution.ContainerServiceClient
 	return execution.NewContainerServiceClient(conn), nil
 }
 
-func getContentProvider(context *cli.Context) (content.Provider, error) {
+func getContentStore(context *cli.Context) (content.Store, error) {
 	conn, err := getGRPCConnection(context)
 	if err != nil {
 		return nil, err
 	}
-	return contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)), nil
+	return contentservice.NewStoreFromClient(contentapi.NewContentClient(conn)), nil
 }
 
 func getRootFSService(context *cli.Context) (rootfsapi.RootFSClient, error) {
cmd/dist/active.go
@@ -13,7 +13,7 @@ import (
 var activeCommand = cli.Command{
 	Name:      "active",
 	Usage:     "display active transfers.",
-	ArgsUsage: "[flags] [<key>, ...]",
+	ArgsUsage: "[flags] [<regexp>]",
 	Description: `Display the ongoing transfers.`,
 	Flags: []cli.Flag{
 		cli.DurationFlag{
@@ -28,12 +28,19 @@ var activeCommand = cli.Command{
 		},
 	},
 	Action: func(context *cli.Context) error {
+		var (
+			match = context.Args().First()
+		)
+
+		ctx, cancel := appContext()
+		defer cancel()
+
 		cs, err := resolveContentStore(context)
 		if err != nil {
 			return err
 		}
 
-		active, err := cs.Active()
+		active, err := cs.Status(ctx, match)
 		if err != nil {
 			return err
 		}
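The active command now takes an optional regular expression instead of a list of keys, so an invocation such as "dist active fetch-.*" (hypothetical) would show only matching transfers, while a bare "dist active" still shows everything.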
cmd/dist/common.go
@@ -12,11 +12,13 @@ import (
 	"time"
 
 	"github.com/containerd/console"
+	contentapi "github.com/containerd/containerd/api/services/content"
 	imagesapi "github.com/containerd/containerd/api/services/images"
 	"github.com/containerd/containerd/content"
 	"github.com/containerd/containerd/images"
 	"github.com/containerd/containerd/remotes"
 	"github.com/containerd/containerd/remotes/docker"
+	contentservice "github.com/containerd/containerd/services/content"
 	imagesservice "github.com/containerd/containerd/services/images"
 	"github.com/pkg/errors"
 	"github.com/urfave/cli"
@@ -42,7 +44,7 @@ var registryFlags = []cli.Flag{
 	},
 }
 
-func resolveContentStore(context *cli.Context) (*content.Store, error) {
+func resolveContentStore(context *cli.Context) (content.Store, error) {
 	root := filepath.Join(context.GlobalString("root"), "content")
 	if !filepath.IsAbs(root) {
 		var err error
@@ -51,7 +53,12 @@ func resolveContentStore(context *cli.Context) (*content.Store, error) {
 			return nil, err
 		}
 	}
-	return content.NewStore(root)
+	conn, err := connectGRPC(context)
+	if err != nil {
+		return nil, err
+	}
+
+	return contentservice.NewStoreFromClient(contentapi.NewContentClient(conn)), nil
 }
 
 func resolveImageStore(clicontext *cli.Context) (images.Store, error) {
cmd/dist/edit.go
@@ -9,7 +9,7 @@ import (
 	"os/exec"
 
 	contentapi "github.com/containerd/containerd/api/services/content"
-	contentservice "github.com/containerd/containerd/services/content"
+	"github.com/containerd/containerd/services/content"
 	digest "github.com/opencontainers/go-digest"
 	"github.com/urfave/cli"
 )
@@ -50,10 +50,9 @@ var editCommand = cli.Command{
 			return err
 		}
 
-		provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
-		ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
+		content := content.NewStoreFromClient(contentapi.NewContentClient(conn))
 
-		rc, err := provider.Reader(ctx, dgst)
+		rc, err := content.Reader(ctx, dgst)
 		if err != nil {
 			return err
 		}
@@ -65,7 +64,7 @@ var editCommand = cli.Command{
 		}
 		defer nrc.Close()
 
-		wr, err := ingester.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key?
+		wr, err := content.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key?
 		if err != nil {
 			return err
 		}
cmd/dist/fetch.go
@@ -59,8 +59,7 @@ Most of this is experimental and there are few leaps to make this work.`,
 
 		ongoing := newJobs()
 
-		ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
-		provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
+		content := contentservice.NewStoreFromClient(contentapi.NewContentClient(conn))
 
 		// TODO(stevvooe): Need to replace this with content store client.
 		cs, err := resolveContentStore(clicontext)
@@ -85,8 +84,8 @@ Most of this is experimental and there are few leaps to make this work.`,
 					ongoing.add(remotes.MakeRefKey(ctx, desc))
 					return nil, nil
 				}),
-				remotes.FetchHandler(ingester, fetcher),
-				images.ChildrenHandler(provider),
+				remotes.FetchHandler(content, fetcher),
+				images.ChildrenHandler(content),
 			),
 			desc)
 	})
@@ -114,7 +113,7 @@ Most of this is experimental and there are few leaps to make this work.`,
 
 		activeSeen := map[string]struct{}{}
 		if !done {
-			active, err := cs.Active()
+			active, err := cs.Status(ctx, "")
 			if err != nil {
 				log.G(ctx).WithError(err).Error("active check failed")
 				continue
cmd/dist/get.go
@@ -4,8 +4,6 @@ import (
 	"io"
 	"os"
 
-	contentapi "github.com/containerd/containerd/api/services/content"
-	contentservice "github.com/containerd/containerd/services/content"
 	digest "github.com/opencontainers/go-digest"
 	"github.com/urfave/cli"
 )
@@ -25,13 +23,11 @@ var getCommand = cli.Command{
 			return err
 		}
 
-		conn, err := connectGRPC(context)
+		cs, err := resolveContentStore(context)
 		if err != nil {
 			return err
 		}
 
-		cs := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
-
 		rc, err := cs.Reader(ctx, dgst)
 		if err != nil {
 			return err
cmd/dist/images.go
@@ -5,10 +5,8 @@ import (
 	"os"
 	"text/tabwriter"
 
-	contentapi "github.com/containerd/containerd/api/services/content"
 	"github.com/containerd/containerd/log"
 	"github.com/containerd/containerd/progress"
-	contentservice "github.com/containerd/containerd/services/content"
 	"github.com/pkg/errors"
 	"github.com/urfave/cli"
 )
@@ -29,13 +27,11 @@ var imagesListCommand = cli.Command{
 			return err
 		}
 
-		conn, err := connectGRPC(clicontext)
+		cs, err := resolveContentStore(clicontext)
 		if err != nil {
 			return err
 		}
 
-		provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
-
 		images, err := imageStore.List(ctx)
 		if err != nil {
 			return errors.Wrap(err, "failed to list images")
@@ -44,7 +40,7 @@ var imagesListCommand = cli.Command{
 		tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, ' ', 0)
 		fmt.Fprintln(tw, "REF\tTYPE\tDIGEST\tSIZE\t")
 		for _, image := range images {
-			size, err := image.Size(ctx, provider)
+			size, err := image.Size(ctx, cs)
 			if err != nil {
 				log.G(ctx).WithError(err).Errorf("failed calculating size for image %s", image.Name)
 			}
cmd/dist/ingest.go
@@ -3,9 +3,7 @@ package main
 import (
 	"os"
 
-	contentapi "github.com/containerd/containerd/api/services/content"
 	"github.com/containerd/containerd/content"
-	contentservice "github.com/containerd/containerd/services/content"
 	"github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 	"github.com/urfave/cli"
@@ -40,20 +38,18 @@ var ingestCommand = cli.Command{
 			return err
 		}
 
-		conn, err := connectGRPC(context)
-		if err != nil {
-			return err
-		}
-
 		if ref == "" {
 			return errors.New("must specify a transaction reference")
 		}
 
-		ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
+		cs, err := resolveContentStore(context)
+		if err != nil {
+			return err
+		}
 
 		// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
 		// all data to be written in a single invocation. Allow multiple writes
 		// to the same transaction key followed by a commit.
-		return content.WriteBlob(ctx, ingester, ref, os.Stdin, expectedSize, expectedDigest)
+		return content.WriteBlob(ctx, cs, ref, os.Stdin, expectedSize, expectedDigest)
 	},
 }
cmd/dist/list.go
@@ -9,7 +9,6 @@ import (
 	"github.com/containerd/containerd/content"
 	"github.com/containerd/containerd/log"
 	units "github.com/docker/go-units"
-	digest "github.com/opencontainers/go-digest"
 	"github.com/urfave/cli"
 )
 
@@ -46,8 +45,8 @@ var listCommand = cli.Command{
 
 		var walkFn content.WalkFunc
 		if quiet {
-			walkFn = func(path string, fi os.FileInfo, dgst digest.Digest) error {
-				fmt.Println(dgst)
+			walkFn = func(info content.Info) error {
+				fmt.Println(info.Digest)
 				return nil
 			}
 		} else {
@@ -55,16 +54,16 @@ var listCommand = cli.Command{
 			defer tw.Flush()
 
 			fmt.Fprintln(tw, "DIGEST\tSIZE\tAGE")
-			walkFn = func(path string, fi os.FileInfo, dgst digest.Digest) error {
+			walkFn = func(info content.Info) error {
 				fmt.Fprintf(tw, "%s\t%s\t%s\n",
-					dgst,
-					units.HumanSize(float64(fi.Size())),
-					units.HumanDuration(time.Since(fi.ModTime())))
+					info.Digest,
+					units.HumanSize(float64(info.Size)),
+					units.HumanDuration(time.Since(info.CommittedAt)))
 				return nil
 			}
 
 		}
 
-		return cs.Walk(walkFn)
+		return cs.Walk(ctx, walkFn)
 	},
 }
cmd/dist/pull.go
@@ -7,14 +7,12 @@ import (
 	"text/tabwriter"
 	"time"
 
-	contentapi "github.com/containerd/containerd/api/services/content"
 	rootfsapi "github.com/containerd/containerd/api/services/rootfs"
 	"github.com/containerd/containerd/content"
 	"github.com/containerd/containerd/images"
 	"github.com/containerd/containerd/log"
 	"github.com/containerd/containerd/progress"
 	"github.com/containerd/containerd/remotes"
-	contentservice "github.com/containerd/containerd/services/content"
 	rootfsservice "github.com/containerd/containerd/services/rootfs"
 	"github.com/opencontainers/image-spec/identity"
 	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -44,7 +42,7 @@ command. As part of this process, we do the following:
 		ctx, cancel := appContext()
 		defer cancel()
 
-		conn, err := connectGRPC(clicontext)
+		cs, err := resolveContentStore(clicontext)
 		if err != nil {
 			return err
 		}
@@ -60,15 +58,6 @@ command. As part of this process, we do the following:
 		}
 		ongoing := newJobs()
 
-		// TODO(stevvooe): Must unify this type.
-		ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
-		provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
-
-		cs, err := resolveContentStore(clicontext)
-		if err != nil {
-			return err
-		}
-
 		eg, ctx := errgroup.WithContext(ctx)
 
 		var resolvedImageName string
@@ -93,8 +82,8 @@ command. As part of this process, we do the following:
 					ongoing.add(remotes.MakeRefKey(ctx, desc))
 					return nil, nil
 				}),
-				remotes.FetchHandler(ingester, fetcher),
-				images.ChildrenHandler(provider)),
+				remotes.FetchHandler(cs, fetcher),
+				images.ChildrenHandler(cs)),
 			desc)
 
 	})
@@ -118,9 +107,7 @@ command. As part of this process, we do the following:
 			log.G(ctx).Fatal(err)
 		}
 
-		provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
-
-		p, err := content.ReadBlob(ctx, provider, image.Target.Digest)
+		p, err := content.ReadBlob(ctx, cs, image.Target.Digest)
 		if err != nil {
 			log.G(ctx).Fatal(err)
 		}
@@ -130,6 +117,10 @@ command. As part of this process, we do the following:
 			log.G(ctx).Fatal(err)
 		}
 
+		conn, err := connectGRPC(clicontext)
+		if err != nil {
+			log.G(ctx).Fatal(err)
+		}
 		rootfs := rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn))
 
 		log.G(ctx).Info("unpacking rootfs")
@@ -138,7 +129,7 @@ command. As part of this process, we do the following:
 			log.G(ctx).Fatal(err)
 		}
 
-		diffIDs, err := image.RootFS(ctx, provider)
+		diffIDs, err := image.RootFS(ctx, cs)
 		if err != nil {
 			log.G(ctx).WithError(err).Fatal("failed resolving rootfs")
 		}
@@ -168,7 +159,7 @@ command. As part of this process, we do the following:
 
 		activeSeen := map[string]struct{}{}
 		if !done {
-			active, err := cs.Active()
+			active, err := cs.Status(ctx, "")
 			if err != nil {
 				log.G(ctx).WithError(err).Error("active check failed")
 				continue
cmd/dist/rootfs.go
@@ -8,11 +8,9 @@ import (
 	"os"
 	"strings"
 
-	contentapi "github.com/containerd/containerd/api/services/content"
 	rootfsapi "github.com/containerd/containerd/api/services/rootfs"
 	"github.com/containerd/containerd/content"
 	"github.com/containerd/containerd/log"
-	contentservice "github.com/containerd/containerd/services/content"
 	rootfsservice "github.com/containerd/containerd/services/rootfs"
 	digest "github.com/opencontainers/go-digest"
 	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -49,8 +47,12 @@ var rootfsUnpackCommand = cli.Command{
 			return err
 		}
 
-		provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
-		m, err := resolveManifest(ctx, provider, dgst)
+		cs, err := resolveContentStore(clicontext)
+		if err != nil {
+			return err
+		}
+
+		m, err := resolveManifest(ctx, cs, dgst)
 		if err != nil {
 			return err
 		}
@@ -28,24 +28,62 @@ var (
 	}
 )
 
+type Provider interface {
+	Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error)
+}
+
+type Ingester interface {
+	Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error)
+}
+
+// TODO(stevvooe): Consider a very different name for this struct. Info is way
+// to general. It also reads very weird in certain context, like pluralization.
 type Info struct {
 	Digest      digest.Digest
 	Size        int64
 	CommittedAt time.Time
 }
 
-type Provider interface {
-	Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error)
-}
-
 type Status struct {
 	Ref       string
 	Offset    int64
 	Total     int64
+	Expected  digest.Digest
 	StartedAt time.Time
 	UpdatedAt time.Time
 }
 
+// WalkFunc defines the callback for a blob walk.
+type WalkFunc func(Info) error
+
+// Manager provides methods for inspecting, listing and removing content.
+type Manager interface {
+	// Info will return metadata about content available in the content store.
+	//
+	// If the content is not present, ErrNotFound will be returned.
+	Info(ctx context.Context, dgst digest.Digest) (Info, error)
+
+	// Walk will call fn for each item in the content store.
+	Walk(ctx context.Context, fn WalkFunc) error
+
+	// Delete removes the content from the store.
+	Delete(ctx context.Context, dgst digest.Digest) error
+
+	// Status returns the status of any active ingestions whose ref match the
+	// provided regular expression. If empty, all active ingestions will be
+	// returned.
+	//
+	// TODO(stevvooe): Status may be slighly out of place here. If this remains
	// here, we should remove Manager and just define these on store.
+	Status(ctx context.Context, re string) ([]Status, error)
+
+	// Abort completely cancels the ingest operation targeted by ref.
+	//
+	// TODO(stevvooe): Same consideration as above. This should really be
+	// restricted to an ingest management interface.
+	Abort(ctx context.Context, ref string) error
+}
+
 type Writer interface {
 	io.WriteCloser
 	Status() (Status, error)
@@ -54,8 +92,12 @@ type Writer interface {
 	Truncate(size int64) error
 }
 
-type Ingester interface {
-	Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error)
+// Store combines the methods of content-oriented interfaces into a set that
+// are commonly provided by complete implementations.
+type Store interface {
+	Manager
+	Ingester
+	Provider
 }
 
 func IsNotFound(err error) bool {
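Since Store now composes Manager, Ingester, and Provider, code that both reads and writes content can accept a single value instead of a provider/ingester pair. A minimal sketch against the interfaces above; the package name and copyBlob helper are hypothetical, not part of this PR:

package blobutil

import (
	"context"

	"github.com/containerd/containerd/content"
	digest "github.com/opencontainers/go-digest"
)

// copyBlob re-ingests an existing blob under a new ref, exercising the
// Provider (Reader), Manager (Info) and Ingester (via WriteBlob) sides of
// the unified Store.
func copyBlob(ctx context.Context, cs content.Store, dgst digest.Digest, ref string) error {
	rc, err := cs.Reader(ctx, dgst)
	if err != nil {
		return err
	}
	defer rc.Close()

	info, err := cs.Info(ctx, dgst)
	if err != nil {
		return err
	}

	// Passing the expected digest lets the store validate the data on commit.
	return content.WriteBlob(ctx, cs, ref, rc, info.Size, dgst)
}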
@@ -52,7 +52,7 @@ func TestContentWriter(t *testing.T) {
 	}
 
 	// we should also see this as an active ingestion
-	ingestions, err := cs.Active()
+	ingestions, err := cs.Status(ctx, "")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -132,11 +132,9 @@ func TestWalkBlobs(t *testing.T) {
 		expected[dgst] = struct{}{}
 	}
 
-	if err := cs.Walk(func(path string, fi os.FileInfo, dgst digest.Digest) error {
-		found[dgst] = struct{}{}
-		if checked := checkBlobPath(t, cs, dgst); checked != path {
-			t.Fatalf("blob path did not match: %v != %v", path, checked)
-		}
+	if err := cs.Walk(ctx, func(bi Info) error {
+		found[bi.Digest] = struct{}{}
+		checkBlobPath(t, cs, bi.Digest)
 		return nil
 	}); err != nil {
 		t.Fatal(err)
@@ -203,7 +201,7 @@ func generateBlobs(t checker, nblobs, maxsize int64) map[digest.Digest][]byte {
 	return blobs
 }
 
-func populateBlobStore(t checker, ctx context.Context, cs *Store, nblobs, maxsize int64) map[digest.Digest][]byte {
+func populateBlobStore(t checker, ctx context.Context, cs Store, nblobs, maxsize int64) map[digest.Digest][]byte {
 	blobs := generateBlobs(t, nblobs, maxsize)
 
 	for dgst, p := range blobs {
@@ -213,7 +211,7 @@ func populateBlobStore(t checker, ctx context.Context, cs *Store, nblobs, maxsiz
 	return blobs
 }
 
-func contentStoreEnv(t checker) (context.Context, string, *Store, func()) {
+func contentStoreEnv(t checker) (context.Context, string, Store, func()) {
 	pc, _, _, ok := runtime.Caller(1)
 	if !ok {
 		t.Fatal("failed to resolve caller")
@@ -249,10 +247,10 @@ func checkCopy(t checker, size int64, dst io.Writer, src io.Reader) {
 	}
 }
 
-func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
-	path := cs.blobPath(dgst)
+func checkBlobPath(t *testing.T, cs Store, dgst digest.Digest) string {
+	path := cs.(*store).blobPath(dgst)
 
-	if path != filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) {
+	if path != filepath.Join(cs.(*store).root, "blobs", dgst.Algorithm().String(), dgst.Hex()) {
 		t.Fatalf("unexpected path: %q", path)
 	}
 	fi, err := os.Stat(path)
@@ -268,7 +266,7 @@ func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
 	return path
 }
 
-func checkWrite(t checker, ctx context.Context, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
+func checkWrite(t checker, ctx context.Context, cs Store, dgst digest.Digest, p []byte) digest.Digest {
 	if err := WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p), int64(len(p)), dgst); err != nil {
 		t.Fatal(err)
 	}
content/store.go
@ -7,6 +7,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -21,21 +22,21 @@ import (
|
|||||||
//
|
//
|
||||||
// Store can generally support multi-reader, single-writer ingest of data,
|
// Store can generally support multi-reader, single-writer ingest of data,
|
||||||
// including resumable ingest.
|
// including resumable ingest.
|
||||||
type Store struct {
|
type store struct {
|
||||||
root string
|
root string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStore(root string) (*Store, error) {
|
func NewStore(root string) (Store, error) {
|
||||||
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
|
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Store{
|
return &store{
|
||||||
root: root,
|
root: root,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) Info(dgst digest.Digest) (Info, error) {
|
func (s *store) Info(ctx context.Context, dgst digest.Digest) (Info, error) {
|
||||||
p := s.blobPath(dgst)
|
p := s.blobPath(dgst)
|
||||||
fi, err := os.Stat(p)
|
fi, err := os.Stat(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -46,11 +47,15 @@ func (s *Store) Info(dgst digest.Digest) (Info, error) {
|
|||||||
return Info{}, err
|
return Info{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return s.info(dgst, fi), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *store) info(dgst digest.Digest, fi os.FileInfo) Info {
|
||||||
return Info{
|
return Info{
|
||||||
Digest: dgst,
|
Digest: dgst,
|
||||||
Size: fi.Size(),
|
Size: fi.Size(),
|
||||||
CommittedAt: fi.ModTime(),
|
CommittedAt: fi.ModTime(),
|
||||||
}, nil
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open returns an io.ReadCloser for the blob.
|
// Open returns an io.ReadCloser for the blob.
|
||||||
@ -58,7 +63,7 @@ func (s *Store) Info(dgst digest.Digest) (Info, error) {
|
|||||||
// TODO(stevvooe): This would work much better as an io.ReaderAt in practice.
|
// TODO(stevvooe): This would work much better as an io.ReaderAt in practice.
|
||||||
// Right now, we are doing type assertion to tease that out, but it won't scale
|
// Right now, we are doing type assertion to tease that out, but it won't scale
|
||||||
// well.
|
// well.
|
||||||
func (s *Store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
|
func (s *store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
|
||||||
fp, err := os.Open(s.blobPath(dgst))
|
fp, err := os.Open(s.blobPath(dgst))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
@ -74,7 +79,7 @@ func (s *Store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser,
|
|||||||
//
|
//
|
||||||
// While this is safe to do concurrently, safe exist-removal logic must hold
|
// While this is safe to do concurrently, safe exist-removal logic must hold
|
||||||
// some global lock on the store.
|
// some global lock on the store.
|
||||||
func (cs *Store) Delete(dgst digest.Digest) error {
|
func (cs *store) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||||
if err := os.RemoveAll(cs.blobPath(dgst)); err != nil {
|
if err := os.RemoveAll(cs.blobPath(dgst)); err != nil {
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
return err
|
return err
|
||||||
@ -88,14 +93,7 @@ func (cs *Store) Delete(dgst digest.Digest) error {
|
|||||||
|
|
||||||
// TODO(stevvooe): Allow querying the set of blobs in the blob store.
|
// TODO(stevvooe): Allow querying the set of blobs in the blob store.
|
||||||
|
|
||||||
// WalkFunc defines the callback for a blob walk.
|
func (cs *store) Walk(ctx context.Context, fn WalkFunc) error {
|
||||||
//
|
|
||||||
// TODO(stevvooe): Remove the file info. Just need size and modtime. Perhaps,
|
|
||||||
// not a huge deal, considering we have a path, but let's not just let this one
|
|
||||||
// go without scrutiny.
|
|
||||||
type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error
|
|
||||||
|
|
||||||
func (cs *Store) Walk(fn WalkFunc) error {
|
|
||||||
root := filepath.Join(cs.root, "blobs")
|
root := filepath.Join(cs.root, "blobs")
|
||||||
var alg digest.Algorithm
|
var alg digest.Algorithm
|
||||||
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
|
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
|
||||||
@ -133,17 +131,60 @@ func (cs *Store) Walk(fn WalkFunc) error {
|
|||||||
// store or extra paths not expected previously.
|
// store or extra paths not expected previously.
|
||||||
}
|
}
|
||||||
|
|
||||||
return fn(path, fi, dgst)
|
return fn(cs.info(dgst, fi))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Status returns the current status of a blob by the ingest ref.
|
func (s *store) Status(ctx context.Context, re string) ([]Status, error) {
|
||||||
func (s *Store) Status(ref string) (Status, error) {
|
fp, err := os.Open(filepath.Join(s.root, "ingest"))
|
||||||
return s.status(s.ingestRoot(ref))
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer fp.Close()
|
||||||
|
|
||||||
|
fis, err := fp.Readdir(-1)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rec, err := regexp.Compile(re)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var active []Status
|
||||||
|
for _, fi := range fis {
|
||||||
|
p := filepath.Join(s.root, "ingest", fi.Name())
|
||||||
|
stat, err := s.status(p)
|
||||||
|
if err != nil {
|
||||||
|
if !os.IsNotExist(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(stevvooe): This is a common error if uploads are being
|
||||||
|
// completed while making this listing. Need to consider taking a
|
||||||
|
// lock on the whole store to coordinate this aspect.
|
||||||
|
//
|
||||||
|
// Another option is to cleanup downloads asynchronously and
|
||||||
|
// coordinate this method with the cleanup process.
|
||||||
|
//
|
||||||
|
// For now, we just skip them, as they really don't exist.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !rec.MatchString(stat.Ref) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
active = append(active, stat)
|
||||||
|
}
|
||||||
|
|
||||||
|
return active, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// status works like stat above except uses the path to the ingest.
|
// status works like stat above except uses the path to the ingest.
|
||||||
func (s *Store) status(ingestPath string) (Status, error) {
|
func (s *store) status(ingestPath string) (Status, error) {
|
||||||
dp := filepath.Join(ingestPath, "data")
|
dp := filepath.Join(ingestPath, "data")
|
||||||
fi, err := os.Stat(dp)
|
fi, err := os.Stat(dp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -165,7 +206,7 @@ func (s *Store) status(ingestPath string) (Status, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// total attempts to resolve the total expected size for the write.
|
// total attempts to resolve the total expected size for the write.
|
||||||
func (s *Store) total(ingestPath string) int64 {
|
func (s *store) total(ingestPath string) int64 {
|
||||||
totalS, err := readFileString(filepath.Join(ingestPath, "total"))
|
totalS, err := readFileString(filepath.Join(ingestPath, "total"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0
|
return 0
|
||||||
@ -185,7 +226,10 @@ func (s *Store) total(ingestPath string) int64 {
|
|||||||
// ref at a time.
|
// ref at a time.
|
||||||
//
|
//
|
||||||
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
|
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
|
||||||
func (s *Store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (Writer, error) {
|
func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (Writer, error) {
|
||||||
|
// TODO(stevvooe): Need to actually store and handle expected here. We have
|
||||||
|
// code in the service that shouldn't be dealing with this.
|
||||||
|
|
||||||
path, refp, data, lock, err := s.ingestPaths(ref)
|
path, refp, data, lock, err := s.ingestPaths(ref)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -283,11 +327,11 @@ func (s *Store) Writer(ctx context.Context, ref string, total int64, expected di
|
|||||||
|
|
||||||
// Abort an active transaction keyed by ref. If the ingest is active, it will
|
// Abort an active transaction keyed by ref. If the ingest is active, it will
|
||||||
// be cancelled. Any resources associated with the ingest will be cleaned.
|
// be cancelled. Any resources associated with the ingest will be cleaned.
|
||||||
func (s *Store) Abort(ref string) error {
|
func (s *store) Abort(ctx context.Context, ref string) error {
|
||||||
root := s.ingestRoot(ref)
|
root := s.ingestRoot(ref)
|
||||||
if err := os.RemoveAll(root); err != nil {
|
if err := os.RemoveAll(root); err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil
|
return ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@@ -296,50 +340,11 @@ func (s *Store) Abort(ref string) error {
 	return nil
 }
 
-func (s *Store) Active() ([]Status, error) {
-	fp, err := os.Open(filepath.Join(s.root, "ingest"))
-	if err != nil {
-		return nil, err
-	}
-
-	defer fp.Close()
-
-	fis, err := fp.Readdir(-1)
-	if err != nil {
-		return nil, err
-	}
-
-	var active []Status
-	for _, fi := range fis {
-		p := filepath.Join(s.root, "ingest", fi.Name())
-		stat, err := s.status(p)
-		if err != nil {
-			if !os.IsNotExist(err) {
-				return nil, err
-			}
-
-			// TODO(stevvooe): This is a common error if uploads are being
-			// completed while making this listing. Need to consider taking a
-			// lock on the whole store to coordinate this aspect.
-			//
-			// Another option is to cleanup downloads asynchronously and
-			// coordinate this method with the cleanup process.
-			//
-			// For now, we just skip them, as they really don't exist.
-			continue
-		}
-
-		active = append(active, stat)
-	}
-
-	return active, nil
-}
-
-func (cs *Store) blobPath(dgst digest.Digest) string {
+func (cs *store) blobPath(dgst digest.Digest) string {
 	return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
 }
 
-func (s *Store) ingestRoot(ref string) string {
+func (s *store) ingestRoot(ref string) string {
 	dgst := digest.FromString(ref)
 	return filepath.Join(s.root, "ingest", dgst.Hex())
 }
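The two helpers above pin down the store's on-disk layout: committed blobs live at blobs/<algorithm>/<hex>, while each in-flight ingest lives under a directory named by the digest of its ref, which keeps arbitrary ref strings path-safe. A minimal sketch of the resulting paths (the root value here is purely illustrative, not a confirmed default):

```go
package main

import (
	"fmt"
	"path/filepath"

	digest "github.com/opencontainers/go-digest"
)

func main() {
	root := "/var/lib/containerd/content" // illustrative root

	// Mirrors store.ingestRoot: the ref is hashed so any string is a safe path component.
	ref := "example-upload"
	ingest := filepath.Join(root, "ingest", digest.FromString(ref).Hex())

	// Mirrors store.blobPath: committed content is addressed by algorithm and hex digest.
	dgst := digest.FromString("some blob bytes")
	blob := filepath.Join(root, "blobs", dgst.Algorithm().String(), dgst.Hex())

	fmt.Println(ingest)
	fmt.Println(blob)
}
```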
@@ -351,7 +356,7 @@ func (s *Store) ingestRoot(ref string) string {
 // - data: file where data is written
 // - lock: lock file location
 //
-func (s *Store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) {
+func (s *store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) {
 	var (
 		fp = s.ingestRoot(ref)
 		rp = filepath.Join(fp, "ref")
@@ -13,7 +13,7 @@ import (
 
 // writer represents a write transaction against the blob store.
 type writer struct {
-	s    *Store
+	s    *store
 	fp   *os.File // opened data file
 	lock lockfile.Lockfile
 	path string // path to writer dir
@@ -33,7 +33,7 @@ type InitContext struct {
 	Root        string
 	State       string
 	Runtimes    map[string]containerd.Runtime
-	Content     *content.Store
+	Content     content.Store
 	Meta        *bolt.DB
 	Snapshotter snapshot.Snapshotter
 	Config      interface{}
@@ -17,3 +17,14 @@ func rewriteGRPCError(err error) error {
 
 	return err
 }
+
+func serverErrorToGRPC(err error, id string) error {
+	switch {
+	case content.IsNotFound(err):
+		return grpc.Errorf(codes.NotFound, "%v: not found", id)
+	case content.IsExists(err):
+		return grpc.Errorf(codes.AlreadyExists, "%v: exists", id)
+	}
+
+	return err
+}
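serverErrorToGRPC is the server half of an error round trip; rewriteGRPCError, whose body sits outside this diff, undoes the mapping on the client so content.IsNotFound and content.IsExists keep working across the wire. A plausible sketch of that inverse, assuming the content package exports ErrNotFound/ErrExists sentinels and using the grpc.Code helper of this grpc-go era:

```go
package content

import (
	"github.com/containerd/containerd/content"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

// rewriteGRPCErrorSketch is a hypothetical reconstruction, not the actual
// implementation: it maps gRPC status codes back to store sentinel errors.
func rewriteGRPCErrorSketch(err error) error {
	switch grpc.Code(err) {
	case codes.NotFound:
		return content.ErrNotFound // assumed sentinel; content.IsNotFound checks it
	case codes.AlreadyExists:
		return content.ErrExists // assumed sentinel; content.IsExists checks it
	}

	return err
}
```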
@@ -1,35 +1,9 @@
 package content
 
 import (
-	"context"
-	"io"
-
 	contentapi "github.com/containerd/containerd/api/services/content"
-	"github.com/containerd/containerd/content"
-	digest "github.com/opencontainers/go-digest"
 )
 
-func NewProviderFromClient(client contentapi.ContentClient) content.Provider {
-	return &remoteProvider{
-		client: client,
-	}
-}
-
-type remoteProvider struct {
-	client contentapi.ContentClient
-}
-
-func (rp *remoteProvider) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
-	client, err := rp.client.Read(ctx, &contentapi.ReadRequest{Digest: dgst})
-	if err != nil {
-		return nil, err
-	}
-
-	return &remoteReader{
-		client: client,
-	}, nil
-}
-
 type remoteReader struct {
 	client contentapi.Content_ReadClient
 	extra  []byte
@@ -18,7 +18,7 @@ import (
 )
 
 type Service struct {
-	store *content.Store
+	store content.Store
 }
 
 var bufPool = sync.Pool{
@@ -52,25 +52,68 @@ func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResp
 		return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
 	}
 
-	bi, err := s.store.Info(req.Digest)
+	bi, err := s.store.Info(ctx, req.Digest)
 	if err != nil {
-		return nil, maybeNotFoundGRPC(err, req.Digest.String())
+		return nil, serverErrorToGRPC(err, req.Digest.String())
 	}
 
 	return &api.InfoResponse{
-		Digest:      req.Digest,
-		Size_:       bi.Size,
-		CommittedAt: bi.CommittedAt,
+		Info: api.Info{
+			Digest:      bi.Digest,
+			Size_:       bi.Size,
+			CommittedAt: bi.CommittedAt,
+		},
 	}, nil
 }
 
+func (s *Service) List(req *api.ListContentRequest, session api.Content_ListServer) error {
+	var (
+		buffer    []api.Info
+		sendBlock = func(block []api.Info) error {
+			// send last block
+			return session.Send(&api.ListContentResponse{
+				Info: block,
+			})
+		}
+	)
+
+	if err := s.store.Walk(session.Context(), func(info content.Info) error {
+		buffer = append(buffer, api.Info{
+			Digest:      info.Digest,
+			Size_:       info.Size,
+			CommittedAt: info.CommittedAt,
+		})
+
+		if len(buffer) >= 100 {
+			if err := sendBlock(buffer); err != nil {
+				return err
+			}
+
+			buffer = buffer[:0]
+		}
+
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	if len(buffer) > 0 {
+		// send last block
+		if err := sendBlock(buffer); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (s *Service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*empty.Empty, error) {
 	if err := req.Digest.Validate(); err != nil {
 		return nil, grpc.Errorf(codes.InvalidArgument, err.Error())
 	}
 
-	if err := s.store.Delete(req.Digest); err != nil {
-		return nil, maybeNotFoundGRPC(err, req.Digest.String())
+	if err := s.store.Delete(ctx, req.Digest); err != nil {
+		return nil, serverErrorToGRPC(err, req.Digest.String())
 	}
 
 	return &empty.Empty{}, nil
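List caps each streamed message at 100 Info entries, so consumers must drain the stream until io.EOF rather than expect a single response. A client-side sketch of that loop against a contentapi.ContentClient (the listAll helper is hypothetical):

```go
package content

import (
	"context"
	"io"

	contentapi "github.com/containerd/containerd/api/services/content"
)

// listAll drains the chunked List stream into one slice; the server signals
// completion by closing the stream, which surfaces here as io.EOF.
func listAll(ctx context.Context, client contentapi.ContentClient) ([]contentapi.Info, error) {
	session, err := client.List(ctx, &contentapi.ListContentRequest{})
	if err != nil {
		return nil, err
	}

	var all []contentapi.Info
	for {
		msg, err := session.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}

		all = append(all, msg.Info...)
	}

	return all, nil
}
```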
@@ -81,14 +124,14 @@ func (s *Service) Read(req *api.ReadRequest, session api.Content_ReadServer) err
 		return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
 	}
 
-	oi, err := s.store.Info(req.Digest)
+	oi, err := s.store.Info(session.Context(), req.Digest)
 	if err != nil {
-		return maybeNotFoundGRPC(err, req.Digest.String())
+		return serverErrorToGRPC(err, req.Digest.String())
 	}
 
 	rc, err := s.store.Reader(session.Context(), req.Digest)
 	if err != nil {
-		return maybeNotFoundGRPC(err, req.Digest.String())
+		return serverErrorToGRPC(err, req.Digest.String())
 	}
 	defer rc.Close() // TODO(stevvooe): Cache these file descriptors for performance.
 
@@ -132,6 +175,10 @@ func (s *Service) Read(req *api.ReadRequest, session api.Content_ReadServer) err
 	return nil
 }
 
+// readResponseWriter is a writer that places the output into ReadResponse messages.
+//
+// This allows io.CopyBuffer to do the heavy lifting of chunking the responses
+// into the buffer size.
 type readResponseWriter struct {
 	offset  int64
 	session api.Content_ReadServer
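The added comment is the whole mechanism: because readResponseWriter satisfies io.Writer, the service can delegate chunking entirely to io.CopyBuffer and a pooled buffer. A reduced sketch of the pattern as it would sit in this service file (the chunkedCopy helper is hypothetical, and the pool element type is assumed to be []byte):

```go
// chunkedCopy streams r to the session as ReadResponse messages no larger
// than the pooled buffer, letting io.CopyBuffer drive the chunking.
func chunkedCopy(session api.Content_ReadServer, r io.Reader, offset int64) error {
	buf := bufPool.Get().([]byte) // assumes the pool vends []byte
	defer bufPool.Put(buf)

	_, err := io.CopyBuffer(&readResponseWriter{
		offset:  offset,
		session: session,
	}, r, buf)
	return err
}
```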
@@ -149,6 +196,27 @@ func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
 	return len(p), nil
 }
 
+func (s *Service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) {
+	statuses, err := s.store.Status(ctx, req.Regexp)
+	if err != nil {
+		return nil, serverErrorToGRPC(err, req.Regexp)
+	}
+
+	var resp api.StatusResponse
+	for _, status := range statuses {
+		resp.Statuses = append(resp.Statuses, api.Status{
+			StartedAt: status.StartedAt,
+			UpdatedAt: status.UpdatedAt,
+			Ref:       status.Ref,
+			Offset:    status.Offset,
+			Total:     status.Total,
+			Expected:  status.Expected,
+		})
+	}
+
+	return &resp, nil
+}
+
 func (s *Service) Write(session api.Content_WriteServer) (err error) {
 	var (
 		ctx = session.Context()
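Status is now a plain unary exchange over the unified Store interface: the regexp filters refs server-side and, per the proto comment, an empty pattern matches all active ingestions. A small caller sketch (function name and output format are illustrative):

```go
package example

import (
	"context"
	"fmt"

	"github.com/containerd/containerd/content"
)

// reportIngests prints progress for each active ingest whose ref matches re;
// an empty re reports everything.
func reportIngests(ctx context.Context, store content.Store, re string) error {
	statuses, err := store.Status(ctx, re)
	if err != nil {
		return err
	}

	for _, status := range statuses {
		fmt.Printf("%s: %d/%d bytes (updated %v)\n",
			status.Ref, status.Offset, status.Total, status.UpdatedAt)
	}

	return nil
}
```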
@@ -243,8 +311,8 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
 			}
 			expected = req.Expected
 
-			if _, err := s.store.Info(req.Expected); err == nil {
-				if err := s.store.Abort(ref); err != nil {
+			if _, err := s.store.Info(session.Context(), req.Expected); err == nil {
+				if err := s.store.Abort(session.Context(), ref); err != nil {
 					log.G(ctx).WithError(err).Error("failed to abort write")
 				}
 
@@ -307,8 +375,6 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
 			}
 
 			msg.Digest = wr.Digest()
-		case api.WriteActionAbort:
-			return s.store.Abort(ref)
 		}
 
 		if err := session.Send(&msg); err != nil {
@@ -326,14 +392,10 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
 	}
 }
 
-func (s *Service) Status(*api.StatusRequest, api.Content_StatusServer) error {
-	return grpc.Errorf(codes.Unimplemented, "not implemented")
-}
-
-func maybeNotFoundGRPC(err error, id string) error {
-	if content.IsNotFound(err) {
-		return grpc.Errorf(codes.NotFound, "%v: not found", id)
+func (s *Service) Abort(ctx context.Context, req *api.AbortRequest) (*empty.Empty, error) {
+	if err := s.store.Abort(ctx, req.Ref); err != nil {
+		return nil, serverErrorToGRPC(err, req.Ref)
 	}
 
-	return err
+	return &empty.Empty{}, nil
 }
155  services/content/store.go  Normal file
@@ -0,0 +1,155 @@
+package content
+
+import (
+	"context"
+	"io"
+
+	contentapi "github.com/containerd/containerd/api/services/content"
+	"github.com/containerd/containerd/content"
+	digest "github.com/opencontainers/go-digest"
+)
+
+type remoteStore struct {
+	client contentapi.ContentClient
+}
+
+func NewStoreFromClient(client contentapi.ContentClient) content.Store {
+	return &remoteStore{
+		client: client,
+	}
+}
+
+func (rs *remoteStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
+	resp, err := rs.client.Info(ctx, &contentapi.InfoRequest{
+		Digest: dgst,
+	})
+	if err != nil {
+		return content.Info{}, rewriteGRPCError(err)
+	}
+
+	return content.Info{
+		Digest:      resp.Info.Digest,
+		Size:        resp.Info.Size_,
+		CommittedAt: resp.Info.CommittedAt,
+	}, nil
+}
+
+func (rs *remoteStore) Walk(ctx context.Context, fn content.WalkFunc) error {
+	session, err := rs.client.List(ctx, &contentapi.ListContentRequest{})
+	if err != nil {
+		return rewriteGRPCError(err)
+	}
+
+	for {
+		msg, err := session.Recv()
+		if err != nil {
+			if err != io.EOF {
+				return rewriteGRPCError(err)
+			}
+
+			break
+		}
+
+		for _, info := range msg.Info {
+			if err := fn(content.Info{
+				Digest:      info.Digest,
+				Size:        info.Size_,
+				CommittedAt: info.CommittedAt,
+			}); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func (rs *remoteStore) Delete(ctx context.Context, dgst digest.Digest) error {
+	if _, err := rs.client.Delete(ctx, &contentapi.DeleteContentRequest{
+		Digest: dgst,
+	}); err != nil {
+		return rewriteGRPCError(err)
+	}
+
+	return nil
+}
+
+func (rs *remoteStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
+	client, err := rs.client.Read(ctx, &contentapi.ReadRequest{Digest: dgst})
+	if err != nil {
+		return nil, err
+	}
+
+	return &remoteReader{
+		client: client,
+	}, nil
+}
+
+func (rs *remoteStore) Status(ctx context.Context, re string) ([]content.Status, error) {
+	resp, err := rs.client.Status(ctx, &contentapi.StatusRequest{
+		Regexp: re,
+	})
+	if err != nil {
+		return nil, rewriteGRPCError(err)
+	}
+
+	var statuses []content.Status
+	for _, status := range resp.Statuses {
+		statuses = append(statuses, content.Status{
+			Ref:       status.Ref,
+			StartedAt: status.StartedAt,
+			UpdatedAt: status.UpdatedAt,
+			Offset:    status.Offset,
+			Total:     status.Total,
+			Expected:  status.Expected,
+		})
+	}
+
+	return statuses, nil
+}
+
+func (rs *remoteStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
+	wrclient, offset, err := rs.negotiate(ctx, ref, size, expected)
+	if err != nil {
+		return nil, rewriteGRPCError(err)
+	}
+
+	return &remoteWriter{
+		client: wrclient,
+		offset: offset,
+	}, nil
+}
+
+// Abort implements asynchronous abort. It starts a new write session on the ref l
+func (rs *remoteStore) Abort(ctx context.Context, ref string) error {
+	if _, err := rs.client.Abort(ctx, &contentapi.AbortRequest{
+		Ref: ref,
+	}); err != nil {
+		return rewriteGRPCError(err)
+	}
+
+	return nil
+}
+
+func (rs *remoteStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) {
+	wrclient, err := rs.client.Write(ctx)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	if err := wrclient.Send(&contentapi.WriteRequest{
+		Action:   contentapi.WriteActionStat,
+		Ref:      ref,
+		Total:    size,
+		Expected: expected,
+	}); err != nil {
+		return nil, 0, err
+	}
+
+	resp, err := wrclient.Recv()
+	if err != nil {
+		return nil, 0, err
+	}
+
+	return wrclient, resp.Offset, nil
+}
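Everything above composes into a single constructor on the client side. A wiring sketch, assuming a containerd gRPC socket at the path shown (the address and dial options are illustrative; real wiring of the period typically supplied a custom dialer for the unix socket):

```go
package main

import (
	"context"
	"fmt"
	"log"

	contentapi "github.com/containerd/containerd/api/services/content"
	contentservice "github.com/containerd/containerd/services/content"
	digest "github.com/opencontainers/go-digest"
	"google.golang.org/grpc"
)

func main() {
	// Illustrative address and dial options; adjust for the actual daemon socket.
	conn, err := grpc.Dial("unix:///run/containerd/containerd.sock", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// One constructor now yields the full content.Store surface:
	// reads, writes, listing, status and abort over a single client.
	store := contentservice.NewStoreFromClient(contentapi.NewContentClient(conn))

	dgst := digest.FromString("example") // stand-in digest; a real blob digest goes here
	info, err := store.Info(context.Background(), dgst)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("%s: %d bytes, committed %v\n", info.Digest, info.Size, info.CommittedAt)
}
```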
@@ -1,61 +1,14 @@
 package content
 
 import (
-	"context"
 	"io"
 
 	contentapi "github.com/containerd/containerd/api/services/content"
 	"github.com/containerd/containerd/content"
 
 	digest "github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 )
 
-func NewIngesterFromClient(client contentapi.ContentClient) content.Ingester {
-	return &remoteIngester{
-		client: client,
-	}
-}
-
-type remoteIngester struct {
-	client contentapi.ContentClient
-}
-
-func (ri *remoteIngester) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
-	wrclient, offset, err := ri.negotiate(ctx, ref, size, expected)
-	if err != nil {
-		return nil, rewriteGRPCError(err)
-	}
-
-	return &remoteWriter{
-		client: wrclient,
-		offset: offset,
-	}, nil
-}
-
-func (ri *remoteIngester) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) {
-	wrclient, err := ri.client.Write(ctx)
-	if err != nil {
-		return nil, 0, err
-	}
-
-	if err := wrclient.Send(&contentapi.WriteRequest{
-		Action:   contentapi.WriteActionStat,
-		Ref:      ref,
-		Total:    size,
-		Expected: expected,
-	}); err != nil {
-		return nil, 0, err
-	}
-
-	resp, err := wrclient.Recv()
-	if err != nil {
-		return nil, 0, err
-	}
-
-	return wrclient, resp.Offset, nil
-}
-
 type remoteWriter struct {
 	ref    string
 	client contentapi.Content_WriteClient
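The removed ingester constructor survives as remoteStore.Writer plus negotiate above: every Writer call opens a Write stream, sends a WriteActionStat carrying the ref, total size and expected digest, and reads back the committed offset, which is how interrupted ingests resume. A caller-side sketch, assuming the content.Writer of this revision exposes Write/Commit/Close (the ingestBlob helper is hypothetical):

```go
package example

import (
	"context"

	"github.com/containerd/containerd/content"
	digest "github.com/opencontainers/go-digest"
)

// ingestBlob writes blob under ref, relying on the stat handshake to resume
// from any previously committed offset (resume bookkeeping elided for brevity).
func ingestBlob(ctx context.Context, store content.Store, ref string, blob []byte) error {
	expected := digest.FromBytes(blob)

	w, err := store.Writer(ctx, ref, int64(len(blob)), expected)
	if err != nil {
		return err
	}
	defer w.Close()

	if _, err := w.Write(blob); err != nil {
		return err
	}

	// Commit validates size and digest server-side before the blob becomes visible.
	return w.Commit(int64(len(blob)), expected)
}
```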
@@ -26,11 +26,11 @@ func init() {
 }
 
 type Service struct {
-	store       *content.Store
+	store       content.Store
 	snapshotter snapshot.Snapshotter
 }
 
-func NewService(store *content.Store, snapshotter snapshot.Snapshotter) (*Service, error) {
+func NewService(store content.Store, snapshotter snapshot.Snapshotter) (*Service, error) {
 	return &Service{
 		store:       store,
 		snapshotter: snapshotter,
@@ -512,7 +512,7 @@ func init() {
 
 var fileDescriptorRecord = []byte{
 	// 330 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x54, 0x90, 0xbd, 0x4e, 0xf3, 0x30,
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0xbd, 0x4e, 0xf3, 0x30,
 	0x18, 0x85, 0xe3, 0x7c, 0xf9, 0x42, 0x6b, 0xd1, 0xaa, 0x18, 0x54, 0x59, 0x19, 0x8c, 0xc5, 0x42,
 	0xc4, 0x90, 0x08, 0xb8, 0x82, 0xfe, 0x0d, 0x55, 0x55, 0x21, 0x85, 0x8a, 0x3d, 0x8d, 0xad, 0xd4,
 	0x82, 0xda, 0x95, 0x63, 0x2a, 0xc1, 0xc4, 0x88, 0x7a, 0x0f, 0x9d, 0x60, 0xe3, 0x0e, 0xb8, 0x82,