us-coach-spider/ats-tracer/controller.go
timerzz 764c144f86
All checks were successful
Build image / build (push) Successful in 2m26s
完善供应商爬虫功能
- 实现定时抓取供应商商品信息功能
- 添加供应商配置变更监听及处理逻辑
- 完善商品价格计算及更新机制
- 添加商品库存跟踪功能
- 实现商品详情抓取及更新接口
- 优化错误处理及日志记录
- 添加服务就绪状态检查
- 完善数据库迁移及初始化逻辑"
2025-03-27 17:13:04 +08:00

188 lines
5.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package ats_tracer
import (
"context"
"fmt"
"sync"
"time"
"gitea.timerzz.com/kedaya_haitao/coach-spider/pkg/options"
coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client"
"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
"gitea.timerzz.com/kedaya_haitao/pusher/kitex_gen/push"
"gitea.timerzz.com/kedaya_haitao/pusher/rpc/pusher"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
)
type Controller struct {
ctx context.Context
m sync.RWMutex
// 要追踪的ProviderArticle
tracers *Tracers
storage *storage.Storage
client coach_client.USCAClient
providerId v2.ProviderId
interval time.Duration
threshold int
}
func NewController(ctx context.Context, cfg *options.Config, client coach_client.USCAClient, db *gorm.DB) *Controller {
return &Controller{
ctx: ctx,
providerId: cfg.ProviderId,
interval: cfg.AtsInterval,
threshold: cfg.AtsThreshold,
client: client,
storage: storage.NewStorage(db),
tracers: NewTracers(storage.NewStorage(db), cfg.ProviderId),
}
}
func (c *Controller) Run() (err error) {
// 加载要追踪的ProviderArticle
if err = c.tracers.Load(); err != nil {
return
}
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
logrus.Infof("tracer退出")
return
case <-ticker.C:
c.traceRange()
}
}
}
// 清理24小时前的ProviderAts
func (c *Controller) CleanAts() error {
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
logrus.Infof("cleaner退出")
return nil
case <-ticker.C:
c.storage.DB().Delete(v2.ProviderAts{}, "created_at < ?", time.Now().Add(-24*time.Hour))
}
}
}
// 返回ready
func (c *Controller) Ready() bool {
return c.tracers.Ready()
}
// Add 添加一个要追踪库存的ProviderArticle
func (c *Controller) Add(skuID string) error {
article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID))
if err != nil {
return fmt.Errorf("获取商品信息失败: %v", err)
}
article.SetTraceAts(true)
article.Available = false
if err = c.storage.ProviderArticle().Update(article, "trace_ats", "available"); err != nil {
return fmt.Errorf("更新数据库失败:%v", err)
}
c.tracers.Add(&article)
return nil
}
func (c *Controller) Delete(skuID string) error {
article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID))
if err != nil {
return fmt.Errorf("获取商品信息失败: %v", err)
}
article.TraceAts = nil
if err = c.storage.ProviderArticle().Update(article, "trace_ats"); err != nil {
return fmt.Errorf("更新数据库失败:%v", err)
}
c.tracers.Remove(skuID)
return nil
}
func (c *Controller) Stop(skuID string) error {
article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID))
if err != nil {
return fmt.Errorf("获取商品信息失败: %v", err)
}
article.SetTraceAts(false)
if err = c.storage.ProviderArticle().Update(article, "trace_ats"); err != nil {
return fmt.Errorf("更新数据库失败:%v", err)
}
c.tracers.Remove(skuID)
return nil
}
func (c *Controller) traceRange() {
c.tracers.Range(func(tracer *Tracer) bool {
return !tracer.tracing
}, func(tracer *Tracer) {
go func() {
tracer.tracing = true
defer func() {
tracer.tracing = false
}()
if c.doTrace(tracer) {
//如果蹲到了,需要通知
resp, err := pusher.Push(c.ctx, &push.PushReq{
Title: "coach 断货",
Content: fmt.Sprintf("coach 商品 %s 断货了\n库存为0\n链接%s", tracer.pArticle.SkuID, tracer.pArticle.Link),
})
if err != nil {
logrus.Errorf("消息推送失败:%v", err)
}
if resp.Code != 0 {
logrus.Errorf("消息推送失败:%s", resp.Msg)
}
tracer.pArticle.SetTraceAts(false)
_ = c.storage.ProviderArticle().Update(*tracer.pArticle, "trace_ats")
c.tracers.Remove(tracer.pArticle.SkuID)
}
}()
})
}
func (c *Controller) doTrace(tracer *Tracer) (available bool) {
article := tracer.pArticle
inventory, err := c.client.RequestInventory(c.ctx, article.SkuID)
if err != nil {
logrus.Warningf("获取coach %s 库存失败:%v", article.SkuID, err)
return
}
article.Available = inventory.Orderable && inventory.Ats > 0
article.Ats = inventory.Ats
if article.Ats == 0 {
article.SetTraceAts(false)
}
if err = c.storage.ProviderArticle().Update(*article, "available", "ats", "updated_at"); err != nil {
logrus.Errorf("更新数据库失败:%v", err)
article.SetTraceAts(true)
return
}
if article.Ats != tracer.lastAts {
if value := tracer.lastAts - article.Ats; value >= c.threshold {
_, _ = pusher.Push(c.ctx, &push.PushReq{
Title: "coach 商品库存减少",
Content: fmt.Sprintf("coach 商品 %s %s 减少了 %d当前库存%d \n链接%s", tracer.pArticle.SkuID, time.Now().Sub(tracer.lastTraceTime).String(), value, article.Ats, tracer.pArticle.Link),
})
}
c.storage.DB().Create(&v2.ProviderAts{
ProviderArticleID: article.ID,
Ats: article.Ats,
})
}
tracer.lastAts, tracer.lastTraceTime = article.Ats, time.Now()
return article.Ats <= 0
}