Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save wooparadog/88fac497cf9bf5fef3cf1e0e500eb050 to your computer and use it in GitHub Desktop.
Save wooparadog/88fac497cf9bf5fef3cf1e0e500eb050 to your computer and use it in GitHub Desktop.
import datetime
import logging
from typing import List, Optional
from orion.biz.common import UserTrafficData
from orion.biz.handlers.message.message import query_message
from orion.biz.handlers.user import get_user, update_user
from orion.biz.models.user_traffic import UserTraffic
from orion.common import Role
from orion.utils.cache import redis_cacher
from orion.utils.db import DBSession, transactional_session
from orion.utils.random import random_lower_string
logger = logging.getLogger(__name__)
def create_user_traffic(
medium: str,
source: str,
campaign: str,
created_by: int,
emoji: str,
end_transaction=True,
) -> UserTraffic:
"""
Create a new UserTraffic record.
"""
with transactional_session(
UserTraffic.GetDBSession(), end_transaction=end_transaction
) as session:
new_traffic = UserTraffic(
medium=medium,
source=source,
campaign=campaign,
emoji=emoji,
uuid=random_lower_string(5),
created_by=created_by,
)
session.add(new_traffic)
return new_traffic
def query_user_traffic(
medium: Optional[str] = None,
source: Optional[str] = None,
campaign: Optional[str] = None,
) -> UserTrafficData:
return UserTraffic.get_first_or_raise(
UserTraffic.source == source, UserTraffic.campaign == campaign
).as_data()
def get_all_user_traffic() -> List[UserTrafficData]:
"""
Retrieve all UserTraffic records.
"""
return [ut.as_data() for ut in UserTraffic.query_by()]
@redis_cacher.cache(
int(datetime.timedelta(minutes=5).total_seconds()),
)
def _get_all_user_traffic_with_cache() -> List[UserTrafficData]:
return get_all_user_traffic()
def update_user_traffic(
traffic_id: int,
updated_by: int,
medium: Optional[str] = None,
source: Optional[str] = None,
campaign: Optional[str] = None,
emoji: Optional[str] = None,
end_transaction: bool = True,
) -> UserTraffic:
"""
Update an existing UserTraffic record.
"""
with transactional_session(
UserTraffic.GetDBSession(), end_transaction=end_transaction
):
traffic = UserTraffic.get_by_id_or_raise(traffic_id)
if medium:
traffic.medium = medium # type: ignore
if source:
traffic.source = source # type: ignore
if campaign:
traffic.campaign = campaign # type: ignore
if emoji:
traffic.emoji = emoji # type: ignore
traffic.created_by = updated_by # type: ignore
return traffic
def detect_user_traffic(user_id: int, end_transaction: bool = True):
first_messages, _ = query_message(
user_id=user_id, role=Role.USER, page_size=1, valid_only=False
)
if not first_messages:
return
first_message = first_messages[0]
logger.info(f"Try to detect where the user come from: {user_id}")
all_uts = _get_all_user_traffic_with_cache()
for ut in sorted(all_uts, key=lambda x: -x.id):
if ut.emoji in first_message.content or ut.uuid in first_message.content:
with transactional_session(DBSession, end_transaction=end_transaction):
user = get_user(user_id)
if not user:
logger.error(f"Failed to detect user: {user_id}")
return
user_data = user.as_data()
user_data.settings.properties["utm"] = str(ut.id)
update_user(user_id, user_settings=user_data.settings)
return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment