Source code for bldfm.utils

import numpy as np
import numba

from bldfm import config


[docs] def compute_wind_fields(u_rot, wind_dir): """ Computes the zonal (u) and meridional (v) wind components from a rotated wind speed and direction using the meteorological convention. Parameters: u_rot (float): Rotated wind speed. wind_dir (float): Wind direction in degrees (meteorological convention: direction the wind is coming FROM, clockwise from north). 0 = from north, 90 = from east, 180 = from south, 270 = from west. Returns: tuple: A tuple (u, v) where: - u (float): Zonal wind component (east-west, positive = eastward). - v (float): Meridional wind component (north-south, positive = northward). """ wind_dir = np.deg2rad(wind_dir) u = -u_rot * np.sin(wind_dir) v = -u_rot * np.cos(wind_dir) return u, v
[docs] def ideal_source(nxy, domain, src_loc=None, shape="diamond"): """ Creates a synthetic source field in the shape of a circle or diamond. Useful for testing purposes. Parameters: nxy (tuple): Number of grid points in the x and y directions (nx, ny). domain (tuple): Physical dimensions of the domain (xmax, ymax). shape (str): Shape of the source field. Options are "circle" or "diamond". Default is "diamond". Returns: numpy.ndarray: A 2D array representing the source field. """ nx, ny = nxy xmx, ymx = domain dx = xmx / nx dy = ymx / ny if src_loc is None: # source in the middle of the domain src_loc = (xmx / 2, ymx / 2) xs, ys = src_loc x = np.linspace(0.0, xmx, nx) y = np.linspace(0.0, ymx, ny) X, Y = np.meshgrid(x, y) q0 = np.zeros([ny, nx]) if shape == "diamond": R0 = xmx / 12 R = np.abs(X - xs) + np.abs(Y - ys) q0 = np.where(R < R0, 1.0, 0.0) if shape == "circle": R0 = xmx / 12 R = np.sqrt((X - xs) ** 2 + (Y - ys) ** 2) q0 = np.where(R < R0, 1.0, 0.0) if shape == "point": sig = 4.0 * dx Rsq = (X - xs) ** 2 + (Y - ys) ** 2 q0 = np.exp(-Rsq / 2.0 / sig**2) / sig / np.sqrt(2.0 * np.pi) return q0
[docs] def point_measurement(f, g): """ Computes the convolution of two 2D arrays evaluated at a specific point. Parameters: f (numpy.ndarray): First 2D array. g (numpy.ndarray): Second 2D array. Returns: float: The result of the convolution at the specified point. """ return np.sum(f * g)
[docs] def parallelize(func): _compiled = {} def wrapper(*args, **kwargs): use_parallel = config.NUM_THREADS > 1 if use_parallel not in _compiled: _compiled[use_parallel] = numba.jit( nopython=True, parallel=use_parallel, cache=True )(func) return _compiled[use_parallel](*args, **kwargs) return wrapper