Created
July 23, 2019 10:59
Star
You must be signed in to star a gist
[collect_linux_info]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import socket | |
from zipfile import ZipFile | |
import logging | |
from datetime import datetime | |
import os | |
import shutil | |
import subprocess | |
import glob | |
import pwd | |
import re | |
import csv | |
import grp | |
import time | |
try: | |
from hashlib import md5 as m_md5 | |
except ImportError: | |
from md5 import md5 as m_md5 | |
# Log files of this size (in bytes) or larger are left out by
# Dump._collect_log_bak.
size_max_log = 10 * 1024 * 1024

# Root of the file system to inspect.
start_fs = '/'


def _under_root(path):
    """Return *path* anchored below ``start_fs``.

    os.path.join() throws away everything that precedes an absolute
    component, so ``os.path.join(start_fs, '/etc/passwd')`` ignores start_fs
    altogether.  Dropping the leading separator keeps the configured root;
    with the default root '/' the resulting paths are unchanged.
    """
    return os.path.join(start_fs, path.lstrip('/'))


# Directories excluded from the FileSystem timeline (prefix match on the
# full path).  Entries are anchored below start_fs.
skipped_dir = []
skipped_dir = [_under_root(d) for d in skipped_dir]

# NOTE(review): not referenced anywhere in the visible part of the file.
mime_filter = []

# Default logger level, see set_logger().
level_debug = logging.INFO

# Paths of the files to collect
etc_passwd = _under_root('/etc/passwd')
etc_shadow = _under_root('/etc/shadow')
etc_bashrc = _under_root('/etc/bash.bashrc')
etc_profile = _under_root('/etc/profile')
etc_cron = _under_root('/etc/cron.*')
var_cron = _under_root('/var/spool/cron/')
etc_folder_d = _under_root('/etc/*.d')
etc_other = [
    _under_root('/etc/ld.so.preload'),
    _under_root('/etc/rc.local'),
    _under_root('/etc/init/'),
    _under_root('/etc/rsyslog.conf'),
    _under_root('/etc/audit/'),
]
systemd_usr = _under_root('/usr/lib/systemd/')
systemd_etc = _under_root('/etc/systemd/')
var_log_to_collect = [
    _under_root('/var/log/message*'),
    _under_root('/var/log/secure*'),
    _under_root('/var/log/maillog*'),
    _under_root('/var/log/cron*'),
    _under_root('/var/log/spooler*'),
    _under_root('/var/log/boot.log*'),
    _under_root('/var/log/auth.log*'),
    _under_root('/var/log/syslog*'),
    _under_root('/var/log/kern.log*'),
    _under_root('/var/log/mail.log*'),
    _under_root('/var/log/mail.err*'),
    _under_root('/var/log/ufw.log*'),
    _under_root('/var/log/audit/audit.log*'),
    _under_root('/var/log/wtmp*'),
    _under_root('/var/log/btmp*'),
    _under_root('/var/run/utmp*'),
]

# Commands to launch (argument vectors handed to subprocess)
netstat = ['netstat', '-anpetul']
ss = ['ss', '-tp']
ps = ['ps', '-ewo', '%p,%P,%x,%t,%u,%c,%a']
last = ['last', '-Faixw']
lsof = ['lsof', '-R']
du = ['du', '-sh']
fdisk = ['fdisk', '-l']
hostname = ['hostname']
uname = ['uname', '-r']
ifconfig = ['ifconfig', '-a']
os_version = ['cat', '/proc/version']
whoami = ['who', 'am', 'i']
uname_os_name = ['uname']
lsmod = ['lsmod']
arp = ['arp', '-a']
route = ['ip', 'route']
top = ['top', '-b', '-n', '1', '-d', '1', '-c']

# Output parsing: one line of `last` split into user, way of connection,
# date span and remote IPv4 address.  Raw string so the regex escapes are
# not treated as (invalid) string escapes.
pattern_last_output = r"([^\s]+)\s+(\([^\)]*\)|system \S+|\S+)(.+[^\d])(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"

# Column headers of the generated CSV files
header_ss = ['State', 'Recv-Q', 'Send-Q', 'Local Address:Port', 'Peer Address:Port', 'Users']
header_last_output = ['User', 'Way of connection', 'Date', 'Remote host']
header_netstat = ['Proto', 'Recv-Q', 'Send-Q', 'Local Address', 'Remote Address', 'State', 'User', 'Inode',
                  'PID/Program name']
header_fs = ['path', 'mime', 'filesize', 'owner', 'group', 'atime', 'mtime', 'ctime', 'inode']
header_lsmod = ['Module', 'Size', 'Used_by_Count', 'Used_by_Modules']
class utils(object):
    """Namespace of helper routines shared by the collector classes.

    Apart from the unimplemented ``walk`` every helper is a static method,
    so the class is used without being instantiated
    (``utils.exec_cmd(...)``).
    """

    def __init__(self, args):
        self.args = args

    def walk(self, path):
        # Not implemented.  Planned: traverse every file and directory below
        # *path*, possibly with a filter that excludes what is not on the
        # same device (e.g. an "all" / "device only" switch in args).
        pass

    @staticmethod
    def _to_text(value):
        """Decode *value* when it is a Python 3 ``bytes`` object.

        On Python 2 ``bytes`` is ``str``, so values pass through untouched.
        """
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return value

    @staticmethod
    def exec_cmd(cmd, raw_res=False):
        """Run *cmd* and return what it wrote to stdout.

        :param cmd: argument vector, e.g. ``['uname', '-r']``.
        :param raw_res: when true return the whole output as one string,
            otherwise a list of lines without their trailing newline.
        :raises OSError: when the command cannot be started.
        """
        cmd_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes and waits for the child; reading
        # stdout only can block forever once the stderr pipe fills up.
        out = utils._to_text(cmd_process.communicate()[0])
        if raw_res:
            return out
        lines = out.split("\n")
        if lines and lines[-1] == "":
            # Output ending with a newline does not carry an extra empty line.
            lines.pop()
        return lines

    @staticmethod
    def exec_cmd_file(cmd):
        """Start *cmd* and return its stdout pipe for streaming.

        stderr is discarded: nobody reads it, and an unread pipe would stall
        the child as soon as it fills up.
        """
        devnull = open(os.devnull, 'w')
        try:
            cmd_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=devnull)
        finally:
            devnull.close()
        return cmd_process.stdout

    @staticmethod
    def open_csv(header, fname):
        """Create *fname*, write the header row and return the DictWriter.

        NOTE: only the writer is returned, so the underlying file stays open
        until the writer is garbage collected.
        """
        writer = csv.DictWriter(open(fname, 'w'), header)
        if hasattr(writer, 'writeheader'):  # missing before Python 2.7
            writer.writeheader()
        return writer

    @staticmethod
    def writerow(map, writer):
        """Write the dict *map* as one row through *writer*."""
        writer.writerow(map)

    @staticmethod
    def write_to_csv(map, header, fname, skip_first=True):
        """Write the list of dicts *map* to the CSV file *fname*.

        :param map: iterable of row dicts; empty dicts are skipped.
        :param header: column names, written as first line.
        :param fname: path of the file to create.
        :param skip_first: drop the first element of *map*.  This is the
            historical behaviour (the first element is assumed to be the
            parsed header line of a command output) and stays the default;
            pass False when *map* holds data rows only.
        """
        with open(fname, "w") as f:
            writer = csv.DictWriter(f, fieldnames=header)
            if hasattr(writer, 'writeheader'):  # missing before Python 2.7
                writer.writeheader()
            for index, row in enumerate(map):
                if (skip_first and index == 0) or row == {}:
                    continue
                writer.writerow(row)

    @staticmethod
    def write_to_file(f, data):
        """Write *data* (a string or an iterable of chunks) to the path *f*."""
        with open(f, "w") as out:
            if isinstance(data, (str, bytes)):
                # A string is written in one go rather than char by char.
                out.write(utils._to_text(data))
            else:
                for elem in data:
                    out.write(utils._to_text(elem))

    @staticmethod
    def list_to_map(keys, data, delimiter=None):
        """Turn every entry of *data* into a dict keyed by *keys*.

        An entry is split on *delimiter* when one is given.  Entries whose
        number of fields differs from ``len(keys)`` yield an empty dict, so
        the result always has one element per input entry.
        """
        res = []
        for d in data:
            line = d.split(delimiter) if delimiter else d
            if len(keys) == len(line):
                res.append(dict(zip(keys, line)))
            else:
                res.append({})
        return res

    @staticmethod
    def du(path):
        """disk usage in human readable format (e.g. '2,1GB')"""
        # Inside the function ``du`` is the module-level command list.  Build
        # a new list: list.append() returns None and would also grow the
        # shared command on every call.
        du_cmd = du + [path]
        return subprocess.check_output(du_cmd).split()[0].decode('utf-8')

    @staticmethod
    def zip_file(list_to_zip, zip_filename, output_dir, logger):
        """Store the files of *list_to_zip* in output_dir/zip_filename.

        '/.' is replaced by '/_' in the archive names.  A file that cannot
        be added is reported through ``logger.warning`` and skipped.
        """
        my_zip = ZipFile(os.path.join(output_dir, zip_filename), 'w', allowZip64=True)
        try:
            for path in list_to_zip:
                new_path = path.replace('/.', '/_')
                try:
                    my_zip.write(path, new_path)
                except Exception as e:
                    # Not every exception carries strerror (or it is None).
                    reason = getattr(e, 'strerror', None) or e
                    logger.warning('%s %s' % (reason, path))
        finally:
            my_zip.close()

    @staticmethod
    def convert_timestamp(timestamp):
        """Format an epoch *timestamp* as local 'YYYY-MM-DD HH:MM:SS'."""
        return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')

    @staticmethod
    def os_type():
        """Return the raw output of ``uname``, or "mac" when it cannot run.

        NOTE(review): callers compare the result with "mac", which is only
        returned on failure; confirm whether a "Darwin" answer was meant to
        map to "mac" as well.
        """
        try:
            return utils.exec_cmd(uname_os_name, True)
        except Exception:
            return "mac"

    @staticmethod
    def get_globs(path):
        """Expand the glob *path*; matched directories are walked recursively."""
        output = []
        for entry in glob.glob(path):
            if os.path.isdir(entry):
                output.extend(utils.walk_dir(entry))
            else:
                output.append(entry)
        return output

    @staticmethod
    def get_globs_nowalking(path):
        """Expand the glob *path* without descending into directories."""
        return glob.glob(path)

    @staticmethod
    def walk_dir(path):
        """Return the path of every file found below the directory *path*."""
        output = []
        for root, dirs, files in os.walk(path):
            output.extend(os.path.join(root, f) for f in files)
        return output
class FileSystem(object):
    """Builds a timeline (fs.csv) of every file found below ``start_fs``."""

    def __init__(self, args):
        self.args = args

    def _check_dir(self, f):
        """Return True unless the path *f* starts with an entry of skipped_dir."""
        return not any(f.startswith(d) for d in skipped_dir)

    def _get_mime(self, path):
        """Return the description printed by ``file`` for *path* ('' on failure).

        ``-b`` (brief) keeps the file name out of the output, so paths that
        contain ':' can no longer confuse the parsing.
        """
        try:
            res = utils.exec_cmd(['file', '-b', path])
        except OSError as e:
            self.args['logger'].debug('file command failed on %s: %s' % (path, e))
            return ''
        return res[0].strip() if res else ''

    def get_infos_fs(self):
        """Walk start_fs and write one row per file to output_dir/fs.csv.

        Files below a skipped directory and files that cannot be stat'ed are
        left out.  fs.csv is created when the first record is available.
        """
        logger = self.args['logger']
        logger.info('Start make time line FS')
        csv_file = None
        writer = None
        try:
            for dirName, subdirList, fileList in os.walk(start_fs):
                for f in fileList:
                    path = os.path.join(dirName, f)
                    if not self._check_dir(path):
                        continue
                    record = self._get_file_metada(path)
                    if not record:
                        continue
                    record['path'] = path
                    record['mime'] = self._get_mime(path)
                    if writer is None:
                        csv_file = open(os.path.join(self.args['output_dir'], 'fs.csv'), 'w')
                        # 'ignore': the optional 'crtime' key has no column in
                        # header_fs and would otherwise raise ValueError.
                        writer = csv.DictWriter(csv_file, header_fs, extrasaction='ignore')
                        if hasattr(writer, 'writeheader'):  # missing before Python 2.7
                            # Written once only; it used to be emitted twice.
                            writer.writeheader()
                    writer.writerow(record)
        finally:
            if csv_file is not None:
                csv_file.close()
        logger.info('Timeline FS done')

    def _get_file_metada(self, fname):
        """Return the lstat() based metadata of *fname*, or None on failure.

        The dict holds filesize, the three timestamps as text, inode, and
        owner / group as ``(id, pwd or grp entry or None)`` tuples.
        'crtime' is added when the platform exposes ``st_birthtime``.
        """
        try:
            stats = os.lstat(fname)
        except (OSError, ValueError):
            return None
        try:
            pwd_struct = pwd.getpwuid(stats.st_uid)
        except KeyError:
            # uid without a passwd entry
            pwd_struct = None
        try:
            grp_struct = grp.getgrgid(stats.st_gid)
        except KeyError:
            # gid without a group entry
            grp_struct = None
        meta_data = {
            'filesize': stats.st_size,
            'mtime': utils.convert_timestamp(stats.st_mtime),
            'atime': utils.convert_timestamp(stats.st_atime),
            'ctime': utils.convert_timestamp(stats.st_ctime),
            'owner': (stats.st_uid, pwd_struct),
            'group': (stats.st_gid, grp_struct),
            'inode': stats.st_ino,
        }
        if hasattr(stats, 'st_birthtime'):
            meta_data['crtime'] = utils.convert_timestamp(stats.st_birthtime)
        return meta_data
class LiveInformations(object):
    """Collector for the volatile state of the running system.

    Every public method runs one command (or reads /proc) and stores the
    result below args['output_dir'].  Factory.execute() calls each attribute
    that does not start with an underscore, in the sorted order returned by
    dir(); helpers therefore have to stay underscore-prefixed.  The ``zzz_``
    prefix presumably makes the integrity check run last, after
    get_additionnal_info() has detected the package manager.
    """

    def __init__(self, args=None):
        self.args = args
        self._info_path = os.path.join(self.args["output_dir"], "additionnal_informations.txt")
        self._additional_info = {}
        self._package_mgr = None

    # ------------------------------------------------------------ helpers

    @staticmethod
    def _readline(path):
        """Return the first line of *path*, stripped."""
        f = open(path)
        try:
            return f.readline().strip()
        finally:
            f.close()

    @staticmethod
    def _tstostr(ts):
        """Format the epoch value *ts* as local 'YYYY-MM-DD HH:MM:SS'."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts))

    @staticmethod
    def _md5(fname):
        """Return the hexadecimal MD5 digest of the file *fname*."""
        hash_md5 = m_md5()
        f = open(fname, "rb")
        try:
            # The sentinel must be bytes: on Python 3 a binary read never
            # returns "" and the loop would not end.
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        finally:
            f.close()
        return hash_md5.hexdigest()

    @staticmethod
    def _parse_ppid(stat_line):
        """Extract the parent pid from a /proc/<pid>/stat line.

        The second field is the command name in parentheses and may itself
        contain spaces, so the fields are counted from the last ')'.
        """
        closing = stat_line.rfind(')')
        if closing == -1:
            # Unexpected layout: keep the historical positional parsing.
            return stat_line.split(' ')[3]
        return stat_line[closing + 1:].split()[1]

    @staticmethod
    def _with_placeholder(rows):
        """Return *rows* preceded by an empty dict.

        utils.write_to_csv() discards the first element it receives (it
        assumes a parsed header line) as well as every empty dict.  The
        placeholder absorbs that, so no real record is lost.
        """
        return [{}] + list(rows)

    def _store_output(self, key, cmd, strip=True):
        """Run *cmd* and keep its output in the additional info under *key*."""
        try:
            self.args['logger'].info(' '.join(cmd))
            out = utils.exec_cmd(cmd, True)
            self._additional_info[key] = out.rstrip() if strip else out
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(cmd))

    def _dump_command(self, cmd, filename):
        """Run *cmd* and write its raw output to output_dir/filename."""
        try:
            self.args['logger'].info(' '.join(cmd))
            res = utils.exec_cmd(cmd, True)
            target = os.path.join(self.args['output_dir'], filename)
            self.args['logger'].info('Write in txt file %s ' % target)
            utils.write_to_file(target, res)
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(cmd))

    def _get_header(self, data, delimiter, offset=0):
        """Return the non-empty fields of data[offset] split on *delimiter*."""
        header = []
        for f in data[offset].split(delimiter):
            f = f.replace(" ", "")
            if f != "":
                header.append(f)
        return header

    def _delete_spaces(self, data):
        """Split every line of *data* on spaces and drop the empty fields."""
        # '!=' instead of 'is not': identity of string literals is an
        # implementation detail.
        return [[value for value in r.split(' ') if value != ""] for r in data]

    # ---------------------------------------------------------- processes

    def get_processes_new(self):
        """Write process.csv from /proc (start time, user, pids, path, md5, cmdline)."""
        logger = self.args['logger']
        logger.info('Get porcess list')
        pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
        header = ['starttime', 'user', 'pid', 'ppid', 'path', 'md5', 'cmdline']
        rows = []
        for pid in pids:
            try:
                proc = '/proc/{0}/'.format(pid)
                pstat = os.stat(proc)
                uid = pstat.st_uid
                ctime = pstat.st_ctime
                comm = self._readline(proc + 'comm')
                cmdline = self._readline(proc + 'cmdline').strip('\x00').replace('\x00', ' ')
                ppid = self._parse_ppid(self._readline(proc + 'stat'))
                cwd = os.readlink(proc + 'cwd')
                try:
                    user = pwd.getpwuid(uid)[0]
                except KeyError:
                    user = str(uid)
                try:
                    exe = os.readlink(proc + 'exe')
                except OSError:
                    # e.g. kernel threads have no exe link
                    exe = None
                if exe is None:
                    path = '[' + comm + ']'
                elif os.path.exists(cwd + exe):
                    path = os.path.realpath(cwd + exe)
                else:
                    path = exe
                digest = self._md5(path) if os.path.exists(path) else ""
                rows.append({'starttime': self._tstostr(ctime), 'user': user, 'pid': pid, 'ppid': ppid,
                             'path': path, 'md5': digest, 'cmdline': cmdline})
            except Exception as e:
                # The process may have exited meanwhile; go on with the others.
                logger.warning(e)
                logger.warning("Get process {0} info failed".format(pid))
        csv_path = os.path.join(self.args['output_dir'], "process.csv")
        logger.info('Write in csv file %s ' % csv_path)
        utils.write_to_csv(self._with_placeholder(rows), header, csv_path)

    # ------------------------------------------------- additional info

    def _get_kernel_version(self):
        self._store_output('kernel', uname)

    def _get_os_infos(self):
        self._store_output("os_informations", os_version)

    def _get_user(self):
        # get the user who is logged on (output kept unstripped)
        self._store_output("user", whoami, strip=False)

    def _get_hostname(self):
        self._store_output('hostname', hostname)

    def _get_network_card(self):
        """Save the ifconfig output and summarise each interface."""
        try:
            self.args['logger'].info(' '.join(ifconfig))
            res = utils.exec_cmd(ifconfig, True).rstrip()
            target = os.path.join(self.args['output_dir'], "ifconfig_all")
            self.args['logger'].info('Write in txt file %s ' % target)
            utils.write_to_file(target, res)
            for count, card in enumerate(res.split("\n\n"), 1):
                # First three lines of the interface block; a shorter block
                # no longer aborts the whole summary.
                self._additional_info["network_card_" + str(count)] = "\n\t".join(card.split('\n')[:3])
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(ifconfig))

    def _get_distinfo(self):
        """Record the distribution banner and guess the package manager.

        NOTE(review): any system with /etc/issue but without
        /etc/redhat-release is assumed to use dpkg.
        """
        if os.path.exists('/etc/redhat-release'):
            self._package_mgr = 'rpm'
            release_file = '/etc/redhat-release'
        elif os.path.exists('/etc/issue'):
            self._package_mgr = 'dpkg'
            release_file = '/etc/issue'
        else:
            return
        f = open(release_file)
        try:
            self._additional_info['dist-version'] = f.readline().strip()
        finally:
            f.close()

    def get_additionnal_info(self):
        """Gather kernel, host, network, OS, user and distribution facts
        and write them as 'key : value' lines to additionnal_informations.txt."""
        self._get_kernel_version()
        self._get_hostname()
        self._get_network_card()
        self._get_os_infos()
        self._get_user()
        self._get_distinfo()
        f = open(self._info_path, "w")
        try:
            for key, value in sorted(self._additional_info.items()):
                f.write('%s : %s\n' % (key, value))
        finally:
            f.close()

    # ------------------------------------------------------------ network

    def get_arp_table(self):
        """Write arp_table.csv from the output of ``arp -a``."""
        try:
            self.args['logger'].info(' '.join(arp))
            res = utils.exec_cmd(arp)
            header = ['hostname', 'ip', 'mac', 'interface']
            rows = []
            for line in res:
                fields = line.split(" ")
                if len(fields) < 4:
                    continue
                # Lines for unresolved entries are shorter; the interface is
                # assumed to be their last field -- TODO confirm.  One such
                # line used to discard the whole table.
                interface = fields[6] if len(fields) > 6 else fields[-1]
                rows.append({'hostname': fields[0], 'ip': fields[1][1:-1], 'mac': fields[3],
                             'interface': interface})
            csv_path = os.path.join(self.args['output_dir'], "arp_table.csv")
            self.args['logger'].info('Write in csv file %s ' % csv_path)
            utils.write_to_csv(self._with_placeholder(rows), header, csv_path)
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(arp))

    def get_route_table(self):
        """Save the output of ``ip route`` to route_table."""
        self._dump_command(route, "route_table")

    def get_top(self):
        """Save one batch iteration of ``top`` to the file top."""
        self._dump_command(top, "top")

    def get_network_connections(self):
        """Write ss_sockets.csv (ss -tp) and netstat_sockets.csv (netstat)."""
        logger = self.args['logger']
        try:
            logger.info(' '.join(ss))
            table = self._delete_spaces(utils.exec_cmd(ss))
            rows = utils.list_to_map(header_ss, table)
            csv_path = os.path.join(self.args['output_dir'], "ss_sockets.csv")
            logger.info('Write in csv file %s ' % csv_path)
            utils.write_to_csv(rows, header_ss, csv_path)
        except Exception:
            logger.error("%s command failed" % ' '.join(ss))
        try:
            logger.info(' '.join(netstat))
            table = self._delete_spaces(utils.exec_cmd(netstat))
            # Drop the first output line; the column header line that follows
            # is skipped by write_to_csv.
            del table[0]
            rows = utils.list_to_map(header_netstat, table)
            csv_path = os.path.join(self.args['output_dir'], "netstat_sockets.csv")
            logger.info('Write in csv file %s ' % csv_path)
            utils.write_to_csv(rows, header_netstat, csv_path)
        except Exception:
            logger.error("%s command failed" % ' '.join(netstat))

    # ------------------------------------------------- logons and handles

    def get_logon(self):
        """Write logon.csv: every user who connected to the machine,
        including the ones still logged on (output of ``last``)."""
        try:
            res = utils.exec_cmd(last)
            self.args['logger'].info(' '.join(last))
            records = []
            for r in res:
                matchObj = re.match(pattern_last_output, r)
                if matchObj:
                    records.append([matchObj.group(1), matchObj.group(2),
                                    matchObj.group(3).strip(), matchObj.group(4)])
            rows = utils.list_to_map(header_last_output, records)
            csv_path = os.path.join(self.args['output_dir'], "logon.csv")
            self.args['logger'].info('Write in csv file %s ' % csv_path)
            utils.write_to_csv(self._with_placeholder(rows), header_last_output, csv_path)
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(last))

    def get_handle(self):
        """Stream the output of ``lsof`` to handle.txt."""
        try:
            res = utils.exec_cmd_file(lsof)
            self.args['logger'].info(' '.join(lsof))
            target = os.path.join(self.args["output_dir"], "handle.txt")
            utils.write_to_file(target, res)
            # Report the file really written (the message used to name
            # 'handles.txt').
            self.args['logger'].info('Write in text file %s ' % target)
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(lsof))

    def get_modules(self):
        """Write modules.csv from the output of ``lsmod``."""
        try:
            self.args['logger'].info(' '.join(lsmod))
            table = self._delete_spaces(utils.exec_cmd(lsmod))
            # Add '-' to module entries that are used by 0 modules
            # so that every entry has same number of fields
            for entry in table:
                if len(entry) == 3:
                    entry.append('-')
            rows = utils.list_to_map(header_lsmod, table)
            csv_path = os.path.join(self.args['output_dir'], "modules.csv")
            self.args['logger'].info('Write in csv file %s ' % csv_path)
            utils.write_to_csv(rows, header_lsmod, csv_path)
        except Exception:
            self.args['logger'].error("%s command failed" % ' '.join(lsmod))

    def zzz_get_file_integrity(self):
        """Save the package verification report (``rpm -Va`` or ``dpkg -V``).

        A failure is now logged against the command that really ran instead
        of escaping from the method.
        """
        if self._package_mgr == 'rpm':
            comm = ['rpm', '-Va']
            out_file_name = 'file_integrity_rpm'
        else:
            comm = ['dpkg', '-V']
            out_file_name = 'file_integrity_dpkg'
        self._dump_command(comm, out_file_name)
class Dump(object):
    """Collector that archives files: autoruns, user profiles, logs, MBR.

    Factory.execute() calls every attribute that does not start with an
    underscore, so helpers have to stay underscore-prefixed.
    """

    def __init__(self, args):
        self.args = args
        self._homes = self._get_home()

    # ------------------------------------------------------------ helpers

    @staticmethod
    def _parse_homes(lines):
        """Return the distinct /root and /home* directories of passwd *lines*.

        Accounts whose shell is /bin/false, /usr/sbin/nologin or /bin/sync
        are left out.  The shell is the last field of the line and carries
        the newline, which has to go before the comparison -- otherwise no
        account is ever filtered.  Lines with fewer than 7 fields are ignored.
        """
        skipped_shells = ('/bin/false', '/usr/sbin/nologin', '/bin/sync')
        homes = set()
        for line in lines:
            fields = line.rstrip('\r\n').split(':')
            if len(fields) < 7:
                continue
            home, shell = fields[5], fields[6]
            if shell in skipped_shells:
                continue
            if home.startswith('/root') or home.startswith('/home'):
                homes.add(home)
        return list(homes)

    def _get_home(self):
        """Read etc_passwd and return the home directories worth collecting."""
        if not os.path.isfile(etc_passwd):
            return []
        f = open(etc_passwd, 'r')
        try:
            return self._parse_homes(f)
        finally:
            f.close()

    def _save_dirtree(self, root, filename, message):
        """Write the list of files found below *root* to output_dir/filename."""
        listing = list(set(utils.walk_dir(root)))
        self.args['logger'].info(message)
        utils.write_to_file(os.path.join(self.args['output_dir'], filename), '\n'.join(listing))

    # -------------------------------------------------------- dir listings

    def get_temp_dirtree(self):
        """List every file below /tmp in tmp_dirtree.txt."""
        self._save_dirtree('/tmp', 'tmp_dirtree.txt', 'Log tmp tree to tmp_dirtree.txt')

    def get_varlog_dirtree(self):
        """List every file below /var/log in varlog_dirtree.txt."""
        self._save_dirtree('/var/log', 'varlog_dirtree.txt', 'Log tmp tree to varlog_dirtree.txt')

    # ------------------------------------------------------------ archives

    def autorun(self):
        """Archive the start-up related configuration into autorun.zip:
        /etc/*.d, cron, rc.local, init, rsyslog, audit and systemd units."""
        logger = self.args['logger']
        file_to_zip = []
        logger.info('Collect %s ' % etc_folder_d)
        file_to_zip.extend(utils.get_globs(etc_folder_d))
        logger.info('Collect %s ' % etc_cron)
        file_to_zip.extend(utils.get_globs(etc_cron))
        logger.info('Collect %s ' % var_cron)
        file_to_zip.extend(utils.walk_dir(var_cron))
        logger.info('Collect %s ' % etc_other)
        for d in etc_other:
            if os.path.isdir(d):
                file_to_zip.extend(utils.walk_dir(d))
            else:
                file_to_zip.append(d)
        logger.info('Collect %s ' % systemd_usr)
        file_to_zip.extend(utils.walk_dir(systemd_usr))
        logger.info('Collect %s ' % systemd_etc)
        file_to_zip.extend(utils.walk_dir(systemd_etc))
        logger.info('Zip file autorun.zip')
        utils.zip_file(list(set(file_to_zip)), 'autorun.zip', self.args['output_dir'], logger)

    def collect_users(self):
        """Archive account files and per-user hidden files into users_home.zip
        and write one <home>_dirtree.txt listing per home directory."""
        logger = self.args['logger']
        list_to_zip = []
        logger.info('Collect users')
        for candidate in (etc_passwd, etc_shadow, etc_bashrc, etc_profile):
            if os.path.isfile(candidate):
                list_to_zip.append(candidate)
        for home in self._homes:
            if not os.path.exists(home):
                continue
            # collect hidden files
            for fpath in utils.get_globs_nowalking(os.path.join(start_fs, os.path.join(home, '.*'))):
                if os.path.isfile(fpath):
                    list_to_zip.append(fpath)
            # collect .ssh directory
            list_to_zip.extend(utils.walk_dir(os.path.join(start_fs, os.path.join(home, '.ssh'))))
            # collect home dirtree
            tree = list(set(utils.walk_dir(home)))
            dirtree_filename = os.path.join(self.args['output_dir'],
                                            '{0}_dirtree.txt'.format(home.replace('/', '_').strip('_')))
            logger.info('Log home dirtree to {0}'.format(dirtree_filename))
            utils.write_to_file(dirtree_filename, '\n'.join(tree))
        utils.zip_file(list_to_zip, 'users_home.zip', self.args['output_dir'], logger)

    def _collect_ssh_profile(self):
        """Archive every existing ~/.ssh/known_hosts into know_hosts.zip."""
        self.args['logger'].info('Collect Know Hosts')
        list_knows_host = []
        for home in self._homes:
            if os.path.exists(home):
                list_knows_host.extend(glob.glob(os.path.join(start_fs, os.path.join(home, '.ssh/known_hosts'))))
        if list_knows_host:
            utils.zip_file(list_knows_host, 'know_hosts.zip', self.args['output_dir'], self.args['logger'])

    def _collect_log_bak(self):
        """Archive every file below /var/log smaller than size_max_log."""
        selected = []
        self.args['logger'].info('Zip of /var/log')
        for dirName, subdirList, fileList in os.walk(os.path.join(start_fs, '/var/log')):
            for fname in fileList:
                absolut_path = os.path.join(dirName, fname)
                try:
                    size = os.stat(absolut_path).st_size
                except OSError:
                    # dangling symlink or file rotated away meanwhile
                    continue
                if size < size_max_log:
                    selected.append(absolut_path)
        utils.zip_file(selected, 'var_log.zip', self.args['output_dir'], self.args['logger'])
        self.args['logger'].info('Zip of /var log is finished')

    def collect_log(self):
        """Archive the log files matched by var_log_to_collect into var_log.zip."""
        files_list_to_zip = []
        self.args['logger'].info('Collect logs from /var/log')
        for log in var_log_to_collect:
            files_list_to_zip.extend(utils.get_globs(log))
        utils.zip_file(files_list_to_zip, 'var_log.zip', self.args['output_dir'], self.args['logger'])
        self.args['logger'].info('Clooect logs from /var/log is finished')

    def dump_dir(self):
        # Not implemented.  Planned: collect every directory passed as argument.
        pass

    # ----------------------------------------------------------------- MBR

    def _active_part(self, block, disk):
        """Return *disk* when a line of *block* mentions it together with '*'
        (the boot flag in the fdisk listing), else None."""
        for line in block.split("\n"):
            if disk in line and "*" in line:
                return disk

    def _list_disks(self, res):
        """Return the device names announced by the 'Disk <name>:' blocks of *res*."""
        disks = []
        for blocks in res:
            # 'Dis' + 1..3 letters also matches localised output ('Disque').
            matchob = re.match(r"\n?Dis[a-z]{1,3}\s([^:]+)", blocks)
            if matchob:
                disks.append(matchob.group(1).replace('\xc2\xa0', ''))
        return disks

    def _get_mbr(self, disks):
        """Copy the first 512 bytes of every device of *disks* to mbr<name>."""
        for d in disks:
            disk_name = d.replace("/", "_")
            try:
                with open(d, "rb") as device:
                    sector = device.read(512)
                with open(os.path.join(self.args['output_dir'], "mbr" + disk_name), "wb") as output:
                    output.write(sector)
            except (IOError, OSError) as e:
                # One unreadable device must not stop the remaining dumps.
                self.args['logger'].warning('MBR dump failed for %s: %s' % (d, e))

    def dump_mbr(self):
        """Dump the MBR of every disk that holds an active (bootable) partition."""
        if utils.os_type() == "mac":
            return
        logger = self.args['logger']
        logger.info('Collect active MBR')
        try:
            r = utils.exec_cmd(fdisk, True)
        except OSError:
            # fdisk not installed: report it like the other collectors do.
            logger.error("%s command failed" % ' '.join(fdisk))
            return
        res = re.split(r"\n\s*\n", r)
        disks = self._list_disks(res)
        logger.debug('Disks name : %s' % str(disks))
        has_active_part = []
        for blocks in res:
            for d in disks:
                m = self._active_part(blocks, d)
                # Keep each disk once so its MBR is not dumped repeatedly.
                if m and m not in has_active_part:
                    has_active_part.append(m)
        if has_active_part:
            self._get_mbr(has_active_part)
class Factory(object):
    """Maps profile names to collector classes and runs them."""

    def __init__(self, args):
        self.args = args
        # Only the 'class' lists are read by execute().
        self.profiles = {
            'fast': {'module': 'SglabIR_collector', 'class': [LiveInformations, Dump]},
            'all': {'module': 'SglabIR_collector', 'class': [LiveInformations, Dump, FileSystem]},
            'advanced': {'module': 'SglabIR_collector', 'class': [LiveInformations, Dump, FileSystem]},
            'dump': {'module': 'SglabIR_Collector', 'class': [Dump]},
        }

    def execute(self):
        """Run every collector of every profile listed in args['profiles'].

        Each class is instantiated with args, then every callable attribute
        whose name does not start with '_' is called without argument, in
        the sorted order returned by dir().  A collector method that raises
        is logged and the run goes on, so one failure cannot cost the rest
        of the evidence.  Unknown profile names are reported and skipped.
        """
        logger = self.args.get('logger') or logging.getLogger("SglabIR")
        for p in self.args['profiles']:
            if p not in self.profiles:
                logger.warning('Unknown profile %s ignored' % p)
                continue
            for cl in self.profiles[p]['class']:
                c = cl(self.args)
                for attr in dir(c):
                    if attr == 'args' or attr.startswith('_'):
                        continue
                    member = getattr(c, attr)
                    if not callable(member):
                        # public data attribute, nothing to run
                        continue
                    try:
                        member()
                    except Exception as e:
                        logger.error('%s.%s failed: %s' % (cl.__name__, attr, e))
def banner():
    """Print the ASCII-art program banner on stdout."""
    # NOTE(review): the spacing inside the art looks collapsed; it is
    # reproduced here exactly as it appears in this copy of the file.
    art = r"""
___ _ _ ___ ___
/ __| __ _| |__ _| |__|_ _| _ \
\__ \/ _` | / _` | '_ \| || /
|___/\__, |_\__,_|_.__/___|_|_\
|___/
"""
    print(art)
def set_logger(args):
    """Create the "SglabIR" logger and store it in args["logger"].

    Two handlers are attached, in this order: a UTF-8 file handler writing
    SglabIR.log below args["output_dir"] at DEBUG level, and a stdout
    handler.  The logger level is the module default ``level_debug`` unless
    args holds a 'level_debug' entry, which then also becomes the level of
    the stdout handler.
    """
    # Thin StreamHandler subclass; it adds no filtering of its own.
    class InfoStreamHandler(logging.StreamHandler):
        def __init__(self, stream):
            logging.StreamHandler.__init__(self, stream)

    override_level = 'level_debug' in args
    log_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    # the logger itself
    logger = logging.getLogger("SglabIR")
    logger.setLevel(level_debug)
    if override_level:
        logger.setLevel(args['level_debug'])

    # file handler: keeps everything that passes the logger level
    log_path = os.path.join(args["output_dir"], "SglabIR.log")
    file_handler = logging.FileHandler(log_path, encoding="UTF-8")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(log_format)
    logger.addHandler(file_handler)

    # console handler
    console_handler = InfoStreamHandler(sys.stdout)
    console_handler.setFormatter(log_format)
    if override_level:
        console_handler.setLevel(args['level_debug'])
    logger.addHandler(console_handler)

    args["logger"] = logger
def parse_command_line():
    """Parse command line arguments and return them in a way that python can use directly

    :returns: dict with 'output_dir', 'dir_zip' (defaults to the output
        directory), 'profiles' (list of names) and, with --debug only,
        'level_debug'.  When argparse is not available (Python < 2.7) the
        defaults output / output / ['fast'] are returned.
    """
    args = {'output_dir': 'output', 'dir_zip': 'output', 'profiles': ['fast']}
    try:
        import argparse
    except ImportError:
        # Only the missing module triggers the fallback; real errors are no
        # longer hidden behind a blanket except.
        return args

    parser = argparse.ArgumentParser(description="SglabIR")
    parser.add_argument("--profiles", dest="profiles",
                        help=("Profiles to run, separated by ';' or ',': fast, dump, all, advanced"
                              " (e.g. --profiles \"fast;dump\")"))
    parser.add_argument("--output_dir", dest="output_dir", help="Directory to extract data")
    parser.add_argument("--dir_zip", dest='dir_zip', help='directory to store zip')
    parser.add_argument("--debug", dest="debug", default=False, action='store_true', help="debug level")
    arguments = parser.parse_args()

    if not arguments.output_dir:
        print('No output directory specified. Using "output" as default')
        arguments.output_dir = 'output'
    if not arguments.profiles:
        print('No profile specified. Using "fast" as default')
        arguments.profiles = 'fast'

    args['output_dir'] = arguments.output_dir
    args['dir_zip'] = arguments.dir_zip or args['output_dir']
    # ';' is the historical separator; ',' is accepted too because the help
    # text used to advertise a comma separated list.
    names = [p.strip() for p in arguments.profiles.replace(',', ';').split(';')]
    args['profiles'] = [p for p in names if p] or ['fast']
    if arguments.debug:
        args['level_debug'] = logging.DEBUG
    return args
def create_output_dir(args):
    """Create and return <output_dir>/<hostname>/<YYYY-MM-DD_HHMMSS>.

    :param args: dict holding the base directory under 'output_dir'.
    :returns: path of the directory that was created.
    :raises SystemExit: (status -1) when the directory cannot be created,
        after the error has been printed.
    """
    host = socket.gethostname()
    date_collect = datetime.now().strftime("%Y-%m-%d_%H%M%S")
    path = os.path.join(args['output_dir'], host, date_collect)
    try:
        os.makedirs(path)
    except OSError as e:
        # OSError is what makedirs raises and the only error carrying
        # errno / strerror.  The former extra handlers were unreachable or
        # fell through and returned a directory that did not exist.
        print("I/O error({0}): {1}".format(e.errno, e.strerror))
        sys.exit(-1)
    return path
def set_zip_evidences(args):
    """Pack the collection directory into one archive, then remove it.

    The archive is written to args['dir_zip'] and named after the last two
    components of args['output_dir'] (``<date>_<hostname>.zip``); '/.' is
    replaced by '/_' in the member names.

    Only the collection directory itself is deleted afterwards.  Its parent
    (the host directory) is removed only when it is empty: the former
    rmtree() on the parent also destroyed other collections stored next to
    this one, and the archive itself when it was written there.
    """
    path_output_dir = args['output_dir']
    logger = args['logger']
    items = path_output_dir.split(os.path.sep)[::-1]
    name_zip_file = items[0] + '_' + items[1] + '.zip'
    zip_path = os.path.join(args['dir_zip'], name_zip_file)
    zip_abs = os.path.abspath(zip_path)
    logger.info('Create zip File %s ' % name_zip_file)
    my_zip = ZipFile(zip_path, 'w', allowZip64=True)
    try:
        for dirName, subdirList, fileList in os.walk(path_output_dir, topdown=False):
            for fname in fileList:
                path = os.path.join(dirName, fname)
                if os.path.abspath(path) == zip_abs:
                    # never add the archive to itself
                    continue
                my_zip.write(path, path.replace('/.', '/_'))
    finally:
        my_zip.close()

    if zip_abs.startswith(os.path.abspath(path_output_dir) + os.path.sep):
        # Deleting the tree would delete the archive as well.
        logger.warning('Zip file is stored inside %s, folder kept' % path_output_dir)
        return
    shutil.rmtree(path_output_dir)
    logger.info('Delete folder %s' % path_output_dir)
    try:
        # rmdir only succeeds on an empty directory, which is the point.
        os.rmdir(os.path.dirname(path_output_dir))
    except OSError:
        pass
def main():
    """Run a complete collection.

    Exits with status -1 unless the effective user is root; otherwise parses
    the command line, creates the output directory, installs the logger,
    runs the selected profiles and finally zips the evidence.
    """
    running_as_root = os.geteuid() == 0
    if not running_as_root:
        print('This program should be run as root.')
        sys.exit(-1)

    args = parse_command_line()
    # From here on 'output_dir' is the per-host, per-run directory.
    args['output_dir'] = create_output_dir(args)
    set_logger(args)

    Factory(args).execute()
    set_zip_evidences(args)
# Script entry point: print the banner, then run the collection.
if __name__ == '__main__':
    banner()
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment