package main

import (
	"github.com/PDOK/azure-storage-usage-exporter/internal/agg"
	"github.com/PDOK/azure-storage-usage-exporter/internal/du"
	"github.com/PDOK/azure-storage-usage-exporter/internal/metrics"
	"github.com/creasty/defaults"
)

type Config struct {
	Azure   *du.AzureBlobInventoryReportConfig `yaml:"azure,omitempty"`
	Metrics metrics.Config                     `yaml:"metrics,omitempty"`
	Labels  agg.Labels                         `yaml:"labels"`
	Rules   []agg.AggregationRule              `yaml:"rules"`
}

// unmarshalledConfig avoids infinite recursion when Config.UnmarshalYAML calls unmarshal
type unmarshalledConfig Config

// UnmarshalYAML sets the struct defaults before unmarshalling,
// so keys that are absent from the YAML keep their default values
func (c *Config) UnmarshalYAML(unmarshal func(any) error) error {
	tmp := new(unmarshalledConfig)
	if err := defaults.Set(tmp); err != nil {
		return err
	}
	if err := unmarshal(tmp); err != nil {
		return err
	}
	*c = Config(*tmp)
	return nil
}
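// An illustrative config file for the struct above (a sketch: the label names
// "owner" and "dataset" and the rule patterns are hypothetical examples, not
// mandated by the exporter; only the keys mirror the yaml tags in this repo):
//
//	azure:
//	  BlobInventoryContainer: blob-inventory
//	metrics:
//	  metricNamespace: azure
//	  metricSubsystem: storage
//	  limit: 1000
//	labels:
//	  owner: unknown # default value used when no rule or match group supplies one
//	  dataset: ""
//	rules:
//	  - pattern: '^(?P<owner>[^/]+)/(?P<dataset>[^/]+)'
//	  - pattern: '^backup/'
//	    labels:
//	      owner: ops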
package main

import (
	"errors"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/PDOK/azure-storage-usage-exporter/internal/agg"
	"github.com/PDOK/azure-storage-usage-exporter/internal/du"
	"github.com/PDOK/azure-storage-usage-exporter/internal/metrics"
	"github.com/go-co-op/gocron/v2"
	"github.com/google/uuid"
	"github.com/iancoleman/strcase"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/urfave/cli/v2"
	"gopkg.in/yaml.v2"
)

const (
	cliOptAzureStorageConnectionString = "azure-storage-connection-string"
	cliOptBindAddress                  = "bind-address"
	cliOptConfigFile                   = "config"
)

var (
	cliFlags = []cli.Flag{
		&cli.StringFlag{
			Name:    cliOptAzureStorageConnectionString,
			Usage:   "Connection string for connecting to the Azure blob storage that holds the inventory (overrides the config file entry)",
			EnvVars: []string{strcase.ToScreamingSnake(cliOptAzureStorageConnectionString)},
		},
		&cli.StringFlag{
			Name:    cliOptBindAddress,
			Usage:   "The TCP network address to listen on",
			Value:   ":8080",
			EnvVars: []string{strcase.ToScreamingSnake(cliOptBindAddress)},
		},
		&cli.StringFlag{
			Name:      cliOptConfigFile,
			Usage:     "Config file with aggregation labels and rules",
			EnvVars:   []string{strcase.ToScreamingSnake(cliOptConfigFile)},
			Required:  true,
			TakesFile: true,
		},
	}
)

func main() {
	app := cli.NewApp()
	app.HelpName = "Azure Storage Usage Exporter"
	app.Name = "azure-storage-usage-exporter"
	app.Usage = "Aggregates an Azure Blob Inventory Report and exports it as Prometheus metrics"
	app.Flags = cliFlags
	app.Action = func(c *cli.Context) error {
		config, err := loadConfig(c)
		if err != nil {
			return err
		}
		aggregator, err := createAggregator(config)
		if err != nil {
			return err
		}
		metricsUpdater := metrics.NewUpdater(aggregator, config.Metrics)
		scheduler, err := gocron.NewScheduler()
		if err != nil {
			return err
		}
		_, err = scheduler.NewJob(
			gocron.DurationJob(time.Hour), // blob inventory reports run daily or weekly, so checking hourly seems frequent enough
			gocron.NewTask(metricsUpdater.UpdatePromMetrics),
			gocron.WithName("updating metrics"),
			gocron.WithSingletonMode(gocron.LimitModeReschedule),
			gocron.WithStartAt(gocron.WithStartImmediately()),
			gocron.WithEventListeners(
				gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
					log.Printf("%s (%s) errored: %s", jobName, jobID, err.Error())
				})))
		if err != nil {
			return err
		}
		scheduler.Start()
		http.Handle("/metrics", promhttp.Handler())
		server := &http.Server{
			Addr:              c.String(cliOptBindAddress),
			ReadHeaderTimeout: 10 * time.Second,
		}
		return server.ListenAndServe()
	}
	err := app.Run(os.Args)
	if err != nil {
		log.Fatal(err)
	}
}

func createAggregator(config *Config) (*agg.Aggregator, error) {
	if config.Azure == nil {
		return nil, errors.New("azure config is required")
	}
	duReader := du.NewAzureBlobInventoryReportDuReader(*config.Azure)
	log.Print("testing azure connection")
	if err := duReader.TestConnection(); err != nil {
		return nil, err
	}
	return agg.NewAggregator(
		duReader,
		config.Labels,
		config.Rules,
	)
}

func loadConfig(c *cli.Context) (*Config, error) {
	config := new(Config)
	configYaml, err := os.ReadFile(c.String(cliOptConfigFile))
	if err != nil {
		return nil, err
	}
	if err := yaml.Unmarshal(configYaml, config); err != nil {
		return nil, err
	}
	// the CLI flag / env var takes precedence over the config file entry
	azureStorageConnectionStringFromCli := c.String(cliOptAzureStorageConnectionString)
	if config.Azure != nil && azureStorageConnectionStringFromCli != "" {
		config.Azure.AzureStorageConnectionString = azureStorageConnectionStringFromCli
	}
	return config, nil
}
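// Illustrative invocations (the config file path is hypothetical; the env var
// names follow from the flag names via strcase.ToScreamingSnake):
//
//	azure-storage-usage-exporter --config config.yaml --bind-address :8080
//	CONFIG=config.yaml AZURE_STORAGE_CONNECTION_STRING='DefaultEndpointsProtocol=...' azure-storage-usage-exporter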
package agg

import (
	"cmp"
	"encoding/json"
	"errors"
	"log"
	"slices"
	"time"

	"github.com/PDOK/azure-storage-usage-exporter/internal/du"
	_ "github.com/marcboeker/go-duckdb" // duckdb sql driver
	"golang.org/x/exp/maps"
)

const (
	Deleted        = "deleted"
	StorageAccount = "storage_account"
)

type Labels = map[string]string

type AggregationRule struct {
	// The named groups are used as labels
	Pattern ReGroup `yaml:"pattern"`
	// A label not found as named group is looked up in this
	StaticLabels map[string]string `yaml:"labels"`
}

type AggregationGroup struct {
	Labels  Labels
	Deleted bool
}

type AggregationResult struct {
	AggregationGroup AggregationGroup
	StorageUsage     du.StorageUsage
}

type Aggregator struct {
	duReader           du.Reader
	labelsWithDefaults Labels
	rules              []AggregationRule
}

func NewAggregator(duReader du.Reader, labelsWithDefaults Labels, rules []AggregationRule) (*Aggregator, error) {
	if _, exists := labelsWithDefaults[Deleted]; exists {
		return nil, errors.New("cannot use custom label: " + Deleted)
	}
	if labelsWithDefaults == nil {
		labelsWithDefaults = Labels{}
	} else {
		labelsWithDefaults = maps.Clone(labelsWithDefaults)
	}
	// the storage_account label defaults to the reader's account name;
	// an explicitly empty value removes the label altogether
	if given, exists := labelsWithDefaults[StorageAccount]; !exists {
		labelsWithDefaults[StorageAccount] = duReader.GetStorageAccountName()
	} else if given == "" {
		delete(labelsWithDefaults, StorageAccount)
	}
	return &Aggregator{
		duReader:           duReader,
		labelsWithDefaults: labelsWithDefaults,
		rules:              rules,
	}, nil
}

func (a *Aggregator) GetLabelNames() []string {
	keys := maps.Keys(a.labelsWithDefaults)
	keys = append(keys, Deleted)
	return keys
}

func (a *Aggregator) GetStorageAccountName() string {
	return a.labelsWithDefaults[StorageAccount]
}

func (a *Aggregator) Aggregate(previousRunDate time.Time) (aggregationResults []AggregationResult, runDate time.Time, err error) {
	log.Print("starting aggregation")
	runDate, rowsCh, errCh, err := a.duReader.Read(previousRunDate)
	if err != nil {
		return nil, runDate, err
	}
	if !runDate.After(previousRunDate) {
		return nil, runDate, nil
	}
	intermediateResults := make(map[string]du.StorageUsage)
	i := 0
	// drain both channels until they are closed
	for rowsCh != nil && errCh != nil {
		select {
		case err, ok := <-errCh:
			if !ok {
				errCh = nil
				continue
			}
			if err != nil {
				return nil, runDate, err
			}
		case row, ok := <-rowsCh:
			if !ok {
				rowsCh = nil
				continue
			}
			aggregationGroup := a.applyRulesToAggregate(row)
			intermediateResults[marshalAggregationGroup(aggregationGroup)] += row.Bytes
			if i%10000 == 0 {
				log.Printf("%d disk usage rows processed so far", i)
			}
			i++
		}
	}
	log.Printf("done aggregating blob inventory, %d du rows processed", i)
	return intermediateResultsToAggregationResults(intermediateResults), runDate, nil
}

// The key in intermediate results of Aggregator.Aggregate is a JSON representation of AggregationGroup
// because a map is not a comparable type.
// Property order in the JSON is predictable/constant.
func marshalAggregationGroup(aggregationGroup AggregationGroup) string {
	b, _ := json.Marshal(aggregationGroup)
	return string(b)
}

func unmarshalAggregationGroup(aggregationGroupJSON string) AggregationGroup {
	aggregationGroup := new(AggregationGroup)
	_ = json.Unmarshal([]byte(aggregationGroupJSON), aggregationGroup)
	return *aggregationGroup
}

func intermediateResultsToAggregationResults(intermediateResults map[string]du.StorageUsage) []AggregationResult {
	aggregationResults := make([]AggregationResult, len(intermediateResults))
	i := 0
	for aggregationGroup, storageUsage := range intermediateResults {
		aggregationResults[i] = AggregationResult{
			AggregationGroup: unmarshalAggregationGroup(aggregationGroup),
			StorageUsage:     storageUsage,
		}
		i++
	}
	// sort by storage usage, descending (cmp.Compare avoids the overflow that
	// casting the difference of two large values to int could cause)
	slices.SortFunc(aggregationResults, func(a, b AggregationResult) int {
		return cmp.Compare(b.StorageUsage, a.StorageUsage)
	})
	return aggregationResults
}

func (a *Aggregator) applyRulesToAggregate(row du.Row) AggregationGroup {
	// first matching rule wins
	for _, aggregationRule := range a.rules {
		labelsFromPattern, err := aggregationRule.Pattern.Groups(row.Dir)
		if err != nil {
			continue
		}
		return AggregationGroup{
			Labels:  a.applyRuleDefaults(labelsFromPattern, aggregationRule),
			Deleted: nilBoolToBool(row.Deleted),
		}
	}
	// default if no rule matches
	return AggregationGroup{
		Labels:  maps.Clone(a.labelsWithDefaults),
		Deleted: nilBoolToBool(row.Deleted),
	}
}

func (a *Aggregator) applyRuleDefaults(labelsFromPattern Labels, rule AggregationRule) Labels {
	labels := maps.Clone(a.labelsWithDefaults)
	for label, defaultVal := range labels {
		labels[label] = defaultStr(
			labelsFromPattern[label], // first use a match group
			rule.StaticLabels[label], // otherwise use a static label from the rule
			defaultVal,               // fall back to the label default
		)
	}
	return labels
}

// defaultStr returns the first non-empty string
func defaultStr(s ...string) string {
	for i := range s {
		if s[i] != "" {
			return s[i]
		}
	}
	return ""
}

func nilBoolToBool(p *bool) bool {
	if p != nil {
		return *p
	}
	return false
}
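// A hypothetical walk-through of applyRulesToAggregate/applyRuleDefaults,
// assuming default labels {owner: "unknown"} (plus the storage_account label
// added in NewAggregator) and a single rule with pattern `^(?P<owner>[^/]*)/`
// and static labels {owner: "ops"}:
//
//	row.Dir = "team-a/demo" -> pattern matches, match group wins: owner="team-a"
//	row.Dir = "/shared"     -> pattern matches, group is empty:   owner="ops" (static label from the rule)
//	row.Dir = "zips"        -> no rule matches:                   owner="unknown" (label default)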
package agg

import (
	"github.com/oriser/regroup"
)

// ReGroup wraps regroup.ReGroup to add YAML marshalling from and to a (regex) string
type ReGroup struct {
	*regroup.ReGroup
	original string
}

func NewReGroup(original string) ReGroup {
	return ReGroup{
		ReGroup:  regroup.MustCompile(original),
		original: original,
	}
}

func (r *ReGroup) UnmarshalYAML(unmarshal func(any) error) error {
	if err := unmarshal(&r.original); err != nil {
		return err
	}
	reGroup, err := regroup.Compile(r.original)
	if err != nil {
		return err
	}
	r.ReGroup = reGroup
	return nil
}

func (r ReGroup) MarshalYAML() (any, error) {
	if r.ReGroup == nil {
		return "", nil
	}
	return r.original, nil
}
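// A minimal, self-contained sketch of how ReGroup behaves when unmarshalled
// from YAML (the pattern and the group names "owner"/"dataset" are
// hypothetical examples, not part of the exporter):
package main

import (
	"fmt"
	"log"

	"github.com/PDOK/azure-storage-usage-exporter/internal/agg"
	"gopkg.in/yaml.v2"
)

func main() {
	var pattern agg.ReGroup
	// the YAML scalar is compiled by ReGroup.UnmarshalYAML
	if err := yaml.Unmarshal([]byte("'^(?P<owner>[^/]+)/(?P<dataset>[^/]+)'"), &pattern); err != nil {
		log.Fatal(err)
	}
	// Groups comes from the embedded *regroup.ReGroup
	groups, err := pattern.Groups("team-a/demo/2024/file.tif")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(groups) // map[dataset:demo owner:team-a]
}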
package du

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/url"
	"regexp"
	"slices"
	"strings"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/creasty/defaults"
	"github.com/jmoiron/sqlx"
	"github.com/oriser/regroup"
	"golang.org/x/exp/maps"
)

type AzureBlobInventoryReportConfig struct {
	AzureStorageConnectionString string `yaml:"AzureStorageConnectionString" default:"DefaultEndpointsProtocol=http;BlobEndpoint=http://localhost:10000/devstoreaccount1;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"`
	BlobInventoryContainer       string `yaml:"BlobInventoryContainer" default:"blob-inventory"`
	MaxMemory                    string `yaml:"maxMemory" default:"1GB"`
	Threads                      int    `yaml:"threads" default:"4"`
}

type unmarshalledAzureBlobInventoryReportConfig AzureBlobInventoryReportConfig

// UnmarshalYAML sets the struct defaults before unmarshalling
func (c *AzureBlobInventoryReportConfig) UnmarshalYAML(unmarshal func(any) error) error {
	tmp := new(unmarshalledAzureBlobInventoryReportConfig)
	if err := defaults.Set(tmp); err != nil {
		return err
	}
	if err := unmarshal(tmp); err != nil {
		return err
	}
	*c = AzureBlobInventoryReportConfig(*tmp)
	return nil
}

type AzureBlobInventoryReportDuReader struct {
	config AzureBlobInventoryReportConfig
}

type rulesRanByDate = map[time.Time][]string

const (
	runDatePathFormat  = "2006/01/02/15-04-05"
	maxSaneCountDuRows = 10000000 // 10 million. if breached, maybe adapt duDepth
	duDepth            = 4        // aggregate blob usage 4 dirs deep
)

var (
	blobInventoryFileRunMatchPattern = regroup.MustCompile(`^(?P<date>\d{4}/\d{2}/\d{2}/\d{2}-\d{2}-\d{2})/(?P<rule>[^/]+)/[^_]+_\d+_\d+.parquet$`)
)

func NewAzureBlobInventoryReportDuReader(config AzureBlobInventoryReportConfig) *AzureBlobInventoryReportDuReader {
	return &AzureBlobInventoryReportDuReader{
		config: config,
	}
}

func (ar *AzureBlobInventoryReportDuReader) TestConnection() error {
	blobClient, err := ar.newBlobClient()
	if err != nil {
		return err
	}
	pager := blobClient.NewListBlobsFlatPager(ar.config.BlobInventoryContainer, &azblob.ListBlobsFlatOptions{MaxResults: int32Ptr(1)})
	_, err = pager.NextPage(context.TODO())
	return err
}

func (ar *AzureBlobInventoryReportDuReader) GetStorageAccountName() string {
	// github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared/ParseConnectionString is unfortunately internal
	if match := regexp.MustCompile(`AccountName=([^;]+)`).FindStringSubmatch(ar.config.AzureStorageConnectionString); len(match) == 2 {
		return match[1]
	}
	if match := regexp.MustCompile(`BlobEndpoint=([^;]+)`).FindStringSubmatch(ar.config.AzureStorageConnectionString); len(match) == 2 {
		if blobEndpoint, err := url.Parse(match[1]); err == nil {
			if blobEndpoint.Path != "" {
				// e.g. http://localhost:10000/devstoreaccount1 -> devstoreaccount1
				return strings.TrimPrefix(blobEndpoint.Path, "/")
			}
			return regexp.MustCompile(`^[^.]+`).FindString(blobEndpoint.Host)
		}
	}
	return "_unknown"
}

func (ar *AzureBlobInventoryReportDuReader) Read(previousRunDate time.Time) (time.Time, <-chan Row, <-chan error, error) {
	log.Print("finding newest inventory run")
	runsByDate, err := ar.findRuns()
	if err != nil {
		return time.Time{}, nil, nil, err
	}
	runDate, found := getLastRunDate(runsByDate)
	if !found {
		return runDate, nil, nil, errors.New("no run date found")
	}
	if !runDate.After(previousRunDate) { // no new data
		return runDate, nil, nil, errors.New("newest run date is not after previous run date")
	}
	log.Printf("found newest inventory run: %s", runDate)

	log.Print("setting up duckdb, including azure blob store connection")
	db, err := sqlx.Connect("duckdb", "")
	if err != nil {
		return runDate, nil, nil, err
	}
	if err = ar.initDB(db); err != nil {
		return runDate, nil, nil, err
	}

	rowsReceiver := make(chan Row, maxSaneCountDuRows/100)
	errReceiver := make(chan error)
	go ar.readRowsFromInventoryReport(runDate, db, rowsReceiver, errReceiver)
	return runDate, rowsReceiver, errReceiver, nil
}

// readRowsFromInventoryReport coarsely aggregates the inventory reports parquet output with duckdb,
// grouping all blob names to max duDepth levels deep
func (ar *AzureBlobInventoryReportDuReader) readRowsFromInventoryReport(runDate time.Time, db *sqlx.DB, rowsCh chan<- Row, errCh chan<- error) {
	defer close(rowsCh)
	defer close(errCh)
	// language=sql
	duQuery := `
		SELECT array_to_string(string_split(i.Name, '/')[1:-2][1:?], '/') as dir, -- it's a 1-based index; inclusive boundaries; :-2 strips the filename
			i."Deleted" as deleted,
			sum(i."Content-Length") as bytes,
			count(*) as cnt
		FROM read_parquet([?]) i
		GROUP BY dir, deleted
		ORDER BY bytes DESC
		LIMIT ? -- sanity limit
	`
	parquetWildcardPath := fmt.Sprintf("az://%s/%s/%s/*.parquet",
		ar.config.BlobInventoryContainer, runDate.Format(runDatePathFormat), "*")
	log.Print("start querying blob inventory (might take a while)")
	dbRows, err := db.Queryx(duQuery, duDepth, parquetWildcardPath, maxSaneCountDuRows) //nolint:sqlclosecheck // it's closed in the defer below
	if err != nil {
		errCh <- err
		return
	}
	defer dbRows.Close()
	i := 0
	for dbRows.Next() {
		if i >= maxSaneCountDuRows {
			errCh <- errors.New("du rows count sanity limit was reached")
			return
		}
		var duRow Row
		if err = dbRows.StructScan(&duRow); err != nil {
			errCh <- err
			return
		}
		rowsCh <- duRow
		i++
	}
	log.Printf("done querying blob inventory, %d disk usage rows processed", i)
}

func (ar *AzureBlobInventoryReportDuReader) initDB(db *sqlx.DB) error {
	// language=sql
	azInitQuery := `INSTALL azure;
		LOAD azure;
		SET azure_transport_option_type = 'curl'; -- fixes cert issues
		CREATE SECRET az (TYPE AZURE, PROVIDER CONFIG, CONNECTION_STRING '%s');`
	azInitQuery = fmt.Sprintf(azInitQuery, removeQuotes(ar.config.AzureStorageConnectionString))
	if _, err := db.Exec(azInitQuery); err != nil {
		return err
	}
	// language=sql
	memSetQuery := `SET max_memory = '%s';
		SET threads = %d;`
	memSetQuery = fmt.Sprintf(memSetQuery, removeQuotes(ar.config.MaxMemory), ar.config.Threads)
	if _, err := db.Exec(memSetQuery); err != nil {
		return err
	}
	return nil
}

func (ar *AzureBlobInventoryReportDuReader) findRuns() (rulesRanByDate, error) {
	blobClient, err := ar.newBlobClient()
	if err != nil {
		return nil, err
	}
	pager := blobClient.NewListBlobsFlatPager(ar.config.BlobInventoryContainer, nil)
	runsByDate := make(rulesRanByDate)
	for pager.More() {
		page, err := pager.NextPage(context.TODO())
		if err != nil {
			return nil, err
		}
		if page.Segment == nil {
			break
		}
		for _, blob := range page.Segment.BlobItems {
			g, err := blobInventoryFileRunMatchPattern.Groups(*blob.Name)
			if err != nil { // no match
				continue
			}
			runDate, err := time.Parse(runDatePathFormat, g["date"])
			if err != nil { // unexpected
				return nil, err
			}
			runsByDate[runDate] = append(runsByDate[runDate], g["rule"])
		}
	}
	return runsByDate, nil
}

func getLastRunDate(runsByDate rulesRanByDate) (runDate time.Time, ok bool) {
	dates := maps.Keys(runsByDate)
	if len(dates) == 0 {
		return
	}
	return slices.MaxFunc(dates, func(i, j time.Time) int {
		return i.Compare(j)
	}), true
}

func (ar *AzureBlobInventoryReportDuReader) newBlobClient() (*azblob.Client, error) {
	blobClient, err := azblob.NewClientFromConnectionString(ar.config.AzureStorageConnectionString, nil)
	if err != nil {
		return nil, err
	}
	return blobClient, nil
}

func int32Ptr(i int32) *int32 {
	return &i
}

func removeQuotes(s string) string {
	return strings.ReplaceAll(s, "'", "")
}
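// A minimal sketch of the blob layout findRuns expects and how the run date is
// recovered from it (the container and rule names here are hypothetical;
// inventory files live under <runDate>/<ruleName>/<name>_<n>_<m>.parquet, with
// the run date encoded in the path using runDatePathFormat):
package main

import (
	"fmt"
	"log"
	"time"
)

func main() {
	const runDatePathFormat = "2006/01/02/15-04-05" // same layout as in package du
	// hypothetical blob name as listed from the inventory container:
	// "2024/05/01/06-00-00/dailyrule/inventory_0_0.parquet"
	runDate, err := time.Parse(runDatePathFormat, "2024/05/01/06-00-00")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(runDate.Format(time.RFC3339)) // 2024-05-01T06:00:00Z
}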
package metrics

import (
	"log"
	"strconv"
	"time"

	"github.com/PDOK/azure-storage-usage-exporter/internal/agg"
	"github.com/creasty/defaults"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type Updater struct {
	config            Config
	aggregator        *agg.Aggregator
	storageUsageGauge *prometheus.GaugeVec
	lastRunDateMetric prometheus.Gauge
	lastRunDate       time.Time
}

type Config struct {
	MetricNamespace string `yaml:"metricNamespace" default:"azure"`
	MetricSubsystem string `yaml:"metricSubsystem" default:"storage"`
	Limit           int    `yaml:"limit" default:"1000"`
}

type unmarshalledConfig Config

// UnmarshalYAML sets the struct defaults before unmarshalling
func (c *Config) UnmarshalYAML(unmarshal func(any) error) error {
	tmp := new(unmarshalledConfig)
	if err := defaults.Set(tmp); err != nil {
		return err
	}
	if err := unmarshal(tmp); err != nil {
		return err
	}
	*c = Config(*tmp)
	return nil
}

func NewUpdater(aggregator *agg.Aggregator, config Config) *Updater {
	// promauto automatically registers with prometheus.DefaultRegisterer
	storageUsageGauge := promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: config.MetricNamespace,
		Subsystem: config.MetricSubsystem,
		Name:      "usage",
	}, aggregator.GetLabelNames())
	lastRunDateMetricLabels := prometheus.Labels{}
	storageAccountName := aggregator.GetStorageAccountName()
	if storageAccountName != "" {
		lastRunDateMetricLabels[agg.StorageAccount] = storageAccountName
	}
	lastRunDateMetric := promauto.NewGauge(prometheus.GaugeOpts{
		Namespace:   config.MetricNamespace,
		Subsystem:   config.MetricSubsystem,
		Name:        "last_run_date",
		ConstLabels: lastRunDateMetricLabels,
	})
	return &Updater{
		config:            config,
		aggregator:        aggregator,
		storageUsageGauge: storageUsageGauge,
		lastRunDateMetric: lastRunDateMetric,
	}
}

func (ms *Updater) UpdatePromMetrics() error {
	log.Printf("start updating metrics. previous run was %s", ms.lastRunDate)
	aggregationResults, lastRunDate, err := ms.aggregator.Aggregate(ms.lastRunDate)
	if err != nil {
		if !lastRunDate.IsZero() && lastRunDate.Equal(ms.lastRunDate) {
			log.Print("no newer blob inventory run found")
			return nil
		}
		return err
	}
	log.Print("start setting metrics")
	ms.lastRunDate = lastRunDate
	ms.lastRunDateMetric.Set(float64(lastRunDate.UnixNano()) / 1e9) // unix seconds, keeping sub-second precision
	ms.storageUsageGauge.Reset()
	if len(aggregationResults) > ms.config.Limit {
		log.Printf("metrics count will be limited to %d (of %d)", ms.config.Limit, len(aggregationResults))
	}
	for i, aggregationResult := range aggregationResults {
		if i >= ms.config.Limit {
			break
		}
		ms.storageUsageGauge.With(aggregationGroupToLabels(aggregationResult.AggregationGroup)).Set(float64(aggregationResult.StorageUsage))
	}
	log.Printf("done updating metrics for run %s", ms.lastRunDate)
	return nil
}

func aggregationGroupToLabels(aggregationGroup agg.AggregationGroup) prometheus.Labels {
	labels := aggregationGroup.Labels
	labels[agg.Deleted] = strconv.FormatBool(aggregationGroup.Deleted)
	return labels
}
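// With the default Config (namespace "azure", subsystem "storage") the exporter
// serves metrics shaped like the following on /metrics. The "owner" label and
// all values are made-up examples; besides "deleted" and "storage_account",
// label names depend on the configured aggregation labels:
//
//	azure_storage_usage{deleted="false",owner="team-a",storage_account="devstoreaccount1"} 1.234567e+09
//	azure_storage_last_run_date{storage_account="devstoreaccount1"} 1.7145e+09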