File Coverage

amqp_socket.c
Criterion Covered Total %
statement 411 605 67.9
branch 165 330 50.0
condition n/a
subroutine n/a
pod n/a
total 576 935 61.6


line stmt bran cond sub pod time code
1             /*
2             * ***** BEGIN LICENSE BLOCK *****
3             * Version: MIT
4             *
5             * Portions created by Alan Antonuk are Copyright (c) 2012-2014
6             * Alan Antonuk. All Rights Reserved.
7             *
8             * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
9             * All Rights Reserved.
10             *
11             * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
12             * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
13             *
14             * Permission is hereby granted, free of charge, to any person
15             * obtaining a copy of this software and associated documentation
16             * files (the "Software"), to deal in the Software without
17             * restriction, including without limitation the rights to use, copy,
18             * modify, merge, publish, distribute, sublicense, and/or sell copies
19             * of the Software, and to permit persons to whom the Software is
20             * furnished to do so, subject to the following conditions:
21             *
22             * The above copyright notice and this permission notice shall be
23             * included in all copies or substantial portions of the Software.
24             *
25             * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26             * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27             * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28             * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29             * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30             * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31             * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32             * SOFTWARE.
33             * ***** END LICENSE BLOCK *****
34             */
35              
36             #ifdef HAVE_CONFIG_H
37             #include "config.h"
38             #endif
39              
40             #ifdef _MSC_VER
41             #define _CRT_SECURE_NO_WARNINGS
42             #endif
43              
44             #include "amqp_private.h"
45             #include "amqp_socket.h"
46             #include "amqp_table.h"
47             #include "amqp_time.h"
48              
49             #include
50             #include
51             #include
52             #include
53             #include
54             #include
55             #include
56              
57             #include
58              
59             #if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
60             #ifndef WIN32_LEAN_AND_MEAN
61             #define WIN32_LEAN_AND_MEAN
62             #endif
63             #include
64             #include
65             #else
66             #include
67             /* On older BSD types.h must come before net includes */
68             #include
69             #include
70             #ifdef HAVE_SELECT
71             #include
72             #endif
73             #include
74             #include
75             #include
76             #include
77             #ifdef HAVE_POLL
78             #include
79             #endif
80             #include
81             #endif
82              
83             static int amqp_id_in_reply_list(amqp_method_number_t expected,
84             amqp_method_number_t *list);
85              
86 38           static int amqp_os_socket_init(void) {
87             #ifdef _WIN32
88             static int called_wsastartup = 0;
89             if (!called_wsastartup) {
90             WSADATA data;
91             int res = WSAStartup(0x0202, &data);
92             if (res) {
93             return AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR;
94             }
95              
96             called_wsastartup = 1;
97             }
98             return AMQP_STATUS_OK;
99              
100             #else
101 38           return AMQP_STATUS_OK;
102             #endif
103             }
104              
105 401           int amqp_os_socket_error(void) {
106             #ifdef _WIN32
107             return WSAGetLastError();
108             #else
109 401           return errno;
110             #endif
111             }
112              
113 36           int amqp_os_socket_close(int sockfd) {
114             #ifdef _WIN32
115             return closesocket(sockfd);
116             #else
117 36           return close(sockfd);
118             #endif
119             }
120              
121 535           ssize_t amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len,
122             int flags) {
123 535 50         assert(self);
124 535 50         assert(self->klass->send);
125 535           return self->klass->send(self, buf, len, flags);
126             }
127              
128 783           ssize_t amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len,
129             int flags) {
130 783 50         assert(self);
131 783 50         assert(self->klass->recv);
132 783           return self->klass->recv(self, buf, len, flags);
133             }
134              
135 0           int amqp_socket_open(amqp_socket_t *self, const char *host, int port) {
136 0 0         assert(self);
137 0 0         assert(self->klass->open);
138 0           return self->klass->open(self, host, port, NULL);
139             }
140              
141 38           int amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port,
142             const struct timeval *timeout) {
143 38 50         assert(self);
144 38 50         assert(self->klass->open);
145 38           return self->klass->open(self, host, port, timeout);
146             }
147              
148 5           int amqp_socket_close(amqp_socket_t *self, amqp_socket_close_enum force) {
149 5 50         assert(self);
150 5 50         assert(self->klass->close);
151 5           return self->klass->close(self, force);
152             }
153              
154 73           void amqp_socket_delete(amqp_socket_t *self) {
155 73 100         if (self) {
156 37 50         assert(self->klass->delete);
157 37           self->klass->delete (self);
158             }
159 73           }
160              
161 780           int amqp_socket_get_sockfd(amqp_socket_t *self) {
162 780 50         assert(self);
163 780 50         assert(self->klass->get_sockfd);
164 780           return self->klass->get_sockfd(self);
165             }
166              
167 438           int amqp_poll(int fd, int event, amqp_time_t deadline) {
168             #ifdef HAVE_POLL
169             struct pollfd pfd;
170             int res;
171             int timeout_ms;
172              
173             /* Function should only ever be called with one of these two */
174 438 100         assert(event == AMQP_SF_POLLIN || event == AMQP_SF_POLLOUT);
    50          
175              
176             start_poll:
177 438           pfd.fd = fd;
178 438           switch (event) {
179             case AMQP_SF_POLLIN:
180 394           pfd.events = POLLIN;
181 394           break;
182             case AMQP_SF_POLLOUT:
183 44           pfd.events = POLLOUT;
184 44           break;
185             }
186              
187 438           timeout_ms = amqp_time_ms_until(deadline);
188 438 50         if (-1 > timeout_ms) {
189 0           return timeout_ms;
190             }
191              
192 438           res = poll(&pfd, 1, timeout_ms);
193              
194 438 100         if (0 < res) {
195             /* TODO: optimize this a bit by returning the AMQP_STATUS_SOCKET_ERROR or
196             * equivalent when pdf.revent is POLLHUP or POLLERR, so an extra syscall
197             * doesn't need to be made. */
198 433           return AMQP_STATUS_OK;
199 5 50         } else if (0 == res) {
200 5           return AMQP_STATUS_TIMEOUT;
201             } else {
202 0 0         switch (amqp_os_socket_error()) {
203             case EINTR:
204 0           goto start_poll;
205             default:
206 438           return AMQP_STATUS_SOCKET_ERROR;
207             }
208             }
209             return AMQP_STATUS_OK;
210             #elif defined(HAVE_SELECT)
211             fd_set fds;
212             fd_set exceptfds;
213             fd_set *exceptfdsp;
214             int res;
215             struct timeval tv;
216             struct timeval *tvp;
217              
218             assert((0 != (event & AMQP_SF_POLLIN)) || (0 != (event & AMQP_SF_POLLOUT)));
219             #ifndef _WIN32
220             /* On Win32 connect() failure is indicated through the exceptfds, it does not
221             * make any sense to allow POLLERR on any other platform or condition */
222             assert(0 == (event & AMQP_SF_POLLERR));
223             #endif
224              
225             start_select:
226             FD_ZERO(&fds);
227             FD_SET(fd, &fds);
228              
229             if (event & AMQP_SF_POLLERR) {
230             FD_ZERO(&exceptfds);
231             FD_SET(fd, &exceptfds);
232             exceptfdsp = &exceptfds;
233             } else {
234             exceptfdsp = NULL;
235             }
236              
237             res = amqp_time_tv_until(deadline, &tv, &tvp);
238             if (res != AMQP_STATUS_OK) {
239             return res;
240             }
241              
242             if (event & AMQP_SF_POLLIN) {
243             res = select(fd + 1, &fds, NULL, exceptfdsp, tvp);
244             } else if (event & AMQP_SF_POLLOUT) {
245             res = select(fd + 1, NULL, &fds, exceptfdsp, tvp);
246             }
247              
248             if (0 < res) {
249             return AMQP_STATUS_OK;
250             } else if (0 == res) {
251             return AMQP_STATUS_TIMEOUT;
252             } else {
253             switch (amqp_os_socket_error()) {
254             case EINTR:
255             goto start_select;
256             default:
257             return AMQP_STATUS_SOCKET_ERROR;
258             }
259             }
260             #else
261             #error "poll() or select() is needed to compile rabbitmq-c"
262             #endif
263             }
264              
265 10           static ssize_t do_poll(amqp_connection_state_t state, ssize_t res,
266             amqp_time_t deadline) {
267 10           int fd = amqp_get_sockfd(state);
268 10 100         if (-1 == fd) {
269 3           return AMQP_STATUS_SOCKET_CLOSED;
270             }
271 7           switch (res) {
272             case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
273 0           res = amqp_poll(fd, AMQP_SF_POLLIN, deadline);
274 0           break;
275             case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
276 6           res = amqp_poll(fd, AMQP_SF_POLLOUT, deadline);
277 6           break;
278             }
279 7           return res;
280             }
281              
282 523           ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
283             size_t len, amqp_time_t deadline, int flags) {
284             ssize_t res;
285 523           void *buf_left = (void *)buf;
286             /* Assume that len is not going to be larger than ssize_t can hold. */
287 523           ssize_t len_left = (size_t)len;
288              
289             start_send:
290 535           res = amqp_socket_send(state->socket, buf_left, len_left, flags);
291              
292 535 100         if (res > 0) {
293 525           len_left -= res;
294 525           buf_left = (char *)buf_left + res;
295 525 100         if (0 == len_left) {
296 519           return (ssize_t)len;
297             }
298 6           goto start_send;
299             }
300 10           res = do_poll(state, res, deadline);
301 10 100         if (AMQP_STATUS_OK == res) {
302 6           goto start_send;
303             }
304 4 50         if (AMQP_STATUS_TIMEOUT == res) {
305 0           return (ssize_t)len - len_left;
306             }
307 4           return res;
308             }
309              
310 0           int amqp_open_socket(char const *hostname, int portnumber) {
311 0           return amqp_open_socket_inner(hostname, portnumber, amqp_time_infinite());
312             }
313              
314 37           int amqp_open_socket_noblock(char const *hostname, int portnumber,
315             const struct timeval *timeout) {
316             amqp_time_t deadline;
317 37           int res = amqp_time_from_now(&deadline, timeout);
318 37 50         if (AMQP_STATUS_OK != res) {
319 0           return res;
320             }
321 37           return amqp_open_socket_inner(hostname, portnumber, deadline);
322             }
323              
324             #ifdef _WIN32
325             static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) {
326             int one = 1;
327             SOCKET sockfd;
328             int last_error;
329              
330             /*
331             * This cast is to squash warnings on Win64, see:
332             * http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64
333             */
334              
335             sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
336             if (INVALID_SOCKET == sockfd) {
337             return AMQP_STATUS_SOCKET_ERROR;
338             }
339              
340             /* Set the socket to be non-blocking */
341             if (SOCKET_ERROR == ioctlsocket(sockfd, FIONBIO, &one)) {
342             last_error = AMQP_STATUS_SOCKET_ERROR;
343             goto err;
344             }
345              
346             /* Disable nagle */
347             if (SOCKET_ERROR == setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY,
348             (const char *)&one, sizeof(one))) {
349             last_error = AMQP_STATUS_SOCKET_ERROR;
350             goto err;
351             }
352              
353             /* Enable TCP keepalives */
354             if (SOCKET_ERROR == setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE,
355             (const char *)&one, sizeof(one))) {
356             last_error = AMQP_STATUS_SOCKET_ERROR;
357             goto err;
358             }
359              
360             if (SOCKET_ERROR != connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen)) {
361             return (int)sockfd;
362             }
363              
364             if (WSAEWOULDBLOCK != WSAGetLastError()) {
365             last_error = AMQP_STATUS_SOCKET_ERROR;
366             goto err;
367             }
368              
369             last_error =
370             amqp_poll((int)sockfd, AMQP_SF_POLLOUT | AMQP_SF_POLLERR, deadline);
371             if (AMQP_STATUS_OK != last_error) {
372             goto err;
373             }
374              
375             {
376             int result;
377             int result_len = sizeof(result);
378              
379             if (SOCKET_ERROR == getsockopt(sockfd, SOL_SOCKET, SO_ERROR,
380             (char *)&result, &result_len) ||
381             result != 0) {
382             last_error = AMQP_STATUS_SOCKET_ERROR;
383             goto err;
384             }
385             }
386              
387             return (int)sockfd;
388              
389             err:
390             closesocket(sockfd);
391             return last_error;
392             }
393             #else
394 38           static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) {
395 38           int one = 1;
396             int sockfd;
397             int flags;
398             int last_error;
399              
400 38           sockfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
401 38 50         if (-1 == sockfd) {
402 0           return AMQP_STATUS_SOCKET_ERROR;
403             }
404              
405             /* Enable CLOEXEC on socket */
406 38           flags = fcntl(sockfd, F_GETFD);
407 38 50         if (flags == -1 || fcntl(sockfd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
    50          
408 0           last_error = AMQP_STATUS_SOCKET_ERROR;
409 0           goto err;
410             }
411              
412             /* Set the socket as non-blocking */
413 38           flags = fcntl(sockfd, F_GETFL);
414 38 50         if (flags == -1 || fcntl(sockfd, F_SETFL, (long)(flags | O_NONBLOCK)) == -1) {
    50          
415 0           last_error = AMQP_STATUS_SOCKET_ERROR;
416 0           goto err;
417             }
418              
419             #ifdef SO_NOSIGPIPE
420             /* Turn off SIGPIPE on platforms that support it, BSD, MacOSX */
421             if (0 != setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
422             last_error = AMQP_STATUS_SOCKET_ERROR;
423             goto err;
424             }
425             #endif /* SO_NOSIGPIPE */
426              
427             /* Disable nagle */
428 38 50         if (0 != setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
429 0           last_error = AMQP_STATUS_SOCKET_ERROR;
430 0           goto err;
431             }
432              
433             /* Enable TCP keepalives */
434 38 50         if (0 != setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one))) {
435 0           last_error = AMQP_STATUS_SOCKET_ERROR;
436 0           goto err;
437             }
438              
439 38 50         if (0 == connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
440 0           return sockfd;
441             }
442              
443 38 50         if (EINPROGRESS != errno) {
444 0           last_error = AMQP_STATUS_SOCKET_ERROR;
445 0           goto err;
446             }
447              
448 38           last_error = amqp_poll(sockfd, AMQP_SF_POLLOUT, deadline);
449 38 100         if (AMQP_STATUS_OK != last_error) {
450 1           goto err;
451             }
452              
453             {
454             int result;
455 37           socklen_t result_len = sizeof(result);
456              
457 37 50         if (-1 == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) ||
    50          
458 37           result != 0) {
459 0           last_error = AMQP_STATUS_SOCKET_ERROR;
460 0           goto err;
461             }
462             }
463              
464 37           return sockfd;
465              
466             err:
467 1           close(sockfd);
468 38           return last_error;
469             }
470             #endif
471              
472 38           int amqp_open_socket_inner(char const *hostname, int portnumber,
473             amqp_time_t deadline) {
474             struct addrinfo hint;
475             struct addrinfo *address_list;
476             struct addrinfo *addr;
477             char portnumber_string[33];
478 38           int sockfd = -1;
479             int last_error;
480              
481 38           last_error = amqp_os_socket_init();
482 38 50         if (AMQP_STATUS_OK != last_error) {
483 0           return last_error;
484             }
485              
486 38           memset(&hint, 0, sizeof(hint));
487 38           hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */
488 38           hint.ai_socktype = SOCK_STREAM;
489 38           hint.ai_protocol = IPPROTO_TCP;
490              
491 38           (void)sprintf(portnumber_string, "%d", portnumber);
492              
493 38           last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list);
494 38 50         if (0 != last_error) {
495 0           return AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED;
496             }
497              
498 38 50         for (addr = address_list; addr; addr = addr->ai_next) {
499 38           sockfd = connect_socket(addr, deadline);
500              
501 38 100         if (sockfd >= 0) {
502 37           last_error = AMQP_STATUS_OK;
503 37           break;
504 1 50         } else if (sockfd == AMQP_STATUS_TIMEOUT) {
505 1           last_error = sockfd;
506 1           break;
507             }
508             }
509              
510 38           freeaddrinfo(address_list);
511 38 100         if (last_error != AMQP_STATUS_OK || sockfd == -1) {
    50          
512 1           return last_error;
513             }
514 38           return sockfd;
515             }
516              
517 37           static int send_header_inner(amqp_connection_state_t state,
518             amqp_time_t deadline) {
519             ssize_t res;
520             static const uint8_t header[8] = {'A',
521             'M',
522             'Q',
523             'P',
524             0,
525             AMQP_PROTOCOL_VERSION_MAJOR,
526             AMQP_PROTOCOL_VERSION_MINOR,
527             AMQP_PROTOCOL_VERSION_REVISION};
528 37           res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE);
529 37 50         if (sizeof(header) == res) {
530 37           return AMQP_STATUS_OK;
531             }
532 0           return (int)res;
533             }
534              
535 0           int amqp_send_header(amqp_connection_state_t state) {
536 0           return send_header_inner(state, amqp_time_infinite());
537             }
538              
539 74           static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
540             amqp_bytes_t res;
541              
542 74           switch (method) {
543             case AMQP_SASL_METHOD_PLAIN:
544 74           res = amqp_cstring_bytes("PLAIN");
545 74           break;
546             case AMQP_SASL_METHOD_EXTERNAL:
547 0           res = amqp_cstring_bytes("EXTERNAL");
548 0           break;
549              
550             default:
551 0           amqp_abort("Invalid SASL method: %d", (int)method);
552             }
553              
554 74           return res;
555             }
556              
557 37           static int bytes_equal(amqp_bytes_t l, amqp_bytes_t r) {
558 37 50         if (l.len == r.len) {
559 37 50         if (l.bytes && r.bytes) {
    50          
560 37 50         if (0 == memcmp(l.bytes, r.bytes, l.len)) {
561 37           return 1;
562             }
563             }
564             }
565 0           return 0;
566             }
567              
568 37           int sasl_mechanism_in_list(amqp_bytes_t mechanisms,
569             amqp_sasl_method_enum method) {
570             amqp_bytes_t mechanism;
571             amqp_bytes_t supported_mechanism;
572             uint8_t *start;
573             uint8_t *end;
574             uint8_t *current;
575              
576 37 50         assert(NULL != mechanisms.bytes);
577              
578 37           mechanism = sasl_method_name(method);
579              
580 37           start = (uint8_t *)mechanisms.bytes;
581 37           current = start;
582 37           end = start + mechanisms.len;
583              
584 37 50         for (; current != end; start = current + 1) {
585             /* HACK: SASL states that we should be parsing this string as a UTF-8
586             * string, which we're plainly not doing here. At this point its not worth
587             * dragging an entire UTF-8 parser for this one case, and this should work
588             * most of the time */
589 37           current = memchr(start, ' ', end - start);
590 37 50         if (NULL == current) {
591 0           current = end;
592             }
593 37           supported_mechanism.bytes = start;
594 37           supported_mechanism.len = current - start;
595 37 50         if (bytes_equal(mechanism, supported_mechanism)) {
596 37           return 1;
597             }
598             }
599              
600 37           return 0;
601             }
602              
603 37           static amqp_bytes_t sasl_response(amqp_pool_t *pool,
604             amqp_sasl_method_enum method, va_list args) {
605             amqp_bytes_t response;
606              
607 37           switch (method) {
608             case AMQP_SASL_METHOD_PLAIN: {
609 37 50         char *username = va_arg(args, char *);
610 37           size_t username_len = strlen(username);
611 37 50         char *password = va_arg(args, char *);
612 37           size_t password_len = strlen(password);
613             char *response_buf;
614              
615 37           amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2,
616             &response);
617 37 50         if (response.bytes == NULL)
618             /* We never request a zero-length block, because of the +2
619             above, so a NULL here really is ENOMEM. */
620             {
621 0           return response;
622             }
623              
624 37           response_buf = response.bytes;
625 37           response_buf[0] = 0;
626 37           memcpy(response_buf + 1, username, username_len);
627 37           response_buf[username_len + 1] = 0;
628 37           memcpy(response_buf + username_len + 2, password, password_len);
629 37           break;
630             }
631             case AMQP_SASL_METHOD_EXTERNAL: {
632 0 0         char *identity = va_arg(args, char *);
633 0           size_t identity_len = strlen(identity);
634              
635 0           amqp_pool_alloc_bytes(pool, identity_len, &response);
636 0 0         if (response.bytes == NULL) {
637 0           return response;
638             }
639              
640 0           memcpy(response.bytes, identity, identity_len);
641 0           break;
642             }
643             default:
644 0           amqp_abort("Invalid SASL method: %d", (int)method);
645             }
646              
647 37           return response;
648             }
649              
650 0           amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
651 0           return (state->first_queued_frame != NULL);
652             }
653              
654             /*
655             * Check to see if we have data in our buffer. If this returns 1, we
656             * will avoid an immediate blocking read in amqp_simple_wait_frame.
657             */
658 940           amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) {
659 940           return (state->sock_inbound_offset < state->sock_inbound_limit);
660             }
661              
662 485           static int consume_one_frame(amqp_connection_state_t state,
663             amqp_frame_t *decoded_frame) {
664             int res;
665              
666             amqp_bytes_t buffer;
667 485           buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
668 485           buffer.bytes =
669 485           ((char *)state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
670              
671 485           res = amqp_handle_input(state, buffer, decoded_frame);
672 485 50         if (res < 0) {
673 0           return res;
674             }
675              
676 485           state->sock_inbound_offset += res;
677              
678 485           return AMQP_STATUS_OK;
679             }
680              
681 395           static int recv_with_timeout(amqp_connection_state_t state,
682             amqp_time_t timeout) {
683             ssize_t res;
684             int fd;
685              
686             start_recv:
687 783           res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
688             state->sock_inbound_buffer.len, 0);
689              
690 783 100         if (res < 0) {
691 392           fd = amqp_get_sockfd(state);
692 392 50         if (-1 == fd) {
693 0           return AMQP_STATUS_CONNECTION_CLOSED;
694             }
695 392           switch (res) {
696             default:
697 0           return (int)res;
698             case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
699 392           res = amqp_poll(fd, AMQP_SF_POLLIN, timeout);
700 392           break;
701             case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
702 0           res = amqp_poll(fd, AMQP_SF_POLLOUT, timeout);
703 0           break;
704             }
705 392 100         if (AMQP_STATUS_OK == res) {
706 388           goto start_recv;
707             }
708 4           return (int)res;
709             }
710              
711 391           state->sock_inbound_limit = res;
712 391           state->sock_inbound_offset = 0;
713              
714 391           res = amqp_time_s_from_now(&state->next_recv_heartbeat,
715             amqp_heartbeat_recv(state));
716 391 50         if (AMQP_STATUS_OK != res) {
717 0           return (int)res;
718             }
719 391           return AMQP_STATUS_OK;
720             }
721              
722 2           int amqp_try_recv(amqp_connection_state_t state) {
723             amqp_time_t timeout;
724              
725 22 100         while (amqp_data_in_buffer(state)) {
726             amqp_frame_t frame;
727 20           int res = consume_one_frame(state, &frame);
728              
729 20 50         if (AMQP_STATUS_OK != res) {
730 0           return res;
731             }
732              
733 20 50         if (frame.frame_type != 0) {
734             amqp_pool_t *channel_pool;
735             amqp_frame_t *frame_copy;
736             amqp_link_t *link;
737              
738 20           channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
739 20 50         if (NULL == channel_pool) {
740 0           return AMQP_STATUS_NO_MEMORY;
741             }
742              
743 20           frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
744 20           link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
745              
746 20 50         if (frame_copy == NULL || link == NULL) {
    50          
747 0           return AMQP_STATUS_NO_MEMORY;
748             }
749              
750 20           *frame_copy = frame;
751              
752 20           link->next = NULL;
753 20           link->data = frame_copy;
754              
755 20 100         if (state->last_queued_frame == NULL) {
756 1           state->first_queued_frame = link;
757             } else {
758 19           state->last_queued_frame->next = link;
759             }
760 20           state->last_queued_frame = link;
761             }
762             }
763 2           int res = amqp_time_s_from_now(&timeout, 0);
764 2 50         if (AMQP_STATUS_OK != res) {
765 0           return res;
766             }
767              
768 2           return recv_with_timeout(state, timeout);
769             }
770              
771 460           static int wait_frame_inner(amqp_connection_state_t state,
772             amqp_frame_t *decoded_frame,
773             amqp_time_t timeout_deadline) {
774             amqp_time_t deadline;
775             int res;
776              
777             for (;;) {
778 859 100         while (amqp_data_in_buffer(state)) {
779 465           res = consume_one_frame(state, decoded_frame);
780              
781 465 50         if (AMQP_STATUS_OK != res) {
782 0           return res;
783             }
784              
785 465 100         if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) {
786 5           amqp_maybe_release_buffers_on_channel(state, 0);
787 5           continue;
788             }
789              
790 460 100         if (decoded_frame->frame_type != 0) {
791             /* Complete frame was read. Return it. */
792 455           return AMQP_STATUS_OK;
793             }
794             }
795              
796             beginrecv:
797 394           res = amqp_time_has_past(state->next_send_heartbeat);
798 394 50         if (AMQP_STATUS_TIMER_FAILURE == res) {
799 0           return res;
800 394 100         } else if (AMQP_STATUS_TIMEOUT == res) {
801             amqp_frame_t heartbeat;
802 2           heartbeat.channel = 0;
803 2           heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
804              
805 2           res = amqp_send_frame(state, &heartbeat);
806 2 100         if (AMQP_STATUS_OK != res) {
807 2           return res;
808             }
809             }
810 393           deadline = amqp_time_first(timeout_deadline,
811             amqp_time_first(state->next_recv_heartbeat,
812             state->next_send_heartbeat));
813              
814             /* TODO this needs to wait for a _frame_ and not anything written from the
815             * socket */
816 393           res = recv_with_timeout(state, deadline);
817              
818 393 100         if (AMQP_STATUS_TIMEOUT == res) {
819 4 50         if (amqp_time_equal(deadline, state->next_recv_heartbeat)) {
820 0           amqp_socket_close(state->socket, AMQP_SC_FORCE);
821 0           return AMQP_STATUS_HEARTBEAT_TIMEOUT;
822 4 50         } else if (amqp_time_equal(deadline, timeout_deadline)) {
823 4           return AMQP_STATUS_TIMEOUT;
824 0 0         } else if (amqp_time_equal(deadline, state->next_send_heartbeat)) {
825             /* send heartbeat happens before we do recv_with_timeout */
826 0           goto beginrecv;
827             } else {
828 0           amqp_abort("Internal error: unable to determine timeout reason");
829             }
830 389 50         } else if (AMQP_STATUS_OK != res) {
831 0           return res;
832             }
833 849           }
834             }
835              
836 0           static amqp_link_t *amqp_create_link_for_frame(amqp_connection_state_t state,
837             amqp_frame_t *frame) {
838             amqp_link_t *link;
839             amqp_frame_t *frame_copy;
840              
841 0           amqp_pool_t *channel_pool =
842 0           amqp_get_or_create_channel_pool(state, frame->channel);
843              
844 0 0         if (NULL == channel_pool) {
845 0           return NULL;
846             }
847              
848 0           link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
849 0           frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
850              
851 0 0         if (NULL == link || NULL == frame_copy) {
    0          
852 0           return NULL;
853             }
854              
855 0           *frame_copy = *frame;
856 0           link->data = frame_copy;
857              
858 0           return link;
859             }
860              
861 0           int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) {
862 0           amqp_link_t *link = amqp_create_link_for_frame(state, frame);
863 0 0         if (NULL == link) {
864 0           return AMQP_STATUS_NO_MEMORY;
865             }
866              
867 0 0         if (NULL == state->first_queued_frame) {
868 0           state->first_queued_frame = link;
869             } else {
870 0           state->last_queued_frame->next = link;
871             }
872              
873 0           link->next = NULL;
874 0           state->last_queued_frame = link;
875              
876 0           return AMQP_STATUS_OK;
877             }
878              
879 0           int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame) {
880 0           amqp_link_t *link = amqp_create_link_for_frame(state, frame);
881 0 0         if (NULL == link) {
882 0           return AMQP_STATUS_NO_MEMORY;
883             }
884              
885 0 0         if (NULL == state->first_queued_frame) {
886 0           state->first_queued_frame = link;
887 0           state->last_queued_frame = link;
888 0           link->next = NULL;
889             } else {
890 0           link->next = state->first_queued_frame;
891 0           state->first_queued_frame = link;
892             }
893              
894 0           return AMQP_STATUS_OK;
895             }
896              
897 59           int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
898             amqp_channel_t channel,
899             amqp_frame_t *decoded_frame) {
900             amqp_frame_t *frame_ptr;
901             amqp_link_t *cur;
902             int res;
903              
904 59 50         for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) {
905 0           frame_ptr = cur->data;
906              
907 0 0         if (channel == frame_ptr->channel) {
908 0           state->first_queued_frame = cur->next;
909 0 0         if (NULL == state->first_queued_frame) {
910 0           state->last_queued_frame = NULL;
911             }
912              
913 0           *decoded_frame = *frame_ptr;
914              
915 0           return AMQP_STATUS_OK;
916             }
917             }
918              
919             for (;;) {
920 59           res = wait_frame_inner(state, decoded_frame, amqp_time_infinite());
921              
922 59 50         if (AMQP_STATUS_OK != res) {
923 0           return res;
924             }
925              
926 59 50         if (channel == decoded_frame->channel) {
927 59           return AMQP_STATUS_OK;
928             } else {
929 0           res = amqp_queue_frame(state, decoded_frame);
930 0 0         if (res != AMQP_STATUS_OK) {
931 0           return res;
932             }
933             }
934 0           }
935             }
936              
937 21           int amqp_simple_wait_frame(amqp_connection_state_t state,
938             amqp_frame_t *decoded_frame) {
939 21           return amqp_simple_wait_frame_noblock(state, decoded_frame, NULL);
940             }
941              
942 121           int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
943             amqp_frame_t *decoded_frame,
944             const struct timeval *timeout) {
945             amqp_time_t deadline;
946              
947 121           int res = amqp_time_from_now(&deadline, timeout);
948 121 50         if (AMQP_STATUS_OK != res) {
949 0           return res;
950             }
951              
952 121 100         if (state->first_queued_frame != NULL) {
953 20           amqp_frame_t *f = (amqp_frame_t *)state->first_queued_frame->data;
954 20           state->first_queued_frame = state->first_queued_frame->next;
955 20 100         if (state->first_queued_frame == NULL) {
956 1           state->last_queued_frame = NULL;
957             }
958 20           *decoded_frame = *f;
959 20           return AMQP_STATUS_OK;
960             } else {
961 121           return wait_frame_inner(state, decoded_frame, deadline);
962             }
963             }
964              
965 74           static int amqp_simple_wait_method_list(amqp_connection_state_t state,
966             amqp_channel_t expected_channel,
967             amqp_method_number_t *expected_methods,
968             amqp_time_t deadline,
969             amqp_method_t *output) {
970             amqp_frame_t frame;
971             struct timeval tv;
972             struct timeval *tvp;
973              
974 74           int res = amqp_time_tv_until(deadline, &tv, &tvp);
975 74 50         if (res != AMQP_STATUS_OK) {
976 0           return res;
977             }
978              
979 74           res = amqp_simple_wait_frame_noblock(state, &frame, tvp);
980 74 50         if (AMQP_STATUS_OK != res) {
981 0           return res;
982             }
983              
984 74 50         if (AMQP_FRAME_METHOD != frame.frame_type ||
    50          
985 74 50         expected_channel != frame.channel ||
986 74           !amqp_id_in_reply_list(frame.payload.method.id, expected_methods)) {
987 0           return AMQP_STATUS_WRONG_METHOD;
988             }
989 74           *output = frame.payload.method;
990 74           return AMQP_STATUS_OK;
991             }
992              
993 37           static int simple_wait_method_inner(amqp_connection_state_t state,
994             amqp_channel_t expected_channel,
995             amqp_method_number_t expected_method,
996             amqp_time_t deadline,
997             amqp_method_t *output) {
998 37           amqp_method_number_t expected_methods[] = {expected_method, 0};
999 37           return amqp_simple_wait_method_list(state, expected_channel, expected_methods,
1000             deadline, output);
1001             }
1002              
1003 0           int amqp_simple_wait_method(amqp_connection_state_t state,
1004             amqp_channel_t expected_channel,
1005             amqp_method_number_t expected_method,
1006             amqp_method_t *output) {
1007 0           return simple_wait_method_inner(state, expected_channel, expected_method,
1008             amqp_time_infinite(), output);
1009             }
1010              
1011 306           int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel,
1012             amqp_method_number_t id, void *decoded) {
1013 306           return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE,
1014             amqp_time_infinite());
1015             }
1016              
1017 409           int amqp_send_method_inner(amqp_connection_state_t state,
1018             amqp_channel_t channel, amqp_method_number_t id,
1019             void *decoded, int flags, amqp_time_t deadline) {
1020             amqp_frame_t frame;
1021              
1022 409           frame.frame_type = AMQP_FRAME_METHOD;
1023 409           frame.channel = channel;
1024 409           frame.payload.method.id = id;
1025 409           frame.payload.method.decoded = decoded;
1026 409           return amqp_send_frame_inner(state, &frame, flags, deadline);
1027             }
1028              
1029 674           static int amqp_id_in_reply_list(amqp_method_number_t expected,
1030             amqp_method_number_t *list) {
1031 720 100         while (*list != 0) {
1032 710 100         if (*list == expected) {
1033 664           return 1;
1034             }
1035 46           list++;
1036             }
1037 10           return 0;
1038             }
1039              
1040 302           static amqp_rpc_reply_t simple_rpc_inner(
1041             amqp_connection_state_t state, amqp_channel_t channel,
1042             amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids,
1043             void *decoded_request_method, amqp_time_t deadline) {
1044             int status;
1045             amqp_rpc_reply_t result;
1046              
1047 302           memset(&result, 0, sizeof(result));
1048              
1049 302           status = amqp_send_method(state, channel, request_id, decoded_request_method);
1050 302 100         if (status < 0) {
1051 2           return amqp_rpc_reply_error(status);
1052             }
1053              
1054             {
1055             amqp_frame_t frame;
1056              
1057             retry:
1058 300           status = wait_frame_inner(state, &frame, deadline);
1059 300 50         if (status < 0) {
1060 0 0         if (status == AMQP_STATUS_TIMEOUT) {
1061 0           amqp_socket_close(state->socket, AMQP_SC_FORCE);
1062             }
1063 0           return amqp_rpc_reply_error(status);
1064             }
1065              
1066             /*
1067             * We store the frame for later processing unless it's something
1068             * that directly affects us here, namely a method frame that is
1069             * either
1070             * - on the channel we want, and of the expected type, or
1071             * - on the channel we want, and a channel.close frame, or
1072             * - on channel zero, and a connection.close frame.
1073             */
1074 300 50         if (!((frame.frame_type == AMQP_FRAME_METHOD) &&
    50          
1075 300 100         (((frame.channel == channel) &&
1076 300           (amqp_id_in_reply_list(frame.payload.method.id,
1077 5 50         expected_reply_ids) ||
1078 0 0         (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) ||
1079 0 0         ((frame.channel == 0) &&
1080 0           (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))))) {
1081             amqp_pool_t *channel_pool;
1082             amqp_frame_t *frame_copy;
1083             amqp_link_t *link;
1084              
1085 0           channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
1086 0 0         if (NULL == channel_pool) {
1087 0           return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY);
1088             }
1089              
1090 0           frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
1091 0           link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
1092              
1093 0 0         if (frame_copy == NULL || link == NULL) {
    0          
1094 0           return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY);
1095             }
1096              
1097 0           *frame_copy = frame;
1098              
1099 0           link->next = NULL;
1100 0           link->data = frame_copy;
1101              
1102 0 0         if (state->last_queued_frame == NULL) {
1103 0           state->first_queued_frame = link;
1104             } else {
1105 0           state->last_queued_frame->next = link;
1106             }
1107 0           state->last_queued_frame = link;
1108              
1109 0           goto retry;
1110             }
1111              
1112 300 100         result.reply_type =
1113 300           (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids))
1114             ? AMQP_RESPONSE_NORMAL
1115             : AMQP_RESPONSE_SERVER_EXCEPTION;
1116              
1117 300           result.reply = frame.payload.method;
1118 302           return result;
1119             }
1120             }
1121              
1122 54           amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
1123             amqp_channel_t channel,
1124             amqp_method_number_t request_id,
1125             amqp_method_number_t *expected_reply_ids,
1126             void *decoded_request_method) {
1127             amqp_time_t deadline;
1128             int res;
1129              
1130 54           res = amqp_time_from_now(&deadline, state->rpc_timeout);
1131 54 50         if (res != AMQP_STATUS_OK) {
1132 0           return amqp_rpc_reply_error(res);
1133             }
1134              
1135 54           return simple_rpc_inner(state, channel, request_id, expected_reply_ids,
1136             decoded_request_method, deadline);
1137             }
1138              
1139 211           void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
1140             amqp_channel_t channel,
1141             amqp_method_number_t request_id,
1142             amqp_method_number_t reply_id,
1143             void *decoded_request_method) {
1144             amqp_time_t deadline;
1145             int res;
1146             amqp_method_number_t replies[2];
1147              
1148 211           res = amqp_time_from_now(&deadline, state->rpc_timeout);
1149 211 50         if (res != AMQP_STATUS_OK) {
1150 0           state->most_recent_api_result = amqp_rpc_reply_error(res);
1151 0           return NULL;
1152             }
1153              
1154 211           replies[0] = reply_id;
1155 211           replies[1] = 0;
1156              
1157 211           state->most_recent_api_result = simple_rpc_inner(
1158             state, channel, request_id, replies, decoded_request_method, deadline);
1159              
1160 211 100         if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) {
1161 206           return state->most_recent_api_result.reply.decoded;
1162             } else {
1163 211           return NULL;
1164             }
1165             }
1166              
1167 191           amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) {
1168 191           return state->most_recent_api_result;
1169             }
1170              
1171             /*
1172             * Merge base and add tables. If the two tables contain an entry with the same
1173             * key, the entry from the add table takes precedence. For entries that are both
1174             * tables with the same key, the table is recursively merged.
1175             */
1176 37           int amqp_merge_capabilities(const amqp_table_t *base, const amqp_table_t *add,
1177             amqp_table_t *result, amqp_pool_t *pool) {
1178             int i;
1179             int res;
1180             amqp_pool_t temp_pool;
1181             amqp_table_t temp_result;
1182 37 50         assert(base != NULL);
1183 37 50         assert(result != NULL);
1184 37 50         assert(pool != NULL);
1185              
1186 37 50         if (NULL == add) {
1187 0           return amqp_table_clone(base, result, pool);
1188             }
1189              
1190 37           init_amqp_pool(&temp_pool, 4096);
1191 37           temp_result.num_entries = 0;
1192 37           temp_result.entries =
1193 37           amqp_pool_alloc(&temp_pool, sizeof(amqp_table_entry_t) *
1194 37           (base->num_entries + add->num_entries));
1195 37 50         if (NULL == temp_result.entries) {
1196 0           res = AMQP_STATUS_NO_MEMORY;
1197 0           goto error_out;
1198             }
1199 259 100         for (i = 0; i < base->num_entries; ++i) {
1200 222           temp_result.entries[temp_result.num_entries] = base->entries[i];
1201 222           temp_result.num_entries++;
1202             }
1203 37 50         for (i = 0; i < add->num_entries; ++i) {
1204 0           amqp_table_entry_t *e =
1205 0           amqp_table_get_entry_by_key(&temp_result, add->entries[i].key);
1206 0 0         if (NULL != e) {
1207 0 0         if (AMQP_FIELD_KIND_TABLE == add->entries[i].value.kind &&
    0          
1208 0           AMQP_FIELD_KIND_TABLE == e->value.kind) {
1209 0           amqp_table_entry_t *be =
1210 0           amqp_table_get_entry_by_key(base, add->entries[i].key);
1211              
1212 0           res = amqp_merge_capabilities(&be->value.value.table,
1213 0           &add->entries[i].value.value.table,
1214             &e->value.value.table, &temp_pool);
1215 0 0         if (AMQP_STATUS_OK != res) {
1216 0           goto error_out;
1217             }
1218             } else {
1219 0           e->value = add->entries[i].value;
1220             }
1221             } else {
1222 0           temp_result.entries[temp_result.num_entries] = add->entries[i];
1223 0           temp_result.num_entries++;
1224             }
1225             }
1226 37           res = amqp_table_clone(&temp_result, result, pool);
1227             error_out:
1228 37           empty_amqp_pool(&temp_pool);
1229 37           return res;
1230             }
1231              
1232 37           static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
1233             char const *vhost, int channel_max,
1234             int frame_max, int heartbeat,
1235             const amqp_table_t *client_properties,
1236             const struct timeval *timeout,
1237             amqp_sasl_method_enum sasl_method,
1238             va_list vl) {
1239             int res;
1240             amqp_method_t method;
1241              
1242             uint16_t client_channel_max;
1243             uint32_t client_frame_max;
1244             uint16_t client_heartbeat;
1245              
1246             uint16_t server_channel_max;
1247             uint32_t server_frame_max;
1248             uint16_t server_heartbeat;
1249              
1250             amqp_rpc_reply_t result;
1251             amqp_time_t deadline;
1252              
1253 37 50         if (channel_max < 0 || channel_max > UINT16_MAX) {
    50          
1254 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
1255             }
1256 37           client_channel_max = (uint16_t)channel_max;
1257              
1258 37 50         if (frame_max < 0) {
1259 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
1260             }
1261 37           client_frame_max = (uint32_t)frame_max;
1262              
1263 37 50         if (heartbeat < 0 || heartbeat > UINT16_MAX) {
    50          
1264 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
1265             }
1266 37           client_heartbeat = (uint16_t)heartbeat;
1267              
1268 37           res = amqp_time_from_now(&deadline, timeout);
1269 37 50         if (AMQP_STATUS_OK != res) {
1270 0           goto error_res;
1271             }
1272              
1273 37           res = send_header_inner(state, deadline);
1274 37 50         if (AMQP_STATUS_OK != res) {
1275 0           goto error_res;
1276             }
1277              
1278 37           res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD,
1279             deadline, &method);
1280 37 50         if (AMQP_STATUS_OK != res) {
1281 0           goto error_res;
1282             }
1283              
1284             {
1285 37           amqp_connection_start_t *s = (amqp_connection_start_t *)method.decoded;
1286 37 50         if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
    50          
1287 37           (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
1288 0           res = AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION;
1289 0           goto error_res;
1290             }
1291              
1292 37           res = amqp_table_clone(&s->server_properties, &state->server_properties,
1293             &state->properties_pool);
1294              
1295 37 50         if (AMQP_STATUS_OK != res) {
1296 0           goto error_res;
1297             }
1298              
1299             /* TODO: check that our chosen SASL mechanism is in the list of
1300             acceptable mechanisms. Or even let the application choose from
1301             the list! */
1302 37 50         if (!sasl_mechanism_in_list(s->mechanisms, sasl_method)) {
1303 0           res = AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD;
1304 0           goto error_res;
1305             }
1306             }
1307              
1308             {
1309             amqp_table_entry_t default_properties[6];
1310             amqp_table_t default_table;
1311             amqp_table_entry_t client_capabilities[2];
1312             amqp_table_t client_capabilities_table;
1313             amqp_connection_start_ok_t s;
1314             amqp_pool_t *channel_pool;
1315             amqp_bytes_t response_bytes;
1316              
1317 37           channel_pool = amqp_get_or_create_channel_pool(state, 0);
1318 37 50         if (NULL == channel_pool) {
1319 0           res = AMQP_STATUS_NO_MEMORY;
1320 0           goto error_res;
1321             }
1322              
1323 37           response_bytes = sasl_response(channel_pool, sasl_method, vl);
1324 37 50         if (response_bytes.bytes == NULL) {
1325 0           res = AMQP_STATUS_NO_MEMORY;
1326 0           goto error_res;
1327             }
1328              
1329 37           client_capabilities[0] =
1330             amqp_table_construct_bool_entry("authentication_failure_close", 1);
1331 37           client_capabilities[1] =
1332             amqp_table_construct_bool_entry("exchange_exchange_bindings", 1);
1333              
1334 37           client_capabilities_table.entries = client_capabilities;
1335 37           client_capabilities_table.num_entries =
1336             sizeof(client_capabilities) / sizeof(amqp_table_entry_t);
1337              
1338 37           default_properties[0] =
1339             amqp_table_construct_utf8_entry("product", "rabbitmq-c");
1340 37           default_properties[1] =
1341             amqp_table_construct_utf8_entry("version", AMQP_VERSION_STRING);
1342 37           default_properties[2] =
1343             amqp_table_construct_utf8_entry("platform", AMQ_PLATFORM);
1344 37           default_properties[3] =
1345             amqp_table_construct_utf8_entry("copyright", AMQ_COPYRIGHT);
1346 37           default_properties[4] = amqp_table_construct_utf8_entry(
1347             "information", "See https://github.com/alanxz/rabbitmq-c");
1348 37           default_properties[5] = amqp_table_construct_table_entry(
1349             "capabilities", &client_capabilities_table);
1350              
1351 37           default_table.entries = default_properties;
1352 37           default_table.num_entries =
1353             sizeof(default_properties) / sizeof(amqp_table_entry_t);
1354              
1355 37           res = amqp_merge_capabilities(&default_table, client_properties,
1356             &state->client_properties, channel_pool);
1357 37 50         if (AMQP_STATUS_OK != res) {
1358 0           goto error_res;
1359             }
1360              
1361 37           s.client_properties = state->client_properties;
1362 37           s.mechanism = sasl_method_name(sasl_method);
1363 37           s.response = response_bytes;
1364 37           s.locale = amqp_cstring_bytes("en_US");
1365              
1366 37           res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s,
1367             AMQP_SF_NONE, deadline);
1368 37 50         if (res < 0) {
1369 0           goto error_res;
1370             }
1371             }
1372              
1373 37           amqp_release_buffers(state);
1374              
1375             {
1376 37           amqp_method_number_t expected[] = {AMQP_CONNECTION_TUNE_METHOD,
1377             AMQP_CONNECTION_CLOSE_METHOD, 0};
1378              
1379 37           res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method);
1380 37 50         if (AMQP_STATUS_OK != res) {
1381 0           goto error_res;
1382             }
1383             }
1384              
1385 37 50         if (AMQP_CONNECTION_CLOSE_METHOD == method.id) {
1386 0           result.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
1387 0           result.reply = method;
1388 0           result.library_error = 0;
1389 0           goto out;
1390             }
1391              
1392             {
1393 37           amqp_connection_tune_t *s = (amqp_connection_tune_t *)method.decoded;
1394 37           server_channel_max = s->channel_max;
1395 37           server_frame_max = s->frame_max;
1396 37           server_heartbeat = s->heartbeat;
1397             }
1398              
1399 37 50         if (server_channel_max != 0 &&
    50          
1400 37 50         (server_channel_max < client_channel_max || client_channel_max == 0)) {
1401 37           client_channel_max = server_channel_max;
1402 0 0         } else if (server_channel_max == 0 && client_channel_max == 0) {
    0          
1403 0           client_channel_max = UINT16_MAX;
1404             }
1405              
1406 37 50         if (server_frame_max != 0 && server_frame_max < client_frame_max) {
    50          
1407 37           client_frame_max = server_frame_max;
1408             }
1409              
1410 37 50         if (server_heartbeat != 0 && server_heartbeat < client_heartbeat) {
    50          
1411 0           client_heartbeat = server_heartbeat;
1412             }
1413              
1414 37           res = amqp_tune_connection(state, client_channel_max, client_frame_max,
1415             client_heartbeat);
1416 37 50         if (res < 0) {
1417 0           goto error_res;
1418             }
1419              
1420             {
1421             amqp_connection_tune_ok_t s;
1422 37           s.frame_max = client_frame_max;
1423 37           s.channel_max = client_channel_max;
1424 37           s.heartbeat = client_heartbeat;
1425              
1426 37           res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s,
1427             AMQP_SF_NONE, deadline);
1428 37 50         if (res < 0) {
1429 0           goto error_res;
1430             }
1431             }
1432              
1433 37           amqp_release_buffers(state);
1434              
1435             {
1436 37           amqp_method_number_t replies[] = {AMQP_CONNECTION_OPEN_OK_METHOD, 0};
1437             amqp_connection_open_t s;
1438 37           s.virtual_host = amqp_cstring_bytes(vhost);
1439 37           s.capabilities = amqp_empty_bytes;
1440 37           s.insist = 1;
1441              
1442 37           result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies,
1443             &s, deadline);
1444 37 50         if (result.reply_type != AMQP_RESPONSE_NORMAL) {
1445 0           goto out;
1446             }
1447             }
1448              
1449 37           result.reply_type = AMQP_RESPONSE_NORMAL;
1450 37           result.reply.id = 0;
1451 37           result.reply.decoded = NULL;
1452 37           result.library_error = 0;
1453 37           amqp_maybe_release_buffers(state);
1454              
1455             out:
1456 37           return result;
1457              
1458             error_res:
1459 0           amqp_socket_close(state->socket, AMQP_SC_FORCE);
1460 0           result = amqp_rpc_reply_error(res);
1461              
1462 37           goto out;
1463             }
1464              
1465 0           amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,
1466             int channel_max, int frame_max, int heartbeat,
1467             int sasl_method, ...) {
1468             va_list vl;
1469             amqp_rpc_reply_t ret;
1470              
1471 0           va_start(vl, sasl_method);
1472              
1473 0           ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
1474 0           &amqp_empty_table, state->handshake_timeout,
1475             sasl_method, vl);
1476              
1477 0           va_end(vl);
1478              
1479 0           return ret;
1480             }
1481              
1482 37           amqp_rpc_reply_t amqp_login_with_properties(
1483             amqp_connection_state_t state, char const *vhost, int channel_max,
1484             int frame_max, int heartbeat, const amqp_table_t *client_properties,
1485             int sasl_method, ...) {
1486             va_list vl;
1487             amqp_rpc_reply_t ret;
1488              
1489 37           va_start(vl, sasl_method);
1490              
1491 37           ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
1492 37           client_properties, state->handshake_timeout,
1493             sasl_method, vl);
1494              
1495 37           va_end(vl);
1496              
1497 37           return ret;
1498             }