From 66959d8a5828cdaa4424bc7e88c0ea578ecdb896 Mon Sep 17 00:00:00 2001 From: timerzz Date: Mon, 2 Dec 2024 19:07:38 +0800 Subject: [PATCH] =?UTF-8?q?feat=20=E6=94=AF=E6=8C=81=E8=B9=B2=E5=BA=93?= =?UTF-8?q?=E5=AD=98=E5=92=8C=E5=8E=86=E5=8F=B2=E5=BA=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ats-tracer/controller.go | 171 +++++++++++++++++++++++++++++++++++++++ ats-tracer/tracer.go | 102 +++++++++++++++++++++++ cmd/main.go | 14 +++- go.mod | 4 +- go.sum | 4 +- pkg/options/init.go | 15 +++- server/ats-tracer.go | 78 ++++++++++++++++++ spider/controller.go | 8 +- watcher/controller.go | 31 +++---- 9 files changed, 400 insertions(+), 27 deletions(-) create mode 100644 ats-tracer/controller.go create mode 100644 ats-tracer/tracer.go create mode 100644 server/ats-tracer.go diff --git a/ats-tracer/controller.go b/ats-tracer/controller.go new file mode 100644 index 0000000..14dd828 --- /dev/null +++ b/ats-tracer/controller.go @@ -0,0 +1,171 @@ +package ats_tracer + +import ( + "context" + "fmt" + "sync" + "time" + + "gitea.timerzz.com/kedaya_haitao/coach-spider/pkg/options" + coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client" + "gitea.timerzz.com/kedaya_haitao/common/structs/storage" + v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2" + "gitea.timerzz.com/kedaya_haitao/pusher/kitex_gen/push" + "gitea.timerzz.com/kedaya_haitao/pusher/rpc/pusher" + "github.com/golang/glog" + "gorm.io/gorm" +) + +type Controller struct { + ctx context.Context + m sync.RWMutex + + // 要追踪的ProviderArticle + tracers *Tracers + + storage *storage.Storage + client *coach_client.US + + providerId v2.ProviderId + interval time.Duration + threshold int +} + +func NewController(ctx context.Context, cfg *options.Config, client *coach_client.US, db *gorm.DB) *Controller { + return &Controller{ + ctx: ctx, + providerId: cfg.ProviderId, + interval: cfg.AtsInterval, + client: client, + storage: storage.NewStorage(db), + tracers: NewTracers(storage.NewStorage(db), cfg.ProviderId), + } +} + +func (c *Controller) Run() (err error) { + // 加载要追踪的ProviderArticle + if err = c.tracers.Load(); err != nil { + return + } + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + glog.Infof("tracer退出") + return + case <-ticker.C: + c.traceRange() + } + } +} + +// 返回ready +func (c *Controller) Ready() bool { + return c.tracers.Ready() +} + +// Add 添加一个要追踪库存的ProviderArticle +func (c *Controller) Add(skuID string) error { + article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID)) + if err != nil { + return fmt.Errorf("获取商品信息失败: %v", err) + } + article.SetTraceAts(true) + article.Available = false + if err = c.storage.ProviderArticle().Update(article, "trace_ats", "available"); err != nil { + return fmt.Errorf("更新数据库失败:%v", err) + } + c.tracers.Add(&article) + return nil +} + +func (c *Controller) Delete(skuID string) error { + article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID)) + if err != nil { + return fmt.Errorf("获取商品信息失败: %v", err) + } + article.TraceAts = nil + if err = c.storage.ProviderArticle().Update(article, "trace_ats"); err != nil { + return fmt.Errorf("更新数据库失败:%v", err) + } + c.tracers.Remove(skuID) + return nil +} + +func (c *Controller) Stop(skuID string) error { + article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID)) + if err != nil { + return fmt.Errorf("获取商品信息失败: %v", err) + } + article.SetTraceAts(false) + if err = c.storage.ProviderArticle().Update(article, "trace_ats"); err != nil { + return fmt.Errorf("更新数据库失败:%v", err) + } + c.tracers.Remove(skuID) + return nil +} + +func (c *Controller) traceRange() { + c.tracers.Range(func(tracer *Tracer) bool { + return !tracer.tracing + }, func(tracer *Tracer) { + go func() { + tracer.tracing = true + defer func() { + tracer.tracing = false + }() + + if c.doTrace(tracer) { + // 如果蹲到了,需要通知 + //resp, err := pusher.Push(c.ctx, &push.PushReq{ + // Title: "coach 断货", + // Content: fmt.Sprintf("coach 商品 %s 断货了\n库存为0\n链接:%s", tracer.pArticle.SkuID, tracer.pArticle.Link), + //}) + //if err != nil { + // glog.Errorf("消息推送失败:%v", err) + //} + //if resp.Code != 0 { + // glog.Errorf("消息推送失败:%s", resp.Msg) + //} + tracer.pArticle.SetTraceAts(false) + _ = c.storage.ProviderArticle().Update(*tracer.pArticle, "trace_ats") + c.tracers.Remove(tracer.pArticle.SkuID) + } + }() + }) +} + +func (c *Controller) doTrace(tracer *Tracer) (available bool) { + article := tracer.pArticle + inventory, err := c.client.RequestInventory(c.ctx, article.SkuID) + if err != nil { + glog.Warningf("获取coach %s 库存失败:%v", article.SkuID, err) + return + } + article.Available = inventory.Orderable && inventory.Ats > 0 + article.Ats = inventory.Ats + if article.Ats == 0 { + article.SetTraceAts(false) + } + + if err = c.storage.ProviderArticle().Update(*article, "available", "ats", "updated_at"); err != nil { + glog.Errorf("更新数据库失败:%v", err) + article.SetTraceAts(true) + return + } + if article.Ats != tracer.lastAts { + if value := tracer.lastAts - article.Ats; value >= c.threshold { + _, _ = pusher.Push(c.ctx, &push.PushReq{ + Title: "coach 商品库存减少", + Content: fmt.Sprintf("coach 商品 %s %s 减少了 %d \n链接:%s", tracer.pArticle.SkuID, c.interval, value, tracer.pArticle.Link), + }) + } + c.storage.DB().Create(&v2.ProviderAts{ + ProviderArticleID: article.ID, + Ats: article.Ats, + }) + } + tracer.lastAts, tracer.lastTraceTime = article.Ats, time.Now() + return article.Ats <= 0 +} diff --git a/ats-tracer/tracer.go b/ats-tracer/tracer.go new file mode 100644 index 0000000..a820410 --- /dev/null +++ b/ats-tracer/tracer.go @@ -0,0 +1,102 @@ +package ats_tracer + +import ( + "slices" + "sync" + "time" + + "gitea.timerzz.com/kedaya_haitao/common/structs/storage" + v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2" + "github.com/golang/glog" +) + +type Tracer struct { + pArticle *v2.ProviderArticle + tracing bool //是不是正在抓 + lastTraceTime time.Time //上次是什么时候抓的 + lastAts int // 上次抓取的库存 +} + +type Tracers struct { + m sync.RWMutex + list []*Tracer + + providerId v2.ProviderId + storage *storage.Storage + ready bool +} + +func NewTracers(storage *storage.Storage, providerId v2.ProviderId) *Tracers { + return &Tracers{ + storage: storage, + providerId: providerId, + list: make([]*Tracer, 0), + } +} + +func (w *Tracers) Add(article *v2.ProviderArticle) { + w.m.Lock() + defer w.m.Unlock() + if !slices.ContainsFunc(w.list, func(item *Tracer) bool { + return item.pArticle.SkuID == article.SkuID + }) { + w.list = append(w.list, &Tracer{pArticle: article, lastAts: -1}) + } +} + +func (w *Tracers) Remove(skuID string) { + for i, item := range w.list { + if item.pArticle.SkuID == skuID { + w.m.Lock() + w.list = append(w.list[:i], w.list[i+1:]...) + w.m.Unlock() + return + } + } +} + +func (w *Tracers) Range(filter func(tracer *Tracer) bool, do func(tracer *Tracer)) { + w.m.RLock() + defer w.m.RUnlock() + for _, item := range w.list { + if filter(item) { + do(item) + } + } +} + +func (w *Tracers) Exist(skuID string) bool { + w.m.RLock() + defer w.m.RUnlock() + for _, item := range w.list { + if item.pArticle.SkuID == skuID { + return true + } + } + return false +} + +func (w *Tracers) Load() (err error) { + var articles []*v2.ProviderArticle + articles, err = w.storage.ProviderArticle().Find( + storage.NewGetProviderArticleQuery(). + SetProviderId(w.providerId). + SetTraceAts(true), + ) + if err != nil { + return err + } + w.m.Lock() + defer w.m.Unlock() + for _, article := range articles { + w.list = append(w.list, &Tracer{pArticle: article}) + } + glog.Infof("共加载%d个需要追踪库存的商品\n", len(w.list)) + w.ready = true + return +} + +// 返回ready +func (w *Tracers) Ready() bool { + return w.ready +} diff --git a/cmd/main.go b/cmd/main.go index 864c097..537ee17 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" + ats_tracer "gitea.timerzz.com/kedaya_haitao/coach-spider/ats-tracer" "gitea.timerzz.com/kedaya_haitao/coach-spider/pkg/options" "gitea.timerzz.com/kedaya_haitao/coach-spider/server" "gitea.timerzz.com/kedaya_haitao/coach-spider/spider" @@ -15,8 +16,6 @@ import ( "gitea.timerzz.com/kedaya_haitao/common/pkg/proxy" "gitea.timerzz.com/kedaya_haitao/common/pkg/redis" "gitea.timerzz.com/kedaya_haitao/common/pkg/web" - "gitea.timerzz.com/kedaya_haitao/pusher/rpc/pusher" - "github.com/cloudwego/kitex/client" "github.com/gofiber/fiber/v3" "github.com/gofiber/fiber/v3/middleware/cors" "github.com/gofiber/fiber/v3/middleware/recover" @@ -46,7 +45,7 @@ func main() { glog.Fatalf("初始化redis失败:%v", err) } - pusher.InitClient("pusher", client.WithHostPorts("pusher:8080")) + //pusher.InitClient("pusher", client.WithHostPorts("pusher:8080")) // 代理池 pool := proxy.NewProxyPool(cfg.Proxy.Subscribes) @@ -72,8 +71,16 @@ func main() { cli, db, ) + + tracer := ats_tracer.NewController( + _ctx, + cfg, + cli, + db, + ) wg.Go(spider.Run) wg.Go(watcher.Run) + wg.Go(tracer.Run) // http r := fiber.New(fiber.Config{ErrorHandler: web.ErrHandle}) @@ -92,6 +99,7 @@ func main() { for _, register := range []web.Register{ server.NewSpiderSvc(spider, cfg.ProviderId), server.NewWatcherSvc(watcher, cfg.ProviderId), + server.NewAtsTracerSvc(tracer, cfg.ProviderId), web.NewProbe().SetReadyProbe(readyProbe), } { register.Registry(r) diff --git a/go.mod b/go.mod index 6266a9e..a95a51c 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,8 @@ go 1.22.2 toolchain go1.22.3 require ( - gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241129125918-50b9ed22adb2 + gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241202091018-277d73739be8 gitea.timerzz.com/kedaya_haitao/pusher v0.0.0-20241129135359-c16e02a7eab0 - github.com/cloudwego/kitex v0.11.3 github.com/gofiber/fiber/v3 v3.0.0-beta.3 github.com/golang/glog v1.2.1 github.com/pkg/errors v0.9.1 @@ -47,6 +46,7 @@ require ( github.com/cloudwego/gopkg v0.1.2 // indirect github.com/cloudwego/hertz v0.9.1 // indirect github.com/cloudwego/iasm v0.2.0 // indirect + github.com/cloudwego/kitex v0.11.3 // indirect github.com/cloudwego/localsession v0.0.2 // indirect github.com/cloudwego/netpoll v0.6.4 // indirect github.com/cloudwego/runtimex v0.1.0 // indirect diff --git a/go.sum b/go.sum index 4866612..50f4bad 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241129125918-50b9ed22adb2 h1:gRKzV+KtHoT126BXcaulTarAqmpNgTX9GS6m34NyrRI= -gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241129125918-50b9ed22adb2/go.mod h1:BIz+IMGznPiyLnV1+Ntw1zf8rEIcbymmGq+EfvDsSgE= +gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241202091018-277d73739be8 h1:AJo3Y3icJb8wcjeSnx6SjkHFdBKYm5lFscEuo6O4dDM= +gitea.timerzz.com/kedaya_haitao/common v0.0.0-20241202091018-277d73739be8/go.mod h1:BIz+IMGznPiyLnV1+Ntw1zf8rEIcbymmGq+EfvDsSgE= gitea.timerzz.com/kedaya_haitao/pusher v0.0.0-20241129135359-c16e02a7eab0 h1:WMNOErbI6At865VWI3sN74RMQaZ8ZhwsNSB9A4vg/6Q= gitea.timerzz.com/kedaya_haitao/pusher v0.0.0-20241129135359-c16e02a7eab0/go.mod h1:nRdxwOP3hhkUdH3PjHq3gt8SA+YEfR/d7Ig9DuQQZQY= github.com/3andne/restls-client-go v0.1.6 h1:tRx/YilqW7iHpgmEL4E1D8dAsuB0tFF3uvncS+B6I08= diff --git a/pkg/options/init.go b/pkg/options/init.go index 91f8c64..b5ae26a 100644 --- a/pkg/options/init.go +++ b/pkg/options/init.go @@ -3,10 +3,12 @@ package options import ( "fmt" "os" + "strconv" "time" "gitea.timerzz.com/kedaya_haitao/common/pkg/proxy" v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2" + "github.com/golang/glog" "gopkg.in/yaml.v3" ) @@ -19,6 +21,8 @@ type Config struct { Proxy proxy.Option `yaml:"proxy"` ProviderId v2.ProviderId `yaml:"provider_id"` WatchInterval time.Duration + AtsInterval time.Duration + AtsThreshold int // 库存一定时间内减少多少个通知 } func LoadConfigs() (opt *Config, err error) { @@ -30,10 +34,19 @@ func LoadConfigs() (opt *Config, err error) { } opt.WatchInterval, _ = time.ParseDuration(os.Getenv("WATCH_INTERVAL")) - + opt.AtsInterval, _ = time.ParseDuration(os.Getenv("ATS_INTERVAL")) if opt.WatchInterval == 0 { opt.WatchInterval = 5 * time.Minute } + if opt.AtsInterval == 0 { + opt.AtsInterval = 15 * time.Minute + } + opt.AtsThreshold, _ = strconv.Atoi(os.Getenv("ATS_THRESHOLD")) + if opt.AtsThreshold == 0 { + opt.AtsThreshold = 40 + } + + glog.Infof("加载watch interval %s\nats interval %s\nats threshold %d", opt.WatchInterval, opt.AtsInterval, opt.AtsThreshold) // 加载代理配置 cfgPath := os.Getenv(ProxyConfigEnv) if cfgPath == "" { diff --git a/server/ats-tracer.go b/server/ats-tracer.go new file mode 100644 index 0000000..80e7139 --- /dev/null +++ b/server/ats-tracer.go @@ -0,0 +1,78 @@ +package server + +import ( + "fmt" + "net/url" + "strings" + + ats_tracer "gitea.timerzz.com/kedaya_haitao/coach-spider/ats-tracer" + "gitea.timerzz.com/kedaya_haitao/common/pkg/web" + v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2" + "github.com/gofiber/fiber/v3" +) + +type AtsTracerSvc struct { + ctl *ats_tracer.Controller + providerId v2.ProviderId +} + +func NewAtsTracerSvc(ctl *ats_tracer.Controller, providerId v2.ProviderId) *AtsTracerSvc { + return &AtsTracerSvc{ + ctl: ctl, + providerId: providerId, + } +} + +func (s *AtsTracerSvc) Registry(r fiber.Router) { + api := r.Group(fmt.Sprintf("/api/v2/provider/%s/ats-trace", s.providerId)) + api.Post(":skuID", s.AddAtsTracer) + api.Delete(":skuID", s.DelAtsTracer) + api.Post("stop/:skuID", s.StopAtsTracer) +} + +func (s *AtsTracerSvc) AddAtsTracer(ctx fiber.Ctx) (err error) { + skuID := ctx.Params("skuID") + if skuID == "" { + return fmt.Errorf("skuID is empty") + } + if skuID, err = url.QueryUnescape(skuID); err != nil { + return err + } + skuID = strings.ReplaceAll(skuID, " ", "-") + if err = s.ctl.Add(skuID); err != nil { + return + } + return ctx.JSON(web.NewResponse("ok")) +} + +func (s *AtsTracerSvc) DelAtsTracer(ctx fiber.Ctx) (err error) { + skuID := ctx.Params("skuID") + if skuID == "" { + return fmt.Errorf("skuID is empty") + } + if skuID, err = url.QueryUnescape(skuID); err != nil { + return err + } + skuID = strings.ReplaceAll(skuID, " ", "-") + if err = s.ctl.Delete(skuID); err != nil { + return + } + + return ctx.JSON(web.NewResponse("ok")) +} + +func (s *AtsTracerSvc) StopAtsTracer(ctx fiber.Ctx) (err error) { + skuID := ctx.Params("skuID") + if skuID == "" { + return fmt.Errorf("skuID is empty") + } + if skuID, err = url.QueryUnescape(skuID); err != nil { + return err + } + skuID = strings.ReplaceAll(skuID, " ", "-") + if err = s.ctl.Stop(skuID); err != nil { + return + } + + return ctx.JSON(web.NewResponse("ok")) +} diff --git a/spider/controller.go b/spider/controller.go index 1d73849..7e1ebec 100644 --- a/spider/controller.go +++ b/spider/controller.go @@ -248,7 +248,7 @@ func (c *Controller) saveProducts(articles []v2.Article) { oldProviderArticle.HistoryPrice = append(oldProviderArticle.HistoryPrice, pArticle.Cost) } oldProviderArticle.Cost = pArticle.Cost - oldProviderArticle.Ast = pArticle.Ast + oldProviderArticle.Ats = pArticle.Ats oldProviderArticle.Available = pArticle.Available if err = c.storage.ProviderArticle().Upsert(oldProviderArticle); err != nil { glog.Errorf("保存供应商商品信息失败: %v", err) @@ -303,7 +303,7 @@ func (c *Controller) FetchArticleDetail(ctx context.Context, pid string) error { Pid: pid, SkuID: pid, Available: resp.Inventory.Orderable, - Ast: resp.Inventory.Ats, + Ats: resp.Inventory.Ats, Image: article.Image, Link: fmt.Sprintf("%s/%s", "https://www.coachoutlet.com", resp.Url), Cost: utils.CalculateProviderPrice( @@ -330,8 +330,8 @@ func (c *Controller) FetchArticleAts(ctx context.Context, pid string) error { if err != nil { return fmt.Errorf("请求商品库存失败: %v", err) } - pArticle.Ast = inv.Ats - return c.storage.ProviderArticle().Update(pArticle, "ast") + pArticle.Ats = inv.Ats + return c.storage.ProviderArticle().Update(pArticle, "ats") } func (c *Controller) GetArticleAts(ctx context.Context, pid string) (int, error) { diff --git a/watcher/controller.go b/watcher/controller.go index a624299..e55cb99 100644 --- a/watcher/controller.go +++ b/watcher/controller.go @@ -10,8 +10,6 @@ import ( coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client" "gitea.timerzz.com/kedaya_haitao/common/structs/storage" v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2" - "gitea.timerzz.com/kedaya_haitao/pusher/kitex_gen/push" - "gitea.timerzz.com/kedaya_haitao/pusher/rpc/pusher" "github.com/golang/glog" "gorm.io/gorm" ) @@ -51,6 +49,7 @@ func (c *Controller) Run() (err error) { for { select { case <-c.ctx.Done(): + glog.Info("watcher 退出") return case <-ticker.C: c.watchRange() @@ -115,17 +114,19 @@ func (c *Controller) watchRange() { }() if c.doWatch(watcher) { - // 如果蹲到了,需要通知 - resp, err := pusher.Push(c.ctx, &push.PushReq{ - Title: "coach 补货", - Content: fmt.Sprintf("coach 商品 %s 补货\n库存:%d\n链接:%s", watcher.pArticle.SkuID, watcher.pArticle.Ast, watcher.pArticle.Link), - }) - if err != nil { - glog.Errorf("消息推送失败:%v", err) - } - if resp.Code != 0 { - glog.Errorf("消息推送失败:%s", resp.Msg) - } + //// 如果蹲到了,需要通知 + //resp, err := pusher.Push(c.ctx, &push.PushReq{ + // Title: "coach 补货", + // Content: fmt.Sprintf("coach 商品 %s 补货\n库存:%d\n链接:%s", watcher.pArticle.SkuID, watcher.pArticle.Ats, watcher.pArticle.Link), + //}) + //if err != nil { + // glog.Errorf("消息推送失败:%v", err) + //} + //if resp.Code != 0 { + // glog.Errorf("消息推送失败:%s", resp.Msg) + //} + watcher.pArticle.SetWatch(false) + _ = c.storage.ProviderArticle().Update(*watcher.pArticle, "watch") c.watchers.Remove(watcher.pArticle.SkuID) } }() @@ -140,9 +141,9 @@ func (c *Controller) doWatch(watcher *Watcher) (available bool) { return } article.Available = inventory.Orderable && inventory.Ats > 0 - article.Ast = inventory.Ats + article.Ats = inventory.Ats article.SetWatch(!article.Available) - if err = c.storage.ProviderArticle().Update(*article, "available", "ast", "updated_at"); err != nil { + if err = c.storage.ProviderArticle().Update(*article, "available", "ats", "updated_at"); err != nil { glog.Errorf("更新数据库失败:%v", err) article.SetWatch(true) return