Skip to content

Instantly share code, notes, and snippets.

@Phathdt
Created December 27, 2022 03:18
Show Gist options
  • Save Phathdt/dbd136fd7bee605be34c58138bd92efd to your computer and use it in GitHub Desktop.
Save Phathdt/dbd136fd7bee605be34c58138bd92efd to your computer and use it in GitHub Desktop.
defmodule OpolloBackgroundJob.Shopee.V2.PullingInRangeSKURatingTask do
require Logger
@delay_next_shop 15
# "2021-11-05T00:00:00Z", "2021-11-0T00:00:00Z"
def perform(start_time, end_time) do
Logger.warn(
"---------- STARTING Shopee PullingInRangeSKURatingTask start_time = #{start_time} end_time = #{end_time} ----------"
)
process(start_time, end_time)
Logger.warn(
"---------- COMPLETED Shopee PullingInRangeSKURatingTask start_time = #{start_time} end_time = #{end_time} ----------"
)
end
# 4, "2021-11-05T00:00:00Z", "2021-11-0T00:00:00Z"
def perform(channel_code, start_time, end_time) do
Logger.warn(
"---------- STARTING Shopee PullingInRangeSKURatingTask channel_code = #{channel_code} start_time = #{start_time} end_time = #{end_time} ----------"
)
process(channel_code, start_time, end_time)
Logger.warn(
"---------- COMPLETED Shopee PullingInRangeSKURatingTask channel_code = #{channel_code} start_time = #{start_time} end_time = #{end_time} ----------"
)
end
defp process(start_time, end_time) do
with {:ok, start_time, _} <- DateTime.from_iso8601(start_time),
{:ok, end_time, _} <- DateTime.from_iso8601(end_time) do
Utils.CacheStore.list_shopee_app_shop_erp()
|> Enum.filter(&(&1.shop.active && &1.shop.sku_rating_active))
|> Enum.each(fn %{channel: channel} = shop_data ->
Logger.warn(
"---------- STARTING Shopee PullingInRangeSKURatingTask process channel_code = #{channel.code} start_time = #{start_time} end_time = #{end_time} ----------"
)
pulling(shop_data, start_time, end_time)
Process.sleep((@delay_next_shop + Enum.random(0..10)) * 1000)
end)
end
end
defp process(channel_code, start_time, end_time) do
with {:ok, start_time, _} <- DateTime.from_iso8601(start_time),
{:ok, end_time, _} <- DateTime.from_iso8601(end_time) do
Utils.CacheStore.list_shopee_app_shop_erp()
|> Enum.find(
&(&1.shop.active && &1.channel.active && &1.shop.sku_rating_active &&
&1.shop.channel_code == channel_code)
)
|> case do
nil ->
Logger.error(
"Shopee PullingInRangeSKURatingTask cannot find channel_code = #{channel_code}"
)
%{channel: channel} = shop_data ->
Logger.warn(
"---------- STARTING Shopee PullingInRangeSKURatingTask process channel_code = #{channel.code} start_time = #{start_time} end_time = #{end_time} ----------"
)
pulling(shop_data, start_time, end_time)
end
end
end
defp pulling(shop_data, start_time, end_time) do
with :ok <-
OpolloBackgroundJob.Shopee.V2.PullingSKURatingTask.perform(
shop_data,
start_time,
end_time
) do
OpolloCore.CRM.update_sku_rating_deleted_by_platform(shop_data, start_time, end_time)
:ok
else
_ -> :error
end
end
end
defmodule OpolloBackgroundJob.Shopee.V2.PullingSKURatingTask do
import Ecto.Query, warn: false
require Logger
@customer_fields [
:id,
:code,
:full_name,
:channel_id,
:ecom_customer_id,
:channel_platform
]
def perform(%{shop: shop, channel: channel}, time_start, time_end) do
pull_range_str =
"#{Utils.DateTime.format_default(time_start)} - #{Utils.DateTime.format_default(time_end)}"
Logger.info(
"[Shopee PullingSKURatingTask] Start pulling SKU rating for shop #{shop.channel_code}: #{pull_range_str}"
)
pull_data_for_single_shop(shop, channel, %{
page_number: 1,
ctime_start: DateTime.to_unix(time_start),
ctime_end: DateTime.to_unix(time_end) - 1
})
end
def pull_data_for_single_shop(shopee_shop, channel, %{page_number: page_number} = params) do
with {:auth_info, %{"sku_rating" => %{"cookie" => cookie, "spc_cds" => spc_cds}}} <-
{:auth_info, shopee_shop.auth_info},
{:ok, %{"data" => %{"list" => sku_ratings}}} <-
CRM.Shopee.Ecommerce.get_ratings(spc_cds, cookie, params) do
handle_sku_ratings(sku_ratings, channel)
if length(sku_ratings) === CRM.Shopee.Constant.page_size() do
last_sku_ratings = List.last(sku_ratings)
new_params =
Map.merge(params, %{
page_number: page_number + 1,
from_page_number: page_number,
cursor: last_sku_ratings["comment_id"]
})
pull_data_for_single_shop(
shopee_shop,
channel,
new_params
)
else
:ok
end
else
{:auth_info, _} ->
pull_data_for_single_shop_with_new_auth(shopee_shop, channel, params)
{:ok, %{"errcode" => 2, "message" => "token not found"}} ->
pull_data_for_single_shop_with_new_auth(shopee_shop, channel, params)
error ->
Logger.error(
"[Shopee PullingSKURatingTask] shop #{shopee_shop.channel_code} and #{channel.id} failed: #{inspect(error)}"
)
:error
end
end
defp pull_data_for_single_shop_with_new_auth(shopee_shop, channel, params) do
# check redis
spc_cds = UUID.uuid4()
with {:ok, cookie} <-
CRM.Shopee.Auth.login(spc_cds, shopee_shop.username, shopee_shop.password),
{:ok, updated_shop} <-
OpolloCore.Shopee.update_shopee_shop(shopee_shop, %{
auth_info:
Map.merge(shopee_shop.auth_info || %{}, %{
"sku_rating" => %{
"cookie" => "#{cookie}",
"spc_cds" => "#{spc_cds}"
}
})
}) do
Utils.CacheStore.update_list_shopee_app_shop()
pull_data_for_single_shop(updated_shop, channel, params)
else
error ->
Logger.error(
"[Shopee PullingSKURatingTask] Pulling SKU rating for shop #{shopee_shop.channel_code} failed: #{inspect(error)}"
)
:error
end
end
defp get_existed_sku_ratings(pulled_sku_ratings) do
sku_rating_codes = Enum.map(pulled_sku_ratings, &generate_code(&1["comment_id"]))
from(sku_rating in OpolloCore.CRM.SKURating,
preload: [:customer],
where: sku_rating.code in ^sku_rating_codes
)
|> OpolloCore.Repo.all()
|> Enum.into(%{}, &{&1.code, &1})
end
defp get_existed_customers(pulled_sku_ratings) do
customer_codes =
Enum.map(
pulled_sku_ratings,
&generate_customer_code(&1["user_id"])
)
from(customer in OpolloCore.Onpoint.Customer,
where: customer.code in ^customer_codes
)
|> OpolloCore.Repo.all()
|> Enum.into(%{}, &{&1.code, &1})
end
defp get_existed_products(channel, pulled_sku_ratings) do
shopee_shop = Utils.CacheStore.fetch_shopee_shop_by_channel_code(channel.code)
shopee_url = Utils.RegionHelper.shopee_url(shopee_shop.region)
item_ids = Enum.map(pulled_sku_ratings, &"#{&1["product_id"]}")
from(product in OpolloCore.Shopee.ShopeeProduct,
join: shop in OpolloCore.Shopee.ShopeeShop,
on: shop.channel_code == product.channel_code,
where:
shop.channel_code == ^channel.code and
product.item_id in ^item_ids,
select: %{
item_id: product.item_id,
sku: product.sku,
product_name: product.name,
product_url:
fragment(
"CONCAT(?, '/product/', ?, '/', ?)",
type(^shopee_url, :string),
shop.shopid,
product.item_id
)
}
)
|> OpolloCore.Repo.all()
|> Enum.into(%{}, &{&1.item_id, &1})
end
defp upsert_customers(customers) do
OpolloCore.Repo.insert_all(
OpolloCore.Onpoint.Customer,
customers,
on_conflict:
{:replace,
[
:code,
:full_name,
:channel_id,
:ecom_customer_id,
:channel_platform
]},
conflict_target: [:id],
returning: true
)
end
defp upsert_sku_ratings(sku_ratings) do
OpolloCore.Repo.insert_all(
OpolloCore.CRM.SKURating,
sku_ratings,
on_conflict:
{:replace,
[
:sku_rating_date,
:content,
:status,
:platform_verify_status,
:reply,
:product_name,
:product_id,
:product_url,
:seller_sku,
:order_id,
:ecom_order_id,
:rating,
:images,
:updated_at
]},
conflict_target: [:code]
)
end
def handle_sku_ratings(pulled_sku_ratings, channel) do
existed_customers = get_existed_customers(pulled_sku_ratings)
existed_sku_ratings = get_existed_sku_ratings(pulled_sku_ratings)
existed_products = get_existed_products(channel, pulled_sku_ratings)
existed_params = %{
channel: channel,
existed_products: existed_products,
existed_customers: existed_customers,
existed_sku_ratings: existed_sku_ratings
}
pulled_data_zipped =
Enum.map(pulled_sku_ratings, &normalize_pulled_sku_rating(&1, existed_params))
sku_rating_codes_mapper =
Enum.reduce(pulled_data_zipped, %{}, fn {sku_rating, customer}, acc ->
Map.put(acc, sku_rating.code, customer.code)
end)
{sku_ratings, customers} = Enum.unzip(pulled_data_zipped)
customers = Enum.uniq_by(customers, & &1.code)
{_, customers} = upsert_customers(customers)
customer_codes_mapper =
Enum.reduce(customers, %{}, fn customer, acc ->
Map.put(acc, customer.code, customer.id)
end)
time_now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
sku_ratings =
Enum.map(sku_ratings, fn sku_rating ->
customer_code = sku_rating_codes_mapper[sku_rating.code]
customer_id = customer_codes_mapper[customer_code]
Map.merge(sku_rating, %{
customer_id: customer_id,
inserted_at: time_now,
updated_at: time_now
})
end)
upsert_sku_ratings(sku_ratings)
end
def generate_code(comment_id),
do: "SKUR-#{OpolloCore.Onpoint.PlatformShortCode.shopee()}-#{comment_id}"
def generate_customer_code(customer_id),
do: "#{OpolloCore.Onpoint.PlatformShortCode.shopee()}-#{customer_id}"
def timestamp_to_datetime(timestamp), do: DateTime.from_unix!(timestamp) |> DateTime.to_naive()
def generate_order_url(order_id), do: "https://banhang.shopee.vn/portal/sale/#{order_id}"
def generate_image(image_id), do: "https://cf.shopee.vn/file/#{image_id}_tn"
def generate_images(image_ids \\ []) do
if Enum.count(image_ids) !== 0,
do: image_ids |> Enum.map(&"https://cf.shopee.vn/file/#{&1}"),
else: nil
end
def normalize_pulled_sku_rating(pulled_sku_rating, %{
channel: channel,
existed_products: existed_products,
existed_customers: existed_customers,
existed_sku_ratings: existed_sku_ratings
}) do
%{
"images" => images,
"comment_id" => id,
"comment" => comment,
"user_id" => user_id,
"order_sn" => order_sn,
"order_id" => order_id,
"rating_star" => rating,
"ctime" => comment_time,
"user_name" => user_name,
"product_id" => product_id,
"product_cover" => product_cover
} = pulled_sku_rating
code = generate_code(id)
customer_code = generate_customer_code(user_id)
existed_sku_rating = existed_sku_ratings[code]
existed_customer = existed_customers[customer_code]
reply = if !is_nil(pulled_sku_rating["reply"]), do: pulled_sku_rating["reply"]
platform_reply = if !is_nil(reply) and reply["comment"] not in ["", nil], do: reply["comment"]
onpoint_reply =
if !is_nil(existed_sku_rating) and existed_sku_rating.reply not in ["", nil],
do: existed_sku_rating.reply
customer =
cond do
!is_nil(existed_sku_rating) ->
existed_sku_rating.customer
!is_nil(existed_customer) ->
existed_customer
true ->
%{}
end
{seller_sku, product_id, product_url, product_name} =
case existed_products[product_id] do
nil ->
{nil, nil, nil, nil}
shopee_product ->
{
shopee_product.sku,
shopee_product.item_id,
shopee_product.product_url,
shopee_product.product_name
}
end
status =
cond do
is_nil(product_id) ->
OpolloCore.CRM.SKURatingStatus.product_deleted()
!is_nil(platform_reply) ->
OpolloCore.CRM.SKURatingStatus.replied()
true ->
OpolloCore.CRM.SKURatingStatus.not_replied()
end
platform_verify_status =
cond do
!is_nil(platform_reply) ->
OpolloCore.CRM.SKURatingPlatformVerifyStatus.answered()
is_nil(platform_reply) and !is_nil(onpoint_reply) ->
OpolloCore.CRM.SKURatingPlatformVerifyStatus.failed()
true ->
nil
end
sku_rating = %{
code: code,
status: status,
content: comment,
order_id: "#{order_sn}",
rating: rating * 1.0,
channel_id: channel.id,
sku_rating_id: "#{id}",
product_id: "#{product_id}",
seller_sku: seller_sku,
product_name: product_name,
ecom_order_id: "#{order_id}",
images: generate_images(images),
reply: platform_reply || onpoint_reply,
order_url: generate_order_url("#{order_id}"),
product_image: generate_image(product_cover),
sku_rating_date: timestamp_to_datetime(comment_time),
product_url: product_url,
platform_verify_status: platform_verify_status
}
customer =
Map.merge(
Map.take(customer, @customer_fields),
%{
code: customer_code,
full_name: user_name,
channel_id: channel.id,
ecom_customer_id: "#{user_id}",
channel_platform: OpolloCore.Onpoint.PlatformCode.shopee()
}
)
{sku_rating, customer}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment