feat 支持蹲库存和历史库存

This commit is contained in:
timerzz 2024-12-02 19:07:38 +08:00
parent 3b321f099d
commit 66959d8a58
9 changed files with 400 additions and 27 deletions

171
ats-tracer/controller.go Normal file
View File

@ -0,0 +1,171 @@
package ats_tracer
import (
"context"
"fmt"
"sync"
"time"
"gitea.timerzz.com/kedaya_haitao/coach-spider/pkg/options"
coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client"
"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
"gitea.timerzz.com/kedaya_haitao/pusher/kitex_gen/push"
"gitea.timerzz.com/kedaya_haitao/pusher/rpc/pusher"
"github.com/golang/glog"
"gorm.io/gorm"
)
type Controller struct {
ctx context.Context
m sync.RWMutex
// 要追踪的ProviderArticle
tracers *Tracers
storage *storage.Storage
client *coach_client.US
providerId v2.ProviderId
interval time.Duration
threshold int
}
func NewController(ctx context.Context, cfg *options.Config, client *coach_client.US, db *gorm.DB) *Controller {
return &Controller{
ctx: ctx,
providerId: cfg.ProviderId,
interval: cfg.AtsInterval,
client: client,
storage: storage.NewStorage(db),
tracers: NewTracers(storage.NewStorage(db), cfg.ProviderId),
}
}
func (c *Controller) Run() (err error) {
// 加载要追踪的ProviderArticle
if err = c.tracers.Load(); err != nil {
return
}
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
glog.Infof("tracer退出")
return
case <-ticker.C:
c.traceRange()
}
}
}
// 返回ready
func (c *Controller) Ready() bool {
return c.tracers.Ready()
}
// Add 添加一个要追踪库存的ProviderArticle
func (c *Controller) Add(skuID string) error {
article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID))
if err != nil {
return fmt.Errorf("获取商品信息失败: %v", err)
}
article.SetTraceAts(true)
article.Available = false
if err = c.storage.ProviderArticle().Update(article, "trace_ats", "available"); err != nil {
return fmt.Errorf("更新数据库失败:%v", err)
}
c.tracers.Add(&article)
return nil
}
func (c *Controller) Delete(skuID string) error {
article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID))
if err != nil {
return fmt.Errorf("获取商品信息失败: %v", err)
}
article.TraceAts = nil
if err = c.storage.ProviderArticle().Update(article, "trace_ats"); err != nil {
return fmt.Errorf("更新数据库失败:%v", err)
}
c.tracers.Remove(skuID)
return nil
}
func (c *Controller) Stop(skuID string) error {
article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID))
if err != nil {
return fmt.Errorf("获取商品信息失败: %v", err)
}
article.SetTraceAts(false)
if err = c.storage.ProviderArticle().Update(article, "trace_ats"); err != nil {
return fmt.Errorf("更新数据库失败:%v", err)
}
c.tracers.Remove(skuID)
return nil
}
func (c *Controller) traceRange() {
c.tracers.Range(func(tracer *Tracer) bool {
return !tracer.tracing
}, func(tracer *Tracer) {
go func() {
tracer.tracing = true
defer func() {
tracer.tracing = false
}()
if c.doTrace(tracer) {
// 如果蹲到了,需要通知
//resp, err := pusher.Push(c.ctx, &push.PushReq{
// Title: "coach 断货",
// Content: fmt.Sprintf("coach 商品 %s 断货了\n库存为0\n链接%s", tracer.pArticle.SkuID, tracer.pArticle.Link),
//})
//if err != nil {
// glog.Errorf("消息推送失败:%v", err)
//}
//if resp.Code != 0 {
// glog.Errorf("消息推送失败:%s", resp.Msg)
//}
tracer.pArticle.SetTraceAts(false)
_ = c.storage.ProviderArticle().Update(*tracer.pArticle, "trace_ats")
c.tracers.Remove(tracer.pArticle.SkuID)
}
}()
})
}
func (c *Controller) doTrace(tracer *Tracer) (available bool) {
article := tracer.pArticle
inventory, err := c.client.RequestInventory(c.ctx, article.SkuID)
if err != nil {
glog.Warningf("获取coach %s 库存失败:%v", article.SkuID, err)
return
}
article.Available = inventory.Orderable && inventory.Ats > 0
article.Ats = inventory.Ats
if article.Ats == 0 {
article.SetTraceAts(false)
}
if err = c.storage.ProviderArticle().Update(*article, "available", "ats", "updated_at"); err != nil {
glog.Errorf("更新数据库失败:%v", err)
article.SetTraceAts(true)
return
}
if article.Ats != tracer.lastAts {
if value := tracer.lastAts - article.Ats; value >= c.threshold {
_, _ = pusher.Push(c.ctx, &push.PushReq{
Title: "coach 商品库存减少",
Content: fmt.Sprintf("coach 商品 %s %s 减少了 %d \n链接%s", tracer.pArticle.SkuID, c.interval, value, tracer.pArticle.Link),
})
}
c.storage.DB().Create(&v2.ProviderAts{
ProviderArticleID: article.ID,
Ats: article.Ats,
})
}
tracer.lastAts, tracer.lastTraceTime = article.Ats, time.Now()
return article.Ats <= 0
}

102
ats-tracer/tracer.go Normal file
View File

@ -0,0 +1,102 @@
package ats_tracer
import (
"slices"
"sync"
"time"
"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
"github.com/golang/glog"
)
type Tracer struct {
pArticle *v2.ProviderArticle
tracing bool //是不是正在抓
lastTraceTime time.Time //上次是什么时候抓的
lastAts int // 上次抓取的库存
}
type Tracers struct {
m sync.RWMutex
list []*Tracer
providerId v2.ProviderId
storage *storage.Storage
ready bool
}
func NewTracers(storage *storage.Storage, providerId v2.ProviderId) *Tracers {
return &Tracers{
storage: storage,
providerId: providerId,
list: make([]*Tracer, 0),
}
}
func (w *Tracers) Add(article *v2.ProviderArticle) {
w.m.Lock()
defer w.m.Unlock()
if !slices.ContainsFunc(w.list, func(item *Tracer) bool {
return item.pArticle.SkuID == article.SkuID
}) {
w.list = append(w.list, &Tracer{pArticle: article, lastAts: -1})
}
}
func (w *Tracers) Remove(skuID string) {
for i, item := range w.list {
if item.pArticle.SkuID == skuID {
w.m.Lock()
w.list = append(w.list[:i], w.list[i+1:]...)
w.m.Unlock()
return
}
}
}
func (w *Tracers) Range(filter func(tracer *Tracer) bool, do func(tracer *Tracer)) {
w.m.RLock()
defer w.m.RUnlock()
for _, item := range w.list {
if filter(item) {
do(item)
}
}
}
func (w *Tracers) Exist(skuID string) bool {
w.m.RLock()
defer w.m.RUnlock()
for _, item := range w.list {
if item.pArticle.SkuID == skuID {
return true
}
}
return false
}
func (w *Tracers) Load() (err error) {
var articles []*v2.ProviderArticle
articles, err = w.storage.ProviderArticle().Find(
storage.NewGetProviderArticleQuery().
SetProviderId(w.providerId).
SetTraceAts(true),
)
if err != nil {
return err
}
w.m.Lock()
defer w.m.Unlock()
for _, article := range articles {
w.list = append(w.list, &Tracer{pArticle: article})
}
glog.Infof("共加载%d个需要追踪库存的商品\n", len(w.list))
w.ready = true
return
}
// 返回ready
func (w *Tracers) Ready() bool {
return w.ready
}

View File

@ -6,6 +6,7 @@ import (
"os"
"os/signal"
ats_tracer "gitea.timerzz.com/kedaya_haitao/coach-spider/ats-tracer"
"gitea.timerzz.com/kedaya_haitao/coach-spider/pkg/options"
"gitea.timerzz.com/kedaya_haitao/coach-spider/server"
"gitea.timerzz.com/kedaya_haitao/coach-spider/spider"
@ -15,8 +16,6 @@ import (
"gitea.timerzz.com/kedaya_haitao/common/pkg/proxy"
"gitea.timerzz.com/kedaya_haitao/common/pkg/redis"
"gitea.timerzz.com/kedaya_haitao/common/pkg/web"
"gitea.timerzz.com/kedaya_haitao/pusher/rpc/pusher"
"github.com/cloudwego/kitex/client"
"github.com/gofiber/fiber/v3"
"github.com/gofiber/fiber/v3/middleware/cors"
"github.com/gofiber/fiber/v3/middleware/recover"
@ -46,7 +45,7 @@ func main() {
glog.Fatalf("初始化redis失败%v", err)
}
pusher.InitClient("pusher", client.WithHostPorts("pusher:8080"))
//pusher.InitClient("pusher", client.WithHostPorts("pusher:8080"))
// 代理池
pool := proxy.NewProxyPool(cfg.Proxy.Subscribes)
@ -72,8 +71,16 @@ func main() {
cli,
db,
)
tracer := ats_tracer.NewController(
_ctx,
cfg,
cli,
db,
)
wg.Go(spider.Run)
wg.Go(watcher.Run)
wg.Go(tracer.Run)
// http
r := fiber.New(fiber.Config{ErrorHandler: web.ErrHandle})
@ -92,6 +99,7 @@ func main() {
for _, register := range []web.Register{
server.NewSpiderSvc(spider, cfg.ProviderId),
server.NewWatcherSvc(watcher, cfg.ProviderId),
server.NewAtsTracerSvc(tracer, cfg.ProviderId),
web.NewProbe().SetReadyProbe(readyProbe),
} {
register.Registry(r)

4
go.mod
View File

@ -5,9 +5,8 @@ go 1.22.2
toolchain go1.22.3
require (
gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241129125918-50b9ed22adb2
gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241202091018-277d73739be8
gitea.timerzz.com/kedaya_haitao/pusher v0.0.0-20241129135359-c16e02a7eab0
github.com/cloudwego/kitex v0.11.3
github.com/gofiber/fiber/v3 v3.0.0-beta.3
github.com/golang/glog v1.2.1
github.com/pkg/errors v0.9.1
@ -47,6 +46,7 @@ require (
github.com/cloudwego/gopkg v0.1.2 // indirect
github.com/cloudwego/hertz v0.9.1 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/cloudwego/kitex v0.11.3 // indirect
github.com/cloudwego/localsession v0.0.2 // indirect
github.com/cloudwego/netpoll v0.6.4 // indirect
github.com/cloudwego/runtimex v0.1.0 // indirect

4
go.sum
View File

@ -1,6 +1,6 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241129125918-50b9ed22adb2 h1:gRKzV+KtHoT126BXcaulTarAqmpNgTX9GS6m34NyrRI=
gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241129125918-50b9ed22adb2/go.mod h1:BIz+IMGznPiyLnV1+Ntw1zf8rEIcbymmGq+EfvDsSgE=
gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241202091018-277d73739be8 h1:AJo3Y3icJb8wcjeSnx6SjkHFdBKYm5lFscEuo6O4dDM=
gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241202091018-277d73739be8/go.mod h1:BIz+IMGznPiyLnV1+Ntw1zf8rEIcbymmGq+EfvDsSgE=
gitea.timerzz.com/kedaya_haitao/pusher v0.0.0-20241129135359-c16e02a7eab0 h1:WMNOErbI6At865VWI3sN74RMQaZ8ZhwsNSB9A4vg/6Q=
gitea.timerzz.com/kedaya_haitao/pusher v0.0.0-20241129135359-c16e02a7eab0/go.mod h1:nRdxwOP3hhkUdH3PjHq3gt8SA+YEfR/d7Ig9DuQQZQY=
github.com/3andne/restls-client-go v0.1.6 h1:tRx/YilqW7iHpgmEL4E1D8dAsuB0tFF3uvncS+B6I08=

View File

@ -3,10 +3,12 @@ package options
import (
"fmt"
"os"
"strconv"
"time"
"gitea.timerzz.com/kedaya_haitao/common/pkg/proxy"
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
"github.com/golang/glog"
"gopkg.in/yaml.v3"
)
@ -19,6 +21,8 @@ type Config struct {
Proxy proxy.Option `yaml:"proxy"`
ProviderId v2.ProviderId `yaml:"provider_id"`
WatchInterval time.Duration
AtsInterval time.Duration
AtsThreshold int // 库存一定时间内减少多少个通知
}
func LoadConfigs() (opt *Config, err error) {
@ -30,10 +34,19 @@ func LoadConfigs() (opt *Config, err error) {
}
opt.WatchInterval, _ = time.ParseDuration(os.Getenv("WATCH_INTERVAL"))
opt.AtsInterval, _ = time.ParseDuration(os.Getenv("ATS_INTERVAL"))
if opt.WatchInterval == 0 {
opt.WatchInterval = 5 * time.Minute
}
if opt.AtsInterval == 0 {
opt.AtsInterval = 15 * time.Minute
}
opt.AtsThreshold, _ = strconv.Atoi(os.Getenv("ATS_THRESHOLD"))
if opt.AtsThreshold == 0 {
opt.AtsThreshold = 40
}
glog.Infof("加载watch interval %s\nats interval %s\nats threshold %d", opt.WatchInterval, opt.AtsInterval, opt.AtsThreshold)
// 加载代理配置
cfgPath := os.Getenv(ProxyConfigEnv)
if cfgPath == "" {

78
server/ats-tracer.go Normal file
View File

@ -0,0 +1,78 @@
package server
import (
"fmt"
"net/url"
"strings"
ats_tracer "gitea.timerzz.com/kedaya_haitao/coach-spider/ats-tracer"
"gitea.timerzz.com/kedaya_haitao/common/pkg/web"
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
"github.com/gofiber/fiber/v3"
)
type AtsTracerSvc struct {
ctl *ats_tracer.Controller
providerId v2.ProviderId
}
func NewAtsTracerSvc(ctl *ats_tracer.Controller, providerId v2.ProviderId) *AtsTracerSvc {
return &AtsTracerSvc{
ctl: ctl,
providerId: providerId,
}
}
func (s *AtsTracerSvc) Registry(r fiber.Router) {
api := r.Group(fmt.Sprintf("/api/v2/provider/%s/ats-trace", s.providerId))
api.Post(":skuID", s.AddAtsTracer)
api.Delete(":skuID", s.DelAtsTracer)
api.Post("stop/:skuID", s.StopAtsTracer)
}
func (s *AtsTracerSvc) AddAtsTracer(ctx fiber.Ctx) (err error) {
skuID := ctx.Params("skuID")
if skuID == "" {
return fmt.Errorf("skuID is empty")
}
if skuID, err = url.QueryUnescape(skuID); err != nil {
return err
}
skuID = strings.ReplaceAll(skuID, " ", "-")
if err = s.ctl.Add(skuID); err != nil {
return
}
return ctx.JSON(web.NewResponse("ok"))
}
func (s *AtsTracerSvc) DelAtsTracer(ctx fiber.Ctx) (err error) {
skuID := ctx.Params("skuID")
if skuID == "" {
return fmt.Errorf("skuID is empty")
}
if skuID, err = url.QueryUnescape(skuID); err != nil {
return err
}
skuID = strings.ReplaceAll(skuID, " ", "-")
if err = s.ctl.Delete(skuID); err != nil {
return
}
return ctx.JSON(web.NewResponse("ok"))
}
func (s *AtsTracerSvc) StopAtsTracer(ctx fiber.Ctx) (err error) {
skuID := ctx.Params("skuID")
if skuID == "" {
return fmt.Errorf("skuID is empty")
}
if skuID, err = url.QueryUnescape(skuID); err != nil {
return err
}
skuID = strings.ReplaceAll(skuID, " ", "-")
if err = s.ctl.Stop(skuID); err != nil {
return
}
return ctx.JSON(web.NewResponse("ok"))
}

View File

@ -248,7 +248,7 @@ func (c *Controller) saveProducts(articles []v2.Article) {
oldProviderArticle.HistoryPrice = append(oldProviderArticle.HistoryPrice, pArticle.Cost)
}
oldProviderArticle.Cost = pArticle.Cost
oldProviderArticle.Ast = pArticle.Ast
oldProviderArticle.Ats = pArticle.Ats
oldProviderArticle.Available = pArticle.Available
if err = c.storage.ProviderArticle().Upsert(oldProviderArticle); err != nil {
glog.Errorf("保存供应商商品信息失败: %v", err)
@ -303,7 +303,7 @@ func (c *Controller) FetchArticleDetail(ctx context.Context, pid string) error {
Pid: pid,
SkuID: pid,
Available: resp.Inventory.Orderable,
Ast: resp.Inventory.Ats,
Ats: resp.Inventory.Ats,
Image: article.Image,
Link: fmt.Sprintf("%s/%s", "https://www.coachoutlet.com", resp.Url),
Cost: utils.CalculateProviderPrice(
@ -330,8 +330,8 @@ func (c *Controller) FetchArticleAts(ctx context.Context, pid string) error {
if err != nil {
return fmt.Errorf("请求商品库存失败: %v", err)
}
pArticle.Ast = inv.Ats
return c.storage.ProviderArticle().Update(pArticle, "ast")
pArticle.Ats = inv.Ats
return c.storage.ProviderArticle().Update(pArticle, "ats")
}
func (c *Controller) GetArticleAts(ctx context.Context, pid string) (int, error) {

View File

@ -10,8 +10,6 @@ import (
coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client"
"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
"gitea.timerzz.com/kedaya_haitao/pusher/kitex_gen/push"
"gitea.timerzz.com/kedaya_haitao/pusher/rpc/pusher"
"github.com/golang/glog"
"gorm.io/gorm"
)
@ -51,6 +49,7 @@ func (c *Controller) Run() (err error) {
for {
select {
case <-c.ctx.Done():
glog.Info("watcher 退出")
return
case <-ticker.C:
c.watchRange()
@ -115,17 +114,19 @@ func (c *Controller) watchRange() {
}()
if c.doWatch(watcher) {
// 如果蹲到了,需要通知
resp, err := pusher.Push(c.ctx, &push.PushReq{
Title: "coach 补货",
Content: fmt.Sprintf("coach 商品 %s 补货\n库存%d\n链接%s", watcher.pArticle.SkuID, watcher.pArticle.Ast, watcher.pArticle.Link),
})
if err != nil {
glog.Errorf("消息推送失败:%v", err)
}
if resp.Code != 0 {
glog.Errorf("消息推送失败:%s", resp.Msg)
}
//// 如果蹲到了,需要通知
//resp, err := pusher.Push(c.ctx, &push.PushReq{
// Title: "coach 补货",
// Content: fmt.Sprintf("coach 商品 %s 补货\n库存%d\n链接%s", watcher.pArticle.SkuID, watcher.pArticle.Ats, watcher.pArticle.Link),
//})
//if err != nil {
// glog.Errorf("消息推送失败:%v", err)
//}
//if resp.Code != 0 {
// glog.Errorf("消息推送失败:%s", resp.Msg)
//}
watcher.pArticle.SetWatch(false)
_ = c.storage.ProviderArticle().Update(*watcher.pArticle, "watch")
c.watchers.Remove(watcher.pArticle.SkuID)
}
}()
@ -140,9 +141,9 @@ func (c *Controller) doWatch(watcher *Watcher) (available bool) {
return
}
article.Available = inventory.Orderable && inventory.Ats > 0
article.Ast = inventory.Ats
article.Ats = inventory.Ats
article.SetWatch(!article.Available)
if err = c.storage.ProviderArticle().Update(*article, "available", "ast", "updated_at"); err != nil {
if err = c.storage.ProviderArticle().Update(*article, "available", "ats", "updated_at"); err != nil {
glog.Errorf("更新数据库失败:%v", err)
article.SetWatch(true)
return