Water body characterization across time
#!/usr/bin/env python
import os
from os.path import join as pjoin, dirname, basename
import cPickle as pickle
import argparse
import numpy
import luigi
import pandas
from datetime import datetime as dt
CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
def combine_all_cells():
# Config params
id_name = CONFIG.get('work', 'feature_name')
base_out_dir = CONFIG.get('work', 'output_directory')
base_out_fname = CONFIG.get('outputs', 'final_output_filename')
cells_list_fname = pjoin(base_out_dir,
CONFIG.get('outputs', 'cells_list'))
combined_cell_wc_fname = CONFIG.get('outputs',
'combined_cell_wc_filename')
vector_fname = CONFIG.get('work', 'vector_filename')
ngroups = int(CONFIG.get('internals', 'pandas_groups'))
chunksize = int(CONFIG.get('internals', 'pandas_chunksize'))
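    # Overview of the three passes below: (1) append every cell's results
    # into a single temporary store, (2) re-shard that store into `ngroups`
    # groups keyed on id % ngroups, (3) groupby/sum each group and append
    # the result to the final store.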
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
headings = [id_name, "Timestamp", "Total_Pixel_Count", "WATER_NOT_PRESENT",
"NO_DATA", "MASKED_NO_CONTIGUITY",
"MASKED_SEA_WATER", "MASKED_TERRAIN_SHADOW",
"MASKED_HIGH_SLOPE", "MASKED_CLOUD_SHADOW",
"MASKED_CLOUD", "WATER_PRESENT"]
items = ['Timestamp', id_name]
tmp1_fname = pjoin(base_out_dir, 'tmp_combined_results.h5')
tmp1_store = pandas.HDFStore(tmp1_fname)
# TODO Need to remove the index column before we write the first
# combined file
for cell in cells:
cell_dir = pjoin(base_out_dir, cell)
cell_wc_fname = pjoin(cell_dir, combined_cell_wc_fname)
# Open the current cells WC result file
store = pandas.HDFStore(cell_wc_fname, 'r')
if '/data' in store.keys():
# We have data to retrieve
df = store['data']
            df.drop('index', axis=1, inplace=True)
tmp1_store.append('data', df, index=False)
tmp1_store.flush()
store.close()
tmp1_store.close()
# Another method
# http://stackoverflow.com/questions/25459982/trouble-with-grouby-on-millions-of-keys-on-a-chunked-file-in-python-pandas/25471765#25471765
# http://stackoverflow.com/questions/15798209/pandas-group-by-query-on-large-data-in-hdfstore/15800314#15800314
    # We need to group as many records as we can into more memory-manageable
    # chunks that also balance well for I/O
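    # Sketch of the sharding with hypothetical values: for ngroups = 3,
    # ids 0..8 fall into buckets {0: [0, 3, 6], 1: [1, 4, 7], 2: [2, 5, 8]};
    # an id always lands in the same bucket, so the later groupby/sum only
    # needs one bucket in memory at a time.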
tmp1_store = pandas.HDFStore(tmp1_fname)
tmp2_fname = pjoin(base_out_dir, 'tmp_grouped_results.h5')
tmp2_store = pandas.HDFStore(tmp2_fname)
for chunk in tmp1_store.select('data', chunksize=chunksize):
g = chunk.groupby(chunk[id_name] % ngroups)
for grp, grouped in g:
tmp2_store.append('group_{}'.format(int(grp)), grouped,
data_columns=headings, index=False)
tmp2_store.flush()
tmp1_store.close()
tmp2_store.close()
# Define the output file
out_fname = pjoin(base_out_dir, base_out_fname)
combined_store = pandas.HDFStore(out_fname, 'w')
# Now read the grouped data and write to disk
tmp2_store = pandas.HDFStore(tmp2_fname)
for key in tmp2_store.keys():
df = tmp2_store[key].groupby(items, as_index=False).sum()
combined_store.append('data', df, data_columns=headings)
combined_store.flush()
# Add metadata
metadata_group = '/Metadata'
metadata = {'Vector Filename': basename(vector_fname)}
metadata = pandas.DataFrame(metadata, index=[0])
combined_store[metadata_group] = metadata
# Save and close the file
combined_store.close()
tmp1_store.close()
tmp2_store.close()
# Clean up temporary files
os.remove(tmp1_fname)
os.remove(tmp2_fname)
if __name__ == '__main__':
combine_all_cells()
[work]
output_directory = /g/data/v10/testing_ground/jps547/water_characterisation/FullRun2
logs_directory = %(output_directory)s/logs
vector_filename = /g/data/v10/testing_ground/jps547/water_characterisation/Waterbodies.shp
feature_name = AUSHYDRO_ID
[internals]
envelope = 0
pattern = *.tif
cells_per_node = 10
pandas_groups = 10
pandas_chunksize = 1000000
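# pandas_groups: number of shards used when combining results
# (rows are grouped by id % pandas_groups)
# pandas_chunksize: upper bound on rows read per HDFStore.select iteration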
water_directory = /g/data/u46/wofs/water_f7s/extents
cell_grid = /short/v10/jps547/DatacubeCellGrid/Cell_Grid_WGS84.shp
[outputs]
cells_list = cells_to_process.pkl
tiles_list = tiles.pkl
pbs_filename = run_wc_pbsdsh.bash
water_files_list = water_files.pkl
water_characterisation_out_format = wc_result_{}.h5
combined_cell_wc_filename = combined_cell_wc_results.h5
rasterise_filename = rasterised_result.tif
final_output_filename = water_characterisation_results.h5
groups_filename_format = tmp_group_{}.h5
#[core]
#logging_conf_file: /home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/logging.cfg
[pbs]
project = v10
queue = normal
walltime = 05:00:00
email = joshua.sixsmith@ga.gov.au
modules = python/2.7.6,gdal/1.10.1,agdc-api,pandas,IDL_functions/0.2,gaip/test
[loggers]
keys=root
[handlers]
keys=consoleHandler
[formatters]
keys=simpleFormatter
[logger_root]
level=ERROR
handlers=consoleHandler
[handler_consoleHandler]
class=StreamHandler
level=ERROR
formatter=simpleFormatter
args=(sys.stderr,)
[formatter_simpleFormatter]
format=%(levelname)s: %(message)s
#!/usr/bin/env python
import os
import glob
import sys
import logging
import argparse
# Debugging
import datetime
import pdb
import numpy
from osgeo import gdal
from osgeo import ogr
import pandas
# ga-neo-nfrip repo
from WaterExtent import WaterExtent
from fileSystem import Directory
# IDL_functions repo
from IDL_functions import histogram
# image_processing repo
from image_processing.segmentation.rasterise import Rasterise
from image_processing.segmentation.segmentation import SegmentVisitor
def getFiles(path, pattern):
"""
    Just an internal function to find files given a file extension.
This isn't really designed to go beyond development demonstration
for this analytical workflow.
"""
# Get the current directory so we can change back later
CWD = os.getcwd()
os.chdir(path)
# Find the files matching the pattern
files = glob.glob(pattern)
files = [os.path.abspath(i) for i in files]
# Change back to the original directory
os.chdir(CWD)
return files
def getWaterExtents(file_list, sort=True):
"""
    Given a list of water extent image files, create a list of
    WaterExtent class objects, sorted by time (old -> new) if
    sort=True.
:param file_list:
A list containing filepath names to water extent image files.
:param sort:
A boolean keyword indicating if the waterExtent objects
should be sorted before they're returned. Default is True.
:return:
A list of waterExtent class objects, one for every water
image file in file_list, and optionally sorted by date,
old -> new.
"""
waterExtents = []
cellId = None
for f in file_list:
waterExtent = WaterExtent(f)
# check for lon, lat consistency
if cellId:
thisCellId = [waterExtent.lon, waterExtent.lat]
if thisCellId != cellId:
msg = ("Extents must be from same cell. At file {} got {}, "
"expecting {}")
msg = msg.format(f, thisCellId, cellId)
logging.error(msg)
sys.exit(1)
else:
cellId = [waterExtent.lon, waterExtent.lat]
waterExtents.append(waterExtent)
if sort:
# all good, so now sort the extents by datetime
logging.info("Collected %d files. Now sort them." % len(file_list))
sortedWaterExtents = sorted(waterExtents, key=lambda extent: extent.getDatetime())
return (sortedWaterExtents, cellId)
else:
msg = "Collected {} files. Sorting not applied.".format(len(file_list))
logging.info(msg)
return (waterExtents, cellId)
# Define the main process that will work in a tiling fashion, i.e. process a
# chunk at a time rather than all at once
# Alternatively we do this through luigi
def tiled_main(vector_file, cell_list, indir, outdir, pattern, logpath):
"""
"""
    # setup logging ... log to <logpath>/waterExtentVectorSummary_<hostname>_<pid>.log
log_file = "waterExtentVectorSummary_{}_{}.log".format(os.uname()[1],
os.getpid())
logPath = os.path.join(logpath, log_file)
logging.basicConfig(filename=logPath,
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%d/%m/%Y %H:%M:%S', level=logging.INFO)
baseOutputDir = Directory(outdir)
if not baseOutputDir.exists():
logging.error("%s does not exist" % baseOutputDir.getPath())
sys.exit(1)
logging.info("Opening vector file %s" %vector_file)
vec_ds = ogr.Open(vector_file)
layer = vec_ds.GetLayer()
# Initialise dicts to hold feature names, and hydro_id
feature_names = {}
hydro_id = {}
# Dicts to hold forward and backward mapping of fid's and seg id's
seg2fid = {}
fid2seg = {}
fid_list = []
fid_df = {}
logging.info("Gathering attribute information for each feature.")
# These Field Id's are unique to NGIG's vector datasets
for feature in layer:
fid = feature.GetFID()
feature_names[fid] = feature.GetField("NAME")
hydro_id[fid] = feature.GetField("AUSHYDRO_I")
seg2fid[fid+1] = fid
fid2seg[fid] = fid + 1
fid_list.append(fid)
fid_df[fid] = pandas.DataFrame()
# Initialise the dataframe to store the results
df = pandas.DataFrame()
df['FID'] = fid_list
nfeatures = len(fid_list)
min_fid = df['FID'].min()
max_fid = df['FID'].max()
# We offset the min and max fid's by 1 as the rasterisation will be
# created that way. The array of shape (10) is arbitrary
h = histogram(numpy.zeros((10), dtype='int32'), Max=max_fid+1,
Min=min_fid+1)
# This will be used as the input keyword and changes will be made in place
t_area = h['histogram']
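    # The accumulation works because histogram() with input=t_area adds each
    # cell's per-segment counts into t_area in place; after the loop t_area
    # holds the total pixel count per segment id across all cells.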
# Create an output file that we can continually append data
store = pandas.HDFStore(os.path.join(outdir, 'Test_Results.h5'))
for cell in cell_list:
logging.info("Processing Cell ID: {}".format(cell))
celldir = os.path.join(indir, cell)
# Process the current tile/chunk which in this case is a datacube cell
result_df = tiled_processing(vector_file, t_area, min_fid, max_fid,
celldir, pattern)
# We don't need to define columns up front
# We can define an empty dataframe and append the data
        # That way columns can be defined within the script
for key in result_df:
# Group names shouldn't start with a number
group_name = "FID_{}".format(key)
store.append(group_name, result_df[key])
# Combine FIDs with identical timestamps and sum the pixel counts
# Including the hydro_id and fid as groupby's should exclude them from
# the summation.
# The filename and Feature Name fields will be removed as a result of the
# summation. Feature Name could potentially be kept
group_items = ['Time Stamp', 'AUSHYDRO_ID', 'FID']
for key in store.keys():
store[key] = store[key].groupby(group_items).sum()
# Save and close the file
store.close()
def tiled_processing(vector_file, input_hist, Min_id, Max_id, indir, pattern):
"""
The main processing routine.
:param indir:
A string containing the file system pathname to a directory
containing the water extent image files.
:param outdir:
A string containing the file system pathname to a directory
that will contain the result output.
:param logpath:
A string containing the file system pathname to a directory
that will contain the operation system logging information.
:param pattern:
A string containing the image extents file extension pattern,
eg '*.tif'.
:param vector_file:
        A string containing the file system pathname to an OGR
compatible vector file.
    :param outfname:
        A string containing the file system pathname for the output
csv file.
:return:
        A dict keyed by FID, containing a pandas DataFrame of
        results for each feature.
"""
# Get a list of water_extent files
files = getFiles(indir, pattern)
# Get the water_extent objects and sort them by date
sortedWaterExtents, cellId = getWaterExtents(files)
# lat and lon will be helpful
lon = cellId[0]
lat = cellId[1]
# Rasterise the features
# We can use the first image file as the base
# Rasterising the feature id's will be fid + 1
# eg an fid of 10 is a segment id of 11, this allows for 0 to be the fill
# value
segments_ds = Rasterise(RasterFilename=files[0], VectorFilename=vector_file)
logging.info("Rasterising features.")
segments_ds.rasterise()
# Extract the array
    veg2rast = segments_ds.segmented_array
# Initialise the segment visitor
seg_vis = SegmentVisitor(veg2rast)
# Update the total area (recursive histogram technique)
# input keyword modifies in-place
recursive_h = histogram(veg2rast.ravel(), input=input_hist, Min=Min_id,
Max=Max_id)
# Get specific attribute records
logging.info("Opening vector file %s" %vector_file)
vec_ds = ogr.Open(vector_file)
layer = vec_ds.GetLayer()
# Define the headings for the data frame
headings = ["Filename", "Time Stamp", "Feature Name", "AUSHYDRO_ID",
"FID", "Total Pixel Count", "WATER_NOT_PRESENT",
"NO_DATA", "MASKED_NO_CONTIGUITY",
"MASKED_SEA_WATER", "MASKED_TERRAIN_SHADOW",
"MASKED_HIGH_SLOPE", "MASKED_CLOUD_SHADOW",
"MASKED_CLOUD", "WATER_PRESENT"]
# Initialise dicts to hold feature names, hydro_id and data frame
feature_names = {}
hydro_id = {}
fid_df = {}
# Dicts to hold forward and backward mapping of fid's and segment id's
# seg = segment
# fid = feature id
seg2fid = {}
fid2seg = {}
logging.info("Gathering attribute information for each feature.")
# These Field Id's are unique to NGIG's vector datasets
for feature in layer:
fid = feature.GetFID()
feature_names[fid] = feature.GetField("NAME")
hydro_id[fid] = feature.GetField("AUSHYDRO_I")
seg2fid[fid+1] = fid
fid2seg[fid] = fid + 1
fid_df[fid] = pandas.DataFrame(columns=headings)
# Go back to the start of the vector file
layer.ResetReading()
    # Replace any occurrences of None with UNKNOWN
    for key in feature_names:
        if feature_names[key] is None:
            feature_names[key] = 'UNKNOWN'
# Loop over each WaterExtent file
for waterExtent in sortedWaterExtents:
logging.info("Processing %s" % waterExtent.filename)
# Read the waterLayer from the extent file
waterLayer = waterExtent.getArray()
# timestamp
timestamp = waterExtent.getDatetime()
# Loop over each feature Id
# Skip any FID's that don't exist in the current spatial extent
for key in fid2seg:
if fid2seg[key] > seg_vis.max_segID:
continue
data = seg_vis.getSegmentData(waterLayer, segmentID=fid2seg[key])
# dimensions of the data which will be 1D
dim = data.shape
# Returns are 1D arrays, so check if we have an empty array
if dim[0] == 0:
continue # Empty bin, (no data), skipping
FID = key
# Characterise the polygon's makeup based on the WoFS data.
h = histogram(data, Min=0, Max=128)
hist = h['histogram']
# Area is the number of pixels
total_area = dim[0]
"""
A WaterTile stores 1 data layer encoded as unsigned BYTE values as described in the WaterConstants.py file.
Note - legal (decimal) values are:
0: no water in pixel
1: no data (one or more bands) in source NBAR image
2-127: pixel masked for some reason (refer to MASKED bits)
128: water in pixel
Values 129-255 are illegal (i.e. if bit 7 set, all others must be unset)
WATER_PRESENT (dec 128) bit 7: 1=water present, 0=no water if all other bits zero
MASKED_CLOUD (dec 64) bit 6: 1=pixel masked out due to cloud, 0=unmasked
MASKED_CLOUD_SHADOW (dec 32) bit 5: 1=pixel masked out due to cloud shadow, 0=unmasked
MASKED_HIGH_SLOPE (dec 16) bit 4: 1=pixel masked out due to high slope, 0=unmasked
MASKED_TERRAIN_SHADOW (dec 8) bit 3: 1=pixel masked out due to terrain shadow, 0=unmasked
MASKED_SEA_WATER (dec 4) bit 2: 1=pixel masked out due to being over sea, 0=unmasked
MASKED_NO_CONTIGUITY (dec 2) bit 1: 1=pixel masked out due to lack of data contiguity, 0=unmasked
NO_DATA (dec 1) bit 0: 1=pixel masked out due to NO_DATA in NBAR source, 0=valid data in NBAR
            WATER_NOT_PRESENT (dec 0) All bits zero indicates a valid observation, no water present
"""
            # [0..128] bins were generated, i.e. 129 bins
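            # Note: hist[k] counts pixels whose byte value is exactly k, so
            # the single-bit lookups below tally pixels where only that mask
            # bit is set; combined mask values (e.g. 66 = cloud +
            # no-contiguity) fall into other bins.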
WATER_NOT_PRESENT = hist[0]
NO_DATA = hist[1]
MASKED_NO_CONTIGUITY = hist[2]
MASKED_SEA_WATER = hist[4]
MASKED_TERRAIN_SHADOW = hist[8]
MASKED_HIGH_SLOPE = hist[16]
MASKED_CLOUD_SHADOW = hist[32]
MASKED_CLOUD = hist[64]
WATER_PRESENT = hist[128]
format_dict = {'Filename': waterExtent.filename,
'Time Stamp': timestamp,
'Feature Name': feature_names[key],
'AUSHYDRO_ID': hydro_id[key],
'FID': FID,
'Total Pixel Count': total_area,
'WATER_NOT_PRESENT': WATER_NOT_PRESENT,
'NO_DATA': NO_DATA,
'MASKED_NO_CONTIGUITY': MASKED_NO_CONTIGUITY,
'MASKED_SEA_WATER': MASKED_SEA_WATER,
'MASKED_TERRAIN_SHADOW': MASKED_TERRAIN_SHADOW,
'MASKED_HIGH_SLOPE': MASKED_HIGH_SLOPE,
'MASKED_CLOUD_SHADOW': MASKED_CLOUD_SHADOW,
'MASKED_CLOUD': MASKED_CLOUD,
'WATER_PRESENT': WATER_PRESENT}
# Append the new data to the FID data frame
fid_df[FID] = fid_df[FID].append(format_dict, ignore_index=True)
return fid_df
if __name__ == '__main__':
description = 'Polygon characterisation of WoFS data through time.'
parser = argparse.ArgumentParser(description)
parser.add_argument('--outdir', dest="baseOutputPath", help="Output base directory.", required=True)
parser.add_argument('--log', dest="logPath", help="Directory where log files will be written.", required=True)
parser.add_argument('--indir', required=True, help="Input directory containing water extent files.")
parser.add_argument('--sfx', default='*.tif', help="File suffix to search for. Default is '*.tif'")
parser.add_argument('--vector', required=True, help="An OGR compatible vector file.")
parser.add_argument('--outname', default='WaterExtentVectorSummary.csv', help="The name of the output file to contain the summary. Default is 'WaterExtentVectorSummary.csv'.")
# Collect the arguments
args = parser.parse_args()
# Retrieve command arguments
baseOutputPath = args.baseOutputPath
log = args.logPath
path = args.indir
pattern = args.sfx
vector_file = args.vector
outfname = args.outname
# For testing we're defining the cells to process.
# In future we'll get these returned from the API
# Run
#cell_list = ['144_-041', '145_-041', '147_-041', '148_-041', '145_-042',
# '146_-042', '147_-042', '148_-042', '145_-043', '146_-043',
# '147_-043', '146_-044']
#cell_list = ['144_-041']
cell_list = ['146_-037']
tiled_main(vector_file, cell_list, path, baseOutputPath, pattern, log)
#!/usr/bin/env python
import os
from os.path import join as pjoin, dirname, basename
import cPickle as pickle
import argparse
import numpy
import luigi
import pandas
from datetime import datetime as dt
from water_body_characterisation import get_files
CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
class WriteGroupTask(luigi.Task):
group_id = luigi.IntParameter()
def requires(self):
return []
def output(self):
out_fname = pjoin(CONFIG.get('work', 'output_directory'),
'WriteGroupTask_{}.completed'.format(self.group_id))
return luigi.LocalTarget(out_fname)
def run(self):
# Config params
id_name = CONFIG.get('work', 'feature_name')
base_out_dir = CONFIG.get('work', 'output_directory')
cells_list_fname = pjoin(base_out_dir,
CONFIG.get('outputs', 'cells_list'))
combined_cell_wc_fname = CONFIG.get('outputs',
'combined_cell_wc_filename')
        ngroups = int(CONFIG.get('internals', 'pandas_groups'))
        chunksize = int(CONFIG.get('internals', 'pandas_chunksize'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
# Define base headings
headings = [id_name, "Timestamp", "Total_Pixel_Count",
"WATER_NOT_PRESENT",
"NO_DATA", "MASKED_NO_CONTIGUITY",
"MASKED_SEA_WATER", "MASKED_TERRAIN_SHADOW",
"MASKED_HIGH_SLOPE", "MASKED_CLOUD_SHADOW",
"MASKED_CLOUD", "WATER_PRESENT"]
# Define the output store
out_format = CONFIG.get('outputs', 'groups_filename_format')
store_fname = pjoin(base_out_dir, out_format.format(self.group_id))
group_store = pandas.HDFStore(store_fname)
for cell in cells:
cell_dir = pjoin(base_out_dir, cell)
cell_wc_fname = pjoin(cell_dir, combined_cell_wc_fname)
# Open the current cells WC result file
store = pandas.HDFStore(cell_wc_fname, 'r')
if '/data' in store.keys():
groups = store['data'].groupby(store['data'][id_name]
% ngroups)
else:
store.close()
continue
for grp, grouped in groups:
if grp != self.group_id:
continue
group_store.append('data', grouped, data_columns=headings,
index=False)
store.close()
group_store.close()
with self.output().open('w') as outf:
outf.write('completed')
class ManageAllGroupWriteTasks(luigi.Task):
ngroups = luigi.IntParameter()
def requires(self):
tasks = []
for i in range(self.ngroups):
tasks.append(WriteGroupTask(i))
return tasks
def output(self):
out_fname = pjoin(CONFIG.get('work', 'output_directory'),
'ManageAllGroupWriteTasks.completed')
return luigi.LocalTarget(out_fname)
def run(self):
with self.output().open('w') as outf:
outf.write('completed')
class CombineAllGroups(luigi.Task):
ngroups = luigi.IntParameter()
def requires(self):
return [ManageAllGroupWriteTasks(self.ngroups)]
def output(self):
out_fname = pjoin(CONFIG.get('work', 'output_directory'),
'CombineAllGroups.completed')
return luigi.LocalTarget(out_fname)
def run(self):
id_name = CONFIG.get('work', 'feature_name')
out_dir = CONFIG.get('work', 'output_directory')
out_format = CONFIG.get('outputs', 'groups_filename_format')
files = get_files(out_dir, out_format.format('*'))
base_out_fname = CONFIG.get('outputs', 'final_output_filename')
out_fname = pjoin(out_dir, base_out_fname)
combined_store = pandas.HDFStore(out_fname, 'w')
# Define base headings
headings = [id_name, "Timestamp", "Total_Pixel_Count",
"WATER_NOT_PRESENT",
"NO_DATA", "MASKED_NO_CONTIGUITY",
"MASKED_SEA_WATER", "MASKED_TERRAIN_SHADOW",
"MASKED_HIGH_SLOPE", "MASKED_CLOUD_SHADOW",
"MASKED_CLOUD", "WATER_PRESENT"]
items = ['Timestamp', id_name]
for f in files:
store = pandas.HDFStore(f, 'r')
if not '/data' in store.keys():
store.close()
continue
df = store['data'].groupby(items, as_index=False).sum()
combined_store.append('data', df, data_columns=headings)
store.close()
combined_store.close()
with self.output().open('w') as outf:
outf.write('completed')
if __name__ == '__main__':
    ngroups = int(CONFIG.get('internals', 'pandas_groups'))
    luigi.build([CombineAllGroups(ngroups)], local_scheduler=True, workers=16)
#!/bin/bash
#PBS -P v10
#PBS -q express
#PBS -l walltime=05:00:00,mem=32GB,ncpus=16
#PBS -l wd
#PBS -me
#PBS -M joshua.sixsmith@ga.gov.au
module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load gdal/1.10.1
module load IDL_functions/0.2
module load gaip/test
module load agdc-api
module load pandas
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/common
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/water
PY_FILE=/home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/parallel_combine.py
python $PY_FILE
#!/bin/bash
#PBS -P v10
#PBS -q normal
#PBS -l walltime=10:00:00,mem=1600GB,ncpus=800
#PBS -l wd
#PBS -me
#PBS -M joshua.sixsmith@ga.gov.au
module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load gdal/1.10.1
module load gaip/test
module load agdc-api
module load pandas
module unload IDL_functions
module load IDL_functions/0.2
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/common
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/water
PY_FILE=/home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/workflow.py
mpirun -n 16 python $PY_FILE
#!/bin/bash
#PBS -P v10
#PBS -q express
#PBS -l walltime=05:00:00,mem=16GB,ncpus=1
#PBS -l wd
#PBS -me
#PBS -M joshua.sixsmith@ga.gov.au
module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load gdal/1.10.1
module load IDL_functions/0.2
module load gaip/test
module load agdc-api
module load pandas
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/common
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/water
PY_FILE=/home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/combine.py
python $PY_FILE
#!/bin/bash
#PBS -P v10
#PBS -q express
#PBS -l walltime=00:10:00,mem=3GB,ncpus=1
#PBS -l wd
#PBS -me
#PBS -M joshua.sixsmith@ga.gov.au
module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load gdal/1.10.1
module load IDL_functions/0.2
module load gaip/test
module load agdc-api
module load pandas
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/common
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/water
PY_FILE=/home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/submit_query.py
python $PY_FILE
#!/usr/bin/env python
import luigi
import logging
import os
from os.path import join as pjoin, exists, dirname
import cPickle as pickle
import subprocess
import ogr
from water_body_characterisation import spatial_intersection
from water_body_characterisation import get_files
CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
PBS_DSH = (
"""#!/bin/bash
#PBS -P {project}
#PBS -q {queue}
#PBS -l walltime={walltime},ncpus={ncpus},mem={mem}GB,jobfs=350GB
#PBS -l wd
#PBS -me
#PBS -M {email}
NNODES={nnodes}
for i in $(seq 1 $NNODES); do
pbsdsh -n $((16 *$i)) -- bash -l -c "{modules} PBS_NNODES=$NNODES PBS_VNODENUM=$i python {pyfile} \
--tile $[$i - 1]" &
done;
wait
""")
# DEPRECATED
class CellQueryTask(luigi.Task):
"""
    A task to generate a list of which cells will be analysed
    throughout the workflow.
    DEPRECATED in favour of cell_query().
"""
def requires(self):
return []
def output(self):
out_dir = CONFIG.get('work', 'output_directory')
out_fname = pjoin(out_dir, CONFIG.get('outputs', 'cells_list'))
return luigi.LocalTarget(out_fname)
def run(self):
out_dir = CONFIG.get('work', 'output_directory')
        envelope = bool(int(CONFIG.get('internals', 'envelope')))
vector_fname = CONFIG.get('work', 'vector_filename')
dcube_vector_fname = CONFIG.get('internals', 'cell_grid')
fids = spatial_intersection(dcube_vector_fname, vector_fname,
envelope=envelope)
vec_ds = ogr.Open(dcube_vector_fname)
layer = vec_ds.GetLayer(0)
cell_dirs = []
for fid in fids:
feat = layer.GetFeature(fid)
cell_dir = "{:03.0f}_{:04.0f}".format(feat.GetField("XMIN"),
feat.GetField("YMIN"))
cell_dirs.append(cell_dir)
out_cell_dir = pjoin(out_dir, cell_dir)
if not exists(out_cell_dir):
os.makedirs(out_cell_dir)
with self.output().open('w') as outf:
pickle.dump(cell_dirs, outf)
# DEPRECATED
class FindWaterFilesTask(luigi.Task):
"""
Find the WaterExtents files and generate a task list for a
cell.
DEPRECATED in favour of find_water_files().
"""
data_dir = luigi.Parameter()
out_dir = luigi.Parameter()
def requires(self):
return []
def output(self):
out_fname = pjoin(self.out_dir,
CONFIG.get('outputs', 'water_files_list'))
return luigi.LocalTarget(out_fname)
def run(self):
pattern = CONFIG.get('work', 'pattern')
files = get_files(self.data_dir, pattern)
with self.output().open('w') as out_file:
pickle.dump(files, out_file)
# DEPRECATED
class AllCellsFindFilesTask(luigi.Task):
"""
A helper task that kicks off file listings for each cell.
DEPRECATED in favour of find_water_files().
"""
water_dir = CONFIG.get('internals', 'water_directory')
base_out_dir = CONFIG.get('work', 'output_directory')
def requires(self):
cells_list_fname = pjoin(self.base_out_dir,
CONFIG.get('outputs', 'cells_list'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
tasks = []
for cell in cells:
data_dir = pjoin(self.water_dir, cell)
out_dir = pjoin(self.base_out_dir, cell)
tasks.append(FindWaterFilesTask(data_dir, out_dir))
return tasks
def output(self):
out_fname = pjoin(self.base_out_dir, 'AllCellsFindFilesTask.completed')
return luigi.LocalTarget(out_fname)
def run(self):
with self.output().open('w') as outf:
outf.write('Completed')
def cell_query():
"""
    A simple procedure to generate a list of the cells that will
    be analysed throughout the workflow.
"""
out_dir = CONFIG.get('work', 'output_directory')
    envelope = bool(int(CONFIG.get('internals', 'envelope')))
vector_fname = CONFIG.get('work', 'vector_filename')
dcube_vector_fname = CONFIG.get('internals', 'cell_grid')
out_fname = pjoin(out_dir, CONFIG.get('outputs', 'cells_list'))
fids = spatial_intersection(dcube_vector_fname, vector_fname,
envelope=envelope)
vec_ds = ogr.Open(dcube_vector_fname)
layer = vec_ds.GetLayer(0)
cell_dirs = []
for fid in fids:
feat = layer.GetFeature(fid)
cell_dir = "{:03.0f}_{:04.0f}".format(feat.GetField("XMIN"),
feat.GetField("YMIN"))
cell_dirs.append(cell_dir)
out_cell_dir = pjoin(out_dir, cell_dir)
if not exists(out_cell_dir):
os.makedirs(out_cell_dir)
with open(out_fname, 'w') as outf:
pickle.dump(cell_dirs, outf)
def find_water_files():
"""
A simple procedure that finds water files from each cell that
needs to be analysed.
"""
base_out_dir = CONFIG.get('work', 'output_directory')
water_dir = CONFIG.get('internals', 'water_directory')
pattern = CONFIG.get('internals', 'pattern')
base_out_fname = CONFIG.get('outputs', 'water_files_list')
cells_list_fname = pjoin(base_out_dir, CONFIG.get('outputs', 'cells_list'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
for cell in cells:
data_dir = pjoin(water_dir, cell)
out_dir = pjoin(base_out_dir, cell)
out_fname = pjoin(out_dir, base_out_fname)
files = get_files(data_dir, pattern)
with open(out_fname, 'w') as out_file:
pickle.dump(files, out_file)
def create_tiles(array_size, tile_size=25):
"""
A minor function to tile a 1D array or list into smaller manageable
portions.
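    For example, create_tiles(100, tile_size=25) returns
    [(0, 25), (25, 50), (50, 75), (75, 100)].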
"""
idx_start = range(0, array_size, tile_size)
idx_tiles = []
for idx_st in idx_start:
if idx_st + tile_size < array_size:
idx_end = idx_st + tile_size
else:
idx_end = array_size
idx_tiles.append((idx_st, idx_end))
return idx_tiles
if __name__ == '__main__':
# Create the output directory
out_dir = CONFIG.get('work', 'output_directory')
if not exists(out_dir):
os.makedirs(out_dir)
# setup logging
log_dir = CONFIG.get('work', 'logs_directory')
if not exists(log_dir):
os.makedirs(log_dir)
logfile = "{log_path}/run_wc_{uname}_{pid}.log"
logfile = logfile.format(log_path=log_dir, uname=os.uname()[1],
pid=os.getpid())
logging_level = logging.INFO
logging.basicConfig(filename=logfile, level=logging_level,
format=("%(asctime)s: [%(name)s] (%(levelname)s) "
"%(message)s "))
    # Just turn these into simple function calls. In this framework we don't
    # need a luigi workload for a simple single task
cell_query()
find_water_files()
cells_list_fname = pjoin(out_dir, CONFIG.get('outputs', 'cells_list'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
# Create the tiles list that contains groups of cells to operate on
# Each node will have a certain number of cells to work on
cpnode = int(CONFIG.get('internals', 'cells_per_node'))
tiles = create_tiles(len(cells), cpnode)
tiles_list_fname = pjoin(out_dir, CONFIG.get('outputs', 'tiles_list'))
with open(tiles_list_fname, 'w') as out_file:
pickle.dump(tiles, out_file)
# Setup the modules to use for the job
modules = CONFIG.get('pbs', 'modules').split(',')
modules = ['module load {}; '.format(module) for module in modules]
modules.insert(0, 'module use /projects/u46/opt/modules/modulefiles;')
modules = ''.join(modules)
# Calculate the job node and cpu requirements
nnodes = len(tiles)
ncpus = nnodes * 16
mem = nnodes * 32
# Populate the PBS shell script
project = CONFIG.get('pbs', 'project')
queue = CONFIG.get('pbs', 'queue')
walltime = CONFIG.get('pbs', 'walltime')
email = CONFIG.get('pbs', 'email')
py_file = pjoin(dirname(__file__), 'workflow.py')
pbs_job = PBS_DSH.format(project=project, queue=queue,
walltime=walltime, ncpus=ncpus, mem=mem,
email=email, nnodes=nnodes,
modules=modules, pyfile=py_file)
    # Output the shell script to disk
pbs_fname = pjoin(out_dir, CONFIG.get('outputs', 'pbs_filename'))
with open(pbs_fname, 'w') as out_file:
out_file.write(pbs_job)
subprocess.check_call(['qsub', pbs_fname])
#!/usr/bin/env python
import os
import glob
import sys
import logging
# Debugging
import datetime
#import pdb
import numpy
from osgeo import gdal
from osgeo import ogr
import pandas
# ga-neo-nfrip repo
# Until a module is setup we'll define a temp PYTHONPATH
sys.path.append('/home/547/jps547/git_repositories/ga-neo-nfrip/system/common')
sys.path.append('/home/547/jps547/git_repositories/ga-neo-nfrip/system/water')
from WaterExtent import WaterExtent
from fileSystem import Directory
# IDL_functions repo
from IDL_functions import histogram
# image_processing repo
from image_processing.segmentation.rasterise import Rasterise
from image_processing.segmentation.segmentation import SegmentVisitor
from gaip import GriddedGeoBox
from gaip import read_img
from gaip import write_img
def spatial_intersection(input_vector_fname, base_vector_fname, envelope=True):
"""
    Performs a spatial intersection of feature geometries and
    returns a list of FIDs from the input vector file.
:param input_vector_fname:
A string containing the full file path name to an
OGR compliant vector file.
:param base_vector_fname:
A string containing the full file path name to an
        OGR compliant vector file. Features from this file are used
        as the spatial filter.
:param envelope:
If set to True (Default), then the envelope of each feature
will be used rather than the geometry of the feature to perform
intersection.
:return:
        A `list` containing the FIDs of the input vector.
"""
vec_ds1 = ogr.Open(input_vector_fname)
vec_ds2 = ogr.Open(base_vector_fname)
lyr1 = vec_ds1.GetLayer()
lyr2 = vec_ds2.GetLayer()
fids = []
if envelope:
for feat2 in lyr2:
geom = feat2.GetGeometryRef()
xmin, xmax, ymin, ymax = geom.GetEnvelope()
lyr1.SetSpatialFilterRect(xmin, ymin, xmax, ymax)
for feat1 in lyr1:
fids.append(feat1.GetFID())
lyr1.SetSpatialFilter(None)
else:
for feat2 in lyr2:
ref = feat2.GetGeometryRef()
lyr1.SetSpatialFilter(ref)
for feat1 in lyr1:
fids.append(feat1.GetFID())
lyr1.SetSpatialFilter(None)
fids = numpy.unique(numpy.array(fids)).tolist()
return fids
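# Hypothetical usage sketch (filenames are illustrative, taken from the
# config above):
# fids = spatial_intersection('Cell_Grid_WGS84.shp', 'Waterbodies.shp',
#                             envelope=True)
# returns the unique FIDs of grid cells whose envelopes intersect any
# waterbody feature.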
def rasterise_vector(vector_filename, image_filename, out_filename):
"""
Given a vector and an image, rasterise the vector using the
images dimensions and spatial extents as a base.
:param vector_filename:
A `string` containing the full file path name to the
vector that is to be rasterised.
:param image_filename:
A `string` containing the full file path name to the
image that is to be used as a base when rasterising the
vector file.
:param out_filename:
A `string` containing the full file path name to be
used for creating the output image.
:return:
None. The output image is written directly to disk.
"""
segments_ds = Rasterise(image_filename, vector_filename)
segments_ds.rasterise()
geobox = GriddedGeoBox.from_gdal_dataset(gdal.Open(image_filename))
write_img(segments_ds.segmented_array, out_filename, fmt="GTiff",
geobox=geobox, nodata=0)
def get_files(path, pattern):
"""
    Just an internal function to find files given a file extension.
This isn't really designed to go beyond development demonstration
for this analytical workflow.
"""
# Get the current directory so we can change back later
cwd = os.getcwd()
os.chdir(path)
# Find the files matching the pattern
files = glob.glob(pattern)
files = [os.path.abspath(i) for i in files]
# Change back to the original directory
os.chdir(cwd)
return files
def get_water_extents(file_list, sort=True):
"""
    Given a list of water extent image files, create a list of
    WaterExtent class objects, sorted by time (old -> new) if
    sort=True.
:param file_list:
A list containing filepath names to water extent image files.
:param sort:
A boolean keyword indicating if the waterExtent objects
should be sorted before they're returned. Default is True.
:return:
A list of waterExtent class objects, one for every water
image file in file_list, and optionally sorted by date,
old -> new.
"""
water_extents = []
cell_id = None
for f in file_list:
water_extent = WaterExtent(f)
# check for lon, lat consistency
if cell_id:
this_cell_id = [water_extent.lon, water_extent.lat]
if this_cell_id != cell_id:
msg = ("Extents must be from same cell. At file {} got {}, "
"expecting {}")
msg = msg.format(f, this_cell_id, cell_id)
logging.error(msg)
sys.exit(1)
else:
cell_id = [water_extent.lon, water_extent.lat]
water_extents.append(water_extent)
if sort:
# all good, so now sort the extents by datetime
logging.info("Collected %d files. Now sort them." % len(file_list))
sorted_water_extents = sorted(water_extents,
key=lambda extent: extent.getDatetime())
return (sorted_water_extents, cell_id)
else:
msg = "Collected {} files. Sorting not applied.".format(len(file_list))
logging.info(msg)
return (water_extents, cell_id)
def water_characterisation(water_filename, rasterised_filename, id_name='SID',
id_map=None):
"""
Calculates the water characterisation.
"""
    # Initialise a blank dataframe
headings = [id_name, "Timestamp", "Total_Pixel_Count", "WATER_NOT_PRESENT",
"NO_DATA", "MASKED_NO_CONTIGUITY",
"MASKED_SEA_WATER", "MASKED_TERRAIN_SHADOW",
"MASKED_HIGH_SLOPE", "MASKED_CLOUD_SHADOW",
"MASKED_CLOUD", "WATER_PRESENT"]
df = pandas.DataFrame(columns=headings)
water_extent = WaterExtent(water_filename)
# timestamp
timestamp = water_extent.getDatetime()
# Read the rasterised image
img = read_img(rasterised_filename)
# Initialise the segment visitor
seg_vis = SegmentVisitor(img)
    # Get the potential segment IDs
if seg_vis.n_segments == 0:
return df
seg_ids = range(seg_vis.min_seg_id, seg_vis.max_seg_id + 1)
# Read the water layer data
water_data = water_extent.getArray()
    # Create an identity mapping of SIDs when no id_map is given
if id_map is None:
id_map = {}
for i in seg_ids:
id_map[i] = i
for seg_id in seg_ids:
data = seg_vis.get_segment_data(water_data, segment_id=seg_id)
# dimensions of the data which will be 1D
dim = data.shape
# Returns are 1D arrays, so check if we have an empty array
if dim[0] == 0:
continue # Empty bin, (no data), skipping
# Characterise the polygon's makeup based on the WoFS data.
h = histogram(data, Min=0, Max=128)
hist = h['histogram']
# Area is the number of pixels
total_area = dim[0]
"""
A WaterTile stores 1 data layer encoded as unsigned BYTE values as
described in the WaterConstants.py file.
Note - legal (decimal) values are:
0: no water in pixel
1: no data (one or more bands) in source NBAR image
2-127: pixel masked for some reason (refer to MASKED bits)
128: water in pixel
Values 129-255 are illegal (i.e. if bit 7 set, all others must be
unset).
WATER_PRESENT (dec 128) bit 7: 1 = water present,
0 = no water if all other bits zero
MASKED_CLOUD (dec 64) bit 6: 1 = pixel masked out due to
cloud, 0 = unmasked
MASKED_CLOUD_SHADOW (dec 32) bit 5: 1 = pixel masked out due to
cloud shadow, 0 = unmasked
MASKED_HIGH_SLOPE (dec 16) bit 4: 1 = pixel masked out due to
high slope, 0 = unmasked
MASKED_TERRAIN_SHADOW (dec 8) bit 3: 1 = pixel masked out due to
terrain shadow, 0 = unmasked
MASKED_SEA_WATER (dec 4) bit 2: 1 = pixel masked out due to
being over sea, 0 = unmasked
MASKED_NO_CONTIGUITY (dec 2) bit 1: 1 = pixel masked out due to
lack of data contiguity, 0 = unmasked
NO_DATA (dec 1) bit 0: 1 = pixel masked out due to
NO_DATA in NBAR source, 0 = valid data in NBAR
        WATER_NOT_PRESENT (dec 0) All bits zero indicates a valid
            observation, no water present
"""
        # [0..128] bins were generated, i.e. 129 bins
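        # Note: hist[k] counts pixels whose byte value is exactly k, so the
        # single-bit lookups below tally pixels where only that mask bit is
        # set.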
WATER_NOT_PRESENT = hist[0]
NO_DATA = hist[1]
MASKED_NO_CONTIGUITY = hist[2]
MASKED_SEA_WATER = hist[4]
MASKED_TERRAIN_SHADOW = hist[8]
MASKED_HIGH_SLOPE = hist[16]
MASKED_CLOUD_SHADOW = hist[32]
MASKED_CLOUD = hist[64]
WATER_PRESENT = hist[128]
format_dict = {id_name: id_map[seg_id],
'Timestamp': timestamp,
'Total_Pixel_Count': total_area,
'WATER_NOT_PRESENT': WATER_NOT_PRESENT,
'NO_DATA': NO_DATA,
'MASKED_NO_CONTIGUITY': MASKED_NO_CONTIGUITY,
'MASKED_SEA_WATER': MASKED_SEA_WATER,
'MASKED_TERRAIN_SHADOW': MASKED_TERRAIN_SHADOW,
'MASKED_HIGH_SLOPE': MASKED_HIGH_SLOPE,
'MASKED_CLOUD_SHADOW': MASKED_CLOUD_SHADOW,
'MASKED_CLOUD': MASKED_CLOUD,
'WATER_PRESENT': WATER_PRESENT}
        # Append the new data to the data frame
df = df.append(format_dict, ignore_index=True)
return df
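# Hypothetical usage sketch: given a water extent file and the rasterised
# segments image produced by rasterise_vector(), something like
# df = water_characterisation(water_fname, 'rasterised_result.tif',
#                             id_name='AUSHYDRO_ID', id_map=hydro_id)
# yields one row per feature found in the current extent.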
#!/usr/bin/env python
import luigi
from datetime import datetime as dt
import logging
import os
from os.path import join as pjoin, exists, dirname, basename
import cPickle as pickle
import argparse
import ogr
import pandas
from water_body_characterisation import spatial_intersection
from water_body_characterisation import get_files
from water_body_characterisation import rasterise_vector
from water_body_characterisation import get_water_extents
from water_body_characterisation import water_characterisation
from submit_query import create_tiles
CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
class CellWCTasks(luigi.Task):
"""
For a given cell define a water characterisation task for each
water file.
"""
out_dir = luigi.Parameter()
def requires(self):
base_name = CONFIG.get('outputs', 'water_characterisation_out_format')
base_name = pjoin(self.out_dir, base_name)
water_list = pjoin(self.out_dir,
CONFIG.get('outputs', 'water_files_list'))
with open(water_list, 'r') as infile:
files = pickle.load(infile)
sorted_water_extents, _ = get_water_extents(files)
targets = []
for water_extent in sorted_water_extents:
out_fname = base_name.format(water_extent.getDatetime())
out_fname = out_fname.replace(' ', '-')
water_fname = water_extent.path
targets.append(WaterCharacterisationTask(water_fname, out_fname))
return targets
def output(self):
out_fname = pjoin(self.out_dir,
'CellWCTasks.completed')
return luigi.LocalTarget(out_fname)
def run(self):
with self.output().open('w') as outf:
outf.write('Completed')
class AllCellsWCTask(luigi.Task):
"""
A helper task to issue water characterisation tasks to each
cell.
"""
base_out_dir = CONFIG.get('work', 'output_directory')
def requires(self):
cells_list_fname = pjoin(self.base_out_dir,
CONFIG.get('outputs', 'cells_list'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
tasks = []
for cell in cells:
out_dir = pjoin(self.base_out_dir, cell)
tasks.append(CellWCTasks(out_dir))
return tasks
def output(self):
out_fname = pjoin(self.base_out_dir,
'AllCellsWCTask.completed')
return luigi.LocalTarget(out_fname)
def run(self):
with self.output().open('w') as outf:
outf.write('Completed')
class AllCellsRasteriseTaskTiled(luigi.Task):
"""
A helper task that kicks off RasteriseTask's for each cell.
"""
idx1 = luigi.IntParameter()
idx2 = luigi.IntParameter()
base_out_dir = CONFIG.get('work', 'output_directory')
def requires(self):
cells_list_fname = pjoin(self.base_out_dir,
CONFIG.get('outputs', 'cells_list'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
tasks = []
for cell in cells[self.idx1:self.idx2]:
out_dir = pjoin(self.base_out_dir, cell)
tasks.append(RasteriseTask(out_dir))
return tasks
def output(self):
out_fname = 'AllCellsRasteriseTaskTiled_{}:{}.completed'
out_fname = out_fname.format(self.idx1, self.idx2)
out_fname = pjoin(self.base_out_dir, out_fname)
return luigi.LocalTarget(out_fname)
def run(self):
with self.output().open('w') as outf:
outf.write('Completed')
class RasteriseTask(luigi.Task):
"""
Computes the rasterisation for a cell.
"""
out_dir = luigi.Parameter()
def requires(self):
return []
def output(self):
out_fname = pjoin(self.out_dir, 'RasteriseTask.completed')
return luigi.LocalTarget(out_fname)
def run(self):
out_fname = pjoin(self.out_dir,
CONFIG.get('outputs', 'rasterise_filename'))
water_files_list = pjoin(self.out_dir,
CONFIG.get('outputs', 'water_files_list'))
with open(water_files_list, 'r') as infile:
water_files = pickle.load(infile)
vector_fname = CONFIG.get('work', 'vector_filename')
rasterise_vector(vector_fname, water_files[0], out_fname)
with self.output().open('w') as outf:
outf.write('Complete')
msg = "Rasterise completed for {} time {}".format(out_fname, dt.now())
logging.debug(msg)
class WaterCharacterisationTask(luigi.Task):
"""
Computes the water characterisation task.
"""
water_fname = luigi.Parameter()
out_fname = luigi.Parameter()
def requires(self):
return [RasteriseTask(dirname(self.out_fname))]
def output(self):
return luigi.LocalTarget(self.out_fname)
def run(self):
rasterised_fname = pjoin(dirname(self.out_fname),
CONFIG.get('outputs', 'rasterise_filename'))
id_name = CONFIG.get('work', 'feature_name')
# Open the vector file
vector_fname = CONFIG.get('work', 'vector_filename')
vec_ds = ogr.Open(vector_fname)
layer = vec_ds.GetLayer()
# Dicts to hold forward and backward mapping of fid's and segment id's
hydro_id = {}
# Get the attribute of interest to save as the group name
# ogr will only display a certain number of characters
        # customise the attribute name for the time being; we can put it in
        # the config file later
attribute = "AUSHYDRO_I"
for feature in layer:
fid = feature.GetFID()
hydro_id[fid + 1] = feature.GetField(attribute)
layer.ResetReading()
layer = None
vec_ds = None
result = water_characterisation(self.water_fname, rasterised_fname,
id_name=id_name, id_map=hydro_id)
# Open the output hdf5 file
store = pandas.HDFStore(self.output().path)
# Write the dataframe
store.append('data', result)
# Save and close the file
store.close()
class CombineCellWCTasksTiled(luigi.Task):
"""
A helper task that creates CombineWCTasks for each cell.
"""
idx1 = luigi.IntParameter()
idx2 = luigi.IntParameter()
def requires(self):
base_out_dir = CONFIG.get('work', 'output_directory')
cells_list_fname = pjoin(base_out_dir,
CONFIG.get('outputs', 'cells_list'))
with open(cells_list_fname, 'r') as infile:
cells = pickle.load(infile)
tasks = []
for cell in cells[self.idx1:self.idx2]:
out_dir = pjoin(base_out_dir, cell)
tasks.append(CombineWCTasks(out_dir))
return tasks
def output(self):
out_dir = CONFIG.get('work', 'output_directory')
out_fname = pjoin(out_dir, 'CombineCellWCTasksTiled_{}:{}.completed')
out_fname = out_fname.format(self.idx1, self.idx2)
return luigi.LocalTarget(out_fname)
def run(self):
with self.output().open('w') as outf:
outf.write('Completed')
class CombineWCTasks(luigi.Task):
"""
Combines all water characterisation from a single cell into a
single file.
"""
out_dir = luigi.Parameter()
def requires(self):
return [CellWCTasks(self.out_dir)]
def output(self):
out_fname = pjoin(self.out_dir, 'CombineWCTasks.completed')
return luigi.LocalTarget(out_fname)
def run(self):
# Get an os listing of the WC files for the current out_dir
# or define the names
base_name = CONFIG.get('outputs', 'water_characterisation_out_format')
base_name = pjoin(self.out_dir, base_name)
# Create an output file that we can continually append data
out_fname = pjoin(self.out_dir,
CONFIG.get('outputs', 'combined_cell_wc_filename'))
combined_store = pandas.HDFStore(out_fname)
# We'll use the water extent files to derive the filenames of the
# hdf5 files we need to open
water_list = pjoin(self.out_dir,
CONFIG.get('outputs', 'water_files_list'))
with open(water_list, 'r') as infile:
files = pickle.load(infile)
# Sort the files
sorted_water_extents, _ = get_water_extents(files)
# Open the first file; if we have '/data' then we have polygons to
# extract data for
file_name = base_name.format(sorted_water_extents[0].getDatetime())
file_name = file_name.replace(' ', '-')
store = pandas.HDFStore(file_name)
        # If there is nothing in the first file there will be nothing in
        # any of the files
if '/data' in store.keys():
# We have data to retrieve
headings = store['data'].columns.tolist()
store.close()
df = pandas.DataFrame(columns=headings)
for water_extent in sorted_water_extents:
# Derive the filename of the hdf5 file to open
wc_fname = base_name.format(water_extent.getDatetime())
wc_fname = wc_fname.replace(' ', '-')
# Open, append and close
store = pandas.HDFStore(wc_fname)
df = df.append(store['data'])
store.close()
# Re-define the table index
df.reset_index(inplace=True)
# Write to disk
combined_store.append('data', df)
# Save and close the file
combined_store.close()
with self.output().open('w') as outf:
outf.write('Completed')
if __name__ == '__main__':
# Setup command-line arguments
desc = "Processes water characterisation for a given set of cells."
hlp = ("The tile/chunk index to retieve from the tiles list. "
"(Needs to have been previously computed to a file named tiles.pkl")
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('--tile', type=int, help=hlp)
parsed_args = parser.parse_args()
tile_idx = parsed_args.tile
# setup logging
log_dir = CONFIG.get('work', 'logs_directory')
if not exists(log_dir):
os.makedirs(log_dir)
logfile = "{log_path}/run_wc_{uname}_{pid}.log"
logfile = logfile.format(log_path=log_dir, uname=os.uname()[1],
pid=os.getpid())
logging_level = logging.INFO
logging.basicConfig(filename=logfile, level=logging_level,
format=("%(asctime)s: [%(name)s] (%(levelname)s) "
"%(message)s "))
    # Get the list of tiles (groups of cells that each node will operate on)
tiles_list_fname = pjoin(CONFIG.get('work', 'output_directory'),
CONFIG.get('outputs', 'tiles_list'))
with open(tiles_list_fname, 'r') as in_file:
tiles = pickle.load(in_file)
# Initialise the job
tile = tiles[tile_idx]
tasks = [CombineCellWCTasksTiled(tile[0], tile[1])]
# Just test with computing the rasterisation first
#tasks = [AllCellsRasteriseTaskTiled(tile[0], tile[1])]
    luigi.build(tasks, local_scheduler=True, workers=16)