Skip to content

Instantly share code, notes, and snippets.

@x1ddos
Created October 9, 2012 10:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save x1ddos/3857841 to your computer and use it in GitHub Desktop.
Save x1ddos/3857841 to your computer and use it in GitHub Desktop.
App Engine pytestbed for Go
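// Package testbed lets Go tests exercise App Engine handlers against the API
// stubs served by the testbed.py helper process.
//
// Usage:
//
//	func TestSomething(t *testing.T) {
//		testbed.SetUp(app.SetAppengineContext)
//		defer testbed.TearDown()
//		resp := testbed.DoGet("/handler/that/uses/appengine/apis")
//		// do something with resp
//	}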
package testbed
import (
"log"
"os/exec"
"io"
"bufio"
"errors"
"strconv"
"sync"
"net/http"
"net/http/httptest"
"appengine"
"appengine_internal"
"appengine_internal/remote_api"
"code.google.com/p/goprotobuf/proto"
)
// SetCtxFunc is the callback type accepted by SetUp. DoGet invokes the stored
// callback with the appengine.Context it builds for the request, right before
// the request is dispatched to http.DefaultServeMux.
type SetCtxFunc func(ctx appengine.Context)
// Package-level state shared by SetUp, TearDown, DoGet and call.
var (
	// setCtxFunc is the callback stored by SetUp and invoked by DoGet.
	setCtxFunc SetCtxFunc
	// pipe is the ./testbed.py child process started by SetUp.
	pipe *exec.Cmd
	// apiRead reads responses from the child's stdout.
	apiRead *bufio.Reader
	// apiWrite sends requests to the child's stdin.
	apiWrite *bufio.Writer
	// mu serializes call so one request/response exchange uses the pipes at a time.
	mu sync.Mutex
)
// SetUp remembers ctxFunc for later DoGet calls and launches the
// ./testbed.py helper, wiring its stdin and stdout to the package-level
// buffered writer and reader used by call.
//
// Any failure to create the pipes or start the process terminates the test
// binary through log.Fatal.
func SetUp(ctxFunc SetCtxFunc) {
	setCtxFunc = ctxFunc
	pipe = exec.Command("./testbed.py")

	// Both pipes have to be requested before the process is started.
	stdin, err := pipe.StdinPipe()
	if err != nil {
		log.Fatal(err)
	}
	stdout, err := pipe.StdoutPipe()
	if err != nil {
		log.Fatal(err)
	}
	apiWrite = bufio.NewWriter(stdin)
	apiRead = bufio.NewReader(stdout)

	err = pipe.Start()
	if err != nil {
		log.Fatal(err)
	}
}
// TearDown stops the testbed.py helper process started by SetUp.
//
// It is a no-op when SetUp has not been called or the process never started,
// so a deferred TearDown cannot panic on a nil process. After killing the
// child it waits for it, so the process is reaped rather than left behind as
// a zombie and the pipes created by SetUp are released.
func TearDown() {
	if pipe == nil || pipe.Process == nil {
		return
	}
	if err := pipe.Process.Kill(); err != nil {
		log.Printf("testbed: killing testbed.py: %v", err)
	}
	// Wait reports an error for a killed process; it is called only to reap
	// the child and close its pipes, so that error is intentionally dropped.
	_ = pipe.Wait()
	pipe = nil
}
// DoGet issues a GET request for path (resolved against http://localhost)
// straight to http.DefaultServeMux and returns the recorder that captured
// the handler's response.
//
// Before dispatching, the callback registered through SetUp receives a
// context wrapping the request. A malformed URL terminates the test binary
// through log.Fatal.
func DoGet(path string) *httptest.ResponseRecorder {
	req, err := http.NewRequest("GET", "http://localhost"+path, nil)
	if err != nil {
		log.Fatal(err)
	}
	setCtxFunc(&context{req: req})

	rec := httptest.NewRecorder()
	http.DefaultServeMux.ServeHTTP(rec, req)
	return rec
}
// context implements the appengine.Context interface.
// Really, this is a copy & paste from appengine_internal, no changes.
// I wanted it here just to play and see how it works.
type context struct {
	// req is the HTTP request this context was created for (see DoGet).
	req *http.Request
}
// Call performs one API call: it marshals in, passes the bytes to call
// together with the service and method names, and unmarshals the returned
// payload into out. The call options argument is ignored.
//
// The first error from marshalling, the exchange itself, or unmarshalling is
// returned unchanged.
func (c *context) Call(service, method string, in, out proto.Message,
	_ *appengine_internal.CallOptions) error {
	payload, err := proto.Marshal(in)
	if err != nil {
		return err
	}

	reply, err := call(service, method, payload)
	if err != nil {
		return err
	}

	return proto.Unmarshal(reply, out)
}
// Request returns the *http.Request this context wraps, typed as interface{}.
func (c *context) Request() interface{} {
	return c.req
}
// logf writes a formatted message through the standard log package, with
// level and ": " prepended to the format string.
func (c *context) logf(level, format string, args ...interface{}) {
	log.Printf(level+": "+format, args...)
}
// Debugf logs a formatted message prefixed with "DEBUG".
func (c *context) Debugf(format string, args ...interface{}) {
	c.logf("DEBUG", format, args...)
}
// Infof logs a formatted message prefixed with "INFO".
func (c *context) Infof(format string, args ...interface{}) {
	c.logf("INFO", format, args...)
}
// Warningf logs a formatted message prefixed with "WARNING".
func (c *context) Warningf(format string, args ...interface{}) {
	c.logf("WARNING", format, args...)
}
// Errorf logs a formatted message prefixed with "ERROR".
func (c *context) Errorf(format string, args ...interface{}) {
	c.logf("ERROR", format, args...)
}
// Criticalf logs a formatted message prefixed with "CRITICAL".
func (c *context) Criticalf(format string, args ...interface{}) {
	c.logf("CRITICAL", format, args...)
}
// FullyQualifiedAppID returns the fully-qualified application ID.
// This may contain a partition prefix (e.g. "s~" for High Replication apps),
// or a domain prefix (e.g. "example.com:").
//
// The value is read from the request's X-AppEngine-Inbound-AppId header, so
// it is the empty string when that header is not set on the request.
func (c *context) FullyQualifiedAppID() string {
	return c.req.Header.Get("X-AppEngine-Inbound-AppId")
}
// call sends one API request to the testbed.py helper process and returns the
// raw, still-encoded response payload.
//
// service and method name the API call; data is the already-marshalled
// request message. Exchanges are serialized with mu because every caller
// shares the same pair of pipes.
//
// Errors:
//   - a plain error when SetUp has not been called yet;
//   - any I/O or decoding error from write/read;
//   - *appengine_internal.APIError when the helper reports an application
//     error;
//   - a plain error when the helper reports any other exception, so that a
//     Python-side failure is not mistaken for an empty successful response.
func call(service, method string, data []byte) ([]byte, error) {
	mu.Lock()
	defer mu.Unlock()

	// Without SetUp there are no pipes to talk to; fail instead of panicking.
	if apiWrite == nil || apiRead == nil {
		return nil, errors.New("testbed: SetUp has not been called")
	}

	req := &remote_api.Request{
		ServiceName: &service,
		Method:      &method,
		Request:     data,
	}
	if err := write(apiWrite, req); err != nil {
		return nil, err
	}

	res := &remote_api.Response{}
	if err := read(apiRead, res); err != nil {
		return nil, err
	}

	if ae := res.ApplicationError; ae != nil {
		// All Remote API application errors are API-level failures.
		return nil, &appengine_internal.APIError{
			Service: service,
			Detail:  *ae.Detail,
			Code:    *ae.Code,
		}
	}
	// testbed.py stores a pickled exception in this field for every failure,
	// but fills in ApplicationError only for application-level errors. The
	// pickle cannot be decoded here, so report a generic failure.
	if len(res.Exception) > 0 {
		return nil, errors.New("testbed: " + service + "." + method +
			" raised an exception in testbed.py")
	}
	return res.Response, nil
}
// read decodes one length-prefixed protocol buffer from r into pb.
//
// The expected framing is a decimal byte count terminated by '\n', followed
// by exactly that many bytes of encoded message. Errors from reading, from
// parsing the count (including a negative count), and from unmarshalling are
// returned as-is.
func read(r *bufio.Reader, pb proto.Message) error {
	header, err := r.ReadSlice('\n')
	if err != nil {
		return err
	}

	// Drop the trailing '\n' before parsing the length.
	size, err := strconv.Atoi(string(header[:len(header)-1]))
	switch {
	case err != nil:
		return err
	case size < 0:
		return errors.New("appengine: negative message length")
	}

	body := make([]byte, size)
	if _, err := io.ReadFull(r, body); err != nil {
		return err
	}
	return proto.Unmarshal(body, pb)
}
// write encodes pb and sends it on w as one framed message:
//
//	#start\n<decimal length>\n<encoded bytes>\n#end\n
//
// The writer is flushed before returning so the peer sees the whole frame.
// A marshalling error, or the first write/flush error, is returned.
func write(w *bufio.Writer, pb proto.Message) error {
	payload, err := proto.Marshal(pb)
	if err != nil {
		return err
	}

	w.WriteString("#start\n")
	if _, err := w.WriteString(strconv.Itoa(len(payload))); err != nil {
		return err
	}
	if err := w.WriteByte('\n'); err != nil {
		return err
	}
	if _, err := w.Write(payload); err != nil {
		return err
	}
	w.WriteString("\n#end\n")

	return w.Flush()
}
#!/usr/bin/env python -u
# Helper process for the Go testbed package: it activates the App Engine
# testbed and answers API requests that arrive on stdin (see start()).
import sys
import logging
# Everything logged by this script goes to a file rather than a standard stream.
logging.basicConfig(filename='/tmp/testbed.log', level=logging.DEBUG)
# Location of the App Engine SDK; it is put first on sys.path so the SDK's
# api_server module can be imported below.
APPENGINE_SDK_PATH = '/usr/local/google_appengine'
sys.path.insert(0, APPENGINE_SDK_PATH)
from api_server import fix_sys_path, API_SERVER_EXTRA_PATHS
fix_sys_path(API_SERVER_EXTRA_PATHS)
# Drop any already-imported 'google' module before the google.appengine
# imports below.
# NOTE(review): presumably so the package is re-resolved against the paths
# added by fix_sys_path - confirm.
if 'google' in sys.modules:
    del sys.modules['google']
import pickle
import traceback
from google.appengine.tools import api_server
from google.appengine.ext.remote_api import remote_api_pb
from google.appengine.ext import testbed
from google.appengine.runtime import apiproxy_errors
# NOTE: this rebinds the name `testbed` from the imported module to a Testbed
# instance; the module is no longer reachable under that name afterwards.
testbed = testbed.Testbed()
testbed.activate()
def reset():
    """Restart the testbed and re-create the datastore, memcache and task queue stubs."""
    testbed.deactivate()
    testbed.activate()
    # Initialise the stubs in the same fixed order: datastore, memcache, taskqueue.
    stub_initialisers = (
        testbed.init_datastore_v3_stub,
        testbed.init_memcache_stub,
        testbed.init_taskqueue_stub,
    )
    for init_stub in stub_initialisers:
        init_stub()
def process(req):
    """Execute one API request and return the encoded response.

    Args:
        req: a protobuf-encoded remote_api_pb.Request, as a byte string.

    Returns:
        The encoded remote_api_pb.Response. On success it carries the API
        response; on failure it carries the pickled exception and, for
        apiproxy_errors.ApplicationError, the application error code and
        detail as well.
    """
    response = remote_api_pb.Response()
    # Created outside the try block so the except handler can always log it,
    # even when parsing or executing the request is what failed.
    request = remote_api_pb.Request()
    try:
        request.ParseFromString(req)
        api_response = api_server._ExecuteRequest(request).Encode()
        response.set_response(api_response)
    except Exception as e:
        logging.error('Exception while handling %s\n%s',
                      request,
                      traceback.format_exc())
        response.set_exception(pickle.dumps(e))
        if isinstance(e, apiproxy_errors.ApplicationError):
            application_error = response.mutable_application_error()
            application_error.set_code(e.application_error)
            application_error.set_detail(e.error_detail)
    return response.Encode()
# Control lines recognised by start(); each is compared against a stdin line
# with surrounding whitespace stripped.
MSG_START = '#start'  # begins a request frame
MSG_END = '#end'      # ends a request frame and triggers processing
MSG_RESET = '#reset'  # calls reset()
MSG_QUIT = '#quit'    # leaves the read loop
def start():
    """Serve framed API requests read from stdin until EOF or a quit message.

    A request frame is a MSG_START line, a length line, the encoded request
    and a MSG_END line. Each completed frame is passed to process() and the
    reply is written to stdout as "<length>\\n<encoded response>". A MSG_RESET
    line calls reset(); a MSG_QUIT line ends the loop.

    NOTE: framing is line based, so a payload that itself contains a line
    equal to one of the control messages would be misread.
    """
    buf = []
    while True:
        line = sys.stdin.readline()
        if not line:
            # readline() returns '' only at EOF: the parent closed the pipe.
            # Without this check the loop would spin forever appending ''.
            break
        stripped = line.strip()
        if stripped == MSG_END:
            req = ''.join(buf[1:])[:-1]  # skip first line (len) and final \n
            logging.debug(req)
            resp = process(req)
            resp = "%d\n" % len(resp) + resp
            logging.debug(resp)
            sys.stdout.write(resp)
            # Push the reply out now; the reader blocks until it arrives and
            # unbuffered mode (-u) may not be in effect.
            sys.stdout.flush()
            buf = []
        elif stripped == MSG_START:
            buf = []
        elif stripped == MSG_RESET:
            reset()
            buf = []
        elif stripped == MSG_QUIT:
            break
        else:
            buf.append(line)
# When run as a script: initialise the stubs, then serve requests from stdin.
if __name__ == '__main__':
    reset()
    start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment