|
12 | 12 | from urllib.parse import urlparse |
13 | 13 |
|
14 | 14 | from telegram import Message, MessageEntity, Update |
15 | | -from telegram.ext import ContextTypes |
| 15 | +from telegram.ext import ApplicationHandlerStop, ContextTypes |
16 | 16 |
|
17 | 17 | from bot.constants import ( |
| 18 | + INLINE_KEYBOARD_SPAM_NOTIFICATION, |
| 19 | + INLINE_KEYBOARD_SPAM_NOTIFICATION_NO_RESTRICT, |
18 | 20 | NEW_USER_SPAM_RESTRICTION, |
19 | 21 | NEW_USER_SPAM_WARNING, |
20 | 22 | RESTRICTED_PERMISSIONS, |
@@ -194,6 +196,139 @@ def has_non_whitelisted_link(message: Message) -> bool: |
194 | 196 | return False |
195 | 197 |
|
196 | 198 |
|
| 199 | +def has_non_whitelisted_inline_keyboard_urls(message: Message) -> bool: |
| 200 | + """ |
| 201 | + Check if a message contains inline keyboard buttons with non-whitelisted URLs. |
| 202 | +
|
| 203 | + Regular Telegram users cannot create inline keyboards from the client. |
| 204 | + Messages with inline keyboard URL buttons pointing to non-whitelisted |
| 205 | + domains are considered spam. Checks url, login_url, and web_app button types. |
| 206 | +
|
| 207 | + Args: |
| 208 | + message: Telegram message to check. |
| 209 | +
|
| 210 | + Returns: |
| 211 | + bool: True if any inline keyboard button has a non-whitelisted URL. |
| 212 | + """ |
| 213 | + rm = getattr(message, "reply_markup", None) |
| 214 | + keyboard = getattr(rm, "inline_keyboard", None) |
| 215 | + if not keyboard: |
| 216 | + return False |
| 217 | + |
| 218 | + for row in keyboard: |
| 219 | + if not row: |
| 220 | + continue |
| 221 | + for button in row: |
| 222 | + if not button: |
| 223 | + continue |
| 224 | + |
| 225 | + candidates: list[str] = [] |
| 226 | + if getattr(button, "url", None): |
| 227 | + candidates.append(button.url) |
| 228 | + |
| 229 | + login_url = getattr(button, "login_url", None) |
| 230 | + if getattr(login_url, "url", None): |
| 231 | + candidates.append(login_url.url) |
| 232 | + |
| 233 | + web_app = getattr(button, "web_app", None) |
| 234 | + if getattr(web_app, "url", None): |
| 235 | + candidates.append(web_app.url) |
| 236 | + |
| 237 | + for u in candidates: |
| 238 | + if not is_url_whitelisted(u): |
| 239 | + return True |
| 240 | + |
| 241 | + return False |
| 242 | + |
| 243 | + |
| 244 | +async def handle_inline_keyboard_spam( |
| 245 | + update: Update, context: ContextTypes.DEFAULT_TYPE |
| 246 | +) -> None: |
| 247 | + """ |
| 248 | + Handle spam messages containing inline keyboard buttons with non-whitelisted URLs. |
| 249 | +
|
| 250 | + Regular users cannot create inline keyboards from the Telegram client. |
| 251 | + Any group message with inline keyboard URL buttons pointing to |
| 252 | + non-whitelisted domains is treated as spam. |
| 253 | +
|
| 254 | + Args: |
| 255 | + update: Telegram update containing the message. |
| 256 | + context: Bot context with helper methods. |
| 257 | + """ |
| 258 | + if not update.message or not update.message.from_user: |
| 259 | + return |
| 260 | + |
| 261 | + group_config = get_group_config_for_update(update) |
| 262 | + if group_config is None: |
| 263 | + return |
| 264 | + |
| 265 | + user = update.message.from_user |
| 266 | + if user.is_bot: |
| 267 | + return |
| 268 | + |
| 269 | + admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, []) |
| 270 | + if user.id in admin_ids: |
| 271 | + return |
| 272 | + |
| 273 | + msg = update.message |
| 274 | + if not has_non_whitelisted_inline_keyboard_urls(msg): |
| 275 | + return |
| 276 | + |
| 277 | + user_mention = get_user_mention(user) |
| 278 | + logger.info( |
| 279 | + f"Inline keyboard spam detected: user_id={user.id}, " |
| 280 | + f"group_id={group_config.group_id}" |
| 281 | + ) |
| 282 | + |
| 283 | + try: |
| 284 | + await msg.delete() |
| 285 | + logger.info(f"Deleted inline keyboard spam from user_id={user.id}") |
| 286 | + except Exception: |
| 287 | + logger.error( |
| 288 | + f"Failed to delete inline keyboard spam: user_id={user.id}", |
| 289 | + exc_info=True, |
| 290 | + ) |
| 291 | + |
| 292 | + restricted = False |
| 293 | + try: |
| 294 | + await context.bot.restrict_chat_member( |
| 295 | + chat_id=group_config.group_id, |
| 296 | + user_id=user.id, |
| 297 | + permissions=RESTRICTED_PERMISSIONS, |
| 298 | + ) |
| 299 | + restricted = True |
| 300 | + logger.info(f"Restricted user_id={user.id} for inline keyboard spam") |
| 301 | + except Exception: |
| 302 | + logger.error( |
| 303 | + f"Failed to restrict user for inline keyboard spam: user_id={user.id}", |
| 304 | + exc_info=True, |
| 305 | + ) |
| 306 | + |
| 307 | + try: |
| 308 | + template = ( |
| 309 | + INLINE_KEYBOARD_SPAM_NOTIFICATION if restricted |
| 310 | + else INLINE_KEYBOARD_SPAM_NOTIFICATION_NO_RESTRICT |
| 311 | + ) |
| 312 | + notification_text = template.format( |
| 313 | + user_mention=user_mention, |
| 314 | + rules_link=group_config.rules_link, |
| 315 | + ) |
| 316 | + await context.bot.send_message( |
| 317 | + chat_id=group_config.group_id, |
| 318 | + message_thread_id=group_config.warning_topic_id, |
| 319 | + text=notification_text, |
| 320 | + parse_mode="Markdown", |
| 321 | + ) |
| 322 | + logger.info(f"Sent inline keyboard spam notification for user_id={user.id}") |
| 323 | + except Exception: |
| 324 | + logger.error( |
| 325 | + f"Failed to send inline keyboard spam notification: user_id={user.id}", |
| 326 | + exc_info=True, |
| 327 | + ) |
| 328 | + |
| 329 | + raise ApplicationHandlerStop |
| 330 | + |
| 331 | + |
197 | 332 | async def handle_new_user_spam( |
198 | 333 | update: Update, context: ContextTypes.DEFAULT_TYPE |
199 | 334 | ) -> None: |
@@ -249,13 +384,20 @@ async def handle_new_user_spam( |
249 | 384 | user_mention = get_user_mention(user) |
250 | 385 |
|
251 | 386 | # Check for violations (forwarded message or non-whitelisted link or external reply) |
252 | | - if not (is_forwarded(msg) or has_non_whitelisted_link(msg) or has_external_reply(msg) or has_story(msg)): |
| 387 | + if not ( |
| 388 | + is_forwarded(msg) |
| 389 | + or has_non_whitelisted_link(msg) |
| 390 | + or has_external_reply(msg) |
| 391 | + or has_story(msg) |
| 392 | + or has_non_whitelisted_inline_keyboard_urls(msg) |
| 393 | + ): |
253 | 394 | return # Not a violation |
254 | 395 |
|
255 | 396 | logger.info( |
256 | 397 | f"Probation violation detected: user_id={user.id}, " |
257 | 398 | f"forwarded={is_forwarded(msg)}, has_non_whitelisted_link={has_non_whitelisted_link(msg)}, " |
258 | | - f"external_reply={has_external_reply(msg)}, has_story={has_story(msg)}" |
| 399 | + f"external_reply={has_external_reply(msg)}, has_story={has_story(msg)}, " |
| 400 | + f"inline_keyboard_spam={has_non_whitelisted_inline_keyboard_urls(msg)}" |
259 | 401 | ) |
260 | 402 |
|
261 | 403 | # 1. Delete the violating message |
|
0 commit comments