// Package staticlint - составной стат-анализатор для анализа кода по множеству правил. // Включает в себя: // * все стандартные статические анализаторы пакета golang.org/x/tools/go/analysis/passes; // * все анализаторы класса SA пакета staticcheck.io; // * анализатор U1000 пакета staticcheck.io; // * github.com/tdakkota/asciicheck; // * github.com/timakin/bodyclose/passes/bodyclose; // * и анализатор использования os.Exit в main функции пакета main; // Использование: // ./staticlint [fileSelector] // Для помощи, набрать ./staticlint -help package main import ( "strings" "github.com/kosalnik/metrics/cmd/staticlint/passes/exitonmain" "github.com/tdakkota/asciicheck" "github.com/timakin/bodyclose/passes/bodyclose" "golang.org/x/tools/go/analysis" "golang.org/x/tools/go/analysis/multichecker" "golang.org/x/tools/go/analysis/passes/appends" "golang.org/x/tools/go/analysis/passes/asmdecl" "golang.org/x/tools/go/analysis/passes/assign" "golang.org/x/tools/go/analysis/passes/atomic" "golang.org/x/tools/go/analysis/passes/atomicalign" "golang.org/x/tools/go/analysis/passes/bools" "golang.org/x/tools/go/analysis/passes/buildssa" "golang.org/x/tools/go/analysis/passes/buildtag" "golang.org/x/tools/go/analysis/passes/cgocall" "golang.org/x/tools/go/analysis/passes/composite" "golang.org/x/tools/go/analysis/passes/copylock" "golang.org/x/tools/go/analysis/passes/ctrlflow" "golang.org/x/tools/go/analysis/passes/deepequalerrors" "golang.org/x/tools/go/analysis/passes/defers" "golang.org/x/tools/go/analysis/passes/directive" "golang.org/x/tools/go/analysis/passes/errorsas" "golang.org/x/tools/go/analysis/passes/fieldalignment" "golang.org/x/tools/go/analysis/passes/findcall" "golang.org/x/tools/go/analysis/passes/framepointer" "golang.org/x/tools/go/analysis/passes/httpmux" "golang.org/x/tools/go/analysis/passes/httpresponse" "golang.org/x/tools/go/analysis/passes/ifaceassert" "golang.org/x/tools/go/analysis/passes/inspect" "golang.org/x/tools/go/analysis/passes/loopclosure" "golang.org/x/tools/go/analysis/passes/lostcancel" "golang.org/x/tools/go/analysis/passes/nilfunc" "golang.org/x/tools/go/analysis/passes/nilness" "golang.org/x/tools/go/analysis/passes/pkgfact" "golang.org/x/tools/go/analysis/passes/printf" "golang.org/x/tools/go/analysis/passes/reflectvaluecompare" "golang.org/x/tools/go/analysis/passes/shadow" "golang.org/x/tools/go/analysis/passes/shift" "golang.org/x/tools/go/analysis/passes/sigchanyzer" "golang.org/x/tools/go/analysis/passes/slog" "golang.org/x/tools/go/analysis/passes/sortslice" "golang.org/x/tools/go/analysis/passes/stdmethods" "golang.org/x/tools/go/analysis/passes/stdversion" "golang.org/x/tools/go/analysis/passes/stringintconv" "golang.org/x/tools/go/analysis/passes/structtag" "golang.org/x/tools/go/analysis/passes/testinggoroutine" "golang.org/x/tools/go/analysis/passes/tests" "golang.org/x/tools/go/analysis/passes/timeformat" "golang.org/x/tools/go/analysis/passes/unmarshal" "golang.org/x/tools/go/analysis/passes/unreachable" "golang.org/x/tools/go/analysis/passes/unsafeptr" "golang.org/x/tools/go/analysis/passes/unusedresult" "golang.org/x/tools/go/analysis/passes/unusedwrite" "golang.org/x/tools/go/analysis/passes/usesgenerics" "honnef.co/go/tools/staticcheck" ) func main() { analyzers := append(standardAnalyzers(), staticCheckAnalyzers()...) analyzers = append(analyzers, bodyclose.Analyzer) analyzers = append(analyzers, asciicheck.NewAnalyzer()) analyzers = append(analyzers, exitonmain.ExitOnMainAnalyzer) multichecker.Main(analyzers...) } func standardAnalyzers() []*analysis.Analyzer { return []*analysis.Analyzer{ appends.Analyzer, asmdecl.Analyzer, assign.Analyzer, atomic.Analyzer, atomicalign.Analyzer, bools.Analyzer, buildssa.Analyzer, buildtag.Analyzer, cgocall.Analyzer, composite.Analyzer, copylock.Analyzer, ctrlflow.Analyzer, deepequalerrors.Analyzer, defers.Analyzer, directive.Analyzer, errorsas.Analyzer, fieldalignment.Analyzer, findcall.Analyzer, framepointer.Analyzer, httpmux.Analyzer, httpresponse.Analyzer, ifaceassert.Analyzer, inspect.Analyzer, loopclosure.Analyzer, lostcancel.Analyzer, nilfunc.Analyzer, nilness.Analyzer, pkgfact.Analyzer, printf.Analyzer, reflectvaluecompare.Analyzer, shadow.Analyzer, shift.Analyzer, sigchanyzer.Analyzer, slog.Analyzer, sortslice.Analyzer, stdmethods.Analyzer, stdversion.Analyzer, stringintconv.Analyzer, structtag.Analyzer, testinggoroutine.Analyzer, tests.Analyzer, timeformat.Analyzer, unmarshal.Analyzer, unreachable.Analyzer, unsafeptr.Analyzer, unusedresult.Analyzer, unusedwrite.Analyzer, usesgenerics.Analyzer, } } func staticCheckAnalyzers() []*analysis.Analyzer { var checks []*analysis.Analyzer for _, v := range staticcheck.Analyzers { if strings.HasPrefix(v.Analyzer.Name, "SA") || v.Analyzer.Name == "U1000" { checks = append(checks, v.Analyzer) } } return checks }
// Package exitonmain содержит анализатор, который запрещает использовать os.Exit() в методе main пакета main. // Он не содержит никаких флагов. Работает как есть. package exitonmain import ( "go/ast" "go/types" "golang.org/x/tools/go/analysis" "golang.org/x/tools/go/types/typeutil" ) var ExitOnMainAnalyzer = &analysis.Analyzer{ Name: "exitonmain", Doc: "check for call os.Exit() on func main() in packet main", Run: run, } func run(pass *analysis.Pass) (any, error) { isExit := func(v *ast.CallExpr) bool { return IsFunctionNamed(typeutil.StaticCallee(pass.TypesInfo, v), "os", "Exit") } if pass.Pkg.Name() != "main" { return nil, nil } for _, file := range pass.Files { inMain := 0 ast.Inspect(file, func(node ast.Node) bool { if inMain == 0 { switch x := node.(type) { case *ast.FuncDecl: if x.Name.Name == "main" { inMain++ } else { return false } } return true } if node == nil { inMain-- return true } inMain++ switch x := node.(type) { case *ast.ExprStmt: if call, ok := x.X.(*ast.CallExpr); ok { if isExit(call) { pass.Reportf(call.Pos(), `Call os.Exit on function main of package main`) } } case *ast.DeferStmt: if isExit(x.Call) { pass.Reportf(x.Call.Pos(), `Call os.Exit on function main of package main`) } case *ast.GoStmt: if isExit(x.Call) { pass.Reportf(x.Call.Pos(), `Call os.Exit on function main of package main`) } } return true }) } return nil, nil } func IsFunctionNamed(f *types.Func, pkgPath string, names ...string) bool { if f == nil { return false } if f.Pkg() == nil || f.Pkg().Path() != pkgPath { return false } if f.Type().(*types.Signature).Recv() != nil { return false } for _, n := range names { if f.Name() == n { return true } } return false }
// Package client содержит реализацию клиента к коллектору метрик. // Создаётся с использованием NewClient(). Запускается методом Run(). // При старте запускает два параллельных цикла. Один собирает метрики раз в PoolInterval секунд. // Второй цикл отсылает собранные в последний раз метрики коллектору раз в ReportInterval секунд. package client import ( "context" "math/rand" "sync" "time" "github.com/kosalnik/metrics/internal/config" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/metric" "github.com/kosalnik/metrics/internal/models" "golang.org/x/sync/errgroup" ) type Client struct { sender Sender config *config.Agent gauge map[string]float64 pollCount int64 mu sync.Mutex } func NewClient(ctx context.Context, config *config.Agent) *Client { var sender Sender if config.GRPC { log.Info().Msg("Activate GRPC Collector") sender = NewGRPCSender(ctx, config.CollectorAddress) } else { log.Info().Msg("Activate REST Collector") sender = NewSenderRest(config) } return &Client{ config: config, sender: NewSenderPool( ctx, sender, int(config.RateLimit), ), } } func (c *Client) Run(ctx context.Context) error { g := errgroup.Group{} g.Go(func() error { return c.RunMetricsSender(ctx) }) g.Go(func() error { return c.RunMetricsPoller(ctx) }) return g.Wait() } // RunMetricsSender Запустить процесс периодической отправки метрик в коллектор func (c *Client) RunMetricsSender(ctx context.Context) error { log.Info(). Int64("Report interval", c.config.ReportInterval). Str("Collector address", c.config.CollectorAddress). Msg("Running agent") return c.push(ctx) } // RunMetricsPoller Запустить процесс периодического сбора метрик func (c *Client) RunMetricsPoller(ctx context.Context) error { log.Info(). Int64("Poll interval", c.config.PollInterval). Msg("Running metrics poll process") return c.poll(ctx) } func (c *Client) Shutdown(ctx context.Context) { log.Info().Msg("Shutdown service Client") if err := c.sender.SendBatch(ctx, c.collectMetrics()); err != nil { log.Error().Err(err).Msg("fail push") } log.Info().Msg("Shutdown service Client completed") } func (c *Client) push(ctx context.Context) error { tick := time.NewTicker(time.Duration(c.config.ReportInterval) * time.Second) defer tick.Stop() for { select { case <-ctx.Done(): return ctx.Err() case <-tick.C: log.Info().Msg("Push") if err := c.sender.SendBatch(ctx, c.collectMetrics()); err != nil { log.Error().Err(err).Msg("fail push") } } } } func (c *Client) collectMetrics() []models.Metrics { c.mu.Lock() defer c.mu.Unlock() list := make([]models.Metrics, len(c.gauge)+2) i := 0 if c.gauge != nil { for k, v := range c.gauge { kk := k vv := v list[i] = models.Metrics{ID: kk, MType: models.MGauge, Value: vv} i++ } } vv := c.pollCount list[i] = models.Metrics{ID: "PollCount", MType: models.MCounter, Delta: vv} rv := rand.Float64() list[i+1] = models.Metrics{ID: "RandomValue", MType: models.MGauge, Value: rv} return list } func (c *Client) poll(ctx context.Context) error { tick := time.NewTicker(time.Duration(c.config.PollInterval) * time.Second) defer tick.Stop() for { select { case <-ctx.Done(): return ctx.Err() case <-tick.C: if err := c.pollMetrics(ctx); err != nil { log.Error().Err(err).Msg("poll error") } } } } func (c *Client) pollMetrics(ctx context.Context) error { var err error c.mu.Lock() defer c.mu.Unlock() c.gauge, err = metric.GetMetrics(ctx) if err != nil { return err } c.pollCount = c.pollCount + 1 log.Debug().Int64("count", c.pollCount).Msg("PollCount") return nil }
package client import ( "io" "net/http" ) type HttpClientWrapper struct { origin *http.Client mutators []Mutator } var _ HttpSender = &HttpClientWrapper{} type HttpClientWrapperOpt = func(c *HttpClientWrapper) func NewHttpClient(opts ...HttpClientWrapperOpt) *HttpClientWrapper { c := &HttpClientWrapper{ origin: &http.Client{}, } for _, opt := range opts { opt(c) } return c } func (h HttpClientWrapper) Do(req *http.Request) (*http.Response, error) { req, err := h.applyMutators(req) if err != nil { return nil, err } return h.origin.Do(req) } func (h HttpClientWrapper) Post(url, contentType string, body io.Reader) (resp *http.Response, err error) { req, err := http.NewRequest("POST", url, body) if err != nil { return nil, err } req.Header.Set("Content-Type", contentType) return h.Do(req) } func (h HttpClientWrapper) applyMutators(req *http.Request) (*http.Request, error) { var err error for _, m := range h.mutators { req, err = m(req) if err != nil { return nil, err } } return req, nil } type Mutator func(req *http.Request) (*http.Request, error) func WithMutators(m ...Mutator) HttpClientWrapperOpt { return func(c *HttpClientWrapper) { c.mutators = m } }
package client import ( "context" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/models" pb "github.com/kosalnik/metrics/pkg/metrics" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type GRPCSender struct { client pb.MetricsClient } func NewGRPCSender(ctx context.Context, addr string) *GRPCSender { var conn *grpc.ClientConn var err error log.Info().Str("addr", addr).Msg("Dial grpc client") conn, err = grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal().Err(err).Msg("fail grpc connect") } go func() { defer conn.Close() <-ctx.Done() }() return &GRPCSender{client: pb.NewMetricsClient(conn)} } func (g *GRPCSender) SendGauge(k string, v float64) { res, err := g.client.AddGauge(context.Background(), &pb.MetricsItem{Value: &v, Type: pb.MType_GAUGE, Id: k}) if err != nil { log.Error().Str("key", k).Float64("val", v).Err(err).Msg("send gauge. error") } if res.Error != "" { log.Error().Str("key", k).Float64("val", v).Str("err", res.Error).Msg("send gauge. response") } } func (g *GRPCSender) SendCounter(k string, v int64) { res, err := g.client.AddCounter(context.Background(), &pb.MetricsItem{Delta: &v, Type: pb.MType_GAUGE, Id: k}) if err != nil { log.Error().Str("key", k).Int64("val", v).Err(err).Msg("send counter. error") } if res.Error != "" { log.Error().Str("key", k).Int64("val", v).Str("err", res.Error).Msg("send counter. response") } } func (g *GRPCSender) SendBatch(ctx context.Context, list []models.Metrics) error { req := pb.MetricsList{} for i := range list { v := list[i] switch v.MType { case models.MGauge: req.Items = append(req.Items, &pb.MetricsItem{Value: &v.Value, Id: v.ID, Type: pb.MType_GAUGE}) case models.MCounter: req.Items = append(req.Items, &pb.MetricsItem{Delta: &v.Delta, Id: v.ID, Type: pb.MType_COUNTER}) } } if len(req.Items) == 0 { return nil } res, err := g.client.AddBatch(ctx, &req) if err != nil { log.Error().Any("data", list).Err(err).Msg("send batch. error") return err } if res.Error != "" { log.Error().Any("data", list).Str("err", res.Error).Msg("send batch. response") } return nil } var _ Sender = &GRPCSender{}
package client import ( "context" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/models" ) type SenderPool struct { client Sender jobs chan func() } var _ Sender = &SenderPool{} func NewSenderPool(ctx context.Context, client Sender, num int) *SenderPool { p := &SenderPool{ client: client, jobs: make(chan func()), } for w := 1; w <= int(num); w++ { log.Info().Int("id", w).Msg("Start worker") go p.worker(p.jobs) } go func() { <-ctx.Done() close(p.jobs) }() return p } func (p *SenderPool) Shutdown(_ context.Context) { } func (p *SenderPool) SendGauge(k string, v float64) { p.jobs <- func() { p.client.SendGauge(k, v) } } func (p *SenderPool) SendCounter(k string, v int64) { p.jobs <- func() { p.client.SendCounter(k, v) } } func (p *SenderPool) SendBatch(ctx context.Context, list []models.Metrics) error { log.Debug().Msg("Push job") p.jobs <- func() { if err := p.client.SendBatch(ctx, list); err != nil { log.Error().Err(err).Msg("Send batch failed") } } return nil } func (*SenderPool) worker(jobs <-chan func()) { for f := range jobs { log.Debug().Msg("worker receive job") f() } log.Info().Msg("Stop worker") }
package client import ( "bytes" "context" "crypto/rand" "encoding/json" "fmt" "net/http" "github.com/kosalnik/metrics/internal/crypt" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/config" "github.com/kosalnik/metrics/internal/models" ) type SenderRest struct { client *HttpClientWrapper config *config.Agent } func NewSenderRest(config *config.Agent) Sender { mutators := []Mutator{ SetRealIPMutator(), crypt.NewSignMutator([]byte(config.Hash.Key)), } if config.PublicKey != nil { mutators = append(mutators, crypt.NewCipherRequestMutator(crypt.NewEncoder(config.PublicKey, rand.Reader))) } return &SenderRest{ client: NewHttpClient(WithMutators(mutators...)), config: config, } } func (c *SenderRest) SendGauge(k string, v float64) { m := models.Metrics{ ID: k, MType: models.MGauge, Value: v, } data, err := json.Marshal(m) if err != nil { log.Error().Str("key", k).Float64("val", v).Err(err).Msg("send gauge. fail marshal") return } body := bytes.NewReader(data) url := fmt.Sprintf("http://%s/update/", c.config.CollectorAddress) req, err := http.NewRequest(http.MethodPost, url, body) if err != nil { log.Error().Str("key", k).Float64("val", v).Err(err).Msg("send gauge. fail make request") return } req.Header.Set("Accept-Encoding", "gzip, deflate") req.Header.Set("Content-Type", "application/json") r, err := c.client.Do(req) if err != nil { log.Error().Str("key", k).Float64("val", v).Err(err).Msg("send gauge. fail post") return } defer func() { if err := r.Body.Close(); err != nil { log.Error().Err(err).Msg("fail close body.") } }() } func (c *SenderRest) SendCounter(k string, v int64) { vv := float64(v) m := models.Metrics{ ID: k, MType: models.MCounter, Delta: v, Value: vv, } data, err := json.Marshal(m) if err != nil { log.Error().Str("key", k).Int64("val", v).Err(err).Msg("send counter. fail marshal") return } body := bytes.NewReader(data) url := fmt.Sprintf("http://%s/update/", c.config.CollectorAddress) req, err := http.NewRequest(http.MethodPost, url, body) if err != nil { log.Error().Str("key", k).Int64("val", v).Err(err).Msg("send gauge. fail make request") return } req.Header.Set("Accept-Encoding", "gzip, deflate") req.Header.Set("Content-Type", "application/json") r, err := c.client.Do(req) log.Info().Str("url", url).Str("body", string(data)).Msg("send counter") if err != nil { log.Error().Err(err).Msg("Fail push") return } defer func() { if err := r.Body.Close(); err != nil { log.Error().Err(err).Msg("fail close body") } }() } func (c *SenderRest) SendBatch(ctx context.Context, list []models.Metrics) error { if len(list) == 0 { return nil } data, err := json.Marshal(list) if err != nil { log.Error().Any("list", list).Err(err).Msg("fail send batch. fail marshal") return err } body := bytes.NewReader(data) url := fmt.Sprintf("http://%s/updates/", c.config.CollectorAddress) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body) if err != nil { log.Error().Err(err).Msg("send batch. fail make request") return err } req.Header.Set("Accept-Encoding", "gzip, deflate") req.Header.Set("Content-Type", "application/json") r, err := c.client.Do(req) log.Info().Str("url", url).Str("body", string(data)).Msg("send counter") if err != nil { log.Error().Err(err).Msg("Fail push") return err } defer func() { if err := r.Body.Close(); err != nil { log.Error().Err(err).Msg("fail close body") } }() return nil }
package client import ( "bytes" "context" "encoding/json" "fmt" "net/http" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/models" "github.com/kosalnik/metrics/internal/config" ) type SenderSimple struct { client *http.Client config *config.Agent } func NewSenderSimple(config *config.Agent) Sender { return &SenderSimple{ client: http.DefaultClient, config: config, } } func (c *SenderSimple) SendGauge(k string, v float64) { r, err := c.client.Post(fmt.Sprintf("http://%s/update/gauge/%s/%v", c.config.CollectorAddress, k, v), "text/plain", nil) if err != nil { log.Error().Err(err).Msg("fail push") return } defer func() { if err := r.Body.Close(); err != nil { log.Error().Err(err).Msg("fail close body") } }() } func (c *SenderSimple) SendCounter(k string, v int64) { r, err := c.client.Post(fmt.Sprintf("http://%s/update/counter/%s/%v", c.config.CollectorAddress, k, v), "text/plain", nil) if err != nil { log.Error().Err(err).Msg("Fail push") return } defer func() { if err := r.Body.Close(); err != nil { log.Error().Err(err).Msg("fail close body") } }() } func (c *SenderSimple) SendBatch(ctx context.Context, list []models.Metrics) error { if len(list) == 0 { return nil } data, err := json.Marshal(list) if err != nil { log.Error().Any("list", list).Err(err).Msg("fail send batch. fail marshal") return err } body := bytes.NewReader(data) url := fmt.Sprintf("http://%s/updates/", c.config.CollectorAddress) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body) if err != nil { log.Error().Err(err).Msg("send batch. fail make request") return err } req.Header.Set("Accept-Encoding", "gzip, deflate") req.Header.Set("Content-Type", "application/json") r, err := c.client.Do(req) log.Info().Str("url", url).Str("body", string(data)).Msg("send counter") if err != nil { log.Error().Err(err).Msg("Fail push") return err } defer func() { if err := r.Body.Close(); err != nil { log.Error().Err(err).Msg("fail close body") } }() return nil }
package client import ( "net/http" "github.com/kosalnik/metrics/internal/util" ) var xRealIP = http.CanonicalHeaderKey("X-Real-IP") func SetRealIPMutator() func(r *http.Request) (*http.Request, error) { return func(request *http.Request) (*http.Request, error) { myIP, err := util.GetMyHostIP() if err != nil { return nil, err } request.Header.Set(xRealIP, myIP.String()) return request, nil } }
package backup import ( "context" "encoding/json" "os" "github.com/kosalnik/metrics/internal/storage" ) // Dump - Тип выполняет сохранение содержимого хранилища на диск. type Dump struct { storage storage.Dumper path string } // NewDump возвращает тип Dump. // На вход ожидаются Storage и абсолютный путь до файла, в который нужно сохранять бекап func NewDump(storage storage.Dumper, path string) *Dump { return &Dump{ storage: storage, path: path, } } // Store - вызовом этого метода данные из Storage сохраняются на диск func (m *Dump) Store(ctx context.Context) error { if m.path == "" { return nil } f, err := os.CreateTemp(os.TempDir(), "backup") if err != nil { return err } savePath := f.Name() b, err := m.storage.GetAll(ctx) if err != nil { return err } d, err := json.Marshal(Backup{Data: b}) if err != nil { return err } if _, err := f.Write(d); err != nil { return err } if err := f.Close(); err != nil { return err } if err := os.Rename(savePath, m.path); err != nil { return err } return nil }
package backup import ( "context" "time" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/models" "github.com/kosalnik/metrics/internal/storage" ) type Dumper interface { Store(ctx context.Context) error } type Recoverer interface { Recover(ctx context.Context) error } type Storage interface { storage.Dumper storage.BatchInserter storage.UpdateAwarer } type Backup struct { Data []models.Metrics } // BackupManager - структура управляющая бекапом и восстановлением из бекапа. type BackupManager struct { dump Dumper recover Recoverer storage Storage lastBackup time.Time backupInterval time.Duration } // NewBackupManager - создаёт BackupManager с конфигурацией config.Backup. // На вход принимает хранилище с интерфейсом Storage и настройки бекапа. func NewBackupManager(s Storage, cfg Config) (*BackupManager, error) { if cfg.FileStoragePath == "" { return nil, nil } var d *Dump var r *Recover if cfg.StoreInterval > 0 { d = NewDump(s, cfg.FileStoragePath) } if cfg.Restore { r = NewRecover(s, cfg.FileStoragePath) } if d == nil && r == nil { return nil, nil } return &BackupManager{ dump: d, recover: r, storage: s, backupInterval: time.Duration(cfg.StoreInterval) * time.Second, lastBackup: time.Now(), }, nil } func (m *BackupManager) BackupLoop(ctx context.Context) { if m == nil || m.dump == nil || m.backupInterval == 0 { log.Info().Msg("schedule backup skipped") return } tick := time.NewTicker(m.backupInterval) defer tick.Stop() for { select { case <-ctx.Done(): log.Info().Msg("backup loop: context done") return case <-tick.C: if m.lastBackup.Equal(m.storage.UpdatedAt()) { log.Debug().Msg("backup loop: no changes, skip backup") continue } log.Info().Msg("backup loop: store") if err := m.Store(ctx); err != nil { log.Error().Err(err).Msg("Fail backup") } } } } func (m *BackupManager) Store(ctx context.Context) (err error) { if m == nil || m.dump == nil { log.Info().Msg("Store skipped") return nil } log.Info().Msg("Backup start") defer func() { if err != nil { log.Error().Err(err).Msg("Backup error") } else { log.Info().Msg("Backup completed") } }() return m.dump.Store(ctx) } // Recover - восстановить данные из бекапа. func (m *BackupManager) Recover(ctx context.Context) error { if m == nil || m.recover == nil { log.Info().Msg("recover skipped") return nil } log.Info().Msg("recover start") return m.recover.Recover(ctx) }
package backup import ( "context" "encoding/json" "os" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/storage" ) type Recover struct { recover storage.BatchInserter path string } // NewRecover - создаст инстанс типа Recover, которую можно использовать для восстановления Storage из бекапа. // На вход подаётся объект реализующий интерфейс storage.BatchInserter и путь к файлу из которого нужно восстанавливать. func NewRecover(storage storage.BatchInserter, path string) *Recover { return &Recover{ recover: storage, path: path, } } // Recover - Восстановить Storage из бекапа. func (m *Recover) Recover(ctx context.Context) error { if m == nil || m.path == "" { log.Info().Msg("Recover skipped. No Path or Disabled") return nil } d, err := os.ReadFile(m.path) if err != nil { return err } var b Backup if err := json.Unmarshal(d, &b); err != nil { return err } if len(b.Data) == 0 { return nil } if err := m.recover.UpsertAll(ctx, b.Data); err != nil { return err } return nil }
package config import ( "crypto/rsa" "crypto/x509" "encoding/pem" "flag" "fmt" "os" "strconv" ) func ParseAgentFlags(args []string, c *Agent) error { fs := flag.NewFlagSet(args[0], flag.ContinueOnError) fs.SetOutput(os.Stdout) fs.StringVar(&c.CollectorAddress, "a", c.CollectorAddress, "address server endpoint") fs.Int64Var(&c.PollInterval, "p", c.PollInterval, "Pool interval (seconds)") fs.Int64Var(&c.ReportInterval, "r", c.ReportInterval, "Report interval (seconds)") fs.Int64Var(&c.RateLimit, "l", c.RateLimit, "Rate limit") fs.StringVar(&c.Hash.Key, "k", c.Hash.Key, "SHA256 Key") publicKeyFile := fs.String("crypto-key", "", "Public Key") _ = fs.String("config", "", "Config file") _ = fs.String("c", "", "Config file (shorthand)") var err error if err = loadFromConfigFile(fs, c); err != nil { return err } if err = fs.Parse(args[1:]); err != nil { return err } if v := os.Getenv("PROFILING"); v != "" { c.Profiling.Enabled, err = strconv.ParseBool(v) if err != nil { return fmt.Errorf("PROFILING should be bool, got: %s : %w", v, err) } } if v := os.Getenv("ADDRESS"); v != "" { c.CollectorAddress = v } if v := os.Getenv("REPORT_INTERVAL"); v != "" { c.ReportInterval, err = strconv.ParseInt(v, 10, 64) if err != nil { return fmt.Errorf("REPORT_INTERVAL should be Int64, got: %s : %w", v, err) } } if v := os.Getenv("GRPC"); v != "" { c.GRPC, err = strconv.ParseBool(v) if err != nil { return fmt.Errorf("GRPC should be bool, got: %s : %w", v, err) } } if v := os.Getenv("RATE_LIMIT"); v != "" { c.RateLimit, err = strconv.ParseInt(v, 10, 64) if err != nil { return fmt.Errorf("RATE_LIMIT should be Int64, got: %s : %w", v, err) } } if v := os.Getenv("POLL_INTERVAL"); v != "" { c.PollInterval, err = strconv.ParseInt(v, 10, 64) if err != nil { return fmt.Errorf("POLL_INTERVAL should be Int64, got: %s : %w", v, err) } } if v := os.Getenv("KEY"); v != "" { c.Hash.Key = v } if v := os.Getenv("CRYPTO_KEY"); v != "" { publicKeyFile = &v } if publicKeyFile != nil && *publicKeyFile != "" { publicKeyPEM, err := os.ReadFile(*publicKeyFile) if err != nil { return fmt.Errorf("fail to read key: %w", err) } publicKeyBlock, _ := pem.Decode(publicKeyPEM) publicKey, err := x509.ParsePKIXPublicKey(publicKeyBlock.Bytes) if err != nil { return fmt.Errorf("fail to parse key: %w", err) } c.PublicKey = publicKey.(*rsa.PublicKey) } return nil }
// Package config contains config. // Ну что тут сказать. В этом пакете находятся структуры конфигураций компонентов системы. package config import ( "crypto/rsa" _ "embed" "github.com/kosalnik/metrics/internal/backup" "github.com/kosalnik/metrics/internal/crypt" "github.com/kosalnik/metrics/internal/log" ) const ( defaultServerBind = ":8080" defaultCollectorAddress = "127.0.0.1:8080" defaultPollInterval = 2 defaultReportInterval = 10 defaultRateLimit = 1 ) type Agent struct { CollectorAddress string `json:"address"` GRPC bool `json:"grpc"` PollInterval int64 `json:"poll_interval"` ReportInterval int64 `json:"report_interval"` PublicKey *rsa.PublicKey `json:"crypto_key"` RateLimit int64 `json:"rate_limit"` Hash crypt.Config Logger log.Config Profiling Profiling } type Server struct { Address string `json:"address"` GRPCAddress string `json:"grpc_address"` TrustedSubnet string `json:"trusted_subnet"` Backup backup.Config `json:"backup"` PrivateKey *rsa.PrivateKey `json:"crypto_key"` Logger log.Config DB DB Hash crypt.Config Profiling Profiling } type Profiling struct { Enabled bool } type DB struct { DSN string } func NewAgent() *Agent { return &Agent{ Profiling: Profiling{}, Logger: log.Config{Level: "info"}, CollectorAddress: defaultCollectorAddress, PollInterval: defaultPollInterval, ReportInterval: defaultReportInterval, Hash: crypt.Config{Key: ""}, RateLimit: defaultRateLimit, } } func NewServer() *Server { return &Server{ Profiling: Profiling{}, Logger: log.Config{Level: "info"}, Address: defaultServerBind, Backup: backup.Config{}, Hash: crypt.Config{Key: ""}, } }
package config import ( "encoding/json" "flag" "io" "os" "github.com/kosalnik/metrics/internal/log" ) func loadFromJson(f io.Reader, c any) error { b, err := io.ReadAll(f) if err != nil { return err } if err := json.Unmarshal(b, c); err != nil { return err } return nil } func loadFromConfigFile(fs *flag.FlagSet, c any) error { var configFilePath string fs.Visit(func(f *flag.Flag) { if (f.Name == "config" || f.Name == "c") && f.Value.String() != "" { configFilePath = f.Value.String() } }) if v := os.Getenv("CONFIG"); v != "" { configFilePath = v } if configFilePath != "" { log.Debug().Str("path", configFilePath).Msg("Load config from file") fl, err := os.Open(configFilePath) if err != nil { return err } if err := loadFromJson(fl, c); err != nil { return err } } return nil }
package config import ( "crypto/x509" "encoding/pem" "flag" "fmt" "os" "strconv" ) const ( defaultAddress = ":8080" defaultStoreInterval = 300 defaultBackupFileStoragePath = "/tmp/metrics-db.json" ) func ParseServerFlags(args []string, c *Server) error { fs := flag.NewFlagSet(args[0], flag.ContinueOnError) fs.SetOutput(os.Stdout) fs.StringVar(&c.Address, "a", defaultAddress, "server endpoint (ip:port)") fs.IntVar(&c.Backup.StoreInterval, "i", defaultStoreInterval, "Store interval") fs.StringVar(&c.Backup.FileStoragePath, "f", defaultBackupFileStoragePath, "File storage path") fs.BoolVar(&c.Backup.Restore, "r", true, "Restore storage before start") fs.StringVar(&c.DB.DSN, "d", "", "Database DSN") fs.StringVar(&c.Hash.Key, "k", "", "SHA256 Key") fs.StringVar(&c.TrustedSubnet, "t", "", "Trusted Subnet") privateKeyFile := fs.String("crypto-key", "", "Public Key") _ = fs.String("config", "", "Config file") _ = fs.String("c", "", "Config file (shorthand)") var err error if err = loadFromConfigFile(fs, c); err != nil { return err } if err := fs.Parse(args[1:]); err != nil { return fmt.Errorf("fail to parse flags: %w", err) } if v := os.Getenv("PROFILING"); v != "" { c.Profiling.Enabled, err = strconv.ParseBool(v) if err != nil { return fmt.Errorf("PROFILING should be bool, got: %s : %w", v, err) } } if v := os.Getenv("ADDRESS"); v != "" { c.Address = v } if v := os.Getenv("GRPC_ADDRESS"); v != "" { c.GRPCAddress = v } if v := os.Getenv("STORE_INTERVAL"); v != "" { c.Backup.StoreInterval, err = strconv.Atoi(v) if err != nil { return fmt.Errorf("wrong env STORE_INTERVAL: %w", err) } } if v := os.Getenv("FILE_STORAGE_PATH"); v != "" { c.Backup.FileStoragePath = v } if v := os.Getenv("RESTORE"); v != "" { c.Backup.Restore, err = strconv.ParseBool(v) if err != nil { return fmt.Errorf("wrong env RESTORE: %w", err) } } if v := os.Getenv("DATABASE_DSN"); v != "" { c.DB.DSN = v } if v := os.Getenv("KEY"); v != "" { c.Hash.Key = v } if v := os.Getenv("TRUSTED_SUBNET"); v != "" { c.TrustedSubnet = v } if v := os.Getenv("CRYPTO_KEY"); v != "" { privateKeyFile = &v } if privateKeyFile != nil && *privateKeyFile != "" { privateKeyPEM, err := os.ReadFile(*privateKeyFile) if err != nil { return fmt.Errorf("fail to read key: %w", err) } keyBlock, _ := pem.Decode(privateKeyPEM) privateKey, err := x509.ParsePKCS1PrivateKey(keyBlock.Bytes) if err != nil { return fmt.Errorf("fail to parse key: %w", err) } c.PrivateKey = privateKey } return nil }
package crypt import ( "crypto/rand" "crypto/rsa" "io" ) type RSAEncoder struct { random io.Reader key *rsa.PublicKey } func NewEncoder(key *rsa.PublicKey, random io.Reader) *RSAEncoder { if random == nil { random = rand.Reader } return &RSAEncoder{random: random, key: key} } func (r *RSAEncoder) Encode(b []byte) ([]byte, error) { return rsa.EncryptPKCS1v15(r.random, r.key, b) } type RSADecoder struct { random io.Reader key *rsa.PrivateKey } func NewDecoder(key *rsa.PrivateKey, random io.Reader) *RSADecoder { if random == nil { random = rand.Reader } return &RSADecoder{key: key, random: random} } func (r *RSADecoder) Decode(b []byte) ([]byte, error) { return rsa.DecryptPKCS1v15(r.random, r.key, b) }
package crypt import ( "net/http" ) //go:generate mockgen -source=cipher_interceptor.go -destination=./mock/encoder.go -package=mock type Encoder interface { Encode([]byte) ([]byte, error) } func NewCipherInterceptor(encoder Encoder, transport http.RoundTripper) *CipherRoundTripper { return &CipherRoundTripper{ core: transport, mutator: NewCipherRequestMutator(encoder), } } type CipherRoundTripper struct { core http.RoundTripper mutator func(r *http.Request) (*http.Request, error) } func (a *CipherRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) { req, err := a.mutator(request) if err != nil { return nil, err } return a.core.RoundTrip(req) } var _ http.RoundTripper = &CipherRoundTripper{}
package crypt import ( "bytes" "io" "net/http" ) //go:generate mockgen -source=cipher_middleware.go -destination=./mock/decoder.go -package=mock type Decoder interface { Decode([]byte) ([]byte, error) } func CipherMiddleware(decoder Decoder) func(next http.Handler) http.Handler { if decoder == nil { return func(next http.Handler) http.Handler { return next } } return func(next http.Handler) http.Handler { return http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() ciphertext, err := io.ReadAll(r.Body) if err != nil { http.Error(w, "empty body", http.StatusInternalServerError) return } plaintext, err := decoder.Decode(ciphertext) if err != nil { http.Error(w, "wrong request", http.StatusBadRequest) return } req := r.Clone(r.Context()) req.Body = io.NopCloser(bytes.NewReader(plaintext)) next.ServeHTTP(w, req) }, ) } }
package crypt import ( "bytes" "context" "io" "net/http" ) func NewCipherRequestMutator(encoder Encoder) func(req *http.Request) (*http.Request, error) { return func(request *http.Request) (*http.Request, error) { if encoder == nil || request.Body == nil { return request, nil } b, err := io.ReadAll(request.Body) if err != nil { return request, err } defer request.Body.Close() ciphertext, err := encoder.Encode(b) if err != nil { return request, err } req := request.Clone(context.Background()) req.Body = io.NopCloser(bytes.NewReader(ciphertext)) req.ContentLength = int64(len(ciphertext)) return req, nil } }
// Package crypt contains working with signs. package crypt import ( "crypto/hmac" "crypto/sha256" "encoding/hex" "net/http" ) const hashHeader = "HashSHA256" func ExtractSign(r *http.Request) string { return r.Header.Get(hashHeader) } func ToSignRequest(r *http.Request, value string) { r.Header.Set(hashHeader, value) } func GetSign(data []byte, key []byte) string { h := hmac.New(sha256.New, key) h.Write(data) return hex.EncodeToString(h.Sum(nil)) } func VerifySign(data []byte, sign string, key []byte) bool { return GetSign(data, key) == sign }
package crypt import ( "net/http" ) func VerifyHashInterceptor(cfg Config, transport http.RoundTripper) *AddHash { return &AddHash{ core: transport, signer: NewSignMutator([]byte(cfg.Key)), } } type AddHash struct { core http.RoundTripper signer func(r *http.Request) (*http.Request, error) } func (a *AddHash) RoundTrip(request *http.Request) (*http.Response, error) { req, err := a.signer(request) if err != nil { return nil, err } return a.core.RoundTrip(req) } var _ http.RoundTripper = &AddHash{}
package crypt import ( "bytes" "io" "net/http" "github.com/kosalnik/metrics/internal/log" ) func HashCheckMiddleware(cfg Config) func(next http.Handler) http.Handler { if cfg.Key == "" { return func(next http.Handler) http.Handler { return next } } return func(next http.Handler) http.Handler { return http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { expectedHash := ExtractSign(r) log.Debug().Str("hash", expectedHash).Msg("Get Hash Header") if expectedHash != "" { defer r.Body.Close() b, err := io.ReadAll(r.Body) if err != nil { http.Error(w, "empty body", http.StatusInternalServerError) return } if !VerifySign(b, expectedHash, []byte(cfg.Key)) { http.Error(w, "verify hash fail", http.StatusBadRequest) return } req := r.Clone(r.Context()) req.Body = io.NopCloser(bytes.NewReader(b)) next.ServeHTTP(w, req) return } next.ServeHTTP(w, r) }, ) } }
package crypt import ( "bytes" "context" "io" "net/http" ) func NewSignMutator(key []byte) func(*http.Request) (*http.Request, error) { return func(request *http.Request) (*http.Request, error) { if len(key) == 0 || request.Body == nil { return request, nil } b, err := io.ReadAll(request.Body) if err != nil { return nil, err } defer request.Body.Close() h := GetSign(b, key) req := request.Clone(context.Background()) req.Body = io.NopCloser(bytes.NewReader(b)) ToSignRequest(req, h) return req, nil } }
// Package handlers contains handlers. // Методы этого пакета создают хендлеры для http.Server package handlers import ( "encoding/json" "net/http" "sort" "strings" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/storage" ) func NewGetAllHandler(s storage.Storager) func(res http.ResponseWriter, req *http.Request) { return func(w http.ResponseWriter, req *http.Request) { accept := req.Header.Get("Accept") isJSON := accept != "" && strings.Contains(accept, "application/json") if isJSON { w.Header().Set("Content-Type", "application/json") } else { w.Header().Set("Content-Type", "text/html") } items, err := s.GetAll(req.Context()) if err != nil { log.Error().Err(err).Msg("fail get all") http.Error(w, `"fail get all"`, http.StatusInternalServerError) return } if len(items) == 0 { w.Write([]byte(`[]`)) return } var data []byte if isJSON { data, err = json.Marshal(items) if err != nil { http.Error(w, `"fail marshal"`, http.StatusInternalServerError) return } } else { t := make([]string, len(items)) for i, v := range items { log.Info().Any("v", v).Msg("get all metrics") t[i] = v.String() } sort.Strings(t) data = []byte(strings.Join(t, "\n")) } if _, err := w.Write(data); err != nil { log.Error().Err(err).Msg("fail write response") } } }
package handlers import ( "encoding/json" "fmt" "io" "net/http" "github.com/go-chi/chi/v5" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/models" "github.com/kosalnik/metrics/internal/storage" ) func NewRestGetHandler(s storage.Storager) func(res http.ResponseWriter, req *http.Request) { return func(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "application/json") data, err := io.ReadAll(req.Body) log.Info().Str("body", string(data)).Msg("Handle Get") if err != nil { http.Error(w, `"Wrong data"`, http.StatusBadRequest) return } var m models.Metrics if err := json.Unmarshal(data, &m); err != nil { http.Error(w, `"Wrong json"`, http.StatusBadRequest) return } switch m.MType { case models.MGauge: v, err := s.GetGauge(req.Context(), m.ID) if err != nil { http.Error(w, `"fail get gauge"`, http.StatusInternalServerError) return } if v == nil { break } if out, err := json.Marshal(v); err != nil { http.Error(w, `"internal error"`, http.StatusInternalServerError) } else { log.Info().Str("body", string(out)).Msg("Handle Get Result") _, _ = w.Write(out) } return case models.MCounter: v, err := s.GetCounter(req.Context(), m.ID) if err != nil { http.Error(w, `"fail get counter"`, http.StatusInternalServerError) return } if v == nil { break } if out, err := json.Marshal(v); err != nil { http.Error(w, `"internal error"`, http.StatusInternalServerError) } else { _, _ = w.Write(out) } return } http.Error(w, `"not found"`, http.StatusNotFound) } } func NewGetHandler(s storage.Storager) func(res http.ResponseWriter, req *http.Request) { return func(w http.ResponseWriter, req *http.Request) { mType := models.MType(chi.URLParam(req, "type")) mName := chi.URLParam(req, "name") switch mType { case models.MGauge: v, err := s.GetGauge(req.Context(), mName) if err != nil { http.Error(w, `"fail get gauge"`, http.StatusInternalServerError) return } if v == nil { http.NotFound(w, req) return } res := fmt.Sprintf("%v", v.Value) if _, err := w.Write([]byte(res)); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } return case models.MCounter: v, err := s.GetCounter(req.Context(), mName) if err != nil { http.Error(w, `"fail get counter"`, http.StatusInternalServerError) return } if v == nil { http.NotFound(w, req) return } res := fmt.Sprintf("%v", v.Delta) if _, err := w.Write([]byte(res)); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } return } http.Error(w, "Not Found", http.StatusNotFound) } }
package handlers import ( "context" "net/http" "time" "github.com/kosalnik/metrics/internal/storage" ) func NewPingHandler(db storage.Pinger) func(res http.ResponseWriter, req *http.Request) { return func(w http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() if err := db.Ping(ctx); err != nil { http.Error(w, "", http.StatusInternalServerError) return } } }
package handlers import ( "encoding/json" "fmt" "io" "net/http" "strconv" "github.com/go-chi/chi/v5" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/models" "github.com/kosalnik/metrics/internal/storage" ) func NewRestUpdateHandler(s storage.Storager) func(res http.ResponseWriter, req *http.Request) { return func(res http.ResponseWriter, req *http.Request) { res.Header().Set("Content-Type", "application/json") data, err := io.ReadAll(req.Body) log.Debug().Str("data", string(data)).Msg("Handle update") if err != nil { http.Error(res, `"Wrong data"`, http.StatusBadRequest) return } var m models.Metrics if err := json.Unmarshal(data, &m); err != nil { http.Error(res, `"Wrong json"`, http.StatusBadRequest) return } switch m.MType { case models.MGauge: r, err := s.SetGauge(req.Context(), m.ID, m.Value) if err != nil { http.Error(res, `"fail set gauge"`, http.StatusInternalServerError) return } if out, err := json.Marshal(r); err != nil { http.Error(res, `"internal error"`, http.StatusInternalServerError) } else { _, _ = res.Write(out) } return case models.MCounter: r, err := s.IncCounter(req.Context(), m.ID, m.Delta) if err != nil { http.Error(res, `"fail inc counter"`, http.StatusInternalServerError) return } if out, err := json.Marshal(r); err != nil { http.Error(res, `"internal error"`, http.StatusInternalServerError) } else { _, _ = res.Write(out) } return } http.Error(res, `"not found"`, http.StatusNotFound) } } func NewUpdateHandler(s storage.Storager) func(res http.ResponseWriter, req *http.Request) { return func(res http.ResponseWriter, req *http.Request) { res.Header().Set("Content-Type", "text/plain") mType := models.MType(chi.URLParam(req, "type")) mName := chi.URLParam(req, "name") mVal := chi.URLParam(req, "value") log.Debug(). Str("type", string(mType)). Str("name", mName). Str("val", mVal).Msg("Handle update") switch mType { case models.MGauge: v, err := strconv.ParseFloat(mVal, 64) if err != nil { http.Error(res, "bad request", http.StatusBadRequest) return } r, err := s.SetGauge(req.Context(), mName, v) if err != nil { http.Error(res, `"fail set gauge"`, http.StatusInternalServerError) return } if _, err := res.Write([]byte(fmt.Sprintf("%f", r.Value))); err != nil { log.Error().Err(err).Msg("fail write response") } return case models.MCounter: v, err := strconv.ParseInt(mVal, 10, 64) if err != nil { http.Error(res, "bad request", http.StatusBadRequest) return } r, err := s.IncCounter(req.Context(), mName, v) if err != nil { log.Error().Err(err).Msg("fail inc counter") http.Error(res, `"fail inc counter"`, http.StatusInternalServerError) return } if _, err := res.Write([]byte(fmt.Sprintf("%d", r.Delta))); err != nil { log.Error().Err(err).Msg("fail write response") } return } http.Error(res, "bad request", http.StatusBadRequest) } }
package handlers import ( "encoding/json" "io" "net/http" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/models" "github.com/kosalnik/metrics/internal/storage" ) func NewUpdateBatchHandler(s storage.BatchInserter) func(res http.ResponseWriter, req *http.Request) { return func(res http.ResponseWriter, req *http.Request) { res.Header().Set("Content-Type", "application/json") data, err := io.ReadAll(req.Body) log.Debug().Str("data", string(data)).Msg("Handle batch update") if err != nil { http.Error(res, `"Wrong data"`, http.StatusBadRequest) return } var mList []models.Metrics if err := json.Unmarshal(data, &mList); err != nil { http.Error(res, `"Wrong json"`, http.StatusBadRequest) return } if err := s.UpsertAll(req.Context(), mList); err != nil { http.Error(res, `"fail upsert"`, http.StatusInternalServerError) return } } }
// Package logger initializes logger. package log import ( "os" "github.com/rs/zerolog" ) var loggerInstance = zerolog.New(os.Stderr).With().Timestamp().Logger() type Config struct { Level string } func InitLogger(levelName string) error { level, err := zerolog.ParseLevel(levelName) if err != nil { return err } loggerInstance = loggerInstance.Level(level) return nil } func Debug() *zerolog.Event { return loggerInstance.Debug() } func Warn() *zerolog.Event { return loggerInstance.Warn() } func Warning() *zerolog.Event { return loggerInstance.Warn() } func Info() *zerolog.Event { return loggerInstance.Info() } func Error() *zerolog.Event { return loggerInstance.Error() } func Fatal() *zerolog.Event { return loggerInstance.Fatal() } func Panic() *zerolog.Event { return loggerInstance.Panic() } func Log() *zerolog.Event { return loggerInstance.Log() }
// Package memstorage implements storage in memory. package memstorage import ( "context" "sync" "time" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/storage" "github.com/kosalnik/metrics/internal/models" ) type MemStorage struct { updatedAt time.Time gauge map[string]float64 counter map[string]int64 mu sync.Mutex } func NewMemStorage() *MemStorage { return &MemStorage{ gauge: make(map[string]float64), counter: make(map[string]int64), updatedAt: time.Now(), } } var _ storage.Storager = &MemStorage{} func (m *MemStorage) GetGauge(_ context.Context, name string) (*models.Metrics, error) { m.mu.Lock() defer m.mu.Unlock() v, ok := m.gauge[name] if ok { return &models.Metrics{ID: name, MType: models.MGauge, Value: v}, nil } return nil, nil } func (m *MemStorage) GetCounter(_ context.Context, name string) (*models.Metrics, error) { m.mu.Lock() defer m.mu.Unlock() v, ok := m.counter[name] if ok { return &models.Metrics{ID: name, MType: models.MCounter, Delta: v}, nil } return nil, nil } func (m *MemStorage) SetGauge(_ context.Context, name string, value float64) (*models.Metrics, error) { m.mu.Lock() defer m.mu.Unlock() m.gauge[name] = value m.updatedAt = time.Now() return &models.Metrics{ID: name, MType: models.MGauge, Value: value}, nil } func (m *MemStorage) IncCounter(_ context.Context, name string, value int64) (*models.Metrics, error) { m.mu.Lock() defer m.mu.Unlock() v := m.counter[name] + value log.Info().Str("k", name).Int64("old", m.counter[name]).Int64("new", v).Msg("IncCounter") m.counter[name] = v m.updatedAt = time.Now() return &models.Metrics{ID: name, MType: models.MCounter, Delta: v}, nil } func (m *MemStorage) UpsertAll(_ context.Context, list []models.Metrics) error { if len(list) == 0 { return nil } m.mu.Lock() defer m.mu.Unlock() log.Info().Any("list", list).Msg("upsertAll") for _, v := range list { switch v.MType { case models.MGauge: t := v.Value m.gauge[v.ID] = t m.updatedAt = time.Now() continue case models.MCounter: t := v.Delta m.counter[v.ID] += t m.updatedAt = time.Now() } } return nil } func (m *MemStorage) GetAll(_ context.Context) ([]models.Metrics, error) { m.mu.Lock() defer m.mu.Unlock() res := make([]models.Metrics, len(m.gauge)+len(m.counter)) i := 0 for k, v := range m.gauge { t := v res[i] = models.Metrics{ID: k, MType: models.MGauge, Value: t} i++ } for k, v := range m.counter { t := v res[i] = models.Metrics{ID: k, MType: models.MCounter, Delta: t} i++ } return res, nil } func (m *MemStorage) Close() error { return nil } func (m *MemStorage) Ping(_ context.Context) error { return nil } func (m *MemStorage) UpdatedAt() time.Time { m.mu.Lock() defer m.mu.Unlock() return m.updatedAt }
// Package metric contains implementation of the metrics getter. // Method GetMetrics receives the set of metrics from system. package metric import ( "context" "runtime" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/mem" "github.com/kosalnik/metrics/internal/log" ) func GetMetrics(ctx context.Context) (map[string]float64, error) { var m runtime.MemStats runtime.ReadMemStats(&m) r := map[string]float64{ "Alloc": float64(m.Alloc), "BuckHashSys": float64(m.BuckHashSys), "Frees": float64(m.Frees), "GCCPUFraction": m.GCCPUFraction, "GCSys": float64(m.GCSys), "HeapAlloc": float64(m.HeapAlloc), "HeapIdle": float64(m.HeapIdle), "HeapInuse": float64(m.HeapInuse), "HeapObjects": float64(m.HeapObjects), "HeapReleased": float64(m.HeapReleased), "HeapSys": float64(m.HeapSys), "LastGC": float64(m.LastGC), "Lookups": float64(m.Lookups), "MCacheInuse": float64(m.MCacheInuse), "MCacheSys": float64(m.MCacheSys), "MSpanInuse": float64(m.MSpanInuse), "MSpanSys": float64(m.MSpanSys), "Mallocs": float64(m.Mallocs), "NextGC": float64(m.NextGC), "NumForcedGC": float64(m.NumForcedGC), "NumGC": float64(m.NumGC), "OtherSys": float64(m.OtherSys), "PauseTotalNs": float64(m.PauseTotalNs), "StackInuse": float64(m.StackInuse), "StackSys": float64(m.StackSys), "Sys": float64(m.Sys), "TotalAlloc": float64(m.TotalAlloc), } if cpuUsage, err := cpu.PercentWithContext(ctx, 0, false); err == nil { r["CPUutilization1"] = float64(cpuUsage[0]) } else { log.Error().Err(err).Msg("get cpu usage fail") } if memUsage, err := mem.VirtualMemory(); err == nil { r["TotalMemory"] = float64(memUsage.Total) r["FreeMemory"] = float64(memUsage.Free) } else { log.Error().Err(err).Msg("get memory usage fail") } return r, nil }
// Package models содержит определения моделей, используемых приложением. package models import ( "encoding/json" "fmt" ) type Metrics struct { ID string `json:"id"` // имя метрики MType MType `json:"type"` // параметр, принимающий значение gauge или counter Delta int64 `json:"delta,omitempty"` // значение метрики в случае передачи counter Value float64 `json:"value,omitempty"` // значение метрики в случае передачи gauge } func (m *Metrics) MarshalJSON() ([]byte, error) { switch m.MType { case MGauge: return json.Marshal( struct { ID string `json:"id"` MType MType `json:"type"` Value float64 `json:"value"` }{m.ID, m.MType, m.Value}, ) case MCounter: return json.Marshal( struct { ID string `json:"id"` MType MType `json:"type"` Delta int64 `json:"delta"` }{m.ID, m.MType, m.Delta}, ) } return json.Marshal(m) } type MType string const ( MGauge MType = "gauge" MCounter MType = "counter" ) func (m *Metrics) String() string { if m.MType == MCounter { return fmt.Sprintf("%s = %d", m.ID, m.Delta) } return fmt.Sprintf("%s = %g", m.ID, m.Value) }
package postgres import ( "context" "database/sql" "errors" "time" ) func NewConn(dsn string) (*sql.DB, error) { db, err := sql.Open("pgx", dsn) if err != nil { return nil, err } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() var ok bool row := db.QueryRowContext(ctx, "SELECT true AS ok") if err := row.Scan(&ok); err != nil { return nil, err } if !ok { return nil, errors.New("DB Connection timeout exceed") } return db, nil }
// Package postgres содержит реализацию репозиториев к БД postgres. package postgres import ( "context" "database/sql" "errors" "fmt" "sync" "time" "github.com/kosalnik/metrics/internal/log" "github.com/kosalnik/metrics/internal/models" ) var schemaGaugeSQL = `CREATE TABLE IF NOT EXISTS gauge( id VARCHAR(200) PRIMARY KEY, value double precision not null )` var schemaCounterSQL = `CREATE TABLE IF NOT EXISTS counter( id VARCHAR(200) PRIMARY KEY, value bigint not null )` type DBStorage struct { updatedAt time.Time db *sql.DB mu sync.Mutex } func NewDBStorage(db *sql.DB) (*DBStorage, error) { return &DBStorage{mu: sync.Mutex{}, updatedAt: time.Now(), db: db}, nil } func (d *DBStorage) CreateTablesIfNotExist(ctx context.Context) error { return d.inTransaction(ctx, func(tr *sql.Tx) error { if _, err := d.db.ExecContext(ctx, schemaCounterSQL); err != nil { return err } if _, err := d.db.ExecContext(ctx, schemaGaugeSQL); err != nil { return err } return nil }) } func (d *DBStorage) GetGauge(ctx context.Context, name string) (*models.Metrics, error) { r := d.db.QueryRowContext(ctx, "SELECT value FROM gauge WHERE id = $1", name) var v float64 if err := r.Scan(&v); err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil } log.Error().Err(err).Msg("fail db request. get gauge") return nil, err } return &models.Metrics{ID: name, MType: models.MGauge, Value: v}, nil } func (d *DBStorage) SetGauge(ctx context.Context, name string, value float64) (*models.Metrics, error) { s := "INSERT INTO gauge (id, value) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET value = $2" _, err := d.db.ExecContext(ctx, s, name, value) if err != nil { log.Error().Err(err).Msg("fail db request. set gauge") return nil, err } d.setUpdatedAt() v, err := d.GetGauge(ctx, name) return v, err } func (d *DBStorage) GetCounter(ctx context.Context, name string) (*models.Metrics, error) { r := d.db.QueryRowContext(ctx, "SELECT value FROM counter WHERE id = $1", name) var v int64 if err := r.Scan(&v); err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil } log.Error().Err(err).Msg("fail db request. get counter") return nil, err } return &models.Metrics{ID: name, MType: models.MCounter, Delta: v}, nil } func (d *DBStorage) IncCounter(ctx context.Context, name string, value int64) (*models.Metrics, error) { s := "INSERT INTO counter (id, value) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET value = counter.value + $2" _, err := d.db.ExecContext(ctx, s, name, value) if err != nil { log.Error().Err(err).Msg("fail db request. inc counter") return nil, err } d.setUpdatedAt() v, err := d.GetCounter(ctx, name) return v, err } func (d *DBStorage) inTransaction(ctx context.Context, fn func(tr *sql.Tx) error) error { tr, err := d.db.BeginTx(ctx, nil) if err != nil { return err } err = fn(tr) if err != nil { err := tr.Rollback() if err != nil { return fmt.Errorf("fail rollback transaction: %w", err) } return err } return nil } func (d *DBStorage) UpsertAll(ctx context.Context, list []models.Metrics) (err error) { return d.inTransaction(ctx, func(tr *sql.Tx) error { incCounterSt, err := tr.PrepareContext( ctx, "INSERT INTO counter (id, value) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET value = counter.value + $2", ) if err != nil { return fmt.Errorf("fail upsert: %w", err) } defer incCounterSt.Close() setGaugeSt, err := tr.PrepareContext( ctx, "INSERT INTO gauge (id, value) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET value = $2", ) if err != nil { return fmt.Errorf("fail upsert: %w", err) } defer setGaugeSt.Close() log.Info().Any("list", list).Msg("upsertAll") for _, v := range list { switch v.MType { case models.MGauge: if _, err := setGaugeSt.ExecContext(ctx, v.ID, v.Value); err != nil { return err } continue case models.MCounter: if _, err := incCounterSt.ExecContext(ctx, v.ID, v.Delta); err != nil { return err } } } if err := tr.Commit(); err != nil { return fmt.Errorf("fail commit transaction: %w", err) } d.setUpdatedAt() return nil }) } func (d *DBStorage) GetAll(ctx context.Context) ([]models.Metrics, error) { g, err := d.getAllGauge(ctx) if err != nil { return nil, err } c, err := d.getAllCounter(ctx) if err != nil { return nil, err } return append(g, c...), nil } func (d *DBStorage) getAllGauge(ctx context.Context) ([]models.Metrics, error) { var res []models.Metrics rows, err := d.db.QueryContext(ctx, "SELECT id, value FROM gauge ORDER BY id") if err != nil { if errors.Is(err, sql.ErrNoRows) { return res, nil } return nil, err } defer rows.Close() for rows.Next() { m := models.Metrics{MType: models.MGauge} if err := rows.Scan(&m.ID, &m.Value); err != nil { return nil, err } res = append(res, m) } if err := rows.Err(); err != nil { return nil, err } return res, nil } func (d *DBStorage) getAllCounter(ctx context.Context) ([]models.Metrics, error) { var res []models.Metrics rows, err := d.db.QueryContext(ctx, "SELECT id, value FROM counter ORDER BY id") if err != nil { if errors.Is(err, sql.ErrNoRows) { return res, nil } return nil, err } defer rows.Close() for rows.Next() { m := models.Metrics{MType: models.MCounter} if err := rows.Scan(&m.ID, &m.Delta); err != nil { return nil, err } res = append(res, m) } if err := rows.Err(); err != nil { return nil, err } return res, nil } func (d *DBStorage) Close() error { return d.db.Close() } func (d *DBStorage) UpdatedAt() time.Time { d.mu.Lock() defer d.mu.Unlock() return d.updatedAt } func (d *DBStorage) setUpdatedAt() { d.mu.Lock() defer d.mu.Unlock() d.updatedAt = time.Now() } func (d *DBStorage) Ping(ctx context.Context) error { return d.db.PingContext(ctx) }
package util import ( "errors" "net" "os" ) // GetMyHostIP - получить ip своего хоста func GetMyHostIP() (net.IP, error) { hostname, err := os.Hostname() if err != nil { return nil, err } addr, err := net.LookupIP(hostname) if err != nil { return nil, err } if len(addr) == 0 { return nil, errors.New("not found") } return addr[0], nil }
package version import ( "fmt" "io" "github.com/kosalnik/metrics/internal/log" ) type VersionInfo struct { BuildVersion string BuildDate string BuildCommit string } const hello = `Build version: %s Build date: %s Build commit: %s ` func (b VersionInfo) Print(w io.Writer) { p := []any{ val(b.BuildVersion), val(b.BuildDate), val(b.BuildCommit), } if _, err := fmt.Fprintf(w, hello, p...); err != nil { log.Error().Err(err).Msg("Fail to print the build version") } } func val(v string) string { if v == "" { return "N/A" } return v }