package handlers
import (
"github.com/rabbitmq/amqp091-go"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging/management/mappers"
)
type CopyHandler struct {
publisher messaging.Publisher
destQueue string
}
func NewCopyHandler(publisher messaging.Publisher, destQueue string) *CopyHandler {
return &CopyHandler{publisher: publisher, destQueue: destQueue}
}
func (h *CopyHandler) Handle(msg amqp091.Delivery) (bool, error) {
// copy/publish message to the destination queue
return true, h.publisher.Publish(h.destQueue, mappers.DeliveryPublishing(msg))
}
// Code generated by mockery v2.33.3. DO NOT EDIT.
package mocks
import (
amqp091 "github.com/rabbitmq/amqp091-go"
mock "github.com/stretchr/testify/mock"
)
// MessageHandler is an autogenerated mock type for the MessageHandler type
type MessageHandler struct {
mock.Mock
}
// Handle provides a mock function with given fields: msg
func (_m *MessageHandler) Handle(msg amqp091.Delivery) (bool, error) {
ret := _m.Called(msg)
var r0 bool
var r1 error
if rf, ok := ret.Get(0).(func(amqp091.Delivery) (bool, error)); ok {
return rf(msg)
}
if rf, ok := ret.Get(0).(func(amqp091.Delivery) bool); ok {
r0 = rf(msg)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(amqp091.Delivery) error); ok {
r1 = rf(msg)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewMessageHandler creates a new instance of MessageHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMessageHandler(t interface {
mock.TestingT
Cleanup(func())
}) *MessageHandler {
mock := &MessageHandler{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
package handlers
import (
"github.com/rabbitmq/amqp091-go"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging/management/mappers"
)
type MoveHandler struct {
publisher messaging.Publisher
destQueue string
}
func NewMoveHandler(publisher messaging.Publisher, destQueue string) *MoveHandler {
return &MoveHandler{publisher: publisher, destQueue: destQueue}
}
func (h *MoveHandler) Handle(msg amqp091.Delivery) (bool, error) {
// move/publish message to the destination queue
err := h.publisher.Publish(h.destQueue, mappers.DeliveryPublishing(msg))
if err != nil {
return true, err
}
return false, nil
}
package handlers
import (
"github.com/rabbitmq/amqp091-go"
)
type PurgeHandler struct{}
func NewPurgeHandler() *PurgeHandler {
return &PurgeHandler{}
}
func (h *PurgeHandler) Handle(_ amqp091.Delivery) (bool, error) {
return false, nil
}
package handlers
import (
"encoding/json"
"os"
"github.com/rabbitmq/amqp091-go"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging/selectors"
)
type ViewHandler struct {
count int
outputFile *os.File
}
func NewViewHandler(count int, outputFile *os.File) *ViewHandler {
return &ViewHandler{count: count, outputFile: outputFile}
}
func (h *ViewHandler) Handle(msg amqp091.Delivery) (bool, error) {
// if we viewed --count messages, return
if h.count <= 0 {
return true, nil
}
var encoder *json.Encoder
if h.outputFile != nil {
encoder = json.NewEncoder(h.outputFile)
} else {
encoder = json.NewEncoder(os.Stdout)
}
err := encoder.Encode(selectors.SubsetFromDelivery(msg)) // write message to file/stdout for viewing
if err != nil {
return true, err
}
h.count--
return true, nil
}
package managers
import (
"context"
"log/slog"
"time"
"github.com/rabbitmq/amqp091-go"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging/management/handlers"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging/management/mappers"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging/selectors"
)
const (
partialQueueManagementCtxCancelHelpMsg = `Source queue has potentially been partially managed.
Please check if some messages have been moved from the source queue to temporary queue.
Try to manage queue again and specify the --tempQueue parameter with the currently used temporary queue.
That will cause QueueManager to continue processing from the last processed message that caused error and move all tempQueue messages (also those that were moved to tempQueue during the failed command) to source queue when finished, preserving the order.`
partialQueueManagementHelpMsg = `Source queue has potentially been partially managed.
Please check if some messages have been moved from the source queue to temporary queue.
Please check if the last processed message (the one that caused the error, you can do that using "view --count=1") is requeued back to the front of the source queue.
If message is not at the front, please move message to the front of the source queue manually.
If some messages have been moved and last processed message is at the front, try to manage queue again and specify the --tempQueue parameter with the currently used temporary queue.
That will cause QueueManager to continue processing from the last processed message that caused error and move all tempQueue messages (also those that were moved to tempQueue during the failed command) to source queue when finished, preserving the order.
If publishing to the destination queue (move/copy commands) succeeded, but acknowledging the message failed, please manually remove the duplicated message from the source or destination queue.`
partialTempQueueMoveHelpMsg = `Please manually move remaining messages from the temporary queue to source queue (use "move" command).
Before doing that, please check if the last processed message (the one that caused the error, you can do that using "view --count=1") is requeued back to the front of the temporary queue.
If message is not at the front, please move message to the front of the temporary queue manually.
If publishing to the source queue succeeded, but acknowledging the message failed, please manually remove the duplicated message from the temporary or source queue.`
partialTempQueueMoveCtxCancelHelpMsg = `Please manually move remaining messages from the temporary queue to source queue (use "move" command).`
)
type QueueManager struct {
consumer messaging.Consumer
log *slog.Logger
handler handlers.MessageHandler
publisher messaging.Publisher
selector selectors.Selector
tempQueue string
}
func NewQueueManager(consumer messaging.Consumer, log *slog.Logger, handler handlers.MessageHandler, publisher messaging.Publisher, selector selectors.Selector, tempQueue string) *QueueManager {
return &QueueManager{consumer: consumer, log: log, handler: handler, publisher: publisher, selector: selector, tempQueue: tempQueue}
}
// region Public
func (m *QueueManager) Manage(ctx context.Context, srcQueue string) error {
messages, err := m.consumer.Consume(srcQueue)
if err != nil {
return err
}
m.log.Info("processing source queue")
startTime := time.Now()
var processedMessages, selectedMessages int
var lastProcessedMessage amqp091.Delivery
defer func() {
m.log.Info("processing source queue finished",
slog.Int("processedMessages", processedMessages),
slog.Int("selectedMessages", selectedMessages),
slog.Duration("duration", time.Since(startTime)),
)
}()
loop:
for {
select {
case msg := <-messages:
processedMessages++
selected, err := m.selector.IsSelected(msg)
if err != nil {
return m.handleMsgProcessingError("error occurred while checking if message is selected", err, msg, srcQueue)
}
requeue := true
if selected {
selectedMessages++
// process message with the provided handler
requeue, err = m.handler.Handle(msg)
if err != nil {
return m.handleMsgProcessingError("error occurred while handling message", err, msg, srcQueue)
}
}
if requeue {
// move/publish message to the temporary queue
err = m.publisher.Publish(m.tempQueue, mappers.DeliveryPublishing(msg))
if err != nil {
return m.handleMsgProcessingError("error occurred while publishing message to temporary queue", err, msg, srcQueue)
}
}
err = msg.Ack(false) // purge/remove message from the source queue
if err != nil {
m.logMsgProcessingError("error occurred while acknowledging message", err, msg, srcQueue)
return err
}
if processedMessages%1000 == 0 {
m.log.Info("processing source queue progress",
slog.Int("processedMessages", processedMessages),
slog.Int("selectedMessages", selectedMessages),
slog.Duration("duration", time.Since(startTime)),
)
}
lastProcessedMessage = msg
case <-ctx.Done():
m.log.Error("context cancelled while processing source queue",
slog.Any("error", ctx.Err()),
slog.Any("lastProcessedMessage", lastProcessedMessage),
slog.String("srcQueue", srcQueue),
slog.String("tempQueue", m.tempQueue),
slog.String("help", partialQueueManagementCtxCancelHelpMsg),
)
return ctx.Err()
case <-time.After(time.Second):
break loop
}
}
// move messages back to source queue from temporary queue
return m.moveTempToSource(ctx, m.tempQueue, srcQueue)
}
func (m *QueueManager) moveTempToSource(ctx context.Context, tempQueue, srcQueue string) error {
messages, err := m.consumer.Consume(tempQueue)
if err != nil {
return err
}
m.log.Info("moving messages from temporary to source queue")
startTime := time.Now()
var movedMessages int
var lastMovedMessage amqp091.Delivery
defer func() {
m.log.Info("moving messages from temporary to source queue finished",
slog.Int("movedMessages", movedMessages),
slog.Duration("duration", time.Since(startTime)),
)
}()
loop:
for {
select {
case msg := <-messages:
// move/publish message back to the source queue
err = m.publisher.Publish(srcQueue, mappers.DeliveryPublishing(msg))
if err != nil {
m.logMoveTempToSrcErr("error occurred while moving message from temporary to source queue", err, msg, srcQueue, tempQueue)
errReject := msg.Reject(false)
if errReject != nil {
m.logMoveTempToSrcErr("error occurred while rejecting message", errReject, msg, srcQueue, tempQueue)
}
return err
}
movedMessages++
err = msg.Ack(false)
if err != nil {
m.logMoveTempToSrcErr("error occurred while acknowledging message", err, msg, srcQueue, tempQueue)
return err
}
if movedMessages%1000 == 0 {
m.log.Info("moving messages from temporary to source queue progress",
slog.Int("movedMessages", movedMessages),
slog.Duration("duration", time.Since(startTime)),
)
}
lastMovedMessage = msg
case <-ctx.Done():
m.log.Error("context cancelled while moving messages from temporary to source queue",
slog.Any("error", ctx.Err()),
slog.Any("lastMovedMessage", lastMovedMessage),
slog.String("srcQueue", srcQueue),
slog.String("tempQueue", tempQueue),
slog.String("help", partialTempQueueMoveCtxCancelHelpMsg),
)
return ctx.Err()
case <-time.After(time.Second):
break loop
}
}
return nil
}
// endregion
// region Private
func (m *QueueManager) handleMsgProcessingError(errMsg string, err error, msg amqp091.Delivery, srcQueue string) error {
m.logMsgProcessingError(errMsg, err, msg, srcQueue)
errReject := msg.Reject(true)
if errReject != nil {
m.logMsgProcessingError("error occurred while rejecting message", errReject, msg, srcQueue)
}
return err
}
func (m *QueueManager) logMsgProcessingError(errMsg string, err error, msg amqp091.Delivery, srcQueue string) {
m.log.Error(errMsg,
slog.Any("error", err),
slog.Any("msg", msg),
slog.String("srcQueue", srcQueue),
slog.String("tempQueue", m.tempQueue),
slog.String("help", partialQueueManagementHelpMsg),
)
}
func (m *QueueManager) logMoveTempToSrcErr(errMsg string, err error, msg amqp091.Delivery, srcQueue, tempQueue string) {
m.log.Error(errMsg,
slog.Any("error", err),
slog.Any("msg", msg),
slog.String("srcQueue", srcQueue),
slog.String("tempQueue", tempQueue),
slog.String("help", partialTempQueueMoveHelpMsg),
)
}
// endregion
package managers
import (
"context"
"log/slog"
"time"
"github.com/rabbitmq/amqp091-go"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging/management/handlers"
"github.com/happening-oss/rabbitmq-message-ops/internal/messaging/selectors"
)
const (
partialStreamManagementHelpMsg = "Source stream has potentially been partially managed. Please act accordingly on the destination queue."
)
type StreamManager struct {
consumer messaging.Consumer
log *slog.Logger
handler handlers.MessageHandler
publisher messaging.Publisher
selector selectors.Selector
}
func NewStreamManager(consumer messaging.Consumer, log *slog.Logger, handler handlers.MessageHandler, publisher messaging.Publisher, selector selectors.Selector) *StreamManager {
return &StreamManager{consumer: consumer, log: log, handler: handler, publisher: publisher, selector: selector}
}
// region Public
func (m *StreamManager) Manage(ctx context.Context, srcStream string) error {
messages, err := m.consumer.Consume(srcStream)
if err != nil {
return err
}
m.log.Info("processing source stream")
startTime := time.Now()
var processedMessages, selectedMessages int
var lastProcessedMessage amqp091.Delivery
defer func() {
m.log.Info("processing source stream finished",
slog.Int("processedMessages", processedMessages),
slog.Int("selectedMessages", selectedMessages),
slog.Duration("duration", time.Since(startTime)),
)
}()
loop:
for {
select {
case msg := <-messages:
processedMessages++
selected, err := m.selector.IsSelected(msg)
if err != nil {
m.logHandleSrcMsgErr("error occurred while checking if message is selected", err, msg, srcStream)
errReject := msg.Reject(true)
if errReject != nil {
m.logHandleSrcMsgErr("error occurred while rejecting message", errReject, msg, srcStream)
}
return err
}
if selected {
selectedMessages++
_, err = m.handler.Handle(msg)
if err != nil {
m.logHandleSrcMsgErr("error occurred while handling message", err, msg, srcStream)
errReject := msg.Reject(true)
if errReject != nil {
m.logHandleSrcMsgErr("error occurred while rejecting message", errReject, msg, srcStream)
}
return err
}
}
err = msg.Ack(false) // purge/remove message from the source queue
if err != nil {
m.logHandleSrcMsgErr("error occurred while acknowledging message", err, msg, srcStream)
return err
}
if processedMessages%1000 == 0 {
m.log.Info("processing source stream progress",
slog.Int("processedMessages", processedMessages),
slog.Int("selectedMessages", selectedMessages),
slog.Duration("duration", time.Since(startTime)),
)
}
lastProcessedMessage = msg
case <-ctx.Done():
m.log.Error("context cancelled while processing source stream",
slog.Any("error", ctx.Err()),
slog.Any("lastProcessedMessage", lastProcessedMessage),
slog.String("srcStream", srcStream),
slog.String("help", partialStreamManagementHelpMsg),
)
return ctx.Err()
case <-time.After(time.Second):
break loop
}
}
return nil
}
// endregion
// region Private
func (m *StreamManager) logHandleSrcMsgErr(errMsg string, err error, msg amqp091.Delivery, srcStream string) {
m.log.Error(errMsg,
slog.Any("error", err),
slog.Any("msg", msg),
slog.String("srcStream", srcStream),
slog.String("help", partialStreamManagementHelpMsg),
)
}
// endregion
package mappers
import "github.com/rabbitmq/amqp091-go"
func DeliveryPublishing(msg amqp091.Delivery) amqp091.Publishing {
return amqp091.Publishing{
Headers: msg.Headers,
ContentType: msg.ContentType,
ContentEncoding: msg.ContentEncoding,
DeliveryMode: msg.DeliveryMode,
Priority: msg.Priority,
CorrelationId: msg.CorrelationId,
ReplyTo: msg.ReplyTo,
Expiration: msg.Expiration,
MessageId: msg.MessageId,
Timestamp: msg.Timestamp,
Type: msg.Type,
UserId: msg.UserId,
AppId: msg.AppId,
Body: msg.Body,
}
}
// Code generated by mockery v2.33.3. DO NOT EDIT.
package mocks
import (
amqp091 "github.com/rabbitmq/amqp091-go"
mock "github.com/stretchr/testify/mock"
)
// Consumer is an autogenerated mock type for the Consumer type
type Consumer struct {
mock.Mock
}
// Close provides a mock function with given fields:
func (_m *Consumer) Close() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Consume provides a mock function with given fields: queue
func (_m *Consumer) Consume(queue string) (<-chan amqp091.Delivery, error) {
ret := _m.Called(queue)
var r0 <-chan amqp091.Delivery
var r1 error
if rf, ok := ret.Get(0).(func(string) (<-chan amqp091.Delivery, error)); ok {
return rf(queue)
}
if rf, ok := ret.Get(0).(func(string) <-chan amqp091.Delivery); ok {
r0 = rf(queue)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan amqp091.Delivery)
}
}
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(queue)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewConsumer creates a new instance of Consumer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewConsumer(t interface {
mock.TestingT
Cleanup(func())
}) *Consumer {
mock := &Consumer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.33.3. DO NOT EDIT.
package mocks
import (
amqp091 "github.com/rabbitmq/amqp091-go"
mock "github.com/stretchr/testify/mock"
)
// Publisher is an autogenerated mock type for the Publisher type
type Publisher struct {
mock.Mock
}
// Close provides a mock function with given fields:
func (_m *Publisher) Close() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Publish provides a mock function with given fields: topic, msg
func (_m *Publisher) Publish(topic string, msg amqp091.Publishing) error {
ret := _m.Called(topic, msg)
var r0 error
if rf, ok := ret.Get(0).(func(string, amqp091.Publishing) error); ok {
r0 = rf(topic, msg)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewPublisher creates a new instance of Publisher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewPublisher(t interface {
mock.TestingT
Cleanup(func())
}) *Publisher {
mock := &Publisher{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
package mocks
import (
"github.com/stretchr/testify/mock"
)
type Acknowledger struct {
mock.Mock
AckedTags map[uint64]bool
}
func NewAcknowledger(t interface {
mock.TestingT
Cleanup(func())
}) *Acknowledger {
mock := &Acknowledger{AckedTags: make(map[uint64]bool)}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
func (a *Acknowledger) Ack(tag uint64, multiple bool) error {
a.AckedTags[tag] = true
return a.Called(tag, multiple).Error(0)
}
func (a *Acknowledger) Nack(tag uint64, multiple bool, requeue bool) error {
return a.Called(tag, multiple, requeue).Error(0)
}
func (a *Acknowledger) Reject(tag uint64, requeue bool) error {
return a.Called(tag, requeue).Error(0)
}
package selectors
import (
"fmt"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"github.com/rabbitmq/amqp091-go"
)
type FilterExprSelector struct {
program *vm.Program
}
func NewFilterExprSelector(filterExpr string) (*FilterExprSelector, error) {
program, err := expr.Compile(filterExpr, expr.Env(DeliverySubset{}))
if err != nil {
return nil, err
}
return &FilterExprSelector{program: program}, nil
}
func (s *FilterExprSelector) IsSelected(msg amqp091.Delivery) (bool, error) {
output, err := expr.Run(s.program, SubsetFromDelivery(msg))
if err != nil {
return false, err
}
isSelected, ok := output.(bool)
if !ok {
return false, fmt.Errorf("unknown output type: %T", output)
}
return isSelected, nil
}
// Code generated by mockery v2.33.3. DO NOT EDIT.
package mocks
import (
amqp091 "github.com/rabbitmq/amqp091-go"
mock "github.com/stretchr/testify/mock"
)
// Selector is an autogenerated mock type for the Selector type
type Selector struct {
mock.Mock
}
// IsSelected provides a mock function with given fields: msg
func (_m *Selector) IsSelected(msg amqp091.Delivery) (bool, error) {
ret := _m.Called(msg)
var r0 bool
var r1 error
if rf, ok := ret.Get(0).(func(amqp091.Delivery) (bool, error)); ok {
return rf(msg)
}
if rf, ok := ret.Get(0).(func(amqp091.Delivery) bool); ok {
r0 = rf(msg)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(amqp091.Delivery) error); ok {
r1 = rf(msg)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewSelector creates a new instance of Selector. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewSelector(t interface {
mock.TestingT
Cleanup(func())
}) *Selector {
mock := &Selector{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
package selectors
import (
"time"
"unicode/utf8"
"github.com/rabbitmq/amqp091-go"
)
func SubsetFromDelivery(msg amqp091.Delivery) DeliverySubset {
var timestamp string
var body any
if !msg.Timestamp.IsZero() {
timestamp = msg.Timestamp.Format(time.RFC3339Nano)
}
if len(msg.Body) > 0 {
if utf8.Valid(msg.Body) {
body = string(msg.Body)
} else {
body = msg.Body
}
}
return DeliverySubset{
Headers: msg.Headers,
ContentType: msg.ContentType,
ContentEncoding: msg.ContentEncoding,
DeliveryMode: msg.DeliveryMode,
Priority: msg.Priority,
CorrelationID: msg.CorrelationId,
ReplyTo: msg.ReplyTo,
Expiration: msg.Expiration,
MessageID: msg.MessageId,
Timestamp: timestamp,
Type: msg.Type,
UserID: msg.UserId,
AppID: msg.AppId,
Redelivered: msg.Redelivered,
Exchange: msg.Exchange,
RoutingKey: msg.RoutingKey,
Body: body,
}
}
// region Structs
type DeliverySubset struct {
Headers amqp091.Table `json:"headers,omitempty" expr:"headers"`
ContentType string `json:"contentType,omitempty" expr:"contentType"`
ContentEncoding string `json:"contentEncoding,omitempty" expr:"contentEncoding"`
DeliveryMode uint8 `json:"deliveryMode,omitempty" expr:"deliveryMode"`
Priority uint8 `json:"priority,omitempty" expr:"priority"`
CorrelationID string `json:"correlationID,omitempty" expr:"correlationID"`
ReplyTo string `json:"replyTo,omitempty" expr:"replyTo"`
Expiration string `json:"expiration,omitempty" expr:"expiration"`
MessageID string `json:"messageID,omitempty" expr:"messageID"`
Timestamp string `json:"timestamp,omitempty" expr:"timestamp"`
Type string `json:"type,omitempty" expr:"type"`
UserID string `json:"userID,omitempty" expr:"userID"`
AppID string `json:"appID,omitempty" expr:"appID"`
Redelivered bool `json:"redelivered,omitempty" expr:"redelivered"`
Exchange string `json:"exchange,omitempty" expr:"exchange"`
RoutingKey string `json:"routingKey,omitempty" expr:"routingKey"`
Body any `json:"body,omitempty" expr:"-"`
}
// endregion
package selectors
import "github.com/rabbitmq/amqp091-go"
type YesSelector struct{}
func NewYesSelector() *YesSelector {
return &YesSelector{}
}
func (s *YesSelector) IsSelected(_ amqp091.Delivery) (bool, error) {
return true, nil
}
package stubs
import (
"context"
"log/slog"
)
type Handler struct{ slog.Handler }
func NewHandler() slog.Handler { return &Handler{} }
func (h *Handler) Enabled(_ context.Context, _ slog.Level) bool { return false }
func (h *Handler) Handle(_ context.Context, _ slog.Record) error { return nil }
func (h *Handler) WithAttrs(_ []slog.Attr) slog.Handler { return h }
func (h *Handler) WithGroup(_ string) slog.Handler { return h }
package util
import (
"reflect"
"runtime"
"strings"
)
// NameOf returns provided function or method name as string
//
// tests.NameOf(package.SomeFunc) -> "SomeFunc"
// tests.NameOf(SomeOtherFunc) -> "SomeOtherFunc"
func NameOf(f interface{}) string {
v := reflect.ValueOf(f)
if v.Kind() == reflect.Func {
if rf := runtime.FuncForPC(v.Pointer()); rf != nil {
name := rf.Name()
dotIndex := strings.LastIndex(name, ".")
name = name[dotIndex+1:]
// some methods come with -fm extension
minusIndex := strings.LastIndex(name, "-")
if minusIndex != -1 {
name = name[0:minusIndex]
}
return name
}
}
return v.String()
}