Communication between worker processes

How might I be able to send a message or (custom) event to some given
worker process? I am storing pointers to requests (and the workers that
processed them) in shared memory for a module (GitHub - slact/nchan),
and need to alert said
workers to respond to said requests, from the context of a location
request handler in a module. (maybe using ngx_channel stuff?)
I’d really appreciate any suggestions.

Thanque,

  • Leo

(PS: for reference, here was my original question:
request pool cleanup handler )

Hello!

On Mon, Oct 19, 2009 at 12:23:00AM -0400, Leo P. wrote:

How might I be able to send a message or (custom) event to some
given worker process? I am storing pointers to requests (and the
workers that processed them) in shared memory for a module
(GitHub - slact/nchan), and need to alert
said workers to respond to said requests, from the context of a
location request handler in a module. (maybe using ngx_channel
stuff?)
I’d really appreciate any suggestions.

There is no infrastructure for interprocess notifications in
nginx right now. Basically, that is why there are no such
things as busy locks and so on.

You may try to emulate it with, e.g., a message queue in shared memory
and a periodic timer in each worker process to check it. It’s not
perfect, but it will probably work for you.
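
For readers who want to see the shape of that workaround, here is a minimal sketch in plain POSIX C rather than nginx code. Everything in it (mailbox_t, mailbox_push, mailbox_pop, poll_mailbox_demo, QUEUE_SLOTS) is a hypothetical name made up for illustration. It assumes exactly one sender and one receiver per mailbox; with several senders the queue would need a lock around it. The usleep() loop only stands in for a periodic timer event in a real worker.

```c
#define _DEFAULT_SOURCE            /* for MAP_ANONYMOUS and usleep() on glibc */
#include <stdatomic.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define QUEUE_SLOTS 8              /* hypothetical mailbox capacity */

/* One single-producer, single-consumer mailbox, placed in shared memory. */
typedef struct {
    atomic_uint  head;             /* next slot the sender writes */
    atomic_uint  tail;             /* next slot the owner reads */
    int          messages[QUEUE_SLOTS];
} mailbox_t;

static int
mailbox_push(mailbox_t *mb, int msg)
{
    unsigned  head = atomic_load(&mb->head);

    if (head - atomic_load(&mb->tail) == QUEUE_SLOTS) {
        return -1;                             /* queue is full */
    }
    mb->messages[head % QUEUE_SLOTS] = msg;
    atomic_store(&mb->head, head + 1);         /* publish the slot */
    return 0;
}

static int
mailbox_pop(mailbox_t *mb, int *msg)
{
    unsigned  tail = atomic_load(&mb->tail);

    if (tail == atomic_load(&mb->head)) {
        return -1;                             /* queue is empty */
    }
    *msg = mb->messages[tail % QUEUE_SLOTS];
    atomic_store(&mb->tail, tail + 1);         /* release the slot */
    return 0;
}

/*
 * Forks one "worker" that polls its mailbox every 10 ms while the parent
 * posts a single message (0..254). Returns the message the worker saw,
 * passed back through its exit status, or -1 on failure.
 */
int
poll_mailbox_demo(int msg)
{
    int         status, got, i;
    pid_t       pid;
    mailbox_t  *mb;

    mb = mmap(NULL, sizeof(*mb), PROT_READ | PROT_WRITE,
              MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (mb == MAP_FAILED) {
        return -1;
    }
    atomic_init(&mb->head, 0);
    atomic_init(&mb->tail, 0);

    pid = fork();
    if (pid == -1) {
        munmap(mb, sizeof(*mb));
        return -1;
    }

    if (pid == 0) {
        for (i = 0; i < 200; i++) {            /* give up after about 2 s */
            if (mailbox_pop(mb, &got) == 0) {
                _exit(got & 0xff);
            }
            usleep(10000);                     /* stand-in for a timer tick */
        }
        _exit(255);
    }

    mailbox_push(mb, msg);
    waitpid(pid, &status, 0);
    munmap(mb, sizeof(*mb));

    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

The cost is latency: a message waits up to one polling interval before the worker notices it, which is the "not perfect" part.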

Maxim D.

Maxim D. wrote:

[…]

It’s not perfect, but probably will work for you.

What about using ngx_add_channel_event on process initialization?
Is there anything obviously wrong with doing something like the
following:

/* in init process callback */
ngx_socket_t my_channel = ngx_channel;
/* is that the correct socket? what about
   ngx_processes[ngx_process_slot].channel[0] and
   ngx_processes[ngx_process_slot].channel[1] ?
   which could be used?
*/
if (ngx_add_channel_event(cycle, my_channel, NGX_READ_EVENT,
                          ngx_http_push_channel_handler) == NGX_ERROR) {
    exit(2);
}
//…

/* some unlikely number (looks hacky) */
#define NGX_CMD_HTTP_PUSH_CHECK_MESSAGES 87

static void
ngx_http_push_channel_handler(ngx_event_t *ev)
{
    /* mostly copied from ngx_channel_handler (os/unix/ngx_process_cycle.c) */
    ngx_int_t          n;
    ngx_channel_t      ch;
    ngx_connection_t  *c;

    if (ev->timedout) {
        ev->timedout = 0;
        return;
    }

    c = ev->data;

    for ( ;; ) {
        /* ch is a struct, not a pointer, so pass the struct size */
        n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);

        if (n == NGX_ERROR) {
            if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
                ngx_del_conn(c, 0);
            }
            ngx_close_connection(c);
            return;
        }

        if ((ngx_event_flags & NGX_USE_EVENTPORT_EVENT) &&
            (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR)) {
            return;
        }

        if (n == NGX_AGAIN) {
            return;
        }

        /* the custom command now. */
        if (ch.command == NGX_CMD_HTTP_PUSH_CHECK_MESSAGES) {
            /* take a look at the message queue for this worker process
               in shared memory. */
        }
    }
}

/* elsewhere, in a different worker process: */
if (/* some condition */) {
    ngx_channel_t ch;
    ch.command = NGX_CMD_HTTP_PUSH_CHECK_MESSAGES;
    ch.fd = -1;

    if (ngx_write_channel(
            ngx_processes[request_owner_worker_process_slot].channel[0 /* or 1? */],
            &ch, sizeof(ngx_channel_t), some_log) != NGX_OK) {
        /* do some error handling */
    }
}

Thanks again,

  • Leo

FYI, and for anyone else encountering this problem, I ended up rolling
my own socketpairs and reusing some of the existing channel handling
functions. One caveat that I did notice:

  • The number of worker processes is needed before nginx fork()s. You
    can get this in your module’s initialization handler from
    ((ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
    ngx_core_module))->worker_processes;
    In any other module handler it will either be too early or too late to
    access this data.
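
To illustrate the general idea of private socketpairs, here is a rough sketch in plain POSIX C. It is not the module's actual code and does not use nginx's channel functions. All names (notice_t, NUM_WORKERS, CMD_CHECK_MESSAGES, socketpair_notify_demo) are hypothetical. It assumes the worker count is known before forking, so one socketpair per worker can be created up front and inherited by every child.

```c
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define NUM_WORKERS         2     /* assumed to be known before fork() */
#define CMD_CHECK_MESSAGES  87    /* hypothetical command code */

typedef struct {
    int  command;
    int  sender_slot;
} notice_t;

/*
 * Creates one socketpair per worker before forking, so every worker inherits
 * the write end of every other worker's pair. Worker 0 then notifies worker 1.
 * Returns the command worker 1 received (via its exit status), or -1.
 */
int
socketpair_notify_demo(void)
{
    int    chan[NUM_WORKERS][2];  /* [slot][0]: write end, [slot][1]: read end */
    int    slot, status, result = -1;
    pid_t  pids[NUM_WORKERS];

    for (slot = 0; slot < NUM_WORKERS; slot++) {
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, chan[slot]) == -1) {
            return -1;
        }
    }

    for (slot = 0; slot < NUM_WORKERS; slot++) {
        pids[slot] = fork();
        if (pids[slot] == -1) {
            return -1;            /* a real program would clean up here */
        }

        if (pids[slot] == 0) {
            notice_t  n;

            close(chan[slot][0]); /* a worker never writes to itself */

            if (slot == 0) {
                /* sender: poke worker 1 through its write end */
                n.command = CMD_CHECK_MESSAGES;
                n.sender_slot = slot;
                _exit(write(chan[1][0], &n, sizeof(n))
                      == (ssize_t) sizeof(n) ? 0 : 1);
            }

            /* receiver: block until a notice arrives on our own read end */
            if (read(chan[slot][1], &n, sizeof(n)) != (ssize_t) sizeof(n)) {
                _exit(255);
            }
            _exit(n.command);
        }
    }

    /* the parent keeps no ends open, so a dead sender shows up as EOF */
    for (slot = 0; slot < NUM_WORKERS; slot++) {
        close(chan[slot][0]);
        close(chan[slot][1]);
    }

    for (slot = 0; slot < NUM_WORKERS; slot++) {
        if (waitpid(pids[slot], &status, 0) == pids[slot]
            && slot == 1 && WIFEXITED(status))
        {
            result = WEXITSTATUS(status);
        }
    }

    return result;
}
```

In a real worker the read end would presumably be registered with the event loop instead of being read in a blocking call.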

  • Leo

Hello!

On Mon, Oct 19, 2009 at 05:18:12PM -0400, Leo P. wrote:

[…]

is that the correct socket? what about
ngx_processes[ngx_process_slot].channel[0] and
ngx_processes[ngx_process_slot].channel[1] ?
which could be used?

[…]

The socket pairs in question are used for nginx’s own needs (control
commands from master to workers). Any attempt to reuse them will
cause nginx to malfunction at the next control command.

Eventually this will grow into something more generic and modules
will be able to use this for their own commands. But it’s not here
yet.

Maxim D.

p.s. BTW, just to make sure you know: nginx_http_push_module
relies on shared memory as something unconditionally available to
all processes, but this isn’t really true during binary upgrades.
The new binary creates its own shared memory zones from scratch. As
a result, the module won’t survive binary upgrades flawlessly but
will lose messages instead.