From 841c96b547cd0896e244747857af7556b2a98e38 Mon Sep 17 00:00:00 2001 From: Damyan Ivanov Date: Wed, 24 Nov 2021 06:39:36 +0000 Subject: [PATCH] rework event looping, allows for graceful (re-)connection to MPD the MPD may be down, not yet started (possibly on another host, so no way to avoid this via systemd service dependencies) --- lib/App/MPD/Feeder.pm | 183 ++++++++++++++++++++++------------ lib/App/MPD/Feeder/Options.pm | 13 +++ 2 files changed, 132 insertions(+), 64 deletions(-) diff --git a/lib/App/MPD/Feeder.pm b/lib/App/MPD/Feeder.pm index e5c2b70..942b954 100644 --- a/lib/App/MPD/Feeder.pm +++ b/lib/App/MPD/Feeder.pm @@ -21,9 +21,13 @@ has $opt :reader; has $db :reader; has $db_needs_update :writer = 1; has $mpd :reader; +has $mpd_connected = 0; +has $playlist_needs_filling = 1; +has $quit_requested = 0; +has $reload_requested = 0; has $idler; -has $work_queue = App::MPD::Feeder::WorkQueue->new; has $last_mpd_comm; +has $reconnect_delay = 5; use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf'; @@ -50,13 +54,15 @@ method configure { $opt = $new_opt; + $reconnect_delay = $opt->initial_reconnect_delay; + $db = App::MPD::Feeder::DB->new( opt => $opt ); } -method connect_mpd { +method init_mpd { return if $mpd; - my %conn = ( auto_connect => 1 ); + my %conn = ( auto_connect => 0 ); $conn{host} = $opt->mpd_host if $opt->mpd_host; $conn{port} = $opt->mpd_port if $opt->mpd_port; @@ -64,32 +70,36 @@ method connect_mpd { $mpd->on( close => sub { - die "Connection to MPD lost"; + $mpd->loop->stop('disconnected'); + $mpd_connected = 0; + $log->warn("Connection to MPD lost"); } ); $mpd->on( playlist => sub { - $work_queue->add('playlist'); + $playlist_needs_filling = 1; } ); $mpd->on( database => sub { - $work_queue->add('database'); + $db_needs_update = 1; } ); my $int_signal_handler = sub { state $signal_count = 0; $signal_count++; - $log->debug("Signal received. Stopping loop"); - $work_queue->add('quit'); - $self->break_idle; if ( $signal_count > 1 ) { $log->warn("Another signal received (#$signal_count)"); $log->warn("Exiting abruptly"); exit 2; } + + $log->debug("Signal received. Stopping loop"); + $quit_requested = 1; + $mpd->loop->stop('quit'); + $self->break_idle; }; for (qw(TERM INT)) { @@ -106,7 +116,8 @@ method connect_mpd { name => 'HUP', on_receipt => sub { $log->debug("SIGHUP received. Scheduling reload"); - $work_queue->add('reload'); + $reload_requested = 1; + $mpd->loop->stop('reload'); $self->break_idle; }, ) @@ -132,7 +143,6 @@ method connect_mpd { method connect_db { $db->connect($opt); - $self->update_db; } method update_db( $force = undef ) { @@ -142,7 +152,6 @@ method update_db( $force = undef ) { } $log->info('Updating song database'); - $self->connect_mpd; my $rows = $mpd->send('listallinfo')->get; @@ -183,9 +192,9 @@ method update_db( $force = undef ) { } method queue_songs( $num = undef ) { - $self->connect_db; if ( !defined $num ) { - $self->connect_mpd; + return unless $playlist_needs_filling; + $log->trace("Requesting playlist"); my $present = $mpd->send('playlist')->get // []; $present = scalar(@$present); @@ -195,6 +204,9 @@ method queue_songs( $num = undef ) { if ( $present < $opt->target_queue_length ) { $self->queue_songs( $opt->target_queue_length - $present ); } + else { + $playlist_needs_filling = 0; + } return; } @@ -229,52 +241,27 @@ method queue_songs( $num = undef ) { for (@paths) { push @commands, [ add => "\"$_\"" ]; } - $self->connect_mpd; my $f = $mpd->send( \@commands ); $f->on_fail( sub { die @_ } ); $f->on_done( sub { $self->db->note_song_qeued($_) for @list; + $playlist_needs_filling = 0; } ); $f->get; } -method stop { - undef $mpd; - +method reexec { + $log->notice("disconnecting and re-starting"); $db->disconnect; -} + undef $mpd; -method handle_work_queue { - while ( my $item = $work_queue->next ) { - if ( $item eq 'playlist' ) { - $self->queue_songs; - } - elsif ( $item eq 'database' ) { - $db_needs_update = 1; - $self->update_db; - } - elsif ( $item eq 'reload' ) { - $log->notice("disconnecting and re-starting"); - $self->stop; - - my @exec = ( $0, '--config', $self->cfg_file, '--skip-db-update' ); - if ( $log->is_trace ) { - $log->trace( - 'exec ' . join( ' ', map { /\s/ ? "'$_'" : $_ } @exec ) ); - } - exec(@exec); - } - elsif ( $item eq 'quit' ) { - $log->trace("quitting"); - $self->stop; - exit 0; - } - else { - die "Unknown work queue item '$item'"; - } + my @exec = ( $0, '--config', $self->cfg_file, '--skip-db-update' ); + if ( $log->is_trace ) { + $log->trace( 'exec ' . join( ' ', map { /\s/ ? "'$_'" : $_ } @exec ) ); } + exec(@exec); } method break_idle { @@ -288,19 +275,85 @@ method break_idle { } } +method sleep_before_reconnection { + $self->debug( "Waiting for " + . duration_exact($reconnect_delay) + . " before re-connecting" ); + + $mpd->loop->add( + IO::Async::Timer::Countdown->new( + delay => $reconnect_delay, + on_expire => sub { $mpd->loop->stop }, + )->start + ); + + $reconnect_delay = $reconnect_delay * 1.5; + $reconnect_delay = 120 if $reconnect_delay > 120; + $mpd->loop->run; +} + +method pulse { + unless ($mpd_connected) { + $mpd->connect->then( + sub { + $mpd_connected = 1; + $playlist_needs_filling = 1; + $reconnect_delay = $opt->initial_reconnect_delay; + } + )->get; + + $mpd->loop->later( sub { $self->pulse } ); + return; + } + + if ($db_needs_update) { + $self->update_db; + $mpd->loop->later( sub { $self->pulse } ); + return; + } + + if ($playlist_needs_filling) { + $self->queue_songs; + $mpd->loop->later( sub { $self->pulse } ); + return; + } + + $log->debug("Waiting idle. PID=$$"); + $last_mpd_comm = time; + $idler = $mpd->send("idle database playlist"); + my $result = $idler->get; + undef $idler; + + if ( ref $result and $result->{changed} ) { + my $changed = $result->{changed}; + $changed = [$changed] unless ref $changed; + + $mpd->emit($_) for @$changed; + } + + $log->trace('got out of idle'); + $mpd->loop->stop; +} + method run_loop { - $self->connect_mpd; $self->connect_db; + $self->init_mpd; + $mpd->loop->add( IO::Async::Timer::Periodic->new( interval => 60, on_tick => sub { + if (!$mpd_connected) { + $log->trace("Not connected to MPD. Skipping alive check."); + return; + } + if ( time - $last_mpd_comm > 300 ) { $log->trace( "no active MPD communication for more that 5 minutes"); - $log->trace("forcing alive check"); + $log->debug("forcing alive check"); $self->break_idle; } else { @@ -312,25 +365,27 @@ method run_loop { )->start ); - $self->queue_songs; - for ( ;; ) { - $log->debug("Waiting idle. PID=$$"); - $last_mpd_comm = time; - $idler = $mpd->send("idle database playlist"); - my $result = $idler->get; - undef $idler; + $mpd->loop->later( sub { $self->pulse } ); - if ( $result and $result->{changed} ) { - my $changed = $result->{changed}; - $changed = [$changed] unless ref $changed; - - $mpd->emit($_) for @$changed; - } + $log->trace('About to run the loop'); - $log->trace('got out of idle'); + $mpd->loop->run; - $self->handle_work_queue; + if ( $quit_requested ) { + $log->trace("about to quit"); + undef $mpd; + $db->disconnect; + last; + } + elsif ( $reload_requested ) { + $self->reexec; + die "Not reached"; + } + elsif ( !$mpd_connected ) { + $self->sleep_before_reconnection; + next; + } } } diff --git a/lib/App/MPD/Feeder/Options.pm b/lib/App/MPD/Feeder/Options.pm index eff87be..83f4222 100644 --- a/lib/App/MPD/Feeder/Options.pm +++ b/lib/App/MPD/Feeder/Options.pm @@ -18,6 +18,8 @@ has $min_album_interval :reader = parse_duration('5h'); has $min_song_interval :reader = parse_duration('13d'); has $min_artist_interval :reader = parse_duration('1h 15m'); has $skip_db_update :reader = 0; +has $initial_reconnect_delay :reader = parse_duration('3 seconds'); +has $max_reconnect_delay :reader = parse_duration('2 minutes'); method parse_command_line { Getopt::Long::GetOptions( @@ -63,6 +65,9 @@ method dump { say "[mpd]"; say "host = " . ( $mpd_host // '' ); say "port = " . ( $mpd_port // '' ); + say "initial-reconnect-delay = " + . duration_exact($initial_reconnect_delay); + say "max-reconnect-delay = " . duration_exact($max_reconnect_delay); say ""; say "[queue]"; say "target-length = $target_queue_length"; @@ -89,6 +94,14 @@ method parse_config_file($path) { handle_config_option( $ini => mpd => host => \$mpd_host ); handle_config_option( $ini => mpd => port => \$mpd_port ); + handle_config_option( + $ini => mpd => 'initial-reconnect-delay' => \$initial_reconnect_delay, + \&parse_duration + ); + handle_config_option( + $ini => mpd => 'max-reconnect-delay' => \$max_reconnect_delay, + \&parse_duration + ); handle_config_option( $ini => 'mpd-feeder' => log_level => \$log_level ); -- 2.39.5