From 9809bc75f4b725cd2a5c9066c4f9f60385b19487 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 08:43:43 -0400 Subject: [PATCH 01/19] refactoring to run all setuid file access on user-specific worker processes (akin to JupyterHub) --- fileglancer/filestore.py | 29 + fileglancer/server.py | 665 +++++++++++------------ fileglancer/settings.py | 4 + fileglancer/user_worker.py | 1025 ++++++++++++++++++++++++++++++++++++ fileglancer/worker_pool.py | 318 +++++++++++ 5 files changed, 1681 insertions(+), 360 deletions(-) create mode 100644 fileglancer/user_worker.py create mode 100644 fileglancer/worker_pool.py diff --git a/fileglancer/filestore.py b/fileglancer/filestore.py index ce3417d6..484ff74a 100644 --- a/fileglancer/filestore.py +++ b/fileglancer/filestore.py @@ -658,6 +658,35 @@ def stream_file_range(self, path: str = None, start: int = 0, end: int = 0, buff file_handle.close() + @staticmethod + def _stream_contents(file_handle, buffer_size: int = DEFAULT_BUFFER_SIZE) -> Generator[bytes, None, None]: + """Stream from an open file handle. Handle is closed when done.""" + try: + while True: + chunk = file_handle.read(buffer_size) + if not chunk: + break + yield chunk + finally: + file_handle.close() + + @staticmethod + def _stream_range(start: int, end: int, content_length: int, + file_handle, buffer_size: int = DEFAULT_BUFFER_SIZE) -> Generator[bytes, None, None]: + """Stream a byte range from an open file handle. Handle is closed when done.""" + try: + file_handle.seek(start) + remaining = content_length + while remaining > 0: + chunk_size = min(buffer_size, remaining) + chunk = file_handle.read(chunk_size) + if not chunk: + break + yield chunk + remaining -= len(chunk) + finally: + file_handle.close() + def rename_file_or_dir(self, old_path: str, new_path: str): """ Rename a file at the given old path to the new path. diff --git a/fileglancer/server.py b/fileglancer/server.py index 365a0b75..3f812223 100644 --- a/fileglancer/server.py +++ b/fileglancer/server.py @@ -38,6 +38,7 @@ from fileglancer.user_context import UserContext, EffectiveUserContext, CurrentUserContext, UserContextConfigurationError from fileglancer.filestore import Filestore, RootCheckError from fileglancer.log import AccessLogMiddleware +from fileglancer.worker_pool import WorkerPool, WorkerError, WorkerDead from fileglancer import sshkeys from x2s3.utils import get_read_access_acl, get_nosuchbucket_response, get_error_response @@ -214,17 +215,47 @@ def create_app(settings): # Define ui_dir for serving static files and SPA ui_dir = PathLib(__file__).parent / "ui" + # Per-user persistent worker pool (only used when use_access_flags=True) + worker_pool = WorkerPool(settings) if settings.use_access_flags else None + def _get_user_context(username: str) -> UserContext: if settings.use_access_flags: return EffectiveUserContext(username) else: return CurrentUserContext() + async def _worker_exec(username: str, action: str, **kwargs): + """Dispatch an action to the per-user worker and return the result. - def _get_file_proxy_client(sharing_key: str, captured_path: str) -> Tuple[FileProxyClient | Response, UserContext | None, str]: - """Resolve a sharing key and captured path to a FileProxyClient. + When use_access_flags=True, dispatches to the persistent worker pool. + When use_access_flags=False (dev/test mode), runs the action directly + in the current process since no identity switching is needed. - Returns (client, user_context, subpath) on success, or (error_response, None, "") on failure. + Raises HTTPException on worker-level errors or dead workers. + """ + if worker_pool is not None: + try: + worker = await worker_pool.get_worker(username) + return await worker.execute(action, **kwargs) + except WorkerDead as e: + logger.error(f"Worker dead for {username}: {e}") + raise HTTPException(status_code=503, detail="Service temporarily unavailable") + except WorkerError as e: + raise # Let caller handle application-level errors + else: + # Dev/test mode: run action directly in-process + from fileglancer.user_worker import _ACTIONS, WorkerContext + handler = _ACTIONS.get(action) + if handler is None: + raise HTTPException(status_code=500, detail=f"Unknown action: {action}") + ctx = WorkerContext(username=username, db_url=settings.db_url) + request = {"action": action, **kwargs} + return handler(request, ctx) + + def _resolve_proxy_info(sharing_key: str, captured_path: str) -> Tuple[dict | Response, str]: + """Resolve a sharing key to proxy info (mount_path, target_name, username, subpath). + + Returns (info_dict, subpath) on success, or (error_response, "") on failure. """ def try_strip_prefix(captured: str, prefix: str) -> str | None: if captured == prefix: @@ -237,27 +268,25 @@ def try_strip_prefix(captured: str, prefix: str) -> str | None: proxied_path = db.get_proxied_path_by_sharing_key(session, sharing_key) if not proxied_path: - return get_nosuchbucket_response(captured_path), None, "" + return get_nosuchbucket_response(captured_path), "" - # Match captured_path against the stored url_prefix. - # The unquote() fallback handles clients like Vol-E viewer that send URLs - # with literal % characters instead of proper URL encoding — FastAPI - # auto-decodes path params, so we need to match the decoded form too. subpath = try_strip_prefix(captured_path, proxied_path.url_prefix) if subpath is None: subpath = try_strip_prefix(captured_path, unquote(proxied_path.url_prefix)) if subpath is None: - return get_error_response(404, "NoSuchKey", f"Path mismatch for sharing key {sharing_key}", captured_path), None, "" + return get_error_response(404, "NoSuchKey", f"Path mismatch for sharing key {sharing_key}", captured_path), "" fsp = db.get_file_share_path(session, proxied_path.fsp_name) if not fsp: - return get_error_response(400, "InvalidArgument", f"File share path {proxied_path.fsp_name} not found", captured_path), None, "" - # Expand ~ to user's home directory before constructing the mount path + return get_error_response(400, "InvalidArgument", f"File share path {proxied_path.fsp_name} not found", captured_path), "" expanded_mount_path = os.path.expanduser(fsp.mount_path) mount_path = f"{expanded_mount_path}/{proxied_path.path}" target_name = captured_path.rsplit('/', 1)[-1] if captured_path else os.path.basename(proxied_path.path) - # Use 256KB buffer for better performance on network filesystems - return FileProxyClient(proxy_kwargs={'target_name': target_name}, path=mount_path, buffer_size=256*1024), _get_user_context(proxied_path.username), subpath + return { + "mount_path": mount_path, + "target_name": target_name, + "username": proxied_path.username, + }, subpath @asynccontextmanager @@ -352,6 +381,11 @@ def mask_password(url: str) -> str: else: logger.debug(f"No notifications file found at {notifications_file}") + # Start worker pool eviction loop (only when using access flags) + if worker_pool is not None: + await worker_pool.start_eviction_loop() + logger.info("Worker pool started") + # Start cluster job monitor try: await apps_module.start_job_monitor() @@ -368,6 +402,14 @@ def mask_password(url: str) -> str: except Exception as e: logger.warning(f"Error stopping cluster job monitor: {e}") + # Cleanup: shut down all workers + if worker_pool is not None: + try: + await worker_pool.shutdown_all() + logger.info("Worker pool shut down") + except Exception as e: + logger.warning(f"Error shutting down worker pool: {e}") + app = FastAPI(lifespan=lifespan) # Add custom access log middleware @@ -927,14 +969,18 @@ async def create_proxied_path(fsp_name: str = Query(..., description="The name o _validate_url_prefix(url_prefix) sharing_name = url_prefix logger.info(f"Creating proxied path for {username} with sharing name {sharing_name} and fsp_name {fsp_name} and path {path} (url_prefix={url_prefix})") + # Validate the user can access the path via worker + validation = await _worker_exec(username, "validate_proxied_path", fsp_name=fsp_name, path=path) + if "error" in validation: + raise HTTPException(status_code=400, detail=validation["error"]) + with db.get_db_session(settings.db_url) as session: - with _get_user_context(username): # Necessary to validate the user can access the proxied path - try: - new_path = db.create_proxied_path(session, username, sharing_name, fsp_name, path, url_prefix=url_prefix) - return _convert_proxied_path(new_path, settings.external_proxy_url) - except ValueError as e: - logger.error(f"Error creating proxied path: {e}") - raise HTTPException(status_code=400, detail=str(e)) + try: + new_path = db.create_proxied_path(session, username, sharing_name, fsp_name, path, url_prefix=url_prefix) + return _convert_proxied_path(new_path, settings.external_proxy_url) + except ValueError as e: + logger.error(f"Error creating proxied path: {e}") + raise HTTPException(status_code=400, detail=str(e)) @app.get("/api/proxied-path", response_model=ProxiedPathResponse, @@ -969,14 +1015,25 @@ async def update_proxied_path(sharing_key: str = Path(..., description="The shar path: Optional[str] = Query(default=None, description="The path relative to the file share path mount point"), sharing_name: Optional[str] = Query(default=None, description="The sharing path of the proxied path"), username: str = Depends(get_current_user)): + # If path or fsp_name is changing, validate access via worker + if path is not None or fsp_name is not None: + with db.get_db_session(settings.db_url) as session: + existing = db.get_proxied_path_by_sharing_key(session, sharing_key) + if existing: + validate_fsp = fsp_name or existing.fsp_name + validate_path = path or existing.path + validation = await _worker_exec(username, "validate_proxied_path", + fsp_name=validate_fsp, path=validate_path) + if "error" in validation: + raise HTTPException(status_code=400, detail=validation["error"]) + with db.get_db_session(settings.db_url) as session: - with _get_user_context(username): # Necessary to validate the user can access the proxied path - try: - updated = db.update_proxied_path(session, username, sharing_key, new_path=path, new_sharing_name=sharing_name, new_fsp_name=fsp_name) - return _convert_proxied_path(updated, settings.external_proxy_url) - except ValueError as e: - logger.error(f"Error updating proxied path: {e}") - raise HTTPException(status_code=400, detail=str(e)) + try: + updated = db.update_proxied_path(session, username, sharing_key, new_path=path, new_sharing_name=sharing_name, new_fsp_name=fsp_name) + return _convert_proxied_path(updated, settings.external_proxy_url) + except ValueError as e: + logger.error(f"Error updating proxied path: {e}") + raise HTTPException(status_code=400, detail=str(e)) @app.delete("/api/proxied-path/{sharing_key}", description="Delete a proxied path by sharing key") @@ -1060,41 +1117,81 @@ async def target_dispatcher(request: Request, if 'acl' in request.query_params: return get_read_access_acl() - client, ctx, subpath = _get_file_proxy_client(sharing_key, path) - if isinstance(client, Response): - return client + info, subpath = _resolve_proxy_info(sharing_key, path) + if isinstance(info, Response): + return info if list_type: if list_type == 2: - with ctx: - return await client.list_objects_v2(continuation_token, delimiter, \ - encoding_type, fetch_owner, max_keys, prefix, start_after) + result = await _worker_exec(info["username"], "s3_list_objects", + mount_path=info["mount_path"], + target_name=info["target_name"], + continuation_token=continuation_token, + delimiter=delimiter, + encoding_type=encoding_type, + fetch_owner=fetch_owner, + max_keys=max_keys, + prefix=prefix, + start_after=start_after) + return Response(content=result["body"], media_type=result.get("media_type", "application/xml"), + status_code=result.get("status_code", 200)) else: return get_error_response(400, "InvalidArgument", f"Invalid list type {list_type}", path) else: range_header = request.headers.get("range") - # Open file in user context, then immediately exit - # The file descriptor retains access rights after we switch back to root - with ctx: - handle = await client.open_object(subpath, range_header) - - # Context exited! Now stream without holding the lock - if isinstance(handle, ObjectHandle): - return client.stream_object(handle) + result = await _worker_exec(info["username"], "s3_open_object", + mount_path=info["mount_path"], + target_name=info["target_name"], + path=subpath, + range_header=range_header) + + if result.get("type") == "handle": + # Worker validated access and returned file metadata + # Open the file in main process (root can read anything) + resolved_path = result["resolved_path"] + if resolved_path is None: + return get_error_response(404, "NoSuchKey", "File not found", subpath) + + file_handle = open(resolved_path, "rb") + from x2s3.client_file import FileObjectHandle, file_iterator + handle = FileObjectHandle( + target_name=result["target_name"], + key=result["key"], + status_code=result["status_code"], + headers=result["headers"], + media_type=result.get("media_type"), + content_length=result["content_length"], + file_handle=file_handle, + start=result["start"], + end=result["end"], + ) + return StreamingResponse( + file_iterator(handle, 256 * 1024), + status_code=handle.status_code, + headers=handle.headers, + media_type=handle.media_type, + ) else: - # Error response (e.g., file not found, invalid range) - return handle + # Error response + return Response( + content=result.get("body", ""), + status_code=result.get("status_code", 500), + headers=result.get("headers", {}), + ) @app.head("/files/{sharing_key}/{path:path}") async def head_object(sharing_key: str, path: str = ''): try: - client, ctx, subpath = _get_file_proxy_client(sharing_key, path) - if isinstance(client, Response): - return client - with ctx: - return await client.head_object(subpath) + info, subpath = _resolve_proxy_info(sharing_key, path) + if isinstance(info, Response): + return info + result = await _worker_exec(info["username"], "s3_head_object", + mount_path=info["mount_path"], + target_name=info["target_name"], + path=subpath) + return Response(headers=result.get("headers", {}), status_code=result.get("status_code", 200)) except: logger.opt(exception=sys.exc_info()).info("Error requesting head") return get_error_response(500, "InternalError", "Error requesting HEAD", path) @@ -1130,59 +1227,18 @@ def _get_filestore(path_name: str): @app.get("/api/profile", description="Get the current user's profile") async def get_profile(username: str = Depends(get_current_user)): """Get the current user's profile""" - with _get_user_context(username): - - # Find matching file share path for home directory - with db.get_db_session(settings.db_url) as session: - paths = db.get_file_share_paths(session) - - # First, check if there's a "home" FSP (for ~/ paths) - home_fsp = next((fsp for fsp in paths if fsp.mount_path in ('~', '~/')), None) - if home_fsp: - home_directory_name = "." - else: - # If no "home" FSP exists, fall back to finding by mount path - home_directory_path = os.path.expanduser(f"~{username}") - home_parent = os.path.dirname(home_directory_path) - home_fsp = next((fsp for fsp in paths if fsp.mount_path == home_parent), None) - home_directory_name = os.path.basename(home_directory_path) - - home_fsp_name = home_fsp.name if home_fsp else None - - # Get user groups - user_groups = [] - try: - user_info = pwd.getpwnam(username) - all_groups = grp.getgrall() - for group in all_groups: - if username in group.gr_mem: - user_groups.append(group.gr_name) - primary_group = grp.getgrgid(user_info.pw_gid).gr_name - if primary_group not in user_groups: - user_groups.append(primary_group) - except Exception as e: - logger.error(f"Error getting groups for user {username}: {str(e)}") - - return { - "username": username, - "homeFileSharePathName": home_fsp_name, - "homeDirectoryName": home_directory_name, - "groups": user_groups, - } + result = await _worker_exec(username, "get_profile") + return result # SSH Key Management endpoints @app.get("/api/ssh-keys", response_model=sshkeys.SSHKeyListResponse, description="List Fileglancer-managed SSH keys") async def list_ssh_keys(username: str = Depends(get_current_user)): """List SSH keys with 'fileglancer' in the comment from authorized_keys""" - with _get_user_context(username): - try: - ssh_dir = sshkeys.get_ssh_directory() - keys = sshkeys.list_ssh_keys(ssh_dir) - return sshkeys.SSHKeyListResponse(keys=keys) - except Exception as e: - logger.error(f"Error listing SSH keys for {username}: {e}") - raise HTTPException(status_code=500, detail=str(e)) + result = await _worker_exec(username, "list_ssh_keys") + if "error" in result: + raise HTTPException(status_code=result.get("status_code", 500), detail=result["error"]) + return sshkeys.SSHKeyListResponse(keys=[sshkeys.SSHKeyInfo(**k) for k in result["keys"]]) @app.post("/api/ssh-keys/generate-temp", description="Generate a temporary SSH key and return private key for one-time copy") @@ -1194,21 +1250,23 @@ async def generate_temp_ssh_key( The private key is streamed securely and the temporary files are deleted after the response is sent. Key info is included in response headers: - - X-SSH-Key-Filename - - X-SSH-Key-Type - X-SSH-Key-Fingerprint - X-SSH-Key-Comment """ - with _get_user_context(username): - try: - ssh_dir = sshkeys.get_ssh_directory() - return sshkeys.generate_temp_key_and_authorize(ssh_dir, request.passphrase) - - except RuntimeError as e: - raise HTTPException(status_code=500, detail=str(e)) - except Exception as e: - logger.error(f"Error generating temp SSH key for {username}: {e}") - raise HTTPException(status_code=500, detail=str(e)) + result = await _worker_exec(username, "generate_ssh_key", passphrase=request.passphrase) + if "error" in result: + raise HTTPException(status_code=result.get("status_code", 500), detail=result["error"]) + # Reconstruct the response with headers + headers = {} + if result.get("fingerprint"): + headers["X-SSH-Key-Fingerprint"] = result["fingerprint"] + if result.get("comment"): + headers["X-SSH-Key-Comment"] = result["comment"] + return Response( + content=result["private_key"], + media_type="application/x-pem-file", + headers=headers, + ) # File content endpoint @app.head("/api/content/{path_name:path}") @@ -1222,40 +1280,32 @@ async def head_file_content(path_name: str, else: filestore_name, _, subpath = path_name.partition('/') - with _get_user_context(username): - filestore, error = _get_filestore(filestore_name) - if filestore is None: - raise HTTPException(status_code=404 if "not found" in error else 500, detail=error) - - file_name = subpath.split('/')[-1] if subpath else '' - content_type = guess_content_type(file_name) - - try: - file_info = filestore.get_file_info(subpath) - - is_binary = filestore.check_is_binary(subpath) - - headers = { - 'Accept-Ranges': 'bytes', - 'X-Is-Binary': 'true' if is_binary else 'false', - } - - if content_type == 'application/octet-stream' and file_name: - headers['Content-Disposition'] = f'attachment; filename="{file_name}"' - - if hasattr(file_info, 'size') and file_info.size is not None: - headers['Content-Length'] = str(file_info.size) - - if hasattr(file_info, 'last_modified') and file_info.last_modified is not None: - headers['Last-Modified'] = format_timestamp(file_info.last_modified) - - return Response(status_code=200, headers=headers, media_type=content_type) + result = await _worker_exec(username, "head_file", fsp_name=filestore_name, subpath=subpath) + if result.get("redirect"): + redirect_url = f"/api/content/{result['fsp_name']}" + if result.get("subpath"): + redirect_url += f"?subpath={result['subpath']}" + return RedirectResponse(url=redirect_url, status_code=307) + if "error" in result: + raise HTTPException(status_code=result.get("status_code", 500), detail=result["error"]) + + info = result["info"] + file_name = subpath.split('/')[-1] if subpath else '' + content_type = result["content_type"] + is_binary = result["is_binary"] + + headers = { + 'Accept-Ranges': 'bytes', + 'X-Is-Binary': 'true' if is_binary else 'false', + } + if content_type == 'application/octet-stream' and file_name: + headers['Content-Disposition'] = f'attachment; filename="{file_name}"' + if info.get("size") is not None: + headers['Content-Length'] = str(info["size"]) + if info.get("last_modified") is not None: + headers['Last-Modified'] = format_timestamp(info["last_modified"]) - except FileNotFoundError: - logger.warning(f"File not found in {filestore_name}: {subpath}") - raise HTTPException(status_code=404, detail="File not found") - except PermissionError: - raise HTTPException(status_code=403, detail="Permission denied") + return Response(status_code=200, headers=headers, media_type=content_type) @app.get("/api/content/{path_name:path}") @@ -1267,59 +1317,25 @@ async def get_file_content(request: Request, path_name: str, subpath: Optional[s else: filestore_name, _, subpath = path_name.partition('/') - # Open file with user's permissions, then immediately release the context - # The file descriptor retains the access rights after we switch back to root - with _get_user_context(username): - filestore, error = _get_filestore(filestore_name) - if filestore is None: - raise HTTPException(status_code=404 if "not found" in error else 500, detail=error) + # Worker validates path and returns metadata (runs as user) + result = await _worker_exec(username, "open_file", fsp_name=filestore_name, subpath=subpath) - file_name = subpath.split('/')[-1] if subpath else '' - content_type = guess_content_type(file_name) + if result.get("redirect"): + redirect_url = f"/api/content/{result['fsp_name']}" + if result.get("subpath"): + redirect_url += f"?subpath={result['subpath']}" + return RedirectResponse(url=redirect_url, status_code=307) + if "error" in result: + raise HTTPException(status_code=result.get("status_code", 500), detail=result["error"]) - try: - file_info = filestore.get_file_info(subpath) - if file_info.is_dir: - raise HTTPException(status_code=400, detail="Cannot download directory content") - - file_size = file_info.size - - # Open the file while we have user's permissions - full_path = filestore._check_path_in_root(subpath) - file_handle = open(full_path, 'rb') - - except RootCheckError as e: - # Path attempts to escape root directory - try to find a valid fsp for this absolute path - logger.info(f"RootCheckError caught for {filestore_name}/{subpath}: {e}") - - # Use the full_path from the exception - full_path = e.full_path - - with db.get_db_session(settings.db_url) as session: - match = db.find_fsp_from_absolute_path(session, full_path) + full_path = result["full_path"] + file_size = result["file_size"] + content_type = result["content_type"] + file_name = subpath.split('/')[-1] if subpath else '' - if match: - fsp, relative_subpath = match - # Construct the correct URL - if relative_subpath: - redirect_url = f"/api/content/{fsp.name}?subpath={relative_subpath}" - else: - redirect_url = f"/api/content/{fsp.name}" - - logger.info(f"Redirecting from /api/content/{filestore_name}?subpath={subpath} to {redirect_url}") - return RedirectResponse(url=redirect_url, status_code=307) - - # If no match found, return the original error message - logger.error(f"No valid file share found for path: {full_path}") - raise HTTPException(status_code=400, detail=str(e)) - except FileNotFoundError: - logger.error(f"File not found in {filestore_name}: {subpath}") - raise HTTPException(status_code=404, detail="File or directory not found") - except PermissionError: - raise HTTPException(status_code=403, detail="Permission denied") - - # Context exited! We're back to root, but file_handle retains user's access rights - # Now we can stream the file asynchronously without holding the user context lock + # Open file in main process — the worker validated access; + # main process runs as root so it can open the validated path + file_handle = open(full_path, 'rb') range_header = request.headers.get('Range') @@ -1344,8 +1360,10 @@ async def get_file_content(request: Request, path_name: str, subpath: Optional[s if content_type == 'application/octet-stream' and file_name: headers['Content-Disposition'] = f'attachment; filename="{file_name}"' + # Construct a temporary filestore just for streaming + # (stream_file_range only needs the file_handle) return StreamingResponse( - filestore.stream_file_range(start=start, end=end, file_handle=file_handle), + Filestore._stream_range(start=start, end=end, content_length=content_length, file_handle=file_handle), status_code=206, headers=headers, media_type=content_type @@ -1360,7 +1378,7 @@ async def get_file_content(request: Request, path_name: str, subpath: Optional[s headers['Content-Disposition'] = f'attachment; filename="{file_name}"' return StreamingResponse( - filestore.stream_file_contents(file_handle=file_handle), + Filestore._stream_contents(file_handle=file_handle), status_code=200, headers=headers, media_type=content_type @@ -1379,73 +1397,27 @@ async def get_file_metadata(path_name: str, subpath: Optional[str] = Query(''), else: filestore_name, _, subpath = path_name.partition('/') - with _get_user_context(username): - filestore, error = _get_filestore(filestore_name) - if filestore is None: - raise HTTPException(status_code=404 if "not found" in error else 500, detail=error) - - try: - with db.get_db_session(settings.db_url) as session: - file_info = filestore.get_file_info(subpath, current_user=username, session=session) - logger.trace(f"File info: {file_info}") - - result = {"info": json.loads(file_info.model_dump_json())} - - if file_info.is_dir: - try: - if limit is not None: - files, has_more, next_cursor, total_count = filestore.yield_file_infos_paginated( - subpath, current_user=username, session=session, - limit=limit, cursor=cursor - ) - result["files"] = [json.loads(f.model_dump_json()) for f in files] - result["has_more"] = has_more - result["next_cursor"] = next_cursor - result["total_count"] = total_count - else: - files = list(filestore.yield_file_infos(subpath, current_user=username, session=session)) - result["files"] = [json.loads(f.model_dump_json()) for f in files] - except PermissionError: - logger.error(f"Permission denied when listing files in directory: {subpath}") - result["files"] = [] - result["error"] = "Permission denied when listing directory contents" - return JSONResponse(content=result, status_code=403) - except FileNotFoundError: - logger.error(f"Directory not found during listing: {subpath}") - result["files"] = [] - result["error"] = "Directory contents not found" - return JSONResponse(content=result, status_code=404) - - return result - - except RootCheckError as e: - # Path attempts to escape root directory - try to find a valid fsp for this absolute path - logger.info(f"RootCheckError caught for {filestore_name}/{subpath}: {e}") - - full_path = e.full_path - - with db.get_db_session(settings.db_url) as session: - match = db.find_fsp_from_absolute_path(session, full_path) - - if match: - fsp, relative_subpath = match - # Construct the correct URL - if relative_subpath: - redirect_url = f"/api/files/{fsp.name}?subpath={relative_subpath}" - else: - redirect_url = f"/api/files/{fsp.name}" - - logger.info(f"Redirecting from /api/files/{filestore_name}?subpath={subpath} to {redirect_url}") - return RedirectResponse(url=redirect_url, status_code=307) - - # If no match found, return the original error message - logger.error(f"No valid file share found for path: {full_path}") - raise HTTPException(status_code=400, detail=str(e)) - except FileNotFoundError: - logger.error(f"File or directory not found: {subpath}") - raise HTTPException(status_code=404, detail="File or directory not found") - except PermissionError: - raise HTTPException(status_code=403, detail="Permission denied") + if limit is not None: + result = await _worker_exec(username, "list_dir_paged", + fsp_name=filestore_name, subpath=subpath, + limit=limit, cursor=cursor) + else: + result = await _worker_exec(username, "list_dir", + fsp_name=filestore_name, subpath=subpath) + + if result.get("redirect"): + redirect_url = f"/api/files/{result['fsp_name']}" + if result.get("subpath"): + redirect_url += f"?subpath={result['subpath']}" + return RedirectResponse(url=redirect_url, status_code=307) + if "error" in result and "status_code" in result: + status_code = result["status_code"] + if status_code == 403 or status_code == 404: + return JSONResponse(content=result, status_code=status_code) + raise HTTPException(status_code=status_code, detail=result["error"]) + if "error" in result: + raise HTTPException(status_code=500, detail=result["error"]) + return result @app.post("/api/files/{path_name}") @@ -1474,30 +1446,19 @@ async def create_file_or_dir(path_name: str, # Use the validated and sanitized path for all operations validated_subpath = normalized_path - with _get_user_context(username): - filestore, error = _get_filestore(path_name) - if filestore is None: - raise HTTPException(status_code=404 if "not found" in error else 500, detail=error) - - try: - file_type = body.get("type") - if file_type == "directory": - logger.info(f"User {username} creating directory {path_name}/{validated_subpath}") - # Path is validated above - safe to use in filesystem operation - filestore.create_dir(validated_subpath) - elif file_type == "file": - logger.info(f"User {username} creating file {path_name}/{validated_subpath}") - # Path is validated above - safe to use in filesystem operation - filestore.create_empty_file(validated_subpath) - else: - raise HTTPException(status_code=400, detail="Invalid file type") - - except FileExistsError: - raise HTTPException(status_code=409, detail="A file or directory with this name already exists") - except PermissionError as e: - raise HTTPException(status_code=403, detail=str(e)) + file_type = body.get("type") + if file_type == "directory": + logger.info(f"User {username} creating directory {path_name}/{validated_subpath}") + result = await _worker_exec(username, "create_dir", fsp_name=path_name, subpath=validated_subpath) + elif file_type == "file": + logger.info(f"User {username} creating file {path_name}/{validated_subpath}") + result = await _worker_exec(username, "create_file", fsp_name=path_name, subpath=validated_subpath) + else: + raise HTTPException(status_code=400, detail="Invalid file type") - return JSONResponse(status_code=201, content={"message": "Item created"}) + if "error" in result: + raise HTTPException(status_code=result.get("status_code", 500), detail=result["error"]) + return JSONResponse(status_code=201, content={"message": "Item created"}) @app.patch("/api/files/{path_name}") @@ -1506,47 +1467,26 @@ async def update_file_or_dir(path_name: str, body: Dict = Body(...), username: str = Depends(get_current_user)): """Handle PATCH requests to rename or update file permissions""" - with _get_user_context(username): - filestore, error = _get_filestore(path_name) - if filestore is None: - raise HTTPException(status_code=404 if "not found" in error else 500, detail=error) - old_file_info = filestore.get_file_info(subpath, username) - new_path = body.get("path") - new_permissions = body.get("permissions") - - # Validate and sanitize new_path if renaming - validated_new_path = new_path - if new_path is not None and new_path != old_file_info.path: - # Normalize the path to prevent path traversal - normalized_new_path = os.path.normpath(new_path) - - # Security check: Ensure normalized path doesn't escape directory - if normalized_new_path.startswith('..') or os.path.isabs(normalized_new_path): - raise HTTPException(status_code=400, detail="New path cannot escape the current directory") - - # Validate the filename portion for invalid characters - new_filename = os.path.basename(normalized_new_path) - _validate_filename(new_filename) - - # Use the validated path - validated_new_path = normalized_new_path - - try: - if new_permissions is not None and new_permissions != old_file_info.permissions: - logger.info(f"User {username} changing permissions of {old_file_info.absolute_path} to {new_permissions}") - filestore.change_file_permissions(subpath, new_permissions) - - if new_path is not None and new_path != old_file_info.path: - logger.info(f"User {username} renaming {old_file_info.absolute_path} to {validated_new_path}") - # Path is validated above - safe to use in filesystem operation - filestore.rename_file_or_dir(old_file_info.path, validated_new_path) - - except PermissionError as e: - raise HTTPException(status_code=403, detail=str(e)) - except OSError as e: - raise HTTPException(status_code=500, detail=str(e)) - - return JSONResponse(status_code=200, content={"message": "Permissions changed"}) + new_path = body.get("path") + new_permissions = body.get("permissions") + + # Validate and sanitize new_path if renaming + validated_new_path = new_path + if new_path is not None: + normalized_new_path = os.path.normpath(new_path) + if normalized_new_path.startswith('..') or os.path.isabs(normalized_new_path): + raise HTTPException(status_code=400, detail="New path cannot escape the current directory") + new_filename = os.path.basename(normalized_new_path) + _validate_filename(new_filename) + validated_new_path = normalized_new_path + + result = await _worker_exec(username, "update_file", + fsp_name=path_name, subpath=subpath, + new_path=validated_new_path, + new_permissions=new_permissions) + if "error" in result: + raise HTTPException(status_code=result.get("status_code", 500), detail=result["error"]) + return JSONResponse(status_code=200, content={"message": "Permissions changed"}) @app.delete("/api/files/{fsp_name}") @@ -1554,18 +1494,11 @@ async def delete_file_or_dir(fsp_name: str, subpath: Optional[str] = Query(''), username: str = Depends(get_current_user)): """Handle DELETE requests to remove a file or (empty) directory""" - with _get_user_context(username): - filestore, error = _get_filestore(fsp_name) - if filestore is None: - raise HTTPException(status_code=404 if "not found" in error else 500, detail=error) - - try: - logger.info(f"User {username} deleting {filestore.get_root_path()}/{subpath}") - filestore.remove_file_or_dir(subpath) - except PermissionError as e: - raise HTTPException(status_code=403, detail=str(e)) - - return JSONResponse(status_code=200, content={"message": "Item deleted"}) + logger.info(f"User {username} deleting {fsp_name}/{subpath}") + result = await _worker_exec(username, "delete", fsp_name=fsp_name, subpath=subpath) + if "error" in result: + raise HTTPException(status_code=result.get("status_code", 500), detail=result["error"]) + return JSONResponse(status_code=200, content={"message": "Item deleted"}) # --- Apps & Jobs API --- @@ -1751,14 +1684,8 @@ async def update_user_app(body: ManifestFetchRequest, description="Validate file/directory paths for app parameters") async def validate_paths(body: PathValidationRequest, username: str = Depends(get_current_user)): - errors = {} - with _get_user_context(username): - with db.get_db_session(settings.db_url) as session: - for param_key, path_value in body.paths.items(): - error = apps_module.validate_path_in_filestore(path_value, session) - if error: - errors[param_key] = error - return PathValidationResponse(errors=errors) + result = await _worker_exec(username, "validate_paths", paths=body.paths) + return PathValidationResponse(errors=result.get("errors", {})) @app.get("/api/cluster-defaults", description="Get cluster configuration defaults") @@ -1791,8 +1718,7 @@ async def submit_job(body: JobSubmitRequest, container=body.container, container_args=body.container_args, ) - with _get_user_context(username): - return _convert_job(db_job) + return _convert_job(db_job) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: @@ -1805,8 +1731,17 @@ async def get_jobs(status: Optional[str] = Query(None, description="Filter by st username: str = Depends(get_current_user)): with db.get_db_session(settings.db_url) as session: db_jobs = db.get_jobs_by_username(session, username, status) - with _get_user_context(username): - jobs = [_convert_job(j) for j in db_jobs] + # For listing, read service_url for running service jobs via worker + jobs = [] + for j in db_jobs: + service_url = None + if getattr(j, 'entry_point_type', 'job') == 'service' and j.status == 'RUNNING': + try: + result = await _worker_exec(username, "get_service_url", job_id=j.id) + service_url = result.get("service_url") + except Exception: + pass + jobs.append(_convert_job(j, service_url=service_url)) return JobResponse(jobs=jobs) @app.get("/api/jobs/{job_id}", response_model=Job, @@ -1817,8 +1752,16 @@ async def get_job(job_id: int, db_job = db.get_job(session, job_id, username) if db_job is None: raise HTTPException(status_code=404, detail="Job not found") - with _get_user_context(username): - return _convert_job(db_job, include_files=True) + # Read file paths and service URL via worker + files_result = await _worker_exec(username, "get_job_file_paths", job_id=job_id) + service_url = None + if getattr(db_job, 'entry_point_type', 'job') == 'service' and db_job.status == 'RUNNING': + try: + svc_result = await _worker_exec(username, "get_service_url", job_id=job_id) + service_url = svc_result.get("service_url") + except Exception: + pass + return _convert_job(db_job, service_url=service_url, files=files_result.get("files")) @app.post("/api/jobs/{job_id}/cancel", description="Cancel a running job") @@ -1826,8 +1769,7 @@ async def cancel_job(job_id: int, username: str = Depends(get_current_user)): try: db_job = await apps_module.cancel_job(job_id, username) - with _get_user_context(username): - return _convert_job(db_job) + return _convert_job(db_job) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -1849,12 +1791,14 @@ async def get_job_file(job_id: int, if file_type not in ("script", "stdout", "stderr"): raise HTTPException(status_code=400, detail="file_type must be script, stdout, or stderr") try: - with _get_user_context(username): - content = await apps_module.get_job_file_content(job_id, username, file_type) + result = await _worker_exec(username, "get_job_file", job_id=job_id, file_type=file_type) + if "error" in result: + raise HTTPException(status_code=result.get("status_code", 404), detail=result["error"]) + content = result.get("content") if content is None: raise HTTPException(status_code=404, detail=f"File not found: {file_type}") return PlainTextResponse(content) - except ValueError as e: + except WorkerError as e: raise HTTPException(status_code=404, detail=str(e)) def _ensure_utc(dt: Optional[datetime]) -> Optional[datetime]: @@ -1870,11 +1814,12 @@ def _ensure_utc(dt: Optional[datetime]) -> Optional[datetime]: return dt.replace(tzinfo=UTC) return dt - def _convert_job(db_job: db.JobDB, include_files: bool = False) -> Job: - """Convert a database JobDB to a Pydantic Job model.""" - files = None - if include_files: - files = apps_module.get_job_file_paths(db_job) + def _convert_job(db_job: db.JobDB, service_url: str = None, files: dict = None) -> Job: + """Convert a database JobDB to a Pydantic Job model. + + File-reading fields (service_url, files) must be passed in pre-computed + by the caller, since they require user-context file I/O. + """ return Job( id=db_job.id, app_url=db_job.app_url, @@ -1894,7 +1839,7 @@ def _convert_job(db_job: db.JobDB, include_files: bool = False) -> Job: container_args=db_job.container_args, pull_latest=db_job.pull_latest, cluster_job_id=db_job.cluster_job_id, - service_url=apps_module.get_service_url(db_job), + service_url=service_url, created_at=_ensure_utc(db_job.created_at), started_at=_ensure_utc(db_job.started_at), finished_at=_ensure_utc(db_job.finished_at), diff --git a/fileglancer/settings.py b/fileglancer/settings.py index 3b21bdef..8b597789 100644 --- a/fileglancer/settings.py +++ b/fileglancer/settings.py @@ -100,6 +100,10 @@ class Settings(BaseSettings): # Useful for setting up scheduler env (e.g., /misc/lsf/conf/profile.lsf). env_source_script: Optional[str] = None + # Worker pool settings + worker_pool_max_workers: int = 50 + worker_pool_idle_timeout: int = 300 # seconds + # Cluster / Apps settings (mirrors py-cluster-api ClusterConfig) cluster: ClusterSettings = ClusterSettings() diff --git a/fileglancer/user_worker.py b/fileglancer/user_worker.py new file mode 100644 index 00000000..0777085b --- /dev/null +++ b/fileglancer/user_worker.py @@ -0,0 +1,1025 @@ +"""Persistent per-user worker subprocess. + +This module is the entry point for long-lived worker subprocesses spawned by +WorkerPool. Each worker runs as a single user (identity set at fork time) +and handles all user-scoped operations: file I/O, cluster jobs, git ops, +SSH key management, etc. + +Protocol: + - IPC over a Unix socketpair (fd passed via FGC_WORKER_FD env var) + - Messages are length-prefixed JSON: 4-byte big-endian length + JSON body + - Worker reads requests, dispatches to action handlers, writes responses + - {"action": "shutdown"} triggers a clean exit + +The worker runs a synchronous request/response loop. Cluster operations +that use py-cluster-api's async API are run via _run_async() per-request. + +In dev/test mode (use_access_flags=False), action handlers are called +directly in-process from server.py, so _run_async() must handle being +called from within an existing event loop. +""" + +from __future__ import annotations + +import asyncio +import ctypes +import ctypes.util +import json +import logging +import os +import pwd +import socket +import struct +import sys +from pathlib import Path +from typing import Any, Optional + +from loguru import logger + + +# Length-prefix format: 4-byte big-endian unsigned int +_HEADER_FMT = "!I" +_HEADER_SIZE = struct.calcsize(_HEADER_FMT) + + +def _run_async(coro): + """Run an async coroutine, handling both subprocess and in-process contexts. + + In subprocess mode (no running event loop), uses asyncio.run(). + In dev/test mode (called from within FastAPI's event loop), uses + a new event loop in a thread to avoid "cannot be called from a running loop". + """ + try: + asyncio.get_running_loop() + except RuntimeError: + # No running loop — we're in the subprocess + return asyncio.run(coro) + else: + # Inside an event loop (dev/test mode) — run in a thread + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + future = pool.submit(asyncio.run, coro) + return future.result() + + +def _set_pdeathsig(): + """Ask the kernel to send SIGTERM when our parent process dies. + + This prevents orphan workers if the main process is killed without + a chance to clean up. Linux-only (PR_SET_PDEATHSIG = 1). + """ + try: + import signal + libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True) + PR_SET_PDEATHSIG = 1 + result = libc.prctl(PR_SET_PDEATHSIG, signal.SIGTERM, 0, 0, 0) + if result != 0: + logger.warning(f"prctl(PR_SET_PDEATHSIG) failed: errno={ctypes.get_errno()}") + except Exception as e: + logger.warning(f"Could not set PR_SET_PDEATHSIG: {e}") + + +def _send(sock: socket.socket, data: dict): + """Send a length-prefixed JSON message.""" + payload = json.dumps(data, default=str).encode() + header = struct.pack(_HEADER_FMT, len(payload)) + sock.sendall(header + payload) + + +def _recv(sock: socket.socket) -> dict: + """Receive a length-prefixed JSON message.""" + header = _recvall(sock, _HEADER_SIZE) + if header is None: + raise ConnectionError("Parent closed connection") + (length,) = struct.unpack(_HEADER_FMT, header) + payload = _recvall(sock, length) + if payload is None: + raise ConnectionError("Parent closed connection mid-message") + return json.loads(payload) + + +def _recvall(sock: socket.socket, n: int) -> Optional[bytes]: + """Read exactly n bytes from socket.""" + data = bytearray() + while len(data) < n: + chunk = sock.recv(n - len(data)) + if not chunk: + return None + data.extend(chunk) + return bytes(data) + + +# --------------------------------------------------------------------------- +# Action handlers — file operations +# --------------------------------------------------------------------------- + +def _get_filestore(fsp_name: str, db_url: str): + """Look up a FileSharePath and return a Filestore instance.""" + from fileglancer import database as db + from fileglancer.filestore import Filestore + + with db.get_db_session(db_url) as session: + fsp = db.get_file_share_path(session, fsp_name) + if fsp is None: + return None, f"File share path '{fsp_name}' not found" + + filestore = Filestore(fsp) + try: + filestore.get_file_info(None) + except FileNotFoundError: + return None, f"File share path '{fsp_name}' is not mounted" + + return filestore, None + + +def _action_list_dir(request: dict, ctx: WorkerContext) -> dict: + """List directory contents.""" + fsp_name = request["fsp_name"] + subpath = request.get("subpath", "") + current_user = ctx.username + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error, "status_code": 404 if "not found" in error else 500} + + from fileglancer import database as db + from fileglancer.filestore import RootCheckError + + try: + with db.get_db_session(ctx.db_url) as session: + file_info = filestore.get_file_info(subpath, current_user=current_user, session=session) + result = {"info": json.loads(file_info.model_dump_json())} + + if file_info.is_dir: + try: + files = list(filestore.yield_file_infos(subpath, current_user=current_user, session=session)) + result["files"] = [json.loads(f.model_dump_json()) for f in files] + except PermissionError: + result["files"] = [] + result["error"] = "Permission denied when listing directory contents" + result["status_code"] = 403 + except FileNotFoundError: + result["files"] = [] + result["error"] = "Directory contents not found" + result["status_code"] = 404 + + return result + except RootCheckError as e: + # Path escapes root — check if it belongs to another file share + with db.get_db_session(ctx.db_url) as session: + match = db.find_fsp_from_absolute_path(session, e.full_path) + if match: + fsp, relative_subpath = match + return {"redirect": True, "fsp_name": fsp.name, "subpath": relative_subpath or ""} + return {"error": str(e), "status_code": 400} + except FileNotFoundError: + return {"error": "File or directory not found", "status_code": 404} + except PermissionError: + return {"error": "Permission denied", "status_code": 403} + + +def _action_list_dir_paged(request: dict, ctx: WorkerContext) -> dict: + """List directory contents with pagination.""" + fsp_name = request["fsp_name"] + subpath = request.get("subpath", "") + current_user = ctx.username + limit = request.get("limit", 200) + cursor = request.get("cursor") + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error, "status_code": 404 if "not found" in error else 500} + + from fileglancer import database as db + from fileglancer.filestore import RootCheckError + + try: + with db.get_db_session(ctx.db_url) as session: + file_info = filestore.get_file_info(subpath, current_user=current_user, session=session) + result = {"info": json.loads(file_info.model_dump_json())} + + if file_info.is_dir: + try: + files, has_more, next_cursor, total_count = filestore.yield_file_infos_paginated( + subpath, current_user=current_user, session=session, + limit=limit, cursor=cursor + ) + result["files"] = [json.loads(f.model_dump_json()) for f in files] + result["has_more"] = has_more + result["next_cursor"] = next_cursor + result["total_count"] = total_count + except PermissionError: + result["files"] = [] + result["error"] = "Permission denied when listing directory contents" + result["status_code"] = 403 + except FileNotFoundError: + result["files"] = [] + result["error"] = "Directory contents not found" + result["status_code"] = 404 + + return result + except RootCheckError as e: + with db.get_db_session(ctx.db_url) as session: + match = db.find_fsp_from_absolute_path(session, e.full_path) + if match: + fsp, relative_subpath = match + return {"redirect": True, "fsp_name": fsp.name, "subpath": relative_subpath or ""} + return {"error": str(e), "status_code": 400} + except FileNotFoundError: + return {"error": "File or directory not found", "status_code": 404} + except PermissionError: + return {"error": "Permission denied", "status_code": 403} + + +def _action_get_file_info(request: dict, ctx: WorkerContext) -> dict: + """Get metadata for a single file or directory.""" + fsp_name = request["fsp_name"] + subpath = request.get("subpath", "") + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + from fileglancer import database as db + + with db.get_db_session(ctx.db_url) as session: + file_info = filestore.get_file_info(subpath, current_user=ctx.username, session=session) + return {"info": json.loads(file_info.model_dump_json())} + + +def _action_check_binary(request: dict, ctx: WorkerContext) -> dict: + """Check if a file is binary.""" + fsp_name = request["fsp_name"] + subpath = request.get("subpath", "") + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + is_binary = filestore.check_is_binary(subpath) + return {"is_binary": is_binary} + + +def _action_open_file(request: dict, ctx: WorkerContext) -> dict: + """Open a file and return its metadata. + + The actual file content is read and returned as part of the response + for simplicity. For very large files, fd passing via SCM_RIGHTS + could be added as an optimization. + """ + fsp_name = request["fsp_name"] + subpath = request.get("subpath", "") + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + from fileglancer.filestore import RootCheckError + from fileglancer.utils import guess_content_type + + try: + file_info = filestore.get_file_info(subpath) + if file_info.is_dir: + return {"error": "Cannot download directory content", "status_code": 400} + + file_name = subpath.split('/')[-1] if subpath else '' + content_type = guess_content_type(file_name) + full_path = filestore._check_path_in_root(subpath) + + return { + "full_path": full_path, + "file_size": file_info.size, + "content_type": content_type, + } + except RootCheckError as e: + from fileglancer import database as db + with db.get_db_session(ctx.db_url) as session: + match = db.find_fsp_from_absolute_path(session, e.full_path) + if match: + fsp, relative_subpath = match + return {"redirect": True, "fsp_name": fsp.name, "subpath": relative_subpath or ""} + return {"error": str(e), "status_code": 400} + except FileNotFoundError: + return {"error": "File or directory not found", "status_code": 404} + except PermissionError: + return {"error": "Permission denied", "status_code": 403} + + +def _action_head_file(request: dict, ctx: WorkerContext) -> dict: + """Get file metadata and binary check for HEAD requests.""" + fsp_name = request["fsp_name"] + subpath = request.get("subpath", "") + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + from fileglancer.filestore import RootCheckError + from fileglancer.utils import guess_content_type + + try: + file_info = filestore.get_file_info(subpath, current_user=ctx.username) + file_name = subpath.split('/')[-1] if subpath else '' + content_type = guess_content_type(file_name) + is_binary = filestore.check_is_binary(subpath) if not file_info.is_dir else False + + return { + "info": json.loads(file_info.model_dump_json()), + "content_type": content_type, + "is_binary": is_binary, + } + except RootCheckError as e: + from fileglancer import database as db + with db.get_db_session(ctx.db_url) as session: + match = db.find_fsp_from_absolute_path(session, e.full_path) + if match: + fsp, relative_subpath = match + return {"redirect": True, "fsp_name": fsp.name, "subpath": relative_subpath or ""} + return {"error": str(e), "status_code": 400} + except FileNotFoundError: + return {"error": "File or directory not found", "status_code": 404} + except PermissionError: + return {"error": "Permission denied", "status_code": 403} + + +def _action_create_dir(request: dict, ctx: WorkerContext) -> dict: + """Create a directory.""" + fsp_name = request["fsp_name"] + subpath = request["subpath"] + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + try: + filestore.create_dir(subpath) + return {"ok": True} + except FileExistsError: + return {"error": "A file or directory with this name already exists", "status_code": 409} + except PermissionError as e: + return {"error": str(e), "status_code": 403} + + +def _action_create_file(request: dict, ctx: WorkerContext) -> dict: + """Create an empty file.""" + fsp_name = request["fsp_name"] + subpath = request["subpath"] + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + try: + filestore.create_empty_file(subpath) + return {"ok": True} + except FileExistsError: + return {"error": "A file or directory with this name already exists", "status_code": 409} + except PermissionError as e: + return {"error": str(e), "status_code": 403} + + +def _action_rename(request: dict, ctx: WorkerContext) -> dict: + """Rename a file or directory.""" + fsp_name = request["fsp_name"] + old_path = request["old_path"] + new_path = request["new_path"] + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + try: + filestore.rename_file_or_dir(old_path, new_path) + return {"ok": True} + except PermissionError as e: + return {"error": str(e), "status_code": 403} + except OSError as e: + return {"error": str(e), "status_code": 500} + + +def _action_delete(request: dict, ctx: WorkerContext) -> dict: + """Delete a file or directory.""" + fsp_name = request["fsp_name"] + subpath = request["subpath"] + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + try: + filestore.remove_file_or_dir(subpath) + return {"ok": True} + except PermissionError as e: + return {"error": str(e), "status_code": 403} + except OSError as e: + return {"error": str(e), "status_code": 500} + + +def _action_chmod(request: dict, ctx: WorkerContext) -> dict: + """Change file permissions.""" + fsp_name = request["fsp_name"] + subpath = request["subpath"] + permissions = request["permissions"] + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + try: + filestore.change_file_permissions(subpath, permissions) + return {"ok": True} + except PermissionError as e: + return {"error": str(e), "status_code": 403} + except OSError as e: + return {"error": str(e), "status_code": 500} + + +def _action_update_file(request: dict, ctx: WorkerContext) -> dict: + """Handle rename and/or permission change on a file.""" + fsp_name = request["fsp_name"] + subpath = request.get("subpath", "") + new_path = request.get("new_path") + new_permissions = request.get("new_permissions") + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + try: + old_file_info = filestore.get_file_info(subpath, ctx.username) + result = {"info": json.loads(old_file_info.model_dump_json())} + + if new_permissions is not None and new_permissions != old_file_info.permissions: + filestore.change_file_permissions(subpath, new_permissions) + + if new_path is not None and new_path != old_file_info.path: + filestore.rename_file_or_dir(old_file_info.path, new_path) + + result["ok"] = True + return result + except PermissionError as e: + return {"error": str(e), "status_code": 403} + except OSError as e: + return {"error": str(e), "status_code": 500} + + +def _action_validate_paths(request: dict, ctx: WorkerContext) -> dict: + """Validate file/directory paths for app parameters.""" + from fileglancer.apps.core import validate_path_in_filestore + from fileglancer import database as db + + paths = request["paths"] + errors = {} + with db.get_db_session(ctx.db_url) as session: + for param_key, path_value in paths.items(): + error = validate_path_in_filestore(path_value, session) + if error: + errors[param_key] = error + return {"errors": errors} + + +def _action_get_profile(request: dict, ctx: WorkerContext) -> dict: + """Get user profile information.""" + import grp as _grp + + from fileglancer import database as db + + username = ctx.username + + with db.get_db_session(ctx.db_url) as session: + paths = db.get_file_share_paths(session) + + home_fsp = next((fsp for fsp in paths if fsp.mount_path in ('~', '~/')), None) + if home_fsp: + home_directory_name = "." + else: + home_directory_path = os.path.expanduser(f"~{username}") + home_parent = os.path.dirname(home_directory_path) + home_fsp = next((fsp for fsp in paths if fsp.mount_path == home_parent), None) + home_directory_name = os.path.basename(home_directory_path) + + home_fsp_name = home_fsp.name if home_fsp else None + + user_groups = [] + try: + user_info = pwd.getpwnam(username) + all_groups = _grp.getgrall() + for group in all_groups: + if username in group.gr_mem: + user_groups.append(group.gr_name) + primary_group = _grp.getgrgid(user_info.pw_gid).gr_name + if primary_group not in user_groups: + user_groups.append(primary_group) + except Exception as e: + logger.error(f"Error getting groups for user {username}: {e}") + + return { + "username": username, + "homeFileSharePathName": home_fsp_name, + "homeDirectoryName": home_directory_name, + "groups": user_groups, + } + + +# --------------------------------------------------------------------------- +# Action handlers — SSH keys +# --------------------------------------------------------------------------- + +def _action_list_ssh_keys(request: dict, ctx: WorkerContext) -> dict: + """List SSH keys.""" + from fileglancer import sshkeys + try: + ssh_dir = sshkeys.get_ssh_directory() + keys = sshkeys.list_ssh_keys(ssh_dir) + return {"keys": [k.model_dump() for k in keys]} + except Exception as e: + return {"error": str(e), "status_code": 500} + + +def _action_generate_ssh_key(request: dict, ctx: WorkerContext) -> dict: + """Generate a temporary SSH key and authorize it.""" + from fileglancer import sshkeys + try: + ssh_dir = sshkeys.get_ssh_directory() + passphrase = request.get("passphrase") + result = sshkeys.generate_temp_key_and_authorize(ssh_dir, passphrase) + # TempKeyResponse is a Response object; extract the data we need + return { + "private_key": result.body.decode() if hasattr(result, 'body') else str(result), + "fingerprint": result.headers.get("X-SSH-Key-Fingerprint", "") if hasattr(result, 'headers') else "", + "comment": result.headers.get("X-SSH-Key-Comment", "") if hasattr(result, 'headers') else "", + } + except Exception as e: + return {"error": str(e), "status_code": 500} + + +# --------------------------------------------------------------------------- +# Action handlers — job files +# --------------------------------------------------------------------------- + +def _action_get_job_file(request: dict, ctx: WorkerContext) -> dict: + """Read job file content (script, stdout, stderr).""" + from fileglancer.apps.core import get_job_file_content + job_id = request["job_id"] + file_type = request["file_type"] + content = get_job_file_content(job_id, ctx.username, file_type) + if content is None: + return {"content": None} + return {"content": content} + + +def _action_get_job_file_paths(request: dict, ctx: WorkerContext) -> dict: + """Get job file path info.""" + from fileglancer.apps.core import get_job_file_paths + from fileglancer import database as db + from fileglancer.settings import get_settings + + settings = get_settings() + job_id = request["job_id"] + + with db.get_db_session(settings.db_url) as session: + db_job = db.get_job(session, job_id, ctx.username) + if db_job is None: + return {"error": f"Job {job_id} not found", "status_code": 404} + files = get_job_file_paths(db_job) + return {"files": files} + + +def _action_get_service_url(request: dict, ctx: WorkerContext) -> dict: + """Read service URL from job work directory.""" + from fileglancer.apps.core import get_service_url + from fileglancer import database as db + from fileglancer.settings import get_settings + + settings = get_settings() + job_id = request["job_id"] + + with db.get_db_session(settings.db_url) as session: + db_job = db.get_job(session, job_id, ctx.username) + if db_job is None: + return {"error": f"Job {job_id} not found", "status_code": 404} + url = get_service_url(db_job) + return {"service_url": url} + + +# --------------------------------------------------------------------------- +# Action handlers — S3 proxy +# --------------------------------------------------------------------------- + +def _action_s3_list_objects(request: dict, ctx: WorkerContext) -> dict: + """S3-compatible list objects.""" + from x2s3.client_file import FileProxyClient + + mount_path = request["mount_path"] + target_name = request["target_name"] + buffer_size = request.get("buffer_size", 256 * 1024) + + client = FileProxyClient( + proxy_kwargs={"target_name": target_name}, + path=mount_path, + buffer_size=buffer_size, + ) + + # list_objects_v2 is async def but does only sync I/O + result = _run_async(client.list_objects_v2( + continuation_token=request.get("continuation_token"), + delimiter=request.get("delimiter"), + encoding_type=request.get("encoding_type"), + fetch_owner=request.get("fetch_owner"), + max_keys=request.get("max_keys", 1000), + prefix=request.get("prefix"), + start_after=request.get("start_after"), + )) + # Result is a fastapi Response object + return {"body": result.body.decode(), "media_type": result.media_type, "status_code": result.status_code} + + +def _action_s3_head_object(request: dict, ctx: WorkerContext) -> dict: + """S3-compatible head object.""" + from x2s3.client_file import FileProxyClient + + mount_path = request["mount_path"] + target_name = request["target_name"] + path = request["path"] + + client = FileProxyClient( + proxy_kwargs={"target_name": target_name}, + path=mount_path, + ) + + result = _run_async(client.head_object(path)) + headers = dict(result.headers) if hasattr(result, 'headers') else {} + return {"headers": headers, "status_code": result.status_code} + + +def _action_s3_open_object(request: dict, ctx: WorkerContext) -> dict: + """S3-compatible open object — open the file and return metadata. + + The actual file descriptor stays open in this process; + for now we return the path so the main process can open it + with the fd still valid via the user context. + Actually: the worker opens the file and returns metadata. + The main process will open its own fd (the worker already proved + access is valid by running as the user). + """ + from x2s3.client_file import FileProxyClient, FileObjectHandle + from x2s3.utils import get_nosuchkey_response, get_error_response + + mount_path = request["mount_path"] + target_name = request["target_name"] + path = request["path"] + range_header = request.get("range_header") + + client = FileProxyClient( + proxy_kwargs={"target_name": target_name}, + path=mount_path, + buffer_size=request.get("buffer_size", 256 * 1024), + ) + + result = _run_async(client.open_object(path, range_header)) + + if isinstance(result, FileObjectHandle): + # Return metadata; the file handle stays open in the worker + # The main process will need to stream from here + response = { + "type": "handle", + "status_code": result.status_code, + "headers": result.headers, + "media_type": result.media_type, + "content_length": result.content_length, + "key": result.key, + "target_name": result.target_name, + "start": result.start, + "end": result.end, + # Include the resolved path so main process can open it + "resolved_path": client._safe_path(path), + } + result.close() + return response + else: + # Error response + return { + "type": "error_response", + "body": result.body.decode() if hasattr(result, 'body') else "", + "status_code": result.status_code, + "headers": dict(result.headers) if hasattr(result, 'headers') else {}, + } + + +# --------------------------------------------------------------------------- +# Action handlers — proxied path validation +# --------------------------------------------------------------------------- + +def _action_validate_proxied_path(request: dict, ctx: WorkerContext) -> dict: + """Validate that the user can access a proxied path. + + Runs within the user's context (the worker IS the user), so + filesystem permission checks just work. + """ + fsp_name = request["fsp_name"] + path = request["path"] + + filestore, error = _get_filestore(fsp_name, ctx.db_url) + if filestore is None: + return {"error": error} + + try: + filestore.get_file_info(path) + return {"ok": True} + except (FileNotFoundError, PermissionError) as e: + return {"error": str(e)} + + +# --------------------------------------------------------------------------- +# Action handlers — cluster operations (absorbed from apps/worker.py) +# --------------------------------------------------------------------------- + +def _action_submit(request: dict, ctx: WorkerContext) -> dict: + """Create work dir, symlink repo, submit job via py-cluster-api.""" + from cluster_api import create_executor, ResourceSpec + + config = request["cluster_config"] + config.pop("extra_args", None) + + executor = create_executor(**config) + + work_dir = Path(request["work_dir"]) + work_dir.mkdir(parents=True, exist_ok=True) + + cached_repo_dir = request["cached_repo_dir"] + repo_link = work_dir / "repo" + if repo_link.is_symlink() or repo_link.exists(): + repo_link.unlink() + repo_link.symlink_to(cached_repo_dir) + + res = request["resources"] + resource_spec = ResourceSpec( + cpus=res.get("cpus"), + gpus=res.get("gpus"), + memory=res.get("memory"), + walltime=res.get("walltime"), + queue=res.get("queue"), + work_dir=res["work_dir"], + stdout_path=res.get("stdout_path"), + stderr_path=res.get("stderr_path"), + extra_directives=res.get("extra_directives"), + extra_args=res.get("extra_args"), + ) + + job = _run_async(executor.submit( + command=request["command"], + name=request["job_name"], + resources=resource_spec, + )) + + return {"job_id": job.job_id, "script_path": job.script_path} + + +def _action_cancel(request: dict, ctx: WorkerContext) -> dict: + """Cancel a cluster job via py-cluster-api.""" + from cluster_api import create_executor + + config = request["cluster_config"] + config.pop("extra_args", None) + + executor = create_executor(**config) + _run_async(executor.cancel(request["job_id"])) + return {"status": "ok"} + + +def _action_poll(request: dict, ctx: WorkerContext) -> dict: + """Poll job statuses via py-cluster-api.""" + from cluster_api import create_executor + from cluster_api._types import JobRecord, JobStatus + + config = request["cluster_config"] + config.pop("extra_args", None) + + executor = create_executor(**config) + + known_statuses = request.get("job_statuses", {}) + for cid in request["cluster_job_ids"]: + db_status = known_statuses.get(cid, "PENDING").lower() + try: + seed_status = JobStatus(db_status) + except ValueError: + seed_status = JobStatus.PENDING + executor._jobs[cid] = JobRecord( + job_id=cid, + name="", + command="", + status=seed_status, + ) + + _run_async(executor.poll()) + + jobs = {} + for cid, record in executor.jobs.items(): + jobs[cid] = { + "status": record.status.value, + "exit_code": record.exit_code, + "exec_host": record.exec_host, + "start_time": record.start_time.isoformat() if record.start_time else None, + "finish_time": record.finish_time.isoformat() if record.finish_time else None, + } + + return {"jobs": jobs} + + +def _action_reconnect(request: dict, ctx: WorkerContext) -> dict: + """Reconnect to existing jobs via py-cluster-api.""" + from cluster_api import create_executor + + config = request["cluster_config"] + config.pop("extra_args", None) + + executor = create_executor(**config) + reconnected = _run_async(executor.reconnect()) + + jobs = {} + for record in reconnected: + jobs[record.job_id] = { + "status": record.status.value, + "name": record.name, + "exit_code": record.exit_code, + "exec_host": record.exec_host, + "start_time": record.start_time.isoformat() if record.start_time else None, + "finish_time": record.finish_time.isoformat() if record.finish_time else None, + } + + return {"jobs": jobs} + + +# --------------------------------------------------------------------------- +# Action handlers — git/manifest operations (absorbed from apps/worker.py) +# --------------------------------------------------------------------------- + +def _action_ensure_repo(request: dict, ctx: WorkerContext) -> dict: + """Clone or update a GitHub repo in the current user's cache.""" + from fileglancer.apps.core import _ensure_repo_cache + repo_dir = _run_async(_ensure_repo_cache( + url=request["url"], + pull=request.get("pull", False), + )) + return {"repo_dir": str(repo_dir)} + + +def _action_discover_manifests(request: dict, ctx: WorkerContext) -> dict: + """Clone/pull repo and discover all manifests.""" + from fileglancer.apps.core import _ensure_repo_cache, _find_manifests_in_repo + repo_dir = _run_async(_ensure_repo_cache( + url=request["url"], + pull=True, + )) + results = _find_manifests_in_repo(repo_dir) + return { + "manifests": [ + {"path": path, "manifest": manifest.model_dump(mode="json")} + for path, manifest in results + ] + } + + +def _action_read_manifest(request: dict, ctx: WorkerContext) -> dict: + """Fetch and read a single manifest from a cached repo.""" + from fileglancer.apps.core import _ensure_repo_cache, _read_manifest_file + repo_dir = _run_async(_ensure_repo_cache( + url=request["url"], + pull=request.get("pull", False), + )) + manifest_path = request.get("manifest_path", "") + target_dir = repo_dir / manifest_path if manifest_path else repo_dir + manifest = _read_manifest_file(target_dir) + return {"manifest": manifest.model_dump(mode="json")} + + +# --------------------------------------------------------------------------- +# Action registry +# --------------------------------------------------------------------------- + +_ACTIONS: dict[str, Any] = { + # File operations + "list_dir": _action_list_dir, + "list_dir_paged": _action_list_dir_paged, + "get_file_info": _action_get_file_info, + "check_binary": _action_check_binary, + "open_file": _action_open_file, + "head_file": _action_head_file, + "create_dir": _action_create_dir, + "create_file": _action_create_file, + "rename": _action_rename, + "delete": _action_delete, + "chmod": _action_chmod, + "update_file": _action_update_file, + "validate_paths": _action_validate_paths, + "validate_proxied_path": _action_validate_proxied_path, + "get_profile": _action_get_profile, + # SSH keys + "list_ssh_keys": _action_list_ssh_keys, + "generate_ssh_key": _action_generate_ssh_key, + # Job files + "get_job_file": _action_get_job_file, + "get_job_file_paths": _action_get_job_file_paths, + "get_service_url": _action_get_service_url, + # S3 proxy + "s3_list_objects": _action_s3_list_objects, + "s3_head_object": _action_s3_head_object, + "s3_open_object": _action_s3_open_object, + # Cluster operations + "submit": _action_submit, + "cancel": _action_cancel, + "poll": _action_poll, + "reconnect": _action_reconnect, + # Git/manifest operations + "ensure_repo": _action_ensure_repo, + "discover_manifests": _action_discover_manifests, + "read_manifest": _action_read_manifest, +} + + +# --------------------------------------------------------------------------- +# Worker context and main loop +# --------------------------------------------------------------------------- + +class WorkerContext: + """Holds per-worker state.""" + + def __init__(self, username: str, db_url: str): + self.username = username + self.db_url = db_url + + +def main(): + """Worker entry point — run the request/response loop.""" + + # Set up orphan prevention + _set_pdeathsig() + + # Configure logging + log_level = os.environ.get("FGC_LOG_LEVEL", "INFO").upper() + + # Use loguru for worker logging, output to stderr + logger.remove() + logger.add(sys.stderr, level=log_level) + + # Configure cluster_api logging + handler = logging.StreamHandler(sys.stderr) + handler.setFormatter(logging.Formatter( + "%(levelname)s | %(name)s:%(funcName)s:%(lineno)d - %(message)s" + )) + cluster_logger = logging.getLogger("cluster_api") + cluster_logger.addHandler(handler) + cluster_logger.setLevel(log_level) + + # Get the socket fd from environment + fd = int(os.environ["FGC_WORKER_FD"]) + sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM) + os.close(fd) # close the original fd, we have a dup now + + # Determine username + uid = os.getuid() + try: + username = pwd.getpwuid(uid).pw_name + except KeyError: + username = str(uid) + + db_url = os.environ.get("FGC_DB_URL", "") + ctx = WorkerContext(username=username, db_url=db_url) + + logger.info( + f"Worker started for {username} " + f"(uid={uid} euid={os.geteuid()} pid={os.getpid()})" + ) + + # Main request/response loop + while True: + try: + request = _recv(sock) + except ConnectionError: + logger.info("Parent connection closed, exiting") + break + + action = request.get("action") + + if action == "shutdown": + logger.info(f"Shutdown requested, exiting") + break + + handler = _ACTIONS.get(action) + if handler is None: + _send(sock, {"error": f"Unknown action: {action}"}) + continue + + try: + result = handler(request, ctx) + _send(sock, result) + except Exception as e: + logger.exception(f"Error handling action {action}") + _send(sock, {"error": str(e)}) + + sock.close() + logger.info("Worker exiting") + + +if __name__ == "__main__": + main() diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py new file mode 100644 index 00000000..49018000 --- /dev/null +++ b/fileglancer/worker_pool.py @@ -0,0 +1,318 @@ +"""Per-user persistent worker pool. + +Manages a pool of long-lived subprocess workers, one per active user. +Each worker runs with the target user's real UID/GID/groups (set at +fork time via subprocess.Popen kwargs), so the main Uvicorn process +never calls seteuid/setegid/setgroups. + +Workers communicate with the main process over a Unix socketpair using +a length-prefixed JSON protocol. The main process dispatches all +user-scoped work (file I/O, cluster ops, git ops) to the appropriate +worker, keeping the async event loop free. + +Usage from server.py: + + pool = WorkerPool(settings) + worker = await pool.get_worker(username) + result = await worker.execute("list_dir", fsp_name="home", subpath="Documents") +""" + +from __future__ import annotations + +import asyncio +import ctypes +import ctypes.util +import grp +import json +import os +import pwd +import struct +import subprocess +import sys +import time +from typing import Any, Optional + +from loguru import logger + +from fileglancer.settings import Settings + + +# Length-prefix format: 4-byte big-endian unsigned int +_HEADER_FMT = "!I" +_HEADER_SIZE = struct.calcsize(_HEADER_FMT) +_MAX_MESSAGE_SIZE = 64 * 1024 * 1024 # 64 MB safety limit + + +class WorkerError(Exception): + """Raised when a worker returns an error response.""" + pass + + +class WorkerDead(Exception): + """Raised when the worker subprocess has died unexpectedly.""" + pass + + +class UserWorker: + """Wraps a single persistent worker subprocess for one user. + + IPC uses a Unix socketpair with length-prefixed JSON messages. + The worker handles one request at a time (serial). + """ + + def __init__(self, username: str, process: subprocess.Popen, + reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + self.username = username + self.process = process + self.reader = reader + self.writer = writer + self.last_activity = time.monotonic() + self._busy = False + + @property + def is_alive(self) -> bool: + return self.process.poll() is None + + @property + def is_busy(self) -> bool: + return self._busy + + async def execute(self, action: str, **kwargs) -> Any: + """Send a request to the worker and return the parsed response. + + Raises WorkerError on application-level errors from the worker. + Raises WorkerDead if the subprocess has exited. + """ + if not self.is_alive: + raise WorkerDead(f"Worker for {self.username} is dead (rc={self.process.returncode})") + + self._busy = True + self.last_activity = time.monotonic() + try: + request = {"action": action, **kwargs} + await self._send(request) + response = await self._recv() + + if response.get("error"): + raise WorkerError(response["error"]) + + return response + except (BrokenPipeError, ConnectionResetError, asyncio.IncompleteReadError) as e: + raise WorkerDead(f"Worker for {self.username} connection lost: {e}") from e + finally: + self._busy = False + self.last_activity = time.monotonic() + + async def _send(self, data: dict): + payload = json.dumps(data).encode() + header = struct.pack(_HEADER_FMT, len(payload)) + self.writer.write(header + payload) + await self.writer.drain() + + async def _recv(self) -> dict: + header = await self.reader.readexactly(_HEADER_SIZE) + (length,) = struct.unpack(_HEADER_FMT, header) + if length > _MAX_MESSAGE_SIZE: + raise WorkerError(f"Response too large: {length} bytes") + payload = await self.reader.readexactly(length) + return json.loads(payload) + + async def shutdown(self, timeout: float = 5.0): + """Ask the worker to shut down gracefully, then force-kill if needed.""" + if not self.is_alive: + return + try: + await self._send({"action": "shutdown"}) + except (BrokenPipeError, ConnectionResetError, OSError): + pass + + # Wait for clean exit + try: + await asyncio.wait_for( + asyncio.get_event_loop().run_in_executor(None, self.process.wait), + timeout=timeout, + ) + except asyncio.TimeoutError: + logger.warning(f"Worker for {self.username} did not exit in {timeout}s, killing") + self.process.kill() + self.process.wait() + + self.writer.close() + + +class WorkerPool: + """Manages per-user persistent worker subprocesses. + + Workers are spawned on demand and evicted after idle timeout. + """ + + def __init__(self, settings: Settings): + self.settings = settings + self._workers: dict[str, UserWorker] = {} + self._locks: dict[str, asyncio.Lock] = {} + self._eviction_task: Optional[asyncio.Task] = None + self.max_workers = 50 + self.idle_timeout = 300 # seconds + + def _get_lock(self, username: str) -> asyncio.Lock: + if username not in self._locks: + self._locks[username] = asyncio.Lock() + return self._locks[username] + + async def get_worker(self, username: str) -> UserWorker: + """Get or create a worker for the given user.""" + # Fast path: worker exists and is alive + worker = self._workers.get(username) + if worker is not None and worker.is_alive: + worker.last_activity = time.monotonic() + return worker + + # Slow path: need to create or replace worker + async with self._get_lock(username): + # Double-check after acquiring lock + worker = self._workers.get(username) + if worker is not None and worker.is_alive: + worker.last_activity = time.monotonic() + return worker + + # Clean up dead worker if present + if worker is not None: + logger.warning(f"Worker for {username} found dead, replacing") + self._workers.pop(username, None) + + # Evict LRU worker if at capacity + if len(self._workers) >= self.max_workers: + await self._evict_lru() + + # Spawn new worker + new_worker = await self._spawn_worker(username) + self._workers[username] = new_worker + return new_worker + + async def _spawn_worker(self, username: str) -> UserWorker: + """Spawn a new persistent worker subprocess for the given user.""" + pw = pwd.getpwnam(username) + + # Build identity kwargs (only switch if running as root) + identity_kwargs: dict = {} + if os.geteuid() == 0: + groups = [g.gr_gid for g in grp.getgrall() if username in g.gr_mem] + if pw.pw_gid not in groups: + groups.append(pw.pw_gid) + identity_kwargs = { + "user": pw.pw_uid, + "group": pw.pw_gid, + "extra_groups": groups, + } + + # Create Unix socketpair for IPC + parent_sock, child_sock = os.socketpair() + + env = { + **os.environ, + "HOME": pw.pw_dir, + "FGC_LOG_LEVEL": self.settings.log_level, + "FGC_DB_URL": self.settings.db_url, + "FGC_WORKER_FD": str(child_sock.fileno()), + } + + logger.info( + f"Spawning persistent worker for {username} " + f"(uid={pw.pw_uid} gid={pw.pw_gid})" + ) + + process = subprocess.Popen( + [sys.executable, "-m", "fileglancer.user_worker"], + env=env, + pass_fds=(child_sock.fileno(),), + stderr=subprocess.PIPE, + **identity_kwargs, + ) + + # Close child's end in the parent + child_sock.close() + + # Wrap the parent socket fd in asyncio streams + loop = asyncio.get_event_loop() + reader, writer = await asyncio.open_connection(sock=parent_sock) + + # Start a background task to forward worker stderr to loguru + asyncio.create_task(self._forward_stderr(username, process)) + + worker = UserWorker(username, process, reader, writer) + return worker + + async def _forward_stderr(self, username: str, process: subprocess.Popen): + """Forward worker stderr lines to loguru in the background.""" + try: + loop = asyncio.get_event_loop() + while True: + line = await loop.run_in_executor(None, process.stderr.readline) + if not line: + break + logger.debug(f"[worker:{username}] {line.decode().rstrip()}") + except Exception: + pass + + async def _evict_lru(self): + """Evict the least-recently-used idle worker.""" + candidates = [ + (w.last_activity, name, w) + for name, w in self._workers.items() + if not w.is_busy + ] + if not candidates: + logger.warning("Worker pool at capacity with no idle workers to evict") + return + + candidates.sort() + _, name, worker = candidates[0] + logger.info(f"Evicting LRU worker for {name}") + await worker.shutdown() + self._workers.pop(name, None) + + async def start_eviction_loop(self): + """Start the background eviction loop.""" + if self._eviction_task is None or self._eviction_task.done(): + self._eviction_task = asyncio.create_task(self._eviction_loop()) + + async def _eviction_loop(self): + """Periodically evict idle workers.""" + while True: + await asyncio.sleep(60) + now = time.monotonic() + to_evict = [] + for name, worker in list(self._workers.items()): + if not worker.is_busy and (now - worker.last_activity) > self.idle_timeout: + to_evict.append(name) + elif not worker.is_alive: + to_evict.append(name) + + for name in to_evict: + worker = self._workers.pop(name, None) + if worker is not None: + if worker.is_alive: + logger.info(f"Evicting idle worker for {name}") + await worker.shutdown() + else: + logger.info(f"Removing dead worker for {name}") + + async def shutdown_all(self): + """Shut down all workers (called during server shutdown).""" + if self._eviction_task and not self._eviction_task.done(): + self._eviction_task.cancel() + try: + await self._eviction_task + except asyncio.CancelledError: + pass + + tasks = [] + for name, worker in list(self._workers.items()): + logger.info(f"Shutting down worker for {name}") + tasks.append(worker.shutdown(timeout=10.0)) + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + self._workers.clear() + self._locks.clear() From e88956cc46ac17e728fc541a1aacae4b77293ab1 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 11:08:15 -0400 Subject: [PATCH 02/19] pass file descriptors using SCM_RIGHTS for performance --- fileglancer/server.py | 41 +++++++------- fileglancer/user_worker.py | 67 ++++++++++++++++------- fileglancer/worker_pool.py | 107 +++++++++++++++++++++++++++---------- 3 files changed, 147 insertions(+), 68 deletions(-) diff --git a/fileglancer/server.py b/fileglancer/server.py index 3f812223..0ec707a2 100644 --- a/fileglancer/server.py +++ b/fileglancer/server.py @@ -231,6 +231,11 @@ async def _worker_exec(username: str, action: str, **kwargs): When use_access_flags=False (dev/test mode), runs the action directly in the current process since no identity switching is needed. + If the worker opens a file and passes back a file descriptor (e.g. + open_file, s3_open_object), the response dict will contain a + ``_file_handle`` key with an open file object. Callers that don't + need it can ignore this key. + Raises HTTPException on worker-level errors or dead workers. """ if worker_pool is not None: @@ -250,7 +255,10 @@ async def _worker_exec(username: str, action: str, **kwargs): raise HTTPException(status_code=500, detail=f"Unknown action: {action}") ctx = WorkerContext(username=username, db_url=settings.db_url) request = {"action": action, **kwargs} - return handler(request, ctx) + result = handler(request, ctx) + # Strip the raw fd (not meaningful in-process), keep _file_handle + result.pop("_fd", None) + return result def _resolve_proxy_info(sharing_key: str, captured_path: str) -> Tuple[dict | Response, str]: """Resolve a sharing key to proxy info (mount_path, target_name, username, subpath). @@ -1140,20 +1148,16 @@ async def target_dispatcher(request: Request, else: range_header = request.headers.get("range") - result = await _worker_exec(info["username"], "s3_open_object", - mount_path=info["mount_path"], - target_name=info["target_name"], - path=subpath, - range_header=range_header) - - if result.get("type") == "handle": - # Worker validated access and returned file metadata - # Open the file in main process (root can read anything) - resolved_path = result["resolved_path"] - if resolved_path is None: - return get_error_response(404, "NoSuchKey", "File not found", subpath) + result = await _worker_exec( + info["username"], "s3_open_object", + mount_path=info["mount_path"], + target_name=info["target_name"], + path=subpath, + range_header=range_header) - file_handle = open(resolved_path, "rb") + file_handle = result.pop("_file_handle", None) + if result.get("type") == "handle" and file_handle is not None: + # Worker opened the file and passed the fd via SCM_RIGHTS from x2s3.client_file import FileObjectHandle, file_iterator handle = FileObjectHandle( target_name=result["target_name"], @@ -1317,7 +1321,7 @@ async def get_file_content(request: Request, path_name: str, subpath: Optional[s else: filestore_name, _, subpath = path_name.partition('/') - # Worker validates path and returns metadata (runs as user) + # Worker opens the file as the user and passes the fd back result = await _worker_exec(username, "open_file", fsp_name=filestore_name, subpath=subpath) if result.get("redirect"): @@ -1328,15 +1332,12 @@ async def get_file_content(request: Request, path_name: str, subpath: Optional[s if "error" in result: raise HTTPException(status_code=result.get("status_code", 500), detail=result["error"]) - full_path = result["full_path"] + file_handle = result.get("_file_handle") + file_size = result["file_size"] content_type = result["content_type"] file_name = subpath.split('/')[-1] if subpath else '' - # Open file in main process — the worker validated access; - # main process runs as root so it can open the validated path - file_handle = open(full_path, 'rb') - range_header = request.headers.get('Range') if range_header: diff --git a/fileglancer/user_worker.py b/fileglancer/user_worker.py index 0777085b..db77c095 100644 --- a/fileglancer/user_worker.py +++ b/fileglancer/user_worker.py @@ -86,6 +86,21 @@ def _send(sock: socket.socket, data: dict): sock.sendall(header + payload) +def _send_with_fd(sock: socket.socket, data: dict, fd: int): + """Send a length-prefixed JSON message with a file descriptor via SCM_RIGHTS.""" + import array as _array + + payload = json.dumps(data, default=str).encode() + header = struct.pack(_HEADER_FMT, len(payload)) + full_msg = header + payload + + fds = _array.array("i", [fd]) + sock.sendmsg( + [full_msg], + [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)], + ) + + def _recv(sock: socket.socket) -> dict: """Receive a length-prefixed JSON message.""" header = _recvall(sock, _HEADER_SIZE) @@ -261,11 +276,11 @@ def _action_check_binary(request: dict, ctx: WorkerContext) -> dict: def _action_open_file(request: dict, ctx: WorkerContext) -> dict: - """Open a file and return its metadata. + """Open a file and return its metadata + open file descriptor. - The actual file content is read and returned as part of the response - for simplicity. For very large files, fd passing via SCM_RIGHTS - could be added as an optimization. + The worker opens the file as the user and passes the fd back to the + main process via SCM_RIGHTS. The response includes "_fd" key with the + fd number — the main loop uses _send_with_fd() for these responses. """ fsp_name = request["fsp_name"] subpath = request.get("subpath", "") @@ -286,10 +301,15 @@ def _action_open_file(request: dict, ctx: WorkerContext) -> dict: content_type = guess_content_type(file_name) full_path = filestore._check_path_in_root(subpath) + # Open the file — the fd retains user's access rights + file_handle = open(full_path, 'rb') + fd = file_handle.fileno() + return { - "full_path": full_path, "file_size": file_info.size, "content_type": content_type, + "_fd": fd, + "_file_handle": file_handle, # kept alive until fd is sent } except RootCheckError as e: from fileglancer import database as db @@ -653,17 +673,13 @@ def _action_s3_head_object(request: dict, ctx: WorkerContext) -> dict: def _action_s3_open_object(request: dict, ctx: WorkerContext) -> dict: - """S3-compatible open object — open the file and return metadata. - - The actual file descriptor stays open in this process; - for now we return the path so the main process can open it - with the fd still valid via the user context. - Actually: the worker opens the file and returns metadata. - The main process will open its own fd (the worker already proved - access is valid by running as the user). + """S3-compatible open object — open the file and pass the fd back. + + The worker opens the file as the user via FileProxyClient.open_object(), + then passes the file descriptor to the main process via SCM_RIGHTS. + The main process wraps it in a StreamingResponse. """ from x2s3.client_file import FileProxyClient, FileObjectHandle - from x2s3.utils import get_nosuchkey_response, get_error_response mount_path = request["mount_path"] target_name = request["target_name"] @@ -679,8 +695,8 @@ def _action_s3_open_object(request: dict, ctx: WorkerContext) -> dict: result = _run_async(client.open_object(path, range_header)) if isinstance(result, FileObjectHandle): - # Return metadata; the file handle stays open in the worker - # The main process will need to stream from here + # Keep the file handle alive and pass the fd + fd = result.file_handle.fileno() response = { "type": "handle", "status_code": result.status_code, @@ -691,10 +707,11 @@ def _action_s3_open_object(request: dict, ctx: WorkerContext) -> dict: "target_name": result.target_name, "start": result.start, "end": result.end, - # Include the resolved path so main process can open it - "resolved_path": client._safe_path(path), + "_fd": fd, + "_file_handle": result.file_handle, # kept alive until fd is sent } - result.close() + # Don't close the handle — the fd needs to survive transfer + # The main process will close it after streaming return response else: # Error response @@ -1012,7 +1029,17 @@ def main(): try: result = handler(request, ctx) - _send(sock, result) + + # If the result contains a file descriptor, send it via SCM_RIGHTS + fd = result.pop("_fd", None) + file_handle = result.pop("_file_handle", None) + if fd is not None: + _send_with_fd(sock, result, fd) + # Close our copy of the fd — the main process has its own now + if file_handle is not None: + file_handle.close() + else: + _send(sock, result) except Exception as e: logger.exception(f"Error handling action {action}") _send(sock, {"error": str(e)}) diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py index 49018000..3973ef6f 100644 --- a/fileglancer/worker_pool.py +++ b/fileglancer/worker_pool.py @@ -6,26 +6,30 @@ never calls seteuid/setegid/setgroups. Workers communicate with the main process over a Unix socketpair using -a length-prefixed JSON protocol. The main process dispatches all -user-scoped work (file I/O, cluster ops, git ops) to the appropriate -worker, keeping the async event loop free. +a length-prefixed JSON protocol. When a worker response includes a file +descriptor (e.g. an opened file for streaming), it arrives transparently +via SCM_RIGHTS — callers see a ``_file_handle`` key in the response dict. Usage from server.py: pool = WorkerPool(settings) worker = await pool.get_worker(username) result = await worker.execute("list_dir", fsp_name="home", subpath="Documents") + + # For actions that open files, the fd arrives automatically: + result = await worker.execute("open_file", fsp_name="home", subpath="data.bin") + file_handle = result.get("_file_handle") # open file object, or None """ from __future__ import annotations +import array import asyncio -import ctypes -import ctypes.util import grp import json import os import pwd +import socket import struct import subprocess import sys @@ -56,16 +60,17 @@ class WorkerDead(Exception): class UserWorker: """Wraps a single persistent worker subprocess for one user. - IPC uses a Unix socketpair with length-prefixed JSON messages. - The worker handles one request at a time (serial). + IPC uses a blocking Unix socket accessed from a thread (via + run_in_executor) so the async event loop is never blocked. + All receives use recvmsg(), which transparently handles both + plain messages and messages carrying file descriptors via SCM_RIGHTS. """ def __init__(self, username: str, process: subprocess.Popen, - reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + sock: socket.socket): self.username = username self.process = process - self.reader = reader - self.writer = writer + self.sock = sock self.last_activity = time.monotonic() self._busy = False @@ -80,6 +85,9 @@ def is_busy(self) -> bool: async def execute(self, action: str, **kwargs) -> Any: """Send a request to the worker and return the parsed response. + If the worker sends a file descriptor (SCM_RIGHTS), the response + dict will contain a ``_file_handle`` key with an open file object. + Raises WorkerError on application-level errors from the worker. Raises WorkerDead if the subprocess has exited. """ @@ -90,39 +98,81 @@ async def execute(self, action: str, **kwargs) -> Any: self.last_activity = time.monotonic() try: request = {"action": action, **kwargs} - await self._send(request) - response = await self._recv() + loop = asyncio.get_event_loop() + response = await loop.run_in_executor( + None, self._send_and_recv, request) if response.get("error"): + # Close any fd that arrived with an error response + fh = response.pop("_file_handle", None) + if fh is not None: + fh.close() raise WorkerError(response["error"]) return response - except (BrokenPipeError, ConnectionResetError, asyncio.IncompleteReadError) as e: + except (BrokenPipeError, ConnectionResetError, OSError) as e: raise WorkerDead(f"Worker for {self.username} connection lost: {e}") from e finally: self._busy = False self.last_activity = time.monotonic() - async def _send(self, data: dict): - payload = json.dumps(data).encode() + def _send_and_recv(self, request: dict) -> dict: + """Send a request and receive the response (blocking, runs in thread). + + Uses recvmsg() which transparently handles optional SCM_RIGHTS fds. + """ + # Send + payload = json.dumps(request).encode() header = struct.pack(_HEADER_FMT, len(payload)) - self.writer.write(header + payload) - await self.writer.drain() + self.sock.sendall(header + payload) - async def _recv(self) -> dict: - header = await self.reader.readexactly(_HEADER_SIZE) + # Receive header + header = self._recvall(_HEADER_SIZE) (length,) = struct.unpack(_HEADER_FMT, header) if length > _MAX_MESSAGE_SIZE: raise WorkerError(f"Response too large: {length} bytes") - payload = await self.reader.readexactly(length) - return json.loads(payload) + + # Receive payload + optional fd via recvmsg + fds = array.array("i") + body = b"" + while len(body) < length: + msg, ancdata, flags, addr = self.sock.recvmsg( + length - len(body), + socket.CMSG_LEN(struct.calcsize("i")), # space for 1 fd + ) + if not msg: + raise ConnectionError("Worker closed connection mid-message") + body += msg + for cmsg_level, cmsg_type, cmsg_data in ancdata: + if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: + fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) + + response = json.loads(body) + + # If an fd arrived, wrap it in a file object + if fds: + response["_file_handle"] = os.fdopen(fds[0], "rb") + + return response + + def _recvall(self, n: int) -> bytes: + """Read exactly n bytes from the socket.""" + data = bytearray() + while len(data) < n: + chunk = self.sock.recv(n - len(data)) + if not chunk: + raise ConnectionError("Worker closed connection") + data.extend(chunk) + return bytes(data) async def shutdown(self, timeout: float = 5.0): """Ask the worker to shut down gracefully, then force-kill if needed.""" if not self.is_alive: return try: - await self._send({"action": "shutdown"}) + payload = json.dumps({"action": "shutdown"}).encode() + header = struct.pack(_HEADER_FMT, len(payload)) + self.sock.sendall(header + payload) except (BrokenPipeError, ConnectionResetError, OSError): pass @@ -137,7 +187,10 @@ async def shutdown(self, timeout: float = 5.0): self.process.kill() self.process.wait() - self.writer.close() + try: + self.sock.close() + except OSError: + pass class WorkerPool: @@ -232,15 +285,13 @@ async def _spawn_worker(self, username: str) -> UserWorker: # Close child's end in the parent child_sock.close() - # Wrap the parent socket fd in asyncio streams - loop = asyncio.get_event_loop() - reader, writer = await asyncio.open_connection(sock=parent_sock) + # Keep the socket blocking — all I/O runs in a thread via run_in_executor + parent_sock.setblocking(True) # Start a background task to forward worker stderr to loguru asyncio.create_task(self._forward_stderr(username, process)) - worker = UserWorker(username, process, reader, writer) - return worker + return UserWorker(username, process, parent_sock) async def _forward_stderr(self, username: str, process: subprocess.Popen): """Forward worker stderr lines to loguru in the background.""" From e07027bd60511a8763a5ae0d4760db632641590b Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 11:36:03 -0400 Subject: [PATCH 03/19] fixed code --- fileglancer/worker_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py index 3973ef6f..d28acebf 100644 --- a/fileglancer/worker_pool.py +++ b/fileglancer/worker_pool.py @@ -259,7 +259,7 @@ async def _spawn_worker(self, username: str) -> UserWorker: } # Create Unix socketpair for IPC - parent_sock, child_sock = os.socketpair() + parent_sock, child_sock = socket.socketpair() env = { **os.environ, From c4c6c4cdca63a3be44f3d980d220641250153f18 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 11:39:43 -0400 Subject: [PATCH 04/19] added worker tests and fixed SCM_RIGHTS bug --- fileglancer/worker_pool.py | 48 ++-- tests/test_worker.py | 532 +++++++++++++++++++++++++++++++++++++ 2 files changed, 559 insertions(+), 21 deletions(-) create mode 100644 tests/test_worker.py diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py index d28acebf..19e198b3 100644 --- a/fileglancer/worker_pool.py +++ b/fileglancer/worker_pool.py @@ -119,34 +119,50 @@ async def execute(self, action: str, **kwargs) -> Any: def _send_and_recv(self, request: dict) -> dict: """Send a request and receive the response (blocking, runs in thread). - Uses recvmsg() which transparently handles optional SCM_RIGHTS fds. + All receives use recvmsg() so that SCM_RIGHTS file descriptors are + captured transparently — the ancillary data arrives with the first + bytes of the message, so we must use recvmsg for the header too. """ # Send payload = json.dumps(request).encode() header = struct.pack(_HEADER_FMT, len(payload)) self.sock.sendall(header + payload) - # Receive header - header = self._recvall(_HEADER_SIZE) - (length,) = struct.unpack(_HEADER_FMT, header) + # Receive header + payload + optional fd, all via recvmsg + fds = array.array("i") + raw = b"" + # First, read at least the header + while len(raw) < _HEADER_SIZE: + msg, ancdata, flags, addr = self.sock.recvmsg( + max(_HEADER_SIZE - len(raw), 4096), + socket.CMSG_LEN(struct.calcsize("i")), + ) + if not msg: + raise ConnectionError("Worker closed connection") + raw += msg + for cmsg_level, cmsg_type, cmsg_data in ancdata: + if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: + fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) + + (length,) = struct.unpack(_HEADER_FMT, raw[:_HEADER_SIZE]) if length > _MAX_MESSAGE_SIZE: raise WorkerError(f"Response too large: {length} bytes") - # Receive payload + optional fd via recvmsg - fds = array.array("i") - body = b"" - while len(body) < length: + # We may have read some payload bytes with the header + total_needed = _HEADER_SIZE + length + while len(raw) < total_needed: msg, ancdata, flags, addr = self.sock.recvmsg( - length - len(body), - socket.CMSG_LEN(struct.calcsize("i")), # space for 1 fd + total_needed - len(raw), + socket.CMSG_LEN(struct.calcsize("i")), ) if not msg: raise ConnectionError("Worker closed connection mid-message") - body += msg + raw += msg for cmsg_level, cmsg_type, cmsg_data in ancdata: if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) + body = raw[_HEADER_SIZE:_HEADER_SIZE + length] response = json.loads(body) # If an fd arrived, wrap it in a file object @@ -155,16 +171,6 @@ def _send_and_recv(self, request: dict) -> dict: return response - def _recvall(self, n: int) -> bytes: - """Read exactly n bytes from the socket.""" - data = bytearray() - while len(data) < n: - chunk = self.sock.recv(n - len(data)) - if not chunk: - raise ConnectionError("Worker closed connection") - data.extend(chunk) - return bytes(data) - async def shutdown(self, timeout: float = 5.0): """Ask the worker to shut down gracefully, then force-kill if needed.""" if not self.is_alive: diff --git a/tests/test_worker.py b/tests/test_worker.py new file mode 100644 index 00000000..f4f75a2a --- /dev/null +++ b/tests/test_worker.py @@ -0,0 +1,532 @@ +"""Tests for the per-user persistent worker infrastructure. + +Tests the IPC protocol (length-prefixed JSON, SCM_RIGHTS fd passing), +worker lifecycle (spawn, execute, shutdown, crash recovery), +and the in-process dev-mode fallback. +""" + +import asyncio +import json +import os +import socket +import struct +import tempfile +import time + +import pytest + +from fileglancer.user_worker import ( + _send, + _send_with_fd, + _recv, + _ACTIONS, + WorkerContext, + _HEADER_FMT, + _HEADER_SIZE, +) +from fileglancer.worker_pool import ( + UserWorker, + WorkerPool, + WorkerError, + WorkerDead, +) + + +# --------------------------------------------------------------------------- +# IPC protocol tests (user_worker.py _send/_recv/_send_with_fd) +# --------------------------------------------------------------------------- + +class TestIPCProtocol: + """Test the length-prefixed JSON wire protocol.""" + + def test_send_recv_roundtrip(self): + """A message sent with _send can be read back with _recv.""" + a, b = socket.socketpair() + try: + msg = {"action": "test", "value": 42, "nested": {"key": "val"}} + _send(a, msg) + result = _recv(b) + assert result == msg + finally: + a.close() + b.close() + + def test_send_recv_empty_dict(self): + """Empty dicts round-trip correctly.""" + a, b = socket.socketpair() + try: + _send(a, {}) + assert _recv(b) == {} + finally: + a.close() + b.close() + + def test_send_recv_large_message(self): + """Messages larger than a single recv buffer work.""" + a, b = socket.socketpair() + try: + # Create a message larger than typical socket buffer + big_value = "x" * 100_000 + msg = {"data": big_value} + _send(a, msg) + result = _recv(b) + assert result["data"] == big_value + finally: + a.close() + b.close() + + def test_send_recv_multiple_messages(self): + """Multiple sequential messages on the same socket.""" + a, b = socket.socketpair() + try: + for i in range(10): + _send(a, {"seq": i}) + for i in range(10): + result = _recv(b) + assert result == {"seq": i} + finally: + a.close() + b.close() + + def test_recv_connection_closed(self): + """_recv raises ConnectionError when the peer closes the socket.""" + a, b = socket.socketpair() + a.close() + with pytest.raises(ConnectionError): + _recv(b) + b.close() + + def test_send_with_fd_passes_file_descriptor(self): + """_send_with_fd sends a file descriptor via SCM_RIGHTS.""" + import array + + a, b = socket.socketpair() + try: + # Create a temp file and send its fd + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("hello from fd passing") + temp_path = f.name + + fd_to_send = os.open(temp_path, os.O_RDONLY) + try: + msg = {"type": "handle", "size": 21} + _send_with_fd(a, msg, fd_to_send) + + # Receive using recvmsg for EVERYTHING (header + payload + ancillary) + # The fd arrives with the first bytes, so we must use recvmsg from the start + fds = array.array("i") + raw = b"" + total_header = _HEADER_SIZE + while len(raw) < total_header: + data, ancdata, flags, addr = b.recvmsg( + 4096, + socket.CMSG_LEN(struct.calcsize("i")), + ) + raw += data + for cmsg_level, cmsg_type, cmsg_data in ancdata: + if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: + fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) + + (length,) = struct.unpack(_HEADER_FMT, raw[:_HEADER_SIZE]) + total_needed = _HEADER_SIZE + length + while len(raw) < total_needed: + data, ancdata, flags, addr = b.recvmsg( + total_needed - len(raw), + socket.CMSG_LEN(struct.calcsize("i")), + ) + raw += data + for cmsg_level, cmsg_type, cmsg_data in ancdata: + if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: + fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) + + payload = raw[_HEADER_SIZE:_HEADER_SIZE + length] + result = json.loads(payload) + assert result == {"type": "handle", "size": 21} + assert len(fds) == 1 + + # Read from the received fd + received_fd = fds[0] + with os.fdopen(received_fd, 'r') as f: + content = f.read() + assert content == "hello from fd passing" + finally: + os.close(fd_to_send) + os.unlink(temp_path) + finally: + a.close() + b.close() + + +# --------------------------------------------------------------------------- +# UserWorker IPC integration tests (worker_pool.py _send_and_recv) +# --------------------------------------------------------------------------- + +class TestUserWorkerIPC: + """Test UserWorker's _send_and_recv with a mock worker on the other end.""" + + def _make_worker_pair(self): + """Create a UserWorker connected to a mock 'worker' socket.""" + parent, child = socket.socketpair() + parent.setblocking(True) + + # Create a fake Popen-like object + class FakeProcess: + returncode = None + def poll(self): return None + def wait(self): pass + def kill(self): pass + + worker = UserWorker("testuser", FakeProcess(), parent) + return worker, child + + def test_send_and_recv_basic(self): + """Basic request/response over the socket.""" + worker, child = self._make_worker_pair() + try: + # Simulate worker: read request, send response + def mock_worker(): + req = _recv(child) + assert req["action"] == "ping" + _send(child, {"status": "pong"}) + + import threading + t = threading.Thread(target=mock_worker) + t.start() + + result = worker._send_and_recv({"action": "ping"}) + assert result == {"status": "pong"} + t.join() + finally: + worker.sock.close() + child.close() + + def test_send_and_recv_with_fd(self): + """Response with SCM_RIGHTS fd is auto-wrapped in _file_handle.""" + worker, child = self._make_worker_pair() + try: + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("fd test content") + temp_path = f.name + + def mock_worker(): + req = _recv(child) + fd = os.open(temp_path, os.O_RDONLY) + _send_with_fd(child, {"type": "handle", "size": 15}, fd) + os.close(fd) + + import threading + t = threading.Thread(target=mock_worker) + t.start() + + result = worker._send_and_recv({"action": "open_file"}) + assert result["type"] == "handle" + assert "_file_handle" in result + + fh = result["_file_handle"] + content = fh.read().decode() + fh.close() + assert content == "fd test content" + + t.join() + os.unlink(temp_path) + finally: + worker.sock.close() + child.close() + + def test_send_and_recv_no_fd(self): + """Normal response without fd has no _file_handle key.""" + worker, child = self._make_worker_pair() + try: + def mock_worker(): + _recv(child) + _send(child, {"files": [1, 2, 3]}) + + import threading + t = threading.Thread(target=mock_worker) + t.start() + + result = worker._send_and_recv({"action": "list_dir"}) + assert result == {"files": [1, 2, 3]} + assert "_file_handle" not in result + t.join() + finally: + worker.sock.close() + child.close() + + +# --------------------------------------------------------------------------- +# UserWorker async execute tests +# --------------------------------------------------------------------------- + +class TestUserWorkerExecute: + """Test the async execute() method.""" + + def _make_worker_pair(self): + parent, child = socket.socketpair() + parent.setblocking(True) + + class FakeProcess: + returncode = None + def poll(self): return None + def wait(self): pass + def kill(self): pass + + worker = UserWorker("testuser", FakeProcess(), parent) + return worker, child + + @pytest.mark.asyncio + async def test_execute_success(self): + worker, child = self._make_worker_pair() + try: + import threading + def mock_worker(): + _recv(child) + _send(child, {"result": "ok"}) + + t = threading.Thread(target=mock_worker) + t.start() + + result = await worker.execute("test_action") + assert result == {"result": "ok"} + t.join() + finally: + worker.sock.close() + child.close() + + @pytest.mark.asyncio + async def test_execute_worker_error(self): + worker, child = self._make_worker_pair() + try: + import threading + def mock_worker(): + _recv(child) + _send(child, {"error": "something broke"}) + + t = threading.Thread(target=mock_worker) + t.start() + + with pytest.raises(WorkerError, match="something broke"): + await worker.execute("bad_action") + t.join() + finally: + worker.sock.close() + child.close() + + @pytest.mark.asyncio + async def test_execute_dead_worker(self): + parent, child = socket.socketpair() + parent.setblocking(True) + child.close() + + class DeadProcess: + returncode = 1 + def poll(self): return 1 + def wait(self): pass + def kill(self): pass + + worker = UserWorker("testuser", DeadProcess(), parent) + with pytest.raises(WorkerDead): + await worker.execute("anything") + parent.close() + + @pytest.mark.asyncio + async def test_execute_with_fd_transparent(self): + """execute() transparently includes _file_handle when worker sends fd.""" + worker, child = self._make_worker_pair() + try: + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("transparent fd") + temp_path = f.name + + import threading + def mock_worker(): + _recv(child) + fd = os.open(temp_path, os.O_RDONLY) + _send_with_fd(child, {"content_type": "text/plain"}, fd) + os.close(fd) + + t = threading.Thread(target=mock_worker) + t.start() + + result = await worker.execute("open_file") + assert result["content_type"] == "text/plain" + assert "_file_handle" in result + + fh = result["_file_handle"] + assert fh.read().decode() == "transparent fd" + fh.close() + + t.join() + os.unlink(temp_path) + finally: + worker.sock.close() + child.close() + + +# --------------------------------------------------------------------------- +# Action handler tests (user_worker.py actions run in-process) +# --------------------------------------------------------------------------- + +class TestActionHandlers: + """Test action handlers directly (simulates dev/test mode).""" + + @pytest.fixture + def temp_dir(self): + d = tempfile.mkdtemp() + # Create test files + with open(os.path.join(d, "hello.txt"), "w") as f: + f.write("hello world") + os.makedirs(os.path.join(d, "subdir")) + with open(os.path.join(d, "subdir", "nested.txt"), "w") as f: + f.write("nested content") + yield d + import shutil + shutil.rmtree(d) + + @pytest.fixture + def ctx(self, temp_dir): + """Create a WorkerContext with a test database.""" + from fileglancer.settings import get_settings + settings = get_settings() + return WorkerContext(username=os.environ.get("USER", "test"), db_url=settings.db_url) + + def test_get_profile(self, ctx): + handler = _ACTIONS["get_profile"] + result = handler({"action": "get_profile"}, ctx) + assert "username" in result + assert "groups" in result + assert isinstance(result["groups"], list) + + def test_unknown_action(self): + """Unknown actions are not in the registry.""" + assert "nonexistent_action" not in _ACTIONS + + def test_validate_paths_empty(self, ctx): + handler = _ACTIONS["validate_paths"] + result = handler({"action": "validate_paths", "paths": {}}, ctx) + assert result == {"errors": {}} + + +# --------------------------------------------------------------------------- +# Worker main loop integration test +# --------------------------------------------------------------------------- + +class TestWorkerMainLoop: + """Test the worker subprocess main loop via socketpair (no actual subprocess).""" + + def _run_worker_loop(self, child_sock): + """Run the worker main loop in a thread using the given socket.""" + import threading + + def target(): + # Simulate what main() does, but with our socket + sock = child_sock + uid = os.getuid() + try: + username = os.environ.get("USER", str(uid)) + except KeyError: + username = str(uid) + + from fileglancer.settings import get_settings + settings = get_settings() + ctx = WorkerContext(username=username, db_url=settings.db_url) + + while True: + try: + request = _recv(sock) + except ConnectionError: + break + + action = request.get("action") + if action == "shutdown": + break + + handler = _ACTIONS.get(action) + if handler is None: + _send(sock, {"error": f"Unknown action: {action}"}) + continue + + try: + result = handler(request, ctx) + fd = result.pop("_fd", None) + file_handle = result.pop("_file_handle", None) + if fd is not None: + _send_with_fd(sock, result, fd) + if file_handle is not None: + file_handle.close() + else: + _send(sock, result) + except Exception as e: + _send(sock, {"error": str(e)}) + + sock.close() + + t = threading.Thread(target=target, daemon=True) + t.start() + return t + + def test_shutdown_message(self): + """Worker exits cleanly on shutdown message.""" + parent, child = socket.socketpair() + t = self._run_worker_loop(child) + + _send(parent, {"action": "shutdown"}) + t.join(timeout=5) + assert not t.is_alive() + parent.close() + + def test_unknown_action_returns_error(self): + """Worker returns error for unknown actions.""" + parent, child = socket.socketpair() + t = self._run_worker_loop(child) + + _send(parent, {"action": "totally_fake"}) + result = _recv(parent) + assert "error" in result + assert "Unknown action" in result["error"] + + _send(parent, {"action": "shutdown"}) + t.join(timeout=5) + parent.close() + + def test_get_profile_via_loop(self): + """End-to-end: send get_profile through the worker loop.""" + parent, child = socket.socketpair() + t = self._run_worker_loop(child) + + _send(parent, {"action": "get_profile"}) + result = _recv(parent) + assert "username" in result + assert "groups" in result + + _send(parent, {"action": "shutdown"}) + t.join(timeout=5) + parent.close() + + def test_multiple_requests(self): + """Worker handles multiple sequential requests.""" + parent, child = socket.socketpair() + t = self._run_worker_loop(child) + + # Send several requests + _send(parent, {"action": "get_profile"}) + r1 = _recv(parent) + assert "username" in r1 + + _send(parent, {"action": "validate_paths", "paths": {}}) + r2 = _recv(parent) + assert r2 == {"errors": {}} + + _send(parent, {"action": "shutdown"}) + t.join(timeout=5) + parent.close() + + def test_connection_close_exits_loop(self): + """Worker exits when parent closes the socket.""" + parent, child = socket.socketpair() + t = self._run_worker_loop(child) + + # Close without sending shutdown — worker should detect and exit + parent.close() + t.join(timeout=5) + assert not t.is_alive() From 6d7ffa47a318e5193c1d3c92e741f9e0cd706bf5 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 11:42:43 -0400 Subject: [PATCH 05/19] fixed recvmsg bug --- fileglancer/server.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/fileglancer/server.py b/fileglancer/server.py index 0ec707a2..95f54145 100644 --- a/fileglancer/server.py +++ b/fileglancer/server.py @@ -246,7 +246,7 @@ async def _worker_exec(username: str, action: str, **kwargs): logger.error(f"Worker dead for {username}: {e}") raise HTTPException(status_code=503, detail="Service temporarily unavailable") except WorkerError as e: - raise # Let caller handle application-level errors + raise HTTPException(status_code=500, detail=str(e)) else: # Dev/test mode: run action directly in-process from fileglancer.user_worker import _ACTIONS, WorkerContext @@ -1799,8 +1799,10 @@ async def get_job_file(job_id: int, if content is None: raise HTTPException(status_code=404, detail=f"File not found: {file_type}") return PlainTextResponse(content) - except WorkerError as e: - raise HTTPException(status_code=404, detail=str(e)) + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) def _ensure_utc(dt: Optional[datetime]) -> Optional[datetime]: """Re-attach UTC timezone to naive datetimes from the DB. From 3b6b455e709be4df10d00d8400715d6207b66034 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 11:44:57 -0400 Subject: [PATCH 06/19] add lock so workers only handle one request at a time --- fileglancer/worker_pool.py | 46 +++++++++++++++++++++----------------- tests/test_worker.py | 41 +++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 20 deletions(-) diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py index 19e198b3..3381f8f6 100644 --- a/fileglancer/worker_pool.py +++ b/fileglancer/worker_pool.py @@ -73,6 +73,7 @@ def __init__(self, username: str, process: subprocess.Popen, self.sock = sock self.last_activity = time.monotonic() self._busy = False + self._lock = asyncio.Lock() # serialize requests to the worker @property def is_alive(self) -> bool: @@ -88,33 +89,38 @@ async def execute(self, action: str, **kwargs) -> Any: If the worker sends a file descriptor (SCM_RIGHTS), the response dict will contain a ``_file_handle`` key with an open file object. + Requests are serialized per-worker via an asyncio lock — the worker + subprocess handles one request at a time, so concurrent callers + must not interleave their sends/receives on the shared socket. + Raises WorkerError on application-level errors from the worker. Raises WorkerDead if the subprocess has exited. """ if not self.is_alive: raise WorkerDead(f"Worker for {self.username} is dead (rc={self.process.returncode})") - self._busy = True - self.last_activity = time.monotonic() - try: - request = {"action": action, **kwargs} - loop = asyncio.get_event_loop() - response = await loop.run_in_executor( - None, self._send_and_recv, request) - - if response.get("error"): - # Close any fd that arrived with an error response - fh = response.pop("_file_handle", None) - if fh is not None: - fh.close() - raise WorkerError(response["error"]) - - return response - except (BrokenPipeError, ConnectionResetError, OSError) as e: - raise WorkerDead(f"Worker for {self.username} connection lost: {e}") from e - finally: - self._busy = False + async with self._lock: + self._busy = True self.last_activity = time.monotonic() + try: + request = {"action": action, **kwargs} + loop = asyncio.get_event_loop() + response = await loop.run_in_executor( + None, self._send_and_recv, request) + + if response.get("error"): + # Close any fd that arrived with an error response + fh = response.pop("_file_handle", None) + if fh is not None: + fh.close() + raise WorkerError(response["error"]) + + return response + except (BrokenPipeError, ConnectionResetError, OSError) as e: + raise WorkerDead(f"Worker for {self.username} connection lost: {e}") from e + finally: + self._busy = False + self.last_activity = time.monotonic() def _send_and_recv(self, request: dict) -> dict: """Send a request and receive the response (blocking, runs in thread). diff --git a/tests/test_worker.py b/tests/test_worker.py index f4f75a2a..214e9ce2 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -363,6 +363,47 @@ def mock_worker(): child.close() + @pytest.mark.asyncio + async def test_concurrent_execute_serialized(self): + """Concurrent execute() calls are serialized — responses never get swapped.""" + worker, child = self._make_worker_pair() + try: + import threading + + def mock_worker(): + """Echo worker: returns the action name in the response.""" + for _ in range(20): + try: + req = _recv(child) + except ConnectionError: + break + action = req.get("action", "unknown") + if action == "shutdown": + break + # Simulate some work + time.sleep(0.01) + _send(child, {"action_echo": action, "seq": req.get("seq")}) + + t = threading.Thread(target=mock_worker, daemon=True) + t.start() + + # Fire 10 concurrent requests with different actions + async def make_request(seq): + action = f"action_{seq}" + result = await worker.execute(action, seq=seq) + # Verify we got OUR response back, not someone else's + assert result["action_echo"] == action + assert result["seq"] == seq + + await asyncio.gather(*[make_request(i) for i in range(10)]) + + _send(child, {"action": "shutdown"}) # won't be read, but close cleanly + t.join(timeout=5) + finally: + worker.sock.close() + child.close() + + # --------------------------------------------------------------------------- # Action handler tests (user_worker.py actions run in-process) # --------------------------------------------------------------------------- From 942dbcac43eaa9d40dcc07cb4402bee7954429af Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 11:46:29 -0400 Subject: [PATCH 07/19] show log when worker triggers 500 error --- fileglancer/server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fileglancer/server.py b/fileglancer/server.py index 95f54145..ad5e63e8 100644 --- a/fileglancer/server.py +++ b/fileglancer/server.py @@ -246,6 +246,7 @@ async def _worker_exec(username: str, action: str, **kwargs): logger.error(f"Worker dead for {username}: {e}") raise HTTPException(status_code=503, detail="Service temporarily unavailable") except WorkerError as e: + logger.error(f"Worker error for {username} action={action}: {e}") raise HTTPException(status_code=500, detail=str(e)) else: # Dev/test mode: run action directly in-process From 0dd3e0962a4ae744df595624c182112e57733eca Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 11:49:50 -0400 Subject: [PATCH 08/19] worker error handling --- fileglancer/server.py | 5 +++-- fileglancer/user_worker.py | 4 ++-- fileglancer/worker_pool.py | 9 +++++++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/fileglancer/server.py b/fileglancer/server.py index ad5e63e8..13640c55 100644 --- a/fileglancer/server.py +++ b/fileglancer/server.py @@ -246,8 +246,9 @@ async def _worker_exec(username: str, action: str, **kwargs): logger.error(f"Worker dead for {username}: {e}") raise HTTPException(status_code=503, detail="Service temporarily unavailable") except WorkerError as e: - logger.error(f"Worker error for {username} action={action}: {e}") - raise HTTPException(status_code=500, detail=str(e)) + if e.status_code >= 500: + logger.error(f"Worker error for {username} action={action}: {e}") + raise HTTPException(status_code=e.status_code, detail=str(e)) else: # Dev/test mode: run action directly in-process from fileglancer.user_worker import _ACTIONS, WorkerContext diff --git a/fileglancer/user_worker.py b/fileglancer/user_worker.py index db77c095..56802b86 100644 --- a/fileglancer/user_worker.py +++ b/fileglancer/user_worker.py @@ -320,9 +320,9 @@ def _action_open_file(request: dict, ctx: WorkerContext) -> dict: return {"redirect": True, "fsp_name": fsp.name, "subpath": relative_subpath or ""} return {"error": str(e), "status_code": 400} except FileNotFoundError: - return {"error": "File or directory not found", "status_code": 404} + return {"error": f"File or directory not found: {fsp_name}/{subpath}", "status_code": 404} except PermissionError: - return {"error": "Permission denied", "status_code": 403} + return {"error": f"Permission denied: {fsp_name}/{subpath}", "status_code": 403} def _action_head_file(request: dict, ctx: WorkerContext) -> dict: diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py index 3381f8f6..7c95a734 100644 --- a/fileglancer/worker_pool.py +++ b/fileglancer/worker_pool.py @@ -49,7 +49,9 @@ class WorkerError(Exception): """Raised when a worker returns an error response.""" - pass + def __init__(self, message: str, status_code: int = 500): + super().__init__(message) + self.status_code = status_code class WorkerDead(Exception): @@ -113,7 +115,10 @@ async def execute(self, action: str, **kwargs) -> Any: fh = response.pop("_file_handle", None) if fh is not None: fh.close() - raise WorkerError(response["error"]) + raise WorkerError( + response["error"], + status_code=response.get("status_code", 500), + ) return response except (BrokenPipeError, ConnectionResetError, OSError) as e: From b613c6db4c6e91fb8f5e8a6ac75f40d3411c58a9 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 12:21:07 -0400 Subject: [PATCH 09/19] enforce max_workers cap when all workers are busy When the pool is at capacity and all workers are busy, _evict_lru() returns without evicting. Previously, get_worker() would unconditionally spawn a new worker anyway, silently exceeding the limit. Now it raises a WorkerError(503) so the caller gets a proper "try again later" instead of unbounded subprocess growth. Co-Authored-By: Claude Opus 4.6 (1M context) --- fileglancer/worker_pool.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py index 7c95a734..32564ce5 100644 --- a/fileglancer/worker_pool.py +++ b/fileglancer/worker_pool.py @@ -254,6 +254,10 @@ async def get_worker(self, username: str) -> UserWorker: if len(self._workers) >= self.max_workers: await self._evict_lru() + # If still at capacity (all workers busy), refuse rather than exceed the limit + if len(self._workers) >= self.max_workers: + raise WorkerError("Worker pool at capacity, try again later", status_code=503) + # Spawn new worker new_worker = await self._spawn_worker(username) self._workers[username] = new_worker From 5fc0bdf07fdf121721a83c5bc709dae35b81f3f4 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 12:21:31 -0400 Subject: [PATCH 10/19] use settings for worker pool max_workers and idle_timeout WorkerPool.__init__ was hardcoding max_workers=50 and idle_timeout=300 instead of reading from Settings.worker_pool_max_workers and worker_pool_idle_timeout. The eviction loop sleep is now also capped at the idle_timeout so shorter timeouts are honored promptly. Co-Authored-By: Claude Opus 4.6 (1M context) --- fileglancer/worker_pool.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py index 32564ce5..ff607ebe 100644 --- a/fileglancer/worker_pool.py +++ b/fileglancer/worker_pool.py @@ -221,8 +221,8 @@ def __init__(self, settings: Settings): self._workers: dict[str, UserWorker] = {} self._locks: dict[str, asyncio.Lock] = {} self._eviction_task: Optional[asyncio.Task] = None - self.max_workers = 50 - self.idle_timeout = 300 # seconds + self.max_workers = settings.worker_pool_max_workers + self.idle_timeout = settings.worker_pool_idle_timeout def _get_lock(self, username: str) -> asyncio.Lock: if username not in self._locks: @@ -351,7 +351,7 @@ async def start_eviction_loop(self): async def _eviction_loop(self): """Periodically evict idle workers.""" while True: - await asyncio.sleep(60) + await asyncio.sleep(min(60, self.idle_timeout)) now = time.monotonic() to_evict = [] for name, worker in list(self._workers.items()): From 8841c9d928d030613598dd61d7da613ae2db6307 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 12:21:58 -0400 Subject: [PATCH 11/19] fix file descriptor leaks in _send_and_recv Two fd leak paths: (1) if an exception occurs mid-receive, any fds already collected from SCM_RIGHTS ancillary data were never closed; (2) if multiple fds arrived (unlikely but possible), only fds[0] was wrapped and the rest leaked. Now all fds are closed on error, and extra fds beyond the first are closed on success. Co-Authored-By: Claude Opus 4.6 (1M context) --- fileglancer/worker_pool.py | 84 ++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 35 deletions(-) diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py index ff607ebe..1690f2a5 100644 --- a/fileglancer/worker_pool.py +++ b/fileglancer/worker_pool.py @@ -142,43 +142,57 @@ def _send_and_recv(self, request: dict) -> dict: # Receive header + payload + optional fd, all via recvmsg fds = array.array("i") raw = b"" - # First, read at least the header - while len(raw) < _HEADER_SIZE: - msg, ancdata, flags, addr = self.sock.recvmsg( - max(_HEADER_SIZE - len(raw), 4096), - socket.CMSG_LEN(struct.calcsize("i")), - ) - if not msg: - raise ConnectionError("Worker closed connection") - raw += msg - for cmsg_level, cmsg_type, cmsg_data in ancdata: - if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: - fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) - - (length,) = struct.unpack(_HEADER_FMT, raw[:_HEADER_SIZE]) - if length > _MAX_MESSAGE_SIZE: - raise WorkerError(f"Response too large: {length} bytes") - - # We may have read some payload bytes with the header - total_needed = _HEADER_SIZE + length - while len(raw) < total_needed: - msg, ancdata, flags, addr = self.sock.recvmsg( - total_needed - len(raw), - socket.CMSG_LEN(struct.calcsize("i")), - ) - if not msg: - raise ConnectionError("Worker closed connection mid-message") - raw += msg - for cmsg_level, cmsg_type, cmsg_data in ancdata: - if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: - fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) - - body = raw[_HEADER_SIZE:_HEADER_SIZE + length] - response = json.loads(body) - - # If an fd arrived, wrap it in a file object + try: + # First, read at least the header + while len(raw) < _HEADER_SIZE: + msg, ancdata, flags, addr = self.sock.recvmsg( + max(_HEADER_SIZE - len(raw), 4096), + socket.CMSG_LEN(struct.calcsize("i")), + ) + if not msg: + raise ConnectionError("Worker closed connection") + raw += msg + for cmsg_level, cmsg_type, cmsg_data in ancdata: + if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: + fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) + + (length,) = struct.unpack(_HEADER_FMT, raw[:_HEADER_SIZE]) + if length > _MAX_MESSAGE_SIZE: + raise WorkerError(f"Response too large: {length} bytes") + + # We may have read some payload bytes with the header + total_needed = _HEADER_SIZE + length + while len(raw) < total_needed: + msg, ancdata, flags, addr = self.sock.recvmsg( + total_needed - len(raw), + socket.CMSG_LEN(struct.calcsize("i")), + ) + if not msg: + raise ConnectionError("Worker closed connection mid-message") + raw += msg + for cmsg_level, cmsg_type, cmsg_data in ancdata: + if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: + fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) + + body = raw[_HEADER_SIZE:_HEADER_SIZE + length] + response = json.loads(body) + except Exception: + # Close any fds received before the error to prevent leaks + for fd_val in fds: + try: + os.close(fd_val) + except OSError: + pass + raise + + # If an fd arrived, wrap it in a file object and close any extras if fds: response["_file_handle"] = os.fdopen(fds[0], "rb") + for extra_fd in fds[1:]: + try: + os.close(extra_fd) + except OSError: + pass return response From 18be1aee6415bbaf89a7c7ce7bc55bd74eb94f3a Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 12:22:19 -0400 Subject: [PATCH 12/19] add error handling to _action_get_file_info and _action_check_binary These handlers were missing try/except for FileNotFoundError and PermissionError, unlike all neighboring file action handlers. Exceptions would propagate to the worker main loop's generic handler and return 500 instead of proper 404/403. Also add status_code to _get_filestore error responses for consistency with other handlers. Co-Authored-By: Claude Opus 4.6 (1M context) --- fileglancer/user_worker.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/fileglancer/user_worker.py b/fileglancer/user_worker.py index 56802b86..39ff7ae6 100644 --- a/fileglancer/user_worker.py +++ b/fileglancer/user_worker.py @@ -253,13 +253,18 @@ def _action_get_file_info(request: dict, ctx: WorkerContext) -> dict: filestore, error = _get_filestore(fsp_name, ctx.db_url) if filestore is None: - return {"error": error} + return {"error": error, "status_code": 404 if "not found" in error else 500} from fileglancer import database as db - with db.get_db_session(ctx.db_url) as session: - file_info = filestore.get_file_info(subpath, current_user=ctx.username, session=session) - return {"info": json.loads(file_info.model_dump_json())} + try: + with db.get_db_session(ctx.db_url) as session: + file_info = filestore.get_file_info(subpath, current_user=ctx.username, session=session) + return {"info": json.loads(file_info.model_dump_json())} + except FileNotFoundError: + return {"error": "File or directory not found", "status_code": 404} + except PermissionError: + return {"error": "Permission denied", "status_code": 403} def _action_check_binary(request: dict, ctx: WorkerContext) -> dict: @@ -269,10 +274,15 @@ def _action_check_binary(request: dict, ctx: WorkerContext) -> dict: filestore, error = _get_filestore(fsp_name, ctx.db_url) if filestore is None: - return {"error": error} + return {"error": error, "status_code": 404 if "not found" in error else 500} - is_binary = filestore.check_is_binary(subpath) - return {"is_binary": is_binary} + try: + is_binary = filestore.check_is_binary(subpath) + return {"is_binary": is_binary} + except FileNotFoundError: + return {"error": "File or directory not found", "status_code": 404} + except PermissionError: + return {"error": "Permission denied", "status_code": 403} def _action_open_file(request: dict, ctx: WorkerContext) -> dict: From aa73390f09fd7207fa47916a5b91b457f1c44cc6 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 12:22:49 -0400 Subject: [PATCH 13/19] use os.getgrouplist() for worker supplementary groups _spawn_worker() was using grp.getgrall() to build the child process's supplementary groups. On LDAP/NSS-backed systems, getgrall() may not enumerate all effective groups, causing workers to lose access to files that the user can normally reach. os.getgrouplist() queries NSS directly and matches the approach used by user_context.py. Co-Authored-By: Claude Opus 4.6 (1M context) --- fileglancer/worker_pool.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py index 1690f2a5..7a66beb1 100644 --- a/fileglancer/worker_pool.py +++ b/fileglancer/worker_pool.py @@ -25,7 +25,6 @@ import array import asyncio -import grp import json import os import pwd @@ -284,9 +283,7 @@ async def _spawn_worker(self, username: str) -> UserWorker: # Build identity kwargs (only switch if running as root) identity_kwargs: dict = {} if os.geteuid() == 0: - groups = [g.gr_gid for g in grp.getgrall() if username in g.gr_mem] - if pw.pw_gid not in groups: - groups.append(pw.pw_gid) + groups = os.getgrouplist(username, pw.pw_gid) identity_kwargs = { "user": pw.pw_uid, "group": pw.pw_gid, From f351e7db5f0730d26cd0daf850396141430c1878 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 12:23:02 -0400 Subject: [PATCH 14/19] replace bare except with except Exception in head_object Bare except: catches SystemExit and KeyboardInterrupt, preventing clean shutdown. Use except Exception: instead. Co-Authored-By: Claude Opus 4.6 (1M context) --- fileglancer/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fileglancer/server.py b/fileglancer/server.py index 13640c55..c3c948a3 100644 --- a/fileglancer/server.py +++ b/fileglancer/server.py @@ -1198,7 +1198,7 @@ async def head_object(sharing_key: str, path: str = ''): target_name=info["target_name"], path=subpath) return Response(headers=result.get("headers", {}), status_code=result.get("status_code", 200)) - except: + except Exception: logger.opt(exception=sys.exc_info()).info("Error requesting head") return get_error_response(500, "InternalError", "Error requesting HEAD", path) From 5d678f854b139ddb0dc9ea696e306449c046ecd5 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 12:23:30 -0400 Subject: [PATCH 15/19] use model_dump(mode='json') instead of json.loads(model_dump_json()) The json.loads(model.model_dump_json()) pattern serializes to a JSON string then immediately parses it back to a dict. model_dump(mode='json') produces the same dict directly without the string round-trip, which matters for directory listings with many files. Co-Authored-By: Claude Opus 4.6 (1M context) --- fileglancer/user_worker.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fileglancer/user_worker.py b/fileglancer/user_worker.py index 39ff7ae6..def72134 100644 --- a/fileglancer/user_worker.py +++ b/fileglancer/user_worker.py @@ -163,12 +163,12 @@ def _action_list_dir(request: dict, ctx: WorkerContext) -> dict: try: with db.get_db_session(ctx.db_url) as session: file_info = filestore.get_file_info(subpath, current_user=current_user, session=session) - result = {"info": json.loads(file_info.model_dump_json())} + result = {"info": file_info.model_dump(mode="json")} if file_info.is_dir: try: files = list(filestore.yield_file_infos(subpath, current_user=current_user, session=session)) - result["files"] = [json.loads(f.model_dump_json()) for f in files] + result["files"] = [f.model_dump(mode="json") for f in files] except PermissionError: result["files"] = [] result["error"] = "Permission denied when listing directory contents" @@ -211,7 +211,7 @@ def _action_list_dir_paged(request: dict, ctx: WorkerContext) -> dict: try: with db.get_db_session(ctx.db_url) as session: file_info = filestore.get_file_info(subpath, current_user=current_user, session=session) - result = {"info": json.loads(file_info.model_dump_json())} + result = {"info": file_info.model_dump(mode="json")} if file_info.is_dir: try: @@ -219,7 +219,7 @@ def _action_list_dir_paged(request: dict, ctx: WorkerContext) -> dict: subpath, current_user=current_user, session=session, limit=limit, cursor=cursor ) - result["files"] = [json.loads(f.model_dump_json()) for f in files] + result["files"] = [f.model_dump(mode="json") for f in files] result["has_more"] = has_more result["next_cursor"] = next_cursor result["total_count"] = total_count @@ -260,7 +260,7 @@ def _action_get_file_info(request: dict, ctx: WorkerContext) -> dict: try: with db.get_db_session(ctx.db_url) as session: file_info = filestore.get_file_info(subpath, current_user=ctx.username, session=session) - return {"info": json.loads(file_info.model_dump_json())} + return {"info": file_info.model_dump(mode="json")} except FileNotFoundError: return {"error": "File or directory not found", "status_code": 404} except PermissionError: @@ -354,7 +354,7 @@ def _action_head_file(request: dict, ctx: WorkerContext) -> dict: is_binary = filestore.check_is_binary(subpath) if not file_info.is_dir else False return { - "info": json.loads(file_info.model_dump_json()), + "info": file_info.model_dump(mode="json"), "content_type": content_type, "is_binary": is_binary, } @@ -477,7 +477,7 @@ def _action_update_file(request: dict, ctx: WorkerContext) -> dict: try: old_file_info = filestore.get_file_info(subpath, ctx.username) - result = {"info": json.loads(old_file_info.model_dump_json())} + result = {"info": old_file_info.model_dump(mode="json")} if new_permissions is not None and new_permissions != old_file_info.permissions: filestore.change_file_permissions(subpath, new_permissions) From 6b66a5a260a935147fc3a0f17298660904165d61 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 12:23:51 -0400 Subject: [PATCH 16/19] add error handling for dev-mode worker action dispatch In dev/test mode (no worker pool), exceptions from action handlers would propagate unhandled to the generic exception handler. In production, the worker subprocess catches these and returns error dicts. Now dev mode also catches and logs action handler exceptions, matching the production error behavior more closely. Co-Authored-By: Claude Opus 4.6 (1M context) --- fileglancer/server.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/fileglancer/server.py b/fileglancer/server.py index c3c948a3..d0a27500 100644 --- a/fileglancer/server.py +++ b/fileglancer/server.py @@ -257,7 +257,11 @@ async def _worker_exec(username: str, action: str, **kwargs): raise HTTPException(status_code=500, detail=f"Unknown action: {action}") ctx = WorkerContext(username=username, db_url=settings.db_url) request = {"action": action, **kwargs} - result = handler(request, ctx) + try: + result = handler(request, ctx) + except Exception as e: + logger.error(f"Action handler error for {username} action={action}: {e}") + raise HTTPException(status_code=500, detail=str(e)) # Strip the raw fd (not meaningful in-process), keep _file_handle result.pop("_fd", None) return result From 7776d6aa773357a2deb0dbfd4c18b012ea5ca658 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 12:24:13 -0400 Subject: [PATCH 17/19] use ctx.db_url consistently in job file action handlers _action_get_job_file_paths and _action_get_service_url were calling get_settings() to get the db_url while every other handler uses ctx.db_url. Use ctx.db_url for consistency with the rest of the worker action handlers. Co-Authored-By: Claude Opus 4.6 (1M context) --- fileglancer/user_worker.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/fileglancer/user_worker.py b/fileglancer/user_worker.py index def72134..d70e8295 100644 --- a/fileglancer/user_worker.py +++ b/fileglancer/user_worker.py @@ -602,12 +602,10 @@ def _action_get_job_file_paths(request: dict, ctx: WorkerContext) -> dict: """Get job file path info.""" from fileglancer.apps.core import get_job_file_paths from fileglancer import database as db - from fileglancer.settings import get_settings - settings = get_settings() job_id = request["job_id"] - with db.get_db_session(settings.db_url) as session: + with db.get_db_session(ctx.db_url) as session: db_job = db.get_job(session, job_id, ctx.username) if db_job is None: return {"error": f"Job {job_id} not found", "status_code": 404} @@ -619,12 +617,10 @@ def _action_get_service_url(request: dict, ctx: WorkerContext) -> dict: """Read service URL from job work directory.""" from fileglancer.apps.core import get_service_url from fileglancer import database as db - from fileglancer.settings import get_settings - settings = get_settings() job_id = request["job_id"] - with db.get_db_session(settings.db_url) as session: + with db.get_db_session(ctx.db_url) as session: db_job = db.get_job(session, job_id, ctx.username) if db_job is None: return {"error": f"Job {job_id} not found", "status_code": 404} From 626da3d3befe60b634d1e804385b8afa39574dfb Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Sun, 12 Apr 2026 12:24:27 -0400 Subject: [PATCH 18/19] add 120s socket timeout to worker IPC The worker socket had no timeout, so if a worker hung mid-response the run_in_executor thread would block indefinitely. Add a 120s timeout so the socket raises and the request fails rather than silently tying up a thread forever. Co-Authored-By: Claude Opus 4.6 (1M context) --- fileglancer/worker_pool.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fileglancer/worker_pool.py b/fileglancer/worker_pool.py index 7a66beb1..81a0cc6f 100644 --- a/fileglancer/worker_pool.py +++ b/fileglancer/worker_pool.py @@ -317,8 +317,10 @@ async def _spawn_worker(self, username: str) -> UserWorker: # Close child's end in the parent child_sock.close() - # Keep the socket blocking — all I/O runs in a thread via run_in_executor + # Keep the socket blocking — all I/O runs in a thread via run_in_executor. + # Set a timeout so a hung worker can't block a thread forever. parent_sock.setblocking(True) + parent_sock.settimeout(120) # Start a background task to forward worker stderr to loguru asyncio.create_task(self._forward_stderr(username, process)) From 7a7e407d0392b60dd709c01084d833662c755543 Mon Sep 17 00:00:00 2001 From: Konrad Rokicki Date: Mon, 13 Apr 2026 09:00:02 -0400 Subject: [PATCH 19/19] remove dead code (EffectiveUserContext) --- fileglancer/apps/core.py | 2 +- fileglancer/server.py | 15 ---- fileglancer/user_context.py | 132 ------------------------------------ 3 files changed, 1 insertion(+), 148 deletions(-) delete mode 100644 fileglancer/user_context.py diff --git a/fileglancer/apps/core.py b/fileglancer/apps/core.py index d3a15045..31c2f2c2 100644 --- a/fileglancer/apps/core.py +++ b/fileglancer/apps/core.py @@ -140,7 +140,7 @@ async def _ensure_repo_cache(url: str, pull: bool = False, When username is provided, the work is delegated to a worker subprocess that runs with the target user's real UID/GID, avoiding the process-wide - euid race condition that EffectiveUserContext has with concurrent async + euid race condition that seteuid/setegid has with concurrent async requests. When username is None, git commands run in-process (used by the worker subprocess itself, or in single-user dev mode). """ diff --git a/fileglancer/server.py b/fileglancer/server.py index d0a27500..99a09a9a 100644 --- a/fileglancer/server.py +++ b/fileglancer/server.py @@ -35,7 +35,6 @@ from fileglancer.settings import get_settings from fileglancer.issues import create_jira_ticket, get_jira_ticket_details, delete_jira_ticket from fileglancer.utils import format_timestamp, guess_content_type, parse_range_header -from fileglancer.user_context import UserContext, EffectiveUserContext, CurrentUserContext, UserContextConfigurationError from fileglancer.filestore import Filestore, RootCheckError from fileglancer.log import AccessLogMiddleware from fileglancer.worker_pool import WorkerPool, WorkerError, WorkerDead @@ -218,12 +217,6 @@ def create_app(settings): # Per-user persistent worker pool (only used when use_access_flags=True) worker_pool = WorkerPool(settings) if settings.use_access_flags else None - def _get_user_context(username: str) -> UserContext: - if settings.use_access_flags: - return EffectiveUserContext(username) - else: - return CurrentUserContext() - async def _worker_exec(username: str, action: str, **kwargs): """Dispatch an action to the per-user worker and return the result. @@ -466,14 +459,6 @@ async def validation_exception_handler(request, exc): return JSONResponse({"error":str(exc)}, status_code=400) - @app.exception_handler(UserContextConfigurationError) - async def user_context_config_error_handler(request, exc): - logger.error(f"User context configuration error: {exc}") - return JSONResponse( - {"error": str(exc)}, - status_code=500 - ) - @app.exception_handler(PermissionError) async def permission_error_handler(request, exc): error_msg = str(exc) diff --git a/fileglancer/user_context.py b/fileglancer/user_context.py deleted file mode 100644 index 6c455553..00000000 --- a/fileglancer/user_context.py +++ /dev/null @@ -1,132 +0,0 @@ -import os -import pwd -from contextlib import AbstractContextManager - -from loguru import logger - -from fileglancer.settings import get_settings - - -class UserContextConfigurationError(PermissionError): - """ - Raised when user context setup fails due to configuration issues. - This happens when use_access_flags=true but the server is not running with sufficient privileges. - """ - def __init__(self, message: str = "Server configuration error: Run the server as root or set use_access_flags=false in config.yaml"): - super().__init__(message) - - -class UserContext(AbstractContextManager): - """ - Base no-op proxy context that does nothing. - """ - def __exit__(self, exc_type, exc_val, exc_tb): - return False - - -class CurrentUserContext(UserContext): - """ - A context manager the keeps the current user context. - """ - pass - - -class EffectiveUserContext(UserContext): - """ - A context manager for setting the user and group context for a process using seteuid/setegid access flags. - """ - def __init__(self, username: str): - self.username = username - self._uid = os.getuid() - self._gid = os.getgid() - self._gids = os.getgrouplist(pwd.getpwuid(self._uid).pw_name, self._gid) - self._user = None - - def __enter__(self): - logger.debug( - f"EffectiveUserContext entering for {self.username} " - f"(current euid={os.geteuid()} egid={os.getegid()})" - ) - user = pwd.getpwnam(self.username) - - uid = user.pw_uid - gid = user.pw_gid - gids = os.getgrouplist(self.username, gid) - try: - os.setegid(gid) - except PermissionError as e: - logger.error(f"Failed to set the effective gid: {e}") - settings = get_settings() - if settings.use_access_flags: - raise UserContextConfigurationError() from e - else: - raise - except Exception as e: - logger.error(f"Failed to set the effective gid: {e}") - raise e - - try: - # the maximum number of groups that could be set is os.sysconf("SC_NGROUPS_MAX") - # so if the current user has more than that an exception will be raised - # for now I don't limit this because I want to see if this will happen - if len(gids) > os.sysconf("SC_NGROUPS_MAX"): - logger.warning(( - f"User {self.username} is part of {len(gids)} groups " - f"which is greater than {os.sysconf("SC_NGROUPS_MAX")} " - "so this may result in an error" - )) - os.setgroups(gids) - except PermissionError as e: - logger.error(f"Failed to set the user groups: {e}") - # reset egid first - os.setegid(self._gid) - settings = get_settings() - if settings.use_access_flags: - raise UserContextConfigurationError() from e - else: - raise - except Exception as e: - logger.error(f"Failed to set the user groups: {e}") - # reset egid first - os.setegid(self._gid) - raise e - - try: - os.seteuid(uid) - except PermissionError as e: - logger.error(f"Failed to set euid: {e}") - # reset egid - os.setegid(self._gid) - settings = get_settings() - if settings.use_access_flags: - raise UserContextConfigurationError() from e - else: - raise - except Exception as e: - logger.error(f"Failed to set euid: {e}") - # reset egid - os.setegid(self._gid) - raise e - - self._user = user - logger.debug( - f"EffectiveUserContext now running as {self.username} " - f"(euid={os.geteuid()} egid={os.getegid()})" - ) - - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - logger.debug( - f"EffectiveUserContext exiting for {self.username} " - f"(restoring euid={self._uid} egid={self._gid})" - ) - os.seteuid(self._uid) - os.setegid(self._gid) - if len(self._gids) > os.sysconf("SC_NGROUPS_MAX"): - logger.info(f"Truncate original {len(self._gids)} groups to max allowed to set: {os.sysconf("SC_NGROUPS_MAX")}") - os.setgroups(self._gids[:os.sysconf("SC_NGROUPS_MAX")]) - else: - os.setgroups(self._gids) - self._user = None - return False