class SysVMQ
Constants
- IPC_CREAT
- IPC_EXCL
- IPC_INFO
- IPC_NOWAIT
- IPC_RMID
- IPC_SET
- IPC_STAT
Public Class Methods
new(p1, p2, p3)
click to toggle source
/*
 * SysVMQ.new(key, buffer_size, flags)
 *
 * Opens the System V message queue identified by `key` through msgget(2)
 * and allocates the message buffer that this instance reuses for every
 * send and receive.
 *
 * key         - Fixnum queue key, cast to key_t.
 * buffer_size - Fixnum payload capacity in bytes; must not be negative.
 * flags       - Fixnum flags handed to msgget(2).
 *
 * Returns self. Raises ArgumentError for a negative buffer_size and a
 * system call error (through rb_sys_fail) when msgget(2) fails.
 */
VALUE
sysvmq_initialize(VALUE self, VALUE key, VALUE buffer_size, VALUE flags)
{
  sysvmq_t* sysv;
  size_t msgbuf_size;
  long requested_size;

  // TODO: Also support string keys, so you can pass '0xDEADC0DE'
  Check_Type(key, T_FIXNUM);
  Check_Type(flags, T_FIXNUM);
  Check_Type(buffer_size, T_FIXNUM);

  // Decode the Fixnum before doing arithmetic on it, and validate it before
  // msgget(2) so that a bad size cannot leave a newly created queue behind.
  // A negative size would wrap to a huge size_t below while the allocation
  // wraps to a tiny one, defeating the bounds check done when sending.
  requested_size = FIX2LONG(buffer_size);
  if (requested_size < 0) {
    rb_raise(rb_eArgError, "Buffer size must not be negative.");
  }

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  // (key_t) is a 32-bit integer (int). It's defined as `int` (at least on OS X
  // and Linux). However, `FIX2INT()` (from Ruby) will complain if the key is
  // something in the range 2^31-2^32, because of the sign bit. We use UINT to
  // trick Ruby, so it won't complain.
  sysv->key = (key_t) FIX2UINT(key);

  while ((sysv->id = msgget(sysv->key, FIX2INT(flags))) < 0) {
    if (errno == EINTR) {
      rb_thread_wait_for(polling_interval); // TODO: Really necessary here?
      continue;
    }
    rb_sys_fail("Failed opening the message queue.");
  }

  // Allocate the msgbuf buffer once for the instance, to not allocate a buffer
  // for each message sent. This makes SysVMQ not thread-safe (requiring a
  // buffer for each thread), but is a reasonable trade-off for now for the
  // performance.
  //
  // The stored capacity is one byte larger than requested, exactly as before.
  sysv->buffer_size = (size_t) requested_size + 1;
  msgbuf_size = sysv->buffer_size * sizeof(char) + sizeof(long);

  // Note that this is a zero-length array, so we size the struct to size of the
  // header (long, the mtype) and then the rest of the space for message buffer.
  sysv->msgbuf = (sysvmq_msgbuf_t*) xmalloc(msgbuf_size);

  return self;
}
Public Instance Methods
destroy()
click to toggle source
/*
 * SysVMQ#destroy
 *
 * Delegates to sysvmq_stats with IPC_RMID as its single argument and
 * returns whatever that call returns.
 */
static VALUE
sysvmq_destroy(VALUE self)
{
  VALUE rmid_command = INT2FIX(IPC_RMID);

  return sysvmq_stats(1, &rmid_command, self);
}
receive(*args)
click to toggle source
/*
 * SysVMQ#receive(type = 0, flags = 0)
 *
 * Receives one message into the instance buffer and returns its payload as
 * a new Ruby String.
 *
 * type  - Fixnum message type selector (defaults to 0).
 * flags - Fixnum flags (defaults to 0). When IPC_NOWAIT is set the receive
 *         helper is called directly; otherwise it runs with the GVL released.
 *
 * Raises ArgumentError when more than two arguments are given and a system
 * call error (through rb_sys_fail) when receiving fails with anything other
 * than EINTR.
 */
VALUE
sysvmq_receive(int argc, VALUE *argv, VALUE self)
{
  VALUE type  = INT2FIX(0);
  VALUE flags = INT2FIX(0);
  sysvmq_t* sysv;
  sysvmq_blocking_call_t blocking;

  if (argc > 2) {
    rb_raise(rb_eArgError, "Wrong number of arguments (0..2)");
  }

  if (argc >= 1) type  = argv[0];
  if (argc == 2) flags = argv[1];

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  Check_Type(type,  T_FIXNUM);
  Check_Type(flags, T_FIXNUM);

  // Attach blocking call parameters to the struct passed to the blocking
  // function wrapper.
  blocking.flags = FIX2INT(flags);
  blocking.type  = FIX2LONG(type);
  blocking.sysv  = sysv;
  // Initialize error so it's never a garbage value, if
  // `sysvmq_maybe_blocking_receive` was interrupted at a non-nice time.
  blocking.error  = UNINITIALIZED_ERROR;
  blocking.length = UNINITIALIZED_ERROR;

  if ((blocking.flags & IPC_NOWAIT) == IPC_NOWAIT) {
    // Non-blocking call, so the GVL release/acquire round trip is skipped.
    while (sysvmq_maybe_blocking_receive(&blocking) == NULL && blocking.error < 0) {
      if (errno == EINTR) {
        continue;
      }

      // Same message as the blocking branch below (previously misspelled).
      rb_sys_fail("Failed receiving message from queue");
    }
  } else {
    // msgrcv(2) can block waiting for a message, if IPC_NOWAIT is not passed.
    // We unlock the GVL waiting for the call so other threads (e.g. signal
    // handling) can continue to work. Sets `length` on `blocking` with the size
    // of the message returned.
    while (WITHOUT_GVL(sysvmq_maybe_blocking_receive, &blocking, RUBY_UBF_IO, NULL) == NULL
            && blocking.error < 0) {
      // Retry when interrupted, or when the helper never got to record a
      // result (error is still the sentinel set above).
      if (errno == EINTR || blocking.error == UNINITIALIZED_ERROR) {
        continue;
      }

      rb_sys_fail("Failed receiving message from queue");
    }
  }

  // Guard it..
  assert(blocking.length != UNINITIALIZED_ERROR);

  // Copy `length` bytes of payload out of the shared buffer into a new String.
  return rb_str_new(sysv->msgbuf->mtext, blocking.length);
}
send(*args)
click to toggle source
/*
 * SysVMQ#send(message, priority = 1, flags = 0)
 *
 * Converts `message` with #to_s, copies it into the instance buffer and
 * sends it to the queue.
 *
 * message  - Any object; its #to_s result is what gets sent.
 * priority - Fixnum stored as the message type (defaults to 1).
 * flags    - Fixnum flags (defaults to 0). When IPC_NOWAIT is set the send
 *            helper is called directly; otherwise it runs with the GVL
 *            released.
 *
 * Returns the String that was sent. Raises ArgumentError for a wrong
 * argument count or a message larger than the buffer, TypeError when #to_s
 * does not return a String, and a system call error (through rb_sys_fail)
 * when sending fails with anything other than EINTR.
 */
VALUE
sysvmq_send(int argc, VALUE *argv, VALUE self)
{
  VALUE message;
  VALUE priority = INT2FIX(1);
  VALUE flags = INT2FIX(0);
  sysvmq_blocking_call_t blocking;
  sysvmq_t* sysv;

  if (argc > 3 || argc == 0) {
    rb_raise(rb_eArgError, "Wrong number of arguments (1..3)");
  }

  message = argv[0];
  if (argc >= 2) priority = argv[1];
  if (argc == 3) flags    = argv[2];

  message = rb_funcall(message, rb_intern("to_s"), 0);
  // A misbehaving #to_s may return a non-String; RSTRING_LEN/RSTRING_PTR
  // below are only valid on real Strings.
  Check_Type(message, T_STRING);

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  Check_Type(flags,    T_FIXNUM);
  Check_Type(priority, T_FIXNUM);

  // Attach blocking call parameters to the struct passed to the blocking
  // function wrapper.
  blocking.flags = FIX2INT(flags);
  blocking.size  = RSTRING_LEN(message);
  blocking.sysv  = sysv;
  // See msgrcv(2) wrapper
  blocking.error  = UNINITIALIZED_ERROR;
  blocking.length = UNINITIALIZED_ERROR;

  // Reject oversized messages before touching the shared buffer at all.
  if (blocking.size > sysv->buffer_size) {
    rb_raise(rb_eArgError, "Size of message is bigger than buffer size.");
  }

  // The buffer can be obtained from `sysvmq_maybe_blocking_send`, instead of
  // passing it, set it directly on the instance struct. The message type is
  // decoded as a long, matching how the receive side decodes its type.
  sysv->msgbuf->mtype = FIX2LONG(priority);

  // TODO: Can a string copy be avoided?
  memcpy(sysv->msgbuf->mtext, RSTRING_PTR(message), blocking.size);

  // Non-blocking call, skip the expensive GVL release/acquire
  if ((blocking.flags & IPC_NOWAIT) == IPC_NOWAIT) {
    while (sysvmq_maybe_blocking_send(&blocking) == NULL && blocking.error < 0) {
      if (errno == EINTR) {
        continue;
      }

      rb_sys_fail("Failed sending message to queue");
    }
  } else {
    // msgsnd(2) can block waiting for room in the queue, if IPC_NOWAIT is not
    // passed. We unlock the GVL waiting for the call so other threads (e.g.
    // signal handling) can continue to work.
    while (WITHOUT_GVL(sysvmq_maybe_blocking_send, &blocking, RUBY_UBF_IO, NULL) == NULL
            && blocking.error < 0) {
      // Retry when interrupted, or when the helper never got to record a
      // result (error is still the sentinel set above).
      if (errno == EINTR || blocking.error == UNINITIALIZED_ERROR) {
        continue;
      }

      rb_sys_fail("Failed sending message to queue");
    }
  }

  return message;
}
stats(*args)
click to toggle source
/*
 * SysVMQ#stats(cmd = IPC_STAT)
 *
 * Runs msgctl(2) with `cmd` on the queue and returns a Hash built from the
 * msqid_ds structure:
 *
 *   :count        - msg_qnum
 *   :maximum_size - msg_qbytes
 *   :size         - current byte count (Linux and macOS only)
 *
 * cmd - Optional Fixnum msgctl(2) command; defaults to IPC_STAT.
 *
 * Raises ArgumentError when more than one argument is given and a system
 * call error (through rb_sys_fail) when msgctl(2) fails with anything other
 * than EINTR.
 */
static VALUE
sysvmq_stats(int argc, VALUE *argv, VALUE self)
{
  // Zero-initialized because msgctl(2) only fills this in for some commands.
  // IPC_RMID (used by destroy) leaves it untouched, and the hash below would
  // otherwise be built from indeterminate stack contents.
  // NOTE(review): for IPC_SET this struct is the *input* to msgctl(2); there
  // is currently no way for callers to populate it - confirm intended use.
  struct msqid_ds info = {0};
  VALUE info_hash;
  VALUE cmd;
  sysvmq_t* sysv;

  // Optional argument handling
  if (argc > 1) {
    rb_raise(rb_eArgError, "Wrong number of arguments (0..1)");
  }

  // Default to IPC_STAT
  cmd = argc == 1 ? argv[0] : INT2FIX(IPC_STAT);

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  // TODO: Does FIX2INT actually perform this check already?
  Check_Type(cmd, T_FIXNUM);

  while (msgctl(sysv->id, FIX2INT(cmd), &info) < 0) {
    if (errno == EINTR) {
      rb_thread_wait_for(polling_interval);
      continue;
    }
    rb_sys_fail("Failed executing msgctl(2) command.");
  }

  // Map values from struct to a hash
  // TODO: Add all the fields
  // TODO: They are probably not ints..
  info_hash = rb_hash_new();
  rb_hash_aset(info_hash, ID2SYM(rb_intern("count")),         INT2FIX(info.msg_qnum));
  rb_hash_aset(info_hash, ID2SYM(rb_intern("maximum_size")),  INT2FIX(info.msg_qbytes));

  // TODO: Can probably make a better checker here for whether the struct
  // actually has the member.
  // TODO: BSD support?
#if defined(__linux__)
  rb_hash_aset(info_hash, ID2SYM(rb_intern("size")), INT2FIX(info.__msg_cbytes));
#elif defined(__APPLE__)
  rb_hash_aset(info_hash, ID2SYM(rb_intern("size")), INT2FIX(info.msg_cbytes));
#endif

  return info_hash;
}