Created
December 22, 2019 04:09
-
-
Save Gobi03/f85b6bd4b89c40880d85ba01221161eb to your computer and use it in GitHub Desktop.
天下一GameBattleContest(β)のコード https://tenka1.klab.jp/2019-obt/problem_re31z0.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
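Sample client that claims grid edges along L-shaped paths between source/sink
pairs (closest pairs first); a max-flow based score calculator is also included.
A Python 3 environment is required to run it.
Rewrite the TOKEN variable before running.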
""" | |
import heapq | |
import queue | |
import os | |
import random | |
from typing import NamedTuple, List, Dict | |
from urllib.request import urlopen | |
import time | |
# Address of the game server; can be overridden with the GAME_SERVER
# environment variable.
GAME_SERVER = os.getenv('GAME_SERVER', 'https://obt.tenka1.klab.jp')
# Your API token (placeholder; replace before running).
TOKEN = "XXXXXXXXXXX"
# Number of vertices and edges of the board graph.
NUM_NODES = 400
NUM_EDGES = 760
def call_api(x):
    """Fetch ``GAME_SERVER + x`` and return the response body as text.

    Args:
        x: Path (starting with ``/``) appended to ``GAME_SERVER``.

    Returns:
        The response body decoded with the default codec of ``bytes.decode``.
    """
    url = f'{GAME_SERVER}{x}'
    with urlopen(url) as response:
        body = response.read()
    return body.decode()
# ユーザ定義関数 | |
# Direction of the move coord1 => coord2.
# Assumes the two coordinates are at Manhattan distance 1.
def to_dir(coord1, coord2):  # (int, int), (int, int)
    """Return the direction letter of the step from ``coord1`` to ``coord2``.

    The x axis is checked first: ``"R"`` when x increases, ``"L"`` when it
    decreases.  Only when x is unchanged is y inspected: ``"D"`` when y
    increases, ``"U"`` when it decreases.  Identical coordinates give ``None``.
    """
    from_x, from_y = coord1
    to_x, to_y = coord2
    if from_x != to_x:
        return "R" if from_x < to_x else "L"
    if from_y != to_y:
        return "D" if from_y < to_y else "U"
    return None
def coords_to_edge(coord1, coord2):  # (int, int), (int, int)
    """Return the edge index joining two grid coordinates.

    The coordinates are put in tuple order first, so the argument order does
    not matter.  A rightward edge starting at ``(x, y)`` has index
    ``39 * y + x``; a downward one has index ``39 * y + x + 19``.  Any other
    direction reported by ``to_dir`` (including identical coordinates) yields
    ``None``.
    """
    (ax, ay), (bx, by) = min(coord1, coord2), max(coord1, coord2)
    # Offset of the edge inside its row of 39 indices, keyed by direction.
    offset_by_direction = {"R": 0, "D": 19}
    direction = to_dir((ax, ay), (bx, by))
    if direction in offset_by_direction:
        return 39 * ay + ax + offset_by_direction[direction]
    return None
def node_to_coord(node: int):
    """Convert a node index into an ``(x, y)`` pair on a 20-column grid."""
    row, column = divmod(node, 20)
    return (column, row)
# Compute the Manhattan distance.
def get_dist(coord1, coord2):  # (int, int), (int, int)
    """Return the Manhattan distance between two ``(x, y)`` coordinates."""
    (ax, ay), (bx, by) = coord1, coord2
    dx = abs(bx - ax)
    dy = abs(by - ay)
    return dx + dy
def get_roots(coord1, coord2):  # (int, int), (int, int) => int list
    """Return the edge indices of an L-shaped path: horizontal leg first.

    With the two coordinates put in tuple order, the path runs horizontally
    along the first coordinate's row until it reaches the second coordinate's
    column, then vertically along that column.
    """
    first, second = min(coord1, coord2), max(coord1, coord2)
    (left_x, row_y), (right_x, other_y) = first, second
    low_y, high_y = min(row_y, other_y), max(row_y, other_y)
    horizontal_leg = [
        coords_to_edge((x, row_y), (x + 1, row_y))
        for x in range(left_x, right_x)
    ]
    vertical_leg = [
        coords_to_edge((right_x, y), (right_x, y + 1))
        for y in range(low_y, high_y)
    ]
    return horizontal_leg + vertical_leg
def get_roots2(coord1, coord2):  # (int, int), (int, int) => int list
    """Return the edge indices of an L-shaped path: vertical leg first.

    With the two coordinates put in tuple order, the path runs vertically
    along the first coordinate's column, then horizontally along the second
    coordinate's row.
    """
    first, second = min(coord1, coord2), max(coord1, coord2)
    (left_x, first_y), (right_x, last_y) = first, second
    low_y, high_y = min(first_y, last_y), max(first_y, last_y)
    vertical_leg = [
        coords_to_edge((left_x, y), (left_x, y + 1))
        for y in range(low_y, high_y)
    ]
    horizontal_leg = [
        coords_to_edge((x, last_y), (x + 1, last_y))
        for x in range(left_x, right_x)
    ]
    return vertical_leg + horizontal_leg
# main | |
def main():
    """Play games back to back, forever.

    Each game is handled by ``_play_game``; when it returns, the next game is
    started from a loop.  (Starting the next game by calling ``main()``
    recursively would add a stack frame per game and eventually hit the
    recursion limit.)
    """
    while True:
        _play_game()


def _fetch_stage(game_id):
    """Fetch and parse ``/api/stage/<game_id>``.

    Returns:
        ``(sources, sinks, caps)`` as lists of ints, sliced out of the
        whitespace-separated response: two counts, then the source nodes, the
        sink nodes, and one capacity per edge.
    """
    stage = list(map(int, call_api(f'/api/stage/{game_id}').split()))
    num_source, num_sink = stage[0], stage[1]
    sources = stage[2:2 + num_source]
    sinks = stage[2 + num_source:2 + num_source + num_sink]
    caps = stage[2 + num_source + num_sink:]
    assert len(caps) == NUM_EDGES
    return sources, sinks, caps


def _next_edge(pq, ops_queue, my_claims):
    """Return the next unclaimed edge index to claim, or ``None`` if none is left.

    Pending edges in ``ops_queue`` are consumed first.  When the queue is
    empty, the closest remaining (source, sink) pair is popped from ``pq`` and
    one of its two L-shaped routes is queued, provided none of that route's
    edges is already claimed by us.
    """
    while True:
        if ops_queue.empty():
            if not pq:
                # Every pair has been consumed; popping would raise IndexError.
                return None
            # TODO: avoid connecting to the other graphs
            _, s, g = heapq.heappop(pq)
            start, goal = node_to_coord(s), node_to_coord(g)
            for route in (get_roots, get_roots2):
                edges = route(start, goal)
                if all(my_claims[e] == 0 for e in edges):
                    for e in edges:
                        ops_queue.put(e)
                    break
        while not ops_queue.empty():
            candidate = ops_queue.get()
            if my_claims[candidate] == 0:
                return candidate


def _wait_for_next_game(game_id):
    """Poll ``/api/game`` once per second until it reports a different game id."""
    while True:
        current_id, _ = map(int, call_api('/api/game').split())
        if current_id != game_id:
            return
        time.sleep(1)


def _play_game():
    """Play the current game until the server reports it finished.

    Builds a priority queue of all (source, sink) pairs ordered by Manhattan
    distance, then repeatedly claims the edges of one L-shaped route per pair.
    Returns when a claim answers ``game_finished``, or, once no candidate edge
    is left, after the server has moved on to another game.
    """
    ops_queue = queue.Queue()
    query_num = 0  # number of successful claims in this game

    # GET /api/game
    game_id, remaining_ms = map(int, call_api('/api/game').split())
    print(f"\n\nゲームID {game_id}")
    print(f"残り時間 {remaining_ms / 1000}s")

    # GET /api/edges/<token>/<game_id>
    # Current claim status; the third line of an "ok" response holds our claims.
    my_claims = [0] * NUM_EDGES
    edges_resp = call_api(f'/api/edges/{TOKEN}/{game_id}').strip().split('\n')
    if edges_resp[0] == "ok":
        my_claims = list(map(int, edges_resp[2].split()))

    # GET /api/stage/<game_id>: sources, sinks, capacities
    sources, sinks, _ = _fetch_stage(game_id)

    # (distance, source, sink) tuples; the closest pairs are served first.
    pq = [
        (get_dist(node_to_coord(s), node_to_coord(g)), s, g)
        for s in sources
        for g in sinks
    ]
    heapq.heapify(pq)

    while True:
        # NOTE(review): the stage is re-fetched before every claim although
        # only its length check is used here; kept so that the request pattern
        # towards the server is unchanged.
        _fetch_stage(game_id)

        edge_index = _next_edge(pq, ops_queue, my_claims)
        if edge_index is None:
            # Nothing left to claim in this game: wait for the next one
            # instead of crashing on an empty heap.
            _wait_for_next_game(game_id)
            return

        # GET /api/claim/<token>/<game_id>/<edge_index>
        claim_resp = call_api(f'/api/claim/{TOKEN}/{game_id}/{edge_index}').strip()
        if claim_resp == "ok":
            query_num += 1
            my_claims[edge_index] = 1
            print(f"辺取得成功 {edge_index}, クエリ成功回数: {query_num}")
        elif claim_resp == "game_finished":
            return
        elif claim_resp == "claim_time_limit":
            # Rate limited: put the edge back so it is retried later.
            ops_queue.put(edge_index)
        else:
            print(f"error msg: {claim_resp}")
            print(f"辺取得失敗 {edge_index}")

        # Wait a little because of the API rate limit (currently a zero-length
        # sleep).
        time.sleep(0)
# cf. https://github.com/spaghetti-source/algorithm/blob/9cca6b826f19ed7e42dd326a4fbbb9f4d34f04d3/graph/maximum_flow_dinic.cc | |
# Capacity used for "unlimited" edges and as the initial augmenting bound.
INF = 1 << 50


class Edge(NamedTuple):
    """A directed edge stored in ``Graph.adj[src]``."""

    src: int
    dst: int
    capacity: int
    # Index of the paired opposite-direction edge inside ``Graph.adj[dst]``.
    rev: int


class Graph:
    """Flow network with a Dinic-style maximum-flow solver.

    ``adj[v]`` lists the edges leaving ``v`` and ``flow`` maps every edge to
    the flow currently pushed through it.
    """

    def __init__(self, n: int):
        """Create a graph with ``n`` nodes (``0 .. n-1``) and no edges."""
        self.n = n
        self.adj: List[List[Edge]] = [[] for _ in range(n)]
        self.flow: Dict[Edge, int] = {}

    def add_edge(self, src: int, dst: int, capacity: int):
        """Add a directed edge ``src -> dst`` plus its zero-capacity reverse edge."""
        forward = Edge(src, dst, capacity, len(self.adj[dst]))
        self.adj[src].append(forward)
        self.flow[forward] = 0
        backward = Edge(dst, src, 0, len(self.adj[src]) - 1)
        self.adj[dst].append(backward)
        self.flow[backward] = 0

    def max_flow(self, s: int, t: int):
        """Return the maximum flow from ``s`` to ``t``.

        Flow pushed by earlier calls stays recorded in ``self.flow``.
        Returns 0 when ``s == t``; without that guard the augmenting loop
        would keep adding ``INF`` forever.
        """
        from collections import deque

        if s == t:
            return 0

        level = [-1] * self.n
        next_edge = [0] * self.n  # per-node cursor into adj[u]

        def levelize():
            # BFS over residual edges, labelling nodes with their distance
            # from s; stops as soon as t is dequeued.
            level[:] = [-1] * self.n
            level[s] = 0
            frontier = deque([s])
            while frontier:
                u = frontier.popleft()  # O(1), unlike slicing a list
                if u == t:
                    break
                for e in self.adj[u]:
                    if e.capacity > self.flow[e] and level[e.dst] < 0:
                        level[e.dst] = level[u] + 1
                        frontier.append(e.dst)
            return level[t]

        def augment(u, cur):
            # DFS along strictly increasing levels; pushes at most `cur`
            # units and returns the amount actually pushed.
            if u == t:
                return cur
            while next_edge[u] < len(self.adj[u]):
                e = self.adj[u][next_edge[u]]
                if e.capacity > self.flow[e] and level[u] < level[e.dst]:
                    pushed = augment(e.dst, min(cur, e.capacity - self.flow[e]))
                    if pushed > 0:
                        self.flow[e] += pushed
                        self.flow[self.adj[e.dst][e.rev]] -= pushed
                        return pushed
                next_edge[u] += 1
            return 0

        total = 0
        while levelize() >= 0:
            next_edge[:] = [0] * self.n
            while True:
                pushed = augment(s, INF)
                if pushed <= 0:
                    break
                total += pushed
        return total
# Run a max flow to compute the score.
def calculate_score(sources, sinks, caps, claim_counts, my_claims):
    """Compute the score as a max flow over the edges we have claimed.

    Args:
        sources: Source node indices.
        sinks: Sink node indices.
        caps: Capacity of each edge, indexed by edge index.
        claim_counts: Claim count of each edge; the edge's capacity is
            divided by it.
        my_claims: Truthy for each edge index we have claimed.

    Returns:
        The max flow from a super source to a super sink, divided by the
        internal scaling ratio of 1000 (a float).
    """
    ratio = 1000  # capacities are scaled up so the integer division keeps 3 decimals
    super_source = NUM_NODES
    super_sink = NUM_NODES + 1
    g = Graph(NUM_NODES + 2)

    # Attach an unlimited edge from the super source to every source.
    for source_node in sources:
        g.add_edge(super_source, source_node, INF)

    # Attach an unlimited edge from every sink to the super sink.
    for sink_node in sinks:
        g.add_edge(sink_node, super_sink, INF)

    # Add the edges we have claimed, in both directions.
    for edge_index in range(NUM_EDGES):
        if not my_claims[edge_index]:
            continue
        row, offset = divmod(edge_index, 39)
        if offset < 19:
            # Edge between a node and the next node index (v1 + 1).
            v1 = 20 * row + offset
            v2 = v1 + 1
        else:
            # Edge between a node and the node 20 indices further (v1 + 20).
            v1 = 20 * row + offset - 19
            v2 = v1 + 20
        # An edge we hold has at least one claimant (us); a stale snapshot of
        # claim_counts may still say 0, which would divide by zero.
        claimants = max(claim_counts[edge_index], 1)
        cap = caps[edge_index] * ratio // claimants
        g.add_edge(v1, v2, cap)
        g.add_edge(v2, v1, cap)

    return g.max_flow(super_source, super_sink) / ratio
# Start the client only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment