|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。
1 I. q* Y& F7 a8 i. B7 U & U8 A# c! | B' C0 B, f
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
( e! M2 b9 J _9 {! }2 l+ b+ C1 S. ?- e) e8 o, F" L" O
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 . R& w e0 M& m+ M7 \" R. Z. U1 \
- ^8 y. k" g: p& l7 |0 i! Z" l$ K2 L 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 w* [5 \* R' Y. e
其中几个关键点解释一下:
+ k- @: q' r. Z3 ~ (1)首先定义拟合正太分布的函数 2 u$ X0 p( i$ l1 r2 h" W+ F- i7 K+ S. h+ u
def norm_fit(data):
2 a4 c, p# |0 ^' y# m* W loc, scale = st.norm.fit(data)# }* g3 k- p, F7 |4 d: V) l
return np.array([loc, scale])
' C: e9 ~. h; B% N: c 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
0 Z3 @$ B( R% O8 v) A4 s, T (2)xarray分块
1 { @+ ?9 u0 D) l' f. X# ]. g# o x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
) s# s. v0 y1 H! k! c# o2 F x = x.chunk({"lev":5, "lat":100,"lon":100})
9 k o$ ]2 D4 l) p( C xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
y% m; o+ m* ^- R (3)xarray.apply_ufunc函数
. t7 E' w9 e$ k' k result = xr.apply_ufunc(norm_fit,6 i, \; K" n& S, _7 V
x,: Y: ^9 q$ N7 Z; X1 P
input_core_dims=[["time"]],
6 |. @* S( T+ J9 |. b0 G( ^ output_core_dims=[["paras"]],, `5 {3 g1 e$ }4 a( p
dask="parallelized",
9 P4 ?! s t' G8 R1 `. r output_dtypes=[np.float],
: a& D' t# u, ~4 G dask_gufunc_kwargs={output_sizes:{"paras":2}}# s5 u' P9 r: w- I
)
6 _0 ~5 I% p+ X, A apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
# U" y1 u8 |8 @4 T, T' v
% k! P4 E7 w5 E+ Q 以下是全部代码: 0 U+ ^2 U. v: A" V" M
from scipy import stats as st! Z+ x" m# Q+ g
5 d3 b& F$ |: @& D, B
import xarray as xr/ }! D) k5 D& h. }( X# g7 H% G
import dask" s7 o+ K8 j1 Y i2 _' a
import numpy as np
6 u; ]1 t5 X; v5 \* }( J from dask.diagnostics import ProgressBar
* a a3 Y6 A, Y' W, N5 y6 s: C. n5 l
" ^) Y3 P8 w8 w) G) M def norm_fit(data):
* O: Q6 @& s- J& S- o5 K" T" D loc, scale = st.norm.fit(data)! Y! P% g4 `0 _% x
return np.array([loc, scale])$ g- g0 @; w! I
6 J1 {+ l8 W5 u rs = np.random.RandomState(0)* z- y* L1 X) U- d
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
& m& n! `/ L# f$ r x = x.chunk({"lev":5, "lat":100,"lon":100})0 ~% M6 p3 [! f; f8 f& M7 l
% C" @7 Y9 [/ w: ]# N0 y' ?# x
#使用apply_ufunc计算,并用dask的并行计算9 w- `# d- U1 j( ^6 q) F& B
result = xr.apply_ufunc(norm_fit,6 V% L, g( @" K7 K
x,
1 ^3 w# A. q* Q4 k+ h input_core_dims=[["time"]],4 m7 U) [# w1 k
output_core_dims=[["paras"]],. A1 o: ~9 S6 G
dask="parallelized",
, x7 F, c( y+ u7 G/ g output_dtypes=[np.float],) b1 P: o# Y6 q& b! X! d
dask_gufunc_kwargs={output_sizes:{"paras":2}}
+ V" ~6 w/ j3 {3 L )
. S5 F# V- i. C2 X* D" c, H' L5 l! }0 e" [
#compute进行真实的计算并显示进度+ t- Y, l5 ?6 w* M, g5 O
with ProgressBar():1 G9 Q) h. F# R' Z
result = results.compute()1 G5 b$ D8 a- t5 p
) H1 ]$ D' q+ q$ X- K6 H5 s5 h #结果冲命名保存到nc文件) Z2 |+ X# W+ A) p" ]0 J" a
result = result.rename("norm_paras")
; z4 Z" ?! r' Z' N& Q' B0 t result = result.to_netcdf("norm_fit_paras.nc",compute=False)0 u. r/ s% f5 n6 d
with ProgressBar():
! b; Z) t' z. K+ p7 | result.compute()
/ R0 b) z/ X( {' f0 G 转自:气海同途 ' v$ R5 a3 o" P+ w+ H9 ^
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |