This commit is contained in:
Gourav Kandoria 2026-06-26 00:51:52 +00:00 committed by GitHub
commit 770d65e6db
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 1123 additions and 112 deletions

View file

@ -46,6 +46,8 @@ typedef struct {
ngx_chain_t *free;
ngx_chain_t *busy;
ngx_http_request_t *request;
ngx_http_proxy_v2_conn_t *connection;
ngx_uint_t id;
@ -84,6 +86,10 @@ typedef struct {
unsigned literal:1;
unsigned field_huffman:1;
ssize_t window_charge; /* DATA payload framed but not yet charged to window */
off_t last_sent; /* c->sent at last window charge */
unsigned header_sent:1;
unsigned output_closed:1;
unsigned output_blocked:1;
@ -160,12 +166,18 @@ static ngx_http_proxy_v2_ctx_t *
ngx_http_proxy_v2_get_ctx(ngx_http_request_t *r);
static ngx_int_t ngx_http_proxy_v2_get_connection_data(ngx_http_request_t *r,
ngx_http_proxy_v2_ctx_t *ctx, ngx_peer_connection_t *pc);
static ngx_http_upstream_mux_hooks_t ngx_http_proxy_v2_mux_hooks;
static void ngx_http_proxy_v2_cleanup(void *data);
static void ngx_http_proxy_v2_abort_request(ngx_http_request_t *r);
static void ngx_http_proxy_v2_finalize_request(ngx_http_request_t *r,
ngx_int_t rc);
static void ngx_http_proxy_v2_charge_window(ngx_http_proxy_v2_ctx_t *ctx,
off_t bytes_written);
static ngx_http_module_t ngx_http_proxy_v2_module_ctx = {
NULL, /* preconfiguration */
@ -239,6 +251,7 @@ ngx_http_proxy_v2_handler(ngx_http_request_t *r)
plcf->upstream.preserve_output = 1;
u = r->upstream;
u->http2 = 1;
if (plcf->proxy_lengths == NULL) {
ctx->ctx.vars = plcf->vars;
@ -987,7 +1000,9 @@ ngx_http_proxy_v2_reinit_request(ngx_http_request_t *r)
ctx->status = 0;
ctx->rst = 0;
ctx->goaway = 0;
ctx->connection = NULL;
ctx->in = NULL;
ctx->busy = NULL;
ctx->out = NULL;
@ -1008,7 +1023,7 @@ ngx_http_proxy_v2_body_output_filter(void *data, ngx_chain_t *in)
ngx_int_t rc;
ngx_uint_t next, last;
ngx_chain_t *cl, *out, *ln, **ll;
ngx_http_upstream_t *u;
ngx_http_upstream_t *u = r->upstream;
ngx_http_proxy_v2_ctx_t *ctx;
ngx_http_proxy_v2_frame_t *f;
@ -1058,6 +1073,9 @@ ngx_http_proxy_v2_body_output_filter(void *data, ngx_chain_t *in)
f->stream_id_2 = (u_char) ((ctx->id >> 8) & 0xff);
f->stream_id_3 = (u_char) (ctx->id & 0xff);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"proxy v2: patched frame sid=%ui", ctx->id);
p += (f->length_0 << 16) + (f->length_1 << 8) + f->length_2;
}
}
@ -1208,8 +1226,8 @@ ngx_http_proxy_v2_body_output_filter(void *data, ngx_chain_t *in)
f->stream_id_3 = (u_char) (ctx->id & 0xff);
limit -= len;
ctx->send_window -= len;
ctx->connection->send_window -= len;
ctx->window_charge += len;
/* window decrement deferred to write-completion time */
} while (!next && limit > 0);
@ -1305,6 +1323,17 @@ ngx_http_proxy_v2_body_output_filter(void *data, ngx_chain_t *in)
rc = ngx_chain_writer(&r->upstream->writer, out);
if (u != NULL) {
ngx_connection_t *sc = u->peer.connection;
if (sc != NULL) {
off_t delta = sc->sent - ctx->last_sent;
if (delta > 0) {
ngx_http_proxy_v2_charge_window(ctx, delta);
}
ctx->last_sent = sc->sent;
}
}
ngx_chain_update_chains(r->pool, &ctx->free, &ctx->busy, &out,
(ngx_buf_tag_t) &ngx_http_proxy_v2_body_output_filter);
@ -1341,7 +1370,6 @@ ngx_http_proxy_v2_body_output_filter(void *data, ngx_chain_t *in)
* here anyway.
*/
u = r->upstream;
u->length = 0;
u->pipe->length = 0;
@ -4094,9 +4122,20 @@ ngx_http_proxy_v2_get_ctx(ngx_http_request_t *r)
ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_v2_module);
if (ctx->connection == NULL) {
u = r->upstream;
u = r->upstream;
/*
* Force (re)resolution to the real v2_conn on the backend c's pool
* when we now have a real peer connection. This ensures the *creating*
* request (which saw c==NULL during create_request and got a temp
* v2c with id=0) picks up the shared real v2c (and registers for
* the multiplexer) once the socket exists. Second+ requests (fresh
* ctx, connection==NULL) also go through here.
*/
if (ctx->connection == NULL
|| (u && u->peer.connection != NULL
&& (ctx->id == 0 || ctx->connection->init_window == 0)))
{
if (ngx_http_proxy_v2_get_connection_data(r, ctx, &u->peer) != NGX_OK) {
return NULL;
}
@ -4110,10 +4149,14 @@ static ngx_int_t
ngx_http_proxy_v2_get_connection_data(ngx_http_request_t *r,
ngx_http_proxy_v2_ctx_t *ctx, ngx_peer_connection_t *pc)
{
ngx_connection_t *c;
ngx_pool_cleanup_t *cln;
ngx_connection_t *c;
ngx_pool_cleanup_t *cln;
ngx_http_upstream_t *u;
ngx_http_upstream_mux_t *mux;
c = pc->connection;
u = r->upstream;
ctx->request = r;
if (c == NULL) {
ctx->connection = ngx_palloc(r->pool, sizeof(ngx_http_proxy_v2_conn_t));
@ -4126,29 +4169,24 @@ ngx_http_proxy_v2_get_connection_data(ngx_http_request_t *r,
goto done;
}
if (pc->cached) {
/*
* for cached connections, connection data can be found
* in the cleanup handler
*/
for (cln = c->pool->cleanup; cln; cln = cln->next) {
if (cln->handler == ngx_http_proxy_v2_cleanup) {
ctx->connection = cln->data;
break;
}
/*
* Look for existing per-connection v2 state attached to this
* backend c's pool. Also find (or create) the upstream mux.
*/
for (cln = c->pool->cleanup; cln; cln = cln->next) {
if (cln->handler == ngx_http_proxy_v2_cleanup) {
ctx->connection = cln->data;
break;
}
}
if (ctx->connection != NULL) {
/* reuse of already-initialized v2 state on this c */
if (ctx->connection == NULL) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"no connection data found for "
"keepalive http2 connection");
return NGX_ERROR;
}
ctx->send_window = ctx->connection->init_window;
ctx->recv_window = NGX_HTTP_V2_MAX_WINDOW;
ctx->last_sent = c->sent;
ctx->connection->last_stream_id += 2;
ctx->id = ctx->connection->last_stream_id;
@ -4156,6 +4194,8 @@ ngx_http_proxy_v2_get_connection_data(ngx_http_request_t *r,
return NGX_OK;
}
/* first H2 proxy use ever on this backend connection c */
cln = ngx_pool_cleanup_add(c->pool, sizeof(ngx_http_proxy_v2_conn_t));
if (cln == NULL) {
return NGX_ERROR;
@ -4164,6 +4204,16 @@ ngx_http_proxy_v2_get_connection_data(ngx_http_request_t *r,
cln->handler = ngx_http_proxy_v2_cleanup;
ctx->connection = cln->data;
/* Create upstream mux for this connection (u->mux assigned by upstream layer) */
mux = ngx_http_upstream_mux_get(c);
if (mux == NULL) {
return NGX_ERROR;
}
mux->protocol_data = ctx->connection;
mux->hooks = &ngx_http_proxy_v2_mux_hooks;
ctx->id = 1;
done:
@ -4174,9 +4224,20 @@ done:
ctx->send_window = NGX_HTTP_V2_DEFAULT_WINDOW;
ctx->recv_window = NGX_HTTP_V2_MAX_WINDOW;
ctx->last_sent = c ? c->sent : 0;
ctx->connection->last_stream_id = 1;
if (c != NULL) {
u->keepalive = 1;
ngx_log_debug(NGX_LOG_DEBUG_HTTP, c->log, 0,
"proxy v2: NEW v2c=%p on #%uA c=%p fd=%d pool=%p "
"ctx->id=%ui",
ctx->connection, c->number, c, c->fd, c->pool,
ctx->id);
}
return NGX_OK;
}
@ -4191,6 +4252,101 @@ ngx_http_proxy_v2_cleanup(void *data)
return;
}
/* Return the v2_conn state (if any) attached to this backend connection's
* pool via the sentinel cleanup. Used by core upstream (guards, force
* event drive on reuse) and internally. Returns void* so callers in
* core upstream do not need the private struct type.
*/
void *
ngx_http_proxy_v2_get_connection(ngx_connection_t *c)
{
ngx_pool_cleanup_t *cln;
if (c == NULL || c->pool == NULL) {
return NULL;
}
for (cln = c->pool->cleanup; cln; cln = cln->next) {
if (cln->handler == ngx_http_proxy_v2_cleanup) {
return cln->data;
}
}
return NULL;
}
/*
* Charge the flow-control window for DATA payload that was actually
* written to the socket. Called from drive_multiplex (slow path) and
* ngx_http_upstream.c (fast path) right after ngx_http_upstream_send_request.
*
* We defer window decrement from frame-construction time (body_output_filter)
* to write-completion time so that frames queued by enqueue_request (dummy
* send_chain) do not phantom-consume the window.
*
* To handle partial writes correctly we use c->sent: the caller saves
* c->sent before the write and passes the delta. Only that many DATA
* bytes (capped by window_charge) are charged.
*/
static void
ngx_http_proxy_v2_charge_window(ngx_http_proxy_v2_ctx_t *ctx,
off_t bytes_written)
{
ssize_t charge;
if (ctx == NULL || ctx->window_charge <= 0 || bytes_written <= 0) {
return;
}
charge = ctx->window_charge;
if (bytes_written < charge) {
charge = (ssize_t) bytes_written;
}
ngx_log_debug4(NGX_LOG_DEBUG_HTTP,
ctx->request ? ctx->request->connection->log : NULL, 0,
"proxy v2: charge window sid=%ui charge=%d (of %d) stream_win=%d",
ctx->id, charge, ctx->window_charge, (int) ctx->send_window);
ctx->send_window -= charge;
ctx->connection->send_window -= charge;
ctx->window_charge -= charge;
}
static ngx_int_t
ngx_http_proxy_v2_mux_can_send(ngx_http_upstream_t *u)
{
ngx_http_proxy_v2_ctx_t *ctx;
ctx = ngx_http_get_module_ctx(u->request, ngx_http_proxy_v2_module);
if (ctx == NULL) {
return 1;
}
return ctx->send_window
}
static ngx_int_t
ngx_http_proxy_v2_mux_is_done(ngx_http_upstream_t *u)
{
if (u->writer.out != NULL) {
return 0;
}
return 1;
}
static ngx_http_upstream_mux_hooks_t ngx_http_proxy_v2_mux_hooks = {
ngx_http_proxy_v2_mux_can_send,
ngx_http_proxy_v2_mux_is_done,
};
static void
ngx_http_proxy_v2_abort_request(ngx_http_request_t *r)
@ -4204,7 +4360,24 @@ ngx_http_proxy_v2_abort_request(ngx_http_request_t *r)
static void
ngx_http_proxy_v2_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
ngx_http_proxy_v2_ctx_t *ctx;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"finalize proxy http2 request");
ngx_http_upstream_t *u;
ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_v2_module);
if (ctx != NULL) {
u = r->upstream;
if (u != NULL && u->in_mux_queue) {
ngx_http_upstream_mux_dequeue(u);
}
ctx->connection = NULL;
}
return;
}

View file

@ -9,6 +9,10 @@
#include <ngx_core.h>
#include <ngx_http.h>
#if (NGX_HTTP_V2)
#include <ngx_http_proxy_module.h>
#endif
typedef struct {
ngx_uint_t max_cached;
@ -23,6 +27,9 @@ typedef struct {
ngx_uint_t local; /* unsigned local:1; */
ngx_uint_t max_streams_per_connection;
ngx_uint_t max_streams_total;
ngx_uint_t total_active_streams;
} ngx_http_upstream_keepalive_srv_conf_t;
@ -37,6 +44,9 @@ typedef struct {
ngx_http_upstream_conf_t *tag;
ngx_uint_t active_streams;
unsigned h2:1;
} ngx_http_upstream_keepalive_cache_t;
@ -57,6 +67,11 @@ typedef struct {
ngx_event_notify_peer_pt original_notify;
unsigned h2_enabled:1;
ngx_uint_t max_streams_per_connection;
ngx_uint_t max_streams_total;
} ngx_http_upstream_keepalive_peer_data_t;
@ -87,6 +102,32 @@ static char *ngx_http_upstream_keepalive_init_main_conf(ngx_conf_t *cf,
static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static ngx_http_upstream_keepalive_cache_t *
ngx_http_upstream_keepalive_find_cached_connection(ngx_queue_t *cache,
ngx_connection_t *c);
static void ngx_http_upstream_keepalive_adjust_total(
ngx_http_upstream_keepalive_srv_conf_t *kcf, ngx_uint_t delta);
static void ngx_http_upstream_keepalive_prepare_connection(
ngx_connection_t *c, ngx_peer_connection_t *pc);
static void ngx_http_upstream_keepalive_set_idle(
ngx_http_upstream_keepalive_cache_t *item, ngx_connection_t *c,
ngx_peer_connection_t *pc, ngx_http_upstream_t *u);
static ngx_int_t ngx_http_upstream_get_keepalive_peer_h1(
ngx_peer_connection_t *pc, ngx_http_upstream_keepalive_peer_data_t *kp);
static ngx_int_t ngx_http_upstream_get_keepalive_peer_h2(
ngx_peer_connection_t *pc, ngx_http_upstream_keepalive_peer_data_t *kp);
static void ngx_http_upstream_free_keepalive_peer_h1(
ngx_peer_connection_t *pc, ngx_http_upstream_keepalive_peer_data_t *kp,
ngx_uint_t state);
static void ngx_http_upstream_free_keepalive_peer_h2(
ngx_peer_connection_t *pc, ngx_http_upstream_keepalive_peer_data_t *kp,
ngx_uint_t state);
static ngx_uint_t ngx_http_upstream_keepalive_h2_connection_bad(
ngx_peer_connection_t *pc, ngx_uint_t state);
static void ngx_http_upstream_keepalive_drop_h2_connection(
ngx_http_upstream_keepalive_srv_conf_t *kcf,
ngx_http_upstream_keepalive_cache_t *item, ngx_peer_connection_t *pc);
static ngx_command_t ngx_http_upstream_keepalive_commands[] = {
@ -118,6 +159,21 @@ static ngx_command_t ngx_http_upstream_keepalive_commands[] = {
offsetof(ngx_http_upstream_keepalive_srv_conf_t, requests),
NULL },
{ ngx_string("keepalive_max_streams_per_connection"),
NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_SRV_CONF_OFFSET,
offsetof(ngx_http_upstream_keepalive_srv_conf_t,
max_streams_per_connection),
NULL },
{ ngx_string("keepalive_max_streams_total"),
NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_SRV_CONF_OFFSET,
offsetof(ngx_http_upstream_keepalive_srv_conf_t, max_streams_total),
NULL },
ngx_null_command
};
@ -153,6 +209,193 @@ ngx_module_t ngx_http_upstream_keepalive_module = {
};
static ngx_http_upstream_keepalive_cache_t *
ngx_http_upstream_keepalive_find_cached_connection(ngx_queue_t *cache,
ngx_connection_t *c)
{
ngx_queue_t *q;
ngx_http_upstream_keepalive_cache_t *item;
for (q = ngx_queue_head(cache);
q != ngx_queue_sentinel(cache);
q = ngx_queue_next(q))
{
item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
if (item->connection == c) {
return item;
}
}
return NULL;
}
/* Acquire a cache slot for a new connection.
* For H2: only evict connections that currently have zero active streams.
* Returns a card removed from free or from cache (with its old conn closed).
* Returns NULL if no safe slot (all cards busy with streams).
*/
static ngx_http_upstream_keepalive_cache_t *
ngx_http_upstream_keepalive_get_slot(ngx_http_upstream_keepalive_srv_conf_t *kcf)
{
ngx_queue_t *q;
ngx_http_upstream_keepalive_cache_t *item;
if (!ngx_queue_empty(&kcf->free)) {
q = ngx_queue_head(&kcf->free);
ngx_queue_remove(q);
return ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
}
/* H2-safe eviction: only close idle (active_streams == 0) connections.
* Walk from tail (LRU) to find the least-recent idle one.
*/
for (q = ngx_queue_last(&kcf->cache);
q != ngx_queue_sentinel(&kcf->cache);
q = ngx_queue_prev(q))
{
item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
if (item->active_streams == 0) {
ngx_queue_remove(q);
ngx_http_upstream_keepalive_close(item->connection);
return item;
}
}
return NULL;
}
static void
ngx_http_upstream_keepalive_attach_new_h2(void *data, ngx_connection_t *c,
ngx_peer_connection_t *pc)
{
ngx_http_upstream_keepalive_peer_data_t *kp = data;
ngx_http_upstream_keepalive_srv_conf_t *kcf;
ngx_http_upstream_keepalive_cache_t *item;
ngx_http_upstream_t *u;
if (kp == NULL) {
return;
}
kcf = kp->conf;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc ? pc->log : kp->upstream ? kp->upstream->peer.log : NULL, 0,
"attach_new_h2: entered, h2_enabled=%d", kp->h2_enabled);
if (!kp->h2_enabled) {
return;
}
u = kp->upstream;
item = ngx_http_upstream_keepalive_get_slot(kcf);
if (item == NULL) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc ? pc->log : NULL, 0,
"attach_new_h2: get_slot returned NULL, not parking early");
/* No safe slot (all pooled conns are busy). Do not add this
* connection to the cache so we don't evict active work.
* Current request can still use the new socket; it just won't
* be offered for multiplexing to other requests.
*/
return;
}
ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc ? pc->log : c->log, 0,
"attach_new_h2: slot=%p parking #%uA c=%p fd=%d "
"pool=%p active=1",
item, c->number, c, c->fd, c->pool);
item->connection = c;
item->active_streams = 1;
item->h2 = 1;
item->tag = u->conf;
item->socklen = pc->socklen;
ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
ngx_queue_insert_head(&kcf->cache, &item->queue);
/*
* We do not call prepare_connection() here. For a freshly created
* connection the core upstream code (after this hook returns) will
* initialize the pool, logs, c->data = r, handlers etc. We only need
* the item in the cache with active_streams=1 so that concurrent
* get_keepalive_peer_h2 calls can find and share this connection.
* Do not call set_idle() this conn is still in use by the current
* request.
*/
}
static void
ngx_http_upstream_keepalive_adjust_total(
ngx_http_upstream_keepalive_srv_conf_t *kcf, ngx_uint_t delta)
{
if (delta == 0) {
return;
}
if (delta > kcf->total_active_streams) {
kcf->total_active_streams = 0;
return;
}
kcf->total_active_streams -= delta;
}
static void
ngx_http_upstream_keepalive_prepare_connection(ngx_connection_t *c,
ngx_peer_connection_t *pc)
{
c->idle = 0;
c->sent = 0;
c->data = NULL;
c->log = pc->log;
c->read->log = pc->log;
c->write->log = pc->log;
c->pool->log = pc->log;
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
}
static void
ngx_http_upstream_keepalive_set_idle(
ngx_http_upstream_keepalive_cache_t *item, ngx_connection_t *c,
ngx_peer_connection_t *pc, ngx_http_upstream_t *u)
{
c->read->delayed = 0;
ngx_add_timer(c->read, item->conf->timeout);
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
c->read->handler = ngx_http_upstream_keepalive_close_handler;
c->data = item;
c->idle = 1;
c->log = ngx_cycle->log;
c->read->log = ngx_cycle->log;
c->write->log = ngx_cycle->log;
c->pool->log = ngx_cycle->log;
item->tag = u->conf;
item->socklen = pc->socklen;
ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
if (c->read->ready) {
ngx_http_upstream_keepalive_close_handler(c->read);
}
}
static ngx_int_t
ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
ngx_http_upstream_srv_conf_t *us)
@ -192,9 +435,44 @@ ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session;
#endif
if (r->upstream->peer.notify) {
kp->original_notify = r->upstream->peer.notify;
r->upstream->peer.notify = ngx_http_upstream_notify_keepalive_peer;
/* Always install notify wrapper (after the L1 original_init_peer has run).
* This ensures NGX_HTTP_UPSTREAM_NOTIFY_CONNECT is delivered for early
* H2 parking even when no other module (sticky/least_time) provided a
* base notify. We save whatever was there (possibly NULL) and chain.
*/
kp->original_notify = r->upstream->peer.notify;
r->upstream->peer.notify = ngx_http_upstream_notify_keepalive_peer;
#if (NGX_HTTP_V2)
{
ngx_http_proxy_loc_conf_t *plcf;
plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module);
if (plcf && plcf->http_version == NGX_HTTP_VERSION_20) {
kp->h2_enabled = 1;
if (kp->upstream) {
kp->upstream->http2 = 1;
}
/* if user did not set the H2 keepalive knobs, apply defaults for this request */
if (kcf->max_streams_per_connection == NGX_CONF_UNSET_UINT) {
kp->max_streams_per_connection = 100; /* sensible default */
} else {
kp->max_streams_per_connection = kcf->max_streams_per_connection;
}
if (kcf->max_streams_total == NGX_CONF_UNSET_UINT) {
kp->max_streams_total = kp->max_streams_per_connection * kcf->max_cached;
} else {
kp->max_streams_total = kcf->max_streams_total;
}
}
}
#endif
if (!kp->h2_enabled) {
kp->max_streams_per_connection = kcf->max_streams_per_connection;
kp->max_streams_total = kcf->max_streams_total;
}
return NGX_OK;
@ -202,28 +480,14 @@ ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
static ngx_int_t
ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
ngx_http_upstream_get_keepalive_peer_h1(ngx_peer_connection_t *pc,
ngx_http_upstream_keepalive_peer_data_t *kp)
{
ngx_http_upstream_keepalive_peer_data_t *kp = data;
ngx_http_upstream_keepalive_cache_t *item;
ngx_http_upstream_keepalive_cache_t *item;
ngx_int_t rc;
ngx_queue_t *q, *cache;
ngx_connection_t *c;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"get keepalive peer");
/* ask balancer */
rc = kp->original_get_peer(pc, kp->data);
if (rc != NGX_OK) {
return rc;
}
/* search cache for suitable connection */
cache = &kp->conf->cache;
for (q = ngx_queue_head(cache);
@ -237,6 +501,10 @@ ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
continue;
}
if (item->h2) {
continue;
}
if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr,
item->socklen, pc->socklen)
== 0)
@ -244,29 +512,102 @@ ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
ngx_queue_remove(q);
ngx_queue_insert_head(&kp->conf->free, q);
goto found;
ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"get keepalive peer: using #%uA c=%p fd=%d pool=%p",
c->number, c, c->fd, c->pool);
ngx_http_upstream_keepalive_prepare_connection(c, pc);
pc->connection = c;
pc->cached = 1;
return NGX_DONE;
}
}
return NGX_OK;
}
found:
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"get keepalive peer: using connection %p", c);
static ngx_int_t
ngx_http_upstream_get_keepalive_peer_h2(ngx_peer_connection_t *pc,
ngx_http_upstream_keepalive_peer_data_t *kp)
{
ngx_http_upstream_keepalive_srv_conf_t *kcf;
ngx_http_upstream_keepalive_cache_t *item, *best;
c->idle = 0;
c->sent = 0;
c->data = NULL;
c->log = pc->log;
c->read->log = pc->log;
c->write->log = pc->log;
c->pool->log = pc->log;
ngx_queue_t *q, *cache;
ngx_connection_t *c;
ngx_uint_t best_active;
if (c->read->timer_set) {
ngx_del_timer(c->read);
kcf = kp->conf;
if (kcf->total_active_streams >= kp->max_streams_total) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"get keepalive peer: total stream limit reached");
return NGX_BUSY;
}
cache = &kcf->cache;
best = NULL;
best_active = NGX_MAX_UINT32_VALUE;
for (q = ngx_queue_head(cache);
q != ngx_queue_sentinel(cache);
q = ngx_queue_next(q))
{
item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
c = item->connection;
if (kp->conf->local && item->tag != kp->upstream->conf) {
continue;
}
if (!item->h2) {
continue;
}
if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr,
item->socklen, pc->socklen)
!= 0)
{
continue;
}
if (item->active_streams >= kp->max_streams_per_connection) {
continue;
}
if (item->active_streams < best_active) {
best = item;
best_active = item->active_streams;
}
}
if (best == NULL) {
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"get_h2: no suitable cached conn found (best==NULL), will create new. "
"total_active=%ui max_total=%ui",
kcf->total_active_streams, kp->max_streams_total);
kcf->total_active_streams++;
return NGX_OK;
}
c = best->connection;
best->active_streams++;
kcf->total_active_streams++;
ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"get keepalive peer: using #%uA c=%p fd=%d pool=%p "
"slot=%p active_streams=%ui total=%ui",
c->number, c, c->fd, c->pool, best,
best->active_streams, kcf->total_active_streams);
ngx_http_upstream_keepalive_prepare_connection(c, pc);
pc->connection = c;
pc->cached = 1;
@ -274,62 +615,98 @@ found:
}
static void
ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,
ngx_uint_t state)
static ngx_int_t
ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
{
ngx_http_upstream_keepalive_peer_data_t *kp = data;
ngx_http_upstream_keepalive_cache_t *item;
ngx_queue_t *q;
ngx_connection_t *c;
ngx_http_upstream_t *u;
ngx_int_t rc;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"free keepalive peer");
"get keepalive peer");
/* cache valid connections */
rc = kp->original_get_peer(pc, kp->data);
if (rc != NGX_OK) {
return rc;
}
if (kp->h2_enabled) {
return ngx_http_upstream_get_keepalive_peer_h2(pc, kp);
}
return ngx_http_upstream_get_keepalive_peer_h1(pc, kp);
}
static ngx_uint_t
ngx_http_upstream_keepalive_h2_connection_bad(ngx_peer_connection_t *pc,
ngx_uint_t state)
{
ngx_connection_t *c;
if (state & NGX_PEER_FAILED) {
return 1;
}
u = kp->upstream;
c = pc->connection;
if (state & NGX_PEER_FAILED
|| c == NULL
if (c == NULL
|| c->read->eof
|| c->read->error
|| c->read->timedout
|| c->write->error
|| c->write->timedout)
{
goto invalid;
return 1;
}
if (c->requests >= kp->conf->requests) {
goto invalid;
return 0;
}
static void
ngx_http_upstream_keepalive_drop_h2_connection(
ngx_http_upstream_keepalive_srv_conf_t *kcf,
ngx_http_upstream_keepalive_cache_t *item, ngx_peer_connection_t *pc)
{
ngx_connection_t *c;
if (item->active_streams > 0) {
ngx_http_upstream_keepalive_adjust_total(kcf, item->active_streams);
item->active_streams = 0;
}
if (ngx_current_msec - c->start_time > kp->conf->time) {
goto invalid;
c = item->connection;
ngx_queue_remove(&item->queue);
ngx_queue_insert_head(&kcf->free, &item->queue);
item->connection = NULL;
if (c) {
ngx_log_debug(NGX_LOG_DEBUG_HTTP, c->log, 0,
"keepalive h2 drop: #%uA c=%p fd=%d pool=%p slot=%p",
c->number, c, c->fd, c->pool, item);
ngx_http_upstream_keepalive_close(c);
}
if (!u->keepalive) {
goto invalid;
}
pc->connection = NULL;
}
if (!u->request_body_sent) {
goto invalid;
}
if (ngx_terminate || ngx_exiting) {
goto invalid;
}
static void
ngx_http_upstream_free_keepalive_peer_h1(ngx_peer_connection_t *pc,
ngx_http_upstream_keepalive_peer_data_t *kp, ngx_uint_t state)
{
ngx_http_upstream_keepalive_cache_t *item;
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
goto invalid;
}
ngx_queue_t *q;
ngx_connection_t *c;
ngx_http_upstream_t *u;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"free keepalive peer: saving connection %p", c);
u = kp->upstream;
c = pc->connection;
if (ngx_queue_empty(&kp->conf->free)) {
@ -351,35 +728,182 @@ ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,
item->connection = c;
item->tag = u->conf;
item->h2 = 0;
pc->connection = NULL;
c->read->delayed = 0;
ngx_add_timer(c->read, kp->conf->timeout);
ngx_http_upstream_keepalive_set_idle(item, c, pc, u);
}
if (c->write->timer_set) {
ngx_del_timer(c->write);
static void
ngx_http_upstream_free_keepalive_peer_h2(ngx_peer_connection_t *pc,
ngx_http_upstream_keepalive_peer_data_t *kp, ngx_uint_t state)
{
ngx_http_upstream_keepalive_srv_conf_t *kcf;
ngx_http_upstream_keepalive_cache_t *item;
ngx_connection_t *c;
ngx_http_upstream_t *u;
kcf = kp->conf;
u = kp->upstream;
c = pc->connection;
item = ngx_http_upstream_keepalive_find_cached_connection(&kcf->cache, c);
if (item) {
if (item->active_streams > 0) {
item->active_streams--;
ngx_http_upstream_keepalive_adjust_total(kcf, 1);
}
pc->connection = NULL;
ngx_log_debug(NGX_LOG_DEBUG_HTTP, c->log, 0,
"keepalive h2 free: #%uA c=%p fd=%d pool=%p slot=%p "
"active_streams=%ui total=%ui",
c->number, c, c->fd, c->pool, item,
item->active_streams, kcf->total_active_streams);
if (item->active_streams > 0) {
return;
}
ngx_log_debug(NGX_LOG_DEBUG_HTTP, c->log, 0,
"keepalive h2 idle: #%uA c=%p fd=%d last stream done",
c->number, c, c->fd);
ngx_http_upstream_keepalive_set_idle(item, c, pc, u);
return;
}
c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
c->read->handler = ngx_http_upstream_keepalive_close_handler;
c->data = item;
c->idle = 1;
c->log = ngx_cycle->log;
c->read->log = ngx_cycle->log;
c->write->log = ngx_cycle->log;
c->pool->log = ngx_cycle->log;
item->socklen = pc->socklen;
ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
if (c->read->ready) {
ngx_http_upstream_keepalive_close_handler(c->read);
if (kcf->total_active_streams > 0) {
kcf->total_active_streams--;
}
item = ngx_http_upstream_keepalive_get_slot(kcf);
if (item == NULL) {
/* No safe slot available (all pooled connections are busy).
* Do not park this connection. The socket will be closed by
* the normal upstream path after this request/stream ends.
*/
return;
}
ngx_queue_insert_head(&kcf->cache, &item->queue);
item->connection = c;
item->active_streams = 0;
item->h2 = 1;
item->tag = u->conf;
pc->connection = NULL;
ngx_http_upstream_keepalive_set_idle(item, c, pc, u);
}
static void
ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,
ngx_uint_t state)
{
ngx_http_upstream_keepalive_peer_data_t *kp = data;
ngx_http_upstream_keepalive_srv_conf_t *kcf;
ngx_http_upstream_keepalive_cache_t *item;
ngx_connection_t *c;
ngx_http_upstream_t *u;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"free keepalive peer");
u = kp->upstream;
c = pc->connection;
kcf = kp->conf;
/*
* H2 multiplex: cached connections use per-stream teardown.
* Do not apply the H1 !u->keepalive gate to shared sockets.
*/
if (kp->h2_enabled && c != NULL) {
item = ngx_http_upstream_keepalive_find_cached_connection(&kcf->cache,
c);
if (item != NULL) {
if (ngx_http_upstream_keepalive_h2_connection_bad(pc, state)) {
ngx_log_debug(NGX_LOG_DEBUG_HTTP, c->log, 0,
"keepalive h2 free cached bad: #%uA c=%p fd=%d",
c->number, c, c->fd);
ngx_http_upstream_keepalive_drop_h2_connection(kcf, item, pc);
} else {
ngx_http_upstream_free_keepalive_peer_h2(pc, kp, state);
}
goto done;
}
}
if (ngx_http_upstream_keepalive_h2_connection_bad(pc, state)) {
goto invalid;
}
if (c->requests >= kcf->requests) {
goto invalid;
}
if (ngx_current_msec - c->start_time > kcf->time) {
goto invalid;
}
if (!u->keepalive) {
goto invalid;
}
if (!u->request_body_sent) {
goto invalid;
}
if (ngx_terminate || ngx_exiting) {
goto invalid;
}
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
goto invalid;
}
ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"free keepalive peer: saving #%uA c=%p fd=%d pool=%p",
c->number, c, c->fd, c->pool);
if (kp->h2_enabled) {
ngx_http_upstream_free_keepalive_peer_h2(pc, kp, state);
goto done;
}
ngx_http_upstream_free_keepalive_peer_h1(pc, kp, state);
goto done;
invalid:
if (kp->h2_enabled) {
item = c ? ngx_http_upstream_keepalive_find_cached_connection(
&kcf->cache, c) : NULL;
if (item) {
ngx_http_upstream_keepalive_drop_h2_connection(kcf, item, pc);
} else if (kcf->total_active_streams > 0) {
kcf->total_active_streams--;
}
}
done:
kp->original_free_peer(pc, kp->data, state);
}
@ -428,10 +952,20 @@ close:
item = c->data;
conf = item->conf;
if (item->h2) {
if (item->active_streams > 0) {
ngx_http_upstream_keepalive_adjust_total(conf,
item->active_streams);
item->active_streams = 0;
item->h2 = 0;
}
}
ngx_http_upstream_keepalive_close(c);
ngx_queue_remove(&item->queue);
ngx_queue_insert_head(&conf->free, &item->queue);
item->connection = NULL;
}
@ -487,7 +1021,28 @@ ngx_http_upstream_notify_keepalive_peer(ngx_peer_connection_t *pc, void *data,
{
ngx_http_upstream_keepalive_peer_data_t *kp = data;
kp->original_notify(pc, kp->data, type);
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"notify keepalive peer: type=%ui pc=%p",
type, pc);
if (type == NGX_HTTP_UPSTREAM_NOTIFY_CONNECT) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"notify keepalive: CONNECT received, kp=%p", kp);
if (kp && kp->h2_enabled) {
ngx_connection_t *c = pc->connection;
if (c != NULL) {
ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"notify keepalive: calling attach for "
"#%uA c=%p fd=%d pool=%p",
c->number, c, c->fd, c->pool);
ngx_http_upstream_keepalive_attach_new_h2(kp, c, pc);
}
}
}
if (kp && kp->original_notify) {
kp->original_notify(pc, kp->data, type);
}
}
@ -507,12 +1062,15 @@ ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf)
*
* conf->original_init_peer = NULL;
* conf->local = 0;
* conf->total_active_streams = 0;
*/
conf->time = NGX_CONF_UNSET_MSEC;
conf->timeout = NGX_CONF_UNSET_MSEC;
conf->requests = NGX_CONF_UNSET_UINT;
conf->max_cached = NGX_CONF_UNSET_UINT;
conf->max_streams_per_connection = NGX_CONF_UNSET_UINT;
conf->max_streams_total = NGX_CONF_UNSET_UINT;
return conf;
}
@ -554,6 +1112,22 @@ ngx_http_upstream_keepalive_init_main_conf(ngx_conf_t *cf, void *conf)
kcf->max_cached = 32;
}
if (kcf->max_streams_per_connection != NGX_CONF_UNSET_UINT) {
if (kcf->max_streams_total == NGX_CONF_UNSET_UINT) {
kcf->max_streams_total =
kcf->max_streams_per_connection * kcf->max_cached;
}
if (kcf->max_streams_total < kcf->max_streams_per_connection) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"keepalive_max_streams_total must be "
"greater than or equal to "
"keepalive_max_streams_per_connection");
return NGX_CONF_ERROR;
}
}
kcf->original_init_peer = uscfp[i]->peer.init;
uscfp[i]->peer.init = ngx_http_upstream_init_keepalive_peer;
@ -572,6 +1146,8 @@ ngx_http_upstream_keepalive_init_main_conf(ngx_conf_t *cf, void *conf)
for (j = 0; j < kcf->max_cached; j++) {
ngx_queue_insert_head(&kcf->free, &cached[j].queue);
cached[j].conf = kcf;
cached[j].active_streams = 0;
cached[j].h2 = 0;
}
}

View file

@ -46,6 +46,8 @@ static ngx_int_t ngx_http_upstream_send_request_body(ngx_http_request_t *r,
static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
ngx_http_upstream_t *u);
static void ngx_http_upstream_read_request_handler(ngx_http_request_t *r);
static void ngx_http_upstream_read_request_handler_delayed(ngx_http_request_t *r,
ngx_http_upstream_t *u);
static void ngx_http_upstream_process_header(ngx_http_request_t *r,
ngx_http_upstream_t *u);
static ngx_int_t ngx_http_upstream_process_early_hints(ngx_http_request_t *r,
@ -1322,7 +1324,11 @@ ngx_http_upstream_handler(ngx_event_t *ev)
}
if (ev->write) {
u->write_event_handler(r, u);
if (u->mux != NULL) {
ngx_http_upstream_mux_drive(u->peer.connection, u->mux);
} else {
u->write_event_handler(r, u);
}
} else {
u->read_event_handler(r, u);
@ -1553,6 +1559,162 @@ ngx_http_upstream_check_broken_connection(ngx_http_request_t *r,
}
/*
* Upstream mux: multiplexes multiple HTTP/2 streams over a single
* upstream connection via a send queue and fair round-robin scheduling.
* The mux is stored as a cleanup on the upstream connection's pool.
*/
static void
ngx_http_upstream_mux_cleanup(void *data)
{
/* pool-owned; nothing to free */
}
ngx_http_upstream_mux_t *
ngx_http_upstream_mux_get(ngx_connection_t *c)
{
ngx_http_upstream_mux_t *mux;
ngx_pool_cleanup_t *cln;
if (c == NULL || c->pool == NULL) {
return NULL;
}
for (cln = c->pool->cleanup; cln; cln = cln->next) {
if (cln->handler == ngx_http_upstream_mux_cleanup) {
return cln->data;
}
}
/* not found — create */
cln = ngx_pool_cleanup_add(c->pool, sizeof(ngx_http_upstream_mux_t));
if (cln == NULL) {
return NULL;
}
mux = cln->data;
ngx_memzero(mux, sizeof(ngx_http_upstream_mux_t));
ngx_queue_init(&mux->send_queue);
cln->handler = ngx_http_upstream_mux_cleanup;
return mux;
}
void
ngx_http_upstream_mux_enqueue(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_http_upstream_mux_t *mux;
mux = u->mux;
if (mux == NULL) {
return;
}
if (u->in_mux_queue) {
return;
}
ngx_queue_insert_tail(&mux->send_queue, &u->mux_send_link);
u->in_mux_queue = 1;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream mux: enqueued r=%p", r);
}
void
ngx_http_upstream_mux_dequeue(ngx_http_upstream_t *u)
{
if (!u->in_mux_queue) {
return;
}
ngx_queue_remove(&u->mux_send_link);
ngx_queue_init(&u->mux_send_link);
u->in_mux_queue = 0;
}
ngx_uint_t
ngx_http_upstream_mux_queue_has_waiters(ngx_http_upstream_mux_t *mux)
{
if (mux == NULL) {
return 0;
}
return !ngx_queue_empty(&mux->send_queue);
}
void
ngx_http_upstream_mux_drive(ngx_connection_t *c, ngx_http_upstream_mux_t *mux)
{
ngx_queue_t *q;
ngx_http_request_t *r;
ngx_http_upstream_t *u;
if (mux == NULL) {
return;
}
if (ngx_queue_empty(&mux->send_queue)) {
return;
}
q = ngx_queue_head(&mux->send_queue);
u = ngx_queue_data(q, ngx_http_upstream_t, mux_send_link);
r = u->request;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream mux: drive r=%p", r);
if (mux->hooks && mux->hooks->can_send(u)) {
u->mux_handler(r, u);
if (mux->hooks->is_done(u)) {
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream mux: is_done #%uA r=%p dequeue",
c->number, r);
ngx_http_upstream_mux_dequeue(u);
if (!ngx_queue_empty(&mux->send_queue)) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream mux: post write #%uA "
"(queue has waiters)",
c->number);
ngx_post_event(c->write, &ngx_posted_events);
} else {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream mux: queue empty #%uA",
c->number);
}
} else {
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream mux: not done #%uA r=%p "
"(stay in queue)",
c->number, r);
}
} else {
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream mux: can_send false #%uA r=%p rotate",
c->number, r);
ngx_queue_remove(q);
ngx_queue_insert_tail(&mux->send_queue, q);
}
}
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
@ -1645,7 +1807,7 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
if (r->connection->tcp_nopush == NGX_TCP_NOPUSH_DISABLED) {
c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
}
if (c->pool == NULL) {
/* we need separate pool here to be able to cache SSL connections */
@ -1663,6 +1825,17 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
c->read->log = c->log;
c->write->log = c->log;
if (u->http2) {
u->mux = ngx_http_upstream_mux_get(c);
}
if (rc == NGX_OK || rc == NGX_AGAIN) {
if (u->peer.notify) {
u->peer.notify(&u->peer, u->peer.data,
NGX_HTTP_UPSTREAM_NOTIFY_CONNECT);
}
}
/* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
@ -1706,6 +1879,7 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
r->request_body->buf->tag = u->output.tag;
}
u->request = r;
u->request_sent = 0;
u->request_body_sent = 0;
u->request_body_blocked = 0;
@ -1725,6 +1899,26 @@ ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
#endif
if (u->mux != NULL) {
if (rc == NGX_DONE) {
ngx_uint_t was_empty = ngx_queue_empty(&u->mux->send_queue);
u->mux_handler = u->write_event_handler;
ngx_http_upstream_mux_enqueue(r, u);
if (was_empty)
{
ngx_http_upstream_mux_drive(c, u->mux);
}
return;
}
/* for new H2 conn (rc != DONE): u->mux assigned here at connect time;
enqueue/drive will be handled by later send/read paths */
}
ngx_http_upstream_send_request(r, u, 1);
}
@ -2180,6 +2374,10 @@ ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u,
}
if (rc == NGX_AGAIN) {
if (u->mux) {
u->mux_handler = u->write_event_handler;
}
if (!c->write->ready || u->request_body_blocked) {
ngx_add_timer(c->write, u->conf->send_timeout);
@ -2418,6 +2616,23 @@ ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
ngx_http_upstream_send_request(r, u, 1);
}
static void
ngx_http_upstream_read_request_handler_delayed(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
ngx_connection_t *c;
c = r->connection;
if (c->read->timedout) {
c->timedout = 1;
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT);
return;
}
ngx_http_upstream_send_request(r, u, 0);
}
static void
ngx_http_upstream_read_request_handler(ngx_http_request_t *r)
@ -2437,6 +2652,19 @@ ngx_http_upstream_read_request_handler(ngx_http_request_t *r)
return;
}
if (u->mux) {
ngx_uint_t was_empty = ngx_queue_empty(&u->mux->send_queue);
u->mux_handler = ngx_http_upstream_read_request_handler_delayed;
ngx_http_upstream_mux_enqueue(r, u);
if (was_empty) {
ngx_http_upstream_mux_drive(u->peer.connection, u->mux);
}
return;
}
ngx_http_upstream_send_request(r, u, 0);
}

View file

@ -58,6 +58,7 @@
#define NGX_HTTP_UPSTREAM_NOTIFY_HEADER 0x1
#define NGX_HTTP_UPSTREAM_NOTIFY_CONNECT 0x2
typedef struct {
@ -334,6 +335,9 @@ typedef void (*ngx_http_upstream_handler_pt)(ngx_http_request_t *r,
ngx_http_upstream_t *u);
typedef struct ngx_http_upstream_mux_s ngx_http_upstream_mux_t;
struct ngx_http_upstream_s {
ngx_http_upstream_handler_pt read_event_handler;
ngx_http_upstream_handler_pt write_event_handler;
@ -412,12 +416,20 @@ struct ngx_http_upstream_s {
unsigned keepalive:1;
unsigned upgrade:1;
unsigned error:1;
unsigned http2:1;
unsigned request_sent:1;
unsigned request_body_sent:1;
unsigned request_body_blocked:1;
unsigned header_sent:1;
unsigned response_received:1;
ngx_http_request_t *request;
ngx_http_upstream_mux_t *mux;
ngx_queue_t mux_send_link;
unsigned in_mux_queue:1;
ngx_http_upstream_handler_pt mux_handler;
};
@ -427,6 +439,18 @@ typedef struct {
} ngx_http_upstream_next_t;
typedef struct {
ngx_int_t (*can_send)(ngx_http_upstream_t *u);
ngx_int_t (*is_done)(ngx_http_upstream_t *u);
} ngx_http_upstream_mux_hooks_t;
struct ngx_http_upstream_mux_s {
ngx_queue_t send_queue;
ngx_http_upstream_mux_hooks_t *hooks;
void *protocol_data;
};
typedef struct {
ngx_str_t key;
ngx_str_t value;
@ -457,6 +481,16 @@ ngx_int_t ngx_http_upstream_merge_ssl_passwords(ngx_conf_t *cf,
uscf->srv_conf[module.ctx_index]
ngx_http_upstream_mux_t *ngx_http_upstream_mux_get(ngx_connection_t *c);
void ngx_http_upstream_mux_drive(ngx_connection_t *c,
ngx_http_upstream_mux_t *mux);
void ngx_http_upstream_mux_enqueue(ngx_http_request_t *r,
ngx_http_upstream_t *u);
void ngx_http_upstream_mux_dequeue(ngx_http_upstream_t *u);
ngx_uint_t ngx_http_upstream_mux_queue_has_waiters(
ngx_http_upstream_mux_t *mux);
extern ngx_module_t ngx_http_upstream_module;
extern ngx_conf_bitmask_t ngx_http_upstream_cache_method_mask[];
extern ngx_conf_bitmask_t ngx_http_upstream_ignore_headers_masks[];