This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from broadcaster import Broadcaster | |
broadcaster = Broadcaster() | |
@dp.callback_query_handler(admin_cb.filter(action = 'broadcast'), user_id = config.BOT_ADMINS_LIST, state='*',) | |
async def account_menu_callback(callback_query: types.CallbackQuery, callback_data, state: FSMContext): | |
current_state = await state.get_state() | |
if current_state: | |
await state.finish() | |
await callback_query.answer() | |
await callback_query.message.answer('Рассылка. Пришлите сообщение для рассылки', reply_markup = keyboards.kb_cancel) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from aiogram.utils import exceptions | |
from aiogram.types.chat import Chat | |
from datetime import datetime | |
from aiogram import Bot | |
from string import Template | |
from collections.abc import Iterable | |
from abc import ABC, abstractmethod | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Task exception was never retrieved | |
future: <Task finished coro=<Dispatcher._process_polling_updates() done, defined at C:\Users\DNS\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.7_qbz5n2kfra8p0\LocalCache\local-packages\Python37\site-packages\aiogram\dispatcher\dispatcher.py:409> exception=RuntimeError('Task <Task pending coro=<Handler.notify() running at C:\\Users\\DNS\\AppData\\Local\\Packages\\PythonSoftwareFoundation.Python.3.7_qbz5n2kfra8p0\\LocalCache\\local-packages\\Python37\\site-packages\\aiogram\\dispatcher\\handler.py:116> cb=[gather.<locals>._done_callback() at C:\\Program Files\\WindowsApps\\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\\lib\\asyncio\\tasks.py:691]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at C:\\Program Files\\WindowsApps\\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\\lib\\asyncio\\futures.py:351]> attached to a different loop')> | |
Traceback (most recent call last): | |
File "C:\Users\DNS\AppData\ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import sys | |
from logging.handlers import TimedRotatingFileHandler | |
FORMATTER = logging.Formatter("%(asctime)s | %(levelname)5s | %(name)s | %(message)s") | |
LOG_FILE = "my_app.log" | |
def get_file_handler(): | |
file_handler = TimedRotatingFileHandler(LOG_FILE, when='midnight') | |
file_handler.setFormatter(FORMATTER) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import imp | |
from operator import truediv | |
import modules.scripts as scripts | |
import gradio as gr | |
import os | |
from modules import images | |
from modules.processing import process_images, Processed, StableDiffusionProcessingImg2Img | |
from modules.processing import Processed | |
from modules.shared import opts, cmd_opts, state |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import modules.scripts as scripts | |
import gradio as gr | |
from modules import sd_samplers, shared, images | |
from modules.processing import Processed, process_images | |
class Script(scripts.Script): | |
def title(self): | |
return "Steps grid" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import modules.scripts as scripts | |
import gradio as gr | |
from PIL import Image, ImageDraw | |
from modules import processing, images, devices | |
from modules.processing import Processed | |
from modules.shared import state |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from unittest.mock import patch, MagicMock, AsyncMock | |
import aiohttp | |
from logs import logs | |
class TestLogs(unittest.IsolatedAsyncioTestCase): | |
@patch('aiohttp.UnixConnector') | |
async def test_connector(self, mock_connector): | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from urllib.parse import quote, unquote, urlparse | |
from fastapi import FastAPI | |
from typing import Optional | |
app = FastAPI() | |
@app.get("/encode/") | |
def encode_link(link: Optional[str] = None): | |
if link is None: | |
return {"error": "No link provided"} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package merge | |
func Merge[T any](cs ...<-chan T) <-chan T { | |
var wg sync.WaitGroup | |
out := make(chan T) | |
output := func(c <-chan T) { | |
for n := range c { | |
out <- n | |
} | |
wg.Done() |
OlderNewer