Cinder's io_service and async callbacks

Hi all,

I am chugging along optimising and refining some master/slave code. At the moment I am working with UDP multicast from the master to the slaves, although there will be bi-directional TCP traffic as well. I am trying to get my head around callbacks and threading in these cases.

Setting up the multicasting was my first time working with ASIO directly. This was made simple by Cinder maintaining an io_service for me. I just added async callbacks such as:

	udpSocket->async_receive_from(
			asio::buffer(udpData, udpMax), udpEndpoint,
			bind(&Slave::udpHandleReceive, this,
					std::placeholders::_1,//error
					std::placeholders::_2));//bytes_transferred

Inside this callback I then load the received data into a custom class instance:

void Slave::udpHandleReceive( const asio::error_code &error, size_t bytes_recvd )
{
	ci::BufferRef buf = ci::Buffer::create(udpData, bytes_recvd);
	bytesInUdp.processBuffer(buf);
}

I then access the bytesInUdp instance from another class to do things with the received data.

My question is: if I am using the one ASIO io_service for everything, does that mean that all these async calls can share the same resources and be thread safe? Or do I need to wrap bytesInUdp in a mutex whenever it is used?

I hope I made that clear! Any advice appreciated; once I resolve this for UDP I will take another look at TCP, which will be a lot more involved.

Cheers,
nay.

EDIT - I think this link indicates that the one io_service is thread safe, if I'm reading it correctly.

Hi nay,

To answer your question, the asio::io_service is not intrinsically 'threaded'. It is simply a thread-safe dispatcher. More specifically, Cinder's io_service instance is not threaded, i.e. not run on a separate thread. It is simply polled in Cinder's App::_privateUpdate (GitHub is down right now or I would've provided the link), which means that all of the handlers it has accumulated during the frame will be dispatched once their completion conditions have been achieved. To make the io_service threaded you'd need to run an io_service, not Cinder's, on a separate thread. Take a look at the SimpleMultiThreaded* samples in the OSC block to see what I mean.
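
For a concrete picture, a threaded setup looks roughly like this. This is a rough sketch rather than the actual sample code; the class and member names are placeholders, and the include path may differ in your setup:

// Rough sketch of running your own io_service on a worker thread, so handlers
// dispatch off the app thread. Not the OSC sample code; names are placeholders.
#include "asio/asio.hpp"
#include <memory>
#include <thread>

class ThreadedService {
  public:
    ThreadedService()
        : mWork( new asio::io_service::work( mService ) )    // keeps run() alive while idle
    {
        mThread = std::thread( [this] { mService.run(); } ); // handlers now fire on this thread
    }
    ~ThreadedService()
    {
        mService.stop();                                     // let run() return
        if( mThread.joinable() )
            mThread.join();
    }
    asio::io_service& service() { return mService; }

  private:
    asio::io_service                        mService;
    std::unique_ptr<asio::io_service::work> mWork;
    std::thread                             mThread;
};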

Further, in terms of thread-safety, asio::io_service is internally thread-safe, which means that you can dispatch, post, add functions to its queue, cancel, run, reset, etc. without data races on the io_service object itself. However, the io_service has no idea about external data access, like in the handler you provided in your example. Therefore, if you were writing to and reading from data on separate threads, i.e. threads that you started up and ran the io_service on, that data would need to be fully synchronized like anything multi-threaded.
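
For instance, if you ran the io_service on your own thread, anything like bytesInUdp that is written in the completion handler and read on the app thread would need a lock around it. A minimal sketch (PacketStore is just a made-up name, not Cinder API or your code):

#include <cstdint>
#include <mutex>
#include <vector>

// Sketch only: the kind of synchronization bytesInUdp would need if the
// completion handler ran on a worker thread.
class PacketStore {
  public:
    // called from the io_service (worker) thread
    void store( const uint8_t *data, size_t len )
    {
        std::lock_guard<std::mutex> lock( mMutex );
        mBytes.assign( data, data + len );
    }
    // called from the app thread
    std::vector<uint8_t> take()
    {
        std::lock_guard<std::mutex> lock( mMutex );
        return mBytes;
    }

  private:
    std::mutex           mMutex;
    std::vector<uint8_t> mBytes;
};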

Best,
Ryan Bartley


Thanks Ryan,
Seems I was confused as to whether creating an async callback spun up another thread behind the scenes that I needed to be careful with. In this case I think I am happy to use Cinder's io_service on the one thread.
Cheers,
nay.

Hello,

Just to add a comment, under the hood an io_service can utilize worker threads, but as Ryan pointed out the io_service interface you access (.poll) is just a thread safe dispatcher, so I doubt you would see any benefits from threading it.

Thanks, good to know.

At the moment I am seeing a performance issue where I have a slowly increasing backlog of received UDP messages. This is strange, as when I receive a message my code should be replacing any that are still waiting to be processed. I only need the most recent.

Am I correct in thinking that if multiple messages are received during a frame, the receive handler will just be called repeatedly until all messages are dealt with before the next frame? But in that case I shouldn't have this backlog.

I have not been able to find a way to flush the socket. I have seen some references online to making the max buffer size equal to one packet, although my packet size could vary. However, my current max buffer size is quite small (64 bytes) and I definitely end up with a backlog of messages too large to fit in that.

Any advice? I'm running out of ideas. Aside from that it's all running great!

I'm not sure I understand what you mean by backlog; as in, you have outdated messages waiting for you and your completion handler is called multiple times? If you have a backlog, is there not a way to determine the "last" message and only take that one?

I think what Nay wants to know is: has the socket code been implemented like this:
if( newMessage() ) notifyApplication( msg );
or this:
while( newMessage() ) notifyApplication( msg );

I remember from an issue long ago (2006-ish, my Macromedia Director period) that I wrote the first version, resulting in a “backlog” of unprocessed messages because only one message was processed per frame.

-Paul

When io_service::poll is called, all completion handlers (from operations that have completed) are called. If you only call read once per frame, my understanding (which could be wrong) is that only one read completion handler will be called. Often the technique is to call read recursively so that you empty out the messages you've received. Something like this…

void SomeClass::Read() {
    socket.async_receive( ... [&]( const asio::error_code &error, size_t bytesTransferred ) {
        if( ! error ) {
            // process message
            Read(); // read more
        }
    });
}

This calls the class's read function again from within the completion handler, thereby adding another completion handler to the io_service queue, which will be processed directly after you return from the current completion function, if you have more messages.

If you're already doing this and you are losing messages, as in you're never receiving them, then you need to make your buffer size larger, as they're probably getting dropped. It is also non-deterministic what you'll have in the UDP buffer, because a datagram is seen by the OS as non-essential. So, if your buffer is too small, newer datagrams may simply be dropped once it fills up.
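
One knob worth knowing about is the OS-level receive buffer, which ASIO exposes as a socket option. A sketch only, assuming your udpSocket from earlier and using an arbitrary example size:

// Sketch: the OS-side receive buffer can be enlarged via a socket option.
// The 64 KB figure is an arbitrary example, not a recommendation, and the OS
// may grant less than you ask for.
asio::socket_base::receive_buffer_size option( 65536 );
udpSocket->set_option( option );

// read back what the OS actually granted
asio::socket_base::receive_buffer_size granted;
udpSocket->get_option( granted );
CI_LOG_I( "receive buffer: " << granted.value() << " bytes" );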

I ran into a problem on a project a while ago, where I was sending data from multiple computers to a single computer and I wasn’t receiving A TON of the data. I broke each computer out to connect to a separate port on the receiving end and the problem was solved. Not saying that’s the fix here, just demonstrating that UDP will drop messages at will.

Thanks all,

@paul.houx - Thanks for clarifying what I was asking!

@ryanbartley - I was not calling read recursively, just once each frame, so that could fix my issue. It's not perfect, but I have just implemented something like your snippet (shown below) and have left my test setup running to see how it does. Fingers crossed!

I am happy to miss the occasional packet as long as the most recent is processed. In this case I have a master machine multicasting a series of key/value pairs to 5 slave machines each frame. The slaves use these to sync their animation timelines. I plan to add TCP later for other types of messages.


void Slave::udpRead() {
	udpSocket->async_receive_from(
			asio::buffer(udpData, udpMax), udpEndpoint,
			bind(&Slave::udpHandleReceive, this,
					std::placeholders::_1,   //error
					std::placeholders::_2)); //bytes_transferred
}

void Slave::udpHandleReceive( const asio::error_code &error, size_t bytes_recvd )
{
	if (error) {
		CI_LOG_E("UDP error");
	}
	else {
		// if I have old messages extracted from previously received buffers I delete them here

		ci::BufferRef buf = ci::Buffer::create(udpData, bytes_recvd);

		// process new received buffer into messages

		udpRead();
	}
}

For sure. If you were only calling read once, it would've only taken the first x bytes from the buffer, and the system buffer is FIFO. As soon as the buffer is filled, no more datagrams are accepted. So you'd need to recursively read out the buffer to get to that final message, as well as to allow more messages to come through. I'd also recommend that this happen on a separate thread. That way, when you come around to a new frame, you take whatever is on your stack as the latest and don't miss a beat (see the rough sketch below). Another way you could do it is to have the clients send the server a done signal and then wait for the server to send a go signal. This is the type of setup you'd need if you're trying to keep the clients synced.
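
Something along these lines is what I mean by taking the latest each frame. A rough sketch only: mLatestMutex and mLatestBuffer are assumed members, and the io_service here is one you run on your own thread rather than Cinder's:

#include <mutex>

// Sketch of "keep only the newest datagram": the handler (worker thread) just
// stashes the most recent buffer, and update() (app thread) consumes it once
// per frame.
void Slave::udpHandleReceive( const asio::error_code &error, size_t bytes_recvd )
{
	if( ! error ) {
		auto buf = ci::Buffer::create( udpData, bytes_recvd );
		{
			std::lock_guard<std::mutex> lock( mLatestMutex );
			mLatestBuffer = buf;   // overwrite anything older
		}
		udpRead();                 // queue the next receive immediately
	}
}

void Slave::update()
{
	ci::BufferRef buf;
	{
		std::lock_guard<std::mutex> lock( mLatestMutex );
		buf = mLatestBuffer;
		mLatestBuffer.reset();     // consumed for this frame
	}
	if( buf ) {
		// process only the most recent message here
	}
}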