cap

package
v0.0.0-...-2746f0d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2017 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PUBLISH   = "Publish"
	SUBSCRIBE = "Subscribe"
)
View Source
const PublishedTableName = "cap.published"
View Source
const QueueTableName = "cap.queue"
View Source
const ReceivedTableName = "cap.received"

Variables

View Source
var (
	DefaultRetryInThunk = func(retries int32) int32 {
		val := math.Pow(float64(retries-1), float64(4)) + float64(15) + float64(rand.Int31n(30))*float64(retries)
		if (val - float64(int64(val))) > 0.5 {
			return int32(val + 1)
		} else {
			return int32(val)
		}
	}
	DefaultRetryCount = int32(3)
	DefaultRetry      = NewRetryBehavior(true, DefaultRetryCount, DefaultRetryInThunk)
	NoRetry           = NewRetryBehavior(false, DefaultRetryCount, DefaultRetryInThunk)
)
View Source
var CapConnectionString string

Functions

func NewID

func NewID() int64

NewID Generate an int64 unique id.

func UseConsoleLog

func UseConsoleLog(logger *LoggerFactory)

UseConsoleLog ...

Types

type Bootstrapper

type Bootstrapper struct {
	Servers              []IProcessServer
	CapOptions           *CapOptions
	Register             *CallbackRegister
	ConnectionFactory    *StorageConnectionFactory
	QueueExecutorFactory IQueueExecutorFactory
	WaitGroup            *sync.WaitGroup
	// contains filtered or unexported fields
}

Bootstrapper provide CAP booting functions.

func NewBootstrapper

func NewBootstrapper(
	capOptions *CapOptions,
	connectionFactory *StorageConnectionFactory,
) *Bootstrapper

NewBootstrapper implements to instantiate an instance of Bootstrapper.

func (*Bootstrapper) Bootstrap

func (bootstrapper *Bootstrapper) Bootstrap()

Bootstrap start CAP servers.

func (*Bootstrapper) Close

func (bootstrapper *Bootstrapper) Close()

Close all started servers.

func (*Bootstrapper) Route

func (bootstrapper *Bootstrapper) Route(group, name string, cb CallbackInterface)

Route listeners.

func (*Bootstrapper) WaitForTerminalSignal

func (bootstrapper *Bootstrapper) WaitForTerminalSignal()

WaitForTerminalSignal ...

type Callback

type Callback struct {
}

Callback provide Sample Handler.

type CallbackInfo

type CallbackInfo struct {
	CallbackType reflect.Type
}

func NewCallbackInfo

func NewCallbackInfo(callback CallbackInterface) *CallbackInfo

type CallbackInterface

type CallbackInterface interface {
	Handle(msg interface{}) error
}

type CallbackRegister

type CallbackRegister struct {
	// [groupName][routeName] = CallbackInfo
	Routers map[string]map[string]*CallbackInfo
}

func NewCallbackRegister

func NewCallbackRegister() *CallbackRegister

func (*CallbackRegister) Add

func (this *CallbackRegister) Add(group, name string, callback CallbackInterface)

func (*CallbackRegister) Get

func (this *CallbackRegister) Get(group, name string) (CallbackInterface, error)

type CapError

type CapError struct {
	When time.Time
	What string
}

func NewCapError

func NewCapError(message string) CapError

func (CapError) Error

func (err CapError) Error() string

type CapOptions

type CapOptions struct {
	ConnectionString string
	PoolingDelay     time.Duration
}

func NewCapOptions

func NewCapOptions() *CapOptions

func (*CapOptions) GetConnectionString

func (capOptions *CapOptions) GetConnectionString() (string, error)

type CapPublishedMessage

type CapPublishedMessage struct {
	Id             int
	Name           string
	Content        string
	Added          int
	ExpiresAt      int
	Retries        int
	StatusName     string
	LastWarnedTime int
	MessageId      int64
	TransactionId  int64
}

type CapReceivedMessage

type CapReceivedMessage struct {
	Id             int
	Name           string
	Group          string
	Content        string
	Added          int64
	ExpiresAt      int
	Retries        int
	StatusName     string
	LastWarnedTime int
	MessageId      int64
	TransactionId  int64
}

func NewCapReceivedMessage

func NewCapReceivedMessage(context MessageContext) *CapReceivedMessage

type DefaultDispatcher

type DefaultDispatcher struct {
	StorageConnectionFactory *StorageConnectionFactory
	CapOptions               *CapOptions
	QueueExecutorFactory     IQueueExecutorFactory
	// contains filtered or unexported fields
}

DefaultDispatcher ...

func (*DefaultDispatcher) Process

func (dispatcher *DefaultDispatcher) Process(context *ProcessingContext) (*ProcessResult, error)

Process ...

func (*DefaultDispatcher) ProcessCore

func (dispatcher *DefaultDispatcher) ProcessCore(context *ProcessingContext) (bool, error)

ProcessCore ...

type DefaultLogger

type DefaultLogger struct {
	TypeName string
	// contains filtered or unexported fields
}

DefaultLogger ...

func (*DefaultLogger) Log

func (logger *DefaultLogger) Log(level LogLevel, message string)

Log ...

func (*DefaultLogger) LogData

func (logger *DefaultLogger) LogData(level LogLevel, message string, data interface{})

LogData ...

type EnqueuedState

type EnqueuedState struct {
	IState
}

func (*EnqueuedState) ApplyPublishedMessage

func (this *EnqueuedState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error

func (*EnqueuedState) ApplyReceivedMessage

func (this *EnqueuedState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error

func (*EnqueuedState) GetExpiresAfter

func (this *EnqueuedState) GetExpiresAfter() int32

func (*EnqueuedState) GetName

func (this *EnqueuedState) GetName() string

type ErrorHanlder

type ErrorHanlder func(str string)

type FailedJobProcessor

type FailedJobProcessor struct {
	Options                  *CapOptions
	StateChanger             IStateChanger
	StorageConnectionFactory *StorageConnectionFactory
	// contains filtered or unexported fields
}

FailedJobProcessor ...

func (*FailedJobProcessor) Process

func (processor *FailedJobProcessor) Process(context *ProcessingContext) (*ProcessResult, error)

Process ...

func (*FailedJobProcessor) ProcessPublishedMessage

func (processor *FailedJobProcessor) ProcessPublishedMessage(connection IStorageConnection) error

ProcessPublishedMessage ...

func (*FailedJobProcessor) ProcessReceivedMessage

func (processor *FailedJobProcessor) ProcessReceivedMessage(connection IStorageConnection) error

ProcessReceivedMessage ...

type FailedState

type FailedState struct {
	IState
}

func NewFailedState

func NewFailedState() *FailedState

func (*FailedState) ApplyPublishedMessage

func (this *FailedState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error

func (*FailedState) ApplyReceivedMessage

func (this *FailedState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error

func (*FailedState) GetExpiresAfter

func (this *FailedState) GetExpiresAfter() int32

func (*FailedState) GetName

func (this *FailedState) GetName() string

type FeiniuBusMessage

type FeiniuBusMessage struct {
	MetaData FeiniuBusMessageMetaData `json:"meta"`
	Content  string                   `json:"content"`
}

FeiniuBusMessage ...

type FeiniuBusMessageMetaData

type FeiniuBusMessageMetaData struct {
	TransactionID int64 `json:"transaction_id,string"`
	MessageID     int64 `json:"message_id,string"`
}

FeiniuBusMessageMetaData ...

type IConsumerClient

type IConsumerClient interface {
	Subscribe(topics []string)
	Listening(timeoutSecs int, ch chan bool)
	Commit(context MessageContext)
	Close()
	SetOnReceive(onReceive ReceiveHanlder)
	SetOnError(onError ErrorHanlder)
}

type IConsumerClientFactory

type IConsumerClientFactory interface {
	Create(group string) IConsumerClient
}

type IConsumerHandler

type IConsumerHandler interface {
	IProcessServer
}

type IDispatcher

type IDispatcher interface {
	IProcessor
}

type IFetchedMessage

type IFetchedMessage interface {
	GetMessageId() (messageId int)

	GetMessageType() (messageType int)

	RemoveFromQueue() error
	Requeue() error

	Dispose() error
}

type ILockedMessage

type ILockedMessage interface {
	Prepare(statement string) (stmt interface{}, err error)
	Commit() error
	Rollback() error
	Dispose()
	GetMessage() interface{}
	GetMessageType() int32
	ChangeState(state IState) error
}

ILockedMessage ...

type ILockedMessages

type ILockedMessages interface {
	Prepare(statement string) (stmt interface{}, err error)
	Commit() error
	Rollback() error
	Dispose()
	ChangeStates(state IState) error
	GetMessages() []ILockedMessage
	GetMessageType() int32
	AppendMessage(i interface{})
}

ILockedMessages

type ILogger

type ILogger interface {
	Log(level LogLevel, message string)
	LogData(level LogLevel, message string, data interface{})
}

ILogger ...

type IProcessServer

type IProcessServer interface {
	Start()
	WaitForClose(wg *sync.WaitGroup)
}

IProcessServer ...

func NewProcessorServer

func NewProcessorServer() IProcessServer

NewProcessorServer bla.

type IProcessor

type IProcessor interface {
	Process(context *ProcessingContext) (*ProcessResult, error)
}

IProcessor bla.

func NewDefaultDispatcher

func NewDefaultDispatcher(capOptions *CapOptions,
	storageConnectionFactory *StorageConnectionFactory,
	factory IQueueExecutorFactory,
) IProcessor

NewDefaultDispatcher ...

func NewFailedJobProcessor

func NewFailedJobProcessor(capOptions *CapOptions, storageConnectionFactory *StorageConnectionFactory) IProcessor

NewFailedJobProcessor...

func NewPublishQueuer

func NewPublishQueuer(capOptions *CapOptions, storageConnectionFactory *StorageConnectionFactory) IProcessor

NewPublishQueuer bla.

func NewSubscribeQueuer

func NewSubscribeQueuer(options *CapOptions, connectionFactory *StorageConnectionFactory) IProcessor

NewSubscribeQueuer ...

type IPublishDelegate

type IPublishDelegate interface {
	Publish(keyName, content string) error
}

type IPublisher

type IPublisher interface {
	Publish(descriptors []*MessageDescriptor, connection interface{}, transaction interface{}) error

	PublishOne(name string, content interface{}, connection interface{}, transaction interface{}) error
}

IPublisher ...

type IQueueExecutor

type IQueueExecutor interface {
	Execute(connection IStorageConnection, feched IFetchedMessage) error
}

type IQueueExecutorFactory

type IQueueExecutorFactory interface {
	SetPublishQueueExecutorCreateDelegate(delegate PublishQueueExecutorCreateDelegate)
	GetPublishQueueExecutorCreateDelegate() PublishQueueExecutorCreateDelegate
	GetInstance(messageType string) IQueueExecutor
}

type IReceivedMessageHandler

type IReceivedMessageHandler interface {
	Handle(message interface{})
}

type IState

type IState interface {
	GetExpiresAfter() int32
	GetName() string
	ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error
	ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error
}

func NewEnqueuedState

func NewEnqueuedState() IState

type IStateChanger

type IStateChanger interface {
	ChangeReceivedMessageState(message *CapReceivedMessage, state IState, transaction IStorageTransaction) error
	ChangePublishedMessage(message *CapPublishedMessage, state IState, transaction IStorageTransaction) error
}

func NewStateChanger

func NewStateChanger() IStateChanger

NewStateChanger ..

type IStorageConnection

type IStorageConnection interface {
	CreateTransaction() (IStorageTransaction, error)

	FetchNextMessage() (IFetchedMessage, error)

	GetFailedPublishedMessages() ([]*CapPublishedMessage, error)

	GetFailedReceivedMessages() ([]*CapReceivedMessage, error)

	GetNextPublishedMessageToBeEnqueued() (*CapPublishedMessage, error)

	GetNextReceviedMessageToBeEnqueued() (*CapReceivedMessage, error)

	GetPublishedMessage(id int) (*CapPublishedMessage, error)

	GetReceivedMessage(id int) (*CapReceivedMessage, error)

	StoreReceivedMessage(message *CapReceivedMessage) error

	GetNextLockedMessageToBeEnqueued(messageType int32) (ILockedMessage, error)

	GetFailedLockedMessages(messageType int32) (ILockedMessages, error)
}

type IStorageTransaction

type IStorageTransaction interface {
	Commit() error

	EnqueuePublishedMessage(message *CapPublishedMessage) error

	EnqueueReceivedMessage(message *CapReceivedMessage) error

	UpdatePublishedMessage(message *CapPublishedMessage) error

	UpdateReceivedMessage(message *CapReceivedMessage) error

	Dispose()
}

type InfiniteRetryProcessor

type InfiniteRetryProcessor struct {
	InnerProcessor IProcessor
	Status         string
	// contains filtered or unexported fields
}

InfiniteRetryProcessor bla.

func NewInfiniteRetryProcessor

func NewInfiniteRetryProcessor(innerProcessor IProcessor) *InfiniteRetryProcessor

NewInfiniteRetryProcessor bla.

func (InfiniteRetryProcessor) Process

func (processor InfiniteRetryProcessor) Process(context *ProcessingContext)

Process bla.

type LogDelegate

type LogDelegate struct {
	// Log ...
	Log func(level LogLevel, message string, data interface{})
}

LogDelegate ...

type LogLevel

type LogLevel int
const (
	LevelTrace LogLevel = iota
	LevelDebug
	LevelInfomation
	LevelWarn
	LevelError
)

func (LogLevel) GetName

func (level LogLevel) GetName() string

GetName ...

type LoggerFactory

type LoggerFactory struct {
	// contains filtered or unexported fields
}

LoggerFactory ...

func GetLoggerFactory

func GetLoggerFactory() *LoggerFactory

GetLoggerFactory ...

func (*LoggerFactory) CreateLogger

func (factory *LoggerFactory) CreateLogger(i interface{}) ILogger

CreateLogger ...

func (*LoggerFactory) Register

func (factory *LoggerFactory) Register(delegate *LogDelegate)

Register ...

type Looper

type Looper struct {
}

func NewLooper

func NewLooper() *Looper

func (*Looper) While

func (this *Looper) While(predicate func() bool, body func() error) error

type MessageContext

type MessageContext struct {
	Group   string
	Name    string
	Content string
	Tag     uint64
}

type MessageDescriptor

type MessageDescriptor struct {
	Name    string
	Content interface{}
}

MessageDescriptor ...

type ProcessResult

type ProcessResult struct {
	Status       string
	PollingDelay time.Duration
}

ProcessResult bla.

func ProcessSleeping

func ProcessSleeping(pollingDelay time.Duration) *ProcessResult

ProcessSleeping bla.

func ProcessStoped

func ProcessStoped() *ProcessResult

ProcessStoped bla.

type ProcessingContext

type ProcessingContext struct {
	IsStopping bool
}

ProcessingContext bla.

func NewProcessingContext

func NewProcessingContext() *ProcessingContext

NewProcessingContext bla.

func (*ProcessingContext) Start

func (context *ProcessingContext) Start()

Start bla.

func (*ProcessingContext) Stop

func (context *ProcessingContext) Stop()

Stop bla.

func (*ProcessingContext) ThrowIfStopping

func (context *ProcessingContext) ThrowIfStopping() error

ThrowIfStopping bla.

func (*ProcessingContext) Wait

func (context *ProcessingContext) Wait(d time.Duration)

Wait bla.

type ProcessingState

type ProcessingState struct {
	IState
}

func NewProcessingState

func NewProcessingState() *ProcessingState

func (*ProcessingState) ApplyPublishedMessage

func (this *ProcessingState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error

func (*ProcessingState) ApplyReceivedMessage

func (this *ProcessingState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error

func (*ProcessingState) GetExpiresAfter

func (this *ProcessingState) GetExpiresAfter() int32

func (*ProcessingState) GetName

func (this *ProcessingState) GetName() string

type ProcessorContainer

type ProcessorContainer struct {
	Processors []IProcessor
}

ProcessorContainer bla.

func NewProcessorContainer

func NewProcessorContainer() *ProcessorContainer

NewProcessorContainer bla.

func (*ProcessorContainer) GetProcessors

func (container *ProcessorContainer) GetProcessors() []*InfiniteRetryProcessor

GetProcessors bla.

func (*ProcessorContainer) Register

func (container *ProcessorContainer) Register(processor IProcessor) *ProcessorContainer

Register bla.

type ProcessorServer

type ProcessorServer struct {
	Container  *ProcessorContainer
	Context    *ProcessingContext
	Processors []*InfiniteRetryProcessor
}

ProcessorServer bla.

func (*ProcessorServer) Start

func (server *ProcessorServer) Start()

Start bla.

func (*ProcessorServer) StopTheWorld

func (server *ProcessorServer) StopTheWorld() chan bool

StopTheWorld ...

func (*ProcessorServer) WaitForClose

func (server *ProcessorServer) WaitForClose(wg *sync.WaitGroup)

WaitForClose bla.

type PublishQueueExecutorCreateDelegate

type PublishQueueExecutorCreateDelegate func() IQueueExecutor

type PublishQueuer

type PublishQueuer struct {
	StateChanger             IStateChanger
	Options                  *CapOptions
	StorageConnectionFactory *StorageConnectionFactory
	// contains filtered or unexported fields
}

PublishQueuer blablabla .

func (*PublishQueuer) Process

func (processor *PublishQueuer) Process(context *ProcessingContext) (*ProcessResult, error)

Process blablabla.

type PublisherFactory

type PublisherFactory struct {
	CreatePublisher func(options *CapOptions) (IPublisher, error)
}

func NewPublisherFactory

func NewPublisherFactory(createPublisher func(options *CapOptions) (IPublisher, error)) *PublisherFactory

type QueueExecutorFactory

type QueueExecutorFactory struct {
	IQueueExecutorFactory
	PublishQueueExecutorCreateDelegate PublishQueueExecutorCreateDelegate
	Register                           *CallbackRegister
}

func NewQueueExecutorFactory

func NewQueueExecutorFactory(register *CallbackRegister) *QueueExecutorFactory

func (*QueueExecutorFactory) GetInstance

func (this *QueueExecutorFactory) GetInstance(messageType string) IQueueExecutor

func (*QueueExecutorFactory) GetPublishQueueExecutorCreateDelegate

func (this *QueueExecutorFactory) GetPublishQueueExecutorCreateDelegate() PublishQueueExecutorCreateDelegate

func (*QueueExecutorFactory) SetPublishQueueExecutorCreateDelegate

func (this *QueueExecutorFactory) SetPublishQueueExecutorCreateDelegate(delegate PublishQueueExecutorCreateDelegate)

type QueueExecutorPublish

type QueueExecutorPublish struct {
	IQueueExecutor
	StateChanger    IStateChanger
	PublishDelegate IPublishDelegate
	// contains filtered or unexported fields
}

QueueExecutorPublish ...

func NewQueueExecutorPublish

func NewQueueExecutorPublish(stateChanger IStateChanger, delegate IPublishDelegate) *QueueExecutorPublish

NewQueueExecutorPublish ...

func (*QueueExecutorPublish) Execute

func (executor *QueueExecutorPublish) Execute(connection IStorageConnection, feched IFetchedMessage) error

Execute ...

func (*QueueExecutorPublish) UpdateMessageForRetry

func (executor *QueueExecutorPublish) UpdateMessageForRetry(message *CapPublishedMessage, connection IStorageConnection) (bool, error)

UpdateMessageForRetry ...

type QueueExecutorSubscribe

type QueueExecutorSubscribe struct {
	IQueueExecutor
	Register *CallbackRegister
	// contains filtered or unexported fields
}

QueueExecutorSubscribe ...

func NewQueueExecutorSubscribe

func NewQueueExecutorSubscribe(register *CallbackRegister) *QueueExecutorSubscribe

NewQueueExecutorSubscribe ..

func (*QueueExecutorSubscribe) Execute

func (executor *QueueExecutorSubscribe) Execute(connection IStorageConnection, feched IFetchedMessage) error

Execute ...

type ReceiveHanlder

type ReceiveHanlder func(ctx MessageContext)

type RetryBehavior

type RetryBehavior struct {
	RetryInThunk RetryInThunk
	Retry        bool
	RetryCount   int32
}

func NewRetryBehavior

func NewRetryBehavior(retry bool, count int32, thunk RetryInThunk) *RetryBehavior

func (*RetryBehavior) RetryIn

func (this *RetryBehavior) RetryIn(reties int32) int32

type RetryInThunk

type RetryInThunk func(retries int32) int32

type ScheduledState

type ScheduledState struct {
	IState
}

func NewScheduledState

func NewScheduledState() *ScheduledState

func (*ScheduledState) ApplyPublishedMessage

func (this *ScheduledState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error

func (*ScheduledState) ApplyReceivedMessage

func (this *ScheduledState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error

func (*ScheduledState) GetExpiresAfter

func (this *ScheduledState) GetExpiresAfter() int32

func (*ScheduledState) GetName

func (this *ScheduledState) GetName() string

type StateChanger

type StateChanger struct {
}

StateChanger ..

func (*StateChanger) ChangePublishedMessage

func (stateChanger *StateChanger) ChangePublishedMessage(message *CapPublishedMessage, state IState, transaction IStorageTransaction) error

ChangePublishedMessage ...

func (*StateChanger) ChangeReceivedMessageState

func (stateChanger *StateChanger) ChangeReceivedMessageState(message *CapReceivedMessage, state IState, transaction IStorageTransaction) error

ChangeReceivedMessageState ...

type StorageConnectionFactory

type StorageConnectionFactory struct {
	CreateStorageConnection func(options *CapOptions) (IStorageConnection, error)
}

func NewStorageConnectionFactory

func NewStorageConnectionFactory(delegate func(options *CapOptions) (IStorageConnection, error)) *StorageConnectionFactory

type SubscribeQueuer

type SubscribeQueuer struct {
	Options                  *CapOptions
	StorageConnectionFactory *StorageConnectionFactory
	// contains filtered or unexported fields
}

SubscribeQueuer ...

func (*SubscribeQueuer) Process

func (processor *SubscribeQueuer) Process(context *ProcessingContext) (*ProcessResult, error)

Process ...

type SucceededState

type SucceededState struct {
	IState
}

func NewSucceededState

func NewSucceededState() *SucceededState

func (*SucceededState) ApplyPublishedMessage

func (this *SucceededState) ApplyPublishedMessage(message *CapPublishedMessage, transaction IStorageTransaction) error

func (*SucceededState) ApplyReceivedMessage

func (this *SucceededState) ApplyReceivedMessage(message *CapReceivedMessage, transaction IStorageTransaction) error

func (*SucceededState) GetExpiresAfter

func (this *SucceededState) GetExpiresAfter() int32

func (*SucceededState) GetName

func (this *SucceededState) GetName() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL