Skip to content

Instantly share code, notes, and snippets.

@MayerDaniel
Created December 13, 2017 18:59
Show Gist options
  • Save MayerDaniel/759df87cd42d40def1b6042ae4c6d1a6 to your computer and use it in GitHub Desktop.
Save MayerDaniel/759df87cd42d40def1b6042ae4c6d1a6 to your computer and use it in GitHub Desktop.
Edgar Code
from pyicloud import PyiCloudService
import sys
class iCloudInterface:
    """Small wrapper around ``PyiCloudService`` for login and contact lookup.

    ``id`` and ``password`` are the iCloud credentials. ``api`` stays an empty
    string until ``login()`` replaces it with a ``PyiCloudService`` instance.
    """

    def __init__(self, id, password):
        self.id = id
        self.password = password
        self.api = ''

    def login(self):
        """Create the pyicloud session, completing two-factor auth if required.

        When the service reports ``requires_2fa`` the trusted devices are
        listed on stdout, the user picks one and types the verification code.
        The process exits with status 1 if the code cannot be sent or does
        not validate.
        """
        # Fix: the password collected in __init__ was never handed to
        # pyicloud, so the session was created from the account id alone.
        self.api = PyiCloudService(self.id, self.password)
        if not self.api.requires_2fa:
            return

        print("Two-factor authentication required. Your trusted devices are:")
        devices = self.api.trusted_devices
        for i, device in enumerate(devices):
            label = device.get('deviceName',
                               "SMS to %s" % device.get('phoneNumber'))
            print(" %s: %s" % (i, label))

        choice = raw_input('Which device would you like to use? ')
        device = devices[int(choice)]
        if not self.api.send_verification_code(device):
            print("Failed to send verification code")
            sys.exit(1)

        code = raw_input('Please enter validation code: ')
        if not self.api.validate_verification_code(device, code):
            print("Failed to verify verification code")
            sys.exit(1)

    def get_contacts(self):
        """Return ``{display name: {"numbers": [...]}}`` for the iCloud contacts.

        The display name is the first and last name joined by a space, using
        whichever of the two is present. Contacts with neither are skipped
        because they cannot be addressed by name. ``numbers`` holds each
        phone entry's ``'field'`` value, or ``[None]`` when the contact has
        no phones. Contacts sharing a display name overwrite each other.
        """
        contacts = {}
        for c in self.api.contacts.all():
            # Fix: a missing firstName used to raise TypeError (None + str),
            # and a fully nameless contact was stored under the key None.
            parts = [p for p in (c.get('firstName'), c.get('lastName')) if p]
            if not parts:
                continue
            name = ' '.join(parts)

            phones = c.get('phones')
            if phones:
                numbers = [item['field'] for item in phones]
            else:
                numbers = [None]
            contacts[name] = {"numbers": numbers}
        return contacts
import edgar_cloud
import imessage
import re
import os
import random
import urllib
import urllib2
from bs4 import BeautifulSoup
import pickle
# Indexes into the two-element message entries consumed by Edgar.read().
# Entry[MESSAGE_CONTENT] is the object whose .text and .date are read.
MESSAGE_CONTENT = 0
# Entry[PHONE_NUMBER] is the collection compared against each contact's numbers.
PHONE_NUMBER = 1
def save_obj(obj, name ):
    """Pickle ``obj`` to ``obj/<name>.pkl`` with the highest pickle protocol.

    The target directory is created first when it does not exist; previously
    the first save on a fresh checkout failed because ``obj/`` was missing.
    """
    path = 'obj/' + name + '.pkl'
    folder = os.path.dirname(path)
    if not os.path.isdir(folder):
        os.makedirs(folder)
    with open(path, 'wb') as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
def load_obj(name ):
    """Return the object unpickled from ``obj/<name>.pkl``.

    NOTE(review): pickle executes code embedded in the file, so this must
    only ever read files written by this program.
    """
    handle = open('obj/' + name + '.pkl', 'rb')
    try:
        return pickle.load(handle)
    finally:
        handle.close()
class Edgar(object):
    """Chat bot that answers ``@Edgar`` commands through the Messages app.

    Attributes:
        cloud: the logged-in ``edgar_cloud.iCloudInterface``.
        contacts: ``{name: {"numbers": [...]}}`` as returned by the cloud.
        memories: key -> value relations, persisted as ``obj/memories.pkl``.
        songs: lower-cased name -> song, persisted as ``obj/songs.pkl``.
    """

    # Robot faces; one is prefixed to every reply unless noRobot is set.
    _FACES = ["<I^_^I>", "<Io_oI>", "<I*_*I>", "<IO_OI>", "<I-_-I>",
              "<I0_0I>", "<Io_0I>", "<IU_UI>", "<I+_+I>", "<I=_=I>"]

    def __init__(self, username, password):
        """Log in to iCloud, fetch the contacts and load the persisted state."""
        self.cloud = edgar_cloud.iCloudInterface(username, password)
        self.cloud.login()
        self.contacts = self.cloud.get_contacts()
        self.memories = self._load_or_create('memories')
        self.songs = self._load_or_create('songs')

    @staticmethod
    def _load_or_create(name):
        """Load ``obj/<name>.pkl``, or create it holding an empty dict."""
        # Fix: the obj directory was assumed to exist, and the file was
        # pre-created with the Python 2-only file() builtin.
        if not os.path.isdir('obj'):
            os.makedirs('obj')
        if os.path.exists('obj/' + name + '.pkl'):
            return load_obj(name)
        store = {}
        save_obj(store, name)
        return store

    @staticmethod
    def _build_script(buddies, text):
        """Return the AppleScript that sends ``text`` to a chat with ``buddies``.

        Buddy names and the message are emitted as AppleScript string
        literals with backslashes and double quotes escaped.
        """
        def quote(value):
            return '"' + value.replace('\\', '\\\\').replace('"', '\\"') + '"'

        names = ["theBuddy%s" % i for i in range(len(buddies))]
        lines = ['tell application "Messages"', 'activate']
        lines.extend("set %s to buddy %s" % (var, quote(buddy))
                     for var, buddy in zip(names, buddies))
        lines.append("set thisChat to make new text chat with properties "
                     "{participants:{%s}}" % ", ".join(names))
        lines.append("set thisMessage to send %s to thisChat" % quote(text))
        lines.append("end tell")
        return "\n".join(lines)

    def send_message(self, string, buddies, noRobot=False):
        """Send ``string`` to a new chat containing every name in ``buddies``.

        A random robot face is prepended unless ``noRobot`` is true. The
        generated script is printed before it is run.
        """
        # Local import: subprocess is not among this module's imports.
        import subprocess

        if not noRobot:
            string = random.choice(self._FACES) + " " + string
        script = self._build_script(buddies, string)
        print(script)
        # Fix: the script used to be spliced into a shell command line, so a
        # contact name containing an apostrophe broke the command and quotes
        # had to be stripped from every message. An argument list bypasses
        # the shell entirely.
        subprocess.call(["osascript", "-e", script])

    def here(self, buddies):
        """Reply that the bot is running."""
        self.send_message("*Beep Boop* I am here!", buddies)

    def what_am_i(self, buddies):
        """Reply with the list of supported commands."""
        commands = [
            "Are you here? - Returns if I am running",
            "What are you? - Returns with help on how to use me",
            "Remember <key> is <value> - I will remember this relation",
            "What is <key> - I search my memories for a relation",
            "Odds <number> - I will generate 2 random numbers for odds within the range given",
            "Dog pic - returns a cute dog pic from the internet",
            "Set song of the week <url> <name> - sets that person's song of the week",
            "Song of the week <name>/all - gets a specific person/everyone's song of the week",
        ]
        intro = "I am a robot that lives inside of Dans texts. You can call me with @Edgar. \rI currently have the following commands:"
        self.send_message("\r".join([intro] + commands), buddies)

    def remember(self, buddies, string1, string2):
        """Store ``string1`` -> ``string2`` (key lower-cased) and confirm it."""
        self.memories[string1.lower()] = string2
        save_obj(self.memories, "memories")
        self.send_message("Ok, I will remember " + string1 + " is " + string2,
                          buddies)

    def recall(self, buddies, string1):
        """Reply with the value remembered for ``string1``, if any."""
        key = string1.lower()
        if key in self.memories:
            self.send_message(self.memories[key], buddies)
        else:
            self.send_message("Sorry, I dont have a memory for " + string1,
                              buddies)

    def set_song(self, buddies, song, name):
        """Store ``song`` as the song of the week for ``name``."""
        # Fix: the reserved word was compared case-sensitively while keys are
        # stored lower-cased, so "All" created an unreachable "all" entry.
        if name.lower() == 'all':
            self.send_message("All is reserved, sorry", buddies)
            return
        self.songs[name.lower()] = song
        save_obj(self.songs, "songs")
        self.send_message(name + "s song of the week set as " + song, buddies)

    def get_song(self, buddies, name):
        """Reply with one person's song of the week, or every song for 'all'."""
        key = name.lower()
        if key == 'all':
            listing = "".join("\r" + who + ": " + song
                              for who, song in self.songs.items())
            self.send_message(listing + "\r *Beep Boop*", buddies)
        elif key in self.songs:
            self.send_message(self.songs[key], buddies)
        else:
            self.send_message(
                "Sorry, I dont have a song of the week for " + name, buddies)

    def odds(self, buddies, number):
        """Draw two numbers in ``1..number`` and report whether they match."""
        if number < 1:
            self.send_message("Odds must be with 1 or above", buddies)
            # Fix: execution used to fall through into randint(1, number),
            # which raises ValueError for an empty range.
            return
        num1 = random.randint(1, number)
        num2 = random.randint(1, number)
        if num1 == num2:
            result = "I demand completion of the odds!!"
        else:
            result = "no dice."
        self.send_message("%s and %s - %s" % (num1, num2, result), buddies)

    def dog_pic(self, buddies):
        """Send a random picture URL scraped from imgur's dog pictures page."""
        response = urllib2.urlopen('https://imgur.com/r/dogpictures')
        try:
            html = response.read()
        finally:
            response.close()
        soup = BeautifulSoup(html)
        dogs = soup.findAll(attrs={'class': 'image-list-link'})
        if not dogs:
            # random.choice raises IndexError on an empty list.
            self.send_message("Sorry, I couldnt find any dog pictures",
                              buddies)
            return
        dog_url = random.choice(dogs).find('img')['src']
        print(dog_url)
        # Drops every "//" (the src is presumably protocol-relative) and
        # every "b." (presumably imgur's thumbnail suffix) -- TODO confirm.
        msg = dog_url.replace("//", "").replace("b.", ".")
        self.send_message("Enjoy your dog picture!", buddies)
        self.send_message(msg, buddies, noRobot=True)

    def _buddies_for(self, numbers):
        """Return the contact names owning at least one of ``numbers``."""
        wanted = set(numbers)
        # Each name is listed once even when several of its numbers match.
        return [name for name, info in self.contacts.items()
                if any(n in wanted for n in info["numbers"])]

    def _dispatch(self, command, buddies):
        """Run the handler for ``command`` (the text after ``@Edgar``)."""
        flags = re.IGNORECASE

        if re.match(r"are you here", command, flags):
            self.here(buddies)
            return
        if re.match(r"what are you", command, flags):
            self.what_am_i(buddies)
            return

        # Fix: key and value used to come from two separate regexes that
        # could split at different " is" occurrences.
        found = re.match(r"remember (.*?) is (.*)$", command, flags)
        if found:
            self.remember(buddies, found.group(1), found.group(2))
            return

        found = re.match(r"what is (.*?)\??$", command, flags)
        if found:
            self.recall(buddies, found.group(1))
            return

        if re.match(r"did you hear( a)? 4", command, flags):
            replies = ["I heard a 4",
                       "I definitely heard 4, anyone else hear a 4?",
                       "*Beep Boop* Thats a yar darg"]
            self.send_message(random.choice(replies), buddies)
            return

        # Fix: "[0-9]*" also accepted "odds " with no digits, which then
        # raised IndexError while extracting the number.
        found = re.match(r"odds ([0-9]+)$", command, flags)
        if found:
            self.odds(buddies, int(found.group(1)))
            return

        if re.match(r"dog pic", command, flags):
            self.dog_pic(buddies)
            return

        found = re.match(r"song of the week (.*?)$", command, flags)
        if found:
            self.get_song(buddies, found.group(1))
            return

        found = re.match(r"set song of the week (.*?) (.*?)$", command, flags)
        if found:
            self.set_song(buddies, found.group(1), found.group(2))
            return

        if re.match(r"good bot|thank you|thanks", command, flags):
            self.send_message(
                "Glad I could be of assistance, meatbag. *Beep Boop*", buddies)
            return

        self.send_message(
            "I dont recognize that command. Text @Edgar what are you? for help",
            buddies)

    def read(self, message):
        """Handle one incoming message entry.

        ``message[MESSAGE_CONTENT]`` must expose ``.text`` and
        ``message[PHONE_NUMBER]`` is the collection of sender identifiers.
        Messages not starting with ``@Edgar``/``@edgar``, or whose senders
        match no known contact, are ignored.
        """
        content = message[MESSAGE_CONTENT]
        if not content:
            # Fix: this was "pass", so an empty entry crashed on .text below.
            return
        words = content.text.split(" ")
        print(words)
        if words[0] not in ("@Edgar", "@edgar"):
            return
        buddies = self._buddies_for(message[PHONE_NUMBER])
        if not buddies:
            return
        print(buddies)
        self._dispatch(" ".join(words[1:]), buddies)
from os.path import expanduser
import sqlite3
import datetime
# Offset added to the stored message date (column 15 of a `message` row)
# before it is passed to datetime.fromtimestamp(); 978307200 is the Unix
# timestamp of 2001-01-01 00:00:00 UTC.
OSX_EPOCH = 978307200
# ROWID of the most recent message row handled by get_last_message();
# -1 means nothing has been read yet.
LAST_READ = -1
# Represents a user that iMessages can be exchanged with.
#
# Each user has...
# - an `id` property that uniquely identifies him or her in the Messages database
# - a `phone_or_email` property that is either the user's phone number or iMessage-enabled email address
class Recipient:
    """A user that iMessages can be exchanged with.

    ``id`` identifies the user in the Messages database; ``phone_or_email``
    is the user's phone number or iMessage-enabled email address.
    """

    def __init__(self, id, phone_or_email):
        self.phone_or_email = phone_or_email
        self.id = id

    def __repr__(self):
        pieces = ["ID: ", str(self.id), " Phone or email: ", self.phone_or_email]
        return "".join(pieces)
# Represents an iMessage message.
#
# Each message has:
# - a `text` property that holds the text contents of the message
# - a `date` property that holds the delivery date of the message
class Message:
    """A single iMessage: its ``text`` contents and its delivery ``date``."""

    def __init__(self, text, date):
        self.date = date
        self.text = text

    def __repr__(self):
        pieces = ["Text: ", self.text, " Date: ", str(self.date)]
        return "".join(pieces)
def login(username=None, password=None):
    """Log in to iCloud and return the account's contacts.

    The previous body referenced ``username``, ``password`` and
    ``edgar_cloud`` without any of them being defined here, so every call
    raised NameError, and the fetched contacts were dropped in a local.

    Args:
        username: iCloud account name.
        password: iCloud password.

    Returns:
        The value of ``iCloudInterface.get_contacts()``. It is also stored
        in the module-level ``CONTACTS``, which the upper-case name in the
        original assignment suggests was intended -- TODO confirm.

    Raises:
        ValueError: if either credential is missing.
    """
    global CONTACTS
    if username is None or password is None:
        raise ValueError("login() requires an iCloud username and password")
    # Local import: this module's import lines do not include edgar_cloud.
    import edgar_cloud
    cloud = edgar_cloud.iCloudInterface(username, password)
    cloud.login()
    CONTACTS = cloud.get_contacts()
    return CONTACTS
def _new_connection():
    """Open a sqlite3 connection to the logged-in user's Messages database.

    The database lives at ``~/Library/Messages/chat.db``.
    """
    home = expanduser("~")
    return sqlite3.connect(home + '/Library/Messages/chat.db')
# Fetches all known recipients.
#
# The `id`s of the recipients fetched can be used to fetch all messages exchanged with a given recipient.
def get_all_recipients():
    """Return a ``Recipient`` for every row of the ``handle`` table.

    Each recipient is built from the row's first two columns (id, then
    phone number or email). The connection is now closed even when the
    query raises; previously it leaked on error.
    """
    connection = _new_connection()
    try:
        c = connection.cursor()
        # The `handle` table stores all known recipients.
        c.execute("SELECT * FROM `handle`")
        return [Recipient(row[0], row[1]) for row in c]
    finally:
        connection.close()
# Fetches all messages exchanged with a given recipient.
def get_messages_for_recipient(id):
    """Return the ``Message`` objects exchanged with the recipient ``id``.

    Rows without text are skipped. Text is reduced to ASCII (other
    characters are dropped) and the date is column 15 plus ``OSX_EPOCH``,
    converted with ``datetime.fromtimestamp``. The resulting list is
    printed once before it is returned.

    Args:
        id: value matched against ``message.handle_id``.
    """
    connection = _new_connection()
    try:
        c = connection.cursor()
        # The `message` table stores all exchanged iMessages.
        # Fix: the id is bound as a parameter instead of being spliced
        # into the SQL text.
        c.execute("SELECT * FROM `message` WHERE handle_id=?", (id,))
        messages = []
        for row in c:
            text = row[2]
            if text is None:
                continue
            date = datetime.datetime.fromtimestamp(row[15] + OSX_EPOCH)
            messages.append(Message(text.encode('ascii', 'ignore'), date))
    finally:
        # Fix: the connection used to leak when the query or a row raised.
        connection.close()
    print(messages)
    return messages
def id_to_numbers(row_id, retries=50, delay=0.1):
    """Return the handle ids (phone numbers/emails) of a message's chat.

    Looks up the chat joined to message ``row_id`` in ``chat_message_join``,
    then the chat's handles in ``chat_handle_join``, then the second column
    of each matching ``handle`` row.

    Args:
        row_id: ROWID of the message.
        retries: how many times to look for the join row before giving up.
            The original loop retried forever without pausing, pinning a
            CPU core and hanging if the row never appeared.
        delay: seconds to sleep between attempts.

    Returns:
        A list of handle ids; empty when no chat or no handles are found.
    """
    # Local import: time is not among this module's imports.
    import time

    connection = _new_connection()
    try:
        c = connection.cursor()

        chat = None
        for _ in range(retries):
            # `chat_message_join` links each message to its chat.
            c.execute("SELECT * FROM chat_message_join WHERE message_id=?",
                      (row_id,))
            for row in c:
                print(row)
                chat = row[0]
            if chat is not None:
                break
            time.sleep(delay)

        numbers = []
        if chat is not None:
            print("CHAT = " + str(chat))
            c.execute("SELECT * FROM chat_handle_join WHERE chat_id=?",
                      (chat,))
            handles = [row[1] for row in c]
            if handles:
                # One placeholder per handle keeps the IN list parameterized.
                marks = ", ".join("?" * len(handles))
                c.execute("SELECT * FROM handle WHERE ROWID IN (" + marks + ")",
                          handles)
                numbers = [row[1] for row in c]
    finally:
        connection.close()
    print(numbers)
    return numbers
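# Fetches the messages that arrived since the previous call (only the newest message on the first call), each paired with the phone numbers or emails of its chat.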
def get_last_message():
    """Return the messages added since the previous call.

    On the first call (``LAST_READ == -1``) only the row with the highest
    ROWID is considered; afterwards every row with a ROWID above
    ``LAST_READ``. Rows without text are skipped.

    Returns:
        A list of ``[Message, handles]`` pairs, where ``handles`` is the
        result of ``id_to_numbers`` for that row.
    """
    global LAST_READ
    connection = _new_connection()
    try:
        c = connection.cursor()
        # The `message` table stores all exchanged iMessages.
        if LAST_READ == -1:
            c.execute("SELECT * FROM message WHERE ROWID = (SELECT MAX(ROWID) FROM message)")
        else:
            c.execute("SELECT * FROM message WHERE ROWID > ?", (LAST_READ,))
        rows = c.fetchall()
    finally:
        # Fix: close() used to sit after the return statement and never ran,
        # leaking one connection per call.
        connection.close()

    messages = []
    for row in rows:
        row_id = row[0]
        text = row[2]
        if text is None:
            # Fix: text-less rows did not advance LAST_READ, so they were
            # fetched again on every later call.
            LAST_READ = max(LAST_READ, row_id)
            continue
        date = datetime.datetime.fromtimestamp(row[15] + OSX_EPOCH)
        message = Message(text.encode('ascii', 'ignore'), date)
        handles = id_to_numbers(row_id)
        LAST_READ = max(LAST_READ, row_id)
        messages.append([message, handles])
    return messages
import edgar_cloud
import imessage
import threading
import sys
import time
import os
import logging
import edgarbot
import getpass
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MyHandler(FileSystemEventHandler):
    """Watchdog handler that feeds new iMessages to an Edgar bot."""

    sleep_time = 0.1

    def get_Edgar(self, Edgar):
        """Attach the bot whose ``read`` method receives each new message."""
        self.Edgar = Edgar

    def on_modified(self, event):
        """Fetch the unread messages and process each one in its own thread.

        The original passed ``self.Edgar.read(message)`` as ``target``: that
        ran ``read`` synchronously and started a thread around its return
        value (``None``), which did nothing. The callable and its argument
        are now passed separately.

        NOTE(review): handlers now really run concurrently; confirm that
        Edgar's state and pickle files tolerate that.
        """
        for message in imessage.get_last_message():
            worker = threading.Thread(target=self.Edgar.read, args=(message,))
            worker.start()
class Listener:
    """Owns an Edgar bot and watches the Messages directory for changes."""

    def __init__(self, username, password):
        self.Ed = edgarbot.Edgar(username, password)

    def listen(self):
        """Watch ``$HOME/Library/Messages/`` until interrupted with Ctrl-C."""
        print("Edgar is listening!")
        watched_dir = os.environ['HOME'] + "/Library/Messages/"

        handler = MyHandler()
        handler.get_Edgar(self.Ed)

        watcher = Observer(timeout=0.5)
        watcher.event_queue.maxsize = 10
        watcher.schedule(handler, watched_dir, recursive=False)
        watcher.start()

        # Idle in the main thread; the observer does its work in its own.
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            watcher.stop()
        watcher.join()
def main():
    """Prompt for iCloud credentials, then start listening for messages."""
    account = raw_input("Please input your icloud username: ")
    secret = getpass.getpass("Please enter your icloud password: ")
    Listener(account, secret).listen()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment