package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"github.com/prometheus/client_golang/prometheus"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/admin/v3/presets/gorm2op"
"github.com/qor5/web/v3"
"github.com/qor5/x/v3/login"
v "github.com/qor5/x/v3/ui/vuetify"
h "github.com/theplant/htmlgo"
"github.com/urfave/cli/v3"
"go.uber.org/fx"
"golang.org/x/text/language"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/clickhouse"
"go.ads.coffee/platform/admin/internal/config"
"go.ads.coffee/platform/admin/internal/database"
"go.ads.coffee/platform/admin/internal/internat"
"go.ads.coffee/platform/admin/internal/logger"
"go.ads.coffee/platform/admin/internal/modules/ads"
"go.ads.coffee/platform/admin/internal/modules/media"
"go.ads.coffee/platform/admin/internal/modules/stats"
"go.ads.coffee/platform/admin/internal/modules/users"
umodels "go.ads.coffee/platform/admin/internal/modules/users/models"
"go.ads.coffee/platform/admin/internal/s3storage"
"go.ads.coffee/platform/admin/internal/server"
)
// main builds the command-line interface and runs it.
//
// The root command accepts a --config/-c flag and exposes three subcommands:
//   - serve (s): assembles the full admin application and starts the server.
//   - user (u): invokes the user seeding routine.
//   - migrate (m): invokes the schema migration routine.
//
// Every subcommand builds its own fx application. An error returned by the
// CLI runner aborts the process with a panic.
func main() {
	cmd := &cli.Command{
		Name: "kodikapusta",
		Flags: []cli.Flag{
			&cli.StringFlag{Name: "config", Aliases: []string{"c"}},
		},
		Commands: []*cli.Command{
			{
				Name:    "serve",
				Aliases: []string{"s"},
				Action: func(ctx context.Context, cmd *cli.Command) error {
					fx.New(
						fx.Provide(
							// Metrics are registered on the default Prometheus registry.
							func() prometheus.Registerer {
								return prometheus.DefaultRegisterer
							},
						),
						fx.Provide(configProvider(cmd)),
						database.Module,
						logger.Module,
						s3storage.Module,
						server.Module,
						clickhouse.Module,
						ads.Module,
						users.Module,
						media.Module,
						stats.Module,
						fx.Provide(
							configure,
							auth,
						),
						fx.Invoke(
							serve,
						),
					).Run()
					return nil
				},
			},
			{
				Name:    "user",
				Aliases: []string{"u"},
				Action: func(ctx context.Context, cmd *cli.Command) error {
					fx.New(
						fx.Provide(configProvider(cmd)),
						database.Module,
						logger.Module,
						s3storage.Module,
						server.Module,
						ads.Module,
						users.Module,
						media.Module,
						fx.Invoke(
							user,
						),
					).Run()
					return nil
				},
			},
			{
				Name:    "migrate",
				Aliases: []string{"m"},
				Action: func(ctx context.Context, cmd *cli.Command) error {
					fx.New(
						fx.Provide(configProvider(cmd)),
						database.Module,
						logger.Module,
						ads.Module,
						users.Module,
						fx.Invoke(
							migrate,
						),
					).Run()
					return nil
				},
			},
		},
	}
	if err := cmd.Run(context.Background(), os.Args); err != nil {
		panic(err)
	}
}

// configProvider returns an fx constructor that loads the configuration file
// named by the command's "config" flag. When the flag is empty the default
// path admin/configs/config.yaml is used.
func configProvider(cmd *cli.Command) func() (config.Config, error) {
	const defaultPath = "admin/configs/config.yaml"
	return func() (config.Config, error) {
		path := cmd.String("config")
		if path == "" {
			path = defaultPath
		}
		return config.New(path)
	}
}
// serve binds the server to the fx lifecycle: the start hook calls
// srv.Serve and the stop hook calls srv.Shutdown with the hook's context.
func serve(lc fx.Lifecycle, srv *server.Server) {
	start := func(context.Context) error {
		return srv.Serve()
	}
	stop := func(ctx context.Context) error {
		return srv.Shutdown(ctx)
	}
	lc.Append(fx.Hook{OnStart: start, OnStop: stop})
}
// auth is the fx constructor for the login builder; it delegates to
// users.Auth with the presets builder.
func auth(users *users.Users, pb *presets.Builder) *login.Builder {
	lb := users.Auth(pb)
	return lb
}
// configure assembles the presets builder of the admin panel.
//
// It sets the URI prefix, the gorm-backed data operator and the home page,
// lets the media, ads, users and stats modules register themselves, fixes
// the menu order, and registers the Russian and English message catalogs.
func configure(
	db *gorm.DB,
	media *media.Media,
	users *users.Users,
	ads *ads.Ads,
	stats *stats.Stats,
) *presets.Builder {
	b := presets.New()

	// Base setup: URI prefix, ORM data operator and the home page.
	home := func(ctx *web.EventContext) (web.PageResponse, error) {
		var page web.PageResponse
		page.Body = v.VContainer(
			h.H1("Реклама"),
			h.P().Text("Лучшая DSP"),
		)
		return page, nil
	}
	b.URIPrefix("/admin")
	b.DataOperator(gorm2op.DataOperator(db))
	b.HomePageFunc(home)

	// Module registration; the call order is kept as is.
	media.Configure(b)
	ads.Configure(b)
	users.Configure(b)
	stats.Configure(b)

	b.MenuOrder(
		"dashboard",
		"separator",
		"advertisers",
		"campaigns",
		"bgroups",
		"banners",
		"separator",
		"media-library",
		"users",
		"separator",
		"placement",
		"units",
		"networks",
	)

	// Localization: supported languages and per-module message catalogs.
	i18nB := b.GetI18n()
	i18nB.SupportLanguages(language.Russian, language.English)
	i18nB.RegisterForModule(language.English, presets.ModelsI18nModuleKey, internat.Messages_en_EN_ModelsI18nModuleKey)
	i18nB.RegisterForModule(language.Russian, presets.ModelsI18nModuleKey, internat.Messages_ru_RU_ModelsI18nModuleKey)
	i18nB.RegisterForModule(language.English, presets.CoreI18nModuleKey, internat.Messages_en_EN)
	i18nB.RegisterForModule(language.Russian, presets.CoreI18nModuleKey, internat.Messages_ru_RU)
	i18nB.GetSupportLanguagesFromRequestFunc(func(*http.Request) []language.Tag {
		// Every request is offered the full list of supported languages.
		return b.GetI18n().GetSupportLanguages()
	})

	return b
}
// user stores a user named "admin" with account "admin" and the hard-coded
// password "password" (encrypted before saving), then terminates the process
// with exit code 0. A database error is fatal.
func user(db *gorm.DB) {
	fmt.Println("add user: admin, password")

	admin := umodels.User{Name: "admin"}
	admin.Account = "admin"
	admin.Password = "password"
	admin.EncryptPassword()

	err := db.Model(&admin).Save(&admin).Error
	if err != nil {
		log.Fatal(err)
	}

	os.Exit(0)
}
// migrate prints a progress line, runs the migrations of the ads and users
// modules in that order, and terminates the process with exit code 0.
//
// NOTE(review): the errcheck suppression suggests that a Migrate call returns
// a value that is ignored here — confirm against the users module.
//
//nolint:errcheck
func migrate(ads *ads.Ads, users *users.Users) {
fmt.Println("migrate models")
ads.Migrate()
users.Migrate()
// The process exits from inside the fx invoke, so the enclosing
// fx.New(...).Run() never starts waiting for a shutdown signal.
os.Exit(0)
}
package clickhouse
import (
"fmt"
"time"
"github.com/uptrace/go-clickhouse/ch"
)
// Clickhouse wraps a go-clickhouse database handle.
type Clickhouse struct {
// DB is the handle created by New and closed by Close.
DB *ch.DB
}
// New creates a ClickHouse handle from the given configuration.
//
// The DSN is built from host, port and database with sslmode=disable; user
// and password are passed as separate options. Both the general and the dial
// timeout are 30 seconds, and every query carries the
// prefer_column_name_to_alias=1 setting. The returned error is always nil.
func New(config Config) (*Clickhouse, error) {
	const timeout = 30 * time.Second

	dsn := fmt.Sprintf("clickhouse://%s:%s/%s?sslmode=disable", config.Host, config.Port, config.Database)
	querySettings := map[string]interface{}{
		"prefer_column_name_to_alias": 1,
	}

	conn := ch.Connect(
		ch.WithDSN(dsn),
		ch.WithUser(config.User),
		ch.WithPassword(config.Password),
		ch.WithTimeout(timeout),
		ch.WithDialTimeout(timeout),
		ch.WithQuerySettings(querySettings),
	)

	return &Clickhouse{DB: conn}, nil
}
// Close closes the underlying database handle and returns its error, if any.
func (c *Clickhouse) Close() error {
	return c.DB.Close()
}
package config
import (
"os"
"go.uber.org/config"
"go.uber.org/fx"
"go.ads.coffee/platform/admin/internal/clickhouse"
"go.ads.coffee/platform/admin/internal/database"
"go.ads.coffee/platform/admin/internal/s3storage"
"go.ads.coffee/platform/admin/internal/server"
)
// Config is the root application configuration as read from the YAML file.
//
// It embeds fx.Out, so when a constructor returns it each field is provided
// to the fx container as a separate value of its own type.
type Config struct {
fx.Out
// Database holds the settings under the "database" key.
Database database.Config `yaml:"database"`
// S3Storage holds the settings under the "s3storage" key.
S3Storage s3storage.Config `yaml:"s3storage"`
// Server holds the settings under the "server" key.
Server server.Config `yaml:"server"`
// Clickhouse holds the settings under the "clickhouse" key.
Clickhouse clickhouse.Config `yaml:"clickhouse"`
}
// New reads the YAML configuration from file and decodes it into a Config.
//
// The provider is created with environment expansion (os.LookupEnv) and in
// permissive mode. On any error a zero Config is returned together with it.
func New(file string) (Config, error) {
	provider, err := config.NewYAML(
		config.Expand(os.LookupEnv),
		config.File(file),
		config.Permissive(),
	)
	if err != nil {
		return Config{}, err
	}

	var cfg Config
	if err := provider.Get("").Populate(&cfg); err != nil {
		return Config{}, err
	}
	return cfg, nil
}
package database
import "fmt"
// Config holds the database connection settings read from YAML.
type Config struct {
// Debug selects the verbose gorm log level when true.
Debug bool `yaml:"debug"`
// User is the database user name.
User string `yaml:"user"`
// Password is the database password.
Password string `yaml:"password"`
// Dbname is the database name.
Dbname string `yaml:"dbname"`
// Host is the database server host.
Host string `yaml:"host"`
// Port is the database server port, kept as a string.
Port string `yaml:"port"`
}
// Connection returns the settings as a keyword/value connection string with
// sslmode=disable.
//
// Every configured value is wrapped in single quotes, with embedded single
// quotes and backslashes escaped by a backslash. Without quoting, an empty
// value or one containing whitespace would be misparsed and could swallow
// the following keyword.
func (c Config) Connection() string {
	return fmt.Sprintf("user=%s password=%s dbname=%s sslmode=disable host=%s port=%s",
		quoteConnValue(c.User),
		quoteConnValue(c.Password),
		quoteConnValue(c.Dbname),
		quoteConnValue(c.Host),
		quoteConnValue(c.Port),
	)
}

// quoteConnValue quotes v for use as a value in a keyword/value connection
// string: the result is enclosed in single quotes, and every single quote or
// backslash inside it is preceded by a backslash.
func quoteConnValue(v string) string {
	buf := make([]byte, 0, len(v)+2)
	buf = append(buf, '\'')
	for i := 0; i < len(v); i++ {
		if v[i] == '\'' || v[i] == '\\' {
			buf = append(buf, '\\')
		}
		buf = append(buf, v[i])
	}
	buf = append(buf, '\'')
	return string(buf)
}
// URL returns the settings as a postgres:// connection URL with
// sslmode=disable.
//
// The values are interpolated verbatim: credentials containing URL-reserved
// characters are not escaped.
func (c Config) URL() string {
	credentials := fmt.Sprintf("%s:%s", c.User, c.Password)
	endpoint := fmt.Sprintf("%s:%s", c.Host, c.Port)
	return fmt.Sprintf("postgres://%s@%s/%s?sslmode=disable", credentials, endpoint, c.Dbname)
}
package database
import (
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// New opens a gorm connection to PostgreSQL using config.Connection() and
// panics when the connection cannot be opened.
//
// The gorm logger runs at Info level when config.Debug is set and at Warn
// level otherwise.
func New(config Config) *gorm.DB {
	db, err := gorm.Open(postgres.Open(config.Connection()))
	if err != nil {
		panic(err)
	}

	level := logger.Warn
	if config.Debug {
		level = logger.Info
	}
	db.Logger = db.Logger.LogMode(level)

	return db
}
package form
import "net/http"
const (
// defaultMaxMemory is the maxMemory argument passed to
// http.Request.ParseMultipartForm by Values.
defaultMaxMemory = 32 << 20 // 32 MB
)
// Values returns all form values stored under key, or nil when there are
// none.
//
// When the request form has not been parsed yet it is parsed first with
// ParseMultipartForm; a parse error is deliberately ignored and whatever was
// parsed is used.
func Values(r *http.Request, key string) []string {
	if r.Form == nil {
		// Best effort: the error is intentionally discarded.
		_ = r.ParseMultipartForm(defaultMaxMemory)
	}

	values := r.Form[key]
	if len(values) == 0 {
		return nil
	}
	return values
}
package internat
import (
"fmt"
"github.com/qor5/admin/v3/presets"
)
// Messages_en_EN is the English catalog of the presets core UI messages.
//
// Values containing {placeholders} are templates; the HumanizeTime* entries
// are fmt-style formats whose %s takes the HumanizeTimeAgo or
// HumanizeTimeFromNow suffix (presumably filled in by the presets package —
// confirm there).
var Messages_en_EN = &presets.Messages{
DialogTitleDefault: "Confirm",
SuccessfullyUpdated: "Successfully Updated",
SuccessfullyCreated: "Successfully Created",
Search: "Search",
New: "New",
Update: "Update",
Delete: "Delete",
Edit: "Edit",
FormTitle: "Form",
OK: "OK",
Cancel: "Cancel",
Clear: "Clear",
Create: "Create",
SelectedTemplate: func(v any) string {
return fmt.Sprintf("%v Selected", v)
},
DeleteConfirmationText: "Are you sure you want to delete this object?",
DeleteObjectsConfirmationText: func(v int) string {
return fmt.Sprintf(`Are you sure you want to delete %v objects`, v)
},
// Page title templates.
CreatingObjectTitleTemplate: "New {modelName}",
EditingObjectTitleTemplate: "Editing {modelName} {id}",
ListingObjectTitleTemplate: "{modelName}",
DetailingObjectTitleTemplate: "{modelName} {id}",
// Listing filters.
FiltersClear: "Reset",
FiltersAdd: "Add Filters",
FilterApply: "Apply",
FilterByTemplate: "Filter by {filter}",
FiltersDateInTheLast: "is in the last",
FiltersDateEquals: "is equal to",
FiltersDateBetween: "is between",
FiltersDateIsAfter: "is after",
FiltersDateIsAfterOrOn: "is on or after",
FiltersDateIsBefore: "is before",
FiltersDateIsBeforeOrOn: "is before or on",
FiltersDateDays: "days",
FiltersDateMonths: "months",
FiltersDateAnd: "and",
FiltersDateStartAt: "Start at",
FiltersDateEndAt: "End at",
FiltersDateTo: "to",
FiltersDateClear: "Clear",
FiltersDateOK: "OK",
FiltersNumberEquals: "is equal to",
FiltersNumberBetween: "between",
FiltersNumberGreaterThan: "is greater than",
FiltersNumberLessThan: "is less than",
FiltersNumberAnd: "and",
FiltersStringEquals: "is equal to",
FiltersStringContains: "contains",
FiltersMultipleSelectIn: "in",
FiltersMultipleSelectNotIn: "not in",
// Listing, selection and bulk actions.
PaginationRowsPerPage: "Rows per page: ",
ListingNoRecordToShow: "No records to show",
ListingSelectedCountNotice: "{count} records are selected. ",
ListingClearSelection: "clear selection",
BulkActionNoRecordsSelected: "No records selected",
BulkActionNoAvailableRecords: "None of the selected records can be executed with this action.",
BulkActionSelectedIdsProcessNoticeTemplate: "Partially selected records cannot be executed with this action: {ids}.",
ConfirmDialogTitleText: "Confirm",
ConfirmDialogPromptText: "Are you sure?",
Language: "Language",
Colon: ":",
NotFoundPageNotice: "Sorry, the requested page cannot be found. Please check the URL.",
ButtonLabelActionsMenu: "Actions",
Save: "Save",
AddRow: "Add Item",
AddCard: "Add Card",
AddButton: "Add Button",
CheckboxTrueLabel: "YES",
CheckboxFalseLabel: "NO",
// Relative time formatting.
HumanizeTimeAgo: "ago",
HumanizeTimeFromNow: "from now",
HumanizeTimeNow: "now",
HumanizeTime1Second: "1 second %s",
HumanizeTimeSeconds: "%d seconds %s",
HumanizeTime1Minute: "1 minute %s",
HumanizeTimeMinutes: "%d minutes %s",
HumanizeTime1Hour: "1 hour %s",
HumanizeTimeHours: "%d hours %s",
HumanizeTime1Day: "1 day %s",
HumanizeTimeDays: "%d days %s",
HumanizeTime1Week: "1 week %s",
HumanizeTimeWeeks: "%d weeks %s",
HumanizeTime1Month: "1 month %s",
HumanizeTimeMonths: "%d months %s",
HumanizeTime1Year: "1 year %s",
HumanizeTime2Years: "2 years %s",
HumanizeTimeYears: "%d years %s",
HumanizeTimeLongWhile: "a long while %s",
LeaveBeforeUnsubmit: "If you leave before submitting the form, you will lose all the unsaved input.",
RecordNotFound: "record not found",
}
// Messages_ru_RU is the Russian catalog of the presets core UI messages.
//
// Values containing {placeholders} are templates; the HumanizeTime* entries
// are fmt-style formats whose %s takes the HumanizeTimeAgo or
// HumanizeTimeFromNow suffix (presumably filled in by the presets package —
// confirm there).
var Messages_ru_RU = &presets.Messages{
	DialogTitleDefault: "Подтвердить",
	SuccessfullyUpdated: "Успешно обновлено",
	SuccessfullyCreated: "Успешно создано",
	Search: "Поиск",
	New: "Новый",
	Update: "Обновить",
	Delete: "Удалить",
	Edit: "Редактировать",
	FormTitle: "Форма",
	OK: "ОК",
	Cancel: "Отменить",
	Clear: "Очистить",
	Create: "Создать",
	SelectedTemplate: func(v any) string {
		return fmt.Sprintf("%v Выбрано", v)
	},
	DeleteConfirmationText: "Вы уверены, что хотите удалить этот объект?",
	DeleteObjectsConfirmationText: func(v int) string {
		return fmt.Sprintf(`Вы уверены, что хотите удалить %v объектов`, v)
	},
	// Page title templates.
	CreatingObjectTitleTemplate: "Новый {modelName}",
	EditingObjectTitleTemplate: "Редактирование {modelName} {id}",
	ListingObjectTitleTemplate: "{modelName}",
	DetailingObjectTitleTemplate: "{modelName} {id}",
	// Listing filters.
	FiltersClear: "Сбросить",
	FiltersAdd: "Добавить фильтры",
	FilterApply: "Применить",
	FilterByTemplate: "Фильтровать по {filter}",
	FiltersDateInTheLast: "в течение последних",
	FiltersDateEquals: "равно",
	FiltersDateBetween: "между",
	FiltersDateIsAfter: "после",
	FiltersDateIsAfterOrOn: "после или равно",
	FiltersDateIsBefore: "до",
	FiltersDateIsBeforeOrOn: "до или равно",
	FiltersDateDays: "дней",
	FiltersDateMonths: "месяцев",
	FiltersDateAnd: "и",
	FiltersDateStartAt: "Начало",
	FiltersDateEndAt: "Конец",
	FiltersDateTo: "до",
	FiltersDateClear: "Очистить",
	FiltersDateOK: "ОК",
	FiltersNumberEquals: "равно",
	FiltersNumberBetween: "между",
	FiltersNumberGreaterThan: "больше чем",
	FiltersNumberLessThan: "меньше чем",
	FiltersNumberAnd: "и",
	FiltersStringEquals: "равно",
	FiltersStringContains: "содержит",
	FiltersMultipleSelectIn: "в",
	FiltersMultipleSelectNotIn: "не в",
	// Listing, selection and bulk actions.
	PaginationRowsPerPage: "Строк на странице: ",
	ListingNoRecordToShow: "Нет записей для отображения",
	ListingSelectedCountNotice: "{count} записей выбраны. ",
	ListingClearSelection: "очистить выбор",
	BulkActionNoRecordsSelected: "Нет выбранных записей",
	BulkActionNoAvailableRecords: "Ни одна из выбранных записей не может быть выполнена с этой операцией.",
	BulkActionSelectedIdsProcessNoticeTemplate: "Частично выбранные записи не могут быть выполнены с этой операцией: {ids}.",
	ConfirmDialogTitleText: "Подтвердить",
	ConfirmDialogPromptText: "Вы уверены?",
	Language: "Язык",
	Colon: ":",
	NotFoundPageNotice: "Извините, запрашиваемая страница не найдена. Пожалуйста, проверьте URL.",
	ButtonLabelActionsMenu: "Действия",
	Save: "Сохранить",
	AddRow: "Добавить элемент",
	AddCard: "Добавить карточку",
	AddButton: "Добавить кнопку",
	CheckboxTrueLabel: "ДА",
	CheckboxFalseLabel: "НЕТ",
	// Relative time formatting.
	HumanizeTimeAgo: "назад",
	// "спустя" ("later") is the future-tense suffix, e.g. "5 минут спустя".
	// It used to duplicate HumanizeTimeNow ("сейчас"), which produced
	// phrases such as "5 минут сейчас".
	HumanizeTimeFromNow: "спустя",
	HumanizeTimeNow: "сейчас",
	HumanizeTime1Second: "1 секунда %s",
	HumanizeTimeSeconds: "%d секунд %s",
	HumanizeTime1Minute: "1 минута %s",
	HumanizeTimeMinutes: "%d минут %s",
	HumanizeTime1Hour: "1 час %s",
	HumanizeTimeHours: "%d часов %s",
	HumanizeTime1Day: "1 день %s",
	HumanizeTimeDays: "%d дней %s",
	HumanizeTime1Week: "1 неделя %s",
	HumanizeTimeWeeks: "%d недель %s",
	HumanizeTime1Month: "1 месяц %s",
	HumanizeTimeMonths: "%d месяцев %s",
	HumanizeTime1Year: "1 год %s",
	HumanizeTime2Years: "2 года %s",
	HumanizeTimeYears: "%d лет %s",
	HumanizeTimeLongWhile: "очень давно %s",
	LeaveBeforeUnsubmit: "Если вы покинете форму до её отправки, все несохранённые данные будут утеряны.",
	RecordNotFound: "запись не найдена",
}
package logger
import (
"go.uber.org/zap"
)
// New returns a zap logger built with the development preset and panics
// when the logger cannot be constructed.
func New() *zap.Logger {
	zl, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	return zl
}
package ads
import (
"github.com/qor5/admin/v3/presets"
"github.com/qor5/web/v3"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/modules/ads/builders"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Ads holds the admin panel configuration of the ads module: the logger, the
// database handle and one builder per managed model.
type Ads struct {
logger *zap.Logger
db *gorm.DB
// Per-model builders; Configure registers them with the presets builder.
banner *builders.Banner
group *builders.Group
campaign *builders.Campaign
advertiser *builders.Advertiser
network *builders.Network
placement *builders.Placement
unit *builders.Unit
}
// New builds the Ads module from its logger, database handle and the
// per-model builders.
func New(
	logger *zap.Logger,
	db *gorm.DB,
	banner *builders.Banner,
	group *builders.Group,
	campaign *builders.Campaign,
	advertiser *builders.Advertiser,
	network *builders.Network,
	placement *builders.Placement,
	unit *builders.Unit,
) *Ads {
	m := &Ads{logger: logger, db: db}

	m.banner = banner
	m.group = group
	m.campaign = campaign
	m.advertiser = advertiser
	m.network = network
	m.placement = placement
	m.unit = unit

	return m
}
// Configure registers the ads module with the presets builder: an asset hook
// that adds a <style> block to the page head, followed by every per-model
// builder in a fixed order.
func (m *Ads) Configure(b *presets.Builder) {
	// CSS replacing the default marker of <details> summaries.
	const headStyles = `
<style>
details summary::marker {
content: "❯ ";
}
/*
details[open] summary::marker { content:" " }
*/
</style>
`
	b.AssetFunc(func(ctx *web.EventContext) {
		ctx.Injector.HeadHTML(headStyles)
	})

	// Registration order: advertiser, campaign, group, banner, then the
	// inventory models.
	m.advertiser.Configure(b)
	m.campaign.Configure(b)
	m.group.Configure(b)
	m.banner.Configure(b)

	m.network.Configure(b)
	m.placement.Configure(b)
	m.unit.Configure(b)
}
// Migrate auto-migrates the tables of every ads model and panics when the
// migration fails.
//
// TODO: move to a separate command.
func (m *Ads) Migrate() {
	tables := []interface{}{
		&models.Advertiser{},
		&models.Campaign{},
		&models.Banner{},
		&models.Bgroup{},
		&models.Audience{},
		&models.Network{},
		&models.Unit{},
		&models.Placement{},
	}
	if err := m.db.AutoMigrate(tables...); err != nil {
		panic(err)
	}
}
package builders
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/admin/v3/presets/gorm2op"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/qor5/x/v3/ui/vuetifyx"
"github.com/sunfmin/reflectutils"
h "github.com/theplant/htmlgo"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/modules/ads/components"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Advertiser configures the admin pages of the Advertiser model and handles
// its copy/archive/unarchive events.
type Advertiser struct {
logger *zap.Logger
db *gorm.DB
}
// NewAdvertiser returns an Advertiser builder that uses the given logger and
// database handle.
func NewAdvertiser(logger *zap.Logger, db *gorm.DB) *Advertiser {
	a := new(Advertiser)
	a.logger = logger
	a.db = db
	return a
}
// Event names of the advertiser row-menu actions; Configure registers a
// handler for each of them.
const (
copyAdvertiserEvent = "copyAdvertiser"
archiveAdvertiserEvent = "archiveAdvertiser"
unarchiveAdvertiserEvent = "unarchiveAdvertiser"
)
// Configure registers the Advertiser model with the presets builder.
//
// It sets up the listing (search that hides archived records by default,
// ordering, filters, custom Title/Active columns and the Copy/Archive row
// menu), registers the copy/archive/unarchive event handlers, and defines the
// editing form with its custom field components.
func (m *Advertiser) Configure(b *presets.Builder) {
ma := b.Model(&models.Advertiser{}).
MenuIcon("mdi-account-group").
// A custom menu label is currently commented out:
// Label("Рекламодатели").
RightDrawerWidth("1000")
mal := ma.Listing("ID", "Title", "Start", "End", "Active").
SearchFunc(func(ctx *web.EventContext, params *presets.SearchParams) (result *presets.SearchResult, err error) {
// Archived records are hidden unless one of the archive filter
// options ("archived" or "all") is present in the SQL conditions.
exist := false
for _, v := range params.SQLConditions {
if strings.Contains(v.Query, "archived_at is not null") {
exist = true
break
}
// NOTE(review): this string contains the one tested above, so this
// branch can never be the first to match.
if strings.Contains(v.Query, "(archived_at is not null or archived_at is null)") {
exist = true
break
}
}
if !exist {
// No archive filter selected: restrict the search to non-archived rows.
qdb := m.db.Where("archived_at is null")
return gorm2op.DataOperator(qdb).Search(ctx, params)
} else {
// An archive filter is selected: search without the extra restriction.
qdb := m.db.Where("")
return gorm2op.DataOperator(qdb).Search(ctx, params)
}
}).
SearchColumns("Title").
SelectableColumns(true).
OrderableFields([]*presets.OrderableField{
{
FieldName: "ID",
DBColumn: "id",
},
{
FieldName: "Title",
DBColumn: "title",
},
{
FieldName: "Start",
DBColumn: "start",
},
{
FieldName: "End",
DBColumn: "end",
},
})
// Listing filters: archive state, active flag and creation date.
mal.FilterDataFunc(func(ctx *web.EventContext) vuetifyx.FilterData {
return []*vuetifyx.FilterItem{
{
Key: "archived",
Label: "Архив",
ItemType: vuetifyx.ItemTypeSelect,
SQLCondition: "archived_at is null",
Options: []*vuetifyx.SelectItem{
{
Text: "В архиве",
Value: "is_archived",
SQLCondition: "archived_at is not null",
},
{
Text: "Все",
Value: "all",
SQLCondition: "(archived_at is not null or archived_at is null)",
},
},
},
{
Key: "active",
Label: "Активность",
ItemType: vuetifyx.ItemTypeSelect,
Options: []*vuetifyx.SelectItem{
{
Text: "Включен",
Value: "is_active",
SQLCondition: "active = true",
},
{
Text: "Выключен",
Value: "not_active",
SQLCondition: "active = false",
},
},
},
{
Key: "created",
Label: "Создан",
ItemType: vuetifyx.ItemTypeDate,
SQLCondition: `created_at %s ?`,
},
}
})
// Title column: a link to the campaigns listing filtered by this
// advertiser; archived advertisers get a highlight color and a suffix.
mal.Field("Title").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
c := obj.(*models.Advertiser)
style := ""
text := ""
if c.ArchivedAt != nil {
style = "color:#bb0"
text = " - архив"
}
return h.Td().Children(
h.A().
Text(c.Title+text).
Style(style).
// Keep the click from bubbling up to the row.
Attr("onclick", "event.stopPropagation();").
Href(fmt.Sprintf("/admin/campaigns?f_advertiser=%d", c.ID)),
)
})
// Active column: colored enabled/disabled label.
mal.Field("Active").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
c := obj.(*models.Advertiser)
color := "red"
text := "выключен"
if c.Active {
text = "включен"
color = "green"
}
return h.Td().Children(h.Span(text).Style("color:" + color))
})
man := mal.RowMenu()
// Row menu item that triggers the copy event.
man.RowMenuItem("Copy").
ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
return v.VListItem(
web.Slot(
v.VIcon("mdi-content-copy"), // copy icon
).Name("prepend"),
v.VListItemTitle(
h.Text("Копировать"),
),
).Attr("@click",
web.Plaid().EventFunc(copyAdvertiserEvent).Query("id", id).Go(),
)
})
// Row menu item that archives a live record or unarchives an archived one.
man.RowMenuItem("Archive").
ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
item := obj.(*models.Advertiser)
if item.ArchivedAt == nil {
return v.VListItem(
web.Slot(
v.VIcon("mdi-archive-arrow-down"), // archive icon
).Name("prepend"),
v.VListItemTitle(
h.Text("Архивировать"),
),
).Attr("@click",
web.Plaid().EventFunc(archiveAdvertiserEvent).Query("id", id).Go(),
)
} else {
return v.VListItem(
web.Slot(
v.VIcon("mdi-archive-arrow-up"), // unarchive icon
).Name("prepend"),
v.VListItemTitle(
h.Text("Разархивировать"),
),
).Attr("@click",
web.Plaid().EventFunc(unarchiveAdvertiserEvent).Query("id", id).Go(),
)
}
})
// Register the handlers of the row-menu events.
ma.RegisterEventFunc(copyAdvertiserEvent, m.copyAdvertiser)
ma.RegisterEventFunc(archiveAdvertiserEvent, m.archiveAdvertiser)
ma.RegisterEventFunc(unarchiveAdvertiserEvent, m.unarchiveAdvertiser)
// Editing form layout, grouped into sections.
mae := ma.Editing(
&presets.FieldsSection{
Title: "Info",
Rows: [][]string{
{"Title"},
{"Info"},
{"OrdContract"},
{"Start", "End"},
{"Active"},
},
},
&presets.FieldsSection{
Title: "Targeting",
Rows: [][]string{
{"Timetable"},
{"Targeting"},
},
},
&presets.FieldsSection{
Title: "Budget",
Rows: [][]string{
{"Budget"},
},
},
&presets.FieldsSection{
Title: "Capping",
Rows: [][]string{
{"Capping"},
},
},
)
// Info is edited in a multi-line text area.
mae.Field("Info").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
return v.VTextarea().
Label(field.Label).
Attr(web.VField(field.FormKey, fmt.Sprint(reflectutils.MustGet(obj, field.Name)))...).
Disabled(field.Disabled).
ErrorMessages(field.Errors...)
})
// The remaining fields use shared components that supply both the
// rendering function and the setter.
timetable := components.NewTimetable(m.logger)
mae.Field("Timetable").
ComponentFunc(timetable.Component).
SetterFunc(timetable.Setter)
targeting := components.NewTargeting(m.logger)
mae.Field("Targeting").
ComponentFunc(targeting.Component).
SetterFunc(targeting.Setter)
budget := components.NewBudget(m.logger)
mae.Field("Budget").
ComponentFunc(budget.Component).
SetterFunc(budget.Setter)
capping := components.NewCapping(m.logger)
mae.Field("Capping").
ComponentFunc(capping.Component).
SetterFunc(capping.Setter)
}
func (m *Advertiser) copyAdvertiser(ctx *web.EventContext) (r web.EventResponse, err error) {
id := ctx.R.FormValue("id")
if id == "" {
return r, fmt.Errorf("id is required")
}
// Находим оригинальную запись
var original models.Advertiser
if err := m.db.First(&original, id).Error; err != nil {
return r, fmt.Errorf("failed to find advertiser: %w", err)
}
// Создаем копию
copyAdvertiser := models.Advertiser{
Title: original.Title + " (Копия)",
Info: original.Info,
Start: original.Start,
End: original.End,
Timetable: original.Timetable,
Targeting: original.Targeting,
Budget: original.Budget,
Capping: original.Capping,
Active: false,
OrdContract: original.OrdContract,
}
// Сохраняем копию в базу данных
if err := m.db.Create(©Advertiser).Error; err != nil {
return r, fmt.Errorf("failed to create copy: %w", err)
}
// Обновляем список
r.Emit(
presets.NotifModelsUpdated(&models.Advertiser{}),
presets.PayloadModelsUpdated{Ids: []string{id, strconv.Itoa(int(copyAdvertiser.ID))}},
)
return r, nil
}
// archiveAdvertiser handles the archive event: it loads the advertiser
// identified by the "id" form value and archives it with the current time.
//
// On success a models-updated notification for that id is emitted. An error
// is returned when the id is missing, the record cannot be loaded, or the
// archive operation fails.
func (m *Advertiser) archiveAdvertiser(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes from the request, so it is bound as a
	// query parameter: gorm treats a bare non-numeric string condition as
	// raw SQL.
	var original models.Advertiser
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find advertiser: %w", err)
	}

	now := time.Now()
	if err := original.Archive(m.db, &now); err != nil {
		return r, fmt.Errorf("failed to archive advertiser: %w", err)
	}

	// Refresh the listing.
	r.Emit(
		presets.NotifModelsUpdated(&models.Advertiser{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
// unarchiveAdvertiser handles the unarchive event: it loads the advertiser
// identified by the "id" form value and calls Archive with a nil timestamp.
//
// On success a models-updated notification for that id is emitted. An error
// is returned when the id is missing, the record cannot be loaded, or the
// operation fails.
func (m *Advertiser) unarchiveAdvertiser(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes from the request, so it is bound as a
	// query parameter: gorm treats a bare non-numeric string condition as
	// raw SQL.
	var original models.Advertiser
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find advertiser: %w", err)
	}

	if err := original.Archive(m.db, nil); err != nil {
		return r, fmt.Errorf("failed to unarchive advertiser: %w", err)
	}

	// Refresh the listing.
	r.Emit(
		presets.NotifModelsUpdated(&models.Advertiser{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
package builders
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/qor5/admin/v3/media"
"github.com/qor5/admin/v3/media/base"
"github.com/qor5/admin/v3/media/media_library"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/admin/v3/presets/gorm2op"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/qor5/x/v3/ui/vuetifyx"
"github.com/sunfmin/reflectutils"
h "github.com/theplant/htmlgo"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/modules/ads/components"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Banner configures the admin pages of the Banner model and handles its
// copy/archive/unarchive events.
type Banner struct {
logger *zap.Logger
db *gorm.DB
}
// NewBanner returns a Banner builder that uses the given logger and database
// handle.
func NewBanner(logger *zap.Logger, db *gorm.DB) *Banner {
	bn := new(Banner)
	bn.logger = logger
	bn.db = db
	return bn
}
// Event names of the banner row-menu actions; Configure registers a handler
// for each of them.
const (
copyBannerEvent = "copyBanner"
archiveBannerEvent = "archiveBanner"
unarchiveBannerEvent = "unarchiveBanner"
)
// Configure registers the Banner model with the presets builder.
//
// It sets up the listing (search that hides archived records by default,
// ordering, filters, custom Bgroup/Active columns and the Copy/Archive row
// menu), registers the copy/archive/unarchive event handlers, and defines the
// editing form with its custom field components.
//
// Database failures inside the rendering callbacks are logged and the
// component is rendered with whatever data is available; they do not panic.
func (m *Banner) Configure(b *presets.Builder) {
	mb := b.Model(&models.Banner{}).
		MenuIcon("mdi-image").
		RightDrawerWidth("1000")

	mbl := mb.Listing("ID", "Title", "Icon", "Price", "Bgroup", "Active").
		SearchFunc(func(ctx *web.EventContext, params *presets.SearchParams) (result *presets.SearchResult, err error) {
			// Archived records are hidden by default and shown only when
			// the archive filter asks for them. Both filter options
			// ("archived" and "all") contain this substring, so a single
			// test covers them.
			exist := false
			for _, cond := range params.SQLConditions {
				if strings.Contains(cond.Query, "archived_at is not null") {
					exist = true
					break
				}
			}

			qdb := m.db.Where("")
			if !exist {
				qdb = m.db.Where("archived_at is null")
			}
			return gorm2op.DataOperator(qdb).Search(ctx, params)
		}).
		SearchColumns("Title").
		OrderableFields([]*presets.OrderableField{
			{
				FieldName: "ID",
				DBColumn:  "id",
			},
			{
				FieldName: "Title",
				DBColumn:  "title",
			},
			{
				FieldName: "Active",
				DBColumn:  "active",
			},
		})

	// Listing filters: archive state, banner group and active flag.
	mbl.FilterDataFunc(func(ctx *web.EventContext) vuetifyx.FilterData {
		// The group options are loaded on every call. If the query fails
		// the group filter is rendered without options.
		var groupOptions []*vuetifyx.SelectItem
		if err := m.db.Model(&models.Bgroup{}).Select("title as text, id as value").Scan(&groupOptions).Error; err != nil {
			m.logger.Sugar().Errorw("load banner groups for filter", "error", err)
			groupOptions = nil
		}

		return []*vuetifyx.FilterItem{
			{
				Key:          "archived",
				Label:        "Архив",
				ItemType:     vuetifyx.ItemTypeSelect,
				SQLCondition: "archived_at is null",
				Options: []*vuetifyx.SelectItem{
					{
						Text:         "В архиве",
						Value:        "is_archived",
						SQLCondition: "archived_at is not null",
					},
					{
						Text:         "Все",
						Value:        "all",
						SQLCondition: "(archived_at is not null or archived_at is null)",
					},
				},
			},
			{
				Key:          "group",
				Label:        "Группа",
				ItemType:     vuetifyx.ItemTypeSelect,
				SQLCondition: `bgroup_id %s ?`,
				Options:      groupOptions,
			},
			{
				Key:      "active",
				Label:    "Активность",
				ItemType: vuetifyx.ItemTypeSelect,
				Options: []*vuetifyx.SelectItem{
					{
						Text:         "Включен",
						Value:        "is_active",
						SQLCondition: "active = true",
					},
					{
						Text:         "Выключен",
						Value:        "not_active",
						SQLCondition: "active = false",
					},
				},
			},
		}
	})

	// Bgroup column: title of the banner's group, empty when there is none.
	mbl.Field("Bgroup").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Banner)
		if c.BgroupID == 0 {
			return h.Td()
		}

		var group models.Bgroup
		if err := m.db.First(&group, "id = ?", c.BgroupID).Error; err != nil {
			// The cell is still rendered, with whatever title was loaded.
			m.logger.Sugar().Errorw("load banner group for listing", "bgroup_id", c.BgroupID, "error", err)
		}
		return h.Td().Text(group.Title)
	})

	mbl.Field("Price").Label("CPM")

	// Active column: colored enabled/disabled label.
	mbl.Field("Active").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Banner)
		color := "red"
		text := "выключен"
		if c.Active {
			text = "включен"
			color = "green"
		}
		return h.Td().Children(h.Span(text).Style("color:" + color))
	})

	mbn := mbl.RowMenu()

	// Row menu item that triggers the copy event.
	mbn.RowMenuItem("Copy").
		ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
			return v.VListItem(
				web.Slot(
					v.VIcon("mdi-content-copy"),
				).Name("prepend"),
				v.VListItemTitle(
					h.Text("Копировать"),
				),
			).Attr("@click",
				web.Plaid().EventFunc(copyBannerEvent).Query("id", id).Go(),
			)
		})

	// Row menu item that archives a live record or unarchives an archived one.
	mbn.RowMenuItem("Archive").
		ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
			banner := obj.(*models.Banner)
			if banner.ArchivedAt == nil {
				return v.VListItem(
					web.Slot(
						v.VIcon("mdi-archive-arrow-down"),
					).Name("prepend"),
					v.VListItemTitle(
						h.Text("Архивировать"),
					),
				).Attr("@click",
					web.Plaid().EventFunc(archiveBannerEvent).Query("id", id).Go(),
				)
			}
			return v.VListItem(
				web.Slot(
					v.VIcon("mdi-archive-arrow-up"),
				).Name("prepend"),
				v.VListItemTitle(
					h.Text("Разархивировать"),
				),
			).Attr("@click",
				web.Plaid().EventFunc(unarchiveBannerEvent).Query("id", id).Go(),
			)
		})

	// Register the handlers of the row-menu events.
	mb.RegisterEventFunc(copyBannerEvent, m.copyBanner)
	mb.RegisterEventFunc(archiveBannerEvent, m.archiveBanner)
	mb.RegisterEventFunc(unarchiveBannerEvent, m.unarchiveBanner)

	// Editing form layout, grouped into sections.
	mbe := mb.Editing(
		&presets.FieldsSection{
			Title: "Info",
			Rows: [][]string{
				{"BgroupID"},
				{"Title"},
				{"Label"},
				{"Description"},
				{"Image"},
				{"Icon"},
				{"Active"},
			},
		},
		&presets.FieldsSection{
			Title: "Price",
			Rows: [][]string{
				{"Price"},
			},
		},
		&presets.FieldsSection{
			Title: "Marker",
			Rows: [][]string{
				{"Erid"},
			},
		},
		&presets.FieldsSection{
			Title: "Timetable",
			Rows: [][]string{
				{"Timetable"},
			},
		},
		&presets.FieldsSection{
			Title: "Targeting",
			Rows: [][]string{
				{"Targeting"},
			},
		},
		&presets.FieldsSection{
			Title: "Budget",
			Rows: [][]string{
				{"Budget"},
			},
		},
		&presets.FieldsSection{
			Title: "Capping",
			Rows: [][]string{
				{"Capping"},
			},
		},
		&presets.FieldsSection{
			Title: "Tracking",
			Rows: [][]string{
				{"Clicktracker"},
				{"Imptracker"},
				{"Target"},
				{"Macros"},
			},
		},
	)

	mbe.Field("Price").Label("CPM")

	// Macros is a read-only hint listing the supported placeholders.
	mbe.Field("Macros").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		return h.Div(
			h.P(h.Text("Доступные макросы:")),
			h.P(h.Text("{gaid} {adid} {click_id} {ssp} {banner_id} {group_id} {campaign_id} {advertiser_id}")),
		)
	})

	// Image: media box restricted to images, with three configured sizes.
	mbe.Field("Image").
		WithContextValue(
			media.MediaBoxConfig,
			&media_library.MediaBoxConfig{
				AllowType: media_library.ALLOW_TYPE_IMAGE,
				Sizes: map[string]*base.Size{
					"image": {
						Width:  640,
						Height: 360,
					},
					"250x250": {
						Width:  250,
						Height: 250,
					},
					"450x450": {
						Width:  450,
						Height: 450,
					},
				},
			})

	// Icon: media box restricted to images, with a single 64x64 size.
	mbe.Field("Icon").
		WithContextValue(
			media.MediaBoxConfig,
			&media_library.MediaBoxConfig{
				AllowType: media_library.ALLOW_TYPE_IMAGE,
				Sizes: map[string]*base.Size{
					"image": {
						Width:  64,
						Height: 64,
					},
				},
			})

	// BgroupID: select box listing every banner group.
	mbe.Field("BgroupID").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Banner)

		var groups []models.Bgroup
		if err := m.db.Find(&groups).Error; err != nil {
			// The select is still rendered, with the groups loaded so far.
			m.logger.Sugar().Errorw("load banner groups for editor", "error", err)
		}

		sel := v.VSelect().
			Label("Группа").
			Items(groups).
			ItemTitle("Title").
			ItemValue("ID").
			Attr(web.VField("BgroupID", c.BgroupID)...)
		return h.Div(
			sel,
		)
	})

	// Description is edited in a multi-line text area.
	mbe.Field("Description").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		return v.VTextarea().
			Label(field.Label).
			Attr(web.VField(field.FormKey, fmt.Sprint(reflectutils.MustGet(obj, field.Name)))...).
			Disabled(field.Disabled).
			ErrorMessages(field.Errors...)
	})

	// Erid is edited in a single-line text field.
	mbe.Field("Erid").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		return v.VTextField().
			Label(field.Label).
			Attr(web.VField(field.FormKey, fmt.Sprint(reflectutils.MustGet(obj, field.Name)))...).
			Disabled(field.Disabled).
			ErrorMessages(field.Errors...)
	})

	// The remaining fields use shared components that supply both the
	// rendering function and the setter.
	timetable := components.NewTimetable(m.logger)
	mbe.Field("Timetable").
		ComponentFunc(timetable.Component).
		SetterFunc(timetable.Setter)

	targeting := components.NewTargeting(m.logger)
	mbe.Field("Targeting").
		ComponentFunc(targeting.Component).
		SetterFunc(targeting.Setter)

	budget := components.NewBudget(m.logger)
	mbe.Field("Budget").
		ComponentFunc(budget.Component).
		SetterFunc(budget.Setter)

	capping := components.NewCapping(m.logger)
	mbe.Field("Capping").
		ComponentFunc(capping.Component).
		SetterFunc(capping.Setter)
}
// Format is a pair of strings: a title and a value.
// NOTE(review): presumably one selectable option (label + stored value) for a
// banner format picker — confirm against the code that builds these.
type Format struct {
Title string
Value string
}
// copyBanner is the event handler that duplicates a banner.
//
// The banner's primary key is read from the "id" form value. On success the
// response carries a models-updated notification listing both the source id
// and the id of the new copy. An error is returned when the id is missing,
// the banner cannot be loaded, or the copy fails.
func (m *Banner) copyBanner(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the source record. The id comes straight from the request, so it is
	// bound as a query parameter: GORM treats a bare string argument to First
	// as a raw SQL condition.
	var original models.Banner
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find banner: %w", err)
	}

	// Create the copy, passing the original's BgroupID to Copy.
	nb, err := original.Copy(m.db, original.BgroupID)
	if err != nil {
		return r, fmt.Errorf("error on copy banner: %w", err)
	}

	// Notify the listing about both affected rows.
	r.Emit(
		presets.NotifModelsUpdated(&models.Banner{}),
		presets.PayloadModelsUpdated{Ids: []string{id, strconv.Itoa(int(nb.ID))}},
	)
	return r, nil
}
// archiveBanner is the event handler that marks a banner as archived.
//
// The banner's primary key is read from the "id" form value; ArchivedAt is set
// to the current time and the record is saved. An error is returned when the
// id is missing, the banner cannot be loaded, or the save fails.
func (m *Banner) archiveBanner(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes straight from the request, so it is bound
	// as a query parameter: GORM treats a bare string argument to First as a
	// raw SQL condition.
	var original models.Banner
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find banner: %w", err)
	}

	now := time.Now()
	original.ArchivedAt = &now
	// Save needs a pointer to the model, and its error must not be dropped:
	// otherwise the UI would report success for a write that never happened.
	if err := m.db.Save(&original).Error; err != nil {
		return r, fmt.Errorf("failed to archive banner: %w", err)
	}

	// Notify the listing about the changed row.
	r.Emit(
		presets.NotifModelsUpdated(&models.Banner{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
// unarchiveBanner is the event handler that clears a banner's archived mark.
//
// The banner's primary key is read from the "id" form value; ArchivedAt is
// reset to nil and the record is saved. An error is returned when the id is
// missing, the banner cannot be loaded, or the save fails.
func (m *Banner) unarchiveBanner(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes straight from the request, so it is bound
	// as a query parameter: GORM treats a bare string argument to First as a
	// raw SQL condition.
	var original models.Banner
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find banner: %w", err)
	}

	original.ArchivedAt = nil
	// Save needs a pointer to the model, and its error must not be dropped:
	// otherwise the UI would report success for a write that never happened.
	if err := m.db.Save(&original).Error; err != nil {
		return r, fmt.Errorf("failed to unarchive banner: %w", err)
	}

	// Notify the listing about the changed row.
	r.Emit(
		presets.NotifModelsUpdated(&models.Banner{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
package builders
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/admin/v3/presets/gorm2op"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/qor5/x/v3/ui/vuetifyx"
h "github.com/theplant/htmlgo"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/modules/ads/components"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Campaign builds the admin configuration for models.Campaign and hosts its
// copy/archive/unarchive event handlers.
type Campaign struct {
// logger receives errors raised while rendering the admin UI.
logger *zap.Logger
// db is the GORM handle used for all campaign-related queries.
db *gorm.DB
}
// NewCampaign returns a Campaign builder that uses the given logger and
// database handle.
func NewCampaign(logger *zap.Logger, db *gorm.DB) *Campaign {
	c := new(Campaign)
	c.logger = logger
	c.db = db
	return c
}
// Event names for the campaign row-menu actions; Configure registers a handler
// under each of them.
const (
// NOTE: the identifier is misspelled ("Capmaign"); it is kept as is because
// Configure refers to it by this name.
copyCapmaignEvent = "copyCampaign"
archiveCampaignEvent = "archiveCampaign"
unarchiveCampaignEvent = "unarchiveCampaign"
)
// Configure registers models.Campaign on the presets builder and wires up:
//   - the listing: archive-aware search, orderable columns, filters, custom
//     cells for Advertiser/Title/Active and the row menu (copy, archive);
//   - the copy/archive/unarchive event handlers;
//   - the editing form with its sections and custom field components.
//
// It returns the model builder so callers can customise it further.
func (m *Campaign) Configure(b *presets.Builder) *presets.ModelBuilder {
	mc := b.Model(&models.Campaign{}).
		MenuIcon("mdi-bullseye-arrow").
		// Label("Кампании").
		RightDrawerWidth("1000")

	mcl := mc.Listing("ID", "Title", "Bundle", "Start", "End", "Advertiser", "Active").
		SearchFunc(func(ctx *web.EventContext, params *presets.SearchParams) (result *presets.SearchResult, err error) {
			// Archived campaigns are hidden unless the "archived" filter asks
			// for them. Both filter options ("is_archived" and "all") produce
			// a condition containing this substring, so one check covers both.
			archivedRequested := false
			for _, cond := range params.SQLConditions {
				if strings.Contains(cond.Query, "archived_at is not null") {
					archivedRequested = true
					break
				}
			}

			qdb := m.db
			if !archivedRequested {
				qdb = m.db.Where("archived_at is null")
			}
			return gorm2op.DataOperator(qdb).Search(ctx, params)
		}).
		SearchColumns("Title").
		SelectableColumns(true).
		OrderableFields([]*presets.OrderableField{
			{FieldName: "ID", DBColumn: "id"},
			{FieldName: "Title", DBColumn: "title"},
			{FieldName: "Start", DBColumn: "start"},
			{FieldName: "End", DBColumn: "end"},
		})

	mcl.FilterDataFunc(func(ctx *web.EventContext) vuetifyx.FilterData {
		// msgr := i18n.MustGetModuleMessages(ctx.R, presets.ModelsI18nModuleKey, Messages_en_US).(*Messages)

		// Advertisers become the options of the "advertiser" filter.
		var options []*vuetifyx.SelectItem
		err := m.db.Model(&models.Advertiser{}).Select("title as text, id as value").Scan(&options).Error
		if err != nil {
			m.logger.Error("error on load advertisers", zap.Error(err))
			return nil
		}

		return []*vuetifyx.FilterItem{
			{
				Key:          "archived",
				Label:        "Архив",
				ItemType:     vuetifyx.ItemTypeSelect,
				SQLCondition: "archived_at is null",
				Options: []*vuetifyx.SelectItem{
					{
						Text:         "В архиве",
						Value:        "is_archived",
						SQLCondition: "archived_at is not null",
					},
					{
						Text:         "Все",
						Value:        "all",
						SQLCondition: "(archived_at is not null or archived_at is null)",
					},
				},
			},
			{
				Key:          "advertiser",
				Label:        "Рекламодатель",
				ItemType:     vuetifyx.ItemTypeSelect,
				SQLCondition: `advertiser_id %s ?`,
				Options:      options,
			},
			{
				Key:      "active",
				Label:    "Активность",
				ItemType: vuetifyx.ItemTypeSelect,
				Options: []*vuetifyx.SelectItem{
					{
						Text:         "Включен",
						Value:        "is_active",
						SQLCondition: "active = true",
					},
					{
						Text:         "Выключен",
						Value:        "not_active",
						SQLCondition: "active = false",
					},
				},
			},
			{
				Key:          "created",
				Label:        "Создан",
				ItemType:     vuetifyx.ItemTypeDate,
				SQLCondition: `created_at %s ?`,
			},
		}
	})

	// Advertiser cell: the advertiser title, linking to the campaign list
	// filtered by that advertiser.
	mcl.Field("Advertiser").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Campaign)
		if c.AdvertiserID == 0 {
			return h.Td()
		}

		var adv models.Advertiser
		if err := m.db.First(&adv, "id = ?", c.AdvertiserID).Error; err != nil {
			// Keep rendering the row (with an empty title) but surface the failure.
			m.logger.Error("error on load advertiser", zap.Error(err))
		}
		return h.Td().Children(
			h.A().
				Text(adv.Title).
				Attr("onclick", "event.stopPropagation();").
				Href(fmt.Sprintf("/admin/campaigns?f_advertiser=%d", c.AdvertiserID)),
		)
	})

	// Title cell: links to the groups of this campaign; archived campaigns
	// get a suffix and a distinct colour.
	mcl.Field("Title").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Campaign)
		style := ""
		text := ""
		if c.ArchivedAt != nil {
			style = "color:#bb0"
			text = " - архив"
		}
		return h.Td().Children(
			h.A().
				Text(c.Title+text).
				Style(style).
				Attr("onclick", "event.stopPropagation();").
				Href(fmt.Sprintf("/admin/bgroups?f_campaign=%d", c.ID)),
		)
	})

	// Active cell: coloured on/off label.
	mcl.Field("Active").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Campaign)
		color := "red"
		text := "выключен"
		if c.Active {
			text = "включен"
			color = "green"
		}
		return h.Td().Children(h.Span(text).Style("color:" + color))
	})

	mcn := mcl.RowMenu()

	// Row menu: copy.
	mcn.RowMenuItem("Copy").
		ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
			return v.VListItem(
				web.Slot(
					v.VIcon("mdi-content-copy"),
				).Name("prepend"),
				v.VListItemTitle(
					h.Text("Копировать"),
				),
			).Attr("@click",
				web.Plaid().EventFunc(copyCapmaignEvent).Query("id", id).Go(),
			)
		})

	// Row menu: archive or unarchive, depending on the row's current state.
	mcn.RowMenuItem("Archive").
		ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
			item := obj.(*models.Campaign)
			if item.ArchivedAt == nil {
				return v.VListItem(
					web.Slot(
						v.VIcon("mdi-archive-arrow-down"),
					).Name("prepend"),
					v.VListItemTitle(
						h.Text("Архивировать"),
					),
				).Attr("@click",
					web.Plaid().EventFunc(archiveCampaignEvent).Query("id", id).Go(),
				)
			}
			return v.VListItem(
				web.Slot(
					v.VIcon("mdi-archive-arrow-up"),
				).Name("prepend"),
				v.VListItemTitle(
					h.Text("Разархивировать"),
				),
			).Attr("@click",
				web.Plaid().EventFunc(unarchiveCampaignEvent).Query("id", id).Go(),
			)
		})

	// Register the handlers behind the row-menu events.
	mc.RegisterEventFunc(copyCapmaignEvent, m.copyCampaign)
	mc.RegisterEventFunc(archiveCampaignEvent, m.archiveCampaign)
	mc.RegisterEventFunc(unarchiveCampaignEvent, m.unarchiveCampaign)

	mce := mc.Editing(
		&presets.FieldsSection{
			Title: "Info",
			Rows: [][]string{
				{"AdvertiserID"},
				{"Title", "Bundle"},
				{"Start", "End"},
				{"Active"},
			},
		},
		&presets.FieldsSection{
			Title: "Targeting",
			Rows: [][]string{
				{"Timetable"},
				{"Targeting"},
			},
		},
		&presets.FieldsSection{
			Title: "Budget",
			Rows: [][]string{
				{"Budget"},
			},
		},
		&presets.FieldsSection{
			Title: "Capping",
			Rows: [][]string{
				{"Capping"},
			},
		},
	)

	// AdvertiserID is edited through a select populated with all advertisers.
	mce.Field("AdvertiserID").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Campaign)
		var comps []models.Advertiser
		if err := m.db.Find(&comps).Error; err != nil {
			// Render an empty select rather than failing the whole form.
			m.logger.Error("error on load advertisers", zap.Error(err))
		}
		sel := v.VSelect().
			Label("Рекламодатель").
			Items(comps).
			ItemTitle("Title").ItemValue("ID").
			Attr(web.VField("AdvertiserID", c.AdvertiserID)...)
		return h.Div(
			sel,
		)
	})

	// The remaining fields delegate rendering and saving to shared components.
	timetable := components.NewTimetable(m.logger)
	mce.Field("Timetable").
		ComponentFunc(timetable.Component).
		SetterFunc(timetable.Setter)

	targeting := components.NewTargeting(m.logger)
	mce.Field("Targeting").
		ComponentFunc(targeting.Component).
		SetterFunc(targeting.Setter)

	budget := components.NewBudget(m.logger)
	mce.Field("Budget").
		ComponentFunc(budget.Component).
		SetterFunc(budget.Setter)

	capping := components.NewCapping(m.logger)
	mce.Field("Capping").
		ComponentFunc(capping.Component).
		SetterFunc(capping.Setter)

	return mc
}
// copyCampaign is the handler for copyCapmaignEvent: it duplicates a campaign.
//
// The campaign's primary key is read from the "id" form value. On success the
// response carries a models-updated notification listing both the source id
// and the id of the new copy. An error is returned when the id is missing,
// the campaign cannot be loaded, or the copy fails.
func (m *Campaign) copyCampaign(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the source record. The id comes straight from the request, so it is
	// bound as a query parameter: GORM treats a bare string argument to First
	// as a raw SQL condition.
	var original models.Campaign
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find campaign: %w", err)
	}

	// Create the copy, passing the original's AdvertiserID to Copy.
	nc, err := original.Copy(m.db, original.AdvertiserID)
	if err != nil {
		return r, fmt.Errorf("error on copy campaign: %w", err)
	}

	// Notify the listing about both affected rows.
	r.Emit(
		presets.NotifModelsUpdated(&models.Campaign{}),
		presets.PayloadModelsUpdated{Ids: []string{id, strconv.Itoa(int(nc.ID))}},
	)
	return r, nil
}
// archiveCampaign is the handler for archiveCampaignEvent.
//
// The campaign's primary key is read from the "id" form value and the model's
// Archive method is called with the current time. An error is returned when
// the id is missing, the campaign cannot be loaded, or Archive fails.
func (m *Campaign) archiveCampaign(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes straight from the request, so it is bound
	// as a query parameter: GORM treats a bare string argument to First as a
	// raw SQL condition.
	var original models.Campaign
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find campaign: %w", err)
	}

	now := time.Now()
	if err := original.Archive(m.db, &now); err != nil {
		return r, fmt.Errorf("failed to archive campaign: %w", err)
	}

	// Notify the listing about the changed row.
	r.Emit(
		presets.NotifModelsUpdated(&models.Campaign{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
// unarchiveCampaign is the handler for unarchiveCampaignEvent.
//
// The campaign's primary key is read from the "id" form value and the model's
// Archive method is called with a nil timestamp. An error is returned when the
// id is missing, the campaign cannot be loaded, or Archive fails.
func (m *Campaign) unarchiveCampaign(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes straight from the request, so it is bound
	// as a query parameter: GORM treats a bare string argument to First as a
	// raw SQL condition.
	var original models.Campaign
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find campaign: %w", err)
	}

	// A nil timestamp is what distinguishes unarchiving from archiving here.
	if err := original.Archive(m.db, nil); err != nil {
		return r, fmt.Errorf("failed to unarchive campaign: %w", err)
	}

	// Notify the listing about the changed row.
	r.Emit(
		presets.NotifModelsUpdated(&models.Campaign{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
package builders
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/admin/v3/presets/gorm2op"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/qor5/x/v3/ui/vuetifyx"
h "github.com/theplant/htmlgo"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/modules/ads/components"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Group builds the admin configuration for models.Bgroup and hosts its
// copy/archive/unarchive event handlers.
type Group struct {
// logger is handed to the shared field components created in Configure.
logger *zap.Logger
// db is the GORM handle used for all group-related queries.
db *gorm.DB
}
// NewGroup returns a Group builder that uses the given logger and database
// handle.
func NewGroup(logger *zap.Logger, db *gorm.DB) *Group {
	g := new(Group)
	g.logger = logger
	g.db = db
	return g
}
// Event names for the group row-menu actions (copy, archive, unarchive);
// Configure registers a handler under each of them.
const (
copyGroupEvent = "copyGroup"
archiveGroupEvent = "archiveGroup"
unarchiveGroupEvent = "unarchiveGroup"
)
// Configure registers models.Bgroup on the presets builder and wires up:
//   - the listing: archive-aware search, orderable columns, filters, custom
//     cells for Campaign/Title/Active and the row menu (copy, archive);
//   - the copy/archive/unarchive event handlers;
//   - the editing form with its sections and custom field components.
//
// It returns the model builder so callers can customise it further.
func (m *Group) Configure(b *presets.Builder) *presets.ModelBuilder {
	mg := b.Model(&models.Bgroup{}).
		MenuIcon("mdi-lightbulb-group").
		// Label("Группы").
		RightDrawerWidth("1000")

	mgl := mg.Listing("ID", "Title", "Price", "Start", "End", "Campaign", "Active").
		SearchFunc(func(ctx *web.EventContext, params *presets.SearchParams) (result *presets.SearchResult, err error) {
			// Archived groups are hidden unless the "archived" filter asks for
			// them. Both filter options ("is_archived" and "all") produce a
			// condition containing this substring, so one check covers both.
			archivedRequested := false
			for _, cond := range params.SQLConditions {
				if strings.Contains(cond.Query, "archived_at is not null") {
					archivedRequested = true
					break
				}
			}

			qdb := m.db
			if !archivedRequested {
				qdb = m.db.Where("archived_at is null")
			}
			return gorm2op.DataOperator(qdb).Search(ctx, params)
		}).
		SearchColumns("Title").
		SelectableColumns(true).
		OrderableFields([]*presets.OrderableField{
			{FieldName: "ID", DBColumn: "id"},
			{FieldName: "Title", DBColumn: "title"},
			{FieldName: "Start", DBColumn: "start"},
			{FieldName: "End", DBColumn: "end"},
		})

	mgl.FilterDataFunc(func(ctx *web.EventContext) vuetifyx.FilterData {
		// msgr := i18n.MustGetModuleMessages(ctx.R, presets.ModelsI18nModuleKey, Messages_en_US).(*Messages)

		// Campaigns become the options of the "campaign" filter.
		var options []*vuetifyx.SelectItem
		err := m.db.Model(&models.Campaign{}).Select("title as text, id as value").Scan(&options).Error
		if err != nil {
			// A failed lookup must not take the request down: log it and render
			// the page without filters, as the campaign listing does.
			m.logger.Error("error on load campaigns", zap.Error(err))
			return nil
		}

		return []*vuetifyx.FilterItem{
			{
				Key:          "archived",
				Label:        "Архив",
				ItemType:     vuetifyx.ItemTypeSelect,
				SQLCondition: "archived_at is null",
				Options: []*vuetifyx.SelectItem{
					{
						Text:         "В архиве",
						Value:        "is_archived",
						SQLCondition: "archived_at is not null",
					},
					{
						Text:         "Все",
						Value:        "all",
						SQLCondition: "(archived_at is not null or archived_at is null)",
					},
				},
			},
			{
				Key:          "campaign",
				Label:        "Кампания",
				ItemType:     vuetifyx.ItemTypeSelect,
				SQLCondition: `campaign_id %s ?`,
				Options:      options,
			},
			{
				Key:      "active",
				Label:    "Активность",
				ItemType: vuetifyx.ItemTypeSelect,
				Options: []*vuetifyx.SelectItem{
					{
						Text:         "Включен",
						Value:        "is_active",
						SQLCondition: "active = true",
					},
					{
						Text:         "Выключен",
						Value:        "not_active",
						SQLCondition: "active = false",
					},
				},
			},
			{
				Key:          "created",
				Label:        "Создан",
				ItemType:     vuetifyx.ItemTypeDate,
				SQLCondition: `created_at %s ?`,
			},
		}
	})

	// Campaign cell: the campaign title, linking to the group list filtered by
	// that campaign.
	mgl.Field("Campaign").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Bgroup)
		if c.CampaignID == 0 {
			return h.Td()
		}

		var comp models.Campaign
		if err := m.db.First(&comp, "id = ?", c.CampaignID).Error; err != nil {
			// Keep rendering the row (with an empty title) but surface the failure.
			m.logger.Error("error on load campaign", zap.Error(err))
		}
		return h.Td().Children(
			h.A().
				Text(comp.Title).
				Attr("onclick", "event.stopPropagation();").
				Href(fmt.Sprintf("/admin/bgroups?f_campaign=%d", c.CampaignID)),
		)
	})

	// Title cell: links to the banners of this group; archived groups get a
	// suffix and a distinct colour.
	mgl.Field("Title").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Bgroup)
		style := ""
		text := ""
		if c.ArchivedAt != nil {
			style = "color:#bb0"
			text = " - архив"
		}
		return h.Td().Children(
			h.A().
				Text(c.Title+text).
				Style(style).
				Attr("onclick", "event.stopPropagation();").
				Href(fmt.Sprintf("/admin/banners?f_group=%d", c.ID)),
		)
	})

	// Active cell: coloured on/off label.
	mgl.Field("Active").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Bgroup)
		color := "red"
		text := "выключен"
		if c.Active {
			text = "включен"
			color = "green"
		}
		return h.Td().Children(h.Span(text).Style("color:" + color))
	})

	mgl.Field("Price").Label("CPM")

	mgn := mgl.RowMenu()

	// Row menu: copy.
	mgn.RowMenuItem("Copy").
		ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
			return v.VListItem(
				web.Slot(
					v.VIcon("mdi-content-copy"),
				).Name("prepend"),
				v.VListItemTitle(
					h.Text("Копировать"),
				),
			).Attr("@click",
				web.Plaid().EventFunc(copyGroupEvent).Query("id", id).Go(),
			)
		})

	// Row menu: archive or unarchive, depending on the row's current state.
	mgn.RowMenuItem("Archive").
		ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
			item := obj.(*models.Bgroup)
			if item.ArchivedAt == nil {
				return v.VListItem(
					web.Slot(
						v.VIcon("mdi-archive-arrow-down"),
					).Name("prepend"),
					v.VListItemTitle(
						h.Text("Архивировать"),
					),
				).Attr("@click",
					web.Plaid().EventFunc(archiveGroupEvent).Query("id", id).Go(),
				)
			}
			return v.VListItem(
				web.Slot(
					v.VIcon("mdi-archive-arrow-up"),
				).Name("prepend"),
				v.VListItemTitle(
					h.Text("Разархивировать"),
				),
			).Attr("@click",
				web.Plaid().EventFunc(unarchiveGroupEvent).Query("id", id).Go(),
			)
		})

	// Register the handlers behind the row-menu events.
	mg.RegisterEventFunc(copyGroupEvent, m.copyGroup)
	mg.RegisterEventFunc(archiveGroupEvent, m.archive)
	mg.RegisterEventFunc(unarchiveGroupEvent, m.unarchive)

	mge := mg.Editing(
		&presets.FieldsSection{
			Title: "Info",
			Rows: [][]string{
				{"CampaignID"},
				{"Title", "Price"},
				{"Start", "End"},
				{"Active"},
			},
		},
		&presets.FieldsSection{
			Title: "Targeting",
			Rows: [][]string{
				{"Timetable"},
				{"Targeting"},
			},
		},
		&presets.FieldsSection{
			Title: "Budget",
			Rows: [][]string{
				{"Budget"},
			},
		},
		&presets.FieldsSection{
			Title: "Capping",
			Rows: [][]string{
				{"Capping"},
			},
		},
	)

	mge.Field("Price").Label("CPM")

	// CampaignID is edited through a select populated with all campaigns.
	mge.Field("CampaignID").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Bgroup)
		var comps []models.Campaign
		if err := m.db.Find(&comps).Error; err != nil {
			// Render an empty select rather than failing the whole form.
			m.logger.Error("error on load campaigns", zap.Error(err))
		}
		sel := v.VSelect().
			Label("Кампания").
			Items(comps).
			ItemTitle("Title").ItemValue("ID").
			Attr(web.VField("CampaignID", c.CampaignID)...)
		return h.Div(
			sel,
		)
	})

	// The remaining fields delegate rendering and saving to shared components.
	timetable := components.NewTimetable(m.logger)
	mge.Field("Timetable").
		ComponentFunc(timetable.Component).
		SetterFunc(timetable.Setter)

	targeting := components.NewTargeting(m.logger)
	mge.Field("Targeting").
		ComponentFunc(targeting.Component).
		SetterFunc(targeting.Setter)

	budget := components.NewBudget(m.logger)
	mge.Field("Budget").
		ComponentFunc(budget.Component).
		SetterFunc(budget.Setter)

	capping := components.NewCapping(m.logger)
	mge.Field("Capping").
		ComponentFunc(capping.Component).
		SetterFunc(capping.Setter)

	return mg
}
// copyGroup is the handler for copyGroupEvent: it duplicates a group.
//
// The group's primary key is read from the "id" form value. On success the
// response carries a models-updated notification listing both the source id
// and the id of the new copy. An error is returned when the id is missing,
// the group cannot be loaded, or the copy fails.
func (m *Group) copyGroup(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the source record. The id comes straight from the request, so it is
	// bound as a query parameter: GORM treats a bare string argument to First
	// as a raw SQL condition.
	var original models.Bgroup
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find group: %w", err)
	}

	// Create the copy, passing the original's CampaignID to Copy.
	ng, err := original.Copy(m.db, original.CampaignID)
	if err != nil {
		return r, fmt.Errorf("failed on copy group: %w", err)
	}

	// Notify the listing about both affected rows.
	r.Emit(
		presets.NotifModelsUpdated(&models.Bgroup{}),
		presets.PayloadModelsUpdated{Ids: []string{id, strconv.Itoa(int(ng.ID))}},
	)
	return r, nil
}
// archive is the handler for archiveGroupEvent.
//
// The group's primary key is read from the "id" form value and the model's
// Archive method is called with the current time. An error is returned when
// the id is missing, the group cannot be loaded, or Archive fails.
func (m *Group) archive(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes straight from the request, so it is bound
	// as a query parameter: GORM treats a bare string argument to First as a
	// raw SQL condition.
	var original models.Bgroup
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find group: %w", err)
	}

	now := time.Now()
	if err := original.Archive(m.db, &now); err != nil {
		return r, fmt.Errorf("failed to archive group: %w", err)
	}

	// Notify the listing about the changed row.
	r.Emit(
		presets.NotifModelsUpdated(&models.Bgroup{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
// unarchive is the handler for unarchiveGroupEvent.
//
// The group's primary key is read from the "id" form value and the model's
// Archive method is called with a nil timestamp. An error is returned when the
// id is missing, the group cannot be loaded, or Archive fails.
func (m *Group) unarchive(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes straight from the request, so it is bound
	// as a query parameter: GORM treats a bare string argument to First as a
	// raw SQL condition.
	var original models.Bgroup
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find group: %w", err)
	}

	// A nil timestamp is what distinguishes unarchiving from archiving here.
	if err := original.Archive(m.db, nil); err != nil {
		return r, fmt.Errorf("failed to unarchive group: %w", err)
	}

	// Notify the listing about the changed row.
	r.Emit(
		presets.NotifModelsUpdated(&models.Bgroup{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
package builders
import (
"fmt"
"strings"
"time"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/admin/v3/presets/gorm2op"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/qor5/x/v3/ui/vuetifyx"
"github.com/sunfmin/reflectutils"
h "github.com/theplant/htmlgo"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Network builds the admin configuration for models.Network and hosts its
// archive/unarchive event handlers.
type Network struct {
// logger is stored by NewNetwork for use by the builder.
logger *zap.Logger
// db is the GORM handle used for all network-related queries.
db *gorm.DB
}
// NewNetwork returns a Network builder that uses the given logger and
// database handle.
func NewNetwork(logger *zap.Logger, db *gorm.DB) *Network {
	n := new(Network)
	n.logger = logger
	n.db = db
	return n
}
// Event names for the network row-menu actions; Configure registers a handler
// under each of them.
const (
archiveNetworkEvent = "archiveNetwork"
unarchiveNetworkEvent = "unarchiveNetwork"
)
// Configure registers models.Network on the presets builder: a listing with
// archive-aware search, ordering and filters, an editing form with required
// Title/Name validation, custom Active and Data components, and the
// archive/unarchive row-menu events. It returns the model builder.
func (n *Network) Configure(b *presets.Builder) *presets.ModelBuilder {
	model := b.Model(&models.Network{}).
		MenuIcon("mdi-lan").
		// Label("Рекламодатели").
		RightDrawerWidth("1000")

	// search hides archived rows unless one of the incoming SQL conditions
	// already refers to archived records.
	search := func(ctx *web.EventContext, params *presets.SearchParams) (result *presets.SearchResult, err error) {
		where := "archived_at is null"
		for _, cond := range params.SQLConditions {
			if strings.Contains(cond.Query, "archived_at is not null") ||
				strings.Contains(cond.Query, "(archived_at is not null or archived_at is null)") {
				where = ""
				break
			}
		}
		return gorm2op.DataOperator(n.db.Where(where)).Search(ctx, params)
	}

	orderable := []*presets.OrderableField{
		{FieldName: "ID", DBColumn: "id"},
		{FieldName: "Title", DBColumn: "title"},
		{FieldName: "Active", DBColumn: "active"},
	}

	listing := model.Listing("ID", "Title", "Name", "Active").
		SearchFunc(search).
		SearchColumns("Title").
		OrderableFields(orderable)

	editing := model.Editing(
		&presets.FieldsSection{
			// Title: "Info",
			Rows: [][]string{
				{"Title"},
				{"Name"},
				{"Data"},
				{"Active"},
			},
		},
	).ValidateFunc(func(obj interface{}, ctx *web.EventContext) web.ValidationErrors {
		var verrs web.ValidationErrors
		network := obj.(*models.Network)
		if network.Title == "" {
			verrs.FieldError("Title", "Title is required")
		}
		if network.Name == "" {
			verrs.FieldError("Name", "Name is required")
		}
		return verrs
	})

	listing.FilterDataFunc(func(ctx *web.EventContext) vuetifyx.FilterData {
		archived := &vuetifyx.FilterItem{
			Key:          "archived",
			Label:        "Архив",
			ItemType:     vuetifyx.ItemTypeSelect,
			SQLCondition: "archived_at is null",
			Options: []*vuetifyx.SelectItem{
				{Text: "В архиве", Value: "is_archived", SQLCondition: "archived_at is not null"},
				{Text: "Все", Value: "all", SQLCondition: "(archived_at is not null or archived_at is null)"},
			},
		}
		active := &vuetifyx.FilterItem{
			Key:      "active",
			Label:    "Активность",
			ItemType: vuetifyx.ItemTypeSelect,
			Options: []*vuetifyx.SelectItem{
				{Text: "Включен", Value: "is_active", SQLCondition: "active = true"},
				{Text: "Выключен", Value: "not_active", SQLCondition: "active = false"},
			},
		}
		created := &vuetifyx.FilterItem{
			Key:          "created",
			Label:        "Создан",
			ItemType:     vuetifyx.ItemTypeDate,
			SQLCondition: `created_at %s ?`,
		}
		return []*vuetifyx.FilterItem{archived, active, created}
	})

	// Active cell: coloured on/off label.
	listing.Field("Active").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		label, color := "выключен", "red"
		if obj.(*models.Network).Active {
			label, color = "включен", "green"
		}
		return h.Td().Children(h.Span(label).Style("color:" + color))
	})

	// Data is edited in a textarea holding the field's printed value.
	editing.Field("Data").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		current := fmt.Sprint(reflectutils.MustGet(obj, field.Name))
		return v.VTextarea().
			Label(field.Label).
			Attr(web.VField(field.FormKey, current)...).
			Disabled(field.Disabled).
			ErrorMessages(field.Errors...)
	})

	// Row menu: a single entry that archives or unarchives depending on the
	// row's current state.
	rowMenu := listing.RowMenu()
	rowMenu.RowMenuItem("Archive").
		ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
			icon, title, event := "mdi-archive-arrow-up", "Разархивировать", unarchiveNetworkEvent
			if obj.(*models.Network).ArchivedAt == nil {
				icon, title, event = "mdi-archive-arrow-down", "Архивировать", archiveNetworkEvent
			}
			return v.VListItem(
				web.Slot(
					v.VIcon(icon),
				).Name("prepend"),
				v.VListItemTitle(
					h.Text(title),
				),
			).Attr("@click",
				web.Plaid().EventFunc(event).Query("id", id).Go(),
			)
		})

	// Register the handlers behind the row-menu events.
	model.RegisterEventFunc(archiveNetworkEvent, n.archive)
	model.RegisterEventFunc(unarchiveNetworkEvent, n.unarchive)

	return model
}
// archive is the handler for archiveNetworkEvent.
//
// The network's primary key is read from the "id" form value and the model's
// Archive method is called with the current time. An error is returned when
// the id is missing, the network cannot be loaded, or Archive fails.
func (n *Network) archive(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes straight from the request, so it is bound
	// as a query parameter: GORM treats a bare string argument to First as a
	// raw SQL condition.
	var original models.Network
	if err := n.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find network: %w", err)
	}

	now := time.Now()
	if err := original.Archive(n.db, &now); err != nil {
		return r, fmt.Errorf("failed to archive network: %w", err)
	}

	// Notify the listing about the changed row.
	r.Emit(
		presets.NotifModelsUpdated(&models.Network{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
// unarchive is the handler for unarchiveNetworkEvent.
//
// The network's primary key is read from the "id" form value and the model's
// Archive method is called with a nil timestamp. An error is returned when the
// id is missing, the network cannot be loaded, or Archive fails.
func (n *Network) unarchive(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes straight from the request, so it is bound
	// as a query parameter: GORM treats a bare string argument to First as a
	// raw SQL condition.
	var original models.Network
	if err := n.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find network: %w", err)
	}

	// A nil timestamp is what distinguishes unarchiving from archiving here.
	if err := original.Archive(n.db, nil); err != nil {
		return r, fmt.Errorf("failed to unarchive network: %w", err)
	}

	// Notify the listing about the changed row.
	r.Emit(
		presets.NotifModelsUpdated(&models.Network{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
package builders
import (
"fmt"
"strings"
"time"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/admin/v3/presets/gorm2op"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/qor5/x/v3/ui/vuetifyx"
h "github.com/theplant/htmlgo"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Placement builds the admin configuration for models.Placement and hosts its
// archive/unarchive event handlers.
type Placement struct {
// logger is stored by NewPlacement for use by the builder.
logger *zap.Logger
// db is the GORM handle used for all placement-related queries.
db *gorm.DB
}
// NewPlacement returns a Placement builder that uses the given logger and
// database handle.
func NewPlacement(logger *zap.Logger, db *gorm.DB) *Placement {
	p := new(Placement)
	p.logger = logger
	p.db = db
	return p
}
// Event names for the placement row-menu actions; Configure registers a
// handler under each of them.
const (
archivePlacementEvent = "archivePlacement"
unarchivePlacementEvent = "unarchivePlacement"
)
// Configure registers models.Placement on the presets builder: a listing with
// archive-aware search, ordering and filters, an editing form that requires a
// Title, a custom Active cell, and the archive/unarchive row-menu events.
func (m *Placement) Configure(b *presets.Builder) {
	mp := b.Model(&models.Placement{}).
		MenuIcon("mdi-vector-difference-ba").
		// Label("Рекламодатели").
		RightDrawerWidth("1000")

	mpl := mp.Listing("ID", "Title", "Active").
		SearchFunc(func(ctx *web.EventContext, params *presets.SearchParams) (result *presets.SearchResult, err error) {
			// Archived placements are hidden unless the "archived" filter asks
			// for them. Both filter options ("is_archived" and "all") produce
			// a condition containing this substring, so one check covers both.
			archivedRequested := false
			for _, cond := range params.SQLConditions {
				if strings.Contains(cond.Query, "archived_at is not null") {
					archivedRequested = true
					break
				}
			}

			qdb := m.db
			if !archivedRequested {
				qdb = m.db.Where("archived_at is null")
			}
			return gorm2op.DataOperator(qdb).Search(ctx, params)
		}).
		SearchColumns("Title").
		OrderableFields([]*presets.OrderableField{
			{FieldName: "ID", DBColumn: "id"},
			{FieldName: "Title", DBColumn: "title"},
			{FieldName: "Active", DBColumn: "active"},
		})

	mp.Editing(
		&presets.FieldsSection{
			// Title: "Info",
			Rows: [][]string{
				{"Title"},
				{"Active"},
			},
		},
	).ValidateFunc(func(obj interface{}, ctx *web.EventContext) (err web.ValidationErrors) {
		u := obj.(*models.Placement)
		if u.Title == "" {
			// The error must be keyed by the field that is actually on the
			// form ("Title"); the form has no "Name" field to attach it to.
			err.FieldError("Title", "Title is required")
		}
		return err
	})

	mpl.FilterDataFunc(func(ctx *web.EventContext) vuetifyx.FilterData {
		return []*vuetifyx.FilterItem{
			{
				Key:          "archived",
				Label:        "Архив",
				ItemType:     vuetifyx.ItemTypeSelect,
				SQLCondition: "archived_at is null",
				Options: []*vuetifyx.SelectItem{
					{
						Text:         "В архиве",
						Value:        "is_archived",
						SQLCondition: "archived_at is not null",
					},
					{
						Text:         "Все",
						Value:        "all",
						SQLCondition: "(archived_at is not null or archived_at is null)",
					},
				},
			},
			{
				Key:      "active",
				Label:    "Активность",
				ItemType: vuetifyx.ItemTypeSelect,
				Options: []*vuetifyx.SelectItem{
					{
						Text:         "Включен",
						Value:        "is_active",
						SQLCondition: "active = true",
					},
					{
						Text:         "Выключен",
						Value:        "not_active",
						SQLCondition: "active = false",
					},
				},
			},
			{
				Key:          "created",
				Label:        "Создан",
				ItemType:     vuetifyx.ItemTypeDate,
				SQLCondition: `created_at %s ?`,
			},
		}
	})

	// Active cell: coloured on/off label.
	mpl.Field("Active").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
		c := obj.(*models.Placement)
		color := "red"
		text := "выключен"
		if c.Active {
			text = "включен"
			color = "green"
		}
		return h.Td().Children(h.Span(text).Style("color:" + color))
	})

	// Row menu: archive or unarchive, depending on the row's current state.
	mpn := mpl.RowMenu()
	mpn.RowMenuItem("Archive").
		ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
			item := obj.(*models.Placement)
			if item.ArchivedAt == nil {
				return v.VListItem(
					web.Slot(
						v.VIcon("mdi-archive-arrow-down"),
					).Name("prepend"),
					v.VListItemTitle(
						h.Text("Архивировать"),
					),
				).Attr("@click",
					web.Plaid().EventFunc(archivePlacementEvent).Query("id", id).Go(),
				)
			}
			return v.VListItem(
				web.Slot(
					v.VIcon("mdi-archive-arrow-up"),
				).Name("prepend"),
				v.VListItemTitle(
					h.Text("Разархивировать"),
				),
			).Attr("@click",
				web.Plaid().EventFunc(unarchivePlacementEvent).Query("id", id).Go(),
			)
		})

	// Register the handlers behind the row-menu events.
	mp.RegisterEventFunc(archivePlacementEvent, m.archive)
	mp.RegisterEventFunc(unarchivePlacementEvent, m.unarchive)
}
// archive is the handler for archivePlacementEvent.
//
// The placement's primary key is read from the "id" form value and the model's
// Archive method is called with the current time. An error is returned when
// the id is missing, the placement cannot be loaded, or Archive fails.
func (m *Placement) archive(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes straight from the request, so it is bound
	// as a query parameter: GORM treats a bare string argument to First as a
	// raw SQL condition.
	var original models.Placement
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find placement: %w", err)
	}

	now := time.Now()
	if err := original.Archive(m.db, &now); err != nil {
		return r, fmt.Errorf("failed to archive placement: %w", err)
	}

	// Notify the listing about the changed row.
	r.Emit(
		presets.NotifModelsUpdated(&models.Placement{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
// unarchive is the handler for unarchivePlacementEvent.
//
// The placement's primary key is read from the "id" form value and the model's
// Archive method is called with a nil timestamp. An error is returned when the
// id is missing, the placement cannot be loaded, or Archive fails.
func (m *Placement) unarchive(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record. The id comes straight from the request, so it is bound
	// as a query parameter: GORM treats a bare string argument to First as a
	// raw SQL condition.
	var original models.Placement
	if err := m.db.First(&original, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find placement: %w", err)
	}

	// A nil timestamp is what distinguishes unarchiving from archiving here.
	if err := original.Archive(m.db, nil); err != nil {
		return r, fmt.Errorf("failed to unarchive placement: %w", err)
	}

	// Notify the listing about the changed row.
	r.Emit(
		presets.NotifModelsUpdated(&models.Placement{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
package builders
import (
"fmt"
"strings"
"time"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/admin/v3/presets/gorm2op"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/qor5/x/v3/ui/vuetifyx"
"github.com/sunfmin/reflectutils"
h "github.com/theplant/htmlgo"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Unit builds the admin configuration for models.Unit.
type Unit struct {
// logger is stored by NewUnit for use by the builder.
logger *zap.Logger
// db is the GORM handle used for all unit-related queries.
db *gorm.DB
}
// NewUnit returns a Unit builder that uses the given logger and database
// handle.
func NewUnit(logger *zap.Logger, db *gorm.DB) *Unit {
	u := new(Unit)
	u.logger = logger
	u.db = db
	return u
}
// Event names for archiving and unarchiving a unit.
// NOTE(review): presumably registered in Unit.Configure like the sibling
// builders do — confirm in the rest of the file.
const (
archiveUnitEvent = "archiveUnit"
unarchiveUnitEvent = "unarchiveUnit"
)
// Configure registers the Unit model on the presets builder and sets up its
// listing (search, ordering, filters, custom cells), its editing form and the
// archive/unarchive row-menu action. It returns the resulting model builder.
func (n *Unit) Configure(b *presets.Builder) *presets.ModelBuilder {
mn := b.Model(&models.Unit{}).
MenuIcon("mdi-volcano-outline").
// Label("Рекламодатели").
RightDrawerWidth("1000")
mnl := mn.Listing("ID", "Title", "Price", "Placement", "Network", "Active").
SearchFunc(func(ctx *web.EventContext, params *presets.SearchParams) (result *presets.SearchResult, err error) {
// Archived entities are hidden by default; they are listed only when
// the archive filter explicitly selects them. The loop detects that by
// looking for the SQL fragments that the "archived" filter options
// (declared in FilterDataFunc below) contribute to the conditions.
exist := false
// Note: the loop variable v shadows the vuetify package alias inside this loop.
for _, v := range params.SQLConditions {
if strings.Contains(v.Query, "archived_at is not null") {
exist = true
break
}
// NOTE(review): this branch can never be the first to match, because the
// "all" fragment also contains the substring tested just above.
if strings.Contains(v.Query, "(archived_at is not null or archived_at is null)") {
exist = true
break
}
}
if !exist {
// No archive filter selected: restrict the search to non-archived rows.
qdb := n.db.Where("archived_at is null")
return gorm2op.DataOperator(qdb).Search(ctx, params)
} else {
// An archive filter is present: no extra archive restriction is added here.
qdb := n.db.Where("")
return gorm2op.DataOperator(qdb).Search(ctx, params)
}
}).
SearchColumns("Title").
// SelectableColumns(true).
OrderableFields([]*presets.OrderableField{
{
FieldName: "ID",
DBColumn: "id",
},
{
FieldName: "Title",
DBColumn: "title",
},
{
FieldName: "Active",
DBColumn: "active",
},
})
// Listing filters. The SQLCondition strings of the "archived" options are the
// exact fragments that SearchFunc above searches for, so the two must be kept in sync.
mnl.FilterDataFunc(func(ctx *web.EventContext) vuetifyx.FilterData {
return []*vuetifyx.FilterItem{
{
Key: "archived",
Label: "Архив",
ItemType: vuetifyx.ItemTypeSelect,
SQLCondition: "archived_at is null",
Options: []*vuetifyx.SelectItem{
{
Text: "В архиве",
Value: "is_archived",
SQLCondition: "archived_at is not null",
},
{
Text: "Все",
Value: "all",
SQLCondition: "(archived_at is not null or archived_at is null)",
},
},
},
{
Key: "active",
Label: "Активность",
ItemType: vuetifyx.ItemTypeSelect,
Options: []*vuetifyx.SelectItem{
{
Text: "Включен",
Value: "is_active",
SQLCondition: "active = true",
},
{
Text: "Выключен",
Value: "not_active",
SQLCondition: "active = false",
},
},
},
}
})
// "Placement" cell: shows the title of the referenced placement, or an empty
// cell when no placement is set. Each call runs its own lookup query, and the
// query error is not checked, so a failed lookup renders an empty title.
mnl.Field("Placement").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
c := obj.(*models.Unit)
var placement models.Placement
if c.PlacementID == 0 {
return h.Td()
}
n.db.First(&placement, "id = ?", c.PlacementID)
return h.Td().Text(placement.Title)
})
// "Network" cell: same approach as the "Placement" cell, for the referenced network.
mnl.Field("Network").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
c := obj.(*models.Unit)
var network models.Network
if c.NetworkID == 0 {
return h.Td()
}
n.db.First(&network, "id = ?", c.NetworkID)
return h.Td().Text(network.Title)
})
// "Active" cell: green "включен" when the unit is active, red "выключен" otherwise.
mnl.Field("Active").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
c := obj.(*models.Unit)
color := "red"
text := "выключен"
if c.Active {
text = "включен"
color = "green"
}
return h.Td().Children(h.Span(text).Style("color:" + color))
})
// Editing form: a single section with one field per row; Title is mandatory.
mne := mn.Editing(
&presets.FieldsSection{
// Title: "Info",
Rows: [][]string{
{"Title"},
{"Price"},
{"PlacementID"},
{"NetworkID"},
{"Data"},
{"Active"},
},
},
).ValidateFunc(func(obj interface{}, ctx *web.EventContext) (err web.ValidationErrors) {
u := obj.(*models.Unit)
if u.Title == "" {
err.FieldError("Title", "Title is required")
}
return
})
// PlacementID editor: a select populated with every placement in the database.
// The Find error is not checked; on failure the select has no items.
mne.Field("PlacementID").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
c := obj.(*models.Unit)
var items []models.Placement
n.db.Find(&items)
sel := v.VSelect().
Variant("outlined").Density("compact").
Label("Плейсмент").
Items(items).
ItemTitle("Title").
ItemValue("ID").
Attr(web.VField("PlacementID", c.PlacementID)...)
return h.Div(
sel,
)
})
// NetworkID editor: a select populated with every network in the database.
// The Find error is not checked; on failure the select has no items.
mne.Field("NetworkID").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
c := obj.(*models.Unit)
var items []models.Network
n.db.Find(&items)
sel := v.VSelect().
Variant("outlined").Density("compact").
Label("Рекламная сеть").
Items(items).
ItemTitle("Title").
ItemValue("ID").
Attr(web.VField("NetworkID", c.NetworkID)...)
return h.Div(
sel,
)
})
// Data editor: a multi-line text area bound to the field's current value.
mne.Field("Data").ComponentFunc(func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
return v.VTextarea().
Label(field.Label).
Attr(web.VField(field.FormKey, fmt.Sprint(reflectutils.MustGet(obj, field.Name)))...).
Disabled(field.Disabled).
ErrorMessages(field.Errors...)
})
// Row menu: offers "archive" for rows without an archive time and
// "unarchive" for rows that have one.
mnn := mnl.RowMenu()
mnn.RowMenuItem("Archive").
ComponentFunc(func(obj interface{}, id string, ctx *web.EventContext) h.HTMLComponent {
item := obj.(*models.Unit)
if item.ArchivedAt == nil {
return v.VListItem(
web.Slot(
v.VIcon("mdi-archive-arrow-down"), // archive icon
).Name("prepend"),
v.VListItemTitle(
h.Text("Архивировать"),
),
).Attr("@click",
web.Plaid().EventFunc(archiveUnitEvent).Query("id", id).Go(),
)
} else {
return v.VListItem(
web.Slot(
v.VIcon("mdi-archive-arrow-up"), // unarchive icon
).Name("prepend"),
v.VListItemTitle(
h.Text("Разархивировать"),
),
).Attr("@click",
web.Plaid().EventFunc(unarchiveUnitEvent).Query("id", id).Go(),
)
}
})
// Register the handlers for the archive and unarchive events fired by the row menu.
mn.RegisterEventFunc(archiveUnitEvent, n.archive)
mn.RegisterEventFunc(unarchiveUnitEvent, n.unarchive)
return mn
}
// archive handles archiveUnitEvent: it loads the unit whose ID arrives in the
// "id" form value, archives it with the current time and emits a
// "models updated" notification for that unit.
//
// It returns an error when the id is missing, the unit cannot be loaded or
// archiving fails.
func (n *Unit) archive(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record to archive. The id comes from the request, so it is bound
	// as a query parameter: GORM interprets a bare string passed as an inline
	// condition as raw SQL, which would allow SQL injection.
	var unit models.Unit
	if err = n.db.First(&unit, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find unit: %w", err)
	}

	now := time.Now()
	if err = unit.Archive(n.db, &now); err != nil {
		return r, fmt.Errorf("failed to archive unit: %w", err)
	}

	// Tell the UI that this unit changed so the listing is refreshed.
	r.Emit(
		presets.NotifModelsUpdated(&models.Unit{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
// unarchive handles unarchiveUnitEvent: it loads the unit whose ID arrives in
// the "id" form value, clears its archive time and emits a "models updated"
// notification for that unit.
//
// It returns an error when the id is missing, the unit cannot be loaded or
// the update fails.
func (n *Unit) unarchive(ctx *web.EventContext) (r web.EventResponse, err error) {
	id := ctx.R.FormValue("id")
	if id == "" {
		return r, fmt.Errorf("id is required")
	}

	// Load the record to restore. The id comes from the request, so it is bound
	// as a query parameter: GORM interprets a bare string passed as an inline
	// condition as raw SQL, which would allow SQL injection.
	var unit models.Unit
	if err = n.db.First(&unit, "id = ?", id).Error; err != nil {
		return r, fmt.Errorf("failed to find unit: %w", err)
	}

	// A nil archive time marks the unit as not archived.
	if err = unit.Archive(n.db, nil); err != nil {
		return r, fmt.Errorf("failed to unarchive unit: %w", err)
	}

	// Tell the UI that this unit changed so the listing is refreshed.
	r.Emit(
		presets.NotifModelsUpdated(&models.Unit{}),
		presets.PayloadModelsUpdated{Ids: []string{id}},
	)
	return r, nil
}
package components
import (
"errors"
"strconv"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/sunfmin/reflectutils"
h "github.com/theplant/htmlgo"
"go.uber.org/zap"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Budget provides the form component and the form setter for a budget field
// whose value is stored as a string on the edited object.
type Budget struct {
	logger *zap.Logger
}

// NewBudget returns a Budget that reports rendering problems to logger.
func NewBudget(logger *zap.Logger) *Budget {
	budget := new(Budget)
	budget.logger = logger
	return budget
}
// Component renders the budget editor: one row per limit group (impressions,
// clicks, conversions, money), each with a daily input, a total input and a
// "uniform" checkbox. Form keys have the shape "Budget.<Group>.<Daily|Total|Uniform>".
//
// A field value that is not a string, or that cannot be decoded, is only
// logged; the form is then rendered from whatever models.NewBudget returned.
func (b *Budget) Component(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
	data, isString := field.Value(obj).(string)
	if !isString {
		b.logger.Error("budget field value is not string", zap.String("field", field.Name), zap.Any("value", field.Value(obj)))
	}
	budget, err := models.NewBudget(data)
	if err != nil {
		b.logger.Error("error unmarshal budget", zap.Error(err), zap.String("field", field.Name), zap.Any("value", field.Value(obj)))
	}

	// numberInput builds one numeric text field; an empty label means "no label".
	numberInput := func(key string, value int, label string) h.HTMLComponent {
		input := v.VTextField()
		if label != "" {
			input = input.Label(label)
		}
		return input.
			Hint("1000").
			Variant("outlined").Density("compact").
			Attr(web.VField(key, value)...)
	}

	// limitRow builds the row for one limit group.
	limitRow := func(title, group string, limit models.Limit, dailyLabel, totalLabel string) h.HTMLComponent {
		prefix := "Budget." + group
		return v.VRow(
			h.Label(title).Style("width: 120px; margin-left: 12px; margin-top: 7px;"),
			numberInput(prefix+".Daily", limit.Daily, dailyLabel),
			numberInput(prefix+".Total", limit.Total, totalLabel),
			v.VCheckbox().
				Label("Равномерный").
				Density("compact").
				Attr(web.VField(prefix+".Uniform", limit.Uniform)...),
		)
	}

	// Only the first row labels its inputs.
	rows := v.VCol(
		limitRow("Показы", "Impressions", budget.Impressions, "Суточный", "Общий"),
		limitRow("Клики", "Clicks", budget.Clicks, "", ""),
		limitRow("Конверсии", "Conversions", budget.Conversions, "", ""),
		limitRow("Деньги", "Money", budget.Money, "", ""),
	)
	return h.Div(rows).Class("budget-field")
}
// Setter reads the budget inputs from the submitted form and stores the
// re-encoded budget string back into the field on obj.
//
// It starts from the budget currently stored in the field, then overwrites
// Daily, Total and Uniform of every limit group from the form keys
// "Budget.<Group>.<Daily|Total|Uniform>". The first value that fails to parse
// aborts the update and its parse error is returned; this includes empty values.
func (b *Budget) Setter(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) error {
	data, isString := field.Value(obj).(string)
	if !isString {
		return errors.New("budget field value is not string")
	}
	budget, err := models.NewBudget(data)
	if err != nil {
		return err
	}

	// Groups are processed in this order, so the first reported error is the
	// same one a field-by-field sequence would report.
	groups := []struct {
		name  string
		limit *models.Limit
	}{
		{"Impressions", &budget.Impressions},
		{"Clicks", &budget.Clicks},
		{"Conversions", &budget.Conversions},
		{"Money", &budget.Money},
	}
	for _, group := range groups {
		prefix := "Budget." + group.name
		if group.limit.Daily, err = strconv.Atoi(ctx.R.FormValue(prefix + ".Daily")); err != nil {
			return err
		}
		if group.limit.Total, err = strconv.Atoi(ctx.R.FormValue(prefix + ".Total")); err != nil {
			return err
		}
		if group.limit.Uniform, err = strconv.ParseBool(ctx.R.FormValue(prefix + ".Uniform")); err != nil {
			return err
		}
	}
	return reflectutils.Set(obj, field.Name, budget.String())
}
package components
import (
"errors"
"strconv"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/sunfmin/reflectutils"
h "github.com/theplant/htmlgo"
"go.uber.org/zap"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Capping provides the form component and the form setter for a capping field
// whose value is stored as a string on the edited object.
type Capping struct {
	logger *zap.Logger
}

// NewCapping returns a Capping that reports rendering problems to logger.
func NewCapping(logger *zap.Logger) *Capping {
	capping := new(Capping)
	capping.logger = logger
	return capping
}
// Component renders the capping editor: two side-by-side numeric inputs, the
// count ("Capping.Count") and the period in hours ("Capping.Period").
//
// A field value that is not a string, or that cannot be decoded, is only
// logged; the form is then rendered from whatever models.NewCapping returned.
func (c *Capping) Component(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
	data, ok := field.Value(obj).(string)
	if !ok {
		c.logger.Error("capping field value is not string", zap.String("field", field.Name), zap.Any("value", field.Value(obj)))
	}
	capping, err := models.NewCapping(data)
	if err != nil {
		// This message used to say "budget" (copy-paste), which pointed log
		// readers at the wrong component.
		c.logger.Error("error unmarshal capping", zap.Error(err), zap.String("field", field.Name), zap.Any("value", field.Value(obj)))
	}

	// column builds one titled numeric input.
	column := func(title, key string, value int) h.HTMLComponent {
		return v.VCol(
			h.Div(
				h.Label(title).Class("text-subtitle-2"),
			).Style("padding-bottom: 12px;"),
			v.VTextField().
				Hint("1000").
				Variant("outlined").Density("compact").
				Attr(web.VField(key, value)...),
		)
	}

	return h.Div(
		v.VRow(
			column("Показы", "Capping.Count", capping.Count),
			column("Период (часы)", "Capping.Period", capping.Period),
		),
	).Class("capping-field")
}
// Setter reads "Capping.Count" and "Capping.Period" from the submitted form
// and stores the re-encoded capping string back into the field on obj.
//
// It fails when the current field value is not a string, when the stored
// capping cannot be decoded, or when either form value is not an integer
// (an empty value included).
func (c *Capping) Setter(obj any, field *presets.FieldContext, ctx *web.EventContext) error {
	data, isString := field.Value(obj).(string)
	if !isString {
		return errors.New("capping field value is not string")
	}
	capping, err := models.NewCapping(data)
	if err != nil {
		return err
	}

	if capping.Count, err = strconv.Atoi(ctx.R.FormValue("Capping.Count")); err != nil {
		return err
	}
	if capping.Period, err = strconv.Atoi(ctx.R.FormValue("Capping.Period")); err != nil {
		return err
	}
	return reflectutils.Set(obj, field.Name, capping.String())
}
package components
import (
"errors"
"strings"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/sunfmin/reflectutils"
h "github.com/theplant/htmlgo"
"go.uber.org/zap"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Targeting provides the form component and the form setter for a targeting
// field whose value is stored as a string on the edited object.
type Targeting struct {
	logger *zap.Logger
}

// NewTargeting returns a Targeting that reports rendering problems to logger.
func NewTargeting(logger *zap.Logger) *Targeting {
	targeting := new(Targeting)
	targeting.logger = logger
	return targeting
}
// Component renders the targeting editor as a stack of collapsible sections
// (bundles, countries, regions, cities, IP). Every section holds an "include"
// and an "exclude" text area whose content is the stored list joined by spaces.
//
// A field value that is not a string, or that cannot be decoded, is only
// logged; the form is then rendered from whatever models.NewTargeting returned.
func (t *Targeting) Component(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
	data, isString := field.Value(obj).(string)
	if !isString {
		t.logger.Error("targeting field value is not string", zap.String("field", field.Name), zap.Any("value", field.Value(obj)))
	}
	targeting, err := models.NewTargeting(data)
	if err != nil {
		t.logger.Error("error unmarshal targeting", zap.Error(err), zap.String("field", field.Name), zap.Any("value", field.Value(obj)))
	}

	const (
		borderStyle  = "border: 1px solid #ddd; border-radius: 4px; margin-bottom: 10px;"
		headerStyle  = "display: inline; margin: 0;"
		summaryStyle = "cursor: pointer; padding: 12px;"
	)

	// listArea builds one captioned text area bound to the form key.
	listArea := func(caption, key, hint string, values []string) h.HTMLComponent {
		return h.Div(
			h.Label(caption).Class("v-label theme--dark"),
			v.VTextarea().
				Hint(hint).
				Attr(web.VField(key, strings.Join(values, " "))...).
				Disabled(false).
				ErrorMessages(field.Errors...),
		)
	}

	// section builds one collapsible block with its include/exclude pair.
	section := func(title, hint, includeKey string, include []string, excludeKey string, exclude []string) h.HTMLComponent {
		return h.Details(
			h.Summary(
				h.H3(title).Style(headerStyle),
			).Style(summaryStyle),
			h.Div(
				listArea("Включить", includeKey, hint, include),
				listArea("Исключить", excludeKey, hint, exclude),
			).Style("padding: 16px;"),
		).Style(borderStyle)
	}

	sections := v.VCol(
		section("Бандлы", "com.example ru.rustore",
			"Targeting.Bundle.IncludeOr", targeting.Bundle.IncludeOr,
			"Targeting.Bundle.ExcludeOr", targeting.Bundle.ExcludeOr),
		section("Страны", "RU US",
			"Targeting.Country.IncludeOr", targeting.Country.IncludeOr,
			"Targeting.Country.ExcludeOr", targeting.Country.ExcludeOr),
		section("Регионы", "SPE MOW",
			"Targeting.Region.IncludeOr", targeting.Region.IncludeOr,
			"Targeting.Region.ExcludeOr", targeting.Region.ExcludeOr),
		section("Города", "KUF OMS",
			"Targeting.City.IncludeOr", targeting.City.IncludeOr,
			"Targeting.City.ExcludeOr", targeting.City.ExcludeOr),
		section("IP", "188.170.172.0/22 188.170.192.0/22",
			"Targeting.IP.Include", targeting.IP.Include,
			"Targeting.IP.Exclude", targeting.IP.Exclude),
	)
	return h.Div(sections).Class("targeting-field")
}
// Setter reads the targeting lists from the submitted form and stores the
// re-encoded targeting string back into the field on obj.
//
// It starts from the targeting currently stored in the field. Every list whose
// form key is present is replaced by the whitespace-separated items of the
// submitted value; lists whose key is absent keep their stored content.
//
// It fails when the current field value is not a string or when the stored
// targeting cannot be decoded.
func (t *Targeting) Setter(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) error {
	data, ok := field.Value(obj).(string)
	if !ok {
		// This message used to say "budget" (copy-paste), which pointed at the
		// wrong field.
		return errors.New("targeting field value is not string")
	}
	targeting, err := models.NewTargeting(data)
	if err != nil {
		return err
	}

	// Form key -> list it fills. The lists are independent of each other.
	lists := []struct {
		key string
		dst *[]string
	}{
		{"Targeting.Bundle.IncludeOr", &targeting.Bundle.IncludeOr},
		{"Targeting.Bundle.ExcludeOr", &targeting.Bundle.ExcludeOr},
		{"Targeting.Country.IncludeOr", &targeting.Country.IncludeOr},
		{"Targeting.Country.ExcludeOr", &targeting.Country.ExcludeOr},
		{"Targeting.Region.IncludeOr", &targeting.Region.IncludeOr},
		{"Targeting.Region.ExcludeOr", &targeting.Region.ExcludeOr},
		{"Targeting.City.IncludeOr", &targeting.City.IncludeOr},
		{"Targeting.City.ExcludeOr", &targeting.City.ExcludeOr},
		{"Targeting.IP.Include", &targeting.IP.Include},
		{"Targeting.IP.Exclude", &targeting.IP.Exclude},
	}
	// NOTE(review): ctx.R.Form is only populated once the request form has been
	// parsed; presumably the framework does that before setters run — confirm.
	for _, list := range lists {
		if ctx.R.Form.Has(list.key) {
			*list.dst = strings.Fields(ctx.R.FormValue(list.key))
		}
	}
	return reflectutils.Set(obj, field.Name, targeting.String())
}
package components
import (
"fmt"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
"github.com/sunfmin/reflectutils"
h "github.com/theplant/htmlgo"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
"go.uber.org/zap"
)
// Timetable provides the form component and the form setter for a timetable
// field whose value is stored as a string on the edited object.
type Timetable struct {
	logger *zap.Logger
}

// NewTimetable returns a Timetable that reports rendering problems to logger.
func NewTimetable(logger *zap.Logger) *Timetable {
	timetable := new(Timetable)
	timetable.logger = logger
	return timetable
}
// Component renders the activity schedule as a 7x24 grid of checkboxes: one
// line per weekday (index 0 is Monday) and one checkbox per hour, with form
// keys of the shape "Timetable[<day>][<hour>]". A slot that has no stored
// value is rendered as checked.
//
// A field value that is not a string, or that cannot be decoded, is only
// logged; the grid is then rendered from whatever models.NewTimetable returned.
func (t *Timetable) Component(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) h.HTMLComponent {
	data, isString := field.Value(obj).(string)
	if !isString {
		t.logger.Error("timetable field value is not string", zap.String("field", field.Name), zap.Any("value", field.Value(obj)))
	}
	timetable, err := models.NewTimetable(data)
	if err != nil {
		t.logger.Error("error unmarshal timetable", zap.Error(err), zap.String("field", field.Name), zap.Any("value", field.Value(obj)))
	}

	const hoursPerDay = 24

	// Header line with the hour numbers.
	hourTitles := make([]h.HTMLComponent, 0, hoursPerDay)
	for hour := 0; hour < hoursPerDay; hour++ {
		hourTitles = append(hourTitles,
			h.Span(fmt.Sprint(hour)).Style("font-size: 8px; width: 27px; display: inline-block; text-align: center;"))
	}

	components := []h.HTMLComponent{
		h.H3("Расписание активности").Style("margin-bottom: 8px;"),
		h.Div(hourTitles...).Style("margin-left: 46px;"),
	}

	// One line per weekday: the day name followed by its 24 checkboxes.
	for day, name := range []string{"Пн", "Вт", "Ср", "Чт", "Пт", "Сб", "Вс"} {
		cells := []h.HTMLComponent{
			h.Strong(name).Style("display: inline-block; width: 40px;"),
		}
		for hour := 0; hour < hoursPerDay; hour++ {
			key := fmt.Sprintf("Timetable[%d][%d]", day, hour)
			checked, stored := timetable[day][hour]
			if !stored {
				checked = true
			}
			cells = append(cells,
				h.Div(
					h.Input("").Type("checkbox").Name(key).Attr(web.VField(key, checked)...),
				).Style("display: inline-block; margin: 2px; margin-right: 8px; padding: 2px 2px; cursor: pointer;"),
			)
		}
		components = append(components, v.VCol(cells...))
	}
	return h.Div(components...).Class("timetable-field")
}
// Setter rebuilds the timetable from the submitted form and stores its encoded
// string into the field on obj. All 7x24 slots are written: a slot is enabled
// only when its form value "Timetable[<day>][<hour>]" equals "true". The value
// previously stored in the field is not consulted.
func (t *Timetable) Setter(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) error {
	const (
		daysPerWeek = 7
		hoursPerDay = 24
	)
	timetable := models.Timetable{}
	for day := 0; day < daysPerWeek; day++ {
		for hour := 0; hour < hoursPerDay; hour++ {
			key := fmt.Sprintf("Timetable[%d][%d]", day, hour)
			timetable.Set(day, hour, ctx.R.FormValue(key) == "true")
		}
	}
	return reflectutils.Set(obj, field.Name, timetable.String())
}
package models
import (
"fmt"
"time"
"gorm.io/gorm"
)
// Advertiser is the GORM model of an advertiser. Campaigns reference it through
// their advertiser_id column (see Archive).
type Advertiser struct {
gorm.Model
Title string
Info string
Active bool
Start time.Time
End time.Time
// Targeting, Budget, Capping and Timetable are stored as strings;
// presumably the JSON documents produced by the matching model types — confirm.
Targeting string
Budget string
Capping string
Timetable string
OrdContract string
// ArchivedAt is nil while the advertiser is not archived.
ArchivedAt *time.Time
}
// Archive sets the advertiser's archive time and cascades the same value to
// every campaign of the advertiser (which in turn cascade further). Passing a
// nil archive time un-archives the whole tree.
//
// The receiver is a copy: the caller's value is not modified, only the
// database rows are. The individual saves are not wrapped in a transaction
// here, so a failure part-way leaves earlier rows updated.
func (original Advertiser) Archive(db *gorm.DB, archive *time.Time) error {
	original.ArchivedAt = archive
	// Save takes a pointer to the record, as in the Bgroup, Network,
	// Placement and Unit models.
	if err := db.Save(&original).Error; err != nil {
		return fmt.Errorf("failed archive: %w", err)
	}

	campaigns := []Campaign{}
	if err := db.Model(Campaign{}).
		Where("advertiser_id = ?", original.ID).
		Find(&campaigns).Error; err != nil {
		return fmt.Errorf("error on get camapigns: %w", err)
	}
	for _, c := range campaigns {
		if err := c.Archive(db, archive); err != nil {
			return fmt.Errorf("err on archive campaigns: %w", err)
		}
	}
	return nil
}
package models
import (
"fmt"
"time"
"github.com/qor5/admin/v3/media/media_library"
"gorm.io/gorm"
)
// Banner is the GORM model of a single creative. It belongs to a banner group
// through BgroupID.
type Banner struct {
gorm.Model
Title string
Label string
Description string
Active bool
Erid string
OrdCategory string
OrdTargeting string
OrdFormat string
OrdKktu string
Price int
// Image and Icon are media-library boxes stored in text columns.
Image media_library.MediaBox `sql:"type:text;"`
Icon media_library.MediaBox `sql:"type:text;"`
Start time.Time
End time.Time
Clicktracker string
Imptracker string
Target string
// Targeting, Budget, Capping and Timetable are stored as strings;
// presumably the JSON documents produced by the matching model types — confirm.
Targeting string
Budget string
Capping string
BgroupID int
Bgroup Bgroup
Timetable string
// ArchivedAt is nil while the banner is not archived.
ArchivedAt *time.Time
}
func (original Banner) Copy(db *gorm.DB, group int) (Banner, error) {
copy := Banner{
BgroupID: group,
Title: original.Title + " (Копия)",
Label: original.Label,
Description: original.Description,
Start: original.Start,
End: original.End,
Timetable: original.Timetable,
Targeting: original.Targeting,
Budget: original.Budget,
Capping: original.Capping,
Price: original.Price,
Active: false,
Erid: original.Erid,
OrdCategory: original.OrdCategory,
OrdTargeting: original.OrdTargeting,
OrdFormat: original.OrdFormat,
OrdKktu: original.OrdKktu,
Image: original.Image,
Icon: original.Icon,
Clicktracker: original.Clicktracker,
Imptracker: original.Imptracker,
Target: original.Target,
}
if err := db.Create(©).Error; err != nil {
return copy, fmt.Errorf("failed to create copy banners: %w", err)
}
return copy, nil
}
// Archive sets the banner's archive time and saves the banner. Passing a nil
// archive time un-archives it.
//
// The receiver is a copy: the caller's value is not modified, only the
// database row is.
func (original Banner) Archive(db *gorm.DB, archive *time.Time) error {
	original.ArchivedAt = archive
	// Save takes a pointer to the record, as in the Bgroup, Network,
	// Placement and Unit models.
	if err := db.Save(&original).Error; err != nil {
		return fmt.Errorf("failed archive: %w", err)
	}
	return nil
}
package models
import (
"fmt"
"time"
"gorm.io/gorm"
)
// Bgroup is the GORM model of a banner group. It belongs to a campaign through
// CampaignID, and banners reference it through their bgroup_id column.
type Bgroup struct {
gorm.Model
Title string
Active bool
Price int
Start time.Time
End time.Time
// Targeting, Budget, Capping and Timetable are stored as strings;
// presumably the JSON documents produced by the matching model types — confirm.
Targeting string
Budget string
Capping string
Timetable string
CampaignID int
Campaign Campaign
// ArchivedAt is nil while the group is not archived.
ArchivedAt *time.Time
}
func (original Bgroup) Copy(db *gorm.DB, campaign int) (Bgroup, error) {
copy := Bgroup{
CampaignID: campaign,
Title: original.Title + " (Копия)",
Price: original.Price,
Start: original.Start,
End: original.End,
Timetable: original.Timetable,
Targeting: original.Targeting,
Budget: original.Budget,
Capping: original.Capping,
Active: false,
}
if err := db.Create(©).Error; err != nil {
return Bgroup{}, fmt.Errorf("failed to create copy: %w", err)
}
banners := []Banner{}
if err := db.Model(Banner{}).
Where("bgroup_id = ?", original.ID).
Find(&banners).Error; err != nil {
return Bgroup{}, fmt.Errorf("error on get banners: %w", err)
}
for _, b := range banners {
if _, err := b.Copy(db, int(copy.ID)); err != nil {
return Bgroup{}, fmt.Errorf("err on copy banner: %w", err)
}
}
return copy, nil
}
// Archive sets the group's archive time, saves the group and applies the same
// archive time to every banner of the group. Passing a nil archive time
// un-archives the group and its banners.
//
// The receiver is a copy: the caller's value is not modified.
func (original Bgroup) Archive(db *gorm.DB, archive *time.Time) error {
	original.ArchivedAt = archive
	if saveErr := db.Save(&original).Error; saveErr != nil {
		return fmt.Errorf("failed to archive: %w", saveErr)
	}

	banners := []Banner{}
	query := db.Model(Banner{}).Where("bgroup_id = ?", original.ID)
	if findErr := query.Find(&banners).Error; findErr != nil {
		return fmt.Errorf("error on get banners: %w", findErr)
	}

	for i := range banners {
		if err := banners[i].Archive(db, archive); err != nil {
			return fmt.Errorf("err on archive banner: %w", err)
		}
	}
	return nil
}
package models
import "encoding/json"
// Limit is one budget limit: a daily cap, a total cap and a flag named
// "uniform" in the JSON encoding.
type Limit struct {
	Daily   int  `json:"daily"`
	Total   int  `json:"total"`
	Uniform bool `json:"uniform"`
}

// Budget groups the limits per counted quantity. It is serialized to and from
// JSON with NewBudget and String.
type Budget struct {
	Impressions Limit `json:"impressions"`
	Clicks      Limit `json:"clicks"`
	Money       Limit `json:"money"`
	Conversions Limit `json:"conversions"`
}

// NewBudget decodes a budget from its JSON string. An empty string yields the
// zero Budget; invalid JSON yields the zero Budget and the decoding error.
func NewBudget(data string) (Budget, error) {
	var budget Budget
	if data != "" {
		if err := json.Unmarshal([]byte(data), &budget); err != nil {
			return Budget{}, err
		}
	}
	return budget, nil
}

// String encodes the budget as JSON. It returns "" if encoding fails.
func (b Budget) String() string {
	if encoded, err := json.Marshal(b); err == nil {
		return string(encoded)
	}
	return ""
}
package models
import (
"fmt"
"time"
"gorm.io/gorm"
)
// Campaign is the GORM model of an advertising campaign. It belongs to an
// advertiser through AdvertiserID, and banner groups reference it through
// their campaign_id column.
type Campaign struct {
gorm.Model
Title string
Active bool
Bundle string
Start time.Time
End time.Time
// Targeting, Budget, Capping and Timetable are stored as strings;
// presumably the JSON documents produced by the matching model types — confirm.
Targeting string
Budget string
Capping string
Timetable string
AdvertiserID int
Advertiser Advertiser
// ArchivedAt is nil while the campaign is not archived.
ArchivedAt *time.Time
}
func (original Campaign) Copy(db *gorm.DB, advertiser int) (Campaign, error) {
copy := Campaign{
AdvertiserID: advertiser,
Title: original.Title + " (Копия)",
Bundle: original.Bundle,
Start: original.Start,
End: original.End,
Timetable: original.Timetable,
Targeting: original.Targeting,
Budget: original.Budget,
Capping: original.Capping,
Active: false,
}
// Сохраняем копию в базу данных
if err := db.Create(©).Error; err != nil {
return Campaign{}, fmt.Errorf("failed to create copy: %w", err)
}
groups := []Bgroup{}
if err := db.Model(Bgroup{}).
Where("campaign_id = ?", original.ID).
Find(&groups).Error; err != nil {
return Campaign{}, fmt.Errorf("error on get groups: %w", err)
}
for _, g := range groups {
if _, err := g.Copy(db, int(copy.ID)); err != nil {
return Campaign{}, fmt.Errorf("err on copy groups: %w", err)
}
}
return copy, nil
}
// Archive sets the campaign's archive time and cascades the same value to
// every banner group of the campaign (which in turn cascade to their
// banners). Passing a nil archive time un-archives the whole tree.
//
// The receiver is a copy: the caller's value is not modified, only the
// database rows are. The individual saves are not wrapped in a transaction
// here, so a failure part-way leaves earlier rows updated.
func (original Campaign) Archive(db *gorm.DB, archive *time.Time) error {
	original.ArchivedAt = archive
	// Save takes a pointer to the record, as in the Bgroup, Network,
	// Placement and Unit models.
	if err := db.Save(&original).Error; err != nil {
		return fmt.Errorf("failed archive: %w", err)
	}

	groups := []Bgroup{}
	if err := db.Model(Bgroup{}).
		Where("campaign_id = ?", original.ID).
		Find(&groups).Error; err != nil {
		return fmt.Errorf("error on get groups: %w", err)
	}
	for _, g := range groups {
		if err := g.Archive(db, archive); err != nil {
			return fmt.Errorf("err on archive groups: %w", err)
		}
	}
	return nil
}
package models
import "encoding/json"
// CappingKeyTemplate is the format of a capping key; its three %s verbs take
// uid, item and id, in that order.
const CappingKeyTemplate = "%s:%s:%s"

// Capping is a frequency cap: a count and a period. It is serialized to and
// from JSON with NewCapping and String.
type Capping struct {
	Count  int `json:"count"`
	Period int `json:"period"`
}

// NewCapping decodes a capping from its JSON string. An empty string yields the
// zero Capping; invalid JSON yields the zero Capping and the decoding error.
func NewCapping(data string) (Capping, error) {
	var capping Capping
	if data != "" {
		if err := json.Unmarshal([]byte(data), &capping); err != nil {
			return Capping{}, err
		}
	}
	return capping, nil
}

// String encodes the capping as JSON. It returns "" if encoding fails.
func (c Capping) String() string {
	if encoded, err := json.Marshal(c); err == nil {
		return string(encoded)
	}
	return ""
}
package models
import (
"fmt"
"time"
"gorm.io/gorm"
)
// Network is the GORM model of an ad network; units reference it through
// their NetworkID.
type Network struct {
gorm.Model
Title string
Name string
Data string
Active bool
// ArchivedAt is nil while the network is not archived.
ArchivedAt *time.Time
}
// Archive sets the network's archive time and saves the network. Passing a
// nil archive time un-archives it.
//
// The receiver is a copy: the caller's value is not modified.
func (original Network) Archive(db *gorm.DB, archive *time.Time) error {
	original.ArchivedAt = archive
	err := db.Save(&original).Error
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to archive: %w", err)
}
package models
import (
"fmt"
"time"
"gorm.io/gorm"
)
// Placement is the GORM model of a placement; units reference it through
// their PlacementID.
type Placement struct {
gorm.Model
Title string
Active bool
// ArchivedAt is nil while the placement is not archived.
ArchivedAt *time.Time
}
// Archive sets the placement's archive time and saves the placement. Passing
// a nil archive time un-archives it.
//
// The receiver is a copy: the caller's value is not modified.
func (original Placement) Archive(db *gorm.DB, archive *time.Time) error {
	original.ArchivedAt = archive
	err := db.Save(&original).Error
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to archive: %w", err)
}
package models
import "encoding/json"
// ExcludeIncludeOrAnd holds four value lists for one targeting dimension:
// include/exclude lists in an "or" and an "and" flavour.
type ExcludeIncludeOrAnd struct {
	IncludeOr  []string `json:"include_or"`
	ExcludeOr  []string `json:"exclude_or"`
	IncludeAnd []string `json:"include_and"`
	ExcludeAnd []string `json:"exclude_and"`
}

// ExcludeInclude holds a plain include list and exclude list.
type ExcludeInclude struct {
	Include []string `json:"include"`
	Exclude []string `json:"exclude"`
}

// Targeting is the full set of targeting rules, one entry per dimension. It is
// serialized to and from JSON with NewTargeting and String.
type Targeting struct {
	Bundle   ExcludeIncludeOrAnd `json:"bundle"`
	Audience ExcludeIncludeOrAnd `json:"audience"`
	Bapp     ExcludeIncludeOrAnd `json:"bapp"`
	Country  ExcludeIncludeOrAnd `json:"country"`
	Region   ExcludeIncludeOrAnd `json:"region"`
	City     ExcludeIncludeOrAnd `json:"city"`
	IP       ExcludeInclude      `json:"ip"`
	Network  ExcludeIncludeOrAnd `json:"network"`
}

// NewTargeting decodes targeting rules from their JSON string. An empty string
// yields the zero Targeting; invalid JSON yields the zero Targeting and the
// decoding error.
func NewTargeting(data string) (Targeting, error) {
	var targeting Targeting
	if data != "" {
		if err := json.Unmarshal([]byte(data), &targeting); err != nil {
			return Targeting{}, err
		}
	}
	return targeting, nil
}

// String encodes the targeting rules as JSON; lists that are nil are encoded
// as null. It returns "" if encoding fails.
func (t Targeting) String() string {
	if encoded, err := json.Marshal(t); err == nil {
		return string(encoded)
	}
	return ""
}
package models
import "encoding/json"
// Timetable is an activity schedule: day -> hour -> enabled (7 days, 24 hours).
// It is serialized to and from JSON with NewTimetable and String.
type Timetable map[int]map[int]bool

// NewTimetable decodes a timetable from its JSON string. An empty string
// yields an empty timetable; invalid JSON yields an empty timetable and the
// decoding error. The returned map is never nil, so Set can always be called
// on it.
func NewTimetable(data string) (Timetable, error) {
	t := Timetable{}
	if data == "" {
		return t, nil
	}
	if err := json.Unmarshal([]byte(data), &t); err != nil {
		return Timetable{}, err
	}
	// A JSON null decodes to a nil map, on which Set would panic.
	if t == nil {
		t = Timetable{}
	}
	return t, nil
}

// Validate reports whether the slot day/hour is enabled. An empty timetable
// allows every slot. Otherwise the slot must lie within day 1..7 and hour
// 0..23 and be stored as true.
//
// NOTE(review): the admin form in this repository writes days 0..6, while this
// check accepts days 1..7 — confirm which numbering callers of Validate use.
func (t Timetable) Validate(day, hour int) bool {
	if len(t) == 0 {
		return true
	}
	if day < 1 || day > 7 || hour < 0 || hour > 23 {
		return false
	}
	// A missing slot reads as false, i.e. disabled.
	return t[day][hour]
}

// Set stores value for the slot day/hour, creating the day's map on first use.
// The timetable itself must be non-nil.
func (t Timetable) Set(day, hour int, value bool) {
	if t[day] == nil {
		t[day] = map[int]bool{}
	}
	t[day][hour] = value
}

// String encodes the timetable as JSON. An empty timetable is encoded as "",
// and "" is also returned if encoding fails.
func (t Timetable) String() string {
	if len(t) == 0 {
		return ""
	}
	data, err := json.Marshal(t)
	if err != nil {
		return ""
	}
	return string(data)
}
package models
import (
"fmt"
"time"
"gorm.io/gorm"
)
// Unit is the GORM model of an ad unit. It references a network and a
// placement through NetworkID and PlacementID.
type Unit struct {
gorm.Model
Title string
Price int
NetworkID int
Network Network
PlacementID int
Placement Placement
Data string
Active bool
// ArchivedAt is nil while the unit is not archived.
ArchivedAt *time.Time
}
// Archive sets the unit's archive time and saves the unit. Passing a nil
// archive time un-archives it.
//
// The receiver is a copy: the caller's value is not modified.
func (original Unit) Archive(db *gorm.DB, archive *time.Time) error {
	original.ArchivedAt = archive
	err := db.Save(&original).Error
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to archive: %w", err)
}
package media
import (
"github.com/qor5/admin/v3/media"
"github.com/qor5/admin/v3/media/oss"
"github.com/qor5/admin/v3/presets"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/s3storage"
)
// Media wires the media library into the admin, backed by the database and
// the S3 storage client.
type Media struct {
	db        *gorm.DB
	s3storage *s3storage.Client
}

// New returns a Media module that uses db and s3storage.
func New(db *gorm.DB, s3storage *s3storage.Client) *Media {
	m := new(Media)
	m.db = db
	m.s3storage = s3storage
	return m
}
// Configure plugs the media library into the presets builder.
//
// It first assigns the S3 client to oss.Storage. That is a package-level
// variable, so the assignment affects every user of the oss package in the
// process. The media builder is created with AutoMigrate applied.
func (m *Media) Configure(b *presets.Builder) {
	oss.Storage = m.s3storage
	// A custom menu label could be set through the media builder's presets
	// model builder; it is left at its default.
	b.Use(media.New(m.db).AutoMigrate())
}
package stats
import (
"context"
"fmt"
"strings"
"time"
"github.com/uptrace/go-clickhouse/ch"
"go.uber.org/zap"
"go.ads.coffee/platform/admin/internal/clickhouse"
)
// Time layouts used for labels, and the supported aggregation steps.
const (
// Date is the layout of per-day labels.
Date = "2006-01-02"
// DateHour is the layout of per-hour labels.
DateHour = "2006-01-02 15"
// DateHourMinute is a layout with minute precision.
DateHourMinute = "2006-01-02 15:04"
// ByHour and ByDay are the values of Condition.By.
ByHour = "hour"
ByDay = "day"
)
// Available metrics. Apart from MetricPrice, a metric name is also the prefix
// of the "<metric>_hour" table it is read from; MetricPrice is read as the sum
// of price from the impressions table.
const (
MetricRequests = "requests"
MetricResponses = "responses"
MetricWins = "wins"
MetricImpressions = "impressions"
MetricClicks = "clicks"
MetricConversions = "conversions"
MetricPrice = "price"
)
// Available filters. The value is the column name used in the generated
// "<column> IN (...)" condition.
const (
FilterAdvertiserId = "advertiser_id"
FilterCampaignId = "campaign_id"
FilterGroupId = "group_id"
FilterBannerId = "banner_id"
FilterNetwork = "network"
FilterBundle = "bundle"
)
// Available groupings. The value is the column name that is selected as a
// label and added to GROUP BY.
const (
GroupAdvertiser = "advertiser_id"
GroupCampaign = "campaign_id"
GroupGroup = "group_id"
GroupBanner = "banner_id"
GroupNetwork = "network"
GroupBundle = "bundle"
GroupSlot = "slot"
)
// Query reads aggregated statistics from ClickHouse.
type Query struct {
	logger     *zap.Logger
	clickhouse *clickhouse.Clickhouse
}

// NewQuery returns a Query that uses the given logger and ClickHouse client.
func NewQuery(logger *zap.Logger, clickhouse *clickhouse.Clickhouse) *Query {
	q := new(Query)
	q.logger = logger
	q.clickhouse = clickhouse
	return q
}
// Stat is the result of Select: the time labels of the requested range and
// the collected datasets.
type Stat struct {
// Labels are the formatted time buckets (days or hours) of the range.
Labels []string
// Datasets maps a series key (metric name plus group labels) to its
// values per time label.
Datasets map[string]map[string]float64
}
// Condition describes one statistics request.
type Condition struct {
// From and To bound the queried time range.
From time.Time
To time.Time
// Metrics selects which tables the data is read from.
Metrics []string
// Filters restrict the rows; filters without values are ignored.
Filters []Filter
// Groups are the columns to group by.
Groups []string
// By is the aggregation step: ByDay for days, anything else for hours.
By string
}
// Filter restricts a column to a set of values.
type Filter struct {
// Field is the column name; it is concatenated into the SQL text, so it
// should only ever be one of the Filter* constants.
Field string
// Value lists the accepted values.
Value []string
}
// Select collects the requested metrics for the condition's time range.
//
// It returns an empty Stat when no groups or no metrics are requested.
// Otherwise it builds the time labels (per day or per hour, starting at
// condition.From) and runs one query per metric, merging all returned series
// into a single dataset map. A failing query is logged and aborts the call.
//
// NOTE(review): the per-day label count is inclusive (+1) while the per-hour
// count is not — confirm that this asymmetry is intended.
func (q *Query) Select(ctx context.Context, condition Condition) (Stat, error) {
	if len(condition.Groups) == 0 || len(condition.Metrics) == 0 {
		return Stat{}, nil
	}

	// Choose the bucket size, label layout and bucket count for the step.
	span := condition.To.Sub(condition.From)
	step, layout, count := time.Hour, DateHour, int(span.Hours())
	if condition.By == ByDay {
		step, layout, count = time.Hour*24, Date, int(span.Hours()/24)+1
	}

	labels := []string{}
	for i := 0; i < count; i++ {
		bucket := condition.From.Add(time.Duration(i) * step)
		labels = append(labels, bucket.Format(layout))
	}

	datasets := map[string]map[string]float64{}
	for _, metric := range condition.Metrics {
		series, err := q.query(ctx, metric, condition, labels)
		if err != nil {
			q.logger.Error("query", zap.Error(err))
			return Stat{}, err
		}
		for key, values := range series {
			datasets[key] = values
		}
	}

	return Stat{Labels: labels, Datasets: datasets}, nil
}
// Item is one row scanned from a statistics query: the time bucket, the
// value of each requested grouping column (label0, label1, ... in the order
// of Condition.Groups) and the aggregated value.
//
// There is one label slot per grouping constant (seven), so every grouping
// that can be requested has a place in the series key.
type Item struct {
	Time   time.Time
	Label0 string
	Label1 string
	Label2 string
	Label3 string
	Label4 string
	Label5 string
	Label6 string
	Value  float64
}

// Key builds the series name for this row:
//
//	metric - group0:label0 - group1:label1 - ...
//
// The key covers the labels up to and including the last non-empty one, so an
// empty label in the middle is kept as "group:". Labels that have no matching
// entry in groups are ignored rather than causing an index-out-of-range panic.
// When every label is empty the key is just the metric name.
func (i Item) Key(metric string, groups []string) string {
	labels := [...]string{i.Label0, i.Label1, i.Label2, i.Label3, i.Label4, i.Label5, i.Label6}

	// Index of the last label that carries a value, or -1 when none does.
	last := -1
	for idx, label := range labels {
		if label != "" {
			last = idx
		}
	}
	if last >= len(groups) {
		last = len(groups) - 1
	}

	var b strings.Builder
	b.WriteString(metric)
	for idx := 0; idx <= last; idx++ {
		b.WriteString(" - ")
		b.WriteString(groups[idx])
		b.WriteString(":")
		b.WriteString(labels[idx])
	}
	return b.String()
}
// query loads one metric, grouped by time bucket and by condition.Groups, and
// returns the values as series key -> time label -> value.
//
// The metric name selects the "<metric>_hour" table and sums its count
// column; MetricPrice is the exception and sums price from
// "impressions_hour". Rows are bucketed by day when condition.By is ByDay and
// by the stored timestamp otherwise. hours is the list of labels produced by
// Select; in the non-daily mode every series is padded with zeroes for labels
// that returned no row.
//
// The metric, the groupings and the filter fields are spliced into the SQL
// text as identifiers, and the first two originate from request parameters,
// so anything that is not a plain identifier is rejected with an error before
// the statement is built. A nil map is returned when the query yields no rows.
func (q *Query) query(ctx context.Context, metric string, condition Condition, hours []string) (map[string]map[string]float64, error) {
	if !isPlainIdentifier(metric) {
		return nil, fmt.Errorf("invalid metric %q", metric)
	}
	for _, group := range condition.Groups {
		if !isPlainIdentifier(group) {
			return nil, fmt.Errorf("invalid grouping %q", group)
		}
	}
	byDay := condition.By == ByDay

	sel := q.clickhouse.DB.NewSelect()
	if byDay {
		sel.ColumnExpr("toDate(timestamp) as time")
	} else {
		sel.ColumnExpr("timestamp as time")
	}
	for i, group := range condition.Groups {
		sel.ColumnExpr(group+" as label?", i)
	}
	// The money metric is the summed price of impressions; every other metric
	// is a row count taken from its own table.
	if metric == MetricPrice {
		sel.ColumnExpr("toFloat64(sum(price)) as value")
		sel.ModelTableExpr("impressions_hour")
	} else {
		sel.ColumnExpr("sum(count) as value")
		sel.ModelTableExpr(metric + "_hour")
	}
	for _, filter := range condition.Filters {
		if len(filter.Value) == 0 {
			continue
		}
		if !isPlainIdentifier(filter.Field) {
			return nil, fmt.Errorf("invalid filter field %q", filter.Field)
		}
		sel.Where(filter.Field+" IN (?)", ch.In(filter.Value))
	}
	sel.Where("timestamp >= ?", condition.From)
	sel.Where("timestamp <= ?", condition.To)
	sel.Group("time")
	for _, group := range condition.Groups {
		sel.Group(group)
	}
	sel.Order("time DESC")
	sel.Limit(1000)
	// Log the statement through the structured logger instead of stdout.
	q.logger.Debug("stats query", zap.String("sql", sel.String()))

	items := []Item{}
	if err := sel.Scan(ctx, &items); err != nil {
		return nil, err
	}
	if len(items) == 0 {
		return nil, nil
	}

	layout := DateHour
	if byDay {
		layout = Date
	}
	// Spread the rows into per-series maps of time label -> value.
	data := map[string]map[string]float64{}
	for _, item := range items {
		key := item.Key(metric, condition.Groups)
		if _, exist := data[key]; !exist {
			data[key] = map[string]float64{}
		}
		data[key][item.Time.Format(layout)] = item.Value
	}
	// Fill the holes for buckets without data. Everything that is not daily
	// is bucketed hourly above, so the padding follows the same rule.
	if !byDay {
		for _, series := range data {
			for _, hour := range hours {
				if _, exist := series[hour]; !exist {
					series[hour] = 0
				}
			}
		}
	}
	return data, nil
}

// isPlainIdentifier reports whether s is a non-empty run of ASCII letters,
// digits and underscores, i.e. safe to splice into SQL as a column or table
// name.
func isPlainIdentifier(s string) bool {
	if s == "" {
		return false
	}
	for _, r := range s {
		switch {
		case r == '_':
		case r >= 'a' && r <= 'z':
		case r >= 'A' && r <= 'Z':
		case r >= '0' && r <= '9':
		default:
			return false
		}
	}
	return true
}
// BundleModel is the row type scanned by the bundles query.
type BundleModel struct {
Bundle string
}
// bundles returns the distinct bundle values found in impressions_hour as
// select options. Values that do not contain a dot are left out.
func (q *Query) bundles(ctx context.Context) ([]Option, error) {
	query := q.clickhouse.DB.NewSelect()
	query.ColumnExpr("bundle")
	query.ModelTableExpr("impressions_hour")
	query.Group("bundle")

	rows := make([]BundleModel, 0)
	if err := query.Scan(ctx, &rows); err != nil {
		return nil, err
	}

	result := make([]Option, 0, len(rows))
	for idx := range rows {
		name := rows[idx].Bundle
		if strings.Contains(name, ".") {
			result = append(result, Option{ID: name, Name: name})
		}
	}
	return result, nil
}
// NetworkModel is the row type scanned by the networks query.
type NetworkModel struct {
Network string
}
// networks returns the distinct network values found in impressions_hour as
// select options.
func (q *Query) networks(ctx context.Context) ([]Option, error) {
	query := q.clickhouse.DB.NewSelect()
	query.ColumnExpr("network")
	query.ModelTableExpr("impressions_hour")
	query.Group("network")

	rows := make([]NetworkModel, 0)
	if err := query.Scan(ctx, &rows); err != nil {
		return nil, err
	}

	result := make([]Option, 0, len(rows))
	for idx := range rows {
		name := rows[idx].Network
		result = append(result, Option{ID: name, Name: name})
	}
	return result, nil
}
// SlotModel is a row type with a single Slot column.
// NOTE(review): presumably scanned by a slots query analogous to bundles and
// networks; no use of it is visible here.
type SlotModel struct {
Slot string
}
package stats
import (
"bytes"
"context"
"fmt"
"html/template"
"strings"
"time"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/web/v3"
v "github.com/qor5/x/v3/ui/vuetify"
vx "github.com/qor5/x/v3/ui/vuetifyx"
h "github.com/theplant/htmlgo"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/modules/ads/models"
)
// Stats is the admin dashboard module: it reads filter options from the
// relational database, statistics through Query, and renders the chart
// script from the parsed page template.
type Stats struct {
db *gorm.DB
query *Query
template *template.Template
}
// New creates the Stats module. It parses the chart script template once and
// panics if the template is invalid.
func New(db *gorm.DB, query *Query) *Stats {
	parsed, err := template.New("").Parse(page)
	if err != nil {
		panic(err)
	}
	stats := new(Stats)
	stats.db = db
	stats.query = query
	stats.template = parsed
	return stats
}
// Dashboard is the empty model registered with presets to host the report page.
type Dashboard struct{}
// DateRow is one row of the report table: the index of a time label and the
// label itself.
type DateRow struct {
ID int
Date string
}
// Option is one entry of a select control: ID is the submitted value and
// Name the displayed title.
type Option struct {
ID string
Name string
}
// Configure registers the dashboard model on the presets builder and installs
// its report page.
//
// The page reads the time range, metrics, groupings and filters from the
// request, loads the statistics, and renders a Chart.js chart, the slice and
// filter controls, and a table with one column per series. The "Обновить"
// button fires the "reload_dashboard" event, which redirects back to the
// dashboard URL carrying the selected values as query parameters.
func (s *Stats) Configure(pb *presets.Builder) {
	b := pb.Model(&Dashboard{}).
		MenuIcon("mdi-view-dashboard").
		URIName("dashboard")

	b.RegisterEventFunc("reload_dashboard", func(ctx *web.EventContext) (er web.EventResponse, err error) {
		// Collect every parameter of the request.
		params := ctx.Queries()
		// Build the dashboard URL carrying those parameters.
		url := "/admin/dashboard"
		if len(params) > 0 {
			url += "?" + params.Encode()
		}
		// Do a full redirect.
		er.RedirectURL = url
		return
	})

	lb := b.Listing()
	lb.PageFunc(func(ctx *web.EventContext) (r web.PageResponse, err error) {
		// Queries are tied to the request so they stop when the client leaves.
		reqCtx := ctx.R.Context()

		// filters
		startedAt := started(ctx)
		endedAt := ended(ctx, startedAt)
		metrics := parse(ctx, "metrics")
		if metrics == nil {
			metrics = []string{MetricImpressions, MetricClicks}
		}
		grouped := parse(ctx, "grouped")
		// The reload button submits the banner filter as "banners".
		banners := parse(ctx, "banners")
		groups := parse(ctx, "groups")
		campaigns := parse(ctx, "campaigns")
		advertisers := parse(ctx, "advertisers")
		bundles := parse(ctx, "bundles")
		networks := parse(ctx, "networks")

		// load stats
		data, err := s.query.Select(reqCtx, Condition{
			From:    startedAt,
			To:      endedAt,
			Metrics: metrics,
			Filters: []Filter{
				{
					Field: FilterAdvertiserId,
					Value: advertisers,
				},
				{
					Field: FilterCampaignId,
					Value: campaigns,
				},
				{
					Field: FilterGroupId,
					Value: groups,
				},
				{
					Field: FilterBannerId,
					Value: banners,
				},
				{
					Field: FilterBundle,
					Value: bundles,
				},
				{
					Field: FilterNetwork,
					Value: networks,
				},
			},
			Groups: grouped,
		})
		if err != nil {
			return r, err
		}

		// Table rows are listed newest first.
		notes := make([]*DateRow, 0, len(data.Labels))
		for i := len(data.Labels); i > 0; i-- {
			notes = append(notes, &DateRow{
				ID:   i - 1,
				Date: data.Labels[i-1],
			})
		}

		// prepare table
		dt := vx.DataTable(notes).WithoutHeader(false)
		// Option lists are best effort: on a query error the select is
		// rendered without options instead of failing the whole page.
		bundleOptions, _ := s.query.bundles(reqCtx)
		networkOptions, _ := s.query.networks(reqCtx)
		dt.Hover(true)
		dt.Column("Date").Title("Дата и час").CellComponentFunc(func(obj interface{}, fieldName string, ctx *web.EventContext) h.HTMLComponent {
			n := obj.(*DateRow)
			return h.Td(h.Text(n.Date)).Style("width: 136px; min-width: 136px;")
		})
		// NOTE(review): map iteration order is random, so the column order
		// can differ between page loads.
		for key, dataset := range data.Datasets {
			dt.Column(key).Title(key).CellComponentFunc(func(obj interface{}, fieldName string, ctx *web.EventContext) h.HTMLComponent {
				n := obj.(*DateRow)
				return h.Td(h.Text(fmt.Sprintf("%.2f", dataset[n.Date]))).Style("min-width: 100px")
			})
		}

		// graph
		var script bytes.Buffer
		if err := s.template.Execute(&script, data); err != nil {
			return r, err
		}

		// render page
		body := v.VContainer(
			h.Div(
				h.RawHTML(`<div style="width: 100%; height: 400px;">
<canvas id="stats-chart" style="width: 100%; height: 400px;"></canvas>
</div>`),
				h.Script("").Attr("src", "https://cdn.jsdelivr.net/npm/chart.js"),
				h.Script(script.String()),
			).Style("margin-bottom: 30px"),
			web.Scope(
				h.Div(
					h.H3("Срезы"),
				).Style("margin-bottom: 20px; margin-top: 30px"),
				v.VRow(
					h.Div(
						vx.VXSelect().
							Items(s.metrics()).
							ItemTitle("Name").
							ItemValue("ID").
							Multiple(true).
							Clearable(true).
							Chips(true).
							Attr(web.VField("Metrics", metrics)...).
							Label("Метрики"),
					).Id("Metrics").Style("margin-right: 20px; padding-left: 12px; width: 250px"),
					h.Div(
						vx.VXSelect().
							Items(s.grouped()).
							ItemTitle("Name").
							ItemValue("ID").
							Multiple(true).
							Clearable(true).
							Chips(true).
							Attr(web.VField("Grouped", grouped)...).
							Label("Группировки"),
					).Id("Grouped").Style("margin-right: 20px; padding-left: 12px; width: 250px"),
					h.Div(
						vx.VXDatepicker().Type("datetimepicker").
							Format("YYYY-MM-DD HH:mm").
							Clearable(true).
							Id("StartedAt").
							Attr(web.VField("StartedAt", startedAt)...).
							Label("Начало").
							Width(240),
					).Style("margin-right: 34px; padding-left: 12px;"),
					vx.VXDatepicker().Type("datetimepicker").
						Format("YYYY-MM-DD HH:mm").
						Clearable(true).
						Id("EndedAt").
						Attr(web.VField("EndedAt", endedAt)...).
						Label("Конец").
						Width(240),
				),
				h.Div(
					h.H3("Фильтры"),
				).Style("margin-bottom: 20px; margin-top: 30px"),
				v.VRow(
					h.Div(
						vx.VXSelect().
							Items(s.banners()).
							ItemTitle("Name").
							ItemValue("ID").
							Multiple(true).
							Clearable(true).
							Chips(true).
							Attr(web.VField("BannerId", banners)...).
							Label("Баннер"),
					).Id("BannerId").Style("margin-right: 20px; padding-left: 12px; width: 250px"),
					h.Div(
						vx.VXSelect().
							Items(s.groups()).
							ItemTitle("Name").
							ItemValue("ID").
							Multiple(true).
							Clearable(true).
							Chips(true).
							Attr(web.VField("GroupId", groups)...).
							Label("Группа"),
					).Id("GroupId").Style("margin-right: 20px; padding-left: 12px; width: 250px"),
					h.Div(
						vx.VXSelect().
							Items(s.campaigns()).
							ItemTitle("Name").
							ItemValue("ID").
							Multiple(true).
							Clearable(true).
							Chips(true).
							Attr(web.VField("CampaignId", campaigns)...).
							Label("Кампания"),
					).Id("CampaignId").Style("margin-right: 20px; padding-left: 12px; width: 250px"),
					h.Div(
						vx.VXSelect().
							Items(s.advertisers()).
							ItemTitle("Name").
							ItemValue("ID").
							Multiple(true).
							Clearable(true).
							Chips(true).
							Attr(web.VField("AdvertiserId", advertisers)...).
							Label("Рекламодатель"),
					).Id("AdvertiserId").Style("margin-right: 20px; padding-left: 12px; width: 250px"),
				),
				v.VRow(
					h.Div(
						vx.VXSelect().
							Items(bundleOptions).
							ItemTitle("Name").
							ItemValue("ID").
							Multiple(true).
							Clearable(true).
							Chips(true).
							Attr(web.VField("Bundle", bundles)...).
							Label("Бандл"),
					).Id("Bundle").Style("margin-right: 20px; padding-left: 12px; width: 250px"),
					h.Div(
						vx.VXSelect().
							Items(networkOptions).
							ItemTitle("Name").
							ItemValue("ID").
							Multiple(true).
							Clearable(true).
							Chips(true).
							Attr(web.VField("Network", networks)...).
							Label("Сеть"),
					).Id("Network").Style("margin-right: 20px; padding-left: 12px; width: 250px"),
				),
				h.Div(
					v.VBtn("Обновить").Color("primary").Attr("@click", web.Plaid().
						EventFunc("reload_dashboard").
						Query("started_at", web.Var("form.StartedAt")).
						Query("ended_at", web.Var("form.EndedAt")).
						Query("banners", web.Var("form.BannerId ? form.BannerId.join(',') : ''")).
						Query("groups", web.Var("form.GroupId ? form.GroupId.join(',') : ''")).
						Query("campaigns", web.Var("form.CampaignId ? form.CampaignId.join(',') : ''")).
						Query("advertisers", web.Var("form.AdvertiserId ? form.AdvertiserId.join(',') : ''")).
						Query("bundles", web.Var("form.Bundle ? form.Bundle.join(',') : ''")).
						Query("networks", web.Var("form.Network ? form.Network.join(',') : ''")).
						Query("grouped", web.Var("form.Grouped ? form.Grouped.join(',') : ''")).
						Query("by", web.Var("form.By")).
						Query("metrics", web.Var("form.Metrics ? form.Metrics.join(',') : ''")).
						Go()),
				).Style("margin-top: 20px; margin-bottom: 30px"),
			).VSlot("{ locals, form }"),
			h.Div(
				dt,
			).Style("margin-top: 20px"),
		)
		r.Body = body
		r.PageTitle = "Отчеты"
		return
	})
}
// page is the template source of the script that draws the Chart.js line
// chart into the #stats-chart canvas. New parses it and Configure executes it
// with a Stat: .Labels become the x axis and every entry of .Datasets becomes
// one line whose points are looked up label by label.
const page = `
document.addEventListener('DOMContentLoaded', function() {
const ctx = document.getElementById('stats-chart').getContext('2d');
new Chart(ctx, {
type: 'line',
data: {
labels: [
{{range $index, $elem := .Labels}}
{{if $index}},{{end}}
'{{$elem}}'
{{end}}
],
datasets: [
{{$labels := .Labels}}
{{range $name, $dataset := .Datasets}}
{
label: '{{$name}}',
data: [
{{range $index, $elem := $labels}}
{{if $index}},{{end}}
{{ index $dataset $elem }}
{{end}}
],
borderWidth: 1
},
{{end}}
]
},
options: {
responsive: true,
maintainAspectRatio: false,
scales: {
y: {
beginAtZero: true
}
}
}
});
});
`
// banners loads every banner and returns it as a select option (ID as decimal
// string, title as name). A failed database read yields an empty list.
func (s *Stats) banners() []Option {
	rows := make([]models.Banner, 0)
	s.db.Model(&models.Banner{}).Find(&rows)

	options := make([]Option, 0, len(rows))
	for idx := range rows {
		options = append(options, Option{
			ID:   fmt.Sprintf("%d", rows[idx].ID),
			Name: rows[idx].Title,
		})
	}
	return options
}
// groups loads every banner group and returns it as a select option (ID as
// decimal string, title as name). A failed database read yields an empty list.
func (s *Stats) groups() []Option {
	rows := make([]models.Bgroup, 0)
	s.db.Model(&models.Bgroup{}).Find(&rows)

	options := make([]Option, 0, len(rows))
	for idx := range rows {
		options = append(options, Option{
			ID:   fmt.Sprintf("%d", rows[idx].ID),
			Name: rows[idx].Title,
		})
	}
	return options
}
// campaigns loads every campaign and returns it as a select option (ID as
// decimal string, title as name). A failed database read yields an empty list.
func (s *Stats) campaigns() []Option {
	rows := make([]models.Campaign, 0)
	s.db.Model(&models.Campaign{}).Find(&rows)

	options := make([]Option, 0, len(rows))
	for idx := range rows {
		options = append(options, Option{
			ID:   fmt.Sprintf("%d", rows[idx].ID),
			Name: rows[idx].Title,
		})
	}
	return options
}
// advertisers loads every advertiser and returns it as a select option (ID as
// decimal string, title as name). A failed database read yields an empty list.
func (s *Stats) advertisers() []Option {
	rows := make([]models.Advertiser, 0)
	s.db.Model(&models.Advertiser{}).Find(&rows)

	options := make([]Option, 0, len(rows))
	for idx := range rows {
		options = append(options, Option{
			ID:   fmt.Sprintf("%d", rows[idx].ID),
			Name: rows[idx].Title,
		})
	}
	return options
}
// metrics returns the options of the metric select, in display order.
func (s *Stats) metrics() []Option {
	pairs := [][2]string{
		{MetricRequests, "Запросы"},
		{MetricResponses, "Респонсы"},
		{MetricWins, "Победы"},
		{MetricImpressions, "Показы"},
		{MetricClicks, "Клики"},
		{MetricConversions, "Конверсии"},
		{MetricPrice, "Деньги"},
	}
	options := make([]Option, 0, len(pairs))
	for _, pair := range pairs {
		options = append(options, Option{ID: pair[0], Name: pair[1]})
	}
	return options
}
// grouped returns the options of the grouping select, in display order.
func (s *Stats) grouped() []Option {
	pairs := [][2]string{
		{GroupBanner, "Баннеры"},
		{GroupGroup, "Группы"},
		{GroupCampaign, "Кампании"},
		{GroupAdvertiser, "Рекламодатели"},
		{GroupNetwork, "Сети"},
		{GroupBundle, "Бандлы"},
	}
	options := make([]Option, 0, len(pairs))
	for _, pair := range pairs {
		options = append(options, Option{ID: pair[0], Name: pair[1]})
	}
	return options
}
// parse reads the form value key as a comma-separated list and returns its
// items with surrounding whitespace removed. It returns nil when the value is
// absent or empty.
func parse(ctx *web.EventContext, key string) []string {
	raw := ctx.R.FormValue(key)
	if raw == "" {
		return nil
	}
	parts := strings.Split(raw, ",")
	for idx := range parts {
		parts[idx] = strings.TrimSpace(parts[idx])
	}
	return parts
}
// started returns the start of the report range taken from the "started_at"
// form value. A missing or unparsable value falls back to 24 hours ago.
func started(ctx *web.EventContext) time.Time {
	if raw := ctx.R.FormValue("started_at"); raw != "" {
		if parsed, err := time.Parse(DateHourMinute, raw); err == nil {
			return parsed
		}
	}
	return time.Now().Add(-(time.Hour * 24))
}
// ended returns the end of the report range taken from the "ended_at" form
// value. A missing or unparsable value falls back to the current time, and
// the result is never earlier than started.
func ended(ctx *web.EventContext, started time.Time) time.Time {
	end, ok := time.Time{}, false
	if raw := ctx.R.FormValue("ended_at"); raw != "" {
		if parsed, err := time.Parse(DateHourMinute, raw); err == nil {
			end, ok = parsed, true
		}
	}
	if !ok {
		end = time.Now()
	}
	if end.Before(started) {
		return started
	}
	return end
}
package users
import (
plogin "github.com/qor5/admin/v3/login"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/web/v3"
"github.com/qor5/x/v3/login"
h "github.com/theplant/htmlgo"
"gorm.io/gorm"
"go.ads.coffee/platform/admin/internal/modules/users/models"
)
// loginSecret is the value handed to the login builder's Secret option in Auth.
// NOTE(review): it is hard-coded in source; presumably it should be supplied
// through configuration or the environment — confirm before deploying.
var loginSecret = "edfkdehfkdfhu"
// Users is the admin module for user accounts and login, backed by gorm.
type Users struct {
db *gorm.DB
}
// New creates the Users module on top of the given database handle.
func New(db *gorm.DB) *Users {
	users := new(Users)
	users.db = db
	return users
}
// Auth builds the login builder for the admin (database-backed users, the
// package secret, TOTP switched off) and makes the profile area render a
// logout link pointing at the builder's logout URL.
func (u *Users) Auth(pb *presets.Builder) *login.Builder {
	lb := plogin.New(pb).DB(u.db).UserModel(&models.User{}).Secret(loginSecret).TOTP(false)

	logoutLink := func(ctx *web.EventContext) h.HTMLComponent {
		link := h.A(h.Text("Выход"))
		return link.Href(lb.LogoutURL).Style("margin-left:100px")
	}
	pb.ProfileFunc(logoutLink)

	return lb
}
// Configure registers the user model in the admin. The edit form exposes
// Name, Account and Password; a non-empty submitted password is stored on the
// user and encrypted, while an empty one leaves the stored password alone.
func (u *Users) Configure(b *presets.Builder) {
	// The menu label is left at the presets default.
	m := b.Model(&models.User{}).MenuIcon("mdi-account-multiple")

	setPassword := func(obj interface{}, field *presets.FieldContext, ctx *web.EventContext) error {
		user := obj.(*models.User)
		password := ctx.R.FormValue(field.Name)
		if password == "" {
			return nil
		}
		user.Password = password
		user.EncryptPassword()
		return nil
	}
	m.Editing("Name", "Account", "Password").Field("Password").SetterFunc(setPassword)
}
// Migrate auto-migrates the user table and panics when the migration fails.
func (u *Users) Migrate() {
	if err := u.db.AutoMigrate(&models.User{}); err != nil {
		panic(err)
	}
}
//nolint:staticcheck
package s3storage
import (
"bytes"
"context"
"fmt"
"io"
"mime"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/opentracing/opentracing-go/log"
"github.com/qor5/x/v3/oss"
)
// S3Service is the subset of the S3 API used by Client. New assigns an
// *s3.S3 to it; a test double can be assigned to Client.S3 instead.
type S3Service interface {
GetObjectWithContext(ctx context.Context, input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error)
PutObjectWithContext(ctx context.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error)
DeleteObjectWithContext(ctx context.Context, input *s3.DeleteObjectInput, opts ...request.Option) (*s3.DeleteObjectOutput, error)
DeleteObjectsWithContext(ctx context.Context, input *s3.DeleteObjectsInput, opts ...request.Option) (*s3.DeleteObjectsOutput, error)
ListObjectsV2WithContext(ctx context.Context, input *s3.ListObjectsV2Input, opts ...request.Option) (*s3.ListObjectsV2Output, error)
GetObjectRequest(input *s3.GetObjectInput) (*request.Request, *s3.GetObjectOutput)
CopyObjectWithContext(ctx context.Context, input *s3.CopyObjectInput, opts ...request.Option) (*s3.CopyObjectOutput, error)
SelectObjectContentWithContext(ctx context.Context, input *s3.SelectObjectContentInput, opts ...request.Option) (*s3.SelectObjectContentOutput, error)
}
// Client is the S3-backed storage: the S3 API handle plus the configuration
// (bucket, ACL, endpoints, ...) every operation reads.
type Client struct {
S3 S3Service
Config Config
}
// New creates an S3 storage client from config.
//
// An empty ACL defaults to "public-read". The AWS session always uses the
// static credentials from config; a custom endpoint is applied only when
// config.S3Endpoint is set. Session creation failures panic (session.Must).
func New(config Config) *Client {
	if config.ACL == "" {
		config.ACL = "public-read"
	}

	awsConfig := &aws.Config{
		Region:           aws.String(config.Region),
		S3ForcePathStyle: aws.Bool(config.S3ForcePathStyle),
		Credentials: credentials.NewStaticCredentials(
			config.AccessID,
			config.AccessKey,
			config.SessionToken,
		),
	}
	if config.S3Endpoint != "" {
		awsConfig.Endpoint = aws.String(config.S3Endpoint)
	}

	// *s3.S3 implements S3Service.
	return &Client{
		Config: config,
		S3:     s3.New(session.Must(session.NewSession(awsConfig))),
	}
}
// Get downloads the object at path into a temporary file under /tmp and
// returns that file positioned at its start. The temp file keeps the object's
// extension; the caller owns it and is responsible for closing and removing
// it.
//
// On any failure the returned file is nil. The object stream is always
// closed, and a temp file that could not be filled or rewound is closed and
// removed instead of being left behind.
func (client Client) Get(ctx context.Context, path string) (file *os.File, err error) {
	readCloser, err := client.GetStream(ctx, path)
	if err != nil {
		return nil, err
	}
	defer func() {
		if cerr := readCloser.Close(); cerr != nil {
			// NOTE(review): log.Error from opentracing-go/log returns a log
			// field and does not appear to emit anything; kept as in the
			// original, confirm whether a real logger was intended.
			log.Error(cerr)
		}
	}()

	pattern := fmt.Sprintf("s3*%s", filepath.Ext(path))
	file, err = os.CreateTemp("/tmp", pattern)
	if err != nil {
		return nil, err
	}

	if _, err = io.Copy(file, readCloser); err == nil {
		_, err = file.Seek(0, 0)
	}
	if err != nil {
		// Best-effort cleanup; the copy/seek error is the one worth reporting.
		_ = file.Close()
		_ = os.Remove(file.Name())
		return nil, err
	}
	return file, nil
}
// GetStream opens the object at path and returns its body as a stream. The
// caller must close the returned reader.
func (client Client) GetStream(ctx context.Context, path string) (io.ReadCloser, error) {
	input := &s3.GetObjectInput{
		Bucket: aws.String(client.Config.Bucket),
		Key:    aws.String(client.ToS3Key(path)),
	}
	output, err := client.S3.GetObjectWithContext(ctx, input)
	if err != nil {
		return nil, err
	}
	return output.Body, nil
}
// Put uploads the whole content of reader to urlPath and returns a
// description of the stored object.
//
// A seekable reader is rewound first. The content is buffered in memory; its
// content type comes from the key's extension and falls back to sniffing the
// data. The configured ACL is always sent, Cache-Control only when configured.
func (client Client) Put(ctx context.Context, urlPath string, reader io.Reader) (*oss.Object, error) {
	if rewindable, seekable := reader.(io.ReadSeeker); seekable {
		if _, err := rewindable.Seek(0, io.SeekStart); err != nil {
			return nil, err
		}
	}

	key := client.ToS3Key(urlPath)
	payload, err := io.ReadAll(reader)
	if err != nil {
		return nil, err
	}

	contentType := mime.TypeByExtension(path.Ext(key))
	if contentType == "" {
		contentType = http.DetectContentType(payload)
	}

	input := &s3.PutObjectInput{
		Bucket:        aws.String(client.Config.Bucket), // required
		Key:           aws.String(key),                  // required
		ACL:           aws.String(client.Config.ACL),
		Body:          bytes.NewReader(payload),
		ContentLength: aws.Int64(int64(len(payload))),
		ContentType:   aws.String(contentType),
	}
	if cacheControl := client.Config.CacheControl; cacheControl != "" {
		input.CacheControl = aws.String(cacheControl)
	}

	if _, err = client.S3.PutObjectWithContext(ctx, input); err != nil {
		return nil, err
	}

	modified := time.Now()
	object := &oss.Object{
		Path:             key,
		Name:             filepath.Base(key),
		LastModified:     &modified,
		StorageInterface: client,
	}
	return object, nil
}
// Delete removes the object stored at path.
func (client Client) Delete(ctx context.Context, path string) error {
	input := &s3.DeleteObjectInput{
		Bucket: aws.String(client.Config.Bucket),
		Key:    aws.String(client.ToS3Key(path)),
	}
	_, err := client.S3.DeleteObjectWithContext(ctx, input)
	return err
}
// DeleteObjects removes all given paths with one bulk request. Each path is
// converted to an S3 key and stripped of a leading slash.
func (client Client) DeleteObjects(ctx context.Context, paths []string) (err error) {
	var identifiers []*s3.ObjectIdentifier
	for idx := range paths {
		key := strings.TrimPrefix(client.ToS3Key(paths[idx]), "/")
		identifiers = append(identifiers, &s3.ObjectIdentifier{Key: aws.String(key)})
	}

	_, err = client.S3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
		Bucket: aws.String(client.Config.Bucket),
		Delete: &s3.Delete{Objects: identifiers},
	})
	return err
}
// List returns every object stored under path.
//
// A non-empty path is normalised to a "dir/" prefix. The listing follows
// continuation tokens until the service reports the result is no longer
// truncated, so buckets larger than one response page are listed completely.
// On error no partial result is returned.
func (client Client) List(ctx context.Context, path string) ([]*oss.Object, error) {
	var prefix string
	if path != "" {
		prefix = strings.Trim(path, "/") + "/"
	}

	input := &s3.ListObjectsV2Input{
		Bucket: aws.String(client.Config.Bucket),
		Prefix: aws.String(prefix),
	}

	var objects []*oss.Object
	for {
		page, err := client.S3.ListObjectsV2WithContext(ctx, input)
		if err != nil {
			return nil, err
		}
		for _, content := range page.Contents {
			// StringValue tolerates a nil key instead of panicking.
			key := aws.StringValue(content.Key)
			objects = append(objects, &oss.Object{
				Path:             "/" + client.ToS3Key(key),
				Name:             filepath.Base(key),
				LastModified:     content.LastModified,
				StorageInterface: client,
			})
		}
		if !aws.BoolValue(page.IsTruncated) || page.NextContinuationToken == nil {
			return objects, nil
		}
		input.ContinuationToken = page.NextContinuationToken
	}
}
// GetEndpoint returns the public endpoint of the storage.
//
// A configured Config.Endpoint wins. Otherwise the S3 endpoint is used
// without its scheme: as "host/bucket/" when the host contains "localhost",
// and as "bucket.host" in every other case.
func (client Client) GetEndpoint(ctx context.Context) string {
	if custom := client.Config.Endpoint; custom != "" {
		return custom
	}

	host := strings.TrimPrefix(client.getS3Endpoint(ctx), "https://")
	host = strings.TrimPrefix(host, "http://")

	bucket := client.Config.Bucket
	if strings.Contains(host, "localhost") {
		return host + "/" + bucket + "/"
	}
	return bucket + "." + host
}
// urlRegexp recognises inputs that look like a full or scheme-relative URL.
var urlRegexp = regexp.MustCompile(`(https?:)?//((\w+).)+(\w+)/`)

// ToS3Key converts a URL or path into the key of the object inside the
// bucket. For URL-like input only the path part is used. With path-style
// addressing the leading bucket segment is dropped; otherwise just the
// leading slash is removed.
func (client Client) ToS3Key(urlPath string) string {
	pathStyle := client.Config.S3ForcePathStyle

	if urlRegexp.MatchString(urlPath) {
		if parsed, err := url.Parse(urlPath); err == nil {
			key := parsed.Path
			if pathStyle {
				// The first path segment is the bucket name.
				key = strings.TrimPrefix(key, "/"+client.Config.Bucket)
			}
			return strings.TrimPrefix(key, "/")
		}
	}

	if pathStyle {
		// The first path segment is the bucket name.
		return strings.TrimPrefix(urlPath, "/"+client.Config.Bucket+"/")
	}
	return strings.TrimPrefix(urlPath, "/")
}
// GetURL returns the URL under which path can be fetched.
//
// When no custom S3 endpoint is configured and the ACL is "private" or
// "authenticated-read", a presigned URL valid for one hour is returned. In
// every other case path is returned unchanged.
func (client Client) GetURL(ctx context.Context, path string) (url string, err error) {
	restricted := client.Config.ACL == "private" || client.Config.ACL == "authenticated-read"
	if client.getS3Endpoint(ctx) != "" || !restricted {
		return path, nil
	}

	req, _ := client.S3.GetObjectRequest(&s3.GetObjectInput{
		Bucket: aws.String(client.Config.Bucket),
		Key:    aws.String(client.ToS3Key(path)),
	})
	if url, err = req.Presign(1 * time.Hour); err != nil {
		return "", err
	}
	return url, nil
}
// Copy copies the object "from" to the key "to" inside the configured bucket.
// Both values are passed to S3 as given.
func (client Client) Copy(ctx context.Context, from, to string) (err error) {
	input := &s3.CopyObjectInput{
		Bucket:     aws.String(client.Config.Bucket),
		CopySource: aws.String(from),
		Key:        aws.String(to),
	}
	_, err = client.S3.CopyObjectWithContext(ctx, input)
	return err
}
// getS3Endpoint returns the explicitly configured S3 endpoint, or "" when
// none is set. With AWS SDK v1 the endpoint is normally derived from the
// region, so a custom one has to be given in the config.
func (client Client) getS3Endpoint(ctx context.Context) string {
	return client.Config.S3Endpoint
}
// Additional methods kept for compatibility.
// PutWithContext is an alias for Put.
func (client Client) PutWithContext(ctx context.Context, urlPath string, reader io.Reader) (*oss.Object, error) {
	object, err := client.Put(ctx, urlPath, reader)
	return object, err
}
// GetWithContext is an alias for Get.
func (client Client) GetWithContext(ctx context.Context, path string) (*os.File, error) {
	file, err := client.Get(ctx, path)
	return file, err
}
// DeleteWithContext is an alias for Delete.
func (client Client) DeleteWithContext(ctx context.Context, path string) error {
	err := client.Delete(ctx, path)
	return err
}
// SelectObjectContentWithContext forwards the request to the underlying S3
// service unchanged.
func (client Client) SelectObjectContentWithContext(
	ctx context.Context,
	in *s3.SelectObjectContentInput,
) (*s3.SelectObjectContentOutput, error) {
	output, err := client.S3.SelectObjectContentWithContext(ctx, in)
	return output, err
}
package server
import (
"context"
"net"
"net/http"
"github.com/qor5/admin/v3/presets"
"github.com/qor5/x/v3/login"
"go.uber.org/zap"
)
// Server is the admin HTTP server: the presets builder serves the pages, the
// login builder mounts its routes and wraps everything in its middleware.
// srv is set by Serve.
type Server struct {
config Config
logger *zap.Logger
lb *login.Builder
pb *presets.Builder
srv *http.Server
}
// New assembles a Server from its dependencies. The HTTP server itself is
// created later, in Serve.
func New(
	config Config,
	logger *zap.Logger,
	lb *login.Builder,
	pb *presets.Builder,
) *Server {
	server := new(Server)
	server.config = config
	server.logger = logger
	server.lb = lb
	server.pb = pb
	return server
}
// Serve starts the admin HTTP server in the background.
//
// The listener is opened synchronously, so a busy or invalid address is
// returned as an error. Requests are then served from a goroutine; serve
// errors other than http.ErrServerClosed are only logged.
func (s *Server) Serve() error {
	s.logger.Info("app server", zap.String("host", "http://localhost"+s.config.Port+"/admin"))

	router := http.NewServeMux()
	router.Handle("/", s.pb)
	s.lb.Mount(router)

	s.srv = &http.Server{
		Addr:    s.config.Port,
		Handler: s.lb.Middleware()(router),
	}

	listener, err := net.Listen("tcp", s.srv.Addr)
	if err != nil {
		return err
	}

	go func() {
		serveErr := s.srv.Serve(listener)
		if serveErr == nil || serveErr == http.ErrServerClosed {
			return
		}
		s.logger.Error("app server", zap.Error(serveErr))
	}()
	return nil
}
// Shutdown gracefully stops the HTTP server, waiting for in-flight requests
// until ctx is done. It is a no-op when Serve was never called, so stopping a
// server that did not start does not dereference a nil *http.Server.
func (s *Server) Shutdown(ctx context.Context) error {
	if s.srv == nil {
		return nil
	}
	return s.srv.Shutdown(ctx)
}
package main
import (
"context"
"os"
"github.com/urfave/cli/v3"
"go.uber.org/fx"
"go.ads.coffee/platform/analytics/internal/clickhouse"
"go.ads.coffee/platform/analytics/internal/config"
"go.ads.coffee/platform/analytics/internal/handlers"
)
// main runs the analytics CLI. Its only command, "aggregations" (alias "a"),
// builds an fx application from the configuration file, runs one aggregation
// for the table given with --table, and exits the process with status 0. Any
// failure panics.
func main() {
	aggregations := &cli.Command{
		Name: "aggregations",
		Flags: []cli.Flag{
			&cli.StringFlag{Name: "table", Aliases: []string{"t"}},
		},
		Aliases: []string{"a"},
		Action: func(ctx context.Context, cmd *cli.Command) error {
			// loadConfig reads the file named by --config, falling back to
			// the default location.
			loadConfig := func() (config.Config, error) {
				path := cmd.String("config")
				if path == "" {
					path = "analytics/configs/config.yaml"
				}
				return config.New(path)
			}
			// runOnce performs the aggregation and terminates the process.
			runOnce := func(aggregate *handlers.Aggregate) {
				if err := aggregate.Run(ctx, cmd); err != nil {
					panic(err)
				}
				os.Exit(0)
			}

			app := fx.New(
				fx.Provide(loadConfig),
				handlers.Module,
				clickhouse.Module,
				fx.Invoke(runOnce),
			)
			app.Run()
			return nil
		},
	}

	root := &cli.Command{
		Name: "analytics",
		Flags: []cli.Flag{
			&cli.StringFlag{Name: "config", Aliases: []string{"c"}},
		},
		Commands: []*cli.Command{aggregations},
	}

	if err := root.Run(context.Background(), os.Args); err != nil {
		panic(err)
	}
}
package clickhouse
import (
"fmt"
"time"
"github.com/uptrace/go-clickhouse/ch"
)
// Clickhouse wraps the go-clickhouse database handle.
type Clickhouse struct {
DB *ch.DB
}
// New creates the ClickHouse handle from config: DSN built from host, port
// and database with sslmode disabled, explicit user and password, 30 second
// query and dial timeouts, and the prefer_column_name_to_alias setting
// switched on. The returned error is always nil.
func New(config Config) (*Clickhouse, error) {
	dsn := fmt.Sprintf("clickhouse://%s:%s/%s?sslmode=disable", config.Host, config.Port, config.Database)
	settings := map[string]interface{}{
		"prefer_column_name_to_alias": 1,
	}

	// TLS and separate read/write timeouts are left at their defaults.
	handle := ch.Connect(
		ch.WithDSN(dsn),
		ch.WithUser(config.User),
		ch.WithPassword(config.Password),
		ch.WithTimeout(30*time.Second),
		ch.WithDialTimeout(30*time.Second),
		ch.WithQuerySettings(settings),
	)

	return &Clickhouse{DB: handle}, nil
}
// Close closes the underlying database handle.
func (c *Clickhouse) Close() error {
	if err := c.DB.Close(); err != nil {
		return err
	}
	return nil
}
package config
import (
"os"
"go.uber.org/config"
"go.uber.org/fx"
"go.ads.coffee/platform/analytics/internal/clickhouse"
)
// Config is the analytics configuration. It embeds fx.Out, so each field is
// provided to the fx graph as its own value.
type Config struct {
fx.Out
Clickhouse clickhouse.Config `yaml:"clickhouse"`
}
// New loads the YAML configuration from file, expanding environment
// variables and using permissive parsing, and populates a Config from the
// document root.
func New(file string) (Config, error) {
	provider, err := config.NewYAML(
		config.Expand(os.LookupEnv),
		config.File(file),
		config.Permissive(),
	)
	if err != nil {
		return Config{}, err
	}

	var loaded Config
	if err := provider.Get("").Populate(&loaded); err != nil {
		return Config{}, err
	}
	return loaded, nil
}
package handlers
import (
"context"
"fmt"
"log"
"time"
"github.com/urfave/cli/v3"
"go.ads.coffee/platform/analytics/internal/clickhouse"
)
// Aggregate rolls raw event rows up into hourly tables in ClickHouse.
type Aggregate struct {
click *clickhouse.Clickhouse
}
// NewAggregate creates the aggregation handler on top of the ClickHouse client.
func NewAggregate(click *clickhouse.Clickhouse) *Aggregate {
	aggregate := new(Aggregate)
	aggregate.click = click
	return aggregate
}
// Run aggregates the previous full hour of analytics.<table> into
// analytics.<table>_hour, where <table> is the value of the "table" flag.
//
// The window is [start of the previous hour, start of the current hour) in
// local time. Rows are grouped by action, hour and the dimension columns;
// price is summed as price/1000 for positive prices and count is the number
// of rows.
//
// The table name is concatenated into the SQL text, so it must be a plain
// identifier (ASCII letters, digits, underscore); an empty or malformed name
// is rejected before any statement is sent.
func (a *Aggregate) Run(ctx context.Context, cmd *cli.Command) error {
	table := cmd.String("table")
	if !isTableName(table) {
		return fmt.Errorf("invalid table name %q", table)
	}

	now := time.Now()
	endTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
	startTime := endTime.Add(-1 * time.Hour)
	log.Printf("aggregating data from %s to %s", startTime.Format(time.RFC3339), endTime.Format(time.RFC3339))

	// SQL statement that performs the aggregation.
	query := `
INSERT INTO analytics.` + table + `_hour
SELECT
action,
toDateTime(formatDateTime(timestamp, '%Y-%m-%d %H:00:00')) as tt,
banner_id,
group_id,
campaign_id,
advertiser_id,
city,
country,
region,
sum( multiIf(price > 0, price/1000, 0 )) as price,
count(*) as count,
network,
bundle
FROM analytics.` + table + `
WHERE timestamp >= ? AND timestamp < ?
GROUP BY
action,
toDateTime(formatDateTime(timestamp, '%Y-%m-%d %H:00:00')),
banner_id,
group_id,
campaign_id,
advertiser_id,
city,
country,
region,
network,
bundle
`
	log.Printf("query: %s", query)

	if _, err := a.click.DB.ExecContext(ctx, query, startTime, endTime); err != nil {
		// %w keeps the driver error reachable through errors.Is / errors.As.
		return fmt.Errorf("failed to execute aggregation query: %w", err)
	}
	return nil
}

// isTableName reports whether s is a non-empty run of ASCII letters, digits
// and underscores, i.e. safe to splice into SQL as a table name.
func isTableName(s string) bool {
	if s == "" {
		return false
	}
	for _, r := range s {
		switch {
		case r == '_':
		case r >= 'a' && r <= 'z':
		case r >= 'A' && r <= 'Z':
		case r >= '0' && r <= '9':
		default:
			return false
		}
	}
	return true
}
package circuitbreaker
import "context"
// Circuit is the circuit-breaker surface used by callers: three ways to run
// runFunc, two of which also take a fallbackFunc that receives the error.
// NOTE(review): the exact difference between Go, Run and Execute is defined
// by the implementations, which are outside this view apart from noopCircuit.
type Circuit interface {
Go(
ctx context.Context,
runFunc func(context.Context) error,
fallbackFunc func(context.Context, error) error,
) error
Run(ctx context.Context, runFunc func(context.Context) error) error
Execute(
ctx context.Context,
runFunc func(context.Context) error,
fallbackFunc func(context.Context, error) error,
) error
}
// noopCircuit is a Circuit without any protection: every method simply calls
// runFunc with the given context and returns its error. Fallbacks are never
// invoked.
type noopCircuit struct{}

// Run calls runFunc directly.
func (c *noopCircuit) Run(ctx context.Context, runFunc func(context.Context) error) error {
	err := runFunc(ctx)
	return err
}

// Go calls runFunc directly and ignores the fallback.
func (c *noopCircuit) Go(
	ctx context.Context,
	runFunc func(context.Context) error,
	_ func(context.Context, error) error,
) error {
	return c.Run(ctx, runFunc)
}

// Execute calls runFunc directly and ignores the fallback.
func (c *noopCircuit) Execute(
	ctx context.Context,
	runFunc func(context.Context) error,
	_ func(context.Context, error) error,
) error {
	return c.Run(ctx, runFunc)
}
package circuitbreaker
import (
"context"
"errors"
"time"
"github.com/cep21/circuit/v4"
"go.uber.org/zap"
)
// Default values that NewConfig applies to a fresh Config.
const (
defaultTimeout = 1 * time.Second
defaultMaxConcurrentRequests = -1 // unlimited
defaultHystrixSleepWindow = 5 * time.Second
defaultHystrixHalfOpenAttempts = 1
defaultHystrixRequiredConcurrentSuccessful = 1
defaultHystrixRequestVolumeThreshold = 20
defaultHystrixErrorThresholdPercentage = 50
defaultHystrixNumBuckets = 10
defaultHystrixRollingDuration = 10 * time.Second
)
// HystrixConfig holds the YAML-configurable Hystrix-style settings, split
// into the "closer" and "opener" groups.
// NOTE(review): presumably these map onto the closer/opener configuration of
// the circuit library; the mapping is not visible in this file section.
type HystrixConfig struct {
// Closer settings.
SleepWindow time.Duration `yaml:"sleep_window"`
HalfOpenAttempts int64 `yaml:"half_open_attempts"`
RequiredConcurrentSuccessful int64 `yaml:"required_concurrent_successful"`
// Opener settings.
ErrorThresholdPercentage int64 `yaml:"error_threshold_percentage"`
RequestVolumeThreshold int64 `yaml:"request_volume_threshold"`
RollingDuration time.Duration `yaml:"rolling_duration"`
NumBuckets int `yaml:"num_buckets"`
}
// Config is the YAML-configurable circuit breaker configuration. It is
// converted to the circuit library's configuration by ToCircuitBreakerConfig.
type Config struct {
Enabled bool `yaml:"enabled"`
Timeout time.Duration `yaml:"timeout"`
MaxConcurrentRequests int64 `yaml:"max_concurrent_requests"`
ContextDeadlineIsAnError bool `yaml:"context_deadline_is_an_error"`
Hystrix HystrixConfig `yaml:"hystrix"`
}
// NewConfig returns a Config filled with the package defaults: enabled,
// default timeout and concurrency limit, context deadlines not counted as
// errors, and the default Hystrix settings.
func NewConfig() *Config {
	hystrix := HystrixConfig{
		SleepWindow:                  defaultHystrixSleepWindow,
		HalfOpenAttempts:             defaultHystrixHalfOpenAttempts,
		RequiredConcurrentSuccessful: defaultHystrixRequiredConcurrentSuccessful,
		ErrorThresholdPercentage:     defaultHystrixErrorThresholdPercentage,
		RequestVolumeThreshold:       defaultHystrixRequestVolumeThreshold,
		RollingDuration:              defaultHystrixRollingDuration,
		NumBuckets:                   defaultHystrixNumBuckets,
	}

	cfg := new(Config)
	cfg.Enabled = true
	cfg.Timeout = defaultTimeout
	cfg.MaxConcurrentRequests = defaultMaxConcurrentRequests
	cfg.ContextDeadlineIsAnError = false
	cfg.Hystrix = hystrix
	return cfg
}
// ToCircuitBreakerConfig converts c into the circuit library configuration
// for the circuit called name.
//
// The circuit is disabled when c.Enabled is false. Errors and panics lost by
// Go are written to logger. When c.ContextDeadlineIsAnError is set,
// IgnoreInterrupts is switched on and only errors other than
// context.DeadlineExceeded are classified as interrupts. Run, fallback and
// circuit metrics are collected into metrics under name.
func (c *Config) ToCircuitBreakerConfig(
	logger *zap.Logger,
	name string,
	metrics *Metrics,
) circuit.Config {
	execution := circuit.ExecutionConfig{
		Timeout:               c.Timeout,
		MaxConcurrentRequests: c.MaxConcurrentRequests,
	}
	if c.ContextDeadlineIsAnError {
		execution.IgnoreInterrupts = true
		execution.IsErrInterrupt = func(err error) bool {
			return !errors.Is(err, context.DeadlineExceeded)
		}
	}

	// reportLost logs whatever Go could not hand back to its caller.
	reportLost := func(err error, pan interface{}) {
		if err != nil {
			logger.Error("lost error", zap.Error(err))
			return
		}
		if pan != nil {
			logger.Error("lost panic", zap.Any("panic", pan))
		}
	}

	collectors := circuit.MetricsCollectors{
		Run:      []circuit.RunMetrics{NewRunMetrics(metrics, name)},
		Fallback: []circuit.FallbackMetrics{NewFallbackMetrics(metrics, name)},
		Circuit:  []circuit.Metrics{NewCircuitMetrics(metrics, name)},
	}

	return circuit.Config{
		General: circuit.GeneralConfig{
			Disabled:     !c.Enabled,
			GoLostErrors: reportLost,
		},
		Execution: execution,
		Fallback: circuit.FallbackConfig{
			Disabled:              false,
			MaxConcurrentRequests: c.MaxConcurrentRequests,
		},
		Metrics: collectors,
	}
}
package circuitbreaker
import (
"go.uber.org/fx"
"go.uber.org/zap"
"go.ads.coffee/platform/pkg/telemetry"
)
// Module is the fx module of the circuitbreaker package. It provides the
// circuit Pool, the Metrics collectors and the Telemetry adapter, and
// decorates the injected logger with the "circuitbreaker" name.
var Module = fx.Module(
"circuitbreaker",
fx.Provide(
NewPool,
NewMetrics,
adapterTelemetry,
),
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("circuitbreaker")
}),
)
// adapterTelemetry exposes the shared *telemetry.Telemetry as the
// package-local Telemetry interface that NewMetrics depends on.
func adapterTelemetry(t *telemetry.Telemetry) Telemetry { //nolint: ireturn
return t
}
package circuitbreaker
import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
"go.ads.coffee/platform/pkg/telemetry"
)
// MetricsType is the value written into the "type" label of the circuit
// breaker metrics.
type MetricsType string
// Values of the "type" label. The Err* and Success values are used on the
// call counter/histogram, Opened and Closed on the state counter.
const (
MetricsTypeSuccess MetricsType = "success"
MetricsTypeErrFailure MetricsType = "failure"
MetricsTypeErrTimeout MetricsType = "timeout"
MetricsTypeErrBadRequest MetricsType = "bad_request"
MetricsTypeErrInterrupt MetricsType = "interrupt"
MetricsTypeErrConcurrencyLimitReject MetricsType = "concurrency_limit_reject"
MetricsTypeErrShortCircuit MetricsType = "short_circuit"
MetricsTypeOpened MetricsType = "opened"
MetricsTypeClosed MetricsType = "closed"
)
// Telemetry is the registration hook NewMetrics needs: something that can
// register Prometheus collectors.
type Telemetry interface {
Register(collectors ...prometheus.Collector) error
}
// Metrics groups the Prometheus collectors shared by all circuits.
type Metrics struct {
// total counts calls, labelled by type, name and fallback.
total *prometheus.CounterVec
// duration observes call latencies in seconds, with the same labels as total.
duration *prometheus.HistogramVec
// state counts open/close transitions, labelled by type and name.
state *prometheus.CounterVec
}
// NewMetrics creates the circuit breaker collectors (call counter, call
// duration histogram and state counter, all in the "circuit_breaker"
// subsystem) and registers them through tel. The registration error, if any,
// is returned unchanged and no Metrics is produced.
func NewMetrics(tel Telemetry) (*Metrics, error) {
	const subsystem = "circuit_breaker"

	var (
		callLabels  = []string{"type", "name", "fallback"}
		stateLabels = []string{"type", "name"}
	)

	m := &Metrics{
		total: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Subsystem: subsystem,
				Name:      "calls_total",
				Help:      "total number of circuit breaker calls.",
			},
			callLabels,
		),
		duration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Subsystem: subsystem,
				Name:      "call_duration_seconds",
				Help:      "circuit breaker call latencies in seconds.",
				Buckets:   telemetry.DefaultHistogramBuckets,
			},
			callLabels,
		),
		state: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Subsystem: subsystem,
				Name:      "state_total",
				Help:      "circuit breaker state",
			},
			stateLabels,
		),
	}

	if err := tel.Register(m.total, m.duration, m.state); err != nil {
		return nil, err
	}

	return m, nil
}
// Success records a successful call of circuit name: it increments the call
// counter and observes duration (in seconds) under the "success" type. The
// fallback label is "true" or "false" according to fallback.
func (m *Metrics) Success(
	_ context.Context,
	_ time.Time,
	duration time.Duration,
	fallback bool,
	name string,
) {
	fallbackLabel := "false"
	if fallback {
		fallbackLabel = "true"
	}
	kind := string(MetricsTypeSuccess)

	m.total.WithLabelValues(kind, name, fallbackLabel).Inc()
	m.duration.WithLabelValues(kind, name, fallbackLabel).Observe(duration.Seconds())
}
// ErrFailure records a failed call of circuit name: it increments the call
// counter and observes duration (in seconds) under the "failure" type. The
// fallback label is "true" or "false" according to fallback.
func (m *Metrics) ErrFailure(
	_ context.Context,
	_ time.Time,
	duration time.Duration,
	fallback bool,
	name string,
) {
	fallbackLabel := "false"
	if fallback {
		fallbackLabel = "true"
	}
	kind := string(MetricsTypeErrFailure)

	m.total.WithLabelValues(kind, name, fallbackLabel).Inc()
	m.duration.WithLabelValues(kind, name, fallbackLabel).Observe(duration.Seconds())
}
// ErrTimeout records a timed-out call of circuit name on the call counter and
// the duration histogram. The fallback label is always "false".
func (m *Metrics) ErrTimeout(
	_ context.Context,
	_ time.Time,
	duration time.Duration,
	name string,
) {
	kind := string(MetricsTypeErrTimeout)

	m.total.WithLabelValues(kind, name, "false").Inc()
	m.duration.WithLabelValues(kind, name, "false").Observe(duration.Seconds())
}
// ErrBadRequest records a bad-request outcome of circuit name on the call
// counter and the duration histogram. The fallback label is always "false".
func (m *Metrics) ErrBadRequest(
	_ context.Context,
	_ time.Time,
	duration time.Duration,
	name string,
) {
	kind := string(MetricsTypeErrBadRequest)

	m.total.WithLabelValues(kind, name, "false").Inc()
	m.duration.WithLabelValues(kind, name, "false").Observe(duration.Seconds())
}
// ErrInterrupt records an interrupted call of circuit name on the call
// counter and the duration histogram. The fallback label is always "false".
func (m *Metrics) ErrInterrupt(
	_ context.Context,
	_ time.Time,
	duration time.Duration,
	name string,
) {
	kind := string(MetricsTypeErrInterrupt)

	m.total.WithLabelValues(kind, name, "false").Inc()
	m.duration.WithLabelValues(kind, name, "false").Observe(duration.Seconds())
}
// ErrConcurrencyLimitReject counts a call of circuit name that was rejected
// by the concurrency limit. Only the call counter is touched; the fallback
// label is "true" or "false" according to fallback.
func (m *Metrics) ErrConcurrencyLimitReject(
	_ context.Context,
	_ time.Time,
	fallback bool,
	name string,
) {
	fallbackLabel := "false"
	if fallback {
		fallbackLabel = "true"
	}

	m.total.WithLabelValues(string(MetricsTypeErrConcurrencyLimitReject), name, fallbackLabel).Inc()
}
// ErrShortCircuit counts a call of circuit name that was short-circuited.
// Only the call counter is touched; the fallback label is always "false".
func (m *Metrics) ErrShortCircuit(
	_ context.Context,
	_ time.Time,
	name string,
) {
	m.total.WithLabelValues(string(MetricsTypeErrShortCircuit), name, "false").Inc()
}
// Opened increments the state counter of circuit name under the "opened" type.
func (m *Metrics) Opened(
	_ context.Context,
	_ time.Time,
	name string,
) {
	counter := m.state.WithLabelValues(string(MetricsTypeOpened), name)
	counter.Inc()
}
// Closed increments the state counter of circuit name under the "closed" type.
func (m *Metrics) Closed(
	_ context.Context,
	_ time.Time,
	name string,
) {
	counter := m.state.WithLabelValues(string(MetricsTypeClosed), name)
	counter.Inc()
}
// RunMetrics binds the shared Metrics to one circuit name and reports every
// event with the fallback flag set to false.
type RunMetrics struct {
	metrics *Metrics
	name    string
}

// NewRunMetrics returns a RunMetrics that reports to metrics under name.
func NewRunMetrics(metrics *Metrics, name string) RunMetrics {
	var rm RunMetrics
	rm.metrics = metrics
	rm.name = name

	return rm
}

// Success forwards a successful run to Metrics.Success.
func (r RunMetrics) Success(
	ctx context.Context,
	now time.Time,
	duration time.Duration,
) {
	r.metrics.Success(ctx, now, duration, false, r.name)
}

// ErrFailure forwards a failed run to Metrics.ErrFailure.
func (r RunMetrics) ErrFailure(
	ctx context.Context,
	now time.Time,
	duration time.Duration,
) {
	r.metrics.ErrFailure(ctx, now, duration, false, r.name)
}

// ErrTimeout forwards a timed-out run to Metrics.ErrTimeout.
func (r RunMetrics) ErrTimeout(
	ctx context.Context,
	now time.Time,
	duration time.Duration,
) {
	r.metrics.ErrTimeout(ctx, now, duration, r.name)
}

// ErrBadRequest forwards a bad-request run to Metrics.ErrBadRequest.
func (r RunMetrics) ErrBadRequest(
	ctx context.Context,
	now time.Time,
	duration time.Duration,
) {
	r.metrics.ErrBadRequest(ctx, now, duration, r.name)
}

// ErrInterrupt forwards an interrupted run to Metrics.ErrInterrupt.
func (r RunMetrics) ErrInterrupt(
	ctx context.Context,
	now time.Time,
	duration time.Duration,
) {
	r.metrics.ErrInterrupt(ctx, now, duration, r.name)
}

// ErrConcurrencyLimitReject forwards a concurrency-limit rejection of a run
// to Metrics.ErrConcurrencyLimitReject.
func (r RunMetrics) ErrConcurrencyLimitReject(
	ctx context.Context,
	now time.Time,
) {
	r.metrics.ErrConcurrencyLimitReject(ctx, now, false, r.name)
}

// ErrShortCircuit forwards a short-circuited run to Metrics.ErrShortCircuit.
func (r RunMetrics) ErrShortCircuit(
	ctx context.Context,
	now time.Time,
) {
	r.metrics.ErrShortCircuit(ctx, now, r.name)
}
// FallbackMetrics binds the shared Metrics to one circuit name and reports
// every event with the fallback flag set to true.
type FallbackMetrics struct {
	metrics *Metrics
	name    string
}

// NewFallbackMetrics returns a FallbackMetrics that reports to metrics under name.
func NewFallbackMetrics(metrics *Metrics, name string) FallbackMetrics {
	var fm FallbackMetrics
	fm.metrics = metrics
	fm.name = name

	return fm
}

// Success forwards a successful fallback to Metrics.Success.
func (f FallbackMetrics) Success(
	ctx context.Context,
	now time.Time,
	duration time.Duration,
) {
	f.metrics.Success(ctx, now, duration, true, f.name)
}

// ErrFailure forwards a failed fallback to Metrics.ErrFailure.
func (f FallbackMetrics) ErrFailure(
	ctx context.Context,
	now time.Time,
	duration time.Duration,
) {
	f.metrics.ErrFailure(ctx, now, duration, true, f.name)
}

// ErrConcurrencyLimitReject forwards a concurrency-limit rejection of a
// fallback to Metrics.ErrConcurrencyLimitReject.
func (f FallbackMetrics) ErrConcurrencyLimitReject(
	ctx context.Context,
	now time.Time,
) {
	f.metrics.ErrConcurrencyLimitReject(ctx, now, true, f.name)
}
// CircuitMetrics binds the shared Metrics to one circuit name and reports its
// open/close transitions.
type CircuitMetrics struct {
	metrics *Metrics
	name    string
}

// NewCircuitMetrics returns a CircuitMetrics that reports to metrics under name.
func NewCircuitMetrics(metrics *Metrics, name string) CircuitMetrics {
	var cm CircuitMetrics
	cm.metrics = metrics
	cm.name = name

	return cm
}

// Opened forwards the "circuit opened" event to Metrics.Opened.
func (c CircuitMetrics) Opened(
	ctx context.Context,
	now time.Time,
) {
	c.metrics.Opened(ctx, now, c.name)
}

// Closed forwards the "circuit closed" event to Metrics.Closed.
func (c CircuitMetrics) Closed(
	ctx context.Context,
	now time.Time,
) {
	c.metrics.Closed(ctx, now, c.name)
}
package circuitbreaker
import (
"github.com/cep21/circuit/v4"
"github.com/cep21/circuit/v4/closers/hystrix"
"go.uber.org/zap"
)
// Pool gives access to the named circuits created by NewPool.
type Pool struct {
// logger is the logger handed to NewPool.
logger *zap.Logger
// manager owns the circuits; Get looks them up by name.
manager *circuit.Manager
}
// NewPool builds a circuit manager and eagerly creates one circuit per entry
// of configs, keyed by the map key.
//
// Each circuit is configured from its Config: the Hystrix closer and opener
// settings plus the general configuration produced by
// Config.ToCircuitBreakerConfig (with a logger named after the circuit).
// Circuit names that have no entry in configs get zero-valued configuration.
// The first error returned while creating a circuit aborts construction.
func NewPool(logger *zap.Logger, metrics *Metrics, configs map[string]*Config) (*Pool, error) {
	closerFor := func(circuitName string) hystrix.ConfigureCloser {
		cfg, found := configs[circuitName]
		if !found {
			return hystrix.ConfigureCloser{}
		}

		return hystrix.ConfigureCloser{
			SleepWindow:                  cfg.Hystrix.SleepWindow,
			HalfOpenAttempts:             cfg.Hystrix.HalfOpenAttempts,
			RequiredConcurrentSuccessful: cfg.Hystrix.RequiredConcurrentSuccessful,
		}
	}

	openerFor := func(circuitName string) hystrix.ConfigureOpener {
		cfg, found := configs[circuitName]
		if !found {
			return hystrix.ConfigureOpener{}
		}

		return hystrix.ConfigureOpener{
			ErrorThresholdPercentage: cfg.Hystrix.ErrorThresholdPercentage,
			RequestVolumeThreshold:   cfg.Hystrix.RequestVolumeThreshold,
			RollingDuration:          cfg.Hystrix.RollingDuration,
			NumBuckets:               cfg.Hystrix.NumBuckets,
		}
	}

	propertiesFor := func(circuitName string) circuit.Config {
		cfg, found := configs[circuitName]
		if !found {
			return circuit.Config{}
		}

		return cfg.ToCircuitBreakerConfig(logger.Named(circuitName), circuitName, metrics)
	}

	factory := hystrix.Factory{
		CreateConfigureCloser: []func(circuitName string) hystrix.ConfigureCloser{closerFor},
		CreateConfigureOpener: []func(circuitName string) hystrix.ConfigureOpener{openerFor},
	}

	manager := &circuit.Manager{
		DefaultCircuitProperties: []circuit.CommandPropertiesConstructor{
			factory.Configure,
			propertiesFor,
		},
	}

	for circuitName := range configs {
		_, err := manager.CreateCircuit(circuitName)
		if err != nil {
			return nil, err
		}
	}

	pool := &Pool{
		logger:  logger,
		manager: manager,
	}

	return pool, nil
}
// Get returns the circuit registered under name. A no-op circuit is returned
// when the pool itself is nil or when no circuit with that name exists, so
// callers never receive a nil Circuit.
func (p *Pool) Get(name string) Circuit {
	if p != nil {
		if found := p.manager.GetCircuit(name); found != nil {
			return found
		}
	}

	return &noopCircuit{}
}
package clock
import "time"
// Clock is a stateless time source backed by the system clock.
type Clock struct{}

// New returns a ready-to-use Clock.
func New() *Clock {
	return new(Clock)
}

// Now returns the current time as reported by time.Now.
func (*Clock) Now() time.Time {
	return time.Now()
}
package database
import "fmt"
// Config holds the PostgreSQL connection settings loaded from YAML.
type Config struct {
	// Debug selects verbose query logging when the connection is opened.
	Debug    bool   `yaml:"debug"`
	User     string `yaml:"user"`
	Password string `yaml:"password"`
	Dbname   string `yaml:"dbname"`
	Host     string `yaml:"host"`
	Port     string `yaml:"port"`
}

// Connection renders the settings as a keyword/value connection string with
// sslmode disabled.
//
// Values that are empty or contain whitespace, single quotes or backslashes
// are single-quoted and escaped; without that, an empty or spaced value makes
// the parser mis-assign the following key=value pairs. Plain values are
// emitted exactly as before.
func (c Config) Connection() string {
	return fmt.Sprintf("user=%s password=%s dbname=%s sslmode=disable host=%s port=%s",
		quoteDSNValue(c.User),
		quoteDSNValue(c.Password),
		quoteDSNValue(c.Dbname),
		quoteDSNValue(c.Host),
		quoteDSNValue(c.Port),
	)
}

// quoteDSNValue returns v unchanged when it is safe to embed in a
// keyword/value connection string, and a single-quoted, backslash-escaped
// form otherwise.
func quoteDSNValue(v string) string {
	needsQuoting := v == ""
	for i := 0; i < len(v) && !needsQuoting; i++ {
		switch v[i] {
		case ' ', '\t', '\n', '\r', '\v', '\f', '\'', '\\':
			needsQuoting = true
		}
	}
	if !needsQuoting {
		return v
	}

	quoted := make([]byte, 0, len(v)+2)
	quoted = append(quoted, '\'')
	for i := 0; i < len(v); i++ {
		if v[i] == '\'' || v[i] == '\\' {
			quoted = append(quoted, '\\')
		}
		quoted = append(quoted, v[i])
	}
	quoted = append(quoted, '\'')

	return string(quoted)
}
package database
import (
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// New opens a gorm connection to PostgreSQL using config.Connection() and
// panics if the connection cannot be opened. The gorm logger is switched to
// Info level when config.Debug is set and to Warn level otherwise.
func New(config Config) *gorm.DB {
	db, err := gorm.Open(postgres.Open(config.Connection()))
	if err != nil {
		panic(err)
	}

	// Pick the log level first, then apply it once.
	level := logger.Warn
	if config.Debug {
		level = logger.Info
	}
	db.Logger = db.Logger.LogMode(level)

	return db
}
package geoip
import (
"bytes"
"embed"
"io"
"sync"
)
// EmbeddedFile is an in-memory, closable view over a file read from an
// embed.FS. It offers Read, ReadAt, Seek and Close and may be shared between
// goroutines.
type EmbeddedFile struct {
	data   []byte
	reader *bytes.Reader
	// mu guards reader, data and closed. Read and Seek move the reader's
	// offset and therefore need the exclusive lock; ReadAt does not touch the
	// offset and only needs the shared lock.
	mu     sync.RWMutex
	closed bool
}

// NewEmbeddedFile loads path from fs fully into memory. The error returned by
// fs.ReadFile is passed through unchanged.
func NewEmbeddedFile(fs embed.FS, path string) (*EmbeddedFile, error) {
	data, err := fs.ReadFile(path)
	if err != nil {
		return nil, err
	}

	return &EmbeddedFile{
		data:   data,
		reader: bytes.NewReader(data),
	}, nil
}

// Read reads from the current offset and advances it. It returns
// io.ErrClosedPipe once the file has been closed.
func (f *EmbeddedFile) Read(p []byte) (n int, err error) {
	// Exclusive lock: bytes.Reader.Read mutates the offset.
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.closed {
		return 0, io.ErrClosedPipe
	}

	return f.reader.Read(p)
}

// ReadAt reads at the absolute offset off without moving the current offset.
// It returns io.ErrClosedPipe once the file has been closed.
func (f *EmbeddedFile) ReadAt(p []byte, off int64) (n int, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.closed {
		return 0, io.ErrClosedPipe
	}

	return f.reader.ReadAt(p, off)
}

// Close releases the in-memory data. A second Close returns io.ErrClosedPipe.
func (f *EmbeddedFile) Close() error {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.closed {
		return io.ErrClosedPipe
	}

	f.closed = true
	f.data = nil
	f.reader = nil

	return nil
}

// Seek moves the current offset as described by io.Seeker. It returns
// io.ErrClosedPipe once the file has been closed.
func (f *EmbeddedFile) Seek(offset int64, whence int) (int64, error) {
	// Exclusive lock: bytes.Reader.Seek mutates the offset.
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.closed {
		return 0, io.ErrClosedPipe
	}

	return f.reader.Seek(offset, whence)
}
package geoip
import (
"github.com/ip2location/ip2location-go/v9"
)
// Geoip bundles the two IP2Location databases opened by New.
type Geoip struct {
// DBv4 is opened from the embedded "IP2LOCATION-LITE-DB3.BIN" file.
DBv4 *ip2location.DB
// DBv6 is opened from the embedded "IP2LOCATION-LITE-DB3.IPV6.BIN" file.
DBv6 *ip2location.DB
}
// New opens the embedded IPv4 and IPv6 IP2Location databases.
//
// Every failure is reported through the returned error instead of a panic,
// including a missing embedded file. If the IPv6 side fails after the IPv4
// database was opened, the IPv4 database is closed before returning.
func New() (*Geoip, error) {
	filev4, err := NewEmbeddedFile(location, "IP2LOCATION-LITE-DB3.BIN")
	if err != nil {
		return nil, err
	}

	dbv4, err := ip2location.OpenDBWithReader(filev4)
	if err != nil {
		return nil, err
	}

	filev6, err := NewEmbeddedFile(location, "IP2LOCATION-LITE-DB3.IPV6.BIN")
	if err != nil {
		dbv4.Close()

		return nil, err
	}

	dbv6, err := ip2location.OpenDBWithReader(filev6)
	if err != nil {
		dbv4.Close()

		return nil, err
	}

	return &Geoip{
		DBv4: dbv4,
		DBv6: dbv6,
	}, nil
}
package health
import (
"context"
"fmt"
"time"
)
// ComponentKind is a bit flag that classifies a health component. Kinds can
// be OR-ed together to address several classes at once.
type ComponentKind uint8

const (
	ComponentKindApp ComponentKind = 1 << iota
	ComponentKindLocal
	ComponentKindExternal
	// ComponentKindAll selects every kind.
	ComponentKindAll = ComponentKindApp | ComponentKindLocal | ComponentKindExternal
)

// UnmarshalYAML decodes a kind from its YAML string form: "app", "local" or
// "external". Any other string yields an error and leaves k untouched; an
// error from unmarshal itself is returned as is.
func (k *ComponentKind) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var raw string
	if err := unmarshal(&raw); err != nil {
		return err
	}

	var parsed ComponentKind
	switch raw {
	case "app":
		parsed = ComponentKindApp
	case "local":
		parsed = ComponentKindLocal
	case "external":
		parsed = ComponentKindExternal
	default:
		return fmt.Errorf("invalid component kind: %s", raw)
	}

	*k = parsed

	return nil
}
// CheckFunc performs one health check; a nil error means the component is healthy.
type CheckFunc func(ctx context.Context) error
// ComponentProvider supplies health components on demand; it is queried each
// time components are iterated or checked.
type ComponentProvider interface {
HealthComponents() []*Component
}
// Component is a single health-checked unit together with the outcome of its
// most recent check.
type Component struct {
	Kind      ComponentKind
	Name      string
	CheckFunc CheckFunc
	// CheckErr and CheckDuration are overwritten by every call to Check.
	CheckErr      error
	CheckDuration time.Duration
	StaticDetails map[Detail]any
}

// Check runs CheckFunc with ctx and stores its error and the elapsed
// wall-clock time on the component.
func (c *Component) Check(ctx context.Context) {
	startedAt := time.Now()
	err := c.CheckFunc(ctx)

	c.CheckErr = err
	c.CheckDuration = time.Since(startedAt)
}
package health
import (
"go.uber.org/fx"
"go.uber.org/zap"
)
// Module is the fx module of the health package. It provides *Health and
// decorates the injected logger with the "health" name.
var Module = fx.Module(
"health",
fx.Provide(
NewHealth,
),
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("health")
}),
)
// Params lists the dependencies fx injects into NewHealth.
type Params struct {
fx.In
Logger *zap.Logger
Config Config
// Components is collected from the "health_component" value group.
Components []*Component `group:"health_component"`
// ComponentProviders is collected from the "health_component_provider"
// value group.
ComponentProviders []ComponentProvider `group:"health_component_provider"`
}
package health
import (
"encoding/json"
"net/http"
)
// HandlerExternal returns an HTTP handler that checks and reports only the
// external components.
func (h *Health) HandlerExternal() http.HandlerFunc {
	const kinds = ComponentKindExternal

	return h.typeHandler(kinds)
}

// Handler returns an HTTP handler that checks and reports the application
// and local components.
func (h *Health) Handler() http.HandlerFunc {
	const kinds = ComponentKindApp | ComponentKindLocal

	return h.typeHandler(kinds)
}
// typeHandler builds an HTTP handler that runs the checks for the requested
// component kinds and answers with the aggregated State encoded as JSON.
//
// The response status is 200 when the aggregated status is up, 503 otherwise,
// and 500 with the error text as body when the state cannot be marshalled.
// The status is decided before anything is written so the header is sent
// exactly once.
func (h *Health) typeHandler(
	kind ComponentKind,
) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		h.Check(r.Context(), kind)
		state := NewState(h, kind)

		status := http.StatusOK
		if state.Status != StatusUp {
			status = http.StatusServiceUnavailable
		}

		body, err := json.Marshal(state)
		if err != nil {
			status = http.StatusInternalServerError
			body = []byte(err.Error())
		}

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)

		// Best effort: a failed write means the client is gone and there is
		// nothing left to report to it.
		_, _ = w.Write(body)
	}
}
package health
import (
"context"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
)
const (
// AppComponentName is the name of the built-in application component that
// NewHealth always registers.
AppComponentName = "application"
)
// CheckCallback receives a component together with the error and duration of a check.
type CheckCallback func(component *Component, err error, duration time.Duration)
// Health keeps the registered health components, runs their checks and
// exposes the global readiness flag.
type Health struct {
logger *zap.Logger
config Config
isReady atomic.Value // global readiness flag, initial is false
// components always starts with the built-in application component.
components []*Component
componentProviders []ComponentProvider
// m is held exclusively while checks run and shared while iterating.
m sync.RWMutex
}
// NewHealth builds a Health from the injected parameters. The readiness flag
// starts as false. The component list begins with the built-in application
// component, which fails with ErrApplicationIsNotReady until SetReady(true)
// is called, followed by the injected components.
func NewHealth(p Params) *Health {
	h := &Health{
		logger:             p.Logger,
		config:             p.Config,
		componentProviders: p.ComponentProviders,
	}
	h.isReady.Store(false)

	appComponent := &Component{
		Kind: ComponentKindApp,
		Name: AppComponentName,
		CheckFunc: func(context.Context) error {
			// A missing or non-bool value counts as "not ready".
			ready, ok := h.isReady.Load().(bool)
			if ok && ready {
				return nil
			}

			return ErrApplicationIsNotReady
		},
	}

	h.components = make([]*Component, 0, len(p.Components)+1)
	h.components = append(h.components, appComponent)
	h.components = append(h.components, p.Components...)

	return h
}
// Config returns a copy of the configuration Health was built with.
func (h *Health) Config() Config {
	cfg := h.config

	return cfg
}

// SetReady stores the global readiness flag read by the application
// component's check.
func (h *Health) SetReady(ready bool) {
	h.isReady.Store(ready)
}
// Iter calls callback for every component whose kind intersects
// requestedKind: first the registered components, then the ones returned by
// each provider. It holds the read lock for the whole iteration and does
// nothing when callback is nil.
func (h *Health) Iter(requestedKind ComponentKind, callback func(*Component)) {
	if callback == nil {
		return
	}

	visit := func(list []*Component) {
		for _, c := range list {
			if c.Kind&requestedKind != 0 {
				callback(c)
			}
		}
	}

	h.m.RLock()
	defer h.m.RUnlock()

	visit(h.components)
	for _, provider := range h.componentProviders {
		visit(provider.HealthComponents())
	}
}
// Check runs the check of every component whose kind intersects
// requestedKind: first the registered components, then the ones returned by
// each provider. All checks share one context limited to three seconds and
// run sequentially under the write lock. Failed checks are logged.
func (h *Health) Check(ctx context.Context, requestedKind ComponentKind) {
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second) // 3 seconds for check
	defer cancel()

	runAll := func(list []*Component) {
		for _, c := range list {
			if c.Kind&requestedKind == 0 {
				continue
			}

			c.Check(ctx)
			if c.CheckErr == nil {
				continue
			}

			h.logger.Error(
				"health check failed",
				zap.String("component", c.Name),
				zap.Error(c.CheckErr),
			)
		}
	}

	h.m.Lock()
	defer h.m.Unlock()

	runAll(h.components)
	for _, provider := range h.componentProviders {
		runAll(provider.HealthComponents())
	}
}
package health
import (
"math"
)
// Detail is a key of the per-component details map in the health report.
type Detail string
// Detail keys written by NewState.
const (
HostnameDetail Detail = "hostname"
VersionDetail Detail = "version"
TimeDetail Detail = "time"
MessageDetail Detail = "message"
)
// ComponentState is the reported state of a single component.
type ComponentState struct {
Status Status `json:"status"`
Details map[Detail]any `json:"details,omitempty"`
}
// State is the aggregated health report: the overall status plus the state
// of every reported component keyed by component name.
type State struct {
Status Status `json:"status"`
Components map[string]ComponentState `json:"components"`
}
// NewState builds the health report for the components of h that match
// requestedKind, based on the results of their most recent checks.
//
// A component is down when its last check returned an error, and a single
// down component makes the overall status down. Every entry carries the
// check duration in seconds; application components also carry hostname and
// version, and failed components carry the error message.
func NewState(h *Health, requestedKind ComponentKind) State {
	state := State{
		Status:     StatusUp,
		Components: make(map[string]ComponentState, len(h.components)),
	}

	h.Iter(requestedKind, func(c *Component) {
		details := map[Detail]any{
			// round to 4 decimal places
			TimeDetail: math.Round(c.CheckDuration.Seconds()*10000) / 10000,
		}

		if c.Kind == ComponentKindApp {
			cfg := h.Config()
			details[HostnameDetail] = cfg.Hostname
			details[VersionDetail] = cfg.Version
		}

		var status Status = StatusUp
		if c.CheckErr != nil {
			status = StatusDown
			details[MessageDetail] = c.CheckErr.Error()
			state.Status = StatusDown
		}

		state.Components[c.Name] = ComponentState{
			Status:  status,
			Details: details,
		}
	})

	return state
}
package kafkapool
import (
"strings"
"time"
"github.com/cenkalti/backoff/v4"
)
// SASLMechanism names the SASL authentication mechanism used towards Kafka.
type SASLMechanism string

// Supported SASL mechanisms; the empty value disables SASL.
const (
	SASLMechanismNone        SASLMechanism = ""
	SASLMechanismPlain       SASLMechanism = "PLAIN"
	SASLMechanismScramSHA256 SASLMechanism = "SCRAM-SHA-256"
	SASLMechanismScramSHA512 SASLMechanism = "SCRAM-SHA-512"
)

// Config is the YAML configuration of the Kafka connection.
type Config struct {
	Enabled bool `yaml:"enabled"`
	// Prefix, when set, is prepended (with "_") to topic and group names.
	Prefix string `yaml:"prefix"`
	// Seeds lists the seed brokers. A single comma-separated entry is split
	// by Prepare.
	Seeds         []string      `yaml:"seeds"`
	SASLMechanism SASLMechanism `yaml:"sasl_mechanism"`
	Username      string        `yaml:"username"`
	Password      string        `yaml:"password"`
	// Certificate is the path of a PEM file with CA certificates.
	Certificate            string         `yaml:"certificate"`
	TLS                    bool           `yaml:"tls"`
	AllowAutoTopicCreation bool           `yaml:"allow_auto_topic_creation"`
	Producer               ConfigProducer `yaml:"producer"`
	Consumer               ConfigConsumer `yaml:"consumer"`
}

// ConfigProducer holds producer-specific options; zero values leave the
// client defaults in place.
type ConfigProducer struct {
	DisableIdempotentWrite bool          `yaml:"disable_idempotent_write"`
	RequestTimeout         time.Duration `yaml:"request_timeout"`
	RecordRetries          int           `yaml:"record_retries"`
}

// ConfigConsumer holds consumer-specific options.
type ConfigConsumer struct {
	ConfigBackoff ConfigBackoff `yaml:"backoff"` //nolint:tagliatelle
}

// ConfigBackoff configures the retry back-off applied to consumer callbacks.
// Nil pointer fields keep the back-off library defaults.
type ConfigBackoff struct {
	Enabled             bool           `yaml:"enabled"`
	MaxRetries          *uint64        `yaml:"max_retries"`
	InitialInterval     *time.Duration `yaml:"initial_interval"`
	MaxInterval         *time.Duration `yaml:"max_interval"`
	MaxElapsedTime      *time.Duration `yaml:"max_elapsed_time"`
	Multiplier          *float64       `yaml:"multiplier"`
	RandomizationFactor *float64       `yaml:"randomization_factor"`
}

// Prepare normalizes the configuration in place and returns c.
//
// When Seeds holds exactly one entry it is treated as a comma-separated list:
// the entry is split on commas, surrounding whitespace is trimmed from every
// item and empty items (for example after a trailing comma) are dropped. If
// nothing remains after filtering, Seeds is left untouched. Lists with zero
// or several entries are not modified.
func (c *Config) Prepare() *Config {
	if len(c.Seeds) != 1 {
		return c
	}

	parts := strings.Split(c.Seeds[0], ",")
	seeds := make([]string, 0, len(parts))

	for _, part := range parts {
		if seed := strings.TrimSpace(part); seed != "" {
			seeds = append(seeds, seed)
		}
	}

	if len(seeds) > 0 {
		c.Seeds = seeds
	}

	return c
}
// GetBackOff builds the retry policy described by c.
//
// When back-off is disabled a StopBackOff is returned. Otherwise an
// exponential back-off is created, overriding only the parameters whose
// pointer fields are set, and wrapped with a retry limit when MaxRetries is
// set.
func (c ConfigBackoff) GetBackOff() backoff.BackOff {
	if !c.Enabled {
		return &backoff.StopBackOff{}
	}

	opts := make([]backoff.ExponentialBackOffOpts, 0, 5)

	if v := c.InitialInterval; v != nil {
		opts = append(opts, backoff.WithInitialInterval(*v))
	}
	if v := c.MaxInterval; v != nil {
		opts = append(opts, backoff.WithMaxInterval(*v))
	}
	if v := c.MaxElapsedTime; v != nil {
		opts = append(opts, backoff.WithMaxElapsedTime(*v))
	}
	if v := c.Multiplier; v != nil {
		opts = append(opts, backoff.WithMultiplier(*v))
	}
	if v := c.RandomizationFactor; v != nil {
		opts = append(opts, backoff.WithRandomizationFactor(*v))
	}

	exponential := backoff.NewExponentialBackOff(opts...)
	if c.MaxRetries == nil {
		return exponential
	}

	return backoff.WithMaxRetries(exponential, *c.MaxRetries)
}
package kafkapool
import (
"context"
"fmt"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/chapsuk/wait"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kotel"
"go.uber.org/zap"
"go.ads.coffee/platform/pkg/health"
)
// ConsumerOpt customizes a Consumer while it is being constructed.
type ConsumerOpt func(c *Consumer)

// WithConsumeResetOffsetConsumerOpt adds the kgo.ConsumeResetOffset client
// option with the given offset to the consumer.
func WithConsumeResetOffsetConsumerOpt(offset kgo.Offset) ConsumerOpt {
	return func(consumer *Consumer) {
		resetOpt := kgo.ConsumeResetOffset(offset)
		consumer.opts = append(consumer.opts, resetOpt)
	}
}

// WithCustomErrorHandlerConsumerOpt replaces the consumer's error handler
// with eh.
func WithCustomErrorHandlerConsumerOpt(eh func(err error)) ConsumerOpt {
	return func(consumer *Consumer) {
		consumer.errHandler = eh
	}
}
// Consumer reads records with a single Kafka client and dispatches them to
// one handler goroutine per subscribed topic.
type Consumer struct {
kf *Kafka
logger *zap.Logger
// opts are the client options: consumer-specific ones first, then the
// shared ones from Kafka.
opts []kgo.Opt
// errHandler receives fetch errors and callback errors.
errHandler func(error)
client *kgo.Client
// consumers maps the prefixed topic name to its handler state.
consumers map[string]tconsumer
// wg tracks the poll loop and the per-topic handler goroutines.
wg wait.Group
}
// tconsumer is the per-topic handler state of a Consumer.
type tconsumer struct {
mtrx MetricsProvider
kTracer *kotel.Tracer
logger *zap.Logger
// ctx is cancelled by stop (through contextCancel) to end the handler loop.
ctx context.Context
contextCancel context.CancelFunc
// rec is the unbuffered hand-off channel from the poll loop.
rec chan *kgo.Record
// backoff is the retry policy applied to the callback.
backoff backoff.BackOff
}
// NewConsumer creates a Consumer on top of kf.
//
// The default error handler logs a warning; opts are applied afterwards and
// may replace it or add client options. The shared options of kf are appended
// after the consumer-specific ones. The error of kgo.NewClient is returned
// unchanged.
func NewConsumer(kf *Kafka, opts ...ConsumerOpt) (*Consumer, error) {
	consumer := &Consumer{
		kf:        kf,
		logger:    kf.logger.Named("consumer"),
		opts:      make([]kgo.Opt, 0, len(opts)),
		consumers: make(map[string]tconsumer, 1),
	}
	consumer.errHandler = func(err error) {
		consumer.logger.Warn("Kafka: consumer error", zap.Error(err))
	}

	for i := range opts {
		opts[i](consumer)
	}

	consumer.opts = append(consumer.opts, kf.opts...)

	client, err := kgo.NewClient(consumer.opts...)
	if err != nil {
		return nil, err
	}
	consumer.client = client

	return consumer, nil
}
// Subscribe registers callback for topic (after applying the configured
// prefix), adds the topic to the client and starts a dedicated handler
// goroutine that receives the topic's records from the poll loop.
func (c *Consumer) Subscribe(
	topic string,
	callback ConsumerCallback,
) {
	prefixed := c.kf.formatWithPrefix(topic)
	c.client.AddConsumeTopics(prefixed)

	var tc tconsumer
	tc.ctx, tc.contextCancel = context.WithCancel(context.Background())
	tc.mtrx = c.kf.metrics
	tc.kTracer = c.kf.kTracer
	tc.logger = c.logger.Named("topic_consumer")
	tc.rec = make(chan *kgo.Record)
	tc.backoff = c.kf.cfg.Consumer.ConfigBackoff.GetBackOff()

	c.wg.Add(func() {
		tc.consume(prefixed, callback, c.errHandler)
	})

	// The stored copy shares the channel, context and cancel function with
	// the running handler.
	c.consumers[prefixed] = tc
}
// Consume starts the poll loop in a goroutine tracked by the wait group. The
// loop reports fetch errors to the error handler, hands the records of every
// fetched topic to consumeTopic and ends once the client is closed.
func (c *Consumer) Consume() {
	reportFetchError := func(t string, p int32, err error) {
		c.errHandler(fmt.Errorf("topic: %s, partition: %d error: %w", t, p, err))
	}

	pollLoop := func() {
		for {
			fetches := c.client.PollFetches(context.Background())
			if fetches.IsClientClosed() {
				c.logger.Info("stop consumer")

				return
			}

			fetches.EachError(reportFetchError)
			fetches.EachTopic(c.consumeTopic)
		}
	}

	c.wg.Add(pollLoop)
}
// Ping checks connectivity through the underlying client and wraps a failure
// with a descriptive message.
func (c *Consumer) Ping(ctx context.Context) error {
	err := c.client.Ping(ctx)
	if err == nil {
		return nil
	}

	return fmt.Errorf("failed to ping Kafka: %w", err)
}
// consumeTopic hands every record of the fetched topic to the topic's handler
// goroutine. Topics without a registered handler are ignored.
//
// The hand-off also watches the handler's context: once the handler has been
// stopped nobody receives from its channel any more, and an unconditional
// send would block the poll loop forever (and with it Close, which waits for
// the loop). Records arriving after the stop are dropped.
func (c *Consumer) consumeTopic(t kgo.FetchTopic) {
	tc, ok := c.consumers[t.Topic]
	if !ok {
		return
	}

	t.EachRecord(func(r *kgo.Record) {
		select {
		case tc.rec <- r:
		case <-tc.ctx.Done():
		}
	})
}
// Close shuts the consumer down: it closes the client, stops every topic
// handler and then waits for the poll loop and all handlers to finish.
func (c *Consumer) Close() {
	c.client.Close()

	for topic := range c.consumers {
		handler := c.consumers[topic]
		handler.stop()
	}

	c.wg.Wait()
}
// HealthComponent describes the consumer as an external health component
// named "kafka_consumer" whose check is Ping.
func (c *Consumer) HealthComponent() *health.Component {
	component := new(health.Component)
	component.Kind = health.ComponentKindExternal
	component.Name = "kafka_consumer"
	component.CheckFunc = c.Ping

	return component
}
// consume is the handler loop of one topic. It receives records from the poll
// loop and calls callback for each of them inside a processing span, retrying
// according to the configured back-off. The handling duration is reported to
// the metrics provider; a final error is recorded on the span and passed to
// errHandler. The loop ends when the handler's context is cancelled.
//
// Retries are bound to the handler's own context, so stopping the handler
// also interrupts a pending retry wait instead of letting the back-off run to
// exhaustion. The callback itself still receives the span context.
func (tc *tconsumer) consume(
	topic string,
	callback ConsumerCallback,
	errHandler func(err error),
) {
	tc.logger.Info("start consume", zap.String("topic", topic))

	for {
		select {
		case <-tc.ctx.Done():
			tc.logger.Info("stop consuming", zap.String("topic", topic))

			return
		case rec := <-tc.rec:
			start := time.Now()
			ctx, span := tc.kTracer.WithProcessSpan(rec)
			tryNum := 0

			err := backoff.RetryNotify(
				func() error {
					tryNum++

					return callback(ctx, convertRecordToPayload(rec))
				},
				backoff.WithContext(tc.backoff, tc.ctx),
				func(err error, duration time.Duration) {
					tc.logger.Warn(
						"consumer call callback, retrying",
						zap.String("topic", topic),
						zap.Int("try_num", tryNum),
						zap.Duration("retry_delay", duration),
						zap.Error(err),
					)
				},
			)

			tc.mtrx.ConsumerHandleDuration(topic, err, time.Since(start).Seconds())

			if err != nil {
				span.RecordError(err)
				errHandler(err)
			}

			span.End()
		}
	}
}

// stop cancels the handler's context, which ends the consume loop.
func (tc *tconsumer) stop() {
	tc.contextCancel()
}
package kafkapool
import (
"context"
"fmt"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/chapsuk/wait"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kotel"
"go.uber.org/zap"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.ads.coffee/platform/pkg/health"
)
// ConsumerGroupOpt customizes a ConsumerGroup while it is being constructed.
type ConsumerGroupOpt func(cg *ConsumerGroup)

// WithCustomErrorHandlerConsumerGroupOpt replaces the group's error handler
// with eh.
func WithCustomErrorHandlerConsumerGroupOpt(eh func(err error)) ConsumerGroupOpt {
	return func(group *ConsumerGroup) {
		group.errHandler = eh
	}
}

// WithNotAckOnErrorConsumerGroupOpt disables marking a record for commit when
// its callback ended with an error.
func WithNotAckOnErrorConsumerGroupOpt() ConsumerGroupOpt {
	return func(group *ConsumerGroup) {
		group.ackOnError = false
	}
}
// tp identifies a topic partition; it is the key of ConsumerGroup.consumers.
type tp struct {
t string
p int32
}
// pconsumer is the handler state of one assigned topic partition.
type pconsumer struct {
group string
topic string
partition int32
logger *zap.Logger
mtrx MetricsProvider
kTracer *kotel.Tracer
// ctx is cancelled (through contextCancel) when the partition is revoked or lost.
ctx context.Context
contextCancel context.CancelFunc
// done is closed when the consume loop has returned.
done chan struct{}
// recs is the unbuffered hand-off channel from the poll loop.
recs chan kgo.FetchTopicPartition
// ackOnError controls whether a record is marked for commit after a failed callback.
ackOnError bool
callback ConsumerCallback
// markRecordsFunc marks records for commit on the client.
markRecordsFunc func(...*kgo.Record)
errHandler func(error)
// backoff is the retry policy applied to the callback.
backoff backoff.BackOff
}
// ConsumerGroup consumes as a member of a Kafka consumer group and runs one
// pconsumer per assigned partition.
type ConsumerGroup struct {
kf *Kafka
logger *zap.Logger
// group is the prefixed group name.
group string
opts []kgo.Opt
ackOnError bool
errHandler func(error)
client *kgo.Client
// handlers maps the prefixed topic name to its callback.
handlers map[string]ConsumerCallback
// consumers holds the running partition handlers; entries are added on
// assignment and removed on revoke/loss.
consumers map[tp]*pconsumer
// wg tracks the poll loop and the partition handler goroutines.
wg wait.Group
}
// NewConsumerGroup creates a ConsumerGroup on top of kf for the given group
// name (after applying the configured prefix).
//
// By default failed records are still acknowledged and errors are logged as
// warnings; opts may change both. The client is created with the shared
// options of kf followed by the group-specific ones: group membership, the
// partition assignment/revoke/loss callbacks, auto-commit of marked records
// and blocking of rebalances while polling. The error of kgo.NewClient is
// returned unchanged.
func NewConsumerGroup(
	kf *Kafka,
	group string,
	opts ...ConsumerGroupOpt,
) (*ConsumerGroup, error) {
	group = kf.formatWithPrefix(group)

	cg := &ConsumerGroup{
		kf:         kf,
		logger:     kf.logger.Named("consumer_group").With(zap.String("group", group)),
		group:      group,
		opts:       make([]kgo.Opt, 0, len(opts)+len(kf.opts)+6),
		ackOnError: true, // ack if was error
		handlers:   make(map[string]ConsumerCallback, 1),
		consumers:  make(map[tp]*pconsumer),
		wg:         wait.Group{},
	}
	cg.errHandler = func(err error) {
		cg.logger.Warn("Kafka: consumer error", zap.Error(err))
	}

	for i := range opts {
		opts[i](cg)
	}

	cg.opts = append(cg.opts, kf.opts...)
	cg.opts = append(cg.opts,
		kgo.ConsumerGroup(group),
		kgo.OnPartitionsAssigned(cg.assigned),
		kgo.OnPartitionsRevoked(cg.revoked),
		kgo.OnPartitionsLost(cg.lost),
		kgo.AutoCommitMarks(),
		kgo.BlockRebalanceOnPoll(),
	)

	client, err := kgo.NewClient(cg.opts...)
	if err != nil {
		return nil, err
	}
	cg.client = client

	return cg, nil
}
// Subscribe registers callback as the handler of topic (after applying the
// configured prefix) and adds the topic to the client.
func (cg *ConsumerGroup) Subscribe(
	topic string,
	callback ConsumerCallback,
) {
	prefixed := cg.kf.formatWithPrefix(topic)

	cg.handlers[prefixed] = callback
	cg.client.AddConsumeTopics(prefixed)
}
// Close closes the client and then waits for the goroutines tracked by the
// wait group (poll loop and partition handlers) to finish.
func (cg *ConsumerGroup) Close() {
	client := cg.client
	client.Close()

	cg.wg.Wait()
}
// Consume starts the poll loop in a goroutine tracked by the wait group. Each
// iteration polls at most maxPollRecords records, reports fetch errors to the
// error handler and hands every fetched partition to its partition handler.
// The loop ends once the client is closed.
//
// A partition is paused while its batch is waiting to be picked up by the
// handler and resumed afterwards. Two details matter for correctness:
//
//   - The handler is looked up in the poll loop, before AllowRebalance, rather
//     than in the hand-off goroutine, so the consumers map is not read
//     concurrently with the rebalance callbacks that modify it.
//     NOTE(review): this relies on BlockRebalanceOnPoll holding back those
//     callbacks until AllowRebalance — confirm against the franz-go docs.
//   - The hand-off also watches the handler's context. If the partition is
//     revoked or lost before the handler picks the batch up, the goroutine
//     gives up instead of blocking forever on the channel.
func (cg *ConsumerGroup) Consume(maxPollRecords int) {
	cg.wg.Add(func() {
		for {
			fetches := cg.client.PollRecords(context.Background(), maxPollRecords)
			if fetches.IsClientClosed() {
				cg.logger.Info("stop consumer group")

				return
			}

			fetches.EachError(func(t string, p int32, err error) {
				cg.errHandler(fmt.Errorf("topic: %s, partition: %d error: %w", t, p, err))
			})

			fetches.EachPartition(func(p kgo.FetchTopicPartition) {
				cons, ok := cg.consumers[tp{p.Topic, p.Partition}]
				if !ok {
					cg.errHandler(
						fmt.Errorf(
							"unknown topic partition in consumer_group: %s:%d",
							p.Topic,
							p.Partition,
						),
					)

					return
				}

				cg.client.PauseFetchPartitions(map[string][]int32{
					p.Topic: {p.Partition},
				})

				go func() {
					defer cg.client.ResumeFetchPartitions(map[string][]int32{
						p.Topic: {p.Partition},
					})

					select {
					case cons.recs <- p:
					case <-cons.ctx.Done():
					}
				}()
			})

			cg.client.AllowRebalance()
		}
	})
}
// Ping checks connectivity through the underlying client and wraps a failure
// with a descriptive message.
func (cg *ConsumerGroup) Ping(ctx context.Context) error {
	err := cg.client.Ping(ctx)
	if err == nil {
		return nil
	}

	return fmt.Errorf("failed to ping Kafka: %w", err)
}
// HealthComponent describes the consumer group as an external health
// component named "kafka_consumer-group-<group>" whose check is Ping.
func (cg *ConsumerGroup) HealthComponent() *health.Component {
	component := new(health.Component)
	component.Kind = health.ComponentKindExternal
	component.Name = fmt.Sprintf("kafka_consumer-group-%s", cg.group)
	component.CheckFunc = cg.Ping

	return component
}
// assigned is the partition-assignment callback. For every newly assigned
// topic partition it creates a pconsumer with its own cancellable context,
// registers it in the consumers map and starts its consume loop in the wait
// group. Records are marked for commit on the client cl.
func (cg *ConsumerGroup) assigned(_ context.Context, cl *kgo.Client, assigned map[string][]int32) {
	markRecords := func(r ...*kgo.Record) {
		cl.MarkCommitRecords(r...)
	}

	for topic, partitions := range assigned {
		for _, partition := range partitions {
			ctx, cancel := context.WithCancel(context.Background())

			pc := new(pconsumer)
			pc.logger = cg.logger.Named("topic_partition_consumer")
			pc.group = cg.group
			pc.topic = topic
			pc.partition = partition
			pc.ackOnError = cg.ackOnError
			pc.mtrx = cg.kf.metrics
			pc.kTracer = cg.kf.kTracer
			pc.callback = cg.handlers[topic]
			pc.ctx = ctx
			pc.contextCancel = cancel
			pc.done = make(chan struct{})
			pc.recs = make(chan kgo.FetchTopicPartition)
			pc.markRecordsFunc = markRecords
			pc.errHandler = cg.errHandler
			pc.backoff = cg.kf.cfg.Consumer.ConfigBackoff.GetBackOff()

			key := tp{t: topic, p: partition}
			cg.consumers[key] = pc
			cg.wg.Add(pc.consume)
		}
	}
}
// revoked is the partition-revoke callback: it stops the handlers of the
// revoked partitions, waits for them and then commits the marked offsets. A
// failed commit is only logged.
func (cg *ConsumerGroup) revoked(ctx context.Context, cl *kgo.Client, revoked map[string][]int32) {
	cg.killConsumers(revoked)

	err := cl.CommitMarkedOffsets(ctx)
	if err == nil {
		return
	}

	cg.logger.Warn("revoke commit, failed", zap.Error(err))
}

// lost is the partition-loss callback: it stops the handlers of the lost
// partitions and waits for them; nothing is committed.
func (cg *ConsumerGroup) lost(_ context.Context, _ *kgo.Client, lost map[string][]int32) {
	partitions := lost
	cg.killConsumers(partitions)
}
// killConsumers stops the handlers of the given topic partitions and blocks
// until all of them have returned. Each handler is removed from the consumers
// map and its context is cancelled.
//
// Partitions without a registered handler are skipped; dereferencing the
// missing map entry would otherwise panic inside the rebalance callback.
func (cg *ConsumerGroup) killConsumers(lost map[string][]int32) {
	var wg sync.WaitGroup
	defer wg.Wait()

	for topic, partitions := range lost {
		for _, partition := range partitions {
			tpKey := tp{topic, partition}

			pc, ok := cg.consumers[tpKey]
			if !ok {
				continue
			}

			delete(cg.consumers, tpKey)
			pc.contextCancel()
			pc.logger.Info(
				"waiting for work to finish",
				zap.String("topic", topic),
				zap.Int32("partition", partition),
			)

			wg.Add(1)

			go func() {
				defer wg.Done()
				<-pc.done
			}()
		}
	}
}
// consume is the handler loop of one topic partition. It receives fetched
// batches from the poll loop and processes their records in order until the
// handler's context is cancelled; done is closed on return.
//
// Each record is handled inside a processing span tagged with the consumer
// group. The callback is retried according to the back-off, bound to the
// handler's context. The handling duration is reported to the metrics
// provider. A record is marked for commit when the callback succeeded, or
// when it failed and ackOnError is set; failures are passed to errHandler and
// recorded on the span.
func (pc *pconsumer) consume() {
	defer close(pc.done)

	pc.logger.Info("starting", zap.String("topic", pc.topic), zap.Int32("partition", pc.partition))
	defer pc.logger.Info("stop", zap.String("topic", pc.topic), zap.Int32("partition", pc.partition))

	for {
		select {
		case <-pc.ctx.Done():
			return
		case batch := <-pc.recs:
			for _, record := range batch.Records {
				// Stop between records as soon as the partition is taken away.
				if pc.ctx.Err() != nil {
					return
				}

				startedAt := time.Now()
				ctx, span := pc.kTracer.WithProcessSpan(record)
				span.SetAttributes(semconv.MessagingKafkaConsumerGroupKey.String(pc.group))

				attempts := 0
				err := backoff.RetryNotify(
					func() error {
						attempts++

						return pc.callback(ctx, convertRecordToPayload(record))
					},
					backoff.WithContext(pc.backoff, pc.ctx),
					func(err error, duration time.Duration) {
						pc.logger.Warn(
							"consumer call callback, retrying",
							zap.String("topic", pc.topic),
							zap.Int32("partition", pc.partition),
							zap.Int("try_num", attempts),
							zap.Duration("retry_delay", duration),
							zap.Error(err),
						)
					},
				)

				pc.mtrx.ConsumerGroupHandleDuration(pc.group, pc.topic, err, time.Since(startedAt).Seconds())

				if err != nil {
					pc.errHandler(err)
					span.RecordError(err)
				}
				if err == nil || pc.ackOnError {
					pc.markRecordsFunc(record)
				}

				span.End()
			}
		}
	}
}
package kafkapool
import (
"go.uber.org/fx"
"go.uber.org/zap"
"go.ads.coffee/platform/pkg/telemetry"
)
var (
// Module is the fx module of the kafkapool package. It provides the pool,
// its metrics and the telemetry/tracer adapters, and decorates the
// injected logger with the "kafka-pool" name.
Module = fx.Module(
"kafka-pool",
fx.Provide(
NewPool,
NewMetrics,
adapterTelemetry,
adapterTracer,
),
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("kafka-pool")
}),
)
)
// adapterTelemetry exposes the shared *telemetry.Telemetry as the
// package-local Telemetry interface.
func adapterTelemetry(t *telemetry.Telemetry) Telemetry { //nolint: ireturn
return t
}
// adapterTracer exposes the shared *telemetry.Telemetry as the package-local
// Tracer interface.
func adapterTracer(t *telemetry.Telemetry) Tracer { //nolint: ireturn
return t
}
package kafkapool
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kotel"
"github.com/twmb/franz-go/plugin/kzap"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// Key is the raw key of a Kafka record.
type Key []byte
// Payload is the raw value of a Kafka record.
type Payload []byte
// ConsumerCallback handles the payload of one consumed record; a non-nil
// error makes the consumer retry according to its back-off policy.
type ConsumerCallback func(ctx context.Context, payload Payload) error
// Tracer is the tracing dependency of the package: it starts spans and
// exposes the tracer provider used for the Kafka instrumentation.
type Tracer interface {
StartSpan(
ctx context.Context,
spanName string,
opts ...trace.SpanStartOption,
) (context.Context, trace.Span)
TracerProvider() trace.TracerProvider
}
// MetricsProvider receives the producer and consumer measurements.
type MetricsProvider interface {
ProducerTotal(topic string, err error)
ConsumerHandleDuration(topic string, err error, seconds float64)
ConsumerGroupHandleDuration(group, topic string, err error, seconds float64)
}
// Kafka contains all things for default connection, metrics, and logger.
type Kafka struct {
logger *zap.Logger
cfg *Config
metrics MetricsProvider
tracer Tracer
kTracer *kotel.Tracer
// opts are the client options shared by every client built on this Kafka.
opts []kgo.Opt
}
// NewKafka assembles the client options shared by all producers and consumers
// from cfg: seed brokers, logging, SASL, TLS, auto topic creation and tracing
// hooks. It does not open any connection itself.
//
// It returns an error when cfg is nil, when the SASL mechanism is unknown, or
// when the CA certificate file cannot be read or contains no PEM certificate.
//
// NOTE(review): tracer is dereferenced unconditionally; callers must pass a
// non-nil Tracer.
//
// nolint:cyclop
func NewKafka(
logger *zap.Logger,
cfg *Config,
metrics MetricsProvider,
tracer Tracer,
) (*Kafka, error) {
if cfg == nil {
return nil, fmt.Errorf("empty config")
}
// The client's own log output is limited to warnings and above.
opts := []kgo.Opt{
kgo.SeedBrokers(cfg.Seeds...),
kgo.WithLogger(kzap.New(logger, kzap.Level(kgo.LogLevelWarn))),
}
logger.Info("kafka mechanism config", zap.String("mechanism", string(cfg.SASLMechanism)))
switch cfg.SASLMechanism {
case SASLMechanismNone:
// do nothing
case SASLMechanismPlain:
opts = append(opts, kgo.SASL(plain.Auth{
User: cfg.Username,
Pass: cfg.Password,
}.AsMechanism()))
case SASLMechanismScramSHA256:
opts = append(opts, kgo.SASL(scram.Auth{
User: cfg.Username,
Pass: cfg.Password,
}.AsSha256Mechanism()))
case SASLMechanismScramSHA512:
opts = append(opts, kgo.SASL(scram.Auth{
User: cfg.Username,
Pass: cfg.Password,
}.AsSha512Mechanism()))
default:
return nil, fmt.Errorf("unknown sasl mechanism: %s", cfg.SASLMechanism)
}
if cfg.TLS {
opts = append(opts, kgo.DialTLS())
}
// A CA certificate adds a second TLS dialer option after the plain one.
// NOTE(review): presumably the later option wins when both TLS and
// Certificate are set — confirm against the kgo option semantics.
if cfg.Certificate != "" {
caCert, err := os.ReadFile(cfg.Certificate)
if err != nil {
return nil, fmt.Errorf("read certificate: %w", err)
}
caCertPool := x509.NewCertPool()
ok := caCertPool.AppendCertsFromPEM(caCert)
if !ok {
return nil, fmt.Errorf("append certs from pem")
}
// NOTE(review): TLS 1.0 is allowed as the minimum version here (the gosec
// finding is suppressed); confirm that this is still required.
opts = append(opts, kgo.DialTLSConfig(
//nolint: gosec
&tls.Config{
MinVersion: tls.VersionTLS10,
RootCAs: caCertPool,
}))
}
if cfg.AllowAutoTopicCreation {
opts = append(opts, kgo.AllowAutoTopicCreation())
}
// Tracing: the same kotel tracer is installed as a client hook and kept on
// Kafka for the consumers' processing spans.
kTracer := kotel.NewTracer(
kotel.TracerProvider(tracer.TracerProvider()),
kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})),
)
kotelOps := []kotel.Opt{
kotel.WithTracer(kTracer),
}
kotelService := kotel.NewKotel(kotelOps...)
opts = append(opts, kgo.WithHooks(kotelService.Hooks()...))
return &Kafka{
logger: logger,
metrics: metrics,
tracer: tracer,
kTracer: kTracer,
cfg: cfg,
opts: opts,
}, nil
}
// Enabled reports the Enabled flag of the connection's configuration.
func (k *Kafka) Enabled() bool {
	enabled := k.cfg.Enabled

	return enabled
}
// formatWithPrefix returns str prefixed with "<Prefix>_" when a prefix is
// configured, and str unchanged otherwise.
func (k *Kafka) formatWithPrefix(str string) string {
	prefix := k.cfg.Prefix
	if prefix == "" {
		return str
	}

	return fmt.Sprintf("%s_%s", prefix, str)
}
// producerOpts translates the producer section of the config into client
// options. Zero-valued settings are skipped so the client defaults apply.
// The result is always non-nil.
func (k *Kafka) producerOpts() []kgo.Opt {
	producer := k.cfg.Producer
	opts := []kgo.Opt{}

	if producer.DisableIdempotentWrite {
		opts = append(opts, kgo.DisableIdempotentWrite())
	}

	if timeout := producer.RequestTimeout; timeout > 0 {
		opts = append(opts, kgo.ProduceRequestTimeout(timeout))
	}

	if retries := producer.RecordRetries; retries > 0 {
		opts = append(opts, kgo.RecordRetries(retries))
	}

	return opts
}
// convertPayloadToRecord builds a record for topic carrying key and payload.
// A nil key yields a record without a key.
func convertPayloadToRecord(
	topic string,
	key Key,
	payload Payload,
) *kgo.Record {
	rec := new(kgo.Record)
	rec.Topic = topic
	rec.Key = key
	rec.Value = payload

	return rec
}
// convertPayloadsToRecords builds one keyless record per payload, all for the
// same topic, preserving the input order.
func convertPayloadsToRecords(
	topic string,
	payload ...Payload,
) []*kgo.Record {
	records := make([]*kgo.Record, len(payload))
	for i := range payload {
		records[i] = convertPayloadToRecord(topic, nil, payload[i])
	}

	return records
}
// convertRecordToPayload returns the record's value as a Payload.
// The bytes are not copied.
func convertRecordToPayload(r *kgo.Record) Payload {
	return Payload(r.Value)
}
package kafkapool
import (
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"go.ads.coffee/platform/pkg/telemetry"
)
// Telemetry is the metrics registry dependency of this package.
type Telemetry interface {
// Register registers the given collectors and returns an error on failure.
Register(collectors ...prometheus.Collector) error
}
// Metrics holds the Prometheus collectors in the "kafka" namespace.
type Metrics struct {
// producerTotal counts produced records by topic and error flag.
producerTotal *prometheus.CounterVec
// consumerHandle observes per-message handling time by topic and error flag.
consumerHandle *prometheus.HistogramVec
// consumerGroupHandle is consumerHandle with an additional group label.
consumerGroupHandle *prometheus.HistogramVec
}
// NewMetrics creates the "kafka" namespace collectors and registers them
// with mr. It returns an error when registration fails.
func NewMetrics(mr Telemetry) (*Metrics, error) {
	m := &Metrics{
		producerTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: "kafka",
				Subsystem: "producer",
				Name:      "send_count_total",
				Help:      "Produced events count",
			},
			[]string{"topic", telemetry.ErrLabel},
		),
		consumerHandle: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Namespace: "kafka",
				Subsystem: "consumer",
				Name:      "duration_seconds",
				Help:      "Time elapsed to consume single messsage",
				Buckets:   telemetry.DefaultHistogramBuckets,
			},
			[]string{"topic", telemetry.ErrLabel},
		),
		consumerGroupHandle: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Namespace: "kafka",
				Subsystem: "consumer_group",
				Name:      "duration_seconds",
				Help:      "Time elapsed to consume single messsage",
				Buckets:   telemetry.DefaultHistogramBuckets,
			},
			[]string{"group", "topic", telemetry.ErrLabel},
		),
	}

	if err := mr.Register(m.producerTotal, m.consumerHandle, m.consumerGroupHandle); err != nil {
		return nil, fmt.Errorf("register: %w", err)
	}

	return m, nil
}
// ProducerTotal increments the produced-records counter for topic, labelled
// with whether err is non-nil.
func (m *Metrics) ProducerTotal(topic string, err error) {
	counter := m.producerTotal.WithLabelValues(topic, telemetry.ErrLabelValue(err))
	counter.Inc()
}

// ConsumerHandleDuration observes the handling time, in seconds, of one
// consumed message for topic.
func (m *Metrics) ConsumerHandleDuration(topic string, err error, seconds float64) {
	observer := m.consumerHandle.WithLabelValues(topic, telemetry.ErrLabelValue(err))
	observer.Observe(seconds)
}

// ConsumerGroupHandleDuration observes the handling time, in seconds, of one
// message consumed by group from topic.
func (m *Metrics) ConsumerGroupHandleDuration(group, topic string, err error, seconds float64) {
	observer := m.consumerGroupHandle.WithLabelValues(group, topic, telemetry.ErrLabelValue(err))
	observer.Observe(seconds)
}
// NamedMetrics holds the "kafka_pool" namespace collectors together with the
// pool name that is written into the "name" label of every sample.
type NamedMetrics struct {
// name is the value of the "name" label.
name string
producerTotal *prometheus.CounterVec
consumerHandle *prometheus.HistogramVec
consumerGroupHandle *prometheus.HistogramVec
}
// NewNamedMetrics creates the "kafka_pool" namespace collectors for the pool
// called name and registers them with mr.
//
// The collectors have the same fully-qualified names for every pool and are
// distinguished only by the "name" label, so the function is called once per
// configured pool with identical descriptors. When a collector is already
// registered, the existing one is reused instead of failing; otherwise a
// second pool could never be created.
//
// It returns an error when registration fails for any other reason.
func NewNamedMetrics(name string, mr Telemetry) (*NamedMetrics, error) {
	producerTotal, err := registerOrReuse(mr, prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "kafka_pool",
			Subsystem: "producer",
			Name:      "send_count_total",
			Help:      "Produced events count",
		},
		[]string{"name", "topic", telemetry.ErrLabel},
	))
	if err != nil {
		return nil, fmt.Errorf("register: %w", err)
	}

	consumerHandle, err := registerOrReuse(mr, prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "kafka_pool",
			Subsystem: "consumer",
			Name:      "duration_seconds",
			Help:      "Time elapsed to consume single messsage",
			Buckets:   telemetry.DefaultHistogramBuckets,
		},
		[]string{"name", "topic", telemetry.ErrLabel},
	))
	if err != nil {
		return nil, fmt.Errorf("register: %w", err)
	}

	consumerGroupHandle, err := registerOrReuse(mr, prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "kafka_pool",
			Subsystem: "consumer_group",
			Name:      "duration_seconds",
			Help:      "Time elapsed to consume single messsage",
			Buckets:   telemetry.DefaultHistogramBuckets,
		},
		[]string{"name", "group", "topic", telemetry.ErrLabel},
	))
	if err != nil {
		return nil, fmt.Errorf("register: %w", err)
	}

	return &NamedMetrics{
		name:                name,
		producerTotal:       producerTotal,
		consumerHandle:      consumerHandle,
		consumerGroupHandle: consumerGroupHandle,
	}, nil
}

// registerOrReuse registers c with mr. If an equal collector was registered
// before, the previously registered collector is returned so that all callers
// share one set of time series.
func registerOrReuse[C prometheus.Collector](mr Telemetry, c C) (C, error) {
	err := mr.Register(c)
	if err == nil {
		return c, nil
	}

	var already prometheus.AlreadyRegisteredError
	if errors.As(err, &already) {
		if existing, ok := already.ExistingCollector.(C); ok {
			return existing, nil
		}
	}

	var zero C

	return zero, err
}
// ProducerTotal increments the produced-records counter for this pool and
// topic, labelled with whether err is non-nil.
func (m *NamedMetrics) ProducerTotal(topic string, err error) {
	errLabel := telemetry.ErrLabelValue(err)
	m.producerTotal.WithLabelValues(m.name, topic, errLabel).Inc()
}

// ConsumerHandleDuration observes the handling time, in seconds, of one
// message consumed from topic by this pool.
func (m *NamedMetrics) ConsumerHandleDuration(topic string, err error, seconds float64) {
	errLabel := telemetry.ErrLabelValue(err)
	m.consumerHandle.WithLabelValues(m.name, topic, errLabel).Observe(seconds)
}

// ConsumerGroupHandleDuration observes the handling time, in seconds, of one
// message consumed by group from topic through this pool.
func (m *NamedMetrics) ConsumerGroupHandleDuration(
	group, topic string,
	err error,
	seconds float64,
) {
	errLabel := telemetry.ErrLabelValue(err)
	m.consumerGroupHandle.WithLabelValues(m.name, group, topic, errLabel).Observe(seconds)
}
package kafkapool
import (
"errors"
"fmt"
"go.uber.org/zap"
)
var (
// ErrPoolNotFound is returned by Pool.GetPool when no pool is configured
// under the requested name.
ErrPoolNotFound = errors.New("pool not found")
)
// Pool is a registry of Kafka connections keyed by configuration name.
type Pool struct {
// cfgs are the per-name configurations the pool was built from.
cfgs map[string]*Config
// pools maps a configuration name to its Kafka instance; it is filled once
// in NewPool.
pools map[string]*Kafka
mtrx *Metrics
logger *zap.Logger
tracer Tracer
// telemetry is the registry used to create per-pool NamedMetrics.
telemetry Telemetry
}
// NewPool builds one Kafka instance per entry of cfgs, each with its own
// named logger and NamedMetrics. It fails on the first entry whose metrics or
// Kafka instance cannot be created.
func NewPool(
	logger *zap.Logger,
	cfgs map[string]*Config,
	mtrx *Metrics,
	tracer Tracer,
	telemetry Telemetry,
) (*Pool, error) {
	pools := make(map[string]*Kafka, len(cfgs))

	for name, cfg := range cfgs {
		namedMetrics, err := NewNamedMetrics(name, telemetry)
		if err != nil {
			return nil, fmt.Errorf("error on create metrics: %w", err)
		}

		client, err := NewKafka(logger.Named(name), cfg, namedMetrics, tracer)
		if err != nil {
			return nil, fmt.Errorf("create kafka cli %s: %w", name, err)
		}

		pools[name] = client
	}

	return &Pool{
		cfgs:      cfgs,
		pools:     pools,
		mtrx:      mtrx,
		logger:    logger,
		tracer:    tracer,
		telemetry: telemetry,
	}, nil
}
// GetPool returns the Kafka instance registered under name, or
// ErrPoolNotFound when there is none.
func (p *Pool) GetPool(name string) (*Kafka, error) {
	kf, found := p.pools[name]
	if !found {
		return nil, ErrPoolNotFound
	}

	return kf, nil
}
package kafkapool
import (
"context"
"fmt"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
"go.ads.coffee/platform/pkg/health"
)
// ProducerOpt is a functional option that mutates a Producer.
// NOTE(review): NewProducer as visible here does not accept options — confirm
// whether this type is still used.
type ProducerOpt func(*Producer)
// Producer sends payloads to Kafka through its own kgo client.
type Producer struct {
// kf provides configuration, metrics, tracer and logger.
kf *Kafka
// cl is the client created in NewProducer and closed by Close.
cl *kgo.Client
}
// NewProducer creates a Producer with a dedicated kgo client configured from
// the shared options of kf plus its producer-specific options.
//
// The options are copied into a fresh slice: appending directly to kf.opts
// could write into spare capacity of its backing array, which is shared by
// every client built from the same Kafka instance.
//
// It returns an error when the client cannot be created.
func NewProducer(kf *Kafka) (*Producer, error) {
	producerOpts := kf.producerOpts()

	opts := make([]kgo.Opt, 0, len(kf.opts)+len(producerOpts))
	opts = append(opts, kf.opts...)
	opts = append(opts, producerOpts...)

	cl, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, fmt.Errorf("create kafka client: %w", err)
	}

	return &Producer{
		kf: kf,
		cl: cl,
	}, nil
}
// Send synchronously produces payload to topic without a record key.
//
// It is the keyless form of SendWithKey and shares its behaviour: a no-op
// returning nil when Kafka is disabled, topic prefixing, a "producer_send"
// span, the producer counter, and a wrapped error on failure.
func (p *Producer) Send(ctx context.Context, topic string, payload Payload) error {
	return p.SendWithKey(ctx, topic, nil, payload)
}
// SendWithKey synchronously produces payload to topic with the given key.
//
// When Kafka is disabled it does nothing and returns nil. The topic is
// prefixed per configuration, the call is traced in a "producer_send" span,
// and the result is counted in the producer metric. A produce failure is
// recorded on the span and returned wrapped.
func (p *Producer) SendWithKey(
	ctx context.Context,
	topic string,
	key Key,
	payload Payload,
) error {
	if !p.kf.Enabled() {
		return nil
	}

	topic = p.kf.formatWithPrefix(topic)

	ctx, span := p.kf.tracer.StartSpan(ctx, "producer_send")
	defer span.End()

	record := convertPayloadToRecord(topic, key, payload)
	err := p.cl.ProduceSync(ctx, record).FirstErr()

	p.kf.metrics.ProducerTotal(topic, err)

	if err == nil {
		return nil
	}

	span.RecordError(err)
	span.SetStatus(codes.Error, err.Error())

	return fmt.Errorf("produce: %w", err)
}
// SendBatch synchronously produces all payloads to topic as keyless records.
//
// When Kafka is disabled it does nothing and returns nil. Every result is
// counted in the producer metric. The returned slice holds one wrapped error
// per failed record and is nil when every record succeeded.
func (p *Producer) SendBatch(ctx context.Context, topic string, payloads ...Payload) []error {
	if !p.kf.Enabled() {
		return nil
	}

	topic = p.kf.formatWithPrefix(topic)

	ctx, span := p.kf.tracer.StartSpan(ctx, "producer_send_batch")
	defer span.End()

	records := convertPayloadsToRecords(topic, payloads...)
	results := p.cl.ProduceSync(ctx, records...)

	errs := make([]error, 0, len(payloads))

	for i := range results {
		err := results[i].Err

		p.kf.metrics.ProducerTotal(topic, err)

		if err == nil {
			continue
		}

		span.RecordError(err)
		errs = append(errs, fmt.Errorf("produce error: %w", err))
	}

	if len(errs) == 0 {
		return nil
	}

	return errs
}
// SendAsync produces payload to topic without waiting for the result.
//
// When Kafka is disabled it does nothing. The topic is prefixed per
// configuration. The outcome is handled in the produce promise: the producer
// counter is updated and a failure is logged and recorded on the span.
//
// The "producer_send_async" span is ended inside the promise rather than when
// this function returns. Ending it on return closed the span before the
// promise ran, so an error recorded there landed on an already-ended span and
// was lost.
// NOTE(review): this relies on the client always invoking the promise — confirm
// against the franz-go Produce documentation.
func (p *Producer) SendAsync(ctx context.Context, topic string, payload Payload) {
	if !p.kf.Enabled() {
		return
	}

	topic = p.kf.formatWithPrefix(topic)

	ctx, span := p.kf.tracer.StartSpan(ctx, "producer_send_async")

	p.cl.Produce(ctx,
		convertPayloadToRecord(topic, nil, payload),
		func(_ *kgo.Record, err error) {
			defer span.End()

			if err != nil {
				span.RecordError(err)
				span.SetStatus(codes.Error, err.Error())
				p.kf.logger.Error("Kafka: async producer error", zap.Error(err), zap.String("topic", topic))
			}

			p.kf.metrics.ProducerTotal(topic, err)
		})
}
// Ping checks connectivity through the underlying client and wraps any
// failure.
func (p *Producer) Ping(ctx context.Context) error {
	err := p.cl.Ping(ctx)
	if err == nil {
		return nil
	}

	return fmt.Errorf("failed to ping Kafka: %w", err)
}

// Close closes the underlying client. The context is unused and the result is
// always nil.
func (p *Producer) Close(_ context.Context) error {
	p.cl.Close()

	return nil
}
// HealthComponent returns a local health component named "kafka_producer"
// whose check pings Kafka.
func (p *Producer) HealthComponent() *health.Component {
	check := func(ctx context.Context) error {
		return p.Ping(ctx)
	}

	component := &health.Component{
		Kind:      health.ComponentKindLocal,
		Name:      "kafka_producer",
		CheckFunc: check,
	}

	return component
}
package logger
import (
"go.uber.org/zap"
)
// New builds a zap production logger at info level.
// It panics when the logger cannot be built.
func New() *zap.Logger {
	cfg := zap.NewProductionConfig()
	cfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)

	log, err := cfg.Build()
	if err == nil {
		return log
	}

	panic(err)
}
package middleware
import (
"fmt"
"net/http"
"time"
"github.com/go-chi/chi/v5"
"github.com/prometheus/client_golang/prometheus"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"go.ads.coffee/platform/pkg/telemetry"
)
// Telemetry is the metrics registry dependency of the middleware package.
type Telemetry interface {
// Register registers the given collectors and returns an error on failure.
Register(collectors ...prometheus.Collector) error
}
// Metrics returns an HTTP middleware that records, per request, a counter
// (http_requests_total, labelled by method, route pattern and status) and a
// latency histogram (http_requests_duration, labelled by method and route
// pattern). The collectors are registered with tel; a registration failure is
// returned.
//
// Two edge cases are handled explicitly:
//   - a handler that never writes a header or body makes net/http send an
//     implicit 200, while the wrapped writer still reports status 0; such
//     requests are labelled "200" rather than "0";
//   - when the request carries no chi route context, the pattern label is left
//     empty instead of calling a method on a nil context.
func Metrics(tel Telemetry) (func(next http.Handler) http.Handler, error) {
	requestsTotal := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "http",
			Subsystem: "requests",
			Name:      "total",
			Help:      "Total number of HTTP requests.",
		},
		[]string{"method", "pattern", "status"},
	)
	requestDuration := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "http",
			Subsystem: "requests",
			Name:      "duration",
			Help:      "HTTP request latencies in seconds.",
			Buckets:   telemetry.DefaultHistogramBuckets,
		},
		[]string{"method", "pattern"},
	)

	if err := tel.Register(requestsTotal, requestDuration); err != nil {
		return nil, err
	}

	metricsHandler := func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Grab the route context up front; the pattern is read only after
			// the handler ran, because routing fills it in on the way down.
			chictx := chi.RouteContext(r.Context())
			ww := chimiddleware.NewWrapResponseWriter(w, r.ProtoMajor)

			ts := time.Now()
			next.ServeHTTP(ww, r)
			cost := time.Since(ts)

			pattern := ""
			if chictx != nil {
				pattern = chictx.RoutePattern()
			}

			status := ww.Status()
			if status == 0 {
				status = http.StatusOK
			}

			requestsTotal.WithLabelValues(r.Method, pattern, fmt.Sprint(status)).Inc()
			requestDuration.WithLabelValues(r.Method, pattern).Observe(cost.Seconds())
		})
	}

	return metricsHandler, nil
}
package redispool
import (
"go.uber.org/fx"
"go.uber.org/zap"
"go.ads.coffee/platform/pkg/health"
"go.ads.coffee/platform/pkg/telemetry"
)
// Module is the fx module of the redispool package. It provides the pool,
// its metrics and the adapter constructors, names the injected logger
// "redispool", and stops the pool on application shutdown.
var Module = fx.Module(
"redispool",
fx.Provide(
NewPool,
NewMetrics,
adapterHealth,
adapterTelemetry,
adapterTracer,
),
// Scope the logger seen inside this module.
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("redispool")
}),
// Hook Pool.Stop into the lifecycle; invoking with *Pool also forces the
// pool to be constructed.
fx.Invoke(
func(lc fx.Lifecycle, pp *Pool) {
lc.Append(fx.Hook{
OnStop: pp.Stop,
})
},
),
)
// HealthComponentOut is the fx result struct that contributes the pool's
// health component to the "health_component" value group.
type HealthComponentOut struct {
fx.Out
Pool *health.Component `group:"health_component"`
}
// adapterHealth exposes the pool's health component to the fx graph.
func adapterHealth(pp *Pool) HealthComponentOut {
	var out HealthComponentOut
	out.Pool = pp.HealthComponent()

	return out
}
// adapterTelemetry narrows the shared telemetry object to this package's
// Telemetry interface for injection.
func adapterTelemetry(t *telemetry.Telemetry) Telemetry { //nolint: ireturn
	var registry Telemetry = t

	return registry
}

// adapterTracer narrows the shared telemetry object to this package's Tracer
// interface for injection.
func adapterTracer(t *telemetry.Telemetry) Tracer { //nolint: ireturn
	var tracer Tracer = t

	return tracer
}
package redispool
// KeyFormatter turns a logical key into the key actually used in Redis.
type KeyFormatter interface {
// FormatKey returns the formatted form of key.
FormatKey(key string) string
}
// DummyKeyFormatter is a KeyFormatter that leaves keys untouched.
type DummyKeyFormatter struct{}

// FormatKey returns key unchanged.
func (DummyKeyFormatter) FormatKey(key string) (formatted string) {
	formatted = key

	return formatted
}
package redispool
import (
"github.com/prometheus/client_golang/prometheus"
"go.ads.coffee/platform/pkg/telemetry"
)
// Telemetry is the metrics registry dependency of the redispool package.
type Telemetry interface {
// Register registers the given collectors and returns an error on failure.
Register(collectors ...prometheus.Collector) error
}
// Metrics holds the Prometheus collectors shared by all Redis pools; every
// collector carries a "name" label identifying the pool.
type Metrics struct {
// duration observes the latency of pool actions.
duration *prometheus.HistogramVec
// connections holds connection gauges by status.
connections *prometheus.GaugeVec
// connectionsCall holds connection-call gauges by status.
connectionsCall *prometheus.GaugeVec
// poolConnCreatedTotal counts created connections by address.
poolConnCreatedTotal *prometheus.CounterVec
// singleCommands observes the latency of individual commands.
singleCommands *prometheus.HistogramVec
// pipelinedCommands counts pipeline executions.
pipelinedCommands *prometheus.CounterVec
}
// NamedMetrics binds a pool name to the shared Metrics so callers do not
// have to pass the name on every call.
type NamedMetrics struct {
name string
mtrx *Metrics
}
// NewMetrics creates the Redis pool collectors and registers them with tel.
// It returns the registration error unchanged on failure.
func NewMetrics(tel Telemetry) (*Metrics, error) {
	m := &Metrics{
		duration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "redis_pool_action_duration_seconds",
				Help:    "redis pool action request latencies in seconds.",
				Buckets: telemetry.DefaultHistogramBuckets,
			},
			[]string{"name", "action", telemetry.ErrLabel},
		),
		connections: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "redis_pool_connections",
				Help: "redis pool connections",
			},
			[]string{"name", "status"},
		),
		connectionsCall: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "redis_pool_connections_calls",
				Help: "Number of redis pool connections calls.",
			},
			[]string{"name", "status"},
		),
		poolConnCreatedTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "redis_pool_connection_created_total",
				Help: "Total number of created connections in pool.",
			},
			[]string{"name", "addr"},
		),
		singleCommands: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "redis_single_commands",
				Help:    "Histogram of single Redis commands",
				Buckets: telemetry.DefaultHistogramBuckets,
			},
			[]string{"name", "command", telemetry.ErrLabel},
		),
		pipelinedCommands: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "redis_pipelined_commands_total",
				Help: "Number of pipelined Redis commands",
			},
			[]string{"name", telemetry.ErrLabel},
		),
	}

	err := tel.Register(
		m.duration,
		m.connections,
		m.connectionsCall,
		m.poolConnCreatedTotal,
		m.singleCommands,
		m.pipelinedCommands,
	)
	if err != nil {
		return nil, err
	}

	return m, nil
}
// Duration observes d for the given pool name and action, labelled with
// whether err is non-nil.
func (m *Metrics) Duration(name, action string, err error, d float64) {
	errLabel := telemetry.ErrLabelValue(err)
	m.duration.WithLabelValues(name, action, errLabel).Observe(d)
}

// Connections sets the connection gauge of the given pool and status to val.
func (m *Metrics) Connections(name, status string, val float64) {
	gauge := m.connections.WithLabelValues(name, status)
	gauge.Set(val)
}

// ConnectionsCall sets the connection-call gauge of the given pool and status
// to val.
func (m *Metrics) ConnectionsCall(name, status string, val float64) {
	gauge := m.connectionsCall.WithLabelValues(name, status)
	gauge.Set(val)
}

// ConnectionCreate increments the created-connections counter for the given
// pool and address.
func (m *Metrics) ConnectionCreate(name, addr string) {
	counter := m.poolConnCreatedTotal.WithLabelValues(name, addr)
	counter.Inc()
}

// SingleCommands observes d for one command of the given pool, labelled with
// the command name and whether err is non-nil.
func (m *Metrics) SingleCommands(name, cmdName string, d float64, err error) {
	errLabel := telemetry.ErrLabelValue(err)
	m.singleCommands.WithLabelValues(name, cmdName, errLabel).Observe(d)
}

// PipelinedCommands increments the pipeline counter of the given pool,
// labelled with whether err is non-nil.
func (m *Metrics) PipelinedCommands(name string, err error) {
	errLabel := telemetry.ErrLabelValue(err)
	m.pipelinedCommands.WithLabelValues(name, errLabel).Inc()
}
// NewNamedMetrics binds name to the shared metrics mtrx.
func NewNamedMetrics(name string, mtrx *Metrics) *NamedMetrics {
	named := new(NamedMetrics)
	named.name = name
	named.mtrx = mtrx

	return named
}
// Duration forwards to Metrics.Duration with this pool's name.
func (m *NamedMetrics) Duration(action string, err error, d float64) {
	shared, pool := m.mtrx, m.name
	shared.Duration(pool, action, err, d)
}

// Connections forwards to Metrics.Connections with this pool's name.
func (m *NamedMetrics) Connections(status string, val float64) {
	shared, pool := m.mtrx, m.name
	shared.Connections(pool, status, val)
}

// ConnectionsCall forwards to Metrics.ConnectionsCall with this pool's name.
func (m *NamedMetrics) ConnectionsCall(status string, val float64) {
	shared, pool := m.mtrx, m.name
	shared.ConnectionsCall(pool, status, val)
}

// ConnectionCreate forwards to Metrics.ConnectionCreate with this pool's name.
func (m *NamedMetrics) ConnectionCreate(addr string) {
	shared, pool := m.mtrx, m.name
	shared.ConnectionCreate(pool, addr)
}

// SingleCommands forwards to Metrics.SingleCommands with this pool's name.
func (m *NamedMetrics) SingleCommands(cmdName string, d float64, err error) {
	shared, pool := m.mtrx, m.name
	shared.SingleCommands(pool, cmdName, d, err)
}

// PipelinedCommands forwards to Metrics.PipelinedCommands with this pool's
// name.
func (m *NamedMetrics) PipelinedCommands(err error) {
	shared, pool := m.mtrx, m.name
	shared.PipelinedCommands(pool, err)
}
package redispool
import (
"context"
"net"
"time"
"github.com/redis/go-redis/v9"
)
// metricsHook is a go-redis hook that reports dial, command and pipeline
// activity to a MetricsProvider.
type metricsHook struct {
	metrics MetricsProvider
}

// newMetricsHook returns a hook reporting to metrics.
func newMetricsHook(metrics MetricsProvider) *metricsHook {
	hook := new(metricsHook)
	hook.metrics = metrics

	return hook
}
// DialHook wraps next so that every dial attempt is counted for its address.
// The count is taken before dialing, so failed attempts are included.
func (h *metricsHook) DialHook(next redis.DialHook) redis.DialHook {
	dial := func(ctx context.Context, network, addr string) (net.Conn, error) {
		h.metrics.ConnectionCreate(addr)

		return next(ctx, network, addr)
	}

	return dial
}
// ProcessHook wraps next so that the duration of every single command is
// observed under the command's name. The error label is taken from
// cmd.Err() after the command ran.
func (h *metricsHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
	return func(ctx context.Context, cmd redis.Cmder) error {
		started := time.Now()

		defer func() {
			h.metrics.SingleCommands(cmd.Name(), time.Since(started).Seconds(), cmd.Err())
		}()

		return next(ctx, cmd)
	}
}
// ProcessPipelineHook wraps next so that every pipeline execution is observed
// as a single command named "pipeline" and counted in the pipeline counter.
// Both are labelled with the error returned by the pipeline.
func (h *metricsHook) ProcessPipelineHook(
	next redis.ProcessPipelineHook,
) redis.ProcessPipelineHook {
	return func(ctx context.Context, cmds []redis.Cmder) (err error) {
		started := time.Now()

		// err is the named result, so the deferred call sees the final value.
		defer func() {
			h.metrics.SingleCommands("pipeline", time.Since(started).Seconds(), err)
			h.metrics.PipelinedCommands(err)
		}()

		return next(ctx, cmds)
	}
}
package redispool
import (
"context"
"errors"
"fmt"
"sync"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.ads.coffee/platform/pkg/circuitbreaker"
"go.ads.coffee/platform/pkg/health"
)
const (
// CircuitNamePrefix is prepended to a pool name by CircuitName to form the
// name of that pool's circuit breaker.
CircuitNamePrefix = "redis:"
)
var (
// ErrPoolNotFound is returned by Pool.GetPool when no pool is configured
// under the requested name.
ErrPoolNotFound = errors.New("pool not found")
)
// Tracer is the tracing dependency of the redispool package.
type Tracer interface {
// StartSpan starts a span named spanName and returns the derived context
// together with the new span.
StartSpan(
ctx context.Context,
spanName string,
opts ...trace.SpanStartOption,
) (context.Context, trace.Span)
}
// MetricsProvider receives the measurements of a single Redis pool; the pool
// name is bound by the implementation (see NamedMetrics).
type MetricsProvider interface {
// Duration records the latency d of the named action and its outcome.
Duration(action string, err error, d float64)
// Connections sets the connection gauge for status.
Connections(status string, val float64)
// ConnectionsCall sets the connection-call gauge for status.
ConnectionsCall(status string, val float64)
// ConnectionCreate counts one connection dialed to addr.
ConnectionCreate(addr string)
// SingleCommands records the latency d of one command and its outcome.
SingleCommands(cmdName string, d float64, err error)
// PipelinedCommands counts one pipeline execution and its outcome.
PipelinedCommands(err error)
}
// Pool is a registry of Redis clients keyed by configuration name.
type Pool struct {
// cfgs are the per-name configurations the pool was built from.
cfgs map[string]*Config
// pools maps a configuration name to its client; it is filled in NewPool.
pools map[string]*Redis
// mu is taken for reading by GetPool, Stop and the health check.
mu sync.RWMutex
logger *zap.Logger
mtrx *Metrics
tracer Tracer
}
// NewPool creates one Redis client per entry of cfgs. Each client gets the
// circuit breaker named CircuitName(name) from pool and metrics bound to its
// name. The first client that fails to build aborts construction.
func NewPool(
	logger *zap.Logger,
	cfgs map[string]*Config,
	mtrx *Metrics,
	tracer Tracer,
	pool *circuitbreaker.Pool,
) (*Pool, error) {
	p := &Pool{
		cfgs:   cfgs,
		pools:  make(map[string]*Redis, len(cfgs)),
		logger: logger,
		mtrx:   mtrx,
		tracer: tracer,
	}

	for name, cfg := range p.cfgs {
		circuit := pool.Get(CircuitName(name))
		named := NewNamedMetrics(name, mtrx)

		client, err := NewRedis(logger, cfg, circuit, named, p.tracer)
		if err != nil {
			return nil, err
		}

		p.pools[name] = client
	}

	return p, nil
}
// GetPool returns the Redis client registered under name, or ErrPoolNotFound
// when there is none.
func (p *Pool) GetPool(name string) (*Redis, error) {
	p.mu.RLock()
	client, found := p.pools[name]
	p.mu.RUnlock()

	if !found {
		return nil, ErrPoolNotFound
	}

	return client, nil
}
// Stop closes every Redis client in the pool.
//
// All clients are closed even when some fail. Returning on the first failure
// left the remaining clients open. Each failure is logged with the pool name,
// and the failures are returned joined into one error; the result is nil when
// every client closed cleanly. The context is unused.
func (p *Pool) Stop(_ context.Context) error {
	p.mu.RLock()
	defer p.mu.RUnlock()

	var errs []error

	for name, pool := range p.pools {
		if err := pool.Close(); err != nil {
			p.logger.Error("stop redis", zap.String("name", name), zap.Error(err))
			errs = append(errs, fmt.Errorf("%s: %w", name, err))
		}
	}

	return errors.Join(errs...)
}
// HealthComponent returns a local health component named "redis_pool". Its
// check pings every client and fails with the name of the first client that
// does not respond.
func (p *Pool) HealthComponent() *health.Component {
	check := func(ctx context.Context) error {
		p.mu.RLock()
		defer p.mu.RUnlock()

		for name, conn := range p.pools {
			if err := conn.Ping(ctx); err != nil {
				return fmt.Errorf("%s: %w", name, err)
			}
		}

		return nil
	}

	return &health.Component{
		Kind:      health.ComponentKindLocal,
		Name:      "redis_pool",
		CheckFunc: check,
	}
}
func CircuitName(name string) string {
return fmt.Sprintf("%s%s", CircuitNamePrefix, name)
}
package redispool
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.ads.coffee/platform/pkg/circuitbreaker"
"go.ads.coffee/platform/pkg/health"
)
const (
// writeStatsPeriod is the interval at which reportStats publishes the
// client's pool statistics.
writeStatsPeriod = 10 * time.Second
)
// CallCallback is the unit of work passed to Redis.Call. It receives the
// traced context, the cluster client, and a KeyFormatter for building keys.
type CallCallback func(
ctx context.Context,
clu *redis.ClusterClient,
keyFormatter KeyFormatter,
) error
// Redis wraps a go-redis cluster client with metrics, tracing and a circuit
// breaker. It implements KeyFormatter through FormatKey.
type Redis struct {
cfg *Config
logger *zap.Logger
// rd is the underlying cluster client.
rd *redis.ClusterClient
metrics MetricsProvider
// circuit guards every Call.
circuit circuitbreaker.Circuit
tracer Tracer
}
// NewRedis creates a cluster client from cfg, installs the metrics hook and
// starts a background goroutine that periodically publishes pool statistics.
//
// The client is always created with ReadOnly enabled. As written, the
// returned error is always nil.
// NOTE(review): the statistics goroutine has no stop signal and keeps running
// after Close — confirm whether that is acceptable.
func NewRedis(
	logger *zap.Logger,
	cfg *Config,
	circuit circuitbreaker.Circuit,
	metrics MetricsProvider,
	tracer Tracer,
) (*Redis, error) {
	options := &redis.ClusterOptions{
		// Endpoints and credentials.
		Addrs:    cfg.ClusterAddrs,
		Username: cfg.Username,
		Password: cfg.Password,

		// Retries and redirects.
		MaxRetries:      cfg.MaxRetries,
		MinRetryBackoff: cfg.MinRetryBackoff,
		MaxRetryBackoff: cfg.MaxRetryBackoff,
		MaxRedirects:    cfg.MaxRedirects,

		// Timeouts.
		DialTimeout:  cfg.DialTimeout,
		ReadTimeout:  cfg.ReadTimeout,
		WriteTimeout: cfg.WriteTimeout,

		// Connection pool.
		PoolSize:        cfg.PoolSize,
		PoolTimeout:     cfg.PoolTimeout,
		MinIdleConns:    cfg.MinIdleConns,
		MaxIdleConns:    cfg.MaxIdleConns,
		MaxActiveConns:  cfg.MaxActiveConns,
		ConnMaxIdleTime: cfg.ConnMaxIdleTime,
		ConnMaxLifetime: cfg.ConnMaxLifetime,

		// Routing.
		ReadOnly:       true,
		RouteRandomly:  cfg.RouteRandomly,
		RouteByLatency: cfg.RouteByLatency,
	}

	client := redis.NewClusterClient(options)
	client.AddHook(newMetricsHook(metrics)) // metrics hook

	r := &Redis{
		cfg:     cfg,
		logger:  logger,
		rd:      client,
		metrics: metrics,
		circuit: circuit,
		tracer:  tracer,
	}

	go r.reportStats()

	return r, nil
}
// FormatKey returns key prefixed with "<KeyPrefix>:" when a key prefix is
// configured, and key unchanged otherwise.
func (r *Redis) FormatKey(key string) string {
	if prefix := r.cfg.KeyPrefix; prefix != "" {
		return fmt.Sprintf("%s:%s", prefix, key)
	}

	return key
}
// Call runs callback against the cluster client inside the circuit breaker.
// name identifies the query for tracing and metrics. When the client is
// disabled in configuration, callback is not invoked and nil is returned.
func (r *Redis) Call(
	ctx context.Context, name string, callback CallCallback,
) (err error) {
	if !r.cfg.Enabled {
		return nil
	}

	guarded := func(ctx context.Context) error {
		return r.call(ctx, name, callback)
	}

	return r.circuit.Run(ctx, guarded)
}
// call executes callback inside a "call_<name>" span and records its
// duration and outcome. A callback error is set on the span and returned
// unchanged.
func (r *Redis) call(
	ctx context.Context, name string, callback CallCallback,
) (err error) {
	queryAttr := attribute.String("redis.query.name", name)

	ctx, span := r.tracer.StartSpan(ctx, fmt.Sprintf("call_%s", name), trace.WithAttributes(queryAttr))
	defer span.End()

	started := time.Now()

	// err is the named result, so the deferred call sees the callback's error.
	defer func() {
		r.metrics.Duration(name, err, time.Since(started).Seconds())

		if err == nil {
			return
		}

		span.SetStatus(codes.Error, err.Error())
		span.RecordError(err)
	}()

	return callback(ctx, r.rd, r)
}
// Close closes the underlying cluster client.
func (r *Redis) Close() error {
	err := r.rd.Close()

	return err
}

// Ping sends PING through the cluster client and returns its error.
func (r *Redis) Ping(ctx context.Context) error {
	status := r.rd.Ping(ctx)

	return status.Err()
}
// HealthComponent returns a local health component named "redis" whose check
// pings the cluster.
func (r *Redis) HealthComponent() *health.Component {
	check := func(ctx context.Context) error {
		return r.Ping(ctx)
	}

	component := &health.Component{
		Kind:      health.ComponentKindLocal,
		Name:      "redis",
		CheckFunc: check,
	}

	return component
}
// reportStats publishes the client's pool statistics every writeStatsPeriod.
// It never returns and is meant to run in its own goroutine.
// NOTE(review): the ticker is never stopped and there is no exit condition,
// so the goroutine outlives Close — confirm whether a stop signal is needed.
func (r *Redis) reportStats() {
	ticker := time.NewTicker(writeStatsPeriod)

	for {
		<-ticker.C

		stats := r.rd.PoolStats()

		// Cumulative call counters.
		r.metrics.ConnectionsCall("hits", float64(stats.Hits))
		r.metrics.ConnectionsCall("misses", float64(stats.Misses))
		r.metrics.ConnectionsCall("timeouts", float64(stats.Timeouts))

		// Current connection counts.
		r.metrics.Connections("idle", float64(stats.IdleConns))
		r.metrics.Connections("stale", float64(stats.StaleConns))
		r.metrics.Connections("total", float64(stats.TotalConns))
	}
}
package telemetry
var (
// DefaultObjectives maps quantiles to their allowed absolute error.
// NOTE(review): presumably intended for Prometheus summaries; no use is
// visible here — confirm.
DefaultObjectives = map[float64]float64{
0.5: 0.01,
0.95: 0.001,
0.99: 0.001,
0.999: 0.0001,
1.0: 0,
}
// DefaultHistogramBuckets are the upper bounds used by the histograms that
// reference it; those histograms observe durations in seconds, so the
// buckets range from 1 ms to 5 min.
DefaultHistogramBuckets = []float64{
0.001,
0.01,
0.1,
0.2,
0.3,
0.4,
0.45,
0.5,
0.55,
0.6,
0.65,
0.7,
0.75,
0.8,
0.85,
0.9,
1.0,
1.5,
2.0,
3.0,
5.0,
10.0,
30.0,
60.0,
120.0,
300.0,
}
)
// ErrLabel is the name of the metric label that tells whether an operation
// failed; its values are produced by ErrLabelValue.
const ErrLabel = "error"
// ErrLabelValue maps an error to the value of the error label:
// "false" for a nil error and "true" for any other error.
func ErrLabelValue(err error) string {
	if err == nil {
		return "false"
	}

	return "true"
}
package telemetry
import (
"go.uber.org/fx"
"go.uber.org/zap"
)
// Module is the fx module of the telemetry package. It provides *Telemetry,
// names the injected logger "telemetry", and stops the telemetry on
// application shutdown.
var Module = fx.Module(
"telemetry",
fx.Provide(
New,
),
// Scope the logger seen inside this module.
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("telemetry")
}),
// Hook Telemetry.Stop into the lifecycle.
fx.Invoke(func(lc fx.Lifecycle, tel *Telemetry) {
lc.Append(fx.Hook{
OnStop: tel.Stop,
})
}),
)
package telemetry
import (
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Handler returns the HTTP handler that exposes Prometheus metrics.
// NOTE(review): promhttp.Handler serves the default gatherer, not t.registry;
// metrics registered with a non-default registerer would not appear — confirm.
func (t *Telemetry) Handler() http.HandlerFunc {
	handler := promhttp.Handler()

	return handler.ServeHTTP
}
package telemetry
import (
"context"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"go.uber.org/zap"
otelBridge "go.opentelemetry.io/otel/bridge/opentracing"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)
// Telemetry bundles the Prometheus registerer and the tracing providers used
// across the service.
type Telemetry struct {
cfg Config
logger *zap.Logger
// registry is where Register adds collectors.
registry prometheus.Registerer
// tracer is a no-op provider unless Jaeger tracing is enabled in config.
tracer trace.TracerProvider
// bridgeTracer is the OpenTracing bridge returned by OpenTracer.
bridgeTracer *otelBridge.BridgeTracer
// tracerStopFunc shuts the tracer down; it is nil when tracing is disabled.
tracerStopFunc func(ctx context.Context) error
}
// New creates the telemetry object. Tracing defaults to a no-op provider and
// a fresh bridge tracer; when Jaeger is enabled in config, a real tracer
// provider is set up instead and an error is returned if that fails.
func New(
	logger *zap.Logger,
	config Config,
	pr prometheus.Registerer,
) (*Telemetry, error) {
	t := &Telemetry{
		cfg:          config,
		logger:       logger,
		registry:     pr,
		tracer:       noop.NewTracerProvider(), // default empty tracer
		bridgeTracer: otelBridge.NewBridgeTracer(),
	}

	if !config.Jaeger.Enabled {
		return t, nil
	}

	provider, bridge, stop, err := tracerProvider(config)
	if err != nil {
		return nil, fmt.Errorf("tracer provider: %w", err)
	}

	t.tracer = provider
	t.bridgeTracer = bridge
	t.tracerStopFunc = stop

	return t, nil
}
// Registry returns the Prometheus registerer that collectors are added to.
func (t *Telemetry) Registry() prometheus.Registerer {
	registerer := t.registry

	return registerer
}
// Register adds the collectors to the registry one by one, in order. It stops
// at the first failure and returns it wrapped; collectors registered before
// the failure stay registered.
func (t *Telemetry) Register(collectors ...prometheus.Collector) error {
	for i := range collectors {
		err := t.registry.Register(collectors[i])
		if err != nil {
			return fmt.Errorf("failed to register collector: %w", err)
		}
	}

	return nil
}
// Stop shuts the tracer down when one was started. It is a no-op when tracing
// is disabled.
func (t *Telemetry) Stop(ctx context.Context) error {
	stop := t.tracerStopFunc
	if stop == nil {
		return nil
	}

	if err := stop(ctx); err != nil {
		return fmt.Errorf("stop func: %w", err)
	}

	return nil
}
// tracerProvider sets up OTLP/gRPC tracing towards config.Jaeger.Endpoint.
//
// It dials the collector (blocking, at most 5 seconds, insecure transport),
// creates a batching trace exporter and an SDK tracer provider sampled at
// SamplingRatio/100, and installs the OpenTracing bridge provider and the W3C
// trace-context propagator as OpenTelemetry globals.
//
// It returns the bridge-wrapped provider, the bridge tracer and a shutdown
// function. The shutdown function flushes and stops the SDK provider and then
// closes the gRPC connection, which the exporter does not own. The connection
// is also closed when exporter creation fails, so no error path leaks it.
func tracerProvider(config Config) (
	trace.TracerProvider,
	*otelBridge.BridgeTracer,
	func(ctx context.Context) error,
	error,
) {
	ctx := context.Background()

	res, err := resource.New(ctx,
		resource.WithAttributes(
			// the service name used to display traces in backends
			semconv.ServiceName(config.ServiceName),
			semconv.ServiceVersion(config.Version),
			semconv.ServiceInstanceID(config.Hostname),
			semconv.HostName(config.Hostname),
		),
	)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("create resource: %w", err)
	}

	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// nolint:staticcheck // TODO change later
	conn, err := grpc.DialContext(ctx, config.Jaeger.Endpoint,
		// Note the use of insecure transport here. TLS is recommended in production.
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(), // nolint:staticcheck // TODO change later
	)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("create gRPC connection to collector: %w", err)
	}

	traceExporter, err := otlptracegrpc.New(ctx,
		otlptracegrpc.WithGRPCConn(conn),
	)
	if err != nil {
		// Best effort: the exporter error is the one worth reporting.
		_ = conn.Close()

		return nil, nil, nil, fmt.Errorf("create trace exporter: %w", err)
	}

	bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
	provider := sdktrace.NewTracerProvider(
		sdktrace.WithSampler(sdktrace.TraceIDRatioBased(config.Jaeger.SamplingRatio/100.0)),
		sdktrace.WithResource(res),
		sdktrace.WithSpanProcessor(bsp),
	)

	bridgeTracer, wrapperTracerProvider := otelBridge.NewTracerPair(provider.Tracer(""))

	// set the Tracer Provider and the W3C Trace Context propagator as globals
	otel.SetTracerProvider(wrapperTracerProvider)
	// set global propagator to tracecontext (the default is no-op).
	otel.SetTextMapPropagator(propagation.TraceContext{})

	shutdown := func(ctx context.Context) error {
		// Flush pending spans first, then release the connection.
		err := provider.Shutdown(ctx)

		if closeErr := conn.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("close gRPC connection: %w", closeErr)
		}

		return err
	}

	return wrapperTracerProvider, bridgeTracer, shutdown, nil
}
// StartSpan starts a span named spanName on the tracer named after the
// configured service and returns the derived context with the span.
func (t *Telemetry) StartSpan(
	ctx context.Context,
	spanName string,
	opts ...trace.SpanStartOption,
) (context.Context, trace.Span) {
	tracer := t.tracer.Tracer(t.cfg.ServiceName)

	return tracer.Start(ctx, spanName, opts...)
}
// TracerProvider returns the active OpenTelemetry tracer provider.
func (t *Telemetry) TracerProvider() trace.TracerProvider {
	provider := t.tracer

	return provider
}

// OpenTracer returns the OpenTracing bridge tracer.
func (t *Telemetry) OpenTracer() *otelBridge.BridgeTracer {
	bridge := t.bridgeTracer

	return bridge
}
package main
import (
"context"
"os"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli/v3"
"go.uber.org/fx"
"go.ads.coffee/platform/pkg/circuitbreaker"
"go.ads.coffee/platform/pkg/database"
"go.ads.coffee/platform/pkg/health"
"go.ads.coffee/platform/pkg/kafkapool"
"go.ads.coffee/platform/pkg/logger"
"go.ads.coffee/platform/pkg/redispool"
"go.ads.coffee/platform/pkg/telemetry"
"go.ads.coffee/platform/server/internal/analytics"
"go.ads.coffee/platform/server/internal/config"
"go.ads.coffee/platform/server/internal/repos/banners"
"go.ads.coffee/platform/server/internal/repos/placements"
"go.ads.coffee/platform/server/internal/repos/units"
"go.ads.coffee/platform/server/internal/server"
"go.ads.coffee/platform/server/internal/sessions"
"go.ads.coffee/platform/server/plugins"
)
// main defines the "kodikapusta" CLI with a global --config/-c flag and a
// single "serve" command, then runs it with the process arguments. It panics
// when the CLI returns an error.
func main() {
	// serve assembles the fx application and blocks in Run until the
	// application is asked to stop; fx performs signal handling and shutdown.
	serve := &cli.Command{
		Name:    "serve",
		Aliases: []string{"s"},
		Action: func(ctx context.Context, cmd *cli.Command) error {
			provideRegisterer := func() prometheus.Registerer {
				// default prometheus
				return prometheus.DefaultRegisterer
			}

			provideConfig := func() (config.Config, error) {
				path := cmd.String("config")
				if path == "" {
					path = "server/configs/config.yaml"
				}

				return config.New(path)
			}

			app := fx.New(
				fx.Provide(provideRegisterer),
				fx.Provide(provideConfig),
				logger.Module,
				server.Module,
				database.Module,
				sessions.Module,
				analytics.Module,
				telemetry.Module,
				health.Module,
				circuitbreaker.Module,
				redispool.Module,
				kafkapool.Module,
				plugins.Module,
				// repos
				banners.Module,
				placements.Module,
				units.Module,
				fx.Invoke(
					start,
					caches,
				),
			)
			app.Run()

			return nil
		},
	}

	root := &cli.Command{
		Name: "kodikapusta",
		Flags: []cli.Flag{
			&cli.StringFlag{Name: "config", Aliases: []string{"c"}},
		},
		Commands: []*cli.Command{serve},
	}

	if err := root.Run(context.Background(), os.Args); err != nil {
		panic(err)
	}
}
// start ties the HTTP server to the fx lifecycle: it is started on
// application start and shut down on application stop.
func start(lc fx.Lifecycle, server *server.Server) {
	onStart := func(ctx context.Context) error {
		return server.Start(ctx)
	}
	onStop := func(ctx context.Context) error {
		return server.Shutdown(ctx)
	}

	lc.Append(fx.Hook{OnStart: onStart, OnStop: onStop})
}
// caches launches the three repository caches, each in its own goroutine
// with a background context.
// NOTE(review): the goroutines are not tied to the fx lifecycle and get no
// cancellation on shutdown — confirm this is intended.
func caches(
	banners *banners.Cache,
	placements *placements.Cache,
	units *units.Cache,
) {
	ctx := context.Background()

	go banners.Start(ctx)
	go placements.Start(ctx)
	go units.Start(ctx)
}
package analytics
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
"go.ads.coffee/platform/pkg/kafkapool"
"go.ads.coffee/platform/pkg/telemetry"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Analytics publishes ad events to Kafka and counts them in Prometheus.
type Analytics struct {
tel *telemetry.Telemetry
// producer is the Kafka producer built from the "main" pool in New.
producer *kafkapool.Producer
logger *zap.Logger
}
// New registers the analytics collectors with tel and creates a producer on
// the Kafka pool named "main". It fails when registration fails, the pool is
// missing, or the producer cannot be created.
func New(logger *zap.Logger, pool *kafkapool.Pool, tel *telemetry.Telemetry) (*Analytics, error) {
	err := tel.Register(actions, money)
	if err != nil {
		return nil, err
	}

	mainPool, err := pool.GetPool("main")
	if err != nil {
		return nil, fmt.Errorf("kafka pool error: %w", err)
	}

	producer, err := kafkapool.NewProducer(mainPool)
	if err != nil {
		return nil, fmt.Errorf("kafka producer error: %w", err)
	}

	a := &Analytics{
		tel:      tel,
		producer: producer,
		logger:   logger,
	}

	return a, nil
}
// Log counts the action, serializes event to JSON and sends it asynchronously
// to the Kafka topic called name. The send uses a context detached from ctx's
// cancellation. Only a serialization failure is returned; delivery errors are
// handled by the producer.
func (r *Analytics) Log(ctx context.Context, name string, event ads.Event) error {
	actions.WithLabelValues(name).Inc()

	payload, err := event.JSON()
	if err != nil {
		return err
	}

	// write to kafka
	detached := context.WithoutCancel(ctx)
	r.producer.SendAsync(detached, name, payload)

	r.logger.Info("write to analytics", zap.Any("event", event))

	return nil
}
// LogRequest logs a request event for the request described by state,
// stamped with the current Unix time. Device identifiers are left empty.
func (r *Analytics) LogRequest(ctx context.Context, state *plugins.State) error {
	event := ads.Event{
		RequestID: state.RequestID,
		Timestamp: time.Now().UTC().Unix(),
		Action:    ads.ActionRequest,
		GAID:      "",
		OAID:      "",
		// Country: rc.Request.Country(),
		// Region: rc.Request.Region(),
		// City: rc.Request.City(),
		// Network: rc.Network,
		// Make: rc.Request.Make(),
	}

	return r.Log(ctx, ads.ActionRequest, event)
}
// LogResponse logs a response event for banner w served for the request
// described by state, stamped with the current Unix time and carrying the
// banner's identifiers and price.
func (r *Analytics) LogResponse(ctx context.Context, w ads.Banner, state *plugins.State) error {
	event := ads.Event{
		RequestID:    state.RequestID,
		ClickID:      state.ClickID,
		Timestamp:    time.Now().Unix(),
		Action:       ads.ActionResponse,
		BannerID:     w.ID,
		GroupID:      w.GroupID,
		CampaignID:   w.CampaignID,
		AdvertiserID: w.AdvertiserID,
		GAID:         "",
		OAID:         "",
		// Country: rc.Request.Country(),
		// Region: rc.Request.Region(),
		// City: rc.Request.City(),
		// Network: rc.Network,
		Price: float64(w.Price),
	}

	return r.Log(ctx, ads.ActionResponse, event)
}
// LogWin logs the tracker data as an ads.ActionWin event.
func (r *Analytics) LogWin(ctx context.Context, data ads.TrackerInfo) error {
	event := ads.Event(data)
	event.Action = ads.ActionWin
	return r.Log(ctx, ads.ActionWin, event)
}
// LogConversion logs the tracker data as an ads.ActionConversion event.
func (r *Analytics) LogConversion(ctx context.Context, data ads.TrackerInfo) error {
	event := ads.Event(data)
	event.Action = ads.ActionConversion
	return r.Log(ctx, ads.ActionConversion, event)
}
// LogImpression adds the impression price to the money metric and then logs
// the tracker data as an ads.ActionImpression event.
func (r *Analytics) LogImpression(ctx context.Context, data ads.TrackerInfo) error {
	money.WithLabelValues(ads.ActionImpression).Add(data.Price)

	event := ads.Event(data)
	event.Action = ads.ActionImpression
	return r.Log(ctx, ads.ActionImpression, event)
}
// LogClick logs the tracker data as an ads.ActionClick event.
func (r *Analytics) LogClick(ctx context.Context, data ads.TrackerInfo) error {
	event := ads.Event(data)
	event.Action = ads.ActionClick
	return r.Log(ctx, ads.ActionClick, event)
}
package config
import (
"os"
"go.uber.org/config"
"go.uber.org/fx"
"go.ads.coffee/platform/pkg/circuitbreaker"
"go.ads.coffee/platform/pkg/database"
"go.ads.coffee/platform/pkg/health"
"go.ads.coffee/platform/pkg/kafkapool"
"go.ads.coffee/platform/pkg/redispool"
"go.ads.coffee/platform/pkg/telemetry"
"go.ads.coffee/platform/server/internal/pipeline"
"go.ads.coffee/platform/server/internal/server"
)
// Config is the root configuration of the server, populated from YAML by New.
//
// It embeds fx.Out; per the fx documentation this makes every field below an
// individually provided value when Config is returned from an fx constructor.
type Config struct {
fx.Out
// Pipelines lists the pipeline definitions (YAML key "pipelines").
Pipelines []pipeline.Config `yaml:"pipelines"`
// Server holds the HTTP server settings (YAML key "server").
Server server.Config `yaml:"server"`
// Health holds the health-check settings (YAML key "health").
Health health.Config `yaml:"health"`
// CircuitBreaker maps a name to its circuit breaker settings (YAML key "circuit-breaker").
CircuitBreaker map[string]*circuitbreaker.Config `yaml:"circuit-breaker"`
// RedisPool maps a pool name to its Redis settings (YAML key "redis-pool").
RedisPool map[string]*redispool.Config `yaml:"redis-pool"`
// Telemetry holds the telemetry settings (YAML key "telemetry").
Telemetry telemetry.Config `yaml:"telemetry"`
// Database holds the database settings (YAML key "database").
Database database.Config `yaml:"database"`
// Kafka maps a pool name to its Kafka settings (YAML key "kafka-pool").
Kafka map[string]*kafkapool.Config `yaml:"kafka-pool"`
}
// New loads the YAML configuration from file and decodes it into a Config.
// Environment variables referenced in the file are expanded through
// os.LookupEnv and the provider is created in permissive mode.
// On any failure a zero Config and the error are returned.
func New(file string) (Config, error) {
	provider, err := config.NewYAML(
		config.Expand(os.LookupEnv),
		config.File(file),
		config.Permissive(),
	)
	if err != nil {
		return Config{}, err
	}

	var loaded Config
	// The empty key addresses the root of the configuration tree.
	if err := provider.Get("").Populate(&loaded); err != nil {
		return Config{}, err
	}
	return loaded, nil
}
package ads
import (
"strconv"
"strings"
"time"
"github.com/qor5/admin/v3/media/media_library"
)
// Creative type identifiers.
// NOTE(review): presumably the values stored in Banner.Type — confirm against callers.
const (
CreativeTypeBanner = "banner"
CreativeTypeVideo = "video"
CreativeTypeNative = "native"
CreativeTypeMediator = "mediator"
)
// Banner is the flattened view of a banner used by the server: the banner's
// own attributes together with the budgets, cappings and activity windows of
// each level of its hierarchy (banner, group, campaign, advertiser).
type Banner struct {
ID uint
Title string
Price int
Active bool
Type string
Network string
// Targeting and Timetable hold a single value each for the banner.
Targeting Targeting
Timetable Timetable
// Budgets, one per hierarchy level.
BannerBudget Budget
GroupBudget Budget
CampaignBudget Budget
AdvertiserBudget Budget
// Frequency cappings, one per hierarchy level.
BannerCapping Capping
GroupCapping Capping
CampaignCapping Capping
AdvertiserCapping Capping
// Creative assets.
Image media_library.MediaBox
Icon media_library.MediaBox
Clicktracker string
Imptracker string
Target string
Label string
Description string
Bundle string
Erid string
// Identifiers of the parent entities.
GroupID string
CampaignID string
AdvertiserID string
// Start/end timestamps, one pair per hierarchy level.
BannerStart time.Time
BannerEnd time.Time
GroupStart time.Time
GroupEnd time.Time
CampaignStart time.Time
CampaignEnd time.Time
AdvertiserStart time.Time
AdvertiserEnd time.Time
Data string
}
// PriceFormated returns the banner price as a decimal string.
//
// Price is an int, so it is formatted directly; the previous round trip
// through float64 lost precision for values above 2^53.
func (b Banner) PriceFormated() string {
	return strconv.Itoa(b.Price)
}
// Media returns the URL of the banner image rendered in the given style, or
// an empty string when the banner has no image.
//
// A URL that already starts with an http:// or https:// scheme is returned
// unchanged; anything else gets "https:" prepended. The scheme is matched as
// a prefix so that a scheme-less URL which merely contains "http:" somewhere
// (for example in a query parameter) is still completed.
func (b Banner) Media(style string) string {
	if b.Image.Url == "" {
		return ""
	}

	u := b.Image.URL(style)
	if strings.HasPrefix(u, "http://") || strings.HasPrefix(u, "https://") {
		return u // TODO: replace to cdn
	}

	return "https:" + u // TODO: replace to cdn
}
package ads
import "encoding/json"
// Limit describes a daily and a total cap plus the "uniform" flag, as decoded
// from the JSON keys "daily", "total" and "uniform".
type Limit struct {
	Daily   int64 `json:"daily"`
	Total   int64 `json:"total"`
	Uniform bool  `json:"uniform"`
}

// Budget groups the limits for impressions, clicks, money and conversions.
type Budget struct {
	Impressions Limit `json:"impressions"`
	Clicks      Limit `json:"clicks"`
	Money       Limit `json:"money"`
	Conversions Limit `json:"conversions"`
}

// NewBudget decodes a Budget from its JSON representation.
// An empty string yields a zero Budget; invalid JSON yields a zero Budget and
// the decoding error.
func NewBudget(data string) (Budget, error) {
	var budget Budget
	if len(data) == 0 {
		return budget, nil
	}

	err := json.Unmarshal([]byte(data), &budget)
	if err != nil {
		return Budget{}, err
	}
	return budget, nil
}
package ads
import "encoding/json"
// CappingKeyTemplate is the format string of a capping key; its three
// placeholders are, in order: uid, item, id.
const CappingKeyTemplate = "%s:%s:%s"

// Capping is a frequency cap decoded from the JSON keys "count" and "period".
type Capping struct {
	Count  int64 `json:"count"`
	Period int64 `json:"period"`
}

// NewCapping decodes a Capping from its JSON representation.
// An empty string yields a zero Capping; invalid JSON yields a zero Capping
// and the decoding error.
func NewCapping(data string) (Capping, error) {
	var capping Capping
	if len(data) == 0 {
		return capping, nil
	}

	err := json.Unmarshal([]byte(data), &capping)
	if err != nil {
		return Capping{}, err
	}
	return capping, nil
}
// CappingInfo is a capping counter record with the JSON keys "last_seen" and "count".
// NOTE(review): LastSeen is presumably a Unix timestamp — confirm against the writer.
type CappingInfo struct {
LastSeen int64 `json:"last_seen"`
Count int64 `json:"count"`
}
package ads
import "encoding/json"
// Event is a single analytics record. Every field is always present in the
// JSON output (no omitempty), under the key given by its tag.
type Event struct {
	Timestamp    int64   `json:"timestamp"`
	Action       string  `json:"action"`
	RequestID    string  `json:"request_id"`
	ClickID      string  `json:"click_id"`
	BannerID     uint    `json:"banner_id"`
	GroupID      string  `json:"group_id"`
	CampaignID   string  `json:"campaign_id"`
	AdvertiserID string  `json:"advertiser_id"`
	GAID         string  `json:"gaid"`
	OAID         string  `json:"oaid"`
	Bundle       string  `json:"bundle"`
	City         string  `json:"city"`
	Country      string  `json:"country"`
	Region       string  `json:"region"`
	Price        float64 `json:"price"`
	Network      string  `json:"network"`
	Size         string  `json:"size"`
	Make         string  `json:"make"`
}

// JSON returns the event encoded as JSON, or nil and the encoding error.
func (e Event) JSON() ([]byte, error) {
	return json.Marshal(e)
}
package ads
import (
"encoding/json"
"strings"
)
// base is the protocol-relative storage host that Image.Full swaps for a CDN host.
const base = "//platform.hb.vkcloud-storage.ru"

// Image is an image reference decoded from the JSON key "url".
type Image struct {
	Url string `json:"url"`
}

// NewImage decodes an Image from its JSON representation.
// An empty string yields a zero Image; invalid JSON yields a zero Image and
// the decoding error.
func NewImage(data string) (Image, error) {
	var image Image
	if len(data) == 0 {
		return image, nil
	}

	err := json.Unmarshal([]byte(data), &image)
	if err != nil {
		return Image{}, err
	}
	return image, nil
}

// Full returns the URL to use for the image.
//
// Without a cdn the stored URL is returned, with "https:" prepended when it is
// protocol-relative (starts with "//"). With a cdn, every occurrence of base
// is replaced by cdn and then every occurrence of "file" by "file.image".
func (i Image) Full(cdn string) string {
	if cdn != "" {
		swapped := strings.ReplaceAll(i.Url, base, cdn)
		return strings.ReplaceAll(swapped, "file", "file.image")
	}

	if strings.HasPrefix(i.Url, "//") {
		return "https:" + i.Url
	}
	return i.Url
}
package ads
import (
"encoding/json"
"net"
"slices"
)
// Targeting groups the include/exclude rules of a banner, one rule set per
// dimension. All dimensions are string lists except IP, which holds IP networks.
type Targeting struct {
Bundle ExcludeInclude
Audience ExcludeInclude
Bapp ExcludeInclude
IP ExcludeIncludeIP
Country ExcludeInclude
City ExcludeInclude
Region ExcludeInclude
Network ExcludeInclude
}
// NewTargeting decodes a Targeting from its JSON representation.
// An empty string yields a zero Targeting; invalid JSON yields a zero
// Targeting and the decoding error.
func NewTargeting(data string) (Targeting, error) {
	var targeting Targeting
	if len(data) == 0 {
		return targeting, nil
	}

	err := json.Unmarshal([]byte(data), &targeting)
	if err != nil {
		return Targeting{}, err
	}
	return targeting, nil
}
// ExcludeInclude is a set of string targeting rules. The "Or" lists match
// when at least one of their values is present, the "And" lists when all of
// their values are present.
type ExcludeInclude struct {
	IncludeOr  []string
	ExcludeOr  []string
	IncludeAnd []string
	ExcludeAnd []string
}

// Validate reports whether values satisfy the rules. Empty lists impose no
// restriction; the checks run in the order ExcludeAnd, IncludeAnd, ExcludeOr,
// IncludeOr and the first violated rule rejects the values.
func (e ExcludeInclude) Validate(values []string) bool {
	switch {
	case len(e.ExcludeAnd) > 0 && arrayInArrayAnd(values, e.ExcludeAnd):
		// values contain every excluded item
		return false
	case len(e.IncludeAnd) > 0 && !arrayInArrayAnd(values, e.IncludeAnd):
		// values miss at least one of the required items
		return false
	case len(e.ExcludeOr) > 0 && arrayInArrayOr(values, e.ExcludeOr):
		// values contain at least one excluded item
		return false
	case len(e.IncludeOr) > 0 && !arrayInArrayOr(values, e.IncludeOr):
		// values contain none of the allowed items
		return false
	}
	return true
}
// ExcludeIncludeIP is a set of IP targeting rules expressed as networks.
type ExcludeIncludeIP struct {
	Include []*net.IPNet
	Exclude []*net.IPNet
}

// Validate reports whether ip passes the rules: it must not belong to any
// excluded network and, when Include is not empty, it must belong to one of
// the included networks.
func (e ExcludeIncludeIP) Validate(ip string) bool {
	excluded := len(e.Exclude) > 0 && ipInNetworks(e.Exclude, ip)
	if excluded {
		return false
	}
	if len(e.Include) == 0 {
		return true
	}
	return ipInNetworks(e.Include, ip)
}
// ipInNetworks reports whether val parses as an IP address that belongs to at
// least one of networks. An unparsable val never matches.
//
// Nil entries in networks are skipped: calling Contains on a nil *net.IPNet
// dereferences it and panics.
func ipInNetworks(networks []*net.IPNet, val string) bool {
	ip := net.ParseIP(val)
	if ip == nil {
		return false
	}

	for _, network := range networks {
		if network != nil && network.Contains(ip) {
			return true
		}
	}

	return false
}
// arrayInArrayOr reports whether at least one value of search is present in items.
func arrayInArrayOr[T comparable](items []T, search []T) bool {
	return slices.ContainsFunc(search, func(candidate T) bool {
		return slices.Contains(items, candidate)
	})
}
// arrayInArrayAnd reports whether every value of search is present in items.
// An empty search is trivially satisfied.
func arrayInArrayAnd[T comparable](items []T, search []T) bool {
	missing := slices.ContainsFunc(search, func(candidate T) bool {
		return !slices.Contains(items, candidate)
	})
	return !missing
}
package ads
import "encoding/json"
// Timetable maps a day (0-6) to a map of hour (0-23) to an "enabled" flag:
// 7 days by 24 hours.
type Timetable map[int]map[int]bool

// NewTimetable decodes a Timetable from its JSON representation.
// An empty string yields an empty Timetable; invalid JSON yields an empty
// Timetable and the decoding error.
func NewTimetable(data string) (Timetable, error) {
	parsed := Timetable{}
	if data != "" {
		if err := json.Unmarshal([]byte(data), &parsed); err != nil {
			return Timetable{}, err
		}
	}
	return parsed, nil
}

// Validate reports whether the given day and hour are enabled.
// An empty timetable allows everything. Otherwise day must be within 0-6,
// hour within 0-23, and the slot must be present and set to true.
func (t Timetable) Validate(day, hour int) bool {
	if len(t) == 0 {
		return true
	}

	inRange := day >= 0 && day <= 6 && hour >= 0 && hour <= 23
	// Indexing a missing day or hour yields false.
	return inRange && t[day][hour]
}

// Merge returns source when it has entries and t otherwise; the timetables
// are never combined slot by slot.
func (t Timetable) Merge(source Timetable) Timetable {
	if len(source) > 0 {
		return source
	}
	return t
}
package plugins
import (
"context"
"net/http"
"go.ads.coffee/platform/server/internal/domain/ads"
)
// State is the per-request data passed through a pipeline: the request and
// response, generated identifiers, and the ad data accumulated along the way.
type State struct {
RequestID string
ClickID string
// Request and Response are the underlying HTTP exchange.
Request *http.Request
Response http.ResponseWriter
User *User
Device *Device
// NOTE(review): Candidates/Winners look like the banners under consideration
// and the selected ones — confirm against the stages that fill them.
Candidates []ads.Banner
Winners []ads.Banner
Placement ads.Placement
Units []ads.Unit
}
// Value returns the value stored under key in the request context, or nil
// when nothing is stored. It is the read counterpart of WithValue.
//
// The lookup uses the key passed by the caller; previously the parameter was
// ignored and the hard-coded key "action" was always used.
func (s *State) Value(key any) any {
	return s.Request.Context().Value(key)
}
// WithValue stores value under key in the request context by replacing
// s.Request with a copy that carries the derived context.
func (s *State) WithValue(key, value any) {
	derived := context.WithValue(s.Request.Context(), key, value)
	s.Request = s.Request.WithContext(derived)
}
// User identifies the user a request belongs to.
type User struct {
ID string
}
// Device describes the requesting device.
// NOTE(review): UA and IP are presumably the user agent string and the client IP — confirm against the input plugins.
type Device struct {
UA string
IP string
}
// Placement couples a placement ID with its units.
// Note that State.Placement uses ads.Placement, not this type.
type Placement struct {
ID string
Units []ads.Unit
}
package inputs
import (
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Inputs is a registry of input plugins keyed by plugin name.
type Inputs struct {
	plugins map[string]plugins.Input
}

// New builds the registry from inputs. When two plugins report the same name
// the later one wins.
func New(inputs []plugins.Input) *Inputs {
	registry := make(map[string]plugins.Input, len(inputs))
	for idx := range inputs {
		registry[inputs[idx].Name()] = inputs[idx]
	}
	return &Inputs{plugins: registry}
}

// Get returns a copy of the input plugin registered under name, configured
// with cfg. An unknown name panics: the lookup yields a nil interface and
// Copy is then called on it.
func (i *Inputs) Get(name string, cfg map[string]any) plugins.Input {
	prototype := i.plugins[name]
	return prototype.Copy(cfg)
}
package outputs
import "go.ads.coffee/platform/server/internal/domain/plugins"
// Outputs is a registry of output plugins keyed by plugin name.
type Outputs struct {
	list map[string]plugins.Output
}

// New builds the registry from list. When two plugins report the same name
// the later one wins.
func New(list []plugins.Output) *Outputs {
	registry := make(map[string]plugins.Output, len(list))
	for idx := range list {
		registry[list[idx].Name()] = list[idx]
	}
	return &Outputs{list: registry}
}

// Get returns a copy of the output plugin registered under name, configured
// with cfg. An unknown name panics: the lookup yields a nil interface and
// Copy is then called on it.
func (i *Outputs) Get(name string, cfg map[string]any) plugins.Output {
	prototype := i.list[name]
	return prototype.Copy(cfg)
}
package pipeline
import (
"net/http"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"go.ads.coffee/platform/server/internal/domain/plugins"
"go.ads.coffee/platform/server/internal/inputs"
"go.ads.coffee/platform/server/internal/outputs"
"go.ads.coffee/platform/server/internal/stages"
"go.ads.coffee/platform/server/internal/targetings"
)
// Manager holds the configured pipelines and mounts them on an HTTP router.
type Manager struct {
pipelines []*Pipeline
}
// NewManager builds one Pipeline per entry of pipelines.
//
// For every pipeline config it resolves the configured targetings, then the
// configured stages. A stage that implements plugins.WithTargetings receives
// the resolved targetings and, like every other stage, is appended to the
// pipeline. Previously such stages were configured and then silently left
// out of the pipeline, so they never ran.
//
// Plugins are resolved through the registries' Get methods; an unknown plugin
// name makes the corresponding Get panic.
func NewManager(
	pipelines []Config,
	inputs *inputs.Inputs,
	outputs *outputs.Outputs,
	stages *stages.Stages,
	targetings *targetings.Targetings,
) *Manager {
	m := &Manager{}

	for _, c := range pipelines {
		tt := make([]plugins.Targeting, 0, len(c.Targetings))
		for _, t := range c.Targetings {
			tt = append(tt, targetings.Get(t.Name, t.Config))
		}

		ss := make([]plugins.Stage, 0, len(c.Stages))
		for _, s := range c.Stages {
			stage := stages.Get(s.Name, s.Config)
			if wt, ok := stage.(plugins.WithTargetings); ok {
				wt.Targetings(tt)
			}
			ss = append(ss, stage)
		}

		m.pipelines = append(m.pipelines, NewPipeline(
			c.Name,
			c.Route,
			inputs.Get(c.Input.Name, c.Input.Config),
			outputs.Get(c.Output.Name, c.Output.Config),
			ss,
		))
	}

	return m
}
// Mount registers every pipeline on router under the pipeline's route.
func (m *Manager) Mount(router *chi.Mux) {
	for _, p := range m.pipelines {
		router.Mount(p.Route(), pipelineHandler(p))
	}
}

// pipelineHandler returns the HTTP handler that runs p for each request.
//
// The pipeline is passed as an argument instead of being captured from the
// range loop: before Go 1.22 a closure over the loop variable would make
// every route execute the last pipeline.
//
// Each request gets a fresh State with newly generated request and click IDs.
// A pipeline error is answered with 404 Not Found.
func pipelineHandler(p *Pipeline) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		state := &plugins.State{
			RequestID: uuid.NewString(),
			ClickID:   uuid.NewString(),
			Request:   r,
			Response:  w,
		}

		if err := p.Do(r.Context(), state); err != nil {
			w.WriteHeader(http.StatusNotFound)
		}
	}
}
package pipeline
import (
"context"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Pipeline is a named request-processing chain served under a route:
// one input, an ordered list of stages and one output.
type Pipeline struct {
	name   string
	route  string
	input  plugins.Input
	output plugins.Output
	stages []plugins.Stage
}

// NewPipeline assembles a Pipeline from its parts.
func NewPipeline(
	name string,
	route string,
	input plugins.Input,
	output plugins.Output,
	stages []plugins.Stage,
) *Pipeline {
	p := new(Pipeline)
	p.name, p.route = name, route
	p.input, p.output = input, output
	p.stages = stages
	return p
}

// Name returns the pipeline name.
func (p *Pipeline) Name() string { return p.name }

// Route returns the route the pipeline is served under.
func (p *Pipeline) Route() string { return p.route }

// Do runs the pipeline for state.
//
// The input runs first; when it reports false the pipeline stops and nil is
// returned. Otherwise the stages run in order and the first stage error
// aborts the run. Finally the output runs and its result is returned.
func (p *Pipeline) Do(
	ctx context.Context,
	state *plugins.State,
) error {
	accepted := p.input.Do(ctx, state)
	if !accepted {
		return nil
	}

	for idx := range p.stages {
		if err := p.stages[idx].Do(ctx, state); err != nil {
			return err
		}
	}

	return p.output.Do(ctx, state)
}
package banners
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"go.ads.coffee/platform/server/internal/domain/ads"
)
// Banners is the source the cache loads banners from.
type Banners interface {
All(ctx context.Context) ([]ads.Banner, error)
}
// Cache keeps an in-memory copy of the banners returned by the repository,
// both as a list and indexed by banner ID. Access is guarded by lock.
type Cache struct {
logger *zap.Logger
repo Banners
// lock guards banners and bannersById.
lock sync.RWMutex
banners []ads.Banner
bannersById map[uint]ads.Banner
}
// NewCache returns an empty banner cache backed by repo. Nothing is loaded
// until Start is called.
func NewCache(logger *zap.Logger, repo *Repo) *Cache {
	cache := &Cache{bannersById: make(map[uint]ads.Banner)}
	cache.logger = logger
	cache.repo = repo
	return cache
}
// All returns a copy of the cached banner list, so callers may modify the
// returned slice without affecting the cache.
func (c *Cache) All(ctx context.Context) []ads.Banner {
	c.lock.RLock()
	snapshot := append(make([]ads.Banner, 0, len(c.banners)), c.banners...)
	c.lock.RUnlock()

	return snapshot
}
// One returns the cached banner with the given id and whether it was found.
func (c *Cache) One(ctx context.Context, id uint) (ads.Banner, bool) {
	c.lock.RLock()
	banner, found := c.bannersById[id]
	c.lock.RUnlock()

	return banner, found
}
// Start loads the banners once and then reloads them every minute until ctx
// is cancelled. It blocks, so it is meant to run in its own goroutine.
//
// The ticker is stopped on return, and cancellation is observed immediately
// rather than only on the next tick.
func (c *Cache) Start(ctx context.Context) {
	c.reload()

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.reload()
		}
	}
}
// reload fetches all banners from the repository and swaps them into the
// cache. On a repository error the failure is logged and the previous
// contents are kept.
func (c *Cache) reload() {
	loaded, err := c.repo.All(context.Background())
	if err != nil {
		c.logger.Error("error on get banners from repo", zap.Error(err))
		return
	}

	// Build the index outside the lock so readers are blocked only for the swap.
	index := make(map[uint]ads.Banner, len(loaded))
	for idx := range loaded {
		index[loaded[idx].ID] = loaded[idx]
	}

	c.lock.Lock()
	defer c.lock.Unlock()
	c.banners = loaded
	c.bannersById = index
}
package banners
import (
"context"
"encoding/json"
"net"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/server/internal/domain/ads"
)
// network is the value assigned to Banner.Network for every banner loaded by this repository.
const network = "coffee"
// Repo loads banners from the database.
type Repo struct {
logger *zap.Logger
db *gorm.DB
}
// NewRepo returns a banner repository that uses db and logs through a child
// of logger named "banners".
func NewRepo(logger *zap.Logger, db *gorm.DB) *Repo {
	named := logger.Named("banners")
	return &Repo{logger: named, db: db}
}
// All loads every servable banner together with the settings of its group,
// campaign and advertiser.
//
// A banner is selected only when it and all three parents are active, not
// deleted and not archived, and when none of their end dates has passed
// (an end date that is NULL or earlier than 2001-01-02 does not exclude it).
// The banner price falls back to the group price when it is NULL or not positive.
//
// The query runs with ctx, so cancelling ctx aborts it. Rows that cannot be
// converted by toModel are logged and skipped rather than failing the call.
//
// NOTE(review): the group dates are aliased bgroups_start/bgroups_end while
// the other group columns use the bgroup_ prefix — confirm this matches Row.
func (b *Repo) All(ctx context.Context) ([]ads.Banner, error) {
	rows := []Row{}

	err := b.db.WithContext(ctx).Model(Row{}).Raw(`select
banners.id as id,
banners.title as title,
CASE
WHEN banners.price IS NOT NULL AND banners.price > 0 THEN banners.price
ELSE bgroups.price
END AS price,
banners.active as active,
banners.targeting as banner_targeting,
bgroups.targeting as bgroup_targeting,
campaigns.targeting as campaign_targeting,
advertisers.targeting as advertiser_targeting,
banners.timetable as banner_timetable,
bgroups.timetable as bgroup_timetable,
campaigns.timetable as campaign_timetable,
advertisers.timetable as advertiser_timetable,
banners.budget as banner_budget,
bgroups.budget as bgroup_budget,
campaigns.budget as campaign_budget,
advertisers.budget as advertiser_budget,
banners.capping as banner_capping,
bgroups.capping as bgroup_capping,
campaigns.capping as campaign_capping,
advertisers.capping as advertiser_capping,
banners.image as image,
banners.icon as icon,
banners.clicktracker as clicktracker,
banners.imptracker as imptracker,
banners.target as target,
banners.label as label,
banners.description as description,
campaigns.bundle as bundle,
banners.erid as erid,
bgroups.id as bgroup_id,
campaigns.id as campaign_id,
advertisers.id as advertiser_id,
banners.start as banner_start,
banners.end as banner_end,
bgroups.start as bgroups_start,
bgroups.end as bgroups_end,
campaigns.start as campaign_start,
campaigns.end as campaign_end,
advertisers.start as advertiser_start,
advertisers.end as advertiser_end
from banners
join bgroups ON (banners.bgroup_id = bgroups.id)
join campaigns ON (bgroups.campaign_id = campaigns.id)
join advertisers ON (campaigns.advertiser_id = advertisers.id)
where
banners.active = true
and banners.deleted_at is NULL
and banners.archived_at is NULL
and bgroups.active = true
and bgroups.deleted_at is NULL
and bgroups.archived_at is NULL
and campaigns.active = true
and campaigns.deleted_at is NULL
and campaigns.archived_at is NULL
and advertisers.active = true
and advertisers.deleted_at is NULL
and advertisers.archived_at is NULL
and (banners."end" is null or banners."end" > NOW() or banners."end" < '2001-01-02 00:00:00')
and (campaigns."end" is null or campaigns."end" > NOW() or campaigns."end" < '2001-01-02 00:00:00')
and (bgroups."end" is null or bgroups."end" > NOW() or bgroups."end" < '2001-01-02 00:00:00')
and (advertisers."end" is null or advertisers."end" > NOW() or advertisers."end" < '2001-01-02 00:00:00')`).Find(&rows).Error
	if err != nil {
		return nil, err
	}

	banners := make([]ads.Banner, 0, len(rows))
	for _, row := range rows {
		banner, err := toModel(row)
		if err != nil {
			// One malformed row must not hide every other banner.
			b.logger.Warn("error on convert row to model", zap.Error(err), zap.Uint("id", row.ID))
			continue
		}
		banners = append(banners, banner)
	}

	return banners, nil
}
// toModel converts a database row into an ads.Banner.
//
// Scalar columns are copied as is and Network is set to the package constant.
// Targetings and timetables of the four hierarchy levels are parsed and merged
// in the order advertiser, campaign, group, banner. Budgets and cappings are
// parsed per level and kept separate.
//
// The first parsing error aborts the conversion and is returned with a zero Banner.
func toModel(row Row) (ads.Banner, error) {
banner := ads.Banner{
ID: row.ID,
Title: row.Title,
Price: row.Price,
Active: row.Active,
Clicktracker: row.Clicktracker,
Imptracker: row.Imptracker,
Target: row.Target,
Label: row.Label,
Description: row.Description,
Bundle: row.Bundle,
Erid: row.Erid,
GroupID: row.GroupID,
CampaignID: row.CampaignID,
AdvertiserID: row.AdvertiserID,
BannerStart: row.BannerStart,
BannerEnd: row.BannerEnd,
GroupStart: row.GroupStart,
GroupEnd: row.GroupEnd,
CampaignStart: row.CampaignStart,
CampaignEnd: row.CampaignEnd,
AdvertiserStart: row.AdvertiserStart,
AdvertiserEnd: row.AdvertiserEnd,
Image: row.Image,
Icon: row.Icon,
Network: network,
}
var err error
// targetings: parse every level, then merge from advertiser down to banner
atg, err := newTargeting(row.AdvertiserTargeting)
if err != nil {
return ads.Banner{}, err
}
ctg, err := newTargeting(row.CampaignTargeting)
if err != nil {
return ads.Banner{}, err
}
gtg, err := newTargeting(row.GroupTargeting)
if err != nil {
return ads.Banner{}, err
}
btg, err := newTargeting(row.BannerTargeting)
if err != nil {
return ads.Banner{}, err
}
banner.Targeting = atg.merge(ctg).merge(gtg).merge(btg).toDomain()
// timetables: same order; Merge keeps the argument whenever it is not empty
att, err := ads.NewTimetable(row.AdvertiserTimetable)
if err != nil {
return ads.Banner{}, err
}
ctt, err := ads.NewTimetable(row.CampaignTimetable)
if err != nil {
return ads.Banner{}, err
}
gtt, err := ads.NewTimetable(row.GroupTimetable)
if err != nil {
return ads.Banner{}, err
}
btt, err := ads.NewTimetable(row.BannerTimetable)
if err != nil {
return ads.Banner{}, err
}
banner.Timetable = att.Merge(ctt).Merge(gtt).Merge(btt)
// budget: one per level, not merged
if banner.BannerBudget, err = ads.NewBudget(row.BannerBudget); err != nil {
return ads.Banner{}, err
}
if banner.GroupBudget, err = ads.NewBudget(row.GroupBudget); err != nil {
return ads.Banner{}, err
}
if banner.CampaignBudget, err = ads.NewBudget(row.CampaignBudget); err != nil {
return ads.Banner{}, err
}
if banner.AdvertiserBudget, err = ads.NewBudget(row.AdvertiserBudget); err != nil {
return ads.Banner{}, err
}
// capping: one per level, not merged
if banner.BannerCapping, err = ads.NewCapping(row.BannerCapping); err != nil {
return ads.Banner{}, err
}
if banner.GroupCapping, err = ads.NewCapping(row.GroupCapping); err != nil {
return ads.Banner{}, err
}
if banner.CampaignCapping, err = ads.NewCapping(row.CampaignCapping); err != nil {
return ads.Banner{}, err
}
if banner.AdvertiserCapping, err = ads.NewCapping(row.AdvertiserCapping); err != nil {
return ads.Banner{}, err
}
return banner, nil
}
// excludeInclude is the JSON form of one targeting dimension.
//
// Include and Exclude are the plain lists read by toExcludeIncludeIP; the
// Or/And variants are the lists read by toExcludeIncludeString.
type excludeInclude struct {
	Include    []string `json:"include"`
	Exclude    []string `json:"exclude"`
	IncludeOr  []string `json:"include_or"`
	ExcludeOr  []string `json:"exclude_or"`
	IncludeAnd []string `json:"include_and"`
	ExcludeAnd []string `json:"exclude_and"`
}

// merge returns e overridden by source: every list that is not empty in
// source replaces the corresponding list of e, all other lists are kept.
//
// Include and Exclude are merged as well. They were previously skipped, so a
// more specific level could never override the IP lists of the level above it.
func (e excludeInclude) merge(source excludeInclude) excludeInclude {
	result := e

	if len(source.Include) > 0 {
		result.Include = source.Include
	}
	if len(source.Exclude) > 0 {
		result.Exclude = source.Exclude
	}
	if len(source.IncludeOr) > 0 {
		result.IncludeOr = source.IncludeOr
	}
	if len(source.ExcludeOr) > 0 {
		result.ExcludeOr = source.ExcludeOr
	}
	if len(source.IncludeAnd) > 0 {
		result.IncludeAnd = source.IncludeAnd
	}
	if len(source.ExcludeAnd) > 0 {
		result.ExcludeAnd = source.ExcludeAnd
	}

	return result
}
// toExcludeIncludeString converts the Or/And lists into the domain type.
// The plain Include and Exclude lists are not carried over.
func (e excludeInclude) toExcludeIncludeString() ads.ExcludeInclude {
	var rules ads.ExcludeInclude
	rules.IncludeOr = e.IncludeOr
	rules.ExcludeOr = e.ExcludeOr
	rules.IncludeAnd = e.IncludeAnd
	rules.ExcludeAnd = e.ExcludeAnd
	return rules
}
// toExcludeIncludeIP converts the plain Include and Exclude lists, which are
// expected to hold CIDR strings, into the domain IP rules.
//
// Entries that fail to parse are skipped. The previous error check was
// inverted: it dropped every valid network and appended a nil network for
// every invalid entry.
func (e excludeInclude) toExcludeIncludeIP() ads.ExcludeIncludeIP {
	return ads.ExcludeIncludeIP{
		Include: parseNetworks(e.Include),
		Exclude: parseNetworks(e.Exclude),
	}
}

// parseNetworks parses cidrs with net.ParseCIDR and returns the networks that
// parsed successfully, in input order. The result is never nil.
func parseNetworks(cidrs []string) []*net.IPNet {
	parsed := make([]*net.IPNet, 0, len(cidrs))
	for _, cidr := range cidrs {
		_, ipNet, err := net.ParseCIDR(cidr)
		if err != nil {
			// NOTE(review): invalid entries (including bare IPs without a
			// mask) are ignored silently; consider reporting them.
			continue
		}
		parsed = append(parsed, ipNet)
	}
	return parsed
}
// targeting is the JSON form of the targeting settings stored in the database,
// one excludeInclude per dimension. It is converted with toDomain.
type targeting struct {
Bundle excludeInclude `json:"bundle"`
Audience excludeInclude `json:"audience"`
Bapp excludeInclude `json:"bapp"`
IP excludeInclude `json:"ip"`
Country excludeInclude `json:"country"`
City excludeInclude `json:"city"`
Region excludeInclude `json:"region"`
Network excludeInclude `json:"network"`
}
// newTargeting decodes a targeting from its JSON representation.
// An empty string yields a zero targeting; invalid JSON yields a zero
// targeting and the decoding error.
func newTargeting(data string) (targeting, error) {
	var parsed targeting
	if len(data) == 0 {
		return parsed, nil
	}

	err := json.Unmarshal([]byte(data), &parsed)
	if err != nil {
		return targeting{}, err
	}
	return parsed, nil
}
// merge returns t with every dimension merged with the same dimension of
// source, using excludeInclude.merge.
func (t targeting) merge(source targeting) targeting {
	merged := t
	merged.Bundle = t.Bundle.merge(source.Bundle)
	merged.Audience = t.Audience.merge(source.Audience)
	merged.Bapp = t.Bapp.merge(source.Bapp)
	merged.IP = t.IP.merge(source.IP)
	merged.Country = t.Country.merge(source.Country)
	merged.City = t.City.merge(source.City)
	merged.Region = t.Region.merge(source.Region)
	merged.Network = t.Network.merge(source.Network)
	return merged
}
// toDomain converts the JSON targeting into the domain type. Every dimension
// is converted to string rules except IP, which is converted to IP networks.
//
// The IP rules are built from t.IP; they were previously built from t.Bapp,
// so the configured IP targeting was ignored.
func (t targeting) toDomain() ads.Targeting {
	return ads.Targeting{
		Bundle:   t.Bundle.toExcludeIncludeString(),
		Audience: t.Audience.toExcludeIncludeString(),
		Bapp:     t.Bapp.toExcludeIncludeString(),
		IP:       t.IP.toExcludeIncludeIP(),
		Country:  t.Country.toExcludeIncludeString(),
		City:     t.City.toExcludeIncludeString(),
		Region:   t.Region.toExcludeIncludeString(),
		Network:  t.Network.toExcludeIncludeString(),
	}
}
package placements
import (
"context"
"sync"
"time"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.uber.org/zap"
)
// Placements is the source the cache loads placements from.
type Placements interface {
All(ctx context.Context) ([]ads.Placement, error)
}
// Cache keeps an in-memory copy of the placements returned by the repository,
// both as a list and indexed by placement ID. Access is guarded by lock.
type Cache struct {
logger *zap.Logger
repo Placements
// lock guards placements and placementById.
lock sync.RWMutex
placements []ads.Placement
placementById map[uint]ads.Placement
}
// NewCache returns an empty placement cache backed by repo. Nothing is
// loaded until Start is called.
func NewCache(logger *zap.Logger, repo *Repo) *Cache {
	cache := &Cache{placementById: make(map[uint]ads.Placement)}
	cache.logger = logger
	cache.repo = repo
	return cache
}
// All returns a copy of the cached placement list, so callers may modify the
// returned slice without affecting the cache.
func (c *Cache) All(ctx context.Context) []ads.Placement {
	c.lock.RLock()
	snapshot := append(make([]ads.Placement, 0, len(c.placements)), c.placements...)
	c.lock.RUnlock()

	return snapshot
}
// One returns the cached placement with the given id and whether it was found.
func (c *Cache) One(ctx context.Context, id uint) (ads.Placement, bool) {
	c.lock.RLock()
	placement, found := c.placementById[id]
	c.lock.RUnlock()

	return placement, found
}
// Start loads the placements once and then reloads them every minute until
// ctx is cancelled. It blocks, so it is meant to run in its own goroutine.
//
// The ticker is stopped on return, and cancellation is observed immediately
// rather than only on the next tick.
func (c *Cache) Start(ctx context.Context) {
	c.reload()

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.reload()
		}
	}
}
// reload fetches all placements from the repository and swaps them into the
// cache. On a repository error the failure is logged and the previous
// contents are kept.
func (c *Cache) reload() {
	loaded, err := c.repo.All(context.Background())
	if err != nil {
		c.logger.Error("error on get placements from repo", zap.Error(err))
		return
	}

	// Build the index outside the lock so readers are blocked only for the swap.
	index := make(map[uint]ads.Placement, len(loaded))
	for idx := range loaded {
		index[loaded[idx].ID] = loaded[idx]
	}

	c.lock.Lock()
	defer c.lock.Unlock()
	c.placements = loaded
	c.placementById = index
}
package placements
import (
"context"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/server/internal/domain/ads"
)
// Repo loads placements from the database.
type Repo struct {
	logger *zap.Logger
	db     *gorm.DB
}

// NewRepo returns a placement repository that uses db and logger.
func NewRepo(logger *zap.Logger, db *gorm.DB) *Repo {
	return &Repo{logger: logger, db: db}
}
// All returns every placement that is active and not deleted.
//
// The query runs with ctx, so cancelling ctx aborts it; ctx was previously
// accepted but never passed to the database.
func (b *Repo) All(ctx context.Context) ([]ads.Placement, error) {
	rows := []ads.Placement{}

	err := b.db.WithContext(ctx).
		Model(ads.Placement{}).
		Where("deleted_at is null and active = true").
		Find(&rows).Error
	if err != nil {
		return nil, err
	}

	return rows, nil
}
package units
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"go.ads.coffee/platform/server/internal/domain/ads"
)
// Placements is the source the cache loads units from.
// Despite its name, the interface returns ads.Unit values.
type Placements interface {
All(ctx context.Context) ([]ads.Unit, error)
}
// Cache keeps an in-memory copy of the units returned by the repository,
// both as a list and grouped by placement ID. Access is guarded by lock.
type Cache struct {
logger *zap.Logger
repo Placements
// lock guards units and unitsByPlacement.
lock sync.RWMutex
units []ads.Unit
unitsByPlacement map[uint][]ads.Unit
}
// NewCache returns an empty unit cache backed by repo. Nothing is loaded
// until Start is called.
func NewCache(logger *zap.Logger, repo *Repo) *Cache {
	cache := &Cache{unitsByPlacement: make(map[uint][]ads.Unit)}
	cache.logger = logger
	cache.repo = repo
	return cache
}
// All returns a copy of the cached unit list, so callers may modify the
// returned slice without affecting the cache.
func (c *Cache) All(ctx context.Context) []ads.Unit {
	c.lock.RLock()
	snapshot := append(make([]ads.Unit, 0, len(c.units)), c.units...)
	c.lock.RUnlock()

	return snapshot
}
// FindByPlacement returns a copy of the units cached for the given placement
// ID and whether the placement has any entry. For an unknown placement it
// returns nil and false.
//
// The copy is what gets returned. Previously the copy was built and then
// discarded, and the cache's internal slice was handed out, which let callers
// modify cached data outside the lock.
func (c *Cache) FindByPlacement(ctx context.Context, placement uint) ([]ads.Unit, bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	cached, ok := c.unitsByPlacement[placement]
	if !ok {
		return nil, false
	}

	units := make([]ads.Unit, len(cached))
	copy(units, cached)

	return units, true
}
// Start loads the units once and then reloads them every minute until ctx is
// cancelled. It blocks, so it is meant to run in its own goroutine.
//
// The ticker is stopped on return, and cancellation is observed immediately
// rather than only on the next tick.
func (c *Cache) Start(ctx context.Context) {
	c.reload()

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.reload()
		}
	}
}
// reload fetches all units from the repository and swaps them into the cache,
// grouped by placement ID. On a repository error the failure is logged and
// the previous contents are kept.
//
// The log message names units; it previously said "placements", copied from
// the placements cache.
func (c *Cache) reload() {
	units, err := c.repo.All(context.Background())
	if err != nil {
		c.logger.Error("error on get units from repo", zap.Error(err))
		return
	}

	// Group outside the lock so readers are blocked only for the swap.
	byPlacement := make(map[uint][]ads.Unit)
	for _, unit := range units {
		byPlacement[unit.PlacementID] = append(byPlacement[unit.PlacementID], unit)
	}

	c.lock.Lock()
	defer c.lock.Unlock()
	c.units = units
	c.unitsByPlacement = byPlacement
}
package units
import (
"context"
"go.uber.org/zap"
"gorm.io/gorm"
"go.ads.coffee/platform/server/internal/domain/ads"
)
// Repo loads units from the database.
type Repo struct {
	logger *zap.Logger
	db     *gorm.DB
}

// NewRepo returns a unit repository that uses db and logger.
func NewRepo(logger *zap.Logger, db *gorm.DB) *Repo {
	return &Repo{logger: logger, db: db}
}
// All returns every unit that is active, not deleted and not archived, with
// its Network association preloaded.
//
// The query runs with ctx, so cancelling ctx aborts it; ctx was previously
// accepted but never passed to the database.
func (r *Repo) All(ctx context.Context) ([]ads.Unit, error) {
	rows := []ads.Unit{}

	err := r.db.WithContext(ctx).
		Model(ads.Unit{}).
		Preload("Network").
		Where("deleted_at is null and active = true and archived_at is NULL").
		Find(&rows).Error
	if err != nil {
		return nil, err
	}

	return rows, nil
}
package server
import (
"context"
"fmt"
"net"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/go-chi/cors"
"go.ads.coffee/platform/server/internal/pipeline"
"go.ads.coffee/platform/server/static"
)
// Manager mounts its routes on the given router.
type Manager interface {
Mount(router *chi.Mux)
}
// Server is the HTTP server of the platform: it owns the net/http server and
// the manager whose routes are mounted when the server starts.
type Server struct {
config Config
srv *http.Server
manager Manager
}
// New returns a Server that will listen on config.Port and serve the routes
// of manager. The listener is not opened until Start is called.
func New(config Config, manager *pipeline.Manager) *Server {
	httpServer := &http.Server{Addr: config.Port}
	return &Server{config: config, srv: httpServer, manager: manager}
}
// Start builds the router, opens the listener and starts serving in a
// background goroutine. It returns as soon as the listener is open; a listen
// error is returned to the caller.
//
// The router (CORS middleware, pipeline routes, embedded static files under
// /static/) is registered on http.DefaultServeMux under "/". The underlying
// http.Server has no Handler of its own, so it serves the default mux.
// Because of that global registration Start must be called only once:
// registering "/" a second time panics.
func (s *Server) Start(ctx context.Context) error {
	router := chi.NewRouter()

	router.Use(
		cors.Handler(cors.Options{
			// AllowedOrigins: []string{"https://foo.com"}, // Use this to allow specific origin hosts
			// AllowedOrigins:   []string{"https://*", "http://*"},
			AllowedOrigins: []string{"*"},
			// AllowOriginFunc:  func(r *http.Request, origin string) bool { return true },
			AllowedMethods:   []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
			AllowedHeaders:   []string{"Accept", "Authorization", "Content-Type", "X-CSRF-Token"},
			ExposedHeaders:   []string{"Link"},
			AllowCredentials: false,
			MaxAge:           300, // Maximum value not ignored by any of major browsers
		}),
	)

	s.manager.Mount(router)

	http.Handle("/", router)

	fs := http.FileServer(http.FS(static.FS))
	router.Handle("/static/*", http.StripPrefix("/static/", fs))

	ln, err := net.Listen("tcp", s.config.Port)
	if err != nil {
		return err
	}

	fmt.Println("Served at http://localhost" + s.config.Port)

	go func() {
		// Serve always returns a non-nil error. After Shutdown it is
		// http.ErrServerClosed, which is the normal way to stop and is
		// therefore not reported.
		if err := s.srv.Serve(ln); err != nil && err != http.ErrServerClosed {
			fmt.Println(err)
		}
	}()

	return nil
}
// Shutdown stops the underlying HTTP server via http.Server.Shutdown and
// returns its result.
func (s *Server) Shutdown(ctx context.Context) error {
	err := s.srv.Shutdown(ctx)
	return err
}
package sessions
import (
"crypto/sha256"
"encoding/hex"
"net"
"net/http"
"strings"
"sync"
"time"
)
// Session is a stored value together with the moment it expires.
type Session struct {
	Value  string
	expiry time.Time
}

// isExpired reports whether the expiry moment is already in the past.
// A zero Session counts as expired.
func (s Session) isExpired() bool {
	now := time.Now()
	return now.After(s.expiry)
}
// Sessions is an in-memory session store. The zero value is ready to use.
type Sessions struct {
	sessions sync.Map
}

// New returns an empty session store.
func New() *Sessions {
	return new(Sessions)
}
// LoadWithExpire returns the session stored for the request, unless it is
// missing or expired; in both cases a zero Session and false are returned.
//
// TODO: the key should include the slot.
func (s *Sessions) LoadWithExpire(r *http.Request) (Session, bool) {
	session, found := s.LoadWithoutExpire(r)
	if !session.isExpired() {
		return session, found
	}

	// A missing session is the zero value, which also counts as expired.
	return Session{}, false
}
// LoadWithoutExpire returns the session stored for the request regardless of
// its expiry, and whether one was found.
func (s *Sessions) LoadWithoutExpire(r *http.Request) (Session, bool) {
	raw, found := s.sessions.Load(s.identifier(r))
	if !found {
		return Session{}, false
	}

	session, isSession := raw.(Session)
	return session, isSession
}
// Start stores value as the session of the request, valid for ten minutes
// from now. An existing session for the same request is replaced.
// It always returns nil.
func (s *Sessions) Start(r *http.Request, value string) error {
	key := s.identifier(r)
	session := Session{
		Value:  value,
		expiry: time.Now().Add(10 * time.Minute),
	}
	s.sessions.Store(key, session)

	return nil
}
// identifier derives the session key of a request: the hex-encoded SHA-256 of
// the User-Agent header concatenated with the client IP (the first
// X-Forwarded-For entry when present, the remote address otherwise).
func (s *Sessions) identifier(r *http.Request) string {
	fingerprint := r.UserAgent() + forwarded(address(r), r)
	sum := sha256.Sum256([]byte(fingerprint))

	return hex.EncodeToString(sum[:])
}
// address returns the host part of r.RemoteAddr, or RemoteAddr unchanged when
// it cannot be split into host and port.
func address(r *http.Request) string {
	if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
		return host
	}
	return r.RemoteAddr
}
// forwarded returns the first entry of the X-Forwarded-For header with the
// surrounding whitespace removed, or ip when the header is absent or empty.
//
// NOTE(review): the header is set by the client unless a trusted proxy
// overwrites it, so the result can be spoofed.
func forwarded(ip string, r *http.Request) string {
	header := r.Header.Get("X-Forwarded-For")
	if header == "" {
		return ip
	}

	first, _, _ := strings.Cut(header, ",")
	return strings.TrimSpace(first)
}
package stages
import "go.ads.coffee/platform/server/internal/domain/plugins"
// Stages is a registry of stage plugins keyed by plugin name.
type Stages struct {
	list map[string]plugins.Stage
}

// New builds the registry from list. When two plugins report the same name
// the later one wins.
func New(list []plugins.Stage) *Stages {
	registry := make(map[string]plugins.Stage, len(list))
	for idx := range list {
		registry[list[idx].Name()] = list[idx]
	}
	return &Stages{list: registry}
}

// Get returns a copy of the stage plugin registered under name, configured
// with cfg. An unknown name panics: the lookup yields a nil interface and
// Copy is then called on it.
func (i *Stages) Get(name string, cfg map[string]any) plugins.Stage {
	prototype := i.list[name]
	return prototype.Copy(cfg)
}
package targetings
import "go.ads.coffee/platform/server/internal/domain/plugins"
// Targetings is a registry of targeting plugins keyed by plugin name.
type Targetings struct {
	list map[string]plugins.Targeting
}

// New builds the registry from list. When two plugins report the same name
// the later one wins.
func New(list []plugins.Targeting) *Targetings {
	registry := make(map[string]plugins.Targeting, len(list))
	for idx := range list {
		registry[list[idx].Name()] = list[idx]
	}
	return &Targetings{list: registry}
}

// Get returns a copy of the targeting plugin registered under name,
// configured with cfg. An unknown name panics: the lookup yields a nil
// interface and Copy is then called on it.
func (i *Targetings) Get(name string, cfg map[string]any) plugins.Targeting {
	prototype := i.list[name]
	return prototype.Copy(cfg)
}
//nolint:errcheck
package filesystem
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"path"
"regexp"
"strings"
"github.com/gabriel-vasile/mimetype"
"go.ads.coffee/platform/server/internal/tools/inflector"
"go.ads.coffee/platform/server/internal/tools/security"
)
// FileReader defines an interface for a file resource reader.
//
// Open returns a handle to the content that the caller is responsible for closing.
type FileReader interface {
Open() (io.ReadSeekCloser, error)
}
// File defines a single file [io.ReadSeekCloser] resource.
//
// The file could be from a local path, multipart/form-data header, etc.
//
// Reader is excluded from form, JSON and XML encoding by its "-" tags; Name,
// OriginalName and Size are exposed as "name", "originalName" and "size".
type File struct {
Reader FileReader `form:"-" json:"-" xml:"-"`
Name string `form:"name" json:"name" xml:"name"`
OriginalName string `form:"originalName" json:"originalName" xml:"originalName"`
Size int64 `form:"size" json:"size" xml:"size"`
}
// AsMap implements [core.mapExtractor] and returns the file metadata as a map
// with the keys "name", "originalName" and "size", suitable for use in an API
// rule expression.
func (f *File) AsMap() map[string]any {
	fields := make(map[string]any, 3)
	fields["name"] = f.Name
	fields["originalName"] = f.OriginalName
	fields["size"] = f.Size

	return fields
}
// NewFileFromPath creates a new File instance from the provided local file path.
//
// Size and original name come from os.Stat; a stat error is returned as is.
// The stored name is derived with normalizeName.
func NewFileFromPath(path string) (*File, error) {
	info, err := os.Stat(path)
	if err != nil {
		return nil, err
	}

	var reader FileReader = &PathReader{Path: path}
	original := info.Name()

	return &File{
		Reader:       reader,
		Size:         info.Size(),
		OriginalName: original,
		Name:         normalizeName(reader, original),
	}, nil
}
// NewFileFromBytes creates a new File instance from the provided byte slice.
//
// An empty slice is rejected with an error. name becomes the original name
// and the stored name is derived from it with normalizeName.
func NewFileFromBytes(b []byte, name string) (*File, error) {
	if len(b) == 0 {
		return nil, errors.New("cannot create an empty file")
	}

	var reader FileReader = &BytesReader{b}

	return &File{
		Reader:       reader,
		Size:         int64(len(b)),
		OriginalName: name,
		Name:         normalizeName(reader, name),
	}, nil
}
// NewFileFromMultipart creates a new File from the provided multipart header.
//
// Size and original name are taken from the header; the stored name is
// derived with normalizeName. The returned error is always nil.
func NewFileFromMultipart(mh *multipart.FileHeader) (*File, error) {
	var reader FileReader = &MultipartReader{Header: mh}

	return &File{
		Reader:       reader,
		Size:         mh.Size,
		OriginalName: mh.Filename,
		Name:         normalizeName(reader, mh.Filename),
	}, nil
}
// NewFileFromURL creates a new File from the provided url by
// downloading the resource and loading it as BytesReader.
//
// The request is a GET issued with http.DefaultClient and bound to ctx.
// Status codes outside 200-399 are treated as a failure. The whole body is
// read into memory, and the file name is the last path element of url.
//
// Example
//
//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//
//	file, err := filesystem.NewFileFromURL(ctx, "https://example.com/image.png")
func NewFileFromURL(ctx context.Context, url string) (*File, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code < 200 || code > 399 {
		return nil, fmt.Errorf("failed to download url %s (%d)", url, code)
	}

	var body bytes.Buffer
	if _, err := io.Copy(&body, res.Body); err != nil {
		return nil, err
	}

	return NewFileFromBytes(body.Bytes(), path.Base(url))
}
// -------------------------------------------------------------------
// compile-time check that *MultipartReader satisfies FileReader
var _ FileReader = (*MultipartReader)(nil)

// MultipartReader is a FileReader backed by a [multipart.FileHeader].
type MultipartReader struct {
	Header *multipart.FileHeader
}

// Open implements the [filesystem.FileReader] interface
// by opening the file associated with the multipart header.
func (r *MultipartReader) Open() (io.ReadSeekCloser, error) {
	file, err := r.Header.Open()

	return file, err
}
// -------------------------------------------------------------------
// Compile-time assertion that *PathReader implements FileReader.
var _ FileReader = (*PathReader)(nil)
// PathReader defines a FileReader from a local file path.
type PathReader struct {
	Path string
}

// Open implements the [filesystem.FileReader] interface.
//
// On failure it returns a nil reader together with the [os.Open] error.
func (r *PathReader) Open() (io.ReadSeekCloser, error) {
	f, err := os.Open(r.Path)
	if err != nil {
		// Return an untyped nil, otherwise the nil *os.File would be wrapped
		// in a non-nil interface value and "reader != nil" checks would lie.
		return nil, err
	}

	return f, nil
}
// -------------------------------------------------------------------
// compile-time check that *BytesReader satisfies FileReader
var _ FileReader = (*BytesReader)(nil)

// BytesReader is a FileReader backed by an in-memory bytes content.
type BytesReader struct {
	Bytes []byte
}

// Open implements the [filesystem.FileReader] interface
// by wrapping the bytes content in a fresh seekable reader.
func (r *BytesReader) Open() (io.ReadSeekCloser, error) {
	rsc := &bytesReadSeekCloser{Reader: bytes.NewReader(r.Bytes)}

	return rsc, nil
}

// bytesReadSeekCloser adds a no-op Close to [bytes.Reader].
type bytesReadSeekCloser struct {
	*bytes.Reader
}

// Close implements the [io.ReadSeekCloser] interface (it is a no-op).
func (*bytesReadSeekCloser) Close() error {
	return nil
}
// -------------------------------------------------------------------
// extInvalidCharsRegex matches runs of characters that are not allowed in a
// normalized file extension (anything other than word characters and . * - + = #).
var extInvalidCharsRegex = regexp.MustCompile(`[^\w\.\*\-\+\=\#]+`)
// randomAlphabet is the lowercase alphanumeric character set used for the
// random parts of the generated file names.
const randomAlphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
// normalizeName builds a sanitized file name in the format
// "<snake_cased_name>_<10 random chars><extension>".
//
// The extension is taken from name (stripped from invalid characters) and,
// when that yields nothing, it is detected from the content provided by fr.
func normalizeName(fr FileReader, name string) string {
	rawExt := extractExtension(name)

	ext := extInvalidCharsRegex.ReplaceAllString(rawExt, "")
	if ext == "" {
		// fallback to content based detection
		// (a detection failure is ignored and leaves the extension empty)
		ext, _ = detectExtension(fr)
	}
	if n := len(ext); n > 20 {
		// keep only the last 20 characters (it is multibyte safe after the regex replace)
		ext = "." + ext[n-20:]
	}

	base := inflector.Snakecase(strings.TrimSuffix(name, rawExt))
	switch n := len(base); {
	case n < 3:
		// too short - pad it with an extra random part
		base += security.RandomStringWithAlphabet(10, randomAlphabet)
	case n > 100:
		// keep only the first 100 characters (it is multibyte safe after Snakecase)
		base = base[:100]
	}

	// there is always a random part, no matter of the name length
	suffix := security.RandomStringWithAlphabet(10, randomAlphabet)

	return fmt.Sprintf("%s_%s%s", base, suffix, ext)
}
// extractExtension returns the extension of name, including its leading dot.
//
// Unlike filepath.Ext() it also recognizes double extensions (eg. ".tar.gz"),
// meaning that at most the last 2 dot separated segments are returned.
//
// An empty string is returned when name has no dot.
//
// Example:
//
//	extractExtension("test.txt")      // .txt
//	extractExtension("test.tar.gz")   // .tar.gz
//	extractExtension("test.a.tar.gz") // .tar.gz
func extractExtension(name string) string {
	last := strings.LastIndex(name, ".")
	if last < 0 {
		return ""
	}

	// prefer the secondary extension (if any)
	if prev := strings.LastIndex(name[:last], "."); prev >= 0 {
		return name[prev:]
	}

	return name[last:]
}
// detectExtension opens fr and returns the extension reported by
// the mime type detection of its content.
//
// The opened reader is always closed before returning.
func detectExtension(fr FileReader) (string, error) {
	file, err := fr.Open()
	if err != nil {
		return "", err
	}
	defer file.Close()

	detected, err := mimetype.DetectReader(file)
	if err != nil {
		return "", err
	}

	return detected.Extension(), nil
}
package inflector
import (
"regexp"
"strings"
"unicode"
)
// columnifyRemoveRegex matches the characters stripped by Columnify
// (anything other than word characters and . * - _ @ #).
var columnifyRemoveRegex = regexp.MustCompile(`[^\w\.\*\-\_\@\#]+`)
// snakecaseSplitRegex matches the word separators used by Snakecase
// (runs of non-word characters and underscores).
var snakecaseSplitRegex = regexp.MustCompile(`[\W_]+`)
// UcFirst returns str with its first character converted to uppercase.
//
// An empty string is returned unchanged.
func UcFirst(str string) string {
	runes := []rune(str)
	if len(runes) == 0 {
		return ""
	}

	runes[0] = unicode.ToUpper(runes[0])

	return string(runes)
}
// Columnify removes from str every character that is not
// allowed in a db identifier (see columnifyRemoveRegex).
func Columnify(str string) string {
	cleaned := columnifyRemoveRegex.ReplaceAllString(str, "")

	return cleaned
}
// Sentenize normalizes str into a sentence: surrounding whitespace is trimmed,
// the first character is uppercased and a trailing "." is appended
// unless the text already ends with ".", "?" or "!".
//
// A blank string results in an empty string.
func Sentenize(str string) string {
	str = strings.TrimSpace(str)
	if str == "" {
		return ""
	}

	str = UcFirst(str)

	switch str[len(str)-1] {
	case '.', '?', '!':
		// already terminated
		return str
	}

	return str + "."
}
// Sanitize removes from `str` everything that matches `removePattern`.
//
// If `removePattern` is not a valid regex string, an empty string
// and the compile error are returned.
func Sanitize(str string, removePattern string) (string, error) {
	re, err := regexp.Compile(removePattern)
	if err == nil {
		return re.ReplaceAllString(str, ""), nil
	}

	return "", err
}
// Snakecase splits str at every non word character (and underscore) and
// joins the lowercased parts with "_", additionally inserting "_" before
// an uppercase character that follows a non-uppercase one.
//
// "ABBREVIATIONS" are preserved, eg. "myTestDB" will become "my_test_db".
func Snakecase(str string) string {
	var sb strings.Builder

	for _, part := range snakecaseSplitRegex.Split(str, -1) {
		if part == "" {
			continue
		}

		// separate the current part from the previously written one
		if sb.Len() > 0 {
			sb.WriteString("_")
		}

		for idx, ch := range part {
			// an uppercase character starts a new word only
			// when it doesn't follow another uppercase character
			startsWord := idx > 0 &&
				unicode.IsUpper(ch) &&
				!unicode.IsUpper(rune(part[idx-1]))
			if startsWord {
				sb.WriteString("_")
			}

			sb.WriteRune(ch)
		}
	}

	return strings.ToLower(sb.String())
}
package security
import (
"crypto/hmac"
"crypto/md5"
"crypto/sha256"
"crypto/sha512"
"crypto/subtle"
"encoding/base64"
"fmt"
"strings"
)
// S256Challenge returns the URL-safe base64 encoded sha256 digest of code.
// The base64 padding ("=") is stripped per [RFC 7636].
//
// [RFC 7636]: https://datatracker.ietf.org/doc/html/rfc7636#section-4.2
func S256Challenge(code string) string {
	sum := sha256.Sum256([]byte(code))
	encoded := base64.URLEncoding.EncodeToString(sum[:])

	return strings.TrimRight(encoded, "=")
}
// MD5 returns the lowercase hex encoded md5 digest of text.
func MD5(text string) string {
	sum := md5.Sum([]byte(text))

	return fmt.Sprintf("%x", sum)
}
// SHA256 returns the lowercase hex encoded sha256 digest
// (as defined in FIPS 180-4) of text.
func SHA256(text string) string {
	sum := sha256.Sum256([]byte(text))

	return fmt.Sprintf("%x", sum)
}
// SHA512 returns the lowercase hex encoded sha512 digest
// (as defined in FIPS 180-4) of text.
func SHA512(text string) string {
	sum := sha512.Sum512([]byte(text))

	return fmt.Sprintf("%x", sum)
}
// HS256 returns the lowercase hex encoded HMAC of text,
// computed with the sha256 digest algorithm and keyed with secret.
func HS256(text string, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(text))
	digest := mac.Sum(nil)

	return fmt.Sprintf("%x", digest)
}
// HS512 returns the lowercase hex encoded HMAC of text,
// computed with the sha512 digest algorithm and keyed with secret.
func HS512(text string, secret string) string {
	mac := hmac.New(sha512.New, []byte(secret))
	mac.Write([]byte(text))
	digest := mac.Sum(nil)

	return fmt.Sprintf("%x", digest)
}
// Equal reports whether hash1 and hash2 are the same,
// using a constant time comparison to avoid leaking timing information.
func Equal(hash1 string, hash2 string) bool {
	a, b := []byte(hash1), []byte(hash2)

	return subtle.ConstantTimeCompare(a, b) == 1
}
package security
import (
"crypto/aes"
"crypto/cipher"
crand "crypto/rand"
"encoding/base64"
"io"
)
// Encrypt encrypts "data" with the specified "key" (must be valid 32 char AES key)
// and returns the result as a standard base64 encoded string.
//
// This method uses AES-256-GCM block cypher mode. The randomly generated
// nonce is stored as a prefix of the encrypted payload.
func Encrypt(data []byte, key string) (string, error) {
	block, err := aes.NewCipher([]byte(key))
	if err != nil {
		return "", err
	}

	aead, err := cipher.NewGCM(block)
	if err != nil {
		return "", err
	}

	// fill the nonce with a cryptographically secure random sequence
	nonce := make([]byte, aead.NonceSize())
	if _, err = io.ReadFull(crand.Reader, nonce); err != nil {
		return "", err
	}

	// the sealed data is appended to the nonce
	sealed := aead.Seal(nonce, nonce, data, nil)

	return base64.StdEncoding.EncodeToString(sealed), nil
}
// Decrypt decrypts encrypted text with key (must be valid 32 chars AES key).
//
// This method uses AES-256-GCM block cypher mode and expects cipherText
// to be a standard base64 encoded "nonce + sealed data" payload.
//
// An error is returned if the key is invalid, if cipherText is not valid
// base64, if the decoded payload is too short to contain the nonce
// ([io.ErrUnexpectedEOF]) or if the authentication fails.
func Decrypt(cipherText string, key string) ([]byte, error) {
	block, err := aes.NewCipher([]byte(key))
	if err != nil {
		return nil, err
	}

	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}

	cipherByte, err := base64.StdEncoding.DecodeString(cipherText)
	if err != nil {
		return nil, err
	}

	// guard against truncated input, otherwise the slicing below would panic
	nonceSize := gcm.NonceSize()
	if len(cipherByte) < nonceSize {
		return nil, io.ErrUnexpectedEOF
	}

	nonce, cipherByteClean := cipherByte[:nonceSize], cipherByte[nonceSize:]

	return gcm.Open(nil, nonce, cipherByteClean, nil)
}
package security
import (
"errors"
"time"
// @todo update to v5
"github.com/golang-jwt/jwt/v4"
)
// ParseUnverifiedJWT parses the token and returns its claims
// WITHOUT verifying the token signature.
//
// Only the claims validity is checked via claims.Valid()
// (the exp, iat and nbf claims).
func ParseUnverifiedJWT(token string) (jwt.MapClaims, error) {
	claims := jwt.MapClaims{}

	if _, _, err := new(jwt.Parser).ParseUnverified(token, claims); err != nil {
		return claims, err
	}

	return claims, claims.Valid()
}
// ParseJWT verifies the token signature with verificationKey
// (only the HS256 method is accepted) and returns the token claims.
func ParseJWT(token string, verificationKey string) (jwt.MapClaims, error) {
	keyFunc := func(t *jwt.Token) (any, error) {
		return []byte(verificationKey), nil
	}

	parsed, err := jwt.NewParser(jwt.WithValidMethods([]string{"HS256"})).Parse(token, keyFunc)
	if err != nil {
		return nil, err
	}

	claims, ok := parsed.Claims.(jwt.MapClaims)
	if !ok || !parsed.Valid {
		return nil, errors.New("unable to parse token")
	}

	return claims, nil
}
// NewJWT generates a new HS256 JWT signed with signingKey.
//
// The token claims consist of an "exp" claim set to now+duration,
// followed by all payload entries (a payload "exp" entry overwrites the default one).
func NewJWT(payload jwt.MapClaims, signingKey string, duration time.Duration) (string, error) {
	claims := make(jwt.MapClaims, len(payload)+1)
	claims["exp"] = time.Now().Add(duration).Unix()

	for name, value := range payload {
		claims[name] = value
	}

	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)

	return token.SignedString([]byte(signingKey))
}
package security
import (
cryptoRand "crypto/rand"
"math/big"
mathRand "math/rand" // @todo replace with rand/v2?
)
// defaultRandomAlphabet is the [A-Za-z0-9] character set
// used by the random string helpers without an explicit alphabet.
const defaultRandomAlphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"

// RandomString returns a cryptographically random string of the specified length.
//
// The result consists only of [A-Za-z0-9] characters, making it transparent to URL-encoding.
func RandomString(length int) string {
	result := RandomStringWithAlphabet(length, defaultRandomAlphabet)

	return result
}
// RandomStringWithAlphabet returns a cryptographically random string
// of the specified length, composed only of characters from alphabet.
//
// It panics if for some reason rand.Int returns a non-nil error.
func RandomStringWithAlphabet(length int, alphabet string) string {
	limit := big.NewInt(int64(len(alphabet)))

	result := make([]byte, length)
	for pos := 0; pos < length; pos++ {
		idx, err := cryptoRand.Int(cryptoRand.Reader, limit)
		if err != nil {
			panic(err)
		}

		result[pos] = alphabet[idx.Int64()]
	}

	return string(result)
}
// PseudorandomString returns a pseudorandom string of the specified length.
//
// The result consists only of [A-Za-z0-9] characters, making it transparent to URL-encoding.
//
// Use RandomString instead if a cryptographically random string is needed
// (it is a little bit slower).
func PseudorandomString(length int) string {
	result := PseudorandomStringWithAlphabet(length, defaultRandomAlphabet)

	return result
}
// PseudorandomStringWithAlphabet returns a pseudorandom string
// of the specified length, composed only of characters from alphabet.
//
// Use RandomStringWithAlphabet instead if a cryptographically random
// string is needed (it is a little bit slower).
func PseudorandomStringWithAlphabet(length int, alphabet string) string {
	size := len(alphabet)

	result := make([]byte, length)
	for pos := 0; pos < length; pos++ {
		result[pos] = alphabet[mathRand.Intn(size)]
	}

	return string(result)
}
package security
import (
cryptoRand "crypto/rand"
"fmt"
"math/big"
"regexp/syntax"
"strings"
)
// defaultMaxRepeat is the repeat upper bound used by repeatRandomStringByRegex
// when the pattern doesn't specify one (max < 0).
const defaultMaxRepeat = 6
// anyCharNotNLPairs lists the inclusive rune range pairs (A-Z, a-z, 0-9)
// used to generate a character for the "any char" pattern operators.
var anyCharNotNLPairs = []rune{'A', 'Z', 'a', 'z', '0', '9'}
// RandomStringByRegex generates a random string matching the regex pattern.
// If optFlags is not set, fallbacks to [syntax.Perl], otherwise
// all provided flags are combined.
//
// NB! While the source of the randomness comes from [crypto/rand] this method
// is not recommended to be used on its own in critical secure contexts because
// the generated length could vary too much on the used pattern and may not be
// as secure as simply calling [security.RandomString].
// If you still insist on using it for such purposes, consider at least
// a large enough minimum length for the generated string, e.g. `[a-z0-9]{30}`.
//
// This function is inspired by github.com/pipe01/revregexp, github.com/lucasjones/reggen and other similar packages.
func RandomStringByRegex(pattern string, optFlags ...syntax.Flags) (string, error) {
	flags := syntax.Perl
	if len(optFlags) > 0 {
		flags = 0
		for _, f := range optFlags {
			flags |= f
		}
	}

	parsed, err := syntax.Parse(pattern, flags)
	if err != nil {
		return "", err
	}

	sb := new(strings.Builder)
	if err = writeRandomStringByRegex(parsed, sb); err != nil {
		return "", err
	}

	return sb.String(), nil
}
func writeRandomStringByRegex(r *syntax.Regexp, sb *strings.Builder) error {
// https://pkg.go.dev/regexp/syntax#Op
switch r.Op {
case syntax.OpCharClass:
c, err := randomRuneFromPairs(r.Rune)
if err != nil {
return err
}
_, err = sb.WriteRune(c)
return err
case syntax.OpAnyChar, syntax.OpAnyCharNotNL:
c, err := randomRuneFromPairs(anyCharNotNLPairs)
if err != nil {
return err
}
_, err = sb.WriteRune(c)
return err
case syntax.OpAlternate:
idx, err := randomNumber(len(r.Sub))
if err != nil {
return err
}
return writeRandomStringByRegex(r.Sub[idx], sb)
case syntax.OpConcat:
var err error
for _, sub := range r.Sub {
err = writeRandomStringByRegex(sub, sb)
if err != nil {
break
}
}
return err
case syntax.OpRepeat:
return repeatRandomStringByRegex(r.Sub[0], sb, r.Min, r.Max)
case syntax.OpQuest:
return repeatRandomStringByRegex(r.Sub[0], sb, 0, 1)
case syntax.OpPlus:
return repeatRandomStringByRegex(r.Sub[0], sb, 1, -1)
case syntax.OpStar:
return repeatRandomStringByRegex(r.Sub[0], sb, 0, -1)
case syntax.OpCapture:
return writeRandomStringByRegex(r.Sub[0], sb)
case syntax.OpLiteral:
_, err := sb.WriteString(string(r.Rune))
return err
default:
return fmt.Errorf("unsupported pattern operator %d", r.Op)
}
}
// repeatRandomStringByRegex writes into sb a random number of
// random strings satisfying r.
//
// The number of repetitions is picked from the inclusive [min, max] range.
// A negative max means "unbounded" and is replaced with defaultMaxRepeat,
// and a max lower than min is raised to min.
func repeatRandomStringByRegex(r *syntax.Regexp, sb *strings.Builder, min int, max int) error {
	if max < 0 {
		max = defaultMaxRepeat
	}

	if max < min {
		max = min
	}

	n := min
	if max != min {
		// +1 because the randomNumber upper bound is exclusive
		// (otherwise max could never be reached, e.g. "x?" would never emit "x")
		randRange, err := randomNumber(max - min + 1)
		if err != nil {
			return err
		}
		n += randRange
	}

	for i := 0; i < n; i++ {
		if err := writeRandomStringByRegex(r, sb); err != nil {
			return err
		}
	}

	return nil
}
// randomRuneFromPairs picks at random one of the (lo, hi) range pairs
// stored sequentially in pairs and returns a random rune from that range.
func randomRuneFromPairs(pairs []rune) (rune, error) {
	pair, err := randomNumber(len(pairs) / 2)
	if err != nil {
		return 0, err
	}

	lo, hi := pairs[pair*2], pairs[pair*2+1]

	return randomRuneFromRange(lo, hi)
}
// randomRuneFromRange returns a random rune from the inclusive [min, max] range.
//
// In case of an error, min is returned together with the error.
func randomRuneFromRange(min rune, max rune) (rune, error) {
	span := int(max - min + 1)

	offset, err := randomNumber(span)
	if err == nil {
		return min + rune(offset), nil
	}

	return min, err
}
// randomNumber returns a cryptographically random number in the [0, maxSoft) range.
//
// An error is returned if maxSoft is not positive or if reading
// from the random source fails.
func randomNumber(maxSoft int) (int, error) {
	// rand.Int panics for a non-positive upper bound
	if maxSoft <= 0 {
		return 0, fmt.Errorf("invalid random number upper bound %d", maxSoft)
	}

	randRange, err := cryptoRand.Int(cryptoRand.Reader, big.NewInt(int64(maxSoft)))
	if err != nil {
		// randRange is nil here and must not be dereferenced
		return 0, err
	}

	return int(randRange.Int64()), nil
}
package postback
import (
"context"
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module provides the postback input plugin as a [plugins.Input]
// member of the "inputs" fx value group.
var Module = fx.Module(
	"inputs.postback",
	fx.Provide(
		fx.Annotate(
			New,
			fx.As(new(plugins.Input)),
			fx.ResultTags(`group:"inputs"`),
		),
	),
)

// Postback is the "inputs.postback" input plugin.
type Postback struct{}

// New creates a new Postback input.
func New() *Postback {
	return new(Postback)
}

// Name returns the plugin identifier.
func (p *Postback) Name() string {
	return "inputs.postback"
}

// Copy returns a fresh Postback input (cfg is not used).
func (p *Postback) Copy(cfg map[string]any) plugins.Input {
	return new(Postback)
}

// Do initializes the state with empty user and device data and
// always returns true.
func (p *Postback) Do(ctx context.Context, state *plugins.State) bool {
	// TODO: the user data still has to be extracted from the request
	state.User = &plugins.User{}
	state.Device = &plugins.Device{}

	return true
}
package rtb
import (
"context"
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module provides the rtb input plugin as a [plugins.Input]
// member of the "inputs" fx value group.
var Module = fx.Module(
	"inputs.rtb",
	fx.Provide(
		fx.Annotate(
			New,
			fx.As(new(plugins.Input)),
			fx.ResultTags(`group:"inputs"`),
		),
	),
)

// Rtb is the "inputs.rtb" input plugin.
type Rtb struct{}

// New creates a new Rtb input.
func New() *Rtb {
	return new(Rtb)
}

// Name returns the plugin identifier.
func (rtb *Rtb) Name() string {
	return "inputs.rtb"
}

// Copy returns a fresh Rtb input (cfg is not used).
func (rtb *Rtb) Copy(cfg map[string]any) plugins.Input {
	return new(Rtb)
}

// Do is currently a placeholder that always returns true.
func (rtb *Rtb) Do(ctx context.Context, state *plugins.State) bool {
	// TODO: parse the rtb request here.
	//
	// The handling of the different request types could also be moved
	// into plugins; this is the place where the required formats
	// can be determined.
	return true
}
package static
import (
"context"
"net/http"
"strconv"
"github.com/go-chi/chi/v5"
"go.uber.org/fx"
"go.uber.org/zap"
"go.ads.coffee/platform/server/internal/analytics"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/domain/plugins"
"go.ads.coffee/platform/server/internal/repos/banners"
"go.ads.coffee/platform/server/internal/repos/placements"
"go.ads.coffee/platform/server/internal/repos/units"
"go.ads.coffee/platform/server/internal/sessions"
)
const (
// actionClick is the value of the "action" url parameter that
// triggers the click handling in Static.Do.
actionClick = "click"
// actionKey is the state value key under which the requested action is stored.
actionKey = "action"
)
// Module provides the static input plugin as a [plugins.Input]
// member of the "inputs" fx value group.
var Module = fx.Module(
"inputs.static",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Input)),
fx.ResultTags(`group:"inputs"`),
),
),
)
// Analytics is the click logging dependency of the static input.
type Analytics interface {
LogClick(ctx context.Context, data ads.TrackerInfo) error
}
// Banners looks up a single banner by its id
// (the bool result reports whether it was found).
type Banners interface {
One(ctx context.Context, id uint) (ads.Banner, bool)
}
// Session loads the session associated with a request
// (the bool result reports whether the load succeeded).
type Session interface {
LoadWithExpire(r *http.Request) (sessions.Session, bool)
}
// Placements looks up a single placement by its id
// (the bool result reports whether it was found).
type Placements interface {
One(ctx context.Context, id uint) (ads.Placement, bool)
}
// Units returns the units that belong to a placement
// (the bool result reports whether any were found).
type Units interface {
FindByPlacement(ctx context.Context, id uint) ([]ads.Unit, bool)
}
// Static is the "inputs.static" input plugin.
type Static struct {
logger *zap.Logger
banners Banners
sessions Session
analytics Analytics
placements Placements
units Units
}
// New creates a new Static input with the provided dependencies.
func New(
	logger *zap.Logger,
	cache *banners.Cache,
	sessions *sessions.Sessions,
	analytics *analytics.Analytics,
	placements *placements.Cache,
	units *units.Cache,
) *Static {
	s := new(Static)
	s.logger = logger
	s.banners = cache
	s.sessions = sessions
	s.analytics = analytics
	s.placements = placements
	s.units = units

	return s
}

// Name returns the plugin identifier.
func (s *Static) Name() string {
	return "inputs.static"
}

// Copy returns a new Static input that shares
// all dependencies with s (cfg is not used).
func (s *Static) Copy(cfg map[string]any) plugins.Input {
	dup := *s

	return &dup
}
// Do resolves the placement (and its units) referenced by the request and
// stores them, together with the requested action, in the state.
//
// For the "click" action it additionally loads the banner id from the
// session, logs the click and redirects to the banner target.
//
// It returns true only when the processing should continue, i.e. false is
// returned for an invalid/unknown placement and for every "click" request
// (those are fully handled here with either a redirect or a 404 response).
func (s *Static) Do(ctx context.Context, state *plugins.State) bool {
	action := chi.URLParam(state.Request, "action")
	state.WithValue(actionKey, action)

	// TODO: the user data still has to be extracted from the request
	state.User = &plugins.User{}
	state.Device = &plugins.Device{}

	// check that the placement exists
	// (ParseUint rejects non-numeric and negative ids instead of
	// silently turning them into 0 or a wrapped around uint)
	placementID, err := strconv.ParseUint(chi.URLParam(state.Request, "placement"), 10, 0)
	if err != nil {
		return false
	}

	placement, found := s.placements.One(ctx, uint(placementID))
	if !found {
		return false
	}
	state.Placement = placement

	if units, found := s.units.FindByPlacement(ctx, placement.ID); found {
		state.Units = units
	}

	if action != actionClick {
		return true
	}

	// for the click action the banner is expected to be in the session;
	// if it is there, redirect to its target url
	session, ok := s.sessions.LoadWithExpire(state.Request)
	if !ok {
		s.logger.Warn("error on load session")
		state.Response.WriteHeader(http.StatusNotFound)
		return false
	}

	bannerID, err := strconv.ParseUint(session.Value, 10, 0)
	if err != nil {
		s.logger.Warn("invalid banner id in session", zap.Error(err))
		state.Response.WriteHeader(http.StatusNotFound)
		return false
	}

	banner, ok := s.banners.One(ctx, uint(bannerID))
	if !ok {
		s.logger.Warn("error on load banner from cache")
		state.Response.WriteHeader(http.StatusNotFound)
		return false
	}

	// a failed click log must not prevent the redirect
	if err := s.analytics.LogClick(ctx, ads.TrackerInfo{}); err != nil {
		s.logger.Error("error on log click", zap.Error(err))
	}

	http.Redirect(state.Response, state.Request, banner.Target, http.StatusSeeOther)

	return false
}
package tracker
import (
"context"
"encoding/base64"
"encoding/json"
"time"
"github.com/go-chi/chi/v5"
"go.uber.org/fx"
"go.uber.org/zap"
"go.ads.coffee/platform/server/internal/analytics"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module provides the tracker input plugin as a [plugins.Input]
// member of the "inputs" fx value group.
var Module = fx.Module(
"inputs.tracker",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Input)),
fx.ResultTags(`group:"inputs"`),
),
),
)
// Analytics is the impression/click logging dependency of the tracker input.
type Analytics interface {
LogImpression(ctx context.Context, data ads.TrackerInfo) error
LogClick(ctx context.Context, data ads.TrackerInfo) error
}
// Tracker is the "inputs.tracker" input plugin.
type Tracker struct {
logger *zap.Logger
analytics Analytics
}
// New creates a new Tracker input with the provided dependencies.
func New(logger *zap.Logger, analytics *analytics.Analytics) *Tracker {
	t := new(Tracker)
	t.logger = logger
	t.analytics = analytics

	return t
}

// Name returns the plugin identifier.
func (s *Tracker) Name() string {
	return "inputs.tracker"
}

// Copy returns a new Tracker input that shares
// all dependencies with s (cfg is not used).
func (s *Tracker) Copy(cfg map[string]any) plugins.Input {
	dup := *s

	return &dup
}
// Do decodes the tracker info from the "data" url parameter
// (url-safe base64 encoded json), stamps it with the current unix time
// and logs it as an impression or a click, depending on its action.
//
// It returns false if the data cannot be decoded, if the action is
// unknown or empty, or if the logging fails.
func (s *Tracker) Do(ctx context.Context, state *plugins.State) bool {
	encoded := chi.URLParam(state.Request, "data")

	raw, err := base64.URLEncoding.DecodeString(encoded)
	if err != nil {
		return false
	}

	var info ads.TrackerInfo // TODO: total use ads.Event?
	if err = json.Unmarshal(raw, &info); err != nil {
		return false
	}
	info.Timestamp = time.Now().Unix()

	// pick the logger matching the action
	var (
		log     func(ctx context.Context, data ads.TrackerInfo) error
		failure string
	)
	switch info.Action {
	case ads.ActionImpression:
		log, failure = s.analytics.LogImpression, "failed to log impression"
	case ads.ActionClick:
		log, failure = s.analytics.LogClick, "failed to log click"
	default: // unknown or empty action
		return false
	}

	if err = log(ctx, info); err != nil {
		s.logger.Error(failure, zap.Error(err))
		return false
	}

	return true
}
package web
import (
"context"
"strconv"
"github.com/go-chi/chi/v5"
"go.uber.org/fx"
"go.uber.org/zap"
"go.ads.coffee/platform/server/internal/analytics"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/domain/plugins"
"go.ads.coffee/platform/server/internal/repos/placements"
"go.ads.coffee/platform/server/internal/repos/units"
)
// Module provides the web input plugin as a [plugins.Input]
// member of the "inputs" fx value group.
var Module = fx.Module(
"inputs.web",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Input)),
fx.ResultTags(`group:"inputs"`),
),
),
)
// Analytics is the request logging dependency of the web input.
type Analytics interface {
LogRequest(ctx context.Context, state *plugins.State) error
}
// Placements looks up a single placement by its id
// (the bool result reports whether it was found).
type Placements interface {
One(ctx context.Context, id uint) (ads.Placement, bool)
}
// Units returns the units that belong to a placement
// (the bool result reports whether any were found).
type Units interface {
FindByPlacement(ctx context.Context, id uint) ([]ads.Unit, bool)
}
// Web is the "inputs.web" input plugin.
type Web struct {
logger *zap.Logger
analytics Analytics
placements Placements
units Units
}
// New creates a new Web input with the provided dependencies.
func New(
	logger *zap.Logger,
	analytics *analytics.Analytics,
	placements *placements.Cache,
	units *units.Cache,
) *Web {
	w := new(Web)
	w.logger = logger
	w.analytics = analytics
	w.placements = placements
	w.units = units

	return w
}

// Name returns the plugin identifier.
func (w *Web) Name() string {
	return "inputs.web"
}

// Copy returns a new Web input that shares
// all dependencies with w (cfg is not used).
func (w *Web) Copy(cfg map[string]any) plugins.Input {
	dup := *w

	return &dup
}
// Do resolves the placement (and its units) referenced by the "placement"
// url parameter, stores them in the state and logs the request.
//
// It returns false if the placement id is invalid or unknown.
// A request logging failure is only reported to the logger.
func (w *Web) Do(ctx context.Context, state *plugins.State) bool {
	// TODO: the user data still has to be extracted from the request
	state.User = &plugins.User{}
	state.Device = &plugins.Device{}

	// check that the placement exists
	// (ParseUint rejects non-numeric and negative ids instead of
	// silently turning them into 0 or a wrapped around uint)
	id, err := strconv.ParseUint(chi.URLParam(state.Request, "placement"), 10, 0)
	if err != nil {
		return false
	}

	placement, found := w.placements.One(ctx, uint(id))
	if !found {
		return false
	}
	state.Placement = placement

	if units, found := w.units.FindByPlacement(ctx, placement.ID); found {
		state.Units = units
	}

	// a failed request log must not stop the processing
	if err := w.analytics.LogRequest(ctx, state); err != nil {
		w.logger.Error("error on log request", zap.Error(err))
	}

	return true
}
package empty
import (
"context"
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module provides the empty output plugin as a [plugins.Output]
// member of the "outputs" fx value group.
var Module = fx.Module(
	"outputs.empty",
	fx.Provide(
		fx.Annotate(
			New,
			fx.As(new(plugins.Output)),
			fx.ResultTags(`group:"outputs"`),
		),
	),
)

// Empty is the "outputs.empty" output plugin. It writes nothing.
type Empty struct{}

// New creates a new Empty output.
func New() *Empty {
	return new(Empty)
}

// Name returns the plugin identifier.
func (e *Empty) Name() string {
	return "outputs.empty"
}

// Copy returns a fresh Empty output (cfg is not used).
func (e *Empty) Copy(cfg map[string]any) plugins.Output {
	return new(Empty)
}

// Do is a no-op that always succeeds.
func (e *Empty) Do(ctx context.Context, state *plugins.State) error {
	return nil
}
package pixel
import (
"context"
"image"
"image/color"
"image/gif"
"net/http"
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module provides the pixel output plugin as a [plugins.Output]
// member of the "outputs" fx value group.
var Module = fx.Module(
	"outputs.pixel",
	fx.Provide(
		fx.Annotate(
			New,
			fx.As(new(plugins.Output)),
			fx.ResultTags(`group:"outputs"`),
		),
	),
)

// Pixel is the "outputs.pixel" output plugin.
type Pixel struct{}

// New creates a new Pixel output.
func New() *Pixel {
	return new(Pixel)
}

// Name returns the plugin identifier.
func (r *Pixel) Name() string {
	return "outputs.pixel"
}

// Copy returns a fresh Pixel output (cfg is not used).
func (r *Pixel) Copy(cfg map[string]any) plugins.Output {
	return new(Pixel)
}
// Do responds with a 1x1 gif image.
//
// The encode error (if any) is returned after attempting to
// switch the response status to 500.
func (r *Pixel) Do(ctx context.Context, state *plugins.State) error {
	img := image.NewRGBA(image.Rect(0, 0, 1, 1))
	img.Set(0, 0, color.RGBA{255, 0, 0, 255})

	// The headers must be set before the first body write, otherwise they
	// are ignored (the first write implicitly sends the 200 status).
	state.Response.Header().Set("Content-Type", "image/gif")

	if err := gif.Encode(state.Response, img, nil); err != nil {
		// has effect only if nothing was written yet
		state.Response.WriteHeader(http.StatusInternalServerError)
		return err
	}

	return nil
}
package rtb
import (
"context"
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module provides the rtb output plugin as a [plugins.Output]
// member of the "outputs" fx value group.
var Module = fx.Module(
	"outputs.rtb",
	fx.Provide(
		fx.Annotate(
			New,
			fx.As(new(plugins.Output)),
			fx.ResultTags(`group:"outputs"`),
		),
	),
)

// Rtb is the "outputs.rtb" output plugin.
type Rtb struct{}

// New creates a new Rtb output.
func New() *Rtb {
	return new(Rtb)
}

// Name returns the plugin identifier.
func (r *Rtb) Name() string {
	return "outputs.rtb"
}

// Copy returns a fresh Rtb output (cfg is not used).
func (r *Rtb) Copy(cfg map[string]any) plugins.Output {
	return new(Rtb)
}

// Do is currently a no-op that always succeeds.
func (r *Rtb) Do(ctx context.Context, state *plugins.State) error {
	return nil
}
// nolint:errcheck
package formats
import (
"bytes"
"context"
"fmt"
"image"
"image/color"
"image/draw"
"image/jpeg"
"image/png"
"io"
"net/http"
"strings"
"github.com/golang/freetype"
"github.com/nfnt/resize"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/tools/filesystem"
"golang.org/x/image/font"
"golang.org/x/image/font/gofont/goregular"
)
// Banner renders banners as images.
type Banner struct{}

// NewBanner creates a new Banner renderer.
func NewBanner() *Banner {
	return new(Banner)
}
// Banner downloads the "image" media of banner, renders it together with
// the banner description and title, and writes the resulting image into w
// with the matching Content-Length and Content-Type headers.
//
// The base argument is not used.
func (b *Banner) Banner(ctx context.Context, base string, banner ads.Banner, w http.ResponseWriter) error {
	source, err := filesystem.NewFileFromURL(ctx, banner.Media("image"))
	if err != nil {
		return err
	}

	reader, err := source.Reader.Open()
	if err != nil {
		return err
	}
	defer reader.Close()

	rendered, format, err := b.Render(reader, banner.Description, banner.Title)
	if err != nil {
		return err
	}

	headers := w.Header()
	headers.Set("Content-Length", fmt.Sprint(rendered.Len()))
	headers.Set("Content-Type", "image/"+format)

	_, err = io.Copy(w, rendered)

	return err
}
// Render draws an 820x360 banner consisting of the image decoded from file
// (scaled to a height of 180px), the description text next to it and, below
// a separator line, the info text.
//
// It returns the encoded banner together with its format, which is the same
// as the format of the source image. Only "jpeg" and "png" are supported.
//
// An error is returned if the font cannot be parsed, if the source image
// cannot be decoded, if its format is not supported or if the encoding fails.
func (b *Banner) Render(file io.Reader, description, info string) (*bytes.Buffer, string, error) {
	// banner dimensions (all sizes are doubled compared to the original 410x180 layout)
	width := 820
	img := image.NewRGBA(image.Rect(0, 0, width, 360))

	// fill the background
	draw.Draw(img, img.Bounds(), image.NewUniform(color.RGBA{248, 249, 250, 255}), image.Point{}, draw.Src)

	// load the font
	f, err := freetype.ParseFont(goregular.TTF)
	if err != nil {
		// report the failure to the caller instead of crashing the process
		return nil, "", fmt.Errorf("parse font: %w", err)
	}

	// text drawing context
	c := freetype.NewContext()
	c.SetDPI(72)
	c.SetFont(f)
	c.SetClip(img.Bounds())
	c.SetDst(img)
	c.SetSrc(image.Black)
	c.SetHinting(font.HintingNone)

	// decode the source image
	data, format, err := image.Decode(file)
	if err != nil {
		return nil, "", err
	}

	// scale the image to a height of 180 pixels (the width follows the aspect ratio)
	data = resize.Resize(0, 180, data, resize.Lanczos3)

	imgX, imgY := 30, 30
	draw.Draw(img,
		image.Rect(imgX, imgY, imgX+data.Bounds().Dx(), imgY+data.Bounds().Dy()),
		data,
		image.Point{},
		draw.Src)

	// Split the description into lines. The available horizontal space is
	// what remains after the image WIDTH and the margins (roughly 10px per character).
	lines := wrap(description, (width-imgX-data.Bounds().Dx()-30)/10)

	// total text height (40px per line)
	textHeight := len(lines) * 40
	if textHeight < 1 {
		textHeight = 40
	}

	// vertically center the text relative to the image
	textX := imgX + data.Bounds().Dx() + 30
	textStartY := imgY + (data.Bounds().Dy()-textHeight)/2
	if textStartY < imgY {
		textStartY = imgY
	}

	// draw the description
	c.SetFontSize(30)
	for i, line := range lines {
		pt := freetype.Pt(textX, textStartY+40*i+30)
		c.DrawString(line, pt)
	}

	// draw the separator line
	markerY := imgY + data.Bounds().Dy() + 40
	drawLine(img, 30, markerY-20, width-30, markerY-20, color.RGBA{220, 220, 220, 255})

	// draw the advertiser info
	c.SetFontSize(20)
	c.SetSrc(image.NewUniform(color.RGBA{150, 150, 150, 255}))
	infoLines := wrap(info, 90)
	for _, line := range infoLines {
		markerY += 30
		pt := freetype.Pt(30, markerY)
		c.DrawString(line, pt)
	}

	// encode the result in the same format as the source image
	result := bytes.NewBuffer([]byte{})
	switch format {
	case "jpeg":
		err = jpeg.Encode(result, img, &jpeg.Options{Quality: 90})
	case "png":
		err = png.Encode(result, img)
	// case "webp":
	// err = webp.Encode(outFile, rgba, &webp.Options{Lossless: true}) // lossless for WebP
	default:
		return nil, "", fmt.Errorf("undefined format: %s", format)
	}
	if err != nil {
		return nil, "", err
	}

	return result, format, nil
}
// wrap splits text into lines of at most lineLength characters,
// breaking only at whitespace.
//
// The length is measured in characters (runes), not bytes, so that multibyte
// text (e.g. Cyrillic) is wrapped at the same width as ASCII text.
// A single word longer than lineLength is kept on its own line.
//
// It returns nil if text contains no words.
func wrap(text string, lineLength int) []string {
	words := strings.Fields(text)
	if len(words) == 0 {
		return nil
	}

	var lines []string

	current := words[0]
	currentLen := len([]rune(current))

	for _, word := range words[1:] {
		wordLen := len([]rune(word))

		// +1 for the joining space
		if currentLen+1+wordLen <= lineLength {
			current += " " + word
			currentLen += 1 + wordLen
			continue
		}

		lines = append(lines, current)
		current, currentLen = word, wordLen
	}

	return append(lines, current)
}
// drawLine draws a straight 1px line from (x1, y1) to (x2, y2)
// by linearly interpolating the points between them.
//
// If both points are the same, only that single pixel is set.
func drawLine(img *image.RGBA, x1, y1, x2, y2 int, col color.Color) {
	dx, dy := x2-x1, y2-y1

	steps := max(abs(dx), abs(dy))
	if steps == 0 {
		// zero length line (prevents the division by zero below)
		img.Set(x1, y1, col)
		return
	}

	for i := 0; i <= steps; i++ {
		x := x1 + (dx * i / steps)
		y := y1 + (dy * i / steps)
		img.Set(x, y, col)
	}
}

// abs returns the absolute value of x.
func abs(x int) int {
	if x < 0 {
		return -x
	}
	return x
}
package static
import (
"context"
"fmt"
"net/http"
"go.ads.coffee/platform/server/internal/analytics"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/domain/plugins"
"go.ads.coffee/platform/server/internal/sessions"
"go.ads.coffee/platform/server/plugins/outputs/static/formats"
"go.uber.org/fx"
"go.uber.org/zap"
)
const (
// baseUrlKey is the plugin config key holding the base url.
baseUrlKey = "base"
// actionImg is the only action handled by Static.Do.
actionImg = "img"
// actionKey is the state value key under which the requested action is stored.
actionKey = "action"
)
// Module provides the static output plugin as a [plugins.Output]
// member of the "outputs" fx value group, together with
// the banner renderer constructor (formats.NewBanner).
var Module = fx.Module(
"outputs.static",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Output)),
fx.ResultTags(`group:"outputs"`),
),
formats.NewBanner,
),
)
// Analytics is the impression logging dependency of the static output.
type Analytics interface {
LogImpression(ctx context.Context, data ads.TrackerInfo) error
}
// Banner writes the rendered banner into the response.
type Banner interface {
Banner(ctx context.Context, base string, banner ads.Banner, w http.ResponseWriter) error
}
// Session starts a session holding the provided value for the request.
type Session interface {
Start(r *http.Request, value string) error
}
// Static is the "outputs.static" output plugin.
type Static struct {
// base is the base url read from the plugin config (see Copy).
base string
logger *zap.Logger
sessions Session
analytics Analytics
format Banner
}
// New creates a new Static output with the provided dependencies.
//
// The base url is left empty; it is set per instance by Copy.
func New(
	logger *zap.Logger,
	format *formats.Banner,
	sessions *sessions.Sessions,
	analytics *analytics.Analytics,
) *Static {
	return &Static{
		// the logger must be stored, otherwise the error reporting
		// in Do would dereference a nil logger
		logger:    logger,
		format:    format,
		sessions:  sessions,
		analytics: analytics,
	}
}

// Name returns the plugin identifier.
func (w *Static) Name() string {
	return "outputs.static"
}
// Copy returns a new Static output that shares all dependencies with w
// and uses the base url from the "base" cfg entry.
//
// The base url is left empty if the entry is missing or is not a string.
func (w *Static) Copy(cfg map[string]any) plugins.Output {
	// checked assertion - a missing or non-string value must not panic
	// (reading from a nil map is safe)
	base, _ := cfg[baseUrlKey].(string)

	return &Static{
		base:      base,
		logger:    w.logger,
		format:    w.format,
		sessions:  w.sessions,
		analytics: w.analytics,
	}
}
// Do renders the first winner banner as an image for the "img" action.
//
// It responds with 404 if the action stored in the state is not "img"
// (including when it is missing) or if there are no winners.
// Otherwise it starts a session holding the banner id, logs the impression
// (a logging failure is only reported to the logger) and writes the banner.
func (w *Static) Do(ctx context.Context, state *plugins.State) error {
	// checked assertion - a missing or non-string action is treated as unknown
	action, _ := state.Value(actionKey).(string)

	// only the img action is handled here
	if action != actionImg {
		state.Response.WriteHeader(http.StatusNotFound)
		return nil
	}

	if len(state.Winners) == 0 {
		state.Response.WriteHeader(http.StatusNotFound)
		return nil
	}

	banner := state.Winners[0]

	if err := w.sessions.Start(state.Request, fmt.Sprintf("%d", banner.ID)); err != nil {
		return fmt.Errorf("error on start session: %w", err)
	}

	if err := w.analytics.LogImpression(ctx, ads.TrackerInfo{}); err != nil {
		w.logger.Error("error on log impression", zap.Error(err))
	}

	return w.format.Banner(ctx, w.base, banner, state.Response)
}
package formats
import (
"context"
"encoding/base64"
"encoding/json"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// TypeNative is the identifier of the native format; it has the same value
// that Native.Name returns.
const TypeNative = "native"
// Native is the "native" format. It renders winners as a list of
// NativeResponse values.
type Native struct {
// base is the prefix prepended to generated tracker URLs; Copy sets it from
// the "base" configuration entry.
base string
}
// NewNative returns a Native format with an empty base.
func NewNative() *Native {
	return new(Native)
}
// Name reports the identifier of the native format. The receiver is named f to
// match the other Native methods, and the value comes from TypeNative so the
// identifier is defined in one place.
func (f *Native) Name() string {
	return TypeNative
}
// NativeResponse is one rendered item produced by Native.Render, one per
// winning banner.
type NativeResponse struct {
Description string `json:"description"`
// NOTE(review): Title is serialised under the key "information", not
// "title" — confirm this is what API consumers expect.
Title string `json:"information"`
// Image is filled from the banner's "250x250" media.
Image string `json:"image"`
Target string `json:"target"`
// Impressions holds the generated impression tracker URL followed by the
// banner's own impression tracker, when it has one.
Impressions []string `json:"impressions"`
// Clicks holds the generated click tracker URL followed by the banner's own
// click tracker, when it has one.
Clicks []string `json:"clicks"`
Data string `json:"data,omitempty"`
Network string `json:"network"`
}
// Copy returns a fresh Native whose base is the string stored under "base" in
// cfg. When the entry is absent or not a string the base stays empty.
func (f *Native) Copy(cfg map[string]any) plugins.Format {
	clone := new(Native)
	if value, isString := cfg["base"].(string); isString {
		clone.base = value
	}
	return clone
}
// Render converts every winner in state into a NativeResponse and returns the
// resulting slice. Each item carries a generated click tracker and impression
// tracker, each followed by the banner's own tracker when it defines one. The
// first tracker that cannot be built aborts rendering with that error.
func (f *Native) Render(ctx context.Context, state *plugins.State) (any, error) {
	// Start from an empty, non-nil slice so that "no winners" is still a list.
	responses := []NativeResponse{}

	for _, winner := range state.Winners {
		clickURL, err := f.tracker(winner, state, ads.ActionClick)
		if err != nil {
			return nil, err
		}
		clicks := []string{clickURL}
		if extra := winner.Clicktracker; extra != "" {
			clicks = append(clicks, extra)
		}

		impressionURL, err := f.tracker(winner, state, ads.ActionImpression)
		if err != nil {
			return nil, err
		}
		impressions := []string{impressionURL}
		if extra := winner.Imptracker; extra != "" {
			impressions = append(impressions, extra)
		}

		item := NativeResponse{
			Title:       winner.Title,
			Description: winner.Description,
			Target:      winner.Target,
			Image:       winner.Media("250x250"),
			Data:        winner.Data,
			Network:     winner.Network,
			Impressions: impressions,
			Clicks:      clicks,
		}
		responses = append(responses, item)
	}

	return responses, nil
}
// tracker builds a tracker URL for the given banner and action. The banner
// identifiers, the action and the request's click and request IDs are
// marshalled to JSON, encoded with URL-safe base64 and embedded as
// "<base>/tracker/<payload>.gif".
func (f *Native) tracker(w ads.Banner, state *plugins.State, action string) (string, error) {
	payload, err := json.Marshal(ads.TrackerInfo{
		Action:       action,
		BannerID:     w.ID,
		GroupID:      w.GroupID,
		CampaignID:   w.CampaignID,
		AdvertiserID: w.AdvertiserID,
		ClickID:      state.ClickID,
		RequestID:    state.RequestID,
	})
	if err != nil {
		return "", err
	}

	encoded := base64.URLEncoding.EncodeToString(payload)
	return f.base + "/tracker/" + encoded + ".gif", nil
}
package web
import (
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/plugins"
"go.ads.coffee/platform/server/plugins/outputs/web/formats"
)
// Module is the fx module named "outputs.web". It provides:
//   - New, exposed as a plugins.Output in the "outputs" value group, with its
//     first parameter fed from the "outputs.web.formats" value group;
//   - formats.NewNative;
//   - an adapter that contributes the *formats.Native as a plugins.Format to
//     the "outputs.web.formats" value group.
var Module = fx.Module(
"outputs.web",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Output)),
fx.ResultTags(`group:"outputs"`),
fx.ParamTags(`group:"outputs.web.formats"`),
),
formats.NewNative,
fx.Annotate(
func(n *formats.Native) plugins.Format {
return n
},
fx.As(new(plugins.Format)),
fx.ResultTags(`group:"outputs.web.formats"`),
),
),
)
package web
import (
"context"
"encoding/json"
"fmt"
"go.uber.org/zap"
"go.ads.coffee/platform/server/internal/analytics"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Analytics is the part of the analytics dependency that Web uses:
// LogResponse is called from Do for the first winner after the response has
// been written.
type Analytics interface {
LogResponse(ctx context.Context, w ads.Banner, state *plugins.State) error
}
// Web is the "outputs.web" output plugin.
type Web struct {
// format is the name of the format Do renders with; Copy sets it.
format string
// formats maps a format's Name() to the format itself.
formats map[string]plugins.Format
// analytics records the served response.
analytics Analytics
// logger reports response-logging failures in Do.
logger *zap.Logger
}
// New builds the prototype Web output. The supplied formats are indexed by
// their Name(); when two formats share a name the later one wins. No format is
// selected yet — Copy does that.
func New(ff []plugins.Format, logger *zap.Logger, analytics *analytics.Analytics) *Web {
	byName := make(map[string]plugins.Format, len(ff))
	for i := range ff {
		byName[ff[i].Name()] = ff[i]
	}

	out := &Web{}
	out.analytics = analytics
	out.formats = byName
	out.logger = logger
	return out
}
// Name reports the identifier of the web output plugin.
func (w *Web) Name() string {
	const name = "outputs.web"
	return name
}
// Copy returns a new Web configured from cfg. It shares the logger and
// analytics with the receiver and holds its own copies of every registered
// format, each created with the same cfg.
//
// The format is taken from cfg["format"]. When cfg is nil, or the entry is
// missing, empty or not a string, the default "native" format is used. The
// checked assertion matters because cfg is also handed to the formats and may
// legitimately carry only their keys.
func (w *Web) Copy(cfg map[string]any) plugins.Output {
	format := "native" // default format
	if name, ok := cfg["format"].(string); ok && name != "" {
		format = name
	}

	dest := make(map[string]plugins.Format, len(w.formats))
	for _, f := range w.formats {
		dest[f.Name()] = f.Copy(cfg)
	}

	return &Web{
		analytics: w.analytics,
		format:    format,
		formats:   dest,
		logger:    w.logger,
	}
}
// Do renders state with the configured format, marshals the result to JSON and
// writes it to state.Response.
//
// After the write, when there is at least one winner, the first one is
// reported through analytics.LogResponse; a failure there is only logged.
// Do returns an error when the configured format is unknown, when rendering
// or marshalling fails, or when the response cannot be written.
//
//nolint:errcheck
func (w *Web) Do(ctx context.Context, state *plugins.State) error {
	format, ok := w.formats[w.format]
	if !ok {
		return fmt.Errorf("format %s not found", w.format)
	}

	result, err := format.Render(ctx, state)
	if err != nil {
		return fmt.Errorf("error on render format: %w", err)
	}

	data, err := json.Marshal(result)
	if err != nil {
		// Distinct message so a marshalling failure is not mistaken for a
		// rendering failure.
		return fmt.Errorf("error on marshal response: %w", err)
	}

	_, writeErr := state.Response.Write(data)

	// The response is logged regardless of the write outcome, as before.
	if len(state.Winners) > 0 {
		if err := w.analytics.LogResponse(ctx, state.Winners[0], state); err != nil {
			w.logger.Error("error on log response", zap.Error(err))
		}
	}

	if writeErr != nil {
		return fmt.Errorf("error on write response: %w", writeErr)
	}
	return nil
}
package banners
import (
"context"
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/domain/plugins"
"go.ads.coffee/platform/server/internal/repos/banners"
)
// Module is the fx module named "stages.banners". It provides New, exposed as
// a plugins.Stage in the "stages" value group.
var Module = fx.Module(
"stages.banners",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Stage)),
fx.ResultTags(`group:"stages"`),
),
),
)
// BannersCache is the banner source used by the Banners stage: All returns
// the banners that Do installs as the candidate list.
type BannersCache interface {
All(ctx context.Context) []ads.Banner
}
// Banners is the "stages.banners" stage; it fills the candidate list from the
// cache.
type Banners struct {
// cache supplies the banners loaded into state.Candidates.
cache BannersCache
}
// New builds the Banners stage on top of the given banner cache.
func New(cache *banners.Cache) *Banners {
	stage := new(Banners)
	stage.cache = cache
	return stage
}
// Name reports the identifier of the banners stage.
func (b *Banners) Name() string {
	const name = "stages.banners"
	return name
}
// Copy returns a new Banners stage that shares the receiver's cache. The
// configuration is not used.
func (b *Banners) Copy(cfg map[string]any) plugins.Stage {
	dup := new(Banners)
	dup.cache = b.cache
	return dup
}
// Do replaces state.Candidates with every banner returned by the cache. It
// never fails.
func (b *Banners) Do(ctx context.Context, state *plugins.State) error {
	all := b.cache.All(ctx)
	state.Candidates = all
	return nil
}
package limits
import (
"context"
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module is the fx module named "stages.limits". It provides New, exposed as
// a plugins.Stage in the "stages" value group.
var Module = fx.Module(
"stages.limits",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Stage)),
fx.ResultTags(`group:"stages"`),
),
),
)
// Limits is the "stages.limits" stage. It carries no state.
type Limits struct{}
// New returns a Limits stage.
func New() *Limits {
	return new(Limits)
}
// Name reports the identifier of the limits stage.
func (l *Limits) Name() string {
	const name = "stages.limits"
	return name
}
// Copy returns a fresh Limits stage; the configuration is not used.
func (l *Limits) Copy(cfg map[string]any) plugins.Stage {
	return new(Limits)
}
// Do is where impression limits, frequency caps and similar restrictions are
// meant to be applied. At present it re-slices the candidate list without
// removing anything and always returns nil.
func (l *Limits) Do(ctx context.Context, state *plugins.State) error {
	// Impression limits, capping and so on are triggered here.
	candidates := state.Candidates
	state.Candidates = candidates[:]
	return nil
}
package mediation
import (
"context"
"fmt"
"github.com/mroth/weightedrand/v2"
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module is the fx module named "stages.mediation". It provides New, exposed
// as a plugins.Stage in the "stages" value group.
var Module = fx.Module(
"stages.mediation",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Stage)),
fx.ResultTags(`group:"stages"`),
),
),
)
// Mediation is the "stages.mediation" stage. It carries no state.
type Mediation struct{}
// New returns a Mediation stage.
func New() *Mediation {
	return new(Mediation)
}
// Name reports the identifier of the mediation stage.
func (t *Mediation) Name() string {
	const name = "stages.mediation"
	return name
}
// Copy returns a fresh Mediation stage; the configuration is not used.
func (t *Mediation) Copy(cfg map[string]any) plugins.Stage {
	return new(Mediation)
}
// Do lets the mediation units in state.Units compete with the winners already
// present in state.Winners. Every unit is turned into a banner of type
// ads.CreativeTypeMediator, and one banner is then picked at random, weighted
// by price.
//
// With no contenders at all state.Winners is left untouched; with exactly one
// it becomes the winner without a draw. An error is returned only when the
// weighted chooser cannot be built.
func (t *Mediation) Do(ctx context.Context, state *plugins.State) error {
	// Build the contender list in a fresh slice so appending never writes
	// into the backing array of state.Winners.
	winners := make([]ads.Banner, 0, len(state.Winners)+len(state.Units))
	winners = append(winners, state.Winners...)

	for _, u := range state.Units {
		winners = append(winners, ads.Banner{
			ID:      u.ID,
			Title:   u.Name, // TODO: title or name?
			Price:   u.Price,
			Type:    ads.CreativeTypeMediator,
			Data:    u.Data,
			Network: u.Network.Name,
		})
	}

	switch len(winners) {
	case 0:
		// Nothing to choose from. Promoting the zero-valued banner that
		// rotate returns in this case would fabricate a winner.
		return nil
	case 1:
		state.Winners = winners
		return nil
	}

	winner, ok, err := t.rotate(winners)
	if err != nil {
		return fmt.Errorf("error on rotate: %w", err)
	}
	if !ok {
		// rotate reports ok=false only for an empty list; never store the
		// placeholder banner it returns alongside.
		return nil
	}

	state.Winners = []ads.Banner{winner}
	return nil
}
// rotate picks one banner at random, using each banner's Price as its weight.
// The boolean is false, with a zero banner and a nil error, when candidates is
// empty. An error is returned when the weighted chooser cannot be created.
func (t *Mediation) rotate(candidates []ads.Banner) (ads.Banner, bool, error) {
	var none ads.Banner
	if len(candidates) == 0 {
		return none, false, nil
	}

	weighted := make([]weightedrand.Choice[ads.Banner, int], 0, len(candidates))
	for i := range candidates {
		weighted = append(weighted, weightedrand.NewChoice(candidates[i], candidates[i].Price))
	}

	picker, err := weightedrand.NewChooser(weighted...)
	if err != nil {
		return none, false, fmt.Errorf("error on chooser: %w", err)
	}
	return picker.Pick(), true, nil
}
package rotation
import (
"context"
"fmt"
"github.com/mroth/weightedrand/v2"
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/ads"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module is the fx module named "stages.rotation". It provides New, exposed
// as a plugins.Stage in the "stages" value group.
var Module = fx.Module(
"stages.rotation",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Stage)),
fx.ResultTags(`group:"stages"`),
),
),
)
// Rotattion is the "stages.rotation" stage. It carries no state.
// NOTE(review): the type name is misspelled; it is exported, so renaming it
// would change the package API.
type Rotattion struct{}
// New returns a rotation stage.
func New() *Rotattion {
	return new(Rotattion)
}
// Name reports the identifier of the rotation stage.
func (r *Rotattion) Name() string {
	const name = "stages.rotation"
	return name
}
// Copy returns a fresh rotation stage; the configuration is not used.
func (r *Rotattion) Copy(cfg map[string]any) plugins.Stage {
	return new(Rotattion)
}
// Do picks a single winner among state.Candidates, weighted by price, and
// stores it as the only element of state.Winners. When there are no
// candidates state.Winners is left untouched. An error from the draw is
// returned as is.
func (r *Rotattion) Do(ctx context.Context, state *plugins.State) error {
	winner, picked, err := r.rotate(state.Candidates)
	switch {
	case err != nil:
		return err
	case picked:
		state.Winners = []ads.Banner{winner}
	}
	return nil
}
// rotate draws one banner at random from candidates, weighting each banner by
// its Price. It returns false with a zero banner and no error for an empty
// list, and an error when the weighted chooser cannot be constructed.
func (r *Rotattion) rotate(candidates []ads.Banner) (ads.Banner, bool, error) {
	if len(candidates) == 0 {
		return ads.Banner{}, false, nil
	}

	var pool []weightedrand.Choice[ads.Banner, int]
	for _, banner := range candidates {
		option := weightedrand.NewChoice(banner, banner.Price)
		pool = append(pool, option)
	}

	chooser, err := weightedrand.NewChooser(pool...)
	if err == nil {
		return chooser.Pick(), true, nil
	}
	return ads.Banner{}, false, fmt.Errorf("error on chooser: %w", err)
}
package targeting
import (
"context"
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module is the fx module named "stages.targeting". It provides New, exposed
// as a plugins.Stage in the "stages" value group.
var Module = fx.Module(
"stages.targeting",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Stage)),
fx.ResultTags(`group:"stages"`),
),
),
)
// Targeting is the "stages.targeting" stage. It carries no state.
type Targeting struct{}
// New returns a Targeting stage.
func New() *Targeting {
	return new(Targeting)
}
// Name reports the identifier of the targeting stage.
func (t *Targeting) Name() string {
	const name = "stages.targeting"
	return name
}
// Copy returns a fresh Targeting stage; the configuration is not used.
func (t *Targeting) Copy(cfg map[string]any) plugins.Stage {
	return new(Targeting)
}
// Targetings is the hook for handing targeting plugins to this stage. It is
// currently a no-op: the supplied targetings are not stored.
func (t *Targeting) Targetings(tt []plugins.Targeting) {
// set targetings
}
// Do is where targetings are meant to be evaluated against the candidates. At
// present it re-slices the candidate list without removing anything and
// always returns nil.
func (t *Targeting) Do(ctx context.Context, state *plugins.State) error {
	// Targetings are processed here.
	candidates := state.Candidates
	state.Candidates = candidates[:]
	return nil
}
package apps
import (
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module is the fx module named "targetings.apps". It provides New, exposed
// as a plugins.Targeting in the "targetings" value group.
var Module = fx.Module(
"targetings.apps",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Targeting)),
fx.ResultTags(`group:"targetings"`),
),
),
)
// Apps is the "targetings.apps" targeting plugin. It carries no state.
type Apps struct{}
// New returns an Apps targeting.
func New() *Apps {
	return new(Apps)
}
// Name reports the identifier of the apps targeting.
func (a *Apps) Name() string {
	const name = "targetings.apps"
	return name
}
// Copy returns a fresh Apps targeting; the configuration is not used.
func (a *Apps) Copy(cfg map[string]any) plugins.Targeting {
	return new(Apps)
}
// Filter is currently a no-op placeholder.
func (a *Apps) Filter() {}
package geo
import (
"go.uber.org/fx"
"go.ads.coffee/platform/server/internal/domain/plugins"
)
// Module is the fx module named "targetings.geo". It provides New, exposed as
// a plugins.Targeting in the "targetings" value group.
var Module = fx.Module(
"targetings.geo",
fx.Provide(
fx.Annotate(
New,
fx.As(new(plugins.Targeting)),
fx.ResultTags(`group:"targetings"`),
),
),
)
// Geo is the "targetings.geo" targeting plugin. It carries no state.
type Geo struct {
}
// New returns a Geo targeting.
func New() *Geo {
	return new(Geo)
}
// Name reports the identifier of the geo targeting.
func (g *Geo) Name() string {
	const name = "targetings.geo"
	return name
}
// Copy returns a fresh Geo targeting; the configuration is not used.
func (g *Geo) Copy(cfg map[string]any) plugins.Targeting {
	return new(Geo)
}
// Filter is currently a no-op placeholder.
func (g *Geo) Filter() {
}