|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 8 I% d5 T: {* d9 o" q, z! T3 q
) `1 Z+ |( T0 F% T# a2 O- C 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 % D0 V% {/ F( M( S0 P D' S
2 O/ D4 E# T* c. ?7 N7 k0 |4 F N3 u
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
" W, l1 y9 M x! c% s
7 G2 p* f% X# ? 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 ' U$ t1 n1 d: i3 K% ~
其中几个关键点解释一下:
3 `( }( a$ L9 K (1)首先定义拟合正太分布的函数 ) j7 `" h" J5 T' l+ P! O
def norm_fit(data):9 y! j& l/ x! ^, M# j4 d r
loc, scale = st.norm.fit(data): V9 m5 W8 }! b. Q3 b; f
return np.array([loc, scale])
- s5 U3 v( ~( {0 s 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
+ c+ t9 h" [) P/ A, r% p (2)xarray分块
^2 i9 a1 ?8 F+ o4 q) ? x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])9 q- f$ h5 Y* u. w) c" M/ N4 G& ]
x = x.chunk({"lev":5, "lat":100,"lon":100})
) F5 C* q, \6 t+ r, V0 C# `8 o1 i xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 " Q/ I. N. K' w1 m5 Q: G2 B
(3)xarray.apply_ufunc函数 1 s5 l& Z3 _% b% b( G5 a
result = xr.apply_ufunc(norm_fit,
u2 E7 A t$ U, u x," ?4 x3 F: Y) e) v3 C5 f: j
input_core_dims=[["time"]],
# }2 V4 e P- Q7 S& B/ |5 d output_core_dims=[["paras"]],( I# ~: c% I# ?0 G$ }8 Y! p! I
dask="parallelized",
* i& J* B7 T: h2 b% w output_dtypes=[np.float],4 U J( ^ i2 h2 Y' m
dask_gufunc_kwargs={output_sizes:{"paras":2}}
7 Y3 ?0 x( @, f/ U ) & @) C5 S1 L. r) x
apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) f3 J/ n) ~' L- y
: s0 u; T; K0 z! \ 以下是全部代码: " b1 q# @" }! P/ k
from scipy import stats as st
) V$ [* C9 ^8 H) f
/ |( u- ?( n h0 \( e import xarray as xr6 f0 Z; o8 D7 |* W! e) o/ c6 D
import dask/ X; D3 u9 z# d5 C: w: W
import numpy as np
3 b% @ N- Q. k- l: l. S from dask.diagnostics import ProgressBar
& _6 z6 D- h9 m6 F$ n* I d7 J$ W- K7 g, E% E+ i
def norm_fit(data):
' T4 c* ]# A! ^( l3 V) |( e loc, scale = st.norm.fit(data)" M2 S' S ~9 l& p6 m+ E
return np.array([loc, scale])8 |; `( f3 ?( W% ^6 l+ c* F
0 c8 O' n! g L* E. F/ H
rs = np.random.RandomState(0), f5 q# K6 t# e9 F9 R
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
, v1 ^, }8 m. O6 S/ B, ? x = x.chunk({"lev":5, "lat":100,"lon":100})0 f" k+ ?8 z! A! i! D: V, U
/ h& \' ?& A% y* K #使用apply_ufunc计算,并用dask的并行计算
; E% a' L3 e1 N* j3 [ result = xr.apply_ufunc(norm_fit,$ I' u/ c5 H. Q4 [
x,6 u1 `, h1 ~ |3 ] _5 e
input_core_dims=[["time"]],& A4 ]) F( g6 a, O3 M2 \
output_core_dims=[["paras"]],
e- ~; Y' l, V& l# Y: }; e dask="parallelized",) G w, G1 R7 O. u: B# r
output_dtypes=[np.float],
* r0 O ^- J0 K( A/ X8 N7 \ dask_gufunc_kwargs={output_sizes:{"paras":2}}
4 P" Q9 M- F1 B2 m6 M+ h! ^5 w )* w- f7 i1 g4 d% J5 v
. W7 }0 q5 j0 V& J: _ x
#compute进行真实的计算并显示进度6 Q' ?. K6 ?9 b: }
with ProgressBar():7 x$ H& Q7 \, e" Y% G# E2 k9 R2 r
result = results.compute()
; l, ]" c5 I7 p" I
4 ~! U; N9 P+ ^ #结果冲命名保存到nc文件 b0 b& N2 V1 d
result = result.rename("norm_paras")
& W: o' ]. d* i) M& l result = result.to_netcdf("norm_fit_paras.nc",compute=False)# a b; M& u- e/ X% o* _
with ProgressBar():
S! C) l) q' q6 l. h* v result.compute() * f8 S, t5 J1 Q0 E. R
转自:气海同途 A( U, e& H* `+ X
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |