@mehdinourollah
Forked from hootan09/Python-sample.md
Created January 27, 2022 12:56
Python Basic Command

Python Basic to Advanced Commands & Code

Sample Python commands and code for everyday use

[Virtual env built-in]

$ python -m venv env-name

#### "windows activation"
c:/> env-name\Scripts\activate

#### "linux activation"
$ source env-name/bin/activate

#### "POSIX-compatible system activation"
$ . env-name/bin/activate

#### "deactivate"
$ deactivate

[install dependency in python]

$ pip install -r requirements.txt

#### "write installed packages to requirements.txt"
$ pip freeze > requirements.txt

[check & install pip with module]

$ python3 -m ensurepip

[upgrade pip]

$ sudo apt install python3-pip python3-dev
$ sudo -H pip3 install --upgrade pip

[Virtual env package]

$ sudo -H pip3 install virtualenv

$ virtualenv testenv

#### "windows"
c:/> testenv\Scripts\activate

#### "linux"
$ source testenv/bin/activate

#### "deactivate"
$ deactivate

[decorator Sample (@)]

def uppercase(func):
    def wrapper():
        original_result = func()
        modified_result = original_result.upper()
        return modified_result
    return wrapper

@uppercase
def hello():
    """Return a string."""
    return 'hello world'


print(hello()) #HELLO WORLD
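One detail the sample above leaves out: the wrapper replaces the decorated function, so `hello.__name__` and its docstring would report those of `wrapper`. A minimal sketch of the usual remedy, the standard library's `functools.wraps`, applied to the same `uppercase` decorator:

```python
import functools


def uppercase(func):
    @functools.wraps(func)  # copy __name__, __doc__, etc. from func onto wrapper
    def wrapper():
        return func().upper()
    return wrapper


@uppercase
def hello():
    """Return a string."""
    return 'hello world'


print(hello())         # HELLO WORLD
print(hello.__name__)  # hello
print(hello.__doc__)   # Return a string.
```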

[decorator for functions with arguments]

def trace(func):
    def wrapper(*args, **kwargs):
        print(f'TRACE: calling {func.__name__}() '
        f'with {args}, {kwargs}')
        original_result = func(*args, **kwargs)
        print(f'TRACE: {func.__name__}() '
            f'returned {original_result!r}')
        return original_result
    return wrapper
    
@trace
def say(name, line):
    return f'{name}: {line}'
    
>>> say('Jane', 'Hello, World')
TRACE: calling say() with ('Jane', 'Hello, World'), {}
TRACE: say() returned 'Jane: Hello, World'
'Jane: Hello, World'

#### *args collects extra positional arguments into a tuple
#### **kwargs collects extra keyword arguments into a dictionary
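The same two operators also work at the call site, where they unpack instead of collect. A small sketch of both directions; `show`, `numbers` and `options` are hypothetical names used only for illustration:

```python
def show(*args, **kwargs):
    # inside the function, args is always a tuple and kwargs is always a dict
    return args, kwargs


numbers = [1, 2]
options = {"sep": "-"}

# at the call site, * unpacks an iterable into positional arguments
# and ** unpacks a mapping into keyword arguments
print(show(*numbers, **options))  # ((1, 2), {'sep': '-'})
print(show())                     # ((), {})
```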

[poetry python manager(boilerplate)]

$ pip install --user poetry

$ poetry new my-project

# my-project/
# ├── README.rst
# ├── my_project
# │   └── __init__.py
# ├── pyproject.toml
# └── tests
#     ├── __init__.py
#     └── test_my_project.py

#### activate poetry venv
$ poetry shell

#### add package
$ poetry add <package-name>

[simple python docker file]

#### Dockerfile
FROM python:3.9-slim
WORKDIR /app/
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY echo.py .
ENTRYPOINT ["python", "echo.py"]

#### alpine image (small & build from base image)
FROM alpine:3.13
WORKDIR /app/
RUN apk add --no-cache python3
COPY requirements.txt .
RUN apk add --no-cache py3-pip && \
 pip3 install -r requirements.txt && \
 apk del py3-pip
COPY echo.py .
CMD ["python3", "echo.py"]

#### build image
$ docker build -t <name> <path>

#### image size
$ docker images
# REPOSITORY    TAG     IMAGE ID        CREATED             SIZE
# echo-alpine   latest  e7e3a2bc7b71    About a minute ago  53.7MB
# echo          latest  6b036d212e8f    40 minutes ago      126MB

#### run image
$ docker run -it --rm --publish 5000:5000 <image-name>

[python docker-compose]

#### docker-compose.yml
version: '3.8'
services:
    echo-server:
         # this tells Docker Compose to build the image from
         # the local (.) directory
         build: .
         # this is equivalent to "-p" option of
         # the "docker run" command
         ports:
         - "5000:5000"
         # this is equivalent to "-t" option of
         # the "docker run" command
         tty: true
         environment:
            - DATABASE_HOSTNAME=database
            - DATABASE_PORT=5432
            - DATABASE_PASSWORD=password
         depends_on:
            - database
         # alternative command that auto-restarts on code changes:
         # watchmedo auto-restart --patterns "*.py" --recursive -- python echo.py
         command:
            wait-for-it --service database:5432 --
            python echo.py
         volumes:
            - .:/app/
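    database:
        image: postgres
        restart: always
        # the official postgres image refuses to start without a password
        environment:
            - POSTGRES_PASSWORD=password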
    cache:
        image: redis


####    wait-for-it is written in Python, so you can easily install it with pip.
####    By default it times out after 15 seconds. After that timeout, it starts
####    the process after the -- mark regardless of whether it succeeded in connecting.
####    Pass --timeout 0 to disable the timeout; wait-for-it then waits indefinitely.

####    watchmedo restarts the specified process whenever any Python file changes in the
####    current working directory. Install it with "$ pip install watchdog[watchmedo]"

#### run/stop
$ docker-compose up
$ docker-compose down

[bitwise operator (set and dictionary) python3.9]

>>> {1, 2, 3} & {1, 4}
{1}
>>> {1, 2, 3} | {1, 4}
{1, 2, 3, 4}
>>> {1, 2, 3} - {1, 4}
{2, 3}
>>> {1, 2, 3} ^ {1, 4}
{2, 3, 4}

dictionary_1 | dictionary_2 #### merge two dictionaries (Python 3.9+)
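A short sketch of how the dictionary merge behaves on duplicate keys, assuming Python 3.9 or newer; `defaults`, `overrides` and `settings` are made-up names for illustration:

```python
defaults = {"host": "localhost", "port": 5432}
overrides = {"port": 6543, "user": "admin"}

# | builds a new dict; on duplicate keys the right-hand operand wins
merged = defaults | overrides
print(merged)  # {'host': 'localhost', 'port': 6543, 'user': 'admin'}

# |= updates the left-hand dict in place
settings = dict(defaults)
settings |= overrides
print(settings == merged)  # True

# the operands of | are left untouched
print(defaults)  # {'host': 'localhost', 'port': 5432}
```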

[type hinting]

from typing import Any

def get_ci(d: dict, key: str) -> Any:
    for k, v in d.items():
        if key.lower() == k.lower():
            return v

[unlimited number of positional arguments]

def concatenate(*items, delim: str):
    return delim.join(items)
 
#### example
>>> concatenate("John", "Doe", delim=" ")
'John Doe'
>>> concatenate("Ronald", "Reuel", "Tolkien", delim=" ")
'Ronald Reuel Tolkien'
>>> concatenate("Jay", delim=" ")
'Jay'
>>> concatenate(delim=" ")
''
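Because `delim` comes after `*items`, it is keyword-only: any extra positional value is absorbed by `*items`. A minimal sketch of what that means in practice; `concatenate_default` is a hypothetical variant added here only to show a default value:

```python
def concatenate(*items, delim: str):
    return delim.join(items)


# passing the delimiter positionally does not work: " " lands in items
# and the required keyword-only argument is still missing
try:
    concatenate("John", "Doe", " ")
except TypeError as err:
    print(f"TypeError: {err}")


# giving the keyword-only parameter a default makes it optional
def concatenate_default(*items, delim: str = " "):
    return delim.join(items)


print(concatenate_default("John", "Doe"))             # John Doe
print(concatenate_default("John", "Doe", delim="-"))  # John-Doe
```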

[python OOP]

  • Python, unlike Kotlin and many other languages, freely permits multiple inheritance (although it often isn't a good idea)
  • Another important Python differentiator is the lack of private/public keywords that would control access to internal object attributes outside of the class definition.
#### simple class
class Point:
 def __init__(self, x, y):
     self.x = x
     self.y = y
 
 
#### Every attribute prefixed with __ (two underscores) inside a class body
#### is renamed by the interpreter on the fly (name mangling), which emulates a private variable
class MyClass:
 def __init__(self):
     self.__secret_value = 1
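A quick sketch of what the renaming looks like from outside the class. The `reveal` method is a hypothetical addition for illustration; the mangled name follows the `_<ClassName>__<attribute>` pattern:

```python
class MyClass:
    def __init__(self):
        self.__secret_value = 1

    def reveal(self):
        # inside the class body the original name still works
        return self.__secret_value


obj = MyClass()
print(obj.reveal())                     # 1
print(hasattr(obj, "__secret_value"))   # False
# the value is stored under the mangled name, so it is hidden, not protected
print(obj._MyClass__secret_value)       # 1
```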

Class Descriptors

  • __set__(self, obj, value): This is called whenever the attribute is set.
  • __get__(self, obj, owner=None): This is called whenever the attribute is read (referred to as a getter).
  • __delete__(self, obj): This is called when del is invoked on the attribute.
#### Example
import random

class InitOnAccess:
 def __init__(self, init_func, *args, **kwargs):
     self.klass = init_func
     self.args = args
     self.kwargs = kwargs
     self._initialized = None
     
 def __get__(self, instance, owner):
     if self._initialized is None:
         print('initialized!')
         self._initialized = self.klass(*self.args, **self.kwargs)
     else:
         print('cached!')
     # return the value on every access, not only on cache hits
     return self._initialized


class WithSortedRandoms:
 lazily_initialized = InitOnAccess(sorted,[random.random() for _ in range(10)])

#### OutPut
#### >>> m = WithSortedRandoms()
#### >>> m.lazily_initialized
#### initialized!
#### [0.2592159616928279, 0.32590583255950756, 0.4015520901807743,
#### 0.4148447834912816, 0.4187058605495758, 0.4534290894962043,
#### 0.4796775578337028, 0.6963642650184283, 0.8449725511007807,
#### 0.8808174325885045]
#### >>> m.lazily_initialized
#### cached!
#### [0.2592159616928279, 0.32590583255950756, 0.4015520901807743,
#### 0.4148447834912816, 0.4187058605495758, 0.4534290894962043,
#### 0.4796775578337028, 0.6963642650184283, 0.8449725511007807,
#### 0.8808174325885045] 
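The example above only implements `__get__`. As a complement, here is a minimal sketch of a data descriptor that uses all three methods to validate a value. `Positive` and `Order` are hypothetical names, and `__set_name__` (available since Python 3.6) is used so the descriptor knows where to store the value on each instance:

```python
class Positive:
    """Data descriptor that only accepts values greater than zero."""

    def __set_name__(self, owner, name):
        # called once when the owning class is created
        self.storage_name = "_" + name

    def __get__(self, obj, owner=None):
        if obj is None:
            return self  # accessed on the class, not on an instance
        return getattr(obj, self.storage_name)

    def __set__(self, obj, value):
        if value <= 0:
            raise ValueError("value must be positive")
        setattr(obj, self.storage_name, value)

    def __delete__(self, obj):
        delattr(obj, self.storage_name)


class Order:
    quantity = Positive()

    def __init__(self, quantity):
        self.quantity = quantity  # goes through Positive.__set__


order = Order(3)
print(order.quantity)  # 3
order.quantity = 5
print(order.quantity)  # 5
try:
    order.quantity = -1
except ValueError as err:
    print(err)  # value must be positive
```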

[Thread pool using two-way queues]

import time
from queue import Queue, Empty
from threading import Thread

import requests

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

THREAD_POOL_SIZE = 4

def fetch_rates(base):
     response = requests.get(f"https://api.vatcomply.com/rates?base={base}")
     response.raise_for_status()
     rates = response.json()["rates"]
     
     # note: same currency exchanges to itself 1:1
     rates[base] = 1.
     return base, rates
     
def present_result(base, rates):
     rates_line = ", ".join([f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS ])
     print(f"1 {base} = {rates_line}")
     
def worker(work_queue, results_queue):
     while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
            
        try:
            result = fetch_rates(item)
        except Exception as err:
            results_queue.put(err)
        else:
            results_queue.put(result)
        finally:
            work_queue.task_done()
       
def main():
     work_queue = Queue()
     results_queue = Queue()
     
     for base in BASES:
        work_queue.put(base)
        
     threads = [
        Thread(target=worker,args=(work_queue, results_queue) ) for _ in range(THREAD_POOL_SIZE)]
     
     for thread in threads:
        thread.start()
        
     work_queue.join()
     
     while threads:
        threads.pop().join()
        
     while not results_queue.empty():
        result = results_queue.get()
        if isinstance(result, Exception):
            raise result

        present_result(*result)
        
if __name__ == "__main__":
     started = time.time()
     main()
     elapsed = time.time() - started
     
     print()
     print("time elapsed: {:.2f}s".format(elapsed)) 

[Multiprocessing (built-in multiprocessing module)]

Multithreading is challenging. Dealing with threads in a sane and safe manner requires a tremendous amount of code compared to the synchronous approach. We had to set up a thread pool and communication queues, gracefully handle exceptions from threads, and also worry about thread safety when trying to provide a rate limiting capability. Dozens of lines of code are needed just to execute one function from some external library in parallel! And we rely on the promise from the external package creator that their library is thread-safe. The built-in multiprocessing module is an alternative that runs the work in separate processes instead of threads.

from multiprocessing import Process
import os

def work(identifier):
     print(
     f'Hey, I am the process '
     f'{identifier}, pid: {os.getpid()}'
     )
     
def main():
     processes = [
     Process(target=work, args=(number,))
     for number in range(5)
     ]
     for process in processes:
        process.start()
        
     while processes:
        processes.pop().join()
 
if __name__ == "__main__":
    main()

******

[Using process pools]

import time
from multiprocessing import Pool

import requests

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

POOL_SIZE = 4

def fetch_rates(base):
     response = requests.get(
     f"https://api.vatcomply.com/rates?base={base}"
     )
     response.raise_for_status()
     rates = response.json()["rates"]
     # note: same currency exchanges to itself 1:1
     rates[base] = 1.
     return base, rates
     
def present_result(base, rates):
     rates_line = ", ".join(
     [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
     )
     print(f"1 {base} = {rates_line}")
     
def main():
     with Pool(POOL_SIZE) as pool:
        results = pool.map(fetch_rates, BASES)
        
     for result in results:
        present_result(*result)
 
if __name__ == "__main__":
 started = time.time()
 main()
 elapsed = time.time() - started
 
 print()
 print("time elapsed: {:.2f}s".format(elapsed))

******

[Using multiprocessing.dummy as the multithreading interface]

from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool

def main(use_threads=False):
     if use_threads:
        pool_cls = ThreadPool
     else:
        pool_cls = ProcessPool
        
     with pool_cls(POOL_SIZE) as pool:
        results = pool.map(fetch_rates, BASES)
        
     for result in results:
        present_result(*result)
        
#### The dummy threading pool can also be imported from the multiprocessing.pool module as the ThreadPool class.
#### It will have the same implementation; 
#### the actual import path is just a matter of personal preference.
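A small sketch of the two import paths side by side. `square` is a hypothetical stand-in for `fetch_rates()` so the example runs without network access:

```python
from multiprocessing.dummy import Pool as DummyPool
from multiprocessing.pool import ThreadPool


def square(n):
    # stand-in for fetch_rates() so the example runs offline
    return n * n


with ThreadPool(4) as pool:
    results = pool.map(square, range(5))

print(results)  # [0, 1, 4, 9, 16]

# multiprocessing.dummy.Pool() simply builds a ThreadPool
with DummyPool(2) as dummy_pool:
    same_class = isinstance(dummy_pool, ThreadPool)

print(same_class)  # True
```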

[Asynchronous programming]

The easiest way to think about asynchronous programming in Python is to imagine something similar to threads, but without system scheduling involved.

import asyncio
import random

async def print_number(number):
     await asyncio.sleep(random.random())
     print(number)
     
async def main():
     await asyncio.gather(*[
        print_number(number)
        for number in range(10)
     ])

if __name__ == "__main__":
     # asyncio.run() creates the event loop, runs main() and closes the loop
     asyncio.run(main())

#### asyncio.gather() takes awaitables as separate arguments, so the * operator unpacks the list of coroutines
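The numbers above print in random order because each coroutine sleeps for a different time. The values returned by `asyncio.gather()`, however, keep the order in which the awaitables were passed. A minimal sketch, with `delayed_square` as a hypothetical coroutine:

```python
import asyncio
import random


async def delayed_square(number):
    # sleep for a random fraction of 50 ms so the completion order varies
    await asyncio.sleep(random.random() * 0.05)
    return number * number


async def main():
    # gather() returns results in argument order, not completion order
    return await asyncio.gather(*[delayed_square(n) for n in range(5)])


results = asyncio.run(main())
print(results)  # [0, 1, 4, 9, 16]
```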

Sample of async programming with aiohttp

import asyncio
import time

import aiohttp

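# asyncrates is not defined in this gist; it is assumed to be a local module
# that provides the get_rates(session, base) coroutine
from asyncrates import get_rates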

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

def present_result(base, rates):
     rates_line = ", ".join(
     [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
     )
     print(f"1 {base} = {rates_line}")
 
async def main():
     async with aiohttp.ClientSession() as session:
         for result in await asyncio.gather(*[
             get_rates(session, base)
             for base in BASES
         ]):
             present_result(*result)
         
if __name__ == "__main__":
     started = time.time()
     
     asyncio.run(main())
     
     elapsed = time.time() - started
     
     print()
     print("time elapsed: {:.2f}s".format(elapsed))

[Integrating non-asynchronous code with async using futures]

Fortunately for us, the Python standard library provides the concurrent.futures module, which is also integrated with the asyncio module. These two modules together allow you to schedule blocking functions to execute in threads or additional processes as if they were asynchronous non-blocking coroutines.

The most important classes in the concurrent.futures module are Executor and Future.

The Executor class is a base class not intended for instantiation and has the following two concrete implementations:

  • ThreadPoolExecutor: This is the one that represents a pool of threads

  • ProcessPoolExecutor: This is the one that represents a pool of processes

Every executor provides the following three methods:

  • submit(func, *args, **kwargs): This schedules the func function for execution in a pool of resources and returns the Future object representing the execution of a callable

  • map(func, *iterables, timeout=None, chunksize=1): This executes the func function over an iterable in a similar way to the multiprocessing.Pool.map() method

  • shutdown(wait=True): This shuts down the executor and frees all of its resources

>>> def loudly_return():
...     print("processing")
...     return 42
... 
>>> from concurrent.futures import ThreadPoolExecutor
>>> with ThreadPoolExecutor(1) as executor:
...     future = executor.submit(loudly_return)
...
processing
>>> future
<Future at 0x33cbf98 state=finished returned int>
>>> future.result()
42 
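The session above only exercises submit(). A short sketch of map() and of the implicit shutdown(), assuming a trivial `square` function as the workload:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(n):
    return n * n


with ThreadPoolExecutor(max_workers=2) as executor:
    # map() yields results in input order
    mapped = list(executor.map(square, [1, 2, 3]))

    # submit() returns one Future per call; as_completed() yields them
    # as they finish, so sort when the order matters
    futures = [executor.submit(square, n) for n in (4, 5)]
    completed = sorted(f.result() for f in as_completed(futures))
# leaving the with block calls executor.shutdown(wait=True)

print(mapped)     # [1, 4, 9]
print(completed)  # [16, 25]
```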

We can easily defer the blocking call to a separate thread with the loop.run_in_executor() call, while still leaving the fetch_rates() function as an awaitable coroutine, as follows:

async def fetch_rates(base):
     loop = asyncio.get_running_loop()
     response = await loop.run_in_executor(
        None, requests.get,
        f"https://api.vatcomply.com/rates?base={base}"
     )
     response.raise_for_status()
     rates = response.json()["rates"]
     # note: same currency exchanges to itself 1:1
     rates[base] = 1.
     return base, rates
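To try the pattern without network access, here is a self-contained sketch in which `blocking_fetch` is a hypothetical stand-in for `requests.get()` that just sleeps. Passing `None` as the executor selects the event loop's default thread pool:

```python
import asyncio
import time


def blocking_fetch(base):
    # hypothetical stand-in for requests.get(): blocks the calling thread
    time.sleep(0.2)
    return base, {base: 1.0}


async def fetch(base):
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, blocking_fetch, base)


async def main():
    started = time.perf_counter()
    results = await asyncio.gather(*[fetch(b) for b in ("USD", "EUR", "PLN")])
    return results, time.perf_counter() - started


results, elapsed = asyncio.run(main())
print(results)
# the three 0.2 s calls overlap in worker threads, so the total
# stays well below the 0.6 s a sequential run would take
print(f"time elapsed: {elapsed:.2f}s")
```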

[Class Docstrings sample]

class Animal:
    """
    A class used to represent an Animal

    ...

    Attributes
    ----------
    says_str : str
        a formatted string to print out what the animal says
    name : str
        the name of the animal
    sound : str
        the sound that the animal makes
    num_legs : int
        the number of legs the animal has (default 4)

    Methods
    -------
    says(sound=None)
        Prints the animal's name and what sound it makes
    """

    says_str = "A {name} says {sound}"

    def __init__(self, name, sound, num_legs=4):
        """
        Parameters
        ----------
        name : str
            The name of the animal
        sound : str
            The sound the animal makes
        num_legs : int, optional
            The number of legs the animal has (default is 4)
        """

        self.name = name
        self.sound = sound
        self.num_legs = num_legs

    def says(self, sound=None):
        """Prints what the animal's name is and what sound it makes.

        If the argument `sound` isn't passed in, the default Animal
        sound is used.

        Parameters
        ----------
        sound : str, optional
            The sound the animal makes (default is None)

        Raises
        ------
        NotImplementedError
            If no sound is set for the animal or passed in as a
            parameter.
        """

        if self.sound is None and sound is None:
            raise NotImplementedError("Silent Animals are not supported!")

        out_sound = self.sound if sound is None else sound
        print(self.says_str.format(name=self.name, sound=out_sound))

[Package and Module Docstrings]

Package docstrings should be placed at the top of the package’s __init__.py file. This docstring should list the modules and sub-packages that are exported by the package.

Module docstrings are similar to class docstrings. Instead of classes and class methods being documented, it’s now the module and any functions found within. Module docstrings are placed at the top of the file even before any imports. Module docstrings should include the following:

  • A brief description of the module and its purpose

  • A list of any classes, exceptions, functions, and any other objects exported by the module

The docstring for a module function should include the same items as a class method:

  • A brief description of what the function is and what it’s used for

  • Any arguments (both required and optional) that are passed including keyword arguments

  • Label any arguments that are considered optional

  • Any side effects that occur when executing the function

  • Any exceptions that are raised

  • Any restrictions on when the function can be called

"""Spreadsheet Column Printer

This script allows the user to print to the console all columns in the
spreadsheet. It is assumed that the first row of the spreadsheet is the
location of the columns.

This tool accepts comma separated value files (.csv) as well as excel
(.xls, .xlsx) files.

This script requires that `pandas` be installed within the Python
environment you are running this script in.

This file can also be imported as a module and contains the following
functions:

    * get_spreadsheet_cols - returns the column headers of the file
    * main - the main function of the script
"""

import argparse

import pandas as pd


def get_spreadsheet_cols(file_loc, print_cols=False):
    """Gets and prints the spreadsheet's header columns

    Parameters
    ----------
    file_loc : str
        The file location of the spreadsheet
    print_cols : bool, optional
        A flag used to print the columns to the console (default is
        False)

    Returns
    -------
    list
        a list of strings that are the header columns
    """

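    # read_excel() cannot parse CSV, so pick the reader by file extension
    if file_loc.lower().endswith(".csv"):
        file_data = pd.read_csv(file_loc)
    else:
        file_data = pd.read_excel(file_loc)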
    col_headers = list(file_data.columns.values)

    if print_cols:
        print("\n".join(col_headers))

    return col_headers


def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        'input_file',
        type=str,
        help="The spreadsheet file to print the columns of"
    )
    args = parser.parse_args()
    get_spreadsheet_cols(args.input_file, print_cols=True)


if __name__ == "__main__":
    main()