2024-09-13 22:47:19 +08:00
|
|
|
|
package spider
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"strconv"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client"
|
|
|
|
|
"gitea.timerzz.com/kedaya_haitao/common/pkg/cron"
|
|
|
|
|
"gitea.timerzz.com/kedaya_haitao/common/pkg/subscribe"
|
|
|
|
|
"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
|
|
|
|
|
"gitea.timerzz.com/kedaya_haitao/common/structs/utils"
|
|
|
|
|
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
|
|
|
|
|
"github.com/golang/glog"
|
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
|
"github.com/samber/lo"
|
|
|
|
|
"gorm.io/gorm"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Controller drives the Coach crawler for a single provider. It schedules the
// periodic Crawl, reacts to provider-config change notifications and persists
// articles / provider articles through storage.
type Controller struct {
	ctx context.Context
	// cron runs Crawl at the provider's configured Ticker time (see Run and providerChange).
	cron *cron.Cron
	storage *storage.Storage
	// client is the Coach API client used for listing, product detail and inventory requests.
	client *coach_client.US
	rdb *redis.Client
	// sub is created in NewController but not used in this file; ListenProvider
	// builds its own subscribe.Server.
	sub *subscribe.Server
	providerId v2.ProviderId
	// provider is the cached provider record: loaded in Run, refreshed by providerChange.
	provider v2.Provider
	// subscribeClient publishes profit-rate recalculation notifications.
	subscribeClient *subscribe.Client
	// ready becomes true once Run has finished startup.
	// NOTE(review): written by Run and read by Ready without synchronization.
	ready bool
}
|
|
|
|
|
|
|
|
|
|
func NewController(ctx context.Context, providerId v2.ProviderId, client *coach_client.US, db *gorm.DB, rdb *redis.Client) *Controller {
|
|
|
|
|
return &Controller{
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
providerId: providerId,
|
|
|
|
|
client: client,
|
|
|
|
|
storage: storage.NewStorage(db),
|
|
|
|
|
cron: cron.NewCron(),
|
|
|
|
|
rdb: rdb,
|
|
|
|
|
sub: subscribe.NewServer(ctx, rdb),
|
|
|
|
|
subscribeClient: subscribe.NewClient(rdb),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Controller) Run() (err error) {
|
|
|
|
|
if err = c.AutoMigrate(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if c.provider, err = c.storage.Provider().GetByProvider(c.providerId); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if err = c.ListenProvider(c.ctx, c.rdb); err != nil {
|
|
|
|
|
return errors.Wrap(err, "订阅失败")
|
|
|
|
|
}
|
|
|
|
|
// 定时抓取
|
|
|
|
|
go c.cron.SetTimeHHmm(c.provider.Config.Ticker).SetFunc(c.Crawl).Run(c.ctx)
|
|
|
|
|
c.ready = true
|
|
|
|
|
<-c.ctx.Done()
|
|
|
|
|
glog.Infof("controller服务退出")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 监听配置变更
|
|
|
|
|
func (c *Controller) ListenProvider(ctx context.Context, rdb *redis.Client) error {
|
|
|
|
|
server := subscribe.NewServer(ctx, rdb).SetErrorHandle(func(err error) {
|
|
|
|
|
glog.Errorf("供应商信息更新处理失败: %v", err)
|
|
|
|
|
})
|
|
|
|
|
err := server.Subscribe(utils.ProviderConfigNotifyChannel(c.providerId), c.providerChange)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "订阅失败")
|
|
|
|
|
}
|
|
|
|
|
go server.Run()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 供应商信息发生变化
|
|
|
|
|
func (c *Controller) providerChange(ctx context.Context, message string) error {
|
|
|
|
|
provider, err := c.storage.Provider().GetByProvider(c.providerId)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
// 定的拉取时间变了
|
|
|
|
|
if provider.Config.Ticker != c.provider.Config.Ticker {
|
|
|
|
|
// 重新启动下定时任务
|
|
|
|
|
c.cron.Stop()
|
|
|
|
|
go c.cron.SetTimeHHmm(provider.Config.Ticker).Run(c.ctx)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 检查要不要重新计算所有providerArticle的价格
|
|
|
|
|
var needUpdate = false
|
|
|
|
|
if provider.Config.ExchangeRate != c.provider.Config.ExchangeRate || provider.Config.Freight != c.provider.Config.Freight {
|
|
|
|
|
needUpdate = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
oldProcess := make(map[uint]v2.CalculateProcess, len(c.provider.CalculateProcess))
|
|
|
|
|
for _, process := range c.provider.CalculateProcess {
|
|
|
|
|
oldProcess[process.ID] = process
|
|
|
|
|
}
|
|
|
|
|
for _, process := range provider.CalculateProcess {
|
|
|
|
|
old, ok := oldProcess[process.ID]
|
|
|
|
|
if !ok {
|
|
|
|
|
// 不存在,说明新增了
|
|
|
|
|
needUpdate = true
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
if old.Condition != process.Condition || old.Process != process.Process {
|
|
|
|
|
// 条件或者计算过程变了,需要更新
|
|
|
|
|
needUpdate = true
|
|
|
|
|
}
|
|
|
|
|
delete(oldProcess, process.ID)
|
|
|
|
|
}
|
|
|
|
|
if len(oldProcess) > 0 {
|
|
|
|
|
needUpdate = true
|
|
|
|
|
}
|
|
|
|
|
c.provider.Config = provider.Config
|
|
|
|
|
c.provider.CalculateProcess = provider.CalculateProcess
|
|
|
|
|
|
|
|
|
|
if needUpdate {
|
|
|
|
|
var results = make([]v2.ProviderArticle, 0, 20)
|
|
|
|
|
err = c.storage.ProviderArticle().FindInBatches(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId), &results, func(tx *gorm.DB, batch int) error {
|
|
|
|
|
for idx := range results {
|
|
|
|
|
results[idx].Cost = utils.CalculateProviderPrice(append(provider.CalculateProcess, results[idx].CalculateProcess...), map[string]float64{
|
|
|
|
|
"originalPrice": results[idx].Cost.OriginalPrice,
|
|
|
|
|
"freight": c.provider.Config.Freight,
|
|
|
|
|
"exchangeRate": c.provider.Config.ExchangeRate,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return tx.Select("id", "cost").Save(&results).Error
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Controller) AutoMigrate() error {
|
|
|
|
|
if err := c.storage.Article().AutoMigrate(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := c.storage.Provider().AutoMigrate(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return c.storage.ProviderArticle().AutoMigrate()
|
|
|
|
|
}
|
|
|
|
|
|
2024-11-21 14:36:31 +08:00
|
|
|
|
func (c *Controller) productsToArticles(products []coach_client.Product) (articles []v2.Article) {
|
|
|
|
|
for _, product := range products {
|
|
|
|
|
// 一个color是一个sku
|
|
|
|
|
for _, color := range product.Colors {
|
|
|
|
|
// 如果没找到,说明没有标准商品,创建一个
|
|
|
|
|
article := v2.Article{
|
|
|
|
|
Name: product.Name,
|
|
|
|
|
EnglishName: product.Name,
|
|
|
|
|
Pid: color.VgId,
|
|
|
|
|
Brand: v2.Brand_Coach,
|
|
|
|
|
Image: color.Media.Thumbnail.Src,
|
|
|
|
|
Providers: make([]v2.ProviderArticle, 0, 1),
|
|
|
|
|
}
|
|
|
|
|
pArticle := v2.ProviderArticle{
|
|
|
|
|
ProviderId: c.providerId,
|
|
|
|
|
Brand: v2.Brand_Coach,
|
|
|
|
|
Pid: color.VgId,
|
|
|
|
|
SkuID: color.VgId,
|
|
|
|
|
Available: color.Orderable,
|
|
|
|
|
Link: fmt.Sprintf("%s/%s", "https://www.coachoutlet.com", color.Url),
|
|
|
|
|
}
|
|
|
|
|
// 拿到现在的价格
|
|
|
|
|
price, _ := lo.Find(product.VariantsOnSale, func(item coach_client.Variant) bool {
|
|
|
|
|
return item.Id == color.VgId
|
|
|
|
|
})
|
|
|
|
|
// 计算成本
|
|
|
|
|
pArticle.Cost = utils.CalculateProviderPrice(
|
|
|
|
|
append(c.provider.CalculateProcess, pArticle.CalculateProcess...),
|
|
|
|
|
map[string]float64{
|
|
|
|
|
"originalPrice": price.Price.Sales.Value,
|
|
|
|
|
"freight": c.provider.Config.Freight,
|
|
|
|
|
"exchangeRate": c.provider.Config.ExchangeRate,
|
|
|
|
|
})
|
|
|
|
|
pArticle.HistoryPrice = append(pArticle.HistoryPrice, pArticle.Cost)
|
|
|
|
|
article.Providers = append(article.Providers, pArticle)
|
|
|
|
|
articles = append(articles, article)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO长时间没更新的,需要更新
|
2024-09-13 22:47:19 +08:00
|
|
|
|
func (c *Controller) Crawl() {
|
|
|
|
|
glog.Infof("%s 开始抓取信息", time.Now())
|
|
|
|
|
// 开始拉取,修改状态
|
|
|
|
|
c.setProviderStatus(v2.ProviderStatus_Pulling)
|
|
|
|
|
|
|
|
|
|
var msgs = make([]string, 0)
|
|
|
|
|
for page, totalPage := 1, -1; page <= totalPage || totalPage == -1; page++ {
|
|
|
|
|
resp, err := c.client.ViewAllBags(c.ctx, page)
|
|
|
|
|
glog.Infof("开始处理第%d页数据", page)
|
|
|
|
|
if err != nil {
|
|
|
|
|
msg := fmt.Sprintf("访问coach第%d页失败: %v", page, err)
|
|
|
|
|
msgs = append(msgs, msg)
|
|
|
|
|
glog.Error(msg)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
totalPage = resp.PageData.TotalPages
|
2024-11-21 14:36:31 +08:00
|
|
|
|
c.saveProducts(c.productsToArticles(resp.PageData.Products))
|
2024-09-13 22:47:19 +08:00
|
|
|
|
glog.Infof("第%d页数据保存完成", page)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 拉取结束,修改状态
|
|
|
|
|
c.provider.PullAt = time.Now()
|
|
|
|
|
if len(msgs) > 0 {
|
|
|
|
|
c.setProviderStatus(v2.ProviderStatus_Error, strings.Join(msgs, "\n"))
|
|
|
|
|
} else {
|
|
|
|
|
c.setProviderStatus(v2.ProviderStatus_Normal)
|
|
|
|
|
}
|
|
|
|
|
glog.Infof("%s 抓取信息结束", time.Now())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 对coach返回的数据进行处理保存
|
2024-11-21 14:36:31 +08:00
|
|
|
|
func (c *Controller) saveProducts(articles []v2.Article) {
|
|
|
|
|
for _, article := range articles {
|
|
|
|
|
oldArticle, err := c.storage.Article().Get(storage.NewGetArticleQuery().SetBrand(v2.Brand_Coach).SetPid(article.Pid))
|
|
|
|
|
if err != nil && !errors.As(err, &gorm.ErrRecordNotFound) {
|
|
|
|
|
glog.Errorf("获取商品信息失败: %v", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
// 如果已经存在了,那么不需要重新创建article
|
|
|
|
|
if err == nil {
|
|
|
|
|
// 查看现在有没有这个供应商的商品
|
|
|
|
|
oldProviderArticle, _, exist := lo.FindIndexOf(oldArticle.Providers, func(item v2.ProviderArticle) bool {
|
2024-09-13 22:47:19 +08:00
|
|
|
|
return item.ProviderId == c.providerId
|
|
|
|
|
})
|
2024-11-21 14:36:31 +08:00
|
|
|
|
// 创建供应商商品
|
2024-09-13 22:47:19 +08:00
|
|
|
|
if !exist {
|
2024-11-21 14:36:31 +08:00
|
|
|
|
oldArticle.Providers = append(oldArticle.Providers, article.Providers...)
|
|
|
|
|
// 保存商品信息
|
|
|
|
|
if err = c.storage.Article().Upsert(article); err != nil {
|
|
|
|
|
glog.Errorf("保存商品信息失败: %v", err)
|
|
|
|
|
continue
|
2024-09-13 22:47:19 +08:00
|
|
|
|
}
|
|
|
|
|
} else {
|
2024-11-21 14:36:31 +08:00
|
|
|
|
pArticle := article.Providers[0]
|
|
|
|
|
if oldProviderArticle.Cost.OriginalPrice != pArticle.Cost.OriginalPrice {
|
|
|
|
|
oldProviderArticle.HistoryPrice = append(oldProviderArticle.HistoryPrice, pArticle.Cost)
|
|
|
|
|
}
|
|
|
|
|
oldProviderArticle.Cost = pArticle.Cost
|
|
|
|
|
oldProviderArticle.Ast = pArticle.Ast
|
|
|
|
|
if err = c.storage.ProviderArticle().Upsert(oldProviderArticle); err != nil {
|
|
|
|
|
glog.Errorf("保存供应商商品信息失败: %v", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
2024-09-13 22:47:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-11-21 14:36:31 +08:00
|
|
|
|
} else {
|
|
|
|
|
// 如果article不存在,那么保存整个article
|
2024-09-13 22:47:19 +08:00
|
|
|
|
// 保存商品信息
|
|
|
|
|
if err = c.storage.Article().Upsert(article); err != nil {
|
|
|
|
|
glog.Errorf("保存商品信息失败: %v", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-11-21 14:36:31 +08:00
|
|
|
|
|
|
|
|
|
if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(article.ID))); err != nil {
|
|
|
|
|
glog.Errorf("通知商品利润率失败: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Controller) FetchArticleDetail(ctx context.Context, pid string) error {
|
2024-11-21 15:44:31 +08:00
|
|
|
|
old, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetPid(pid))
|
|
|
|
|
if err == nil {
|
|
|
|
|
return fmt.Errorf("该商品已经存在")
|
|
|
|
|
}
|
|
|
|
|
if !errors.As(err, &gorm.ErrRecordNotFound) {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2024-11-21 14:36:31 +08:00
|
|
|
|
resp, err := c.client.RequestProductDetail(ctx, pid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("请求商品信息失败: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
article := v2.Article{
|
|
|
|
|
Name: resp.Name,
|
|
|
|
|
EnglishName: resp.Name,
|
|
|
|
|
Pid: resp.Id,
|
|
|
|
|
Brand: v2.Brand_Coach,
|
|
|
|
|
}
|
2024-11-21 15:44:31 +08:00
|
|
|
|
pArticle := v2.ProviderArticle{
|
|
|
|
|
ProviderId: c.providerId,
|
|
|
|
|
Brand: v2.Brand_Coach,
|
|
|
|
|
Pid: pid,
|
|
|
|
|
SkuID: pid,
|
|
|
|
|
Available: resp.Inventory.Orderable,
|
|
|
|
|
Ast: resp.Inventory.Ats,
|
|
|
|
|
Link: fmt.Sprintf("%s/%s", "https://www.coachoutlet.com", resp.Url),
|
|
|
|
|
Cost: utils.CalculateProviderPrice(
|
|
|
|
|
append(c.provider.CalculateProcess, old.CalculateProcess...),
|
|
|
|
|
map[string]float64{
|
|
|
|
|
"originalPrice": resp.Prices.CurrentPrice,
|
|
|
|
|
"freight": c.provider.Config.Freight,
|
|
|
|
|
"exchangeRate": c.provider.Config.ExchangeRate,
|
|
|
|
|
}),
|
|
|
|
|
}
|
|
|
|
|
pArticle.HistoryPrice = append(pArticle.HistoryPrice, pArticle.Cost)
|
|
|
|
|
article.Providers = append(article.Providers, pArticle)
|
2024-11-21 14:36:31 +08:00
|
|
|
|
if len(resp.ImageGroups) > 0 {
|
|
|
|
|
article.Image = resp.ImageGroups[0].Images[0].Src
|
|
|
|
|
}
|
|
|
|
|
c.saveProducts([]v2.Article{article})
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2024-11-21 15:44:31 +08:00
|
|
|
|
func (c *Controller) FetchArticleAts(ctx context.Context, pid string) error {
|
2024-11-21 14:36:31 +08:00
|
|
|
|
pArticle, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetPid(pid))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("获取商品信息失败: %v", err)
|
|
|
|
|
}
|
|
|
|
|
inv, err := c.client.RequestInventory(ctx, pid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("请求商品库存失败: %v", err)
|
2024-09-13 22:47:19 +08:00
|
|
|
|
}
|
2024-11-21 14:36:31 +08:00
|
|
|
|
pArticle.Ast = inv.Ats
|
|
|
|
|
return c.storage.ProviderArticle().Update(pArticle, "ast")
|
2024-09-13 22:47:19 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-11-21 17:21:17 +08:00
|
|
|
|
func (c *Controller) GetArticleAts(ctx context.Context, pid string) (int, error) {
|
|
|
|
|
inv, err := c.client.RequestInventory(ctx, pid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, fmt.Errorf("请求商品库存失败: %v", err)
|
|
|
|
|
}
|
|
|
|
|
return inv.Ats, nil
|
|
|
|
|
}
|
|
|
|
|
|
2024-09-13 22:47:19 +08:00
|
|
|
|
// 更新某个商品的价格
|
|
|
|
|
func (c *Controller) FetchArticlePrice(ctx context.Context, id uint) error {
|
|
|
|
|
pArticle, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetID(id))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("获取商品信息失败: %v", err)
|
|
|
|
|
}
|
|
|
|
|
detail, err := c.client.RequestProductDetail(ctx, pArticle.Pid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("请求商品库存失败: %v", err)
|
|
|
|
|
}
|
|
|
|
|
cost := utils.CalculateProviderPrice(
|
|
|
|
|
append(c.provider.CalculateProcess, pArticle.CalculateProcess...),
|
|
|
|
|
map[string]float64{
|
|
|
|
|
"originalPrice": detail.Prices.CurrentPrice,
|
|
|
|
|
"freight": c.provider.Config.Freight,
|
|
|
|
|
"exchangeRate": c.provider.Config.ExchangeRate,
|
|
|
|
|
})
|
|
|
|
|
pArticle.Available = detail.Inventory.Orderable && detail.Inventory.Ats > 0
|
|
|
|
|
// 价格发生变化了,
|
|
|
|
|
if pArticle.Cost.OriginalPrice != cost.OriginalPrice {
|
|
|
|
|
pArticle.HistoryPrice = append(pArticle.HistoryPrice, cost)
|
|
|
|
|
}
|
|
|
|
|
pArticle.Cost = cost
|
|
|
|
|
if err = c.storage.ProviderArticle().Upsert(pArticle); err != nil {
|
|
|
|
|
return fmt.Errorf("保存商品信息失败: %v", err)
|
|
|
|
|
}
|
|
|
|
|
// 抓取成功,利润率重新计算
|
|
|
|
|
if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(pArticle.ArticleID))); err != nil {
|
|
|
|
|
glog.Warningf("发布消息失败: %v", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 修改保存状态
|
|
|
|
|
func (c *Controller) setProviderStatus(status v2.ProviderStatus, msg ...string) {
|
|
|
|
|
c.provider.Status = status
|
|
|
|
|
if len(msg) > 0 {
|
|
|
|
|
c.provider.Msg = msg[0]
|
|
|
|
|
}
|
|
|
|
|
if err := c.storage.Provider().UpdateStatus(c.provider); err != nil {
|
|
|
|
|
glog.Errorf("更新状态失败: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Controller) Ready() bool {
|
|
|
|
|
return c.ready
|
|
|
|
|
}
|