File Coverage

blib/lib/SRS/EPP/Session.pm
Criterion Covered Total %
statement 22 24 91.6
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 30 32 93.7


line stmt bran cond sub pod time code
1             # vim: filetype=perl:noexpandtab:ts=3:sw=3
2             #
3             # Copyright (C) 2009 NZ Registry Services
4             #
5             # This program is free software: you can redistribute it and/or modify
6             # it under the terms of the Artistic License 2.0 or later. You should
7             # have received a copy of the Artistic License the file COPYING.txt.
8             # If not, see <http://www.perlfoundation.org/artistic_license_2_0>
9              
10             package SRS::EPP::Session;
11             {
12             $SRS::EPP::Session::VERSION = '0.22';
13             }
14              
15             # this object is unfortunately something of a ``God Object'', but
16             # measures are taken to stop that from being awful; mostly delegation
17             # to other objects
18              
19 1     1   3771 use 5.010;
  1         5  
  1         47  
20 1     1   6 use strict;
  1         1  
  1         37  
21              
22 1     1   6 use Moose;
  1         2  
  1         9  
23 1     1   10813 use MooseX::Params::Validate;
  1         3  
  1         11  
24 1     1   429 use Scalar::Util qw(blessed);
  1         3  
  1         73  
25 1     1   374218 use Data::Dumper;
  1         8382  
  1         73  
26 1     1   37 use Carp;
  1         2  
  1         68  
27              
28             with 'MooseX::Log::Log4perl::Easy';
29              
30             # messages that we use
31             # - XML formats
32 1     1   462 use XML::EPP;
  0            
  0            
33             use XML::SRS;
34              
35             # - wrapper classes
36             use SRS::EPP::Command;
37             use SRS::EPP::Response;
38             use SRS::EPP::Response::Error;
39             use SRS::EPP::SRSMessage;
40             use SRS::EPP::SRSRequest;
41             use SRS::EPP::SRSResponse;
42              
43             # queue classes and slave components
44             use SRS::EPP::Packets;
45             use SRS::EPP::Session::CmdQ;
46             use SRS::EPP::Session::BackendQ;
47             use SRS::EPP::Proxy::UA;
48              
49             # other includes
50             use HTTP::Request::Common qw(POST);
51             use bytes qw();
52             use utf8;
53             use Encode qw(decode encode);
54              
# Lookup table from SSL error name (without the ERROR_ prefix) to the value of
# the matching Net::SSLeay::OO constant.  The names are registered first, at
# compile time, so that the "use" line below can import exactly those
# constants; the values are filled in once the import has happened.
our %SSL_ERROR;

BEGIN {
	%SSL_ERROR = map { ( $_ => undef ) } qw(
		NONE ZERO_RETURN WANT_READ WANT_WRITE WANT_CONNECT
		WANT_X509_LOOKUP SYSCALL SSL
	);
}
use Net::SSLeay::OO::Constants map {"ERROR_$_"} keys %SSL_ERROR;

BEGIN {
	# the constants are looked up by name, hence the symbolic references
	no strict 'refs';
	for my $name ( keys %SSL_ERROR ) {
		$SSL_ERROR{$name} = &{"ERROR_$name"};
	}
}
70              
# SSL object for the client connection; reading, peeking and the file
# descriptor used for event watchers all go through it.
has 'io' => (
	isa => 'Net::SSLeay::OO::SSL',
	is  => 'ro',
);

# Held only so that the socket does not fall out of scope and get closed.
has 'socket' => (
	isa => 'IO::Handle',
	is  => 'ro',
);

# The user this session is logged in as; a false value means "not logged in".
has 'user' => (
	isa => 'Maybe[Str]',
	is  => 'rw',
);

# Hack for the login message: used as the registrar ID for back-end requests
# while 'user' is not yet set.
has 'want_user' => (
	isa     => 'Str',
	is      => 'rw',
	clearer => 'clear_want_user',
);

# The session "State" according to the chart in RFC3730.  It is kept up to
# date for amusement's sake only; each change is reported to the proxy
# (when there is one) via show_state.
has 'state' => (
	isa     => 'Str',
	is      => 'rw',
	default => "Waiting for Client",
	trigger => sub {
		my ( $self, $new_state ) = @_;
		$self->proxy->show_state( $new_state, $self )
			if $self->has_proxy;
	},
);
107              
# The owning proxy.  Held as a weak reference; openpgp calls are delegated
# to it.
has 'proxy' => (
	isa       => 'SRS::EPP::Proxy',
	is        => 'ro',
	required  => 1,
	weak_ref  => 1,
	predicate => 'has_proxy',
	handles   => [ 'openpgp' ],
);

# An object providing an Event.pm-like interface; this class calls its
# timer() and io() methods.
has 'event' => (
	is       => 'ro',
	required => 1,
);

# Watchers returned by the event object for the client socket.
has 'output_event_watcher' => (
	is => 'rw',
);

has 'input_event_watcher' => (
	is => 'rw',
);

# To 'yield' is to queue an event for running without running it
# immediately.  This hash is keyed by method name and records which
# argument-less yields are currently outstanding.
has 'yielding' => (
	isa     => 'HashRef',
	is      => 'ro',
	default => sub { return {} },
);
138              
# Queue a method call on this session to be run from the event loop instead
# of being run immediately.
#
#   $session->yield($method, @args);
#
# $method is a method name (validated as a Str); @args are passed through to
# it when the timer fires.  Yields without arguments are de-duplicated: while
# one is outstanding for a method, further argument-less yields of that
# method are ignored.  Yields with arguments are always queued.
#
# If the yielded method throws, the error is logged and re-thrown from the
# timer callback.
sub yield {
	my $self = shift;

	my ( $method ) = pos_validated_list(
		[shift],
		{ isa => 'Str' },
	);

	my @args = @_;

	if ( $self->log->is_trace ) {
		# caller(1) is empty when there is no enclosing sub frame; fall
		# back to a placeholder rather than interpolating undef
		my $caller = ( caller(1) )[3] // "(top level)";
		$self->log_trace(
			"$caller yields $method"
				.( @args ? " (with args: @args)" : "" )
		);
	}

	if ( !@args ) {
		if ( $self->yielding->{$method} ) {
			$self->log_trace(" - already yielding");
			return;
		}
		$self->yielding->{$method} = 1;
	}

	$self->event->timer(
		desc  => $method,
		after => 0,
		cb    => sub {
			# clear the marker first so the method may yield itself again
			delete $self->yielding->{$method};
			if ( $self->log->is_trace ) {
				$self->log_trace(
					"Calling $method".( @args ? "(@args)" : "" )
				);
			}

			# the trailing 1 distinguishes "died" from "returned false"
			my $ok = eval {
				$self->$method(@args);
				1;
			};
			my $error = $@;
			if ( !$ok ) {
				my $message = "Uncaught exception when yielding to "
					."$method: $error";
				$self->log_error($message);

				die $error || $message;
			}
		},
	);
}
192              
# Identifier for this connection, built from the current time and the low
# 16 bits of the process ID, both in hex.
has 'connection_id' => (
	isa     => 'Str',
	is      => 'ro',
	default => sub {
		my $pid_bits = $$ & 65535;
		return sprintf( "sep.%x.%.4x", time(), $pid_bits );
	},
);

has 'peerhost' => (
	isa => 'Str',
	is  => 'rw',
);

has 'peer_cn' => (
	isa => 'Str',
	is  => 'rw',
);

# Counter behind server-generated IDs; inc_server_id($n) adds $n to it.
has 'server_id_seq' => (
	isa     => 'Num',
	is      => 'rw',
	default => 0,
	traits  => [ 'Number' ],
	handles => {
		inc_server_id => 'add',
	},
);

use SRS::EPP::Session::Extensions;

# Per-session extensions object, created on construction.
has 'extensions' => (
	isa     => 'SRS::EPP::Session::Extensions',
	is      => 'ro',
	default => sub {
		return SRS::EPP::Session::Extensions->new();
	},
);
229              
# Used when a response is generated by the server itself rather than by the
# back-end.  Bumps the per-session counter and returns an ephemeral ID made
# of the connection ID and that counter (zero-padded to at least 3 digits).
sub new_server_id {
	my $self = shift;

	$self->inc_server_id(1);

	my $sequence = sprintf( "%.3d", $self->server_id_seq );
	my $id       = $self->connection_id.".".$sequence;
	$self->log_trace("server-generated ID is $id");

	return $id;
}
241              
242             #----
243             # input packet chunking
# The SRS::EPP::Packets object for this session.  input_event, input_state
# and input_expect are delegated to it.
has 'input_packeter' => (
	handles => [ 'input_event', 'input_state', 'input_expect' ],
	default => sub {
		my ($session) = @_;
		return SRS::EPP::Packets->new( session => $session );
	},
);
251              
# Read up to $how_much bytes from the client connection.
#
#   my $data = $session->read_input($how_much);
#
# $how_much must be an integer greater than zero (croaks otherwise).
# Returns whatever the io object's read() returned; when that is undef the
# last SSL error is logged and undef is returned to the caller.
sub read_input {
	my $self = shift;

	my ( $how_much ) = pos_validated_list(
		\@_,
		{ isa => 'Int', },
	);

	croak '$how_much must be > 0' unless $how_much > 0;

	my $rv = $self->io->read($how_much);

	if ( !defined $rv ) {
		# Error occurred during read
		my ( $error, $error_name, $err_info ) = $self->get_last_ssl_error;

		$self->log_error("error on read; $error_name ($err_info)");
	}

	# bytes::length(undef) would warn, so spell the failure case out
	my $got = defined $rv ? bytes::length($rv) : "undef";
	$self->log_trace("read_input($how_much) = $got");

	return $rv;
}
274              
# Boolean: true when peeking one byte on the connection yields a true value.
sub input_ready {
	my ($self) = @_;

	my $peeked = $self->io->peek(1);
	return !!$peeked;
}
280              
# Convert one input packet into a queued command.
#
#   $session->input_packet($data);
#
# $data is decoded from UTF-8 unless it is already flagged as characters,
# logged, and parsed with XML::EPP.  The resulting SRS::EPP::Command is
# queued whether or not parsing succeeded.  On a parse failure an error
# response is attached to the command and sending of pending replies is
# scheduled; otherwise processing of the command queue is scheduled.
sub input_packet {
	my $self = shift;

	my ( $data ) = pos_validated_list( \@_, { isa => 'Str' } );

	$self->log_debug("parsing ".bytes::length($data)." bytes of XML");

	my $msg = eval {
		unless ( utf8::is_utf8($data) ) {
			my $byte_count = bytes::length($data);
			$data = decode( "utf8", $data );
			my $char_count = length($data);
			$self->log_debug("data is $char_count unicode characters")
				if $byte_count != $char_count;
		}
		$self->log_packet( "input", $data );
		XML::EPP->parse($data);
	};

	# only trust $@ when the parse produced nothing
	my $error = $msg ? undef : $@;
	$self->log_info("error parsing message: $error")
		if $error;

	# the command always carries the (possibly decoded) XML; message and
	# error are only passed when present
	my %command_args = (
		xml     => $data,
		session => $self,
	);
	$command_args{message} = $msg   if $msg;
	$command_args{error}   = $error if $error;

	my $queue_item = SRS::EPP::Command->new(%command_args);
	$self->log_info("queuing command: $queue_item");
	$self->queue_command($queue_item);

	return $self->yield("process_queue")
		unless $error;

	my %response_args = (
		server_id => $self->new_server_id,
		exception => $error,
	);
	if ( my $client_id = $queue_item->client_id ) {
		$response_args{client_id} = $client_id;
	}

	my $error_rs = SRS::EPP::Response::Error->new(%response_args);
	$self->log_info("queuing response: $error_rs");
	$self->add_command_response( $error_rs, $queue_item );
	$self->yield("send_pending_replies");
}
339              
340             #----
341             # queues
# Queue of client commands and their responses; the command-queue methods
# listed in 'handles' are delegated to it.
has 'processing_queue' => (
	is      => 'ro',
	default => sub { return SRS::EPP::Session::CmdQ->new() },
	handles => [
		'queue_command',        'next_command',
		'add_command_response', 'commands_queued',
		'response_ready',       'dequeue_response',
	],
);

# Queue of requests to, and responses from, the back-end; the back-end
# queue methods listed in 'handles' are delegated to it.
has 'backend_queue' => (
	is      => 'ro',
	default => sub { return SRS::EPP::Session::BackendQ->new() },
	handles => [
		'queue_backend_request',    'backend_next',
		'backend_pending',
		'add_backend_response',     'backend_response_ready',
		'dequeue_backend_response', 'get_owner_of_request',
	],
);
368              
# This shouldn't be required... but is a good checklist: look at every
# queue and yield to whichever handlers have work waiting.
#
# No prototype is declared: Perl ignores prototypes on method calls, and an
# empty one wrongly advertises "takes no arguments" for a sub that receives
# the invocant.
sub check_queues {
	my $self = shift;

	# replies ready to go back to the client
	$self->yield("send_pending_replies")
		if $self->response_ready;

	# queued commands, unless the pipeline is stalled
	$self->yield("process_queue")
		if !$self->stalled and $self->commands_queued;

	# responses that came back from the back-end
	$self->yield("process_responses")
		if $self->backend_response_ready;

	# requests still waiting to be sent to the back-end
	$self->yield("send_backend_queue")
		if $self->backend_pending;
}
382              
# A "stalled" pipeline cannot advance until the responses to the commands
# currently being processed are available.
#
# For example "login" and "logout" both stall the queue, and so does the
# <transform><renew> command when the back-end must be queried first to
# work out what the correct renewal message is.
#
# The value stored here is the command which stalled the pipeline, so that
# processing can be restarted without the command doing anything special.
# Setting a false value logs the change and re-checks every queue.
has 'stalled' => (
	isa     => 'Maybe[SRS::EPP::Command|Bool]',
	is      => 'rw',
	trigger => sub {
		my ( $self, $val ) = @_;

		my $prefix = $val ? "" : "un-";
		$self->log_debug("processing queue is ${prefix}stalled");

		$self->check_queues
			unless $val;
	},
);
407              
# Process up to $count commands from the processing queue (default 1).
#
#   $session->process_queue;
#   $session->process_queue($count);
#
# Stops early when the pipeline is stalled or the queue is empty.  "Simple"
# commands are answered directly; commands whose authentication requirement
# does not match the session's login state are rejected with code 2001;
# everything else is processed and its result handed to
# process_notify_result.
sub process_queue {
	my $self = shift;

	my ( $count ) = pos_validated_list(
		\@_,
		{ isa => 'Int', default => 1 },
	);

	for ( 1 .. $count ) {
		if ( $self->stalled ) {
			$self->state("Processing Command");
			$self->log_trace("stalled; not processing");
			last;
		}

		my $command = $self->next_command
			or last;
		$self->log_info("processing command $command");

		if ( $command->simple ) {
			# "simple" commands include "hello" and "logout"
			my $response = $command->process($self);
			$self->log_debug(
				"processed simple command $command; response is $response"
			);
			$self->add_command_response( $response, $command );
		}
		elsif ( $command->authenticated xor $self->user ) {
			# command needs a login we don't have, or vice versa
			my $login_state = $self->user ? "already" : "not";
			my $reason      = "$login_state logged in";
			my $rejection   = $command->make_response(
				Error => (
					code      => 2001,
					exception => $reason,
				)
			);
			$self->add_command_response( $rejection, $command );
			$self->log_info("rejecting command: $reason");
		}
		else {
			# regular message which may need to talk to the SRS
			# backend; exceptions are passed on, not re-thrown
			my @messages = eval { $command->process($self) };
			my $error    = $@;
			$self->process_notify_result( $command, $error, @messages );
		}

		if ( $self->response_ready ) {
			$self->yield("send_pending_replies");
		}
	}
}
464              
# Deal with whatever a command's process (or notify) call produced.
#
#   $session->process_notify_result($command, $error, @messages);
#
# $command  - the SRS::EPP::Command that was run
# $error    - exception text from the call, or a false value
# @messages - what the call returned
#
# Nothing, or something unblessed, is replaced by a code 2400 error from the
# command.  A bare XML::EPP message is wrapped in a response object.  After
# that: SRS actions/queries are wrapped as requests and queued for the
# back-end, SRS::EPP::Response objects are queued as the reply to the
# command, and anything else is answered with a code 2400 response.
sub process_notify_result {
	my $self = shift;

	my ( $command ) = pos_validated_list(
		[shift],
		{ isa => 'SRS::EPP::Command' },
	);
	my ( $error, @messages ) = @_;

	my $error_text   = $error // "(none)";
	my $message_text = @messages ? "messages=@messages" : "no messages";
	$self->log_debug(
		"$command process/notify result: error=$error_text, $message_text"
	);

	unless ( @messages and blessed( $messages[0] ) ) {
		my @exception;
		if ($error) {
			$self->log_info("Error when calling process on $command: $error");
			@exception = ( exception => $error );
		}
		else {
			$self->log_info("Unblessed return from process: @messages");
		}

		my $fallback = $command->make_error( @exception, code => 2400 );
		@messages = ($fallback);
	}

	# convert unwrapped responses to wrapped ones
	if ( $messages[0]->isa('XML::EPP') ) {

		# only a single message can be added to the outgoing queue
		@messages > 1 and die "wrong";
		my $wrapped = SRS::EPP::EPPResponse->new( message => $messages[0] );
		@messages = ($wrapped);
	}

	# check what kind of messages these are
	my $head = $messages[0];
	if ( $head->does('XML::SRS::Action') || $head->does('XML::SRS::Query') ) {
		my $multi_part = @messages > 1;

		for my $i ( 0 .. $#messages ) {
			my $srs = $messages[$i];

			# Make sure every message has a unique action or query id
			# TODO: perhaps we should override anything the mapping has
			# set, so we can be sure it is actually unique.  Would also
			# make sense to have one place where the ids are controlled.
			if ( !$srs->unique_id ) {
				my $id = $command->client_id || $command->server_id;
				$id .= "[$i]" if $multi_part;
				$srs->unique_id($id);
			}

			$messages[$i] = SRS::EPP::SRSRequest->new( message => $srs );
		}

		$self->log_info( "$command produced ".@messages." SRS message(s)" );
		$self->queue_backend_request( $command, @messages );
		$self->state(
			$command->isa("SRS::EPP::Command::Login")
			? "Processing <login>"
			: "Processing Command"
		);
		$self->yield("send_backend_queue");
	}
	elsif ( $messages[0]->isa('SRS::EPP::Response') ) {
		# a command that stalled the pipeline and is now answered must
		# not leave it stalled
		if ( $self->stalled and $self->stalled == $command ) {
			$self->log_info(
				$error
				? "re-enabling pipeline after command received untrapped error"
				: "command did not re-enable processing pipeline!"
			);
			$self->stalled(0);
		}
		$self->log_info("$command produced $messages[0]");
		$self->add_command_response( $messages[0], $command );

		$self->yield("send_pending_replies")
			if $self->response_ready;
	}
	else {

		# We got something else unknown... return an error
		$self->log_debug(
			"process_queue: Unknown message type - $messages[0] ... doesn't appear"
				." to be a SRS or EPP request, returning error"
		);
		my $rs = $command->make_response( code => 2400 );
		$self->add_command_response( $rs, $command );
	}
}
562              
563             #----
564             # method to say "we're connected, so send a greeting"; if this class
565             # were abstracted to not run over a stream transport then this would
566             # be important.
# Called once the client is connected: prepares the greeting, registers the
# read and write watchers on the connection's file descriptor, starts the
# periodic signal-processing timer, and sends the greeting.
sub connected {
	my $self = shift;

	$self->state("Prepare Greeting");
	my $greeting = SRS::EPP::Response::Greeting->new( session => $self );
	$self->log_info( "prepared greeting $greeting for ".$self->peerhost );

	my $fd = $self->io->get_fd;
	$self->log_trace("setting up io event handlers for FD $fd");

	my $input_watcher = $self->event->io(
		desc => "input_event",
		fd   => $fd,
		poll => 'r',
		cb   => sub {
			$self->log_trace("got input callback");
			$self->input_event;
		},
		timeout    => $self->timeout,
		timeout_cb => sub {
			$self->log_trace("got input timeout event");
			$self->input_timeout;
		},
	);
	$self->input_event_watcher($input_watcher);

	my $output_watcher = $self->event->io(
		desc => "output_event",
		fd   => $fd,
		poll => 'w',
		cb   => sub {
			$self->output_event;
		},
		timeout    => $self->timeout,
		timeout_cb => sub {
			$self->log_trace("got output timeout event");
		},
	);

	# the write watcher is stopped before being stored
	$output_watcher->stop;
	$self->output_event_watcher($output_watcher);

	# Process signals every few seconds (if any were received)
	$self->event->timer(
		desc     => "signal_handler_timer",
		after    => 3,
		interval => 3,
		cb       => sub {
			$self->proxy->process_signals;
		},
	);

	$self->send_reply($greeting);
	$self->state("Waiting for Client Authentication");
}
623              
624             #----
625             # Backend stuff. Perhaps this should all go in the BackendQ class.
626              
# Upper limit handed to backend_next when a back-end message is assembled.
has 'backend_tx_max' => (
	is      => 'rw',
	isa     => 'Int',
	default => 10,
);

# The user agent that talks to the back-end, built on first use.  Building
# it also registers a read watcher on the agent's read handle: while the
# session still has a user agent the watcher calls backend_response,
# otherwise the watcher cancels itself.
has 'user_agent' => (
	is      => 'rw',
	lazy    => 1,
	handles => {
		user_agent_busy => 'busy',
	},
	default => sub {
		my $self = shift;

		my $ua = SRS::EPP::Proxy::UA->new( session => $self );
		$self->log_trace("setting up UA input event");

		# declared before assignment so the callback can refer to it
		my $watcher;
		$watcher = $self->event->io(
			desc => "user_agent",
			fd   => $ua->read_fh,
			poll => 'r',
			cb   => sub {
				if ( !$self->user_agent ) {
					$self->log_trace("canceling UA watcher");
					$watcher->cancel;
				}
				else {
					$self->log_trace(
						"UA input event fired, calling backend_response"
					);

					eval { $self->backend_response };
					if ($@) {
						my $error
							= "Uncaught exception calling backend_response in user_agent: $@";
						$self->log_info($error);

						die $error;
					}
				}
			},
		);

		return $ua;
	},
);

# URL that back-end requests are POSTed to.
has 'backend_url' => (
	is       => 'rw',
	isa      => 'Str',
	required => 1,
);

# The message most recently produced by next_message.
has 'active_request' => (
	is  => 'rw',
	isa => 'Maybe[SRS::EPP::SRSMessage]',
);
687              
# Assemble the next back-end message from queued requests.
#
# Asks the back-end queue for up to backend_tx_max requests, bundles them
# into one XML::SRS::Request wrapped in an SRS::EPP::SRSMessage, records
# that as the active request and returns it.  Returns nothing when the
# queue has no requests to offer.
sub next_message {
	my $self = shift;

	my @parts = $self->backend_next( $self->backend_tx_max );
	return unless @parts;

	my @requests = map { $_->message } @parts;
	my $tx       = XML::SRS::Request->new(
		version  => "auto",
		requests => \@requests,
	);
	my $rq = SRS::EPP::SRSMessage->new(
		message => $tx,
		parts   => \@parts,
	);

	$self->log_info( "creating a ".@parts."-part SRS message" );
	$self->log_debug("parts: @parts")
		if $self->log->is_debug;

	$self->active_request($rq);
	return $rq;
}
708              
# Send the next batch of queued requests to the back-end.
#
# Does nothing while the user agent is busy, or when there is nothing left
# to send.  Otherwise the next message is serialised, signed with a detached
# OpenPGP signature and POSTed to backend_url with the fields r (the XML),
# s (the signature) and n (the registrar ID: the logged-in user or, failing
# that, want_user).
sub send_backend_queue {
	my $self = shift;

	return if $self->user_agent_busy;

	# next_message returns nothing when the queue is empty; this sub is
	# reached via deferred yields, so that can legitimately happen here
	my $tx = $self->next_message
		or return;

	my $xml = $tx->to_xml;
	$self->log_packet(
		"backend request",
		$xml,
	);

	my $sig = $self->openpgp->detached_sign($xml);
	$self->log_debug( "signed XML message - sig is ".$sig )
		if $self->log->is_debug;

	my $reg_id = $self->user || $self->want_user;

	# It seems like we are getting bytes from the XML libraries, but the
	# HTTP::Request library wants chars. This matters when we are dealing
	# with UNICODE.
	$xml = decode( "utf8", $xml );

	my $req = POST(
		$self->backend_url,
		[
			r => $xml,
			s => $sig,
			n => $reg_id,
		],
	);
	$self->log_info(
		"posting to ".$self->backend_url." as registrar $reg_id"
	);

	$self->user_agent->request($req);
}
747              
# Decode an application/x-www-form-urlencoded value.
#
#   my $bytes = url_decode($encoded);
#
# "+" becomes a space and each %XX escape becomes the character with that
# code.  Hex digits are accepted in either case (%2f and %2F are the same
# escape).  An undefined argument is returned unchanged, so that a field
# without a value does not trigger warnings.
#
# This is a plain function, not a method.
sub url_decode {
	my $url_encoded = shift;

	# returned as a scalar on purpose: callers use this in argument lists
	return $url_encoded if !defined $url_encoded;

	$url_encoded =~ tr{+}{ };
	$url_encoded =~ s{%([0-9a-fA-F]{2})}{chr(hex($1))}eg;
	return $url_encoded;
}
754              
755             #----
756             # Dealing with backend responses
# Handle a response arriving from the back-end.
#
# The response body is split into url-encoded fields; field "r" carries the
# response XML and field "s" its detached signature.  The signature is
# verified when present (dies on failure), the XML is parsed and passed to
# be_response, and another send is scheduled if requests are still pending.
#
# NOTE(review): a response with no "s" field is accepted without any
# integrity check -- confirm that this is intended.
sub backend_response {
	my $self = shift;

	my $response = $self->user_agent->get_response;
	my $content  = $response->content;

	$self->log_debug(
		"received ".bytes::length($content)." bytes of "
			."response from back-end"
	);

	# urldecode response; split response from fields
	my %fields;
	for my $pair ( split "&", $content ) {
		my ( $key, $value ) = split "=", $pair, 2;
		$fields{$key} = decode( "utf8", url_decode($value) );
	}

	# check signature
	$self->log_debug("verifying signature");

	if ( $fields{s} ) {
		my $verified = $self->openpgp->verify_detached(
			data      => $fields{r},
			signature => $fields{s},
		);
		die "failed to verify BE response integrity"
			unless $verified;
	}

	$self->log_packet( "BE response", $fields{r} );

	# parse_be_response has already queued an error if this is false
	my $rs_tx = $self->parse_be_response( $fields{r} )
		or return;

	$self->be_response($rs_tx);

	# user agent is now free, perhaps more messages are waiting
	if ( $self->backend_pending ) {
		$self->yield("send_backend_queue");
	}
}
794              
# Parse the XML of a back-end response.
#
#   my $rs_tx = $session->parse_be_response($xml);
#
# Returns the parsed message wrapped in an SRS::EPP::SRSMessage.  If parsing
# throws, the error is logged, a code 2500 error response is queued for the
# command that owns the first part of the active request, sending of
# pending replies is scheduled, the session is shut down, and nothing is
# returned.
sub parse_be_response {
	my $self = shift;

	my ( $xml ) = pos_validated_list( \@_, { isa => 'Str' } );

	# decode message
	my $message = eval { XML::SRS::Response->parse( $xml, 1 ) };
	my $error   = $@;

	unless ($error) {
		return SRS::EPP::SRSMessage->new( message => $message );
	}

	# Got an error parsing response. Log and generate a 2500 error
	$self->log_error("Exception parsing SRS Response: $error");

	my $rq_parts = $self->active_request->parts;
	my $command  = $self->get_owner_of_request( $rq_parts->[0] );

	my $error_resp = SRS::EPP::Response::Error->new(
		code      => 2500,
		server_id => 'unknown',
	);
	$self->add_command_response( $error_resp, $command );

	$self->yield("send_pending_replies");
	$self->shutdown;

	return;
}
833              
# Match a parsed back-end response up with the active request.
#
#   $session->be_response($rs_tx);
#
# $rs_tx is the response as an SRS::EPP::SRSMessage.  Each response part is
# paired, by position, with the request part it answers and added to the
# back-end queue; processing of the responses is then scheduled.  A lone
# XML::SRS::Error answering a multi-part request is repeated for every
# request part.  Dies if the part counts still differ after that.
sub be_response {
	my $self = shift;

	my ( $rs_tx ) = pos_validated_list(
		\@_,
		{ isa => 'SRS::EPP::SRSMessage' },
	);

	my $rq_parts = $self->active_request->parts;
	my $rs_parts = $rs_tx->parts;

	my $result_id = eval { $rs_parts->[0]->message->result_id }
		|| "(no unique_id)";
	$self->log_info(
		"response $result_id from back-end has "
			.@$rs_parts." parts, "
			."active request ".@$rq_parts." parts"
	);

	my $lone_error = @$rs_parts == 1
		&& @$rs_parts < @$rq_parts
		&& $rs_parts->[0]->message->isa("XML::SRS::Error");
	if ($lone_error) {
		# this is a more fundamental type of error than others
		# ... 'extend' to the other messages
		@$rs_parts = ( $rs_parts->[0] ) x @$rq_parts;
	}

	die "rs parts != rq parts"
		unless @$rq_parts == @$rs_parts;

	my $multi_part = @$rq_parts > 1;
	for my $i ( 0 .. $#$rq_parts ) {
		my $rs_part = $rs_parts->[$i];

		# best effort: a message that cannot take a part number is
		# passed on without one
		if ($multi_part) {
			eval { $rs_part->message->part( $i + 1 ); };
		}
		$self->add_backend_response( $rq_parts->[$i], $rs_part );
	}

	$self->yield("process_responses");
}
877              
# Drain the queue of back-end responses, handing each set of
# responses to the command that is waiting for it.
#
# For every (command, responses...) tuple that is dequeued:
#
#  - the command's server_id is set from the result_id of the first
#    response (suffixed with "+N" when N further responses came with
#    it); if the command already had a server_id the change is logged
#  - the responses are screened with check_for_be_error()
#  - if the screen returned nothing and did not die, the command is
#    notified of its responses via notify()
#  - the error (if any) and resulting messages are passed on to
#    process_notify_result()
sub process_responses {
	my $self = shift;

	while ( $self->backend_response_ready ) {
		my ( $cmd, @rs ) = $self->dequeue_backend_response;

		# for easier tracking of messages.  The eval covers
		# responses with no message or no result_id.
		my $server_id = eval { $rs[0]->message->result_id };
		if ($server_id) {

			# NB: this must not be written as
			# "my $before = ... if ...;" - a "my" with a
			# statement modifier has undefined behaviour
			# (perlsyn) and can leak the value from a
			# previous iteration of this loop.
			my $before = $cmd->has_server_id
				? $cmd->server_id
				: undef;

			if ( @rs > 1 ) {
				$server_id .= "+" . ( @rs - 1 );
			}
			$cmd->server_id($server_id);

			if ($before) {
				my $after = $cmd->server_id;
				$self->log_info(
					"changing server_id: $before => $after"
				);
			}
		}

		my ( @messages, $error );

		# the trailing 1 lets us tell "died" apart from "returned
		# nothing", even if $@ ends up empty
		my $check_ok = eval {
			@messages = $self->check_for_be_error( $cmd, @rs );
			1;
		};
		$error = $@;

		if ( @messages or $error or !$check_ok ) {
			my $what
				= $error    ? "fault during BE error check"
				: $check_ok ? "untrapped SRS error"
				:             "error without a clue";
			$self->log_error("$cmd received $what");
			$error ||= "SRS error";  # flag for process_notify_result
		}
		else {
			$self->log_info(
				"notifying command $cmd of back-end response"
			);
			@messages = eval { $cmd->notify( \@rs ) };
			$error = $@;
		}

		$self->process_notify_result( $cmd, $error, @messages );
	}
}
935              
# Scan a set of back-end responses for XML::SRS::Error objects.
#
# Arguments: the SRS::EPP::Command the responses belong to, followed
# by the response objects themselves.
#
# Returns whatever $cmd->make_error_response() produces for the
# collected errors, or nothing (bare return) when none were found.
sub check_for_be_error {
	my $self = shift;
	my ($cmd) = pos_validated_list(
		[shift],
		{ isa => 'SRS::EPP::Command' },
	);
	my @responses = @_;

	my @found;
	RESPONSE:
	for my $response (@responses) {
		my $message = $response->message;

		# a message either wraps a list of responses, or stands
		# for itself
		my $candidates = $message->can('responses')
			? $message->responses
			: [$message];
		next RESPONSE if !$candidates;

		for my $candidate (@$candidates) {
			next if !$candidate->isa('XML::SRS::Error');
			push @found, $candidate;

			# Stop looking through this message once an error
			# has been seen if it is a system error (the
			# message itself is the XML::SRS::Error rather
			# than a wrapper around one), or if this command
			# type doesn't expect multiple responses.
			last if $message->isa('XML::SRS::Error')
				|| !$cmd->multiple_responses;
		}
	}

	return if !@found;
	return $cmd->make_error_response( \@found );
}
982              
# Pass every response that is ready to send_reply(), then, if no
# commands remain queued, set the session state according to whether
# a user is set on the session.
sub send_pending_replies {
	my ($self) = @_;

	while ( $self->response_ready ) {
		my $reply = $self->dequeue_response;
		$self->log_info("queuing response $reply");
		$self->send_reply($reply);
	}

	if ( !$self->commands_queued ) {
		$self->state(
			$self->user
			? "Waiting for Command"
			: "Waiting for Client Authentication"
		);
	}
}
1002              
1003             #----
1004             # Sending responses back
1005              
# queue of byte strings which are ready for transmission; send_reply
# appends to it and write_to_client consumes from the front
has output_queue => (
	is      => 'ro',
	isa     => 'ArrayRef[Str]',
	default => sub { return [] },
);
1012              
# The interface for sending replies.
#
# Takes one SRS::EPP::Response, serialises it with to_xml, and
# appends a 4-byte length header plus the payload to output_queue.
# An "output_event" is then yielded.
#
# Returns the total number of bytes now sitting in output_queue.
sub send_reply {
	my $self = shift;
	my ($rs) = pos_validated_list(
		\@_,
		{ isa => 'SRS::EPP::Response' },
	);

	$self->log_debug("converting response $rs to XML");
	my $payload = $rs->to_xml;
	$self->log_packet( "output", $payload );

	# a character string has to become octets before it is framed
	$payload = encode( "utf8", $payload )
		if utf8::is_utf8($payload);

	my $payload_bytes = bytes::length($payload);
	$self->log_info("response $rs is $payload_bytes bytes long");

	# header is a 32-bit big-endian count which includes the four
	# bytes of the header itself
	my $queue = $self->output_queue;
	push @$queue, pack( "N", $payload_bytes + 4 ), $payload;
	$self->yield("output_event");

	my $outstanding = 0;
	for my $chunk (@$queue) {
		$outstanding += bytes::length($chunk);
	}
	return $outstanding;
}
1043              
# Set by shutdown().  Once we are "shutdown", no new commands will be
# allowed to process (stalled queue) and the connection will be
# disconnected once the back-end processing and output queue is
# cleared.
has shutting_down => (
	is  => 'rw',
	isa => 'Bool',
);
1051              
# Begin an orderly shutdown of the session: record the new state,
# stall the queue, raise the shutting_down flag and yield an
# "output_event".
sub shutdown {
	my ($self) = @_;

	$self->log_info("shutting down session");
	$self->state("Shutting down");

	$self->stalled(1);
	$self->shutting_down(1);

	return $self->yield("output_event");
}
1061              
# session timeout, default 300
# NOTE(review): presumably in seconds - confirm where it is consumed
has timeout => (
	is      => 'ro',
	isa     => 'Int',
	default => 300,
);
1067              
# Handler for an input timeout: nothing clever, just hang up by
# starting a shutdown.
sub input_timeout {
	my ($self) = @_;

	return $self->shutdown;
}
1074              
# Tear the connection down: shut the socket down with shutdown(1),
# drop the user agent, cancel the input watcher and unloop the event
# loop.
#
# Per the original author's note: a client that hangs up without
# logging out gets its TCP session closed without SSL being closed
# properly.
sub do_close {
	my ($self) = @_;

	$self->log_debug("shutting down Socket");
	$self->socket->shutdown(1);

	$self->log_debug("shutting down user agent");
	$self->user_agent(undef);

	$self->input_event_watcher->cancel;
	return $self->event->unloop_all;
}
1089              
# Called when input_event fires but there was nothing to read; this
# is treated as EOF and the connection is closed.
sub empty_read {
	my ($self) = @_;

	$self->log_info("detected EOF on input");
	return $self->do_close;
}
1097              
# Push as much of output_queue to the client as write_to_client will
# take, and return the number of bytes it reported written.
#
# While data remains queued the output watcher is (re)started.  Once
# the queue is empty the watcher is stopped and, if the session is
# shutting down, the connection is closed - unless check_queues left
# events in "yielding".
sub output_event {
	my ($self) = @_;

	my $queue = $self->output_queue;
	my $sent  = $self->write_to_client($queue);

	if (@$queue) {
		$self->output_event_watcher->start;
		return $sent;
	}

	$self->output_event_watcher->stop;
	$self->log_info("flushed output to client");
	return $sent if !$self->shutting_down;

	$self->check_queues;

	# if check_queues didn't yield any events, we're done.
	my @pending = keys %{ $self->yielding };
	if (@pending) {
		$self->log_debug("shutdown still pending: @pending");
	}
	else {
		$self->do_close;
	}
	return $sent;
}
1127              
# Write as much of the given queue (an array ref of byte strings) to
# the client as the connection will accept this time around.
#
# Chunks are taken from the front of the queue.  A chunk that could
# not be written, or the unwritten remainder of a partially written
# one, is put back on the front and the loop stops.
#
# Returns the number of bytes written.  Dies when the write fails
# with an error other than WANT_READ or NONE.
sub write_to_client {
	my $self = shift;
	my ($oq) = pos_validated_list(
		\@_,
		{ isa => 'ArrayRef' },
	);

	my $io    = $self->io;
	my $total = 0;

	CHUNK:
	while (@$oq) {
		my $chunk = shift @$oq;
		my $wrote = $io->write($chunk);

		if ( $wrote > 0 ) {

			# thankfully, this is returned in bytes.
			$total += $wrote;
			next CHUNK if $wrote >= bytes::length($chunk);

			# short write; re-queue the part that didn't go
			unshift @$oq, bytes::substr( $chunk, $wrote );
			last CHUNK;
		}

		my ( $error, $error_name, $err_info )
			= $self->get_last_ssl_error($wrote);

		if ( $error == $SSL_ERROR{WANT_READ} ) {

			# try calling input_event straight away
			$self->log_debug("got WANT_READ during write to client, calling input_event()");
			$self->input_event;
		}
		elsif ( $error != $SSL_ERROR{NONE} ) {

			# Got an error we couldn't handle, probably can't
			# continue with this connection
			$self->log_error("error on write; $error_name (ret: $wrote, $err_info)");
			die "Error writing to client: $err_info\n";
		}

		# nothing was written; keep the chunk for next time
		unshift @$oq, $chunk;
		last CHUNK;
	}

	my $left = @$oq;
	$self->log_trace(
		"write_to_client wrote $total bytes, $left chunk(s) remaining"
	);

	return $total;
}
1177              
# Look up the error behind an I/O return value.
#
# Takes the return value of the failed call and hands it to the io
# object's get_error().  Returns a three-element list:
#
#   - the error code
#   - its name, i.e. a key of %SSL_ERROR whose value equals the code
#     (undef when there is no such key)
#   - a description string, "err = CODE", with ", $! = ..." appended
#     when the code is the SYSCALL one
sub get_last_ssl_error {
	my ( $self, $ret ) = @_;

	my $code = $self->io->get_error($ret);

	my @details = ("err = $code");
	push @details, "\$! = $!"
		if $code == $SSL_ERROR{SYSCALL};
	my $err_info = join ", ", @details;

	my ($name) = grep { $SSL_ERROR{$_} == $code } keys %SSL_ERROR;

	return ( $code, $name, $err_info );
}
1195              
# Write a packet to the log at info level.
#
# Arguments: a label (e.g. "output") and the packet data, both Str.
#
# Before logging, control characters U+0000..U+001F are replaced by
# the code points U+2400..U+241F, and "," and "|" by their fullwidth
# forms (U+FF0C, U+FF5C).  The data is then logged in pieces of at
# most 1024 characters, each encoded as UTF-8; when there is more
# than one piece each line is tagged " [i of n]".  Empty data logs
# nothing.
# NOTE(review): the substitutions presumably keep the log format's
# own delimiters unambiguous - confirm against the log consumer.
sub log_packet {
	my $self = shift;
	my ( $label, $data ) = pos_validated_list(
		\@_,
		{ isa => 'Str' },
		{ isa => 'Str' },
	);

	$data =~ s{([\0-\037])}{chr(ord($1)+0x2400)}eg;
	$data =~ s{([,\|])}{chr(ord($1)+0xff00-0x20)}eg;

	# 4-arg substr removes each piece from $data as it is taken
	my @pieces;
	push @pieces, substr( $data, 0, 1024, "" ) while length $data;

	my $count = @pieces;
	my $seq   = 0;
	for my $piece (@pieces) {
		$seq++;
		my $n_of_n = $count > 1 ? " [$seq of $count]" : "";
		$self->log_info(
			"$label message$n_of_n: " . encode( "utf8", $piece ),
		);
	}
}
1220             1;
1221              
1222             __END__
1223              
1224             =head1 NAME
1225              
1226             SRS::EPP::Session - logic for EPP Session State machine
1227              
1228             =head1 SYNOPSIS
1229              
1230             my $session = SRS::EPP::Session->new( io => $socket );
1231              
1232             #--- session events:
1233              
1234             $session->connected;
1235             $session->input_event;
1236             $session->input_packet($data);
1237             $session->queue_command($command);
1238             $session->process_queue($count);
1239             $session->be_response($srs_rs);
1240             $session->send_pending_replies();
1241             $session->send_reply($response);
1242             $session->output_event;
1243              
1244             #--- information messages:
1245              
1246             # print RFC3730 state eg 'Waiting for Client',
1247             # 'Prepare Greeting' (see Page 4 of RFC3730)
1248             print $session->state;
1249              
1250             # return the credential used for login
1251             print $session->user;
1252              
1253             =head1 DESCRIPTION
1254              
1255             The SRS::EPP::Session class manages the flow of individual
1256             connections. It implements the "EPP Server State Machine" from
1257             RFC3730, as well as the exchange encapsulation described in RFC3734
1258             "EPP TCP Transport".
1259              
1260             This class is designed to be called from within an event-based
1261             framework; this is fairly essential in the context of a server given
1262             the potential to deadlock if the client does not clear its responses
1263             in a timely fashion.
1264              
1265             Input commands go through several stages:
1266              
1267             =over
1268              
1269             =item *
1270              
1271             First, incoming data that is ready is chunked into complete EPP requests.
1272             This is a binary de-chunking, and is based on reading a packet length
1273             as a U32, then waiting for that many octets. See L</input_event>
1274              
1275             =item *
1276              
1277             Complete chunks are passed to the L<SRS::EPP::Command> constructor for
1278             validation and object construction. See L</input_packet>
1279              
1280             =item *
1281              
1282             The constructed object is triaged, and added to an appropriate
1283             processing queue. See L</queue_command>
1284              
1285             =item *
1286              
1287             The request is processed; either locally for requests such as
1288             C<E<lt>helloE<gt>>, or converted to the back-end format
1289             (L<SRS::Request>) and placed in the back-end queue (this is normally
1290             immediately dispatched). See L</process_queue>
1291              
1292             =item *
1293              
1294             The response (a L<SRS::Response> object) from the back-end is
1295             received; this is converted to a corresponding L<SRS::EPP::Response>
1296             object. Outstanding queued back-end requests are then dispatched if
1297             they are present (so each session has a maximum of one outstanding
1298             request at a time). See L</be_response>
1299              
1300             =item *
1301              
1302             Prepared L<SRS::EPP::Response> objects are queued; this involves
1303             individually converting them to strings, which are sent back to the
1304             client, each response in its own SSL frame. See L</send_reply>
1305              
1306             =item *
1307              
1308             If the output blocks, then the responses wait and are sent back as
1309             the response queue clears. See L</output_event>
1310              
1311             =back
1312              
1313             =head1 METHODS
1314              
1315             =head2 connected()
1316              
1317             This event signals to the Session that the client is now connected.
1318             It signals that it is time to issue a C<E<lt>greetingE<gt>> response,
1319             just as if a C<E<lt>helloE<gt>> message had been received.
1320              
1321             =head2 input_event()
1322              
1323             This event is intended to be invoked whenever there is data ready to
1324             read on the input socket. It returns false if not enough data could
1325             be read to get a complete subpacket.
1326              
1327             =head2 input_packet($data)
1328              
1329             This message is self-fired with a complete packet of data once it has
1330             been read.
1331              
1332             =head2 queue_command($command)
1333              
1334             Enqueues an EPP command for processing and does nothing else.
1335              
1336             =head2 process_queue($count)
1337              
1338             Processes the back-end queue, up to C<$count> at a time. At the end
1339             of this, if there are no outstanding back-end transactions, any
1340             produced L<SRS::Request> objects are wrapped into an
1341             L<SRS::Transaction> object and dispatched to the back-end.
1342              
1343             Returns the number of commands remaining to process.
1344              
1345             =head2 be_response($srs_rs)
1346              
1347             This is fired when a back-end response is received. It is responsible
1348             for matching responses with commands in the command queue and
1349             converting to L<SRS::EPP::Response> objects.
1350              
1351             =head2 send_pending_replies()
1352              
1353             This is called by process_queue() or be_response(), and checks each
1354             command for a corresponding L<SRS::EPP::Response> object, dequeues and
1355             starts to send them back.
1356              
1357             =head2 send_reply($response)
1358              
1359             This is called by send_pending_replies(), and converts a
1360             L<SRS::EPP::Response> object to network form, then starts to send it.
1361             Returns the total number of octets which are currently outstanding; if
1362             this is non-zero, the caller is expected to watch the output socket
1363             for writability and call L</output_event()> once it is writable.
1364              
1365             =head2 output_event()
1366              
1367             This event is intended to be called when the return socket is newly
1368             writable; it writes everything it can to the output socket and returns
1369             the number of bytes written.
1370              
1371             =head1 SEE ALSO
1372              
1373             L<SRS::EPP::Command>, L<SRS::EPP::Response>
1374              
1375             =cut
1376              
1377             # Local Variables:
1378             # mode:cperl
1379             # indent-tabs-mode: t
1380             # cperl-continued-statement-offset: 8
1381             # cperl-brace-offset: 0
1382             # cperl-close-paren-offset: 0
1383             # cperl-continued-brace-offset: 0
1384             # cperl-continued-statement-offset: 8
1385             # cperl-extra-newline-before-brace: nil
1386             # cperl-indent-level: 8
1387             # cperl-indent-parens-as-block: t
1388             # cperl-indent-wrt-brace: nil
1389             # cperl-label-offset: -8
1390             # cperl-merge-trailing-else: t
1391             # End:
1392