Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/bot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def format_hours_display(hours: int) -> str:
# Anti-spam probation warning for new users
NEW_USER_SPAM_WARNING = (
"⚠️ {user_mention} baru bergabung dan sedang dalam masa percobaan.\n"
"Selama {probation_display}, kamu tidak boleh meneruskan pesan atau mengirim tautan.\n"
"Selama {probation_display}, kamu tidak boleh mengirim media (foto, video, audio, dll.), meneruskan pesan, atau mengirim tautan.\n"
"Pesan yang melanggar akan dihapus dan kamu bisa dibatasi jika terus mengulang.\n"
"Hubungi admin jika kamu membutuhkan bantuan.\n\n"
"📖 [Baca aturan grup]({rules_link})"
Expand All @@ -191,7 +191,7 @@ def format_hours_display(hours: int) -> str:
# Anti-spam restriction message when user exceeds violation threshold
NEW_USER_SPAM_RESTRICTION = (
"🚫 {user_mention} telah dibatasi karena mengirim pesan terlarang "
"(forward/link/quote eksternal) sebanyak {violation_count} kali selama masa percobaan.\n\n"
"(media/file/forward/link/quote eksternal) sebanyak {violation_count} kali selama masa percobaan.\n\n"
"📖 [Baca aturan grup]({rules_link})"
)

Expand Down
33 changes: 30 additions & 3 deletions src/bot/handlers/anti_spam.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
This module enforces anti-spam rules including:
- Contact card spam detection (all members)
- Inline keyboard URL spam detection (all members)
- Probation enforcement for new users (forwards, links, external replies, stories)
- Probation enforcement for new users (forwards, links, external replies, stories, media)
"""

import logging
Expand Down Expand Up @@ -96,6 +96,31 @@ def has_story(message: Message) -> bool:
return message.story is not None


def has_media(message: Message) -> bool:
"""
Check if a message contains media attachments.

Media elements (photos, videos, animations, audio, voice, and video
notes) are often used in spam or can be disruptive when sent by brand
new users before they have passed their probation period.

Args:
message: Telegram message to check.

Returns:
bool: True if message contains a photo, video, animation, audio,
voice, or video note.
"""
return any([
message.photo,
message.video,
message.animation,
message.audio,
message.voice,
message.video_note,
])


def extract_urls(message: Message) -> list[str]:
"""
Extract all URLs from a message.
Expand Down Expand Up @@ -486,21 +511,23 @@ async def handle_new_user_spam(
msg = update.message
user_mention = get_user_mention(user)

# Check for violations (forwarded message or non-whitelisted link or external reply)
# Check for violations (forwarded message or non-whitelisted link or external reply or media)
if not (
is_forwarded(msg)
or has_non_whitelisted_link(msg)
or has_external_reply(msg)
or has_story(msg)
or has_non_whitelisted_inline_keyboard_urls(msg)
or has_media(msg)
):
return # Not a violation

logger.info(
f"Probation violation detected: user_id={user.id}, "
f"forwarded={is_forwarded(msg)}, has_non_whitelisted_link={has_non_whitelisted_link(msg)}, "
f"external_reply={has_external_reply(msg)}, has_story={has_story(msg)}, "
f"inline_keyboard_spam={has_non_whitelisted_inline_keyboard_urls(msg)}"
f"inline_keyboard_spam={has_non_whitelisted_inline_keyboard_urls(msg)}, "
f"has_media={has_media(msg)}"
)

# 1. Delete the violating message
Expand Down
261 changes: 260 additions & 1 deletion tests/test_anti_spam.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
has_contact,
has_external_reply,
has_link,
has_media,
has_non_whitelisted_inline_keyboard_urls,
has_non_whitelisted_link,
has_story,
Expand Down Expand Up @@ -135,6 +136,101 @@ def test_no_story_returns_false(self):
assert has_story(msg) is False


class TestHasMedia:
"""Tests for the has_media helper function."""

def test_photo_detected(self):
"""Test that message with photo is detected."""
msg = MagicMock(spec=Message)
msg.photo = [MagicMock()]
msg.video = None
msg.animation = None
msg.document = None
msg.audio = None
msg.voice = None
msg.video_note = None

assert has_media(msg) is True

def test_video_detected(self):
"""Test that message with video is detected."""
msg = MagicMock(spec=Message)
msg.photo = None
msg.video = MagicMock()
msg.animation = None
msg.document = None
msg.audio = None
msg.voice = None
msg.video_note = None

assert has_media(msg) is True

def test_animation_detected(self):
"""Test that message with animation is detected."""
msg = MagicMock(spec=Message)
msg.photo = None
msg.video = None
msg.animation = MagicMock()
msg.document = None
msg.audio = None
msg.voice = None
msg.video_note = None

assert has_media(msg) is True

def test_audio_detected(self):
"""Test that message with audio is detected."""
msg = MagicMock(spec=Message)
msg.photo = None
msg.video = None
msg.animation = None
msg.document = None
msg.audio = MagicMock()
msg.voice = None
msg.video_note = None

assert has_media(msg) is True

def test_voice_detected(self):
"""Test that message with voice is detected."""
msg = MagicMock(spec=Message)
msg.photo = None
msg.video = None
msg.animation = None
msg.document = None
msg.audio = None
msg.voice = MagicMock()
msg.video_note = None

assert has_media(msg) is True

def test_video_note_detected(self):
"""Test that message with video_note is detected."""
msg = MagicMock(spec=Message)
msg.photo = None
msg.video = None
msg.animation = None
msg.document = None
msg.audio = None
msg.voice = None
msg.video_note = MagicMock()

assert has_media(msg) is True

def test_no_media_returns_false(self):
"""Test that message without media returns False."""
msg = MagicMock(spec=Message)
msg.photo = None
msg.video = None
msg.animation = None
msg.document = None
msg.audio = None
msg.voice = None
msg.video_note = None

assert has_media(msg) is False


class TestUrlWhitelist:
"""Tests for URL whitelist functionality."""

Expand Down Expand Up @@ -305,14 +401,21 @@ def mock_update(self):
update.effective_chat = MagicMock(spec=Chat)
update.effective_chat.id = -100123456 # group_id from group_config

# Default: not forwarded, no links, no external reply, no story
# Default: not forwarded, no links, no external reply, no story, no media
update.message.forward_origin = None
update.message.external_reply = None
update.message.story = None
update.message.entities = None
update.message.caption_entities = None
update.message.text = None
update.message.caption = None
update.message.photo = None
update.message.video = None
update.message.animation = None
update.message.document = None
update.message.audio = None
update.message.voice = None
update.message.video_note = None
update.message.delete = AsyncMock()

return update
Expand Down Expand Up @@ -687,6 +790,162 @@ async def test_deletes_message_with_story(

mock_update.message.delete.assert_called_once()

@pytest.mark.asyncio
async def test_deletes_message_with_media(
self, mock_update, mock_context, group_config
):
"""Test that messages with media attachments are deleted."""
mock_update.message.photo = [MagicMock()] # Any non-None value

mock_record = MagicMock()
mock_record.joined_at = datetime.now(UTC)

updated_record = MagicMock()
updated_record.violation_count = 1

mock_db = MagicMock()
mock_db.get_new_user_probation.return_value = mock_record
mock_db.increment_new_user_violation.return_value = updated_record

with (
patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config),
patch("bot.handlers.anti_spam.get_database", return_value=mock_db),
):
with pytest.raises(ApplicationHandlerStop):
await handle_new_user_spam(mock_update, mock_context)

mock_update.message.delete.assert_called_once()

@pytest.mark.asyncio
async def test_deletes_message_with_video(
self, mock_update, mock_context, group_config
):
"""Test that messages with video are deleted."""
mock_update.message.video = MagicMock()

mock_record = MagicMock()
mock_record.joined_at = datetime.now(UTC)

updated_record = MagicMock()
updated_record.violation_count = 1

mock_db = MagicMock()
mock_db.get_new_user_probation.return_value = mock_record
mock_db.increment_new_user_violation.return_value = updated_record

with (
patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config),
patch("bot.handlers.anti_spam.get_database", return_value=mock_db),
):
with pytest.raises(ApplicationHandlerStop):
await handle_new_user_spam(mock_update, mock_context)

mock_update.message.delete.assert_called_once()

@pytest.mark.asyncio
async def test_deletes_message_with_animation(
self, mock_update, mock_context, group_config
):
"""Test that messages with animation are deleted."""
mock_update.message.animation = MagicMock()

mock_record = MagicMock()
mock_record.joined_at = datetime.now(UTC)

updated_record = MagicMock()
updated_record.violation_count = 1

mock_db = MagicMock()
mock_db.get_new_user_probation.return_value = mock_record
mock_db.increment_new_user_violation.return_value = updated_record

with (
patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config),
patch("bot.handlers.anti_spam.get_database", return_value=mock_db),
):
with pytest.raises(ApplicationHandlerStop):
await handle_new_user_spam(mock_update, mock_context)

mock_update.message.delete.assert_called_once()

@pytest.mark.asyncio
async def test_deletes_message_with_audio(
self, mock_update, mock_context, group_config
):
"""Test that messages with audio are deleted."""
mock_update.message.audio = MagicMock()

mock_record = MagicMock()
mock_record.joined_at = datetime.now(UTC)

updated_record = MagicMock()
updated_record.violation_count = 1

mock_db = MagicMock()
mock_db.get_new_user_probation.return_value = mock_record
mock_db.increment_new_user_violation.return_value = updated_record

with (
patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config),
patch("bot.handlers.anti_spam.get_database", return_value=mock_db),
):
with pytest.raises(ApplicationHandlerStop):
await handle_new_user_spam(mock_update, mock_context)

mock_update.message.delete.assert_called_once()

@pytest.mark.asyncio
async def test_deletes_message_with_voice(
self, mock_update, mock_context, group_config
):
"""Test that messages with voice are deleted."""
mock_update.message.voice = MagicMock()

mock_record = MagicMock()
mock_record.joined_at = datetime.now(UTC)

updated_record = MagicMock()
updated_record.violation_count = 1

mock_db = MagicMock()
mock_db.get_new_user_probation.return_value = mock_record
mock_db.increment_new_user_violation.return_value = updated_record

with (
patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config),
patch("bot.handlers.anti_spam.get_database", return_value=mock_db),
):
with pytest.raises(ApplicationHandlerStop):
await handle_new_user_spam(mock_update, mock_context)

mock_update.message.delete.assert_called_once()

@pytest.mark.asyncio
async def test_deletes_message_with_video_note(
self, mock_update, mock_context, group_config
):
"""Test that messages with video_note are deleted."""
mock_update.message.video_note = MagicMock()

mock_record = MagicMock()
mock_record.joined_at = datetime.now(UTC)

updated_record = MagicMock()
updated_record.violation_count = 1

mock_db = MagicMock()
mock_db.get_new_user_probation.return_value = mock_record
mock_db.increment_new_user_violation.return_value = updated_record

with (
patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config),
patch("bot.handlers.anti_spam.get_database", return_value=mock_db),
):
with pytest.raises(ApplicationHandlerStop):
await handle_new_user_spam(mock_update, mock_context)

mock_update.message.delete.assert_called_once()

@pytest.mark.asyncio
async def test_ignores_update_without_message(self, mock_context):
"""Test that update without message is ignored."""
Expand Down
Loading