pl

Phase Linking
from matplotlib import pyplot as plt
import zarr
import colorcet
import moraine.cli as mc
from moraine.utils_ import is_cuda_available

source

emi

 emi (coh:str, ph:str, emi_quality:str, ref:int=0, chunks:int=None,
      cuda:bool=False, processes=None, n_workers=None,
      threads_per_worker=None, rmm_pool_size=0.9, **dask_cluster_arg)

Phase linking with EMI estimator.

Type Default Details
coh str coherence matrix
ph str output, wrapped phase
emi_quality str output, pixel quality
ref int 0 reference image for phase
chunks int None # chunk size of output zarr dataset, optional. Default: same as coh.
cuda bool False if use cuda for processing, false by default
processes NoneType None use process for dask worker over thread, the default is False for cpu, only applied if cuda==False
n_workers NoneType None number of dask worker, the default is 1 for cpu, number of GPU for cuda
threads_per_worker NoneType None number of threads per dask worker, the default is 2 for cpu, only applied if cuda==False
rmm_pool_size float 0.9 set the rmm pool size, only applied when cuda==True
dask_cluster_arg VAR_KEYWORD
ds_can_coh = './co/ds_can_coh.zarr'
ds_can_ph = './pl/ds_can_ph.zarr'
ds_can_emi_quality = './pl/ds_can_emi_quality.zarr'
ref = 0
logger = mc.get_logger()
emi(ds_can_coh,ds_can_ph,ds_can_emi_quality,ref=ref)
2025-06-24 21:49:42 - log_args - INFO - running function: emi
2025-06-24 21:49:42 - log_args - INFO - fetching args:
2025-06-24 21:49:42 - log_args - INFO - coh = './co/ds_can_coh.zarr'
2025-06-24 21:49:42 - log_args - INFO - ph = './pl/ds_can_ph.zarr'
2025-06-24 21:49:42 - log_args - INFO - emi_quality = './pl/ds_can_emi_quality.zarr'
2025-06-24 21:49:42 - log_args - INFO - ref = 0
2025-06-24 21:49:42 - log_args - INFO - chunks = None
2025-06-24 21:49:42 - log_args - INFO - cuda = False
2025-06-24 21:49:42 - log_args - INFO - processes = None
2025-06-24 21:49:42 - log_args - INFO - n_workers = None
2025-06-24 21:49:42 - log_args - INFO - threads_per_worker = None
2025-06-24 21:49:42 - log_args - INFO - rmm_pool_size = 0.9
2025-06-24 21:49:42 - log_args - INFO - dask_cluster_arg = {}
2025-06-24 21:49:42 - log_args - INFO - fetching args done.
2025-06-24 21:49:42 - zarr_info - INFO - ./co/ds_can_coh.zarr zarray shape, chunks, dtype: (732727, 136), (100000, 1), complex64
2025-06-24 21:49:42 - emi - INFO - starting dask cluster.
2025-06-24 21:49:45 - emi - INFO - dask cluster started.
2025-06-24 21:49:45 - dask_cluster_info - INFO - dask cluster: LocalCluster(dashboard_link='http://10.211.48.18:8787/status', workers=1, threads=2, memory=1.46 TiB)
2025-06-24 21:49:45 - darr_info - INFO - coh dask array shape, chunksize, dtype: (732727, 136), (100000, 136), complex64
2025-06-24 21:49:45 - emi - INFO - phase linking with EMI.
2025-06-24 21:49:45 - emi - INFO - got ph and emi_quality.
2025-06-24 21:49:45 - darr_info - INFO - ph dask array shape, chunksize, dtype: (732727, 17), (100000, 17), complex64
2025-06-24 21:49:45 - darr_info - INFO - emi_quality dask array shape, chunksize, dtype: (732727,), (100000,), float32
2025-06-24 21:49:45 - emi - INFO - saving ph and emi_quality.
2025-06-24 21:49:45 - zarr_info - INFO - ./pl/ds_can_ph.zarr zarray shape, chunks, dtype: (732727, 17), (100000, 1), complex64
2025-06-24 21:49:45 - zarr_info - INFO - ./pl/ds_can_emi_quality.zarr zarray shape, chunks, dtype: (732727,), (100000,), float32
2025-06-24 21:49:45 - emi - INFO - computing graph setted. doing all the computing.
2025-06-24 21:49:52 - emi - INFO - computing finished.leted |  6.2s
2025-06-24 21:49:52 - emi - INFO - dask cluster closed.
CPU times: user 2min 23s, sys: 12.4 s, total: 2min 35s
Wall time: 9.55 s
if is_cuda_available():
    emi(ds_can_coh,ds_can_ph,ds_can_emi_quality,ref=ref,cuda=True)
2025-06-24 21:49:52 - log_args - INFO - running function: emi
2025-06-24 21:49:52 - log_args - INFO - fetching args:
2025-06-24 21:49:52 - log_args - INFO - coh = './co/ds_can_coh.zarr'
2025-06-24 21:49:52 - log_args - INFO - ph = './pl/ds_can_ph.zarr'
2025-06-24 21:49:52 - log_args - INFO - emi_quality = './pl/ds_can_emi_quality.zarr'
2025-06-24 21:49:52 - log_args - INFO - ref = 0
2025-06-24 21:49:52 - log_args - INFO - chunks = None
2025-06-24 21:49:52 - log_args - INFO - cuda = True
2025-06-24 21:49:52 - log_args - INFO - processes = None
2025-06-24 21:49:52 - log_args - INFO - n_workers = None
2025-06-24 21:49:52 - log_args - INFO - threads_per_worker = None
2025-06-24 21:49:52 - log_args - INFO - rmm_pool_size = 0.9
2025-06-24 21:49:52 - log_args - INFO - dask_cluster_arg = {}
2025-06-24 21:49:52 - log_args - INFO - fetching args done.
2025-06-24 21:49:52 - zarr_info - INFO - ./co/ds_can_coh.zarr zarray shape, chunks, dtype: (732727, 136), (100000, 1), complex64
2025-06-24 21:49:52 - emi - INFO - starting dask cluster.
2025-06-24 21:49:58 - emi - INFO - dask cluster started.
2025-06-24 21:49:58 - dask_cluster_info - INFO - dask cluster: LocalCUDACluster(dashboard_link='http://127.0.0.1:8787/status', workers=8, threads=8, memory=1.46 TiB)
2025-06-24 21:49:58 - darr_info - INFO - coh dask array shape, chunksize, dtype: (732727, 136), (100000, 136), complex64
2025-06-24 21:49:58 - emi - INFO - phase linking with EMI.
2025-06-24 21:49:58 - emi - INFO - got ph and emi_quality.
2025-06-24 21:49:58 - darr_info - INFO - ph dask array shape, chunksize, dtype: (732727, 17), (100000, 17), complex64
2025-06-24 21:49:58 - darr_info - INFO - emi_quality dask array shape, chunksize, dtype: (732727,), (100000,), float32
2025-06-24 21:49:58 - emi - INFO - saving ph and emi_quality.
2025-06-24 21:49:59 - zarr_info - INFO - ./pl/ds_can_ph.zarr zarray shape, chunks, dtype: (732727, 17), (100000, 1), complex64
2025-06-24 21:49:59 - zarr_info - INFO - ./pl/ds_can_emi_quality.zarr zarray shape, chunks, dtype: (732727,), (100000,), float32
2025-06-24 21:49:59 - emi - INFO - computing graph setted. doing all the computing.
2025-06-24 21:50:03 - emi - INFO - computing finished.leted |  3.9s
2025-06-24 21:50:05 - emi - INFO - dask cluster closed.
CPU times: user 656 ms, sys: 1.26 s, total: 1.92 s
Wall time: 13.3 s
ds_can_gix = './shp/ds_can_gix.zarr'
emi_quality = './pl/emi_quality.zarr'
rslc_zarr = zarr.open('./raw/rslc.zarr/','r')
mc.pc2ras(ds_can_gix, ds_can_emi_quality,emi_quality,rslc_zarr.shape[:2])
2025-06-24 21:50:05 - log_args - INFO - running function: pc2ras
2025-06-24 21:50:05 - log_args - INFO - fetching args:
2025-06-24 21:50:05 - log_args - INFO - idx = './shp/ds_can_gix.zarr'
2025-06-24 21:50:05 - log_args - INFO - pc = './pl/ds_can_emi_quality.zarr'
2025-06-24 21:50:05 - log_args - INFO - ras = './pl/emi_quality.zarr'
2025-06-24 21:50:05 - log_args - INFO - shape = (2500, 1834)
2025-06-24 21:50:05 - log_args - INFO - chunks = (1000, 1000)
2025-06-24 21:50:05 - log_args - INFO - processes = False
2025-06-24 21:50:05 - log_args - INFO - n_workers = 1
2025-06-24 21:50:05 - log_args - INFO - threads_per_worker = 1
2025-06-24 21:50:05 - log_args - INFO - dask_cluster_arg = {}
2025-06-24 21:50:05 - log_args - INFO - fetching args done.
2025-06-24 21:50:05 - zarr_info - INFO - ./shp/ds_can_gix.zarr zarray shape, chunks, dtype: (732727, 2), (100000, 1), int32
2025-06-24 21:50:05 - pc2ras - INFO - loading gix into memory.
2025-06-24 21:50:05 - pc2ras - INFO - starting dask local cluster.
2025-06-24 21:50:05 - pc2ras - INFO - dask local cluster started.
2025-06-24 21:50:05 - dask_cluster_info - INFO - dask cluster: LocalCluster(dashboard_link='http://10.211.48.18:8787/status', workers=1, threads=1, memory=1.46 TiB)
2025-06-24 21:50:05 - pc2ras - INFO - start to work on ./pl/ds_can_emi_quality.zarr
2025-06-24 21:50:05 - zarr_info - INFO - ./pl/ds_can_emi_quality.zarr zarray shape, chunks, dtype: (732727,), (100000,), float32
2025-06-24 21:50:05 - darr_info - INFO - pc dask array shape, chunksize, dtype: (732727,), (732727,), float32
2025-06-24 21:50:05 - pc2ras - INFO - create ras dask array
2025-06-24 21:50:05 - darr_info - INFO - ras dask array shape, chunksize, dtype: (2500, 1834), (2500, 1834), float32
2025-06-24 21:50:05 - pc2ras - INFO - save ras to ./pl/emi_quality.zarr
2025-06-24 21:50:05 - zarr_info - INFO - ./pl/emi_quality.zarr zarray shape, chunks, dtype: (2500, 1834), (1000, 1000), float32
2025-06-24 21:50:05 - pc2ras - INFO - computing graph setted. doing all the computing.
2025-06-24 21:50:05 - pc2ras - INFO - computing finished.ed |  0.1s
2025-06-24 21:50:05 - pc2ras - INFO - dask cluster closed.
emi_zarr = zarr.open('./pl/emi_quality.zarr','r')
fig, ax = plt.subplots(1,1,figsize=(10,10))
pcm = ax.imshow(emi_zarr[:],interpolation='nearest',vmin=1.0,vmax=1.3,cmap=colorcet.cm.fire)
ax.set(title='EMI quality factor',xlabel='Range Index',ylabel='Azimuth Index')
fig.colorbar(pcm)
plt.show()


source

ds_temp_coh

 ds_temp_coh (coh:str, ph:str, t_coh:str=None, tnet:str=None,
              chunks:int=None, cuda:bool=False, processes=None,
              n_workers=None, threads_per_worker=None, rmm_pool_size=0.9,
              **dask_cluster_arg)

DS temporal coherence.

Type Default Details
coh str coherence matrix
ph str wrapped phase
t_coh str None output, temporal coherence
tnet str None temporal network
chunks int None point cloud chunk size, same as coh by default
cuda bool False if use cuda for processing, false by default
processes NoneType None use process for dask worker over thread, the default is False for cpu, only applied if cuda==False
n_workers NoneType None number of dask worker, the default is 1 for cpu, number of GPU for cuda
threads_per_worker NoneType None number of threads per dask worker, the default is 2 for cpu, only applied if cuda==False
rmm_pool_size float 0.9 set the rmm pool size, only applied when cuda==True
dask_cluster_arg VAR_KEYWORD

This function estimate the temporal coherence of DSs which is defined as (Ferretti et al. 2011):

Ferretti, Alessandro, Alfio Fumagalli, Fabrizio Novali, Claudio Prati, Fabio Rocca, and Alessio Rucci. 2011. “A New Algorithm for Processing Interferometric Data-Stacks: SqueeSAR.” IEEE Transactions on Geoscience and Remote Sensing 49 (9): 3460–70. https://doi.org/10.1109/TGRS.2011.2124465.

\[\gamma = \frac{1}{N^2-N} \sum_{n=1}^{N} \sum_{k \neq n}^{N} e^{i\phi_{nk}} e^{-i(\theta_n-\theta_k)}\]

Where \(\phi_{nk}\) is the phase of complex coherence matrix and \(\theta_{n}\) is the phase after phase linking.

ds_can_coh = './co/ds_can_coh.zarr'
ds_can_ph = './pl/ds_can_ph.zarr'
ds_can_t_coh = './pl/ds_can_t_coh.zarr'
ds_temp_coh(ds_can_coh,ds_can_ph, ds_can_t_coh)
if is_cuda_available():
    ds_temp_coh(ds_can_coh,ds_can_ph, ds_can_t_coh,cuda=True)
2025-06-24 21:50:06 - log_args - INFO - running function: ds_temp_coh
2025-06-24 21:50:06 - log_args - INFO - fetching args:
2025-06-24 21:50:06 - log_args - INFO - coh = './co/ds_can_coh.zarr'
2025-06-24 21:50:06 - log_args - INFO - ph = './pl/ds_can_ph.zarr'
2025-06-24 21:50:06 - log_args - INFO - t_coh = './pl/ds_can_t_coh.zarr'
2025-06-24 21:50:06 - log_args - INFO - tnet = None
2025-06-24 21:50:06 - log_args - INFO - chunks = None
2025-06-24 21:50:06 - log_args - INFO - cuda = False
2025-06-24 21:50:06 - log_args - INFO - processes = None
2025-06-24 21:50:06 - log_args - INFO - n_workers = None
2025-06-24 21:50:06 - log_args - INFO - threads_per_worker = None
2025-06-24 21:50:06 - log_args - INFO - rmm_pool_size = 0.9
2025-06-24 21:50:06 - log_args - INFO - dask_cluster_arg = {}
2025-06-24 21:50:06 - log_args - INFO - fetching args done.
2025-06-24 21:50:06 - zarr_info - INFO - ./co/ds_can_coh.zarr zarray shape, chunks, dtype: (732727, 136), (100000, 1), complex64
2025-06-24 21:50:06 - zarr_info - INFO - ./pl/ds_can_ph.zarr zarray shape, chunks, dtype: (732727, 17), (100000, 1), complex64
2025-06-24 21:50:06 - ds_temp_coh - INFO - starting dask local cluster.
2025-06-24 21:50:06 - ds_temp_coh - INFO - dask local cluster started.
2025-06-24 21:50:06 - dask_cluster_info - INFO - dask cluster: LocalCluster(dashboard_link='http://10.211.48.18:8787/status', workers=1, threads=2, memory=1.46 TiB)
2025-06-24 21:50:06 - darr_info - INFO - coh dask array shape, chunksize, dtype: (732727, 136), (100000, 136), complex64
2025-06-24 21:50:06 - darr_info - INFO - ph dask array shape, chunksize, dtype: (732727, 17), (100000, 17), complex64
2025-06-24 21:50:06 - ds_temp_coh - INFO - Estimate temporal coherence for DS.
2025-06-24 21:50:06 - ds_temp_coh - INFO - got temporal coherence t_coh.
2025-06-24 21:50:06 - darr_info - INFO - t_coh dask array shape, chunksize, dtype: (732727,), (100000,), float32
2025-06-24 21:50:06 - ds_temp_coh - INFO - saving t_coh.
2025-06-24 21:50:06 - ds_temp_coh - INFO - computing graph setted. doing all the computing.
2025-06-24 21:50:08 - ds_temp_coh - INFO - computing finished. 1.5s
2025-06-24 21:50:08 - ds_temp_coh - INFO - dask cluster closed.
2025-06-24 21:50:08 - log_args - INFO - running function: ds_temp_coh
2025-06-24 21:50:08 - log_args - INFO - fetching args:
2025-06-24 21:50:08 - log_args - INFO - coh = './co/ds_can_coh.zarr'
2025-06-24 21:50:08 - log_args - INFO - ph = './pl/ds_can_ph.zarr'
2025-06-24 21:50:08 - log_args - INFO - t_coh = './pl/ds_can_t_coh.zarr'
2025-06-24 21:50:08 - log_args - INFO - tnet = None
2025-06-24 21:50:08 - log_args - INFO - chunks = None
2025-06-24 21:50:08 - log_args - INFO - cuda = True
2025-06-24 21:50:08 - log_args - INFO - processes = None
2025-06-24 21:50:08 - log_args - INFO - n_workers = None
2025-06-24 21:50:08 - log_args - INFO - threads_per_worker = None
2025-06-24 21:50:08 - log_args - INFO - rmm_pool_size = 0.9
2025-06-24 21:50:08 - log_args - INFO - dask_cluster_arg = {}
2025-06-24 21:50:08 - log_args - INFO - fetching args done.
2025-06-24 21:50:08 - zarr_info - INFO - ./co/ds_can_coh.zarr zarray shape, chunks, dtype: (732727, 136), (100000, 1), complex64
2025-06-24 21:50:08 - zarr_info - INFO - ./pl/ds_can_ph.zarr zarray shape, chunks, dtype: (732727, 17), (100000, 1), complex64
2025-06-24 21:50:08 - ds_temp_coh - INFO - starting dask local cluster.
2025-06-24 21:50:14 - ds_temp_coh - INFO - dask local cluster started.
2025-06-24 21:50:14 - dask_cluster_info - INFO - dask cluster: LocalCUDACluster(dashboard_link='http://127.0.0.1:8787/status', workers=8, threads=8, memory=1.46 TiB)
2025-06-24 21:50:14 - darr_info - INFO - coh dask array shape, chunksize, dtype: (732727, 136), (100000, 136), complex64
2025-06-24 21:50:14 - darr_info - INFO - ph dask array shape, chunksize, dtype: (732727, 17), (100000, 17), complex64
2025-06-24 21:50:14 - ds_temp_coh - INFO - Estimate temporal coherence for DS.
2025-06-24 21:50:14 - ds_temp_coh - INFO - got temporal coherence t_coh.
2025-06-24 21:50:14 - darr_info - INFO - t_coh dask array shape, chunksize, dtype: (732727,), (100000,), float32
2025-06-24 21:50:14 - ds_temp_coh - INFO - saving t_coh.
2025-06-24 21:50:14 - ds_temp_coh - INFO - computing graph setted. doing all the computing.
2025-06-24 21:50:15 - ds_temp_coh - INFO - computing finished. 1.5s
2025-06-24 21:50:17 - ds_temp_coh - INFO - dask cluster closed.
ds_can_gix = './shp/ds_can_gix.zarr'
t_coh = './pl/t_coh.zarr'
rslc_zarr = zarr.open('./raw/rslc.zarr/','r')
mc.pc2ras(ds_can_gix, ds_can_t_coh,t_coh,rslc_zarr.shape[:2])
2025-06-24 21:50:17 - log_args - INFO - running function: pc2ras
2025-06-24 21:50:17 - log_args - INFO - fetching args:
2025-06-24 21:50:17 - log_args - INFO - idx = './shp/ds_can_gix.zarr'
2025-06-24 21:50:17 - log_args - INFO - pc = './pl/ds_can_t_coh.zarr'
2025-06-24 21:50:17 - log_args - INFO - ras = './pl/t_coh.zarr'
2025-06-24 21:50:17 - log_args - INFO - shape = (2500, 1834)
2025-06-24 21:50:17 - log_args - INFO - chunks = (1000, 1000)
2025-06-24 21:50:17 - log_args - INFO - processes = False
2025-06-24 21:50:17 - log_args - INFO - n_workers = 1
2025-06-24 21:50:17 - log_args - INFO - threads_per_worker = 1
2025-06-24 21:50:17 - log_args - INFO - dask_cluster_arg = {}
2025-06-24 21:50:17 - log_args - INFO - fetching args done.
2025-06-24 21:50:17 - zarr_info - INFO - ./shp/ds_can_gix.zarr zarray shape, chunks, dtype: (732727, 2), (100000, 1), int32
2025-06-24 21:50:17 - pc2ras - INFO - loading gix into memory.
2025-06-24 21:50:17 - pc2ras - INFO - starting dask local cluster.
2025-06-24 21:50:18 - pc2ras - INFO - dask local cluster started.
2025-06-24 21:50:18 - dask_cluster_info - INFO - dask cluster: LocalCluster(dashboard_link='http://10.211.48.18:8787/status', workers=1, threads=1, memory=1.46 TiB)
2025-06-24 21:50:18 - pc2ras - INFO - start to work on ./pl/ds_can_t_coh.zarr
2025-06-24 21:50:18 - zarr_info - INFO - ./pl/ds_can_t_coh.zarr zarray shape, chunks, dtype: (732727,), (100000,), float32
2025-06-24 21:50:18 - darr_info - INFO - pc dask array shape, chunksize, dtype: (732727,), (732727,), float32
2025-06-24 21:50:18 - pc2ras - INFO - create ras dask array
2025-06-24 21:50:18 - darr_info - INFO - ras dask array shape, chunksize, dtype: (2500, 1834), (2500, 1834), float32
2025-06-24 21:50:18 - pc2ras - INFO - save ras to ./pl/t_coh.zarr
2025-06-24 21:50:18 - zarr_info - INFO - ./pl/t_coh.zarr zarray shape, chunks, dtype: (2500, 1834), (1000, 1000), float32
2025-06-24 21:50:18 - pc2ras - INFO - computing graph setted. doing all the computing.
2025-06-24 21:50:18 - pc2ras - INFO - computing finished.ed |  0.1s
2025-06-24 21:50:18 - pc2ras - INFO - dask cluster closed.
t_coh_zarr = zarr.open(t_coh,'r')
fig, ax = plt.subplots(1,1,figsize=(10,10))
pcm = ax.imshow(t_coh_zarr[:],interpolation='nearest')
ax.set(title='DS temporal coherence',xlabel='Range Index',ylabel='Azimuth Index')
fig.colorbar(pcm)
plt.show()


source

emperical_co_emi_temp_coh_pc

 emperical_co_emi_temp_coh_pc (rslc:str, is_shp_dir:str, gix:str,
                               ph_dir:str, emi_quality_dir:str,
                               t_coh_dir:str, batch_size:int=1000,
                               chunks:int=None, cuda:bool=False,
                               processes=None, n_workers=None,
                               threads_per_worker=None, rmm_pool_size=0.9,
                               **dask_cluster_arg)

estimating emperical coherence matrix, phase linking and estimating temporal coherence on point cloud data.

Type Default Details
rslc str input: rslc stack, shape (nlines, width, nimages)
is_shp_dir str input: directory for bool array indicating the SHPs of pc
gix str input: bool array indicating pc, shape (2, n_points)
ph_dir str output: directory that hold complex coherence matrix for pc
emi_quality_dir str output: directory that hold emi quality
t_coh_dir str output: directory that hold temporal coherence
batch_size int 1000 input, batch size
chunks int None parallel processing azimuth/range chunk size, optional. Default: rslc.chunks[:2]
cuda bool False if use cuda for processing, false by default
processes NoneType None use process for dask worker over thread, the default is False for cpu, only applied if cuda==False
n_workers NoneType None number of dask worker, the default is 1 for cpu, number of GPU for cuda
threads_per_worker NoneType None number of threads per dask worker, the default is 2 for cpu, only applied if cuda==False
rmm_pool_size float 0.9 set the rmm pool size, only applied when cuda==True
dask_cluster_arg VAR_KEYWORD
rslc = './raw/rslc.zarr/'
ds_can_is_shp_dir = './co/ds_can_is_shp/'
ds_can_ph_dir = './pl/ds_can_ph'
ds_can_emi_quality_dir = './pl/ds_can_emi_quality'
ds_can_t_coh_dir = './pl/ds_can_t_coh'
emperical_co_emi_temp_coh_pc(rslc,ds_can_is_shp_dir,ds_can_gix,ds_can_ph_dir,ds_can_emi_quality_dir,ds_can_t_coh_dir,chunks=(1000,1000))
2025-06-24 21:50:18 - log_args - INFO - running function: emperical_co_emi_temp_coh_pc
2025-06-24 21:50:18 - log_args - INFO - fetching args:
2025-06-24 21:50:18 - log_args - INFO - rslc = './raw/rslc.zarr/'
2025-06-24 21:50:18 - log_args - INFO - is_shp_dir = './co/ds_can_is_shp/'
2025-06-24 21:50:18 - log_args - INFO - gix = './shp/ds_can_gix.zarr'
2025-06-24 21:50:18 - log_args - INFO - ph_dir = './pl/ds_can_ph'
2025-06-24 21:50:18 - log_args - INFO - emi_quality_dir = './pl/ds_can_emi_quality'
2025-06-24 21:50:18 - log_args - INFO - t_coh_dir = './pl/ds_can_t_coh'
2025-06-24 21:50:18 - log_args - INFO - batch_size = 1000
2025-06-24 21:50:18 - log_args - INFO - chunks = (1000, 1000)
2025-06-24 21:50:18 - log_args - INFO - cuda = False
2025-06-24 21:50:18 - log_args - INFO - processes = None
2025-06-24 21:50:18 - log_args - INFO - n_workers = None
2025-06-24 21:50:18 - log_args - INFO - threads_per_worker = None
2025-06-24 21:50:18 - log_args - INFO - rmm_pool_size = 0.9
2025-06-24 21:50:18 - log_args - INFO - dask_cluster_arg = {}
2025-06-24 21:50:18 - log_args - INFO - fetching args done.
2025-06-24 21:50:18 - zarr_info - INFO - ./raw/rslc.zarr/ zarray shape, chunks, dtype: (2500, 1834, 17), (1000, 1000, 1), complex64
2025-06-24 21:50:18 - emperical_co_emi_temp_coh_pc - INFO - azimuth window size and half azimuth window size: 11, 5
2025-06-24 21:50:18 - emperical_co_emi_temp_coh_pc - INFO - range window size and half range window size: 11, 5
2025-06-24 21:50:18 - emperical_co_emi_temp_coh_pc - INFO - parallel processing azimuth chunk size: 1000
2025-06-24 21:50:18 - emperical_co_emi_temp_coh_pc - INFO - parallel processing range chunk size: 1000
2025-06-24 21:50:18 - zarr_info - INFO - ./shp/ds_can_gix.zarr zarray shape, chunks, dtype: (732727, 2), (100000, 1), int32
2025-06-24 21:50:18 - emperical_co_emi_temp_coh_pc - INFO - loading gix into memory.
2025-06-24 21:50:18 - emperical_co_emi_temp_coh_pc - INFO - convert gix to the order of ras chunk
2025-06-24 21:50:25 - emperical_co_emi_temp_coh_pc - INFO - starting dask cluster.
2025-06-24 21:50:25 - emperical_co_emi_temp_coh_pc - INFO - dask cluster started.
2025-06-24 21:50:25 - dask_cluster_info - INFO - dask cluster: LocalCluster(dashboard_link='http://10.211.48.18:8787/status', workers=1, threads=2, memory=1.46 TiB)
2025-06-24 21:50:25 - darr_info - INFO - rslc_overlap dask array shape, chunksize, dtype: (2520, 1844, 17), (1010, 1005, 17), complex64
2025-06-24 21:50:25 - darr_info - INFO - gix in ras chunk order dask array shape, chunksize, dtype: (732727, 2), (201403, 2), int32
2025-06-24 21:50:25 - emperical_co_emi_temp_coh_pc - INFO - estimating coherence matrix chunk by chunk.
2025-06-24 21:50:25 - darr_info - INFO - is_shp for chunk 0 dask array shape, chunksize, dtype: (201097, 11, 11), (201097, 11, 11), bool
2025-06-24 21:50:25 - darr_info - INFO - ph for chunk 0 dask array shape, chunksize, dtype: (201097, 17), (201097, 17), complex64
2025-06-24 21:50:25 - darr_info - INFO - emi_quality for chunk 0 dask array shape, chunksize, dtype: (201097,), (201097,), float32
2025-06-24 21:50:25 - darr_info - INFO - t_coh for chunk 0 dask array shape, chunksize, dtype: (201097,), (201097,), float32
2025-06-24 21:50:25 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 0
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_ph/0.zarr zarray shape, chunks, dtype: (201097, 17), (201097, 1), complex64
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_emi_quality/0.zarr zarray shape, chunks, dtype: (201097,), (201097,), float32
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_t_coh/0.zarr zarray shape, chunks, dtype: (201097,), (201097,), float32
2025-06-24 21:50:25 - darr_info - INFO - is_shp for chunk 1 dask array shape, chunksize, dtype: (137562, 11, 11), (137562, 11, 11), bool
2025-06-24 21:50:25 - darr_info - INFO - ph for chunk 1 dask array shape, chunksize, dtype: (137562, 17), (137562, 17), complex64
2025-06-24 21:50:25 - darr_info - INFO - emi_quality for chunk 1 dask array shape, chunksize, dtype: (137562,), (137562,), float32
2025-06-24 21:50:25 - darr_info - INFO - t_coh for chunk 1 dask array shape, chunksize, dtype: (137562,), (137562,), float32
2025-06-24 21:50:25 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 1
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_ph/1.zarr zarray shape, chunks, dtype: (137562, 17), (137562, 1), complex64
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_emi_quality/1.zarr zarray shape, chunks, dtype: (137562,), (137562,), float32
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_t_coh/1.zarr zarray shape, chunks, dtype: (137562,), (137562,), float32
2025-06-24 21:50:25 - darr_info - INFO - is_shp for chunk 2 dask array shape, chunksize, dtype: (201403, 11, 11), (201403, 11, 11), bool
2025-06-24 21:50:25 - darr_info - INFO - ph for chunk 2 dask array shape, chunksize, dtype: (201403, 17), (201403, 17), complex64
2025-06-24 21:50:25 - darr_info - INFO - emi_quality for chunk 2 dask array shape, chunksize, dtype: (201403,), (201403,), float32
2025-06-24 21:50:25 - darr_info - INFO - t_coh for chunk 2 dask array shape, chunksize, dtype: (201403,), (201403,), float32
2025-06-24 21:50:25 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 2
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_ph/2.zarr zarray shape, chunks, dtype: (201403, 17), (201403, 1), complex64
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_emi_quality/2.zarr zarray shape, chunks, dtype: (201403,), (201403,), float32
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_t_coh/2.zarr zarray shape, chunks, dtype: (201403,), (201403,), float32
2025-06-24 21:50:25 - darr_info - INFO - is_shp for chunk 3 dask array shape, chunksize, dtype: (73518, 11, 11), (73518, 11, 11), bool
2025-06-24 21:50:25 - darr_info - INFO - ph for chunk 3 dask array shape, chunksize, dtype: (73518, 17), (73518, 17), complex64
2025-06-24 21:50:25 - darr_info - INFO - emi_quality for chunk 3 dask array shape, chunksize, dtype: (73518,), (73518,), float32
2025-06-24 21:50:25 - darr_info - INFO - t_coh for chunk 3 dask array shape, chunksize, dtype: (73518,), (73518,), float32
2025-06-24 21:50:25 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 3
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_ph/3.zarr zarray shape, chunks, dtype: (73518, 17), (73518, 1), complex64
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_emi_quality/3.zarr zarray shape, chunks, dtype: (73518,), (73518,), float32
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_t_coh/3.zarr zarray shape, chunks, dtype: (73518,), (73518,), float32
2025-06-24 21:50:25 - darr_info - INFO - is_shp for chunk 4 dask array shape, chunksize, dtype: (78445, 11, 11), (78445, 11, 11), bool
2025-06-24 21:50:25 - darr_info - INFO - ph for chunk 4 dask array shape, chunksize, dtype: (78445, 17), (78445, 17), complex64
2025-06-24 21:50:25 - darr_info - INFO - emi_quality for chunk 4 dask array shape, chunksize, dtype: (78445,), (78445,), float32
2025-06-24 21:50:25 - darr_info - INFO - t_coh for chunk 4 dask array shape, chunksize, dtype: (78445,), (78445,), float32
2025-06-24 21:50:25 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 4
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_ph/4.zarr zarray shape, chunks, dtype: (78445, 17), (78445, 1), complex64
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_emi_quality/4.zarr zarray shape, chunks, dtype: (78445,), (78445,), float32
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_t_coh/4.zarr zarray shape, chunks, dtype: (78445,), (78445,), float32
2025-06-24 21:50:25 - darr_info - INFO - is_shp for chunk 5 dask array shape, chunksize, dtype: (40702, 11, 11), (40702, 11, 11), bool
2025-06-24 21:50:25 - darr_info - INFO - ph for chunk 5 dask array shape, chunksize, dtype: (40702, 17), (40702, 17), complex64
2025-06-24 21:50:25 - darr_info - INFO - emi_quality for chunk 5 dask array shape, chunksize, dtype: (40702,), (40702,), float32
2025-06-24 21:50:25 - darr_info - INFO - t_coh for chunk 5 dask array shape, chunksize, dtype: (40702,), (40702,), float32
2025-06-24 21:50:25 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 5
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_ph/5.zarr zarray shape, chunks, dtype: (40702, 17), (40702, 1), complex64
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_emi_quality/5.zarr zarray shape, chunks, dtype: (40702,), (40702,), float32
2025-06-24 21:50:25 - zarr_info - INFO - pl/ds_can_t_coh/5.zarr zarray shape, chunks, dtype: (40702,), (40702,), float32
2025-06-24 21:50:25 - emperical_co_emi_temp_coh_pc - INFO - computing graph setted. doing all the computing.
2025-06-24 21:50:31 - emperical_co_emi_temp_coh_pc - INFO - computing finished.
2025-06-24 21:50:31 - emperical_co_emi_temp_coh_pc - INFO - dask cluster closed.
CPU times: user 6min 54s, sys: 16.2 s, total: 7min 11s
Wall time: 12.8 s
if is_cuda_available():
    emperical_co_emi_temp_coh_pc(rslc,ds_can_is_shp_dir,ds_can_gix,ds_can_ph_dir,ds_can_emi_quality_dir,ds_can_t_coh_dir,chunks=(1000,1000),cuda=True)
2025-06-24 21:50:31 - log_args - INFO - running function: emperical_co_emi_temp_coh_pc
2025-06-24 21:50:31 - log_args - INFO - fetching args:
2025-06-24 21:50:31 - log_args - INFO - rslc = './raw/rslc.zarr/'
2025-06-24 21:50:31 - log_args - INFO - is_shp_dir = './co/ds_can_is_shp/'
2025-06-24 21:50:31 - log_args - INFO - gix = './shp/ds_can_gix.zarr'
2025-06-24 21:50:31 - log_args - INFO - ph_dir = './pl/ds_can_ph'
2025-06-24 21:50:31 - log_args - INFO - emi_quality_dir = './pl/ds_can_emi_quality'
2025-06-24 21:50:31 - log_args - INFO - t_coh_dir = './pl/ds_can_t_coh'
2025-06-24 21:50:31 - log_args - INFO - batch_size = 1000
2025-06-24 21:50:31 - log_args - INFO - chunks = (1000, 1000)
2025-06-24 21:50:31 - log_args - INFO - cuda = True
2025-06-24 21:50:31 - log_args - INFO - processes = None
2025-06-24 21:50:31 - log_args - INFO - n_workers = None
2025-06-24 21:50:31 - log_args - INFO - threads_per_worker = None
2025-06-24 21:50:31 - log_args - INFO - rmm_pool_size = 0.9
2025-06-24 21:50:31 - log_args - INFO - dask_cluster_arg = {}
2025-06-24 21:50:31 - log_args - INFO - fetching args done.
2025-06-24 21:50:31 - zarr_info - INFO - ./raw/rslc.zarr/ zarray shape, chunks, dtype: (2500, 1834, 17), (1000, 1000, 1), complex64
2025-06-24 21:50:31 - emperical_co_emi_temp_coh_pc - INFO - azimuth window size and half azimuth window size: 11, 5
2025-06-24 21:50:31 - emperical_co_emi_temp_coh_pc - INFO - range window size and half range window size: 11, 5
2025-06-24 21:50:31 - emperical_co_emi_temp_coh_pc - INFO - parallel processing azimuth chunk size: 1000
2025-06-24 21:50:31 - emperical_co_emi_temp_coh_pc - INFO - parallel processing range chunk size: 1000
2025-06-24 21:50:31 - zarr_info - INFO - ./shp/ds_can_gix.zarr zarray shape, chunks, dtype: (732727, 2), (100000, 1), int32
2025-06-24 21:50:31 - emperical_co_emi_temp_coh_pc - INFO - loading gix into memory.
2025-06-24 21:50:31 - emperical_co_emi_temp_coh_pc - INFO - convert gix to the order of ras chunk
2025-06-24 21:50:31 - emperical_co_emi_temp_coh_pc - INFO - starting dask cluster.
2025-06-24 21:50:38 - emperical_co_emi_temp_coh_pc - INFO - dask cluster started.
2025-06-24 21:50:38 - dask_cluster_info - INFO - dask cluster: LocalCUDACluster(dashboard_link='http://127.0.0.1:8787/status', workers=8, threads=8, memory=1.46 TiB)
2025-06-24 21:50:38 - darr_info - INFO - rslc_overlap dask array shape, chunksize, dtype: (2520, 1844, 17), (1010, 1005, 17), complex64
2025-06-24 21:50:38 - darr_info - INFO - gix in ras chunk order dask array shape, chunksize, dtype: (732727, 2), (201403, 2), int32
2025-06-24 21:50:38 - emperical_co_emi_temp_coh_pc - INFO - estimating coherence matrix chunk by chunk.
2025-06-24 21:50:38 - darr_info - INFO - is_shp for chunk 0 dask array shape, chunksize, dtype: (201097, 11, 11), (201097, 11, 11), bool
2025-06-24 21:50:38 - darr_info - INFO - ph for chunk 0 dask array shape, chunksize, dtype: (201097, 17), (201097, 17), complex64
2025-06-24 21:50:38 - darr_info - INFO - emi_quality for chunk 0 dask array shape, chunksize, dtype: (201097,), (201097,), float32
2025-06-24 21:50:38 - darr_info - INFO - t_coh for chunk 0 dask array shape, chunksize, dtype: (201097,), (201097,), float32
2025-06-24 21:50:38 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 0
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_ph/0.zarr zarray shape, chunks, dtype: (201097, 17), (201097, 1), complex64
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_emi_quality/0.zarr zarray shape, chunks, dtype: (201097,), (201097,), float32
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_t_coh/0.zarr zarray shape, chunks, dtype: (201097,), (201097,), float32
2025-06-24 21:50:38 - darr_info - INFO - is_shp for chunk 1 dask array shape, chunksize, dtype: (137562, 11, 11), (137562, 11, 11), bool
2025-06-24 21:50:38 - darr_info - INFO - ph for chunk 1 dask array shape, chunksize, dtype: (137562, 17), (137562, 17), complex64
2025-06-24 21:50:38 - darr_info - INFO - emi_quality for chunk 1 dask array shape, chunksize, dtype: (137562,), (137562,), float32
2025-06-24 21:50:38 - darr_info - INFO - t_coh for chunk 1 dask array shape, chunksize, dtype: (137562,), (137562,), float32
2025-06-24 21:50:38 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 1
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_ph/1.zarr zarray shape, chunks, dtype: (137562, 17), (137562, 1), complex64
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_emi_quality/1.zarr zarray shape, chunks, dtype: (137562,), (137562,), float32
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_t_coh/1.zarr zarray shape, chunks, dtype: (137562,), (137562,), float32
2025-06-24 21:50:38 - darr_info - INFO - is_shp for chunk 2 dask array shape, chunksize, dtype: (201403, 11, 11), (201403, 11, 11), bool
2025-06-24 21:50:38 - darr_info - INFO - ph for chunk 2 dask array shape, chunksize, dtype: (201403, 17), (201403, 17), complex64
2025-06-24 21:50:38 - darr_info - INFO - emi_quality for chunk 2 dask array shape, chunksize, dtype: (201403,), (201403,), float32
2025-06-24 21:50:38 - darr_info - INFO - t_coh for chunk 2 dask array shape, chunksize, dtype: (201403,), (201403,), float32
2025-06-24 21:50:38 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 2
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_ph/2.zarr zarray shape, chunks, dtype: (201403, 17), (201403, 1), complex64
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_emi_quality/2.zarr zarray shape, chunks, dtype: (201403,), (201403,), float32
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_t_coh/2.zarr zarray shape, chunks, dtype: (201403,), (201403,), float32
2025-06-24 21:50:38 - darr_info - INFO - is_shp for chunk 3 dask array shape, chunksize, dtype: (73518, 11, 11), (73518, 11, 11), bool
2025-06-24 21:50:38 - darr_info - INFO - ph for chunk 3 dask array shape, chunksize, dtype: (73518, 17), (73518, 17), complex64
2025-06-24 21:50:38 - darr_info - INFO - emi_quality for chunk 3 dask array shape, chunksize, dtype: (73518,), (73518,), float32
2025-06-24 21:50:38 - darr_info - INFO - t_coh for chunk 3 dask array shape, chunksize, dtype: (73518,), (73518,), float32
2025-06-24 21:50:38 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 3
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_ph/3.zarr zarray shape, chunks, dtype: (73518, 17), (73518, 1), complex64
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_emi_quality/3.zarr zarray shape, chunks, dtype: (73518,), (73518,), float32
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_t_coh/3.zarr zarray shape, chunks, dtype: (73518,), (73518,), float32
2025-06-24 21:50:38 - darr_info - INFO - is_shp for chunk 4 dask array shape, chunksize, dtype: (78445, 11, 11), (78445, 11, 11), bool
2025-06-24 21:50:38 - darr_info - INFO - ph for chunk 4 dask array shape, chunksize, dtype: (78445, 17), (78445, 17), complex64
2025-06-24 21:50:38 - darr_info - INFO - emi_quality for chunk 4 dask array shape, chunksize, dtype: (78445,), (78445,), float32
2025-06-24 21:50:38 - darr_info - INFO - t_coh for chunk 4 dask array shape, chunksize, dtype: (78445,), (78445,), float32
2025-06-24 21:50:38 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 4
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_ph/4.zarr zarray shape, chunks, dtype: (78445, 17), (78445, 1), complex64
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_emi_quality/4.zarr zarray shape, chunks, dtype: (78445,), (78445,), float32
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_t_coh/4.zarr zarray shape, chunks, dtype: (78445,), (78445,), float32
2025-06-24 21:50:38 - darr_info - INFO - is_shp for chunk 5 dask array shape, chunksize, dtype: (40702, 11, 11), (40702, 11, 11), bool
2025-06-24 21:50:38 - darr_info - INFO - ph for chunk 5 dask array shape, chunksize, dtype: (40702, 17), (40702, 17), complex64
2025-06-24 21:50:38 - darr_info - INFO - emi_quality for chunk 5 dask array shape, chunksize, dtype: (40702,), (40702,), float32
2025-06-24 21:50:38 - darr_info - INFO - t_coh for chunk 5 dask array shape, chunksize, dtype: (40702,), (40702,), float32
2025-06-24 21:50:38 - emperical_co_emi_temp_coh_pc - INFO - saving ph, emi_quality, t_coh for chunk 5
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_ph/5.zarr zarray shape, chunks, dtype: (40702, 17), (40702, 1), complex64
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_emi_quality/5.zarr zarray shape, chunks, dtype: (40702,), (40702,), float32
2025-06-24 21:50:38 - zarr_info - INFO - pl/ds_can_t_coh/5.zarr zarray shape, chunks, dtype: (40702,), (40702,), float32
2025-06-24 21:50:38 - emperical_co_emi_temp_coh_pc - INFO - computing graph setted. doing all the computing.
2025-06-24 21:50:44 - emperical_co_emi_temp_coh_pc - INFO - computing finished.
2025-06-24 21:50:44,947 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/worker.py", line 1252, in heartbeat
    response = await retry_operation(
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 455, in retry_operation
    return await retry(
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 434, in retry
    return await coro()
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1395, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1154, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/comm/tcp.py", line 236, in read
    convert_stream_closed_error(self, e)
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:42264 remote=tcp://127.0.0.1:38831>: Stream is closed
2025-06-24 21:50:44,951 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/worker.py", line 1252, in heartbeat
    response = await retry_operation(
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 455, in retry_operation
    return await retry(
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/utils_comm.py", line 434, in retry
    return await coro()
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1395, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/core.py", line 1154, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/comm/tcp.py", line 236, in read
    convert_stream_closed_error(self, e)
  File "/users/kangl/miniforge3/envs/work/lib/python3.10/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:42228 remote=tcp://127.0.0.1:38831>: Stream is closed
2025-06-24 21:50:47 - emperical_co_emi_temp_coh_pc - INFO - dask cluster closed.

Then concat the result:

ds_can_ph_ = './pl/ds_can_ph_.zarr'
ds_can_emi_quality_ = './pl/ds_can_emi_quality_.zarr'
ds_can_t_coh_ = './pl/ds_can_t_coh_.zarr'
ds_can_key = './co/ds_can_key.zarr'
chunks = zarr.open(ds_can_gix,'r').chunks[0]
mc.pc_concat(
    [ds_can_ph_dir,ds_can_emi_quality_dir,ds_can_t_coh_dir],
    [ds_can_ph_,ds_can_emi_quality_,ds_can_t_coh_],
    key=ds_can_key,
    chunks=chunks)
2025-06-24 21:50:47 - log_args - INFO - running function: pc_concat
2025-06-24 21:50:47 - log_args - INFO - fetching args:
2025-06-24 21:50:47 - log_args - INFO - pcs = ['./pl/ds_can_ph', './pl/ds_can_emi_quality', './pl/ds_can_t_coh']
2025-06-24 21:50:47 - log_args - INFO - pc = ['./pl/ds_can_ph_.zarr', './pl/ds_can_emi_quality_.zarr', './pl/ds_can_t_coh_.zarr']
2025-06-24 21:50:47 - log_args - INFO - key = './co/ds_can_key.zarr'
2025-06-24 21:50:47 - log_args - INFO - chunks = 100000
2025-06-24 21:50:47 - log_args - INFO - processes = False
2025-06-24 21:50:47 - log_args - INFO - n_workers = 1
2025-06-24 21:50:47 - log_args - INFO - threads_per_worker = 1
2025-06-24 21:50:47 - log_args - INFO - dask_cluster_arg = {}
2025-06-24 21:50:47 - log_args - INFO - fetching args done.
2025-06-24 21:50:47 - pc_concat - INFO - input pcs: [[Path('pl/ds_can_ph/0.zarr'), Path('pl/ds_can_ph/1.zarr'), Path('pl/ds_can_ph/2.zarr'), Path('pl/ds_can_ph/3.zarr'), Path('pl/ds_can_ph/4.zarr'), Path('pl/ds_can_ph/5.zarr')], [Path('pl/ds_can_emi_quality/0.zarr'), Path('pl/ds_can_emi_quality/1.zarr'), Path('pl/ds_can_emi_quality/2.zarr'), Path('pl/ds_can_emi_quality/3.zarr'), Path('pl/ds_can_emi_quality/4.zarr'), Path('pl/ds_can_emi_quality/5.zarr')], [Path('pl/ds_can_t_coh/0.zarr'), Path('pl/ds_can_t_coh/1.zarr'), Path('pl/ds_can_t_coh/2.zarr'), Path('pl/ds_can_t_coh/3.zarr'), Path('pl/ds_can_t_coh/4.zarr'), Path('pl/ds_can_t_coh/5.zarr')]]
2025-06-24 21:50:47 - pc_concat - INFO - output pc: ['./pl/ds_can_ph_.zarr', './pl/ds_can_emi_quality_.zarr', './pl/ds_can_t_coh_.zarr']
2025-06-24 21:50:47 - pc_concat - INFO - load key
2025-06-24 21:50:47 - zarr_info - INFO - ./co/ds_can_key.zarr zarray shape, chunks, dtype: (732727,), (100000,), int64
2025-06-24 21:50:47 - pc_concat - INFO - starting dask local cluster.
2025-06-24 21:50:47 - pc_concat - INFO - dask local cluster started.
2025-06-24 21:50:47 - dask_cluster_info - INFO - dask cluster: LocalCluster(dashboard_link='http://10.211.48.18:8787/status', workers=1, threads=1, memory=1.46 TiB)
2025-06-24 21:50:47 - pc_concat - INFO - read pc from [Path('pl/ds_can_ph/0.zarr'), Path('pl/ds_can_ph/1.zarr'), Path('pl/ds_can_ph/2.zarr'), Path('pl/ds_can_ph/3.zarr'), Path('pl/ds_can_ph/4.zarr'), Path('pl/ds_can_ph/5.zarr')]
2025-06-24 21:50:47 - darr_info - INFO - concatenated pc dask array shape, chunksize, dtype: (732727, 17), (732727, 1), complex64
2025-06-24 21:50:47 - pc_concat - INFO - sort pc according to key
2025-06-24 21:50:47 - darr_info - INFO - sorted pc dask array shape, chunksize, dtype: (732727, 17), (732727, 1), complex64
2025-06-24 21:50:47 - pc_concat - INFO - save pc to ./pl/ds_can_ph_.zarr
2025-06-24 21:50:47 - zarr_info - INFO - ./pl/ds_can_ph_.zarr zarray shape, chunks, dtype: (732727, 17), (100000, 1), complex64
2025-06-24 21:50:47 - pc_concat - INFO - read pc from [Path('pl/ds_can_emi_quality/0.zarr'), Path('pl/ds_can_emi_quality/1.zarr'), Path('pl/ds_can_emi_quality/2.zarr'), Path('pl/ds_can_emi_quality/3.zarr'), Path('pl/ds_can_emi_quality/4.zarr'), Path('pl/ds_can_emi_quality/5.zarr')]
2025-06-24 21:50:47 - darr_info - INFO - concatenated pc dask array shape, chunksize, dtype: (732727,), (732727,), float32
2025-06-24 21:50:47 - pc_concat - INFO - sort pc according to key
2025-06-24 21:50:47 - darr_info - INFO - sorted pc dask array shape, chunksize, dtype: (732727,), (732727,), float32
2025-06-24 21:50:47 - pc_concat - INFO - save pc to ./pl/ds_can_emi_quality_.zarr
2025-06-24 21:50:47 - zarr_info - INFO - ./pl/ds_can_emi_quality_.zarr zarray shape, chunks, dtype: (732727,), (100000,), float32
2025-06-24 21:50:47 - pc_concat - INFO - read pc from [Path('pl/ds_can_t_coh/0.zarr'), Path('pl/ds_can_t_coh/1.zarr'), Path('pl/ds_can_t_coh/2.zarr'), Path('pl/ds_can_t_coh/3.zarr'), Path('pl/ds_can_t_coh/4.zarr'), Path('pl/ds_can_t_coh/5.zarr')]
2025-06-24 21:50:47 - darr_info - INFO - concatenated pc dask array shape, chunksize, dtype: (732727,), (732727,), float32
2025-06-24 21:50:47 - pc_concat - INFO - sort pc according to key
2025-06-24 21:50:47 - darr_info - INFO - sorted pc dask array shape, chunksize, dtype: (732727,), (732727,), float32
2025-06-24 21:50:47 - pc_concat - INFO - save pc to ./pl/ds_can_t_coh_.zarr
2025-06-24 21:50:47 - zarr_info - INFO - ./pl/ds_can_t_coh_.zarr zarray shape, chunks, dtype: (732727,), (100000,), float32
2025-06-24 21:50:47 - pc_concat - INFO - computing graph setted. doing all the computing.
2025-06-24 21:50:48 - pc_concat - INFO - computing finished.|  1.0s
2025-06-24 21:50:48 - pc_concat - INFO - dask cluster closed.