package ctrl import ( "context" "errors" "github.com/JMURv/service-discovery/internal/repo" md "github.com/JMURv/service-discovery/pkg/model" "go.uber.org/zap" "io" ) type ServiceDiscoveryRepo interface { io.Closer ListNames(ctx context.Context) ([]string, error) ListAddrsByName(ctx context.Context, name string) ([]string, error) ListServices(ctx context.Context) ([]md.Service, error) FindServiceByName(ctx context.Context, name string) (string, error) Register(ctx context.Context, name, addr string, svcType md.SvcType) error Deregister(ctx context.Context, name, addr string) error DeactivateSvc(_ context.Context, name, addr string) error ActivateSvc(ctx context.Context, name, addr string) error } type Controller struct { repo ServiceDiscoveryRepo newAddrChan chan md.Service } func New(repo ServiceDiscoveryRepo, newAddrChan chan md.Service) *Controller { return &Controller{ repo: repo, newAddrChan: newAddrChan, } } func (c *Controller) ListNames(ctx context.Context) ([]string, error) { names, err := c.repo.ListNames(ctx) if err != nil { zap.L().Error("Error finding svcs", zap.Error(err)) return nil, err } return names, nil } func (c *Controller) ListAddrsByName(ctx context.Context, name string) ([]string, error) { svcs, err := c.repo.ListAddrsByName(ctx, name) if err != nil && errors.Is(err, repo.ErrNotFound) { zap.L().Debug("Error svc not registered") return []string{}, ErrNotFound } else if err != nil { zap.L().Error( "Error finding list of addrs", zap.Error(err), zap.String("name", name), ) return []string{}, err } return svcs, nil } func (c *Controller) ListServices(ctx context.Context) ([]md.Service, error) { svcs, err := c.repo.ListServices(ctx) if err != nil { zap.L().Error("Error finding svcs", zap.Error(err)) return nil, err } return svcs, nil } func (c *Controller) FindServiceByName(ctx context.Context, name string) (string, error) { addr, err := c.repo.FindServiceByName(ctx, name) if err != nil && errors.Is(err, repo.ErrNotFound) { zap.L().Debug( "Error svc not registered", zap.String("name", name), ) return "", ErrNotFound } else if err != nil { zap.L().Error( "Error finding svc", zap.String("name", name), zap.Error(err), ) return "", err } return addr, nil } func (c *Controller) Register(ctx context.Context, name, addr string, svcType md.SvcType) error { if err := c.repo.Register(ctx, name, addr, svcType); err != nil && errors.Is(err, repo.ErrAlreadyExists) { zap.L().Debug( "Error svc already registered", zap.String("name", name), zap.String("address", addr), ) return ErrAlreadyExists } else if err != nil { zap.L().Error( "Error registering svc", zap.String("name", name), zap.String("address", addr), zap.Error(err), ) return err } c.newAddrChan <- md.Service{Name: name, Address: addr, SvcType: svcType} zap.L().Debug( "Registered svc", zap.String("name", name), zap.String("address", addr), ) return nil } func (c *Controller) Deregister(ctx context.Context, name, addr string) error { err := c.repo.Deregister(ctx, name, addr) if err != nil && errors.Is(err, repo.ErrNotFound) { zap.L().Debug( "Error svc not registered", zap.String("name", name), zap.String("address", addr), ) return ErrNotFound } else if err != nil { zap.L().Error( "Error deregistering svc", zap.String("name", name), zap.String("address", addr), zap.Error(err), ) return err } return nil }
package grpc import ( "context" "errors" "fmt" pb "github.com/JMURv/service-discovery/api/pb" "github.com/JMURv/service-discovery/internal/ctrl" md "github.com/JMURv/service-discovery/pkg/model" "github.com/JMURv/service-discovery/pkg/model/mapper" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" "net" ) type Ctrl interface { ListNames(ctx context.Context) ([]string, error) ListAddrsByName(ctx context.Context, name string) ([]string, error) ListServices(ctx context.Context) ([]md.Service, error) FindServiceByName(ctx context.Context, name string) (string, error) Register(ctx context.Context, name, addr string, svcType md.SvcType) error Deregister(ctx context.Context, name, addr string) error } type Handler struct { pb.ServiceDiscoveryServer srv *grpc.Server ctrl Ctrl } func New(ctrl Ctrl) *Handler { srv := grpc.NewServer() reflection.Register(srv) return &Handler{ ctrl: ctrl, srv: srv, } } func (h *Handler) Start(port int) { pb.RegisterServiceDiscoveryServer(h.srv, h) lis, err := net.Listen("tcp", fmt.Sprintf(":%v", port)) if err != nil { zap.L().Fatal("failed to listen", zap.Error(err)) } if err = h.srv.Serve(lis); err != nil && !errors.Is(err, grpc.ErrServerStopped) { zap.L().Fatal("failed to serve", zap.Error(err)) } } func (h *Handler) Close() error { h.srv.GracefulStop() return nil } func (h *Handler) ListNames(ctx context.Context, _ *pb.Empty) (*pb.ListNamesMsg, error) { res, err := h.ctrl.ListNames(ctx) if err != nil { return nil, status.Errorf(codes.Internal, ctrl.ErrInternalError.Error()) } return &pb.ListNamesMsg{ Name: res, }, nil } func (h *Handler) ListAddrsByName(ctx context.Context, req *pb.ServiceNameMsg) (*pb.ListAddrsMsg, error) { if req == nil || req.Name == "" { zap.L().Error("failed to decode request") return nil, status.Errorf(codes.InvalidArgument, ctrl.ErrDecodeRequest.Error()) } res, err := h.ctrl.ListAddrsByName(ctx, req.Name) if err != nil && errors.Is(err, ctrl.ErrNotFound) { return nil, status.Errorf(codes.NotFound, err.Error()) } else if err != nil { return nil, status.Errorf(codes.Internal, ctrl.ErrInternalError.Error()) } return &pb.ListAddrsMsg{ Address: res, }, nil } func (h *Handler) ListServices(ctx context.Context, _ *pb.Empty) (*pb.ListServiceMsg, error) { res, err := h.ctrl.ListServices(ctx) if err != nil { return nil, status.Errorf(codes.Internal, ctrl.ErrInternalError.Error()) } return &pb.ListServiceMsg{ Service: mapper.ListSvcToProto(res), }, nil } func (h *Handler) Register(ctx context.Context, req *pb.RegisterMsg) (*pb.Empty, error) { if req == nil || req.Name == "" || req.Address == "" || req.Type == "" { zap.L().Error("failed to decode request") return nil, status.Errorf(codes.InvalidArgument, ctrl.ErrDecodeRequest.Error()) } err := h.ctrl.Register(ctx, req.Name, req.Address, md.SvcType(req.Type)) if err != nil && errors.Is(err, ctrl.ErrAlreadyExists) { return nil, status.Errorf(codes.AlreadyExists, err.Error()) } else if err != nil { return nil, status.Errorf(codes.Internal, ctrl.ErrInternalError.Error()) } return &pb.Empty{}, nil } func (h *Handler) Deregister(ctx context.Context, req *pb.NameAndAddressMsg) (*pb.Empty, error) { if req == nil || req.Name == "" || req.Address == "" { zap.L().Error("failed to decode request") return nil, status.Errorf(codes.InvalidArgument, ctrl.ErrDecodeRequest.Error()) } err := h.ctrl.Deregister(ctx, req.Name, req.Address) if err != nil && errors.Is(err, ctrl.ErrNotFound) { return nil, status.Errorf(codes.NotFound, err.Error()) } else if err != nil { return nil, status.Errorf(codes.Internal, ctrl.ErrInternalError.Error()) } return &pb.Empty{}, nil } func (h *Handler) FindServiceByName(ctx context.Context, req *pb.ServiceNameMsg) (*pb.ServiceAddressMsg, error) { if req == nil || req.Name == "" { zap.L().Error("failed to decode request") return nil, status.Errorf(codes.InvalidArgument, ctrl.ErrDecodeRequest.Error()) } res, err := h.ctrl.FindServiceByName(ctx, req.Name) if err != nil && errors.Is(err, ctrl.ErrNotFound) { return nil, status.Errorf(codes.NotFound, err.Error()) } else if err != nil { return nil, status.Errorf(codes.Internal, ctrl.ErrInternalError.Error()) } return &pb.ServiceAddressMsg{ Address: res, }, nil }
package http import ( "context" "errors" "fmt" "github.com/JMURv/service-discovery/internal/ctrl" "github.com/JMURv/service-discovery/internal/hdl/grpc" "github.com/JMURv/service-discovery/internal/validation" md "github.com/JMURv/service-discovery/pkg/model" utils "github.com/JMURv/service-discovery/pkg/utils/http" "github.com/goccy/go-json" "github.com/gorilla/mux" "go.uber.org/zap" "net/http" "time" ) type Handler struct { srv *http.Server ctrl grpc.Ctrl } func New(ctrl grpc.Ctrl) *Handler { return &Handler{ ctrl: ctrl, } } func (h *Handler) Start(port int) { r := mux.NewRouter() r.HandleFunc("/health-check", h.healthCheck).Methods(http.MethodGet) r.HandleFunc("/register", h.register).Methods(http.MethodPost) r.HandleFunc("/deregister", h.deregister).Methods(http.MethodPost) r.HandleFunc("/find", h.find).Methods(http.MethodPost) r.HandleFunc("/list-svcs", h.listSvcs).Methods(http.MethodGet) r.HandleFunc("/list-addrs", h.ListAddrsByName).Methods(http.MethodPost) h.srv = &http.Server{ Handler: r, Addr: fmt.Sprintf(":%v", port), WriteTimeout: 15 * time.Second, ReadTimeout: 15 * time.Second, IdleTimeout: 60 * time.Second, } err := h.srv.ListenAndServe() if err != nil && err != http.ErrServerClosed { zap.L().Debug("Server error", zap.Error(err)) } } func (h *Handler) Close() error { if err := h.srv.Shutdown(context.Background()); err != nil { return err } return nil } func (h *Handler) healthCheck(w http.ResponseWriter, r *http.Request) { utils.SuccessResponse(w, http.StatusOK, "OK") } func (h *Handler) listSvcs(w http.ResponseWriter, r *http.Request) { svcs, err := h.ctrl.ListServices(r.Context()) if err != nil && errors.Is(err, ctrl.ErrAlreadyExists) { utils.ErrResponse(w, http.StatusConflict, err) return } else if err != nil { utils.ErrResponse(w, http.StatusInternalServerError, err) return } utils.SuccessResponse(w, http.StatusOK, svcs) } func (h *Handler) ListAddrsByName(w http.ResponseWriter, r *http.Request) { req := &md.Service{} if err := json.NewDecoder(r.Body).Decode(req); err != nil { zap.L().Debug("failed to decode request", zap.Error(err)) utils.ErrResponse(w, http.StatusBadRequest, err) return } if req.Name == "" { zap.L().Debug("failed to decode request", zap.Error(validation.ErrMissingName)) utils.ErrResponse(w, http.StatusBadRequest, validation.ErrMissingName) return } svcs, err := h.ctrl.ListAddrsByName(r.Context(), req.Name) if err != nil && errors.Is(err, ctrl.ErrAlreadyExists) { utils.ErrResponse(w, http.StatusConflict, err) return } else if err != nil { utils.ErrResponse(w, http.StatusInternalServerError, err) return } utils.SuccessResponse(w, http.StatusOK, svcs) } func (h *Handler) register(w http.ResponseWriter, r *http.Request) { req := &md.Service{} if err := json.NewDecoder(r.Body).Decode(req); err != nil { zap.L().Debug("failed to decode request", zap.Error(err)) utils.ErrResponse(w, http.StatusBadRequest, err) return } if req.Name == "" { zap.L().Debug("failed to decode request", zap.Error(validation.ErrMissingName)) utils.ErrResponse(w, http.StatusBadRequest, validation.ErrMissingName) return } else if req.Address == "" { zap.L().Debug("failed to decode request", zap.Error(validation.ErrMissingAddress)) utils.ErrResponse(w, http.StatusBadRequest, validation.ErrMissingAddress) return } err := h.ctrl.Register(r.Context(), req.Name, req.Address, req.SvcType) if err != nil && errors.Is(err, ctrl.ErrAlreadyExists) { utils.ErrResponse(w, http.StatusConflict, err) return } else if err != nil { utils.ErrResponse(w, http.StatusInternalServerError, ctrl.ErrInternalError) return } utils.SuccessResponse(w, http.StatusCreated, "OK") } func (h *Handler) deregister(w http.ResponseWriter, r *http.Request) { req := &md.Service{} if err := json.NewDecoder(r.Body).Decode(req); err != nil { zap.L().Debug("failed to decode request", zap.Error(err)) utils.ErrResponse(w, http.StatusBadRequest, err) return } if req.Name == "" { zap.L().Debug("failed to decode request", zap.Error(validation.ErrMissingName)) utils.ErrResponse(w, http.StatusBadRequest, validation.ErrMissingName) return } else if req.Address == "" { zap.L().Debug("failed to decode request", zap.Error(validation.ErrMissingAddress)) utils.ErrResponse(w, http.StatusBadRequest, validation.ErrMissingAddress) return } err := h.ctrl.Deregister(r.Context(), req.Name, req.Address) if err != nil && errors.Is(err, ctrl.ErrNotFound) { utils.ErrResponse(w, http.StatusNotFound, err) return } else if err != nil { utils.ErrResponse(w, http.StatusInternalServerError, err) return } utils.SuccessResponse(w, http.StatusOK, "OK") } func (h *Handler) find(w http.ResponseWriter, r *http.Request) { req := &md.Service{} if err := json.NewDecoder(r.Body).Decode(req); err != nil { zap.L().Debug("failed to decode request", zap.Error(err)) utils.ErrResponse(w, http.StatusBadRequest, err) return } if req.Name == "" { zap.L().Debug("failed to decode request", zap.Error(validation.ErrMissingName)) utils.ErrResponse(w, http.StatusBadRequest, validation.ErrMissingName) return } res, err := h.ctrl.FindServiceByName(r.Context(), req.Name) if err != nil && errors.Is(err, ctrl.ErrNotFound) { utils.ErrResponse(w, http.StatusNotFound, err) return } else if err != nil { utils.ErrResponse(w, http.StatusInternalServerError, err) return } utils.SuccessResponse(w, http.StatusOK, res) }
package memory import ( "context" "github.com/JMURv/service-discovery/internal/repo" md "github.com/JMURv/service-discovery/pkg/model" "sync" ) type Repository struct { sync.RWMutex services []md.Service rrIndex map[string]int } func New() *Repository { return &Repository{ services: make([]md.Service, 0, 10), rrIndex: make(map[string]int), } } func (r *Repository) Close() error { r.Lock() defer r.Unlock() r.services = nil r.rrIndex = nil return nil } func (r *Repository) ListNames(_ context.Context) ([]string, error) { r.RLock() defer r.RUnlock() names := make([]string, 0, len(r.services)) for _, svc := range r.services { if svc.IsActive { names = append(names, svc.Name) } } if len(names) == 0 { return []string{}, repo.ErrNotFound } return names, nil } func (r *Repository) ListAddrsByName(_ context.Context, name string) ([]string, error) { r.RLock() defer r.RUnlock() var addrs []string for _, svc := range r.services { if svc.Name == name && svc.IsActive { addrs = append(addrs, svc.Address) } } if len(addrs) == 0 { return []string{}, repo.ErrNotFound } return addrs, nil } func (r *Repository) ListServices(_ context.Context) ([]md.Service, error) { r.RLock() defer r.RUnlock() res := make([]md.Service, 0, len(r.services)) for _, svc := range r.services { res = append(res, svc) } if len(res) == 0 { return []md.Service{}, repo.ErrNotFound } return res, nil } func (r *Repository) FindServiceByName(_ context.Context, name string) (string, error) { r.RLock() defer r.RUnlock() var availableServices []md.Service for _, svc := range r.services { if svc.Name == name && svc.IsActive { availableServices = append(availableServices, svc) } } if len(availableServices) == 0 { return "", repo.ErrNotFound } currentIndex := r.rrIndex[name] selectedSvc := availableServices[currentIndex] r.rrIndex[name] = (currentIndex + 1) % len(availableServices) return selectedSvc.Address, nil } func (r *Repository) Register(_ context.Context, name, addr string, svcType md.SvcType) error { r.Lock() defer r.Unlock() for _, registered := range r.services { if registered.Address == addr { return repo.ErrAlreadyExists } } r.services = append(r.services, md.Service{Name: name, Address: addr, SvcType: svcType, IsActive: true}) return nil } func (r *Repository) Deregister(_ context.Context, name, addr string) error { r.Lock() defer r.Unlock() for i, v := range r.services { if v.Name == name && v.Address == addr { r.services = append(r.services[:i], r.services[i+1:]...) delete(r.rrIndex, name) return nil } } return repo.ErrNotFound } func (r *Repository) DeactivateSvc(_ context.Context, name, addr string) error { r.Lock() defer r.Unlock() for i, svc := range r.services { if svc.Name == name && svc.Address == addr { r.services[i].IsActive = false return nil } } return repo.ErrNotFound } func (r *Repository) ActivateSvc(_ context.Context, name, addr string) error { r.Lock() defer r.Unlock() for i, svc := range r.services { if svc.Name == name && svc.Address == addr { r.services[i].IsActive = true return nil } } return repo.ErrNotFound }