package main

import (
	"flag"
	"log"
	"math"
	"os"
	"sync"
	"time"

	"go.vxn.dev/xilt/internal/config"
	"go.vxn.dev/xilt/internal/database"
	"go.vxn.dev/xilt/internal/parser"
	"go.vxn.dev/xilt/internal/reader"
	"go.vxn.dev/xilt/pkg/logger"
)

const (
	reservedRoutines = 2
)

func getRoutineCount(cfg *config.Config) int {
	// The maximum number of routines equals the memory usage limit divided by the average size of
	// one batch (the configured average log size multiplied by the batch size).
	// reservedRoutines is subtracted to account for the single write routine and the reader
	// assembling the next batch from the log file, both of which run concurrently with the parsing
	// routines. At least one parsing routine is always returned.
	return int(math.Max(1, math.Floor(float64(cfg.MaxMemoryUsageMB)/(cfg.AverageLogSizeMB*float64(cfg.BatchSize)))-reservedRoutines))
}

func main() {
	// Start timer
	start := time.Now()

	// Load config
	cfg, err := config.Load(flag.CommandLine, os.Args[1:])
	if err != nil {
		log.Fatalln("error loading config:", err)
	}

	l := logger.NewLogger(cfg.Verbose)
	l.Debug("config loaded...")

	db := database.NewDB(l, cfg)
	if err := db.Init(); err != nil {
		log.Fatalln("error initializing database: ", err)
	}
	defer func() {
		if err := db.Close(); err != nil {
			l.Println("error closing database:", err)
		}
		l.Debug("database closed...")
	}()

	// Logs are distributed to parsing routines in batches via this channel
	batchChannel := make(chan []string)
	// Parsing routines distribute batches of parsed logs to the single writing routine via this channel
	parsedLogChannel := make(chan []parser.Log)

	var batchWg sync.WaitGroup
	var insertWg sync.WaitGroup

	parser, err := parser.NewParser(l, nil)
	if err != nil {
		l.Println("error creating parser: ", err)
		return
	}

	// Spin up routines to parse logs
	routineCount := getRoutineCount(cfg)
	l.Debugf("spinning up %d log parsing routines...", routineCount)

	for i := range routineCount {
		batchWg.Add(1)
		go parser.ParseBatch(i, batchChannel, parsedLogChannel, &batchWg)
	}

	l.Debug("log parsing routines spawned...")

	// There is only one DB write routine due to SQLite's single-writer model
	insertWg.Add(1)
	go db.InsertBatch(parsedLogChannel, &insertWg)

	l.Debug("batch insert routine spawned...")

	// Instantiate a reader which will read from the configured input file and push raw logs into
	// the batchChannel for parsing
	reader := reader.NewReader(l, cfg)
	if err := reader.ReadAndBatch(batchChannel); err != nil {
		l.Printf("error while reading from log file and batching: %v", err)
		return
	}

	close(batchChannel)
	batchWg.Wait()

	close(parsedLogChannel)
	insertWg.Wait()

	if err := db.CreateIndexes(); err != nil {
		l.Println("error creating table indexes: ", err)
	}

	// Stop timer & print duration
	end := time.Now()
	l.Println("log parsing finished")
	l.Printf("elapsed time: %s", end.Sub(start))
}
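// A minimal sketch of the routine-count formula above, written as a hypothetical main_test.go
// (not part of the original sources). With the default config values (100 MB limit, 0.001 MB
// average log size, 5000 logs per batch), one batch occupies roughly 0.001 * 5000 = 5 MB, so
// 100 / 5 = 20 batches fit within the limit; subtracting the 2 reserved routines leaves 18 parsers.
package main

import (
	"testing"

	"go.vxn.dev/xilt/internal/config"
)

func TestGetRoutineCountWithDefaults(t *testing.T) {
	cfg := &config.Config{
		MaxMemoryUsageMB: 100,
		AverageLogSizeMB: 0.001,
		BatchSize:        5000,
	}

	// floor(100 / (0.001 * 5000)) - 2 = 20 - 2 = 18
	if got := getRoutineCount(cfg); got != 18 {
		t.Errorf("expected 18 parsing routines, got %d", got)
	}
}

func TestGetRoutineCountNeverBelowOne(t *testing.T) {
	// A tight memory limit should still yield at least one parsing routine:
	// floor(1 / 5) - 2 is negative, so math.Max clamps the result to 1.
	cfg := &config.Config{
		MaxMemoryUsageMB: 1,
		AverageLogSizeMB: 0.001,
		BatchSize:        5000,
	}

	if got := getRoutineCount(cfg); got != 1 {
		t.Errorf("expected 1 parsing routine, got %d", got)
	}
}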
// Package config provides access to configurable parameters affecting the workings of the app
// (e.g. the path to the log file to parse, the path to the DB file to store the parsed logs in,
// the batch size or the memory usage limit).
package config

import (
	"flag"
	"fmt"
	"path/filepath"
)

type Config struct {
	BatchSize        int
	InputFilePath    string
	DBFilePath       string
	MaxMemoryUsageMB int
	AverageLogSizeMB float64
	Verbose          bool
	CreateIndexes    bool
}

const (
	defaultInputFilePath    = "access.log"
	defaultDbFilePath       = "logs.db"
	defaultBatchSize        = 5000
	defaultMaxMemUsageMB    = 100
	defaultAverageLogSizeMB = 0.001 // 1 KB
	defaultVerbose          = false
	defaultCreateIndexes    = false
)

func defineFlags(fs *flag.FlagSet, cfg *Config) {
	fs.IntVar(&cfg.BatchSize, "batchSize", defaultBatchSize, "Defines the batch size. Used for calculating the number of goroutines to spin up.")
	fs.IntVar(&cfg.MaxMemoryUsageMB, "maxMemUsage", defaultMaxMemUsageMB, "Defines the maximum allowed memory usage in megabytes. Used for calculating the number of goroutines to spin up.")
	fs.Float64Var(&cfg.AverageLogSizeMB, "avgLogSize", defaultAverageLogSizeMB, "Defines the average size of one log in MB. Used for calculating the number of goroutines to spin up.")
	fs.BoolVar(&cfg.Verbose, "v", defaultVerbose, "Defines whether verbose mode should be used.")
	fs.BoolVar(&cfg.CreateIndexes, "i", defaultCreateIndexes, "Defines whether indexes should be created in the parsed logs' table.")
}

// Load attempts to parse flags and args and update the config with the parsed values. A default
// value is used for each field if no value is specified in a flag/arg. If successful, it returns
// the updated config. Otherwise, an error is returned.
func Load(fs *flag.FlagSet, args []string) (*Config, error) {
	cfg := &Config{
		BatchSize:        defaultBatchSize,
		InputFilePath:    defaultInputFilePath,
		DBFilePath:       defaultDbFilePath,
		MaxMemoryUsageMB: defaultMaxMemUsageMB,
		AverageLogSizeMB: defaultAverageLogSizeMB,
		Verbose:          defaultVerbose,
		CreateIndexes:    defaultCreateIndexes,
	}

	defineFlags(fs, cfg)

	// Parse flags
	if err := fs.Parse(args); err != nil {
		return nil, fmt.Errorf("error parsing flags: %v", err)
	}

	// Parse args
	parsedArgs := fs.Args()
	if len(parsedArgs) >= 1 {
		cleanPath := filepath.Clean(parsedArgs[0])
		if cleanPath != "." {
			cfg.InputFilePath = cleanPath
		} else {
			return nil, fmt.Errorf("the provided log file path is invalid")
		}
	}

	if len(parsedArgs) >= 2 {
		cleanPath := filepath.Clean(parsedArgs[1])
		if cleanPath != "." {
			cfg.DBFilePath = cleanPath
		} else {
			return nil, fmt.Errorf("the provided DB file path is invalid")
		}
	}

	if err := cfg.validate(); err != nil {
		return nil, err
	}

	return cfg, nil
}

// validate checks that the currently configured values make sense for continuing with log processing.
// TODO: validate file paths?
func (cfg *Config) validate() error {
	if cfg.MaxMemoryUsageMB <= 0 {
		return fmt.Errorf("MaxMemoryUsageMB must be greater than 0. Got %d", cfg.MaxMemoryUsageMB)
	}

	if cfg.AverageLogSizeMB <= 0 {
		return fmt.Errorf("AverageLogSizeMB must be greater than 0. Got %f", cfg.AverageLogSizeMB)
	}

	if cfg.BatchSize <= 0 {
		return fmt.Errorf("BatchSize must be greater than 0. Got %d", cfg.BatchSize)
	}

	return nil
}
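// A minimal usage sketch for Load, written as a hypothetical config_test.go (not part of the
// original sources). It shows the expected CLI shape: flags first, then the optional positional
// log file path and DB file path, e.g. `xilt -batchSize 10000 -v access.log logs.db`.
package config

import (
	"flag"
	"testing"
)

func TestLoadFlagsAndArgs(t *testing.T) {
	fs := flag.NewFlagSet("xilt", flag.ContinueOnError)

	cfg, err := Load(fs, []string{"-batchSize", "10000", "-v", "access.log", "logs.db"})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if cfg.BatchSize != 10000 || !cfg.Verbose {
		t.Errorf("flags not applied: %+v", cfg)
	}
	if cfg.InputFilePath != "access.log" || cfg.DBFilePath != "logs.db" {
		t.Errorf("positional args not applied: %+v", cfg)
	}
}

func TestLoadRejectsInvalidBatchSize(t *testing.T) {
	fs := flag.NewFlagSet("xilt", flag.ContinueOnError)

	// validate() should reject a non-positive batch size.
	if _, err := Load(fs, []string{"-batchSize", "0"}); err == nil {
		t.Error("expected an error for batchSize 0, got nil")
	}
}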
// Package database provides methods for interacting with the database to store parsed logs.
package database

import (
	"database/sql"
	"fmt"
	"sync"

	_ "github.com/ncruces/go-sqlite3/driver"
	_ "github.com/ncruces/go-sqlite3/embed"

	"go.vxn.dev/xilt/internal/config"
	"go.vxn.dev/xilt/internal/parser"
	"go.vxn.dev/xilt/pkg/logger"
)

const (
	createLogTableScript = `CREATE TABLE "logs" ("ID" INTEGER NOT NULL, "IP" TEXT, "Identity" TEXT, "UserID" TEXT, "Time" TEXT, "TimestampUTC" TEXT, "Method" TEXT, "Route" TEXT, "Params" TEXT, "ResponseCode" INTEGER, "BytesSent" INTEGER, "Referer" TEXT, "Agent" TEXT, PRIMARY KEY("ID" AUTOINCREMENT));`
	insertLogStatement   = "INSERT INTO logs (IP, Identity, UserID, Time, TimestampUTC, Method, Route, Params, ResponseCode, BytesSent, Referer, Agent) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	createIndexesScript  = `
	CREATE INDEX idx_logs_ip ON logs(IP);
	CREATE INDEX idx_logs_ts ON logs(TimestampUTC);
	CREATE INDEX idx_logs_method ON logs(Method);
	CREATE INDEX idx_logs_route ON logs(Route);
	CREATE INDEX idx_logs_referer ON logs(Referer);
	CREATE INDEX idx_logs_ts_ip ON logs(TimestampUTC, IP);
	`
)

type db struct {
	conn   *sql.DB
	logger logger.Logger
	config *config.Config
}

type Database interface {
	Init() error
	Close() error
	InsertBatch(parsedLogChan <-chan []parser.Log, wg *sync.WaitGroup)
	CreateIndexes() error
}

// NewDB returns a new instance of a DB struct initialized with the provided logger and config.
func NewDB(l logger.Logger, c *config.Config) *db {
	return &db{
		logger: l,
		config: c,
	}
}

// Init initializes the DB struct. It attempts to connect to the database, configure it for better
// write performance and create a table for storage of parsed logs.
func (d *db) Init() error {
	// Connect to DB
	db, err := sql.Open("sqlite3", "file:"+d.config.DBFilePath)
	if err != nil {
		return fmt.Errorf("failed to connect to DB: %w", err)
	}
	d.conn = db

	// A wrapper to close the DB connection after an error and also handle a possible DB closing error
	// Used for the SQLite queries below
	handleFailure := func(originalErr error) error {
		if closeErr := d.conn.Close(); closeErr != nil {
			d.logger.Println("failed to close DB after error: ", closeErr)
		}
		return originalErr
	}

	// Optimize SQLite for writes
	_, err = d.conn.Exec("PRAGMA synchronous = OFF;")
	if err != nil {
		return handleFailure(fmt.Errorf("failed to set PRAGMA synchronous: %w", err))
	}

	// WAL mode has issues with saving big (50k+) batch sizes and creating indexes on tables
	// containing large amounts of logs, therefore the default rollback journal method is used instead.
	// https://github.com/Tencent/wcdb/issues/243
	// _, err = d.conn.Exec("PRAGMA journal_mode = WAL;")
	// if err != nil {
	// 	return handleFailure(fmt.Errorf("failed to set journal_mode: %w", err))
	// }

	// Create a new table to store parsed logs
	_, err = d.conn.Exec(createLogTableScript)
	if err != nil {
		return handleFailure(fmt.Errorf("failed to create log table: %w", err))
	}

	d.logger.Debug("DB initialized...")

	return nil
}

// Close closes the connection of the DB struct if it is not nil.
func (d *db) Close() error {
	if d.conn == nil {
		return nil
	}
	return d.conn.Close()
}

// InsertBatch reads batches of parsed logs from the provided channel and inserts them into the
// database, one transaction per batch. It is designed to run as the single write goroutine.
func (d *db) InsertBatch(parsedLogChan <-chan []parser.Log, wg *sync.WaitGroup) {
	defer wg.Done()

	for parsedLogBatch := range parsedLogChan {
		d.logger.Debug("write routine beginning insert")

		tx, err := d.conn.Begin()
		if err != nil {
			d.logger.Printf("write routine failed to start transaction: %v", err)
			continue
		}

		stmt, err := tx.Prepare(insertLogStatement)
		if err != nil {
			d.logger.Printf("write routine failed to prepare statement: %v", err)
			if err = tx.Rollback(); err != nil {
				d.logger.Printf("write routine failed to roll back transaction: %v", err)
			}
			continue
		}

		insertFailed := false
		for _, parsedLog := range parsedLogBatch {
			_, err := stmt.Exec(parsedLog.IP, parsedLog.Identity, parsedLog.User, parsedLog.Time, parsedLog.TimestampUTC, parsedLog.Method, parsedLog.Route, parsedLog.Params, parsedLog.ResponseCode, parsedLog.BytesSent, parsedLog.Referer, parsedLog.Agent)
			if err != nil {
				d.logger.Printf("write routine failed to insert: %v", err)
				if err = tx.Rollback(); err != nil {
					d.logger.Printf("write routine failed to roll back transaction: %v", err)
				}
				insertFailed = true
				break
			}
		}

		if err := stmt.Close(); err != nil {
			d.logger.Printf("write routine failed to close statement: %v", err)
		}

		// Skip the commit if the batch was rolled back due to a failed insert
		if insertFailed {
			continue
		}

		if err := tx.Commit(); err != nil {
			d.logger.Printf("write routine failed to commit transaction: %v", err)
		} else {
			d.logger.Debugf("write routine successfully inserted batch of %d logs", len(parsedLogBatch))
		}
	}
}

// CreateIndexes creates indexes on the log table if enabled in the config provided to the DB struct.
func (d *db) CreateIndexes() error {
	if d.config.CreateIndexes {
		d.logger.Println("creating table indexes...")
		if _, err := d.conn.Exec(createIndexesScript); err != nil {
			return err
		}
		d.logger.Println("table indexes created...")
		return nil
	}
	return nil
}
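// A minimal usage sketch for the write routine, written as a hypothetical database_test.go
// (not part of the original sources). It wires a single InsertBatch goroutine to a channel,
// sends one small batch and closes the channel, mirroring the channel/WaitGroup protocol used
// in main. The temp DB path and the sample log values are illustrative only.
package database

import (
	"path/filepath"
	"sync"
	"testing"

	"go.vxn.dev/xilt/internal/config"
	"go.vxn.dev/xilt/internal/parser"
	"go.vxn.dev/xilt/pkg/logger"
)

func TestInsertBatchSketch(t *testing.T) {
	cfg := &config.Config{DBFilePath: filepath.Join(t.TempDir(), "logs.db")}
	d := NewDB(logger.NewLogger(true), cfg)

	if err := d.Init(); err != nil {
		t.Fatalf("init failed: %v", err)
	}
	defer d.Close()

	parsedLogChan := make(chan []parser.Log)
	var wg sync.WaitGroup

	// Single writer, matching SQLite's single-writer model
	wg.Add(1)
	go d.InsertBatch(parsedLogChan, &wg)

	parsedLogChan <- []parser.Log{
		{IP: "127.0.0.1", Method: "GET", Route: "/", ResponseCode: 200},
	}

	// Closing the channel ends the write routine's range loop; Wait ensures the commit finished
	close(parsedLogChan)
	wg.Wait()

	var count int
	if err := d.conn.QueryRow("SELECT COUNT(*) FROM logs").Scan(&count); err != nil || count != 1 {
		t.Errorf("expected 1 row, got %d (err: %v)", count, err)
	}
}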
// Package parser provides functionality enabling parsing of batches of logs in raw string format
// (Combined and Common Log Formats are currently supported), returning a batch of parsed logs in a
// structured format for further processing and/or storage. It is designed for efficient batch
// processing and concurrent handling of log data.
package parser

import (
	"errors"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"go.vxn.dev/xilt/pkg/logger"
)

type Log struct {
	IP           string
	Identity     string
	User         string
	Time         string
	TimestampUTC string
	Method       string
	Route        string
	Params       string
	ResponseCode uint16
	BytesSent    uint32
	Referer      string
	Agent        string
}

type Parser interface {
	parseLog(l string) (*Log, error)
	ParseBatch(id int, batchChan <-chan []string, parsedLogChan chan<- []Log, wg *sync.WaitGroup)
}

type parser struct {
	logger logger.Logger
	regex  *regexp.Regexp
}

const (
	defaultParams        = "-"
	defaultBytesSent     = 0
	defaultReferer       = "-"
	defaultAgent         = "-"
	defaultLogTimeLayout = "02/Jan/2006:15:04:05 -0700"
	timestampUTCLayout   = "2006-01-02T15:04:05Z07:00"
)

var (
	defaultRegex = `^(?<ip>\S*).* (?<identity>\S*) (?<user>\S*) \[(?<timestamp>.*)\]\s"(?<method>\S*)\s(?<route>\S*)\s(?<protocol>[^"]*)"\s(?<response>\S*)\s(?<bytes>\S*)\s?"?(?<referrer>[^"]*)"?\s?"?(?<agent>[^"]*)"?\s*$`
)

// NewParser returns a new Parser instance. It takes a logger instance implementing the Logger
// interface and a regex pattern string. If the regex pattern is not passed (passing a nil pointer
// instead), the default regex pattern is used to create the Parser instance.
func NewParser(l logger.Logger, r *string) (*parser, error) {
	if r == nil {
		r = &defaultRegex
	}

	regex, err := regexp.Compile(*r)
	if err != nil {
		return nil, err
	}

	return &parser{
		logger: l,
		regex:  regex,
	}, nil
}

// parseLog takes a single log in raw form and returns a parsed Log struct for further manipulation.
func (p *parser) parseLog(l string) (*Log, error) {
	matches := p.regex.FindStringSubmatch(l)
	if matches == nil {
		return nil, errors.New("invalid log format: " + l)
	}

	// Initialize the Log struct with default values where needed. If the log being parsed contains
	// the values (not "-"), the default values will be replaced.
	parsedLog := Log{
		Params:  defaultParams,
		Referer: defaultReferer,
		Agent:   defaultAgent,
	}

	// TODO: use net.ParseIP?
	parsedLog.IP = matches[1]
	parsedLog.Identity = matches[2]
	parsedLog.User = matches[3]
	parsedLog.Time = matches[4]
	parsedLog.Method = matches[5]

	// Parse route and params
	uri := strings.Split(matches[6], "?")
	parsedLog.Route = uri[0]
	if len(uri) > 1 {
		parsedLog.Params = uri[1]
	}

	// Parse response code
	responseCode, err := strconv.ParseUint(matches[8], 10, 16)
	if err != nil {
		p.logger.Println("error parsing response code: ", err)
	} else {
		parsedLog.ResponseCode = uint16(responseCode)
	}

	// Parse UTC timestamp
	parsedTime, err := time.Parse(defaultLogTimeLayout, matches[4])
	if err != nil {
		p.logger.Println("error parsing time:", err)
	} else {
		parsedLog.TimestampUTC = parsedTime.UTC().Format(timestampUTCLayout)
	}

	// Parse bytes sent
	bytesSent, err := strconv.ParseUint(matches[9], 10, 32)
	if err != nil {
		// If there is an error parsing as an integer and the value is not "-", there is some issue
		// with the format. If the value is "-", it is a valid value, therefore no error is logged
		// and the default value is used.
		if matches[9] != "-" {
			p.logger.Debugf("error parsing bytes sent: %v. proceeding with default value", err)
		}
	} else {
		parsedLog.BytesSent = uint32(bytesSent)
	}

	// Parse Referer if present
	if len(matches) > 10 && matches[10] != "" {
		parsedLog.Referer = matches[10]
	}

	// Parse Agent if present
	if len(matches) > 11 && matches[11] != "" {
		parsedLog.Agent = matches[11]
	}

	return &parsedLog, nil
}

// ParseBatch reads batches of raw logs from an input channel, parses each log in the batch, and
// sends successfully parsed logs from the batch to an output channel for further processing or
// storage. It is designed to run concurrently as part of a goroutine, and only valid logs are
// included in the output batch.
func (p *parser) ParseBatch(id int, batchChan <-chan []string, parsedLogChan chan<- []Log, wg *sync.WaitGroup) {
	defer wg.Done()

	for batch := range batchChan {
		p.logger.Debugf("routine %d beginning to parse a batch of %d logs", id, len(batch))

		parsedLogs := make([]Log, 0, len(batch))

		for _, logEntry := range batch {
			parsedLog, err := p.parseLog(logEntry)
			if err != nil {
				p.logger.Println("error parsing log: ", err)
				continue
			}
			parsedLogs = append(parsedLogs, *parsedLog)
		}

		p.logger.Debugf("routine %d successfully parsed a batch", id)
		parsedLogChan <- parsedLogs
	}
}
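// A minimal usage sketch for ParseBatch, written as a hypothetical parser_test.go (not part of
// the original sources). It pushes one batch containing a single Combined Log Format line
// through the channels and checks a few of the parsed fields; the sample line itself is
// illustrative only.
package parser

import (
	"sync"
	"testing"

	"go.vxn.dev/xilt/pkg/logger"
)

func TestParseBatchSketch(t *testing.T) {
	p, err := NewParser(logger.NewLogger(true), nil)
	if err != nil {
		t.Fatalf("failed to create parser: %v", err)
	}

	batchChan := make(chan []string, 1)
	parsedLogChan := make(chan []Log, 1)
	var wg sync.WaitGroup

	wg.Add(1)
	go p.ParseBatch(0, batchChan, parsedLogChan, &wg)

	batchChan <- []string{
		`127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /index.html?foo=bar HTTP/1.0" 200 2326 "http://example.com/start.html" "Mozilla/4.08"`,
	}
	close(batchChan)
	wg.Wait()

	parsed := <-parsedLogChan
	if len(parsed) != 1 {
		t.Fatalf("expected 1 parsed log, got %d", len(parsed))
	}
	if parsed[0].IP != "127.0.0.1" || parsed[0].Route != "/index.html" || parsed[0].Params != "foo=bar" || parsed[0].ResponseCode != 200 {
		t.Errorf("unexpected parse result: %+v", parsed[0])
	}
}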
// Package reader provides functionality for reading from a log file and pushing raw logs to a
// batch channel to be processed.
package reader

import (
	"bufio"
	"errors"
	"fmt"
	"io/fs"
	"os"

	"go.vxn.dev/xilt/internal/config"
	"go.vxn.dev/xilt/pkg/logger"
)

type reader struct {
	logger logger.Logger
	cfg    *config.Config
}

// NewReader returns a new instance of the Reader struct.
func NewReader(l logger.Logger, c *config.Config) *reader {
	return &reader{
		logger: l,
		cfg:    c,
	}
}

// ReadAndBatch reads from the file configured in the Reader struct's config and pushes raw logs
// into a batch channel for further processing.
func (r *reader) ReadAndBatch(batchChannel chan<- []string) error {
	file, err := os.Open(r.cfg.InputFilePath)
	if err != nil {
		switch {
		case errors.Is(err, fs.ErrNotExist):
			return fmt.Errorf("error: file '%s' does not exist", r.cfg.InputFilePath)
		case errors.Is(err, fs.ErrPermission):
			return fmt.Errorf("error: insufficient permissions to read file '%s'", r.cfg.InputFilePath)
		default:
			return fmt.Errorf("error opening file '%s': %v", r.cfg.InputFilePath, err)
		}
	}

	r.logger.Debug("log file opened...")

	defer func() {
		file.Close()
		r.logger.Debug("log file closed...")
	}()

	scanner := bufio.NewScanner(file)
	batch := make([]string, 0, r.cfg.BatchSize)

	r.logger.Println("beginning reading from log file and the parsing process...")

	// Iterate over the lines in the log file and push them into the batch slice until the batch
	// size or EOF is reached
	for scanner.Scan() {
		line := scanner.Text()
		if len(line) > 0 {
			batch = append(batch, line)
			if len(batch) == r.cfg.BatchSize {
				batchChannel <- batch
				batch = make([]string, 0, r.cfg.BatchSize)
			}
		}
	}

	// Push the remaining logs if any
	if len(batch) > 0 {
		batchChannel <- batch
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error reading file '%s': %v", r.cfg.InputFilePath, err)
	}

	return nil
}
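// A minimal usage sketch for ReadAndBatch, written as a hypothetical reader_test.go (not part of
// the original sources). With 12 non-empty lines and a batch size of 5, the reader should emit
// batches of 5, 5 and 2 lines; the temp file contents are illustrative only.
package reader

import (
	"fmt"
	"os"
	"path/filepath"
	"testing"

	"go.vxn.dev/xilt/internal/config"
	"go.vxn.dev/xilt/pkg/logger"
)

func TestReadAndBatchSketch(t *testing.T) {
	// Write a small fake log file
	path := filepath.Join(t.TempDir(), "access.log")
	content := ""
	for i := 0; i < 12; i++ {
		content += fmt.Sprintf("log line %d\n", i)
	}
	if err := os.WriteFile(path, []byte(content), 0o644); err != nil {
		t.Fatalf("failed to write temp log file: %v", err)
	}

	r := NewReader(logger.NewLogger(true), &config.Config{InputFilePath: path, BatchSize: 5})

	// Drain the channel in a separate goroutine while ReadAndBatch pushes batches into it
	batchChannel := make(chan []string)
	sizes := make(chan []int, 1)
	go func() {
		var got []int
		for batch := range batchChannel {
			got = append(got, len(batch))
		}
		sizes <- got
	}()

	if err := r.ReadAndBatch(batchChannel); err != nil {
		t.Fatalf("ReadAndBatch failed: %v", err)
	}
	close(batchChannel)

	if got := <-sizes; len(got) != 3 || got[0] != 5 || got[1] != 5 || got[2] != 2 {
		t.Errorf("unexpected batch sizes: %v", got)
	}
}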
// Package logger provides a wrapper around the log library. It enables the usage of the Debug and
// Debugf methods which log only if the boolean provided to NewLogger is set to true.
package logger

import (
	"log"
)

type Logger interface {
	Println(v ...any)
	Printf(format string, v ...any)
	Debug(v ...any)
	Debugf(format string, v ...any)
}

type logger struct {
	verbose bool
}

// NewLogger returns a new instance of a logger.
func NewLogger(v bool) *logger {
	return &logger{
		verbose: v,
	}
}

// Println calls Output to print to the standard logger.
// Arguments are handled in the manner of [fmt.Println].
func (l *logger) Println(v ...any) {
	log.Println(v...)
}

// Printf calls Output to print to the standard logger.
// Arguments are handled in the manner of [fmt.Printf].
func (l *logger) Printf(format string, v ...any) {
	log.Printf(format, v...)
}

// Debug calls Output to print to the standard logger if the verbose flag is enabled in the config.
// Arguments are handled in the manner of [fmt.Println].
func (l *logger) Debug(v ...any) {
	if l.verbose {
		log.Println(v...)
	}
}

// Debugf calls Output to print to the standard logger if the verbose flag is enabled in the config.
// Arguments are handled in the manner of [fmt.Printf].
func (l *logger) Debugf(format string, v ...any) {
	if l.verbose {
		log.Printf(format, v...)
	}
}
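// A minimal usage sketch for the logger, written as a hypothetical logger_test.go (not part of the
// original sources). Println/Printf always log, while Debug/Debugf only log when the logger was
// created with verbose set to true.
package logger

import "testing"

func TestVerboseSketch(t *testing.T) {
	quiet := NewLogger(false)
	verbose := NewLogger(true)

	quiet.Println("always printed")  // printed regardless of the verbose flag
	quiet.Debug("debug, quiet")      // suppressed: verbose is false
	verbose.Debugf("debug: %d", 42)  // printed: verbose is true
}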