diff options
Diffstat (limited to 'src/internal/fuzz/worker.go')
-rw-r--r-- | src/internal/fuzz/worker.go | 1184 |
1 files changed, 1184 insertions, 0 deletions
diff --git a/src/internal/fuzz/worker.go b/src/internal/fuzz/worker.go new file mode 100644 index 0000000..467c39b --- /dev/null +++ b/src/internal/fuzz/worker.go @@ -0,0 +1,1184 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package fuzz + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "reflect" + "runtime" + "sync" + "time" +) + +const ( + // workerFuzzDuration is the amount of time a worker can spend testing random + // variations of an input given by the coordinator. + workerFuzzDuration = 100 * time.Millisecond + + // workerTimeoutDuration is the amount of time a worker can go without + // responding to the coordinator before being stopped. + workerTimeoutDuration = 1 * time.Second + + // workerExitCode is used as an exit code by fuzz worker processes after an internal error. + // This distinguishes internal errors from uncontrolled panics and other crashes. + // Keep in sync with internal/fuzz.workerExitCode. + workerExitCode = 70 + + // workerSharedMemSize is the maximum size of the shared memory file used to + // communicate with workers. This limits the size of fuzz inputs. + workerSharedMemSize = 100 << 20 // 100 MB +) + +// worker manages a worker process running a test binary. The worker object +// exists only in the coordinator (the process started by 'go test -fuzz'). +// workerClient is used by the coordinator to send RPCs to the worker process, +// which handles them with workerServer. +type worker struct { + dir string // working directory, same as package directory + binPath string // path to test executable + args []string // arguments for test executable + env []string // environment for test executable + + coordinator *coordinator + + memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes. + + cmd *exec.Cmd // current worker process + client *workerClient // used to communicate with worker process + waitErr error // last error returned by wait, set before termC is closed. + interrupted bool // true after stop interrupts a running worker. + termC chan struct{} // closed by wait when worker process terminates +} + +func newWorker(c *coordinator, dir, binPath string, args, env []string) (*worker, error) { + mem, err := sharedMemTempFile(workerSharedMemSize) + if err != nil { + return nil, err + } + memMu := make(chan *sharedMem, 1) + memMu <- mem + return &worker{ + dir: dir, + binPath: binPath, + args: args, + env: env[:len(env):len(env)], // copy on append to ensure workers don't overwrite each other. + coordinator: c, + memMu: memMu, + }, nil +} + +// cleanup releases persistent resources associated with the worker. +func (w *worker) cleanup() error { + mem := <-w.memMu + if mem == nil { + return nil + } + close(w.memMu) + return mem.Close() +} + +// coordinate runs the test binary to perform fuzzing. +// +// coordinate loops until ctx is cancelled or a fatal error is encountered. +// If a test process terminates unexpectedly while fuzzing, coordinate will +// attempt to restart and continue unless the termination can be attributed +// to an interruption (from a timer or the user). +// +// While looping, coordinate receives inputs from the coordinator, passes +// those inputs to the worker process, then passes the results back to +// the coordinator. +func (w *worker) coordinate(ctx context.Context) error { + // Main event loop. + for { + // Start or restart the worker if it's not running. + if !w.isRunning() { + if err := w.startAndPing(ctx); err != nil { + return err + } + } + + select { + case <-ctx.Done(): + // Worker was told to stop. + err := w.stop() + if err != nil && !w.interrupted && !isInterruptError(err) { + return err + } + return ctx.Err() + + case <-w.termC: + // Worker process terminated unexpectedly while waiting for input. + err := w.stop() + if w.interrupted { + panic("worker interrupted after unexpected termination") + } + if err == nil || isInterruptError(err) { + // Worker stopped, either by exiting with status 0 or after being + // interrupted with a signal that was not sent by the coordinator. + // + // When the user presses ^C, on POSIX platforms, SIGINT is delivered to + // all processes in the group concurrently, and the worker may see it + // before the coordinator. The worker should exit 0 gracefully (in + // theory). + // + // This condition is probably intended by the user, so suppress + // the error. + return nil + } + if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode { + // Worker exited with a code indicating F.Fuzz was not called correctly, + // for example, F.Fail was called first. + return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err) + } + // Worker exited non-zero or was terminated by a non-interrupt + // signal (for example, SIGSEGV) while fuzzing. + return fmt.Errorf("fuzzing process hung or terminated unexpectedly: %w", err) + // TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker. + + case input := <-w.coordinator.inputC: + // Received input from coordinator. + args := fuzzArgs{ + Limit: input.limit, + Timeout: input.timeout, + Warmup: input.warmup, + CoverageData: input.coverageData, + } + entry, resp, isInternalError, err := w.client.fuzz(ctx, input.entry, args) + canMinimize := true + if err != nil { + // Error communicating with worker. + w.stop() + if ctx.Err() != nil { + // Timeout or interruption. + return ctx.Err() + } + if w.interrupted { + // Communication error before we stopped the worker. + // Report an error, but don't record a crasher. + return fmt.Errorf("communicating with fuzzing process: %v", err) + } + if sig, ok := terminationSignal(w.waitErr); ok && !isCrashSignal(sig) { + // Worker terminated by a signal that probably wasn't caused by a + // specific input to the fuzz function. For example, on Linux, + // the kernel (OOM killer) may send SIGKILL to a process using a lot + // of memory. Or the shell might send SIGHUP when the terminal + // is closed. Don't record a crasher. + return fmt.Errorf("fuzzing process terminated by unexpected signal; no crash will be recorded: %v", w.waitErr) + } + if isInternalError { + // An internal error occurred which shouldn't be considered + // a crash. + return err + } + // Unexpected termination. Set error message and fall through. + // We'll restart the worker on the next iteration. + // Don't attempt to minimize this since it crashed the worker. + resp.Err = fmt.Sprintf("fuzzing process hung or terminated unexpectedly: %v", w.waitErr) + canMinimize = false + } + result := fuzzResult{ + limit: input.limit, + count: resp.Count, + totalDuration: resp.TotalDuration, + entryDuration: resp.InterestingDuration, + entry: entry, + crasherMsg: resp.Err, + coverageData: resp.CoverageData, + canMinimize: canMinimize, + } + w.coordinator.resultC <- result + + case input := <-w.coordinator.minimizeC: + // Received input to minimize from coordinator. + result, err := w.minimize(ctx, input) + if err != nil { + // Error minimizing. Send back the original input. If it didn't cause + // an error before, report it as causing an error now. + // TODO: double-check this is handled correctly when + // implementing -keepfuzzing. + result = fuzzResult{ + entry: input.entry, + crasherMsg: input.crasherMsg, + canMinimize: false, + limit: input.limit, + } + if result.crasherMsg == "" { + result.crasherMsg = err.Error() + } + } + w.coordinator.resultC <- result + } + } +} + +// minimize tells a worker process to attempt to find a smaller value that +// either causes an error (if we started minimizing because we found an input +// that causes an error) or preserves new coverage (if we started minimizing +// because we found an input that expands coverage). +func (w *worker) minimize(ctx context.Context, input fuzzMinimizeInput) (min fuzzResult, err error) { + if w.coordinator.opts.MinimizeTimeout != 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, w.coordinator.opts.MinimizeTimeout) + defer cancel() + } + + args := minimizeArgs{ + Limit: input.limit, + Timeout: input.timeout, + KeepCoverage: input.keepCoverage, + } + entry, resp, err := w.client.minimize(ctx, input.entry, args) + if err != nil { + // Error communicating with worker. + w.stop() + if ctx.Err() != nil || w.interrupted || isInterruptError(w.waitErr) { + // Worker was interrupted, possibly by the user pressing ^C. + // Normally, workers can handle interrupts and timeouts gracefully and + // will return without error. An error here indicates the worker + // may not have been in a good state, but the error won't be meaningful + // to the user. Just return the original crasher without logging anything. + return fuzzResult{ + entry: input.entry, + crasherMsg: input.crasherMsg, + coverageData: input.keepCoverage, + canMinimize: false, + limit: input.limit, + }, nil + } + return fuzzResult{ + entry: entry, + crasherMsg: fmt.Sprintf("fuzzing process hung or terminated unexpectedly while minimizing: %v", err), + canMinimize: false, + limit: input.limit, + count: resp.Count, + totalDuration: resp.Duration, + }, nil + } + + if input.crasherMsg != "" && resp.Err == "" { + return fuzzResult{}, fmt.Errorf("attempted to minimize a crash but could not reproduce") + } + + return fuzzResult{ + entry: entry, + crasherMsg: resp.Err, + coverageData: resp.CoverageData, + canMinimize: false, + limit: input.limit, + count: resp.Count, + totalDuration: resp.Duration, + }, nil +} + +func (w *worker) isRunning() bool { + return w.cmd != nil +} + +// startAndPing starts the worker process and sends it a message to make sure it +// can communicate. +// +// startAndPing returns an error if any part of this didn't work, including if +// the context is expired or the worker process was interrupted before it +// responded. Errors that happen after start but before the ping response +// likely indicate that the worker did not call F.Fuzz or called F.Fail first. +// We don't record crashers for these errors. +func (w *worker) startAndPing(ctx context.Context) error { + if ctx.Err() != nil { + return ctx.Err() + } + if err := w.start(); err != nil { + return err + } + if err := w.client.ping(ctx); err != nil { + w.stop() + if ctx.Err() != nil { + return ctx.Err() + } + if isInterruptError(err) { + // User may have pressed ^C before worker responded. + return err + } + // TODO: record and return stderr. + return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err) + } + return nil +} + +// start runs a new worker process. +// +// If the process couldn't be started, start returns an error. Start won't +// return later termination errors from the process if they occur. +// +// If the process starts successfully, start returns nil. stop must be called +// once later to clean up, even if the process terminates on its own. +// +// When the process terminates, w.waitErr is set to the error (if any), and +// w.termC is closed. +func (w *worker) start() (err error) { + if w.isRunning() { + panic("worker already started") + } + w.waitErr = nil + w.interrupted = false + w.termC = nil + + cmd := exec.Command(w.binPath, w.args...) + cmd.Dir = w.dir + cmd.Env = w.env[:len(w.env):len(w.env)] // copy on append to ensure workers don't overwrite each other. + + // Create the "fuzz_in" and "fuzz_out" pipes so we can communicate with + // the worker. We don't use stdin and stdout, since the test binary may + // do something else with those. + // + // Each pipe has a reader and a writer. The coordinator writes to fuzzInW + // and reads from fuzzOutR. The worker inherits fuzzInR and fuzzOutW. + // The coordinator closes fuzzInR and fuzzOutW after starting the worker, + // since we have no further need of them. + fuzzInR, fuzzInW, err := os.Pipe() + if err != nil { + return err + } + defer fuzzInR.Close() + fuzzOutR, fuzzOutW, err := os.Pipe() + if err != nil { + fuzzInW.Close() + return err + } + defer fuzzOutW.Close() + setWorkerComm(cmd, workerComm{fuzzIn: fuzzInR, fuzzOut: fuzzOutW, memMu: w.memMu}) + + // Start the worker process. + if err := cmd.Start(); err != nil { + fuzzInW.Close() + fuzzOutR.Close() + return err + } + + // Worker started successfully. + // After this, w.client owns fuzzInW and fuzzOutR, so w.client.Close must be + // called later by stop. + w.cmd = cmd + w.termC = make(chan struct{}) + comm := workerComm{fuzzIn: fuzzInW, fuzzOut: fuzzOutR, memMu: w.memMu} + m := newMutator() + w.client = newWorkerClient(comm, m) + + go func() { + w.waitErr = w.cmd.Wait() + close(w.termC) + }() + + return nil +} + +// stop tells the worker process to exit by closing w.client, then blocks until +// it terminates. If the worker doesn't terminate after a short time, stop +// signals it with os.Interrupt (where supported), then os.Kill. +// +// stop returns the error the process terminated with, if any (same as +// w.waitErr). +// +// stop must be called at least once after start returns successfully, even if +// the worker process terminates unexpectedly. +func (w *worker) stop() error { + if w.termC == nil { + panic("worker was not started successfully") + } + select { + case <-w.termC: + // Worker already terminated. + if w.client == nil { + // stop already called. + return w.waitErr + } + // Possible unexpected termination. + w.client.Close() + w.cmd = nil + w.client = nil + return w.waitErr + default: + // Worker still running. + } + + // Tell the worker to stop by closing fuzz_in. It won't actually stop until it + // finishes with earlier calls. + closeC := make(chan struct{}) + go func() { + w.client.Close() + close(closeC) + }() + + sig := os.Interrupt + if runtime.GOOS == "windows" { + // Per https://golang.org/pkg/os/#Signal, “Interrupt is not implemented on + // Windows; using it with os.Process.Signal will return an error.” + // Fall back to Kill instead. + sig = os.Kill + } + + t := time.NewTimer(workerTimeoutDuration) + for { + select { + case <-w.termC: + // Worker terminated. + t.Stop() + <-closeC + w.cmd = nil + w.client = nil + return w.waitErr + + case <-t.C: + // Timer fired before worker terminated. + w.interrupted = true + switch sig { + case os.Interrupt: + // Try to stop the worker with SIGINT and wait a little longer. + w.cmd.Process.Signal(sig) + sig = os.Kill + t.Reset(workerTimeoutDuration) + + case os.Kill: + // Try to stop the worker with SIGKILL and keep waiting. + w.cmd.Process.Signal(sig) + sig = nil + t.Reset(workerTimeoutDuration) + + case nil: + // Still waiting. Print a message to let the user know why. + fmt.Fprintf(w.coordinator.opts.Log, "waiting for fuzzing process to terminate...\n") + } + } + } +} + +// RunFuzzWorker is called in a worker process to communicate with the +// coordinator process in order to fuzz random inputs. RunFuzzWorker loops +// until the coordinator tells it to stop. +// +// fn is a wrapper on the fuzz function. It may return an error to indicate +// a given input "crashed". The coordinator will also record a crasher if +// the function times out or terminates the process. +// +// RunFuzzWorker returns an error if it could not communicate with the +// coordinator process. +func RunFuzzWorker(ctx context.Context, fn func(CorpusEntry) error) error { + comm, err := getWorkerComm() + if err != nil { + return err + } + srv := &workerServer{ + workerComm: comm, + fuzzFn: func(e CorpusEntry) (time.Duration, error) { + timer := time.AfterFunc(10*time.Second, func() { + panic("deadlocked!") // this error message won't be printed + }) + defer timer.Stop() + start := time.Now() + err := fn(e) + return time.Since(start), err + }, + m: newMutator(), + } + return srv.serve(ctx) +} + +// call is serialized and sent from the coordinator on fuzz_in. It acts as +// a minimalist RPC mechanism. Exactly one of its fields must be set to indicate +// which method to call. +type call struct { + Ping *pingArgs + Fuzz *fuzzArgs + Minimize *minimizeArgs +} + +// minimizeArgs contains arguments to workerServer.minimize. The value to +// minimize is already in shared memory. +type minimizeArgs struct { + // Timeout is the time to spend minimizing. This may include time to start up, + // especially if the input causes the worker process to terminated, requiring + // repeated restarts. + Timeout time.Duration + + // Limit is the maximum number of values to test, without spending more time + // than Duration. 0 indicates no limit. + Limit int64 + + // KeepCoverage is a set of coverage counters the worker should attempt to + // keep in minimized values. When provided, the worker will reject inputs that + // don't cause at least one of these bits to be set. + KeepCoverage []byte + + // Index is the index of the fuzz target parameter to be minimized. + Index int +} + +// minimizeResponse contains results from workerServer.minimize. +type minimizeResponse struct { + // WroteToMem is true if the worker found a smaller input and wrote it to + // shared memory. If minimizeArgs.KeepCoverage was set, the minimized input + // preserved at least one coverage bit and did not cause an error. + // Otherwise, the minimized input caused some error, recorded in Err. + WroteToMem bool + + // Err is the error string caused by the value in shared memory, if any. + Err string + + // CoverageData is the set of coverage bits activated by the minimized value + // in shared memory. When set, it contains at least one bit from KeepCoverage. + // CoverageData will be nil if Err is set or if minimization failed. + CoverageData []byte + + // Duration is the time spent minimizing, not including starting or cleaning up. + Duration time.Duration + + // Count is the number of values tested. + Count int64 +} + +// fuzzArgs contains arguments to workerServer.fuzz. The value to fuzz is +// passed in shared memory. +type fuzzArgs struct { + // Timeout is the time to spend fuzzing, not including starting or + // cleaning up. + Timeout time.Duration + + // Limit is the maximum number of values to test, without spending more time + // than Duration. 0 indicates no limit. + Limit int64 + + // Warmup indicates whether this is part of a warmup run, meaning that + // fuzzing should not occur. If coverageEnabled is true, then coverage data + // should be reported. + Warmup bool + + // CoverageData is the coverage data. If set, the worker should update its + // local coverage data prior to fuzzing. + CoverageData []byte +} + +// fuzzResponse contains results from workerServer.fuzz. +type fuzzResponse struct { + // Duration is the time spent fuzzing, not including starting or cleaning up. + TotalDuration time.Duration + InterestingDuration time.Duration + + // Count is the number of values tested. + Count int64 + + // CoverageData is set if the value in shared memory expands coverage + // and therefore may be interesting to the coordinator. + CoverageData []byte + + // Err is the error string caused by the value in shared memory, which is + // non-empty if the value in shared memory caused a crash. + Err string + + // InternalErr is the error string caused by an internal error in the + // worker. This shouldn't be considered a crasher. + InternalErr string +} + +// pingArgs contains arguments to workerServer.ping. +type pingArgs struct{} + +// pingResponse contains results from workerServer.ping. +type pingResponse struct{} + +// workerComm holds pipes and shared memory used for communication +// between the coordinator process (client) and a worker process (server). +// These values are unique to each worker; they are shared only with the +// coordinator, not with other workers. +// +// Access to shared memory is synchronized implicitly over the RPC protocol +// implemented in workerServer and workerClient. During a call, the client +// (worker) has exclusive access to shared memory; at other times, the server +// (coordinator) has exclusive access. +type workerComm struct { + fuzzIn, fuzzOut *os.File + memMu chan *sharedMem // mutex guarding shared memory +} + +// workerServer is a minimalist RPC server, run by fuzz worker processes. +// It allows the coordinator process (using workerClient) to call methods in a +// worker process. This system allows the coordinator to run multiple worker +// processes in parallel and to collect inputs that caused crashes from shared +// memory after a worker process terminates unexpectedly. +type workerServer struct { + workerComm + m *mutator + + // coverageMask is the local coverage data for the worker. It is + // periodically updated to reflect the data in the coordinator when new + // coverage is found. + coverageMask []byte + + // fuzzFn runs the worker's fuzz target on the given input and returns an + // error if it finds a crasher (the process may also exit or crash), and the + // time it took to run the input. It sets a deadline of 10 seconds, at which + // point it will panic with the assumption that the process is hanging or + // deadlocked. + fuzzFn func(CorpusEntry) (time.Duration, error) +} + +// serve reads serialized RPC messages on fuzzIn. When serve receives a message, +// it calls the corresponding method, then sends the serialized result back +// on fuzzOut. +// +// serve handles RPC calls synchronously; it will not attempt to read a message +// until the previous call has finished. +// +// serve returns errors that occurred when communicating over pipes. serve +// does not return errors from method calls; those are passed through serialized +// responses. +func (ws *workerServer) serve(ctx context.Context) error { + enc := json.NewEncoder(ws.fuzzOut) + dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn}) + for { + var c call + if err := dec.Decode(&c); err != nil { + if err == io.EOF || err == ctx.Err() { + return nil + } else { + return err + } + } + + var resp any + switch { + case c.Fuzz != nil: + resp = ws.fuzz(ctx, *c.Fuzz) + case c.Minimize != nil: + resp = ws.minimize(ctx, *c.Minimize) + case c.Ping != nil: + resp = ws.ping(ctx, *c.Ping) + default: + return errors.New("no arguments provided for any call") + } + + if err := enc.Encode(resp); err != nil { + return err + } + } +} + +// chainedMutations is how many mutations are applied before the worker +// resets the input to it's original state. +// NOTE: this number was picked without much thought. It is low enough that +// it seems to create a significant diversity in mutated inputs. We may want +// to consider looking into this more closely once we have a proper performance +// testing framework. Another option is to randomly pick the number of chained +// mutations on each invocation of the workerServer.fuzz method (this appears to +// be what libFuzzer does, although there seems to be no documentation which +// explains why this choice was made.) +const chainedMutations = 5 + +// fuzz runs the test function on random variations of the input value in shared +// memory for a limited duration or number of iterations. +// +// fuzz returns early if it finds an input that crashes the fuzz function (with +// fuzzResponse.Err set) or an input that expands coverage (with +// fuzzResponse.InterestingDuration set). +// +// fuzz does not modify the input in shared memory. Instead, it saves the +// initial PRNG state in shared memory and increments a counter in shared +// memory before each call to the test function. The caller may reconstruct +// the crashing input with this information, since the PRNG is deterministic. +func (ws *workerServer) fuzz(ctx context.Context, args fuzzArgs) (resp fuzzResponse) { + if args.CoverageData != nil { + if ws.coverageMask != nil && len(args.CoverageData) != len(ws.coverageMask) { + resp.InternalErr = fmt.Sprintf("unexpected size for CoverageData: got %d, expected %d", len(args.CoverageData), len(ws.coverageMask)) + return resp + } + ws.coverageMask = args.CoverageData + } + start := time.Now() + defer func() { resp.TotalDuration = time.Since(start) }() + + if args.Timeout != 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, args.Timeout) + defer cancel() + } + mem := <-ws.memMu + ws.m.r.save(&mem.header().randState, &mem.header().randInc) + defer func() { + resp.Count = mem.header().count + ws.memMu <- mem + }() + if args.Limit > 0 && mem.header().count >= args.Limit { + resp.InternalErr = fmt.Sprintf("mem.header().count %d already exceeds args.Limit %d", mem.header().count, args.Limit) + return resp + } + + originalVals, err := unmarshalCorpusFile(mem.valueCopy()) + if err != nil { + resp.InternalErr = err.Error() + return resp + } + vals := make([]any, len(originalVals)) + copy(vals, originalVals) + + shouldStop := func() bool { + return args.Limit > 0 && mem.header().count >= args.Limit + } + fuzzOnce := func(entry CorpusEntry) (dur time.Duration, cov []byte, errMsg string) { + mem.header().count++ + var err error + dur, err = ws.fuzzFn(entry) + if err != nil { + errMsg = err.Error() + if errMsg == "" { + errMsg = "fuzz function failed with no input" + } + return dur, nil, errMsg + } + if ws.coverageMask != nil && countNewCoverageBits(ws.coverageMask, coverageSnapshot) > 0 { + return dur, coverageSnapshot, "" + } + return dur, nil, "" + } + + if args.Warmup { + dur, _, errMsg := fuzzOnce(CorpusEntry{Values: vals}) + if errMsg != "" { + resp.Err = errMsg + return resp + } + resp.InterestingDuration = dur + if coverageEnabled { + resp.CoverageData = coverageSnapshot + } + return resp + } + + for { + select { + case <-ctx.Done(): + return resp + default: + if mem.header().count%chainedMutations == 0 { + copy(vals, originalVals) + ws.m.r.save(&mem.header().randState, &mem.header().randInc) + } + ws.m.mutate(vals, cap(mem.valueRef())) + + entry := CorpusEntry{Values: vals} + dur, cov, errMsg := fuzzOnce(entry) + if errMsg != "" { + resp.Err = errMsg + return resp + } + if cov != nil { + resp.CoverageData = cov + resp.InterestingDuration = dur + return resp + } + if shouldStop() { + return resp + } + } + } +} + +func (ws *workerServer) minimize(ctx context.Context, args minimizeArgs) (resp minimizeResponse) { + start := time.Now() + defer func() { resp.Duration = time.Since(start) }() + mem := <-ws.memMu + defer func() { ws.memMu <- mem }() + vals, err := unmarshalCorpusFile(mem.valueCopy()) + if err != nil { + panic(err) + } + inpHash := sha256.Sum256(mem.valueCopy()) + if args.Timeout != 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, args.Timeout) + defer cancel() + } + + // Minimize the values in vals, then write to shared memory. We only write + // to shared memory after completing minimization. + success, err := ws.minimizeInput(ctx, vals, mem, args) + if success { + writeToMem(vals, mem) + outHash := sha256.Sum256(mem.valueCopy()) + mem.header().rawInMem = false + resp.WroteToMem = true + if err != nil { + resp.Err = err.Error() + } else { + // If the values didn't change during minimization then coverageSnapshot is likely + // a dirty snapshot which represents the very last step of minimization, not the + // coverage for the initial input. In that case just return the coverage we were + // given initially, since it more accurately represents the coverage map for the + // input we are returning. + if outHash != inpHash { + resp.CoverageData = coverageSnapshot + } else { + resp.CoverageData = args.KeepCoverage + } + } + } + return resp +} + +// minimizeInput applies a series of minimizing transformations on the provided +// vals, ensuring that each minimization still causes an error, or keeps +// coverage, in fuzzFn. It uses the context to determine how long to run, +// stopping once closed. It returns a bool indicating whether minimization was +// successful and an error if one was found. +func (ws *workerServer) minimizeInput(ctx context.Context, vals []any, mem *sharedMem, args minimizeArgs) (success bool, retErr error) { + keepCoverage := args.KeepCoverage + memBytes := mem.valueRef() + bPtr := &memBytes + count := &mem.header().count + shouldStop := func() bool { + return ctx.Err() != nil || + (args.Limit > 0 && *count >= args.Limit) + } + if shouldStop() { + return false, nil + } + + // Check that the original value preserves coverage or causes an error. + // If not, then whatever caused us to think the value was interesting may + // have been a flake, and we can't minimize it. + *count++ + _, retErr = ws.fuzzFn(CorpusEntry{Values: vals}) + if keepCoverage != nil { + if !hasCoverageBit(keepCoverage, coverageSnapshot) || retErr != nil { + return false, nil + } + } else if retErr == nil { + return false, nil + } + mem.header().rawInMem = true + + // tryMinimized runs the fuzz function with candidate replacing the value + // at index valI. tryMinimized returns whether the input with candidate is + // interesting for the same reason as the original input: it returns + // an error if one was expected, or it preserves coverage. + tryMinimized := func(candidate []byte) bool { + prev := vals[args.Index] + switch prev.(type) { + case []byte: + vals[args.Index] = candidate + case string: + vals[args.Index] = string(candidate) + default: + panic("impossible") + } + copy(*bPtr, candidate) + *bPtr = (*bPtr)[:len(candidate)] + mem.setValueLen(len(candidate)) + *count++ + _, err := ws.fuzzFn(CorpusEntry{Values: vals}) + if err != nil { + retErr = err + if keepCoverage != nil { + // Now that we've found a crash, that's more important than any + // minimization of interesting inputs that was being done. Clear out + // keepCoverage to only minimize the crash going forward. + keepCoverage = nil + } + return true + } + // Minimization should preserve coverage bits. + if keepCoverage != nil && isCoverageSubset(keepCoverage, coverageSnapshot) { + return true + } + vals[args.Index] = prev + return false + } + switch v := vals[args.Index].(type) { + case string: + minimizeBytes([]byte(v), tryMinimized, shouldStop) + case []byte: + minimizeBytes(v, tryMinimized, shouldStop) + default: + panic("impossible") + } + return true, retErr +} + +func writeToMem(vals []any, mem *sharedMem) { + b := marshalCorpusFile(vals...) + mem.setValue(b) +} + +// ping does nothing. The coordinator calls this method to ensure the worker +// has called F.Fuzz and can communicate. +func (ws *workerServer) ping(ctx context.Context, args pingArgs) pingResponse { + return pingResponse{} +} + +// workerClient is a minimalist RPC client. The coordinator process uses a +// workerClient to call methods in each worker process (handled by +// workerServer). +type workerClient struct { + workerComm + m *mutator + + // mu is the mutex protecting the workerComm.fuzzIn pipe. This must be + // locked before making calls to the workerServer. It prevents + // workerClient.Close from closing fuzzIn while workerClient methods are + // writing to it concurrently, and prevents multiple callers from writing to + // fuzzIn concurrently. + mu sync.Mutex +} + +func newWorkerClient(comm workerComm, m *mutator) *workerClient { + return &workerClient{workerComm: comm, m: m} +} + +// Close shuts down the connection to the RPC server (the worker process) by +// closing fuzz_in. Close drains fuzz_out (avoiding a SIGPIPE in the worker), +// and closes it after the worker process closes the other end. +func (wc *workerClient) Close() error { + wc.mu.Lock() + defer wc.mu.Unlock() + + // Close fuzzIn. This signals to the server that there are no more calls, + // and it should exit. + if err := wc.fuzzIn.Close(); err != nil { + wc.fuzzOut.Close() + return err + } + + // Drain fuzzOut and close it. When the server exits, the kernel will close + // its end of fuzzOut, and we'll get EOF. + if _, err := io.Copy(io.Discard, wc.fuzzOut); err != nil { + wc.fuzzOut.Close() + return err + } + return wc.fuzzOut.Close() +} + +// errSharedMemClosed is returned by workerClient methods that cannot access +// shared memory because it was closed and unmapped by another goroutine. That +// can happen when worker.cleanup is called in the worker goroutine while a +// workerClient.fuzz call runs concurrently. +// +// This error should not be reported. It indicates the operation was +// interrupted. +var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped") + +// minimize tells the worker to call the minimize method. See +// workerServer.minimize. +func (wc *workerClient) minimize(ctx context.Context, entryIn CorpusEntry, args minimizeArgs) (entryOut CorpusEntry, resp minimizeResponse, retErr error) { + wc.mu.Lock() + defer wc.mu.Unlock() + + mem, ok := <-wc.memMu + if !ok { + return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed + } + mem.header().count = 0 + inp, err := corpusEntryData(entryIn) + if err != nil { + return CorpusEntry{}, minimizeResponse{}, err + } + mem.setValue(inp) + defer func() { wc.memMu <- mem }() + entryOut = entryIn + entryOut.Values, err = unmarshalCorpusFile(inp) + if err != nil { + return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling provided value: %v", err) + } + for i, v := range entryOut.Values { + if !isMinimizable(reflect.TypeOf(v)) { + continue + } + + wc.memMu <- mem + args.Index = i + c := call{Minimize: &args} + callErr := wc.callLocked(ctx, c, &resp) + mem, ok = <-wc.memMu + if !ok { + return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed + } + + if callErr != nil { + retErr = callErr + if !mem.header().rawInMem { + // An unrecoverable error occurred before minimization began. + return entryIn, minimizeResponse{}, retErr + } + // An unrecoverable error occurred during minimization. mem now + // holds the raw, unmarshalled bytes of entryIn.Values[i] that + // caused the error. + switch entryOut.Values[i].(type) { + case string: + entryOut.Values[i] = string(mem.valueCopy()) + case []byte: + entryOut.Values[i] = mem.valueCopy() + default: + panic("impossible") + } + entryOut.Data = marshalCorpusFile(entryOut.Values...) + // Stop minimizing; another unrecoverable error is likely to occur. + break + } + + if resp.WroteToMem { + // Minimization succeeded, and mem holds the marshaled data. + entryOut.Data = mem.valueCopy() + entryOut.Values, err = unmarshalCorpusFile(entryOut.Data) + if err != nil { + return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling minimized value: %v", err) + } + } + + // Prepare for next iteration of the loop. + if args.Timeout != 0 { + args.Timeout -= resp.Duration + if args.Timeout <= 0 { + break + } + } + if args.Limit != 0 { + args.Limit -= mem.header().count + if args.Limit <= 0 { + break + } + } + } + resp.Count = mem.header().count + h := sha256.Sum256(entryOut.Data) + entryOut.Path = fmt.Sprintf("%x", h[:4]) + return entryOut, resp, retErr +} + +// fuzz tells the worker to call the fuzz method. See workerServer.fuzz. +func (wc *workerClient) fuzz(ctx context.Context, entryIn CorpusEntry, args fuzzArgs) (entryOut CorpusEntry, resp fuzzResponse, isInternalError bool, err error) { + wc.mu.Lock() + defer wc.mu.Unlock() + + mem, ok := <-wc.memMu + if !ok { + return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed + } + mem.header().count = 0 + inp, err := corpusEntryData(entryIn) + if err != nil { + return CorpusEntry{}, fuzzResponse{}, true, err + } + mem.setValue(inp) + wc.memMu <- mem + + c := call{Fuzz: &args} + callErr := wc.callLocked(ctx, c, &resp) + if resp.InternalErr != "" { + return CorpusEntry{}, fuzzResponse{}, true, errors.New(resp.InternalErr) + } + mem, ok = <-wc.memMu + if !ok { + return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed + } + defer func() { wc.memMu <- mem }() + resp.Count = mem.header().count + + if !bytes.Equal(inp, mem.valueRef()) { + return CorpusEntry{}, fuzzResponse{}, true, errors.New("workerServer.fuzz modified input") + } + needEntryOut := callErr != nil || resp.Err != "" || + (!args.Warmup && resp.CoverageData != nil) + if needEntryOut { + valuesOut, err := unmarshalCorpusFile(inp) + if err != nil { + return CorpusEntry{}, fuzzResponse{}, true, fmt.Errorf("unmarshaling fuzz input value after call: %v", err) + } + wc.m.r.restore(mem.header().randState, mem.header().randInc) + if !args.Warmup { + // Only mutate the valuesOut if fuzzing actually occurred. + numMutations := ((resp.Count - 1) % chainedMutations) + 1 + for i := int64(0); i < numMutations; i++ { + wc.m.mutate(valuesOut, cap(mem.valueRef())) + } + } + dataOut := marshalCorpusFile(valuesOut...) + + h := sha256.Sum256(dataOut) + name := fmt.Sprintf("%x", h[:4]) + entryOut = CorpusEntry{ + Parent: entryIn.Path, + Path: name, + Data: dataOut, + Generation: entryIn.Generation + 1, + } + if args.Warmup { + // The bytes weren't mutated, so if entryIn was a seed corpus value, + // then entryOut is too. + entryOut.IsSeed = entryIn.IsSeed + } + } + + return entryOut, resp, false, callErr +} + +// ping tells the worker to call the ping method. See workerServer.ping. +func (wc *workerClient) ping(ctx context.Context) error { + wc.mu.Lock() + defer wc.mu.Unlock() + c := call{Ping: &pingArgs{}} + var resp pingResponse + return wc.callLocked(ctx, c, &resp) +} + +// callLocked sends an RPC from the coordinator to the worker process and waits +// for the response. The callLocked may be cancelled with ctx. +func (wc *workerClient) callLocked(ctx context.Context, c call, resp any) (err error) { + enc := json.NewEncoder(wc.fuzzIn) + dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut}) + if err := enc.Encode(c); err != nil { + return err + } + return dec.Decode(resp) +} + +// contextReader wraps a Reader with a Context. If the context is cancelled +// while the underlying reader is blocked, Read returns immediately. +// +// This is useful for reading from a pipe. Closing a pipe file descriptor does +// not unblock pending Reads on that file descriptor. All copies of the pipe's +// other file descriptor (the write end) must be closed in all processes that +// inherit it. This is difficult to do correctly in the situation we care about +// (process group termination). +type contextReader struct { + ctx context.Context + r io.Reader +} + +func (cr *contextReader) Read(b []byte) (int, error) { + if ctxErr := cr.ctx.Err(); ctxErr != nil { + return 0, ctxErr + } + done := make(chan struct{}) + + // This goroutine may stay blocked after Read returns because the underlying + // read is blocked. + var n int + var err error + go func() { + n, err = cr.r.Read(b) + close(done) + }() + + select { + case <-cr.ctx.Done(): + return 0, cr.ctx.Err() + case <-done: + return n, err + } +} |