package zongzi
import (
"bytes"
"context"
"fmt"
"net"
"regexp"
"sort"
"strings"
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/logger"
"github.com/logbn/zongzi/internal"
)
// Agent is a single participant in a zongzi cluster. It owns the dragonboat
// node host, the gRPC server and client pool, the prime shard state machine
// and the managers that Start launches.
type Agent struct {
// advertiseAddress and bindAddress are the API addresses set by WithAddrApi.
advertiseAddress string
bindAddress string
clusterName string
hostController *hostController
controllerManager *controllerManager
clientManager *clientManager
// fsm is assigned by fsmFactory when the prime replica's state machine is created.
fsm *fsm
grpcClientPool *grpcClientPool
grpcServer *grpcServer
// host is nil until Start has created the node host.
host *dragonboat.NodeHost
hostConfig HostConfig
hostTags []string
log logger.ILogger
// members is the prime shard membership (replicaID to hostID) resolved during Start.
members map[uint64]string
// peers is the sorted peer API address list passed to NewAgent.
peers []string
replicaConfig ReplicaConfig
// shardTypes holds the types registered through StateMachineRegister, keyed by uri.
shardTypes map[string]shardType
// status is read and written under mutex (see Status and setStatus).
status AgentStatus
clock clock.Clock
// ctx and ctxCancel are created in Start and cancelled in Stop.
ctx context.Context
ctxCancel context.CancelFunc
mutex sync.RWMutex
// wg tracks the background work that Stop waits for.
wg sync.WaitGroup
}
// shardType is a shard type registered through StateMachineRegister: the
// replica config to use and exactly one of the two state machine factories,
// stored under Uri.
type shardType struct {
Config ReplicaConfig
StateMachineFactory StateMachineFactory
StateMachinePersistentFactory StateMachinePersistentFactory
Uri string
}
// NewAgent builds an agent for the named cluster without starting it.
//
// clusterName must match ClusterNameRegex, otherwise ErrClusterNameInvalid is
// returned. peers is sorted in place (the caller's slice is modified) and any
// peer without a port receives the port of the agent's API advertise address.
//
// Default options (API address, gossip address, host config, replica config)
// are applied first, followed by opts in order. The first option that returns
// an error aborts construction and that error is returned.
func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent, err error) {
	if !regexp.MustCompile(ClusterNameRegex).MatchString(clusterName) {
		err = fmt.Errorf(`%v: (%s)`, ErrClusterNameInvalid, clusterName)
		return
	}
	log := logger.GetLogger(projectName)
	sort.Strings(peers)
	a = &Agent{
		clock:       clock.New(),
		clusterName: clusterName,
		log:         log,
		peers:       peers,
		shardTypes:  map[string]shardType{},
		status:      AgentStatus_Pending,
	}
	a.controllerManager = newControllerManager(a)
	a.clientManager = newClientManager(a)
	defaults := []AgentOption{
		WithAddrApi(DefaultApiAddress),
		WithAddrGossip(DefaultGossipAddress),
		WithHostConfig(DefaultHostConfig),
		WithReplicaConfig(DefaultReplicaConfig),
	}
	for _, opt := range append(defaults, opts...) {
		// Options report failures through their return value; do not drop them.
		if err = opt(a); err != nil {
			return nil, fmt.Errorf("applying agent option: %w", err)
		}
	}
	a.hostController = newHostController(a)
	// Wrap any user supplied raft listener so the controller manager also receives events.
	a.hostConfig.RaftEventListener = newCompositeRaftEventListener(
		a.controllerManager,
		a.hostConfig.RaftEventListener,
	)
	a.replicaConfig.ShardID = ShardID
	a.hostConfig.DeploymentID = mustBase36Decode(clusterName)
	a.hostConfig.DefaultNodeRegistryEnabled = true
	// The API advertise address is published as gossip metadata (read back by parseMeta).
	a.hostConfig.Gossip.Meta = []byte(a.advertiseAddress)
	a.grpcClientPool = newGrpcClientPool(1e4)
	a.grpcServer = newGrpcServer(a.bindAddress)
	if parts := strings.Split(a.advertiseAddress, ":"); len(parts) > 1 {
		// Append port from advertise addr to peer list for convenience.
		// peers shares its backing array with a.peers, so both see the change.
		for i, addr := range peers {
			if !strings.Contains(addr, ":") {
				peers[i] = fmt.Sprintf("%s:%s", addr, parts[1])
			}
		}
	}
	return a, nil
}
// Client builds a ShardClient for the given shard using the agent's client manager.
// Per the original contract, writes are sent to the nearest member and reads to
// the nearest replica (by ping). Any error reported by newClient is discarded.
func (a *Agent) Client(shardID uint64, opts ...ClientOption) (c ShardClient) {
	manager := a.clientManager
	c, _ = newClient(manager, shardID, opts...)
	return c
}
// ShardCreate creates a new shard of type uri.
//
// Options are applied to a fresh Shard first; the first option error is
// returned. When the options set a name and a shard with that name already
// exists, that shard is returned with created == false. Otherwise a shard
// post command is proposed to the prime shard.
//
// An error is returned when the state cannot be read, when the proposal
// fails, or when the state machine rejects the command (result value 0, with
// the reason carried in the result data).
func (a *Agent) ShardCreate(ctx context.Context, uri string, opts ...ShardOption) (shard Shard, created bool, err error) {
	shard = Shard{
		Status: ShardStatus_New,
		Type:   uri,
		Tags:   map[string]string{},
	}
	for _, opt := range opts {
		if err = opt(&shard); err != nil {
			return
		}
	}
	if len(shard.Name) > 0 {
		var ok bool
		var found Shard
		err = a.State(ctx, func(state *State) {
			found, ok = state.ShardFindByName(shard.Name)
		})
		if err != nil {
			// Without a successful read we cannot tell whether the name is taken.
			return
		}
		if ok {
			shard = found
			return
		}
	}
	res, err := a.primePropose(newCmdShardPost(shard))
	if err != nil {
		return
	}
	if res.Value == 0 {
		// The state machine leaves Value at 0 and fills Data when it refuses the
		// post (for example when the name was taken concurrently).
		err = fmt.Errorf("Error creating shard %s: %s", shard.Name, string(res.Data))
		return
	}
	shard.ID = res.Value
	created = true
	a.log.Infof("Shard created %s, %d, %s", uri, shard.ID, shard.Name)
	return
}
// ShardDelete proposes deletion of the shard with the given id to the prime shard.
// A result value of 0 is treated as failure and reported with the result data
// as the reason. ctx is currently unused.
func (a *Agent) ShardDelete(ctx context.Context, id uint64) (err error) {
	var res Result
	if res, err = a.primePropose(newCmdShardDel(id)); err != nil {
		return err
	}
	if res.Value != 0 {
		a.log.Infof("Shard deleted (%d)", id)
		return nil
	}
	return fmt.Errorf("Error deleting shard %d: %s", id, string(res.Data))
}
// ShardFind looks up a shard by id in the cluster state.
// The "found" flag from the lookup is discarded, so a missing shard is not
// reported as an error; only a failure of State is returned.
func (a *Agent) ShardFind(ctx context.Context, id uint64) (shard Shard, err error) {
	lookup := func(state *State) {
		shard, _ = state.Shard(id)
	}
	err = a.State(ctx, lookup)
	return shard, err
}
// ShardUpdate applies opts to the existing shard with the given id and
// proposes the modified shard to the prime shard.
//
// It returns the error from State when the cluster state cannot be read,
// ErrShardNotFound when no shard has that id, the first error returned by an
// option, or the proposal error. On success the updated shard is returned.
func (a *Agent) ShardUpdate(ctx context.Context, id uint64, opts ...ShardOption) (shard Shard, err error) {
	var found Shard
	var ok bool
	err = a.State(ctx, func(state *State) {
		found, ok = state.Shard(id)
	})
	if err != nil {
		// A failed read must not be misreported as "shard not found".
		return
	}
	if !ok {
		err = ErrShardNotFound
		return
	}
	for _, opt := range opts {
		if err = opt(&found); err != nil {
			return
		}
	}
	shard = found
	res, err := a.primePropose(newCmdShardPut(shard))
	if err != nil {
		return
	}
	shard.ID = res.Value
	a.log.Infof("Shard updated (%d)", id)
	return
}
// Start starts the agent, bootstrapping the cluster if required.
//
// It launches the gRPC server in a background goroutine, resolves gossip
// addresses, creates the dragonboat node host, determines whether the prime
// replica must be joined, initialized or rejoined, and finally records the
// host and replica as active in the prime shard. On success the deferred
// function starts the host controller, controller manager and client manager
// and sets the status to AgentStatus_Ready. Every failure is logged and
// returned.
func (a *Agent) Start(ctx context.Context) (err error) {
var init bool
// Runs after every return path; the managers are only started when err is nil.
defer func() {
if err == nil {
a.hostController.Start()
a.controllerManager.Start()
a.clientManager.Start()
a.setStatus(AgentStatus_Ready)
}
}()
a.ctx, a.ctxCancel = context.WithCancel(ctx)
// Start gRPC server
a.wg.Add(1)
go func() {
defer a.wg.Done()
defer a.log.Debugf("Stopped gRPC Server")
// Restart the server after waitPeriod whenever grpcServer.Start returns,
// until the agent context is cancelled.
for {
if err := a.grpcServer.Start(a); err != nil {
a.log.Errorf("Error starting gRPC server: %s", err.Error())
}
select {
case <-a.ctx.Done():
return
case <-a.clock.After(waitPeriod):
}
}
}()
// Resolve member gossip addresses
a.hostConfig.Gossip.AdvertiseAddress, err = a.gossipIP(a.hostConfig.Gossip.AdvertiseAddress)
if err != nil {
a.log.Errorf(`Failed to resolve gossip advertise address: %v`, err)
return
}
a.hostConfig.Gossip.Seed, err = a.resolvePeerGossipSeed()
if err != nil {
a.log.Errorf(`Failed to resolve gossip seeds: %v`, err)
return
}
// Start node host
if a.host, err = dragonboat.NewNodeHost(a.hostConfig); err != nil {
a.log.Errorf(`Failed to start host: %v`, err)
return
}
a.log.Infof(`Started host "%s"`, a.hostID())
// Find prime replicaID
a.replicaConfig.ReplicaID = a.findLocalReplicaID(a.replicaConfig.ShardID)
// existing is captured before resolvePrimeMembership, which may assign a
// ReplicaID; it distinguishes "already on disk" from "assigned just now".
existing := a.replicaConfig.ReplicaID > 0
// Hosts that are not listed in peers take part as non-voting replicas.
a.replicaConfig.IsNonVoting = !sliceContains(a.peers, a.advertiseAddress)
// Resolve prime member replicaIDs
a.members, init, err = a.resolvePrimeMembership()
if err != nil {
a.log.Errorf(`Failed to resolve prime membership: %v`, err)
return
}
if a.replicaConfig.ReplicaID == 0 {
// Request Join if cluster found but replica does not exist
a.setStatus(AgentStatus_Joining)
// NOTE(review): both retry loops below sleep with clock.Sleep and do not
// observe a.ctx, so they keep retrying after the context is cancelled.
for {
a.replicaConfig.ReplicaID, err = a.joinPrimeShard()
if err != nil {
a.log.Warningf(`Failed to join prime shard: %v`, err)
a.clock.Sleep(waitPeriod)
continue
}
break
}
for {
err = a.startPrimeReplica(nil, true)
if err != nil {
a.log.Warningf("Error startPrimeReplica: (%s) %s", AgentStatus_Joining, err.Error())
a.clock.Sleep(waitPeriod)
continue
}
break
}
} else if !existing {
// The ReplicaID was assigned by resolvePrimeMembership during this call.
if init {
// Init if cluster is new
a.setStatus(AgentStatus_Initializing)
err = a.startPrimeReplica(a.members, false)
if err != nil {
a.log.Errorf(`Failed to startPrimeReplica: %v`, err)
return
}
err = a.primeInit(a.members)
if err != nil {
a.log.Errorf(`Failed to primeInit: %v`, err)
return
}
} else {
// Join if replica should exist but doesn't
a.setStatus(AgentStatus_Joining)
err = a.startPrimeReplica(a.members, false)
if err != nil {
a.log.Errorf(`Failed to startPrimeReplica: %v`, err)
return
}
err = a.primeInitAwait()
if err != nil {
a.log.Errorf(`Failed to primeInitAwait: %v`, err)
return
}
}
} else {
// Otherwise just rejoin the shard
a.setStatus(AgentStatus_Rejoining)
err = a.startPrimeReplica(nil, false)
if err != nil {
a.log.Errorf(`Failed to startPrimeReplica: %v`, err)
return
}
}
// Update host status, meta, etc.
err = a.updateHost()
if err != nil {
a.log.Errorf(`Failed to updateHost: %v`, err)
return
}
// Update replica status
err = a.updateReplica()
if err != nil {
a.log.Errorf(`Failed to updateReplica: %v`, err)
return
}
return
}
// State waits until the local prime shard replica can serve a read index
// (see index) and then calls fn with a non-writing view of the cluster state
// (withTxn(false)).
//
//	err := agent.State(ctx, func(s *State) {
//		log.Println(s.Index())
//	})
//
// fn is not called when waiting fails; the error from index is returned
// instead. State can block for as long as the prime shard is unavailable, so
// pass a context with a timeout or cancellation to bound the wait.
func (a *Agent) State(ctx context.Context, fn func(*State)) (err error) {
	if err = a.index(ctx, a.replicaConfig.ShardID); err == nil {
		fn(a.fsm.state.withTxn(false))
	}
	return err
}
// StateLocal calls fn with a non-writing view (withTxn(false)) of the local
// state machine's state without waiting for a read index first.
// It always returns nil.
func (a *Agent) StateLocal(fn func(*State)) (err error) {
	view := a.fsm.state.withTxn(false)
	fn(view)
	return nil
}
// StateMachineRegister records a shard type under uri. Call it before starting the agent.
//
// factory must be a StateMachineFactory or a StateMachinePersistentFactory;
// any other type yields ErrInvalidFactory and nothing is registered. The
// first element of config, when given, replaces DefaultReplicaConfig.
// Registering the same uri again overwrites the earlier entry.
func (a *Agent) StateMachineRegister(uri string, factory any, config ...ReplicaConfig) (err error) {
	entry := shardType{
		Config: DefaultReplicaConfig,
		Uri:    uri,
	}
	if len(config) > 0 {
		entry.Config = config[0]
	}
	if f, ok := factory.(StateMachineFactory); ok {
		entry.StateMachineFactory = f
	} else if f, ok := factory.(StateMachinePersistentFactory); ok {
		entry.StateMachinePersistentFactory = f
	} else {
		return ErrInvalidFactory
	}
	a.shardTypes[uri] = entry
	return nil
}
// Status reports the agent's current status, read under the agent's read lock.
func (a *Agent) Status() AgentStatus {
	a.mutex.RLock()
	current := a.status
	a.mutex.RUnlock()
	return current
}
// Stop shuts the agent down: it stops the controller manager, the host
// controller and the gRPC server, cancels the agent context, closes the node
// host if one was created, waits for background work tracked by the wait
// group and finally sets the status to AgentStatus_Stopped.
//
// The context is only cancelled when Start has created it, so calling Stop on
// an agent that was never started does not call a nil cancel function.
func (a *Agent) Stop() {
	a.controllerManager.Stop()
	a.hostController.Stop()
	a.grpcServer.Stop()
	if a.ctxCancel != nil {
		a.ctxCancel()
	}
	if a.host != nil {
		a.host.Close()
	}
	a.wg.Wait()
	a.setStatus(AgentStatus_Stopped)
}
// HostID returns the node host ID, or an empty string when the node host has
// not been created yet. It is the exported counterpart of hostID.
func (a *Agent) HostID() (id string) {
	return a.hostID()
}
// hostID returns the node host ID, or an empty string while a.host is still nil.
func (a *Agent) hostID() (id string) {
	if a.host == nil {
		return ""
	}
	return a.host.ID()
}
// hostClient builds a client for the host with the given ID.
// The zero hostClient is returned when the host is not present in the cluster
// state; an error from State is discarded and also leaves the zero value.
func (a *Agent) hostClient(hostID string) (c hostClient) {
	build := func(s *State) {
		if host, found := s.Host(hostID); found {
			c = newhostClient(a, host)
		}
	}
	a.State(a.ctx, build)
	return c
}
// tagsSet proposes a tags-set command for item (Host, Shard or Replica) to the
// prime shard. Tags that already exist are overwritten.
func (a *Agent) tagsSet(item any, tags ...string) (err error) {
	cmd := newCmdTagsSet(item, tags...)
	_, err = a.primePropose(cmd)
	return err
}
// tagsSetNX proposes a tags-set-if-absent command for item (Host, Shard or
// Replica) to the prime shard. Tags that already exist are left untouched.
func (a *Agent) tagsSetNX(item any, tags ...string) (err error) {
	cmd := newCmdTagsSetNX(item, tags...)
	_, err = a.primePropose(cmd)
	return err
}
// tagsRemove proposes removal of the given tags from item (Host, Shard or
// Replica) to the prime shard.
func (a *Agent) tagsRemove(item any, tags ...string) (err error) {
	cmd := newCmdTagsRemove(item, tags...)
	_, err = a.primePropose(cmd)
	return err
}
// replicaCreate proposes a new replica of shardID on hostID to the prime shard
// and returns the replica ID carried in the proposal result. On a proposal
// error the returned id is 0.
func (a *Agent) replicaCreate(hostID string, shardID uint64, isNonVoting bool) (id uint64, err error) {
	res, err := a.primePropose(newCmdReplicaPost(hostID, shardID, isNonVoting))
	if err != nil {
		return
	}
	id = res.Value
	a.log.Infof("[%05d:%05d] Replica created %s, %v", shardID, id, hostID, isNonVoting)
	return
}
// replicaDelete proposes deletion of the replica with the given ID to the
// prime shard and logs on success.
func (a *Agent) replicaDelete(replicaID uint64) (err error) {
	if _, err = a.primePropose(newCmdReplicaDelete(replicaID)); err != nil {
		return err
	}
	a.log.Infof("Replica deleted %05d", replicaID)
	return nil
}
// shardLeaderSet proposes the leader replica and term for a shard to the prime
// shard and logs on success.
func (a *Agent) shardLeaderSet(shardID, replicaID, term uint64) (err error) {
	if _, err = a.primePropose(newCmdShardLeaderSet(shardID, replicaID, term)); err != nil {
		return err
	}
	a.log.Infof("Shard %05d leader set to %05d", shardID, replicaID)
	return nil
}
// setStatus logs the new status and stores it, holding the agent's write lock
// for both steps.
func (a *Agent) setStatus(s AgentStatus) {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	id := a.hostID()
	a.log.Infof("%s Agent Status: %v", id, s)
	a.status = s
}
// stopReplica asks the node host to stop the replica identified by cfg's
// ShardID and ReplicaID, wrapping any error from the host.
func (a *Agent) stopReplica(cfg ReplicaConfig) (err error) {
	if err = a.host.StopReplica(cfg.ShardID, cfg.ReplicaID); err != nil {
		return fmt.Errorf("Failed to stop replica: %w", err)
	}
	return nil
}
// gossipIP converts a host:port address into ip:port form.
//
// An address whose host is already an IP literal is returned unchanged.
// Otherwise the host name is resolved with net.LookupIP and the first result
// is joined with the original port. net.SplitHostPort and net.JoinHostPort
// are used so that IPv6 addresses are parsed and rendered with brackets
// instead of producing an ambiguous "ip:port" string.
//
// ErrInvalidGossipAddr (wrapped) is returned when the address is not
// host:port or the lookup yields no addresses; lookup errors are returned
// as is.
func (a *Agent) gossipIP(peerApiAddr string) (ipAddr string, err error) {
	host, port, splitErr := net.SplitHostPort(peerApiAddr)
	if splitErr != nil {
		err = fmt.Errorf("%w: %s", ErrInvalidGossipAddr, peerApiAddr)
		return
	}
	if net.ParseIP(host) != nil {
		return peerApiAddr, nil
	}
	ips, err := net.LookupIP(host)
	if err != nil {
		return
	}
	if len(ips) == 0 {
		err = fmt.Errorf("%w: %s", ErrInvalidGossipAddr, peerApiAddr)
		return
	}
	ipAddr = net.JoinHostPort(ips[0].String(), port)
	return
}
// resolvePeerGossipSeed resolves the peer API address list to the peers'
// gossip addresses.
//
// The agent's own entry is taken from its gossip advertise address; every
// other peer is probed over gRPC for the address it advertises. The whole
// list is retried every waitPeriod until each peer has produced an address.
// An error is returned only when the agent context is cancelled while waiting.
//
// Each probe's timeout context is released as soon as the call returns rather
// than deferred, because this loop can run for an unbounded number of rounds.
func (a *Agent) resolvePeerGossipSeed() (gossip []string, err error) {
	a.wg.Add(1)
	defer a.wg.Done()
	for {
		gossip = gossip[:0]
		for _, peerApiAddr := range a.peers {
			if peerApiAddr == a.advertiseAddress {
				ipAddr, ipErr := a.gossipIP(a.hostConfig.Gossip.AdvertiseAddress)
				if ipErr == nil {
					gossip = append(gossip, ipAddr)
				} else {
					a.log.Warningf("%s", ipErr.Error())
				}
				continue
			}
			var res *internal.ProbeResponse
			ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
			res, err = a.grpcClientPool.get(peerApiAddr).Probe(ctx, &internal.ProbeRequest{})
			cancel()
			if err == nil && res != nil {
				ipAddr, ipErr := a.gossipIP(res.GossipAdvertiseAddress)
				if ipErr == nil {
					gossip = append(gossip, ipAddr)
				} else {
					a.log.Warningf("%s", ipErr.Error())
				}
			} else if err != nil && !strings.HasSuffix(err.Error(), `connect: connection refused"`) {
				// Refused connections are expected while peers are still starting; stay quiet about them.
				a.log.Warningf("No probe response for %s %+v %v", peerApiAddr, res, err.Error())
			}
		}
		a.log.Infof("Peers: %#v", a.peers)
		a.log.Infof("Found %d of %d peers %+v", len(gossip), len(a.peers), gossip)
		if len(gossip) < len(a.peers) {
			select {
			case <-a.ctx.Done():
				err = fmt.Errorf(`Cancelling findGossip (agent stopped)`)
				return
			case <-a.clock.After(waitPeriod):
			}
			continue
		}
		break
	}
	return
}
// resolvePrimeMembership resolves peer raft address list to replicaID and hostID
//
// It polls every peer for its host ID and prime replica ID and loops, sleeping
// waitPeriod between rounds, until one of these holds:
//   - every peer reports a replica ID (members is complete);
//   - no peer reports a replica ID and this host is in the peer list, in which
//     case replica IDs are assigned by peer order (index + 1) and init is true;
//   - some peers are initialized and the member list fetched from one of them
//     contains this host and has one entry per peer.
//
// a.replicaConfig.ReplicaID is set as a side effect when this host's replica
// ID is determined. Any gRPC error aborts the loop and is returned.
func (a *Agent) resolvePrimeMembership() (members map[uint64]string, init bool, err error) {
a.wg.Add(1)
defer a.wg.Done()
for {
members = map[uint64]string{}
var uninitialized = map[string]string{}
// Get host info from all peers to determine which are initialized.
for _, apiAddr := range a.peers {
var info *internal.InfoResponse
if apiAddr == a.advertiseAddress && a.hostID() != "" {
// Answer for the local host directly instead of calling ourselves over gRPC.
info = &internal.InfoResponse{
ReplicaId: a.replicaConfig.ReplicaID,
HostId: a.hostID(),
}
} else {
ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
// NOTE(review): deferred inside a loop, so cancel only runs when the function returns.
defer cancel()
info, err = a.grpcClientPool.get(apiAddr).Info(ctx, &internal.InfoRequest{})
if err != nil {
return
}
}
if len(info.HostId) == 0 {
continue
}
if info.ReplicaId == 0 {
uninitialized[apiAddr] = info.HostId
} else {
members[info.ReplicaId] = info.HostId
}
}
a.log.Infof("Found %d of %d peers (%d uninitialized)", len(members), len(a.peers), len(uninitialized))
// All peers resolved. Start agent.
if len(members) == len(a.peers) {
break
}
// All peers uninitialized. Initialize cluster.
if len(uninitialized) == len(a.peers) {
// a.peers is sorted by NewAgent, so this assigns replica IDs by sorted peer order.
for i, apiAddr := range a.peers {
replicaID := uint64(i + 1)
members[replicaID] = uninitialized[apiAddr]
if apiAddr == a.advertiseAddress {
a.replicaConfig.ReplicaID = replicaID
}
}
if a.replicaConfig.ReplicaID > 0 {
init = true
break
}
}
// Some peers initialized. Retrieve member list from initialized host.
if len(members)+len(uninitialized) == len(a.peers) {
var res *internal.MembersResponse
for _, apiAddr := range a.peers {
if apiAddr == a.advertiseAddress {
continue
}
if _, ok := uninitialized[apiAddr]; !ok {
ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
// NOTE(review): deferred inside a loop, see above.
defer cancel()
res, err = a.grpcClientPool.get(apiAddr).Members(ctx, &internal.MembersRequest{})
if err != nil {
return
}
a.log.Debugf("Get Members: %+v", res.GetMembers())
for replicaID, hostID := range res.GetMembers() {
members[replicaID] = hostID
if hostID == a.host.ID() {
a.replicaConfig.ReplicaID = replicaID
// NOTE(review): this break stops copying the remaining members once the
// local host is found; map iteration order is random - confirm intent.
break
}
}
// Only the first initialized peer is queried.
break
}
}
if a.replicaConfig.ReplicaID > 0 && len(members) == len(a.peers) {
break
}
}
// NOTE(review): this sleep does not observe a.ctx.
a.clock.Sleep(waitPeriod)
}
a.log.Debugf(`Init: %v, Members: %+v`, init, members)
return
}
// startPrimeReplica starts the prime shard replica on the node host and then
// blocks in index until the shard can be read.
// dragonboat.ErrShardAlreadyExist is logged and treated as success; any other
// start error is wrapped and returned.
func (a *Agent) startPrimeReplica(members map[uint64]string, join bool) (err error) {
	err = a.host.StartReplica(members, join, fsmFactory(a), a.replicaConfig)
	switch {
	case err == dragonboat.ErrShardAlreadyExist:
		a.log.Infof("Shard already exists %+v (%v) %+v", members, join, a.replicaConfig)
	case err != nil:
		return fmt.Errorf(`startPrimeReplica: %w`, err)
	}
	return a.index(a.ctx, a.replicaConfig.ShardID)
}
// joinPrimeShard asks each peer in turn to add this host to the prime shard
// and returns the first non-zero replica ID a peer hands back.
//
// Each request's timeout context is released as soon as the call returns.
// When no peer supplies a replica ID, the error of the last request is
// returned; if that request did not fail either (or there are no peers), an
// explicit error is returned so callers never proceed with replica ID 0.
func (a *Agent) joinPrimeShard() (replicaID uint64, err error) {
	a.log.Debugf("Joining prime shard")
	var res *internal.JoinResponse
	for _, peerApiAddr := range a.peers {
		ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
		res, err = a.grpcClientPool.get(peerApiAddr).Join(ctx, &internal.JoinRequest{
			HostId:      a.hostID(),
			IsNonVoting: a.replicaConfig.IsNonVoting,
		})
		cancel()
		if res != nil && res.Value > 0 {
			replicaID = res.Value
			break
		}
	}
	if replicaID == 0 && err == nil {
		err = fmt.Errorf("no peer returned a replica ID for host %s", a.hostID())
	}
	return
}
// updateHost proposes this host's record to the prime shard: its API address
// (read from the host registry metadata), raft address, tags, active status
// and the sorted list of registered shard type URIs.
func (a *Agent) updateHost() (err error) {
	hostID := a.hostID()
	apiAddr, err := a.parseMeta(hostID)
	if err != nil {
		return err
	}
	uris := keys(a.shardTypes)
	sort.Strings(uris)
	cmd := newCmdHostPut(hostID, apiAddr, a.hostConfig.RaftAddress, a.hostTags, HostStatus_Active, uris)
	a.log.Debugf("Updating host: %s", string(cmd))
	_, err = a.primePropose(cmd)
	return err
}
// updateReplica proposes ReplicaStatus_Active for this agent's prime shard
// replica, wrapping any proposal error.
func (a *Agent) updateReplica() (err error) {
	cmd := newCmdReplicaUpdateStatus(a.replicaConfig.ReplicaID, ReplicaStatus_Active)
	if _, err = a.primePropose(cmd); err != nil {
		return fmt.Errorf("Failed to update replica status: %w", err)
	}
	return nil
}
// index blocks until it can read from a shard, indicating that the local replica is up to date.
//
// It issues ReadIndex requests, retrying every waitPeriod while the request
// cannot be created or does not complete. It returns nil once a request
// completes. When ctx is done while waiting it always returns a non-nil
// error: the ReadIndex error if there was one, otherwise ctx.Err(). Callers
// can therefore rely on a nil result meaning the read index succeeded.
func (a *Agent) index(ctx context.Context, shardID uint64) (err error) {
	var rs *dragonboat.RequestState
	for {
		rs, err = a.host.ReadIndex(shardID, raftTimeout)
		if err != nil || rs == nil {
			a.log.Infof(`[%05x] Error reading shard index: %s: %v`, shardID, a.hostID(), err)
			select {
			case <-ctx.Done():
				if err == nil {
					err = ctx.Err()
				}
				return
			case <-a.clock.After(waitPeriod):
			}
			continue
		}
		res := <-rs.ResultC()
		rs.Release()
		if !res.Completed() {
			a.log.Infof(`[%05x] Waiting for other nodes`, shardID)
			select {
			case <-ctx.Done():
				// err is nil here; report the cancellation instead of success.
				return ctx.Err()
			case <-a.clock.After(waitPeriod):
			}
			continue
		}
		return nil
	}
}
// joinPrimeReplica makes sure hostID has a host record and a replica record
// for shardID in the prime shard state, creating whichever is missing, and
// then requests the membership change through joinShardReplica.
// Errors from the State reads are discarded; a failed read looks like
// "not found" and leads to the record being proposed.
func (a *Agent) joinPrimeReplica(hostID string, shardID uint64, isNonVoting bool) (replicaID uint64, err error) {
	var host Host
	a.State(a.ctx, func(s *State) {
		host, _ = s.Host(hostID)
	})
	if host.ID == "" {
		if host, err = a.primeAddHost(hostID); err != nil {
			return
		}
	}
	a.State(a.ctx, func(s *State) {
		s.ReplicaIterateByHostID(host.ID, func(r Replica) bool {
			if r.ShardID != shardID {
				return true
			}
			replicaID = r.ID
			return false
		})
	})
	if replicaID == 0 {
		replicaID, err = a.primeAddReplica(hostID, isNonVoting)
		if err != nil {
			return
		}
	}
	return a.joinShardReplica(hostID, shardID, replicaID, isNonVoting)
}
// joinShardReplica adds replicaID on hostID to the shard's raft membership,
// as a non-voting member or a regular replica depending on isNonVoting.
// If the current membership already lists the replica in the matching set,
// nothing is requested. It returns replicaID on success and 0 with the error
// otherwise. Each host call gets its own raftTimeout context.
func (a *Agent) joinShardReplica(hostID string, shardID, replicaID uint64, isNonVoting bool) (res uint64, err error) {
	readCtx, readCancel := context.WithTimeout(context.Background(), raftTimeout)
	defer readCancel()
	m, err := a.host.SyncGetShardMembership(readCtx, shardID)
	if err != nil {
		return
	}
	addCtx, addCancel := context.WithTimeout(context.Background(), raftTimeout)
	defer addCancel()
	switch {
	case isNonVoting:
		if _, present := m.NonVotings[replicaID]; present {
			return replicaID, nil
		}
		err = a.host.SyncRequestAddNonVoting(addCtx, shardID, replicaID, hostID, m.ConfigChangeID)
	default:
		if _, present := m.Nodes[replicaID]; present {
			return replicaID, nil
		}
		err = a.host.SyncRequestAddReplica(addCtx, shardID, replicaID, hostID, m.ConfigChangeID)
	}
	if err != nil {
		return
	}
	return replicaID, nil
}
// parseMeta reads the metadata registered for node host nhid in the host
// registry and returns the part before the first `|` as the API address.
// It fails when the registry or the host's metadata is unavailable.
func (a *Agent) parseMeta(nhid string) (apiAddr string, err error) {
	registry, found := a.host.GetNodeHostRegistry()
	if !found {
		return "", fmt.Errorf("Unable to retrieve HostRegistry")
	}
	meta, found := registry.GetMeta(nhid)
	if !found {
		return "", fmt.Errorf("Unable to retrieve node host meta (%s)", nhid)
	}
	fields := bytes.Split(meta, []byte(`|`))
	return string(fields[0]), nil
}
// primePropose synchronously proposes cmd to the prime shard through a no-op
// session, bounded by raftTimeout.
func (a *Agent) primePropose(cmd []byte) (Result, error) {
	ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
	defer cancel()
	session := a.host.GetNoOPSession(a.replicaConfig.ShardID)
	return a.host.SyncPropose(ctx, session, cmd)
}
// primeInit writes the initial cluster state to the prime shard: the prime
// shard record itself, one host record per member, and then one voting
// replica per member proposed in ascending replica ID order.
// members is expected to use the contiguous replica IDs 1..len(members),
// since each ID is used as index+1 into a slice of that length.
func (a *Agent) primeInit(members map[uint64]string) (err error) {
	primeShard := Shard{
		ID:   a.replicaConfig.ShardID,
		Name: "zongzi",
		Type: projectUri,
	}
	if _, err = a.primePropose(newCmdShardPut(primeShard)); err != nil {
		return
	}
	ordered := make([]string, len(members))
	for replicaID, nhid := range members {
		if _, err = a.primeAddHost(nhid); err != nil {
			return
		}
		ordered[replicaID-1] = nhid
	}
	for i := range ordered {
		if _, err = a.primeAddReplica(ordered[i], false); err != nil {
			return
		}
	}
	return
}
// primeInitAwait polls the cluster state every 100ms until a replica record
// exists for this host, which shows that the initializing host has finished
// primeInit. It returns early with the error if reading the state fails.
func (a *Agent) primeInitAwait() (err error) {
	for {
		found := false
		err = a.State(a.ctx, func(s *State) {
			s.ReplicaIterateByHostID(a.hostID(), func(Replica) bool {
				found = true
				return false
			})
		})
		if found || err != nil {
			return err
		}
		a.clock.Sleep(100 * time.Millisecond)
	}
}
// primeAddHost proposes a new host record (status HostStatus_New, API address
// taken from the registry metadata) to the prime shard and returns a Host
// carrying only the ID and status.
func (a *Agent) primeAddHost(nhid string) (host Host, err error) {
	var addr string
	if addr, err = a.parseMeta(nhid); err != nil {
		return
	}
	if _, err = a.primePropose(newCmdHostPut(nhid, addr, "", nil, HostStatus_New, nil)); err != nil {
		return
	}
	return Host{ID: nhid, Status: HostStatus_New}, nil
}
// primeAddReplica proposes a new prime shard replica record for host nhid and
// returns the replica ID from the proposal result, or 0 with the error.
func (a *Agent) primeAddReplica(nhid string, isNonVoting bool) (id uint64, err error) {
	res, err := a.primePropose(newCmdReplicaPost(nhid, a.replicaConfig.ShardID, isNonVoting))
	if err != nil {
		return 0, err
	}
	return res.Value, nil
}
// findLocalReplicaID scans the node host's log info for shardID and returns
// the replica ID recorded there, or 0 when the host has no log for that shard.
func (a *Agent) findLocalReplicaID(shardID uint64) (id uint64) {
	logInfo := a.host.GetNodeHostInfo(dragonboat.NodeHostInfoOption{}).LogInfo
	for i := range logInfo {
		if logInfo[i].ShardID == shardID {
			return logInfo[i].ReplicaID
		}
	}
	return 0
}
// dumpState serializes the local state with State.Save and writes the result
// to the debug log. The error from Save is ignored.
func (a *Agent) dumpState() {
	a.StateLocal(func(state *State) {
		// Print snapshot
		var buf bytes.Buffer
		state.Save(&buf)
		a.log.Debugf(buf.String())
	})
}
package zongzi
import (
"fmt"
"strings"
"github.com/lni/dragonboat/v4/config"
)
// AgentOption configures an Agent during NewAgent. An option receives the
// agent under construction and may return an error.
type AgentOption func(*Agent) error
// WithAddrApi sets the API advertise address and bind address.
//
// When bindAddress is omitted the agent binds to 0.0.0.0 on the port of
// advertiseAddress. In that case advertiseAddress must be of the form
// host:port; an address without a port makes the option return an error
// instead of panicking on an out-of-range index.
func WithAddrApi(advertiseAddress string, bindAddress ...string) AgentOption {
	return func(a *Agent) error {
		a.advertiseAddress = advertiseAddress
		if len(bindAddress) > 0 {
			a.bindAddress = bindAddress[0]
			return nil
		}
		parts := strings.Split(advertiseAddress, ":")
		if len(parts) < 2 {
			return fmt.Errorf("invalid api advertise address %q: expected host:port", advertiseAddress)
		}
		a.bindAddress = fmt.Sprintf("0.0.0.0:%s", parts[1])
		return nil
	}
}
// WithAddrGossip sets the gossip advertise address and bind address.
//
// When bindAddress is omitted gossip binds to 0.0.0.0 on the port of
// advertiseAddress. In that case advertiseAddress must be of the form
// host:port; an address without a port makes the option return an error
// instead of panicking on an out-of-range index.
func WithAddrGossip(advertiseAddress string, bindAddress ...string) AgentOption {
	return func(a *Agent) error {
		a.hostConfig.Gossip.AdvertiseAddress = advertiseAddress
		if len(bindAddress) > 0 {
			a.hostConfig.Gossip.BindAddress = bindAddress[0]
			return nil
		}
		parts := strings.Split(advertiseAddress, ":")
		if len(parts) < 2 {
			return fmt.Errorf("invalid gossip advertise address %q: expected host:port", advertiseAddress)
		}
		a.hostConfig.Gossip.BindAddress = fmt.Sprintf("0.0.0.0:%s", parts[1])
		return nil
	}
}
// WithAddrRaft sets the raft address and listen address.
//
// When listenAddress is omitted raft listens on 0.0.0.0 on the port of
// raftAddress. In that case raftAddress must be of the form host:port; an
// address without a port makes the option return an error instead of
// panicking on an out-of-range index.
func WithAddrRaft(raftAddress string, listenAddress ...string) AgentOption {
	return func(a *Agent) error {
		a.hostConfig.RaftAddress = raftAddress
		if len(listenAddress) > 0 {
			a.hostConfig.ListenAddress = listenAddress[0]
			return nil
		}
		parts := strings.Split(raftAddress, ":")
		if len(parts) < 2 {
			return fmt.Errorf("invalid raft address %q: expected host:port", raftAddress)
		}
		a.hostConfig.ListenAddress = fmt.Sprintf("0.0.0.0:%s", parts[1])
		return nil
	}
}
// WithDirRaft sets the node host directory (hostConfig.NodeHostDir).
func WithDirRaft(dir string) AgentOption {
	apply := func(a *Agent) error {
		a.hostConfig.NodeHostDir = dir
		return nil
	}
	return apply
}
// WithDirWAL sets the write-ahead log directory (hostConfig.WALDir).
func WithDirWAL(dir string) AgentOption {
	apply := func(a *Agent) error {
		a.hostConfig.WALDir = dir
		return nil
	}
	return apply
}
// WithHostConfig replaces the agent's host config with cfg.
//
// Gossip advertise/bind addresses and the raft address already set on the
// agent are carried over when cfg leaves them empty. Expert.LogDBFactory and
// Expert.LogDB are always taken from DefaultHostConfig, overriding whatever
// cfg or an earlier option supplied.
func WithHostConfig(cfg HostConfig) AgentOption {
	return func(a *Agent) error {
		prev := &a.hostConfig
		if len(cfg.Gossip.AdvertiseAddress) == 0 {
			cfg.Gossip.AdvertiseAddress = prev.Gossip.AdvertiseAddress
		}
		if len(cfg.Gossip.BindAddress) == 0 {
			cfg.Gossip.BindAddress = prev.Gossip.BindAddress
		}
		if len(cfg.RaftAddress) == 0 {
			cfg.RaftAddress = prev.RaftAddress
		}
		cfg.Expert.LogDBFactory = DefaultHostConfig.Expert.LogDBFactory
		cfg.Expert.LogDB = DefaultHostConfig.Expert.LogDB
		a.hostConfig = cfg
		return nil
	}
}
// HostMemory is an alias of LogDBConfig; it is the argument type of
// WithHostMemoryLimit and the type of the HostMemory* presets.
type HostMemory = LogDBConfig
// WithHostMemoryLimit sets the maximum memory allotted to the raft log by
// storing limit in hostConfig.Expert.LogDB.
//
//	zongzi.WithHostMemoryLimit(zongzi.HostMemory256)
func WithHostMemoryLimit(limit HostMemory) AgentOption {
	apply := func(a *Agent) error {
		a.hostConfig.Expert.LogDB = limit
		return nil
	}
	return apply
}
// Presets for WithHostMemoryLimit, built from dragonboat's LogDB configs.
var (
// HostMemory256 can be used to set max log memory usage to 256 MB
HostMemory256 = config.GetTinyMemLogDBConfig()
// HostMemory1024 can be used to set max log memory usage to 1 GB
HostMemory1024 = config.GetSmallMemLogDBConfig()
// HostMemory4096 can be used to set max log memory usage to 4 GB
HostMemory4096 = config.GetMediumMemLogDBConfig()
// HostMemory8192 can be used to set max log memory usage to 8 GB (default)
HostMemory8192 = config.GetLargeMemLogDBConfig()
)
// WithRaftEventListener sets the raft event listener on the host config.
// NewAgent later wraps it together with the controller manager.
func WithRaftEventListener(listener RaftEventListener) AgentOption {
	apply := func(a *Agent) error {
		a.hostConfig.RaftEventListener = listener
		return nil
	}
	return apply
}
// WithSystemEventListener sets the system event listener on the host config.
func WithSystemEventListener(listener SystemEventListener) AgentOption {
	apply := func(a *Agent) error {
		a.hostConfig.SystemEventListener = listener
		return nil
	}
	return apply
}
// WithHostTags sets the tags sent with this host's record when it is
// published to the prime shard (see updateHost).
func WithHostTags(tags ...string) AgentOption {
	apply := func(a *Agent) error {
		a.hostTags = tags
		return nil
	}
	return apply
}
// WithReplicaConfig replaces the agent's prime replica config with cfg.
// NewAgent overwrites its ShardID afterwards.
func WithReplicaConfig(cfg ReplicaConfig) AgentOption {
	apply := func(a *Agent) error {
		a.replicaConfig = cfg
		return nil
	}
	return apply
}
// WithShardController installs c as the controller used by the agent's
// controller manager.
func WithShardController(c Controller) AgentOption {
	apply := func(a *Agent) error {
		a.controllerManager.controller = c
		return nil
	}
	return apply
}
package zongzi
import (
"encoding/json"
"fmt"
"io"
"github.com/lni/dragonboat/v4/logger"
"github.com/lni/dragonboat/v4/statemachine"
)
// fsmFactory returns the state machine constructor handed to dragonboat for
// the prime shard. Each invocation creates a fresh fsm with empty state and
// also stores it on agent.fsm, which State and StateLocal read from.
func fsmFactory(agent *Agent) statemachine.CreateStateMachineFunc {
	create := func(shardID, replicaID uint64) statemachine.IStateMachine {
		agent.fsm = &fsm{
			log:       agent.log,
			replicaID: replicaID,
			shardID:   shardID,
			state:     newFsmStateRadix(),
		}
		return agent.fsm
	}
	return create
}
// Compile-time check that *fsm implements statemachine.IStateMachine.
var _ statemachine.IStateMachine = (*fsm)(nil)
// fsm is the prime shard state machine. It applies host, shard and replica
// commands to state and saves/recovers that state as snapshots.
type fsm struct {
log logger.ILogger
replicaID uint64
shardID uint64
state *State
}
// Update applies one raft log entry to the cluster state.
//
// entry.Cmd is decoded as JSON twice: first into the base command to read its
// type, then into the matching host, shard or replica command. The command is
// applied inside a write transaction (withTxn(true)) that is committed when
// Update returns, and the state's meta index is set to entry.Index.
//
// Failures are never returned as errors; the error result is always nil.
// A command that cannot be applied is logged and entry.Result.Value keeps its
// incoming value. On success Value is the new shard or replica ID for post,
// the shard ID for shard put, the tag action's return value for tag commands
// and 1 otherwise.
func (fsm *fsm) Update(entry Entry) (Result, error) {
var err error
var cmd any
var cmdBase command
if err = json.Unmarshal(entry.Cmd, &cmdBase); err != nil {
fsm.log.Errorf("Invalid entry %d - %v, %v", entry.Index, string(entry.Cmd), err)
// NOTE(review): this early return skips the transaction and metaSetIndex below.
return entry.Result, nil
}
// Decode the typed command; cmd stays nil when the type is unknown or decoding fails.
switch cmdBase.Type {
case command_type_host:
var cmdHost commandHost
if err = json.Unmarshal(entry.Cmd, &cmdHost); err != nil {
fsm.log.Errorf("Invalid host cmd %#v, %v", entry, err)
break
}
cmd = cmdHost
case command_type_shard:
var cmdShard commandShard
if err = json.Unmarshal(entry.Cmd, &cmdShard); err != nil {
fsm.log.Errorf("Invalid shard cmd %#v, %v", entry, err)
break
}
cmd = cmdShard
case command_type_replica:
var cmdReplica commandReplica
if err = json.Unmarshal(entry.Cmd, &cmdReplica); err != nil {
fsm.log.Errorf("Invalid replica cmd %#v, %v", entry, err)
break
}
cmd = cmdReplica
default:
// NOTE(review): the format string has one verb but two arguments are passed.
fsm.log.Errorf("Unrecognized cmd type %s", cmdBase.Type, cmdBase)
}
state := fsm.state.withTxn(true)
defer state.commit()
// fsm.log.Debugf(`Update: %d %d %s`, entry.Index, state.Index(), string(entry.Cmd))
// A nil cmd matches no case, so only the meta index is updated for it.
switch cmd := cmd.(type) {
// Host
case commandHost:
switch cmd.Action {
// Put
case command_action_put:
if old, ok := state.Host(cmd.Host.ID); ok {
cmd.Host.Created = old.Created
fsm.tagsSetNX(cmd.Host.Tags, old.Tags)
} else {
cmd.Host.Created = entry.Index
state.setHostID(cmd.Host.ID)
}
cmd.Host.Updated = entry.Index
// Touch every shard that has a replica on this host.
state.ReplicaIterateByHostID(cmd.Host.ID, func(r Replica) bool {
state.shardTouch(r.ShardID, entry.Index)
return true
})
state.hostPut(cmd.Host)
entry.Result.Value = 1
// Delete
case command_action_del:
host, ok := state.Host(cmd.Host.ID)
if !ok {
fsm.log.Warningf("%v: %#v", ErrHostNotFound, cmd)
break
}
// Deleting a host also deletes its replicas.
state.ReplicaIterateByHostID(host.ID, func(r Replica) bool {
state.replicaDelete(r)
state.shardTouch(r.ShardID, entry.Index)
return true
})
state.hostDelete(host)
entry.Result.Value = 1
// Tags
case command_action_tags_set:
entry.Result.Value = fsm.hostTagAction(fsm.tagsSet, state, entry, cmd)
case command_action_tags_setnx:
entry.Result.Value = fsm.hostTagAction(fsm.tagsSetNX, state, entry, cmd)
case command_action_tags_remove:
entry.Result.Value = fsm.hostTagAction(fsm.tagsRemove, state, entry, cmd)
default:
fsm.log.Errorf("Unrecognized host action: %s - %#v", cmd.Action, cmd)
}
// Shard
case commandShard:
switch cmd.Action {
// Post
case command_action_post:
if cmd.Shard.Name != "" {
if _, ok := state.ShardFindByName(cmd.Shard.Name); ok {
// Name already taken: leave Value untouched and report the reason in Data.
err := fmt.Errorf("%s: %s", ErrShardExists, cmd.Shard.Name)
fsm.log.Errorf(err.Error())
entry.Result.Data = []byte(err.Error())
break
}
}
cmd.Shard.ID = state.shardIncr()
cmd.Shard.Created = entry.Index
cmd.Shard.Updated = entry.Index
state.shardPut(cmd.Shard)
entry.Result.Value = cmd.Shard.ID
// Put
case command_action_put:
if old, ok := state.Shard(cmd.Shard.ID); ok {
cmd.Shard.Created = old.Created
fsm.tagsSetNX(cmd.Shard.Tags, old.Tags)
} else if cmd.Shard.ID == 0 && state.ShardID() == 0 {
// Special case for prime shard (0)
cmd.Shard.Created = entry.Index
} else {
fsm.log.Errorf("%s: %s - %#v", ErrShardNotFound, cmd.Action, cmd)
break
}
cmd.Shard.Updated = entry.Index
state.shardPut(cmd.Shard)
// Touch every host that has a replica of this shard.
state.ReplicaIterateByShardID(cmd.Shard.ID, func(r Replica) bool {
state.hostTouch(r.HostID, entry.Index)
return true
})
entry.Result.Value = cmd.Shard.ID
// Status Update
case command_action_status_update:
shard, ok := state.Shard(cmd.Shard.ID)
if !ok {
fsm.log.Warningf("%v: %#v", ErrShardNotFound, cmd)
break
}
shard.Status = cmd.Shard.Status
shard.Updated = entry.Index
state.shardPut(shard)
state.ReplicaIterateByShardID(shard.ID, func(r Replica) bool {
state.hostTouch(r.HostID, entry.Index)
return true
})
entry.Result.Value = 1
// Set Leader
case command_action_leader_set:
shard, ok := state.Shard(cmd.Shard.ID)
if !ok {
fsm.log.Warningf("%v: %#v", ErrShardNotFound, cmd)
break
}
shard.Leader = cmd.Shard.Leader
shard.Term = cmd.Shard.Term
shard.Updated = entry.Index
state.shardPut(shard)
entry.Result.Value = 1
// Delete
case command_action_del:
shard, ok := state.Shard(cmd.Shard.ID)
if !ok {
fsm.log.Warningf("%v: %#v", ErrShardNotFound, cmd)
break
}
// Deleting a shard also deletes its replicas.
state.ReplicaIterateByShardID(shard.ID, func(r Replica) bool {
state.replicaDelete(r)
state.hostTouch(r.HostID, entry.Index)
return true
})
state.shardDelete(shard)
entry.Result.Value = 1
// Tags
case command_action_tags_set:
entry.Result.Value = fsm.shardTagAction(fsm.tagsSet, state, entry, cmd)
case command_action_tags_setnx:
entry.Result.Value = fsm.shardTagAction(fsm.tagsSetNX, state, entry, cmd)
case command_action_tags_remove:
entry.Result.Value = fsm.shardTagAction(fsm.tagsRemove, state, entry, cmd)
default:
fsm.log.Errorf("Unrecognized shard action: %s - %#v", cmd.Action, cmd)
}
// Replica
case commandReplica:
switch cmd.Action {
// Post
case command_action_post:
cmd.Replica.ID = state.replicaIncr()
fsm.log.Infof(`command_action_post - %d - %s - %v`, cmd.Replica.ID, cmd.Replica.HostID, cmd.Replica.IsNonVoting)
cmd.Replica.Created = entry.Index
cmd.Replica.Updated = entry.Index
// Voting replicas added while the shard has fewer than 3 members start as
// bootstrapping; all other replicas start as joining.
if !cmd.Replica.IsNonVoting && len(state.ShardMembers(cmd.Replica.ShardID)) < 3 {
cmd.Replica.Status = ReplicaStatus_Bootstrapping
} else {
cmd.Replica.Status = ReplicaStatus_Joining
}
state.replicaPut(cmd.Replica)
state.hostTouch(cmd.Replica.HostID, entry.Index)
state.shardTouch(cmd.Replica.ShardID, entry.Index)
entry.Result.Value = cmd.Replica.ID
// Status Update
case command_action_status_update:
replica, ok := state.Replica(cmd.Replica.ID)
if !ok {
fsm.log.Warningf("%v: %#v %#v", ErrReplicaNotFound, cmd, replica)
break
}
// NOTE(review): unlike the shard status update, replica.Updated is not set here.
replica.Status = cmd.Replica.Status
state.replicaPut(replica)
state.hostTouch(replica.HostID, entry.Index)
state.shardTouch(replica.ShardID, entry.Index)
entry.Result.Value = 1
// Put
case command_action_put:
if old, ok := state.Replica(cmd.Replica.ID); ok {
cmd.Replica.Created = old.Created
fsm.tagsSetNX(cmd.Replica.Tags, old.Tags)
} else {
fsm.log.Errorf("%s: %s - %#v", ErrReplicaNotFound, cmd.Action, cmd)
break
}
cmd.Replica.Updated = entry.Index
state.replicaPut(cmd.Replica)
state.hostTouch(cmd.Replica.HostID, entry.Index)
state.shardTouch(cmd.Replica.ShardID, entry.Index)
entry.Result.Value = 1
// Delete
case command_action_del:
replica, ok := state.Replica(cmd.Replica.ID)
if !ok {
fsm.log.Warningf("%v: %#v", ErrReplicaNotFound, cmd)
break
}
state.hostTouch(replica.HostID, entry.Index)
state.shardTouch(replica.ShardID, entry.Index)
state.replicaDelete(replica)
entry.Result.Value = 1
// Tags
case command_action_tags_set:
entry.Result.Value = fsm.replicaTagAction(fsm.tagsSet, state, entry, cmd)
case command_action_tags_setnx:
entry.Result.Value = fsm.replicaTagAction(fsm.tagsSetNX, state, entry, cmd)
case command_action_tags_remove:
entry.Result.Value = fsm.replicaTagAction(fsm.tagsRemove, state, entry, cmd)
default:
fsm.log.Errorf("Unrecognized replica action: %s - %#v", cmd.Action, cmd)
}
}
state.metaSetIndex(entry.Index)
return entry.Result, nil
}
// SaveSnapshot writes the fsm state to w by delegating to State.Save.
// The snapshot file collection and the stop channel are not used.
func (fsm *fsm) SaveSnapshot(w io.Writer, _ statemachine.ISnapshotFileCollection, _ <-chan struct{}) error {
	err := fsm.state.Save(w)
	return err
}
// RecoverFromSnapshot restores the fsm state from r by delegating to
// State.recover. The snapshot file list and the stop channel are not used.
func (fsm *fsm) RecoverFromSnapshot(r io.Reader, _ []statemachine.SnapshotFile, _ <-chan struct{}) error {
	err := fsm.state.recover(r)
	return err
}
// Lookup is a no-op: it ignores its argument and always returns nil, nil.
func (fsm *fsm) Lookup(interface{}) (res interface{}, err error) {
	return nil, nil
}
// Close is a no-op and always returns nil.
func (fsm *fsm) Close() (err error) {
	return nil
}
// hostTagAction applies the tag mutation fn to the host identified by
// cmd.Host.ID, using cmd.Host.Tags as the mutation input.
//
// It returns 1 when fn reports a change; in that case every shard having a
// replica on the host is touched at entry.Index and the host is stored.
// It returns 0 when the host does not exist or fn reports no change.
func (fsm *fsm) hostTagAction(fn func(map[string]string, map[string]string) bool, state *State, entry Entry, cmd commandHost) (val uint64) {
	host, ok := state.Host(cmd.Host.ID)
	if !ok {
		fsm.log.Warningf("%v: %#v", ErrHostNotFound, cmd)
		return 0
	}
	// Mutate a private copy of the tag map. The map held by the stored object
	// is shared with the database: go-memdb requires that inserted objects are
	// never modified in place, and changing it would also prevent the old Tags
	// index entries from being removed on update. Copying additionally
	// guarantees a non-nil destination map for fn.
	tags := make(map[string]string, len(host.Tags)+len(cmd.Host.Tags))
	for k, v := range host.Tags {
		tags[k] = v
	}
	host.Tags = tags
	if !fn(host.Tags, cmd.Host.Tags) {
		return 0
	}
	state.ReplicaIterateByHostID(cmd.Host.ID, func(r Replica) bool {
		state.shardTouch(r.ShardID, entry.Index)
		return true
	})
	state.hostPut(host)
	return 1
}
// shardTagAction applies the tag mutation fn to the shard identified by
// cmd.Shard.ID, using cmd.Shard.Tags as the mutation input.
//
// It returns 1 when fn reports a change; in that case every host having a
// replica of the shard is touched at entry.Index and the shard is stored.
// It returns 0 when the shard does not exist or fn reports no change.
func (fsm *fsm) shardTagAction(fn func(map[string]string, map[string]string) bool, state *State, entry Entry, cmd commandShard) (val uint64) {
	shard, ok := state.Shard(cmd.Shard.ID)
	if !ok {
		fsm.log.Warningf("%v: %#v", ErrShardNotFound, cmd)
		return 0
	}
	// Mutate a private copy of the tag map. The map held by the stored object
	// is shared with the database: go-memdb requires that inserted objects are
	// never modified in place, and changing it would also prevent the old Tags
	// index entries from being removed on update. Copying additionally
	// guarantees a non-nil destination map for fn.
	tags := make(map[string]string, len(shard.Tags)+len(cmd.Shard.Tags))
	for k, v := range shard.Tags {
		tags[k] = v
	}
	shard.Tags = tags
	if !fn(shard.Tags, cmd.Shard.Tags) {
		return 0
	}
	state.ReplicaIterateByShardID(cmd.Shard.ID, func(r Replica) bool {
		state.hostTouch(r.HostID, entry.Index)
		return true
	})
	state.shardPut(shard)
	return 1
}
// replicaTagAction applies the tag mutation fn to the replica identified by
// cmd.Replica.ID, using cmd.Replica.Tags as the mutation input.
//
// It returns 1 when fn reports a change; in that case the replica's host and
// shard are touched at entry.Index and the replica is stored.
// It returns 0 when the replica does not exist or fn reports no change.
func (fsm *fsm) replicaTagAction(fn func(map[string]string, map[string]string) bool, state *State, entry Entry, cmd commandReplica) (val uint64) {
	replica, ok := state.Replica(cmd.Replica.ID)
	if !ok {
		fsm.log.Warningf("%v: %#v", ErrReplicaNotFound, cmd)
		return 0
	}
	// Mutate a private copy of the tag map. The map held by the stored object
	// is shared with the database: go-memdb requires that inserted objects are
	// never modified in place, and changing it would also prevent the old Tags
	// index entries from being removed on update. Copying additionally
	// guarantees a non-nil destination map for fn.
	tags := make(map[string]string, len(replica.Tags)+len(cmd.Replica.Tags))
	for k, v := range replica.Tags {
		tags[k] = v
	}
	replica.Tags = tags
	if !fn(replica.Tags, cmd.Replica.Tags) {
		return 0
	}
	state.hostTouch(replica.HostID, entry.Index)
	state.shardTouch(replica.ShardID, entry.Index)
	state.replicaPut(replica)
	return 1
}
// tagsSet copies every entry of old into new, overwriting existing keys.
// It reports whether old contained at least one entry.
// new must be a non-nil map whenever old is non-empty.
func (fsm *fsm) tagsSet(new, old map[string]string) (written bool) {
	written = len(old) > 0
	for key := range old {
		new[key] = old[key]
	}
	return written
}
// tagsSetNX copies into new only those entries of old whose key is not
// already present in new. It reports whether any entry was added.
// new must be a non-nil map whenever an entry has to be added.
func (fsm *fsm) tagsSetNX(new, old map[string]string) (written bool) {
	for key, value := range old {
		if _, exists := new[key]; exists {
			continue
		}
		new[key] = value
		written = true
	}
	return written
}
// tagsRemove deletes from new every key present in old (values in old are
// ignored). It reports whether old contained at least one key, regardless of
// whether that key existed in new.
func (fsm *fsm) tagsRemove(new, old map[string]string) (written bool) {
	written = len(old) > 0
	for key := range old {
		delete(new, key)
	}
	return written
}
package zongzi
import (
"encoding/json"
"strings"
)
// Command types: the value of command.Type.
const (
	command_type_host     = "host"
	command_type_replica  = "replica"
	command_type_shard    = "shard"
	command_type_snapshot = "snapshot"
)

// Command actions: the value of command.Action.
const (
	command_action_del           = "del"
	command_action_put           = "put"
	command_action_post          = "post"
	command_action_status_update = "status-update"
	command_action_tags_set      = "tags-set"
	command_action_tags_setnx    = "tags-setnx"
	command_action_tags_remove   = "tags-remove"
	command_action_leader_set    = "leader-set"
)
type (
	// command is the envelope shared by all fsm commands. It is embedded in
	// the subject-specific command structs, so its fields are serialized at
	// the top level of the JSON object.
	command struct {
		Action string `json:"action"`
		Type   string `json:"type"`
	}

	// commandHost is a command whose subject is a Host.
	commandHost struct {
		command
		Host Host `json:"host"`
	}

	// commandShard is a command whose subject is a Shard.
	commandShard struct {
		command
		Shard Shard `json:"shard"`
	}

	// commandReplica is a command whose subject is a Replica.
	commandReplica struct {
		command
		Replica Replica `json:"replica"`
	}
)
// newCmdHostPut returns the JSON encoding of a host "put" command carrying
// the given host attributes. tagList is converted with tagMapFromList.
// The marshal error is discarded.
func newCmdHostPut(nhid, apiAddr, raftAddr string, tagList []string, status HostStatus, shardTypes []string) (b []byte) {
	cmd := commandHost{
		command: command{
			Action: command_action_put,
			Type:   command_type_host,
		},
		Host: Host{
			ApiAddress:  apiAddr,
			ID:          nhid,
			Tags:        tagMapFromList(tagList),
			RaftAddress: raftAddr,
			ShardTypes:  shardTypes,
			Status:      status,
		},
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdHostDel returns the JSON encoding of a host "del" command for the
// host with the given ID. The marshal error is discarded.
func newCmdHostDel(nhid string) (b []byte) {
	cmd := commandHost{
		command: command{
			Action: command_action_del,
			Type:   command_type_host,
		},
		Host: Host{ID: nhid},
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdShardPost returns the JSON encoding of a shard "post" command
// carrying s. The marshal error is discarded.
func newCmdShardPost(s Shard) (b []byte) {
	cmd := commandShard{
		command: command{
			Action: command_action_post,
			Type:   command_type_shard,
		},
		Shard: s,
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdShardPut returns the JSON encoding of a shard "put" command
// carrying s. The marshal error is discarded.
func newCmdShardPut(s Shard) (b []byte) {
	cmd := commandShard{
		command: command{
			Action: command_action_put,
			Type:   command_type_shard,
		},
		Shard: s,
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdShardStatusUpdate returns the JSON encoding of a shard
// "status-update" command setting the status of shard id.
// The marshal error is discarded.
func newCmdShardStatusUpdate(id uint64, status ShardStatus) (b []byte) {
	cmd := commandShard{
		command: command{
			Action: command_action_status_update,
			Type:   command_type_shard,
		},
		Shard: Shard{
			ID:     id,
			Status: status,
		},
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdShardLeaderSet returns the JSON encoding of a shard "leader-set"
// command recording replicaID as leader of shardID at the given term.
// The marshal error is discarded.
func newCmdShardLeaderSet(shardID, replicaID, term uint64) (b []byte) {
	cmd := commandShard{
		command: command{
			Action: command_action_leader_set,
			Type:   command_type_shard,
		},
		Shard: Shard{
			ID:     shardID,
			Leader: replicaID,
			Term:   term,
		},
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdShardDel returns the JSON encoding of a shard "del" command for
// shardID. The marshal error is discarded.
func newCmdShardDel(shardID uint64) (b []byte) {
	cmd := commandShard{
		command: command{
			Action: command_action_del,
			Type:   command_type_shard,
		},
		Shard: Shard{ID: shardID},
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdReplicaPost returns the JSON encoding of a replica "post" command
// for a new replica of shardID on host nhid. The replica is sent with status
// ReplicaStatus_New. The marshal error is discarded.
func newCmdReplicaPost(nhid string, shardID uint64, isNonVoting bool) (b []byte) {
	cmd := commandReplica{
		command: command{
			Action: command_action_post,
			Type:   command_type_replica,
		},
		Replica: Replica{
			HostID:      nhid,
			IsNonVoting: isNonVoting,
			ShardID:     shardID,
			Status:      ReplicaStatus_New,
		},
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdReplicaPut returns the JSON encoding of a replica "put" command for
// replicaID with the given host, shard and voting flag.
// The marshal error is discarded.
func newCmdReplicaPut(nhid string, shardID, replicaID uint64, isNonVoting bool) (b []byte) {
	cmd := commandReplica{
		command: command{
			Action: command_action_put,
			Type:   command_type_replica,
		},
		Replica: Replica{
			HostID:      nhid,
			ID:          replicaID,
			ShardID:     shardID,
			IsNonVoting: isNonVoting,
		},
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdReplicaDelete returns the JSON encoding of a replica "del" command
// for replicaID. The marshal error is discarded.
func newCmdReplicaDelete(replicaID uint64) (b []byte) {
	cmd := commandReplica{
		command: command{
			Action: command_action_del,
			Type:   command_type_replica,
		},
		Replica: Replica{ID: replicaID},
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdReplicaUpdateStatus returns the JSON encoding of a replica
// "status-update" command setting the status of replicaID.
// The marshal error is discarded.
func newCmdReplicaUpdateStatus(replicaID uint64, status ReplicaStatus) (b []byte) {
	cmd := commandReplica{
		command: command{
			Action: command_action_status_update,
			Type:   command_type_replica,
		},
		Replica: Replica{
			ID:     replicaID,
			Status: status,
		},
	}
	b, _ = json.Marshal(cmd)
	return b
}
// newCmdTagsSetNX builds a "tags-setnx" command for subject (a Host, Shard
// or Replica value) via newCmdTags.
func newCmdTagsSetNX(subject any, tagList ...string) []byte {
	encoded := newCmdTags(command_action_tags_setnx, subject, tagList)
	return encoded
}
// newCmdTagsSet builds a "tags-set" command for subject (a Host, Shard or
// Replica value) via newCmdTags.
func newCmdTagsSet(subject any, tagList ...string) []byte {
	encoded := newCmdTags(command_action_tags_set, subject, tagList)
	return encoded
}
// newCmdTagsRemove builds a "tags-remove" command for subject (a Host, Shard
// or Replica value) via newCmdTags.
func newCmdTagsRemove(subject any, tagList ...string) []byte {
	encoded := newCmdTags(command_action_tags_remove, subject, tagList)
	return encoded
}
// newCmdTags returns the JSON encoding of a tag command with the given
// action. subject selects the command type and supplies the ID; only the
// value types Host, Shard and Replica are recognized. For any other dynamic
// type (including pointers to those types) the result is nil.
// tagList is converted with tagMapFromList. Marshal errors are discarded.
func newCmdTags(action string, subject any, tagList []string) (b []byte) {
	switch s := subject.(type) {
	case Host:
		b, _ = json.Marshal(commandHost{
			command: command{Action: action, Type: command_type_host},
			Host:    Host{ID: s.ID, Tags: tagMapFromList(tagList)},
		})
	case Shard:
		b, _ = json.Marshal(commandShard{
			command: command{Action: action, Type: command_type_shard},
			Shard:   Shard{ID: s.ID, Tags: tagMapFromList(tagList)},
		})
	case Replica:
		b, _ = json.Marshal(commandReplica{
			command: command{Action: action, Type: command_type_replica},
			Replica: Replica{ID: s.ID, Tags: tagMapFromList(tagList)},
		})
	}
	return b
}
// tagMapFromList converts a list of tags to a map of key (namespace:predicate) and value.
// Each tag is split at its first "="; a tag without "=" maps to the empty string.
// The returned map is never nil.
//
//	["geo:region=us-west-1"] -> {"geo:region": "us-west-1"}
func tagMapFromList(tagList []string) map[string]string {
	tags := map[string]string{}
	for _, tag := range tagList {
		// Cut yields (tag, "", false) when there is no separator.
		key, value, _ := strings.Cut(tag, "=")
		tags[key] = value
	}
	return tags
}
package zongzi
import (
"encoding/json"
"fmt"
"io"
"github.com/hashicorp/go-memdb"
)
// State represents internal state.
// All public methods are read only.
//
// A State pairs the shared database with one transaction; every accessor in
// this file operates on txn. newFsmStateRadix returns a State without a
// transaction, and withTxn derives a State bound to a new one.
type State struct {
	// db is the underlying in-memory database.
	db *memdb.MemDB
	// txn is the transaction used by all reads and writes on this State.
	txn *memdb.Txn
}
// newFsmStateRadix creates the in-memory database backing the fsm state and
// returns a State that is not yet bound to a transaction.
//
// The database has four tables: meta (keyed by Key), host (keyed by UUID ID),
// shard and replica (both keyed by numeric ID), each with the secondary
// indexes used by the State accessors. It panics if the schema is invalid.
func newFsmStateRadix() *State {
	metaSchema := &memdb.TableSchema{
		Name: "meta",
		Indexes: map[string]*memdb.IndexSchema{
			"id": {
				Name:    "id",
				Unique:  true,
				Indexer: &memdb.StringFieldIndex{Field: "Key"},
			},
		},
	}
	hostSchema := &memdb.TableSchema{
		Name: "host",
		Indexes: map[string]*memdb.IndexSchema{
			"id": {
				Name:    "id",
				Unique:  true,
				Indexer: &memdb.UUIDFieldIndex{Field: "ID"},
			},
			"RaftAddress": {
				Name:         "RaftAddress",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.StringFieldIndex{Field: "RaftAddress"},
			},
			// Queried by State.HostIterateByShardType; without this index
			// that lookup fails and the method panics.
			"ShardTypes": {
				Name:         "ShardTypes",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.StringSliceFieldIndex{Field: "ShardTypes"},
			},
			"Tags": {
				Name:         "Tags",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.StringMapFieldIndex{Field: "Tags"},
			},
			"Updated": {
				Name:         "Updated",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.UintFieldIndex{Field: "Updated"},
			},
		},
	}
	shardSchema := &memdb.TableSchema{
		Name: "shard",
		Indexes: map[string]*memdb.IndexSchema{
			"id": {
				Name:    "id",
				Unique:  true,
				Indexer: &memdb.UintFieldIndex{Field: "ID"},
			},
			"Name": {
				Name:         "Name",
				Unique:       true,
				AllowMissing: true,
				Indexer:      &memdb.StringFieldIndex{Field: "Name"},
			},
			"Tags": {
				Name:         "Tags",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.StringMapFieldIndex{Field: "Tags"},
			},
			"Updated": {
				Name:         "Updated",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.UintFieldIndex{Field: "Updated"},
			},
		},
	}
	replicaSchema := &memdb.TableSchema{
		Name: "replica",
		Indexes: map[string]*memdb.IndexSchema{
			"id": {
				Name:    "id",
				Unique:  true,
				Indexer: &memdb.UintFieldIndex{Field: "ID"},
			},
			"HostID": {
				Name:    "HostID",
				Unique:  false,
				Indexer: &memdb.UUIDFieldIndex{Field: "HostID"},
			},
			"ShardID": {
				Name:    "ShardID",
				Unique:  false,
				Indexer: &memdb.UintFieldIndex{Field: "ShardID"},
			},
			"Tags": {
				Name:         "Tags",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.StringMapFieldIndex{Field: "Tags"},
			},
		},
	}
	db, err := memdb.NewMemDB(&memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"meta":    metaSchema,
			"host":    hostSchema,
			"shard":   shardSchema,
			"replica": replicaSchema,
		},
	})
	if err != nil {
		panic(err)
	}
	return &State{
		db: db,
	}
}
// metaValue is a row of the meta table. Each row is identified by Key and
// can carry a numeric value, a byte payload, or both.
type metaValue struct {
	// Key is the unique row identifier (e.g. `index`, `hostID`, `shardID`, `replicaID`).
	Key string
	// Val is the numeric value of the row.
	Val uint64
	// Data is the byte payload of the row.
	Data []byte
}
// withTxn returns a new State sharing this State's database and bound to a
// freshly opened transaction. write selects a write transaction.
func (fsm *State) withTxn(write bool) *State {
	return &State{
		db:  fsm.db,
		txn: fsm.db.Txn(write),
	}
}
// commit commits the transaction bound to this State.
func (fsm *State) commit() {
	fsm.txn.Commit()
}
// rollback aborts the transaction bound to this State.
func (fsm *State) rollback() {
	fsm.txn.Abort()
}
// metaSet stores the meta row key with the given numeric value and payload.
// It panics if the insert fails.
func (fsm *State) metaSet(key string, val uint64, data []byte) {
	row := metaValue{Key: key, Val: val, Data: data}
	if err := fsm.txn.Insert(`meta`, row); err != nil {
		panic(err)
	}
}
// metaSetIndex stores val in the `index` meta row, with no payload.
// It panics if the insert fails.
func (fsm *State) metaSetIndex(val uint64) {
	row := metaValue{Key: `index`, Val: val}
	if err := fsm.txn.Insert(`meta`, row); err != nil {
		panic(err)
	}
}
// metaGet returns the numeric value and payload of the meta row key.
// Both results are zero when the row does not exist; a lookup error is
// ignored and treated the same way.
func (fsm *State) metaGet(key string) (val uint64, data []byte) {
	raw, _ := fsm.txn.First(`meta`, `id`, key)
	if raw == nil {
		return 0, nil
	}
	row := raw.(metaValue)
	return row.Val, row.Data
}
// metaGetVal returns the numeric value of the meta row key (0 if absent).
func (fsm *State) metaGetVal(key string) (val uint64) {
	n, _ := fsm.metaGet(key)
	return n
}
// metaGetDataString returns the payload of the meta row key as a string
// (empty if the row is absent).
func (fsm *State) metaGetDataString(key string) string {
	_, payload := fsm.metaGet(key)
	text := string(payload)
	return text
}
// Index returns the value of the `index` meta row, which metaSetIndex
// writes (0 if it was never set).
func (fsm *State) Index() (val uint64) {
	val = fsm.metaGetVal(`index`)
	return val
}
// setHostID records id as the payload of the `hostID` meta row.
func (fsm *State) setHostID(id string) {
	payload := []byte(id)
	fsm.metaSet(`hostID`, 0, payload)
}
// HostID returns the ID of the last created Host, as recorded in the
// `hostID` meta row (empty if none was recorded).
func (fsm *State) HostID() string {
	id := fsm.metaGetDataString(`hostID`)
	return id
}
// shardIncr increments the `shardID` meta counter, stores it, and returns
// the new value.
func (fsm *State) shardIncr() uint64 {
	next := fsm.metaGetVal(`shardID`) + 1
	fsm.metaSet(`shardID`, next, nil)
	return next
}
// ShardID returns the ID of the last created Shard, i.e. the current value
// of the `shardID` meta counter.
func (fsm *State) ShardID() uint64 {
	id := fsm.metaGetVal(`shardID`)
	return id
}
// replicaIncr increments the `replicaID` meta counter, stores it, and
// returns the new value.
func (fsm *State) replicaIncr() uint64 {
	next := fsm.metaGetVal(`replicaID`) + 1
	fsm.metaSet(`replicaID`, next, nil)
	return next
}
// ReplicaID returns the ID of the last created Replica, i.e. the current
// value of the `replicaID` meta counter.
func (fsm *State) ReplicaID() uint64 {
	id := fsm.metaGetVal(`replicaID`)
	return id
}
// Host returns the host with the specified ID or ok false if not found.
// It panics if the lookup itself fails.
func (fsm *State) Host(id string) (h Host, ok bool) {
	found, err := fsm.txn.First(`host`, `id`, id)
	if err != nil {
		panic(err)
	}
	if found == nil {
		return h, false
	}
	return found.(Host), true
}
// HostIterate executes a callback for every host in the cluster.
// Return true to continue iterating, false to stop.
// It panics if the iterator cannot be created.
//
//	var hostCount int
//	agent.Read(func(s *zongzi.State) {
//		s.HostIterate(func(h Host) bool {
//			hostCount++
//			return true
//		})
//	})
func (fsm *State) HostIterate(fn func(h Host) bool) {
	iter, err := fsm.txn.Get(`host`, `id_prefix`, "")
	if err != nil {
		panic(err)
	}
	for item := iter.Next(); item != nil; item = iter.Next() {
		if !fn(item.(Host)) {
			return
		}
	}
}
// HostIterateByShardType executes a callback for every host in the cluster
// supporting the provided shard type. Return true to continue iterating,
// false to stop.
//
// The lookup uses the `ShardTypes` index of the host table and panics if the
// iterator cannot be created (for example when that index is not defined).
func (fsm *State) HostIterateByShardType(shardType string, fn func(h Host) bool) {
	iter, err := fsm.txn.Get(`host`, `ShardTypes`, shardType)
	if err != nil {
		panic(err)
	}
	for item := iter.Next(); item != nil; item = iter.Next() {
		if !fn(item.(Host)) {
			return
		}
	}
}
// HostIterateByTag executes a callback for every host in the cluster matching
// the specified tag via the `Tags` index. Return true to continue iterating,
// false to stop. It panics if the iterator cannot be created.
func (fsm *State) HostIterateByTag(tag string, fn func(h Host) bool) {
	iter, err := fsm.txn.Get(`host`, `Tags`, tag)
	if err != nil {
		panic(err)
	}
	for item := iter.Next(); item != nil; item = iter.Next() {
		if !fn(item.(Host)) {
			return
		}
	}
}
// hostPut inserts or replaces h in the host table. It panics on failure.
func (fsm *State) hostPut(h Host) {
	if err := fsm.txn.Insert(`host`, h); err != nil {
		panic(err)
	}
}
// hostDelete removes h from the host table. A missing host is not an error;
// any other failure panics.
func (fsm *State) hostDelete(h Host) {
	switch err := fsm.txn.Delete(`host`, h); err {
	case nil, memdb.ErrNotFound:
		// Deleted, or nothing to delete.
	default:
		panic(err)
	}
}
// hostTouch sets the Updated field of the host with the given id to index
// and stores the host. It panics if the host does not exist.
func (fsm *State) hostTouch(id string, index uint64) {
	h, ok := fsm.Host(id)
	if !ok {
		// The panic message is kept as is; the previous dump of every host to
		// stdout was leftover debugging output and has been removed.
		panic(`Host not found "` + id + fmt.Sprintf(`" %#v`, h))
	}
	h.Updated = index
	fsm.hostPut(h)
}
// hostByRaftAddress returns the first host whose RaftAddress equals
// raftAddress, or ok false if there is none. It panics if the lookup fails.
func (fsm *State) hostByRaftAddress(raftAddress string) (h Host, ok bool) {
	found, err := fsm.txn.First(`host`, `RaftAddress`, raftAddress)
	if err != nil {
		panic(err)
	}
	if found == nil {
		return h, false
	}
	return found.(Host), true
}
// Shard returns the shard with the specified id or ok false if not found.
// It panics if the lookup itself fails.
func (fsm *State) Shard(id uint64) (s Shard, ok bool) {
	found, err := fsm.txn.First(`shard`, `id`, id)
	if err != nil {
		panic(err)
	}
	if found == nil {
		return s, false
	}
	return found.(Shard), true
}
// ShardFindByName returns the shard with the specified name or ok false if
// not found. It panics if the lookup itself fails.
func (fsm *State) ShardFindByName(name string) (s Shard, ok bool) {
	found, err := fsm.txn.First(`shard`, `Name`, name)
	if err != nil {
		panic(err)
	}
	if found == nil {
		return s, false
	}
	return found.(Shard), true
}
// shardPut inserts or replaces s in the shard table. It panics on failure.
func (fsm *State) shardPut(s Shard) {
	if err := fsm.txn.Insert(`shard`, s); err != nil {
		panic(err)
	}
}
// shardDelete removes s from the shard table. A missing shard is not an
// error; any other failure panics.
func (fsm *State) shardDelete(s Shard) {
	switch err := fsm.txn.Delete(`shard`, s); err {
	case nil, memdb.ErrNotFound:
		// Deleted, or nothing to delete.
	default:
		panic(err)
	}
}
// shardTouch sets the Updated field of shard id to index and stores the
// shard. It does nothing when the shard does not exist.
func (fsm *State) shardTouch(id, index uint64) {
	s, ok := fsm.Shard(id)
	if !ok {
		return
	}
	s.Updated = index
	fsm.shardPut(s)
}
// ShardIterate executes a callback for every shard in the cluster ordered by shard id ascending.
// Return true to continue iterating, false to stop.
// It panics if the iterator cannot be created.
func (fsm *State) ShardIterate(fn func(s Shard) bool) {
	iter, err := fsm.txn.LowerBound(`shard`, `id`, uint64(0))
	if err != nil {
		panic(err)
	}
	for item := iter.Next(); item != nil; item = iter.Next() {
		if !fn(item.(Shard)) {
			return
		}
	}
}
// ShardIterateByTag executes a callback for every shard in the cluster
// matching the specified tag via the `Tags` index. Return true to continue
// iterating, false to stop. It panics if the iterator cannot be created.
func (fsm *State) ShardIterateByTag(tag string, fn func(r Shard) bool) {
	iter, err := fsm.txn.Get(`shard`, `Tags`, tag)
	if err != nil {
		panic(err)
	}
	for item := iter.Next(); item != nil; item = iter.Next() {
		if !fn(item.(Shard)) {
			return
		}
	}
}
// ShardIterateUpdatedAfter executes a callback for every shard in the cluster
// having an updated index greater than the supplied index. Return true to
// continue iterating, false to stop.
//
// The scan starts at index+1 on the `Updated` index. It panics if the
// iterator cannot be created.
func (fsm *State) ShardIterateUpdatedAfter(index uint64, fn func(r Shard) bool) {
	iter, err := fsm.txn.LowerBound(`shard`, `Updated`, index+1)
	if err != nil {
		panic(err)
	}
	for item := iter.Next(); item != nil; item = iter.Next() {
		if !fn(item.(Shard)) {
			return
		}
	}
}
// ShardMembers returns a map of shard members (replicaID: hostID).
// Only replicas that are not non-voting are included. The map is never nil.
func (fsm *State) ShardMembers(id uint64) map[uint64]string {
	voters := make(map[uint64]string)
	fsm.ReplicaIterateByShardID(id, func(r Replica) bool {
		if r.IsNonVoting {
			return true
		}
		voters[r.ID] = r.HostID
		return true
	})
	return voters
}
// Replica returns the replica with the specified id or ok false if not found.
// It panics if the lookup itself fails.
func (fsm *State) Replica(id uint64) (r Replica, ok bool) {
	found, err := fsm.txn.First(`replica`, `id`, id)
	if err != nil {
		panic(err)
	}
	if found == nil {
		return r, false
	}
	return found.(Replica), true
}
// replicaPut inserts or replaces r in the replica table. It panics on failure.
func (fsm *State) replicaPut(r Replica) {
	if err := fsm.txn.Insert(`replica`, r); err != nil {
		panic(err)
	}
}
// replicaDelete removes r from the replica table. A missing replica is not
// an error; any other failure panics.
func (fsm *State) replicaDelete(r Replica) {
	switch err := fsm.txn.Delete(`replica`, r); err {
	case nil, memdb.ErrNotFound:
		// Deleted, or nothing to delete.
	default:
		panic(err)
	}
}
// replicaTouch sets the Updated field of replica id to index and stores the
// replica. It does nothing when the replica does not exist.
func (fsm *State) replicaTouch(id, index uint64) {
	r, ok := fsm.Replica(id)
	if !ok {
		return
	}
	r.Updated = index
	fsm.replicaPut(r)
}
// ReplicaIterate executes a callback for every replica in the cluster ordered by replica id ascending.
// Return true to continue iterating, false to stop.
// It panics if the iterator cannot be created.
func (fsm *State) ReplicaIterate(fn func(r Replica) bool) {
	iter, err := fsm.txn.LowerBound(`replica`, `id`, uint64(0))
	if err != nil {
		panic(err)
	}
	for item := iter.Next(); item != nil; item = iter.Next() {
		if !fn(item.(Replica)) {
			return
		}
	}
}
// ReplicaIterateByShardID executes a callback for every replica in the
// cluster belonging to the provided shard id. Return true to continue
// iterating, false to stop. It panics if the iterator cannot be created.
func (fsm *State) ReplicaIterateByShardID(shardID uint64, fn func(r Replica) bool) {
	iter, err := fsm.txn.Get(`replica`, `ShardID`, shardID)
	if err != nil {
		panic(err)
	}
	for item := iter.Next(); item != nil; item = iter.Next() {
		if !fn(item.(Replica)) {
			return
		}
	}
}
// ReplicaIterateByHostID executes a callback for every replica in the
// cluster belonging to the provided host id. Return true to continue
// iterating, false to stop. It panics if the iterator cannot be created.
func (fsm *State) ReplicaIterateByHostID(hostID string, fn func(r Replica) bool) {
	iter, err := fsm.txn.Get(`replica`, `HostID`, hostID)
	if err != nil {
		panic(err)
	}
	for item := iter.Next(); item != nil; item = iter.Next() {
		if !fn(item.(Replica)) {
			return
		}
	}
}
// ReplicaIterateByTag executes a callback for every replica in the cluster
// matching the specified tag via the `Tags` index. Return true to continue
// iterating, false to stop. It panics if the iterator cannot be created.
func (fsm *State) ReplicaIterateByTag(tag string, fn func(r Replica) bool) {
	iter, err := fsm.txn.Get(`replica`, `Tags`, tag)
	if err != nil {
		panic(err)
	}
	for item := iter.Next(); item != nil; item = iter.Next() {
		if !fn(item.(Replica)) {
			return
		}
	}
}
// fsmStateMetaHeader is the first JSON line of a state snapshot. It carries
// the meta values and the number of host, shard and replica records that
// follow, which State.recover uses to know how many records to decode.
type fsmStateMetaHeader struct {
	// Index is the value of the `index` meta row.
	Index uint64 `json:"index"`
	// HostID is the payload of the `hostID` meta row.
	HostID string `json:"hostID"`
	// Hosts is the number of host records in the snapshot.
	Hosts uint64 `json:"hosts"`
	// ReplicaID is the value of the `replicaID` meta counter.
	ReplicaID uint64 `json:"replicaID"`
	// Replicas is the number of replica records in the snapshot.
	Replicas uint64 `json:"replicas"`
	// ShardID is the value of the `shardID` meta counter.
	ShardID uint64 `json:"shardID"`
	// Shards is the number of shard records in the snapshot.
	Shards uint64 `json:"shards"`
}
// Save writes a snapshot of the state to w as newline-delimited JSON: one
// fsmStateMetaHeader line followed by every host, shard and replica, one
// record per line, in that order. All data is read through a single read
// transaction.
//
// It returns the first encoding or write error encountered, so a snapshot
// that could not be fully written is never reported as successful.
func (fsm *State) Save(w io.Writer) error {
	f := fsm.withTxn(false)
	header := fsmStateMetaHeader{
		Index:     f.metaGetVal(`index`),
		HostID:    f.metaGetDataString(`hostID`),
		ShardID:   f.metaGetVal(`shardID`),
		ReplicaID: f.metaGetVal(`replicaID`),
	}
	// Count the records first so that recover knows how many to read.
	f.HostIterate(func(Host) bool {
		header.Hosts++
		return true
	})
	f.ShardIterate(func(Shard) bool {
		header.Shards++
		return true
	})
	f.ReplicaIterate(func(Replica) bool {
		header.Replicas++
		return true
	})
	// writeLine encodes v as one JSON line. It records the first failure in
	// err and returns false so that the iterators stop early.
	var err error
	writeLine := func(v any) bool {
		var b []byte
		if b, err = json.Marshal(v); err != nil {
			return false
		}
		_, err = w.Write(append(b, '\n'))
		return err == nil
	}
	if !writeLine(header) {
		return fmt.Errorf("write header: %w", err)
	}
	f.HostIterate(func(h Host) bool { return writeLine(h) })
	if err != nil {
		return fmt.Errorf("write host: %w", err)
	}
	f.ShardIterate(func(s Shard) bool { return writeLine(s) })
	if err != nil {
		return fmt.Errorf("write shard: %w", err)
	}
	f.ReplicaIterate(func(r Replica) bool { return writeLine(r) })
	if err != nil {
		return fmt.Errorf("write replica: %w", err)
	}
	return nil
}
// recover loads a snapshot produced by Save from r: a fsmStateMetaHeader
// followed by the announced number of host, shard and replica records.
//
// Everything is applied in a single write transaction that is committed only
// when the whole snapshot was decoded. On any error (or panic) the
// transaction is aborted, so a partially decoded snapshot is never applied.
func (fsm *State) recover(r io.Reader) (err error) {
	f := fsm.withTxn(true)
	// Aborting is a no-op once the transaction has been committed, so this
	// only takes effect on the error and panic paths.
	defer f.rollback()
	decoder := json.NewDecoder(r)
	var header fsmStateMetaHeader
	if err = decoder.Decode(&header); err != nil {
		return err
	}
	f.metaSet(`index`, header.Index, nil)
	f.metaSet(`hostID`, 0, []byte(header.HostID))
	f.metaSet(`shardID`, header.ShardID, nil)
	f.metaSet(`replicaID`, header.ReplicaID, nil)
	var i uint64
	for i = 0; i < header.Hosts; i++ {
		var h Host
		if err = decoder.Decode(&h); err != nil {
			return fmt.Errorf("parse host: %w", err)
		}
		f.hostPut(h)
	}
	for i = 0; i < header.Shards; i++ {
		var s Shard
		if err = decoder.Decode(&s); err != nil {
			return fmt.Errorf("parse shard: %w", err)
		}
		f.shardPut(s)
	}
	for i = 0; i < header.Replicas; i++ {
		var r Replica
		if err = decoder.Decode(&r); err != nil {
			return fmt.Errorf("parse replica: %w", err)
		}
		f.replicaPut(r)
	}
	f.commit()
	return nil
}
package zongzi
import (
"context"
"google.golang.org/grpc"
"github.com/logbn/zongzi/internal"
)
// grpcClientErr is a client stub whose every method fails with the stored
// error without performing any call. grpcClientPool.get returns it when
// dialing an address fails.
type grpcClientErr struct {
	err error
}

// Ping always returns the stored error.
func (c *grpcClientErr) Ping(_ context.Context, _ *internal.PingRequest, _ ...grpc.CallOption) (*internal.PingResponse, error) {
	return nil, c.err
}

// Probe always returns the stored error.
func (c *grpcClientErr) Probe(_ context.Context, _ *internal.ProbeRequest, _ ...grpc.CallOption) (*internal.ProbeResponse, error) {
	return nil, c.err
}

// Info always returns the stored error.
func (c *grpcClientErr) Info(_ context.Context, _ *internal.InfoRequest, _ ...grpc.CallOption) (*internal.InfoResponse, error) {
	return nil, c.err
}

// Members always returns the stored error.
func (c *grpcClientErr) Members(_ context.Context, _ *internal.MembersRequest, _ ...grpc.CallOption) (*internal.MembersResponse, error) {
	return nil, c.err
}

// Join always returns the stored error.
func (c *grpcClientErr) Join(_ context.Context, _ *internal.JoinRequest, _ ...grpc.CallOption) (*internal.JoinResponse, error) {
	return nil, c.err
}

// Add always returns the stored error.
func (c *grpcClientErr) Add(_ context.Context, _ *internal.AddRequest, _ ...grpc.CallOption) (*internal.AddResponse, error) {
	return nil, c.err
}

// Index always returns the stored error.
func (c *grpcClientErr) Index(_ context.Context, _ *internal.IndexRequest, _ ...grpc.CallOption) (*internal.IndexResponse, error) {
	return nil, c.err
}

// Commit always returns the stored error.
func (c *grpcClientErr) Commit(_ context.Context, _ *internal.CommitRequest, _ ...grpc.CallOption) (*internal.CommitResponse, error) {
	return nil, c.err
}

// Apply always returns the stored error.
func (c *grpcClientErr) Apply(_ context.Context, _ *internal.ApplyRequest, _ ...grpc.CallOption) (*internal.ApplyResponse, error) {
	return nil, c.err
}

// Read always returns the stored error.
func (c *grpcClientErr) Read(_ context.Context, _ *internal.ReadRequest, _ ...grpc.CallOption) (*internal.ReadResponse, error) {
	return nil, c.err
}

// Watch always returns the stored error.
func (c *grpcClientErr) Watch(_ context.Context, _ *internal.WatchRequest, _ ...grpc.CallOption) (internal.Internal_WatchClient, error) {
	return nil, c.err
}
package zongzi
import (
"github.com/hashicorp/golang-lru/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/logbn/zongzi/internal"
)
// grpcClientPool caches one gRPC client per address in an LRU cache.
type grpcClientPool struct {
	// clients maps an address to its cached client and connection.
	clients *lru.Cache[string, grpcClientPoolEntry]
	// dialOpts are appended to the default dial options for every new connection.
	dialOpts []grpc.DialOption
}

// grpcClientPoolEntry is a cached client together with the connection it
// uses, so that the connection can be closed on eviction.
type grpcClientPoolEntry struct {
	client internal.InternalClient
	conn   *grpc.ClientConn
}
// grpcClientPoolEvictFunc is the cache eviction callback: it closes the
// connection of the evicted entry. The close error is discarded.
func grpcClientPoolEvictFunc(addr string, e grpcClientPoolEntry) {
	_ = e.conn.Close()
}
// newGrpcClientPool returns a pool caching up to size clients. dialOpts are
// applied to every connection the pool opens.
//
// The error returned by the cache constructor is discarded.
// NOTE(review): presumably size must be positive for the cache to be created;
// confirm against the lru package, since a failed construction would leave
// the pool with a nil cache.
func newGrpcClientPool(size int, dialOpts ...grpc.DialOption) *grpcClientPool {
	cache, _ := lru.NewWithEvict[string, grpcClientPoolEntry](size, grpcClientPoolEvictFunc)
	pool := &grpcClientPool{
		clients:  cache,
		dialOpts: dialOpts,
	}
	return pool
}
// get returns the client for addr, dialing and caching a new connection when
// none is cached. Connections use insecure transport credentials followed by
// the pool's dial options. If dialing fails, the returned client is a
// grpcClientErr whose methods all return the dial error; nothing is cached.
//
// When two callers miss the cache for the same address at the same time,
// only the first connection is kept: the other one is closed and the cached
// client is returned, so no connection is left open without an owner.
func (c *grpcClientPool) get(addr string) (client internal.InternalClient) {
	if e, ok := c.clients.Get(addr); ok {
		return e.client
	}
	// https://github.com/grpc/grpc-go/tree/master/examples/features/authentication
	// opts = append(opts, grpc.WithPerRPCCredentials(perRPC))
	conn, err := grpc.Dial(addr, append([]grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}, c.dialOpts...)...)
	if err != nil {
		return &grpcClientErr{err}
	}
	entry := grpcClientPoolEntry{internal.NewInternalClient(conn), conn}
	// Add would silently replace an entry stored by a concurrent caller
	// without running the eviction callback, leaking that connection.
	if prev, ok, _ := c.clients.PeekOrAdd(addr, entry); ok {
		_ = conn.Close()
		return prev.client
	}
	return entry.client
}
// remove drops the cached client for addr and returns the result of the
// cache's Remove call.
func (c *grpcClientPool) remove(addr string) bool {
	removed := c.clients.Remove(addr)
	return removed
}
// Close purges every entry from the client cache.
func (c *grpcClientPool) Close() {
	cache := c.clients
	cache.Purge()
}
package zongzi
import (
"context"
"net"
"sync"
"time"
"google.golang.org/grpc"
"github.com/logbn/zongzi/internal"
)
// grpcServer implements the internal gRPC service on behalf of an Agent.
type grpcServer struct {
	internal.UnimplementedInternalServer

	// agent is the Agent served by this server; set by Start.
	agent *Agent
	// server is the underlying gRPC server; created by Start.
	server *grpc.Server
	// listenAddr is the TCP address Start listens on.
	listenAddr string
	// serverOpts are passed to grpc.NewServer by Start.
	serverOpts []grpc.ServerOption
}
// newGrpcServer returns a server configured to listen on listenAddr with the
// given server options. Nothing is started until Start is called.
func newGrpcServer(listenAddr string, opts ...grpc.ServerOption) *grpcServer {
	srv := new(grpcServer)
	srv.listenAddr = listenAddr
	srv.serverOpts = opts
	return srv
}
// Start binds the server to agent a, listens on listenAddr and serves the
// internal gRPC service. It blocks until either the agent context is done,
// in which case it returns nil, or serving ends, in which case it returns
// the error reported by Serve. It returns immediately with the listen error
// if the address cannot be bound.
//
// Start does not stop the server when the agent context ends; use Stop.
func (s *grpcServer) Start(a *Agent) error {
	s.agent = a
	lis, err := net.Listen("tcp", s.listenAddr)
	if err != nil {
		return err
	}
	// https://github.com/grpc/grpc-go/tree/master/examples/features/authentication
	// opts = append(opts, grpc.UnaryInterceptor(ensureValidToken))
	s.server = grpc.NewServer(s.serverOpts...)
	internal.RegisterInternalServer(s.server, s)
	// The serve error is handed over through a channel instead of a shared
	// variable so that returning on context cancellation cannot race with
	// the serving goroutine. The buffer lets that goroutine finish even when
	// nobody is receiving anymore.
	served := make(chan error, 1)
	go func() {
		served <- s.server.Serve(lis)
	}()
	select {
	case <-a.ctx.Done():
		return nil
	case err := <-served:
		return err
	}
}
// Stop shuts the server down. It attempts a graceful stop and, if that has
// not finished after 5 seconds, forces the server to stop. It does nothing
// when the server was never started.
func (s *grpcServer) Stop() {
	if s.server == nil {
		return
	}
	graceful := make(chan bool)
	go func() {
		s.server.GracefulStop()
		close(graceful)
	}()
	select {
	case <-time.After(5 * time.Second):
		s.server.Stop()
	case <-graceful:
	}
}
// Ping answers with an empty response and never fails.
func (s *grpcServer) Ping(ctx context.Context,
	req *internal.PingRequest,
) (resp *internal.PingResponse, err error) {
	resp = &internal.PingResponse{}
	return resp, nil
}
// Probe reports the gossip advertise address from the agent's host config.
func (s *grpcServer) Probe(ctx context.Context,
	req *internal.ProbeRequest,
) (resp *internal.ProbeResponse, err error) {
	resp = &internal.ProbeResponse{}
	resp.GossipAdvertiseAddress = s.agent.hostConfig.Gossip.AdvertiseAddress
	return resp, nil
}
// Info reports the agent's host ID and the replica ID from its replica config.
func (s *grpcServer) Info(ctx context.Context,
	req *internal.InfoRequest,
) (resp *internal.InfoResponse, err error) {
	resp = &internal.InfoResponse{}
	resp.HostId = s.agent.hostID()
	resp.ReplicaId = s.agent.replicaConfig.ReplicaID
	return resp, nil
}
// Members reports the agent's members map. The map is returned as is, not
// copied.
func (s *grpcServer) Members(ctx context.Context,
	req *internal.MembersRequest,
) (resp *internal.MembersResponse, err error) {
	resp = &internal.MembersResponse{}
	resp.Members = s.agent.members
	return resp, nil
}
// Join adds the requesting host as a replica of the agent's own shard (the
// ShardID of its replica config) via joinPrimeReplica. It fails with
// ErrAgentNotReady, returning an empty response, unless the agent is ready.
func (s *grpcServer) Join(ctx context.Context,
	req *internal.JoinRequest,
) (resp *internal.JoinResponse, err error) {
	resp = &internal.JoinResponse{}
	if s.agent.Status() != AgentStatus_Ready {
		return resp, ErrAgentNotReady
	}
	shardID := s.agent.replicaConfig.ShardID
	resp.Value, err = s.agent.joinPrimeReplica(req.HostId, shardID, req.IsNonVoting)
	return resp, err
}
// Add adds the requested replica to the requested shard via
// joinShardReplica. It fails with ErrAgentNotReady, returning an empty
// response, unless the agent is ready.
func (s *grpcServer) Add(ctx context.Context,
	req *internal.AddRequest,
) (resp *internal.AddResponse, err error) {
	resp = &internal.AddResponse{}
	if s.agent.Status() != AgentStatus_Ready {
		return resp, ErrAgentNotReady
	}
	resp.Value, err = s.agent.joinShardReplica(req.HostId, req.ShardId, req.ReplicaId, req.IsNonVoting)
	return resp, err
}
// emptyCommitResponse is the shared response returned by every successful Commit.
var emptyCommitResponse = &internal.CommitResponse{}

// Commit proposes req.Data to shard req.ShardId and returns as soon as the
// proposal is known to be committed, without waiting for its result.
//
// A warning is logged when commit notification is disabled in the host
// config. In that case only the completion of the proposal is reported,
// which is accepted as success too (a completed proposal has necessarily been
// committed); otherwise the call would block until ctx ends.
//
// Errors: ErrAgentNotReady if the agent is not ready, the Propose error if
// the proposal cannot be submitted, ErrAborted, ErrShardNotReady (dropped),
// ErrRejected, ErrShardClosed (terminated) or ErrTimeout according to the
// proposal outcome, and ErrCanceled or ErrTimeout when ctx ends first.
func (s *grpcServer) Commit(ctx context.Context,
	req *internal.CommitRequest,
) (resp *internal.CommitResponse, err error) {
	if !s.agent.hostConfig.NotifyCommit {
		s.agent.log.Warningf(`%v`, ErrNotifyCommitDisabled)
	}
	if s.agent.Status() != AgentStatus_Ready {
		return nil, ErrAgentNotReady
	}
	rs, err := s.agent.host.Propose(s.agent.host.GetNoOPSession(req.ShardId), req.Data, raftTimeout)
	if err != nil {
		return nil, err
	}
	defer rs.Release()
	for {
		select {
		case r := <-rs.ResultC():
			switch {
			case r.Committed(), r.Completed():
				return emptyCommitResponse, nil
			case r.Aborted():
				return nil, ErrAborted
			case r.Dropped():
				return nil, ErrShardNotReady
			case r.Rejected():
				return nil, ErrRejected
			case r.Terminated():
				return nil, ErrShardClosed
			case r.Timeout():
				return nil, ErrTimeout
			}
			// Any other notification: keep waiting for the next one.
		case <-ctx.Done():
			if ctx.Err() == context.DeadlineExceeded {
				return nil, ErrTimeout
			}
			return nil, ErrCanceled
		}
	}
}
// Apply proposes req.Data to shard req.ShardId and waits for the proposal to
// complete, returning the value and data of its result.
//
// Errors: ErrAgentNotReady if the agent is not ready, the Propose error if
// the proposal cannot be submitted, ErrAborted, ErrShardNotReady (dropped),
// ErrRejected, ErrShardClosed (terminated) or ErrTimeout according to the
// proposal outcome, and ErrCanceled or ErrTimeout when ctx ends first.
// Notifications matching none of these are skipped.
func (s *grpcServer) Apply(ctx context.Context,
	req *internal.ApplyRequest,
) (resp *internal.ApplyResponse, err error) {
	if s.agent.Status() != AgentStatus_Ready {
		err = ErrAgentNotReady
		return
	}
	rs, err := s.agent.host.Propose(s.agent.host.GetNoOPSession(req.ShardId), req.Data, raftTimeout)
	if err != nil {
		return
	}
	defer rs.Release()
	// Wait until either a response or an error has been produced.
	for resp == nil && err == nil {
		select {
		case r := <-rs.ResultC():
			switch {
			case r.Completed():
				resp = &internal.ApplyResponse{
					Value: r.GetResult().Value,
					Data:  r.GetResult().Data,
				}
				// Result cannot be released because ApplyResponse may not be serialized
				// This occurs as an optimization in hostClient for requests that do not require forwarding
				// ReleaseResult(r.GetResult())
			case r.Aborted():
				err = ErrAborted
			case r.Dropped():
				err = ErrShardNotReady
			case r.Rejected():
				err = ErrRejected
			case r.Terminated():
				err = ErrShardClosed
			case r.Timeout():
				err = ErrTimeout
			}
		case <-ctx.Done():
			switch ctx.Err() {
			case context.Canceled:
				err = ErrCanceled
			case context.DeadlineExceeded:
				err = ErrTimeout
			}
		}
	}
	return
}
// emptyIndexResponse is the shared response returned by every Index call.
var emptyIndexResponse = &internal.IndexResponse{}

// Index runs the agent's index operation for shard req.ShardId. The shared
// empty response is returned in all cases, together with any error from that
// operation.
func (s *grpcServer) Index(ctx context.Context,
	req *internal.IndexRequest,
) (resp *internal.IndexResponse, err error) {
	return emptyIndexResponse, s.agent.index(ctx, req.ShardId)
}
// Read runs the query in req.Data against shard req.ShardId. A stale read is
// used when req.Stale is set; otherwise a synchronous read bounded by
// raftTimeout. When the read yields a non-nil *Result, its value and data are
// copied into the response. The response is non-nil even when err is set.
func (s *grpcServer) Read(ctx context.Context,
	req *internal.ReadRequest,
) (resp *internal.ReadResponse, err error) {
	resp = &internal.ReadResponse{}
	query := getLookupQuery()
	query.ctx = ctx
	query.data = req.Data
	defer query.Release()
	var r any
	if !req.Stale {
		readCtx, cancel := context.WithTimeout(ctx, raftTimeout)
		defer cancel()
		r, err = s.agent.host.SyncRead(readCtx, req.ShardId, query)
	} else {
		r, err = s.agent.host.StaleRead(req.ShardId, query)
	}
	result, ok := r.(*Result)
	if !ok || result == nil {
		return resp, err
	}
	resp.Value = result.Value
	resp.Data = result.Data
	// Result cannot be released because ReadResponse may not be serialized
	// This occurs as an optimization in hostClient for requests that do not require forwarding
	// ReleaseResult(result)
	return resp, err
}
// Watch runs the query in req.Data against shard req.ShardId and streams
// every result delivered on the query's result channel to srv.
//
// A forwarding goroutine sends results while the read call (stale or
// synchronous, depending on req.Stale) is in progress. When the read returns,
// the goroutine is stopped and awaited before Watch returns the read error.
func (s *grpcServer) Watch(req *internal.WatchRequest, srv internal.Internal_WatchServer) (err error) {
	// s.agent.log.Debugf(`gRPC Req Query: %#v`, req)
	query := getWatchQuery()
	query.ctx = srv.Context()
	query.data = req.Data
	// Unbuffered: results are handed over one at a time to the goroutine below.
	query.result = make(chan *Result)
	defer query.Release()
	// done tells the forwarding goroutine that the read call has returned.
	var done = make(chan bool)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case result := <-query.result:
				err := srv.Send(&internal.WatchResponse{
					Value: result.Value,
					Data:  result.Data,
				})
				// A failed send is only logged; forwarding continues.
				if err != nil {
					s.agent.log.Errorf(`Error sending watch response: %s`, err.Error())
				}
				// Result cannot be released because WatchResponse may not be serialized
				// This occurs as an optimization in hostClient for requests that do not require forwarding
				// ReleaseResult(result)
			case <-done:
				return
			}
		}
	}()
	// The read call blocks until it returns; only its error is kept.
	if req.Stale {
		_, err = s.agent.host.StaleRead(req.ShardId, query)
	} else {
		_, err = s.agent.host.SyncRead(srv.Context(), req.ShardId, query)
	}
	// Stop the forwarding goroutine and wait for it before the deferred
	// query.Release runs.
	close(done)
	wg.Wait()
	return
}
package zongzi
import (
"context"
"time"
"github.com/benbjohnson/clock"
"google.golang.org/grpc"
"github.com/logbn/zongzi/internal"
)
// hostClient issues requests to one host of the cluster on behalf of an
// Agent.
type hostClient struct {
	// agent is the local Agent that owns the client.
	agent *Agent
	// clock is used to time Ping round trips.
	clock clock.Clock
	// host is the target host of every request.
	host Host
}
// newhostClient returns a client for host owned by agent a, using a new
// clock from clock.New.
func newhostClient(a *Agent, host Host) hostClient {
	var c hostClient
	c.agent = a
	c.clock = clock.New()
	c.host = host
	return c
}
// Ping sends a ping to the host and returns the elapsed time, which is
// measured from before the client is obtained from the pool until the call
// returns, whether or not it succeeded. When the target is the local host no
// request is made and the result is 0, nil.
func (c *hostClient) Ping(ctx context.Context) (t time.Duration, err error) {
	if c.agent.hostID() == c.host.ID {
		return 0, nil
	}
	began := c.clock.Now()
	_, err = c.agent.grpcClientPool.get(c.host.ApiAddress).Ping(ctx, &internal.PingRequest{})
	return c.clock.Since(began), err
}
// ReadIndex sends an Index request for shardID to the target host and returns
// the call's error. It is a no-op returning nil when the target is the local
// host.
func (c *hostClient) ReadIndex(ctx context.Context, shardID uint64) (err error) {
	if c.host.ID != c.agent.hostID() {
		remote := c.agent.grpcClientPool.get(c.host.ApiAddress)
		_, err = remote.Index(ctx, &internal.IndexRequest{ShardId: shardID})
	}
	return err
}
// Apply submits cmd to shardID through the target host and returns the value
// and data of the response. The local gRPC server handler is invoked directly
// when the target is the local host; otherwise the request goes through the
// gRPC client pool. On error the zero value and nil data are returned.
func (c *hostClient) Apply(ctx context.Context, shardID uint64, cmd []byte) (value uint64, data []byte, err error) {
	req := &internal.ApplyRequest{
		ShardId: shardID,
		Data:    cmd,
	}
	var resp *internal.ApplyResponse
	if local := c.host.ID == c.agent.hostID(); local {
		c.agent.log.Debugf(`gRPC hostClient Apply Local: %s`, string(cmd))
		resp, err = c.agent.grpcServer.Apply(ctx, req)
	} else {
		c.agent.log.Debugf(`gRPC hostClient Apply Remote: %s`, string(cmd))
		resp, err = c.agent.grpcClientPool.get(c.host.ApiAddress).Apply(ctx, req)
	}
	if err != nil {
		return 0, nil, err
	}
	return resp.Value, resp.Data, nil
}
// Commit submits cmd to shardID through the target host and returns only the
// call's error; the response carries no payload. The local gRPC server handler
// is invoked directly when the target is the local host.
func (c *hostClient) Commit(ctx context.Context, shardID uint64, cmd []byte) (err error) {
	req := &internal.CommitRequest{
		ShardId: shardID,
		Data:    cmd,
	}
	if c.host.ID != c.agent.hostID() {
		c.agent.log.Debugf(`gRPC hostClient Commit Remote: %s`, string(cmd))
		_, err = c.agent.grpcClientPool.get(c.host.ApiAddress).Commit(ctx, req)
		return err
	}
	c.agent.log.Debugf(`gRPC hostClient Commit Local: %s`, string(cmd))
	_, err = c.agent.grpcServer.Commit(ctx, req)
	return err
}
// Read runs query against shardID through the target host and returns the
// value and data of the response. The stale flag is passed through unchanged
// in the request. The local gRPC server handler is invoked directly when the
// target is the local host. On error the zero value and nil data are returned.
func (c *hostClient) Read(ctx context.Context, shardID uint64, query []byte, stale bool) (value uint64, data []byte, err error) {
	req := &internal.ReadRequest{
		ShardId: shardID,
		Stale:   stale,
		Data:    query,
	}
	var resp *internal.ReadResponse
	if local := c.host.ID == c.agent.hostID(); local {
		resp, err = c.agent.grpcServer.Read(ctx, req)
	} else {
		resp, err = c.agent.grpcClientPool.get(c.host.ApiAddress).Read(ctx, req)
	}
	if err != nil {
		return 0, nil, err
	}
	return resp.Value, resp.Data, nil
}
// watchServer is an in-process stand-in for the server side of a Watch stream.
// hostClient.Watch passes it to grpcServer.Watch for local requests so that
// responses are delivered straight to a results channel.
//
// The embedded grpc.ServerStream is left nil; only Context and Send are
// implemented, so calling any other stream method would panic.
type watchServer struct {
	grpc.ServerStream
	// ctx is the context returned by Context and observed by Send.
	ctx context.Context
	// results receives one *Result per response passed to Send.
	results chan<- *Result
}

// newWatchServer returns a watchServer that reports ctx as its stream context
// and forwards responses to results.
func newWatchServer(ctx context.Context, results chan<- *Result) *watchServer {
	return &watchServer{
		ctx:     ctx,
		results: results,
	}
}

// Context returns the context supplied to newWatchServer.
func (s *watchServer) Context() context.Context {
	return s.ctx
}

// Send converts res into a *Result and delivers it on the results channel.
//
// It blocks until the receiver accepts the result or the stream context is
// done. In the latter case the context's error is returned, so a consumer
// that has stopped reading cannot block the sender forever.
func (s *watchServer) Send(res *internal.WatchResponse) error {
	select {
	case s.results <- &Result{Value: res.Value, Data: res.Data}:
		return nil
	case <-s.ctx.Done():
		return s.ctx.Err()
	}
}
// Watch runs a streaming watch query against shardID through the target host
// and delivers every response on results until the stream ends.
//
// For the local host the gRPC server handler is invoked directly with an
// in-process stream (see newWatchServer). For a remote host a client stream is
// opened and drained.
//
// It returns nil when the remote stream ends normally (io.EOF), the error that
// opened or terminated the stream otherwise, or ctx.Err() if ctx is done while
// a result is waiting to be delivered.
func (c *hostClient) Watch(ctx context.Context, shardID uint64, query []byte, results chan<- *Result, stale bool) (err error) {
	req := &internal.WatchRequest{
		ShardId: shardID,
		Stale:   stale,
		Data:    query,
	}
	if c.host.ID == c.agent.hostID() {
		return c.agent.grpcServer.Watch(req, newWatchServer(ctx, results))
	}
	client, err := c.agent.grpcClientPool.get(c.host.ApiAddress).Watch(ctx, req)
	if err != nil {
		// The stream could not be opened; client must not be used.
		return err
	}
	for {
		res, recvErr := client.Recv()
		if recvErr == io.EOF {
			// Normal end of stream.
			return nil
		}
		if recvErr != nil {
			return recvErr
		}
		select {
		case results <- &Result{Value: res.Value, Data: res.Data}:
		case <-ctx.Done():
			// Do not block forever on a consumer that has gone away.
			return ctx.Err()
		}
	}
}
package zongzi
import (
"context"
"fmt"
"sync"
"time"
"github.com/logbn/zongzi/internal"
)
// The hostController starts and stops replicas based on replica config.
//
// Start launches a goroutine that calls tick every waitPeriod until the
// controller's context is cancelled by Stop.
type hostController struct {
// agent provides cluster state, the node host, the logger and the gRPC client pool.
agent *Agent
// ctx and ctxCancel are created by Start; cancelling ctx ends the ticker goroutine.
ctx context.Context
ctxCancel context.CancelFunc
// mutex is held for the whole of tick and while Start/Stop update fields.
mutex sync.RWMutex
// index is the host's Updated value last processed successfully by tick; Stop resets it to 0.
index uint64
// done is closed by the ticker goroutine when it exits.
done chan bool
}
// newHostController returns a hostController bound to agent a. The controller
// is idle until Start is called.
func newHostController(a *Agent) *hostController {
	c := new(hostController)
	c.agent = a
	return c
}
// Start creates the controller's context and done channel, launches the
// background goroutine that calls tick every waitPeriod, and then runs one
// tick synchronously, returning its error.
//
// The goroutine logs tick errors, exits when the context is cancelled (see
// Stop), and closes the done channel on exit.
func (c *hostController) Start() (err error) {
	c.mutex.Lock()
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan bool)
	c.ctx, c.ctxCancel, c.done = ctx, cancel, done
	c.mutex.Unlock()

	// The goroutine works on its own copies of ctx and done and keeps its tick
	// error local. Writing to Start's named result from here would race with
	// the return statement below.
	go func() {
		t := time.NewTicker(waitPeriod)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				if tickErr := c.tick(); tickErr != nil {
					c.agent.log.Errorf("Host Controller: %v", tickErr)
				}
			case <-ctx.Done():
				close(done)
				return
			}
		}
	}()
	return c.tick()
}
// tick reconciles the replicas running on this host with the replicas that
// cluster state assigns to it.
//
// Under the controller mutex it reads the state, and if the host's Updated
// index is newer than the last one processed it starts every assigned replica
// (ShardID > 0) that the node host does not report as running. After a
// successful start it proposes that the replica's status become Active.
// Failures are logged and prevent the processed index from advancing, so the
// work is retried on the next tick. The returned error is always nil.
func (c *hostController) tick() (err error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	var index uint64
	var hadErr bool
	c.agent.State(c.ctx, func(state *State) {
		host, ok := state.Host(c.agent.hostID())
		if !ok {
			hadErr = true
			c.agent.log.Warningf("Host not found %s", c.agent.hostID())
			return
		}
		index = host.Updated
		if index <= c.index {
			// Nothing changed since the last successful pass.
			return
		}
		// These are all the replicas that SHOULD exist on the host
		var found = map[uint64]bool{}
		state.ReplicaIterateByHostID(c.agent.hostID(), func(r Replica) bool {
			if r.ShardID > 0 {
				found[r.ID] = false
			}
			return true
		})
		// These are all the replicas that DO exist on the host
		hostInfo := c.agent.host.GetNodeHostInfo(nodeHostInfoOption{})
		for _, info := range hostInfo.ShardInfoList {
			if replica, ok := state.Replica(info.ReplicaID); ok {
				found[replica.ID] = true
				if replica.Status == ReplicaStatus_Closed {
					// Remove replica
				}
				if info.IsNonVoting && !replica.IsNonVoting {
					// Promote to Voting
				}
				if !info.IsNonVoting && replica.IsNonVoting {
					// Demote to NonVoting
				}
			} else if info.ShardID > 0 {
				// The replica is unknown to cluster state, so the shard ID has
				// to come from the node host's own info.
				// Remove raftNode
			}
		}
		// This creates all the missing replicas
		for id, running := range found {
			if running {
				continue
			}
			replica, ok := state.Replica(id)
			if !ok {
				hadErr = true
				c.agent.log.Warningf("Replica not found")
				continue
			}
			shard, ok := state.Shard(replica.ShardID)
			if !ok {
				hadErr = true
				c.agent.log.Warningf("Shard not found")
				continue
			}
			item, ok := c.agent.shardTypes[shard.Type]
			if !ok {
				c.agent.log.Warningf("Shard name not found in registry: %s", shard.Type)
				continue
			}
			members := state.ShardMembers(shard.ID)
			item.Config.ShardID = shard.ID
			item.Config.ReplicaID = replica.ID
			item.Config.IsNonVoting = replica.IsNonVoting
			c.agent.log.Infof("[%05d:%05d] Host Controller: Starting replica: %s", shard.ID, replica.ID, shard.Type)
			// startErr is scoped to this iteration so that a failure on one
			// replica can never be attributed to the next one.
			// NOTE(review): a replica whose status matches none of the cases
			// below is not started but is still proposed as Active. Confirm
			// this is intended.
			var startErr error
			if item.StateMachineFactory != nil {
				shim := stateMachineFactoryShim(item.StateMachineFactory)
				switch replica.Status {
				case ReplicaStatus_Bootstrapping:
					if len(members) < 3 {
						startErr = fmt.Errorf("Not enough members")
						break
					}
					startErr = c.agent.host.StartConcurrentReplica(members, false, shim, item.Config)
				case ReplicaStatus_Joining:
					res := c.requestShardJoin(members, shard.ID, replica.ID, replica.IsNonVoting)
					if res == 0 {
						startErr = fmt.Errorf(`[%05d:%05d] Unable to join shard`, shard.ID, replica.ID)
						break
					}
					startErr = c.agent.host.StartConcurrentReplica(nil, true, shim, item.Config)
				case ReplicaStatus_Active:
					startErr = c.agent.host.StartConcurrentReplica(nil, false, shim, item.Config)
				}
			} else {
				shim := stateMachinePersistentFactoryShim(item.StateMachinePersistentFactory)
				switch replica.Status {
				case ReplicaStatus_Bootstrapping:
					if len(members) < 3 {
						startErr = fmt.Errorf("Not enough members")
						break
					}
					startErr = c.agent.host.StartOnDiskReplica(members, false, shim, item.Config)
				case ReplicaStatus_Joining:
					res := c.requestShardJoin(members, shard.ID, replica.ID, replica.IsNonVoting)
					if res == 0 {
						startErr = fmt.Errorf(`[%05d:%05d] Unable to join persistent shard`, shard.ID, replica.ID)
						break
					}
					startErr = c.agent.host.StartOnDiskReplica(nil, true, shim, item.Config)
				case ReplicaStatus_Active:
					startErr = c.agent.host.StartOnDiskReplica(nil, false, shim, item.Config)
				}
			}
			if startErr != nil {
				hadErr = true
				c.agent.log.Warningf("Failed to start replica: %v", startErr)
				continue
			}
			res, propErr := c.agent.primePropose(newCmdReplicaUpdateStatus(replica.ID, ReplicaStatus_Active))
			if propErr != nil || res.Value != 1 {
				hadErr = true
				c.agent.log.Warningf("Failed to update replica status: %v", propErr)
			}
		}
	})
	if index <= c.index || hadErr {
		// Either nothing new was seen or something failed; leave c.index
		// untouched so the same state is processed again on the next tick.
		return nil
	}
	c.agent.log.Debugf("%s Finished processing %d", c.agent.hostID(), index)
	c.index = index
	return nil
}
// requestShardJoin requests host replica be added to a shard.
//
// It walks the host IDs in members, looks each one up in cluster state and
// sends it an Add request for (shardID, replicaID). The loop stops at the
// first response with a non-zero Value. The returned value is the Value of the
// most recent response, or 0 if no request produced a response.
func (c *hostController) requestShardJoin(members map[uint64]string, shardID, replicaID uint64, isNonVoting bool) (v uint64) {
	c.agent.log.Debugf("[%05d:%05d] Joining shard (isNonVoting: %v)", shardID, replicaID, isNonVoting)
	var res *internal.AddResponse
	var err error
	for _, hostID := range members {
		// host and ok are scoped to the iteration so that a lookup which
		// yields nothing cannot reuse the previous member's host.
		var host Host
		var ok bool
		c.agent.State(c.ctx, func(s *State) {
			host, ok = s.Host(hostID)
		})
		if !ok {
			c.agent.log.Warningf(`Host not found %s`, hostID)
			continue
		}
		ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
		res, err = c.agent.grpcClientPool.get(host.ApiAddress).Add(ctx, &internal.AddRequest{
			HostId:      c.agent.hostID(),
			ShardId:     shardID,
			ReplicaId:   replicaID,
			IsNonVoting: isNonVoting,
		})
		// Release the timeout context now; a defer inside the loop would keep
		// every attempt's timer alive until the function returns.
		cancel()
		if err != nil {
			c.agent.log.Warningf(`[%05d:%05d] %s | %s Unable to join shard (%v): %v`, shardID, replicaID, c.agent.hostID(), hostID, isNonVoting, err)
		}
		if err == nil && res != nil && res.Value > 0 {
			v = res.Value
			break
		}
	}
	if err != nil {
		c.agent.log.Warningf(`Unable to join shard: %v`, err)
	}
	if res != nil {
		v = res.Value
	}
	return
}
// Stop cancels the controller's context, waits for the ticker goroutine to
// exit and resets the processed index to 0 so that a later Start reconciles
// from scratch. It is safe to call on a controller that was never started.
func (c *hostController) Stop() {
	defer c.agent.log.Infof(`Stopped hostController`)
	if c.ctxCancel != nil {
		c.ctxCancel()
	}
	// done is nil until Start has run; receiving from a nil channel would
	// block forever.
	if c.done != nil {
		<-c.done
	}
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.index = 0
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc v3.12.4
// source: zongzi.proto
package internal
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
// Version guards emitted by the generator: each blank constant passes an
// expression built from protoimpl.MinVersion / protoimpl.MaxVersion and the
// literal 20 to protoimpl.EnforceVersion.
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// PingRequest is a protobuf message generated from zongzi.proto (index 0 in
// file_zongzi_proto_msgTypes). It declares no message fields; the unexported
// members are protoimpl runtime state.
type PingRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
// Reset overwrites *x with an empty PingRequest and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *PingRequest) Reset() {
*x = PingRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *PingRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*PingRequest) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *PingRequest) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PingRequest.ProtoReflect.Descriptor instead.
func (*PingRequest) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{0}
}
// PingResponse is a protobuf message generated from zongzi.proto (index 1 in
// file_zongzi_proto_msgTypes). It declares no message fields; the unexported
// members are protoimpl runtime state.
type PingResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
// Reset overwrites *x with an empty PingResponse and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *PingResponse) Reset() {
*x = PingResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *PingResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*PingResponse) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *PingResponse) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PingResponse.ProtoReflect.Descriptor instead.
func (*PingResponse) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{1}
}
// ProbeRequest is a protobuf message generated from zongzi.proto (index 2 in
// file_zongzi_proto_msgTypes). It declares no message fields; the unexported
// members are protoimpl runtime state.
type ProbeRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
// Reset overwrites *x with an empty ProbeRequest and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *ProbeRequest) Reset() {
*x = ProbeRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *ProbeRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*ProbeRequest) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *ProbeRequest) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ProbeRequest.ProtoReflect.Descriptor instead.
func (*ProbeRequest) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{2}
}
// ProbeResponse is a protobuf message generated from zongzi.proto (index 3 in
// file_zongzi_proto_msgTypes). It carries one string field,
// gossip_advertise_address (field number 1).
type ProbeResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
GossipAdvertiseAddress string `protobuf:"bytes,1,opt,name=gossip_advertise_address,json=gossipAdvertiseAddress,proto3" json:"gossip_advertise_address,omitempty"`
}
// Reset overwrites *x with an empty ProbeResponse and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *ProbeResponse) Reset() {
*x = ProbeResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *ProbeResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*ProbeResponse) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *ProbeResponse) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ProbeResponse.ProtoReflect.Descriptor instead.
func (*ProbeResponse) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{3}
}
// GetGossipAdvertiseAddress returns x.GossipAdvertiseAddress, or "" when x is nil.
func (x *ProbeResponse) GetGossipAdvertiseAddress() string {
if x != nil {
return x.GossipAdvertiseAddress
}
return ""
}
// InfoRequest is a protobuf message generated from zongzi.proto (index 4 in
// file_zongzi_proto_msgTypes). It declares no message fields; the unexported
// members are protoimpl runtime state.
type InfoRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
// Reset overwrites *x with an empty InfoRequest and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *InfoRequest) Reset() {
*x = InfoRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *InfoRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*InfoRequest) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *InfoRequest) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use InfoRequest.ProtoReflect.Descriptor instead.
func (*InfoRequest) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{4}
}
// InfoResponse is a protobuf message generated from zongzi.proto (index 5 in
// file_zongzi_proto_msgTypes). Fields: host_id (1, string) and
// replica_id (2, uint64).
type InfoResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
HostId string `protobuf:"bytes,1,opt,name=host_id,json=hostId,proto3" json:"host_id,omitempty"`
ReplicaId uint64 `protobuf:"varint,2,opt,name=replica_id,json=replicaId,proto3" json:"replica_id,omitempty"`
}
// Reset overwrites *x with an empty InfoResponse and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *InfoResponse) Reset() {
*x = InfoResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *InfoResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*InfoResponse) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *InfoResponse) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use InfoResponse.ProtoReflect.Descriptor instead.
func (*InfoResponse) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{5}
}
// GetHostId returns x.HostId, or "" when x is nil.
func (x *InfoResponse) GetHostId() string {
if x != nil {
return x.HostId
}
return ""
}
// GetReplicaId returns x.ReplicaId, or 0 when x is nil.
func (x *InfoResponse) GetReplicaId() uint64 {
if x != nil {
return x.ReplicaId
}
return 0
}
// MembersRequest is a protobuf message generated from zongzi.proto (index 6
// in file_zongzi_proto_msgTypes). It declares no message fields; the
// unexported members are protoimpl runtime state.
type MembersRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
// Reset overwrites *x with an empty MembersRequest and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *MembersRequest) Reset() {
*x = MembersRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *MembersRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*MembersRequest) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *MembersRequest) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use MembersRequest.ProtoReflect.Descriptor instead.
func (*MembersRequest) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{6}
}
// MembersResponse is a protobuf message generated from zongzi.proto (index 7
// in file_zongzi_proto_msgTypes). It carries one map field, members
// (field number 1), keyed by uint64 with string values.
type MembersResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Members map[uint64]string `protobuf:"bytes,1,rep,name=members,proto3" json:"members,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
// Reset overwrites *x with an empty MembersResponse and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *MembersResponse) Reset() {
*x = MembersResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *MembersResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*MembersResponse) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *MembersResponse) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use MembersResponse.ProtoReflect.Descriptor instead.
func (*MembersResponse) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{7}
}
// GetMembers returns x.Members, or nil when x is nil.
func (x *MembersResponse) GetMembers() map[uint64]string {
if x != nil {
return x.Members
}
return nil
}
// JoinRequest is a protobuf message generated from zongzi.proto (index 8 in
// file_zongzi_proto_msgTypes). Fields: host_id (1, string) and
// is_non_voting (2, bool).
type JoinRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
HostId string `protobuf:"bytes,1,opt,name=host_id,json=hostId,proto3" json:"host_id,omitempty"`
IsNonVoting bool `protobuf:"varint,2,opt,name=is_non_voting,json=isNonVoting,proto3" json:"is_non_voting,omitempty"`
}
// Reset overwrites *x with an empty JoinRequest and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *JoinRequest) Reset() {
*x = JoinRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *JoinRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*JoinRequest) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *JoinRequest) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[8]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use JoinRequest.ProtoReflect.Descriptor instead.
func (*JoinRequest) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{8}
}
// GetHostId returns x.HostId, or "" when x is nil.
func (x *JoinRequest) GetHostId() string {
if x != nil {
return x.HostId
}
return ""
}
// GetIsNonVoting returns x.IsNonVoting, or false when x is nil.
func (x *JoinRequest) GetIsNonVoting() bool {
if x != nil {
return x.IsNonVoting
}
return false
}
// JoinResponse is a protobuf message generated from zongzi.proto (index 9 in
// file_zongzi_proto_msgTypes). Fields: value (1, uint64) and
// error (2, string).
type JoinResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
}
// Reset overwrites *x with an empty JoinResponse and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *JoinResponse) Reset() {
*x = JoinResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *JoinResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*JoinResponse) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *JoinResponse) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[9]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use JoinResponse.ProtoReflect.Descriptor instead.
func (*JoinResponse) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{9}
}
// GetValue returns x.Value, or 0 when x is nil.
func (x *JoinResponse) GetValue() uint64 {
if x != nil {
return x.Value
}
return 0
}
// GetError returns x.Error, or "" when x is nil.
func (x *JoinResponse) GetError() string {
if x != nil {
return x.Error
}
return ""
}
// AddRequest is a protobuf message generated from zongzi.proto (index 10 in
// file_zongzi_proto_msgTypes). Fields: host_id (1, string),
// shard_id (2, uint64), replica_id (3, uint64) and is_non_voting (4, bool).
type AddRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
HostId string `protobuf:"bytes,1,opt,name=host_id,json=hostId,proto3" json:"host_id,omitempty"`
ShardId uint64 `protobuf:"varint,2,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"`
ReplicaId uint64 `protobuf:"varint,3,opt,name=replica_id,json=replicaId,proto3" json:"replica_id,omitempty"`
IsNonVoting bool `protobuf:"varint,4,opt,name=is_non_voting,json=isNonVoting,proto3" json:"is_non_voting,omitempty"`
}
// Reset overwrites *x with an empty AddRequest and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *AddRequest) Reset() {
*x = AddRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *AddRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*AddRequest) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *AddRequest) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[10]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AddRequest.ProtoReflect.Descriptor instead.
func (*AddRequest) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{10}
}
// GetHostId returns x.HostId, or "" when x is nil.
func (x *AddRequest) GetHostId() string {
if x != nil {
return x.HostId
}
return ""
}
// GetShardId returns x.ShardId, or 0 when x is nil.
func (x *AddRequest) GetShardId() uint64 {
if x != nil {
return x.ShardId
}
return 0
}
// GetReplicaId returns x.ReplicaId, or 0 when x is nil.
func (x *AddRequest) GetReplicaId() uint64 {
if x != nil {
return x.ReplicaId
}
return 0
}
// GetIsNonVoting returns x.IsNonVoting, or false when x is nil.
func (x *AddRequest) GetIsNonVoting() bool {
if x != nil {
return x.IsNonVoting
}
return false
}
// AddResponse is a protobuf message generated from zongzi.proto (index 11 in
// file_zongzi_proto_msgTypes). Fields: value (1, uint64) and
// error (2, string).
type AddResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
}
// Reset overwrites *x with an empty AddResponse and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *AddResponse) Reset() {
*x = AddResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[11]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *AddResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*AddResponse) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *AddResponse) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[11]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AddResponse.ProtoReflect.Descriptor instead.
func (*AddResponse) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{11}
}
// GetValue returns x.Value, or 0 when x is nil.
func (x *AddResponse) GetValue() uint64 {
if x != nil {
return x.Value
}
return 0
}
// GetError returns x.Error, or "" when x is nil.
func (x *AddResponse) GetError() string {
if x != nil {
return x.Error
}
return ""
}
// IndexRequest is a protobuf message generated from zongzi.proto (index 12 in
// file_zongzi_proto_msgTypes). It carries one field, shard_id (1, uint64).
type IndexRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"`
}
// Reset overwrites *x with an empty IndexRequest and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *IndexRequest) Reset() {
*x = IndexRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[12]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *IndexRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*IndexRequest) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *IndexRequest) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[12]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use IndexRequest.ProtoReflect.Descriptor instead.
func (*IndexRequest) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{12}
}
// GetShardId returns x.ShardId, or 0 when x is nil.
func (x *IndexRequest) GetShardId() uint64 {
if x != nil {
return x.ShardId
}
return 0
}
// IndexResponse is a protobuf message generated from zongzi.proto (index 13
// in file_zongzi_proto_msgTypes). It declares no message fields; the
// unexported members are protoimpl runtime state.
type IndexResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
// Reset overwrites *x with an empty IndexResponse and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *IndexResponse) Reset() {
*x = IndexResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[13]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *IndexResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*IndexResponse) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *IndexResponse) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[13]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use IndexResponse.ProtoReflect.Descriptor instead.
func (*IndexResponse) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{13}
}
// ApplyRequest is a protobuf message generated from zongzi.proto (index 14 in
// file_zongzi_proto_msgTypes). Fields: shard_id (1, uint64) and
// data (2, bytes).
type ApplyRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
}
// Reset overwrites *x with an empty ApplyRequest and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *ApplyRequest) Reset() {
*x = ApplyRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[14]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *ApplyRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*ApplyRequest) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *ApplyRequest) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[14]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ApplyRequest.ProtoReflect.Descriptor instead.
func (*ApplyRequest) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{14}
}
// GetShardId returns x.ShardId, or 0 when x is nil.
func (x *ApplyRequest) GetShardId() uint64 {
if x != nil {
return x.ShardId
}
return 0
}
// GetData returns x.Data, or nil when x is nil.
func (x *ApplyRequest) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
// ApplyResponse is a protobuf message generated from zongzi.proto (index 15
// in file_zongzi_proto_msgTypes). Fields: value (1, uint64) and
// data (2, bytes).
type ApplyResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
}
// Reset overwrites *x with an empty ApplyResponse and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *ApplyResponse) Reset() {
*x = ApplyResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[15]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *ApplyResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*ApplyResponse) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *ApplyResponse) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[15]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ApplyResponse.ProtoReflect.Descriptor instead.
func (*ApplyResponse) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{15}
}
// GetValue returns x.Value, or 0 when x is nil.
func (x *ApplyResponse) GetValue() uint64 {
if x != nil {
return x.Value
}
return 0
}
// GetData returns x.Data, or nil when x is nil.
func (x *ApplyResponse) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
// CommitRequest is a protobuf message generated from zongzi.proto (index 16
// in file_zongzi_proto_msgTypes). Fields: shard_id (1, uint64) and
// data (2, bytes).
type CommitRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
}
// Reset overwrites *x with an empty CommitRequest and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *CommitRequest) Reset() {
*x = CommitRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[16]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *CommitRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*CommitRequest) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *CommitRequest) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[16]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CommitRequest.ProtoReflect.Descriptor instead.
func (*CommitRequest) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{16}
}
// GetShardId returns x.ShardId, or 0 when x is nil.
func (x *CommitRequest) GetShardId() uint64 {
if x != nil {
return x.ShardId
}
return 0
}
// GetData returns x.Data, or nil when x is nil.
func (x *CommitRequest) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
// CommitResponse is a protobuf message generated from zongzi.proto (index 17
// in file_zongzi_proto_msgTypes). It declares no message fields; the
// unexported members are protoimpl runtime state.
type CommitResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
// Reset overwrites *x with an empty CommitResponse and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *CommitResponse) Reset() {
*x = CommitResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[17]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *CommitResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*CommitResponse) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *CommitResponse) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[17]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CommitResponse.ProtoReflect.Descriptor instead.
func (*CommitResponse) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{17}
}
// ReadRequest is a protobuf message generated from zongzi.proto (index 18 in
// file_zongzi_proto_msgTypes). Fields: shard_id (1, uint64), stale (2, bool)
// and data (3, bytes).
type ReadRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"`
Stale bool `protobuf:"varint,2,opt,name=stale,proto3" json:"stale,omitempty"`
Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
}
// Reset overwrites *x with an empty ReadRequest and, when
// protoimpl.UnsafeEnabled is set, stores the message info in its state.
func (x *ReadRequest) Reset() {
*x = ReadRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[18]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the result of protoimpl.X.MessageStringOf for x.
func (x *ReadRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*ReadRequest) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x. On the unsafe path
// with a non-nil x the message info is stored in the state if not yet loaded.
func (x *ReadRequest) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[18]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReadRequest.ProtoReflect.Descriptor instead.
func (*ReadRequest) Descriptor() ([]byte, []int) {
return file_zongzi_proto_rawDescGZIP(), []int{18}
}
// GetShardId returns x.ShardId, or 0 when x is nil.
func (x *ReadRequest) GetShardId() uint64 {
if x != nil {
return x.ShardId
}
return 0
}
// GetStale returns x.Stale, or false when x is nil.
func (x *ReadRequest) GetStale() bool {
if x != nil {
return x.Stale
}
return false
}
// GetData returns x.Data, or nil when x is nil.
func (x *ReadRequest) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
type ReadResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *ReadResponse) Reset() {
*x = ReadResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_zongzi_proto_msgTypes[19]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ReadResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReadResponse) ProtoMessage() {}
func (x *ReadResponse) ProtoReflect() protoreflect.Message {
mi := &file_zongzi_proto_msgTypes[19]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Descriptor returns the bytes from file_zongzi_proto_rawDescGZIP together
// with the index path []int{19} used for ReadResponse.
//
// Deprecated: Use ReadResponse.ProtoReflect.Descriptor instead.
func (*ReadResponse) Descriptor() ([]byte, []int) {
	compressed := file_zongzi_proto_rawDescGZIP()
	path := []int{19}
	return compressed, path
}
// GetValue returns x.Value, or 0 when x is nil.
func (x *ReadResponse) GetValue() uint64 {
	if x == nil {
		return 0
	}
	return x.Value
}
// GetData returns x.Data, or nil when x is nil.
func (x *ReadResponse) GetData() []byte {
	if x == nil {
		return nil
	}
	return x.Data
}
// WatchRequest is the Go type for the zongzi.WatchRequest message, the
// input type of the Internal.Watch RPC.
type WatchRequest struct {
// state, sizeCache and unknownFields are bookkeeping fields exposed to the
// protobuf runtime through the Exporter registered in file_zongzi_proto_init.
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// ShardId is proto field 1 ("shard_id", varint).
ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"`
// Stale is proto field 2 ("stale", varint-encoded bool).
Stale bool `protobuf:"varint,2,opt,name=stale,proto3" json:"stale,omitempty"`
// Data is proto field 3 ("data", bytes).
Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
}
// Reset overwrites x with the zero WatchRequest. When protoimpl.UnsafeEnabled
// is set it then stores the message info for message index 20 in x's
// message state.
func (x *WatchRequest) Reset() {
	*x = WatchRequest{}
	if !protoimpl.UnsafeEnabled {
		return
	}
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	state.StoreMessageInfo(&file_zongzi_proto_msgTypes[20])
}
// String returns the text produced by protoimpl.X.MessageStringOf for x.
func (x *WatchRequest) String() string {
	rendered := protoimpl.X.MessageStringOf(x)
	return rendered
}
// ProtoMessage has no behavior of its own; its presence adds the method to
// *WatchRequest's method set.
func (*WatchRequest) ProtoMessage() {
	// Intentionally empty.
}
// ProtoReflect returns the reflective view of x.
//
// For a nil receiver, or when protoimpl.UnsafeEnabled is false, the view is
// built by the message info at index 20. Otherwise x's own message state is
// returned, after storing that message info in it if none was loaded yet.
func (x *WatchRequest) ProtoReflect() protoreflect.Message {
	info := &file_zongzi_proto_msgTypes[20]
	if x == nil || !protoimpl.UnsafeEnabled {
		return info.MessageOf(x)
	}
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	if state.LoadMessageInfo() == nil {
		state.StoreMessageInfo(info)
	}
	return state
}
// Descriptor returns the bytes from file_zongzi_proto_rawDescGZIP together
// with the index path []int{20} used for WatchRequest.
//
// Deprecated: Use WatchRequest.ProtoReflect.Descriptor instead.
func (*WatchRequest) Descriptor() ([]byte, []int) {
	compressed := file_zongzi_proto_rawDescGZIP()
	path := []int{20}
	return compressed, path
}
// GetShardId returns x.ShardId, or 0 when x is nil.
func (x *WatchRequest) GetShardId() uint64 {
	if x == nil {
		return 0
	}
	return x.ShardId
}
// GetStale returns x.Stale, or false when x is nil.
func (x *WatchRequest) GetStale() bool {
	if x == nil {
		return false
	}
	return x.Stale
}
// GetData returns x.Data, or nil when x is nil.
func (x *WatchRequest) GetData() []byte {
	if x == nil {
		return nil
	}
	return x.Data
}
// WatchResponse is the Go type for the zongzi.WatchResponse message, the
// output type of the Internal.Watch RPC.
type WatchResponse struct {
// state, sizeCache and unknownFields are bookkeeping fields exposed to the
// protobuf runtime through the Exporter registered in file_zongzi_proto_init.
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Value is proto field 1 ("value", varint).
Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
// Data is proto field 2 ("data", bytes).
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
}
// Reset overwrites x with the zero WatchResponse. When
// protoimpl.UnsafeEnabled is set it then stores the message info for message
// index 21 in x's message state.
func (x *WatchResponse) Reset() {
	*x = WatchResponse{}
	if !protoimpl.UnsafeEnabled {
		return
	}
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	state.StoreMessageInfo(&file_zongzi_proto_msgTypes[21])
}
// String returns the text produced by protoimpl.X.MessageStringOf for x.
func (x *WatchResponse) String() string {
	rendered := protoimpl.X.MessageStringOf(x)
	return rendered
}
// ProtoMessage has no behavior of its own; its presence adds the method to
// *WatchResponse's method set.
func (*WatchResponse) ProtoMessage() {
	// Intentionally empty.
}
// ProtoReflect returns the reflective view of x.
//
// For a nil receiver, or when protoimpl.UnsafeEnabled is false, the view is
// built by the message info at index 21. Otherwise x's own message state is
// returned, after storing that message info in it if none was loaded yet.
func (x *WatchResponse) ProtoReflect() protoreflect.Message {
	info := &file_zongzi_proto_msgTypes[21]
	if x == nil || !protoimpl.UnsafeEnabled {
		return info.MessageOf(x)
	}
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	if state.LoadMessageInfo() == nil {
		state.StoreMessageInfo(info)
	}
	return state
}
// Descriptor returns the bytes from file_zongzi_proto_rawDescGZIP together
// with the index path []int{21} used for WatchResponse.
//
// Deprecated: Use WatchResponse.ProtoReflect.Descriptor instead.
func (*WatchResponse) Descriptor() ([]byte, []int) {
	compressed := file_zongzi_proto_rawDescGZIP()
	path := []int{21}
	return compressed, path
}
// GetValue returns x.Value, or 0 when x is nil.
func (x *WatchResponse) GetValue() uint64 {
	if x == nil {
		return 0
	}
	return x.Value
}
// GetData returns x.Data, or nil when x is nil.
func (x *WatchResponse) GetData() []byte {
	if x == nil {
		return nil
	}
	return x.Data
}
// File_zongzi_proto holds the file descriptor built by
// file_zongzi_proto_init; it is nil until that function has run.
var File_zongzi_proto protoreflect.FileDescriptor
// file_zongzi_proto_rawDesc is the raw descriptor handed to
// protoimpl.DescBuilder in file_zongzi_proto_init, which sets this variable
// to nil once the build has finished. The bytes are opaque generated data
// (presumably the serialized FileDescriptorProto of zongzi.proto — the
// leading bytes spell that file name) and must not be edited by hand.
var file_zongzi_proto_rawDesc = []byte{
0x0a, 0x0c, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06,
0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x22, 0x0d, 0x0a, 0x0b, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x0e, 0x0a, 0x0c, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x0e, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x62, 0x65, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x49, 0x0a, 0x0d, 0x50, 0x72, 0x6f, 0x62, 0x65, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x18, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70,
0x5f, 0x61, 0x64, 0x76, 0x65, 0x72, 0x74, 0x69, 0x73, 0x65, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65,
0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x16, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70,
0x41, 0x64, 0x76, 0x65, 0x72, 0x74, 0x69, 0x73, 0x65, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73,
0x22, 0x0d, 0x0a, 0x0b, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22,
0x46, 0x0a, 0x0c, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
0x17, 0x0a, 0x07, 0x68, 0x6f, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x06, 0x68, 0x6f, 0x73, 0x74, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c,
0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x65,
0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x22, 0x10, 0x0a, 0x0e, 0x4d, 0x65, 0x6d, 0x62, 0x65,
0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x8d, 0x01, 0x0a, 0x0f, 0x4d, 0x65,
0x6d, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3e, 0x0a,
0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x24,
0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x45,
0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a,
0x0c, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a,
0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12,
0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x4a, 0x0a, 0x0b, 0x4a, 0x6f, 0x69,
0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x68, 0x6f, 0x73, 0x74,
0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x68, 0x6f, 0x73, 0x74, 0x49,
0x64, 0x12, 0x22, 0x0a, 0x0d, 0x69, 0x73, 0x5f, 0x6e, 0x6f, 0x6e, 0x5f, 0x76, 0x6f, 0x74, 0x69,
0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x4e, 0x6f, 0x6e, 0x56,
0x6f, 0x74, 0x69, 0x6e, 0x67, 0x22, 0x3a, 0x0a, 0x0c, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01,
0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65,
0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f,
0x72, 0x22, 0x83, 0x01, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x17, 0x0a, 0x07, 0x68, 0x6f, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x06, 0x68, 0x6f, 0x73, 0x74, 0x49, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61,
0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61,
0x72, 0x64, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f,
0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63,
0x61, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x69, 0x73, 0x5f, 0x6e, 0x6f, 0x6e, 0x5f, 0x76, 0x6f,
0x74, 0x69, 0x6e, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x4e, 0x6f,
0x6e, 0x56, 0x6f, 0x74, 0x69, 0x6e, 0x67, 0x22, 0x39, 0x0a, 0x0b, 0x41, 0x64, 0x64, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05,
0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x22, 0x29, 0x0a, 0x0c, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, 0x22, 0x0f, 0x0a,
0x0d, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x3d,
0x0a, 0x0c, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19,
0x0a, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04,
0x52, 0x07, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74,
0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x39, 0x0a,
0x0d, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14,
0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76,
0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x3e, 0x0a, 0x0d, 0x43, 0x6f, 0x6d, 0x6d,
0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61,
0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61,
0x72, 0x64, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x10, 0x0a, 0x0e, 0x43, 0x6f, 0x6d, 0x6d,
0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x52, 0x0a, 0x0b, 0x52, 0x65,
0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61,
0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61,
0x72, 0x64, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x6c, 0x65, 0x18, 0x02, 0x20,
0x01, 0x28, 0x08, 0x52, 0x05, 0x73, 0x74, 0x61, 0x6c, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61,
0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x38,
0x0a, 0x0c, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14,
0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76,
0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x53, 0x0a, 0x0c, 0x57, 0x61, 0x74, 0x63,
0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61, 0x72,
0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61, 0x72,
0x64, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x6c, 0x65, 0x18, 0x02, 0x20, 0x01,
0x28, 0x08, 0x52, 0x05, 0x73, 0x74, 0x61, 0x6c, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74,
0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x39, 0x0a,
0x0d, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14,
0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76,
0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0xeb, 0x04, 0x0a, 0x08, 0x49, 0x6e, 0x74,
0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x33, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x13, 0x2e,
0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x50, 0x69, 0x6e, 0x67,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x50, 0x72,
0x6f, 0x62, 0x65, 0x12, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x50, 0x72, 0x6f,
0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x7a, 0x6f, 0x6e, 0x67,
0x7a, 0x69, 0x2e, 0x50, 0x72, 0x6f, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x22, 0x00, 0x12, 0x33, 0x0a, 0x04, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x13, 0x2e, 0x7a, 0x6f, 0x6e,
0x67, 0x7a, 0x69, 0x2e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x07, 0x4d, 0x65, 0x6d, 0x62, 0x65,
0x72, 0x73, 0x12, 0x16, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x4d, 0x65, 0x6d, 0x62,
0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x7a, 0x6f, 0x6e,
0x67, 0x7a, 0x69, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x04, 0x4a, 0x6f, 0x69, 0x6e, 0x12, 0x13, 0x2e,
0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x4a, 0x6f, 0x69, 0x6e,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x30, 0x0a, 0x03, 0x41, 0x64,
0x64, 0x12, 0x12, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x41, 0x64, 0x64, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x41,
0x64, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05,
0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x49,
0x6e, 0x64, 0x65, 0x78, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x7a, 0x6f,
0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x12, 0x14, 0x2e,
0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x41, 0x70, 0x70,
0x6c, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x06,
0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x12, 0x15, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e,
0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e,
0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x04, 0x52, 0x65, 0x61, 0x64, 0x12,
0x13, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x52, 0x65,
0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x05,
0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x57,
0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x7a, 0x6f,
0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x6f, 0x67, 0x62, 0x6e, 0x2f, 0x7a, 0x6f, 0x6e, 0x67, 0x7a,
0x69, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
// file_zongzi_proto_rawDescOnce guards the one-time compression performed by
// file_zongzi_proto_rawDescGZIP.
var file_zongzi_proto_rawDescOnce sync.Once

// file_zongzi_proto_rawDescData starts out as the raw descriptor bytes and is
// replaced by their gzip-compressed form on the first call to
// file_zongzi_proto_rawDescGZIP.
var file_zongzi_proto_rawDescData = file_zongzi_proto_rawDesc
func file_zongzi_proto_rawDescGZIP() []byte {
file_zongzi_proto_rawDescOnce.Do(func() {
file_zongzi_proto_rawDescData = protoimpl.X.CompressGZIP(file_zongzi_proto_rawDescData)
})
return file_zongzi_proto_rawDescData
}
// file_zongzi_proto_msgTypes holds one protoimpl.MessageInfo per message in
// the file (23 entries). The generated message methods index it with the
// same numbers listed in file_zongzi_proto_goTypes.
var file_zongzi_proto_msgTypes = make([]protoimpl.MessageInfo, 23)
// file_zongzi_proto_goTypes lists the Go type for each message index; it is
// passed to protoimpl.TypeBuilder as GoTypes and set to nil afterwards.
// Positions are referenced by file_zongzi_proto_depIdxs, so the order must
// not change. Entry 22 (the MembersEntry map entry) has no Go type and is nil.
var file_zongzi_proto_goTypes = []interface{}{
(*PingRequest)(nil), // 0: zongzi.PingRequest
(*PingResponse)(nil), // 1: zongzi.PingResponse
(*ProbeRequest)(nil), // 2: zongzi.ProbeRequest
(*ProbeResponse)(nil), // 3: zongzi.ProbeResponse
(*InfoRequest)(nil), // 4: zongzi.InfoRequest
(*InfoResponse)(nil), // 5: zongzi.InfoResponse
(*MembersRequest)(nil), // 6: zongzi.MembersRequest
(*MembersResponse)(nil), // 7: zongzi.MembersResponse
(*JoinRequest)(nil), // 8: zongzi.JoinRequest
(*JoinResponse)(nil), // 9: zongzi.JoinResponse
(*AddRequest)(nil), // 10: zongzi.AddRequest
(*AddResponse)(nil), // 11: zongzi.AddResponse
(*IndexRequest)(nil), // 12: zongzi.IndexRequest
(*IndexResponse)(nil), // 13: zongzi.IndexResponse
(*ApplyRequest)(nil), // 14: zongzi.ApplyRequest
(*ApplyResponse)(nil), // 15: zongzi.ApplyResponse
(*CommitRequest)(nil), // 16: zongzi.CommitRequest
(*CommitResponse)(nil), // 17: zongzi.CommitResponse
(*ReadRequest)(nil), // 18: zongzi.ReadRequest
(*ReadResponse)(nil), // 19: zongzi.ReadResponse
(*WatchRequest)(nil), // 20: zongzi.WatchRequest
(*WatchResponse)(nil), // 21: zongzi.WatchResponse
nil, // 22: zongzi.MembersResponse.MembersEntry
}
// file_zongzi_proto_depIdxs is the dependency index table passed to
// protoimpl.TypeBuilder as DependencyIndexes and set to nil afterwards.
// Each leading value is an index into file_zongzi_proto_goTypes; the last
// five values are offsets marking where each sub-list starts, as described by
// the trailing comments.
var file_zongzi_proto_depIdxs = []int32{
22, // 0: zongzi.MembersResponse.members:type_name -> zongzi.MembersResponse.MembersEntry
0, // 1: zongzi.Internal.Ping:input_type -> zongzi.PingRequest
2, // 2: zongzi.Internal.Probe:input_type -> zongzi.ProbeRequest
4, // 3: zongzi.Internal.Info:input_type -> zongzi.InfoRequest
6, // 4: zongzi.Internal.Members:input_type -> zongzi.MembersRequest
8, // 5: zongzi.Internal.Join:input_type -> zongzi.JoinRequest
10, // 6: zongzi.Internal.Add:input_type -> zongzi.AddRequest
12, // 7: zongzi.Internal.Index:input_type -> zongzi.IndexRequest
14, // 8: zongzi.Internal.Apply:input_type -> zongzi.ApplyRequest
16, // 9: zongzi.Internal.Commit:input_type -> zongzi.CommitRequest
18, // 10: zongzi.Internal.Read:input_type -> zongzi.ReadRequest
20, // 11: zongzi.Internal.Watch:input_type -> zongzi.WatchRequest
1, // 12: zongzi.Internal.Ping:output_type -> zongzi.PingResponse
3, // 13: zongzi.Internal.Probe:output_type -> zongzi.ProbeResponse
5, // 14: zongzi.Internal.Info:output_type -> zongzi.InfoResponse
7, // 15: zongzi.Internal.Members:output_type -> zongzi.MembersResponse
9, // 16: zongzi.Internal.Join:output_type -> zongzi.JoinResponse
11, // 17: zongzi.Internal.Add:output_type -> zongzi.AddResponse
13, // 18: zongzi.Internal.Index:output_type -> zongzi.IndexResponse
15, // 19: zongzi.Internal.Apply:output_type -> zongzi.ApplyResponse
17, // 20: zongzi.Internal.Commit:output_type -> zongzi.CommitResponse
19, // 21: zongzi.Internal.Read:output_type -> zongzi.ReadResponse
21, // 22: zongzi.Internal.Watch:output_type -> zongzi.WatchResponse
12, // [12:23] is the sub-list for method output_type
1, // [1:12] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
// init builds the zongzi.proto descriptor and type tables when the package is
// loaded.
func init() {
	file_zongzi_proto_init()
}
// file_zongzi_proto_init builds the file descriptor and message type tables
// for zongzi.proto and stores the result in File_zongzi_proto.
//
// It returns immediately when File_zongzi_proto is already set. When
// protoimpl.UnsafeEnabled is false it first installs, for message indexes
// 0 through 21, an Exporter that returns pointers to the state, sizeCache and
// unknownFields fields (selected by 0, 1 and 2; any other index yields nil).
// After the build, the raw descriptor, Go type list and dependency index
// table are set to nil.
func file_zongzi_proto_init() {
if File_zongzi_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_zongzi_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PingRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PingResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ProbeRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ProbeResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*InfoRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*InfoResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*MembersRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*MembersResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*JoinRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*JoinResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*AddRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*AddResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*IndexRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*IndexResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ApplyRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ApplyResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CommitRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CommitResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[18].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReadRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[19].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReadResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*WatchRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_zongzi_proto_msgTypes[21].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*WatchResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
// x is a throwaway local type used only to obtain this package's import
// path via reflection.
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_zongzi_proto_rawDesc,
NumEnums: 0,
NumMessages: 23,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_zongzi_proto_goTypes,
DependencyIndexes: file_zongzi_proto_depIdxs,
MessageInfos: file_zongzi_proto_msgTypes,
}.Build()
File_zongzi_proto = out.File
// Drop the references to the build inputs now that the build is done.
file_zongzi_proto_rawDesc = nil
file_zongzi_proto_goTypes = nil
file_zongzi_proto_depIdxs = nil
}
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v3.12.4
// source: zongzi.proto
package internal
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
// The value is bound to the blank identifier: only the reference to
// grpc.SupportPackageIsVersion7 matters, so the build fails if the linked
// grpc package does not declare it.
const _ = grpc.SupportPackageIsVersion7
// Full gRPC method names of the zongzi.Internal service, in the form
// "/<package>.<service>/<method>". The client stubs pass these to
// Invoke/NewStream.
const (
	// Host discovery.
	Internal_Ping_FullMethodName  = "/zongzi.Internal/Ping"
	Internal_Probe_FullMethodName = "/zongzi.Internal/Probe"
	Internal_Info_FullMethodName  = "/zongzi.Internal/Info"

	// Membership.
	Internal_Members_FullMethodName = "/zongzi.Internal/Members"
	Internal_Join_FullMethodName    = "/zongzi.Internal/Join"
	Internal_Add_FullMethodName     = "/zongzi.Internal/Add"

	// Shard reads and writes.
	Internal_Index_FullMethodName  = "/zongzi.Internal/Index"
	Internal_Apply_FullMethodName  = "/zongzi.Internal/Apply"
	Internal_Commit_FullMethodName = "/zongzi.Internal/Commit"
	Internal_Read_FullMethodName   = "/zongzi.Internal/Read"
	Internal_Watch_FullMethodName  = "/zongzi.Internal/Watch"
)
// InternalClient is the client API for Internal service.
//
// Every method except Watch is a unary call that returns a single response
// message; Watch returns a stream handle from which responses are received.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type InternalClient interface {
// Ping is a noop for timing purposes
Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
// Probe returns Gossip.AdvertiseAddress
// Used by new hosts to start dragonboat
Probe(ctx context.Context, in *ProbeRequest, opts ...grpc.CallOption) (*ProbeResponse, error)
// Info returns zero shard replicaID and hostID
// Used by new hosts to discover zero shard replicas
Info(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error)
// Members returns json marshaled map of member zero shard replicaID to hostID
// Used by new hosts joining a bootstrapped zero shard
// NOTE(review): the file descriptor declares MembersResponse.members as a
// map<uint64, string> field rather than a JSON payload — confirm and refresh
// the comment in zongzi.proto.
Members(ctx context.Context, in *MembersRequest, opts ...grpc.CallOption) (*MembersResponse, error)
// Join takes a host ID and returns success
// Used by joining hosts to request their own addition to the zero shard
Join(ctx context.Context, in *JoinRequest, opts ...grpc.CallOption) (*JoinResponse, error)
// Add takes a replica ID, shard ID and host ID and returns success
// Used during replica creation to request replica's addition to the shard
Add(ctx context.Context, in *AddRequest, opts ...grpc.CallOption) (*AddResponse, error)
// Index reads the index of a shard
Index(ctx context.Context, in *IndexRequest, opts ...grpc.CallOption) (*IndexResponse, error)
// Apply provides unary request/response command forwarding
Apply(ctx context.Context, in *ApplyRequest, opts ...grpc.CallOption) (*ApplyResponse, error)
// Commit provides unary request/response command forwarding
Commit(ctx context.Context, in *CommitRequest, opts ...grpc.CallOption) (*CommitResponse, error)
// Read provides unary request/response query forwarding
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error)
// Watch provides streaming query response forwarding
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Internal_WatchClient, error)
}
// internalClient is the InternalClient implementation returned by
// NewInternalClient.
type internalClient struct {
// cc is the connection every RPC method invokes its call on.
cc grpc.ClientConnInterface
}
// NewInternalClient returns an InternalClient that issues its RPCs over cc.
func NewInternalClient(cc grpc.ClientConnInterface) InternalClient {
	client := &internalClient{cc: cc}
	return client
}
// Ping invokes the unary Internal/Ping RPC with in and returns the decoded
// response, or nil and the error reported by Invoke.
func (c *internalClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) {
	resp := &PingResponse{}
	if err := c.cc.Invoke(ctx, Internal_Ping_FullMethodName, in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Probe invokes the unary Internal/Probe RPC with in and returns the decoded
// response, or nil and the error reported by Invoke.
func (c *internalClient) Probe(ctx context.Context, in *ProbeRequest, opts ...grpc.CallOption) (*ProbeResponse, error) {
	resp := &ProbeResponse{}
	if err := c.cc.Invoke(ctx, Internal_Probe_FullMethodName, in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Info invokes the unary Internal/Info RPC with in and returns the decoded
// response, or nil and the error reported by Invoke.
func (c *internalClient) Info(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error) {
	resp := &InfoResponse{}
	if err := c.cc.Invoke(ctx, Internal_Info_FullMethodName, in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Members invokes the unary Internal/Members RPC with in and returns the
// decoded response, or nil and the error reported by Invoke.
func (c *internalClient) Members(ctx context.Context, in *MembersRequest, opts ...grpc.CallOption) (*MembersResponse, error) {
	resp := &MembersResponse{}
	if err := c.cc.Invoke(ctx, Internal_Members_FullMethodName, in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Join invokes the unary Internal/Join RPC with in and returns the decoded
// response, or nil and the error reported by Invoke.
func (c *internalClient) Join(ctx context.Context, in *JoinRequest, opts ...grpc.CallOption) (*JoinResponse, error) {
	resp := &JoinResponse{}
	if err := c.cc.Invoke(ctx, Internal_Join_FullMethodName, in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Add invokes the unary Internal/Add RPC with in and returns the decoded
// response, or nil and the error reported by Invoke.
func (c *internalClient) Add(ctx context.Context, in *AddRequest, opts ...grpc.CallOption) (*AddResponse, error) {
	resp := &AddResponse{}
	if err := c.cc.Invoke(ctx, Internal_Add_FullMethodName, in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Index invokes the unary Internal/Index RPC with in and returns the decoded
// response, or nil and the error reported by Invoke.
func (c *internalClient) Index(ctx context.Context, in *IndexRequest, opts ...grpc.CallOption) (*IndexResponse, error) {
	resp := &IndexResponse{}
	if err := c.cc.Invoke(ctx, Internal_Index_FullMethodName, in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Apply invokes the unary Internal/Apply RPC with in and returns the decoded
// response, or nil and the error reported by Invoke.
func (c *internalClient) Apply(ctx context.Context, in *ApplyRequest, opts ...grpc.CallOption) (*ApplyResponse, error) {
	resp := &ApplyResponse{}
	if err := c.cc.Invoke(ctx, Internal_Apply_FullMethodName, in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Commit invokes the unary Internal/Commit RPC with in and returns the
// decoded response, or nil and the error reported by Invoke.
func (c *internalClient) Commit(ctx context.Context, in *CommitRequest, opts ...grpc.CallOption) (*CommitResponse, error) {
	resp := &CommitResponse{}
	if err := c.cc.Invoke(ctx, Internal_Commit_FullMethodName, in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Read invokes the unary Internal/Read RPC with in and returns the decoded
// response, or nil and the error reported by Invoke.
func (c *internalClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) {
	resp := &ReadResponse{}
	if err := c.cc.Invoke(ctx, Internal_Read_FullMethodName, in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Watch opens the Internal/Watch stream (stream descriptor 0 of
// Internal_ServiceDesc), sends in as the single request message and closes
// the send side. On success it returns a handle for receiving responses; any
// error from opening the stream, sending, or closing is returned with a nil
// handle.
func (c *internalClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Internal_WatchClient, error) {
	stream, err := c.cc.NewStream(ctx, &Internal_ServiceDesc.Streams[0], Internal_Watch_FullMethodName, opts...)
	if err != nil {
		return nil, err
	}
	watcher := &internalWatchClient{ClientStream: stream}
	if err = watcher.ClientStream.SendMsg(in); err != nil {
		return nil, err
	}
	if err = watcher.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	return watcher, nil
}
// Internal_WatchClient is the handle returned by InternalClient.Watch. It
// adds a typed Recv to the embedded grpc.ClientStream.
type Internal_WatchClient interface {
// Recv returns the next WatchResponse received on the stream, or an error.
Recv() (*WatchResponse, error)
grpc.ClientStream
}
// internalWatchClient wraps a grpc.ClientStream and supplies the typed Recv
// required by Internal_WatchClient.
type internalWatchClient struct {
grpc.ClientStream
}
// Recv reads the next message from the underlying stream into a fresh
// WatchResponse. It returns nil and the error when RecvMsg fails.
func (x *internalWatchClient) Recv() (*WatchResponse, error) {
	msg := &WatchResponse{}
	err := x.ClientStream.RecvMsg(msg)
	if err != nil {
		return nil, err
	}
	return msg, nil
}
// InternalServer is the server API for Internal service.
// All implementations must embed UnimplementedInternalServer
// for forward compatibility
//
// Every method except Watch handles a unary call; Watch receives the request
// together with a stream on which to send responses.
type InternalServer interface {
// Ping is a noop for timing purposes
Ping(context.Context, *PingRequest) (*PingResponse, error)
// Probe returns Gossip.AdvertiseAddress
// Used by new hosts to start dragonboat
Probe(context.Context, *ProbeRequest) (*ProbeResponse, error)
// Info returns zero shard replicaID and hostID
// Used by new hosts to discover zero shard replicas
Info(context.Context, *InfoRequest) (*InfoResponse, error)
// Members returns json marshaled map of member zero shard replicaID to hostID
// Used by new hosts joining a bootstrapped zero shard
Members(context.Context, *MembersRequest) (*MembersResponse, error)
// Join takes a host ID and returns success
// Used by joining hosts to request their own addition to the zero shard
Join(context.Context, *JoinRequest) (*JoinResponse, error)
// Add takes a replica ID, shard ID and host ID and returns success
// Used during replica creation to request replica's addition to the shard
Add(context.Context, *AddRequest) (*AddResponse, error)
// Index reads the index of a shard
Index(context.Context, *IndexRequest) (*IndexResponse, error)
// Apply provides unary request/response command forwarding
Apply(context.Context, *ApplyRequest) (*ApplyResponse, error)
// Commit provides unary request/response command forwarding
Commit(context.Context, *CommitRequest) (*CommitResponse, error)
// Read provides unary request/response query forwarding
Read(context.Context, *ReadRequest) (*ReadResponse, error)
// Watch provides streaming query response forwarding
Watch(*WatchRequest, Internal_WatchServer) error
// mustEmbedUnimplementedInternalServer is unexported, so types outside this
// package can only satisfy the interface by embedding a type that has it.
mustEmbedUnimplementedInternalServer()
}
// UnimplementedInternalServer must be embedded to have forward compatible implementations.
// Every method answers with a gRPC Unimplemented status naming the method.
type UnimplementedInternalServer struct{}

// Ping reports that the method is not implemented.
func (UnimplementedInternalServer) Ping(context.Context, *PingRequest) (*PingResponse, error) {
	return nil, status.Error(codes.Unimplemented, "method Ping not implemented")
}

// Probe reports that the method is not implemented.
func (UnimplementedInternalServer) Probe(context.Context, *ProbeRequest) (*ProbeResponse, error) {
	return nil, status.Error(codes.Unimplemented, "method Probe not implemented")
}

// Info reports that the method is not implemented.
func (UnimplementedInternalServer) Info(context.Context, *InfoRequest) (*InfoResponse, error) {
	return nil, status.Error(codes.Unimplemented, "method Info not implemented")
}

// Members reports that the method is not implemented.
func (UnimplementedInternalServer) Members(context.Context, *MembersRequest) (*MembersResponse, error) {
	return nil, status.Error(codes.Unimplemented, "method Members not implemented")
}

// Join reports that the method is not implemented.
func (UnimplementedInternalServer) Join(context.Context, *JoinRequest) (*JoinResponse, error) {
	return nil, status.Error(codes.Unimplemented, "method Join not implemented")
}

// Add reports that the method is not implemented.
func (UnimplementedInternalServer) Add(context.Context, *AddRequest) (*AddResponse, error) {
	return nil, status.Error(codes.Unimplemented, "method Add not implemented")
}

// Index reports that the method is not implemented.
func (UnimplementedInternalServer) Index(context.Context, *IndexRequest) (*IndexResponse, error) {
	return nil, status.Error(codes.Unimplemented, "method Index not implemented")
}

// Apply reports that the method is not implemented.
func (UnimplementedInternalServer) Apply(context.Context, *ApplyRequest) (*ApplyResponse, error) {
	return nil, status.Error(codes.Unimplemented, "method Apply not implemented")
}

// Commit reports that the method is not implemented.
func (UnimplementedInternalServer) Commit(context.Context, *CommitRequest) (*CommitResponse, error) {
	return nil, status.Error(codes.Unimplemented, "method Commit not implemented")
}

// Read reports that the method is not implemented.
func (UnimplementedInternalServer) Read(context.Context, *ReadRequest) (*ReadResponse, error) {
	return nil, status.Error(codes.Unimplemented, "method Read not implemented")
}

// Watch reports that the method is not implemented.
func (UnimplementedInternalServer) Watch(*WatchRequest, Internal_WatchServer) error {
	return status.Error(codes.Unimplemented, "method Watch not implemented")
}

// mustEmbedUnimplementedInternalServer marks the type as a valid InternalServer embed.
func (UnimplementedInternalServer) mustEmbedUnimplementedInternalServer() {}
// UnsafeInternalServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as methods added to InternalServer will
// result in compilation errors.
type UnsafeInternalServer interface {
mustEmbedUnimplementedInternalServer()
}
// RegisterInternalServer registers srv on the given registrar under the
// Internal service descriptor.
func RegisterInternalServer(s grpc.ServiceRegistrar, srv InternalServer) {
	desc := &Internal_ServiceDesc
	s.RegisterService(desc, srv)
}
// _Internal_Ping_Handler decodes a PingRequest and dispatches it to the server's
// Ping method, going through the unary interceptor when one is supplied.
func _Internal_Ping_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	req := new(PingRequest)
	if err := dec(req); err != nil {
		return nil, err
	}
	if interceptor != nil {
		meta := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: Internal_Ping_FullMethodName,
		}
		next := func(ctx context.Context, r any) (any, error) {
			return srv.(InternalServer).Ping(ctx, r.(*PingRequest))
		}
		return interceptor(ctx, req, meta, next)
	}
	return srv.(InternalServer).Ping(ctx, req)
}
// _Internal_Probe_Handler decodes a ProbeRequest and dispatches it to the server's
// Probe method, going through the unary interceptor when one is supplied.
func _Internal_Probe_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	req := new(ProbeRequest)
	if err := dec(req); err != nil {
		return nil, err
	}
	if interceptor != nil {
		meta := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: Internal_Probe_FullMethodName,
		}
		next := func(ctx context.Context, r any) (any, error) {
			return srv.(InternalServer).Probe(ctx, r.(*ProbeRequest))
		}
		return interceptor(ctx, req, meta, next)
	}
	return srv.(InternalServer).Probe(ctx, req)
}
// _Internal_Info_Handler decodes an InfoRequest and dispatches it to the server's
// Info method, going through the unary interceptor when one is supplied.
func _Internal_Info_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	req := new(InfoRequest)
	if err := dec(req); err != nil {
		return nil, err
	}
	if interceptor != nil {
		meta := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: Internal_Info_FullMethodName,
		}
		next := func(ctx context.Context, r any) (any, error) {
			return srv.(InternalServer).Info(ctx, r.(*InfoRequest))
		}
		return interceptor(ctx, req, meta, next)
	}
	return srv.(InternalServer).Info(ctx, req)
}
// _Internal_Members_Handler decodes a MembersRequest and dispatches it to the server's
// Members method, going through the unary interceptor when one is supplied.
func _Internal_Members_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	req := new(MembersRequest)
	if err := dec(req); err != nil {
		return nil, err
	}
	if interceptor != nil {
		meta := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: Internal_Members_FullMethodName,
		}
		next := func(ctx context.Context, r any) (any, error) {
			return srv.(InternalServer).Members(ctx, r.(*MembersRequest))
		}
		return interceptor(ctx, req, meta, next)
	}
	return srv.(InternalServer).Members(ctx, req)
}
// _Internal_Join_Handler decodes a JoinRequest and dispatches it to the server's
// Join method, going through the unary interceptor when one is supplied.
func _Internal_Join_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	req := new(JoinRequest)
	if err := dec(req); err != nil {
		return nil, err
	}
	if interceptor != nil {
		meta := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: Internal_Join_FullMethodName,
		}
		next := func(ctx context.Context, r any) (any, error) {
			return srv.(InternalServer).Join(ctx, r.(*JoinRequest))
		}
		return interceptor(ctx, req, meta, next)
	}
	return srv.(InternalServer).Join(ctx, req)
}
// _Internal_Add_Handler decodes an AddRequest and dispatches it to the server's
// Add method, going through the unary interceptor when one is supplied.
func _Internal_Add_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	req := new(AddRequest)
	if err := dec(req); err != nil {
		return nil, err
	}
	if interceptor != nil {
		meta := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: Internal_Add_FullMethodName,
		}
		next := func(ctx context.Context, r any) (any, error) {
			return srv.(InternalServer).Add(ctx, r.(*AddRequest))
		}
		return interceptor(ctx, req, meta, next)
	}
	return srv.(InternalServer).Add(ctx, req)
}
// _Internal_Index_Handler decodes an IndexRequest and dispatches it to the server's
// Index method, going through the unary interceptor when one is supplied.
func _Internal_Index_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	req := new(IndexRequest)
	if err := dec(req); err != nil {
		return nil, err
	}
	if interceptor != nil {
		meta := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: Internal_Index_FullMethodName,
		}
		next := func(ctx context.Context, r any) (any, error) {
			return srv.(InternalServer).Index(ctx, r.(*IndexRequest))
		}
		return interceptor(ctx, req, meta, next)
	}
	return srv.(InternalServer).Index(ctx, req)
}
// _Internal_Apply_Handler decodes an ApplyRequest and dispatches it to the server's
// Apply method, going through the unary interceptor when one is supplied.
func _Internal_Apply_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	req := new(ApplyRequest)
	if err := dec(req); err != nil {
		return nil, err
	}
	if interceptor != nil {
		meta := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: Internal_Apply_FullMethodName,
		}
		next := func(ctx context.Context, r any) (any, error) {
			return srv.(InternalServer).Apply(ctx, r.(*ApplyRequest))
		}
		return interceptor(ctx, req, meta, next)
	}
	return srv.(InternalServer).Apply(ctx, req)
}
// _Internal_Commit_Handler decodes a CommitRequest and dispatches it to the server's
// Commit method, going through the unary interceptor when one is supplied.
func _Internal_Commit_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	req := new(CommitRequest)
	if err := dec(req); err != nil {
		return nil, err
	}
	if interceptor != nil {
		meta := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: Internal_Commit_FullMethodName,
		}
		next := func(ctx context.Context, r any) (any, error) {
			return srv.(InternalServer).Commit(ctx, r.(*CommitRequest))
		}
		return interceptor(ctx, req, meta, next)
	}
	return srv.(InternalServer).Commit(ctx, req)
}
// _Internal_Read_Handler decodes a ReadRequest and dispatches it to the server's
// Read method, going through the unary interceptor when one is supplied.
func _Internal_Read_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	req := new(ReadRequest)
	if err := dec(req); err != nil {
		return nil, err
	}
	if interceptor != nil {
		meta := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: Internal_Read_FullMethodName,
		}
		next := func(ctx context.Context, r any) (any, error) {
			return srv.(InternalServer).Read(ctx, r.(*ReadRequest))
		}
		return interceptor(ctx, req, meta, next)
	}
	return srv.(InternalServer).Read(ctx, req)
}
// _Internal_Watch_Handler receives the single WatchRequest from the stream and
// hands it to the server's Watch method together with a typed send wrapper.
func _Internal_Watch_Handler(srv any, stream grpc.ServerStream) error {
	req := &WatchRequest{}
	err := stream.RecvMsg(req)
	if err != nil {
		return err
	}
	return srv.(InternalServer).Watch(req, &internalWatchServer{stream})
}
// Internal_WatchServer is the server-side handle for the Watch stream.
// It embeds grpc.ServerStream and adds a typed Send for WatchResponse messages.
type Internal_WatchServer interface {
// Send writes one WatchResponse to the stream.
Send(*WatchResponse) error
grpc.ServerStream
}
// internalWatchServer wraps a grpc.ServerStream so that it satisfies Internal_WatchServer.
// It is constructed with a positional literal, so the embedded field must stay first.
type internalWatchServer struct {
grpc.ServerStream
}
// Send forwards m to the underlying server stream and returns its error.
func (x *internalWatchServer) Send(m *WatchResponse) error {
	err := x.ServerStream.SendMsg(m)
	return err
}
// Internal_ServiceDesc is the grpc.ServiceDesc for Internal service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy).
//
// Methods lists the ten unary RPCs and Streams lists the single
// server-streaming Watch RPC. The client code indexes Streams[0] for Watch,
// so the Watch entry must remain the first element of Streams.
var Internal_ServiceDesc = grpc.ServiceDesc{
ServiceName: "zongzi.Internal",
HandlerType: (*InternalServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _Internal_Ping_Handler,
},
{
MethodName: "Probe",
Handler: _Internal_Probe_Handler,
},
{
MethodName: "Info",
Handler: _Internal_Info_Handler,
},
{
MethodName: "Members",
Handler: _Internal_Members_Handler,
},
{
MethodName: "Join",
Handler: _Internal_Join_Handler,
},
{
MethodName: "Add",
Handler: _Internal_Add_Handler,
},
{
MethodName: "Index",
Handler: _Internal_Index_Handler,
},
{
MethodName: "Apply",
Handler: _Internal_Apply_Handler,
},
{
MethodName: "Commit",
Handler: _Internal_Commit_Handler,
},
{
MethodName: "Read",
Handler: _Internal_Read_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Watch",
Handler: _Internal_Watch_Handler,
ServerStreams: true,
},
},
Metadata: "zongzi.proto",
}
package zongzi
import (
"context"
)
// ShardClient can be used to interact with a shard regardless of its placement in the cluster.
// Requests will be forwarded to the appropriate host based on ping.
type ShardClient interface {
// Apply forwards cmd to a host of the shard and returns the value and data it yields.
Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error)
// Commit forwards cmd to a host of the shard and returns only an error.
Commit(ctx context.Context, cmd []byte) (err error)
// Index issues a read index request against the shard's hosts.
Index(ctx context.Context) (err error)
// Leader returns the replica ID and term of the last known shard leader (zero values if unknown).
Leader() (uint64, uint64)
// Read forwards query to a host of the shard; stale is passed through to the host client.
Read(ctx context.Context, query []byte, stale bool) (value uint64, data []byte, err error)
// Watch forwards query to a host of the shard, handing it the results channel;
// stale is passed through to the host client.
Watch(ctx context.Context, query []byte, results chan<- *Result, stale bool) (err error)
}
// client is the ShardClient implementation. It resolves hosts for a single
// shard through the clientManager's maps on every call.
type client struct {
// manager owns the per-shard host client lists and leader map.
manager *clientManager
// shardID identifies the shard this client targets.
shardID uint64
// retries is set by WithRetries; it is not read by the methods defined alongside this type.
retries int
// writeToLeader routes Apply/Commit (and non-stale Read/Watch) to the known leader when set.
writeToLeader bool
}
// Compile-time check that *client satisfies ShardClient.
var _ ShardClient = new(client)
// newClient builds a client for shardID and applies opts in order.
// The first option that fails stops processing; its error is returned along
// with the partially configured client.
func newClient(manager *clientManager, shardID uint64, opts ...ClientOption) (c *client, err error) {
	c = &client{manager: manager, shardID: shardID}
	for i := range opts {
		err = opts[i](c)
		if err != nil {
			return
		}
	}
	return
}
// Index issues a read index request for the shard, trying each member host
// in list order until one succeeds.
//
// It returns ErrShardNotReady when the manager has no member list for the
// shard, or when that list is empty; an empty list previously produced a nil
// error even though no host had been contacted. Otherwise it returns nil on
// the first success or the error from the last host tried.
func (c *client) Index(ctx context.Context) (err error) {
	c.manager.mutex.RLock()
	list, ok := c.manager.clientMember[c.shardID]
	c.manager.mutex.RUnlock()
	if !ok {
		err = ErrShardNotReady
		return
	}
	el := list.Front()
	if el == nil {
		// No host to ask: report not-ready rather than a silent success.
		err = ErrShardNotReady
		return
	}
	for ; el != nil; el = el.Next() {
		if err = el.Value.ReadIndex(ctx, c.shardID); err == nil {
			break
		}
	}
	return
}
// Leader reports the replica ID and term recorded for the shard's leader in
// the manager's leader map. Both values are zero when no leader is recorded.
func (c *client) Leader() (replicaID, term uint64) {
	c.manager.mutex.RLock()
	defer c.manager.mutex.RUnlock()
	if known, found := c.manager.clientLeader[c.shardID]; found {
		return known.replicaID, known.term
	}
	return 0, 0
}
// Apply forwards cmd to the shard and returns the resulting value and data.
//
// When writeToLeader is set and a leader is recorded, the command goes to the
// leader's host only. Otherwise each member host is tried in list order until
// one succeeds.
//
// It returns ErrShardNotReady when the manager has no member list for the
// shard, or when that list is empty; an empty list previously returned
// (0, nil, nil) although the command had been sent nowhere.
func (c *client) Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error) {
	if c.writeToLeader {
		c.manager.mutex.RLock()
		leader, ok := c.manager.clientLeader[c.shardID]
		c.manager.mutex.RUnlock()
		if ok {
			return leader.client.Apply(ctx, c.shardID, cmd)
		}
	}
	c.manager.mutex.RLock()
	list, ok := c.manager.clientMember[c.shardID]
	c.manager.mutex.RUnlock()
	if !ok {
		err = ErrShardNotReady
		return
	}
	el := list.Front()
	if el == nil {
		// No host to send to: report not-ready rather than a silent success.
		err = ErrShardNotReady
		return
	}
	for ; el != nil; el = el.Next() {
		value, data, err = el.Value.Apply(ctx, c.shardID, cmd)
		if err == nil {
			break
		}
	}
	return
}
// Commit forwards cmd to the shard and returns only an error.
//
// When writeToLeader is set and a leader is recorded, the command goes to the
// leader's host only. Otherwise each member host is tried in list order until
// one succeeds.
//
// It returns ErrShardNotReady when the manager has no member list for the
// shard, or when that list is empty; an empty list previously returned nil
// although the command had been sent nowhere.
func (c *client) Commit(ctx context.Context, cmd []byte) (err error) {
	if c.writeToLeader {
		c.manager.mutex.RLock()
		leader, ok := c.manager.clientLeader[c.shardID]
		c.manager.mutex.RUnlock()
		if ok {
			return leader.client.Commit(ctx, c.shardID, cmd)
		}
	}
	c.manager.mutex.RLock()
	list, ok := c.manager.clientMember[c.shardID]
	c.manager.mutex.RUnlock()
	if !ok {
		err = ErrShardNotReady
		return
	}
	el := list.Front()
	if el == nil {
		// No host to send to: report not-ready rather than a silent success.
		err = ErrShardNotReady
		return
	}
	for ; el != nil; el = el.Next() {
		if err = el.Value.Commit(ctx, c.shardID, cmd); err == nil {
			break
		}
	}
	return
}
// Read forwards query to the shard and returns the resulting value and data.
//
// For a stale read, member hosts are tried first in list order and the first
// success is returned. If that does not succeed (or stale is false), the read
// goes to the recorded leader when writeToLeader is set, and otherwise to each
// member host in list order.
//
// It returns ErrShardNotReady when the manager has no member list for the
// shard. When the member list is empty and no earlier attempt produced an
// error, it also returns ErrShardNotReady; this case previously returned
// (0, nil, nil) although no host had been queried.
func (c *client) Read(ctx context.Context, query []byte, stale bool) (value uint64, data []byte, err error) {
	var run bool
	if stale {
		// NOTE(review): this iterates clientMember, the same list used by the
		// fallback below; clientReplica (non-voting replicas) is never
		// consulted here. Confirm whether stale reads were meant to prefer
		// clientReplica.
		c.manager.mutex.RLock()
		list, ok := c.manager.clientMember[c.shardID]
		c.manager.mutex.RUnlock()
		if !ok {
			err = ErrShardNotReady
			return
		}
		for el := list.Front(); el != nil; el = el.Next() {
			run = true
			value, data, err = el.Value.Read(ctx, c.shardID, query, stale)
			if err == nil {
				break
			}
		}
		if run && err == nil {
			return
		}
	}
	if c.writeToLeader {
		c.manager.mutex.RLock()
		leader, ok := c.manager.clientLeader[c.shardID]
		c.manager.mutex.RUnlock()
		if ok {
			return leader.client.Read(ctx, c.shardID, query, stale)
		}
	}
	c.manager.mutex.RLock()
	list, ok := c.manager.clientMember[c.shardID]
	c.manager.mutex.RUnlock()
	if !ok {
		err = ErrShardNotReady
		return
	}
	el := list.Front()
	if el == nil {
		// Nothing to query. Keep an error from the stale attempt if there is
		// one, otherwise report not-ready rather than a silent success.
		if err == nil {
			err = ErrShardNotReady
		}
		return
	}
	for ; el != nil; el = el.Next() {
		value, data, err = el.Value.Read(ctx, c.shardID, query, stale)
		if err == nil {
			break
		}
	}
	return
}
// Watch forwards query to the shard, passing the results channel to the host
// client that serves it.
//
// For a stale watch, member hosts are tried first in list order and the first
// success is returned. If that does not succeed (or stale is false), the watch
// goes to the recorded leader when writeToLeader is set, and otherwise to each
// member host in list order.
//
// It returns ErrShardNotReady when the manager has no member list for the
// shard. When the member list is empty and no earlier attempt produced an
// error, it also returns ErrShardNotReady; this case previously returned nil
// although no host had been asked to watch.
func (c *client) Watch(ctx context.Context, query []byte, results chan<- *Result, stale bool) (err error) {
	var run bool
	if stale {
		// NOTE(review): this iterates clientMember, the same list used by the
		// fallback below; clientReplica (non-voting replicas) is never
		// consulted here. Confirm whether stale watches were meant to prefer
		// clientReplica.
		c.manager.mutex.RLock()
		list, ok := c.manager.clientMember[c.shardID]
		c.manager.mutex.RUnlock()
		if !ok {
			err = ErrShardNotReady
			return
		}
		for el := list.Front(); el != nil; el = el.Next() {
			run = true
			err = el.Value.Watch(ctx, c.shardID, query, results, stale)
			if err == nil {
				break
			}
		}
		if run && err == nil {
			return
		}
	}
	if c.writeToLeader {
		c.manager.mutex.RLock()
		leader, ok := c.manager.clientLeader[c.shardID]
		c.manager.mutex.RUnlock()
		if ok {
			return leader.client.Watch(ctx, c.shardID, query, results, stale)
		}
	}
	c.manager.mutex.RLock()
	list, ok := c.manager.clientMember[c.shardID]
	c.manager.mutex.RUnlock()
	if !ok {
		err = ErrShardNotReady
		return
	}
	el := list.Front()
	if el == nil {
		// Nothing to watch on. Keep an error from the stale attempt if there
		// is one, otherwise report not-ready rather than a silent success.
		if err == nil {
			err = ErrShardNotReady
		}
		return
	}
	for ; el != nil; el = el.Next() {
		err = el.Value.Watch(ctx, c.shardID, query, results, stale)
		if err == nil {
			break
		}
	}
	return
}
package zongzi
import (
"cmp"
"context"
"slices"
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/elliotchance/orderedmap/v2"
)
// The clientManager maintains, for every shard, the host clients that shard
// clients use to reach it. On each tick it re-reads shards updated since the
// last processed index, pings their hosts and rebuilds the per-shard lists
// ordered by ping.
type clientManager struct {
agent *Agent
clock clock.Clock
// ctx and ctxCancel are created by Start and cancelled by Stop.
ctx context.Context
ctxCancel context.CancelFunc
// clientHost caches one host client per host ID.
clientHost map[string]hostClient
// clientLeader maps shard ID to the host client of the shard's recorded leader.
clientLeader map[uint64]hostClientLeader
// clientMember maps shard ID to host clients of voting replicas, ordered by ping.
clientMember map[uint64]*orderedmap.OrderedMap[int64, hostClient]
// clientReplica maps shard ID to host clients of non-voting replicas, ordered by ping.
clientReplica map[uint64]*orderedmap.OrderedMap[int64, hostClient]
// index is the Updated value of the last shard processed by tick; Stop resets it to 0.
index uint64
log Logger
// mutex guards the clientLeader, clientMember and clientReplica maps.
mutex sync.RWMutex
shardController Controller
// wg tracks the ticker goroutine started by Start.
wg sync.WaitGroup
}
// hostClientLeader records the host client, replica ID and term of a shard's leader.
// It is constructed with a positional literal, so the field order is significant.
type hostClientLeader struct {
client hostClient
replicaID uint64
term uint64
}
// newClientManager returns a clientManager bound to agent, using the agent's
// logger, a real clock and empty lookup maps. It does not start the ticker.
func newClientManager(agent *Agent) *clientManager {
	manager := &clientManager{
		agent:         agent,
		log:           agent.log,
		clock:         clock.New(),
		clientHost:    map[string]hostClient{},
		clientLeader:  map[uint64]hostClientLeader{},
		clientMember:  map[uint64]*orderedmap.OrderedMap[int64, hostClient]{},
		clientReplica: map[uint64]*orderedmap.OrderedMap[int64, hostClient]{},
	}
	return manager
}
// Start creates the manager's context and launches a goroutine that calls tick
// every 500ms until the context is cancelled. It always returns nil.
func (c *clientManager) Start() (err error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.ctx, c.ctxCancel = context.WithCancel(context.Background())
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		ticker := c.clock.Ticker(500 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				c.tick()
			case <-c.ctx.Done():
				c.log.Infof("Shard client manager stopped")
				return
			}
		}
	}()
	return nil
}
// hostClientPing pairs a host client with its measured ping in nanoseconds.
// It is constructed with a positional literal, so the field order is significant.
type hostClientPing struct {
ping int64
client hostClient
}
// tick refreshes the per-shard host client lists.
//
// For every shard updated after c.index it walks the shard's replicas, pings
// each replica's host at most once per tick (1s timeout), and rebuilds the
// member (voting) and replica (non-voting) lists ordered by ping. Replicas
// whose host cannot be pinged are logged and left out. The shard's leader
// entry is set when the leader's host was reached and removed otherwise.
// c.index advances only when the state read succeeded and at least one shard
// was visited.
func (c *clientManager) tick() {
	var err error
	var index uint64
	var start = c.clock.Now()
	var shardCount int
	var replicaCount int
	var pings = map[string]time.Duration{}
	err = c.agent.StateLocal(func(state *State) {
		state.ShardIterateUpdatedAfter(c.index, func(shard Shard) bool {
			shardCount++
			index = shard.Updated
			var leaderClient hostClient
			members := []hostClientPing{}
			replicas := []hostClientPing{}
			state.ReplicaIterateByShardID(shard.ID, func(replica Replica) bool {
				replicaCount++
				client, ok := c.clientHost[replica.HostID]
				if !ok {
					client = c.agent.hostClient(replica.HostID)
					c.clientHost[replica.HostID] = client
				}
				ping, ok := pings[replica.HostID]
				if !ok {
					ctx, cancel := context.WithTimeout(c.ctx, time.Second)
					defer cancel()
					ping, err = client.Ping(ctx)
					if err != nil {
						c.log.Warningf(`Unable to ping host in shard client manager: %s`, err.Error())
						return true
					}
					pings[replica.HostID] = ping
				}
				if replica.IsNonVoting {
					replicas = append(replicas, hostClientPing{ping.Nanoseconds(), client})
				} else {
					members = append(members, hostClientPing{ping.Nanoseconds(), client})
				}
				if shard.Leader == replica.ID {
					leaderClient = client
				}
				return true
			})
			newMembers := orderedClientsByPing(members)
			newReplicas := orderedClientsByPing(replicas)
			c.mutex.Lock()
			if leaderClient.agent != nil {
				c.clientLeader[shard.ID] = hostClientLeader{leaderClient, shard.Leader, shard.Term}
			} else {
				delete(c.clientLeader, shard.ID)
			}
			c.clientMember[shard.ID] = newMembers
			c.clientReplica[shard.ID] = newReplicas
			c.mutex.Unlock()
			return true
		})
	})
	if err == nil && shardCount > 0 {
		c.log.Infof("%s Shard client manager updated. hosts: %d shards: %d replicas: %d leaders: %d time: %vms",
			c.agent.hostID(),
			len(pings),
			shardCount,
			replicaCount,
			len(c.clientLeader),
			float64(c.clock.Since(start)/time.Microsecond)/1000)
		c.index = index
	}
}

// orderedClientsByPing sorts items by ascending ping and returns them as an
// ordered map keyed by ping. When two hosts report the same ping the later
// key is bumped until it is free, so an equal measurement no longer
// overwrites (and silently drops) a host. Keys therefore only approximate the
// ping; iteration order is what carries the ranking.
func orderedClientsByPing(items []hostClientPing) *orderedmap.OrderedMap[int64, hostClient] {
	slices.SortFunc(items, byPingAsc)
	list := orderedmap.NewOrderedMap[int64, hostClient]()
	for _, item := range items {
		key := item.ping
		for {
			if _, taken := list.Get(key); !taken {
				break
			}
			key++
		}
		list.Set(key, item.client)
	}
	return list
}
// byPingAsc is a comparison function that orders hostClientPing values by
// ascending ping.
func byPingAsc(a, b hostClientPing) int {
	left, right := a.ping, b.ping
	return cmp.Compare(left, right)
}
// Stop cancels the manager's context (if Start created one), waits for the
// ticker goroutine to exit and resets the processed index to zero. A
// completion message is logged on return.
func (c *clientManager) Stop() {
	defer c.log.Infof(`Stopped clientManager`)
	if cancel := c.ctxCancel; cancel != nil {
		cancel()
	}
	c.wg.Wait()
	c.mutex.Lock()
	c.index = 0
	c.mutex.Unlock()
}
package zongzi
// ClientOption configures a shard client at construction time. A non-nil
// error aborts client creation.
type ClientOption func(*client) error
// WithRetries returns a ClientOption that stores retries on the client.
func WithRetries(retries int) ClientOption {
	return func(target *client) error {
		target.retries = retries
		return nil
	}
}
// WithWriteToLeader returns a ClientOption that sets the client's
// writeToLeader flag.
func WithWriteToLeader() ClientOption {
	return func(target *client) error {
		target.writeToLeader = true
		return nil
	}
}
package zongzi
import (
"fmt"
"strconv"
"strings"
)
// The default shard controller is a basic controller that creates and destroys
// replicas based on a shard's placement tags.
type shardControllerDefault struct {
agent *Agent
log Logger
}
// newShardControllerDefault returns a default shard controller that uses the
// agent's logger.
func newShardControllerDefault(agent *Agent) *shardControllerDefault {
	controller := &shardControllerDefault{
		agent: agent,
		log:   agent.log,
	}
	return controller
}
// Reconcile compares the replicas of shard against the placement described by
// the shard's `placement:*` tags and issues at most a handful of corrective
// actions through controls.
//
// Recognised tags:
//   - placement:vary            `;`-separated host tag keys replicas should vary over
//   - placement:member          `<n>;<filter>;...` desired voting replicas
//   - placement:replica:<group> `<n>;<filter>;...` desired non-voting replicas
//   - placement:cover           `;`-separated tags; one replica per matching host
//
// Behaviour: replicas that match no group are undesired; the first undesired
// replica is deleted and the function returns. Otherwise missing replicas are
// created group by group (member first) on unoccupied matching hosts. Errors
// from controls.Create and controls.Delete are returned; placement failures
// are logged per group and swallowed. Rebalancing and removal of excess
// replicas are not implemented.
//
// Changes from the previous revision:
//   - placement:vary is split on `;`, matching what WithPlacementVary writes.
//     Previously a multi-key value was treated as one tag key that no host
//     carries, so every replica was classed as undesired.
//   - the remaining-host counter (varyMatch) is decremented under the same
//     `tag=value` key it is populated and read with.
func (c *shardControllerDefault) Reconcile(state *State, shard Shard, controls Controls) (err error) {
	c.log.Debugf("Reconciling Shard %d", shard.ID)
	var (
		desired       = map[string]int{}
		filters       = map[string][]string{}
		found         = map[string]int{}
		matches       = map[string][]Host{}
		occupiedHosts = map[string]bool{}
		undesired     = []uint64{}
		vary          = map[string]bool{}
		varyCount     = map[string]map[string]int{}
		varyMatch     = map[string]map[string]int{}
	)
	// Resolve desired state from shard tags
	for tagKey, tagValue := range shard.Tags {
		if !strings.HasPrefix(tagKey, "placement:") {
			continue
		}
		if tagKey == `placement:vary` {
			// ex: placement:vary=geo:zone;geo:region
			for _, varyKey := range strings.Split(tagValue, ";") {
				if len(varyKey) == 0 {
					// An empty key can never be present on a host.
					continue
				}
				vary[varyKey] = true
			}
		} else if tagKey == `placement:member` {
			// ex: placement:member=3;geo:region=us-central1
			parts := strings.Split(tagValue, ";")
			i, err := strconv.Atoi(parts[0])
			if err != nil {
				c.log.Warningf(`Invalid tag placement:member %s %s`, tagKey, err.Error())
				continue
			}
			desired[`member`] = i
			filters[`member`] = parts[1:]
		} else if strings.HasPrefix(tagKey, `placement:replica:`) {
			// ex: placement:replica:read=6;host:class=storage-replica
			group := tagKey[len(`placement:replica:`):]
			if len(group) == 0 {
				c.log.Warningf(`Invalid tag placement:replica - "%s"`, tagKey)
				continue
			}
			if group == `member` {
				c.log.Warningf(`Invalid tag placement:replica - group name "member" is reserved.`)
				continue
			}
			parts := strings.Split(tagValue, ";")
			i, err := strconv.Atoi(parts[0])
			if err != nil {
				c.log.Warningf(`Invalid tag placement:replica %s %s`, tagKey, err.Error())
				continue
			}
			desired[group] = i
			filters[group] = parts[1:]
		} else if tagKey == `placement:cover` {
			// ex: placement:cover=host:class=compute
			for _, t := range strings.Split(tagValue, ";") {
				k, v := c.parseTag(t)
				state.HostIterateByTag(k, func(h Host) bool {
					if v == "" || h.Tags[k] == v {
						desired[`cover`]++
					}
					return true
				})
				filters[`cover`] = append(filters[`cover`], t)
			}
		}
	}
	// Classify existing replicas: count them per group and per vary tag value,
	// and collect those that match no group.
	var varies bool
	var groups []string
	state.ReplicaIterateByShardID(shard.ID, func(replica Replica) bool {
		host, _ := state.Host(replica.HostID)
		groups = groups[:0]
		for group := range desired {
			if c.matchTagFilter(host.Tags, filters[group]) {
				varies = true
				for tag := range vary {
					if varyCount[group] == nil {
						varyCount[group] = map[string]int{}
					}
					v, ok := host.Tags[tag]
					if !ok {
						// Host lacks a vary tag, so it cannot count towards this group.
						varies = false
						break
					}
					varyCount[group][fmt.Sprintf(`%s=%s`, tag, v)]++
				}
				if varies {
					groups = append(groups, group)
					found[group]++
				}
			}
			if len(groups) > 1 {
				c.log.Infof(`Replica matched multiple groups [%05d:%05d]: %v`, shard.ID, replica.ID, groups)
			}
		}
		if len(groups) == 0 {
			c.log.Debugf(`[%05d:%05d] Undesired \n%#v\n%#v`, shard.ID, replica.ID, shard.Tags, host.Tags)
			undesired = append(undesired, replica.ID)
		} else {
			occupiedHosts[replica.HostID] = true
		}
		return true
	})
	var excessReplicaCount = map[string]int{}
	var missingReplicaCount = map[string]int{}
	groups = groups[:0]
	for group, n := range desired {
		if found[group] > n {
			excessReplicaCount[group] = found[group] - n
		}
		if found[group] < n {
			missingReplicaCount[group] = n - found[group]
		}
		if group == `member` {
			// Always process the member group first
			groups = append([]string{group}, groups...)
		} else {
			groups = append(groups, group)
		}
	}
	// A group needs rebalancing when its vary tag value counts differ by more than one.
	var requiresRebalance = map[string]bool{}
	for group, tags := range varyCount {
		var min int
		var max int
		for _, n := range tags {
			if min == 0 || n < min {
				min = n
			}
			if max == 0 || n > max {
				max = n
			}
			if min-max < -1 || min-max > 1 {
				requiresRebalance[group] = true
			}
		}
	}
	// Early exit
	if len(missingReplicaCount) == 0 &&
		len(excessReplicaCount) == 0 &&
		len(requiresRebalance) == 0 &&
		len(undesired) == 0 {
		return
	}
	// Delete undesired replicas
	for _, replicaID := range undesired {
		if err = controls.Delete(replicaID); err != nil {
			c.log.Errorf(`Error deleting replica: %s`, err.Error())
			return
		}
		// Early exit just simplifies the logic
		return
	}
	// Find matching hosts for each group
	state.HostIterate(func(host Host) bool {
		if _, ok := occupiedHosts[host.ID]; ok {
			return true
		}
		for group, n := range desired {
			if found[group] == n {
				continue
			}
			if c.matchTagFilter(host.Tags, filters[group]) {
				matches[group] = append(matches[group], host)
				for tag := range vary {
					v, _ := host.Tags[tag]
					if _, ok := varyMatch[group]; !ok {
						varyMatch[group] = map[string]int{}
					}
					varyMatch[group][fmt.Sprintf(`%s=%s`, tag, v)]++
				}
			}
		}
		return true
	})
	// TODO - Rebalance (maybe belongs in its own controller)
	// for group := range requiresRebalance {}
	// TODO - Remove excess replicas (deciding which to remove while retaining balance)
	// for group, n := range excessReplicaCount {}
	// Add missing replicas
	for _, group := range groups {
		var n = missingReplicaCount[group]
		for range n {
			if len(matches[group]) == 0 {
				err = fmt.Errorf(`No more matching hosts`)
				break
			}
			// Find the vary tag values with the fewest replicas
			var varyTagValues = map[string]string{}
			var varyTagCounts = map[string]int{}
			for tag, replicaCount := range varyCount[group] {
				if varyMatch[group][tag] == 0 {
					// Don't even try this vary tag because it has no remaining hosts.
					continue
				}
				k, v := c.parseTag(tag)
				// NOTE(review): this keeps the LARGEST count per key although the
				// comment above asks for the fewest. It is left unchanged because
				// the length check below only passes when each key has a single
				// candidate value. Confirm the intended direction.
				if varyTagCounts[k] == 0 || varyTagCounts[k] < replicaCount {
					varyTagValues[k] = v
					varyTagCounts[k] = replicaCount
				}
			}
			// NOTE(review): this compares distinct tag keys against distinct
			// `tag=value` entries, so it fails as soon as replicas span two
			// values of the same vary key. Verify against the intended policy.
			if len(varyTagValues) < len(varyCount[group]) {
				err = fmt.Errorf(`Failed to find an available host matching vary criteria`)
				break
			}
			// TODO - Ensure a host is available with the full tag set rather than each individually to avoid pathological edge case w/ multiple vary tags.
			// Build vary tag set with fewest replicas
			var varyTags []string
			for tagKey, replicaCount := range varyTagCounts {
				if replicaCount > 0 && group == `member` {
					// The smallest vary tag value has a member already. If this group is the member group then
					// we give up because we never schedule members in a way that violates the vary policy.
					// NOTE(review): this break only leaves the inner loop and err is
					// overwritten by controls.Create below, so placement still
					// proceeds. Confirm before tightening.
					err = fmt.Errorf(`Unable to find host that satisfies member vary policy for shard`)
					break
				}
				varyTags = append(varyTags, fmt.Sprintf(`%s=%s`, tagKey, varyTagValues[tagKey]))
			}
			var success bool
			var replicaID uint64
			for j, host := range matches[group] {
				if _, ok := occupiedHosts[host.ID]; ok {
					// Host already occupied
					continue
				}
				if len(varyTags) == 0 || c.matchTagFilter(host.Tags, varyTags) {
					replicaID, err = controls.Create(host.ID, shard.ID, group != `member`)
					if err != nil {
						return
					}
					for _, varyTag := range varyTags {
						// Both maps are keyed by the full `tag=value` string.
						varyMatch[group][varyTag]--
						varyCount[group][varyTag]++
					}
					c.log.Infof(`[%05d:%05d] Created replica for %s`, shard.ID, replicaID, shard.Name)
					matches[group] = append(matches[group][:j], matches[group][j+1:]...)
					occupiedHosts[host.ID] = true
					success = true
					break
				}
			}
			if !success {
				err = fmt.Errorf(`Unable to find host with matching vary tag set %+v`, varyTags)
				break
			}
		}
		if err != nil {
			// Placement failures are reported per group but do not fail reconciliation.
			c.log.Warningf("[%05d] (%s/%s) %s", shard.ID, shard.Name, group, err.Error())
			err = nil
		}
	}
	return
}
// parseTag splits tag at its first "=" into key and value. When there is no
// "=" or the tag starts with "=", the whole tag is returned as the key with
// an empty value.
func (c *shardControllerDefault) parseTag(tag string) (k, v string) {
	sep := strings.Index(tag, "=")
	if sep >= 1 {
		return tag[:sep], tag[sep+1:]
	}
	return tag, ""
}
// matchTagFilter reports whether src satisfies every filter in tags.
//
// Each filter is either `key=value`, which requires src[key] to equal value,
// or a bare `key`, which only requires the key to be present. Empty filters
// are ignored, so an empty or nil tags slice always matches.
//
// The bare-key form previously required the host's value to be the empty
// string. That disagreed with the placement:cover counting logic, which
// treats a filter without a value as "any value", so key-only cover filters
// could never match a host.
func (c *shardControllerDefault) matchTagFilter(src map[string]string, tags []string) bool {
	for _, tag := range tags {
		if len(tag) == 0 {
			continue
		}
		k, v := c.parseTag(tag)
		got, ok := src[k]
		if !ok {
			// Tag key not present
			return false
		}
		if v != "" && got != v {
			// Tag value does not match
			return false
		}
	}
	return true
}
package zongzi
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/benbjohnson/clock"
)
// The controllerManager drives shard reconciliation. While this host leads
// shard zero, each tick passes shards updated since the last processed index
// to the configured Controller, which creates and destroys replicas based on
// shard tags.
type controllerManager struct {
agent *Agent
clock clock.Clock
// ctx and ctxCancel are created by Start and cancelled by Stop.
ctx context.Context
ctxCancel context.CancelFunc
// index is the last state index fully processed without error; Stop resets it to 0.
index uint64
// isLeader is updated by LeaderUpdated for shard zero and gates reconciliation in tick.
isLeader atomic.Bool
log Logger
// mutex serializes tick and guards index.
mutex sync.RWMutex
controller Controller
// wg tracks the ticker goroutine started by Start.
wg sync.WaitGroup
}
// newControllerManager returns a controllerManager bound to agent, using the
// agent's logger, a real clock and the default shard controller. It does not
// start the ticker.
func newControllerManager(agent *Agent) *controllerManager {
	manager := &controllerManager{
		agent:      agent,
		log:        agent.log,
		clock:      clock.New(),
		controller: newShardControllerDefault(agent),
	}
	return manager
}
// Controller reconciles a single shard. It receives the current state, the
// shard to reconcile and a Controls handle for creating and deleting replicas.
type Controller interface {
Reconcile(*State, Shard, Controls) error
}
// Start creates the manager's context and launches a goroutine that calls tick
// every 500ms until the context is cancelled. It always returns nil.
func (c *controllerManager) Start() (err error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.ctx, c.ctxCancel = context.WithCancel(context.Background())
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		ticker := c.clock.Ticker(500 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				c.tick()
			case <-c.ctx.Done():
				c.log.Infof("Shard controller manager stopped")
				return
			}
		}
	}()
	return nil
}
// tick runs one reconciliation round while holding the manager's mutex.
//
// When this host is the recorded leader it repeatedly reads the state and
// reconciles every shard (except shard 0) updated after c.index. A pass stops
// at the first shard whose reconciliation changed something, so the next pass
// sees a fresh snapshot; passes repeat until one completes without changes.
// Reconcile errors are logged and written to the shard's
// `zongzi:controller:error` tag, which is removed again on success. c.index
// advances only if no error occurred during the round.
//
// controls.updated is reset at the start of every pass. It used to be reset
// only immediately before a Reconcile call, so a pass that ended before
// reaching one (context cancelled, or only shard 0 visited) re-read the stale
// `true` from the previous pass and looped again while holding the mutex.
func (c *controllerManager) tick() {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	var err error
	var hadErr bool
	var index uint64
	var updated = true
	var controls = newControls(c.agent)
	if c.isLeader.Load() {
		for updated {
			updated = false
			controls.updated = false
			err = c.agent.State(c.ctx, func(state *State) {
				index = state.Index()
				state.ShardIterateUpdatedAfter(c.index, func(shard Shard) bool {
					select {
					case <-c.ctx.Done():
						return false
					default:
					}
					if shard.ID == 0 {
						return true
					}
					controls.updated = false
					err = c.controller.Reconcile(state, shard, controls)
					if err != nil {
						hadErr = true
						c.log.Warningf("Error resolving shard %d %s %s", shard.ID, shard.Name, err.Error())
						c.agent.tagsSet(shard, fmt.Sprintf(`zongzi:controller:error=%s`, err.Error()))
					} else if _, ok := shard.Tags[`zongzi:controller:error`]; ok {
						c.agent.tagsRemove(shard, `zongzi:controller:error`)
					}
					if controls.updated {
						// We break the iterator on update in order to catch a fresh snapshot for the next shard.
						// This ensures that changes applied during reconciliation of this shard will be visible to
						// reconciliation of the next shard.
						return false
					}
					return true
				})
				updated = controls.updated
			})
			if err != nil {
				hadErr = true
			}
		}
	}
	if !hadErr && index > c.index {
		c.log.Debugf("%s Finished processing %d", c.agent.hostID(), index)
		// c.agent.dumpState()
		c.index = index
	}
}
// LeaderUpdated handles a leader change notification. For shard zero it
// records whether this replica is now the leader. When this host is the
// recorded leader, the agent is ready and the notification names a leader,
// the new leader and term are written to the shard via the agent.
func (c *controllerManager) LeaderUpdated(info LeaderInfo) {
	c.log.Infof("[%05d:%05d] LeaderUpdated: %05d (term %d)", info.ShardID, info.ReplicaID, info.LeaderID, info.Term)
	if info.ShardID == 0 {
		c.isLeader.Store(info.LeaderID == info.ReplicaID)
	}
	if !c.isLeader.Load() || c.agent.Status() != AgentStatus_Ready || info.LeaderID == 0 {
		return
	}
	c.agent.shardLeaderSet(info.ShardID, info.LeaderID, info.Term)
	c.log.Warningf("[%05d:%05d] Leader Set: %d (term %d)", info.ShardID, info.ReplicaID, info.LeaderID, info.Term)
}
// Stop cancels the manager's context (if Start created one), waits for the
// ticker goroutine to exit and resets the processed index to zero. A
// completion message is logged on return.
//
// The wait was previously missing, so the goroutine tracked by wg could still
// be inside tick after Stop returned. Waiting happens before the mutex is
// taken because tick holds that mutex while it runs; this is the same order
// clientManager.Stop uses.
func (c *controllerManager) Stop() {
	defer c.log.Infof(`Stopped controllerManager`)
	if c.ctxCancel != nil {
		c.ctxCancel()
	}
	c.wg.Wait()
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.index = 0
}
package zongzi
// Controls is the set of actions a Controller may take while reconciling a shard.
type Controls interface {
// Create adds a replica of shardID on hostID and returns the new replica's ID.
Create(hostID string, shardID uint64, isNonVoting bool) (id uint64, err error)
// Delete removes the replica with the given ID.
Delete(replicaID uint64) error
}
// newControls returns a controls handle for agent a with its updated flag cleared.
func newControls(a *Agent) *controls {
	sc := new(controls)
	sc.agent = a
	return sc
}
// controls implements Controls on top of an Agent and remembers whether any
// action succeeded. It is constructed with a positional literal, so the field
// order is significant.
type controls struct {
agent *Agent
// updated is set to true after a successful Create or Delete.
updated bool
}
// Create asks the agent to create a replica of shardID on hostID and returns
// the new replica ID. The updated flag is set only when creation succeeds.
func (sc *controls) Create(hostID string, shardID uint64, isNonVoting bool) (id uint64, err error) {
	id, err = sc.agent.replicaCreate(hostID, shardID, isNonVoting)
	if err != nil {
		return
	}
	sc.updated = true
	return
}
// Delete asks the agent to delete the replica with the given ID. The updated
// flag is set only when deletion succeeds.
func (sc *controls) Delete(replicaID uint64) (err error) {
	if err = sc.agent.replicaDelete(replicaID); err != nil {
		return
	}
	sc.updated = true
	return
}
package zongzi
import (
"fmt"
"strings"
)
// ShardOption mutates a Shard definition, typically its name or tags.
type ShardOption func(*Shard) error
// WithName returns a ShardOption that sets the shard's name.
func WithName(name string) ShardOption {
	return func(shard *Shard) error {
		shard.Name = name
		return nil
	}
}
// WithPlacementMembers returns a ShardOption that writes the
// `placement:member` tag as "<n>;<tag>;<tag>...". With no tags the value is
// "<n>;".
func WithPlacementMembers(n int, tags ...string) ShardOption {
	return func(shard *Shard) error {
		filter := strings.Join(tags, ";")
		shard.Tags[`placement:member`] = fmt.Sprintf(`%d;%s`, n, filter)
		return nil
	}
}
// WithPlacementReplicas returns a ShardOption that writes the
// `placement:replica:<group>` tag as "<n>;<tag>;<tag>...". With no tags the
// value is "<n>;".
func WithPlacementReplicas(group string, n int, tags ...string) ShardOption {
	return func(shard *Shard) error {
		key := `placement:replica:` + group
		filter := strings.Join(tags, ";")
		shard.Tags[key] = fmt.Sprintf(`%d;%s`, n, filter)
		return nil
	}
}
// WithPlacementVary returns a ShardOption that writes the `placement:vary`
// tag as the given tag keys joined with ";".
func WithPlacementVary(tagKeys ...string) ShardOption {
	return func(shard *Shard) error {
		joined := strings.Join(tagKeys, ";")
		shard.Tags[`placement:vary`] = joined
		return nil
	}
}
// WithPlacementCover returns a ShardOption that writes the `placement:cover`
// tag as the given tag keys joined with ";".
func WithPlacementCover(tagKeys ...string) ShardOption {
	return func(shard *Shard) error {
		joined := strings.Join(tagKeys, ";")
		shard.Tags[`placement:cover`] = joined
		return nil
	}
}
// WithTag returns a ShardOption that sets tag k to the default (%v)
// formatting of v.
func WithTag(k string, v any) ShardOption {
	return func(shard *Shard) error {
		formatted := fmt.Sprintf(`%v`, v)
		shard.Tags[k] = formatted
		return nil
	}
}
package zongzi
import (
"context"
"fmt"
"io"
"github.com/lni/dragonboat/v4/statemachine"
)
// StateMachineFactory is a function that returns a StateMachine for the given
// shard and replica IDs.
type StateMachineFactory = func(shardID uint64, replicaID uint64) StateMachine
// stateMachineFactoryShim adapts a StateMachineFactory to dragonboat's
// concurrent state machine constructor by wrapping each created StateMachine
// in a stateMachineShim.
func stateMachineFactoryShim(fn StateMachineFactory) statemachine.CreateConcurrentStateMachineFunc {
	return func(shardID uint64, replicaID uint64) statemachine.IConcurrentStateMachine {
		sm := fn(shardID, replicaID)
		return &stateMachineShim{sm}
	}
}
// StateMachine is a deterministic finite state machine. Snapshots are requested during log compaction to ensure that
// the in-memory state can be recovered following a restart. If you expect a dataset larger than memory, a persistent
// state machine may be more appropriate.
//
// Query and Watch (both reached through the shim's Lookup) may be called concurrently with Update and SaveSnapshot.
// It is the caller's responsibility to ensure that snapshots are generated using snapshot isolation. This can be
// achieved using Multi Version Concurrency Control (MVCC). A simple mutex can also be used if blocking writes during
// read is acceptable.
type StateMachine interface {
// Update applies entries and returns the resulting entries.
Update(entries []Entry) []Entry
// Query answers a single query.
Query(ctx context.Context, query []byte) *Result
// Watch answers a query by delivering results on the result channel.
Watch(ctx context.Context, query []byte, result chan<- *Result)
// PrepareSnapshot returns a cursor that is later passed to SaveSnapshot.
PrepareSnapshot() (cursor any, err error)
// SaveSnapshot writes the state identified by cursor to w.
SaveSnapshot(cursor any, w io.Writer, c SnapshotFileCollection, close <-chan struct{}) error
// RecoverFromSnapshot restores state from r.
RecoverFromSnapshot(r io.Reader, f []SnapshotFile, close <-chan struct{}) error
// Close releases the state machine.
Close() error
}
// Compile-time check that stateMachineShim implements statemachine.IConcurrentStateMachine.
var _ statemachine.IConcurrentStateMachine = (*stateMachineShim)(nil)
// stateMachineShim wraps a StateMachine so that it can be used where a
// statemachine.IConcurrentStateMachine is expected.
type stateMachineShim struct {
// sm is the wrapped user-supplied state machine.
sm StateMachine
}
// Update forwards entries to the wrapped state machine. A panic raised by the
// wrapped Update is recovered and reported through err (formatted with `%v`),
// in which case responses is nil.
func (shim *stateMachineShim) Update(entries []Entry) (responses []Entry, err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		err = fmt.Errorf(`%v`, r)
	}()
	return shim.sm.Update(entries), nil
}
// Lookup dispatches on the dynamic type of query:
//   - *lookupQuery: calls Query on the wrapped state machine and returns its result.
//   - *watchQuery: calls Watch on the wrapped state machine; res stays nil.
//
// Any other query type is ignored and (nil, nil) is returned.
func (shim *stateMachineShim) Lookup(query any) (res any, err error) {
	switch q := query.(type) {
	case *lookupQuery:
		res = shim.sm.Query(q.ctx, q.data)
	case *watchQuery:
		shim.sm.Watch(q.ctx, q.data, q.result)
	}
	return
}
// PrepareSnapshot delegates to the wrapped state machine and returns its
// cursor and error unchanged.
func (shim *stateMachineShim) PrepareSnapshot() (cursor any, err error) {
	cursor, err = shim.sm.PrepareSnapshot()
	return
}
// SaveSnapshot delegates to the wrapped state machine, passing all arguments
// through unchanged.
func (shim *stateMachineShim) SaveSnapshot(cursor any, w io.Writer, c SnapshotFileCollection, close <-chan struct{}) error {
	err := shim.sm.SaveSnapshot(cursor, w, c, close)
	return err
}
// RecoverFromSnapshot delegates to the wrapped state machine, passing all
// arguments through unchanged.
func (shim *stateMachineShim) RecoverFromSnapshot(r io.Reader, f []SnapshotFile, close <-chan struct{}) error {
	err := shim.sm.RecoverFromSnapshot(r, f, close)
	return err
}
// Close closes the wrapped state machine and returns its error.
func (shim *stateMachineShim) Close() error {
	err := shim.sm.Close()
	return err
}
package zongzi
import (
"context"
"fmt"
"io"
"github.com/lni/dragonboat/v4/statemachine"
)
// StateMachinePersistentFactory is a function that returns a StateMachinePersistent
// for the given shard ID and replica ID.
type StateMachinePersistentFactory = func(shardID uint64, replicaID uint64) StateMachinePersistent
// stateMachinePersistentFactoryShim adapts a StateMachinePersistentFactory to
// the dragonboat on-disk factory signature. Every state machine produced by fn
// is wrapped in a stateMachinePersistentShim.
func stateMachinePersistentFactoryShim(fn StateMachinePersistentFactory) statemachine.CreateOnDiskStateMachineFunc {
	return func(shardID uint64, replicaID uint64) statemachine.IOnDiskStateMachine {
		inner := fn(shardID, replicaID)
		return &stateMachinePersistentShim{sm: inner}
	}
}
// StateMachinePersistent is a StateMachine where the state is persisted to a medium (such as disk) that can survive
// restart. During compaction, calls to Snapshot are replaced with calls to Sync which effectively flushes state to
// the persistent medium. SaveSnapshot and RecoverFromSnapshot are used to replicate full on-disk state to new replicas.
type StateMachinePersistent interface {
// Open prepares the state machine for use and returns an index.
// NOTE(review): presumably the index of the last applied entry — confirm against dragonboat's IOnDiskStateMachine.
Open(stopc <-chan struct{}) (index uint64, err error)
// Update receives a batch of entries and returns a batch of entries.
Update(entries []Entry) []Entry
// Query answers a read request described by query and returns a Result.
Query(ctx context.Context, query []byte) *Result
// Watch receives a query and a send-only channel on which Results are delivered.
Watch(ctx context.Context, query []byte, result chan<- *Result)
// PrepareSnapshot returns a cursor that is later handed to SaveSnapshot.
PrepareSnapshot() (cursor any, err error)
// SaveSnapshot writes the snapshot identified by cursor to w.
SaveSnapshot(cursor any, w io.Writer, close <-chan struct{}) error
// RecoverFromSnapshot restores state from r.
RecoverFromSnapshot(r io.Reader, close <-chan struct{}) error
// Sync flushes state to the persistent medium.
Sync() error
// Close releases the state machine.
Close() error
}
// Compile-time check that stateMachinePersistentShim implements statemachine.IOnDiskStateMachine.
var _ statemachine.IOnDiskStateMachine = (*stateMachinePersistentShim)(nil)
// stateMachinePersistentShim wraps a StateMachinePersistent so that it can be
// used where a statemachine.IOnDiskStateMachine is expected.
type stateMachinePersistentShim struct {
// sm is the wrapped user-supplied persistent state machine.
sm StateMachinePersistent
}
// Open delegates to the wrapped state machine and returns its index and error
// unchanged.
func (shim *stateMachinePersistentShim) Open(stopc <-chan struct{}) (index uint64, err error) {
	index, err = shim.sm.Open(stopc)
	return
}
// Update forwards entries to the wrapped state machine. A panic raised by the
// wrapped Update is recovered and reported through err (formatted with `%v`),
// in which case responses is nil.
func (shim *stateMachinePersistentShim) Update(entries []Entry) (responses []Entry, err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		err = fmt.Errorf(`%v`, r)
	}()
	return shim.sm.Update(entries), nil
}
// Lookup dispatches on the dynamic type of query:
//   - *lookupQuery: calls Query on the wrapped state machine and returns its result.
//   - *watchQuery: calls Watch on the wrapped state machine; res stays nil.
//
// Any other query type is ignored and (nil, nil) is returned.
func (shim *stateMachinePersistentShim) Lookup(query any) (res any, err error) {
	switch q := query.(type) {
	case *lookupQuery:
		res = shim.sm.Query(q.ctx, q.data)
	case *watchQuery:
		shim.sm.Watch(q.ctx, q.data, q.result)
	}
	return
}
// PrepareSnapshot delegates to the wrapped state machine and returns its
// cursor and error unchanged.
func (shim *stateMachinePersistentShim) PrepareSnapshot() (cursor any, err error) {
	cursor, err = shim.sm.PrepareSnapshot()
	return
}
// SaveSnapshot delegates to the wrapped state machine, passing all arguments
// through unchanged.
func (shim *stateMachinePersistentShim) SaveSnapshot(cursor any, w io.Writer, close <-chan struct{}) error {
	err := shim.sm.SaveSnapshot(cursor, w, close)
	return err
}
// RecoverFromSnapshot delegates to the wrapped state machine, passing all
// arguments through unchanged.
func (shim *stateMachinePersistentShim) RecoverFromSnapshot(r io.Reader, close <-chan struct{}) error {
	err := shim.sm.RecoverFromSnapshot(r, close)
	return err
}
// Sync delegates to the wrapped state machine's Sync and returns its error.
func (shim *stateMachinePersistentShim) Sync() error {
	err := shim.sm.Sync()
	return err
}
// Close closes the wrapped state machine and returns its error.
func (shim *stateMachinePersistentShim) Close() error {
	err := shim.sm.Close()
	return err
}
package zongzi
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/config"
"github.com/lni/dragonboat/v4/logger"
"github.com/lni/dragonboat/v4/plugin/tan"
"github.com/lni/dragonboat/v4/raftio"
"github.com/lni/dragonboat/v4/statemachine"
)
// force google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb to stay in go.mod
import _ "google.golang.org/genproto/protobuf/ptype"
// Internal package-level constants.
const (
// projectName is the name passed to logger.GetLogger for this package's logger.
projectName = "zongzi"
// projectUri is the project's zongzi:// URI.
projectUri = "zongzi://github.com/logbn/zongzi"
// minReplicas — NOTE(review): presumably the minimum replica count for a shard; usage is not visible here.
minReplicas = 3
// Timing constants; their call sites are not visible in this section.
raftTimeout = time.Second
joinTimeout = 5 * time.Second
waitPeriod = 500 * time.Millisecond
)
// Default loopback addresses and the ID assigned to the agent's replica config.
const (
// DefaultGossipAddress is the gossip address applied by NewAgent before user options.
DefaultGossipAddress = "127.0.0.1:17001"
// DefaultRaftAddress is the raft address used by DefaultHostConfig.
DefaultRaftAddress = "127.0.0.1:17002"
// DefaultApiAddress is the API address applied by NewAgent before user options.
DefaultApiAddress = "127.0.0.1:17003"
// ShardID is the shard ID that NewAgent assigns to its replica config.
ShardID = 0
)
// Default configurations. NewAgent applies both before any caller-supplied
// options, so options may override them.
var (
// DefaultHostConfig is the default HostConfig: data under /var/lib/zongzi,
// the default raft address, and the tan log DB factory.
DefaultHostConfig = HostConfig{
NodeHostDir: "/var/lib/zongzi/raft",
RaftAddress: DefaultRaftAddress,
RTTMillisecond: 10,
WALDir: "/var/lib/zongzi/wal",
Expert: config.ExpertConfig{
LogDBFactory: tan.Factory,
LogDB: config.GetMediumMemLogDBConfig(),
},
}
// DefaultReplicaConfig is the default ReplicaConfig. ElectionRTT and
// HeartbeatRTT are expressed in multiples of the host's RTTMillisecond
// (per dragonboat's config documentation).
DefaultReplicaConfig = ReplicaConfig{
CheckQuorum: true,
CompactionOverhead: 10000,
ElectionRTT: 100,
EntryCompressionType: config.Snappy,
HeartbeatRTT: 10,
OrderedConfigChange: true,
Quiesce: false,
SnapshotCompressionType: config.Snappy,
SnapshotEntries: 10000,
}
)
// Type aliases for dragonboat types used in this package's API, followed by
// the package's own string-based status types.
type (
nodeHostInfoOption = dragonboat.NodeHostInfoOption
HostConfig = config.NodeHostConfig
ReplicaConfig = config.Config
GossipConfig = config.GossipConfig
LogDBConfig = config.LogDBConfig
EngineConfig = config.EngineConfig
LeaderInfo = raftio.LeaderInfo
RaftEventListener = raftio.IRaftEventListener
SystemEventListener = raftio.ISystemEventListener
Entry = statemachine.Entry
Result = statemachine.Result
SnapshotFile = statemachine.SnapshotFile
SnapshotFileCollection = statemachine.ISnapshotFileCollection
LogLevel = logger.LogLevel
Logger = logger.ILogger
// AgentStatus, HostStatus, ShardStatus and ReplicaStatus are distinct string
// types; their values are the *Status_* constants.
AgentStatus string
HostStatus string
ShardStatus string
ReplicaStatus string
)
// Function values re-exported from dragonboat's logger and config packages.
var (
// GetLogger is logger.GetLogger.
GetLogger = logger.GetLogger
// GetDefaultEngineConfig is config.GetDefaultEngineConfig.
GetDefaultEngineConfig = config.GetDefaultEngineConfig
)
// Log levels re-exported from dragonboat's logger package.
const (
LogLevelCritical = logger.CRITICAL
LogLevelError = logger.ERROR
LogLevelWarning = logger.WARNING
LogLevelInfo = logger.INFO
LogLevelDebug = logger.DEBUG
)
// Status values for agents, hosts, shards and replicas.
const (
// Agent statuses. NewAgent starts an agent in AgentStatus_Pending.
AgentStatus_Initializing = AgentStatus("initializing")
AgentStatus_Joining = AgentStatus("joining")
AgentStatus_Pending = AgentStatus("pending")
AgentStatus_Ready = AgentStatus("ready")
AgentStatus_Rejoining = AgentStatus("rejoining")
AgentStatus_Stopped = AgentStatus("stopped")
// Host statuses.
HostStatus_Active = HostStatus("active")
HostStatus_Gone = HostStatus("gone")
HostStatus_Missing = HostStatus("missing")
HostStatus_New = HostStatus("new")
HostStatus_Recovering = HostStatus("recovering")
// Shard statuses.
ShardStatus_Active = ShardStatus("active")
ShardStatus_Closed = ShardStatus("closed")
ShardStatus_Closing = ShardStatus("closing")
ShardStatus_New = ShardStatus("new")
ShardStatus_Unavailable = ShardStatus("unavailable")
// Replica statuses.
ReplicaStatus_Active = ReplicaStatus("active")
ReplicaStatus_Closed = ReplicaStatus("closed")
ReplicaStatus_Closing = ReplicaStatus("closing")
ReplicaStatus_Bootstrapping = ReplicaStatus("bootstrapping")
ReplicaStatus_Joining = ReplicaStatus("joining")
ReplicaStatus_New = ReplicaStatus("new")
)
// Sentinel errors. The first group re-exports dragonboat errors; the rest are
// defined by this package.
var (
// Errors re-exported from dragonboat.
ErrAborted = dragonboat.ErrAborted
ErrCanceled = dragonboat.ErrCanceled
ErrRejected = dragonboat.ErrRejected
ErrShardClosed = dragonboat.ErrShardClosed
ErrShardNotReady = dragonboat.ErrShardNotReady
ErrTimeout = dragonboat.ErrTimeout
// Errors defined by this package.
ErrAgentNotReady = fmt.Errorf("Agent not ready")
ErrHostNotFound = fmt.Errorf(`Host not found`)
ErrIDOutOfRange = fmt.Errorf(`ID out of range`)
ErrInvalidFactory = fmt.Errorf(`Invalid Factory`)
ErrInvalidGossipAddr = fmt.Errorf(`Invalid gossip address`)
ErrReplicaNotActive = fmt.Errorf("Replica not active")
ErrReplicaNotAllowed = fmt.Errorf("Replica not allowed")
ErrReplicaNotFound = fmt.Errorf("Replica not found")
ErrShardExists = fmt.Errorf(`Shard already exists`)
ErrShardNotFound = fmt.Errorf(`Shard not found`)
ErrInvalidNumberOfArguments = fmt.Errorf(`Invalid number of arguments`)
// ErrClusterNameInvalid indicates that the clusterName is invalid.
// Base36 supports only lowercase alphanumeric characters.
ErrClusterNameInvalid = fmt.Errorf("Invalid cluster name (base36 maxlen 12)")
// ClusterNameRegex is the pattern NewAgent uses to validate cluster names:
// 1 to 12 lowercase alphanumeric characters.
ClusterNameRegex = `^[a-z0-9]{1,12}$`
// ErrNotifyCommitDisabled is logged when non-linearizable writes are requested but disabled.
// Set property `NotifyCommit` to `true` in `HostConfig` to add support for non-linearizable writes.
ErrNotifyCommitDisabled = fmt.Errorf("Attempted to make a non-linearizable write while NotifyCommit is disabled")
)
// Query envelopes passed to the state machine shims' Lookup method, which
// dispatches on their concrete type.
type (
// lookupQuery carries the context and payload for a Query call.
lookupQuery struct {
ctx context.Context
data []byte
}
// watchQuery carries the context, payload and result channel for a Watch call.
watchQuery struct {
ctx context.Context
data []byte
result chan *Result
}
)
// Release resets the query (context cleared, data truncated to zero length
// while keeping its backing array) and puts it back into lookupQueryPool.
// The query must not be used after Release.
func (q *lookupQuery) Release() {
	q.ctx, q.data = nil, q.data[:0]
	lookupQueryPool.Put(q)
}
// lookupQueryPool recycles lookupQuery values to reduce allocations.
var lookupQueryPool = sync.Pool{
	New: func() any { return new(lookupQuery) },
}

// getLookupQuery takes a lookupQuery from the pool without populating it.
func getLookupQuery() *lookupQuery {
	q := lookupQueryPool.Get().(*lookupQuery)
	return q
}
// newLookupQuery takes a lookupQuery from the pool and populates it with ctx
// and data. The data slice is stored as-is (not copied).
func newLookupQuery(ctx context.Context, data []byte) (q *lookupQuery) {
	q = lookupQueryPool.Get().(*lookupQuery)
	q.ctx, q.data = ctx, data
	return
}
// Release resets the query (context and result channel cleared, data truncated
// to zero length while keeping its backing array) and puts it back into
// watchQueryPool. The query must not be used after Release.
func (q *watchQuery) Release() {
	q.ctx, q.result = nil, nil
	q.data = q.data[:0]
	watchQueryPool.Put(q)
}
// watchQueryPool recycles watchQuery values to reduce allocations.
var watchQueryPool = sync.Pool{
	New: func() any { return new(watchQuery) },
}

// getWatchQuery takes a watchQuery from the pool without populating it.
func getWatchQuery() *watchQuery {
	q := watchQueryPool.Get().(*watchQuery)
	return q
}
// newWatchQuery takes a watchQuery from the pool and populates it with ctx,
// data and the result channel. The data slice is stored as-is (not copied).
func newWatchQuery(ctx context.Context, data []byte, result chan *Result) (q *watchQuery) {
	q = watchQueryPool.Get().(*watchQuery)
	q.ctx, q.data, q.result = ctx, data, result
	return
}
// resultPool recycles Result values to reduce allocations.
var resultPool = sync.Pool{
	New: func() any { return new(Result) },
}

// ReleaseResult resets r (Value zeroed, Data truncated to zero length while
// keeping its backing array) and returns it to the global pool. r must not be
// nil and must not be used after this call.
func ReleaseResult(r *Result) {
	r.Value, r.Data = 0, r.Data[:0]
	resultPool.Put(r)
}
// GetResult retrieves a Result from a global pool. It is recommended to use
// this function to instantiate Result objects returned from queries or sent
// over Watch channels, as those are expected to be returned to the pool after
// use, reducing allocation overhead.
func GetResult() *Result {
	r := resultPool.Get().(*Result)
	return r
}
// requestStatePool recycles dragonboat.RequestState values.
var requestStatePool = sync.Pool{
	New: func() any { return new(dragonboat.RequestState) },
}

// getRequestState takes a RequestState from the pool.
func getRequestState() *dragonboat.RequestState {
	rs := requestStatePool.Get().(*dragonboat.RequestState)
	return rs
}
// mustBase36Decode decodes name as a base36 number and panics with the
// decoding error if name is not valid.
func mustBase36Decode(name string) uint64 {
	id, decodeErr := base36Decode(name)
	if decodeErr == nil {
		return id
	}
	panic(decodeErr)
}
// base36Decode parses name as an unsigned base36 number that fits in 64 bits.
func base36Decode(name string) (uint64, error) {
	const (
		base    = 36
		bitSize = 64
	)
	return strconv.ParseUint(name, base, bitSize)
}
// base36Encode formats id as a lowercase base36 string.
func base36Encode(id uint64) string {
	const base = 36
	encoded := strconv.FormatUint(id, base)
	return encoded
}
// compositeRaftEventListener fans raft events out to several listeners.
type compositeRaftEventListener struct {
	listeners []raftio.IRaftEventListener
}

// newCompositeRaftEventListener returns a listener that forwards events to
// each of the given listeners in order. Nil entries are permitted; they are
// skipped when events are dispatched.
func newCompositeRaftEventListener(listeners ...raftio.IRaftEventListener) raftio.IRaftEventListener {
	composite := &compositeRaftEventListener{listeners: listeners}
	return composite
}
// LeaderUpdated forwards info to every non-nil listener, in registration order.
func (c *compositeRaftEventListener) LeaderUpdated(info LeaderInfo) {
	targets := c.listeners
	for i := range targets {
		l := targets[i]
		if l == nil {
			continue
		}
		l.LeaderUpdated(info)
	}
}
// SetLogLevel sets log level for all zongzi and dragonboat loggers.
//
// Recommend [LogLevelWarning] for production.
func SetLogLevel(level LogLevel) {
	for _, name := range []string{
		"dragonboat",
		"gossip",
		"grpc",
		"logdb",
		"raft",
		"rsm",
		"transport",
		"zongzi",
		"tan",
		"registry",
		"config",
	} {
		logger.GetLogger(name).SetLevel(level)
	}
}
// SetLogLevelDebug sets the debug log level for most loggers, but keeps the
// loggers that produce large amounts of debug output at a quieter level:
// dragonboat, raft and transport at warning, gossip at error.
func SetLogLevelDebug() {
	SetLogLevel(logger.DEBUG)
	for _, name := range []string{"dragonboat", "raft", "transport"} {
		logger.GetLogger(name).SetLevel(logger.WARNING)
	}
	logger.GetLogger("gossip").SetLevel(logger.ERROR)
}
// SetLogLevelProduction sets a log level suitable for production: warning for
// all loggers, except gossip, which is noisier and is limited to error.
func SetLogLevelProduction() {
	SetLogLevel(logger.WARNING)
	gossip := logger.GetLogger("gossip")
	gossip.SetLevel(logger.ERROR)
}
// parseUint64 parses s as a base-10 unsigned 64-bit integer.
//
// It uses strconv.ParseUint rather than Atoi plus a cast, so the whole uint64
// range is accepted and negative input is rejected with an error instead of
// silently wrapping around to a very large value.
func parseUint64(s string) (uint64, error) {
	return strconv.ParseUint(s, 10, 64)
}
// keys returns the keys of m in unspecified order. The result is always
// non-nil, even for an empty or nil map.
func keys[K comparable, V any](m map[K]V) []K {
	out := make([]K, len(m))
	i := 0
	for k := range m {
		out[i] = k
		i++
	}
	return out
}
// sliceContains reports whether value occurs in slice.
func sliceContains[T comparable](slice []T, value T) bool {
	for i := 0; i < len(slice); i++ {
		if slice[i] == value {
			return true
		}
	}
	return false
}
// sliceWithout returns a new slice holding the elements of slice that differ
// from exclude, in their original order. The input is not modified. The result
// is nil when no element is kept.
func sliceWithout[T comparable](slice []T, exclude T) []T {
	var kept []T
	for i := range slice {
		if slice[i] == exclude {
			continue
		}
		kept = append(kept, slice[i])
	}
	return kept
}