Created
September 1, 2014 05:10
-
-
Save yudanta/47fb63dc3d590155c8e1 to your computer and use it in GitHub Desktop.
smsclient.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# How to use: create an instance of this class; the client id and client secret can be set through the constructor.
# Methods
# 1. get_token() // get the token key
# 2. check_token(token) // check whether the token is valid or not
# 3. sending_sms(number, message, token, *args, **kwargs) // send a message => returns the message id
# 4. check_status_message(message_id, token, *args, **kwargs) // check the status of a message
#
import requests | |
import json | |
import sys | |
import time | |
class smsCliApp():
    """Small HTTP client for an OAuth-protected SMS gateway.

    Typical flow: get_token() -> (optionally) check_token() ->
    sending_sms() -> check_status_message().

    Failures are reported through the return value of each method and a
    human-readable text is stored for get_errors() / get_messages().  Those
    texts are only ever overwritten, never cleared, so after a successful
    call get_errors() may still describe an earlier failure.
    """

    # Endpoint paths are appended to BASE_URL verbatim, so BASE_URL has to
    # end with a slash.
    BASE_URL = '<>'
    OAUTH_TOKEN_URL = 'oauth/access_token'
    OAUTH_TEST_TOKEN = 'oauth/test'
    SENDING_SMS_URL = 'api/v1/messages/new'
    CHECK_STATUS_SMS_URL = 'api/v1/messages/'

    CLIENT_ID = '<>'
    CLIENT_SECRET = '<>'
    SCOPE = 'admin'

    # Seconds to wait for the server before giving up on a request; without
    # a timeout a stalled connection would block the caller forever.
    REQUEST_TIMEOUT = 30

    messages = ''
    errors = ''
    default_sender = 'tselbn'

    def __init__(self, *args, **kwargs):
        """Create a client, optionally overriding the class-level defaults.

        Recognised keyword arguments: client_id, client_secret, scope and
        timeout (seconds).  Anything else is ignored.
        """
        if 'client_id' in kwargs:
            self.CLIENT_ID = kwargs['client_id']
        if 'client_secret' in kwargs:
            self.CLIENT_SECRET = kwargs['client_secret']
        if 'scope' in kwargs:
            self.SCOPE = kwargs['scope']
        if 'timeout' in kwargs:
            self.REQUEST_TIMEOUT = kwargs['timeout']

    def _send(self, http_call, path, **request_kwargs):
        """Run http_call (requests.get / requests.post) against BASE_URL + path.

        Returns the Response object, or None when the request could not be
        carried out at all (connection error, timeout, malformed URL, ...).
        """
        request_kwargs.setdefault('timeout', self.REQUEST_TIMEOUT)
        try:
            return http_call(''.join([self.BASE_URL, path]), **request_kwargs)
        except requests.RequestException:
            return None

    @staticmethod
    def _decode(response):
        """Return the JSON object carried by response, or None.

        None is returned when the body is not valid JSON or when the JSON
        value is not an object (dict).
        """
        try:
            decoded = json.loads(response.text)
        except ValueError:
            return None
        return decoded if isinstance(decoded, dict) else None

    def get_token(self):
        """Request an access token using the client-credentials grant.

        Returns the value of 'access_token' from the server reply, or False
        (with the error text set) when the request fails or the reply does
        not contain a token.
        """
        credentials = {
            'grant_type': 'client_credentials',
            'client_id': self.CLIENT_ID,
            'client_secret': self.CLIENT_SECRET,
            'scope': self.SCOPE,
        }
        response = self._send(requests.post, self.OAUTH_TOKEN_URL, data=credentials)
        # A Response is falsy for 4xx/5xx status codes, None means no reply.
        body = self._decode(response) if response else None
        if body is None or 'access_token' not in body:
            self.set_errors('Authentication error')
            return False
        self.set_messages('success get token for authentication')
        return body['access_token']

    def check_token(self, token):
        """Ask the server whether token is valid.

        Returns True only when the reply carries 'code' == 200; every other
        outcome returns False.  The raw reply body is echoed to stdout.
        """
        response = self._send(
            requests.get, self.OAUTH_TEST_TOKEN, params={'access_token': token})
        if not response:
            self.set_messages('cannot create request for checking token')
            self.set_errors('cannot create request for checking token')
            return False

        print(response.text)
        body = self._decode(response)
        if body is not None and body.get('code') == 200:
            self.set_messages('token is valid')
            return True

        # Covers a non-200 code as well as a reply without any 'code' key.
        self.set_messages('token invalid')
        self.set_errors('token invalid')
        return False

    def sending_sms(self, number, message, token, *args, **kwargs):
        """Send message to number, using default_sender as the sender.

        Returns the 'message_id' reported by the server on success,
        otherwise False with the error text set.  Extra positional and
        keyword arguments are accepted but not used.
        """
        sms_payload = {
            'phone_number': number,
            'message': message,
            'access_token': token,
            'sender': self.default_sender,
        }
        response = self._send(requests.post, self.SENDING_SMS_URL, data=sms_payload)
        if not response:
            self.set_errors('cannot create requests to server app')
            return False

        body = self._decode(response)
        if body is None:
            self.set_errors('invalid response from server')
            return False

        server_text = body.get('message', 'invalid response from server')
        self.set_messages(server_text)
        if body.get('status') == 200 and 'message_id' in body:
            return body['message_id']
        self.set_errors(server_text)
        return False

    def check_status_message(self, message_id, token, *args, **kwargs):
        """Look up the delivery status of a previously sent message.

        Returns the value found under ['data']['status'] when the server
        replies with 'status' == 200.  When the server reports a failure the
        error text is stored AND returned, so callers cannot distinguish the
        two cases by the return value alone.  Returns False when the request
        itself fails.  The raw reply body is echoed to stdout.
        """
        response = self._send(
            requests.get,
            ''.join([self.CHECK_STATUS_SMS_URL, str(message_id)]),
            params={'access_token': token})
        if response is not None:
            print(response.text)
        if not response:
            self.set_errors('cannot create request to server')
            return False

        body = self._decode(response)
        if body is None:
            self.set_errors('invalid response from server')
            return False

        details = body.get('data')
        if body.get('status') == 200 and isinstance(details, dict) and 'status' in details:
            self.set_messages(details['status'])
            return details['status']

        # NOTE(review): this endpoint is read with the key 'messages' while
        # sending_sms reads 'message'; accept either until the API contract
        # is confirmed.
        failure = body.get(
            'messages', body.get('message', 'invalid response from server'))
        self.set_errors(failure)
        return failure

    def set_messages(self, msg):
        """Store msg as the latest informational text."""
        self.messages = msg

    def set_errors(self, errors):
        """Store errors as the latest error text."""
        self.errors = errors

    def get_messages(self):
        """Return the latest informational text ('' if none was set)."""
        return self.messages

    def get_errors(self):
        """Return the latest error text ('' if none was set)."""
        return self.errors
welcomeMsg = "to use: python smsCliApp.py {phone number} {message} {nTimes}"


def main():
    """Command-line entry point: send one message nTimes to one number.

    Expects three positional arguments (phone number, message, repeat
    count).  Prints the usage line and exits with status 1 when an argument
    is missing or the repeat count is not an integer.
    """
    # All three arguments are mandatory; checking only for "at least one"
    # would let the lookups below fail with IndexError.
    if len(sys.argv) < 4:
        print(welcomeMsg)
        sys.exit(1)

    number = sys.argv[1]
    message = sys.argv[2]
    try:
        times = int(sys.argv[3])
    except ValueError:
        print(welcomeMsg)
        sys.exit(1)

    app = smsCliApp()
    token = app.get_token()
    if not token:
        print('cannot authenticate to server')
        return

    # Count from 1 to times inclusive so exactly `times` messages go out.
    for attempt in range(1, times + 1):
        print('sending message at n: %s' % attempt)
        if app.sending_sms(number, message, token):
            print(app.get_messages())
        else:
            print(app.get_errors())
        # Pause between consecutive sends; no need to wait after the last.
        if attempt < times:
            time.sleep(5)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment