public
Last active

GeoJSON Django GeoQuerySet Serializer

  • Download Gist
geojson.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import datetime
import decimal
 
from django.core.serializers.python import Serializer as PythonSerializer
from django.core.serializers.json import DjangoJSONEncoder
from django.utils.encoding import is_protected_type, smart_unicode
from django.utils import simplejson as json
from django.utils import datetime_safe
from django.contrib.gis.geos.geometry import GEOSGeometry
from django.contrib.gis.db.models.fields import GeometryField
 
 
class DjangoGeoJSONEncoder(DjangoJSONEncoder):
    """JSON encoder that also knows how to emit GEOS geometries.

    Dates, times and decimals are converted here before falling back to the
    parent encoder; geometries are emitted as nested GeoJSON objects rather
    than as strings.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        * ``datetime.datetime`` -> string formatted with ``DATE_FORMAT`` and
          ``TIME_FORMAT`` separated by a space
        * ``datetime.date`` -> string formatted with ``DATE_FORMAT``
        * ``datetime.time`` -> string formatted with ``TIME_FORMAT``
        * ``decimal.Decimal`` -> ``float``
        * ``GEOSGeometry`` -> the object parsed from its ``geojson`` attribute

        Anything else is delegated to the parent class.
        """
        # datetime has to be tested before date: every datetime instance is
        # also a date instance.
        if isinstance(o, datetime.datetime):
            safe_value = datetime_safe.new_datetime(o)
            pattern = "%s %s" % (self.DATE_FORMAT, self.TIME_FORMAT)
            return safe_value.strftime(pattern)

        if isinstance(o, datetime.date):
            safe_value = datetime_safe.new_date(o)
            return safe_value.strftime(self.DATE_FORMAT)

        if isinstance(o, datetime.time):
            return o.strftime(self.TIME_FORMAT)

        if isinstance(o, decimal.Decimal):
            return float(o)

        if isinstance(o, GEOSGeometry):
            # Parse the geometry's GeoJSON text so it is embedded as a nested
            # object instead of a quoted string.
            return json.loads(o.geojson)

        return super(DjangoGeoJSONEncoder, self).default(o)
 
 
class Serializer(PythonSerializer):
    """Serialize a queryset into a GeoJSON ``FeatureCollection``.

    Each object becomes a ``Feature``: non-geometry field values go into its
    ``properties`` and any ``GEOSGeometry`` field value becomes its
    ``geometry``.

    Options understood by ``serialize()`` (everything else is forwarded to
    ``json.dump`` as keyword arguments):

    * ``stream`` -- object the JSON is written to (default: a ``StringIO``)
    * ``fields`` -- collection of field names to include (default: all)
    * ``use_natural_keys`` -- emit natural keys for related objects that
      define ``natural_key`` (default: ``False``)
    * ``crs`` -- pass ``False`` to emit ``"crs": null`` (default: ``True``)
    * ``srid`` -- SRID used in the CRS link (default: the geometry field's
      ``srid``)
    * ``fieldname`` -- name of the geometry field whose ``srid`` is used
      when ``srid`` is not given (default: the first geometry field)
    """

    # Options consumed by this serializer itself; they must not be forwarded
    # to json.dump(), which would reject them as unexpected keyword arguments.
    _own_options = (
        'stream',
        'fields',
        'use_natural_keys',
        'crs',
        'srid',
        'fieldname',
    )

    def start_serialization(self, queryset):
        """Create the empty feature collection, including its ``crs`` member."""
        self.feature_collection = {"type": "FeatureCollection", "features": []}
        self.feature_collection["crs"] = self.get_crs(queryset)
        self._current = None

    def get_crs(self, queryset):
        """Build the linked-CRS member for the collection.

        Returns ``None`` when the ``crs`` option is ``False``; otherwise a
        dict of type ``"link"`` pointing at spatialreference.org for the
        ``srid`` option or, failing that, the geometry field's ``srid``.
        """
        # Deliberately an equality test: only an explicit False (or 0)
        # disables the CRS, while crs=None still emits one as before.
        if self.crs == False:  # noqa: E712
            return None

        srid = self.options.get("srid", None)
        if srid is None:
            srid = self.get_geometry_field(queryset).srid

        return {
            "type": "link",
            "properties": {
                "href": "http://spatialreference.org/ref/epsg/%s/" % (str(srid)),
                "type": "proj4",
            },
        }

    def get_geometry_field(self, queryset):
        """Return the model's geometry field used to determine the SRID.

        If the ``fieldname`` option is set, the geometry field with that name
        is returned; otherwise the first geometry field on the model is used
        (only one geometry field is currently supported).

        Raises:
            ValueError: ``fieldname`` does not name a geometry field.
            IndexError: the model has no geometry field at all.
        """
        model = queryset.model
        fieldname = self.options.get("fieldname", None)
        geometry_fields = [
            f for f in model._meta.fields if isinstance(f, GeometryField)
        ]

        if fieldname:
            for field in geometry_fields:
                if field.name == fieldname:
                    return field
            # ValueError is what the previous list.index() lookup raised on a
            # miss, so callers catching it keep working; the message is the
            # one the code always intended to produce.
            raise ValueError(
                "%s is not a valid geometry field on %s"
                % (fieldname, model.__name__)
            )

        if not geometry_fields:
            # Same exception type the bare geometry_fields[0] used to raise,
            # now with an explanation attached.
            raise IndexError("%s has no geometry field" % model.__name__)

        # We only currently support one geometry field
        return geometry_fields[0]

    def start_object(self, obj):
        """Begin a new, empty feature for ``obj``."""
        self._current = {"type": "Feature", "properties": {}}

    def end_object(self, obj):
        """Append the finished feature to the collection."""
        self.feature_collection["features"].append(self._current)
        self._current = None

    def end_serialization(self):
        """Write the feature collection to the stream as JSON.

        Serializer-specific options are removed from ``self.options`` first;
        whatever remains is passed to ``json.dump`` as keyword arguments.
        """
        for key in self._own_options:
            self.options.pop(key, None)

        json.dump(
            self.feature_collection,
            self.stream,
            cls=DjangoGeoJSONEncoder,
            **self.options
        )

    def handle_field(self, obj, field):
        """Store a non-relational field value on the current feature.

        Protected types are stored unchanged, a ``GEOSGeometry`` value
        becomes the feature's ``geometry``, and everything else is stored as
        the field's string representation.
        """
        value = field._get_val_from_obj(obj)
        if is_protected_type(value):
            self._current['properties'][field.name] = value
        elif isinstance(value, GEOSGeometry):
            self._current['geometry'] = value
        else:
            self._current['properties'][field.name] = field.value_to_string(obj)

    def getvalue(self):
        """Return the stream's contents, or ``None`` if it has no ``getvalue``."""
        if callable(getattr(self.stream, 'getvalue', None)):
            return self.stream.getvalue()

    def handle_fk_field(self, obj, field):
        """Store a foreign-key reference on the current feature.

        The stored value is the related object's natural key (when enabled
        and available), its primary key, or the value of the related field
        the key points at; ``None`` when there is no related object.
        """
        related = getattr(obj, field.name)
        if related is not None:
            if self.use_natural_keys and hasattr(related, 'natural_key'):
                related = related.natural_key()
            elif field.rel.field_name == related._meta.pk.name:
                # Related to remote object via primary key
                related = related._get_pk_val()
            else:
                # Related to remote object via other field
                related = smart_unicode(
                    getattr(related, field.rel.field_name), strings_only=True
                )
        self._current['properties'][field.name] = related

    def handle_m2m_field(self, obj, field):
        """Store a list of related keys for a many-to-many field.

        Only relations whose ``through`` model is auto-created are handled;
        others are skipped.
        """
        if not field.rel.through._meta.auto_created:
            return

        if self.use_natural_keys and hasattr(field.rel.to, 'natural_key'):
            def m2m_value(value):
                return value.natural_key()
        else:
            def m2m_value(value):
                return smart_unicode(value._get_pk_val(), strings_only=True)

        self._current['properties'][field.name] = [
            m2m_value(related)
            for related in getattr(obj, field.name).iterator()
        ]

    def serialize(self, queryset, **options):
        """
        Serialize a queryset.

        Stores ``options`` on the instance, walks every object in
        ``queryset`` and every serializable field on it, writes the
        resulting GeoJSON to the stream and returns ``getvalue()``.
        """
        self.options = options

        self.stream = options.get("stream", StringIO())
        self.selected_fields = options.get("fields")
        self.use_natural_keys = options.get("use_natural_keys", False)

        self.crs = options.get("crs", True)

        self.start_serialization(queryset)

        meta = queryset.model._meta
        local_fields = meta.local_fields
        many_to_many_fields = meta.many_to_many

        for obj in queryset:
            self.start_object(obj)
            for field in local_fields:
                if not (field.serialize or field.primary_key):
                    continue
                if field.rel is None:
                    if (self.selected_fields is None
                            or field.attname in self.selected_fields):
                        self.handle_field(obj, field)
                else:
                    # attname[:-3] drops the last three characters of the
                    # attribute name (the "_id" suffix of a foreign key
                    # column) so it can be matched against selected fields.
                    if (self.selected_fields is None
                            or field.attname[:-3] in self.selected_fields):
                        self.handle_fk_field(obj, field)
            for field in many_to_many_fields:
                if field.serialize:
                    if (self.selected_fields is None
                            or field.attname in self.selected_fields):
                        self.handle_m2m_field(obj, field)
            self.end_object(obj)

        self.end_serialization()
        return self.getvalue()

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.