Documentation
¶
Index ¶
- Variables
- type FetchDirection
- type FetchRequest
- func (fr *FetchRequest) Done()
- func (fr *FetchRequest) Error(err error)
- func (fr *FetchRequest) Errors() <-chan error
- func (fr *FetchRequest) Init()
- func (fr *FetchRequest) IsDone() bool
- func (fr *FetchRequest) Messages() <-chan *FetchedMessage
- func (fr *FetchRequest) Push(id uint64, message []byte)
- func (fr *FetchRequest) PushError(err error)
- func (fr *FetchRequest) PushFetchMessage(fm *FetchedMessage)
- func (fr *FetchRequest) Ready() int
- type FetchedMessage
- type MessagePartition
- type MessageStore
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type FetchDirection ¶
type FetchDirection int
const ( DirectionOneMessage FetchDirection = 0 DirectionForward FetchDirection = 1 DirectionBackwards FetchDirection = -1 // TODO Bogdan decide the channel size and if should be customizable FetchBufferSize = 10 )
type FetchRequest ¶
type FetchRequest struct {
sync.RWMutex
// Partition is the Store name to search for messages
Partition string
// StartID is the message sequence id to start
StartID uint64
// EndID is the message sequence id to finish. If will not be used.
EndID uint64
// Direction has 3 possible values:
// Direction == 0: Only the Message with StartId
// Direction == 1: Fetch also the next Count Messages with a higher MessageId
// Direction == -1: Fetch also the next Count Messages with a lower MessageId
Direction FetchDirection
// Count is the maximum number of messages to return
Count int
// MessageC is the channel to send the message back to the receiver
MessageC chan *FetchedMessage
// ErrorC is a channel if an error occurs
ErrorC chan error
// StartC Through this channel , the total number or result
// is returned, before sending the first message.
// The Fetch() methods blocks on putting the number to the start channel.
StartC chan int
// contains filtered or unexported fields
}
FetchRequest is used for fetching messages in a MessageStore.
func NewFetchRequest ¶
func NewFetchRequest(partition string, start, end uint64, direction FetchDirection, count int) *FetchRequest
NewFetchRequest creates a new FetchRequest pointer initialized with provided values if `count` is negative will be set to MaxInt32
func (*FetchRequest) Done ¶
func (fr *FetchRequest) Done()
func (*FetchRequest) Error ¶
func (fr *FetchRequest) Error(err error)
func (*FetchRequest) Errors ¶
func (fr *FetchRequest) Errors() <-chan error
func (*FetchRequest) Init ¶
func (fr *FetchRequest) Init()
func (*FetchRequest) IsDone ¶
func (fr *FetchRequest) IsDone() bool
func (*FetchRequest) Messages ¶
func (fr *FetchRequest) Messages() <-chan *FetchedMessage
func (*FetchRequest) Push ¶
func (fr *FetchRequest) Push(id uint64, message []byte)
func (*FetchRequest) PushError ¶
func (fr *FetchRequest) PushError(err error)
func (*FetchRequest) PushFetchMessage ¶
func (fr *FetchRequest) PushFetchMessage(fm *FetchedMessage)
func (*FetchRequest) Ready ¶
func (fr *FetchRequest) Ready() int
Ready returns the count of messages that will be returned meaning that the fetch is starting. It reads the number from the StartC channel.
type FetchedMessage ¶
FetchedMessage is a struct containing a pair: guble Message and its ID.
type MessagePartition ¶
type MessageStore ¶
type MessageStore interface {
// Store a message within a partition.
// The message id must be equal to MaxMessageId +1.
// So the caller has to maintain the consistence between
// fetching an id and storing the message.
Store(partition string, messageID uint64, data []byte) error
// Generates a new ID for the message if it's new and stores it
// Returns the size of the new message or error
// Takes the message and cluster node ID as parameters.
StoreMessage(*protocol.Message, uint8) (int, error)
// Fetch fetches a set of messages.
// The results, as well as errors are communicated asynchronously using
// the channels, supplied by the FetchRequest.
Fetch(*FetchRequest)
// MaxMessageId returns the highest message id for a particular partition
MaxMessageID(partition string) (uint64, error)
// DoInTx executes the supplied function within the locking context of the message partition.
// This ensures, that wile the code is executed, no change to the supplied maxMessageId can occur.
// The error result if the fnToExecute or an error while locking will be returned by DoInTx.
DoInTx(partition string, fnToExecute func(uint64) error) error
// GenerateNextMsgId generates a new message ID based on a timestamp in a strictly monotonically order
GenerateNextMsgID(partition string, nodeID uint8) (uint64, int64, error)
Partition(string) (MessagePartition, error)
// Partitions returns a slice of `MessagePartition` available in the store
Partitions() ([]MessagePartition, error)
}
MessageStore is an interface for a persistence backend storing topics.
Source Files
¶
- fetch_request.go
- store.go
Directories
¶
| Path | Synopsis |
|---|---|
|
Package filestore is a filesystem-based implementation of the MessageStore interface.
|
Package filestore is a filesystem-based implementation of the MessageStore interface. |
Click to show internal directories.
Click to hide internal directories.