From 6cf83f1216390bbe51bde92ce6b28af95914778d Mon Sep 17 00:00:00 2001 From: Gourav Date: Tue, 9 Jun 2026 07:54:15 +0530 Subject: [PATCH] Keepalive http2 modification and Add Multiplexer --- src/http/modules/ngx_http_proxy_v2_module.c | 225 +++++- .../ngx_http_upstream_keepalive_module.c | 744 ++++++++++++++++-- src/http/ngx_http_upstream.c | 232 +++++- src/http/ngx_http_upstream.h | 34 + 4 files changed, 1123 insertions(+), 112 deletions(-) diff --git a/src/http/modules/ngx_http_proxy_v2_module.c b/src/http/modules/ngx_http_proxy_v2_module.c index 0be5691aa..f688a4757 100644 --- a/src/http/modules/ngx_http_proxy_v2_module.c +++ b/src/http/modules/ngx_http_proxy_v2_module.c @@ -46,6 +46,8 @@ typedef struct { ngx_chain_t *free; ngx_chain_t *busy; + ngx_http_request_t *request; + ngx_http_proxy_v2_conn_t *connection; ngx_uint_t id; @@ -84,6 +86,10 @@ typedef struct { unsigned literal:1; unsigned field_huffman:1; + ssize_t window_charge; /* DATA payload framed but not yet charged to window */ + + off_t last_sent; /* c->sent at last window charge */ + unsigned header_sent:1; unsigned output_closed:1; unsigned output_blocked:1; @@ -160,12 +166,18 @@ static ngx_http_proxy_v2_ctx_t * ngx_http_proxy_v2_get_ctx(ngx_http_request_t *r); static ngx_int_t ngx_http_proxy_v2_get_connection_data(ngx_http_request_t *r, ngx_http_proxy_v2_ctx_t *ctx, ngx_peer_connection_t *pc); +static ngx_http_upstream_mux_hooks_t ngx_http_proxy_v2_mux_hooks; static void ngx_http_proxy_v2_cleanup(void *data); static void ngx_http_proxy_v2_abort_request(ngx_http_request_t *r); static void ngx_http_proxy_v2_finalize_request(ngx_http_request_t *r, ngx_int_t rc); +static void ngx_http_proxy_v2_charge_window(ngx_http_proxy_v2_ctx_t *ctx, + off_t bytes_written); + + + static ngx_http_module_t ngx_http_proxy_v2_module_ctx = { NULL, /* preconfiguration */ @@ -239,6 +251,7 @@ ngx_http_proxy_v2_handler(ngx_http_request_t *r) plcf->upstream.preserve_output = 1; u = r->upstream; + u->http2 = 1; if (plcf->proxy_lengths == NULL) { ctx->ctx.vars = plcf->vars; @@ -943,7 +956,9 @@ ngx_http_proxy_v2_reinit_request(ngx_http_request_t *r) ctx->status = 0; ctx->rst = 0; ctx->goaway = 0; + ctx->connection = NULL; + ctx->in = NULL; ctx->busy = NULL; ctx->out = NULL; @@ -964,7 +979,7 @@ ngx_http_proxy_v2_body_output_filter(void *data, ngx_chain_t *in) ngx_int_t rc; ngx_uint_t next, last; ngx_chain_t *cl, *out, *ln, **ll; - ngx_http_upstream_t *u; + ngx_http_upstream_t *u = r->upstream; ngx_http_proxy_v2_ctx_t *ctx; ngx_http_proxy_v2_frame_t *f; @@ -1014,6 +1029,9 @@ ngx_http_proxy_v2_body_output_filter(void *data, ngx_chain_t *in) f->stream_id_2 = (u_char) ((ctx->id >> 8) & 0xff); f->stream_id_3 = (u_char) (ctx->id & 0xff); + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "proxy v2: patched frame sid=%ui", ctx->id); + p += (f->length_0 << 16) + (f->length_1 << 8) + f->length_2; } } @@ -1164,8 +1182,8 @@ ngx_http_proxy_v2_body_output_filter(void *data, ngx_chain_t *in) f->stream_id_3 = (u_char) (ctx->id & 0xff); limit -= len; - ctx->send_window -= len; - ctx->connection->send_window -= len; + ctx->window_charge += len; + /* window decrement deferred to write-completion time */ } while (!next && limit > 0); @@ -1261,6 +1279,17 @@ ngx_http_proxy_v2_body_output_filter(void *data, ngx_chain_t *in) rc = ngx_chain_writer(&r->upstream->writer, out); + if (u != NULL) { + ngx_connection_t *sc = u->peer.connection; + if (sc != NULL) { + off_t delta = sc->sent - ctx->last_sent; + if (delta > 0) { + ngx_http_proxy_v2_charge_window(ctx, delta); + } + ctx->last_sent = sc->sent; + } + } + ngx_chain_update_chains(r->pool, &ctx->free, &ctx->busy, &out, (ngx_buf_tag_t) &ngx_http_proxy_v2_body_output_filter); @@ -1297,7 +1326,6 @@ ngx_http_proxy_v2_body_output_filter(void *data, ngx_chain_t *in) * here anyway. */ - u = r->upstream; u->length = 0; u->pipe->length = 0; @@ -4050,9 +4078,20 @@ ngx_http_proxy_v2_get_ctx(ngx_http_request_t *r) ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_v2_module); - if (ctx->connection == NULL) { - u = r->upstream; + u = r->upstream; + /* + * Force (re)resolution to the real v2_conn on the backend c's pool + * when we now have a real peer connection. This ensures the *creating* + * request (which saw c==NULL during create_request and got a temp + * v2c with id=0) picks up the shared real v2c (and registers for + * the multiplexer) once the socket exists. Second+ requests (fresh + * ctx, connection==NULL) also go through here. + */ + if (ctx->connection == NULL + || (u && u->peer.connection != NULL + && (ctx->id == 0 || ctx->connection->init_window == 0))) + { if (ngx_http_proxy_v2_get_connection_data(r, ctx, &u->peer) != NGX_OK) { return NULL; } @@ -4066,10 +4105,14 @@ static ngx_int_t ngx_http_proxy_v2_get_connection_data(ngx_http_request_t *r, ngx_http_proxy_v2_ctx_t *ctx, ngx_peer_connection_t *pc) { - ngx_connection_t *c; - ngx_pool_cleanup_t *cln; + ngx_connection_t *c; + ngx_pool_cleanup_t *cln; + ngx_http_upstream_t *u; + ngx_http_upstream_mux_t *mux; c = pc->connection; + u = r->upstream; + ctx->request = r; if (c == NULL) { ctx->connection = ngx_palloc(r->pool, sizeof(ngx_http_proxy_v2_conn_t)); @@ -4082,29 +4125,24 @@ ngx_http_proxy_v2_get_connection_data(ngx_http_request_t *r, goto done; } - if (pc->cached) { - - /* - * for cached connections, connection data can be found - * in the cleanup handler - */ - - for (cln = c->pool->cleanup; cln; cln = cln->next) { - if (cln->handler == ngx_http_proxy_v2_cleanup) { - ctx->connection = cln->data; - break; - } + /* + * Look for existing per-connection v2 state attached to this + * backend c's pool. Also find (or create) the upstream mux. + */ + for (cln = c->pool->cleanup; cln; cln = cln->next) { + if (cln->handler == ngx_http_proxy_v2_cleanup) { + ctx->connection = cln->data; + break; } + } + + if (ctx->connection != NULL) { + /* reuse of already-initialized v2 state on this c */ - if (ctx->connection == NULL) { - ngx_log_error(NGX_LOG_ERR, c->log, 0, - "no connection data found for " - "keepalive http2 connection"); - return NGX_ERROR; - } ctx->send_window = ctx->connection->init_window; ctx->recv_window = NGX_HTTP_V2_MAX_WINDOW; + ctx->last_sent = c->sent; ctx->connection->last_stream_id += 2; ctx->id = ctx->connection->last_stream_id; @@ -4112,6 +4150,8 @@ ngx_http_proxy_v2_get_connection_data(ngx_http_request_t *r, return NGX_OK; } + /* first H2 proxy use ever on this backend connection c */ + cln = ngx_pool_cleanup_add(c->pool, sizeof(ngx_http_proxy_v2_conn_t)); if (cln == NULL) { return NGX_ERROR; @@ -4120,6 +4160,16 @@ ngx_http_proxy_v2_get_connection_data(ngx_http_request_t *r, cln->handler = ngx_http_proxy_v2_cleanup; ctx->connection = cln->data; + /* Create upstream mux for this connection (u->mux assigned by upstream layer) */ + + mux = ngx_http_upstream_mux_get(c); + if (mux == NULL) { + return NGX_ERROR; + } + + mux->protocol_data = ctx->connection; + mux->hooks = &ngx_http_proxy_v2_mux_hooks; + ctx->id = 1; done: @@ -4130,9 +4180,20 @@ done: ctx->send_window = NGX_HTTP_V2_DEFAULT_WINDOW; ctx->recv_window = NGX_HTTP_V2_MAX_WINDOW; + ctx->last_sent = c ? c->sent : 0; ctx->connection->last_stream_id = 1; + if (c != NULL) { + u->keepalive = 1; + + ngx_log_debug(NGX_LOG_DEBUG_HTTP, c->log, 0, + "proxy v2: NEW v2c=%p on #%uA c=%p fd=%d pool=%p " + "ctx->id=%ui", + ctx->connection, c->number, c, c->fd, c->pool, + ctx->id); + } + return NGX_OK; } @@ -4147,6 +4208,101 @@ ngx_http_proxy_v2_cleanup(void *data) return; } +/* Return the v2_conn state (if any) attached to this backend connection's + * pool via the sentinel cleanup. Used by core upstream (guards, force + * event drive on reuse) and internally. Returns void* so callers in + * core upstream do not need the private struct type. + */ +void * +ngx_http_proxy_v2_get_connection(ngx_connection_t *c) +{ + ngx_pool_cleanup_t *cln; + + if (c == NULL || c->pool == NULL) { + return NULL; + } + + for (cln = c->pool->cleanup; cln; cln = cln->next) { + if (cln->handler == ngx_http_proxy_v2_cleanup) { + return cln->data; + } + } + + return NULL; +} + + + +/* + * Charge the flow-control window for DATA payload that was actually + * written to the socket. Called from drive_multiplex (slow path) and + * ngx_http_upstream.c (fast path) right after ngx_http_upstream_send_request. + * + * We defer window decrement from frame-construction time (body_output_filter) + * to write-completion time so that frames queued by enqueue_request (dummy + * send_chain) do not phantom-consume the window. + * + * To handle partial writes correctly we use c->sent: the caller saves + * c->sent before the write and passes the delta. Only that many DATA + * bytes (capped by window_charge) are charged. + */ +static void +ngx_http_proxy_v2_charge_window(ngx_http_proxy_v2_ctx_t *ctx, + off_t bytes_written) +{ + ssize_t charge; + + if (ctx == NULL || ctx->window_charge <= 0 || bytes_written <= 0) { + return; + } + + charge = ctx->window_charge; + if (bytes_written < charge) { + charge = (ssize_t) bytes_written; + } + + ngx_log_debug4(NGX_LOG_DEBUG_HTTP, + ctx->request ? ctx->request->connection->log : NULL, 0, + "proxy v2: charge window sid=%ui charge=%d (of %d) stream_win=%d", + ctx->id, charge, ctx->window_charge, (int) ctx->send_window); + + ctx->send_window -= charge; + ctx->connection->send_window -= charge; + ctx->window_charge -= charge; +} + + +static ngx_int_t +ngx_http_proxy_v2_mux_can_send(ngx_http_upstream_t *u) +{ + ngx_http_proxy_v2_ctx_t *ctx; + + ctx = ngx_http_get_module_ctx(u->request, ngx_http_proxy_v2_module); + if (ctx == NULL) { + return 1; + } + + return ctx->send_window +} + + +static ngx_int_t +ngx_http_proxy_v2_mux_is_done(ngx_http_upstream_t *u) +{ + + if (u->writer.out != NULL) { + return 0; + } + + return 1; +} + + +static ngx_http_upstream_mux_hooks_t ngx_http_proxy_v2_mux_hooks = { + ngx_http_proxy_v2_mux_can_send, + ngx_http_proxy_v2_mux_is_done, +}; + static void ngx_http_proxy_v2_abort_request(ngx_http_request_t *r) @@ -4160,7 +4316,24 @@ ngx_http_proxy_v2_abort_request(ngx_http_request_t *r) static void ngx_http_proxy_v2_finalize_request(ngx_http_request_t *r, ngx_int_t rc) { + ngx_http_proxy_v2_ctx_t *ctx; + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "finalize proxy http2 request"); + + ngx_http_upstream_t *u; + + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_v2_module); + if (ctx != NULL) { + u = r->upstream; + if (u != NULL && u->in_mux_queue) { + ngx_http_upstream_mux_dequeue(u); + } + ctx->connection = NULL; + } + return; } + + + diff --git a/src/http/modules/ngx_http_upstream_keepalive_module.c b/src/http/modules/ngx_http_upstream_keepalive_module.c index c605e98ad..47fb2fe8b 100644 --- a/src/http/modules/ngx_http_upstream_keepalive_module.c +++ b/src/http/modules/ngx_http_upstream_keepalive_module.c @@ -9,6 +9,10 @@ #include #include +#if (NGX_HTTP_V2) +#include +#endif + typedef struct { ngx_uint_t max_cached; @@ -23,6 +27,9 @@ typedef struct { ngx_uint_t local; /* unsigned local:1; */ + ngx_uint_t max_streams_per_connection; + ngx_uint_t max_streams_total; + ngx_uint_t total_active_streams; } ngx_http_upstream_keepalive_srv_conf_t; @@ -37,6 +44,9 @@ typedef struct { ngx_http_upstream_conf_t *tag; + ngx_uint_t active_streams; + + unsigned h2:1; } ngx_http_upstream_keepalive_cache_t; @@ -57,6 +67,11 @@ typedef struct { ngx_event_notify_peer_pt original_notify; + unsigned h2_enabled:1; + + ngx_uint_t max_streams_per_connection; + ngx_uint_t max_streams_total; + } ngx_http_upstream_keepalive_peer_data_t; @@ -87,6 +102,32 @@ static char *ngx_http_upstream_keepalive_init_main_conf(ngx_conf_t *cf, static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); +static ngx_http_upstream_keepalive_cache_t * + ngx_http_upstream_keepalive_find_cached_connection(ngx_queue_t *cache, + ngx_connection_t *c); +static void ngx_http_upstream_keepalive_adjust_total( + ngx_http_upstream_keepalive_srv_conf_t *kcf, ngx_uint_t delta); +static void ngx_http_upstream_keepalive_prepare_connection( + ngx_connection_t *c, ngx_peer_connection_t *pc); +static void ngx_http_upstream_keepalive_set_idle( + ngx_http_upstream_keepalive_cache_t *item, ngx_connection_t *c, + ngx_peer_connection_t *pc, ngx_http_upstream_t *u); +static ngx_int_t ngx_http_upstream_get_keepalive_peer_h1( + ngx_peer_connection_t *pc, ngx_http_upstream_keepalive_peer_data_t *kp); +static ngx_int_t ngx_http_upstream_get_keepalive_peer_h2( + ngx_peer_connection_t *pc, ngx_http_upstream_keepalive_peer_data_t *kp); +static void ngx_http_upstream_free_keepalive_peer_h1( + ngx_peer_connection_t *pc, ngx_http_upstream_keepalive_peer_data_t *kp, + ngx_uint_t state); +static void ngx_http_upstream_free_keepalive_peer_h2( + ngx_peer_connection_t *pc, ngx_http_upstream_keepalive_peer_data_t *kp, + ngx_uint_t state); +static ngx_uint_t ngx_http_upstream_keepalive_h2_connection_bad( + ngx_peer_connection_t *pc, ngx_uint_t state); +static void ngx_http_upstream_keepalive_drop_h2_connection( + ngx_http_upstream_keepalive_srv_conf_t *kcf, + ngx_http_upstream_keepalive_cache_t *item, ngx_peer_connection_t *pc); + static ngx_command_t ngx_http_upstream_keepalive_commands[] = { @@ -118,6 +159,21 @@ static ngx_command_t ngx_http_upstream_keepalive_commands[] = { offsetof(ngx_http_upstream_keepalive_srv_conf_t, requests), NULL }, + { ngx_string("keepalive_max_streams_per_connection"), + NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_HTTP_SRV_CONF_OFFSET, + offsetof(ngx_http_upstream_keepalive_srv_conf_t, + max_streams_per_connection), + NULL }, + + { ngx_string("keepalive_max_streams_total"), + NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_HTTP_SRV_CONF_OFFSET, + offsetof(ngx_http_upstream_keepalive_srv_conf_t, max_streams_total), + NULL }, + ngx_null_command }; @@ -153,6 +209,193 @@ ngx_module_t ngx_http_upstream_keepalive_module = { }; +static ngx_http_upstream_keepalive_cache_t * +ngx_http_upstream_keepalive_find_cached_connection(ngx_queue_t *cache, + ngx_connection_t *c) +{ + ngx_queue_t *q; + ngx_http_upstream_keepalive_cache_t *item; + + for (q = ngx_queue_head(cache); + q != ngx_queue_sentinel(cache); + q = ngx_queue_next(q)) + { + item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + + if (item->connection == c) { + return item; + } + } + + return NULL; +} + + +/* Acquire a cache slot for a new connection. + * For H2: only evict connections that currently have zero active streams. + * Returns a card removed from free or from cache (with its old conn closed). + * Returns NULL if no safe slot (all cards busy with streams). + */ +static ngx_http_upstream_keepalive_cache_t * +ngx_http_upstream_keepalive_get_slot(ngx_http_upstream_keepalive_srv_conf_t *kcf) +{ + ngx_queue_t *q; + ngx_http_upstream_keepalive_cache_t *item; + + if (!ngx_queue_empty(&kcf->free)) { + q = ngx_queue_head(&kcf->free); + ngx_queue_remove(q); + return ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + } + + /* H2-safe eviction: only close idle (active_streams == 0) connections. + * Walk from tail (LRU) to find the least-recent idle one. + */ + for (q = ngx_queue_last(&kcf->cache); + q != ngx_queue_sentinel(&kcf->cache); + q = ngx_queue_prev(q)) + { + item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + if (item->active_streams == 0) { + ngx_queue_remove(q); + ngx_http_upstream_keepalive_close(item->connection); + return item; + } + } + + return NULL; +} + + +static void +ngx_http_upstream_keepalive_attach_new_h2(void *data, ngx_connection_t *c, + ngx_peer_connection_t *pc) +{ + ngx_http_upstream_keepalive_peer_data_t *kp = data; + ngx_http_upstream_keepalive_srv_conf_t *kcf; + ngx_http_upstream_keepalive_cache_t *item; + ngx_http_upstream_t *u; + + if (kp == NULL) { + return; + } + + kcf = kp->conf; + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc ? pc->log : kp->upstream ? kp->upstream->peer.log : NULL, 0, + "attach_new_h2: entered, h2_enabled=%d", kp->h2_enabled); + + if (!kp->h2_enabled) { + return; + } + + u = kp->upstream; + + item = ngx_http_upstream_keepalive_get_slot(kcf); + if (item == NULL) { + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc ? pc->log : NULL, 0, + "attach_new_h2: get_slot returned NULL, not parking early"); + /* No safe slot (all pooled conns are busy). Do not add this + * connection to the cache so we don't evict active work. + * Current request can still use the new socket; it just won't + * be offered for multiplexing to other requests. + */ + return; + } + + ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc ? pc->log : c->log, 0, + "attach_new_h2: slot=%p parking #%uA c=%p fd=%d " + "pool=%p active=1", + item, c->number, c, c->fd, c->pool); + + item->connection = c; + item->active_streams = 1; + item->h2 = 1; + item->tag = u->conf; + item->socklen = pc->socklen; + ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen); + + ngx_queue_insert_head(&kcf->cache, &item->queue); + + /* + * We do not call prepare_connection() here. For a freshly created + * connection the core upstream code (after this hook returns) will + * initialize the pool, logs, c->data = r, handlers etc. We only need + * the item in the cache with active_streams=1 so that concurrent + * get_keepalive_peer_h2 calls can find and share this connection. + * Do not call set_idle() — this conn is still in use by the current + * request. + */ +} + + +static void +ngx_http_upstream_keepalive_adjust_total( + ngx_http_upstream_keepalive_srv_conf_t *kcf, ngx_uint_t delta) +{ + if (delta == 0) { + return; + } + + if (delta > kcf->total_active_streams) { + kcf->total_active_streams = 0; + return; + } + + kcf->total_active_streams -= delta; +} + + +static void +ngx_http_upstream_keepalive_prepare_connection(ngx_connection_t *c, + ngx_peer_connection_t *pc) +{ + c->idle = 0; + c->sent = 0; + c->data = NULL; + c->log = pc->log; + c->read->log = pc->log; + c->write->log = pc->log; + c->pool->log = pc->log; + + if (c->read->timer_set) { + ngx_del_timer(c->read); + } +} + + +static void +ngx_http_upstream_keepalive_set_idle( + ngx_http_upstream_keepalive_cache_t *item, ngx_connection_t *c, + ngx_peer_connection_t *pc, ngx_http_upstream_t *u) +{ + c->read->delayed = 0; + ngx_add_timer(c->read, item->conf->timeout); + + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + + c->write->handler = ngx_http_upstream_keepalive_dummy_handler; + c->read->handler = ngx_http_upstream_keepalive_close_handler; + + c->data = item; + c->idle = 1; + c->log = ngx_cycle->log; + c->read->log = ngx_cycle->log; + c->write->log = ngx_cycle->log; + c->pool->log = ngx_cycle->log; + + item->tag = u->conf; + + item->socklen = pc->socklen; + ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen); + + if (c->read->ready) { + ngx_http_upstream_keepalive_close_handler(c->read); + } +} + + static ngx_int_t ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, ngx_http_upstream_srv_conf_t *us) @@ -192,9 +435,44 @@ ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session; #endif - if (r->upstream->peer.notify) { - kp->original_notify = r->upstream->peer.notify; - r->upstream->peer.notify = ngx_http_upstream_notify_keepalive_peer; + /* Always install notify wrapper (after the L1 original_init_peer has run). + * This ensures NGX_HTTP_UPSTREAM_NOTIFY_CONNECT is delivered for early + * H2 parking even when no other module (sticky/least_time) provided a + * base notify. We save whatever was there (possibly NULL) and chain. + */ + kp->original_notify = r->upstream->peer.notify; + r->upstream->peer.notify = ngx_http_upstream_notify_keepalive_peer; + +#if (NGX_HTTP_V2) + { + ngx_http_proxy_loc_conf_t *plcf; + + plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module); + if (plcf && plcf->http_version == NGX_HTTP_VERSION_20) { + kp->h2_enabled = 1; + if (kp->upstream) { + kp->upstream->http2 = 1; + } + + /* if user did not set the H2 keepalive knobs, apply defaults for this request */ + if (kcf->max_streams_per_connection == NGX_CONF_UNSET_UINT) { + kp->max_streams_per_connection = 100; /* sensible default */ + } else { + kp->max_streams_per_connection = kcf->max_streams_per_connection; + } + + if (kcf->max_streams_total == NGX_CONF_UNSET_UINT) { + kp->max_streams_total = kp->max_streams_per_connection * kcf->max_cached; + } else { + kp->max_streams_total = kcf->max_streams_total; + } + } + } +#endif + + if (!kp->h2_enabled) { + kp->max_streams_per_connection = kcf->max_streams_per_connection; + kp->max_streams_total = kcf->max_streams_total; } return NGX_OK; @@ -202,28 +480,14 @@ ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, static ngx_int_t -ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) +ngx_http_upstream_get_keepalive_peer_h1(ngx_peer_connection_t *pc, + ngx_http_upstream_keepalive_peer_data_t *kp) { - ngx_http_upstream_keepalive_peer_data_t *kp = data; - ngx_http_upstream_keepalive_cache_t *item; + ngx_http_upstream_keepalive_cache_t *item; - ngx_int_t rc; ngx_queue_t *q, *cache; ngx_connection_t *c; - ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, - "get keepalive peer"); - - /* ask balancer */ - - rc = kp->original_get_peer(pc, kp->data); - - if (rc != NGX_OK) { - return rc; - } - - /* search cache for suitable connection */ - cache = &kp->conf->cache; for (q = ngx_queue_head(cache); @@ -237,6 +501,10 @@ ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) continue; } + if (item->h2) { + continue; + } + if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr, item->socklen, pc->socklen) == 0) @@ -244,29 +512,102 @@ ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) ngx_queue_remove(q); ngx_queue_insert_head(&kp->conf->free, q); - goto found; + ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get keepalive peer: using #%uA c=%p fd=%d pool=%p", + c->number, c, c->fd, c->pool); + + ngx_http_upstream_keepalive_prepare_connection(c, pc); + + pc->connection = c; + pc->cached = 1; + + return NGX_DONE; } } return NGX_OK; +} -found: - ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, - "get keepalive peer: using connection %p", c); +static ngx_int_t +ngx_http_upstream_get_keepalive_peer_h2(ngx_peer_connection_t *pc, + ngx_http_upstream_keepalive_peer_data_t *kp) +{ + ngx_http_upstream_keepalive_srv_conf_t *kcf; + ngx_http_upstream_keepalive_cache_t *item, *best; - c->idle = 0; - c->sent = 0; - c->data = NULL; - c->log = pc->log; - c->read->log = pc->log; - c->write->log = pc->log; - c->pool->log = pc->log; + ngx_queue_t *q, *cache; + ngx_connection_t *c; + ngx_uint_t best_active; - if (c->read->timer_set) { - ngx_del_timer(c->read); + kcf = kp->conf; + + if (kcf->total_active_streams >= kp->max_streams_total) { + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get keepalive peer: total stream limit reached"); + + return NGX_BUSY; } + cache = &kcf->cache; + best = NULL; + best_active = NGX_MAX_UINT32_VALUE; + + for (q = ngx_queue_head(cache); + q != ngx_queue_sentinel(cache); + q = ngx_queue_next(q)) + { + item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + c = item->connection; + + if (kp->conf->local && item->tag != kp->upstream->conf) { + continue; + } + + if (!item->h2) { + continue; + } + + if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr, + item->socklen, pc->socklen) + != 0) + { + continue; + } + + if (item->active_streams >= kp->max_streams_per_connection) { + continue; + } + + if (item->active_streams < best_active) { + best = item; + best_active = item->active_streams; + } + } + + if (best == NULL) { + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get_h2: no suitable cached conn found (best==NULL), will create new. " + "total_active=%ui max_total=%ui", + kcf->total_active_streams, kp->max_streams_total); + kcf->total_active_streams++; + + return NGX_OK; + } + + c = best->connection; + + best->active_streams++; + kcf->total_active_streams++; + + ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get keepalive peer: using #%uA c=%p fd=%d pool=%p " + "slot=%p active_streams=%ui total=%ui", + c->number, c, c->fd, c->pool, best, + best->active_streams, kcf->total_active_streams); + + ngx_http_upstream_keepalive_prepare_connection(c, pc); + pc->connection = c; pc->cached = 1; @@ -274,62 +615,98 @@ found: } -static void -ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, - ngx_uint_t state) +static ngx_int_t +ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) { ngx_http_upstream_keepalive_peer_data_t *kp = data; - ngx_http_upstream_keepalive_cache_t *item; - ngx_queue_t *q; - ngx_connection_t *c; - ngx_http_upstream_t *u; + ngx_int_t rc; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, - "free keepalive peer"); + "get keepalive peer"); - /* cache valid connections */ + rc = kp->original_get_peer(pc, kp->data); + + if (rc != NGX_OK) { + return rc; + } + + if (kp->h2_enabled) { + return ngx_http_upstream_get_keepalive_peer_h2(pc, kp); + } + + return ngx_http_upstream_get_keepalive_peer_h1(pc, kp); +} + + +static ngx_uint_t +ngx_http_upstream_keepalive_h2_connection_bad(ngx_peer_connection_t *pc, + ngx_uint_t state) +{ + ngx_connection_t *c; + + if (state & NGX_PEER_FAILED) { + return 1; + } - u = kp->upstream; c = pc->connection; - if (state & NGX_PEER_FAILED - || c == NULL + if (c == NULL || c->read->eof || c->read->error || c->read->timedout || c->write->error || c->write->timedout) { - goto invalid; + return 1; } - if (c->requests >= kp->conf->requests) { - goto invalid; + return 0; +} + + +static void +ngx_http_upstream_keepalive_drop_h2_connection( + ngx_http_upstream_keepalive_srv_conf_t *kcf, + ngx_http_upstream_keepalive_cache_t *item, ngx_peer_connection_t *pc) +{ + ngx_connection_t *c; + + if (item->active_streams > 0) { + ngx_http_upstream_keepalive_adjust_total(kcf, item->active_streams); + item->active_streams = 0; } - if (ngx_current_msec - c->start_time > kp->conf->time) { - goto invalid; + c = item->connection; + + ngx_queue_remove(&item->queue); + ngx_queue_insert_head(&kcf->free, &item->queue); + item->connection = NULL; + + if (c) { + ngx_log_debug(NGX_LOG_DEBUG_HTTP, c->log, 0, + "keepalive h2 drop: #%uA c=%p fd=%d pool=%p slot=%p", + c->number, c, c->fd, c->pool, item); + + ngx_http_upstream_keepalive_close(c); } - if (!u->keepalive) { - goto invalid; - } + pc->connection = NULL; +} - if (!u->request_body_sent) { - goto invalid; - } - if (ngx_terminate || ngx_exiting) { - goto invalid; - } +static void +ngx_http_upstream_free_keepalive_peer_h1(ngx_peer_connection_t *pc, + ngx_http_upstream_keepalive_peer_data_t *kp, ngx_uint_t state) +{ + ngx_http_upstream_keepalive_cache_t *item; - if (ngx_handle_read_event(c->read, 0) != NGX_OK) { - goto invalid; - } + ngx_queue_t *q; + ngx_connection_t *c; + ngx_http_upstream_t *u; - ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, - "free keepalive peer: saving connection %p", c); + u = kp->upstream; + c = pc->connection; if (ngx_queue_empty(&kp->conf->free)) { @@ -351,35 +728,182 @@ ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, item->connection = c; item->tag = u->conf; + item->h2 = 0; pc->connection = NULL; - c->read->delayed = 0; - ngx_add_timer(c->read, kp->conf->timeout); + ngx_http_upstream_keepalive_set_idle(item, c, pc, u); +} - if (c->write->timer_set) { - ngx_del_timer(c->write); + +static void +ngx_http_upstream_free_keepalive_peer_h2(ngx_peer_connection_t *pc, + ngx_http_upstream_keepalive_peer_data_t *kp, ngx_uint_t state) +{ + ngx_http_upstream_keepalive_srv_conf_t *kcf; + ngx_http_upstream_keepalive_cache_t *item; + + ngx_connection_t *c; + ngx_http_upstream_t *u; + + kcf = kp->conf; + u = kp->upstream; + c = pc->connection; + + item = ngx_http_upstream_keepalive_find_cached_connection(&kcf->cache, c); + + if (item) { + + if (item->active_streams > 0) { + item->active_streams--; + ngx_http_upstream_keepalive_adjust_total(kcf, 1); + } + + pc->connection = NULL; + + ngx_log_debug(NGX_LOG_DEBUG_HTTP, c->log, 0, + "keepalive h2 free: #%uA c=%p fd=%d pool=%p slot=%p " + "active_streams=%ui total=%ui", + c->number, c, c->fd, c->pool, item, + item->active_streams, kcf->total_active_streams); + + if (item->active_streams > 0) { + return; + } + + ngx_log_debug(NGX_LOG_DEBUG_HTTP, c->log, 0, + "keepalive h2 idle: #%uA c=%p fd=%d last stream done", + c->number, c, c->fd); + + ngx_http_upstream_keepalive_set_idle(item, c, pc, u); + + return; } - c->write->handler = ngx_http_upstream_keepalive_dummy_handler; - c->read->handler = ngx_http_upstream_keepalive_close_handler; - - c->data = item; - c->idle = 1; - c->log = ngx_cycle->log; - c->read->log = ngx_cycle->log; - c->write->log = ngx_cycle->log; - c->pool->log = ngx_cycle->log; - - item->socklen = pc->socklen; - ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen); - - if (c->read->ready) { - ngx_http_upstream_keepalive_close_handler(c->read); + if (kcf->total_active_streams > 0) { + kcf->total_active_streams--; } + item = ngx_http_upstream_keepalive_get_slot(kcf); + if (item == NULL) { + /* No safe slot available (all pooled connections are busy). + * Do not park this connection. The socket will be closed by + * the normal upstream path after this request/stream ends. + */ + return; + } + + ngx_queue_insert_head(&kcf->cache, &item->queue); + + item->connection = c; + item->active_streams = 0; + item->h2 = 1; + item->tag = u->conf; + + pc->connection = NULL; + + ngx_http_upstream_keepalive_set_idle(item, c, pc, u); +} + + +static void +ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state) +{ + ngx_http_upstream_keepalive_peer_data_t *kp = data; + ngx_http_upstream_keepalive_srv_conf_t *kcf; + ngx_http_upstream_keepalive_cache_t *item; + + ngx_connection_t *c; + ngx_http_upstream_t *u; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "free keepalive peer"); + + u = kp->upstream; + c = pc->connection; + kcf = kp->conf; + + /* + * H2 multiplex: cached connections use per-stream teardown. + * Do not apply the H1 !u->keepalive gate to shared sockets. + */ + if (kp->h2_enabled && c != NULL) { + item = ngx_http_upstream_keepalive_find_cached_connection(&kcf->cache, + c); + if (item != NULL) { + if (ngx_http_upstream_keepalive_h2_connection_bad(pc, state)) { + ngx_log_debug(NGX_LOG_DEBUG_HTTP, c->log, 0, + "keepalive h2 free cached bad: #%uA c=%p fd=%d", + c->number, c, c->fd); + + ngx_http_upstream_keepalive_drop_h2_connection(kcf, item, pc); + + } else { + ngx_http_upstream_free_keepalive_peer_h2(pc, kp, state); + } + + goto done; + } + } + + if (ngx_http_upstream_keepalive_h2_connection_bad(pc, state)) { + goto invalid; + } + + if (c->requests >= kcf->requests) { + goto invalid; + } + + if (ngx_current_msec - c->start_time > kcf->time) { + goto invalid; + } + + if (!u->keepalive) { + goto invalid; + } + + if (!u->request_body_sent) { + goto invalid; + } + + if (ngx_terminate || ngx_exiting) { + goto invalid; + } + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + goto invalid; + } + + ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "free keepalive peer: saving #%uA c=%p fd=%d pool=%p", + c->number, c, c->fd, c->pool); + + if (kp->h2_enabled) { + ngx_http_upstream_free_keepalive_peer_h2(pc, kp, state); + goto done; + } + + ngx_http_upstream_free_keepalive_peer_h1(pc, kp, state); + + goto done; + invalid: + if (kp->h2_enabled) { + item = c ? ngx_http_upstream_keepalive_find_cached_connection( + &kcf->cache, c) : NULL; + + if (item) { + ngx_http_upstream_keepalive_drop_h2_connection(kcf, item, pc); + + } else if (kcf->total_active_streams > 0) { + kcf->total_active_streams--; + } + } + +done: + kp->original_free_peer(pc, kp->data, state); } @@ -428,10 +952,20 @@ close: item = c->data; conf = item->conf; + if (item->h2) { + if (item->active_streams > 0) { + ngx_http_upstream_keepalive_adjust_total(conf, + item->active_streams); + item->active_streams = 0; + item->h2 = 0; + } + } + ngx_http_upstream_keepalive_close(c); ngx_queue_remove(&item->queue); ngx_queue_insert_head(&conf->free, &item->queue); + item->connection = NULL; } @@ -487,7 +1021,28 @@ ngx_http_upstream_notify_keepalive_peer(ngx_peer_connection_t *pc, void *data, { ngx_http_upstream_keepalive_peer_data_t *kp = data; - kp->original_notify(pc, kp->data, type); + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "notify keepalive peer: type=%ui pc=%p", + type, pc); + + if (type == NGX_HTTP_UPSTREAM_NOTIFY_CONNECT) { + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "notify keepalive: CONNECT received, kp=%p", kp); + if (kp && kp->h2_enabled) { + ngx_connection_t *c = pc->connection; + if (c != NULL) { + ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "notify keepalive: calling attach for " + "#%uA c=%p fd=%d pool=%p", + c->number, c, c->fd, c->pool); + ngx_http_upstream_keepalive_attach_new_h2(kp, c, pc); + } + } + } + + if (kp && kp->original_notify) { + kp->original_notify(pc, kp->data, type); + } } @@ -507,12 +1062,15 @@ ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf) * * conf->original_init_peer = NULL; * conf->local = 0; + * conf->total_active_streams = 0; */ conf->time = NGX_CONF_UNSET_MSEC; conf->timeout = NGX_CONF_UNSET_MSEC; conf->requests = NGX_CONF_UNSET_UINT; conf->max_cached = NGX_CONF_UNSET_UINT; + conf->max_streams_per_connection = NGX_CONF_UNSET_UINT; + conf->max_streams_total = NGX_CONF_UNSET_UINT; return conf; } @@ -554,6 +1112,22 @@ ngx_http_upstream_keepalive_init_main_conf(ngx_conf_t *cf, void *conf) kcf->max_cached = 32; } + if (kcf->max_streams_per_connection != NGX_CONF_UNSET_UINT) { + + if (kcf->max_streams_total == NGX_CONF_UNSET_UINT) { + kcf->max_streams_total = + kcf->max_streams_per_connection * kcf->max_cached; + } + + if (kcf->max_streams_total < kcf->max_streams_per_connection) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "keepalive_max_streams_total must be " + "greater than or equal to " + "keepalive_max_streams_per_connection"); + return NGX_CONF_ERROR; + } + } + kcf->original_init_peer = uscfp[i]->peer.init; uscfp[i]->peer.init = ngx_http_upstream_init_keepalive_peer; @@ -572,6 +1146,8 @@ ngx_http_upstream_keepalive_init_main_conf(ngx_conf_t *cf, void *conf) for (j = 0; j < kcf->max_cached; j++) { ngx_queue_insert_head(&kcf->free, &cached[j].queue); cached[j].conf = kcf; + cached[j].active_streams = 0; + cached[j].h2 = 0; } } diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c index bb8964add..ea3335eab 100644 --- a/src/http/ngx_http_upstream.c +++ b/src/http/ngx_http_upstream.c @@ -46,6 +46,8 @@ static ngx_int_t ngx_http_upstream_send_request_body(ngx_http_request_t *r, static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u); static void ngx_http_upstream_read_request_handler(ngx_http_request_t *r); +static void ngx_http_upstream_read_request_handler_delayed(ngx_http_request_t *r, + ngx_http_upstream_t *u); static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u); static ngx_int_t ngx_http_upstream_process_early_hints(ngx_http_request_t *r, @@ -1322,7 +1324,11 @@ ngx_http_upstream_handler(ngx_event_t *ev) } if (ev->write) { - u->write_event_handler(r, u); + if (u->mux != NULL) { + ngx_http_upstream_mux_drive(u->peer.connection, u->mux); + } else { + u->write_event_handler(r, u); + } } else { u->read_event_handler(r, u); @@ -1553,6 +1559,162 @@ ngx_http_upstream_check_broken_connection(ngx_http_request_t *r, } +/* + * Upstream mux: multiplexes multiple HTTP/2 streams over a single + * upstream connection via a send queue and fair round-robin scheduling. + * The mux is stored as a cleanup on the upstream connection's pool. + */ + + +static void +ngx_http_upstream_mux_cleanup(void *data) +{ + /* pool-owned; nothing to free */ +} + + +ngx_http_upstream_mux_t * +ngx_http_upstream_mux_get(ngx_connection_t *c) +{ + ngx_http_upstream_mux_t *mux; + ngx_pool_cleanup_t *cln; + + if (c == NULL || c->pool == NULL) { + return NULL; + } + + for (cln = c->pool->cleanup; cln; cln = cln->next) { + if (cln->handler == ngx_http_upstream_mux_cleanup) { + return cln->data; + } + } + + /* not found — create */ + cln = ngx_pool_cleanup_add(c->pool, sizeof(ngx_http_upstream_mux_t)); + if (cln == NULL) { + return NULL; + } + + mux = cln->data; + ngx_memzero(mux, sizeof(ngx_http_upstream_mux_t)); + ngx_queue_init(&mux->send_queue); + + cln->handler = ngx_http_upstream_mux_cleanup; + + return mux; +} + + +void +ngx_http_upstream_mux_enqueue(ngx_http_request_t *r, ngx_http_upstream_t *u) +{ + ngx_http_upstream_mux_t *mux; + + mux = u->mux; + if (mux == NULL) { + return; + } + + if (u->in_mux_queue) { + return; + } + + ngx_queue_insert_tail(&mux->send_queue, &u->mux_send_link); + u->in_mux_queue = 1; + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http upstream mux: enqueued r=%p", r); +} + + +void +ngx_http_upstream_mux_dequeue(ngx_http_upstream_t *u) +{ + if (!u->in_mux_queue) { + return; + } + + ngx_queue_remove(&u->mux_send_link); + ngx_queue_init(&u->mux_send_link); + u->in_mux_queue = 0; +} + + +ngx_uint_t +ngx_http_upstream_mux_queue_has_waiters(ngx_http_upstream_mux_t *mux) +{ + if (mux == NULL) { + return 0; + } + + return !ngx_queue_empty(&mux->send_queue); +} + + +void +ngx_http_upstream_mux_drive(ngx_connection_t *c, ngx_http_upstream_mux_t *mux) +{ + ngx_queue_t *q; + ngx_http_request_t *r; + ngx_http_upstream_t *u; + + if (mux == NULL) { + return; + } + + if (ngx_queue_empty(&mux->send_queue)) { + return; + } + + q = ngx_queue_head(&mux->send_queue); + u = ngx_queue_data(q, ngx_http_upstream_t, mux_send_link); + r = u->request; + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http upstream mux: drive r=%p", r); + + if (mux->hooks && mux->hooks->can_send(u)) { + u->mux_handler(r, u); + + if (mux->hooks->is_done(u)) { + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream mux: is_done #%uA r=%p dequeue", + c->number, r); + + ngx_http_upstream_mux_dequeue(u); + + if (!ngx_queue_empty(&mux->send_queue)) { + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream mux: post write #%uA " + "(queue has waiters)", + c->number); + + ngx_post_event(c->write, &ngx_posted_events); + + } else { + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream mux: queue empty #%uA", + c->number); + } + + } else { + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream mux: not done #%uA r=%p " + "(stay in queue)", + c->number, r); + } + + } else { + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream mux: can_send false #%uA r=%p rotate", + c->number, r); + + ngx_queue_remove(q); + ngx_queue_insert_tail(&mux->send_queue, q); + } +} + + static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) { @@ -1645,7 +1807,7 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) if (r->connection->tcp_nopush == NGX_TCP_NOPUSH_DISABLED) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; } - + if (c->pool == NULL) { /* we need separate pool here to be able to cache SSL connections */ @@ -1663,6 +1825,17 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) c->read->log = c->log; c->write->log = c->log; + if (u->http2) { + u->mux = ngx_http_upstream_mux_get(c); + } + + if (rc == NGX_OK || rc == NGX_AGAIN) { + if (u->peer.notify) { + u->peer.notify(&u->peer, u->peer.data, + NGX_HTTP_UPSTREAM_NOTIFY_CONNECT); + } + } + /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */ clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); @@ -1706,6 +1879,7 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) r->request_body->buf->tag = u->output.tag; } + u->request = r; u->request_sent = 0; u->request_body_sent = 0; u->request_body_blocked = 0; @@ -1725,6 +1899,26 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) #endif + if (u->mux != NULL) { + + if (rc == NGX_DONE) { + ngx_uint_t was_empty = ngx_queue_empty(&u->mux->send_queue); + + u->mux_handler = u->write_event_handler; + ngx_http_upstream_mux_enqueue(r, u); + + if (was_empty) + { + ngx_http_upstream_mux_drive(c, u->mux); + } + + return; + } + + /* for new H2 conn (rc != DONE): u->mux assigned here at connect time; + enqueue/drive will be handled by later send/read paths */ + } + ngx_http_upstream_send_request(r, u, 1); } @@ -2180,6 +2374,10 @@ ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u, } if (rc == NGX_AGAIN) { + if (u->mux) { + u->mux_handler = u->write_event_handler; + } + if (!c->write->ready || u->request_body_blocked) { ngx_add_timer(c->write, u->conf->send_timeout); @@ -2418,6 +2616,23 @@ ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_send_request(r, u, 1); } +static void +ngx_http_upstream_read_request_handler_delayed(ngx_http_request_t *r, + ngx_http_upstream_t *u) +{ + ngx_connection_t *c; + + c = r->connection; + + if (c->read->timedout) { + c->timedout = 1; + ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT); + return; + } + + ngx_http_upstream_send_request(r, u, 0); +} + static void ngx_http_upstream_read_request_handler(ngx_http_request_t *r) @@ -2437,6 +2652,19 @@ ngx_http_upstream_read_request_handler(ngx_http_request_t *r) return; } + if (u->mux) { + ngx_uint_t was_empty = ngx_queue_empty(&u->mux->send_queue); + + u->mux_handler = ngx_http_upstream_read_request_handler_delayed; + ngx_http_upstream_mux_enqueue(r, u); + + if (was_empty) { + ngx_http_upstream_mux_drive(u->peer.connection, u->mux); + } + + return; + } + ngx_http_upstream_send_request(r, u, 0); } diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h index 4560bbe9a..26345ee6f 100644 --- a/src/http/ngx_http_upstream.h +++ b/src/http/ngx_http_upstream.h @@ -58,6 +58,7 @@ #define NGX_HTTP_UPSTREAM_NOTIFY_HEADER 0x1 +#define NGX_HTTP_UPSTREAM_NOTIFY_CONNECT 0x2 typedef struct { @@ -334,6 +335,9 @@ typedef void (*ngx_http_upstream_handler_pt)(ngx_http_request_t *r, ngx_http_upstream_t *u); +typedef struct ngx_http_upstream_mux_s ngx_http_upstream_mux_t; + + struct ngx_http_upstream_s { ngx_http_upstream_handler_pt read_event_handler; ngx_http_upstream_handler_pt write_event_handler; @@ -412,12 +416,20 @@ struct ngx_http_upstream_s { unsigned keepalive:1; unsigned upgrade:1; unsigned error:1; + unsigned http2:1; unsigned request_sent:1; unsigned request_body_sent:1; unsigned request_body_blocked:1; unsigned header_sent:1; unsigned response_received:1; + + ngx_http_request_t *request; + + ngx_http_upstream_mux_t *mux; + ngx_queue_t mux_send_link; + unsigned in_mux_queue:1; + ngx_http_upstream_handler_pt mux_handler; }; @@ -427,6 +439,18 @@ typedef struct { } ngx_http_upstream_next_t; +typedef struct { + ngx_int_t (*can_send)(ngx_http_upstream_t *u); + ngx_int_t (*is_done)(ngx_http_upstream_t *u); +} ngx_http_upstream_mux_hooks_t; + +struct ngx_http_upstream_mux_s { + ngx_queue_t send_queue; + ngx_http_upstream_mux_hooks_t *hooks; + void *protocol_data; +}; + + typedef struct { ngx_str_t key; ngx_str_t value; @@ -457,6 +481,16 @@ ngx_int_t ngx_http_upstream_merge_ssl_passwords(ngx_conf_t *cf, uscf->srv_conf[module.ctx_index] +ngx_http_upstream_mux_t *ngx_http_upstream_mux_get(ngx_connection_t *c); +void ngx_http_upstream_mux_drive(ngx_connection_t *c, + ngx_http_upstream_mux_t *mux); +void ngx_http_upstream_mux_enqueue(ngx_http_request_t *r, + ngx_http_upstream_t *u); +void ngx_http_upstream_mux_dequeue(ngx_http_upstream_t *u); +ngx_uint_t ngx_http_upstream_mux_queue_has_waiters( + ngx_http_upstream_mux_t *mux); + + extern ngx_module_t ngx_http_upstream_module; extern ngx_conf_bitmask_t ngx_http_upstream_cache_method_mask[]; extern ngx_conf_bitmask_t ngx_http_upstream_ignore_headers_masks[];