package bidetwriter
import (
"bufio"
"io"
"os"
)
// BidetWriter is an io.Writer that calls FlushFunc after every call to the wrapped Writer's Write,
// so that written content is pushed through to the destination straight away.
type BidetWriter struct {
	// writeCloser is the destination that Close() closes.
	writeCloser io.Closer
	// Writer receives the bytes handed to Write.
	Writer io.Writer
	// FlushFunc is invoked after each Write, whether or not the write itself failed.
	FlushFunc func() error
}

// NewBidetWriter wraps the provided write closer in a buffered writer that is flushed after every write.
// The provided write closer is the one closed by Close().
func NewBidetWriter(w io.WriteCloser) BidetWriter {
	buffered := bufio.NewWriter(w)

	var bw BidetWriter
	bw.writeCloser = w
	bw.Writer = buffered
	bw.FlushFunc = buffered.Flush
	return bw
}

// Close closes the underlying write closer, except when it is os.Stdout, which is left open.
func (bw BidetWriter) Close() error {
	if bw.writeCloser == os.Stdout {
		return nil
	}
	return bw.writeCloser.Close()
}

// Write writes b to the wrapped Writer and then calls FlushFunc.
// The number of bytes reported is the one returned by the wrapped Writer. If the write failed its error
// is returned, otherwise the error (if any) from the flush is returned.
func (bw BidetWriter) Write(b []byte) (n int, err error) {
	written, writeErr := bw.Writer.Write(b)
	// The flush is attempted even when the write failed.
	flushErr := bw.FlushFunc()
	if writeErr != nil {
		return written, writeErr
	}
	return written, flushErr
}
package config
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"syscall"
"time"
"gopkg.in/yaml.v3"
)
// ApplyConfigurationFromFile is the entry point that reads the configuration at filepath, initialises it
// (resolving the connections, readers and writers) and then forwards data between them.
// It blocks until either SIGTERM/SIGINT is received or applyConfig cancels the context (inactivity timeout).
// Failures to read or initialise the configuration are printed and the function returns.
func ApplyConfigurationFromFile(filepath string) {
	config, err := readConfig(filepath)
	if err != nil {
		fmt.Printf("Failed to apply configuration from filepath [%s]. Err: [%s].\n", filepath, err.Error())
		return
	}

	err = config.Initialise()
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	defer config.Close()

	signalCtx, signalStopFunc := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer signalStopFunc()

	ctx, cancelFunc := context.WithCancel(signalCtx)
	// Release the derived context on every exit path; when an OS signal ends the wait, applyConfig
	// returns without calling cancelFunc itself. Calling it more than once is harmless.
	defer cancelFunc()

	// TODO: If we have stdin configured, we need to start another go routine that is grabbing content from stdin
	go applyConfig(ctx, cancelFunc, config.Conns, config.Settings)

	<-ctx.Done()
}
// readConfig loads the file at filePath and decodes its YAML content into a Config.
// A read failure returns an empty Config; a decode failure returns whatever was decoded together with the error.
func readConfig(filePath string) (Config, error) {
	var parsed Config

	raw, readErr := os.ReadFile(filePath)
	if readErr != nil {
		return Config{}, readErr
	}

	decodeErr := yaml.Unmarshal(raw, &parsed)
	return parsed, decodeErr
}
// applyConfig repeatedly copies from every connection's reader to its writer(s) until ctx is done or,
// when settings.Timeout is greater than zero, until that many seconds have passed without any bytes
// being written. In the timeout case cancelFunc is called before returning.
func applyConfig(ctx context.Context, cancelFunc context.CancelFunc, connections []Connection, settings ConfigSettings) {
	// TODO: This needs to be smarter and understand the "flow" of information and call the correct reader and writers in the correct order
	lastActivity := time.Now()

	for {
		for i := range connections {
			conn := connections[i]

			copied, copyErr := io.Copy(conn.Writer, conn.Reader)
			if copyErr != nil {
				// TODO: Add a debug flag to enable this
				fmt.Printf("Error occurred when copying content from reader [%s] to writer(s) [%s]. Error: [%s]\n", conn.ReaderId, conn.WriterIds, copyErr.Error())
			}

			if copied > 0 {
				fmt.Printf("Wrote [%d] bytes from reader [%s] to writer(s) [%s].\n", copied, conn.ReaderId, conn.WriterIds)
				// Any successful transfer restarts the inactivity window.
				lastActivity = time.Now()
			}
		}

		// Non-blocking check: stop as soon as the context has been cancelled (e.g. by an OS signal).
		select {
		case <-ctx.Done():
			return
		default:
		}

		idle := time.Since(lastActivity)
		timeoutEnabled := settings.Timeout > 0
		if timeoutEnabled && idle.Seconds() >= float64(settings.Timeout) {
			cancelFunc()
			return
		}
	}
}
package config
import (
"fmt"
"io"
"os"
"github.com/Kilemonn/flow/stdio"
"gopkg.in/yaml.v3"
)
// Reserved IDs under which the standard input reader and the standard output writer are registered.
// Configured nodes are not allowed to use these IDs (see isInvalidID).
const (
// StdIn is the reserved reader ID for standard input.
StdIn string = "stdin"
// StdOut is the reserved writer ID for standard output.
StdOut string = "stdout"
)
// Config is the root configuration object. Connections, Nodes and Settings hold the declared
// configuration; the remaining fields are populated while Initialise() runs.
type Config struct {
// Connections lists the reader ID / writer ID pairs that should be linked together.
Connections []ConfigConnection
// Nodes holds the definitions of the configured ports, files, sockets and IPC channels.
Nodes ConfigNodes
// Settings is passed to applyConfig when the configuration is applied.
Settings ConfigSettings
// models maps each node ID to its definition; built by componentIDsAreUnique().
models map[string]ConfigModel `json:"-"`
// readers maps reader IDs (including StdIn) to their initialised reader; built by combineToReadersAndWriters().
readers map[string]io.ReadCloser `json:"-"`
// writers maps writer IDs (including StdOut) to their initialised writer; built by combineToReadersAndWriters().
writers map[string]io.WriteCloser `json:"-"`
// Conns holds the resolved connections; built by createConnections().
// NOTE(review): the configuration is (de)serialised with yaml.v3 while this is a json tag - confirm the
// field is really excluded from the YAML output.
Conns []Connection `json:"-"`
}
// ConfigNodes groups the definitions of every configurable node by kind.
// Each entry is registered by its ID in componentIDsAreUnique().
type ConfigNodes struct {
// Ports are the serial port nodes.
Ports []ConfigPort
// Files are the file nodes.
Files []ConfigFile
// Sockets are the socket nodes.
Sockets []ConfigSocket
// Ipcs are the IPC channel nodes.
Ipcs []ConfigIPC
}
// Connection is a resolved link from a single reader to its writer(s), as built by createConnections().
type Connection struct {
// Reader is the source that content is copied from.
Reader io.Reader
// ReaderId is the configured ID of Reader (used in log output).
ReaderId string
// Writer is the destination; when several writers are attached to the reader it is an io.MultiWriter.
Writer io.Writer
// WriterIds are the configured IDs of the writer(s) behind Writer (used in log output).
WriterIds []string
}
// ConfigConnection is a single configured link, referencing a reader and a writer by their IDs.
type ConfigConnection struct {
// ReaderID is the ID of the node (or StdIn) that is read from.
ReaderID string
// WriterID is the ID of the node (or StdOut) that is written to.
WriterID string
}
// ConfigModel is the interface that all Config* node objects implement, so that [Config] can validate them
// and resolve them into readers and writers by ID.
type ConfigModel interface {
// GetID returns the ID of the model.
GetID() string
// Validate will perform any validation on behalf of the object, or any pre-setup that is required.
Validate() error
// Reader will get a reader for the underlying configuration.
Reader() (io.ReadCloser, error)
// Writer will get a writer for the underlying configuration.
Writer() (io.WriteCloser, error)
}
// Initialise validates the configuration, initialises every reader and writer that is referenced by a
// connection and finally builds the resolved connections (c.Conns).
// The first error encountered is returned and the remaining steps are skipped.
func (c *Config) Initialise() error {
	if err := c.validate(); err != nil {
		return err
	}

	if err := c.combineToReadersAndWriters(); err != nil {
		return err
	}

	c.createConnections()
	return nil
}
// writeConfig marshals the Config to YAML and writes it to filePath, creating the file if it does not
// exist and truncating it if it does.
// Returns the marshalling error, or any error from creating, writing or closing the file.
func (c Config) writeConfig(filePath string) error {
	yamlData, err := yaml.Marshal(&c)
	if err != nil {
		return err
	}

	// os.WriteFile creates/truncates the file, writes the data and also reports a failing Close.
	// 0o666 (before umask) is the same permission os.Create uses.
	return os.WriteFile(filePath, yamlData, 0o666)
}
// validate checks that all node IDs are unique (registering the nodes in c.models as a side effect) and
// then runs Validate() on every registered node. The first error encountered is returned.
func (c *Config) validate() error {
	// TODO: Add checks to make sure there is no loops, OR any nodes that are not connected (have no connection)?
	if err := c.componentIDsAreUnique(); err != nil {
		return err
	}

	for _, m := range c.models {
		if err := m.Validate(); err != nil {
			return err
		}
	}
	return nil
}
// isInvalidID reports whether id is one of the reserved IDs (StdIn or StdOut).
func isInvalidID(id string) bool {
	switch id {
	case StdIn, StdOut:
		return true
	default:
		return false
	}
}
// componentIDsAreUnique (re)builds c.models from the configured ports, files, sockets and IPC channels.
// An error is returned as soon as a node uses an ID that is already registered or that clashes with
// the reserved StdIn / StdOut IDs.
func (c *Config) componentIDsAreUnique() error {
	c.models = make(map[string]ConfigModel)

	// taken reports whether the ID is reserved or has already been registered.
	taken := func(id string) bool {
		if isInvalidID(id) {
			return true
		}
		_, registered := c.models[id]
		return registered
	}

	for _, port := range c.Nodes.Ports {
		id := port.GetID()
		if taken(id) {
			return fmt.Errorf("found port with a duplicate ID [%s] defined or is overriding \"%s\" or \"%s\"", id, StdIn, StdOut)
		}
		// Ports are registered by pointer (to the loop's copy) since their Reader/Writer methods use pointer receivers.
		c.models[id] = &port
	}

	for _, file := range c.Nodes.Files {
		id := file.GetID()
		if taken(id) {
			return fmt.Errorf("found file with a duplicate ID [%s] defined or is overriding \"%s\" or \"%s\"", id, StdIn, StdOut)
		}
		c.models[id] = file
	}

	for _, socket := range c.Nodes.Sockets {
		id := socket.GetID()
		if taken(id) {
			return fmt.Errorf("found socket with a duplicate ID [%s] defined or is overriding \"%s\" or \"%s\"", id, StdIn, StdOut)
		}
		c.models[id] = socket
	}

	for _, ipc := range c.Nodes.Ipcs {
		id := ipc.GetID()
		if taken(id) {
			return fmt.Errorf("found ipc with a duplicate ID [%s] defined or is overriding \"%s\" or \"%s\"", id, StdIn, StdOut)
		}
		c.models[id] = ipc
	}

	return nil
}
// combineToReadersAndWriters initialises every reader and writer that is referenced by a connection and
// stores them in c.readers / c.writers keyed by ID. StdIn and StdOut are always registered.
//
// All readers are initialised before any writer: if we connect to ourself the reader must be listening
// before the writer connects to it (for TCP). This is the same for IPC channels.
//
// An error is returned when stdin/stdout cannot be created, when a reader or writer fails to initialise,
// or when a connection references an ID that is neither a configured node nor a usable stdin/stdout
// (such a connection would otherwise end up with a nil reader or writer).
func (c *Config) combineToReadersAndWriters() error {
	stdIn, err := stdio.CreateStdInReader()
	if err != nil {
		return err
	}
	stdOut, err := stdio.CreateStdOutWriter()
	if err != nil {
		return err
	}

	c.readers = map[string]io.ReadCloser{StdIn: stdIn}
	c.writers = map[string]io.WriteCloser{StdOut: stdOut}

	// Firstly iterate over and ONLY initialise the READER (listening) side, see the function comment.
	for _, connection := range c.Connections {
		rID := connection.ReaderID
		if _, exists := c.readers[rID]; exists {
			continue
		}
		model, ok := c.models[rID]
		if !ok {
			return fmt.Errorf("connection references unknown reader ID [%s]", rID)
		}
		r, err := model.Reader()
		if err != nil {
			return err
		}
		c.readers[rID] = r
	}

	// The writers (socket and IPC clients included) are initialised in a second loop.
	for _, connection := range c.Connections {
		wID := connection.WriterID
		if _, exists := c.writers[wID]; exists {
			continue
		}
		model, ok := c.models[wID]
		if !ok {
			return fmt.Errorf("connection references unknown writer ID [%s]", wID)
		}
		w, err := model.Writer()
		if err != nil {
			return err
		}
		c.writers[wID] = w
	}

	// -1 from reader and writer count since stdin and stdout are always registered
	fmt.Printf("Configured [%d] writers, [%d] readers and [%d] connections.\n", len(c.writers)-1, len(c.readers)-1, len(c.Connections))
	return nil
}
// createConnections rebuilds c.Conns with one Connection per distinct reader ID, in the order the reader
// IDs first appear in c.Connections. Every writer configured for that reader is resolved through
// getWritersForReaderId. Readers for which no writer is resolved are reported and skipped.
func (c *Config) createConnections() {
	handled := make(map[string]bool)
	c.Conns = make([]Connection, 0)

	for _, conf := range c.Connections {
		readerID := conf.ReaderID
		if handled[readerID] {
			// This reader has already been bundled with all of its writers.
			continue
		}
		handled[readerID] = true

		writer, writerIds := c.getWritersForReaderId(readerID)
		if writer == nil {
			fmt.Printf("Resolved no matching writers for reader with id [%s]", readerID)
			continue
		}

		c.Conns = append(c.Conns, Connection{
			Reader:    c.readers[readerID],
			ReaderId:  readerID,
			Writer:    writer,
			WriterIds: writerIds,
		})
	}
}
// getWritersForReaderId collects the writers of every configured connection whose reader is readerId.
// It returns nil when there is none, the writer itself when there is exactly one and an [io.MultiWriter]
// over all of them otherwise. The second return value holds the IDs of the collected writers.
func (c Config) getWritersForReaderId(readerId string) (io.Writer, []string) {
	matched := []io.Writer{}
	matchedIDs := []string{}

	for i := range c.Connections {
		link := c.Connections[i]
		if link.ReaderID != readerId {
			continue
		}
		matched = append(matched, c.writers[link.WriterID])
		matchedIDs = append(matchedIDs, link.WriterID)
	}

	switch len(matched) {
	case 0:
		return nil, matchedIDs
	case 1:
		return matched[0], matchedIDs
	default:
		return io.MultiWriter(matched...), matchedIDs
	}
}
// Close closes every registered writer and then every registered reader.
// All of them are closed even when one fails; only the first error that occurred is returned.
func (c Config) Close() error {
	var first error
	// remember keeps hold of the first non-nil error only.
	remember := func(e error) {
		if first == nil && e != nil {
			first = e
		}
	}

	for _, w := range c.writers {
		remember(w.Close())
	}
	for _, r := range c.readers {
		remember(r.Close())
	}
	return first
}
package config
import (
"errors"
"fmt"
"io"
"os"
"github.com/Kilemonn/flow/bidetwriter"
"github.com/Kilemonn/flow/file"
)
// ConfigFile describes a file node that can be used as a reader and/or as a writer.
type ConfigFile struct {
// ID is the identifier returned by GetID().
ID string
// Path is the location of the file; Validate() creates the file when nothing exists there yet.
Path string
// Determines whether this file is in truncate mode or append mode. By default this is false
// meaning it is in append mode.
Trunc bool
// file is the opened handle, set by initialiseFile().
file *file.SyncFileReadWriter
}
// GetID returns the configured ID of the file node. Implements [ConfigModel.GetID].
func (c ConfigFile) GetID() string {
return c.ID
}
// Validate makes sure a file exists at c.Path, creating an empty one when os.Stat reports that nothing
// exists there. Any other os.Stat outcome is left to be dealt with when the file is opened.
// Returns an error (wrapping the cause) when the file cannot be created or the new handle cannot be
// closed. Implements [ConfigModel.Validate].
func (c ConfigFile) Validate() error {
	// TODO: Should we fail on input files that don't exist?
	if _, err := os.Stat(c.Path); errors.Is(err, os.ErrNotExist) {
		// Named "created" so the imported "file" package is not shadowed.
		created, err := os.Create(c.Path)
		if err != nil {
			return fmt.Errorf("failed to create file with ID [%s] and path [%s] with error %w", c.GetID(), c.Path, err)
		}
		// Only the file's existence matters here, it is opened again once a reader or writer is requested.
		if err := created.Close(); err != nil {
			return fmt.Errorf("failed to close created file with ID [%s] and path [%s] with error %w", c.GetID(), c.Path, err)
		}
	}
	return nil
}
// Reader opens the file (if this value has not opened it yet) and returns it as a reader.
// On failure a nil reader is returned together with the error. Implements [ConfigModel.Reader].
//
// NOTE(review): the receiver is a value, so the handle stored by initialiseFile lives on this copy only -
// confirm that a file used as both a reader and a writer is meant to be opened once per call.
func (c ConfigFile) Reader() (io.ReadCloser, error) {
	if err := c.initialiseFile(); err != nil {
		// Never hand out a reader alongside an error.
		return nil, err
	}
	return c.file, nil
}
// Writer opens the file (if this value has not opened it yet) and returns it wrapped in a
// bidetwriter.BidetWriter. On failure a nil writer is returned together with the error.
// Implements [ConfigModel.Writer].
func (c ConfigFile) Writer() (io.WriteCloser, error) {
	if openErr := c.initialiseFile(); openErr != nil {
		return nil, openErr
	}

	flushing := bidetwriter.NewBidetWriter(c.file)
	return flushing, nil
}
// initialiseFile opens the file at c.Path for reading and writing (creating it if required, and truncating
// it when c.Trunc is set) and stores the handle in c.file. It does nothing when c.file is already set.
// On failure c.file is left untouched so that a later call attempts to open the file again.
func (c *ConfigFile) initialiseFile() error {
	if c.file != nil {
		return nil
	}

	mode := os.O_CREATE | os.O_RDWR
	if c.Trunc {
		mode |= os.O_TRUNC
	}

	opened, err := file.NewSynchronisedFileReadWriter(c.Path, mode)
	if err != nil {
		// Do not keep hold of the unusable zero value, it would make later calls report success.
		return err
	}
	c.file = &opened
	return nil
}
package config
import (
"io"
"github.com/Kilemonn/flow/ipc"
)
// ConfigIPC describes an IPC node that reads from, or writes to, a named IPC channel.
type ConfigIPC struct {
// ID is the identifier returned by GetID().
ID string
// Channel is the IPC channel name handed to the ipc package when the reader or writer is created.
Channel string
}
// GetID returns the configured ID of the IPC node. Implements [ConfigModel.GetID].
func (c ConfigIPC) GetID() string {
return c.ID
}
// Validate performs no checks for IPC nodes and always returns nil. Implements [ConfigModel.Validate].
func (c ConfigIPC) Validate() error {
return nil
}
// Reader creates a new IPC reader for c.Channel on every call. Implements [ConfigModel.Reader].
func (c ConfigIPC) Reader() (io.ReadCloser, error) {
return ipc.NewIPCReader(c.Channel)
}
// Writer creates a new IPC writer for c.Channel on every call. Implements [ConfigModel.Writer].
func (c ConfigIPC) Writer() (io.WriteCloser, error) {
return ipc.NewIPCWriter(c.Channel)
}
package config
import (
"fmt"
"io"
"slices"
"time"
"github.com/Kilemonn/flow/serial"
goSerial "go.bug.st/serial"
)
// ConfigPort describes a serial port node. The same opened port is used for reading and for writing.
type ConfigPort struct {
	// ID is the identifier returned by GetID().
	ID string
	// Channel is the name of the serial port to open.
	Channel string
	// Mode holds the serial settings used to open the port.
	Mode goSerial.Mode
	// Port is the opened port, set by Open(). It is nil until Open() succeeded.
	Port *serial.CustomPort `json:"-"`
	// ReadTimeout is the read timeout in milliseconds that is applied to the opened port.
	// No timeout is configured when it is zero or negative.
	ReadTimeout int
}

// GetID returns the configured ID of the port node. Implements [ConfigModel.GetID].
func (c ConfigPort) GetID() string {
	return c.ID
}

// Validate checks that c.Channel is among the serial ports reported by serial.GetSerialPorts().
// Implements [ConfigModel.Validate].
func (c ConfigPort) Validate() error {
	ports := serial.GetSerialPorts()
	if !slices.Contains(ports, c.Channel) {
		return fmt.Errorf("no port with name [%s] is available/connected", c.Channel)
	}
	return nil
}

// Open opens the serial connection described by c.Channel and c.Mode, applies c.ReadTimeout when it is
// greater than zero and stores the resulting port in c.Port.
// Returned errors wrap their cause. When the timeout cannot be applied the port is closed again.
func (c *ConfigPort) Open() error {
	port, err := serial.OpenSerialConnection(c.Channel, c.Mode)
	if err != nil {
		return fmt.Errorf("failed to open connection to port with comm [%s] and ID [%s] with error: [%w]", c.Channel, c.GetID(), err)
	}

	if c.ReadTimeout > 0 {
		err = port.SetReadTimeout(time.Millisecond * time.Duration(c.ReadTimeout))
		if err != nil {
			// The port is not handed out on this path, so release it here. The close error is
			// deliberately dropped in favour of the more relevant timeout error.
			_ = port.Close()
			return fmt.Errorf("failed to set timeout on serial port connection with comm [%s] and ID [%s] with error: [%w]", c.Channel, c.GetID(), err)
		}
	}

	customPort := serial.NewCustomPort(port)
	c.Port = &customPort
	return nil
}

// Reader returns the opened port as a reader, opening it first when required.
// Implements [ConfigModel.Reader].
func (c *ConfigPort) Reader() (io.ReadCloser, error) {
	if c.Port == nil {
		err := c.Open()
		if err != nil {
			return nil, err
		}
	}
	return *c.Port, nil
}

// Writer returns the opened port as a writer, opening it first when required.
// Implements [ConfigModel.Writer].
func (c *ConfigPort) Writer() (io.WriteCloser, error) {
	if c.Port == nil {
		err := c.Open()
		if err != nil {
			return nil, err
		}
	}
	return *c.Port, nil
}
package config
import (
"io"
"github.com/Kilemonn/flow/socket"
)
// ConfigSocket describes a socket node. Protocol, Address and Port are handed to the socket package
// when the reader or writer is created.
type ConfigSocket struct {
// ID is the identifier returned by GetID().
ID string
// Protocol selects the kind of socket to create.
Protocol string
// Port is the port number used together with Address.
Port uint16
// Address is the address used together with Port.
Address string
}
// GetID returns the configured ID of the socket node. Implements [ConfigModel.GetID].
func (c ConfigSocket) GetID() string {
return c.ID
}
// Validate performs no checks for socket nodes and always returns nil. Implements [ConfigModel.Validate].
func (c ConfigSocket) Validate() error {
return nil
}
// Reader creates a new socket reader from the configured protocol, address and port on every call.
// Implements [ConfigModel.Reader].
func (c ConfigSocket) Reader() (io.ReadCloser, error) {
return socket.CreateSocketReader(c.Protocol, c.Address, c.Port)
}
// Writer creates a new socket writer from the configured protocol, address and port on every call.
// Implements [ConfigModel.Writer].
func (c ConfigSocket) Writer() (io.WriteCloser, error) {
return socket.CreateSocketWriter(c.Protocol, c.Address, c.Port)
}
package file
import (
"io"
"io/fs"
"os"
"sync"
)
// SyncFileReadWriter is a file that can be read from and appended to through the same handle.
// Reads continue from where the previous read stopped while writes always go to the end of the file.
// Every operation holds the mutex, so the methods can be called from multiple goroutines.
type SyncFileReadWriter struct {
	file  *os.File
	mutex sync.Mutex
}

// defaultFilePerm is the permission (before umask) given to files created by
// NewSynchronisedFileReadWriter. It matches what os.Create uses.
const defaultFilePerm fs.FileMode = 0o666

// NewSynchronisedFileReadWriter create a new SyncFileReadWriter.
// Do not pass os.O_APPEND as a flag, it is not required.
// When the flags cause the file to be created it is created with defaultFilePerm.
// On failure the zero value is returned together with the error and no file handle is left open.
func NewSynchronisedFileReadWriter(filepath string, flags int) (SyncFileReadWriter, error) {
	file, err := os.OpenFile(filepath, flags, defaultFilePerm)
	if err != nil {
		return SyncFileReadWriter{}, err
	}

	// Move to the start of the file to make sure the reading pointer is ready
	if _, err = file.Seek(0, io.SeekStart); err != nil {
		// The handle is not handed out on this path, so release it here.
		_ = file.Close()
		return SyncFileReadWriter{}, err
	}

	return SyncFileReadWriter{
		file: file,
	}, nil
}

// Read reads from the current read position of the file. Implements [io.Reader].
func (rw *SyncFileReadWriter) Read(b []byte) (int, error) {
	rw.mutex.Lock()
	defer rw.mutex.Unlock()

	return rw.file.Read(b)
}

// Write appends b to the file. Implements [io.Writer].
// Moves the file pointer to the end, performs the write, then returns it back to its prior position ready to Read.
// When the write succeeds but the prior position cannot be restored, that error is returned.
func (rw *SyncFileReadWriter) Write(b []byte) (n int, err error) {
	rw.mutex.Lock()
	defer rw.mutex.Unlock()

	// Get current position
	currentPos, err := rw.file.Seek(0, io.SeekCurrent)
	if err != nil {
		return 0, err
	}

	// Move to the end to perform the write
	if _, err = rw.file.Seek(0, io.SeekEnd); err != nil {
		return 0, err
	}

	// Return back to previous position after write, without hiding an error from the write itself
	defer func() {
		if _, seekErr := rw.file.Seek(currentPos, io.SeekStart); seekErr != nil && err == nil {
			err = seekErr
		}
	}()

	return rw.file.Write(b)
}

// Close closes the underlying file. Implements [io.Closer].
func (rw *SyncFileReadWriter) Close() error {
	rw.mutex.Lock()
	defer rw.mutex.Unlock()

	return rw.file.Close()
}
package main
import (
"flag"
"fmt"
"os"
"github.com/Kilemonn/flow/config"
"github.com/Kilemonn/flow/serial"
)
// Application version and the menu options that main() accepts as the first positional argument.
const (
// APPLICATION_VERSION is the version printed by printHelp().
APPLICATION_VERSION = "1.0.0"
// MENU_OPTION_HELP prints the help text.
MENU_OPTION_HELP = "help"
// MENU_OPTION_SERIAL starts a serial connection using the serial flags.
MENU_OPTION_SERIAL = "serial"
// MENU_OPTION_SERIAL_LS lists the connected serial devices.
MENU_OPTION_SERIAL_LS = "serialls"
// MENU_OPTION_CONFIG_APPLY applies the configuration file given with -f.
MENU_OPTION_CONFIG_APPLY = "config-apply"
)
// main parses the command line and dispatches to the menu option given as the first positional argument.
// Flags are accepted both before and after the menu option, e.g. "-f <file> config-apply" as well as
// "config-apply -f <file>". The help text is printed when no or an unknown menu option is provided.
func main() {
	var (
		comFlag        string
		baudFlag       uint
		parityFlag     string
		dataSizeFlag   uint
		twoStopBitFlag bool
		configFilePath string
	)

	flag.StringVar(&comFlag, "com", "", "com serial name")
	flag.UintVar(&baudFlag, "baud", 0, "baud rate")
	flag.StringVar(&parityFlag, "parity", "", "parity bit")
	flag.UintVar(&dataSizeFlag, "data-size", 0, "data size")
	flag.BoolVar(&twoStopBitFlag, "two-stop-bits", false, "two stop bit")
	flag.StringVar(&configFilePath, "f", "", "configuration file path")

	flag.Parse()

	args := flag.Args()
	if len(os.Args) <= 1 || len(args) == 0 {
		printHelp()
		return
	}

	// The flag package stops parsing at the first non-flag argument, which is the menu option. Parse
	// whatever follows it as well so the documented "<option> -flag value" form is honoured. Values
	// set by flags placed before the menu option are kept.
	if err := flag.CommandLine.Parse(args[1:]); err != nil {
		printHelp()
		return
	}

	switch args[0] {
	case MENU_OPTION_HELP:
		printHelp()
	case MENU_OPTION_SERIAL:
		serial.StartSerial(comFlag, baudFlag, parityFlag, dataSizeFlag, twoStopBitFlag)
	case MENU_OPTION_SERIAL_LS:
		serial.SerialList()
	case MENU_OPTION_CONFIG_APPLY:
		config.ApplyConfigurationFromFile(configFilePath)
	default:
		printHelp()
	}
}
// printHelp prints the application version followed by one usage line per supported menu option.
func printHelp() {
fmt.Printf("flow - cli v%s.\n", APPLICATION_VERSION)
fmt.Printf("%s -f <file configuration path> - Create and apply the connection forwarding between reader and writers defined in the config file.\n", MENU_OPTION_CONFIG_APPLY)
fmt.Printf("%s -com <COM0 or /dev/tty/USB0> -baud <baud rate> -parity <Even / Odd> -data-size <default is 8> -two-stop-bits <true is 2, false is 1 (default)> - Create a serial connection with device.\n", MENU_OPTION_SERIAL)
fmt.Printf("%s - List connected serial devices.\n", MENU_OPTION_SERIAL_LS)
}
package ipc
import (
"io"
"time"
"github.com/Kilemonn/flow/queuedreader"
ipcClient "github.com/Kilemonn/go-ipc/client"
ipcServer "github.com/Kilemonn/go-ipc/server"
)
const (
// IPCReadDeadline is the duration passed to the IPC server's Accept and set as the ReadTimeout of every
// accepted client.
IPCReadDeadline = 10 * time.Millisecond
)
// IPCReader is an [io.ReadCloser] that accepts client connections on an IPC server and reads from them.
type IPCReader struct {
// server is the IPC server that client connections are accepted from.
server ipcServer.IPCServer
// clients are the connections accepted so far, in the order they were accepted.
clients []ipcClient.IPCClient
}
// Close closes every accepted client and then the server.
// All of them are closed even when one fails; only the first error that occurred is returned.
func (r IPCReader) Close() (err error) {
for _, c := range r.clients {
e := c.Close()
if e != nil && err == nil {
err = e
}
}
e := r.server.Close()
if e != nil && err == nil {
err = e
}
return err
}
// Get the amount of active connections
func (r IPCReader) connectionCount() int {
return len(r.clients)
}
// Check if any incoming connections are pending to be accepted.
// This is naturally blocking, so there is a deadline set for [IPCReadDeadline]
// before this function returns with no accepted connections.
// Accepting stops on the first error returned by Accept, whatever that error is.
func (r *IPCReader) acceptWaitingConnections() {
for {
client, err := r.server.Accept(IPCReadDeadline)
if err != nil {
return
}
// Bound the reads on the accepted client (presumably so that an idle client cannot block Read - confirm
// against the go-ipc client documentation).
client.ReadTimeout = IPCReadDeadline
r.clients = append(r.clients, client)
}
}
// Read first accepts any waiting connections and then reads from the accepted clients through a
// queuedreader.QueuedReader, returning whatever that reader returns.
func (r *IPCReader) Read(b []byte) (n int, err error) {
r.acceptWaitingConnections()
q := queuedreader.NewQueuedReader(r.clients)
return q.Read(b)
}
// NewIPCReader creates an IPC server for the provided channel name (with the Override option enabled) and
// returns an IPCReader on top of it. A nil reader is returned together with the error when the server
// cannot be created.
func NewIPCReader(ipcChannelName string) (io.ReadCloser, error) {
server, err := ipcServer.NewIPCServer(ipcChannelName, &ipcServer.IPCServerConfig{Override: true})
if err != nil {
return nil, err
}
return &IPCReader{server: *server}, nil
}
package ipc
import (
"io"
ipcClient "github.com/Kilemonn/go-ipc/client"
)
// NewIPCWriter creates an IPC client for the provided channel name and returns it as the writer.
// NOTE(review): the client's results are returned as-is; if NewIPCClient returns a nil pointer on failure the
// returned interface would be non-nil, so callers should rely on the error only - confirm.
func NewIPCWriter(ipcChannelName string) (io.WriteCloser, error) {
return ipcClient.NewIPCClient(ipcChannelName)
}
package queuedreader
import (
"errors"
"io"
"net"
)
// ErrorHandlerFunc is an error handler function that receives the index of the reader the error occurred
// on and the reader object itself.
type ErrorHandlerFunc[R io.Reader] func(int, R)

// QueuedReader is a reader that sequentially attempts to read from the list of [io.Reader], similar to
// the [io.MultiReader].
// A Pre-read handler function, on EOF handler and on timeout handler function can be provided which will be called
// accordingly.
// Note, that when an EOF or timeout occurs, the reader will attempt to read from the next [io.Reader] until a
// result is returned OR another error occurs (not EOF or timeout).
type QueuedReader[R io.Reader] struct {
	readers       []R
	preReadFunc   func(R)
	onEOFFunc     ErrorHandlerFunc[R]
	onTimeoutFunc ErrorHandlerFunc[R]
}

// NewQueuedReader creates a new [QueuedReader] from the provided slice of [io.Reader]s.
// No handler functions are set.
func NewQueuedReader[R io.Reader](readers []R) QueuedReader[R] {
	return QueuedReader[R]{
		readers:       readers,
		preReadFunc:   nil,
		onEOFFunc:     nil,
		onTimeoutFunc: nil,
	}
}

// SetEOFHandlerFunc sets the function called when EOF occurs (note that this will not stop the read in loop)
func (q *QueuedReader[R]) SetEOFHandlerFunc(handler ErrorHandlerFunc[R]) {
	q.onEOFFunc = handler
}

// SetTimeoutHandlerFunc sets the function called when a timeout occurs (note that this will not stop the
// read in loop)
func (q *QueuedReader[R]) SetTimeoutHandlerFunc(handler ErrorHandlerFunc[R]) {
	q.onTimeoutFunc = handler
}

// SetPreReadHandlerFunc sets the function called before a read attempt (for network connections you can set
// deadline configuration or other things here)
func (q *QueuedReader[R]) SetPreReadHandlerFunc(handler func(R)) {
	q.preReadFunc = handler
}

// Read will iterate over the stored [io.Reader]s and return the result of the first that either reads
// without an error, returns data (even when that data is accompanied by EOF or a timeout), or fails with
// an error that is neither EOF nor a timeout. [io.EOF] is returned when every [io.Reader] times out or
// returns [io.EOF] without providing any data.
// The timeout and EOF handlers are called for the reader that reported the condition. Timeouts are
// detected on wrapped errors as well.
func (q QueuedReader[R]) Read(b []byte) (int, error) {
	for i, r := range q.readers {
		if q.preReadFunc != nil {
			q.preReadFunc(r)
		}

		n, err := r.Read(b)
		if err == nil {
			// If there is no error, return the read number of bytes to the caller
			return n, nil
		}

		var netErr net.Error
		switch {
		case errors.As(err, &netErr) && netErr.Timeout():
			if q.onTimeoutFunc != nil {
				q.onTimeoutFunc(i, r)
			}
		case errors.Is(err, io.EOF):
			// EOF occurs when the remote closes the connection OR when there is no data to be read (depending on the reader)
			if q.onEOFFunc != nil {
				q.onEOFFunc(i, r)
			}
		default:
			// On other errors, make sure we return immediately to the caller
			return n, err
		}

		// An io.Reader may return data together with an error. Hand that data to the caller instead of
		// letting the next reader overwrite it; the condition itself was already passed to the handler.
		if n > 0 {
			return n, nil
		}
	}
	return 0, io.EOF
}
package serial
import (
"io"
goSerial "go.bug.st/serial"
)
// CustomPort wraps a [goSerial.Port] so that a read which returns neither data nor an error is reported
// as [io.EOF]. Without this, using the port as the reader of an [io.Copy] hangs since the wrapped Read()
// does not return EOF on timeout.
// https://github.com/bugst/go-serial/issues/141
type CustomPort struct {
	Port goSerial.Port
}

// NewCustomPort wraps the provided port in a CustomPort.
func NewCustomPort(port goSerial.Port) CustomPort {
	var wrapped CustomPort
	wrapped.Port = port
	return wrapped
}

// Read reads from the wrapped port; a read of zero bytes without an error is turned into [io.EOF].
// Implements [io.Reader.Read].
func (p CustomPort) Read(b []byte) (n int, err error) {
	n, err = p.Port.Read(b)
	// TODO: The library should fix this, https://github.com/bugst/go-serial/issues/141
	if err == nil && n == 0 {
		err = io.EOF
	}
	return n, err
}

// Write writes b to the wrapped port. Implements [io.Writer.Write].
func (p CustomPort) Write(b []byte) (int, error) {
	written, err := p.Port.Write(b)
	return written, err
}

// Close closes the wrapped port. Implements [io.Closer.Close].
func (p CustomPort) Close() error {
	err := p.Port.Close()
	return err
}
package serial
import (
"fmt"
"io"
"slices"
goSerial "go.bug.st/serial"
)
// CreatePort opens the serial port with the provided name and mode. It is an alias of CreateReadWriter.
func CreatePort(channel string, mode goSerial.Mode) (io.ReadWriteCloser, error) {
return CreateReadWriter(channel, mode)
}
// CreateReadWriter opens a serial connection to portName using the provided mode.
// An error is returned when portName is not among the ports reported by GetSerialPorts, or when the
// connection cannot be opened.
func CreateReadWriter(portName string, mode goSerial.Mode) (io.ReadWriteCloser, error) {
	if available := GetSerialPorts(); !slices.Contains(available, portName) {
		return nil, fmt.Errorf("failed to find port with name [%s]", portName)
	}

	opened, openErr := OpenSerialConnection(portName, mode)
	if openErr != nil {
		return nil, openErr
	}
	return opened, nil
}
package serial
import (
"errors"
"fmt"
"io"
"strings"
"time"
"github.com/Kilemonn/flow/stdio"
goSerial "go.bug.st/serial"
)
const (
// POLLING_DELAY_MS is a polling delay, presumably in milliseconds going by its name.
// NOTE(review): it is declared as a string and is not referenced by the code visible here - confirm it
// is still needed.
POLLING_DELAY_MS = "100"
)
// parseParity converts "even" / "odd" (case-insensitive) into the matching parity setting.
// Any other value results in no parity.
func parseParity(parity string) goSerial.Parity {
	switch strings.ToLower(parity) {
	case "even":
		return goSerial.EvenParity
	case "odd":
		return goSerial.OddParity
	default:
		return goSerial.NoParity
	}
}
// parseStopBits returns two stop bits when stopBits is true and one stop bit otherwise.
func parseStopBits(stopBits bool) goSerial.StopBits {
	if stopBits {
		return goSerial.TwoStopBits
	}
	return goSerial.OneStopBit
}
// parseSerialSettings validates the provided serial settings and converts them into a goSerial.Mode.
// The checks run in this order: the data length must be greater than 0, the parity must be empty,
// "even" or "odd" (case-insensitive) and the baud rate must be greater than 0. The first failing check
// determines the returned error.
func parseSerialSettings(baud uint, parity string, dataLen uint, stopBits bool) (goSerial.Mode, error) {
	lowered := strings.ToLower(parity)
	knownParity := lowered == "even" || lowered == "odd"

	switch {
	case dataLen == 0:
		return goSerial.Mode{}, errors.New("data length must be set and greater than 0")
	case !knownParity && len(parity) != 0:
		return goSerial.Mode{}, errors.New("parity must be 'even' or 'odd'")
	case baud == 0:
		return goSerial.Mode{}, errors.New("baud rate must be set and greater than 0")
	}

	var mode goSerial.Mode
	mode.BaudRate = int(baud)
	mode.Parity = parseParity(parity)
	mode.DataBits = int(dataLen)
	mode.StopBits = parseStopBits(stopBits)
	return mode, nil
}
// OpenSerialConnection opens the serial port named com with the provided mode and returns the result of
// goSerial.Open unchanged.
func OpenSerialConnection(com string, mode goSerial.Mode) (goSerial.Port, error) {
return goSerial.Open(com, &mode)
}
// GetSerialPorts returns the list of serial ports reported by goSerial.GetPortsList.
// When the list cannot be retrieved, or no port is connected, a message is printed and an empty
// (non-nil) slice is returned.
func GetSerialPorts() []string {
	found, listErr := goSerial.GetPortsList()

	switch {
	case listErr != nil:
		fmt.Printf("Failed to retrieve ports list. Error: [%s]", listErr.Error())
		return []string{}
	case len(found) == 0:
		fmt.Printf("No serial ports connected.")
		return []string{}
	default:
		return found
	}
}
// printSerialPorts prints every port returned by GetSerialPorts on its own line.
func printSerialPorts() {
	names := GetSerialPorts()
	for i := range names {
		fmt.Printf("%v\n", names[i])
	}
}
// rxPrintThread continuously reads from the port and forwards exactly the bytes that were read to the
// writer. It only returns once a read or a write fails, returning that error.
// The newLine parameter is kept for compatibility with existing callers; it currently has no effect on
// the output.
func rxPrintThread(port goSerial.Port, writer io.Writer, newLine bool) error {
	buffer := make([]byte, 100)
	for {
		n, err := port.Read(buffer)
		if n > 0 {
			// Only forward what was received in this read, not the whole (stale or zeroed) buffer.
			if _, writeErr := writer.Write(buffer[:n]); writeErr != nil {
				return writeErr
			}
		}
		if err != nil {
			return err
		}
	}
}
// SerialList prints the connected serial ports, one per line.
func SerialList() {
printSerialPorts()
}
// StartSerial opens the serial port com with the provided settings, sends everything read from stdin to
// the port and prints whatever the port sends back to stdout for a short while.
// Failures are printed and end the function. The port is closed before the function returns.
func StartSerial(com string, baud uint, parity string, dataLen uint, stopBits bool) {
	mode, err := parseSerialSettings(baud, parity, dataLen, stopBits)
	if err != nil {
		fmt.Printf("Failed to parse serial settings. Error: [%s]", err.Error())
		return
	}

	port, err := OpenSerialConnection(com, mode)
	if err != nil {
		fmt.Printf("Failed to open serial connection. Error: [%s]", err.Error())
		return
	}
	// Release the port on every exit path. Closing it also makes the pending read in rxPrintThread
	// fail, which ends that goroutine.
	defer port.Close()

	reader, _ := stdio.CreateStdInReader()
	bytes, err := io.ReadAll(reader)
	if err != nil {
		fmt.Printf("Failed to read in all data from provided input stream. Error: [%s]", err.Error())
		return
	}

	// this needs to be fixed just testing things out
	writer, _ := stdio.CreateStdOutWriter()
	go rxPrintThread(port, writer, false)
	time.Sleep(1 * time.Second)

	// "ACDEF\n\r"
	n, err := port.Write(bytes)
	if err != nil {
		fmt.Printf("Failed to write bytes to port [%s]. Error [%s].\n", com, err.Error())
		return
	}
	fmt.Printf("Sent %d bytes\n", n)

	// Leave the receiving goroutine some time to print the response.
	time.Sleep(2 * time.Second)
}
package socket
import (
"fmt"
"io"
"net"
"net/netip"
"strings"
"time"
)
const (
// SocketReadDeadline is the deadline applied to socket accept and read attempts so that they do not
// block when there is nothing to accept or read.
SocketReadDeadline = 10 * time.Millisecond
)
// CreateSocketReader creates a listening socket reader for the provided protocol ("tcp" or "udp",
// case-insensitive) on the provided address and port. Any other protocol results in an error.
func CreateSocketReader(protocol string, addr string, port uint16) (io.ReadCloser, error) {
	switch strings.ToLower(protocol) {
	case "tcp":
		return NewTCPSocketReader(addr, port)
	case "udp":
		return NewUDPSocketReader(addr, port)
	default:
		return nil, fmt.Errorf("invalid protocol provided [%s]", protocol)
	}
}
// NewUDPSocketReader starts listening for UDP packets on the provided IP address and port and returns a
// reader on top of that socket.
// A nil reader is returned together with the error when the address cannot be parsed or listening fails.
func NewUDPSocketReader(addr string, port uint16) (io.ReadCloser, error) {
	address, err := netip.ParseAddr(addr)
	if err != nil {
		return nil, err
	}

	udpAddr := net.UDPAddrFromAddrPort(netip.AddrPortFrom(address, port))
	conn, err := net.ListenUDP("udp", udpAddr)
	if err != nil {
		// Do not hand out a reader that wraps a nil connection.
		return nil, err
	}
	return UDPTimeoutReader{Conn: conn}, nil
}
// NewTCPSocketReader starts listening for TCP connections on the provided IP address and port and returns
// a reader on top of that listener.
// A nil reader is returned together with the error when the address cannot be parsed or listening fails.
func NewTCPSocketReader(addr string, port uint16) (io.ReadCloser, error) {
	address, err := netip.ParseAddr(addr)
	if err != nil {
		return nil, err
	}

	tcpAddr := net.TCPAddrFromAddrPort(netip.AddrPortFrom(address, port))
	listener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		// Do not hand out a reader that wraps a nil listener.
		return nil, err
	}
	return &TCPTimeoutReader{Listener: listener}, nil
}
package socket
import (
"fmt"
"io"
"net"
"net/netip"
"strings"
)
// CreateSocketWriter creates a socket writer for the provided protocol ("tcp" or "udp", case-insensitive)
// that sends to the provided address and port. Any other protocol results in an error.
func CreateSocketWriter(protocol string, addr string, port uint16) (io.WriteCloser, error) {
	switch strings.ToLower(protocol) {
	case "tcp":
		return NewTCPSocketWriter(addr, port)
	case "udp":
		return NewUDPSocketWriter(addr, port)
	default:
		return nil, fmt.Errorf("invalid protocol provided [%s]", protocol)
	}
}
// NewUDPSocketWriter dials the provided IP address and port over UDP and returns the connection as the writer.
// A nil writer is returned together with the error when the address cannot be parsed or dialling fails.
func NewUDPSocketWriter(addr string, port uint16) (io.WriteCloser, error) {
	address, err := netip.ParseAddr(addr)
	if err != nil {
		return nil, err
	}

	udpAddr := net.UDPAddrFromAddrPort(netip.AddrPortFrom(address, port))
	conn, err := net.DialUDP("udp", nil, udpAddr)
	if err != nil {
		// Return an explicit nil: passing the nil *net.UDPConn on would produce a non-nil interface.
		return nil, err
	}
	return conn, nil
}
// NewTCPSocketWriter connects to the provided IP address and port over TCP and returns the connection as
// the writer.
// A nil writer is returned together with the error when the address cannot be parsed or connecting fails.
func NewTCPSocketWriter(addr string, port uint16) (io.WriteCloser, error) {
	address, err := netip.ParseAddr(addr)
	if err != nil {
		return nil, err
	}

	tcpAddr := net.TCPAddrFromAddrPort(netip.AddrPortFrom(address, port))
	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		// Return an explicit nil: passing the nil *net.TCPConn on would produce a non-nil interface.
		return nil, err
	}
	return conn, nil
}
package socket
import (
"net"
"slices"
"time"
"github.com/Kilemonn/flow/queuedreader"
)
// TCPTimeoutReader is a reader that accepts TCP connections from its listener and reads from them, using
// deadlines so that neither accepting nor reading blocks when there is nothing to accept or read.
type TCPTimeoutReader struct {
// Listener is the listener that incoming connections are accepted from.
Listener *net.TCPListener
// Conns are the connections accepted so far.
Conns []*net.TCPConn
// indicies collects the indexes (into Conns) of the connections that reported EOF during a Read and are
// to be removed by removeClosedConnections.
indicies []int
}
// Close closes every accepted connection and then the listener.
// All of them are closed even when one fails; only the first error that occurred is returned.
func (r TCPTimeoutReader) Close() error {
	var first error
	// remember keeps hold of the first non-nil error only.
	remember := func(e error) {
		if first == nil && e != nil {
			first = e
		}
	}

	for i := range r.Conns {
		remember(r.Conns[i].Close())
	}
	remember(r.Listener.Close())

	return first
}
// connectionCount returns the number of connections currently held in r.Conns.
func (r *TCPTimeoutReader) connectionCount() int {
return len(r.Conns)
}
// Check if any incoming connections are pending to be accepted.
// This is naturally blocking, so there is a deadline set for [SocketReadDeadline]
// before this function returns with no accepted connections.
// Accepting also stops on any other error (for example when the listener has been closed), so that a
// failed accept never adds a nil connection or keeps this loop spinning.
func (r *TCPTimeoutReader) acceptWaitingConnections() {
	for {
		if err := r.Listener.SetDeadline(time.Now().Add(SocketReadDeadline)); err != nil {
			// Without a deadline the accept below could block indefinitely.
			return
		}

		conn, err := r.Listener.AcceptTCP()
		if err != nil {
			// A timeout means nothing (more) is waiting; any other error means we cannot accept right now.
			return
		}
		r.Conns = append(r.Conns, conn)
	}
}
// Removes connections from the connections list that have been marked for removal.
// Each removed connection is closed as well, so that its file descriptor is released right away.
// The list of marked indexes is cleared afterwards.
func (r *TCPTimeoutReader) removeClosedConnections() {
	if len(r.indicies) == 0 {
		return
	}

	slices.Sort(r.indicies)
	// Sort and then reverse iterate so we don't change any of the indicies of further forward elements when we remove them
	for _, i := range slices.Backward(r.indicies) {
		// Best effort: the connection already reported EOF, a failing Close changes nothing for us.
		_ = r.Conns[i].Close()
		r.Conns = slices.Delete(r.Conns, i, i+1)
	}
	r.indicies = nil
}
// Read firstly calls [acceptWaitingConnections].
// Then wraps the read with a deadline to timeout the Read attempt if there is no incoming data.
// Timeout used is [SocketReadDeadline].
// Removes any connections that have been closed.
// The read itself is delegated to a queuedreader.QueuedReader over the accepted connections and its result
// is returned as-is.
func (r *TCPTimeoutReader) Read(b []byte) (n int, err error) {
// Firstly we need to accept any connections and add them to our connection list
r.acceptWaitingConnections()
// Runs once the read below has finished, removing the connections its EOF handler marked.
defer r.removeClosedConnections()
q := queuedreader.NewQueuedReader(r.Conns)
// Give every connection a fresh read deadline right before it is read from.
q.SetPreReadHandlerFunc(func(conn *net.TCPConn) {
conn.SetReadDeadline(time.Now().Add(SocketReadDeadline))
})
// EOF occurs when the remote closes the connection OR when there is no data to be read (depending on the reader)
// The index of the connection is recorded so it can be removed after the read.
q.SetEOFHandlerFunc(func(i int, conn *net.TCPConn) {
r.indicies = append(r.indicies, i)
})
return q.Read(b)
}
package socket
import (
"io"
"net"
"time"
)
// UDPTimeoutReader is a reader on top of a UDP connection whose reads time out instead of blocking when
// there is no incoming data.
type UDPTimeoutReader struct {
	Conn *net.UDPConn
}

// Close closes the underlying UDP connection.
func (r UDPTimeoutReader) Close() error {
	closeErr := r.Conn.Close()
	return closeErr
}

// Read wraps the read with a deadline to timeout the Read attempt if there is no incoming data.
// Timeout used is [SocketReadDeadline]. A timeout is reported as [io.EOF]; any other error is
// returned unchanged.
func (r UDPTimeoutReader) Read(b []byte) (n int, err error) {
	r.Conn.SetReadDeadline(time.Now().Add(SocketReadDeadline))

	received, readErr := r.Conn.Read(b)
	if readErr == nil {
		return received, nil
	}

	if netErr, isNetErr := readErr.(net.Error); isNetErr && netErr.Timeout() {
		// Return EOF here so the call from io.Copy doesn't permanently loop
		return received, io.EOF
	}
	return received, readErr
}
package stdio
import (
"io"
"os"
)
// CreateStdInReader returns os.Stdin as the reader. The returned error is always nil.
func CreateStdInReader() (io.ReadCloser, error) {
return os.Stdin, nil
}
package stdio
import (
"io"
"os"
"github.com/Kilemonn/flow/bidetwriter"
)
// CreateStdOutWriter returns os.Stdout wrapped in a bidetwriter.BidetWriter. The returned error is always nil.
func CreateStdOutWriter() (io.WriteCloser, error) {
return bidetwriter.NewBidetWriter(os.Stdout), nil
}
package testutil
import (
"net"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// WithTempFile helper function to create a temp file before calling a method that accepts the temp file name
// the temp file is removed after this function finishes.
// Only the name is handed to testFunc, so the handle obtained while creating the file is closed before
// testFunc runs. The test fails immediately when the file cannot be created or closed.
func WithTempFile(t *testing.T, testFunc func(string)) {
	temp, err := os.CreateTemp("", "*")
	require.NoError(t, err)
	defer os.Remove(temp.Name())

	// Do not keep our own handle open while the test works with the file.
	require.NoError(t, temp.Close())

	testFunc(temp.Name())
}
// CaptureStdout replaces os.Stdout with the write end of a pipe while testFunc runs and returns the read
// end of that pipe, from which everything written to stdout during testFunc can be read.
// Before returning, os.Stdout is restored and the write end of the pipe is closed.
func CaptureStdout(t *testing.T, testFunc func()) *os.File {
	readEnd, writeEnd, pipeErr := os.Pipe()
	require.NoError(t, pipeErr)
	defer writeEnd.Close()

	// Revert stdout after the end of this function
	previous := os.Stdout
	defer func() { os.Stdout = previous }()

	os.Stdout = writeEnd
	testFunc()

	return readEnd
}
// WithBytesInStdIn pre-load stdin with the provided bytes before running the provided test
// reverts std in after the test is complete.
// The bytes are written to a pipe whose read end replaces os.Stdin while testFunc runs. The write end is
// closed before testFunc is called, and the read end is closed once this function returns.
func WithBytesInStdIn(t *testing.T, bytes []byte, testFunc func()) {
	reader, writer, err := os.Pipe()
	require.NoError(t, err)
	// Release the read end of the pipe when we are done (after stdin has been reverted).
	defer reader.Close()

	n, err := writer.Write(bytes)
	require.NoError(t, err)
	require.Equal(t, len(bytes), n)
	writer.Close()

	// Revert stdin after the end of this function
	defer func(v *os.File) { os.Stdin = v }(os.Stdin)
	os.Stdin = reader

	testFunc()
}
// TakesAtleast runs testFunc and asserts that it took at least the provided duration to complete.
func TakesAtleast(t *testing.T, duration time.Duration, testFunc func()) {
	begin := time.Now()
	testFunc()
	elapsed := time.Since(begin)

	require.GreaterOrEqual(t, elapsed, duration)
}
// GetUDPPort returns the local port the provided UDP connection is bound to.
// 0 is returned when conn is nil or its local address is not a UDP address.
func GetUDPPort(conn *net.UDPConn) uint16 {
	if conn == nil {
		return 0
	}

	local, isUDP := conn.LocalAddr().(*net.UDPAddr)
	if !isUDP {
		return 0
	}
	return local.AddrPort().Port()
}
// GetTCPPort returns the port the provided TCP listener is listening on.
// 0 is returned when conn is nil or its address is not a TCP address.
func GetTCPPort(conn *net.TCPListener) uint16 {
	if conn == nil {
		return 0
	}

	listening, isTCP := conn.Addr().(*net.TCPAddr)
	if !isTCP {
		return 0
	}
	return listening.AddrPort().Port()
}