Last active
June 30, 2018 06:00
-
-
Save qzane/d50f2002c48c7b59a9e2e47af43fcdea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import os,sys | |
import time | |
import random | |
import numpy as np | |
from sklearn.svm import SVR,SVC | |
from PIL import Image,ImageDraw | |
from skimage.morphology import remove_small_objects,binary_dilation,disk | |
from skimage import filters, feature | |
CMD_TAP = '''adb shell " | |
sendevent /dev/input/event1 3 57 0 | |
sendevent /dev/input/event1 3 53 %d | |
sendevent /dev/input/event1 3 54 %d | |
sendevent /dev/input/event1 1 330 1 | |
sendevent /dev/input/event1 0 0 0 | |
sendevent /dev/input/event1 3 57 -1 | |
sendevent /dev/input/event1 1 330 0 | |
sendevent /dev/input/event1 0 0 0 | |
"''' | |
PATH = os.path.split(os.path.realpath('touch.py'))[0] | |
print("Script path:", PATH) | |
SAFE = (20, 1200) | |
import subprocess | |
import time | |
import numpy as np | |
import io | |
from threading import Thread | |
try: | |
from Queue import Queue | |
except ImportError: | |
from queue import Queue # python 3.x | |
''' | |
from multiprocessing import Queue | |
from multiprocessing import Process as Thread | |
''' | |
def record_sub(queue, queue_kill): | |
''' sub process for event recording ''' | |
print(os.getpid(),'pid') | |
begin = time.time() | |
fp = subprocess.Popen(['adb','exec-out','getevent','-q'],stdout=subprocess.PIPE) | |
stdout = fp.stdout | |
while(1): | |
if not queue_kill.empty(): | |
print('end') | |
fp.kill() | |
fp.wait() | |
return | |
try: | |
res = stdout.readline().decode('utf-8') | |
res = res.strip().split(' ')[-3:] | |
now = int(1000*(time.time()-begin)) | |
res = [int(i, 16) for i in res] | |
res.append(now) | |
print(res) | |
queue.put(res) | |
except: | |
pass | |
def record_thread(): | |
queue = Queue() | |
queue_kill = Queue() | |
sub = Thread(target=record_sub, args=(queue, queue_kill)) | |
sub.start() | |
return queue, queue_kill | |
def record_thread_end(queue, queue_kill): | |
res = [] | |
while(not queue.empty()): | |
res.append(queue.get(False)) | |
queue_kill.put('0') | |
return res | |
def record_to_shell(res): | |
''' convert record to shell commands ''' | |
# sleep 0 takes 0.02s, so ignore any gap smaller than 10ms | |
tmp0 = 'sleep %.03f\n' | |
tmp1 = 'sendevent /dev/input/event1 %d %d %d\n' | |
cmd = '' | |
past = 0 | |
for i in res: | |
if i[3] - past > 20: | |
cmd += tmp0%((i[3]-past)/1000) | |
past = i[3] | |
cmd += tmp1%(i[0],i[1],i[2]) | |
return cmd | |
def record_replay(cmd, rm=True): | |
''' replay using shell commands ''' | |
fname = "cmd_%d_%04d.sh"%(time.time(),random.randint(0,9999)) | |
fpath1 = os.path.abspath(fname) | |
fpath2 = '/sdcard/'+fname | |
if rm: | |
cmd += 'rm '+fpath2+'\n' | |
cmd = cmd.encode('utf-8') | |
with open(fpath1,'wb') as f: | |
f.write(cmd) | |
tmp = os.popen('adb push %s %s'%(fpath1, fpath2)) | |
tmp.read() | |
if rm: | |
os.remove(fpath1) | |
tmp = os.popen('adb shell "nohup sh %s"'%fpath2) | |
tmp.read() | |
def disp(im): | |
if isinstance(im, np.ndarray): | |
im = im.astype('float32') | |
im = im - im.min() | |
im = im * 250 / im.max() | |
im = im.astype('uint8') | |
Image.fromarray(im).show() | |
def screencap(convert=True): | |
'''takes about 1000 ms''' | |
data = subprocess.Popen(['adb','exec-out','screencap','-p'],stdout=subprocess.PIPE).stdout.read() | |
fp = io.BytesIO(data) | |
im = Image.open(fp) | |
if convert: | |
im = im.convert('L').resize((270,480,), Image.ANTIALIAS) | |
im = np.array(im) | |
return im | |
def save_screen(): | |
path = PATH | |
path = os.path.join(path, 'temps') | |
if not os.path.isdir(path): | |
os.mkdir(path) | |
im = screencap(False) | |
im.save(os.path.join(path, "p%d.png"%time.time())) | |
def read_temp(fname): | |
im = Image.open(fname) | |
im = im.convert('L') | |
im = im.resize((im.size[0]//4, im.size[1]//4), Image.ANTIALIAS) | |
da = np.array(im) | |
tmp = os.path.split(fname)[-1] | |
tmp = tmp.split('.') | |
name = tmp[0] | |
threshold = 0.6 | |
try: | |
if tmp[1][:2] == 'ts': | |
threshold = int(tmp[1][2:]) / 100.0 | |
assert(threshold<=1.0 and threshold >= 0.0) | |
except: | |
pass | |
return da, name, threshold | |
def read_temps(): | |
res = {} | |
path = os.path.join(PATH, 'temp') | |
for f in os.listdir(path): | |
if not (f.endswith('.png') or f.endswith('.jpg')): | |
continue | |
da, name, threshold = read_temp(os.path.join(path, f)) | |
res[name] = [da, threshold] | |
return res | |
TEMPS = None | |
def matchTemps(img, name): | |
global TEMPS | |
if TEMPS is None: | |
TEMPS = read_temps() | |
if not name in TEMPS: | |
return None | |
target, threshold = TEMPS[name] | |
match = feature.match_template(img, target, pad_input=True) | |
if match.max() < threshold: | |
return None | |
else: | |
match = np.where(match > threshold) | |
return (int(match[0].mean()), int(match[1].mean())) | |
def tap(x,y, fuzzy=True): | |
if fuzzy: | |
x,y = x+random.random()*20-10, y+random.random()*20-10 | |
tmp = os.popen(CMD_TAP%(int(x),int(y))) | |
tmp.read() | |
def click_safe(pos=None): | |
tap(SAFE[0], SAFE[1]) | |
def click_point(pos): | |
tap(pos[1]<<2, pos[0]<<2) | |
def click_sp_map(pos): | |
click_point(pos) | |
time.sleep(1) | |
tap(500,650) | |
def click_game_over(pos=None): | |
click_safe(pos) | |
time.sleep(1) | |
save_screen() | |
def click_continue(pos=None): | |
for i in range(7): | |
click_safe(pos) | |
time.sleep(0.3) | |
def click_fight(pos): | |
click_point(pos) | |
click_continue(pos) | |
def click_win(pos): | |
click_safe(pos) | |
time.sleep(1) | |
def get_feather(): | |
men = [[90,1850], [500,610], [670,755], [315, 910]] | |
for man in men: | |
tap(man[0], man[1]) | |
click_continue((1,1)) | |
pos = matchTemps(screencap(), 'close') | |
if pos: | |
click_point(pos) | |
time.sleep(1) | |
time.sleep(random.random()*60) | |
def click_very_hard(pos): | |
if random.random() > 0.03: | |
click_point(pos) | |
else: | |
print('get feather') | |
get_feather() | |
def click_auto(pos): | |
'''prepare to auto play''' | |
img = screencap() | |
pos1 = matchTemps(img, 'auto_bright') | |
pos2 = matchTemps(img, 'reposition') | |
if not (pos1 and pos2): # aotu too many mismatches | |
print('mis match') | |
return | |
tap(260,580) | |
time.sleep(0.03) | |
tap(260,940) | |
time.sleep(0.03) | |
tap(260,940) | |
time.sleep(0.03) | |
tap(430,580) | |
time.sleep(0.03) | |
tap(100,580) | |
time.sleep(0.03) | |
tap(100,580) | |
time.sleep(0.03) | |
tap(620,580) | |
time.sleep(0.03) | |
tap(260,580) | |
time.sleep(0.03) | |
tap(260,580) | |
time.sleep(0.03) | |
time.sleep(0.03) | |
#click_point(pos) | |
#click_auto and fight | |
tap(850,1800) | |
time.sleep(0.3) | |
tap(500,960) | |
def play_with_temps(): | |
temps = ['auto_bright', 'auto_dark', 'auto_fight', 'continue', 'close', 'back', | |
'close-en', 'fight', 'game_over', 'sp_map', 'to_fight', 'very_hard', | |
'win', 'end_auto'] | |
solution1 = {'win':click_win, | |
'game_over':click_game_over, | |
'close':click_point, | |
'very_hard':click_point, | |
'sp_map':click_sp_map, | |
'fight':click_fight, | |
'auto_fight':click_point, | |
'continue':click_continue, | |
'end_auto':click_continue | |
} | |
solution2 = {'to_fight':click_point, 'back':click_point, 'reposition':click_auto} | |
solution3 = {}#{'auto_bright':click_auto}# todo: fix this when partially moved | |
solutions = [solution1, solution2, solution3] | |
while(1): | |
img = screencap() | |
solve = False | |
for solution in solutions: | |
for key in solution: | |
pos = matchTemps(img, key) | |
if pos: | |
''' | |
if key == 'auto_bright': | |
tmp = Image.fromarray(img) | |
tmp.save(os.path.join(PATH, 'temps', 'p%d.png'%time.time())) | |
''' | |
print(key) | |
solution[key](pos) | |
solve = True | |
time.sleep(0.5) | |
break | |
if solve: | |
break | |
if not solve: | |
print('not solve') | |
click_safe((1,1)) | |
# 1080,1920 | |
def auto_play(): | |
''' ... ''' | |
import time | |
import sys | |
times = 0 | |
begin = time.time() | |
while 1: | |
tap(540,900) | |
time.sleep(0.5) | |
tap(540,1200) | |
time.sleep(0.5) | |
tap(540,900) | |
for i in range(32): | |
tap(540,900) | |
time.sleep(0.2) | |
time.sleep(0.5) | |
tap(260,580) | |
time.sleep(0.03) | |
tap(260,940) | |
time.sleep(0.03) | |
tap(260,940) | |
time.sleep(0.03) | |
tap(430,580) | |
time.sleep(0.03) | |
tap(100,580) | |
time.sleep(0.03) | |
tap(100,580) | |
time.sleep(0.03) | |
tap(620,580) | |
time.sleep(0.03) | |
tap(260,580) | |
time.sleep(0.03) | |
tap(260,580) | |
time.sleep(0.03) | |
time.sleep(0.03) | |
tap(850,1800) | |
time.sleep(0.3) | |
tap(500,960) | |
time.sleep(9.5+random.random()*5) | |
if(random.random()<0.019): | |
print('sleep inside') | |
time.sleep(random.random()*50) | |
tap(500,960) | |
times += 1 | |
past = (time.time()-begin)/60.0 | |
print(times, times*5, times*5*3, "%.1f"%past, "%.1f"%(times*5/past)) | |
sys.stdout.flush() | |
time.sleep(3.5+random.random()*3) | |
if time.localtime()[3] == 2 and time.localtime()[4]>50: | |
os._exit(0) | |
if(random.random()<0.013): | |
print('sleep outside') | |
time.sleep(random.random()*35) | |
if __name__ == '__main__': | |
play_with_temps() | |
#auto_play() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment