Documentation
¶
Index ¶
- func AddPeer(r *raft.Raft) redeo.Handler
- func Leader(r *raft.Raft) redeo.Handler
- func Peers(r *raft.Raft) redeo.Handler
- func RemovePeer(r *raft.Raft) redeo.Handler
- func Sentinel(name string, r *raft.Raft, b *redeo.PubSubBroker) redeo.Handler
- func Snapshot(r *raft.Raft) redeo.Handler
- func State(r *raft.Raft) redeo.Handler
- func Stats(r *raft.Raft) redeo.Handler
- type Config
- type Transport
- func (t *Transport) AppendEntries(_ raft.ServerID, target raft.ServerAddress, req *raft.AppendEntriesRequest, ...) error
- func (t *Transport) AppendEntriesPipeline(_ raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error)
- func (t *Transport) Close() error
- func (t *Transport) Consumer() <-chan raft.RPC
- func (t *Transport) DecodePeer(peer []byte) raft.ServerAddress
- func (t *Transport) EncodePeer(_ raft.ServerID, peer raft.ServerAddress) []byte
- func (t *Transport) InstallSnapshot(_ raft.ServerID, target raft.ServerAddress, req *raft.InstallSnapshotRequest, ...) error
- func (t *Transport) LocalAddr() raft.ServerAddress
- func (t *Transport) RequestVote(_ raft.ServerID, target raft.ServerAddress, req *raft.RequestVoteRequest, ...) error
- func (t *Transport) SetHeartbeatHandler(fn func(rpc raft.RPC))
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Leader ¶
Leader handler retrieves the address of the cluster leader
Example ¶
This example demonstrates the use of the leader handler
package main
import (
"io"
"time"
"github.com/bsm/redeo"
"github.com/bsm/redeoraft"
"github.com/hashicorp/raft"
)
func main() {
// Init server
srv := redeo.NewServer(nil)
// Start raft
rft, tsp, err := startRaft(srv)
if err != nil {
panic(err)
}
defer rft.Shutdown()
defer tsp.Close()
// Report leader
srv.Handle("raftleader", redeoraft.Leader(rft))
// $ redis-cli -p 9736 raftleader
// "10.0.0.1:9736"
}
func startRaft(srv *redeo.Server) (*raft.Raft, *redeoraft.Transport, error) {
tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{
Timeout: time.Minute,
})
rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp)
if err != nil {
_ = tsp.Close()
return nil, nil, err
}
return rft, tsp, nil
}
type ExampleRaftService struct{}
func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil }
func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil }
func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) {
return nil, raft.ErrNothingNewToSnapshot
}
func RemovePeer ¶
RemovePeer removes a member from the cluster
func Sentinel ¶
Sentinel handler respond to a subset of SENTINEL commands and makes your server behave like an instance of a sentinel cluster.
Implemented sub-commands are:
SENTINELS - returns (abbreviated) peer attributes MASTER - returns (abbreviated) master attributes SLAVES - returns (abbreviated) slave attributes GET-MASTER-ADDR-BY-NAME - returns a the master address
Example ¶
This example demonstrates the use of sentinel commands on the server
package main
import (
"io"
"time"
"github.com/bsm/redeo"
"github.com/bsm/redeoraft"
"github.com/hashicorp/raft"
)
func main() {
// Init server
srv := redeo.NewServer(nil)
// Start raft
rft, tsp, err := startRaft(srv)
if err != nil {
panic(err)
}
defer rft.Shutdown()
defer tsp.Close()
// Create a pub-sub broker and handle messages
broker := redeo.NewPubSubBroker()
srv.Handle("publish", broker.Publish())
srv.Handle("subscribe", broker.Subscribe())
// Listen to sentinel commands
srv.Handle("sentinel", redeoraft.Sentinel("", rft, broker))
// $ redis-cli -p 9736 sentinel get-master-addr-by-name mymaster
// 1) 10.0.0.1
// 2) 9736
}
func startRaft(srv *redeo.Server) (*raft.Raft, *redeoraft.Transport, error) {
tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{
Timeout: time.Minute,
})
rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp)
if err != nil {
_ = tsp.Close()
return nil, nil, err
}
return rft, tsp, nil
}
type ExampleRaftService struct{}
func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil }
func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil }
func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) {
return nil, raft.ErrNothingNewToSnapshot
}
func State ¶
State handler returns the state of the current node
Example ¶
This example demonstrates the use of the state handler
package main
import (
"io"
"time"
"github.com/bsm/redeo"
"github.com/bsm/redeoraft"
"github.com/hashicorp/raft"
)
func main() {
// Init server
srv := redeo.NewServer(nil)
// Start raft
rft, tsp, err := startRaft(srv)
if err != nil {
panic(err)
}
defer rft.Shutdown()
defer tsp.Close()
// Report state
srv.Handle("raftstate", redeoraft.State(rft))
// $ redis-cli -p 9736 raftstate
// "leader"
}
func startRaft(srv *redeo.Server) (*raft.Raft, *redeoraft.Transport, error) {
tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{
Timeout: time.Minute,
})
rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp)
if err != nil {
_ = tsp.Close()
return nil, nil, err
}
return rft, tsp, nil
}
type ExampleRaftService struct{}
func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil }
func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil }
func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) {
return nil, raft.ErrNothingNewToSnapshot
}
Types ¶
type Config ¶
type Config struct {
// AppendEntriesCommand allows to customise the
// command name which is used to append entries.
// Default: raftappend
AppendEntriesCommand string
// RequestVoteCommand allows to customise the
// command name which is used to request a vote.
// Default: raftvote
RequestVoteCommand string
// InstallSnapshotCommand allows to customise the
// command name which is used to install a snapshot.
// Default: raftsnapshot
InstallSnapshotCommand string
// Timeout is used to apply I/O deadlines.
// Default: 0 (= no timeout)
Timeout time.Duration
}
Config allows to customise transports
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport allows redeo instances to communicate cluster messages
Example ¶
package main
import (
"io"
"time"
"github.com/bsm/redeo"
"github.com/bsm/redeoraft"
"github.com/hashicorp/raft"
)
func main() {
// Init server with default config
srv := redeo.NewServer(nil)
// Init a new transport, this installs three new commands on your
// server:
// * raftappend - appends replicated log entries from leader
// * raftvote - replies to vote requests in an leadership election
// * raftsnapshot - installs a snapshot
tsp := redeoraft.NewTransport(srv, "10.0.0.1:9736", &redeoraft.Config{
Timeout: time.Minute,
})
defer tsp.Close()
// Use the transport in your raft configuration
rft, err := raft.NewRaft(raft.DefaultConfig(), &ExampleRaftService{}, raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), tsp)
if err != nil {
panic(err)
}
defer rft.Shutdown()
}
type ExampleRaftService struct{}
func (s *ExampleRaftService) Apply(_ *raft.Log) interface{} { return nil }
func (s *ExampleRaftService) Restore(_ io.ReadCloser) error { return nil }
func (s *ExampleRaftService) Snapshot() (raft.FSMSnapshot, error) {
return nil, raft.ErrNothingNewToSnapshot
}
func NewTransport ¶
NewTransport creates a new transport and installs the required handlers on the server (see Config). It also requires an address it can advertise to peers.
func (*Transport) AppendEntries ¶
func (t *Transport) AppendEntries(_ raft.ServerID, target raft.ServerAddress, req *raft.AppendEntriesRequest, res *raft.AppendEntriesResponse) error
AppendEntries implements the Transport interface.
func (*Transport) AppendEntriesPipeline ¶
func (t *Transport) AppendEntriesPipeline(_ raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error)
AppendEntriesPipeline returns an interface that can be used to pipeline AppendEntries requests.
func (*Transport) DecodePeer ¶
func (t *Transport) DecodePeer(peer []byte) raft.ServerAddress
DecodePeer implements the raft.Transport interface.
func (*Transport) EncodePeer ¶
EncodePeer implements the raft.Transport interface.
func (*Transport) InstallSnapshot ¶
func (t *Transport) InstallSnapshot(_ raft.ServerID, target raft.ServerAddress, req *raft.InstallSnapshotRequest, res *raft.InstallSnapshotResponse, snap io.Reader) error
InstallSnapshot implements the Transport interface.
func (*Transport) LocalAddr ¶
func (t *Transport) LocalAddr() raft.ServerAddress
LocalAddr implements the raft.Transport interface.
func (*Transport) RequestVote ¶
func (t *Transport) RequestVote(_ raft.ServerID, target raft.ServerAddress, req *raft.RequestVoteRequest, res *raft.RequestVoteResponse) error
RequestVote implements the Transport interface.
func (*Transport) SetHeartbeatHandler ¶
SetHeartbeatHandler implements the raft.Transport interface.
