Created
November 8, 2023 08:16
-
-
Save mrchipset/30e45cfbaf80ce25da0ed4e305041fab to your computer and use it in GitHub Desktop.
雨云代理价格8折自动检查创建脚本
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import http.client | |
import json | |
API_KEY = 'YOUR RAINYUN APIKEY' # Please change this API_KEY to your own APIKEY | |
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36' | |
logger = logging.getLogger("check_agent_price") | |
logger.setLevel(logging.INFO) | |
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
# add a handle | |
file_handler = logging.FileHandler('/var/log/check_agent_price.log') | |
file_handler.setLevel(logging.INFO) | |
# add a formatter | |
file_handler.setFormatter(formatter) | |
logger.addHandler(file_handler) | |
stream_handler = logging.StreamHandler() | |
stream_handler.setLevel(logging.INFO) | |
stream_handler.setFormatter(formatter) | |
logger.addHandler(stream_handler) | |
def get_rules(): | |
logger.info('Try to get agent price list.') | |
conn = http.client.HTTPSConnection("api.v2.rainyun.com") | |
payload = '' | |
headers = { | |
'x-api-key': API_KEY, | |
'User-Agent': USER_AGENT | |
} | |
conn.request("GET", "/user/vip/agent_price?options=%7B%22columnFilters%22:%7B%22id%22:%22%22%7D,%22sort%22:%5B%5D,%22page%22:1,%22perPage%22:20%7D", payload, headers) | |
res = conn.getresponse() | |
data = res.read().decode('utf-8') | |
jobj = json.loads(data) | |
if 'code' in jobj: | |
if jobj['code'] == 200: | |
logger.info('Get agent price list successfully.') | |
return jobj['data']['Records'] | |
else: | |
logger.info('Get agent price error.') | |
return None | |
else: | |
logger.info('Get agent price error.') | |
return None | |
def check_rule_exists(rules): | |
for rule in rules: | |
if rule['scene'] == 'create' and rule['rate'] == 0.8 and rule['priority'] == 1: | |
return True | |
return False | |
def create_rule(): | |
logger.info('Try to create agent price.') | |
conn = http.client.HTTPSConnection("api.v2.rainyun.com") | |
payload = json.dumps({ | |
"scene": "create", | |
"priority": 1, | |
"rate": 0.8 | |
}) | |
headers = { | |
'x-api-key': API_KEY, | |
'User-Agent': USER_AGENT, | |
'Content-Type': 'application/json' | |
} | |
conn.request("POST", "/user/vip/agent_price", payload, headers) | |
res = conn.getresponse() | |
data = res.read().decode('utf-8') | |
jobj = json.loads(data) | |
if 'code' in jobj: | |
if jobj['code'] == 200: | |
logger.info('Create agent price successfully.') | |
return True | |
else: | |
logger.info('Create agent price failed.') | |
return False | |
else: | |
logger.info('Create agent price failed.') | |
return False | |
def main(): | |
data = get_rules() | |
# print(data, type(data)) | |
if data is None: | |
return | |
is_existed = check_rule_exists(data) | |
if not is_existed: | |
logger.info('Agent rule is deleted, try to create.') | |
# print("Rule is deleted, try to create.") | |
res = create_rule() | |
else: | |
logger.info('Rule exists, do not change.') | |
# print("Rule exists, do not change.") | |
# Finished | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment