Skip to content

Instantly share code, notes, and snippets.

@ostronom
Created July 17, 2018 11:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ostronom/308ecc654d5ee8b07a06121d5a74d9cd to your computer and use it in GitHub Desktop.
Save ostronom/308ecc654d5ee8b07a06121d5a74d9cd to your computer and use it in GitHub Desktop.
#/usr/bin/python
import sys
sys.path.append("/usr/local/lib/python2.7/site-packages")
import random
import grpc
from interface import authentication_pb2
from interface import authentication_pb2_grpc
from interface import groups_pb2
from interface import groups_pb2_grpc
from interface import messaging_pb2
from interface import messaging_pb2_grpc
from interface import peers_pb2
class Smoke(object):
def __init__(self, address):
self.chan = grpc.insecure_channel(address)
self.auth = authentication_pb2_grpc.AuthenticationStub(self.chan)
self.groups = groups_pb2_grpc.GroupsStub(self.chan)
self.messaging = messaging_pb2_grpc.MessagingStub(self.chan)
@property
def metadata(self):
return (('x-auth-ticket', self.token),)
def authenticate_connection(self):
resp = self.auth.RegisterDeprecatedDevice(
authentication_pb2.RegisterDeprecatedDeviceRequest()
)
self.token = resp.token
self.auth_id = resp.auth_id
def phone_auth(self):
self.used_phone = random.randint(700000000, 800000000)
transaction = self.auth.StartPhoneAuth(
authentication_pb2.RequestStartPhoneAuth(
phone_number = self.used_phone,
app_id = 1,
api_key = "api_key",
device_title = "device_title"
), metadata=self.metadata
)
try:
self.auth.ValidateCode(
authentication_pb2.RequestValidateCode(
transaction_hash=transaction.transaction_hash,
code = '1234'
), metadata=self.metadata
)
except grpc._channel._Rendezvous as e:
# Yikes..
if e.details() == "Unnoccupied phone number.":
pass
else:
raise
resp = self.auth.SignUp(
authentication_pb2.RequestSignUp(
transaction_hash=transaction.transaction_hash,
name="Adolf Hitler"
), metadata=self.metadata
)
self.user = resp.user
self.config = resp.config
print self.user
def random_id(self):
return random.randint(0, sys.maxsize)
def create_group(self, name):
return self.groups.CreateGroup(
groups_pb2.RequestCreateGroup(
rid=self.random_id(),
title=name,
group_type=groups_pb2.GROUPTYPE_GROUP
), metadata=self.metadata
)
def send_message(self, peer, text):
message = messaging_pb2.MessageContent(
textMessage=messaging_pb2.TextMessage(text=text)
)
print self.messaging.SendMessage(
messaging_pb2.RequestSendMessage(
peer=peer,
rid=self.random_id(),
message=message
), metadata=self.metadata
)
def run(self):
self.authenticate_connection()
self.phone_auth()
group = self.create_group("MY GROUP")
self.send_message(
peers_pb2.OutPeer(
type=peers_pb2.PEERTYPE_GROUP,
id=group.group.id,
access_hash=group.group.access_hash
),
"HELLO FROM GRPC"
)
print 'ok', self.token, self.auth_id
if __name__ == '__main__':
Smoke('35.228.128.125:8080').run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment