Skip to content

Instantly share code, notes, and snippets.

@Gobi03
Created December 22, 2019 04:09
Show Gist options
  • Save Gobi03/f85b6bd4b89c40880d85ba01221161eb to your computer and use it in GitHub Desktop.
Save Gobi03/f85b6bd4b89c40880d85ba01221161eb to your computer and use it in GitHub Desktop.
天下一GameBattleContest(β)のコード https://tenka1.klab.jp/2019-obt/problem_re31z0.html
#!/usr/bin/env python3
"""
Client for the Tenka1 Game Battle Contest (beta).

Claims grid edges along L-shaped routes between source/sink pairs, taking the
pairs with the smallest Manhattan distance first. `calculate_score` can
evaluate a set of claimed edges with a max-flow computation.
Requires a Python 3 environment.
Set the TOKEN variable to your own token before running.
"""
import heapq
import queue
import os
import random
from typing import NamedTuple, List, Dict
from urllib.request import urlopen
import time
# Address of the game server (can be overridden with the GAME_SERVER
# environment variable)
GAME_SERVER = os.getenv('GAME_SERVER', 'https://obt.tenka1.klab.jp')
# Your API token
TOKEN = "XXXXXXXXXXX"
# Number of vertices and edges in the board graph
NUM_NODES = 400
NUM_EDGES = 760
def call_api(x):
    """Send a GET request to the game server and return the decoded body.

    Args:
        x: Request path, appended verbatim to ``GAME_SERVER``
            (for example ``'/api/game'``).

    Returns:
        The response body decoded as text.
    """
    with urlopen(f'{GAME_SERVER}{x}') as res:
        return res.read().decode()
# User-defined helpers


def to_dir(coord1, coord2):  # (int, int), (int, int)
    """Return the direction of the step from ``coord1`` to ``coord2``.

    Both arguments are ``(x, y)`` tuples and are expected to be at Manhattan
    distance 1. The x coordinates are compared first, so a diagonal pair is
    reported by its horizontal component only.

    Returns:
        ``"R"`` / ``"L"`` when x increases / decreases, ``"D"`` / ``"U"``
        when y increases / decreases, or ``None`` when both points are equal.
    """
    (x1, y1) = coord1
    (x2, y2) = coord2
    if x1 < x2:
        return "R"
    if x1 > x2:
        return "L"
    if y1 < y2:
        return "D"
    if y1 > y2:
        return "U"
    # Identical points: there is no direction to report.
    return None
def coords_to_edge(coord1, coord2):  # (int, int), (int, int)
    """Return the index of the grid edge joining two adjacent points.

    The argument order does not matter: the two ``(x, y)`` tuples are ordered
    lexicographically first, so the edge is always described from its
    left (or upper) end point.

    Edge numbering, per row ``y`` of the 20-column grid (39 edges per row):
        * ``39 * y + x``       -- horizontal edge (x, y) - (x + 1, y)
        * ``39 * y + x + 19``  -- vertical edge (x, y) - (x, y + 1)

    Returns:
        The edge index, or ``None`` when both points are identical.
    """
    (x1, y1) = min(coord1, coord2)
    (x2, y2) = max(coord1, coord2)
    # After ordering, x1 <= x2, so the step can only point right or down.
    if x1 < x2:
        return 39 * y1 + x1
    if y1 < y2:
        return 39 * y1 + x1 + 19
    return None
def node_to_coord(node: int):
    """Convert a node index into its ``(x, y)`` position on the grid.

    Nodes are numbered row by row on a grid that is 20 columns wide, so the
    column is ``node % 20`` and the row is ``node // 20``.
    """
    row, col = divmod(node, 20)
    return (col, row)
def get_dist(coord1, coord2):  # (int, int), (int, int)
    """Return the Manhattan distance between two ``(x, y)`` points."""
    (x1, y1) = coord1
    (x2, y2) = coord2
    return abs(x2 - x1) + abs(y2 - y1)
def get_roots(coord1, coord2):  # (int, int), (int, int) => int list
    """Return the edges of the horizontal-first L-shaped route between two points.

    The points are ordered lexicographically, giving a start ``(x1, y1)`` and
    an end ``(x2, y2)`` with ``x1 <= x2``. The route runs along row ``y1``
    from ``x1`` to ``x2`` and then along column ``x2`` between ``y1`` and
    ``y2``.

    Returns:
        A list of edge indices: horizontal edges first (left to right), then
        vertical edges in increasing ``y``. Empty when the points coincide.
    """
    (x1, y1) = min(coord1, coord2)
    (x2, y2) = max(coord1, coord2)
    y_low, y_high = min(y1, y2), max(y1, y2)
    horizontal = [coords_to_edge((x, y1), (x + 1, y1)) for x in range(x1, x2)]
    vertical = [coords_to_edge((x2, y), (x2, y + 1)) for y in range(y_low, y_high)]
    return horizontal + vertical
def get_roots2(coord1, coord2):  # (int, int), (int, int) => int list
    """Return the edges of the vertical-first L-shaped route between two points.

    The points are ordered lexicographically, giving a start ``(x1, y1)`` and
    an end ``(x2, y2)`` with ``x1 <= x2``. The route runs along column ``x1``
    between ``y1`` and ``y2`` and then along row ``y2`` from ``x1`` to ``x2``.

    Returns:
        A list of edge indices: vertical edges first (in increasing ``y``),
        then horizontal edges left to right. Empty when the points coincide.
    """
    (x1, y1) = min(coord1, coord2)
    (x2, y2) = max(coord1, coord2)
    y_low, y_high = min(y1, y2), max(y1, y2)
    vertical = [coords_to_edge((x1, y), (x1, y + 1)) for y in range(y_low, y_high)]
    horizontal = [coords_to_edge((x, y2), (x + 1, y2)) for x in range(x1, x2)]
    return vertical + horizontal
# main
def _fetch_stage(game_id):
    """Fetch ``/api/stage/<game_id>`` and split it into (sources, sinks, caps)."""
    stage = list(map(int, call_api(f'/api/stage/{game_id}').split()))
    (num_source, num_sink) = (stage[0], stage[1])
    sources = stage[2:2 + num_source]
    sinks = stage[2 + num_source:2 + num_source + num_sink]
    caps = stage[2 + num_source + num_sink:]
    assert len(caps) == NUM_EDGES
    return sources, sinks, caps


def _next_edge_to_claim(pq, ops_queue, my_claims):
    """Return the next edge index to claim, or ``None`` if no routes remain.

    Pending edges in ``ops_queue`` are used first. When the queue is empty the
    closest remaining (source, sink) pair is popped from ``pq`` and its route
    is queued: the horizontal-first route if none of its edges is already
    ours, otherwise the vertical-first route under the same condition.
    """
    while True:
        if ops_queue.empty():
            if not pq:
                # Every source/sink pair has been tried already.
                return None
            (_, s, g) = heapq.heappop(pq)
            coord_s, coord_g = node_to_coord(s), node_to_coord(g)
            # TODO: avoid connecting this route to our other components.
            for build_route in (get_roots, get_roots2):
                route = build_route(coord_s, coord_g)
                if all(my_claims[i] == 0 for i in route):
                    for e in route:
                        ops_queue.put(e)
                    break
        while not ops_queue.empty():
            candidate = ops_queue.get()
            if my_claims[candidate] == 0:
                return candidate


def _play_game():
    """Play the current game until it ends or no candidate route is left.

    Returns:
        ``True`` when the server answered ``game_finished`` (the caller should
        start over with the next game), ``False`` when every source/sink pair
        has been consumed.
    """
    pq = []  # entries: (dist, s: int, g: int)
    ops_queue = queue.Queue()
    query_num = 0

    # GET /api/game
    game = call_api('/api/game')
    game_id, remaining_ms = list(map(int, game.split()))
    print(f"\n\nゲームID {game_id}")
    print(f"残り時間 {remaining_ms / 1000}s")

    # GET /api/edges/<token>/<game_id>
    # Current ownership of the edges; line 2 of the reply is our own claims.
    my_claims = [0] * NUM_EDGES
    edges_resp = call_api(f'/api/edges/{TOKEN}/{game_id}').strip().split('\n')
    if edges_resp[0] == "ok":
        my_claims = list(map(int, edges_resp[2].split()))

    # GET /api/stage/<game_id>
    # Sources, sinks and per-edge capacities.
    sources, sinks, _caps = _fetch_stage(game_id)

    # Order all (source, sink) pairs by Manhattan distance, closest first.
    for s in sources:
        coord_s = node_to_coord(s)
        for g in sinks:
            coord_g = node_to_coord(g)
            heapq.heappush(pq, (get_dist(coord_s, coord_g), s, g))

    while True:
        # NOTE(review): the stage is re-fetched before every claim although
        # the result is not used in this loop; kept because removing the
        # request would change the pacing of the claim calls -- confirm
        # whether it can be dropped.
        _fetch_stage(game_id)

        edge_index = _next_edge_to_claim(pq, ops_queue, my_claims)
        if edge_index is None:
            return False

        # GET /api/claim/<token>/<game_id>/<edge_index>
        claim_resp = call_api(f'/api/claim/{TOKEN}/{game_id}/{edge_index}').strip()
        if claim_resp == "ok":
            query_num += 1
            my_claims[edge_index] = 1
            print(f"辺取得成功 {edge_index}, クエリ成功回数: {query_num}")
        elif claim_resp == "game_finished":
            return True
        elif claim_resp == "claim_time_limit":
            # Rate limited: put the edge back so it is retried later.
            ops_queue.put(edge_index)
        else:
            print(f"error msg: {claim_resp}")
            print(f"辺取得失敗 {edge_index}")

        # Yield briefly between requests because of the API rate limit.
        time.sleep(0)


def main():
    """Play games back to back until no candidate route is left.

    Each finished game is followed by a fresh one. This is a loop rather than
    a recursive call so that a long session cannot exhaust the call stack.
    When all source/sink pairs of a game have been consumed the function
    returns instead of failing on an empty heap.
    """
    while _play_game():
        pass
    print("no candidate routes left; stopping")
# cf. https://github.com/spaghetti-source/algorithm/blob/9cca6b826f19ed7e42dd326a4fbbb9f4d34f04d3/graph/maximum_flow_dinic.cc
# Stand-in for "unlimited" capacity.
INF = 1 << 50


class Edge(NamedTuple):
    """Directed edge of the flow network."""

    src: int
    dst: int
    capacity: int
    # Position of the paired reverse edge inside ``adj[dst]``.
    rev: int


class Graph:
    """Flow network on ``n`` nodes with a Dinic max-flow solver.

    ``adj[u]`` lists the edges leaving ``u`` (including residual reverse
    edges) and ``flow`` maps each edge to the flow currently pushed over it.
    NOTE: self-loops (``src == dst``) are not supported by the ``rev``
    bookkeeping in ``add_edge``; they never carry flow anyway.
    """

    def __init__(self, n: int):
        self.n = n
        self.adj: List[List[Edge]] = [[] for _ in range(n)]
        self.flow: Dict[Edge, int] = {}

    def add_edge(self, src: int, dst: int, capacity: int):
        """Add a directed edge ``src -> dst`` and its zero-capacity reverse edge."""
        forward = Edge(src, dst, capacity, len(self.adj[dst]))
        self.adj[src].append(forward)
        self.flow[forward] = 0
        backward = Edge(dst, src, 0, len(self.adj[src]) - 1)
        self.adj[dst].append(backward)
        self.flow[backward] = 0

    def max_flow(self, s: int, t: int):
        """Return the maximum flow from ``s`` to ``t``.

        Flow already recorded in ``self.flow`` is kept, so a second call only
        returns what can still be pushed.

        Raises:
            ValueError: if ``s == t`` (the flow would be unbounded; without
                this check the augmenting loop never terminates).
        """
        if s == t:
            raise ValueError("source and sink must be different nodes")

        level = [0 for _ in range(self.n)]
        # next_edge[u]: first edge of adj[u] not yet exhausted in this phase.
        next_edge = [0 for _ in range(self.n)]

        def levelize():
            """BFS over residual edges; return the level of ``t`` (-1 if unreachable)."""
            level[:] = [-1] * self.n
            level[s] = 0
            order = [s]
            head = 0  # index-based queue: O(1) dequeue without copying the list
            while head < len(order):
                u = order[head]
                head += 1
                if u == t:
                    break
                for e in self.adj[u]:
                    if e.capacity > self.flow[e] and level[e.dst] < 0:
                        order.append(e.dst)
                        level[e.dst] = level[u] + 1
            return level[t]

        def augment(u, cur):
            """Push up to ``cur`` units from ``u`` towards ``t``; return the amount pushed."""
            if u == t:
                return cur
            while next_edge[u] < len(self.adj[u]):
                e = self.adj[u][next_edge[u]]
                if e.capacity > self.flow[e] and level[u] < level[e.dst]:
                    f = augment(e.dst, min(cur, e.capacity - self.flow[e]))
                    if f > 0:
                        r = self.adj[e.dst][e.rev]
                        self.flow[e] += f
                        self.flow[r] -= f
                        return f
                next_edge[u] += 1
            return 0

        flow = 0
        while levelize() >= 0:
            # New phase: every node starts again from its first edge.
            next_edge[:] = [0] * self.n
            while True:
                f = augment(s, INF)
                if f <= 0:
                    break
                flow += f
        return flow
# Compute the score by running a max flow over the claimed edges.
def calculate_score(sources, sinks, caps, claim_counts, my_claims):
    """Return the score of the claimed edges as a max flow value.

    Args:
        sources: Node indices of the source vertices.
        sinks: Node indices of the sink vertices.
        caps: Capacity of every edge, indexed by edge index.
        claim_counts: Number of claimants of every edge, indexed by edge index.
        my_claims: Truthy at the indices of the edges we have claimed.

    Returns:
        The max flow from all sources to all sinks using only our edges, where
        each edge contributes its capacity divided by its number of claimants.
        The flow is computed in integer thousandths and divided by 1000.
    """
    ratio = 1000
    super_source = NUM_NODES
    super_sink = NUM_NODES + 1
    g = Graph(NUM_NODES + 2)
    # Unlimited-capacity edge from the super source to every source.
    for source_node in sources:
        g.add_edge(super_source, source_node, INF)
    # Unlimited-capacity edge from every sink to the super sink.
    for sink_node in sinks:
        g.add_edge(sink_node, super_sink, INF)
    # Add the edges we have claimed, in both directions.
    for edge_index in range(NUM_EDGES):
        if not my_claims[edge_index]:
            continue
        q, r = divmod(edge_index, 39)
        if r < 19:
            # Horizontal edge: column r to column r + 1 in row q.
            v1 = 20 * q + r
            v2 = v1 + 1
        else:
            # Vertical edge: row q to row q + 1 in column r - 19.
            v1 = 20 * q + r - 19
            v2 = v1 + 20
        # An edge we own has at least one claimant (us). Guard against a
        # stale count of 0, which would otherwise raise ZeroDivisionError.
        claimants = max(1, claim_counts[edge_index])
        cap = caps[edge_index] * ratio // claimants
        g.add_edge(v1, v2, cap)
        g.add_edge(v2, v1, cap)
    return g.max_flow(super_source, super_sink) / ratio
if __name__ == "__main__":
    # Run the client only when executed as a script, not when imported.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment