Skip to content

Instantly share code, notes, and snippets.

@ChenyangGao
Created September 8, 2022 08:49
Show Gist options
  • Save ChenyangGao/6a3f6177ce8da748413ce304156588f9 to your computer and use it in GitHub Desktop.
把pandas的DataFrame转化成Excel格式的字节数据
#!/usr/bin/env python3
# coding: utf-8
"""这个模块提供了工具函数,可以把 `pandas` 的 `DataFrame` 转换成
xlsx 格式的 Excel 文件的二进制数据
"""
__author__ = "ChenyangGao <https://chenyanggao.github.io/>"
__version__ = (0, 1)
__all__ = ["df_to_excel_bytes", "sql_to_excel_bytes"]
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from io import BytesIO
from os import PathLike
from typing import cast, Final, Iterable, IO, TypeVar, Union
from types import MappingProxyType, MethodType
from warnings import warn
# 安装 [pandas-stubs](https://pypi.org/project/pandas-stubs/) 以静态检查
from pandas import read_sql, DataFrame, ExcelWriter, RangeIndex
IO_T = TypeVar("IO_T", bytes, str)
# Excel 的工作表名有一些非法字符 []:*?/\\,需要把出现的非法字符变成合法字符 _
EXCEL_SHEETNAME_INVALID_CHARS_TRANSTABLE: Final[MappingProxyType] = \
MappingProxyType(dict.fromkeys(map(ord, "[]:*?/\\"), "_"))
# xlsx 格式的 Excel 文件,最大的行数
SHEET_MAX_NROWS: Final[int] = 2 ** 20
# xlsx 格式的 Excel 文件,最大的列数
SHEET_MAX_NCOLS: Final[int] = 2 ** 14
# xlsx 格式的 Excel 文件,工作表名最大的字符数
SHEETNAME_MAX_LENGTH: Final[int] = 31
def read_io(fio: IO[IO_T], /) -> IO_T:
    """Read an IO object's entire contents from the beginning, then restore
    the stream position it had on entry."""
    saved_position: int = fio.tell()
    try:
        fio.seek(0)
        return fio.read()
    finally:
        # Put the cursor back where the caller left it, even if read() failed.
        fio.seek(saved_position)
@contextmanager
def ctx_df_to_excel_bytes(**kwargs):
    """Context manager: collect `pandas` `DataFrame` objects written to the
    yielded writer into the bytes of an xlsx-format Excel file.

    :param kwargs: keyword arguments forwarded to the `pandas` `ExcelWriter`
        constructor; note that any user-supplied ``path`` is deliberately
        overridden with an in-memory buffer
        > See: pandas.io.excel._base.ExcelWriter
        > https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.ExcelWriter.html
    :return: a context manager yielding a `pandas` `ExcelWriter` object
        augmented with 3 extra attributes:
        - bio: the `io.BytesIO` buffer the Excel data is written into
        - read: read the Excel data back out of `bio`
        - write: dump the Excel data in `bio` to a filesystem path
        NOTE: the workbook is only complete after the ``with`` block exits
        (closing the writer flushes the data), as the example shows.

    Example::
        >>> import pandas as pd
        >>> df = pd.DataFrame([[1,2],[3,4]])
        >>> with ctx_df_to_excel_bytes() as writer:
        ...     for i in range(10):
        ...         df.to_excel(writer, sheet_name=str(i), index=False)
        >>> data = writer.read()
        >>> df2 = pd.read_excel(data, sheet_name="5")
        >>> (df == df2).all(None)
        True
    """
    # All output goes to an in-memory buffer; a caller-provided "path" is
    # intentionally replaced by it.
    bio = kwargs["path"] = BytesIO()
    def read(self) -> bytes:
        "Read the Excel bytes accumulated so far in `self.bio`."
        return read_io(self.bio)
    def write(self, path: Union[str, bytes, int, PathLike]) -> int:
        "Write the Excel bytes in `self.bio` to `path`; return the byte count."
        # BUGFIX: use a `with` block so the file handle is always closed
        # (previously `open(path, "wb").write(...)` leaked the handle).
        with open(path, "wb") as file:
            return file.write(read(self))
    with ExcelWriter(**kwargs) as writer:
        setattr(writer, "bio", bio)
        setattr(writer, "read", MethodType(read, writer))
        setattr(writer, "write", MethodType(write, writer))
        yield writer
def df_to_excel_bytes(
    dfs: Union[DataFrame, dict[str, DataFrame], Iterable[DataFrame]]
) -> bytes:
    """Convert one or several `pandas` `DataFrame` objects into the binary
    data of an xlsx-format Excel file.

    :param dfs: a single `DataFrame` (sheet name "Sheet1"), a mapping of
        sheet name to `DataFrame`, or an iterable of `DataFrame` (sheet
        names are then "0", "1", ...)
    :return: the bytes of the resulting Excel file

    Example::
        >>> import pandas as pd
        >>> df = pd.DataFrame([[1,2],[3,4]])
        >>> data = df_to_excel_bytes([df]*10)
        >>> df2 = pd.read_excel(data, sheet_name="5")
        >>> (df == df2).all(None)
        True
    """
    dfd: dict[str, DataFrame]
    if isinstance(dfs, DataFrame):
        dfd = {"Sheet1": dfs}
    elif isinstance(dfs, dict):
        dfd = dfs
    else:
        dfd = {str(i): df for i, df in enumerate(dfs)}
    with ctx_df_to_excel_bytes() as writer:
        for old_k, df in dfd.items():
            # Replace every character that is illegal in an Excel sheet name
            # ([]:*?/\) with an underscore.
            k = old_k.translate(EXCEL_SHEETNAME_INVALID_CHARS_TRANSTABLE)
            if old_k != k:
                warn(f"Illegal characters found in sheetname, convert {old_k!r} to {k!r}")
            index, columns = df.index, df.columns
            nrows, ncols = len(index), len(columns)
            index_nlevel, columns_nlevel = index.nlevels, columns.nlevels
            # Output the index unless it is a trivial RangeIndex(start=0,
            # step=1) and both axes have a single level.
            # TIPS: when columns have more than 1 level, `pandas` requires
            # the index to be output.
            whether_output_index = not (
                columns_nlevel == 1 and
                index_nlevel == 1 and
                isinstance(index, RangeIndex) and
                index.start == 0 and
                index.step == 1
            )
            # rng_row_step: rows left for data after the header row(s)
            rng_row_step: int
            # rng_col_step: columns left for data after the index column(s)
            rng_col_step: int
            if whether_output_index:
                # A multi-level columns header adds one extra blank row.
                if columns_nlevel > 1:
                    rng_row_step = SHEET_MAX_NROWS - columns_nlevel - 1
                else:
                    rng_row_step = SHEET_MAX_NROWS - 1
                rng_col_step = SHEET_MAX_NCOLS - index_nlevel
            else:
                rng_row_step = SHEET_MAX_NROWS - 1
                rng_col_step = SHEET_MAX_NCOLS
            rng_row = range(0, nrows, rng_row_step)
            rng_col = range(0, ncols, rng_col_step)
            # If the DataFrame does not fit into one sheet, it is split into
            # several sheets named  {sheetname}^{row-chunk}_{col-chunk}
            # (chunk ordinals in zero-padded hex).  For a DataFrame split as
            #     sheet^0_0, sheet^0_1, sheet^1_0, sheet^1_1
            # the data layout in the original DataFrame is
            #     | sheet^0_0 | sheet^0_1 |
            #     |-----------|-----------|
            #     | sheet^1_0 | sheet^1_1 |
            suffix_template = ""
            # TIPS: a sheet name may have at most SHEETNAME_MAX_LENGTH (31)
            # characters, so the base name is truncated to make room for the
            # suffix.
            suffix_len = 0
            if len(rng_row) > 1:
                # Hex digits needed for the largest row-chunk ordinal.
                # BUGFIX: was `q, r = divmod(len(rng_row), 16); l = q + (r > 0)`,
                # which over-counts digits once there are >= 256 chunks
                # (e.g. 256 chunks -> 16 "digits" instead of 2).
                l = len(format(len(rng_row) - 1, "x"))
                suffix_template += "^{0:0%dx}" % l
                suffix_len += l + 1
            if len(rng_col) > 1:
                # Hex digits needed for the largest column-chunk ordinal.
                l = len(format(len(rng_col) - 1, "x"))
                suffix_template += "_{1:0%dx}" % l
                suffix_len += l + 1
            if suffix_len > SHEETNAME_MAX_LENGTH:
                raise RuntimeError(f"DataFrame is too large: {old_k!r}")
            if suffix_len:
                for i, row0 in enumerate(rng_row):
                    for j, col0 in enumerate(rng_col):
                        suffix = suffix_template.format(i, j)
                        key = k[:SHEETNAME_MAX_LENGTH-suffix_len] + suffix
                        warn(f"DataFrame is too large, generate sub-sheet: {key!r} (of {old_k!r})")
                        df.iloc[row0:row0+rng_row_step, col0:col0+rng_col_step].to_excel(
                            writer, index=whether_output_index, sheet_name=key)
            else:
                key = k[:SHEETNAME_MAX_LENGTH]
                if key != k:
                    warn(f"Sheetname is too long, convert {old_k!r} to {key!r}")
                if key == "":
                    key = "Sheet1"
                df.to_excel(writer, index=whether_output_index, sheet_name=key)
    # NOTE: read after the `with` block — the writer only flushes the
    # workbook into the buffer when it is closed.
    return writer.read()
def pandas_read_sqls(
    sqls: Union[str, dict[str, str], Iterable[str]],
    con,
    read_workers: int = -1,
) -> dict[str, DataFrame]:
    """Run a batch of SQL queries; each query result becomes a `pandas`
    `DataFrame` object.

    :param sqls: one or several SQL query statements
    :param con: connection for the SQL queries
        SQLAlchemy connectable, str, or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible
        for engine disposal and connection closure for the SQLAlchemy connectable; str
        connections are closed automatically. See
        `here <https://docs.sqlalchemy.org/en/13/core/connections.html>`_.
    :param read_workers: number of concurrent queries
        - if < 0 or == 1, run sequentially (no concurrency)
        - if == 0, use ThreadPoolExecutor's default worker count
        - if > 0, use exactly that many workers
    :return: depends on the type of `sqls`, specifically
        if isinstance(sqls, str):
            return {"": pandas.read_sql(sqls, con)}
        elif isinstance(sqls, dict):
            return {k: pandas.read_sql(sql, con) for k, sql in sqls.items()}
        else:
            return {str(i): pandas.read_sql(sql, con) for i, sql in enumerate(sqls)}
    """
    if isinstance(sqls, str):
        return {"": read_sql(sqls, con)}
    if not isinstance(sqls, dict):
        sqls = {str(i): sql for i, sql in enumerate(sqls)}
    sqls = cast(dict[str, str], sqls)
    if read_workers < 0 or read_workers == 1:
        return {k: read_sql(sql, con) for k, sql in sqls.items()}
    with ThreadPoolExecutor(
        None if read_workers == 0 else min(read_workers, len(sqls))
    ) as worker:
        # BUGFIX: map over the SQL statements (the dict's values) — the
        # previous code mapped over `sqls` itself, i.e. the dict KEYS, so
        # the sheet names were executed as SQL instead of the queries.
        df_iter = worker.map(lambda sql: read_sql(sql, con), sqls.values())
        return dict(zip(sqls, df_iter))
def sql_to_excel_bytes(
    sqls: Union[str, dict[str, str], Iterable[str]],
    con,
    read_workers: int = -1,
) -> bytes:
    """Execute some SQL queries, save all their results into one xlsx-format
    Excel file, then return that Excel file as bytes.

    :param sqls: one or several SQL query statements; sheet names are chosen
        as follows:
        - if `sqls` is a str, the sheet name is "Sheet1" and `sqls` is the SQL
        - if `sqls` is a dict, its keys are the sheet names, its values the SQL
        > NOTE: a sheet name must not contain any of the characters []:*?/\\ ;
          offending characters are automatically replaced with _
        - otherwise `sqls` is an iterable of SQL, and the ordinal (counting
          from 0) of each statement becomes its sheet name
        > NOTE: each query's result should preferably have no more than
          1048575 rows (== 2**20 - 1; row 1 is the header) and no more than
          16384 columns (== 2**14), otherwise it is automatically split
          across several sheets
    :param con: connection for the SQL queries
        SQLAlchemy connectable, str, or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible
        for engine disposal and connection closure for the SQLAlchemy connectable; str
        connections are closed automatically. See
        `here <https://docs.sqlalchemy.org/en/13/core/connections.html>`_.
    :param read_workers: number of concurrent queries
        - if < 0 or == 1, run sequentially (no concurrency)
        - if == 0, use ThreadPoolExecutor's default worker count
        - if > 0, use exactly that many workers
    :return: the Excel file's data, as bytes

    Example::
        This can be tested with sqlite3
        >>> sql = '''\\
        ... with recursive range(x) as (
        ...     values(0)
        ...     union all
        ...     select x + 1 from range where x < POW(2, 21)
        ... )
        ... select * from range;'''
        >>> data = sql_to_excel_bytes([sql]*2, "sqlite:///:memory:")
        ...
        >>> import pandas as pd
        >>> df0 = pd.read_excel(data, sheet_name="0^0")
        >>> df1 = pd.read_excel(data, sheet_name="0^1")
        >>> df2 = pd.read_excel(data, sheet_name="0^2")
        >>> df = pd.concat([df0, df1, df2], ignore_index=True)
        >>> (df.x == range(2 ** 21 + 1)).all()
        True
    """
    # Query first, then hand the resulting DataFrames to the Excel converter.
    return df_to_excel_bytes(pandas_read_sqls(sqls, con, read_workers))
if __name__ == "__main__":
    # Run the doctest examples embedded in this module's docstrings.
    import doctest
    doctest.testmod()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment