收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

: n0 H5 L% a; W4 _9 i3 y7 _4 a& o
) Y( x% u" Z; q% k+ Y# Q; _

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

! t$ l, q2 v$ R8 L8 S8 h$ @( M" W9 ~3 C( {5 \

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

" j1 p% Q1 D: P6 W, A ; V5 i, h) d. g! D. Z5 o8 {

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

/ t- G/ b' R3 H+ [" [ W

其中几个关键点解释一下:

* i- S, p* Z" i M9 G/ f

(1)首先定义拟合正太分布的函数

7 K2 Z7 g: l7 x4 `0 B. l% R
def norm_fit(data): ( G, @* k& O5 }4 C0 y loc, scale = st.norm.fit(data) : k8 V; `- l% q9 l( u9 U# M return np.array([loc, scale])
" t6 v7 y/ u! g+ K6 d0 F5 Y* G( J

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

0 m$ j: b) d1 s+ i

(2)xarray分块

6 D; I* X; j8 P* J0 Z1 y0 Z" Q* x
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])4 v" T4 h9 w( ^% `7 y1 l5 C3 y x = x.chunk({"lev":5, "lat":100,"lon":100})
. J# }2 V+ P0 n L$ m2 d t

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

D. }; A) _" V% \1 A

(3)xarray.apply_ufunc函数

' g1 v! f5 @# j! ], A1 J
result = xr.apply_ufunc(norm_fit, , w$ K- R6 l: I, k- e. G. A9 C x, * e8 C8 K+ k, R- Y8 Y* }0 l input_core_dims=[["time"]], 2 s5 j5 o0 ~2 w' ~6 V- l output_core_dims=[["paras"]], 7 O# E2 o% @4 Z7 z+ J8 L0 h* I0 s dask="parallelized", 7 R& |" M U, f output_dtypes=[np.float],. G* u: e0 m& ~% J; U% [8 r7 n0 e dask_gufunc_kwargs={output_sizes:{"paras":2}}- t E& @; A7 o9 Z2 N )
6 P9 p. \3 z6 L3 F

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

# }; G5 z4 q/ }$ K: [$ O3 L+ ]1 r4 G9 Z

以下是全部代码:

L0 z# B9 E; V# T( Y
from scipy import stats as st, L% C% B/ v+ J& |& `; `0 e ) d3 A$ v! Q$ m3 F# |* d import xarray as xr2 Y1 h2 W9 I& r1 g3 X/ e import dask3 O5 O( r; X7 s9 }$ @4 b import numpy as np, x# q8 `' S3 n6 f) X" A$ U( p from dask.diagnostics import ProgressBar3 \3 M5 _1 y6 b A 2 \' u+ `2 N0 p! c" s9 k def norm_fit(data): 8 V3 D: C+ Y1 F/ Y! l loc, scale = st.norm.fit(data)) x$ x" q/ [) V" K4 @ return np.array([loc, scale])1 U+ J, c/ ]9 d ^+ T . Z1 g% v" e8 x" @% w* l rs = np.random.RandomState(0) : z$ C$ l8 g" [+ ^: ~ x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])$ V7 ?& A5 k5 x# u8 I: N+ w x = x.chunk({"lev":5, "lat":100,"lon":100})6 c1 e3 E L1 y # T9 z, n- ~5 E6 t8 k! v #使用apply_ufunc计算,并用dask的并行计算- k& G7 [5 v# t% x7 Q$ j6 P1 ^ result = xr.apply_ufunc(norm_fit,# Q/ r f5 S9 e. y# o: ^) g x, ) m/ @9 ^* \( q' J input_core_dims=[["time"]], # N* \7 g- O+ j% H output_core_dims=[["paras"]], - _& A0 P4 X2 i' i dask="parallelized",$ [& ?# o0 V% f# \4 |7 ]3 M, v output_dtypes=[np.float], k- ^" v: J- e4 k3 l4 n2 Y dask_gufunc_kwargs={output_sizes:{"paras":2}}# q; R! f1 B4 e1 O* v8 R ) & j3 g/ U( Q- x 2 i- }4 e1 h8 V' w9 p: b' y #compute进行真实的计算并显示进度5 X! c) O( S! w) ]1 {& X+ G with ProgressBar():: j- f' M. `8 h result = results.compute() 4 ^6 H# T1 g6 M0 ^( g h6 I1 M 2 A' O9 H2 j, H7 w #结果冲命名保存到nc文件. S6 K5 X0 K( T result = result.rename("norm_paras") 6 ]. ^2 h# v' B/ h) o result = result.to_netcdf("norm_fit_paras.nc",compute=False) $ L% {# c9 ]7 _" \% I4 x. s4 r with ProgressBar(): - t9 o! o% e& M' O result.compute()
X( b4 e- u" k: C

转自:气海同途

. J: S) I. x1 [5 I/ R' d" l

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表