Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package fuse

import (
"context"
"errors"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -80,6 +81,8 @@ type Connection struct {
// Freelists, serviced by freelists.go.
inMessages freelist.Freelist // GUARDED_BY(mu)
outMessages freelist.Freelist // GUARDED_BY(mu)

inMessageSize int
}

// State that is maintained for each in-flight op. This is stuffed into the
Expand Down Expand Up @@ -121,6 +124,12 @@ func newConnection(
cancelFuncs: make(map[uint64]func()),
}

maxPayload := max(buffer.MaxReadSize, buffer.MaxWriteSize)
if cfg.MaxMessageSize > 0 {
maxPayload = max(maxPayload, int(cfg.MaxMessageSize))
}
c.inMessageSize = maxPayload + buffer.GetPageSize()

// Initialize.
if err := c.Init(); err != nil {
c.close()
Expand Down Expand Up @@ -172,7 +181,9 @@ func (c *Connection) Init() error {
// Respond to the init op.
initOp.Library = c.protocol
initOp.MaxReadahead = maxReadahead
initOp.MaxWrite = buffer.MaxWriteSize

maxPayload := c.inMessageSize - buffer.GetPageSize()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we anticipate different sizes for read and write. if not can we have just one variable which tells size of the request for both reads and writes.

initOp.MaxWrite = uint32(maxPayload)

initOp.Flags = 0

Expand All @@ -190,7 +201,6 @@ func (c *Connection) Init() error {
// payload. It applies to both requests and replies, and does not include
// the extra 1 page for the FUSE header and the "args" struct. We set it to
// the max of our message in/out payload sizes.
maxPayload := max(buffer.MaxReadSize, buffer.MaxWriteSize)
initOp.MaxPages = uint16(maxPayload / buffer.GetPageSize())

// Enable writeback caching if the user hasn't asked us not to.
Expand Down Expand Up @@ -376,6 +386,7 @@ func (c *Connection) handleInterrupt(fuseID uint64) {
func (c *Connection) readMessage() (*buffer.InMessage, error) {
// Allocate a message.
m := c.getInMessage()
m.AllocBlocks(c.inMessageSize)

// Loop past transient errors.
for {
Expand All @@ -389,15 +400,11 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) {
// * EINTR means we should try again. (This seems to happen often on
// OS X, cf. http://golang.org/issue/11180)
//
if pe, ok := err.(*os.PathError); ok {
switch pe.Err {
case syscall.ENODEV:
err = io.EOF

case syscall.EINTR:
err = nil
continue
}
if errors.Is(err, syscall.ENODEV) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are removing a typecasting here? Are these changes intentiontal? If yes, how did they work earlier?

err = io.EOF
} else if errors.Is(err, syscall.EINTR) {
err = nil
continue
}

if err != nil {
Expand Down
52 changes: 37 additions & 15 deletions conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,14 @@ func convertInMessage(
}

entries := make([]fuseops.BatchForgetEntry, 0, in.Count)
for i := uint32(0); i < in.Count; i++ {
type entry fusekernel.BatchForgetEntryIn
ein := (*entry)(inMsg.Consume(unsafe.Sizeof(entry{})))
if ein == nil {
return nil, errors.New("Corrupt OpBatchForget")
}
entrySize := unsafe.Sizeof(fusekernel.BatchForgetEntryIn{})
buf := inMsg.ConsumeBytes(uintptr(in.Count) * entrySize)
if len(buf) < int(in.Count*uint32(entrySize)) {
return nil, errors.New("Corrupt OpBatchForget")
}

for i := uint32(0); i < in.Count; i++ {
ein := (*fusekernel.BatchForgetEntryIn)(unsafe.Pointer(&buf[uintptr(i)*entrySize]))
entries = append(entries, fuseops.BatchForgetEntry{
Inode: fuseops.InodeID(ein.Inode),
N: ein.Nlookup,
Expand Down Expand Up @@ -396,7 +397,11 @@ func convertInMessage(
},
}
// Use part of the incoming message storage as the read buffer.
to.Dst = inMsg.GetFree(int(in.Size))
if config.EnableVectoredReads && int(in.Size) > buffer.MiBPlusPageSize {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Brainstorming a bit here. why do we need to support both vectoredReads and non vectoredReads. Can we just pass 2-D array always. It would be a minor change on the GCSFuse side. How big of a change will it be on GCSFuse side? I am guessing we can just pick the first block from the array and pass it downstream when messageSize is 1MB?

to.DstBufs = inMsg.GetFreeVector(int(in.Size))
} else {
to.Dst = inMsg.GetFree(int(in.Size))
}
o = to

case fusekernel.OpReaddir:
Expand Down Expand Up @@ -498,16 +503,31 @@ func convertInMessage(
return nil, errors.New("Corrupt OpWrite")
}

buf := inMsg.ConsumeBytes(inMsg.Len())
if len(buf) < int(in.Size) {
return nil, errors.New("Corrupt OpWrite")
var buf []byte
var dataBlocks [][]byte

if config.EnableVectoredWrites && inMsg.Len() > uintptr(buffer.MiBPlusPageSize) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by moving everything to vectoredReads/writes we need not do if-else every where. the code becomes much simpler.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reduces the number of configs too.

dataBlocks = inMsg.ConsumeVector(inMsg.Len())
var totalLen int
for _, b := range dataBlocks {
totalLen += len(b)
}
if totalLen < int(in.Size) {
return nil, errors.New("Corrupt OpWrite")
}
} else {
buf = inMsg.ConsumeBytes(inMsg.Len())
if len(buf) < int(in.Size) {
return nil, errors.New("Corrupt OpWrite")
}
}

o = &fuseops.WriteFileOp{
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
Handle: fuseops.HandleID(in.Fh),
Data: buf,
Offset: int64(in.Offset),
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
Handle: fuseops.HandleID(in.Fh),
Data: buf,
DataBlocks: dataBlocks,
Offset: int64(in.Offset),
OpContext: fuseops.OpContext{
FuseID: inMsg.Header().Unique,
Pid: inMsg.Header().Pid,
Expand Down Expand Up @@ -933,14 +953,16 @@ func (c *Connection) kernelResponseForOp(
case *fuseops.ReadFileOp:
if o.Data != nil {
m.Append(o.Data...)
} else if o.DstBufs != nil {
m.Append(o.DstBufs...)
} else {
m.Append(o.Dst)
}
m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead)

case *fuseops.WriteFileOp:
out := (*fusekernel.WriteOut)(m.Grow(int(unsafe.Sizeof(fusekernel.WriteOut{}))))
out.Size = uint32(len(o.Data))
out.Size = uint32(o.TotalSize())

case *fuseops.SyncFileOp:
// Empty response
Expand Down
2 changes: 1 addition & 1 deletion debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func describeRequest(op interface{}) (s string) {
case *fuseops.WriteFileOp:
addComponent("handle %d", typed.Handle)
addComponent("offset %d", typed.Offset)
addComponent("%d bytes", len(typed.Data))
addComponent("%d bytes", typed.TotalSize())

case *fuseops.RemoveXattrOp:
addComponent("name %s", typed.Name)
Expand Down
3 changes: 2 additions & 1 deletion freelists.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ func (c *Connection) getInMessage() *buffer.InMessage {
c.mu.Unlock()

if x == nil {
x = buffer.NewInMessage()
x = buffer.NewInMessage(c.inMessageSize)
}

return x
}

// LOCKS_EXCLUDED(c.mu)
func (c *Connection) putInMessage(x *buffer.InMessage) {
x.FreeBlocks()
c.mu.Lock()
c.inMessages.Put(unsafe.Pointer(x))
c.mu.Unlock()
Expand Down
30 changes: 28 additions & 2 deletions fuseops/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,11 +711,24 @@ type ReadFileOp struct {

// The destination buffer, whose length gives the size of the read.
// The file system can write to this buffer for non-vectored reads.
// Note: Dst will be nil if EnableVectoredReads is enabled and the read size
// is larger than the pre-allocated buffer size. In this case, the file system
// must use DstBufs (if populated) or allocate its own buffers and return them
// via the Data field.
Dst []byte

// Set by the file system:
// The destination buffers for vectored reads, whose total length gives the
// size of the read.
// The file system can write directly to these buffers for vectored reads.
// Note: DstBufs will be nil if EnableVectoredReads is disabled, or if the
// read size is smaller than the pre-allocated incoming message block (in
// which case Dst will be populated instead).
DstBufs [][]byte

// Set by the file system (optional):
// A list of slices of data to send back to the client.
// If this field is populated, the contents of `Dst` will be ignored.
// If this field is populated, the contents of `Dst` and `DstBufs` will be ignored.
// If both `Dst` and `DstBufs` are nil, this field MUST be populated to return any data.
Data [][]byte

// Set by the file system: the number of bytes read.
Expand Down Expand Up @@ -796,6 +809,7 @@ type WriteFileOp struct {
// to be because it uses file mmapping machinery
// (https://tinyurl.com/avxy3dvm) to write a page at a time.
Data []byte
DataBlocks [][]byte
OpContext OpContext

// If set, this function will be invoked after the operation response has been
Expand All @@ -804,6 +818,18 @@ type WriteFileOp struct {
Callback func()
}

// TotalSize returns the total size of the write payload.
func (o *WriteFileOp) TotalSize() int {
dataLen := len(o.Data)
if dataLen == 0 && len(o.DataBlocks) > 0 {
for _, b := range o.DataBlocks {
dataLen += len(b)
}
}
return dataLen
}


// Synchronize the current contents of an open file to storage.
//
// vfs.txt documents this as being called for by the fsync(2) system call
Expand Down
Loading