#!/usr/bin/perl

# mpd-feeder: keeps an MPD playlist topped up with songs picked from a
# PostgreSQL-backed catalogue of the MPD music database.
#
# Usage (commands are taken from the first non-option argument):
#   dump-config                   print the effective configuration and exit
#   add-unwanted-artist NAME...   add artists to the unwanted list
#   del-unwanted-artist NAME...   remove artists from the unwanted list
#   one-shot                      top the playlist up once, then exit
#   single                        queue exactly one song, then exit
#   (no command)                  run as a daemon, reacting to MPD idle events

use v5.32;
use warnings;

use App::MPD::Feeder::Options;
use Getopt::Long ();
use Log::Any qw($log);
use Log::Any::Adapter Stderr => log_level => 'error';
use Object::Pad;
use Syntax::Keyword::Try;

# Feeder bundles the MPD connection, the database handle and the parsed
# options, and implements both the one-off commands and the daemon logic.
class Feeder {
    has $cfg_file :reader;               # path given via --cfg/--config, or the default
    has $opt :reader;                    # App::MPD::Feeder::Options instance
    has $db;                             # DBI handle, created lazily by connect_db
    has $db_generation;                  # value of options.generation, bumped per update
    has $db_needs_update :writer = 1;    # true => next update_db really updates
    has $mpd :reader;                    # Net::Async::MPD client, created lazily

    use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf';

    use DBD::Pg;
    use DBI;
    use Log::Any qw($log);
    use IO::Async::Signal;
    use Net::Async::MPD;

    # Constructor hook: picks up --cfg/--config from @ARGV, falls back to the
    # default configuration file when it exists, then loads the options.
    ADJUST {
        # pass_through leaves options not handled here in @ARGV, so that
        # configure() can hand them to the options parser afterwards.
        Getopt::Long::Configure('pass_through');
        Getopt::Long::GetOptions('cfg|config=s' => \$cfg_file);
        Getopt::Long::Configure('no_pass_through');

        $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE;

        $self->configure;

        $db_needs_update = 0 if $opt->skip_db_update;
    }

    # (Re)builds $opt: configuration file first (when one is known), then the
    # command line, and finally applies the resulting log level.
    # $opt is only replaced once the new options object is fully populated.
    method configure {
        my $new_opt = App::MPD::Feeder::Options->new;

        $new_opt->parse_config_file($cfg_file) if $cfg_file;

        $new_opt->parse_command_line;

        Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level );

        $opt = $new_opt;
    }

    # Creates the MPD client (no-op when already connected) and installs the
    # signal handlers on its event loop:
    #   SIGHUP  - stops the loop with the result 'reload'
    #   SIGUSR1 - dumps the configuration to STDERR
    method connect_mpd {
        return if $mpd;

        my %conn = ( auto_connect => 1 );
        $conn{host} = $opt->mpd_host if $opt->mpd_host;
        $conn{port} = $opt->mpd_port if $opt->mpd_port;

        $mpd = Net::Async::MPD->new(%conn);

        $mpd->loop->add(
            IO::Async::Signal->new(
                name       => 'HUP',
                on_receipt => sub {
                    $log->debug("SIGHUP received. Stopping loop");
                    $mpd->loop->stop('reload');
                },
            )
        );

        $mpd->loop->add(
            IO::Async::Signal->new(
                name       => 'USR1',
                on_receipt => sub {
                    $log->debug(
                        "SIGUSR1 received. Dumping configuration to STDERR");
                    # Temporarily make STDERR the default output handle and
                    # restore the previous one even if dump() dies.
                    my $old = select \*STDERR;
                    try {
                        $opt->dump;
                    }
                    finally {
                        select $old;
                    }
                },
            )
        );
    }

    # Connects to PostgreSQL (no-op when already connected), loads the current
    # generation and runs update_db, which is itself a no-op unless an update
    # is pending.  DBI errors are raised as exceptions (RaiseError).
    method connect_db {
        return if $db;

        $db = DBI->connect( "dbi:Pg:dbname=" . $opt->db_path,
            $opt->db_user, $opt->db_password,
            { RaiseError => 1, PrintError => 0, AutoCommit => 1 } );

        $log->info( "Connected to database " . $opt->db_path );
        $db_generation = $self->db_get_option('generation');
        $log->debug("DB generation is $db_generation");
        $self->update_db;
    }

    # Returns the value of column $name from the "options" table.
    # $name is interpolated into the SQL text, so it must be a trusted
    # column name, never user input.
    method db_get_option($name) {
        my $sth = $db->prepare_cached("select $name from options");
        $sth->execute;
        my @result = $sth->fetchrow_array;
        $sth->finish;
        undef $sth;

        return $result[0];
    }

    # Sets column $name of the "options" table to $value.
    # As with db_get_option, $name must be a trusted column name.
    method db_set_option( $name, $value ) {
        my $sth = $db->prepare_cached("update options set $name = ?");
        $sth->execute($value);
    }

    # Upserts one song together with its album and artist rows, stamping all
    # three with the current generation.  Entries lacking a path, an artist or
    # an album are silently skipped.
    method db_store_song($song, $artist, $album) {
        return unless length($song) and length($artist) and length($album);

        $db->prepare_cached(
            <<'SQL')->execute( $song, $artist, $album, $db_generation );
INSERT INTO songs(path, artist, album, generation)
VALUES($1, $2, $3, $4)
ON CONFLICT ON CONSTRAINT songs_pkey DO
UPDATE SET artist = $2
         , album = $3
         , generation = $4
SQL
        $db->prepare_cached(<<'SQL')->execute( $artist, $album, $db_generation );
INSERT INTO albums(artist, album, generation)
VALUES($1, $2, $3)
ON CONFLICT ON CONSTRAINT albums_pkey DO
UPDATE SET generation = $3
SQL
        $db->prepare_cached(<<'SQL')->execute( $artist, $db_generation );
INSERT INTO artists(artist, generation)
VALUES($1, $2)
ON CONFLICT ON CONSTRAINT artists_pkey DO
UPDATE SET generation = $2
SQL
    }

    # Deletes songs, albums and artists whose generation differs from the
    # current one, i.e. rows not touched by the latest update run.
    method db_remove_stale_entries {
        my $sth =
            $db->prepare_cached('DELETE FROM songs WHERE generation <> ?');
        $sth->execute($db_generation);
        $log->debug( sprintf( "Deleted %d stale songs", $sth->rows ) );

        $sth = $db->prepare_cached('DELETE FROM albums WHERE generation <> ?');
        $sth->execute($db_generation);
        $log->debug( sprintf( "Deleted %d stale albums", $sth->rows ) );

        $sth =
            $db->prepare_cached('DELETE FROM artists WHERE generation <> ?');
        $sth->execute($db_generation);
        $log->debug( sprintf( "Deleted %d stale artists", $sth->rows ) );
    }

    # Records "now" as the last-queued time of the song, its artist and its
    # album.  $item is a hash reference with the keys song, artist and album,
    # as produced by db_find_suitable_songs.
    # (The misspelt method name is kept for compatibility with callers.)
    method db_note_song_qeued($item) {
        $db->prepare_cached(
            'UPDATE songs SET last_queued=current_timestamp WHERE path=?')
            ->execute( $item->{song} );
        $db->prepare_cached(
            'UPDATE artists SET last_queued=CURRENT_TIMESTAMP WHERE artist=?')
            ->execute( $item->{artist} );
        $db->prepare_cached(
            'UPDATE albums SET last_queued=CURRENT_TIMESTAMP WHERE artist=? AND album=?'
        )->execute( $item->{artist}, $item->{album} );
    }

    # Synchronises the database with MPD's "listallinfo" output inside a
    # single transaction: bumps the generation, upserts every entry that has
    # a file, removes stale rows and stores the new generation.
    # Does nothing unless an update is pending or $force is true.
    # On failure the transaction is rolled back, the in-memory generation is
    # restored and the error is re-thrown.
    method update_db($force = undef) {
        if (!$db_needs_update and !$force) {
            $log->debug("Skipping DB update");
            return;
        }

        $log->info('Updating song database');
        $self->connect_mpd;
        $self->connect_db;

        my $rows = $mpd->send('listallinfo')->get;
        try {
            $db->begin_work;

            $db_generation++;

            # Start at 0 so the log line below is well-formed even when MPD
            # reports no files at all.
            my $song_count = 0;

            foreach my $entry (@$rows) {
                next unless exists $entry->{file};
                $self->db_store_song( $entry->{file},
                    $entry->{AlbumArtist} // $entry->{Artist},
                    $entry->{Album} );
                $song_count++;
            }

            $log->info("Updated data about $song_count songs");

            $self->db_remove_stale_entries;

            $self->db_set_option( generation => $db_generation );

            $db->commit;

            $db_needs_update = 0;
        }
        catch {
            my $err = $@;

            $db_generation--;

            $db->rollback;

            die $err;
        }
    }

    # Returns up to $num hash references ({ song, artist, album }) for songs
    # that may be queued now.
    #
    # FIXME(review): the copy of this file under review lost the text of four
    # "<<SQL" heredocs in this method (everything from the heredoc opener up
    # to the next "->" was stripped, including the "my @params = (" opener of
    # the first one).  The affected statements are reproduced below exactly
    # as found and are NOT valid Perl; restore the SQL from the upstream
    # source before using this method.  Nothing else in it was changed.
    method db_find_suitable_songs($num) {
        $self->connect_db;
        $self->update_db;

        my @result;
        # FIXME(review): heredoc body and "my @params = (" missing here.
        my $sql = <min_song_interval, $opt->min_artist_interval, $opt->min_album_interval, $num, );
        my $sth = $db->prepare_cached($sql);
        $sth->execute(@params);
        while ( my @row = $sth->fetchrow_array ) {
            push @result,
                { song => $row[0], artist => $row[1], album => $row[2] };
        }
        undef $sth;

        # When the request was fully satisfied and debug logging is on, log
        # some statistics about how many rows meet each criterion.
        if (scalar(@result) == $num and $log->is_debug) {
            # Turn the selection query into a count: replace the column
            # list, drop ordering and limit (and the limit's parameter).
            $sql =~ s/^SELECT .+$/SELECT COUNT(DISTINCT s.path)/m;
            $sql =~ s/^ORDER BY .+$//m;
            $sql =~ s/^LIMIT .+$//m;
            $log->debug($sql);
            my $sth = $db->prepare_cached($sql);
            pop @params;
            $sth->execute(@params);
            my $count = ($sth->fetchrow_array)[0];
            $sth->finish;

            $sth = $db->prepare_cached('SELECT COUNT(*) FROM songs');
            $sth->execute;
            my $total = ($sth->fetchrow_array)[0];

            $log->debug(
                sprintf(
                    "Number of songs meeting the criteria: %d out of total %d (%5.2f%%)",
                    $count, $total, 100.0 * $count / $total
                )
            );
            $sth->finish;

            # FIXME(review): heredoc body missing here.
            $sql = <prepare_cached($sql);
            $sth->execute($opt->min_song_interval);
            $count = ($sth->fetchrow_array)[0];
            $total = ($sth->fetchrow_array)[0];
            $sth->finish;

            $log->debug(
                sprintf(
                    "Number of songs not queued soon: %d out of total %d (%5.2f%%)",
                    $count, $total, 100.0 * $count / $total
                )
            );
            $sth->finish;

            # FIXME(review): heredoc body missing here.
            $sql = <prepare_cached($sql);
            $sth->execute($opt->min_artist_interval);
            $count = ($sth->fetchrow_array)[0];
            $total = ($sth->fetchrow_array)[0];

            $log->debug(
                sprintf(
                    "Number of artists not queued soon: %d out of total %d (%5.2f%%)",
                    $count, $total, 100.0 * $count / $total
                )
            );
            $sth->finish;

            # FIXME(review): heredoc body missing here.
            $sql = <prepare_cached($sql);
            $sth->execute($opt->min_album_interval);
            $count = ($sth->fetchrow_array)[0];
            $total = ($sth->fetchrow_array)[0];

            $log->debug(
                sprintf(
                    "Number of albums not queued soon: %d out of total %d (%5.2f%%)",
                    $count, $total, 100.0 * $count / $total
                )
            );
            $sth->finish;
            undef $sth;
        }

        return @result;
    }

    # Adds $artist to the unwanted list.
    # Returns 1 when the row was inserted and 0 when the insert failed with
    # SQLSTATE 23505 (unique violation, i.e. already listed); any other
    # database error is re-thrown.
    method db_add_unwanted_artist($artist) {
        $self->connect_db;

        try {
            $db->do(
                <<'SQL',
INSERT INTO unwanted_artists(artist, generation)
VALUES($1, $2)
SQL
                undef, $artist, $db_generation
            );
            return 1;
        }
        catch {
            my $err = $@;

            $log->debug("PostgreSQL error: $err");
            $log->debug( "SQLSTATE = " . $db->state );
            return 0 if $db->state eq '23505';

            die $err;
        }
    }

    # Removes $artist from the unwanted list.
    # Returns true when exactly one row was deleted.
    method db_del_unwanted_artist($artist) {
        $self->connect_db;

        return 1 == $db->do(
            <<'SQL',
DELETE FROM unwanted_artists
WHERE artist = $1
SQL
            undef, $artist
        );
    }

    # Queues songs in MPD.
    #
    # With $num undefined, asks MPD for the current playlist and queues as
    # many songs as are missing to reach target_queue_length (possibly none).
    # With $num given, picks up to $num suitable songs and adds them.
    # $callback, when given, is invoked once the work is done.
    # Dies when a positive request finds no suitable song at all.
    method queue_songs($num = undef, $callback = undef) {
        if (!defined $num) {
            $self->connect_mpd;
            $mpd->send('playlist')->on_done(
                sub {
                    my $present = scalar @{ $_[0] };

                    $log->notice( "Playlist contains $present songs. Wanted: "
                            . $opt->target_queue_length );
                    if ( $present < $opt->target_queue_length ) {
                        $self->queue_songs(
                            $opt->target_queue_length - $present, $callback );
                    }
                    else {
                        $callback->() if $callback;
                    }
                }
            );

            return;
        }

        my @list = $self->db_find_suitable_songs($num);

        die "Found no suitable songs" unless @list;

        if ( @list < $num ) {
            $log->warn(
                sprintf(
                    'Found only %d suitable songs instead of %d',
                    scalar(@list), $num
                )
            );
        }

        # Report what will actually be added, which may be fewer than $num.
        my $found = scalar @list;
        $log->info("About to add $found songs to the playlist");

        my @paths;
        for my $song (@list) {
            my $path = $song->{song};
            # Paths are sent inside double quotes; both the backslash and
            # the double quote need a backslash escape there.
            $path =~ s/([\\"])/\\$1/g;
            push @paths, $path;
        }

        $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) );
        my @commands;
        for my $path (@paths) {
            push @commands, [ add => "\"$path\"" ];
        }
        $self->connect_mpd;
        my $f = $mpd->send( \@commands );
        $f->on_fail( sub { die @_ } );
        $f->on_done(
            sub {
                $self->db_note_song_qeued($_) for @list;
                $callback->(@_) if $callback;
            }
        );
    }

    # Puts the MPD connection into idle mode for the "database" and
    # "playlist" subsystems and re-arms itself after each event:
    #   database - flags the database as needing an update
    #   playlist - tops the playlist up
    #   other    - logs a warning
    method prepare_to_wait_idle {
        $log->trace('declaring idle mode');
        $mpd->send('idle database playlist')->on_done(
            sub {
                my $result = shift;
                # A missing "changed" key falls through to the warning branch.
                my $changed = $result->{changed} // '';

                if ( $changed eq 'database' ) {
                    $db_needs_update = 1;
                    $self->prepare_to_wait_idle;
                }
                elsif ( $changed eq 'playlist' ) {
                    $self->queue_songs( undef,
                        sub { $self->prepare_to_wait_idle } );
                }
                else {
                    use JSON;
                    $log->warn(
                        "Unknown result from idle: " . to_json($result) );
                    $self->prepare_to_wait_idle;
                }
            }
        );
    }

    # Starts daemon operation: dies when the MPD connection closes and begins
    # waiting for idle events.  Expects connect_mpd to have been called.
    method run {
        $mpd->on(
            close => sub {
                die "Connection to MPD lost";
            }
        );

        $self->prepare_to_wait_idle;
    }

    # Drops the MPD client and disconnects from the database, logging the
    # attributes of statement handles that are still active.
    method stop {
        undef $mpd;

        if ($db) {
            if ($db->{ActiveKids}) {
                $log->warn("$db->{ActiveKids} active DB statements");
                for my $st ( @{ $db->{ChildHandles} } ) {
                    # ChildHandles holds weak references, so entries of
                    # already destroyed handles may be undef.
                    next unless defined $st and $st->{Active};
                    while ( my ( $k, $v ) = each %$st ) {
                        $log->debug( "$k = " . ( $v // '' ) );
                    }
                }
            }

            $db->disconnect;
            undef $db;
        }
    }
}

my $feeder = Feeder->new();

# One-off commands.  Each of the first three exits when done; "one-shot" and
# "single" exit from their completion callback.
if (@ARGV) {
    my $cmd = shift @ARGV;

    if ($cmd eq 'dump-config') {
        die "dump-config command accepts no arguments\n" if @ARGV;

        $feeder->opt->dump;
        exit;
    }

    if ( $cmd eq 'add-unwanted-artist' ) {
        die "Missing command arguments\n" unless @ARGV;
        # Maintaining the unwanted list does not need a catalogue refresh.
        $feeder->set_db_needs_update(0);
        for my $artist (@ARGV) {
            if ( $feeder->db_add_unwanted_artist($artist) ) {
                $log->info("Artist '$artist' added to the unwanted list\n");
            }
            else {
                $log->warn("Artist '$artist' already in the unwanted list\n");
            }
        }
        exit;
    }

    if ( $cmd eq 'del-unwanted-artist' ) {
        die "Missing command arguments\n" unless @ARGV;
        $feeder->set_db_needs_update(0);
        for my $artist (@ARGV) {
            if ( $feeder->db_del_unwanted_artist($artist) ) {
                $log->info("Artist '$artist' deleted from the unwanted list\n");
            }
            else {
                $log->warn("Artist '$artist' is not in the unwanted list\n");
            }
        }
        exit;
    }

    if ( $cmd eq 'add-unwanted-album' ) {
        die "NOT IMPLEMENTED\n";
    }

    if ( $cmd eq 'one-shot' ) {
        die "one-shot command accepts no arguments\n" if @ARGV;

        $feeder->queue_songs(undef, sub { exit });
        $feeder->mpd->loop->run;
    }
    elsif ( $cmd eq 'single' ) {
        die "single command accepts no arguments\n" if @ARGV;

        $feeder->queue_songs(1, sub { exit });
        $feeder->mpd->loop->run;
    }
    else {
        die "Unknown command '$cmd'";
    }
}

# Daemon mode.
$feeder->connect_db;

for ( ;; ) {
    $feeder->queue_songs( undef, sub { $feeder->run } );

    $log->debug("Entering event loop. PID=$$");
    my $result = $feeder->mpd->loop->run;
    $log->trace( "Got loop result of " . ( $result // 'undef' ) );

    # The SIGHUP handler stops the loop with 'reload': tear everything down
    # and re-execute ourselves so the configuration is read afresh.
    if ( defined $result and $result eq 'reload' ) {
        $log->notice("disconnecting");
        $feeder->stop;

        # Only pass --config when a configuration file is actually known;
        # otherwise the new process applies its own default lookup.
        my @args = ('--skip-db-update');
        unshift @args, '--config', $feeder->cfg_file
            if defined $feeder->cfg_file;

        # exec only returns on failure; carrying on with a stopped feeder
        # would hide the problem.
        exec( $0, @args ) or die "Unable to re-execute $0: $!\n";
    }
}