Skip to content

Instantly share code, notes, and snippets.

@suroegin
Last active August 1, 2017 12:19
Show Gist options
  • Save suroegin/12c70c9a82da58bd93d2584b12afcf77 to your computer and use it in GitHub Desktop.
Save suroegin/12c70c9a82da58bd93d2584b12afcf77 to your computer and use it in GitHub Desktop.

Concurrency (coroutines)

In file concurrency.py:

from collections import deque
from time import time, sleep as sys_sleep


class coroutine(object):
    """Turn a generator function into a coroutine built on an extended generator.

    Calling the decorated function creates the generator and, when no other
    coroutine is currently marked as running, advances it to its first
    ``yield`` so that it is ready to receive values via ``send``.
    """
    # Generator that is executing right now, or None when nothing is running.
    _current = None

    def __init__(self, callable):
        self._callable = callable

    def __call__(self, *args, **kwargs):
        """Create the generator and prime it unless a coroutine is already running."""
        generator = self._callable(*args, **kwargs)
        owner = type(self)
        if owner._current is not None:
            # Another coroutine is marked as running: hand the generator
            # back untouched, without advancing it.
            return generator
        owner._current = generator
        try:
            next(generator)
        finally:
            # Always clear the marker, even if priming raised.
            owner._current = None
        return generator


def sleep(timeout):
    """Suspend the running coroutine until it is sent an event.

    Registers the coroutine currently marked in ``coroutine._current`` with
    the module-level dispatcher for ``timeout`` seconds, then yields. The
    value sent into the generator on resume is returned to the caller
    (use as ``yield from sleep(...)``).
    """
    dispatcher.setup_timeout(coroutine._current, timeout)
    return (yield)


class Dispatcher(object):
    """Event dispatcher that resumes coroutines once their timeouts expire."""

    def __init__(self):
        # Entries are [corogen, deadline], kept sorted by deadline.
        self._pending = deque()
        # Deadline of the earliest pending entry; placeholder while idle.
        self._deadline = time() + 3600.0

    def setup_timeout(self, corogen, timeout):
        """Schedule ``corogen`` to be resumed ``timeout`` seconds from now."""
        deadline = time() + timeout
        self._pending.append([corogen, deadline])
        # sorted() is stable, so entries with equal deadlines keep their
        # registration order.
        self._pending = deque(sorted(self._pending, key=lambda a: a[1]))
        self._deadline = self._pending[0][1]

    def run(self):
        """Run the event loop until no coroutine is waiting any more."""
        while self._pending:
            # Always wait for the earliest entry still in the queue. A
            # separately cached deadline goes stale once the coroutine that
            # set it has been resumed, which made the remaining coroutines
            # wake up late (or only after the one-hour placeholder).
            self._deadline = self._pending[0][1]
            timeout = self._deadline - time()
            if timeout > 0:
                sys_sleep(timeout)
            # Resume every coroutine whose deadline has passed.
            while self._pending and self._pending[0][1] <= time():
                corogen, _ = self._pending.popleft()
                try:
                    coroutine._current = corogen
                    corogen.send("timeout")
                except StopIteration:
                    # The coroutine finished; nothing left to schedule.
                    pass
                finally:
                    coroutine._current = None

dispatcher = Dispatcher()


def run():
    """Run the event loop of the module-level dispatcher."""
    return dispatcher.run()

In file sample.py:

# The module shown above is concurrency.py; the misspelled name "concurency"
# would fail with ImportError.
from concurrency import coroutine, sleep, run


@coroutine
def hello(name, timeout):
    """Greet ``name`` forever, waiting ``timeout`` seconds before each greeting."""
    while True:
        # Suspend here until the dispatcher resumes this coroutine.
        yield from sleep(timeout)
        print("Привет, {}!".format(name))


# Each call creates a coroutine and runs it up to its first sleep().
hello("Петров", 2.0)
hello("Иванов", 3.0)
hello("Мир", 5.0)
# Enter the event loop; the coroutines above never finish, so neither does it.
run()

Decorators

Changes behavior depending on the situation...

from functools import wraps

def makeScreenshot_Decorator(f):
    """Wrap ``f`` so that it runs only while ``isClientInResult`` is truthy.

    The module-level flag is read on every call; when it is falsy the
    wrapper returns a fixed message instead of calling ``f``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if isClientInResult:
            return f(*args, **kwargs)
        return "Can't find our client"
    return decorated

@makeScreenshot_Decorator
def makeScreenshot():
    """Return the message reporting that the screenshot was made and sent."""
    return "Did and sent screenshot"

# With the flag set to False the decorated call below returns
# "Can't find our client" instead of running makeScreenshot's body.
isClientInResult = False
makeScreenshot()

Descriptors examples

Example 1

class MyDescriptor:
    """Data descriptor that validates an integer age between 0 and 120.

    The value is stored on the descriptor object itself, so every instance
    of the owning class reads and writes the same age.
    """

    def __init__(self):
        self.__age = 0

    def __get__(self, instance, owner):
        """Return the stored age."""
        return self.__age

    def __set__(self, instance, value):
        """Store ``value`` after validation.

        Raises:
            TypeError: if ``value`` is not an int.
            ValueError: if ``value`` is outside 0..120.
        """
        if not isinstance(value, int):
            raise TypeError("Age must be Integer value!")
        if value < 0 or value > 120:
            raise ValueError("Age must be between 0 and 120!")
        self.__age = value

    def __delete__(self, instance):
        """Remove the stored age.

        The descriptor protocol calls ``__delete__(instance)``; without the
        ``instance`` parameter ``del obj.age`` fails with TypeError.
        """
        del self.__age

class Person:
    """A named person whose ``age`` attribute is managed by ``MyDescriptor``."""
    age = MyDescriptor()

    def __init__(self, name, age):
        self.name, self.age = name, age

    def __str__(self):
        template = "{0} is {1} years old!"
        return template.format(self.name, self.age)

Example 2

class MyDescriptor:
    """Data descriptor that keeps a validated age separately for each instance.

    Ages live in a dict keyed by the owning instance, so instances must be
    hashable and stay referenced by the descriptor until their age is deleted.
    """

    def __init__(self):
        self.__age = {}

    def __get__(self, instance, owner):
        """Return the age stored for ``instance``.

        Access through the class (``instance is None``) returns the
        descriptor itself. Raises KeyError if no age was set for ``instance``.
        """
        if instance is None:
            return self
        return self.__age[instance]

    def __set__(self, instance, value):
        """Store ``value`` for ``instance`` after validation.

        Raises:
            TypeError: if ``value`` is not an int.
            ValueError: if ``value`` is outside 0..120.
        """
        if not isinstance(value, int):
            raise TypeError("Age must be Integer value!")
        if value < 0 or value > 120:
            raise ValueError("Age must be between 0 and 120!")
        self.__age[instance] = value

    def __delete__(self, instance):
        """Forget the age stored for ``instance``.

        The descriptor protocol passes the instance here; the parameter was
        missing, so the body referenced an undefined name.
        """
        del self.__age[instance]

class Person:
    """Person with a name and an ``age`` handled by a ``MyDescriptor`` instance."""
    age = MyDescriptor()

    def __init__(self, name, age):
        self.name = name
        # Assignment is routed through the descriptor's __set__.
        self.age = age

    def __str__(self):
        name, years = self.name, self.age
        return "{0} is {1} years old!".format(name, years)

Get list of function names

import sys
import ast


def top_level_functions(body):
    """Lazily yield the ``ast.FunctionDef`` nodes found directly in ``body``."""
    def is_function(node):
        return isinstance(node, ast.FunctionDef)

    return (node for node in body if is_function(node))


def parse_ast(filename):
    """Parse the Python source file ``filename`` and return its AST module node.

    The file is read as bytes so that the parser itself decodes it, honouring
    a PEP 263 coding declaration or BOM instead of the locale's default
    text encoding.
    """
    with open(filename, "rb") as source_file:
        return ast.parse(source_file.read(), filename=filename)


if __name__ == "__main__":
    # For every file given on the command line, print its name followed by
    # the indented names of its top-level functions.
    for path in sys.argv[1:]:
        print(path)
        module = parse_ast(path)
        for definition in top_level_functions(module.body):
            print("  {0}".format(definition.name))

Logging

With class

import logging


class FirstClass(object):
    """Counter that logs its changes to ``python_logging.log``."""

    def __init__(self):
        self.current_number = 0
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)
        # getLogger() returns the same logger object on every call, so attach
        # the file handler only once; otherwise each new instance would add
        # another handler and every record would be written repeatedly.
        has_file_handler = any(
            isinstance(handler, logging.FileHandler)
            for handler in self.logger.handlers
        )
        if not has_file_handler:
            logger_handler = logging.FileHandler('python_logging.log')
            logger_handler.setLevel(logging.DEBUG)
            logger_formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
            logger_handler.setFormatter(logger_formatter)
            self.logger.addHandler(logger_handler)
        self.logger.info("Completed configuring logger()!")

    def increment_number(self):
        """Increase the counter by one and log a warning."""
        self.current_number += 1
        self.logger.warning("Incrementing number!")

    def decrement_number(self):
        """Decrease the counter by one and log a warning."""
        self.current_number -= 1
        self.logger.warning("Decrementing number!")

    def clear_number(self):
        """Reset the counter to zero (nothing is logged)."""
        self.current_number = 0
        self.current_number = 0

PyInotify

A library that detects changes to files and folders and notifies you about them.

Example

import pyinotify

class MyEventHandler(pyinotify.ProcessEvent):
    """Print one line per handled event: a label followed by the event's path."""

    @staticmethod
    def _announce(label, event):
        # Shared output step used by every handler below.
        print(label, event.pathname)

    def process_IN_ACCESS(self, event):
        self._announce("ACCESS event:", event)

    def process_IN_ATTRIB(self, event):
        self._announce("ATTRIB event:", event)

    def process_IN_CLOSE_NOWRITE(self, event):
        self._announce("CLOSE_NOWRITE event:", event)

    def process_IN_CLOSE_WRITE(self, event):
        self._announce("CLOSE_WRITE event:", event)

    def process_IN_CREATE(self, event):
        self._announce("CREATE event:", event)

    def process_IN_DELETE(self, event):
        self._announce("DELETE event:", event)

    def process_IN_MODIFY(self, event):
        self._announce("MODIFY event:", event)

    def process_IN_OPEN(self, event):
        self._announce("OPEN event:", event)

def main(path='/home/performics/Job/'):
    """Watch ``path`` recursively and report its events via ``MyEventHandler``.

    Args:
        path: Directory to watch. Defaults to the location that used to be
            hard-coded, so calling ``main()`` behaves as before.
    """
    # Watch manager: register the directory tree for all event types.
    watch_manager = pyinotify.WatchManager()
    watch_manager.add_watch(path, pyinotify.ALL_EVENTS, rec=True)

    # Event handler that prints each event.
    handler = MyEventHandler()

    # Notifier: connects the watches to the handler and enters its loop.
    notifier = pyinotify.Notifier(watch_manager, handler)
    notifier.loop()

# Start watching only when this file is executed as a script.
if __name__ == '__main__':
    main()

Split words in strings without spaces

# Load the dictionary once. open() replaces the Python 2-only file() builtin,
# and the with-block closes the handle as soon as the words are read.
with open('words.txt', 'r') as wordFile:
    wordList = wordFile.read().split()
# Lower-cased set for case-insensitive, constant-time membership tests.
words = {s.lower() for s in wordList}

def splitString(s):
    """Return every way to split ``s`` into words from the module-level ``words`` set.

    The input is lower-cased first. Each result is a list of words whose
    concatenation equals the lower-cased input; an input that cannot be
    split yields an empty list of results.
    """
    found = []

    def rec(stringLeft, wordsSoFar):
        # Nothing left to consume: the words collected so far form a full split.
        if not stringLeft:
            found.append(wordsSoFar)
        # Try every prefix; range() works on Python 2 and 3, whereas
        # xrange() does not exist on Python 3.
        for pos in range(1, len(stringLeft) + 1):
            if stringLeft[:pos] in words:
                rec(stringLeft[pos:], wordsSoFar + [stringLeft[:pos]])

    rec(s.lower(), [])
    return found
    
    
     /// /// /// 
     
     
     # Sample word -> frequency table passed to split_text in the example call
     # (presumably relative frequencies of each word; confirm against the data source).
     WORD_FREQUENCIES = {
    'file': 0.00123,
    'files': 0.00124,
    'save': 0.002,
    'ave': 0.00001,
    'as': 0.00555
}

def split_text(text, word_frequencies, cache=None):
    """Split ``text`` into known words, maximising the product of their frequencies.

    Args:
        text: String without spaces to split.
        word_frequencies: Mapping of word -> frequency; words that are
            missing or have a falsy frequency are never used.
        cache: Dict memoising results per remaining text. It is keyed by
            text only, so do not share one cache between different
            ``word_frequencies``. A fresh dict is used when omitted.

    Returns:
        ``(score, words)``. The empty string gives ``(1, [])``; text that
        cannot be split completely gives ``(0, [])``.
    """
    if cache is None:
        cache = {}
    if text in cache:
        return cache[text]
    if not text:
        return 1, []
    best_freq, best_split = 0, []
    # range() instead of the Python 2-only xrange().
    for i in range(1, len(text) + 1):
        word, rest = text[:i], text[i:]
        freq = word_frequencies.get(word)
        if freq:
            rest_freq, rest_split = split_text(rest, word_frequencies, cache)
            freq *= rest_freq
            if freq > best_freq:
                best_freq = freq
                best_split = [word] + rest_split
    cache[text] = (best_freq, best_split)
    return cache[text]

# print() call form: the print statement is a syntax error on Python 3.
print(split_text('filesaveas', WORD_FREQUENCIES, {}))

--> (1.3653e-08, ['file', 'save', 'as'])

/// /// ///

A naive algorithm won't give good results when applied to real-world data. Here is a 20-line algorithm that exploits relative word frequency to give accurate results for real-world text.

(If you want an answer to your original question which does not use word frequency, you need to refine what exactly is meant by "longest word": is it better to have a 20-letter word and ten 3-letter words, or is it better to have five 10-letter words? Once you settle on a precise definition, you just have to change the line defining wordcost to reflect the intended meaning.)

The idea

The best way to proceed is to model the distribution of the output. A good first approximation is to assume all words are independently distributed. Then you only need to know the relative frequency of all words. It is reasonable to assume that they follow Zipf's law, that is the word with rank n in the list of words has probability roughly 1/(n log N) where N is the number of words in the dictionary.

Once you have fixed the model, you can use dynamic programming to infer the position of the spaces. The most likely sentence is the one that maximizes the product of the probability of each individual word, and it's easy to compute it with dynamic programming. Instead of directly using the probability, we use a cost defined as the logarithm of the inverse of the probability, which avoids floating-point underflow when many small probabilities are multiplied.

The code

from math import log

# Build a cost dictionary, assuming Zipf's law and cost = -math.log(probability).
# Read the word list inside a with-block so the file handle is closed
# instead of being left for the garbage collector.
with open("words-by-frequency.txt") as word_file:
    words = word_file.read().split()
# Cost of the word at 0-based position i in the list: log((i + 1) * log(len(words))).
wordcost = {k: log((i + 1) * log(len(words))) for i, k in enumerate(words)}
# Longest word length; bounds how far back infer_spaces has to look.
maxword = max(len(x) for x in words)

def infer_spaces(s):
    """Infer where the spaces belong in a string written without spaces.

    Runs a dynamic program over the module-level ``wordcost`` table and
    ``maxword`` length and returns the chosen pieces joined by single spaces.
    """

    def best_match(end):
        # Cheapest way to end a piece at position ``end``, given that costs
        # for all shorter prefixes are already known.
        # Returns a pair (match_cost, match_length).
        window = cost[max(0, end - maxword):end]
        options = []
        for back, prior in enumerate(reversed(window)):
            length = back + 1
            # 9e999 overflows to float infinity: unknown pieces cost "infinite".
            piece_cost = wordcost.get(s[end - length:end], 9e999)
            options.append((prior + piece_cost, length))
        return min(options)

    # Forward pass: cost[n] is the minimal cost of the first n characters.
    cost = [0]
    for end in range(1, len(s) + 1):
        cost.append(best_match(end)[0])

    # Backward pass: walk from the end, peeling off the best final piece.
    pieces = []
    pos = len(s)
    while pos > 0:
        total, length = best_match(pos)
        assert total == cost[pos]
        pieces.append(s[pos - length:pos])
        pos -= length

    pieces.reverse()
    return " ".join(pieces)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment