Skip to content

Instantly share code, notes, and snippets.

# First approach
class A:
vadifier = None
def create(self, ...):
...
if self.vadifier:
self.vadifier.run(new_version)
...
@gmbnomis
gmbnomis / test_prefetch_content.py
Last active February 11, 2019 19:21
Test for RemoteArtifactSaver stage
from django.test import TestCase
from pprint import pprint
from django.db import connection, reset_queries
from django.test.utils import override_settings
from pulpcore.plugin.models import Artifact, RemoteArtifact, ContentArtifact
from pulpcore.plugin.stages import (
DeclarativeContent,
DeclarativeArtifact,
RemoteArtifactSaver,
@gmbnomis
gmbnomis / async_stages.py
Created May 27, 2018 10:51
Async pipelines stages
import asyncio
import functools
import time
from contextlib import suppress
start_time = time.time()
def print_t(msg):
t = time.time() - start_time
print(f"{t:04.1f}: {msg}")