File Coverage

blib/lib/Mojo/MySQL5/Connection.pm
Criterion Covered Total %
statement 172 314 54.7
branch 60 150 40.0
condition 6 44 13.6
subroutine 28 51 54.9
pod 4 4 100.0
total 270 563 47.9


line stmt bran cond sub pod time code
1             package Mojo::MySQL5::Connection;
2 7     7   20499 use Mojo::Base 'Mojo::EventEmitter';
  7         7  
  7         28  
3              
4 7     7   1961 use utf8;
  7         10  
  7         18  
5 7     7   575 use Encode qw(_utf8_off _utf8_on);
  7         8596  
  7         350  
6 7     7   563 use Digest::SHA qw(sha1);
  7         2707  
  7         334  
7 7     7   28 use Scalar::Util 'weaken';
  7         12  
  7         270  
8 7     7   2215 use Mojo::IOLoop;
  7         389923  
  7         32  
9 7     7   452 use Mojo::MySQL5::URL;
  7         12  
  7         41  
10              
11             has state => 'disconnected';
12              
13             has url => sub { Mojo::MySQL5::URL->new('mysql:///') };
14              
15 7   50 7   456 use constant DEBUG => $ENV{MOJO_MYSQL_DEBUG} // 0;
  7         8  
  7         958  
16              
17             use constant {
18 7         1135 CLIENT_CAPABILITY => [ qw(
19             LONG_PASSWORD FOUND_ROWS LONG_FLAG CONNECT_WITH_DB
20             NO_SCHEMA COMPRESS ODBC LOCAL_FILES
21             IGNORE_SPACE PROTOCOL_41 INTERACTIVE SSL
22             IGNORE_SIGPIPE TRANSACTIONS RESERVED SECURE_CONNECTION
23             MULTI_STATEMENTS MULTI_RESULTS PS_MULTI_RESULTS PLUGIN_AUTH
24             CONNECT_ATTRS PLUGIN_AUTH_LENENC_CLIENT_DATA CAN_HANDLE_EXPIRED_PASSWORDS SESSION_TRACK
25             DEPRECATE_EOF) ],
26              
27             SERVER_STATUS => [ qw(
28             STATUS_IN_TRANS STATUS_AUTOCOMMIT RESERVED MORE_RESULTS_EXISTS
29             STATUS_NO_GOOD_INDEX_USED STATUS_NO_INDEX_USED STATUS_CURSOR_EXISTS STATUS_LAST_ROW_SENT
30             STATUS_DB_DROPPED STATUS_NO_BACKSLASH_ESCAPES STATUS_METADATA_CHANGED QUERY_WAS_SLOW
31             PS_OUT_PARAMS STATUS_IN_TRANS_READONLY SESSION_STATE_CHANGED) ],
32              
33             FIELD_FLAG => [ qw(
34             NOT_NULL PRI_KEY UNIQUE_KEY MULTIPLE_KEY
35             BLOB UNSIGNED ZEROFILL BINARY
36             ENUM AUTO_INCREMENT TIMESTAMP SET) ],
37              
38             CHARSET => {
39             UTF8 => 33, BINARY => 63, ASCII => 65 },
40              
41             DATATYPE => {
42             DECIMAL => 0x00, TINY => 0x01, SHORT => 0x02, LONG => 0x03,
43             FLOAT => 0x04, DOUBLE => 0x05,
44             NULL => 0x06, TIMESTAMP => 0x07,
45             LONGLONG => 0x08, INT24 => 0x09,
46             DATE => 0x0a, TIME => 0x0b, DATETIME => 0x0c, YEAR => 0x0d, NEWDATE => 0x0e,
47             VARCHAR => 0x0f, BIT => 0x10,
48             NEWDECIMAL => 0xf6, ENUM => 0xf7, SET => 0xf8,
49             TINY_BLOB => 0xf9, MEDIUM_BLOB => 0xfa, LONG_BLOB => 0xfb, BLOB => 0xfc,
50             VAR_STRING => 0xfd, STRING => 0xfe, GEOMETRY => 0xff },
51 7     7   32 };
  7         8  
52              
53             use constant {
54 7         38 REV_CHARSET => { reverse %{CHARSET()} },
  189         922  
55 7         8 REV_DATATYPE => { map { chr(DATATYPE->{$_}) => $_ } keys %{DATATYPE()} },
  7         32  
56 7     7   31 };
  7         8  
57              
58             # state machine
59             # doing => { state => '_op', state => '_op' }
60 7         27600 use constant SEQ => {
61             connect => {
62             connected => '_recv_handshake',
63             handshake => '_send_auth',
64             auth => '_recv_ok',
65             },
66             query => {
67             idle => '_send_query',
68             query => '_recv_query_responce',
69             field => '_recv_field',
70             result => '_recv_row',
71             },
72             ping => {
73             idle => '_send_ping',
74             ping => '_recv_ok',
75             },
76             disconnect => {
77             idle => '_send_quit',
78             quit => '_recv_ok'
79             }
80 7     7   27 };
  7         7  
81              
82              
83             # encode fixed length integer
84             sub _encode_int($$) {
85 0     0   0 my ($int, $len) = @_;
86 0 0 0     0 return substr pack('V', $int), 0, $len if $len >= 1 and $len <= 4;
87 0 0 0     0 return substr pack('VV', int $int % 2 ** 32, int $int / 2 ** 32), 0, $len if $len == 6 or $len = 8;
88 0         0 return undef;
89             }
90              
91             # encode length coded integer
92             sub _encode_lcint($) {
93 29     29   4176 my $int = shift;
94             return
95 29 100       20878 !defined $int ? pack 'C', 251 :
    100          
    100          
    100          
96             $int <= 250 ? pack 'C', $int :
97             $int <= 0xffff ? pack 'Cv', 252, $int :
98             $int <= 0xffffff ? substr pack('CV', 253, $int), 0, 4 :
99             pack 'CVV', 254, int $int % 2 ** 32, int $int / 2 ** 32;
100             }
101              
102             # encode length coded string
103             sub _encode_lcstr($) {
104 12     12   12321 my $str = shift;
105 12 100       37 return defined $str ? _encode_lcint(length $str) . $str : _encode_lcint($str);
106             }
107              
108             # get fixed length integer
109             sub _get_int {
110 109     109   9143 my ($self, $len, $chew) = @_;
111 109 100       235 my $data = $chew ? substr $self->{incoming}, 0, $len, '' : substr $self->{incoming}, 0, $len;
112 109 100       315 return unpack 'C', $data if $len == 1;
113 31 100 66     159 return unpack 'V', $data . "\0\0" if $len >= 2 and $len <= 4;
114 6 50       11 return undef unless $len == 8;
115 6         16 my $lo = unpack ('V', substr $data, 0, 4);
116 6         10 my $hi = unpack('V', substr $data, 4, 4);
117 6 100       19 return $hi ?
118             int $lo + int $hi * 2 ** 32 : $lo;
119             }
120              
121 81     81   111 sub _chew_int { shift->_get_int(shift, 1) }
122              
123             # get length coded integer
124             sub _chew_lcint {
125 41     41   60 my $self = shift;
126 41         58 my $first = $self->_chew_int(1);
127             return
128 41 50       124 $first < 251 ? $first :
    100          
    100          
    100          
    100          
129             $first == 251 ? undef :
130             $first == 252 ? $self->_chew_int(2) :
131             $first == 253 ? $self->_chew_int(3) :
132             $first == 254 ? $self->_chew_int(8) : undef;
133             }
134              
135             # get length coded string
136             sub _chew_lcstr {
137 14     14   32 my $self = shift;
138 14         19 my $len = $self->_chew_lcint;
139 14 100       9830 return defined $len ? substr $self->{incoming}, 0, $len, '' : undef;
140             }
141              
142             # get zero ending string
143             sub _chew_zstr {
144 3     3   8 my $self = shift;
145 3         9 my $str = unpack 'Z*', $self->{incoming};
146 3 50       8 return undef unless defined $str;
147 3         7 substr $self->{incoming}, 0, length($str) + 1, '';
148 3         8 return $str;
149             }
150              
151             # get fixed length string
152             sub _chew_str {
153 8     8   9 my ($self, $len) = @_;
154 8 50       16 die "_chew_str($len) error" if $len > length $self->{incoming};
155 8         15 return substr $self->{incoming}, 0, $len, '';
156             }
157              
158              
159             sub _send_auth {
160 0     0   0 my $self = shift;
161              
162 0         0 my ($username, $password, $database, $crypt) =
163             ($self->url->username, $self->url->password, $self->url->database, '');
164              
165 0         0 my @flags = qw(LONG_PASSWORD LONG_FLAG PROTOCOL_41 TRANSACTIONS SECURE_CONNECTION MULTI_RESULTS);
166 0 0       0 push @flags, 'CONNECT_WITH_DB' if $database;
167 0 0       0 push @flags, 'MULTI_STATEMENTS' if $self->url->options->{multi_statements};
168 0 0       0 push @flags, 'FOUND_ROWS' if $self->url->options->{found_rows};
169 0         0 my $flags = _flag_set(CLIENT_CAPABILITY, @flags);
170              
171 0         0 warn '>>> AUTH ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
172             ' user:', $username, ' database:', $database,
173             ' flags:', _flag_list(CLIENT_CAPABILITY, $flags),
174             '(', sprintf('%08X', $flags), ')', "\n" if DEBUG > 1;
175              
176 0         0 _utf8_off $username; _utf8_off $password; _utf8_off $database;
  0         0  
  0         0  
177              
178 0 0       0 if ($password) {
179 0         0 my $crypt1 = sha1($password);
180 0         0 my $crypt2 = sha1($self->{auth_plugin_data} . sha1 $crypt1);
181 0         0 $crypt = $crypt1 ^ $crypt2;
182             }
183              
184 0         0 $self->state('auth');
185 0         0 delete $self->{auth_plugin_data};
186 0 0 0     0 return pack 'VVCx23Z*a*Z*',
187             $flags, 131072, ($self->url->options->{utf8} // 1) ? CHARSET->{UTF8} : CHARSET->{BINARY},
188             $username, _encode_lcstr($crypt), $database, 'mysql_native_password';
189             }
190              
191             sub _send_quit {
192 0     0   0 my $self = shift;
193 0         0 warn '>>> QUIT ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n" if DEBUG > 1;
194 0         0 $self->state('quit');
195 0         0 return pack 'C', 1;
196             }
197              
198             sub _send_query {
199 0     0   0 my $self = shift;
200 0         0 my $sql = $self->{sql};
201 0         0 warn '>>> QUERY ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
202             " sql:$sql\n" if DEBUG > 1;
203 0         0 _utf8_off $sql;
204 0         0 $self->state('query');
205 0         0 return pack('C', 3) . $sql;
206             }
207              
208             sub _send_ping {
209 0     0   0 my $self = shift;
210 0         0 warn '>>> PING ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n" if DEBUG > 1;
211 0         0 $self->state('ping');
212 0         0 return pack 'C', 14;
213             }
214              
215             sub _recv_error {
216 1     1   3 my $self = shift;
217 1         4 my $first = $self->_chew_int(1);
218 1 50       3 die "_recv_error() wrong packet $first" unless $first == 255;
219              
220 1         2 $self->{error_code} = $self->_chew_int(2);
221 1         3 $self->_chew_str(1);
222 1         2 $self->{sql_state} = $self->_chew_str(5);
223 1         2 $self->{error_message} = $self->_chew_zstr;
224              
225 1         1 warn '<<< ERROR ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
226             ' error:', $self->{error_code},
227             ' state:', $self->{sql_state},
228             ' message:', $self->{error_message}, "\n" if DEBUG > 1;
229              
230 1         4 $self->state('idle');
231 1         9 $self->emit(error => $self->{error_message});
232             }
233              
234             sub _recv_ok {
235 2     2   3895 my $self = shift;
236 2         7 my $first = $self->_get_int(1);
237 2 100       10 return $self->_recv_error if $first == 255;
238 1 50       3 die "_recv_ok() wrong packet $first" unless $first == 0;
239              
240 1         3 $self->_chew_int(1);
241 1         4 $self->{affected_rows} = $self->_chew_lcint;
242 1         3 $self->{last_insert_id} = $self->_chew_lcint;
243 1         3 $self->{status_flags} = $self->_chew_int(2);
244 1         3 $self->{warnings_count} = $self->_chew_int(2);
245 1         2 $self->{field_count} = 0;
246              
247 1         2 warn '<<< OK ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
248             ' affected:', $self->{affected_rows},
249             ' last_insert_id:', $self->{last_insert_id},
250             ' status:', _flag_list(SERVER_STATUS, $self->{status_flags}),
251             '(', sprintf('%04X', $self->{status_flags}), ')',
252             ' warnings:', $self->{warnings_count}, "\n" if DEBUG > 1;
253              
254 1 50       3 $self->emit('connect') if $self->state eq 'auth';
255 1 50       8 $self->emit('end') if $self->state eq 'query';
256 1         27 $self->state('idle');
257             }
258              
259             sub _recv_query_responce {
260 1     1   5 my $self = shift;
261 1         2 my $first = $self->_get_int(1);
262 1 50       4 return $self->_recv_error if $first == 255;
263 1 50       5 return $self->_recv_ok if $first == 0;
264              
265 1         3 $self->{field_count} = $self->_chew_lcint;
266              
267 1         2 warn '<<< QUERY_RESPONSE ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
268             ' fields:', $self->{field_count}, "\n" if DEBUG > 1;
269              
270 1         4 $self->state('field');
271             }
272              
273             sub _recv_eof {
274 2     2   2 my $self = shift;
275 2         4 my $first = $self->_get_int(1);
276 2 50       30 return $self->_recv_error if $first == 255;
277 2 50       6 die "_recv_eof() wrong packet $first" unless $first == 254;
278              
279 2         4 $self->_chew_int(1);
280 2         3 $self->{warnings_count} = $self->_chew_int(2);
281 2         3 $self->{status_flags} = $self->_chew_int(2);
282              
283 2         2 warn '<<< EOF ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
284             ' warnings:', $self->{warnings_count},
285             ' status:', _flag_list(SERVER_STATUS, $self->{status_flags}),
286             '(', sprintf('%04X', $self->{status_flags}), ')', "\n" if DEBUG > 1;
287              
288 2 100       6 if ($self->state eq 'field') {
    50          
289 1         10 $self->emit(fields => $self->{column_info});
290 1         15 $self->state('result');
291             }
292             elsif ($self->state eq 'result') {
293 1         11 $self->{column_info} = [];
294 1 50       4 if ($self->{status_flags} & 0x0008) {
295             # MORE_RESULTS
296 0         0 $self->state('query');
297             }
298             else {
299 1         3 $self->emit(end => undef);
300 1         7 $self->state('idle');
301             }
302             }
303             }
304              
305             sub _recv_field {
306 2     2   1515 my $self = shift;
307 2         5 my $first = $self->_get_int(1);
308 2 50       5 return $self->_recv_error if $first == 255;
309 2 100       6 return $self->_recv_eof if $first == 254;
310 1 50       2 die "_recv_field() wrong packet $first" if $first > 250;
311              
312 1         3 my $field = {};
313 1         3 $field->{catalog} = $self->_chew_lcstr;
314 1         3 $field->{schema} = $self->_chew_lcstr;
315 1         4 $field->{table} = $self->_chew_lcstr;
316 1         4 $field->{org_table} = $self->_chew_lcstr;
317 1         3 $field->{name} = $self->_chew_lcstr;
318 1         3 $field->{org_name} = $self->_chew_lcstr;
319 1         2 $self->_chew_lcint;
320 1         2 $field->{character_set} = $self->_chew_int(2);
321 1         2 $field->{column_length} = $self->_chew_int(4);
322 1         3 $field->{column_type} = $self->_chew_int(1);
323 1         2 $field->{flags} = $self->_chew_int(2);
324 1         1 $field->{decimals} = $self->_chew_int(1);
325 1         3 $self->_chew_str(2);
326              
327 1 50 50     5 do { _utf8_on $field->{$_} for qw(catalog schema table org_table name org_name) }
  1         9  
328             if ($self->url->options->{utf8} // 1);
329              
330 1         3 push @{$self->{column_info}}, $field;
  1         2  
331              
332 1         2 warn '<<< FIELD ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
333             ' name:', $field->{name},
334             ' type:', REV_DATATYPE->{chr $field->{column_type}}, '(', $field->{column_type}, ')',
335             ' length:', $field->{column_length},
336             ' charset:', REV_CHARSET->{$field->{character_set}} // 'UNKNOWN', '(', $field->{character_set}, ')',
337             ' flags:', _flag_list(FIELD_FLAG, $field->{flags}), '(', $field->{flags}, ')', , "\n" if DEBUG > 1;
338             }
339              
340             sub _recv_row {
341 3     3   2466 my $self = shift;
342 3         6 my $first = $self->_get_int(1);
343 3 50       6 return $self->_recv_error if $first == 255;
344 3 100       8 return $self->_recv_eof if $first == 254;
345              
346 2         1 my @row;
347 2         5 for (0 .. $self->{field_count} - 1) {
348 2         4 $row[$_] = $self->_chew_lcstr;
349 2 50       10 _utf8_on $row[$_]
350             if $self->{column_info}->[$_]->{character_set} == CHARSET->{UTF8};
351             }
352              
353             warn '<<< ROW ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
354 2         2 join(', ', map { defined $_ ? "'" . $_ . "'" : 'null' } @row), "\n" if DEBUG > 1;
355              
356 2         5 $self->emit(result => \@row);
357             }
358              
359             sub _recv_handshake {
360 1     1   141593 my $self = shift;
361 1         6 my $first = $self->_get_int(1);
362 1 50       4 return $self->_recv_error if $first == 255;
363              
364 1         7 $self->{protocol_version} = $self->_chew_int(1);
365 1         6 $self->{server_version} = $self->_chew_zstr;
366 1         3 $self->{connection_id} = $self->_chew_int(4);
367 1         6 $self->{auth_plugin_data} = $self->_chew_str(8);
368 1         3 $self->_chew_str(1);
369 1         11 $self->{capability_flags} = $self->_chew_int(2);
370 1         2 $self->{character_set} = $self->_chew_int(1);
371 1         3 $self->{status_flags} = $self->_chew_int(2);
372 1         4 $self->{capability_flags} |= $self->_chew_int(2) << 16;
373 1         3 my $auth_len = $self->_chew_int(1);
374 1         2 $self->_chew_str(10);
375 1         3 $self->{auth_plugin_data} .= $self->_chew_str(12);
376 1         3 $self->_chew_str(1);
377 1         2 my $auth_plugin_name = $self->_chew_zstr;
378              
379 1         1 warn '<<< HANDSHAKE ', $self->{connection_id}, ' #', $self->{seq}, ' state:', $self->state, "\n",
380             ' protocol:', $self->{protocol_version},
381             ' version:', $self->{server_version},
382             ' connection:', $self->{connection_id},
383             ' status:', _flag_list(SERVER_STATUS, $self->{status_flags}),
384             '(', sprintf('%04X', $self->{status_flags}), ')',
385             ' capabilities:', _flag_list(CLIENT_CAPABILITY, $self->{capability_flags}),
386             '(', sprintf('%08X', $self->{capability_flags}), ')',
387             ' auth:', $auth_plugin_name, "\n" if DEBUG > 1;
388              
389 1 50       9 die '_recv_handshake() invalid protocol version ' . $self->{protocol_version}
390             unless $self->{protocol_version} == 10;
391 1 50       4 die '_recv_handshake() unsupported auth method ' . $auth_plugin_name
392             unless $auth_plugin_name eq 'mysql_native_password';
393 1 50 33     10 die '_recv_handshake() invalid auth data '
394             unless $auth_len == 21 and length($self->{auth_plugin_data}) == 20;
395              
396 1         6 $self->state('handshake');
397             }
398              
399             sub _reset {
400 1     1   1271 my $self = shift;
401              
402 1         9 undef $self->{$_} for qw(error_code sql_state error_message
403             affected_rows last_insert_id status_flags warnings_count field_count);
404              
405 1         3 $self->{column_info} = [];
406 1         2 $self->{seq} = 0;
407 1         2 $self->{incoming} = '';
408             }
409              
410             sub _ioloop {
411 0 0 0 0   0 $_[1] ? Mojo::IOLoop->singleton : ($_[0]->{ioloop} ||= Mojo::IOLoop->new);
412             }
413              
414             sub _seq_next_ready {
415 0     0   0 my $self = shift;
416 0 0       0 return 0 if length $self->{incoming} < 4;
417 0         0 return length($self->{incoming}) - 4 >= $self->_get_int(3);
418             }
419              
420             sub _seq_next {
421 0     0   0 my ($self, $cmd, $writeonly) = @_;
422 0         0 my $next = SEQ->{$cmd}{$self->state};
423 0         0 warn 'stream state:', $self->state, ' doing:', $cmd, ' next:', ($next // ''), "\n" if DEBUG > 2;
424 0 0       0 return unless $next;
425 0 0       0 if (substr($next, 0, 6) eq '_send_') {
    0          
426 0         0 my $packet = $self->$next();
427 0         0 $self->{stream}->write(_encode_int(length $packet, 3) . _encode_int($self->{seq}, 1) . $packet);
428             }
429             elsif (substr($next, 0, 6) eq '_recv_') {
430 0 0       0 return if $writeonly;
431 0         0 my ($len, $seq) = ($self->_chew_int(3), $self->_chew_int(1));
432 0 0 0     0 die "_next_packet() packet out of order $seq " . $self->{seq}
433             if $self->{seq} and $seq != (($self->{seq} + 1) & 0xff);
434 0 0       0 die "_next_packet() not ready" if $len > length($self->{incoming});
435 0         0 $self->{seq}++;
436 0         0 $self->$next();
437             }
438             else {
439 0         0 $self->$next();
440             }
441             }
442              
443             sub _seq {
444 0     0   0 my ($self, $cmd, $cb) = @_;
445              
446 0         0 $self->{stream} = Mojo::IOLoop::Stream->new($self->{socket});
447 0 0       0 $self->{stream}->reactor($self->_ioloop(0)->reactor) unless $cb;
448 0 0       0 $self->{stream}->timeout($self->url->options->{query_timeout})
449             if $self->url->options->{query_timeout};
450 0         0 weaken $self;
451              
452             $self->{stream}->on(read => sub {
453 0     0   0 my ($stream, $bytes) = @_;
454 0         0 $self->{incoming} .= $bytes;
455              
456 0         0 $self->_seq_next($cmd, 0) while $self->_seq_next_ready;
457              
458 0 0       0 if ($self->state eq 'idle') {
459 0         0 $stream->steal_handle;
460 0         0 delete $self->{stream};
461 0 0       0 $cb ? $self->$cb() : $self->_ioloop(0)->stop;
462             }
463             else {
464 0         0 $self->_seq_next($cmd, 1);
465             }
466 0         0 });
467             $self->{stream}->on(error => sub {
468 0     0   0 my ($stream, $err) = @_;
469 0         0 warn "stream error: $err state:", $self->state, "\n" if DEBUG;
470 0   0     0 $self->{error_message} //= $err;
471 0         0 $self->emit(error => $err);
472 0         0 });
473             $self->{stream}->on(timeout => sub {
474 0     0   0 warn "stream timeout state:", $self->state, "\n" if DEBUG;
475 0   0     0 $self->{error_message} //= 'timeout';
476 0         0 });
477             $self->{stream}->on(close => sub {
478 0     0   0 $self->{socket} = undef;
479 0         0 $self->state('disconnected');
480 0 0       0 $cb ? $self->$cb() : $self->_ioloop(0)->stop;
481 0         0 });
482              
483 0         0 $self->{stream}->start;
484 0         0 $self->_seq_next($cmd, 1);
485             }
486              
487             sub _cmd {
488 0     0   0 my ($self, $cmd, $cb) = @_;
489 0 0       0 die 'invalid cmd:' . $cmd unless exists SEQ->{$cmd};
490 0 0       0 die 'invalid state:' . $self->state . ' doing:'. $cmd unless exists SEQ->{$cmd}{$self->state};
491              
492 0         0 $self->_reset;
493 0         0 $self->_seq($cmd, $cb);
494 0 0       0 $self->_ioloop(0)->start unless $cb;
495 0 0       0 return $self->state eq 'idle' ? 1 : 0;
496             }
497              
498             sub connect {
499 0     0 1 0 my ($self, $cb) = @_;
500              
501 0         0 $self->state('connecting');
502 0         0 $self->_reset;
503              
504 0 0 0     0 if ($self->url->host eq '' or $self->url->host eq 'localhost') {
505 0 0       0 if (!$self->url->options->{socket}) {
506 0         0 $self->url->options->{socket} = `mysql_config --socket`;
507 0         0 chomp $self->url->options->{socket};
508             }
509 0         0 warn "Connecting to UNIX socket '", $self->url->options->{socket}, "'\n" if DEBUG;
510 0   0     0 $self->{socket} = IO::Socket::UNIX->new(
511             Peer => $self->url->options->{socket},
512             Timeout => $self->url->options->{connect_timeout} // 10,
513             Blocking => 0
514             );
515 0 0       0 return unless $self->{socket};
516 0         0 $self->state('connected');
517 0         0 $self->_seq('connect', $cb);
518             }
519             else {
520 0         0 $self->{client} = Mojo::IOLoop::Client->new;
521 0 0       0 $self->{client}->reactor($self->_ioloop(0)->reactor) unless $cb;
522 0         0 weaken $self;
523              
524             $self->{client}->on(connect => sub {
525 0     0   0 my ($client, $handle) = @_;
526 0         0 delete $self->{client};
527 0         0 $self->{socket} = $handle;
528 0         0 $self->state('connected');
529 0         0 $self->_seq('connect', $cb);
530 0         0 });
531             $self->{client}->on(error => sub {
532 0     0   0 my ($client, $err) = @_;
533 0         0 delete $self->{client};
534 0         0 $self->state('disconnected');
535 0         0 $self->emit(error => $err);
536 0 0       0 $cb ? $self->$cb() : $self->_ioloop(0)->stop;
537 0         0 });
538              
539 0   0     0 $self->{client}->connect(
      0        
      0        
540             address => $self->url->host || '127.0.0.1',
541             port => $self->url->port || 3306,
542             timeout => $self->url->options->{connect_timeout} // 10
543             );
544             }
545              
546 0 0       0 $self->_ioloop(0)->start unless $cb;
547             }
548              
549 0     0 1 0 sub disconnect { shift->_cmd('disconnect') }
550              
551             sub ping {
552 0     0 1 0 my ($self, $cb) = @_;
553 0 0       0 return $self->state eq 'disconnected' ? 0 : $self->_cmd('ping', $cb);
554             }
555              
556             sub query {
557 0     0 1 0 my ($self, $sql, $cb) = @_;
558 0         0 $self->{sql} = $sql;
559 0         0 $self->_cmd('query', $cb);
560             }
561              
562             sub DESTROY {
563 1     1   1108 my $self = shift;
564 1 50 33     2 $self->disconnect if $self->state eq 'idle' and $self->{socket};
565             }
566              
567             # Private util functions
568             sub _flag_list($$;$) {
569 0     0     my ($list, $data, $sep) = @_;
570 0           my $i = 0;
571 0   0       return join $sep || '|', grep { $data & 1 << $i++ } @$list;
  0            
572             }
573              
574             sub _flag_set($;@) {
575 0     0     my ($list, @ops) = @_;
576 0           my ($i, $flags) = (0, 0);
577 0           foreach my $flag (@$list) {
578 0 0         do { $flags |= 1 << $i if $_ eq $flag } for @ops;
  0            
579 0           $i++;
580             }
581 0           return $flags;
582             }
583              
584             sub _flag_is($$$) {
585 0     0     my ($list, $data, $flag) = @_;
586 0           my $i = 0;
587 0           foreach (@$list) {
588 0 0         return $data & 1 << $i if $flag eq $_;
589 0           $i++;
590             }
591 0           return undef;
592             }
593              
594             1;
595              
596             =encoding utf8
597              
598             =head1 NAME
599              
600             Mojo::MySQL5::Connection - TCP connection to MySQL Server
601              
602             =head1 SYNOPSIS
603              
604             use Mojo::MySQL5::Conection;
605              
606             my $c = Mojo::MySQL5::Conection->new(
607             url => Mojo::MySQL5->new(
608             'mysql://test:password@127.0.0.1:3306/test?found_rows=1&connect_timeout=2')
609             );
610              
611             Mojo::IOLoop->delay(
612             sub {
613             my $delay = shift;
614             $c->connect($delay->begin);
615             },
616             sub {
617             my ($delay, $c) = @_;
618             $c->query('select * from test_data', $delay->begin);
619             },
620             sub {
621             my ($delay, $c) = @_;
622             }
623             )->wait;
624              
625              
626             =head1 DESCRIPTION
627              
628             L is Asyncronous Protocol Implementation for connection to MySQL Server
629             managed by L.
630              
631             =head1 EVENTS
632              
633             L inherits all events from L and can emit the
634             following new ones.
635              
636             =head2 fields
637              
638             $c->on(fields => sub {
639             my ($c, $fields) = @_;
640             ...
641             });
642              
643             Emitted after posting query and fields definition is received.
644              
645             =head2 result
646              
647             $c->on(result => sub {
648             my ($c, $result) = @_;
649             ...
650             });
651              
652             Emited when a result row is received.
653              
654             =head2 end
655              
656             $c->on(end => sub {
657             my $c = shift;
658             ...
659             });
660              
661             Emited when query ended successfully.
662              
663             =head2 error
664              
665             $c->on(error => sub {
666             my ($c, $error) = @_;
667             ...
668             });
669              
670             Emited when Error is received.
671              
672              
673             =head1 ATTRIBUTES
674              
675             L implements the following attributes.
676              
677             =head2 state
678              
679             my $state = $c->state;
680             $c->state('disconnected');
681              
682             Connection State.
683              
684             Possible States are:
685              
686             =over 2
687              
688             =item disconnected
689              
690             Initial state before connecting to server.
691              
692             Same state after fatal erorr.
693              
694             =item connected
695              
696             Connection to server is established.
697              
698             Waiting for C packet.
699              
700             =item handshake
701              
702             Server responded with C.
703              
704             Next send C (authentication) packet.
705              
706             =item auth
707              
708             C (authentication) packet sent to server.
709              
710             Next wait for C or C packet.
711              
712             =item idle
713              
714             Connection is idle and ready for sending commands.
715              
716             =item query
717              
718             C packet sent to server.
719              
720             Waiting for C packet. C is expected for non-SELECT queries.
721              
722             =item field
723              
724             Waiting for C packets. C is expected for end of column definition.
725              
726             =item result
727              
728             Waiting for C packets. C is expected for end of result rows.
729              
730             =item ping
731              
732             C packet is sent to server.
733              
734             Waitint for C packet.
735              
736             =back
737              
738             =head2 url
739              
740             my $url = $c->url;
741             $c->url(Mojo::MySQL5::URL->new('mysql://localhost/test');
742              
743             MySQL Connection URL.
744              
745             Supported Options are:
746              
747             =over 2
748              
749             =item found_rows
750              
751             Enables or disables the flag C while connecting to the MySQL server.
752             Without C, if you perform a query like
753            
754             UPDATE $table SET id = 1 WHERE id = 1;
755            
756             then the MySQL engine will return 0, because no rows have changed.
757             With C, it will return the number of rows that have an id 1.
758              
759             =item multi_statements
760              
761             Enables or disables the flag C while connecting to the server.
762             If enabled multiple statements separated by semicolon (;) can be send with single
763             call to L.
764              
765             =item utf8
766              
767             If enabled default character set is to C while connecting to the server.
768             If disabled C is the default character set.
769              
770             =item connect_timeout
771              
772             The connect request to the server will timeout if it has not been successful
773             after the given number of seconds.
774              
775             =item query_timeout
776              
777             If enabled, the read or write operation to the server will timeout
778             if it has not been successful after the given number of seconds.
779              
780             =item socket
781              
782             Unix socket that is used for connecting to the server.
783              
784             Determined by calling C unless specified.
785              
786             Unix socket is used if host part of L<"/url"> is C<''> or C<'localhost'>.
787             Use C<'127.0.0.1'> to connect to local machine via TCP.
788              
789             =back
790              
791             =head1 METHODS
792              
793             L inherits all methods from L and
794             implements the following new ones.
795              
796             =head2 connect
797              
798             # Blocking
799             $c->connect;
800             # Non-Blocking
801             $c->connect(sub { ... });
802              
803             Connect and authenticate to MySQL Server.
804              
805             =head2 disconnect
806              
807             $c->disconnect;
808              
809             Disconnect gracefully from server.
810              
811             =head2 ping
812            
813             say "ok" if $c->ping;
814              
815             Check if connection is alive.
816              
817             =head2 query
818              
819             # Blocking
820             $c->query('select 1 as `one`');
821             # Non-Blocking
822             $c->query('select 1 as `one`', sub { ... });
823              
824             Send SQL query to server.
825             Results are handled by events.
826              
827             =head1 DEBUGGING
828              
829             Debugging is enabled if environment variable MOJO_MYSQL_DEBUG is set.
830              
831             Packet tracing is enabled if MOJO_MYSQL_DEBUG is 2 or greater.
832              
833             =head1 AUTHOR
834              
835             Svetoslav Naydenov, C.
836              
837             =head1 COPYRIGHT AND LICENSE
838              
839             Copyright (C) 2015, Svetoslav Naydenov.
840              
841             This program is free software, you can redistribute it and/or modify it under
842             the terms of the Artistic License version 2.0.
843              
844             =head1 SEE ALSO
845              
846             L,
847              
848             L.
849              
850             =cut