Skip to content

Instantly share code, notes, and snippets.

Last active June 19, 2018 08:55
Show Gist options
  • Save chadyred/ae6a6c2962b559082ae2c30180f6cd0a to your computer and use it in GitHub Desktop.
Save chadyred/ae6a6c2962b559082ae2c30180f6cd0a to your computer and use it in GitHub Desktop.
Allow use to get a given column on a CSV with a dataframe
import abc
import sys
import json, base64
from typing import Callable, IO, cast
import requests
import pandas as pd
import attr
class FileData(object):
path = attr.ib(type=str)
content = attr.ib(init="",type=str)
dataframe = attr.ib(init="",type=str)
class ParamsData():
params = attr.ib(default={})
headers = attr.ib(default={})
class MessageTemplating(metaclass=abc.ABCMeta):
def format_message(self, message: 'Messagerable') -> str:
"""Message factoring"""
class Messagerable(metaclass=abc.ABCMeta):
def print_with_on(self, message: str, messagerTemplate : 'MessageTemplating', stream: IO[str]) -> 'Messagerable':
"""Message factoring"""
class SystemIoManipulator(metaclass=abc.ABCMeta):
"""Able to read anything"""
class Reader(SystemIoManipulator):
"""Able to read anything"""
class FileReader(Reader):
def read_element(self, file: 'File', action: Callable[['Messagerable'], None]) -> 'FileReader':
"""File reader of a computer system"""
class ReaderOfFile(FileReader):
def read_element(self, file: 'File', action: Callable[['File'], None]) -> 'FileReader':
"""File reader of a computer system"""
file.content = open(file.path).read()
return self
class Messager(Messagerable):
def print_with_on(self, message: str, messagerTemplate : 'MessageTemplating', stream: IO[str]) -> 'Messagerable':
lambda message_formatted: stream.write(message_formatted)
return self
class MessagerTemplate(MessageTemplating):
def format_message(self, message: str, action: Callable[[str], None]) -> str:
"""Show notice"""
action("Notice : " + message + '\n')
return self
class Parser(SystemIoManipulator):
"""Parse anything"""
class JsonParser(SystemIoManipulator):
"""Parse file content to json"""
def make_json_of_string_with_then_do(self, content: str, action: Callable[ [str], None ]) -> 'JsonParser':
"""Tranform content of file in json"""
class MyJsonParser(JsonParser):
"""Parse file content to many format"""
def make_json_of_string_with_then_do(self, content: str, action: Callable[ [str], None ]) -> 'JsonParser':
"""Tranform content of file in json which convert to str (ex unicode in python2)"""
json = json.loads(content)
return self
class CsvParser(Parser):
"""Parse csv content to many format"""
def add_padding(self, b64_string) -> str:
"""Add padding"""
def make_me_a_series_of_a_csv_column_with_then_do(self, file: 'File', column: str, encodage: str, action: Callable[['File'], None]) -> 'CsvParser':
def extract_content_of_dataframe_given_column_base_64_encoded(self, file: 'File', column: str, encodage: str, action: Callable[['File'], None]) -> 'CsvParser':
class MyCsvParser(Parser):
"""Parse csv content to many format"""
def add_padding(self, b64_string) -> str:
b64_string += "=" * ((4 - len(b64_string) % 4) % 4) #ugh
return b64_string
def make_me_a_series_of_a_csv_column_with_then_do(self, file: 'File', column: str, encodage: str, action: Callable[['File'], None]) -> 'CsvParser':
"""Parse CSV"""
dataframe = pd.read_csv(file.path, sep=',', encoding=encodage)
# Todo one more step to handle dataframe specific column to get data of matrix
dataframe[column] = dataframe[column].apply(lambda y: self.add_padding(y))
file.dataframe = dataframe
return self
def extract_content_of_dataframe_given_column_base_64_encoded(self, file: 'File', column: str, encodage: str, action: Callable[['File'], None]) -> 'CsvParser':
file.dataframe['result'] = file.dataframe[column][2:].apply(lambda s : MyBase64Decoder().simple_decode(s))
return self
class HostCaller(metaclass=abc.ABCMeta):
def make_me_a_call(self, host: str, params: dict, action: Callable[['Messagerable'], None]) -> 'HostCaller':
"""Call host with credential"""
class ApiCallerVirusTotal(HostCaller):
def make_me_a_call(self, host: str, paramsData: 'ParamsData', action: Callable[['Messagerable'], None]) -> str:
"""Call host with credential"""
response =, paramsData.params, headers=paramsData.headers)
json = response.json()
return self
class Decoder(metaclass=abc.ABCMeta):
"""Decode anything"""
class Base64Decoder(Decoder):
def simple_decode(self, base64Attr: str, encodage='latin-1') -> bytes:
"""decode base 64"""
def decode_this_with_then_do(self, base64Attr: str, action: Callable[[str], None], encodage='latin-1') -> 'MyBase64Decoder':
"""decode base 64"""
class MyBase64Decoder(Base64Decoder):
def simple_decode(self, base64Attr: str, encodage='latin-1') -> bytes:
"""decode base 64"""
return base64.b64decode(base64Attr).decode(encodage)
def decode_this_with_then_do(self, base64Attr: str, action: Callable[[str], None], encodage='latin-1') -> 'MyBase64Decoder':
"""decode base 64"""
return self
if __name__ == '__main__':
lambda file: MyCsvParser().extract_content_of_dataframe_given_column_base_64_encoded(
lambda file: Messager().print_with_on(
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment