Skip to content

Instantly share code, notes, and snippets.

@prashantkumarpathak
Last active July 25, 2021 07:27
Show Gist options
  • Save prashantkumarpathak/060bbbbffe17843d84cfa4ba9a705f6a to your computer and use it in GitHub Desktop.
Save prashantkumarpathak/060bbbbffe17843d84cfa4ba9a705f6a to your computer and use it in GitHub Desktop.
PyTest Code for Verifying the Logs Flow Between Components
"""
Sample PyTest Code to Demonstrate the System Integration Testing of an Application
"""
import re
import pytest
import paramiko
@pytest.fixture
def _do_ssh_logging(request, cluster_ip):
# Validate the IP pattern
valid_ip_pattern = re.compile(r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)')
is_ip_good = valid_ip_pattern.match(cluster_ip)
if is_ip_good:
print(f" SSH to {cluster_ip} ")
logging_client = paramiko.SSHClient()
logging_client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
logging_client.connect(cluster_ip, 22, 'user_name', 'password')
print("Connected Successfully !!!")
request.cls._client = logging_client
yield
logging_client.close()
else:
print("Invalid IP Address Entered !!!")
exit()
class TestLogFlow:
"""
Test to verify the syslog has been received to the component_1 from various syslog servers
"""
# Consider the component_1 ip s 10.100.0.1
@pytest.mark.parametrize('cluster_ip', ['10.100.0.1'])
def test_component1_receives_data(self, _do_ssh_logging):
"""
Input : Do a NetCat of a sample logs (for your testing) and it will get stored inside /rsyslog/Internal.syslog.log
This operation is to replicate the scenario which is similar like receiving syslog from external sources in real time.
Place a sample.log to your component_1 node and NetCat using the command shown below
Here 514 is the port on which component_1 is receiving logs
"""
stdin, stdout, stderr = self._client.exec_command("echo <path to your file> | nc localhost 514", get_pty=True)
assert stderr.channel.recv_exit_status() == 0
print(f" NetCat command is successfully!!!\n ")
# Command to verify the logs which sends from a outside server in this case from NetCat command has been received
grep_command= 'cat /opt/data/input/rsyslog/Internal.syslog.log | grep -o -i "Testing Purpose String"'
stdin, stdout, stderr = self._client.exec_command(grep_command, get_pty=True)
print("Output is ####", stdout.readlines())
print(f" Grep command ran successfully for the pattern!!! \n")
assert stderr.channel.recv_exit_status() == 0
"""
Test to verify the syslogs from component_1 is forwarded successfully to component_2 under rsyslog folder
"""
# Consider the component_2 ip s 10.100.0.2
@pytest.mark.parametrize('cluster_ip', ['10.100.0.2'])
def test_component2_receives_data(self, _do_ssh_logging):
# Command to verify the component_2 is able to receives logs from component_1 through syslog forwarding
grep_command= 'cat /opt/data/input/rsyslog/Internal.syslog.log | grep -o -i "Testing Purpose String"'
stdin, stdout, stderr = self._client.exec_command(grep_command, get_pty=True)
print("Output is ####", stdout.readlines())
print(f" Grep command ran successfully for the pattern!!! \n")
assert stderr.channel.recv_exit_status() == 0
"""
The below step is to replicate a real application scenario where the application is applying algorithm to detect
the alerts on raw logs received from data lake sources i.e. component_1 in this case.
If the logs receives at rsyslog has a specific pattern as per requirement in this case "location_flag_set:true"
then add an alert flag in the logs and forward it to component_3 which is responsible for creating incident ID.
"""
print(f" Checking if the logs has pattern to identify as Alert ")
grep_command= 'cat /opt/data/input/rsyslog/Internal.syslog.log | grep -o -i "location_flag_set:true"'
stdin, stdout, stderr = self._client.exec_command(grep_command, get_pty=True)
print("Output is ####", stdout.readlines())
print(f" Alert Check has been done !!! \n")
assert stderr.channel.recv_exit_status() == 0
# Update the log by adding an Alert flag pattern and it will get forwarded to component_3 i.e. incident creation
print(f" Applying the Alert Rule Algorithm on the log ")
log_path = '/opt/data/input/rsyslog/Internal.syslog.log'
sed_cmd = "sed -i 's/location_flag_set/alert_type location_flag_set/' " + log_path
stdin, stdout, stderr = self._client.exec_command(sed_cmd, get_pty=True)
print(f" Alert Rule has been applied successfully !!! \n")
assert stderr.channel.recv_exit_status() == 0
# Consider the component_3 ip s 10.100.0.3
@pytest.mark.parametrize('cluster_ip', ['10.100.0.3'])
def test_component3_creates_tickets(self, _do_ssh_logging):
print(f" Verify the component_3 is able to receive logs from component_2 through syslog forwarder")
grep_command = 'cat /opt/data/input/rsyslog/Incident_Responder.log | grep -o -i "alert_type"'
stdin, stdout, stderr = self._client.exec_command(grep_command, get_pty=True)
print("Output is ####", stdout.readlines())
print(f" Grep command ran successfully for the pattern!!! \n")
assert stderr.channel.recv_exit_status() == 0
"""
The below step is replication of real time application where based on the log contains alert or not
it will create an Incident Responder with a unique ID and the same can be verify from that service log.
"""
# Update the log by adding an Incident ID
print(f" Creating the Incident ID by adding a ID number in the logs ")
log_path = '/opt/data/input/rsyslog/Incident_Responder.log'
sed_cmd = "sed -i 's/alert_type/IR-ID-3243 alert_type/' " + log_path
stdin, stdout, stderr = self._client.exec_command(sed_cmd, get_pty=True)
print(f" Incident ID has been created successfully !!! \n")
assert stderr.channel.recv_exit_status() == 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment