]> git.ktnx.net Git - mpd-feeder.git/blob - lib/App/MPD/Feeder.pm
keep $loop variable at closure reach
[mpd-feeder.git] / lib / App / MPD / Feeder.pm
1 use v5.28;
2 use utf8;
3 use Object::Pad;
4 class App::MPD::Feeder;
5
6 use App::MPD::Feeder::DB;
7 use App::MPD::Feeder::Options;
8 use App::MPD::Feeder::WorkQueue;
9 use DBD::Pg;
10 use DBI;
11 use Getopt::Long;
12 use IO::Async::Signal;
13 use IO::Async::Timer::Countdown;
14 use IO::Async::Timer::Periodic;
15 use Log::Any qw($log);
16 use Net::Async::MPD;
17 use Object::Pad;
18 use Syntax::Keyword::Try;
19 use Time::Duration qw(duration_exact);
20
21 has $cfg_file :reader;
22 has $opt :reader;
23 has $db :reader;
24 has $db_needs_update :writer = 1;
25 has $mpd :reader;
26 has $mpd_connected = 0;
27 has $playlist_needs_filling = 1;
28 has $quit_requested = 0;
29 has $reload_requested = 0;
30 has $idler;
31 has $last_mpd_comm;
32 has $reconnect_delay = 5;
33
34 use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf';
35
36 ADJUST {
37     Getopt::Long::Configure('pass_through');
38     Getopt::Long::GetOptions( 'cfg|config=s' => \$cfg_file );
39     Getopt::Long::Configure('no_pass_through');
40
41     $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE;
42
43     $self->configure;
44
45     $db_needs_update = 0 if $opt->skip_db_update;
46 }
47
48 method configure {
49     my $new_opt = App::MPD::Feeder::Options->new;
50
51     $new_opt->parse_config_file($cfg_file) if $cfg_file;
52
53     $new_opt->parse_command_line;
54
55     Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level );
56
57     $opt = $new_opt;
58
59     $reconnect_delay = $opt->initial_reconnect_delay;
60
61     $db = App::MPD::Feeder::DB->new( opt => $opt );
62 }
63
64 method init_mpd {
65     return if $mpd;
66
67     my %conn = ( auto_connect => 0 );
68     $conn{host} = $opt->mpd_host if $opt->mpd_host;
69     $conn{port} = $opt->mpd_port if $opt->mpd_port;
70
71     $mpd = Net::Async::MPD->new(%conn);
72
73     $mpd->on(
74         close => sub {
75             $log->warn("Connection to MPD lost");
76             $mpd->loop->stop('disconnected');
77             $mpd_connected = 0;
78             $idler->cancel if $idler;
79             undef $mpd;
80             $self->init_mpd;
81         }
82     );
83     $mpd->on(
84         playlist => sub {
85             $playlist_needs_filling = 1;
86         }
87     );
88     $mpd->on(
89         database => sub {
90             $db_needs_update = 1;
91         }
92     );
93
94     my $loop = $mpd->loop;
95
96     my $int_signal_handler = sub {
97         state $signal_count = 0;
98         $signal_count++;
99
100         if ( $signal_count > 1 ) {
101             $log->warn("Another signal received (#$signal_count)");
102             $log->warn("Exiting abruptly");
103             exit 2;
104         }
105
106         $log->debug("Signal received. Stopping loop");
107         $quit_requested = 1;
108         $loop->stop('quit');
109         $self->break_idle;
110     };
111
112     for (qw(TERM INT)) {
113         $loop->add(
114             IO::Async::Signal->new(
115                 name       => $_,
116                 on_receipt => $int_signal_handler,
117             )
118         );
119     }
120
121     $loop->add(
122         IO::Async::Signal->new(
123             name       => 'HUP',
124             on_receipt => sub {
125                 $log->debug("SIGHUP received. Scheduling reload");
126                 $reload_requested = 1;
127                 $loop->stop('reload');
128                 $self->break_idle;
129             },
130         )
131     );
132
133     $loop->add(
134         IO::Async::Signal->new(
135             name       => 'USR1',
136             on_receipt => sub {
137                 $log->debug(
138                     "SIGUSR1 received. Dumping configuration to STDERR");
139                 my $old = select \*STDERR;
140                 try {
141                     $opt->dump;
142                 }
143                 finally {
144                     select $old;
145                 }
146             },
147         )
148     );
149 }
150
151 method connect_db {
152     $db->connect($opt);
153 }
154
155 method update_db( $force = undef ) {
156     if ( !$db_needs_update and !$force ) {
157         $log->debug("Skipping DB update");
158         return;
159     }
160
161     $log->info('Updating song database');
162
163     my $rows = $mpd->send('listallinfo')->get;
164
165     $log->trace('got all songs from MPD');
166
167     $db->start_update;
168     try {
169         my $song_count;
170
171         foreach my $entry (@$rows) {
172             next unless exists $entry->{file};
173
174             $self->db->store_song( $entry->{file},
175                 $entry->{AlbumArtist} // $entry->{Artist},
176                 $entry->{Album} );
177
178             $song_count++;
179         }
180
181         my ($total_songs, $total_artists, $total_albums,
182             $new_songs,   $new_artists,   $new_albums
183         ) = $self->db->finish_update;
184
185         $log->info(
186             "Updated data about $song_count songs (including $new_songs new), "
187                 . "$total_artists artists (including $new_artists new) "
188
189                 . "and $total_albums albums (including $new_albums new)"
190         );
191
192         $db_needs_update = 0;
193     }
194     catch {
195         my $err = $@;
196         $self->db->cancel_update;
197         die $err;
198     }
199 }
200
201 method queue_songs( $num = undef ) {
202     if ( !defined $num ) {
203         return unless $playlist_needs_filling;
204
205         $log->trace("Requesting playlist");
206         my $present = $mpd->send('playlist')->get // [];
207         $present = scalar(@$present);
208
209         $log->notice( "Playlist contains $present songs. Wanted: "
210                 . $opt->target_queue_length );
211         if ( $present < $opt->target_queue_length ) {
212             $self->queue_songs( $opt->target_queue_length - $present );
213         }
214         else {
215             $playlist_needs_filling = 0;
216         }
217
218         return;
219     }
220
221     my @list = $self->db->find_suitable_songs($num);
222
223     die "Found no suitable songs" unless @list;
224
225     if ( @list < $num ) {
226         $log->warn(
227             sprintf(
228                 'Found only %d suitable songs instead of %d',
229                 scalar(@list), $num
230             )
231         );
232     }
233
234     $log->info("About to add $num songs to the playlist");
235
236     my @paths;
237     for my $song (@list) {
238         my $path = $song->{song};
239         $path =~ s/"/\\"/g;
240         push @paths, $path;
241     }
242
243     $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) );
244
245     # MPD needs raw bytes
246     utf8::encode($_) for @paths;
247     my @commands;
248     for (@paths) {
249         push @commands, [ add => "\"$_\"" ];
250     }
251     my $f = $mpd->send( \@commands );
252     $f->on_fail( sub { die @_ } );
253     $f->on_done(
254         sub {
255             $self->db->note_song_qeued($_) for @list;
256             $playlist_needs_filling = 0;
257         }
258     );
259     $f->get;
260 }
261
262 method reexec {
263     $log->notice("disconnecting and re-starting");
264     $db->disconnect;
265     undef $mpd;
266
267     my @exec = ( $0, '--config', $self->cfg_file, '--skip-db-update' );
268     if ( $log->is_trace ) {
269         $log->trace( 'exec ' . join( ' ', map { /\s/ ? "'$_'" : $_ } @exec ) );
270     }
271     exec(@exec);
272 }
273
274 method break_idle {
275     if ( $idler && !$idler->is_ready ) {
276         if ($mpd_connected) {
277             $log->trace("hand-sending 'noidle'");
278             undef $idler;
279             $mpd->{mpd_handle}->write("noidle\n");
280         }
281         else {
282             $log->trace("not connected to MPD: skipping 'noidle'");
283         }
284     }
285     else {
286         $log->trace("no idler found");
287     }
288 }
289
290 method sleep_before_reconnection {
291     $log->debug( "Waiting for "
292             . duration_exact($reconnect_delay)
293             . " before re-connecting" );
294
295     $mpd->loop->add(
296         IO::Async::Timer::Countdown->new(
297             delay     => $reconnect_delay,
298             on_expire => sub { $mpd->loop->stop },
299         )->start
300     );
301
302     $reconnect_delay = $reconnect_delay * 1.5;
303     $reconnect_delay = 120 if $reconnect_delay > 120;
304     $mpd->loop->run;
305 }
306
307 method pulse {
308     unless ($mpd_connected) {
309         $log->trace("Connecting to MPD...");
310         my $f = $mpd->connect->await;
311
312         if ( $f->is_done ) {
313             $mpd_connected          = 1;
314             $playlist_needs_filling = 1;
315             $reconnect_delay        = $opt->initial_reconnect_delay;
316             $mpd->loop->later( sub { $self->pulse } );
317         }
318         elsif ( $f->is_failed ) {
319             $mpd->loop->stop('disconnected');
320             $log->warn($f->failure);
321             $self->sleep_before_reconnection;
322         }
323         else {
324             die "connect Future neither done nor failed!?";
325         }
326
327         return;
328     }
329
330     if ($db_needs_update) {
331         $self->update_db;
332         $mpd->loop->later( sub { $self->pulse } );
333         return;
334     }
335
336     if ($playlist_needs_filling) {
337         $self->queue_songs;
338         $mpd->loop->later( sub { $self->pulse } );
339         return;
340     }
341
342     $log->debug("Waiting idle. PID=$$");
343     $last_mpd_comm = time;
344     $idler         = $mpd->send("idle database playlist");
345     $idler->await;
346
347     $log->trace('got out of idle');
348
349     if ( $idler->is_done ) {
350         my $result = $idler->get;
351         undef $idler;
352         if ( ref $result and $result->{changed} ) {
353             my $changed = $result->{changed};
354             $changed = [$changed] unless ref $changed;
355
356             $mpd->emit($_) for @$changed;
357         }
358     }
359     elsif ( $idler->is_cancelled ) {
360         $log->trace("idle was cancelled");
361         undef $idler;
362     }
363     elsif ( $idler->is_failed ) {
364         $log->warn("idle failed: ".$idler->failure);
365         undef $idler;
366     }
367
368     $mpd->loop->stop;
369 }
370
371 method run_loop {
372     $self->connect_db;
373
374     $self->init_mpd;
375
376     my $loop = $mpd->loop;
377
378     $loop->add(
379         IO::Async::Timer::Periodic->new(
380             interval => 60,
381             on_tick  => sub {
382                 if (!$mpd_connected) {
383                     $log->trace("Not connected to MPD. Skipping alive check.");
384                     $loop->stop('disconnected');
385                     return;
386                 }
387
388                 if ( time - $last_mpd_comm > 300 ) {
389
390                     $log->trace(
391                         "no active MPD communication for more that 5 minutes");
392                     $log->debug("forcing alive check");
393                     $self->break_idle;
394                 }
395                 else {
396                     $log->trace(
397                         "contacted MPD less than 5 minutes ago. skipping alive check"
398                     );
399                 }
400             },
401         )->start
402     );
403
404     for ( ;; ) {
405         if ( $quit_requested ) {
406             $log->trace("about to quit");
407             undef $mpd;
408             $db->disconnect;
409             last;
410         }
411         elsif ( $reload_requested ) {
412             $self->reexec;
413             die "Not reached";
414         }
415
416         $log->trace('About to run the loop');
417
418         $mpd->loop->later( sub { $self->pulse } );
419
420         $mpd->loop->run;
421     }
422 }
423
424 1;