Skip to content

Instantly share code, notes, and snippets.

@amotl
Last active March 23, 2020 15:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amotl/015ef6b336db55128798d7f1a9a67dea to your computer and use it in GitHub Desktop.
Save amotl/015ef6b336db55128798d7f1a9a67dea to your computer and use it in GitHub Desktop.
Multiple HTTP writes / HTTP streaming example for improving Mocket socket mock framework
"""
Demonstrate HTTP streaming to Mocket's "mockhttp".
https://github.com/mindflayer/python-mocket
The error is::
self = <mocket.mockhttp.Request object at 0x108cf74c0>, data = b'POST /api/data HTTP/1.0\r\n'
def __init__(self, data):
> _, self.body = decode_from_bytes(data).split('\r\n\r\n', 1)
E ValueError: not enough values to unpack (expected 2, got 1)
.venv3/lib/python3.8/site-packages/mocket/mockhttp.py:23: ValueError
The reason is that ``data`` holds only the first chunk written to the socket::
    b'POST /api/data HTTP/1.0\r\n'
This chunk contains no ``\r\n\r\n`` separator, so the split yields a single
element and the unpacking into two values fails.
So, when receiving a streamed request, Mocket's "mockhttp"
should not expect the data to be sent en bloc.
"""
import json
import socket
import pytest
import requests
from mocket import mocketize, Mocket
from mocket.mockhttp import Entry
@mocketize
@pytest.mark.mocket
def test_requests():
    """
    Using the ``requests`` module works perfectly.

    Registers a single mocked ``POST`` entry for the URL, submits a JSON
    payload through ``requests.post`` and checks that the body of the last
    request recorded by Mocket equals the serialized payload.
    """
    # Define HTTP conversation.
    url = 'http://127.0.0.1/api/data'
    Entry.single_register(Entry.POST, url)

    # Invoke HTTP request.
    requests.post(url, json={'hello': 'world'})

    # Proof that worked.
    assert Mocket.last_request().body == '{"hello": "world"}'
@mocketize
@pytest.mark.mocket
def test_sockets():
    """
    Using raw sockets fails.

    Sends the same ``POST`` request as ``test_requests``, but writes the
    request line, each header and the body to the socket in separate
    ``write`` calls. This reproduces the ``ValueError`` quoted in the module
    docstring, which is raised as soon as only the request line has been
    written. The test is therefore expected to fail until Mocket's
    "mockhttp" accepts requests arriving in several pieces.
    """
    # Define HTTP conversation.
    url = 'http://127.0.0.1/api/data'
    Entry.single_register(Entry.POST, url)

    # Define HTTP url segments and data.
    host = '127.0.0.1'
    port = 80
    method = 'POST'
    path = '/api/data'
    data = json.dumps({'hello': 'world'})

    # Invoke HTTP request.
    address = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0]
    sock = socket.socket(address[0], address[1], address[2])
    sock.connect(address[-1])

    # The request is intentionally sent piecewise: do not merge these writes,
    # the streaming behaviour is exactly what this test exercises.
    sock.write("%s %s HTTP/1.0\r\n" % (method, path))
    sock.write("Host: %s\r\n" % host)
    sock.write("Content-Type: application/json\r\n")
    sock.write("Content-Length: %d\r\n" % len(data))
    sock.write("Connection: close\r\n\r\n")
    sock.write(data)

    # TODO: read and verify the response line once the writes succeed.

    # Proof that worked.
    assert Mocket.last_request().body == '{"hello": "world"}'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment