Jump to content

Programming Reference:SignalSharingDemoClient C++ App: Difference between revisions

From BCI2000 Wiki
Mellinger (talk | contribs)
Mellinger (talk | contribs)
Line 17: Line 17:
File:SignalSharingDemo2.PNG|Client application displaying data
File:SignalSharingDemo2.PNG|Client application displaying data
</gallery>
</gallery>
==Filter Code==
The code example uses a pointer to an internal private <tt>struct</tt> to hide implementation details from the outer header file of the filter class (PIMPL idiom).
===Declaration of internal variables===
<syntaxhighlight lang="cpp">
#include "Thread.h"
#include "WaitableEvent.h"
#include "Sockets.h"
#include "Streambuf.h"
struct SignalSharingDemoFilter::Private
{
  // A socket for a connection to the client application.
  ClientTCPSocket mSocket;
  // A signal to be shared.
  GenericSignal mSharedSignal;
  // An event to notify the sender thread when new signal data is available.
  WaitableEvent mSignalAvailable;
  // An object to wrap a call to SenderThreadFunc().
  MemberCall<void(Private*)> mSenderThreadCall;
  // The sender thread.
  Thread mSenderThread;
  Private();
  void SenderThreadFunc();
};
</syntaxhighlight>
===Initialization of internal variables===
<syntaxhighlight lang="cpp">
SignalSharingDemoFilter::Private::Private()
: mSenderThreadCall(&Private::SenderThreadFunc, this),
  mSenderThread(&mSenderThreadCall, "Signal Sharing Demo Sender"),
  mSignalAvailable(true)
{
  mSocket.SetTCPNodelay(true);
  mSocket.SetFlushAfterWrite(true);
}
</syntaxhighlight>
===Sender thread===
The sender thread waits for the "signal available" event to be triggered, and then calls <tt>Serialize()</tt> on the shared signal in order to notify the client.
Using an extra thread is not a must but moves any time delay due to sending data over the network from the main thread into the extra thread. Otherwise, a slow connection or a large amount of data would contribute to the system's [[User_Reference:Timing#Measuring_Source-to-Stimulus_Delay|processing delay]].
<syntaxhighlight lang="cpp">
void SignalSharingDemoFilter::Private::SenderThreadFunc()
{
    if (mSocket.Connected())
    {
        UnbufferedIO buf;
        buf.SetOutput(&mSocket.Output());
        // A message channel wraps up BCI2000 objects into BCI2000 messages.
        MessageChannel messageChannel(buf);
        messageChannel.Send(mSharedSignal.Properties()); // send signal properties first
        while (mSignalAvailable.Wait()) // will return false when Thread::Terminate() has been called
        {
            if (!messageChannel.Send(mSharedSignal)) // will transmit memory reference, or signal, depending on
                                                    // whether ShareAcrossModules() has been called
            {
                bciwarn << "Could not send signal, giving up";
                mSenderThread.Terminate();
            }
        }
        messageChannel.Send(GenericSignal()); // empty signal to indicate end of data
    }
}
</syntaxhighlight>
===Filter <tt>Initialize()</tt> function===
The filter connects to the local or remote network address in the <tt>SignalSharingDemoClientAddress</tt> parameter and allocates signal data according to input
signal properties. If the connection is through a local socket, the filter switches the signal to shared memory by calling <tt>GenericSignal::ShareAcrossModules()</tt>. Otherwise, it proceeds with a standard <tt>GenericSignal</tt>, which is transmitted in full rather than as a shared memory reference.
<syntaxhighlight lang="cpp">
void
SignalSharingDemoFilter::Initialize(const SignalProperties& Input, const SignalProperties& Output)
{
  std::string addr = Parameter("SignalSharingDemoClientAddress");
  if(!addr.empty())
  { // connect to the client side
    p->mSocket.Open(addr);
    // initialize shared signal from input signal properties
    p->mSharedSignal = GenericSignal(Input);
    if(!p->mSocket.Connected())
    {
      bciwarn << "Cannot connect to " << addr;
    }
    else if(p->mSocket.Connected() == Socket::local) // only use shared memory when connected locally
    {
      p->mSharedSignal.ShareAcrossModules();
      bciwarn << "Locally connected to " << addr << ", using shared memory";
    }
    else // otherwise, full data will be transmitted over network (slower)
    {
      bciwarn << "Remotely connected to " << addr << ", transmitting data through network";
    }
  }
}
</syntaxhighlight>
===Filter <tt>StartRun()</tt> and <tt>StopRun()</tt> functions===
<tt>StartRun()</tt> and <tt>StopRun()</tt> start resp. stop the sender thread.
<syntaxhighlight lang="cpp">
void
SignalSharingDemoFilter::StartRun()
{
  p->mSenderThread.Start();
}
void
SignalSharingDemoFilter::StopRun()
{
  p->mSenderThread.Terminate();
}
</syntaxhighlight>
===Filter <tt>Process()</tt> function===
The <tt>Process()</tt> function calls <tt>GenericSignal::AssignValues()</tt> to assign the shared signal's entries without affecting its shared memory representation. Then, it sets the "signal available" event to the signaled state.
<syntaxhighlight lang="cpp">
void
SignalSharingDemoFilter::Process(const GenericSignal& Input, GenericSignal& Output)
{
  Output = Input;
  p->mSharedSignal.AssignValues(Input);
  p->mSignalAvailable.Set();
}
</syntaxhighlight>


==Client application code==
==Client application code==

Revision as of 13:15, 2 August 2024

Location

src/core/SignalProcessing/SignalSharingDemo

Synopsis

The SignalSharingDemo client demonstrates how to receive signal data from BCI2000, using shared memory. SignalSharing allows to tap into BCI2000 processing by receiving any filter output signal through a combination of a TCP connection, and shared memory.

Function

The SignalSharing component in BCI2000 shares its input signal through a GenericSignal object which has been linked to a shared memory block using GenericSignal::ShareAcrossModules(). A dedicated thread waits for signal updates, and sends signal data out to a separate application waiting on a TCP/IP connection.

When the client application is running on a separate machine, full signal data are sent over the network. When the client is running on the same machine, only a reference to a shared memory block is sent. On the application side, unserializing the signal will transparently bind it to the shared memory block if available.

The client application visualizes signal data by plotting normalized signals on a circle.

Client application code

Declaration of internal variables

#include "SignalSharingDemoWidget.h"

#include "Sockets.h"
#include "StringUtils.h"
#include "Streambuf.h"
#include "Thread.h"
#include "GenericSignal.h"
#include "Synchronized.h"
#include "Runnable.h"

#include <QPaintEvent>
#include <QPainter>

struct SignalSharingDemoWidget::Private
{
  SignalSharingDemoWidget* mpSelf;
  std::vector<QColor> mSignalColors;

  ServerTCPSocket mListeningSocket;
  Synchronized<bool> mConnected;
  SynchronizedObject<GenericSignal> mpSignal;

  Thread mThread;
  void ThreadFunc();
  MemberCall<void(Private*)> mThreadCall;
  void Invalidate();
  Private();
};

Initialization of internal variables

SignalSharingDemoWidget::Private::Private()
: mThreadCall(&Private::ThreadFunc, this),
  mThread(&mThreadCall, "SignalSharingDemoWidget listening/receiving thread")
{
  mSignalColors.resize(8);
  for(int i = 0; i < mSignalColors.size(); ++i)
    mSignalColors[i].setHsvF(i*1.0/mSignalColors.size(), 1, 0.9);
}

Receiving thread function

By inheriting from the MessageChannel class, the Private object will receive dispatched messages through overridden functions:

struct SignalSharingDemoWidget::Private : MessageChannel
{
    ...

    // MessageChannel overrides
    bool OnVisSignalProperties(std::istream&) override;
    bool OnVisSignal(std::istream&) override;
};

void SignalSharingDemoWidget::Private::ThreadFunc()
{
    while (!mThread.Terminating())
    { // wait for a connection
        while (mListeningSocket.Input().Wait())
        { // accept pending connection
            ClientTCPSocket clientSocket;
            if (mListeningSocket.WaitForAccept(clientSocket, 0))
            {
                mConnected = true;
                Invalidate();
                mBuffer.SetInput(&clientSocket.Input());
                MessageBuffer::ClearIOState();
                while (clientSocket.Input().Wait()) // will be interrupted by Thread::Terminate()
                {
                    MessageChannel::HandleMessage(); // will dispatch to our overridden functions
                                                     // as appropriate
                }
                *mpSignal.Mutable() = GenericSignal();
                mConnected = false;
                Invalidate();
            }
        }
    }
}

bool SignalSharingDemoWidget::Private::OnVisSignalProperties(std::istream& is)
{
    VisSignalProperties properties; // signal properties are wrapped into VisSignalProperties
    properties.Unserialize(is);
    // The only information we need from signal properties is the sampling rate
    // because the signal's dimensions will be transported by signal messages as well.
    double sampleDuration = properties.SignalProperties().ElementUnit().RawToPhysicalValue(1);
    mSamplingRate = 1.0 / sampleDuration;
    return true; // indicate we read our data from the stream
}

bool SignalSharingDemoWidget::Private::OnVisSignal(std::istream& is)
{
    VisSignal visSignal; // signals are wrapped into VisSignal messages
    visSignal.Unserialize(is);
    mpSignal.Mutable()->AssignValues(visSignal.Signal()); // get current signal content
    Invalidate(); // request window update
    return true; // indicate we read our data from the stream
}

The widget's paintEvent()

void
SignalSharingDemoWidget::paintEvent(QPaintEvent* ev)
{
  ev->accept();
  WithLocked(pSignal = p->mpSignal.Const()) // lock the signal while reading from it
  {
    if(pSignal->Empty())
    {
      QPainter painter(this);
      painter.fillRect(ev->rect(), Qt::gray);
      painter.setPen(Qt::white);
      painter.drawText(geometry(), Qt::AlignCenter, 
        p->mConnected ? "Waiting for signal ..." : "Waiting for connection ...");
    }
    else
    {
       // draw some visualization into the widget
       ...
    }
  }
}

Parameters

SignalSharingDemoClientAddress

IP address and port number of the client application.

See also

User Reference:SignalSharing, Programming Reference:GenericSignal Class, SignalSharing Python Demo