Spawning workers using multiple threads for IO


#1

Hi all,

For better or worse I’m trying to record a video stream to disk on Windows. After finally achieving the basics and recording a test animation to a movie file on disk ( see issue here ), I now need to do the same with the camera stream in real time. I’m using a Mako IP camera from AVT and wrapping their Vimba SDK. I have a stream coming in at ~30fps and can convert each frame from a surface to a texture and draw it to the screen without any visible lag or delay. However, when I try recording the frames to disk using QuickTime’s addFrame() method in the main update() loop, the framerate drops significantly. I tried creating a vector to temporarily store the frames in memory until I’m ready to write, but the frames are 2064x1544, so I run out of memory within a few seconds and the app crashes.

The only way I can think of to get around this issue is multithreading… In pseudocode:
update(){
get frame from camera
convert frame to surface
spawn worker on detached thread
call addFrame() on thread
destroy worker
}

Is this logic flawed? I’ve tried looking through the Flickr multithreaded example, but I’m not necessarily trying to load/save files to disk; instead I’m trying to add frames to the qtime::MovieWriter instance. I assume I don’t need to use a ConcurrentCircularBuffer because I’m not trying to communicate between the worker threads and the main GL context.

Thanks


#2

I’ve gotten around this issue using a hybrid of the methods you mentioned. I write frames to a thread-safe queue ( boost::concurrent_circular_buffer<> or one of their lockfree queues ), and then have a worker thread that pops available frames out and does the IO. From memory, I ended up just dumping frames to disk and then using ffmpeg to reassemble them offline, since that was a viable option for me, but calling MovieWriter::addFrame should work as long as you pass a fixed frame duration.

Pseudo-ish code, written inline and not tested, just an example of what I’m waffling on about.

class App
{
    void setup()
    {
        // setup camera etc

        _recording = true;
        _worker = std::thread ( [this]
        {
            int nextFrame = 0; // only touched by the worker thread

            while ( _recording )
            {
                while ( !_queue.empty() )
                {
                    SurfaceRef frame = _queue.pop();
                    writeImage ( "Frame_" + std::to_string ( nextFrame++ ) + ".png", frame );
                    // or _writer->addFrame ( frame, 1.0f / 30.0f );
                }
                std::this_thread::yield(); // don't spin flat out while the queue is empty
            }
        } );
    }

    void update()
    {
        if ( _camera->checkNewFrame() )
        {
            _queue.push ( _camera->getSurface() );
        }
    }

    void cleanup()
    {
        _recording = false; // lets the worker loop exit
        if ( _worker.joinable() ) _worker.join();
    }

    ThreadSafeQueue     _queue; // Whichever one you go with
    std::thread         _worker;
    CaptureRef          _camera;
    std::atomic_bool    _recording { false };
};
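
For anyone reading later, here is a minimal standard-C++ sketch of the same producer/worker idea that compiles without Cinder. Everything in it is an assumption made for illustration: Frame stands in for a SurfaceRef, FrameQueue and runPipeline are hypothetical names, and the disk write is replaced by a counter. The queue is unbounded, so it does nothing about memory use; it only shows the worker sleeping on a condition variable instead of polling.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a camera frame; a real app would hold a SurfaceRef.
using Frame = std::shared_ptr<std::vector<unsigned char>>;

// Minimal blocking queue: pop() sleeps until a frame arrives or close() is called.
class FrameQueue {
public:
    void push(Frame f) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mFrames.push_back(std::move(f));
        }
        mCond.notify_one();
    }

    // Returns true with a frame in `out`; returns false once closed and drained.
    bool pop(Frame& out) {
        std::unique_lock<std::mutex> lock(mMutex);
        mCond.wait(lock, [this] { return !mFrames.empty() || mClosed; });
        if (mFrames.empty()) return false;
        out = std::move(mFrames.front());
        mFrames.pop_front();
        return true;
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mClosed = true;
        }
        mCond.notify_all();
    }

private:
    std::mutex mMutex;
    std::condition_variable mCond;
    std::deque<Frame> mFrames;
    bool mClosed = false;
};

// Pushes `count` dummy frames and returns how many the worker consumed.
std::size_t runPipeline(std::size_t count) {
    FrameQueue queue;
    std::size_t written = 0;

    std::thread worker([&] {
        Frame frame;
        while (queue.pop(frame)) {
            ++written;  // the disk write (writeImage / addFrame) would go here
        }
    });

    for (std::size_t i = 0; i < count; ++i)
        queue.push(std::make_shared<std::vector<unsigned char>>(16, static_cast<unsigned char>(i)));

    queue.close();  // no more frames: the worker drains the queue and exits
    worker.join();  // after join, `written` is safe to read from this thread
    return written;
}
```

Calling runPipeline(25) pushes 25 dummy frames and returns once the worker has consumed all of them. Closing the queue before joining is what lets the worker exit cleanly rather than being detached or killed.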

#3

Thanks! The logic makes sense and is close to what I’d managed to implement on my own. Using your advice, I managed to get the worker running as follows:

ConcurrentCircularBuffer<Surface8u>	*mFrameQueue;
std::thread				mWorkerThread;

setup(){

	mFrameQueue = new ConcurrentCircularBuffer<Surface8u>(90);

	mWorkerThread = std::thread([&] {
		console() << "CREATING WORKER THREAD " << std::endl;
		static int kNextFrame = 0;
		static bool running = true;
		while (running)
		{
			while (mFrameQueue->isNotEmpty()) {
				Surface8u frame;
				mFrameQueue->popBack(&frame);
				writeImage("Frame_" + std::to_string(kNextFrame++) + ".png", frame);
				//mMovieWriters[mMovieWriters.size() - 1]->addFrame(frame, 1.f / 30.f);
			}
		}
	});

}

update()
{
	mFrameQueue->pushFront(*mCameraManager.getSurface()); // checks internally if a new frame is available
	console() << "update: size of mFrameQueue: " << mFrameQueue->getSize() << std::endl;
}

It works initially, but relatively quickly it breaks with the following exception:

Microsoft C++ exception: std::bad_alloc at memory location 0x0019F8BC.

I assume this is because I run out of available memory? Through experimentation and logging to the console, it looks like my worker can write one frame for every 6 or 7 frames captured from the camera. As far as my understanding goes, a circular buffer starts to overwrite data once it reaches capacity… So say my circular buffer has capacity for 90 frames: once the 90 frames have been generated, it begins to overwrite them… Wouldn’t my worker then need to write frames as fast as they’re produced in order to prevent data being overwritten in the circular buffer? Also, if data is theoretically being overwritten and the capacity is fixed, shouldn’t that prevent memory leaks or running out of allocated memory?

Thanks again!
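
A rough back-of-the-envelope check on the memory question, as a small sketch. The channel count is an assumption (the thread does not say whether the frames are RGB or RGBA), and frameBytes and bufferBytes are hypothetical helper names.

```cpp
#include <cstddef>

// Bytes needed for one uncompressed frame with 8 bits per channel.
constexpr std::size_t frameBytes(std::size_t width, std::size_t height, std::size_t channels) {
    return width * height * channels;
}

// Bytes needed to hold `frames` such frames in memory at once.
constexpr std::size_t bufferBytes(std::size_t frames, std::size_t perFrame) {
    return frames * perFrame;
}

// 2064 x 1544 with 3 channels (assumed RGB): 9,560,448 bytes per frame.
constexpr std::size_t kRgbFrame = frameBytes(2064, 1544, 3);

// A full 90-slot buffer of such frames: 860,440,320 bytes, before any temporary copies.
constexpr std::size_t kRgbBuffer = bufferBytes(90, kRgbFrame);
```

With four channels the same 90 frames come to about 1.15 GB. If the app is built as a 32-bit process (an assumption; the short address in the error message is consistent with that but does not prove it), a full buffer of this size plus the extra copies made on push and pop could plausibly exhaust the address space on its own, even with a fixed capacity.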


#4

I’m not sure why you’re crashing, but one thing that is immediately obvious is that you’re doing a lot of copying of image data. Try and use a SurfaceRef everywhere you’re currently using a Surface8u and see if that makes any difference.


#5

For non-blocking behavior, you might want to use mFrameQueue->tryPushFront. This way, you will get the frame skipping behavior you are expecting.
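
To illustrate the idea behind a non-blocking push, here is a sketch in plain C++. BoundedQueue and countDropped are hypothetical names and this is not Cinder's implementation; it only shows the general pattern of refusing a new item when the buffer is full, so the producer skips a frame instead of waiting.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical bounded queue with non-blocking push and pop.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : mCapacity(capacity) {}

    // Returns false immediately (the item is dropped) when the queue is full.
    bool tryPush(T item) {
        std::lock_guard<std::mutex> lock(mMutex);
        if (mItems.size() >= mCapacity) return false;
        mItems.push_back(std::move(item));
        return true;
    }

    // Returns false immediately when the queue is empty.
    bool tryPop(T& out) {
        std::lock_guard<std::mutex> lock(mMutex);
        if (mItems.empty()) return false;
        out = std::move(mItems.front());
        mItems.pop_front();
        return true;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mMutex);
        return mItems.size();
    }

private:
    const std::size_t mCapacity;
    mutable std::mutex mMutex;
    std::deque<T> mItems;
};

// Simulates a producer with no consumer running: returns how many items were dropped.
std::size_t countDropped(std::size_t capacity, std::size_t produced) {
    BoundedQueue<int> queue(capacity);
    std::size_t dropped = 0;
    for (std::size_t i = 0; i < produced; ++i)
        if (!queue.tryPush(static_cast<int>(i))) ++dropped;
    return dropped;
}
```

In an update() loop the return value of the push is the place to count or log skipped frames, which also makes it easy to see how far the writer is falling behind the camera.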


#6

Thanks for the tip, that saves me having to handle it internally!


#7

You were quite right, though I don’t see much improvement in write speed, which is to be expected I suppose. I originally copied the surface every frame because I used an observer to notify my app every time a new frame was available.

A surface is created when a new frame becomes available ( or rather, a global Surface has its data updated, to save the constructor call every frame ). The observer then passes a reference to that surface to the main app, which in turn adds it to the queue. My thinking was that because I was passing a reference to a temporary object, I’d need to copy it before the data was overwritten when the next frame becomes available. So, for example, if I pass a reference to the newly acquired frame to the queue at frame 55 (random example), many frames might have passed by the time it gets written to disk / added to the moviewriter object on the thread, because the worker takes its time doing the IO. If the Surface8uRef that was added to the queue on frame 55 only gets processed on frame 90, the pointer no longer points to the data acquired at frame 55?


#8

That would depend on whether the camera updates an internal SurfaceRef or creates a new one altogether. If it creates a new one each time, your pushing it into a container merely extends its life via the ref-counting mechanism, but if it’s updating the same Surface you will have a collection of references to one surface, all showing the same data, and will have all kinds of race conditions on the IO thread.

If that is the case, you’ll need to push a copy of the data into your queue. It’s still a good idea to use a Surface8uRef because this prevents another copy when you’re popping it out of the container, and may help with your memory problems (assuming that’s what’s going wrong).

mFrameQueue->pushFront ( Surface8u::create ( mCameraManager.getSurface()->clone() ) );

A.
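
The difference between queueing the same ref-counted buffer and queueing a copy can be shown without Cinder. In this sketch PixelsRef is a hypothetical stand-in for Surface8uRef and the "camera" is just a loop that overwrites one buffer; none of the names come from Cinder or the Vimba SDK.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for Surface8uRef: a ref-counted pixel buffer.
using PixelsRef = std::shared_ptr<std::vector<unsigned char>>;

// Queues `frames` references to ONE buffer that the "camera" keeps overwriting.
std::vector<PixelsRef> queueShared(std::size_t frames) {
    auto cameraBuffer = std::make_shared<std::vector<unsigned char>>(4);
    std::vector<PixelsRef> queue;
    for (std::size_t i = 0; i < frames; ++i) {
        cameraBuffer->assign(4, static_cast<unsigned char>(i));  // new frame overwrites the old data
        queue.push_back(cameraBuffer);                           // only the ref count grows
    }
    return queue;
}

// Queues a deep copy per frame, so each entry keeps the data it had when pushed.
std::vector<PixelsRef> queueCloned(std::size_t frames) {
    auto cameraBuffer = std::make_shared<std::vector<unsigned char>>(4);
    std::vector<PixelsRef> queue;
    for (std::size_t i = 0; i < frames; ++i) {
        cameraBuffer->assign(4, static_cast<unsigned char>(i));
        queue.push_back(std::make_shared<std::vector<unsigned char>>(*cameraBuffer));
    }
    return queue;
}
```

After queueShared(3), every entry points at the same buffer and shows the last frame's data; after queueCloned(3), each entry holds its own snapshot. In a threaded app the shared version is also a data race, since the camera writes the buffer while the worker reads it.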