import zarr
import moraine as mr
import math
import itertools
from moraine.utils_ import is_cuda_available
if is_cuda_available():
import cupy as cp
co
Covariance and Coherence Matrix Estimator
emperical_co
emperical_co (rslc:numpy.ndarray, is_shp:numpy.ndarray, block_size:int=128)
Maximum likelihood covariance estimator.
Type | Default | Details | |
---|---|---|---|
rslc | ndarray | rslc stack, dtype: cupy.complexfloating |
|
is_shp | ndarray | shp bool, dtype: cupy.bool |
|
block_size | int | 128 | the CUDA block size, it only affects the calculation speed |
Returns | tuple | the covariance and coherence matrix cov and coh |
This function is deprecated as estimate coherence for all pixels are not necessary. Please use emperical_co_pc
instead.
The cov
and coh
is defined as:
\[ cov = E(z_1z_2^*) \quad coh=\frac{E(z_1z_2^*)}{\sqrt{E(|z_1|^2)E(|z_2|^2)}} \]
where \(z_1\) and \(z_2\) are the reference and secondary SLC images, respectively. cov
and coh
are estimated as:
\[ cov = \frac{\sum_{i=1}^{L}z_1^{i}z_2^{i*}}{L} \quad coh = \frac{\sum_{i=1}^{L}z_1^{i}z_2^{i*}}{\sqrt(\sum_{i=1}^{L}|z_1^{i}|^2)(\sum_{i=1}^{L}|z_2^{i}|^2)} \]
using all selected SHPs. Their shapes are [nlines,width,nimages,nimages]. Note dim 2 and dim 3 are reference images and secondary images, respectively.
The rslc
is a three dimentional cupy ndarray
. The dtype
should be cupy.complex64
. From outerest to innerest, the three dimentions are azimuth, range and image. is_shp
is a four dimentional cupy ndarray
. It describes if pixels in the search window are SHP to the central pixel. From outerest ot innerest, they are azimuth, range, secondary pixel relative azimuth, secondary pixel relative range.
Here is an example:
# synthetic data
= np.random.rand(5,10,17).astype(np.float32) + 1j*np.random.rand(5,10,17).astype(np.float32)
rslc = 1;
half_az_win = 2;
half_r_win = np.random.choice(a=[False, True], size=(5,10,2*half_az_win+1,2*half_r_win+1), p=[0.3, 0.7])
is_shp for i, j, k, l in itertools.product(range(is_shp.shape[0]),range(is_shp.shape[1]),range(is_shp.shape[2]),range(is_shp.shape[3])):
if (k == half_az_win) and (l == half_r_win):
= True
is_shp[i,j,k,l] if (i+k-half_az_win<0) or (i+k-half_az_win>=rslc.shape[0]) or (j+l-half_r_win<0) or (j+l-half_r_win>=rslc.shape[1]):
= False is_shp[i,j,k,l]
print(rslc.shape, is_shp.shape, is_shp[2,3])
(5, 10, 17) (5, 10, 3, 5) [[ True False False True True]
[ True False True True True]
[False True True True True]]
rslc
is a stack of 17 rslc images. Each of the image has 5 pixel in azimuth dimention and 10 pixels in range dimention. It shows for pixel (2,3), the (3*5) window around it has 2 SHPs to it (the central one is itself).
if is_cuda_available():
= cp.asarray(rslc)
rslc_cp = cp.asarray(is_shp)
is_shp_cp = emperical_co(rslc_cp,is_shp_cp)
cov_cp,coh_cp print(cov_cp.shape, coh_cp.shape)
(5, 10, 17, 17) (5, 10, 17, 17)
Both cov
and coh
are complex data. The shape shows each covarience or coherence matrix is 17 by 17 since there are 17 images. And cov
and coh
are matrix for all 5*10 pixels.
emperical_co_pc
emperical_co_pc (rslc:numpy.ndarray, idx:numpy.ndarray, pc_is_shp:numpy.ndarray, block_size:int=128, image_pairs:numpy.ndarray=None)
Maximum likelihood covariance estimator for point cloud data.
Type | Default | Details | |
---|---|---|---|
rslc | ndarray | rslc stack, dtype: np.complex64 |
|
idx | ndarray | index of point target (azimuth_index, range_index), dtype: np.int32 , shape: (n_pc, 2) |
|
pc_is_shp | ndarray | shp bool, dtype: bool |
|
block_size | int | 128 | the CUDA block size, it only affects the calculation speed |
image_pairs | ndarray | None | only coherence of those image pairs will estimated, dtype: np.int32 , shape: (n_image_pair, 2) |
Returns | ndarray | coh , dtype:np.complex64 , shape(n_pc, n_image_pair) |
emperical_co_pc
is the emperical_co
on sparse data, e.g., DSs. rslc
is same as emperical_co
. sp_idx
is the index, i.e., a tuple of (azimuth_idx, range_idx). Each index is 1D array. pc_is_shp
is similar to is_shp
in emperical_co
but it only contains information about the point cloud data. It is a 3D array with shape [number_of_point,az_win,r_win].
Compared with emperical_co
, emperical_co_pc
only estimate coherence at specific position so the memory usage is much small.
emperical_co
only return the coherence of specified image pairs to save data volume (compressed coherence). By default, only the upper triangle (with offset=1) of the coherence matrix is returned.
Example:
# synthetic data
= np.random.rand(5,10,17).astype(np.float32) + 1j*np.random.rand(5,10,17).astype(np.float32)
rslc = 1;
half_az_win = 2;
half_r_win
= np.random.choice(a=[False, True], size=(5,10,2*half_az_win+1,2*half_r_win+1), p=[0.9, 0.1])
is_shp # ensure the format of is_shp is correct:
for i, j, k, l in itertools.product(range(is_shp.shape[0]),range(is_shp.shape[1]),range(is_shp.shape[2]),range(is_shp.shape[3])):
if (k == half_az_win) and (l == half_r_win):
= True
is_shp[i,j,k,l] if (i+k-half_az_win<0) or (i+k-half_az_win>=rslc.shape[0]) or (j+l-half_r_win<0) or (j+l-half_r_win>=rslc.shape[1]):
= False
is_shp[i,j,k,l]
= np.count_nonzero(is_shp,axis=(-2,-1))
shp_num = shp_num >= 3
is_ds_can = is_shp[is_ds_can]
ds_can_is_shp = np.stack(np.where(is_ds_can),axis=-1)
ds_can_idx
print(rslc.shape,ds_can_idx.shape,ds_can_is_shp.shape)
(5, 10, 17) (15, 2) (15, 3, 5)
rslc
is a stack of 17 rslc images. Each of the image has 5 pixel in azimuth dimention and 10 pixels in range dimention. ds_can_idx
shows the index of the DS candidates and ds_can_is_shp
shows the corrosponding SHPs.
= emperical_co_pc(rslc,ds_can_idx,ds_can_is_shp) ds_can_coh
ds_can_coh.shape
(15, 136)
The shape of the coherence matrix is (17,17) while only the upper triangle of it is returned (136 elements).
Or, we can specify the image pairs:
= mr.TempNet.from_bandwidth(rslc.shape[0],bandwidth=3)
tempnet print(tempnet.image_pairs.shape) # 9 rslc image pairs
= emperical_co_pc(rslc,ds_can_idx,ds_can_is_shp,image_pairs=tempnet.image_pairs)
ds_can_coh print(ds_can_coh.shape)
(9, 2)
(15, 9)
Input cupy.ndarray
is also supported:
if is_cuda_available():
= cp.asarray(rslc)
rslc_cp = cp.asarray(ds_can_idx)
ds_can_idx_cp = cp.asarray(ds_can_is_shp)
ds_can_is_shp_cp = emperical_co_pc(rslc_cp,ds_can_idx_cp,ds_can_is_shp_cp) ds_can_coh_cp
uncompress_coh
uncompress_coh (coh:numpy.ndarray, image_pairs:numpy.ndarray=None)
uncompress coh matrix to a hermitian matrix
Type | Default | Details | |
---|---|---|---|
coh | ndarray | compressed coherence stack, dtype: np.complex64 , shape(…,n_image_pair) |
|
image_pairs | ndarray | None | image pairs, dtype: np.int32 , shape: (n_image_pair, 2), all pairs by default |
Returns | ndarray | uncompressed, dtype:np.complex64 , shape(…, n_image_pair) |
Example:
= 5
nimages = mr.TempNet.from_bandwidth(nimages,bandwidth=2)
tempnet print(tempnet.image_pairs.shape)
= np.random.rand(tempnet.image_pairs.shape[0]).astype(np.float32) + 1j*np.random.rand(tempnet.image_pairs.shape[0]).astype(np.float32)
coh = uncompress_coh(coh,tempnet.image_pairs)
uncompressed_coh uncompressed_coh.real
(7, 2)
array([[1. , 0.3080705 , 0.08792049, 0. , 0. ],
[0.3080705 , 1. , 0.8791743 , 0.56917405, 0. ],
[0.08792049, 0.8791743 , 1. , 0.66460145, 0.7009682 ],
[0. , 0.56917405, 0.66460145, 1. , 0.28084692],
[0. , 0. , 0.7009682 , 0.28084692, 1. ]],
dtype=float32)
ad_intf_pc
ad_intf_pc (ref_rslc:numpy.ndarray, sec_rslc:numpy.ndarray, idx:numpy.ndarray, pc_is_shp:numpy.ndarray, block_size:int=128)
Adaptive multilooking interferogram generation for point cloud data.
Type | Default | Details | |
---|---|---|---|
ref_rslc | ndarray | reference rslc, dtype: np.complex64 |
|
sec_rslc | ndarray | secondary rslc, dtype: np.complex64 |
|
idx | ndarray | index of point target (azimuth_index, range_index), dtype: np.int32 , shape: (n_pc, 2) |
|
pc_is_shp | ndarray | shp bool, dtype: bool |
|
block_size | int | 128 | the CUDA block size, it only affects the calculation speed |
Returns | ndarray | coh , dtype:np.complex64 , shape(n_pc, n_image_pair) |
adf_intf_pc
is similar to emperical_co_pc
but only one interferogram generated.
Example:
# synthetic data
= np.random.rand(5,10,17).astype(np.float32) + 1j*np.random.rand(5,10,17).astype(np.float32)
rslc = 1;
half_az_win = 2;
half_r_win
= np.random.choice(a=[False, True], size=(5,10,2*half_az_win+1,2*half_r_win+1), p=[0.9, 0.1])
is_shp # ensure the format of is_shp is correct:
for i, j, k, l in itertools.product(range(is_shp.shape[0]),range(is_shp.shape[1]),range(is_shp.shape[2]),range(is_shp.shape[3])):
if (k == half_az_win) and (l == half_r_win):
= True
is_shp[i,j,k,l] if (i+k-half_az_win<0) or (i+k-half_az_win>=rslc.shape[0]) or (j+l-half_r_win<0) or (j+l-half_r_win>=rslc.shape[1]):
= False
is_shp[i,j,k,l]
= np.count_nonzero(is_shp,axis=(-2,-1))
shp_num = shp_num >= 3
is_ds_can = is_shp[is_ds_can]
ds_can_is_shp = np.stack(np.where(is_ds_can),axis=-1)
ds_can_idx
print(rslc.shape,ds_can_idx.shape,ds_can_is_shp.shape)
(5, 10, 17) (16, 2) (16, 3, 5)
= emperical_co_pc(rslc,ds_can_idx,ds_can_is_shp)
ds_can_coh = ad_intf_pc(rslc[:,:,0],rslc[:,:,1],ds_can_idx,ds_can_is_shp)
inf 0],inf) np.testing.assert_array_equal(ds_can_coh[:,
Covariance and Coherence Matrix Regularizer
isPD
isPD (co:numpy.ndarray)
Type | Details | |
---|---|---|
co | ndarray | absolute value of complex coherence/covariance stack |
Returns | ndarray | bool array indicating wheather coherence/covariance is positive define |
This function is deprecated.
This function tells if the matrix is positive defined or not.
Code for generating data for doc
= zarr.open('../../data/rslc.zarr/','r')[600:650,600:650]
rslc if is_cuda_available():
= cp.asarray(rslc)
rslc
# SHP selection
= 5; r_half_win = 5
az_half_win = 2*az_half_win+1; r_win = 2*r_half_win+1
az_win
= cp.abs(rslc)**2
rmli = cp.sort(rmli,axis=-1)
sorted_rmli = mr.ks_test(sorted_rmli,az_half_win=az_half_win,r_half_win=r_half_win)
p = p < 0.05
is_shp
# Select DS candidate
= cp.count_nonzero(is_shp,axis=(-2,-1))
shp_num = shp_num >= 50
is_ds_can = is_shp[is_ds_can]
ds_can_is_shp = cp.stack(cp.where(is_ds_can),axis=-1)
ds_can_idx
= uncompress_coh(emperical_co_pc(rslc,ds_can_idx,ds_can_is_shp)) ds_can_coh
if is_cuda_available():
print(ds_can_coh.shape)
(149, 17, 17)
if is_cuda_available():
= isPD(ds_can_coh)
isPD_ds_can print(isPD_ds_can)
[ True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True True True True True True True True
True True True True True]
All coherence matrix are positive defined.
nearestPD
nearestPD (co:numpy.ndarray)
Find the nearest positive-definite matrix to input matrix.
Type | Details | |
---|---|---|
co | ndarray | stack of matrix with shape […,N,N] |
Returns | ndarray | nearest positive definite matrix of input, shape […,N,N] |
nearest
means the Frobenius norm of the difference is minimized.
regularize_spectral
regularize_spectral (coh:numpy.ndarray, beta:Union[float,numpy.ndarray])
Spectral regularizer for coherence matrix.
Type | Details | |
---|---|---|
coh | ndarray | stack of matrix with shape […,N,N] |
beta | Union | the regularization parameter, a float number or cupy ndarray with shape […] |
Returns | ndarray | regularized matrix, shape […,N,N] |
This function is deprecated.
regularize_spectral
can regularize the absolute value of coherence matrix for better phase linking. It is first presented in (Zwieback 2022).
Examples:
Code for generating data for doc
= zarr.open('../../data/rslc.zarr/','r')[600:605,600:610]
rslc if is_cuda_available():
= cp.asarray(rslc)
rslc
# SHP selection
= 1; r_half_win = 2
az_half_win = 2*az_half_win+1; r_win = 2*r_half_win+1
az_win
= cp.abs(rslc)**2
rmli = cp.sort(rmli,axis=-1)
sorted_rmli = mr.ks_test(sorted_rmli,az_half_win=az_half_win,r_half_win=r_half_win)
p = p < 0.05
is_shp
= emperical_co(rslc,is_shp) cov,coh
if is_cuda_available():
print(coh.shape)
(5, 10, 17, 17)
if is_cuda_available():
= regularize_spectral(coh,0.1) regularized_coh1
More general, bata
can be a cp.ndarray
:
if is_cuda_available():
= cp.ones(coh.shape[:-2])/10
beta = regularize_spectral(coh,beta) regularized_coh2