Created July 8, 2022 10:53
# command line interface to sequential SAR change detection
# mort canty June, 2022
import sys
import getopt
import time
import math
from ast import literal_eval
import ee
# *****************************************
# The sequential change detection algorithm
# *****************************************
def chi2cdf(chi2, df):
"""Calculates Chi square cumulative distribution function for
df degrees of freedom using the built-in incomplete gamma
function gammainc().
return ee.Image(chi2.divide(2)).gammainc(ee.Number(df).divide(2))
def det(im):
"""Calculates determinant of 2x2 diagonal covariance matrix."""
return im.expression('b(0)*b(1)')
def log_det_sum(im_list, j):
"""Returns log of determinant of the sum of the first j images in im_list."""
sumj = ee.ImageCollection(im_list.slice(0, j)).reduce(ee.Reducer.sum())
return ee.Image(det(sumj)).log()
def log_det(im_list, j):
"""Returns log of the determinant of the jth image in im_list."""
im = ee.Image(ee.List(im_list).get(j.subtract(1)))
return ee.Image(det(im)).log()
def pval(im_list, j, m=4.4):
"""Calculates -2logRj for im_list and returns P value and -2mlogRj."""
im_list = ee.List(im_list)
j = ee.Number(j)
m2logRj = (log_det_sum(im_list, j.subtract(1))
.add(log_det(im_list, j))
# correction to simple Wilks approximation
# pv = ee.Image.constant(1).subtract(chi2cdf(m2logRj, 2))
one= ee.Number(1)
rhoj = one.subtract(one.add(one.divide(j.multiply(j.subtract(one)))).divide(6).divide(m))
omega2j = one.subtract(one.divide(rhoj)).pow(2.0).divide(-2)
rhojm2logRj = m2logRj.multiply(rhoj)
pv = ee.Image.constant(1).subtract(chi2cdf(rhojm2logRj,2) \
.add(chi2cdf(rhojm2logRj,6) \
.multiply(omega2j)) \
.subtract(chi2cdf(rhojm2logRj,2) \
return (pv, m2logRj)
def p_values(im_list,m=4.4):
"""Pre-calculates the P-value array for a list of images."""
im_list = ee.List(im_list)
k = im_list.length()
def ells_map(ell):
"""Arranges calculation of pval for combinations of k and j."""
ell = ee.Number(ell)
# Slice the series from k-l+1 to k (image indices start from 0).
im_list_ell = im_list.slice(k.subtract(ell), k)
def js_map(j):
"""Applies pval calculation for combinations of k and j."""
j = ee.Number(j)
pv1, m2logRj1 = pval(im_list_ell, j)
return ee.Feature(None, {'pv': pv1, 'm2logRj': m2logRj1})
# Map over j=2,3,...,l.
js = ee.List.sequence(2, ell)
pv_m2logRj = ee.FeatureCollection(
# Calculate m2logQl from collection of m2logRj images.
m2logQl = ee.ImageCollection(pv_m2logRj.aggregate_array('m2logRj')).sum()
# correction to simple Wilks approximation
# pvQl = ee.Image.constant(1).subtract(chi2cdf(m2logQl, ell.subtract(1).multiply(2)))
one = ee.Number(1)
f = ell.subtract(1).multiply(2)
rho = one.subtract(ell.divide(m).subtract(one.divide(ell.multiply(m))).divide(f).divide(3))
omega2 = f.multiply(one.subtract(one.divide(rho)).pow(2)).divide(-4)
rhom2logQl = m2logQl.multiply(rho)
pvQl = ee.Image.constant(1).subtract(chi2cdf(rhom2logQl,f) \
.add(chi2cdf(rhom2logQl,f.add(4)).multiply(omega2)) \
pvs = ee.List(pv_m2logRj.aggregate_array('pv')).add(pvQl)
return pvs
# Map over l = k to 2.
ells = ee.List.sequence(k, 2, -1)
pv_arr =
# Return the P value array ell = k,...,2, j = 2,...,l.
return pv_arr
def filter_j(current, prev):
"""Calculates change maps; iterates over j indices of pv_arr."""
pv = ee.Image(current)
prev = ee.Dictionary(prev)
pvQ = ee.Image(prev.get('pvQ'))
i = ee.Number(prev.get('i'))
cmap = ee.Image(prev.get('cmap'))
smap = ee.Image(prev.get('smap'))
fmap = ee.Image(prev.get('fmap'))
bmap = ee.Image(prev.get('bmap'))
alpha = ee.Image(prev.get('alpha'))
j = ee.Number(prev.get('j'))
cmapj = cmap.multiply(0).add(i.add(j).subtract(1))
# Check Rj? Ql? Row i?
tst =
# Then update cmap...
cmap = cmap.where(tst, cmapj)
# ...and fmap...
fmap = fmap.where(tst, fmap.add(1))
# ...and smap only if in first row.
smap = ee.Algorithms.If(i.eq(1), smap.where(tst, cmapj), smap)
# Create bmap band and add it to bmap image.
idx = i.add(j).subtract(2)
tmp =
bname = bmap.bandNames().get(idx)
tmp = tmp.where(tst, 1)
tmp = tmp.rename([bname])
bmap = bmap.addBands(tmp, [bname], True)
return ee.Dictionary({'i': i, 'j': j.add(1), 'alpha': alpha, 'pvQ': pvQ,
'cmap': cmap, 'smap': smap, 'fmap': fmap, 'bmap':bmap})
def filter_i(current, prev):
"""Arranges calculation of change maps; iterates over row-indices of pv_arr."""
current = ee.List(current)
pvs = current.slice(0, -1 )
pvQ = ee.Image(current.get(-1))
prev = ee.Dictionary(prev)
i = ee.Number(prev.get('i'))
alpha = ee.Image(prev.get('alpha'))
median = prev.get('median')
# Filter Ql p value if desired.
pvQ = ee.Algorithms.If(median, pvQ.focal_median(1.5), pvQ)
cmap = prev.get('cmap')
smap = prev.get('smap')
fmap = prev.get('fmap')
bmap = prev.get('bmap')
first = ee.Dictionary({'i': i, 'j': 1, 'alpha': alpha ,'pvQ': pvQ,
'cmap': cmap, 'smap': smap, 'fmap': fmap, 'bmap': bmap})
result = ee.Dictionary(ee.List(pvs).iterate(filter_j, first))
return ee.Dictionary({'i': i.add(1), 'alpha': alpha, 'median': median,
'cmap': result.get('cmap'), 'smap': result.get('smap'),
'fmap': result.get('fmap'), 'bmap': result.get('bmap')})
def dmap_iter(current, prev):
"""Reclassifies values in directional change maps."""
prev = ee.Dictionary(prev)
j = ee.Number(prev.get('j'))
image = ee.Image(current)
avimg = ee.Image(prev.get('avimg'))
diff = image.subtract(avimg)
# Get positive/negative definiteness.
posd = ee.Image(
negd = ee.Image(
bmap = ee.Image(prev.get('bmap'))
bmapj =
dmap = ee.Image.constant(ee.List.sequence(1, 3))
bmapj = bmapj.where(bmapj,
bmapj = bmapj.where(bmapj.And(posd),
bmapj = bmapj.where(bmapj.And(negd),
bmap = bmap.addBands(bmapj, overwrite=True)
# Update avimg with provisional means.
i = ee.Image(prev.get('i')).add(1)
avimg = avimg.add(image.subtract(avimg).divide(i))
# Reset avimg to current image and set i=1 if change occurred.
avimg = avimg.where(bmapj, image)
i = i.where(bmapj, 1)
return ee.Dictionary({'avimg': avimg, 'bmap': bmap, 'j': j.add(1), 'i': i})
def change_maps(im_list, median=False, alpha=0.01):
"""Calculates thematic change maps."""
k = im_list.length()
# Pre-calculate the P value array.
pv_arr = ee.List(p_values(im_list))
# Filter P values for change maps.
cmap = ee.Image(im_list.get(0)).select(0).multiply(0)
bmap = ee.Image.constant(ee.List.repeat(0,k.subtract(1))).add(cmap)
alpha = ee.Image.constant(alpha)
first = ee.Dictionary({'i': 1, 'alpha': alpha, 'median': median,
'cmap': cmap, 'smap': cmap, 'fmap': cmap, 'bmap': bmap})
result = ee.Dictionary(pv_arr.iterate(filter_i, first))
# Post-process bmap for change direction.
bmap = ee.Image(result.get('bmap'))
avimg = ee.Image(im_list.get(0))
j = ee.Number(0)
i = ee.Image.constant(1)
first = ee.Dictionary({'avimg': avimg, 'bmap': bmap, 'j': j, 'i': i})
dmap = ee.Dictionary(im_list.slice(1).iterate(dmap_iter, first)).get('bmap')
return ee.Dictionary(result.set('bmap', dmap))
# --------------------------
# Auxiliary functions
# -------------------------
def multipoly2polylist(multipoly):
''' Convert a multipolygon to a list of polygons'''
def fetchpoly(listelem):
return ee.Geometry.Polygon(multipoly.coordinates().get(listelem))
size = multipoly.coordinates().size()
polylist = ee.List.sequence(0, size.add(-1), 1)
def getS1collection(t1, t2, poly, platform = 'A'):
''' Return the longest sequence of S1 images within interval [t1,t2] which completely
overlap poly and have a unique relative orbit number and node '''
try :
collection = ee.ImageCollection('COPERNICUS/S1_GRD') \
.filterBounds(poly) \
.filterDate(ee.Date(t1), ee.Date(t2)) \
.filter(ee.Filter.eq('transmitterReceiverPolarisation', ['VV','VH'])) \
.filter(ee.Filter.eq('resolution_meters', 10)) \
.filter(ee.Filter.eq('instrumentMode', 'IW')) \
if platform != 'BOTH':
collection = collection.filter(ee.Filter.eq('platform_number', platform))
count = collection.size().getInfo()
if count < 2:
raise ValueError('Less than 2 images found')
collectionD = collection.filter(ee.Filter.eq('orbitProperties_pass', 'DESCENDING'))
countD = collectionD.size().getInfo()
collectionA = collection.filter(ee.Filter.eq('orbitProperties_pass', 'ASCENDING'))
countA = collectionA.size().getInfo()
if countA > countD:
collection = collectionA
node = 'ASCENDING'
collection = collectionD
relativeorbitnumbers = map(int,ee.List(collection.aggregate_array('relativeOrbitNumber_start')).getInfo())
rons = list(set(relativeorbitnumbers))
ron = rons[0]
if len(rons) == 2:
collection0 = collection.filter(ee.Filter.eq('relativeOrbitNumber_start', int(rons[0])))
count0 = collection0.size().getInfo()
collection1 = collection.filter(ee.Filter.eq('relativeOrbitNumber_start', int(rons[1])))
count1 = collection1.size().getInfo()
if count1 > count0:
ron = rons[1]
count = count1
collection = collection1
count = count0
collection = collection0
return (collection.sort('system:time_start'), count, node, ron)
except Exception as e:
print('Error: %s'%e)
def get_vvvh(image):
''' get 'VV' and 'VH' bands from sentinel-1 imageCollection and restore linear signal from db-values '''
def convert_timestamp_list(tsl):
''' Make timestamps in YYYYMMDD format '''
tsl= [x.replace('/','') for x in tsl]
tsl = ['T20'+x[4:]+x[0:4] for x in tsl]
return tsl
def clipList(current,prev):
''' clip a list of images and multiply by ENL'''
imlist = ee.List(ee.Dictionary(prev).get('imlist'))
poly = ee.Dictionary(prev).get('poly')
ctr = ee.Number(ee.Dictionary(prev).get('ctr'))
imlist = imlist.add(ee.Image(current).multiply(4.4).clip(poly))
return ee.Dictionary({'imlist':imlist,'poly':poly,'ctr':ctr.add(1)})
def assemble_and_run(t1, t2, poly, platform, alpha):
''' Collect a time sequence and return the change maps '''
# Gather the time series
collection, count, node, ron = getS1collection(t1, t2, poly, platform)
print('Images found: %i \nNode: %s \nRelative orbit: %i \nPlatform: %s'%(count, node, ron, platform))
acquisition_times = ee.List(collection.aggregate_array('system:time_start'))
pList =
first = ee.Dictionary({'imlist':ee.List([]),'poly':poly,'ctr':ee.Number(0)})
imList = ee.List(ee.Dictionary(pList.iterate(clipList,first)).get('imlist'))
#Run the algorithm
return (change_maps(imList, median = True, alpha = alpha), acquisition_times)
except Exception as e:
print('Error: %s'%e)
def export_as_image(result, assetpath, acquisition_times, poly):
''' Export change maps as a single multi-band image '''
smap = ee.Image(result.get('smap')).byte()
cmap = ee.Image(result.get('cmap')).byte()
fmap = ee.Image(result.get('fmap')).byte()
bmap = ee.Image(result.get('bmap')).byte()
timestamplist = []
for timestamp in acquisition_times.getInfo():
tmp = time.gmtime(int(timestamp)/1000)
timestamplist.append(time.strftime('%x', tmp))
timestamplist = convert_timestamp_list(timestamplist)
# in case of duplicates
timestamplist1 = [timestamplist[i] + '_' + str(i+1) for i in range(len(timestamplist))]
cmaps =,smap,fmap,bmap).rename(['cmap','smap','fmap']+timestamplist1[1:])
assexport = ee.batch.Export.image.toAsset(cmaps.byte().clip(poly),
pyramidingPolicy={".default": 'sample'},
print('Exporting change maps to %s\ntask id: %s'%(assetpath,str(
def export_as_image_collection(result, assetpath, acquisition_times, poly):
''' Export the bitemporal change maps as images with properties 'system:time_start'
and 'system:time_end' to an existing ImageCollection '''
def export_tagged_images(current, prev):
current = ee.Number(current)
current_str = ee.String(current)
prev = ee.Dictionary(prev)
# assetpath = prev.get('assetpath')
bmap = ee.Image(prev.get('bmap'))
acquisition_times = ee.List(prev.get('acquisition_times'))
image =
image = image.set('system:time_start', acquisition_times.get(current)) \
.set('system:time_end', acquisition_times.get(current.add(1))) \
.set('system:image_id', ee.String('bmap').cat(current_str))
assetId = ee.String(assetpath).cat(ee.String('bmap').cat(current_str))
assexport = ee.batch.Export.image.toAsset(image.clip(poly),
pyramidingPolicy={".default": 'sample'},
# assexport.start()
return ee.Dictionary({'bmap': bmap, 'assetpath': assetpath, 'acquisition_times': acquisition_times})
bmap = ee.Image(result.get('bmap')).byte()
size = bmap.bandNames().size()
bands = ee.List.sequence(0, size.add(-1))
first = ee.Dictionary({ 'bmap': bmap, 'assetpath': assetpath, 'acquisition_times': acquisition_times })
result = ee.Dictionary(bands.iterate(export_tagged_images, first))
except Exception as e:
print('Error: %s'%e)
# -------------------
# main routine
# -------------------
def main():
usage = '''Usage:
Perform sequential SAR change detection
python %s [OPTIONS] assetname
-h this help
-c <list> polygon coordinates of area of interest (default: Houston AOI)
-t <list> time interval (default: ['2018-01-01','2019-01-01'])
-a <float> false positive rate (default: 0.01)
-p <int> platform ('A', 'B' or 'BOTH' default: 'A')
-d <boolean> If set: export as ee.ImageCollection, else: export as ee.Image
If -d is not set:
For a sequence of k observations, change maps are exported to GEE assets as k+3-band image:
band 1: cmap interval of last recorded change (0 ... k-1) byte
band 2: smap interval of first recorded change (0 ... k-1) byte
band 3: fmap number of changes in all (0, ... k-1) byte
bands 4 through k+3: bitemporal change maps (labeled as end of interval time)
consisting of loewner order of changes in each interval (0, 1, 2, 3) byte
Export the bitemporal change maps as images with properties 'system:time_start' and 'system:time_end'
to an existing ImageCollection
options, args = getopt.getopt(sys.argv[1:], 'hc:t:a:p:d')
# Houston AOI
# coords = [[[-96.06878489327627, 29.823701939611126],
# [-96.06878489327627, 29.1423007492751],
# [-95.00860422921377, 29.1423007492751],
# [-95.00860422921377, 29.823701939611126]]]
# Juelich AOI
coords = [[[6.348381042480468, 50.88527552329112],
[6.479530334472656, 50.88527552329112],
[6.479530334472656, 50.94696387166774],
[6.348381042480468, 50.94696387166774],
[6.348381042480468, 50.88527552329112]]]
t1, t2 = ['2018-01-01','2019-01-01']
alpha = 0.01
platform = 'A'
as_collection = False
for option, value in options:
if option == '-h':
elif option == '-c':
coords = literal_eval(value)
elif option == '-t':
t1, t2 = literal_eval(value)
elif option == '-a':
alpha = literal_eval(value)
elif option == '-p':
platform = value
elif option == '-d':
as_collection = True
assetpath = 'projects/ee-mortcanty/assets/contract/' + args[0]
poly = ee.Geometry.Polygon(coords)
print('Sequential SAR Change Detection')
result, acquisition_times = assemble_and_run(t1, t2, poly, platform, alpha)
if as_collection:
export_as_image_collection(result, assetpath, acquisition_times, poly)
export_as_image(result, assetpath, acquisition_times, poly)
if __name__ == '__main__':
