Skip to content

Instantly share code, notes, and snippets.

@nyatla
Created July 26, 2022 10:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nyatla/dd46364b6e8cefddf96a5f1d7975f0ca to your computer and use it in GitHub Desktop.
Save nyatla/dd46364b6e8cefddf96a5f1d7975f0ca to your computer and use it in GitHub Desktop.
二度と作りたくない遅延読み出しできるリングバッファ
from typing import Deque, Iterator,TypeVar,Sequence,Iterable,Union
from ..interfaces import IPeekableStream
T=TypeVar("T")
class PeekableStream2(IPeekableStream[T]):
""" iterをラップするPeekableストリームを生成します。
このクラスは制限付きで前方、後方の参照ができます。
"""
def __init__(self,src:Union[Iterator[T],Iterable[T]],inital_pos:int=0,bufsize=1024*8,remsize=1024,padding:T=None):
""" Peekableな範囲は、pos-remzizeからpos+(bufsize-remsize)の範囲です。
"""
if isinstance(src,Iterable):
src=iter(src)
elif isinstance(src,Iterator):
pass
else:
raise Exception("Invalid type %s"%(type(src)))
self._pos=inital_pos #現在の読み出し位置
self._iter:Iterator[T]=src
self._buf=[padding]*(bufsize+1)
self._remsize=remsize
self._head=remsize #読み出し可能位置の先頭
self._rp=self._head #getの読み出し位置
def seek(self,n:int):
buf=self._buf
buflen=len(buf)
inbuffer_size=(self._head-self._rp+buflen)%buflen #バッファ内のシーク可能サイズ
if inbuffer_size>=n:
#バッファ内のシークだけで終わる場合
self._rp=(self._rp+n+buflen)%buflen
self._pos=self._pos+n
return
#バッファ内のシーク
self._rp=(self._rp+inbuffer_size+buflen)%buflen
self._pos=self._pos+inbuffer_size
#読出し
rem_size=self._remsize
#読み飛ばしサイズを計算
skip_size=n-inbuffer_size
q=Deque[T](maxlen=rem_size)
try:
#キューに記録を取りながらスキップ
for _ in range(skip_size):
q.append(next(self._iter))
self._pos=self._pos+1
finally:
#キューをバッファに展開
h=self._head
for i in range(len(q)):
buf[h]=q.popleft()
h=(h+1)%buflen
#読取位置をheadに移動
self._head=h
self._rp=h
def get(self)->T:
d=self.peeks(1)
self.seek(1)
return d[0]
def gets(self,size:int)->Sequence[T]:
d=self.peeks(size)
# print(len(d),d)
self.seek(len(d))
return d
def peek(self,skip:int=0)->int:
return self.peeks(1,skip)[0]
def peeks(self,size:int,skip=0):
""" 前方容量を超える場所を選択しようとする場合、超えた部分についてはseek、またはgetするまで取得できない。
"""
#読み出し開始点は常にtail+remsize
buf=self._buf
buflen=len(buf)
remsize=self._remsize #遅行サイズ
if skip<-remsize:
# assert(rtail>=-remsize) #スキップサイズは遅行サイズよりは前方であること。 Errorの時はバッファアンダーフロー
raise IndexError("Underflow")
rhead=skip+size #読み出しサイズ
if rhead>=buflen-remsize:
#先行サイズより大きい読出しの場合にはサイズを制限
w=(buflen-remsize)-1
size=size-(rhead-w)
if size<=0:
raise IndexError("Overflow")
# print(rhead,size)
rhead=w
# assert(rhead<buflen-remsize) #先行部分は(バッファサイズ-遅行サイズ)より小さい事。 Errorの時はバッファオーバフロー
#後端の調整
# raise IndexError("Overflow")
#現在のデータサイズ
readable_size=(self._head-self._rp+buflen)%buflen
# print("B",size,readable_size)
#追記部分を読み出し
head=self._head
# pos=self._pos
nread=0
try:
# print("K",rhead,readable_size,rhead-readable_size)
for _ in range(rhead-readable_size):
buf[(head+nread)%buflen]=next(self._iter)
nread=nread+1
# pos=pos+1
# head=(head+1)%buflen
except StopIteration:
if nread+readable_size-skip<1:
raise #1ブロックもなければ再生成
finally:
self._head=(head+nread)%buflen
# self._pos=self._pos+nread
#読み出し可能な部分を読み出す
s=(self._rp+skip)%buflen
w=min((self._head-self._rp-skip+buflen)%buflen,size)
e=(s+w)%buflen
# print("K",s,e)
# print(">",s,e,size,buflen)
if s<e:
return tuple(buf[s:e])
else:
return tuple(buf[s:]+buf[:e])
@property
def pos(self):
return self._pos
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment