1616from graphflow .core .executors import ExecutionResult
1717
1818
19+ @pytest .mark .distributed
1920class TestRayExecutor :
2021 """Test Ray executor functionality."""
2122
2223 def test_ray_executor_import (self ):
2324 """Test that Ray executor can be imported."""
25+ # Check if Ray is available before importing
26+ try :
27+ import ray
28+ except ImportError :
29+ pytest .skip ("Ray not available" )
30+
2431 try :
2532 from graphflow .executors .ray_executor import RayExecutor , create_ray_executor
2633 assert RayExecutor is not None
@@ -30,6 +37,12 @@ def test_ray_executor_import(self):
3037
3138 def test_ray_executor_initialization (self ):
3239 """Test Ray executor initialization."""
40+ # Check if Ray is available before importing
41+ try :
42+ import ray
43+ except ImportError :
44+ pytest .skip ("Ray not available" )
45+
3346 try :
3447 from graphflow .executors .ray_executor import RayExecutor
3548
@@ -72,6 +85,7 @@ def test_ray_cluster_executor_not_initialized(self):
7285 pytest .skip ("Ray not available" )
7386
7487
88+ @pytest .mark .distributed
7589class TestDaskExecutor :
7690 """Test Dask executor functionality."""
7791
@@ -123,6 +137,7 @@ def test_dask_distributed_executor(self):
123137 pytest .skip ("Dask not available" )
124138
125139
140+ @pytest .mark .distributed
126141class TestExecutorRegistry :
127142 """Test executor registry with distributed executors."""
128143
@@ -180,6 +195,7 @@ def test_auto_select_executor_with_distributed(self):
180195 assert executor == "processes"
181196
182197
198+ @pytest .mark .distributed
183199class TestDistributedExecution :
184200 """Test distributed execution scenarios."""
185201
@@ -202,6 +218,12 @@ def create_test_context(self):
202218
203219 def test_ray_execution_simulation (self ):
204220 """Test Ray execution with mocked Ray."""
221+ # Check if Ray is available before importing
222+ try :
223+ import ray
224+ except ImportError :
225+ pytest .skip ("Ray not available" )
226+
205227 try :
206228 from graphflow .executors .ray_executor import RayExecutor
207229
@@ -210,29 +232,35 @@ def test_ray_execution_simulation(self):
210232 inputs = {"df" : pd .DataFrame ({"value" : [1 , 2 , 3 ]})}
211233
212234 # Mock Ray components comprehensively
213- with patch ('ray.is_initialized' , return_value = False ):
214- with patch ('ray.init' ) as mock_init :
215- with patch ('ray.get' ) as mock_get :
216- with patch ('ray.remote' ) as mock_remote :
217- # Mock the remote function to return a callable that returns our expected result
218- mock_remote_func = MagicMock ()
219- mock_remote_func .remote .return_value = "mock_ray_object"
220- mock_remote .return_value = mock_remote_func
221-
222- # Mock ray.get to return our expected result
223- mock_get .return_value = pd .DataFrame ({"value" : [12 , 14 , 16 ]})
235+ with patch ('ray.init' ) as mock_init :
236+ with patch ('ray.get' ) as mock_get :
237+ with patch ('ray.remote' ) as mock_remote :
238+ # Mock the remote function to return a callable that returns our expected result
239+ mock_remote_func = MagicMock ()
240+ mock_remote_func .remote .return_value = "mock_ray_object"
241+ mock_remote .return_value = mock_remote_func
242+
243+ # Mock ray.get to return our expected result
244+ mock_get .return_value = pd .DataFrame ({"value" : [12 , 14 , 16 ]})
245+
246+ # Mock ray.is_initialized to return False initially, then True after init
247+ init_called = False
248+ def mock_is_initialized ():
249+ return init_called
250+
251+ def mock_init_side_effect (* args , ** kwargs ):
252+ nonlocal init_called
253+ init_called = True
254+
255+ mock_init .side_effect = mock_init_side_effect
256+
257+ with patch ('ray.is_initialized' , side_effect = mock_is_initialized ):
258+ executor = RayExecutor ()
259+ result = executor .execute_node (node_spec , context , inputs )
224260
225- # Mock ray.is_initialized to return True after init is called
226- def mock_is_initialized ():
227- return mock_init .called
228-
229- with patch ('ray.is_initialized' , side_effect = mock_is_initialized ):
230- executor = RayExecutor ()
231- result = executor .execute_node (node_spec , context , inputs )
232-
233- assert result .success
234- assert result .node_name == "test_func"
235- assert result .cache_hit is False
261+ assert result .success
262+ assert result .node_name == "test_func"
263+ assert result .cache_hit is False
236264 except ImportError :
237265 pytest .skip ("Ray not available" )
238266
@@ -258,6 +286,7 @@ def test_dask_execution_simulation(self):
258286 pytest .skip ("Dask not available" )
259287
260288
289+ @pytest .mark .distributed
261290class TestExecutorFactory :
262291 """Test executor factory functions."""
263292
@@ -291,11 +320,18 @@ def test_create_dask_executor(self):
291320 pytest .skip ("Dask not available" )
292321
293322
323+ @pytest .mark .distributed
294324class TestExecutorErrorHandling :
295325 """Test error handling in distributed executors."""
296326
297327 def test_ray_execution_error (self ):
298328 """Test Ray executor error handling."""
329+ # Check if Ray is available before importing
330+ try :
331+ import ray
332+ except ImportError :
333+ pytest .skip ("Ray not available" )
334+
299335 try :
300336 from graphflow .executors .ray_executor import RayExecutor
301337
@@ -311,25 +347,31 @@ def failing_func(df, param):
311347 context = Context (data = {"param" : 1 })
312348 inputs = {"df" : pd .DataFrame ({"value" : [1 , 2 , 3 ]})}
313349
314- with patch ('ray.is_initialized' , return_value = False ):
315- with patch ('ray.init' ) as mock_init :
316- with patch ('ray.get' , side_effect = ValueError ("Test error" )):
317- with patch ('ray.remote' ) as mock_remote :
318- # Mock the remote function
319- mock_remote_func = MagicMock ()
320- mock_remote_func .remote .return_value = "mock_ray_object"
321- mock_remote .return_value = mock_remote_func
322-
323- # Mock ray.is_initialized to return True after init is called
324- def mock_is_initialized ():
325- return mock_init .called
350+ with patch ('ray.init' ) as mock_init :
351+ with patch ('ray.get' , side_effect = ValueError ("Test error" )):
352+ with patch ('ray.remote' ) as mock_remote :
353+ # Mock the remote function
354+ mock_remote_func = MagicMock ()
355+ mock_remote_func .remote .return_value = "mock_ray_object"
356+ mock_remote .return_value = mock_remote_func
357+
358+ # Mock ray.is_initialized to return False initially, then True after init
359+ init_called = False
360+ def mock_is_initialized ():
361+ return init_called
362+
363+ def mock_init_side_effect (* args , ** kwargs ):
364+ nonlocal init_called
365+ init_called = True
366+
367+ mock_init .side_effect = mock_init_side_effect
368+
369+ with patch ('ray.is_initialized' , side_effect = mock_is_initialized ):
370+ executor = RayExecutor ()
371+ result = executor .execute_node (node_spec , context , inputs )
326372
327- with patch ('ray.is_initialized' , side_effect = mock_is_initialized ):
328- executor = RayExecutor ()
329- result = executor .execute_node (node_spec , context , inputs )
330-
331- assert not result .success
332- assert "Test error" in result .error
373+ assert not result .success
374+ assert "Test error" in result .error
333375 except ImportError :
334376 pytest .skip ("Ray not available" )
335377
0 commit comments