pilbara
[work]
output_directory = /g/data/v10/testing_ground/jps547/sue-fyfe/results
logs_directory = %(output_directory)s/logs
satellites = LS5,LS7,LS8
min_date = 1985_01_01
max_date = 2015_12_31
vector_filename = /g/data/v10/testing_ground/jps547/sue-fyfe/LH_Nominated_EC_Saltmarsh.shp
[internals]
envelope = 0
cells_per_node = 1
pandas_groups = 10
pandas_chunksize = 1000000
water_directory = /g/data/u46/wofs/water_f7s/extents
cell_grid = /short/v10/jps547/DatacubeCellGrid/Cell_Grid_WGS84.shp
[outputs]
cells_list = cells_to_process.pkl
tiles_list = tiles.pkl
pbs_filename = class_dist_pbsdsh.bash
query_filename = datasets_list.pkl
stats_filename_format = class_dist_result_{}.h5
combined_cell_stats_filename = combined_cell_stats.h5
rasterise_filename = rasterised_result.tif
final_output_filename = saltmarsh_class_distribution.h5
groups_filename_format = tmp_group_{}.h5
#[core]
#logging_conf_file: /home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/logging.cfg
[pbs]
project = v10
queue = normal
walltime = 05:00:00
email = joshua.sixsmith@ga.gov.au
modules = geo-processing,IDL_functions/0.3,pyephem/3.7.5.1,numexpr,eo-tools/0.4,gaip/test,agdc-api/0.1.0-b20150807
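# Note (illustrative, not part of the original gist): the workflow scripts
# below consume these options through luigi's configuration API, e.g.
#   CONFIG = luigi.configuration.get_config()
#   CONFIG.add_config_path('config.cfg')
#   out_dir = CONFIG.get('work', 'output_directory')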
#!/usr/bin/env python
import numpy
from datacube.api.utils import get_dataset_data_with_pq, TCI_COEFFICIENTS
from datacube.api.utils import calculate_tassel_cap_index, TasselCapIndex
from datacube.api.utils import calculate_ndvi
from datacube.api.utils import get_dataset_metadata
#def classifier(arg25_dataset, pq25_dataset):
# """
# Runs the classifier designed by S. Fyfe.
# """
# # Get the metadata
# md = get_dataset_metadata(arg25_dataset)
# cols, rows = md.shape
#
# # Read the data and mask pixels via the PQ dataset
# data = get_dataset_data_with_pq(arg25_dataset, pq25_dataset)
#
# # Get the wetness coefficients and calculate
# coef = TCI_COEFFICIENTS[arg25_dataset.satellite][TasselCapIndex.WETNESS]
# wetness = calculate_tassel_cap_index(data, coef)
#
# # NDVI
# ndvi = calculate_ndvi(data[arg25_dataset.bands.RED],
# data[arg25_dataset.bands.NEAR_INFRARED],
# output_ndv=numpy.nan)
#
# # Dump the reflectance data, the classifier only needs tc_wetness and ndvi
# del data
#
# # Allocate the result
# classified = numpy.zeros((rows,cols), dtype='uint8')
#
# # Water
# r1 = wetness > 0
# classified[r1] = 1
# _tmp = ~r1
#
# r2 = _tmp & ((wetness >= -250) & (wetness < 0))
#
# # non-veg
# r3 = r2 & (ndvi <= 0.3)
# classified[r3] = 2
# #_tmp2 = ~r3
# _tmp2 = _tmp & r2 & ~r3
#
# #r4 = _tmp & _tmp2 & (ndvi <= 0.45)
# r4 = _tmp2 & (ndvi <= 0.45)
# #_tmp3 = ~r4
# _tmp3 = _tmp2 & ~r4
#
# # saltmarsh
# classified[r4] = 3
#
# #r5 = _tmp & _tmp2 &_tmp3 & (ndvi <= 0.6)
# r5 = _tmp3 & (ndvi <= 0.6)
#
# # mangrove/saltmarsh
# classified[r5] = 4
#
# # mangrove
# #_tmp5 = _tmp & _tmp2 &_tmp3 & ~r5
# _tmp5 = _tmp3 & ~r5
# classified[_tmp5] = 5
#
# _tmp5 = _tmp & ~r2
#
# r6 = _tmp5 & (wetness <= -750)
#
# r7 = r6 & (ndvi >= 0.3)
#
# # saltmarsh
# classified[r7] = 3
#
# # non-veg
# _tmp4 = r6 & ~r7
# classified[_tmp4] = 2
#
# _tmp3 = _tmp5 & ~r6
# r8 = _tmp3 & (ndvi <= 0.3)
#
# # non-veg
# classified[r8] = 2
#
# r9 = _tmp3 & ~r8 & (ndvi <= 0.45)
#
# # saltmarsh
# classified[r9] = 3
# _tmp4 = _tmp3 & ~r9
#
# r10 = _tmp4 & (ndvi <= 0.6)
#
# # mangrove/saltmarsh
# classified[r10] = 4
#
# # mangrove
# classified[_tmp4 & ~r10] = 5
#
# # set any nulls
# valid = numpy.isfinite(ndvi)
# classified[~valid] = 0
#
# return classified
def classifier(arg25_dataset, pq25_dataset):
"""
Runs the classifier designed by S. Fyfe.
"""
# Get the metadata
md = get_dataset_metadata(arg25_dataset)
cols, rows = md.shape
# Read the data and mask pixels via the PQ dataset
data = get_dataset_data_with_pq(arg25_dataset, pq25_dataset)
# Get the wetness coefficients and calculate
coef = TCI_COEFFICIENTS[arg25_dataset.satellite][TasselCapIndex.WETNESS]
wetness = calculate_tassel_cap_index(data, coef)
# NDVI
ndvi = calculate_ndvi(data[arg25_dataset.bands.RED],
data[arg25_dataset.bands.NEAR_INFRARED],
output_ndv=numpy.nan)
# Dump the reflectance data, the classifier only needs tc_wetness and ndvi
del data
# Allocate the result
classified = numpy.zeros((rows,cols), dtype='uint8')
# Water
r1 = wetness > 0
classified[r1] = 1
_tmp = ~r1
#r2 = _tmp & ((wetness >= -250) & (wetness < 0))
r2 = (wetness >= -250) & (wetness < 0)
r3 = ndvi <= 0.3
#_tmp2 = _tmp & r2 & ~r3
_tmp2 = _tmp & r2
# non-veg
classified[_tmp2 & r3] = 2
_tmp3 = _tmp2 & ~r3
r4 = ndvi <= 0.45
# saltmarsh
classified[_tmp3 & r4] = 3
_tmp2 = _tmp3 & ~r4
r5 = ndvi <= 0.6
# mangrove/saltmarsh
classified[_tmp2 & r5] = 4
# mangrove
classified[_tmp2 & ~r5] = 5
# finished rhs of r2
_tmp2 = _tmp & ~r2
r6 = wetness < -750
r7 = ndvi >= 0.3
_tmp3 = _tmp2 & r6
# saltmarsh
classified[_tmp3 & r7] = 3
# non-veg
classified[_tmp3 & ~r7] = 2
r8 = ndvi <= 0.3
_tmp3 = _tmp2 & ~r6
# non-veg
classified[_tmp3 & r8] = 2
r9 = ndvi <= 0.45
_tmp2 = _tmp3 & ~r8
# saltmarsh
classified[_tmp2 & r9] = 3
r10 = ndvi <= 0.6
_tmp3 = _tmp2 & ~r9
# mangrove-saltmarsh
classified[_tmp3 & r10] = 4
# mangrove
classified[_tmp3 & ~r10] = 5
# set any nulls
valid = numpy.isfinite(ndvi)
classified[~valid] = 0
return classified
def classifier3(arg25_dataset, pq25_dataset):
"""
Runs the classifier designed by S. Fyfe.
"""
# Get the metadata
md = get_dataset_metadata(arg25_dataset)
cols, rows = md.shape
# Read the data and mask pixels via the PQ dataset
data = get_dataset_data_with_pq(arg25_dataset, pq25_dataset)
# Get the wetness coefficients and calculate
coef = TCI_COEFFICIENTS[arg25_dataset.satellite][TasselCapIndex.WETNESS]
wetness = calculate_tassel_cap_index(data, coef)
# NDVI
ndvi = calculate_ndvi(data[arg25_dataset.bands.RED],
data[arg25_dataset.bands.NEAR_INFRARED],
output_ndv=numpy.nan)
# Dump the reflectance data, the classifier only needs tc_wetness and ndvi
del data
# Allocate the result
classified = numpy.zeros((rows,cols), dtype='uint8')
# Water
r1 = wetness > 0
classified[r1] = 2
_tmp = ~r1
#r2 = _tmp & ((wetness >= -250) & (wetness < 0))
r2 = (wetness >= -250) & (wetness < 0)
r3 = ndvi <= 0.3
#_tmp2 = _tmp & r2 & ~r3
_tmp2 = _tmp & r2
# non-veg
classified[_tmp2 & r3] = 5
_tmp3 = _tmp2 & ~r3
r4 = ndvi <= 0.45
# saltmarsh
classified[_tmp3 & r4] = 6
_tmp2 = _tmp3 & ~r4
r5 = ndvi <= 0.6
# mangrove/saltmarsh
classified[_tmp2 & r5] = 7
# mangrove
classified[_tmp2 & ~r5] = 3
# finished rhs of r2
_tmp2 = _tmp & ~r2
r6 = wetness < -750
r7 = ndvi >= 0.3
_tmp3 = _tmp2 & r6
# saltmarsh
classified[_tmp3 & r7] = 8
# non-veg
classified[_tmp3 & ~r7] = 4
r8 = ndvi <= 0.3
_tmp3 = _tmp2 & ~r6
# non-veg
classified[_tmp3 & r8] = 9
r9 = ndvi <= 0.45
_tmp2 = _tmp3 & ~r8
# saltmarsh
classified[_tmp2 & r9] = 10
r10 = ndvi <= 0.6
_tmp3 = _tmp2 & ~r9
# mangrove-saltmarsh
classified[_tmp3 & r10] = 11
# mangrove
classified[_tmp3 & ~r10] = 1
# set any nulls
valid = numpy.isfinite(ndvi)
classified[~valid] = 0
return classified
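# --- Illustrative sketch (not part of the original gist) ---
# A scalar restatement of the decision tree that classifier() above applies
# per pixel (vectorised) using the tasselled-cap wetness and NDVI thresholds.
# Class codes follow classifier(): 0=null, 1=water, 2=non-veg, 3=saltmarsh,
# 4=mangrove/saltmarsh, 5=mangrove. Relies on the numpy import at the top of
# this module.
def _classify_pixel(wetness, ndvi):
    if not numpy.isfinite(ndvi):
        return 0
    if wetness > 0:
        return 1
    if -250 <= wetness < 0:
        if ndvi <= 0.3:
            return 2
        if ndvi <= 0.45:
            return 3
        return 4 if ndvi <= 0.6 else 5
    if wetness < -750:
        return 3 if ndvi >= 0.3 else 2
    if ndvi <= 0.3:
        return 2
    if ndvi <= 0.45:
        return 3
    return 4 if ndvi <= 0.6 else 5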
#!/usr/bin/env python
import luigi
import os
from os.path import join as pjoin, exists, dirname
import cPickle as pickle
import glob
import argparse
import logging
import pandas
import rasterio
from datacube.api.model import DatasetType
from classifier import classifier
from zonal_stats import zonal_stats
from zonal_stats import zonal_class_distribution
from image_processing.segmentation import rasterise_vector
CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
class RasteriseTask(luigi.Task):
"""
Computes the rasterisation for a cell.
"""
out_dir = luigi.Parameter()
def requires(self):
return []
def output(self):
out_fname = pjoin(self.out_dir, 'RasteriseTask.completed')
return luigi.LocalTarget(out_fname)
def run(self):
out_fname = pjoin(self.out_dir,
CONFIG.get('outputs', 'rasterise_filename'))
ds_list_fname = pjoin(self.out_dir,
CONFIG.get('outputs', 'query_filename'))
with open(ds_list_fname, 'r') as infile:
ds_list = pickle.load(infile)
vector_fname = CONFIG.get('work', 'vector_filename')
img_fname = ds_list[0].datasets[DatasetType.FC25].path
with rasterio.open(img_fname) as src:
crs = src.crs
transform = src.affine
height = src.height
width = src.width
res = rasterise_vector(vector_fname, shape=(height, width),
transform=transform, crs=crs)
kwargs = {'count': 1,
'width': width,
'height': height,
'crs': crs,
'transform': transform,
'dtype': res.dtype.name,
'nodata': 0}
with rasterio.open(out_fname, 'w', **kwargs) as src:
src.write(1, res)
# We could just set the image as the Luigi completion target...
with self.output().open('w') as outf:
outf.write('Complete')
class ClassifierStatsTask(luigi.Task):
"""
Computes a zonal class distribution task for the required dataset.
"""
idx = luigi.IntParameter()
out_fname = luigi.Parameter()
def requires(self):
return [RasteriseTask(dirname(self.out_fname))]
def output(self):
return luigi.LocalTarget(self.out_fname)
def run(self):
rasterised_fname = pjoin(dirname(self.out_fname),
CONFIG.get('outputs', 'rasterise_filename'))
ds_list_fname = pjoin(dirname(self.out_fname),
CONFIG.get('outputs', 'query_filename'))
with open(ds_list_fname, 'r') as infile:
ds_list = pickle.load(infile)
dataset = ds_list[self.idx]
nbar_ds = dataset.datasets[DatasetType.ARG25]
pq_ds = dataset.datasets[DatasetType.PQ25]
classified_img = classifier(nbar_ds, pq_ds)
# hard-coded, as this will be short-lived due to agdc-v2 development
class_ids = [0, 1, 2, 3, 4, 5]
with rasterio.open(rasterised_fname, 'r') as src:
zones_img = src.read(1)
result = zonal_class_distribution(classified_img, zones_img,
class_ids=class_ids)
# Set the timestamp
result['Timestamp'] = dataset.start_datetime
# Open the output hdf5 file
store = pandas.HDFStore(self.output().path)
# Write the dataframe
store.append('data', result)
# Save and close the file
store.close()
class CellStatsTask(luigi.Task):
"""
For a given cell define a classifier stats task for each required Dataset.
"""
out_dir = luigi.Parameter()
def requires(self):
base_name = CONFIG.get('outputs', 'stats_filename_format')
base_name = pjoin(self.out_dir, base_name)
ds_list_fname = pjoin(self.out_dir,
CONFIG.get('outputs', 'query_filename'))
with open(ds_list_fname, 'r') as infile:
ds_list = pickle.load(infile)
targets = []
for idx, ds in enumerate(ds_list):
timestamp = bytes(ds.start_datetime).replace(' ', '-')
out_fname = base_name.format(timestamp)
targets.append(ClassifierStatsTask(idx, out_fname))
return targets
def output(self):
out_fname = pjoin(self.out_dir, 'CellStatsTask.completed')
return luigi.LocalTarget(out_fname)
def run(self):
with self.output().open('w') as outf:
outf.write('Completed')
class CombineCellStatsTask(luigi.Task):
"""
Combines all stats files from a single cell into a single file.
"""
out_dir = luigi.Parameter()
def requires(self):
return [CellStatsTask(self.out_dir)]
def output(self):
out_fname = pjoin(self.out_dir, 'CombineCellStatsTask.completed')
return luigi.LocalTarget(out_fname)
def run(self):
# Get a list of the stats files for each timeslice
stats_files_list = glob.glob(pjoin(self.out_dir, '*.h5'))
# Create an output file that we can continually append data to
out_fname = pjoin(self.out_dir,
CONFIG.get('outputs',
'combined_cell_stats_filename'))
combined_store = pandas.HDFStore(out_fname)
store = pandas.HDFStore(stats_files_list[0])
# If there is nothing in the first file there will be nothing for
# every file
if '/data' in store.keys():
# We have data to retrieve
headings = store['data'].columns.tolist()
store.close()
df = pandas.DataFrame(columns=headings)
for sfile in stats_files_list:
store = pandas.HDFStore(sfile, 'r')
df = df.append(store['data'])
store.close()
df.reset_index(inplace=True)
# Write to disk
combined_store.append('data', df)
with self.output().open('w') as outf:
outf.write('Completed')
class RunCombineCellStatsTasks(luigi.Task):
"""
Issues CombineCellStatsTask's to each cell associated
with the tile defined by the start and end index.
"""
idx1 = luigi.IntParameter()
idx2 = luigi.IntParameter()
def requires(self):
base_out_dir = CONFIG.get('work', 'output_directory')
cells_list_fname = pjoin(base_out_dir,
CONFIG.get('outputs', 'cells_list'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
tasks = []
for cell in cells[self.idx1:self.idx2]:
out_dir = pjoin(base_out_dir, cell)
tasks.append(CombineCellStatsTask(out_dir))
return tasks
def output(self):
out_dir = CONFIG.get('work', 'output_directory')
out_fname = pjoin(out_dir,
'RunCombineCellStatsTasks_{}:{}.completed')
out_fname = out_fname.format(self.idx1, self.idx2)
return luigi.LocalTarget(out_fname)
def run(self):
with self.output().open('w') as outf:
outf.write('Completed')
if __name__ == '__main__':
# Setup command-line arguments
desc = "Processes zonal stats for a given set of cells."
hlp = ("The tile/chunk index to retieve from the tiles list. "
"(Needs to have been previously computed to a file named tiles.pkl")
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('--tile', type=int, help=hlp)
parsed_args = parser.parse_args()
tile_idx = parsed_args.tile
# setup logging
log_dir = CONFIG.get('work', 'logs_directory')
if not exists(log_dir):
os.makedirs(log_dir)
logfile = "{log_path}/stats_workflow_{uname}_{pid}.log"
logfile = logfile.format(log_path=log_dir, uname=os.uname()[1],
pid=os.getpid())
logging_level = logging.INFO
logging.basicConfig(filename=logfile, level=logging_level,
format=("%(asctime)s: [%(name)s] (%(levelname)s) "
"%(message)s "))
# Get the list of tiles (groups of cells) that each node will operate on
tiles_list_fname = pjoin(CONFIG.get('work', 'output_directory'),
CONFIG.get('outputs', 'tiles_list'))
with open(tiles_list_fname, 'r') as in_file:
tiles = pickle.load(in_file)
# Initialise the job
tile = tiles[tile_idx]
tasks = [RunCombineCellStatsTasks(tile[0], tile[1])]
luigi.build(tasks, local_scheduler=True, workers=16)
luigi.run()
#!/usr/bin/env python
import os
from os.path import join as pjoin, dirname, basename
import cPickle as pickle
import argparse
import luigi
import numpy
import pandas
import ogr
from eotools.vector import retrieve_attribute_table
CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
def combine_all_cells():
# Config params
base_out_dir = CONFIG.get('work', 'output_directory')
base_out_fname = CONFIG.get('outputs', 'final_output_filename')
cells_list_fname = pjoin(base_out_dir,
CONFIG.get('outputs', 'cells_list'))
combined_cell_stats_fname = CONFIG.get('outputs',
'combined_cell_stats_filename')
vector_fname = CONFIG.get('work', 'vector_filename')
ngroups = int(CONFIG.get('internals', 'pandas_groups'))
chunksize = int(CONFIG.get('internals', 'pandas_chunksize'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
headings = ['SID', 'Timestamp', 'Band', 'Observed_Count', 'Min', 'Max',
'Sum', 'Sum_of_Squares']
items = ['Timestamp', 'SID', 'Band']
tmp1_fname = pjoin(base_out_dir, 'tmp_combined_results.h5')
tmp1_store = pandas.HDFStore(tmp1_fname)
for cell in cells:
cell_dir = pjoin(base_out_dir, cell)
cell_stats_fname = pjoin(cell_dir, combined_cell_stats_fname)
# Open the current cells WC result file
store = pandas.HDFStore(cell_stats_fname, 'r')
if '/data' in store.keys():
# We have data to retrieve
df = store['data']
df.drop('index', 1, inplace=True)
tmp1_store.append('data', df, index=False)
tmp1_store.flush()
store.close()
tmp1_store.close()
# Chunking method
# http://stackoverflow.com/questions/25459982/trouble-with-grouby-on-millions-of-keys-on-a-chunked-file-in-python-pandas/25471765#25471765
# http://stackoverflow.com/questions/15798209/pandas-group-by-query-on-large-data-in-hdfstore/15800314#15800314
# We need to group as many records as we can into more memory-manageable
# chunks that also balance well for I/O
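# For example (illustrative): with ngroups = 10, every record whose SID has
# the same value of SID % 10 lands in the same 'group_N' table, so each
# group can later be read back and aggregated independently of the others.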
tmp1_store = pandas.HDFStore(tmp1_fname)
tmp2_fname = pjoin(base_out_dir, 'tmp_grouped_results.h5')
tmp2_store = pandas.HDFStore(tmp2_fname)
for chunk in tmp1_store.select('data', chunksize=chunksize):
g = chunk.groupby(chunk['SID'] % ngroups)
for grp, grouped in g:
tmp2_store.append('group_{}'.format(int(grp)), grouped,
data_columns=headings, index=False)
tmp2_store.flush()
tmp1_store.close()
tmp2_store.close()
# Define the output file
out_fname = pjoin(base_out_dir, base_out_fname)
combined_store = pandas.HDFStore(out_fname, 'w')
new_headings = ['FID', 'Timestamp', 'Band', 'Observed_Count', 'Min', 'Max',
'Sum', 'Sum_of_Squares', 'Mean', 'Variance', 'StdDev']
# Now read the grouped data and write to disk
tmp2_store = pandas.HDFStore(tmp2_fname)
for key in tmp2_store.keys():
grouped = tmp2_store[key].groupby(items, as_index=False)
df = grouped.agg({'Observed_Count': numpy.sum, 'Min': numpy.min,
'Max': numpy.max, 'Sum': numpy.sum,
'Sum_of_Squares': numpy.sum})
# Account for the offset between the feature and segment ID's
df['SID'] = df['SID'] - 1
# Change the segment id column name to feature id
df.rename(columns={'SID': 'FID'}, inplace=True)
# Calculate the mean, variance, stddev
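# (illustrative note: the variance below uses the running-sums identity
#  variance = (sum_of_squares - n * mean**2) / (n - 1), i.e. the sample
#  variance recovered from Sum, Sum_of_Squares and Observed_Count)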
df['Mean'] = df['Sum'] / df['Observed_Count']
df['Variance'] = ((df['Sum_of_Squares'] - (df['Observed_Count'] *
df['Mean']**2)) /
(df['Observed_Count'] - 1))
df['StdDev'] = numpy.sqrt(df['Variance'].values)
# Write the group to disk
combined_store.append('data', df, data_columns=new_headings)
combined_store.flush()
# Add metadata
metadata_group = 'Metadata'
metadata = {'Vector_Filename': basename(vector_fname)}
metadata = pandas.DataFrame(metadata, index=[0])
combined_store[metadata_group] = metadata
# Add the vector attribute table
vec_ds = ogr.Open(vector_fname)
lyr = vec_ds.GetLayer()
attribute_table = retrieve_attribute_table(lyr)
combined_store['attribute_table'] = attribute_table
# Save and close the file
combined_store.close()
tmp1_store.close()
tmp2_store.close()
# Clean up temporary files
os.remove(tmp1_fname)
os.remove(tmp2_fname)
def combine_all_cells_distribution():
"""
"""
# Config params
base_out_dir = CONFIG.get('work', 'output_directory')
base_out_fname = CONFIG.get('outputs', 'final_output_filename')
cells_list_fname = pjoin(base_out_dir,
CONFIG.get('outputs', 'cells_list'))
combined_cell_stats_fname = CONFIG.get('outputs',
'combined_cell_stats_filename')
vector_fname = CONFIG.get('work', 'vector_filename')
ngroups = int(CONFIG.get('internals', 'pandas_groups'))
chunksize = int(CONFIG.get('internals', 'pandas_chunksize'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
# 1st stage, combine all the results from each cell into a single file
tmp1_fname = pjoin(base_out_dir, 'tmp_combined_results.h5')
tmp1_store = pandas.HDFStore(tmp1_fname, 'w')
for cell in cells:
cell_dir = pjoin(base_out_dir, cell)
cell_stats_fname = pjoin(cell_dir, combined_cell_stats_fname)
# Open the current cells WC result file
store = pandas.HDFStore(cell_stats_fname, 'r')
if '/data' in store.keys():
# We have data to retrieve
df = store['data']
headings = df.columns.values.tolist()
df.drop('index', 1, inplace=True)
tmp1_store.append('data', df, index=False)
tmp1_store.flush()
store.close()
tmp1_store.close()
# Chunking method
# http://stackoverflow.com/questions/25459982/trouble-with-grouby-on-millions-of-keys-on-a-chunked-file-in-python-pandas/25471765#25471765
# http://stackoverflow.com/questions/15798209/pandas-group-by-query-on-large-data-in-hdfstore/15800314#15800314
# We need to group as many records as we can into more memory-manageable
# chunks that also balance well for I/O
tmp1_store = pandas.HDFStore(tmp1_fname)
tmp2_fname = pjoin(base_out_dir, 'tmp_grouped_results.h5')
tmp2_store = pandas.HDFStore(tmp2_fname, 'w')
for chunk in tmp1_store.select('data', chunksize=chunksize):
g = chunk.groupby(chunk['SID'] % ngroups)
for grp, grouped in g:
tmp2_store.append('group_{}'.format(int(grp)), grouped,
data_columns=headings, index=False)
tmp2_store.flush()
tmp1_store.close()
tmp2_store.close()
# TODO We need a generic way of allowing a user to insert custom
# classification headings as opposed to the numeric code
# without being specific like we did for WOfS.
new_headings = ['FID' if x == 'SID' else x for x in headings]
items = ['Timestamp', 'SID']
# Define the output file
out_fname = pjoin(base_out_dir, base_out_fname)
combined_store = pandas.HDFStore(out_fname, 'w')
# Now read the grouped data and write to disk
tmp2_store = pandas.HDFStore(tmp2_fname)
for key in tmp2_store.keys():
df = tmp2_store[key].groupby(items, as_index=False).sum()
# Change the segment id column name to feature id
df.rename(columns={'SID': 'FID'}, inplace=True)
combined_store.append('data', df, data_columns=new_headings)
combined_store.flush()
# Save and close the file
combined_store.close()
tmp1_store.close()
tmp2_store.close()
# Clean up temporary files
os.remove(tmp1_fname)
os.remove(tmp2_fname)
if __name__ == '__main__':
# combine_all_cells()
combine_all_cells_distribution()
[work]
output_directory = /g/data/v10/testing_ground/jps547/pilbara/results
logs_directory = %(output_directory)s/logs
satellites = LS5,LS7,LS8
min_date = 1985_01_01
max_date = 2015_12_31
vector_filename = /g/data/v10/testing_ground/jps547/pilbara/CaneRiver_Nanutarra_MtMinnie_Peedamulla_RedHill_Yarraloola_Merge_WGS84_LandSystemType.shp
[internals]
envelope = 0
cells_per_node = 1
pandas_groups = 10
pandas_chunksize = 1000000
water_directory = /g/data/u46/wofs/water_f7s/extents
cell_grid = /short/v10/jps547/DatacubeCellGrid/Cell_Grid_WGS84.shp
[outputs]
cells_list = cells_to_process.pkl
tiles_list = tiles.pkl
pbs_filename = zonal_stats_pbsdsh.bash
query_filename = datasets_list.pkl
stats_filename_format = stats_result_{}.h5
combined_cell_stats_filename = combined_cell_stats.h5
rasterise_filename = rasterised_result.tif
final_output_filename = pilbara_zonal_stats_results.h5
groups_filename_format = tmp_group_{}.h5
#[core]
#logging_conf_file: /home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/logging.cfg
[pbs]
project = v10
queue = normal
walltime = 05:00:00
email = joshua.sixsmith@ga.gov.au
modules = python/2.7.6,gdal/1.11.1-python,agdc-api,pandas,IDL_functions/test,EOtools/test,gaip/test
"""
Data access functions
---------------------
"""
import rasterio
from rasterio import crs
from rasterio.warp import RESAMPLING
def write_img(array, filename, fmt='ENVI', geobox=None, nodata=None, compress=None, tags=None):
"""
Writes a 2D/3D image to disk using rasterio.
:param array:
A 2D/3D NumPy array.
:param filename:
A string containing the output file name.
:param fmt:
A string containing a GDAL compliant image fmt. Default is
'ENVI'.
:param geobox:
An instance of a GriddedGeoBox object.
:param nodata:
A value representing the no data value for the array.
:param compress:
A compression algorithm name (e.g. 'lzw').
:param tags:
A dictionary of dataset-level metadata.
"""
# Get the datatype of the array
dtype = array.dtype.name
# Check for excluded datatypes
excluded_dtypes = ['int64', 'int8', 'uint64']
if dtype in excluded_dtypes:
msg = "Datatype not supported: {dt}".format(dt=dtype)
raise TypeError(msg)
ndims = array.ndim
dims = array.shape
# Get the (z, y, x) dimensions (assuming BSQ interleave)
if ndims == 2:
samples = dims[1]
lines = dims[0]
bands = 1
elif ndims == 3:
samples = dims[2]
lines = dims[1]
bands = dims[0]
else:
err = 'Input array is not 2 or 3 dimensional. Array dimensions: {dims}'.format(dims=ndims)
raise IndexError(err)
# If we have a geobox, then retrieve the geotransform and projection
if geobox is not None:
transform = geobox.affine
projection = bytes(geobox.crs.ExportToWkt())
else:
transform = None
projection = None
kwargs = {'count': bands,
'width': samples,
'height': lines,
'crs': projection,
'transform': transform,
'dtype': dtype,
'driver': fmt,
'nodata': nodata}
if fmt == 'GTiff' and compress is not None:
kwargs.update({'compress': compress})
with rasterio.open(filename, 'w', **kwargs) as outds:
if bands == 1:
outds.write_band(1, array)
else:
for i in range(bands):
outds.write_band(i + 1, array[i])
if tags is not None:
outds.update_tags(**tags)
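# --- Illustrative usage (not part of the original gist) ---
# 'geobox' is assumed to be an eotools GriddedGeoBox describing the array's
# extent; the filename and tags are hypothetical.
# data = numpy.zeros((100, 100), dtype='uint8')
# write_img(data, 'example_output.tif', fmt='GTiff', geobox=geobox,
#           nodata=0, compress='lzw', tags={'description': 'example'})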
#!/usr/bin/env python
import luigi
from datetime import date
import os
from os.path import join as pjoin, exists, dirname
import cPickle as pickle
import subprocess
import logging
import ogr
from datacube.api.query import list_tiles_as_list
from datacube.api.model import DatasetType, Satellite, BANDS
from datacube.config import Config
from eotools.vector import spatial_intersection
import pdb
CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
PBS_DSH = (
"""#!/bin/bash
#PBS -P {project}
#PBS -q {queue}
#PBS -l walltime={walltime},ncpus={ncpus},mem={mem}GB,jobfs=350GB
#PBS -l wd
#PBS -me
#PBS -M {email}
NNODES={nnodes}
for i in $(seq 1 $NNODES); do
pbsdsh -n $((16 *$i)) -- bash -l -c "{modules} PBS_NNODES=$NNODES PBS_VNODENUM=$i python {pyfile} \
--tile $[$i - 1]" &
done;
wait
""")
def get_cells():
"""
A simple procedure to generate a list of the cells that will
be analysed throughout the workflow.
"""
out_dir = CONFIG.get('work', 'output_directory')
envelope = bool(int(CONFIG.get('internals', 'envelope')))
vector_fname = CONFIG.get('work', 'vector_filename')
dcube_vector_fname = CONFIG.get('internals', 'cell_grid')
out_fname = pjoin(out_dir, CONFIG.get('outputs', 'cells_list'))
fids = spatial_intersection(dcube_vector_fname, vector_fname,
envelope=envelope)
vec_ds = ogr.Open(dcube_vector_fname)
layer = vec_ds.GetLayer(0)
cell_dirs = []
cells = []
for fid in fids:
feat = layer.GetFeature(fid)
xmin = int(feat.GetField("XMIN"))
ymin = int(feat.GetField("YMIN"))
#cell_dir = "{:03.0f}_{:04.0f}".format(xmin, ymin)
cell_dir = "{}_{}".format(xmin, ymin)
cell_dirs.append(cell_dir)
cells.append((xmin, ymin))
out_cell_dir = pjoin(out_dir, cell_dir)
if not exists(out_cell_dir):
os.makedirs(out_cell_dir)
with open(out_fname, 'w') as outf:
pickle.dump(cell_dirs, outf)
return cells
def query_cells(cell_list, satellites, min_date, max_date, dataset_type,
output_dir):
"""
"""
config = Config()
base_out_fname = CONFIG.get('outputs', 'query_filename')
for cell in cell_list:
x_cell = [int(cell[0])]
y_cell = [int(cell[1])]
tiles = list_tiles_as_list(x=x_cell, y=y_cell, acq_min=min_date,
acq_max=max_date, datasets=dataset_type,
satellites=satellites,
database=config.get_db_database(),
user=config.get_db_username(),
password=config.get_db_password(),
host=config.get_db_host(),
port=config.get_db_port())
out_dir = pjoin(output_dir, '{}_{}'.format(cell[0], cell[1]))
out_fname = pjoin(out_dir, base_out_fname)
with open(out_fname, 'w') as outf:
pickle.dump(tiles, outf)
def create_tiles(array_size, tile_size=25):
"""
A minor function to tile a 1D array or list into smaller manageable
portions.
"""
idx_start = range(0, array_size, tile_size)
idx_tiles = []
for idx_st in idx_start:
if idx_st + tile_size < array_size:
idx_end = idx_st + tile_size
else:
idx_end = array_size
idx_tiles.append((idx_st, idx_end))
return idx_tiles
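# Example (illustrative): create_tiles(10, tile_size=4) returns
# [(0, 4), (4, 8), (8, 10)], i.e. (start, end) index pairs covering the list.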
if __name__ == '__main__':
# Create the output directory
out_dir = CONFIG.get('work', 'output_directory')
if not exists(out_dir):
os.makedirs(out_dir)
# setup logging
log_dir = CONFIG.get('work', 'logs_directory')
if not exists(log_dir):
os.makedirs(log_dir)
logfile = "{log_path}/query_{uname}_{pid}.log"
logfile = logfile.format(log_path=log_dir, uname=os.uname()[1],
pid=os.getpid())
logging_level = logging.INFO
logging.basicConfig(filename=logfile, level=logging_level,
format=("%(asctime)s: [%(name)s] (%(levelname)s) "
"%(message)s "))
# Just turn these into simple function calls. In this framework we don't
# need a luigi workflow for a simple single task
cells = get_cells()
# Define the fractional cover dataset
# TODO have this defined through the config.cfg
fc_ds_type = DatasetType.FC25
# Get the satellites we wish to query
satellites = CONFIG.get('work', 'satellites')
satellites = [Satellite(i) for i in satellites.split(',')]
# Get the min/max date range to query
min_date = CONFIG.get('work', 'min_date')
min_date = [int(i) for i in min_date.split('_')]
min_date = date(min_date[0], min_date[1], min_date[2])
max_date = CONFIG.get('work', 'max_date')
max_date = [int(i) for i in max_date.split('_')]
max_date = date(max_date[0], max_date[1], max_date[2])
output_dir = CONFIG.get('work', 'output_directory')
query_cells(cells, satellites, min_date, max_date, fc_ds_type, output_dir)
# Create the tiles list that contains groups of cells to operate on
# Each node will have a certain number of cells to work on
cpnode = int(CONFIG.get('internals', 'cells_per_node'))
tiles = create_tiles(len(cells), cpnode)
tiles_list_fname = pjoin(out_dir, CONFIG.get('outputs', 'tiles_list'))
with open(tiles_list_fname, 'w') as out_file:
pickle.dump(tiles, out_file)
# Setup the modules to use for the job
modules = CONFIG.get('pbs', 'modules').split(',')
modules = ['module load {}; '.format(module) for module in modules]
modules.insert(0, 'module use /projects/u46/opt/modules/modulefiles;')
modules = ''.join(modules)
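# e.g. for modules = 'a,b' (illustrative names) the joined string becomes:
# "module use /projects/u46/opt/modules/modulefiles;module load a; module load b; "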
# Calculate the job node and cpu requirements
nnodes = len(tiles)
ncpus = nnodes * 16
mem = nnodes * 32
# Populate the PBS shell script
project = CONFIG.get('pbs', 'project')
queue = CONFIG.get('pbs', 'queue')
walltime = CONFIG.get('pbs', 'walltime')
email = CONFIG.get('pbs', 'email')
py_file = pjoin(dirname(__file__), 'workflow.py')
pbs_job = PBS_DSH.format(project=project, queue=queue,
walltime=walltime, ncpus=ncpus, mem=mem,
email=email, nnodes=nnodes,
modules=modules, pyfile=py_file)
# Output the shell script to disk
pbs_fname = pjoin(out_dir, CONFIG.get('outputs', 'pbs_filename'))
with open(pbs_fname, 'w') as out_file:
out_file.write(pbs_job)
#subprocess.check_call(['qsub', pbs_fname])
#!/bin/bash
#PBS -P v10
#PBS -q express
#PBS -l walltime=05:00:00,mem=16GB,ncpus=1
#PBS -l wd
#PBS -me
#PBS -M joshua.sixsmith@ga.gov.au
module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load gdal/1.10.1
module load IDL_functions/test
module load gaip/test
module load agdc-api
module load pandas
PY_FILE=/home/547/jps547/git_repositories/sixy6e/9ea690493970eb12a237/combine_stats.py
python $PY_FILE
#!/usr/bin/env python
import luigi
from datetime import date
import os
from os.path import join as pjoin, exists, dirname, abspath
import cPickle as pickle
import subprocess
import logging
import ogr
from datacube.api.query import list_tiles_as_list
from datacube.api.model import DatasetType, Satellite, BANDS
from datacube.config import Config
from eotools.vector import spatial_intersection
import pdb
CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
PBS_DSH = (
"""#!/bin/bash
#PBS -P {project}
#PBS -q {queue}
#PBS -l walltime={walltime},ncpus={ncpus},mem={mem}GB,jobfs=350GB
#PBS -l wd
#PBS -me
#PBS -M {email}
NNODES={nnodes}
for i in $(seq 1 $NNODES); do
pbsdsh -n $((16 *$i)) -- bash -l -c "{modules} PBS_NNODES=$NNODES PBS_VNODENUM=$i python {pyfile} \
--tile $[$i - 1]" &
done;
wait
""")
def get_cells():
"""
A simple procedure to generate a list of the cells that will
be analysed throughout the workflow.
"""
out_dir = CONFIG.get('work', 'output_directory')
envelope = bool(int(CONFIG.get('internals', 'envelope')))
vector_fname = CONFIG.get('work', 'vector_filename')
dcube_vector_fname = CONFIG.get('internals', 'cell_grid')
out_fname = pjoin(out_dir, CONFIG.get('outputs', 'cells_list'))
fids = spatial_intersection(vector_fname, dcube_vector_fname,
envelope=envelope)
print fids
vec_ds = ogr.Open(dcube_vector_fname)
layer = vec_ds.GetLayer(0)
cell_dirs = []
cells = []
for fid in fids:
feat = layer.GetFeature(fid)
xmin = int(feat.GetField("XMIN"))
ymin = int(feat.GetField("YMIN"))
#cell_dir = "{:03.0f}_{:04.0f}".format(xmin, ymin)
cell_dir = "{}_{}".format(xmin, ymin)
cell_dirs.append(cell_dir)
cells.append((xmin, ymin))
out_cell_dir = pjoin(out_dir, cell_dir)
if not exists(out_cell_dir):
os.makedirs(out_cell_dir)
with open(out_fname, 'w') as outf:
pickle.dump(cell_dirs, outf)
return cells
def query_cells(cell_list, satellites, min_date, max_date, dataset_types,
output_dir):
"""
"""
config = Config()
base_out_fname = CONFIG.get('outputs', 'query_filename')
for cell in cell_list:
x_cell = [int(cell[0])]
y_cell = [int(cell[1])]
tiles = list_tiles_as_list(x=x_cell, y=y_cell, acq_min=min_date,
acq_max=max_date,
dataset_types=dataset_types,
satellites=satellites)
out_dir = pjoin(output_dir, '{}_{}'.format(cell[0], cell[1]))
out_fname = pjoin(out_dir, base_out_fname)
with open(out_fname, 'w') as outf:
pickle.dump(tiles, outf)
def create_tiles(array_size, tile_size=25):
"""
A minor function to tile a 1D array or list into smaller manageable
portions.
"""
idx_start = range(0, array_size, tile_size)
idx_tiles = []
for idx_st in idx_start:
if idx_st + tile_size < array_size:
idx_end = idx_st + tile_size
else:
idx_end = array_size
idx_tiles.append((idx_st, idx_end))
return idx_tiles
if __name__ == '__main__':
# Create the output directory
out_dir = CONFIG.get('work', 'output_directory')
if not exists(out_dir):
os.makedirs(out_dir)
# setup logging
log_dir = CONFIG.get('work', 'logs_directory')
if not exists(log_dir):
os.makedirs(log_dir)
logfile = "{log_path}/query_{uname}_{pid}.log"
logfile = logfile.format(log_path=log_dir, uname=os.uname()[1],
pid=os.getpid())
logging_level = logging.INFO
logging.basicConfig(filename=logfile, level=logging_level,
format=("%(asctime)s: [%(name)s] (%(levelname)s) "
"%(message)s "))
# Just turn these into simple function calls. In this framework we don't
# need a luigi workflow for a simple single task
cells = get_cells()
# Define the fractional cover dataset
# TODO have this defined through the config.cfg
ds_types = [DatasetType.ARG25, DatasetType.PQ25]
# Get the satellites we wish to query
satellites = CONFIG.get('work', 'satellites')
satellites = [Satellite(i) for i in satellites.split(',')]
# Get the min/max date range to query
min_date = CONFIG.get('work', 'min_date')
min_date = [int(i) for i in min_date.split('_')]
min_date = date(min_date[0], min_date[1], min_date[2])
max_date = CONFIG.get('work', 'max_date')
max_date = [int(i) for i in max_date.split('_')]
max_date = date(max_date[0], max_date[1], max_date[2])
output_dir = CONFIG.get('work', 'output_directory')
query_cells(cells, satellites, min_date, max_date, ds_types, output_dir)
# Create the tiles list that contains groups of cells to operate on
# Each node will have a certain number of cells to work on
cpnode = int(CONFIG.get('internals', 'cells_per_node'))
tiles = create_tiles(len(cells), cpnode)
tiles_list_fname = pjoin(out_dir, CONFIG.get('outputs', 'tiles_list'))
with open(tiles_list_fname, 'w') as out_file:
pickle.dump(tiles, out_file)
# Setup the modules to use for the job
modules = CONFIG.get('pbs', 'modules').split(',')
modules = ['module load {}; '.format(module) for module in modules]
modules.insert(0, 'module use /projects/u46/opt/modules/modulefiles;')
modules = ''.join(modules)
# Calculate the job node and cpu requirements
nnodes = len(tiles)
ncpus = nnodes * 16
mem = nnodes * 32
# Populate the PBS shell script
project = CONFIG.get('pbs', 'project')
queue = CONFIG.get('pbs', 'queue')
walltime = CONFIG.get('pbs', 'walltime')
email = CONFIG.get('pbs', 'email')
py_file = pjoin(dirname(abspath(__file__)), 'classifier_workflow.py')
pbs_job = PBS_DSH.format(project=project, queue=queue,
walltime=walltime, ncpus=ncpus, mem=mem,
email=email, nnodes=nnodes,
modules=modules, pyfile=py_file)
# Output the shell script to disk
pbs_fname = pjoin(out_dir, CONFIG.get('outputs', 'pbs_filename'))
with open(pbs_fname, 'w') as out_file:
out_file.write(pbs_job)
#subprocess.check_call(['qsub', pbs_fname])
#!/usr/bin/env python
import luigi
import os
from os.path import join as pjoin, exists, dirname
import cPickle as pickle
import glob
import argparse
import logging
import pandas
import rasterio
from datacube.api.model import DatasetType
from zonal_stats import zonal_stats
from image_processing.segmentation import rasterise_vector
CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
class RasteriseTask(luigi.Task):
"""
Computes the rasterisation for a cell.
"""
out_dir = luigi.Parameter()
def requires(self):
return []
def output(self):
out_fname = pjoin(self.out_dir, 'RasteriseTask.completed')
return luigi.LocalTarget(out_fname)
def run(self):
out_fname = pjoin(self.out_dir,
CONFIG.get('outputs', 'rasterise_filename'))
ds_list_fname = pjoin(self.out_dir,
CONFIG.get('outputs', 'query_filename'))
with open(ds_list_fname, 'r') as infile:
ds_list = pickle.load(infile)
vector_fname = CONFIG.get('work', 'vector_filename')
img_fname = ds_list[0].datasets[DatasetType.FC25].path
with rasterio.open(img_fname) as src:
crs = src.crs
transform = src.affine
height = src.height
width = src.width
res = rasterise_vector(vector_fname, shape=(height, width),
transform=transform, crs=crs)
kwargs = {'count': 1,
'width': width,
'height': height,
'crs': crs,
'transform': transform,
'dtype': res.dtype.name,
'nodata': 0}
with rasterio.open(out_fname, 'w', **kwargs) as src:
src.write(1, res)
# We could just set the image as the Luigi completion target...
with self.output().open('w') as outf:
outf.write('Complete')
class StatsTask(luigi.Task):
"""
Computes a stats task for the required dataset.
"""
idx = luigi.IntParameter()
out_fname = luigi.Parameter()
def requires(self):
return [RasteriseTask(dirname(self.out_fname))]
def output(self):
return luigi.LocalTarget(self.out_fname)
def run(self):
rasterised_fname = pjoin(dirname(self.out_fname),
CONFIG.get('outputs', 'rasterise_filename'))
ds_list_fname = pjoin(dirname(self.out_fname),
CONFIG.get('outputs', 'query_filename'))
with open(ds_list_fname, 'r') as infile:
ds_list = pickle.load(infile)
dataset = ds_list[self.idx]
dataset_type = DatasetType.FC25
result = zonal_stats(dataset, rasterised_fname, dataset_type)
# Open the output hdf5 file
store = pandas.HDFStore(self.output().path)
# Write the dataframe
store.append('data', result)
# Save and close the file
store.close()
class CellStatsTask(luigi.Task):
"""
For a given cell define a stats task for each required DatasetType.
eg For each FC file.
"""
out_dir = luigi.Parameter()
def requires(self):
base_name = CONFIG.get('outputs', 'stats_filename_format')
base_name = pjoin(self.out_dir, base_name)
ds_list_fname = pjoin(self.out_dir,
CONFIG.get('outputs', 'query_filename'))
with open(ds_list_fname, 'r') as infile:
ds_list = pickle.load(infile)
targets = []
for idx, ds in enumerate(ds_list):
timestamp = bytes(ds.start_datetime).replace(' ', '-')
out_fname = base_name.format(timestamp)
targets.append(StatsTask(idx, out_fname))
return targets
def output(self):
out_fname = pjoin(self.out_dir, 'CellStatsTask.completed')
return luigi.LocalTarget(out_fname)
def run(self):
with self.output().open('w') as outf:
outf.write('Completed')
class CombineCellStatsTask(luigi.Task):
"""
Combines all stats files from a single cell into a single file.
"""
out_dir = luigi.Parameter()
def requires(self):
return [CellStatsTask(self.out_dir)]
def output(self):
out_fname = pjoin(self.out_dir, 'CombineCellStatsTask.completed')
return luigi.LocalTarget(out_fname)
def run(self):
# Get a list of the stats files for each timeslice
stats_files_list = glob.glob(pjoin(self.out_dir, '*.h5'))
# Create an output file that we can continually append data to
out_fname = pjoin(self.out_dir,
CONFIG.get('outputs',
'combined_cell_stats_filename'))
combined_store = pandas.HDFStore(out_fname)
store = pandas.HDFStore(stats_files_list[0])
# If there is nothing in the first file there will be nothing for
# every file
if '/data' in store.keys():
# We have data to retrieve
headings = store['data'].columns.tolist()
store.close()
df = pandas.DataFrame(columns=headings)
for sfile in stats_files_list:
store = pandas.HDFStore(sfile, 'r')
df = df.append(store['data'])
store.close()
df.reset_index(inplace=True)
# Write to disk
combined_store.append('data', df)
with self.output().open('w') as outf:
outf.write('Completed')
class RunCombineCellStatsTasks(luigi.Task):
"""
Issues CombineCellStatsTask's to each cell associated
with the tile defined by the start and end index.
"""
idx1 = luigi.IntParameter()
idx2 = luigi.IntParameter()
def requires(self):
base_out_dir = CONFIG.get('work', 'output_directory')
cells_list_fname = pjoin(base_out_dir,
CONFIG.get('outputs', 'cells_list'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
tasks = []
for cell in cells[self.idx1:self.idx2]:
out_dir = pjoin(base_out_dir, cell)
tasks.append(CombineCellStatsTask(out_dir))
return tasks
def output(self):
out_dir = CONFIG.get('work', 'output_directory')
out_fname = pjoin(out_dir,
'RunCombineCellStatsTasks_{}:{}.completed')
out_fname = out_fname.format(self.idx1, self.idx2)
return luigi.LocalTarget(out_fname)
def run(self):
with self.output().open('w') as outf:
outf.write('Completed')
if __name__ == '__main__':
# Setup command-line arguments
desc = "Processes zonal stats for a given set of cells."
hlp = ("The tile/chunk index to retieve from the tiles list. "
"(Needs to have been previously computed to a file named tiles.pkl")
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('--tile', type=int, help=hlp)
parsed_args = parser.parse_args()
tile_idx = parsed_args.tile
# setup logging
log_dir = CONFIG.get('work', 'logs_directory')
if not exists(log_dir):
os.makedirs(log_dir)
logfile = "{log_path}/stats_workflow_{uname}_{pid}.log"
logfile = logfile.format(log_path=log_dir, uname=os.uname()[1],
pid=os.getpid())
logging_level = logging.INFO
logging.basicConfig(filename=logfile, level=logging_level,
format=("%(asctime)s: [%(name)s] (%(levelname)s) "
"%(message)s "))
# Get the list of tiles (groups of cells) that each node will operate on
tiles_list_fname = pjoin(CONFIG.get('work', 'output_directory'),
CONFIG.get('outputs', 'tiles_list'))
with open(tiles_list_fname, 'r') as in_file:
tiles = pickle.load(in_file)
# Initialise the job
tile = tiles[tile_idx]
tasks = [RunCombineCellStatsTasks(tile[0], tile[1])]
luigi.build(tasks, local_scheduler=True, workers=16)
luigi.run()
#!/usr/bin/env python
import numpy
import gdal
import pandas
from datacube.api.model import DatasetType
from datacube.api.utils import get_dataset_data_with_pq
from eotools.drivers.stacked_dataset import StackedDataset
from eotools.geobox import GriddedGeoBox
from image_processing.segmentation import Segments
from idl_functions import histogram
def zonal_stats(dataset, rasterised_fname, dataset_type):
"""
Computes the Observed Count, Min, Max, Sum and Sum of Squares for
the segments defined by the rasterised image. The stats are
derived from the `dataset` defined by the `dataset_type`.
:param dataset:
A class of type `Dataset`.
:param rasterised_fname:
A string containing the full file pathname of an image
containing the rasterised features. These features will be
interpreted as segments.
:param dataset_type:
A class of type `DatasetType`.
:return:
A `pandas.DataFrame` containing the statistics for each segment
and for each raster band contained with the `dataset_type`.
"""
# Initialise a blank dataframe
headings = ["SID", "Timestamp", "Band", "Observed_Count", "Min", "Max",
"Sum", "Sum_of_Squares"]
df = pandas.DataFrame(columns=headings, dtype=numpy.float)
# Read the rasterised image
sd = StackedDataset(rasterised_fname)
img = sd.read_raster_band()
# Initialise the segment visitor
seg_vis = Segments(img)
# Do we have any data to analyse?
if seg_vis.n_segments == 0:
return df
# We need to get the PQ data and the DatasetType of interest
pq_ds = dataset.datasets[DatasetType.PQ25]
ds = dataset.datasets[dataset_type]
timestamp = dataset.start_datetime
bands = ds.bands
no_data = -999
for band in bands:
# Until the API gets a release of get_pq_mask this will have to do.
# It'll re-compute the PQ every time, which is not ideal;
# otherwise go back to eotools.
ds_data = (get_dataset_data_with_pq(ds, pq_ds, bands=[band],
ndv=no_data)[band]).astype('float')
# Set no-data to NaN
ds_data[ds_data == no_data] = numpy.nan
# Loop over each segment and get the data.
# In other instances we may just need the locations
for seg_id in seg_vis.segment_ids:
data = seg_vis.data(ds_data, segment_id=seg_id)
# dimensions of the data which will be 1D
dim = data.shape
# Returns are 1D arrays, so check if we have an empty array
if dim[0] == 0:
continue # Empty bin, (no data), skipping
# Compute the stats
count = numpy.sum(numpy.isfinite(data))
sum_ = numpy.nansum(data)
sum_sq = numpy.nansum(data**2)
min_ = numpy.nanmin(data)
max_ = numpy.nanmax(data)
format_dict = {"SID": seg_id,
"Timestamp": timestamp,
"Band": band.name,
"Observed_Count": count,
"Min": min_,
"Max": max_,
"Sum": sum_,
"Sum_of_Squares": sum_sq}
# Append the stat to the data frame
df = df.append(format_dict, ignore_index=True)
return df
def zonal_class_distribution(classified_image, zonal_image, class_ids=None):
"""
Calculates the classification distribution for each zone.
"""
# Initialise the segment visitor
seg_vis = Segments(zonal_image)
# Define the min/max class distribution if we didn't receive any class ID's
if class_ids is None:
min_class = 0
max_class = numpy.max(classified_image)
# if max_class == min_class:
# We have a completely unclassified image
# TODO Should we do anything???
# If someone is continually calling this routine to build up
# a table of results for numerous images based on the same
# classification schema/algorithm, then ideally they should provide
# a list of class IDs, otherwise there could be mismatching
# histograms.
else:
min_class = numpy.min(class_ids)
max_class = numpy.max(class_ids)
# Initialise the dict to store our results
results = {}
# base class name
cname_format = 'Class_{}'
# Retrieve the class distributions for each segment/zone
for zid in seg_vis.segment_ids:
data = seg_vis.data(classified_image, segment_id=zid)
# Skip invalid segments
if data.size == 0:
continue
results[zid] = histogram(data, minv=min_class,
maxv=max_class)['histogram']
results = pandas.DataFrame(results).transpose()
results['SID'] = results.index.values
results['PixelCount'] = seg_vis.histogram[results.index.values]
# convert the class names from ints to strings
for col in results.columns:
if type(col) != bytes:
cname = cname_format.format(col)
results.rename(columns={col: cname}, inplace=True)
return results
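# --- Illustrative sketch (not part of the original gist) ---
# The idea behind zonal_class_distribution() restated with plain pandas,
# i.e. without the Segments/histogram helpers: count how many pixels of
# each class fall within each zone of the rasterised vector. Uses the
# numpy/pandas imports at the top of this module; the arrays are synthetic.
def _toy_zonal_class_distribution(classified, zones):
    df = pandas.DataFrame({'zone': zones.ravel(), 'class': classified.ravel()})
    df = df[df['zone'] != 0]  # zone 0 lies outside all features
    return df.groupby(['zone', 'class']).size().unstack().fillna(0).astype(int)
# Example:
#   zones = numpy.array([[1, 1, 2], [1, 2, 2]])
#   classified = numpy.array([[3, 3, 2], [5, 2, 2]])
#   _toy_zonal_class_distribution(classified, zones)
#   # class  2  3  5
#   # zone
#   # 1      0  2  1
#   # 2      3  0  0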