diff --git a/cmd/nerdctl/container/container_run_restart_linux_test.go b/cmd/nerdctl/container/container_run_restart_linux_test.go index 795550696f6..04e2721a41f 100644 --- a/cmd/nerdctl/container/container_run_restart_linux_test.go +++ b/cmd/nerdctl/container/container_run_restart_linux_test.go @@ -53,7 +53,8 @@ func TestRunRestart(t *testing.T) { "--name", testContainerName, "-p", fmt.Sprintf("127.0.0.1:%d:80", hostPort), testutil.NginxAlpineImage).AssertOK() - + inspectedContainer := base.InspectContainer(testContainerName) + pid := inspectedContainer.State.Pid check := func(httpGetRetry int) error { resp, err := nettestutil.HTTPGet(fmt.Sprintf("http://127.0.0.1:%d", hostPort), httpGetRetry, false) if err != nil { @@ -87,6 +88,9 @@ func TestRunRestart(t *testing.T) { } time.Sleep(sleep) } + inspectedContainer = base.InspectContainer(testContainerName) + assert.Equal(t, inspectedContainer.State.Status, "running") + assert.Equal(t, inspectedContainer.State.Pid, pid) base.DumpDaemonLogs(10) t.Fatalf("the container does not seem to be restarted") } diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 91a3231ee3a..dbbda004e93 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -26,7 +26,6 @@ import ( "os" "path/filepath" "sort" - "strings" "sync" "time" @@ -165,49 +164,9 @@ func WaitForLogger(dataStore, ns, id string) error { }) } -func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { - client, err := containerd.New(strings.TrimPrefix(address, "unix://"), containerd.WithDefaultNamespace(config.Namespace)) - if err != nil { - return nil, err - } - con, err := client.LoadContainer(ctx, config.ID) - if err != nil { - return nil, err - } - - task, err := con.Task(ctx, nil) - if err == nil { - return task.Wait(ctx) - } - if !errdefs.IsNotFound(err) { - return nil, err - } - - // If task was not found, it's possible that the container runtime is still being created. - // Retry every 100ms. - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return nil, errors.New("timed out waiting for container task to start") - case <-ticker.C: - task, err = con.Task(ctx, nil) - if err != nil { - if errdefs.IsNotFound(err) { - continue - } - return nil, err - } - return task.Wait(ctx) - } - } -} - type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) -func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, getContainerWait ContainerWaitFunc, config *logging.Config) error { +func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, config *logging.Config) error { if err := driver.PreProcess(ctx, dataStore, config); err != nil { return err } @@ -220,6 +179,15 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres if err != nil { return err } + stdoutChan, err := waitIOClose(config.Stdout) + if err != nil { + return err + } + stderrChan, err := waitIOClose(config.Stderr) + if err != nil { + return err + } + go func() { <-ctx.Done() // delivered on SIGTERM stdoutR.Cancel() @@ -230,9 +198,7 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres pipeStdoutR, pipeStdoutW := io.Pipe() pipeStderrR, pipeStderrW := io.Pipe() copyStream := func(reader io.Reader, writer *io.PipeWriter) { - // copy using a buffer of size 32K - buf := make([]byte, 32<<10) - _, err := io.CopyBuffer(writer, reader, buf) + _, err := io.Copy(writer, reader) if err != nil { log.G(ctx).Errorf("failed to copy stream: %s", err) } @@ -273,13 +239,8 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres // close pipeStdoutW and pipeStderrW upon container exit defer pipeStdoutW.Close() defer pipeStderrW.Close() - - exitCh, err := getContainerWait(ctx, address, config) - if err != nil { - log.G(ctx).Errorf("failed to get container task wait channel: %v", err) - return - } - <-exitCh + <-stdoutChan + <-stderrChan }() wg.Wait() return driver.PostProcess() @@ -314,7 +275,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { return err } // getContainerWait is extracted as parameter to allow mocking in tests. - return loggingProcessAdapter(ctx, driver, dataStore, logConfig.Address, getContainerWait, config) + return loggingProcessAdapter(ctx, driver, dataStore, config) }) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 diff --git a/pkg/logging/logging_bsd.go b/pkg/logging/logging_bsd.go new file mode 100644 index 00000000000..1c1c420dff2 --- /dev/null +++ b/pkg/logging/logging_bsd.go @@ -0,0 +1,67 @@ +//go:build darwin || freebsd || netbsd || openbsd || dragonfly +// +build darwin freebsd netbsd openbsd dragonfly + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package logging + +import ( + "fmt" + "io" + + "github.com/muesli/cancelreader" + "golang.org/x/sys/unix" +) + +func waitIOClose(reader io.Reader) (chan struct{}, error) { + closeIO := make(chan struct{}) + file, ok := reader.(cancelreader.File) + if !ok { + return nil, fmt.Errorf("reader is not an cancelreader.File") + } + + kq, err := unix.Kqueue() + if err != nil { + return nil, fmt.Errorf("create kqueue: %w", err) + } + kev := unix.Kevent_t{ + Ident: uint64(file.Fd()), + Filter: unix.EVFILT_READ, + Flags: unix.EV_ADD | unix.EV_ENABLE, + } + + events := make([]unix.Kevent_t, 1) + _, err = unix.Kevent(kq, []unix.Kevent_t{kev}, events, nil) + if err != nil { + return nil, err + } + go func() { + for { + n, err := unix.Kevent(kq, nil, events, nil) + if err != nil { + continue + } + for i := 0; i < n; i++ { + if events[i].Flags&unix.EV_EOF != 0 { + close(closeIO) + return + } + } + } + }() + return closeIO, nil +} diff --git a/pkg/logging/logging_linux.go b/pkg/logging/logging_linux.go new file mode 100644 index 00000000000..0a11528f181 --- /dev/null +++ b/pkg/logging/logging_linux.go @@ -0,0 +1,64 @@ +//go:build linux +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package logging + +import ( + "fmt" + "io" + "os" + + "golang.org/x/sys/unix" +) + +func waitIOClose(reader io.Reader) (chan struct{}, error) { + closeIO := make(chan struct{}) + epfd, err := unix.EpollCreate1(0) + if err != nil { + return nil, err + } + file, ok := reader.(*os.File) + if !ok { + return nil, fmt.Errorf("reader is not an cancelreader.File") + } + fd := file.Fd() + event := unix.EpollEvent{ + Events: unix.EPOLLHUP, + Fd: int32(fd), + } + if err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, int(fd), &event); err != nil { + return nil, err + } + events := make([]unix.EpollEvent, 1) + go func() { + for { + n, err := unix.EpollWait(epfd, events, -1) + if err != nil { + continue + } + for i := 0; i < n; i++ { + if events[i].Events&unix.EPOLLHUP != 0 { + close(closeIO) + return + } + } + } + }() + return closeIO, nil +} diff --git a/pkg/logging/logging_test.go b/pkg/logging/logging_test.go index da0d535b074..07c5dd292c3 100644 --- a/pkg/logging/logging_test.go +++ b/pkg/logging/logging_test.go @@ -18,14 +18,13 @@ package logging import ( "bufio" - "bytes" "context" "math/rand" + "os" "strings" "testing" "time" - containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/runtime/v2/logging" ) @@ -68,37 +67,38 @@ func TestLoggingProcessAdapter(t *testing.T) { // Prepare mock driver and logging config driver := &MockDriver{} - stdoutBuffer := bytes.NewBufferString(normalString) - stderrBuffer := bytes.NewBufferString(hugeString) + stdoutReader, stdoutWriter, _ := os.Pipe() + stderrReader, stderrWriter, _ := os.Pipe() config := &logging.Config{ - Stdout: stdoutBuffer, - Stderr: stderrBuffer, + Stdout: stdoutReader, + Stderr: stderrReader, } - // Execute the logging process adapter ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { - exitChan := make(chan containerd.ExitStatus, 1) + go func() { + stdoutWriter.Write([]byte(normalString)) + }() + go func() { + stderrWriter.Write([]byte(hugeString)) + }() + + go func() { time.Sleep(50 * time.Millisecond) - exitChan <- containerd.ExitStatus{} - return exitChan, nil - } + stdoutWriter.Close() + stderrWriter.Close() + }() - err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config) + err := loggingProcessAdapter(ctx, driver, "testDataStore", config) if err != nil { t.Fatal(err) } - // let bufio read the buffer - time.Sleep(50 * time.Millisecond) - // Verify that the driver methods were called if !driver.processed { t.Fatal("process should be processed") } - // Verify that the driver received the expected data stdout := strings.Join(driver.receivedStdout, "\n") stderr := strings.Join(driver.receivedStderr, "\n") diff --git a/pkg/logging/logging_windows.go b/pkg/logging/logging_windows.go new file mode 100644 index 00000000000..e2c7e9953f2 --- /dev/null +++ b/pkg/logging/logging_windows.go @@ -0,0 +1,24 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package logging + +import "io" + +// TODO: support windows +func waitIOClose(reader io.Reader) (chan struct{}, error) { + return nil, nil +}