Last active
March 23, 2020 15:56
-
-
Save amotl/015ef6b336db55128798d7f1a9a67dea to your computer and use it in GitHub Desktop.
Multiple HTTP writes / HTTP streaming example for improving Mocket socket mock framework
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Demonstrate HTTP streaming to Mocket's "mockhttp".

https://github.com/mindflayer/python-mocket

The error is::

    self = <mocket.mockhttp.Request object at 0x108cf74c0>, data = b'POST /api/data HTTP/1.0\r\n'

        def __init__(self, data):
    >       _, self.body = decode_from_bytes(data).split('\r\n\r\n', 1)
    E       ValueError: not enough values to unpack (expected 2, got 1)

    .venv3/lib/python3.8/site-packages/mocket/mockhttp.py:23: ValueError

The reason is that ``data`` is essentially::

    b'POST /api/data HTTP/1.0\r\n'

which cannot be split on ``\r\n\r\n`` into a header part and a body part,
because only the request line has arrived at that point.

So, when receiving a streamed request, Mocket's "mockhttp"
should not expect the data to be sent en bloc.
""" | |
import json | |
import socket | |
import pytest | |
import requests | |
from mocket import mocketize, Mocket | |
from mocket.mockhttp import Entry | |
@mocketize
@pytest.mark.mocket
def test_requests():
    """
    Post a JSON document with ``requests`` and check the recorded body.

    Using the ``requests`` module works perfectly.
    """
    endpoint = 'http://127.0.0.1/api/data'
    payload = {'hello': 'world'}

    # Register a single mocked POST entry for the endpoint.
    Entry.single_register(Entry.POST, endpoint)

    # Send the request through the mocked transport.
    requests.post(endpoint, json=payload)

    # The body Mocket recorded must be the JSON-serialized payload.
    recorded = Mocket.last_request()
    assert recorded.body == '{"hello": "world"}'
@mocketize
@pytest.mark.mocket
def test_sockets():
    """
    Send an HTTP POST over a raw socket, one piece per write call.

    Using raw sockets fails: the request line, each header and the body are
    written separately instead of as a single block.

    The socket is closed in a ``finally`` clause so that the connection is
    released even when one of the writes raises.
    """
    # Define HTTP conversation.
    url = 'http://127.0.0.1/api/data'
    Entry.single_register(Entry.POST, url)

    # Define HTTP url segments and data.
    host = '127.0.0.1'
    port = 80
    method = 'POST'
    path = '/api/data'
    data = json.dumps({'hello': 'world'})

    # Resolve the target; only the first result is used.
    family, socktype, proto, _canonname, sockaddr = socket.getaddrinfo(
        host, port, 0, socket.SOCK_STREAM
    )[0]

    # Invoke HTTP request.
    sock = socket.socket(family, socktype, proto)
    try:
        sock.connect(sockaddr)
        # NOTE(review): ``write`` is not part of the standard
        # ``socket.socket`` API; this presumably relies on the socket object
        # installed by ``@mocketize`` -- confirm.
        sock.write("%s %s HTTP/1.0\r\n" % (method, path))
        sock.write("Host: %s\r\n" % host)
        sock.write("Content-Type: application/json\r\n")
        sock.write("Content-Length: %d\r\n" % len(data))
        sock.write("Connection: close\r\n\r\n")
        sock.write(data)
        # Reading the response back is not exercised here.
    finally:
        sock.close()

    # Proof that worked.
    assert Mocket.last_request().body == '{"hello": "world"}'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment