File Coverage

blib/lib/Finance/BitStamp/Socket.pm
Criterion Covered Total %
statement 75 198 37.8
branch 0 32 0.0
condition 0 3 0.0
subroutine 25 58 43.1
pod 0 28 0.0
total 100 319 31.3


line stmt bran cond sub pod time code
1             package Finance::BitStamp::Socket;
2              
3 1     1   25222 use 5.014002;
  1         4  
  1         39  
4 1     1   6 use strict;
  1         2  
  1         35  
5 1     1   5 use warnings;
  1         15  
  1         51  
6             our $VERSION = '0.01';
7              
8 1     1   987 use AnyEvent::Socket;
  1         47952  
  1         156  
9 1     1   1313 use AnyEvent::Handle;
  1         18191  
  1         43  
10 1     1   1026 use Protocol::WebSocket::Handshake::Client;
  1         50422  
  1         37  
11 1     1   10 use Protocol::WebSocket::Frame;
  1         2  
  1         19  
12              
13 1     1   1119 use JSON;
  1         18435  
  1         7  
14 1     1   1126 use URI;
  1         7578  
  1         36  
15 1     1   1135 use Data::Dumper;
  1         7831  
  1         78  
16              
17 1     1   11 use constant VERBOSE => 1;
  1         2  
  1         54  
18 1     1   6 use constant DEBUG => 0;
  1         2  
  1         44  
19              
20 1         60 use constant CHANNELS => qw(
21             order_book
22             live_trades
23 1     1   5 );
  1         1  
24              
25 1     1   48 use constant ATTRIBUTES => qw(protocol app_key channels ssl);
  1         2  
  1         49  
26 1     1   5 use constant PROTOCOL => 6;
  1         2  
  1         171  
27 1     1   5 use constant APP_KEY => 'de504dc5763aeef9ff52';
  1         3  
  1         41  
28              
29             # TODO: test SSL. This didnt seem to work for me last time I set it...
30 1     1   5 use constant SSL => 0;
  1         2  
  1         38  
31              
32             # cleartext...
33 1     1   5 use constant PORT => 80;
  1         2  
  1         44  
34 1     1   5 use constant SCHEME => 'ws';
  1         2  
  1         38  
35 1     1   4 use constant TLS => undef;
  1         2  
  1         53  
36 1     1   6 use constant HOST => 'ws.pusherapp.com';
  1         1  
  1         40  
37              
38             # ssl...
39 1     1   5 use constant SSL_PORT => 443;
  1         2  
  1         57  
40 1     1   6 use constant SSL_SCHEME => 'wws';
  1         1  
  1         38  
41 1     1   5 use constant SSL_TLS => 'connect';
  1         3  
  1         56  
42 1     1   6 use constant SSL_HOST => 'wws.pusherapp.com';
  1         2  
  1         2264  
43              
44             # THESE two methods: trade() and order_book() are the main methods you will want to move and rewrite into your own module.
45             # within these subroutines you will have access to the json response in a hash format.
46             sub trade {
47 0     0 0   my $self = shift;
48 0           my $data = shift;
49 0           warn Data::Dumper->Dump([$data]);
50 0           warn "*** I am the default trade()... you should overwride this method in your own package\n";
51             }
52              
53             sub order_book {
54 0     0 0   my $self = shift;
55 0           my $data = shift;
56 0           warn Data::Dumper->Dump([$data]);
57 0           warn "** I am the default order_book()... you should overwride this method in your own package\n";
58             }
59             # end the methods you should definately override.
60              
61              
62             # This module is meant to be used as a base for your own module.
63             # Your own module will decide what to do with the incoming message through the
64             # trade() and order_book() routines.
65             #
66             # You should look at "test.pl" to see a basic example.
67              
68 0     0 0   sub new { (bless {} => shift)->init(@_) }
69              
70             sub init {
71 0     0 0   my $self = shift;
72 0           my %args = @_;
73 0           foreach my $attribute ($self->attributes) {
74 0 0         $self->$attribute($args{$attribute}) if exists $args{$attribute};
75             }
76 0           return $self;
77             }
78              
79             sub setup {
80 0     0 0   my $self = shift;
81 0 0         $self->channels([CHANNELS]) unless $self->channels;
82 0 0         $self->protocol( PROTOCOL ) unless $self->protocol;
83 0 0         $self->app_key ( APP_KEY ) unless $self->app_key;
84 0 0         $self->ssl ( SSL ) unless $self->ssl;
85             }
86              
87             sub go {
88 0     0 0   my $self = shift;
89 0           $self->setup;
90 0           $self->handle;
91 0           $self->wait;
92             }
93              
94             sub handle {
95 0     0 0   my $self = shift;
96 0           $self->client(Protocol::WebSocket::Handshake::Client->new(url => $self->uri->as_string));
97 0           $self->frame(Protocol::WebSocket::Frame->new);
98 0           $self->{handle} = AnyEvent::Handle->new(
99             connect => [$self->host, $self->port],
100             tls => $self->tls,
101             tls_ctx => {verify => 0},
102             keepalive => 1,
103             wtimeout => 50,
104             on_connect => $self->on_connect,
105             on_read => $self->on_read,
106             on_wtimeout => $self->on_wtimeout,
107             on_error => $self->on_error,
108             on_eof => $self->on_eof,
109             );
110             }
111              
112             sub on_read {
113 0     0 0   my $self = shift;
114             return sub {
115 0     0     my $handle = shift;
116 0           my $chunk = $handle->{rbuf};
117 0           $handle->{rbuf} = undef;
118 0 0         if (!$self->client->is_done) {
119 0           $self->client->parse($chunk);
120             }
121              
122 0           $self->frame->append($chunk);
123 0 0         if ($self->frame->is_ping()) {
124 0           $handle->push_write(
125             $self->frame->new(buffer => '', type => 'pong')->to_bytes
126             );
127             }
128 0           while (my $msg = $self->frame->next) {
129 0           my $d;
130             eval {
131 0           $d = $self->json->decode($msg);
132 0 0         } or do {
133 0           my $e = $@;
134 0           warn $self->now . ' - error: ' . $e;
135 0           next;
136             };
137 0           given ($d->{event}) {
138              
139 0           when ('pusher:connection_established') {
140 0           say $self->now . ' - subscribing to events' if VERBOSE;
141 0           foreach my $channel (@{$self->channels}) {
  0            
142 0           say $self->now . ' - requesting channel: ' . $channel if VERBOSE;
143 0           $handle->push_write(
144             $self->frame->new($self->json->encode({
145             event => 'pusher:subscribe',
146             data => {
147             channel => $channel,
148             },
149             }))->to_bytes
150             );
151             }
152             }
153 0           when ('pusher_internal:subscription_succeeded') {
154 0           printf("%s - subscribed to channel: %s\n", $self->now, $d->{channel}) if VERBOSE;
155             }
156              
157              
158 0           when ('trade') {
159 0           printf("%s - got %s request on channel: %s\n", $self->now, @{$d}{qw(event channel)}) if VERBOSE;
  0            
160 0 0         if ($d->{channel} eq 'live_trades') {
161 0           my $data = $self->json->decode($d->{data});
162 0           $self->trade($data);
163             }
164             else {
165 0           printf "%s - got event: %s", $self->now, Dumper $d if VERBOSE;
166             }
167             }
168 0           when ('data') {
169 0           printf("%s - got %s request on channel: %s\n", $self->now, @{$d}{qw(event channel)}) if VERBOSE;
  0            
170 0 0         if ($d->{channel} eq 'order_book') {
171 0           my $data = $self->json->decode($d->{data});
172 0           $self->order_book($data);
173             }
174             else {
175 0           printf '%s - got event: %s', $self->now, Dumper $d if VERBOSE;
176             }
177             }
178              
179 0           default {
180 0           printf '%s - got event: %s', $self->now, Dumper $d if VERBOSE;
181             }
182             }
183             }
184             }
185 0           }
186              
187             sub on_connect {
188 0     0 0   my $self = shift;
189             return sub {
190 0     0     my $handle = shift;
191 0           say $self->now . ' - connected to pusher' if VERBOSE;
192 0           $handle->push_write($self->client->to_string);
193             }
194 0           }
195              
196             sub on_wtimeout {
197 0     0 0   my $self = shift;
198             return sub {
199 0     0     my $handle = shift;
200 0           $handle->push_write(
201             $self->frame->new(buffer => '', type => 'ping')->to_bytes
202             );
203             }
204 0           }
205              
206             sub on_error {
207 0     0 0   my $self = shift;
208             return sub {
209 0     0     my ($handle, $fatal, $msg) = @_;
210 0           warn $self->now . " - fatal($fatal): $msg" if VERBOSE or DEBUG;
211 0           $handle->destroy;
212 0           $self->setup;
213             }
214 0           }
215              
216             sub on_eof {
217 0     0 0   my $self = shift;
218             return sub {
219 0     0     my $handle = shift;
220 0           warn $self->now . " - lost connection, reconnecting" if VERBOSE or DEBUG;
221 0           $self->setup;
222             }
223 0           }
224              
225 0     0 0   sub attributes { ATTRIBUTES }
226 0     0 0   sub wait { AnyEvent->condvar->wait }
227 0   0 0 0   sub json { shift->{json} ||= JSON->new }
228 0 0   0 0   sub host { shift->ssl ? SSL_HOST : HOST }
229 0 0   0 0   sub port { shift->ssl ? SSL_PORT : PORT }
230 0 0   0 0   sub tls { shift->ssl ? SSL_TLS : TLS }
231 0 0   0 0   sub scheme { shift->ssl ? SSL_SCHEME : SCHEME }
232 0     0 0   sub client { my $self = shift; $self->get_set(@_) }
  0            
233 0     0 0   sub frame { my $self = shift; $self->get_set(@_) }
  0            
234 0     0 0   sub channels { my $self = shift; $self->get_set(@_) }
  0            
235 0     0 0   sub protocol { my $self = shift; $self->get_set(@_) }
  0            
236 0     0 0   sub app_key { my $self = shift; $self->get_set(@_) }
  0            
237 0     0 0   sub ssl { my $self = shift; $self->get_set(@_) }
  0            
238 0     0 0   sub now { sprintf '%4d-%02d-%02d %02d:%02d:%02d', (localtime(time))[5] + 1900, (localtime(time))[4,3,2,1,0] }
239              
240             sub get_set {
241 0     0 0   my $self = shift;
242 0           my $attribute = ((caller(1))[3] =~ /::(\w+)$/)[0];
243 0 0         $self->{$attribute} = shift if scalar @_;
244 0           return $self->{$attribute};
245             }
246              
247             sub uri {
248 0     0 0   my $self = shift;
249 0 0         unless ($self->{uri}) {
250 0           my $uri = URI->new;
251 0           $uri->scheme('http');
252 0           $uri->host($self->host);
253 0           $uri->path(sprintf '/app/%s' => $self->app_key);
254 0           $uri->query_form(protocol => $self->protocol);
255 0           $uri->scheme($self->scheme);
256 0           $self->{uri} = $uri;
257             }
258 0           return $self->{uri};
259             }
260              
261              
262              
263             1;
264              
265             __END__