@pandapknaepel
Created June 25, 2025 07:16
Scans the repository for sensitive data
#!/usr/bin/env python3
"""
Enhanced Sensitive Data Scanner for Weltmaschine Repository
Scans for credentials, API keys, secrets, and other sensitive information
with advanced detection capabilities and intelligent filtering
"""
import os
import re
import json
import base64
import math
import csv
import subprocess
from pathlib import Path
from typing import List, Dict, Optional
from collections import Counter
class SensitiveDataScanner:
    def __init__(self, root_path: str):
        self.root_path = Path(root_path)
        self.findings = []
        self.gitignore_patterns = self._load_gitignore_patterns()

        # Enhanced patterns for sensitive data
        self.patterns = {
            'api_keys': [
                r'api[_-]?key["\s]*[:=]["\s]*[a-zA-Z0-9_\-]{20,}',
                r'apikey["\s]*[:=]["\s]*[a-zA-Z0-9_\-]{20,}',
                r'x-api-key["\s]*[:=]["\s]*[a-zA-Z0-9_\-]{20,}',
                r'openai[_-]?api[_-]?key["\s]*[:=]["\s]*sk-[a-zA-Z0-9]{32,}',
                r'github[_-]?token["\s]*[:=]["\s]*gh[opsur]_[a-zA-Z0-9]{36}',  # ghp_/gho_/ghs_/ghu_/ghr_ prefixes
            ],
            'connection_strings': [
                r'connectionstring["\s]*[:=]["\s]*[^"\n]{30,}',
                r'server["\s]*[:=]["\s]*[^"\n;]{10,}',
                r'password["\s]*[:=]["\s]*[^"\n;]{5,}',
                r'user\s?id["\s]*[:=]["\s]*[^"\n;]{3,}',
                r'data\s?source["\s]*[:=]["\s]*[^"\n;]{5,}',
                r'postgres://[^\s\n"\']{20,}',
                r'mysql://[^\s\n"\']{20,}',
                r'mongodb://[^\s\n"\']{20,}',
            ],
            'secrets': [
                r'secret["\s]*[:=]["\s]*[a-zA-Z0-9_\-]{16,}',
                r'client[_-]?secret["\s]*[:=]["\s]*[a-zA-Z0-9_\-]{16,}',
                r'app[_-]?secret["\s]*[:=]["\s]*[a-zA-Z0-9_\-]{16,}',
                r'webhook[_-]?secret["\s]*[:=]["\s]*[a-zA-Z0-9_\-]{16,}',
            ],
            'tokens': [
                r'token["\s]*[:=]["\s]*[a-zA-Z0-9_\-\.]{20,}',
                r'access[_-]?token["\s]*[:=]["\s]*[a-zA-Z0-9_\-\.]{20,}',
                r'refresh[_-]?token["\s]*[:=]["\s]*[a-zA-Z0-9_\-\.]{20,}',
                r'bearer["\s]+[a-zA-Z0-9_\-\.]{20,}',
                r'jwt["\s]*[:=]["\s]*[a-zA-Z0-9_\-\.]{20,}',
                r'vault[_-]?token["\s]*[:=]["\s]*[a-zA-Z0-9_\-\.]{20,}',
            ],
            'passwords': [
                r'password["\s]*[:=]["\s]*[^"\n\s]{6,}',
                r'pwd["\s]*[:=]["\s]*[^"\n\s]{6,}',
                r'passwd["\s]*[:=]["\s]*[^"\n\s]{6,}',
            ],
            'database_credentials': [
                r'host["\s]*[:=]["\s]*[^"\n\s;]{5,}',
                r'database["\s]*[:=]["\s]*[^"\n\s;]{3,}',
                r'uid["\s]*[:=]["\s]*[^"\n\s;]{3,}',
            ],
            'cloud_keys': [
                r'aws[_-]?access[_-]?key["\s]*[:=]["\s]*[A-Z0-9]{16,}',
                r'aws[_-]?secret[_-]?access[_-]?key["\s]*[:=]["\s]*[a-zA-Z0-9/+=]{28,}',
                r'azure[_-]?client[_-]?secret["\s]*[:=]["\s]*[a-zA-Z0-9_\-]{32,}',
                r'google[_-]?api[_-]?key["\s]*[:=]["\s]*[a-zA-Z0-9_\-]{32,}',
                r'AKIA[0-9A-Z]{16}',  # AWS Access Key ID
            ],
            'certificates': [
                r'-----BEGIN\s+(PRIVATE\s+KEY|RSA\s+PRIVATE\s+KEY|CERTIFICATE)',
                r'-----END\s+(PRIVATE\s+KEY|RSA\s+PRIVATE\s+KEY|CERTIFICATE)',
                r'-----BEGIN\s+OPENSSH\s+PRIVATE\s+KEY-----',
            ],
            'crypto_keys': [
                r'private[_-]?key["\s]*[:=]["\s]*[a-zA-Z0-9+/=]{32,}',
                r'public[_-]?key["\s]*[:=]["\s]*[a-zA-Z0-9+/=]{32,}',
                r'encryption[_-]?key["\s]*[:=]["\s]*[a-zA-Z0-9+/=]{16,}',
            ],
            'base64_encoded': [
                r'[A-Za-z0-9+/]{40,}={0,2}',  # Base64 strings 40+ chars
            ],
            'email_addresses': [
                r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
            ],
            'ip_addresses': [
                r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',  # IPv4
                r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b',  # IPv6
            ],
            'urls_with_credentials': [
                r'https?://[^:]+:[^@]+@[^\s\n"\']+',
            ],
            'dotnet_specific': [
                r'DefaultConnection["\s]*[:=]["\s]*[^"\n]{20,}',
                r'ApplicationInsights["\s]*[:=]["\s]*[a-zA-Z0-9\-]{30,}',
                r'ServiceBusConnectionString["\s]*[:=]["\s]*[^"\n]{30,}',
            ]
        }
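        # Illustrative strings the regexes above are meant to catch (hypothetical
        # sample values, not real credentials):
        #   api_keys:              api_key = "a1b2c3d4e5f6a7b8c9d0a1b2"
        #   connection_strings:    postgres://svc:pass@db.internal:5432/app
        #   cloud_keys:            AKIAIOSFODNN7EXAMPLE (AWS's documented example key id)
        #   urls_with_credentials: https://deploy:s3cr3t@repo.example.com/x.git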
        # Whitelist for known safe values to reduce false positives
        self.whitelisted_patterns = {
            # Common .NET test/example values
            'CancellationToken.None',
            'Guid.NewGuid()',
            'DateTime.Now',
            'Environment.GetEnvironmentVariable',
            'your-vault-token-here',
            'your-token-here',
            'test_token',
            'dGVzdF90b2tlbg==',  # base64 for "test_token"
            'example.com',
            'localhost',
            '127.0.0.1',
            'TEST_SHOP_ID',
            'XB71FABAF8D68A4A0149D99D15D9796CA',
        }

        # Context patterns that indicate false positives
        self.false_positive_contexts = [
            r'CancellationToken\s*=\s*CancellationToken\.None',
            r'var\s+\w+\s*=\s*[A-Za-z0-9+/]{40,}={0,2}',  # Variable assignments
            r'\.ToString\(\)',
            r'GetEnvironmentVariable',
            r'configuration\.GetSection',
        ]

        # File extensions to scan
        self.scan_extensions = {
            '.json', '.xml', '.yml', '.yaml', '.config', '.env', '.ini',
            '.cs', '.js', '.ts', '.sql', '.txt', '.md', '.sh', '.ps1',
            '.properties', '.conf', '.cfg'
        }

        # Files to always check regardless of extension
        self.always_scan_files = {
            'appsettings.json', 'appsettings.development.json', 'appsettings.production.json',
            'web.config', 'app.config', '.env', '.env.local', '.env.production',
            'docker-compose.yml', 'docker-compose.yaml', 'dockerfile'
        }

        # Directories to skip
        self.skip_dirs = {
            '.git', 'node_modules', 'bin', 'obj', '.vs', '.vscode',
            'packages', 'TestResults', '.nuget'
        }
    def calculate_entropy(self, text: str) -> float:
        """Calculate Shannon entropy of text to identify random strings"""
        if not text:
            return 0.0
        # Count character frequencies
        char_counts = Counter(text)
        text_len = len(text)
        # Calculate entropy
        entropy = 0.0
        for count in char_counts.values():
            p = count / text_len
            entropy -= p * math.log2(p)
        return entropy
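    # Worked example (illustrative): "aaaa" is a single repeated symbol, so its
    # entropy is 0.0 bits/char; "abcd" has four equiprobable symbols, giving
    # -4 * (1/4) * log2(1/4) = 2.0 bits/char. Random hex keys sit near
    # 3.5-4.0 bits/char, which is the range the thresholds in
    # calculate_risk_score are tuned around.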
    def is_likely_base64_credential(self, text: str) -> Optional[str]:
        """Check if base64 string contains potential credentials"""
        try:
            # Must be valid base64
            if not re.match(r'^[A-Za-z0-9+/]*={0,2}$', text):
                return None
            # Must be reasonable length for credentials
            if len(text) < 20:
                return None
            # Try to decode
            decoded = base64.b64decode(text).decode('utf-8', errors='ignore')
            # Check for credential-like patterns in decoded text
            cred_patterns = [
                r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',  # Email
                r'[^:]+:[^@\s]{6,}',  # user:password
                r'Bearer\s+[a-zA-Z0-9_\-\.]+',  # Bearer token
                r'sk-[a-zA-Z0-9]+',  # OpenAI API key format
            ]
            for pattern in cred_patterns:
                if re.search(pattern, decoded, re.IGNORECASE):
                    return decoded
        except Exception:
            pass
        return None
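    # Example (hypothetical value): base64.b64encode(b"deploy:hunter2secret")
    # yields a string that decodes back to "deploy:hunter2secret", which matches
    # the user:password pattern above, so the decoded text is returned. Base64
    # blobs that decode to noise fall through and return None.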
    def is_false_positive(self, match: str, context: str) -> bool:
        """Enhanced false positive detection"""
        # Check whitelist
        for whitelisted in self.whitelisted_patterns:
            if whitelisted in match:
                return True
        # Check context patterns
        for fp_pattern in self.false_positive_contexts:
            if re.search(fp_pattern, context, re.IGNORECASE):
                return True
        # Check if it's a variable name or method call
        if re.search(r'(var|const|let)\s+\w+\s*=.*' + re.escape(match), context):
            return True
        if re.search(r'\w+\.' + re.escape(match), context):
            return True
        return False
    def _load_gitignore_patterns(self) -> List[str]:
        """Load .gitignore patterns from all .gitignore files in the repository"""
        patterns = []
        # Find all .gitignore files
        for gitignore_file in self.root_path.rglob('.gitignore'):
            try:
                with open(gitignore_file, 'r', encoding='utf-8', errors='ignore') as f:
                    for line in f:
                        line = line.strip()
                        if line and not line.startswith('#'):
                            # Convert gitignore patterns to relative paths from repo root
                            relative_gitignore_dir = gitignore_file.parent.relative_to(self.root_path)
                            if relative_gitignore_dir != Path('.'):
                                pattern = str(relative_gitignore_dir / line)
                            else:
                                pattern = line
                            patterns.append(pattern)
            except Exception as e:
                print(f"Warning: Could not read {gitignore_file}: {e}")
        return patterns
    def _is_gitignored(self, file_path: Path) -> bool:
        """Check if a file should be ignored based on .gitignore patterns"""
        try:
            # Use the git check-ignore command for accurate gitignore checking
            result = subprocess.run(
                ['git', 'check-ignore', str(file_path)],
                cwd=self.root_path,
                capture_output=True,
                text=True
            )
            # If git check-ignore returns 0, the file is ignored
            return result.returncode == 0
        except Exception:
            # Fall back to manual pattern matching if git is not available
            return self._manual_gitignore_check(file_path)
    def _manual_gitignore_check(self, file_path: Path) -> bool:
        """Manual gitignore pattern matching as fallback"""
        relative_path = str(file_path.relative_to(self.root_path))
        for pattern in self.gitignore_patterns:
            # Simple pattern matching (not the full gitignore spec)
            if pattern.endswith('*'):
                if relative_path.startswith(pattern[:-1]):
                    return True
            elif '*' in pattern:
                # Convert to regex for wildcard matching
                regex_pattern = pattern.replace('*', '.*').replace('?', '.')
                if re.match(regex_pattern, relative_path):
                    return True
            elif pattern in relative_path or relative_path.startswith(pattern):
                return True
        return False
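    # Note: this fallback only approximates gitignore semantics (no negation,
    # no anchoring, no '**'). For single-segment globs, the stdlib fnmatch
    # module would come closer, e.g. fnmatch.fnmatch(relative_path, pattern).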
    def should_scan_file(self, file_path: Path) -> bool:
        """Determine if a file should be scanned"""
        # Skip gitignored files
        if self._is_gitignored(file_path):
            return False
        if file_path.name.lower() in self.always_scan_files:
            return True
        return file_path.suffix.lower() in self.scan_extensions
    def scan_file(self, file_path: Path) -> List[Dict]:
        """Enhanced scan of a single file for sensitive data"""
        findings = []
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            # Check each pattern category
            for category, patterns in self.patterns.items():
                for pattern in patterns:
                    matches = re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE)
                    for match in matches:
                        match_text = match.group()
                        # Get line number
                        line_num = content[:match.start()].count('\n') + 1
                        # Get context (surrounding lines)
                        lines = content.split('\n')
                        start_line = max(0, line_num - 2)
                        end_line = min(len(lines), line_num + 1)
                        context = '\n'.join(lines[start_line:end_line])
                        # Enhanced filtering
                        if self.is_false_positive(match_text, context):
                            continue
                        # Calculate entropy for potential random strings
                        entropy = self.calculate_entropy(match_text)
                        # Special handling for base64-encoded strings
                        decoded_content = None
                        if category == 'base64_encoded':
                            decoded_content = self.is_likely_base64_credential(match_text)
                            if not decoded_content:
                                continue  # Skip if not a likely credential
                        # Risk scoring
                        risk_score = self.calculate_risk_score(category, match_text, entropy, file_path)
                        finding = {
                            'file': str(file_path.relative_to(self.root_path)),
                            'category': category,
                            'pattern': pattern,
                            'match': match_text,
                            'line': line_num,
                            'context': context,
                            'entropy': round(entropy, 2),
                            'risk_score': risk_score
                        }
                        if decoded_content:
                            finding['decoded_content'] = decoded_content
                        findings.append(finding)
        except Exception as e:
            print(f"Error scanning {file_path}: {e}")
        return findings
    def calculate_risk_score(self, category: str, match: str, entropy: float, file_path: Path) -> int:
        """Calculate a risk score (1-10) for a finding"""
        score = 5  # Base score
        # Category-based scoring
        high_risk_categories = {'certificates', 'cloud_keys', 'base64_encoded'}
        medium_risk_categories = {'api_keys', 'secrets', 'tokens'}
        if category in high_risk_categories:
            score += 3
        elif category in medium_risk_categories:
            score += 2
        elif category in {'passwords', 'connection_strings'}:
            score += 1
        # Entropy-based scoring (higher entropy = more likely to be a real secret)
        if entropy > 4.5:
            score += 2
        elif entropy > 3.5:
            score += 1
        # File-type-based scoring
        sensitive_files = {'.env', 'appsettings.json', '.config', '.yml', '.yaml'}
        if file_path.suffix.lower() in sensitive_files or file_path.name.lower() in sensitive_files:
            score += 2
        # Test files are lower risk
        if 'test' in str(file_path).lower() or 'mock' in str(file_path).lower():
            score -= 2
        # Length-based scoring (longer strings are more likely to be real)
        if len(match) > 50:
            score += 1
        return max(1, min(10, score))  # Clamp between 1 and 10
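    # Worked example (hypothetical finding): the AWS example key id
    # "AKIAIOSFODNN7EXAMPLE" found in appsettings.json scores
    # 5 (base) + 3 (cloud_keys) + 1 (entropy ~3.7) + 2 (sensitive file) = 11,
    # clamped to 10.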
    def scan_directory(self) -> None:
        """Scan the entire directory tree"""
        print(f"Scanning repository: {self.root_path}")
        for root, dirs, files in os.walk(self.root_path):
            # Skip certain directories
            dirs[:] = [d for d in dirs if d not in self.skip_dirs]
            root_path = Path(root)
            for file in files:
                file_path = root_path / file
                if self.should_scan_file(file_path):
                    file_findings = self.scan_file(file_path)
                    self.findings.extend(file_findings)
    def generate_report(self) -> str:
        """Generate an enhanced formatted report of findings"""
        if not self.findings:
            return "✅ No sensitive data patterns found!"
        # Sort findings by risk score (highest first)
        sorted_findings = sorted(self.findings, key=lambda x: x['risk_score'], reverse=True)
        high_risk = [f for f in sorted_findings if f['risk_score'] >= 8]
        medium_risk = [f for f in sorted_findings if 5 <= f['risk_score'] < 8]
        low_risk = [f for f in sorted_findings if f['risk_score'] < 5]
        report = f"🔍 ENHANCED SENSITIVE DATA SCAN RESULTS\n"
        report += f"{'='*60}\n"
        report += f"Total findings: {len(self.findings)}\n"
        report += f"🚨 High Risk (8-10): {len(high_risk)} findings\n"
        report += f"⚠️ Medium Risk (5-7): {len(medium_risk)} findings\n"
        report += f"ℹ️ Low Risk (1-4): {len(low_risk)} findings\n\n"
        # High risk findings first
        if high_risk:
            report += f"🚨 HIGH RISK FINDINGS\n"
            report += f"{'='*40}\n"
            for finding in high_risk:
                report += self._format_finding(finding)
            report += f"\n"
        if medium_risk:
            report += f"⚠️ MEDIUM RISK FINDINGS\n"
            report += f"{'='*40}\n"
            for finding in medium_risk:
                report += self._format_finding(finding)
            report += f"\n"
        if low_risk:
            report += f"ℹ️ LOW RISK FINDINGS\n"
            report += f"{'='*40}\n"
            for finding in low_risk[:10]:  # Limit low risk to the first 10
                report += self._format_finding(finding)
            if len(low_risk) > 10:
                report += f"  ... and {len(low_risk) - 10} more low risk findings\n"
            report += f"\n"
        # Summary statistics
        report += self._generate_summary_stats()
        return report
    def _format_finding(self, finding: Dict) -> str:
        """Format a single finding for the report"""
        risk_emoji = "🚨" if finding['risk_score'] >= 8 else "⚠️" if finding['risk_score'] >= 5 else "ℹ️"
        output = f"  {risk_emoji} RISK SCORE: {finding['risk_score']}/10 | ENTROPY: {finding['entropy']}\n"
        output += f"  📄 File: {finding['file']}\n"
        output += f"  📍 Line: {finding['line']}\n"
        output += f"  🏷️ Category: {finding['category'].upper().replace('_', ' ')}\n"
        output += f"  🔍 Pattern: {finding['pattern']}\n"
        output += f"  ⚠️ Match: {finding['match'][:100]}{'...' if len(finding['match']) > 100 else ''}\n"
        if 'decoded_content' in finding:
            output += f"  🔓 Decoded: {finding['decoded_content'][:100]}{'...' if len(finding['decoded_content']) > 100 else ''}\n"
        output += f"  📝 Context:\n"
        for line in finding['context'].split('\n'):
            output += f"    {line}\n"
        output += f"\n"
        return output
    def _generate_summary_stats(self) -> str:
        """Generate summary statistics"""
        # Group by category
        by_category = {}
        for finding in self.findings:
            category = finding['category']
            if category not in by_category:
                by_category[category] = []
            by_category[category].append(finding)
        # Group by file
        by_file = {}
        for finding in self.findings:
            file = finding['file']
            if file not in by_file:
                by_file[file] = {'count': 0, 'max_risk': 0}
            by_file[file]['count'] += 1
            by_file[file]['max_risk'] = max(by_file[file]['max_risk'], finding['risk_score'])
        report = f"📊 SUMMARY STATISTICS\n"
        report += f"{'-'*40}\n"
        report += f"By Category:\n"
        for category, findings in sorted(by_category.items(), key=lambda x: len(x[1]), reverse=True):
            avg_risk = sum(f['risk_score'] for f in findings) / len(findings)
            report += f"  {len(findings):2d} {category.replace('_', ' ').title():<20} (avg risk: {avg_risk:.1f})\n"
        report += f"\nBy File (top 15):\n"
        sorted_files = sorted(by_file.items(), key=lambda x: (x[1]['max_risk'], x[1]['count']), reverse=True)
        for file, stats in sorted_files[:15]:
            report += f"  {stats['count']:2d} issues (max risk: {stats['max_risk']}) {file}\n"
        return report
    def save_to_csv(self, filename: str) -> None:
        """Save findings to a CSV file"""
        if not self.findings:
            print("No findings to save.")
            return
        fieldnames = [
            'risk_score', 'category', 'file', 'line', 'entropy',
            'match', 'decoded_content', 'pattern', 'context'
        ]
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for finding in sorted(self.findings, key=lambda x: x['risk_score'], reverse=True):
                row = {
                    'risk_score': finding['risk_score'],
                    'category': finding['category'],
                    'file': finding['file'],
                    'line': finding['line'],
                    'entropy': finding['entropy'],
                    'match': finding['match'][:200],  # Truncate long matches
                    'decoded_content': finding.get('decoded_content', '')[:200] if finding.get('decoded_content') else '',
                    'pattern': finding['pattern'],
                    'context': finding['context'].replace('\n', '\\n')[:500]  # Truncate and escape newlines
                }
                writer.writerow(row)
        print(f"Findings saved to {filename}")
    def scan_git_history(self, max_commits: int = 10) -> List[Dict]:
        """Scan recent git commits for removed credentials"""
        try:
            # subprocess is already imported at module level
            # Get recent commits with their diffs
            cmd = ['git', 'log', f'-{max_commits}', '--oneline', '--no-merges']
            result = subprocess.run(cmd, cwd=self.root_path, capture_output=True, text=True)
            if result.returncode != 0:
                return []
            git_findings = []
            for line in result.stdout.strip().split('\n'):
                if not line:
                    continue
                commit_hash = line.split()[0]
                # Get the diff for this commit
                diff_cmd = ['git', 'show', commit_hash]
                diff_result = subprocess.run(diff_cmd, cwd=self.root_path, capture_output=True, text=True)
                if diff_result.returncode == 0:
                    # Scan the diff for sensitive patterns
                    for category, patterns in self.patterns.items():
                        for pattern in patterns:
                            matches = re.finditer(pattern, diff_result.stdout, re.IGNORECASE | re.MULTILINE)
                            for match in matches:
                                git_findings.append({
                                    'commit': commit_hash,
                                    'category': category,
                                    'match': match.group(),
                                    'context': diff_result.stdout[max(0, match.start()-100):match.end()+100]
                                })
            return git_findings
        except Exception as e:
            print(f"Error scanning git history: {e}")
            return []
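# Note on scan_git_history (an observation, not a behavior change): it greps the
# raw `git show` output, so matches can come from added as well as removed diff
# lines; scanning deeper history is just a larger argument, e.g.
# scanner.scan_git_history(max_commits=100).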
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Enhanced Sensitive Data Scanner')
parser.add_argument('--path', default="/mnt/c/Users/Panda/Projects/dotnet/weltmaschine",
help='Path to scan')
parser.add_argument('--git-history', action='store_true',
help='Also scan git history for removed credentials')
parser.add_argument('--min-risk', type=int, default=1, choices=range(1, 11),
help='Minimum risk score to report (1-10)')
parser.add_argument('--output', choices=['text', 'json', 'csv'], default='text',
help='Output format')
parser.add_argument('--output-file', type=str,
help='Output file path (required for CSV output)')
args = parser.parse_args()
scanner = SensitiveDataScanner(args.path)
scanner.scan_directory()
# Filter by minimum risk score
scanner.findings = [f for f in scanner.findings if f['risk_score'] >= args.min_risk]
if args.output == 'csv':
if not args.output_file:
print("Error: --output-file is required for CSV output")
exit(1)
scanner.save_to_csv(args.output_file)
elif args.output == 'json':
if args.output_file:
with open(args.output_file, 'w') as f:
json.dump(scanner.findings, f, indent=2)
print(f"JSON output saved to {args.output_file}")
else:
print(json.dumps(scanner.findings, indent=2))
else:
if args.output_file:
with open(args.output_file, 'w') as f:
f.write(scanner.generate_report())
print(f"Report saved to {args.output_file}")
else:
print(scanner.generate_report())
# Optional git history scan
if args.git_history:
print("\n" + "="*60)
print("๐Ÿ•ฐ๏ธ GIT HISTORY SCAN")
print("="*60)
git_findings = scanner.scan_git_history()
if git_findings:
for finding in git_findings:
print(f"Commit {finding['commit']}: {finding['category']} - {finding['match'][:50]}...")
else:
print("No sensitive data found in recent git history.")