This repository contains a Go implementation of a Couchbase Database Change Protocol (DCP) client.
- Our main goal is to build a DCP client for faster and stateful systems. We are already using this repository in other implementations.
```go
package main

import (
	"github.com/Trendyol/go-dcp"
	"github.com/Trendyol/go-dcp/logger"
	"github.com/Trendyol/go-dcp/models"
)

// listener is called for every DCP event and logs it by event type.
func listener(ctx *models.ListenerContext) {
	switch event := ctx.Event.(type) {
	case models.DcpMutation:
		logger.Log.Info(
			"mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v",
			event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(),
		)
	case models.DcpDeletion:
		logger.Log.Info(
			"deleted(vb=%v,eventTime=%v) | id: %v",
			event.VbID, event.EventTime, string(event.Key),
		)
	case models.DcpExpiration:
		logger.Log.Info(
			"expired(vb=%v,eventTime=%v) | id: %v",
			event.VbID, event.EventTime, string(event.Key),
		)
	}

	// Acknowledge the event once it has been handled.
	ctx.Ack()
}

func main() {
	// Build the client from config.yml and register the listener.
	connector, err := dcp.NewDcp("config.yml", listener)
	if err != nil {
		panic(err)
	}

	defer connector.Close()

	connector.Start()
}
```

$ go get github.com/Trendyol/go-dcp
Variable | Type | Required | Default | Description |
---|---|---|---|---|
hosts | []string | yes | - | Couchbase host, for example localhost:8091. |
username | string | yes | - | Couchbase username. |
password | string | yes | - | Couchbase password. |
bucketName | string | yes | - | Couchbase DCP bucket. |
dcp.group.name | string | yes | | DCP group name for vBuckets. |
scopeName | string | no | _default | Couchbase scope name. |
collectionNames | []string | no | _default | Couchbase collection names. |
connectionBufferSize | uint, string | no | 20mb | Source bucket TCP connection buffer size (x Node Count). Check this if you get OOM Killed. |
maxQueueSize | int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. Check this if you get queue overflowed or queue full. |
connectionTimeout | time.Duration | no | 1m | Couchbase connection timeout. |
secureConnection | bool | no | false | Enable TLS for the Couchbase connection. |
rootCAPath | string | no | *not set | Required if secureConnection is set to true. |
debug | bool | no | false | For debugging purposes. |
dcp.bufferSize | int | no | 16mb | DCP internal queue buffer size (x Node Count). Check this if you get OOM Killed. |
dcp.connectionBufferSize | uint, string | no | 20mb | DCP TCP connection buffer size (x Node Count). Check this if you get OOM Killed. |
dcp.connectionTimeout | time.Duration | no | 1m | DCP connection timeout. |
dcp.maxQueueSize | int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. Check this if you get queue overflowed or queue full. |
dcp.listener.skipUntil | time.Time | no | | Set this if you want to skip events until a certain time. |
dcp.group.membership.type | string | no | | DCP membership type: couchbase, kubernetesHa, kubernetesStatefulSet, static or dynamic. Check examples for details. |
dcp.group.membership.memberNumber | int | no | 1 | Set this if membership is static. Other methods will ignore this field. |
dcp.group.membership.totalMembers | int | no | 1 | Set this if membership is static or kubernetesStatefulSet. Other methods will ignore this field. |
dcp.group.membership.rebalanceDelay | time.Duration | no | 30s | Works for autonomous mode. If membership is dynamic, it is ignored and set to 0s. |
dcp.group.membership.config | map[string]string | no | *not set | Key-value config. expirySeconds, heartbeatInterval, heartbeatToleranceDuration, monitorInterval, timeout for the couchbase type. |
dcp.config.disableChangeStreams | bool | no | false | Set this to true if you do not want to receive older versions of changes on Couchbase Server 7.2.0+ with Magma storage buckets. |
leaderElection.enabled | bool | no | false | Set this to true for kubernetesHa membership. |
leaderElection.type | string | no | kubernetes | Leader election type: kubernetes. |
leaderElection.config | map[string]string | no | *not set | Key-value config. leaseLockName, leaseLockNamespace, leaseDuration, renewDeadline, retryPeriod for the kubernetes type. |
leaderElection.rpc.port | int | no | 8081 | This field is usable for kubernetesStatefulSet membership. |
checkpoint.type | string | no | auto | Set checkpoint type: auto or manual. |
checkpoint.autoReset | string | no | earliest | Set checkpoint start point to earliest or latest. |
checkpoint.interval | time.Duration | no | 1m | Checkpoint checking interval. |
checkpoint.timeout | time.Duration | no | 1m | Checkpoint checking timeout. |
healthCheck.disabled | bool | no | false | Disable Couchbase connection health check. |
healthCheck.interval | time.Duration | no | 1m | Couchbase connection health checking interval. |
healthCheck.timeout | time.Duration | no | 1m | Couchbase connection health checking timeout. |
rollbackMitigation.disabled | bool | no | false | Disable reprocessing for rolled-back vBucket offsets. |
rollbackMitigation.interval | time.Duration | no | 1s | Persisted sequence numbers polling interval. |
rollbackMitigation.configWatchInterval | time.Duration | no | 10s | Cluster config changes listener interval. |
metadata.type | string | no | couchbase | Metadata storage type: file or couchbase. |
metadata.readOnly | bool | no | false | Set this for debugging state purposes. |
metadata.config | map[string]string | no | *not set | Key-value config. hosts, username, password, bucket, scope, collection, maxQueueSize, connectionBufferSize (default 5mb, x Node Count), connectionTimeout, secureConnection, rootCAPath for the couchbase type. |
api.disabled | bool | no | false | Disable metric endpoints. |
api.port | int | no | 8080 | Set API port. |
metric.path | string | no | /metrics | Set metric endpoint path. |
logging.level | string | no | info | Set logging level. |

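
Several buffer settings above are marked "(x Node Count)" and point to OOM kills as the symptom to watch for. The sketch below shows that arithmetic for the default values. `parseSizeMB` and `estimateBufferMB` are hypothetical helpers written for this illustration and are not part of go-dcp; the result is only a rough sizing estimate, not a measurement of real memory use.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSizeMB is a hypothetical helper, not part of go-dcp. It converts a
// size written like "20mb" into a whole number of megabytes.
func parseSizeMB(s string) (int, error) {
	trimmed := strings.TrimSuffix(strings.ToLower(strings.TrimSpace(s)), "mb")
	return strconv.Atoi(trimmed)
}

// estimateBufferMB sums the given buffer sizes and multiplies the sum by the
// number of Couchbase nodes, following the "(x Node Count)" notes in the table.
func estimateBufferMB(nodeCount int, sizes ...string) (int, error) {
	total := 0
	for _, s := range sizes {
		mb, err := parseSizeMB(s)
		if err != nil {
			return 0, fmt.Errorf("invalid size %q: %w", s, err)
		}
		total += mb
	}
	return total * nodeCount, nil
}

func main() {
	// Defaults of connectionBufferSize, dcp.bufferSize and dcp.connectionBufferSize.
	mb, err := estimateBufferMB(3, "20mb", "16mb", "20mb")
	if err != nil {
		panic(err)
	}
	fmt.Printf("estimated buffer budget on a 3-node cluster: %dmb\n", mb)
}
```

With the defaults and three nodes this gives (20 + 16 + 20) x 3 = 168mb, which can serve as a starting point when choosing a container memory limit.
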
These environment variables will overwrite the corresponding configs.
Variable | Type | Corresponding Config | Description |
---|---|---|---|
GO_DCP__DCP_GROUP_MEMBERSHIP_MEMBERNUMBER | int | dcp.group.membership.memberNumber | Lets you scale up or down without making a new deployment. |
GO_DCP__DCP_GROUP_MEMBERSHIP_TOTALMEMBERS | int | dcp.group.membership.totalMembers | Lets you scale up or down without making a new deployment. |
The client offers an API that handles different endpoints and exposes several metrics.
Endpoint | Description | Debug Mode | Body |
---|---|---|---|
GET /status | Returns a 200 OK status if the client is able to ping the Couchbase server successfully. | | |
GET /rebalance | Triggers a rebalance operation for the vBuckets. | | |
GET /states/offset | Returns the current offsets for each vBucket. | x | |
GET /states/followers | Returns the list of follower clients if service discovery is enabled. | x | |
GET /debug/pprof/* | Fiber Pprof | x | |
PUT /membership/info | Updates membership info and applies rebalance. | | {"memberNumber": 1, "totalMembers": 3} |

The client collects relevant metrics and exposes them at the path set by metric.path. If you haven't configured metric.path, the metrics are exposed at /metrics.
Metric Name | Description | Labels | Value Type |
---|---|---|---|
cbgo_mutation_total | The total number of mutations on a specific vBucket | vbId: ID of the vBucket | Counter |
cbgo_deletion_total | The total number of deletions on a specific vBucket | vbId: ID of the vBucket | Counter |
cbgo_expiration_total | The total number of expirations on a specific vBucket | vbId: ID of the vBucket | Counter |
cbgo_agent_queue_current | The current agent queue size | address: Couchbase, is dcp: Is Dcp Agent | Gauge |
cbgo_agent_queue_max | The maximum agent queue size | address: Couchbase, is dcp: Is Dcp Agent | Gauge |
cbgo_seq_no_current | The current sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
cbgo_start_seq_no_current | The starting sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
cbgo_end_seq_no_current | The ending sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
cbgo_persist_seq_no_current | The persist sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
cbgo_lag_current | The current lag on a specific vBucket | vbId: ID of the vBucket | Gauge |
cbgo_total_lag_current | The current total lag | N/A | Gauge |
cbgo_process_latency_ms_current | The latest process latency in milliseconds | N/A | Gauge |
cbgo_dcp_latency_ms_current | The latest consumed dcp message latency in milliseconds | N/A | Counter |
cbgo_rebalance_current | The total number of rebalances | N/A | Counter |
cbgo_active_stream_current | The total number of active streams | N/A | Gauge |
cbgo_total_members_current | The total number of members in the cluster | N/A | Gauge |
cbgo_member_number_current | The number of the current member | N/A | Gauge |
cbgo_membership_type_current | The type of membership of the current member | Membership type | Gauge |
cbgo_offset_write_current | The latest number of the offset write | N/A | Gauge |
cbgo_offset_write_latency_ms_current | The latest offset write latency in milliseconds | N/A | Gauge |
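
To show how these metrics might be consumed, the sketch below sums every sample of one metric from Prometheus text exposition output, for example to add up `cbgo_lag_current` across vBuckets. `sumMetric` is a hypothetical helper, and the sample text, including the label rendering `vbId="0"` and all values, is invented for the example. The parser assumes that lines carry no trailing timestamp.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// sumMetric adds up every sample of the named metric in Prometheus text
// exposition output, across all label sets. It assumes each sample line has
// the form `name{labels} value` or `name value`, without a timestamp.
func sumMetric(exposition, name string) (float64, error) {
	var total float64
	scanner := bufio.NewScanner(strings.NewReader(exposition))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue // skip blank lines and HELP/TYPE comments
		}
		idx := strings.LastIndex(line, " ")
		if idx < 0 {
			continue // not a sample line
		}
		series, raw := line[:idx], line[idx+1:]
		if brace := strings.Index(series, "{"); brace >= 0 {
			series = series[:brace] // drop the label set
		}
		if strings.TrimSpace(series) != name {
			continue
		}
		value, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			return 0, fmt.Errorf("parsing %q: %w", line, err)
		}
		total += value
	}
	return total, scanner.Err()
}

func main() {
	// Invented sample output; real label names and values may differ.
	sample := `# TYPE cbgo_lag_current gauge
cbgo_lag_current{vbId="0"} 12
cbgo_lag_current{vbId="1"} 30
cbgo_mutation_total{vbId="0"} 7
`
	lag, err := sumMetric(sample, "cbgo_lag_current")
	if err != nil {
		panic(err)
	}
	fmt.Println("summed lag:", lag)
}
```
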
Go DCP Version | Minimum Couchbase Server Version |
---|---|
x<1.1.16 | 6.5.x |
x>=1.1.16 | 5.x.x |
Date taking effect | Version | Change | How to check |
---|---|---|---|
December 14, 2023 | v1.1.19 | dcp.config.[DisableExpiryOpcode,DisableStreamEndByClient, EnableChangeStreams] removed | Review your configs |