Created
December 13, 2017 18:59
-
-
Save MayerDaniel/759df87cd42d40def1b6042ae4c6d1a6 to your computer and use it in GitHub Desktop.
Edgar Code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyicloud import PyiCloudService | |
import sys | |
class iCloudInterface: | |
def __init__(self, id, password): | |
self.id = id | |
self.password = password | |
self.api = '' | |
def login(self): | |
self.api = PyiCloudService(self.id) | |
if self.api.requires_2fa: | |
print "Two-factor authentication required. Your trusted devices are:" | |
devices = self.api.trusted_devices | |
for i, device in enumerate(devices): | |
print " %s: %s" % (i, device.get('deviceName', | |
"SMS to %s" % device.get('phoneNumber'))) | |
device = raw_input('Which device would you like to use? ') | |
device = devices[int(device)] | |
if not self.api.send_verification_code(device): | |
print "Failed to send verification code" | |
sys.exit(1) | |
code = raw_input('Please enter validation code: ') | |
if not self.api.validate_verification_code(device, code): | |
print "Failed to verify verification code" | |
sys.exit(1) | |
def get_contacts(self): | |
contacts = {} | |
for c in self.api.contacts.all(): | |
name = c.get('firstName') | |
if c.get('lastName'): | |
name = name + ' ' + c.get('lastName') | |
phones = c.get('phones') | |
numbers = [] | |
if phones: | |
for item in c.get('phones'): | |
numbers.append(item['field']) | |
else: | |
numbers = [None] | |
contacts[name] = { "numbers" : numbers} | |
return(contacts) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import edgar_cloud | |
import imessage | |
import re | |
import os | |
import random | |
import urllib | |
import urllib2 | |
from bs4 import BeautifulSoup | |
import pickle | |
MESSAGE_CONTENT = 0 | |
PHONE_NUMBER = 1 | |
def save_obj(obj, name ): | |
with open('obj/'+ name + '.pkl', 'wb') as f: | |
pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL) | |
def load_obj(name ): | |
with open('obj/' + name + '.pkl', 'rb') as f: | |
return pickle.load(f) | |
class Edgar(): | |
def __init__(self, username, password): | |
self.cloud = edgar_cloud.iCloudInterface(username, password) | |
self.cloud.login() | |
self.contacts = self.cloud.get_contacts() | |
mem_path = 'obj/memories.pkl' | |
if not os.path.exists(mem_path): | |
file(mem_path, 'w').close() | |
self.memories = {} | |
save_obj(self.memories,'memories') | |
else: | |
self.memories = load_obj('memories') | |
song_path = 'obj/songs.pkl' | |
if not os.path.exists(song_path): | |
file(song_path, 'w').close() | |
self.songs = {} | |
save_obj(self.songs,'songs') | |
else: | |
self.songs = load_obj('songs') | |
def send_message(self, string, buddies, noRobot=False): | |
string = string.replace("'", "") | |
string = string.replace('"', '') | |
faces = ["<I^_^I>", "<Io_oI>", "<I*_*I>", "<IO_OI>", "<I-_-I>", "<I0_0I>", "<Io_0I>", "<IU_UI>", "<I+_+I>", "<I=_=I>"] | |
buddy_list = [] | |
participants = [] | |
for i, buddy in enumerate(buddies): | |
b = "theBuddy%s" % str(i) | |
buddy_list.append("set " + b + " to buddy \"" + buddy +"\"") | |
participants.append(b) | |
buddy_list = "\r".join(buddy_list) | |
participants = ", ".join(participants) | |
if not noRobot: | |
string = random.choice(faces) + " " + string | |
body = """ | |
osascript -e 'tell application "Messages" | |
activate | |
%s | |
set thisChat to make new text chat with properties {participants:{%s}} | |
set thisMessage to send "%s" to thisChat | |
end tell' """ % (buddy_list, participants, string) | |
print(body) | |
os.system(body) | |
def here(self, buddies): | |
msg = "*Beep Boop* I am here!" | |
self.send_message(msg, buddies) | |
def what_am_i(self, buddies): | |
commands = ["Are you here? - Returns if I am running", "What are you? - Returns with help on how to use me", "Remember <key> is <value> - I will remember this relation", "What is <key> - I search my memories for a relation", "Odds <number> - I will generate 2 random numbers for odds within the range given", "Dog pic - returns a cute dog pic from the internet", "Set song of the week <url> <name> - sets that person's song of the week", "Song of the week <name>/all - gets a specific person/everyone's song of the week"] | |
msg = "I am a robot that lives inside of Dans texts. You can call me with @Edgar. \rI currently have the following commands:" | |
for i, com in enumerate(commands): | |
msg = msg + "\r" + com | |
self.send_message(msg, buddies) | |
def remember(self, buddies, string1, string2): | |
self.memories[string1.lower()] = string2 | |
save_obj(self.memories, "memories") | |
msg = "Ok, I will remember " + string1 + " is " + string2 | |
self.send_message(msg, buddies) | |
def recall(self, buddies, string1): | |
s1 = string1.lower() | |
if s1 in self.memories: | |
self.send_message(self.memories[s1], buddies) | |
else: | |
msg = "Sorry, I dont have a memory for " + string1 | |
self.send_message(msg, buddies) | |
def set_song(self, buddies, song, name): | |
if name == 'all': | |
msg = "All is reserved, sorry" | |
self.send_message(msg, buddies) | |
else: | |
self.songs[name.lower()] = song | |
save_obj(self.songs, "songs") | |
msg = name + "s song of the week set as " + song | |
self.send_message(msg, buddies) | |
def get_song(self, buddies, name): | |
s1 = name.lower() | |
msg = '' | |
if s1 == 'all': | |
for key, value in self.songs.iteritems(): | |
msg = msg + "\r" + key + ": " + value | |
msg = msg + "\r *Beep Boop*" | |
self.send_message(msg, buddies) | |
elif s1 in self.songs: | |
self.send_message(self.songs[s1], buddies) | |
else: | |
msg = "Sorry, I dont have a song of the week for " + name | |
self.send_message(msg, buddies) | |
def odds(self, buddies, number): | |
if number < 1: | |
msg = "Odds must be with 1 or above" | |
self.send_message(msg, buddies) | |
num1 = random.randint(1, number) | |
num2 = random.randint(1, number) | |
result = "" | |
if num1 == num2: | |
result = "I demand completion of the odds!!" | |
else: | |
result = "no dice." | |
msg = "%s and %s - %s" % (num1, num2, result) | |
self.send_message(msg, buddies) | |
def dog_pic(self, buddies): | |
url = 'https://imgur.com/r/dogpictures' | |
response = urllib2.urlopen(url) | |
html = response.read() | |
soup = BeautifulSoup(html) | |
dogs = soup.findAll(attrs={'class':'image-list-link'}) | |
dog = random.choice(dogs) | |
dog_url = dog.find('img')['src'] | |
print(dog_url) | |
dog_url = dog_url.replace("//", "") | |
msg = dog_url.replace("b.", ".") | |
self.send_message("Enjoy your dog picture!", buddies) | |
self.send_message(msg, buddies, noRobot=True) | |
def read(self, message): | |
if not message[MESSAGE_CONTENT]: | |
pass | |
text = message[MESSAGE_CONTENT].text | |
date = message[MESSAGE_CONTENT].date | |
numbers = message[PHONE_NUMBER] | |
command = text.split(" ") | |
buddies = [] | |
print command | |
if(command[0] == "@Edgar" or command[0] == "@edgar"): | |
for name, info in self.contacts.iteritems(): | |
for n in info["numbers"]: | |
for x in numbers: | |
if n == x: | |
buddies.append(name) | |
if buddies == []: | |
pass | |
else: | |
print buddies | |
command.pop(0) | |
command = " ".join(command) | |
if re.match(r"are you here.*", command, re.IGNORECASE): | |
self.here(buddies) | |
elif re.match(r"what are you.*", command, re.IGNORECASE): | |
self.what_am_i(buddies) | |
elif re.match(r"remember .*? is .*?", command, re.IGNORECASE): | |
string1 = re.search(r"remember (.*?) is", command, re.IGNORECASE).group(1) | |
string2 = re.search(r"(.*? is )(.*?)$", command, re.IGNORECASE).group(2) | |
self.remember(buddies, string1, string2) | |
elif re.match(r"what is .*?", command, re.IGNORECASE): | |
string1 = re.search(r"what is (.*?)\??$", command, re.IGNORECASE).group(1) | |
self.recall(buddies, string1) | |
elif re.match(r"did you hear( a)? 4.*?", command, re.IGNORECASE): | |
msg = ["I heard a 4", "I definitely heard 4, anyone else hear a 4?", "*Beep Boop* Thats a yar darg"] | |
self.send_message(random.choice(msg), buddies) | |
elif re.match(r"odds [0-9]*$", command, re.IGNORECASE): | |
number = [int(s) for s in command.split() if s.isdigit()][0] | |
self.odds(buddies, number) | |
elif re.match(r"dog pic.*?", command, re.IGNORECASE): | |
self.dog_pic(buddies) | |
elif re.match(r"song of the week .*?", command, re.IGNORECASE): | |
regex = re.search(r"song of the week (.*?)$", command, re.IGNORECASE) | |
name = regex.group(1) | |
self.get_song(buddies, name) | |
elif re.match(r"set song of the week (.*?) (.*?)$", command, re.IGNORECASE): | |
regex = re.search(r"set song of the week (.*?) (.*?)$", command, re.IGNORECASE) | |
song = regex.group(1) | |
name = regex.group(2) | |
self.set_song(buddies, song, name) | |
elif re.match(r"good bot.*?|thank you.*?|thanks.*?", command, re.IGNORECASE): | |
msg = "Glad I could be of assistance, meatbag. *Beep Boop*" | |
self.send_message(msg, buddies) | |
else: | |
self.send_message("I dont recognize that command. Text @Edgar what are you? for help", buddies) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os.path import expanduser | |
import sqlite3 | |
import datetime | |
OSX_EPOCH = 978307200 | |
LAST_READ = -1 | |
# Represents a user that iMessages can be exchanged with. | |
# | |
# Each user has... | |
# - an `id` property that uniquely identifies him or her in the Messages database | |
# - a `phone_or_email` property that is either the user's phone number or iMessage-enabled email address | |
class Recipient: | |
def __init__(self, id, phone_or_email): | |
self.id = id | |
self.phone_or_email = phone_or_email | |
def __repr__(self): | |
return "ID: " + str(self.id) + " Phone or email: " + self.phone_or_email | |
# Represents an iMessage message. | |
# | |
# Each message has: | |
# - a `text` property that holds the text contents of the message | |
# - a `date` property that holds the delivery date of the message | |
class Message: | |
def __init__(self, text, date): | |
self.text = text | |
self.date = date | |
def __repr__(self): | |
return "Text: " + self.text + " Date: " + str(self.date) | |
def login(): | |
cloud = edgar_cloud.iCloudInterface(username, password) | |
cloud.login() | |
CONTACTS = cloud.get_contacts() | |
def _new_connection(): | |
# The current logged-in user's Messages sqlite database is found at: | |
# ~/Library/Messages/chat.db | |
db_path = expanduser("~") + '/Library/Messages/chat.db' | |
return sqlite3.connect(db_path) | |
# Fetches all known recipients. | |
# | |
# The `id`s of the recipients fetched can be used to fetch all messages exchanged with a given recipient. | |
def get_all_recipients(): | |
connection = _new_connection() | |
c = connection.cursor() | |
# The `handle` table stores all known recipients. | |
c.execute("SELECT * FROM `handle`") | |
recipients = [] | |
for row in c: | |
recipients.append(Recipient(row[0], row[1])) | |
connection.close() | |
return recipients | |
# Fetches all messages exchanged with a given recipient. | |
def get_messages_for_recipient(id): | |
connection = _new_connection() | |
c = connection.cursor() | |
# The `message` table stores all exchanged iMessages. | |
c.execute("SELECT * FROM `message` WHERE handle_id=" + str(id)) | |
messages = [] | |
for row in c: | |
text = row[2] | |
if text is None: | |
continue | |
date = datetime.datetime.fromtimestamp(row[15] + OSX_EPOCH) | |
encoded_text = text.encode('ascii', 'ignore') | |
messages.append(Message(encoded_text, date)) | |
print(messages) | |
connection.close() | |
return messages | |
def id_to_numbers(row_id): | |
connection = _new_connection() | |
c = connection.cursor() | |
# The `message` table stores all exchanged iMessages. | |
chat = '' | |
while chat == '': | |
c.execute("SELECT * FROM chat_message_join WHERE message_id=" + str(row_id)) | |
for row in c: | |
print(row) | |
chat = row[0] | |
handles = [] | |
print("CHAT = " + str(chat)) | |
c.execute("SELECT * FROM chat_handle_join WHERE chat_id=" + str(chat)) | |
for row in c: | |
handles.append(str(row[1])) | |
valid_nums = ", ".join(handles) | |
valid_nums = "(" + valid_nums + ")" | |
numbers = [] | |
c.execute("SELECT * FROM handle WHERE ROWID IN" + valid_nums) | |
for row in c: | |
numbers.append(row[1]) | |
connection.close() | |
print(numbers) | |
return numbers | |
# Fetches all messages exchanged with a given recipient. | |
def get_last_message(): | |
global LAST_READ | |
connection = _new_connection() | |
c = connection.cursor() | |
# The `message` table stores all exchanged iMessages. | |
text = '' | |
row_id = '' | |
date = '' | |
if LAST_READ == -1: | |
c.execute("SELECT * FROM message WHERE ROWID = (SELECT MAX(ROWID) FROM message)") | |
else: | |
c.execute("SELECT * FROM message WHERE ROWID > " + str(LAST_READ)) | |
messages = [] | |
for row in c: | |
row_id = row[0] | |
text = row[2] | |
if text is None: | |
continue | |
date = datetime.datetime.fromtimestamp(row[15] + OSX_EPOCH) | |
encoded_text = text.encode('ascii', 'ignore') | |
message = Message(encoded_text, date) | |
handles = id_to_numbers(row_id) | |
LAST_READ = row_id | |
messages.append([message, handles]) | |
return(messages) | |
connection.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import edgar_cloud | |
import imessage | |
import threading | |
import sys | |
import time | |
import os | |
import logging | |
import edgarbot | |
import getpass | |
from watchdog.observers import Observer | |
from watchdog.events import FileSystemEventHandler | |
class MyHandler(FileSystemEventHandler): | |
sleep_time = 0.1 | |
def get_Edgar(self, Edgar): | |
self.Edgar = Edgar | |
def on_modified(self, event): | |
messages = imessage.get_last_message() | |
threads = [] | |
for message in messages: | |
t = threading.Thread(target=self.Edgar.read(message)) | |
threads.append(t) | |
t.start() | |
class Listener: | |
def __init__(self, username, password): | |
self.Ed = edgarbot.Edgar(username, password) | |
def listen(self): | |
print "Edgar is listening!" | |
homedir = os.environ['HOME'] | |
path = homedir + "/Library/Messages/" | |
event_handler = MyHandler() | |
event_handler.get_Edgar(self.Ed) | |
observer = Observer(timeout=0.5) | |
observer.event_queue.maxsize=10 | |
observer.schedule(event_handler, path, recursive=False) | |
observer.start() | |
try: | |
while True: | |
time.sleep(1) | |
except KeyboardInterrupt: | |
observer.stop() | |
observer.join() | |
def main(): | |
username = raw_input("Please input your icloud username: ") | |
password = getpass.getpass("Please enter your icloud password: ") | |
l = Listener(username, password) | |
l.listen() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment