]> git.ktnx.net Git - mpd-feeder.git/blob - lib/App/MPD/Feeder.pm
rework idling again, walking around Net::Async::MPD interface
[mpd-feeder.git] / lib / App / MPD / Feeder.pm
1 package App::MPD::Feeder;
2
3 use strict;
4 use warnings;
5 use utf8;
6 use feature 'state';
7
8 use App::MPD::Feeder::DB;
9 use App::MPD::Feeder::Options;
10 use App::MPD::Feeder::WorkQueue;
11 use DBD::Pg;
12 use DBI;
13 use Getopt::Long;
14 use IO::Async::Signal;
15 use IO::Async::Timer::Periodic;
16 use Log::Any qw($log);
17 use Net::Async::MPD;
18 use Object::Pad;
19 use Syntax::Keyword::Try;
20
21 class App::MPD::Feeder {
22     has $cfg_file :reader;
23     has $opt :reader;
24     has $db :reader;
25     has $db_needs_update :writer = 1;
26     has $mpd :reader;
27     has $idler;
28     has $work_queue = App::MPD::Feeder::WorkQueue->new;
29     has $last_mpd_comm;
30
31 use constant DEFAULT_CONFIG_FILE => '/etc/mpd-feeder/mpd-feeder.conf';
32
33     ADJUST {
34         Getopt::Long::Configure('pass_through');
35         Getopt::Long::GetOptions('cfg|config=s' => \$cfg_file);
36         Getopt::Long::Configure('no_pass_through');
37
38         $cfg_file //= DEFAULT_CONFIG_FILE if -e DEFAULT_CONFIG_FILE;
39
40         $self->configure;
41
42         $db_needs_update = 0 if $opt->skip_db_update;
43     }
44
45     method configure {
46         my $new_opt = App::MPD::Feeder::Options->new;
47
48         $new_opt->parse_config_file($cfg_file) if $cfg_file;
49
50         $new_opt->parse_command_line;
51
52         Log::Any::Adapter->set( Stderr => log_level => $new_opt->log_level );
53
54         $opt = $new_opt;
55
56         $db = App::MPD::Feeder::DB->new( opt => $opt );
57     }
58
59     method connect_mpd {
60         return if $mpd;
61
62         my %conn = ( auto_connect => 1 );
63         $conn{host} = $opt->mpd_host if $opt->mpd_host;
64         $conn{port} = $opt->mpd_port if $opt->mpd_port;
65
66         $mpd = Net::Async::MPD->new(%conn);
67
68         $mpd->on(
69             close => sub {
70                 die "Connection to MPD lost";
71             }
72         );
73         $mpd->on(
74             playlist => sub {
75                 $work_queue->add('playlist');
76             }
77         );
78         $mpd->on(
79             database => sub {
80                 $work_queue->add('database');
81             }
82         );
83
84         my $int_signal_handler = sub {
85             state $signal_count = 0;
86             $signal_count++;
87             $log->debug("Signal received. Stopping loop");
88             $work_queue->add('quit');
89             $self->break_idle;
90
91             if ( $signal_count > 1 ) {
92                 $log->warn("Another signal received (#$signal_count)");
93                 $log->warn("Exiting abruptly");
94                 exit 2;
95             }
96         };
97
98         for (qw(TERM INT)) {
99             $mpd->loop->add(
100                 IO::Async::Signal->new(
101                     name       => $_,
102                     on_receipt => $int_signal_handler,
103                 )
104             );
105         }
106
107         $mpd->loop->add(
108             IO::Async::Signal->new(
109                 name       => 'HUP',
110                 on_receipt => sub {
111                     $log->debug("SIGHUP received. Scheduling reload");
112                     $work_queue->add('reload');
113                     $self->break_idle;
114                 },
115             )
116         );
117
118         $mpd->loop->add(
119             IO::Async::Signal->new(
120                 name       => 'USR1',
121                 on_receipt => sub {
122                     $log->debug("SIGUSR1 received. Dumping configuration to STDERR");
123                     my $old = select \*STDERR;
124                     try {
125                         $opt->dump;
126                     }
127                     finally {
128                         select $old;
129                     }
130                 },
131             )
132         );
133     }
134
135     method connect_db {
136         $db->connect($opt);
137         $self->update_db;
138     }
139
140     method update_db($force = undef) {
141         if (!$db_needs_update and !$force) {
142             $log->debug("Skipping DB update");
143             return;
144         }
145
146         $log->info('Updating song database');
147         $self->connect_mpd;
148
149         my $rows = $mpd->send('listallinfo')->get;
150
151         $log->trace('got all songs from MPD');
152
153         $db->start_update;
154         try {
155             my $song_count;
156
157             foreach my $entry (@$rows) {
158                 next unless exists $entry->{file};
159
160                 $self->db->store_song( $entry->{file},
161                     $entry->{AlbumArtist} // $entry->{Artist},
162                     $entry->{Album} );
163
164                 $song_count++;
165             }
166
167             my ($total_songs, $total_artists, $total_albums,
168                 $new_songs,   $new_artists,   $new_albums
169             ) = $self->db->finish_update;
170
171             $log->info(
172                 "Updated data about $song_count songs (including $new_songs new), "
173                     . "$total_artists artists (including $new_artists new) "
174
175                     . "and $total_albums albums (including $new_albums new)"
176             );
177
178             $db_needs_update = 0;
179         }
180         catch {
181             my $err = $@;
182             $self->db->cancel_update;
183             die $err;
184         }
185     }
186
187     method queue_songs($num = undef) {
188         $self->connect_db;
189         if (!defined $num) {
190             $self->connect_mpd;
191             $log->trace("Requesting playlist");
192             my $present = $mpd->send('playlist')->get // [];
193             $present = scalar(@$present);
194
195             $log->notice( "Playlist contains $present songs. Wanted: "
196                     . $opt->target_queue_length );
197             if ( $present < $opt->target_queue_length ) {
198                 $self->queue_songs(
199                     $opt->target_queue_length - $present );
200             }
201
202             return;
203         }
204
205         my @list = $self->db->find_suitable_songs($num);
206
207         die "Found no suitable songs" unless @list;
208
209         if ( @list < $num ) {
210             $log->warn(
211                 sprintf(
212                     'Found only %d suitable songs instead of %d',
213                     scalar(@list), $num
214                 )
215             );
216         }
217
218         $log->info("About to add $num songs to the playlist");
219
220         my @paths;
221         for my $song (@list) {
222             my $path = $song->{song};
223             $path =~ s/"/\\"/g;
224             push @paths, $path;
225         }
226
227         $log->debug( "Adding " . join( ', ', map {"«$_»"} @paths ) );
228         # MPD needs raw bytes
229         utf8::encode($_) for @paths;
230         my @commands;
231         for (@paths) {
232             push @commands, [ add => "\"$_\"" ];
233         }
234         $self->connect_mpd;
235         my $f = $mpd->send( \@commands );
236         $f->on_fail( sub { die @_ } );
237         $f->on_done(
238             sub {
239                 $self->db->note_song_qeued($_) for @list;
240             }
241         );
242         $f->get;
243     }
244
245     method stop {
246         undef $mpd;
247
248         $db->disconnect;
249     }
250
251     method handle_work_queue {
252         while ( my $item = $work_queue->next ) {
253             if ( $item eq 'playlist' ) {
254                 $self->queue_songs;
255             }
256             elsif ( $item eq 'database' ) {
257                 $db_needs_update = 1;
258                 $self->update_db;
259             }
260             elsif ( $item eq 'reload' ) {
261                 $log->notice("disconnecting and re-starting");
262                 $self->stop;
263
264                 my @exec =
265                     ( $0, '--config', $self->cfg_file, '--skip-db-update' );
266                 if ( $log->is_trace ) {
267                     $log->trace( 'exec '
268                             . join( ' ', map { /\s/ ? "'$_'" : $_ } @exec ) );
269                 }
270                 exec(@exec);
271             }
272             elsif ( $item eq 'quit' ) {
273                 $log->trace("quitting");
274                 $self->stop;
275                 exit 0;
276             }
277             else {
278                 die "Unknown work queue item '$item'";
279             }
280         }
281     }
282
283     method break_idle {
284         if ($idler && !$idler->is_ready) {
285             $log->trace("hand-sending 'noidle'");
286             undef $idler;
287             $mpd->{mpd_handle}->write("noidle\n");;
288         }
289         else {
290             $log->trace("no idler found");
291         }
292     }
293
294     method run_loop {
295         $self->connect_mpd;
296         $self->connect_db;
297
298         $mpd->loop->add(
299             IO::Async::Timer::Periodic->new(
300                 interval => 60,
301                 on_tick  => sub {
302                     if ( time - $last_mpd_comm > 300 ) {
303
304                         $log->trace(
305                             "no active MPD communication for more that 5 minutes"
306                         );
307                         $log->trace("forcing alive check");
308                         $self->break_idle;
309                     }
310                     else {
311                         $log->trace("contacted MPD less than 5 minutes ago. skipping alive check");
312                     }
313                 },
314             )->start
315         );
316
317         $self->queue_songs;
318
319         for ( ;; ) {
320             $log->debug("Waiting idle. PID=$$");
321             $last_mpd_comm = time;
322             $idler = $mpd->send("idle database playlist");
323             my $result = $idler->get;
324             undef $idler;
325
326             if ($result and $result->{changed}){
327                 my $changed = $result->{changed};
328                 $changed = [ $changed ] unless ref $changed;
329
330                 $mpd->emit($_) for @$changed;
331             }
332
333             $log->trace('got out of idle');
334
335             $self->handle_work_queue;
336         }
337     }
338 }
339