us-coach-spider/spider/controller.go
timerzz 8aa605ff87
All checks were successful
Build image / build (push) Successful in 2m22s
fix 修复商品链接不正确的bug
2025-03-29 20:27:52 +08:00

483 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package spider
import (
"context"
"fmt"
"net/url"
"strconv"
"strings"
"time"
coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client"
"gitea.timerzz.com/kedaya_haitao/common/pkg/cron"
"gitea.timerzz.com/kedaya_haitao/common/pkg/subscribe"
"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
"gitea.timerzz.com/kedaya_haitao/common/structs/utils"
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
)
// Controller crawls the Coach US site on behalf of a single provider.
// It schedules periodic crawls, reacts to provider configuration changes
// delivered over redis, and persists articles and provider articles.
type Controller struct {
	// ctx is the controller's root context; Run returns once it is done.
	ctx context.Context
	// cron schedules cronCrawl at the provider's configured time.
	cron *cron.Cron
	// storage gives access to articles, providers and provider articles.
	storage *storage.Storage
	// client talks to the Coach US site.
	client coach_client.USCAClient
	// rdb is the redis client used to subscribe to config changes.
	rdb *redis.Client
	// sub is created by NewController.
	// NOTE(review): not used in this file; ListenProvider builds its own server.
	sub *subscribe.Server
	// providerId identifies the provider this controller works for.
	providerId v2.ProviderId
	// provider is the cached provider record (config, calculate processes, status).
	provider v2.Provider
	// subscribeClient publishes article IDs on utils.ProfitRate_Channel.
	subscribeClient *subscribe.Client
	// ready is set by Run once start-up has finished; reported by Ready.
	ready bool
	// notCrawl, when true, makes cronCrawl skip the scheduled crawl.
	// NOTE(review): ready/notCrawl are accessed from several goroutines
	// without synchronization — confirm whether this needs atomics.
	notCrawl bool
}
// NewController creates a Controller for providerId using the given Coach
// client, database and redis connection. Nothing is started; call Run.
func NewController(ctx context.Context, providerId v2.ProviderId, client coach_client.USCAClient, db *gorm.DB, rdb *redis.Client) *Controller {
	c := new(Controller)
	c.ctx = ctx
	c.providerId = providerId
	c.client = client
	c.storage = storage.NewStorage(db)
	c.cron = cron.NewCron()
	c.rdb = rdb
	c.sub = subscribe.NewServer(ctx, rdb)
	c.subscribeClient = subscribe.NewClient(rdb)
	return c
}
// Run starts the controller and blocks until c.ctx is cancelled.
//
// Start-up runs in this order:
//  1. Migrate the schema.
//  2. Load the provider record.
//  3. Seed the crawl switch from the stored config.
//  4. Start listening for provider configuration changes.
//  5. Schedule the periodic crawl at Config.Ticker.
//
// Any start-up failure is returned immediately. After ctx is done it logs
// and returns nil.
func (c *Controller) Run() (err error) {
	if err = c.AutoMigrate(); err != nil {
		return err
	}
	if c.provider, err = c.storage.Provider().GetByProvider(c.providerId); err != nil {
		return err
	}
	// Seed the crawl switch from the stored configuration. Previously it was
	// only ever set by providerChange, so a provider saved with NotCrawl=true
	// was still crawled on schedule until its config changed again.
	c.notCrawl = c.provider.Config.NotCrawl
	if err = c.ListenProvider(c.ctx, c.rdb); err != nil {
		return errors.Wrap(err, "订阅失败")
	}
	// Scheduled crawl.
	go c.cron.SetTimeHHmm(c.provider.Config.Ticker).SetFunc(c.cronCrawl).Run(c.ctx)
	c.ready = true
	<-c.ctx.Done()
	logrus.Infof("controller服务退出")
	return nil
}
// ListenProvider subscribes to the provider-config notification channel for
// c.providerId and routes every message to providerChange. Handler errors
// are logged, not propagated. The subscription server runs in its own
// goroutine; a failure to subscribe is returned wrapped.
func (c *Controller) ListenProvider(ctx context.Context, rdb *redis.Client) error {
	onError := func(err error) {
		logrus.Errorf("供应商信息更新处理失败: %v", err)
	}
	srv := subscribe.NewServer(ctx, rdb).SetErrorHandle(onError)
	channel := utils.ProviderConfigNotifyChannel(c.providerId)
	if err := srv.Subscribe(channel, c.providerChange); err != nil {
		return errors.Wrap(err, "订阅失败")
	}
	go srv.Run()
	return nil
}
// 供应商信息发生变化
func (c *Controller) providerChange(ctx context.Context, message string) error {
provider, err := c.storage.Provider().GetByProvider(c.providerId)
if err != nil {
return err
}
// 定的拉取时间变了
if provider.Config.Ticker != c.provider.Config.Ticker {
// 重新启动下定时任务
c.cron.Stop()
go c.cron.SetTimeHHmm(provider.Config.Ticker).Run(c.ctx)
}
// 检查NotCrawl有没有变
if provider.Config.NotCrawl != c.provider.Config.NotCrawl {
c.notCrawl = provider.Config.NotCrawl
}
// 检查要不要重新计算所有providerArticle的价格
var needUpdate = false
if provider.Config.ExchangeRate != c.provider.Config.ExchangeRate || provider.Config.Freight != c.provider.Config.Freight {
needUpdate = true
}
oldProcess := make(map[uint]v2.CalculateProcess, len(c.provider.CalculateProcess))
for _, process := range c.provider.CalculateProcess {
oldProcess[process.ID] = process
}
for _, process := range provider.CalculateProcess {
old, ok := oldProcess[process.ID]
if !ok {
// 不存在,说明新增了
needUpdate = true
break
}
if old.Condition != process.Condition || old.Process != process.Process {
// 条件或者计算过程变了,需要更新
needUpdate = true
}
delete(oldProcess, process.ID)
}
if len(oldProcess) > 0 {
needUpdate = true
}
c.provider.Config = provider.Config
c.provider.CalculateProcess = provider.CalculateProcess
if needUpdate {
logrus.Infof("供应商价格计算过程发生变化,重新计算所有商品价格")
provider.Status = v2.ProviderStatus_Calculating
if err = c.storage.Provider().UpdateStatus(provider); err != nil {
logrus.Errorf("更新供应商状态失败: %v", err)
}
var results = make([]v2.ProviderArticle, 0, 20)
var processed = 0
err = c.storage.ProviderArticle().FindInBatches(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId), &results, func(tx *gorm.DB, batch int) error {
for idx := range results {
results[idx].Cost = utils.CalculateProviderPrice(append(provider.CalculateProcess, results[idx].CalculateProcess...), map[string]float64{
"originalPrice": results[idx].Cost.OriginalPrice,
"freight": c.provider.Config.Freight,
"exchangeRate": c.provider.Config.ExchangeRate,
})
}
if err = tx.Select("id", "cost").Save(&results).Error; err != nil {
return err
}
ids := lo.Map(results, func(item v2.ProviderArticle, index int) string {
return fmt.Sprintf("%d", item.ArticleID)
})
if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strings.Join(ids, ",")); err != nil {
logrus.Errorf("通知商品利润率失败: %v", err)
}
processed += len(results)
return nil
})
provider.Status = v2.ProviderStatus_Normal
if err = c.storage.Provider().UpdateStatus(provider); err != nil {
logrus.Errorf("更新供应商状态失败: %v", err)
}
logrus.Infof("重新计算所有商品价格完成共%d个", processed)
}
return err
}
// AutoMigrate migrates the article, provider and provider-article tables,
// in that order, and stops at the first error.
func (c *Controller) AutoMigrate() error {
	steps := []func() error{
		func() error { return c.storage.Article().AutoMigrate() },
		func() error { return c.storage.Provider().AutoMigrate() },
		func() error { return c.storage.ProviderArticle().AutoMigrate() },
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}
// productsToArticles maps Coach listing products to articles. Each color of
// a product becomes one Article (Pid = the color's VgId) with exactly one
// ProviderArticle for c.providerId. That provider article carries
// availability, thumbnail, product link, and a cost calculated from the
// color's sale price. The same cost is also its first HistoryPrice entry.
func (c *Controller) productsToArticles(products []coach_client.Product) (articles []v2.Article) {
	for i := range products {
		product := products[i]
		for j := range product.Colors {
			color := product.Colors[j]
			sku := color.VgId
			thumb := color.Media.Thumbnail.Src

			// The join error is ignored; on failure the link stays empty.
			link, _ := url.JoinPath(c.client.BaseUrl(), color.Url)
			offer := v2.ProviderArticle{
				ProviderId: c.providerId,
				Brand:      v2.Brand_Coach,
				Pid:        sku,
				SkuID:      sku,
				Available:  color.Orderable,
				Image:      thumb,
				Link:       link,
			}

			// Current price of this color. lo.Find yields the zero Variant
			// when nothing matches, in which case the price used is 0.
			// NOTE(review): confirm a missing variant should not skip the item.
			variant, _ := lo.Find(product.VariantsOnSale, func(v coach_client.Variant) bool {
				return v.Id == sku
			})
			offer.Cost = utils.CalculateProviderPrice(
				append(c.provider.CalculateProcess, offer.CalculateProcess...),
				map[string]float64{
					"originalPrice": variant.Price.Sales.Value,
					"freight":       c.provider.Config.Freight,
					"exchangeRate":  c.provider.Config.ExchangeRate,
				})
			offer.HistoryPrice = append(offer.HistoryPrice, offer.Cost)

			providers := make([]v2.ProviderArticle, 0, 1)
			articles = append(articles, v2.Article{
				Name:        product.Name,
				EnglishName: product.Name,
				Pid:         sku,
				Brand:       v2.Brand_Coach,
				Image:       thumb,
				Providers:   append(providers, offer),
			})
		}
	}
	return articles
}
// cronCrawl is the scheduled entry point: it runs Crawl unless crawling has
// been switched off through notCrawl.
func (c *Controller) cronCrawl() {
	if !c.notCrawl {
		c.Crawl()
		return
	}
	logrus.Info("当前不进行抓取")
}
// Crawl performs one full crawl:
//  1. Mark the provider as Pulling.
//  2. Walk every listing page.
//  3. Refresh articles that were not updated recently.
//  4. Stamp PullAt on the cached provider.
//  5. Mark the provider Normal again.
func (c *Controller) Crawl() {
	logrus.Infof("%s 开始抓取信息", time.Now())
	c.setProviderStatus(v2.ProviderStatus_Pulling)
	for _, step := range []func(){c.crawlAllBags, c.crawlUpdateBags} {
		step()
	}
	// Pulling finished: record the time before the final status update.
	c.provider.PullAt = time.Now()
	c.setProviderStatus(v2.ProviderStatus_Normal)
	logrus.Infof("%s 抓取信息结束", time.Now())
}
// crawlAllBags walks every page of the "view all bags" listing. It converts
// each page's products to articles and saves them.
//
// The page count is learned from the first successful response. After that,
// a failed page is logged and skipped. If the first page cannot be fetched,
// the crawl stops: without a page count the loop would otherwise request
// page numbers forever. The loop also stops as soon as c.ctx is cancelled.
func (c *Controller) crawlAllBags() {
	logger := logrus.WithField("part", "抓所有包")
	for page, totalPage := 1, -1; totalPage == -1 || page <= totalPage; page++ {
		if err := c.ctx.Err(); err != nil {
			logger.Warnf("抓取已取消: %v", err)
			return
		}
		logger.Infof("开始处理第%d页数据", page)
		resp, err := c.client.ViewAllBags(c.ctx, page)
		if err != nil {
			logger.Errorf("访问coach第%d页失败: %v", page, err)
			if totalPage == -1 {
				logger.Errorf("无法获取总页数, 停止抓取")
				return
			}
			continue
		}
		totalPage = resp.PageData.TotalPages
		if totalPage < page {
			// A missing or bogus page count (e.g. 0 or negative) must not
			// keep the loop alive; treat the current page as the last one.
			totalPage = page
		}
		c.saveProducts(c.productsToArticles(resp.PageData.Products))
		logger.Infof("第%d页数据保存完成", page)
	}
}
// crawlUpdateBags refreshes Coach articles that are not excluded for this
// provider and whose provider article is missing or was last updated more
// than 24 hours ago.
//
// Articles are processed in batches of 10. Each batch's product details are
// requested in one call and matched to the articles by position; a batch
// whose response length does not match is skipped, since positions would be
// ambiguous. Entries with an empty Id (presumably products Coach no longer
// returns) are saved as excluded provider articles. The rest get link, stock,
// availability and a freshly calculated cost. Everything is saved through
// saveProducts.
//
// Query errors are logged; batches stop once c.ctx is cancelled.
func (c *Controller) crawlUpdateBags() {
	logrus.Info("更新之前没更新的商品信息")
	results := make([]v2.Article, 0, 10)
	total := 0
	err := c.storage.DB().
		Joins("LEFT JOIN provider_articles ON provider_articles.article_id=articles.id AND provider_articles.provider_id = ?", c.providerId).
		Where("(provider_articles.exclude = false or provider_articles.exclude is null) and (articles.brand = 'coach') and (provider_articles.updated_at is null or provider_articles.updated_at < ?)", time.Now().Add(-time.Hour*24)).
		FindInBatches(&results, 10, func(tx *gorm.DB, batch int) error {
			if ctxErr := c.ctx.Err(); ctxErr != nil {
				// Returning an error stops the remaining batches.
				return ctxErr
			}
			total += len(results)
			pids := lo.Map(results, func(item v2.Article, index int) string {
				return item.Pid
			})
			list, err := c.client.RequestProductDetailList(c.ctx, pids...)
			if err != nil {
				logrus.Errorf("请求商品信息失败: %v", err)
				return nil
			}
			// Details are matched by index. A length mismatch would either
			// panic (longer) or attach prices to the wrong articles (shorter).
			if len(list) != len(results) {
				logrus.Errorf("商品信息数量不匹配: 请求%d个, 返回%d个", len(results), len(list))
				return nil
			}
			for idx, item := range list {
				article := &results[idx]
				var pArticle v2.ProviderArticle
				if item.Id == "" {
					pArticle = v2.ProviderArticle{
						Pid:        article.Pid,
						Brand:      article.Brand,
						ProviderId: c.providerId,
						SkuID:      article.Pid,
						Exclude:    true,
					}
				} else {
					// The join error is ignored; on failure the link stays empty.
					link, _ := url.JoinPath(c.client.BaseUrl(), item.Url)
					pArticle = v2.ProviderArticle{
						Pid:        article.Pid,
						Brand:      article.Brand,
						Link:       link,
						Image:      article.Image,
						ProviderId: c.providerId,
						SkuID:      article.Pid,
						Ats:        item.Inventory.Ats,
						Available:  item.Inventory.Orderable,
					}
					pArticle.Cost = utils.CalculateProviderPrice(
						append(c.provider.CalculateProcess, pArticle.CalculateProcess...),
						map[string]float64{
							"originalPrice": item.Prices.CurrentPrice,
							"freight":       c.provider.Config.Freight,
							"exchangeRate":  c.provider.Config.ExchangeRate,
						})
					pArticle.HistoryPrice = append(pArticle.HistoryPrice, pArticle.Cost)
				}
				article.Providers = append(article.Providers, pArticle)
			}
			c.saveProducts(results)
			return nil
		}).Error
	if err != nil {
		logrus.Errorf("查询待更新商品失败: %v", err)
	}
	logrus.Infof("共%d条数据更新", total)
}
// saveProducts persists articles produced by the crawler. Each article is
// expected to carry its ProviderArticle for c.providerId in Providers[0].
//
// For every article, looked up as a Coach article by Pid:
//   - not stored yet: the whole article (with its provider articles) is
//     upserted and its ID is read back;
//   - stored, but without a provider article for c.providerId: the new
//     provider articles are attached to the stored article and it is
//     upserted;
//   - otherwise: the stored provider article's Cost, Ats and Available are
//     overwritten, and the new Cost is appended to HistoryPrice when the
//     original price changed.
//
// After a successful save the article ID is published on
// utils.ProfitRate_Channel. Errors are logged and that article is skipped.
func (c *Controller) saveProducts(articles []v2.Article) {
	for _, article := range articles {
		oldArticle, err := c.storage.Article().Get(storage.NewGetArticleQuery().SetBrand(v2.Brand_Coach).SetPid(article.Pid))
		// errors.Is, not errors.As: As with &gorm.ErrRecordNotFound (a *error
		// target) matches every error and overwrites the gorm sentinel, so
		// real DB failures used to be treated as "not found".
		if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
			logrus.Errorf("获取商品信息失败: %v", err)
			continue
		}
		if err == nil {
			// The article already exists, so it is not created again.
			article.ID = oldArticle.ID
			// Is there already a provider article for this provider?
			oldProviderArticle, _, exist := lo.FindIndexOf(oldArticle.Providers, func(item v2.ProviderArticle) bool {
				return item.ProviderId == c.providerId
			})
			if !exist {
				// Create the provider article by attaching it to the stored article.
				oldArticle.Providers = article.Providers
				if err = c.storage.Article().Upsert(oldArticle); err != nil {
					logrus.Errorf("保存商品信息失败: %v", err)
					continue
				}
			} else {
				if len(article.Providers) == 0 {
					logrus.Warnf("商品%s缺少供应商信息, 跳过更新", article.Pid)
					continue
				}
				pArticle := article.Providers[0]
				if oldProviderArticle.Cost.OriginalPrice != pArticle.Cost.OriginalPrice {
					oldProviderArticle.HistoryPrice = append(oldProviderArticle.HistoryPrice, pArticle.Cost)
				}
				oldProviderArticle.Cost = pArticle.Cost
				oldProviderArticle.Ats = pArticle.Ats
				oldProviderArticle.Available = pArticle.Available
				if err = c.storage.ProviderArticle().Upsert(oldProviderArticle); err != nil {
					logrus.Errorf("保存供应商商品信息失败: %v", err)
					continue
				}
			}
		} else {
			// The article does not exist: save it whole, then read back its ID.
			if err = c.storage.Article().Upsert(article); err != nil {
				logrus.Errorf("保存商品信息失败: %v", err)
				continue
			}
			if err = c.storage.DB().Model(&v2.Article{}).Where("pid = ? and brand = ?", article.Pid, article.Brand).Select("id").First(&article.ID).Error; err != nil {
				logrus.Errorf("获取商品ID失败: %v", err)
				continue
			}
		}
		if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(article.ID))); err != nil {
			logrus.Errorf("通知商品利润率失败: %v", err)
		}
	}
}
// FetchArticleDetail makes sure a provider article exists for pid.
//
// If one is already stored for c.providerId it returns nil without
// contacting Coach. Otherwise it fetches the product detail and builds the
// article and provider article: name, first image, link, stock,
// availability and calculated cost. These are saved via saveProducts, whose
// own failures are only logged. Lookup errors other than "record not found"
// and detail-request errors are returned.
func (c *Controller) FetchArticleDetail(ctx context.Context, pid string) error {
	old, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetPid(pid))
	if err == nil {
		return nil
	}
	// errors.Is, not errors.As: As with &gorm.ErrRecordNotFound matches any
	// error (the target is *error) and overwrites the gorm sentinel.
	if !errors.Is(err, gorm.ErrRecordNotFound) {
		return err
	}
	resp, err := c.client.RequestProductDetail(ctx, pid)
	if err != nil {
		return fmt.Errorf("请求商品信息失败: %w", err)
	}
	article := v2.Article{
		Name:        resp.Name,
		EnglishName: resp.Name,
		Pid:         resp.Id,
		Brand:       v2.Brand_Coach,
	}
	// Guard both levels: an image group may exist but hold no images.
	if len(resp.ImageGroups) > 0 && len(resp.ImageGroups[0].Images) > 0 {
		article.Image = resp.ImageGroups[0].Images[0].Src
	}
	// The join error is ignored; on failure the link stays empty.
	link, _ := url.JoinPath(c.client.BaseUrl(), resp.Url)
	// Cap capacity so appending never writes into the backing array shared
	// with c.provider.CalculateProcess. old is the value returned alongside
	// ErrRecordNotFound (presumably empty).
	processes := c.provider.CalculateProcess[:len(c.provider.CalculateProcess):len(c.provider.CalculateProcess)]
	pArticle := v2.ProviderArticle{
		ProviderId: c.providerId,
		Brand:      v2.Brand_Coach,
		Pid:        pid,
		SkuID:      pid,
		Available:  resp.Inventory.Orderable,
		Ats:        resp.Inventory.Ats,
		Image:      article.Image,
		Link:       link,
		Cost: utils.CalculateProviderPrice(
			append(processes, old.CalculateProcess...),
			map[string]float64{
				"originalPrice": resp.Prices.CurrentPrice,
				"freight":       c.provider.Config.Freight,
				"exchangeRate":  c.provider.Config.ExchangeRate,
			}),
	}
	pArticle.HistoryPrice = append(pArticle.HistoryPrice, pArticle.Cost)
	article.Providers = append(article.Providers, pArticle)
	c.saveProducts([]v2.Article{article})
	return nil
}
// FetchArticleAts loads the stored provider article for pid, asks Coach for
// its current stock, and updates only the "ats" column. Errors are wrapped
// with %w so callers can inspect the underlying cause.
func (c *Controller) FetchArticleAts(ctx context.Context, pid string) error {
	pArticle, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetPid(pid))
	if err != nil {
		return fmt.Errorf("获取商品信息失败: %w", err)
	}
	inv, err := c.client.RequestInventory(ctx, pid)
	if err != nil {
		return fmt.Errorf("请求商品库存失败: %w", err)
	}
	pArticle.Ats = inv.Ats
	return c.storage.ProviderArticle().Update(pArticle, "ats")
}
// GetArticleAts asks Coach for the current stock (Ats) of pid without
// touching storage. The request error is wrapped with %w.
func (c *Controller) GetArticleAts(ctx context.Context, pid string) (int, error) {
	inv, err := c.client.RequestInventory(ctx, pid)
	if err != nil {
		return 0, fmt.Errorf("请求商品库存失败: %w", err)
	}
	return inv.Ats, nil
}
// FetchArticlePrice refreshes the price of the provider article with the
// given id, scoped to c.providerId. It:
//   - requests the product detail from Coach;
//   - recalculates Cost from the provider's and the article's calculate
//     processes;
//   - updates Ats and Available from the detail's inventory;
//   - appends the new cost to HistoryPrice when the original price changed;
//   - saves the provider article and publishes its ArticleID on
//     utils.ProfitRate_Channel (publish failures are only logged).
func (c *Controller) FetchArticlePrice(ctx context.Context, id uint) error {
	pArticle, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetID(id))
	if err != nil {
		return fmt.Errorf("获取商品信息失败: %w", err)
	}
	detail, err := c.client.RequestProductDetail(ctx, pArticle.Pid)
	if err != nil {
		return fmt.Errorf("请求商品信息失败: %w", err)
	}
	// Cap capacity at len so appending the article's own processes copies.
	// Otherwise concurrent calls would write into the spare capacity of
	// c.provider.CalculateProcess's backing array and corrupt each other.
	processes := c.provider.CalculateProcess[:len(c.provider.CalculateProcess):len(c.provider.CalculateProcess)]
	cost := utils.CalculateProviderPrice(
		append(processes, pArticle.CalculateProcess...),
		map[string]float64{
			"originalPrice": detail.Prices.CurrentPrice,
			"freight":       c.provider.Config.Freight,
			"exchangeRate":  c.provider.Config.ExchangeRate,
		})
	// Store the stock that Available is derived from, so the two agree.
	pArticle.Ats = detail.Inventory.Ats
	pArticle.Available = detail.Inventory.Orderable && detail.Inventory.Ats > 0
	// The price changed: keep a history entry.
	if pArticle.Cost.OriginalPrice != cost.OriginalPrice {
		pArticle.HistoryPrice = append(pArticle.HistoryPrice, cost)
	}
	pArticle.Cost = cost
	if err = c.storage.ProviderArticle().Upsert(pArticle); err != nil {
		return fmt.Errorf("保存商品信息失败: %w", err)
	}
	// Crawl succeeded: ask for the profit rate to be recalculated.
	if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(pArticle.ArticleID))); err != nil {
		logrus.Warningf("发布消息失败: %v", err)
	}
	return nil
}
// setProviderStatus records status on the cached provider, along with
// msg[0] as its message when one is given, and persists it. A persistence
// failure is only logged.
func (c *Controller) setProviderStatus(status v2.ProviderStatus, msg ...string) {
	if len(msg) != 0 {
		c.provider.Msg = msg[0]
	}
	c.provider.Status = status
	err := c.storage.Provider().UpdateStatus(c.provider)
	if err == nil {
		return
	}
	logrus.Errorf("更新状态失败: %v", err)
}
// Ready reports whether Run has completed start-up: migration, provider
// load, subscription and cron scheduling. It is never reset to false.
func (c *Controller) Ready() bool {
	ready := c.ready
	return ready
}