diff --git a/docs/config.md b/docs/config.md index d5c481ff2..1f5c30e5d 100644 --- a/docs/config.md +++ b/docs/config.md @@ -27,6 +27,11 @@ The explanation and default value of each configuration item are as follows: # enable_tls_streaming enables the TLS streaming support. enable_tls_streaming = false + # max_container_log_line_size is the maximum log line size in bytes for a container. + # Log line longer than the limit will be split into multiple lines. -1 means no + # limit. + max_container_log_line_size = 16384 + # "plugins.cri.containerd" contains config related to containerd [plugins.cri.containerd] diff --git a/pkg/config/config.go b/pkg/config/config.go index d9e417d75..8ec2be834 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -96,6 +96,10 @@ type PluginConfig struct { SystemdCgroup bool `toml:"systemd_cgroup" json:"systemdCgroup"` // EnableTLSStreaming indicates to enable the TLS streaming support. EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"` + // MaxContainerLogLineSize is the maximum log line size in bytes for a container. + // Log line longer than the limit will be split into multiple lines. Non-positive + // value means no limit. + MaxContainerLogLineSize int `toml:"max_container_log_line_size" json:"maxContainerLogSize"` } // Config contains all configurations for cri server. @@ -129,13 +133,14 @@ func DefaultConfig() PluginConfig { Root: "", }, }, - StreamServerAddress: "", - StreamServerPort: "10010", - EnableSelinux: false, - EnableTLSStreaming: false, - SandboxImage: "k8s.gcr.io/pause:3.1", - StatsCollectPeriod: 10, - SystemdCgroup: false, + StreamServerAddress: "", + StreamServerPort: "10010", + EnableSelinux: false, + EnableTLSStreaming: false, + SandboxImage: "k8s.gcr.io/pause:3.1", + StatsCollectPeriod: 10, + SystemdCgroup: false, + MaxContainerLogLineSize: 16 * 1024, Registry: Registry{ Mirrors: map[string]Mirror{ "docker.io": { diff --git a/pkg/ioutil/write_closer.go b/pkg/ioutil/write_closer.go index d1fa8ed50..987169fe1 100644 --- a/pkg/ioutil/write_closer.go +++ b/pkg/ioutil/write_closer.go @@ -16,7 +16,10 @@ limitations under the License. package ioutil -import "io" +import ( + "io" + "sync" +) // writeCloseInformer wraps passed in write closer with a close channel. // Caller could wait on the close channel for the write closer to be @@ -66,3 +69,34 @@ func (n *nopWriteCloser) Write(p []byte) (int, error) { func (n *nopWriteCloser) Close() error { return nil } + +// serialWriteCloser wraps a write closer and makes sure all writes +// are done in serial. +// Parallel write won't intersect with each other. Use case: +// 1) Pipe: Write content longer than PIPE_BUF. +// See http://man7.org/linux/man-pages/man7/pipe.7.html +// 2) <3.14 Linux Kernel: write is not atomic +// See http://man7.org/linux/man-pages/man2/write.2.html +type serialWriteCloser struct { + mu sync.Mutex + wc io.WriteCloser +} + +// NewSerialWriteCloser creates a SerialWriteCloser from a write closer. +func NewSerialWriteCloser(wc io.WriteCloser) io.WriteCloser { + return &serialWriteCloser{wc: wc} +} + +// Write writes a group of byte arrays in order atomically. +func (s *serialWriteCloser) Write(data []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.wc.Write(data) +} + +// Close closes the write closer. +func (s *serialWriteCloser) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.wc.Close() +} diff --git a/pkg/ioutil/write_closer_test.go b/pkg/ioutil/write_closer_test.go index 67f6b02ed..8b1eb4b20 100644 --- a/pkg/ioutil/write_closer_test.go +++ b/pkg/ioutil/write_closer_test.go @@ -17,9 +17,16 @@ limitations under the License. package ioutil import ( + "io/ioutil" + "os" + "sort" + "strconv" + "strings" + "sync" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestWriteCloseInformer(t *testing.T) { @@ -47,3 +54,55 @@ func TestWriteCloseInformer(t *testing.T) { assert.Fail(t, "write closer not closed") } } + +func TestSerialWriteCloser(t *testing.T) { + const ( + // Test 10 times to make sure it always pass. + testCount = 10 + + goroutine = 10 + dataLen = 100000 + ) + for n := 0; n < testCount; n++ { + testData := make([][]byte, goroutine) + for i := 0; i < goroutine; i++ { + testData[i] = []byte(repeatNumber(i, dataLen) + "\n") + } + + f, err := ioutil.TempFile("/tmp", "serial-write-closer") + require.NoError(t, err) + defer os.RemoveAll(f.Name()) + defer f.Close() + wc := NewSerialWriteCloser(f) + defer wc.Close() + + // Write data in parallel + var wg sync.WaitGroup + wg.Add(goroutine) + for i := 0; i < goroutine; i++ { + go func(id int) { + n, err := wc.Write(testData[id]) + assert.NoError(t, err) + assert.Equal(t, dataLen+1, n) + wg.Done() + }(i) + } + wg.Wait() + wc.Close() + + // Check test result + content, err := ioutil.ReadFile(f.Name()) + require.NoError(t, err) + resultData := strings.Split(strings.TrimSpace(string(content)), "\n") + require.Len(t, resultData, goroutine) + sort.Strings(resultData) + for i := 0; i < goroutine; i++ { + expected := repeatNumber(i, dataLen) + assert.Equal(t, expected, resultData[i]) + } + } +} + +func repeatNumber(num, count int) string { + return strings.Repeat(strconv.Itoa(num), count) +} diff --git a/pkg/server/container_log_reopen.go b/pkg/server/container_log_reopen.go index 144cc2cc5..515cc20b9 100644 --- a/pkg/server/container_log_reopen.go +++ b/pkg/server/container_log_reopen.go @@ -36,7 +36,7 @@ func (c *criService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenCo } // Create new container logger and replace the existing ones. - stdoutWC, stderrWC, err := createContainerLoggers(container.LogPath, container.Config.GetTty()) + stdoutWC, stderrWC, err := c.createContainerLoggers(container.LogPath, container.Config.GetTty()) if err != nil { return nil, err } diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index 26e3d216d..ed4b8b43f 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -18,6 +18,7 @@ package server import ( "io" + "os" "time" "github.com/containerd/containerd" @@ -29,6 +30,7 @@ import ( runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" ctrdutil "github.com/containerd/cri/pkg/containerd/util" + cioutil "github.com/containerd/cri/pkg/ioutil" cio "github.com/containerd/cri/pkg/server/io" containerstore "github.com/containerd/cri/pkg/store/container" sandboxstore "github.com/containerd/cri/pkg/store/sandbox" @@ -97,20 +99,10 @@ func (c *criService) startContainer(ctx context.Context, } ioCreation := func(id string) (_ containerdio.IO, err error) { - stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, config.GetTty()) + stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty()) if err != nil { return nil, errors.Wrap(err, "failed to create container loggers") } - defer func() { - if err != nil { - if stdoutWC != nil { - stdoutWC.Close() - } - if stderrWC != nil { - stderrWC.Close() - } - } - }() cntr.IO.AddOutput("log", stdoutWC, stderrWC) cntr.IO.Pipe() return cntr.IO, nil @@ -142,24 +134,36 @@ func (c *criService) startContainer(ctx context.Context, return nil } -// Create container loggers and return write closer for stdout and stderr. -func createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) { +// createContainerLoggers creates container loggers and return write closer for stdout and stderr. +func (c *criService) createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) { if logPath != "" { // Only generate container log when log path is specified. - if stdout, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil { - return nil, nil, errors.Wrap(err, "failed to start container stdout logger") + f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to create and open log file") } defer func() { if err != nil { - stdout.Close() + f.Close() } }() + var stdoutCh, stderrCh <-chan struct{} + wc := cioutil.NewSerialWriteCloser(f) + stdout, stdoutCh = cio.NewCRILogger(logPath, wc, cio.Stdout, c.config.MaxContainerLogLineSize) // Only redirect stderr when there is no tty. if !tty { - if stderr, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil { - return nil, nil, errors.Wrap(err, "failed to start container stderr logger") - } + stderr, stderrCh = cio.NewCRILogger(logPath, wc, cio.Stderr, c.config.MaxContainerLogLineSize) } + go func() { + if stdoutCh != nil { + <-stdoutCh + } + if stderrCh != nil { + <-stderrCh + } + logrus.Debugf("Finish redirecting log file %q, closing it", logPath) + f.Close() + }() } else { stdout = cio.NewDiscardLogger() stderr = cio.NewDiscardLogger() diff --git a/pkg/server/io/logger.go b/pkg/server/io/logger.go index 3d7816bc4..9d2af2769 100644 --- a/pkg/server/io/logger.go +++ b/pkg/server/io/logger.go @@ -21,10 +21,8 @@ import ( "bytes" "io" "io/ioutil" - "os" "time" - "github.com/pkg/errors" "github.com/sirupsen/logrus" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" @@ -38,11 +36,8 @@ const ( eol = '\n' // timestampFormat is the timestamp format used in CRI logging format. timestampFormat = time.RFC3339Nano - // pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes. - // POSIX.1 says that write less than PIPE_BUF is atmoic. - pipeBufSize = 4096 - // bufSize is the size of the read buffer. - bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - len(runtime.LogTagPartial) - 3 /*3 delimiter*/ - 1 /*eol*/ + // defaultBufSize is the default size of the read buffer in bytes. + defaultBufSize = 4096 ) // NewDiscardLogger creates logger which discards all the input. @@ -51,46 +46,91 @@ func NewDiscardLogger() io.WriteCloser { } // NewCRILogger returns a write closer which redirect container log into -// log file, and decorate the log line into CRI defined format. -func NewCRILogger(path string, stream StreamType) (io.WriteCloser, error) { - logrus.Debugf("Start writing log file %q", path) +// log file, and decorate the log line into CRI defined format. It also +// returns a channel which indicates whether the logger is stopped. +// maxLen is the max length limit of a line. A line longer than the +// limit will be cut into multiple lines. +func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.WriteCloser, <-chan struct{}) { + logrus.Debugf("Start writing stream %q to log file %q", stream, path) prc, pwc := io.Pipe() - f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) - if err != nil { - return nil, errors.Wrap(err, "failed to open log file") - } - go redirectLogs(path, prc, f, stream) - return pwc, nil + stop := make(chan struct{}) + go func() { + redirectLogs(path, prc, w, stream, maxLen) + close(stop) + }() + return pwc, stop } -func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream StreamType) { +func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) { defer rc.Close() - defer wc.Close() - streamBytes := []byte(stream) - delimiterBytes := []byte{delimiter} - partialBytes := []byte(runtime.LogTagPartial) - fullBytes := []byte(runtime.LogTagFull) - r := bufio.NewReaderSize(rc, bufSize) - for { - lineBytes, isPrefix, err := r.ReadLine() - if err == io.EOF { - logrus.Debugf("Finish redirecting log file %q", path) - return - } - if err != nil { - logrus.WithError(err).Errorf("An error occurred when redirecting log file %q", path) - return - } - tagBytes := fullBytes - if isPrefix { - tagBytes = partialBytes - } - timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano) - data := bytes.Join([][]byte{timestampBytes, streamBytes, tagBytes, lineBytes}, delimiterBytes) - data = append(data, eol) - if _, err := wc.Write(data); err != nil { - logrus.WithError(err).Errorf("Fail to write %q log to log file %q", stream, path) - } - // Continue on write error to drain the input. + var ( + stream = []byte(s) + delimiter = []byte{delimiter} + partial = []byte(runtime.LogTagPartial) + full = []byte(runtime.LogTagFull) + buf [][]byte + length int + bufSize = defaultBufSize + ) + // Make sure bufSize <= maxLen + if maxLen > 0 && maxLen < bufSize { + bufSize = maxLen } + r := bufio.NewReaderSize(rc, bufSize) + writeLine := func(tag, line []byte) { + timestamp := time.Now().AppendFormat(nil, timestampFormat) + data := bytes.Join([][]byte{timestamp, stream, tag, line}, delimiter) + data = append(data, eol) + if _, err := w.Write(data); err != nil { + logrus.WithError(err).Errorf("Fail to write %q log to log file %q", s, path) + // Continue on write error to drain the container output. + } + } + for { + var stop bool + newLine, isPrefix, err := r.ReadLine() + if err != nil { + if err == io.EOF { + logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path) + } else { + logrus.WithError(err).Errorf("An error occurred when redirecting stream %q to log file %q", s, path) + } + if length == 0 { + // No content left to write, break. + break + } + // Stop after writing the content left in buffer. + stop = true + } else { + // Buffer returned by ReadLine will change after + // next read, copy it. + l := make([]byte, len(newLine)) + copy(l, newLine) + buf = append(buf, l) + length += len(l) + } + if maxLen > 0 && length > maxLen { + exceedLen := length - maxLen + last := buf[len(buf)-1] + if exceedLen > len(last) { + // exceedLen must <= len(last), or else the buffer + // should have be written in the previous iteration. + panic("exceed length should <= last buffer size") + } + buf[len(buf)-1] = last[:len(last)-exceedLen] + writeLine(partial, bytes.Join(buf, nil)) + buf = [][]byte{last[len(last)-exceedLen:]} + length = exceedLen + } + if isPrefix { + continue + } + writeLine(full, bytes.Join(buf, nil)) + buf = nil + length = 0 + if stop { + break + } + } + logrus.Debugf("Finish redirecting stream %q to log file %q", s, path) } diff --git a/pkg/server/io/logger_test.go b/pkg/server/io/logger_test.go index 54ed8d3ec..45d45cc47 100644 --- a/pkg/server/io/logger_test.go +++ b/pkg/server/io/logger_test.go @@ -31,15 +31,19 @@ import ( ) func TestRedirectLogs(t *testing.T) { + // defaultBufSize is even number + const maxLen = defaultBufSize * 4 for desc, test := range map[string]struct { input string stream StreamType + maxLen int tag []runtime.LogTag content []string }{ "stdout log": { - input: "test stdout log 1\ntest stdout log 2", + input: "test stdout log 1\ntest stdout log 2\n", stream: Stdout, + maxLen: maxLen, tag: []runtime.LogTag{ runtime.LogTagFull, runtime.LogTagFull, @@ -50,8 +54,9 @@ func TestRedirectLogs(t *testing.T) { }, }, "stderr log": { - input: "test stderr log 1\ntest stderr log 2", + input: "test stderr log 1\ntest stderr log 2\n", stream: Stderr, + maxLen: maxLen, tag: []runtime.LogTag{ runtime.LogTagFull, runtime.LogTagFull, @@ -61,18 +66,160 @@ func TestRedirectLogs(t *testing.T) { "test stderr log 2", }, }, - "long log": { - input: strings.Repeat("a", 2*bufSize+10) + "\n", - stream: Stdout, + "log ends without newline": { + input: "test stderr log 1\ntest stderr log 2", + stream: Stderr, + maxLen: maxLen, tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + "test stderr log 1", + "test stderr log 2", + }, + }, + "log length equal to buffer size": { + input: strings.Repeat("a", defaultBufSize) + "\n" + strings.Repeat("a", defaultBufSize) + "\n", + stream: Stdout, + maxLen: maxLen, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize), + strings.Repeat("a", defaultBufSize), + }, + }, + "log length longer than buffer size": { + input: strings.Repeat("a", defaultBufSize*2+10) + "\n" + strings.Repeat("a", defaultBufSize*2+20) + "\n", + stream: Stdout, + maxLen: maxLen, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize*2+10), + strings.Repeat("a", defaultBufSize*2+20), + }, + }, + "log length equal to max length": { + input: strings.Repeat("a", maxLen) + "\n" + strings.Repeat("a", maxLen) + "\n", + stream: Stdout, + maxLen: maxLen, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", maxLen), + strings.Repeat("a", maxLen), + }, + }, + "log length exceed max length by 1": { + input: strings.Repeat("a", maxLen+1) + "\n" + strings.Repeat("a", maxLen+1) + "\n", + stream: Stdout, + maxLen: maxLen, + tag: []runtime.LogTag{ + runtime.LogTagPartial, + runtime.LogTagFull, + runtime.LogTagPartial, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", maxLen), + "a", + strings.Repeat("a", maxLen), + "a", + }, + }, + "log length longer than max length": { + input: strings.Repeat("a", maxLen*2) + "\n" + strings.Repeat("a", maxLen*2+1) + "\n", + stream: Stdout, + maxLen: maxLen, + tag: []runtime.LogTag{ + runtime.LogTagPartial, + runtime.LogTagFull, runtime.LogTagPartial, runtime.LogTagPartial, runtime.LogTagFull, }, content: []string{ - strings.Repeat("a", bufSize), - strings.Repeat("a", bufSize), + strings.Repeat("a", maxLen), + strings.Repeat("a", maxLen), + strings.Repeat("a", maxLen), + strings.Repeat("a", maxLen), + "a", + }, + }, + "max length shorter than buffer size": { + input: strings.Repeat("a", defaultBufSize*3/2+10) + "\n" + strings.Repeat("a", defaultBufSize*3/2+20) + "\n", + stream: Stdout, + maxLen: defaultBufSize / 2, + tag: []runtime.LogTag{ + runtime.LogTagPartial, + runtime.LogTagPartial, + runtime.LogTagPartial, + runtime.LogTagFull, + runtime.LogTagPartial, + runtime.LogTagPartial, + runtime.LogTagPartial, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize*1/2), + strings.Repeat("a", defaultBufSize*1/2), + strings.Repeat("a", defaultBufSize*1/2), strings.Repeat("a", 10), + strings.Repeat("a", defaultBufSize*1/2), + strings.Repeat("a", defaultBufSize*1/2), + strings.Repeat("a", defaultBufSize*1/2), + strings.Repeat("a", 20), + }, + }, + "log length longer than max length, and (maxLen % defaultBufSize != 0)": { + input: strings.Repeat("a", defaultBufSize*2+10) + "\n" + strings.Repeat("a", defaultBufSize*2+20) + "\n", + stream: Stdout, + maxLen: defaultBufSize * 3 / 2, + tag: []runtime.LogTag{ + runtime.LogTagPartial, + runtime.LogTagFull, + runtime.LogTagPartial, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize*3/2), + strings.Repeat("a", defaultBufSize*1/2+10), + strings.Repeat("a", defaultBufSize*3/2), + strings.Repeat("a", defaultBufSize*1/2+20), + }, + }, + "no limit if max length is 0": { + input: strings.Repeat("a", defaultBufSize*10+10) + "\n" + strings.Repeat("a", defaultBufSize*10+20) + "\n", + stream: Stdout, + maxLen: 0, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize*10+10), + strings.Repeat("a", defaultBufSize*10+20), + }, + }, + "no limit if max length is negative": { + input: strings.Repeat("a", defaultBufSize*10+10) + "\n" + strings.Repeat("a", defaultBufSize*10+20) + "\n", + stream: Stdout, + maxLen: -1, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize*10+10), + strings.Repeat("a", defaultBufSize*10+20), }, }, } { @@ -80,7 +227,7 @@ func TestRedirectLogs(t *testing.T) { rc := ioutil.NopCloser(strings.NewReader(test.input)) buf := bytes.NewBuffer(nil) wc := cioutil.NewNopWriteCloser(buf) - redirectLogs("test-path", rc, wc, test.stream) + redirectLogs("test-path", rc, wc, test.stream, test.maxLen) output := buf.String() lines := strings.Split(output, "\n") lines = lines[:len(lines)-1] // Discard empty string after last \n diff --git a/pkg/server/restart.go b/pkg/server/restart.go index d37f8a287..4b11b7d77 100644 --- a/pkg/server/restart.go +++ b/pkg/server/restart.go @@ -78,9 +78,7 @@ func (c *criService) recover(ctx context.Context) error { return errors.Wrap(err, "failed to list containers") } for _, container := range containers { - containerDir := c.getContainerRootDir(container.ID()) - volatileContainerDir := c.getVolatileContainerRootDir(container.ID()) - cntr, err := loadContainer(ctx, container, containerDir, volatileContainerDir) + cntr, err := c.loadContainer(ctx, container) if err != nil { logrus.WithError(err).Errorf("Failed to load container %q", container.ID()) continue @@ -149,8 +147,10 @@ func (c *criService) recover(ctx context.Context) error { } // loadContainer loads container from containerd and status checkpoint. -func loadContainer(ctx context.Context, cntr containerd.Container, containerDir, volatileContainerDir string) (containerstore.Container, error) { +func (c *criService) loadContainer(ctx context.Context, cntr containerd.Container) (containerstore.Container, error) { id := cntr.ID() + containerDir := c.getContainerRootDir(id) + volatileContainerDir := c.getVolatileContainerRootDir(id) var container containerstore.Container // Load container metadata. exts, err := cntr.Extensions(ctx) @@ -176,11 +176,21 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir, // Load up-to-date status from containerd. var containerIO *cio.ContainerIO - t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (containerdio.IO, error) { - stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, meta.Config.GetTty()) + t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (_ containerdio.IO, err error) { + stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty()) if err != nil { return nil, err } + defer func() { + if err != nil { + if stdoutWC != nil { + stdoutWC.Close() + } + if stderrWC != nil { + stderrWC.Close() + } + } + }() containerIO, err = cio.NewContainerIO(id, cio.WithFIFOs(fifos), )