package zongzi

import (
	"bytes"
	"context"
	"fmt"
	"net"
	"regexp"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/lni/dragonboat/v4"
	"github.com/lni/dragonboat/v4/logger"

	"github.com/logbn/zongzi/internal"
)

type Agent struct {
	advertiseAddress  string
	bindAddress       string
	clusterName       string
	hostController    *hostController
	controllerManager *controllerManager
	clientManager     *clientManager
	fsm               *fsm
	grpcClientPool    *grpcClientPool
	grpcServer        *grpcServer
	host              *dragonboat.NodeHost
	hostConfig        HostConfig
	hostTags          []string
	log               logger.ILogger
	members           map[uint64]string
	peers             []string
	replicaConfig     ReplicaConfig
	shardTypes        map[string]shardType
	status            AgentStatus

	clock     clock.Clock
	ctx       context.Context
	ctxCancel context.CancelFunc
	mutex     sync.RWMutex
	wg        sync.WaitGroup
}

type shardType struct {
	Config                        ReplicaConfig
	StateMachineFactory           StateMachineFactory
	StateMachinePersistentFactory StateMachinePersistentFactory
	Uri                           string
}

func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent, err error) {
	if !regexp.MustCompile(ClusterNameRegex).MatchString(clusterName) {
		err = fmt.Errorf(`%v: (%s)`, ErrClusterNameInvalid, clusterName)
		return
	}
	log := logger.GetLogger(projectName)
	sort.Strings(peers)
	a = &Agent{
		clock:       clock.New(),
		clusterName: clusterName,
		log:         log,
		peers:       peers,
		shardTypes:  map[string]shardType{},
		status:      AgentStatus_Pending,
	}
	a.controllerManager = newControllerManager(a)
	a.clientManager = newClientManager(a)
	for _, opt := range append([]AgentOption{
		WithApiAddress(DefaultApiAddress),
		WithGossipAddress(DefaultGossipAddress),
		WithHostConfig(DefaultHostConfig),
		WithReplicaConfig(DefaultReplicaConfig),
	}, opts...) {
		if err = opt(a); err != nil {
			return nil, err
		}
	}
	a.hostController = newHostController(a)
	a.hostConfig.RaftEventListener = newCompositeRaftEventListener(
		a.controllerManager,
		a.hostConfig.RaftEventListener,
	)
	a.replicaConfig.ShardID = ShardID
	a.hostConfig.DeploymentID = mustBase36Decode(clusterName)
	a.hostConfig.DefaultNodeRegistryEnabled = true
	a.hostConfig.Gossip.Meta = []byte(a.advertiseAddress)
	a.grpcClientPool = newGrpcClientPool(1e4)
	a.grpcServer = newGrpcServer(a.bindAddress)
	return a, nil
}

// Client returns a client for a specific shard.
// It will send writes to the nearest member and send reads to the nearest replica (by ping).
func (a *Agent) Client(shardID uint64, opts ...ClientOption) (c ShardClient) {
	c, _ = newClient(a.clientManager, shardID, opts...)
	return
}

// ShardCreate creates a new shard. If the shard name option is provided and the shard already exists,
// the found shard is returned instead.
func (a *Agent) ShardCreate(ctx context.Context, uri string, opts ...ShardOption) (shard Shard, created bool, err error) {
	shard = Shard{
		Status: ShardStatus_New,
		Type:   uri,
		Tags:   map[string]string{},
	}
	for _, opt := range opts {
		if err = opt(&shard); err != nil {
			return
		}
	}
	if len(shard.Name) > 0 {
		var ok bool
		var found Shard
		a.State(ctx, func(state *State) {
			found, ok = state.ShardFindByName(shard.Name)
		})
		if ok {
			shard = found
			return
		}
	}
	res, err := a.primePropose(newCmdShardPost(shard))
	if err != nil {
		return
	}
	shard.ID = res.Value
	created = true
	a.log.Infof("Shard created %s, %d, %s", uri, shard.ID, shard.Name)
	return
}

// ShardDelete deletes a shard.
func (a *Agent) ShardDelete(ctx context.Context, id uint64) (err error) {
	res, err := a.primePropose(newCmdShardDel(id))
	if err != nil {
		return
	}
	if res.Value == 0 {
		err = fmt.Errorf("Error deleting shard %d: %s", id, string(res.Data))
		return
	}
	a.log.Infof("Shard deleted (%d)", id)
	return
}
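// Example (illustrative sketch): creating a shard and obtaining a client for it.
// The shard type URI "zongzi://examples/kv" is hypothetical; it assumes a state
// machine factory was registered under that URI before the agent started.
//
//	shard, created, err := agent.ShardCreate(ctx, "zongzi://examples/kv")
//	if err != nil {
//		log.Fatal(err)
//	}
//	_ = created
//	client := agent.Client(shard.ID)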
// ShardFind returns a shard by id.
func (a *Agent) ShardFind(ctx context.Context, id uint64) (shard Shard, err error) {
	err = a.State(ctx, func(state *State) {
		shard, _ = state.Shard(id)
	})
	return
}

// ShardUpdate updates an existing shard. Returns ErrShardNotFound if the shard does not exist.
func (a *Agent) ShardUpdate(ctx context.Context, id uint64, opts ...ShardOption) (shard Shard, err error) {
	var found Shard
	var ok bool
	a.State(ctx, func(state *State) {
		found, ok = state.Shard(id)
	})
	if !ok {
		err = ErrShardNotFound
		return
	}
	for _, opt := range opts {
		if err = opt(&found); err != nil {
			return
		}
	}
	shard = found
	res, err := a.primePropose(newCmdShardPut(shard))
	if err != nil {
		return
	}
	shard.ID = res.Value
	a.log.Infof("Shard updated (%d)", id)
	return
}
// Start starts the agent, bootstrapping the cluster if required.
func (a *Agent) Start(ctx context.Context) (err error) {
	var init bool
	defer func() {
		if err == nil {
			a.hostController.Start()
			a.controllerManager.Start()
			a.clientManager.Start()
			a.setStatus(AgentStatus_Ready)
		}
	}()
	a.ctx, a.ctxCancel = context.WithCancel(ctx)
	// Start gRPC server
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		defer a.log.Debugf("Stopped gRPC Server")
		for {
			if err := a.grpcServer.Start(a); err != nil {
				a.log.Errorf("Error starting gRPC server: %s", err.Error())
			}
			select {
			case <-a.ctx.Done():
				return
			case <-a.clock.After(waitPeriod):
			}
		}
	}()
	// Resolve member gossip addresses
	a.hostConfig.Gossip.AdvertiseAddress, err = a.gossipIP(a.hostConfig.Gossip.AdvertiseAddress)
	if err != nil {
		a.log.Errorf(`Failed to resolve gossip advertise address: %v`, err)
		return
	}
	a.hostConfig.Gossip.Seed, err = a.resolvePeerGossipSeed()
	if err != nil {
		a.log.Errorf(`Failed to resolve gossip seeds: %v`, err)
		return
	}
	// Start node host
	if a.host, err = dragonboat.NewNodeHost(a.hostConfig); err != nil {
		a.log.Errorf(`Failed to start host: %v`, err)
		return
	}
	a.log.Infof(`Started host "%s"`, a.hostID())
	// Find prime replicaID
	a.replicaConfig.ReplicaID = a.findLocalReplicaID(a.replicaConfig.ShardID)
	existing := a.replicaConfig.ReplicaID > 0
	a.replicaConfig.IsNonVoting = !sliceContains(a.peers, a.advertiseAddress)
	// Resolve prime member replicaIDs
	a.members, init, err = a.resolvePrimeMembership()
	if err != nil {
		a.log.Errorf(`Failed to resolve prime membership: %v`, err)
		return
	}
	if a.replicaConfig.ReplicaID == 0 {
		// Request join if a cluster was found but this replica does not exist
		a.setStatus(AgentStatus_Joining)
		for {
			a.replicaConfig.ReplicaID, err = a.joinPrimeShard()
			if err != nil {
				a.log.Warningf(`Failed to join prime shard: %v`, err)
				a.clock.Sleep(waitPeriod)
				continue
			}
			break
		}
		for {
			err = a.startPrimeReplica(nil, true)
			if err != nil {
				a.log.Warningf("Error startPrimeReplica: (%s) %s", AgentStatus_Joining, err.Error())
				a.clock.Sleep(waitPeriod)
				continue
			}
			break
		}
	} else if !existing {
		if init {
			// Initialize if the cluster is new
			a.setStatus(AgentStatus_Initializing)
			err = a.startPrimeReplica(a.members, false)
			if err != nil {
				a.log.Errorf(`Failed to startPrimeReplica: %v`, err)
				return
			}
			err = a.primeInit(a.members)
			if err != nil {
				a.log.Errorf(`Failed to primeInit: %v`, err)
				return
			}
		} else {
			// Join if the replica should exist but doesn't
			a.setStatus(AgentStatus_Joining)
			err = a.startPrimeReplica(a.members, false)
			if err != nil {
				a.log.Errorf(`Failed to startPrimeReplica: %v`, err)
				return
			}
			err = a.primeInitAwait()
			if err != nil {
				a.log.Errorf(`Failed to primeInitAwait: %v`, err)
				return
			}
		}
	} else {
		// Otherwise just rejoin the shard
		a.setStatus(AgentStatus_Rejoining)
		err = a.startPrimeReplica(nil, false)
		if err != nil {
			a.log.Errorf(`Failed to startPrimeReplica: %v`, err)
			return
		}
	}
	// Update host status, meta, etc.
	err = a.updateHost()
	if err != nil {
		a.log.Errorf(`Failed to updateHost: %v`, err)
		return
	}
	// Update replica status
	err = a.updateReplica()
	if err != nil {
		a.log.Errorf(`Failed to updateReplica: %v`, err)
		return
	}
	return
}

// State executes a callback function passing a snapshot of the cluster state.
//
//	err := agent.State(ctx, func(s *State) {
//		log.Println(s.Index())
//	})
//
// Linearizable reads are enabled by default to achieve "Read Your Writes" consistency following a proposal. Pass a
// nil ctx to disable linearizable reads (for higher performance); the read will then be stale but still snapshot
// isolated. State always provides snapshot isolation, even for stale reads.
//
// State will block indefinitely if the prime shard is unavailable. This may prevent the agent from stopping
// gracefully. Pass a timeout context to avoid blocking indefinitely.
//
// State is thread safe and will not block writes.
func (a *Agent) State(ctx context.Context, fn func(*State)) (err error) {
	if ctx != nil {
		err = a.index(ctx, a.replicaConfig.ShardID)
		if err != nil {
			return
		}
	}
	fn(a.fsm.state.withTxn(false))
	return
}

// StateMachineRegister registers a shard type by its state machine factory (in-memory or persistent).
// Call before starting the agent.
func (a *Agent) StateMachineRegister(uri string, factory any, config ...ReplicaConfig) (err error) {
	cfg := DefaultReplicaConfig
	if len(config) > 0 {
		cfg = config[0]
	}
	t := shardType{
		Config: cfg,
		Uri:    uri,
	}
	switch f := factory.(type) {
	case StateMachineFactory:
		t.StateMachineFactory = f
	case StateMachinePersistentFactory:
		t.StateMachinePersistentFactory = f
	default:
		return ErrInvalidFactory
	}
	a.shardTypes[uri] = t
	return
}

// Status returns the agent status.
func (a *Agent) Status() AgentStatus {
	a.mutex.RLock()
	defer a.mutex.RUnlock()
	return a.status
}

// Stop stops the agent.
func (a *Agent) Stop() {
	a.controllerManager.Stop()
	a.hostController.Stop()
	a.grpcServer.Stop()
	if a.ctxCancel != nil {
		a.ctxCancel()
	}
	if a.host != nil {
		a.host.Close()
	}
	a.wg.Wait()
	a.setStatus(AgentStatus_Stopped)
}
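// Example (illustrative sketch): a typical agent lifecycle. The peer list,
// addresses, shard type URI and factory name (newExampleStateMachine) are
// hypothetical.
//
//	agent, err := zongzi.NewAgent("cluster1", []string{
//		"10.0.0.1:17001", "10.0.0.2:17001", "10.0.0.3:17001",
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	agent.StateMachineRegister("zongzi://examples/kv", newExampleStateMachine)
//	if err = agent.Start(ctx); err != nil {
//		log.Fatal(err)
//	}
//	defer agent.Stop()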
// hostID returns the host ID if the host is initialized, otherwise an empty string.
func (a *Agent) hostID() (id string) {
	if a.host != nil {
		return a.host.ID()
	}
	return ""
}

// hostClient returns a client for a specific host.
func (a *Agent) hostClient(hostID string) (c hostClient) {
	a.State(a.ctx, func(s *State) {
		host, ok := s.Host(hostID)
		if ok {
			c = newhostClient(a, host)
		}
	})
	return
}

// tagsSet sets tags on an item (Host, Shard or Replica). Overwrites if the tag is already present.
func (a *Agent) tagsSet(item any, tags ...string) (err error) {
	_, err = a.primePropose(newCmdTagsSet(item, tags...))
	return
}

// tagsSetNX sets tags on an item (Host, Shard or Replica). Does nothing if the tag is already present.
func (a *Agent) tagsSetNX(item any, tags ...string) (err error) {
	_, err = a.primePropose(newCmdTagsSetNX(item, tags...))
	return
}

// tagsRemove removes tags from an item (Host, Shard or Replica).
func (a *Agent) tagsRemove(item any, tags ...string) (err error) {
	_, err = a.primePropose(newCmdTagsRemove(item, tags...))
	return
}

// replicaCreate creates a replica.
func (a *Agent) replicaCreate(hostID string, shardID uint64, isNonVoting bool) (id uint64, err error) {
	res, err := a.primePropose(newCmdReplicaPost(hostID, shardID, isNonVoting))
	if err == nil {
		id = res.Value
		a.log.Infof("[%05d:%05d] Replica created %s, %v", shardID, id, hostID, isNonVoting)
	}
	return
}

// replicaDelete deletes a replica.
func (a *Agent) replicaDelete(replicaID uint64) (err error) {
	_, err = a.primePropose(newCmdReplicaDelete(replicaID))
	if err == nil {
		a.log.Infof("Replica deleted %05d", replicaID)
	}
	return
}

// shardLeaderSet sets the leader of a shard.
func (a *Agent) shardLeaderSet(shardID, replicaID, term uint64) (err error) {
	_, err = a.primePropose(newCmdShardLeaderSet(shardID, replicaID, term))
	if err == nil {
		a.log.Infof("Shard %05d leader set to %05d", shardID, replicaID)
	}
	return
}

func (a *Agent) setStatus(s AgentStatus) {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	a.log.Infof("%s Agent Status: %v", a.hostID(), s)
	a.status = s
}

func (a *Agent) stopReplica(cfg ReplicaConfig) (err error) {
	err = a.host.StopReplica(cfg.ShardID, cfg.ReplicaID)
	if err != nil {
		return fmt.Errorf("Failed to stop replica: %w", err)
	}
	return
}

// gossipIP resolves a host:port address to an ip:port address.
func (a *Agent) gossipIP(peerApiAddr string) (ipAddr string, err error) {
	parts := strings.Split(peerApiAddr, ":")
	if len(parts) != 2 {
		err = fmt.Errorf("%w: %s", ErrInvalidGossipAddr, peerApiAddr)
		return
	}
	if net.ParseIP(parts[0]) != nil {
		return peerApiAddr, nil
	}
	ips, err := net.LookupIP(parts[0])
	if err != nil {
		return
	}
	if len(ips) == 0 {
		err = fmt.Errorf("%w: %s", ErrInvalidGossipAddr, peerApiAddr)
	} else {
		ipAddr = ips[0].String() + ":" + parts[1]
	}
	return
}

// resolvePeerGossipSeed resolves the peer api address list to a peer gossip address list.
func (a *Agent) resolvePeerGossipSeed() (gossip []string, err error) {
	a.wg.Add(1)
	defer a.wg.Done()
	for {
		gossip = gossip[:0]
		for _, peerApiAddr := range a.peers {
			if peerApiAddr == a.advertiseAddress {
				ipAddr, err := a.gossipIP(a.hostConfig.Gossip.AdvertiseAddress)
				if err == nil {
					gossip = append(gossip, ipAddr)
				} else {
					a.log.Warningf(err.Error())
				}
				continue
			}
			ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
			res, err := a.grpcClientPool.get(peerApiAddr).Probe(ctx, &internal.ProbeRequest{})
			cancel() // cancel immediately; a defer here would accumulate until the loop exits
			if err == nil && res != nil {
				ipAddr, err := a.gossipIP(res.GossipAdvertiseAddress)
				if err == nil {
					gossip = append(gossip, ipAddr)
				} else {
					a.log.Warningf(err.Error())
				}
			} else if err != nil && !strings.HasSuffix(err.Error(), `connect: connection refused"`) {
				a.log.Warningf("No probe response for %s %+v %v", peerApiAddr, res, err.Error())
			}
		}
		a.log.Infof("Peers: %#v", a.peers)
		a.log.Infof("Found %d of %d peers %+v", len(gossip), len(a.peers), gossip)
		if len(gossip) < len(a.peers) {
			select {
			case <-a.ctx.Done():
				err = fmt.Errorf(`Cancelling findGossip (agent stopped)`)
				return
			case <-a.clock.After(waitPeriod):
			}
			continue
		}
		break
	}
	return
}
// resolvePrimeMembership resolves the peer raft address list to replicaIDs and hostIDs.
func (a *Agent) resolvePrimeMembership() (members map[uint64]string, init bool, err error) {
	a.wg.Add(1)
	defer a.wg.Done()
	for {
		members = map[uint64]string{}
		var uninitialized = map[string]string{}
		// Get host info from all peers to determine which are initialized.
		for _, apiAddr := range a.peers {
			var info *internal.InfoResponse
			if apiAddr == a.advertiseAddress && a.hostID() != "" {
				info = &internal.InfoResponse{
					ReplicaId: a.replicaConfig.ReplicaID,
					HostId:    a.hostID(),
				}
			} else {
				ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
				info, err = a.grpcClientPool.get(apiAddr).Info(ctx, &internal.InfoRequest{})
				cancel() // cancel immediately; a defer here would accumulate until the loop exits
				if err != nil {
					return
				}
			}
			if len(info.HostId) == 0 {
				continue
			}
			if info.ReplicaId == 0 {
				uninitialized[apiAddr] = info.HostId
			} else {
				members[info.ReplicaId] = info.HostId
			}
		}
		a.log.Infof("Found %d of %d peers (%d uninitialized)", len(members), len(a.peers), len(uninitialized))
		// All peers resolved. Start agent.
		if len(members) == len(a.peers) {
			break
		}
		// All peers uninitialized. Initialize cluster.
		if len(uninitialized) == len(a.peers) {
			for i, apiAddr := range a.peers {
				replicaID := uint64(i + 1)
				members[replicaID] = uninitialized[apiAddr]
				if apiAddr == a.advertiseAddress {
					a.replicaConfig.ReplicaID = replicaID
				}
			}
			if a.replicaConfig.ReplicaID > 0 {
				init = true
				break
			}
		}
		// Some peers initialized. Retrieve member list from an initialized host.
		if len(members)+len(uninitialized) == len(a.peers) {
			var res *internal.MembersResponse
			for _, apiAddr := range a.peers {
				if apiAddr == a.advertiseAddress {
					continue
				}
				if _, ok := uninitialized[apiAddr]; !ok {
					ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
					res, err = a.grpcClientPool.get(apiAddr).Members(ctx, &internal.MembersRequest{})
					cancel() // cancel immediately; a defer here would accumulate until the loop exits
					if err != nil {
						return
					}
					a.log.Debugf("Get Members: %+v", res.GetMembers())
					for replicaID, hostID := range res.GetMembers() {
						members[replicaID] = hostID
						if hostID == a.host.ID() {
							a.replicaConfig.ReplicaID = replicaID
							break
						}
					}
					break
				}
			}
			if a.replicaConfig.ReplicaID > 0 && len(members) == len(a.peers) {
				break
			}
		}
		a.clock.Sleep(waitPeriod)
	}
	a.log.Debugf(`Init: %v, Members: %+v`, init, members)
	return
}

// startPrimeReplica starts the prime replica.
func (a *Agent) startPrimeReplica(members map[uint64]string, join bool) (err error) {
	// a.log.Debugf("Starting Replica %+v (%v)", members, join)
	err = a.host.StartReplica(members, join, fsmFactory(a), a.replicaConfig)
	if err == dragonboat.ErrShardAlreadyExist {
		a.log.Infof("Shard already exists %+v (%v) %+v", members, join, a.replicaConfig)
		err = nil
	}
	if err != nil {
		err = fmt.Errorf(`startPrimeReplica: %w`, err)
		return
	}
	err = a.index(a.ctx, a.replicaConfig.ShardID)
	if err != nil {
		return
	}
	return
}

// joinPrimeShard requests that this host be added to the prime shard.
func (a *Agent) joinPrimeShard() (replicaID uint64, err error) {
	a.log.Debugf("Joining prime shard")
	var res *internal.JoinResponse
	for _, peerApiAddr := range a.peers {
		ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
		res, err = a.grpcClientPool.get(peerApiAddr).Join(ctx, &internal.JoinRequest{
			HostId:      a.hostID(),
			IsNonVoting: a.replicaConfig.IsNonVoting,
		})
		cancel() // cancel immediately; a defer here would accumulate until the loop exits
		if res != nil && res.Value > 0 {
			replicaID = res.Value
			break
		}
	}
	return
}

// updateHost adds host info to the prime shard.
func (a *Agent) updateHost() (err error) {
	apiAddr, err := a.parseMeta(a.hostID())
	if err != nil {
		return
	}
	shardTypes := keys(a.shardTypes)
	sort.Strings(shardTypes)
	cmd := newCmdHostPut(a.hostID(), apiAddr, a.hostConfig.RaftAddress, a.hostTags, HostStatus_Active, shardTypes)
	a.log.Debugf("Updating host: %s", string(cmd))
	_, err = a.primePropose(cmd)
	return
}
// updateReplica sets the prime shard replica to active.
func (a *Agent) updateReplica() (err error) {
	_, err = a.primePropose(newCmdReplicaUpdateStatus(a.replicaConfig.ReplicaID, ReplicaStatus_Active))
	if err != nil {
		err = fmt.Errorf("Failed to update replica status: %w", err)
	}
	return
}

// index blocks until it can read from a shard, indicating that the local replica is up to date.
func (a *Agent) index(ctx context.Context, shardID uint64) (err error) {
	var rs *dragonboat.RequestState
	for {
		rs, err = a.host.ReadIndex(shardID, raftTimeout)
		if err != nil || rs == nil {
			a.log.Infof(`[%05x] Error reading shard index: %s: %v`, shardID, a.hostID(), err)
			select {
			case <-ctx.Done():
				return
			case <-a.clock.After(waitPeriod):
			}
			continue
		}
		res := <-rs.ResultC()
		rs.Release()
		if !res.Completed() {
			a.log.Infof(`[%05x] Waiting for other nodes`, shardID)
			select {
			case <-ctx.Done():
				return
			case <-a.clock.After(waitPeriod):
			}
			continue
		}
		break
	}
	return
}

// joinPrimeReplica registers the host in the prime shard state (if necessary) and adds its replica to the prime shard.
func (a *Agent) joinPrimeReplica(hostID string, shardID uint64, isNonVoting bool) (replicaID uint64, err error) {
	var ok bool
	var host Host
	a.State(a.ctx, func(s *State) {
		host, ok = s.Host(hostID)
		if !ok {
			return
		}
	})
	if host.ID == "" {
		host, err = a.primeAddHost(hostID)
		if err != nil {
			return
		}
	}
	a.State(a.ctx, func(s *State) {
		s.ReplicaIterateByHostID(host.ID, func(r Replica) bool {
			if r.ShardID == shardID {
				replicaID = r.ID
				return false
			}
			return true
		})
	})
	if replicaID == 0 {
		if replicaID, err = a.primeAddReplica(hostID, isNonVoting); err != nil {
			return
		}
	}
	return a.joinShardReplica(hostID, shardID, replicaID, isNonVoting)
}

// joinShardReplica adds a replica to a shard through a raft membership change, unless it is already a member.
func (a *Agent) joinShardReplica(hostID string, shardID, replicaID uint64, isNonVoting bool) (res uint64, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
	defer cancel()
	m, err := a.host.SyncGetShardMembership(ctx, shardID)
	if err != nil {
		return
	}
	ctx, cancel = context.WithTimeout(context.Background(), raftTimeout)
	defer cancel()
	if isNonVoting {
		if _, ok := m.NonVotings[replicaID]; ok {
			return replicaID, nil
		}
		err = a.host.SyncRequestAddNonVoting(ctx, shardID, replicaID, hostID, m.ConfigChangeID)
	} else {
		if _, ok := m.Nodes[replicaID]; ok {
			return replicaID, nil
		}
		err = a.host.SyncRequestAddReplica(ctx, shardID, replicaID, hostID, m.ConfigChangeID)
	}
	if err != nil {
		return
	}
	return replicaID, nil
}

// parseMeta extracts the api address from the gossip metadata of the given host.
func (a *Agent) parseMeta(nhid string) (apiAddr string, err error) {
	reg, ok := a.host.GetNodeHostRegistry()
	if !ok {
		err = fmt.Errorf("Unable to retrieve HostRegistry")
		return
	}
	meta, ok := reg.GetMeta(nhid)
	if !ok {
		err = fmt.Errorf("Unable to retrieve node host meta (%s)", nhid)
		return
	}
	parts := bytes.Split(meta, []byte(`|`))
	apiAddr = string(parts[0])
	return
}

func (a *Agent) primePropose(cmd []byte) (Result, error) {
	ctx, cancel := context.WithTimeout(context.Background(), raftTimeout)
	defer cancel()
	return a.host.SyncPropose(ctx, a.host.GetNoOPSession(a.replicaConfig.ShardID), cmd)
}

// primeInit proposes the addition of initial cluster state to the prime shard.
func (a *Agent) primeInit(members map[uint64]string) (err error) {
	_, err = a.primePropose(newCmdShardPut(Shard{
		ID:   a.replicaConfig.ShardID,
		Name: "zongzi",
		Type: projectUri,
	}))
	if err != nil {
		return
	}
	toAdd := make([]string, len(members))
	for replicaID, nhid := range members {
		_, err = a.primeAddHost(nhid)
		if err != nil {
			return
		}
		toAdd[replicaID-1] = nhid
	}
	for _, nhid := range toAdd {
		if _, err = a.primeAddReplica(nhid, false); err != nil {
			return
		}
	}
	return
}
// primeInitAwait pauses non-initializers until the prime shard is initialized.
func (a *Agent) primeInitAwait() (err error) {
	for {
		var found bool
		err = a.State(a.ctx, func(s *State) {
			s.ReplicaIterateByHostID(a.hostID(), func(r Replica) bool {
				found = true
				return false
			})
		})
		if err != nil || found {
			break
		}
		a.clock.Sleep(100 * time.Millisecond)
	}
	return
}

// primeAddHost proposes the addition of host metadata to the prime shard state.
func (a *Agent) primeAddHost(nhid string) (host Host, err error) {
	addr, err := a.parseMeta(nhid)
	if err != nil {
		return
	}
	cmd := newCmdHostPut(nhid, addr, "", nil, HostStatus_New, nil)
	_, err = a.primePropose(cmd)
	if err != nil {
		return
	}
	host = Host{
		ID:     nhid,
		Status: HostStatus_New,
	}
	return
}

// primeAddReplica proposes the addition of replica metadata to the prime shard state.
func (a *Agent) primeAddReplica(nhid string, isNonVoting bool) (id uint64, err error) {
	res, err := a.primePropose(newCmdReplicaPost(nhid, a.replicaConfig.ShardID, isNonVoting))
	if err == nil {
		id = res.Value
	}
	return
}

// findLocalReplicaID returns the replica ID of any prime shard replica found in the local raft log.
func (a *Agent) findLocalReplicaID(shardID uint64) (id uint64) {
	nhInfo := a.host.GetNodeHostInfo(dragonboat.NodeHostInfoOption{})
	for _, info := range nhInfo.LogInfo {
		if info.ShardID == shardID {
			return info.ReplicaID
		}
	}
	return
}

func (a *Agent) dumpState() {
	a.State(nil, func(state *State) {
		// Print snapshot
		buf := bytes.NewBufferString("")
		state.Save(buf)
		a.log.Debugf(buf.String())
	})
}
package zongzi

import (
	"fmt"
	"strings"

	"github.com/lni/dragonboat/v4/config"
)

type AgentOption func(*Agent) error

func WithApiAddress(advertiseAddress string, bindAddress ...string) AgentOption {
	return func(a *Agent) error {
		a.advertiseAddress = advertiseAddress
		if len(bindAddress) > 0 {
			a.bindAddress = bindAddress[0]
		} else {
			a.bindAddress = fmt.Sprintf("0.0.0.0:%s", strings.Split(advertiseAddress, ":")[1])
		}
		return nil
	}
}

func WithGossipAddress(advertiseAddress string, bindAddress ...string) AgentOption {
	return func(a *Agent) error {
		a.hostConfig.Gossip.AdvertiseAddress = advertiseAddress
		if len(bindAddress) > 0 {
			a.hostConfig.Gossip.BindAddress = bindAddress[0]
		} else {
			a.hostConfig.Gossip.BindAddress = fmt.Sprintf("0.0.0.0:%s", strings.Split(advertiseAddress, ":")[1])
		}
		return nil
	}
}

func WithRaftAddress(raftAddress string, listenAddress ...string) AgentOption {
	return func(a *Agent) error {
		a.hostConfig.RaftAddress = raftAddress
		if len(listenAddress) > 0 {
			a.hostConfig.ListenAddress = listenAddress[0]
		} else {
			a.hostConfig.ListenAddress = fmt.Sprintf("0.0.0.0:%s", strings.Split(raftAddress, ":")[1])
		}
		return nil
	}
}

func WithRaftDir(dir string) AgentOption {
	return func(a *Agent) error {
		a.hostConfig.NodeHostDir = dir
		return nil
	}
}

func WithWALDir(dir string) AgentOption {
	return func(a *Agent) error {
		a.hostConfig.WALDir = dir
		return nil
	}
}

func WithHostConfig(cfg HostConfig) AgentOption {
	return func(a *Agent) error {
		if len(cfg.Gossip.AdvertiseAddress) == 0 && len(a.hostConfig.Gossip.AdvertiseAddress) > 0 {
			cfg.Gossip.AdvertiseAddress = a.hostConfig.Gossip.AdvertiseAddress
		}
		if len(cfg.Gossip.BindAddress) == 0 && len(a.hostConfig.Gossip.BindAddress) > 0 {
			cfg.Gossip.BindAddress = a.hostConfig.Gossip.BindAddress
		}
		if len(cfg.RaftAddress) == 0 && len(a.hostConfig.RaftAddress) > 0 {
			cfg.RaftAddress = a.hostConfig.RaftAddress
		}
		cfg.Expert.LogDBFactory = DefaultHostConfig.Expert.LogDBFactory
		cfg.Expert.LogDB = DefaultHostConfig.Expert.LogDB
		a.hostConfig = cfg
		return nil
	}
}

type HostMemory = LogDBConfig

// WithHostMemoryLimit sets the maximum memory allotted to the raft log
//
//	zongzi.WithHostMemoryLimit(zongzi.HostMemory256)
func WithHostMemoryLimit(limit HostMemory) AgentOption {
	return func(a *Agent) error {
		a.hostConfig.Expert.LogDB = limit
		return nil
	}
}

var (
	// HostMemory256 can be used to set max log memory usage to 256 MB
	HostMemory256 = config.GetTinyMemLogDBConfig()
	// HostMemory1024 can be used to set max log memory usage to 1 GB
	HostMemory1024 = config.GetSmallMemLogDBConfig()
	// HostMemory4096 can be used to set max log memory usage to 4 GB
	HostMemory4096 = config.GetMediumMemLogDBConfig()
	// HostMemory8192 can be used to set max log memory usage to 8 GB (default)
	HostMemory8192 = config.GetLargeMemLogDBConfig()
)

func WithRaftEventListener(listener RaftEventListener) AgentOption {
	return func(a *Agent) error {
		a.hostConfig.RaftEventListener = listener
		return nil
	}
}

func WithSystemEventListener(listener SystemEventListener) AgentOption {
	return func(a *Agent) error {
		a.hostConfig.SystemEventListener = listener
		return nil
	}
}

func WithHostTags(tags ...string) AgentOption {
	return func(a *Agent) error {
		a.hostTags = tags
		return nil
	}
}

func WithReplicaConfig(cfg ReplicaConfig) AgentOption {
	return func(a *Agent) error {
		a.replicaConfig = cfg
		return nil
	}
}

func WithShardController(c Controller) AgentOption {
	return func(a *Agent) error {
		a.controllerManager.controller = c
		return nil
	}
}
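// Example (illustrative sketch): constructing an agent with explicit addresses.
// The addresses, directory and peer list shown are hypothetical.
//
//	agent, err := zongzi.NewAgent("cluster1", peers,
//		zongzi.WithApiAddress("10.0.0.1:17001"),
//		zongzi.WithGossipAddress("10.0.0.1:17002"),
//		zongzi.WithRaftAddress("10.0.0.1:17003"),
//		zongzi.WithRaftDir("/var/lib/zongzi/raft"),
//		zongzi.WithHostMemoryLimit(zongzi.HostMemory1024),
//	)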
package zongzi

import (
	"encoding/json"
	"fmt"
	"io"

	"github.com/lni/dragonboat/v4/logger"
	"github.com/lni/dragonboat/v4/statemachine"
)

func fsmFactory(agent *Agent) statemachine.CreateStateMachineFunc {
	return func(shardID, replicaID uint64) statemachine.IStateMachine {
		node := &fsm{
			log:       agent.log,
			replicaID: replicaID,
			shardID:   shardID,
			state:     newFsmStateRadix(),
		}
		agent.fsm = node
		return node
	}
}

var _ statemachine.IStateMachine = (*fsm)(nil)

type fsm struct {
	log       logger.ILogger
	replicaID uint64
	shardID   uint64
	state     *State
}

func (fsm *fsm) Update(entry Entry) (Result, error) {
	var err error
	var cmd any
	var cmdBase command
	if err = json.Unmarshal(entry.Cmd, &cmdBase); err != nil {
		fsm.log.Errorf("Invalid entry %d - %v, %v", entry.Index, string(entry.Cmd), err)
		return entry.Result, nil
	}
	switch cmdBase.Type {
	case command_type_host:
		var cmdHost commandHost
		if err = json.Unmarshal(entry.Cmd, &cmdHost); err != nil {
			fsm.log.Errorf("Invalid host cmd %#v, %v", entry, err)
			break
		}
		cmd = cmdHost
	case command_type_shard:
		var cmdShard commandShard
		if err = json.Unmarshal(entry.Cmd, &cmdShard); err != nil {
			fsm.log.Errorf("Invalid shard cmd %#v, %v", entry, err)
			break
		}
		cmd = cmdShard
	case command_type_replica:
		var cmdReplica commandReplica
		if err = json.Unmarshal(entry.Cmd, &cmdReplica); err != nil {
			fsm.log.Errorf("Invalid replica cmd %#v, %v", entry, err)
			break
		}
		cmd = cmdReplica
	default:
		fsm.log.Errorf("Unrecognized cmd type %s - %#v", cmdBase.Type, cmdBase)
	}
	state := fsm.state.withTxn(true)
	defer state.commit()
	// fsm.log.Debugf(`Update: %d %d %s`, entry.Index, state.Index(), string(entry.Cmd))
	switch cmd.(type) {
	// Host
	case commandHost:
		var cmd = cmd.(commandHost)
		switch cmd.Action {
		// Put
		case command_action_put:
			if old, ok := state.Host(cmd.Host.ID); ok {
				cmd.Host.Created = old.Created
				fsm.tagsSetNX(cmd.Host.Tags, old.Tags)
			} else {
				cmd.Host.Created = entry.Index
				state.setHostID(cmd.Host.ID)
			}
			cmd.Host.Updated = entry.Index
			state.ReplicaIterateByHostID(cmd.Host.ID, func(r Replica) bool {
				state.shardTouch(r.ShardID, entry.Index)
				return true
			})
			state.hostPut(cmd.Host)
			entry.Result.Value = 1
		// Delete
		case command_action_del:
			host, ok := state.Host(cmd.Host.ID)
			if !ok {
				fsm.log.Warningf("%v: %#v", ErrHostNotFound, cmd)
				break
			}
			state.ReplicaIterateByHostID(host.ID, func(r Replica) bool {
				state.replicaDelete(r)
				state.shardTouch(r.ShardID, entry.Index)
				return true
			})
			state.hostDelete(host)
			entry.Result.Value = 1
		// Tags
		case command_action_tags_set:
			entry.Result.Value = fsm.hostTagAction(fsm.tagsSet, state, entry, cmd)
		case command_action_tags_setnx:
			entry.Result.Value = fsm.hostTagAction(fsm.tagsSetNX, state, entry, cmd)
		case command_action_tags_remove:
			entry.Result.Value = fsm.hostTagAction(fsm.tagsRemove, state, entry, cmd)
		default:
			fsm.log.Errorf("Unrecognized host action: %s - %#v", cmd.Action, cmd)
		}
	// Shard
	case commandShard:
		var cmd = cmd.(commandShard)
		switch cmd.Action {
		// Post
		case command_action_post:
			if cmd.Shard.Name != "" {
				if _, ok := state.ShardFindByName(cmd.Shard.Name); ok {
					err := fmt.Errorf("%s: %s", ErrShardExists, cmd.Shard.Name)
					fsm.log.Errorf(err.Error())
					entry.Result.Data = []byte(err.Error())
					break
				}
			}
			cmd.Shard.ID = state.shardIncr()
			cmd.Shard.Created = entry.Index
			cmd.Shard.Updated = entry.Index
			state.shardPut(cmd.Shard)
			entry.Result.Value = cmd.Shard.ID
		// Put
		case command_action_put:
			if old, ok := state.Shard(cmd.Shard.ID); ok {
				cmd.Shard.Created = old.Created
				fsm.tagsSetNX(cmd.Shard.Tags, old.Tags)
			} else if cmd.Shard.ID == 0 && state.ShardID() == 0 {
				// Special case for prime shard (0)
				cmd.Shard.Created = entry.Index
			} else {
				fsm.log.Errorf("%s: %s - %#v", ErrShardNotFound, cmd.Action, cmd)
				break
			}
			cmd.Shard.Updated = entry.Index
			state.shardPut(cmd.Shard)
			state.ReplicaIterateByShardID(cmd.Shard.ID, func(r Replica) bool {
				state.hostTouch(r.HostID, entry.Index)
				return true
			})
			entry.Result.Value = cmd.Shard.ID
		// Status Update
		case command_action_status_update:
			shard, ok := state.Shard(cmd.Shard.ID)
			if !ok {
				fsm.log.Warningf("%v: %#v", ErrShardNotFound, cmd)
				break
			}
			shard.Status = cmd.Shard.Status
			shard.Updated = entry.Index
			state.shardPut(shard)
			state.ReplicaIterateByShardID(shard.ID, func(r Replica) bool {
				state.hostTouch(r.HostID, entry.Index)
				return true
			})
			entry.Result.Value = 1
		// Set Leader
		case command_action_leader_set:
			shard, ok := state.Shard(cmd.Shard.ID)
			if !ok {
				fsm.log.Warningf("%v: %#v", ErrShardNotFound, cmd)
				break
			}
			shard.Leader = cmd.Shard.Leader
			shard.Term = cmd.Shard.Term
			shard.Updated = entry.Index
			state.shardPut(shard)
			entry.Result.Value = 1
		// Delete
		case command_action_del:
			shard, ok := state.Shard(cmd.Shard.ID)
			if !ok {
				fsm.log.Warningf("%v: %#v", ErrShardNotFound, cmd)
				break
			}
			state.ReplicaIterateByShardID(shard.ID, func(r Replica) bool {
				state.replicaDelete(r)
				state.hostTouch(r.HostID, entry.Index)
				return true
			})
			state.shardDelete(shard)
			entry.Result.Value = 1
		// Tags
		case command_action_tags_set:
			entry.Result.Value = fsm.shardTagAction(fsm.tagsSet, state, entry, cmd)
		case command_action_tags_setnx:
			entry.Result.Value = fsm.shardTagAction(fsm.tagsSetNX, state, entry, cmd)
		case command_action_tags_remove:
			entry.Result.Value = fsm.shardTagAction(fsm.tagsRemove, state, entry, cmd)
		default:
			fsm.log.Errorf("Unrecognized shard action: %s - %#v", cmd.Action, cmd)
		}
	// Replica
	case commandReplica:
		var cmd = cmd.(commandReplica)
		switch cmd.Action {
		// Post
		case command_action_post:
			cmd.Replica.ID = state.replicaIncr()
			fsm.log.Infof(`command_action_post - %d - %s - %v`, cmd.Replica.ID, cmd.Replica.HostID, cmd.Replica.IsNonVoting)
			cmd.Replica.Created = entry.Index
			cmd.Replica.Updated = entry.Index
			if !cmd.Replica.IsNonVoting && len(state.ShardMembers(cmd.Replica.ShardID)) < 3 {
				cmd.Replica.Status = ReplicaStatus_Bootstrapping
			} else {
				cmd.Replica.Status = ReplicaStatus_Joining
			}
			state.replicaPut(cmd.Replica)
			state.hostTouch(cmd.Replica.HostID, entry.Index)
			state.shardTouch(cmd.Replica.ShardID, entry.Index)
			entry.Result.Value = cmd.Replica.ID
		// Status Update
		case command_action_status_update:
			replica, ok := state.Replica(cmd.Replica.ID)
			if !ok {
				fsm.log.Warningf("%v: %#v %#v", ErrReplicaNotFound, cmd, replica)
				break
			}
			replica.Status = cmd.Replica.Status
			state.replicaPut(replica)
			state.hostTouch(replica.HostID, entry.Index)
			state.shardTouch(replica.ShardID, entry.Index)
			entry.Result.Value = 1
		// Put
		case command_action_put:
			if old, ok := state.Replica(cmd.Replica.ID); ok {
				cmd.Replica.Created = old.Created
				fsm.tagsSetNX(cmd.Replica.Tags, old.Tags)
			} else {
				fsm.log.Errorf("%s: %s - %#v", ErrReplicaNotFound, cmd.Action, cmd)
				break
			}
			cmd.Replica.Updated = entry.Index
			state.replicaPut(cmd.Replica)
			state.hostTouch(cmd.Replica.HostID, entry.Index)
			state.shardTouch(cmd.Replica.ShardID, entry.Index)
			entry.Result.Value = 1
		// Delete
		case command_action_del:
			replica, ok := state.Replica(cmd.Replica.ID)
			if !ok {
				fsm.log.Warningf("%v: %#v", ErrReplicaNotFound, cmd)
				break
			}
			state.hostTouch(replica.HostID, entry.Index)
			state.shardTouch(replica.ShardID, entry.Index)
			state.replicaDelete(replica)
			entry.Result.Value = 1
		// Tags
		case command_action_tags_set:
			entry.Result.Value = fsm.replicaTagAction(fsm.tagsSet, state, entry, cmd)
		case command_action_tags_setnx:
			entry.Result.Value = fsm.replicaTagAction(fsm.tagsSetNX, state, entry, cmd)
		case command_action_tags_remove:
			entry.Result.Value = fsm.replicaTagAction(fsm.tagsRemove, state, entry, cmd)
		default:
			fsm.log.Errorf("Unrecognized replica action: %s - %#v", cmd.Action, cmd)
		}
	}
	state.metaSetIndex(entry.Index)
	return entry.Result, nil
}

// func (fsm *fsm) SaveSnapshot(cursor any, w io.Writer, close <-chan struct{}) (err error) {
func (fsm *fsm) SaveSnapshot(w io.Writer, _ statemachine.ISnapshotFileCollection, _ <-chan struct{}) error {
	return fsm.state.Save(w)
}

// func (fsm *fsm) RecoverFromSnapshot(r io.Reader, close <-chan struct{}) (err error) {
func (fsm *fsm) RecoverFromSnapshot(r io.Reader, _ []statemachine.SnapshotFile, _ <-chan struct{}) error {
	return fsm.state.recover(r)
}

func (fsm *fsm) Lookup(interface{}) (res interface{}, err error) { return }

func (fsm *fsm) Close() (err error) { return }

func (fsm *fsm) hostTagAction(fn func(map[string]string, map[string]string) bool, state *State, entry Entry, cmd commandHost) (val uint64) {
	host, ok := state.Host(cmd.Host.ID)
	if !ok {
		fsm.log.Warningf("%v: %#v", ErrHostNotFound, cmd)
		return
	}
	if fn(host.Tags, cmd.Host.Tags) {
		state.ReplicaIterateByHostID(cmd.Host.ID, func(r Replica) bool {
			state.shardTouch(r.ShardID, entry.Index)
			return true
		})
		state.hostPut(host)
		val = 1
	}
	return
}

func (fsm *fsm) shardTagAction(fn func(map[string]string, map[string]string) bool, state *State, entry Entry, cmd commandShard) (val uint64) {
	shard, ok := state.Shard(cmd.Shard.ID)
	if !ok {
		fsm.log.Warningf("%v: %#v", ErrShardNotFound, cmd)
		return
	}
	if fn(shard.Tags, cmd.Shard.Tags) {
		state.ReplicaIterateByShardID(cmd.Shard.ID, func(r Replica) bool {
			state.hostTouch(r.HostID, entry.Index)
			return true
		})
		state.shardPut(shard)
		val = 1
	}
	return
}

func (fsm *fsm) replicaTagAction(fn func(map[string]string, map[string]string) bool, state *State, entry Entry, cmd commandReplica) (val uint64) {
	replica, ok := state.Replica(cmd.Replica.ID)
	if !ok {
		fsm.log.Warningf("%v: %#v", ErrReplicaNotFound, cmd)
		return
	}
	if fn(replica.Tags, cmd.Replica.Tags) {
		state.hostTouch(replica.HostID, entry.Index)
		state.shardTouch(replica.ShardID, entry.Index)
		state.replicaPut(replica)
		val = 1
	}
	return
}

// tagsSet copies every tag in updates into tags, overwriting existing keys.
func (fsm *fsm) tagsSet(tags, updates map[string]string) (written bool) {
	for k, v := range updates {
		tags[k] = v
		written = true
	}
	return
}

// tagsSetNX copies tags from updates into tags only for keys not already present.
func (fsm *fsm) tagsSetNX(tags, updates map[string]string) (written bool) {
	for k, v := range updates {
		if _, ok := tags[k]; !ok {
			tags[k] = v
			written = true
		}
	}
	return
}

// tagsRemove deletes from tags every key present in updates.
func (fsm *fsm) tagsRemove(tags, updates map[string]string) (written bool) {
	for k := range updates {
		delete(tags, k)
		written = true
	}
	return
}
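// Example (illustrative): commands are proposed as JSON through the prime shard
// and applied here deterministically on every replica. A Result Value of 0
// generally indicates failure (e.g. not found) and a nonzero Value indicates
// success or a newly assigned ID, as in ShardDelete:
//
//	res, err := a.primePropose(newCmdShardDel(shardID))
//	if err == nil && res.Value == 0 {
//		// the delete was rejected by the state machine (e.g. shard not found)
//	}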
package zongzi

import (
	"encoding/json"
	"strings"
)

const (
	command_type_host     = "host"
	command_type_replica  = "replica"
	command_type_shard    = "shard"
	command_type_snapshot = "snapshot"

	command_action_del           = "del"
	command_action_put           = "put"
	command_action_post          = "post"
	command_action_status_update = "status-update"
	command_action_tags_set      = "tags-set"
	command_action_tags_setnx    = "tags-setnx"
	command_action_tags_remove   = "tags-remove"
	command_action_leader_set    = "leader-set"
)

type command struct {
	Action string `json:"action"`
	Type   string `json:"type"`
}

type commandHost struct {
	command
	Host Host `json:"host"`
}

type commandShard struct {
	command
	Shard Shard `json:"shard"`
}

type commandReplica struct {
	command
	Replica Replica `json:"replica"`
}

func newCmdHostPut(nhid, apiAddr, raftAddr string, tagList []string, status HostStatus, shardTypes []string) (b []byte) {
	b, _ = json.Marshal(commandHost{command{
		Action: command_action_put,
		Type:   command_type_host,
	}, Host{
		ApiAddress:  apiAddr,
		ID:          nhid,
		Tags:        tagMapFromList(tagList),
		RaftAddress: raftAddr,
		ShardTypes:  shardTypes,
		Status:      status,
	}})
	return
}

func newCmdHostDel(nhid string) (b []byte) {
	b, _ = json.Marshal(commandHost{command{
		Action: command_action_del,
		Type:   command_type_host,
	}, Host{
		ID: nhid,
	}})
	return
}

func newCmdShardPost(s Shard) (b []byte) {
	b, _ = json.Marshal(commandShard{command{
		Action: command_action_post,
		Type:   command_type_shard,
	}, s})
	return
}

func newCmdShardPut(s Shard) (b []byte) {
	b, _ = json.Marshal(commandShard{command{
		Action: command_action_put,
		Type:   command_type_shard,
	}, s})
	return
}

func newCmdShardStatusUpdate(id uint64, status ShardStatus) (b []byte) {
	b, _ = json.Marshal(commandShard{command{
		Action: command_action_status_update,
		Type:   command_type_shard,
	}, Shard{
		ID:     id,
		Status: status,
	}})
	return
}

func newCmdShardLeaderSet(shardID, replicaID, term uint64) (b []byte) {
	b, _ = json.Marshal(commandShard{command{
		Action: command_action_leader_set,
		Type:   command_type_shard,
	}, Shard{
		ID:     shardID,
		Leader: replicaID,
		Term:   term,
	}})
	return
}

func newCmdShardDel(shardID uint64) (b []byte) {
	b, _ = json.Marshal(commandShard{command{
		Action: command_action_del,
		Type:   command_type_shard,
	}, Shard{
		ID: shardID,
	}})
	return
}

func newCmdReplicaPost(nhid string, shardID uint64, isNonVoting bool) (b []byte) {
	b, _ = json.Marshal(commandReplica{command{
		Action: command_action_post,
		Type:   command_type_replica,
	}, Replica{
		HostID:      nhid,
		IsNonVoting: isNonVoting,
		ShardID:     shardID,
		Status:      ReplicaStatus_New,
	}})
	return
}

func newCmdReplicaPut(nhid string, shardID, replicaID uint64, isNonVoting bool) (b []byte) {
	b, _ = json.Marshal(commandReplica{command{
		Action: command_action_put,
		Type:   command_type_replica,
	}, Replica{
		HostID:      nhid,
		ID:          replicaID,
		ShardID:     shardID,
		IsNonVoting: isNonVoting,
	}})
	return
}

func newCmdReplicaDelete(replicaID uint64) (b []byte) {
	b, _ = json.Marshal(commandReplica{command{
		Action: command_action_del,
		Type:   command_type_replica,
	}, Replica{
		ID: replicaID,
	}})
	return
}

func newCmdReplicaUpdateStatus(replicaID uint64, status ReplicaStatus) (b []byte) {
	b, _ = json.Marshal(commandReplica{command{
		Action: command_action_status_update,
		Type:   command_type_replica,
	}, Replica{
		ID:     replicaID,
		Status: status,
	}})
	return
}

func newCmdTagsSetNX(subject any, tagList ...string) []byte {
	return newCmdTags(command_action_tags_setnx, subject, tagList)
}

func newCmdTagsSet(subject any, tagList ...string) []byte {
	return newCmdTags(command_action_tags_set, subject, tagList)
}
func newCmdTagsRemove(subject any, tagList ...string) []byte {
	return newCmdTags(command_action_tags_remove, subject, tagList)
}

func newCmdTags(action string, subject any, tagList []string) (b []byte) {
	switch subject.(type) {
	case Host:
		b, _ = json.Marshal(commandHost{command{
			Action: action,
			Type:   command_type_host,
		}, Host{
			ID:   subject.(Host).ID,
			Tags: tagMapFromList(tagList),
		}})
	case Shard:
		b, _ = json.Marshal(commandShard{command{
			Action: action,
			Type:   command_type_shard,
		}, Shard{
			ID:   subject.(Shard).ID,
			Tags: tagMapFromList(tagList),
		}})
	case Replica:
		b, _ = json.Marshal(commandReplica{command{
			Action: action,
			Type:   command_type_replica,
		}, Replica{
			ID:   subject.(Replica).ID,
			Tags: tagMapFromList(tagList),
		}})
	}
	return
}

// tagMapFromList converts a list of tags to a map of key (namespace:predicate) and value
//
//	["geo:region=us-west-1"] -> {"geo:region": "us-west-1"}
func tagMapFromList(tagList []string) map[string]string {
	var m = map[string]string{}
	for _, ts := range tagList {
		if i := strings.Index(ts, "="); i >= 0 {
			m[ts[:i]] = ts[i+1:]
		} else {
			m[ts] = ""
		}
	}
	return m
}
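// Example (illustrative): the wire format produced by these constructors. The
// outer keys come from the json tags on command, commandHost, commandShard and
// commandReplica above; the inner field names depend on the json tags declared
// on Host, Shard and Replica elsewhere in this package, so the replica body
// below is only a sketch.
//
//	newCmdReplicaPost("nhid-1", 2, false) // => {"action":"post","type":"replica","replica":{...}}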
package zongzi

import (
	"encoding/json"
	"fmt"
	"io"

	"github.com/hashicorp/go-memdb"
)

// State represents internal state.
// All public methods are read only.
type State struct {
	db  *memdb.MemDB
	txn *memdb.Txn
}

func newFsmStateRadix() *State {
	metaSchema := &memdb.TableSchema{
		Name: "meta",
		Indexes: map[string]*memdb.IndexSchema{
			"id": {
				Name:    "id",
				Unique:  true,
				Indexer: &memdb.StringFieldIndex{Field: "Key"},
			},
		},
	}
	hostSchema := &memdb.TableSchema{
		Name: "host",
		Indexes: map[string]*memdb.IndexSchema{
			"id": {
				Name:    "id",
				Unique:  true,
				Indexer: &memdb.UUIDFieldIndex{Field: "ID"},
			},
			"RaftAddress": {
				Name:         "RaftAddress",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.StringFieldIndex{Field: "RaftAddress"},
			},
			// ShardTypes is used by HostIterateByShardType.
			"ShardTypes": {
				Name:         "ShardTypes",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.StringSliceFieldIndex{Field: "ShardTypes"},
			},
			"Tags": {
				Name:         "Tags",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.StringMapFieldIndex{Field: "Tags"},
			},
			"Updated": {
				Name:         "Updated",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.UintFieldIndex{Field: "Updated"},
			},
		},
	}
	shardSchema := &memdb.TableSchema{
		Name: "shard",
		Indexes: map[string]*memdb.IndexSchema{
			"id": {
				Name:    "id",
				Unique:  true,
				Indexer: &memdb.UintFieldIndex{Field: "ID"},
			},
			"Name": {
				Name:         "Name",
				Unique:       true,
				AllowMissing: true,
				Indexer:      &memdb.StringFieldIndex{Field: "Name"},
			},
			"Tags": {
				Name:         "Tags",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.StringMapFieldIndex{Field: "Tags"},
			},
			"Updated": {
				Name:         "Updated",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.UintFieldIndex{Field: "Updated"},
			},
		},
	}
	replicaSchema := &memdb.TableSchema{
		Name: "replica",
		Indexes: map[string]*memdb.IndexSchema{
			"id": {
				Name:    "id",
				Unique:  true,
				Indexer: &memdb.UintFieldIndex{Field: "ID"},
			},
			"HostID": {
				Name:    "HostID",
				Unique:  false,
				Indexer: &memdb.UUIDFieldIndex{Field: "HostID"},
			},
			"ShardID": {
				Name:    "ShardID",
				Unique:  false,
				Indexer: &memdb.UintFieldIndex{Field: "ShardID"},
			},
			"Tags": {
				Name:         "Tags",
				Unique:       false,
				AllowMissing: true,
				Indexer:      &memdb.StringMapFieldIndex{Field: "Tags"},
			},
		},
	}
	db, err := memdb.NewMemDB(&memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"meta":    metaSchema,
			"host":    hostSchema,
			"shard":   shardSchema,
			"replica": replicaSchema,
		},
	})
	if err != nil {
		panic(err)
	}
	return &State{
		db: db,
	}
}

type metaValue struct {
	Key  string
	Val  uint64
	Data []byte
}

func (fsm *State) withTxn(write bool) *State {
	return &State{fsm.db, fsm.db.Txn(write)}
}

func (fsm *State) commit() {
	fsm.txn.Commit()
}

func (fsm *State) rollback() {
	fsm.txn.Abort()
}

func (fsm *State) metaSet(key string, val uint64, data []byte) {
	err := fsm.txn.Insert(`meta`, metaValue{key, val, data})
	if err != nil {
		panic(err)
	}
}

func (fsm *State) metaSetIndex(val uint64) {
	err := fsm.txn.Insert(`meta`, metaValue{`index`, val, nil})
	if err != nil {
		panic(err)
	}
}

func (fsm *State) metaGet(key string) (val uint64, data []byte) {
	res, _ := fsm.txn.First(`meta`, `id`, key)
	if res != nil {
		val = res.(metaValue).Val
		data = res.(metaValue).Data
	}
	return
}

func (fsm *State) metaGetVal(key string) (val uint64) {
	val, _ = fsm.metaGet(key)
	return
}

func (fsm *State) metaGetDataString(key string) string {
	_, b := fsm.metaGet(key)
	return string(b)
}

func (fsm *State) Index() (val uint64) {
	return fsm.metaGetVal(`index`)
}

func (fsm *State) setHostID(id string) {
	fsm.metaSet(`hostID`, 0, []byte(id))
}

// HostID returns the ID of the last created Host
func (fsm *State) HostID() string {
	return fsm.metaGetDataString(`hostID`)
}

func (fsm *State) shardIncr() uint64 {
	id := fsm.metaGetVal(`shardID`)
	id++
	fsm.metaSet(`shardID`, id, nil)
	return id
}
// ShardID returns the ID of the last created Shard
func (fsm *State) ShardID() uint64 {
	return fsm.metaGetVal(`shardID`)
}

func (fsm *State) replicaIncr() uint64 {
	id := fsm.metaGetVal(`replicaID`)
	id++
	fsm.metaSet(`replicaID`, id, nil)
	return id
}

// ReplicaID returns the ID of the last created Replica
func (fsm *State) ReplicaID() uint64 {
	return fsm.metaGetVal(`replicaID`)
}

// Host returns the host with the specified ID or ok false if not found.
func (fsm *State) Host(id string) (h Host, ok bool) {
	res, err := fsm.txn.First(`host`, `id`, id)
	if err != nil {
		panic(err)
	}
	if res != nil {
		h = res.(Host)
		ok = true
	}
	return
}

// HostIterate executes a callback for every host in the cluster.
// Return true to continue iterating, false to stop.
//
//	var hostCount int
//	agent.State(ctx, func(s *zongzi.State) {
//		s.HostIterate(func(h Host) bool {
//			hostCount++
//			return true
//		})
//	})
func (fsm *State) HostIterate(fn func(h Host) bool) {
	iter, err := fsm.txn.Get(`host`, `id_prefix`, "")
	if err != nil {
		panic(err)
	}
	for {
		res := iter.Next()
		if res == nil || !fn(res.(Host)) {
			break
		}
	}
}

// HostIterateByShardType executes a callback for every host in the cluster supporting the provided shard type,
// ordered by host id ascending. Return true to continue iterating, false to stop.
func (fsm *State) HostIterateByShardType(shardType string, fn func(h Host) bool) {
	iter, err := fsm.txn.Get(`host`, `ShardTypes`, shardType)
	if err != nil {
		panic(err)
	}
	for {
		res := iter.Next()
		if res == nil || !fn(res.(Host)) {
			break
		}
	}
}

// HostIterateByTag executes a callback for every host in the cluster matching the specified tag,
// ordered by host id ascending. Return true to continue iterating, false to stop.
func (fsm *State) HostIterateByTag(tag string, fn func(h Host) bool) {
	iter, err := fsm.txn.Get(`host`, `Tags`, tag)
	if err != nil {
		panic(err)
	}
	for {
		res := iter.Next()
		if res == nil || !fn(res.(Host)) {
			break
		}
	}
}

func (fsm *State) hostPut(h Host) {
	err := fsm.txn.Insert(`host`, h)
	if err != nil {
		panic(err)
	}
}

func (fsm *State) hostDelete(h Host) {
	err := fsm.txn.Delete(`host`, h)
	if err != nil && err != memdb.ErrNotFound {
		panic(err)
	}
}

func (fsm *State) hostTouch(id string, index uint64) {
	h, ok := fsm.Host(id)
	if !ok {
		fsm.HostIterate(func(h Host) bool {
			fmt.Println(h.ID, h.Updated)
			return true
		})
		panic(`Host not found "` + id + fmt.Sprintf(`" %#v`, h))
	}
	h.Updated = index
	fsm.hostPut(h)
}

func (fsm *State) hostByRaftAddress(raftAddress string) (h Host, ok bool) {
	res, err := fsm.txn.First(`host`, `RaftAddress`, raftAddress)
	if err != nil {
		panic(err)
	}
	if res != nil {
		h = res.(Host)
		ok = true
	}
	return
}

// Shard returns the shard with the specified id or ok false if not found.
func (fsm *State) Shard(id uint64) (s Shard, ok bool) {
	res, err := fsm.txn.First(`shard`, `id`, id)
	if err != nil {
		panic(err)
	}
	if res != nil {
		s = res.(Shard)
		ok = true
	}
	return
}
// ShardFindByName returns the shard with the specified name or ok false if not found.
func (fsm *State) ShardFindByName(name string) (s Shard, ok bool) {
	res, err := fsm.txn.First(`shard`, `Name`, name)
	if err != nil {
		panic(err)
	}
	if res != nil {
		s = res.(Shard)
		ok = true
	}
	return
}

func (fsm *State) shardPut(s Shard) {
	err := fsm.txn.Insert(`shard`, s)
	if err != nil {
		panic(err)
	}
}

func (fsm *State) shardDelete(s Shard) {
	err := fsm.txn.Delete(`shard`, s)
	if err != nil && err != memdb.ErrNotFound {
		panic(err)
	}
}

func (fsm *State) shardTouch(id, index uint64) {
	if s, ok := fsm.Shard(id); ok {
		s.Updated = index
		fsm.shardPut(s)
	}
}

// ShardIterate executes a callback for every shard in the cluster ordered by shard id ascending.
// Return true to continue iterating, false to stop.
func (fsm *State) ShardIterate(fn func(s Shard) bool) {
	iter, err := fsm.txn.LowerBound(`shard`, `id`, uint64(0))
	if err != nil {
		panic(err)
	}
	for {
		res := iter.Next()
		if res == nil || !fn(res.(Shard)) {
			break
		}
	}
}

// ShardIterateByTag executes a callback for every shard in the cluster matching the specified tag,
// ordered by shard id ascending. Return true to continue iterating, false to stop.
func (fsm *State) ShardIterateByTag(tag string, fn func(r Shard) bool) {
	iter, err := fsm.txn.Get(`shard`, `Tags`, tag)
	if err != nil {
		panic(err)
	}
	for {
		res := iter.Next()
		if res == nil || !fn(res.(Shard)) {
			break
		}
	}
}

// ShardIterateUpdatedAfter executes a callback for every shard in the cluster having an updated index
// greater than the supplied index.
func (fsm *State) ShardIterateUpdatedAfter(index uint64, fn func(r Shard) bool) {
	iter, err := fsm.txn.LowerBound(`shard`, `Updated`, index+1)
	if err != nil {
		panic(err)
	}
	for {
		res := iter.Next()
		if res == nil || !fn(res.(Shard)) {
			break
		}
	}
}

// ShardMembers returns a map of shard members (replicaID: hostID)
func (fsm *State) ShardMembers(id uint64) map[uint64]string {
	members := map[uint64]string{}
	fsm.ReplicaIterateByShardID(id, func(r Replica) bool {
		if !r.IsNonVoting {
			members[r.ID] = r.HostID
		}
		return true
	})
	return members
}

// Replica returns the replica with the specified id or ok false if not found.
func (fsm *State) Replica(id uint64) (r Replica, ok bool) {
	res, err := fsm.txn.First(`replica`, `id`, id)
	if err != nil {
		panic(err)
	}
	if res != nil {
		r = res.(Replica)
		ok = true
	}
	return
}

func (fsm *State) replicaPut(r Replica) {
	err := fsm.txn.Insert(`replica`, r)
	if err != nil {
		panic(err)
	}
}

func (fsm *State) replicaDelete(r Replica) {
	err := fsm.txn.Delete(`replica`, r)
	if err != nil && err != memdb.ErrNotFound {
		panic(err)
	}
}

func (fsm *State) replicaTouch(id, index uint64) {
	if r, ok := fsm.Replica(id); ok {
		r.Updated = index
		fsm.replicaPut(r)
	}
}

// ReplicaIterate executes a callback for every replica in the cluster ordered by replica id ascending.
// Return true to continue iterating, false to stop.
func (fsm *State) ReplicaIterate(fn func(r Replica) bool) {
	iter, err := fsm.txn.LowerBound(`replica`, `id`, uint64(0))
	if err != nil {
		panic(err)
	}
	for {
		res := iter.Next()
		if res == nil || !fn(res.(Replica)) {
			break
		}
	}
}
// ReplicaIterateByShardID executes a callback for every replica in the cluster belonging to the provided shard id,
// ordered by replica id ascending. Return true to continue iterating, false to stop.
func (fsm *State) ReplicaIterateByShardID(shardID uint64, fn func(r Replica) bool) {
	iter, err := fsm.txn.Get(`replica`, `ShardID`, shardID)
	if err != nil {
		panic(err)
	}
	for {
		res := iter.Next()
		if res == nil || !fn(res.(Replica)) {
			break
		}
	}
}

// ReplicaIterateByHostID executes a callback for every replica in the cluster belonging to the provided host id,
// ordered by replica id ascending. Return true to continue iterating, false to stop.
func (fsm *State) ReplicaIterateByHostID(hostID string, fn func(r Replica) bool) {
	iter, err := fsm.txn.Get(`replica`, `HostID`, hostID)
	if err != nil {
		panic(err)
	}
	for {
		res := iter.Next()
		if res == nil || !fn(res.(Replica)) {
			break
		}
	}
}

// ReplicaIterateByTag executes a callback for every replica in the cluster matching the specified tag,
// ordered by replica id ascending. Return true to continue iterating, false to stop.
func (fsm *State) ReplicaIterateByTag(tag string, fn func(r Replica) bool) {
	iter, err := fsm.txn.Get(`replica`, `Tags`, tag)
	if err != nil {
		panic(err)
	}
	for {
		res := iter.Next()
		if res == nil || !fn(res.(Replica)) {
			break
		}
	}
}

type fsmStateMetaHeader struct {
	Index     uint64 `json:"index"`
	HostID    string `json:"hostID"`
	Hosts     uint64 `json:"hosts"`
	ReplicaID uint64 `json:"replicaID"`
	Replicas  uint64 `json:"replicas"`
	ShardID   uint64 `json:"shardID"`
	Shards    uint64 `json:"shards"`
}

func (fsm *State) Save(w io.Writer) error {
	f := fsm.withTxn(false)
	header := fsmStateMetaHeader{
		Index:     f.metaGetVal(`index`),
		HostID:    f.metaGetDataString(`hostID`),
		ShardID:   f.metaGetVal(`shardID`),
		ReplicaID: f.metaGetVal(`replicaID`),
	}
	f.HostIterate(func(Host) bool {
		header.Hosts++
		return true
	})
	f.ShardIterate(func(Shard) bool {
		header.Shards++
		return true
	})
	f.ReplicaIterate(func(Replica) bool {
		header.Replicas++
		return true
	})
	b, _ := json.Marshal(header)
	w.Write(append(b, '\n'))
	f.HostIterate(func(h Host) bool {
		b, _ := json.Marshal(h)
		w.Write(append(b, '\n'))
		return true
	})
	f.ShardIterate(func(s Shard) bool {
		b, _ := json.Marshal(s)
		w.Write(append(b, '\n'))
		return true
	})
	f.ReplicaIterate(func(r Replica) bool {
		b, _ := json.Marshal(r)
		w.Write(append(b, '\n'))
		return true
	})
	return nil
}

func (fsm *State) recover(r io.Reader) (err error) {
	f := fsm.withTxn(true)
	defer f.commit()
	decoder := json.NewDecoder(r)
	var header fsmStateMetaHeader
	if err := decoder.Decode(&header); err != nil {
		return err
	}
	f.metaSet(`index`, header.Index, nil)
	f.metaSet(`hostID`, 0, []byte(header.HostID))
	f.metaSet(`shardID`, header.ShardID, nil)
	f.metaSet(`replicaID`, header.ReplicaID, nil)
	var i uint64
	for i = 0; i < header.Hosts; i++ {
		var h Host
		if err = decoder.Decode(&h); err != nil {
			return fmt.Errorf("parse host: %w", err)
		}
		f.hostPut(h)
	}
	for i = 0; i < header.Shards; i++ {
		var s Shard
		if err = decoder.Decode(&s); err != nil {
			return fmt.Errorf("parse shard: %w", err)
		}
		f.shardPut(s)
	}
	for i = 0; i < header.Replicas; i++ {
		var r Replica
		if err = decoder.Decode(&r); err != nil {
			return fmt.Errorf("parse replica: %w", err)
		}
		f.replicaPut(r)
	}
	return nil
}
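// Example (illustrative): the snapshot stream written by Save and consumed by
// recover is line-delimited JSON: one header describing counts, followed by one
// object per host, then per shard, then per replica. The header fields follow
// the json tags on fsmStateMetaHeader; values below are hypothetical.
//
//	{"index":42,"hostID":"...","hosts":3,"replicaID":3,"replicas":3,"shardID":1,"shards":1}
//	{...host...}
//	{...host...}
//	{...host...}
//	{...shard...}
//	{...replica...}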
package zongzi

import (
	"context"

	"google.golang.org/grpc"

	"github.com/logbn/zongzi/internal"
)

// grpcClientErr is a null-object implementation of internal.InternalClient that returns the same error from every
// method. The client pool returns it when a connection cannot be established so callers can handle dial errors
// exactly like RPC errors.
type grpcClientErr struct {
	err error
}

func (c *grpcClientErr) Ping(ctx context.Context, in *internal.PingRequest, opts ...grpc.CallOption) (*internal.PingResponse, error) {
	return nil, c.err
}

func (c *grpcClientErr) Probe(ctx context.Context, in *internal.ProbeRequest, opts ...grpc.CallOption) (*internal.ProbeResponse, error) {
	return nil, c.err
}

func (c *grpcClientErr) Info(ctx context.Context, in *internal.InfoRequest, opts ...grpc.CallOption) (*internal.InfoResponse, error) {
	return nil, c.err
}

func (c *grpcClientErr) Members(ctx context.Context, in *internal.MembersRequest, opts ...grpc.CallOption) (*internal.MembersResponse, error) {
	return nil, c.err
}

func (c *grpcClientErr) Join(ctx context.Context, in *internal.JoinRequest, opts ...grpc.CallOption) (*internal.JoinResponse, error) {
	return nil, c.err
}

func (c *grpcClientErr) Add(ctx context.Context, in *internal.AddRequest, opts ...grpc.CallOption) (*internal.AddResponse, error) {
	return nil, c.err
}

func (c *grpcClientErr) Index(ctx context.Context, in *internal.IndexRequest, opts ...grpc.CallOption) (*internal.IndexResponse, error) {
	return nil, c.err
}

func (c *grpcClientErr) Commit(ctx context.Context, in *internal.CommitRequest, opts ...grpc.CallOption) (*internal.CommitResponse, error) {
	return nil, c.err
}

func (c *grpcClientErr) Apply(ctx context.Context, in *internal.ApplyRequest, opts ...grpc.CallOption) (*internal.ApplyResponse, error) {
	return nil, c.err
}

func (c *grpcClientErr) Read(ctx context.Context, in *internal.ReadRequest, opts ...grpc.CallOption) (*internal.ReadResponse, error) {
	return nil, c.err
}

func (c *grpcClientErr) Watch(ctx context.Context, in *internal.WatchRequest, opts ...grpc.CallOption) (internal.Internal_WatchClient, error) {
	return nil, c.err
}
package zongzi

import (
	"github.com/hashicorp/golang-lru/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/logbn/zongzi/internal"
)

// grpcClientPool maintains an LRU cache of gRPC client connections keyed by address.
// Evicted entries close their underlying connection.
type grpcClientPool struct {
	clients  *lru.Cache[string, grpcClientPoolEntry]
	dialOpts []grpc.DialOption
}

type grpcClientPoolEntry struct {
	client internal.InternalClient
	conn   *grpc.ClientConn
}

func grpcClientPoolEvictFunc(addr string, e grpcClientPoolEntry) {
	e.conn.Close()
}

func newGrpcClientPool(size int, dialOpts ...grpc.DialOption) *grpcClientPool {
	clients, _ := lru.NewWithEvict[string, grpcClientPoolEntry](size, grpcClientPoolEvictFunc)
	return &grpcClientPool{
		clients:  clients,
		dialOpts: dialOpts,
	}
}

func (c *grpcClientPool) get(addr string) (client internal.InternalClient) {
	e, ok := c.clients.Get(addr)
	if ok {
		return e.client
	}
	// https://github.com/grpc/grpc-go/tree/master/examples/features/authentication
	// opts = append(opts, grpc.WithPerRPCCredentials(perRPC))
	conn, err := grpc.Dial(addr, append([]grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}, c.dialOpts...)...)
	if err != nil {
		return &grpcClientErr{err}
	}
	client = internal.NewInternalClient(conn)
	c.clients.Add(addr, grpcClientPoolEntry{client, conn})
	return
}

func (c *grpcClientPool) remove(addr string) bool {
	return c.clients.Remove(addr)
}

func (c *grpcClientPool) Close() {
	c.clients.Purge()
}
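// Example (illustrative sketch): retrieving a pooled client and issuing a
// request. get never returns nil; on dial failure it returns a grpcClientErr
// whose methods surface the dial error.
//
//	client := pool.get("10.0.0.2:17001")
//	res, err := client.Ping(ctx, &internal.PingRequest{})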
package zongzi

import (
	"context"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"

	"github.com/logbn/zongzi/internal"
)

type grpcServer struct {
	internal.UnimplementedInternalServer

	agent      *Agent
	server     *grpc.Server
	listenAddr string
	serverOpts []grpc.ServerOption
}

func newGrpcServer(listenAddr string, opts ...grpc.ServerOption) *grpcServer {
	return &grpcServer{
		listenAddr: listenAddr,
		serverOpts: opts,
	}
}

func (s *grpcServer) Ping(ctx context.Context, req *internal.PingRequest,
) (resp *internal.PingResponse, err error) {
	return &internal.PingResponse{}, nil
}

func (s *grpcServer) Probe(ctx context.Context, req *internal.ProbeRequest,
) (resp *internal.ProbeResponse, err error) {
	// s.agent.log.Debugf(`gRPC Req Probe: %#v`, req)
	return &internal.ProbeResponse{
		GossipAdvertiseAddress: s.agent.hostConfig.Gossip.AdvertiseAddress,
	}, nil
}

func (s *grpcServer) Info(ctx context.Context, req *internal.InfoRequest,
) (resp *internal.InfoResponse, err error) {
	// s.agent.log.Debugf(`gRPC Req Info: %#v`, req)
	return &internal.InfoResponse{
		HostId:    s.agent.hostID(),
		ReplicaId: s.agent.replicaConfig.ReplicaID,
	}, nil
}

func (s *grpcServer) Members(ctx context.Context, req *internal.MembersRequest,
) (resp *internal.MembersResponse, err error) {
	// s.agent.log.Debugf(`gRPC Req Members: %#v`, req)
	return &internal.MembersResponse{
		Members: s.agent.members,
	}, nil
}

func (s *grpcServer) Join(ctx context.Context, req *internal.JoinRequest,
) (resp *internal.JoinResponse, err error) {
	// s.agent.log.Debugf(`gRPC Req Join: %#v`, req)
	resp = &internal.JoinResponse{}
	if s.agent.Status() != AgentStatus_Ready {
		err = ErrAgentNotReady
		return
	}
	resp.Value, err = s.agent.joinPrimeReplica(req.HostId, s.agent.replicaConfig.ShardID, req.IsNonVoting)
	return
}

func (s *grpcServer) Add(ctx context.Context, req *internal.AddRequest,
) (resp *internal.AddResponse, err error) {
	// s.agent.log.Debugf(`gRPC Req Join: %#v`, req)
	resp = &internal.AddResponse{}
	if s.agent.Status() != AgentStatus_Ready {
		err = ErrAgentNotReady
		return
	}
	resp.Value, err = s.agent.joinShardReplica(req.HostId, req.ShardId, req.ReplicaId, req.IsNonVoting)
	return
}

var emptyCommitResponse = &internal.CommitResponse{}

// Commit proposes a command and returns once the entry is committed (but not necessarily applied).
// Requires hostConfig.NotifyCommit to be enabled.
func (s *grpcServer) Commit(ctx context.Context, req *internal.CommitRequest,
) (resp *internal.CommitResponse, err error) {
	// s.agent.log.Debugf(`gRPC Req Propose: %#v`, req)
	if !s.agent.hostConfig.NotifyCommit {
		s.agent.log.Warningf(`%v`, ErrNotifyCommitDisabled)
	}
	if s.agent.Status() != AgentStatus_Ready {
		err = ErrAgentNotReady
		return
	}
	rs, err := s.agent.host.Propose(s.agent.host.GetNoOPSession(req.ShardId), req.Data, raftTimeout)
	if err != nil {
		return
	}
	defer rs.Release()
	for {
		select {
		case r := <-rs.ResultC():
			if r.Committed() {
				resp = emptyCommitResponse
			} else if r.Aborted() {
				err = ErrAborted
			} else if r.Dropped() {
				err = ErrShardNotReady
			} else if r.Rejected() {
				err = ErrRejected
			} else if r.Terminated() {
				err = ErrShardClosed
			} else if r.Timeout() {
				err = ErrTimeout
			}
		case <-ctx.Done():
			if ctx.Err() == context.Canceled {
				err = ErrCanceled
			} else if ctx.Err() == context.DeadlineExceeded {
				err = ErrTimeout
			}
		}
		if err != nil || resp != nil {
			break
		}
	}
	return
}
defer rs.Release() for { select { case r := <-rs.ResultC(): if r.Completed() { resp = &internal.ApplyResponse{ Value: r.GetResult().Value, Data: r.GetResult().Data, } // Result cannot be released because ApplyResponse may not be serialized // This occurs as an optimization in hostClient for requests that do not require forwarding // ReleaseResult(r.GetResult()) } else if r.Aborted() { err = ErrAborted } else if r.Dropped() { err = ErrShardNotReady } else if r.Rejected() { err = ErrRejected } else if r.Terminated() { err = ErrShardClosed } else if r.Timeout() { err = ErrTimeout } case <-ctx.Done(): if ctx.Err() == context.Canceled { err = ErrCanceled } else if ctx.Err() == context.DeadlineExceeded { err = ErrTimeout } } if err != nil || resp != nil { break } } return } var emptyIndexResponse = &internal.IndexResponse{} func (s *grpcServer) Index(ctx context.Context, req *internal.IndexRequest, ) (resp *internal.IndexResponse, err error) { // s.agent.log.Debugf(`gRPC Req Query: %#v`, req) resp = emptyIndexResponse err = s.agent.index(ctx, req.ShardId) return } func (s *grpcServer) Read(ctx context.Context, req *internal.ReadRequest, ) (resp *internal.ReadResponse, err error) { // s.agent.log.Debugf(`gRPC Req Query: %#v`, req) resp = &internal.ReadResponse{} query := getLookupQuery() query.ctx = ctx query.data = req.Data defer query.Release() var r any if req.Stale { r, err = s.agent.host.StaleRead(req.ShardId, query) } else { ctx, cancel := context.WithTimeout(ctx, raftTimeout) defer cancel() r, err = s.agent.host.SyncRead(ctx, req.ShardId, query) } if result, ok := r.(*Result); ok && result != nil { resp.Value = result.Value resp.Data = result.Data // Result cannot be released because ReadResponse may not be serialized // This occurs as an optimization in hostClient for requests that do not require forwarding // ReleaseResult(result) } return } func (s *grpcServer) Watch(req *internal.WatchRequest, srv internal.Internal_WatchServer) (err error) { // s.agent.log.Debugf(`gRPC Req Query: %#v`, req) query := getWatchQuery() query.ctx = srv.Context() query.data = req.Data query.result = make(chan *Result) defer query.Release() var done = make(chan bool) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() for { select { case result := <-query.result: err := srv.Send(&internal.WatchResponse{ Value: result.Value, Data: result.Data, }) if err != nil { s.agent.log.Errorf(`Error sending watch response: %s`, err.Error()) } // Result cannot be released because WatchResponse may not be serialized // This occurs as an optimization in hostClient for requests that do not require forwarding // ReleaseResult(result) case <-done: return } } }() if req.Stale { _, err = s.agent.host.StaleRead(req.ShardId, query) } else { _, err = s.agent.host.SyncRead(srv.Context(), req.ShardId, query) } close(done) wg.Wait() return } func (s *grpcServer) Start(a *Agent) error { // a.log.Errorf("Starting gRPC server on %s", s.listenAddr) s.agent = a lis, err := net.Listen("tcp", s.listenAddr) if err != nil { return err } // https://github.com/grpc/grpc-go/tree/master/examples/features/authentication // opts = append(opts, grpc.UnaryInterceptor(ensureValidToken)) s.server = grpc.NewServer(s.serverOpts...) 
internal.RegisterInternalServer(s.server, s) var done = make(chan bool) go func() { err = s.server.Serve(lis) close(done) }() select { case <-a.ctx.Done(): case <-done: } return err } func (s *grpcServer) Stop() { if s.server != nil { var ch = make(chan bool) go func() { s.server.GracefulStop() close(ch) }() select { case <-ch: case <-time.After(5 * time.Second): s.server.Stop() } } }
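// Configuration sketch (editor's illustration, not part of the zongzi source):
// the authentication link referenced in Start suggests wiring auth in through
// grpc.ServerOption. exampleAuthInterceptor is a hypothetical stand-in that
// forwards every call; a real implementation would validate credentials from
// the incoming context before invoking handler.
package zongzi

import (
	"context"

	"google.golang.org/grpc"
)

func exampleAuthInterceptor(ctx context.Context, req interface{},
	info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// Hypothetical: inspect metadata.FromIncomingContext(ctx) here and return
	// status.Error(codes.Unauthenticated, ...) on failure.
	return handler(ctx, req)
}

func exampleSecuredServer() *grpcServer {
	// serverOpts pass straight through to grpc.NewServer inside Start.
	return newGrpcServer("0.0.0.0:17001", grpc.UnaryInterceptor(exampleAuthInterceptor))
}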
package zongzi import ( "context" "time" "github.com/benbjohnson/clock" "google.golang.org/grpc" "github.com/logbn/zongzi/internal" ) type hostClient struct { agent *Agent clock clock.Clock host Host } func newhostClient(a *Agent, host Host) hostClient { return hostClient{ agent: a, clock: clock.New(), host: host, } } func (c *hostClient) Ping(ctx context.Context) (t time.Duration, err error) { if c.host.ID == c.agent.hostID() { return } start := c.clock.Now() _, err = c.agent.grpcClientPool.get(c.host.ApiAddress).Ping(ctx, &internal.PingRequest{}) t = c.clock.Since(start) return } func (c *hostClient) ReadIndex(ctx context.Context, shardID uint64) (err error) { if c.host.ID == c.agent.hostID() { return } _, err = c.agent.grpcClientPool.get(c.host.ApiAddress).Index(ctx, &internal.IndexRequest{ShardId: shardID}) return } func (c *hostClient) Apply(ctx context.Context, shardID uint64, cmd []byte) (value uint64, data []byte, err error) { var res *internal.ApplyResponse if c.host.ID == c.agent.hostID() { c.agent.log.Debugf(`gRPC hostClient Apply Local: %s`, string(cmd)) res, err = c.agent.grpcServer.Apply(ctx, &internal.ApplyRequest{ ShardId: shardID, Data: cmd, }) } else { c.agent.log.Debugf(`gRPC hostClient Apply Remote: %s`, string(cmd)) res, err = c.agent.grpcClientPool.get(c.host.ApiAddress).Apply(ctx, &internal.ApplyRequest{ ShardId: shardID, Data: cmd, }) } if err != nil { return } value = res.Value data = res.Data return } func (c *hostClient) Commit(ctx context.Context, shardID uint64, cmd []byte) (err error) { if c.host.ID == c.agent.hostID() { c.agent.log.Debugf(`gRPC hostClient Commit Local: %s`, string(cmd)) _, err = c.agent.grpcServer.Commit(ctx, &internal.CommitRequest{ ShardId: shardID, Data: cmd, }) } else { c.agent.log.Debugf(`gRPC hostClient Commit Remote: %s`, string(cmd)) _, err = c.agent.grpcClientPool.get(c.host.ApiAddress).Commit(ctx, &internal.CommitRequest{ ShardId: shardID, Data: cmd, }) } if err != nil { return } return } func (c *hostClient) Read(ctx context.Context, shardID uint64, query []byte, stale bool) (value uint64, data []byte, err error) { var res *internal.ReadResponse if c.host.ID == c.agent.hostID() { res, err = c.agent.grpcServer.Read(ctx, &internal.ReadRequest{ ShardId: shardID, Stale: stale, Data: query, }) } else { res, err = c.agent.grpcClientPool.get(c.host.ApiAddress).Read(ctx, &internal.ReadRequest{ ShardId: shardID, Stale: stale, Data: query, }) } if err != nil { return } value = res.Value data = res.Data return } type watchServer struct { grpc.ServerStream ctx context.Context results chan<- *Result } func newWatchServer(ctx context.Context, results chan<- *Result) *watchServer { return &watchServer{ ctx: ctx, results: results, } } func (s *watchServer) Context() context.Context { return s.ctx } func (s *watchServer) Send(res *internal.WatchResponse) error { s.results <- &Result{ Value: res.Value, Data: res.Data, } return nil } func (c *hostClient) Watch(ctx context.Context, shardID uint64, query []byte, results chan<- *Result, stale bool) (err error) { var client internal.Internal_WatchClient if c.host.ID == c.agent.hostID() { err = c.agent.grpcServer.Watch(&internal.WatchRequest{ ShardId: shardID, Stale: stale, Data: query, }, newWatchServer(ctx, results)) } else { client, err = c.agent.grpcClientPool.get(c.host.ApiAddress).Watch(ctx, &internal.WatchRequest{ ShardId: shardID, Stale: stale, Data: query, }) for { res, err := client.Recv() if err != nil { break } results <- &Result{ Value: res.Value, Data: res.Data, } } } if err != nil { return 
} return }
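// Usage sketch (editor's illustration, not part of the zongzi source): consumes
// a Watch stream through the results channel. The query bytes are placeholders
// and exampleWatchKeys is hypothetical, placed in package zongzi to reach the
// unexported hostClient.
package zongzi

import "context"

func exampleWatchKeys(ctx context.Context, c hostClient, shardID uint64) error {
	results := make(chan *Result)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for r := range results {
			_ = r // each streamed Result arrives here as it is produced
		}
	}()
	// Watch blocks until the stream ends; stale=true permits reads from any replica.
	err := c.Watch(ctx, shardID, []byte(`example-query`), results, true)
	close(results) // Watch has returned, so no further sends can occur
	<-done
	return err
}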
package zongzi import ( "context" "fmt" "sync" "time" "github.com/logbn/zongzi/internal" ) // The hostController starts and stops replicas based on replica config. type hostController struct { agent *Agent ctx context.Context ctxCancel context.CancelFunc mutex sync.RWMutex index uint64 done chan bool } func newHostController(a *Agent) *hostController { return &hostController{ agent: a, } } func (c *hostController) Start() (err error) { c.mutex.Lock() c.ctx, c.ctxCancel = context.WithCancel(context.Background()) c.done = make(chan bool) go func() { t := time.NewTicker(waitPeriod) defer t.Stop() for { select { case <-t.C: err = c.tick() case <-c.ctx.Done(): close(c.done) return } if err != nil { c.agent.log.Errorf("Host Controller: %v", err) } } }() c.mutex.Unlock() return c.tick() } func (c *hostController) tick() (err error) { c.mutex.Lock() defer c.mutex.Unlock() var index uint64 var hadErr bool var shard Shard c.agent.State(c.ctx, func(state *State) { host, ok := state.Host(c.agent.hostID()) if !ok { hadErr = true c.agent.log.Warningf("Host not found %s", c.agent.hostID()) return } index = host.Updated if index <= c.index { return } // These are all the replicas that SHOULD exist on the host var found = map[uint64]bool{} state.ReplicaIterateByHostID(c.agent.hostID(), func(r Replica) bool { if r.ShardID > 0 { found[r.ID] = false } return true }) // These are all the replicas that DO exist on the host hostInfo := c.agent.host.GetNodeHostInfo(nodeHostInfoOption{}) for _, info := range hostInfo.ShardInfoList { if replica, ok := state.Replica(info.ReplicaID); ok { found[replica.ID] = true if replica.Status == ReplicaStatus_Closed { // Remove replica } if info.IsNonVoting && !replica.IsNonVoting { // Promote to Voting } if !info.IsNonVoting && replica.IsNonVoting { // Demote to NonVoting } } else if replica.ShardID > 0 { // Remove raftNode } } // This creates all the missing replicas for id, ok := range found { if ok { continue } replica, ok := state.Replica(id) if !ok { hadErr = true c.agent.log.Warningf("Replica not found") continue } shard, ok = state.Shard(replica.ShardID) if !ok { hadErr = true c.agent.log.Warningf("Shard not found") continue } item, ok := c.agent.shardTypes[shard.Type] if !ok { c.agent.log.Warningf("Shard name not found in registry: %s", shard.Type) continue } // err := c.add(shard, replica) members := state.ShardMembers(shard.ID) item.Config.ShardID = shard.ID item.Config.ReplicaID = replica.ID item.Config.IsNonVoting = replica.IsNonVoting c.agent.log.Infof("[%05d:%05d] Host Controller: Starting replica: %s", shard.ID, replica.ID, shard.Type) if item.StateMachineFactory != nil { shim := stateMachineFactoryShim(item.StateMachineFactory) switch replica.Status { case ReplicaStatus_Bootstrapping: if len(members) < 3 { err = fmt.Errorf("Not enough members") break } err = c.agent.host.StartConcurrentReplica(members, false, shim, item.Config) case ReplicaStatus_Joining: res := c.requestShardJoin(members, shard.ID, replica.ID, replica.IsNonVoting) if res == 0 { err = fmt.Errorf(`[%05d:%05d] Unable to join shard`, shard.ID, replica.ID) break } err = c.agent.host.StartConcurrentReplica(nil, true, shim, item.Config) case ReplicaStatus_Active: err = c.agent.host.StartConcurrentReplica(nil, false, shim, item.Config) } } else { shim := stateMachinePersistentFactoryShim(item.StateMachinePersistentFactory) switch replica.Status { case ReplicaStatus_Bootstrapping: if len(members) < 3 { err = fmt.Errorf("Not enough members") break } err = c.agent.host.StartOnDiskReplica(members, false, 
shim, item.Config) case ReplicaStatus_Joining: res := c.requestShardJoin(members, shard.ID, replica.ID, replica.IsNonVoting) if res == 0 { err = fmt.Errorf(`[%05d:%05d] Unable to join persistent shard`, shard.ID, replica.ID) break } err = c.agent.host.StartOnDiskReplica(nil, true, shim, item.Config) case ReplicaStatus_Active: err = c.agent.host.StartOnDiskReplica(nil, false, shim, item.Config) } } if err != nil { hadErr = true c.agent.log.Warningf("Failed to start replica: %v", err) continue } var res Result res, err = c.agent.primePropose(newCmdReplicaUpdateStatus(replica.ID, ReplicaStatus_Active)) if err != nil || res.Value != 1 { hadErr = true c.agent.log.Warningf("Failed to update replica status: %v", err) } } }) if index <= c.index || hadErr { err = nil return } c.agent.log.Debugf("%s Finished processing %d", c.agent.hostID(), index) c.index = index return } // requestShardJoin requests host replica be added to a shard func (c *hostController) requestShardJoin(members map[uint64]string, shardID, replicaID uint64, isNonVoting bool) (v uint64) { c.agent.log.Debugf("[%05d:%05d] Joining shard (isNonVoting: %v)", shardID, replicaID, isNonVoting) var res *internal.AddResponse var host Host var ok bool var err error for _, hostID := range members { c.agent.State(c.ctx, func(s *State) { host, ok = s.Host(hostID) }) if !ok { c.agent.log.Warningf(`Host not found %s`, hostID) continue } ctx, cancel := context.WithTimeout(context.Background(), raftTimeout) defer cancel() res, err = c.agent.grpcClientPool.get(host.ApiAddress).Add(ctx, &internal.AddRequest{ HostId: c.agent.hostID(), ShardId: shardID, ReplicaId: replicaID, IsNonVoting: isNonVoting, }) if err != nil { c.agent.log.Warningf(`[%05d:%05d] %s | %s Unable to join shard (%v): %v`, shardID, replicaID, c.agent.hostID(), hostID, isNonVoting, err) } if err == nil && res != nil && res.Value > 0 { v = res.Value break } } if err != nil { c.agent.log.Warningf(`Unable to join shard: %v`, err) } if res != nil { v = res.Value } return } func (c *hostController) Stop() { defer c.agent.log.Infof(`Stopped hostController`) if c.ctxCancel != nil { c.ctxCancel() } <-c.done c.mutex.Lock() defer c.mutex.Unlock() c.index = 0 }
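// Reconciliation sketch (editor's illustration, not part of the zongzi source):
// tick above diffs the replicas that SHOULD run on this host against those that
// DO, then starts the difference. This standalone helper distills that pattern
// over plain replica IDs; missingReplicaIDs is hypothetical.
package zongzi

func missingReplicaIDs(desired, actual []uint64) (missing []uint64) {
	running := make(map[uint64]bool, len(actual))
	for _, id := range actual {
		running[id] = true
	}
	for _, id := range desired {
		if !running[id] {
			missing = append(missing, id)
		}
	}
	return
}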
// Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.33.0 // protoc v3.12.4 // source: zongzi.proto package internal import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" ) const ( // Verify that this generated code is sufficiently up-to-date. _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) // Verify that runtime/protoimpl is sufficiently up-to-date. _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) type PingRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields } func (x *PingRequest) Reset() { *x = PingRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *PingRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*PingRequest) ProtoMessage() {} func (x *PingRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use PingRequest.ProtoReflect.Descriptor instead. func (*PingRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{0} } type PingResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields } func (x *PingResponse) Reset() { *x = PingResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *PingResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*PingResponse) ProtoMessage() {} func (x *PingResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. func (*PingResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{1} } type ProbeRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields } func (x *ProbeRequest) Reset() { *x = ProbeRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *ProbeRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*ProbeRequest) ProtoMessage() {} func (x *ProbeRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use ProbeRequest.ProtoReflect.Descriptor instead. 
func (*ProbeRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{2} } type ProbeResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields GossipAdvertiseAddress string `protobuf:"bytes,1,opt,name=gossip_advertise_address,json=gossipAdvertiseAddress,proto3" json:"gossip_advertise_address,omitempty"` } func (x *ProbeResponse) Reset() { *x = ProbeResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *ProbeResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*ProbeResponse) ProtoMessage() {} func (x *ProbeResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use ProbeResponse.ProtoReflect.Descriptor instead. func (*ProbeResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{3} } func (x *ProbeResponse) GetGossipAdvertiseAddress() string { if x != nil { return x.GossipAdvertiseAddress } return "" } type InfoRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields } func (x *InfoRequest) Reset() { *x = InfoRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *InfoRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*InfoRequest) ProtoMessage() {} func (x *InfoRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use InfoRequest.ProtoReflect.Descriptor instead. func (*InfoRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{4} } type InfoResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields HostId string `protobuf:"bytes,1,opt,name=host_id,json=hostId,proto3" json:"host_id,omitempty"` ReplicaId uint64 `protobuf:"varint,2,opt,name=replica_id,json=replicaId,proto3" json:"replica_id,omitempty"` } func (x *InfoResponse) Reset() { *x = InfoResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *InfoResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*InfoResponse) ProtoMessage() {} func (x *InfoResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use InfoResponse.ProtoReflect.Descriptor instead. 
func (*InfoResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{5} } func (x *InfoResponse) GetHostId() string { if x != nil { return x.HostId } return "" } func (x *InfoResponse) GetReplicaId() uint64 { if x != nil { return x.ReplicaId } return 0 } type MembersRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields } func (x *MembersRequest) Reset() { *x = MembersRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *MembersRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*MembersRequest) ProtoMessage() {} func (x *MembersRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use MembersRequest.ProtoReflect.Descriptor instead. func (*MembersRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{6} } type MembersResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Members map[uint64]string `protobuf:"bytes,1,rep,name=members,proto3" json:"members,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *MembersResponse) Reset() { *x = MembersResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *MembersResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*MembersResponse) ProtoMessage() {} func (x *MembersResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use MembersResponse.ProtoReflect.Descriptor instead. func (*MembersResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{7} } func (x *MembersResponse) GetMembers() map[uint64]string { if x != nil { return x.Members } return nil } type JoinRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields HostId string `protobuf:"bytes,1,opt,name=host_id,json=hostId,proto3" json:"host_id,omitempty"` IsNonVoting bool `protobuf:"varint,2,opt,name=is_non_voting,json=isNonVoting,proto3" json:"is_non_voting,omitempty"` } func (x *JoinRequest) Reset() { *x = JoinRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *JoinRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*JoinRequest) ProtoMessage() {} func (x *JoinRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use JoinRequest.ProtoReflect.Descriptor instead. 
func (*JoinRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{8} } func (x *JoinRequest) GetHostId() string { if x != nil { return x.HostId } return "" } func (x *JoinRequest) GetIsNonVoting() bool { if x != nil { return x.IsNonVoting } return false } type JoinResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"` Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` } func (x *JoinResponse) Reset() { *x = JoinResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *JoinResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*JoinResponse) ProtoMessage() {} func (x *JoinResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use JoinResponse.ProtoReflect.Descriptor instead. func (*JoinResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{9} } func (x *JoinResponse) GetValue() uint64 { if x != nil { return x.Value } return 0 } func (x *JoinResponse) GetError() string { if x != nil { return x.Error } return "" } type AddRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields HostId string `protobuf:"bytes,1,opt,name=host_id,json=hostId,proto3" json:"host_id,omitempty"` ShardId uint64 `protobuf:"varint,2,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"` ReplicaId uint64 `protobuf:"varint,3,opt,name=replica_id,json=replicaId,proto3" json:"replica_id,omitempty"` IsNonVoting bool `protobuf:"varint,4,opt,name=is_non_voting,json=isNonVoting,proto3" json:"is_non_voting,omitempty"` } func (x *AddRequest) Reset() { *x = AddRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *AddRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*AddRequest) ProtoMessage() {} func (x *AddRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use AddRequest.ProtoReflect.Descriptor instead. 
func (*AddRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{10} } func (x *AddRequest) GetHostId() string { if x != nil { return x.HostId } return "" } func (x *AddRequest) GetShardId() uint64 { if x != nil { return x.ShardId } return 0 } func (x *AddRequest) GetReplicaId() uint64 { if x != nil { return x.ReplicaId } return 0 } func (x *AddRequest) GetIsNonVoting() bool { if x != nil { return x.IsNonVoting } return false } type AddResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"` Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` } func (x *AddResponse) Reset() { *x = AddResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *AddResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*AddResponse) ProtoMessage() {} func (x *AddResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use AddResponse.ProtoReflect.Descriptor instead. func (*AddResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{11} } func (x *AddResponse) GetValue() uint64 { if x != nil { return x.Value } return 0 } func (x *AddResponse) GetError() string { if x != nil { return x.Error } return "" } type IndexRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"` } func (x *IndexRequest) Reset() { *x = IndexRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *IndexRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*IndexRequest) ProtoMessage() {} func (x *IndexRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use IndexRequest.ProtoReflect.Descriptor instead. 
func (*IndexRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{12} } func (x *IndexRequest) GetShardId() uint64 { if x != nil { return x.ShardId } return 0 } type IndexResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields } func (x *IndexResponse) Reset() { *x = IndexResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *IndexResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*IndexResponse) ProtoMessage() {} func (x *IndexResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use IndexResponse.ProtoReflect.Descriptor instead. func (*IndexResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{13} } type ApplyRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` } func (x *ApplyRequest) Reset() { *x = ApplyRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *ApplyRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*ApplyRequest) ProtoMessage() {} func (x *ApplyRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[14] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use ApplyRequest.ProtoReflect.Descriptor instead. func (*ApplyRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{14} } func (x *ApplyRequest) GetShardId() uint64 { if x != nil { return x.ShardId } return 0 } func (x *ApplyRequest) GetData() []byte { if x != nil { return x.Data } return nil } type ApplyResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` } func (x *ApplyResponse) Reset() { *x = ApplyResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *ApplyResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*ApplyResponse) ProtoMessage() {} func (x *ApplyResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[15] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use ApplyResponse.ProtoReflect.Descriptor instead. 
func (*ApplyResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{15} } func (x *ApplyResponse) GetValue() uint64 { if x != nil { return x.Value } return 0 } func (x *ApplyResponse) GetData() []byte { if x != nil { return x.Data } return nil } type CommitRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` } func (x *CommitRequest) Reset() { *x = CommitRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *CommitRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*CommitRequest) ProtoMessage() {} func (x *CommitRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[16] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use CommitRequest.ProtoReflect.Descriptor instead. func (*CommitRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{16} } func (x *CommitRequest) GetShardId() uint64 { if x != nil { return x.ShardId } return 0 } func (x *CommitRequest) GetData() []byte { if x != nil { return x.Data } return nil } type CommitResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields } func (x *CommitResponse) Reset() { *x = CommitResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *CommitResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*CommitResponse) ProtoMessage() {} func (x *CommitResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[17] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use CommitResponse.ProtoReflect.Descriptor instead. func (*CommitResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{17} } type ReadRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"` Stale bool `protobuf:"varint,2,opt,name=stale,proto3" json:"stale,omitempty"` Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` } func (x *ReadRequest) Reset() { *x = ReadRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *ReadRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*ReadRequest) ProtoMessage() {} func (x *ReadRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[18] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use ReadRequest.ProtoReflect.Descriptor instead. 
func (*ReadRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{18} } func (x *ReadRequest) GetShardId() uint64 { if x != nil { return x.ShardId } return 0 } func (x *ReadRequest) GetStale() bool { if x != nil { return x.Stale } return false } func (x *ReadRequest) GetData() []byte { if x != nil { return x.Data } return nil } type ReadResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` } func (x *ReadResponse) Reset() { *x = ReadResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[19] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *ReadResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*ReadResponse) ProtoMessage() {} func (x *ReadResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[19] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use ReadResponse.ProtoReflect.Descriptor instead. func (*ReadResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{19} } func (x *ReadResponse) GetValue() uint64 { if x != nil { return x.Value } return 0 } func (x *ReadResponse) GetData() []byte { if x != nil { return x.Data } return nil } type WatchRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"` Stale bool `protobuf:"varint,2,opt,name=stale,proto3" json:"stale,omitempty"` Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` } func (x *WatchRequest) Reset() { *x = WatchRequest{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *WatchRequest) String() string { return protoimpl.X.MessageStringOf(x) } func (*WatchRequest) ProtoMessage() {} func (x *WatchRequest) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[20] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use WatchRequest.ProtoReflect.Descriptor instead. 
func (*WatchRequest) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{20} } func (x *WatchRequest) GetShardId() uint64 { if x != nil { return x.ShardId } return 0 } func (x *WatchRequest) GetStale() bool { if x != nil { return x.Stale } return false } func (x *WatchRequest) GetData() []byte { if x != nil { return x.Data } return nil } type WatchResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` } func (x *WatchResponse) Reset() { *x = WatchResponse{} if protoimpl.UnsafeEnabled { mi := &file_zongzi_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *WatchResponse) String() string { return protoimpl.X.MessageStringOf(x) } func (*WatchResponse) ProtoMessage() {} func (x *WatchResponse) ProtoReflect() protoreflect.Message { mi := &file_zongzi_proto_msgTypes[21] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use WatchResponse.ProtoReflect.Descriptor instead. func (*WatchResponse) Descriptor() ([]byte, []int) { return file_zongzi_proto_rawDescGZIP(), []int{21} } func (x *WatchResponse) GetValue() uint64 { if x != nil { return x.Value } return 0 } func (x *WatchResponse) GetData() []byte { if x != nil { return x.Data } return nil } var File_zongzi_proto protoreflect.FileDescriptor var file_zongzi_proto_rawDesc = []byte{ 0x0a, 0x0c, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x22, 0x0d, 0x0a, 0x0b, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x0e, 0x0a, 0x0c, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x0e, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x49, 0x0a, 0x0d, 0x50, 0x72, 0x6f, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x18, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x5f, 0x61, 0x64, 0x76, 0x65, 0x72, 0x74, 0x69, 0x73, 0x65, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x16, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x41, 0x64, 0x76, 0x65, 0x72, 0x74, 0x69, 0x73, 0x65, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x0d, 0x0a, 0x0b, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x46, 0x0a, 0x0c, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x68, 0x6f, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x68, 0x6f, 0x73, 0x74, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x22, 0x10, 0x0a, 0x0e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x8d, 0x01, 0x0a, 0x0f, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3e, 0x0a, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 
0x73, 0x65, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x4a, 0x0a, 0x0b, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x68, 0x6f, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x68, 0x6f, 0x73, 0x74, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x69, 0x73, 0x5f, 0x6e, 0x6f, 0x6e, 0x5f, 0x76, 0x6f, 0x74, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x4e, 0x6f, 0x6e, 0x56, 0x6f, 0x74, 0x69, 0x6e, 0x67, 0x22, 0x3a, 0x0a, 0x0c, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x83, 0x01, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x68, 0x6f, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x68, 0x6f, 0x73, 0x74, 0x49, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x69, 0x73, 0x5f, 0x6e, 0x6f, 0x6e, 0x5f, 0x76, 0x6f, 0x74, 0x69, 0x6e, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x4e, 0x6f, 0x6e, 0x56, 0x6f, 0x74, 0x69, 0x6e, 0x67, 0x22, 0x39, 0x0a, 0x0b, 0x41, 0x64, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x29, 0x0a, 0x0c, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, 0x22, 0x0f, 0x0a, 0x0d, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x3d, 0x0a, 0x0c, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x39, 0x0a, 0x0d, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 
0x61, 0x74, 0x61, 0x22, 0x3e, 0x0a, 0x0d, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x10, 0x0a, 0x0e, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x52, 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x6c, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x73, 0x74, 0x61, 0x6c, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x38, 0x0a, 0x0c, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x53, 0x0a, 0x0c, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x6c, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x73, 0x74, 0x61, 0x6c, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x39, 0x0a, 0x0d, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0xeb, 0x04, 0x0a, 0x08, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x33, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x13, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x50, 0x72, 0x6f, 0x62, 0x65, 0x12, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x50, 0x72, 0x6f, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x50, 0x72, 0x6f, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x04, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x13, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x07, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x12, 0x16, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x4d, 
0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x04, 0x4a, 0x6f, 0x69, 0x6e, 0x12, 0x13, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x30, 0x0a, 0x03, 0x41, 0x64, 0x64, 0x12, 0x12, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x41, 0x64, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x12, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x06, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x12, 0x15, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x04, 0x52, 0x65, 0x61, 0x64, 0x12, 0x13, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x14, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x6f, 0x67, 0x62, 0x6e, 0x2f, 0x7a, 0x6f, 0x6e, 0x67, 0x7a, 0x69, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( file_zongzi_proto_rawDescOnce sync.Once file_zongzi_proto_rawDescData = file_zongzi_proto_rawDesc ) func file_zongzi_proto_rawDescGZIP() []byte { file_zongzi_proto_rawDescOnce.Do(func() { file_zongzi_proto_rawDescData = protoimpl.X.CompressGZIP(file_zongzi_proto_rawDescData) }) return file_zongzi_proto_rawDescData } var file_zongzi_proto_msgTypes = make([]protoimpl.MessageInfo, 23) var file_zongzi_proto_goTypes = []interface{}{ (*PingRequest)(nil), // 0: zongzi.PingRequest (*PingResponse)(nil), // 1: zongzi.PingResponse (*ProbeRequest)(nil), // 2: zongzi.ProbeRequest (*ProbeResponse)(nil), // 3: zongzi.ProbeResponse (*InfoRequest)(nil), // 4: zongzi.InfoRequest (*InfoResponse)(nil), // 5: zongzi.InfoResponse (*MembersRequest)(nil), // 6: zongzi.MembersRequest (*MembersResponse)(nil), // 7: zongzi.MembersResponse (*JoinRequest)(nil), // 8: zongzi.JoinRequest 
(*JoinResponse)(nil), // 9: zongzi.JoinResponse (*AddRequest)(nil), // 10: zongzi.AddRequest (*AddResponse)(nil), // 11: zongzi.AddResponse (*IndexRequest)(nil), // 12: zongzi.IndexRequest (*IndexResponse)(nil), // 13: zongzi.IndexResponse (*ApplyRequest)(nil), // 14: zongzi.ApplyRequest (*ApplyResponse)(nil), // 15: zongzi.ApplyResponse (*CommitRequest)(nil), // 16: zongzi.CommitRequest (*CommitResponse)(nil), // 17: zongzi.CommitResponse (*ReadRequest)(nil), // 18: zongzi.ReadRequest (*ReadResponse)(nil), // 19: zongzi.ReadResponse (*WatchRequest)(nil), // 20: zongzi.WatchRequest (*WatchResponse)(nil), // 21: zongzi.WatchResponse nil, // 22: zongzi.MembersResponse.MembersEntry } var file_zongzi_proto_depIdxs = []int32{ 22, // 0: zongzi.MembersResponse.members:type_name -> zongzi.MembersResponse.MembersEntry 0, // 1: zongzi.Internal.Ping:input_type -> zongzi.PingRequest 2, // 2: zongzi.Internal.Probe:input_type -> zongzi.ProbeRequest 4, // 3: zongzi.Internal.Info:input_type -> zongzi.InfoRequest 6, // 4: zongzi.Internal.Members:input_type -> zongzi.MembersRequest 8, // 5: zongzi.Internal.Join:input_type -> zongzi.JoinRequest 10, // 6: zongzi.Internal.Add:input_type -> zongzi.AddRequest 12, // 7: zongzi.Internal.Index:input_type -> zongzi.IndexRequest 14, // 8: zongzi.Internal.Apply:input_type -> zongzi.ApplyRequest 16, // 9: zongzi.Internal.Commit:input_type -> zongzi.CommitRequest 18, // 10: zongzi.Internal.Read:input_type -> zongzi.ReadRequest 20, // 11: zongzi.Internal.Watch:input_type -> zongzi.WatchRequest 1, // 12: zongzi.Internal.Ping:output_type -> zongzi.PingResponse 3, // 13: zongzi.Internal.Probe:output_type -> zongzi.ProbeResponse 5, // 14: zongzi.Internal.Info:output_type -> zongzi.InfoResponse 7, // 15: zongzi.Internal.Members:output_type -> zongzi.MembersResponse 9, // 16: zongzi.Internal.Join:output_type -> zongzi.JoinResponse 11, // 17: zongzi.Internal.Add:output_type -> zongzi.AddResponse 13, // 18: zongzi.Internal.Index:output_type -> zongzi.IndexResponse 15, // 19: zongzi.Internal.Apply:output_type -> zongzi.ApplyResponse 17, // 20: zongzi.Internal.Commit:output_type -> zongzi.CommitResponse 19, // 21: zongzi.Internal.Read:output_type -> zongzi.ReadResponse 21, // 22: zongzi.Internal.Watch:output_type -> zongzi.WatchResponse 12, // [12:23] is the sub-list for method output_type 1, // [1:12] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name } func init() { file_zongzi_proto_init() } func file_zongzi_proto_init() { if File_zongzi_proto != nil { return } if !protoimpl.UnsafeEnabled { file_zongzi_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PingRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PingResponse); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ProbeRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ProbeResponse); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return 
&v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*InfoRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*InfoResponse); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*MembersRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*MembersResponse); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*JoinRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*JoinResponse); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*AddRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*AddResponse); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*IndexRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*IndexResponse); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ApplyRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ApplyResponse); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CommitRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CommitResponse); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[18].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ReadRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[19].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ReadResponse); i { case 0: 
return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*WatchRequest); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } file_zongzi_proto_msgTypes[21].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*WatchResponse); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_zongzi_proto_rawDesc, NumEnums: 0, NumMessages: 23, NumExtensions: 0, NumServices: 1, }, GoTypes: file_zongzi_proto_goTypes, DependencyIndexes: file_zongzi_proto_depIdxs, MessageInfos: file_zongzi_proto_msgTypes, }.Build() File_zongzi_proto = out.File file_zongzi_proto_rawDesc = nil file_zongzi_proto_goTypes = nil file_zongzi_proto_depIdxs = nil }
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 // - protoc v3.12.4 // source: zongzi.proto package internal import ( context "context" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" ) // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 const ( Internal_Ping_FullMethodName = "/zongzi.Internal/Ping" Internal_Probe_FullMethodName = "/zongzi.Internal/Probe" Internal_Info_FullMethodName = "/zongzi.Internal/Info" Internal_Members_FullMethodName = "/zongzi.Internal/Members" Internal_Join_FullMethodName = "/zongzi.Internal/Join" Internal_Add_FullMethodName = "/zongzi.Internal/Add" Internal_Index_FullMethodName = "/zongzi.Internal/Index" Internal_Apply_FullMethodName = "/zongzi.Internal/Apply" Internal_Commit_FullMethodName = "/zongzi.Internal/Commit" Internal_Read_FullMethodName = "/zongzi.Internal/Read" Internal_Watch_FullMethodName = "/zongzi.Internal/Watch" ) // InternalClient is the client API for Internal service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type InternalClient interface { // Ping is a noop for timing purposes Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) // Probe returns Gossip.AdvertiseAddress // Used by new hosts to start dragonboat Probe(ctx context.Context, in *ProbeRequest, opts ...grpc.CallOption) (*ProbeResponse, error) // Info returns zero shard replicaID and hostID // Used by new hosts to discover zero shard replicas Info(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error) // Members returns json marshaled map of member zero shard replicaID to hostID // Used by new hosts joining a bootstrapped zero shard Members(ctx context.Context, in *MembersRequest, opts ...grpc.CallOption) (*MembersResponse, error) // Join takes a host ID and returns success // Used by joining hosts to request their own addition to the zero shard Join(ctx context.Context, in *JoinRequest, opts ...grpc.CallOption) (*JoinResponse, error) // Add takes a replica ID, shard ID and host ID and returns success // Used during replica creation to request replica's addition to the shard Add(ctx context.Context, in *AddRequest, opts ...grpc.CallOption) (*AddResponse, error) // Index reads the index of a shard Index(ctx context.Context, in *IndexRequest, opts ...grpc.CallOption) (*IndexResponse, error) // Apply provides unary request/response command forwarding Apply(ctx context.Context, in *ApplyRequest, opts ...grpc.CallOption) (*ApplyResponse, error) // Commit provides unary request/response command forwarding Commit(ctx context.Context, in *CommitRequest, opts ...grpc.CallOption) (*CommitResponse, error) // Read provides unary request/response query forwarding Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) // Watch provides streaming query response forwarding Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Internal_WatchClient, error) } type internalClient struct { cc grpc.ClientConnInterface } func NewInternalClient(cc grpc.ClientConnInterface) InternalClient { return &internalClient{cc} } func (c *internalClient) Ping(ctx context.Context, in *PingRequest, opts 
...grpc.CallOption) (*PingResponse, error) { out := new(PingResponse) err := c.cc.Invoke(ctx, Internal_Ping_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *internalClient) Probe(ctx context.Context, in *ProbeRequest, opts ...grpc.CallOption) (*ProbeResponse, error) { out := new(ProbeResponse) err := c.cc.Invoke(ctx, Internal_Probe_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *internalClient) Info(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error) { out := new(InfoResponse) err := c.cc.Invoke(ctx, Internal_Info_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *internalClient) Members(ctx context.Context, in *MembersRequest, opts ...grpc.CallOption) (*MembersResponse, error) { out := new(MembersResponse) err := c.cc.Invoke(ctx, Internal_Members_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *internalClient) Join(ctx context.Context, in *JoinRequest, opts ...grpc.CallOption) (*JoinResponse, error) { out := new(JoinResponse) err := c.cc.Invoke(ctx, Internal_Join_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *internalClient) Add(ctx context.Context, in *AddRequest, opts ...grpc.CallOption) (*AddResponse, error) { out := new(AddResponse) err := c.cc.Invoke(ctx, Internal_Add_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *internalClient) Index(ctx context.Context, in *IndexRequest, opts ...grpc.CallOption) (*IndexResponse, error) { out := new(IndexResponse) err := c.cc.Invoke(ctx, Internal_Index_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *internalClient) Apply(ctx context.Context, in *ApplyRequest, opts ...grpc.CallOption) (*ApplyResponse, error) { out := new(ApplyResponse) err := c.cc.Invoke(ctx, Internal_Apply_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *internalClient) Commit(ctx context.Context, in *CommitRequest, opts ...grpc.CallOption) (*CommitResponse, error) { out := new(CommitResponse) err := c.cc.Invoke(ctx, Internal_Commit_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *internalClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) { out := new(ReadResponse) err := c.cc.Invoke(ctx, Internal_Read_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *internalClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Internal_WatchClient, error) { stream, err := c.cc.NewStream(ctx, &Internal_ServiceDesc.Streams[0], Internal_Watch_FullMethodName, opts...) if err != nil { return nil, err } x := &internalWatchClient{stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } if err := x.ClientStream.CloseSend(); err != nil { return nil, err } return x, nil } type Internal_WatchClient interface { Recv() (*WatchResponse, error) grpc.ClientStream } type internalWatchClient struct { grpc.ClientStream } func (x *internalWatchClient) Recv() (*WatchResponse, error) { m := new(WatchResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } // InternalServer is the server API for Internal service. 
// All implementations must embed UnimplementedInternalServer // for forward compatibility type InternalServer interface { // Ping is a noop for timing purposes Ping(context.Context, *PingRequest) (*PingResponse, error) // Probe returns Gossip.AdvertiseAddress // Used by new hosts to start dragonboat Probe(context.Context, *ProbeRequest) (*ProbeResponse, error) // Info returns zero shard replicaID and hostID // Used by new hosts to discover zero shard replicas Info(context.Context, *InfoRequest) (*InfoResponse, error) // Members returns json marshaled map of member zero shard replicaID to hostID // Used by new hosts joining a bootstrapped zero shard Members(context.Context, *MembersRequest) (*MembersResponse, error) // Join takes a host ID and returns success // Used by joining hosts to request their own addition to the zero shard Join(context.Context, *JoinRequest) (*JoinResponse, error) // Add takes a replica ID, shard ID and host ID and returns success // Used during replica creation to request replica's addition to the shard Add(context.Context, *AddRequest) (*AddResponse, error) // Index reads the index of a shard Index(context.Context, *IndexRequest) (*IndexResponse, error) // Apply provides unary request/response command forwarding Apply(context.Context, *ApplyRequest) (*ApplyResponse, error) // Commit provides unary request/response command forwarding Commit(context.Context, *CommitRequest) (*CommitResponse, error) // Read provides unary request/response query forwarding Read(context.Context, *ReadRequest) (*ReadResponse, error) // Watch provides streaming query response forwarding Watch(*WatchRequest, Internal_WatchServer) error mustEmbedUnimplementedInternalServer() } // UnimplementedInternalServer must be embedded to have forward compatible implementations. 
type UnimplementedInternalServer struct { } func (UnimplementedInternalServer) Ping(context.Context, *PingRequest) (*PingResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") } func (UnimplementedInternalServer) Probe(context.Context, *ProbeRequest) (*ProbeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Probe not implemented") } func (UnimplementedInternalServer) Info(context.Context, *InfoRequest) (*InfoResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Info not implemented") } func (UnimplementedInternalServer) Members(context.Context, *MembersRequest) (*MembersResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Members not implemented") } func (UnimplementedInternalServer) Join(context.Context, *JoinRequest) (*JoinResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Join not implemented") } func (UnimplementedInternalServer) Add(context.Context, *AddRequest) (*AddResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Add not implemented") } func (UnimplementedInternalServer) Index(context.Context, *IndexRequest) (*IndexResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Index not implemented") } func (UnimplementedInternalServer) Apply(context.Context, *ApplyRequest) (*ApplyResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Apply not implemented") } func (UnimplementedInternalServer) Commit(context.Context, *CommitRequest) (*CommitResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Commit not implemented") } func (UnimplementedInternalServer) Read(context.Context, *ReadRequest) (*ReadResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Read not implemented") } func (UnimplementedInternalServer) Watch(*WatchRequest, Internal_WatchServer) error { return status.Errorf(codes.Unimplemented, "method Watch not implemented") } func (UnimplementedInternalServer) mustEmbedUnimplementedInternalServer() {} // UnsafeInternalServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to InternalServer will // result in compilation errors. 
type UnsafeInternalServer interface { mustEmbedUnimplementedInternalServer() } func RegisterInternalServer(s grpc.ServiceRegistrar, srv InternalServer) { s.RegisterService(&Internal_ServiceDesc, srv) } func _Internal_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(PingRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(InternalServer).Ping(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Internal_Ping_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(InternalServer).Ping(ctx, req.(*PingRequest)) } return interceptor(ctx, in, info, handler) } func _Internal_Probe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ProbeRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(InternalServer).Probe(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Internal_Probe_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(InternalServer).Probe(ctx, req.(*ProbeRequest)) } return interceptor(ctx, in, info, handler) } func _Internal_Info_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(InfoRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(InternalServer).Info(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Internal_Info_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(InternalServer).Info(ctx, req.(*InfoRequest)) } return interceptor(ctx, in, info, handler) } func _Internal_Members_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(MembersRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(InternalServer).Members(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Internal_Members_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(InternalServer).Members(ctx, req.(*MembersRequest)) } return interceptor(ctx, in, info, handler) } func _Internal_Join_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(JoinRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(InternalServer).Join(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Internal_Join_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(InternalServer).Join(ctx, req.(*JoinRequest)) } return interceptor(ctx, in, info, handler) } func _Internal_Add_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(AddRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(InternalServer).Add(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Internal_Add_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return 
srv.(InternalServer).Add(ctx, req.(*AddRequest)) } return interceptor(ctx, in, info, handler) } func _Internal_Index_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(IndexRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(InternalServer).Index(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Internal_Index_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(InternalServer).Index(ctx, req.(*IndexRequest)) } return interceptor(ctx, in, info, handler) } func _Internal_Apply_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ApplyRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(InternalServer).Apply(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Internal_Apply_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(InternalServer).Apply(ctx, req.(*ApplyRequest)) } return interceptor(ctx, in, info, handler) } func _Internal_Commit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(CommitRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(InternalServer).Commit(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Internal_Commit_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(InternalServer).Commit(ctx, req.(*CommitRequest)) } return interceptor(ctx, in, info, handler) } func _Internal_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ReadRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(InternalServer).Read(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Internal_Read_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(InternalServer).Read(ctx, req.(*ReadRequest)) } return interceptor(ctx, in, info, handler) } func _Internal_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(WatchRequest) if err := stream.RecvMsg(m); err != nil { return err } return srv.(InternalServer).Watch(m, &internalWatchServer{stream}) } type Internal_WatchServer interface { Send(*WatchResponse) error grpc.ServerStream } type internalWatchServer struct { grpc.ServerStream } func (x *internalWatchServer) Send(m *WatchResponse) error { return x.ServerStream.SendMsg(m) } // Internal_ServiceDesc is the grpc.ServiceDesc for Internal service. 
// It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var Internal_ServiceDesc = grpc.ServiceDesc{ ServiceName: "zongzi.Internal", HandlerType: (*InternalServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "Ping", Handler: _Internal_Ping_Handler, }, { MethodName: "Probe", Handler: _Internal_Probe_Handler, }, { MethodName: "Info", Handler: _Internal_Info_Handler, }, { MethodName: "Members", Handler: _Internal_Members_Handler, }, { MethodName: "Join", Handler: _Internal_Join_Handler, }, { MethodName: "Add", Handler: _Internal_Add_Handler, }, { MethodName: "Index", Handler: _Internal_Index_Handler, }, { MethodName: "Apply", Handler: _Internal_Apply_Handler, }, { MethodName: "Commit", Handler: _Internal_Commit_Handler, }, { MethodName: "Read", Handler: _Internal_Read_Handler, }, }, Streams: []grpc.StreamDesc{ { StreamName: "Watch", Handler: _Internal_Watch_Handler, ServerStreams: true, }, }, Metadata: "zongzi.proto", }
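// The generated client above is normally driven by the agent's gRPC client
// pool, but it can be exercised directly. This sketch is illustrative and not
// part of the generated code: the dial address is assumed to be a peer's API
// address (the zongzi default is shown), the request message is left empty,
// and because the package lives under internal/ it can only compile within
// the zongzi module itself.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/logbn/zongzi/internal"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:17003",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	client := internal.NewInternalClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// Ping is a noop used for latency measurement.
	start := time.Now()
	if _, err = client.Ping(ctx, &internal.PingRequest{}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("ping:", time.Since(start))
}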
package zongzi import ( "context" ) // ShardClient can be used to interact with a shard regardless of its placement in the cluster // Requests will be forwarded to the appropriate host based on ping type ShardClient interface { Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error) Commit(ctx context.Context, cmd []byte) (err error) Index(ctx context.Context) (err error) Leader() (uint64, uint64) Read(ctx context.Context, query []byte, stale bool) (value uint64, data []byte, err error) Watch(ctx context.Context, query []byte, results chan<- *Result, stale bool) (err error) } // The shard client type client struct { manager *clientManager shardID uint64 retries int writeToLeader bool } func newClient(manager *clientManager, shardID uint64, opts ...ClientOption) (c *client, err error) { c = &client{ manager: manager, shardID: shardID, } for _, fn := range opts { if err = fn(c); err != nil { return } } return } func (c *client) Index(ctx context.Context) (err error) { c.manager.mutex.RLock() list, ok := c.manager.clientMember[c.shardID] c.manager.mutex.RUnlock() if !ok { err = ErrShardNotReady return } el := list.Front() for ; el != nil; el = el.Next() { err = el.Value.ReadIndex(ctx, c.shardID) if err == nil { break } } return } func (c *client) Leader() (replicaID, term uint64) { c.manager.mutex.RLock() leader, ok := c.manager.clientLeader[c.shardID] c.manager.mutex.RUnlock() if ok { replicaID = leader.replicaID term = leader.term } return } func (c *client) Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error) { if c.writeToLeader { c.manager.mutex.RLock() leader, ok := c.manager.clientLeader[c.shardID] c.manager.mutex.RUnlock() if ok { return leader.client.Apply(ctx, c.shardID, cmd) } } c.manager.mutex.RLock() list, ok := c.manager.clientMember[c.shardID] c.manager.mutex.RUnlock() if !ok { err = ErrShardNotReady return } for el := list.Front(); el != nil; el = el.Next() { value, data, err = el.Value.Apply(ctx, c.shardID, cmd) if err == nil { break } } return } func (c *client) Commit(ctx context.Context, cmd []byte) (err error) { if c.writeToLeader { c.manager.mutex.RLock() leader, ok := c.manager.clientLeader[c.shardID] c.manager.mutex.RUnlock() if ok { return leader.client.Commit(ctx, c.shardID, cmd) } } c.manager.mutex.RLock() list, ok := c.manager.clientMember[c.shardID] c.manager.mutex.RUnlock() if !ok { err = ErrShardNotReady return } el := list.Front() for ; el != nil; el = el.Next() { err = el.Value.Commit(ctx, c.shardID, cmd) if err == nil { break } } return } func (c *client) Read(ctx context.Context, query []byte, stale bool) (value uint64, data []byte, err error) { var run bool if stale { c.manager.mutex.RLock() list, ok := c.manager.clientMember[c.shardID] c.manager.mutex.RUnlock() if !ok { err = ErrShardNotReady return } el := list.Front() for ; el != nil; el = el.Next() { run = true value, data, err = el.Value.Read(ctx, c.shardID, query, stale) if err == nil { break } } if run && err == nil { return } } if c.writeToLeader { c.manager.mutex.RLock() leader, ok := c.manager.clientLeader[c.shardID] c.manager.mutex.RUnlock() if ok { return leader.client.Read(ctx, c.shardID, query, stale) } } c.manager.mutex.RLock() list, ok := c.manager.clientMember[c.shardID] c.manager.mutex.RUnlock() if !ok { err = ErrShardNotReady return } el := list.Front() for ; el != nil; el = el.Next() { value, data, err = el.Value.Read(ctx, c.shardID, query, stale) if err == nil { break } } return } func (c *client) Watch(ctx context.Context, query []byte, 
results chan<- *Result, stale bool) (err error) { var run bool if stale { c.manager.mutex.RLock() list, ok := c.manager.clientMember[c.shardID] c.manager.mutex.RUnlock() if !ok { err = ErrShardNotReady return } el := list.Front() for ; el != nil; el = el.Next() { run = true err = el.Value.Watch(ctx, c.shardID, query, results, stale) if err == nil { break } } if run && err == nil { return } } if c.writeToLeader { c.manager.mutex.RLock() leader, ok := c.manager.clientLeader[c.shardID] c.manager.mutex.RUnlock() if ok { return leader.client.Watch(ctx, c.shardID, query, results, stale) } } c.manager.mutex.RLock() list, ok := c.manager.clientMember[c.shardID] c.manager.mutex.RUnlock() if !ok { err = ErrShardNotReady return } el := list.Front() for ; el != nil; el = el.Next() { err = el.Value.Watch(ctx, c.shardID, query, results, stale) if err == nil { break } } return }
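// The following sketch illustrates typical ShardClient usage against a
// running Agent (constructed as shown earlier in this document). The command
// and query payloads are assumptions; their encoding is defined entirely by
// the shard's state machine, not by zongzi.
package example

import (
	"context"

	"github.com/logbn/zongzi"
)

func applyAndRead(ctx context.Context, agent *zongzi.Agent, shardID uint64) (err error) {
	// WithWriteToLeader routes writes to the shard leader when one is known.
	client := agent.Client(shardID, zongzi.WithWriteToLeader())
	if _, _, err = client.Apply(ctx, []byte(`set foo=bar`)); err != nil {
		return
	}
	// A stale read (stale=true) may be served by the nearest host (by ping)
	// and can lag the leader slightly; pass stale=false for a linearizable read.
	var data []byte
	_, data, err = client.Read(ctx, []byte(`get foo`), true)
	_ = data
	return
}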
package zongzi

import (
	"cmp"
	"context"
	"slices"
	"sync"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/elliotchance/orderedmap/v2"
)

// The clientManager maintains a ping-ordered collection of host clients for
// each shard so that requests can be routed to the nearest member or replica.
type clientManager struct {
	agent           *Agent
	clock           clock.Clock
	ctx             context.Context
	ctxCancel       context.CancelFunc
	clientHost      map[string]hostClient
	clientLeader    map[uint64]hostClientLeader
	clientMember    map[uint64]*orderedmap.OrderedMap[int64, hostClient]
	clientReplica   map[uint64]*orderedmap.OrderedMap[int64, hostClient]
	index           uint64
	log             Logger
	mutex           sync.RWMutex
	shardController Controller
	wg              sync.WaitGroup
}

type hostClientLeader struct {
	client    hostClient
	replicaID uint64
	term      uint64
}

func newClientManager(agent *Agent) *clientManager {
	return &clientManager{
		log:           agent.log,
		agent:         agent,
		clock:         clock.New(),
		clientHost:    map[string]hostClient{},
		clientLeader:  map[uint64]hostClientLeader{},
		clientMember:  map[uint64]*orderedmap.OrderedMap[int64, hostClient]{},
		clientReplica: map[uint64]*orderedmap.OrderedMap[int64, hostClient]{},
	}
}

func (c *clientManager) Start() (err error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.ctx, c.ctxCancel = context.WithCancel(context.Background())
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		t := c.clock.Ticker(500 * time.Millisecond)
		defer t.Stop()
		for {
			select {
			case <-c.ctx.Done():
				c.log.Infof("Shard client manager stopped")
				return
			case <-t.C:
				c.tick()
			}
		}
	}()
	return
}

type hostClientPing struct {
	ping   int64
	client hostClient
}

// tick refreshes the client lists for every shard updated since the last
// processed index, sorting members and replicas by measured ping.
func (c *clientManager) tick() {
	var err error
	var index uint64
	var start = c.clock.Now()
	var shardCount int
	var replicaCount int
	var pings = map[string]time.Duration{}
	err = c.agent.State(nil, func(state *State) {
		state.ShardIterateUpdatedAfter(c.index, func(shard Shard) bool {
			shardCount++
			index = shard.Updated
			var leaderClient hostClient
			members := []hostClientPing{}
			replicas := []hostClientPing{}
			state.ReplicaIterateByShardID(shard.ID, func(replica Replica) bool {
				replicaCount++
				client, ok := c.clientHost[replica.HostID]
				if !ok {
					client = c.agent.hostClient(replica.HostID)
					c.clientHost[replica.HostID] = client
				}
				ping, ok := pings[replica.HostID]
				if !ok {
					ctx, cancel := context.WithTimeout(c.ctx, time.Second)
					defer cancel()
					ping, err = client.Ping(ctx)
					if err != nil {
						c.log.Warningf(`Unable to ping host in shard client manager: %s`, err.Error())
						return true
					}
					pings[replica.HostID] = ping
				}
				if replica.IsNonVoting {
					replicas = append(replicas, hostClientPing{ping.Nanoseconds(), client})
				} else {
					members = append(members, hostClientPing{ping.Nanoseconds(), client})
				}
				if shard.Leader == replica.ID {
					leaderClient = client
				}
				return true
			})
			slices.SortFunc(members, byPingAsc)
			slices.SortFunc(replicas, byPingAsc)
			newMembers := orderedmap.NewOrderedMap[int64, hostClient]()
			for _, item := range members {
				newMembers.Set(item.ping, item.client)
			}
			newReplicas := orderedmap.NewOrderedMap[int64, hostClient]()
			for _, item := range replicas {
				newReplicas.Set(item.ping, item.client)
			}
			c.mutex.Lock()
			if leaderClient.agent != nil {
				c.clientLeader[shard.ID] = hostClientLeader{leaderClient, shard.Leader, shard.Term}
			} else {
				delete(c.clientLeader, shard.ID)
			}
			c.clientMember[shard.ID] = newMembers
			c.clientReplica[shard.ID] = newReplicas
			c.mutex.Unlock()
			return true
		})
	})
	if err == nil && shardCount > 0 {
		c.log.Infof("%s Shard client manager updated. hosts: %d shards: %d replicas: %d leaders: %d time: %vms",
			c.agent.hostID(), len(pings), shardCount, replicaCount, len(c.clientLeader),
			float64(c.clock.Since(start)/time.Microsecond)/1000)
		c.index = index
	}
	return
}

func byPingAsc(a, b hostClientPing) int {
	return cmp.Compare(a.ping, b.ping)
}

func (c *clientManager) Stop() {
	defer c.log.Infof(`Stopped clientManager`)
	if c.ctxCancel != nil {
		c.ctxCancel()
	}
	c.wg.Wait()
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.index = 0
}
package zongzi

type ClientOption func(*client) error

// WithRetries sets the maximum number of times a client request may be retried.
func WithRetries(retries int) ClientOption {
	return func(c *client) error {
		c.retries = retries
		return nil
	}
}

// WithWriteToLeader instructs the client to send writes (and linearizable
// reads) directly to the shard leader when one is known.
func WithWriteToLeader() ClientOption {
	return func(c *client) error {
		c.writeToLeader = true
		return nil
	}
}
package zongzi

import (
	"fmt"
	"strconv"
	"strings"
)

// The default shard controller is a basic controller that creates and
// destroys replicas based on a shard's placement tags.
type shardControllerDefault struct {
	agent *Agent
	log   Logger
}

func newShardControllerDefault(agent *Agent) *shardControllerDefault {
	return &shardControllerDefault{
		log:   agent.log,
		agent: agent,
	}
}

func (c *shardControllerDefault) Reconcile(state *State, shard Shard, controls Controls) (err error) {
	c.log.Debugf("Reconciling Shard %d", shard.ID)
	var (
		desired       = map[string]int{}
		filters       = map[string][]string{}
		found         = map[string]int{}
		matches       = map[string][]Host{}
		occupiedHosts = map[string]bool{}
		undesired     = []uint64{}
		vary          = map[string]bool{}
		varyCount     = map[string]map[string]int{}
		varyMatch     = map[string]map[string]int{}
	)
	// Resolve desired state from shard tags
	for tagKey, tagValue := range shard.Tags {
		if !strings.HasPrefix(tagKey, "placement:") {
			continue
		}
		if tagKey == `placement:vary` {
			// ex: placement:vary=geo:zone
			vary[tagValue] = true
		} else if tagKey == `placement:member` {
			// ex: placement:member=3;geo:region=us-central1
			parts := strings.Split(tagValue, ";")
			i, err := strconv.Atoi(parts[0])
			if err != nil {
				c.log.Warningf(`Invalid tag placement:member %s %s`, tagKey, err.Error())
				continue
			}
			desired[`member`] = i
			filters[`member`] = parts[1:]
		} else if strings.HasPrefix(tagKey, `placement:replica:`) {
			// ex: placement:replica:read=6;host:class=storage-replica
			group := tagKey[len(`placement:replica:`):]
			if len(group) == 0 {
				c.log.Warningf(`Invalid tag placement:replica - "%s"`, tagKey)
				continue
			}
			if group == `member` {
				c.log.Warningf(`Invalid tag placement:replica - group name "member" is reserved.`)
				continue
			}
			parts := strings.Split(tagValue, ";")
			i, err := strconv.Atoi(parts[0])
			if err != nil {
				c.log.Warningf(`Invalid tag placement:replica %s %s`, tagKey, err.Error())
				continue
			}
			desired[group] = i
			filters[group] = parts[1:]
		} else if tagKey == `placement:cover` {
			// ex: placement:cover=host:class=compute
			for _, t := range strings.Split(tagValue, ";") {
				k, v := c.parseTag(t)
				state.HostIterateByTag(k, func(h Host) bool {
					if v == "" || h.Tags[k] == v {
						desired[`cover`]++
					}
					return true
				})
				filters[`cover`] = append(filters[`cover`], t)
			}
		}
	}
	var varies bool
	var groups []string
	state.ReplicaIterateByShardID(shard.ID, func(replica Replica) bool {
		host, _ := state.Host(replica.HostID)
		groups = groups[:0]
		for group := range desired {
			if c.matchTagFilter(host.Tags, filters[group]) {
				varies = true
				for tag := range vary {
					if varyCount[group] == nil {
						varyCount[group] = map[string]int{}
					}
					v, ok := host.Tags[tag]
					if !ok {
						varies = false
						break
					}
					varyCount[group][fmt.Sprintf(`%s=%s`, tag, v)]++
				}
				if varies {
					groups = append(groups, group)
					found[group]++
				}
			}
			if len(groups) > 1 {
				c.log.Infof(`Replica matched multiple groups [%05d:%05d]: %v`, shard.ID, replica.ID, groups)
			}
		}
		if len(groups) == 0 {
			c.log.Debugf("[%05d:%05d] Undesired \n%#v\n%#v", shard.ID, replica.ID, shard.Tags, host.Tags)
			undesired = append(undesired, replica.ID)
		} else {
			occupiedHosts[replica.HostID] = true
		}
		return true
	})
	var excessReplicaCount = map[string]int{}
	var missingReplicaCount = map[string]int{}
	groups = groups[:0]
	for group, n := range desired {
		if found[group] > n {
			excessReplicaCount[group] = found[group] - n
		}
		if found[group] < n {
			missingReplicaCount[group] = n - found[group]
		}
		if group == `member` {
			// Always process the member group first
			groups = append([]string{group}, groups...)
		} else {
			groups = append(groups, group)
		}
	}
	var requiresRebalance = map[string]bool{}
	for group, tags := range varyCount {
		var min int
		var max int
		for _, n := range tags {
			if min == 0 || n < min {
				min = n
			}
			if max == 0 || n > max {
				max = n
			}
			if max-min > 1 {
				requiresRebalance[group] = true
			}
		}
	}
	// Early exit
	if len(missingReplicaCount) == 0 && len(excessReplicaCount) == 0 && len(requiresRebalance) == 0 && len(undesired) == 0 {
		return
	}
	// Delete undesired replicas
	for _, replicaID := range undesired {
		if err = controls.Delete(replicaID); err != nil {
			c.log.Errorf(`Error deleting replica: %s`, err.Error())
			return
		}
		// Early exit just simplifies the logic
		return
	}
	// Find matching hosts for each group
	state.HostIterate(func(host Host) bool {
		if _, ok := occupiedHosts[host.ID]; ok {
			return true
		}
		for group, n := range desired {
			if found[group] == n {
				continue
			}
			if c.matchTagFilter(host.Tags, filters[group]) {
				matches[group] = append(matches[group], host)
				for tag := range vary {
					v, _ := host.Tags[tag]
					if _, ok := varyMatch[group]; !ok {
						varyMatch[group] = map[string]int{}
					}
					varyMatch[group][fmt.Sprintf(`%s=%s`, tag, v)]++
				}
			}
		}
		return true
	})
	// TODO - Rebalance (maybe belongs in its own controller)
	// for group := range requiresRebalance {}
	// TODO - Remove excess replicas (deciding which to remove while retaining balance)
	// for group, n := range excessReplicaCount {}
	// Add missing replicas
	for _, group := range groups {
		var n = missingReplicaCount[group]
		for i := 0; i < n; i++ {
			if len(matches[group]) == 0 {
				err = fmt.Errorf(`No more matching hosts`)
				break
			}
			// Find the vary tag values with the fewest replicas
			var varyTagValues = map[string]string{}
			var varyTagCounts = map[string]int{}
			for tag, replicaCount := range varyCount[group] {
				if varyMatch[group][tag] == 0 {
					// Don't even try this vary tag because it has no remaining hosts.
					continue
				}
				k, v := c.parseTag(tag)
				if varyTagCounts[k] == 0 || varyTagCounts[k] > replicaCount {
					varyTagValues[k] = v
					varyTagCounts[k] = replicaCount
				}
			}
			if len(varyTagValues) < len(varyCount[group]) {
				err = fmt.Errorf(`Failed to find an available host matching vary criteria`)
				break
			}
			// TODO - Ensure a host is available with the full tag set rather than each individually
			// to avoid pathological edge case w/ multiple vary tags.
			// Build vary tag set with fewest replicas
			var varyTags []string
			for tagKey, replicaCount := range varyTagCounts {
				if replicaCount > 0 && group == `member` {
					// The smallest vary tag value has a member already. If this group is the member
					// group then we give up because we never schedule members in a way that violates
					// the vary policy.
					err = fmt.Errorf(`Unable to find host that satisfies member vary policy for shard`)
					break
				}
				varyTags = append(varyTags, fmt.Sprintf(`%s=%s`, tagKey, varyTagValues[tagKey]))
			}
			var success bool
			var replicaID uint64
			for j, host := range matches[group] {
				if _, ok := occupiedHosts[host.ID]; ok {
					// Host already occupied
					continue
				}
				if c.matchTagFilter(host.Tags, varyTags) {
					replicaID, err = controls.Create(host.ID, shard.ID, group != `member`)
					if err != nil {
						return
					}
					for _, varyTag := range varyTags {
						varyMatch[group][varyTag]--
						varyCount[group][varyTag]++
					}
					c.log.Infof(`[%05d:%05d] Created replica for %s`, shard.ID, replicaID, shard.Name)
					matches[group] = append(matches[group][:j], matches[group][j+1:]...)
					occupiedHosts[host.ID] = true
					success = true
					break
				}
			}
			if !success {
				err = fmt.Errorf(`Unable to find host with matching vary tag set %+v`, varyTags)
				break
			}
		}
		if err != nil {
			c.log.Warningf("[%05d] (%s/%s) %s", shard.ID, shard.Name, group, err.Error())
			err = nil
		}
	}
	return
}

func (c *shardControllerDefault) parseTag(tag string) (k, v string) {
	i := strings.Index(tag, "=")
	if i < 1 {
		return tag, ""
	}
	return tag[:i], tag[i+1:]
}

func (c *shardControllerDefault) matchTagFilter(src map[string]string, tags []string) bool {
	for _, tag := range tags {
		if len(tag) == 0 {
			continue
		}
		k, v := c.parseTag(tag)
		if _, ok := src[k]; !ok {
			// Tag key not present
			return false
		}
		if v != "" && src[k] != v {
			// Tag value does not match (a bare tag key matches any value)
			return false
		}
	}
	return true
}
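// The placement grammar interpreted above is typically produced by the
// ShardOption helpers defined later in this document. This sketch shows how a
// shard carrying such tags might be declared; the state machine URI, shard
// name and tag values are assumptions for illustration.
package example

import (
	"context"

	"github.com/logbn/zongzi"
)

func createShard(ctx context.Context, agent *zongzi.Agent) (zongzi.Shard, error) {
	shard, _, err := agent.ShardCreate(ctx, "zongzi://github.com/logbn/zongzi-examples/kv",
		zongzi.WithName("kv-1"),
		// placement:member=3;geo:region=us-central1
		zongzi.WithPlacementMembers(3, "geo:region=us-central1"),
		// placement:vary=geo:zone
		zongzi.WithPlacementVary("geo:zone"),
		// placement:replica:read=6;host:class=storage
		zongzi.WithPlacementReplicas("read", 6, "host:class=storage"),
	)
	return shard, err
}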
package zongzi

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/benbjohnson/clock"
)

// The controllerManager runs the shard controller on the cluster leader,
// creating and destroying replicas based on each shard's placement tags.
type controllerManager struct {
	agent      *Agent
	clock      clock.Clock
	ctx        context.Context
	ctxCancel  context.CancelFunc
	index      uint64
	isLeader   atomic.Bool
	log        Logger
	mutex      sync.RWMutex
	controller Controller
	wg         sync.WaitGroup
}

func newControllerManager(agent *Agent) *controllerManager {
	return &controllerManager{
		log:        agent.log,
		agent:      agent,
		clock:      clock.New(),
		controller: newShardControllerDefault(agent),
	}
}

type Controller interface {
	Reconcile(*State, Shard, Controls) error
}

func (c *controllerManager) Start() (err error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.ctx, c.ctxCancel = context.WithCancel(context.Background())
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		t := c.clock.Ticker(500 * time.Millisecond)
		defer t.Stop()
		for {
			select {
			case <-c.ctx.Done():
				c.log.Infof("Shard controller manager stopped")
				return
			case <-t.C:
				c.tick()
			}
		}
	}()
	return
}

func (c *controllerManager) tick() {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	var err error
	var hadErr bool
	var index uint64
	var updated = true
	var controls = newControls(c.agent)
	if c.isLeader.Load() {
		for updated {
			updated = false
			err = c.agent.State(c.ctx, func(state *State) {
				index = state.Index()
				state.ShardIterateUpdatedAfter(c.index, func(shard Shard) bool {
					select {
					case <-c.ctx.Done():
						return false
					default:
					}
					if shard.ID == 0 {
						return true
					}
					controls.updated = false
					err = c.controller.Reconcile(state, shard, controls)
					if err != nil {
						hadErr = true
						c.log.Warningf("Error reconciling shard %d %s %s", shard.ID, shard.Name, err.Error())
						c.agent.tagsSet(shard, fmt.Sprintf(`zongzi:controller:error=%s`, err.Error()))
					} else if _, ok := shard.Tags[`zongzi:controller:error`]; ok {
						c.agent.tagsRemove(shard, `zongzi:controller:error`)
					}
					if controls.updated {
						// We break the iterator on update in order to catch a fresh snapshot for the
						// next shard. This ensures that changes applied during reconciliation of this
						// shard will be visible to reconciliation of the next shard.
						return false
					}
					return true
				})
				updated = controls.updated
			})
			if err != nil {
				hadErr = true
			}
		}
	}
	if !hadErr && index > c.index {
		c.log.Debugf("%s Finished processing %d", c.agent.hostID(), index)
		// c.agent.dumpState()
		c.index = index
	}
	return
}

func (c *controllerManager) LeaderUpdated(info LeaderInfo) {
	c.log.Infof("[%05d:%05d] LeaderUpdated: %05d (term %d)", info.ShardID, info.ReplicaID, info.LeaderID, info.Term)
	if info.ShardID == 0 {
		c.isLeader.Store(info.LeaderID == info.ReplicaID)
	}
	if c.isLeader.Load() && c.agent.Status() == AgentStatus_Ready && info.LeaderID != 0 {
		c.agent.shardLeaderSet(info.ShardID, info.LeaderID, info.Term)
		c.log.Warningf("[%05d:%05d] Leader Set: %d (term %d)", info.ShardID, info.ReplicaID, info.LeaderID, info.Term)
	}
}

func (c *controllerManager) Stop() {
	defer c.log.Infof(`Stopped controllerManager`)
	if c.ctxCancel != nil {
		c.ctxCancel()
	}
	c.wg.Wait()
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.index = 0
}
package zongzi

// Controls provides controllers with the operations they may perform against
// the cluster during reconciliation.
type Controls interface {
	// Create schedules a new replica of the shard on the given host.
	Create(hostID string, shardID uint64, isNonVoting bool) (id uint64, err error)
	// Delete removes a replica from its shard.
	Delete(replicaID uint64) error
}

func newControls(a *Agent) *controls {
	return &controls{a, false}
}

type controls struct {
	agent   *Agent
	updated bool
}

func (sc *controls) Create(hostID string, shardID uint64, isNonVoting bool) (id uint64, err error) {
	id, err = sc.agent.replicaCreate(hostID, shardID, isNonVoting)
	if err == nil {
		sc.updated = true
	}
	return
}

func (sc *controls) Delete(replicaID uint64) (err error) {
	err = sc.agent.replicaDelete(replicaID)
	if err == nil {
		sc.updated = true
	}
	return
}
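// Controllers are not limited to the default implementation above. The
// following sketch is an illustrative custom Controller (not part of the
// library) that simply pads each shard up to three replicas; a production
// controller would also honor placement tags. How a custom controller is
// registered with the agent is outside this sketch.
package example

import (
	"github.com/logbn/zongzi"
)

type padController struct{}

func (c *padController) Reconcile(state *zongzi.State, shard zongzi.Shard, controls zongzi.Controls) (err error) {
	var count int
	occupied := map[string]bool{}
	state.ReplicaIterateByShardID(shard.ID, func(r zongzi.Replica) bool {
		count++
		occupied[r.HostID] = true
		return true
	})
	state.HostIterate(func(h zongzi.Host) bool {
		if count >= 3 || occupied[h.ID] {
			return count < 3
		}
		// Create a voting replica (isNonVoting=false) on this host.
		if _, err = controls.Create(h.ID, shard.ID, false); err != nil {
			return false
		}
		count++
		return count < 3
	})
	return
}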
package zongzi

import (
	"fmt"
	"strings"
)

type ShardOption func(*Shard) error

// WithName assigns a unique name to the shard.
func WithName(name string) ShardOption {
	return func(s *Shard) error {
		s.Name = name
		return nil
	}
}

// WithPlacementMembers sets the desired number of voting members, optionally
// restricted to hosts matching the supplied tags (ex: "geo:region=us-central1").
func WithPlacementMembers(n int, tags ...string) ShardOption {
	return func(s *Shard) error {
		s.Tags[`placement:member`] = fmt.Sprintf(`%d;%s`, n, strings.Join(tags, ";"))
		return nil
	}
}

// WithPlacementReplicas sets the desired number of non-voting replicas for a
// named group, optionally restricted to hosts matching the supplied tags.
func WithPlacementReplicas(group string, n int, tags ...string) ShardOption {
	return func(s *Shard) error {
		s.Tags[`placement:replica:`+group] = fmt.Sprintf(`%d;%s`, n, strings.Join(tags, ";"))
		return nil
	}
}

// WithPlacementVary spreads replicas across distinct values of the supplied
// host tag keys (ex: "geo:zone").
func WithPlacementVary(tagKeys ...string) ShardOption {
	return func(s *Shard) error {
		s.Tags[`placement:vary`] = strings.Join(tagKeys, ";")
		return nil
	}
}

// WithPlacementCover requests one replica on every host matching the supplied
// tags (ex: "host:class=compute").
func WithPlacementCover(tagKeys ...string) ShardOption {
	return func(s *Shard) error {
		s.Tags[`placement:cover`] = strings.Join(tagKeys, ";")
		return nil
	}
}

// WithTag sets an arbitrary tag on the shard.
func WithTag(k string, v any) ShardOption {
	return func(s *Shard) error {
		s.Tags[k] = fmt.Sprintf(`%v`, v)
		return nil
	}
}
package zongzi import ( "context" "fmt" "io" "github.com/lni/dragonboat/v4/statemachine" ) // StateMachineFactory is a function that returns a StateMachine type StateMachineFactory = func(shardID uint64, replicaID uint64) StateMachine func stateMachineFactoryShim(fn StateMachineFactory) statemachine.CreateConcurrentStateMachineFunc { return func(shardID uint64, replicaID uint64) statemachine.IConcurrentStateMachine { return &stateMachineShim{fn(shardID, replicaID)} } } // StateMachine is a deterministic finite state machine. Snapshots are requested during log compaction to ensure that // the in-memory state can be recovered following a restart. If you expect a dataset larger than memory, a persistent // state machine may be more appropriate. // // Lookup may be called concurrently with Update and SaveSnapshot. It is the caller's responsibility to ensure that // snapshots are generated using snapshot isolation. This can be achieved using Multi Version Concurrency Control // (MVCC). A simple mutex can also be used if blocking writes during read is acceptable. type StateMachine interface { Update(entries []Entry) []Entry Query(ctx context.Context, query []byte) *Result Watch(ctx context.Context, query []byte, result chan<- *Result) PrepareSnapshot() (cursor any, err error) SaveSnapshot(cursor any, w io.Writer, c SnapshotFileCollection, close <-chan struct{}) error RecoverFromSnapshot(r io.Reader, f []SnapshotFile, close <-chan struct{}) error Close() error } var _ statemachine.IConcurrentStateMachine = (*stateMachineShim)(nil) type stateMachineShim struct { sm StateMachine } func (shim *stateMachineShim) Update(entries []Entry) (responses []Entry, err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf(`%v`, r) } }() responses = shim.sm.Update(entries) return } func (shim *stateMachineShim) Lookup(query any) (res any, err error) { if q, ok := query.(*lookupQuery); ok { res = shim.sm.Query(q.ctx, q.data) return } if q, ok := query.(*watchQuery); ok { shim.sm.Watch(q.ctx, q.data, q.result) return } return } func (shim *stateMachineShim) PrepareSnapshot() (cursor any, err error) { return shim.sm.PrepareSnapshot() } func (shim *stateMachineShim) SaveSnapshot(cursor any, w io.Writer, c SnapshotFileCollection, close <-chan struct{}) error { return shim.sm.SaveSnapshot(cursor, w, c, close) } func (shim *stateMachineShim) RecoverFromSnapshot(r io.Reader, f []SnapshotFile, close <-chan struct{}) error { return shim.sm.RecoverFromSnapshot(r, f, close) } func (shim *stateMachineShim) Close() error { return shim.sm.Close() }
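// A minimal in-memory StateMachine sketch follows. It is illustrative, not a
// canonical implementation: commands are assumed to be "key=value" pairs and
// queries bare keys, and a single RWMutex stands in for MVCC by blocking
// writes while a snapshot is saved. Registration of the factory with an
// agent is outside this sketch.
package example

import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"sync"

	"github.com/logbn/zongzi"
)

type kvStateMachine struct {
	mutex sync.RWMutex
	data  map[string]string
}

// kvFactory satisfies zongzi.StateMachineFactory.
var _ zongzi.StateMachineFactory = kvFactory

func kvFactory(shardID uint64, replicaID uint64) zongzi.StateMachine {
	return &kvStateMachine{data: map[string]string{}}
}

func (sm *kvStateMachine) Update(entries []zongzi.Entry) []zongzi.Entry {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	for i, e := range entries {
		if k, v, ok := bytes.Cut(e.Cmd, []byte(`=`)); ok {
			sm.data[string(k)] = string(v)
			entries[i].Result.Value = e.Index
		}
	}
	return entries
}

func (sm *kvStateMachine) Query(ctx context.Context, query []byte) *zongzi.Result {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	res := zongzi.GetResult()
	res.Data = append(res.Data, sm.data[string(query)]...)
	return res
}

func (sm *kvStateMachine) Watch(ctx context.Context, query []byte, result chan<- *zongzi.Result) {
	// A real implementation would publish updates until ctx is done; this
	// sketch emits the current value once.
	result <- sm.Query(ctx, query)
}

func (sm *kvStateMachine) PrepareSnapshot() (cursor any, err error) {
	return
}

func (sm *kvStateMachine) SaveSnapshot(cursor any, w io.Writer, c zongzi.SnapshotFileCollection, close <-chan struct{}) error {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	return json.NewEncoder(w).Encode(sm.data)
}

func (sm *kvStateMachine) RecoverFromSnapshot(r io.Reader, f []zongzi.SnapshotFile, close <-chan struct{}) error {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	return json.NewDecoder(r).Decode(&sm.data)
}

func (sm *kvStateMachine) Close() error {
	return nil
}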
package zongzi import ( "context" "fmt" "io" "github.com/lni/dragonboat/v4/statemachine" ) // StateMachinePersistentFactory is a function that returns a StateMachinePersistent type StateMachinePersistentFactory = func(shardID uint64, replicaID uint64) StateMachinePersistent func stateMachinePersistentFactoryShim(fn StateMachinePersistentFactory) statemachine.CreateOnDiskStateMachineFunc { return func(shardID uint64, replicaID uint64) statemachine.IOnDiskStateMachine { return &stateMachinePersistentShim{fn(shardID, replicaID)} } } // StateMachinePersistent is a StateMachine where the state is persisted to a medium (such as disk) that can survive // restart. During compaction, calls to Snapshot are replaced with calls to Sync which effectively flushes state to // the persistent medium. SaveSnapshot and RecoverFromSnapshot are used to replicate full on-disk state to new replicas. type StateMachinePersistent interface { Open(stopc <-chan struct{}) (index uint64, err error) Update(entries []Entry) []Entry Query(ctx context.Context, query []byte) *Result Watch(ctx context.Context, query []byte, result chan<- *Result) PrepareSnapshot() (cursor any, err error) SaveSnapshot(cursor any, w io.Writer, close <-chan struct{}) error RecoverFromSnapshot(r io.Reader, close <-chan struct{}) error Sync() error Close() error } var _ statemachine.IOnDiskStateMachine = (*stateMachinePersistentShim)(nil) type stateMachinePersistentShim struct { sm StateMachinePersistent } func (shim *stateMachinePersistentShim) Open(stopc <-chan struct{}) (index uint64, err error) { return shim.sm.Open(stopc) } func (shim *stateMachinePersistentShim) Update(entries []Entry) (responses []Entry, err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf(`%v`, r) } }() responses = shim.sm.Update(entries) return } func (shim *stateMachinePersistentShim) Lookup(query any) (res any, err error) { if q, ok := query.(*lookupQuery); ok { res = shim.sm.Query(q.ctx, q.data) return } if q, ok := query.(*watchQuery); ok { shim.sm.Watch(q.ctx, q.data, q.result) return } return } func (shim *stateMachinePersistentShim) PrepareSnapshot() (cursor any, err error) { return shim.sm.PrepareSnapshot() } func (shim *stateMachinePersistentShim) SaveSnapshot(cursor any, w io.Writer, close <-chan struct{}) error { return shim.sm.SaveSnapshot(cursor, w, close) } func (shim *stateMachinePersistentShim) RecoverFromSnapshot(r io.Reader, close <-chan struct{}) error { return shim.sm.RecoverFromSnapshot(r, close) } func (shim *stateMachinePersistentShim) Sync() error { return shim.sm.Sync() } func (shim *stateMachinePersistentShim) Close() error { return shim.sm.Close() }
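// The persistent variant differs mainly in its lifecycle: this illustrative
// fragment sketches only Open and Sync (the remaining methods mirror the
// in-memory example above) and leaves the storage medium abstract.
package example

type diskSM struct {
	index uint64 // last raft index applied and durably persisted
}

// Open recovers durable state and reports the last applied index so that the
// raft log resumes replay from the correct position.
func (sm *diskSM) Open(stopc <-chan struct{}) (uint64, error) {
	return sm.index, nil
}

// Sync flushes buffered writes (e.g. via fsync) so that applied entries
// survive restart; during compaction it is called in place of a snapshot.
func (sm *diskSM) Sync() error {
	return nil
}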
package zongzi import ( "context" "fmt" "strconv" "sync" "time" "github.com/lni/dragonboat/v4" "github.com/lni/dragonboat/v4/config" "github.com/lni/dragonboat/v4/logger" "github.com/lni/dragonboat/v4/plugin/tan" "github.com/lni/dragonboat/v4/raftio" "github.com/lni/dragonboat/v4/statemachine" ) const ( projectName = "zongzi" projectUri = "zongzi://github.com/logbn/zongzi" minReplicas = 3 raftTimeout = time.Second joinTimeout = 5 * time.Second waitPeriod = 500 * time.Millisecond ) const ( DefaultGossipAddress = "127.0.0.1:17001" DefaultRaftAddress = "127.0.0.1:17002" DefaultApiAddress = "127.0.0.1:17003" ShardID = 0 ) var ( DefaultHostConfig = HostConfig{ NodeHostDir: "/var/lib/zongzi/raft", RaftAddress: DefaultRaftAddress, RTTMillisecond: 10, WALDir: "/var/lib/zongzi/wal", Expert: config.ExpertConfig{ LogDBFactory: tan.Factory, LogDB: config.GetMediumMemLogDBConfig(), }, } DefaultReplicaConfig = ReplicaConfig{ CheckQuorum: true, CompactionOverhead: 10000, ElectionRTT: 100, EntryCompressionType: config.Snappy, HeartbeatRTT: 10, OrderedConfigChange: true, Quiesce: false, SnapshotCompressionType: config.Snappy, SnapshotEntries: 10000, } ) type ( nodeHostInfoOption = dragonboat.NodeHostInfoOption HostConfig = config.NodeHostConfig ReplicaConfig = config.Config GossipConfig = config.GossipConfig LogDBConfig = config.LogDBConfig EngineConfig = config.EngineConfig LeaderInfo = raftio.LeaderInfo RaftEventListener = raftio.IRaftEventListener SystemEventListener = raftio.ISystemEventListener Entry = statemachine.Entry Result = statemachine.Result SnapshotFile = statemachine.SnapshotFile SnapshotFileCollection = statemachine.ISnapshotFileCollection LogLevel = logger.LogLevel Logger = logger.ILogger AgentStatus string HostStatus string ShardStatus string ReplicaStatus string ) var ( GetLogger = logger.GetLogger GetDefaultEngineConfig = config.GetDefaultEngineConfig ) const ( LogLevelCritical = logger.CRITICAL LogLevelError = logger.ERROR LogLevelWarning = logger.WARNING LogLevelInfo = logger.INFO LogLevelDebug = logger.DEBUG ) const ( AgentStatus_Initializing = AgentStatus("initializing") AgentStatus_Joining = AgentStatus("joining") AgentStatus_Pending = AgentStatus("pending") AgentStatus_Ready = AgentStatus("ready") AgentStatus_Rejoining = AgentStatus("rejoining") AgentStatus_Stopped = AgentStatus("stopped") HostStatus_Active = HostStatus("active") HostStatus_Gone = HostStatus("gone") HostStatus_Missing = HostStatus("missing") HostStatus_New = HostStatus("new") HostStatus_Recovering = HostStatus("recovering") ShardStatus_Active = ShardStatus("active") ShardStatus_Closed = ShardStatus("closed") ShardStatus_Closing = ShardStatus("closing") ShardStatus_New = ShardStatus("new") ShardStatus_Unavailable = ShardStatus("unavailable") ReplicaStatus_Active = ReplicaStatus("active") ReplicaStatus_Closed = ReplicaStatus("closed") ReplicaStatus_Closing = ReplicaStatus("closing") ReplicaStatus_Bootstrapping = ReplicaStatus("bootstrapping") ReplicaStatus_Joining = ReplicaStatus("joining") ReplicaStatus_New = ReplicaStatus("new") ) var ( ErrAborted = dragonboat.ErrAborted ErrCanceled = dragonboat.ErrCanceled ErrRejected = dragonboat.ErrRejected ErrShardClosed = dragonboat.ErrShardClosed ErrShardNotReady = dragonboat.ErrShardNotReady ErrTimeout = dragonboat.ErrTimeout ErrAgentNotReady = fmt.Errorf("Agent not ready") ErrHostNotFound = fmt.Errorf(`Host not found`) ErrIDOutOfRange = fmt.Errorf(`ID out of range`) ErrInvalidFactory = fmt.Errorf(`Invalid Factory`) ErrInvalidGossipAddr = fmt.Errorf(`Invalid gossip 
address`) ErrReplicaNotActive = fmt.Errorf("Replica not active") ErrReplicaNotAllowed = fmt.Errorf("Replica not allowed") ErrReplicaNotFound = fmt.Errorf("Replica not found") ErrShardExists = fmt.Errorf(`Shard already exists`) ErrShardNotFound = fmt.Errorf(`Shard not found`) ErrInvalidNumberOfArguments = fmt.Errorf(`Invalid number of arguments`) // ErrClusterNameInvalid indicates that the clusterName is invalid // Base36 supports only lowercase alphanumeric characters ErrClusterNameInvalid = fmt.Errorf("Invalid cluster name (base36 maxlen 12)") ClusterNameRegex = `^[a-z0-9]{1,12}$` // ErrNotifyCommitDisabled is logged when non-linearizable writes are requested but disabled. // Set property `NotifyCommit` to `true` in `HostConfig` to add support for non-linearizable writes. ErrNotifyCommitDisabled = fmt.Errorf("Attempted to make a non-linearizable write while NotifyCommit is disabled") ) type ( lookupQuery struct { ctx context.Context data []byte } watchQuery struct { ctx context.Context data []byte result chan *Result } ) func (q *lookupQuery) Release() { q.ctx = nil q.data = q.data[:0] lookupQueryPool.Put(q) } var lookupQueryPool = sync.Pool{New: func() any { return &lookupQuery{} }} func getLookupQuery() *lookupQuery { return lookupQueryPool.Get().(*lookupQuery) } func newLookupQuery(ctx context.Context, data []byte) (q *lookupQuery) { q = lookupQueryPool.Get().(*lookupQuery) q.ctx = ctx q.data = data return q } func (q *watchQuery) Release() { q.ctx = nil q.data = q.data[:0] q.result = nil watchQueryPool.Put(q) } var watchQueryPool = sync.Pool{New: func() any { return &watchQuery{} }} func getWatchQuery() *watchQuery { return watchQueryPool.Get().(*watchQuery) } func newWatchQuery(ctx context.Context, data []byte, result chan *Result) (q *watchQuery) { q = watchQueryPool.Get().(*watchQuery) q.ctx = ctx q.data = data q.result = result return } var resultPool = sync.Pool{New: func() any { return &Result{} }} func ReleaseResult(r *Result) { r.Value = 0 r.Data = r.Data[:0] resultPool.Put(r) } // GetResult can be used to efficiently retrieve an empty Result from a global pool. It is recommended to use this // method to instantiate Result objects returned by Lookup or sent over Watch channels as they will be automatically // returned to the pool to reduce allocation overhead. func GetResult() *Result { return resultPool.Get().(*Result) } var requestStatePool = sync.Pool{New: func() any { return &dragonboat.RequestState{} }} func getRequestState() *dragonboat.RequestState { return requestStatePool.Get().(*dragonboat.RequestState) } func mustBase36Decode(name string) uint64 { id, err := base36Decode(name) if err != nil { panic(err) } return id } func base36Decode(name string) (uint64, error) { return strconv.ParseUint(name, 36, 64) } func base36Encode(id uint64) string { return strconv.FormatUint(id, 36) } type compositeRaftEventListener struct { listeners []raftio.IRaftEventListener } func newCompositeRaftEventListener(listeners ...raftio.IRaftEventListener) raftio.IRaftEventListener { return &compositeRaftEventListener{listeners} } func (c *compositeRaftEventListener) LeaderUpdated(info LeaderInfo) { for _, listener := range c.listeners { if listener != nil { listener.LeaderUpdated(info) } } } // SetLogLevel sets log level for all zongzi and dragonboat loggers. // // Recommend [LogLevelWarning] for production. 
func SetLogLevel(level LogLevel) {
	logger.GetLogger("dragonboat").SetLevel(level)
	logger.GetLogger("gossip").SetLevel(level)
	logger.GetLogger("grpc").SetLevel(level)
	logger.GetLogger("logdb").SetLevel(level)
	logger.GetLogger("raft").SetLevel(level)
	logger.GetLogger("rsm").SetLevel(level)
	logger.GetLogger("transport").SetLevel(level)
	logger.GetLogger("zongzi").SetLevel(level)
	logger.GetLogger("tan").SetLevel(level)
	logger.GetLogger("registry").SetLevel(level)
	logger.GetLogger("config").SetLevel(level)
}

// SetLogLevelDebug sets a debug log level for most loggers but filters out
// the loggers that produce large volumes of debug output.
func SetLogLevelDebug() {
	SetLogLevel(logger.DEBUG)
	logger.GetLogger("dragonboat").SetLevel(logger.WARNING)
	logger.GetLogger("gossip").SetLevel(logger.ERROR)
	logger.GetLogger("raft").SetLevel(logger.WARNING)
	logger.GetLogger("transport").SetLevel(logger.WARNING)
}

// SetLogLevelProduction sets a good log level for production (the gossip logger is a bit noisy).
func SetLogLevelProduction() {
	SetLogLevel(logger.WARNING)
	logger.GetLogger("gossip").SetLevel(logger.ERROR)
}

func parseUint64(s string) (uint64, error) {
	return strconv.ParseUint(s, 10, 64)
}

func keys[K comparable, V any](m map[K]V) []K {
	r := make([]K, 0, len(m))
	for k := range m {
		r = append(r, k)
	}
	return r
}

func sliceContains[T comparable](slice []T, value T) bool {
	for _, item := range slice {
		if item == value {
			return true
		}
	}
	return false
}

func sliceWithout[T comparable](slice []T, exclude T) []T {
	var out []T
	for _, v := range slice {
		if v != exclude {
			out = append(out, v)
		}
	}
	return out
}
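// A minimal bootstrap sketch tying the pieces above together. The cluster
// name, peer list and addresses are assumptions for illustration; each peer
// would run the same bootstrap with its own advertise addresses.
package main

import (
	"context"
	"log"

	"github.com/logbn/zongzi"
)

func main() {
	zongzi.SetLogLevelProduction()
	agent, err := zongzi.NewAgent("cluster0", []string{
		"10.0.0.1:17003",
		"10.0.0.2:17003",
		"10.0.0.3:17003",
	},
		zongzi.WithApiAddress("10.0.0.1:17003"),
		zongzi.WithGossipAddress("10.0.0.1:17001"),
	)
	if err != nil {
		log.Fatal(err)
	}
	if err = agent.Start(context.Background()); err != nil {
		log.Fatal(err)
	}
}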