Skip to content

Instantly share code, notes, and snippets.

@elzekool
Last active April 22, 2019 09:28
Show Gist options
  • Save elzekool/3d26e7ff2a09c7d2925153bf268a089b to your computer and use it in GitHub Desktop.
Save elzekool/3d26e7ff2a09c7d2925153bf268a089b to your computer and use it in GitHub Desktop.
import sqlite3
import os
import clamd
import shutil
import time
from subprocess import check_output
# Directory tree that is scanned; use an absolute path (e.g. /home/luna/docker).
pathToScan = '/home/elzekool/Downloads'

# Directory that files flagged by the scanner are moved into.
quarantineDir = '/var/infected'

# SQLite database file that holds the scan cache between runs.
scanCachePath = '/var/daily-scan-file-cache.db'

# Text file to which files that could not be scanned are appended.
unscanableFileList = '/var/daily-scan-needs-human-intervention.txt'

# Age in seconds (14 days) after which cache entries for files that are no
# longer seen are evicted.
scanCacheEvictionInterval = 14 * 24 * 60 * 60
# Stop right away when the scan path cannot be read.
if not os.access(pathToScan, os.R_OK):
    print("Error: Scan path %s is not readable" % pathToScan)
    exit(1)

# Stop right away when the quarantine directory cannot be written to.
if not os.access(quarantineDir, os.W_OK):
    print("Error: Quarantine directory %s is not writable" % quarantineDir)
    exit(1)

# Open the scan cache database; sqlite3.Row is used as the row factory.
print("Opening database")
conn = sqlite3.connect(scanCachePath)
conn.row_factory = sqlite3.Row
def create_database(db):
    """Create the ``files`` cache table and its unique path index if missing.

    Both statements use ``IF NOT EXISTS``, so running this against an already
    initialised database changes nothing. The work is committed before
    returning.
    """
    schemaStatements = (
        'CREATE TABLE IF NOT EXISTS files (hash text, scanned_hash text, first_seen integer, last_seen integer, path text)',
        'CREATE UNIQUE INDEX IF NOT EXISTS unq_files_path ON files (path)',
    )
    cursor = db.cursor()
    for statement in schemaStatements:
        cursor.execute(statement)
    db.commit()
def close_database(db):
    """Print a notice to stdout, then close the connection ``db``."""
    print("Closing database")
    db.close()
def _file_fingerprint(filePath):
    """Return the "CRC_SIZE" fingerprint of filePath, or None if it cannot be read."""
    from subprocess import CalledProcessError

    try:
        fileSize = os.path.getsize(filePath)
    except OSError:
        # The file disappeared or became inaccessible after the directory walk.
        return None
    try:
        # Only CalledProcessError is caught: a missing cksum binary must still
        # abort the run instead of silently skipping every file.
        cksumOutput = check_output(["cksum", filePath])
    except CalledProcessError:
        return None
    # check_output returns bytes on Python 3; decode the CRC field so the
    # stored hash is "123_456" rather than "b'123'_456".
    return "%s_%s" % (cksumOutput.split()[0].decode("ascii"), fileSize)


def update_file_list(db):
    """Refresh the ``files`` cache from the directory tree under pathToScan.

    Every readable regular file is fingerprinted (cksum CRC plus size) and
    upserted by path: new paths get an empty ``scanned_hash`` and the current
    time as ``first_seen``/``last_seen``; known paths get their ``hash`` and
    ``last_seen`` refreshed. Afterwards the number of new and changed files is
    printed, rows not seen for longer than scanCacheEvictionInterval seconds
    are deleted, and the transaction is committed.

    Files that are unreadable, special, or that vanish while being
    fingerprinted are reported on stdout and skipped.
    """
    print("Update file info list")
    c = db.cursor()
    runAtTime = int(time.time())

    for dirPath, _dirNames, fileNames in os.walk(pathToScan):
        for fileName in fileNames:
            filePath = os.path.join(dirPath, fileName)
            if not os.access(filePath, os.R_OK) or not os.path.isfile(filePath):
                print("Skipping " + filePath + " File not readable or is a special file")
                continue

            fileHash = _file_fingerprint(filePath)
            if fileHash is None:
                print("Skipping " + filePath + " File vanished or could not be checksummed")
                continue

            # '' (single quotes) is the SQL string literal; the double-quoted
            # form is only accepted by SQLite as a legacy quirk.
            c.execute(
                "INSERT INTO files (hash, scanned_hash, first_seen, last_seen, path) "
                "VALUES (?, '', ?, ?, ?) "
                "ON CONFLICT(path) DO UPDATE SET hash = excluded.hash, last_seen = excluded.last_seen;",
                (fileHash, runAtTime, runAtTime, filePath)
            )

    # Restrict both counts to rows touched in this run, so stale rows of files
    # that were seen once and then removed are not reported again and again.
    c.execute('SELECT COUNT(*) FROM files WHERE first_seen = ?', (runAtTime,))
    print("Found %d new files" % (c.fetchone()[0]))
    c.execute(
        'SELECT COUNT(*) FROM files WHERE last_seen = ? AND first_seen != last_seen AND hash != scanned_hash',
        (runAtTime,)
    )
    print("Found %d changed files" % (c.fetchone()[0]))

    # Evict rows for files that have not been seen within the eviction interval.
    c.execute('DELETE FROM files WHERE last_seen < ?', [runAtTime - scanCacheEvictionInterval])
    db.commit()
def _quarantine_target(filePath):
    """Return a not-yet-existing path inside quarantineDir for filePath."""
    baseName = os.path.basename(filePath)
    target = os.path.join(quarantineDir, baseName)
    suffix = 1
    # A same-named file may already be quarantined; add a numeric suffix
    # instead of letting shutil.move fail on the existing destination.
    while os.path.lexists(target):
        target = os.path.join(quarantineDir, "%s.%d" % (baseName, suffix))
        suffix += 1
    return target


def scan_changed_files(db):
    """Scan every cached file whose hash differs from its last scanned hash.

    Each such file is passed to clamd over the unix socket
    /run/clamav/clamd.ctl. A result of 'ERROR' is printed and appended to
    unscanableFileList; a result of 'FOUND' is printed and the file is moved
    into quarantineDir. When the loop has finished, every row is marked as
    scanned (``scanned_hash = hash``) and the transaction is committed; this
    includes files that reported 'ERROR', which are only recorded in
    unscanableFileList.
    """
    print("Scanning changed files")
    c = db.cursor()
    c.execute('SELECT path FROM files WHERE hash != scanned_hash')
    theClams = clamd.ClamdUnixSocket(path="/run/clamav/clamd.ctl")

    for row in c.fetchall():
        filePath = row[0]
        print("Scanning %s" % (filePath))
        res = theClams.scan(filePath)
        # The result maps the scanned path to a (status, detail) pair.
        # dict.keys() is not subscriptable on Python 3, so take the first
        # value through an iterator, which works on Python 2 and 3 alike.
        fileScanResult = next(iter(res.values()))

        if fileScanResult[0] == 'ERROR':
            print("COULD NOT SCAN %s: %s" % (filePath, fileScanResult[1]))
            with open(unscanableFileList, "a") as unscanableFile:
                unscanableFile.write("%s: %s\n" % (filePath, fileScanResult[1]))
        elif fileScanResult[0] == 'FOUND':
            print("POSSIBLE VIRUS FOUND IN %s" % (filePath))
            shutil.move(filePath, _quarantine_target(filePath))

    c.execute('UPDATE files SET scanned_hash = hash')
    db.commit()
# Run the phases in order: prepare the schema, refresh the file cache, scan
# what changed. The connection is closed even when a phase raises, so the
# database handle is never left open.
try:
    create_database(conn)
    update_file_list(conn)
    scan_changed_files(conn)
finally:
    close_database(conn)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment