-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrss_bot.py
More file actions
178 lines (138 loc) · 5.29 KB
/
rss_bot.py
File metadata and controls
178 lines (138 loc) · 5.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import configparser
from difflib import SequenceMatcher
import logging
from logging.handlers import RotatingFileHandler
import time
from threading import Thread
from flask import Flask
import defusedxml.ElementTree as ElemTree  # Replaced the standard XML parser with the safe (defusedxml) version.
from newspaper import Article
import requests
# Load runtime settings from config.ini in the current working directory.
# ConfigParser.read() silently skips a missing file, so a missing config only
# surfaces later, when the 'settings' section is looked up at startup.
config = configparser.ConfigParser()
config.read('config.ini')
# Configure the root logger so that every logging.* call in this module is
# recorded at INFO level and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Rotating file handler: 'error.log' is rolled over at about 100 KB
# (100000 bytes) and two backup files are kept.
handler = RotatingFileHandler('error.log', maxBytes=100000, backupCount=2, encoding='utf-8')
# Tab-separated record layout: timestamp (day-month-year), logger name,
# level and message.
formatter = logging.Formatter('%(asctime)s \t %(name)s \t %(levelname)s \t %(message)s', datefmt='%d-%m-%Y %H:%M:%S')
handler.setFormatter(formatter)
# Attach the handler to the root logger.
logger.addHandler(handler)
# Address of the Lenta.ru RSS feed that the background worker downloads.
URL = 'https://lenta.ru/rss'
def sim(a: str, b: str) -> float:
    """
    Measure how alike two strings are.

    Parameters:
    a (str): The first string to compare.
    b (str): The second string to compare.

    Returns:
    float: The difflib.SequenceMatcher similarity ratio of the two strings;
    1.0 means they are identical, 0.0 means they share nothing.
    """
    # No junk filter is supplied, which is the same as passing isjunk=None.
    matcher = SequenceMatcher(a=a, b=b)
    return matcher.ratio()
def parse_text(url: str) -> str:
    """
    Download an article and return its cleaned-up body text.

    The text extracted by newspaper is post-processed in this order:
    doubled newlines are collapsed, the first line (treated as the title) is
    dropped, every non-empty line is terminated with a period, the first
    remaining line is dropped when it is similar to the second one, and the
    text is cut at the last line containing 'Ранее' ("Earlier").

    Parameters:
    url (str): The URL of the article to parse.

    Returns:
    str: The cleaned text with empty lines removed, or an empty string when
    no text could be extracted.

    Raises:
    Errors raised by Article.download() / Article.parse() are not handled
    here and propagate to the caller.
    """
    article = Article(url, language='ru')  # Article object for the given URL
    article.download()
    article.parse()

    # Nothing extracted: there is nothing to clean up.
    if not article.text:
        return ''

    # Collapse blank-line separators and drop the first line (the title).
    lines = article.text.replace('\n\n', '\n').split('\n')[1:]
    # Terminate every non-empty line with a period if it does not have one.
    lines = [line + '.' if line and not line.endswith('.') else line for line in lines]

    # Drop the first line when it largely repeats the second one. The explicit
    # length check replaces a catch-all try/except that printed an IndexError
    # for articles with fewer than two lines.
    if len(lines) >= 2 and sim(lines[0], lines[1]) >= 0.3:
        lines = lines[1:]

    # Cut the text at the last line mentioning 'Ранее'; that line and
    # everything after it are discarded.
    # NOTE(review): when no such line exists only the first 50 lines are kept;
    # this cap is preserved from the original - confirm it is intentional.
    cut = max((i for i, line in enumerate(lines) if 'Ранее' in line), default=50)
    return '\n'.join(line for line in lines[:cut] if line)
def fetch_rss_feed(url: str) -> None:
    """
    Download the RSS feed at the given address and save it to 'lenta.xml'.

    The call is best-effort: any failure (network error, HTTP error status,
    file error) is printed and logged instead of being raised. The file is
    only opened for writing after the HTTP status check has passed.

    Parameters:
    url (str): Address of the RSS feed to download.
    """
    try:
        # Use the caller-supplied address (the original ignored the parameter
        # and always read the module-level URL). The with-block closes the
        # response on every path, so no manual cleanup is needed.
        with requests.get(url, timeout=5) as response:
            response.raise_for_status()
            with open('lenta.xml', 'wb') as f:
                f.write(response.content)
        logging.info('Successfully fetched Lenta RSS')
    except Exception as e:
        print('Error:', e)
        logging.error(e)
def process_xml_content():
    """
    Clean up the downloaded feed in 'lenta.xml' and save it as 'output.xml'.

    For every <item> whose category is not 'Путешествия' (Travel) or 'Спорт'
    (Sport), the <author>, <category> and <guid> children are removed, and a
    description shorter than 10 characters is replaced with the article text
    fetched from the item's link. Items without a <category> are processed
    as well. The tree is written to 'output.xml' after each item.

    A failure while handling one item is logged together with the item title
    and does not stop the processing of the remaining items.

    Raises:
    Errors from reading or parsing 'lenta.xml' propagate to the caller.
    """
    tree = ElemTree.parse('lenta.xml')  # Parse the XML file
    root = tree.getroot()

    for ind, item in enumerate(root.iter('item'), start=1):
        title = item.findtext('title')
        print(ind, '-', title)
        try:
            if item.findtext('category') not in ('Путешествия', 'Спорт'):
                # Look the link up front so that it is available no matter
                # where <link> sits relative to <description>.
                link = item.findtext('link')
                # Iterate over a snapshot: removing children from the element
                # that is being iterated skips the sibling that follows each
                # removed child.
                for element in list(item):
                    if element.tag in ('author', 'category', 'guid'):
                        item.remove(element)
                    elif element.tag == 'description' and link and len(element.text or '') < 10:
                        # Empty or very short description: use the article text.
                        element.text = parse_text(link)
            # Save after every item, presumably so that partial results are
            # available while later articles are still being downloaded.
            tree.write('output.xml', encoding='utf-8')
        except Exception as e:
            # Pass a real format string; the original handed the title to
            # logging as a %-argument without any placeholder.
            logging.error('%s: %s', title, e)
    print('RSS parsed successfully!')
def parse_lenta_rss() -> None:
    """
    Refresh the Lenta.ru feed in an endless loop, once per hour.

    Each cycle downloads the feed, processes it and reports the elapsed time.
    Any exception raised during a cycle is printed and logged so that the
    loop keeps running. This function never returns.
    """
    pause_seconds = 60 * 60  # one hour between cycles
    while True:
        started_at = time.time()
        try:
            fetch_rss_feed(URL)      # step 1: download the feed
            process_xml_content()    # step 2: parse and process the XML
            report = f'Elapsed time: {time.time() - started_at}'
            print(report)
            logging.info(report)
        except Exception as exc:
            print('Error:', exc)
            logging.error(exc)
        time.sleep(pause_seconds)
# Start the hourly feed-refresh loop in a background thread as soon as this
# module is executed or imported.
# NOTE(review): the thread is not a daemon and its target never returns, so
# the process will not exit on its own once the web server stops - confirm
# this is intended.
thread = Thread(target=parse_lenta_rss)
thread.start()
# Flask application that exposes the status and feed endpoints.
app = Flask(__name__)
@app.route('/')
def hello_world() -> str:
    """
    Status endpoint: report whether the background feed thread is running.

    Returns:
    str: '🟢' while the thread is alive, '🔴' otherwise.
    """
    if thread.is_alive():
        return '🟢'
    return '🔴'
@app.route('/rss')
def index():
    """
    Serve the processed feed stored in 'output.xml'.

    Returns:
    tuple: (body, status, headers) as accepted by Flask. The feed is returned
    with status 200 and an RSS content type. While 'output.xml' does not
    exist yet, a short plain-text message is returned with status 503
    instead of letting the missing file surface as an internal server error.
    """
    try:
        with open('output.xml', 'r', encoding='utf-8') as f:
            rss = f.read()
    except FileNotFoundError:
        # The background worker has not produced any output yet.
        return 'RSS feed is not ready yet', 503, {'Content-Type': 'text/plain; charset=utf-8'}
    # Declare the payload as RSS rather than Flask's default text/html.
    return rss, 200, {'Content-Type': 'application/rss+xml; charset=utf-8'}
if __name__ == '__main__':
    # Host and port come from the 'settings' section of the loaded config;
    # the port is converted to an integer.
    settings = config['settings']
    app.run(host=settings['host'], port=settings.getint('port'), debug=False)