-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmp3ify.py
More file actions
1444 lines (1241 loc) · 58.4 KB
/
mp3ify.py
File metadata and controls
1444 lines (1241 loc) · 58.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import pathlib
import re # Added for filename sanitization
import sys
from argparse import ArgumentParser, Namespace
from concurrent.futures import (
ThreadPoolExecutor,
as_completed,
) # Added for parallel downloads
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional, cast, Tuple
import eyed3
import requests # Added for album art download
import spotipy as sp
from dotenv import load_dotenv # New import for .env support
from mutagen.easyid3 import EasyID3 # Added for metadata
from mutagen.id3 import ID3 # Added for metadata/album art
from mutagen.id3._frames import APIC # Added for metadata/album art
from mutagen.id3._util import ID3NoHeaderError # Added for metadata/album art
from spotipy.oauth2 import SpotifyOAuth
from youtubesearchpython import VideosSearch # Added for YouTube search
from yt_dlp import YoutubeDL
from eyed3 import id3 # Explicitly import the id3 submodule
from eyed3.id3.frames import CommentFrame # Import CommentFrame
# Load environment variables from a local .env file, if one exists, so that
# settings kept there are available through the process environment.
load_dotenv()
# Comma-separated OAuth scopes requested when authenticating with Spotify.
SPOTIFY_API_SCOPE = "user-library-read,playlist-read-private,playlist-modify-private"
# NOTE(review): presumably the batch size used together with list_chunks();
# no use is visible in this part of the file -- confirm against callers.
CHUNK_SIZE = 100
# Passed to yt-dlp as its 'retries' option in download_track_from_youtube().
MAX_RETRIES = 3
# NOTE(review): no use of RETRY_DELAY is visible in this part of the file.
RETRY_DELAY = 5 # seconds
# DEFAULT_MAX_WORKERS = 5 # Can replace the old MAX_WORKERS constant
# Number of separator-delimited parts in each filename layout recognised by
# _parse_track_from_filename(); named to avoid magic numbers there.
TRACK_FORMAT_PARTS_4 = 4 # TrackNo - Artist - Album - Name
TRACK_FORMAT_PARTS_3 = 3 # Artist - Album - Name
TRACK_FORMAT_PARTS_2 = 2 # Album - Name
@dataclass
class SpotifyConnection:
    """
    Bundle of a Spotipy client and the identity of the user it belongs to.

    Attributes:
        connection: The Spotipy client used for all Spotify API calls.
        userid: Spotify user ID of the account, or None when it is unknown.
        username: Spotify display name of the account, or None when unknown.
    """

    connection: sp.Spotify
    userid: Optional[str] = None
    username: Optional[str] = None
@dataclass
class TrackInfo:
    """
    A single music track described by metadata from MP3 tags or from Spotify.

    Attributes:
        filename: Local filesystem path of the MP3 file, when there is one.
        artist: Primary artist of the track.
        album: Album the track belongs to.
        title: Title of the track.
        url: Spotify URL of the track (used in 'to-spotify').
        youtube_url: YouTube URL found for the track (used in 'from-spotify').
        spotify_id: Unique Spotify ID of the track.
        album_art_url: URL of the album artwork from Spotify.
    """

    filename: Optional[str] = None
    artist: Optional[str] = None
    album: Optional[str] = None
    title: Optional[str] = None
    url: Optional[str] = None
    youtube_url: Optional[str] = None
    spotify_id: Optional[str] = None
    album_art_url: Optional[str] = None

    @property
    def search_query_spotify(self) -> str:
        """
        Build the query string used to look this track up on Spotify.

        Returns:
            A field-filtered "artist:... track:..." query when both artist and
            title are known; otherwise the bare title, or "" without a title.
        """
        if self.artist and self.title:
            # Spotify's field filters narrow the match to artist and track name
            return f"artist:{self.artist} track:{self.title}"
        return self.title or ""

    @property
    def search_query_youtube(self) -> str:
        """
        Build the free-text query used to look this track up on YouTube.

        Returns:
            The known values of artist, title and album (in that order) joined
            by spaces, followed by " audio".
        """
        known_values = [
            value for value in (self.artist, self.title, self.album) if value
        ]
        # The ' audio' suffix hints at audio uploads rather than music videos
        return " ".join(known_values) + " audio"

    @property
    def is_valid_for_spotify_search(self) -> bool:
        """
        Tell whether the track carries enough data for a Spotify search.

        Returns:
            True if the track has a title, False otherwise.
        """
        return True if self.title else False

    @property
    def is_valid_for_youtube_search(self) -> bool:
        """
        Tell whether the track carries enough data for a YouTube search.

        Returns:
            True if the track has both a title and an artist, False otherwise.
        """
        return bool(self.title) and bool(self.artist)

    @property
    def has_spotify_url(self) -> bool:
        """
        Tell whether a Spotify URL is attached to the track.

        Returns:
            True if `url` is set to a non-empty value, False otherwise.
        """
        return True if self.url else False

    @property
    def has_youtube_url(self) -> bool:
        """
        Tell whether a YouTube URL is attached to the track.

        Returns:
            True if `youtube_url` is set to a non-empty value, False otherwise.
        """
        return True if self.youtube_url else False
def spotify_connect() -> SpotifyConnection:
    """
    Authenticate against the Spotify API via OAuth2 and look up the user.

    Returns:
        A SpotifyConnection holding the client. `userid` and `username` are
        filled in when the user lookup succeeds and left as None otherwise.
        If authentication raises, a client created without an auth manager is
        returned instead; if even that cannot be created the process exits
        with status 1.
    """
    print("Connecting to Spotify...")
    try:
        # Only the scope is passed explicitly to the OAuth manager here
        connection = sp.Spotify(auth_manager=SpotifyOAuth(scope=SPOTIFY_API_SCOPE))
        # Fetching the current user confirms that authentication worked
        user_info = connection.current_user()
        if user_info:
            user_id = user_info.get("id", "")
            username = user_info.get("display_name", "")
            print(f"Successfully connected as Spotify user: {username} ({user_id})")
            return SpotifyConnection(
                connection=connection,
                userid=user_id,
                username=username,
            )
        print("Warning: Could not retrieve Spotify user info after authentication.")
        return SpotifyConnection(connection=connection)
    except Exception as e:
        # e.g. network problems or bad credentials during authentication
        print(f"Error connecting to Spotify: {e}")

    # Reached only after a failure above: hand back an unauthenticated client
    # so that downstream code can deal with the missing user ID itself.
    try:
        return SpotifyConnection(connection=sp.Spotify(auth_manager=None))
    except Exception:
        print("Fatal error: Could not create Spotify client instance.")
        sys.exit(1)
def spotify_check_playlist(
    connection: SpotifyConnection, playlistname: str, playlistid: Optional[str] = None
) -> Optional[Dict[str, Any]]:
    """
    Finds a user's Spotify playlist by its name or optionally by its ID.

    All of the user's playlists are inspected, one page at a time, so a
    playlist is found regardless of how many playlists the user has.

    Args:
        connection: The authenticated SpotifyConnection object.
        playlistname: The name of the playlist to search for.
        playlistid: An optional playlist ID to also check against.

    Returns:
        A dictionary representing the first playlist whose name equals
        `playlistname` or whose ID equals `playlistid`, or None if no playlist
        matches or the lookup fails.
    """
    print(f"Checking for existing Spotify playlist named '{playlistname}'...")
    page_size = 50  # Page size requested per API call
    offset = 0
    try:
        while True:
            page = connection.connection.current_user_playlists(
                limit=page_size, offset=offset
            )
            if not page or "items" not in page:
                if offset == 0:
                    print("No playlists found or unexpected API response format.")
                    return None
                # A malformed later page ends the scan; earlier pages had no match
                break

            items = page["items"] or []
            for playlist in items:
                if not playlist:
                    continue  # Skip empty entries instead of failing the scan
                # Check if the name matches
                if playlist["name"] == playlistname:
                    print(f" Found existing playlist by name (ID: {playlist['id']}).")
                    return cast(Dict[str, Any], playlist)
                # Optionally check if the ID matches (less common scenario)
                if playlistid and playlist["id"] == playlistid:
                    print(f" Found existing playlist by ID: {playlistid}.")
                    return cast(Dict[str, Any], playlist)

            # The paging object's 'next' link is empty on the last page
            if not items or not page.get("next"):
                break
            offset += len(items)

        print(f" Playlist '{playlistname}' not found among the user's playlists.")
        return None
    except Exception as e:
        print(f"Error checking for playlist: {e}")
        return None
def spotify_create_playlist(
    connection: SpotifyConnection, playlistname: str
) -> Optional[str]:
    """
    Create a non-public Spotify playlist owned by the connected user.

    Args:
        connection: The SpotifyConnection whose `userid` will own the playlist.
        playlistname: Name to give the new playlist.

    Returns:
        The ID of the new playlist, or None when the connection has no user
        ID, the response carries no ID, or the API call raises.
    """
    owner_id = connection.userid
    if not owner_id:
        print("Error: Spotify User ID is required to create a playlist.")
        return None

    print(f"Creating new private Spotify playlist named '{playlistname}' for user {connection.username}...")
    try:
        # cast() only informs the type checker; the response is used as returned
        created = cast(
            Dict[str, Any],
            connection.connection.user_playlist_create(
                owner_id, playlistname, public=False
            ),
        )
        if created and "id" in created:
            playlistid = cast(str, created["id"])
            print(f"Successfully created playlist with ID: {playlistid}")
            return playlistid
        print("Error: Failed to create playlist or response missing ID.")
        return None
    except Exception as e:
        print(f"Error creating Spotify playlist: {e}")
        return None
def _read_track_info(filepath: pathlib.Path) -> TrackInfo:
    """Build a TrackInfo for one MP3 from its ID3 tags, falling back to its filename."""
    mp3 = eyed3.load(filepath)
    if not (mp3 and mp3.tag):
        # No usable tag at all: everything comes from the filename
        print(" No ID3 tags found, attempting to parse filename...")
        track_info = _parse_track_from_filename(filepath)
        print(f" Parsed from filename: Artist='{track_info.artist}', Title='{track_info.title}'")
        return track_info

    track_info = TrackInfo(
        filename=str(filepath),
        artist=mp3.tag.artist,
        album=mp3.tag.album,
        title=mp3.tag.title,
    )
    print(f" Found ID3 tags: Artist='{track_info.artist}', Title='{track_info.title}'")
    if not track_info.title:
        # A tag without a title is not searchable; take the title from the
        # filename and fill in only the fields the tag left empty.
        print(" ID3 tags have no title, attempting to parse filename...")
        parsed = _parse_track_from_filename(filepath)
        track_info.title = parsed.title
        track_info.artist = track_info.artist or parsed.artist
        track_info.album = track_info.album or parsed.album
        print(f" Parsed from filename: Artist='{track_info.artist}', Title='{track_info.title}'")
    return track_info


def mp3_walk_directory(directory: str) -> Iterator[TrackInfo]:
    """
    Recursively scans a directory for MP3 files and yields TrackInfo objects.

    Metadata is taken from the ID3 tags when they are present. When a file has
    no tags, or its tags carry no title, the filename is parsed instead (tag
    values that do exist are kept).

    Args:
        directory: The path to the directory to scan.

    Yields:
        A TrackInfo for every '*.mp3' file that ends up with a title. Files
        that cannot be processed are reported and skipped.
    """
    print(f"Scanning directory for MP3 files: {directory}")
    search_path = pathlib.Path(directory)
    if not search_path.is_dir():
        print(f"Error: Directory not found: {directory}")
        return

    # '**/*.mp3' matches MP3 files in the directory and all sub-directories
    for filepath in search_path.glob("**/*.mp3"):
        print(f"Processing file: {filepath.name}")
        try:
            track_info = _read_track_info(filepath)
        except Exception as e:
            # e.g. a corrupted file; report it and move on to the next one
            print(f" Error processing file {filepath.name}: {e}")
            continue

        # Only tracks with at least a title can be searched on Spotify
        if track_info.is_valid_for_spotify_search:
            yield track_info
        else:
            print(" Skipping file - insufficient metadata (missing title).")
def _parse_track_from_filename(filepath: pathlib.Path) -> TrackInfo:
    """
    Parses artist, album, and title from a filename based on common patterns.

    Used as a fallback if ID3 tags are missing. Underscores are treated as
    spaces. The name is split on hyphens surrounded by whitespace (' - ') so
    that hyphenated names such as 'Jay-Z' stay intact; only when no such
    separator exists is the name split on every bare hyphen instead.

    Args:
        filepath: The pathlib.Path object for the MP3 file.

    Returns:
        A TrackInfo object populated with parsed data (can be incomplete).
    """
    # Get filename without extension, replace underscores with spaces
    filename = filepath.stem.replace("_", " ")
    parts = [part.strip() for part in re.split(r"\s+-\s+", filename)]
    if len(parts) == 1:
        # No spaced separator found: fall back to names like 'Artist-Album-Title'
        parts = [part.strip() for part in filename.split("-")]

    track = TrackInfo(filename=str(filepath))
    num_parts = len(parts)
    # Try matching known patterns based on the number of parts
    if num_parts == TRACK_FORMAT_PARTS_4:  # TrackNo - Artist - Album - Name
        # The first part is assumed to be the track number and is ignored
        track.artist = parts[1]
        track.album = parts[2]
        track.title = parts[3]
    elif num_parts == TRACK_FORMAT_PARTS_3:  # Artist - Album - Name
        track.artist = parts[0]
        track.album = parts[1]
        track.title = parts[2]
    elif num_parts == TRACK_FORMAT_PARTS_2:  # Album - Name
        # Cannot determine artist reliably
        track.album = parts[0]
        track.title = parts[1]
    else:
        # If no pattern matches, assume the whole filename is the title
        print(f" Could not parse filename into parts: '{filename}'. Using full stem as title.")
        track.title = filename

    # Basic validation check after parsing
    if not track.title:
        print(f" Warning: Could not extract title from filename: {filepath.name}")
    return track
def list_chunks(lst: List, n: int) -> Iterator[List]:
    """Yield consecutive slices of lst holding n items each (the last may be shorter)."""
    chunk_starts = range(0, len(lst), n)
    yield from (lst[start : start + n] for start in chunk_starts)
def _track_info_from_playlist_item(item: Any) -> Optional[TrackInfo]:
    """Convert one playlist item into a TrackInfo, or None when it holds no track data."""
    track_data = item.get("track") if isinstance(item, dict) else None
    if not track_data or not isinstance(track_data, dict):
        return None

    artist_list = track_data.get("artists")
    first_artist = artist_list[0] if isinstance(artist_list, list) and artist_list else None
    album_data = track_data.get("album")
    if not isinstance(album_data, dict):
        album_data = {}
    images = album_data.get("images")
    first_image = images[0] if isinstance(images, list) and images else None

    # Only the first listed artist and the first listed image are used
    return TrackInfo(
        spotify_id=track_data.get("id"),
        title=track_data.get("name"),
        artist=first_artist.get("name") if isinstance(first_artist, dict) else None,
        album=album_data.get("name"),
        album_art_url=first_image.get("url") if isinstance(first_image, dict) else None,
    )


def get_playlist_tracks(sp_conn: sp.Spotify, playlist_id: str) -> List[TrackInfo]:
    """
    Fetches all track details from a specific Spotify playlist ID.

    Pages through the playlist until an empty page is returned. An item with
    missing or malformed track data is skipped on its own; it no longer aborts
    the fetch of the remaining items.

    Args:
        sp_conn: Authenticated Spotipy client instance.
        playlist_id: The unique ID of the Spotify playlist.

    Returns:
        A list of TrackInfo objects for the tracks that have both a title and
        an artist. If a request fails, the tracks collected so far are
        returned.
    """
    tracks: List[TrackInfo] = []
    offset = 0
    print(f"Fetching tracks from Spotify playlist ID: {playlist_id}")
    while True:
        try:
            results = sp_conn.playlist_items(
                playlist_id,
                offset=offset,
                # Request specific fields to minimize data transfer
                fields="items(track(id, name, artists(name), album(name, images)))",
                additional_types=["track"],
            )
            if not results:
                print(" No results returned from Spotify API.")
                break
            items = results.get("items", [])
            if not items:  # End of playlist
                print(" No more items found in playlist.")
                break

            for item in items:
                track_info = _track_info_from_playlist_item(item)
                if track_info is None:
                    continue
                # Only keep tracks that have enough info for a YouTube search
                if track_info.is_valid_for_youtube_search:
                    tracks.append(track_info)
                else:
                    print(f" Skipping track due to missing title/artist: {track_info.title}")

            # Move to the next batch
            offset += len(items)
            print(f" Fetched {len(tracks)} tracks so far...")
        except Exception as e:
            print(f"Error fetching playlist items (offset {offset}): {e}")
            # Stop fetching on error; what was collected so far is returned
            break
    print(f"Finished fetching. Total valid tracks found: {len(tracks)}")
    return tracks
def search_youtube(track: TrackInfo) -> Optional[str]:
    """
    Look a track up on YouTube and return the link of the first search hit.

    Args:
        track: The TrackInfo whose metadata forms the search query.

    Returns:
        The 'link' of the first result as a string, or None when the track
        lacks the metadata for a search, nothing usable is found, or the
        search raises.
    """
    if not track.is_valid_for_youtube_search:
        print(f"Skipping YouTube search for track '{track.title or track.filename}' - insufficient metadata.")
        return None

    query = track.search_query_youtube
    print(f"Searching YouTube for: '{query}'")
    try:
        # A single result is requested; it is treated as the best match
        results_dict = VideosSearch(query, limit=1).result()

        result_list = None
        if results_dict and isinstance(results_dict, dict) and "result" in results_dict:
            result_list = results_dict["result"]

        if result_list and isinstance(result_list, list):
            video_url = result_list[0].get("link")
            if video_url and isinstance(video_url, str):
                print(f" Found YouTube URL: {video_url}")
                return video_url
            print(f" Found result item, but missing 'link': {result_list[0]}")

        # Reached whenever any of the checks above did not pass
        print(f" No valid YouTube results found for '{query}'.")
        return None
    except Exception as e:
        print(f"Error searching YouTube for '{query}': {e}")
        return None
def sanitize_filename(name: str) -> str:
    """
    Removes characters from a string that are typically invalid in filenames
    across different operating systems and cleans common YouTube title additions.

    Removed groups are replaced by a space (collapsed afterwards) so the text
    on either side is never glued together, and each parenthesised group is
    judged on its own, so e.g. "(feat. X)" survives next to "(Official Video)".

    Args:
        name: The input string (potential filename part, likely from YouTube title).

    Returns:
        A sanitized string suitable for use in filenames. It can be empty when
        the input consists only of removable content.
    """
    # 1. Remove common YouTube additions (case-insensitive): a parenthesised
    #    group containing one of the marker phrases. [^()]* keeps the match
    #    inside a single pair of parentheses.
    name = re.sub(
        r"\([^()]*(?:Official Video|Music Video|Lyric Video|Audio)[^()]*\)",
        " ",
        name,
        flags=re.IGNORECASE,
    )
    # Remove content in square brackets, e.g. [Lyrics]
    name = re.sub(r"\[.*?\]", " ", name)

    # 2. Cut the name at the first pipe, double slash, hash or asterisk
    name = re.sub(r"\s*(?:\||//|#|\*).*$", "", name)

    # 3. Remove characters invalid in filenames
    name = re.sub(r'[<>:"/\\|?*]', "", name)

    # 4. Clean up whitespace: any run of spaces/tabs/newlines becomes a single
    #    space, and the ends are stripped
    return re.sub(r"\s+", " ", name).strip()
def _embed_album_art(mp3_path: str, art_url: str) -> None:
    """Download the image at art_url and store it as the front-cover APIC frame of mp3_path."""
    print(f" Attempting to download and embed album art from {art_url[:50]}...")
    try:
        # The context manager closes the response (and releases its
        # connection) even when raise_for_status() fails.
        with requests.get(art_url, timeout=15) as response:
            response.raise_for_status()  # Check for HTTP errors
            image_data = response.content
            content_type = response.headers.get('content-type', 'image/jpeg').lower()

        # Determine MIME type for the APIC frame
        if 'image/jpeg' in content_type or 'image/jpg' in content_type:
            mime = 'image/jpeg'
        elif 'image/png' in content_type:
            mime = 'image/png'
        else:
            print(f" Warning: Unsupported image type '{content_type}', skipping album art.")
            return

        # EasyID3 does not expose picture frames, so use the full ID3 interface
        audio_id3_art = ID3(mp3_path)
        # Remove existing APIC frames before adding the new one
        audio_id3_art.delall('APIC')
        audio_id3_art.add(
            APIC(
                encoding=3,  # 3: UTF-8
                mime=mime,
                type=3,  # 3: Cover (front)
                desc='Cover',
                data=image_data,
            )
        )
        # Save as ID3v2.3 for compatibility
        audio_id3_art.save(v2_version=3)
        print(" Successfully added album art.")
    except requests.exceptions.RequestException as req_e:
        print(f" Failed to download album art: {req_e}")
    except Exception as art_e:
        # Any other failure while manipulating the tag
        print(f" Failed to embed album art: {art_e}")


def add_metadata(mp3_path: str, track: TrackInfo):
    """
    Adds ID3 metadata (title, artist, album) and album art to a downloaded MP3 file.

    Uses the `mutagen` library. Failures are reported on stdout and never
    raised to the caller.

    Args:
        mp3_path: The path to the MP3 file.
        track: The TrackInfo object containing the metadata to add. Only the
            fields that are set are written; album art is embedded when
            `album_art_url` is set.
    """
    target_path = pathlib.Path(mp3_path)
    if not target_path.is_file():
        print(f" Error adding metadata: File not found at {mp3_path}")
        return

    print(f" Attempting to add metadata to {target_path.name}...")
    try:
        # --- Add Basic Tags (Title, Artist, Album) ---
        try:
            audio = EasyID3(mp3_path)
        except ID3NoHeaderError:
            # No ID3 header yet: write an empty tag, then load the file again
            print(f" No ID3 header found, creating one for {target_path.name}.")
            ID3().save(mp3_path)
            audio = EasyID3(mp3_path)

        for tag_name, value in (
            ("title", track.title),
            ("artist", track.artist),
            ("album", track.album),
        ):
            if value:
                audio[tag_name] = value
        audio.save()
        print(" Added basic metadata (Title/Artist/Album).")

        # --- Add Album Art ---
        if track.album_art_url:
            _embed_album_art(mp3_path, track.album_art_url)
        else:
            print(" No album art URL available.")
    except Exception as meta_e:
        # Catch general errors during metadata processing
        print(f" Error adding metadata to {target_path.name}: {meta_e}")
def download_track_from_youtube(track: TrackInfo, output_dir: pathlib.Path) -> bool:
    """
    Downloads audio from a YouTube URL using yt-dlp and converts it to MP3.

    The file is written to '<output_dir>/<artist> - <title>.mp3' (name passed
    through sanitize_filename). yt-dlp's FFmpegMetadata post-processor is
    enabled; the custom add_metadata() step is not invoked here.

    Args:
        track: The TrackInfo object containing the YouTube URL and metadata.
        output_dir: The directory where the downloaded MP3 should be saved.

    Returns:
        True if the MP3 exists afterwards (freshly downloaded or already
        present), False otherwise.
    """
    if not track.youtube_url or not track.artist or not track.title:
        print(f"Skipping download for track '{track.title or track.filename}' - missing YouTube URL, artist, or title.")
        return False

    # Create a sanitized filename based on artist and title
    filename_base = sanitize_filename(f"{track.artist} - {track.title}")
    # Define the final expected MP3 path
    mp3_path = output_dir / f"{filename_base}.mp3"
    # yt-dlp treats '%' in the output template as the start of a field, so
    # literal percent signs in the path or name must be doubled.
    output_template = str(output_dir / f"{filename_base}.").replace("%", "%%") + "%(ext)s"

    # --- Check if file already exists ---
    if mp3_path.exists():
        print(f" Skipping download, file already exists: {mp3_path.name}")
        return True  # Consider existing file a success

    # --- Configure yt-dlp options ---
    ydl_opts = {
        "format": "bestaudio/best",  # Prefer best audio quality
        "outtmpl": output_template,  # Output path and filename template
        "noplaylist": True,  # Ensure only single video is downloaded
        "quiet": True,  # Suppress yt-dlp console output
        "noprogress": True,  # Suppress progress bar
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",  # Use FFmpeg to extract audio
                "preferredcodec": "mp3",  # Convert to MP3
                "preferredquality": "192",  # Set MP3 quality (e.g., 192kbps)
            },
            # Add metadata using FFmpeg during post-processing
            {'key': 'FFmpegMetadata', 'add_metadata': True},
            # NOTE(review): thumbnail download is switched off below, so this
            # post-processor presumably has nothing to embed -- confirm intent.
            {'key': 'EmbedThumbnail', 'already_have_thumbnail': False},
        ],
        "writethumbnail": False,  # Thumbnail files are not downloaded
        # NOTE(review): 'addmetadata' / 'embedthumbnail' look like CLI-style
        # switches; whether YoutubeDL reads them is not verified here.
        "addmetadata": True,
        'embedthumbnail': False,
        'ignoreerrors': True,  # Continue if a specific download fails
        'retries': MAX_RETRIES,  # Retry downloads on transient errors
    }

    print(f"Downloading: {track.artist} - {track.title} from {track.youtube_url}")
    try:
        # The context manager lets yt-dlp release its resources when done.
        # The 'operator' ignore code silences a type-checker false positive.
        with YoutubeDL(ydl_opts) as ydl:  # type: ignore[operator]
            error_code = ydl.download([track.youtube_url])

        if error_code != 0:
            print(f" yt-dlp reported an error (code {error_code}) for '{track.title}'.")
            return False
        if mp3_path.exists():
            print(f" Successfully downloaded and converted: {mp3_path.name}")
            return True
        # error_code == 0 but the file is missing; this might indicate an
        # issue during the FFmpeg conversion stage.
        print(f" Download seemed to finish, but expected MP3 file not found: {mp3_path.name}")
        return False
    except Exception as e:
        # Catch any unexpected errors during the download process
        print(f" Unhandled error downloading '{track.title}': {e}")
        return False
def _write_hook_tags(
    final_filepath: pathlib.Path,
    parsed_artist: Optional[str],
    parsed_title: Optional[str],
    youtube_url: Optional[str],
) -> None:
    """Overwrite title/artist and store youtube_url as the comment of the MP3 at final_filepath."""
    try:
        audiofile = eyed3.load(final_filepath)
        if audiofile is None:
            print(f" Error: Could not load MP3 file {final_filepath.name} with eyed3.")
            return
        if audiofile.tag is None:
            print(" Initializing new ID3 tag.")
            audiofile.initTag()
        tag = audiofile.tag
        if tag is None:
            print(" Error: Tag object could not be initialized or accessed after initTag.")
            return

        # Overwrite Title and Artist; an unknown artist clears the artist tag
        if parsed_title:
            tag.title = parsed_title
        tag.artist = parsed_artist if parsed_artist else None

        print(f" Debug: Full YouTube URL received: {repr(youtube_url)}")
        if youtube_url:
            # comments.set() takes the comment text itself; b'XXX' marks the
            # language as undefined
            tag.comments.set(youtube_url, description="", lang=b"XXX")
            print(f" Set Comment tag to: {youtube_url}")
        else:
            # Clear comments if URL wasn't found
            for existing in list(tag.comments):
                tag.comments.remove(existing.description, lang=existing.lang)
            print(" Cleared Comment tag (YouTube URL not found).")

        # Save the corrected tags
        tag.save(version=id3.ID3_V2_3, encoding='utf-8')
        print(f" Successfully updated ID3 tags for {final_filepath.name}.")
    except Exception as e:
        print(f" Error fixing ID3 tags for {final_filepath.name}: {e}")


def rename_hook(d: Dict[str, Any]) -> None:
    """
    yt-dlp hook that renames a finished MP3 and corrects its ID3 tags.

    The file is renamed to 'Index - Artist - Title.mp3' (or 'Index - Title.mp3'
    when no artist can be parsed from the video title), then the Title, Artist
    and Comment tags are rewritten. Calls for anything other than a finished
    '.mp3' file return without doing anything, and an already existing target
    file is taken to mean the work was done by an earlier call.

    Args:
        d: Dictionary passed by yt-dlp containing download status and info.
           Keys read: 'status', 'filename', and 'info_dict' with 'filepath',
           'title', 'playlist_index' and 'webpage_url'.
    """
    # --- 1. Check status and that we have an MP3 file ---
    if d.get('status') != 'finished':
        return
    info_dict = d.get('info_dict') or {}
    current_filepath_str = d.get('filename') or info_dict.get('filepath')
    if not current_filepath_str:
        return
    current_filepath = pathlib.Path(current_filepath_str)
    # Only proceed if the file extension is .mp3 (meaning conversion is done)
    if current_filepath.suffix.lower() != '.mp3':
        return
    if not info_dict:
        return

    # --- 2. Extract Info and Parse Artist/Title ---
    # The keys may be present with a None value, so a .get() default alone is
    # not enough to guarantee usable values.
    original_title = info_dict.get('title') or current_filepath.stem
    playlist_index = info_dict.get('playlist_index')
    playlist_index_str = str('00' if playlist_index is None else playlist_index).zfill(2)
    sanitized_title_string = sanitize_filename(original_title)
    parsed_artist, parsed_title = parse_artist_title_from_string(sanitized_title_string)

    # --- 3. Construct the Target Filename ---
    if parsed_artist and parsed_title:
        base_filename = f"{playlist_index_str} - {parsed_artist} - {parsed_title}"
    elif parsed_title:
        base_filename = f"{playlist_index_str} - {parsed_title}"
    else:
        print(f" Rename Hook: Could not determine title for {current_filepath.name}. Skipping.")
        return
    target_filename = f"{base_filename}.mp3"
    target_filepath = current_filepath.parent / target_filename

    # --- 4. Has this file already been processed? ---
    # A different, already existing target is assumed to come from a previous
    # hook call that completed the job.
    if target_filepath != current_filepath and target_filepath.exists():
        print(f" Rename Hook: Target file {target_filename} already exists. Skipping.")
        return

    # --- 5. Ensure Source File Exists Before Acting ---
    if not current_filepath.exists():
        return

    # --- 6. Rename File (if necessary) ---
    final_filepath = current_filepath  # Path to use for tag fixing
    if current_filepath != target_filepath:
        try:
            print(f" Renaming: '{current_filepath.name}' -> '{target_filename}'")
            current_filepath.rename(target_filepath)
        except OSError as e:
            # If rename fails, stop processing this file in this hook call
            print(f" Error renaming file {current_filepath.name} to {target_filename}: {e}")
            return
        except Exception as e:
            print(f" Unexpected error during rename: {e}")
            return
        final_filepath = target_filepath

    # --- 7. Correct ID3 Tags ---
    if not final_filepath.exists():
        print(f" ID3 Tag Fix: Final file path {final_filepath.name} not found. Cannot fix tags.")
        return
    print(f" Fixing ID3 tags for: {final_filepath.name}")
    _write_hook_tags(
        final_filepath, parsed_artist, parsed_title, info_dict.get('webpage_url')
    )
def parse_artist_title_from_string(text: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Split an "Artist - Title" string into its two components.

    Only the first " - " (space, hyphen, space) is treated as the separator,
    so any later occurrences stay inside the title.

    Args:
        text: The string to parse (ideally already sanitized).
    Returns:
        A tuple (artist, title) with surrounding whitespace removed from both.
        When there is no separator, or either side is empty after stripping,
        the result is (None, text.strip()).
    """
    head, separator, tail = text.partition(" - ")
    candidate_artist = head.strip()
    candidate_title = tail.strip()

    # Guard: no separator at all, or one side collapsed to nothing.
    if not separator or not candidate_artist or not candidate_title:
        return None, text.strip()

    return candidate_artist, candidate_title
def run_sync_to_spotify(args: Namespace, connection: SpotifyConnection) -> int:
    """
    Implements the 'to-spotify' command. Scans local MP3s, finds matches
    on Spotify, and adds them to a specified playlist.

    For every local track that has enough information, a Spotify search is
    issued with `limit=1` and only that first hit is used. Matched URLs are
    de-duplicated (keeping first-seen order) before being added, so two local
    files resolving to the same Spotify track add it only once.

    Args:
        args: Parsed command-line arguments specific to 'to-spotify'.
            Reads `args.directory` and `args.playlist`.
        connection: The authenticated SpotifyConnection object.
    Returns:
        0 on success (including the case where no local track matched),
        1 if the playlist could not be found/created or if any chunk of
        tracks failed to be added to it.
    """
    print("\nStarting sync: Local MP3s -> Spotify Playlist")
    n_mp3 = 0
    tracks_found_on_spotify: List[TrackInfo] = []

    # 1. Scan local directory
    for track in mp3_walk_directory(args.directory):
        n_mp3 += 1
        if not track.is_valid_for_spotify_search:
            print(f" Skipping {track.filename or 'Unknown File'} - insufficient info for search.")
            continue

        # 2. Search Spotify for each valid local track
        try:
            query = track.search_query_spotify
            print(f"Searching Spotify for: '{query}'")
            results = connection.connection.search(q=query, type="track", limit=1)

            # Validate the response shape step by step; anything unexpected
            # (non-dict, missing keys, empty list) counts as "not found".
            tracks_section = results.get("tracks") if isinstance(results, dict) else None
            items = tracks_section.get("items") if isinstance(tracks_section, dict) else None
            if not items:
                print(" Not found on Spotify.")
                continue

            spotify_track = items[0]
            # `or {}` also covers an "external_urls" key that is present but None.
            track.url = (spotify_track.get("external_urls") or {}).get("spotify")
            track.spotify_id = spotify_track.get("id")
            if track.url and track.spotify_id:
                print(f" Found Spotify match: {track.url}")
                tracks_found_on_spotify.append(track)
            else:
                print(" Found track, but missing URL or ID in response.")
        except Exception as e:
            # Best effort: one failing search must not abort the whole scan.
            print(f" Error searching Spotify for track '{track.title or track.filename}': {e}")

    # 3. Get or Create the target Spotify Playlist
    print(f"\nChecking/Creating Spotify playlist: '{args.playlist}'")
    playlist = spotify_check_playlist(connection, playlistname=args.playlist)
    if not playlist:
        playlist_id = spotify_create_playlist(connection, args.playlist)
        if not playlist_id:
            # spotify_create_playlist prints errors, just exit
            return 1
        playlist = {"id": playlist_id, "name": args.playlist}  # Simulate playlist dict
    if not playlist or "id" not in playlist:
        print("Fatal: Failed to get or create playlist ID.")
        return 1
    playlistid = playlist["id"]
    print(f"Using playlist ID: {playlistid}")

    # 4. Add found tracks to the playlist
    # dict.fromkeys drops duplicate URLs while preserving first-seen order.
    tracks_to_add_urls = list(
        dict.fromkeys(t.url for t in tracks_found_on_spotify if t.has_spotify_url)
    )
    if not tracks_to_add_urls:
        print("\nNo valid Spotify tracks found from local MP3s to add to the playlist.")
        print(f"Sync finished. MP3s scanned: {n_mp3}")
        return 0  # Not an error if no tracks were found/added

    print(f"\nAdding {len(tracks_to_add_urls)} tracks to playlist '{args.playlist}'...")
    added_count = 0
    failed_count = 0
    # Process in chunks to avoid hitting API limits
    for chunk_urls in list_chunks(tracks_to_add_urls, CHUNK_SIZE):
        try:
            connection.connection.playlist_add_items(playlistid, chunk_urls)
        except Exception as e:
            # Keep going with the remaining chunks, but remember the failure
            # so the exit code reflects it.
            print(f" Error adding tracks chunk to playlist: {e}")
            failed_count += len(chunk_urls)
        else:
            print(f" Added chunk of {len(chunk_urls)} tracks.")
            added_count += len(chunk_urls)

    print(
        f"\nSync finished. MP3s scanned: {n_mp3} | "
        f"Tracks added to Spotify: {added_count}"
    )
    if failed_count:
        print(f"Warning: {failed_count} tracks could not be added to the playlist.")
        return 1
    return 0  # Success
def run_sync_from_spotify(args: Namespace, connection: SpotifyConnection) -> int:
"""
Implements the 'from-spotify' command. Fetches tracks from a Spotify playlist,
searches YouTube, downloads audio, converts to MP3, and adds metadata.
Uses parallel workers for searching and downloading.
Args:
args: Parsed command-line arguments including 'num_cores'.
connection: The authenticated SpotifyConnection object.
Returns:
0 on success, 1 on failure.
"""
print("\nStarting sync: Spotify Playlist -> Local MP3s")
# Determine number of workers
# If num_cores is 0, ThreadPoolExecutor uses default (usually CPU cores)