Index: ext/zlib/zlib.c =================================================================== --- ext/zlib/zlib.c (revision 36283) +++ ext/zlib/zlib.c (working copy) @@ -544,12 +544,18 @@ struct zstream { #define ZSTREAM_FLAG_IN_STREAM 0x2 #define ZSTREAM_FLAG_FINISHED 0x4 #define ZSTREAM_FLAG_CLOSING 0x8 -#define ZSTREAM_FLAG_UNUSED 0x10 +#define ZSTREAM_FLAG_GZFILE 0x10 /* disallows yield from expand_buffer for + gzip*/ +#define ZSTREAM_FLAG_UNUSED 0x20 #define ZSTREAM_READY(z) ((z)->flags |= ZSTREAM_FLAG_READY) #define ZSTREAM_IS_READY(z) ((z)->flags & ZSTREAM_FLAG_READY) #define ZSTREAM_IS_FINISHED(z) ((z)->flags & ZSTREAM_FLAG_FINISHED) #define ZSTREAM_IS_CLOSING(z) ((z)->flags & ZSTREAM_FLAG_CLOSING) +#define ZSTREAM_IS_GZFILE(z) ((z)->flags & ZSTREAM_FLAG_GZFILE) + +#define ZSTREAM_EXPAND_BUFFER_OK 0 +#define ZSTREAM_EXPAND_BUFFER_MEM_ERROR (-1) /* I think that more better value should be found, but I gave up finding it. B) */ @@ -568,8 +574,9 @@ static const struct zstream_funcs inflat struct zstream_run_args { struct zstream * z; - int flush; - int interrupt; + int flush; /* stream flush value for inflate() or deflate() */ + int interrupt; /* stop processing the stream and return to ruby */ + int jump_state; /* for buffer expansion block break or exception */ }; static voidpf @@ -614,33 +621,50 @@ zstream_init(struct zstream *z, const st static void zstream_expand_buffer(struct zstream *z) { - long inc; - if (NIL_P(z->buf)) { - /* I uses rb_str_new here not rb_str_buf_new because - rb_str_buf_new makes a zero-length string. 
*/ - z->buf = rb_str_new(0, ZSTREAM_INITIAL_BUFSIZE); - z->buf_filled = 0; - z->stream.next_out = (Bytef*)RSTRING_PTR(z->buf); - z->stream.avail_out = ZSTREAM_INITIAL_BUFSIZE; - RBASIC(z->buf)->klass = 0; + zstream_expand_buffer_into(z, ZSTREAM_INITIAL_BUFSIZE); return; } - if (RSTRING_LEN(z->buf) - z->buf_filled >= ZSTREAM_AVAIL_OUT_STEP_MAX) { - /* to keep other threads from freezing */ - z->stream.avail_out = ZSTREAM_AVAIL_OUT_STEP_MAX; + if (!ZSTREAM_IS_GZFILE(z) && rb_block_given_p()) { + if (z->buf_filled >= ZSTREAM_AVAIL_OUT_STEP_MAX) { + int state = 0; + VALUE self = (VALUE)z->stream.opaque; + + rb_str_resize(z->buf, z->buf_filled); + RBASIC(z->buf)->klass = rb_cString; + OBJ_INFECT(z->buf, self); + + rb_protect(rb_yield, z->buf, &state); + + z->buf = Qnil; + zstream_expand_buffer_into(z, ZSTREAM_AVAIL_OUT_STEP_MAX); + + if (state) + rb_jump_tag(state); + + return; + } + else { + zstream_expand_buffer_into(z, + ZSTREAM_AVAIL_OUT_STEP_MAX - z->buf_filled); + } } else { - inc = z->buf_filled / 2; - if (inc < ZSTREAM_AVAIL_OUT_STEP_MIN) { - inc = ZSTREAM_AVAIL_OUT_STEP_MIN; + if (RSTRING_LEN(z->buf) - z->buf_filled >= ZSTREAM_AVAIL_OUT_STEP_MAX) { + z->stream.avail_out = ZSTREAM_AVAIL_OUT_STEP_MAX; } - rb_str_resize(z->buf, z->buf_filled + inc); - z->stream.avail_out = (inc < ZSTREAM_AVAIL_OUT_STEP_MAX) ? - (int)inc : ZSTREAM_AVAIL_OUT_STEP_MAX; + else { + long inc = z->buf_filled / 2; + if (inc < ZSTREAM_AVAIL_OUT_STEP_MIN) { + inc = ZSTREAM_AVAIL_OUT_STEP_MIN; + } + rb_str_resize(z->buf, z->buf_filled + inc); + z->stream.avail_out = (inc < ZSTREAM_AVAIL_OUT_STEP_MAX) ? 
+ (int)inc : ZSTREAM_AVAIL_OUT_STEP_MAX; + } + z->stream.next_out = (Bytef*)RSTRING_PTR(z->buf) + z->buf_filled; } - z->stream.next_out = (Bytef*)RSTRING_PTR(z->buf) + z->buf_filled; } static void @@ -663,12 +687,25 @@ zstream_expand_buffer_into(struct zstrea } +static void * +zstream_expand_buffer_protect(void *ptr) +{ + int state = 0; /* rb_protect jump tag; stays 0 when nothing was raised */ + + rb_protect((VALUE (*)(VALUE))zstream_expand_buffer, (VALUE)ptr, &state); + + return (void *)(VALUE)state; /* tag travels back through the void * result */ +} + static int zstream_expand_buffer_without_gvl(struct zstream *z) { char * new_str; long inc, len; - if (RSTRING_LEN(z->buf) - z->buf_filled >= ZSTREAM_AVAIL_OUT_STEP_MAX) { + if (rb_block_given_p()) { + return (int)(VALUE)rb_thread_call_with_gvl(zstream_expand_buffer_protect, (void *)z); + } + else if (RSTRING_LEN(z->buf) - z->buf_filled >= ZSTREAM_AVAIL_OUT_STEP_MAX) { z->stream.avail_out = ZSTREAM_AVAIL_OUT_STEP_MAX; } else { @@ -682,7 +719,7 @@ zstream_expand_buffer_without_gvl(struct new_str = realloc(RSTRING(z->buf)->as.heap.ptr, len + 1); if (!new_str) - return 0; + return ZSTREAM_EXPAND_BUFFER_MEM_ERROR; /* from rb_str_resize */ RSTRING(z->buf)->as.heap.ptr = new_str; @@ -695,7 +732,7 @@ zstream_expand_buffer_without_gvl(struct } z->stream.next_out = (Bytef*)RSTRING_PTR(z->buf) + z->buf_filled; - return 1; + return ZSTREAM_EXPAND_BUFFER_OK; } static void @@ -862,7 +899,7 @@ zstream_passthrough_input(struct zstream static VALUE zstream_detach_input(struct zstream *z) { - VALUE dst; + VALUE dst, self = (VALUE)z->stream.opaque; if (NIL_P(z->input)) { dst = rb_str_new(0, 0); @@ -920,7 +957,7 @@ static VALUE zstream_run_func(void *ptr) { struct zstream_run_args *args = (struct zstream_run_args *)ptr; - int err, flush = args->flush; + int err = Z_OK, state, flush = args->flush; struct zstream *z = args->z; uInt n; @@ -943,10 +980,17 @@ zstream_run_func(void *ptr) break; } - if (!zstream_expand_buffer_without_gvl(z)) { + state = zstream_expand_buffer_without_gvl(z); + + if (state == ZSTREAM_EXPAND_BUFFER_MEM_ERROR) { err = Z_MEM_ERROR; /* realloc 
failed */ break; } + else if (state) { + err = Z_OK; /* buffer expanded but stream processing was stopped */ + args->jump_state = state; + break; + } } return (VALUE)err; @@ -973,6 +1017,7 @@ zstream_run(struct zstream *z, Bytef *sr args.z = z; args.flush = flush; args.interrupt = 0; + args.jump_state = 0; if (NIL_P(z->input) && len == 0) { z->stream.next_in = (Bytef*)""; @@ -1024,6 +1069,9 @@ loop: zstream_append_input(z, z->stream.next_in, z->stream.avail_in); guard = Qnil; /* prevent tail call to make guard effective */ } + + if (args.jump_state) + rb_jump_tag(args.jump_state); } static VALUE @@ -1206,8 +1254,13 @@ rb_zstream_reset(VALUE obj) } /* - * Finishes the stream and flushes output buffer. See Zlib::Deflate#finish and - * Zlib::Inflate#finish for details of this behavior. + * call-seq: + * finish -> String + * finish { |chunk| ... } -> nil + * + * Finishes the stream and flushes output buffer. If a block is given each + * chunk is yielded to the block until the input buffer has been flushed to + * the output buffer. */ static VALUE rb_zstream_finish(VALUE obj) @@ -1220,7 +1273,13 @@ rb_zstream_finish(VALUE obj) } /* - * Flushes input buffer and returns all data in that buffer. + * call-seq: + * flush_next_in -> String + * + * Flushes input buffer and returns all data in that buffer. The + * chunk-yielding block form added for output (finish, deflate, inflate) + * is driven by output buffer expansion and does not apply to the input + * buffer. */ static VALUE rb_zstream_flush_next_in(VALUE obj) @@ -1502,13 +1561,13 @@ deflate_run(VALUE args) /* * Document-method: Zlib::Deflate.deflate * - * call-seq: Zlib.deflate(string[, level]) - * Zlib::Deflate.deflate(string[, level]) + * call-seq: + * Zlib.deflate(string[, level]) + * Zlib::Deflate.deflate(string[, level]) * * Compresses the given +string+. Valid values of level are - * NO_COMPRESSION, BEST_SPEED, - * BEST_COMPRESSION, DEFAULT_COMPRESSION, and an - * integer from 0 to 9 (the default is 6). 
+ * Zlib::NO_COMPRESSION, Zlib::BEST_SPEED, Zlib::BEST_COMPRESSION, + * Zlib::DEFAULT_COMPRESSION, or an integer from 0 to 9 (the default is 6). * * This method is almost equivalent to the following code: * @@ -1562,17 +1621,19 @@ do_deflate(struct zstream *z, VALUE src, } /* - * Document-method: Zlib#deflate + * Document-method: Zlib::Deflate#deflate * * call-seq: - * deflate(string, flush = Zlib::NO_FLUSH) + * z.deflate(string, flush = Zlib::NO_FLUSH) -> String + * z.deflate(string, flush = Zlib::NO_FLUSH) { |chunk| ... } -> nil * * Inputs +string+ into the deflate stream and returns the output from the * stream. On calling this method, both the input and the output buffers of - * the stream are flushed. + * the stream are flushed. If +string+ is nil, this method finishes the + * stream, just like Zlib::ZStream#finish. * - * If +string+ is nil, this method finishes the stream, just like - * Zlib::ZStream#finish. + * If a block is given consecutive deflated chunks from the +string+ are + * yielded to the block and +nil+ is returned. * * The +flush+ parameter specifies the flush mode. The following constants * may be used: @@ -1619,10 +1680,13 @@ rb_deflate_addstr(VALUE obj, VALUE src) * Document-method: Zlib::Deflate#flush * * call-seq: - * flush(flush = Zlib::SYNC_FLUSH) + * flush(flush = Zlib::SYNC_FLUSH) -> String + * flush(flush = Zlib::SYNC_FLUSH) { |chunk| ... } -> nil * * This method is equivalent to deflate('', flush). This method is - * just provided to improve the readability of your Ruby program. + * just provided to improve the readability of your Ruby program. If a block + * is given chunks of deflate output are yielded to the block until the buffer + * is flushed. * * See Zlib::Deflate#deflate for detail on the +flush+ constants NO_FLUSH, * SYNC_FLUSH, FULL_FLUSH and FINISH. 
@@ -1810,9 +1874,11 @@ inflate_run(VALUE args) } /* - * Document-method: Zlib::Inflate.inflate + * Document-method: Zlib::inflate * - * call-seq: Zlib::Inflate.inflate(string) + * call-seq: + * Zlib.inflate(string) + * Zlib::Inflate.inflate(string) * * Decompresses +string+. Raises a Zlib::NeedDict exception if a preset * dictionary is needed for decompression. @@ -1888,12 +1954,17 @@ rb_inflate_add_dictionary(VALUE obj, VAL /* * Document-method: Zlib::Inflate#inflate * - * call-seq: inflate(string) + * call-seq: + * inflate(deflate_string) -> String + * inflate(deflate_string) { |chunk| ... } -> nil + * + * Inputs +deflate_string+ into the inflate stream and returns the output from + * the stream. On calling this method, both the input and the output buffers + * of the stream are flushed. If +deflate_string+ is +nil+, this method + * finishes the stream, just like Zlib::ZStream#finish. * - * Inputs +string+ into the inflate stream and returns the output from the - * stream. Calling this method, both the input and the output buffer of the - * stream are flushed. If string is +nil+, this method finishes the stream, - * just like Zlib::ZStream#finish. + * If a block is given, consecutive inflated chunks from +deflate_string+ + * are yielded to the block and +nil+ is returned. * * Raises a Zlib::NeedDict exception if a preset dictionary is needed to * decompress. Set the dictionary by Zlib::Inflate#set_dictionary and then @@ -2167,6 +2238,7 @@ gzfile_new(klass, funcs, endfunc) obj = Data_Make_Struct(klass, struct gzfile, gzfile_mark, gzfile_free, gz); zstream_init(&gz->z, funcs); + gz->z.flags |= ZSTREAM_FLAG_GZFILE; gz->io = Qnil; gz->level = 0; gz->mtime = 0; Index: test/zlib/test_zlib.rb =================================================================== --- test/zlib/test_zlib.rb (revision 36282) +++ test/zlib/test_zlib.rb (working copy) @@ -39,6 +39,62 @@ if defined? 
Zlib assert_raise(Zlib::StreamError) { Zlib::Deflate.deflate("foo", 10000) } end + def test_deflate_chunked + original = '' + chunks = [] + r = Random.new 0 + + z = Zlib::Deflate.new + + 2.times do + input = r.bytes(16384) + original << input + z.deflate(input) do |chunk| + chunks << chunk + end + end + + assert_equal [2, 16384, 10], + chunks.map { |chunk| chunk.length } + + final = z.finish + + assert_equal 16388, final.length + + all = chunks.join + all << final + + inflated = Zlib.inflate all + + assert_equal original, inflated + end + + def test_deflate_chunked_break + chunks = [] + r = Random.new 0 + + z = Zlib::Deflate.new + + input = r.bytes(16384) + z.deflate(input) do |chunk| + chunks << chunk + break + end + + assert_equal [2], chunks.map { |chunk| chunk.length } + + final = z.finish + + assert_equal 16393, final.length + + all = chunks.join + all << final + + original = Zlib.inflate all + + assert_equal input, original + end + def test_addstr z = Zlib::Deflate.new z << "foo" @@ -202,6 +258,40 @@ if defined? Zlib assert_equal "foofoofoo", out end + def test_finish_chunked + # zeros = Zlib::Deflate.deflate("0" * 100_000) + zeros = "x\234\355\3011\001\000\000\000\302\240J\353\237\316\032\036@" \ + "\001\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\257\006\351\247BH" + + chunks = [] + + z = Zlib::Inflate.new + + z.inflate(zeros) do |chunk| + chunks << chunk + break + end + + # finish must yield the chunks left over after the early break + + z.finish do |chunk| + chunks << chunk + end + + assert_equal [16384, 16384, 16384, 16384, 16384, 16384, 1696], + chunks.map { |chunk| chunk.size } + + assert chunks.all? 
{ |chunk| + chunk =~ /\A0+\z/ + } + end + def test_inflate s = Zlib::Deflate.deflate("foo") z = Zlib::Inflate.new @@ -212,6 +302,58 @@ if defined? Zlib z << "foo" # ??? end + def test_inflate_chunked + # s = Zlib::Deflate.deflate("0" * 100_000) + zeros = "x\234\355\3011\001\000\000\000\302\240J\353\237\316\032\036@" \ + "\001\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\257\006\351\247BH" + + chunks = [] + + z = Zlib::Inflate.new + + z.inflate(zeros) do |chunk| + chunks << chunk + end + + assert_equal [16384, 16384, 16384, 16384, 16384, 16384, 1696], + chunks.map { |chunk| chunk.size } + + assert chunks.all? { |chunk| + chunk =~ /\A0+\z/ + } + end + + def test_inflate_chunked_break + # zeros = Zlib::Deflate.deflate("0" * 100_000) + zeros = "x\234\355\3011\001\000\000\000\302\240J\353\237\316\032\036@" \ + "\001\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ + "\000\000\000\000\000\000\000\257\006\351\247BH" + + chunks = [] + + z = Zlib::Inflate.new + + z.inflate(zeros) do |chunk| + chunks << chunk + break + end + + out = z.inflate nil + + assert_equal 100_000 - chunks.first.length, out.length + end + def test_inflate_dictionary dictionary = "foo"