Skip to content

Instantly share code, notes, and snippets.

@windo
Created May 13, 2024 21:13
Show Gist options
  • Save windo/39d4e4bdccadea8a40a347d4b1c5d467 to your computer and use it in GitHub Desktop.
Save windo/39d4e4bdccadea8a40a347d4b1c5d467 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3.11
from concurrent import futures
import datetime
import json
import logging
import random
import requests
import time
# Server under test; every request goes to https://HOST/api/...
HOST = 'pseudovote.net'
# Bulletin token: part of the stream URL and sent as 'bulletin_token' when voting.
BULLETIN = 'mfivknreecph'
# Size of the thread pool that runs the voters.
THREADS = 100
# Total number of simulated voters to create.
VOTERS = 10_000
# Seconds slept between two progress reports in the wait loop.
INTERVAL = 5.0
# Time budget handed to every Voter as its `linger` argument.
LINGER_UPTO = datetime.timedelta(seconds=10)
# Threshold passed to logging.basicConfig().
LEVEL = logging.INFO
class Voter:
    """One simulated voter: follows a bulletin's event stream and casts a vote.

    The voter opens ``https://<host>/api/bulletin/<bulletin>`` as a streaming
    response, counts ``event: ping`` lines, tracks the ``state`` field of JSON
    ``data:`` lines and, while that state is ``'wait-end'``, POSTs its vote to
    ``https://<host>/api/vote`` until one attempt succeeds.
    """

    def __init__(
        self, host: str, bulletin: str, pseudonym: str, vote: str,
        linger: datetime.timedelta,
    ):
        """Store the configuration and reset all counters.

        Args:
            host: Host name used to build the request URLs.
            bulletin: Bulletin token; used in the stream URL and sent as
                ``bulletin_token`` with the vote.
            pseudonym: Value sent as ``pseudonym`` with the vote.
            vote: Value sent as ``content`` with the vote.
            linger: How long ``vote()`` keeps reading the stream before it
                stops on its own.
        """
        self._host = host
        self._bulletin = bulletin
        self._pseudonym = pseudonym
        self._vote = vote
        self._linger = linger
        # Last state reported by the stream; None until one has been seen.
        self._state: str | None = None
        # Set by cancel() to make vote() stop.
        self._done = False
        self._pings = 0
        self._vote_attempts = 0
        self._success = False

    def cancel(self):
        """Request that a running ``vote()`` loop stops.

        The flag is only checked after a stream line has been processed, so a
        voter blocked waiting for the next line does not stop immediately.
        """
        self._done = True

    @property
    def pings(self) -> int:
        """Number of ``event: ping`` lines received so far."""
        return self._pings

    @property
    def vote_attempts(self) -> int:
        """Number of vote POSTs that returned a response."""
        return self._vote_attempts

    @property
    def success(self) -> bool:
        """Whether a vote POST has been answered with an OK status."""
        return self._success

    def _try_vote(self):
        """POST the vote once, unless a previous attempt already succeeded.

        A 429 response is treated as "try again later"; any other non-OK
        response is logged as a warning. Neither marks the vote as done, so
        the caller may retry.
        """
        if self._success:
            return
        vote = requests.request(
            'POST', f'https://{self._host}/api/vote', data={
                'pseudonym': self._pseudonym,
                'bulletin_token': self._bulletin,
                'content': self._vote,
            },
        )
        self._vote_attempts += 1
        if vote.status_code == 429:  # Too Many Requests
            # Try again later.
            logging.debug('Too many requests, will try to vote later')
            return
        if not vote.ok:
            logging.warning('Vote failed: %s %s', vote.status_code, vote.text)
        else:
            self._success = True

    def _handle_line(self, line: bytes) -> None:
        """Update ping count / tracked state from a single stream line."""
        if line == b'event: ping':
            logging.debug('ping')
            self._pings += 1
        elif line == b'':
            pass
        elif line.startswith(b'data: {'):
            try:
                d = json.loads(line[6:])
            except ValueError:
                # A malformed payload should not take the whole voter down.
                logging.warning('unknown input: %s', line)
                return
            state = d.get('state', None)
            if state == 'incoming-vote':
                logging.debug('vote seen')
            else:
                logging.debug('state %s', state)
                # NOTE(review): the source's indentation was lost; this assumes
                # 'incoming-vote' messages do not replace the tracked state.
                self._state = state
        elif line.startswith(b'data: '):
            # Non-JSON data payloads carry nothing this client uses.
            pass
        else:
            logging.warning('unknown input: %s', line)

    def vote(self):
        """Follow the bulletin stream and vote while the state is 'wait-end'.

        Reading stops once the linger time has elapsed or ``cancel()`` has been
        called; both conditions are evaluated after each received line. The
        streaming response is closed on every exit path so its connection is
        released.
        """
        start = datetime.datetime.now()
        end = start + self._linger
        # open stream
        with requests.request(
            'GET', f'https://{self._host}/api/bulletin/{self._bulletin}',
            stream=True,
        ) as stream:
            # linger
            for line in stream.iter_lines():
                self._handle_line(line)
                if not self.success and self._state == 'wait-end':
                    self._try_vote()
                if datetime.datetime.now() >= end:
                    break
                if self._done:
                    break
# Set up logging first so the messages below (and those from workers) are shown.
logging.basicConfig(level=LEVEL)

# Shared pool on which every Voter.vote call is scheduled.
logging.info('Creating %d executor threads', THREADS)
p = futures.ThreadPoolExecutor(max_workers=THREADS)
def generate_random() -> str:
    """Return 10 random bytes rendered as a 20-character hex string."""
    raw = random.randbytes(10)
    return raw.hex()
# Build every voter up front; each gets its own random pseudonym and vote.
logging.info('Creating %d voters', VOTERS)
voters = [
    Voter(
        host=HOST,
        bulletin=BULLETIN,
        linger=LINGER_UPTO,
        pseudonym=generate_random(),
        vote=generate_random(),
    )
    for _ in range(VOTERS)
]

# Queue one vote() call per voter on the pool.
logging.info('Kicking off voters')
waiting = [p.submit(voter.vote) for voter in voters]

logging.info('Waiting for results')
try:
    while waiting:
        time.sleep(INTERVAL)

        # Keep unfinished futures; report the exception of any that failed.
        pending = []
        for fut in waiting:
            if not fut.done():
                pending.append(fut)
                continue
            err = fut.exception()
            if err is not None:
                logging.warning('Exception from voter: %s', err)
        waiting = pending
        logging.debug('%d voters left', len(waiting))

        # Progress snapshot, printed once per polling interval.
        running = sum(fut.running() for fut in waiting)
        attempted = sum(voter.vote_attempts for voter in voters)
        succeeded = sum(voter.success for voter in voters)
        pinged = sum(voter.pings for voter in voters)
        print('futures running: ', running)
        print('votes attempted: ', attempted)
        print('votes succeeded: ', succeeded)
        print('pings received: ', pinged)
except KeyboardInterrupt:
    # Ctrl-C: flag every voter to stop and drop futures that have not started.
    logging.info('wrapping up')
    for voter in voters:
        voter.cancel()
    p.shutdown(wait=False, cancel_futures=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment