Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 75 additions & 4 deletions ckanext/digitizationknowledge/helpers.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,92 @@
import ckan.plugins.toolkit as toolkit
import ckan.model as model
import ckan.authz as authz
from sqlalchemy import and_, not_, exists
from typing import Any

# Define Template helper functions

def is_group_private(group):
"""
Template helper to check if a group is private.

Args:
group: A group dict or group object

Returns:
bool: True if group is marked as private
"""
# Handle dict format (from group_show)
if isinstance(group, dict):
# Check direct field (may be present in some cases)
if 'is_private' in group:
val = group['is_private']
if isinstance(val, str):
return val.lower() in ['true', '1', 'yes', 'on']
return bool(val)

# Check extras list format
extras = group.get('extras', [])
for extra in extras:
if extra.get('key') == 'is_private':
val = extra.get('value')
if isinstance(val, str):
return val.lower() in ['true', '1', 'yes', 'on']
return bool(val)

# Handle model object format
elif hasattr(group, 'extras') and 'is_private' in group.extras:
val = group.extras['is_private']
if isinstance(val, str):
return val.lower() in ['true', '1', 'yes', 'on']
return bool(val)

return False


def user_can_view_group(group_name_or_id):
"""
Template helper to check if current user can view a group.

Args:
group_name_or_id: Group name or ID

Returns:
bool: True if user can view the group
"""
try:
user = toolkit.current_user.name if toolkit.current_user.is_authenticated else None
context = {'user': user}
toolkit.check_access('group_show', context, {'id': group_name_or_id})
return True
except toolkit.NotAuthorized:
return False


def get_custom_featured_groups(count: int = 1):
'''
Returns a list of featured groups using the is_featured field.
Excludes private groups from featured listings.
Efficiently queries database first to find featured groups, then gets full details.
'''
try:
# Subquery to find private groups
private_groups_subquery = model.Session.query(model.GroupExtra.group_id).filter(
model.GroupExtra.key == 'is_private',
model.GroupExtra.value.in_(['True', 'true', '1', 'yes'])
).subquery()

# Query database directly for featured group names (fast!)
# Exclude private groups from featured results
query = model.Session.query(model.Group.name).join(
model.GroupExtra,
model.Group.id == model.GroupExtra.group_id
).filter(
model.Group.is_organization == False,
model.Group.state == 'active',
model.GroupExtra.key == 'is_featured',
model.GroupExtra.value.in_(['True', 'true', '1', 'yes'])
model.GroupExtra.value.in_(['True', 'true', '1', 'yes']),
# Exclude private groups
~model.Group.id.in_(private_groups_subquery)
).distinct().limit(count)

featured_names = [name for name, in query.all()]
Expand All @@ -44,6 +113,7 @@ def get_custom_featured_groups(count: int = 1):
except Exception:
return []


def get_custom_featured_organizations(count: int = 1):
'''
Returns a list of featured organizations using the is_featured field.
Expand Down Expand Up @@ -85,10 +155,11 @@ def get_custom_featured_organizations(count: int = 1):
except Exception:
return []


def get_helpers():
return {
"get_custom_featured_groups": get_custom_featured_groups,
"get_custom_featured_organizations": get_custom_featured_organizations,
"is_group_private": is_group_private,
"user_can_view_group": user_can_view_group,
}

# nameCallableFromTemplate:nameOfFunction
93 changes: 93 additions & 0 deletions ckanext/digitizationknowledge/logic/action.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,56 @@
import ckan.plugins.toolkit as tk
import ckan.authz as authz
import ckan.model as model
import ckanext.digitizationknowledge.logic.schema as schema


def _is_group_private_by_id(group_id):
"""
Check if a group is private by querying its extras.

Args:
group_id: The group ID or name

Returns:
bool: True if group is marked as private
"""
try:
# Query group extras for is_private field
group = model.Group.get(group_id)
if not group:
return False

extra = model.Session.query(model.GroupExtra).filter(
model.GroupExtra.group_id == group.id,
model.GroupExtra.key == 'is_private'
).first()

if extra:
val = extra.value
if isinstance(val, str):
return val.lower() in ['true', '1', 'yes', 'on']
return bool(val)
return False
except Exception:
return False


def _user_is_member_of_group(user, group_id):
"""
Check if a user is a member of a group.

Args:
user: Username string
group_id: The group ID

Returns:
bool: True if user is a member
"""
if not user:
return False
return authz.has_user_permission_for_group_or_org(group_id, user, 'read')


@tk.side_effect_free
def digitizationknowledge_get_sum(context, data_dict):
tk.check_access(
Expand All @@ -19,7 +68,51 @@ def digitizationknowledge_get_sum(context, data_dict):
}


@tk.side_effect_free
@tk.chained_action
def group_list(original_action, context, data_dict):
"""
Override default group_list to filter out private groups for non-members.

- Sysadmins see all groups
- Regular users see public groups + private groups they're members of
- Anonymous users see only public groups
"""
user = context.get('user')

# Sysadmins see everything — no need to modify context
if authz.is_sysadmin(user):
return original_action(context, data_dict)

# For all other users: bypass auth in the core action to prevent
# group_show from throwing NotAuthorized on private groups when
# all_fields=True. We filter private groups out ourselves below.
safe_context = context.copy()
safe_context['ignore_auth'] = True
all_groups = original_action(safe_context, data_dict)

filtered_groups = []
for group in all_groups:
# Get group ID/name depending on response format
if isinstance(group, dict):
group_id = group.get('id') or group.get('name')
else:
group_id = group # Just a string name

# Check if this group is private
if _is_group_private_by_id(group_id):
# Private group - only include if user is a member
if user and _user_is_member_of_group(user, group_id):
filtered_groups.append(group)
else:
# Public group - include for everyone
filtered_groups.append(group)

return filtered_groups


def get_actions():
return {
'digitizationknowledge_get_sum': digitizationknowledge_get_sum,
'group_list': group_list,
}
Loading