Documentation ¶
Overview ¶
Package fsets provides a function set for executing a chain of function calls that pass a state object between them. This is the mutant child of my statemachine package, without the fancy routing. It provides automatic retries, and like the statemachine, the goal is to reduce the testing burden of call chains, but with linear calls.
Where this is helpful is in avoiding tests of the entire call chain from a particular section of the chain down. Instead, you can test each function independently without mocking out the next call. You can still access helper methods and fields of a struct, so methods can be used as part of the function set.
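For example, each function in the set can be tested on its own by handing it a StateObject directly. A minimal sketch (myFunction1 and the MyDataType fields are the hypothetical names used in the examples below):

func TestMyFunction1(t *testing.T) {
    so := fsets.StateObject[MyDataType]{Data: MyDataType{Name: "ok"}} // Name is a hypothetical field.
    got, err := myFunction1(so)
    if err != nil {
        t.Fatalf("myFunction1() returned an unexpected error: %v", err)
    }
    _ = got // Assert on got.Data fields as needed.
}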
The package also contains methods to run the function set in parallel or concurrently, leveraging promises to get the results back.
The state object passed through the call chain is stack allocated (though your data might not be), which helps keep your allocations down.
Use is easy for regular functions:
fset := fsets.Fset[MyDataType]{}
fset.Adds(
fsets.C[MyDataType]{F: myFunction1, B: myBackoff},
fsets.C[MyDataType]{F: myFunction2},
)
so := fsets.StateObject[MyDataType]{Data: myData}
so = fset.Run(ctx, so)
if so.Err() != nil {
    log.Error("Error running function set", "error", so.Err())
}
Use with methods is also easy:
fset := fsets.Fset[MyDataType]{}
fset.Adds(
    fsets.C[MyDataType]{F: myStruct.myMethod1, B: myBackoff},
    fsets.C[MyDataType]{F: myStruct.myMethod2},
)
so := fsets.StateObject[MyDataType]{Data: myData}
so = fset.Run(ctx, so)
You can also do this within a method of a struct:
func (m *MyStruct) MyMethod(ctx context.Context, myData MyDataType) error {
    // Unless you need to do the setup dynamically, it is usually better to
    // build the Fset in the constructor to avoid the overhead of creating a
    // new one on every call.
    fset := fsets.Fset[MyDataType]{}
    fset.Adds(
        fsets.C[MyDataType]{F: m.myMethod1, B: myBackoff},
        fsets.C[MyDataType]{F: m.myMethod2},
    )
    return fset.Run(ctx, fsets.StateObject[MyDataType]{Data: myData}).Err()
}
You will also notice that functions can have exponential backoff retries. Some functions can have retries and others not, and different functions can use different backoffs. The state object is returned so that you can get return data that was stack allocated rather than heap allocated, which helps performance and memory usage.
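The functions (or methods) you add must match the signature defined by C: they take the StateObject and return it along with an error. A minimal sketch of the two functions used above, where the MyDataType fields are assumptions for illustration:

// myFunction1 validates the data. Because it can fail, pairing it with
// B: myBackoff in C makes the Fset retry it with exponential backoff.
func myFunction1(so fsets.StateObject[MyDataType]) (fsets.StateObject[MyDataType], error) {
    if so.Data.Name == "" { // Name is a hypothetical field on MyDataType.
        return so, fmt.Errorf("name is required")
    }
    return so, nil
}

// myFunction2 cannot fail, so it is added without a backoff (B is nil).
func myFunction2(so fsets.StateObject[MyDataType]) (fsets.StateObject[MyDataType], error) {
    so.Data.Processed = true // Processed is a hypothetical field on MyDataType.
    return so, nil
}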
In addition to serial execution through Run(), you can also run the function set in parallel or concurrently. This is useful when you want some bound on the number of executions and want to reuse goroutines. The provided methods give you a channel to feed promises into, and those promises are executed in parallel or concurrently against the function set. Parallel and concurrent execution differ only in the number of goroutines running the function set: in parallel, a fixed number of instances run, say 5. In concurrent, each of those instances also runs each function in the set concurrently, so up to n * fset.Len() function calls can be in flight. This mimics the behavior of channel based concurrency execution.
You can use promises to provide a bounded RPC call chain:
type RPCServer struct {
    fset fsets.Fset[RPCRequest]
    in   fsets.PromiseQueue[RPCRequest]
}

func New(ctx context.Context) *RPCServer {
    // Create a new Fset for the RPC server.
    fset := fsets.Fset[RPCRequest]{}
    r := &RPCServer{}
    // Add the functions to the Fset. These can be methods or functions.
    fset.Adds(
        fsets.C[RPCRequest]{F: r.handleAuth, B: myBackoff},
        fsets.C[RPCRequest]{F: r.handleData},
        fsets.C[RPCRequest]{F: r.handleResponse},
    )
    in, _ := fset.Concurrent(ctx, -1)
    r.fset = fset
    r.in = in
    return r
}
func (s *RPCServer) Serve(ctx context.Context, req RPCRequest) (RPCResponse, error) {
    // Create a new promise for the request.
    p := s.fset.Promise(req)
    if err := s.in.Send(ctx, p); err != nil {
        return RPCResponse{}, err // Context cancelled or other error.
    }
    // Wait for the response.
    resp, err := p.Get(ctx)
    if err != nil { // Only happens if Context is cancelled.
        return RPCResponse{}, err
    }
    return resp.V, resp.Err()
}
It is similar if you want parallel execution: use .Parallel() instead of .Concurrent().
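For example, the setup in New above would swap the Concurrent call for something like this (the bound of 5 goroutines is an arbitrary choice):

// Run at most 5 instances of the Fset in parallel. The returned sync.Group
// can be used to wait for the workers after the queue is closed.
in, _ := fset.Parallel(ctx, 5)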
You can also just use Fset.Run(), which will be bounded only by the number of RPC calls allowed:
func (s *RPCServer) Serve(ctx context.Context, req RPCRequest) (RPCResponse, error) {
    // Create a new StateObject for the request.
    so := fsets.StateObject[RPCRequest]{Data: req}
    // Run the Fset with the StateObject.
    resp := s.fset.Run(ctx, so)
    if resp.Err() != nil {
        return RPCResponse{}, resp.Err()
    }
    return resp.Data, nil
}
You can also do an ordered pipeline by using the WithPipeline option when calling Concurrent or Parallel:
fset := fsets.Fset[Data]{}
// Add the functions to the Fset. These can be methods or functions.
fset.Adds(
    fsets.C[Data]{F: handleAuth, B: myBackoff},
    fsets.C[Data]{F: handleData},
    fsets.C[Data]{F: handleResponse},
)
// Create a channel to get the results on.
out := make(chan promises.Promise[fsets.StateObject[Data], fsets.StateObject[Data]], 1)
in, _ := fset.Concurrent(ctx, -1, fsets.WithPipeline[Data](out))
// Send data to the input channel.
go func() {
    defer in.Close() // Close the input queue when done sending.
    for _, data := range inputData {
        p := fset.Promise(data)
        in.Send(ctx, p) // Send the promise to the input queue.
    }
}()
// Get data from the output channel.
for promise := range out {
    // Process the promise results.
    resp, err := promise.Get(ctx)
    if err != nil { // Only happens if Context is cancelled.
        log.Error("Error getting promise result", "error", err)
        continue
    }
    if resp.Err() != nil {
        log.Error("Error processing data", "error", resp.Err())
        continue
    }
    log.Info("Processed data: ", resp.V)
}
There is a cost to using fsets versus a standard function call chain. The runtime has to dispatch each function call, which slows down execution. In addition, I don't believe Go is smart enough to inline these calls, so you lose that optimization as well.
This can be seen with the following benchmark where we make 3 function calls:

BenchmarkCallChain-10    573722551      2.073 ns/op
BenchmarkFset-10           8607430      141.8 ns/op
This shows that fsets adds about 46ns of overhead per call. That is not a problem for most use cases that do any kind of system call (disk, network, ...). However, if it's a really tight loop that stays in cache and makes no syscalls, you might want a standard function call chain that can be inlined, avoiding the runtime overhead.
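For reference, a hand-written version of a three step chain that the compiler is free to inline looks something like this (step1, step2 and step3 are hypothetical functions with the same signature used by fsets.C):

func callChain(so fsets.StateObject[MyDataType]) (fsets.StateObject[MyDataType], error) {
    so, err := step1(so)
    if err != nil {
        return so, err
    }
    if so, err = step2(so); err != nil {
        return so, err
    }
    return step3(so)
}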
There is an experimental compiler (fsetcodegen/), which has its own README so you understand the limitations and how to use it. You can install it and use go generate to generate a compiled version of the Fset. This significantly reduces the overhead of using fsets while keeping the same API. There are examples of this in testing/compiler and testing/fakeRPC.
Index ¶
- type C
- type CompiledFset
- type Fset
- func (f *Fset[T]) Adds(calls ...C[T]) *Fset[T]
- func (f *Fset[T]) Compiled(c CompiledFset[T])
- func (f *Fset[T]) Concurrent(ctx context.Context, n int, options ...ParallelOption[T]) (PromiseQueue[T], *sync.Group)
- func (f *Fset[T]) Len() int
- func (f *Fset[T]) Parallel(ctx context.Context, n int, options ...ParallelOption[T]) (PromiseQueue[T], *sync.Group)
- func (f *Fset[T]) Promise(data T) promises.Promise[StateObject[T], StateObject[T]]
- func (f *Fset[T]) Run(ctx context.Context, so StateObject[T]) StateObject[T]
- type ParallelOption
- type PromiseQueue
- type StateObject
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type C ¶
type C[T any] struct {
    // F is the function to call.
    F func(so StateObject[T]) (StateObject[T], error)
    // B is the retrier to use for F. This can be nil.
    B *exponential.Backoff
    // O are options for the Backoff.Retry call.
    O []exponential.RetryOption
}
C represents a function call, F. If a user wants F to be called with an exponential backoff mechanism, they can supply it via B. O holds options for the Backoff.Retry call, which can be used to customize the retry behavior.
type CompiledFset ¶
type CompiledFset[T any] func(ctx context.Context, so StateObject[T]) StateObject[T]
CompiledFset is a compiled version of the Fset by the fsetcodegen tool.
type Fset ¶
type Fset[T any] struct {
    // contains filtered or unexported fields
}
Fset is a set of functions in the order they are added. If any return an error, the calls stop.
func (*Fset[T]) Adds ¶
func (f *Fset[T]) Adds(calls ...C[T]) *Fset[T]
Adds adds calls to the Fset. Calling this after Run() has been called will panic.
func (*Fset[T]) Compiled ¶
func (f *Fset[T]) Compiled(c CompiledFset[T])
func (*Fset[T]) Concurrent ¶
func (f *Fset[T]) Concurrent(ctx context.Context, n int, options ...ParallelOption[T]) (PromiseQueue[T], *sync.Group)
Concurrent sets up a channel to run the Fset concurrently, with n parallel goroutines each executing the Cs in the Fset concurrently. This means there are up to n * fset.Len() goroutines running concurrently: if you have 4 Cs in the Fset and n == 2, up to 8 different function calls will run concurrently. If n < 1, GOMAXPROCS is used to determine the number of goroutines to run. Use the Promise() method to create a new promise for the Fset to use with this channel.
func (*Fset[T]) Parallel ¶
func (f *Fset[T]) Parallel(ctx context.Context, n int, options ...ParallelOption[T]) (PromiseQueue[T], *sync.Group)
Parallel sets up a channel to run the Fset in parallel. This spawns n goroutines that execute the Fset in parallel and returns a channel that you input promises into. Closing the channel will shut down the goroutines. n is the number of goroutines to run in parallel; if n < 1, GOMAXPROCS is used to determine the number of goroutines. The context passed cannot be canceled. Control cancelation with the StateObject.Ctx, which works only if your functions support it. The returned sync.Group can be used to wait for all goroutines to finish after closing the channel, if that is desired. Use the Promise() method to create a new promise for the Fset to use with this channel.
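A minimal usage sketch (work is a hypothetical []MyDataType, and it is assumed the returned sync.Group exposes a Wait-style method):

in, g := fset.Parallel(ctx, 4)
for _, d := range work {
    p := fset.Promise(d)
    if err := in.Send(ctx, p); err != nil {
        break // The context was canceled.
    }
}
in.Close() // Shut down the workers once in-flight promises finish.
g.Wait()   // Assumed: wait for the worker goroutines to exit.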
func (*Fset[T]) Promise ¶
func (f *Fset[T]) Promise(data T) promises.Promise[StateObject[T], StateObject[T]]
Promise helps create a new promise for the Fset for use with the Parallel or Concurrent methods. This is easier and more efficient than creating a new promise manually.
func (*Fset[T]) Run ¶
func (f *Fset[T]) Run(ctx context.Context, so StateObject[T]) StateObject[T]
Run runs the Fset calls. This is thread-safe as long as all calls are thread-safe.
type ParallelOption ¶
ParallelOption is an option that can be used to configure the parallel execution of the Fset.
func WithPipeline ¶
func WithPipeline[T any](out chan promises.Promise[StateObject[T], StateObject[T]]) ParallelOption[T]
WithPipeline provides a channel to send the results of the Fset execution.
type PromiseQueue ¶
type PromiseQueue[T any] struct {
    // contains filtered or unexported fields
}
PromiseQueue is a channel that can be used to send promises to the Fset for parallel or concurrent execution.
func (PromiseQueue[T]) Close ¶
func (pq PromiseQueue[T]) Close()
Close closes the queue, which will shut down the goroutines that are processing the promises once current operations are done.
func (PromiseQueue[T]) Send ¶
func (pq PromiseQueue[T]) Send(ctx context.Context, p promises.Promise[StateObject[T], StateObject[T]]) error
Send sends a promise to the PromiseQueue. This is a blocking call until the promise is sent or the context is done. The context is attached to the StateObject in the promise, so it can be used for cancelation.
type StateObject ¶
type StateObject[T any] struct {
    // Data is any data related to this call.
    Data T
    // contains filtered or unexported fields
}
StateObject is an object passed through the call chain for a single call. It carries a data object of type T down the call chain. When the object is returned at the end of a Run() call, calling .Err() will tell you whether an error occurred in the call chain. Any function in the call chain can call .Stop() to stop the call chain without erroring. The Set*() calls are used by the fsets compiler and are not for users.
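For example, a function can short-circuit the rest of the chain without reporting an error (a sketch; the Cached field on MyDataType is hypothetical):

func checkCache(so fsets.StateObject[MyDataType]) (fsets.StateObject[MyDataType], error) {
    if so.Data.Cached { // The result is already available, so skip the rest of the chain.
        so.Stop()
    }
    return so, nil
}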
func (*StateObject[T]) Err ¶
func (s *StateObject[T]) Err() error
Err returns an error if one occurred in the call chain.
func (*StateObject[T]) GetStop ¶
func (s *StateObject[T]) GetStop() bool
GetStop returns true if the call chain should stop. This is set by the Stop() method. Normally this is not used by the user, as it is checked internally, but it is useful if you decide to decompose the Fset object at a later time.
func (*StateObject[T]) SetCtx ¶
func (s *StateObject[T]) SetCtx(ctx context.Context)
SetCtx sets the context for the StateObject. This is not used by users but is provided for the fsets compiler.
func (*StateObject[T]) SetErr ¶
func (s *StateObject[T]) SetErr(err error)
SetErr sets an error on the StateObject. This will stop the call chain and return the error in Err(). This should not be used in normal code as the functions can just return an error which does this automatically.
func (*StateObject[T]) Stop ¶
func (s *StateObject[T]) Stop()
Stop sets the StateObject to stop, which will stop the call chain without erroring.