This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| import time | |
| def test(): | |
| x = np.ones(100_000_000, dtype='int8') | |
| y = np.ones(100_000_000, dtype='int8') | |
| start = time.monotonic() | |
| for _ in range(100): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@retry(
    stop=stop_after_attempt(20),
    wait=wait_incrementing(start=1, increment=1, max=5),
    before=my_before_log(logger),
    retry=check_status,
)
async def get_status(url_template: str, session: ClientSession) -> int:
    """Fetch a URL built from a randomly picked status code and return the HTTP status.

    The ``codes`` placeholder of ``url_template`` is filled with one of
    200, 300, 400 or 500, chosen at random on every attempt.

    The call is wrapped by ``retry``: ``check_status`` decides whether an
    attempt has to be repeated, at most 20 attempts are made, and the wait
    between attempts starts at 1 and grows by 1 up to a cap of 5.
    ``my_before_log(logger)`` is passed as the ``before`` hook.

    Args:
        url_template: Format string containing a ``{codes}`` field.
        session: Client session used to issue the GET request.

    Returns:
        The status code of the response.
    """
    picked_code = random.choice([200, 300, 400, 500])
    url = url_template.format(codes=picked_code)
    print(f"Begin to get status from {url}")
    async with session.get(url) as resp:
        status = resp.status
    return status
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def check_status(retry_state: RetryCallState) -> bool:
    """Decide whether the attempt described by ``retry_state`` must be retried.

    Args:
        retry_state: Object whose ``outcome`` attribute holds the finished
            attempt as a future.

    Returns:
        ``True`` when the attempt raised an exception or produced a result
        greater than 300; ``False`` otherwise (a result of exactly 300 is
        not retried).
    """
    attempt: Future = retry_state.outcome
    failure = attempt.exception()
    # Only look at the result when the attempt did not raise; calling
    # result() on a failed future would re-raise the stored exception.
    return True if failure else attempt.result() > 300
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import logging | |
| import random | |
| import sys | |
| import aiohttp | |
| from aiohttp import ClientTimeout, ClientSession | |
| from tenacity import * | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| async def aiomultiprocess_main(): | |
| """ | |
| Integrating multiprocessing and asyncio with the help of aiomultiprocess, | |
| requires only a simple rewriting of the main function | |
| """ | |
| start = time.monotonic() | |
| all_books = [] | |
| async with Pool() as pool: | |
| detail_urls = [] | |
| async for urls in pool.map(fetch_list, |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import csv | |
| import time | |
| from string import Template | |
| from urllib.parse import urljoin | |
| from aiohttp import request | |
| from aiomultiprocess import Pool | |
| from bs4 import BeautifulSoup | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def main():
    """Read ``FILE_NAME``, load its records into a DataFrame and print counts per status code."""
    parsed: list[dict] = read_file(FILE_NAME)
    frame = build_dataframe(parsed)
    # count() reports, for each status_code group, the number of non-null
    # values in every remaining column.
    counts_by_status = frame.groupby('status_code').count()
    print(counts_by_status)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def build_dataframe(records: list[dict]) -> pd.DataFrame:
    """Turn a list of record dicts into a DataFrame indexed by ``timestamp``.

    Args:
        records: One dict per row; the ``timestamp`` field becomes the index
            and is removed from the columns.

    Returns:
        The DataFrame built by ``pd.DataFrame.from_records``.
    """
    return pd.DataFrame.from_records(records, index="timestamp")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def read_file(name: str) -> list[dict]:
    """Parse every line of the text file ``name`` with ``process_line``.

    Args:
        name: Path of the file to open for reading.

    Returns:
        A list holding the ``process_line`` result for each line, in file
        order.
    """
    with open(name, 'r') as handle:
        return [process_line(raw_line) for raw_line in handle]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def process_line(text: str) -> dict:
    """Parse one line with the module-level ``compiler`` and return its named fields.

    Args:
        text: The raw line to parse.

    Returns:
        The ``named`` attribute of the parse result, or an empty dict when
        the parse result is falsy.
    """
    # NOTE(review): ``compiler`` is defined outside this block; presumably a
    # pre-compiled pattern whose parse() returns None on mismatch — confirm.
    match = compiler.parse(text)
    if not match:
        return {}
    return match.named
Newer Older