Source code for yt_idv.scene_data.block_collection

from collections import defaultdict

import numpy as np
import traitlets
from yt.data_objects.data_containers import YTDataContainer

from yt_idv.opengl_support import Texture3D, VertexArray, VertexAttribute
from yt_idv.scene_data.base_data import SceneData


[docs] class BlockCollection(SceneData): name = "block_collection" data_source = traitlets.Instance(YTDataContainer) texture_objects = traitlets.Dict(value_trait=traitlets.Instance(Texture3D)) bitmap_objects = traitlets.Dict(value_trait=traitlets.Instance(Texture3D)) blocks = traitlets.Dict(default_value=()) scale = traitlets.Bool(False) blocks_by_grid = traitlets.Instance(defaultdict, (list,)) grids_by_block = traitlets.Dict(default_value=()) _yt_geom_str = traitlets.Unicode("cartesian") compute_min_max = traitlets.Bool(True) always_normalize = traitlets.Bool(False) @traitlets.default("vertex_array") def _default_vertex_array(self): return VertexArray(name="block_info", each=1)
[docs] def add_data(self, field, no_ghost=False): r"""Adds a source of data for the block collection. Given a `data_source` and a `field` to populate from, adds the data to the block collection so that is able to be rendered. Parameters ---------- data_source : YTRegion A YTRegion object to use as a data source. field : string A field to populate from. no_ghost : bool (False) Should we speed things up by skipping ghost zone generation? """ self.data_source.tiles.set_fields([field], [False], no_ghost=no_ghost) self._yt_geom_str = str(self.data_source.ds.geometry) # note: casting to string for compatibility with new and old geometry # attributes (now an enum member in latest yt), # see https://github.com/yt-project/yt/pull/4244 # Every time we change our data source, we wipe all existing ones. # We now set up our vertices into our current data source. vert, dx, le, re = [], [], [], [] min_val = +np.inf max_val = -np.inf if self.scale and self._yt_geom_str == "cartesian": left_min = np.ones(3, "f8") * np.inf right_max = np.ones(3, "f8") * -np.inf for block in self.data_source.tiles.traverse(): np.minimum(left_min, block.LeftEdge, left_min) np.maximum(right_max, block.LeftEdge, right_max) scale = right_max.max() - left_min.min() for block in self.data_source.tiles.traverse(): block.LeftEdge -= left_min block.LeftEdge /= scale block.RightEdge -= left_min block.RightEdge /= scale for i, block in enumerate(self.data_source.tiles.traverse()): min_val = min(min_val, np.nanmin(np.abs(block.my_data[0])).min()) max_val = max(max_val, np.nanmax(np.abs(block.my_data[0])).max()) self.blocks[id(block)] = (i, block) vert.append([1.0, 1.0, 1.0, 1.0]) dds = (block.RightEdge - block.LeftEdge) / block.source_mask.shape dx.append(dds.tolist()) le.append(block.LeftEdge.tolist()) re.append(block.RightEdge.tolist()) for g, node, (sl, _, gi) in self.data_source.tiles.slice_traverse(): block = node.data self.blocks_by_grid[g.id - g._id_offset].append((id(block), gi)) self.grids_by_block[id(node.data)] = (g.id - g._id_offset, sl) if self.compute_min_max: if hasattr(min_val, "in_units"): min_val = min_val.d if hasattr(max_val, "in_units"): max_val = max_val.d self.min_val = min_val self.max_val = max_val # Now we set up our buffer vert = np.array(vert, dtype="f4") dx = np.array(dx, dtype="f4") le = np.array(le) re = np.array(re) if self._yt_geom_str == "cartesian": # Note: the block LeftEdge and RightEdge arrays are plain np arrays in # units of code_length, so need to convert to unitary units (in range 0,1) # after the fact: units = self.data_source.ds.units ratio = (units.code_length / units.unitary).base_value dx = dx * ratio le = le * ratio re = re * ratio LE = np.array([b.LeftEdge for i, b in self.blocks.values()]).min(axis=0) RE = np.array([b.RightEdge for i, b in self.blocks.values()]).max(axis=0) self.diagonal = np.sqrt(((RE - LE) ** 2).sum()) elif self._yt_geom_str == "spherical": rad_index = self.data_source.ds.coordinates.axis_id["r"] max_r = self.data_source.ds.domain_right_edge[rad_index] le[:, rad_index] = le[:, rad_index] / max_r re[:, rad_index] = re[:, rad_index] / max_r dx[:, rad_index] = dx[:, rad_index] / max_r self._set_geometry_attributes(le, re, dx) self.vertex_array.attributes.append( VertexAttribute(name="model_vertex", data=vert) ) self.vertex_array.attributes.append(VertexAttribute(name="in_dx", data=dx)) self.vertex_array.attributes.append( VertexAttribute(name="in_left_edge", data=le.astype("f4")) ) self.vertex_array.attributes.append( VertexAttribute(name="in_right_edge", data=re.astype("f4")) ) # Now we set up our textures self._load_textures()
def _set_geometry_attributes(self, le, re, dx): # set any vertex_array attributes that depend on the yt geometry type # # for spherical coordinates, the radial component of le, re and dx # should already be normalized in the range of (0, 1) if self._yt_geom_str == "cartesian": return elif self._yt_geom_str == "spherical": from yt_idv.utilities.coordinate_utilities import ( SphericalMixedCoordBBox, cartesian_bboxes_edges, ) axis_id = self.data_source.ds.coordinates.axis_id # first, we need an approximation of the grid spacing # in cartesian coordinates. this is used by the # ray tracing engine to determine along-ray step size # so doesn't have to be exact. the ordering also # doesn't matter since it's the min value that will # influence step size. So here, we find some representative # lengths: the change in radius across the element, # the arc lengths of an element using average values of r # and theta where needed (average values avoid the edge case # of 0. values, which will cause the shader to crash) dr = dx[:, axis_id["r"]] rh = (le[:, axis_id["r"]] + re[:, axis_id["r"]]) / 2 rdtheta = rh * dx[:, axis_id["theta"]] th = (le[:, axis_id["theta"]] + re[:, axis_id["theta"]]) / 2 xy = rh * np.sin(th) rdphi = xy * dx[:, axis_id["phi"]] dx_cart = np.column_stack([dr, rdtheta, rdphi]) # cartesian bbox calculations bbox_handler = SphericalMixedCoordBBox() le_cart, re_cart = cartesian_bboxes_edges( bbox_handler, le[:, axis_id["r"]], le[:, axis_id["theta"]], le[:, axis_id["phi"]], re[:, axis_id["r"]], re[:, axis_id["theta"]], re[:, axis_id["phi"]], ) le_cart = np.column_stack(le_cart) re_cart = np.column_stack(re_cart) # cartesian le, re, width of whole domain domain_le = le_cart.min(axis=0) domain_re = re_cart.max(axis=0) domain_wid = domain_re - domain_le max_wid = np.max(domain_wid) # these will get passed down as uniforms to go from screen coords of # 0,1 to cartesian coords of domain_le to domain_re from which full # spherical coords can be calculated. self.cart_bbox_max_width = max_wid self.cart_bbox_le = domain_le self.cart_bbox_center = (domain_re + domain_le) / 2.0 self.cart_min_dx = np.min(np.linalg.norm(dx_cart)) self.vertex_array.attributes.append( VertexAttribute(name="le_cart", data=le_cart.astype("f4")) ) self.vertex_array.attributes.append( VertexAttribute(name="re_cart", data=re_cart.astype("f4")) ) self.vertex_array.attributes.append( VertexAttribute(name="dx_cart", data=dx_cart.astype("f4")) ) # does not seem that diagonal is used anywhere, but recalculating to # be safe... self.diagonal = np.sqrt(((re_cart - le_cart) ** 2).sum()) else: raise NotImplementedError( f"{self.name} does not implement {self._yt_geom_str} geometries." )
[docs] def viewpoint_iter(self, camera): for block in self.data_source.tiles.traverse(viewpoint=camera.position): vbo_i, _ = self.blocks[id(block)] yield (vbo_i, self.texture_objects[vbo_i], self.bitmap_objects[vbo_i])
[docs] def filter_callback(self, callback): # This is not efficient. It calls it once for each node in a grid. # We do this the slow way because of the problem of ordering the way we # iterate over the grids and nodes. This can be fixed at some point. for g_ind in self.blocks_by_grid: blocks = self.blocks_by_grid[g_ind] # Does this need an offset? grid = self.data_source.index.grids[g_ind] new_bitmap = callback(grid).astype("uint8") for b_id, _ in blocks: _, sl = self.grids_by_block[b_id] vbo_i, _ = self.blocks[b_id] self.bitmap_objects[vbo_i].data = new_bitmap[sl]
def _load_textures(self): for block_id in sorted(self.blocks): vbo_i, block = self.blocks[block_id] n_data = np.abs(block.my_data[0]).copy(order="F").astype("float32").d # Avoid setting to NaNs if self.max_val != self.min_val or self.always_normalize: n_data = self._normalize_by_min_max(n_data) # blocks filled with identically 0 values will be # skipped by the shader, so offset by a tiny value. # see https://github.com/yt-project/yt_idv/issues/171 n_data[n_data == 0.0] += np.finfo(np.float32).eps data_tex = Texture3D(data=n_data) bitmap_tex = Texture3D( data=block.source_mask * 255, min_filter="nearest", mag_filter="nearest" ) self.texture_objects[vbo_i] = data_tex self.bitmap_objects[vbo_i] = bitmap_tex _grid_id_list = None @property def grid_id_list(self): """the 0-indexed grid ids that contain all the blocks""" if self._grid_id_list is None: gl = [gid for gid, _ in self.grids_by_block.values()] self._grid_id_list = np.unique(gl).tolist() return self._grid_id_list @property def intersected_grids(self): return [self.data_source.ds.index.grids[gid] for gid in self.grid_id_list]
def _block_collection_outlines( block_collection: BlockCollection, display_name: str = "block outlines", segments_per_edge: int = 20, outline_type: str = "blocks", ): """ Build a CurveCollection and CurveCollectionRendering from BlockCollection bounding boxes for non-cartesian geometries. """ if outline_type not in ("blocks", "grids"): msg = f"outline_type must be blocks or grids, found {outline_type}" raise ValueError(msg) if block_collection._yt_geom_str not in ("spherical",): msg = "_curves_from_block_data is not implemented for " msg += f"{block_collection._yt_geom_str} geometry." raise NotImplementedError(msg) from yt_idv.scene_components.curves import CurveCollectionRendering from yt_idv.scene_data.curve import CurveCollection data_collection = CurveCollection() if outline_type == "blocks": block_iterator = block_collection.data_source.tiles.traverse() else: # note this can be simplified after # https://github.com/yt-project/yt_idv/pull/179 gids = [gid for gid, _ in block_collection.grids_by_block.values()] gids = np.unique(gids) ds = block_collection.data_source.ds block_iterator = [ds.index.grids[gid] for gid in gids] if block_collection._yt_geom_str == "spherical": from yt_idv.utilities.coordinate_utilities import spherical_to_cartesian # should move this down to cython to speed it up axis_id = block_collection.data_source.ds.coordinates.axis_id n_verts = segments_per_edge + 1 rad_index = axis_id["r"] max_r = block_collection.data_source.ds.domain_right_edge[rad_index] for block in block_iterator: le_i = block.LeftEdge re_i = block.RightEdge r_min = le_i[axis_id["r"]] / max_r r_max = re_i[axis_id["r"]] / max_r theta_min = le_i[axis_id["theta"]] theta_max = re_i[axis_id["theta"]] phi_min = le_i[axis_id["phi"]] phi_max = re_i[axis_id["phi"]] theta_vals = np.linspace(theta_min, theta_max, n_verts) phi_vals = np.linspace(phi_min, phi_max, n_verts) # the r-variation will be straight lines always, only use 2 verts r_vals = np.linspace(r_min, r_max, 2) for r_val in (r_min, r_max): r = np.full(theta_vals.shape, r_val) for phi_val in (phi_min, phi_max): phi = np.full(theta_vals.shape, phi_val) x, y, z = spherical_to_cartesian(r, theta_vals, phi) xyz = np.column_stack([x, y, z]) data_collection.add_curve(xyz) for theta_val in (theta_min, theta_max): theta = np.full(phi_vals.shape, theta_val) x, y, z = spherical_to_cartesian(r, theta, phi_vals) xyz = np.column_stack([x, y, z]) data_collection.add_curve(xyz) for phi_val in (phi_min, phi_max): phi = np.full(r_vals.shape, phi_val) for theta_val in (theta_min, theta_max): theta = np.full(r_vals.shape, theta_val) x, y, z = spherical_to_cartesian(r_vals, theta, phi) xyz = np.column_stack([x, y, z]) data_collection.add_curve(xyz) data_collection.add_data() # call add_data() after done adding curves data_rendering = CurveCollectionRendering(data=data_collection) data_rendering.display_name = display_name return data_collection, data_rendering