Skip to content

Instantly share code, notes, and snippets.

@elisong
Last active September 11, 2018 03:06
Show Gist options
  • Save elisong/55664bc252aef78991cbbaa48c1014a4 to your computer and use it in GitHub Desktop.
Save elisong/55664bc252aef78991cbbaa48c1014a4 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# ############################################################################
# 内容: 实例(instance)元信息分区表
# 表: meta_instances
# 调度: 0 0 * * * /usr/bin/python3 odps_meta_instances.py
# 日志: odps_meta_instances.log
# ############################################################################
import os
import re
from datetime import datetime, time, timedelta
from multiprocessing import Pool

from odps import ODPS
from odps.models import Column, Partition, Schema
# Reporting window: the whole of yesterday on the host's local clock,
# 00:00:00 through 23:59:59.999999. Computed once at import time.
today_now = datetime.now()
yesterday = today_now - timedelta(1)
yesterday_min = datetime.combine(yesterday, time.min)
yesterday_max = datetime.combine(yesterday, time.max)

# Shared ODPS client used by every MetaInstances method (and inherited or
# re-created by pool workers). Credentials, project and endpoints are read
# from the environment so secrets need not be committed with the script;
# the placeholders are used only when a variable is unset. Remember that
# cron starts jobs with a minimal environment, so export these in the
# crontab entry.
o = ODPS(
    access_id=os.environ.get('ODPS_ACCESS_ID', '<your access id>'),
    secret_access_key=os.environ.get('ODPS_ACCESS_KEY', '<your access key>'),
    project=os.environ.get('ODPS_PROJECT', '<your project>'),
    endpoint=os.environ.get('ODPS_ENDPOINT',
                            'http://service.odps.aliyun.com/api'),
    tunnel_endpoint=os.environ.get('ODPS_TUNNEL_ENDPOINT',
                                   'http://dt.odps.aliyun.com'),
)
class MetaInstances(object):
    """Collect metadata about terminated ODPS instances into a partitioned table.

    Each object targets one ``pt=YYYYMMDD`` partition of ``table`` (the date
    of ``start``). Constructing it creates the table if needed and drops that
    partition, so re-running the job for the same day replaces that day's
    rows instead of appending duplicates.
    """

    # Extracts a table name from a task-summary Inputs/Outputs key: the run of
    # word characters after the last '.' that is followed by a word character.
    # NOTE(review): assumes keys look like "<project>.<table>[...]"; confirm
    # against real get_task_summary() output.
    _TABLE_NAME_RE = re.compile(r'^[\w].*\.([\w]+).*$')

    def __init__(self, table='meta_instances', start=yesterday_min,
                 end=yesterday_max):
        """Ensure the output table exists and clear the target partition.

        :param table: name of the ODPS table to write into.
        :param start: lower bound of the instance time window; its date
            (``YYYYMMDD``) names the ``pt`` partition that gets written.
        :param end: upper bound of the instance time window.
        """
        self.table = table
        self.start = start
        self.end = end
        # Column comments are stored in the table metadata (kept in Chinese):
        # instance, start time, end time, total elapsed (seconds), instance
        # status, owner, sub-tasks, input tables (comma-separated), output
        # tables (comma-separated).
        self.columns = [
            Column(name='instance', type='string', comment='实例'),
            Column(name='start_tm', type='datetime', comment='开始时间'),
            Column(name='end_tm', type='datetime', comment='结束时间'),
            Column(name='cost_tm', type='bigint', comment='总耗时(秒)'),
            Column(name='status', type='string', comment='实例状态'),
            Column(name='owner', type='string', comment='作者'),
            Column(name='task', type='string', comment='子任务'),
            Column(name='tbl_in', type='string', comment='输入表(以,分割)'),
            Column(name='tbl_out', type='string', comment='输出表(以,分割)'),
        ]
        # One string partition column "pt" (comment: "partitioned by date").
        self.partitions = [
            Partition(name='pt', type='string', comment='按日期分区')]
        self.schema = Schema(columns=self.columns, partitions=self.partitions)
        o.create_table(table, self.schema, if_not_exists=True)
        self.partition_new = 'pt=%s' % (start.strftime('%Y%m%d'), )
        # Remove rows left by an earlier run for the same day before
        # workers start writing into this partition again.
        t = o.get_table(table)
        t.delete_partition(self.partition_new, if_exists=True)

    @classmethod
    def _table_names(cls, keys):
        """Return table names parsed from ``keys``, skipping unmatched keys.

        A key the pattern does not recognise is ignored rather than raising
        AttributeError on a ``None`` match.
        """
        matches = (cls._TABLE_NAME_RE.match(key) for key in keys)
        return [m.group(1) for m in matches if m is not None]

    def write_meta(self, ins):
        """Write one row describing instance ``ins`` into ``self.partition_new``.

        Intended to run inside a pool worker. Input/output table lists are
        best effort: a task whose summary cannot be read only loses its own
        tables; tables collected from the other tasks are kept.

        :param ins: an instance object as yielded by ``o.list_instances``.
        """
        # NOTE(review): the +8h shift presumably converts the service's UTC
        # timestamps to Beijing time (UTC+8); confirm.
        start_time = ins.start_time + timedelta(hours=8)
        end_time = ins.end_time + timedelta(hours=8)
        tbl_in = set()
        tbl_out = set()
        task_names = ins.get_task_names()
        for task in task_names:
            try:
                smy = ins.get_task_summary(task)
                tbl_in.update(self._table_names(smy['Inputs']))
                tbl_out.update(self._table_names(smy['Outputs']))
            except Exception:
                # Best effort: e.g. no summary, or no Inputs/Outputs section.
                continue
        # total_seconds() covers the whole duration; the ``.seconds``
        # attribute alone drops whole days, under-reporting runs over 24h.
        cost_seconds = int((end_time - start_time).total_seconds())
        # Sorted so the comma-joined lists do not depend on set order.
        records = [ins.name, start_time, end_time, cost_seconds,
                   ins.status.value.lower(), ins.owner.split(':')[-1],
                   ','.join(task_names), ','.join(sorted(tbl_in)),
                   ','.join(sorted(tbl_out))]
        o.write_table(self.table, records, partition=self.partition_new,
                      create_partition=True)

    def run_multiprocess(self):
        """Write metadata for every terminated instance in the time window.

        Each instance is handled by ``write_meta`` in a process pool. Worker
        exceptions, which ``apply_async`` would otherwise discard, are
        collected and written to ``odps_<table>.log`` (relative to the
        current working directory) ahead of the usual run summary line.
        """
        instances = o.list_instances(start_time=self.start,
                                     end_time=self.end,
                                     only_owner=False,
                                     status='Terminated')
        pool = Pool()
        try:
            pending = [(ins.name,
                        pool.apply_async(self.write_meta, args=(ins, )))
                       for ins in instances]
        finally:
            # Let already-submitted writes finish even if listing failed.
            pool.close()
            pool.join()
        failures = []
        for name, result in pending:
            try:
                result.get()
            except Exception as err:
                failures.append((name, err))
        # UTF-8 explicitly: error text may be non-ASCII, and cron's default
        # locale can make the implicit encoding ASCII.
        with open('odps_' + self.table + '.log', 'a', encoding='utf-8') as f:
            for name, err in failures:
                f.write("instance %s failed: %r \n" % (name, err))
            f.write("execute ok from %s to %s \n"
                    % (today_now.strftime('%Y-%m-%d %H:%M:%S'),
                       datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
if __name__ == '__main__':
    # Constructing the job prepares the table and clears yesterday's
    # partition; run_multiprocess then fills it and appends to the log.
    job = MetaInstances()
    job.run_multiprocess()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment