Skip to content

Instantly share code, notes, and snippets.

@khirazo
Last active December 18, 2023 03:43
Show Gist options
  • Save khirazo/d979417011d8fcbab0af6a8135191021 to your computer and use it in GitHub Desktop.
Save khirazo/d979417011d8fcbab0af6a8135191021 to your computer and use it in GitHub Desktop.
Send UDP timestamped Syslog messages for SIEM test purpose
'''
Created on 2023/03/04
@author: khirazo
'''
import logging.handlers
import time, threading
from datetime import datetime
# logging.basicConfig(level = logging.DEBUG)
UDP_SERVER = "127.0.0.1"
UDP_PORT = 514
BASE_MESSAGE = "Syslog Message Test - device_time={} index={} seq={} [end]"
MESSAGES_PER_THREAD = 100
NUM_THREADS = 5
# MESSAGES_PER_THREAD 100 * NUM_THREADS 5 = 500 EPS
SLEEP_INTERVAL_SEC = 1
def get_timestamped_message(index=0, counter=0):
now = datetime.now()
return BASE_MESSAGE.format(now.strftime("%Y/%m/%d %H:%M:%S.%f"), index, counter)
class SyslogThread(threading.Thread):
def __init__(self, ts_logger, thread_num, start_num):
super().__init__()
self.ts_logger = ts_logger
self.thread_num = thread_num
self.start_num = start_num
def run(self):
for i in range(MESSAGES_PER_THREAD):
self.ts_logger.info(get_timestamped_message(self.thread_num, self.start_num + i))
def main():
counter = 0
ts_logger = logging.getLogger('timestamp_sender')
ts_logger.setLevel(logging.INFO)
handler = logging.handlers.SysLogHandler(address = (UDP_SERVER, UDP_PORT))
ts_logger.addHandler(handler)
while True:
start_time = time.perf_counter()
threads = []
for i in range(NUM_THREADS):
thread = SyslogThread(ts_logger, i, counter + i * MESSAGES_PER_THREAD)
thread.start()
threads.append(thread)
for thread in threads:
thread.join(timeout=900)
counter += NUM_THREADS * MESSAGES_PER_THREAD
end_time = time.perf_counter()
time.sleep(SLEEP_INTERVAL_SEC - (end_time - start_time))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment