Documentation
¶
Index ¶
- Constants
- Variables
- func NewID() int64
- func UseConsoleLog(logger *LoggerFactory)
- type Bootstrapper
- type Callback
- type CallbackInfo
- type CallbackInterface
- type CallbackRegister
- type CapError
- type CapOptions
- type CapPublishedMessage
- type CapReceivedMessage
- type DefaultDispatcher
- type DefaultLogger
- type EnqueuedState
- func (this *EnqueuedState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
- func (this *EnqueuedState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
- func (this *EnqueuedState) GetExpiresAfter() int32
- func (this *EnqueuedState) GetName() string
- type ErrorHanlder
- type FailedJobProcessor
- type FailedState
- func (this *FailedState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
- func (this *FailedState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
- func (this *FailedState) GetExpiresAfter() int32
- func (this *FailedState) GetName() string
- type FeiniuBusMessage
- type FeiniuBusMessageMetaData
- type IConsumerClient
- type IConsumerClientFactory
- type IConsumerHandler
- type IDispatcher
- type IFetchedMessage
- type ILockedMessage
- type ILockedMessages
- type ILogger
- type IProcessServer
- type IProcessor
- func NewDefaultDispatcher(capOptions *CapOptions, storageConnectionFactory *StorageConnectionFactory, ...) IProcessor
- func NewFailedJobProcessor(capOptions *CapOptions, storageConnectionFactory *StorageConnectionFactory) IProcessor
- func NewPublishQueuer(capOptions *CapOptions, storageConnectionFactory *StorageConnectionFactory) IProcessor
- func NewSubscribeQueuer(options *CapOptions, connectionFactory *StorageConnectionFactory) IProcessor
- type IPublishDelegate
- type IPublisher
- type IQueueExecutor
- type IQueueExecutorFactory
- type IReceivedMessageHandler
- type IState
- type IStateChanger
- type IStorageConnection
- type IStorageTransaction
- type InfiniteRetryProcessor
- type LogDelegate
- type LogLevel
- type LoggerFactory
- type Looper
- type MessageContext
- type MessageDescriptor
- type ProcessResult
- type ProcessingContext
- type ProcessingState
- func (this *ProcessingState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
- func (this *ProcessingState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
- func (this *ProcessingState) GetExpiresAfter() int32
- func (this *ProcessingState) GetName() string
- type ProcessorContainer
- type ProcessorServer
- type PublishQueueExecutorCreateDelegate
- type PublishQueuer
- type PublisherFactory
- type QueueExecutorFactory
- func (this *QueueExecutorFactory) GetInstance(messageType string) IQueueExecutor
- func (this *QueueExecutorFactory) GetPublishQueueExecutorCreateDelegate() PublishQueueExecutorCreateDelegate
- func (this *QueueExecutorFactory) SetPublishQueueExecutorCreateDelegate(delegate PublishQueueExecutorCreateDelegate)
- type QueueExecutorPublish
- type QueueExecutorSubscribe
- type ReceiveHanlder
- type RetryBehavior
- type RetryInThunk
- type ScheduledState
- func (this *ScheduledState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
- func (this *ScheduledState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
- func (this *ScheduledState) GetExpiresAfter() int32
- func (this *ScheduledState) GetName() string
- type StateChanger
- type StorageConnectionFactory
- type SubscribeQueuer
- type SucceededState
- func (this *SucceededState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
- func (this *SucceededState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
- func (this *SucceededState) GetExpiresAfter() int32
- func (this *SucceededState) GetName() string
Constants ¶
const ( PUBLISH = "Publish" SUBSCRIBE = "Subscribe" )
const PublishedTableName = "cap.published"
const QueueTableName = "cap.queue"
const ReceivedTableName = "cap.received"
Variables ¶
var ( DefaultRetryInThunk = func(retries int32) int32 { val := math.Pow(float64(retries-1), float64(4)) + float64(15) + float64(rand.Int31n(30))*float64(retries) if (val - float64(int64(val))) > 0.5 { return int32(val + 1) } else { return int32(val) } } DefaultRetryCount = int32(3) DefaultRetry = NewRetryBehavior(true, DefaultRetryCount, DefaultRetryInThunk) NoRetry = NewRetryBehavior(false, DefaultRetryCount, DefaultRetryInThunk) )
var CapConnectionString string
Functions ¶
Types ¶
type Bootstrapper ¶
type Bootstrapper struct {
Servers []IProcessServer
CapOptions *CapOptions
Register *CallbackRegister
ConnectionFactory *StorageConnectionFactory
QueueExecutorFactory IQueueExecutorFactory
WaitGroup *sync.WaitGroup
// contains filtered or unexported fields
}
Bootstrapper provide CAP booting functions.
func NewBootstrapper ¶
func NewBootstrapper( capOptions *CapOptions, connectionFactory *StorageConnectionFactory, ) *Bootstrapper
NewBootstrapper implements to instantiate an instance of Bootstrapper.
func (*Bootstrapper) Bootstrap ¶
func (bootstrapper *Bootstrapper) Bootstrap()
Bootstrap start CAP servers.
func (*Bootstrapper) Route ¶
func (bootstrapper *Bootstrapper) Route(group, name string, cb CallbackInterface)
Route listeners.
func (*Bootstrapper) WaitForTerminalSignal ¶
func (bootstrapper *Bootstrapper) WaitForTerminalSignal()
WaitForTerminalSignal ...
type CallbackInfo ¶
func NewCallbackInfo ¶
func NewCallbackInfo(callback CallbackInterface) *CallbackInfo
type CallbackInterface ¶
type CallbackInterface interface {
Handle(msg interface{}) error
}
type CallbackRegister ¶
type CallbackRegister struct {
// [groupName][routeName] = CallbackInfo
Routers map[string]map[string]*CallbackInfo
}
func NewCallbackRegister ¶
func NewCallbackRegister() *CallbackRegister
func (*CallbackRegister) Add ¶
func (this *CallbackRegister) Add(group, name string, callback CallbackInterface)
func (*CallbackRegister) Get ¶
func (this *CallbackRegister) Get(group, name string) (CallbackInterface, error)
type CapError ¶
func NewCapError ¶
type CapOptions ¶
func NewCapOptions ¶
func NewCapOptions() *CapOptions
func (*CapOptions) GetConnectionString ¶
func (capOptions *CapOptions) GetConnectionString() (string, error)
type CapPublishedMessage ¶
type CapReceivedMessage ¶
type CapReceivedMessage struct {
Id int
Name string
Group string
Content string
Added int64
ExpiresAt int
Retries int
StatusName string
LastWarnedTime int
MessageId int64
TransactionId int64
}
func NewCapReceivedMessage ¶
func NewCapReceivedMessage(context MessageContext) *CapReceivedMessage
type DefaultDispatcher ¶
type DefaultDispatcher struct {
StorageConnectionFactory *StorageConnectionFactory
CapOptions *CapOptions
QueueExecutorFactory IQueueExecutorFactory
// contains filtered or unexported fields
}
DefaultDispatcher ...
func (*DefaultDispatcher) Process ¶
func (dispatcher *DefaultDispatcher) Process(context *ProcessingContext) (*ProcessResult, error)
Process ...
func (*DefaultDispatcher) ProcessCore ¶
func (dispatcher *DefaultDispatcher) ProcessCore(context *ProcessingContext) (bool, error)
ProcessCore ...
type DefaultLogger ¶
type DefaultLogger struct {
TypeName string
// contains filtered or unexported fields
}
DefaultLogger ...
func (*DefaultLogger) Log ¶
func (logger *DefaultLogger) Log(level LogLevel, message string)
Log ...
func (*DefaultLogger) LogData ¶
func (logger *DefaultLogger) LogData(level LogLevel, message string, data interface{})
LogData ...
type EnqueuedState ¶
type EnqueuedState struct {
IState
}
func (*EnqueuedState) ApplyPublishedMessage ¶
func (this *EnqueuedState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
func (*EnqueuedState) ApplyReceivedMessage ¶
func (this *EnqueuedState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
func (*EnqueuedState) GetExpiresAfter ¶
func (this *EnqueuedState) GetExpiresAfter() int32
func (*EnqueuedState) GetName ¶
func (this *EnqueuedState) GetName() string
type ErrorHanlder ¶
type ErrorHanlder func(str string)
type FailedJobProcessor ¶
type FailedJobProcessor struct {
Options *CapOptions
StateChanger IStateChanger
StorageConnectionFactory *StorageConnectionFactory
// contains filtered or unexported fields
}
FailedJobProcessor ...
func (*FailedJobProcessor) Process ¶
func (processor *FailedJobProcessor) Process(context *ProcessingContext) (*ProcessResult, error)
Process ...
func (*FailedJobProcessor) ProcessPublishedMessage ¶
func (processor *FailedJobProcessor) ProcessPublishedMessage(connection IStorageConnection) error
ProcessPublishedMessage ...
func (*FailedJobProcessor) ProcessReceivedMessage ¶
func (processor *FailedJobProcessor) ProcessReceivedMessage(connection IStorageConnection) error
ProcessReceivedMessage ...
type FailedState ¶
type FailedState struct {
IState
}
func NewFailedState ¶
func NewFailedState() *FailedState
func (*FailedState) ApplyPublishedMessage ¶
func (this *FailedState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
func (*FailedState) ApplyReceivedMessage ¶
func (this *FailedState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
func (*FailedState) GetExpiresAfter ¶
func (this *FailedState) GetExpiresAfter() int32
func (*FailedState) GetName ¶
func (this *FailedState) GetName() string
type FeiniuBusMessage ¶
type FeiniuBusMessage struct {
MetaData FeiniuBusMessageMetaData `json:"meta"`
Content string `json:"content"`
}
FeiniuBusMessage ...
type FeiniuBusMessageMetaData ¶
type FeiniuBusMessageMetaData struct {
TransactionID int64 `json:"transaction_id,string"`
MessageID int64 `json:"message_id,string"`
}
FeiniuBusMessageMetaData ...
type IConsumerClient ¶
type IConsumerClient interface {
Subscribe(topics []string)
Listening(timeoutSecs int, ch chan bool)
Commit(context MessageContext)
Close()
SetOnReceive(onReceive ReceiveHanlder)
SetOnError(onError ErrorHanlder)
}
type IConsumerClientFactory ¶
type IConsumerClientFactory interface {
Create(group string) IConsumerClient
}
type IConsumerHandler ¶
type IConsumerHandler interface {
IProcessServer
}
type IDispatcher ¶
type IDispatcher interface {
IProcessor
}
type IFetchedMessage ¶
type ILockedMessage ¶
type ILockedMessage interface {
Prepare(statement string) (stmt interface{}, err error)
Commit() error
Rollback() error
Dispose()
GetMessage() interface{}
GetMessageType() int32
ChangeState(state IState) error
}
ILockedMessage ...
type ILockedMessages ¶
type ILockedMessages interface {
Prepare(statement string) (stmt interface{}, err error)
Commit() error
Rollback() error
Dispose()
ChangeStates(state IState) error
GetMessages() []ILockedMessage
GetMessageType() int32
AppendMessage(i interface{})
}
ILockedMessages
type ILogger ¶
type ILogger interface {
Log(level LogLevel, message string)
LogData(level LogLevel, message string, data interface{})
}
ILogger ...
type IProcessServer ¶
IProcessServer ...
type IProcessor ¶
type IProcessor interface {
Process(context *ProcessingContext) (*ProcessResult, error)
}
IProcessor bla.
func NewDefaultDispatcher ¶
func NewDefaultDispatcher(capOptions *CapOptions, storageConnectionFactory *StorageConnectionFactory, factory IQueueExecutorFactory, ) IProcessor
NewDefaultDispatcher ...
func NewFailedJobProcessor ¶
func NewFailedJobProcessor(capOptions *CapOptions, storageConnectionFactory *StorageConnectionFactory) IProcessor
NewFailedJobProcessor...
func NewPublishQueuer ¶
func NewPublishQueuer(capOptions *CapOptions, storageConnectionFactory *StorageConnectionFactory) IProcessor
NewPublishQueuer bla.
func NewSubscribeQueuer ¶
func NewSubscribeQueuer(options *CapOptions, connectionFactory *StorageConnectionFactory) IProcessor
NewSubscribeQueuer ...
type IPublishDelegate ¶
type IPublisher ¶
type IPublisher interface {
Publish(descriptors []*MessageDescriptor, connection interface{}, transaction interface{}) error
PublishOne(name string, content interface{}, connection interface{}, transaction interface{}) error
}
IPublisher ...
type IQueueExecutor ¶
type IQueueExecutor interface {
Execute(connection IStorageConnection, feched IFetchedMessage) error
}
type IQueueExecutorFactory ¶
type IQueueExecutorFactory interface {
SetPublishQueueExecutorCreateDelegate(delegate PublishQueueExecutorCreateDelegate)
GetPublishQueueExecutorCreateDelegate() PublishQueueExecutorCreateDelegate
GetInstance(messageType string) IQueueExecutor
}
type IReceivedMessageHandler ¶
type IReceivedMessageHandler interface {
Handle(message interface{})
}
type IState ¶
type IState interface {
GetExpiresAfter() int32
GetName() string
ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
}
func NewEnqueuedState ¶
func NewEnqueuedState() IState
type IStateChanger ¶
type IStateChanger interface {
ChangeReceivedMessageState(message *CapReceivedMessage, state IState, transaction IStorageTransaction) error
ChangePublishedMessage(message *CapPublishedMessage, state IState, transaction IStorageTransaction) error
}
type IStorageConnection ¶
type IStorageConnection interface {
CreateTransaction() (IStorageTransaction, error)
FetchNextMessage() (IFetchedMessage, error)
GetFailedPublishedMessages() ([]*CapPublishedMessage, error)
GetFailedReceivedMessages() ([]*CapReceivedMessage, error)
GetNextPublishedMessageToBeEnqueued() (*CapPublishedMessage, error)
GetNextReceviedMessageToBeEnqueued() (*CapReceivedMessage, error)
GetPublishedMessage(id int) (*CapPublishedMessage, error)
GetReceivedMessage(id int) (*CapReceivedMessage, error)
StoreReceivedMessage(message *CapReceivedMessage) error
GetNextLockedMessageToBeEnqueued(messageType int32) (ILockedMessage, error)
GetFailedLockedMessages(messageType int32) (ILockedMessages, error)
}
type IStorageTransaction ¶
type IStorageTransaction interface {
Commit() error
EnqueuePublishedMessage(message *CapPublishedMessage) error
EnqueueReceivedMessage(message *CapReceivedMessage) error
UpdatePublishedMessage(message *CapPublishedMessage) error
UpdateReceivedMessage(message *CapReceivedMessage) error
Dispose()
}
type InfiniteRetryProcessor ¶
type InfiniteRetryProcessor struct {
InnerProcessor IProcessor
Status string
// contains filtered or unexported fields
}
InfiniteRetryProcessor bla.
func NewInfiniteRetryProcessor ¶
func NewInfiniteRetryProcessor(innerProcessor IProcessor) *InfiniteRetryProcessor
NewInfiniteRetryProcessor bla.
func (InfiniteRetryProcessor) Process ¶
func (processor InfiniteRetryProcessor) Process(context *ProcessingContext)
Process bla.
type LogDelegate ¶
LogDelegate ...
type LoggerFactory ¶
type LoggerFactory struct {
// contains filtered or unexported fields
}
LoggerFactory ...
func (*LoggerFactory) CreateLogger ¶
func (factory *LoggerFactory) CreateLogger(i interface{}) ILogger
CreateLogger ...
func (*LoggerFactory) Register ¶
func (factory *LoggerFactory) Register(delegate *LogDelegate)
Register ...
type MessageContext ¶
type MessageDescriptor ¶
type MessageDescriptor struct {
Name string
Content interface{}
}
MessageDescriptor ...
type ProcessResult ¶
ProcessResult bla.
func ProcessSleeping ¶
func ProcessSleeping(pollingDelay time.Duration) *ProcessResult
ProcessSleeping bla.
type ProcessingContext ¶
type ProcessingContext struct {
IsStopping bool
}
ProcessingContext bla.
func NewProcessingContext ¶
func NewProcessingContext() *ProcessingContext
NewProcessingContext bla.
func (*ProcessingContext) ThrowIfStopping ¶
func (context *ProcessingContext) ThrowIfStopping() error
ThrowIfStopping bla.
type ProcessingState ¶
type ProcessingState struct {
IState
}
func NewProcessingState ¶
func NewProcessingState() *ProcessingState
func (*ProcessingState) ApplyPublishedMessage ¶
func (this *ProcessingState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
func (*ProcessingState) ApplyReceivedMessage ¶
func (this *ProcessingState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
func (*ProcessingState) GetExpiresAfter ¶
func (this *ProcessingState) GetExpiresAfter() int32
func (*ProcessingState) GetName ¶
func (this *ProcessingState) GetName() string
type ProcessorContainer ¶
type ProcessorContainer struct {
Processors []IProcessor
}
ProcessorContainer bla.
func NewProcessorContainer ¶
func NewProcessorContainer() *ProcessorContainer
NewProcessorContainer bla.
func (*ProcessorContainer) GetProcessors ¶
func (container *ProcessorContainer) GetProcessors() []*InfiniteRetryProcessor
GetProcessors bla.
func (*ProcessorContainer) Register ¶
func (container *ProcessorContainer) Register(processor IProcessor) *ProcessorContainer
Register bla.
type ProcessorServer ¶
type ProcessorServer struct {
Container *ProcessorContainer
Context *ProcessingContext
Processors []*InfiniteRetryProcessor
}
ProcessorServer bla.
func (*ProcessorServer) StopTheWorld ¶
func (server *ProcessorServer) StopTheWorld() chan bool
StopTheWorld ...
func (*ProcessorServer) WaitForClose ¶
func (server *ProcessorServer) WaitForClose(wg *sync.WaitGroup)
WaitForClose bla.
type PublishQueueExecutorCreateDelegate ¶
type PublishQueueExecutorCreateDelegate func() IQueueExecutor
type PublishQueuer ¶
type PublishQueuer struct {
StateChanger IStateChanger
Options *CapOptions
StorageConnectionFactory *StorageConnectionFactory
// contains filtered or unexported fields
}
PublishQueuer blablabla .
func (*PublishQueuer) Process ¶
func (processor *PublishQueuer) Process(context *ProcessingContext) (*ProcessResult, error)
Process blablabla.
type PublisherFactory ¶
type PublisherFactory struct {
CreatePublisher func(options *CapOptions) (IPublisher, error)
}
func NewPublisherFactory ¶
func NewPublisherFactory(createPublisher func(options *CapOptions) (IPublisher, error)) *PublisherFactory
type QueueExecutorFactory ¶
type QueueExecutorFactory struct {
IQueueExecutorFactory
PublishQueueExecutorCreateDelegate PublishQueueExecutorCreateDelegate
Register *CallbackRegister
}
func NewQueueExecutorFactory ¶
func NewQueueExecutorFactory(register *CallbackRegister) *QueueExecutorFactory
func (*QueueExecutorFactory) GetInstance ¶
func (this *QueueExecutorFactory) GetInstance(messageType string) IQueueExecutor
func (*QueueExecutorFactory) GetPublishQueueExecutorCreateDelegate ¶
func (this *QueueExecutorFactory) GetPublishQueueExecutorCreateDelegate() PublishQueueExecutorCreateDelegate
func (*QueueExecutorFactory) SetPublishQueueExecutorCreateDelegate ¶
func (this *QueueExecutorFactory) SetPublishQueueExecutorCreateDelegate(delegate PublishQueueExecutorCreateDelegate)
type QueueExecutorPublish ¶
type QueueExecutorPublish struct {
IQueueExecutor
StateChanger IStateChanger
PublishDelegate IPublishDelegate
// contains filtered or unexported fields
}
QueueExecutorPublish ...
func NewQueueExecutorPublish ¶
func NewQueueExecutorPublish(stateChanger IStateChanger, delegate IPublishDelegate) *QueueExecutorPublish
NewQueueExecutorPublish ...
func (*QueueExecutorPublish) Execute ¶
func (executor *QueueExecutorPublish) Execute(connection IStorageConnection, feched IFetchedMessage) error
Execute ...
func (*QueueExecutorPublish) UpdateMessageForRetry ¶
func (executor *QueueExecutorPublish) UpdateMessageForRetry(message *CapPublishedMessage, connection IStorageConnection) (bool, error)
UpdateMessageForRetry ...
type QueueExecutorSubscribe ¶
type QueueExecutorSubscribe struct {
IQueueExecutor
Register *CallbackRegister
// contains filtered or unexported fields
}
QueueExecutorSubscribe ...
func NewQueueExecutorSubscribe ¶
func NewQueueExecutorSubscribe(register *CallbackRegister) *QueueExecutorSubscribe
NewQueueExecutorSubscribe ..
func (*QueueExecutorSubscribe) Execute ¶
func (executor *QueueExecutorSubscribe) Execute(connection IStorageConnection, feched IFetchedMessage) error
Execute ...
type ReceiveHanlder ¶
type ReceiveHanlder func(ctx MessageContext)
type RetryBehavior ¶
type RetryBehavior struct {
RetryInThunk RetryInThunk
Retry bool
RetryCount int32
}
func NewRetryBehavior ¶
func NewRetryBehavior(retry bool, count int32, thunk RetryInThunk) *RetryBehavior
func (*RetryBehavior) RetryIn ¶
func (this *RetryBehavior) RetryIn(reties int32) int32
type RetryInThunk ¶
type ScheduledState ¶
type ScheduledState struct {
IState
}
func NewScheduledState ¶
func NewScheduledState() *ScheduledState
func (*ScheduledState) ApplyPublishedMessage ¶
func (this *ScheduledState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
func (*ScheduledState) ApplyReceivedMessage ¶
func (this *ScheduledState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
func (*ScheduledState) GetExpiresAfter ¶
func (this *ScheduledState) GetExpiresAfter() int32
func (*ScheduledState) GetName ¶
func (this *ScheduledState) GetName() string
type StateChanger ¶
type StateChanger struct {
}
StateChanger ..
func (*StateChanger) ChangePublishedMessage ¶
func (stateChanger *StateChanger) ChangePublishedMessage(message *CapPublishedMessage, state IState, transaction IStorageTransaction) error
ChangePublishedMessage ...
func (*StateChanger) ChangeReceivedMessageState ¶
func (stateChanger *StateChanger) ChangeReceivedMessageState(message *CapReceivedMessage, state IState, transaction IStorageTransaction) error
ChangeReceivedMessageState ...
type StorageConnectionFactory ¶
type StorageConnectionFactory struct {
CreateStorageConnection func(options *CapOptions) (IStorageConnection, error)
}
func NewStorageConnectionFactory ¶
func NewStorageConnectionFactory(delegate func(options *CapOptions) (IStorageConnection, error)) *StorageConnectionFactory
type SubscribeQueuer ¶
type SubscribeQueuer struct {
Options *CapOptions
StorageConnectionFactory *StorageConnectionFactory
// contains filtered or unexported fields
}
SubscribeQueuer ...
func (*SubscribeQueuer) Process ¶
func (processor *SubscribeQueuer) Process(context *ProcessingContext) (*ProcessResult, error)
Process ...
type SucceededState ¶
type SucceededState struct {
IState
}
func NewSucceededState ¶
func NewSucceededState() *SucceededState
func (*SucceededState) ApplyPublishedMessage ¶
func (this *SucceededState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
func (*SucceededState) ApplyReceivedMessage ¶
func (this *SucceededState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
func (*SucceededState) GetExpiresAfter ¶
func (this *SucceededState) GetExpiresAfter() int32
func (*SucceededState) GetName ¶
func (this *SucceededState) GetName() string
Source Files
¶
- IdGenerator.go
- Ilockedmessage.go
- bootstrap.go
- callback.go
- cap_message_descriptor.go
- caperror.go
- capoptions.go
- cappublishedmessage.go
- capreceivedmessage.go
- enqueued_state.go
- failed_state.go
- feiniu_message.go
- iconsumerclient.go
- iconsumerclientfactory.go
- iconsumerhandler.go
- idispatcher.default.go
- idispatcher.go
- ifetchedmessage.go
- ilockedmessages.go
- ilogger.console.go
- ilogger.default.go
- ilogger.delegate.go
- ilogger.factory.go
- ilogger.go
- ilogger.levels.go
- infinite_retry_processor.go
- iprocessor.failed_job_processor.go
- iprocessor.go
- iprocessor.publish_queuer.go
- iprocessor.subscribe_queuer.go
- iprocessserver.go
- ipublish_delegate.go
- ipublisher.go
- iqueue_executor.go
- iqueue_executor_factory.go
- ireceivedmessagehandler.go
- istate.go
- istate_changer.default.go
- istate_changer.go
- istorageconnection.go
- istoragetransaction.go
- looper.go
- messagecontext.go
- options.go
- process_result.go
- processing_context.go
- processing_state.go
- processor_container.go
- processor_server.go
- publisher_factory.go
- queue_executor_factory.go
- queue_executor_publish_base.go
- queue_executor_subscribe.go
- retry_behavior.go
- router.go
- scheduled_state.go
- storage_connection_factory.go
- succeeded_state.go