File Coverage

blib/lib/Finance/Bitcoin/Feed/Pusher.pm
Criterion Covered Total %
statement 81 193 41.9
branch 3 62 4.8
condition 0 13 0.0
subroutine 28 58 48.2
pod 0 30 0.0
total 112 356 31.4


line stmt bran cond sub pod time code
1             package Finance::Bitcoin::Feed::Pusher;
2              
3 1     1   324 use strict;
  1         1  
  1         24  
4 1     1   3 use warnings;
  1         1  
  1         77  
5             our $VERSION = '0.01';
6 1     1   4 use feature qw(say);
  1         1  
  1         92  
7              
8 1     1   451 use AnyEvent::Socket;
  1         23614  
  1         106  
9 1     1   674 use AnyEvent::Handle;
  1         5083  
  1         39  
10 1     1   483 use Protocol::WebSocket::Handshake::Client;
  1         60329  
  1         36  
11 1     1   8 use Protocol::WebSocket::Frame;
  1         2  
  1         19  
12              
13 1     1   655 use JSON;
  1         12773  
  1         4  
14 1     1   551 use URI;
  1         3787  
  1         32  
15 1     1   620 use Data::Dumper;
  1         5928  
  1         71  
16              
17 1         51 use constant CHANNELS => qw(
18             order_book
19             live_trades
20 1     1   7 );
  1         1  
21              
22 1     1   4 use constant ATTRIBUTES => qw(protocol app_key channels ssl);
  1         1  
  1         51  
23 1     1   6 use constant PROTOCOL => 6;
  1         1  
  1         59  
24 1     1   6 use constant APP_KEY => 'de504dc5763aeef9ff52';
  1         1  
  1         55  
25              
26             # TODO: test SSL. This didn't seem to work for me the last time I set it...
27 1     1   3 use constant SSL => 0;
  1         2  
  1         35  
28              
29             # cleartext...
30 1     1   4 use constant PORT => 80;
  1         1  
  1         27  
31 1     1   3 use constant SCHEME => 'ws';
  1         1  
  1         28  
32 1     1   4 use constant TLS => undef;
  1         2  
  1         53  
33 1     1   6 use constant HOST => 'ws.pusherapp.com';
  1         1  
  1         58  
34              
35             # ssl...
36 1     1   6 use constant SSL_PORT => 443;
  1         1  
  1         45  
37 1     1   4 use constant SSL_SCHEME => 'wws';
  1         1  
  1         40  
38 1     1   5 use constant SSL_TLS => 'connect';
  1         1  
  1         131  
39 1     1   6 use constant SSL_HOST => 'wws.pusherapp.com';
  1         2  
  1         2125  
40              
41             sub VERBOSE {
42 0   0 0 0 0 return $ENV{DEBUG} || 0;
43             }
44              
45             sub DEBUG {
46 0   0 0 0 0 return $ENV{DEBUG} || 0;
47             }
48              
49             # These two methods, trade() and order_book(), are the main methods you will want to move and rewrite into your own module.
50             # Within these subroutines you will have access to the JSON response in a hash format.
51             sub trade {
52 0     0 0 0 my $self = shift;
53 0         0 my $data = shift;
54 0         0 warn Data::Dumper->Dump([$data]);
55 0         0 warn "*** I am the default trade()... you should overwride this method in your own package\n";
56 0         0 return;
57             }
58              
59             sub order_book {
60 0     0 0 0 my $self = shift;
61 0         0 my $data = shift;
62 0         0 warn Data::Dumper->Dump([$data]);
63 0         0 warn "** I am the default order_book()... you should overwride this method in your own package\n";
64 0         0 return;
65             }
66              
67             # End of the methods you should definitely override.
68              
69             # This module is meant to be used as a base for your own module.
70             # Your own module will decide what to do with the incoming message through the
71             # trade() and order_book() routines.
72             #
73             # You should look at "test.pl" to see a basic example.
74              
75 1     1 0 11 sub new { return (bless {} => shift)->init(@_) }
76              
77             sub init {
78 1     1 0 3 my ($self, %args) = @_;
79 1         7 foreach my $attribute ($self->attributes) {
80 4 100       16 $self->$attribute($args{$attribute}) if exists $args{$attribute};
81             }
82 1         3 return $self;
83             }
84              
85             sub setup {
86 0     0 0 0 my $self = shift;
87 0 0       0 $self->channels([CHANNELS]) unless $self->channels;
88 0 0       0 $self->protocol(PROTOCOL) unless $self->protocol;
89 0 0       0 $self->app_key(APP_KEY) unless $self->app_key;
90 0 0       0 $self->ssl(SSL) unless $self->ssl;
91 0         0 return;
92             }
93              
94             sub go {
95 0     0 0 0 my $self = shift;
96 0         0 $self->setup;
97 0         0 $self->handle;
98 0         0 $self->just_wait;
99 0         0 return;
100             }
101              
102             sub handle {
103 0     0 0 0 my $self = shift;
104 0         0 $self->client(Protocol::WebSocket::Handshake::Client->new(url => $self->uri->as_string));
105 0         0 $self->frame(Protocol::WebSocket::Frame->new);
106 0         0 $self->{handle} = AnyEvent::Handle->new(
107             connect => [$self->host, $self->port],
108             tls => $self->tls,
109             tls_ctx => {verify => 0},
110             keepalive => 1,
111             wtimeout => 50,
112             on_connect => $self->on_connect,
113             on_read => $self->on_read,
114             on_wtimeout => $self->on_wtimeout,
115             on_error => $self->on_error,
116             on_eof => $self->on_eof,
117             );
118 0         0 return;
119             }
120              
121             sub on_read {
122 0     0 0 0 my $self = shift;
123             return sub {
124 0     0   0 my $handle = shift;
125 0         0 my $chunk = $handle->{rbuf};
126 0         0 $handle->{rbuf} = undef;
127 0 0       0 if (!$self->client->is_done) {
128 0         0 $self->client->parse($chunk);
129             }
130              
131 0         0 $self->frame->append($chunk);
132 0 0       0 if ($self->frame->is_ping()) {
133 0         0 $handle->push_write(
134             $self->frame->new(
135             buffer => '',
136             type => 'pong'
137             )->to_bytes
138             );
139             }
140 0         0 while (my $msg = $self->frame->next) {
141 0         0 my $d;
142 0 0       0 eval { $d = $self->json->decode($msg); } or do {
  0         0  
143 0         0 my $e = $@;
144 0         0 warn $self->now . ' - error: ' . $e;
145 0         0 next;
146             };
147              
148 0 0       0 if ($d->{event} eq 'pusher:connection_established') {
    0          
    0          
    0          
149 0 0       0 say $self->now . ' - subscribing to events' if VERBOSE;
150 0         0 foreach my $channel (@{$self->channels}) {
  0         0  
151 0 0       0 say $self->now . ' - requesting channel: ' . $channel
152             if VERBOSE;
153 0         0 $handle->push_write(
154             $self->frame->new(
155             $self->json->encode({
156             event => 'pusher:subscribe',
157             data => {
158             channel => $channel,
159             },
160             })
161             )->to_bytes
162             );
163             }
164             } elsif ($d->{event} eq 'pusher_internal:subscription_succeeded') {
165 0 0       0 printf("%s - subscribed to channel: %s\n", $self->now, $d->{channel})
166             if VERBOSE;
167             }
168              
169             elsif ($d->{event} eq 'trade') {
170 0 0       0 printf("%s - got %s request on channel: %s\n", $self->now, @{$d}{qw(event channel)})
  0         0  
171             if VERBOSE;
172 0 0       0 if ($d->{channel} eq 'live_trades') {
173 0         0 my $data = $self->json->decode($d->{data});
174 0         0 $self->trade($data);
175             } else {
176 0 0       0 printf "%s - got event: %s", $self->now, Dumper $d
177             if VERBOSE;
178             }
179             } elsif ($d->{event} eq 'data') {
180 0 0       0 printf("%s - got %s request on channel: %s\n", $self->now, @{$d}{qw(event channel)})
  0         0  
181             if VERBOSE;
182 0 0       0 if ($d->{channel} eq 'order_book') {
183 0         0 my $data = $self->json->decode($d->{data});
184 0         0 $self->order_book($data);
185             } else {
186 0 0       0 printf '%s - got event: %s', $self->now, Dumper $d
187             if VERBOSE;
188             }
189             }
190              
191             else {
192 0 0       0 printf '%s - got event: %s', $self->now, Dumper $d if VERBOSE;
193             }
194              
195             }
196             }
197 0         0 }
198              
199             sub on_connect {
200 0     0 0 0 my $self = shift;
201             return sub {
202 0     0   0 my $handle = shift;
203 0 0       0 say $self->now . ' - connected to pusher' if VERBOSE;
204 0         0 $handle->push_write($self->client->to_string);
205             }
206 0         0 }
207              
208             sub on_wtimeout {
209 0     0 0 0 my $self = shift;
210             return sub {
211 0     0   0 my $handle = shift;
212 0         0 $handle->push_write(
213             $self->frame->new(
214             buffer => '',
215             type => 'ping'
216             )->to_bytes
217             );
218             }
219 0         0 }
220              
221             sub on_error {
222 0     0 0 0 my $self = shift;
223             return sub {
224 0     0   0 my ($handle, $fatal, $msg) = @_;
225 0 0 0     0 warn $self->now . " - fatal($fatal): $msg" if VERBOSE or DEBUG;
226 0         0 $handle->destroy;
227 0         0 $self->setup;
228             }
229 0         0 }
230              
231             sub on_eof {
232 0     0 0 0 my $self = shift;
233             return sub {
234 0     0   0 my $handle = shift;
235 0 0 0     0 warn $self->now . " - lost connection, reconnecting"
236             if VERBOSE or DEBUG;
237 0         0 $self->setup;
238             }
239 0         0 }
240              
241 1     1 0 5 sub attributes { return ATTRIBUTES }
242 0     0 0 0 sub just_wait { return AnyEvent->condvar->wait }
243 0   0 0 0 0 sub json { return shift->{json} ||= JSON->new }
244 0 0   0 0 0 sub host { return shift->ssl ? SSL_HOST : HOST }
245 0 0   0 0 0 sub port { return shift->ssl ? SSL_PORT : PORT }
246 0 0   0 0 0 sub tls { return shift->ssl ? SSL_TLS : TLS }
247 0 0   0 0 0 sub scheme { return shift->ssl ? SSL_SCHEME : SCHEME }
248 0     0 0 0 sub client { my $self = shift; return $self->get_set(@_) }
  0         0  
249 0     0 0 0 sub frame { my $self = shift; return $self->get_set(@_) }
  0         0  
250 1     1 0 2 sub channels { my $self = shift; return $self->get_set(@_) }
  1         7  
251 0     0 0 0 sub protocol { my $self = shift; return $self->get_set(@_) }
  0         0  
252 0     0 0 0 sub app_key { my $self = shift; return $self->get_set(@_) }
  0         0  
253 0     0 0 0 sub ssl { my $self = shift; return $self->get_set(@_) }
  0         0  
254              
255             sub now {
256 0     0 0 0 return sprintf '%4d-%02d-%02d %02d:%02d:%02d', (localtime(time))[5] + 1900, (localtime(time))[4, 3, 2, 1, 0];
257             }
258              
259             sub get_set {
260 1     1 0 2 my ($self, $v) = @_;
261 1         18 my $attribute = ((caller(1))[3] =~ /::(\w+)$/)[0];
262 1 50       40 $self->{$attribute} = $v if $v;
263 1         3 return $self->{$attribute};
264             }
265              
266             sub uri {
267 0     0 0   my $self = shift;
268 0 0         unless ($self->{uri}) {
269 0           my $uri = URI->new;
270 0           $uri->scheme('http');
271 0           $uri->host($self->host);
272 0           $uri->path(sprintf '/app/%s' => $self->app_key);
273 0           $uri->query_form(protocol => $self->protocol);
274 0           $uri->scheme($self->scheme);
275 0           $self->{uri} = $uri;
276             }
277 0           return $self->{uri};
278             }
279              
280             1;
281              
282             __END__
283              
284             # Below is stub documentation for your module. You'd better edit it!
285              
286             =head1 NAME
287              
288             Finance::Bitcoin::Feed::Pusher - Perl extension for connecting to the BitStamp
289             exchange socket through the Pusher service. This module is extracted from
290             Finance::BitStamp::Socket v0.01.
291              
292             =head1 SYNOPSIS
293              
294             # this will dump the socket messages to the terminal...
295              
296             use BitStamp::Socket;
297             BitStamp::Socket->new->go;
298              
299             ... or just type this at the command prompt:
300              
301             $ perl -e 'use base qw(BitStamp::Socket); main->new->go'
302              
303             =======================
304             But instead do this:
305             =======================
306              
307             use base qw(BitStamp::Socket);
308             main->new->go;
309            
310             sub order_book {
311             my $self = shift;
312             my $data = shift;
313             # I just got new order book socket data
314             # ... your code goes here ... #
315             }
316              
317             sub trade {
318             my $self = shift;
319             my $data = shift;
320             # I just got new trade socket data
321             # ... your code goes here ... #
322             }
323              
324              
325             =head1 DESCRIPTION
326              
327             This module is extracted from Finance::BitStamp::Socket v0.01. Please refer to
328             L<Finance::BitStamp::Socket>
329              
330             I could not turn off its VERBOSE constant, so I copied the code into my package directly.
331              
332             The BitStamp socket is the fastest and most bandwidth-efficient way
333             to maintain your own up-to-date tracking of all trades and market
334             changes.
335              
336             This module will save you some time since the connection and
337             communication negotiations are done for you. All you need to do
338             is write the code to handle the messages. For example: to store
339             into a database.
340              
341              
342             =head1 SEE ALSO
343              
344             AnyEvent::Socket, AnyEvent::Handle
345              
346             =head1 AUTHOR
347              
348             Jeff Anderson, E<lt>peawormsworth@gmail.comE<gt>
349              
350             =head1 COPYRIGHT AND LICENSE
351              
352             Copyright (C) 2014 by Jeff Anderson
353              
354             This library is free software; you can redistribute it and/or modify
355             it under the same terms as Perl itself, either Perl version 5.14.2 or,
356             at your option, any later version of Perl 5 you may have available.
357              
358              
359             =cut
360