us-coach-spider/spider/controller.go
timerzz 4770d1cdf6
All checks were successful
Build image / build (push) Successful in 2m32s
feat 支持抓取美国coach
2024-12-06 17:24:57 +08:00

395 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package spider
import (
"context"
"fmt"
"net/url"
"strconv"
"strings"
"time"
coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client"
"gitea.timerzz.com/kedaya_haitao/common/pkg/cron"
"gitea.timerzz.com/kedaya_haitao/common/pkg/subscribe"
"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
"gitea.timerzz.com/kedaya_haitao/common/structs/utils"
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"github.com/samber/lo"
"gorm.io/gorm"
)
// Controller crawls the US Coach site for a single provider. It schedules
// Crawl with a cron job, reacts to provider-config notifications received
// through the subscribe package, and persists articles through storage.
type Controller struct {
// ctx is the lifetime context given to NewController; Run blocks until it is done.
ctx context.Context
// cron runs Crawl at the provider's configured HH:mm (Config.Ticker).
cron *cron.Cron
storage *storage.Storage
// client performs the HTTP requests against the US Coach site.
client coach_client.USClient
rdb *redis.Client
// sub is created in NewController.
// NOTE(review): ListenProvider builds its own subscribe server and never uses this one — confirm whether it is needed.
sub *subscribe.Server
// providerId identifies the provider row this controller owns.
providerId v2.ProviderId
// provider is the cached provider row: loaded in Run, refreshed by providerChange.
provider v2.Provider
// subscribeClient publishes article IDs on utils.ProfitRate_Channel.
subscribeClient *subscribe.Client
// ready becomes true in Run once the cron job has been started.
// NOTE(review): written by Run and read by Ready without synchronization; consider atomic.Bool.
ready bool
}
// NewController assembles a Controller for providerId from its dependencies.
// The provider row is not loaded and no crawl is scheduled until Run is called.
func NewController(ctx context.Context, providerId v2.ProviderId, client coach_client.USClient, db *gorm.DB, rdb *redis.Client) *Controller {
	ctrl := new(Controller)
	ctrl.ctx = ctx
	ctrl.providerId = providerId
	ctrl.client = client
	ctrl.storage = storage.NewStorage(db)
	ctrl.cron = cron.NewCron()
	ctrl.rdb = rdb
	ctrl.sub = subscribe.NewServer(ctx, rdb)
	ctrl.subscribeClient = subscribe.NewClient(rdb)
	return ctrl
}
// Run prepares the controller and then blocks until c.ctx is cancelled.
//
// Setup steps, in order:
//   - migrate the schema;
//   - load this controller's provider row;
//   - subscribe to provider-config changes;
//   - start the scheduled crawl.
//
// A failing setup step is returned immediately. After a clean shutdown Run
// returns nil.
func (c *Controller) Run() (err error) {
	if err = c.AutoMigrate(); err != nil {
		return errors.Wrap(err, "数据库迁移失败")
	}
	if c.provider, err = c.storage.Provider().GetByProvider(c.providerId); err != nil {
		return errors.Wrap(err, "获取供应商信息失败")
	}
	// ListenProvider already wraps its error with "订阅失败"; wrapping it again
	// here produced a duplicated "订阅失败: 订阅失败: ..." message.
	if err = c.ListenProvider(c.ctx, c.rdb); err != nil {
		return err
	}
	// Scheduled crawl; the cron job stops when c.ctx is cancelled.
	go c.cron.SetTimeHHmm(c.provider.Config.Ticker).SetFunc(c.Crawl).Run(c.ctx)
	c.ready = true
	<-c.ctx.Done()
	glog.Infof("controller服务退出")
	return nil
}
// 监听配置变更
// ListenProvider subscribes providerChange to this provider's config-notify
// channel and runs the subscribe server in a background goroutine.
// Handler failures are logged rather than returned.
func (c *Controller) ListenProvider(ctx context.Context, rdb *redis.Client) error {
	onHandlerError := func(err error) {
		glog.Errorf("供应商信息更新处理失败: %v", err)
	}
	srv := subscribe.NewServer(ctx, rdb).SetErrorHandle(onHandlerError)
	channel := utils.ProviderConfigNotifyChannel(c.providerId)
	if err := srv.Subscribe(channel, c.providerChange); err != nil {
		return errors.Wrap(err, "订阅失败")
	}
	go srv.Run()
	return nil
}
// 供应商信息发生变化
// providerChange handles a provider-config notification; the message payload
// is ignored. It reloads the provider row and then:
//   - restarts the cron job when the pull time (Config.Ticker) changed;
//   - refreshes the cached Config and CalculateProcess;
//   - recomputes and saves Cost for every ProviderArticle of this provider
//     when the exchange rate, freight or any calculate process changed.
func (c *Controller) providerChange(ctx context.Context, message string) error {
	provider, err := c.storage.Provider().GetByProvider(c.providerId)
	if err != nil {
		return err
	}
	// The scheduled pull time changed: restart the cron job with the new HH:mm.
	if provider.Config.Ticker != c.provider.Config.Ticker {
		c.cron.Stop()
		go c.cron.SetTimeHHmm(provider.Config.Ticker).Run(c.ctx)
	}

	// Stored costs depend on exchange rate, freight and the calculate processes.
	needUpdate := provider.Config.ExchangeRate != c.provider.Config.ExchangeRate ||
		provider.Config.Freight != c.provider.Config.Freight
	if !needUpdate {
		oldProcess := make(map[uint]v2.CalculateProcess, len(c.provider.CalculateProcess))
		for _, process := range c.provider.CalculateProcess {
			oldProcess[process.ID] = process
		}
		for _, process := range provider.CalculateProcess {
			old, ok := oldProcess[process.ID]
			// A new process, or a changed condition/formula, invalidates the costs.
			if !ok || old.Condition != process.Condition || old.Process != process.Process {
				needUpdate = true
				break
			}
			delete(oldProcess, process.ID)
		}
		// Entries left over were removed from the new configuration.
		if len(oldProcess) > 0 {
			needUpdate = true
		}
	}

	c.provider.Config = provider.Config
	c.provider.CalculateProcess = provider.CalculateProcess
	if !needUpdate {
		return nil
	}

	// Cap the provider processes at their length so append always copies.
	// Otherwise append could write into the backing array now shared with
	// c.provider.CalculateProcess, which other goroutines may be reading.
	base := provider.CalculateProcess
	base = base[:len(base):len(base)]
	results := make([]v2.ProviderArticle, 0, 20)
	query := storage.NewGetProviderArticleQuery().SetProviderId(c.providerId)
	return c.storage.ProviderArticle().FindInBatches(query, &results, func(tx *gorm.DB, batch int) error {
		for idx := range results {
			results[idx].Cost = utils.CalculateProviderPrice(append(base, results[idx].CalculateProcess...), map[string]float64{
				"originalPrice": results[idx].Cost.OriginalPrice,
				"freight":       provider.Config.Freight,
				"exchangeRate":  provider.Config.ExchangeRate,
			})
		}
		return tx.Select("id", "cost").Save(&results).Error
	})
}
// AutoMigrate migrates the article, provider and provider-article tables, in
// that order. It stops at, and returns, the first failure.
func (c *Controller) AutoMigrate() error {
	steps := []func() error{
		func() error { return c.storage.Article().AutoMigrate() },
		func() error { return c.storage.Provider().AutoMigrate() },
		func() error { return c.storage.ProviderArticle().AutoMigrate() },
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}
// productsToArticles converts one page of Coach products into Articles.
//
// Each color of a product is treated as its own SKU, keyed by color.VgId, and
// yields one Article. That Article carries exactly one ProviderArticle for
// this provider. Its cost is computed from the on-sale variant with the same
// id, and that cost is recorded as the first HistoryPrice entry.
func (c *Controller) productsToArticles(products []coach_client.Product) (articles []v2.Article) {
	for _, product := range products {
		for _, color := range product.Colors {
			link, err := url.JoinPath(c.client.BaseUrl(), color.Url)
			if err != nil {
				glog.Warningf("拼接商品链接失败 %s: %v", color.VgId, err)
			}
			// The current price comes from the on-sale variant matching this color.
			variant, found := lo.Find(product.VariantsOnSale, func(item coach_client.Variant) bool {
				return item.Id == color.VgId
			})
			if !found {
				// Previously silent: the zero-valued variant makes the original price 0.
				glog.Warningf("未找到商品 %s 的在售价格, 原价按0计算", color.VgId)
			}
			pArticle := v2.ProviderArticle{
				ProviderId: c.providerId,
				Brand:      v2.Brand_Coach,
				Pid:        color.VgId,
				SkuID:      color.VgId,
				Available:  color.Orderable,
				Image:      color.Media.Thumbnail.Src,
				Link:       link,
				// A freshly built ProviderArticle has no article-specific
				// processes, so only the provider-level ones apply.
				Cost: utils.CalculateProviderPrice(c.provider.CalculateProcess, map[string]float64{
					"originalPrice": variant.Price.Sales.Value,
					"freight":       c.provider.Config.Freight,
					"exchangeRate":  c.provider.Config.ExchangeRate,
				}),
			}
			pArticle.HistoryPrice = append(pArticle.HistoryPrice, pArticle.Cost)
			articles = append(articles, v2.Article{
				Name:        product.Name,
				EnglishName: product.Name,
				Pid:         color.VgId,
				Brand:       v2.Brand_Coach,
				Image:       color.Media.Thumbnail.Src,
				Providers:   []v2.ProviderArticle{pArticle},
			})
		}
	}
	return articles
}
// TODO长时间没更新的需要更新
// Crawl fetches every page of the "view all bags" listing, then converts and
// saves each page's products. The provider status is Pulling while it runs.
// At the end PullAt is stamped and the status becomes Normal, or Error with
// every per-page failure message joined by newlines.
func (c *Controller) Crawl() {
	glog.Infof("%s 开始抓取信息", time.Now())
	c.setProviderStatus(v2.ProviderStatus_Pulling)
	var msgs []string
	// totalPage stays -1 until a successful response reports the page count.
	for page, totalPage := 1, -1; page <= totalPage || totalPage == -1; page++ {
		if err := c.ctx.Err(); err != nil {
			msg := fmt.Sprintf("抓取在第%d页中断: %v", page, err)
			msgs = append(msgs, msg)
			glog.Error(msg)
			break
		}
		glog.Infof("开始处理第%d页数据", page)
		resp, err := c.client.ViewAllBags(c.ctx, page)
		if err != nil {
			msg := fmt.Sprintf("访问coach第%d页失败: %v", page, err)
			msgs = append(msgs, msg)
			glog.Error(msg)
			if totalPage == -1 {
				// The page count is still unknown; continuing would request
				// ever-increasing pages forever.
				break
			}
			continue
		}
		totalPage = resp.PageData.TotalPages
		c.saveProducts(c.productsToArticles(resp.PageData.Products))
		glog.Infof("第%d页数据保存完成", page)
		// The former unconditional `break` here stopped after page 1.
	}
	c.provider.PullAt = time.Now()
	if len(msgs) > 0 {
		c.setProviderStatus(v2.ProviderStatus_Error, strings.Join(msgs, "\n"))
	} else {
		c.setProviderStatus(v2.ProviderStatus_Normal)
	}
	glog.Infof("%s 抓取信息结束", time.Now())
}
// 对coach返回的数据进行处理保存
// saveProducts persists freshly built articles. Each article is expected to
// carry exactly one ProviderArticle for this provider, as built by
// productsToArticles or FetchArticleDetail.
//
// For each article:
//   - no article with the same brand+pid exists: the whole article is
//     created, then its ID is read back;
//   - the article exists but this provider has no entry: the provider
//     article is attached and the article is saved;
//   - otherwise: cost, ats and availability of the existing provider
//     article are updated, and HistoryPrice grows when the original price
//     changed.
//
// The article ID is then published on utils.ProfitRate_Channel. Failures are
// logged and that article is skipped.
func (c *Controller) saveProducts(articles []v2.Article) {
	for _, article := range articles {
		if len(article.Providers) == 0 {
			glog.Errorf("商品 %s 缺少供应商信息, 跳过保存", article.Pid)
			continue
		}
		oldArticle, err := c.storage.Article().Get(storage.NewGetArticleQuery().SetBrand(v2.Brand_Coach).SetPid(article.Pid))
		switch {
		case err == nil:
			// The article exists: reuse its ID instead of creating a new one.
			article.ID = oldArticle.ID
			oldProviderArticle, _, exist := lo.FindIndexOf(oldArticle.Providers, func(item v2.ProviderArticle) bool {
				return item.ProviderId == c.providerId
			})
			if !exist {
				// First offer from this provider for the article.
				oldArticle.Providers = article.Providers
				if err = c.storage.Article().Upsert(oldArticle); err != nil {
					glog.Errorf("保存商品信息失败: %v", err)
					continue
				}
			} else {
				pArticle := article.Providers[0]
				if oldProviderArticle.Cost.OriginalPrice != pArticle.Cost.OriginalPrice {
					oldProviderArticle.HistoryPrice = append(oldProviderArticle.HistoryPrice, pArticle.Cost)
				}
				oldProviderArticle.Cost = pArticle.Cost
				oldProviderArticle.Ats = pArticle.Ats
				oldProviderArticle.Available = pArticle.Available
				if err = c.storage.ProviderArticle().Upsert(oldProviderArticle); err != nil {
					glog.Errorf("保存供应商商品信息失败: %v", err)
					continue
				}
			}
		// errors.As(err, &gorm.ErrRecordNotFound) matched EVERY error (its target
		// is a *error) and overwrote the gorm sentinel; errors.Is is correct.
		case errors.Is(err, gorm.ErrRecordNotFound):
			// New article: save it whole, then read back the generated ID.
			if err = c.storage.Article().Upsert(article); err != nil {
				glog.Errorf("保存商品信息失败: %v", err)
				continue
			}
			if err = c.storage.DB().Model(&v2.Article{}).Where("pid = ? and brand = ?", article.Pid, article.Brand).Select("id").First(&article.ID).Error; err != nil {
				glog.Errorf("获取商品ID失败: %v", err)
				continue
			}
		default:
			glog.Errorf("获取商品信息失败: %v", err)
			continue
		}
		if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(article.ID))); err != nil {
			glog.Errorf("通知商品利润率失败: %v", err)
		}
	}
}
// FetchArticleDetail requests the product detail for pid and saves it as an
// article carrying this provider's offer. It is a no-op when a ProviderArticle
// for pid already exists for this provider.
//
// Lookup errors (other than "not found") and request errors are returned.
// Save failures are only logged, by saveProducts.
func (c *Controller) FetchArticleDetail(ctx context.Context, pid string) error {
	_, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetPid(pid))
	if err == nil {
		return nil
	}
	// errors.As(err, &gorm.ErrRecordNotFound) matched every error and overwrote
	// the gorm sentinel; errors.Is is the correct sentinel check.
	if !errors.Is(err, gorm.ErrRecordNotFound) {
		return err
	}
	resp, err := c.client.RequestProductDetail(ctx, pid)
	if err != nil {
		return fmt.Errorf("请求商品信息失败: %w", err)
	}
	article := v2.Article{
		Name:        resp.Name,
		EnglishName: resp.Name,
		Pid:         resp.Id,
		Brand:       v2.Brand_Coach,
	}
	// Guard both levels: an image group without images would otherwise panic.
	if len(resp.ImageGroups) > 0 && len(resp.ImageGroups[0].Images) > 0 {
		article.Image = resp.ImageGroups[0].Images[0].Src
	}
	link, err := url.JoinPath(c.client.BaseUrl(), resp.Url)
	if err != nil {
		glog.Warningf("拼接商品链接失败 %s: %v", pid, err)
	}
	pArticle := v2.ProviderArticle{
		ProviderId: c.providerId,
		Brand:      v2.Brand_Coach,
		Pid:        pid,
		SkuID:      pid,
		Available:  resp.Inventory.Orderable,
		Ats:        resp.Inventory.Ats,
		Image:      article.Image,
		Link:       link,
		// No ProviderArticle exists yet, so only the provider-level processes apply.
		Cost: utils.CalculateProviderPrice(c.provider.CalculateProcess, map[string]float64{
			"originalPrice": resp.Prices.CurrentPrice,
			"freight":       c.provider.Config.Freight,
			"exchangeRate":  c.provider.Config.ExchangeRate,
		}),
	}
	pArticle.HistoryPrice = append(pArticle.HistoryPrice, pArticle.Cost)
	article.Providers = append(article.Providers, pArticle)
	c.saveProducts([]v2.Article{article})
	return nil
}
// FetchArticleAts refreshes the stored ats value of this provider's
// ProviderArticle for pid from the inventory endpoint. Only the "ats" column
// is updated. Errors are wrapped with %w so callers can inspect the cause.
func (c *Controller) FetchArticleAts(ctx context.Context, pid string) error {
	pArticle, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetPid(pid))
	if err != nil {
		return fmt.Errorf("获取商品信息失败: %w", err)
	}
	inv, err := c.client.RequestInventory(ctx, pid)
	if err != nil {
		return fmt.Errorf("请求商品库存失败: %w", err)
	}
	pArticle.Ats = inv.Ats
	return c.storage.ProviderArticle().Update(pArticle, "ats")
}
// GetArticleAts returns the live ats value for pid from the inventory endpoint
// without touching storage. The request error is wrapped with %w.
func (c *Controller) GetArticleAts(ctx context.Context, pid string) (int, error) {
	inv, err := c.client.RequestInventory(ctx, pid)
	if err != nil {
		return 0, fmt.Errorf("请求商品库存失败: %w", err)
	}
	return inv.Ats, nil
}
// 更新某个商品的价格
// FetchArticlePrice refreshes price and stock of this provider's
// ProviderArticle with the given id, using the product-detail endpoint.
//
// Cost is recomputed from the current price: the provider-level processes run
// first, then the article's own. HistoryPrice grows only when the original
// price changed. On success the article ID is published on
// utils.ProfitRate_Channel; a publish failure is only logged.
func (c *Controller) FetchArticlePrice(ctx context.Context, id uint) error {
	pArticle, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetID(id))
	if err != nil {
		return fmt.Errorf("获取商品信息失败: %w", err)
	}
	detail, err := c.client.RequestProductDetail(ctx, pArticle.Pid)
	if err != nil {
		// This is the product-detail request; the old message wrongly said "库存".
		return fmt.Errorf("请求商品详情失败: %w", err)
	}
	// Cap the shared provider slice at its length so append allocates instead of
	// writing into c.provider's backing array (racy with concurrent callers).
	base := c.provider.CalculateProcess
	processes := append(base[:len(base):len(base)], pArticle.CalculateProcess...)
	cost := utils.CalculateProviderPrice(processes, map[string]float64{
		"originalPrice": detail.Prices.CurrentPrice,
		"freight":       c.provider.Config.Freight,
		"exchangeRate":  c.provider.Config.ExchangeRate,
	})
	// Keep Ats consistent with the freshly computed availability.
	pArticle.Ats = detail.Inventory.Ats
	pArticle.Available = detail.Inventory.Orderable && detail.Inventory.Ats > 0
	if pArticle.Cost.OriginalPrice != cost.OriginalPrice {
		pArticle.HistoryPrice = append(pArticle.HistoryPrice, cost)
	}
	pArticle.Cost = cost
	if err = c.storage.ProviderArticle().Upsert(pArticle); err != nil {
		return fmt.Errorf("保存商品信息失败: %w", err)
	}
	// Price refreshed: ask for the profit rate to be recalculated.
	if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(pArticle.ArticleID))); err != nil {
		glog.Warningf("发布消息失败: %v", err)
	}
	return nil
}
// 修改保存状态
// setProviderStatus sets the cached provider's status and persists it with
// storage.Provider().UpdateStatus. When msg is given, msg[0] becomes the
// provider message and any extra values are ignored. When msg is omitted, the
// previous message is kept. Persist failures are only logged.
func (c *Controller) setProviderStatus(status v2.ProviderStatus, msg ...string) {
	c.provider.Status = status
	if len(msg) != 0 {
		c.provider.Msg = msg[0]
	}
	err := c.storage.Provider().UpdateStatus(c.provider)
	if err == nil {
		return
	}
	glog.Errorf("更新状态失败: %v", err)
}
// Ready reports whether Run has completed setup and started the cron job.
func (c *Controller) Ready() bool {
	if c.ready {
		return true
	}
	return false
}