/*
 * call-seq:
 *    conn.wait_for_notify( [ timeout ] ) -> String
 *    conn.wait_for_notify( [ timeout ] ) { |event, pid| block }
 *
 * Blocks until a notification is available on the connection, or until
 * the optional _timeout_ expires, whichever comes first. _timeout_ is
 * measured in seconds and can be fractional; omitting it (or passing
 * +nil+) waits without a time limit.
 *
 * Returns +nil+ if the wait timed out, the name of the NOTIFY event
 * otherwise. If used in block form, passes the name of the NOTIFY
 * +event+ and the generating +pid+ into the block; the block is only
 * called when a notification was actually received.
 *
 * Exactly one notification is consumed per call. A notification that
 * is already pending when the method is called is returned immediately
 * without waiting on the socket.
 *
 * Raises rb_ePGError if the connection's socket cannot be obtained or
 * if PQconsumeInput() fails, and a system-call error if the select
 * itself fails.
 */
static VALUE
pgconn_wait_for_notify(int argc, VALUE *argv, VALUE self)
{
	PGconn *conn = get_pgconn(self);
	PGnotify *notify;
	int sd = PQsocket(conn);
	int ret;
	struct timeval timeout;
	struct timeval *ptimeout = NULL;
	VALUE timeout_in = Qnil, relname, be_pid;
	double timeout_sec;
	fd_set sd_rset;

	/* A missing socket is a connection-level failure, not an interpreter
	 * bug, so raise a catchable error instead of aborting the process. */
	if (sd < 0) {
		rb_raise(rb_ePGError, "PQsocket(conn): couldn't fetch the connection's socket!");
	}

	/* An explicit nil is treated the same as an omitted timeout. */
	rb_scan_args(argc, argv, "01", &timeout_in);
	if (timeout_in != Qnil) {
		timeout_sec = NUM2DBL(timeout_in);
		timeout.tv_sec = (long)timeout_sec;
		timeout.tv_usec = (long)((timeout_sec - (long)timeout_sec) * 1e6);
		ptimeout = &timeout;
	}

	/* Ask for a notification BEFORE waiting: one may already have been
	 * read off the socket, in which case the socket would never become
	 * readable for it again. Keep waiting while input arrives that does
	 * not contain a notification.
	 *
	 * NOTE(review): the same timeout struct is handed to every wait;
	 * whether rb_thread_select() decrements it is not visible here, so
	 * the total wait may exceed _timeout_ if the socket is repeatedly
	 * woken by non-notification traffic. */
	while ((notify = PQnotifies(conn)) == NULL) {
		/* The set is rewritten by select, so rebuild it every pass. */
		FD_ZERO(&sd_rset);
		FD_SET(sd, &sd_rset);

		ret = rb_thread_select(sd + 1, &sd_rset, NULL, NULL, ptimeout);
		if (ret < 0) {
			rb_sys_fail(0);
		}
		if (ret == 0) {
			/* Timed out without any notification. */
			return Qnil;
		}

		/* Pull the pending bytes into libpq so PQnotifies can see them. */
		if ( (ret = PQconsumeInput(conn)) != 1 ) {
			rb_raise(rb_ePGError, "PQconsumeInput == %d: %s", ret, PQerrorMessage(conn));
		}
	}

	/* Copy what we need into Ruby objects, then release the libpq struct.
	 * Only this one notification is taken; the queue is not drained. */
	relname = rb_tainted_str_new2(notify->relname);
	be_pid = INT2NUM(notify->be_pid);
	PQfreemem(notify);

	if (rb_block_given_p()) {
		rb_yield( rb_ary_new3(2, relname, be_pid) );
	}
	return relname;
}