Last active
September 24, 2019 17:36
-
-
Save Avlyssna/7bd476ca2282faf8fd505c47b7923b4f to your computer and use it in GitHub Desktop.
An interface built for iMail's CAL format
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
'''Control Access List Interface | |
This module provides an interface to iMail's CAL file format (.ACC). This | |
format is most significantly used in the `POP3d32.acc` file, located within | |
iMail's installation directory. The file itself controls which IP addresses or | |
IP address ranges are blacklisted or whitelisted for IMAP/POP access. | |
Another significant file is the `smtpd32.acc` file. This file controls which IP | |
addresses or IP address ranges are blacklisted or whitelisted for SMTP access. | |
Note, any changes made to a CAL file will need a service restart before | |
going into effect. | |
''' | |
__version__ = '1.0.0' | |
__license__ = 'MIT' | |
from struct import pack, unpack | |
from socket import inet_ntoa, inet_aton | |
# Value expected in the first four bytes (little-endian) of every CAL stream.
_MAGIC_NUMBER = 1150
# Values of the header's mode field; assumed to select whitelist vs. blacklist
# operation. Only blacklist mode is accepted when reading.
_WHITELIST_MODE = 0
_BLACKLIST_MODE = 1
class CorruptHeaderError(ValueError):
    '''Signals that a stream's CAL magic number is absent or wrong.'''
class UnsupportedModeError(NotImplementedError):
    '''Signals that a CAL uses whitelist mode, which is not handled.'''
class AccessEntry:
    '''The base class for our CAL entries; not meant to be used on its own.

    Arguments:
        address: The entry's IPv4 address in dotted-quad form.
        mask: The subnet mask paired with the address (defaults to a
            single-host mask).
        attempts: The number of failed attempts recorded for the address.
        expiration: The timestamp at which the entry expires, or None when
            no expiration is set.
        comment: A free-form comment stored alongside the entry.
    '''

    def __init__(self, address, mask='255.255.255.255', attempts=0, expiration=None, comment=''):
        self.address = address
        self.mask = mask
        self.attempts = attempts
        self.expiration = expiration
        self.comment = comment

    def __repr__(self):
        # Uses the runtime class name so subclasses are labelled correctly.
        return (
            '{}(address={!r}, mask={!r}, attempts={!r}, expiration={!r}, comment={!r})'
        ).format(
            type(self).__name__,
            self.address,
            self.mask,
            self.attempts,
            self.expiration,
            self.comment,
        )
class SenderAccessEntry(AccessEntry):
    '''To be used only with SMTP CALs.

    Each record is 88 bytes, with integers stored little-endian:
    address (4), mask (4), expiration (4), unknown flag (4), attempts (4),
    comment (63 bytes plus a NUL terminator), unknown trailer (4).
    '''

    @staticmethod
    def _encode_comment(comment):
        '''Return `comment` as exactly 64 bytes: 63 space-padded + NUL.'''
        # Truncate AFTER encoding so multi-byte characters cannot push the
        # field past its fixed width and shift every following record.
        raw = comment.encode()[:63]
        # The byte-level cut may have split a multi-byte character; drop the
        # dangling fragment so the field can still be decoded when read back.
        raw = raw.decode(errors='ignore').encode()
        return raw.ljust(63, b' ') + b'\x00'

    def to_stream(self, stream):
        '''Write this entry to the binary `stream` as one SMTP CAL record.'''
        stream.write(inet_aton(self.address))
        stream.write(inet_aton(self.mask))
        # A missing expiration (None) is stored as 0.
        stream.write(pack('<I', self.expiration or 0))
        # Unknown flag; always written as 1 (see the note in from_stream).
        stream.write(pack('<I', 1))
        stream.write(pack('<I', self.attempts))
        stream.write(self._encode_comment(self.comment))
        # Unknown trailing field; always written as 0.
        stream.write(pack('<I', 0))

    @classmethod
    def from_stream(cls, stream):
        '''Read one SMTP CAL record from the binary `stream`.

        Returns a new entry. A short read surfaces as the error raised by
        `inet_ntoa` or `unpack` for the truncated field.
        '''
        # This is the banned IP address.
        address = inet_ntoa(stream.read(4))
        # This is the subnet mask of the address.
        mask = inet_ntoa(stream.read(4))
        # This is a timestamp for when the ban expires. A stored 0 maps back
        # to None so an entry survives a write/read round trip unchanged.
        expiration = unpack('<I', stream.read(4))[0] or None
        # Not sure exactly what this is. It appears to be set to 1 when a
        # malformed request triggers the ban or a manual ban is applied. This
        # is opposed to a 0 value when harvesting prevention triggers the ban.
        # The value is validated for length but otherwise discarded.
        unpack('<I', stream.read(4))
        # This is the amount of failed attempts made by the address.
        attempts = unpack('<I', stream.read(4))[0]
        # This is a comment for the record (can be user or system-specified).
        comment = stream.read(64)[:-1].decode().strip()
        # Not sure exactly what this is either; read and discarded.
        unpack('<I', stream.read(4))
        return cls(address, mask, attempts, expiration, comment)
class RetrieverAccessEntry(AccessEntry):
    '''To be used only with IMAP/POP CALs.

    Each record is 84 bytes, with integers stored little-endian:
    address (4), mask (4), unknown field (4), attempts (4), expiration (4),
    comment (63 bytes plus a NUL terminator).
    '''

    @staticmethod
    def _encode_comment(comment):
        '''Return `comment` as exactly 64 bytes: 63 space-padded + NUL.'''
        # Truncate AFTER encoding so multi-byte characters cannot push the
        # field past its fixed width and shift every following record.
        raw = comment.encode()[:63]
        # The byte-level cut may have split a multi-byte character; drop the
        # dangling fragment so the field can still be decoded when read back.
        raw = raw.decode(errors='ignore').encode()
        return raw.ljust(63, b' ') + b'\x00'

    def to_stream(self, stream):
        '''Write this entry to the binary `stream` as one IMAP/POP record.'''
        stream.write(inet_aton(self.address))
        stream.write(inet_aton(self.mask))
        # Unknown field; always written as 1.
        stream.write(pack('<I', 1))
        stream.write(pack('<I', self.attempts))
        # A missing expiration (None) is stored as 0.
        stream.write(pack('<I', self.expiration or 0))
        stream.write(self._encode_comment(self.comment))

    @classmethod
    def from_stream(cls, stream):
        '''Read one IMAP/POP CAL record from the binary `stream`.

        Returns a new entry. A short read surfaces as the error raised by
        `inet_ntoa` or `unpack` for the truncated field.
        '''
        # This is the banned IP address.
        address = inet_ntoa(stream.read(4))
        # This is the subnet mask of the address.
        mask = inet_ntoa(stream.read(4))
        # Not sure exactly what this is; validated for length and discarded.
        unpack('<I', stream.read(4))
        # This is the amount of failed attempts made by the address.
        attempts = unpack('<I', stream.read(4))[0]
        # This is a timestamp for when the ban expires (0 maps to None).
        expiration = unpack('<I', stream.read(4))[0] or None
        # This is a comment for the record (can be user or system-specified).
        comment = stream.read(64)[:-1].decode().strip()
        return cls(address, mask, attempts, expiration, comment)
class AccessList:
    '''The base class for our CALs; not meant to be used on its own.

    The file helpers below call `to_stream` and `from_stream`, which this
    base class does not define; they are expected to come from a subclass.

    Arguments:
        entries: The list of entries to hold; a new empty list when None.
        mode: The mode of operation stored on the instance.
    '''

    def __init__(self, entries=None, mode=_BLACKLIST_MODE):
        # Compare against None rather than truthiness so that a caller's own
        # (possibly empty) list is kept instead of being silently replaced.
        self.entries = [] if entries is None else entries
        self.mode = mode

    def to_file(self, path):
        '''Serialize this list to the file at `path`, overwriting it.'''
        with open(path, 'wb') as file:
            self.to_stream(file)

    @classmethod
    def from_file(cls, path):
        '''Return a new list parsed from the binary file at `path`.'''
        with open(path, 'rb') as file:
            return cls.from_stream(file)
class SenderAccessList(AccessList):
    '''Used for the SMTP CAL.

    The header is five little-endian 32-bit integers: magic number, two
    unknown fields, mode, and entry count. The entries follow back to back.
    '''

    def to_stream(self, stream):
        '''Write the header and every entry to the binary `stream`.'''
        stream.write(pack('<I', _MAGIC_NUMBER))
        # Two fields of unknown purpose; written as 0.
        stream.write(pack('<I', 0))
        stream.write(pack('<I', 0))
        # Blacklist mode is always written, regardless of `self.mode`.
        stream.write(pack('<I', _BLACKLIST_MODE))
        stream.write(pack('<I', len(self.entries)))
        for entry in self.entries:
            entry.to_stream(stream)

    @classmethod
    def from_stream(cls, stream):
        '''Parse an SMTP CAL from the binary `stream` and return a new list.

        Raises:
            CorruptHeaderError: The magic number is missing or wrong.
            UnsupportedModeError: The mode field is not blacklist mode.
        '''
        # This is the file's magic number. An empty or too-short stream is
        # reported as a corrupt header instead of leaking a struct.error.
        magic_bytes = stream.read(4)
        if len(magic_bytes) != 4 or unpack('<I', magic_bytes)[0] != _MAGIC_NUMBER:
            raise CorruptHeaderError('The stream contains an invalid CAL magic number!')
        # It's currently unknown what these two values are used for; they are
        # validated for length and discarded.
        unpack('<I', stream.read(4))
        unpack('<I', stream.read(4))
        # This is assumed to be the mode of operation (white/blacklist).
        mode = unpack('<I', stream.read(4))[0]
        if mode != _BLACKLIST_MODE:
            raise UnsupportedModeError('Currently, only blacklist mode is supported!')
        # This is the count of entries in the list.
        count = unpack('<I', stream.read(4))[0]
        entries = [SenderAccessEntry.from_stream(stream) for _ in range(count)]
        return cls(entries, mode)
class RetrieverAccessList(AccessList):
    '''Used for the IMAP/POP CAL.

    The header is three little-endian 32-bit integers: magic number, mode,
    and entry count. The entries follow back to back.
    '''

    def to_stream(self, stream):
        '''Write the header and every entry to the binary `stream`.'''
        stream.write(pack('<I', _MAGIC_NUMBER))
        # Blacklist mode is always written, regardless of `self.mode`.
        stream.write(pack('<I', _BLACKLIST_MODE))
        stream.write(pack('<I', len(self.entries)))
        for entry in self.entries:
            entry.to_stream(stream)

    @classmethod
    def from_stream(cls, stream):
        '''Parse an IMAP/POP CAL from the binary `stream`; return a new list.

        Raises:
            CorruptHeaderError: The magic number is missing or wrong.
            UnsupportedModeError: The mode field is not blacklist mode.
        '''
        # This is the file's magic number. An empty or too-short stream is
        # reported as a corrupt header instead of leaking a struct.error.
        magic_bytes = stream.read(4)
        if len(magic_bytes) != 4 or unpack('<I', magic_bytes)[0] != _MAGIC_NUMBER:
            raise CorruptHeaderError('The stream contains an invalid CAL magic number!')
        # This is assumed to be the mode of operation (white/blacklist).
        mode = unpack('<I', stream.read(4))[0]
        if mode != _BLACKLIST_MODE:
            raise UnsupportedModeError('Currently, only blacklist mode is supported!')
        # This is the count of entries in the list.
        count = unpack('<I', stream.read(4))[0]
        entries = [RetrieverAccessEntry.from_stream(stream) for _ in range(count)]
        return cls(entries, mode)
if __name__ == '__main__':
    # Demo 1: build an IMAP/POP CAL that blocks Google's DNS, save it to
    # disk, then load it again and list its contents.
    retriever_path = 'POP3d32-example.acc'
    retriever_cal = RetrieverAccessList()
    retriever_cal.entries.append(RetrieverAccessEntry('8.8.8.8'))
    retriever_cal.to_file(retriever_path)

    retriever_cal = RetrieverAccessList.from_file(retriever_path)
    for record in retriever_cal.entries:
        print(record.address, record.mask, sep=' / ')

    # Demo 2: the same round trip for an SMTP CAL.
    sender_path = 'smtpd32-example.acc'
    sender_cal = SenderAccessList()
    sender_cal.entries.append(SenderAccessEntry('8.8.8.8'))
    sender_cal.to_file(sender_path)

    sender_cal = SenderAccessList.from_file(sender_path)
    for record in sender_cal.entries:
        print(record.address, record.mask, sep=' / ')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment