Skip to content

Instantly share code, notes, and snippets.

@mfm24
Created June 4, 2015 22:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mfm24/a9440c0fb4cc49634594 to your computer and use it in GitHub Desktop.
Save mfm24/a9440c0fb4cc49634594 to your computer and use it in GitHub Desktop.
Progressive png writer
# -*- coding: utf-8 -*-
# MFM 2015-06-04
# Trying to make this send data over wsgi with a
# generator to give a time-dependant graph
# goal is to acceot a generator like:
# def source():
# while True:
# time.sleep(1)
# yield random()
# and get updates live in a webpage.
# need to use a zlib compress object and see if it works...
# We do a single IDAT per line!
#
# Changes from png_bargraph.py
# Uses 8-bits per pixel. This produces files
# pi.png: 94B (90B for bit version)
# random.png: 2KB (2KB for bit version)
# sine.png: 6286B (3642B for bit version) 6298B progressive
# 30818B with progressive IDATS! (one per line)
# So bits are more efficient, but more like 2x than 8x more
#
# This works fine in Chrome! Also works with a single IDAT, which can
# be slightly more efficient, but does need all data at once (for IDAT
# length to be valid...)
from __future__ import division
import itertools
import struct
import zlib
def yield_block(header, data):
assert len(header)==4, 'header must be 4 bytes!'
# length:
yield struct.pack('! L', len(data))
# chunk type, 4 byte header
yield header
# data
yield data
# crc
yield struct.pack(
'! L',
zlib.crc32("".join([header, data])) & 0xffffffff)
def compress_gen(gen):
# yield compressed bytes from gen
# we need to do a flush per line or else we don't get many
# sections
c_obj = zlib.compressobj()
for d in gen:
yield c_obj.compress(d)
yield c_obj.flush(zlib.Z_SYNC_FLUSH)
yield c_obj.flush()
def int_to_pngline(data, width, filt=0, white=0xFF, black=0):
# converts ints to a png line with a filter byte, then enough
# pixels to fill the width, starting with n white and padding with
# black
for d in data:
d = min(d, width)
r = [filt] + ([white] * d)
r += [black] * (width - d)
yield str(bytearray(r))
def make_bar_png(data_generator, width, height, progressive=True):
def make_line(line):
r = [0xFF] * line
r += [0x00] * (width - line)
return r
bit_depth = 8
color_type = 0 # grayscale
compression_method, filter_method, interlace_method = 0, 0, 0
yield str(bytearray([0x89, 'P', 'N', 'G', '\r', '\n', 0x1A, '\n']))
# our header block
for b in yield_block('IHDR',
struct.pack('! LLBBBBB',
width, height, bit_depth,
color_type, compression_method,
filter_method, interlace_method)):
yield b
#unfiltered data (start with 0 as filterbyte)
if progressive:
# we compress all data with single zlib chunk
# but can send intermediate IDATs as we go
# we need a 0 filter byte per line
src = compress_gen(int_to_pngline(data_generator, width))
for d in src:
if d:
for b in yield_block('IDAT', d):
yield b
else:
# we do a single IDAT:
dat = [str(bytearray([0]+make_line(d))) for d in data_generator]
for b in yield_block('IDAT', zlib.compress("".join(dat))):
yield b
for b in yield_block('IEND', ''):
yield b
def test():
import math
with open('../pi.png', 'w') as f:
dat = [int(x) for x in str(math.pi) if x != '.']
for d in make_bar_png(dat, 10, len(dat)):
f.write(d)
import random
with open('../random.png', 'w') as f:
dat = [int(500**random.random()) for x in xrange(512)]
for d in make_bar_png(dat, 500, len(dat)):
f.write(d)
with open('../sine.png', 'w') as f:
dat = [int(512*(1+math.sin(x/32.0))) for x in xrange(1024)]
for d in make_bar_png(dat, 1024, len(dat)):
f.write(d)
def slow_gen(g, delay=0.01):
import time
for d in g:
time.sleep(delay)
yield d
def test_web():
# create a bottle app that yields our generator. Hopefully
# browsers will display as data arrives!
import bottle
@bottle.get('/slow_png.png')
def get_slow_png():
# lots of IDATs
bottle.response.content_type = "image/png"
dat = [int(512*(1+math.sin(x/32.0))) for x in xrange(1024)]
return make_bar_png(slow_gen(dat), 1024, len(dat))
@bottle.get('/slow_png2.png')
def get_slow_png2():
# single IDAT, still displays progressively
bottle.response.content_type = "image/png"
dat = [int(512*(1+math.sin(x/32.0))) for x in xrange(1024)]
data = list(make_bar_png(dat, 1024, len(dat), progressive=False))
return slow_gen("".join(data), delay=0.001)
@bottle.get('/fast_png.png')
def get_fast_png():
bottle.response.content_type = "image/png"
dat = [int(512*(1+math.sin(x/32.0))) for x in xrange(1024)]
return make_bar_png(dat, 1024, len(dat))
@bottle.get('/counter')
def get_counter():
for x in xrange(100):
yield "<br>%s" % x
time.sleep(1)
bottle.run(debug=True)
if __name__ == "__main__":
import math
import time
test_web()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment