// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Copyright (c) 2017 Alexey Kovrizhkin <lekovr+webtail@gmail.com>
// Minor changes
package webtail
import (
"bytes"
"net/http"
"sync"
"time"
"github.com/go-logr/logr"
"github.com/gorilla/websocket"
)
const (
	// writeWait is the time allowed to write a message to the peer.
	writeWait = 10 * time.Second
	// pongWait is the time allowed to read the next pong message from the peer.
	pongWait = 60 * time.Second
	// pingPeriod is the interval between pings to the peer.
	// Must be less than pongWait so a pong can arrive before the read deadline.
	pingPeriod = (pongWait * 9) / 10
	// maxMessageSize is the maximum message size (bytes) allowed from the peer.
	maxMessageSize = 512
)
// Client is a middleman between the websocket connection and the hub.
type Client struct {
	// conn is the underlying websocket connection.
	conn *websocket.Conn
	// send is the buffered channel of outbound messages; the hub closes it
	// to tell the write pump to shut the connection down.
	send chan []byte
	// log is the structured logger shared by both pumps.
	log logr.Logger
}
// Characters used to normalize inbound messages: embedded newlines are
// replaced with spaces before a message is forwarded to the hub.
const (
	newline = "\n"
	space   = " "
)
// runReadPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) runReadPump(wg *sync.WaitGroup, quit chan *Client, inbox chan *Message) {
	// NOTE(review): wg.Add runs inside the spawned goroutine itself; if
	// wg.Wait is reached before this goroutine is scheduled the pump may not
	// be counted — confirm against Service.Close ordering.
	wg.Add(1)
	defer func() {
		// Tell the hub this client is gone, then close the connection.
		quit <- c
		err := c.conn.Close()
		if err != nil {
			c.log.Error(err, "ReadPump conn Close")
		}
		wg.Done()
	}()
	c.conn.SetReadLimit(maxMessageSize)
	err := c.conn.SetReadDeadline(time.Now().Add(pongWait))
	if err != nil {
		c.log.Error(err, "SetReadDeadline")
		return
	}
	// Each pong extends the read deadline, keeping the connection alive.
	c.conn.SetPongHandler(func(string) error { return c.conn.SetReadDeadline(time.Now().Add(pongWait)) })
	for {
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			// Normal/going-away closes are expected; log anything else.
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
				c.log.Error(err, "UnexpectedCloseError")
			}
			return
		}
		// Normalize: collapse newlines to spaces and trim surrounding space.
		message = bytes.TrimSpace(bytes.ReplaceAll(message, []byte(newline), []byte(space)))
		inbox <- &Message{Client: c, Message: message}
	}
}
// runWritePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) runWritePump(wg *sync.WaitGroup) {
	// NOTE(review): wg.Add runs inside the goroutine; if wg.Wait is reached
	// before this goroutine is scheduled the pump may not be counted —
	// confirm against Service.Close ordering.
	wg.Add(1)
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		if err := c.conn.Close(); err != nil {
			c.log.Error(err, "WritePump conn Close")
		}
		// Plain call instead of the original nested `defer wg.Done()`:
		// equivalent ordering, less indirection.
		wg.Done()
	}()
	for {
		select {
		case message, ok := <-c.send:
			if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
				c.log.Error(err, "SetWriteDeadline")
				return
			}
			if ok {
				c.sendMesage(message)
				continue
			}
			// The hub closed the channel. Send Bye and exit
			err := c.conn.WriteMessage(websocket.CloseMessage, []byte{})
			if err != nil && err != websocket.ErrCloseSent {
				c.log.Error(err, "Close socket")
			}
			return
		case <-ticker.C:
			// Keep-alive: set the write deadline, then send the ping.
			// BUGFIX: the original sent the ping only when SetWriteDeadline
			// FAILED (inverted condition), so healthy connections never
			// received pings and peers could time out after pongWait.
			if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
				return
			}
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
// sendMesage writes one text message to the websocket, then drains any
// messages already queued in c.send into the same websocket frame,
// newline-separated. Errors are logged and abort the frame.
// (Name typo kept: callers reference sendMesage.)
func (c *Client) sendMesage(message []byte) {
	w, err := c.conn.NextWriter(websocket.TextMessage)
	if err != nil {
		c.log.Error(err, "NextWriter")
		return
	}
	_, err = w.Write(message)
	if err != nil {
		c.log.Error(err, "Write")
		return
	}
	// Add queued chat messages to the current websocket message.
	// Only the messages present right now are drained (len is snapshotted).
	n := len(c.send)
	for i := 0; i < n; i++ {
		_, err = w.Write([]byte(newline))
		if err == nil {
			_, err = w.Write(<-c.send)
		}
		if err != nil {
			c.log.Error(err, "Write queued")
			return
		}
	}
	// BUGFIX: the original silently discarded the Close error; Close flushes
	// the frame, so its failure is worth logging.
	if err := w.Close(); err != nil {
		c.log.Error(err, "Close writer")
	}
}
// upgrader builds a websocket.Upgrader with the given buffer sizes.
// NOTE(review): CheckOrigin accepts every origin, disabling the browser
// same-origin protection for this endpoint — confirm this is intentional
// before exposing the service publicly.
func upgrader(readBufferSize, writeBufferSize int) websocket.Upgrader {
	return websocket.Upgrader{
		ReadBufferSize:  readBufferSize,
		WriteBufferSize: writeBufferSize,
		CheckOrigin:     func(r *http.Request) bool { return true },
	}
}
package main
import (
"net/http"
"os"
"os/signal"
"syscall"
"time"
stats_api "github.com/fukata/golang-stats-api-handler"
"github.com/LeKovr/go-kit/config"
"github.com/LeKovr/go-kit/logger"
"github.com/LeKovr/go-kit/ver"
"github.com/LeKovr/webtail"
)
// Config holds all config vars for the webtail binary;
// fields are populated from flags/env by the config package tags.
type Config struct {
	// Listen is the HTTP listen address.
	Listen string `long:"listen" default:":8080" description:"Http listen address"`
	// HTML, when non-empty, serves pages from disk instead of embedded assets.
	HTML string `long:"html" default:"" description:"Serve pages from this path"`
	// Logger holds logging options.
	Logger logger.Config `group:"Logging Options" namespace:"log" env-namespace:"LOG"`
	// Tail holds webtail service options.
	Tail webtail.Config `group:"Webtail Options"`
}
var (
	// version is the app version; the actual value is injected at build time.
	version = "0.0-dev"
	// repo is the repository address; the actual value is injected at build time.
	repo = "repo.git"
)
// Run app and exit via given exitFunc
func Run(exitFunc func(code int)) {
	// Load config
	var cfg Config
	err := config.Open(&cfg)
	// config.Close maps err (if any) onto the process exit code.
	defer func() { config.Close(err, exitFunc) }()
	if err != nil {
		return
	}
	log := logger.New(cfg.Logger, nil)
	log.Info("WebTail. Tail (log)files via web.", "v", version)
	var wt *webtail.Service
	wt, err = webtail.New(log, &cfg.Tail)
	if err != nil {
		return
	}
	// Background check for a newer released version.
	go ver.Check(log, repo, version)
	http.Handle("/", webtail.FileServer(cfg.HTML))
	http.Handle("/tail", wt)
	http.HandleFunc("/api/stats", stats_api.Handler)
	log.Info("Listen", "addr", cfg.Listen)
	// quit receives OS signals and the internal "listen failed" notification.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
	go wt.Run()
	go func() {
		// service connections
		s := &http.Server{
			Addr:           cfg.Listen,
			ReadTimeout:    10 * time.Second,
			WriteTimeout:   10 * time.Second,
			MaxHeaderBytes: 1 << 20,
		}
		// NOTE(review): err is shared with the deferred config.Close above and
		// written from this goroutine — a data race under -race; consider a
		// dedicated error channel. There is also no graceful s.Shutdown on
		// signal — in-flight requests are dropped. Confirm acceptable.
		if err = s.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			quit <- os.Interrupt
		}
	}()
	<-quit
	wt.Close()
	log.Info("Server stopped")
}
//go:build !test
// +build !test
// This file holds code which does not covered by tests
package main
import (
"os"
)
// main starts the application and exits through os.Exit.
func main() {
	Run(os.Exit)
}
package webtail
import (
"embed"
"io/fs"
"net/http"
)
//go:embed html
var content embed.FS
// FileServer returns a handler serving files from the given directory, or
// from the embedded "html" tree when path is empty.
func FileServer(path string) http.Handler {
	if path == "" {
		sub, err := fs.Sub(content, "html")
		if err != nil {
			// The embedded FS always contains "html"; failure here is a
			// build-time defect, so panic is acceptable.
			panic(err)
		}
		return http.FileServer(http.FS(sub))
	}
	return http.FileServer(http.Dir(path))
}
package webtail
import (
"encoding/json"
"sync"
"time"
"github.com/go-logr/logr"
)
// Returned Messages: reply texts sent back to clients.
const (
	// MsgSubscribed and MsgUnSubscribed deliberately share the same text.
	MsgSubscribed        = "success"
	MsgUnSubscribed      = "success"
	MsgUnknownChannel    = "unknown channel"
	MsgNotSubscribed     = "not subscribed"
	MsgWorkerError       = "worker create error"
	MsgSubscribedAlready = "attached already"
	// MsgNone means no reply text should be sent.
	MsgNone = ""
)
// InMessage holds incoming client request
type InMessage struct {
	Type    string `json:"type"`
	Channel string `json:"channel,omitempty"`
}

// TailMessage holds outgoing file tail row
type TailMessage struct {
	Type    string `json:"type"`
	Channel string `json:"channel,omitempty"`
	Data    string `json:"data,omitempty"`
}

// TraceMessage holds outgoing trace state
type TraceMessage struct {
	Type    string `json:"type"`
	Enabled bool   `json:"enabled"`
}

// StatsMessage holds outgoing app stats
type StatsMessage struct {
	Type string            `json:"type"`
	Data map[string]uint64 `json:"data,omitempty"`
}

// IndexItemEvent holds messages from indexer
type IndexItemEvent struct {
	ModTime time.Time `json:"mtime"`
	Size    int64     `json:"size"`
	Name    string    `json:"name"`
	// Deleted is set when the file disappeared from the watched tree.
	Deleted bool `json:"deleted,omitempty"`
}

// IndexMessage holds outgoing message item for file index
type IndexMessage struct {
	Type  string         `json:"type"`
	Data  IndexItemEvent `json:"data"`
	Error string         `json:"error,omitempty"`
}

// Message holds received message and sender
type Message struct {
	Client  *Client
	Message []byte
}

// subscribers holds clients subscribed on channel
type subscribers map[*Client]bool
// codebeat:disable[TOO_MANY_IVARS]

// Hub maintains the set of active clients and broadcasts messages to them.
// All fields are owned by the Run goroutine; other goroutines interact with
// the hub only through its channels.
type Hub struct {
	// Logger
	log logr.Logger
	// Tail Service workers
	workers *TailService
	// wg used by Close for wh.WorkerStop ending
	wg *sync.WaitGroup
	// Registered clients.
	clients map[*Client]bool
	// Channel subscribers, keyed by channel name ("" is the index channel)
	subscribers map[string]subscribers
	// Channel subscriber counts
	stats map[string]uint64
	// Inbound messages from the clients.
	broadcast chan *Message
	// Register requests from the clients.
	register chan *Client
	// Unregister requests from clients.
	unregister chan *Client
	// Inbound messages from the tailers.
	receive chan *TailMessage
	// Inbound messages from the channel indexer.
	index chan *IndexItemEvent
	// Quit channel
	quit chan struct{}
}

// codebeat:enable[TOO_MANY_IVARS]
// NewHub creates hub for client services.
// All channels are unbuffered: producers block until Run consumes them.
func NewHub(logger logr.Logger, ts *TailService, wg *sync.WaitGroup) *Hub {
	return &Hub{
		log:         logger,
		workers:     ts,
		wg:          wg,
		clients:     make(map[*Client]bool),
		subscribers: make(map[string]subscribers),
		stats:       make(map[string]uint64),
		broadcast:   make(chan *Message),
		register:    make(chan *Client),
		unregister:  make(chan *Client),
		receive:     make(chan *TailMessage),
		index:       make(chan *IndexItemEvent),
		quit:        make(chan struct{}),
	}
}
// Run processes hub messages until Close is called and all clients are gone.
func (h *Hub) Run() {
	// The "" channel carries file-index events.
	h.subscribers[""] = make(subscribers)
	h.workers.IndexerRun(h.index, h.wg)
	defer h.workers.WorkerStop("")
	// onAir becomes false once Close is requested; after that the loop only
	// drains unregister events until the last client disappears.
	onAir := true
	for {
		select {
		case client := <-h.register:
			if onAir {
				h.clients[client] = true
			}
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				h.unsubscribeClient(client, onAir)
			}
			if !onAir && len(h.clients) == 0 {
				return
			}
		case cmessage := <-h.broadcast:
			// client sends attach/detach/?list
			h.fromClient(cmessage)
		case wmessage := <-h.receive:
			// tailer sends file line
			h.fromTailer(wmessage)
		case imessage := <-h.index:
			// worker sends index update
			h.fromIndexer(imessage)
		case <-h.quit:
			onAir = false
			if len(h.clients) == 0 {
				return
			}
			// Closing each send channel makes the write pumps exit, which
			// triggers unregister events that drain h.clients above.
			for client := range h.clients {
				close(client.send)
			}
		}
	}
}
// Close closes message processing.
// It signals Run's loop; Run keeps draining until all clients are gone.
func (h *Hub) Close() {
	h.quit <- struct{}{}
}
// fromClient parses a client request (attach/detach/stats/trace) and sends
// back the appropriate reply. Unknown types are silently ignored.
func (h *Hub) fromClient(msg *Message) {
	var data []byte
	in := InMessage{}
	err := json.Unmarshal(msg.Message, &in)
	if err != nil {
		// Malformed JSON: reply with a parse error.
		data, _ = json.Marshal(TailMessage{Type: "error", Data: "parse error"})
		h.send(msg.Client, data)
		return
	}
	h.log.Info("Received from Client", "message", in)
	switch in.Type {
	case "attach":
		msgData, ok := h.subscribe(in.Channel, msg.Client)
		data = formatTailMessage(in.Channel, "attach", msgData, ok)
	case "detach":
		msgData, ok := h.unsubscribe(in.Channel, msg.Client)
		data = formatTailMessage(in.Channel, "detach", msgData, ok)
	case "stats":
		// send index counters
		data, _ = json.Marshal(StatsMessage{Type: "stats", Data: h.stats})
	case "trace":
		// on/off tracing
		h.workers.SetTrace(in.Channel)
		data, _ = json.Marshal(TraceMessage{Type: "trace", Enabled: h.workers.TraceEnabled()})
	}
	// Empty data means the reply was already sent (or none is needed).
	if len(data) > 0 {
		h.send(msg.Client, data)
	}
}
// fromTailer processes message from worker: stores the line in the channel
// buffer and fans it out to the channel's subscribers.
func (h *Hub) fromTailer(msg *TailMessage) {
	if h.workers.TraceEnabled() {
		h.log.Info("Trace from tailer", "channel", msg.Channel, "data", msg.Data, "type", msg.Type)
	}
	data, _ := json.Marshal(msg)
	// TailerAppend returns false for a trimmed (partial) first line,
	// which must not be broadcast.
	if msg.Type == "log" && !h.workers.TailerAppend(msg.Channel, data) {
		h.log.Info("Incomplete line skipped")
		return
	}
	clients := h.subscribers[msg.Channel]
	for client := range clients {
		h.send(client, data)
	}
}
// fromIndexer processes a message from the indexer: updates the file index
// and broadcasts the event to subscribers of the "" (index) channel.
func (h *Hub) fromIndexer(msg *IndexItemEvent) {
	if h.workers.TraceEnabled() {
		h.log.Info("Trace from indexer", "message", msg)
	}
	data, _ := json.Marshal(IndexMessage{Type: "index", Data: *msg})
	h.workers.IndexUpdate(msg)
	clients := h.subscribers[""]
	for client := range clients {
		h.send(client, data)
	}
}
// subscribe attaches client to channel, starting a tail worker when the
// channel has no producer yet. Returns reply text and a success flag;
// MsgNone means the confirmation was already sent inline.
func (h *Hub) subscribe(channel string, client *Client) (string, bool) {
	var err error
	if !h.workers.ChannelExists(channel) {
		return MsgUnknownChannel, false
	}
	if !h.workers.WorkerExists(channel) {
		readyChan := make(chan struct{})
		// no producer => create
		err = h.workers.TailerRun(channel, h.receive, readyChan, h.wg)
		if err != nil {
			h.log.Error(err, "Worker create error")
			return MsgWorkerError, false
		}
		h.subscribers[channel] = make(subscribers)
		// Block until the tailer is actually reading the file.
		<-readyChan
	} else if _, ok := h.subscribers[channel][client]; ok {
		return MsgSubscribedAlready, false
	}
	// Confirm attach
	// not via data because have to be first in response
	if h.send(client, formatTailMessage(channel, "attach", MsgSubscribed, true)) {
		if h.sendReply(channel, client) {
			// subscribe client
			h.subscribers[channel][client] = true
			h.stats[channel]++
		}
	}
	return MsgNone, true
}
// sendReply sends the initial payload for a fresh subscription: the buffered
// tail lines for a file channel, or the current file index for the ""
// channel. Returns false when the client could not keep up (send failed).
func (h *Hub) sendReply(ch string, cl *Client) bool {
	if ch != "" {
		// send actual buffer
		for _, item := range h.workers.TailerBuffer(ch) {
			if !h.send(cl, item) {
				return false
			}
		}
		return true
	}
	// send channel index, one message per file, keys sorted
	for _, v := range h.workers.IndexKeys() {
		file := h.workers.IndexItem(v)
		idx := &IndexMessage{
			Type: "index",
			Data: IndexItemEvent{
				Name:    v,
				ModTime: file.ModTime,
				Size:    file.Size,
			},
		}
		data, _ := json.Marshal(idx)
		if !h.send(cl, data) {
			return false
		}
	}
	return true
}
// send queues data for a client without blocking. A full outbound buffer
// means the client cannot keep up, so it is unsubscribed and dropped.
func (h *Hub) send(client *Client, data []byte) bool {
	h.log.Info("Send reply", "message", string(data))
	select {
	case client.send <- data:
	default:
		// Buffer full: treat the client as dead.
		h.unsubscribeClient(client, true)
		return false
	}
	return true
}
// unsubscribeClient removes all client subscriptions and forgets the client.
// needsClose controls whether the client's send channel is closed here
// (false during shutdown, when Run has already closed it).
func (h *Hub) unsubscribeClient(client *Client, needsClose bool) {
	for k := range h.subscribers {
		if _, ok := h.subscribers[k][client]; ok {
			h.log.Info("Remove subscriber from channel", "channel", k)
			h.unsubscribe(k, client)
		}
	}
	if needsClose {
		close(client.send)
	}
	delete(h.clients, client)
}
// unsubscribe detaches client from channel, stopping the channel's tail
// worker when it loses its last subscriber. Returns reply text and success.
func (h *Hub) unsubscribe(channel string, client *Client) (string, bool) {
	subscribers, ok := h.subscribers[channel]
	if !ok {
		return MsgUnknownChannel, false
	}
	if _, ok = subscribers[client]; !ok {
		return MsgNotSubscribed, false
	}
	delete(h.subscribers[channel], client)
	h.stats[channel]--
	if channel != "" && h.stats[channel] == 0 {
		// tailer has no subscribers => stop it
		h.workers.WorkerStop(channel)
	}
	return MsgUnSubscribed, true
}
// formatTailMessage packs a tail reply into JSON. An empty msgData yields an
// empty slice so callers can skip sending. When ok is false the message type
// is forced to "error"; otherwise it carries the command name.
func formatTailMessage(channel, cmd, msgData string, ok bool) []byte {
	if msgData == MsgNone {
		return []byte{}
	}
	kind := "error"
	if ok {
		kind = cmd
	}
	out, _ := json.Marshal(TailMessage{Type: kind, Channel: channel, Data: msgData})
	return out
}
package webtail
// This file holds directory tree indexer methods
import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/dc0d/dirwatch"
"github.com/go-logr/logr"
)
// IndexItemAttr holds File (index item) Attrs
type IndexItemAttr struct {
	ModTime time.Time `json:"mtime"`
	Size    int64     `json:"size"`
}

// IndexItemAttrStore holds all index items, keyed by path relative to Root.
type IndexItemAttrStore map[string]*IndexItemAttr
// indexWorker watches the root directory tree and reports file events.
type indexWorker struct {
	// out receives one event per observed file change.
	out chan *IndexItemEvent
	// quit stops the worker.
	quit chan struct{}
	log  logr.Logger
	// root is the watched directory.
	root string
}
// IndexerRun runs indexer: starts the watch goroutine (registered as the
// "" worker so WorkerStop("") can stop it) and seeds the index with the
// files already present under Root.
func (ts *TailService) IndexerRun(out chan *IndexItemEvent, wg *sync.WaitGroup) {
	quit := make(chan struct{})
	ts.workers[""] = &TailAttr{Quit: quit}
	readyChan := make(chan struct{})
	go indexWorker{
		out:  out,
		quit: quit,
		log:  ts.log,
		root: ts.Config.Root,
	}.run(readyChan, wg)
	// Wait until the watcher is installed before the initial scan,
	// so no event can fall between scan and watch.
	<-readyChan
	err := loadIndex(ts.index, ts.Config.Root, time.Now())
	if err != nil {
		ts.log.Error(err, "Path walk")
	}
	ts.log.V(1).Info("Indexer started")
}
// IndexKeys returns the index keys in ascending sorted order.
func (ts *TailService) IndexKeys() []string {
	keys := make([]string, 0, len(ts.index))
	for name := range ts.index {
		keys = append(keys, name)
	}
	sort.Strings(keys)
	return keys
}
// IndexItem returns index item attrs for key (nil when key is not indexed).
func (ts *TailService) IndexItem(key string) *IndexItemAttr {
	return ts.index[key]
}
// IndexUpdate updates TailService index item: upserts attrs on change,
// or removes the entry (and everything under it) on deletion.
func (ts *TailService) IndexUpdate(msg *IndexItemEvent) {
	if !msg.Deleted {
		ts.index[msg.Name] = &IndexItemAttr{ModTime: msg.ModTime, Size: msg.Size}
		return
	}
	if _, ok := ts.index[msg.Name]; ok {
		ts.log.Info("Deleting path from index", "path", msg.Name)
		items := ts.index
		// NOTE(review): plain HasPrefix also matches sibling names that merely
		// share the prefix (deleting "a.log" would drop "a.log.1") — confirm
		// this is the intended directory-removal semantics.
		for k := range items {
			if strings.HasPrefix(k, msg.Name) {
				delete(ts.index, k)
			}
		}
	}
}
// run runs indexer worker: installs a recursive dirwatch on root, signals
// readiness, and forwards file events until quit.
func (iw indexWorker) run(readyChan chan struct{}, wg *sync.WaitGroup) {
	// NOTE(review): wg.Add inside the goroutine can race with wg.Wait.
	wg.Add(1)
	defer func() {
		wg.Done()
		iw.log.V(1).Info("Indexer stopped")
	}()
	// notify translates each filesystem event into an IndexItemEvent.
	notify := func(ev dirwatch.Event) {
		iw.log.Info("Handling file event", "event", ev)
		if err := sendUpdate(iw.out, iw.root, ev.Name); err != nil {
			iw.log.Error(err, "Cannot get stat for file", "filepath", ev.Name)
		}
	}
	logger := func(args ...interface{}) {} // Is it called ever?
	watcher := dirwatch.New(dirwatch.Notify(notify), dirwatch.Logger(logger))
	defer watcher.Stop()
	watcher.Add(iw.root, true)
	// Signal readiness, then block until asked to quit.
	readyChan <- struct{}{}
	<-iw.quit
}
// sendUpdate sends index update to out channel: a deletion event when the
// file no longer exists, attrs otherwise. Directories are ignored.
func sendUpdate(out chan *IndexItemEvent, root, filePath string) error {
	// Index keys are paths relative to root.
	dir := strings.TrimSuffix(root, "/")
	p := strings.TrimPrefix(filePath, dir+"/")
	f, err := os.Stat(filePath)
	if err != nil {
		if os.IsNotExist(err) {
			// File vanished: report deletion.
			out <- &IndexItemEvent{Name: p, Deleted: true}
		} else {
			return err
		}
	} else if !f.IsDir() {
		out <- &IndexItemEvent{Name: p, ModTime: f.ModTime(), Size: f.Size()}
	}
	return nil
}
// loadIndex loads index items for the first time: it walks root and records
// every regular file modified before lastmod (later changes arrive through
// the watcher), keyed by path relative to root.
func loadIndex(index IndexItemAttrStore, root string, lastmod time.Time) error {
	dir := strings.TrimSuffix(root, "/")
	return filepath.Walk(root, func(path string, f os.FileInfo, err error) error {
		// BUGFIX: the original ignored the callback's err parameter; on a
		// stat/read failure Walk passes f == nil and f.IsDir() panicked.
		if err != nil {
			return err
		}
		if f.IsDir() {
			return nil
		}
		if f.ModTime().Before(lastmod) {
			p := strings.TrimPrefix(path, dir+"/")
			index[p] = &IndexItemAttr{ModTime: f.ModTime(), Size: f.Size()}
		}
		return nil
	})
}
package webtail
// This file holds directory file tail methods
import (
"io"
"os"
"path"
"path/filepath"
"sync"
"github.com/go-logr/logr"
"github.com/nxadm/tail"
)
// TailAttr holds tail worker attributes
type TailAttr struct {
	// Store for last Config.Lines lines
	Buffer [][]byte
	// Quit worker process
	Quit chan struct{}
	// Skip 1st line when read file not from start
	// (the seek may land mid-line, leaving a partial first line)
	IsHeadTrimmed bool
}
// TailService holds Worker hub operations
type TailService struct {
	log    logr.Logger
	Config *Config
	// workers maps channel name to its tailer ("" is the indexer)
	workers map[string]*TailAttr
	// index holds attrs of every known file under Config.Root
	index IndexItemAttrStore
}
// tailWorker holds tailer run arguments
type tailWorker struct {
	// out receives one TailMessage per file line
	out  chan *TailMessage
	quit chan struct{}
	log  logr.Logger
	// tf is the underlying file follower
	tf *tail.Tail
	// channel is the file path relative to Root
	channel string
}
// NewTailService creates the tailer service rooted at cfg.Root.
// The root directory must exist; it is normalized to an absolute path.
func NewTailService(logger logr.Logger, cfg *Config) (*TailService, error) {
	if _, err := os.Stat(cfg.Root); err != nil {
		return nil, err
	}
	abs, err := filepath.Abs(cfg.Root)
	if err != nil {
		return nil, err
	}
	// Unconditional assignment is equivalent to the original's
	// compare-then-assign.
	cfg.Root = abs
	return &TailService{
		Config:  cfg,
		log:     logger,
		workers: map[string]*TailAttr{},
		index:   IndexItemAttrStore{},
	}, nil
}
// WorkerExists checks if worker already registered for channel.
func (ts *TailService) WorkerExists(channel string) bool {
	_, ok := ts.workers[channel]
	return ok
}
// ChannelExists reports whether a channel may be attached to: the empty
// channel (the index stream) always exists, any other must be indexed.
func (ts *TailService) ChannelExists(channel string) bool {
	if channel != "" {
		_, ok := ts.index[channel]
		return ok
	}
	return true
}
// SetTrace switches tracing of incoming worker messages: "on" enables,
// "off" disables, any other mode leaves the current state untouched.
// The resulting state is always logged.
func (ts *TailService) SetTrace(mode string) {
	switch mode {
	case "on":
		ts.Config.Trace = true
	case "off":
		ts.Config.Trace = false
	}
	ts.log.Info("Tracing", "trace", ts.Config.Trace)
}
// TraceEnabled returns trace state.
func (ts *TailService) TraceEnabled() bool {
	return ts.Config.Trace
}
// WorkerStop stops worker (tailer or indexer) for the given channel and
// forgets it. Unknown channels are ignored.
// BUGFIX: the original indexed ts.workers unconditionally, so a missing
// channel caused a nil-pointer dereference on w.Quit.
func (ts *TailService) WorkerStop(channel string) {
	w, ok := ts.workers[channel]
	if !ok {
		return
	}
	w.Quit <- struct{}{}
	delete(ts.workers, channel)
}
// TailerBuffer returns worker buffer (the recent lines kept for new clients).
// NOTE(review): panics if no worker exists for channel — callers must check
// WorkerExists first; confirm all call sites do.
func (ts *TailService) TailerBuffer(channel string) [][]byte {
	return ts.workers[channel].Buffer
}
// TailerAppend adds a line into worker buffer and reports whether the line
// should be broadcast (false for the first, possibly partial, line after a
// mid-file seek). The buffer keeps at most Config.Lines entries.
func (ts *TailService) TailerAppend(channel string, data []byte) bool {
	if ts.workers[channel].IsHeadTrimmed {
		// Skip first trimmed (partial) line
		ts.workers[channel].IsHeadTrimmed = false
		return false
	}
	buf := ts.workers[channel].Buffer
	if len(buf) == ts.Config.Lines {
		// drop oldest line if buffer is full
		buf = buf[1:]
	}
	buf = append(buf, data)
	ts.workers[channel].Buffer = buf
	return true
}
// TailerRun creates and runs tail worker for the file named by channel
// (relative to Config.Root). When Config.Bytes is set and the file is
// larger, tailing starts Bytes before EOF and the first (partial) line is
// marked to be skipped. readyChan is signalled once the worker is reading.
func (ts *TailService) TailerRun(channel string, out chan *TailMessage, readyChan chan struct{}, wg *sync.WaitGroup) error {
	cfg := ts.Config
	config := tail.Config{
		Follow:      true,
		ReOpen:      true,
		MustExist:   true,
		MaxLineSize: cfg.MaxLineSize,
		Poll:        cfg.Poll,
	}
	filename := path.Join(cfg.Root, channel)
	headTrimmed := false
	if cfg.Bytes != 0 {
		fi, err := os.Stat(filename)
		if err != nil {
			return err
		}
		// get the file size
		size := fi.Size()
		if size > cfg.Bytes {
			// Seek to Bytes before EOF; the seek may land mid-line.
			config.Location = &tail.SeekInfo{Offset: -cfg.Bytes, Whence: io.SeekEnd}
			headTrimmed = true
		}
	}
	t, err := tail.TailFile(filename, config)
	if err != nil {
		return err
	}
	quit := make(chan struct{})
	ts.workers[channel] = &TailAttr{Buffer: [][]byte{}, Quit: quit, IsHeadTrimmed: headTrimmed}
	go tailWorker{
		tf:      t,
		channel: channel,
		out:     out,
		quit:    quit,
		log:     ts.log,
	}.run(readyChan, wg)
	return nil
}
// run runs tail worker: forwards each file line to out until quit, or until
// the tail library closes its Lines channel (then an error message is sent
// and the worker waits for quit).
func (tw tailWorker) run(readyChan chan struct{}, wg *sync.WaitGroup) {
	// NOTE(review): wg.Add inside the goroutine can race with wg.Wait.
	wg.Add(1)
	defer func() {
		tw.log.Info("tailworker close")
		wg.Done()
	}()
	log := tw.log.WithValues("channel", tw.channel)
	log.Info("Tailer started")
	readyChan <- struct{}{}
	for {
		select {
		case line, ok := <-tw.tf.Lines:
			if !ok {
				log.Error(tw.tf.Err(), "Tailer channel is unavailable")
				tw.out <- &TailMessage{Channel: tw.channel, Data: tw.tf.Err().Error(), Type: "error"}
				// Lines is gone; just wait for the stop signal.
				<-tw.quit
				return
			}
			tw.out <- &TailMessage{Channel: tw.channel, Data: line.Text, Type: "log"}
		case <-tw.quit:
			err := tw.tf.Stop() // Cleanup()
			if err != nil {
				log.Error(err, "Tailer stopped with error")
			} else {
				log.Info("Tailer stopped")
			}
			return
		}
	}
}
// Package webtail holds tailer service
// You don't need anything except Service methods
package webtail
import (
"net/http"
"sync"
"github.com/go-logr/logr"
)
// codebeat:disable[TOO_MANY_IVARS]

// Config defines local application flags
type Config struct {
	Root              string `long:"root" default:"log/" description:"Root directory for log files"`
	Bytes             int64  `long:"bytes" default:"5000" description:"tail from the last Nth location"`
	Lines             int    `long:"lines" default:"100" description:"keep N old lines for new consumers"`
	MaxLineSize       int    `long:"split" default:"180" description:"split line if longer"`
	ListCache         int    `long:"cache" default:"2" description:"Time to cache file listing (sec)"`
	Poll              bool   `long:"poll" description:"use polling, instead of inotify"`
	Trace             bool   `long:"trace" description:"trace worker channels"`
	ClientBufferSize  int    `long:"out_buf" default:"256" description:"Client Buffer Size"`
	WSReadBufferSize  int    `long:"ws_read_buf" default:"1024" description:"WS Read Buffer Size"`
	WSWriteBufferSize int    `long:"ws_write_buf" default:"1024" description:"WS Write Buffer Size"`
}

// codebeat:enable[TOO_MANY_IVARS]
// Service holds WebTail service
type Service struct {
	cfg *Config
	hub *Hub
	// wg tracks pump and worker goroutines; Close waits on it
	wg  *sync.WaitGroup
	log logr.Logger
}
// New creates the WebTail service: a tail service plus the hub that
// fans its output out to websocket clients.
func New(log logr.Logger, cfg *Config) (*Service, error) {
	ts, err := NewTailService(log, cfg)
	if err != nil {
		return nil, err
	}
	wg := &sync.WaitGroup{}
	return &Service{
		cfg: cfg,
		hub: NewHub(log, ts, wg),
		log: log,
		wg:  wg,
	}, nil
}
// Run runs a message hub (blocks until the hub shuts down).
func (wt *Service) Run() {
	wt.hub.Run()
}
// Close stops a message hub and waits for all workers to finish.
func (wt *Service) Close() {
	wt.log.Info("Service Exiting")
	wt.hub.Close()
	wt.wg.Wait()
}
// ServeHTTP handles websocket requests from the peer: it upgrades the
// connection, registers the client with the hub and starts its pumps.
func (wt *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	wsUpgrader := upgrader(wt.cfg.WSReadBufferSize, wt.cfg.WSWriteBufferSize)
	conn, err := wsUpgrader.Upgrade(w, r, nil)
	if err != nil {
		wt.log.Error(err, "Upgrade connection")
		return
	}
	client := &Client{
		conn: conn,
		send: make(chan []byte, wt.cfg.ClientBufferSize),
		log:  wt.log,
	}
	wt.hub.register <- client
	// Allow collection of memory referenced by the caller by doing all work in
	// new goroutines.
	go client.runWritePump(wt.wg)
	go client.runReadPump(wt.wg, wt.hub.unregister, wt.hub.broadcast)
}