File Coverage

srl_compress.h
Criterion Covered Total %
statement 91 92 98.9
branch 48 50 96.0
condition n/a
subroutine n/a
pod n/a
total 139 142 97.8


line stmt bran cond sub pod time code
1             #ifndef SRL_COMPRESS_H_
2             #define SRL_COMPRESS_H_
3              
4             #include "srl_buffer.h"
5             #include "srl_inline.h"
6             #include "srl_protocol.h"
7             #include "srl_buffer_types.h"
8              
9             /* WARNING: The SRL_F_COMPRESS_* values below are option flags that request a
10             * compressor (Snappy, incremental Snappy, Zlib or Zstd). They are NOT the
11             * SRL_PROTOCOL_ENCODING_* values that get stored in the document header.
12             *
13             * DO NOT CHANGE THEM WITHOUT REVIEWING THE FLAG BITS IN srl_encoder.h etc.
14             */
15              
16             #define SRL_F_COMPRESS_SNAPPY 0x00040UL
17             #define SRL_F_COMPRESS_SNAPPY_INCREMENTAL 0x00080UL
18             #define SRL_F_COMPRESS_ZLIB 0x00100UL
19             #define SRL_F_COMPRESS_ZSTD 0x40000UL
20             /* WARNING: WHEN ADDING A NEW COMPRESSION FLAG, MAKE SURE THE NEW CONSTANT DOES
21             * NOT COLLIDE WITH THE CONSTANTS IN srl_encoder.h, AND ADD IT TO THE MASK BELOW.
22             */
23              
24             #define SRL_F_COMPRESS_FLAGS_MASK (SRL_F_COMPRESS_SNAPPY | \
25             SRL_F_COMPRESS_SNAPPY_INCREMENTAL | \
26             SRL_F_COMPRESS_ZLIB | \
27             SRL_F_COMPRESS_ZSTD)
28              
29             #if defined(HAVE_CSNAPPY)
30             #include <csnappy.h>
31             #else
32             #include "snappy/csnappy_compress.c"
33             #endif
34              
35             #if defined(HAVE_MINIZ)
36             #include <miniz.h>
37             #else
38             #include "miniz.h"
39             #endif
40              
41             #if defined(HAVE_ZSTD)
42             #include <zstd.h>
43             #else
44             #include "zstd/zstd.h"
45             #endif
46              
47             /* Overwrite a pre-allocated varint in the output stream with `number`.
48             * varint_start points at the first reserved byte, varint_end at the last
49             * reserved byte. When `number` needs fewer bytes than were reserved, the
               * result is a padded, non-canonical varint that still decodes to `number`.
               * The first loop below does not check against varint_end, so the caller
               * must have reserved enough bytes to hold `number`. */
50             SRL_STATIC_INLINE void
51 118455           srl_update_varint_from_to(pTHX_ unsigned char *varint_start, unsigned char *varint_end, UV number)
52             {
53 182347 100         while (number >= 0x80) { /* more than 7 significant bits remain */
54 63892           *varint_start++ = (number & 0x7f) | 0x80; /* emit the low 7 bits with the continuation bit set */
55 63892           number = number >> 7; /* drop the 7 bits just written */
56             }
57              
58             /* we are exactly at the last reserved byte: finish with a canonical final byte */
59 118455 100         if ( varint_start == varint_end ) {
60 20308           *varint_start = number; /* final 7 bits, continuation bit clear */
61             } else {
62             /* otherwise we produce a non-canonical varint: write the remaining
63             * bits with the continuation bit set, stuff zero-valued groups
64             * (0x80) into the "tail" of the reserved span, and terminate with
65             * a 0 byte at varint_end. The varint is effectively "self-padding",
66             * so only the encoder needs special logic - a decoder will happily
67             * process a non-canonical varint with no problem */
68 98147           *varint_start++ = (number & 0x7f) | 0x80;
69 122099 100         while ( varint_start < varint_end )
70 23952           *varint_start++ = 0x80;
71 98147           *varint_start= 0;
72             }
73 118455           }
74              
75             /* Lazily allocate the Snappy working buffer: a no-op when *workmem is already set */
76             SRL_STATIC_INLINE void
77 75600           srl_init_snappy_workmem(pTHX_ void **workmem)
78             {
79             /* Allocate CSNAPPY_WORKMEM_BYTES only on first use */
80 75600 100         if (expect_false(*workmem == NULL)) {
81             /* Cleaned up automatically by the cleanup handler (not visible in this file) */
82 19458           Newx(*workmem, CSNAPPY_WORKMEM_BYTES, char);
83 19458 50         if (*workmem == NULL)
84 0           croak("Out of memory!"); /* NOTE(review): never hit in this coverage run; Newx presumably dies on failure itself - confirm */
85             }
86 75600           }
87              
88             /* Release the Snappy working buffer via Safefree. The caller's pointer is passed by value and is not reset here. */
89             SRL_STATIC_INLINE void
90 244588           srl_destroy_snappy_workmem(pTHX_ void *workmem)
91             {
92 244588           Safefree(workmem);
93 244588           }
94              
95             SRL_STATIC_INLINE U8
96 1314079           srl_get_compression_header_flag(const U32 compress_flags)
97             {
               /* Map the SRL_F_COMPRESS_* option flags to the matching
                * SRL_PROTOCOL_ENCODING_* value. If several flags are set, the first
                * match wins in this order: Snappy, incremental Snappy, Zlib, Zstd.
                * With no compression flag set the result is SRL_PROTOCOL_ENCODING_RAW. */
98 1314079 100         if (compress_flags & SRL_F_COMPRESS_SNAPPY) {
99 30852           return SRL_PROTOCOL_ENCODING_SNAPPY;
100 1283227 100         } else if (compress_flags & SRL_F_COMPRESS_SNAPPY_INCREMENTAL) {
101 394080           return SRL_PROTOCOL_ENCODING_SNAPPY_INCREMENTAL;
102 889147 100         } else if (compress_flags & SRL_F_COMPRESS_ZLIB) {
103 152138           return SRL_PROTOCOL_ENCODING_ZLIB;
104 737009 100         } else if (compress_flags & SRL_F_COMPRESS_ZSTD) {
105 100297           return SRL_PROTOCOL_ENCODING_ZSTD;
106             } else {
107 636712           return SRL_PROTOCOL_ENCODING_RAW;
108             }
109             }
110              
111             /* ORs the encoding value for compress_flags into the header byte that follows the magic string. Bits already set in that byte are not cleared. */
112             SRL_STATIC_INLINE void
113 124935           srl_set_compression_header_flag(srl_buffer_t *buf, const U32 compress_flags)
114             {
115             /* sizeof(SRL_MAGIC_STRING) counts the trailing \0 (assuming a string literal), so the -1 lands on the byte right after the magic */
116 124935           srl_buffer_char *flags_and_version_byte = buf->start + sizeof(SRL_MAGIC_STRING) - 1;
117 124935           *flags_and_version_byte |= srl_get_compression_header_flag(compress_flags);
118 124935           }
119              
120             /* Resets the compression header flag to OFF (SRL_PROTOCOL_ENCODING_RAW).
121             * Obviously requires that a Sereal header was already written to the
122             * encoder's output buffer. */
123             SRL_STATIC_INLINE void
124 427497           srl_reset_compression_header_flag(srl_buffer_t *buf)
125             {
126             /* sizeof(SRL_MAGIC_STRING) counts the trailing \0 (assuming a string literal), so the -1 lands on the byte right after the magic */
127 427497           srl_buffer_char *flags_and_version_byte = buf->start + sizeof(SRL_MAGIC_STRING) - 1;
128              
129             /* keep only the bits under SRL_PROTOCOL_VERSION_MASK and mark the body as raw (uncompressed) */
130 427497           *flags_and_version_byte = SRL_PROTOCOL_ENCODING_RAW |
131 427497           (*flags_and_version_byte & SRL_PROTOCOL_VERSION_MASK);
132 427497           }
133              
134             /* Compress body with one of the available compressors (Snappy, incremental
135             * Snappy, Zstd or Zlib). Zlib is used when none of the other three flags is set.
136             * The function sets/resets the compression bits in the version byte. If the
137             * compressed body is not smaller than the original, the uncompressed buffer is kept.
138             * The caller has to adjust buf->body_pos by calling SRL_UPDATE_BODY_POS
               * right after exiting from srl_compress_body.
               */
139              
140             SRL_STATIC_INLINE void
141 198750           srl_compress_body(pTHX_ srl_buffer_t *buf, STRLEN sereal_header_length,
142             const U32 compress_flags, const int compress_level, void **workmem)
143             {
144 198750           const int is_traditional_snappy = compress_flags & SRL_F_COMPRESS_SNAPPY;
145 198750           const int is_incremental_snappy = compress_flags & SRL_F_COMPRESS_SNAPPY_INCREMENTAL;
146 198750           const int is_zstd = compress_flags & SRL_F_COMPRESS_ZSTD;
147 198750 100         const int is_zlib = !is_traditional_snappy && !is_incremental_snappy && !is_zstd;
    100          
    100          
148              
149 198750           size_t uncompressed_body_length = BUF_POS_OFS(buf) - sereal_header_length;
150             size_t compressed_body_length;
151 198750           srl_buffer_char *varint_start = NULL;
152 198750           srl_buffer_char *varint_end = NULL;
153             srl_buffer_t old_buf;
154              
155             DEBUG_ASSERT_BUF_SANE(buf);
156              
157             /* Get the worst-case compressed payload length, plus room for any length varints */
158 198750 100         if (is_incremental_snappy) {
159 69120           compressed_body_length = (size_t) csnappy_max_compressed_length(uncompressed_body_length);
160 69120           compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed compressed packet length as varint */
161 129630 100         } else if (is_traditional_snappy) {
162 6480           compressed_body_length = (size_t) csnappy_max_compressed_length(uncompressed_body_length);
163 123150 100         } else if (is_zstd) {
164 49260           compressed_body_length = ZSTD_compressBound(uncompressed_body_length);
165 49260           compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed compressed packet length as varint */
166             } else {
167 73890           compressed_body_length = (size_t) mz_compressBound(uncompressed_body_length);
168 73890           compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed uncompressed packet length as varint */
169 73890           compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed compressed packet length as varint */
170             }
171              
172             /* Back up old buffer and allocate new one with correct size */
173 198750           srl_buf_copy_buffer(aTHX_ buf, &old_buf);
174 198750           srl_buf_init_buffer(aTHX_ buf, sereal_header_length + compressed_body_length + 1);
175              
176             /* Copy Sereal header */
177 198750           Copy(old_buf.start, buf->pos, sereal_header_length, char);
178 198750           buf->pos += sereal_header_length;
179              
180             /* Embed uncompressed packet length if Zlib */
181 198750 100         if (is_zlib) srl_buf_cat_varint_nocheck(aTHX_ buf, 0, uncompressed_body_length);
182              
183             /* Reserve the compressed packet length varint if incr. Snappy, Zlib or Zstd.
                * The worst-case length is written now as a placeholder; varint_start and
                * varint_end record its first and last byte so it can be overwritten later. */
184 198750 100         if (is_incremental_snappy || is_zlib || is_zstd) {
    100          
    100          
185 192270           varint_start = buf->pos;
186 192270           srl_buf_cat_varint_nocheck(aTHX_ buf, 0, compressed_body_length);
187 192270           varint_end = buf->pos - 1;
188             }
189              
190 274350 100         if (is_incremental_snappy || is_traditional_snappy) {
    100          
191 75600           uint32_t len = (uint32_t) compressed_body_length; /* NOTE(review): both lengths are narrowed to uint32_t for csnappy - confirm callers never pass bodies that large */
192 75600           srl_init_snappy_workmem(aTHX_ workmem);
193              
194 75600           csnappy_compress((char*) (old_buf.start + sereal_header_length), (uint32_t) uncompressed_body_length,
195 75600           (char*) buf->pos, &len, *workmem, CSNAPPY_WORKMEM_BYTES_POWER_OF_TWO);
196              
197 75600           compressed_body_length = (size_t) len;
198 123150 100         } else if (is_zstd) {
199 49260           size_t code = ZSTD_compress((void*) buf->pos, compressed_body_length,
200 49260           (void*) (old_buf.start + sereal_header_length), uncompressed_body_length,
201             compress_level);
202              
                /* NOTE(review): failure is only detected by this assert; confirm what happens to an error code when asserts are compiled out */
203             assert(ZSTD_isError(code) == 0);
204 49260           compressed_body_length = code;
205 73890 50         } else if (is_zlib) {
206 73890           mz_ulong dl = (mz_ulong) compressed_body_length;
207 73890           int status = mz_compress2(
208 73890           buf->pos,
209             &dl,
210 73890           old_buf.start + sereal_header_length,
211             (mz_ulong) uncompressed_body_length,
212             compress_level
213             );
214              
                /* the (void) cast keeps status "used" when the assert below is compiled out */
215             (void)status;
216             assert(status == Z_OK);
217 73890           compressed_body_length = (size_t) dl;
218             }
219              
220             assert(compressed_body_length != 0);
221              
222             /* If compression didn't help, swap back to old, uncompressed buffer */
223 198750 100         if (compressed_body_length >= uncompressed_body_length) {
224             /* swap in old, uncompressed buffer */
225 73815           srl_buf_swap_buffer(aTHX_ buf, &old_buf);
226             /* disable compression flag */
227 73815           srl_reset_compression_header_flag(buf);
228             } else { /* go ahead with the compressed buffer and do final fixups */
229             /* overwrite the max size varint with the real size of the compressed data */
230 124935 100         if (varint_start)
231 118455           srl_update_varint_from_to(aTHX_ varint_start, varint_end, compressed_body_length);
232              
233 124935           buf->pos += compressed_body_length;
234              
235             /* enable compression flag */
236 124935           srl_set_compression_header_flag(buf, compress_flags);
237             }
238              
                /* after the optional swap above, old_buf holds whichever buffer is no longer in use */
239 198750           srl_buf_free_buffer(aTHX_ &old_buf);
240             DEBUG_ASSERT_BUF_SANE(buf);
241 198750           }
242              
243             #endif