Created
December 27, 2022 03:18
-
-
Save Phathdt/dbd136fd7bee605be34c58138bd92efd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule OpolloBackgroundJob.Shopee.V2.PullingInRangeSKURatingTask do | |
require Logger | |
@delay_next_shop 15 | |
# Example arguments: "2021-11-05T00:00:00Z", "2021-11-06T00:00:00Z"
@doc """
Runs the in-range SKU rating pull for every eligible Shopee shop.

`start_time` and `end_time` are ISO 8601 strings (for example
`"2021-11-05T00:00:00Z"`); they are handed unchanged to `process/2`, which
parses them. A warning-level banner is logged before and after the run. The
result of `process/2` is discarded, so the closing banner is logged whatever
it returned, and this function returns the result of the final logger call.
"""
def perform(start_time, end_time) do
  range = "start_time = #{start_time} end_time = #{end_time}"

  Logger.warn("---------- STARTING Shopee PullingInRangeSKURatingTask #{range} ----------")

  process(start_time, end_time)

  Logger.warn("---------- COMPLETED Shopee PullingInRangeSKURatingTask #{range} ----------")
end
# Example arguments: 4, "2021-11-05T00:00:00Z", "2021-11-06T00:00:00Z"
@doc """
Runs the in-range SKU rating pull for the single shop identified by
`channel_code`.

`start_time` and `end_time` are ISO 8601 strings (for example
`"2021-11-05T00:00:00Z"`); they are handed unchanged to `process/3`, which
parses them. A warning-level banner is logged before and after the run. The
result of `process/3` is discarded, so the closing banner is logged whatever
it returned, and this function returns the result of the final logger call.
"""
def perform(channel_code, start_time, end_time) do
  context =
    "channel_code = #{channel_code} start_time = #{start_time} end_time = #{end_time}"

  Logger.warn("---------- STARTING Shopee PullingInRangeSKURatingTask #{context} ----------")

  process(channel_code, start_time, end_time)

  Logger.warn("---------- COMPLETED Shopee PullingInRangeSKURatingTask #{context} ----------")
end
# Parses both ISO 8601 bounds, then pulls ratings for every cached shop whose
# `shop.active` and `shop.sku_rating_active` flags are truthy, sleeping between
# shops.
#
# Returns `:ok` (from `Enum.each/2`) once every shop was visited. When either
# bound cannot be parsed the failure is logged and the unmatched value (the
# `{:error, reason}` from `DateTime.from_iso8601/1`) is returned unchanged;
# previously that value was returned without any log entry, so a bad timestamp
# made the whole task a silent no-op.
#
# NOTE(review): unlike process/3, this filter does not check `channel.active` -
# confirm that the difference is intentional.
defp process(start_time, end_time) do
  with {:ok, start_time, _offset} <- DateTime.from_iso8601(start_time),
       {:ok, end_time, _offset} <- DateTime.from_iso8601(end_time) do
    Utils.CacheStore.list_shopee_app_shop_erp()
    |> Enum.filter(&(&1.shop.active && &1.shop.sku_rating_active))
    |> Enum.each(fn %{channel: channel} = shop_data ->
      Logger.warn(
        "---------- STARTING Shopee PullingInRangeSKURatingTask process channel_code = #{channel.code} start_time = #{start_time} end_time = #{end_time} ----------"
      )

      # The per-shop result is intentionally ignored so that one failing shop
      # does not stop the remaining shops from being processed.
      pulling(shop_data, start_time, end_time)

      # Base delay plus 0-10 seconds of random jitter before the next shop.
      Process.sleep((@delay_next_shop + Enum.random(0..10)) * 1000)
    end)
  else
    error ->
      # `start_time` / `end_time` here are the original, unparsed arguments:
      # bindings made inside `with` are not visible in `else`.
      Logger.error(
        "Shopee PullingInRangeSKURatingTask invalid time range start_time = #{inspect(start_time)} end_time = #{inspect(end_time)}: #{inspect(error)}"
      )

      error
  end
end
# Parses both ISO 8601 bounds, then pulls ratings for the first cached shop
# whose shop and channel are active, has SKU rating enabled, and whose
# `shop.channel_code` equals `channel_code`.
#
# Returns the result of `pulling/3` for the matched shop, or the result of the
# error log call when no shop matches. When either bound cannot be parsed the
# failure is logged and the unmatched value (the `{:error, reason}` from
# `DateTime.from_iso8601/1`) is returned unchanged; previously that value was
# returned without any log entry, so a bad timestamp made the task a silent
# no-op.
defp process(channel_code, start_time, end_time) do
  with {:ok, start_time, _offset} <- DateTime.from_iso8601(start_time),
       {:ok, end_time, _offset} <- DateTime.from_iso8601(end_time) do
    Utils.CacheStore.list_shopee_app_shop_erp()
    |> Enum.find(
      &(&1.shop.active && &1.channel.active && &1.shop.sku_rating_active &&
          &1.shop.channel_code == channel_code)
    )
    |> case do
      nil ->
        Logger.error(
          "Shopee PullingInRangeSKURatingTask cannot find channel_code = #{channel_code}"
        )

      %{channel: channel} = shop_data ->
        Logger.warn(
          "---------- STARTING Shopee PullingInRangeSKURatingTask process channel_code = #{channel.code} start_time = #{start_time} end_time = #{end_time} ----------"
        )

        pulling(shop_data, start_time, end_time)
    end
  else
    error ->
      # `start_time` / `end_time` here are the original, unparsed arguments:
      # bindings made inside `with` are not visible in `else`.
      Logger.error(
        "Shopee PullingInRangeSKURatingTask invalid time range channel_code = #{inspect(channel_code)} start_time = #{inspect(start_time)} end_time = #{inspect(end_time)}: #{inspect(error)}"
      )

      error
  end
end
# Pulls the ratings of one shop for the given range and, only when that pull
# reports `:ok`, asks OpolloCore.CRM to update the ratings deleted by the
# platform for the same shop and range.
#
# Returns `:ok` on success and `:error` for any other pull result.
defp pulling(shop_data, start_time, end_time) do
  pull_result =
    OpolloBackgroundJob.Shopee.V2.PullingSKURatingTask.perform(
      shop_data,
      start_time,
      end_time
    )

  case pull_result do
    :ok ->
      OpolloCore.CRM.update_sku_rating_deleted_by_platform(shop_data, start_time, end_time)
      :ok

    _failure ->
      :error
  end
end
end | |
defmodule OpolloBackgroundJob.Shopee.V2.PullingSKURatingTask do | |
import Ecto.Query, warn: false | |
require Logger | |
@customer_fields [ | |
:id, | |
:code, | |
:full_name, | |
:channel_id, | |
:ecom_customer_id, | |
:channel_platform | |
] | |
@doc """
Pulls every SKU rating of one shop created between `time_start` and
`time_end`.

Takes a map holding the `:shop` and `:channel` records plus two `DateTime`
bounds. The bounds are converted to Unix seconds and the end bound is reduced
by one second before being sent as `ctime_end`. Pagination starts at page 1
and is driven by `pull_data_for_single_shop/3`, whose result is returned.
"""
def perform(%{shop: shop, channel: channel}, time_start, time_end) do
  range_label =
    Enum.map_join([time_start, time_end], " - ", &Utils.DateTime.format_default/1)

  Logger.info(
    "[Shopee PullingSKURatingTask] Start pulling SKU rating for shop #{shop.channel_code}: #{range_label}"
  )

  first_page = %{
    page_number: 1,
    ctime_start: DateTime.to_unix(time_start),
    ctime_end: DateTime.to_unix(time_end) - 1
  }

  pull_data_for_single_shop(shop, channel, first_page)
end
# How many fresh logins may be attempted for one page request before giving up.
@max_reauth_attempts 1

@doc """
Fetches one page of ratings for `shopee_shop`, stores it through
`handle_sku_ratings/2`, and recurses into the next page while pages come back
full (`CRM.Shopee.Constant.page_size/0` entries).

`params` must contain `:page_number`; it is passed as-is to
`CRM.Shopee.Ecommerce.get_ratings/3`. For follow-up pages `:page_number`,
`:from_page_number` and `:cursor` (the `"comment_id"` of the last rating on
the current page) are merged in.

When the shop has no stored `"sku_rating"` credentials, or the API answers
with errcode 2 / "token not found", a fresh login is performed and the same
page is retried. `reauth_attempts_left` bounds those retries for the current
page; it defaults to #{@max_reauth_attempts} and is reset for every new page.
Without this bound a token that keeps being rejected right after a successful
login made the two functions call each other forever.

Returns `:ok` when the last page was processed and `:error` otherwise.
"""
def pull_data_for_single_shop(
      shopee_shop,
      channel,
      params,
      reauth_attempts_left \\ @max_reauth_attempts
    )

def pull_data_for_single_shop(
      shopee_shop,
      channel,
      %{page_number: page_number} = params,
      reauth_attempts_left
    ) do
  with {:auth_info, %{"sku_rating" => %{"cookie" => cookie, "spc_cds" => spc_cds}}} <-
         {:auth_info, shopee_shop.auth_info},
       {:ok, %{"data" => %{"list" => sku_ratings}}} <-
         CRM.Shopee.Ecommerce.get_ratings(spc_cds, cookie, params) do
    handle_sku_ratings(sku_ratings, channel)

    # A full page means there may be more data; a short page is the last one.
    if length(sku_ratings) === CRM.Shopee.Constant.page_size() do
      last_sku_rating = List.last(sku_ratings)

      new_params =
        Map.merge(params, %{
          page_number: page_number + 1,
          from_page_number: page_number,
          cursor: last_sku_rating["comment_id"]
        })

      # The next page starts with a fresh re-auth budget (default argument).
      pull_data_for_single_shop(shopee_shop, channel, new_params)
    else
      :ok
    end
  else
    # No usable credentials stored on the shop yet.
    {:auth_info, _} ->
      pull_data_for_single_shop_with_new_auth(
        shopee_shop,
        channel,
        params,
        reauth_attempts_left
      )

    # Stored session was rejected by the API.
    {:ok, %{"errcode" => 2, "message" => "token not found"}} ->
      pull_data_for_single_shop_with_new_auth(
        shopee_shop,
        channel,
        params,
        reauth_attempts_left
      )

    error ->
      Logger.error(
        "[Shopee PullingSKURatingTask] shop #{shopee_shop.channel_code} and #{channel.id} failed: #{inspect(error)}"
      )

      :error
  end
end

# Gives up once the re-auth budget for the current page is used up.
defp pull_data_for_single_shop_with_new_auth(shopee_shop, _channel, _params, attempts_left)
     when attempts_left <= 0 do
  Logger.error(
    "[Shopee PullingSKURatingTask] Pulling SKU rating for shop #{shopee_shop.channel_code} failed: re-authentication attempts exhausted"
  )

  :error
end

# Logs in with a newly generated `spc_cds`, stores the resulting cookie under
# `auth_info["sku_rating"]` (other auth_info keys are kept), refreshes the shop
# cache and retries the same page with one attempt less.
defp pull_data_for_single_shop_with_new_auth(shopee_shop, channel, params, attempts_left) do
  # NOTE(review): the original carried a bare "check redis" note here with no
  # matching code - confirm whether a cached-session lookup was intended.
  spc_cds = UUID.uuid4()

  with {:ok, cookie} <-
         CRM.Shopee.Auth.login(spc_cds, shopee_shop.username, shopee_shop.password),
       {:ok, updated_shop} <-
         OpolloCore.Shopee.update_shopee_shop(shopee_shop, %{
           auth_info:
             Map.merge(shopee_shop.auth_info || %{}, %{
               "sku_rating" => %{
                 "cookie" => "#{cookie}",
                 "spc_cds" => "#{spc_cds}"
               }
             })
         }) do
    Utils.CacheStore.update_list_shopee_app_shop()
    pull_data_for_single_shop(updated_shop, channel, params, attempts_left - 1)
  else
    error ->
      Logger.error(
        "[Shopee PullingSKURatingTask] Pulling SKU rating for shop #{shopee_shop.channel_code} failed: #{inspect(error)}"
      )

      :error
  end
end
# Loads the already stored SKU ratings (with their customer preloaded) that
# correspond to the pulled payloads and returns them as a map keyed by rating
# code.
defp get_existed_sku_ratings(pulled_sku_ratings) do
  codes = for pulled <- pulled_sku_ratings, do: generate_code(pulled["comment_id"])

  query =
    from(sku_rating in OpolloCore.CRM.SKURating,
      preload: [:customer],
      where: sku_rating.code in ^codes
    )

  query
  |> OpolloCore.Repo.all()
  |> Map.new(fn stored -> {stored.code, stored} end)
end
# Loads the already stored customers whose code matches one of the pulled
# ratings' `"user_id"` values and returns them as a map keyed by customer code.
defp get_existed_customers(pulled_sku_ratings) do
  codes = for pulled <- pulled_sku_ratings, do: generate_customer_code(pulled["user_id"])

  query =
    from(customer in OpolloCore.Onpoint.Customer,
      where: customer.code in ^codes
    )

  query
  |> OpolloCore.Repo.all()
  |> Map.new(fn stored -> {stored.code, stored} end)
end
# Loads the channel's Shopee products referenced by the pulled ratings and
# returns them as a map keyed by `item_id`.
#
# Each value carries `:item_id`, `:sku`, `:product_name` and a `:product_url`
# assembled in SQL as "<shopee url for the shop's region>/product/<shopid>/<item_id>".
# The pulled `"product_id"` values are converted to strings before being used
# in the query.
defp get_existed_products(channel, pulled_sku_ratings) do
  cached_shop = Utils.CacheStore.fetch_shopee_shop_by_channel_code(channel.code)
  base_url = Utils.RegionHelper.shopee_url(cached_shop.region)
  item_ids = for pulled <- pulled_sku_ratings, do: "#{pulled["product_id"]}"

  query =
    from(product in OpolloCore.Shopee.ShopeeProduct,
      join: shop in OpolloCore.Shopee.ShopeeShop,
      on: shop.channel_code == product.channel_code,
      where:
        shop.channel_code == ^channel.code and
          product.item_id in ^item_ids,
      select: %{
        item_id: product.item_id,
        sku: product.sku,
        product_name: product.name,
        product_url:
          fragment(
            "CONCAT(?, '/product/', ?, '/', ?)",
            type(^base_url, :string),
            shop.shopid,
            product.item_id
          )
      }
    )

  query
  |> OpolloCore.Repo.all()
  |> Map.new(fn row -> {row.item_id, row} end)
end
# Bulk-inserts the customer maps; when a row with the same `:id` already
# exists, its code, name, channel and platform columns are overwritten.
# Returns `{count, rows}` because `returning: true` is requested.
defp upsert_customers(customers) do
  replaced_on_conflict = [
    :code,
    :full_name,
    :channel_id,
    :ecom_customer_id,
    :channel_platform
  ]

  insert_opts = [
    on_conflict: {:replace, replaced_on_conflict},
    conflict_target: [:id],
    returning: true
  ]

  OpolloCore.Repo.insert_all(OpolloCore.Onpoint.Customer, customers, insert_opts)
end
# Bulk-inserts the SKU rating maps; when a row with the same `:code` already
# exists, only the columns listed below are overwritten (so, for example,
# `inserted_at` of an existing row is left untouched).
defp upsert_sku_ratings(sku_ratings) do
  replaced_on_conflict = [
    :sku_rating_date,
    :content,
    :status,
    :platform_verify_status,
    :reply,
    :product_name,
    :product_id,
    :product_url,
    :seller_sku,
    :order_id,
    :ecom_order_id,
    :rating,
    :images,
    :updated_at
  ]

  insert_opts = [
    on_conflict: {:replace, replaced_on_conflict},
    conflict_target: [:code]
  ]

  OpolloCore.Repo.insert_all(OpolloCore.CRM.SKURating, sku_ratings, insert_opts)
end
@doc """
Persists one page of pulled Shopee ratings for `channel`.

Steps:

  1. load the customers, ratings and products that already exist for the page;
  2. normalize every payload into a `{sku_rating, customer}` pair;
  3. upsert the distinct customers (by code) and read back their ids;
  4. stamp each rating with its customer id and `inserted_at` / `updated_at`
     (current UTC time, second precision) and upsert the ratings.

Returns the result of the rating upsert. An empty page short-circuits to
`{0, nil}` - the value `Repo.insert_all/3` yields for an empty list without
`:returning` - so no lookup queries are issued when there is nothing to store.
"""
def handle_sku_ratings([], _channel), do: {0, nil}

def handle_sku_ratings(pulled_sku_ratings, channel) do
  existed_customers = get_existed_customers(pulled_sku_ratings)
  existed_sku_ratings = get_existed_sku_ratings(pulled_sku_ratings)
  existed_products = get_existed_products(channel, pulled_sku_ratings)

  existed_params = %{
    channel: channel,
    existed_products: existed_products,
    existed_customers: existed_customers,
    existed_sku_ratings: existed_sku_ratings
  }

  normalized_pairs =
    Enum.map(pulled_sku_ratings, &normalize_pulled_sku_rating(&1, existed_params))

  # rating code => customer code, so each rating can find its customer after
  # the customers have been upserted and received ids.
  customer_code_by_rating_code =
    Map.new(normalized_pairs, fn {sku_rating, customer} ->
      {sku_rating.code, customer.code}
    end)

  {sku_ratings, customers} = Enum.unzip(normalized_pairs)

  # The same customer may have written several ratings on one page.
  {_count, saved_customers} =
    customers
    |> Enum.uniq_by(& &1.code)
    |> upsert_customers()

  customer_id_by_code = Map.new(saved_customers, &{&1.code, &1.id})

  time_now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)

  sku_ratings
  |> Enum.map(fn sku_rating ->
    customer_code = customer_code_by_rating_code[sku_rating.code]

    Map.merge(sku_rating, %{
      customer_id: customer_id_by_code[customer_code],
      inserted_at: time_now,
      updated_at: time_now
    })
  end)
  |> upsert_sku_ratings()
end
@doc """
Builds the internal SKU rating code for a Shopee `comment_id`:
`"SKUR-<shopee platform short code>-<comment_id>"`.
"""
def generate_code(comment_id) do
  platform = OpolloCore.Onpoint.PlatformShortCode.shopee()
  "SKUR-#{platform}-#{comment_id}"
end
@doc """
Builds the internal customer code for a Shopee user id:
`"<shopee platform short code>-<customer_id>"`.
"""
def generate_customer_code(customer_id) do
  platform = OpolloCore.Onpoint.PlatformShortCode.shopee()
  "#{platform}-#{customer_id}"
end
@doc """
Converts a Unix timestamp in seconds into a `NaiveDateTime`.

Raises when `DateTime.from_unix!/1` rejects the timestamp.
"""
def timestamp_to_datetime(timestamp) do
  timestamp
  |> DateTime.from_unix!()
  |> DateTime.to_naive()
end
@doc """
Returns the seller-portal URL of an order on the `banhang.shopee.vn` host.
"""
def generate_order_url(order_id) do
  "https://banhang.shopee.vn/portal/sale/" <> to_string(order_id)
end
@doc """
Returns the URL for an image id on the `cf.shopee.vn` file host, with the
`_tn` suffix appended to the id.
"""
def generate_image(image_id) do
  "https://cf.shopee.vn/file/" <> to_string(image_id) <> "_tn"
end
@doc """
Maps image ids to their URLs on the `cf.shopee.vn` file host.

Returns `nil` when there are no images: the argument is omitted, empty, or
`nil`. The `[]` default only applies when the argument is left out, so an
explicit `nil` (for example a null `"images"` field in a payload) used to
crash inside `Enum.count/1`; it is now treated like an empty list.
"""
def generate_images(image_ids \\ [])

def generate_images(nil), do: nil

def generate_images(image_ids) do
  if Enum.empty?(image_ids) do
    nil
  else
    Enum.map(image_ids, &"https://cf.shopee.vn/file/#{&1}")
  end
end
@doc """
Turns one rating payload into a `{sku_rating, customer}` pair of plain maps
ready for the bulk upserts.

The payload must contain the keys `"images"`, `"comment_id"`, `"comment"`,
`"user_id"`, `"order_sn"`, `"order_id"`, `"rating_star"`, `"ctime"`,
`"user_name"`, `"product_id"` and `"product_cover"`; a missing key raises a
`MatchError`. `"reply"` is optional.

The second argument carries the channel and the lookup maps for already
stored products (by item id), customers (by customer code) and ratings (by
rating code).

Derived fields:

  * `status` - product-deleted when the product is not among the stored
    products, replied when the platform reports a non-empty reply, otherwise
    not-replied;
  * `platform_verify_status` - answered when the platform reports a reply,
    failed when only the previously stored rating has a reply, otherwise `nil`;
  * `reply` - the platform reply, falling back to the stored reply;
  * `customer` - the stored customer's fields (taken from the stored rating's
    customer, else from the stored customer with the same code) overlaid with
    the values of this payload.
"""
def normalize_pulled_sku_rating(pulled_sku_rating, %{
      channel: channel,
      existed_products: existed_products,
      existed_customers: existed_customers,
      existed_sku_ratings: existed_sku_ratings
    }) do
  %{
    "images" => images,
    "comment_id" => id,
    "comment" => comment,
    "user_id" => user_id,
    "order_sn" => order_sn,
    "order_id" => order_id,
    "rating_star" => rating,
    "ctime" => comment_time,
    "user_name" => user_name,
    "product_id" => product_id,
    "product_cover" => product_cover
  } = pulled_sku_rating

  code = generate_code(id)
  customer_code = generate_customer_code(user_id)
  existed_sku_rating = existed_sku_ratings[code]
  existed_customer = existed_customers[customer_code]

  reply = pulled_sku_rating["reply"]

  # Reply text as reported by the platform; blank counts as "no reply".
  platform_reply =
    if !is_nil(reply) and reply["comment"] not in ["", nil], do: reply["comment"]

  # Reply text already stored on our side; blank counts as "no reply".
  onpoint_reply =
    if !is_nil(existed_sku_rating) and existed_sku_rating.reply not in ["", nil],
      do: existed_sku_rating.reply

  # A stored rating may have no customer attached; fall through to the
  # customer found by code (or an empty map) instead of handing `nil` to
  # `Map.take/2` below, which would raise.
  customer =
    (existed_sku_rating && existed_sku_rating.customer) || existed_customer || %{}

  # The product lookup is built from a query that uses stringified item ids,
  # so the map may be keyed by strings while the payload id is not a string.
  # Try the raw id first (original behaviour), then its string form.
  # NOTE(review): confirm the type of ShopeeProduct.item_id and of the
  # payload's "product_id".
  existed_product = existed_products[product_id] || existed_products["#{product_id}"]

  {seller_sku, product_id, product_url, product_name} =
    case existed_product do
      nil ->
        {nil, nil, nil, nil}

      shopee_product ->
        {
          shopee_product.sku,
          shopee_product.item_id,
          shopee_product.product_url,
          shopee_product.product_name
        }
    end

  status =
    cond do
      # `product_id` was rebound above: nil means the product is unknown.
      is_nil(product_id) ->
        OpolloCore.CRM.SKURatingStatus.product_deleted()

      !is_nil(platform_reply) ->
        OpolloCore.CRM.SKURatingStatus.replied()

      true ->
        OpolloCore.CRM.SKURatingStatus.not_replied()
    end

  platform_verify_status =
    cond do
      !is_nil(platform_reply) ->
        OpolloCore.CRM.SKURatingPlatformVerifyStatus.answered()

      # We hold a reply but the platform does not show one.
      !is_nil(onpoint_reply) ->
        OpolloCore.CRM.SKURatingPlatformVerifyStatus.failed()

      true ->
        nil
    end

  sku_rating = %{
    code: code,
    status: status,
    content: comment,
    order_id: "#{order_sn}",
    # Multiplying by 1.0 turns an integer star count into a float.
    rating: rating * 1.0,
    channel_id: channel.id,
    sku_rating_id: "#{id}",
    product_id: "#{product_id}",
    seller_sku: seller_sku,
    product_name: product_name,
    ecom_order_id: "#{order_id}",
    images: generate_images(images),
    reply: platform_reply || onpoint_reply,
    order_url: generate_order_url("#{order_id}"),
    product_image: generate_image(product_cover),
    sku_rating_date: timestamp_to_datetime(comment_time),
    product_url: product_url,
    platform_verify_status: platform_verify_status
  }

  # Keep the stored customer's id (when there is one) and overwrite the rest
  # with the values from this payload.
  customer =
    Map.merge(
      Map.take(customer, @customer_fields),
      %{
        code: customer_code,
        full_name: user_name,
        channel_id: channel.id,
        ecom_customer_id: "#{user_id}",
        channel_platform: OpolloCore.Onpoint.PlatformCode.shopee()
      }
    )

  {sku_rating, customer}
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment