Skip to content

axfluxmdo.solvers

Gmsh mesh export requires the [fea] extra; the GetDP runner needs the external getdp binary (ONELAB bundle, or AXFLUXMDO_GETDP=/path/to/getdp).

axfluxmdo.solvers.gmsh_export

Gmsh geometry and mesh export.

Two models:

  • 2D unrolled (build_linear_2d_model / export_mesh): the annulus unrolled at the mean radius into a linear-machine cross-section — one pole pair, x circumferential in [0, 2*tau_p], y axial with y=0 at the air-gap center. Solvable as planar magnetostatics by GetDP; the slotless variant places stator iron directly at +g/2 so the magnetic gap is exactly motor.air_gap (the load line's circuit). The slotted variant opens winding slots so the Carter factor can be MEASURED.
  • 3D sector (export_3d_sector): an OCC annular sector with rotor / magnet / gap / winding / stator volumes for visualization and downstream meshing credibility; no solver hookup.

gmsh is a process-global singleton: every session goes through _gmsh_session (try/finally finalize), and all paths are resolved to absolute before writing (gmsh writes CWD-relative otherwise). Mesh files are written as MSH 2.2, the format GetDP handles best.

Linear2DLayout dataclass

Linear2DLayout(x_span_m: float, gap_midline_y_m: float, depth_m: float, pole_pitch_m: float, magnet_arc_ratio: float, slotted: bool, group_tags: dict[str, int], y_interfaces_m: dict[str, float])

Everything the GetDP template and the parser need to know about the mesh.

build_linear_2d_model

build_linear_2d_model(motor: AxialFluxMotor, *, slotted: bool = False, n_pole_pairs_modeled: int = 1, airgap_layers: int = 4, mesh_size_factor: float = 1.0, slots_per_pole_pair: int | None = None) -> Linear2DLayout

Populate the CURRENT gmsh model with the unrolled cross-section.

Requires an active gmsh session (see export_mesh for the session-owning wrapper). Returns the layout descriptor; the mesh is not yet generated.

Source code in src/axfluxmdo/solvers/gmsh_export.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def build_linear_2d_model(
    motor: AxialFluxMotor,
    *,
    slotted: bool = False,
    n_pole_pairs_modeled: int = 1,
    airgap_layers: int = 4,
    mesh_size_factor: float = 1.0,
    slots_per_pole_pair: int | None = None,
) -> Linear2DLayout:
    """Populate the CURRENT gmsh model with the unrolled cross-section.

    Requires an active gmsh session (see ``export_mesh`` for the session-owning
    wrapper). Returns the layout descriptor; the mesh is not yet generated.
    """
    gmsh = _import_gmsh()
    geo = gmsh.model.geo

    tau = motor.pole_pitch
    span = n_pole_pairs_modeled * 2.0 * tau
    g = motor.air_gap
    t_m = motor.magnet_thickness
    t_bi = motor.back_iron_thickness
    d_slot = motor.slot_depth
    t_sc = motor.stator_core_thickness

    y_rotor_bottom = -g / 2.0 - t_m - t_bi
    y_magnet_bottom = -g / 2.0 - t_m
    y_gap_bottom = -g / 2.0
    y_gap_top = +g / 2.0
    y_slot_top = y_gap_top + d_slot
    y_stator_top = y_slot_top + t_sc

    size_fine = mesh_size_factor * min(g, t_m) / 2.0
    size_coarse = 4.0 * size_fine

    point_cache: dict[tuple[float, float], int] = {}

    def pt(x: float, y: float, size: float) -> int:
        key = (round(x, 12), round(y, 12))
        if key not in point_cache:
            point_cache[key] = geo.addPoint(x, y, 0.0, size)
        return point_cache[key]

    line_cache: dict[tuple[int, int], int] = {}

    def ln(p1: int, p2: int) -> int:
        """Shared line cache so adjacent rectangles are conformal."""
        if (p1, p2) in line_cache:
            return line_cache[(p1, p2)]
        if (p2, p1) in line_cache:
            return -line_cache[(p2, p1)]
        tag = geo.addLine(p1, p2)
        line_cache[(p1, p2)] = tag
        return tag

    def rect(x0: float, x1: float, y0: float, y1: float, size: float) -> int:
        """Conformal rectangle surface; returns the surface tag."""
        p_bl, p_br = pt(x0, y0, size), pt(x1, y0, size)
        p_tr, p_tl = pt(x1, y1, size), pt(x0, y1, size)
        loop = geo.addCurveLoop([ln(p_bl, p_br), ln(p_br, p_tr), ln(p_tr, p_tl), ln(p_tl, p_bl)])
        return geo.addPlaneSurface([loop])

    surfaces: dict[str, list[int]] = {
        "ROTOR_IRON": [],
        "MAGNET_N": [],
        "MAGNET_S": [],
        "AIR": [],
        "AIRGAP": [],
        "WINDING": [],
        "STATOR_IRON": [],
    }

    # ALL bands share one global set of x-breakpoints (magnet edges + slot
    # edges when slotted) so every horizontal interface is conformal: adjacent
    # rectangles reuse the same line entities through the caches. A band with
    # private breakpoints would duplicate the shared edge geometrically — a
    # mesh crack, which the A_z formulation treats as a natural (infinite-mu)
    # boundary and silently decouples the stator.
    x_breaks: list[float] = [0.0]
    magnet_intervals: list[tuple[float, float, str]] = []
    for k in range(2 * n_pole_pairs_modeled):
        x0 = k * tau + 0.5 * tau * (1.0 - motor.magnet_arc_ratio)
        x1 = k * tau + 0.5 * tau * (1.0 + motor.magnet_arc_ratio)
        name = "MAGNET_N" if k % 2 == 0 else "MAGNET_S"
        magnet_intervals.append((x0, x1, name))
        x_breaks += [x0, x1]
    slot_intervals: list[tuple[float, float]] = []
    if slotted:
        n_slots = (slots_per_pole_pair or 2 * motor.phases) * n_pole_pairs_modeled
        slot_pitch = span / n_slots
        opening = motor.slot_width_fraction * slot_pitch
        for k in range(n_slots):
            center = (k + 0.5) * slot_pitch
            slot_intervals.append((center - opening / 2.0, center + opening / 2.0))
            x_breaks += [center - opening / 2.0, center + opening / 2.0]
    x_breaks.append(span)
    x_breaks = sorted(set(round(x, 12) for x in x_breaks))

    def band(y0: float, y1: float, size: float, classify) -> None:
        for xa, xb in zip(x_breaks[:-1], x_breaks[1:], strict=False):
            xm = 0.5 * (xa + xb)
            surfaces[classify(xm)].append(rect(xa, xb, y0, y1, size))

    # Rotor back iron (single band, coarse)
    band(y_rotor_bottom, y_magnet_bottom, size_coarse, lambda _x: "ROTOR_IRON")

    # Magnet band: magnets where covered, AIR between
    def classify_magnet(xm: float) -> str:
        for x0, x1, name in magnet_intervals:
            if x0 <= xm <= x1:
                return name
        return "AIR"

    band(y_magnet_bottom, y_gap_bottom, size_fine, classify_magnet)

    # Air gap (fine)
    band(y_gap_bottom, y_gap_top, size_fine, lambda _x: "AIRGAP")

    # Stator
    if not slotted:
        # Slotless: iron face directly at +g/2 — the load-line magnetic circuit.
        band(y_gap_top, y_stator_top, size_coarse, lambda _x: "STATOR_IRON")
    else:

        def classify_slot(xm: float) -> str:
            for x0, x1 in slot_intervals:
                if x0 <= xm <= x1:
                    return "WINDING"
            return "STATOR_IRON"

        band(y_gap_top, y_slot_top, size_fine, classify_slot)
        band(y_slot_top, y_stator_top, size_coarse, lambda _x: "STATOR_IRON")

    geo.synchronize()

    # Periodic constraint: right edges are translated copies of left edges.
    def edges_at_x(x_target: float) -> list[int]:
        tags = []
        for (p1, p2), tag in line_cache.items():
            x1y = point_cache_inv[p1]
            x2y = point_cache_inv[p2]
            if abs(x1y[0] - x_target) < 1e-12 and abs(x2y[0] - x_target) < 1e-12:
                tags.append(tag)
        return sorted(tags)

    point_cache_inv = {tag: xy for xy, tag in point_cache.items()}
    left_edges = edges_at_x(0.0)
    right_edges = edges_at_x(round(span, 12))
    translation = [1, 0, 0, span, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1]
    gmsh.model.mesh.setPeriodic(1, right_edges, left_edges, translation)

    # Boundary groups
    def horizontal_edges_at_y(y_target: float) -> list[int]:
        tags = []
        for (p1, p2), tag in line_cache.items():
            y1 = point_cache_inv[p1][1]
            y2 = point_cache_inv[p2][1]
            if abs(y1 - y_target) < 1e-12 and abs(y2 - y_target) < 1e-12:
                tags.append(tag)
        return sorted(tags)

    group_tags: dict[str, int] = {}
    for name, surfs in surfaces.items():
        if surfs:
            tag = gmsh.model.addPhysicalGroup(2, surfs)
            gmsh.model.setPhysicalName(2, tag, name)
            group_tags[name] = tag
    for name, edges in (
        ("PERIODIC_LEFT", left_edges),
        ("PERIODIC_RIGHT", right_edges),
        ("OUTER", horizontal_edges_at_y(y_rotor_bottom) + horizontal_edges_at_y(y_stator_top)),
    ):
        tag = gmsh.model.addPhysicalGroup(1, [abs(t) for t in edges])
        gmsh.model.setPhysicalName(1, tag, name)
        group_tags[name] = tag

    return Linear2DLayout(
        x_span_m=span,
        gap_midline_y_m=0.0,
        depth_m=motor.active_length,
        pole_pitch_m=tau,
        magnet_arc_ratio=motor.magnet_arc_ratio,
        slotted=slotted,
        group_tags=group_tags,
        y_interfaces_m={
            "rotor_bottom": y_rotor_bottom,
            "magnet_bottom": y_magnet_bottom,
            "gap_bottom": y_gap_bottom,
            "gap_top": y_gap_top,
            "slot_top": y_slot_top,
            "stator_top": y_stator_top,
        },
    )

export_mesh

export_mesh(motor: AxialFluxMotor, path: str | Path, *, geo_unrolled: bool = False, **build_kwargs) -> tuple[Path, Linear2DLayout]

Build the unrolled 2D model, mesh it, and write a .msh (MSH 2.2).

Source code in src/axfluxmdo/solvers/gmsh_export.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def export_mesh(
    motor: AxialFluxMotor,
    path: str | Path,
    *,
    geo_unrolled: bool = False,
    **build_kwargs,
) -> tuple[Path, Linear2DLayout]:
    """Build the unrolled 2D model, mesh it, and write a .msh (MSH 2.2)."""
    path = Path(path).resolve()
    path.parent.mkdir(parents=True, exist_ok=True)
    with _gmsh_session() as gmsh:
        gmsh.model.add("axfluxmdo_linear2d")
        layout = build_linear_2d_model(motor, **build_kwargs)
        gmsh.model.mesh.generate(2)
        gmsh.write(str(path))
        if geo_unrolled:
            gmsh.write(str(path.with_suffix(".geo_unrolled")))
    return path, layout

export_3d_sector

export_3d_sector(motor: AxialFluxMotor, path: str | Path, *, sector_poles: int = 1, mesh_size_factor: float = 1.0) -> Path

Export a 3D annular sector mesh (rotor/magnets/gap/winding/stator volumes).

Source code in src/axfluxmdo/solvers/gmsh_export.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
def export_3d_sector(
    motor: AxialFluxMotor,
    path: str | Path,
    *,
    sector_poles: int = 1,
    mesh_size_factor: float = 1.0,
) -> Path:
    """Export a 3D annular sector mesh (rotor/magnets/gap/winding/stator volumes)."""
    path = Path(path).resolve()
    path.parent.mkdir(parents=True, exist_ok=True)
    angle = sector_poles * math.pi / motor.pole_pairs  # one pole = pi/p
    g = motor.air_gap
    t_m = motor.magnet_thickness

    with _gmsh_session() as gmsh:
        gmsh.model.add("axfluxmdo_sector3d")
        occ = gmsh.model.occ

        def annular_sector(z0: float, dz: float) -> int:
            outer = occ.addCylinder(0, 0, z0, 0, 0, dz, motor.outer_radius, angle=angle)
            inner = occ.addCylinder(0, 0, z0, 0, 0, dz, motor.inner_radius, angle=angle)
            out, _ = occ.cut([(3, outer)], [(3, inner)])
            return out[0][1]

        z = 0.0
        volumes: list[tuple[str, int]] = []
        layers = [
            ("ROTOR_IRON", motor.back_iron_thickness),
            ("MAGNET", t_m),
            ("AIRGAP", g),
            ("WINDING", motor.slot_depth),
            ("STATOR_IRON", motor.stator_core_thickness),
        ]
        for name, thickness in layers:
            if name == "MAGNET":
                # magnet arc sub-sector centered in the pole(s); rest of the band is air
                full = annular_sector(z, thickness)
                magnet_angle = motor.magnet_arc_ratio * math.pi / motor.pole_pairs
                magnet_tags = []
                for k in range(sector_poles):
                    pole_start = k * math.pi / motor.pole_pairs
                    rotate_by = pole_start + 0.5 * (math.pi / motor.pole_pairs - magnet_angle)
                    outer = occ.addCylinder(
                        0, 0, z, 0, 0, thickness, motor.outer_radius, angle=magnet_angle
                    )
                    inner = occ.addCylinder(
                        0, 0, z, 0, 0, thickness, motor.inner_radius, angle=magnet_angle
                    )
                    cut_out, _ = occ.cut([(3, outer)], [(3, inner)])
                    occ.rotate(cut_out, 0, 0, 0, 0, 0, 1, rotate_by)
                    magnet_tags.append(cut_out[0][1])
                air_parts, _ = occ.cut([(3, full)], [(3, t) for t in magnet_tags], removeTool=False)
                for dim_tag in air_parts:
                    volumes.append(("AIR", dim_tag[1]))
                for k, t in enumerate(magnet_tags):
                    volumes.append((f"MAGNET_{'N' if k % 2 == 0 else 'S'}", t))
            else:
                volumes.append((name, annular_sector(z, thickness)))
            z += thickness

        occ.fragment([(3, t) for _, t in volumes], [])
        occ.synchronize()

        groups: dict[str, list[int]] = {}
        for name, tag in volumes:
            groups.setdefault(name, []).append(tag)
        for name, tags in groups.items():
            ptag = gmsh.model.addPhysicalGroup(3, tags)
            gmsh.model.setPhysicalName(3, ptag, name)

        size = mesh_size_factor * min(g, t_m)
        gmsh.option.setNumber("Mesh.MeshSizeMin", size)
        gmsh.option.setNumber("Mesh.MeshSizeMax", 8.0 * size)
        gmsh.model.mesh.generate(3)
        gmsh.write(str(path))
    return path

axfluxmdo.solvers.getdp_runner

GetDP solver orchestration.

GetDP (https://getdp.info) is an external binary, never a Python dependency. Discovery order: the AXFLUXMDO_GETDP environment variable (must point at an existing file — a set-but-wrong override fails loudly), then getdp on PATH. All solver tests skip cleanly when the binary is absent.

SolverError

Bases: RuntimeError

GetDP invocation failed; the message carries the output tail.

find_getdp

find_getdp() -> str | None

Locate the getdp binary (env var override first, then PATH).

Source code in src/axfluxmdo/solvers/getdp_runner.py
33
34
35
36
37
38
39
40
41
def find_getdp() -> str | None:
    """Locate the getdp binary (env var override first, then PATH)."""
    override = os.environ.get(GETDP_ENV_VAR)
    if override:
        path = Path(override).expanduser()
        if not path.is_file():
            raise SolverError(f"{GETDP_ENV_VAR}={override!r} does not point to an existing file")
        return str(path)
    return shutil.which("getdp")

run_getdp

run_getdp(pro_path: str | Path, msh_path: str | Path, *, resolution: str = 'Magnetostatics2D', post_operation: str = 'gap_field', table_filename: str = 'gap_field.dat', workdir: str | Path | None = None, timeout: float = 120.0) -> Path

Run getdp on a rendered .pro + .msh; return the output table path.

Source code in src/axfluxmdo/solvers/getdp_runner.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def run_getdp(
    pro_path: str | Path,
    msh_path: str | Path,
    *,
    resolution: str = "Magnetostatics2D",
    post_operation: str = "gap_field",
    table_filename: str = "gap_field.dat",
    workdir: str | Path | None = None,
    timeout: float = 120.0,
) -> Path:
    """Run getdp on a rendered .pro + .msh; return the output table path."""
    getdp = find_getdp()
    if getdp is None:
        raise SolverError(
            "getdp binary not found; install GetDP (e.g. the ONELAB bundle from "
            f"https://onelab.info) and/or set {GETDP_ENV_VAR}=/path/to/getdp"
        )
    pro_path = Path(pro_path).resolve()
    msh_path = Path(msh_path).resolve()
    cwd = Path(workdir).resolve() if workdir is not None else pro_path.parent
    cmd = [
        getdp,
        str(pro_path),
        "-msh",
        str(msh_path),
        "-solve",
        resolution,
        "-pos",
        post_operation,
        "-v",
        "2",
    ]
    try:
        proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        raise SolverError(f"getdp timed out after {timeout}s: {' '.join(cmd)}") from exc
    if proc.returncode != 0:
        tail = "\n".join((proc.stdout + "\n" + proc.stderr).splitlines()[-30:])
        raise SolverError(f"getdp failed (exit {proc.returncode}):\n{tail}")
    table = cwd / table_filename
    if not table.is_file():
        tail = "\n".join((proc.stdout + "\n" + proc.stderr).splitlines()[-30:])
        raise SolverError(f"getdp produced no output table at {table}:\n{tail}")
    return table

solve_open_circuit

solve_open_circuit(motor: AxialFluxMotor, workdir: str | Path | None = None, *, slotted: bool = False, n_samples: int = 720, magnet_temp_c: float | None = None, mesh_size_factor: float = 1.0) -> GapFieldSolution

One-call pipeline: mesh export -> .pro render -> getdp -> parsed gap field.

magnet_temp_c defaults to ambient (25 C) + MAGNET_TEMP_RISE_C = 65 C — exactly the magnet temperature AnalyticalModel uses at the default operating point, so default residual comparisons are apples-to-apples.

Source code in src/axfluxmdo/solvers/getdp_runner.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def solve_open_circuit(
    motor: AxialFluxMotor,
    workdir: str | Path | None = None,
    *,
    slotted: bool = False,
    n_samples: int = 720,
    magnet_temp_c: float | None = None,
    mesh_size_factor: float = 1.0,
) -> GapFieldSolution:
    """One-call pipeline: mesh export -> .pro render -> getdp -> parsed gap field.

    magnet_temp_c defaults to ambient (25 C) + MAGNET_TEMP_RISE_C = 65 C —
    exactly the magnet temperature AnalyticalModel uses at the default
    operating point, so default residual comparisons are apples-to-apples.
    """
    from axfluxmdo.solvers.getdp_templates import render_open_circuit_pro
    from axfluxmdo.solvers.gmsh_export import export_mesh

    if magnet_temp_c is None:
        magnet_temp_c = _DEFAULT_AMBIENT_C + MAGNET_TEMP_RISE_C

    if workdir is None:
        with tempfile.TemporaryDirectory(prefix="axfluxmdo_getdp_") as tmp:
            return solve_open_circuit(
                motor,
                tmp,
                slotted=slotted,
                n_samples=n_samples,
                magnet_temp_c=magnet_temp_c,
                mesh_size_factor=mesh_size_factor,
            )

    wd = Path(workdir).resolve()
    wd.mkdir(parents=True, exist_ok=True)
    msh_path, layout = export_mesh(
        motor, wd / "model.msh", slotted=slotted, mesh_size_factor=mesh_size_factor
    )
    pro_text = render_open_circuit_pro(
        motor, layout, magnet_temp_c=magnet_temp_c, n_samples=n_samples
    )
    pro_path = wd / "model.pro"
    pro_path.write_text(pro_text)
    table = run_getdp(pro_path, msh_path, workdir=wd)
    cols = parse_table(table)
    return GapFieldSolution(
        x_m=cols[:, 0],
        by_t=cols[:, 4],
        pole_pitch_m=motor.pole_pitch,
        magnet_arc_ratio=motor.magnet_arc_ratio,
        magnet_temp_c=magnet_temp_c,
        slotted=slotted,
    )

axfluxmdo.solvers.results_parser

Parse GetDP output tables into gap-field solutions.

GapFieldSolution dataclass

GapFieldSolution(x_m: ndarray, by_t: ndarray, pole_pitch_m: float, magnet_arc_ratio: float, magnet_temp_c: float, slotted: bool)

Axial flux density sampled along the air-gap midline of the unrolled model.

mean_b_t property

mean_b_t: float

Mean |By| over the magnet-covered intervals only.

This is the load-line B_g semantics — flux density UNDER the magnet — not the full-pitch mean (see mean_b_full_pitch_t).

fundamental_b1_t property

fundamental_b1_t: float

Amplitude of the spatial fundamental (period = one pole pair).

Trapezoid Fourier projection over the periodic span — robust to the duplicated x=0 / x=L endpoint that an OnLine sample produces (a naive FFT bin would double-count it).

from_table classmethod

from_table(path: str | Path, motor, *, magnet_temp_c: float, slotted: bool = False) -> GapFieldSolution

Build a solution from a (possibly committed golden) GetDP table.

Source code in src/axfluxmdo/solvers/results_parser.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@classmethod
def from_table(
    cls,
    path: str | Path,
    motor,
    *,
    magnet_temp_c: float,
    slotted: bool = False,
) -> GapFieldSolution:
    """Build a solution from a (possibly committed golden) GetDP table."""
    cols = parse_table(path)
    return cls(
        x_m=cols[:, 0],
        by_t=cols[:, 4],
        pole_pitch_m=motor.pole_pitch,
        magnet_arc_ratio=motor.magnet_arc_ratio,
        magnet_temp_c=magnet_temp_c,
        slotted=slotted,
    )

parse_table

parse_table(path: str | Path) -> np.ndarray

Parse a GetDP Format Table line print of a 3-component vector field.

GetDP Table rows carry a variable number of leading metadata columns (element type / index), but for a vector field printed OnLine the TRAILING SIX columns are always x y z vx vy vz. Returns an (n, 6) array of those columns, sorted by x. Sanity-checks that y and z are constant (it was a line sample).

Source code in src/axfluxmdo/solvers/results_parser.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def parse_table(path: str | Path) -> np.ndarray:
    """Parse a GetDP ``Format Table`` line print of a 3-component vector field.

    GetDP Table rows carry a variable number of leading metadata columns
    (element type / index), but for a vector field printed OnLine the
    TRAILING SIX columns are always ``x y z vx vy vz``. Returns an (n, 6)
    array of those columns, sorted by x. Sanity-checks that y and z are
    constant (it was a line sample).
    """
    data = np.loadtxt(path, comments="#", ndmin=2)
    if data.shape[1] < 6:
        raise ValueError(
            f"{path}: expected >= 6 columns (x y z vx vy vz at the end), got {data.shape[1]}"
        )
    cols = data[:, -6:]
    if np.ptp(cols[:, 1]) > 1e-9 or np.ptp(cols[:, 2]) > 1e-9:
        raise ValueError(f"{path}: y/z vary along the sample line; not an OnLine table")
    return cols[np.argsort(cols[:, 0])]