An educational version of the timsort algorithm written in python.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Module : timsort.py
Website: https://gist.github.com/ruminations/89a045dc0ef7edfb92304a0de0752ee0
License: https://github.com/ruminations/Licenses#design-license
Initial Copyright September 2018
This is a python implementation of the ideas presented by Tim Peters at:
https://svn.python.org/projects/python/trunk/Objects/listsort.txt
See also:
https://bugs.python.org/issue23515
https://hg.python.org/cpython/rev/325aec842e3e
https://svn.python.org/projects/python/trunk/Lib/test/sortperf.py
It has been written from scratch, based on Tim Peters' description - I
have not examined any existing work-horse implementations for guidance.
It cannot hope to compete with the native C implementation of the python
builtin 'list.sort'. The purpose is to create an educational equivalent
that clarifies the mechanisms timsort uses. Other gists I have seen are
simply wrong; some are very wrong. Extensive testing of this gist has
established that it is both correct and complete. Sans commentary, 104 lines
of code implement the basic algorithm - about the same as the incorrect gists,
sans their commentary. Galloping uses an additional 71 lines of code,
which the incorrect gists omit.
A critical requirement, ignored by other gists, is stability. Ensuring
stability introduces many subtleties, complicating the algorithm. We
have chosen to code for clarity in preference to optimal space or time
constraints. Re-implementation in a compiled language using alternate but
less clear coding would doubtless improve performance. In python, such
enhancements are moot, given the intrinsic overhead of a dynamic language.
An important requirement, absent from other gists, is that pointer shuffling
be done in place, so far as possible. Sorting small arrays is trivial, and
any algorithm will do. For large datasets, both speed and limiting memory
usage are critical.
Therefore, instead of shuffling with python slice idioms, which copy and
combine pointer lists, all operations index the whole dataset, limiting
scope by index ranges instead of slices. The only copying is then to the
temporary array (implemented as a slice) that is intrinsic to timsort.
This implementation sub-classes the builtin 'list' data type which provides
succinct expression of many simple operations. Custom attributes track the
merge state and custom methods and properties fill out the functionality
required to implement timsort. The 'sort' method overrides the builtin
method, which, of course, is the work-horse implementation of timsort.
Algorithm summary
-----------------
Note: all comparisons implicitly apply a key function to each data item.
A 'run' is a sub-list of the input list that is either strictly descending
or ascending with possible equality:
descending : r0 > r1 > r2 > ...
ascending/equal : r0 <= r1 <= r2 <= ...
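For illustration, the boundary of the first run in a plain list 'data' can
be found with this sketch (the 'sort' method below does the same through
'self._cmp', which also applies the key function):
    rs,re = 0,1                        # assumes len(data) >= 2
    asc = not (data[re] < data[rs])    # 'True' => ascending run
    while re < len(data) and (asc ^ (data[re] < data[re-1])): re+=1
    # the run is 'data[rs:re]'; a descending run is then reversed in place
E.g. in [3,2,1,5,5,6] the first run is the strictly descending prefix
[3,2,1] and, after it is reversed, the next run found is [5,5,6].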
The minimum run length is selected by the property 'minrun' according to
Tim Peters' prescription.
Short runs are padded to 'minrun' with subsequent data using insertion sort.
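For example, for a 10000 element list the 'minrun' computation (see the
'minrun' property below) reduces to:
    r,l = 0,10000
    while l>=64: r,l = r|(l&1),l>>1    # exits with r == 1, l == 39
    minrun = r+l                       # == 40; 10000/40 == 250 < 256 == 2**8
so each run is at least 40 elements long and the number of runs stays just
under a power of 2, which balances the later merges.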
Stability is preserved by always merging two consecutive runs. The list
of runs is managed as a stack with the top of the stack at the end of the
list. Merging maintains two invariants 'True' for any three sequential
runs in the stack. Let D,C,B,A be sequential runs where A is the stack top
above B, B is above C, and C is above D. Then the two invariants are:
1) len(C) > len(B) + len(A)
2) len(B) > len(A)
*3) len(D) > len(C) + len(B)
| The *3 condition corrects a pathological flaw that formal proof |
| methods uncovered in 2015. The *3 condition is necessary to |
| truly guarantee the first two conditions are invariant for any |
| three sequential runs. We implement the *3 condition for the |
| sake of correctness. In the context of python's dynamic list, |
| the flaw is moot, since the language automatically extends the |
| stack list's memory allocation as needed. The flaw may cause |
| out of bounds memory accesses when a fixed stack is allocated |
| based on the assumption that two invariants hold. In practice, |
| that will never happen for any real machine available in 2018. |
| |
| It is important, however, not to trivialize the formal proof. |
| Given the many subtleties of timsort, formal proof raises its |
| status from that of a pragmatic ad-hoc algorithm to that of a |
| fundamental method. Timsort's demonstrable superiority applied |
| to diverse datasets warrants more widespread understanding and |
| universal adoption of the algorithm. To that end, perhaps this |
| gist will illuminate, ultimately inspiring adoption of timsort. |
When the top runs fail to satisfy the invariants, merging commences until
the invariants are satisfied. The invariants imply that the lengths of the
unmerged runs grow with stack depth at least as fast as the Fibonacci
sequence. The stack therefore has at most O(log_tau(N)) entries, where tau
is the golden ratio ~1.618..., and this keeps the stack small for very
large data sets.
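For example, even for N = 2**64 items the bound is log_tau(2**64), which is
64*log_tau(2) ~= 64*1.44 ~= 92, so the run stack never needs more than
roughly a hundred entries.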
When merging, the smaller of C and A is merged with B, replacing C,B or B,A
in the stack, respectively. If len(C)==len(A) then B and A are merged to
reduce the freshest runs first.
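For example, suppose the stack holds exactly three runs with len(C)==100,
len(B)==60 and len(A)==50. Invariant 1) fails (100 < 60+50), and since
len(C) > len(A), B and A are merged, leaving C below a new top run of
length 110. The length check now fails again for the remaining two runs
(100 < 110), so C is merged with the new run as well, collapsing the stack
to a single run of length 210.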
The actual merger copies the smaller run and overwrites the original list
in place during the merge. The optimizing refinement of binary search
that avoids extraneous merge copying is implemented. Adaptive 'galloping'
is also implemented. The latter automatically recognizes clustered data,
speeding the sort for very orderly datasets. My implementation of adaptive
galloping probably differs from Tim Peters' implementation: I decrease the
galloping threshold by log2(cnt)+1 ('cnt.bit_length()'), where 'cnt' is the
count of successful gallops. When galloping is useful, this stabilizes the
threshold more rapidly than decrementing it once per successful gallop. The
threshold is simply incremented when the galloping heuristic proves to be a
miscue.
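For example, a galloping episode in which 'pays' == 5 individual gallops
each covered at least the threshold number of elements ends with the
clamped decrement
    self._mg = max(1,self._mg-pays.bit_length())   # bit_length() == 3 here
while an episode in which no gallop pays off simply keeps the +1 added to
the threshold when the episode began.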
"""
__all__=['TimSort']
__version__=20181004 # add galloping
__version_history__=(20180924,20180930)
## NOTA BENE:
# 'print' statements useful for debugging are commented out with '##'. It
# can be instructive to uncomment them in order to watch the algorithm work.
# Galloping code is marked with '###', making it easy to experiment without it.
class TimSort(list):
"""Sub-class of builtin 'list' with additional methods and properties
facilitating Tim Peters' 'timsort' sorting algorithm."""
__slots__=('_s','_t','_k','_mg') # stack, temp, key func, min gallop
# Stack is an ascending list of indices where a run begins. A run ends
# where the next run begins. 'self._s[0]==0' and 'max(self._s)==len(self)'.
# Sequential accumulated runs are defined by a sliding pair of indices.
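# Illustration: 'self._s == [0,40,100,160]' for a 160 element list describes
# three runs awaiting merges: 'self[0:40]', 'self[40:100]' and 'self[100:160]'.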
def __init__(self,data):
"""Setup the algorithm using iterable 'data'."""
super(TimSort,self).__init__(data)
self._s,self._t,self._k,self._mg = [],[],(lambda v:v),7 # realize slots
def __str__(self):
"""Return a string representation: 'TimSort(< {} element list>)'. Use
'repr(...)' to see the actual list of data which may be very large."""
return "TimSort(< {} element list>)".format(len(self))
def _cmp(self,i,j,tmp=None): # all sort comparisons use this method
""" 'tmp' return value
------- --------------
'None' : 'key(self[i]) < key(self[j])'
'True' : 'key(self[i]) < key(self._t[j])'
'False' : 'key(self._t[i]) < key(self[j])'
'_cmp' return value is
-------- -----------------
(i,j...) : 'True' => ..i.. < ..j.. or ..j.. > ..i..
(i,j...) : 'False' => ..i.. >= ..j.. or ..j.. <= ..i.."""
a = self._t if (tmp is False) else self
b = self._t if (tmp is True ) else self
return self._k(a[i]) < self._k(b[j])
def insort(self,start=0,end=None,index=0):
"""Stable sort in place the sub-list delimited by the python range
'start' and 'end' using insertion sort. Returns 'None'.
'self[start:index]' is in ascending order (<= relation).
'self[index:end]' has no specific order.
'index <= start+1' => unsorted sub-list. 'index >= end' => NOP."""
index=start+max(index-start,1)
for i in range(index,end):
v=self[i] # save the item
j=self._h_place(i,start,i) # find a spot start <= j <= i
for k in range(i,j,-1): self[k]=self[k-1] # shuffle up
self[j]=v # insert the item
def reverse(self,start=0,end=None):
"""Reverse in place the sub-list delimited by the python range 'start'
and 'end'. Returns 'None'."""
i,j = start,(len(self)-1) if (end is None) else (end-1)
while i<j:
self[i],self[j] = self[j],self[i]
i+=1; j-=1
def _h_place(self,index,start,end,tmp=None):
"""Get placement index 'i' of item at 'index' in sorted sub-list
delimited by the 'range(start,end,1)':
ascending scan, shuffle down => return val == i
descending scan, shuffle up => return val-1 == i
Equality propagates high. Return values are in 'range(start,end+1)',
i.e. may '== end'.
'tmp == True' => 'index' indexes temporary,
'tmp == False' => 'start','end' index temporary,
'tmp == None' => all indices refer to data array."""
i,j = 0,end-start
while i<j: # binary search - equality propagates high
m = (i+j)>>1
i,j = (i,m) if (self._cmp(index,start+m,tmp) is True) else (m+1,j)
return start+i
def _l_place(self,index,start,end,tmp=None):
"""Get placement index 'i' of item at 'index' in sorted sub-list
delimited by the 'range(start,end,1)':
ascending scan, shuffle down => return val == i
descending scan, shuffle up => return val-1 == i
Equality propagates low. Return values are in 'range(start,end+1)',
i.e. may '== end'.
'tmp == True' => 'index' indexes temporary,
'tmp == False' => 'start','end' index temporary,
'tmp == None' => all indices refer to data array."""
i,j = 0,end-start
while i<j: # binary search - equality propagates low
m = (i+j)>>1
i,j = (i,m) if (self._cmp(start+m,index,tmp) is False) else (m+1,j)
return start+i
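# Note: '_h_place' and '_l_place' behave like 'bisect.bisect_right' and
# 'bisect.bisect_left' applied to the keyed sub-list; the 'tmp' switch merely
# selects whether the probed item or the searched sub-list lives in the
# temporary array 'self._t'.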
def sort(self,key=None,reverse=False):
"""Sort list in place in ascending order using Tim Peters' timsort
algorithm. Returns 'None'. 'key(item)' for 'item' a list element
returns the sort key to use, defaulting to 'item'. 'reverse==True'
reverses the list in place after sorting."""
self._k = key if key else (lambda v:v); l=len(self)
if l<2: return # nothing to do
mr=self.minrun; re=0; asc=None; self._s=[]
try: # collect ordered runs
while True: # break when compare raises 'IndexError' (#*)
rs,re = re,re+1 # loop update: new run
self._s.append(rs) # track new sub-array
while self.invariant is False: self.merge # ensure invariant
asc = not self._cmp(re,rs) #* 'True' => ascending run
while asc^self._cmp(re,re-1): re+=1 #* scan either run type
if not asc: self.reverse(rs,re) # convert to ascending
asc=None # done scanning
if (re-rs)<mr:
self.insort(rs,rs+mr,re); re=rs+mr #* extend to minimum length
except IndexError: # raised at new run, scan, or insort (#*)
if asc is False: self.reverse(rs,re) # raised by descending scan
if self._s[-1]!=l: self._s.append(l) # last terminal
while len(self._s)>2: self.merge
if reverse: self.reverse()
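# Usage sketch (values shown only for illustration):
#   t = TimSort([5,1,4,1,3])
#   t.sort()                    # t == [1,1,3,4,5]
#   t.sort(key=lambda v:-v)     # t == [5,4,3,1,1]
#   t.sort(reverse=True)        # t == [5,4,3,1,1] - sorted, then reversed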
@property
def invariant(self):
"""Calculate and get the run stack invariant. 'self.invariant==False'
triggers merge operations. Returns 'True' for one run in stack."""
l=len(self._s)
if l<3: return True # one run
c,b,a = self._s[-3:]; B,A = b-c,a-b
if l==3: return (B>A) # two runs
d=self._s[-4]; C=c-d
if l==4: return (B>A) and (C>B+A) # three runs
e=self._s[-5]; D=d-e
return (B>A) and (C>B+A) and (D>C+B) # many runs
@property
def minrun(self):
"""Calculate the minimum run length that optimizes merge sort balance.
'self.minrun in range(32,65)' holds whenever 'len(self) >= 64'; shorter lists
return 'len(self)'. 'len(self)/self.minrun' is then a power of 2, or close
to, but strictly less than, a power of 2."""
r,l = 0,len(self)
while l>=64: r,l = r|(l&1),l>>1 # flag any set lower bits
return r+l # either 0 or 1 plus high 6 bits of l
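# Note: when 'len(self) < 64' the loop never runs and 'minrun == len(self)',
# so the whole list becomes one run padded out by a single insertion sort and
# no merging occurs.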
@property
def merge(self):
"""Merge two of the three stack top runs. Returns 'None'."""
##print " entry",self._s
if len(self._s)==2: return # one run => done
a=self._s.pop(); b=self._s.pop(); c=self._s[-1]; B,A = b-c,a-b;
if len(self._s)==1: self._s.append(a) # two runs => last collapse
else: # collapse merge
self._s.pop(); d=self._s[-1]; C=c-d
if C<A:
self._s.extend([b,a]); B,A = C,B; c,b,a = d,c,b
else: self._s.extend([c,a])
g=0 ### g neg => from temp array; g pos => from in place
if B<=A: # copy merge low: B is temporary; scan up
c=self._h_place(b,c,b)
B=b-c # skip elements of B that are already in place
self._t=self[c:b]; i,j,k = 0,b,c
while i<B and j<a: # invariant: i+j-k == B
if self._cmp(j,i,True):
self[k]=self[j]; j+=1
g = 0 if (g<0) else g+1 ### adjust gallop trigger
else: # == goes here for stability (shuffling B up)
self[k]=self._t[i]; i+=1
g = 0 if (g>0) else g-1 ### adjust gallop trigger
k+=1
if abs(g)>=self._mg: i,j,k = self._g_asc(g,i,j,k,a); g=0 ### gallop
while i<B: # copy back values larger than all of A
self[k]=self._t[i]; i+=1; k+=1
else: # copy merge high: A is temporary; scan down
a=self._l_place(b-1,b,a)
A=a-b # skip elements of A that are already in place
self._t=self[b:a]; i,j,k = A-1,b-1,a-1
while -1<i and c-1<j: # invariant: i+j-k == -1
if self._cmp(i,j,False):
self[k]=self[j]; j-=1
g = 0 if (g<0) else g+1 ### adjust gallop trigger
else: # == goes here for stability (shuffling A down)
self[k]=self._t[i]; i-=1
g = 0 if (g>0) else g-1 ### adjust gallop trigger
k-=1
if abs(g)>=self._mg: i,j,k = self._g_dsc(g,i,j,k,c); g=0 ### gallop
while -1<i: # copy back values smaller than all of B
self[k]=self._t[i]; i-=1; k-=1
##print " exit",self._s
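# Illustration: merging B == self[c:b] == [1,5,9] with A == self[b:a] == [2,6,7]
# takes the copy merge low branch (B <= A). '_h_place' advances 'c' past the
# leading 1, which is already in place, so only [5,9] is copied to 'self._t'
# and merged with [2,6,7] scanning up, leaving [1,2,5,6,7,9] in place.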
#
### Remaining code implements galloping.
#
def _s_dsc_B(self,index,start,end):
"""Descending gallop scan of B: find all B > A[i] (temporary)."""
g,l,b,e = 1,start-end,start+1,end # mirror of 'range(..)' parameters
while l>=g and self._cmp(index,b-g,False): g<<=1
b,e = (e+1,b-(g>>1)) if (l<g) else (b-g+1,b-(g>>1))
return self._h_place(index,b,e,False)
def _s_asc_A(self,index,start,end):
"""Ascending gallop scan of A: find all A < B[i] (temporary)."""
g,l,b,e = 1,end-start,start-1,end
while l>=g and self._cmp(b+g,index,True): g<<=1
b,e = (b+(g>>1)+1,e) if (l<g) else (b+(g>>1)+1,b+g)
return self._l_place(index,b,e,True)
def _s_dsc_A(self,index,start,end):
"""Descending gallop scan of A: find all (temporary) A <= B[j]."""
g,l,b,e = 1,start-end,start+1,end # mirror of 'range(..)' parameters
while l>=g and not self._cmp(b-g,index,False): g<<=1
b,e = (e+1,b-(g>>1)) if (l<g) else (b-g+1,b-(g>>1))
return self._l_place(index,b,e,False)
def _s_asc_B(self,index,start,end):
"""Ascending gallop scan of B: find all (temporary) B >= A[j]."""
g,l,b,e = 1,end-start,start-1,end
while l>=g and not self._cmp(index,b+g,True): g<<=1
b,e = (b+(g>>1)+1,e) if (l<g) else (b+(g>>1)+1,b+g)
return self._h_place(index,b,e,True)
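# Note: the four scan helpers above implement an exponential ('galloping')
# search: the probe offset doubles (1,2,4,8,...) until the sought item is
# bracketed, then '_h_place' or '_l_place' finishes with a binary search
# inside that bracket.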
def _g_dsc(self,g,i,j,k,c):
"""Adaptive galloping merge high, descending scan. Array A is temporary."""
self._mg+=1 # assume galloping won't pay
b,pays,fail = (g>0),0,0 # invariant i+j-k == -1
while -1<i and c-1<j: # alternately search A and B based on boolean 'b'
if b: # search for A[i] in B
m=self._s_dsc_B(i,j,c-1) # descending, copy up => m-1 is placement index
if j-m+1 < self._mg: fail+=1
else: pays+=1; fail=0
while j>=m: # copy B gallop up
self[k]=self[j] ; k-=1; j-=1
self[k]=self._t[i]; k-=1; i-=1
else: # search for B[j] in A
m=self._s_dsc_A(j,i,-1) # descending, copy up => m-1 is placement index
if i-m+1 < self._mg: fail+=1
else: pays+=1; fail=0
while i>=m: # copy A gallop over
self[k]=self._t[i] ; k-=1; i-=1
self[k]=self[j]; k-=1; j-=1
if fail<2: b = not b
else: break # two sequential alternate gallops failed to pay off
self._mg=max(1,self._mg-pays.bit_length()) # clamped log2(abs(..))+1 decrement
return i,j,k
def _g_asc(self,g,i,j,k,a):
"""Adaptive galloping merge low, ascending scan. Array B is temporary."""
self._mg+=1 # assume galloping won't pay
b,pays,fail,B = (g>0),0,0,i+j-k # invariant i+j-k == length of temporary
while i<B and j<a: # alternately search A and B based on boolean 'b'
if b: # search for B[i] in A
m=self._s_asc_A(i,j,a) # ascending, copy down => m is placement index
if m-j < self._mg: fail+=1
else: pays+=1; fail=0
while j<m: # copy A gallop down
self[k]=self[j] ; k+=1; j+=1
self[k]=self._t[i]; k+=1; i+=1
else: # search for A[0] in B
m=self._s_asc_B(j,i,B) # ascending, copy down => m is placement index
if m-i < self._mg: fail+=1
else: pays+=1; fail=0
while i<m: # copy B gallop over
self[k]=self._t[i] ; k+=1; i+=1
self[k]=self[j]; k+=1; j+=1
if fail<2: b = not b
else: break # two sequential alternate gallops failed to pay off
self._mg=max(1,self._mg-pays.bit_length()) # clamped log2(abs(..))+1 decrement
return i,j,k
if __name__=="__main__":
from random import randrange
trials,maxlength = 10000,4097; total=0
try:
for j in range(trials):
l=randrange(0,maxlength); total+=l<<1
d=[randrange(-l>>1,l>>1) for i in range(l)]
c=TimSort(d)
s=sorted(d); c.sort() # builtin sort; this implementation
if s!=c: raise RuntimeError
print "Functional trial {} sorted {} random data items.".format(j,len(c))
d=[(randrange(-9,10),i) for i in range(l)]
c=TimSort(d)
s=sorted(d,key=lambda t:t[0]); c.sort(key=lambda t:t[0])
if s!=c: raise RuntimeError
print " Stability trial {} sorted {} random repeating items in [-9:10].".format(j,len(c))
##raise RuntimeError
print
print "Successfully sorted {} random data items in {} random length records.".format(total,trials)
except RuntimeError:
errors=[ (i,t) for i,t in enumerate(zip(c,s,d))if t[0]!=t[1] ]
print "***Sorting errors (index,(error,correct,original)):"
##for t in errors[:]: print " {}".format(t)
print "Trial {} attempted to sort {} items returning {} items.".format(j,len(s),len(c))
print "{} sorting errors occurred using minimum run {}.".format(len(errors),c.minrun)
##print d#,repr(c)