收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

" H. ?( ]! y: D/ y9 t; _. L
& R" d& r- t: {" T

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

8 P' [; b* [1 Q! P" V( j % m. X% k# Z( p; ~7 |+ H

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

9 v# `1 ~3 D- c1 r7 A& C" B $ s, n6 V+ o$ o+ Y

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

# I' d, c& ^+ n. Z

其中几个关键点解释一下:

* E1 q) K* D5 \, @! e

(1)首先定义拟合正太分布的函数

2 m4 h6 n @; R2 b0 |& }5 G% r
def norm_fit(data):, T( m2 O* Z7 b/ I2 m loc, scale = st.norm.fit(data) $ ]+ I' y; ]" o% j {! t return np.array([loc, scale])
+ h" L: f7 `9 m! L# W( J

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

: O# t: O1 p, W1 m) V- O2 m

(2)xarray分块

3 z# E( r* |; m. G, d. G
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) : v$ z6 t: R" z V- \8 G" @( W, b x = x.chunk({"lev":5, "lat":100,"lon":100})
2 ]2 T# {) N- v8 {" j

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

8 s1 ]( b) [' z

(3)xarray.apply_ufunc函数

+ f8 K1 V/ ?8 {- D) s: V
result = xr.apply_ufunc(norm_fit, 6 G& {' `0 t+ D# h1 ?7 z9 r x, / Z/ E5 E' y+ f; k2 b input_core_dims=[["time"]],# P; @( F0 C$ a4 ?# Z+ q! @$ V s( n output_core_dims=[["paras"]], + E+ r! M* n K# v- g$ g dask="parallelized",1 t6 n& _. z. H output_dtypes=[np.float], 6 P4 D$ x& M- j$ r2 ?: u$ ^- H* W dask_gufunc_kwargs={output_sizes:{"paras":2}} , f! n5 _, {4 l8 j" o, { c )
. V u) S( z- z' A6 P7 s

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

' I! R- P. u3 k u) f ' E: t* e# R. j& b

以下是全部代码:

* l9 a$ R. b- U. Q# r) U
from scipy import stats as st ' B+ g0 J6 E) c" ` 1 c6 z& d; E! X, X; U) Y: k import xarray as xr * ~2 [& v+ R) C% k0 }+ [* |3 ` import dask8 d$ t+ `# D; g$ G6 f import numpy as np$ G! _# O0 J& R, E. J8 d; ` from dask.diagnostics import ProgressBar ! U1 g+ E% O9 Z5 ?8 C7 t ! L' k6 m) q) ] def norm_fit(data):; S7 K% @% c5 F" U) U& O9 } loc, scale = st.norm.fit(data)- C# U* k$ m- h4 U1 h! q return np.array([loc, scale])- y/ J* Z. ~6 C; L ' |7 G; x ^5 r \- O+ u8 m5 G7 G rs = np.random.RandomState(0) k4 p. O: x. C7 W" [9 s x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) : S4 `! g3 B6 i. d3 E! u x = x.chunk({"lev":5, "lat":100,"lon":100})3 A3 ?8 c8 d7 X4 j+ M1 _0 I 2 j }" H7 s3 M( @# l #使用apply_ufunc计算,并用dask的并行计算* I$ k8 V0 a6 E! { result = xr.apply_ufunc(norm_fit, $ x* \) H% ~4 \" ?: @ x, ! F: ]* A1 o9 M! @* C3 n6 _/ g input_core_dims=[["time"]], % I+ Y+ G% j+ M0 } | f output_core_dims=[["paras"]], ' _' p$ ~8 N, F, C/ H dask="parallelized",: h0 n, H& Y3 Q" v, m output_dtypes=[np.float], * W7 z; {% P/ B3 B" p' j1 X dask_gufunc_kwargs={output_sizes:{"paras":2}}: X# R4 Q$ M7 f( M+ A) q! n )) [( Q6 R$ J, P$ R& c4 f 4 f9 x9 C, K* _ #compute进行真实的计算并显示进度 , c* m& B. x4 S! ]. I( j$ R. ~ with ProgressBar(): 6 Q2 Z6 U% ?6 l2 A' V result = results.compute() 1 t9 l" [1 F W7 ?2 y; X7 p2 [! T9 m* b( l' H$ n) ?5 ~7 e2 U #结果冲命名保存到nc文件 6 x8 t' q; a$ s3 P5 B, v result = result.rename("norm_paras")4 G) I: w' H) H8 @- e; q: S$ o result = result.to_netcdf("norm_fit_paras.nc",compute=False)! u! Y; \! a+ j7 i2 A1 W2 q( q8 C with ProgressBar():8 K4 i) Z3 ]" N4 u( q6 t1 Q result.compute()
. [. x4 Y% x) S( }

转自:气海同途

2 K% c8 }6 d: x$ _6 B( j; N

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表