package main
import (
"context"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/dvach"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/reddit"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/redditsave"
"github.com/jfk9w/hikkabot/v4/internal/core"
"github.com/jfk9w/hikkabot/v4/internal/ext/converters"
"github.com/jfk9w/hikkabot/v4/internal/ext/resolvers"
"github.com/jfk9w/hikkabot/v4/internal/ext/vendors"
"github.com/jfk9w-go/aconvert-api"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/gormf"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/telegram-bot-api/ext/tapp"
"gorm.io/driver/postgres"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
// C is the root configuration of the application. Every field corresponds to
// a YAML section named by its `yaml` tag; the `doc` and `default` tags carry
// the human-readable description and the default value of the section.
type C struct {
// Telegram combines the bot settings with the command interface settings
// under a single "telegram" section.
Telegram struct {
tapp.Config `yaml:",inline"`
core.InterfaceConfig `yaml:",inline"`
} `yaml:"telegram" doc:"Bot-related settings."`
Db apfel.GormConfig `yaml:"db,omitempty" doc:"Poller database connection settings. Supported drivers: postgres, sqlite (not fully)" default:"{\"driver\":\"sqlite\",\"dsn\":\"file::memory:?cache=shared\"}"`
Poller core.PollerConfig `yaml:"poller,omitempty" doc:"Poller-related settings."`
// Media combines blob storage and mediator settings under "media".
Media struct {
core.BlobConfig `yaml:",inline"`
core.MediatorConfig `yaml:",inline"`
} `yaml:"media,omitempty" doc:"Media downloader settings."`
FFmpeg struct {
Enabled bool `yaml:"enabled,omitempty" doc:"Whether ffmpeg-based media converter should be enabled. Requires ffmpeg to be present in $PATH." default:"true"`
} `yaml:"ffmpeg,omitempty" doc:"FFmpeg-related settings."`
Aconvert struct {
Enabled bool `yaml:"enabled,omitempty" doc:"Whether aconvert.com-based media converter should be enabled."`
aconvert.Config `yaml:",inline"`
} `yaml:"aconvert,omitempty" doc:"aconvert.com-related settings."`
Dvach dvach.Config `yaml:"dvach,omitempty" doc:"2ch.hk-related settings."`
// Reddit groups the reddit client credentials together with the settings
// of the reddit-based vendors and the redditsave client.
Reddit struct {
Enabled bool `yaml:"enabled,omitempty" doc:"Whether reddit.com-based vendors should be enabled."`
reddit.Config `yaml:",inline"`
Redditsave redditsave.Config `yaml:"redditsave,omitempty" doc:"redditsave.com-related settings. Used to resolve v.redd.it videos with audio."`
Posts vendors.SubredditConfig `yaml:"posts,omitempty" doc:"Subreddit posts vendor settings."`
Suggestions vendors.SubredditSuggestionsConfig `yaml:"suggestions,omitempty" doc:"Subreddit suggestions vendor settings."`
} `yaml:"reddit,omitempty" doc:"reddit.com-related settings."`
Logging apfel.LogfConfig `yaml:"logging,omitempty" doc:"Logging settings."`
Prometheus apfel.PrometheusConfig `yaml:"prometheus,omitempty" doc:"Prometheus settings."`
}
// LogfConfig returns the logging section.
func (c C) LogfConfig() apfel.LogfConfig {
	return c.Logging
}

// PrometheusConfig returns the prometheus section.
func (c C) PrometheusConfig() apfel.PrometheusConfig {
	return c.Prometheus
}

// TelegramConfig returns the bot settings embedded in the telegram section.
func (c C) TelegramConfig() tapp.Config {
	return c.Telegram.Config
}

// InterfaceConfig returns the command interface settings embedded in the
// telegram section.
func (c C) InterfaceConfig() core.InterfaceConfig {
	return c.Telegram.InterfaceConfig
}

// PollerConfig returns the poller section.
func (c C) PollerConfig() core.PollerConfig {
	return c.Poller
}

// StorageConfig returns the database section.
func (c C) StorageConfig() apfel.GormConfig {
	return c.Db
}

// BlobConfig returns the blob settings embedded in the media section.
func (c C) BlobConfig() core.BlobConfig {
	return c.Media.BlobConfig
}

// RedditsaveConfig returns the redditsave settings nested in the reddit section.
func (c C) RedditsaveConfig() redditsave.Config {
	return c.Reddit.Redditsave
}

// AconvertConfig returns the aconvert client settings embedded in the
// aconvert section.
func (c C) AconvertConfig() aconvert.Config {
	return c.Aconvert.Config
}

// MediatorConfig returns the mediator settings embedded in the media section.
func (c C) MediatorConfig() core.MediatorConfig {
	return c.Media.MediatorConfig
}

// DvachConfig returns the dvach section.
func (c C) DvachConfig() dvach.Config {
	return c.Dvach
}

// RedditConfig returns the reddit client settings embedded in the reddit section.
func (c C) RedditConfig() reddit.Config {
	return c.Reddit.Config
}

// SubredditConfig returns the subreddit posts vendor settings.
func (c C) SubredditConfig() vendors.SubredditConfig {
	return c.Reddit.Posts
}

// SubredditSuggestionsConfig returns the subreddit suggestions vendor settings.
func (c C) SubredditSuggestionsConfig() vendors.SubredditSuggestionsConfig {
	return c.Reddit.Suggestions
}
// GitCommit is reported as the application version (see main); it defaults
// to "dev".
// NOTE(review): presumably overridden at build time via -ldflags -X — confirm
// against the build scripts.
var GitCommit = "dev"
// main boots the application, registers the always-on mixins, adds the
// optional aconvert and reddit mixins depending on configuration, restores
// active subscriptions and then runs the telegram bot.
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
app := apfel.Boot[C]{
Name: "hikkabot",
Version: GitCommit,
}.App(ctx)
defer flu.CloseQuietly(app)
var (
// The local name shadows the gorm package only after this spec, so
// gorm.Config below still refers to the package.
gorm = &apfel.Gorm[C]{
Drivers: map[string]apfel.GormDriver{
"postgres": postgres.Open,
"sqlite": sqlite.Open,
},
Config: gorm.Config{
Logger: gormf.LogfLogger(app, "gorm.sql"),
},
}
poller core.Poller[C]
telegram tapp.Mixin[C]
)
// Mixins that are registered unconditionally.
// NOTE(review): converters.FFmpeg is registered without consulting
// config.FFmpeg.Enabled here — presumably the mixin checks it itself; confirm.
app.Uses(ctx,
new(apfel.Logf[C]),
new(apfel.Prometheus[C]),
&telegram,
gorm,
&poller,
new(core.Interface[C]),
&resolvers.GfycatLike[C]{Name: "gfycat"},
&resolvers.GfycatLike[C]{Name: "redgifs"},
new(resolvers.Imgur[C]),
new(converters.FFmpeg[C]),
new(resolvers.Dvach[C]),
vendors.DvachCatalog[C](),
vendors.DvachThread[C](),
)
config := app.Config()
// Optional mixins, enabled by configuration flags.
if config.Aconvert.Enabled {
app.Uses(ctx, new(converters.Aconvert[C]))
}
if config.Reddit.Enabled {
app.Uses(ctx,
new(resolvers.Reddit[C]),
vendors.Subreddit[C](),
vendors.SubredditSuggestions[C](),
)
}
if err := poller.RestoreActive(ctx); err != nil {
logf.Panicf(ctx, "restore active: %+v", err)
}
telegram.Run(ctx)
}
package dvach
import (
"context"
"fmt"
"net/http"
"net/http/cookiejar"
"net/url"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/httpf"
"github.com/jfk9w-go/flu/logf"
"github.com/pkg/errors"
)
// Config contains the dvach client settings.
type Config struct {
// Usercode is the value installed as the "usercode_auth" cookie by Standalone.
Usercode string `yaml:"usercode,omitempty" doc:"Auth cookie set for 2ch.hk / and /makaba paths. You can get it from your browser. Required to access hidden boards."`
}
// Context is the application configuration contract required by Client:
// it must be able to provide the dvach Config.
type Context interface {
DvachConfig() Config
}
// Client is the dvach API client. The underlying HTTP client is set up by
// Include or Standalone.
type Client[C Context] struct {
client httpf.Client
}
// String returns the fixed name of this client ("dvach.client").
func (Client[C]) String() string {
	const name = "dvach.client"
	return name
}
// Include initializes the client from the dvach section of the application
// configuration by delegating to Standalone.
func (c *Client[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	return c.Standalone(ctx, app.Config().DvachConfig())
}
// Standalone initializes the client from an explicit Config.
//
// A fresh cookie jar is created; when config.Usercode is set, the auth
// cookies are installed for both the "/" and "/makaba" paths of Domain.
// Otherwise a warning is logged and the client works without auth cookies.
// An error is returned only if the cookie jar cannot be created.
func (c *Client[C]) Standalone(ctx context.Context, config Config) error {
	jar, err := cookiejar.New(nil)
	if err != nil {
		return errors.Wrap(err, "create cookie jar")
	}

	if config.Usercode != "" {
		cookieURL := &url.URL{Scheme: "https", Host: Domain}
		jar.SetCookies(cookieURL, cookies(config.Usercode, "/"))
		jar.SetCookies(cookieURL, cookies(config.Usercode, "/makaba"))
	} else {
		// The dash is written as an escape so that the message survives
		// source re-encoding (it was previously garbled into mojibake).
		logf.Get(c).Warnf(ctx, "dvach usercode is empty \u2014 hidden boards will be unavailable")
	}

	c.client = &http.Client{Jar: jar}
	return nil
}
// Do executes req with the underlying HTTP client and logs the outcome
// (trace level on success, warn level on error) before returning it.
func (c *Client[C]) Do(req *http.Request) (*http.Response, error) {
	response, doErr := c.client.Do(req)
	logger := logf.Get(c)
	logger.Resultf(req.Context(), logf.Trace, logf.Warn, "%s => %v", &httpf.RequestBuilder{Request: req}, doErr)
	return response, doErr
}
// GetCatalog fetches and decodes "/<board>/catalog_num.json" and initializes
// the threads it contains. When the request or decoding fails, a nil catalog
// is returned; when initialization fails, the catalog is returned together
// with the error.
func (c *Client[C]) GetCatalog(ctx context.Context, board string) (*Catalog, error) {
	endpoint := Host + "/" + board + "/catalog_num.json"
	result := new(Catalog)
	err := httpf.GET(endpoint).
		Exchange(ctx, c).
		DecodeBody(flu.JSON(result)).
		Error()
	if err != nil {
		return nil, err
	}

	return result, result.init(board)
}
// GetThread requests the posts of thread num on board starting from offset
// via the "/api/mobile/v2/after" endpoint. A non-positive offset is replaced
// with num. An error object in the response body is returned as the error.
func (c *Client[C]) GetThread(ctx context.Context, board string, num int, offset int) ([]Post, error) {
	if offset < 1 {
		offset = num
	}

	var payload struct {
		Posts Posts  `json:"posts,omitempty"`
		Error *Error `json:"error,omitempty"`
	}

	endpoint := fmt.Sprintf("%s/api/mobile/v2/after/%s/%d/%d", Host, board, num, offset)
	err := httpf.GET(endpoint).
		Exchange(ctx, c).
		DecodeBody(flu.JSON(&payload)).
		Error()
	if err != nil {
		return nil, err
	}

	if apiErr := payload.Error; apiErr != nil {
		return nil, *apiErr
	}

	return payload.Posts, payload.Posts.init(board)
}
// GetPost returns the post with number num on board. It is implemented as a
// GetThread call with the offset equal to num, taking the first returned post.
//
// ErrNotFound is returned when the API responds with no posts (previously
// this case caused an index-out-of-range panic).
func (c *Client[C]) GetPost(ctx context.Context, board string, num int) (*Post, error) {
	posts, err := c.GetThread(ctx, board, num, num)
	if err != nil {
		return nil, err
	}

	if len(posts) == 0 {
		return nil, ErrNotFound
	}

	return &posts[0], nil
}
// GetBoards requests the board list ("task=get_boards") and flattens the
// returned map of board groups into a single slice. The order of the result
// follows map iteration order and is therefore not stable.
func (c *Client[C]) GetBoards(ctx context.Context) ([]Board, error) {
	var grouped map[string][]Board
	err := httpf.GET(Host+"/makaba/mobile.fcgi").
		Query("task", "get_boards").
		Exchange(ctx, c).
		DecodeBody(flu.JSON(&grouped)).
		Error()
	if err != nil {
		return nil, err
	}

	var all []Board
	for _, group := range grouped {
		all = append(all, group...)
	}

	return all, nil
}
// GetBoard looks up a board by its ID among the boards returned by GetBoards.
// ErrNotFound is returned when there is no such board.
func (c *Client[C]) GetBoard(ctx context.Context, id string) (*Board, error) {
	all, err := c.GetBoards(ctx)
	if err != nil {
		return nil, err
	}

	for idx := range all {
		if all[idx].ID != id {
			continue
		}

		// Return a copy so the result does not alias the fetched slice.
		found := all[idx]
		return &found, nil
	}

	return nil, ErrNotFound
}
package dvach
import (
"context"
"net/http"
)
// ThreadDoesNotExistErrorCode is an API error code.
// NOTE(review): judging by the name it is the Error.Code reported for a
// missing thread — confirm against the API.
const ThreadDoesNotExistErrorCode = -3
// Interface lists the dvach API operations: catalog, thread, post and board
// retrieval.
type Interface interface {
GetCatalog(ctx context.Context, board string) (*Catalog, error)
GetThread(ctx context.Context, board string, num int, offset int) ([]Post, error)
GetPost(ctx context.Context, board string, num int) (*Post, error)
GetBoards(ctx context.Context) ([]Board, error)
GetBoard(ctx context.Context, id string) (*Board, error)
}
// cookies builds the two cookies installed for the given path of Domain:
// "usercode_auth" carrying usercode and "ageallow" set to "1".
func cookies(usercode string, path string) []*http.Cookie {
	auth := &http.Cookie{Name: "usercode_auth", Value: usercode, Domain: Domain, Path: path}
	ageAllow := &http.Cookie{Name: "ageallow", Value: "1", Domain: Domain, Path: path}
	return []*http.Cookie{auth, ageAllow}
}
package dvach
import (
"github.com/pkg/errors"
)
const (
// Domain is the site domain, also used as the cookie domain.
Domain = "2ch.hk"
// Host is the HTTPS base URL built from Domain.
Host = "https://" + Domain
)
var (
// ErrNotFound is returned when a requested entity is not found.
ErrNotFound = errors.New("not found")
)
// FileType is the numeric attachment type found in the "type" field of a file.
type FileType int

// Known attachment types.
const (
	JPEG FileType = 1
	PNG  FileType = 2
	GIF  FileType = 4
	WebM FileType = 6
	MP4  FileType = 10
)

// Type2MIMEType maps every known FileType to its MIME type.
var Type2MIMEType = map[FileType]string{
	JPEG: "image/jpeg",
	PNG:  "image/png",
	GIF:  "image/gif",
	WebM: "video/webm",
	MP4:  "video/mp4",
}

// MIMEType returns the MIME type registered for ft in Type2MIMEType,
// or an empty string when ft is not registered.
func (ft FileType) MIMEType() string {
	if mimeType, known := Type2MIMEType[ft]; known {
		return mimeType
	}

	return ""
}
package dvach
import (
"fmt"
"sync"
"time"
)
// File describes a post attachment as decoded from JSON. The pointer fields
// are nil when the corresponding JSON field is absent or null.
type File struct {
// Path is appended to Host by URL.
Path string `json:"path"`
Type FileType `json:"type"`
Size int64 `json:"size"`
DurationSecs *int `json:"duration_secs"`
Width *int `json:"width"`
Height *int `json:"height"`
}
// URL returns the file location: Host followed by the file path.
func (f File) URL() string {
	location := Host + f.Path
	return location
}
// Post is a single post as decoded from JSON, plus the fields filled in by init.
type Post struct {
Num int `json:"num"`
// Parent is replaced with Num by init when it is zero.
Parent int `json:"parent"`
// DateString is the raw date; init parses it into Date.
DateString string `json:"date"`
Subject string `json:"subject"`
Comment string `json:"comment"`
Files []File `json:"files"`
// OP-only fields
PostsCount *int `json:"posts_count"`
FilesCount *int `json:"files_count"`
// fields with custom initialization
Board string
Date time.Time
}
var (
// tz is the location used to parse post dates; it is loaded once
// (guarded by tzOnce) on the first Post.init call.
tz *time.Location
tzOnce sync.Once
)
// init fills in the fields that do not come directly from JSON: it sets
// Board, defaults a zero Parent to Num and parses DateString into Date using
// the "Europe/Moscow" location.
//
// The date is parsed from the first 8 runes of DateString plus everything
// from rune 12 onwards (the 4 runes in between are skipped) with the layout
// "02/01/06 15:04:05". An error is returned if DateString is too short for
// that or does not match the layout. The function panics if the time zone
// cannot be loaded.
func (p *Post) init(board string) (err error) {
	tzOnce.Do(func() {
		loc, err := time.LoadLocation("Europe/Moscow")
		if err != nil {
			panic(err)
		}

		tz = loc
	})

	p.Board = board
	if p.Parent == 0 {
		p.Parent = p.Num
	}

	datestr := []rune(p.DateString)
	// Guard the slicing below: a malformed (short) date must yield an error
	// rather than an index-out-of-range panic.
	if len(datestr) < 12 {
		return fmt.Errorf("invalid post date %q", p.DateString)
	}

	p.Date, err = time.ParseInLocation("02/01/06 15:04:05",
		string(datestr[:8])+string(datestr[12:]), tz)
	return err
}
// IsOriginal reports whether the post is its own parent, i.e. Num equals Parent.
func (p *Post) IsOriginal() bool {
	return p.Num == p.Parent
}
// URL returns the address of the post: the thread page for an original post,
// or the parent thread page with a "#<num>" fragment otherwise.
func (p *Post) URL() string {
	if !p.IsOriginal() {
		return fmt.Sprintf("%s/%s/res/%d.html#%d", Host, p.Board, p.Parent, p.Num)
	}

	return fmt.Sprintf("%s/%s/res/%d.html", Host, p.Board, p.Num)
}
// Posts is a list of posts that can be initialized in bulk.
type Posts []Post

// init initializes every post in place, stopping at the first error.
func (ps Posts) init(board string) error {
	for idx := range ps {
		if err := ps[idx].init(board); err != nil {
			return err
		}
	}

	return nil
}
// Catalog is a board catalog as decoded from JSON: the board name and its threads.
type Catalog struct {
BoardName string `json:"BoardName"`
Threads []Post `json:"threads"`
}
// init initializes all threads of the catalog for the given board.
func (c *Catalog) init(boardID string) (err error) {
	threads := Posts(c.Threads)
	err = threads.init(boardID)
	return err
}
// Board is a board descriptor as decoded from JSON.
type Board struct {
ID string `json:"id"`
Name string `json:"name"`
}
// Error is an error object decoded from an API response.
type Error struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// Error formats the error as "<code> <message>".
func (e Error) Error() string {
	return fmt.Sprint(e.Code, " ", e.Message)
}
package reddit
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/httpf"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/syncf"
"github.com/pkg/errors"
)
var (
// Host is the base URL for authorized API requests.
Host = "https://oauth.reddit.com"
// AuthEndpoint is the URL used by authorize to obtain an access token.
AuthEndpoint = "https://www.reddit.com/api/v1/access_token"
)
// Config contains the reddit client credentials and request settings.
type Config struct {
// ClientID and ClientSecret are sent as basic auth when requesting a token.
ClientID string `yaml:"clientId" doc:"See https://github.com/reddit-archive/reddit/wiki/OAuth2-Quick-Start-Example"`
ClientSecret string `yaml:"clientSecret" doc:"See https://github.com/reddit-archive/reddit/wiki/OAuth2-Quick-Start-Example"`
// Username and Password are sent with the "password" grant type.
Username string `yaml:"username" doc:"See https://github.com/reddit-archive/reddit/wiki/OAuth2-Quick-Start-Example"`
Password string `yaml:"password" doc:"See https://github.com/reddit-archive/reddit/wiki/OAuth2-Quick-Start-Example"`
Owner string `yaml:"owner,omitempty" doc:"This value will be used in User-Agent header. If empty, username will be used."`
// MaxRetries bounds the retry loop in execute (MaxRetries+1 attempts).
MaxRetries int `yaml:"maxRetries,omitempty" doc:"Maximum request retries before giving up." default:"3"`
}
// Context is the application configuration contract required by Client:
// it must be able to provide the reddit Config.
type Context interface {
RedditConfig() Config
}
// Client is the application mixin wrapping the actual reddit client
// implementation, which is created by Include.
type Client[C Context] struct {
*client
}
// Include creates the underlying client from the reddit section of the
// application configuration. The User-Agent header is built from the
// application version and the configured owner, falling back to the username
// when the owner is empty. The token channel is seeded with an empty token so
// that the first request triggers authorization.
func (c *Client[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	config := app.Config().RedditConfig()

	owner := config.Username
	if config.Owner != "" {
		owner = config.Owner
	}

	userAgent := fmt.Sprintf(`hikkabot/%s by /u/%s`, app.Version(), owner)
	transport := withUserAgent(httpf.NewDefaultTransport(), userAgent)

	tokens := make(chan string, 1)
	tokens <- ""

	c.client = &client{
		client: &http.Client{Transport: transport},
		config: config,
		clock:  app,
		token:  tokens,
	}

	return nil
}
// client is the reddit API client implementation.
type client struct {
client httpf.Client
config Config
clock syncf.Clock
// token holds the current access token; execute takes it with getToken and
// puts it back with done.
token chan string
// expiresAt is set by authorize to one minute before the token expiry.
expiresAt time.Time
}
// String returns the fixed name of this client ("reddit.client").
func (*client) String() string {
	const name = "reddit.client"
	return name
}
// Do executes req with the underlying HTTP client and logs the outcome
// (trace level on success, warn level on error) before returning it.
func (c *client) Do(req *http.Request) (*http.Response, error) {
	response, doErr := c.client.Do(req)
	logger := logf.Get(c)
	logger.Resultf(req.Context(), logf.Trace, logf.Warn, "%s => %v", &httpf.RequestBuilder{Request: req}, doErr)
	return response, doErr
}
// getToken takes the token out of the token channel, waiting until it is
// available or ctx is done (in which case the context error is returned).
func (c *client) getToken(ctx context.Context) (string, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case current := <-c.token:
		return current, nil
	}
}
// done puts token back into the token channel, giving up with the context
// error if ctx is done.
func (c *client) done(ctx context.Context, token string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case c.token <- token:
		return nil
	}
}
// authorize requests a new access token from AuthEndpoint using the
// "password" grant type with the configured credentials. On success it
// records the expiration moment (one minute earlier than reported) and
// returns the token.
func (c *client) authorize(ctx context.Context) (string, error) {
	var grant struct {
		AccessToken string `json:"access_token"`
		ExpiresIn   int    `json:"expires_in"`
	}

	request := httpf.POST(AuthEndpoint, nil).
		Auth(httpf.Basic(c.config.ClientID, c.config.ClientSecret)).
		Query("grant_type", "password").
		Query("username", c.config.Username).
		Query("password", c.config.Password)

	err := request.
		Exchange(ctx, c).
		CheckStatus(http.StatusOK).
		DecodeBody(flu.JSON(&grant)).
		Error()
	if err != nil {
		return "", err
	}

	lifetime := time.Duration(grant.ExpiresIn) * time.Second
	c.expiresAt = c.clock.Now().Add(lifetime).Add(-time.Minute)
	return grant.AccessToken, nil
}
// Sentinel errors used by execute to decide whether a request is retried.
var (
errUnauthorized = errors.New("unauthorized")
errRateLimited = errors.New("rate-limited")
)
// execute performs req with bearer authorization and, when result is not nil,
// decodes the JSON response body into it.
//
// The access token is taken from the token channel for the duration of the
// call, which serializes requests. The token is (re)authorized when it is
// empty or expired. The request is attempted up to MaxRetries+1 times:
//   - 401 responses clear the token so that the next attempt re-authorizes;
//   - 429 responses sleep for the number of seconds given in the
//     X-Ratelimit-Reset header (or until ctx is done) and then retry.
//
// Any other error is returned immediately; if all attempts are used up, the
// last error is returned.
func (c *client) execute(ctx context.Context, req *httpf.RequestBuilder, result any) error {
	token, err := c.getToken(ctx)
	if err != nil {
		return err
	}

	// The token must always be handed back, even when ctx is already done:
	// with a cancelled ctx the select inside done may pick the ctx.Done()
	// branch and drop the token, blocking all subsequent calls. The channel
	// has a free slot while this call holds the token, so the send below
	// never blocks.
	defer func() { _ = c.done(context.Background(), token) }()

	var resp *httpf.ExchangeResult
	for i := 0; i < c.config.MaxRetries+1; i++ {
		if token == "" || c.expiresAt.Before(c.clock.Now()) {
			token, err = c.authorize(ctx)
			logf.Get(c).Resultf(ctx, logf.Debug, logf.Error, "refresh token: %v", err)
			if err != nil {
				return errors.Wrap(err, "authorize")
			}
		}

		resp = req.Auth(httpf.Bearer(token)).Exchange(ctx, c)
		resp.HandleFunc(func(httpResp *http.Response) error {
			switch httpResp.StatusCode {
			case http.StatusOK:
				return nil

			case http.StatusUnauthorized:
				// Force re-authorization on the next attempt.
				token = ""
				return errUnauthorized

			case http.StatusTooManyRequests:
				resetValue := httpResp.Header.Get("X-Ratelimit-Reset")
				reset, err := strconv.Atoi(resetValue)
				if err != nil {
					return errors.Wrapf(err, "parse reset header: %s", resetValue)
				}

				resetAfter := time.Duration(reset) * time.Second
				logf.Get(c).Warnf(ctx, "request overflow, sleeping for %s", resetAfter)
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(resetAfter):
					return errRateLimited
				}

			default:
				return nil
			}
		})

		if result != nil {
			resp.DecodeBody(flu.JSON(result))
		}

		err = resp.Error()
		switch err {
		case nil:
			return nil
		case errUnauthorized, errRateLimited:
			continue
		default:
			return err
		}
	}

	return err
}
// GetListing requests "/r/<subreddit>/<sort>" and returns the listed things.
// A non-positive limit is replaced with 25.
func (c *client) GetListing(ctx context.Context, subreddit, sort string, limit int) ([]Thing, error) {
	if limit < 1 {
		limit = 25
	}

	request := httpf.GET(Host+"/r/"+subreddit+"/"+sort).
		Query("limit", strconv.Itoa(limit))

	var listing Listing
	if err := c.execute(ctx, request, &listing); err != nil {
		return nil, errors.Wrap(err, "get listing")
	}

	return listing.Data.Children, nil
}
// GetPosts requests "/r/<subreddit>/api/info" for the given comma-joined ids
// and returns the listed things.
func (c *client) GetPosts(ctx context.Context, subreddit string, ids ...string) ([]Thing, error) {
	request := httpf.GET(Host+"/r/"+subreddit+"/api/info").
		Query("id", strings.Join(ids, ","))

	var listing Listing
	if err := c.execute(ctx, request, &listing); err != nil {
		return nil, errors.Wrap(err, "get posts")
	}

	return listing.Data.Children, nil
}
// Subscribe posts the given action for the comma-joined subreddits to
// "/api/subscribe". The response body is not decoded.
func (c *client) Subscribe(ctx context.Context, action SubscribeAction, subreddits []string) error {
	request := httpf.POST(Host+"/api/subscribe", nil).
		Query("action", string(action)).
		Query("skip_initial_defaults", "true").
		Query("sr_name", strings.Join(subreddits, ","))

	return c.execute(ctx, request, nil)
}
// withUserAgent wraps rt so that every request gets its User-Agent header
// set to userAgent before being passed on.
func withUserAgent(rt http.RoundTripper, userAgent string) httpf.RoundTripperFunc {
	return httpf.RoundTripperFunc(func(request *http.Request) (*http.Response, error) {
		request.Header.Set("User-Agent", userAgent)
		return rt.RoundTrip(request)
	})
}
package reddit
import (
"html"
"io"
"strconv"
"strings"
"time"
"github.com/jfk9w-go/flu"
"github.com/pkg/errors"
"gopkg.in/guregu/null.v3"
)
// Media is the media object of a thing; only the reddit video fallback URL
// is decoded.
type Media struct {
	RedditVideo struct {
		FallbackURL string `json:"fallback_url"`
	} `json:"reddit_video"`
}

// MediaContainer holds both media variants ("media" and "secure_media").
type MediaContainer struct {
	Media       Media `json:"media"`
	SecureMedia Media `json:"secure_media"`
}

// FallbackURL returns the fallback URL of the "media" video or, when that is
// empty, the one of the "secure_media" video.
func (mc MediaContainer) FallbackURL() string {
	if primary := mc.Media.RedditVideo.FallbackURL; primary != "" {
		return primary
	}

	return mc.SecureMedia.RedditVideo.FallbackURL
}
// ThingData is the payload of a listing entry. Fields tagged `json:"-"` are
// computed by Listing.DecodeFrom; fields tagged `gorm:"-"` are excluded from
// persistence.
type ThingData struct {
// ID is decoded from the "name" JSON field.
ID string `json:"name" gorm:"primaryKey"`
// NumID is the part of ID after the underscore parsed as a base-36 number.
NumID uint64 `json:"-" gorm:"not null"`
// CreatedAt is derived from CreatedSecs.
CreatedAt time.Time `json:"-" gorm:"not null"`
Title string `json:"title" gorm:"-"`
Subreddit string `json:"subreddit" gorm:"not null;index"`
Domain string `json:"domain" gorm:"not null"`
URL null.String `json:"url"`
Ups int `json:"ups" gorm:"not null"`
// SelfTextHTML is HTML-unescaped by Listing.DecodeFrom.
SelfTextHTML string `json:"selftext_html" gorm:"-"`
IsSelf bool `json:"is_self" gorm:"not null"`
CreatedSecs float32 `json:"created_utc" gorm:"-"`
MediaContainer `gorm:"-"`
CrosspostParentList []MediaContainer `json:"crosspost_parent_list" gorm:"-"`
Permalink string `json:"permalink" gorm:"-"`
Author string `json:"author" gorm:"not null"`
}
// PermalinkURL returns the permalink prefixed with "https://reddit.com".
func (d ThingData) PermalinkURL() string {
	const base = "https://reddit.com"
	return base + d.Permalink
}
// Thing is a listing entry; its data is embedded into the same table row.
type Thing struct {
Data ThingData `json:"data" gorm:"embedded"`
// LastSeen is not part of the JSON payload.
LastSeen time.Time `json:"-" gorm:"not null"`
}
// TableName returns the name of the table things are stored in ("reddit").
func (Thing) TableName() string {
	const table = "reddit"
	return table
}
// Listing is the response envelope of listing requests; the things are
// located in Data.Children.
type Listing struct {
Data struct {
Children []Thing `json:"children"`
} `json:"data"`
}
// DecodeFrom decodes the listing from JSON and fills in the computed fields
// of every child: NumID (the ID part after the first underscore parsed as a
// base-36 number), the HTML-unescaped SelfTextHTML and CreatedAt (derived
// from CreatedSecs).
//
// An error is returned when decoding fails, when a child ID has no underscore
// (previously this caused an index-out-of-range panic) or when the numeric
// part of the ID cannot be parsed.
func (l *Listing) DecodeFrom(body io.Reader) error {
	if err := flu.DecodeFrom(flu.IO{R: body}, flu.JSON(l)); err != nil {
		return err
	}

	for i := range l.Data.Children {
		child := &l.Data.Children[i]

		parts := strings.Split(child.Data.ID, "_")
		if len(parts) < 2 {
			return errors.Errorf("invalid thing id: %q", child.Data.ID)
		}

		id := parts[1]
		numID, err := strconv.ParseUint(id, 36, 64)
		if err != nil {
			return errors.Wrapf(err, "parse id: %s", id)
		}

		child.Data.NumID = numID
		child.Data.SelfTextHTML = html.UnescapeString(child.Data.SelfTextHTML)
		child.Data.CreatedAt = time.Unix(int64(child.Data.CreatedSecs), 0)
	}

	return nil
}
// SubscribeAction is the value of the "action" parameter of a subscribe request.
type SubscribeAction string

// Supported subscribe actions.
const (
	Subscribe   = SubscribeAction("sub")
	Unsubscribe = SubscribeAction("unsub")
)
package redditsave
import (
"context"
"net/http"
"net/http/cookiejar"
"time"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/syncf"
"github.com/jfk9w-go/flu/httpf"
"github.com/pkg/errors"
)
// URL is the base URL of the redditsave site; it is requested to refresh
// cookies and "/info" is appended to it to resolve video URLs.
var URL = "https://redditsave.com"
// Config contains the redditsave client settings.
type Config struct {
RefreshEvery flu.Duration `yaml:"refreshEvery,omitempty" doc:"Cookie refresh interval" default:"20m"`
}
// Context is the application configuration contract required by Client:
// it must be able to provide the redditsave Config.
type Context interface {
RedditsaveConfig() Config
}
// Client is the application mixin wrapping the actual redditsave client
// implementation, which is created by Include or Standalone.
type Client[C Context] struct {
*client
}
// Include initializes the client from the redditsave section of the
// application configuration, using the application as the clock.
func (c *Client[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	refreshEvery := app.Config().RedditsaveConfig().RefreshEvery.Value
	return c.Standalone(ctx, app, refreshEvery)
}
// Standalone initializes the client with an explicit clock and cookie refresh
// interval. It always returns nil.
func (c *Client[C]) Standalone(ctx context.Context, clock syncf.Clock, refreshEvery time.Duration) error {
	impl := new(client)
	impl.client = new(http.Client)
	impl.clock = clock
	impl.refreshEvery = refreshEvery

	c.client = impl
	return nil
}
// client is the redditsave client implementation.
type client struct {
// client is the HTTP client; its cookie jar is replaced on every refresh.
client *http.Client
clock syncf.Clock
// refreshEvery is the maximum age of the cookies before they are refreshed.
refreshEvery time.Duration
// lastRefresh is the moment of the last successful cookie refresh.
lastRefresh time.Time
// mu is locked for the duration of ResolveURL.
mu syncf.RWMutex
}
// String returns the fixed name of this client ("redditsave.client").
func (*client) String() string {
	const name = "redditsave.client"
	return name
}
// Do executes req with the underlying HTTP client and logs the outcome
// (trace level on success, warn level on error) before returning it.
func (c *client) Do(req *http.Request) (*http.Response, error) {
	response, doErr := c.client.Do(req)
	logger := logf.Get(c)
	logger.Resultf(req.Context(), logf.Trace, logf.Warn, "%s => %v", &httpf.RequestBuilder{Request: req}, doErr)
	return response, doErr
}
// ResolveURL resolves url through the "/info" page of the redditsave site and
// returns the download link found there.
//
// If the cookies were refreshed no longer than refreshEvery ago, the page is
// requested right away. Otherwise a new cookie jar is installed, the site
// root is requested to obtain fresh cookies, the refresh time is recorded and
// ResolveURL is called again.
func (c *client) ResolveURL(ctx context.Context, url string) (string, error) {
now := c.clock.Now()
ctx, cancel := c.mu.Lock(ctx)
if ctx.Err() != nil {
return "", ctx.Err()
}
defer cancel()
if now.Sub(c.lastRefresh) <= c.refreshEvery {
// NOTE(review): cancel is deferred a second time here; presumably it is
// idempotent — confirm against syncf.
defer cancel()
var resp resolveResponse
err := httpf.GET(URL+"/info").
Query("url", url).
Exchange(ctx, c).
CheckStatus(http.StatusOK).
Handle(&resp).
Error()
return resp.url, err
}
// Cookies are stale: start over with an empty jar.
jar, err := cookiejar.New(nil)
if err != nil {
return "", errors.Wrap(err, "create new cookie jar")
}
c.client.Jar = jar
err = httpf.GET(URL).
Exchange(ctx, c).
CheckStatus(http.StatusOK).
Error()
logf.Get(c).Resultf(ctx, logf.Debug, logf.Error, "refresh cookie: %v", err)
if err != nil {
return "", errors.Wrapf(err, "get [%s] to refresh cookie", URL)
}
c.lastRefresh = now
// NOTE(review): the recursive call locks c.mu again with the ctx returned by
// Lock while the deferred cancel has not run yet; presumably the lock is
// reentrant through ctx — confirm against syncf.
return c.ResolveURL(ctx, url)
}
package redditsave
import (
"errors"
"net/http"
"github.com/jfk9w-go/flu"
tghtml "github.com/jfk9w-go/telegram-bot-api/ext/html"
"golang.org/x/net/html"
)
// resolveResponse is a response handler that extracts the download URL from
// an HTML page; the result is stored in url by Handle.
type resolveResponse struct {
url string
}
// Handle scans the HTML response body for the first <a> start tag whose class
// attribute is "downloadbutton" and stores its href in h.url. An error is
// returned when no such tag is found. The response body is always closed.
func (h *resolveResponse) Handle(resp *http.Response) error {
	defer flu.CloseQuietly(resp.Body)

	tokenizer := html.NewTokenizer(resp.Body)
	for {
		if tokenizer.Next() == html.ErrorToken {
			break
		}

		token := tokenizer.Token()
		if token.Type != html.StartTagToken || token.Data != "a" {
			continue
		}

		if tghtml.Get(token.Attr, "class") != "downloadbutton" {
			continue
		}

		h.url = tghtml.Get(token.Attr, "href")
		return nil
	}

	return errors.New("unable to find url")
}
package srstats
import (
"context"
"net/http"
"github.com/jfk9w-go/flu/logf"
"github.com/pkg/errors"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/httpf"
)
// BaseURL is the base URL all API paths of this client are appended to.
var BaseURL = "https://subredditstats.com/api"
// Client is the subredditstats API client. The underlying HTTP client is set
// up by Include.
type Client[C any] struct {
client httpf.Client
}
// String returns the fixed name of this client ("srstats.client").
func (Client[C]) String() string {
	const name = "srstats.client"
	return name
}
// Include sets up a fresh HTTP client. It always returns nil.
func (c *Client[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	c.client = &http.Client{}
	return nil
}
// Do executes req with the underlying HTTP client and logs the outcome
// (trace level on success, warn level on error) before returning it.
func (c *Client[C]) Do(req *http.Request) (*http.Response, error) {
	response, doErr := c.client.Do(req)
	logger := logf.Get(c)
	logger.Resultf(req.Context(), logf.Trace, logf.Warn, "%s => %v", &httpf.RequestBuilder{Request: req}, doErr)
	return response, doErr
}
// GetGlobalHistogram requests "/globalSubredditsIdHist" and returns the
// decoded map. On error the returned map is nil.
//
// The request is completed before the result is read: returning the variable
// and the call in a single return statement would rely on an evaluation order
// the language does not specify.
func (c *Client[C]) GetGlobalHistogram(ctx context.Context) (map[string]float64, error) {
	var hist map[string]float64
	err := httpf.GET(BaseURL+"/globalSubredditsIdHist").
		Exchange(ctx, c).
		CheckStatus(http.StatusOK).
		DecodeBody(flu.JSON(&hist)).
		Error()
	if err != nil {
		return nil, err
	}

	return hist, nil
}
// GetHistogram requests "/subredditNameToSubredditsHist" for the given
// subreddit and returns the decoded map.
//
// The body is buffered first: when it cannot be decoded as a JSON map, its
// text is returned as the error message. The text is used verbatim and never
// as a format string, so '%' characters in the response are preserved.
func (c *Client[C]) GetHistogram(ctx context.Context, subreddit string) (map[string]float64, error) {
	var buf flu.ByteBuffer
	if err := httpf.GET(BaseURL+"/subredditNameToSubredditsHist").
		Query("subredditName", subreddit).
		Exchange(ctx, c).
		CheckStatus(http.StatusOK).
		CopyBody(&buf).
		Error(); err != nil {
		return nil, err
	}

	var resp map[string]float64
	if err := flu.DecodeFrom(&buf, flu.JSON(&resp)); err == nil {
		return resp, nil
	}

	return nil, errors.New(buf.String())
}
// GetSubredditNames posts ids as a JSON object ({"subredditIds": ids}) with
// the "text/plain" content type to "/specificSubredditIdsToNames" and returns
// the decoded list of names. On error the returned slice is nil.
//
// The request is completed before the result is read: returning the variable
// and the call in a single return statement would rely on an evaluation order
// the language does not specify.
func (c *Client[C]) GetSubredditNames(ctx context.Context, ids []string) ([]string, error) {
	names := make([]string, 0)
	err := httpf.POST(BaseURL+"/specificSubredditIdsToNames", flu.JSON(map[string][]string{"subredditIds": ids})).
		ContentType("text/plain").
		Exchange(ctx, c).
		CheckStatus(http.StatusOK).
		DecodeBody(flu.JSON(&names)).
		Error()
	if err != nil {
		return nil, err
	}

	return names, nil
}
package core
import (
"context"
"os"
"github.com/jfk9w/hikkabot/v4/internal/core/internal/blobs"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w/hikkabot/v4/internal/feed/media"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/pkg/errors"
)
// BlobConfig contains the blob storage settings: the accepted file size range
// and the lifetime of stored files.
type BlobConfig struct {
MinSize media.Size `yaml:"minSize,omitempty" doc:"Minimum media file size." pattern:"^(\\d+)([KMGT])?$" default:"1K"`
MaxSize media.Size `yaml:"maxSize,omitempty" doc:"Maximum media file size." pattern:"^(\\d+)([KMGT])?$" default:"50M"`
TTL flu.Duration `yaml:"ttl,omitempty" doc:"How long to keep cached files." default:"15m"`
}
// BlobContext is the application configuration contract required by Blobs:
// it must be able to provide the BlobConfig.
type BlobContext interface {
BlobConfig() BlobConfig
}
// Blobs is the application mixin providing feed.Blobs. The embedded
// implementation may be preset; otherwise Include creates a file-based one.
type Blobs[C BlobContext] struct {
feed.Blobs
}
// String returns the blobs service identifier.
func (Blobs[C]) String() string {
	id := blobs.ServiceID
	return id
}
// Include sets up file-based blob storage unless an implementation is already
// present. The files live in a new "blobs-" temporary directory; TTL and size
// bounds come from the blob section of the application configuration. The
// storage is handed over to the application via Manage.
func (b *Blobs[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	if b.Blobs != nil {
		return nil
	}

	tempDir, err := os.MkdirTemp(os.TempDir(), "blobs-")
	if err != nil {
		return errors.Wrapf(err, "create temporary directory")
	}

	config := app.Config().BlobConfig()
	files := &blobs.Files{
		Clock:      app,
		Dir:        tempDir,
		TTL:        config.TTL.Value,
		SizeBounds: [2]media.Size{config.MinSize, config.MaxSize},
	}

	err = app.Manage(ctx, files)
	if err != nil {
		return err
	}

	b.Blobs = files
	return nil
}
// SkipSizeCheck re-exports blobs.SkipSizeCheck from the internal package.
var SkipSizeCheck = blobs.SkipSizeCheck
package core
import (
"context"
"github.com/jfk9w/hikkabot/v4/internal/core/internal/iface"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/telegram-bot-api"
"github.com/jfk9w-go/telegram-bot-api/ext/tapp"
)
// InterfaceConfig contains the command interface settings.
type InterfaceConfig struct {
// SupervisorID being zero is treated as "not set" by Interface.Include.
SupervisorID telegram.ID `yaml:"supervisorId" doc:"Telegram admin user ID. If not set, only public commands (e.g. /start) will be available."`
Aliases map[string]telegram.ID `yaml:"aliases,omitempty" doc:"Chat aliases to use in commands: keys are aliases and values are telegram IDs."`
}
// InterfaceContext is the application configuration contract required by
// Interface: the contracts of the bot, the storage and the poller plus the
// InterfaceConfig itself.
type InterfaceContext interface {
tapp.Context
StorageContext
PollerContext
InterfaceConfig() InterfaceConfig
}
// Interface is the application mixin wrapping the command interface
// implementation, which is created by Include.
type Interface[C InterfaceContext] struct {
*iface.Impl
}
// Include resolves the bot, storage and poller mixins from the application
// and builds the command interface implementation from them and from the
// interface section of the configuration. A warning is logged when the
// supervisor ID is not set (zero).
func (i *Interface[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	var bot tapp.Mixin[C]
	if err := app.Use(ctx, &bot, false); err != nil {
		return err
	}

	var storage Storage[C]
	if err := app.Use(ctx, &storage, false); err != nil {
		return err
	}

	var poller Poller[C]
	if err := app.Use(ctx, &poller, false); err != nil {
		return err
	}

	config := app.Config().InterfaceConfig()
	if config.SupervisorID == 0 {
		// The dash is written as an escape so that the message survives
		// source re-encoding (it was previously garbled into mojibake).
		logf.Get(i).Warnf(ctx, "telegram supervisor ID is not set \u2014 subscription management is disabled; "+
			"consider providing valid user ID (hint: use /start command to get it)")
	}

	i.Impl = &iface.Impl{
		Telegram:     bot.Bot(),
		Poller:       poller,
		Storage:      storage,
		SupervisorID: config.SupervisorID,
		Aliases:      config.Aliases,
	}

	return nil
}
package blobs
import "context"
// ServiceID is the name reported by Files.String.
const ServiceID = "core.blobs"
// skipSizeCheckKey is the context key under which the skip flag is stored.
type skipSizeCheckKey struct{}

// SkipSizeCheck returns a context derived from ctx that carries the
// "skip size check" flag.
func SkipSizeCheck(ctx context.Context) context.Context {
	var key skipSizeCheckKey
	return context.WithValue(ctx, key, true)
}

// skipSizeCheck reports whether ctx carries the skip flag, that is whether a
// bool value (of any truthiness) is stored under skipSizeCheckKey.
func skipSizeCheck(ctx context.Context) bool {
	switch ctx.Value(skipSizeCheckKey{}).(type) {
	case bool:
		return true
	default:
		return false
	}
}
package blobs
import (
"context"
"os"
"sync"
"time"
"github.com/jfk9w/hikkabot/v4/internal/feed/media"
"github.com/gofrs/uuid"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/syncf"
"github.com/pkg/errors"
)
// Files is blob storage backed by files in a directory.
type Files struct {
Clock syncf.Clock
// TTL is the age after which allocated files are removed on a later allocation.
TTL time.Duration
// Dir is the directory files are allocated in; Close removes it entirely.
Dir string
// SizeBounds holds the lower ([0]) and upper ([1]) file size bounds.
SizeBounds [2]media.Size
// files maps allocated files to their allocation time; it is created
// lazily by Buffer (guarded by once).
files map[flu.File]time.Time
once sync.Once
// mu is locked for the duration of alloc.
mu syncf.RWMutex
}
// String returns the blobs service identifier.
func (*Files) String() string {
	id := ServiceID
	return id
}
// Buffer wraps ref into a lazily evaluated reference backed by this storage.
// The given MIME type is recorded in the metadata of the result. The file
// registry is created on the first call.
func (fs *Files) Buffer(mimeType string, ref media.Ref) media.MetaRef {
	initRegistry := func() {
		fs.files = make(map[flu.File]time.Time)
	}
	fs.once.Do(initRegistry)

	buffered := new(fileRef)
	buffered.fs = fs
	buffered.meta = media.Meta{MIMEType: mimeType}
	buffered.ref = ref
	return buffered
}
// alloc registers a new uniquely named file inside Dir and returns it.
//
// Before allocating, every registered file older than TTL is removed from
// disk and dropped from the registry. Dropping is essential: otherwise the
// registry grows without bound and every later allocation retries (and logs)
// the removal of files that are long gone. The entry is dropped even when the
// removal fails; such leftovers are still cleaned up by Close, which removes
// the whole directory.
//
// The context error is returned if the lock cannot be acquired.
func (fs *Files) alloc(ctx context.Context) (flu.File, error) {
	ctx, cancel := fs.mu.Lock(ctx)
	if ctx.Err() != nil {
		return "", ctx.Err()
	}

	defer cancel()

	// Make sure the registry exists even if Buffer has not been called yet.
	fs.once.Do(func() { fs.files = make(map[flu.File]time.Time) })

	now := fs.Clock.Now()
	for file, createdAt := range fs.files {
		if now.Sub(createdAt) > fs.TTL {
			err := file.Remove()
			logf.Get(fs).Resultf(ctx, logf.Debug, logf.Warn, "remove blob file [%s]: %v", file, err)
			// Deleting the current key while ranging over a map is safe.
			delete(fs.files, file)
		}
	}

	file := flu.File(fs.Dir + "/" + uuid.Must(uuid.NewV4()).String())
	fs.files[file] = now
	logf.Get(fs).Debugf(ctx, "allocated new file blob [%s]", file)
	return file, nil
}
// Close removes the storage directory with everything in it.
func (fs *Files) Close() error {
	err := os.RemoveAll(fs.Dir)
	return err
}
// fileRef is a media reference that materializes the wrapped reference into a
// file on first access and remembers the outcome.
type fileRef struct {
fs *Files
meta media.Meta
// ref is the wrapped reference the content is obtained from.
ref media.Ref
// file and err hold the outcome of the one-time evaluation.
file flu.File
err error
// once guards the evaluation shared by GetMeta and Get.
once sync.Once
}
// GetMeta evaluates the reference (once; only the context of the first call
// is used) and returns its metadata along with the evaluation error.
func (r *fileRef) GetMeta(ctx context.Context) (*media.Meta, error) {
	evaluate := func() { r.get(ctx) }
	r.once.Do(evaluate)
	return &r.meta, r.err
}
// Get evaluates the reference (once; only the context of the first call is
// used) and returns the backing file along with the evaluation error.
func (r *fileRef) Get(ctx context.Context) (flu.Input, error) {
	evaluate := func() { r.get(ctx) }
	r.once.Do(evaluate)
	return r.file, r.err
}
// get performs the actual evaluation and stores the outcome in r.file,
// r.meta and r.err.
//
// The wrapped reference is resolved first. If it is not a file already, its
// content is copied into a newly allocated file. The file is recorded even if
// the size check fails afterwards. Unless the context carries the skip flag,
// a non-zero size below the lower bound or at/above the upper bound is
// reported as an error; otherwise the size is stored in the metadata.
func (r *fileRef) get(ctx context.Context) {
	source, err := r.ref.Get(ctx)
	if err != nil {
		r.err = err
		return
	}

	target, isFile := source.(flu.File)
	if !isFile {
		if target, err = r.fs.alloc(ctx); err != nil {
			r.err = err
			return
		}

		if _, err = flu.Copy(source, target); err != nil {
			r.err = err
			return
		}
	}

	info, err := os.Stat(target.String())
	if err != nil {
		r.err = err
		return
	}

	r.file = target
	if skipSizeCheck(ctx) {
		return
	}

	size := media.Size(info.Size())
	lower, upper := r.fs.SizeBounds[0], r.fs.SizeBounds[1]
	if size > 0 && size < lower {
		r.err = errors.Errorf("size %s too low", size)
		return
	}

	if size > 0 && size >= upper {
		r.err = errors.Errorf("size %s too large", size)
		return
	}

	r.meta.Size = size
}
package iface
import (
"context"
"fmt"
"strings"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu/colf"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/syncf"
"github.com/jfk9w-go/telegram-bot-api"
"github.com/jfk9w-go/telegram-bot-api/ext"
"github.com/jfk9w-go/telegram-bot-api/ext/receiver"
"github.com/jfk9w-go/telegram-bot-api/ext/tapp"
"github.com/pkg/errors"
)
// Command keys used in inline keyboard buttons and the emoji shown in
// messages. The emoji are written as escapes so that they survive source
// re-encoding (they were previously garbled into mojibake).
const (
	suspend = "s"
	resume  = "r"
	delete  = "d"

	fire     = "\U0001F525" // fire
	stop     = "\U0001F6D1" // stop sign
	bin      = "\U0001F5D1" // wastebasket
	thumbsUp = "\U0001F44D" // thumbs up
)
// Impl implements the telegram command interface on top of the poller and
// the subscription storage.
type Impl struct {
Telegram telegram.Client
Poller feed.Poller
Storage feed.Storage
// SupervisorID receives the notifications sent by the After* triggers and,
// when not zero, restricts the command scope to this user.
SupervisorID telegram.ID
Aliases map[string]telegram.ID
}
// String returns the interface service identifier.
func (*Impl) String() string {
	id := ServiceID
	return id
}
// CommandScope returns the scope of the commands: an empty scope when no
// supervisor is configured, otherwise a scope limited to the supervisor.
func (i *Impl) CommandScope() tapp.CommandScope {
	var scope tapp.CommandScope
	if i.SupervisorID != 0 {
		scope.UserIDs = colf.Set[telegram.ID]{i.SupervisorID: true}
	}

	return scope
}
//
// Command listeners
//
// Subscribe handles the subscribe command. The first argument is the
// subscription reference, the feed is resolved starting from the argument at
// index 1 and everything from the third argument on is passed as options.
// errSubscribe is returned when there are no arguments. A thumbs-up callback
// reply is sent on success.
func (i *Impl) Subscribe(ctx context.Context, _ telegram.Client, cmd *telegram.Command) error {
	if len(cmd.Args) < 1 {
		return errSubscribe
	}

	ref := cmd.Args[0]
	ctx, feedID, err := i.resolveFeedID(ctx, cmd, 1)
	if err != nil {
		return err
	}

	var options []string
	if len(cmd.Args) >= 3 {
		options = cmd.Args[2:]
	}

	err = i.Poller.Subscribe(ctx, feedID, ref, options)
	if err != nil {
		return err
	}

	return cmd.ReplyCallback(ctx, i.Telegram, thumbsUp)
}
// Suspend handles the suspend command: the subscription header is parsed from
// the first argument and the subscription is suspended with
// feed.ErrSuspendedByUser as the reason.
func (i *Impl) Suspend(ctx context.Context, _ telegram.Client, cmd *telegram.Command) error {
	target, err := i.parseHeader(cmd, 0)
	if err == nil {
		err = i.Poller.Suspend(ctx, target, feed.ErrSuspendedByUser)
	}

	return err
}
// Resume handles the resume command: the subscription header is parsed from
// the first argument and the subscription is resumed.
func (i *Impl) Resume(ctx context.Context, _ telegram.Client, cmd *telegram.Command) error {
	target, err := i.parseHeader(cmd, 0)
	if err == nil {
		err = i.Poller.Resume(ctx, target)
	}

	return err
}
// Delete handles the delete command: the subscription header is parsed from
// the first argument and the subscription is deleted.
func (i *Impl) Delete(ctx context.Context, _ telegram.Client, cmd *telegram.Command) error {
	target, err := i.parseHeader(cmd, 0)
	if err == nil {
		err = i.Poller.Delete(ctx, target)
	}

	return err
}
// Clear handles the clear command: the first argument is the pattern and the
// feed is resolved starting from the argument at index 1. errDeleteAll is
// returned when there are no arguments.
func (i *Impl) Clear(ctx context.Context, _ telegram.Client, cmd *telegram.Command) error {
	if len(cmd.Args) == 0 {
		return errDeleteAll
	}

	ctx, feedID, err := i.resolveFeedID(ctx, cmd, 1)
	if err != nil {
		return err
	}

	return i.Poller.Clear(ctx, feedID, cmd.Args[0])
}
// List handles the list command. It sends a message with one button per
// subscription of the resolved feed; pressing a button toggles the state of
// the subscription.
//
// Active subscriptions are listed when the second argument equals the resume
// key; otherwise inactive ones are listed, falling back to active ones when
// there are no inactive subscriptions.
func (i *Impl) List(ctx context.Context, _ telegram.Client, cmd *telegram.Command) error {
	ctx, feedID, err := i.resolveFeedID(ctx, cmd, 0)
	if err != nil {
		return err
	}

	active := false
	if len(cmd.Args) > 1 {
		active = cmd.Args[1] == resume
	}

	subs, err := i.Storage.ListSubscriptions(ctx, feedID, active)
	if err != nil {
		return err
	}

	if len(subs) == 0 && !active {
		active = true
		if subs, err = i.Storage.ListSubscriptions(ctx, feedID, active); err != nil {
			return err
		}
	}

	// Inactive subscriptions offer "resume", active ones offer "suspend".
	status, changeCmd := stop, resume
	if active {
		status, changeCmd = fire, suspend
	}

	// One button per row.
	rows := make([][]telegram.Button, 0, len(subs))
	for _, sub := range subs {
		command := &telegram.Command{Key: changeCmd, Args: []string{formatHeader(sub.Header)}}
		rows = append(rows, []telegram.Button{command.Button(sub.Name)})
	}

	message := telegram.Text{
		ParseMode:             telegram.HTML,
		Text:                  fmt.Sprintf("%d subs %s", len(subs), status),
		DisableWebPagePreview: true,
	}

	options := &telegram.SendOptions{
		ReplyMarkup:      telegram.InlineKeyboard(rows...),
		ReplyToMessageID: cmd.Message.ID,
	}

	_, err = i.Telegram.Send(ctx, cmd.Chat.ID, message, options)
	return err
}
// Sub is a short alias of Subscribe.
func (i *Impl) Sub(ctx context.Context, client telegram.Client, cmd *telegram.Command) error {
	err := i.Subscribe(ctx, client, cmd)
	return err
}
//
// Callback aliases
//
// S_callback runs Suspend for a callback command and, on success, answers the
// callback with thumbsUp.
func (i *Impl) S_callback(ctx context.Context, client telegram.Client, cmd *telegram.Command) error {
	err := i.Suspend(ctx, client, cmd)
	if err != nil {
		return err
	}

	return cmd.ReplyCallback(ctx, client, thumbsUp)
}
// R_callback runs Resume for a callback command and, on success, answers the
// callback with thumbsUp.
func (i *Impl) R_callback(ctx context.Context, client telegram.Client, cmd *telegram.Command) error {
	err := i.Resume(ctx, client, cmd)
	if err != nil {
		return err
	}

	return cmd.ReplyCallback(ctx, client, thumbsUp)
}
// D_callback runs Delete for a callback command and, on success, answers the
// callback with thumbsUp.
func (i *Impl) D_callback(ctx context.Context, client telegram.Client, cmd *telegram.Command) error {
	err := i.Delete(ctx, client, cmd)
	if err != nil {
		return err
	}

	return cmd.ReplyCallback(ctx, client, thumbsUp)
}
//
// After triggers
//
// AfterResume notifies the supervisor chat that a subscription was resumed.
// The message carries a "Suspend" button for the same subscription.
func (i *Impl) AfterResume(ctx context.Context, sub *feed.Subscription) error {
	chatTitle, err := i.getChatTitle(ctx, sub.FeedID)
	if err != nil {
		return err
	}

	suspendCmd := &telegram.Command{Key: suspend, Args: []string{formatHeader(sub.Header)}}
	row := []telegram.Button{suspendCmd.Button("Suspend")}
	ctx = receiver.ReplyMarkup(ctx, telegram.InlineKeyboard(row))

	out := ext.HTML(ctx, i.Telegram, i.SupervisorID)
	out = out.Text(sub.Name + " @ ")
	out = out.Text(chatTitle)
	out = out.Text(" %s", fire)
	return out.Flush()
}
// AfterSuspend notifies the supervisor chat that a subscription was suspended,
// including the stored error text. The message carries "Resume" and "Delete"
// buttons for the same subscription.
func (i *Impl) AfterSuspend(ctx context.Context, sub *feed.Subscription) error {
	chatTitle, err := i.getChatTitle(ctx, sub.FeedID)
	if err != nil {
		return err
	}

	resumeCmd := &telegram.Command{Key: resume, Args: []string{formatHeader(sub.Header)}}
	deleteCmd := &telegram.Command{Key: delete, Args: []string{formatHeader(sub.Header)}}
	row := []telegram.Button{
		resumeCmd.Button("Resume"),
		deleteCmd.Button("Delete"),
	}
	ctx = receiver.ReplyMarkup(ctx, telegram.InlineKeyboard(row))

	out := ext.HTML(ctx, i.Telegram, i.SupervisorID)
	out = out.Text(sub.Name + " @ ")
	out = out.Text(chatTitle)
	out = out.Text(" %s\n%s", stop, sub.Error.String)
	return out.Flush()
}
// AfterDelete notifies the supervisor chat that a subscription was deleted.
func (i *Impl) AfterDelete(ctx context.Context, sub *feed.Subscription) error {
	chatTitle, err := i.getChatTitle(ctx, sub.FeedID)
	if err != nil {
		return err
	}

	out := ext.HTML(ctx, i.Telegram, i.SupervisorID)
	out = out.Text(sub.Name + " @ ")
	out = out.Text(chatTitle)
	out = out.Text(" %s", bin)
	return out.Flush()
}
// AfterClear notifies the supervisor chat how many subscriptions of a feed
// were deleted and which pattern was used.
func (i *Impl) AfterClear(ctx context.Context, feedID feed.ID, pattern string, deleted int64) error {
	chatTitle, err := i.getChatTitle(ctx, feedID)
	if err != nil {
		return err
	}

	summary := fmt.Sprintf("%d subs @ ", deleted)
	out := ext.HTML(ctx, i.Telegram, i.SupervisorID)
	out = out.Text(summary)
	out = out.Text(chatTitle)
	out = out.Text(" %s (%s)", bin, pattern)
	return out.Flush()
}
//
// Implementation details
//
// headerDelimiter separates the feed ID, vendor and subscription ID in the
// string form of a feed.Header produced by formatHeader and consumed by parseHeader.
const headerDelimiter = "+"
// formatHeader renders a header as "<feed id>+<vendor>+<sub id>", the form
// parseHeader accepts.
func formatHeader(header feed.Header) string {
	return header.FeedID.String() + headerDelimiter + header.Vendor + headerDelimiter + header.SubID
}
// parseHeader decodes the command argument at argumentIndex into a feed.Header.
//
// The argument must have the form produced by formatHeader: exactly three
// headerDelimiter-separated tokens (feed ID, vendor, subscription ID). An error
// is returned when the argument is missing, when the token count is wrong, or
// when the feed ID token cannot be parsed.
func (i *Impl) parseHeader(cmd *telegram.Command, argumentIndex int) (header feed.Header, err error) {
	// Guard against a missing argument instead of panicking on the index.
	if argumentIndex < 0 || argumentIndex >= len(cmd.Args) {
		return header, errors.Errorf("missing header argument at position %d", argumentIndex)
	}

	arg := cmd.Args[argumentIndex]
	tokens := strings.Split(arg, headerDelimiter)
	if len(tokens) != 3 {
		// Report the offending argument, not the still-empty header.
		return header, errors.Errorf("invalid header [%s]", arg)
	}

	feedID, err := telegram.ParseID(tokens[0])
	if err != nil {
		// The ID lives in the first token.
		return header, errors.Wrapf(err, "invalid string id: %s", tokens[0])
	}

	header.SubID = tokens[2]
	header.Vendor = tokens[1]
	header.FeedID = feed.ID(feedID)
	return header, nil
}
// contextValues is stored in a context (keyed by the zero contextValues value)
// by resolveFeedID so that getChatTitle can reuse an already fetched chat.
type contextValues struct {
// feed is the chat fetched while resolving the feed ID.
feed *telegram.Chat
}
// getChatTitle returns a display title for the feed's chat.
//
// A chat cached in the context (see contextValues) is used when present;
// otherwise the chat is fetched from Telegram. Context-related fetch errors
// are returned; any other fetch error is logged and the numeric feed ID is
// used as the title. Private chats are reported as "<private>".
func (i *Impl) getChatTitle(ctx context.Context, feedID feed.ID) (string, error) {
	cached, _ := ctx.Value(contextValues{}).(contextValues)
	chat := cached.feed
	if chat == nil {
		var err error
		chat, err = i.Telegram.GetChat(ctx, telegram.ID(feedID))
		if syncf.IsContextRelated(err) {
			return "", err
		}
		if err != nil {
			logf.Get(i).Errorf(ctx, "get chat %d: %v", feedID, err)
			return fmt.Sprint(feedID), nil
		}
	}

	if chat.Type == telegram.PrivateChat {
		return "<private>", nil
	}

	return chat.Title, nil
}
// resolveFeedID determines which feed a command targets.
//
// Without an argument at argumentIndex, or when that argument is ".", the
// chat the command came from is used. Otherwise the argument is looked up in
// the aliases, then tried as a Telegram username (the fetched chat is cached
// in the returned context), and finally parsed as a numeric ID.
func (i *Impl) resolveFeedID(ctx context.Context, cmd *telegram.Command, argumentIndex int) (context.Context, feed.ID, error) {
	if len(cmd.Args) <= argumentIndex {
		return ctx, feed.ID(cmd.Chat.ID), nil
	}

	arg := cmd.Args[argumentIndex]
	if arg == "." {
		return ctx, feed.ID(cmd.Chat.ID), nil
	}

	if aliased, ok := i.Aliases[arg]; ok {
		return ctx, feed.ID(aliased), nil
	}

	chat, err := i.Telegram.GetChat(ctx, telegram.Username(arg))
	if err == nil {
		// Remember the chat so getChatTitle does not have to fetch it again.
		ctx = context.WithValue(ctx, contextValues{}, contextValues{
			feed: chat,
		})
		return ctx, feed.ID(chat.ID), nil
	}

	logf.Get(i).Warnf(ctx, "failed to get chat %s: %v", arg, err)
	parsed, err := telegram.ParseID(arg)
	if err != nil {
		return nil, 0, errors.Wrap(err, "parse header")
	}

	return ctx, feed.ID(parsed), nil
}
package mediator
import (
"crypto/md5"
"fmt"
"image"
"image/jpeg"
"image/png"
"io"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/corona10/goimagehash"
"github.com/jfk9w-go/flu"
"github.com/pkg/errors"
"golang.org/x/image/bmp"
)
// readImageFunc decodes an image from a reader.
type readImageFunc func(io.Reader) (image.Image, error)
// imageTypes maps the MIME types that are hashed as images to their decoders.
var imageTypes = map[string]readImageFunc{
"image/jpeg": jpeg.Decode,
"image/png": png.Decode,
"image/bmp": bmp.Decode,
}
// hashImage decodes blob with readImage, computes its difference hash and
// stores it in hash with type "dhash" and a hex-formatted value.
func hashImage(blob flu.Input, hash *feed.MediaHash, readImage readImageFunc) error {
	src, err := blob.Reader()
	if err != nil {
		return errors.Wrap(err, "open blob")
	}
	defer flu.CloseQuietly(src)

	decoded, err := readImage(src)
	if err != nil {
		return errors.Wrap(err, "read image")
	}

	diff, err := goimagehash.DifferenceHash(decoded)
	if err != nil {
		return errors.Wrap(err, "get diff hash")
	}

	hash.Type, hash.Value = "dhash", fmt.Sprintf("%x", diff.GetHash())
	return nil
}
// hashAny computes the MD5 digest of blob's content and stores it in hash
// with type "md5" and a hex-formatted value.
func hashAny(blob flu.Input, hash *feed.MediaHash) error {
	digest := md5.New()
	_, err := flu.Copy(blob, flu.IO{W: digest})
	if err != nil {
		return errors.Wrap(err, "get md5 hash")
	}

	hash.Type, hash.Value = "md5", fmt.Sprintf("%x", digest.Sum(nil))
	return nil
}
package mediator
import (
"context"
"net/url"
"sync"
"time"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w/hikkabot/v4/internal/feed/media"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/me3x"
"github.com/jfk9w-go/flu/syncf"
"github.com/jfk9w-go/telegram-bot-api"
"github.com/jfk9w-go/telegram-bot-api/ext/receiver"
"github.com/pkg/errors"
)
// convertibleTypes maps a source MIME type to the MIME type that registered
// converters are asked to produce for it.
var convertibleTypes = map[string]string{
"video/webm": "video/mp4",
}
// Impl resolves, deduplicates and converts media referenced by URL.
type Impl struct {
// Clock supplies timestamps for timing logs and media hashes.
Clock syncf.Clock
// Storage is queried to decide whether a media hash is unique.
Storage feed.MediaHashStorage
// Blobs buffers media content.
Blobs feed.Blobs
// Locker is not referenced by the methods visible here.
// NOTE(review): presumably limits concurrency — confirm against callers.
Locker syncf.Locker
// Metrics receives the "ok", "failed" and "duplicate" counters.
Metrics me3x.Registry
// Timeout bounds a single mediation.
Timeout time.Duration
// resolvers and converters are appended by the Register* methods.
resolvers []media.Resolver
converters []media.Converter
// ctx and cancel are created lazily by init; background work runs under ctx.
ctx context.Context
cancel func()
// work tracks spawned mediations; Close waits on it.
work syncf.WaitGroup
// once guards init.
once sync.Once
}
// String returns ServiceID.
func (m *Impl) String() string {
return ServiceID
}
// init creates the cancellable background context under which mediations run.
// It is invoked through m.once from Mediate.
func (m *Impl) init() {
	background, cancel := context.WithCancel(context.Background())
	m.ctx, m.cancel = background, cancel
}
// RegisterMediaResolver appends resolver to the list consulted by resolve.
// Resolvers are tried in registration order.
func (m *Impl) RegisterMediaResolver(resolver media.Resolver) {
m.resolvers = append(m.resolvers, resolver)
}
// RegisterMediaConverter appends converter to the list consulted by convert.
// Converters are tried in registration order.
func (m *Impl) RegisterMediaConverter(converter media.Converter) {
m.converters = append(m.converters, converter)
}
// Mediate starts asynchronous processing of the media at source and returns
// a reference to the eventual result.
//
// An unparsable source yields a receiver.MediaError immediately. When
// dedupKey is non-nil the media is deduplicated within that key; a detected
// duplicate resolves to a nil media without error. The work runs under the
// service's own context (not the ctx argument) with m.Timeout applied.
func (m *Impl) Mediate(ctx context.Context, source string, dedupKey *feed.ID) receiver.MediaRef {
	parsed, err := url.Parse(source)
	if err != nil {
		return receiver.MediaError{E: err}
	}

	m.once.Do(m.init)

	task := func(ctx context.Context) (*receiver.Media, error) {
		var opts *dedupOpts
		if dedupKey != nil {
			opts = &dedupOpts{key: *dedupKey, source: parsed}
		}

		ctx, cancel := context.WithTimeout(ctx, m.Timeout)
		defer cancel()

		logf.Get(m).Tracef(ctx, "mediating [%s]", source)
		started := m.Clock.Now()
		result, err := m.mediate(ctx, parsed, opts)
		logf.Get(m).Resultf(ctx, logf.Debug, logf.Warn,
			"mediated [%s] in %s: %v", source, m.Clock.Now().Sub(started), err)

		// A duplicate is not a failure: report "no media" instead.
		if errors.Is(err, errDuplicate) {
			return nil, nil
		}

		return result, err
	}

	return syncf.AsyncWith[*receiver.Media](m.ctx, m.work.Spawn, task)
}
// mediate resolves source, runs deduplication/conversion and wraps the result
// into a receiver.Media.
//
// Media whose known size fits the type's remote limit is passed through as
// is; otherwise it is buffered and accepted if the buffered size fits the
// attach limit. Unsupported MIME types and oversized media produce an error.
// The outcome of the conversion step is recorded in metrics.
func (m *Impl) mediate(ctx context.Context, source *url.URL, dedup *dedupOpts) (*receiver.Media, error) {
	metaRef, err := m.resolve(ctx, source)
	if err != nil {
		return nil, err
	}

	meta, ref, err := m.convert(ctx, metaRef, dedup)
	// Record the outcome before bailing out: counting only after the error
	// check would make the "failed" and "duplicate" counters unreachable.
	m.incrementCounter(source, dedup, meta, err)
	if err != nil {
		return nil, err
	}

	mediaType := telegram.MediaTypeByMIMEType(meta.MIMEType)
	if mediaType == telegram.DefaultMediaType {
		return nil, errors.Errorf("mime type %s is not supported", meta.MIMEType)
	}

	if meta.Size > 0 && int64(meta.Size) <= mediaType.RemoteMaxSize() {
		input, err := ref.Get(ctx)
		if err != nil {
			return nil, err
		}

		return &receiver.Media{
			MIMEType: meta.MIMEType,
			Input:    input,
		}, nil
	}

	if int64(meta.Size) <= mediaType.AttachMaxSize() {
		// Buffer the content and re-check the limit against the buffered size.
		ref := m.Blobs.Buffer(meta.MIMEType, ref)
		meta, err := ref.GetMeta(ctx)
		if err != nil {
			return nil, err
		}

		if int64(meta.Size) <= mediaType.AttachMaxSize() {
			input, err := ref.Get(ctx)
			if err != nil {
				return nil, err
			}

			return &receiver.Media{
				Input:    input,
				MIMEType: meta.MIMEType,
			}, nil
		}
	}

	return nil, errors.Errorf("size %s too large", meta.Size)
}
// incrementCounter bumps exactly one outcome counter for source:
// "duplicate" (labelled with origin and feed_id) when err is errDuplicate,
// "failed" (origin) for any other error, and "ok" (origin and MIME type)
// otherwise.
func (m *Impl) incrementCounter(source *url.URL, dedup *dedupOpts, meta *media.Meta, err error) {
	if errors.Is(err, errDuplicate) {
		labels := make(me3x.Labels, 0, 2).
			Add("origin", source.Host).
			Add("feed_id", dedup.key)
		m.Metrics.Counter("duplicate", labels).Inc()
		return
	}

	if err != nil {
		labels := make(me3x.Labels, 0, 1).
			Add("origin", source.Host)
		m.Metrics.Counter("failed", labels).Inc()
		return
	}

	labels := make(me3x.Labels, 0, 2).
		Add("origin", source.Host).
		Add("type", meta.MIMEType)
	m.Metrics.Counter("ok", labels).Inc()
}
// convert fetches metadata for metaRef, optionally deduplicates the content
// and, for MIME types listed in convertibleTypes, tries the registered
// converters in order.
//
// The first converter that yields a result wins and its output is processed
// recursively (without deduplication). If no converter produces a result the
// original metadata and reference are returned.
func (m *Impl) convert(ctx context.Context, metaRef media.MetaRef, dedup *dedupOpts) (*media.Meta, media.Ref, error) {
	meta, err := metaRef.GetMeta(ctx)
	if err != nil {
		return nil, nil, errors.Wrap(err, "get meta")
	}

	var ref media.Ref
	if dedup == nil {
		ref = m.bufferLeaveURL(meta.MIMEType, metaRef)
	} else {
		ref = m.Blobs.Buffer(meta.MIMEType, metaRef)
		if err := m.dedup(ctx, meta.MIMEType, ref, dedup); err != nil {
			return nil, nil, err
		}
	}

	targetType, convertible := convertibleTypes[meta.MIMEType]
	if !convertible {
		return meta, ref, nil
	}

	for _, converter := range m.converters {
		converted, err := converter.Convert(ctx, ref, targetType)
		if converted == nil && err == nil {
			// This converter does not handle the input.
			continue
		}

		logf.Get(m).Resultf(ctx, logf.Debug, logf.Warn, "convert with [%s]: %v", converter, err)
		if converted != nil {
			return m.convert(ctx, converted, nil)
		}
	}

	return meta, ref, nil
}
// dedup hashes the media behind ref and asks storage whether the hash is
// unique for dedup.key. Image MIME types listed in imageTypes are hashed with
// hashImage, everything else with hashAny. errDuplicate is returned when the
// media has been seen before.
func (m *Impl) dedup(ctx context.Context, mimeType string, ref media.Ref, dedup *dedupOpts) error {
	input, err := ref.Get(ctx)
	if err != nil {
		return err
	}

	seenAt := m.Clock.Now()
	hash := &feed.MediaHash{
		FeedID:    dedup.key,
		URL:       dedup.source.String(),
		FirstSeen: seenAt,
		LastSeen:  seenAt,
	}

	readImage, isImage := imageTypes[mimeType]
	if isImage {
		err = hashImage(input, hash, readImage)
	} else {
		err = hashAny(input, hash)
	}

	logf.Get(m).Resultf(ctx, logf.Debug, logf.Warn, "hash media [%s => %s]: %v", hash.URL, hash.Value, err)
	if err != nil {
		return err
	}

	unique, err := m.Storage.IsMediaUnique(ctx, hash)
	if err != nil {
		return err
	}

	if unique {
		return nil
	}

	return errDuplicate
}
// bufferLeaveURL returns a lazily evaluated reference that passes flu.URL
// inputs through untouched and buffers every other kind of input via m.Blobs.
func (m *Impl) bufferLeaveURL(mimeType string, ref media.Ref) media.Ref {
	return syncf.Resolve[flu.Input](func(ctx context.Context) (flu.Input, error) {
		input, err := ref.Get(ctx)
		if err != nil {
			return nil, err
		}

		switch typed := input.(type) {
		case flu.URL:
			return typed, nil
		default:
			return m.Blobs.Buffer(mimeType, ref).Get(ctx)
		}
	})
}
// resolve asks the registered resolvers, in order, for a reference to source.
// The first resolver returning a non-nil reference wins (its error, if any, is
// returned alongside). When none does, source is treated as a plain HTTP
// reference.
func (m *Impl) resolve(ctx context.Context, source *url.URL) (metaRef media.MetaRef, err error) {
	for _, resolver := range m.resolvers {
		metaRef, err = resolver.Resolve(ctx, source)
		if metaRef != nil || err != nil {
			logf.Get(m).Resultf(ctx, logf.Debug, logf.Warn, "resolve [%s] with [%s]: %v", source, resolver, err)
		}

		if metaRef != nil {
			logf.Get(m).Debugf(ctx, "resolve [%s] with [%s]: ok", source, resolver)
			return metaRef, err
		}
	}

	logf.Get(m).Debugf(ctx, "resolve [%s] as http ref", source)
	return &media.HTTPRef{URL: source.String()}, nil
}
// Close cancels the background context (if it was ever created) and waits for
// spawned mediations to finish. It always returns nil.
func (m *Impl) Close() error {
	if m.cancel == nil {
		return nil
	}

	m.cancel()
	m.work.Wait()
	return nil
}
// dedupOpts carries the deduplication parameters for a single mediation.
type dedupOpts struct {
// key is the feed the media hash is recorded for.
key feed.ID
// source is the original media URL stored with the hash.
source *url.URL
}
package poller
import (
"context"
"time"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/gormf"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/me3x"
"github.com/jfk9w-go/flu/syncf"
"github.com/jfk9w-go/telegram-bot-api"
tghtml "github.com/jfk9w-go/telegram-bot-api/ext/html"
"github.com/jfk9w-go/telegram-bot-api/ext/output"
"github.com/jfk9w-go/telegram-bot-api/ext/receiver"
"github.com/pkg/errors"
"gopkg.in/guregu/null.v3"
)
// Impl polls vendors for subscription updates and delivers them to feeds.
type Impl struct {
// Clock is not referenced by the methods visible here.
Clock syncf.Clock
// Storage persists subscriptions and their state.
Storage feed.Storage
// Executor runs the per-feed polling tasks.
Executor feed.TaskExecutor
// Metrics receives the "refresh_update" counter.
Metrics me3x.Registry
// Telegram is the client used to send updates.
Telegram telegram.Client
// Interval is the pause between polling iterations of a feed.
Interval time.Duration
// Preload is the buffer size of the update queue used during a refresh.
Preload int
// vendors holds registered vendors keyed by id.
vendors map[string]feed.Vendor
// stateListeners are notified after subscription state changes.
stateListeners StateListeners
}
// String returns ServiceID.
func (p *Impl) String() string {
return ServiceID
}
// RegisterVendor registers vendor under id. Registering the same id twice is
// an error.
func (p *Impl) RegisterVendor(id string, vendor feed.Vendor) error {
	if _, taken := p.vendors[id]; taken {
		return errors.Errorf("vendor %s already registered", id)
	}

	// The map is created lazily on first registration.
	if p.vendors == nil {
		p.vendors = make(map[string]feed.Vendor)
	}

	p.vendors[id] = vendor
	return nil
}
// RegisterStateListener appends listener to the listeners notified after
// subscription state changes.
func (p *Impl) RegisterStateListener(listener feed.AfterStateListener) {
p.stateListeners = append(p.stateListeners, listener)
}
// RestoreActive submits a polling task for every feed that storage reports as
// active.
func (p *Impl) RestoreActive(ctx context.Context) error {
	active, err := p.Storage.GetActiveFeedIDs(ctx)
	if err != nil {
		return errors.Wrap(err, "get active feeds from storage")
	}

	for idx := range active {
		p.submitTask(active[idx])
	}

	return nil
}
// Subscribe creates a subscription for feedID from ref.
//
// Every registered vendor is asked to parse ref; the first one returning a
// draft is used (a parse error aborts immediately). If the vendor implements
// feed.BeforeResumeListener it is consulted first. With the feed.Deadborn
// option the subscription is stored suspended and listeners get OnSuspend;
// otherwise polling is started and listeners get OnResume.
func (p *Impl) Subscribe(ctx context.Context, feedID feed.ID, ref string, options []string) error {
	for vendorKey, vendor := range p.vendors {
		draft, err := vendor.Parse(ctx, ref, options)
		if err != nil {
			return errors.Wrapf(err, "parse with %s", vendorKey)
		}
		if draft == nil {
			// Not this vendor's reference.
			continue
		}

		header := feed.Header{
			SubID:  draft.SubID,
			Vendor: vendorKey,
			FeedID: feedID,
		}

		if listener, ok := vendor.(feed.BeforeResumeListener); ok {
			err := listener.BeforeResume(ctx, header)
			logf.Get(p).Resultf(ctx, logf.Debug, logf.Warn, "before resume [%s]: %v", header, err)
			if err != nil {
				return err
			}
		}

		data, err := gormf.ToJSONB(draft.Data)
		if err != nil {
			return errors.Wrap(err, "convert data")
		}

		sub := &feed.Subscription{
			Header: header,
			Name:   draft.Name,
			Data:   data,
		}

		for _, option := range options {
			if option == feed.Deadborn {
				sub.Error = null.StringFrom(feed.Deadborn)
				break
			}
		}

		if err := p.Storage.CreateSubscription(ctx, sub); err != nil {
			return errors.Wrap(err, "create in storage")
		}

		if sub.Error.IsZero() {
			p.submitTask(sub.FeedID)
			p.stateListeners.OnResume(ctx, sub)
		} else {
			p.stateListeners.OnSuspend(ctx, sub)
		}

		return nil
	}

	return errors.New("failed to find matching vendor")
}
// Suspend records err as the reason for suspending the subscription
// identified by header, reloads the subscription within the same storage
// transaction and notifies the state listeners.
func (p *Impl) Suspend(ctx context.Context, header feed.Header, err error) error {
	var sub *feed.Subscription
	txErr := p.Storage.Tx(ctx, func(tx feed.Tx) error {
		if updateErr := tx.UpdateSubscription(header, err); updateErr != nil {
			return updateErr
		}

		var getErr error
		sub, getErr = tx.GetSubscription(header)
		return getErr
	})
	if txErr != nil {
		return txErr
	}

	p.stateListeners.OnSuspend(ctx, sub)
	return nil
}
// Resume clears the error of the subscription identified by header, starts
// polling its feed and notifies the state listeners. The vendor must be
// registered; if it implements feed.BeforeResumeListener it may veto the
// resume by returning an error.
func (p *Impl) Resume(ctx context.Context, header feed.Header) error {
	vendor, known := p.vendors[header.Vendor]
	if !known {
		return errors.Errorf("no vendor for %s", header.Vendor)
	}

	if listener, ok := vendor.(feed.BeforeResumeListener); ok {
		err := listener.BeforeResume(ctx, header)
		logf.Get(p).Resultf(ctx, logf.Debug, logf.Warn, "before resume [%s]: %v", header, err)
		if err != nil {
			return err
		}
	}

	var sub *feed.Subscription
	txErr := p.Storage.Tx(ctx, func(tx feed.Tx) (err error) {
		if err = tx.UpdateSubscription(header, nil); err != nil {
			return err
		}

		sub, err = tx.GetSubscription(header)
		return err
	})
	if txErr != nil {
		return txErr
	}

	p.submitTask(header.FeedID)
	p.stateListeners.OnResume(ctx, sub)
	return nil
}
// Delete loads and removes the subscription identified by header within one
// storage transaction, then notifies the state listeners with the removed
// subscription.
func (p *Impl) Delete(ctx context.Context, header feed.Header) error {
	var sub *feed.Subscription
	txErr := p.Storage.Tx(ctx, func(tx feed.Tx) error {
		found, err := tx.GetSubscription(header)
		if err != nil {
			return err
		}

		sub = found
		return tx.DeleteSubscription(header)
	})
	if txErr != nil {
		return txErr
	}

	p.stateListeners.OnDelete(ctx, sub)
	return nil
}
// Clear deletes the subscriptions of feedID matching pattern and reports the
// number of deleted subscriptions to the state listeners.
func (p *Impl) Clear(ctx context.Context, feedID feed.ID, pattern string) error {
	removed, err := p.Storage.DeleteAllSubscriptions(ctx, feedID, pattern)
	if err == nil {
		p.stateListeners.OnDeleteAll(ctx, feedID, pattern, removed)
	}

	return err
}
// submitTask submits the polling loop for feedID to the executor.
//
// Each iteration takes the next subscription from storage, refreshes it and
// sleeps for p.Interval. The loop ends when storage fails to provide a
// subscription, when a context-related error occurs, or when the sleep is
// interrupted. A failed refresh suspends the subscription: the error is stored
// and, if that succeeds, listeners are notified.
func (p *Impl) submitTask(feedID feed.ID) {
p.Executor.Submit(feedID, func(ctx context.Context) error {
for {
sub, err := p.Storage.ShiftSubscription(ctx, feedID)
if err != nil {
return err
}
updates, err := p.refresh(ctx, sub)
logf.Get(p).Resultf(ctx, logf.Debug, logf.Warn, "received %d updates for [%s]: %v", updates, sub, err)
switch {
case syncf.IsContextRelated(err):
return err
case err != nil:
// Persist the refresh error; the inner err shadows the refresh error
// and holds the result of the storage update from here on.
sub.Error = null.StringFrom(err.Error())
err := p.Storage.UpdateSubscription(ctx, sub.Header, err)
logf.Get(p).Resultf(ctx, logf.Trace, logf.Warn, "update [%s] in db: %v", sub, err)
switch {
case syncf.IsContextRelated(err):
return err
case err == nil:
p.stateListeners.OnSuspend(ctx, sub)
}
}
if err := flu.Sleep(ctx, p.Interval); err != nil {
return err
}
}
})
}
// refresh runs the vendor's Refresh for sub and processes the updates it
// submits through an update queue.
//
// For every update the HTML (if any) is written and flushed to the feed, then
// the update's data is stored. The first failure aborts processing and is
// returned with a zero count. When no updates arrive the subscription's
// current data is written back to storage. Returns the number of processed
// updates.
func (p *Impl) refresh(ctx context.Context, sub *feed.Subscription) (int, error) {
vendor, ok := p.vendors[sub.Vendor]
if !ok {
return 0, errors.Errorf("vendor [%s] not found", sub.Vendor)
}
header := sub.Header
queue := newUpdateQueue(sub.Data, p.Preload)
// The producer runs via syncf.GoSync; the function it returns is deferred.
// NOTE(review): presumably that function stops and waits for the producer — confirm against syncf.
defer syncf.GoSync(ctx, func(ctx context.Context) {
// Closing the queue ends the consumer loop below.
defer queue.close()
// A panic in the vendor is converted into an error update.
defer func() {
if r := recover(); r != nil {
err, ok := r.(error)
if !ok {
err = errors.Errorf("%+v", r)
}
_ = queue.cancel(ctx, err)
}
}()
logf.Get(p).Debugf(ctx, "starting [%s] refresh now", sub)
if err := vendor.Refresh(ctx, header, queue); err != nil {
_ = queue.cancel(ctx, err)
}
})()
count := 0
for update := range queue.channel {
if err := update.err; err != nil {
return 0, err
}
if update.writeHTML != nil {
html := p.createHTMLWriter(ctx, sub.FeedID)
if err := update.writeHTML(html); err != nil {
return 0, errors.Wrap(err, "write HTML")
}
if err := html.Flush(); err != nil {
return 0, errors.Wrap(err, "flush HTML")
}
}
if err := p.Storage.UpdateSubscription(ctx, header, update.data); err != nil {
return 0, errors.Wrap(err, "update in storage")
}
logf.Get(p).Tracef(ctx, "update [%s]: ok", sub)
p.Metrics.Counter("refresh_update", header.Labels()).Inc()
count++
}
if count == 0 {
// No updates: write the unchanged data back to storage.
if err := p.Storage.UpdateSubscription(ctx, header, sub.Data); err != nil {
return 0, errors.Wrap(err, "update in storage")
}
}
return count, nil
}
// createHTMLWriter builds an HTML writer that sends silent, paged HTML
// messages to the chat of feedID. The page size is set to 9/10 of
// tghtml.DefaultMaxMessageSize.
func (p *Impl) createHTMLWriter(ctx context.Context, feedID feed.ID) *tghtml.Writer {
	chat := &receiver.Chat{
		Sender:    p.Telegram,
		ID:        telegram.ID(feedID),
		Silent:    true,
		ParseMode: telegram.HTML,
	}

	writer := &tghtml.Writer{
		Out: &output.Paged{Receiver: chat},
	}

	return writer.WithContext(output.With(ctx, tghtml.DefaultMaxMessageSize*9/10, 0))
}
package poller
import (
"context"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu/gormf"
)
// update is a single item passed through an updateQueue: either a payload
// (writeHTML and data) or an error.
type update struct {
// writeHTML renders the update; it may be nil.
writeHTML feed.WriteHTML
// data is the subscription data to store once the update is delivered.
data gormf.JSONB
// err, when set, aborts processing of the queue.
err error
}
// updateQueue carries updates from a vendor refresh to the poller.
type updateQueue struct {
// init is the stored subscription data handed to the vendor via Init.
init gormf.JSONB
// channel transports the updates.
channel chan update
}
// newUpdateQueue creates a queue seeded with init whose channel buffers up to
// size updates.
func newUpdateQueue(init gormf.JSONB, size int) updateQueue {
	var queue updateQueue
	queue.init = init
	queue.channel = make(chan update, size)
	return queue
}
// Init decodes the queue's initial data into value. On failure the error is
// also pushed into the queue (best effort) before being returned.
func (q updateQueue) Init(ctx context.Context, value interface{}) error {
	err := q.init.As(value)
	if err != nil {
		_ = q.cancel(ctx, err)
	}

	return err
}
// Submit converts value to JSONB and enqueues it together with writeHTML.
// It returns the conversion error, or the context error if the context is
// done before the update could be enqueued.
func (q updateQueue) Submit(ctx context.Context, writeHTML feed.WriteHTML, value interface{}) error {
	data, err := gormf.ToJSONB(value)
	if err != nil {
		return err
	}

	return q.submit(ctx, update{writeHTML: writeHTML, data: data})
}
// cancel enqueues an update carrying only err.
func (q updateQueue) cancel(ctx context.Context, err error) error {
return q.submit(ctx, update{err: err})
}
// submit sends update to the channel, giving up with the context's error if
// ctx is done first.
func (q updateQueue) submit(ctx context.Context, update update) error {
select {
case q.channel <- update:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// close closes the underlying channel, ending any range loop over it.
func (q updateQueue) close() {
close(q.channel)
}
package poller
import (
"context"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu/logf"
)
// StateListeners fans subscription state notifications out to a list of
// listeners; listener errors are logged, not returned.
type StateListeners []feed.AfterStateListener
// OnResume calls AfterResume on every listener and logs each outcome.
func (ls StateListeners) OnResume(ctx context.Context, sub *feed.Subscription) {
	for idx := range ls {
		listener := ls[idx]
		err := listener.AfterResume(ctx, sub)
		logf.Get(listener).Resultf(ctx, logf.Trace, logf.Warn, "on resume [%s]: %v", sub, err)
	}
}
// OnSuspend calls AfterSuspend on every listener and logs each outcome.
func (ls StateListeners) OnSuspend(ctx context.Context, sub *feed.Subscription) {
	for idx := range ls {
		listener := ls[idx]
		err := listener.AfterSuspend(ctx, sub)
		logf.Get(listener).Resultf(ctx, logf.Trace, logf.Warn, "on suspend [%s]: %v", sub, err)
	}
}
// OnDelete calls AfterDelete on every listener and logs each outcome.
func (ls StateListeners) OnDelete(ctx context.Context, sub *feed.Subscription) {
	for idx := range ls {
		listener := ls[idx]
		err := listener.AfterDelete(ctx, sub)
		logf.Get(listener).Resultf(ctx, logf.Trace, logf.Warn, "on delete [%s]: %v", sub, err)
	}
}
// OnDeleteAll calls AfterClear on every listener and logs each outcome.
func (ls StateListeners) OnDeleteAll(ctx context.Context, feedID feed.ID, pattern string, deleted int64) {
	for idx := range ls {
		listener := ls[idx]
		err := listener.AfterClear(ctx, feedID, pattern, deleted)
		logf.Get(listener).Resultf(ctx, logf.Trace, logf.Warn, "on delete all [%d, '%s', %d]: %v", feedID, pattern, deleted, err)
	}
}
package storage
import (
"context"
"fmt"
"strings"
"time"
"github.com/jfk9w-go/flu/colf"
"github.com/jfk9w-go/flu/gormf"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/syncf"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/pkg/errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// SQL is a gorm-backed storage for subscriptions, events and media hashes.
type SQL struct {
// Clock supplies timestamps for updates and events.
Clock syncf.Clock
// DB is the underlying gorm connection.
DB *gorm.DB
// IsPG enables the functions guarded by postgresDisclaimer.
IsPG bool
}
// GetActiveFeedIDs returns the distinct feed IDs that have at least one
// subscription without an error.
func (s *SQL) GetActiveFeedIDs(ctx context.Context) ([]feed.ID, error) {
	feedIDs := make([]feed.ID, 0)
	// Run the query in its own statement: reading feedIDs and calling
	// Scan(&feedIDs) inside a single return statement relies on an evaluation
	// order the Go specification leaves unspecified.
	err := s.DB.WithContext(ctx).
		Model(new(feed.Subscription)).
		Where("error is null").
		Select("distinct feed_id").
		Scan(&feedIDs).
		Error
	return feedIDs, err
}
// GetSubscription loads the subscription identified by header outside of an
// explicit transaction.
func (s *SQL) GetSubscription(ctx context.Context, header feed.Header) (*feed.Subscription, error) {
	query := &sqlTx{db: s.DB.WithContext(ctx)}
	return query.GetSubscription(header)
}
// CreateSubscription inserts sub, ignoring conflicts and omitting updated_at.
// If no row was inserted an "exists" error is returned.
func (s *SQL) CreateSubscription(ctx context.Context, sub *feed.Subscription) error {
	result := s.DB.WithContext(ctx).
		Clauses(clause.OnConflict{DoNothing: true}).
		Omit("updated_at").
		Create(sub)

	switch {
	case result.Error != nil:
		return result.Error
	case result.RowsAffected < 1:
		return errors.New("exists")
	default:
		return nil
	}
}
// ShiftSubscription returns the error-free subscription of feedID with the
// oldest updated_at (never-updated ones first), or feed.ErrNotFound when the
// feed has none.
func (s *SQL) ShiftSubscription(ctx context.Context, feedID feed.ID) (*feed.Subscription, error) {
	sub := new(feed.Subscription)
	err := s.DB.WithContext(ctx).
		Where("feed_id = ? and error is null", feedID).
		Order("updated_at asc nulls first").
		First(sub).
		Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, feed.ErrNotFound
	}

	return sub, err
}
// ListSubscriptions returns the subscriptions of feedID that are active
// (no error) or inactive (error set), depending on active. Subscriptions whose
// error equals feed.Deadborn are never listed.
func (s *SQL) ListSubscriptions(ctx context.Context, feedID feed.ID, active bool) ([]feed.Subscription, error) {
	var subs []feed.Subscription
	// Run the query in its own statement: reading subs and calling Find(&subs)
	// inside a single return statement relies on an evaluation order the Go
	// specification leaves unspecified.
	err := s.DB.WithContext(ctx).
		Where("feed_id = ? and (error is null) = ? and (error is null or error != ?)", feedID, active, feed.Deadborn).
		Find(&subs).
		Error
	return subs, err
}
// DeleteAllSubscriptions deletes the subscriptions of feedID whose error
// matches the SQL LIKE pattern errorLike and returns the number of deleted rows.
func (s *SQL) DeleteAllSubscriptions(ctx context.Context, feedID feed.ID, errorLike string) (int64, error) {
	result := s.DB.
		WithContext(ctx).
		Delete(new(feed.Subscription), "feed_id = ? and error like ?", feedID, errorLike)

	return result.RowsAffected, result.Error
}
// UpdateSubscription applies value to the subscription identified by header
// outside of an explicit transaction; see sqlTx.UpdateSubscription for the
// accepted value types.
func (s *SQL) UpdateSubscription(ctx context.Context, header feed.Header, value any) error {
	return (&sqlTx{clock: s.Clock, db: s.DB.WithContext(ctx)}).UpdateSubscription(header, value)
}
// Tx runs body inside a database transaction bound to ctx. The feed.Tx handed
// to body operates on the transaction handle, so its statements are committed
// or rolled back together.
func (s *SQL) Tx(ctx context.Context, body func(tx feed.Tx) error) error {
	return s.tx(ctx, func(tx *gorm.DB) error {
		// Use the transaction handle: s.DB would run the statements outside
		// of the transaction and without ctx.
		return body(&sqlTx{clock: s.Clock, db: tx})
	})
}
// SaveEvent stores an event for feedID outside of an explicit transaction.
func (s *SQL) SaveEvent(ctx context.Context, feedID feed.ID, eventType string, value any) error {
	writer := &sqlTx{
		clock: s.Clock,
		db:    s.DB.WithContext(ctx),
	}

	return writer.SaveEvent(feedID, eventType, value)
}
// CountEventsBy aggregates events of feedID recorded since the given time.
//
// Only event types present in multipliers are counted. Events are grouped by
// type and by the text value found under key in the event data; each group's
// count is scaled by the multiplier of its type and summed per key value.
// Returns feed.ErrUnsupported when the storage is not backed by postgres.
func (s *SQL) CountEventsBy(ctx context.Context, feedID feed.ID, since time.Time, key string, multipliers map[string]float64) (map[string]int64, error) {
if err := postgresDisclaimer(s.IsPG, "CountEventsBy"); err != nil {
return nil, err
}
var rows []struct {
Type string
Key string
Events int64
}
types := colf.Keys[string, float64](multipliers)
// NOTE(review): key is interpolated into the SQL text rather than bound as a
// parameter — callers must not pass untrusted input here.
if err := s.DB.WithContext(ctx).Raw(fmt.Sprintf( /* language=SQL */ `
select type, jsonb_extract_path_text(data, '%s') as key, count(1) as events
from event
where chat_id = ? and type in ? and time >= ?
group by 1, 2`, key),
feedID, types, since).
Scan(&rows).
Error; err != nil {
return nil, err
}
stats := make(map[string]int64)
for _, row := range rows {
// The scaled count is truncated towards zero by the integer conversion.
stats[row.Key] += int64(float64(row.Events) * multipliers[row.Type])
}
return stats, nil
}
// EventTx runs body inside a database transaction bound to ctx. The
// feed.EventTx handed to body operates on the transaction handle, so its
// statements are committed or rolled back together.
func (s *SQL) EventTx(ctx context.Context, body func(tx feed.EventTx) error) error {
	return s.tx(ctx, func(tx *gorm.DB) error {
		// Use the transaction handle: s.DB would run the statements outside
		// of the transaction and without ctx.
		return body(&sqlTx{clock: s.Clock, db: tx, isPG: s.IsPG})
	})
}
// IsMediaUnique records hash and reports whether it was seen for the first
// time.
//
// The hash is inserted; on conflict the existing row's collision counter is
// incremented and its URL, hash and last-seen fields are refreshed. The row is
// then read back into hash, and the media counts as unique when the stored
// collision counter is zero.
func (s *SQL) IsMediaUnique(ctx context.Context, hash *feed.MediaHash) (bool, error) {
	onConflict := clause.Set{
		{Column: clause.Column{Name: "collisions"}, Value: gorm.Expr("blob.collisions + 1")},
		{Column: clause.Column{Name: "url"}, Value: hash.URL},
		{Column: clause.Column{Name: "hash_type"}, Value: hash.Type},
		{Column: clause.Column{Name: "hash"}, Value: hash.Value},
		{Column: clause.Column{Name: "last_seen"}, Value: hash.LastSeen},
	}

	err := s.DB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		created := tx.
			Clauses(gormf.OnConflictClause(hash, "primaryKey", false, onConflict)).
			Create(hash)
		if created.Error != nil {
			return errors.Wrap(created.Error, "create")
		}

		if found := tx.First(hash); found.Error != nil {
			return errors.Wrap(found.Error, "find")
		}

		return nil
	})

	return err == nil && hash.Collisions == 0, err
}
// tx runs body inside a gorm transaction bound to ctx.
func (s *SQL) tx(ctx context.Context, body func(tx *gorm.DB) error) error {
return s.DB.WithContext(ctx).Transaction(body)
}
// sqlTx implements the storage operations on top of a gorm handle.
type sqlTx struct {
// clock supplies timestamps for updates and events.
clock syncf.Clock
// db is the handle all statements are executed on.
db *gorm.DB
// isPG enables the functions guarded by postgresDisclaimer.
isPG bool
}
// GetSubscription loads the subscription identified by header; the
// subscription ID is matched case-insensitively. Returns feed.ErrNotFound when
// there is no such subscription.
func (stx *sqlTx) GetSubscription(header feed.Header) (*feed.Subscription, error) {
	sub := new(feed.Subscription)
	err := stx.db.
		Where("lower(sub_id) = lower(?) and vendor = ? and feed_id = ?", header.SubID, header.Vendor, header.FeedID).
		First(sub).
		Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, feed.ErrNotFound
	}

	return sub, err
}
// DeleteSubscription removes the subscription identified by header and
// returns feed.ErrNotFound when nothing was deleted.
func (stx *sqlTx) DeleteSubscription(header feed.Header) error {
	result := stx.db.Delete(&feed.Subscription{Header: header})

	switch {
	case result.Error != nil:
		return result.Error
	case result.RowsAffected < 1:
		return feed.ErrNotFound
	default:
		return nil
	}
}
// UpdateSubscription updates the subscription identified by header and always
// refreshes updated_at. The kind of update depends on value:
//
//   - nil clears the error of a subscription that currently has one;
//   - gormf.JSONB replaces the data of a subscription without an error;
//   - error stores the error text on a subscription without an error.
//
// Any other value type is rejected. feed.ErrNotFound is returned when no row
// matched.
func (stx *sqlTx) UpdateSubscription(header feed.Header, value interface{}) error {
	query := stx.db.
		Model(new(feed.Subscription)).
		Where("sub_id = ? and vendor = ? and feed_id = ?",
			header.SubID, header.Vendor, header.FeedID)

	changes := map[string]interface{}{
		"updated_at": stx.clock.Now(),
	}

	switch typed := value.(type) {
	case nil:
		query = query.Where("error is not null")
		changes["error"] = nil
	case gormf.JSONB:
		query = query.Where("error is null")
		changes["data"] = typed
	case error:
		query = query.Where("error is null")
		changes["error"] = typed.Error()
	default:
		return errors.Errorf("invalid update value type: %T", typed)
	}

	result := query.UpdateColumns(changes)
	if result.Error != nil {
		return result.Error
	}

	if result.RowsAffected < 1 {
		return feed.ErrNotFound
	}

	return nil
}
// GetLastEventData decodes into value the data of the most recent event of
// eventType for feedID that matches filter. Returns feed.ErrUnsupported when
// the storage is not backed by postgres.
func (stx *sqlTx) GetLastEventData(feedID feed.ID, eventType string, filter map[string]any, value any) error {
	if err := postgresDisclaimer(stx.isPG, "GetLastEventData"); err != nil {
		return err
	}

	where, values := whereEvent(feedID, []string{eventType}, filter)

	var row struct {
		Data gormf.JSONB
	}

	err := stx.db.Model(new(feed.Event)).
		Where(where, values...).
		Order("time desc").
		Limit(1).
		Select("data").
		Scan(&row).
		Error
	if err != nil {
		return err
	}

	return row.Data.As(value)
}
// SaveEvent stores a new event of eventType for feedID, timestamped with the
// current clock time and carrying value converted to JSONB.
func (stx *sqlTx) SaveEvent(feedID feed.ID, eventType string, value any) error {
	data, err := gormf.ToJSONB(value)
	if err != nil {
		return err
	}

	return stx.db.Create(&feed.Event{
		Time:   stx.clock.Now(),
		Type:   eventType,
		FeedID: feedID,
		Data:   data,
	}).Error
}
// DeleteEvents deletes the events of feedID with one of the given types that
// match filter. Returns feed.ErrUnsupported when the storage is not backed by
// postgres.
func (stx *sqlTx) DeleteEvents(feedID feed.ID, types []string, filter map[string]any) error {
	if err := postgresDisclaimer(stx.isPG, "DeleteEvents"); err != nil {
		return err
	}

	where, values := whereEvent(feedID, types, filter)

	// gorm expects the condition followed by its arguments.
	conds := make([]any, 0, len(values)+1)
	conds = append(conds, where)
	conds = append(conds, values...)

	return stx.db.Delete(new(feed.Event), conds...).Error
}
// CountEventsByType counts the events of feedID with one of the given types
// that match filter and returns the counts keyed by event type. Returns
// feed.ErrUnsupported when the storage is not backed by postgres.
func (stx *sqlTx) CountEventsByType(feedID feed.ID, types []string, filter map[string]any) (map[string]int64, error) {
if err := postgresDisclaimer(stx.isPG, "CountEventsByType"); err != nil {
return nil, err
}
var rows []struct {
Type string
Events int64
}
where, values := whereEvent(feedID, types, filter)
// The where clause comes from whereEvent and carries its own placeholders.
if err := stx.db.Raw(fmt.Sprintf( /* language=SQL */ `
select type, count(1) as events
from event
where %s
group by type`, where),
values...).
Scan(&rows).
Error; err != nil {
return nil, err
}
stats := make(map[string]int64)
for _, row := range rows {
stats[row.Type] = row.Events
}
return stats, nil
}
// postgresDisclaimer returns nil when the storage is backed by postgres.
// Otherwise it logs a warning naming the unsupported function and returns
// feed.ErrUnsupported.
func postgresDisclaimer(isPG bool, name string) error {
	if isPG {
		return nil
	}

	logf.Get(ServiceID).Warnf(context.TODO(), "%s is not supported, you may want to switch to postgres", name)
	return feed.ErrUnsupported
}
// whereEvent builds the where clause and its arguments for selecting events
// of feedID with one of the given types. Every filter entry adds a text
// comparison against the value found under that key in the event data.
func whereEvent(feedID feed.ID, types []string, filter map[string]any) (string, []any) {
	values := make([]any, 0, len(filter)+2)
	values = append(values, feedID, types)

	var where strings.Builder
	where.WriteString("chat_id = ? and type in ?")
	for key, value := range filter {
		fmt.Fprintf(&where, ` and jsonb_extract_path_text(data, '%s') = ?::text`, key)
		values = append(values, value)
	}

	return where.String(), values
}
package core
import (
"context"
"github.com/jfk9w/hikkabot/v4/internal/core/internal/mediator"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w/hikkabot/v4/internal/feed/media"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/syncf"
)
// MediatorConfig holds the media mediator settings.
type MediatorConfig struct {
// Concurrency is the number of concurrent media downloads to allow.
Concurrency int `yaml:"concurrency,omitempty" doc:"How many concurrent media downloads to allow." default:"5"`
// Timeout is the maximum duration of a single mediation.
Timeout flu.Duration `yaml:"timeout,omitempty" doc:"If mediation time exceeds timeout, it will be interrupted." default:"10m"`
}
// MediatorService is a feed.Mediator that additionally accepts media resolvers
// and converters.
type MediatorService interface {
feed.Mediator
RegisterMediaResolver(resolver media.Resolver)
RegisterMediaConverter(converter media.Converter)
}
// MediatorContext is the application configuration contract required by the
// Mediator mixin.
type MediatorContext interface {
apfel.PrometheusContext
BlobContext
StorageContext
MediatorConfig() MediatorConfig
}
// Mediator is the application mixin exposing a MediatorService; the service is
// created by Include.
type Mediator[C MediatorContext] struct {
MediatorService
}
// String returns mediator.ServiceID.
func (m Mediator[C]) String() string {
return mediator.ServiceID
}
// Include wires the mediator into the application: it pulls in the storage,
// blob and metrics mixins, builds the mediator implementation from the
// application config and hands it to the application to manage. It is a no-op
// when the service is already set.
func (m *Mediator[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	if m.MediatorService != nil {
		return nil
	}

	var (
		storage Storage[C]
		blobs   Blobs[C]
		metrics apfel.Prometheus[C]
	)

	if err := app.Use(ctx, &storage, false); err != nil {
		return err
	}

	if err := app.Use(ctx, &blobs, false); err != nil {
		return err
	}

	if err := app.Use(ctx, &metrics, false); err != nil {
		return err
	}

	cfg := app.Config().MediatorConfig()
	impl := &mediator.Impl{
		Clock:   app,
		Storage: storage,
		Blobs:   blobs,
		Metrics: metrics.Registry().WithPrefix("app_media"),
		Locker:  syncf.Semaphore(app, cfg.Concurrency, 0),
		Timeout: cfg.Timeout.Value,
	}

	if err := app.Manage(ctx, impl); err != nil {
		return err
	}

	m.MediatorService = impl
	return nil
}
// AfterInclude registers mixin with the mediator as a media resolver and/or
// media converter, depending on which of those interfaces it implements.
func (m *Mediator[C]) AfterInclude(ctx context.Context, app apfel.MixinApp[C], mixin apfel.Mixin[C]) error {
	resolver, isResolver := mixin.(media.Resolver)
	if isResolver {
		m.RegisterMediaResolver(resolver)
		logf.Get(m).Infof(ctx, "register resolver [%s]: ok", resolver)
	}

	converter, isConverter := mixin.(media.Converter)
	if isConverter {
		m.RegisterMediaConverter(converter)
		logf.Get(m).Infof(ctx, "register converter [%s]: ok", converter)
	}

	return nil
}
package core
import (
"context"
"github.com/jfk9w/hikkabot/v4/internal/core/internal/poller"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/telegram-bot-api/ext/tapp"
)
// PollerService is a feed.Poller that additionally accepts vendors and state
// listeners and can restore polling of active feeds.
type PollerService interface {
feed.Poller
RegisterVendor(id string, vendor feed.Vendor) error
RegisterStateListener(listener feed.AfterStateListener)
RestoreActive(ctx context.Context) error
}
// PollerConfig holds the poller settings.
type PollerConfig struct {
// RefreshEvery is the feed update interval.
RefreshEvery flu.Duration `yaml:"refreshEvery,omitempty" doc:"Feed update interval." default:"1m"`
// Preload is the number of items to preload.
Preload int `yaml:"preload,omitempty" doc:"Number of items to preload." default:"5"`
}
// PollerContext is the application configuration contract required by the
// Poller mixin.
type PollerContext interface {
apfel.PrometheusContext
tapp.Context
StorageContext
PollerConfig() PollerConfig
}
// Poller is the application mixin exposing a PollerService; the service is
// created by Include.
type Poller[C PollerContext] struct {
PollerService
}
// String returns poller.ServiceID.
func (p Poller[C]) String() string {
return poller.ServiceID
}
// Include wires the poller into the application: it pulls in the storage,
// task executor, metrics and Telegram bot mixins and builds the poller
// implementation from the application config. It is a no-op when the service
// is already set.
func (p *Poller[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	if p.PollerService != nil {
		return nil
	}

	var (
		storage  Storage[C]
		executor TaskExecutor[C]
		metrics  apfel.Prometheus[C]
		bot      tapp.Mixin[C]
	)

	if err := app.Use(ctx, &storage, false); err != nil {
		return err
	}

	if err := app.Use(ctx, &executor, false); err != nil {
		return err
	}

	if err := app.Use(ctx, &metrics, false); err != nil {
		return err
	}

	if err := app.Use(ctx, &bot, false); err != nil {
		return err
	}

	cfg := app.Config().PollerConfig()
	impl := &poller.Impl{
		Clock:    app,
		Storage:  storage,
		Executor: executor,
		Metrics:  metrics.Registry().WithPrefix("app_aggregator"),
		Telegram: bot.Bot(),
		Interval: cfg.RefreshEvery.Value,
		Preload:  cfg.Preload,
	}

	p.PollerService = impl
	return nil
}
// AfterInclude is invoked for every mixin included into the application.
// Mixins implementing feed.Vendor are registered as vendors under their
// String() value; the registration result is logged at Info level on success
// and at Panic level on failure. Mixins implementing feed.AfterStateListener
// are registered as state listeners. It always returns nil.
func (p *Poller[C]) AfterInclude(ctx context.Context, app apfel.MixinApp[C], mixin apfel.Mixin[C]) error {
	if v, isVendor := mixin.(feed.Vendor); isVendor {
		regErr := p.RegisterVendor(mixin.String(), v)
		logf.Get(p).Resultf(ctx, logf.Info, logf.Panic, "register vendor [%s]: %v", mixin, regErr)
	}

	if l, isListener := mixin.(feed.AfterStateListener); isListener {
		p.RegisterStateListener(l)
		logf.Get(p).Infof(ctx, "registered state listener [%s]: ok", mixin)
	}

	return nil
}
package core
import (
"context"
"github.com/jfk9w/hikkabot/v4/internal/core/internal/storage"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/apfel"
"github.com/pkg/errors"
)
// StorageService combines the feed, event and media hash storage interfaces.
type StorageService interface {
feed.Storage
feed.EventStorage
feed.MediaHashStorage
}
// StorageContext is the configuration contract required by the Storage mixin.
type StorageContext interface {
// StorageConfig returns the GORM database connection settings.
StorageConfig() apfel.GormConfig
}
// Storage is an application mixin wrapping a StorageService.
// The embedded service is nil until Include initializes it.
type Storage[C StorageContext] struct {
StorageService
}
// String returns the storage service identifier.
func (s Storage[C]) String() string {
	id := storage.ServiceID
	return id
}
// Include initializes the storage service unless it is already set.
//
// It reads the database settings, logs a warning when the driver is not
// postgres, resolves the GORM database mixin through the application,
// auto-migrates the feed.Subscription, feed.Event and feed.MediaHash models
// and builds the SQL-backed storage on top of the connection.
//
// An error is returned when the database mixin cannot be resolved or when the
// migration fails (wrapped as "auto-migrate").
func (s *Storage[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	if s.StorageService != nil {
		return nil
	}

	config := app.Config().StorageConfig()
	if config.Driver != "postgres" {
		// The separator used to be a mis-encoded dash; plain ASCII keeps the
		// message readable regardless of the log encoding.
		logf.Get(s).Warnf(ctx, "database driver is not postgres - some functions will be unavailable; "+
			"consider switching to postgres")
	}

	db := &apfel.GormDB[C]{Config: config}
	if err := app.Use(ctx, db, false); err != nil {
		return err
	}

	if err := db.DB().AutoMigrate(new(feed.Subscription), new(feed.Event), new(feed.MediaHash)); err != nil {
		return errors.Wrap(err, "auto-migrate")
	}

	s.StorageService = &storage.SQL{
		Clock: app,
		// NOTE(review): Debug() is applied to the connection used for all
		// storage queries — confirm this verbosity is intended.
		DB:   db.DB().Debug(),
		IsPG: db.Config.Driver == "postgres",
	}

	return nil
}
package core
import (
"context"
"fmt"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu/colf"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/syncf"
)
// taskExecutorServiceID is the identifier returned by String for the task executor.
const taskExecutorServiceID = "core.task-executor"
// TaskExecutor is an application mixin wrapping a feed.TaskExecutor.
// The embedded executor is nil until Include initializes it.
type TaskExecutor[C any] struct {
feed.TaskExecutor
}
// String returns the task executor service identifier.
func (e TaskExecutor[C]) String() string {
	const id = taskExecutorServiceID
	return id
}
// Include initializes the task executor unless it is already set.
// The executor gets its own cancellable context derived from
// context.Background and is handed to the application via Manage.
func (e *TaskExecutor[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	if e.TaskExecutor != nil {
		return nil
	}

	runCtx, stop := context.WithCancel(context.Background())
	impl := &taskExecutor{
		ctx:    runCtx,
		cancel: stop,
		tasks:  make(colf.Set[string]),
	}

	if err := app.Manage(ctx, impl); err != nil {
		return err
	}

	e.TaskExecutor = impl
	return nil
}
// taskExecutor runs submitted tasks in background goroutines, keeping at most
// one running task per key.
type taskExecutor struct {
// ctx is passed to every spawned task; cancel cancels it (see Close).
ctx context.Context
// tasks holds the keys of currently running tasks; guarded by mu.
tasks colf.Set[string]
cancel func()
// work tracks spawned goroutines so that Close can wait for them.
work syncf.WaitGroup
mu syncf.RWMutex
}
// String returns the task executor service identifier.
func (e *taskExecutor) String() string {
	const id = taskExecutorServiceID
	return id
}
// Submit starts the task in the background unless a task with the same key
// (the fmt.Sprint representation of id) is already running.
//
// The key is removed from the running set when the task returns, and the task
// result is logged. Submit silently returns when the lock cannot be acquired.
func (e *taskExecutor) Submit(id any, task feed.Task) {
	key := fmt.Sprint(id)

	lockCtx, unlock := e.mu.Lock(e.ctx)
	if lockCtx.Err() != nil {
		return
	}
	defer unlock()

	if _, running := e.tasks[key]; running {
		return
	}

	run := func(ctx context.Context) {
		// Unregister the key once the task is done; this blocks until Submit
		// has released the lock, so the key is always added before removal.
		defer func() {
			_, release := e.mu.Lock(context.Background())
			defer release()
			delete(e.tasks, key)
		}()

		err := task(ctx)
		logf.Resultf(ctx, logf.Debug, logf.Warn, "task [%s] completed: %v", key, err)
	}

	_, _ = syncf.GoWith(e.ctx, e.work.Spawn, run)
	e.tasks.Add(key)
	logf.Get(e).Debugf(lockCtx, "started task [%s]", key)
}
// Close cancels the executor context and waits for all spawned tasks to
// finish. It always returns nil.
func (e *taskExecutor) Close() error {
	e.cancel()     // signal running tasks to stop
	e.work.Wait() // block until they have returned

	return nil
}
package converters
import (
"context"
"github.com/jfk9w/hikkabot/v4/internal/feed/media"
"github.com/jfk9w-go/aconvert-api"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/logf"
)
// aconvertFormats maps a target MIME type to the aconvert target format name.
// MIME types missing from the map are not handled by the Aconvert converter.
var aconvertFormats = map[string]string{
"video/mp4": "mp4",
}
// Aconvert is a media converter mixin backed by an aconvert client.
// The embedded client is set by Include.
type Aconvert[C aconvert.Context] struct {
*aconvert.Client[C]
}
// String returns the converter identifier.
func (c Aconvert[C]) String() string {
	const id = "media-converters.aconvert"
	return id
}
// Include resolves the aconvert client mixin through the application and
// stores a pointer to it in the converter.
func (c *Aconvert[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	var cl aconvert.Client[C]
	err := app.Use(ctx, &cl, false)
	if err != nil {
		return err
	}

	c.Client = &cl
	return nil
}
// Convert converts the referenced media to mimeType via the aconvert client.
//
// It returns (nil, nil) when mimeType has no entry in aconvertFormats. On
// success the result is an HTTP reference to the converted file with buffering
// enabled. The conversion outcome is logged at Debug (success) or Warn
// (failure) level.
func (c *Aconvert[C]) Convert(ctx context.Context, ref media.Ref, mimeType string) (media.MetaRef, error) {
	target, supported := aconvertFormats[mimeType]
	if !supported {
		return nil, nil
	}

	source, err := ref.Get(ctx)
	if err != nil {
		return nil, err
	}

	opts := aconvert.Options{}.TargetFormat(target)
	resp, err := c.Client.Convert(ctx, source, opts)
	logf.Get(c).Resultf(ctx, logf.Debug, logf.Warn, "convert %s: %v", flu.Readable(source), err)
	if err != nil {
		return nil, err
	}

	converted := &media.HTTPRef{
		URL:    resp.URL(),
		Buffer: true,
	}

	return converted, nil
}
package converters
import (
"context"
"os"
"os/exec"
"github.com/jfk9w/hikkabot/v4/internal/core"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w/hikkabot/v4/internal/feed/media"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/syncf"
"github.com/pkg/errors"
ffmpeg "github.com/u2takey/ffmpeg-go"
)
// ffmpegServiceID is the identifier returned by String for the FFmpeg converter.
const ffmpegServiceID = "media-converters.ffmpeg"
// ffmpegFormat describes ffmpeg output settings: f, vc and ac are passed as
// the "f", "c:v" and "c:a" output arguments respectively.
type ffmpegFormat struct {
f, vc, ac string
}
// ffmpegFormats maps a target MIME type to ffmpeg output settings.
// MIME types missing from the map are not handled by the FFmpeg converter.
var ffmpegFormats = map[string]ffmpegFormat{
"video/mp4": {"mp4", "libx264", "aac"},
}
// FFmpeg is a media converter mixin which runs ffmpeg over local files.
// Both fields are set by Include.
type FFmpeg[C core.BlobContext] struct {
// clock is used to measure conversion duration.
clock syncf.Clock
// blobs provides buffers for the input and output of a conversion.
blobs feed.Blobs
}
// String returns the converter identifier.
func (c FFmpeg[C]) String() string {
	const id = ffmpegServiceID
	return id
}
// Include checks that the ffmpeg executable can be found in $PATH and, if so,
// resolves the blob storage mixin through the application.
//
// It returns apfel.ErrDisabled when ffmpeg is not available; the lookup
// result is logged either way.
func (c *FFmpeg[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	_, lookupErr := exec.LookPath("ffmpeg")
	logf.Get(c).Resultf(ctx, logf.Info, logf.Warn, "check ffmpeg in $PATH: %v", lookupErr)
	if lookupErr != nil {
		return apfel.ErrDisabled
	}

	var store core.Blobs[C]
	if err := app.Use(ctx, &store, false); err != nil {
		return err
	}

	c.clock = app
	c.blobs = &store
	return nil
}
// Convert converts the referenced media to mimeType by running ffmpeg.
//
// It returns (nil, nil) when mimeType has no entry in ffmpegFormats. File and
// URL inputs are passed to ffmpeg directly; any other input is first buffered
// through the blob storage and must end up as a flu.File. The output is
// written to a freshly buffered blob (which must be a flu.File as well) and
// returned as a local reference carrying the MIME type and the resulting file
// size. The conversion outcome and duration are logged.
func (c *FFmpeg[C]) Convert(ctx context.Context, ref media.Ref, mimeType string) (media.MetaRef, error) {
	target, supported := ffmpegFormats[mimeType]
	if !supported {
		return nil, nil
	}

	input, err := ref.Get(ctx)
	if err != nil {
		return nil, err
	}

	var stream *ffmpeg.Stream
	switch src := input.(type) {
	case flu.File:
		stream = ffmpeg.Input(src.String())
	case flu.URL:
		stream = ffmpeg.Input(src.String())
	default:
		// ffmpeg needs something addressable, so buffer the input first.
		buffered, err := c.blobs.Buffer("", syncf.Val[flu.Input]{V: src}).Get(ctx)
		if err != nil {
			return nil, err
		}

		local, isFile := buffered.(flu.File)
		if !isFile {
			return nil, errors.Errorf("only flu.File blobs are supported, got %T", buffered)
		}

		stream = ffmpeg.Input(local.String())
	}

	// Allocate an empty blob to receive the output.
	blob := c.blobs.Buffer(mimeType, syncf.Val[flu.Input]{V: make(flu.Bytes, 0)})
	ctx = core.SkipSizeCheck(ctx)
	output, err := blob.Get(ctx)
	if err != nil {
		return nil, err
	}

	target_, isFile := output.(flu.File)
	if !isFile {
		return nil, errors.Errorf("only flu.File blobs are supported, got %T", output)
	}

	startedAt := c.clock.Now()
	args := ffmpeg.KwArgs{
		"c":   "copy",
		"f":   target.f,
		"c:v": target.vc,
		"c:a": target.ac,
	}

	stream = stream.Output(target_.String(), args)
	stream.Context = ctx
	err = stream.OverWriteOutput().Run()
	logf.Get(c).Resultf(ctx, logf.Debug, logf.Warn,
		"convert [%s] => [%s] in %s: %v",
		flu.Readable(input), output, c.clock.Now().Sub(startedAt), err)
	if err != nil {
		return nil, err
	}

	info, err := os.Stat(target_.String())
	if err != nil {
		return nil, err
	}

	meta := &media.Meta{
		MIMEType: mimeType,
		Size:     media.Size(info.Size()),
	}

	return &media.LocalRef{Input: target_, Meta: meta}, nil
}
package resolvers
import (
"context"
"net/url"
"strings"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/dvach"
"github.com/jfk9w/hikkabot/v4/internal/feed/media"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/httpf"
)
// Dvach is a media resolver mixin for 2ch.hk links.
type Dvach[C dvach.Context] struct {
// client is the HTTP client attached to resolved references; set by Include.
client httpf.Client
}
// String returns the resolver identifier.
func (r *Dvach[C]) String() string {
	const id = "media-resolver.dvach"
	return id
}
// Include resolves the dvach client mixin through the application and uses it
// as the HTTP client of the resolver.
func (r *Dvach[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	var cl dvach.Client[C]
	err := app.Use(ctx, &cl, false)
	if err != nil {
		return err
	}

	r.client = &cl
	return nil
}
// Resolve returns a buffered HTTP reference for URLs whose host contains
// "2ch.hk", using the resolver's own HTTP client. It returns (nil, nil) for
// any other host.
func (r *Dvach[C]) Resolve(ctx context.Context, url *url.URL) (media.MetaRef, error) {
	if handled := strings.Contains(url.Host, "2ch.hk"); !handled {
		return nil, nil
	}

	ref := &media.HTTPRef{
		URL:    url.String(),
		Client: r.client,
		Buffer: true,
	}

	return ref, nil
}
package resolvers
import (
"context"
"fmt"
"net/http"
"net/url"
"strings"
"github.com/jfk9w/hikkabot/v4/internal/feed/media"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/httpf"
)
// GfycatLike is a media resolver mixin for sites exposing an API of the form
// https://api.<Name>.com/v1/gfycats/<code>.
type GfycatLike[C any] struct {
// Name is the site name; it is matched against the URL host and substituted
// into the API host name.
Name string
}
// String returns the resolver identifier, which includes the site name.
func (r GfycatLike[C]) String() string {
	const prefix = "media-resolver."
	return prefix + r.Name
}
// Include is a no-op: the resolver has no dependencies to initialize.
func (r *GfycatLike[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	// Nothing to set up.
	return nil
}
// Resolve looks up the direct MP4 URL for a link whose host contains the site
// name. The item code is the last path segment of the link (surrounding
// slashes trimmed); it is sent to the site API and the "gfyItem.mp4Url" field
// of the JSON response becomes the URL of the returned buffered HTTP reference.
//
// It returns (nil, nil) for links of other hosts and an error when the API
// request fails or does not answer with 200 OK.
func (r *GfycatLike[C]) Resolve(ctx context.Context, source *url.URL) (media.MetaRef, error) {
	if !strings.Contains(source.Host, r.Name) {
		return nil, nil
	}

	trimmed := strings.Trim(source.String(), "/")
	code := trimmed[strings.LastIndex(trimmed, "/")+1:]

	var payload struct {
		GfyItem struct {
			URL string `json:"mp4Url"`
		} `json:"gfyItem"`
	}

	endpoint := fmt.Sprintf("https://api.%s.com/v1/gfycats/%s", r.Name, code)
	err := httpf.GET(endpoint).
		Exchange(ctx, nil).
		CheckStatus(http.StatusOK).
		DecodeBody(flu.JSON(&payload)).
		Error()
	if err != nil {
		return nil, err
	}

	ref := &media.HTTPRef{
		URL:    payload.GfyItem.URL,
		Buffer: true,
	}

	return ref, nil
}
package resolvers
import (
"bufio"
"context"
"net/http"
"net/url"
"regexp"
"strings"
"github.com/jfk9w/hikkabot/v4/internal/feed/media"
"github.com/pkg/errors"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/httpf"
)
// imgurRegexp extracts the media URL from a line of an imgur HTML page: the
// second capture group is the value of either the `image_src` link or the
// `og:video` meta tag.
var imgurRegexp = regexp.MustCompile(`.*?(<link rel="image_src"\s+href="|<meta property="og:video"\s+content=")(.*?)".*`)
// Imgur is a stateless media resolver mixin for imgur.com links.
type Imgur[C any] struct{}
// String returns the resolver identifier.
func (r Imgur[C]) String() string {
	const id = "media-resolver.imgur"
	return id
}
// Include is a no-op: the resolver has no dependencies to initialize.
func (r *Imgur[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	// Nothing to set up.
	return nil
}
// Resolve turns an imgur link into a direct media reference.
//
// Links of hosts other than imgur.com, www.imgur.com, i.imgur.com and
// m.imgur.com yield (nil, nil). Links containing ".gifv" are rewritten to
// ".mp4"; links containing ".jpg", ".jpeg", ".png" or ".gif" are used as is.
// Any other link is treated as an HTML page: it is downloaded and scanned line
// by line for the media URL (see imgurRegexp).
//
// An error is returned when the page cannot be fetched, is not served as
// text/html, cannot be read, or contains no media URL. The returned reference
// is nil whenever the error is non-nil.
func (r *Imgur[C]) Resolve(ctx context.Context, source *url.URL) (media.MetaRef, error) {
	switch source.Host {
	case "imgur.com", "www.imgur.com", "i.imgur.com", "m.imgur.com":
	default:
		return nil, nil
	}

	url := source.String()
	switch {
	case strings.Contains(url, ".gifv"):
		return &media.HTTPRef{
			URL: strings.Replace(url, ".gifv", ".mp4", 1),
		}, nil
	case strings.Contains(url, ".jpg") ||
		strings.Contains(url, ".jpeg") ||
		strings.Contains(url, ".png") ||
		strings.Contains(url, ".gif"):
		return &media.HTTPRef{URL: url}, nil
	}

	// Upper bound for a single HTML line; pages may contain lines longer than
	// the default bufio.Scanner token limit.
	const maxLineSize = 1 << 20

	var ref media.HTTPRef
	err := httpf.GET(url).
		Exchange(ctx, nil).
		CheckStatus(http.StatusOK).
		HandleFunc(func(resp *http.Response) error {
			defer flu.CloseQuietly(resp.Body)

			// Only HTML pages can be scraped for the media URL.
			contentType := resp.Header.Get("Content-Type")
			if !strings.HasPrefix(contentType, "text/html") {
				return errors.New("not an html")
			}

			scanner := bufio.NewScanner(resp.Body)
			scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineSize)
			for scanner.Scan() {
				groups := imgurRegexp.FindStringSubmatch(scanner.Text())
				if len(groups) == 3 {
					ref.URL = groups[2]
					return nil
				}
			}

			// Distinguish a read failure from a page without a media URL.
			if err := scanner.Err(); err != nil {
				return errors.Wrap(err, "scan page")
			}

			return errors.New("unable to find URL")
		}).
		Error()
	if err != nil {
		return nil, err
	}

	return &ref, nil
}
package resolvers
import (
"context"
"net/url"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/reddit"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/redditsave"
"github.com/jfk9w/hikkabot/v4/internal/feed/media"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/httpf"
"github.com/pkg/errors"
)
// RedditContext is the configuration contract required by the Reddit media
// resolver: it combines the reddit and redditsave client contexts.
type RedditContext interface {
reddit.Context
redditsave.Context
}
// Reddit is a media resolver mixin for preview.redd.it and v.redd.it links.
// Both fields are set by Include.
type Reddit[C RedditContext] struct {
// client is the HTTP client attached to resolved references.
client httpf.Client
// redditsave resolves v.redd.it links into direct URLs.
redditsave redditsave.Interface
}
// String returns the resolver identifier.
func (r *Reddit[C]) String() string {
	const id = "media-resolver.reddit"
	return id
}
// Include resolves the reddit and redditsave client mixins through the
// application (in that order) and stores them in the resolver.
func (r *Reddit[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	var cl reddit.Client[C]
	if err := app.Use(ctx, &cl, false); err != nil {
		return err
	}

	var saver redditsave.Client[C]
	if err := app.Use(ctx, &saver, false); err != nil {
		return err
	}

	r.client = cl
	r.redditsave = saver
	return nil
}
// Resolve handles two hosts. preview.redd.it links are returned as buffered
// HTTP references as is. v.redd.it links are first resolved through
// redditsave and returned as buffered HTTP references with the MIME type
// preset to video/mp4; a resolution failure is wrapped as "via redditsave".
// Links of any other host yield (nil, nil).
func (r *Reddit[C]) Resolve(ctx context.Context, source *url.URL) (media.MetaRef, error) {
	switch host := source.Host; host {
	case "preview.redd.it":
		ref := &media.HTTPRef{
			URL:    source.String(),
			Client: r.client,
			Buffer: true,
		}
		return ref, nil

	case "v.redd.it":
		direct, err := r.redditsave.ResolveURL(ctx, source.String())
		if err != nil {
			return nil, errors.Wrap(err, "via redditsave")
		}

		ref := &media.HTTPRef{
			URL:    direct,
			Client: r.client,
			Buffer: true,
			Meta:   &media.Meta{MIMEType: "video/mp4"},
		}
		return ref, nil
	}

	return nil, nil
}
package dvach
import (
"context"
"regexp"
"sort"
"strings"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/dvach"
"github.com/jfk9w/hikkabot/v4/internal/core"
"github.com/jfk9w/hikkabot/v4/internal/ext/vendors/dvach/internal"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w/hikkabot/v4/internal/util"
"github.com/pkg/errors"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/telegram-bot-api"
tghtml "github.com/jfk9w-go/telegram-bot-api/ext/html"
"github.com/jfk9w-go/telegram-bot-api/ext/output"
"github.com/jfk9w-go/telegram-bot-api/ext/receiver"
)
// catalogRegexp matches a board reference such as "/b", "2ch.hk/b/" or
// "https://2ch.hk/b"; the fourth capture group is the board name.
var catalogRegexp = regexp.MustCompile(`^((http|https)://)?(2ch\.hk)?/([a-z]+)(/)?$`)
// CatalogData is the persisted state of a board catalog subscription.
type CatalogData struct {
// Board is the board name.
Board string `json:"board"`
// Query filters threads by their (lowercased) comment.
Query util.Regexp `json:"query,omitempty"`
// Offset is the number of the last submitted thread; threads with a number
// not greater than it are skipped.
Offset int `json:"offset,omitempty"`
// Auto holds the options following "auto"; when set they become the
// arguments of the "/sub" button attached to each thread.
Auto []string `json:"auto,omitempty"`
}
// Catalog is a feed vendor mixin for 2ch.hk board catalogs.
// Both fields are set by Include.
type Catalog[C Context] struct {
client dvach.Interface
mediator feed.Mediator
}
// String returns the vendor identifier.
func (v *Catalog[C]) String() string {
	const id = "2ch/catalog"
	return id
}
// Include resolves the dvach client and the media mediator mixins through the
// application (in that order) and stores them in the vendor.
func (v *Catalog[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	var cl dvach.Client[C]
	if err := app.Use(ctx, &cl, false); err != nil {
		return err
	}

	var med core.Mediator[C]
	if err := app.Use(ctx, &med, false); err != nil {
		return err
	}

	v.client, v.mediator = &cl, &med
	return nil
}
// Parse builds a subscription draft from a board reference.
//
// It returns (nil, nil) when ref is not a board reference. Options are read in
// order: "auto" ends option processing and stores the remaining options in
// Auto; every other option (with an optional "re=" prefix stripped) is
// compiled as the query regexp, the last one winning. The catalog is then
// fetched to obtain the board name for the draft.
func (v *Catalog[C]) Parse(ctx context.Context, ref string, options []string) (*feed.Draft, error) {
	groups := catalogRegexp.FindStringSubmatch(ref)
	if len(groups) < 6 {
		return nil, nil
	}

	data := &CatalogData{Board: groups[4]}
	for i, option := range options {
		if option == "auto" {
			data.Auto = options[i+1:]
			break
		}

		pattern := option
		if strings.HasPrefix(pattern, "re=") {
			pattern = pattern[3:]
		}

		re, err := regexp.Compile(pattern)
		if err != nil {
			return nil, errors.Wrap(err, "compile regexp")
		}

		data.Query.Regexp = re
	}

	catalog, err := v.client.GetCatalog(ctx, data.Board)
	if err != nil {
		return nil, errors.Wrap(err, "get catalog")
	}

	query := data.Query.String()
	draft := &feed.Draft{
		SubID: data.Board + "/" + query,
		Name:  catalog.BoardName + " /" + query + "/",
		Data:  &data,
	}

	if len(data.Auto) != 0 {
		joined := strings.Join(data.Auto, " ")
		draft.SubID += "/" + joined
		draft.Name += " [" + joined + "]"
	}

	return draft, nil
}
// Refresh fetches the board catalog and submits every thread accepted by
// writeHTML, in ascending thread number order, advancing Offset to the
// submitted thread number.
//
// A failure to fetch the catalog is logged and swallowed (nil is returned);
// errors from refresh.Init and refresh.Submit are returned as is.
func (v *Catalog[C]) Refresh(ctx context.Context, header feed.Header, refresh feed.Refresh) error {
	var data CatalogData
	if err := refresh.Init(ctx, &data); err != nil {
		return err
	}

	catalog, err := v.client.GetCatalog(ctx, data.Board)
	if err != nil {
		logf.Get(v).Warnf(ctx, "failed to get catalog for [%s]: %v", header, err)
		return nil
	}

	threads := catalog.Threads
	sort.Sort(internal.Posts(threads))
	for idx := range threads {
		thread := &threads[idx]
		write := v.writeHTML(ctx, data, thread)
		if write == nil {
			// Already sent or filtered out.
			continue
		}

		data.Offset = thread.Num
		if err := refresh.Submit(ctx, write, data); err != nil {
			return err
		}
	}

	return nil
}
// writeHTML returns a writer rendering the thread, or nil when the thread
// number is not above the offset or its lowercased comment does not match the
// query.
//
// The first attached file, if any, is mediated up front. The produced writer
// emits the post date, a link, the comment and the media; when Auto options
// are set, a "/sub" inline button for the thread is attached.
func (v *Catalog[C]) writeHTML(ctx context.Context, data CatalogData, post *dvach.Post) feed.WriteHTML {
	if post.Num <= data.Offset || !data.Query.MatchString(strings.ToLower(post.Comment)) {
		return nil
	}

	var mediaRef receiver.MediaRef
	if len(post.Files) > 0 {
		mediaRef = v.mediator.Mediate(ctx, post.Files[0].URL(), nil)
	}

	return func(html *tghtml.Writer) error {
		hasMedia := mediaRef != nil

		outCtx := html.Context()
		if hasMedia {
			outCtx = output.With(outCtx, tghtml.DefaultMaxCaptionSize, 1)
		}

		if len(data.Auto) != 0 {
			cmd := &telegram.Command{Key: "/sub " + post.URL(), Args: data.Auto}
			button := cmd.Button("")
			button[0] = button[2]
			keyboard := telegram.InlineKeyboard([]telegram.Button{button})
			outCtx = receiver.ReplyMarkup(outCtx, keyboard)
		}

		html = html.WithContext(outCtx)
		html.Bold(post.DateString).Text("\n").
			Link("[link]", post.URL())

		if post.Comment != "" {
			html.Text("\n---\n").MarkupString(post.Comment)
		}

		if hasMedia {
			html.Media(post.URL(), mediaRef, true, true)
		}

		return nil
	}
}
package internal
import (
"fmt"
"strings"
tghtml "github.com/jfk9w-go/telegram-bot-api/ext/html"
"golang.org/x/net/html"
)
// AnchorFormat formats anchors which reference posts as board hashtags.
type AnchorFormat struct {
// Board is the board name; it is uppercased in the produced hashtag.
Board string
}
// Format renders an anchor. Anchors carrying a non-empty "data-num" attribute
// become "#<BOARD><num>"; all others are delegated to
// tghtml.DefaultAnchorFormat.
func (f AnchorFormat) Format(text string, attrs []html.Attribute) string {
	num := tghtml.Get(attrs, "data-num")
	if num == "" {
		return tghtml.DefaultAnchorFormat.Format(text, attrs)
	}

	board := strings.ToUpper(f.Board)
	return fmt.Sprintf(`#%s%s`, board, num)
}
package internal
import "github.com/jfk9w/hikkabot/v4/internal/3rdparty/dvach"
// Posts is a slice of posts sortable by post number in ascending order.
type Posts []dvach.Post
// Len returns the number of posts.
func (ps Posts) Len() int {
	n := len(ps)
	return n
}
// Less reports whether the post at index i has a lower number than the post
// at index j.
func (ps Posts) Less(i, j int) bool {
	left, right := ps[i].Num, ps[j].Num
	return left < right
}
// Swap exchanges the posts at indices i and j.
func (ps Posts) Swap(i, j int) {
	tmp := ps[i]
	ps[i] = ps[j]
	ps[j] = tmp
}
package dvach
import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/dvach"
"github.com/jfk9w/hikkabot/v4/internal/core"
"github.com/jfk9w/hikkabot/v4/internal/ext/vendors/dvach/internal"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w/hikkabot/v4/internal/util"
"github.com/pkg/errors"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/telegram-bot-api/ext/html"
"github.com/jfk9w-go/telegram-bot-api/ext/receiver"
)
// threadRegexp matches a thread reference such as "/b/res/123.html" with an
// optional scheme and 2ch.hk host; the fourth capture group is the board name
// and the fifth is the thread number.
var threadRegexp = regexp.MustCompile(`^((http|https)://)?(2ch\.hk)?/([a-z]+)/res/([0-9]+)\.html?$`)
// ThreadData is the persisted state of a thread subscription.
type ThreadData struct {
// Board is the board name.
Board string `json:"board"`
// Num is the thread number.
Num int `json:"num"`
// MediaOnly makes the vendor skip posts without files and send media only.
MediaOnly bool `json:"media_only,omitempty"`
// Offset is passed to GetThread on refresh; it is set to the number of the
// last submitted post plus one.
Offset int `json:"offset,omitempty"`
// Tag is the hashtag written with each post; it defaults to a hashtag built
// from the thread subject.
Tag string `json:"tag"`
}
// Thread is a feed vendor mixin for 2ch.hk threads.
// Both fields are set by Include.
type Thread[C Context] struct {
client dvach.Interface
mediator feed.Mediator
}
// String returns the vendor identifier.
func (v Thread[C]) String() string {
	const id = "2ch/thread"
	return id
}
// Include resolves the dvach client and the media mediator mixins through the
// application (in that order) and stores them in the vendor.
func (v *Thread[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	var cl dvach.Client[C]
	if err := app.Use(ctx, &cl, false); err != nil {
		return err
	}

	var med core.Mediator[C]
	if err := app.Use(ctx, &med, false); err != nil {
		return err
	}

	v.client, v.mediator = &cl, &med
	return nil
}
// Parse builds a subscription draft from a thread reference.
//
// It returns (nil, nil) when ref is not a thread reference. Recognized
// options: "m" enables media-only mode, an option starting with "#" sets the
// tag. The thread's post is fetched; when no tag option was given, the tag is
// derived from the post subject.
func (v *Thread[C]) Parse(ctx context.Context, ref string, options []string) (*feed.Draft, error) {
	groups := threadRegexp.FindStringSubmatch(ref)
	if len(groups) < 6 {
		return nil, nil
	}

	data := &ThreadData{Board: groups[4]}
	data.Num, _ = strconv.Atoi(groups[5])

	for _, opt := range options {
		if opt == "m" {
			data.MediaOnly = true
			continue
		}

		if strings.HasPrefix(opt, "#") {
			data.Tag = opt
		}
	}

	op, err := v.client.GetPost(ctx, data.Board, data.Num)
	if err != nil {
		return nil, errors.Wrap(err, "get post")
	}

	if data.Tag == "" {
		data.Tag = util.Hashtag(op.Subject)
	}

	draft := &feed.Draft{
		SubID: fmt.Sprintf("%s/%d", data.Board, data.Num),
		Name:  data.Tag,
		Data:  &data,
	}

	return draft, nil
}
// Refresh fetches thread posts starting from Offset and submits every post
// accepted by writeHTML, advancing Offset past each submitted post.
//
// A "thread does not exist" error from the client is returned to the caller;
// any other fetch failure is logged and swallowed (nil is returned). Errors
// from refresh.Init and refresh.Submit are returned as is.
func (v *Thread[C]) Refresh(ctx context.Context, header feed.Header, refresh feed.Refresh) error {
	var data ThreadData
	if err := refresh.Init(ctx, &data); err != nil {
		return err
	}

	posts, err := v.client.GetThread(ctx, data.Board, data.Num, data.Offset)
	if err != nil {
		var apiErr dvach.Error
		gone := errors.As(err, &apiErr) && apiErr.Code == dvach.ThreadDoesNotExistErrorCode
		if gone {
			return err
		}

		logf.Get(v).Warnf(ctx, "failed to get posts for [%s]: %v", header, err)
		return nil
	}

	for idx := range posts {
		current := &posts[idx]
		write := v.writeHTML(ctx, header, data, current)
		if write == nil {
			continue
		}

		data.Offset = current.Num + 1
		if err := refresh.Submit(ctx, write, data); err != nil {
			return err
		}
	}

	return nil
}
// writeHTML returns a writer rendering the post, or nil when media-only mode
// is on and the post has no files.
//
// All attached files are mediated up front; in media-only mode the feed ID is
// passed to the mediator as the deduplication key. In media-only mode the
// writer emits the tag followed by each media item and skips messages on
// media errors. Otherwise it emits the tag, the post hashtag, an "#OP" marker
// for the original post, the comment and all media.
func (v *Thread[C]) writeHTML(ctx context.Context, header feed.Header, data ThreadData, post *dvach.Post) feed.WriteHTML {
	if data.MediaOnly && len(post.Files) == 0 {
		return nil
	}

	var dedupKey *feed.ID
	if data.MediaOnly {
		dedupKey = &header.FeedID
	}

	mediaRefs := make([]receiver.MediaRef, len(post.Files))
	for idx, file := range post.Files {
		mediaRefs[idx] = v.mediator.Mediate(ctx, file.URL(), dedupKey)
	}

	return func(html *html.Writer) error {
		if data.MediaOnly {
			html = html.WithContext(receiver.SkipOnMediaError(html.Context()))
			for idx, mediaRef := range mediaRefs {
				html.Text(data.Tag).Media(post.Files[idx].URL(), mediaRef, true, true)
			}

			return nil
		}

		if data.Tag == "" {
			data.Tag = util.Hashtag(post.Subject)
		}

		html.Anchors = internal.AnchorFormat{Board: post.Board}
		html.Text(data.Tag).Text(fmt.Sprintf("\n#%s%d", strings.ToUpper(post.Board), post.Num))
		if post.IsOriginal() {
			html.Text(" #OP")
		}

		if post.Comment != "" {
			html.Text("\n---\n").MarkupString(post.Comment)
		}

		single := len(post.Files) == 1
		for idx, mediaRef := range mediaRefs {
			html.Media(post.Files[idx].URL(), mediaRef, single, true)
		}

		return nil
	}
}
package reddit
import (
"context"
_ "embed"
"time"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/reddit"
"github.com/jfk9w/hikkabot/v4/internal/core"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu/colf"
"github.com/jfk9w-go/flu/logf"
"github.com/pkg/errors"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/gormf"
"gorm.io/gorm"
)
// subredditIndexSQL holds the contents of subreddit_index.sql, embedded at
// build time; the storage mixin passes it to the database on Include.
//
//go:embed subreddit_index.sql
var subredditIndexSQL string
// storageServiceID is the identifier returned by String for the reddit storage.
const storageServiceID = "vendors.reddit.storage"
// Storage is an application mixin wrapping a StorageInterface.
// The embedded implementation is nil until Include initializes it.
type Storage[C core.StorageContext] struct {
StorageInterface
}
// String returns the reddit storage service identifier.
func (s Storage[C]) String() string {
	const id = storageServiceID
	return id
}
// Include initializes the reddit storage unless it is already set.
//
// It resolves the core storage and the GORM database mixins through the
// application, auto-migrates the reddit.Thing model, executes the embedded
// index SQL and builds the SQL-backed storage.
//
// An error is returned when a mixin cannot be resolved, when the migration
// fails (wrapped as "auto-migrate") or when the index SQL fails (wrapped as
// "create indices").
func (s *Storage[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	if s.StorageInterface != nil {
		return nil
	}

	var storage core.Storage[C]
	if err := app.Use(ctx, &storage, false); err != nil {
		return err
	}

	db := &apfel.GormDB[C]{Config: app.Config().StorageConfig()}
	if err := app.Use(ctx, db, false); err != nil {
		return err
	}

	if err := db.DB().WithContext(ctx).AutoMigrate(new(reddit.Thing)); err != nil {
		return errors.Wrap(err, "auto-migrate")
	}

	// Exec runs the statement right away. Raw only builds a statement which is
	// executed by a later Scan/Row(s) call, so the indices were never created.
	if err := db.DB().WithContext(ctx).Exec(subredditIndexSQL).Error; err != nil {
		return errors.Wrap(err, "create indices")
	}

	s.StorageInterface = &sqlStorage{
		Storage:      storage,
		EventStorage: storage,
		db:           db.DB(),
		isPG:         db.Config.Driver == "postgres",
	}

	return nil
}
// SaveThings inserts the things in a single statement using the conflict
// clause built by gormf.OnConflictClause for the primary key. It is a no-op
// for an empty slice.
func (s *sqlStorage) SaveThings(ctx context.Context, things []reddit.Thing) error {
	if len(things) == 0 {
		return nil
	}

	session := s.db.WithContext(ctx)
	onConflict := gormf.OnConflictClause(things, "primaryKey", true, nil)
	result := session.Clauses(onConflict).Create(things)
	return result.Error
}
// sqlStorage is the SQL-backed reddit storage. It embeds the core feed and
// event storages and adds reddit-specific queries on top of the same database.
type sqlStorage struct {
feed.Storage
feed.EventStorage
db *gorm.DB
// isPG reports whether the database driver is postgres.
isPG bool
}
// RedditTx runs body inside a database transaction bound to ctx.
//
// The StorageTx handed to body operates on the transaction handle, so all of
// its queries are part of the transaction and honor ctx. The transaction is
// committed when body returns nil and rolled back otherwise; the resulting
// error is returned.
func (s *sqlStorage) RedditTx(ctx context.Context, body func(tx StorageTx) error) error {
	return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		// Use the transaction handle, not s.db: otherwise the queries would run
		// outside the transaction and without the context.
		return body(&sqlStorageTx{db: tx, isPG: s.isPG})
	})
}
// sqlStorageTx is the set of reddit storage queries available to the body of
// RedditTx.
type sqlStorageTx struct {
// db is the handle all queries are executed on.
db *gorm.DB
// isPG reports whether the database driver is postgres.
isPG bool
}
// GetPercentile returns the lowest "ups" value among the stored things of the
// subreddit whose cumulative distribution rank (ordered by ups) exceeds
// 1-top, i.e. the threshold separating the top fraction of things.
// It returns 0 when the query yields no rows.
func (stx *sqlStorageTx) GetPercentile(subreddit string, top float64) (int, error) {
	var percentile int
	// Run the query before building the return values: whether a plain
	// variable in a return statement is read before or after a function call
	// in the same statement is not specified by the language.
	err := stx.db.Raw( /* language=SQL */ `
select ups from (
select ups, cume_dist() over (order by ups) as rank
from reddit
where subreddit = ?
) as t
where t.rank > ?
order by t.rank
limit 1`, subreddit, 1-top).
		Scan(&percentile).
		Error

	return percentile, err
}
// Score aggregates like/click and dislike events of the feed for the given
// thing IDs: the time of the first event, the numbers of distinct liked and
// disliked things, and the numbers of distinct users who liked and disliked.
//
// The query relies on postgres JSONB functions; for any other driver a warning
// is logged and feed.ErrUnsupported is returned.
func (stx *sqlStorageTx) Score(feedID feed.ID, thingIDs []string) (*Score, error) {
	if !stx.isPG {
		logf.Get(storageServiceID).Warnf(context.TODO(), "Score not supported, you may want to switch to postgres")
		return nil, feed.ErrUnsupported
	}

	score := new(Score)
	// disliked_things counts distinct thing IDs, mirroring liked_things; it
	// used to count user IDs, which made it a copy of dislikes.
	err := stx.db.Raw( /* language=SQL */ `
select min(time) as first,
count(distinct case when type in ('click', 'like') then jsonb_extract_path_text(data, 'thing_id') end) as liked_things,
count(distinct case when type = 'dislike' then jsonb_extract_path_text(data, 'thing_id') end) as disliked_things,
count(distinct case when type in ('click', 'like') then jsonb_extract_path(data, 'user_id') end) as likes,
count(distinct case when type = 'dislike' then jsonb_extract_path(data, 'user_id') end) as dislikes
from event
where chat_id = ? and jsonb_extract_path_text(data, 'thing_id') in ?`,
		feedID, thingIDs).
		Scan(score).
		Error

	return score, err
}
// DeleteStaleThings deletes things whose last_seen is earlier than until and
// returns the number of affected rows together with the query error.
func (stx *sqlStorageTx) DeleteStaleThings(until time.Time) (int64, error) {
	stale := stx.db.Where("last_seen < ?", until)
	result := stale.Delete(new(reddit.Thing))

	deleted, err := result.RowsAffected, result.Error
	return deleted, err
}
// GetFreshThingIDs returns the subset of ids which are still present in the
// reddit table.
func (stx *sqlStorageTx) GetFreshThingIDs(ids colf.Set[string]) (colf.Set[string], error) {
	found := make(colf.Slice[string], 0)
	query := stx.db.Raw( /* language=SQL */ `
select id from reddit
where id in ?
order by num_id`,
		colf.ToSlice[string](ids))

	if err := query.Scan(&found).Error; err != nil {
		return nil, err
	}

	fresh := make(colf.Set[string], len(found))
	colf.AddAll[string](&fresh, found)
	return fresh, nil
}
package reddit
import (
"context"
"encoding/json"
"net"
"regexp"
"sort"
"time"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/reddit"
"github.com/jfk9w/hikkabot/v4/internal/core"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/colf"
"github.com/jfk9w-go/flu/httpf"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/me3x"
"github.com/jfk9w-go/flu/syncf"
"github.com/jfk9w-go/telegram-bot-api"
"github.com/jfk9w-go/telegram-bot-api/ext/tapp"
"github.com/pkg/errors"
)
// subredditRegexp matches a subreddit or user reference such as "/r/golang",
// "/u/someone" or "https://reddit.com/r/golang"; the fourth capture group is
// the subreddit (or user) name.
var subredditRegexp = regexp.MustCompile(`^(((http|https)://)?reddit\.com)?/[ur]/([0-9A-Za-z_]+)$`)
// SubredditPacingConfig controls how the top ratio (the fraction of the most
// upvoted things which get sent) is computed for a subscription.
type SubredditPacingConfig struct {
// Gain is the interval since the first scored event during which no boost is applied.
Gain flu.Duration `yaml:"gain,omitempty" doc:"Do not apply pacing during this interval since subscription start." default:"48h"`
// Base is the top ratio used when no boost applies.
Base float64 `yaml:"base,omitempty" doc:"Base top ratio to be applied for stable subscriptions." default:"0.01"`
// Min is the lower bound of the resulting top ratio.
Min float64 `yaml:"min,omitempty" doc:"Lowest allowed top ratio."`
// Scale multiplies the product of thing and user ratios to obtain the boost.
Scale float64 `yaml:"scale,omitempty" doc:"Top ratio multiplier. The number is highly dependent on the number of active users." default:"300"`
// Members is the minimum member count used as the user ratio denominator.
Members int64 `yaml:"members,omitempty" doc:"Lowest chat members threshold." default:"300"`
// Batch is the maximum number of things submitted per refresh.
Batch int `yaml:"batch,omitempty" doc:"Max update batch size." default:"1"`
}
// SubredditConfig contains settings of the subreddit vendor.
type SubredditConfig struct {
// Pacing controls the top ratio computation.
Pacing SubredditPacingConfig `yaml:"pacing,omitempty" doc:"Settings for controlling pacing based on top ratio."`
// CleanInterval is the minimum interval between subscription data cleanups.
CleanInterval flu.Duration `yaml:"cleanInterval,omitempty" doc:"How often to clean things from data." default:"24h"`
// ThingTTL is the age (by last_seen) after which stored things are deleted on cleanup.
ThingTTL flu.Duration `yaml:"thingTtl,omitempty" doc:"How long to keep things in database." default:"168h"`
}
// SubredditContext is the configuration contract required by the subreddit
// vendor: it combines the reddit client, mediator, storage and Telegram
// contexts and exposes the vendor settings.
type SubredditContext interface {
reddit.Context
core.MediatorContext
core.StorageContext
tapp.Context
// SubredditConfig returns the subreddit vendor settings.
SubredditConfig() SubredditConfig
}
// SubredditData is the persisted state of a subreddit subscription.
type SubredditData struct {
// Subreddit is the subreddit name.
Subreddit string `json:"subreddit"`
// SentIDs holds IDs of things which have already been submitted.
SentIDs colf.Set[string] `json:"sent_ids,omitempty"`
// LastCleanSecs is the Unix time (seconds) of the last data cleanup.
LastCleanSecs int64 `json:"last_clean,omitempty"`
// Layout controls which parts of a thing are rendered.
Layout ThingLayout `json:"layout,omitempty"`
}
// Subreddit is a feed vendor mixin for subreddits.
// All fields are set by Include.
type Subreddit[C SubredditContext] struct {
config SubredditConfig
clock syncf.Clock
storage StorageInterface
client reddit.Interface
// telegram is used to query chat member counts.
telegram telegram.Client
// writer renders things into HTML writers.
writer thingWriter[C]
// metrics is the registry for the vendor gauges.
metrics me3x.Registry
}
// String returns the vendor identifier.
func (v *Subreddit[C]) String() string {
	const id = "subreddit"
	return id
}
// Include resolves all mixins the vendor depends on through the application —
// reddit storage, mediator, reddit client, Telegram bot, thing writer, command
// listener and Prometheus, in that order — and then fills in the vendor
// fields. The mediator and the command listener are only resolved, not stored.
func (v *Subreddit[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	var store Storage[C]
	if err := app.Use(ctx, &store, false); err != nil {
		return err
	}

	var med core.Mediator[C]
	if err := app.Use(ctx, &med, false); err != nil {
		return err
	}

	var cl reddit.Client[C]
	if err := app.Use(ctx, &cl, false); err != nil {
		return err
	}

	var tg tapp.Mixin[C]
	if err := app.Use(ctx, &tg, false); err != nil {
		return err
	}

	var tw thingWriter[C]
	if err := app.Use(ctx, &tw, false); err != nil {
		return err
	}

	var commands subredditCommandListener[C]
	if err := app.Use(ctx, &commands, false); err != nil {
		return err
	}

	var prom apfel.Prometheus[C]
	if err := app.Use(ctx, &prom, false); err != nil {
		return err
	}

	v.config = app.Config().SubredditConfig()
	v.clock = app
	v.storage = store
	v.client = cl
	v.telegram = tg.Bot()
	v.writer = tw
	v.metrics = prom.Registry().WithPrefix("app_subreddit")

	return nil
}
// BeforeResume subscribes the reddit client to the subreddit identified by
// the subscription ID.
func (v *Subreddit[C]) BeforeResume(ctx context.Context, header feed.Header) error {
	subreddits := []string{header.SubID}
	return v.client.Subscribe(ctx, reddit.Subscribe, subreddits)
}
// Parse builds a subscription draft from a subreddit reference.
//
// It returns (nil, nil) when ref is not a subreddit reference. One listing
// item is fetched to validate the subreddit; when an item is returned, its
// subreddit name replaces the one from ref. Recognized layout options: "t"
// (show text), "!m" (hide media), "u" (show author), "p" (paywall: also hides
// the media link, the link and the title) and "l" (show preference).
func (v *Subreddit[C]) Parse(ctx context.Context, ref string, options []string) (*feed.Draft, error) {
	groups := subredditRegexp.FindStringSubmatch(ref)
	if len(groups) != 5 {
		return nil, nil
	}

	name := groups[4]
	things, err := v.getListing(ctx, name, 1)
	if err != nil {
		return nil, errors.Wrap(err, "get listing")
	}

	if len(things) > 0 {
		name = things[0].Data.Subreddit
	}

	data := &SubredditData{Subreddit: name}
	layout := &data.Layout
	for _, opt := range options {
		switch opt {
		case "t":
			layout.ShowText = true
		case "!m":
			layout.HideMedia = true
		case "u":
			layout.ShowAuthor = true
		case "p":
			layout.ShowPaywall = true
			layout.HideMediaLink = true
			layout.HideLink = true
			layout.HideTitle = true
		case "l":
			layout.ShowPreference = true
		}
	}

	draft := &feed.Draft{
		SubID: name,
		Name:  getSubredditName(name),
		Data:  data,
	}

	return draft, nil
}
// Refresh fetches the hot listing of the subreddit, stores it and submits
// things which have not been sent yet and pass the filters, up to the
// configured batch size.
//
// Listing failures caused by network errors, JSON syntax errors or HTTP
// status codes outside of 4xx are swallowed (nil is returned); other listing
// errors are returned. The percentile threshold and the data cleanup are
// computed lazily: the former when the first unsent thing is seen, the latter
// right before the first submission.
func (v *Subreddit[C]) Refresh(ctx context.Context, header feed.Header, refresh feed.Refresh) error {
	var data SubredditData
	if err := refresh.Init(ctx, &data); err != nil {
		return err
	}

	things, err := v.getListing(ctx, data.Subreddit, 100)
	if err != nil {
		codeErr := new(httpf.StatusCodeError)
		switch {
		case errors.As(err, new(net.Error)):
			return nil
		case errors.As(err, new(*json.SyntaxError)):
			return nil
		case errors.As(err, codeErr) && (codeErr.StatusCode < 400 || codeErr.StatusCode >= 500):
			return nil
		}

		return err
	}

	if err := v.storage.SaveThings(ctx, things); err != nil {
		return errors.Wrap(err, "save things")
	}

	cleanData := syncf.Lazy[any](func(ctx context.Context) (any, error) { return nil, v.cleanData(ctx, &data) })
	percentile := syncf.Lazy[int](func(ctx context.Context) (int, error) { return v.getPercentile(ctx, header, data) })

	submitted := 0
	for _, listed := range things {
		item := listed.Data
		if data.SentIDs[item.ID] {
			continue
		}

		threshold, err := percentile.Get(ctx)
		if err != nil {
			return err
		}

		if item.Ups < threshold {
			continue
		}

		if item.IsSelf && !data.Layout.ShowText {
			continue
		}

		if !item.IsSelf && data.Layout.HideMedia {
			continue
		}

		write := v.writer.writeHTML(ctx, header.FeedID, data.Layout, item)
		if write == nil {
			continue
		}

		if _, err := cleanData.Get(ctx); err != nil {
			return err
		}

		data.SentIDs.Add(item.ID)
		if err := refresh.Submit(ctx, write, data); err != nil {
			return err
		}

		submitted++
		if submitted >= v.config.Pacing.Batch {
			break
		}
	}

	return nil
}
// cleanData prunes the subscription data once per clean interval: within a
// storage transaction it deletes things older than the thing TTL, reduces
// SentIDs to the IDs still present in the storage and records the cleanup
// time. It does nothing when the last cleanup is more recent than the
// interval.
func (v *Subreddit[C]) cleanData(ctx context.Context, data *SubredditData) error {
	now := v.clock.Now()
	lastClean := time.Unix(data.LastCleanSecs, 0)
	if now.Sub(lastClean) < v.config.CleanInterval.Value {
		return nil
	}

	return v.storage.RedditTx(ctx, func(tx StorageTx) error {
		staleBefore := now.Add(-v.config.ThingTTL.Value)
		deleted, err := tx.DeleteStaleThings(staleBefore)
		if err != nil {
			return err
		}

		if deleted > 0 {
			logf.Get(v).Infof(ctx, "deleted %d stale things", deleted)
		}

		fresh, err := tx.GetFreshThingIDs(data.SentIDs)
		if err != nil {
			return err
		}

		data.SentIDs = fresh
		data.LastCleanSecs = now.Unix()
		return nil
	})
}
// getPercentile computes the minimum number of upvotes a thing needs in order
// to be sent to the feed.
//
// The chat member count is fetched (and exported as the "subscribers" gauge).
// Then, within a storage transaction, the top ratio is computed as
// Base * (boost + 1), bounded below by Min, exported as the "top" gauge and
// translated into an upvote threshold via the storage. The boost is non-zero
// only for layouts with preference or paywall enabled, with at least one sent
// thing, once the first scored event is older than the Gain interval; a score
// error is logged and leaves the boost at zero.
func (v *Subreddit[C]) getPercentile(ctx context.Context, header feed.Header, data SubredditData) (int, error) {
	members, err := v.telegram.GetChatMemberCount(ctx, telegram.ID(header.FeedID))
	if err != nil {
		return 0, err
	}

	v.metrics.Gauge("subscribers", me3x.Labels{}.Add("feed_id", header.FeedID)).Set(float64(members))
	pacing := v.config.Pacing

	var percentile int
	// Run the transaction before building the return values: whether a plain
	// variable in a return statement is read before or after a function call
	// in the same statement is not specified by the language.
	err = v.storage.RedditTx(ctx, func(tx StorageTx) error {
		boost := 0.
		if (data.Layout.ShowPreference || data.Layout.ShowPaywall) && len(data.SentIDs) > 0 {
			score, err := tx.Score(header.FeedID, colf.ToSlice[string](data.SentIDs))
			switch {
			case err == nil && score.First != nil && v.clock.Now().Sub(*score.First) >= pacing.Gain.Value:
				thingRatio := (float64(score.LikedThings) - float64(score.DislikedThings)) / float64(len(data.SentIDs))
				// Small chats are treated as having at least pacing.Members members.
				if members < pacing.Members {
					members = pacing.Members
				}

				userRatio := (float64(score.Likes) - float64(score.Dislikes)) / float64(members)
				boost = pacing.Scale * thingRatio * userRatio
			case err != nil:
				logf.Get(v).Warnf(ctx, "using base pacing %.4f due to score error: %v", pacing.Base, err)
			}
		}

		top := pacing.Base * (boost + 1)
		if top < pacing.Min {
			top = pacing.Min
		}

		v.metrics.Gauge("top", header.Labels()).Set(top)

		var err error
		percentile, err = tx.GetPercentile(data.Subreddit, top)
		if err != nil {
			return errors.Wrap(err, "get percentile")
		}

		return nil
	})

	return percentile, err
}
// getListing fetches up to limit things from the "hot" listing of the given
// subreddit, stamps each with the current clock time as LastSeen and returns
// them ordered by thingSorter.
func (v *Subreddit[C]) getListing(ctx context.Context, subreddit string, limit int) ([]reddit.Thing, error) {
	listing, err := v.client.GetListing(ctx, subreddit, "hot", limit)
	if err != nil {
		return nil, err
	}

	seenAt := v.clock.Now()
	for idx := range listing {
		thing := &listing[idx]
		thing.LastSeen = seenAt
	}

	sort.Sort(thingSorter(listing))
	return listing, nil
}
// getSubredditName renders a subreddit name as a hashtag by prefixing it
// with "#".
func getSubredditName(subreddit string) string {
	const prefix = "#"
	return prefix + subreddit
}
package reddit
import (
"context"
"net/http"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/reddit"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/telegram-bot-api"
"github.com/jfk9w-go/telegram-bot-api/ext/html"
"github.com/jfk9w-go/telegram-bot-api/ext/output"
"github.com/jfk9w-go/telegram-bot-api/ext/receiver"
"github.com/jfk9w-go/telegram-bot-api/ext/tapp"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)
// Event type names stored for subreddit-related feed events.
const (
	// like is recorded when a user presses the "like" preference button.
	like = "like"
	// dislike is recorded when a user presses the "dislike" preference button.
	dislike = "dislike"
	// pre is recorded when copying the post to the user was forbidden and
	// a start link was sent instead.
	pre = "pre"
	// click is recorded after the paywall callback completes.
	click = "click"
)
// SubredditEventData is the payload stored with subreddit events.
//
// Besides the JSON names, each field carries "delete", "count" and "pre"
// tags; filter uses these as mapstructure tag names to build the per-query
// filter maps.
// NOTE(review): "-" presumably excludes the field from that filter — confirm
// against mapstructure tag handling.
type SubredditEventData struct {
	UserID    telegram.ID `json:"user_id" delete:"user_id" count:"-" pre:"user_id"`
	MessageID telegram.ID `json:"message_id" delete:"message_id" count:"message_id" pre:"-"`
	Subreddit string      `json:"subreddit" delete:"-" count:"-" pre:"-"`
	ThingID   string      `json:"thing_id" delete:"-" count:"-" pre:"thing_id"`
}
// newSubredditEventData builds event data from a preference command: the
// first argument is the subreddit, the second is the thing ID, and the user
// and message IDs are taken from the command itself.
//
// The caller must ensure cmd.Args has at least two elements.
func newSubredditEventData(cmd *telegram.Command) SubredditEventData {
	var data SubredditEventData
	data.Subreddit = cmd.Args[0]
	data.ThingID = cmd.Args[1]
	data.UserID = cmd.User.ID
	data.MessageID = cmd.Message.ID
	return data
}
// filter decodes d into a map using the struct tag named tagName (one of the
// per-query tags declared on SubredditEventData) as the key source.
//
// Returns the decoded map together with any decoding error; if the decoder
// itself cannot be created, the map is nil.
func (d SubredditEventData) filter(tagName string) (map[string]any, error) {
	result := make(map[string]any)
	decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		TagName: tagName,
		Result:  &result,
	})
	if err != nil {
		return nil, err
	}

	// Decode before reading result: the decoder writes through &result, and
	// Go does not specify whether a variable operand of a return statement
	// is read before or after a call in the same statement.
	err = decoder.Decode(d)
	return result, err
}
// subredditCommandListener handles callback commands attached to subreddit
// posts (paywall and like/dislike buttons).
type subredditCommandListener[C SubredditContext] struct {
	reddit   reddit.Interface  // used to load the post for the paywall callback
	storage  StorageInterface  // event storage
	telegram telegram.Client   // bot client used for replies and message edits
	writer   thingWriter[C]    // renders a thing into HTML
}

// String returns the mixin identifier of the listener.
func (l subredditCommandListener[C]) String() string {
	return "vendors.reddit.subreddit-commands"
}

// CommandScope reports that the listener's commands are public.
func (l *subredditCommandListener[C]) CommandScope() tapp.CommandScope {
	return tapp.Public
}
// Include resolves the listener's dependencies (reddit client, storage,
// telegram mixin and thing writer) from the application.
//
// The listener's fields are only assigned after every dependency has been
// resolved successfully.
func (l *subredditCommandListener[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	var (
		client reddit.Client[C]
		store  Storage[C]
		bot    tapp.Mixin[C]
		writer thingWriter[C]
	)

	if err := app.Use(ctx, &client, false); err != nil {
		return err
	}
	if err := app.Use(ctx, &store, false); err != nil {
		return err
	}
	if err := app.Use(ctx, &bot, false); err != nil {
		return err
	}
	if err := app.Use(ctx, &writer, false); err != nil {
		return err
	}

	l.reddit, l.storage = client, store
	l.telegram = bot.Bot()
	l.writer = writer
	return nil
}
// Sr_c_callback handles the "sr_c" callback button attached to a post.
//
// Expected arguments: the origin chat ID, the subreddit and the thing ID.
// The post is loaded from reddit, rendered with author and text but without
// media, and the original message is copied to the user with that rendering
// as caption.
//
// If the command does not come from the origin chat, the event data is first
// replaced with the last stored "pre" event matching the "pre" filter.
// If copying is forbidden (HTTP 403 from Telegram), a "pre" event is saved
// and a start link is sent instead. On success a "click" event is saved and
// the callback is acknowledged.
func (l *subredditCommandListener[C]) Sr_c_callback(ctx context.Context, _ telegram.Client, cmd *telegram.Command) error {
	if len(cmd.Args) < 3 {
		return errors.Errorf("expected three arguments")
	}

	origin, err := telegram.ParseID(cmd.Args[0])
	if err != nil {
		return err
	}

	feedID := feed.ID(origin)
	data := SubredditEventData{
		Subreddit: cmd.Args[1],
		ThingID:   cmd.Args[2],
		UserID:    cmd.User.ID,
		MessageID: cmd.Message.ID,
	}

	if origin != cmd.Chat.ID {
		// from start
		if err := l.storage.EventTx(ctx, func(tx feed.EventTx) error {
			filter, err := data.filter("pre")
			if err != nil {
				return err
			}

			return tx.GetLastEventData(feedID, pre, filter, &data)
		}); err != nil {
			return err
		}
	}

	things, err := l.reddit.GetPosts(ctx, data.Subreddit, data.ThingID)
	if err != nil {
		return errors.Wrap(err, "get post")
	}

	if len(things) == 0 {
		// err is nil here; wrapping it would yield a nil error and report
		// success, so a fresh error is created instead.
		return errors.New("post not found")
	}

	html, buffer := l.createHTMLWriter(ctx)
	layout := ThingLayout{ShowAuthor: true, ShowText: true, HideMedia: true}
	writeHTML := l.writer.writeHTML(ctx, feedID, layout, things[0].Data)
	if err := writeHTML(html); err != nil {
		return errors.Wrap(err, "write html")
	}

	if err := html.Flush(); err != nil {
		return errors.Wrap(err, "flush html")
	}

	if len(buffer.Pages) == 0 {
		// Guard the Pages[0] access below.
		return errors.New("empty caption")
	}

	ref := telegram.MessageRef{
		ChatID: origin,
		ID:     data.MessageID,
	}

	_, err = l.telegram.CopyMessage(ctx, data.UserID, ref, &telegram.CopyOptions{
		Caption:   buffer.Pages[0],
		ParseMode: telegram.HTML,
	})

	var tgErr telegram.Error
	if errors.As(err, &tgErr) && tgErr.ErrorCode == http.StatusForbidden {
		// Remember the request so it can be restored from the "pre" event
		// when the command arrives from another chat.
		if err := l.storage.SaveEvent(ctx, feedID, pre, data); err != nil {
			return err
		}

		err = errors.Wrap(cmd.Start(ctx, l.telegram), "send start")
	}

	if err != nil {
		return err
	}

	if err := l.storage.SaveEvent(ctx, feedID, click, data); err != nil {
		return err
	}

	// Acknowledging the callback is best-effort.
	_ = cmd.ReplyCallback(ctx, l.telegram, "đŠ")
	return nil
}
// Sr_l_callback handles the "sr_l" callback button: it records a like via
// pref and then acknowledges the callback (the acknowledgement error is
// ignored).
func (l *subredditCommandListener[C]) Sr_l_callback(ctx context.Context, _ telegram.Client, cmd *telegram.Command) error {
	if err := l.pref(ctx, cmd, like); err != nil {
		return err
	}
	_ = cmd.ReplyCallback(ctx, l.telegram, "đ")
	return nil
}
// Src_dl_callback handles the "src_dl" callback button: it records a dislike
// via pref and then acknowledges the callback (the acknowledgement error is
// ignored).
func (l *subredditCommandListener[C]) Src_dl_callback(ctx context.Context, _ telegram.Client, cmd *telegram.Command) error {
	if err := l.pref(ctx, cmd, dislike); err != nil {
		return err
	}
	_ = cmd.ReplyCallback(ctx, l.telegram, "đ")
	return nil
}
// createHTMLWriter returns an HTML writer that emits paged output into an
// in-memory buffer, together with that buffer. The output context is
// configured with telegram.MaxCaptionSize and a count of 1.
func (l *subredditCommandListener[C]) createHTMLWriter(ctx context.Context) (*html.Writer, *receiver.Buffer) {
	sink := receiver.NewBuffer()
	paged := &output.Paged{Receiver: sink}
	captionCtx := output.With(ctx, telegram.MaxCaptionSize, 1)
	writer := (&html.Writer{Out: paged}).WithContext(captionCtx)
	return writer, sink
}
// pref records a like/dislike event of the given type for the command's user
// and refreshes the inline keyboard of the message with updated counters.
//
// Expected arguments: subreddit and thing ID; an optional third argument
// "p" keeps the paywall button in the keyboard. Within one event
// transaction, events matching the "delete" filter are removed, the new
// event is saved and events matching the "count" filter are counted by type.
func (l *subredditCommandListener[C]) pref(ctx context.Context, cmd *telegram.Command, eventType string) error {
	if len(cmd.Args) < 2 {
		return errors.Errorf("expected two arguments")
	}

	feedID := feed.ID(cmd.Chat.ID)
	data := newSubredditEventData(cmd)

	var stats map[string]int64
	record := func(tx feed.EventTx) error {
		deleteFilter, err := data.filter("delete")
		if err != nil {
			return err
		}
		if err := tx.DeleteEvents(feedID, []string{like, dislike}, deleteFilter); err != nil {
			return err
		}
		if err := tx.SaveEvent(feedID, eventType, data); err != nil {
			return err
		}

		countFilter, err := data.filter("count")
		if err != nil {
			return err
		}
		counts, err := tx.CountEventsByType(feedID, []string{like, dislike}, countFilter)
		if err != nil {
			return errors.Wrap(err, "count events")
		}

		stats = counts
		return nil
	}
	if err := l.storage.EventTx(ctx, record); err != nil {
		return err
	}

	paywall := len(cmd.Args) > 2 && cmd.Args[2] == "p"

	var buttons []telegram.Button
	if paywall {
		buttons = append(buttons, PaywallButton(feedID, data.Subreddit, data.ThingID))
	}
	buttons = append(buttons, PreferenceButtons(data.Subreddit, data.ThingID, stats[like], stats[dislike], paywall)...)

	ref := telegram.MessageRef{ChatID: cmd.Chat.ID, ID: cmd.Message.ID}
	// errors.Wrap yields nil for a nil error, so success passes through.
	_, err := l.telegram.EditMessageReplyMarkup(ctx, ref, telegram.InlineKeyboard(buttons))
	return errors.Wrap(err, "edit reply markup")
}
package reddit
import (
"context"
"sort"
"strings"
"time"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/srstats"
"github.com/jfk9w/hikkabot/v4/internal/core"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/logf"
"github.com/jfk9w-go/flu/syncf"
"github.com/jfk9w-go/telegram-bot-api"
"github.com/jfk9w-go/telegram-bot-api/ext/html"
"github.com/jfk9w-go/telegram-bot-api/ext/tapp"
"github.com/pkg/errors"
)
// SubredditSuggestionsConfig configures the subreddit suggestions vendor.
type SubredditSuggestionsConfig struct {
	// Period is how far back events are considered when rating subreddits.
	Period flu.Duration `yaml:"period,omitempty" doc:"Period to consider data for." default:"374h"`
	// Interval is the minimum time between two suggestion posts.
	Interval flu.Duration `yaml:"interval,omitempty" doc:"How often to make suggestions." default:"24h"`
}
// SubredditSuggestionsContext is the application context required by
// SubredditSuggestions: telegram, storage and interface contexts plus access
// to the suggestions configuration.
type SubredditSuggestionsContext interface {
	tapp.Context
	core.StorageContext
	core.InterfaceContext
	// SubredditSuggestionsConfig returns the suggestions vendor settings.
	SubredditSuggestionsConfig() SubredditSuggestionsConfig
}
// SubredditSuggestionsData is the persisted state of a suggestions
// subscription.
type SubredditSuggestionsData struct {
	// Ref is the chat reference (alias or ID) the subscription was created with.
	Ref string `json:"ref"`
	// FeedID is the chat whose events are analysed.
	FeedID feed.ID `json:"chat_id"`
	// FiredAtSecs is the Unix time (seconds) of the last suggestions post.
	FiredAtSecs int64 `json:"fired_at"`
	// Options are appended to the arguments of the generated "sub" commands.
	Options []string `json:"options"`
}
// SubredditSuggestions is a feed vendor that periodically posts subreddit
// suggestions based on events recorded for a chat.
type SubredditSuggestions[C SubredditSuggestionsContext] struct {
	clock    syncf.Clock
	config   SubredditSuggestionsConfig
	telegram telegram.Client
	storage  StorageInterface
	client   srstats.Client[C]
	aliases  map[string]telegram.ID // chat aliases accepted in refs
}

// String returns the vendor identifier, which is also the ref prefix
// recognised by Parse.
func (v SubredditSuggestions[C]) String() string {
	return "srstats"
}
// Include resolves storage, the telegram mixin and the srstats client from
// the application and copies the relevant configuration (suggestions config
// and chat aliases). The application itself is used as the clock.
//
// Fields are only assigned once all dependencies have been resolved.
func (v *SubredditSuggestions[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	var (
		store  Storage[C]
		mixin  tapp.Mixin[C]
		stats  srstats.Client[C]
	)

	if err := app.Use(ctx, &store, false); err != nil {
		return err
	}
	if err := app.Use(ctx, &mixin, false); err != nil {
		return err
	}
	if err := app.Use(ctx, &stats, false); err != nil {
		return err
	}

	v.clock = app
	v.config = app.Config().SubredditSuggestionsConfig()
	v.telegram = mixin.Bot()
	v.storage = store
	v.client = stats
	v.aliases = app.Config().InterfaceConfig().Aliases
	return nil
}
// Parse recognises refs of the form "<vendor>/<chat>", where <vendor> is
// v.String() and <chat> is either a configured alias or a numeric chat ID.
//
// It returns (nil, nil) when ref does not carry the vendor prefix. Otherwise
// the chat is looked up via Telegram and a draft is returned whose name is
// the chat title.
func (v *SubredditSuggestions[C]) Parse(ctx context.Context, ref string, options []string) (*feed.Draft, error) {
	prefix := v.String() + "/"
	if !strings.HasPrefix(ref, prefix) {
		return nil, nil
	}
	ref = strings.TrimPrefix(ref, prefix)

	// Aliases take precedence over numeric IDs.
	chatID, known := v.aliases[ref]
	if !known {
		parsed, err := telegram.ParseID(ref)
		if err != nil {
			return nil, errors.Wrapf(err, "parse chat id: %s", ref)
		}
		chatID = parsed
	}

	chat, err := v.telegram.GetChat(ctx, chatID)
	if err != nil {
		return nil, errors.Wrapf(err, "get chat %s", chatID)
	}

	draft := &feed.Draft{
		SubID: chatID.String(),
		Name:  chat.Title,
		Data: SubredditSuggestionsData{
			Ref:     ref,
			FeedID:  feed.ID(chatID),
			Options: options,
		},
	}
	return draft, nil
}
// Refresh posts a new batch of suggestions when at least config.Interval has
// passed since the previous one.
//
// Events of the last config.Period are counted per subreddit (likes and
// clicks weigh +1, dislikes -1), turned into suggestions and submitted
// together with the updated FiredAtSecs.
func (v *SubredditSuggestions[C]) Refresh(ctx context.Context, header feed.Header, refresh feed.Refresh) error {
	var data SubredditSuggestionsData
	if err := refresh.Init(ctx, &data); err != nil {
		return err
	}

	now := v.clock.Now()
	nextFire := time.Unix(data.FiredAtSecs, 0).Add(v.config.Interval.Value)
	if nextFire.After(now) {
		return nil
	}

	weights := map[string]float64{
		"like":    1,
		"click":   1,
		"dislike": -1,
	}
	since := v.clock.Now().Add(-v.config.Period.Value)
	stats, err := v.storage.CountEventsBy(ctx, data.FeedID, since, "subreddit", weights)
	if err != nil {
		return err
	}

	ratings := make(map[string]float64, len(stats))
	for name, rating := range stats {
		ratings[name] = float64(rating)
	}

	found, err := v.getSuggestions(ctx, ratings)
	if err != nil {
		return err
	}

	data.FiredAtSecs = now.Unix()
	return refresh.Submit(ctx, v.writeHTML(data, found), data)
}
// writeHTML returns a writer that renders the suggestions list for the chat
// described by data.
//
// Subreddits are written in the given order, at most 10 of them. Each entry
// links to the subreddit, shows its score as a percentage and carries two
// start links for the "sub" command (the second one adds feed.Deadborn to
// the arguments).
func (v *SubredditSuggestions[C]) writeHTML(data SubredditSuggestionsData, suggestions suggestions) feed.WriteHTML {
	return func(html *html.Writer) error {
		html.Bold("suggestions").Text(" @ %s â", data.Ref)
		var i int
		for _, suggestion := range suggestions {
			sr := suggestion.subreddit
			score := suggestion.score
			header := feed.Header{
				SubID:  sr,
				Vendor: "subreddit",
				FeedID: data.FeedID,
			}
			// Only keep subreddits for which the lookup reports
			// feed.ErrNotFound; any other outcome (including other errors)
			// skips the entry.
			if _, err := v.storage.GetSubscription(context.Background(), header); !errors.Is(err, feed.ErrNotFound) {
				continue
			}
			html.Text("\n").
				Link(sr, "https://www.reddit.com/r/"+sr).
				Text(" â %.3f%% ", score*100).
				Link("đĨ", (&telegram.Command{
					Key:  "sub",
					Args: append([]string{"/r/" + sr, data.Ref}, data.Options...)}).
					Button("").
					StartCallbackURL(string(v.telegram.Username()))).
				Text(" ").
				Link("đ", (&telegram.Command{
					Key:  "sub",
					Args: append([]string{"/r/" + sr, data.Ref, feed.Deadborn}, data.Options...)}).
					Button("").
					StartCallbackURL(string(v.telegram.Username())))
			// Stop after ten written entries.
			if i++; i >= 10 {
				break
			}
		}
		return nil
	}
}
// getSuggestions scores candidate subreddits from the histograms of the
// given rated subreddits.
//
// subreddits maps a subreddit to its rating, used as a multiplier. For each
// of them the normalized histogram is fetched; every entry contributes
// multiplier*score/globalScore to the candidate's total, where globalScore
// comes from the normalized global histogram. The totals are normalized and
// returned sorted by descending score.
//
// Failing to fetch a single histogram is logged and skipped; failing to
// fetch the global histogram or the subreddit names is an error.
func (v *SubredditSuggestions[C]) getSuggestions(ctx context.Context, subreddits map[string]float64) (suggestions, error) {
	global, err := v.client.GetGlobalHistogram(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "get global histogram")
	}
	normalize(global)
	m := make(map[string]float64)
	for subreddit, multiplier := range subreddits {
		histogram, err := v.client.GetHistogram(ctx, subreddit)
		if err != nil {
			logf.Get(v).Warnf(ctx, "failed to get histogram for [%s]: %v", subreddit, err)
			continue
		}
		normalize(histogram)
		for subreddit, score := range histogram {
			globalScore := global[subreddit]
			// Ignore entries with a negligible (or missing) global share.
			if globalScore < 0.0001 {
				continue
			}
			m[subreddit] += multiplier * score / globalScore
		}
	}
	normalize(m)
	suggestions := make(suggestions, len(m))
	ids := make([]string, len(m))
	i := 0
	for subreddit, score := range m {
		suggestions[i] = suggestion{subreddit: subreddit, score: score}
		ids[i] = subreddit
		i++
	}
	names, err := v.client.GetSubredditNames(ctx, ids)
	if err != nil {
		return nil, errors.Wrap(err, "get subreddit names")
	}
	// Names are applied positionally.
	// NOTE(review): this assumes names[i] corresponds to ids[i] — confirm
	// against the srstats client.
	for i := range suggestions {
		if len(names) <= i {
			// Fewer names than IDs: drop the suggestions without a name.
			suggestions = suggestions[:len(names)]
			break
		}
		suggestions[i].subreddit = names[i]
	}
	sort.Sort(suggestions)
	return suggestions, nil
}
// suggestion is a candidate subreddit together with its score.
type suggestion struct {
	subreddit string
	score     float64
}

// suggestions implements sort.Interface, ordering by descending score.
type suggestions []suggestion

// Len returns the number of suggestions.
func (s suggestions) Len() int {
	return len(s)
}

// Less reports whether element i has a strictly higher score than element j.
func (s suggestions) Less(i, j int) bool {
	left, right := s[i].score, s[j].score
	return right < left
}

// Swap exchanges elements i and j.
func (s suggestions) Swap(i, j int) {
	held := s[i]
	s[i] = s[j]
	s[j] = held
}
// normalize scales the histogram values in place so that they sum to 1.
// The histogram is left untouched when the sum of its values is not
// positive.
func normalize(histogram map[string]float64) {
	var total float64
	for _, weight := range histogram {
		total += weight
	}

	if !(total > 0) {
		return
	}

	for key := range histogram {
		histogram[key] = histogram[key] / total
	}
}
package reddit
import (
"fmt"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/reddit"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w/hikkabot/v4/internal/util"
"github.com/jfk9w-go/telegram-bot-api"
"github.com/jfk9w-go/telegram-bot-api/ext/html"
"github.com/jfk9w-go/telegram-bot-api/ext/output"
"github.com/jfk9w-go/telegram-bot-api/ext/receiver"
)
// ThingLayout controls which parts of a reddit thing are rendered by
// WriteHTML. The zero value shows subreddit, link, title and media (with
// media link) and hides author, text, paywall and preference buttons.
type ThingLayout struct {
	HideSubreddit  bool `json:"hide_subreddit,omitempty"`
	HideLink       bool `json:"hide_link,omitempty"`
	ShowAuthor     bool `json:"show_author,omitempty"`
	HideTitle      bool `json:"hide_title,omitempty"`
	ShowText       bool `json:"show_text,omitempty"`
	HideMedia      bool `json:"hide_media,omitempty"`
	HideMediaLink  bool `json:"hide_media_link,omitempty"`
	ShowPaywall    bool `json:"show_paywall,omitempty"`
	ShowPreference bool `json:"show_preference,omitempty"`
}
// WriteHTML returns a writer that renders thing according to the layout.
//
// feedID is only used for the paywall button; mediaRef is attached when
// media is not hidden. The writer's context is extended with the caption
// size limit (paywall), skip-on-media-error (media without text) and the
// inline keyboard (paywall and/or preference buttons) before anything is
// written.
func (l *ThingLayout) WriteHTML(feedID feed.ID, thing reddit.ThingData, mediaRef receiver.MediaRef) feed.WriteHTML {
	return func(html *html.Writer) error {
		var buttons []telegram.Button
		ctx := html.Context()
		if l.ShowPaywall {
			buttons = []telegram.Button{PaywallButton(feedID, thing.Subreddit, thing.ID)}
			ctx = output.With(ctx, telegram.MaxCaptionSize, 1)
		}
		if !l.ShowText && !l.HideMedia {
			ctx = receiver.SkipOnMediaError(ctx)
		}
		if l.ShowPreference {
			// Counters start at zero for a freshly posted thing.
			buttons = append(buttons,
				PreferenceButtons(thing.Subreddit, thing.ID, 0, 0, l.ShowPaywall)...)
		}
		if len(buttons) > 0 {
			ctx = receiver.ReplyMarkup(ctx, telegram.InlineKeyboard(buttons))
		}
		html = html.WithContext(ctx)
		if !l.HideSubreddit {
			html.Text(getSubredditName(thing.Subreddit))
		}
		if !l.HideLink {
			html.Text(" ").Link("đŦ", thing.PermalinkURL())
		}
		if l.ShowAuthor {
			html.Text("\n").Text(`u/`).Text(util.Hashtag(thing.Author))
		}
		if !l.HideTitle {
			html.Text("\n")
			// Self posts get a bold title.
			if thing.IsSelf {
				html.Bold(thing.Title)
			} else {
				html.Text(thing.Title)
			}
		}
		if l.ShowText {
			html.Text("\n").MarkupString(thing.SelfTextHTML)
		}
		if !l.HideMedia {
			html.Media(thing.URL.String, mediaRef, true, !l.HideMediaLink)
		}
		return nil
	}
}
// PaywallButton builds the inline button that triggers the "sr_c" command
// with the feed ID, subreddit and thing ID as arguments.
func PaywallButton(feedID feed.ID, subreddit, thingID string) telegram.Button {
	return (&telegram.Command{
		Key:  "sr_c",
		Args: []string{feedID.String(), subreddit, thingID},
	}).Button("âšī¸")
}
// PreferenceButtons builds the like ("sr_l") and dislike ("src_dl") inline
// buttons labelled with the current counters.
//
// Both commands receive the subreddit and thing ID as arguments; when
// paywall is set, a trailing "p" argument is added.
func PreferenceButtons(subreddit, thingID string, likes, dislikes int64, paywall bool) []telegram.Button {
	args := []string{subreddit, thingID}
	if paywall {
		args = append(args, "p")
	}
	return []telegram.Button{
		(&telegram.Command{
			Key:  "sr_l",
			Args: args,
		}).Button(fmt.Sprintf("đ %d", likes)),
		(&telegram.Command{
			Key:  "src_dl",
			Args: args,
		}).Button(fmt.Sprintf("đ %d", dislikes)),
	}
}
package reddit
import "github.com/jfk9w/hikkabot/v4/internal/3rdparty/reddit"
// thingSorter implements sort.Interface, ordering things by ascending
// Data.NumID.
type thingSorter []reddit.Thing

// Len returns the number of things.
func (ts thingSorter) Len() int {
	return len(ts)
}

// Less reports whether thing i has a smaller numeric ID than thing j.
func (ts thingSorter) Less(i, j int) bool {
	left, right := ts[i].Data.NumID, ts[j].Data.NumID
	return left < right
}

// Swap exchanges things i and j.
func (ts thingSorter) Swap(i, j int) {
	held := ts[i]
	ts[i] = ts[j]
	ts[j] = held
}
package reddit
import (
"context"
"github.com/jfk9w/hikkabot/v4/internal/3rdparty/reddit"
"github.com/jfk9w/hikkabot/v4/internal/core"
"github.com/jfk9w/hikkabot/v4/internal/feed"
"github.com/jfk9w-go/flu/apfel"
"github.com/jfk9w-go/flu/syncf"
"github.com/jfk9w-go/telegram-bot-api/ext/receiver"
"github.com/pkg/errors"
)
// thingWriter renders reddit things to HTML, resolving their media through
// a mediator.
type thingWriter[C core.MediatorContext] struct {
	mediator feed.Mediator
}

// String returns the mixin identifier of the writer.
func (w thingWriter[C]) String() string {
	return "vendors.reddit.thing-writer"
}
// Include resolves the mediator from the application and stores it on the
// writer.
func (w *thingWriter[C]) Include(ctx context.Context, app apfel.MixinApp[C]) error {
	var resolved core.Mediator[C]
	err := app.Use(ctx, &resolved, false)
	if err != nil {
		return err
	}

	w.mediator = resolved
	return nil
}
// writeHTML returns a writer for thing using the given layout.
//
// Media is resolved only for non-self posts whose layout does not hide
// media. The feed ID is passed as deduplication key unless the layout shows
// the post text.
func (w *thingWriter[C]) writeHTML(ctx context.Context, feedID feed.ID, layout ThingLayout, thing reddit.ThingData) feed.WriteHTML {
	var ref receiver.MediaRef
	if wantMedia := !thing.IsSelf && !layout.HideMedia; wantMedia {
		var key *feed.ID
		if !layout.ShowText {
			key = &feedID
		}

		ref = w.mediaRef(ctx, thing, key)
	}

	return layout.WriteHTML(feedID, thing, ref)
}
// mediaRef resolves the media reference for thing via the mediator.
//
// For "v.redd.it" posts the fallback URL of the thing's media container is
// used, falling back to the first crosspost parent that has one; if none is
// found, a reference carrying an error is returned. All other domains use
// the thing's URL directly.
func (w *thingWriter[C]) mediaRef(ctx context.Context, thing reddit.ThingData, dedupKey *feed.ID) receiver.MediaRef {
	if thing.Domain != "v.redd.it" {
		return w.mediator.Mediate(ctx, thing.URL.String, dedupKey)
	}

	videoURL := thing.MediaContainer.FallbackURL()
	if videoURL == "" {
		for _, parent := range thing.CrosspostParentList {
			if videoURL = parent.FallbackURL(); videoURL != "" {
				break
			}
		}
	}

	if videoURL == "" {
		return syncf.Val[*receiver.Media]{E: errors.New("unable to find url")}
	}

	return w.mediator.Mediate(ctx, videoURL, dedupKey)
}
package vendors
import (
"github.com/jfk9w/hikkabot/v4/internal/ext/vendors/dvach"
"github.com/jfk9w/hikkabot/v4/internal/ext/vendors/reddit"
)
// Configuration types re-exported from the reddit vendor package.
type (
	// SubredditConfig is an alias for reddit.SubredditConfig.
	SubredditConfig = reddit.SubredditConfig
	// SubredditSuggestionsConfig is an alias for reddit.SubredditSuggestionsConfig.
	SubredditSuggestionsConfig = reddit.SubredditSuggestionsConfig
)
// DvachCatalog returns a pointer to a zero-valued dvach.Catalog vendor.
func DvachCatalog[C dvach.Context]() *dvach.Catalog[C] {
	var catalog dvach.Catalog[C]
	return &catalog
}
// DvachThread returns a pointer to a zero-valued dvach.Thread vendor.
func DvachThread[C dvach.Context]() *dvach.Thread[C] {
	var thread dvach.Thread[C]
	return &thread
}
// Subreddit returns a pointer to a zero-valued reddit.Subreddit vendor.
func Subreddit[C reddit.SubredditContext]() *reddit.Subreddit[C] {
	var vendor reddit.Subreddit[C]
	return &vendor
}
// SubredditSuggestions returns a pointer to a zero-valued
// reddit.SubredditSuggestions vendor.
func SubredditSuggestions[C reddit.SubredditSuggestionsContext]() *reddit.SubredditSuggestions[C] {
	var vendor reddit.SubredditSuggestions[C]
	return &vendor
}
package media
import (
"context"
"mime"
"net/http"
"strconv"
"github.com/jfk9w-go/flu"
"github.com/jfk9w-go/flu/httpf"
"github.com/pkg/errors"
)
// HTTPRef references media available at an HTTP URL.
type HTTPRef struct {
	// Client performs the requests. When nil, Get returns the URL itself
	// as input instead of issuing a request through exchange.
	Client httpf.Client
	// URL is the media location.
	URL string
	// Meta, when set, is returned by GetMeta without issuing a request.
	Meta *Meta
	// Buffer is not read by the methods of this type visible here.
	Buffer bool
}
// GetMeta returns the media metadata.
//
// If r.Meta is set it is returned as is. Otherwise a HEAD request is made:
// the MIME type is parsed from Content-Type (which must be present and
// valid) and the size from Content-Length, with -1 standing for a missing
// or unparsable length. A non-nil *Meta is returned even when the request
// fails.
func (r HTTPRef) GetMeta(ctx context.Context) (*Meta, error) {
	if preset := r.Meta; preset != nil {
		return preset, nil
	}

	meta := new(Meta)
	readHeaders := func(resp *http.Response) error {
		contentType := resp.Header.Get("Content-Type")
		if contentType == "" {
			return errors.New("content type is empty")
		}

		mimeType, _, err := mime.ParseMediaType(contentType)
		meta.MIMEType = mimeType
		if err != nil {
			return errors.Wrapf(err, "invalid content type: %s", contentType)
		}

		length, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
		if err == nil {
			meta.Size = Size(length)
		} else {
			meta.Size = -1
		}

		return nil
	}

	err := r.exchange(ctx, http.MethodHead).HandleFunc(readHeaders).Error()
	return meta, err
}
// Get returns an input for the media contents: the plain URL when no client
// is configured, otherwise the result of a GET exchange through the client.
func (r HTTPRef) Get(ctx context.Context) (flu.Input, error) {
	if r.Client == nil {
		return flu.URL(r.URL), nil
	}

	return r.exchange(ctx, http.MethodGet), nil
}
// exchange performs a request with the given method against r.URL using
// r.Client and requires a 200 OK status.
func (r HTTPRef) exchange(ctx context.Context, method string) *httpf.ExchangeResult {
	request := httpf.Request(r.URL).Method(method)
	result := request.Exchange(ctx, r.Client)
	return result.CheckStatus(http.StatusOK)
}
// LocalRef references media that is already available as an input together
// with its metadata.
type LocalRef struct {
	Input flu.Input
	Meta  *Meta
}

// GetMeta returns the stored metadata; it never fails.
func (r LocalRef) GetMeta(ctx context.Context) (*Meta, error) {
	return r.Meta, nil
}

// Get returns the stored input; it never fails.
func (r LocalRef) Get(ctx context.Context) (flu.Input, error) {
	return r.Input, nil
}
// LazyRef wraps another Ref and overrides its metadata with a fixed value;
// the remaining methods are promoted from the embedded Ref.
type LazyRef struct {
	Ref
	Meta *Meta
}

// GetMeta returns the stored metadata without consulting the embedded Ref.
func (r LazyRef) GetMeta(ctx context.Context) (*Meta, error) {
	return r.Meta, nil
}
package media
import (
"fmt"
"regexp"
"strconv"
"strings"
"github.com/pkg/errors"
"gopkg.in/yaml.v3"
)
// sizeRegexp matches a decimal amount optionally followed by a single unit
// letter (k, m, g or t). It is applied to lower-cased input.
var sizeRegexp = regexp.MustCompile(`^(\d+)([kmgt])?$`)

// Size units as powers of 1024.
const (
	b  Size = 1
	kb      = 1 << 10
	mb      = 1 << 20
	gb      = 1 << 30
	tb      = 1 << 40
)

// Size is an amount of bytes.
type Size int64
// UnmarshalYAML parses a size written as a decimal amount with an optional
// case-insensitive unit suffix: k, m, g or t. Without a suffix the amount is
// taken as bytes.
//
// Returns an error if the value does not match sizeRegexp or the amount
// does not fit into int64.
func (s *Size) UnmarshalYAML(node *yaml.Node) error {
	groups := sizeRegexp.FindStringSubmatch(strings.ToLower(node.Value))
	if len(groups) < 2 {
		return errors.Errorf(`expected expression matching %s`, sizeRegexp.String())
	}

	count, err := strconv.ParseInt(groups[1], 10, 64)
	if err != nil {
		return err
	}

	multiplier := b
	if len(groups) == 3 {
		switch suffix := groups[2]; suffix {
		case "t":
			multiplier = tb
		case "g":
			multiplier = gb
		case "m":
			multiplier = mb
		case "k":
			multiplier = kb
		}
	}

	*s = multiplier * Size(count)
	return nil
}
// MarshalYAML renders the size as a plain decimal number of bytes (no unit
// suffix).
func (s Size) MarshalYAML() (any, error) {
	raw := int64(s)
	return strconv.FormatInt(raw, 10), nil
}
// String formats the size using the largest unit (T, G, M, K) that does not
// exceed it, truncating the remainder; sizes below 1K are printed as a
// plain number.
func (s Size) String() string {
	size := int64(s)
	if size >= tb {
		return fmt.Sprintf("%dT", size/tb)
	}
	if size >= gb {
		return fmt.Sprintf("%dG", size/gb)
	}
	if size >= mb {
		return fmt.Sprintf("%dM", size/mb)
	}
	if size >= kb {
		return fmt.Sprintf("%dK", size/kb)
	}
	return fmt.Sprintf("%d", size)
}
package feed
import (
"context"
"fmt"
"strconv"
"time"
"github.com/jfk9w-go/flu/gormf"
"github.com/jfk9w-go/flu/me3x"
"github.com/jfk9w-go/telegram-bot-api/ext/html"
"gopkg.in/guregu/null.v3"
)
// ID identifies a feed (a chat).
type ID int64

// String returns the decimal representation of the ID.
func (id ID) String() string {
	const base = 10
	value := int64(id)
	return strconv.FormatInt(value, base)
}
// Header identifies a subscription: the vendor-specific subscription ID,
// the vendor name and the feed it belongs to. All three fields form the
// primary key.
type Header struct {
	SubID  string `gorm:"primaryKey;column:sub_id"`
	Vendor string `gorm:"primaryKey"`
	FeedID ID     `gorm:"primaryKey"`
}
// Labels returns the header as metric labels: "sub_id", "vendor" and
// "feed_id", in that order.
func (h Header) Labels() me3x.Labels {
	withSubID := make(me3x.Labels, 0, 3).Add("sub_id", h.SubID)
	withVendor := withSubID.Add("vendor", h.Vendor)
	return withVendor.Add("feed_id", h.FeedID)
}
// String renders the header as "<feed id>.<vendor>.<sub id>".
func (h Header) String() string {
	feedID, vendor, subID := h.FeedID, h.Vendor, h.SubID
	return fmt.Sprintf("%d.%s.%s", feedID, vendor, subID)
}
// Subscription is a stored feed subscription: its header, display name,
// vendor-specific JSON data, last update time and last error.
type Subscription struct {
	Header    `gorm:"embedded"`
	Name      string `gorm:"not null"`
	Data      gormf.JSONB
	UpdatedAt *time.Time
	Error     null.String
}

// TableName returns the name of the table subscriptions are stored in.
func (s *Subscription) TableName() string {
	return "feed"
}
// Draft describes a subscription produced by a vendor's Parse step:
// the vendor-specific subscription ID, a display name and the initial
// vendor data.
type Draft struct {
	SubID string
	Name  string
	Data  any
}
// Event is a stored feed event with its time, type, feed and JSON payload.
// The feed ID is stored in the "chat_id" column.
type Event struct {
	Time   time.Time `gorm:"not null;index"`
	Type   string    `gorm:"not null;index:idx_event"`
	FeedID ID        `gorm:"not null;index:idx_event;column:chat_id"`
	Data   gormf.JSONB
}

// TableName returns the name of the table events are stored in.
func (e *Event) TableName() string {
	return "event"
}
// WriteHTML renders content into the given HTML writer.
type WriteHTML func(html *html.Writer) error

// Task is a unit of work executed with a context.
type Task func(context.Context) error
// MediaHash is a stored media hash record, keyed by feed, hash type and
// hash value.
// NOTE(review): presumably used for media deduplication — confirm against
// the code that reads and writes this table.
type MediaHash struct {
	FeedID     ID        `gorm:"primaryKey"`
	URL        string    `gorm:"not null"`
	Type       string    `gorm:"primaryKey;column:hash_type"`
	Value      string    `gorm:"primaryKey;column:hash"`
	FirstSeen  time.Time `gorm:"not null"`
	LastSeen   time.Time `gorm:"not null"`
	Collisions int64     `gorm:"not null"`
}

// TableName returns the name of the table media hashes are stored in.
func (h *MediaHash) TableName() string {
	return "blob"
}
package util
import (
"html"
"regexp"
"strings"
"golang.org/x/exp/utf8string"
"golang.org/x/text/cases"
"golang.org/x/text/language"
)
var (
	// tagRegexp matches markup tags: the shortest "<...>" spans.
	tagRegexp = regexp.MustCompile(`<.*?>`)
	// junkRegexp matches, case-insensitively, every character that is not
	// allowed in a hashtag.
	junkRegexp = regexp.MustCompile(`(?i)[^\wа-ŅŅ_]`)
)
// Hashtag converts an arbitrary string into a hashtag.
//
// HTML entities are unescaped and markup tags removed; the string is split
// on whitespace, each field is stripped of characters matched by junkRegexp
// and title-cased, and the fields are joined without separators. The tag
// body is limited to 25 runes and prefixed with "#".
func Hashtag(str string) string {
	str = html.UnescapeString(str)
	str = tagRegexp.ReplaceAllString(str, "")
	fields := strings.Fields(str)

	// The caser does not depend on the field, so build it once instead of
	// once per field.
	title := cases.Title(language.Russian)
	for i, field := range fields {
		fields[i] = title.String(junkRegexp.ReplaceAllString(field, ""))
	}

	str = strings.Join(fields, "")
	// utf8string allows slicing by rune index rather than byte offset.
	tag := utf8string.NewString(str)
	if tag.RuneCount() > 25 {
		return "#" + tag.Slice(0, 25)
	}

	return "#" + tag.String()
}
package util
import (
"encoding/json"
"regexp"
"github.com/pkg/errors"
)
// Regexp wraps *regexp.Regexp with JSON (un)marshalling. A nil inner
// regexp matches every string and marshals as an empty string.
type Regexp struct {
	*regexp.Regexp
}
// MatchString reports whether str matches the regexp. An unset (nil)
// regexp matches everything.
func (re Regexp) MatchString(str string) bool {
	return re.Regexp == nil || re.Regexp.MatchString(str)
}
// MarshalJSON encodes the regexp as a JSON string holding its source
// pattern, as returned by String.
func (re Regexp) MarshalJSON() ([]byte, error) {
	pattern := re.String()
	return json.Marshal(pattern)
}
// UnmarshalJSON decodes a JSON string and compiles it as a regexp.
// An empty string leaves the receiver unchanged.
//
// Returns an error if data is not a JSON string or the pattern does not
// compile.
func (re *Regexp) UnmarshalJSON(data []byte) error {
	var pattern string
	if err := json.Unmarshal(data, &pattern); err != nil {
		return errors.Wrap(err, "unmarshal")
	}

	if pattern == "" {
		return nil
	}

	compiled, err := regexp.Compile(pattern)
	if err != nil {
		return errors.Wrap(err, "compile regexp")
	}

	re.Regexp = compiled
	return nil
}
// String returns the source pattern, or an empty string when no regexp is
// set.
func (re Regexp) String() string {
	if inner := re.Regexp; inner != nil {
		return inner.String()
	}

	return ""
}