Receive stream using ASIO TCP


#1

Hello cinder universe,

I’m trying to connect to a push API on a local network that requires an initial subscription, then spits out streaming data as events occur. I’ve managed to get a version working in Python that sends the initial socket message and then instantiates a server that listens on the port targeted by the event stream. The API only accepts TCP connections, as far as I can tell. I’m trying to integrate into Cinder so that I can make use of the data coming from the API. I’ve been using the Cinder-Asio block. I’ve separated it into different apps because I got confused about how the session is handled when the app has a TcpClient and TcpServer together.

The workflow should be as follows:

  • Send message over TCP telling the API I want to subscribe
  • Create a TCP server to listen for data
  • Process data as it arrives

I’ve been successful in sending the subscribe after creating a TCP client:

   // tell the API that we're subscribing and will be listening on port 4444
    if ( mSession && mSession->getSocket()->is_open() ) {
        mJson.addChild( JsonTree("command", "auto_subscribe") );
        mJson.addChild( JsonTree("port", 4444));
        std::string mString = mJson.serialize();
        mSession->write( TcpSession::stringToBuffer( mString ) );
}

Once I’ve done this, I know that data is being pushed because I can read it using my python script without having to send the subscription message again.

So I’m trying to create a TcpServer, listen on the desired port, and read the data. From what I understand, I shouldn’t be closing the session or canceling the server after each message is received, but I never seem to receive any data from the onRead handler, instead only the onReadComplete handler ever fires. The TcpSession appears to have a TcpSocket as a member, which I would think should be handling be used to keep the connection open, but I’m not sure if/how it does.

The console output looks as follows:

Listening on port: 3445
Connected
Read Complete
Connected
Read Complete
Connected
Read Complete
Connected
Read Complete
Connected
etc...

Here’s the full app:


#include "CinderAsio.h"
#include "cinder/app/App.h"
#include "cinder/Font.h"
#include "cinder/gl/gl.h"
#include "cinder/params/Params.h"
#include "cinder/Json.h"
#include "TcpServer.h"

class TcpServerApp : public ci::app::App
{
public:
    void                        draw() override;
    void                        setup() override;
    void                        update() override;
    void                        subscribe();
    
private:
    void                        accept();
    TcpServerRef                mServer;
    TcpSessionRef               mSession;
    TcpSocketRef                mSocket;
    int32_t                     mPort;
    
    void                        onAccept( TcpSessionRef session );
    void                        onCancel();
    void                        onClose();
    void                        onError( std::string err, size_t bytesTransferred );
    void                        onRead( ci::BufferRef buffer );
    void                        onReadComplete();
    void                        onWrite( size_t bytesTransferred );
    
    ci::JsonTree                mJson;
};

#include "cinder/app/RendererGl.h"
#include "cinder/Text.h"
#include "cinder/Utilities.h"

using namespace ci;
using namespace ci::app;
using namespace std;


void TcpServerApp::setup()
{
    mPort = 3445;
    mServer = TcpServer::create( io_service() );
    mServer->connectAcceptEventHandler( &TcpServerApp::onAccept, this );
    mServer->connectCancelEventHandler( &TcpServerApp::onCancel, this );
    mServer->connectErrorEventHandler( &TcpServerApp::onError, this );
    accept(); // start listening
}

void TcpServerApp::update()
{
    // App's IO_service polls automatically
}

void TcpServerApp::accept()
{
    if ( mServer ) {
        mServer->accept( (uint16_t)mPort );
        cout << "Listening on port: " + toString( mPort ) << std::endl;
    }
}

void TcpServerApp::subscribe() {
    mJson.addChild( JsonTree("command", "auto_subscribe"));
    mJson.addChild( JsonTree("port", 3445));
    std::string mString = mJson.serialize();
    mSession->write( TcpSession::stringToBuffer( mString));
}

void TcpServerApp::draw()
{
    gl::clear( Colorf::black() );
}

void TcpServerApp::onAccept( TcpSessionRef session )
{
    cout << "Connected" << std::endl;
    mSession = session;
    
    mSession->connectCloseEventHandler( &TcpServerApp::onClose, this );
    mSession->connectErrorEventHandler( &TcpServerApp::onError, this );
    mSession->connectReadCompleteEventHandler( &TcpServerApp::onReadComplete, this );
    mSession->connectReadEventHandler( &TcpServerApp::onRead, this );
    mSession->connectWriteEventHandler( &TcpServerApp::onWrite, this );

//    subscribe();
    
    // Start reading data from the client.
    mSession->read();
  
}

void TcpServerApp::onCancel()
{
    cout << "Canceled" << std::endl;
    accept();
}

void TcpServerApp::onClose()
{
   cout <<"Disconnected" << std::endl;
   accept();
}

void TcpServerApp::onError( string err, size_t bytesTransferred )
{
     if ( !err.empty() ) cout <<"Error: " << err <<std::endl;
}

void TcpServerApp::onRead( BufferRef buffer )
{
    cout << "reading" << std::endl;
    cout<< toString( buffer->getSize() ) + " bytes read" << std::endl;
    string response    = TcpSession::bufferToString( buffer );
    cout<< response << std::endl;
}

// This event is triggered when the connection is closed
// remotely.
void TcpServerApp::onReadComplete()
{
    cout << "Read Complete" << std::endl;

//     we don't want to close the connection just yet...
//    mSession->close();
//    mServer->cancel();
}

void TcpServerApp::onWrite( size_t bytesTransferred )
{
    cout << toString( bytesTransferred ) + " bytes written" << std::endl;
    // Read after writing to look for an EOF, or disconnect signal from the client.
    mSession->read();
}


CINDER_APP( TcpServerApp, RendererGl )


Please help!


#2

Does adding a mSession->read() in onReadComplete help?


#3

Hi @balachandran_c,

That makes no difference. Console output is now:

Listening on port: 3445
[onAccept] Connected
Read Complete
Read Complete
Read Complete
Read Complete
Read Complete
etc...

It seems counter-intuitive that the onReadComplete fires without onRead firing. Event if the buffer was empty, it should say as such.

I also tried adding a read to the update() loop:

 if (mConnected && mSession->getSocket()->is_open()) mSession->read();

But that also didn’t help.


#4

The python socket is not doing anything special:

class EventHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        self.request.settimeout(2)
        data = ""
        buf = self.request.recv(4096)
        while buf != "":
            data += buf
            buf = self.request.recv(4096)
        try:
            data = json.loads(data)
            print(json.dumps(data, indent=4))
            return
        except ValueError:
            pass
        print('Not JSON data. {}'.format(data))

It’s also a TcpServer, so I can’t see why the Asio server behaves differently.


#5

Could you share the bit where you are sending data from the python side too?


#6

The push API I’m interacting with is a black box. I wrote the python script to prototype before moving to Cinder, so the python server is performing the same task I want my Cinder App to do.


#7

Is it possible that in this case the server is closing the connection before sending data?


#8

I thought that could be a possibility - but then the standard Asio TcpServer sample should work, as the Client sample is set to behave that way…


#9

Should subscribe() be commented out in accept() ?


#10

Yes, the subscribe message only needs to be sent once, at which point the server hosting the API remembers the IP and port where it needs to stream data. I can test this by running the python script and watch data stream in.


#11

I think you should only use the client. In this setup the remote server will response to the client and as long nobody closes the connection the client will receive messages.
The client example has a method onRead
void TcpClientApp::onRead( ci::BufferRef buffer ).