]> git.ktnx.net Git - mpd-feeder.git/blob - lib/App/MPD/Feeder.pm
b6c82033a2f11cfe9cdde3208aa092f35dae8f60
[mpd-feeder.git] / lib / App / MPD / Feeder.pm
1 use v5.28;
2 use utf8;
3 use Object::Pad;
4 class App::MPD::Feeder;
5
6 use App::MPD::Feeder::DB;
7 use App::MPD::Feeder::Options;
8 use DBD::Pg;
9 use DBI;
10 use Getopt::Long;
11 use IO::Async::Signal;
12 use IO::Async::Timer::Countdown;
13 use IO::Async::Timer::Periodic;
14 use Log::Any qw($log);
15 use Net::Async::MPD;
16 use Object::Pad;
17 use Syntax::Keyword::Try;
18 use Time::Duration qw(duration_exact);
19
20 has $cfg_file :reader;
21 has $opt :reader;
22 has $db :reader;
23 has $db_needs_update :writer = 1;
24 has $mpd :reader;
25 has $mpd_connected = 0;
26 has $playlist_needs_filling = 1;
27 has $quit_requested = 0;
28 has $reload_requested = 0;
29 has $idler;
30 has $last_mpd_comm;
31 has $reconnect_delay = 5;
32
33 use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf';
34
35 ADJUST {
36     Getopt::Long::Configure('pass_through');
37     Getopt::Long::GetOptions( 'cfg|config=s' => \$cfg_file );
38     Getopt::Long::Configure('no_pass_through');
39
40     $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE;
41
42     $self->configure;
43
44     $db_needs_update = 0 if $opt->skip_db_update;
45 }
46
47 method configure {
48     my $new_opt = App::MPD::Feeder::Options->new;
49
50     $new_opt->parse_config_file($cfg_file) if $cfg_file;
51
52     $new_opt->parse_command_line;
53
54     Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level );
55
56     $opt = $new_opt;
57
58     $reconnect_delay = $opt->initial_reconnect_delay;
59
60     $db = App::MPD::Feeder::DB->new( opt => $opt );
61 }
62
63 method init_mpd {
64     return if $mpd;
65
66     my %conn = ( auto_connect => 0 );
67     $conn{host} = $opt->mpd_host if $opt->mpd_host;
68     $conn{port} = $opt->mpd_port if $opt->mpd_port;
69
70     $mpd = Net::Async::MPD->new(%conn);
71
72     $mpd->on(
73         close => sub {
74             $log->warn("Connection to MPD lost");
75             $mpd->loop->stop('disconnected');
76             $mpd_connected = 0;
77             $idler->cancel if $idler;
78             undef $mpd;
79             $self->init_mpd;
80         }
81     );
82     $mpd->on(
83         playlist => sub {
84             $playlist_needs_filling = 1;
85         }
86     );
87     $mpd->on(
88         database => sub {
89             $db_needs_update = 1;
90         }
91     );
92
93     my $loop = $mpd->loop;
94
95     my $int_signal_handler = sub {
96         state $signal_count = 0;
97         $signal_count++;
98
99         if ( $signal_count > 1 ) {
100             $log->warn("Another signal received (#$signal_count)");
101             $log->warn("Exiting abruptly");
102             exit 2;
103         }
104
105         $log->debug("Signal received. Stopping loop");
106         $quit_requested = 1;
107         $loop->stop('quit');
108         $self->break_idle;
109     };
110
111     for (qw(TERM INT)) {
112         $loop->add(
113             IO::Async::Signal->new(
114                 name       => $_,
115                 on_receipt => $int_signal_handler,
116             )
117         );
118     }
119
120     $loop->add(
121         IO::Async::Signal->new(
122             name       => 'HUP',
123             on_receipt => sub {
124                 $log->debug("SIGHUP received. Scheduling reload");
125                 $reload_requested = 1;
126                 $loop->stop('reload');
127                 $self->break_idle;
128             },
129         )
130     );
131
132     $loop->add(
133         IO::Async::Signal->new(
134             name       => 'USR1',
135             on_receipt => sub {
136                 $log->debug(
137                     "SIGUSR1 received. Dumping configuration to STDERR");
138                 my $old = select \*STDERR;
139                 try {
140                     $opt->dump;
141                 }
142                 finally {
143                     select $old;
144                 }
145             },
146         )
147     );
148 }
149
150 method connect_db {
151     $db->connect($opt);
152 }
153
154 method update_db( $force = undef ) {
155     if ( !$db_needs_update and !$force ) {
156         $log->debug("Skipping DB update");
157         return;
158     }
159
160     $log->info('Updating song database');
161
162     my $rows = $mpd->send('listallinfo')->get;
163
164     $log->trace('got all songs from MPD');
165
166     $db->start_update;
167     try {
168         my $song_count;
169
170         foreach my $entry (@$rows) {
171             next unless exists $entry->{file};
172
173             $self->db->store_song( $entry->{file},
174                 $entry->{AlbumArtist} // $entry->{Artist},
175                 $entry->{Album} );
176
177             $song_count++;
178         }
179
180         my ($total_songs, $total_artists, $total_albums,
181             $new_songs,   $new_artists,   $new_albums
182         ) = $self->db->finish_update;
183
184         $log->notice(
185             "Updated data about $song_count songs (including $new_songs new), "
186                 . "$total_artists artists (including $new_artists new) "
187
188                 . "and $total_albums albums (including $new_albums new)"
189         );
190
191         $db_needs_update = 0;
192     }
193     catch {
194         my $err = $@;
195         $self->db->cancel_update;
196         die $err;
197     }
198 }
199
200 method queue_songs( $num = undef ) {
201     if ( !defined $num ) {
202         return unless $playlist_needs_filling;
203
204         $log->trace("Requesting playlist");
205         my $present = $mpd->send('playlist')->get // [];
206         $present = scalar(@$present);
207
208         if ( $present < $opt->target_queue_length ) {
209             $log->notice( "Playlist contains $present songs. Wanted: "
210                     . $opt->target_queue_length );
211             $self->queue_songs( $opt->target_queue_length - $present );
212         }
213         else {
214             $log->info("Playlist contains $present songs");
215             $playlist_needs_filling = 0;
216         }
217
218         return;
219     }
220
221     my @list = $self->db->find_suitable_songs($num);
222
223     die "Found no suitable songs" unless @list;
224
225     if ( @list < $num ) {
226         $log->warn(
227             sprintf(
228                 'Found only %d suitable songs instead of %d',
229                 scalar(@list), $num
230             )
231         );
232     }
233
234     $log->debug("About to add $num songs to the playlist");
235
236     my @paths;
237     for my $song (@list) {
238         my $path = $song->{song};
239         $path =~ s/"/\\"/g;
240         push @paths, $path;
241     }
242
243     $log->notice( "Adding " . join( ', ', map {"«$_»"} @paths ) );
244
245     # MPD needs raw bytes
246     utf8::encode($_) for @paths;
247     my @commands;
248     for (@paths) {
249         push @commands, [ add => "\"$_\"" ];
250     }
251     my $f = $mpd->send( \@commands );
252     $f->on_fail( sub { die @_ } );
253     $f->on_done(
254         sub {
255             $self->db->note_song_qeued($_) for @list;
256             $playlist_needs_filling = 0;
257         }
258     );
259     $f->get;
260 }
261
262 method reexec {
263     $log->info("disconnecting and re-starting");
264     $db->disconnect;
265     undef $mpd;
266
267     my @exec = ( $0, '--config', $self->cfg_file, '--skip-db-update' );
268     if ( $log->is_trace ) {
269         $log->trace( 'exec ' . join( ' ', map { /\s/ ? "'$_'" : $_ } @exec ) );
270     }
271     exec(@exec);
272 }
273
274 method break_idle {
275     if ( $idler && !$idler->is_ready ) {
276         if ($mpd_connected) {
277             $log->trace("hand-sending 'noidle'");
278             undef $idler;
279             $mpd->{mpd_handle}->write("noidle\n");
280         }
281         else {
282             $log->trace("not connected to MPD: skipping 'noidle'");
283         }
284     }
285     else {
286         $log->trace("no idler found");
287     }
288 }
289
290 method sleep_before_reconnection {
291     $log->debug( "Waiting for "
292             . duration_exact($reconnect_delay)
293             . " before re-connecting" );
294
295     $mpd->loop->add(
296         IO::Async::Timer::Countdown->new(
297             delay     => $reconnect_delay,
298             on_expire => sub { $mpd->loop->stop },
299         )->start
300     );
301
302     $reconnect_delay = $reconnect_delay * 1.5;
303     $reconnect_delay = 120 if $reconnect_delay > 120;
304     $mpd->loop->run;
305 }
306
307 method pulse {
308     unless ($mpd_connected) {
309         $log->trace("Connecting to MPD...");
310         my $f = $mpd->connect->await;
311
312         if ( $f->is_done ) {
313             $mpd_connected          = 1;
314             $playlist_needs_filling = 1;
315             $reconnect_delay        = $opt->initial_reconnect_delay;
316             $mpd->loop->later( sub { $self->pulse } );
317         }
318         elsif ( $f->is_failed ) {
319             $mpd->loop->stop('disconnected');
320             $log->warn($f->failure);
321             $self->sleep_before_reconnection;
322         }
323         else {
324             die "connect Future neither done nor failed!?";
325         }
326
327         return;
328     }
329
330     if ($db_needs_update) {
331         $self->update_db;
332         $mpd->loop->later( sub { $self->pulse } );
333         return;
334     }
335
336     if ($playlist_needs_filling) {
337         $self->queue_songs;
338         $mpd->loop->later( sub { $self->pulse } );
339         return;
340     }
341
342     $log->debug("Waiting idle. PID=$$");
343     $last_mpd_comm = time;
344     $idler         = $mpd->send("idle database playlist");
345     $idler->await;
346
347     $log->trace('got out of idle');
348
349     if ($idler) {
350         if ( $idler->is_done ) {
351             my $result = $idler->get;
352             undef $idler;
353             if ( ref $result and $result->{changed} ) {
354                 my $changed = $result->{changed};
355                 $changed = [$changed] unless ref $changed;
356
357                 $mpd->emit($_) for @$changed;
358             }
359         }
360         elsif ( $idler->is_cancelled ) {
361             $log->trace("idle was cancelled");
362             undef $idler;
363         }
364         elsif ( $idler->is_failed ) {
365             $log->warn("idle failed: ".$idler->failure);
366             undef $idler;
367         }
368     }
369
370     $mpd->loop->stop;
371 }
372
373 method run_loop {
374     $self->connect_db;
375
376     $self->init_mpd;
377
378     my $loop = $mpd->loop;
379
380     $loop->add(
381         IO::Async::Timer::Periodic->new(
382             interval => 60,
383             on_tick  => sub {
384                 if (!$mpd_connected) {
385                     $log->trace("Not connected to MPD. Skipping alive check.");
386                     $loop->stop('disconnected');
387                     return;
388                 }
389
390                 if ( time - $last_mpd_comm > 300 ) {
391
392                     $log->trace(
393                         "no active MPD communication for more that 5 minutes");
394                     $log->debug("forcing alive check");
395                     $self->break_idle;
396                 }
397                 else {
398                     $log->trace(
399                         "contacted MPD less than 5 minutes ago. skipping alive check"
400                     );
401                 }
402             },
403         )->start
404     );
405
406     for ( ;; ) {
407         if ( $quit_requested ) {
408             $log->trace("about to quit");
409             undef $mpd;
410             $db->disconnect;
411             last;
412         }
413         elsif ( $reload_requested ) {
414             $self->reexec;
415             die "Not reached";
416         }
417
418         $log->trace('About to run the loop');
419
420         $mpd->loop->later( sub { $self->pulse } );
421
422         $mpd->loop->run;
423     }
424 }
425
426 1;