X-Git-Url: https://git.ktnx.net/?a=blobdiff_plain;f=lib%2FApp%2FMPD%2FFeeder.pm;h=36bf29d0fd3c643be97397741ca672e3c6dc3455;hb=f5de4154d0110183daa442faefa4c746a14a7023;hp=dfb83c7b200e478dddf4405894d4c2f0477b4299;hpb=9cdc96f44f4de5d4044d4d66b710444f2878b3b5;p=mpd-feeder.git diff --git a/lib/App/MPD/Feeder.pm b/lib/App/MPD/Feeder.pm index dfb83c7..36bf29d 100644 --- a/lib/App/MPD/Feeder.pm +++ b/lib/App/MPD/Feeder.pm @@ -1,534 +1,426 @@ -package App::MPD::Feeder; - -use strict; -use warnings; +use v5.28; use utf8; +use Object::Pad; +class App::MPD::Feeder; +use App::MPD::Feeder::DB; use App::MPD::Feeder::Options; +use App::MPD::Feeder::WorkQueue; use DBD::Pg; use DBI; use Getopt::Long; use IO::Async::Signal; +use IO::Async::Timer::Countdown; +use IO::Async::Timer::Periodic; use Log::Any qw($log); use Net::Async::MPD; use Object::Pad; use Syntax::Keyword::Try; +use Time::Duration qw(duration_exact); + +has $cfg_file :reader; +has $opt :reader; +has $db :reader; +has $db_needs_update :writer = 1; +has $mpd :reader; +has $mpd_connected = 0; +has $playlist_needs_filling = 1; +has $quit_requested = 0; +has $reload_requested = 0; +has $idler; +has $last_mpd_comm; +has $reconnect_delay = 5; +use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf'; -class App::MPD::Feeder { - has $cfg_file :reader; - has $opt :reader; - has $db; - has $db_generation; - has $db_needs_update :writer = 1; - has $mpd :reader; +ADJUST { + Getopt::Long::Configure('pass_through'); + Getopt::Long::GetOptions( 'cfg|config=s' => \$cfg_file ); + Getopt::Long::Configure('no_pass_through'); -use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf'; + $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE; - ADJUST { - Getopt::Long::Configure('pass_through'); - Getopt::Long::GetOptions('cfg|config=s' => \$cfg_file); - Getopt::Long::Configure('no_pass_through'); + $self->configure; - $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE; + $db_needs_update = 0 if $opt->skip_db_update; +} - $self->configure; +method configure { + my $new_opt = App::MPD::Feeder::Options->new; - $db_needs_update = 0 if $opt->skip_db_update; - } + $new_opt->parse_config_file($cfg_file) if $cfg_file; - method configure { - my $new_opt = App::MPD::Feeder::Options->new; + $new_opt->parse_command_line; - $new_opt->parse_config_file($cfg_file) if $cfg_file; + Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level ); - $new_opt->parse_command_line; + $opt = $new_opt; - Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level ); + $reconnect_delay = $opt->initial_reconnect_delay; - $opt = $new_opt; - } + $db = App::MPD::Feeder::DB->new( opt => $opt ); +} - method connect_mpd { - return if $mpd; +method init_mpd { + return if $mpd; - my %conn = ( auto_connect => 1 ); - $conn{host} = $opt->mpd_host if $opt->mpd_host; - $conn{port} = $opt->mpd_port if $opt->mpd_port; + my %conn = ( auto_connect => 0 ); + $conn{host} = $opt->mpd_host if $opt->mpd_host; + $conn{port} = $opt->mpd_port if $opt->mpd_port; - $mpd = Net::Async::MPD->new(%conn); + $mpd = Net::Async::MPD->new(%conn); - $mpd->loop->add( - IO::Async::Signal->new( - name => 'TERM', - on_receipt => sub { - $log->debug("SIGTERM received. Stopping loop"); - $mpd->loop->stop('quit'); - }, - ) - ); + $mpd->on( + close => sub { + $log->warn("Connection to MPD lost"); + $mpd->loop->stop('disconnected'); + $mpd_connected = 0; + $idler->cancel if $idler; + undef $mpd; + $self->init_mpd; + } + ); + $mpd->on( + playlist => sub { + $playlist_needs_filling = 1; + } + ); + $mpd->on( + database => sub { + $db_needs_update = 1; + } + ); - $mpd->loop->add( - IO::Async::Signal->new( - name => 'INT', - on_receipt => sub { - $log->debug("SIGINT received. Stopping loop"); - $mpd->loop->stop('quit'); - }, - ) - ); + my $loop = $mpd->loop; - $mpd->loop->add( - IO::Async::Signal->new( - name => 'HUP', - on_receipt => sub { - $log->debug("SIGHUP received. Stopping loop"); - $mpd->loop->stop('reload'); - }, - ) - ); + my $int_signal_handler = sub { + state $signal_count = 0; + $signal_count++; - $mpd->loop->add( + if ( $signal_count > 1 ) { + $log->warn("Another signal received (#$signal_count)"); + $log->warn("Exiting abruptly"); + exit 2; + } + + $log->debug("Signal received. Stopping loop"); + $quit_requested = 1; + $loop->stop('quit'); + $self->break_idle; + }; + + for (qw(TERM INT)) { + $loop->add( IO::Async::Signal->new( - name => 'USR1', - on_receipt => sub { - $log->debug("SIGUSR1 received. Dumping configuration to STDERR"); - my $old = select \*STDERR; - try { - $opt->dump; - } - finally { - select $old; - } - }, + name => $_, + on_receipt => $int_signal_handler, ) ); } - method connect_db { - return if $db; - - $db = DBI->connect( "dbi:Pg:dbname=" . $opt->db_path, - $opt->db_user, $opt->db_password, - { RaiseError => 1, PrintError => 0, AutoCommit => 1 } ); - - $log->info( "Connected to database " . $opt->db_path ); - $db_generation = $self->db_get_option('generation'); - $log->debug("DB generation is $db_generation"); - $self->update_db; - } + $loop->add( + IO::Async::Signal->new( + name => 'HUP', + on_receipt => sub { + $log->debug("SIGHUP received. Scheduling reload"); + $reload_requested = 1; + $loop->stop('reload'); + $self->break_idle; + }, + ) + ); + + $loop->add( + IO::Async::Signal->new( + name => 'USR1', + on_receipt => sub { + $log->debug( + "SIGUSR1 received. Dumping configuration to STDERR"); + my $old = select \*STDERR; + try { + $opt->dump; + } + finally { + select $old; + } + }, + ) + ); +} - method db_get_option($name) { - my $sth = $db->prepare_cached("select $name from options"); - $sth->execute; - my @result = $sth->fetchrow_array; - $sth->finish; - undef $sth; +method connect_db { + $db->connect($opt); +} - return $result[0]; +method update_db( $force = undef ) { + if ( !$db_needs_update and !$force ) { + $log->debug("Skipping DB update"); + return; } - method db_set_option( $name, $value ) { - my $sth = $db->prepare_cached("update options set $name = ?"); - $sth->execute($value); - } + $log->info('Updating song database'); - method db_store_song($song, $artist, $album) { - return unless length($song) and length($artist) and length($album); - - $db->prepare_cached( - <<'SQL')->execute( $song, $artist, $album, $db_generation ); -INSERT INTO songs(path, artist, album, generation) -VALUES($1, $2, $3, $4) -ON CONFLICT ON CONSTRAINT songs_pkey DO -UPDATE SET artist = $2 - , album = $3 - , generation = $4 -SQL - $db->prepare_cached(<<'SQL')->execute( $artist, $album, $db_generation ); -INSERT INTO albums(artist, album, generation) -VALUES($1, $2, $3) -ON CONFLICT ON CONSTRAINT albums_pkey DO -UPDATE SET generation = $3 -SQL - $db->prepare_cached(<<'SQL')->execute( $artist, $db_generation ); -INSERT INTO artists(artist, generation) -VALUES($1, $2) -ON CONFLICT ON CONSTRAINT artists_pkey DO -UPDATE SET generation = $2 -SQL - } + my $rows = $mpd->send('listallinfo')->get; - method db_remove_stale_entries { - my $sth = - $db->prepare_cached('DELETE FROM songs WHERE generation <> ?'); - $sth->execute($db_generation); - $log->debug( sprintf( "Deleted %d stale songs", $sth->rows ) ); + $log->trace('got all songs from MPD'); - $sth = $db->prepare_cached('DELETE FROM albums WHERE generation <> ?'); - $sth->execute($db_generation); - $log->debug( sprintf( "Deleted %d stale albums", $sth->rows ) ); + $db->start_update; + try { + my $song_count; - $sth = - $db->prepare_cached('DELETE FROM artists WHERE generation <> ?'); - $sth->execute($db_generation); - $log->debug( sprintf( "Deleted %d stale artists", $sth->rows ) ); - } + foreach my $entry (@$rows) { + next unless exists $entry->{file}; - method db_note_song_qeued($item) { - $db->prepare_cached( - 'UPDATE songs SET last_queued=current_timestamp WHERE path=?') - ->execute( $item->{song} ); - $db->prepare_cached( - 'UPDATE artists SET last_queued=CURRENT_TIMESTAMP WHERE artist=?') - ->execute( $item->{artist} ); - $db->prepare_cached( - 'UPDATE albums SET last_queued=CURRENT_TIMESTAMP WHERE artist=? AND album=?' - )->execute( $item->{artist}, $item->{album} ); - } + $self->db->store_song( $entry->{file}, + $entry->{AlbumArtist} // $entry->{Artist}, + $entry->{Album} ); - method update_db($force = undef) { - if (!$db_needs_update and !$force) { - $log->debug("Skipping DB update"); - return; + $song_count++; } - $log->info('Updating song database'); - $self->connect_mpd; - $self->connect_db; + my ($total_songs, $total_artists, $total_albums, + $new_songs, $new_artists, $new_albums + ) = $self->db->finish_update; - my $rows = $mpd->send('listallinfo')->get; - try { - $db->begin_work; + $log->info( + "Updated data about $song_count songs (including $new_songs new), " + . "$total_artists artists (including $new_artists new) " - $db_generation++; + . "and $total_albums albums (including $new_albums new)" + ); - my $song_count; + $db_needs_update = 0; + } + catch { + my $err = $@; + $self->db->cancel_update; + die $err; + } +} - foreach my $entry (@$rows) { - next unless exists $entry->{file}; - $self->db_store_song( $entry->{file}, - $entry->{AlbumArtist} // $entry->{Artist}, - $entry->{Album} ); - $song_count++; - } +method queue_songs( $num = undef ) { + if ( !defined $num ) { + return unless $playlist_needs_filling; - $log->info("Updated data about $song_count songs"); + $log->trace("Requesting playlist"); + my $present = $mpd->send('playlist')->get // []; + $present = scalar(@$present); - $self->db_remove_stale_entries; + $log->notice( "Playlist contains $present songs. Wanted: " + . $opt->target_queue_length ); + if ( $present < $opt->target_queue_length ) { + $self->queue_songs( $opt->target_queue_length - $present ); + } + else { + $playlist_needs_filling = 0; + } - $self->db_set_option( generation => $db_generation ); + return; + } - $db->commit; + my @list = $self->db->find_suitable_songs($num); - $db_needs_update = 0; - } - catch { - my $err = $@; + die "Found no suitable songs" unless @list; - $db_generation--; + if ( @list < $num ) { + $log->warn( + sprintf( + 'Found only %d suitable songs instead of %d', + scalar(@list), $num + ) + ); + } - $db->rollback; + $log->info("About to add $num songs to the playlist"); - die $err; - } + my @paths; + for my $song (@list) { + my $path = $song->{song}; + $path =~ s/"/\\"/g; + push @paths, $path; } - method db_find_suitable_songs($num) { - $self->connect_db; - $self->update_db; + $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) ); - my @result; - my $sql = <min_song_interval, $opt->min_artist_interval, - $opt->min_album_interval, $num, - ); - my $sth = $db->prepare_cached($sql); - $sth->execute(@params); - while ( my @row = $sth->fetchrow_array ) { - push @result, - { song => $row[0], artist => $row[1], album => $row[2] }; - } - undef $sth; - - if (scalar(@result) == $num and $log->is_debug) { - $sql =~ s/^SELECT .+$/SELECT COUNT(DISTINCT s.path)/m; - $sql =~ s/^ORDER BY .+$//m; - $sql =~ s/^LIMIT .+$//m; - my $sth = $db->prepare_cached($sql); - pop @params; - $sth->execute(@params); - my $count = ($sth->fetchrow_array)[0]; - $sth->finish; - - $sth = $db->prepare_cached('SELECT COUNT(*) FROM songs'); - $sth->execute; - my $total = ($sth->fetchrow_array)[0]; - $sth->finish; - $log->debug( - sprintf( - "Number of songs meeting the criteria: %d out of total %d (%5.2f%%)", - $count, $total, 100.0 * $count / $total - ) - ); - - $sql = <prepare_cached($sql); - $sth->execute($opt->min_song_interval); - $count = ($sth->fetchrow_array)[0]; - $sth->finish; - - $log->debug( - sprintf( - "Number of songs not queued soon: %d out of total %d (%5.2f%%)", - $count, $total, 100.0 * $count / $total - ) - ); - $sth->finish; - - $sql = <prepare_cached($sql); - $sth->execute($opt->min_artist_interval); - $count = ($sth->fetchrow_array)[0]; - $sth->finish; - - $sth = $db->prepare_cached('SELECT COUNT(*) FROM artists'); - $sth->execute; - $total = ($sth->fetchrow_array)[0]; - $sth->finish; - $log->debug( - sprintf( - "Number of artists not queued soon: %d out of total %d (%5.2f%%)", - $count, $total, 100.0 * $count / $total - ) - ); - - $sql = <prepare_cached($sql); - $sth->execute($opt->min_album_interval); - $count = ($sth->fetchrow_array)[0]; - $sth->finish; - - $sth = $db->prepare_cached('SELECT COUNT(*) FROM albums'); - $sth->execute; - $total = ($sth->fetchrow_array)[0]; - $sth->finish; - $log->debug( - sprintf( - "Number of albums not queued soon: %d out of total %d (%5.2f%%)", - $count, $total, 100.0 * $count / $total - ) - ); - - undef $sth; + # MPD needs raw bytes + utf8::encode($_) for @paths; + my @commands; + for (@paths) { + push @commands, [ add => "\"$_\"" ]; + } + my $f = $mpd->send( \@commands ); + $f->on_fail( sub { die @_ } ); + $f->on_done( + sub { + $self->db->note_song_qeued($_) for @list; + $playlist_needs_filling = 0; } + ); + $f->get; +} - return @result; +method reexec { + $log->notice("disconnecting and re-starting"); + $db->disconnect; + undef $mpd; + + my @exec = ( $0, '--config', $self->cfg_file, '--skip-db-update' ); + if ( $log->is_trace ) { + $log->trace( 'exec ' . join( ' ', map { /\s/ ? "'$_'" : $_ } @exec ) ); } + exec(@exec); +} - method db_add_unwanted_artist($artist) { - $self->connect_db; - - try { - $db->do( - <<'SQL', -INSERT INTO unwanted_artists(artist, generation) -VALUES($1, $2) -SQL - undef, $artist, $db_generation - ); - return 1; +method break_idle { + if ( $idler && !$idler->is_ready ) { + if ($mpd_connected) { + $log->trace("hand-sending 'noidle'"); + undef $idler; + $mpd->{mpd_handle}->write("noidle\n"); } - catch { - my $err = $@; - - $log->debug("PostgreSQL error: $err"); - $log->debug( "SQLSTATE = " . $db->state ); - return 0 if $db->state eq '23505'; - - die $err; + else { + $log->trace("not connected to MPD: skipping 'noidle'"); } } - - method db_del_unwanted_artist($artist) { - $self->connect_db; - - return 1 == $db->do( - <<'SQL', -DELETE FROM unwanted_artists -WHERE artist = $1 -SQL - undef, $artist - ); + else { + $log->trace("no idler found"); } +} - method queue_songs($num = undef, $callback = undef) { - if (!defined $num) { - $self->connect_mpd; - $mpd->send('playlist')->on_done( - sub { - my $present = scalar @{ $_[0] }; - - $log->notice( "Playlist contains $present songs. Wanted: " - . $opt->target_queue_length ); - if ( $present < $opt->target_queue_length ) { - $self->queue_songs( - $opt->target_queue_length - $present, $callback ); - } - else { - $callback->() if $callback; - } - } - ); - - return; - } - - my @list = $self->db_find_suitable_songs($num); +method sleep_before_reconnection { + $log->debug( "Waiting for " + . duration_exact($reconnect_delay) + . " before re-connecting" ); + + $mpd->loop->add( + IO::Async::Timer::Countdown->new( + delay => $reconnect_delay, + on_expire => sub { $mpd->loop->stop }, + )->start + ); + + $reconnect_delay = $reconnect_delay * 1.5; + $reconnect_delay = 120 if $reconnect_delay > 120; + $mpd->loop->run; +} - die "Found no suitable songs" unless @list; +method pulse { + unless ($mpd_connected) { + $log->trace("Connecting to MPD..."); + my $f = $mpd->connect->await; - if ( @list < $num ) { - $log->warn( - sprintf( - 'Found only %d suitable songs instead of %d', - scalar(@list), $num - ) - ); + if ( $f->is_done ) { + $mpd_connected = 1; + $playlist_needs_filling = 1; + $reconnect_delay = $opt->initial_reconnect_delay; + $mpd->loop->later( sub { $self->pulse } ); } - - $log->info("About to add $num songs to the playlist"); - - my @paths; - for my $song (@list) { - my $path = $song->{song}; - $path =~ s/"/\\"/g; - push @paths, $path; + elsif ( $f->is_failed ) { + $mpd->loop->stop('disconnected'); + $log->warn($f->failure); + $self->sleep_before_reconnection; } - - $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) ); - my @commands; - for (@paths) { - push @commands, [ add => "\"$_\"" ]; + else { + die "connect Future neither done nor failed!?"; } - $self->connect_mpd; - my $f = $mpd->send( \@commands ); - $f->on_fail( sub { die @_ } ); - $f->on_done( - sub { - $self->db_note_song_qeued($_) for @list; - $callback->(@_) if $callback; - } - ); + + return; } - method prepare_to_wait_idle { - $log->trace('declaring idle mode'); - $mpd->send('idle database playlist')->on_done( - sub { - my $result = shift; + if ($db_needs_update) { + $self->update_db; + $mpd->loop->later( sub { $self->pulse } ); + return; + } - if ( $result->{changed} eq 'database' ) { - $db_needs_update = 1; - $self->prepare_to_wait_idle; - } - elsif ( $result->{changed} eq 'playlist' ) { - $self->queue_songs( undef, - sub { $self->prepare_to_wait_idle } ); - } - else { - use JSON; - $log->warn( - "Unknown result from idle: " . to_json($result) ); - $self->prepare_to_wait_idle; - } - } - ); + if ($playlist_needs_filling) { + $self->queue_songs; + $mpd->loop->later( sub { $self->pulse } ); + return; } - method run { - $mpd->on( - close => sub { - die "Connection to MPD lost"; - } - ); + $log->debug("Waiting idle. PID=$$"); + $last_mpd_comm = time; + $idler = $mpd->send("idle database playlist"); + $idler->await; - $self->prepare_to_wait_idle; - } + $log->trace('got out of idle'); - method stop { - undef $mpd; - - if ($db) { - if ($db->{ActiveKids}) { - $log->warn("$db->{ActiveKids} active DB statements"); - for my $st ( @{ $db->{ChildHandles} } ) { - next unless $st->{Active}; - while(my($k,$v) = each %$st) { - $log->debug("$k = ".($v//'')); - } - } - } + if ($idler) { + if ( $idler->is_done ) { + my $result = $idler->get; + undef $idler; + if ( ref $result and $result->{changed} ) { + my $changed = $result->{changed}; + $changed = [$changed] unless ref $changed; - $db->disconnect; - undef $db; + $mpd->emit($_) for @$changed; + } + } + elsif ( $idler->is_cancelled ) { + $log->trace("idle was cancelled"); + undef $idler; + } + elsif ( $idler->is_failed ) { + $log->warn("idle failed: ".$idler->failure); + undef $idler; } } - method run_loop { - $self->connect_db; - - for ( ;; ) { - $self->queue_songs( undef, sub { $self->run } ); + $mpd->loop->stop; +} - $log->debug("Entering event loop. PID=$$"); +method run_loop { + $self->connect_db; - my $result = $mpd->loop->run; - $log->trace( "Got loop result of " . ( $result // 'undef' ) ); + $self->init_mpd; - if ( 'reload' eq $result ) { - $log->notice("disconnecting"); - $self->stop; + my $loop = $mpd->loop; - my @exec = ( $0, '--config', $self->cfg_file, '--skip-db-update' ); - if ( $log->is_trace ) { - $log->trace( 'exec ' - . join( ' ', map { /\s/ ? "'$_'" : $_ } @exec ) ); + $loop->add( + IO::Async::Timer::Periodic->new( + interval => 60, + on_tick => sub { + if (!$mpd_connected) { + $log->trace("Not connected to MPD. Skipping alive check."); + $loop->stop('disconnected'); + return; } - exec(@exec); - } - if ( 'quit' eq $result ) { - $log->trace("quitting because of 'quit' loop result"); - $self->stop; - exit 0; - } + if ( time - $last_mpd_comm > 300 ) { + + $log->trace( + "no active MPD communication for more that 5 minutes"); + $log->debug("forcing alive check"); + $self->break_idle; + } + else { + $log->trace( + "contacted MPD less than 5 minutes ago. skipping alive check" + ); + } + }, + )->start + ); + + for ( ;; ) { + if ( $quit_requested ) { + $log->trace("about to quit"); + undef $mpd; + $db->disconnect; + last; + } + elsif ( $reload_requested ) { + $self->reexec; + die "Not reached"; } + + $log->trace('About to run the loop'); + + $mpd->loop->later( sub { $self->pulse } ); + + $mpd->loop->run; } } +1;