Created
July 26, 2022 10:48
-
-
Save nyatla/dd46364b6e8cefddf96a5f1d7975f0ca to your computer and use it in GitHub Desktop.
二度と作りたくない遅延読み出しできるリングバッファ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Deque, Iterator,TypeVar,Sequence,Iterable,Union | |
from ..interfaces import IPeekableStream | |
T=TypeVar("T") | |
class PeekableStream2(IPeekableStream[T]): | |
""" iterをラップするPeekableストリームを生成します。 | |
このクラスは制限付きで前方、後方の参照ができます。 | |
""" | |
def __init__(self,src:Union[Iterator[T],Iterable[T]],inital_pos:int=0,bufsize=1024*8,remsize=1024,padding:T=None): | |
""" Peekableな範囲は、pos-remzizeからpos+(bufsize-remsize)の範囲です。 | |
""" | |
if isinstance(src,Iterable): | |
src=iter(src) | |
elif isinstance(src,Iterator): | |
pass | |
else: | |
raise Exception("Invalid type %s"%(type(src))) | |
self._pos=inital_pos #現在の読み出し位置 | |
self._iter:Iterator[T]=src | |
self._buf=[padding]*(bufsize+1) | |
self._remsize=remsize | |
self._head=remsize #読み出し可能位置の先頭 | |
self._rp=self._head #getの読み出し位置 | |
def seek(self,n:int): | |
buf=self._buf | |
buflen=len(buf) | |
inbuffer_size=(self._head-self._rp+buflen)%buflen #バッファ内のシーク可能サイズ | |
if inbuffer_size>=n: | |
#バッファ内のシークだけで終わる場合 | |
self._rp=(self._rp+n+buflen)%buflen | |
self._pos=self._pos+n | |
return | |
#バッファ内のシーク | |
self._rp=(self._rp+inbuffer_size+buflen)%buflen | |
self._pos=self._pos+inbuffer_size | |
#読出し | |
rem_size=self._remsize | |
#読み飛ばしサイズを計算 | |
skip_size=n-inbuffer_size | |
q=Deque[T](maxlen=rem_size) | |
try: | |
#キューに記録を取りながらスキップ | |
for _ in range(skip_size): | |
q.append(next(self._iter)) | |
self._pos=self._pos+1 | |
finally: | |
#キューをバッファに展開 | |
h=self._head | |
for i in range(len(q)): | |
buf[h]=q.popleft() | |
h=(h+1)%buflen | |
#読取位置をheadに移動 | |
self._head=h | |
self._rp=h | |
def get(self)->T: | |
d=self.peeks(1) | |
self.seek(1) | |
return d[0] | |
def gets(self,size:int)->Sequence[T]: | |
d=self.peeks(size) | |
# print(len(d),d) | |
self.seek(len(d)) | |
return d | |
def peek(self,skip:int=0)->int: | |
return self.peeks(1,skip)[0] | |
def peeks(self,size:int,skip=0): | |
""" 前方容量を超える場所を選択しようとする場合、超えた部分についてはseek、またはgetするまで取得できない。 | |
""" | |
#読み出し開始点は常にtail+remsize | |
buf=self._buf | |
buflen=len(buf) | |
remsize=self._remsize #遅行サイズ | |
if skip<-remsize: | |
# assert(rtail>=-remsize) #スキップサイズは遅行サイズよりは前方であること。 Errorの時はバッファアンダーフロー | |
raise IndexError("Underflow") | |
rhead=skip+size #読み出しサイズ | |
if rhead>=buflen-remsize: | |
#先行サイズより大きい読出しの場合にはサイズを制限 | |
w=(buflen-remsize)-1 | |
size=size-(rhead-w) | |
if size<=0: | |
raise IndexError("Overflow") | |
# print(rhead,size) | |
rhead=w | |
# assert(rhead<buflen-remsize) #先行部分は(バッファサイズ-遅行サイズ)より小さい事。 Errorの時はバッファオーバフロー | |
#後端の調整 | |
# raise IndexError("Overflow") | |
#現在のデータサイズ | |
readable_size=(self._head-self._rp+buflen)%buflen | |
# print("B",size,readable_size) | |
#追記部分を読み出し | |
head=self._head | |
# pos=self._pos | |
nread=0 | |
try: | |
# print("K",rhead,readable_size,rhead-readable_size) | |
for _ in range(rhead-readable_size): | |
buf[(head+nread)%buflen]=next(self._iter) | |
nread=nread+1 | |
# pos=pos+1 | |
# head=(head+1)%buflen | |
except StopIteration: | |
if nread+readable_size-skip<1: | |
raise #1ブロックもなければ再生成 | |
finally: | |
self._head=(head+nread)%buflen | |
# self._pos=self._pos+nread | |
#読み出し可能な部分を読み出す | |
s=(self._rp+skip)%buflen | |
w=min((self._head-self._rp-skip+buflen)%buflen,size) | |
e=(s+w)%buflen | |
# print("K",s,e) | |
# print(">",s,e,size,buflen) | |
if s<e: | |
return tuple(buf[s:e]) | |
else: | |
return tuple(buf[s:]+buf[:e]) | |
@property | |
def pos(self): | |
return self._pos | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment