Created
April 11, 2022 05:54
-
-
Save grubberr/640bf8d1d7d9b83418cc0b8c21b70cbb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/Users/ant/temp/.venv/bin/python3 | |
import csv
import io
import logging
import random
import string
import tracemalloc

import requests
import responses
def generate_csv_data(columns: int, rows: int) -> str:
    """Build an in-memory CSV document of random test data.

    The document has a header row of ``col_0 .. col_{columns-1}`` followed by
    *rows* data rows; every cell is a random 5-character lowercase string.
    Written with the "unix" dialect (all fields quoted, LF line endings).

    :param columns: number of columns in the generated CSV
    :param rows: number of data rows (excluding the header)
    :return: the whole CSV document as one string
    """
    fieldnames = [f"col_{n}" for n in range(columns)]
    # Plain StringIO, no `with`: the original closed the buffer via a context
    # manager and then called getvalue(), which raises ValueError once the
    # buffer is closed. Keeping it open until after getvalue() is the fix.
    buf = io.StringIO(newline="")
    writer = csv.DictWriter(buf, fieldnames=fieldnames, dialect="unix")
    writer.writeheader()
    # Stream each row straight into the writer instead of materializing an
    # intermediate list of `rows` dicts first — same output, lower peak memory.
    for _ in range(rows):
        writer.writerow(
            {name: "".join(random.choices(string.ascii_lowercase, k=5)) for name in fieldnames}
        )
    return buf.getvalue()
def filter_null_bytes(s: str) -> str:
    """Strip NUL ("\\x00") characters from *s*.

    Salesforce occasionally embeds null bytes in CSV payloads, which breaks
    the csv parser: https://github.com/airbytehq/airbyte/issues/8300

    :param s: input line/string, possibly containing "\\x00" characters
    :return: *s* with every "\\x00" removed
    """
    res = s.replace("\x00", "")
    if len(res) < len(s):
        # Original referenced `self.logger` from a module-level function,
        # which raised NameError the moment a NUL byte was actually filtered.
        # Use a module logger instead (lazy %-style args, no f-string).
        logging.getLogger(__name__).warning(
            "Filter 'null' bytes from string, size reduced %d -> %d chars", len(s), len(res)
        )
    return res
def download_data(url: str):
    """Stream a remote CSV and yield ``(row_number, row_dict)`` pairs.

    Lines are fetched lazily via the streaming response, passed through
    :func:`filter_null_bytes`, and parsed with the "unix" csv dialect.
    Row numbering starts at 1.
    """
    response = requests.get(url, stream=True)
    cleaned_lines = (filter_null_bytes(line) for line in response.iter_lines(decode_unicode=True))
    reader = csv.DictReader(cleaned_lines, dialect="unix")
    row_number = 0
    for row in reader:
        row_number += 1
        yield row_number, row
def download_data_old(url: str):
    """Download a remote CSV eagerly and yield ``(row_number, row_dict)`` pairs.

    Unlike :func:`download_data`, this buffers the entire response body in
    memory before parsing. Row numbering starts at 1.
    """
    response = requests.get(url, stream=False)
    cleaned = filter_null_bytes(response.content.decode("utf-8"))
    buf = io.StringIO(cleaned, newline="")
    reader = csv.DictReader(buf, dialect="unix")
    for row_number, row in enumerate(reader, start=1):
        yield row_number, row
@responses.activate
def test():
    """Mock a Salesforce CSV endpoint and report memory used while streaming it.

    Registers a 100-column x 10000-row CSV body behind the mocked URL,
    drains :func:`download_data` over it, and prints the tracemalloc
    current/peak figures in MiB.
    """
    url = "https://salesforce.com/services/data/jobs/query"
    responses.add("GET", url, body=generate_csv_data(100, 10000))

    tracemalloc.start()
    for _ in download_data(url):
        pass
    current_bytes, peak_bytes = tracemalloc.get_traced_memory()

    first_size_in_mb = current_bytes / 1024**2
    first_peak_in_mb = peak_bytes / 1024**2
    print(f"first_size_in_mb: {first_size_in_mb}")
    print(f"first_peak_in_mb: {first_peak_in_mb}")
# Guard script entry so importing this module doesn't fire the benchmark.
if __name__ == "__main__":
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment