Skip to content

Instantly share code, notes, and snippets.

@leafsummer
Created July 23, 2019 10:59
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save leafsummer/7dfc2496869ab886d4bb8b36546b9e92 to your computer and use it in GitHub Desktop.
[collect_linux_info]
import sys
import socket
from zipfile import ZipFile
import logging
from datetime import datetime
import os
import shutil
import subprocess
import glob
import pwd
import re
import csv
import grp
import time
try:
from hashlib import md5 as m_md5
except ImportError:
from md5 import md5 as m_md5
# --- limits and filters -------------------------------------------------
size_max_log = 10 * 1024 * 1024  # skip /var/log files larger than 10 MiB
start_fs = '/'                   # root of the filesystem being collected
skipped_dir = []                 # directory prefixes excluded from the FS timeline
if skipped_dir:
    skipped_dir = [os.path.join(start_fs, d) for d in skipped_dir]
mime_filter = []                 # reserved for mime-type filtering (currently unused)
level_debug = logging.INFO       # default log level (overridden by --debug)

# --- files and directories to collect -----------------------------------
# NOTE(review): os.path.join drops start_fs when the second argument is
# absolute; with start_fs = '/' the resulting path is identical either way.
etc_passwd = os.path.join(start_fs, '/etc/passwd')
etc_shadow = os.path.join(start_fs, '/etc/shadow')
etc_bashrc = os.path.join(start_fs, '/etc/bash.bashrc')
etc_profile = os.path.join(start_fs, '/etc/profile')
etc_cron = os.path.join(start_fs, '/etc/cron.*')
var_cron = os.path.join(start_fs, '/var/spool/cron/')
etc_folder_d = os.path.join(start_fs, '/etc/*.d')
# misc persistence / config locations collected by Dump.autorun()
etc_other = [
    os.path.join(start_fs, '/etc/ld.so.preload'),
    os.path.join(start_fs, '/etc/rc.local'),
    os.path.join(start_fs, '/etc/init/'),
    os.path.join(start_fs, '/etc/rsyslog.conf'),
    os.path.join(start_fs, '/etc/audit/'),
]
systemd_usr = os.path.join(start_fs, '/usr/lib/systemd/')
systemd_etc = os.path.join(start_fs, '/etc/systemd/')
# glob patterns of the log files worth zipping (Dump.collect_log())
var_log_to_collect = [os.path.join(start_fs, p) for p in [
    '/var/log/message*',
    '/var/log/secure*',
    '/var/log/maillog*',
    '/var/log/cron*',
    '/var/log/spooler*',
    '/var/log/boot.log*',
    '/var/log/auth.log*',
    '/var/log/syslog*',
    '/var/log/kern.log*',
    '/var/log/mail.log*',
    '/var/log/mail.err*',
    '/var/log/ufw.log*',
    '/var/log/audit/audit.log*',
    '/var/log/wtmp*',
    '/var/log/btmp*',
    '/var/run/utmp*',
]]

# --- external commands (argv lists for subprocess) ----------------------
netstat = ['netstat', '-anpetul']
ss = ['ss', '-tp']
ps = ['ps', '-ewo', '%p,%P,%x,%t,%u,%c,%a']
last = ['last', '-Faixw']
lsof = ['lsof', '-R']
du = ['du', '-sh']
fdisk = ['fdisk', '-l']
hostname = ['hostname']
uname = ['uname', '-r']
ifconfig = ['ifconfig', '-a']
os_version = ['cat', '/proc/version']
whoami = ['who', 'am', 'i']
uname_os_name = ['uname']
lsmod = ['lsmod']
arp = ['arp', '-a']
route = ['ip', 'route']
top = ['top', '-b', '-n', '1', '-d', '1', '-c']

# --- output parsing ------------------------------------------------------
# Fixed: raw string — '\s' etc. inside a non-raw literal is a
# DeprecationWarning on Python 3.6+ and will become a SyntaxError.
# Groups: user, way of connection, date, remote IPv4 address.
pattern_last_output = r"([^\s]+)\s+(\([^\)]*\)|system \S+|\S+)(.+[^\d])(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"

# --- CSV headers for the generated output files --------------------------
header_ss = ['State', 'Recv-Q', 'Send-Q', 'Local Address:Port', 'Peer Address:Port', 'Users']
header_last_output = ['User', 'Way of connection', 'Date', 'Remote host']
header_netstat = ['Proto', 'Recv-Q', 'Send-Q', 'Local Address', 'Remote Address', 'State', 'User', 'Inode',
                  'PID/Program name']
header_fs = ['path', 'mime', 'filesize', 'owner', 'group', 'atime', 'mtime', 'ctime', 'inode']
header_lsmod = ['Module', 'Size', 'Used_by_Count', 'Used_by_Modules']
class utils(object):
    """Static helpers: command execution, CSV/file output, zipping and
    filesystem walking."""

    def __init__(self, args):
        self.args = args

    def walk(self, path):
        # not implemented: placeholder for a filtered filesystem walk
        pass

    @staticmethod
    def exec_cmd(cmd, raw_res=False):
        """Run `cmd` (argv list, no shell).

        Returns stdout as a list of lines with newlines stripped, or the
        raw stdout text when raw_res is True. stderr is discarded.
        """
        # universal_newlines=True so stdout is text on Python 3 as well
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)
        if raw_res:
            return proc.stdout.read()
        return [line.replace("\n", "") for line in proc.stdout]

    @staticmethod
    def exec_cmd_file(cmd):
        """Run `cmd` and return its stdout stream; the caller iterates it."""
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)
        return proc.stdout

    @staticmethod
    def open_csv(header, fname):
        """Create `fname`, write the CSV header, return the DictWriter.

        NOTE: the underlying file handle stays open for the lifetime of
        the returned writer.
        """
        writer = csv.DictWriter(open(fname, 'w'), header)
        if hasattr(writer, 'writeheader'):  # missing on very old Pythons
            writer.writeheader()
        return writer

    @staticmethod
    def writerow(row, writer):
        """Write a single dict `row` through `writer`."""
        writer.writerow(row)

    @staticmethod
    def write_to_csv(rows, header, fname):
        """Write `rows` (list of dicts) to `fname` as CSV.

        The FIRST row and every empty dict are skipped: callers feed
        command output whose first entry is the command's own header
        line, and list_to_map() emits {} for malformed lines.
        NOTE(review): this also drops the first real row for callers
        whose data has no header line (e.g. process.csv) — preserved
        for compatibility; confirm before changing.
        """
        f = open(fname, "w")
        writer = csv.DictWriter(f, fieldnames=header)
        if hasattr(writer, 'writeheader'):
            writer.writeheader()
        count = 0
        for row in rows:
            if count != 0 and row != {}:
                writer.writerow(row)
            count += 1

    @staticmethod
    def write_to_file(fname, data):
        """Write the elements of `data` (string or iterable of strings)
        to `fname`."""
        # with-block fixes the original's handle leak on write errors
        with open(fname, "w") as f:
            for elem in data:
                f.write(elem)

    @staticmethod
    def list_to_map(keys, data, delimiter=None):
        """Zip each line of `data` with `keys` into a dict.

        A line is split on `delimiter` when given, otherwise it must
        already be a sequence. Lines whose field count differs from
        len(keys) yield an empty dict (dropped later by write_to_csv).
        """
        res = []
        for item in data:
            fields = item.split(delimiter) if delimiter else item
            row = {}
            if len(fields) == len(keys):
                for k, v in zip(keys, fields):
                    row[k] = v
            res.append(row)
        return res

    @staticmethod
    def du(path):
        """Disk usage of `path` in human readable form (e.g. '2,1G').

        Fixed: the original did `du.append(path)` which mutated the
        module-level `du` command list and passed None (append's return
        value) to check_output.
        """
        # `du` here resolves to the module-level ['du', '-sh'] argv list
        return subprocess.check_output(du + [path]).split()[0].decode('utf-8')

    @staticmethod
    def zip_file(list_to_zip, zip_filename, output_dir, logger):
        """Zip every path of `list_to_zip` into output_dir/zip_filename.

        Hidden-path components are renamed ('/.x' -> '/_x') inside the
        archive; unreadable entries are logged and skipped.
        """
        my_zip = ZipFile(os.path.join(output_dir, zip_filename), 'w', allowZip64=True)
        for path in list_to_zip:
            new_path = path.replace('/.', '/_')
            try:
                my_zip.write(path, new_path)
            except Exception as e:  # Py3-compatible except syntax
                # e.strerror only exists on OSError; fall back to str(e)
                logger.warning('%s %s' % (getattr(e, 'strerror', e), path))
        my_zip.close()

    @staticmethod
    def convert_timestamp(timestamp):
        """Format a unix timestamp as local 'YYYY-MM-DD HH:MM:SS'."""
        return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')

    @staticmethod
    def os_type():
        """Return `uname` output ('Linux\\n', ...), or 'mac' when the
        command cannot be run."""
        try:
            return utils.exec_cmd(uname_os_name, True)
        except Exception:
            return "mac"

    @staticmethod
    def get_globs(path):
        """Expand glob `path`; directories are recursed into, files kept."""
        output = []
        for entry in glob.glob(path):
            if os.path.isdir(entry):
                output.extend(utils.walk_dir(entry))
            else:
                output.append(entry)
        return output

    @staticmethod
    def get_globs_nowalking(path):
        """Expand glob `path` without recursing into directories."""
        return glob.glob(path)

    @staticmethod
    def walk_dir(path):
        """Return the full paths of every file under `path` (recursive)."""
        output = []
        for root, dirs, files in os.walk(path):
            for f in files:
                output.append(os.path.join(root, f))
        return output
class FileSystem(object):
    """Builds fs.csv: one row per file on disk with mime type, size,
    ownership and timestamps (a filesystem timeline)."""

    def __init__(self, args):
        self.args = args

    def _check_dir(self, f):
        """Return True unless `f` lies under one of the skipped_dir prefixes."""
        if not skipped_dir:
            return True
        return not any(f.startswith(d) for d in skipped_dir)

    def get_infos_fs(self):
        """Walk the filesystem from start_fs and stream one metadata row
        per file into <output_dir>/fs.csv."""
        writer = None
        self.args['logger'].info('Start make time line FS')
        for dir_name, _subdirs, file_list in os.walk(start_fs):
            for fname in file_list:
                path = os.path.join(dir_name, fname)
                if not self._check_dir(path):
                    continue
                record = self._get_file_metadata(path)
                if not record:
                    continue
                record['path'] = path
                # `file` is spawned once per file: slow but dependency-free.
                # Output looks like "<path>: <mime description>".
                res = utils.exec_cmd(['file', path])
                record['mime'] = res[0].split(':')[1].lstrip()
                if not writer:
                    # Fixed: open_csv() already writes the header; the
                    # original called writeheader() a second time here,
                    # producing a duplicated header row.
                    writer = utils.open_csv(header_fs,
                                            os.path.join(self.args['output_dir'], 'fs.csv'))
                writer.writerow(record)
        self.args['logger'].info('Timeline FS done')

    def _get_file_metadata(self, fname):
        """lstat `fname` and return a dict keyed like header_fs, or None
        when the file cannot be stat'ed."""
        try:
            stats = os.lstat(fname)
        except Exception:
            return None
        try:
            pwd_struct = pwd.getpwuid(stats.st_uid)
        except Exception:
            pwd_struct = None  # uid with no passwd entry
        try:
            grp_struct = grp.getgrgid(stats.st_gid)
        except Exception:
            grp_struct = None  # gid with no group entry
        meta_data = {
            'filesize': stats.st_size,
            'mtime': utils.convert_timestamp(stats.st_mtime),
            'atime': utils.convert_timestamp(stats.st_atime),
            'ctime': utils.convert_timestamp(stats.st_ctime),
            'owner': (stats.st_uid, pwd_struct),
            'group': (stats.st_gid, grp_struct),
            'inode': stats.st_ino,
        }
        if hasattr(stats, 'st_birthtime'):
            # NOTE(review): 'crtime' is not in header_fs, so DictWriter
            # would raise on this key — only reachable on BSD/macOS where
            # st_birthtime exists. Confirm before relying on it.
            meta_data['crtime'] = utils.convert_timestamp(stats.st_birthtime)
        return meta_data
class LiveInformations(object):
    """Collects volatile ("live") system state: processes, sockets,
    logons, kernel modules, open handles and general host facts.

    Every public ``get_*``/``zzz_*`` method is invoked automatically by
    Factory.execute() in alphabetical order, so method names double as
    the collection plan.
    """

    def __init__(self, args=None):
        self.args = args
        # free-form host facts, flushed by get_additionnal_info()
        self._info_path = os.path.join(self.args["output_dir"], "additionnal_informations.txt")
        self._additional_info = {}
        self._package_mgr = None  # 'rpm' or 'dpkg', set by _get_distinfo()

    def get_processes_new(self):
        """Walk /proc and write one CSV row per running process
        (start time, user, pid, ppid, resolved binary path, md5, cmdline)."""
        def _readline(path):
            f = open(path)
            line = f.readline().strip()
            f.close()
            return line

        def _tstostr(ts):
            return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts))

        def _md5(fname):
            hash_md5 = m_md5()
            f = open(fname, "rb")
            # Fixed: sentinel must be b"" so the loop terminates on Py3
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
            f.close()
            return hash_md5.hexdigest()

        self.args['logger'].info('Get process list')
        pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
        header = ['starttime', 'user', 'pid', 'ppid', 'path', 'md5', 'cmdline']
        rows = []
        for pid in pids:
            try:
                proc = '/proc/{0}/'.format(pid)
                pstat = os.stat(proc)
                uid = pstat.st_uid
                ctime = pstat.st_ctime
                comm = _readline(proc + 'comm')
                cmdline = _readline(proc + 'cmdline').strip('\x00').replace('\x00', ' ')
                stat = _readline(proc + 'stat')
                ppid = stat.split(' ')[3]
                cwd = os.readlink(proc + 'cwd')
                try:
                    user = pwd.getpwuid(uid)[0]
                except KeyError:
                    user = str(uid)  # uid without a passwd entry
                try:
                    exe = os.readlink(proc + 'exe')
                except OSError:
                    exe = None  # kernel thread or permission denied
                if exe is None:
                    path = '[' + comm + ']'  # kernel-thread convention
                elif os.path.exists(cwd + exe):
                    # relative exe link: resolve against the process cwd
                    path = os.path.realpath(cwd + exe)
                else:
                    path = exe
                md5 = _md5(path) if os.path.exists(path) else ""
                rows.append({'starttime': _tstostr(ctime), 'user': user, 'pid': pid,
                             'ppid': ppid, 'path': path, 'md5': md5, 'cmdline': cmdline})
            except Exception as e:  # process may have exited mid-walk
                self.args['logger'].warning(e)
                self.args['logger'].warning("Get process {0} info failed".format(pid))
        self.args['logger'].info('Write in csv file %s ' % os.path.join(self.args['output_dir'], "process.csv"))
        utils.write_to_csv(rows, header, os.path.join(self.args['output_dir'], "process.csv"))

    def _get_header(self, data, delimiter, offset=0):
        """Split line `offset` of `data` on `delimiter` and return its
        non-empty, space-stripped fields."""
        header = []
        for field in data[offset].split(delimiter):
            field = field.replace(" ", "")
            if field != "":
                header.append(field)
        return header

    def _delete_spaces(self, data):
        """Collapse whitespace: each line of `data` becomes the list of
        its non-empty space-separated tokens."""
        res_without_spaces = []
        for line in data:
            # Fixed: the original compared with `is not ""` (identity);
            # equality is the correct check.
            res_without_spaces.append([tok for tok in line.split(' ') if tok != ""])
        return res_without_spaces

    def _get_kernel_version(self):
        """Record `uname -r` output under the 'kernel' key."""
        try:
            self.args['logger'].info(' '.join(uname))
            self._additional_info['kernel'] = utils.exec_cmd(uname, True).rstrip()
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(uname))

    def _get_os_infos(self):
        """Record /proc/version under the 'os_informations' key."""
        try:
            self.args['logger'].info(' '.join(os_version))
            self._additional_info["os_informations"] = utils.exec_cmd(os_version, True).rstrip()
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(os_version))

    def _get_user(self):
        """Record the currently logged-on user (`who am i`)."""
        try:
            self.args['logger'].info(' '.join(whoami))
            self._additional_info["user"] = utils.exec_cmd(whoami, True)
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(whoami))

    def _get_network_card(self):
        """Dump full `ifconfig -a` output to ifconfig_all and record the
        first three lines of each interface block."""
        try:
            self.args['logger'].info(' '.join(ifconfig))
            res = utils.exec_cmd(ifconfig, True).rstrip()
            self.args['logger'].info('Write in txt file %s ' % os.path.join(self.args['output_dir'], "ifconfig_all"))
            utils.write_to_file(os.path.join(self.args['output_dir'], "ifconfig_all"), res)
            cards = res.split("\n\n")  # ifconfig separates interfaces with a blank line
            count = 1
            for c in cards:
                li = c.split('\n')
                self._additional_info["network_card_" + str(count)] = li[0] + "\n\t" + li[1] + "\n\t" + li[2]
                count += 1
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(ifconfig))

    def _get_hostname(self):
        """Record the machine hostname."""
        try:
            self.args['logger'].info(' '.join(hostname))
            self._additional_info['hostname'] = utils.exec_cmd(hostname, True).rstrip()
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(hostname))

    def _get_distinfo(self):
        """Detect the distribution family and remember which package
        manager (_package_mgr) zzz_get_file_integrity() should use."""
        if os.path.exists('/etc/redhat-release'):
            self._package_mgr = 'rpm'
            f = open('/etc/redhat-release')
            self._additional_info['dist-version'] = f.readline().strip()
            f.close()
        elif os.path.exists('/etc/issue'):
            self._package_mgr = 'dpkg'
            f = open('/etc/issue')
            self._additional_info['dist-version'] = f.readline().strip()
            f.close()

    def get_arp_table(self):
        """Parse `arp -a` ('host (ip) at mac [type] on iface') into
        arp_table.csv."""
        try:
            self.args['logger'].info(' '.join(arp))
            res = utils.exec_cmd(arp)
            header = ['hostname', 'ip', 'mac', 'interface']
            rows = []
            for line in res:
                fields = line.split(" ")
                # fields[1] is '(ip)': strip the surrounding parentheses
                rows.append({'hostname': fields[0], 'ip': fields[1][1:-1],
                             'mac': fields[3], 'interface': fields[6]})
            self.args['logger'].info('Write in csv file %s ' % os.path.join(self.args['output_dir'], "arp_table.csv"))
            utils.write_to_csv(rows, header, os.path.join(self.args['output_dir'], "arp_table.csv"))
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(arp))

    def get_route_table(self):
        """Dump `ip route` output to route_table."""
        try:
            self.args['logger'].info(' '.join(route))
            res = utils.exec_cmd(route, True)
            self.args['logger'].info('Write in txt file %s ' % os.path.join(self.args['output_dir'], "route_table"))
            utils.write_to_file(os.path.join(self.args['output_dir'], "route_table"), res)
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(route))

    def get_top(self):
        """Dump one batch iteration of `top` to a text file."""
        try:
            self.args['logger'].info(' '.join(top))
            res = utils.exec_cmd(top, True)
            self.args['logger'].info('Write in txt file %s ' % os.path.join(self.args['output_dir'], "top"))
            utils.write_to_file(os.path.join(self.args['output_dir'], "top"), res)
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(top))

    def get_network_connections(self):
        """Dump socket tables from both `ss -tp` and `netstat -anpetul`."""
        try:
            self.args['logger'].info(' '.join(ss))
            res = utils.exec_cmd(ss)
            rows = utils.list_to_map(header_ss, self._delete_spaces(res))
            self.args['logger'].info('Write in csv file %s ' % os.path.join(self.args['output_dir'], "ss_sockets.csv"))
            utils.write_to_csv(rows, header_ss, os.path.join(self.args['output_dir'], "ss_sockets.csv"))
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(ss))
        try:
            self.args['logger'].info(' '.join(netstat))
            res = utils.exec_cmd(netstat)
            cleaned = self._delete_spaces(res)
            del cleaned[0]  # drop the 'Active Internet connections' banner
            rows = utils.list_to_map(header_netstat, cleaned)
            self.args['logger'].info(
                'Write in csv file %s ' % os.path.join(self.args['output_dir'], "netstat_sockets.csv"))
            utils.write_to_csv(rows, header_netstat, os.path.join(self.args['output_dir'], "netstat_sockets.csv"))
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(netstat))

    def get_logon(self):
        """Parse `last -Faixw` (logons with an IPv4 source) into logon.csv."""
        try:
            res = utils.exec_cmd(last)
            self.args['logger'].info(' '.join(last))
            parsed = []
            for line in res:
                row = []
                matched = re.match(pattern_last_output, line)
                if matched:
                    row.append(matched.group(1))
                    row.append(matched.group(2))
                    row.append(matched.group(3).strip())
                    row.append(matched.group(4))
                # Non-matching lines contribute an empty row, which
                # list_to_map turns into {} and write_to_csv drops.
                # (The original guard `if temp is not []` was always true.)
                parsed.append(row)
            rows = utils.list_to_map(header_last_output, parsed)
            self.args['logger'].info('Write in csv file %s ' % os.path.join(self.args['output_dir'], "logon.csv"))
            utils.write_to_csv(rows, header_last_output, os.path.join(self.args['output_dir'], "logon.csv"))
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(last))

    def get_handle(self):
        """Dump `lsof -R` (open handles with parent pids) to handle.txt."""
        try:
            res = utils.exec_cmd_file(lsof)
            self.args['logger'].info(' '.join(lsof))
            utils.write_to_file(os.path.join(self.args["output_dir"], "handle.txt"), res)
            self.args['logger'].info('Write in text file %s ' % os.path.join(self.args['output_dir'], "handles.txt"))
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(lsof))

    def get_modules(self):
        """Parse loaded kernel modules (`lsmod`) into modules.csv."""
        try:
            self.args['logger'].info(' '.join(lsmod))
            res = utils.exec_cmd(lsmod)
            cleaned = self._delete_spaces(res)
            # Pad entries used by 0 modules with '-' so every row has the
            # same number of fields as header_lsmod.
            for entry in cleaned:
                if len(entry) == 3:
                    entry.append('-')
            rows = utils.list_to_map(header_lsmod, cleaned)
            self.args['logger'].info('Write in csv file %s ' % os.path.join(self.args['output_dir'], "modules.csv"))
            utils.write_to_csv(rows, header_lsmod, os.path.join(self.args['output_dir'], "modules.csv"))
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(lsmod))

    def get_additionnal_info(self):
        """Gather kernel/hostname/NIC/OS/user/dist facts and write them
        sorted to additionnal_informations.txt."""
        self._get_kernel_version()
        self._get_hostname()
        self._get_network_card()
        self._get_os_infos()
        self._get_user()
        self._get_distinfo()
        f = open(self._info_path, "w")
        for key, value in sorted(self._additional_info.items()):
            f.write(key + ' : ' + value + '\n')
        f.close()

    def zzz_get_file_integrity(self):
        """Verify installed packages (rpm -Va / dpkg -V).

        The 'zzz' prefix makes Factory.execute() (which calls public
        methods in dir() order) run this last, after
        get_additionnal_info() has set _package_mgr.
        """
        if self._package_mgr == 'rpm':
            comm = ['rpm', '-Va']
            out_file_name = 'file_integrity_rpm'
        else:
            comm = ['dpkg', '-V']
            out_file_name = 'file_integrity_dpkg'
        self.args['logger'].info(' '.join(comm))
        res = utils.exec_cmd(comm, True)
        self.args['logger'].info('Write in txt file %s ' % os.path.join(self.args['output_dir'], out_file_name))
        utils.write_to_file(os.path.join(self.args['output_dir'], out_file_name), res)
        # (removed the original's dead trailing try/pass/except block)
class Dump(object):
    """Collects file-based evidence: autorun/persistence files, user-home
    artefacts, /var/log content, directory trees and disk MBRs.

    Public methods are invoked automatically by Factory.execute();
    '_'-prefixed helpers are not.
    """

    def __init__(self, args):
        self.args = args
        self._homes = self._get_home()

    def _get_home(self):
        """Return the home directories (under /root or /home) of accounts
        with an interactive shell, parsed from /etc/passwd."""
        homes = []
        if os.path.isfile(etc_passwd):
            f = open(etc_passwd, 'r')
            for line in f:
                # Fixed: rstrip('\n') — the shell is the LAST passwd field,
                # so '/bin/false\n' never matched the exclusion list.
                fields = line.rstrip('\n').split(':')
                if len(fields) < 7:
                    continue  # malformed passwd line
                if fields[6] not in ['/bin/false', '/usr/sbin/nologin', '/bin/sync'] and \
                        (fields[5].startswith('/root') or fields[5].startswith('/home')):
                    homes.append(fields[5])
            f.close()
        return list(set(homes))

    def get_temp_dirtree(self):
        """Write the full /tmp file listing to tmp_dirtree.txt."""
        files = list(set(utils.walk_dir('/tmp')))
        self.args['logger'].info('Log tmp tree to tmp_dirtree.txt')
        utils.write_to_file(os.path.join(self.args['output_dir'], 'tmp_dirtree.txt'), '\n'.join(files))

    def get_varlog_dirtree(self):
        """Write the full /var/log file listing to varlog_dirtree.txt."""
        files = list(set(utils.walk_dir('/var/log')))
        # Fixed log text: the original said 'tmp tree' here
        self.args['logger'].info('Log /var/log tree to varlog_dirtree.txt')
        utils.write_to_file(os.path.join(self.args['output_dir'], 'varlog_dirtree.txt'), '\n'.join(files))

    def autorun(self):
        """Collect persistence locations (/etc/*.d, cron, systemd units,
        rc.local, init, audit/rsyslog configs) into autorun.zip."""
        file_to_zip = []
        self.args['logger'].info('Collect %s ' % etc_folder_d)
        file_to_zip.extend(utils.get_globs(etc_folder_d))
        self.args['logger'].info('Collect %s ' % etc_cron)
        file_to_zip.extend(utils.get_globs(etc_cron))
        self.args['logger'].info('Collect %s ' % var_cron)
        file_to_zip.extend(utils.walk_dir(var_cron))
        self.args['logger'].info('Collect %s ' % etc_other)
        for d in etc_other:
            if os.path.isdir(d):
                file_to_zip.extend(utils.walk_dir(d))
            else:
                file_to_zip.append(d)
        self.args['logger'].info('Collect %s ' % systemd_usr)
        file_to_zip.extend(utils.walk_dir(systemd_usr))
        self.args['logger'].info('Collect %s ' % systemd_etc)
        file_to_zip.extend(utils.walk_dir(systemd_etc))
        self.args['logger'].info('Zip file autorun.zip')
        utils.zip_file(list(set(file_to_zip)), 'autorun.zip', self.args['output_dir'], self.args['logger'])

    def collect_users(self):
        """Zip account files (/etc/passwd, shadow, bashrc, profile) plus
        each user's hidden files and .ssh directory into users_home.zip,
        and write a <home>_dirtree.txt listing per home."""
        list_to_zip = []
        self.args['logger'].info('Collect users')
        for candidate in (etc_passwd, etc_shadow, etc_bashrc, etc_profile):
            if os.path.isfile(candidate):
                list_to_zip.append(candidate)
        for home in self._homes:
            if not os.path.exists(home):
                continue
            # hidden files directly under the home (dotfiles)
            for fpath in utils.get_globs_nowalking(os.path.join(start_fs, os.path.join(home, '.*'))):
                if os.path.isfile(fpath):
                    list_to_zip.append(fpath)
            # whole .ssh directory (keys, authorized_keys, known_hosts)
            list_to_zip.extend(utils.walk_dir(os.path.join(start_fs, os.path.join(home, '.ssh'))))
            # full home dirtree listing (not zipped, just logged)
            files = list(set(utils.walk_dir(home)))
            dirtree_filename = os.path.join(
                self.args['output_dir'],
                '{0}_dirtree.txt'.format(home.replace('/', '_').strip('_')))
            self.args['logger'].info('Log home dirtree to {0}'.format(dirtree_filename))
            utils.write_to_file(dirtree_filename, '\n'.join(files))
        utils.zip_file(list_to_zip, 'users_home.zip', self.args['output_dir'], self.args['logger'])

    def _collect_ssh_profile(self):
        """Zip every user's ~/.ssh/known_hosts, if any exist.
        ('_'-prefixed, so not run automatically by Factory.)"""
        self.args['logger'].info('Collect Know Hosts')
        known_hosts = []
        for home in self._homes:
            if os.path.exists(home):
                known_hosts.extend(glob.glob(os.path.join(start_fs, os.path.join(home, '.ssh/known_hosts'))))
        if known_hosts:
            utils.zip_file(known_hosts, 'know_hosts.zip', self.args['output_dir'], self.args['logger'])

    def _collect_log_bak(self):
        """Older variant of collect_log(): zip every /var/log file smaller
        than size_max_log. Kept for reference; not run automatically."""
        sizes = {}
        self.args['logger'].info('Zip of /var/log')
        for dir_name, _subdirs, file_list in os.walk(os.path.join(start_fs, '/var/log')):
            for fname in file_list:
                absolut_path = os.path.join(dir_name, fname)
                size = os.stat(absolut_path).st_size
                if size < size_max_log:
                    sizes[absolut_path] = size
        # (removed the original's unused sorted() result)
        utils.zip_file(list(sizes.keys()), 'var_log.zip', self.args['output_dir'], self.args['logger'])
        self.args['logger'].info('Zip of /var log is finished')

    def collect_log(self):
        """Zip the known-interesting /var/log globs into var_log.zip."""
        files_list_to_zip = []
        self.args['logger'].info('Collect logs from /var/log')
        for log in var_log_to_collect:
            files_list_to_zip.extend(utils.get_globs(log))
        utils.zip_file(files_list_to_zip, 'var_log.zip', self.args['output_dir'], self.args['logger'])
        # Fixed log typo: 'Clooect'
        self.args['logger'].info('Collect logs from /var/log is finished')

    def dump_dir(self):
        # not implemented: would collect directories passed as arguments
        pass

    def _active_part(self, block, disk):
        """Return `disk` when the fdisk output `block` marks one of its
        partitions bootable ('*'), else None."""
        for line in block.split("\n"):
            if disk in line and "*" in line:
                return disk
        return None

    def _list_disks(self, res):
        """Extract device names from fdisk output blocks
        ('Disk /dev/sda: ...', or localized forms such as 'Disque ...')."""
        disks = []
        for blocks in res:
            matchob = re.match(r"\n?Dis[a-z]{1,3}\s([^:]+)", blocks)
            if matchob:
                # strip UTF-8 non-breaking spaces emitted by localized fdisk
                disks.append(matchob.group(1).replace('\xc2\xa0', ''))
        return disks

    def _get_mbr(self, disks):
        """Dump the first 512 bytes (the MBR) of each disk to output_dir."""
        for d in disks:
            disk_name = d.replace("/", "_")
            f = open(d, "rb")
            output = open(os.path.join(self.args['output_dir'], "mbr" + disk_name), "wb")
            output.write(f.read(512))
            output.close()
            f.close()

    def dump_mbr(self):
        """Find disks with a bootable partition (fdisk -l) and dump their MBRs."""
        if utils.os_type() == "mac":
            return  # fdisk -l output differs on macOS; skip
        self.args['logger'].info('Collect active MBR')
        raw = utils.exec_cmd(fdisk, True)
        blocks = re.split(r"\n\s*\n", raw)  # fdisk separates disks with blank lines
        disks = self._list_disks(blocks)
        self.args['logger'].debug('Disks name : %s' % str(disks))
        has_active_part = []
        for block in blocks:
            for d in disks:
                m = self._active_part(block, d)
                if m:
                    has_active_part.append(m)
        if has_active_part:
            self._get_mbr(has_active_part)
class Factory(object):
    """Maps profile names to collector classes and drives a collection run.

    execute() instantiates each collector of each requested profile and
    calls every public attribute on it — collectors expose their work as
    public methods, so reflection is the dispatch mechanism.
    """

    def __init__(self, args):
        self.args = args
        self.profiles = {
            'fast': {'module': 'SglabIR_collector', 'class': [LiveInformations, Dump]},
            'all': {'module': 'SglabIR_collector', 'class': [LiveInformations, Dump, FileSystem]},
            'advanced': {'module': 'SglabIR_collector', 'class': [LiveInformations, Dump, FileSystem]},
            'dump': {'module': 'SglabIR_Collector', 'class': [Dump]},
        }

    def execute(self):
        """Run all collectors for every known profile in args['profiles'];
        unknown profile names are silently ignored."""
        for profile in self.args['profiles']:
            if profile not in self.profiles:
                continue
            for collector_cls in self.profiles[profile]['class']:
                collector = collector_cls(self.args)
                # dir() yields names alphabetically; skip 'args' and privates
                public_attrs = [a for a in dir(collector)
                                if a != 'args' and not a.startswith('_')]
                for attr in public_attrs:
                    getattr(collector, attr)()
def banner():
    """Print the SglabIR ASCII-art banner to stdout."""
    art = r"""
___ _ _ ___ ___
/ __| __ _| |__ _| |__|_ _| _ \
\__ \/ _` | / _` | '_ \| || /
|___/\__, |_\__,_|_.__/___|_|_\
|___/
"""
    print(art)
def set_logger(args):
    """Build the 'SglabIR' logger and store it in args['logger'].

    Two handlers are attached: a DEBUG-level file handler writing
    SglabIR.log inside args['output_dir'], and a stdout stream handler.
    args['level_debug'], when present, overrides the default level.
    """
    class InfoStreamHandler(logging.StreamHandler):
        # thin subclass kept for parity with the original design
        def __init__(self, stream):
            logging.StreamHandler.__init__(self, stream)

    logger = logging.getLogger("SglabIR")
    logger.setLevel(level_debug)
    if 'level_debug' in args:
        logger.setLevel(args['level_debug'])
    fmt = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    # file handler: always captures everything down to DEBUG
    file_handler = logging.FileHandler(os.path.join(args["output_dir"], "SglabIR.log"), encoding="UTF-8")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(fmt)
    logger.addHandler(file_handler)

    # stream handler: echoes to stdout at the requested level
    stream_handler = InfoStreamHandler(sys.stdout)
    stream_handler.setFormatter(fmt)
    if 'level_debug' in args:
        stream_handler.setLevel(args['level_debug'])
    logger.addHandler(stream_handler)

    args["logger"] = logger
def parse_command_line():
    """Parse command line arguments and return them as a dict.

    Keys: 'output_dir', 'dir_zip', 'profiles' (list), and optionally
    'level_debug'. Falls back to safe defaults when argparse is missing
    or parsing raises.
    """
    args = {}
    try:
        import argparse
        parser = argparse.ArgumentParser(description="SglabIR")
        parser.add_argument("--profiles", dest="profiles",
                            help=(
                                "List of profiles: fast,dump,all"
                                "\n use: --profiles fast or --profiles dump --profiles all"))
        parser.add_argument("--output_dir", dest="output_dir", help="Directory to extract data")
        parser.add_argument("--dir_zip", dest='dir_zip', help='directory to store zip')
        parser.add_argument("--debug", dest="debug", default=False, action='store_true', help="debug level")
        arguments = parser.parse_args()
        if not arguments.output_dir:
            print('No output directory specified. Using "output" as default')
            arguments.output_dir = 'output'
        if not arguments.profiles:
            print('No profile specified. Using "fast" as default')
            arguments.profiles = 'fast'
        args['output_dir'] = arguments.output_dir
        args['dir_zip'] = arguments.dir_zip
        if not arguments.dir_zip:
            args['dir_zip'] = args['output_dir']
        # profiles are ';'-separated on the command line
        args['profiles'] = arguments.profiles.split(';')
        if arguments.debug:
            args['level_debug'] = logging.DEBUG
    except Exception:
        # Fixed: `except Exception, e` is Python-2-only syntax and `e`
        # was unused. Any parsing failure falls back to defaults.
        args['output_dir'] = 'output'
        args['dir_zip'] = 'output'
        args['profiles'] = ['fast']
    return args
def create_output_dir(args):
    """Create and return <output_dir>/<hostname>/<timestamp>.

    Exits with -1 when the directory cannot be created.
    """
    hostname = socket.gethostname()
    date_collect = datetime.now().strftime("%Y-%m-%d_%H%M%S")
    path = os.path.join(args['output_dir'], hostname, date_collect)
    try:
        os.makedirs(path)
    except OSError as e:
        # Fixed: the original's `except Exception, e` is Python-2-only
        # syntax, and its later `except ValueError` / bare `except`
        # clauses were unreachable (already swallowed by Exception).
        print("I/O error({0}): {1}".format(e.errno, e.strerror))
        sys.exit(-1)
    return path
def set_zip_evidences(args):
    """Zip the evidence tree into <timestamp>_<hostname>.zip inside
    args['dir_zip'], then delete the collected directory tree.

    NOTE: the whole <output_dir>/<hostname> parent is removed afterwards.
    """
    evidence_dir = args['output_dir']
    # last two path components, reversed: <timestamp>_<hostname>.zip
    reversed_parts = evidence_dir.split(os.path.sep)[::-1]
    zip_name = reversed_parts[0] + '_' + reversed_parts[1] + '.zip'
    zip_path = os.path.join(args['dir_zip'], zip_name)
    args['logger'].info('Create zip File %s ' % zip_name)
    archive = ZipFile(zip_path, 'w', allowZip64=True)
    for current_dir, _subdirs, files in os.walk(evidence_dir, topdown=False):
        for fname in files:
            src = os.path.join(current_dir, fname)
            # hidden components renamed inside the archive: '/.x' -> '/_x'
            archive.write(src, src.replace('/.', '/_'))
    archive.close()
    shutil.rmtree(os.path.dirname(evidence_dir))
    args['logger'].info('Delete folder %s' % evidence_dir)
def main():
    """Entry point: require root, parse args, create the output tree,
    configure logging, run the selected collectors, then zip the evidence."""
    if os.geteuid() != 0:
        # most collectors read root-only files (/etc/shadow, raw disks...)
        print('This program should be run as root.')
        sys.exit(-1)
    args = parse_command_line()
    args['output_dir'] = create_output_dir(args)
    set_logger(args)
    collector_factory = Factory(args)
    collector_factory.execute()
    set_zip_evidences(args)
# Script entry point: show the banner, then run the collection.
if __name__ == '__main__':
    banner()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment