@@ -1734,156 +1734,7 @@ def test_format_prediction_task_regression(self):
17341734 res = format_prediction (regression , * ignored_input )
17351735 self .assertListEqual (res , [0 ] * 5 )
17361736
    @pytest.mark.sklearn()
    @unittest.skipIf(
        Version(sklearn.__version__) < Version("0.21"),
        reason="couldn't perform local tests successfully w/o bloating RAM",
    )
    @mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
    def test__run_task_get_arffcontent_2(self, parallel_mock):
        """Tests if a run executed in parallel is collated correctly."""
        task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp
        x, y = task.get_X_and_y()
        num_instances = x.shape[0]
        # Each prediction row has 6 fixed fields plus one confidence per class label.
        line_length = 6 + len(task.class_labels)
        # sklearn 1.3 renamed the SGD "log" loss to "log_loss".
        loss = "log" if Version(sklearn.__version__) < Version("1.3") else "log_loss"
        clf = sklearn.pipeline.Pipeline(
            [
                (
                    "cat_handling",
                    ColumnTransformer(
                        transformers=[
                            (
                                "cat",
                                OneHotEncoder(handle_unknown="ignore"),
                                x.select_dtypes(include=["object", "category"]).columns,
                            )
                        ],
                        remainder="passthrough",
                    ),
                ),
                ("clf", SGDClassifier(loss=loss, random_state=1)),
            ]
        )
        n_jobs = 2
        # joblib gained the loky backend after 0.11; older versions fall back
        # to multiprocessing.
        backend = "loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
        with parallel_backend(backend, n_jobs=n_jobs):
            res = openml.runs.functions._run_task_get_arffcontent(
                extension=self.extension,
                model=clf,
                task=task,
                add_local_measures=True,
                n_jobs=n_jobs,
            )
        # This unit test will fail if joblib is unable to distribute successfully since the
        # function _run_model_on_fold is being mocked out. However, for a new spawned worker, it
        # is not and the mock call_count should remain 0 while the subsequent check of actual
        # results should also hold, only on successful distribution of tasks to workers.
        # The _prevent_optimize_n_jobs() is a function executed within the _run_model_on_fold()
        # block and mocking this function doesn't affect rest of the pipeline, but is adequately
        # indicative if _run_model_on_fold() is being called or not.
        assert parallel_mock.call_count == 0
        assert isinstance(res[0], list)
        assert len(res[0]) == num_instances
        assert len(res[0][0]) == line_length
        assert len(res[2]) == 7
        assert len(res[3]) == 7
        expected_scores = [
            0.9625,
            0.953125,
            0.965625,
            0.9125,
            0.98125,
            0.975,
            0.9247648902821317,
            0.9404388714733543,
            0.9780564263322884,
            0.9623824451410659,
        ]
        scores = [v for k, v in res[2]["predictive_accuracy"][0].items()]
        np.testing.assert_array_almost_equal(
            scores,
            expected_scores,
            decimal=2,
            err_msg="Observed performance scores deviate from expected ones.",
        )
18101737
    @pytest.mark.sklearn()
    @unittest.skipIf(
        Version(sklearn.__version__) < Version("0.21"),
        reason="couldn't perform local tests successfully w/o bloating RAM",
    )
    @mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
    def test_joblib_backends(self, parallel_mock):
        """Tests evaluation of a run using various joblib backends and n_jobs."""
        task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp
        x, y = task.get_X_and_y()
        num_instances = x.shape[0]
        # Each prediction row has 6 fixed fields plus one confidence per class label.
        line_length = 6 + len(task.class_labels)

        # joblib gained the loky backend after 0.11; older versions fall back
        # to multiprocessing.
        backend_choice = (
            "loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
        )
        # NOTE(review): `call_count` appears cumulative across loop iterations
        # (the same parallel_mock instance is reused): process-based backends
        # leave the count unchanged while in-process backends add to it —
        # TODO confirm against the mock semantics.
        for n_jobs, backend, call_count in [
            (1, backend_choice, 10),
            (2, backend_choice, 10),
            (-1, backend_choice, 10),
            (1, "threading", 20),
            (-1, "threading", 30),
            (1, "sequential", 40),
        ]:
            clf = sklearn.model_selection.RandomizedSearchCV(
                estimator=sklearn.pipeline.Pipeline(
                    [
                        (
                            "cat_handling",
                            ColumnTransformer(
                                transformers=[
                                    (
                                        "cat",
                                        OneHotEncoder(handle_unknown="ignore"),
                                        x.select_dtypes(include=["object", "category"]).columns,
                                    )
                                ],
                                remainder="passthrough",
                            ),
                        ),
                        ("clf", sklearn.ensemble.RandomForestClassifier(n_estimators=5)),
                    ]
                ),
                param_distributions={
                    "clf__max_depth": [3, None],
                    "clf__max_features": [1, 2, 3, 4],
                    "clf__min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10],
                    "clf__min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                    "clf__bootstrap": [True, False],
                    "clf__criterion": ["gini", "entropy"],
                },
                random_state=1,
                cv=sklearn.model_selection.StratifiedKFold(
                    n_splits=2,
                    shuffle=True,
                    random_state=1,
                ),
                n_iter=5,
                n_jobs=n_jobs,
            )
            with parallel_backend(backend, n_jobs=n_jobs):
                res = openml.runs.functions._run_task_get_arffcontent(
                    extension=self.extension,
                    model=clf,
                    task=task,
                    add_local_measures=True,
                    n_jobs=n_jobs,
                )
            assert type(res[0]) == list
            assert len(res[0]) == num_instances
            assert len(res[0][0]) == line_length
            # usercpu_time_millis_* not recorded when n_jobs > 1
            # *_time_millis_* not recorded when n_jobs = -1
            assert len(res[2]["predictive_accuracy"][0]) == 10
            assert len(res[3]["predictive_accuracy"][0]) == 10
            assert parallel_mock.call_count == call_count
18881739 @unittest .skipIf (
18891740 Version (sklearn .__version__ ) < Version ("0.20" ),
@@ -1981,3 +1832,167 @@ def test_delete_unknown_run(mock_delete, test_files_directory, test_api_key):
19811832 run_url = "https://test.openml.org/api/v1/xml/run/9999999"
19821833 assert run_url == mock_delete .call_args .args [0 ]
19831834 assert test_api_key == mock_delete .call_args .kwargs .get ("params" , {}).get ("api_key" )
1835+
1836+
@pytest.mark.sklearn()
@unittest.skipIf(
    Version(sklearn.__version__) < Version("0.21"),
    reason="couldn't perform local tests successfully w/o bloating RAM",
)
@mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
def test__run_task_get_arffcontent_2(parallel_mock):
    """Tests if a run executed in parallel is collated correctly."""
    task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp
    features, _labels = task.get_X_and_y()
    expected_rows = features.shape[0]
    # Each prediction row: 6 fixed fields plus one confidence per class label.
    expected_row_width = 6 + len(task.class_labels)

    # sklearn 1.3 renamed the SGD "log" loss to "log_loss".
    if Version(sklearn.__version__) < Version("1.3"):
        loss = "log"
    else:
        loss = "log_loss"

    categorical_columns = features.select_dtypes(include=["object", "category"]).columns
    encoder = ColumnTransformer(
        transformers=[("cat", OneHotEncoder(handle_unknown="ignore"), categorical_columns)],
        remainder="passthrough",
    )
    model = sklearn.pipeline.Pipeline(
        [
            ("cat_handling", encoder),
            ("clf", SGDClassifier(loss=loss, random_state=1)),
        ]
    )

    n_jobs = 2
    # joblib gained the loky backend after 0.11; otherwise use multiprocessing.
    if Version(joblib.__version__) > Version("0.11"):
        backend = "loky"
    else:
        backend = "multiprocessing"

    from openml_sklearn import SklearnExtension

    extension = SklearnExtension()
    with parallel_backend(backend, n_jobs=n_jobs):
        res = openml.runs.functions._run_task_get_arffcontent(
            extension=extension,
            model=model,
            task=task,
            add_local_measures=True,
            n_jobs=n_jobs,
        )

    # This unit test will fail if joblib is unable to distribute successfully since the
    # function _run_model_on_fold is being mocked out. However, for a new spawned worker, it
    # is not and the mock call_count should remain 0 while the subsequent check of actual
    # results should also hold, only on successful distribution of tasks to workers.
    # The _prevent_optimize_n_jobs() is a function executed within the _run_model_on_fold()
    # block and mocking this function doesn't affect rest of the pipeline, but is adequately
    # indicative if _run_model_on_fold() is being called or not.
    assert parallel_mock.call_count == 0
    assert isinstance(res[0], list)
    assert len(res[0]) == expected_rows
    assert len(res[0][0]) == expected_row_width
    assert len(res[2]) == 7
    assert len(res[3]) == 7

    expected_scores = [
        0.9625,
        0.953125,
        0.965625,
        0.9125,
        0.98125,
        0.975,
        0.9247648902821317,
        0.9404388714733543,
        0.9780564263322884,
        0.9623824451410659,
    ]
    observed_scores = list(res[2]["predictive_accuracy"][0].values())
    np.testing.assert_array_almost_equal(
        observed_scores,
        expected_scores,
        decimal=2,
        err_msg="Observed performance scores deviate from expected ones.",
    )
1912+
1913+
@pytest.mark.sklearn()
@unittest.skipIf(
    Version(sklearn.__version__) < Version("0.21"),
    reason="couldn't perform local tests successfully w/o bloating RAM",
)
@mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
@pytest.mark.parametrize(
    ("n_jobs", "backend", "call_count"),
    [
        # `None` picks the backend based on joblib version (loky or multiprocessing) and
        # spawns multiple processes if n_jobs != 1, which means the mock is not applied.
        (2, None, 0),
        (-1, None, 0),
        (1, None, 10),  # with n_jobs=1 the mock *is* applied, since there is no new subprocess
        (1, "sequential", 10),
        (1, "threading", 10),
        (-1, "threading", 10),  # the threading backend does preserve mocks even with parallelizing
    ],
)
def test_joblib_backends(parallel_mock, n_jobs, backend, call_count):
    """Tests evaluation of a run using various joblib backends and n_jobs.

    ``parallel_mock`` is injected by ``mock.patch`` and counts calls to
    ``SklearnExtension._prevent_optimize_n_jobs``, which runs inside
    ``_run_model_on_fold``; the count therefore indicates whether folds were
    evaluated in the parent process (mock visible) or in spawned workers
    (mock not visible). ``n_jobs``, ``backend``, and ``call_count`` come from
    ``pytest.mark.parametrize``.
    """
    if backend is None:
        # joblib gained the loky backend after 0.11; older versions fall back
        # to multiprocessing.
        backend = (
            "loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
        )

    task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp
    x, _ = task.get_X_and_y()  # targets are unused here
    num_instances = x.shape[0]
    # Each prediction row: 6 fixed fields plus one confidence per class label.
    line_length = 6 + len(task.class_labels)

    clf = sklearn.model_selection.RandomizedSearchCV(
        estimator=sklearn.pipeline.Pipeline(
            [
                (
                    "cat_handling",
                    ColumnTransformer(
                        transformers=[
                            (
                                "cat",
                                OneHotEncoder(handle_unknown="ignore"),
                                x.select_dtypes(include=["object", "category"]).columns,
                            )
                        ],
                        remainder="passthrough",
                    ),
                ),
                ("clf", sklearn.ensemble.RandomForestClassifier(n_estimators=5)),
            ]
        ),
        param_distributions={
            "clf__max_depth": [3, None],
            "clf__max_features": [1, 2, 3, 4],
            "clf__min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10],
            "clf__min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            "clf__bootstrap": [True, False],
            "clf__criterion": ["gini", "entropy"],
        },
        random_state=1,
        cv=sklearn.model_selection.StratifiedKFold(
            n_splits=2,
            shuffle=True,
            random_state=1,
        ),
        n_iter=5,
        n_jobs=n_jobs,
    )

    from openml_sklearn import SklearnExtension

    extension = SklearnExtension()
    with parallel_backend(backend, n_jobs=n_jobs):
        res = openml.runs.functions._run_task_get_arffcontent(
            extension=extension,
            model=clf,
            task=task,
            add_local_measures=True,
            n_jobs=n_jobs,
        )

    # isinstance instead of `type(...) ==` for consistency with the sibling
    # parallel-collation test above and standard Python idiom.
    assert isinstance(res[0], list)
    assert len(res[0]) == num_instances
    assert len(res[0][0]) == line_length
    # usercpu_time_millis_* not recorded when n_jobs > 1
    # *_time_millis_* not recorded when n_jobs = -1
    assert len(res[2]["predictive_accuracy"][0]) == 10
    assert len(res[3]["predictive_accuracy"][0]) == 10
    assert parallel_mock.call_count == call_count
0 commit comments