Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Game module for Clockwork that demonstrates terminal manipulation possibilities.
# -*- coding: utf-8 -*-
"""Weather monitor; example of client terminal manipulation."""
# Part of Clockwork MUD Server (https://github.com/whutch/cwmud)
# :copyright: (c) 2008 - 2016 Will Hutcheson
# :license: MIT (https://github.com/whutch/cwmud/blob/master/LICENSE.txt)
from ... import settings
from ...contrib.weather.patterns import WeatherPattern
from ...contrib.worldgen.maps import _render_map_data, render_map_from_layers
from ...core.events import EVENTS
from ...core.sessions import Session, SESSIONS
from ...core.timing import TIMERS
from ...core.utils.decorators import patch
from ...core.utils.funcs import joins
from ...libs.miniboa import NAWS
# Override both idle settings with zero for this module.
# NOTE(review): presumably a value of 0 disables idle handling for
# sessions -- confirm against how the core reads these settings.
settings.IDLE_TIME = 0
settings.IDLE_TIME_MAX = 0
@patch(Session)
def _parse_input(self, data):
    """Accept a chunk of client input and do nothing with it.

    Applied to Session through the patch decorator; no input is acted on
    yet, so both empty and non-empty data are simply dropped.

    :param data: The input received from the client
    :returns None:

    """
    if data:
        # TODO: do something with it?
        # TODO: process ^C as quit
        pass
@patch(Session)
def _send(self, data):
    """Pass output straight to the underlying client.

    NOTE(review): send_cc presumably expands caret color codes such as
    "^Y" before writing -- confirm against the miniboa client.

    :param str data: The output to hand to the client
    :returns None:

    """
    client = self._client
    client.send_cc(data)
@patch(Session)
def send(self, data, *more, sep="", end=""):
    """Queue text for the client tied to this session.

    Nothing is written right away; the text is appended to the session's
    output queue and goes out the next time the session is polled.

    `data` and every member of `more` are combined into one string by the
    joins function using `sep`, and `end` is appended to the result.

    :param any data: An initial chunk of data
    :param any more: Optional, any additional data to send
    :param str sep: Optional, a separator to join the resulting output by
    :param str end: Optional, a terminator appended to the resulting output
    :returns None:

    """
    text = joins(data, *more, sep=sep)
    self._output_queue.append(text + end)
@patch(Session)
def poll(self, output_only=False):
    """Check the status of this session and process any queued IO.

    Nothing happens unless the client is active.  Input (at most one
    command per call) is handed to _parse_input, then everything in the
    output queue is joined and sent as a single chunk.

    :param bool output_only: Whether to only process the output queue
                             (this allows you to call poll from inside a
                             command without triggering an infinite loop)
    :returns None:

    """
    client = self._client
    if not client.active:
        return
    # Input side: pull one pending command, if any, and parse it.
    if not output_only and client.cmd_ready and self.active:
        command = client.get_command()
        if command is not None:
            self._parse_input(command)
    # Output side: flush the whole queue in one write.
    pending = self._output_queue
    if pending and self._client:
        self._send("".join(pending))
        pending.clear()
# We don't need the default connect menu, so drop what is hooked to
# "client_connected" before this module registers its own hook below.
# NOTE(review): assumes unhook with only an event name removes every
# hook on that event -- confirm against EVENTS.unhook.
EVENTS.unhook("client_connected")
@EVENTS.hook("client_connected")
def _hook_client_connected(client):
    """Create a session for a new client and start telnet negotiation.

    Color is switched on for the session, then the client is asked for
    the telnet options this module relies on, including window size
    (NAWS) and terminal type.

    :param client: The client that just connected
    :returns None:

    """
    session = SESSIONS.create(client)
    session.color = True
    telnet = session.client
    # Keep the negotiation requests in this order.
    telnet.request_do_sga()
    telnet.request_naws()
    telnet.request_terminal_type()
    telnet.request_linemode_off()
    telnet.request_will_echo()
# Seeds are derived from the timer value at import time; the wind seed is
# half the rain seed (true division, so it is a float).
RAIN_SEED = TIMERS.time % 10000
WIND_SEED = RAIN_SEED / 2
# Two independent patterns that both read the time from TIMERS: one is
# sampled for rain values and one for wind values.
RAIN_PATTERN = WeatherPattern(time_source=lambda: TIMERS.time,
                              time_scale=2, formation_speed=0.25,
                              storm_scale=50, wind_scale=20,
                              seed=RAIN_SEED)
WIND_PATTERN = WeatherPattern(time_source=lambda: TIMERS.time,
                              time_scale=2, formation_speed=0.35,
                              storm_scale=100, wind_scale=20,
                              seed=WIND_SEED)
# The (width, height, center) keys in use by sessions, rebuilt on every
# weather update.
MAP_SIZES = set()
# Base terrain tiles per (width, height, center) key, generated on demand.
BASE_MAP_TILES = {}
# The most recently rendered frame (a list of row strings) per
# (width, height, center) key.
MAP_STRINGS = {}
def get_weather_tile(rain_value, wind_value):
    """Pick the colored ASCII tile for one cell of weather data.

    A wind value within 0.03 of zero always yields a wind tile: hurricane
    winds ("^YH") when rain is at least 0.7, strong wind ("^wW")
    otherwise.  Outside that band the rain value alone decides between
    lightning ("^CL"), heavy rain ("^cR"), light rain ("^bR"), or no
    weather tile at all.

    :param float rain_value: The amount of rain, from -1 to 1
    :param float wind_value: The amount of wind, from -1 to 1
    :returns str: The ASCII tile, or None if the cell has no weather

    """
    if abs(wind_value) <= 0.03:
        return "^YH" if rain_value >= 0.7 else "^wW"
    # Checked from the highest threshold down; the first match wins.
    rain_tiles = ((0.6, "^CL"), (0.4, "^cR"), (0.3, "^bR"))
    for threshold, tile in rain_tiles:
        if rain_value >= threshold:
            return tile
    return None
def generate_base_map(width=80, height=24, center=(0, 0)):
    """Render and cache the base map tiles for one window size.

    The result is stored in BASE_MAP_TILES under the key
    (width, height, center), replacing any previous entry.  Tiles are
    rendered without color conversion and without being joined.

    :param int width: The width of the map
    :param int height: The height of the map
    :param tuple(int, int) center: The center coordinate of the map
    :returns None:

    """
    map_layers = _render_map_data(width, height, center=center)
    BASE_MAP_TILES[(width, height, center)] = render_map_from_layers(
        *map_layers, convert_color=False, join_tiles=False)
def _build_weather_sidebar():
    """Build the sidebar lines describing both weather patterns.

    :returns list(str): The sidebar text, one entry per display line

    """
    return ("^MSeed: {:>#9.8}\n^YTime: {:>#6.5g}\n\n"
            "^BRain Storms:\n"
            "^GXY: {:=+#8g} {:=+#8g}\n^CMovement: {} @ {:#.4}^~\n\n"
            "^BWind Storms:\n"
            "^GXY: {:=+#8g} {:=+#8g}\n^CMovement: {} @ {:#.4}^~\n\n"
            "^wStorm Legend:\n"
            "^KLight ^bR^Kain\n^KHeavy ^cR^Kain\n"
            "^CL^Kightning\n^KStrong ^wW^Kind\n"
            "^YH^Kurricane Winds!^~"
            .format(RAIN_PATTERN.seed,
                    round(RAIN_PATTERN.time_offset, 3),
                    round(RAIN_PATTERN._offset_x[1], 3),
                    round(RAIN_PATTERN._offset_y[1], 3),
                    RAIN_PATTERN.get_wind_direction(),
                    round(RAIN_PATTERN.get_wind_speed(), 3),
                    round(WIND_PATTERN._offset_x[1], 3),
                    round(WIND_PATTERN._offset_y[1], 3),
                    WIND_PATTERN.get_wind_direction(),
                    round(WIND_PATTERN.get_wind_speed(), 3))
            .split("\n"))


def update_weather_data():
    """Update the current weather data for all window sizes.

    Rebuilds MAP_SIZES from every session whose window size has been
    negotiated, advances both weather patterns, and stores one list of
    rendered row strings per window size in MAP_STRINGS.  Frames for
    window sizes that no session uses any more are removed.

    :returns None:

    """
    MAP_SIZES.clear()
    for session in SESSIONS.all():
        if session.client.telnet_opt_dict[NAWS].reply_pending:
            # Their window size isn't known yet.
            continue
        # Leave 25 columns for the sidebar and 2 rows around the map.
        width, height = session.client.columns - 25, session.client.rows - 2
        center = (0, 0)  # TODO: option sent from client?
        MAP_SIZES.add((width, height, center))
    # Frames are only ever read for keys in MAP_SIZES, so anything else is
    # stale; drop it rather than keeping one frame per size ever seen.
    for stale_key in MAP_STRINGS.keys() - MAP_SIZES:
        del MAP_STRINGS[stale_key]
    RAIN_PATTERN.update()
    WIND_PATTERN.update()
    if not MAP_SIZES:
        return
    # The sidebar doesn't depend on the window size; build it once.
    sidebar = _build_weather_sidebar()
    for map_key in MAP_SIZES:
        width, height, center = map_key
        if map_key not in BASE_MAP_TILES:
            generate_base_map(width, height, center)
        base_tiles = BASE_MAP_TILES[map_key]
        rain_data = RAIN_PATTERN.generate_layer(width, height, center=center,
                                                octaves=2, fine_noise=0.1)
        wind_data = WIND_PATTERN.generate_layer(width, height, center=center,
                                                octaves=2, fine_noise=0.05)
        # TODO: precolor strings for efficiency?
        rows = []
        for y, (rain_row, wind_row) in enumerate(zip(rain_data, wind_data)):
            tiles = []
            for x, (rain, wind) in enumerate(zip(rain_row, wind_row)):
                tile = get_weather_tile(rain, wind)
                if tile is None:
                    # No weather here, show the terrain underneath.
                    tile = base_tiles[y][x]
                tiles.append(tile)
            tiles.append("^~")
            row = "".join(tiles)
            if y < len(sidebar):
                row = row + " " + sidebar[y]
            rows.append(row)
        MAP_STRINGS[map_key] = rows
def send_weather_frame_smart(session):
    """Draw the current weather frame in place with escape sequences.

    Does nothing until the session's window size has been negotiated.
    The first call for a session clears the screen, writes the title,
    sets up the terminal and marks the session "ready"; each call then
    redraws the frame starting at line 2 and puts the cursor back where
    it was.

    :param Session session: The session to send the weather frame to
    :returns None:

    """
    # TODO: transition messages, "Please wait.." etc
    if session.client.telnet_opt_dict[NAWS].reply_pending:
        # We're not done negotiating their screen size.
        return
    width = session.client.columns - 25
    height = session.client.rows - 2
    esc = chr(27)
    if "ready" not in session.flags:
        # One-time window setup.
        session.send(esc, "[2J")  # Clear their screen.
        session.send(esc, "[1;1H")  # Cursor to the top-left corner.
        session.send("AccuWeather 5000 Storm Tracking System")
        # Scrolling region covers the map rows, below the title line.
        session.send(esc, "[2;{}r".format(height + 1))
        session.send(esc, "[{};1H".format(height + 2))
        # NOTE: a "Command: " prompt used to be sent here; it is disabled.
        session.flags.add("ready")
    frame_key = (width, height, (0, 0))
    if frame_key not in MAP_SIZES:
        return
    session.send(esc, "7")  # Save cursor position.
    line = 2
    for row in MAP_STRINGS[frame_key]:
        # TODO: keep set of last frame and only send delta
        session.send(esc, "[{};1H".format(line), row)
        line += 1
    session.send(esc, "8")  # Restore cursor position.
def send_weather_frame_dumb(session):
    """Send the current weather frame as plain lines of text.

    No cursor positioning is used: the title and every row of the frame
    are sent one after another, each followed by a newline.  Nothing is
    sent until the session's window size has been negotiated and a frame
    exists for that size.

    :param Session session: The session to send the weather frame to
    :returns None:

    """
    if session.client.telnet_opt_dict[NAWS].reply_pending:
        # We're not done negotiating their screen size.
        return
    frame_key = (session.client.columns - 25,
                 session.client.rows - 2,
                 (0, 0))
    if frame_key not in MAP_SIZES:
        return
    session.send("\nAccuWeather 5000 Storm Tracking System\n")
    for text in MAP_STRINGS[frame_key]:
        session.send(text, "\n")
@TIMERS.create("10 pulses", repeat=-1)
def _update_frame():
    """Refresh the weather data and send a new frame to every session.

    Registered as a timer with a "10 pulses" duration.
    NOTE(review): repeat=-1 presumably means repeat forever -- confirm
    against TIMERS.create.

    Sessions whose terminal type is listed as dumb get the plain-text
    frame; all others get the cursor-addressed frame.

    :returns None:

    """
    update_weather_data()
    dumb_terminals = ("blowtorch",)
    for session in SESSIONS.all():
        terminal = session.client.terminal_type.lower()
        if terminal in dumb_terminals:
            send_frame = send_weather_frame_dumb
        else:
            send_frame = send_weather_frame_smart
        send_frame(session)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.