diff --git a/src/bot/constants.py b/src/bot/constants.py index eb0001b..2940f93 100644 --- a/src/bot/constants.py +++ b/src/bot/constants.py @@ -182,7 +182,7 @@ def format_hours_display(hours: int) -> str: # Anti-spam probation warning for new users NEW_USER_SPAM_WARNING = ( "⚠️ {user_mention} baru bergabung dan sedang dalam masa percobaan.\n" - "Selama {probation_display}, kamu tidak boleh meneruskan pesan atau mengirim tautan.\n" + "Selama {probation_display}, kamu tidak boleh mengirim media (foto, video, audio, dll.), meneruskan pesan, atau mengirim tautan.\n" "Pesan yang melanggar akan dihapus dan kamu bisa dibatasi jika terus mengulang.\n" "Hubungi admin jika kamu membutuhkan bantuan.\n\n" "📖 [Baca aturan grup]({rules_link})" @@ -191,7 +191,7 @@ def format_hours_display(hours: int) -> str: # Anti-spam restriction message when user exceeds violation threshold NEW_USER_SPAM_RESTRICTION = ( "🚫 {user_mention} telah dibatasi karena mengirim pesan terlarang " - "(forward/link/quote eksternal) sebanyak {violation_count} kali selama masa percobaan.\n\n" + "(media/file/forward/link/quote eksternal) sebanyak {violation_count} kali selama masa percobaan.\n\n" "📖 [Baca aturan grup]({rules_link})" ) diff --git a/src/bot/handlers/anti_spam.py b/src/bot/handlers/anti_spam.py index 5c98e4c..a537ef7 100644 --- a/src/bot/handlers/anti_spam.py +++ b/src/bot/handlers/anti_spam.py @@ -4,7 +4,7 @@ This module enforces anti-spam rules including: - Contact card spam detection (all members) - Inline keyboard URL spam detection (all members) -- Probation enforcement for new users (forwards, links, external replies, stories) +- Probation enforcement for new users (forwards, links, external replies, stories, media) """ import logging @@ -96,6 +96,31 @@ def has_story(message: Message) -> bool: return message.story is not None +def has_media(message: Message) -> bool: + """ + Check if a message contains media attachments. + + Media elements (photos, videos, animations, audio, voice, and video + notes) are often used in spam or can be disruptive when sent by brand + new users before they have passed their probation period. + + Args: + message: Telegram message to check. + + Returns: + bool: True if message contains a photo, video, animation, audio, + voice, or video note. + """ + return any([ + message.photo, + message.video, + message.animation, + message.audio, + message.voice, + message.video_note, + ]) + + def extract_urls(message: Message) -> list[str]: """ Extract all URLs from a message. @@ -486,13 +511,14 @@ async def handle_new_user_spam( msg = update.message user_mention = get_user_mention(user) - # Check for violations (forwarded message or non-whitelisted link or external reply) + # Check for violations (forwarded message or non-whitelisted link or external reply or media) if not ( is_forwarded(msg) or has_non_whitelisted_link(msg) or has_external_reply(msg) or has_story(msg) or has_non_whitelisted_inline_keyboard_urls(msg) + or has_media(msg) ): return # Not a violation @@ -500,7 +526,8 @@ async def handle_new_user_spam( f"Probation violation detected: user_id={user.id}, " f"forwarded={is_forwarded(msg)}, has_non_whitelisted_link={has_non_whitelisted_link(msg)}, " f"external_reply={has_external_reply(msg)}, has_story={has_story(msg)}, " - f"inline_keyboard_spam={has_non_whitelisted_inline_keyboard_urls(msg)}" + f"inline_keyboard_spam={has_non_whitelisted_inline_keyboard_urls(msg)}, " + f"has_media={has_media(msg)}" ) # 1. Delete the violating message diff --git a/tests/test_anti_spam.py b/tests/test_anti_spam.py index e92806d..583b7a4 100644 --- a/tests/test_anti_spam.py +++ b/tests/test_anti_spam.py @@ -17,6 +17,7 @@ has_contact, has_external_reply, has_link, + has_media, has_non_whitelisted_inline_keyboard_urls, has_non_whitelisted_link, has_story, @@ -135,6 +136,101 @@ def test_no_story_returns_false(self): assert has_story(msg) is False +class TestHasMedia: + """Tests for the has_media helper function.""" + + def test_photo_detected(self): + """Test that message with photo is detected.""" + msg = MagicMock(spec=Message) + msg.photo = [MagicMock()] + msg.video = None + msg.animation = None + msg.document = None + msg.audio = None + msg.voice = None + msg.video_note = None + + assert has_media(msg) is True + + def test_video_detected(self): + """Test that message with video is detected.""" + msg = MagicMock(spec=Message) + msg.photo = None + msg.video = MagicMock() + msg.animation = None + msg.document = None + msg.audio = None + msg.voice = None + msg.video_note = None + + assert has_media(msg) is True + + def test_animation_detected(self): + """Test that message with animation is detected.""" + msg = MagicMock(spec=Message) + msg.photo = None + msg.video = None + msg.animation = MagicMock() + msg.document = None + msg.audio = None + msg.voice = None + msg.video_note = None + + assert has_media(msg) is True + + def test_audio_detected(self): + """Test that message with audio is detected.""" + msg = MagicMock(spec=Message) + msg.photo = None + msg.video = None + msg.animation = None + msg.document = None + msg.audio = MagicMock() + msg.voice = None + msg.video_note = None + + assert has_media(msg) is True + + def test_voice_detected(self): + """Test that message with voice is detected.""" + msg = MagicMock(spec=Message) + msg.photo = None + msg.video = None + msg.animation = None + msg.document = None + msg.audio = None + msg.voice = MagicMock() + msg.video_note = None + + assert has_media(msg) is True + + def test_video_note_detected(self): + """Test that message with video_note is detected.""" + msg = MagicMock(spec=Message) + msg.photo = None + msg.video = None + msg.animation = None + msg.document = None + msg.audio = None + msg.voice = None + msg.video_note = MagicMock() + + assert has_media(msg) is True + + def test_no_media_returns_false(self): + """Test that message without media returns False.""" + msg = MagicMock(spec=Message) + msg.photo = None + msg.video = None + msg.animation = None + msg.document = None + msg.audio = None + msg.voice = None + msg.video_note = None + + assert has_media(msg) is False + + class TestUrlWhitelist: """Tests for URL whitelist functionality.""" @@ -305,7 +401,7 @@ def mock_update(self): update.effective_chat = MagicMock(spec=Chat) update.effective_chat.id = -100123456 # group_id from group_config - # Default: not forwarded, no links, no external reply, no story + # Default: not forwarded, no links, no external reply, no story, no media update.message.forward_origin = None update.message.external_reply = None update.message.story = None @@ -313,6 +409,13 @@ def mock_update(self): update.message.caption_entities = None update.message.text = None update.message.caption = None + update.message.photo = None + update.message.video = None + update.message.animation = None + update.message.document = None + update.message.audio = None + update.message.voice = None + update.message.video_note = None update.message.delete = AsyncMock() return update @@ -687,6 +790,162 @@ async def test_deletes_message_with_story( mock_update.message.delete.assert_called_once() + @pytest.mark.asyncio + async def test_deletes_message_with_media( + self, mock_update, mock_context, group_config + ): + """Test that messages with media attachments are deleted.""" + mock_update.message.photo = [MagicMock()] # Any non-None value + + mock_record = MagicMock() + mock_record.joined_at = datetime.now(UTC) + + updated_record = MagicMock() + updated_record.violation_count = 1 + + mock_db = MagicMock() + mock_db.get_new_user_probation.return_value = mock_record + mock_db.increment_new_user_violation.return_value = updated_record + + with ( + patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config), + patch("bot.handlers.anti_spam.get_database", return_value=mock_db), + ): + with pytest.raises(ApplicationHandlerStop): + await handle_new_user_spam(mock_update, mock_context) + + mock_update.message.delete.assert_called_once() + + @pytest.mark.asyncio + async def test_deletes_message_with_video( + self, mock_update, mock_context, group_config + ): + """Test that messages with video are deleted.""" + mock_update.message.video = MagicMock() + + mock_record = MagicMock() + mock_record.joined_at = datetime.now(UTC) + + updated_record = MagicMock() + updated_record.violation_count = 1 + + mock_db = MagicMock() + mock_db.get_new_user_probation.return_value = mock_record + mock_db.increment_new_user_violation.return_value = updated_record + + with ( + patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config), + patch("bot.handlers.anti_spam.get_database", return_value=mock_db), + ): + with pytest.raises(ApplicationHandlerStop): + await handle_new_user_spam(mock_update, mock_context) + + mock_update.message.delete.assert_called_once() + + @pytest.mark.asyncio + async def test_deletes_message_with_animation( + self, mock_update, mock_context, group_config + ): + """Test that messages with animation are deleted.""" + mock_update.message.animation = MagicMock() + + mock_record = MagicMock() + mock_record.joined_at = datetime.now(UTC) + + updated_record = MagicMock() + updated_record.violation_count = 1 + + mock_db = MagicMock() + mock_db.get_new_user_probation.return_value = mock_record + mock_db.increment_new_user_violation.return_value = updated_record + + with ( + patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config), + patch("bot.handlers.anti_spam.get_database", return_value=mock_db), + ): + with pytest.raises(ApplicationHandlerStop): + await handle_new_user_spam(mock_update, mock_context) + + mock_update.message.delete.assert_called_once() + + @pytest.mark.asyncio + async def test_deletes_message_with_audio( + self, mock_update, mock_context, group_config + ): + """Test that messages with audio are deleted.""" + mock_update.message.audio = MagicMock() + + mock_record = MagicMock() + mock_record.joined_at = datetime.now(UTC) + + updated_record = MagicMock() + updated_record.violation_count = 1 + + mock_db = MagicMock() + mock_db.get_new_user_probation.return_value = mock_record + mock_db.increment_new_user_violation.return_value = updated_record + + with ( + patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config), + patch("bot.handlers.anti_spam.get_database", return_value=mock_db), + ): + with pytest.raises(ApplicationHandlerStop): + await handle_new_user_spam(mock_update, mock_context) + + mock_update.message.delete.assert_called_once() + + @pytest.mark.asyncio + async def test_deletes_message_with_voice( + self, mock_update, mock_context, group_config + ): + """Test that messages with voice are deleted.""" + mock_update.message.voice = MagicMock() + + mock_record = MagicMock() + mock_record.joined_at = datetime.now(UTC) + + updated_record = MagicMock() + updated_record.violation_count = 1 + + mock_db = MagicMock() + mock_db.get_new_user_probation.return_value = mock_record + mock_db.increment_new_user_violation.return_value = updated_record + + with ( + patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config), + patch("bot.handlers.anti_spam.get_database", return_value=mock_db), + ): + with pytest.raises(ApplicationHandlerStop): + await handle_new_user_spam(mock_update, mock_context) + + mock_update.message.delete.assert_called_once() + + @pytest.mark.asyncio + async def test_deletes_message_with_video_note( + self, mock_update, mock_context, group_config + ): + """Test that messages with video_note are deleted.""" + mock_update.message.video_note = MagicMock() + + mock_record = MagicMock() + mock_record.joined_at = datetime.now(UTC) + + updated_record = MagicMock() + updated_record.violation_count = 1 + + mock_db = MagicMock() + mock_db.get_new_user_probation.return_value = mock_record + mock_db.increment_new_user_violation.return_value = updated_record + + with ( + patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config), + patch("bot.handlers.anti_spam.get_database", return_value=mock_db), + ): + with pytest.raises(ApplicationHandlerStop): + await handle_new_user_spam(mock_update, mock_context) + + mock_update.message.delete.assert_called_once() + @pytest.mark.asyncio async def test_ignores_update_without_message(self, mock_context): """Test that update without message is ignored."""