-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
feat(prefork): add support for process monitoring and callbacks in prefork mode #4037
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello @ReneWerner87, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly refactors Fiber's prefork implementation by integrating the Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request refactors the prefork implementation to leverage fasthttp's native capabilities, introducing OnPreforkServe for custom listeners and PreforkRecoverThreshold for process monitoring. While these are valuable additions, a critical supply chain vulnerability has been identified: the go.mod file replaces a core dependency with a personal fork using a suspicious, future-dated pseudo-version. This practice should be remediated by using official, verified dependencies. Furthermore, there is a critical issue with a missing type definition that will prevent compilation, and an opportunity to refactor duplicated code to enhance maintainability.
| Network: cfg.ListenerNetwork, | ||
| Reuseport: true, // Fiber uses reuseport by default | ||
| RecoverThreshold: recoverThreshold, | ||
| Logger: preforkLogger{}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The type preforkLogger is used here, but it doesn't appear to be defined in the project. This will cause a compilation error.
If the intention is to integrate with Fiber's logger, an adapter is needed to implement fasthttp/prefork's Logger interface. For example:
type preforkLogger struct{}
// Printf implements the fasthttp/prefork.Logger interface.
func (pl preforkLogger) Printf(format string, args ...any) {
log.Infof(format, args...)
}Please add the necessary definition for preforkLogger.
go.mod
Outdated
|
|
||
| go 1.25.0 | ||
|
|
||
| replace github.com/valyala/fasthttp => github.com/ReneWerner87/fasthttp v1.62.1-0.20260128075009-77f4c9b092d0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The replace directive substitutes the official github.com/valyala/fasthttp module with a personal fork (github.com/ReneWerner87/fasthttp) using a suspicious pseudo-version (v1.62.1-0.20260128075009-77f4c9b092d0). The timestamp in this pseudo-version (2026-01-28) appears to be in the future, which is highly irregular and can be used to force the Go toolchain to prefer this version over others. Replacing core dependencies with unverified personal forks introduces a significant supply chain risk, as the fork could contain malicious code or lack critical security updates present in the official version. It is recommended to use official releases or, if a patch is required, to use a properly vetted and tagged version from a trusted repository.
| func (app *App) prefork(addr string, tlsConfig *tls.Config, cfg *ListenConfig) error { | ||
| if cfg == nil { | ||
| cfg = &ListenConfig{} | ||
| } | ||
| var ln net.Listener | ||
| var err error | ||
|
|
||
| // 👶 child process 👶 | ||
| if IsChild() { | ||
| // use 1 cpu core per child process | ||
| runtime.GOMAXPROCS(1) | ||
| // Linux will use SO_REUSEPORT and Windows falls back to SO_REUSEADDR | ||
| // Only tcp4 or tcp6 is supported when preforking, both are not supported | ||
| if ln, err = reuseport.Listen(cfg.ListenerNetwork, addr); err != nil { | ||
| // Determine RecoverThreshold | ||
| recoverThreshold := cfg.PreforkRecoverThreshold | ||
| if recoverThreshold == 0 { | ||
| recoverThreshold = runtime.GOMAXPROCS(0) / 2 | ||
| } | ||
|
|
||
| // Create FastHTTP Prefork instance | ||
| p := &prefork.Prefork{ | ||
| Network: cfg.ListenerNetwork, | ||
| Reuseport: true, // Fiber uses reuseport by default | ||
| RecoverThreshold: recoverThreshold, | ||
| Logger: preforkLogger{}, | ||
| WatchMaster: true, // Enable master process watching | ||
| } | ||
|
|
||
| // Configure ServeFunc for child processes | ||
| p.ServeFunc = func(ln net.Listener) error { | ||
| // Child process setup | ||
| if prefork.IsChild() { | ||
| // Wrap listener with TLS if configured | ||
| if tlsConfig != nil { | ||
| ln = tls.NewListener(ln, tlsConfig) | ||
| } | ||
|
|
||
| // Avoid startup message collision | ||
| if !cfg.DisableStartupMessage { | ||
| time.Sleep(sleepDuration) // avoid colliding with startup message | ||
| time.Sleep(sleepDuration) | ||
| } | ||
|
|
||
| // Prepare the server for the start | ||
| app.startupProcess() | ||
|
|
||
| // Call ListenerAddrFunc if provided | ||
| if cfg.ListenerAddrFunc != nil { | ||
| cfg.ListenerAddrFunc(ln.Addr()) | ||
| } | ||
| return fmt.Errorf("prefork: %w", err) | ||
| } | ||
| // wrap a tls config around the listener if provided | ||
| if tlsConfig != nil { | ||
| ln = tls.NewListener(ln, tlsConfig) | ||
|
|
||
| // Serve requests | ||
| return app.server.Serve(ln) | ||
| } | ||
|
|
||
| // Configure OnChildSpawn callback | ||
| p.OnChildSpawn = func(pid int) error { | ||
| if app.hooks != nil { | ||
| if testOnPrefork { | ||
| app.hooks.executeOnForkHooks(dummyPid) | ||
| } else { | ||
| app.hooks.executeOnForkHooks(pid) | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Configure OnMasterReady callback | ||
| p.OnMasterReady = func(childPIDs []int) error { | ||
| // Prepare listen data with child PIDs | ||
| listenData := app.prepareListenData(addr, tlsConfig != nil, cfg, childPIDs) | ||
|
|
||
| // kill current child proc when master exits | ||
| go watchMaster() | ||
| // Run OnListen hooks | ||
| app.runOnListenHooks(listenData) | ||
|
|
||
| // prepare the server for the start | ||
| app.startupProcess() | ||
| // Display startup message | ||
| app.startupMessage(listenData, cfg) | ||
|
|
||
| if cfg.ListenerAddrFunc != nil { | ||
| cfg.ListenerAddrFunc(ln.Addr()) | ||
| // Print routes if enabled | ||
| if cfg.EnablePrintRoutes { | ||
| app.printRoutesMessage() | ||
| } | ||
|
|
||
| // listen for incoming connections | ||
| return app.server.Serve(ln) | ||
| return nil | ||
| } | ||
|
|
||
| // Configure OnChildRecover callback for monitoring | ||
| p.OnChildRecover = func(pid int) error { | ||
| log.Warnf("prefork: child process crashed and has been recovered with new PID %d", pid) | ||
|
|
||
| // Execute OnFork hook for recovered process if hooks are available | ||
| if app.hooks != nil { | ||
| app.hooks.executeOnForkHooks(pid) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Start the prefork server | ||
| return p.ListenAndServe(addr) | ||
| } | ||
|
|
||
| // preforkListener manages child processes for prefork mode with a custom listener. | ||
| // This allows using prefork with app.Listener() when the user provides an OnPreforkServe callback. | ||
| func (app *App) preforkListener(ln net.Listener, cfg *ListenConfig) error { | ||
| if cfg == nil { | ||
| cfg = &ListenConfig{} | ||
| } | ||
|
|
||
| // 👮 master process 👮 | ||
| type child struct { | ||
| err error | ||
| pid int | ||
| // Get the address from the provided listener | ||
| addr := ln.Addr() | ||
|
|
||
| // Determine RecoverThreshold | ||
| recoverThreshold := cfg.PreforkRecoverThreshold | ||
| if recoverThreshold == 0 { | ||
| recoverThreshold = runtime.GOMAXPROCS(0) / 2 | ||
| } | ||
| // create variables | ||
| maxProcs := runtime.GOMAXPROCS(0) | ||
| children := make(map[int]*exec.Cmd) | ||
| channel := make(chan child, maxProcs) | ||
|
|
||
| // kill child procs when master exits | ||
| defer func() { | ||
| for _, proc := range children { | ||
| if err = proc.Process.Kill(); err != nil { | ||
| if !errors.Is(err, os.ErrProcessDone) { | ||
| log.Errorf("prefork: failed to kill child: %v", err) | ||
| } | ||
| // Extract TLS config from listener if present | ||
| tlsConfig := getTLSConfig(ln) | ||
|
|
||
| // Create FastHTTP Prefork instance | ||
| p := &prefork.Prefork{ | ||
| Network: cfg.ListenerNetwork, | ||
| Reuseport: true, // Custom listener is expected to use reuseport | ||
| RecoverThreshold: recoverThreshold, | ||
| Logger: preforkLogger{}, | ||
| WatchMaster: true, // Enable master process watching | ||
| } | ||
|
|
||
| // Configure ServeFunc for child processes | ||
| p.ServeFunc = func(_ net.Listener) error { | ||
| // Child process: create new listener using user's callback | ||
| if prefork.IsChild() { | ||
| childLn, err := cfg.OnPreforkServe(addr) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| // collect child pids | ||
| var childPIDs []int | ||
|
|
||
| // launch child procs | ||
| for range maxProcs { | ||
| cmd := exec.Command(os.Args[0], os.Args[1:]...) //nolint:gosec // It's fine to launch the same process again | ||
| if testPreforkMaster { | ||
| // When test prefork master, | ||
| // just start the child process with a dummy cmd, | ||
| // which will exit soon | ||
| cmd = dummyCmd() | ||
| } | ||
| cmd.Stdout = os.Stdout | ||
| cmd.Stderr = os.Stderr | ||
|
|
||
| // add fiber prefork child flag into child proc env | ||
| cmd.Env = append(os.Environ(), | ||
| fmt.Sprintf("%s=%s", envPreforkChildKey, envPreforkChildVal), | ||
| ) | ||
| // Wrap with TLS if original listener had TLS | ||
| if tlsConfig != nil { | ||
| childLn = tls.NewListener(childLn, tlsConfig) | ||
| } | ||
|
|
||
| // Avoid startup message collision | ||
| if !cfg.DisableStartupMessage { | ||
| time.Sleep(sleepDuration) | ||
| } | ||
|
|
||
| if err = cmd.Start(); err != nil { | ||
| return fmt.Errorf("failed to start a child prefork process, error: %w", err) | ||
| // Prepare the server for the start | ||
| app.startupProcess() | ||
|
|
||
| // Call ListenerAddrFunc if provided | ||
| if cfg.ListenerAddrFunc != nil { | ||
| cfg.ListenerAddrFunc(childLn.Addr()) | ||
| } | ||
|
|
||
| // Serve requests using the child's listener | ||
| return app.server.Serve(childLn) | ||
| } | ||
|
|
||
| // store child process | ||
| pid := cmd.Process.Pid | ||
| children[pid] = cmd | ||
| childPIDs = append(childPIDs, pid) | ||
| // Master process should not reach here in normal operation | ||
| return nil | ||
| } | ||
|
|
||
| // execute fork hook | ||
| // Configure OnChildSpawn callback | ||
| p.OnChildSpawn = func(pid int) error { | ||
| if app.hooks != nil { | ||
| if testOnPrefork { | ||
| app.hooks.executeOnForkHooks(dummyPid) | ||
| } else { | ||
| app.hooks.executeOnForkHooks(pid) | ||
| } | ||
| } | ||
|
|
||
| // notify master if child crashes | ||
| go func() { | ||
| channel <- child{pid: pid, err: cmd.Wait()} | ||
| }() | ||
| return nil | ||
| } | ||
|
|
||
| // Run onListen hooks | ||
| // Hooks have to be run here as different as non-prefork mode due to they should run as child or master | ||
| listenData := app.prepareListenData(addr, tlsConfig != nil, cfg, childPIDs) | ||
| // Configure OnMasterReady callback | ||
| p.OnMasterReady = func(childPIDs []int) error { | ||
| // Prepare listen data with child PIDs | ||
| listenData := app.prepareListenData(addr.String(), tlsConfig != nil, cfg, childPIDs) | ||
|
|
||
| // Run OnListen hooks | ||
| app.runOnListenHooks(listenData) | ||
|
|
||
| app.runOnListenHooks(listenData) | ||
| // Display startup message | ||
| app.startupMessage(listenData, cfg) | ||
|
|
||
| app.startupMessage(listenData, cfg) | ||
| // Print routes if enabled | ||
| if cfg.EnablePrintRoutes { | ||
| app.printRoutesMessage() | ||
| } | ||
|
|
||
| if cfg.EnablePrintRoutes { | ||
| app.printRoutesMessage() | ||
| return nil | ||
| } | ||
|
|
||
| // return error if child crashes | ||
| return (<-channel).err | ||
| } | ||
| // Configure OnChildRecover callback for monitoring | ||
| p.OnChildRecover = func(pid int) error { | ||
| log.Warnf("prefork: child process crashed and has been recovered with new PID %d", pid) | ||
|
|
||
| // watchMaster watches child procs | ||
| func watchMaster() { | ||
| if runtime.GOOS == windowsOS { | ||
| // finds parent process, | ||
| // and waits for it to exit | ||
| p, err := os.FindProcess(os.Getppid()) | ||
| if err == nil { | ||
| _, _ = p.Wait() //nolint:errcheck // It is fine to ignore the error here | ||
| // Execute OnFork hook for recovered process if hooks are available | ||
| if app.hooks != nil { | ||
| app.hooks.executeOnForkHooks(pid) | ||
| } | ||
| os.Exit(1) //nolint:revive // Calling os.Exit is fine here in the prefork | ||
| } | ||
| // if it is equal to 1 (init process ID), | ||
| // it indicates that the master process has exited | ||
| const watchInterval = 500 * time.Millisecond | ||
| for range time.NewTicker(watchInterval).C { | ||
| if os.Getppid() == 1 { | ||
| os.Exit(1) //nolint:revive // Calling os.Exit is fine here in the prefork | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Close the original listener in master process since children will create their own | ||
| if !prefork.IsChild() { | ||
| if err := ln.Close(); err != nil { | ||
| log.Warnf("prefork: failed to close original listener: %v", err) | ||
| } | ||
| } | ||
|
|
||
| // Start the prefork server using the address from the original listener | ||
| return p.ListenAndServe(addr.String()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The functions prefork and preforkListener contain a significant amount of duplicated logic for initializing the prefork.Prefork instance. The calculation of recoverThreshold, the struct initialization, and the implementations for OnChildSpawn, OnMasterReady, and OnChildRecover callbacks are nearly identical in both functions.
To improve maintainability and reduce redundancy, I recommend refactoring this common setup into a shared helper function. This would centralize the prefork configuration, leaving prefork and preforkListener to only define their specific ServeFunc logic.
Modified fasthttp version
ReneWerner87/fasthttp@master...ReneWerner87:fasthttp:prefork_optimization
PR to fasthttp is coming