]> git.ktnx.net Git - mpd-feeder.git/blob - lib/App/MPD/Feeder.pm
working re-connection on disconnection
[mpd-feeder.git] / lib / App / MPD / Feeder.pm
1 use v5.28;
2 use utf8;
3 use Object::Pad;
4 class App::MPD::Feeder;
5
6 use App::MPD::Feeder::DB;
7 use App::MPD::Feeder::Options;
8 use App::MPD::Feeder::WorkQueue;
9 use DBD::Pg;
10 use DBI;
11 use Getopt::Long;
12 use IO::Async::Signal;
13 use IO::Async::Timer::Countdown;
14 use IO::Async::Timer::Periodic;
15 use Log::Any qw($log);
16 use Net::Async::MPD;
17 use Object::Pad;
18 use Syntax::Keyword::Try;
19 use Time::Duration qw(duration_exact);
20
21 has $cfg_file :reader;
22 has $opt :reader;
23 has $db :reader;
24 has $db_needs_update :writer = 1;
25 has $mpd :reader;
26 has $mpd_connected = 0;
27 has $playlist_needs_filling = 1;
28 has $quit_requested = 0;
29 has $reload_requested = 0;
30 has $idler;
31 has $last_mpd_comm;
32 has $reconnect_delay = 5;
33
34 use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf';
35
36 ADJUST {
37     Getopt::Long::Configure('pass_through');
38     Getopt::Long::GetOptions( 'cfg|config=s' => \$cfg_file );
39     Getopt::Long::Configure('no_pass_through');
40
41     $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE;
42
43     $self->configure;
44
45     $db_needs_update = 0 if $opt->skip_db_update;
46 }
47
48 method configure {
49     my $new_opt = App::MPD::Feeder::Options->new;
50
51     $new_opt->parse_config_file($cfg_file) if $cfg_file;
52
53     $new_opt->parse_command_line;
54
55     Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level );
56
57     $opt = $new_opt;
58
59     $reconnect_delay = $opt->initial_reconnect_delay;
60
61     $db = App::MPD::Feeder::DB->new( opt => $opt );
62 }
63
64 method init_mpd {
65     return if $mpd;
66
67     my %conn = ( auto_connect => 0 );
68     $conn{host} = $opt->mpd_host if $opt->mpd_host;
69     $conn{port} = $opt->mpd_port if $opt->mpd_port;
70
71     $mpd = Net::Async::MPD->new(%conn);
72
73     $mpd->on(
74         close => sub {
75             $log->warn("Connection to MPD lost");
76             $mpd->loop->stop('disconnected');
77             $mpd_connected = 0;
78             $idler->cancel if $idler;
79             undef $mpd;
80             $self->init_mpd;
81         }
82     );
83     $mpd->on(
84         playlist => sub {
85             $playlist_needs_filling = 1;
86         }
87     );
88     $mpd->on(
89         database => sub {
90             $db_needs_update = 1;
91         }
92     );
93
94     my $int_signal_handler = sub {
95         state $signal_count = 0;
96         $signal_count++;
97
98         if ( $signal_count > 1 ) {
99             $log->warn("Another signal received (#$signal_count)");
100             $log->warn("Exiting abruptly");
101             exit 2;
102         }
103
104         $log->debug("Signal received. Stopping loop");
105         $quit_requested = 1;
106         $mpd->loop->stop('quit');
107         $self->break_idle;
108     };
109
110     for (qw(TERM INT)) {
111         $mpd->loop->add(
112             IO::Async::Signal->new(
113                 name       => $_,
114                 on_receipt => $int_signal_handler,
115             )
116         );
117     }
118
119     $mpd->loop->add(
120         IO::Async::Signal->new(
121             name       => 'HUP',
122             on_receipt => sub {
123                 $log->debug("SIGHUP received. Scheduling reload");
124                 $reload_requested = 1;
125                 $mpd->loop->stop('reload');
126                 $self->break_idle;
127             },
128         )
129     );
130
131     $mpd->loop->add(
132         IO::Async::Signal->new(
133             name       => 'USR1',
134             on_receipt => sub {
135                 $log->debug(
136                     "SIGUSR1 received. Dumping configuration to STDERR");
137                 my $old = select \*STDERR;
138                 try {
139                     $opt->dump;
140                 }
141                 finally {
142                     select $old;
143                 }
144             },
145         )
146     );
147 }
148
149 method connect_db {
150     $db->connect($opt);
151 }
152
153 method update_db( $force = undef ) {
154     if ( !$db_needs_update and !$force ) {
155         $log->debug("Skipping DB update");
156         return;
157     }
158
159     $log->info('Updating song database');
160
161     my $rows = $mpd->send('listallinfo')->get;
162
163     $log->trace('got all songs from MPD');
164
165     $db->start_update;
166     try {
167         my $song_count;
168
169         foreach my $entry (@$rows) {
170             next unless exists $entry->{file};
171
172             $self->db->store_song( $entry->{file},
173                 $entry->{AlbumArtist} // $entry->{Artist},
174                 $entry->{Album} );
175
176             $song_count++;
177         }
178
179         my ($total_songs, $total_artists, $total_albums,
180             $new_songs,   $new_artists,   $new_albums
181         ) = $self->db->finish_update;
182
183         $log->info(
184             "Updated data about $song_count songs (including $new_songs new), "
185                 . "$total_artists artists (including $new_artists new) "
186
187                 . "and $total_albums albums (including $new_albums new)"
188         );
189
190         $db_needs_update = 0;
191     }
192     catch {
193         my $err = $@;
194         $self->db->cancel_update;
195         die $err;
196     }
197 }
198
199 method queue_songs( $num = undef ) {
200     if ( !defined $num ) {
201         return unless $playlist_needs_filling;
202
203         $log->trace("Requesting playlist");
204         my $present = $mpd->send('playlist')->get // [];
205         $present = scalar(@$present);
206
207         $log->notice( "Playlist contains $present songs. Wanted: "
208                 . $opt->target_queue_length );
209         if ( $present < $opt->target_queue_length ) {
210             $self->queue_songs( $opt->target_queue_length - $present );
211         }
212         else {
213             $playlist_needs_filling = 0;
214         }
215
216         return;
217     }
218
219     my @list = $self->db->find_suitable_songs($num);
220
221     die "Found no suitable songs" unless @list;
222
223     if ( @list < $num ) {
224         $log->warn(
225             sprintf(
226                 'Found only %d suitable songs instead of %d',
227                 scalar(@list), $num
228             )
229         );
230     }
231
232     $log->info("About to add $num songs to the playlist");
233
234     my @paths;
235     for my $song (@list) {
236         my $path = $song->{song};
237         $path =~ s/"/\\"/g;
238         push @paths, $path;
239     }
240
241     $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) );
242
243     # MPD needs raw bytes
244     utf8::encode($_) for @paths;
245     my @commands;
246     for (@paths) {
247         push @commands, [ add => "\"$_\"" ];
248     }
249     my $f = $mpd->send( \@commands );
250     $f->on_fail( sub { die @_ } );
251     $f->on_done(
252         sub {
253             $self->db->note_song_qeued($_) for @list;
254             $playlist_needs_filling = 0;
255         }
256     );
257     $f->get;
258 }
259
260 method reexec {
261     $log->notice("disconnecting and re-starting");
262     $db->disconnect;
263     undef $mpd;
264
265     my @exec = ( $0, '--config', $self->cfg_file, '--skip-db-update' );
266     if ( $log->is_trace ) {
267         $log->trace( 'exec ' . join( ' ', map { /\s/ ? "'$_'" : $_ } @exec ) );
268     }
269     exec(@exec);
270 }
271
272 method break_idle {
273     if ( $idler && !$idler->is_ready ) {
274         if ($mpd_connected) {
275             $log->trace("hand-sending 'noidle'");
276             undef $idler;
277             $mpd->{mpd_handle}->write("noidle\n");
278         }
279         else {
280             $log->trace("not connected to MPD: skipping 'noidle'");
281         }
282     }
283     else {
284         $log->trace("no idler found");
285     }
286 }
287
288 method sleep_before_reconnection {
289     $log->debug( "Waiting for "
290             . duration_exact($reconnect_delay)
291             . " before re-connecting" );
292
293     $mpd->loop->add(
294         IO::Async::Timer::Countdown->new(
295             delay     => $reconnect_delay,
296             on_expire => sub { $mpd->loop->stop },
297         )->start
298     );
299
300     $reconnect_delay = $reconnect_delay * 1.5;
301     $reconnect_delay = 120 if $reconnect_delay > 120;
302     $mpd->loop->run;
303 }
304
305 method pulse {
306     unless ($mpd_connected) {
307         $log->trace("Connecting to MPD...");
308         my $f = $mpd->connect->await;
309
310         if ( $f->is_done ) {
311             $mpd_connected          = 1;
312             $playlist_needs_filling = 1;
313             $reconnect_delay        = $opt->initial_reconnect_delay;
314             $mpd->loop->later( sub { $self->pulse } );
315         }
316         elsif ( $f->is_failed ) {
317             $mpd->loop->stop('disconnected');
318             $log->warn($f->failure);
319             $self->sleep_before_reconnection;
320         }
321         else {
322             die "connect Future neither done nor failed!?";
323         }
324
325         return;
326     }
327
328     if ($db_needs_update) {
329         $self->update_db;
330         $mpd->loop->later( sub { $self->pulse } );
331         return;
332     }
333
334     if ($playlist_needs_filling) {
335         $self->queue_songs;
336         $mpd->loop->later( sub { $self->pulse } );
337         return;
338     }
339
340     $log->debug("Waiting idle. PID=$$");
341     $last_mpd_comm = time;
342     $idler         = $mpd->send("idle database playlist");
343     $idler->await;
344
345     $log->trace('got out of idle');
346
347     if ( $idler->is_done ) {
348         my $result = $idler->get;
349         undef $idler;
350         if ( ref $result and $result->{changed} ) {
351             my $changed = $result->{changed};
352             $changed = [$changed] unless ref $changed;
353
354             $mpd->emit($_) for @$changed;
355         }
356     }
357     elsif ( $idler->is_cancelled ) {
358         $log->trace("idle was cancelled");
359         undef $idler;
360     }
361     elsif ( $idler->is_failed ) {
362         $log->warn("idle failed: ".$idler->failure);
363         undef $idler;
364     }
365
366     $mpd->loop->stop;
367 }
368
369 method run_loop {
370     $self->connect_db;
371
372     $self->init_mpd;
373
374     $mpd->loop->add(
375         IO::Async::Timer::Periodic->new(
376             interval => 60,
377             on_tick  => sub {
378                 if (!$mpd_connected) {
379                     $log->trace("Not connected to MPD. Skipping alive check.");
380                     return;
381                 }
382
383                 if ( time - $last_mpd_comm > 300 ) {
384
385                     $log->trace(
386                         "no active MPD communication for more that 5 minutes");
387                     $log->debug("forcing alive check");
388                     $self->break_idle;
389                 }
390                 else {
391                     $log->trace(
392                         "contacted MPD less than 5 minutes ago. skipping alive check"
393                     );
394                 }
395             },
396         )->start
397     );
398
399     for ( ;; ) {
400         if ( $quit_requested ) {
401             $log->trace("about to quit");
402             undef $mpd;
403             $db->disconnect;
404             last;
405         }
406         elsif ( $reload_requested ) {
407             $self->reexec;
408             die "Not reached";
409         }
410
411         $log->trace('About to run the loop');
412
413         $mpd->loop->later( sub { $self->pulse } );
414
415         $mpd->loop->run;
416     }
417 }
418
419 1;