Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumeConcurrentlyContext ¶
// ConsumeConcurrentlyContext is the context value passed to
// MessageListenerConcurrently.ConsumeMessage together with a batch of messages.
type ConsumeConcurrentlyContext struct {
// MessageQueue is the message queue associated with this context.
MessageQueue *message.MessageQueue
// DelayLevelWhenNextConsume is the delay level used for the next
// consumption attempt after a consume failure.
DelayLevelWhenNextConsume int
// AckIndex is an acknowledgement index.
// NOTE(review): presumably the index of the last successfully consumed
// message within the batch — confirm against the consuming code.
AckIndex int
}
func NewConsumeConcurrentlyContext ¶
func NewConsumeConcurrentlyContext(mq *message.MessageQueue) *ConsumeConcurrentlyContext
type MQConsumerInner ¶
// MQConsumerInner is the internal view of a consumer: it exposes the
// consumer's subscriptions and consumption settings, and hooks for offset
// persistence and rebalancing.
type MQConsumerInner interface {
// Subscriptions returns the subscription set; elements are SubscriptionData
// (Set<SubscriptionData>).
Subscriptions() set.Set
// UpdateTopicSubscribeInfo updates the subscribe info for topic; info is a
// set of MessageQueue (Set<MessageQueue>).
UpdateTopicSubscribeInfo(topic string, info set.Set)
// GroupName returns the consumer group name.
GroupName() string
// MessageModel returns the message model.
MessageModel() heartbeat.MessageModel
// ConsumeType returns the consume type.
ConsumeType() heartbeat.ConsumeType
// ConsumeFromWhere returns the position from which consumption starts.
ConsumeFromWhere() heartbeat.ConsumeFromWhere
// IsUnitMode reports whether unit mode is enabled.
IsUnitMode() bool
// IsSubscribeTopicNeedUpdate reports whether the subscribe info for topic
// needs to be updated.
IsSubscribeTopicNeedUpdate(topic string) bool
// PersistConsumerOffset persists the consumer offset.
PersistConsumerOffset()
// DoRebalance performs load rebalancing.
DoRebalance()
}
type MQPullConsumer ¶
// MQPullConsumer is the lifecycle interface of a pull consumer.
type MQPullConsumer interface {
// Start starts the consumer.
Start()
// Shutdown shuts the consumer down.
Shutdown()
}
type MQPushConsumer ¶
// MQPushConsumer is the lifecycle interface of a push consumer.
type MQPushConsumer interface {
// Start starts the consumer.
Start()
// Shutdown shuts the consumer down.
Shutdown()
}
type MessageListenerConcurrently ¶
// MessageListenerConcurrently is the callback interface for concurrent
// message consumption.
type MessageListenerConcurrently interface {
// ConsumeMessage receives a batch of messages and the accompanying
// ConsumeConcurrentlyContext, and returns a ConsumeConcurrentlyStatus
// describing the result of consuming the batch.
ConsumeMessage(msgs []*message.MessageExt, context *ConsumeConcurrentlyContext) listener.ConsumeConcurrentlyStatus
}
type ProcessQueue ¶
// ProcessQueue holds the in-process state for messages of a queue. Messages
// are added with PutMessage and removed with RemoveMessage.
// NOTE(review): field semantics below are inferred from names only; the
// implementation is not visible here — confirm against process_queue.go.
type ProcessQueue struct {
// Dropped — presumably set when this queue is no longer owned by the consumer.
Dropped bool
// LastPullTimestamp — presumably the time of the most recent pull; unit not
// shown here (TODO confirm).
LastPullTimestamp int64
// PullMaxIdleTime — presumably the idle threshold used by IsPullExpired
// (TODO confirm).
PullMaxIdleTime int64
// MsgCount — presumably the number of messages currently held.
MsgCount int64
// MsgTreeMap — presumably the held messages in a TreeMap; key/value types
// are not shown here.
MsgTreeMap *TreeMap
// QueueOffsetMax — presumably the largest queue offset seen so far.
QueueOffsetMax int64
// Consuming — presumably true while messages are being consumed.
Consuming bool
// MsgAccCnt — presumably an accumulated message count (TODO confirm).
MsgAccCnt int64
// contains filtered or unexported fields
}
func NewProcessQueue ¶
func NewProcessQueue() *ProcessQueue
func (*ProcessQueue) GetMaxSpan ¶
func (pq *ProcessQueue) GetMaxSpan() int64
func (*ProcessQueue) IsPullExpired ¶
func (pq *ProcessQueue) IsPullExpired() bool
func (*ProcessQueue) PutMessage ¶
func (pq *ProcessQueue) PutMessage(msgs []*message.MessageExt) bool
func (*ProcessQueue) RemoveMessage ¶
func (pq *ProcessQueue) RemoveMessage(msgs []*message.MessageExt) int64
func (*ProcessQueue) ToString ¶
func (pq *ProcessQueue) ToString() string
type PullRequest ¶
// PullRequest groups the values describing one pull: the consumer group, the
// message queue, the associated ProcessQueue, and an offset.
type PullRequest struct {
// ConsumerGroup is the name of the consumer group.
ConsumerGroup string
// MessageQueue is the queue this request refers to.
MessageQueue *message.MessageQueue
// ProcessQueue is the ProcessQueue associated with MessageQueue.
ProcessQueue *ProcessQueue
// NextOffset — presumably the offset at which the next pull starts
// (TODO confirm).
NextOffset int64
}
type PullResult ¶
// PullResult is the result of a pull: a status, offset information, and the
// messages found.
type PullResult struct {
// PullStatus is the status of the pull (see PullStatus).
PullStatus PullStatus
// NextBeginOffset — presumably the offset from which the next pull should
// begin (TODO confirm).
NextBeginOffset int64
// MinOffset — presumably the minimum offset available in the queue.
MinOffset int64
// MaxOffset — presumably the maximum offset available in the queue.
MaxOffset int64
// MsgFoundList is the list of messages found by the pull.
MsgFoundList []*message.MessageExt
}
type PullStatus ¶
// PullStatus is the status carried by a PullResult.
type PullStatus int

// PullStatus values, numbered from zero in declaration order. The names are
// kept as-is (not MixedCaps) because they are part of the exported API.
const (
	// FOUND indicates that messages were found.
	FOUND PullStatus = iota
	// NO_NEW_MSG indicates that no new message can be pulled.
	NO_NEW_MSG
	// NO_MATCHED_MSG indicates that the filtering results did not match.
	NO_MATCHED_MSG
	// OFFSET_ILLEGAL indicates an illegal offset; it may be too big or too small.
	OFFSET_ILLEGAL
)
func (PullStatus) String ¶
func (status PullStatus) String() string
Source Files
¶
- consume_concurrently_context.go
- message_listener_concurrently.go
- mq_consumer_inner.go
- mq_pull_consumer.go
- mq_push_consumer.go
- process_queue.go
- pull_request.go
- pull_result.go
- pull_status.go
Click to show internal directories.
Click to hide internal directories.