mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2024-11-14 14:49:32 +01:00
1074 lines
29 KiB
Go
1074 lines
29 KiB
Go
// Package memcached provides a memcached binary protocol client.
|
|
package memcached
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"github.com/couchbase/gomemcached"
|
|
"github.com/couchbase/goutils/logging"
|
|
"github.com/couchbase/goutils/scramsha"
|
|
"github.com/pkg/errors"
|
|
"io"
|
|
"math"
|
|
"net"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
type ClientIface interface {
|
|
Add(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
|
|
Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error)
|
|
Auth(user, pass string) (*gomemcached.MCResponse, error)
|
|
AuthList() (*gomemcached.MCResponse, error)
|
|
AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
|
|
AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
|
|
CASNext(vb uint16, k string, exp int, state *CASState) bool
|
|
CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
|
|
Close() error
|
|
Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
|
|
Del(vb uint16, key string) (*gomemcached.MCResponse, error)
|
|
EnableMutationToken() (*gomemcached.MCResponse, error)
|
|
Get(vb uint16, key string) (*gomemcached.MCResponse, error)
|
|
GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error)
|
|
GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error)
|
|
GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error
|
|
GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error)
|
|
GetRandomDoc() (*gomemcached.MCResponse, error)
|
|
Hijack() io.ReadWriteCloser
|
|
Incr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
|
|
Observe(vb uint16, key string) (result ObserveResult, err error)
|
|
ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
|
|
Receive() (*gomemcached.MCResponse, error)
|
|
ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
|
|
Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
|
|
Set(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
|
|
SetKeepAliveOptions(interval time.Duration)
|
|
SetReadDeadline(t time.Time)
|
|
SetDeadline(t time.Time)
|
|
SelectBucket(bucket string) (*gomemcached.MCResponse, error)
|
|
SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error)
|
|
Stats(key string) ([]StatValue, error)
|
|
StatsMap(key string) (map[string]string, error)
|
|
StatsMapForSpecifiedStats(key string, statsMap map[string]string) error
|
|
Transmit(req *gomemcached.MCRequest) error
|
|
TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error
|
|
TransmitResponse(res *gomemcached.MCResponse) error
|
|
|
|
// UprFeed Related
|
|
NewUprFeed() (*UprFeed, error)
|
|
NewUprFeedIface() (UprFeedIface, error)
|
|
NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)
|
|
NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)
|
|
UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
|
|
}
|
|
|
|
// bufsize is a buffer size in bytes.
// NOTE(review): not referenced in the visible part of this file — confirm it
// is still used elsewhere in the package.
const bufsize = 1024

// Values stored in Client.healthy. They are uint32 so that the field can be
// read and written with the sync/atomic functions.
var (
	UnHealthy uint32 = 0
	Healthy   uint32 = 1
)

type (
	// Feature is a 16-bit feature code sent in the body of a HELLO request.
	Feature uint16

	// Features is a list of feature codes, as accepted by EnableFeatures.
	Features []Feature
)

// Feature codes known to this client.
const (
	FeatureMutationToken = Feature(0x04)
	FeatureXattr         = Feature(0x06)
	FeatureDataType      = Feature(0x0b)
)
|
|
|
|
// Client is a memcached binary-protocol client bound to one connection.
type Client struct {
	// conn is the underlying transport.
	conn io.ReadWriteCloser

	// healthy holds Healthy or UnHealthy. It is a uint32 so that it can be
	// accessed through the sync/atomic APIs.
	healthy uint32

	// opaque is the next opaque value GetBulk stamps on an outgoing request.
	opaque uint32

	// hdrBuf is a scratch buffer of gomemcached.HDR_LEN bytes (allocated in
	// Wrap) that is handed to getResponse on every read.
	hdrBuf []byte
}
|
|
|
|
// Package-wide defaults. A zero duration means "no timeout".
var (
	// DefaultDialTimeout is the timeout dialFun passes to net.DialTimeout.
	DefaultDialTimeout time.Duration

	// DefaultWriteTimeout, when positive, bounds each write performed by
	// Transmit and TransmitResponse.
	DefaultWriteTimeout time.Duration

	// dialFun opens the network connection used by Connect. It is a
	// variable rather than a function so that it can be replaced.
	dialFun = func(network, address string) (net.Conn, error) {
		return net.DialTimeout(network, address, DefaultDialTimeout)
	}
)
|
|
|
|
// Connect to a memcached server.
|
|
func Connect(prot, dest string) (rv *Client, err error) {
|
|
conn, err := dialFun(prot, dest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return Wrap(conn)
|
|
}
|
|
|
|
func SetDefaultTimeouts(dial, read, write time.Duration) {
|
|
DefaultDialTimeout = dial
|
|
DefaultWriteTimeout = write
|
|
}
|
|
|
|
func SetDefaultDialTimeout(dial time.Duration) {
|
|
DefaultDialTimeout = dial
|
|
}
|
|
|
|
func (c *Client) SetKeepAliveOptions(interval time.Duration) {
|
|
c.conn.(*net.TCPConn).SetKeepAlive(true)
|
|
c.conn.(*net.TCPConn).SetKeepAlivePeriod(interval)
|
|
}
|
|
|
|
func (c *Client) SetReadDeadline(t time.Time) {
|
|
c.conn.(*net.TCPConn).SetReadDeadline(t)
|
|
}
|
|
|
|
func (c *Client) SetDeadline(t time.Time) {
|
|
c.conn.(*net.TCPConn).SetDeadline(t)
|
|
}
|
|
|
|
// Wrap an existing transport.
|
|
func Wrap(rwc io.ReadWriteCloser) (rv *Client, err error) {
|
|
client := &Client{
|
|
conn: rwc,
|
|
hdrBuf: make([]byte, gomemcached.HDR_LEN),
|
|
opaque: uint32(1),
|
|
}
|
|
client.setHealthy(true)
|
|
return client, nil
|
|
}
|
|
|
|
// Close the connection when you're done.
|
|
func (c *Client) Close() error {
|
|
return c.conn.Close()
|
|
}
|
|
|
|
// IsHealthy returns true unless the client is belived to have
|
|
// difficulty communicating to its server.
|
|
//
|
|
// This is useful for connection pools where we want to
|
|
// non-destructively determine that a connection may be reused.
|
|
func (c Client) IsHealthy() bool {
|
|
healthyState := atomic.LoadUint32(&c.healthy)
|
|
return healthyState == Healthy
|
|
}
|
|
|
|
// Send a custom request and get the response.
|
|
func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) {
|
|
err = c.Transmit(req)
|
|
if err != nil {
|
|
return
|
|
}
|
|
resp, _, err := getResponse(c.conn, c.hdrBuf)
|
|
c.setHealthy(!gomemcached.IsFatal(err))
|
|
return resp, err
|
|
}
|
|
|
|
// Transmit send a request, but does not wait for a response.
|
|
func (c *Client) Transmit(req *gomemcached.MCRequest) error {
|
|
if DefaultWriteTimeout > 0 {
|
|
c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
|
|
}
|
|
_, err := transmitRequest(c.conn, req)
|
|
// clear write deadline to avoid interference with future write operations
|
|
if DefaultWriteTimeout > 0 {
|
|
c.conn.(net.Conn).SetWriteDeadline(time.Time{})
|
|
}
|
|
if err != nil {
|
|
c.setHealthy(false)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (c *Client) TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error {
|
|
c.conn.(net.Conn).SetWriteDeadline(deadline)
|
|
|
|
_, err := transmitRequest(c.conn, req)
|
|
|
|
// clear write deadline to avoid interference with future write operations
|
|
c.conn.(net.Conn).SetWriteDeadline(time.Time{})
|
|
|
|
if err != nil {
|
|
c.setHealthy(false)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// TransmitResponse send a response, does not wait.
|
|
func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error {
|
|
if DefaultWriteTimeout > 0 {
|
|
c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
|
|
}
|
|
_, err := transmitResponse(c.conn, res)
|
|
// clear write deadline to avoid interference with future write operations
|
|
if DefaultWriteTimeout > 0 {
|
|
c.conn.(net.Conn).SetWriteDeadline(time.Time{})
|
|
}
|
|
if err != nil {
|
|
c.setHealthy(false)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Receive a response
|
|
func (c *Client) Receive() (*gomemcached.MCResponse, error) {
|
|
resp, _, err := getResponse(c.conn, c.hdrBuf)
|
|
if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY {
|
|
c.setHealthy(false)
|
|
}
|
|
return resp, err
|
|
}
|
|
|
|
func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) {
|
|
c.conn.(net.Conn).SetReadDeadline(deadline)
|
|
|
|
resp, _, err := getResponse(c.conn, c.hdrBuf)
|
|
|
|
// Clear read deadline to avoid interference with future read operations.
|
|
c.conn.(net.Conn).SetReadDeadline(time.Time{})
|
|
|
|
if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY {
|
|
c.setHealthy(false)
|
|
}
|
|
return resp, err
|
|
}
|
|
|
|
// appendMutationToken appends the 16-bit big-endian feature code 0x04 to
// bytes and returns the extended slice.
func appendMutationToken(bytes []byte) []byte {
	var code [2]byte
	binary.BigEndian.PutUint16(code[:], uint16(0x04))
	return append(bytes, code[:]...)
}
|
|
|
|
//Send a hello command to enable MutationTokens
|
|
func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error) {
|
|
var payload []byte
|
|
payload = appendMutationToken(payload)
|
|
|
|
return c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.HELLO,
|
|
Key: []byte("GoMemcached"),
|
|
Body: payload,
|
|
})
|
|
|
|
}
|
|
|
|
//Send a hello command to enable specific features
|
|
func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error) {
|
|
var payload []byte
|
|
|
|
for _, feature := range features {
|
|
payload = append(payload, 0, 0)
|
|
binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature))
|
|
}
|
|
|
|
return c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.HELLO,
|
|
Key: []byte("GoMemcached"),
|
|
Body: payload,
|
|
})
|
|
|
|
}
|
|
|
|
// Get the value for a key.
|
|
func (c *Client) Get(vb uint16, key string) (*gomemcached.MCResponse, error) {
|
|
return c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.GET,
|
|
VBucket: vb,
|
|
Key: []byte(key),
|
|
})
|
|
}
|
|
|
|
// Get the xattrs, doc value for the input key
|
|
func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error) {
|
|
|
|
extraBuf, valueBuf := GetSubDocVal(subPaths)
|
|
res, err := c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.SUBDOC_MULTI_LOOKUP,
|
|
VBucket: vb,
|
|
Key: []byte(key),
|
|
Extras: extraBuf,
|
|
Body: valueBuf,
|
|
})
|
|
|
|
if err != nil && IfResStatusError(res) {
|
|
return res, err
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
// Get the value for a key, and update expiry
|
|
func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) {
|
|
extraBuf := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(extraBuf[0:], uint32(exp))
|
|
return c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.GAT,
|
|
VBucket: vb,
|
|
Key: []byte(key),
|
|
Extras: extraBuf,
|
|
})
|
|
}
|
|
|
|
// Get metadata for a key
|
|
func (c *Client) GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error) {
|
|
return c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.GET_META,
|
|
VBucket: vb,
|
|
Key: []byte(key),
|
|
})
|
|
}
|
|
|
|
// Del deletes a key.
|
|
func (c *Client) Del(vb uint16, key string) (*gomemcached.MCResponse, error) {
|
|
return c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.DELETE,
|
|
VBucket: vb,
|
|
Key: []byte(key)})
|
|
}
|
|
|
|
// Get a random document
|
|
func (c *Client) GetRandomDoc() (*gomemcached.MCResponse, error) {
|
|
return c.Send(&gomemcached.MCRequest{
|
|
Opcode: 0xB6,
|
|
})
|
|
}
|
|
|
|
// AuthList lists SASL auth mechanisms.
|
|
func (c *Client) AuthList() (*gomemcached.MCResponse, error) {
|
|
return c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.SASL_LIST_MECHS})
|
|
}
|
|
|
|
// Auth performs SASL PLAIN authentication against the server.
|
|
func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error) {
|
|
res, err := c.AuthList()
|
|
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
|
|
authMech := string(res.Body)
|
|
if strings.Index(authMech, "PLAIN") != -1 {
|
|
return c.AuthPlain(user, pass)
|
|
}
|
|
return nil, fmt.Errorf("auth mechanism PLAIN not supported")
|
|
}
|
|
|
|
// AuthScramSha performs SCRAM-SHA authentication against the server.
|
|
func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) {
|
|
res, err := c.AuthList()
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Unable to obtain list of methods.")
|
|
}
|
|
|
|
methods := string(res.Body)
|
|
method, err := scramsha.BestMethod(methods)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err,
|
|
"Unable to select SCRAM-SHA method.")
|
|
}
|
|
|
|
s, err := scramsha.NewScramSha(method)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Unable to initialize scramsha.")
|
|
}
|
|
|
|
logging.Infof("Using %v authentication for user %v%v%v", method, gomemcached.UdTagBegin, user, gomemcached.UdTagEnd)
|
|
|
|
message, err := s.GetStartRequest(user)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err,
|
|
"Error building start request for user %s.", user)
|
|
}
|
|
|
|
startRequest := &gomemcached.MCRequest{
|
|
Opcode: 0x21,
|
|
Key: []byte(method),
|
|
Body: []byte(message)}
|
|
|
|
startResponse, err := c.Send(startRequest)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Error sending start request.")
|
|
}
|
|
|
|
err = s.HandleStartResponse(string(startResponse.Body))
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Error handling start response.")
|
|
}
|
|
|
|
message = s.GetFinalRequest(pass)
|
|
|
|
// send step request
|
|
finalRequest := &gomemcached.MCRequest{
|
|
Opcode: 0x22,
|
|
Key: []byte(method),
|
|
Body: []byte(message)}
|
|
finalResponse, err := c.Send(finalRequest)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Error sending final request.")
|
|
}
|
|
|
|
err = s.HandleFinalResponse(string(finalResponse.Body))
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Error handling final response.")
|
|
}
|
|
|
|
return finalResponse, nil
|
|
}
|
|
|
|
func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error) {
|
|
logging.Infof("Using plain authentication for user %v%v%v", gomemcached.UdTagBegin, user, gomemcached.UdTagEnd)
|
|
return c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.SASL_AUTH,
|
|
Key: []byte("PLAIN"),
|
|
Body: []byte(fmt.Sprintf("\x00%s\x00%s", user, pass))})
|
|
}
|
|
|
|
// select bucket
|
|
func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error) {
|
|
|
|
return c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.SELECT_BUCKET,
|
|
Key: []byte(fmt.Sprintf("%s", bucket))})
|
|
}
|
|
|
|
func (c *Client) store(opcode gomemcached.CommandCode, vb uint16,
|
|
key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error) {
|
|
|
|
req := &gomemcached.MCRequest{
|
|
Opcode: opcode,
|
|
VBucket: vb,
|
|
Key: []byte(key),
|
|
Cas: 0,
|
|
Opaque: 0,
|
|
Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
|
|
Body: body}
|
|
|
|
binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
|
|
return c.Send(req)
|
|
}
|
|
|
|
func (c *Client) storeCas(opcode gomemcached.CommandCode, vb uint16,
|
|
key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error) {
|
|
|
|
req := &gomemcached.MCRequest{
|
|
Opcode: opcode,
|
|
VBucket: vb,
|
|
Key: []byte(key),
|
|
Cas: cas,
|
|
Opaque: 0,
|
|
Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
|
|
Body: body}
|
|
|
|
binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
|
|
return c.Send(req)
|
|
}
|
|
|
|
// Incr increments the value at the given key.
|
|
func (c *Client) Incr(vb uint16, key string,
|
|
amt, def uint64, exp int) (uint64, error) {
|
|
|
|
req := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.INCREMENT,
|
|
VBucket: vb,
|
|
Key: []byte(key),
|
|
Extras: make([]byte, 8+8+4),
|
|
}
|
|
binary.BigEndian.PutUint64(req.Extras[:8], amt)
|
|
binary.BigEndian.PutUint64(req.Extras[8:16], def)
|
|
binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
|
|
|
|
resp, err := c.Send(req)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return binary.BigEndian.Uint64(resp.Body), nil
|
|
}
|
|
|
|
// Decr decrements the value at the given key.
|
|
func (c *Client) Decr(vb uint16, key string,
|
|
amt, def uint64, exp int) (uint64, error) {
|
|
|
|
req := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.DECREMENT,
|
|
VBucket: vb,
|
|
Key: []byte(key),
|
|
Extras: make([]byte, 8+8+4),
|
|
}
|
|
binary.BigEndian.PutUint64(req.Extras[:8], amt)
|
|
binary.BigEndian.PutUint64(req.Extras[8:16], def)
|
|
binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
|
|
|
|
resp, err := c.Send(req)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return binary.BigEndian.Uint64(resp.Body), nil
|
|
}
|
|
|
|
// Add a value for a key (store if not exists).
|
|
func (c *Client) Add(vb uint16, key string, flags int, exp int,
|
|
body []byte) (*gomemcached.MCResponse, error) {
|
|
return c.store(gomemcached.ADD, vb, key, flags, exp, body)
|
|
}
|
|
|
|
// Set the value for a key.
|
|
func (c *Client) Set(vb uint16, key string, flags int, exp int,
|
|
body []byte) (*gomemcached.MCResponse, error) {
|
|
return c.store(gomemcached.SET, vb, key, flags, exp, body)
|
|
}
|
|
|
|
// SetCas set the value for a key with cas
|
|
func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64,
|
|
body []byte) (*gomemcached.MCResponse, error) {
|
|
return c.storeCas(gomemcached.SET, vb, key, flags, exp, cas, body)
|
|
}
|
|
|
|
// Append data to the value of a key.
|
|
func (c *Client) Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error) {
|
|
req := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.APPEND,
|
|
VBucket: vb,
|
|
Key: []byte(key),
|
|
Cas: 0,
|
|
Opaque: 0,
|
|
Body: data}
|
|
|
|
return c.Send(req)
|
|
}
|
|
|
|
// GetBulk gets keys in bulk
|
|
func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error {
|
|
stopch := make(chan bool)
|
|
var wg sync.WaitGroup
|
|
|
|
defer func() {
|
|
close(stopch)
|
|
wg.Wait()
|
|
}()
|
|
|
|
if (math.MaxInt32 - c.opaque) < (uint32(len(keys)) + 1) {
|
|
c.opaque = uint32(1)
|
|
}
|
|
|
|
opStart := c.opaque
|
|
|
|
errch := make(chan error, 2)
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
logging.Infof("Recovered in f %v", r)
|
|
}
|
|
errch <- nil
|
|
wg.Done()
|
|
}()
|
|
|
|
ok := true
|
|
for ok {
|
|
|
|
select {
|
|
case <-stopch:
|
|
return
|
|
default:
|
|
res, err := c.Receive()
|
|
|
|
if err != nil && IfResStatusError(res) {
|
|
if res == nil || res.Status != gomemcached.KEY_ENOENT {
|
|
errch <- err
|
|
return
|
|
}
|
|
// continue receiving in case of KEY_ENOENT
|
|
} else if res.Opcode == gomemcached.GET ||
|
|
res.Opcode == gomemcached.SUBDOC_GET ||
|
|
res.Opcode == gomemcached.SUBDOC_MULTI_LOOKUP {
|
|
opaque := res.Opaque - opStart
|
|
if opaque < 0 || opaque >= uint32(len(keys)) {
|
|
// Every now and then we seem to be seeing an invalid opaque
|
|
// value returned from the server. When this happens log the error
|
|
// and the calling function will retry the bulkGet. MB-15140
|
|
logging.Errorf(" Invalid opaque Value. Debug info : Res.opaque : %v(%v), Keys %v, Response received %v \n key list %v this key %v", res.Opaque, opaque, len(keys), res, keys, string(res.Body))
|
|
errch <- fmt.Errorf("Out of Bounds error")
|
|
return
|
|
}
|
|
|
|
rv[keys[opaque]] = res
|
|
}
|
|
|
|
if res.Opcode == gomemcached.NOOP {
|
|
ok = false
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
memcachedReqPkt := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.GET,
|
|
VBucket: vb,
|
|
}
|
|
|
|
if len(subPaths) > 0 {
|
|
extraBuf, valueBuf := GetSubDocVal(subPaths)
|
|
memcachedReqPkt.Opcode = gomemcached.SUBDOC_MULTI_LOOKUP
|
|
memcachedReqPkt.Extras = extraBuf
|
|
memcachedReqPkt.Body = valueBuf
|
|
}
|
|
|
|
for _, k := range keys { // Start of Get request
|
|
memcachedReqPkt.Key = []byte(k)
|
|
memcachedReqPkt.Opaque = c.opaque
|
|
|
|
err := c.Transmit(memcachedReqPkt)
|
|
if err != nil {
|
|
logging.Errorf(" Transmit failed in GetBulkAll %v", err)
|
|
return err
|
|
}
|
|
c.opaque++
|
|
} // End of Get request
|
|
|
|
// finally transmit a NOOP
|
|
err := c.Transmit(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.NOOP,
|
|
VBucket: vb,
|
|
Opaque: c.opaque,
|
|
})
|
|
|
|
if err != nil {
|
|
logging.Errorf(" Transmit of NOOP failed %v", err)
|
|
return err
|
|
}
|
|
c.opaque++
|
|
|
|
return <-errch
|
|
}
|
|
|
|
func GetSubDocVal(subPaths []string) (extraBuf, valueBuf []byte) {
|
|
|
|
var ops []string
|
|
totalBytesLen := 0
|
|
num := 1
|
|
|
|
for _, v := range subPaths {
|
|
totalBytesLen = totalBytesLen + len([]byte(v))
|
|
ops = append(ops, v)
|
|
num = num + 1
|
|
}
|
|
|
|
// Xattr retrieval - subdoc multi get
|
|
extraBuf = append(extraBuf, uint8(0x04))
|
|
|
|
valueBuf = make([]byte, num*4+totalBytesLen)
|
|
|
|
//opcode for subdoc get
|
|
op := gomemcached.SUBDOC_GET
|
|
|
|
// Calculate path total bytes
|
|
// There are 2 ops - get xattrs - both input and $document and get whole doc
|
|
valIter := 0
|
|
|
|
for _, v := range ops {
|
|
pathBytes := []byte(v)
|
|
valueBuf[valIter+0] = uint8(op)
|
|
|
|
// SubdocFlagXattrPath indicates that the path refers to
|
|
// an Xattr rather than the document body.
|
|
valueBuf[valIter+1] = uint8(gomemcached.SUBDOC_FLAG_XATTR)
|
|
|
|
// 2 byte key
|
|
binary.BigEndian.PutUint16(valueBuf[valIter+2:], uint16(len(pathBytes)))
|
|
|
|
// Then n bytes path
|
|
copy(valueBuf[valIter+4:], pathBytes)
|
|
valIter = valIter + 4 + len(pathBytes)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// ObservedStatus is the one-byte key status carried in ObserveResult.Status.
type ObservedStatus uint8

// Status values reported by Observe.
const (
	// ObservedNotPersisted: found, not persisted.
	ObservedNotPersisted ObservedStatus = 0x00
	// ObservedPersisted: found, persisted.
	ObservedPersisted ObservedStatus = 0x01
	// ObservedNotFound: not found (or a persisted delete).
	ObservedNotFound ObservedStatus = 0x80
	// ObservedLogicallyDeleted: pending deletion (not persisted yet).
	ObservedLogicallyDeleted ObservedStatus = 0x81
)
|
|
|
|
// ObserveResult represents the data obtained by an Observe call
|
|
type ObserveResult struct {
|
|
Status ObservedStatus // Whether the value has been persisted/deleted
|
|
Cas uint64 // Current value's CAS
|
|
PersistenceTime time.Duration // Node's average time to persist a value
|
|
ReplicationTime time.Duration // Node's average time to replicate a value
|
|
}
|
|
|
|
// Observe gets the persistence/replication/CAS state of a key
|
|
func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error) {
|
|
// http://www.couchbase.com/wiki/display/couchbase/Observe
|
|
body := make([]byte, 4+len(key))
|
|
binary.BigEndian.PutUint16(body[0:2], vb)
|
|
binary.BigEndian.PutUint16(body[2:4], uint16(len(key)))
|
|
copy(body[4:4+len(key)], key)
|
|
|
|
res, err := c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.OBSERVE,
|
|
VBucket: vb,
|
|
Body: body,
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Parse the response data from the body:
|
|
if len(res.Body) < 2+2+1 {
|
|
err = io.ErrUnexpectedEOF
|
|
return
|
|
}
|
|
outVb := binary.BigEndian.Uint16(res.Body[0:2])
|
|
keyLen := binary.BigEndian.Uint16(res.Body[2:4])
|
|
if len(res.Body) < 2+2+int(keyLen)+1+8 {
|
|
err = io.ErrUnexpectedEOF
|
|
return
|
|
}
|
|
outKey := string(res.Body[4 : 4+keyLen])
|
|
if outVb != vb || outKey != key {
|
|
err = fmt.Errorf("observe returned wrong vbucket/key: %d/%q", outVb, outKey)
|
|
return
|
|
}
|
|
result.Status = ObservedStatus(res.Body[4+keyLen])
|
|
result.Cas = binary.BigEndian.Uint64(res.Body[5+keyLen:])
|
|
// The response reuses the Cas field to store time statistics:
|
|
result.PersistenceTime = time.Duration(res.Cas>>32) * time.Millisecond
|
|
result.ReplicationTime = time.Duration(res.Cas&math.MaxUint32) * time.Millisecond
|
|
return
|
|
}
|
|
|
|
// CheckPersistence checks whether a stored value has been persisted to disk yet.
|
|
func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool) {
|
|
switch {
|
|
case result.Status == ObservedNotFound && deletion:
|
|
persisted = true
|
|
case result.Cas != cas:
|
|
overwritten = true
|
|
case result.Status == ObservedPersisted:
|
|
persisted = true
|
|
}
|
|
return
|
|
}
|
|
|
|
// ObserveSeqResult is the decoded response of a sequence-number based
// observe (ObserveSeq).
type ObserveSeqResult struct {
	// Failover is set to 1 if a failover took place.
	Failover uint8
	// VbId is the vbucket id.
	VbId uint16
	// Vbuuid is the vbucket uuid.
	Vbuuid uint64
	// LastPersistedSeqNo is the last persisted sequence number.
	LastPersistedSeqNo uint64
	// CurrentSeqNo is the current sequence number.
	CurrentSeqNo uint64
	// OldVbuuid is the old vbucket uuid; filled in only for failover
	// responses.
	OldVbuuid uint64
	// LastSeqNo is the last sequence number received before failover;
	// filled in only for failover responses.
	LastSeqNo uint64
}
|
|
|
|
func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) {
|
|
// http://www.couchbase.com/wiki/display/couchbase/Observe
|
|
body := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(body[0:8], vbuuid)
|
|
|
|
res, err := c.Send(&gomemcached.MCRequest{
|
|
Opcode: gomemcached.OBSERVE_SEQNO,
|
|
VBucket: vb,
|
|
Body: body,
|
|
Opaque: 0x01,
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if res.Status != gomemcached.SUCCESS {
|
|
return nil, fmt.Errorf(" Observe returned error %v", res.Status)
|
|
}
|
|
|
|
// Parse the response data from the body:
|
|
if len(res.Body) < (1 + 2 + 8 + 8 + 8) {
|
|
err = io.ErrUnexpectedEOF
|
|
return
|
|
}
|
|
|
|
result = &ObserveSeqResult{}
|
|
result.Failover = res.Body[0]
|
|
result.VbId = binary.BigEndian.Uint16(res.Body[1:3])
|
|
result.Vbuuid = binary.BigEndian.Uint64(res.Body[3:11])
|
|
result.LastPersistedSeqNo = binary.BigEndian.Uint64(res.Body[11:19])
|
|
result.CurrentSeqNo = binary.BigEndian.Uint64(res.Body[19:27])
|
|
|
|
// in case of failover processing we can have old vbuuid and the last persisted seq number
|
|
if result.Failover == 1 && len(res.Body) >= (1+2+8+8+8+8+8) {
|
|
result.OldVbuuid = binary.BigEndian.Uint64(res.Body[27:35])
|
|
result.LastSeqNo = binary.BigEndian.Uint64(res.Body[35:43])
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// CasOp is the type of operation to perform on this CAS loop.
type CasOp uint8

const (
	// CASStore instructs the server to store the new value normally
	CASStore = CasOp(iota)
	// CASQuit instructs the client to stop attempting to CAS, leaving value untouched
	CASQuit
	// CASDelete instructs the server to delete the current value
	CASDelete
)

// Error makes CasOp usable as an error: CAS returns the operation itself when
// the user function asks it to stop.
//
// A value outside the declared constants yields a descriptive string. The
// method used to panic in that case, which is hostile behaviour for an
// error-formatting path in library code.
func (c CasOp) Error() string {
	switch c {
	case CASStore:
		return "CAS store"
	case CASQuit:
		return "CAS quit"
	case CASDelete:
		return "CAS delete"
	}
	return fmt.Sprintf("CAS unknown operation (%d)", uint8(c))
}
|
|
|
|
//////// CAS TRANSFORM
|
|
|
|
// CASState tracks the state of CAS over several operations.
|
|
//
|
|
// This is used directly by CASNext and indirectly by CAS
|
|
type CASState struct {
|
|
initialized bool // false on the first call to CASNext, then true
|
|
Value []byte // Current value of key; update in place to new value
|
|
Cas uint64 // Current CAS value of key
|
|
Exists bool // Does a value exist for the key? (If not, Value will be nil)
|
|
Err error // Error, if any, after CASNext returns false
|
|
resp *gomemcached.MCResponse
|
|
}
|
|
|
|
// CASNext is a non-callback, loop-based version of CAS method.
|
|
//
|
|
// Usage is like this:
|
|
//
|
|
// var state memcached.CASState
|
|
// for client.CASNext(vb, key, exp, &state) {
|
|
// state.Value = some_mutation(state.Value)
|
|
// }
|
|
// if state.Err != nil { ... }
|
|
func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool {
|
|
if state.initialized {
|
|
if !state.Exists {
|
|
// Adding a new key:
|
|
if state.Value == nil {
|
|
state.Cas = 0
|
|
return false // no-op (delete of non-existent value)
|
|
}
|
|
state.resp, state.Err = c.Add(vb, k, 0, exp, state.Value)
|
|
} else {
|
|
// Updating / deleting a key:
|
|
req := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.DELETE,
|
|
VBucket: vb,
|
|
Key: []byte(k),
|
|
Cas: state.Cas}
|
|
if state.Value != nil {
|
|
req.Opcode = gomemcached.SET
|
|
req.Opaque = 0
|
|
req.Extras = []byte{0, 0, 0, 0, 0, 0, 0, 0}
|
|
req.Body = state.Value
|
|
|
|
flags := 0
|
|
binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
|
|
}
|
|
state.resp, state.Err = c.Send(req)
|
|
}
|
|
|
|
// If the response status is KEY_EEXISTS or NOT_STORED there's a conflict and we'll need to
|
|
// get the new value (below). Otherwise, we're done (either success or failure) so return:
|
|
if !(state.resp != nil && (state.resp.Status == gomemcached.KEY_EEXISTS ||
|
|
state.resp.Status == gomemcached.NOT_STORED)) {
|
|
state.Cas = state.resp.Cas
|
|
return false // either success or fatal error
|
|
}
|
|
}
|
|
|
|
// Initial call, or after a conflict: GET the current value and CAS and return them:
|
|
state.initialized = true
|
|
if state.resp, state.Err = c.Get(vb, k); state.Err == nil {
|
|
state.Exists = true
|
|
state.Value = state.resp.Body
|
|
state.Cas = state.resp.Cas
|
|
} else if state.resp != nil && state.resp.Status == gomemcached.KEY_ENOENT {
|
|
state.Err = nil
|
|
state.Exists = false
|
|
state.Value = nil
|
|
state.Cas = 0
|
|
} else {
|
|
return false // fatal error
|
|
}
|
|
return true // keep going...
|
|
}
|
|
|
|
// CasFunc is type type of function to perform a CAS transform.
|
|
//
|
|
// Input is the current value, or nil if no value exists.
|
|
// The function should return the new value (if any) to set, and the store/quit/delete operation.
|
|
type CasFunc func(current []byte) ([]byte, CasOp)
|
|
|
|
// CAS performs a CAS transform with the given function.
|
|
//
|
|
// If the value does not exist, a nil current value will be sent to f.
|
|
func (c *Client) CAS(vb uint16, k string, f CasFunc,
|
|
initexp int) (*gomemcached.MCResponse, error) {
|
|
var state CASState
|
|
for c.CASNext(vb, k, initexp, &state) {
|
|
newValue, operation := f(state.Value)
|
|
if operation == CASQuit || (operation == CASDelete && state.Value == nil) {
|
|
return nil, operation
|
|
}
|
|
state.Value = newValue
|
|
}
|
|
return state.resp, state.Err
|
|
}
|
|
|
|
// StatValue is a single key/value pair as returned by the Stats method.
type StatValue struct {
	Key string // the stat key
	Val string // the stat value
}
|
|
|
|
// Stats requests server-side stats.
|
|
//
|
|
// Use "" as the stat key for toplevel stats.
|
|
func (c *Client) Stats(key string) ([]StatValue, error) {
|
|
rv := make([]StatValue, 0, 128)
|
|
|
|
req := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.STAT,
|
|
Key: []byte(key),
|
|
Opaque: 918494,
|
|
}
|
|
|
|
err := c.Transmit(req)
|
|
if err != nil {
|
|
return rv, err
|
|
}
|
|
|
|
for {
|
|
res, _, err := getResponse(c.conn, c.hdrBuf)
|
|
if err != nil {
|
|
return rv, err
|
|
}
|
|
k := string(res.Key)
|
|
if k == "" {
|
|
break
|
|
}
|
|
rv = append(rv, StatValue{
|
|
Key: k,
|
|
Val: string(res.Body),
|
|
})
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
// StatsMap requests server-side stats similarly to Stats, but returns
|
|
// them as a map.
|
|
//
|
|
// Use "" as the stat key for toplevel stats.
|
|
func (c *Client) StatsMap(key string) (map[string]string, error) {
|
|
rv := make(map[string]string)
|
|
|
|
req := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.STAT,
|
|
Key: []byte(key),
|
|
Opaque: 918494,
|
|
}
|
|
|
|
err := c.Transmit(req)
|
|
if err != nil {
|
|
return rv, err
|
|
}
|
|
|
|
for {
|
|
res, _, err := getResponse(c.conn, c.hdrBuf)
|
|
if err != nil {
|
|
return rv, err
|
|
}
|
|
k := string(res.Key)
|
|
if k == "" {
|
|
break
|
|
}
|
|
rv[k] = string(res.Body)
|
|
}
|
|
|
|
return rv, nil
|
|
}
|
|
|
|
// instead of returning a new statsMap, simply populate passed in statsMap, which contains all the keys
|
|
// for which stats needs to be retrieved
|
|
func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error {
|
|
|
|
// clear statsMap
|
|
for key, _ := range statsMap {
|
|
statsMap[key] = ""
|
|
}
|
|
|
|
req := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.STAT,
|
|
Key: []byte(key),
|
|
Opaque: 918494,
|
|
}
|
|
|
|
err := c.Transmit(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
res, _, err := getResponse(c.conn, c.hdrBuf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
k := string(res.Key)
|
|
if k == "" {
|
|
break
|
|
}
|
|
if _, ok := statsMap[k]; ok {
|
|
statsMap[k] = string(res.Body)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Hijack exposes the underlying connection from this client.
|
|
//
|
|
// It also marks the connection as unhealthy since the client will
|
|
// have lost control over the connection and can't otherwise verify
|
|
// things are in good shape for connection pools.
|
|
func (c *Client) Hijack() io.ReadWriteCloser {
|
|
c.setHealthy(false)
|
|
return c.conn
|
|
}
|
|
|
|
func (c *Client) setHealthy(healthy bool) {
|
|
healthyState := UnHealthy
|
|
if healthy {
|
|
healthyState = Healthy
|
|
}
|
|
atomic.StoreUint32(&c.healthy, healthyState)
|
|
}
|
|
|
|
func IfResStatusError(response *gomemcached.MCResponse) bool {
|
|
return response == nil ||
|
|
(response.Status != gomemcached.SUBDOC_BAD_MULTI &&
|
|
response.Status != gomemcached.SUBDOC_PATH_NOT_FOUND &&
|
|
response.Status != gomemcached.SUBDOC_MULTI_PATH_FAILURE_DELETED)
|
|
}
|