File Coverage

blib/lib/Net/Async/MPD.pm
Criterion Covered Total %
statement 43 169 25.4
branch 1 62 1.6
condition 0 12 0.0
subroutine 14 36 38.8
pod 7 8 87.5
total 65 287 22.6


line stmt bran cond sub pod time code
1             package Net::Async::MPD;
2              
3 2     2   431787 use strict;
  2         19  
  2         58  
4 2     2   10 use warnings;
  2         4  
  2         74  
5              
6             our $VERSION = '0.005';
7              
8 2     2   725 use IO::Async::Loop;
  2         45596  
  2         57  
9 2     2   1157 use IO::Async::Stream;
  2         74055  
  2         84  
10 2     2   16 use IO::Socket::IP;
  2         4  
  2         17  
11 2     2   1127 use Future::Utils qw( repeat );
  2         5  
  2         103  
12 2     2   23 use Scalar::Util qw( weaken );
  2         10  
  2         80  
13 2     2   11 use Carp;
  2         5  
  2         104  
14              
15 2     2   964 use namespace::clean;
  2         26391  
  2         13  
16              
17 2     2   4773 use Moo;
  2         6709  
  2         13  
18 2     2   3398 use MooX::HandlesVia;
  2         18821  
  2         14  
19             with 'Role::EventEmitter';
20              
21 2         25 use Types::Standard qw(
22             InstanceOf Int ArrayRef HashRef Str Maybe Bool CodeRef
23 2     2   858 );
  2         73647  
24              
25 2     2   4433 use Log::Any;
  2         16431  
  2         10  
26             my $log = Log::Any->get_logger( category => __PACKAGE__ );
27              
28             has auto_connect => (
29             is => 'ro',
30             isa => Bool,
31             default => 0,
32             );
33              
34             has state => (
35             is => 'rw',
36             isa => Str,
37             init_arg => undef,
38             default => 'created',
39             trigger => sub {
40             $_[0]->emit( state => $_[0]->{state} );
41             },
42             );
43              
44             has loop => (
45             is => 'ro',
46             lazy => 1,
47             default => sub { IO::Async::Loop->new },
48             );
49              
50             has read_queue => (
51             is => 'ro',
52             isa => ArrayRef [CodeRef],
53             lazy => 1,
54             init_arg => undef,
55             default => sub { [] },
56             handles_via => 'Array',
57             handles => {
58             push_read => 'push',
59             pop_read => 'pop',
60             shift_read => 'shift',
61             unshift_read => 'unshift',
62             },
63             );
64              
65             has password => (
66             is => 'ro',
67             isa => Maybe[Str],
68             lazy => 1,
69             );
70              
71             has port => (
72             is => 'ro',
73             isa => Int,
74             lazy => 1,
75             default => sub { $ENV{MPD_PORT} // 6600 },
76             );
77              
78             has host => (
79             is => 'ro',
80             isa => Str,
81             lazy => 1,
82             default => sub { $ENV{MPD_HOST} // 'localhost' },
83             );
84              
85 0     0 1 0 sub version { $_[0]->{version} };
86              
87             sub _parse_block {
88 0     0   0 my $self = shift;
89              
90             return sub {
91 0     0   0 my ( $handle, $buffref, $eof ) = @_;
92              
93 0         0 while ( $$buffref =~ s/^(.*)\n// ) {
94 0         0 my $line = $1;
95              
96 0 0       0 if ($line =~ /\w/) {
97 0         0 $log->tracef('< %s', $line);
98 0 0       0 if ($line =~ /^OK/) {
    0          
    0          
99 0 0       0 if ($line =~ /OK MPD (.*)/) {
100 0         0 $log->trace('Connection established');
101 0         0 $self->{version} = $1;
102              
103 0 0       0 $self->send( password => $self->password )
104             if $self->password;
105              
106 0         0 $self->state( 'ready' );
107             }
108             else {
109 0 0       0 pop @{$self->{mpd_buffer}} unless @{$self->{mpd_buffer}[-1]};
  0         0  
  0         0  
110 0         0 $self->shift_read->( 1, $self->{mpd_buffer} );
111 0         0 $self->{mpd_buffer} = [[]];
112             }
113             }
114             elsif ($line =~ /^list_OK/) {
115 0         0 push @{$self->{mpd_buffer}}, [];
  0         0  
116             }
117             elsif ($line =~ /^ACK/) {
118 0         0 $self->shift_read->( 0, $line );
119 0         0 $self->{mpd_buffer} = [[]];
120 0         0 last;
121             }
122             else {
123 0         0 push @{$self->{mpd_buffer}[-1]}, $line;
  0         0  
124             }
125             }
126             }
127              
128 0         0 return 0;
129 0         0 };
130             }
131              
132             # Set up response parsers for each command
133             my $parsers = { none => sub { @_ } };
134             {
135             my $item = sub {
136             return {
137             map {
138             my ($key, $value) = split /: /, $_, 2;
139             $key => $value;
140             } @{$_[0]}
141             };
142             };
143              
144             my $flat_list = sub { [ map { (split /: /, $_, 2)[1] } @{$_[0]} ] };
145              
146             my $base_list = sub {
147             my @main_keys = @{shift()};
148             my @list_keys = @{shift()};
149             my @lines = @{shift()};
150              
151             my @return;
152             my $item = {};
153              
154             foreach my $line (@lines) {
155             my ($key, $value) = split /: /, $line, 2;
156              
157             if ( grep { /$key/ } @main_keys ) {
158             push @return, $item if defined $item->{$key};
159             $item = { $key => $value };
160             }
161             elsif ( grep { /$key/ } @list_keys ) {
162             unless (defined $item->{$key}) {
163             $item->{$key} = []
164             }
165             push @{$item->{$key}}, $value;
166             }
167             else {
168             $item->{$key} = $value;
169             }
170             }
171             push @return, $item if keys %{$item};
172              
173             return \@return;
174             };
175              
176             my $grouped_list = sub {
177             my @lines = @{shift()};
178              
179             # Our main category comes at the top of the list of lines
180             my ($main) = split /:\s+/, $lines[0], 2;
181              
182             # Make a list of any other categories we might have
183             my @categories;
184             foreach (@lines) {
185             my ($key) = split /:\s+/, $_, 2;
186             if ($key eq $main) {
187             last if @categories;
188             }
189             else {
190             push @categories, $key;
191             };
192             }
193              
194             my $return = {};
195              
196             while (@lines) {
197             # Generate a has with all the data returned for a single item
198             # This will be over several lines if we are grouping
199             my $item = do {
200             my $set;
201             my %missing_keys = map { $_ => 1 } $main, @categories;
202              
203             while ( my $line = shift @lines ) {
204             my ($key, $value) = split /:\s+/, $line, 2;
205              
206             $set->{$key} = $value;
207             delete $missing_keys{$key};
208              
209             last unless %missing_keys;
210             }
211              
212             $set;
213             };
214              
215             # Find or create the array of results we need to push the data into
216             my $pointer = $return;
217             for my $category (@categories) {
218             my $value = $item->{$category} // '';
219             $pointer = $pointer->{$category}{$value} //= {};
220             }
221              
222             push @{ $pointer->{$main} //= [] }, delete $item->{$main};
223             }
224              
225             return $return;
226             };
227              
228             # Untested commands: what do they return?
229             # consume
230             # crossfade
231              
232             my $file_list = sub { $base_list->( [qw( directory file )], [], @_ ) };
233              
234             $parsers->{$_} = $flat_list foreach qw(
235             commands notcommands channels tagtypes urlhandlers listplaylist
236             );
237              
238             $parsers->{$_} = $item foreach qw(
239             currentsong stats idle status addid update
240             readcomments replay_gain_status rescan
241             );
242              
243             $parsers->{$_} = $file_list foreach qw(
244             find playlistinfo listallinfo search find playlistid playlistfind
245             listfiles plchanges listplaylistinfo playlistsearch listfind
246             );
247              
248             $parsers->{list} = $grouped_list;
249              
250             foreach (
251             [ outputs => [qw( outputid )], [] ],
252             [ plchangesposid => [qw( cpos )], [] ],
253             [ listplaylists => [qw( playlist )], [] ],
254             [ listmounts => [qw( mount )], [] ],
255             [ listneighbors => [qw( neighbor )], [] ],
256             [ listall => [qw( directory )], [qw( file )] ],
257             [ readmessages => [qw( channel )], [qw( message )] ],
258             [ lsinfo => [qw( directory file playlist )], [] ],
259             [ decoders => [qw( plugin )], [qw( suffix mime_type )] ],
260             ) {
261              
262             my ($cmd, $header, $list) = @{$_};
263             $parsers->{$cmd} = sub { $base_list->( $header, $list, @_ ) };
264             }
265              
266             $parsers->{playlist} = sub {
267             my $lines = [ map { s/^\w*?://; $_ } @{shift()} ];
268             $flat_list->( $lines, @_ )
269             };
270              
271             $parsers->{count} = sub {
272             my $lines = shift;
273             my ($main) = split /:\s+/, $lines->[0], 2;
274             $base_list->( [ $main ], [qw( )], $lines, @_ )
275             };
276              
277             $parsers->{sticker} = sub {
278             my $lines = shift;
279             return {} unless scalar @{$lines};
280              
281             my $single = ($lines->[0] !~ /^file/);
282              
283             my $base = $base_list->( [qw( file )], [qw( sticker )], $lines, @_ );
284             my $return = [ map {
285             $_->{sticker} = { map { split(/=/, $_, 2) } @{$_->{sticker}} }; $_;
286             } @{$base} ];
287              
288             return $single ? $return->[0] : $return;
289             };
290             }
291              
292             sub idle {
293 0     0 1 0 my ($self, @subsystems) = @_;
294              
295 0         0 $self->{idle_future} = $self->loop->new_future;
296              
297 0         0 weaken $self;
298             repeat {
299             $self->send( idle => @subsystems, sub {
300 0         0 $self->emit( shift->{changed} );
301 0     0   0 });
302 0     0   0 } until => sub { $self->{idle_future}->is_ready };
  0         0  
303              
304 0         0 return $self->{idle_future};
305             }
306              
307             sub noidle {
308 0     0 1 0 my ($self) = @_;
309              
310 0         0 my $idle = $self->{idle_future};
311 0 0       0 return unless defined $idle;
312 0 0       0 return if $idle->is_ready;
313              
314 0         0 $self->send( 'noidle' );
315 0         0 $idle->done;
316              
317 0         0 return;
318             }
319              
320             sub send {
321 0     0 1 0 my $self = shift;
322              
323 0 0       0 my $opt = ( ref $_[0] eq 'HASH' ) ? shift : {};
324 0 0       0 my $cb = pop if ref $_[-1] eq 'CODE';
325 0         0 my (@commands) = @_;
326              
327 0 0       0 croak 'Need commands to send'
328             unless @commands;
329              
330             # Normalise input
331 0 0       0 if (ref $commands[0] eq 'ARRAY') {
332             @commands = map {
333 0 0       0 ( ref $_ eq 'ARRAY' ) ? join( q{ }, @{$_} ) : $_;
  0         0  
334 0         0 } @{$commands[0]};
  0         0  
335             }
336             else {
337 0         0 @commands = join q{ }, @commands;
338             }
339              
340 0         0 my $command = '';
341             # Remove underscores from (most) command names
342             @commands = map {
343 0         0 my $args;
  0         0  
344 0         0 ($command, $args) = split /\s/, $_, 2;
345 0 0       0 $command =~ s/_//g unless $command =~ /^(replay_gain_|command_list)/;
346 0   0     0 $args //= q{};
347 0 0       0 $command . ($args ne q{} ? " $args" : q{});
348             } @commands;
349              
350             # Ensure a command list if sending multiple commands
351 0 0       0 if (scalar @commands > 1) {
352 0   0     0 my $list = $opt->{list} // 1;
353 0 0       0 my $list_start =
354             'command_list' . ( $list ? '_ok' : q{} ) . '_begin';
355              
356 0 0       0 unshift @commands, $list_start
357             unless $commands[0] =~ /^command_list/;
358 0 0       0 push @commands, 'command_list_end'
359             unless $commands[-1] =~ /^command_list/;
360             }
361              
362 0         0 my $parser;
363              
364 0 0       0 if (defined $opt->{parser}) {
365 0         0 my $input = delete $opt->{parser};
366 0 0       0 $parser = (ref $input eq 'CODE') ? $input : $parsers->{$input};
367 0 0       0 croak 'Not a code reference or recognised parser name'
368             unless defined $parser;
369             }
370             else {
371             $parser = sub {
372 0     0   0 my ($input, $commands) = @_;
373              
374             my @result = map {
375 0         0 my $command;
376 0   0     0 do { $command = shift @{$commands} }
  0         0  
  0         0  
377             until !defined $command or $command !~ /^command_list/;
378              
379 0   0     0 my $sub = $parsers->{$command // ''} // $parsers->{none};
      0        
380              
381 0         0 $sub->( $input->[$_] );
382 0         0 } 0 .. $#{$input};
  0         0  
383              
384             return @result
385 0         0 };
  0         0  
386             }
387              
388 0         0 my $future = $self->loop->new_future;
389 0 0       0 $future->on_done( $cb ) if $cb;
390              
391             return $future->fail('No connection to MPD server' )
392 0 0       0 unless $self->{mpd_handle};
393              
394             $self->push_read( sub {
395 0     0   0 my ($success, $result) = @_;
396              
397 0 0       0 if ($success) {
398             $future->done( $parser->(
399 0         0 $result, [ map { my ($name) = split /\s/, $_, 2 } @commands ]
  0         0  
400             ));
401             }
402             else {
403 0         0 $self->emit( error => $result );
404 0         0 $future->fail( $result );
405             }
406 0         0 });
407              
408 0         0 $log->tracef( '> %s', $_ ) foreach @commands;
409 0         0 $self->{mpd_handle}->write( join("\n", @commands) . "\n" );
410              
411 0         0 return $future;
412             }
413              
414 0     0 1 0 sub get { shift->send( @_ )->get }
415              
416             sub until {
417 0     0 1 0 my ($self, $name, $check, $cb) = @_;
418              
419 0         0 weaken $self;
420 0         0 my $wrapper;
421             $wrapper = sub {
422 0 0   0   0 if ($check->(@_)) {
423 0         0 $self->unsubscribe($name => $wrapper);
424 0         0 $cb->(@_);
425             }
426 0         0 };
427 0         0 $self->on($name => $wrapper);
428 0         0 weaken $wrapper;
429              
430 0         0 return $wrapper;
431             }
432              
433             sub BUILD {
434 1     1 0 5283 my ($self, $args) = @_;
435 1 50       6 $self->connect->get if $self->auto_connect;
436 1     0   9 $self->catch( sub {} );
437 1         20 $self->{mpd_buffer} = [[]];
438             }
439              
440             sub connect {
441 0     0 1   my ($self) = @_;
442 0           my $loop = $self->loop;
443 0           my $connected = $loop->new_future;
444              
445 0 0         return $connected->done if $self->state eq 'ready';
446              
447 0 0         my $socket = IO::Socket::IP->new($self->host . ':' . $self->port)
448             or return $connected->fail("MPD connect failed: $!");
449              
450 0           $log->debugf('Connecting to %s:%s', $self->host, $self->port);
451              
452 0     0     my $on_error = sub { $self->emit( error => shift ) };
  0            
453              
454             my $handle = IO::Async::Stream->new(
455             handle => $socket,
456 0     0     on_read_error => sub { $on_error->('Read error: ' . shift) },
457 0     0     on_write_error => sub { $on_error->('Write error: ' . shift) },
458 0     0     on_read_eof => sub { shift->close },
459             on_closed => sub {
460 0     0     $self->{mpd_handle} = undef;
461 0           $self->emit( 'close' );
462             },
463 0           on_read => $self->_parse_block,
464             );
465              
466 0 0         unless ($self->{mpd_handle}) {
467 0           $self->{mpd_handle} = $handle;
468 0           $loop->add( $handle );
469             }
470              
471             $self->until( state =>
472 0     0     sub { $_[1] eq 'ready' },
473 0     0     sub { $connected->done; }
474 0           );
475              
476 0           return $connected;
477             }
478              
479             1;
480              
481             __END__