Skip to content

Instantly share code, notes, and snippets.

@Avlyssna
Last active September 24, 2019 17:36
Show Gist options
  • Save Avlyssna/7bd476ca2282faf8fd505c47b7923b4f to your computer and use it in GitHub Desktop.
Save Avlyssna/7bd476ca2282faf8fd505c47b7923b4f to your computer and use it in GitHub Desktop.
An interface built for iMail's CAL format
#!/usr/bin/env python3
'''Control Access List Interface
This module provides an interface to iMail's CAL file format (.ACC). This
format is most significantly used in the `POP3d32.acc` file, located within
iMail's installation directory. The file itself controls which IP addresses or
IP address ranges are blacklisted or whitelisted for IMAP/POP access.
Another significant file is the `smtpd32.acc` file. This file controls which IP
addresses or IP address ranges are blacklisted or whitelisted for SMTP access.
Note, any changes made to a CAL file will need a service restart before
going into effect.
'''
__version__ = '1.0.0'
__license__ = 'MIT'
from struct import pack, unpack
from socket import inet_ntoa, inet_aton
# Value expected in the first 32-bit little-endian field of every CAL stream;
# the list readers raise CorruptHeaderError when it does not match.
_MAGIC_NUMBER = 1150
# Values of the header's mode field. The list readers in this module accept
# only _BLACKLIST_MODE and raise UnsupportedModeError for anything else.
_WHITELIST_MODE = 0
_BLACKLIST_MODE = 1
class CorruptHeaderError(ValueError):
    '''Signals that a CAL stream has a wrong or absent magic number.'''
class UnsupportedModeError(NotImplementedError):
    '''Signals that a CAL stream is in a mode other than blacklist.'''
class AccessEntry:
    '''Common data holder for a single CAL record.

    This class only stores the fields; the subclasses provide the actual
    binary (de)serialization, so it is not meant to be used on its own.
    '''

    def __init__(self, address, mask='255.255.255.255', attempts=0, expiration=None, comment=''):
        '''Store the record fields exactly as given.

        address    -- the IP address of the record, as a string.
        mask       -- the subnet mask applied to the address, as a string.
        attempts   -- the number of failed attempts made by the address.
        expiration -- timestamp at which the record expires, or None.
        comment    -- free-form text attached to the record.
        '''
        self.address, self.mask = address, mask
        self.attempts, self.expiration = attempts, expiration
        self.comment = comment
class SenderAccessEntry(AccessEntry):
    '''To be used only with SMTP CALs.

    A record is serialized as seven consecutive fields (integers are 32-bit
    little-endian), 88 bytes in total: address (4), mask (4), expiration (4),
    an unidentified flag (4), attempts (4), comment (63 bytes plus a NUL
    terminator) and an unidentified trailing value (4).
    '''

    def to_stream(self, stream):
        '''Write this entry to `stream`, a writable binary stream.'''
        # The comment field is a fixed 63 bytes plus a NUL. Truncate the
        # *encoded* bytes (not the characters) so multi-byte text can never
        # overflow the field, then drop any partial character the cut left
        # behind so the stored comment stays decodable.
        comment = self.comment.encode()[:63]
        comment = comment.decode(errors='ignore').encode()

        stream.write(inet_aton(self.address))
        stream.write(inet_aton(self.mask))
        # A missing expiration (None) is stored as 0.
        stream.write(pack('<I', self.expiration or 0))
        # Unidentified flag; this writer always emits 1.
        stream.write(pack('<I', 1))
        stream.write(pack('<I', self.attempts))
        stream.write(comment.ljust(63) + b'\x00')
        # Unidentified trailing value; this writer always emits 0.
        stream.write(pack('<I', 0))

    @classmethod
    def from_stream(cls, stream):
        '''Read one entry from `stream`, a readable binary stream.

        Returns a new instance built from the decoded fields. The two
        unidentified fields are consumed but not kept.
        '''
        # This is the banned IP address.
        address = inet_ntoa(stream.read(4))
        # This is the subnet mask of the address.
        mask = inet_ntoa(stream.read(4))
        # This is a timestamp for when the ban expires. to_stream() stores a
        # missing expiration as 0, so 0 is mapped back to None here.
        expiration = unpack('<I', stream.read(4))[0] or None
        # Not sure exactly what this is. It appears to be set to 1 when a
        # malformed request triggers the ban or a manual ban is applied. This
        # is opposed to a 0 value when harvesting prevention triggers the ban.
        # The value is still unpacked so a truncated record keeps raising.
        unpack('<I', stream.read(4))
        # This is the amount of failed attempts made by the address.
        attempts = unpack('<I', stream.read(4))[0]
        # This is a comment for the record (can be user or system-specified).
        # The final byte of the field is the terminator and is dropped.
        comment = stream.read(64)[:-1].decode().strip()
        # Not sure exactly what this is either; consumed and discarded.
        unpack('<I', stream.read(4))

        return cls(address, mask, attempts, expiration, comment)
class RetrieverAccessEntry(AccessEntry):
    '''To be used only with IMAP/POP CALs.

    A record is serialized as six consecutive fields (integers are 32-bit
    little-endian), 84 bytes in total: address (4), mask (4), an unidentified
    value (4), attempts (4), expiration (4) and comment (63 bytes plus a NUL
    terminator).
    '''

    def to_stream(self, stream):
        '''Write this entry to `stream`, a writable binary stream.'''
        # The comment field is a fixed 63 bytes plus a NUL. Truncate the
        # *encoded* bytes (not the characters) so multi-byte text can never
        # overflow the field, then drop any partial character the cut left
        # behind so the stored comment stays decodable.
        comment = self.comment.encode()[:63]
        comment = comment.decode(errors='ignore').encode()

        stream.write(inet_aton(self.address))
        stream.write(inet_aton(self.mask))
        # Unidentified value; this writer always emits 1.
        stream.write(pack('<I', 1))
        stream.write(pack('<I', self.attempts))
        # A missing expiration (None) is stored as 0.
        stream.write(pack('<I', self.expiration or 0))
        stream.write(comment.ljust(63) + b'\x00')

    @classmethod
    def from_stream(cls, stream):
        '''Read one entry from `stream`, a readable binary stream.

        Returns a new instance built from the decoded fields. The
        unidentified field is consumed but not kept.
        '''
        # This is the banned IP address.
        address = inet_ntoa(stream.read(4))
        # This is the subnet mask of the address.
        mask = inet_ntoa(stream.read(4))
        # Not sure exactly what this is. The value is still unpacked so a
        # truncated record keeps raising.
        unpack('<I', stream.read(4))
        # This is the amount of failed attempts made by the address.
        attempts = unpack('<I', stream.read(4))[0]
        # This is a timestamp for when the ban expires; a stored 0 is mapped
        # back to None.
        expiration = unpack('<I', stream.read(4))[0] or None
        # This is a comment for the record (can be user or system-specified).
        # The final byte of the field is the terminator and is dropped.
        comment = stream.read(64)[:-1].decode().strip()

        return cls(address, mask, attempts, expiration, comment)
class AccessList:
    '''Shared plumbing for the CAL classes; not meant to be used on its own.

    Subclasses supply `to_stream` and `from_stream`; this class only adds the
    file-based convenience wrappers around them.
    '''

    def __init__(self, entries=None, mode=_BLACKLIST_MODE):
        '''Keep the given entries (a fresh list when falsy) and the mode.'''
        self.mode = mode
        self.entries = entries if entries else []

    def to_file(self, path):
        '''Serialize the list into the file at `path`, opened for binary writing.'''
        with open(path, 'wb') as handle:
            self.to_stream(handle)

    @classmethod
    def from_file(cls, path):
        '''Build a list from the file at `path`, opened for binary reading.'''
        with open(path, 'rb') as handle:
            loaded = cls.from_stream(handle)
        return loaded
class SenderAccessList(AccessList):
    '''Used for the SMTP CAL.

    The stream starts with a 20-byte header of five 32-bit little-endian
    integers (magic number, two unidentified values, mode, entry count),
    followed by that many SenderAccessEntry records.
    '''

    def to_stream(self, stream):
        '''Write the header and every entry to `stream` (writable, binary).

        The two unidentified header values are written as 0 and the mode is
        always written as blacklist, regardless of `self.mode`.
        '''
        stream.write(pack('<5I', _MAGIC_NUMBER, 0, 0, _BLACKLIST_MODE, len(self.entries)))

        for entry in self.entries:
            entry.to_stream(stream)

    @classmethod
    def from_stream(cls, stream):
        '''Read a complete list from `stream` (readable, binary).

        Raises CorruptHeaderError when the magic number is wrong or the
        stream ends before it, and UnsupportedModeError when the mode is not
        blacklist.
        '''
        # This is the file's magic number. A short read means it is missing,
        # which is reported the same way as a wrong value.
        magic = stream.read(4)

        if len(magic) != 4 or unpack('<I', magic)[0] != _MAGIC_NUMBER:
            raise CorruptHeaderError('The stream contains an invalid CAL magic number!')

        # It's currently unknown what these two values are used for; they are
        # consumed and discarded.
        unpack('<II', stream.read(8))
        # This is assumed to be the mode of operation (white/blacklist).
        mode = unpack('<I', stream.read(4))[0]

        if mode != _BLACKLIST_MODE:
            raise UnsupportedModeError('Currently, only blacklist mode is supported!')

        # This is the count of entries in the list.
        count = unpack('<I', stream.read(4))[0]
        entries = [SenderAccessEntry.from_stream(stream) for _ in range(count)]

        return cls(entries, mode)
class RetrieverAccessList(AccessList):
    '''Used for the IMAP/POP CAL.

    The stream starts with a 12-byte header of three 32-bit little-endian
    integers (magic number, mode, entry count), followed by that many
    RetrieverAccessEntry records.
    '''

    def to_stream(self, stream):
        '''Write the header and every entry to `stream` (writable, binary).

        The mode is always written as blacklist, regardless of `self.mode`.
        '''
        stream.write(pack('<3I', _MAGIC_NUMBER, _BLACKLIST_MODE, len(self.entries)))

        for entry in self.entries:
            entry.to_stream(stream)

    @classmethod
    def from_stream(cls, stream):
        '''Read a complete list from `stream` (readable, binary).

        Raises CorruptHeaderError when the magic number is wrong or the
        stream ends before it, and UnsupportedModeError when the mode is not
        blacklist.
        '''
        # This is the file's magic number. A short read means it is missing,
        # which is reported the same way as a wrong value.
        magic = stream.read(4)

        if len(magic) != 4 or unpack('<I', magic)[0] != _MAGIC_NUMBER:
            raise CorruptHeaderError('The stream contains an invalid CAL magic number!')

        # This is assumed to be the mode of operation (white/blacklist).
        mode = unpack('<I', stream.read(4))[0]

        if mode != _BLACKLIST_MODE:
            raise UnsupportedModeError('Currently, only blacklist mode is supported!')

        # This is the count of entries in the list.
        count = unpack('<I', stream.read(4))[0]
        entries = [RetrieverAccessEntry.from_stream(stream) for _ in range(count)]

        return cls(entries, mode)
if __name__ == '__main__':
    # Each demo builds an example CAL that blocks access from Google's DNS,
    # saves it, then reads its contents back and prints every entry. The
    # IMAP/POP list is handled first, the SMTP list second.
    demos = (
        (RetrieverAccessList, RetrieverAccessEntry, 'POP3d32-example.acc'),
        (SenderAccessList, SenderAccessEntry, 'smtpd32-example.acc'),
    )

    for list_type, entry_type, filename in demos:
        cal = list_type()
        cal.entries.append(entry_type('8.8.8.8'))
        cal.to_file(filename)

        for entry in list_type.from_file(filename).entries:
            print(entry.address, '/', entry.mask)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment