Skip to content

Instantly share code, notes, and snippets.

@mcchae
Created May 2, 2018 06:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mcchae/0f532ee2d300436c405334d0c4507043 to your computer and use it in GitHub Desktop.
Save mcchae/0f532ee2d300436c405334d0c4507043 to your computer and use it in GitHub Desktop.
python client <=> php service sample code
<?php
// get the HTTP http_method, path and body of the request
$http_method = $_SERVER['REQUEST_METHOD'];
$request = explode('/', trim($_SERVER['PATH_INFO'],'/'));
$input = json_decode(file_get_contents('php://input'),true);
// retrieve the method and key from the path
$method = preg_replace('/[^a-z0-9_]+/i','',array_shift($request));
$key = array_shift($request);
switch ($method) {
// ===========================================================================
case 'ping':
$data = array(
'method'=>$method,
'http_method'=>$http_method,
'success'=>True,
'timestamp'=>time()
);
break;
// ===========================================================================
case 'login':
if ($http_method != 'POST') {
http_response_code(405);
die('login must use POST');
}
$content = trim(file_get_contents("php://input"));
$decoded = json_decode($content, true);
// login check
$data = array(
'input'=>$decoded,
'success'=>True
);
break;
// ===========================================================================
case 'event_cid':
if ($http_method != 'POST') {
http_response_code(405);
die('login must use POST');
}
$content = trim(file_get_contents("php://input"));
// echo $content;
$decoded = json_decode($content, true);
$data = array(
'input'=>$decoded,
'success'=>True
);
break;
// ===========================================================================
case 'upload_done':
if ($http_method != 'POST') {
http_response_code(405);
die('login must use POST');
}
$content = trim(file_get_contents("php://input"));
$decoded = json_decode($content, true);
$data = array(
'input'=>$decoded,
'success'=>True
);
break;
// ===========================================================================
case 'absence_call_list':
if ($http_method != 'POST') {
http_response_code(405);
die('login must use POST');
}
$content = trim(file_get_contents("php://input"));
$decoded = json_decode($content, true);
$data = array(
'input'=>$decoded,
'success'=>True
);
break;
// ===========================================================================
default:
$data = array(
'method'=>$method,
'key'=>$key,
'http_method'=>$http_method,
'success'=>False,
'message'=>"unknown method"
);
break;
}
// create SQL based on HTTP http_method
switch ($http_method) {
case 'GET':
break;
case 'PUT':
break;
case 'POST':
break;
case 'DELETE':
break;
}
echo json_encode($data);
echo "\n";
#!/usr/bin/env python
# coding=utf8
"""
====================================
:mod:`api_test` rest client using requests
====================================
.. moduleauthor:: 채문창 <mcchae@gmail.com>
.. note:: MIT License
"""
# 설명
# =====
#
# rest service using Flask-RESTful
#
# RESTAPI (http://bcho.tistory.com/914)
# ==============================================================================
# | 리소스 | POST | GET | PUT | DELETE |
# +------------+--------------|--------------+-----------------+----------------
# | | create | read | update | delete |
# +------------+--------------|-------------+---------------+------------------+
# | /{rces} | 새로운rce 등록 | rces목록리턴 | 여러rces bulk갱신|여러/모든 rces삭제 |
# +------------+--------------|--------------+-----------------+---------------+
# |/{rces}/{id}| 에러 |{id}라는rce리턴 |{id} 라는 rce 갱신| {id} rce 삭제 |
# ==============================================================================
#
# The success Method : URI : actions are
# ==============================================================================
# HTTP Method | URI | Action |
# +------------+---------------------------------+-----------------------------|
# GET | http://{host}/api/v1.0/rces | retrieve list of rces(필터등)|
# POST | http://{host}/api/v1.0/rces | create a new rce |
# DELETE | http://{host}/api/v1.0/rces | delete all rces (필터) |
# +------------+---------------------------------+-----------------------------|
# GET | http://{host}/api/v1.0/rces/{id}| retrieve a rce |
# PUT | http://{host}/api/v1.0/rces/{id}| update a rce |
# DELETE | http://{host}/api/v1.0/rces/{id}| delete a rce |
# ==============================================================================
#
# 관련 작업자
# ===========
#
# 본 모듈은 다음과 같은 사람들이 관여했습니다:
# * 채문창
#
# 작업일지
# --------
#
# 다음과 같은 작업 사항이 있었습니다:
# * [2018/05/01]
# - 본 모듈 작업 시작
################################################################################
import os
import sys
import unittest
import datetime
# noinspection PyUnresolvedReferences,PyPackageRequirements
from api_util import RestClient, get_passwd_hash
from pprint import pformat
################################################################################
# noinspection PyMethodMayBeStatic,PyUnresolvedReferences
class TU (unittest.TestCase):
# ==========================================================================
isFirst = True
# ==========================================================================
def myInit(self):
if TU.isFirst:
TU.isFirst = False
TU.API_HOST = os.environ.get('API_HOST', 'localhost')
if not TU.API_HOST:
raise RuntimeError(
'API_HOST environment variable is not set!')
TU.API_PORT = os.environ.get('API_PORT', 8888)
if not TU.API_PORT:
raise RuntimeError(
'API_PORT environment variable is not set!')
TU.rc_api = RestClient(TU.API_HOST, TU.API_PORT, '_',
api_name='api.php')
# ==========================================================================
def setUp(self):
self.myInit()
# ==========================================================================
def tearDown(self):
pass
# ==========================================================================
def test0000_init(self):
self.assertTrue(True)
# ==========================================================================
def test0005_ping_api_all(self):
TU.rc_api.set_api_url('ping')
rc, rj = TU.rc_api.get_http()
self.assertTrue(rc == 200 and rj['success'] and rj['timestamp'])
print('\nafter ping json=%s' % pformat(rj))
print('Server Time: %s' %
datetime.datetime.fromtimestamp(rj['timestamp']).
strftime('%Y-%m-%dT%H:%M:%S'))
# ==========================================================================
def test0050_login(self):
TU.rc_api.set_api_url('login')
p_json = {
'user_id': 'admin',
'password': get_passwd_hash('admin_000'),
}
rc, rj = TU.rc_api.post_http(p_json)
self.assertTrue(rc == 200 and rj['success'])
print('\nafter login json=%s' % pformat(rj))
# ==========================================================================
def test0100_event_cid(self):
TU.rc_api.set_api_url('event_cid')
p_json = {
'sCaller': '010-5552-2778',
'sCallee': '02-123-4567',
'sResult': 'result...',
'sClSeqno': '0012345',
'sMiSeqno': '0000123'
}
rc, rj = TU.rc_api.post_http(p_json)
self.assertTrue(rc == 200 and rj['success'])
print('\nafter event_cid json=%s' % pformat(rj))
# ==========================================================================
def test0110_upload_done(self):
TU.rc_api.set_api_url('upload_done')
p_json = {
'type': 1, # 1/유선전화, 2/VoIP전화
'm_code': 'C0001', # 지점코드
'phone': '010-3827-8172', # 컴연결번호
# [고객전화번호]_[날짜]_[녹음시작시간]_장비에서제공하는일련번호
'filename': '010xxxxxxxx_2018년04월19일_07시03분15초_00000001'
}
rc, rj = TU.rc_api.post_http(p_json)
self.assertTrue(rc == 200 and rj['success'])
print('\nafter upload_done json=%s' % pformat(rj))
# ==========================================================================
def test0120__absence_call_list(self):
TU.rc_api.set_api_url('absence_call_list')
call_list = []
for i in range(5):
call_list.append({
'sCaller': '010-5552-277%d' % i,
'sCallee': '02-123-4567',
'sDate': datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
'sClSeqno': '001234%d' % i,
'sResult': 'result...%d' % i,
'sKind': (i+2)*100
})
p_json = {
'call_list': call_list
}
rc, rj = TU.rc_api.post_http(p_json)
self.assertTrue(rc == 200 and rj['success'])
print('\nafter event_cid json=%s' % pformat(rj))
################################################################################
if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(TU)
result = unittest.TextTestRunner(verbosity=2).run(suite)
ret = not result.wasSuccessful()
sys.exit(ret)
#!/usr/bin/env python
# coding=utf8
################################################################################
import sys
import requests
import json
import collections
import urllib.parse
import hashlib
from requests.utils import requote_uri
################################################################################
def get_passwd_hash(password):
hash_object = hashlib.sha256(password.encode())
return hash_object.hexdigest()
################################################################################
def convert_str(data):
"""dict 혹은 list 멤버 등을 돌면서 다음과 같은 작업 수행
- unicode 이면 utf8로 변환하여 리턴
- 문자열이면 문자열 리턴
- dict 이면 이를 해체하여 개별 키:값 쌍을 Recursive 하게 호출하여 재조립
- list나 tuple 이면 이를 해체하여 개별 값 요소를 Recursive 하게 호출하여
재조립
:param data: 변환을 위한 객체
>>> d = { u'spam': u'eggs', u'foo': True, u'bar': { u'baz': 97 } }
>>> print d
{u'foo': True, u'bar': {u'baz': 97}, u'spam': u'eggs'}
>>> d2 = convert_str(d)
>>> print d2
{'foo': True, 'bar': {'baz': 97}, 'spam': 'eggs'}
"""
if isinstance(data, str):
return data
elif isinstance(data, collections.Mapping):
return dict(map(convert_str, data.items()))
elif isinstance(data, collections.Iterable):
return type(data)(map(convert_str, data))
return data
################################################################################
class RestClient(object):
""" Restful Client simulation class """
# ==========================================================================
def __init__(self, host, port, table, api_version=None,
api_name=None,
use_https=False, url_prefix=''):
self.use_https = use_https
proto = 'https' if use_https else 'http'
self.url_base = '%s://%s:%s' % (proto, host, port)
self.url_path = ''
if url_prefix and url_prefix[-1] == '/':
url_prefix = url_prefix[:-1]
self.url_prefix = url_prefix
self.api_name = api_name
self.api_version = api_version
self.session = requests.Session()
self.set_resource(table)
# ========================================================================
def set_resource(self, table_or_func,
_prefix=None, _api_name=None, _api_version=None):
url_prefix = self.url_prefix if not _prefix else _prefix
api_name = self.api_name if not _api_name else _api_name
api_version = self.api_version if not _api_version else _api_version
if url_prefix:
if url_prefix[-1] == '/':
url_prefix = url_prefix[:-1]
# self.url_path = '%s/%s/%s/%s' \
# % (url_prefix, api_name, api_version, table_or_func)
self.url_path = url_prefix
else:
# self.url_path = '%s/%s/%s' \
# % (api_name, api_version, table_or_func)
self.url_path = ''
if api_name:
self.url_path += '/%s' % api_name
if api_version:
self.url_path += '/%s' % api_version
if table_or_func:
self.url_path += '/%s' % table_or_func
if self.url_path.startswith('/'):
self.url_path.lstrip('/')
return True
# ==========================================================================
def set_api_url(self, table_or_func):
# REST client use requests session so we need to chagne like this
self.set_resource(table_or_func)
# ==========================================================================
def get_all(self, where_json=None):
"""
GET methos on .../Resouce on RESTful
:param where_json:
:return:
"""
urlpath = self.url_path
if where_json is None:
where_json = {}
return self.do_http("GET", urlpath, p_json=where_json,
use_param_url=True)
# ==========================================================================
def create(self, a_data):
"""
POST methos on .../Resouce on RESTful
:param a_data:
:return:
"""
urlpath = self.url_path
return self.do_http("POST", urlpath, p_json=a_data)
# ==========================================================================
def update_all(self, set_json, where_json=None):
"""
DELETE methos on .../Resouce on RESTful
:param set_json:
:param where_json:
:return:
"""
urlpath = self.url_path
if where_json is None:
where_json = {}
if where_json:
p_json = {'sets': set_json, 'filter': where_json}
else:
p_json = {'sets': set_json}
return self.do_http("PUT", urlpath, p_json=p_json)
# ==========================================================================
def delete_all(self, where_json=None):
"""
DELETE methos on .../Resouce on RESTful
:param where_json:
:return:
"""
urlpath = self.url_path
if where_json is None:
where_json = {}
return self.do_http("DELETE", urlpath, p_json=where_json)
# ==========================================================================
def get(self, resouce_id, p_json=None):
"""
GET methos on .../Resouce/rid on RESTful
:param resouce_id:
:param p_json:
:return:
"""
urlpath = '%s/%s' % (self.url_path, resouce_id)
if not p_json:
return self.do_http("GET", urlpath)
return self.do_http("GET", urlpath, p_json=p_json, use_param_url=True)
# ==========================================================================
def update(self, resouce_id, a_data):
"""
PUT methos on .../Resouce/rid on RESTful
:param resouce_id:
:param a_data:
:return:
"""
urlpath = '%s/%s' % (self.url_path, resouce_id)
return self.do_http("PUT", urlpath, a_data)
# ==========================================================================
def delete(self, resouce_id):
"""
DELETE methos on .../Resouce/rid on RESTful
:param resouce_id:
:return:
"""
urlpath = '%s/%s' % (self.url_path, resouce_id)
return self.do_http("DELETE", urlpath)
# ==========================================================================
def get_http(self, p_json=None):
if p_json is None:
p_json = {}
return self.do_http('GET', self.url_path,
p_json=p_json, use_param_url=True)
# ==========================================================================
def post_http(self, p_json=None):
if p_json is None:
p_json = {}
return self.do_http('POST', self.url_path, p_json=p_json)
# ==========================================================================
def put_http(self, p_json=None):
if p_json is None:
p_json = {}
return self.do_http('PUT', self.url_path, p_json=p_json)
# ==========================================================================
def delete_http(self, p_json=None):
if p_json is None:
p_json = {}
return self.do_http('DELETE', self.url_path, p_json=p_json)
# ==========================================================================
def do_http(self, method, urlpath, p_json=None, use_param_url=False):
"""
DO HTTP base call
:param method:
:param urlpath:
:param p_json:
:param use_param_url:
:return:
"""
try:
if p_json is None:
p_json = {}
_kwargs = {}
if urlpath[0] == '/':
urlpath = urlpath[1:]
url = '%s/%s' % (self.url_base, urlpath)
if use_param_url:
url += "?json=%s" % requote_uri(json.dumps(p_json))
else:
_kwargs['json'] = p_json
if self.use_https:
_kwargs['verify'] = False
if method.upper() == 'GET':
response = self.session.get(url, **_kwargs)
elif method.upper() == 'PUT':
response = self.session.put(url, **_kwargs)
elif method.upper() == 'POST':
response = self.session.post(url, **_kwargs)
elif method.upper() == 'DELETE':
response = self.session.delete(url, **_kwargs)
else:
raise RuntimeError("Invalid HTTP method <%s>" % method.upper())
try:
r_json = convert_str(json.loads(response.text)) \
if response.status_code == 200 else {}
return response.status_code, r_json
except Exception:
sys.stderr.write('cannot convert json: %s' % response.text)
raise
except Exception:
raise
################################################################################
class ApiClient(object):
""" Restful Client simulation class """
# ==========================================================================
def __init__(self, url):
self.url = url.lower()
self.use_https = self.url.startswith('https:')
self.session = requests.Session()
# ==========================================================================
def get(self, url_postfix, p_json=None):
if p_json is None:
p_json = {}
return self.do_http('GET', url_postfix,
p_json=p_json, use_param_url=True)
# ==========================================================================
def post(self, url_postfix, p_json=None):
if p_json is None:
p_json = {}
return self.do_http('POST', url_postfix, p_json=p_json)
# ==========================================================================
def put(self, url_postfix, p_json=None):
if p_json is None:
p_json = {}
return self.do_http('PUT', url_postfix, p_json=p_json)
# ==========================================================================
def delete(self, url_postfix, p_json=None):
if p_json is None:
p_json = {}
return self.do_http('DELETE', url_postfix, p_json=p_json)
# ==========================================================================
def do_http(self, method, url_postfix, p_json=None, use_param_url=False):
if p_json is None:
p_json = {}
_kwargs = {}
if url_postfix[0] == '/':
url_postfix = url_postfix[1:]
url = '%s/%s' % (self.url, url_postfix)
if use_param_url:
url += "?json=%s" % urllib.parse.quote(json.dumps(p_json))
else:
_kwargs['json'] = p_json
if self.use_https:
_kwargs['verify'] = False
if method.upper() == 'GET':
response = self.session.get(url, **_kwargs)
elif method.upper() == 'PUT':
response = self.session.put(url, **_kwargs)
elif method.upper() == 'POST':
response = self.session.post(url, **_kwargs)
elif method.upper() == 'DELETE':
response = self.session.delete(url, **_kwargs)
else:
raise RuntimeError("Invalid HTTP method <%s>" % method.upper())
r_json = convert_str(json.loads(response.text)) \
if response.status_code == 200 else {}
return response.status_code, r_json
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment