AliExpress Merge Boss Bot - OpenCV, UiAutomator2

Overview

This repository contains a Python script for an automation bot that plays the AliExpress Merge Boss game. The bot is built on the uiautomator2 library and OpenCV and requires the dependencies described below.

To run this, I recommend first getting it running on a PC so you can adjust the position parameters. If your display has a resolution close to 1080 x 2400, or the same aspect ratio, the defaults should work, so you can set it up directly with Termux on your Android device.

Check this article to see demo gifs and images.

News

Now you can configure the bot's ROI and other parameters directly in Termux through a GUI interface.

Install the Termux:API and Termux:GUI APKs.

pip install termuxgui termux-api
wget https://gist.githubusercontent.com/cheadrian/4331cd8eb95ea6a7097b1830f80db781/raw/gui_configurator.py
python gui_configurator.py

This will generate a bot_config.json file that is loaded by the mergeboss_bot.py script.
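
For reference, the saved file is a flat JSON object holding every parameter the bot reads. A shortened example with the script's default values (the real file also contains the ROI, energy, and button position fractions listed under Configuration):

{
    "RUN_ON_MOBILE": true,
    "IGNORED_MATCH_POSITIONS": 10,
    "GENERATOR_POSITIONS": [1, 2, 3, 4],
    "GRID_PADDING": 7,
    "MIN_ENERGY_LEVEL": 3,
    "MAX_FARM_SESSIONS": 3,
    "SIMILARITY_THRESHOLD": 0.85,
    "MAX_GENERATOR_GROUP_NUMBERS": 2
}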

Setup on PC

You need ADB, Tesseract, and Python installed and added to your PATH (the commands python, adb, tesseract, and pip should all work in a Terminal or CMD) before you can run this script. Then connect the phone to the PC with a USB cable and enable USB debugging in the Developer options on your Android phone.

In a Terminal / CMD:

adb devices

Check that your device is listed. Then install the required Python packages:

pip install numpy uiautomator2 setuptools pytesseract

Initialize uiautomator2:

python -m uiautomator2 init

This installs the ATX app on your phone; open it and make sure both AtxAgent and UIAutomator are running. Sadly, it is in Chinese, but you can find an image of how it should look here.

Now you can run the script on the PC and check the annotations:

python mergeboss_bot.py
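
If the script cannot see your phone, a quick one-off sanity check (not part of the bot) is to query the device info from Python:

python -c "import uiautomator2 as u2; print(u2.connect().info)"

It should print a dictionary that includes currentPackageName and screenOn, the same fields the bot polls while running.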

Setup on Termux

Before running the script, ensure that your Termux environment is properly configured. Use the following commands to update packages and install necessary dependencies:

pkg update
pkg install python cmake ninja opencv-python libxml2 libxslt tesseract nano

Set environment variables:

export CFLAGS="-Wno-error=incompatible-function-pointer-types"

Install Python packages:

pip install numpy uiautomator2 setuptools pytesseract
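
To confirm that everything is importable before going further (opencv-python installed through pkg is exposed to Python as cv2), you can run:

python -c "import cv2, numpy, uiautomator2, pytesseract; print(cv2.__version__)"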

Configuration

You can skip adjusting these parameters if your display has the same aspect ratio as, or a resolution close to, 1080 x 2400.

On your PC, adjust the following parameters before running the script in Termux. Check the annotation window on the PC to make sure the parameters are set correctly and the bounding boxes land on target:

ROI_TOP, ROI_BOTTOM, ROI_PADDING, GRID_PADDING, ENG_TOP, ENG_BOTTOM, ENG_RIGHT, ENG_LEFT, GO_TOP, GO_LEFT, EX_TOP, EX_LEFT.

Adjust the following based on your game status:

IGNORED_MATCH_POSITIONS and GENERATOR_POSITIONS

You might also want to configure:

AUTO_FARM_ENERGY, MAX_FARM_SESSIONS, MIN_ENERGY_LEVEL, MAX_GENERATOR_GROUP_NUMBERS.
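
All of the position parameters are fractions of the screenshot size, so they transfer between displays that share the same aspect ratio. A minimal illustration of how they map to pixels, using the default values on a 1080 x 2400 screen:

# How the fractional position parameters map to pixels (values are the script defaults)
height, width = 2400, 1080
roi_top_px = int(0.355 * height)      # ROI_TOP    -> 852 px from the top
roi_bottom_px = int(0.9025 * height)  # ROI_BOTTOM -> 2166 px from the top
go_top_px = int(0.6065 * height)      # GO_TOP     -> 1455 px from the top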

Download your modified script to Termux and set RUN_ON_MOBILE to True:

wget https://URL_TO_YOUR_MODIFIED_SCRIPT/mergeboss_bot.py
nano mergeboss_bot.py

If you don't want to modify the script on PC, simply:

wget https://gist.githubusercontent.com/cheadrian/4331cd8eb95ea6a7097b1830f80db781/raw/mergeboss_bot.py
nano mergeboss_bot.py
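
Alternatively, if you would rather not open an editor, the flag can be flipped from the shell (assuming the default RUN_ON_MOBILE = False assignment is still on its own line, as in the original script):

sed -i 's/^RUN_ON_MOBILE = False/RUN_ON_MOBILE = True/' mergeboss_bot.py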

Execution

Make sure both AtxAgent and UIAutomator are running inside the ATX app. If you don't have the ATX app installed, you can get both APKs from here.

Run the script:

python mergeboss_bot.py

The script waits 10 seconds for you to open the AliExpress Merge Boss game. It will stop automatically when there is no more energy left.

If you encounter false matches, consider increasing SIMILARITY_THRESHOLD to a value between 0.85 and 0.9.
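
The threshold is a normalized pixel-difference score between two processed grid cells (see compare_imgs in the script below): 1 means the two binary images are identical, 0 means every pixel differs. A rough worked example:

# Two 100 x 100 binary cell images that differ in 1200 pixels
height, width, diff_pixels = 100, 100, 1200
similarity = 1 - diff_pixels / (height * width)
print(similarity)  # 0.88 -> treated as a match at SIMILARITY_THRESHOLD = 0.85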

gui_configurator.py:

import termuxgui as tg
import time
import os
import io
import cv2
import numpy as np
import json
SCREENSHOT_NAME = "screenshot.jpg"
SCREENSHOT_ANNON = "screenshot_annon.jpg"
config_path = os.path.expanduser("~/bot_config.json")
IGNORED_MATCH_POSITIONS = 10
GENERATOR_POSITIONS = [1, 2, 3, 4]
# These are for 1080x2400 and represent percentages of height or width.
ROI_TOP = 0.355 # 852px height
ROI_BOTTOM = 0.9025 # 2166px height
ROI_PADDING = 0.0287 # 31px width
# Energy left number position
ENG_TOP = 0.05 # 120px height
ENG_BOTTOM = 0.07 # 168px height
ENG_LEFT = 0.484 # 523px width
ENG_RIGHT = 0.566 # 612px width
# Energy browse deals "Go" button position
GO_TOP = 0.6065 # 1455px height
GO_LEFT = 0.276 # 298px width
# Exit "X" button from task list position
EX_TOP = 0.145 # 350px height
EX_LEFT = 0.926 # 1000px width
# Space between grid squares, px
GRID_PADDING = 7
MIN_ENERGY_LEVEL = 3
MAX_FARM_SESSIONS = 3
SIMILARITY_THRESHOLD = 0.85
MAX_GENERATOR_GROUP_NUMBERS = 2
# Check if the config file exists and load the saved parameters
if os.path.exists(config_path):
    with open(config_path, 'r') as json_file:
        loaded_data = json.load(json_file)
        IGNORED_MATCH_POSITIONS = loaded_data["IGNORED_MATCH_POSITIONS"]
        GENERATOR_POSITIONS = loaded_data["GENERATOR_POSITIONS"]
        ROI_TOP = loaded_data["ROI_TOP"]
        ROI_BOTTOM = loaded_data["ROI_BOTTOM"]
        ROI_PADDING = loaded_data["ROI_PADDING"]
        ENG_TOP = loaded_data["ENG_TOP"]
        ENG_BOTTOM = loaded_data["ENG_BOTTOM"]
        ENG_LEFT = loaded_data["ENG_LEFT"]
        ENG_RIGHT = loaded_data["ENG_RIGHT"]
        GO_TOP = loaded_data["GO_TOP"]
        GO_LEFT = loaded_data["GO_LEFT"]
        EX_TOP = loaded_data["EX_TOP"]
        EX_LEFT = loaded_data["EX_LEFT"]
        GRID_PADDING = loaded_data["GRID_PADDING"]
        MIN_ENERGY_LEVEL = loaded_data["MIN_ENERGY_LEVEL"]
        MAX_FARM_SESSIONS = loaded_data["MAX_FARM_SESSIONS"]
        SIMILARITY_THRESHOLD = loaded_data["SIMILARITY_THRESHOLD"]
        MAX_GENERATOR_GROUP_NUMBERS = loaded_data["MAX_GENERATOR_GROUP_NUMBERS"]
else:
    print(f"The file {config_path} does not exist. Using default values.")
def annotate_image(img):
    height, width, _ = img.shape
    roi_min, roi_max, width_padding = int(ROI_TOP * height), int(ROI_BOTTOM * height), int(width * ROI_PADDING)
    eng_top, eng_bot, eng_left, eng_right = int(ENG_TOP * height), int(ENG_BOTTOM * height), int(ENG_LEFT * width), int(ENG_RIGHT * width)
    go_btn_top, go_btn_left = int(GO_TOP * height), int(GO_LEFT * width)
    close_btn_top, close_btn_left = int(EX_TOP * height), int(EX_LEFT * width)
    max_rows, max_col = 9, 7
    padding = GRID_PADDING
    square_size = (roi_max - roi_min) // max_rows
    contours = []
    for row in range(max_rows):
        for col in range(max_col):
            x = (col * square_size) + int(width_padding) + padding
            y = roi_min + row * square_size + padding
            contour = np.array([(x, y), (x + square_size - 2 * padding, y),
                                (x + square_size - 2 * padding, y + square_size - 2 * padding),
                                (x, y + square_size - 2 * padding)])
            contours.append(contour)
    cv2.rectangle(img, (width_padding, roi_min), (width - width_padding, roi_max), (0, 255, 255), 6)
    cv2.rectangle(img, (eng_left, eng_top), (eng_right, eng_bot), (255, 0, 255), 6)
    img = cv2.circle(img, (go_btn_left, go_btn_top), 20, (125, 25, 255), -1)
    img = cv2.circle(img, (close_btn_left, close_btn_top), 20, (125, 25, 255), -1)
    for cnt in contours:
        cv2.drawContours(img, [cnt], 0, (255, 0, 0), 2)
    for ig in range(IGNORED_MATCH_POSITIONS):
        cv2.drawContours(img, [contours[ig]], 0, (0, 0, 255), 4)
    for pos in GENERATOR_POSITIONS:
        cv2.drawContours(img, [contours[pos - 1]], 0, (0, 255, 0), 8)
    return img
def create_button(activity, text, layout, width = 0):
    button = tg.Button(activity, text, layout)
    button.settextsize(16)
    button.setlinearlayoutparams(1)
    if width:
        button.setwidth(width)
    return button
def annotate_and_load_img(screenshot_path, image_viewer):
    img = cv2.imread(screenshot_path)
    annotated_img = annotate_image(img)
    screenshot_anon_path = os.path.expanduser(f"~/{SCREENSHOT_ANNON}")
    cv2.imwrite(screenshot_anon_path, annotated_img)
    with io.open(screenshot_anon_path, "rb") as f:
        image = f.read()
    image_viewer.setimage(image)
with tg.Connection() as connection:
    activity = tg.Activity(connection)
    rootLinear = tg.LinearLayout(activity)
    title = tg.TextView(activity, "Merge Boss Configurator", rootLinear)
    title.settextsize(24)
    title.setmargin(5)
    title.setlinearlayoutparams(0)
    title.setheight(tg.View.WRAP_CONTENT)
    scrollView = tg.NestedScrollView(activity, rootLinear)
    scrollLinear = tg.LinearLayout(activity, scrollView)
    subtitle = tg.TextView(activity, "Select a screenshot of the game to tune parameters like the ROI, padding, and generator positions for your game.", scrollLinear)
    subtitle.settextsize(16)
    subtitle.setmargin(5)
    subtitle.setlinearlayoutparams(0)
    subtitle.setheight(tg.View.WRAP_CONTENT)
    screenshotLinearHorizontal = tg.LinearLayout(activity, scrollLinear, False)
    screenshot_btn = create_button(activity, "Pick screenshot", screenshotLinearHorizontal)
    load_screenshot_btn = create_button(activity, "Load screenshot", screenshotLinearHorizontal)
    paramGridLayout = tg.GridLayout(activity, 2, 3, scrollLinear)
    ignore_matches_txt = tg.TextView(activity, "Ignored positions", paramGridLayout)
    ignore_matches_txt.setgridlayoutparams(0, 0)
    ignore_matches_txt.setwidth(145)
    generator_matches_txt = tg.TextView(activity, "Generator positions", paramGridLayout)
    generator_matches_txt.setgridlayoutparams(0, 1)
    generator_matches_txt.setwidth(145)
    ignored_matches = tg.EditText(activity, str(IGNORED_MATCH_POSITIONS), paramGridLayout, singleline = True, inputtype = 'number')
    ignored_matches.setgridlayoutparams(1, 0)
    ignored_matches.setwidth(145)
    generators_matches = tg.EditText(activity, ','.join(map(str, GENERATOR_POSITIONS)), paramGridLayout, singleline = True, inputtype = 'number')
    generators_matches.setgridlayoutparams(1, 1)
    generators_matches.setwidth(145)
    set_param_btn = create_button(activity, "Set", paramGridLayout)
    set_param_btn.setgridlayoutparams(0, 2, 2, 1)
    energyHorizontalScroll = tg.HorizontalScrollView(activity, scrollLinear)
    energyLinearHorizontal = tg.LinearLayout(activity, energyHorizontalScroll, False)
    _ = create_button(activity, "ROI", energyLinearHorizontal)
    top_pl_roi_btn = create_button(activity, "↑+", energyLinearHorizontal)
    top_mn_roi_btn = create_button(activity, "↑-", energyLinearHorizontal)
    bot_pl_roi_btn = create_button(activity, "↓+", energyLinearHorizontal)
    bot_mn_roi_btn = create_button(activity, "↓-", energyLinearHorizontal)
    left_pl_roi_btn = create_button(activity, "↔+", energyLinearHorizontal)
    left_mn_roi_btn = create_button(activity, "↔-", energyLinearHorizontal)
    grid_pl_roi_btn = create_button(activity, "G+", energyLinearHorizontal)
    grid_mn_roi_btn = create_button(activity, "G-", energyLinearHorizontal)
    _ = create_button(activity, "ENERGY", energyLinearHorizontal)
    top_mn_eng_btn = create_button(activity, "↑", energyLinearHorizontal)
    top_pl_eng_btn = create_button(activity, "↓", energyLinearHorizontal)
    left_mn_eng_btn = create_button(activity, "←", energyLinearHorizontal)
    left_pl_eng_btn = create_button(activity, "→", energyLinearHorizontal)
    _ = create_button(activity, "GO", energyLinearHorizontal)
    top_pl_go_btn = create_button(activity, "↑", energyLinearHorizontal)
    top_mn_go_btn = create_button(activity, "↓", energyLinearHorizontal)
    left_pl_go_btn = create_button(activity, "←", energyLinearHorizontal)
    left_mn_go_btn = create_button(activity, "→", energyLinearHorizontal)
    _ = create_button(activity, "X", energyLinearHorizontal)
    top_pl_ex_btn = create_button(activity, "↑", energyLinearHorizontal)
    top_mn_ex_btn = create_button(activity, "↓", energyLinearHorizontal)
    left_pl_ex_btn = create_button(activity, "←", energyLinearHorizontal)
    left_mn_ex_btn = create_button(activity, "→", energyLinearHorizontal)
    _, rootHeight = rootLinear.getdimensions()
    image_viewer = tg.ImageView(activity, scrollLinear)
    image_viewer.setlinearlayoutparams(0)
    image_viewer.setheight(rootHeight - int(rootHeight/7), True)
    paramGridLayout = tg.GridLayout(activity, 4, 2, scrollLinear)
    min_eng_lvl_txt = tg.TextView(activity, "Minimum energy level", paramGridLayout)
    min_eng_lvl_txt.setgridlayoutparams(0, 0)
    min_eng_lvl_txt.setwidth(145)
    max_farm_act_txt = tg.TextView(activity, "Maximum farm actions", paramGridLayout)
    max_farm_act_txt.setgridlayoutparams(0, 1)
    max_farm_act_txt.setwidth(145)
    min_eng_lvl = tg.EditText(activity, str(MIN_ENERGY_LEVEL), paramGridLayout, singleline = True, inputtype = 'number')
    min_eng_lvl.setgridlayoutparams(1, 0)
    min_eng_lvl.setwidth(145)
    max_farm_act = tg.EditText(activity, str(MAX_FARM_SESSIONS), paramGridLayout, singleline = True, inputtype = 'number')
    max_farm_act.setgridlayoutparams(1, 1)
    max_farm_act.setwidth(145)
    sim_thresh_txt = tg.TextView(activity, "Similarity threshold", paramGridLayout)
    sim_thresh_txt.setgridlayoutparams(2, 0)
    sim_thresh_txt.setwidth(145)
    gen_min_groups_txt = tg.TextView(activity, "Press generator when minimum groups number", paramGridLayout)
    gen_min_groups_txt.setgridlayoutparams(2, 1)
    gen_min_groups_txt.setwidth(145)
    sim_thresh = tg.EditText(activity, str(SIMILARITY_THRESHOLD), paramGridLayout, singleline = True, inputtype = 'number')
    sim_thresh.setgridlayoutparams(3, 0)
    sim_thresh.setwidth(145)
    gen_min_groups = tg.EditText(activity, str(MAX_GENERATOR_GROUP_NUMBERS), paramGridLayout, singleline = True, inputtype = 'number')
    gen_min_groups.setgridlayoutparams(3, 1)
    gen_min_groups.setwidth(145)
    save_btn = create_button(activity, "Save settings", scrollLinear)
    screenshot_path = os.path.expanduser(f"~/{SCREENSHOT_NAME}")
    for event in connection.events():
        if event.type == tg.Event.click and event.value["id"] == screenshot_btn:
            os.system(f"termux-storage-get {screenshot_path}")
        if event.type == tg.Event.click and event.value["id"] == load_screenshot_btn:
            with io.open(screenshot_path, "rb") as f:
                image = f.read()
            time.sleep(1)
            image_viewer.setimage(image)
        if event.type == tg.Event.click and event.value["id"] == top_pl_roi_btn:
            ROI_TOP += 0.0015
        if event.type == tg.Event.click and event.value["id"] == top_mn_roi_btn:
            ROI_TOP -= 0.0015
        if event.type == tg.Event.click and event.value["id"] == bot_pl_roi_btn:
            ROI_BOTTOM += 0.0015
        if event.type == tg.Event.click and event.value["id"] == bot_mn_roi_btn:
            ROI_BOTTOM -= 0.0015
        if event.type == tg.Event.click and event.value["id"] == left_pl_roi_btn:
            ROI_PADDING += 0.001
        if event.type == tg.Event.click and event.value["id"] == left_mn_roi_btn:
            ROI_PADDING -= 0.001
        if event.type == tg.Event.click and event.value["id"] == grid_pl_roi_btn:
            GRID_PADDING += 1
        if event.type == tg.Event.click and event.value["id"] == grid_mn_roi_btn:
            GRID_PADDING -= 1
        if event.type == tg.Event.click and event.value["id"] == top_pl_eng_btn:
            ENG_TOP += 0.001
            ENG_BOTTOM += 0.001
        if event.type == tg.Event.click and event.value["id"] == top_mn_eng_btn:
            ENG_TOP -= 0.001
            ENG_BOTTOM -= 0.001
        if event.type == tg.Event.click and event.value["id"] == left_pl_eng_btn:
            ENG_LEFT += 0.001
            ENG_RIGHT += 0.001
        if event.type == tg.Event.click and event.value["id"] == left_mn_eng_btn:
            ENG_LEFT -= 0.001
            ENG_RIGHT -= 0.001
        if event.type == tg.Event.click and event.value["id"] == top_mn_go_btn:
            GO_TOP += 0.001
        if event.type == tg.Event.click and event.value["id"] == top_pl_go_btn:
            GO_TOP -= 0.001
        if event.type == tg.Event.click and event.value["id"] == left_mn_go_btn:
            GO_LEFT += 0.001
        if event.type == tg.Event.click and event.value["id"] == left_pl_go_btn:
            GO_LEFT -= 0.001
        if event.type == tg.Event.click and event.value["id"] == top_mn_ex_btn:
            EX_TOP += 0.001
        if event.type == tg.Event.click and event.value["id"] == top_pl_ex_btn:
            EX_TOP -= 0.001
        if event.type == tg.Event.click and event.value["id"] == left_mn_ex_btn:
            EX_LEFT += 0.001
        if event.type == tg.Event.click and event.value["id"] == left_pl_ex_btn:
            EX_LEFT -= 0.001
        if event.type == tg.Event.click and event.value["id"] == save_btn:
            variables_dict = {
                "RUN_ON_MOBILE": True,
                "IGNORED_MATCH_POSITIONS": IGNORED_MATCH_POSITIONS,
                "GENERATOR_POSITIONS": GENERATOR_POSITIONS,
                "ROI_TOP": ROI_TOP,
                "ROI_BOTTOM": ROI_BOTTOM,
                "ROI_PADDING": ROI_PADDING,
                "ENG_TOP": ENG_TOP,
                "ENG_BOTTOM": ENG_BOTTOM,
                "ENG_LEFT": ENG_LEFT,
                "ENG_RIGHT": ENG_RIGHT,
                "GO_TOP": GO_TOP,
                "GO_LEFT": GO_LEFT,
                "EX_TOP": EX_TOP,
                "EX_LEFT": EX_LEFT,
                "GRID_PADDING": GRID_PADDING,
                "MIN_ENERGY_LEVEL": MIN_ENERGY_LEVEL,
                "MAX_FARM_SESSIONS": MAX_FARM_SESSIONS,
                "SIMILARITY_THRESHOLD": SIMILARITY_THRESHOLD,
                "MAX_GENERATOR_GROUP_NUMBERS": MAX_GENERATOR_GROUP_NUMBERS
            }
            json_data = json.dumps(variables_dict, indent=4)
            with io.open(config_path, 'w') as json_file:
                json_file.write(json_data)
            time.sleep(1)
            exit()
        if event.type == tg.Event.click and event.value["id"] == set_param_btn:
            IGNORED_MATCH_POSITIONS = int(ignored_matches.gettext())
            GENERATOR_POSITIONS = [int(i) for i in generators_matches.gettext().split(',')]
        if event.type == tg.Event.click and event.value["id"] != screenshot_btn and event.value["id"] != save_btn:
            annotate_and_load_img(screenshot_path, image_viewer)
mergeboss_bot.py:

import cv2
import time
import uiautomator2 as u2
import numpy as np
import requests
import math
import os
import json
# If you run the script directly on mobile, set this to True to disable
# incompatible functions, like real-time image view, and configure for this
RUN_ON_MOBILE = False
# NOTE: Non-mandatory, you can skip this.
# Replace this URL with your MJPEG stream URL for screen capture
# Use ScreenStream app - info.dvkr.screenstream for the feed
# IMPORTANT: SET RESIZE IMAGE to 100% in STREAM SETTINGS !!!
stream_url = "http://192.168.0.123:8080/stream.mjpeg"
# Alternatively, you can use the built-in uiautomator2 screenshot function,
# but it is slow ~1 FPS or under,
# it will be used automatically if the script can't connect to the stream_url
# If you want to check the energy level, you need Tesseract installed and configured
# When the energy level is under 2, the game will exit
CHECK_ENERGY_LEVEL = True
# Auto-press the generator when no match is found, only if check energy level is enabled
if CHECK_ENERGY_LEVEL:
    # Generator positions to press, in a list
    GENERATOR_POSITIONS = [1, 2, 3, 4]
    # When there's no match, generate objects from each of these generators
    # Minimum energy to generate items
    MIN_ENERGY_LEVEL = 3
    # Get the energy from the 15 seconds product list view
    AUTO_FARM_ENERGY = True and CHECK_ENERGY_LEVEL
    # Only try to get energy 3 times
    MAX_FARM_SESSIONS = 3
# The first 11 squares will be ignored. Adjust to your number of e.g., generators.
IGNORED_MATCH_POSITIONS = 11
# Define the similarity threshold between items
SIMILARITY_THRESHOLD = 0.85
# If there are a maximum of X matches groups left, press the generators
# You can set to 0 if you want to use the generator when there's no match
MAX_GENERATOR_GROUP_NUMBERS = 2
# NOTE: you should adjust these based on your phone display resolution.
# These are for 1080x2400 and represent percentages of height or width.
ROI_TOP = 0.355 # 852px height
ROI_BOTTOM = 0.9025 # 2166px height
ROI_PADDING = 0.0287 # 31px width
# Energy left number position
ENG_TOP = 0.05 # 120px height
ENG_BOTTOM = 0.07 # 168px height
ENG_LEFT = 0.484 # 523px width
ENG_RIGHT = 0.566 # 612px width
# Energy browse deals "Go" button position
GO_TOP = 0.6065 # 1455px height
GO_LEFT = 0.276 # 298px width
# Exit "X" button from task list position
EX_TOP = 0.145 # 350px height
EX_LEFT = 0.926 # 1000px width
# Space between grid squares, px
GRID_PADDING = 7
# Check if the config file exists and load the saved parameters
config_path = os.path.expanduser("~/bot_config.json")
if os.path.exists(config_path):
    with open(config_path, 'r') as json_file:
        loaded_data = json.load(json_file)
        RUN_ON_MOBILE = loaded_data["RUN_ON_MOBILE"]
        IGNORED_MATCH_POSITIONS = loaded_data["IGNORED_MATCH_POSITIONS"]
        GENERATOR_POSITIONS = loaded_data["GENERATOR_POSITIONS"]
        ROI_TOP = loaded_data["ROI_TOP"]
        ROI_BOTTOM = loaded_data["ROI_BOTTOM"]
        ROI_PADDING = loaded_data["ROI_PADDING"]
        ENG_TOP = loaded_data["ENG_TOP"]
        ENG_BOTTOM = loaded_data["ENG_BOTTOM"]
        ENG_LEFT = loaded_data["ENG_LEFT"]
        ENG_RIGHT = loaded_data["ENG_RIGHT"]
        GO_TOP = loaded_data["GO_TOP"]
        GO_LEFT = loaded_data["GO_LEFT"]
        EX_TOP = loaded_data["EX_TOP"]
        EX_LEFT = loaded_data["EX_LEFT"]
        GRID_PADDING = loaded_data["GRID_PADDING"]
        MIN_ENERGY_LEVEL = loaded_data["MIN_ENERGY_LEVEL"]
        MAX_FARM_SESSIONS = loaded_data["MAX_FARM_SESSIONS"]
        SIMILARITY_THRESHOLD = loaded_data["SIMILARITY_THRESHOLD"]
        MAX_GENERATOR_GROUP_NUMBERS = loaded_data["MAX_GENERATOR_GROUP_NUMBERS"]
else:
    print(f"The file {config_path} does not exist. Using default values.")
# Applies Sobel edge detection to highlight edges in the image, with a user-defined threshold.
def sobel_edge_detector(img, threshold=50):
    grad_x = cv2.Sobel(img, cv2.CV_32F, 1, 0)
    grad_y = cv2.Sobel(img, cv2.CV_32F, 0, 1)
    grad = np.sqrt(grad_x**2 + grad_y**2)
    grad_norm = (grad * 255 / grad.max()).astype(np.uint8)
    _, binary_edge = cv2.threshold(grad_norm, threshold, 255, cv2.THRESH_BINARY)
    return binary_edge
# Display the extracted images after applying apply_processing
DISPLAY_EXTRACTED_IMGS = True and not RUN_ON_MOBILE
# Display the annotated image
DISPLAY_ANNOTATED_IMGS = True and not RUN_ON_MOBILE
if CHECK_ENERGY_LEVEL:
    import pytesseract
    print("Make sure you have Tesseract installed on the system and added to PATH")
# Applies image processing techniques, including Gaussian blur and Adaptive Thresholding, Sobel edge detection or simple thresholding.
def apply_processing(img, block_size=7, C=5, blur_size=5, sobel=False, sob_thresh = 35, simply_thresh=False, thresh_val=200):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    blurred_image = cv2.GaussianBlur(gray, (blur_size, blur_size), 0)
    if simply_thresh:
        ret, thresh = cv2.threshold(blurred_image, thresh_val, 255, cv2.THRESH_BINARY_INV)
        return thresh
    # Extract edges using Sobel
    if sobel:
        sobel_edges = sobel_edge_detector(gray, sob_thresh)
        return sobel_edges
    # Apply adaptive thresholding
    adaptive_threshold = cv2.adaptiveThreshold(
        blurred_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, block_size, C
    )
    adaptive_threshold = cv2.bitwise_not(adaptive_threshold)
    return adaptive_threshold
# Creates a grid of rectangular contours within the specified region of interest (ROI) with padding.
def generate_grid_contours(img, roi, padding=GRID_PADDING):
    height, width, _ = img.shape
    roi_min, roi_max, width_padding = roi
    max_rows, max_col = 9, 7
    square_size = (roi_max - roi_min) // max_rows
    contours = []  # List to store contours
    # Draw rectangles mesh and find contours with padding
    for row in range(max_rows):
        for col in range(max_col):
            x = (col * square_size) + int(width_padding) + padding
            y = roi_min + row * square_size + padding
            contour = np.array([(x, y), (x + square_size - 2 * padding, y),
                                (x + square_size - 2 * padding, y + square_size - 2 * padding),
                                (x, y + square_size - 2 * padding)])
            contours.append(contour)
    # Convert the list of contours to array format
    contours = np.array(contours)
    return contours
# Extracts images from a list of contours. Applies image processing (Sobel edge detection) and morphological dilation for better blob extraction.
img_dilation_kernel = np.ones((7, 7), np.uint8)
def extract_imgs_from_contours(img, contours, apply_process=True):
    imgs_list = []
    if apply_process:
        proc_img = apply_processing(img, sobel=True)
        # Apply morphological dilation to the processed image
        proc_img = cv2.dilate(proc_img, img_dilation_kernel, iterations=1)
    else:
        proc_img = img
    for contour in contours:
        # Extract blobs from each contour using the adaptive threshold function
        x, y, w, h = cv2.boundingRect(contour)
        cropped_img = proc_img[y:y+h, x:x+w]
        imgs_list.append(cropped_img)
    return imgs_list
# Determines if an image is blank based on the number of non-zero pixels, using a specified threshold.
def is_blank_img(img, threshold_pixels = 200):
    # Count the number of non-zero pixels
    non_zero_count = np.sum(img == 255)
    # Check if the count is below the threshold
    return non_zero_count < threshold_pixels
# Compares two images by finding differing pixels and calculates a normalized similarity metric.
def compare_imgs(img1, img2):
    height, width = img1.shape
    # Find pixels that differ between the two images
    diff_img = np.bitwise_xor(img1, img2)
    # Count the number of white pixels
    diff_pixels_cnt = np.count_nonzero(diff_img)
    normalized_similarity = 1 - (diff_pixels_cnt / (height * width))
    return normalized_similarity
# Groups similar images based on the specified similarity threshold, ignoring blank and ignored positions. Returns a list of grouped items.
def group_similar_imgs(imgs, compare_threshold=0.8):
    grouped_items = []
    visited = set()
    for i, img1 in enumerate(imgs):
        if is_blank_img(img1) or i < IGNORED_MATCH_POSITIONS:
            visited.add(i)
            continue
        if i not in visited:
            group = [i]
            found_match = False  # Flag to check if any similar blob is found
            for j, img2 in enumerate(imgs):
                if is_blank_img(img2) or j < IGNORED_MATCH_POSITIONS:
                    visited.add(j)
                    continue
                if i != j and j not in visited:
                    similarity = compare_imgs(img1, img2)
                    if similarity > compare_threshold:
                        group.append(j)
                        visited.add(j)
                        found_match = True
            # Add the group only if a match was found
            if found_match:
                grouped_items.append(group)
                for index in group:
                    visited.add(index)
    return grouped_items
# Annotates an image with marked regions of interest (ROI), ignored contours, and marked contours within groups.
# Contours are drawn with different colors and labeled with their respective group IDs.
def annotate_image(img, contours, groups, roi):
    height, width, _ = img.shape
    # Unpack region of interest
    roi_min, roi_max, width_padding = roi
    # Mark ROI on image
    cv2.rectangle(img, (0, roi_min), (width, roi_max), (0, 255, 255), 6)
    # Draw ignored contours
    for ig in range(IGNORED_MATCH_POSITIONS):
        cv2.drawContours(img, [contours[ig]], 0, (0, 0, 255), 4)
    if CHECK_ENERGY_LEVEL:
        for pos in GENERATOR_POSITIONS:
            cv2.drawContours(img, [contours[pos - 1]], 0, (0, 160, 255), 6)
    # Mark contours and groups on image
    for group_id, contour_indices in enumerate(groups):
        color = (group_id * 30) % 255
        for index in contour_indices:
            contour = contours[index]
            cv2.drawContours(img, [contour], 0, (color, 127, 50), 3)
            cv2.putText(img, str(group_id), tuple(contour[0]), cv2.FONT_HERSHEY_SIMPLEX, 2, (color, 127, 50), 4)
    return img
# Swipes through elements within groups on a device, avoiding repeated swiping of positions.
def swipe_elements(device, contours, groups, roi):
    roi_min, roi_max, width_padding = roi
    already_swiped_positions = set()
    for group_id, contour_indices in enumerate(groups):
        for i in range(len(contour_indices) - 1):
            index1 = contour_indices[i]
            index2 = contour_indices[i + 1]
            contour1 = contours[index1]
            contour2 = contours[index2]
            x1, y1, w1, h1 = cv2.boundingRect(contour1)
            x2, y2, w2, h2 = cv2.boundingRect(contour2)
            # Check if the positions have already been swiped
            position1 = (x1 + w1 // 2, y1 + h1 // 2)
            position2 = (x2 + w2 // 2, y2 + h2 // 2)
            if position1 in already_swiped_positions or position2 in already_swiped_positions:
                continue  # Skip if either position has been swiped
            # Swipe from the center of the first contour to the center of the second contour
            device.swipe(x1 + w1 // 2, y1 + h1 // 2, x2 + w2 // 2, y2 + h2 // 2, 0.05)
            # Update the set of already swiped positions
            already_swiped_positions.update([position1, position2])
    if len(groups) > 0:
        # Touch the first element of the list after merge, because of contouring
        x1, y1, _, _ = cv2.boundingRect(contours[0])
        device.click(x1, y1)
# Generates objects by clicking on specified generator positions.
def generate_objects(device, contours, img):
    energy_left = get_energy_level(img)
    for pos in GENERATOR_POSITIONS:
        # Generator positions are 1-indexed, matching the annotated grid
        x, y, _, _ = cv2.boundingRect(contours[pos - 1])
        if energy_left <= MIN_ENERGY_LEVEL:
            print("No energy left")
            return False
        device.click(x, y)
        device.click(x, y)
        energy_left = energy_left - 1
    return True
# Resizes input image based on the specified max height.
def resize_image(image, max_height=720):
    # Get the original dimensions of the image
    if len(image.shape) == 3:
        original_height, original_width, _ = image.shape
    else:
        original_height, original_width = image.shape
    # Calculate the scaling factor to maintain aspect ratio
    scale_factor = max_height / original_height
    # Calculate the new dimensions
    new_height = int(original_height * scale_factor)
    new_width = int(original_width * scale_factor)
    # Resize the image
    resized_image = cv2.resize(image, (new_width, new_height))
    return resized_image
# Reads the screen content using MJPEG streaming. If streaming fails, falls back to the uiautomator2's screenshot method.
using_streaming = True
def screen_stream_read(device):
    global using_streaming
    if(using_streaming):
        try:
            r = requests.get(stream_url, stream=True)
            if(r.status_code == 200):
                bytes = b''
                for chunk in r.iter_content(chunk_size=1024):
                    bytes += chunk
                    # Look for a complete JPEG frame (SOI / EOI markers) in the buffer
                    a = bytes.find(b'\xff\xd8')
                    b = bytes.find(b'\xff\xd9')
                    if a != -1 and b != -1:
                        jpg = bytes[a:b+2]
                        bytes = bytes[b+2:]
                        img = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
                        return img
            else:
                print("Received unexpected status code {}".format(r.status_code))
        except:
            print("Could not connect to the screen streaming service - start the ScreenStream app service!")
            print("Using low FPS screenshot method from uiautomator2")
            using_streaming = False
            return device.screenshot(format='opencv')
    else:
        return device.screenshot(format='opencv')
# Extracts and returns the energy level from a given image. Utilizes Tesseract OCR for text extraction.
def get_energy_level(img):
    # Note: you should have Tesseract installed and set in path to use this function
    height, width, _ = img.shape
    x, y, x1, y1 = int(width * ENG_LEFT), int(height * ENG_TOP), int(width * ENG_RIGHT), int(height * ENG_BOTTOM)
    cropped = img[y:y1, x:x1]
    # Preprocess the cropped image for better text recognition
    gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
    # Apply thresholding to enhance text visibility
    _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    cv2.floodFill(thresh, None, (5, 5), 0, flags=8)
    # Use pytesseract to extract numeric text from the preprocessed image
    custom_config = r'--oem 3 --psm 7 outputbase digits'  # Tesseract OCR configuration for numeric digits
    text = pytesseract.image_to_string(thresh, config=custom_config)
    if DISPLAY_EXTRACTED_IMGS:
        cv2.imshow("Extracted energy", thresh)
        print("Energy: ", text.strip())
        cv2.waitKey(5)
    try:
        return int(text.strip())
    except ValueError:
        print("Could not detect energy level, assuming 100, text: ", text.strip())
        return 100
# Automatically farm energy from tasks
def farm_energy(img, device):
    height, width, _ = img.shape
    # Open tasks menu
    device.click(width * ENG_LEFT, height * ENG_TOP)
    time.sleep(2)
    for i in range(MAX_FARM_SESSIONS):
        # Hit the "Go" button and wait for X seconds
        device.click(width * GO_LEFT, height * GO_TOP)
        time.sleep(16)
        device.press("back")
        time.sleep(2)
        # Hit the "Claim" button
        device.click(width * GO_LEFT, height * GO_TOP)
        time.sleep(3)
    # Exit task menu
    device.click(width * EX_LEFT, height * EX_TOP)
    time.sleep(1)
# Combines a list of binary images into a grid with the specified number of columns and rows, just for debugging display.
def combine_binary_images(extracted_imgs, columns=7, rows=9):
    # Ensure that the number of images is consistent with the specified grid size
    if len(extracted_imgs) != (columns * rows):
        raise ValueError(f"Number of images ({len(extracted_imgs)}) is not compatible with the grid size ({columns}x{rows}).")
    # Resize images to have the same height (assuming they have the same width)
    height = extracted_imgs[0].shape[0]
    resized_imgs = [cv2.resize(img, (height, height)) for img in extracted_imgs]
    # Combine images into a grid
    combined_img = np.vstack([np.hstack(resized_imgs[i:i+columns]) for i in range(0, len(resized_imgs), columns)])
    return combined_img
# Checks if the current app running on the device is Aliexpress and the screen is on.
def check_if_ali_app(device):
    pkg_name = device.info.get("currentPackageName")
    screenOn = device.info.get("screenOn")
    if("aliexpress" in pkg_name and screenOn):
        return True
    return False
# Waits for the Aliexpress app to be opened on the device.
def wait_for_ali_app(device):
    if(check_if_ali_app(device)):
        print("Aliexpress app is running")
        return
    else:
        print("Please open Aliexpress Merge Boss game. Waiting 10 seconds.")
        time.sleep(10)
        wait_for_ali_app(device)
def main():
    print("Don't forget to open the ATX app, and ensure both AtxAgent and UIAutomator are running!\n")
    time.sleep(1)
    # Connect to the Android device
    if RUN_ON_MOBILE:
        print("Running on mobile set to true")
        device = u2.connect('127.0.0.1')
    else:
        device = u2.connect()
    print("Checking if the Aliexpress app is running")
    wait_for_ali_app(device)
    img = screen_stream_read(device)
    height, width, _ = img.shape
    # Define the region of interest for duplicate findings
    # Top, bottom, left, right padding
    roi = int(ROI_TOP * height), int(ROI_BOTTOM * height), width * ROI_PADDING
    # Generate ROI grid contours
    grid_contours = generate_grid_contours(img, roi)
    # Remember the energy farm status
    farm_the_energy = AUTO_FARM_ENERGY
    while True:
        # Read the screenshot in memory
        img = screen_stream_read(device)
        if not check_if_ali_app(device):
            print("Aliexpress app is not running anymore")
            break
        extracted_imgs = extract_imgs_from_contours(img, grid_contours, apply_process=True)
        if DISPLAY_EXTRACTED_IMGS:
            display_extracted_img = combine_binary_images(extracted_imgs)
            res_display_extracted_img = resize_image(display_extracted_img)
            cv2.imshow("Extracted images", res_display_extracted_img)
        # Only try to merge objects with a similarity above this threshold
        grouped_items = group_similar_imgs(extracted_imgs, SIMILARITY_THRESHOLD)
        # Check the energy left and matches
        if CHECK_ENERGY_LEVEL and len(grouped_items) <= MAX_GENERATOR_GROUP_NUMBERS:
            if generate_objects(device, grid_contours, img) == False and len(grouped_items) == 0:
                print("No group found.")
                if farm_the_energy:
                    print("Starting to farm energy.")
                    farm_energy(img, device)
                    print("Finish farming.")
                    farm_the_energy = False
                else:
                    print("No energy to farm. Exit.")
                    break
        if DISPLAY_ANNOTATED_IMGS:
            annotated_img = annotate_image(img, grid_contours, grouped_items, roi)
            # Resize image for display
            res_annotated_img = resize_image(annotated_img)
            # Display the screenshot with annotations
            cv2.imshow("Display annotations", res_annotated_img)
            if cv2.waitKey(20) & 0xFF == ord('q'):
                break
        # Swipe duplicates one over another
        swipe_elements(device, grid_contours, grouped_items, roi)
    # Close any display windows once the loop exits
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()