diff --git a/src/http/modules/ngx_http_fastcgi_module.c b/src/http/modules/ngx_http_fastcgi_module.c index e8ff24c..e8651c9 100644 --- a/src/http/modules/ngx_http_fastcgi_module.c +++ b/src/http/modules/ngx_http_fastcgi_module.c @@ -66,8 +66,10 @@ typedef struct { u_char *last; ngx_uint_t type; size_t length; + size_t output_padding; size_t padding; + unsigned output_done:1; unsigned fastcgi_stdout:1; unsigned large_stderr:1; @@ -135,6 +137,8 @@ static ngx_int_t ngx_http_fastcgi_create_key(ngx_http_request_t *r); static ngx_int_t ngx_http_fastcgi_create_request(ngx_http_request_t *r); static ngx_int_t ngx_http_fastcgi_reinit_request(ngx_http_request_t *r); static ngx_int_t ngx_http_fastcgi_process_header(ngx_http_request_t *r); +static ngx_int_t ngx_http_fastcgi_output_filter_init(void *data); +static ngx_int_t ngx_http_fastcgi_output_filter(void *data, ngx_chain_t *in); static ngx_int_t ngx_http_fastcgi_input_filter_init(void *data); static ngx_int_t ngx_http_fastcgi_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf); @@ -232,6 +236,13 @@ static ngx_command_t ngx_http_fastcgi_commands[] = { offsetof(ngx_http_fastcgi_loc_conf_t, upstream.store_access), NULL }, + { ngx_string("fastcgi_request_buffering"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_fastcgi_loc_conf_t, upstream.request_buffering), + NULL }, + { ngx_string("fastcgi_ignore_client_abort"), NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG, ngx_conf_set_flag_slot, @@ -621,6 +632,17 @@ ngx_http_fastcgi_handler(ngx_http_request_t *r) u->finalize_request = ngx_http_fastcgi_finalize_request; r->state = 0; + r->request_buffering = flcf->upstream.request_buffering; + if (r->headers_in.content_length_n <= 0) { + r->request_buffering = 1; + } + + if (!r->request_buffering) { + u->output_filter_init = ngx_http_fastcgi_output_filter_init; + u->output_filter = ngx_http_fastcgi_output_filter; + u->output_filter_ctx = r; + } + u->buffering = 1; u->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t)); @@ -1057,8 +1079,14 @@ ngx_http_fastcgi_create_request(ngx_http_request_t *r) h->padding_length = 0; h->reserved = 0; - h = (ngx_http_fastcgi_header_t *) b->last; - b->last += sizeof(ngx_http_fastcgi_header_t); + /* + * Don't send the last FASTCGI_STDIN record, It will be sent in the + * output filter + */ + if (r->request_buffering) { + h = (ngx_http_fastcgi_header_t *) b->last; + b->last += sizeof(ngx_http_fastcgi_header_t); + } if (flcf->upstream.pass_request_body) { body = r->upstream->request_bufs; @@ -1165,14 +1193,16 @@ ngx_http_fastcgi_create_request(ngx_http_request_t *r) r->upstream->request_bufs = cl; } - h->version = 1; - h->type = NGX_HTTP_FASTCGI_STDIN; - h->request_id_hi = 0; - h->request_id_lo = 1; - h->content_length_hi = 0; - h->content_length_lo = 0; - h->padding_length = 0; - h->reserved = 0; + if (r->request_buffering) { + h->version = 1; + h->type = NGX_HTTP_FASTCGI_STDIN; + h->request_id_hi = 0; + h->request_id_lo = 1; + h->content_length_hi = 0; + h->content_length_lo = 0; + h->padding_length = 0; + h->reserved = 0; + } cl->next = NULL; @@ -1202,6 +1232,195 @@ ngx_http_fastcgi_reinit_request(ngx_http_request_t *r) static ngx_int_t +ngx_http_fastcgi_output_filter_init(void *data) +{ + ngx_http_request_t *r = data; + + /* + * Remove the r->request_body from request_bufs. + * This part buffer will be processed in the output filter. + */ + r->upstream->request_bufs = NULL; + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_fastcgi_output_filter(void *data, ngx_chain_t *in) +{ + size_t len, padding; + ngx_buf_t *b; + ngx_uint_t split; + ngx_chain_t **last_out, *cl; + ngx_http_request_t *r = data; + ngx_http_fastcgi_ctx_t *f; + ngx_http_request_body_t *rb; + ngx_http_fastcgi_header_t *h; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http fastcgi output filter"); + + f = ngx_http_get_module_ctx(r, ngx_http_fastcgi_module); + rb = r->request_body; + + if (r->upstream->request_bufs == NULL) { + last_out = &r->upstream->request_bufs; + } else { + cl = r->upstream->request_bufs; + + while (cl->next) { + cl = cl->next; + } + + last_out = &cl->next; + } + + if (in == NULL) { + + if (!f->output_done && rb->rest == 0) { + goto last_chunk; + } + + return NGX_OK; + } + + while (in) { + + split = 0; + + len = ngx_buf_size(in->buf); + if (len == 0) { + in = in->next; + continue; + } + + /* + * limit the output buffer to be 32k, fastcgi record can't be + * larger than 64k. + */ + if (len > 32768) { + split = 1; + len = 32768; + } + + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + *last_out = cl; + last_out = &cl->next; + + b = ngx_create_temp_buf(r->pool, + sizeof(ngx_http_fastcgi_header_t) + f->output_padding); + if (b == NULL) { + return NGX_ERROR; + } + + cl->buf = b; + + if (f->output_padding) { + ngx_memzero(b->last, f->output_padding); + b->last += f->output_padding; + f->output_padding = 0; + } + + h = (ngx_http_fastcgi_header_t *) b->last; + b->last += sizeof(ngx_http_fastcgi_header_t); + + padding = 8 - len % 8; + padding = (padding == 8) ? 0 : padding; + + h->version = 1; + h->type = NGX_HTTP_FASTCGI_STDIN; + h->request_id_hi = 0; + h->request_id_lo = 1; + h->content_length_hi = (u_char) ((len >> 8) & 0xff); + h->content_length_lo = (u_char) (len & 0xff); + h->padding_length = (u_char) padding; + h->reserved = 0; + + f->output_padding = padding; + + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + *last_out = cl; + last_out = &cl->next; + + if (!split) { + cl->buf = in->buf; + } else { + b = ngx_calloc_buf(r->pool); + if (b == NULL) { + return NGX_ERROR; + } + + cl->buf = b; + + b->temporary = 1; + b->start = b->pos = in->buf->pos; + b->end = b->last = in->buf->pos + len; + + in->buf->pos = in->buf->pos + len; + + continue; + } + + in = in->next; + } + + if (rb->rest) { + *last_out = NULL; + return NGX_OK; + } + +last_chunk: + + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + *last_out = cl; + + b = ngx_create_temp_buf(r->pool, sizeof(ngx_http_fastcgi_header_t) + + f->output_padding); + if (b == NULL) { + return NGX_ERROR; + } + + if (f->output_padding) { + ngx_memzero(b->last, f->output_padding); + b->last += f->output_padding; + f->output_padding = 0; + } + + h = (ngx_http_fastcgi_header_t *) b->last; + b->last += sizeof(ngx_http_fastcgi_header_t); + + h->version = 1; + h->type = NGX_HTTP_FASTCGI_STDIN; + h->request_id_hi = 0; + h->request_id_lo = 1; + h->content_length_hi = 0; + h->content_length_lo = 0; + h->padding_length = 0; + h->reserved = 0; + + cl->buf = b; + cl->next = NULL; + + f->output_done = 1; + + return NGX_OK; +} + + +static ngx_int_t ngx_http_fastcgi_process_header(ngx_http_request_t *r) { u_char *p, *msg, *start, *last, @@ -2103,6 +2322,7 @@ ngx_http_fastcgi_create_loc_conf(ngx_conf_t *cf) conf->upstream.store = NGX_CONF_UNSET; conf->upstream.store_access = NGX_CONF_UNSET_UINT; + conf->upstream.request_buffering = NGX_CONF_UNSET; conf->upstream.buffering = NGX_CONF_UNSET; conf->upstream.ignore_client_abort = NGX_CONF_UNSET; @@ -2171,6 +2391,9 @@ ngx_http_fastcgi_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_uint_value(conf->upstream.store_access, prev->upstream.store_access, 0600); + ngx_conf_merge_value(conf->upstream.request_buffering, + prev->upstream.request_buffering, 1); + ngx_conf_merge_value(conf->upstream.buffering, prev->upstream.buffering, 1); diff --git a/src/http/modules/ngx_http_proxy_module.c b/src/http/modules/ngx_http_proxy_module.c index 387f77f..7571d8b 100644 --- a/src/http/modules/ngx_http_proxy_module.c +++ b/src/http/modules/ngx_http_proxy_module.c @@ -101,6 +101,8 @@ static ngx_int_t ngx_http_proxy_create_request(ngx_http_request_t *r); static ngx_int_t ngx_http_proxy_reinit_request(ngx_http_request_t *r); static ngx_int_t ngx_http_proxy_process_status_line(ngx_http_request_t *r); static ngx_int_t ngx_http_proxy_process_header(ngx_http_request_t *r); +static ngx_int_t ngx_http_proxy_output_filter_init(void *data); +static ngx_int_t ngx_http_proxy_output_filter(void *data, ngx_chain_t *in); static ngx_int_t ngx_http_proxy_input_filter_init(void *data); static ngx_int_t ngx_http_proxy_copy_filter(ngx_event_pipe_t *p, ngx_buf_t *buf); @@ -242,6 +244,13 @@ static ngx_command_t ngx_http_proxy_commands[] = { offsetof(ngx_http_proxy_loc_conf_t, upstream.store_access), NULL }, + { ngx_string("proxy_request_buffering"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_proxy_loc_conf_t, upstream.request_buffering), + NULL }, + { ngx_string("proxy_buffering"), NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG, ngx_conf_set_flag_slot, @@ -683,6 +692,17 @@ ngx_http_proxy_handler(ngx_http_request_t *r) u->rewrite_cookie = ngx_http_proxy_rewrite_cookie; } + r->request_buffering = plcf->upstream.request_buffering; + if (r->headers_in.content_length_n <= 0) { + r->request_buffering = 1; + } + + if (!r->request_buffering) { + u->output_filter_init = ngx_http_proxy_output_filter_init; + u->output_filter = ngx_http_proxy_output_filter; + u->output_filter_ctx = r; + } + u->buffering = plcf->upstream.buffering; u->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t)); @@ -1264,6 +1284,42 @@ ngx_http_proxy_reinit_request(ngx_http_request_t *r) static ngx_int_t +ngx_http_proxy_output_filter_init(void *data) +{ + ngx_http_request_t *r = data; + + r->upstream->request_bufs = NULL; + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_proxy_output_filter(void *data, ngx_chain_t *in) +{ + ngx_chain_t *cl; + ngx_http_request_t *r = data; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy output filter"); + + if (r->upstream->request_bufs == NULL) { + r->upstream->request_bufs = in; + } else { + cl = r->upstream->request_bufs; + + while (cl->next) { + cl = cl->next; + } + + cl->next = in; + } + + return NGX_OK; +} + + +static ngx_int_t ngx_http_proxy_process_status_line(ngx_http_request_t *r) { size_t len; @@ -2620,6 +2676,7 @@ ngx_http_proxy_create_loc_conf(ngx_conf_t *cf) conf->upstream.store = NGX_CONF_UNSET; conf->upstream.store_access = NGX_CONF_UNSET_UINT; + conf->upstream.request_buffering = NGX_CONF_UNSET; conf->upstream.buffering = NGX_CONF_UNSET; conf->upstream.ignore_client_abort = NGX_CONF_UNSET; @@ -2701,6 +2758,9 @@ ngx_http_proxy_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_uint_value(conf->upstream.store_access, prev->upstream.store_access, 0600); + ngx_conf_merge_value(conf->upstream.request_buffering, + prev->upstream.request_buffering, 1); + ngx_conf_merge_value(conf->upstream.buffering, prev->upstream.buffering, 1); diff --git a/src/http/ngx_http.h b/src/http/ngx_http.h index f152006..5f5a357 100644 --- a/src/http/ngx_http.h +++ b/src/http/ngx_http.h @@ -117,6 +117,8 @@ ngx_int_t ngx_http_send_special(ngx_http_request_t *r, ngx_uint_t flags); ngx_int_t ngx_http_read_client_request_body(ngx_http_request_t *r, ngx_http_client_body_handler_pt post_handler); +ngx_int_t ngx_http_do_read_non_buffered_client_request_body( + ngx_http_request_t *r); ngx_int_t ngx_http_send_header(ngx_http_request_t *r); ngx_int_t ngx_http_special_response_handler(ngx_http_request_t *r, diff --git a/src/http/ngx_http_core_module.c b/src/http/ngx_http_core_module.c index e02a251..cb680c3 100644 --- a/src/http/ngx_http_core_module.c +++ b/src/http/ngx_http_core_module.c @@ -377,6 +377,13 @@ static ngx_command_t ngx_http_core_commands[] = { offsetof(ngx_http_core_loc_conf_t, client_body_buffer_size), NULL }, + { ngx_string("client_body_postpone_sending"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_core_loc_conf_t, client_body_postpone_sending), + NULL }, + { ngx_string("client_body_timeout"), NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, ngx_conf_set_msec_slot, @@ -3472,6 +3479,7 @@ ngx_http_core_create_loc_conf(ngx_conf_t *cf) clcf->client_max_body_size = NGX_CONF_UNSET; clcf->client_body_buffer_size = NGX_CONF_UNSET_SIZE; + clcf->client_body_postpone_sending = NGX_CONF_UNSET_SIZE; clcf->client_body_timeout = NGX_CONF_UNSET_MSEC; clcf->satisfy = NGX_CONF_UNSET_UINT; clcf->if_modified_since = NGX_CONF_UNSET_UINT; @@ -3677,6 +3685,19 @@ ngx_http_core_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_size_value(conf->client_body_buffer_size, prev->client_body_buffer_size, (size_t) 2 * ngx_pagesize); + ngx_conf_merge_size_value(conf->client_body_postpone_sending, + prev->client_body_postpone_sending, + 64 * 1024); + + if (conf->client_body_postpone_sending < conf->client_body_buffer_size) { + conf->buffer_number = 1; + conf->client_body_postpone_sending = conf->client_body_buffer_size; + } else { + conf->buffer_number = (conf->client_body_postpone_sending + + conf->client_body_buffer_size - 1) + / conf->client_body_buffer_size; + } + ngx_conf_merge_msec_value(conf->client_body_timeout, prev->client_body_timeout, 60000); diff --git a/src/http/ngx_http_core_module.h b/src/http/ngx_http_core_module.h index e95d1e0..721c9c5 100644 --- a/src/http/ngx_http_core_module.h +++ b/src/http/ngx_http_core_module.h @@ -346,7 +346,10 @@ struct ngx_http_core_loc_conf_s { off_t directio; /* directio */ off_t directio_alignment; /* directio_alignment */ + size_t buffer_number; size_t client_body_buffer_size; /* client_body_buffer_size */ + size_t client_body_postpone_sending; + /* client_body_postpone_sending */ size_t send_lowat; /* send_lowat */ size_t postpone_output; /* postpone_output */ size_t limit_rate; /* limit_rate */ diff --git a/src/http/ngx_http_request.c b/src/http/ngx_http_request.c index ee00fd3..4df35ad 100644 --- a/src/http/ngx_http_request.c +++ b/src/http/ngx_http_request.c @@ -495,6 +495,8 @@ ngx_http_init_request(ngx_event_t *rev) r->method = NGX_HTTP_UNKNOWN; + r->request_buffering = 1; + r->headers_in.content_length_n = -1; r->headers_in.keep_alive_n = -1; r->headers_out.content_length_n = -1; diff --git a/src/http/ngx_http_request.h b/src/http/ngx_http_request.h index c2651a8..6514bb5 100644 --- a/src/http/ngx_http_request.h +++ b/src/http/ngx_http_request.h @@ -283,6 +283,14 @@ typedef struct { off_t rest; ngx_chain_t *to_write; ngx_http_client_body_handler_pt post_handler; + + /* For non buffered request body */ + ngx_chain_t *out; + ngx_chain_t *busy; + ngx_chain_t *free; + ngx_chain_t **last_out; + ngx_uint_t num; + unsigned buffered; } ngx_http_request_body_t; @@ -460,6 +468,7 @@ struct ngx_http_request_s { unsigned uri_changed:1; unsigned uri_changes:4; + unsigned request_buffering:1; unsigned request_body_in_single_buf:1; unsigned request_body_in_file_only:1; unsigned request_body_in_persistent_file:1; diff --git a/src/http/ngx_http_request_body.c b/src/http/ngx_http_request_body.c index 3c69d05..c615256 100644 --- a/src/http/ngx_http_request_body.c +++ b/src/http/ngx_http_request_body.c @@ -14,6 +14,13 @@ static void ngx_http_read_client_request_body_handler(ngx_http_request_t *r); static ngx_int_t ngx_http_do_read_client_request_body(ngx_http_request_t *r); static ngx_int_t ngx_http_write_request_body(ngx_http_request_t *r, ngx_chain_t *body); + +static ngx_int_t ngx_http_read_non_buffered_client_request_body( + ngx_http_request_t *r, ngx_http_client_body_handler_pt post_handler); +static void ngx_http_read_non_buffered_client_request_body_handler( + ngx_http_request_t *r); +static ngx_int_t ngx_http_request_body_get_buf(ngx_http_request_t *r); +static void ngx_http_request_body_chomp_buf(ngx_http_request_body_t *rb); static ngx_int_t ngx_http_read_discarded_request_body(ngx_http_request_t *r); static ngx_int_t ngx_http_test_expect(ngx_http_request_t *r); @@ -62,6 +69,10 @@ ngx_http_read_client_request_body(ngx_http_request_t *r, return NGX_OK; } + if (!r->request_buffering) { + return ngx_http_read_non_buffered_client_request_body(r, post_handler); + } + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); if (r->headers_in.content_length_n == 0) { @@ -442,6 +453,346 @@ ngx_http_write_request_body(ngx_http_request_t *r, ngx_chain_t *body) } +static ngx_int_t +ngx_http_read_non_buffered_client_request_body(ngx_http_request_t *r, + ngx_http_client_body_handler_pt post_handler) +{ + size_t preread; + ngx_buf_t *b; + ngx_int_t rc; + ngx_http_request_body_t *rb; + ngx_http_core_loc_conf_t *clcf; + + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + + if (r->headers_in.content_length_n == 0) { + post_handler(r); + return NGX_OK; + } + + rb = r->request_body; + + rb->post_handler = post_handler; + + preread = r->header_in->last - r->header_in->pos; + + if (preread) { + + /* there is the pre-read part of the request body */ + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http client no buffered request body preread %uz", + preread); + + b = ngx_calloc_buf(r->pool); + if (b == NULL) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + b->temporary = 1; + b->start = r->header_in->pos; + b->pos = r->header_in->pos; + b->last = r->header_in->last; + b->end = r->header_in->end; + + rb->bufs = ngx_alloc_chain_link(r->pool); + if (rb->bufs == NULL) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + rb->bufs->buf = b; + rb->bufs->next = NULL; + + rb->buf = b; + + if ((off_t) preread >= r->headers_in.content_length_n) { + + /* the whole request body was pre-read */ + + r->header_in->pos += (size_t) r->headers_in.content_length_n; + r->request_length += r->headers_in.content_length_n; + b->last = r->header_in->pos; + + post_handler(r); + + return NGX_OK; + } + + /* + * to not consider the body as pipelined request in + * ngx_http_set_keepalive() + */ + r->header_in->pos = r->header_in->last; + + r->request_length += preread; + + rb->rest = r->headers_in.content_length_n - preread; + + if (r->request_length >= (off_t) clcf->client_body_postpone_sending) { + + post_handler(r); + + return NGX_OK; + } + + if (rb->rest <= (off_t) (b->end - b->last)) { + + /* the whole request body could be placed in r->header_in */ + goto read_body; + } + + rb->last_out = &rb->bufs->next; + + } else { + rb->rest = r->headers_in.content_length_n; + rb->last_out = &rb->bufs; + } + +read_body: + + rb->buffered = 1; + + rc = ngx_http_do_read_non_buffered_client_request_body(r); + + if (rc == NGX_AGAIN) { + + if (rb->buffered) { + r->read_event_handler = + ngx_http_read_non_buffered_client_request_body_handler; + } + + } else if (rc == NGX_OK) { + post_handler(r); + } + + return rc; +} + + +static void +ngx_http_read_non_buffered_client_request_body_handler(ngx_http_request_t *r) +{ + ngx_int_t rc; + + if (r->connection->read->timedout) { + r->connection->timedout = 1; + ngx_http_finalize_request(r, NGX_HTTP_REQUEST_TIME_OUT); + return; + } + + rc = ngx_http_do_read_non_buffered_client_request_body(r); + + if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { + ngx_http_finalize_request(r, rc); + } + + if (rc == NGX_OK) { + r->request_body->post_handler(r); + } +} + + +ngx_int_t +ngx_http_do_read_non_buffered_client_request_body(ngx_http_request_t *r) +{ + size_t size; + ssize_t n; + ngx_int_t rc; + ngx_connection_t *c; + ngx_http_request_body_t *rb; + ngx_http_core_loc_conf_t *clcf; + + c = r->connection; + rb = r->request_body; + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http read no buffered client request body"); + + for ( ;; ) { + for ( ;; ) { + + if ((rb->buf == NULL) || (rb->buf->end == rb->buf->last)) { + + rc = ngx_http_request_body_get_buf(r); + + if (rc == NGX_ERROR) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + + } else if (rc == NGX_AGAIN) { + + ngx_add_timer(c->read, clcf->client_body_timeout); + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + return NGX_AGAIN; + } + } + + size = rb->buf->end - rb->buf->last; + + if ((off_t) size > rb->rest) { + size = (size_t) rb->rest; + } + + n = c->recv(c, rb->buf->last, size); + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http no buffered client request body recv %z", + n); + + if (n == NGX_AGAIN) { + break; + } + + if (n == 0) { + ngx_log_error(NGX_LOG_INFO, c->log, 0, + "client prematurely closed connection"); + } + + if (n == 0 || n == NGX_ERROR) { + c->error = 1; + return NGX_HTTP_BAD_REQUEST; + } + + rb->buf->last += n; + rb->rest -= n; + r->request_length += n; + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http no buffered client request body " + "request_length: %O, rest: %uz", + r->request_length, rb->rest); + + if (rb->rest == 0) { + break; + } + + if (rb->buffered && + r->request_length >= (off_t) clcf->client_body_postpone_sending) { + + goto read_ok; + } + + if (rb->buf->last < rb->buf->end) { + break; + } + } + + ngx_http_request_body_chomp_buf(rb); + + if (rb->rest == 0) { + break; + } + + if (!c->read->ready) { + ngx_add_timer(c->read, clcf->client_body_timeout); + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + return NGX_AGAIN; + } + } + +read_ok: + + if (c->read->timer_set) { + ngx_del_timer(c->read); + } + + r->read_event_handler = ngx_http_block_reading; + + rb->buffered = 0; + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_request_body_get_buf(ngx_http_request_t *r) +{ + ngx_chain_t *cl; + ngx_http_request_body_t *rb; + ngx_http_core_loc_conf_t *clcf; + + rb = r->request_body; + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + + if (rb->free) { + + cl = rb->free; + rb->free = rb->free->next; + + } else { + + if (rb->num > clcf->buffer_number) { + return NGX_AGAIN; + } + + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = ngx_create_temp_buf(r->pool, clcf->client_body_buffer_size); + if (cl->buf == NULL) { + return NGX_ERROR; + } + + cl->buf->tag = (ngx_buf_tag_t) &ngx_http_core_module; + cl->buf->recycled = 1; + + rb->num++; + } + + cl->next = NULL; + + rb->buf = cl->buf; + *rb->last_out = cl; + rb->last_out = &cl->next; + + return NGX_OK; +} + + +/* chomp the last zero buffer when possible */ +static void +ngx_http_request_body_chomp_buf(ngx_http_request_body_t *rb) +{ + + ngx_chain_t *cl, **pre; + + cl = rb->bufs; + pre = &rb->bufs; + + if (cl == NULL) { + return; + } + + while (cl->next) { + pre = &cl->next; + cl = cl->next; + } + + if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) { + + rb->buf = NULL; + *pre = NULL; + + rb->last_out = pre; + + if (cl->buf->tag == (ngx_buf_tag_t) &ngx_http_core_module) { + cl->next = rb->free; + rb->free = cl; + } + } +} + + ngx_int_t ngx_http_discard_request_body(ngx_http_request_t *r) { diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c index 703017f..414471c 100644 --- a/src/http/ngx_http_upstream.c +++ b/src/http/ngx_http_upstream.c @@ -31,8 +31,14 @@ static ngx_int_t ngx_http_upstream_reinit(ngx_http_request_t *r, ngx_http_upstream_t *u); static void ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u); +static ngx_int_t ngx_http_upstream_output_filter_init(void *data); +static ngx_int_t ngx_http_upstream_output_filter(void *data, + ngx_chain_t *in); static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u); +static void ngx_http_upstream_send_non_buffered_request(ngx_http_request_t *r, + ngx_http_upstream_t *u); +static void ngx_http_upstream_read_non_buffered_request(ngx_http_request_t *r); static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u); static ngx_int_t ngx_http_upstream_test_next(ngx_http_request_t *r, @@ -502,6 +508,18 @@ ngx_http_upstream_init_request(ngx_http_request_t *r) u->request_bufs = r->request_body->bufs; } + if (!u->output_filter) { + u->output_filter_init = ngx_http_upstream_output_filter_init; + u->output_filter = ngx_http_upstream_output_filter; + u->output_filter_ctx = r; + } + + if (u->output_filter_init && u->output_filter_init(u->output_filter_ctx) + != NGX_OK) { + ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); + return; + } + if (u->create_request(r) != NGX_OK) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; @@ -1182,6 +1200,17 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) u->writer.limit = 0; if (u->request_sent) { + + /* + * no buffering request can't reuse the request body when part of + * the body has been sent. + */ + if (!r->request_buffering) { + ngx_http_upstream_finalize_request(r, u, + NGX_HTTP_INTERNAL_SERVER_ERROR); + return; + } + if (ngx_http_upstream_reinit(r, u) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); @@ -1231,6 +1260,11 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) #endif + if (!r->request_buffering) { + ngx_http_upstream_send_non_buffered_request(r, u); + return; + } + ngx_http_upstream_send_request(r, u); } @@ -1294,6 +1328,11 @@ ngx_http_upstream_ssl_handshake(ngx_connection_t *c) c->write->handler = ngx_http_upstream_handler; c->read->handler = ngx_http_upstream_handler; + if (!r->request_buffering) { + ngx_http_upstream_send_non_buffered_request(r, u); + return; + } + ngx_http_upstream_send_request(r, u); return; @@ -1459,6 +1498,243 @@ ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) } +static ngx_int_t +ngx_http_upstream_output_filter_init(void *data) +{ + return NGX_OK; +} + + +static ngx_int_t +ngx_http_upstream_output_filter(void *data, ngx_chain_t *in) +{ + return NGX_OK; +} + + +static void +ngx_http_upstream_send_non_buffered_request(ngx_http_request_t *r, + ngx_http_upstream_t *u) +{ + off_t rest; + ngx_int_t rc; + ngx_connection_t *c; + ngx_http_request_body_t *rb; + + c = u->peer.connection; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream read/send no buffered request"); + + if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) { + ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); + return; + } + + rb = r->request_body; + + for ( ;; ) { + + if (rb == NULL) { + + c->log->action = "sending no buffered request to upstream"; + + rc = ngx_output_chain(&u->output, + u->request_sent ? NULL : u->request_bufs); + + goto send_done; + } + + if (u->request_sent && rb->rest) { + c->log->action = "reading no buffered request body from client"; + rest = rb->rest; + + rc = ngx_http_do_read_non_buffered_client_request_body(r); + + if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { + ngx_http_upstream_finalize_request(r, u, rc); + return; + } + + if (rc == NGX_AGAIN && rb->busy == NULL && rest == rb->rest) { + + r->read_event_handler = + ngx_http_upstream_read_non_buffered_request; + + if (ngx_handle_read_event(r->connection->read, 0) != NGX_OK) { + ngx_http_upstream_finalize_request(r, u, + NGX_HTTP_INTERNAL_SERVER_ERROR); + } + + return; + } + } + + c->log->action = "sending no buffered request to upstream"; + +#if (NGX_DEBUG) + ngx_buf_t *buf; + ngx_chain_t *cl; + + for (cl = u->request_bufs; cl; cl = cl->next) { + buf = cl->buf; + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream before out bufs: p=%p, s=%d, size=%uO", + buf, ngx_buf_special(buf), ngx_buf_size(buf)); + } + + for (cl = rb->bufs; cl; cl = cl->next) { + buf = cl->buf; + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream before rb->bufs: p=%p, s=%d, size=%uO", + buf, ngx_buf_special(buf), ngx_buf_size(buf)); + } + +#endif + if (u->output_filter(u->output_filter_ctx, rb->bufs) != NGX_OK) { + ngx_http_upstream_finalize_request(r, u, + NGX_HTTP_INTERNAL_SERVER_ERROR); + + return; + } + +#if (NGX_DEBUG) + for (cl = u->request_bufs; cl; cl = cl->next) { + buf = cl->buf; + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream send out bufs: p=%p, s=%d, size=%uO", + buf, ngx_buf_special(buf), ngx_buf_size(buf)); + } +#endif + + rc = ngx_output_chain(&u->output, u->request_bufs); + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream send no buffered request: rc=%i", rc); + + ngx_chain_update_chains(r->pool, &rb->free, &rb->busy, &u->request_bufs, + (ngx_buf_tag_t) &ngx_http_core_module); + +#if (NGX_DEBUG) + for (cl = rb->busy; cl; cl = cl->next) { + buf = cl->buf; + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream send busy bufs: p=%p, s=%d, size=%uO", + buf, ngx_buf_special(buf), ngx_buf_size(buf)); + } + + for (cl = rb->free; cl; cl = cl->next) { + buf = cl->buf; + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream send free bufs: p=%p, s=%d, size=%uO", + buf, ngx_buf_special(buf), ngx_buf_size(buf)); + } +#endif + + rb->bufs = NULL; + rb->buf = NULL; + rb->last_out = &rb->bufs; + u->request_bufs = NULL; + +send_done: + + u->request_sent = 1; + + if (rc == NGX_ERROR) { + + ngx_http_upstream_finalize_request(r, u, + NGX_HTTP_INTERNAL_SERVER_ERROR); + + return; + } + + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + + if (rc == NGX_AGAIN) { + ngx_add_timer(c->write, u->conf->send_timeout); + + if (ngx_handle_write_event(c->write, u->conf->send_lowat) + != NGX_OK) { + ngx_http_upstream_finalize_request(r, u, + NGX_HTTP_INTERNAL_SERVER_ERROR); + return; + } + + return; + } + + /* rc == NGX_OK */ + + if (rb == NULL || rb->rest == 0) { + break; + } + } + + /* send all the request body */ + + if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) { + if (ngx_tcp_push(c->fd) == NGX_ERROR) { + ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno, + ngx_tcp_push_n " failed"); + ngx_http_upstream_finalize_request(r, u, + NGX_HTTP_INTERNAL_SERVER_ERROR); + return; + } + + c->tcp_nopush = NGX_TCP_NOPUSH_UNSET; + } + + ngx_add_timer(c->read, u->conf->read_timeout); + +#if 1 + if (c->read->ready) { + + /* post aio operation */ + + /* + * TODO comment + * although we can post aio operation just in the end + * of ngx_http_upstream_connect() CHECK IT !!! + * it's better to do here because we postpone header buffer allocation + */ + + ngx_http_upstream_process_header(r, u); + return; + } +#endif + + u->write_event_handler = ngx_http_upstream_dummy_handler; + + if (ngx_handle_write_event(c->write, 0) != NGX_OK) { + ngx_http_upstream_finalize_request(r, u, + NGX_HTTP_INTERNAL_SERVER_ERROR); + return; + } +} + + +static void +ngx_http_upstream_read_non_buffered_request(ngx_http_request_t *r) +{ + ngx_connection_t *c; + ngx_http_upstream_t *u; + + c = r->connection; + u = r->upstream; + + if (c->read->timedout) { + c->timedout = 1; + ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out"); + ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT); + return; + } + + ngx_http_upstream_send_non_buffered_request(r, u); +} + + static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u) @@ -1484,6 +1760,11 @@ ngx_http_upstream_send_request_handler(ngx_http_request_t *r, #endif + if (!r->request_buffering) { + ngx_http_upstream_send_non_buffered_request(r, u); + return; + } + if (u->header_sent) { u->write_event_handler = ngx_http_upstream_dummy_handler; diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h index f32c985..fff1c85 100644 --- a/src/http/ngx_http_upstream.h +++ b/src/http/ngx_http_upstream.h @@ -143,6 +143,7 @@ typedef struct { ngx_uint_t ignore_headers; ngx_uint_t next_upstream; ngx_uint_t store_access; + ngx_flag_t request_buffering; ngx_flag_t buffering; ngx_flag_t pass_request_headers; ngx_flag_t pass_request_body; @@ -284,6 +285,12 @@ struct ngx_http_upstream_s { ngx_chain_t *busy_bufs; ngx_chain_t *free_bufs; + /* Nginx => Upstream */ + ngx_int_t (*output_filter_init)(void *data); + ngx_int_t (*output_filter)(void *data, ngx_chain_t *in); + void *output_filter_ctx; + + /* Upstream => Nginx */ ngx_int_t (*input_filter_init)(void *data); ngx_int_t (*input_filter)(void *data, ssize_t bytes); void *input_filter_ctx;