diff --git a/cmd/web.go b/cmd/web.go index 4723ddbbdd296..6e39db2178da0 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -156,7 +156,6 @@ func serveInstall(cmd *cli.Command) error { case <-graceful.GetManager().IsShutdown(): <-graceful.GetManager().Done() log.Info("PID: %d Gitea Web Finished", os.Getpid()) - log.GetManager().Close() return err default: } @@ -231,7 +230,6 @@ func serveInstalled(c *cli.Command) error { err := listen(webRoutes, true) <-graceful.GetManager().Done() log.Info("PID: %d Gitea Web Finished", os.Getpid()) - log.GetManager().Close() return err } diff --git a/main.go b/main.go index 2c25bac4e3dd2..bc2121b1e7479 100644 --- a/main.go +++ b/main.go @@ -44,6 +44,7 @@ func main() { } app := cmd.NewMainApp(cmd.AppVersion{Version: Version, Extra: formatBuiltWith()}) _ = cmd.RunMainApp(app, os.Args...) // all errors should have been handled by the RunMainApp + // flush the queued logs before exiting, it is a MUST, otherwise there will be log loss log.GetManager().Close() } diff --git a/modules/graceful/server.go b/modules/graceful/server.go index 2525a83e77520..b440f68ab5570 100644 --- a/modules/graceful/server.go +++ b/modules/graceful/server.go @@ -11,7 +11,6 @@ import ( "os" "strings" "sync" - "sync/atomic" "syscall" "time" @@ -30,12 +29,15 @@ type ServeFunction = func(net.Listener) error // Server represents our graceful server type Server struct { - network string - address string - listener net.Listener - wg sync.WaitGroup - state state - lock *sync.RWMutex + network string + address string + listener net.Listener + + lock sync.RWMutex + state state + connCounter int64 + connEmptyCond *sync.Cond + BeforeBegin func(network, address string) OnShutdown func() PerWriteTimeout time.Duration @@ -50,14 +52,13 @@ func NewServer(network, address, name string) *Server { log.Info("Starting new %s server: %s:%s on PID: %d", name, network, address, os.Getpid()) } srv := &Server{ - wg: sync.WaitGroup{}, state: stateInit, - lock: &sync.RWMutex{}, network: network, address: address, PerWriteTimeout: setting.PerWriteTimeout, PerWritePerKbTimeout: setting.PerWritePerKbTimeout, } + srv.connEmptyCond = sync.NewCond(&srv.lock) srv.BeforeBegin = func(network, addr string) { log.Debug("Starting server on %s:%s (PID: %d)", network, addr, syscall.Getpid()) @@ -154,7 +155,7 @@ func (srv *Server) Serve(serve ServeFunction) error { GetManager().RegisterServer() err := serve(srv.listener) log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid()) - srv.wg.Wait() + srv.waitForActiveConnections() srv.setState(stateTerminate) GetManager().ServerDone() // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil @@ -178,16 +179,62 @@ func (srv *Server) setState(st state) { srv.state = st } +func (srv *Server) waitForActiveConnections() { + srv.lock.Lock() + for srv.connCounter > 0 { + srv.connEmptyCond.Wait() + } + srv.lock.Unlock() +} + +func (srv *Server) wrapConnection(c net.Conn) (net.Conn, error) { + srv.lock.Lock() + defer srv.lock.Unlock() + + if srv.state != stateRunning { + _ = c.Close() + return nil, syscall.EINVAL // same as AcceptTCP + } + + srv.connCounter++ + return &wrappedConn{Conn: c, server: srv}, nil +} + +func (srv *Server) removeConnection(_ *wrappedConn) { + srv.lock.Lock() + defer srv.lock.Unlock() + + srv.connCounter-- + if srv.connCounter <= 0 { + srv.connEmptyCond.Broadcast() + } +} + +// closeAllConnections forcefully closes all active connections +func (srv *Server) closeAllConnections() { + srv.lock.Lock() + if srv.connCounter > 0 { + log.Warn("After graceful shutdown period, %d connections are still active. Forcefully close.", srv.connCounter) + srv.connCounter = 0 // OS will close all the connections after the process exits, so we just assume there is no active connection now + } + srv.lock.Unlock() + srv.connEmptyCond.Broadcast() +} + type filer interface { File() (*os.File, error) } type wrappedListener struct { net.Listener - stopped bool - server *Server + server *Server } +var ( + _ net.Listener = (*wrappedListener)(nil) + _ filer = (*wrappedListener)(nil) +) + func newWrappedListener(l net.Listener, srv *Server) *wrappedListener { return &wrappedListener{ Listener: l, @@ -195,46 +242,24 @@ func newWrappedListener(l net.Listener, srv *Server) *wrappedListener { } } -func (wl *wrappedListener) Accept() (net.Conn, error) { - var c net.Conn - // Set keepalive on TCPListeners connections. +func (wl *wrappedListener) Accept() (c net.Conn, err error) { if tcl, ok := wl.Listener.(*net.TCPListener); ok { + // Set keepalive on TCPListeners connections if possible, see http.tcpKeepAliveListener tc, err := tcl.AcceptTCP() if err != nil { return nil, err } - _ = tc.SetKeepAlive(true) // see http.tcpKeepAliveListener - _ = tc.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener + _ = tc.SetKeepAlive(true) + _ = tc.SetKeepAlivePeriod(3 * time.Minute) c = tc } else { - var err error c, err = wl.Listener.Accept() if err != nil { return nil, err } } - closed := int32(0) - - c = &wrappedConn{ - Conn: c, - server: wl.server, - closed: &closed, - perWriteTimeout: wl.server.PerWriteTimeout, - perWritePerKbTimeout: wl.server.PerWritePerKbTimeout, - } - - wl.server.wg.Add(1) - return c, nil -} - -func (wl *wrappedListener) Close() error { - if wl.stopped { - return syscall.EINVAL - } - - wl.stopped = true - return wl.Listener.Close() + return wl.server.wrapConnection(c) } func (wl *wrappedListener) File() (*os.File, error) { @@ -244,17 +269,14 @@ func (wl *wrappedListener) File() (*os.File, error) { type wrappedConn struct { net.Conn - server *Server - closed *int32 - deadline time.Time - perWriteTimeout time.Duration - perWritePerKbTimeout time.Duration + server *Server + deadline time.Time } func (w *wrappedConn) Write(p []byte) (n int, err error) { - if w.perWriteTimeout > 0 { - minTimeout := time.Duration(len(p)/1024) * w.perWritePerKbTimeout - minDeadline := time.Now().Add(minTimeout).Add(w.perWriteTimeout) + if w.server.PerWriteTimeout > 0 { + minTimeout := time.Duration(len(p)/1024) * w.server.PerWritePerKbTimeout + minDeadline := time.Now().Add(minTimeout).Add(w.server.PerWriteTimeout) w.deadline = w.deadline.Add(minTimeout) if minDeadline.After(w.deadline) { @@ -266,19 +288,6 @@ func (w *wrappedConn) Write(p []byte) (n int, err error) { } func (w *wrappedConn) Close() error { - if atomic.CompareAndSwapInt32(w.closed, 0, 1) { - defer func() { - if err := recover(); err != nil { - select { - case <-GetManager().IsHammer(): - // Likely deadlocked request released at hammertime - log.Warn("Panic during connection close! %v. Likely there has been a deadlocked request which has been released by forced shutdown.", err) - default: - log.Error("Panic during connection close! %v", err) - } - } - }() - w.server.wg.Done() - } + w.server.removeConnection(w) return w.Conn.Close() } diff --git a/modules/graceful/server_hooks.go b/modules/graceful/server_hooks.go index 9b67589571c35..b800c32503e32 100644 --- a/modules/graceful/server_hooks.go +++ b/modules/graceful/server_hooks.go @@ -5,7 +5,6 @@ package graceful import ( "os" - "runtime" "code.gitea.io/gitea/modules/log" ) @@ -48,26 +47,8 @@ func (srv *Server) doShutdown() { } func (srv *Server) doHammer() { - defer func() { - // We call srv.wg.Done() until it panics. - // This happens if we call Done() when the WaitGroup counter is already at 0 - // So if it panics -> we're done, Serve() will return and the - // parent will goroutine will exit. - if r := recover(); r != nil { - log.Error("WaitGroup at 0: Error: %v", r) - } - }() if srv.getState() != stateShuttingDown { return } - log.Warn("Forcefully shutting down parent") - for { - if srv.getState() == stateTerminate { - break - } - srv.wg.Done() - - // Give other goroutines a chance to finish before we forcibly stop them. - runtime.Gosched() - } + srv.closeAllConnections() }