Last active
September 11, 2018 03:06
-
-
Save elisong/55664bc252aef78991cbbaa48c1014a4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# ############################################################################
# Content: partitioned table of instance metadata
# Table: meta_instances
# Schedule: 0 0 * * * /usr/bin/python3 odps_meta_instances.py
# Log: odps_meta_instances.log
# ############################################################################
from multiprocessing import Pool | |
from datetime import datetime, timedelta, time | |
import re | |
from odps import ODPS | |
from odps.models import Schema, Column, Partition | |
# Wall-clock time at import; also reported as the run's start time in the log.
today_now = datetime.now()

# The collection window is the whole previous calendar day:
# 00:00:00.000000 through 23:59:59.999999.
yesterday = today_now - timedelta(days=1)
yesterday_min = datetime.combine(yesterday.date(), time.min)
yesterday_max = datetime.combine(yesterday.date(), time.max)

# Shared ODPS client used by everything below.  Replace the placeholder
# credentials and project name before running.
o = ODPS(
    access_id='<your access id>',
    secret_access_key='<your access key>',
    project='<your project>',
    endpoint='http://service.odps.aliyun.com/api',
    tunnel_endpoint='http://dt.odps.aliyun.com',
)
class MetaInstances(object):
    """Collect metadata of terminated ODPS instances into a partitioned table.

    Constructing the object creates the target table when it does not exist
    and drops the partition for the start date, so re-running the job for
    the same day replaces that day's rows instead of appending duplicates.
    """

    # Pulls a table name out of a task-summary key: the word characters that
    # follow the last '.' which is itself followed by a word character.
    # NOTE(review): keys presumably look like "project.table/partition";
    # confirm against real task summaries.
    _TABLE_KEY_RE = re.compile(r'^[\w].*\.([\w]+).*$')

    def __init__(self, table='meta_instances', start=yesterday_min, end=yesterday_max):
        """Prepare the target table and reset the partition being loaded.

        Args:
            table: Name of the ODPS table that receives the metadata rows.
            start: Beginning of the time window; also determines the
                partition value (``pt=YYYYMMDD``).
            end: End of the time window.
        """
        self.table = table
        self.start = start
        self.end = end
        self.columns = [
            Column(name='instance', type='string', comment='实例'),
            Column(name='start_tm', type='datetime', comment='开始时间'),
            Column(name='end_tm', type='datetime', comment='结束时间'),
            Column(name='cost_tm', type='bigint', comment='总耗时(秒)'),
            Column(name='status', type='string', comment='实例状态'),
            Column(name='owner', type='string', comment='作者'),
            Column(name='task', type='string', comment='子任务'),
            Column(name='tbl_in', type='string', comment='输入表(以,分割)'),
            Column(name='tbl_out', type='string', comment='输出表(以,分割)'),
        ]
        self.partitions = [Partition(name='pt', type='string', comment='按日期分区')]
        self.schema = Schema(columns=self.columns, partitions=self.partitions)
        o.create_table(table, self.schema, if_not_exists=True)
        self.partition_new = 'pt=%s' % (start.strftime('%Y%m%d'), )
        # Drop any previous load of this day so the run can be repeated.
        t = o.get_table(table)
        t.delete_partition(self.partition_new, if_exists=True)

    @classmethod
    def _table_names(cls, summary, section):
        """Return the set of table names found in ``summary[section]``.

        A missing section, a summary that cannot be subscripted, and keys
        that do not match the expected pattern all contribute nothing
        instead of raising.
        """
        try:
            keys = list(summary[section].keys())
        except (KeyError, TypeError, AttributeError):
            return set()
        names = set()
        for key in keys:
            found = cls._TABLE_KEY_RE.match(key)
            if found:
                names.add(found.group(1))
        return names

    def write_meta(self, ins):
        """Write one metadata row for instance ``ins`` into the partition.

        Args:
            ins: An ODPS instance object as yielded by ``o.list_instances``.
        """
        # NOTE(review): the fixed +8h shift presumably converts UTC to
        # UTC+8 local time; confirm against the service's timestamps.
        start_time = ins.start_time + timedelta(hours=8)
        end_time = ins.end_time + timedelta(hours=8)
        tbl_in = set()
        tbl_out = set()
        task_names = ins.get_task_names()
        for task in task_names:
            # Table lineage is best-effort: a task whose summary cannot be
            # fetched is skipped without discarding the other tasks' tables.
            try:
                smy = ins.get_task_summary(task)
            except Exception:
                continue
            tbl_in |= self._table_names(smy, 'Inputs')
            tbl_out |= self._table_names(smy, 'Outputs')
        # total_seconds() keeps the days component that .seconds drops, so
        # instances running 24 hours or longer get the right duration.
        cost_seconds = int((end_time - start_time).total_seconds())
        records = [ins.name, start_time, end_time, cost_seconds,
                   ins.status.value.lower(), ins.owner.split(':')[-1],
                   ','.join(task_names),
                   # Sorted so the stored strings are deterministic.
                   ','.join(sorted(tbl_in)), ','.join(sorted(tbl_out))]
        o.write_table(self.table, records, partition=self.partition_new,
                      create_partition=True)

    def run_multiprocess(self):
        """Process every terminated instance in the window via a process pool.

        Appends one line to ``odps_<table>.log`` when finished.  Worker
        failures do not raise here; they are counted and reported in the
        log line instead of being silently dropped.
        """
        instances = o.list_instances(start_time=self.start,
                                     end_time=self.end,
                                     only_owner=False,
                                     status='Terminated')
        pool = Pool()
        # Keep the result handles so worker exceptions can be detected.
        pending = [pool.apply_async(self.write_meta, args=(ins, ))
                   for ins in instances]
        pool.close()
        pool.join()
        failed = sum(1 for res in pending if not res.successful())

        stamp = '%Y-%m-%d %H:%M:%S'
        started = today_now.strftime(stamp)
        finished = datetime.now().strftime(stamp)
        if failed:
            line = ("execute failed for %d of %d instances from %s to %s \n"
                    % (failed, len(pending), started, finished))
        else:
            line = "execute ok from %s to %s \n" % (started, finished)
        with open('odps_' + self.table + '.log', 'a') as f:
            f.write(line)
if __name__ == '__main__':
    # Script entry point: load yesterday's instance metadata using the
    # default table name and time window.
    MetaInstances().run_multiprocess()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment