@sixy6e
Last active August 29, 2015 14:23
baresoil_percentiles

A bare soil percentiles mosaicking workflow, intended as a computational baseline. It queries the AGDC per cell, computes the bare soil percentile mosaic (plus the matching NBAR, FC, satellite and date mosaics) as luigi tasks, and submits the work to PBS as a pbsdsh job.

#!/usr/bin/env python
from datetime import date
from os.path import join as pjoin
import gdal
import luigi
import numpy
import numexpr
from datacube.api.model import DatasetType, Fc25Bands, Ls57Arg25Bands
from datacube.api.model import Pq25Bands
from datacube.api.model import Satellite
from datacube.api.query import list_tiles_as_list
from datacube.config import Config
from datacube.api.utils import get_dataset_metadata, get_mask_pqa
from datacube.api.utils import NDV, calculate_ndvi#, date_to_integer
from datacube.api.utils import get_dataset_data, get_mask_wofs
from eotools.tiling import generate_tiles
from eotools.tiling import TiledOutput
from eotools.geobox import GriddedGeoBox
from eotools.pq_utils import extract_pq_flags
from eotools.pq_utils import pq_apply_dict
def bs_workflow(tiles, percentile=90, xtile=None, ytile=None,
                out_fnames=None):
    """
    A baseline workflow for producing the bare soil percentile mosaic,
    along with the corresponding NBAR, FC, satellite and date mosaics.
    """
    # Get some basic image info
    ds_type = DatasetType.FC25
    ds = tiles[0]
    dataset = ds.datasets[ds_type]
    md = get_dataset_metadata(dataset)
    samples, lines = md.shape
    time_slices = len(tiles)
    geobox = GriddedGeoBox.from_gdal_dataset(gdal.Open(dataset.path))

    # Initialise the tiling scheme for processing
    if xtile is None:
        xtile = samples
    if ytile is None:
        ytile = lines
    chunks = generate_tiles(samples, lines, xtile=xtile, ytile=ytile,
                            generator=False)

    # Define no-data
    no_data_value = NDV
    nan = numpy.float32(numpy.nan)  # for the FC dtype no need for float64

    # Define the output files
    if out_fnames is None:
        nbar_outfname = 'nbar_best_pixel'
        fc_outfname = 'fc_best_pixel'
        sat_outfname = 'sat_best_pixel'
        date_outfname = 'date_best_pixel'
    else:
        nbar_outfname = out_fnames[0]
        fc_outfname = out_fnames[1]
        sat_outfname = out_fnames[2]
        date_outfname = out_fnames[3]

    nbar_outnb = len(Ls57Arg25Bands)
    fc_outnb = len(Fc25Bands)
    out_dtype = gdal.GDT_Int16
    nbar_outds = TiledOutput(nbar_outfname, samples=samples, lines=lines,
                             bands=nbar_outnb, dtype=out_dtype,
                             nodata=no_data_value, geobox=geobox)
    fc_outds = TiledOutput(fc_outfname, samples=samples, lines=lines,
                           bands=fc_outnb, dtype=out_dtype,
                           nodata=no_data_value, geobox=geobox)
    sat_outds = TiledOutput(sat_outfname, samples=samples, lines=lines,
                            dtype=out_dtype, nodata=no_data_value,
                            geobox=geobox)
    date_outds = TiledOutput(date_outfname, samples=samples, lines=lines,
                             dtype=gdal.GDT_Float64, nodata=no_data_value,
                             geobox=geobox)

    satellite_code = {Satellite.LS5: 5, Satellite.LS7: 7, Satellite.LS8: 8}
    fc_bands_subset = [Fc25Bands.PHOTOSYNTHETIC_VEGETATION,
                       Fc25Bands.NON_PHOTOSYNTHETIC_VEGETATION,
                       Fc25Bands.UNMIXING_ERROR]
    # Loop over each spatial tile/chunk and build up the time series
    for chunk in chunks:
        ys, ye = chunk[0]
        xs, xe = chunk[1]
        ysize = ye - ys
        xsize = xe - xs
        dims = (time_slices, ysize, xsize)

        # Initialise the intermediate and best_pixel output arrays
        data = {}
        best_pixel_nbar = {}
        best_pixel_fc = {}
        stack_bare_soil = numpy.zeros(dims, dtype='float32')
        stack_sat = numpy.zeros(dims, dtype='int16')
        stack_date = numpy.zeros(dims, dtype='int32')
        best_pixel_satellite = numpy.zeros((ysize, xsize), dtype='int16')
        best_pixel_date = numpy.zeros((ysize, xsize), dtype='float64')
        best_pixel_satellite.fill(no_data_value)
        best_pixel_date.fill(no_data_value)

        stack_nbar = {}
        for band in Ls57Arg25Bands:
            stack_nbar[band] = numpy.zeros(dims, dtype='int16')
            best_pixel_nbar[band] = numpy.zeros((ysize, xsize),
                                                dtype='int16')
            best_pixel_nbar[band].fill(no_data_value)

        stack_fc = {}
        for band in fc_bands_subset:
            stack_fc[band] = numpy.zeros(dims, dtype='int16')
            best_pixel_fc[band] = numpy.zeros((ysize, xsize),
                                              dtype='int16')
            best_pixel_fc[band].fill(no_data_value)

        for idx, ds in enumerate(tiles):
            pqa = ds.datasets[DatasetType.PQ25]
            nbar = ds.datasets[DatasetType.ARG25]
            fc = ds.datasets[DatasetType.FC25]
            try:
                wofs = ds.datasets[DatasetType.WATER]
            except KeyError:
                print "Missing water for:\n {}".format(ds.end_datetime)
                wofs = None

            # mask = numpy.zeros((ysize, xsize), dtype='bool')
            # TODO update to use the api's version of extract_pq
            # pq_data = get_dataset_data(pqa, x=xs, y=ys, x_size=xsize,
            #                            y_size=ysize)[Pq25Bands.PQ]
            # mask = extract_pq_flags(pq_data, combine=True)
            # mask = ~mask
            mask = get_mask_pqa(pqa, x=xs, y=ys, x_size=xsize, y_size=ysize)

            # WOfS
            if wofs is not None:
                mask = get_mask_wofs(wofs, x=xs, y=ys, x_size=xsize,
                                     y_size=ysize, mask=mask)

            # NBAR
            data[DatasetType.ARG25] = get_dataset_data(nbar, x=xs, y=ys,
                                                       x_size=xsize,
                                                       y_size=ysize)

            # NDVI
            red = data[DatasetType.ARG25][Ls57Arg25Bands.RED]
            nir = data[DatasetType.ARG25][Ls57Arg25Bands.NEAR_INFRARED]
            ndvi = calculate_ndvi(red, nir)
            ndvi[mask] = no_data_value
            mask |= numexpr.evaluate("(ndvi < 0.0) | (ndvi > 0.3)")

            # FC
            data[DatasetType.FC25] = get_dataset_data(fc, x=xs, y=ys,
                                                      x_size=xsize,
                                                      y_size=ysize)
            bare_soil = data[DatasetType.FC25][Fc25Bands.BARE_SOIL]
            mask |= numexpr.evaluate("(bare_soil < 0) | (bare_soil > 8000)")

            # Apply the mask to each dataset and insert into the 3D array
            for band in Ls57Arg25Bands:
                data[DatasetType.ARG25][band][mask] = no_data_value
                stack_nbar[band][idx] = data[DatasetType.ARG25][band]
            for band in fc_bands_subset:
                data[DatasetType.FC25][band][mask] = no_data_value
                stack_fc[band][idx] = data[DatasetType.FC25][band]

            # Add bare soil, satellite and date to the 3D arrays
            stack_bare_soil[idx] = bare_soil
            stack_bare_soil[idx][mask] = nan
            stack_sat[idx][:] = satellite_code[fc.satellite]
            dtime = int(ds.end_datetime.strftime('%Y%m%d'))
            stack_date[idx][:] = dtime
        # Calculate the percentile
        pct_fc = numpy.nanpercentile(stack_bare_soil, percentile,
                                     axis=0, interpolation='nearest')

        # Loop over each time slice and generate a mosaic for each dataset_type
        for idx in range(time_slices):
            pct_idx = pct_fc == stack_bare_soil[idx]
            for band in Ls57Arg25Bands:
                band_data = stack_nbar[band]
                best_pixel_nbar[band][pct_idx] = band_data[idx][pct_idx]
            for band in fc_bands_subset:
                band_data = stack_fc[band]
                best_pixel_fc[band][pct_idx] = band_data[idx][pct_idx]
            best_pixel_satellite[pct_idx] = stack_sat[idx][pct_idx]
            best_pixel_date[pct_idx] = stack_date[idx][pct_idx]

        # Output the current spatial chunk for each dataset
        for band in Ls57Arg25Bands:
            bn = band.value
            band_data = best_pixel_nbar[band]
            nbar_outds.write_tile(band_data, chunk, raster_band=bn)
        for band in fc_bands_subset:
            bn = band.value
            band_data = best_pixel_fc[band]
            fc_outds.write_tile(band_data, chunk, raster_band=bn)
        fc_outds.write_tile(pct_fc, chunk,
                            raster_band=Fc25Bands.BARE_SOIL.value)
        sat_outds.write_tile(best_pixel_satellite, chunk)
        date_outds.write_tile(best_pixel_date, chunk)

    # Close the output files
    nbar_outds.close()
    fc_outds.close()
    sat_outds.close()
    date_outds.close()

if __name__ == '__main__':
    config = Config()
    # satellites = [Satellite(i) for i in ['LS5', 'LS7', 'LS8']]
    satellites = [Satellite(i) for i in ['LS5', 'LS7']]
    min_date = date(1987, 1, 1)
    max_date = date(2015, 12, 31)
    ds_type = DatasetType.ARG25
    x_cell = [146]
    y_cell = [-34]
    tiles = list_tiles_as_list(x=x_cell, y=y_cell, acq_min=min_date,
                               acq_max=max_date,
                               satellites=satellites,
                               datasets=ds_type,
                               database=config.get_db_database(),
                               user=config.get_db_username(),
                               password=config.get_db_password(),
                               host=config.get_db_host(),
                               port=config.get_db_port())

    # bs_workflow(tiles, outdir='/g/data/v10/testing_ground/jps547/percentiles')
    # bs_workflow(tiles, outdir='/g/data/v10/testing_ground/jps547/percentiles/pct_95', percentile=95)
    # bs_workflow takes explicit output filenames rather than a directory
    out_dir = '/g/data/v10/testing_ground/jps547/percentiles/test'
    out_fnames = [pjoin(out_dir, name) for name in
                  ('nbar_best_pixel', 'fc_best_pixel',
                   'sat_best_pixel', 'date_best_pixel')]
    bs_workflow(tiles, percentile=95, out_fnames=out_fnames)
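
The heart of bs_workflow is the best-pixel selection: the bare soil fraction is stacked over time, numpy.nanpercentile with interpolation='nearest' picks an actually-observed value per pixel, and the time slice whose bare soil value matches it donates its NBAR, FC, satellite and date values to the mosaics. A minimal standalone sketch of that selection rule (toy values only, not part of the gist):

import numpy

# Toy (time, y, x) bare soil stack; NaN marks masked observations.
stack = numpy.array([[[10., numpy.nan]],
                     [[40., 70.]],
                     [[90., 20.]]])          # shape (3, 1, 2)

# 'nearest' interpolation guarantees the percentile is one of the observed
# values, so an exact equality test recovers the contributing time slice.
pct = numpy.nanpercentile(stack, 90, axis=0, interpolation='nearest')
# pct -> [[ 90.  70.]]

dates = [20080101, 20080202, 20080303]       # one acquisition date per slice
best_date = numpy.zeros(stack.shape[1:], dtype='int32')
for idx, d in enumerate(dates):
    match = pct == stack[idx]                # NaN never compares equal
    best_date[match] = d
# best_date -> [[20080303 20080202]]
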
[pickled cell-query list: one entry, '/g/data/v10/testing_ground/jps547/percentiles/test2/146_-34/datasets_list.pkl']
[work]
output_directory = /g/data/v10/testing_ground/jps547/percentiles/test2
logs_directory = %(output_directory)s/logs
[db]
satellites = LS5,LS7
min_date = 2008_01_01
max_date = 2008_03_31
cell_list = 1
xcells = 121,142,118,146,135
ycells = -29,-22,-23,-34,-18
cell_xmin = 146
cell_ymin = -34
cell_xmax = 147
cell_ymax = -33
[internals]
percentiles = 90,95
cells_per_node = 10
x_tile_size = -1
y_tile_size = 100
[outputs]
pbs_filename = bare_soil_pbsdsh.bash
query_filename = datasets_list.pkl
queries_list = cell_queries.pkl
cell_groups = cell_groups.pkl
groups = NBAR,FC,Satellite,date
basefname_format = {group}_best_pixel_{pct}_percentile.dat
[pbs]
project = v10
queue = express
walltime = 01:00:00
email = joshua.sixsmith@ga.gov.au
modules = python/2.7.6,numpy/1.9.2,gdal,rasterio,eo-tools/0.4rc,luigi-mpi,agdc-api/0.1.0-b20150622-DEWNR
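
A note on the [internals] tile sizes: x_tile_size = -1 is treated as "use the full image width" (PercentileTask converts non-positive sizes to None before calling bs_workflow), while y_tile_size = 100 processes each cell in strips of 100 rows. The helper below is an illustrative stand-in for eotools' generate_tiles, not its actual implementation, assuming the nominal 4000 x 4000 pixel AGDC cell:

def row_chunks(samples, lines, ytile):
    """Illustrative stand-in for generate_tiles when xtile == samples."""
    for ys in range(0, lines, ytile):
        ye = min(ys + ytile, lines)
        yield ((ys, ye), (0, samples))

# For a nominal 4000 x 4000 pixel cell with y_tile_size = 100 this yields
# 40 strips: ((0, 100), (0, 4000)), ((100, 200), (0, 4000)), ...
chunks = list(row_chunks(4000, 4000, 100))
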
#!/usr/bin/env python
import luigi
import os
from os.path import join as pjoin, exists, dirname
import cPickle as pickle
import glob
import argparse
import logging
from datacube.api.model import DatasetType
from baresoil_percentile_baseline import bs_workflow
CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))

class PercentileTask(luigi.Task):
    """
    Computes the percentile task for a given cell.
    """
    query_fname = luigi.Parameter()
    pct = luigi.IntParameter()
    output_dir = luigi.Parameter()

    def requires(self):
        return []

    def output(self):
        basename = pjoin(self.output_dir, 'percentile_{}.completed')
        out_fname = basename.format(self.pct)
        return luigi.LocalTarget(out_fname)

    def run(self):
        data_groups = CONFIG.get('outputs', 'groups').split(',')
        base_fname_format = pjoin(self.output_dir,
                                  CONFIG.get('outputs', 'basefname_format'))
        out_fnames = []
        for group in data_groups:
            out_fnames.append(base_fname_format.format(group=group,
                                                       pct=self.pct))

        # Load the db query
        with open(self.query_fname, 'r') as f:
            db_tiles = pickle.load(f)

        # Get the processing tile sizes
        xtile = int(CONFIG.get('internals', 'x_tile_size'))
        ytile = int(CONFIG.get('internals', 'y_tile_size'))
        xtile = None if xtile <= 0 else xtile
        ytile = None if ytile <= 0 else ytile

        # Execute
        bs_workflow(db_tiles, percentile=self.pct, xtile=xtile,
                    ytile=ytile, out_fnames=out_fnames)

        with self.output().open('w') as outf:
            outf.write('Complete')

class DefinePercentileTasks(luigi.Task):
    """
    Issues PercentileTask's to each cell associated
    with the tile defined by the start and end index.
    """
    idx1 = luigi.IntParameter()
    idx2 = luigi.IntParameter()

    def requires(self):
        base_out_dir = CONFIG.get('work', 'output_directory')
        queries_fname = pjoin(base_out_dir,
                              CONFIG.get('outputs', 'queries_list'))
        with open(queries_fname, 'r') as infile:
            cell_queries = pickle.load(infile)

        percentiles = CONFIG.get('internals', 'percentiles').split(',')
        percentiles = [int(pct) for pct in percentiles]

        tasks = []
        for cell_query in cell_queries[self.idx1:self.idx2]:
            print cell_query
            out_dir = dirname(cell_query)
            for pct in percentiles:
                tasks.append(PercentileTask(cell_query, pct, out_dir))

        return tasks

    def output(self):
        out_dir = CONFIG.get('work', 'output_directory')
        out_fname = pjoin(out_dir,
                          'DefinePercentileTasks{}:{}.completed')
        out_fname = out_fname.format(self.idx1, self.idx2)
        return luigi.LocalTarget(out_fname)

    def run(self):
        with self.output().open('w') as outf:
            outf.write('Completed')

if __name__ == '__main__':
    # Setup command-line arguments
    desc = "Processes zonal stats for a given set of cells."
    hlp = ("The tile/chunk index to retrieve from the tiles list. "
           "(Needs to have been previously pickled to the file named by "
           "the [outputs] cell_groups option.)")
    parser = argparse.ArgumentParser(description=desc)
    parser.add_argument('--tile', type=int, help=hlp)
    parsed_args = parser.parse_args()
    tile_idx = parsed_args.tile

    # Setup logging
    log_dir = CONFIG.get('work', 'logs_directory')
    if not exists(log_dir):
        os.makedirs(log_dir)
    logfile = "{log_path}/stats_workflow_{uname}_{pid}.log"
    logfile = logfile.format(log_path=log_dir, uname=os.uname()[1],
                             pid=os.getpid())
    logging_level = logging.INFO
    logging.basicConfig(filename=logfile, level=logging_level,
                        format=("%(asctime)s: [%(name)s] (%(levelname)s) "
                                "%(message)s"))

    # Get the list of tiles (groups of cells that each node will operate on)
    tiles_list_fname = pjoin(CONFIG.get('work', 'output_directory'),
                             CONFIG.get('outputs', 'cell_groups'))
    with open(tiles_list_fname, 'r') as in_file:
        tiles = pickle.load(in_file)

    # Initialise the job
    tile = tiles[tile_idx]
    tasks = [DefinePercentileTasks(tile[0], tile[1])]
    luigi.build(tasks, local_scheduler=True, workers=16)
    luigi.run()
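
For reference, PercentileTask.run() hands bs_workflow four output filenames per cell and percentile. A quick illustration of the names it builds from the [outputs] settings in config.cfg (the cell directory in this example is the one from the pickled query list above):

from os.path import join as pjoin

groups = 'NBAR,FC,Satellite,date'.split(',')         # [outputs] groups
fmt = '{group}_best_pixel_{pct}_percentile.dat'      # [outputs] basefname_format
cell_dir = '/g/data/v10/testing_ground/jps547/percentiles/test2/146_-34'
out_fnames = [pjoin(cell_dir, fmt.format(group=g, pct=90)) for g in groups]
# -> .../146_-34/NBAR_best_pixel_90_percentile.dat
#    .../146_-34/FC_best_pixel_90_percentile.dat
#    .../146_-34/Satellite_best_pixel_90_percentile.dat
#    .../146_-34/date_best_pixel_90_percentile.dat
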
#!/usr/bin/env python
import luigi
import argparse
from datetime import date
import os
from os.path import join as pjoin, exists, dirname, abspath
import cPickle as pickle
import subprocess
import logging
from datacube.api.query import list_tiles_as_list
from datacube.api.model import DatasetType, Satellite, BANDS
from eotools.tiling import generate_tiles
PBS_DSH = (
"""#!/bin/bash
#PBS -P {project}
#PBS -q {queue}
#PBS -l walltime={walltime},ncpus={ncpus},mem={mem}GB,jobfs=350GB
#PBS -l wd
#PBS -me
#PBS -M {email}
NNODES={nnodes}
for i in $(seq 1 $NNODES); do
pbsdsh -n $((16 *$i)) -- bash -l -c "{modules} PBS_NNODES=$NNODES PBS_VNODENUM=$i python {pyfile} \
--tile $[$i - 1]" &
done;
wait
""")

def query_cells(xcells, ycells, satellites, min_date, max_date, dataset_types,
                output_dir):
    """
    Query the DB for every cell in the rectangular block defined by
    xcells x ycells.
    Currently the config file for this workflow requires the user to
    specify a rectangular region. I have another workflow that takes a
    vector file as input.
    """
    base_out_fname = CONFIG.get('outputs', 'query_filename')
    cell_queries = []
    for ycell in ycells:
        for xcell in xcells:
            # Create the output directory
            cell_dir = '{}_{}'.format(int(xcell), int(ycell))
            out_cell_dir = pjoin(output_dir, cell_dir)
            if not exists(out_cell_dir):
                os.makedirs(out_cell_dir)

            tiles = list_tiles_as_list(x=[xcell], y=[ycell],
                                       acq_min=min_date,
                                       acq_max=max_date,
                                       dataset_types=dataset_types,
                                       satellites=satellites)

            out_fname = pjoin(out_cell_dir, base_out_fname)
            cell_queries.append(out_fname)
            with open(out_fname, 'w') as outf:
                pickle.dump(tiles, outf)

    return cell_queries

def query_cells2(xcells, ycells, satellites, min_date, max_date, dataset_types,
                 output_dir):
    """
    Query the DB for each paired cell (xcells[i], ycells[i]).
    Currently the config file for this workflow requires the user to
    specify a rectangular region or an explicit cell list. I have another
    workflow that takes a vector file as input.
    """
    base_out_fname = CONFIG.get('outputs', 'query_filename')
    cell_queries = []
    for i in range(len(ycells)):
        ycell = ycells[i]
        xcell = xcells[i]

        # Create the output directory
        cell_dir = '{}_{}'.format(int(xcell), int(ycell))
        out_cell_dir = pjoin(output_dir, cell_dir)
        if not exists(out_cell_dir):
            os.makedirs(out_cell_dir)

        tiles = list_tiles_as_list(x=[xcell], y=[ycell],
                                   acq_min=min_date,
                                   acq_max=max_date,
                                   dataset_types=dataset_types,
                                   satellites=satellites)

        out_fname = pjoin(out_cell_dir, base_out_fname)
        cell_queries.append(out_fname)
        with open(out_fname, 'w') as outf:
            pickle.dump(tiles, outf)

    return cell_queries

if __name__ == '__main__':
    desc = "Queries the AGDC for the percentile workflow."
    parser = argparse.ArgumentParser(description=desc)
    parser.add_argument("--cfg",
                        help=("The config file used to set up the "
                              "baresoil workflow."))
    args = parser.parse_args()
    cfg = args.cfg

    # Setup the config file
    global CONFIG
    if cfg is None:
        CONFIG = luigi.configuration.get_config()
        CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
    else:
        CONFIG = luigi.configuration.get_config()
        CONFIG.add_config_path(cfg)

    # Create the output directory
    out_dir = CONFIG.get('work', 'output_directory')
    if not exists(out_dir):
        os.makedirs(out_dir)

    # Setup logging
    log_dir = CONFIG.get('work', 'logs_directory')
    if not exists(log_dir):
        os.makedirs(log_dir)
    logfile = "{log_path}/query_{uname}_{pid}.log"
    logfile = logfile.format(log_path=log_dir, uname=os.uname()[1],
                             pid=os.getpid())
    logging_level = logging.INFO
    logging.basicConfig(filename=logfile, level=logging_level,
                        format=("%(asctime)s: [%(name)s] (%(levelname)s) "
                                "%(message)s"))

    # Define the DatasetTypes
    # TODO have this defined through the config.cfg
    ds_types = [DatasetType.ARG25, DatasetType.FC25, DatasetType.PQ25,
                DatasetType.WATER]

    # Get the satellites we wish to query
    satellites = CONFIG.get('db', 'satellites')
    satellites = [Satellite(i) for i in satellites.split(',')]

    # Get the min/max date range to query
    min_date = CONFIG.get('db', 'min_date')
    min_date = [int(i) for i in min_date.split('_')]
    min_date = date(min_date[0], min_date[1], min_date[2])
    max_date = CONFIG.get('db', 'max_date')
    max_date = [int(i) for i in max_date.split('_')]
    max_date = date(max_date[0], max_date[1], max_date[2])

    # Do we use an explicit cell list or a rectangular block of cells?
    cell_list = int(CONFIG.get('db', 'cell_list'))
    if cell_list <= 0:
        # Get the min/max cell range to query
        cell_xmin = int(CONFIG.get('db', 'cell_xmin'))
        cell_ymin = int(CONFIG.get('db', 'cell_ymin'))
        cell_xmax = int(CONFIG.get('db', 'cell_xmax'))
        cell_ymax = int(CONFIG.get('db', 'cell_ymax'))
        xcells = range(cell_xmin, cell_xmax, 1)
        ycells = range(cell_ymin, cell_ymax, 1)
    else:
        xcells = CONFIG.get('db', 'xcells')
        xcells = [int(x) for x in xcells.split(',')]
        ycells = CONFIG.get('db', 'ycells')
        ycells = [int(y) for y in ycells.split(',')]

    output_dir = CONFIG.get('work', 'output_directory')
    cell_queries = query_cells2(xcells, ycells, satellites, min_date, max_date,
                                ds_types, output_dir)

    queries_fname = pjoin(output_dir, CONFIG.get('outputs', 'queries_list'))
    with open(queries_fname, 'w') as outf:
        pickle.dump(cell_queries, outf)

    # Create the tiles list that contains groups of cells to operate on.
    # Each node will have a certain number of cells to work on.
    cpnode = int(CONFIG.get('internals', 'cells_per_node'))
    tiles = generate_tiles(len(cell_queries), 1, cpnode, 1, False)
    tiles = [x for y, x in tiles]
    tiles_list_fname = pjoin(out_dir, CONFIG.get('outputs', 'cell_groups'))
    with open(tiles_list_fname, 'w') as out_file:
        pickle.dump(tiles, out_file)

    # Setup the modules to use for the job
    modules = CONFIG.get('pbs', 'modules').split(',')
    modules = ['module load {}; '.format(module) for module in modules]
    modules.insert(0, 'module use /projects/u46/opt/modules/modulefiles;')
    modules = ''.join(modules)

    # Calculate the job node and cpu requirements
    nnodes = len(tiles)
    ncpus = nnodes * 16
    mem = nnodes * 32

    # Populate the PBS shell script
    project = CONFIG.get('pbs', 'project')
    queue = CONFIG.get('pbs', 'queue')
    walltime = CONFIG.get('pbs', 'walltime')
    email = CONFIG.get('pbs', 'email')
    py_file = abspath(pjoin(dirname(__file__), 'percentile_workflow.py'))
    py_path = abspath(dirname(__file__))
    pbs_job = PBS_DSH.format(project=project, queue=queue,
                             walltime=walltime, ncpus=ncpus, mem=mem,
                             email=email, nnodes=nnodes,
                             modules=modules,
                             pyfile=py_file)

    # Output the shell script to disk
    pbs_fname = pjoin(out_dir, CONFIG.get('outputs', 'pbs_filename'))
    with open(pbs_fname, 'w') as out_file:
        out_file.write(pbs_job)

    subprocess.check_call(['qsub', pbs_fname])
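
For the example config above (an explicit list of five cells, cells_per_node = 10), the resulting PBS request is small. The snippet below just restates the script's sizing arithmetic with those numbers, assuming generate_tiles groups the cell indices into runs of cells_per_node (which is what the [x for y, x in tiles] unpacking relies on):

# Hypothetical sizing check for the example config values.
ncells = 5                                  # the xcells/ycells lists have 5 entries
cpnode = 10                                 # [internals] cells_per_node
ngroups = (ncells + cpnode - 1) // cpnode   # -> 1 group of cells
nnodes = ngroups                            # -> 1 node
ncpus = nnodes * 16                         # -> 16 cpus requested
mem = nnodes * 32                           # -> 32 GB requested
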