Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ func newConnection(
cancelFuncs: make(map[uint64]func()),
}

const hardMaxThreads = 100000

if c.cfg.MaxThreads > hardMaxThreads {
if errorLogger != nil {
errorLogger.Printf("fuse: max_threads %d is beyond the hard limit of %d, ignoring configuration", c.cfg.MaxThreads, hardMaxThreads)
}
c.cfg.MaxThreads = 0
}

// Initialize.
if err := c.Init(); err != nil {
c.close()
Expand Down Expand Up @@ -618,6 +627,12 @@ func (c *Connection) callbackForOp(op interface{}) func() {
return nil
}

// MaxThreads returns the maximum number of concurrent worker goroutines
// configured for this connection.
func (c *Connection) MaxThreads() uint32 {
return c.cfg.MaxThreads
}

// Close the connection. Must not be called until operations that were read
// from the connection have been responded to.
func (c *Connection) close() error {
Expand Down
18 changes: 17 additions & 1 deletion fuseutil/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ func (s *fileSystemServer) ServeOps(c *fuse.Connection) {
s.fs.Destroy()
}()

maxThreads := c.MaxThreads()
var sem chan struct{}
if maxThreads > 0 {
sem = make(chan struct{}, maxThreads)
}

for {
ctx, op, err := c.ReadOp()
if err == io.EOF {
Expand All @@ -122,7 +128,17 @@ func (s *fileSystemServer) ServeOps(c *fuse.Connection) {
// cheap for the file system to handle
s.handleOp(c, ctx, op)
} else {
go s.handleOp(c, ctx, op)
if sem != nil {
sem <- struct{}{}
go func(ctx context.Context, op interface{}) {
defer func() {
<-sem
}()
s.handleOp(c, ctx, op)
}(ctx, op)
} else {
go s.handleOp(c, ctx, op)
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions mount_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ type MountConfig struct {
// to always provide ReadFileOp.Dst. If the file system populates ReadFileOp.Data,
// that data will be used for a vectored read, irrespective of this flag's value.
UseVectoredRead bool

// The maximum number of concurrent worker goroutines that can handle FUSE
// requests. If not set (or 0), defaults to unlimited (no capping).
MaxThreads uint32
}

type FUSEImpl uint8
Expand Down
131 changes: 131 additions & 0 deletions mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path"
"strings"
"testing"
"time"

"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/fuseops"
Expand Down Expand Up @@ -93,3 +94,133 @@ func TestNonexistentMountPoint(t *testing.T) {
t.Errorf("Unexpected error: %v", got)
}
}

////////////////////////////////////////////////////////////////////////
// blockingFS
////////////////////////////////////////////////////////////////////////

type blockingFS struct {
fuseutil.NotImplementedFileSystem
enteredCh chan string
releaseCh chan struct{}
}

func (fs *blockingFS) StatFS(
ctx context.Context,
op *fuseops.StatFSOp) error {
return nil
}

func (fs *blockingFS) GetInodeAttributes(
ctx context.Context,
op *fuseops.GetInodeAttributesOp) error {
if op.Inode == fuseops.RootInodeID {
op.Attributes = fuseops.InodeAttributes{
Mode: 0777 | os.ModeDir,
}
return nil
}
return fuse.ENOENT
}

func (fs *blockingFS) LookUpInode(
ctx context.Context,
op *fuseops.LookUpInodeOp) error {
fs.enteredCh <- op.Name

// Block until released.
<-fs.releaseCh

if op.Name == "foo" || op.Name == "bar" || op.Name == "baz" {
op.Entry = fuseops.ChildInodeEntry{
Child: 100, // Canned ID
Attributes: fuseops.InodeAttributes{
Mode: 0444,
},
}
return nil
}

return fuse.ENOENT
}

func TestMaxThreads(t *testing.T) {
ctx := context.Background()

// Set up a temporary directory.
dir, err := os.MkdirTemp("", "mount_test")
if err != nil {
t.Fatalf("os.MkdirTemp: %v", err)
}
defer os.RemoveAll(dir)

// Mount with MaxThreads = 2.
fs := &blockingFS{
enteredCh: make(chan string, 3),
releaseCh: make(chan struct{}),
}
mfs, err := fuse.Mount(
dir,
fuseutil.NewFileSystemServer(fs),
&fuse.MountConfig{
MaxThreads: 2,
EnableParallelDirOps: true,
})
if err != nil {
t.Fatalf("fuse.Mount: %v", err)
}
defer func() {
if err := mfs.Join(ctx); err != nil {
t.Errorf("Joining: %v", err)
}
}()
defer fuse.Unmount(mfs.Dir())

// Start 3 goroutines, each doing a path lookup.
errChan := make(chan error, 3)

go func() {
_, err := os.Stat(path.Join(dir, "foo"))
errChan <- err
}()
go func() {
_, err := os.Stat(path.Join(dir, "bar"))
errChan <- err
}()
go func() {
_, err := os.Stat(path.Join(dir, "baz"))
errChan <- err
}()

// Wait a bit to let all 3 requests be sent by the OS to FUSE.
// Since we set MaxThreads to 2, the third request should be blocked in the server
// and never reach LookUpInode.
time.Sleep(100 * time.Millisecond)

if got := len(fs.enteredCh); got != 2 {
t.Errorf("enteredOps was %d, expected 2", got)
}

// Release one operation.
fs.releaseCh <- struct{}{}

// Give the 3rd operation time to enter.
time.Sleep(100 * time.Millisecond)

if got := len(fs.enteredCh); got != 3 {
t.Errorf("enteredOps after one release was %d, expected 3", got)
}

// Release the other two.
fs.releaseCh <- struct{}{}
fs.releaseCh <- struct{}{}

// Wait for all 3 goroutines to finish.
for i := 0; i < 3; i++ {
// Note: os.Stat might fail with ENOENT but should not return other errors.
err := <-errChan
if err != nil && !os.IsNotExist(err) {
t.Errorf("os.Stat failed: %v", err)
}
}
}
Loading