diff --git a/connection.go b/connection.go index ba65e67b..c8f93f82 100644 --- a/connection.go +++ b/connection.go @@ -121,6 +121,15 @@ func newConnection( cancelFuncs: make(map[uint64]func()), } + const hardMaxThreads = 100000 + + if c.cfg.MaxThreads > hardMaxThreads { + if errorLogger != nil { + errorLogger.Printf("fuse: max_threads %d is beyond the hard limit of %d, ignoring configuration", c.cfg.MaxThreads, hardMaxThreads) + } + c.cfg.MaxThreads = 0 + } + // Initialize. if err := c.Init(); err != nil { c.close() @@ -618,6 +627,12 @@ func (c *Connection) callbackForOp(op interface{}) func() { return nil } +// MaxThreads returns the maximum number of concurrent worker goroutines +// configured for this connection. +func (c *Connection) MaxThreads() uint32 { + return c.cfg.MaxThreads +} + // Close the connection. Must not be called until operations that were read // from the connection have been responded to. func (c *Connection) close() error { diff --git a/fuseutil/file_system.go b/fuseutil/file_system.go index b1c27f56..9172528c 100644 --- a/fuseutil/file_system.go +++ b/fuseutil/file_system.go @@ -104,6 +104,12 @@ func (s *fileSystemServer) ServeOps(c *fuse.Connection) { s.fs.Destroy() }() + maxThreads := c.MaxThreads() + var sem chan struct{} + if maxThreads > 0 { + sem = make(chan struct{}, maxThreads) + } + for { ctx, op, err := c.ReadOp() if err == io.EOF { @@ -122,7 +128,17 @@ func (s *fileSystemServer) ServeOps(c *fuse.Connection) { // cheap for the file system to handle s.handleOp(c, ctx, op) } else { - go s.handleOp(c, ctx, op) + if sem != nil { + sem <- struct{}{} + go func(ctx context.Context, op interface{}) { + defer func() { + <-sem + }() + s.handleOp(c, ctx, op) + }(ctx, op) + } else { + go s.handleOp(c, ctx, op) + } } } } diff --git a/mount_config.go b/mount_config.go index f95895ad..95a91f4a 100644 --- a/mount_config.go +++ b/mount_config.go @@ -241,6 +241,10 @@ type MountConfig struct { // to always provide ReadFileOp.Dst. If the file system populates ReadFileOp.Data, // that data will be used for a vectored read, irrespective of this flag's value. UseVectoredRead bool + + // The maximum number of concurrent worker goroutines that can handle FUSE + // requests. If not set (or 0), defaults to unlimited (no capping). + MaxThreads uint32 } type FUSEImpl uint8 diff --git a/mount_test.go b/mount_test.go index 87cc36a2..cb5563dd 100644 --- a/mount_test.go +++ b/mount_test.go @@ -7,6 +7,7 @@ import ( "path" "strings" "testing" + "time" "github.com/jacobsa/fuse" "github.com/jacobsa/fuse/fuseops" @@ -93,3 +94,133 @@ func TestNonexistentMountPoint(t *testing.T) { t.Errorf("Unexpected error: %v", got) } } + +//////////////////////////////////////////////////////////////////////// +// blockingFS +//////////////////////////////////////////////////////////////////////// + +type blockingFS struct { + fuseutil.NotImplementedFileSystem + enteredCh chan string + releaseCh chan struct{} +} + +func (fs *blockingFS) StatFS( + ctx context.Context, + op *fuseops.StatFSOp) error { + return nil +} + +func (fs *blockingFS) GetInodeAttributes( + ctx context.Context, + op *fuseops.GetInodeAttributesOp) error { + if op.Inode == fuseops.RootInodeID { + op.Attributes = fuseops.InodeAttributes{ + Mode: 0777 | os.ModeDir, + } + return nil + } + return fuse.ENOENT +} + +func (fs *blockingFS) LookUpInode( + ctx context.Context, + op *fuseops.LookUpInodeOp) error { + fs.enteredCh <- op.Name + + // Block until released. + <-fs.releaseCh + + if op.Name == "foo" || op.Name == "bar" || op.Name == "baz" { + op.Entry = fuseops.ChildInodeEntry{ + Child: 100, // Canned ID + Attributes: fuseops.InodeAttributes{ + Mode: 0444, + }, + } + return nil + } + + return fuse.ENOENT +} + +func TestMaxThreads(t *testing.T) { + ctx := context.Background() + + // Set up a temporary directory. + dir, err := os.MkdirTemp("", "mount_test") + if err != nil { + t.Fatalf("os.MkdirTemp: %v", err) + } + defer os.RemoveAll(dir) + + // Mount with MaxThreads = 2. + fs := &blockingFS{ + enteredCh: make(chan string, 3), + releaseCh: make(chan struct{}), + } + mfs, err := fuse.Mount( + dir, + fuseutil.NewFileSystemServer(fs), + &fuse.MountConfig{ + MaxThreads: 2, + EnableParallelDirOps: true, + }) + if err != nil { + t.Fatalf("fuse.Mount: %v", err) + } + defer func() { + if err := mfs.Join(ctx); err != nil { + t.Errorf("Joining: %v", err) + } + }() + defer fuse.Unmount(mfs.Dir()) + + // Start 3 goroutines, each doing a path lookup. + errChan := make(chan error, 3) + + go func() { + _, err := os.Stat(path.Join(dir, "foo")) + errChan <- err + }() + go func() { + _, err := os.Stat(path.Join(dir, "bar")) + errChan <- err + }() + go func() { + _, err := os.Stat(path.Join(dir, "baz")) + errChan <- err + }() + + // Wait a bit to let all 3 requests be sent by the OS to FUSE. + // Since we set MaxThreads to 2, the third request should be blocked in the server + // and never reach LookUpInode. + time.Sleep(100 * time.Millisecond) + + if got := len(fs.enteredCh); got != 2 { + t.Errorf("enteredOps was %d, expected 2", got) + } + + // Release one operation. + fs.releaseCh <- struct{}{} + + // Give the 3rd operation time to enter. + time.Sleep(100 * time.Millisecond) + + if got := len(fs.enteredCh); got != 3 { + t.Errorf("enteredOps after one release was %d, expected 3", got) + } + + // Release the other two. + fs.releaseCh <- struct{}{} + fs.releaseCh <- struct{}{} + + // Wait for all 3 goroutines to finish. + for i := 0; i < 3; i++ { + // Note: os.Stat might fail with ENOENT but should not return other errors. + err := <-errChan + if err != nil && !os.IsNotExist(err) { + t.Errorf("os.Stat failed: %v", err) + } + } +}