File Coverage

lib/Finance/Alpaca/TradeStream.pm
Criterion Covered Total %
statement 54 69 78.2
branch 7 10 70.0
condition n/a
subroutine 11 13 84.6
pod 0 3 0.0
total 72 95 75.7


line stmt bran cond sub pod time code
1             package Finance::Alpaca::TradeStream 0.9904 {
2 17     17   115 use strictures 2;
  17         131  
  17         652  
3 17     17   3377 use Moo;
  17         30  
  17         96  
4 17     17   5069 use feature 'signatures';
  17         39  
  17         1347  
5 17     17   94 no warnings 'experimental::signatures';
  17         30  
  17         505  
6 17     17   79 use Types::Standard qw[ArrayRef CodeRef Dict Enum InstanceOf Str];
  17         26  
  17         140  
7 17     17   21236 use Mojo::Promise;
  17         30  
  17         183  
8             #
9 17     17   476 use lib './lib/';
  17         24  
  17         115  
10 17     17   8621 use Finance::Alpaca::Struct::TradeUpdate qw[to_TradeUpdate];
  17         49  
  17         172  
11             #
12             has tx => ( is => 'rwp', isa => InstanceOf ['Mojo::Transaction::WebSocket'], predicate => 1 );
13             has cb => ( is => 'ro', isa => CodeRef, required => 1 );
14             has subscriptions => ( is => 'rwp', isa => ArrayRef [Str], default => sub { [] }, lazy => 1 );
15              
16 1     1 0 34 sub authorize ( $s, $ua, $keys, $paper ) {
  1         3  
  1         2  
  1         2  
  1         3  
  1         2  
17              
18             #use Data::Dump;
19             #ddx $keys;
20 1         6 $ua->inactivity_timeout(120); # XXX - Testing!
21              
22             # warn(
23             # $paper ? 'wss://paper-api.alpaca.markets/stream' : 'wss://api.alpaca.markets/stream' );
24 1         3 $ua->websocket_p(
25             (
26             $paper
27             ? 'wss://paper-api.alpaca.markets/stream'
28             : 'wss://api.alpaca.markets/stream'
29             )
30             )->then(
31 1     1   2 sub ($tx) {
  1         355045  
32 1         9 my $promise = Mojo::Promise->new;
33 1         76 $s->_set_tx($tx);
34              
35             #$tx->on( finish => sub { $promise->resolve } );
36             # my $promise = Mojo::Promise->new;
37             #$tx->on( finish => sub { $promise->resolve } );
38             $tx->on(
39 0         0 finish => sub ( $tx, $code, $reason = '' ) {
40 0         0 warn "WebSocket closed with status $code. $reason";
41 0         0 $promise->resolve;
42             }
43 1         54 );
44 1         12 $tx->on( error => sub ( $e, $err ) { warn "This looks bad: $err" } );
  0         0  
  0         0  
45             $tx->on(
46 4         8 json => sub ( $tx, $msg = () ) {
47 4 100       32 if ( $msg->{stream} eq 'authorization' ) {
    100          
    50          
48 1 50       8 if ( $msg->{data}{status} eq 'authorized' ) {
49 1         7 $s->subscribe( streams => ['trade_updates'] );
50 1         397 $promise->resolve();
51             }
52             }
53             elsif ( $msg->{stream} eq 'listening' ) {
54 1         43 $s->_set_subscriptions( $msg->{data}{streams} );
55             }
56             elsif ( $msg->{stream} eq 'trade_updates' ) {
57 2         13 $s->cb->( to_TradeUpdate( $msg->{data} ) );
58             }
59             else {
60             #warn 'unknown data';
61             #...;
62 0         0 $s->cb->($msg);
63             }
64              
65             #$tx->finish;
66             }
67 1         11 );
68 1         17 $tx->send(
69             {
70             json => {
71             action => 'authenticate',
72             data => { key_id => $keys->[0], secret_key => $keys->[1] }
73             }
74             }
75             );
76 1         411 return $promise;
77             }
78 0         0 )->catch(
79 0     0   0 sub ($err) {
  0         0  
80 0         0 warn "WebSocket error: $err";
81             }
82 1 50       14 );
83             }
84              
85 1     1 0 3 sub subscribe ( $s, %params ) {
  1         3  
  1         4  
  1         2  
86 1         13 $s->tx->send( { json => { action => 'listen', data => {%params} } } );
87             }
88              
89 0     0 0   sub unsubscribe ( $s, %params ) { # XXX - Grep current list
  0            
  0            
  0            
90 0           $s->tx->send( { json => { action => 'listen', data => {%params} } } );
91             }
92             }
93             1;
94             __END__