projector

package
v0.0.0-...-71d9aaf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2019 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UPDATE_STATS_MAP byte = iota + 1
	STATS_LOG_INTERVAL_UPDATE
	VBSEQNOS_LOG_INTERVAL_UPDATE
)
View Source
const MAX_CINFO_CACHES_RETRIES = 100

Variables

This section is empty.

Functions

func Accmulate

func Accmulate(wrkr []interface{}) string

func BucketSeqnosLocal

func BucketSeqnosLocal(cluster, pooln, bucketn, kvaddr string) (l_seqnos []uint64, err error)

BucketSeqnosLocal return list of {{vbno,seqno}..} for vbuckets belonging to the KV node where this projector is running. this call might fail due to,

  • concurrent access that can preserve a deleted/failed bucket object.
  • pollForDeletedBuckets() did not get a chance to cleanup a deleted bucket.

in both the cases if the call is retried it should get fixed, provided a valid bucket exists.

func BucketSeqsTiming

func BucketSeqsTiming(bucket string) *stats.TimingStat

func CollectSeqnos

func CollectSeqnos(kvfeeds map[string]*kvConn) (l_seqnos []uint64, err error)

func FeedConfigParams

func FeedConfigParams() []string

FeedConfigParams return the list of configuration params supported by a feed.

func GetHostAddress

func GetHostAddress() string

func NewFakeBuckets

func NewFakeBuckets(buckets []string) map[string]*FakeBucket

NewFakeBuckets returns a reference to new FakeBucket.

func NewStatsManager

func NewStatsManager(cmdCh chan []interface{}, stopCh chan bool, config common.Config) *statsManager

func ResetBucketSeqnos

func ResetBucketSeqnos() error

Types

type BucketAccess

type BucketAccess interface {
	// Refresh bucket meta information like vbmap
	Refresh() error

	// GetVBmap returns a map of `kvaddr` to list of vbuckets hosted in a kv
	// node.
	GetVBmap(kvaddrs []string) (map[string][]uint16, error)

	// FailoverLog fetch the failover log for specified vbucket
	GetFailoverLogs(
		opaque uint16,
		vbuckets []uint16,
		config map[string]interface{}) (couchbase.FailoverLog, error)

	// Close this bucket.
	Close()
}

BucketAccess interface manage a subset of vbucket streams with mutiple KV nodes. To be implemented by couchbase.Bucket type.

type BucketFeeder

type BucketFeeder interface {
	// GetChannel return a mutation channel.
	GetChannel() (mutch <-chan *mc.DcpEvent)

	// StartVbStreams starts a set of vbucket streams on this feed.
	// returns list of vbuckets for which StreamRequest is successfully
	// posted.
	StartVbStreams(opaque uint16, ts *protobuf.TsVbuuid) error

	// EndVbStreams ends an existing vbucket stream from this feed.
	EndVbStreams(opaque uint16, endTs *protobuf.TsVbuuid) error

	// CloseFeed ends all active streams on this feed and free its resources.
	CloseFeed() (err error)

	// GetStats retrieves the pointer to stats objects from all DCP feeds
	// along with the bucket to which the DCP feeds belong to
	GetStats() map[string]interface{}
}

BucketFeeder interface from a BucketAccess object.

func OpenBucketFeed

func OpenBucketFeed(
	feedname couchbase.DcpFeedName,
	b *couchbase.Bucket,
	opaque uint16,
	kvaddrs []string,
	config map[string]interface{}) (feeder BucketFeeder, err error)

OpenBucketFeed opens feed for bucket.

type BucketStats

type BucketStats struct {
	// contains filtered or unexported fields
}

func (*BucketStats) Init

func (bs *BucketStats) Init()

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is immutable structure defined for each index, or any other entity that wants projection and routing over kv-mutations.

func NewEngine

func NewEngine(uuid uint64, evaluator c.Evaluator, router c.Router) *Engine

NewEngine creates a new engine instance for `uuid`.

func (*Engine) Bucket

func (engine *Engine) Bucket() string

Get name of the bucket

func (*Engine) Endpoints

func (engine *Engine) Endpoints() []string

Endpoints hosting this engine.

func (*Engine) GetEvaluatorStats

func (engine *Engine) GetEvaluatorStats() interface{}

GetEvaluatorStats returns the pointer to the stats object for this engine

func (*Engine) GetIndexName

func (engine *Engine) GetIndexName() string

Get name of the index

func (*Engine) SnapshotData

func (engine *Engine) SnapshotData(
	m *mc.DcpEvent, vbno uint16, vbuuid,
	seqno uint64, opaque2 uint64) interface{}

SnapshotData from this engine.

func (*Engine) StreamBeginData

func (engine *Engine) StreamBeginData(
	vbno uint16, vbuuid, seqno uint64, status byte,
	code byte, opaque2 uint64) interface{}

StreamBeginData from this engine.

func (*Engine) StreamEndData

func (engine *Engine) StreamEndData(
	vbno uint16, vbuuid, seqno uint64, opaque2 uint64) interface{}

StreamEndData from this engine.

func (*Engine) SyncData

func (engine *Engine) SyncData(
	vbno uint16, vbuuid, seqno uint64, opaque2 uint64) interface{}

SyncData from this engine.

func (*Engine) TransformRoute

func (engine *Engine) TransformRoute(
	vbuuid uint64, m *mc.DcpEvent, data map[string]interface{}, encodeBuf []byte,
	docval qvalue.AnnotatedValue, context qexpr.Context, meta map[string]interface{},
	numIndexes int, opaque2 uint64) ([]byte, error)

TransformRoute data to endpoints.

type FakeBucket

type FakeBucket struct {
	C chan *mc.DcpEvent
	// contains filtered or unexported fields
}

FakeBucket fot unit testing.

func (*FakeBucket) Close

func (b *FakeBucket) Close(kvaddr string)

Close is method receiver for BucketAccess interface

func (*FakeBucket) CloseFeed

func (b *FakeBucket) CloseFeed() (err error)

CloseFeed is method receiver for BucketFeeder interface

func (*FakeBucket) EndVbStreams

func (b *FakeBucket) EndVbStreams(
	opaque uint16, ts *protobuf.TsVbuuid) (err error)

EndVbStreams is method receiver for BucketFeeder interface

func (*FakeBucket) GetChannel

func (b *FakeBucket) GetChannel() <-chan *mc.DcpEvent

GetChannel is method receiver for BucketFeeder interface

func (*FakeBucket) GetFailoverLogs

func (b *FakeBucket) GetFailoverLogs(
	opaque uint16,
	vbnos []uint16,
	conf map[string]interface{}) (couchbase.FailoverLog, error)

GetFailoverLogs is method receiver for BucketAccess interface

func (*FakeBucket) GetStats

func (b *FakeBucket) GetStats() map[string]interface{}

GetStats is method receiver for BucketFeeder interface

func (*FakeBucket) GetVBmap

func (b *FakeBucket) GetVBmap(kvaddrs []string) (map[string][]uint16, error)

GetVBmap is method receiver for BucketAccess interface

func (*FakeBucket) OpenKVFeed

func (b *FakeBucket) OpenKVFeed(kvaddr string) (BucketFeeder, error)

OpenKVFeed is method receiver for BucketAccess interface

func (*FakeBucket) SetFailoverLog

func (b *FakeBucket) SetFailoverLog(vbno uint16, flog [][2]uint64)

SetFailoverLog fake initialization method.

func (*FakeBucket) SetVbmap

func (b *FakeBucket) SetVbmap(kvaddr string, vbnos []uint16)

SetVbmap fake initialization method.

func (*FakeBucket) StartVbStreams

func (b *FakeBucket) StartVbStreams(
	opaque uint16, ts *protobuf.TsVbuuid) (err error)

StartVbStreams is method receiver for BucketFeeder interface

type FakeStream

type FakeStream struct {
	// contains filtered or unexported fields
}

FakeStream fot unit testing.

type Feed

type Feed struct {
	// contains filtered or unexported fields
}

Feed is mutation stream - for maintenance, initial-load, catchup etc...

func NewFeed

func NewFeed(
	pooln, topic string,
	projector *Projector,
	config c.Config, opaque uint16,
	async bool) (*Feed, error)

NewFeed creates a new topic feed. `config` contains following keys.

clusterAddr: KV cluster address <host:port>.
feedWaitStreamReqTimeout: wait for a response to StreamRequest
feedWaitStreamEndTimeout: wait for a response to StreamEnd
feedChanSize: channel size for feed's control path and back path
mutationChanSize: channel size of projector's data path routine
syncTimeout: timeout, in ms, for sending periodic Sync messages
routerEndpointFactory: endpoint factory

func (*Feed) AddBuckets

func (feed *Feed) AddBuckets(
	req *protobuf.AddBucketsRequest,
	opaque uint16) (*protobuf.TopicResponse, error)

AddBuckets will remove buckets and all its upstream and downstream elements, except endpoints. Synchronous call.

func (*Feed) AddInstances

func (feed *Feed) AddInstances(
	req *protobuf.AddInstancesRequest,
	opaque uint16) (*protobuf.TimestampResponse, error)

AddInstances will restart specified endpoint-address if it is not active already. Synchronous call.

func (*Feed) DelBuckets

func (feed *Feed) DelBuckets(
	req *protobuf.DelBucketsRequest, opaque uint16) error

DelBuckets will remove buckets and all its upstream and downstream elements, except endpoints. Synchronous call.

func (*Feed) DelInstances

func (feed *Feed) DelInstances(
	req *protobuf.DelInstancesRequest, opaque uint16) error

DelInstances will restart specified endpoint-address if it is not active already. Synchronous call.

func (*Feed) DeleteEndpoint

func (feed *Feed) DeleteEndpoint(raddr string) error

DeleteEndpoint will delete the specified endpoint address from feed.

func (*Feed) GetOpaque

func (feed *Feed) GetOpaque() uint16

GetOpaque return the opaque id that created this feed.

func (*Feed) GetStatistics

func (feed *Feed) GetStatistics() c.Statistics

GetStatistics for this feed. Synchronous call.

func (*Feed) GetStats

func (feed *Feed) GetStats() *FeedStats

Return pointers to the stats objects for this feed. Synchronous call.

func (*Feed) GetTopicResponse

func (feed *Feed) GetTopicResponse() *protobuf.TopicResponse

GetTopicResponse for this feed. Synchronous call.

func (*Feed) MutationTopic

func (feed *Feed) MutationTopic(
	req *protobuf.MutationTopicRequest, opaque uint16) (*protobuf.TopicResponse, error)

MutationTopic will start the feed. Synchronous call.

func (*Feed) Ping

func (feed *Feed) Ping() error

Ping whether the feed is active or not.

func (*Feed) PostFinKVdata

func (feed *Feed) PostFinKVdata(bucket string, kvdataUUID uint64)

PostFinKVdata feedback from data-path. Asynchronous call.

func (*Feed) PostStreamEnd

func (feed *Feed) PostStreamEnd(bucket string, m *mc.DcpEvent, kvdataUUID uint64)

PostStreamEnd feedback from data-path. Asynchronous call.

func (*Feed) PostStreamRequest

func (feed *Feed) PostStreamRequest(bucket string, m *mc.DcpEvent, kvdataUUID uint64)

PostStreamRequest feedback from data-path. Asynchronous call.

func (*Feed) RepairEndpoints

func (feed *Feed) RepairEndpoints(
	req *protobuf.RepairEndpointsRequest, opaque uint16) error

RepairEndpoints will restart specified endpoint-address if it is not active already. Synchronous call.

func (*Feed) ResetConfig

func (feed *Feed) ResetConfig(config c.Config) error

ResetConfig for this feed.

func (*Feed) RestartVbuckets

func (feed *Feed) RestartVbuckets(
	req *protobuf.RestartVbucketsRequest, opaque uint16) (*protobuf.TopicResponse, error)

RestartVbuckets will restart upstream vbuckets for specified buckets. Synchronous call.

func (*Feed) Shutdown

func (feed *Feed) Shutdown(opaque uint16) error

Shutdown feed, its upstream connection with kv and downstream endpoints. Synchronous call.

func (*Feed) ShutdownVbuckets

func (feed *Feed) ShutdownVbuckets(
	req *protobuf.ShutdownVbucketsRequest, opaque uint16) error

ShutdownVbuckets will shutdown streams for specified buckets. Synchronous call.

func (*Feed) StaleCheck

func (feed *Feed) StaleCheck(staleTimeout int) (string, error)

StaleCheck will check for feed sanity and return "exit" if feed has was already stale and still stale. Synchronous call.

type FeedStats

type FeedStats struct {
	// contains filtered or unexported fields
}

func (*FeedStats) Init

func (fs *FeedStats) Init()

type KVData

type KVData struct {
	// contains filtered or unexported fields
}

KVData captures an instance of data-path for single kv-node from upstream connection.

func NewKVData

func NewKVData(
	feed *Feed, bucket string,
	opaque uint16,
	reqTs *protobuf.TsVbuuid,
	engines map[uint64]*Engine,
	endpoints map[string]c.RouterEndpoint,
	mutch <-chan *mc.DcpEvent,
	kvaddr string,
	config c.Config,
	async bool,
	opaque2 uint64) (*KVData, error)

NewKVData create a new data-path instance.

func (*KVData) AddEngines

func (kvdata *KVData) AddEngines(
	opaque uint16,
	engines map[uint64]*Engine,
	endpoints map[string]c.RouterEndpoint) (map[uint16]uint64, error)

AddEngines and endpoints, synchronous call.

func (*KVData) Close

func (kvdata *KVData) Close() error

Close kvdata kv data path, synchronous call.

func (*KVData) DeleteEngines

func (kvdata *KVData) DeleteEngines(opaque uint16, engineKeys []uint64) error

DeleteEngines synchronous call.

func (*KVData) GetKVStats

func (kvdata *KVData) GetKVStats() map[string]interface{}

func (*KVData) GetStatistics

func (kvdata *KVData) GetStatistics() map[string]interface{}

GetStatistics from kv data path, synchronous call.

func (*KVData) GetWorkerStats

func (kvdata *KVData) GetWorkerStats() map[string][]interface{}

func (*KVData) ReloadHeartbeat

func (kvdata *KVData) ReloadHeartbeat() error

ReloadHeartbeat for kvdata.

func (*KVData) ResetConfig

func (kvdata *KVData) ResetConfig(config c.Config) error

ResetConfig for kvdata.

func (*KVData) UpdateTs

func (kvdata *KVData) UpdateTs(opaque uint16, ts *protobuf.TsVbuuid) error

UpdateTs with new set of {vbno,seqno}, synchronous call.

type KvdataStats

type KvdataStats struct {
	// contains filtered or unexported fields
}

func (*KvdataStats) Init

func (kvstats *KvdataStats) Init(numVbuckets int, kvdata *KVData)

func (*KvdataStats) IsClosed

func (kvstats *KvdataStats) IsClosed() bool

func (*KvdataStats) String

func (stats *KvdataStats) String() (string, string)

type Projector

type Projector struct {
	// contains filtered or unexported fields
}

Projector data structure, a projector is connected to one or more upstream kv-nodes. Works in tandem with projector's adminport.

func NewProjector

func NewProjector(maxvbs int, config c.Config, certFile string, keyFile string) *Projector

NewProjector creates a news projector instance and starts a corresponding adminport.

func (*Projector) AddFeed

func (p *Projector) AddFeed(topic string, feed *Feed) (err error)

AddFeed object for `topic`. - return ErrorTopicExist if topic is duplicate.

func (*Projector) DelFeed

func (p *Projector) DelFeed(topic string) (err error)

DelFeed object for `topic`. - return ErrorTopicMissing if topic is not started.

func (*Projector) GetConfig

func (p *Projector) GetConfig() c.Config

GetConfig returns the config object from projector.

func (*Projector) GetFeed

func (p *Projector) GetFeed(topic string) (*Feed, error)

GetFeed object for `topic`. - return ErrorTopicMissing if topic is not started.

func (*Projector) GetFeedConfig

func (p *Projector) GetFeedConfig() c.Config

GetFeedConfig from current configuration settings.

func (*Projector) GetFeeds

func (p *Projector) GetFeeds() []*Feed

GetFeeds return a list of all feeds.

func (*Projector) ResetConfig

func (p *Projector) ResetConfig(config c.Config)

ResetConfig accepts a full-set or subset of global configuration and updates projector related fields.

func (*Projector) UpdateStats

func (p *Projector) UpdateStats(topic string, feed *Feed)

func (*Projector) UpdateStatsMgr

func (p *Projector) UpdateStatsMgr(clone *ProjectorStats)

type ProjectorStats

type ProjectorStats struct {
	// contains filtered or unexported fields
}

func NewProjectorStats

func NewProjectorStats() *ProjectorStats

func (*ProjectorStats) Clone

func (ps *ProjectorStats) Clone() *ProjectorStats

func (*ProjectorStats) Init

func (ps *ProjectorStats) Init()

type ProjectorStatsHolder

type ProjectorStatsHolder struct {
	// contains filtered or unexported fields
}

func (ProjectorStatsHolder) Get

func (*ProjectorStatsHolder) Set

type Subscriber

type Subscriber interface {
	// GetEvaluators will return a map of uuid to Evaluator interface.
	// - return ErrorInconsistentFeed for malformed tables.
	GetEvaluators() (map[uint64]c.Evaluator, error)

	// GetRouters will return a map of uuid to Router interface.
	// - return ErrorInconsistentFeed for malformed tables.
	GetRouters() (map[uint64]c.Router, error)
}

Subscriber interface abstracts engines (aka instances) that can supply `evaluators`, to transform mutations into custom-messages, and `routers`, to supply distribution topology for custom-messages.

type Vbucket

type Vbucket struct {
	// contains filtered or unexported fields
}

Vbucket is immutable structure defined for each vbucket.

func NewVbucket

func NewVbucket(
	cluster, topic, bucket string, opaque, vbno uint16,
	vbuuid, startSeqno uint64, config c.Config, opaque2 uint64) *Vbucket

NewVbucket creates a new routine to handle this vbucket stream.

type VbucketWorker

type VbucketWorker struct {
	// contains filtered or unexported fields
}

VbucketWorker is immutable structure defined for each vbucket.

func NewVbucketWorker

func NewVbucketWorker(
	id int, feed *Feed, bucket string,
	opaque uint16, config c.Config, opaque2 uint64) *VbucketWorker

NewVbucketWorker creates a new routine to handle this vbucket stream.

func (*VbucketWorker) AddEngines

func (worker *VbucketWorker) AddEngines(
	opaque uint16,
	engines map[uint64]*Engine,
	endpoints map[string]c.RouterEndpoint) (map[uint16]uint64, error)

AddEngines update active set of engines and endpoints, synchronous call.

func (*VbucketWorker) Close

func (worker *VbucketWorker) Close() error

Close worker-routine, synchronous call.

func (*VbucketWorker) DeleteEngines

func (worker *VbucketWorker) DeleteEngines(
	opaque uint16, engines []uint64) error

DeleteEngines delete engines and update endpoints synchronous call.

func (*VbucketWorker) Event

func (worker *VbucketWorker) Event(m *mc.DcpEvent) error

Event will post an DcpEvent, asychronous call.

func (*VbucketWorker) GetStatistics

func (worker *VbucketWorker) GetStatistics() (map[string]interface{}, error)

GetStatistics for worker vbucket, synchronous call.

func (*VbucketWorker) GetVbuckets

func (worker *VbucketWorker) GetVbuckets() ([]*Vbucket, error)

GetVbuckets will return the list of active vbuckets managed by this workers.

func (*VbucketWorker) ResetConfig

func (worker *VbucketWorker) ResetConfig(config c.Config) error

ResetConfig for worker-routine, synchronous call.

func (*VbucketWorker) SyncPulse

func (worker *VbucketWorker) SyncPulse() error

SyncPulse will trigger worker to generate a sync pulse for all its vbuckets, asychronous call.

type WorkerStats

type WorkerStats struct {
	// contains filtered or unexported fields
}

func (*WorkerStats) Init

func (stats *WorkerStats) Init()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL