#! /usr/bin/env python
################################################################################
'''
TSA.py
Program to perform Time Series Analysis over a stack of yearly data (e.g. NDVI).
Available estimates include the linear trend and the Mann-Kendall test.
The output raster contains six bands: slope, intercept, r-value, p-value and
standard error of the linear regression, followed by the Mann-Kendall p-value.
Requires: rasterio, numpy, scipy and tqdm (progress bar in parallel mode).
@Author: Javier Lopatin
@Email: javierlopatin@gmail.com
@Last revision: 18/05/2019
@Version: 1.0
Usage:
python TSA.py -i <input raster with yearly time series>
              -o <output raster with trend analysis>
              -j <number of concurrent jobs [default = 4]>
Examples:
python TSA.py -i NDVI_timeSeries.tif -o Trends.tif -j 6
The program is based on a script obtained at: https://www.uni-goettingen.de/en/524375.html
I adapted the program to read big raster images in chunks (blocks) of small
size to keep the memory use low. In addition, parallel processing is implemented.
'''
################################################################################
import rasterio
import numpy as np
from scipy import stats
import concurrent.futures
#####################################################
################## FUNCTIONS ########################
#####################################################
def mk_test(x, alpha=0.05):
    '''
    Mann-Kendall test for a monotonic trend.
    Originally from: http://www.ambhas.com/codes/statlib.py
    I have modified the script; it is now about 35x faster than the original
    (depending on the time series length).
    Input x must be a 1D list/array of numbers.
    Returns (h, p): h is True if the trend is significant at the given alpha,
    p is the two-tailed p-value.
    '''
n = len(x)
# calculate S
    listMa = np.matrix(x)  # convert input list to a 1D matrix
    # calculate all possible pairwise differences of the series with itself
    # and keep only the sign of each difference (-1, 0, 1)
    subMa = np.sign(listMa.T - listMa)
    # sum the lower-left triangle of the matrix
    s = np.sum(subMa[np.tril_indices(n, -1)])
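    # Example: for x = [1, 3, 2] the signed differences below the diagonal are
    # sign(3-1) = +1, sign(2-1) = +1, sign(2-3) = -1, so s = 1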
# calculate the unique data
    # return_counts=True gives the number of occurrences of each unique value (tie counts, tp)
unique_x = np.unique(x, return_counts=True)
g = len(unique_x[0])
# calculate the var(s)
if n == g: # there is no tie
var_s = (n * (n - 1) * (2 * n + 5)) / 18
else: # there are some ties in data
tp = unique_x[1]
        var_s = (n * (n - 1) * (2 * n + 5) -
                 np.sum(tp * (tp - 1) * (2 * tp + 5))) / 18
if s > 0:
z = (s - 1) / np.sqrt(var_s)
elif s == 0:
z = 0
elif s < 0:
z = (s + 1) / np.sqrt(var_s)
# calculate the p_value
p = 2 * (1 - stats.norm.cdf(abs(z))) # two tail test
h = abs(z) > stats.norm.ppf(1 - alpha / 2)
return h, p
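
# Quick illustration of mk_test on synthetic data (values are made up for the example):
#   ndvi = np.array([0.21, 0.25, 0.24, 0.30, 0.33, 0.35, 0.38])
#   h, p = mk_test(ndvi, alpha=0.05)
#   # h is True if the monotonic trend is significant at the chosen alpha,
#   # p is the two-tailed p-value
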
def TSA(dstack):
    '''Calculate linear regression and Mann-Kendall p-value coefficients
    for each raster pixel against equally spaced time steps.
    Input must be an array with shape [rows, columns, bands], with one band
    per time step. When the script is run from the command line, the
    [bands, rows, columns] array delivered by rasterio is reordered first.
    '''
    # if the script is run from the command line, rasterio delivers the array
    # as (band, row, column); reorder it to (row, column, band)
    if __name__ == "__main__":
        dstack = np.transpose(dstack, [1, 2, 0])
    # number of time steps equals the number of bands
    stepLen = dstack.shape[2]
    # equally spaced time steps
    timeList = np.arange(stepLen)
# stack to 1D array
dstack1D = dstack.reshape(-1)
    # Break dstack1D into a list where each element holds the time steps of one
    # pixel -> list length equals the number of pixels.
    # The list can then be passed to Python's map() function.
dstackList = [dstack1D[i:i + stepLen]
for i in range(0, len(dstack1D), stepLen)]
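    # Example: for a 2 x 2 pixel block with 3 time steps, dstack1D holds 12 values
    # and dstackList contains 4 arrays of length 3 (one time series per pixel)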
    # initialise empty 1D arrays (one value per pixel) to be filled with the outputs
    nPixels = dstack.shape[0] * dstack.shape[1]
    slopeAr, intcptAr, rvalAr, pvalAr, stderrAr, mkPAr = [
        np.zeros(nPixels) for _ in range(6)]
    # Use map() to iterate over each pixel's time series for the linear regression
    # and the Mann-Kendall test: map(function_to_apply, list_of_inputs).
    # A lambda is a small anonymous function with any number of arguments but one expression.
    # This is about 10% faster than using two nested for-loops (x- and y-axis).
    # linear trend
    outListReg = list(
        map((lambda x: stats.linregress(timeList, x)), dstackList))
    # Mann-Kendall test
    outListMK = list(map((lambda x: mk_test(x)), dstackList))
for k in range(len(outListReg)):
slopeAr[k] = outListReg[k][0]
intcptAr[k] = outListReg[k][1]
rvalAr[k] = outListReg[k][2]
pvalAr[k] = outListReg[k][3]
stderrAr[k] = outListReg[k][4]
mkPAr[k] = outListMK[k][1]
    outShape = dstack.shape[:2]  # (rows, columns)
outTuple = (slopeAr.reshape(outShape),
intcptAr.reshape(outShape),
rvalAr.reshape(outShape),
pvalAr.reshape(outShape),
stderrAr.reshape(outShape),
mkPAr.reshape(outShape))
outStack = np.dstack(outTuple) # stack results
    # get to (band, row, column) shape
outStack = np.transpose(outStack, [2, 0, 1])
return outStack
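
# TSA() can also be applied to an in-memory array when this script is imported
# as a module (the array below is an assumed toy example):
#   import TSA as tsa
#   arr = np.random.rand(10, 10, 5)   # 10 x 10 pixels, 5 yearly time steps
#   trends = tsa.TSA(arr)             # returns an array of shape (6, 10, 10)
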
def single_process(infile, outfile):
'''
    Process infile in one step. Use this with small
raster images (low memory use).
'''
# open infile and change metadata
with rasterio.open(infile) as src:
profile = src.profile
profile.update(count=6, dtype='float64')
dstack = src.read()
    # run TSA
tsa = TSA(dstack)
# save results
with rasterio.open(outfile, "w", **profile) as dst:
dst.write(tsa)
def parallel_process(infile, outfile, n_jobs):
"""
Process infile block-by-block with parallel processing
and write to a new file.
"""
from tqdm import tqdm # progress bar
with rasterio.Env():
with rasterio.open(infile) as src:
# Create a destination dataset based on source params. The
# destination will be tiled, and we'll process the tiles
# concurrently.
profile = src.profile
profile.update(blockxsize=128, blockysize=128,
count=6, dtype='float64', tiled=True)
with rasterio.open(outfile, "w", **profile) as dst:
# Materialize a list of destination block windows
# that we will use in several statements below.
windows = [window for ij, window in dst.block_windows()]
# This generator comprehension gives us raster data
# arrays for each window. Later we will zip a mapping
# of it with the windows list to get (window, result)
# pairs.
data_gen = (src.read(window=window) for window in windows)
with concurrent.futures.ProcessPoolExecutor(
max_workers=n_jobs
) as executor:
# We map the TSA() function over the raster
# data generator, zip the resulting iterator with
# the windows list, and as pairs come back we
# write data to the destination dataset.
for window, result in zip(
tqdm(windows), executor.map(TSA, data_gen)
):
dst.write(result, window=window)
def main(infile, outfile, n_jobs):
    '''
    Check the size of infile: if the image has at most 250,000 pixels
    (e.g. 500 x 500), use single_process; otherwise use parallel_process.
    '''
with rasterio.open(infile) as src:
width = src.width
height = src.height
if width*height <= 250000:
single_process(infile, outfile)
else:
parallel_process(infile, outfile, n_jobs)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Time Series analysis with parallel raster processing")
parser.add_argument('-i', '--inputImage',
help='Input raster with yearly time series', type=str)
parser.add_argument('-o', '--outputImage',
help='Output raster with trend analysis', type=str)
#parser.add_argument('-a', '--alpha',
    #                     help='Alpha level of significance for Mann-Kendall [default = 0.05]', type=float, default=0.05)
parser.add_argument(
"-j",
metavar="NUM_JOBS",
type=int,
default=4,
help="Number of concurrent jobs [default = all available]",
)
args = parser.parse_args()
main(args.inputImage, args.outputImage, args.j)