Created
May 3, 2019 10:44
-
-
Save rkhapov/e8425b60c6100b0bbc67dc14606cedbc to your computer and use it in GitHub Desktop.
pop3 client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import base64 | |
import select | |
import ssl | |
import json | |
import argparse | |
import os | |
import shutil | |
from email.parser import Parser | |
from email.policy import default | |
# Line terminator appended to outgoing commands and used to delimit the lines
# read back from the server.
CRLF = b'\r\n'
class Config:
    """Plain holder for the mail account and storage settings."""

    def __init__(self, user, password, server, port, letters_dir, use_ssl):
        """Store every setting unchanged on the instance."""
        # Credentials.
        self.user, self.password = user, password
        # Server endpoint.
        self.server, self.port = server, port
        # Directory where downloaded letters are stored.
        self.letters_dir = letters_dir
        # Whether the connection should be wrapped in SSL/TLS.
        self.use_ssl = use_ssl
def load_config(filename):
    """Read the JSON config file *filename* and build a ``Config`` from it.

    The file must contain an object with the keys ``user``, ``password``,
    ``server``, ``port``, ``letters_dir`` and ``use_ssl``.

    Raises:
        FileNotFoundError: the file does not exist.
        KeyError: a required key is missing.
        ValueError: the file is not valid UTF-8 encoded JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    # JSON is UTF-8 by specification; relying on the locale default encoding
    # breaks non-ASCII credentials on platforms whose default is not UTF-8.
    with open(filename, 'r', encoding='utf-8') as config_file:
        d = json.load(config_file)
    return Config(d['user'], d['password'], d['server'], d['port'], d['letters_dir'], d['use_ssl'])
def parse_args():
    """Parse the command line and return the resulting namespace.

    Recognised options:
        --config  path of the config file (default ``mail_config.json``)
        --count   amount of letters to download (default ``1``)
    """
    arg_parser = argparse.ArgumentParser(description='simple pop3 client')
    # (flag, default, help text, type) for every supported option.
    option_table = (
        ('--config', 'mail_config.json', 'config file', str),
        ('--count', 1, 'amount of letters to download', int),
    )
    for flag, fallback, description, kind in option_table:
        arg_parser.add_argument(flag, required=False, default=fallback, help=description, type=kind)
    return arg_parser.parse_args()
class Attachment:
    """A single decoded attachment: its file name and its raw content."""

    def __init__(self, name, data):
        """Keep the attachment *name* and its *data* as given."""
        self.name, self.data = name, data
class LetterInfo:
    """A downloaded letter: headers, body text and decoded attachments."""

    def __init__(self, text, subject, from_, to, date, attachments):
        """Store the letter fields.

        *attachments* is an iterable of objects exposing ``name`` and
        ``data`` (bytes) attributes.
        """
        self.text = text
        self.subject = subject
        self.from_ = from_
        self.to = to
        self.date = date
        self.attachments = attachments

    def save_at(self, path):
        """Write the letter into directory *path*, replacing any previous copy.

        Creates ``<path>/letter.txt`` with the headers and the text, and
        ``<path>/attachments/`` holding one file per attachment.
        """
        if os.path.isdir(path):
            shutil.rmtree(path)
        os.mkdir(path)

        names_line = ' '.join(str(attachment.name or '') for attachment in self.attachments)
        # Explicit UTF-8 so non-ASCII subjects/bodies do not fail on locales
        # whose default encoding cannot represent them.
        with open(os.path.join(path, 'letter.txt'), 'w', encoding='utf-8') as letter_file:
            letter_file.write(f'From: {self.from_}\n')
            letter_file.write(f'To: {self.to}\n')
            letter_file.write(f'Date: {self.date}\n')
            letter_file.write(f'Subject: {self.subject}\n')
            letter_file.write(f'Attachments: {names_line}\n')
            letter_file.write(f'Text:\n{self.text}\n')

        attachments_dir = os.path.join(path, 'attachments')
        os.mkdir(attachments_dir)
        taken = set()
        for index, attachment in enumerate(self.attachments, start=1):
            file_name = self._safe_attachment_name(attachment.name, index, taken)
            with open(os.path.join(attachments_dir, file_name), 'wb') as attachment_file:
                attachment_file.write(attachment.data)

    @staticmethod
    def _safe_attachment_name(name, index, taken):
        """Return a file name that is safe and unique inside the attachments dir.

        Attachment names come from the e-mail itself and are therefore
        untrusted: directory components are dropped so a name such as
        ``../../x`` cannot escape the target directory.
        """
        # Treat both separators as directory separators so a Windows-style
        # path cannot slip through on POSIX.
        flattened = str(name or '').replace('\x00', '').replace('\\', '/')
        candidate = os.path.basename(flattened).strip()
        if candidate in ('', '.', '..'):
            # Nameless attachment: opening '' would target the directory.
            candidate = f'attachment_{index}'
        unique = candidate
        serial = 1
        while unique in taken:
            # Two attachments with the same name must not overwrite each other.
            unique = f'{serial}_{candidate}'
            serial += 1
        taken.add(unique)
        return unique
class Pop3Response:
    """Outcome of one POP3 command: a success flag plus the response text."""

    def __init__(self, successful, data):
        """Remember whether the command succeeded and the text that came back."""
        self.successful, self.data = successful, data

    def __str__(self):
        """Render the response as a status banner followed by the data."""
        banner = '[SUCCESSFUL RESPONSE]' if self.successful else '[FAILED RESPONSE]'
        return f'{banner}\n{self.data}'
class Pop3Client:
    """Minimal blocking POP3 client that connects and logs in on construction.

    Intended to be used as a context manager so the connection is closed
    when the ``with`` block ends.
    """

    # Timeout (seconds) applied to connecting and to every socket operation.
    _TIMEOUT_SECONDS = 10
    # Number of bytes requested from the socket per recv() call.
    _RECV_CHUNK = 4096

    def __init__(self, host, port, user, password, use_ssl):
        """Connect to *host*:*port*, optionally over TLS, and log in.

        Raises:
            ValueError: the server rejected the USER or PASS command.
            socket.timeout: connecting or reading took longer than the timeout.
            ConnectionError: the server closed the connection prematurely.
            OSError / ssl.SSLError: the connection or TLS handshake failed.
        """
        # Bytes received from the socket that have not been consumed yet.
        self.__buffer = bytearray()
        # create_connection applies the timeout to the connect itself (the
        # original only set it afterwards) and also handles IPv6 hosts.
        self.__sock = socket.create_connection((host, port), timeout=self._TIMEOUT_SECONDS)
        try:
            if use_ssl:
                # ssl.wrap_socket() was removed in Python 3.12 and never
                # verified the peer; the default context checks both the
                # certificate chain and the host name.
                context = ssl.create_default_context()
                self.__sock = context.wrap_socket(self.__sock, server_hostname=host)
            self._login(user, password)
        except BaseException:
            # __exit__ never runs when __init__ fails, so close here to
            # avoid leaking the socket, then let the error propagate.
            self.__sock.close()
            raise

    def _login(self, user, password):
        """Consume the server greeting and authenticate with USER/PASS."""
        print(f'Login for {user}...')
        # The greeting line is read and discarded.
        self._read_single_line()

        user_response = self.execute_raw('USER', [user])
        if not user_response.successful:
            raise ValueError(f'User command failed: {user_response.data}')

        pass_response = self.execute_raw('PASS', [password])
        if not pass_response.successful:
            raise ValueError(f'Password command failed: {pass_response.data}')

        print(f'Successfully login for: {user}')

    def __enter__(self):
        return self

    def __exit__(self, e, et, tb):
        """Close the connection; exceptions from the block are not suppressed."""
        self.__sock.close()
        return False

    def retr(self, i):
        """Download message number *i* and return it as a ``LetterInfo``.

        When the server rejects the RETR command the error text is printed
        and ``None`` is returned.
        """
        resp = self.execute_raw('RETR', [i], is_long_cmd=True)
        if not resp.successful:
            print(resp.data)
            return None

        message = Parser(policy=default).parsestr(resp.data)
        # iter_parts() yields nothing for a non-multipart message, so the
        # body of a plain single-part letter has to come from the message
        # itself (indexing parts[0] unconditionally raised IndexError).
        parts = list(message.iter_parts())
        if parts:
            text = parts[0].get_payload()
        elif message.is_multipart():
            text = ''
        else:
            text = message.get_payload()

        attachments = []
        for part in parts[1:]:
            attachment = self._read_attachment(part)
            if attachment is not None:
                attachments.append(attachment)

        return LetterInfo(
            text,
            message['Subject'],
            message['From'],
            message['To'],
            message['Date'],
            attachments,
        )

    def _read_attachment(self, h):
        """Decode message part *h* into an ``Attachment``.

        Only base64-encoded ``image/*`` parts are supported; for anything
        else a notice is printed and ``None`` is returned.
        """
        content_type = h.get_content_type()
        cte_header = h['Content-Transfer-Encoding']
        # A part without the header would otherwise crash on ``None.cte``;
        # 7bit is the MIME default encoding when the header is absent.
        cte = cte_header.cte if cte_header is not None else '7bit'

        if h.get_content_maintype() != 'image' or cte != 'base64':
            print(f'Cant read attachment of type: {content_type} and cte {cte}')
            return None

        # get_filename() reads the Content-Disposition "filename" parameter
        # and falls back to the Content-Type "name" parameter, handling
        # quoting and RFC 2231 encoding that manual splitting got wrong.
        name = h.get_filename('') or ''
        data = base64.b64decode(''.join(h.get_payload().split()))
        return Attachment(name, data)

    def execute_raw(self, command, args=None, is_long_cmd=False):
        """Send *command* with optional *args* and return the ``Pop3Response``.

        Args:
            command: POP3 keyword, e.g. ``'RETR'``.
            args: optional iterable of arguments; each is converted with ``str``.
            is_long_cmd: when true, a multi-line response is read after the
                status line.
        """
        cmd = command.encode()
        if args:
            cmd += b' ' + b' '.join(str(arg).encode() for arg in args)
        if not cmd.endswith(CRLF):
            cmd += CRLF

        self.__sock.sendall(cmd)

        if is_long_cmd:
            return self._read_long_response()
        return self._read_response_with_single_line()

    def _read_long_response(self):
        """Read a status line followed by a multi-line body ended by ``.``."""
        status = self._read_response_with_single_line()
        if not status.successful:
            return status

        lines = []
        while True:
            line = self._read_single_line()
            if line == b'.':
                break
            if line.startswith(b'.'):
                # Undo byte-stuffing: the server prefixes an extra '.' to any
                # line that begins with one; only that first byte is dropped.
                line = line[1:]
            lines.append(line)

        # Mail may carry bytes that are not valid UTF-8; replace them instead
        # of aborting the whole download with UnicodeDecodeError.
        return Pop3Response(True, b'\n'.join(lines).decode('utf-8', errors='replace'))

    def _read_response_with_single_line(self):
        """Read one status line and split it into success flag and text."""
        line = self._read_single_line().decode('utf-8', errors='replace')
        successful = line.startswith('+OK')
        # Drop the status indicator ('+OK' or '-ERR') plus the separating
        # space; slicing past the end simply yields an empty string.
        data = line[4:] if successful else line[5:]
        return Pop3Response(successful, data)

    def _read_single_line(self):
        """Return the next CRLF-terminated line from the server, without CRLF.

        Raises:
            ConnectionError: the server closed the connection before a full
                line arrived.
        """
        while True:
            end = self.__buffer.find(CRLF)
            if end != -1:
                line = bytes(self.__buffer[:end])
                del self.__buffer[:end + len(CRLF)]
                return line
            chunk = self.__sock.recv(self._RECV_CHUNK)
            if not chunk:
                # recv() returning b'' means the peer closed the connection;
                # looping on would spin forever.
                raise ConnectionError('Connection closed by the server')
            self.__buffer.extend(chunk)
def main():
    """Entry point: load the config, log in and download the requested letters.

    Letters are numbered from 1 and each one is saved into its own
    sub-directory of the configured letters directory.
    """
    args = parse_args()

    # Only config loading is guarded by the "invalid config" handler, so an
    # unrelated error raised later is not misreported as a config problem.
    try:
        config = load_config(args.config)
    except (FileNotFoundError, KeyError, TypeError, ValueError):
        # ValueError also covers json.JSONDecodeError (malformed JSON).
        print('Invalid config file')
        return

    try:
        # makedirs also creates missing parent directories.
        os.makedirs(config.letters_dir, exist_ok=True)

        with Pop3Client(config.server, config.port, config.user, config.password, config.use_ssl) as client:
            for number in range(1, args.count + 1):
                letter = client.retr(number)
                if letter is None:
                    # retr() already printed the server error; there is
                    # nothing to save for this message.
                    continue
                letter.save_at(os.path.join(config.letters_dir, str(number)))
    except socket.timeout:
        print('Network operation timeout')
# Run the client only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment