11# SPDX-License-Identifier: Apache-2.0
22# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies.
33# SPDX-FileContributor: Alexandre Benedicto
4+ import logging
45from dataclasses import dataclass
56from enum import Enum
67from pathlib import Path
78from typing import Optional , Type , TypeAlias
8- from vtkmodules .vtkCommonDataModel import vtkPointSet , vtkUnstructuredGrid
9+ from typing_extensions import Self , Union
10+ from xml .etree import ElementTree as ET
11+ from vtkmodules .vtkCommonDataModel import vtkPointSet , vtkUnstructuredGrid , vtkDataSet
912from vtkmodules .vtkIOCore import vtkWriter
1013from vtkmodules .vtkIOLegacy import vtkDataReader , vtkUnstructuredGridWriter , vtkUnstructuredGridReader
1114from vtkmodules .vtkIOXML import ( vtkXMLGenericDataObjectReader , vtkXMLUnstructuredGridWriter , vtkXMLWriter ,
1215 vtkXMLStructuredGridWriter )
13- from geos .utils .Logger import getLogger
16+
17+ from geos .utils .Logger import ( getLogger , Logger )
1418
1519__doc__ = """
1620Input and Output methods for various VTK mesh formats.
@@ -104,19 +108,28 @@ def _readData( filepath: str, readerClass: VtkReaderClass ) -> Optional[ vtkPoin
104108 return output
105109
106110
107- def _writeData ( mesh : vtkPointSet , writerClass : VtkWriterClass , output : str , isBinary : bool ) -> int :
111+ def _writeData ( mesh : vtkPointSet ,
112+ writerClass : VtkWriterClass ,
113+ output : str ,
114+ isBinary : bool ,
115+ logger : Union [ Logger , None ] = None ) -> int :
108116 """Generic helper to write a VTK file using a specific writer class.
109117
110118 Args:
111119 mesh (vtkPointSet): The grid data to write.
112120 writerClass (VtkWriterClass): The VTK writer class to use.
113121 output (str): The output file path.
114122 isBinary (bool): Whether to write the file in binary mode (True) or ASCII (False).
123+ logger (Union[Logger, None], optional): A logger to manage the output messages.
124+ Defaults to None, an internal logger is used.
115125
116126 Returns:
117127 int: 1 if success, 0 otherwise.
118128 """
119- ioLogger .info ( f"Writing mesh to '{ output } ' using { writerClass .__name__ } ..." )
129+ if logger is None :
130+ logger = ioLogger
131+
132+ logger .info ( f"Writing mesh to '{ output } ' using { writerClass .__name__ } ..." )
120133 writer = writerClass ()
121134 writer .SetFileName ( output )
122135 writer .SetInputData ( mesh )
@@ -125,10 +138,10 @@ def _writeData( mesh: vtkPointSet, writerClass: VtkWriterClass, output: str, isB
125138 if isinstance ( writer , vtkXMLWriter ):
126139 if isBinary :
127140 writer .SetDataModeToBinary ()
128- ioLogger .info ( "Data mode set to Binary." )
141+ logger .info ( "Data mode set to Binary." )
129142 else :
130143 writer .SetDataModeToAscii ()
131- ioLogger .info ( "Data mode set to ASCII." )
144+ logger .info ( "Data mode set to ASCII." )
132145
133146 return writer .Write ()
134147
@@ -220,7 +233,10 @@ def readUnstructuredGrid( filepath: str ) -> vtkUnstructuredGrid:
220233 return mesh
221234
222235
223- def writeMesh ( mesh : vtkPointSet , vtkOutput : VtkOutput , canOverwrite : bool = False ) -> int :
236+ def writeMesh ( mesh : vtkPointSet ,
237+ vtkOutput : VtkOutput ,
238+ canOverwrite : bool = False ,
239+ logger : Union [ Logger , None ] = None ) -> int :
224240 """
225241 Writes a vtkPointSet to a file.
226242
@@ -231,6 +247,8 @@ def writeMesh( mesh: vtkPointSet, vtkOutput: VtkOutput, canOverwrite: bool = Fal
231247 vtkOutput (VtkOutput): Configuration for the output file.
232248 canOverwrite (bool, optional): If False, raises an error if the file
233249 already exists. Defaults to False.
250+ logger (Union[Logger, None], optional): A logger to manage the output messages.
251+ Defaults to None, an internal logger is used.
234252
235253 Raises:
236254 FileExistsError: If the output file exists and `canOverwrite` is False.
@@ -240,6 +258,9 @@ def writeMesh( mesh: vtkPointSet, vtkOutput: VtkOutput, canOverwrite: bool = Fal
240258 Returns:
241259 int: Returns 1 on success, consistent with the VTK writer's return code.
242260 """
261+ if logger is None :
262+ logger = ioLogger
263+
243264 outputPath = Path ( vtkOutput .output )
244265 if outputPath .exists () and not canOverwrite :
245266 raise FileExistsError ( f"File '{ outputPath } ' already exists. Set canOverwrite=True to replace it." )
@@ -256,13 +277,96 @@ def writeMesh( mesh: vtkPointSet, vtkOutput: VtkOutput, canOverwrite: bool = Fal
256277 if not writerClass :
257278 raise ValueError ( f"Writing to extension '{ outputPath .suffix } ' is not supported." )
258279
259- successCode = _writeData ( mesh , writerClass , str ( outputPath ), vtkOutput .isDataModeBinary )
280+ successCode = _writeData ( mesh , writerClass , str ( outputPath ), vtkOutput .isDataModeBinary , logger )
260281 if not successCode :
261282 raise RuntimeError ( f"VTK writer failed to write file '{ outputPath } '." )
262283
263- ioLogger .info ( f"Successfully wrote mesh to '{ outputPath } '." )
284+ logger .info ( f"Successfully wrote mesh to '{ outputPath } '." )
264285 return successCode
265286
266287 except ( ValueError , RuntimeError ) as e :
267- ioLogger .error ( e )
288+ logger .error ( e )
268289 raise
290+
291+
292+ class PVDReader :
293+
294+ def __init__ ( self : Self , filename : str , logger : Union [ Logger , None ] = None ) -> None :
295+ """PVD Reader class.
296+
297+ Args:
298+ filename (str): PVD filename with full path.
299+ logger (Union[Logger, None], optional): A logger to manage the output messages.
300+ Defaults to None, an internal logger is used.
301+ """
302+ self .logger : Logger
303+ if logger is None :
304+ self .logger = getLogger ( "PVD Reader" , True )
305+ else :
306+ self .logger = logging .getLogger ( f"{ logger .name } " )
307+ self .logger .setLevel ( logging .INFO )
308+ self .logger .propagate = False
309+
310+ self .filename = filename
311+ self .dir = Path ( filename ).parent
312+ self .datasets = {}
313+ self ._read ()
314+
315+ def _read ( self ) -> None :
316+ tree = ET .parse ( self .filename )
317+ root = tree .getroot ()
318+ datasets = root [ 0 ].findall ( 'DataSet' )
319+
320+ for n , dataset in enumerate ( datasets ):
321+ timestep = float ( dataset .attrib .get ( 'timestep' , 0 ) )
322+ datasetFile = Path ( dataset .attrib .get ( 'file' ) )
323+ self .datasets [ n ] = ( timestep , datasetFile )
324+
325+ self .logger .info ( "All filenames from PVD file have been read." )
326+
327+ def getDataSetAtTimeIndex ( self : Self , timeIndex : int ) -> vtkDataSet :
328+ """Get the dataset corresponding to requested time index.
329+
330+ Args:
331+ timeIndex (int): Time index
332+
333+ Returns:
334+ vtkDataSet: Dataset
335+ """
336+ return readMesh ( self .dir / self .datasets [ timeIndex ][ 1 ] )
337+
338+ def getAllTimestepsValues ( self : Self ) -> list [ float ]:
339+ """Get the list of all timesteps values from the PVD.
340+
341+ Returns:
342+ list[float]: List of timesteps values.
343+ """
344+ return [ value [ 0 ] for _ , value in self .datasets .items () ]
345+
346+
347+ def createPVD ( outputDir : Path ,
348+ pvdFilename : str ,
349+ outputFiles : list [ tuple [ int , str ] ],
350+ logger : Union [ Logger , None ] = None ) -> None :
351+ """Create PVD collection file.
352+
353+ Args:
354+ outputDir (Path): Output directory
355+ pvdFilename (str): Output PVD filename
356+ outputFiles (list[tuple[int, str]]): List containing all the filenames of the PVD files
357+ logger (Union[Logger, None], optional): A logger to manage the output messages.
358+ Defaults to None, an internal logger is used.
359+ """
360+ if logger is None :
361+ logger = getLogger ( "createPVD" , True )
362+
363+ pvdPath = outputDir / pvdFilename
364+ with open ( pvdPath , 'w' ) as f :
365+ f .write ( '<VTKFile type="Collection" version="0.1">\n ' )
366+ f .write ( ' <Collection>\n ' )
367+ for t , fname in outputFiles :
368+ f .write ( f' <DataSet timestep="{ t } " file="{ fname } "/>\n ' )
369+ f .write ( ' </Collection>\n ' )
370+ f .write ( '</VTKFile>\n ' )
371+
372+ logger .info ( f"PVD created: { pvdPath } ." )
0 commit comments