Skip to content

Instantly share code, notes, and snippets.

Last active November 11, 2022 18:11
Show Gist options
  • Save M-griffin/65a23b7ea3d7529fd725 to your computer and use it in GitHub Desktop.
Save M-griffin/65a23b7ea3d7529fd725 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2.7
PyPacketMail for x/84,
(c) 2015 Michael Griffin <>
This is a FidoNet Echomail Scanner / Tosser for x84 bbs.
This will mimic the basic functionality of Crashmail for
Reading and Writing mail packets.
*** Sample INI Section to be added to DEFAULT.INI in X84
# Setup for PyPacket Mail {Fidonet Tosser/Scanner}
inbound = /home/pi/Desktop/PyPacketMail/inbound
outbound = /home/pi/Desktop/PyPacketMail/outbound
pack = /home/pi/Desktop/PyPacketMail/pack
unpack = /home/pi/Desktop/PyPacketMail/unpack
bad = /home/pi/Desktop/PyPacketMail/bad
archive = /home/pi/Desktop/PyPacketMail/archive
# Fido Type Network Domain names, seperate with commas.
network_tags = agoranet, fidonet
# Network Specific Addresses and Area -> Tag Translations.
node_address = 46:1/140
export_address = 46:1/100
areas = agn_gen: general, agn_ads: bbs_ads, agn_bbs: bbs_discussion, agn_art: art, agn_dev: development,
agn_nix: unix_linux, agn_hub: hub_stats, agn_l46: league46, agn_tst: testing, agn_sys: sysop_area
default_area = agn_gen
# Network Specific Addresses and Area -> Tag Translations.
node_address = 1:154/140
export_address = 1:154/10
areas = fdn_ent: enthral_bbs
default_area = fdn_ent
Recent changeLog
* 0.0.1
- Added Default Areas, of only a network tag is used eg. agoranet, and no
specific area tag is selected. the default_area will then be used for
exporting messages.
__author__ = "Michael Griffin"
__copyright__ = "Copyright 2015"
__credits__ = "Jeff Quast"
__license__ = "MIT"
__version__ = "1.0.0"
__status__ = "Prototype"
import collections
import datetime
import zipfile
import struct
import glob
import os
# Reading Binary Packet Formats
from ctypes import LittleEndianStructure, Union, c_uint8
# x84 specific
from x84.bbs.ini import init, get_ini
from x84.cmdline import parse_args
# Database for holding FidoNet Specific Items and Kludges
FIDO_DB = 'pymail'
# Read in default .x84 INI File.
class FidonetConfiguration():
# Holds configuration values from x84 Default.ini
# also builds up lists of areas per network and
# their data -> tag translatons
def __init__(self):
self.__network_list = [] # List of Fido Networks
self.__node_address = {} # Your Address
self.__export_address = {} # Your Network Hub's Address
self.__network_areas = {} # Message Areas by network
self.__default_areas = {} # Default if no Valid Area Tag
self.__inbound_folder = None
self.__unpack_folder = None
self.read_configuration() # Load All INI settings on startup.
def add_network(self):
# Everything is built from the initial network address
self.__network_list = \
get_ini(section='fido_networks', key='network_tags', split=True)
def add_node_address(self):
# Node Addresses per network
if self.is_network_empty is False:
for net in self.__network_list:
# Loop network list and get network section.
self.__node_address[net] = \
get_ini(section=net, key='node_address', split=True)
def add_export_address(self):
# Export {mail hub} addresses per network
if self.is_network_empty is False:
for net in self.__network_list:
# Loop network list and get network section.
self.__export_address[net] = \
get_ini(section=net, key='export_address', split=True)
def add_network_areas(self):
# key value area to tag translations per network
if self.is_network_empty is False:
for net in self.__network_list:
# Loop network list and get network section.
self.__network_areas[net] = \
get_ini(section=net, key='areas', split=True)
def add_default_areas(self):
# No Valid area, then use default area
if self.is_network_empty is False:
for net in self.__network_list:
# Loop network list and get network section.
self.__default_areas[net] = \
get_ini(section=net, key='default_area', split=True)
def is_network_empty(self):
if bool(self.__network_list and True):
return False
return True
def inbound_folder(self):
return self.__inbound_folder
def unpack_folder(self):
return self.__unpack_folder
def check_network_address(self, address):
# verify node address, return network name
for key, val in self.__node_address.items():
if address in ''.join(val):
return '{network}'.format(network=key)
return None
def get_tag(self, network_name, network_area):
# transpose area to tag name
# seperate string value into k, v dist.
if self.is_network_empty is False:
for area in self.__network_areas[network_name]:
k, v = area.split(': ')
if network_area in k:
return v
return None
def count_network_areas(self):
# Just gets a general count of network keys
# should have at least 1 network with a value area string.
count = 0
if self.is_network_empty is False:
count = len(self.__network_areas)
return count
def read_configuration(self):
# Working Folders pull from .x84 Default INI
self.__inbound_folder = ''.join(get_ini(section='mailpacket', key='inbound', split=True))
self.__unpack_folder = ''.join(get_ini(section='mailpacket', key='unpack', split=True))
# read .x84 default.ini file for network info
# build dicts for all networks and their associations
print 'network_list: ' + ', '.join(str(x) for x in self.__network_list)
for key, val in self.__node_address.items():
print 'node_address: {key}, {value}'.format(key=key, value=val)
for key, val in self.__export_address.items():
print 'export_address: {key}, {value}'.format(key=key, value=val)
for key, val in self.__network_areas.items():
print 'network_areas: {key}, {value}'.format(key=key, value=val)
for key, val in self.__default_areas.items():
print 'default_areas: {key}, {value}'.format(key=key, value=val)
print ''
# Parse and Setup Fido-net addresses and areas
cfg = FidonetConfiguration()
print ''
print 'is network empty: {bool}'.format(bool=cfg.is_network_empty)
print 'num of network w/ areas: {count}'.format(count=cfg.count_network_areas())
print 'inbound_folder: {name}'.format(name=cfg.inbound_folder)
print 'unpack_folder : {name}'.format(name=cfg.unpack_folder)
print ''
# Make sure we have at least one network setup
assert cfg.is_network_empty is False
# Make sure we have at least one network area
assert cfg.count_network_areas() >= 1
# Make sure the Inbound directory is valid
assert os.path.isdir(cfg.inbound_folder)
# Check the Packet Folder.
assert os.path.isdir(cfg.unpack_folder)
# Handle count of Areas Processed
area_count = collections.defaultdict(int)
# Fido Packet 2 Structure
_struct_packet_header_fields = [
# Structure Size 58
('H', 'origin_node'),
('H', 'destination_node'),
('H', 'year'),
('H', 'month'),
('H', 'day'),
('H', 'hour'),
('H', 'minute'),
('H', 'second'),
('H', 'baud'),
('H', 'packet_type'),
('H', 'origin_network'),
('H', 'destination_network'),
('B', 'prod_code_low'),
('B', 'revision_major'),
('8s', 'password'),
('H', 'origin_zone'),
('H', 'destination_zone'),
('H', 'aux_network'),
('H', 'capWordA'),
('B', 'prod_code_hi'),
('B', 'revision_minor'),
('H', 'capWordB'),
('H', 'origin_zone2'),
('H', 'destination_zone2'),
('H', 'origin_point'),
('H', 'destination_point'),
('L', 'prod_data')
_struct_fidonet_packet = '<{0}'.format(
''.join(struct_val for struct_val, _ in _struct_packet_header_fields))
FidonetPacketHeader = collections.namedtuple(
'FidonetPacketHeader', [field_name for _, field_name in _struct_packet_header_fields])
class FlagBits(LittleEndianStructure):
# Captures the 1st Byte Set of Bit Flags in the Message Header
_fields_ = [
('private', c_uint8, 1), # asByte & 1
('crash', c_uint8, 1), # asByte & 2
('received', c_uint8, 1), # asByte & 4
('sent', c_uint8, 1), # asByte & 8
('file_attach', c_uint8, 1), # asByte & 16
('in_transit', c_uint8, 1), # asByte & 32
('orphan', c_uint8, 1), # asByte & 64
('kill_sent', c_uint8, 1), # asByte & 128
def get_dict(self):
# return ordered fields of bits
return collections.OrderedDict((f, getattr(self, f)) for f, v, i in self._fields_)
class FlagBits2(LittleEndianStructure):
# Captures the 2nd Byte Set of Bit Flags in the Message Header
_fields_ = [
('local', c_uint8, 1), # asByte & 256
('hold', c_uint8, 1), # asByte & 512
('unused', c_uint8, 1), # asByte & 1024
('file_request', c_uint8, 1), # asByte & 2048
('want_receipt', c_uint8, 1), # asByte & 4096
('is_receipt', c_uint8, 1), # asByte & 8192
('audit', c_uint8, 1), # asByte & 16384
('file_update', c_uint8, 1), # asByte & 32768
def get_dict(self):
# return ordered fields of bits
return collections.OrderedDict((f, getattr(self, f)) for f, v, i in self._fields_)
class Flags(Union):
# Union to set the In_Value flips the appropriate bits flags
# in_value can also be used to get the value of all set flags
_fields_ = [('bit', FlagBits),
('in_value', c_uint8)]
class Flags2(Union):
# Union to set the In_Value flips the appropriate bits flags
# in_value can also be used to get the value of all set flags
_fields_ = [('bit', FlagBits2),
('in_value', c_uint8)]
class SetFlags():
# Class to handle Bit Flags for Message Attributes
field = None
field2 = None
def __init__(self, field, field2):
# get flags from message header
:rtype : None
self.field = field
self.field2 = field2
def set_flags(self):
attributes = Flags()
attributes.in_value = self.field
attributes2 = Flags2()
attributes2.in_value = self.field2
# Test Print out first 8 Bits
# print attributes.bit.get_dict()
# Test Print out Second 8 Bits
# print attributes2.bit.get_dict()
# Fido Message Header Structure
_struct_message_header_fields = [
# Structure Size 14
('H', 'message_type'),
('H', 'origin_node'),
('H', 'destination_node'),
('H', 'origin_network'),
('H', 'destination_network'),
('B', 'attributes_flags1'),
('B', 'attributes_flags2'),
('H', 'cost'),
_struct_fidonet_message_header = '<{0}'.format(
''.join(struct_val for struct_val, _ in _struct_message_header_fields))
FidonetMessageHeader = collections.namedtuple(
'FidonetMessageHeader', [field_name for _, field_name in _struct_message_header_fields])
def read_cstring(file_object, offset):
# Function to read text up to null terminator
new_string = ""
# jump to offset.
assert isinstance(offset, object)
while True:
# read the file object
byte =
if not byte:
if byte in '\x00':
# Break on Null Terminated
new_string += str(byte)
return new_string
def read_message_text(file_object, offset):
# Function to read message text up to null terminator
assert isinstance(offset, object)
message_string = ""
for chunk in iter(lambda:, ''):
if chunk in '\x00':
message_string += chunk
return message_string
def track_area(area):
:rtype : None
if area_count[area] is not None:
area_count[area] += 1
area_count[area] = 1
def print_area_count():
# Print out Counts of messages per area
print ''
total_messages = 0
total_areas = 0
for area in area_count:
print u'Area: {0} -> Total Messages: {1}'.format(
area, area_count[area])
total_messages += area_count[area]
total_areas += 1
# hard coded for now, this will be setup with dupe checking
total_messages_imported = total_messages
print ''
print 'Areas: {0} -> Messages: {1} -> Imported -> {2}.'.format(
total_areas, total_messages, total_messages_imported)
class StoredFidoInfo(object):
# Holds Fido Specific Message and Kludge Data That
# is Absent from the standard message layout
def __init__(self, index):
self.idx = index
self.__status = None
self.__date_processed = None
self.__kludge = set()
def status(self, flag):
self.__status = flag
def kludge_lines(self, k_lines):
assert isinstance(k_lines, object)
self.__kludge = k_lines
def check_status(self):
return self.__status
def check_kludge(self):
return self.__kludge
def save(self):
# persist message index record to database
from x84.bbs import DBProxy
new = self.idx is None
with DBProxy(FIDO_DB, use_session=False) as db_index:
# Not Used, Messages are saved first, with Fido
# Data save with matching index.
if new:
self.idx = max(map(int, db_index.keys()) or [-1]) + 1
db_index['%d' % (self.idx,)] = self
class Message(object):
# Message Object that will be pasted into.
def __init__(self):
:rtype : None
:type self: str
self.date_time = None
self.user_to = None
self.user_from = None
self.subject = None
self.area = None
self.tag_line = None
self.origin_line = None
self.kludge_lines = collections.OrderedDict()
self.seen_by = []
self.raw_data = None
self.message_header = None
self.packet_header = None
self.packet_address = None = None
# Clean Message Text, Split with CR, remove any LF!
self.message_lines = None
def import_messages(self):
from x84.bbs.msgbase import Msg
# hook into x84 and write message to default database and
# keep separate database for fido specific fields.
# 'author':,
# 'subject': msg.subject,
# 'recipient': msg.recipient,
# 'parent': parent,
# 'tags': [tag for tag in msg.tags if tag != network['name']],
# 'ctime': to_utctime(msg.ctime)
store_msg = Msg()
store_msg.recipient = unicode(self.user_to, 'CP437') = unicode(self.user_from, 'CP437')
store_msg.subject = unicode(self.subject, 'CP437')
# Add Check here for Private Netmail messages
# Convert from CP437 for high ascii, later on read CHRS kludge for origin character set
store_msg.body = unicode('\r'.join(self.message_lines).replace('\x9d', ''), 'CP437')
# If area is a normal public echo, default is public
# Change to Network Name ie Agoranet
# Translate the area to the tag description.
# eg.. AGN_GEN -> general
area_tag = cfg.get_tag(, self.area)
# print 'Area Tag: ' + area_tag
if area_tag is not None:
# In testing
# if area is not a public echo, add to sysop group tag
# store_msg.tags.add(u''.join('sysop'))
# Convert Packet String to Date Time format.
# We should also get and check UTZ kludge line! Lateron for offset / Timezone.
# 26 Feb 15 18:04:00
date_object = datetime.datetime.strptime(self.date_time, '%d %b %y %H:%M:%S')
# do not save this message to network, we already received
# it from the network, set send_net=False
# Also avoid sending over X84 NET, ctime=date_object)
print 'Msg Index after save: {0}'.format(store_msg.idx)
# Setup and store the fido kludge data
fido_msg = StoredFidoInfo(store_msg.idx)
print 'Fido Index after save: {0}'.format(fido_msg.idx)
# get message index
# print 'Msg Index after save: {0}'.format(store_msg.idx)
# del store_msg
def add_kludge(self, line):
# Separates Kludge Lines into An Array of Fields
key, value = line.split(None, 1)
key = key[1:]
if key in self.kludge_lines:
assert isinstance(value, object)
self.kludge_lines[key] = [value]
def parse_lines(self):
# Breaks up the message data into fields
stage = 1
message_body = []
# Setup Message Lines by breaking up raw data
self.message_lines = [x.strip('\n') for x in self.raw_data.split('\r')]
for line in self.message_lines:
if len(line) == 0:
# Empty Lines are Newlines
elif stage == 1:
# Start and Middle of Message Text
if line.startswith('AREA:'):
# grab description config file and translate area name
self.area = line.split(':')[1].lower()
# print 'Area : ' + self.area
# Add count for area
elif line.startswith('\x01'):
elif line.startswith('--- '):
# Tracking Tag Lines might be a little much!
self.tag_line = line
# Leave Tag Line in message text
elif 'Origin:' in line[2:10]:
# note some systems like Synchronet doesn't use * for origin prefix!!
# need to put range in for this!! +2, 10
self.origin_line = line
# Leave Tag Line in message text
stage = 2
# not official, just preference to remove this invalid data record.
elif line.startswith('\x1ASAUCE00'):
# skip bad characters or records in messages
elif line.endswith('\x04'):
# Skip SAUCE record end lines!, shouldn't be posted.
# bad characters
elif stage == 2:
# Stage 2 After Origin Line Only
if line.startswith('\x01'):
elif line.startswith('SEEN-BY:'):
raise ValueError('Unexpected: %s' % line)
self.message_lines = message_body
# Import messages to x84
def __str__(self):
# Check this, should swap \r ? -MF
:rtype : str
return '\n'.join(self.message_lines)
def serialize(self):
# Build The Message for Writing out to Packet
lines = []
if self.area:
lines.append('AREA:%s' % self.area)
for key, kludge_value in self.kludge_lines.items():
for value in kludge_value:
# Check if these needs \r at end of line!!
lines.append('\x01%s %s' % (key, value))
if self.origin_line:
class ParsePackets(object):
area_count_dict = {}
def __init__(self, packet_processing):
# Inbound or Outbound processing.
:type packet_processing: str
_packet_processing = packet_processing
if _packet_processing in 'read':
elif _packet_processing in 'write':
def process_outbound():
# Scan for New Messages ready for sending out
:rtype : none
from x84.bbs import DBProxy
print 'process_outbound()'
# Status is a list of all Message Keys
status = set(int(key) for key in DBProxy(FIDO_DB).keys())
print status
# Scan message Status.
with DBProxy(FIDO_DB) as fido_db:
for key, values in fido_db.items():
# print key, values.check_status
# print key, values.check_kludge
for k, v in values.check_kludge.items():
# Grabs Key values of all Kludges
print k, v
# Work out kludge lines now.
# Example modern msg id.
TID: ['Mystic BBS 1.10', FastEcho, SBBSecho, MBSE-FIDO]
MSGID: ['18386.agn_l46@46:1/100 1955835b']
REPLY: ['46:1/145 4659a1ce']
TZUTC: ['-0600']
PATH: ['1/100']
CHRS: ['CP437 2']
def process_inbound():
# Process all packets waiting in the inbound_folder
:rtype : none
message_count = 0
for file_path_zip in glob.glob(os.path.join(cfg.inbound_folder, u'*.*')):
# Uncompress packet bundles, then loop to read packet/message headers/messages
# unzip a clean bundle
with zipfile.ZipFile(file_path_zip) as zip_obj:
print u'Uncompress Bundle: ' + os.path.basename(file_path_zip)
# Loop and process all packets
for file_name in os.listdir(cfg.unpack_folder):
# Parse Each Packet for the Header first.
print u'Parsing Mail Packet: ' + file_name
# Open then Parse Each Packet
fido_object = open(os.path.join(cfg.unpack_folder, file_name), 'rb')
# make Sure we don't read past the end of the file!
packet_header_read =[:58]
except EOFError:
# move to next packet if were at the end.
if not packet_header_read:
# move to next packet, log error here
print u'Error: unable to read packet header: ' + file_name
# Make sure we have correct size! Otherwise were done.
# print 'packet_header_read len: ' + str(len(packet_header_read))
if len(packet_header_read) < 58:
# End of File can have (2) Bytes, catch this.
# Read the Packet Header
fido_header = FidonetPacketHeader(
*struct.unpack(_struct_fidonet_packet, packet_header_read))
# Test the packet header
if fido_header.packet_type != 2:
print u'Error: fido packet not Type-2: ' + file_name
# Validate packet is addressed to this system
# Add 5D addresses? have @domain like @agoranet
if fido_header.destination_point != 0:
# 4D address
packet_address = '{zone}:{net}/{node}.{point}'.format(
zone=fido_header.destination_zone, net=fido_header.destination_network,
node=fido_header.destination_node, point=fido_header.destination_point)
# 3D Address no point.
packet_address = '{zone}:{net}/{node}'.format(
zone=fido_header.destination_zone, net=fido_header.destination_network,
# If Address is not in our network, skip to next packet.
current_network = cfg.check_network_address(packet_address)
if current_network is None:
print u'Error: packet not addressed to your node: {packet}, '\
print u'Packet Received for: {network} -> {packet}'\
.format(network=current_network, packet=packet_address)
assert isinstance(fido_header, object)
# print fido_header
offset = struct.calcsize(_struct_fidonet_packet)
message_count = 0
while True:
# Reset Position to right after Fido Header
# Try to parse the message header
# make Sure we don't read past the end of the file!
message_header_read =[:14]
except EOFError:
# move to next packet if were at the end.
if not message_header_read:
# move to next packet, log error here
# Make sure we have correct size! Otherwise were done.
# print 'message_header_read len: ' + str(len(message_header_read))
if len(message_header_read) <= 2:
# End of File can have (2) Bytes, catch this.
elif len(message_header_read) < 14:
# Read was short!
print u'Error: unable to read message header: ' + file_name
# Read the Message Header
fido_message_header = FidonetMessageHeader(
*struct.unpack(_struct_fidonet_message_header, message_header_read))
assert isinstance(fido_message_header, object)
# Update The Offset
offset += struct.calcsize(_struct_fidonet_message_header)
# Next move back to the next position
""" Next we need to parse For '\x00' terminated strings.
('20s', 'dateTime'),
('36s', 'toUsername'),
('36s', 'fromUsername'),
('72s', 'subject')
# Use cleaner way to keep track of offset!!
date_time_string = read_cstring(fido_object, offset)
offset += len(date_time_string) + 1
username_to = read_cstring(fido_object, offset)
offset += len(username_to) + 1
username_from = read_cstring(fido_object, offset)
offset += len(username_from) + 1
subject_string = read_cstring(fido_object, offset)
offset += len(subject_string) + 1
# We now read the entire message up to null terminator
message_string = read_message_text(fido_object, offset)
offset += len(message_string) + 1
# Breaks up the message and separates out kludge lines from text.
# print Message(message_string, username_to, username_from, subject_string)
current_message = Message()
current_message.date_time = date_time_string
current_message.user_to = username_to
current_message.user_from = username_from
current_message.subject = subject_string
current_message.raw_data = message_string
# Packet Headers will check for source / destination address
# mainly dupe checking
current_message.packet_header = fido_header
# Message Headers will be checked for Import/Export flags etc.
current_message.message_header = fido_message_header
# Populated the Current Network and Address. = current_network
current_message.packet_address = packet_address
# First Parse the Raw Data into Message Lines and
# break out Kludge lines from text
# if No errors then Import Message to x84
message_count += 1
# Cleanup
# del current_message
# Cleanup for next run
print u' Messages This Packet -> ' + str(message_count)
print '*' * 30
# Clear the unpack_folder here later on, leave for testing, just overwrites!
print u'End of Bundle'
print '*' * 60
# Clear out any packets before running next bundle
clear_files = glob.glob(os.path.join(cfg.unpack_folder, u'*.*'))
for f in clear_files:
class TossMessages(ParsePackets):
# handle incoming messages
def __init__(self):
# Inbound or Outbound processing.
:rtype : none
_packet_processing = 'read'
super(TossMessages, self).__init__(_packet_processing)
class ScanMessages(ParsePackets):
# handle outgoing messages
def __init__(self):
# Inbound or Outbound processing.
:rtype : none
_packet_processing = 'write'
super(ScanMessages, self).__init__(_packet_processing)
def main(background_daemon=False):
# Scan for Incoming Message and Import them
if not background_daemon:
# Import Message 80% Done.
# TossMessages()
# Export Messages WIP!
if __name__ == '__main__':
# do not execute message polling as a background thread.
Copy link

Cool i was actually looking for something like this for my x84 BBS.

Copy link

Thanks, it's still really early in development and only does initial importing. x/84's group system needs a bit more work as it doesn't handle private groups properly yet.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment