@gmankab
Last active June 10, 2024 10:46
[solved] issue with resending file through pyrofork
import pyrogram.methods.advanced.save_file
import pyrogram.session.session
import pyrogram.client
import pyrogram.types
import pyrogram.raw
import pyrogram
import asyncio
import typing
import types


async def save_file_custom_wrapper(
    self: pyrogram.client.Client,
    path: str | typing.BinaryIO | typing.AsyncGenerator[bytes, None],
    file_id: int | None = None,
    file_part: int = 0,
    progress: typing.Callable | None = None,
    progress_args: tuple = ()
):
    if isinstance(path, typing.AsyncGenerator):
        # run the custom function if an async bytes generator was passed
        return await save_file_from_bytes_gen(
            self=self,
            bytes_gen=path,
        )
    else:
        # otherwise fall back to pyrofork's default save_file
        return await pyrogram.methods.advanced.save_file.SaveFile.save_file( # type: ignore
            self=self,
            path=path,
            file_id=file_id,
            file_part=file_part,
            progress=progress,
            progress_args=progress_args,
        )


async def save_file_from_bytes_gen(
    self: pyrogram.client.Client,
    bytes_gen: typing.AsyncGenerator[bytes, None],
    file_name: str = 'filename',
):
    '''
    custom save_file variant that accepts an async bytes generator
    can be used only with big files
    file size should be more than 10 * 1024 * 1024 bytes (10 megabytes)
    can resend a big file without writing the whole file to disk
    useful when the file size is larger than the RAM size
    '''
    assert self.me
    file_size_limit_mib = 4000 if self.me.is_premium else 2000
    session = pyrogram.session.session.Session(
        self,
        await self.storage.dc_id(), # type: ignore
        await self.storage.auth_key(), # type: ignore
        await self.storage.test_mode(), # type: ignore
        is_media=True,
    )
    await session.start()
    file_id = self.rnd_id()
    file_total_parts: int = 0
    file_size: int = 0
    async for chunk in bytes_gen:
        file_total_parts += 1
        file_size += len(chunk)
        if file_size > file_size_limit_mib * 1024 * 1024:
            raise ValueError(f"can't upload files bigger than {file_size_limit_mib} MiB")
        rpc = pyrogram.raw.functions.upload.SaveBigFilePart( # type: ignore
            file_id=file_id,
            file_part=file_total_parts - 1,
            file_total_parts=file_total_parts,
            bytes=chunk
        )
        await session.invoke(rpc)
    return pyrogram.raw.types.input_file_big.InputFileBig(
        id=file_id,
        parts=file_total_parts,
        name=file_name,
    )


async def main():
    client = pyrogram.client.Client(
        name='tg_bot',
    )
    # replacing pyrofork's native save_file method with custom wrapper
    client.save_file = types.MethodType(save_file_custom_wrapper, client)
    await client.start()
    source_msg = await client.get_messages(
        chat_id='@TAndroidAPK',
        message_ids=385,
    )
    assert isinstance(source_msg, pyrogram.types.Message)
    assert source_msg.document.file_size > 10 * 1024 * 1024
    stream_media = client.stream_media(
        message=source_msg,
    )
    assert isinstance(stream_media, typing.AsyncGenerator)
    await client.send_document(
        chat_id='me',
        document=stream_media, # type: ignore
    )


asyncio.run(main())

@gmankab commented Jun 9, 2024

how to run this code and reproduce the problem:

curl -sSL gist.githubusercontent.com/gmankab/753a5225aa291c7f04d0bc17110b432e/raw/01c428e7e2f688152b3e2e4dffd7e772d89894c7/main.py -o main.py
python -m venv .venv
.venv/bin/pip install -U pip
.venv/bin/pip install -U tgcrypto-pyrofork pyrofork
.venv/bin/python main.py

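for reference, a small inspection sketch, not part of the gist itself: it reuses the same tg_bot session and the same source message as main.py and only prints the size of every chunk that stream_media yields, i.e. the exact chunks the script above passes to SaveBigFilePart:

import pyrogram.client
import asyncio

async def inspect_chunks():
    client = pyrogram.client.Client(
        name='tg_bot',
    )
    await client.start()
    msg = await client.get_messages(
        chat_id='@TAndroidAPK',
        message_ids=385,
    )
    # print the size of each chunk coming out of stream_media
    index = 0
    async for chunk in client.stream_media(msg):
        print(f'chunk {index}: {len(chunk)} bytes')
        index += 1
    await client.stop()

asyncio.run(inspect_chunks())
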

@gmankab commented Jun 10, 2024

fixed code:

import pyrogram.methods.advanced.save_file
import pyrogram.session.session
import pyrogram.client
import pyrogram.types
import pyrogram.raw
import pyrogram
import asyncio
import typing
import types
import math


class FileChunkGenerator:
    def __init__(
        self,
        msg: pyrogram.types.Message,
        client: pyrogram.client.Client,
    ):
        assert msg.document.file_size > 10 * 1024 * 1024
        self.name = msg.document.file_name
        self.msg: pyrogram.types.Message = msg
        self.client = client
        self.return_next: bytes = b''
        self.max_chunk_size = 512 * 1024

    def __aiter__(self):
        return self.stream_512()

    async def stream_512(
        self,
    ) -> typing.AsyncGenerator[bytes, None]:
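        '''
        split the chunks coming from stream_media (larger than 512 KiB,
        hence the stream_1024 naming) into pieces of at most 512 KiB,
        the part size that save_file_from_bytes_gen below uploads with
        SaveBigFilePart
        '''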
        stream_1024 = self.client.stream_media(self.msg)
        assert isinstance(stream_1024, typing.AsyncGenerator)
        async for chunk_1024 in stream_1024:
            while len(chunk_1024) > self.max_chunk_size:
                yield chunk_1024[:self.max_chunk_size]
                chunk_1024 = chunk_1024[self.max_chunk_size:]
            yield chunk_1024


async def save_file_custom_wrapper(
    self: pyrogram.client.Client,
    path: str | typing.BinaryIO | FileChunkGenerator,
    file_id: int | None = None,
    file_part: int = 0,
    progress: typing.Callable | None = None,
    progress_args: tuple = ()
):
    if isinstance(path, FileChunkGenerator):
        # run the custom function if a FileChunkGenerator was passed
        return await save_file_from_bytes_gen(
            self=self,
            file_chunk_gen=path,
        )
    else:
        # otherwise fall back to pyrofork's default save_file
        return await pyrogram.methods.advanced.save_file.SaveFile.save_file( # type: ignore
            self=self,
            path=path,
            file_id=file_id,
            file_part=file_part,
            progress=progress,
            progress_args=progress_args,
        )


async def save_file_from_bytes_gen(
    self: pyrogram.client.Client,
    file_chunk_gen: FileChunkGenerator,
):
    '''
    - custom save_file variant that accepts a FileChunkGenerator (async bytes generator)
    - can be used only with big files
    - file size should be more than 10 * 1024 * 1024 bytes (10 megabytes)
    - can resend a big file without writing the whole file to disk
    - useful when the file size is larger than the RAM size
    '''
    assert self.me
    file_size_limit_mib = 4000 if self.me.is_premium else 2000
    session = pyrogram.session.session.Session(
        self,
        await self.storage.dc_id(), # type: ignore
        await self.storage.auth_key(), # type: ignore
        await self.storage.test_mode(), # type: ignore
        is_media=True,
    )
    await session.start()
    file_id = self.rnd_id()
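    # the total number of parts is computed up front from the source document's
    # file_size, so every SaveBigFilePart call carries the final file_total_parts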
    max_chunk_size = 512 * 1024
    file_total_parts: int = math.ceil(
        file_chunk_gen.msg.document.file_size / max_chunk_size
    )
    chunk_index: int = 0
    file_size: int = 0
    async for chunk in file_chunk_gen:
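        # sanity checks: every part except the last should be a multiple of
        # 1 KiB and should divide the 512 KiB maximum part size evenly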
        assert len(chunk) <= max_chunk_size
        assert chunk_index < file_total_parts
        if chunk_index + 1 != file_total_parts:
            assert len(chunk) % 1024 == 0
            assert max_chunk_size % len(chunk) == 0
        print(f'uploading chunk {chunk_index}, len={len(chunk)}')
        file_size += len(chunk)
        if file_size > file_size_limit_mib * 1024 * 1024:
            raise ValueError(f"can't upload files bigger than {file_size_limit_mib} MiB")
        rpc = pyrogram.raw.functions.upload.SaveBigFilePart( # type: ignore
            file_id=file_id,
            file_part=chunk_index,
            file_total_parts=file_total_parts,
            bytes=chunk
        )
        await session.invoke(rpc)
        chunk_index += 1
    await session.stop()
    return pyrogram.raw.types.input_file_big.InputFileBig(
        id=file_id,
        parts=file_total_parts,
        name=file_chunk_gen.name,
    )


async def main():
    client = pyrogram.client.Client(
        name='tg_bot',
    )
    # replacing pyrofork's native save_file method with custom wrapper
    client.save_file = types.MethodType(save_file_custom_wrapper, client)
    await client.start()
    source_msg = await client.get_messages(
        chat_id='@gmanka',
        message_ids=11354,
    )
    assert isinstance(source_msg, pyrogram.types.Message)
    file_chunk_gen = FileChunkGenerator(
        msg=source_msg,
        client=client,
    )
    await client.send_document(
        chat_id='me',
        document=file_chunk_gen, # type: ignore
    )


asyncio.run(main())
