Skip to content

Commit

Permalink
prepare for initial release
Browse files Browse the repository at this point in the history
  • Loading branch information
OWarneke committed Mar 19, 2021
1 parent 873571b commit c804f61
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 124 deletions.
5 changes: 5 additions & 0 deletions Examples/BufferedExchange/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 1,5 @@
bin/
external/
intermediate/
CmakeLists.txt
plcnext.proj
106 changes: 95 additions & 11 deletions Examples/BufferedExchange/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 6,15 @@
3. [Preconditions](#preconditions)
4. [Project compiling in Eclipse](#project-compiling-in-eclipse)
5. [PLCnext Engineer project](#plcnext-engineer-project)
6. [Project Execution](#project-execution)
6. [Application Description](#application-description)
7. [FileStream Operations ](#filestream-operations)
8. [Exception Handling](#exceptions)
9. [General Notes](#general-notes)
<!-- /TOC -->

# Introduction

This Example shows how to use a FileStream to check the binary generation timestamp whenever a Component is reinitialized.
This example shows how to transfere and process larger amounts of data from your C RealTime application without impacing the RealTime.

## Example details

Expand Down Expand Up @@ -44,23 44,107 @@ This Example shows how to use a FileStream to check the binary generation timest
1. Instantiate the "BufferedExchangeProgram" under a previously defined task.
1. Download the PLCnext Engineer project to the PLCnext Control.

## Project Execution
## Application Description

In this Project we show how to publish data from a [RealTimeProgram](src/BufferedExchangeProgram.cpp) in a cyclic manner and store them in a component for further processing inside a thread.
In this Project we show how to publish data from a [RealTimeProgram](src/BufferedExchangeProgram.cpp) in a cyclic manner and storage them in the [PLMComponent](src/BufferedExchangeComponent.hpp) for further processing. The processing happens inside a thread and by doing that we decouple this processing from the RealTime sceduling of the Program.


The [class MyWorker](src/MyWorker.hpp ) holds all the data and methods the Thread is interacting with.
It implenents methods for each Type of thread as well as an interface to publish data.
The class [MyWorker](src/MyWorker.hpp ) holds all the data and methods the Thread is interacting with.
It implenents methods for each Type of thread as well as an interface to recive data.

The data storage is realised by swaping two queues.
One to recieve data and another Queue for processing.
One `std::Queue` to recieve data and another `std::Queue` for processing.
Whenever data are added or beeing processed the Queues are locked up using a Mutex to aussure thread safety.
This process works fine in a single producer/single consumer environment.
As long as the time requirement or the forwarded data are not to large it will also work fine with multiple producers.

Watch out the `SetData()` function is blocking, it actively waits for the Mutex to be locked and might infulence your RealTime.
You can switch the Lock() to TryLock() and add a timeout loop if necessary.
Watch out the `SetData()` function is curently implemented as blocking, it actively waits for the Mutex to be locked and this might infulence your RealTime. If you are producing more data then you can handle.
You can switch this Blocking behavior from `Mutex.Lock()` to `Mutex.TryLock()` and add a timeout loop if necessary.
e.g
```cpp
while(!Mutex.TryLock() && !timeout){}
```

```cpp
///BufferedExchangeComponent.hpp
MyWorker wD; // Thread execution environment
Thread delegateThread;
...
///BufferedExchangComponent.cpp
delegateThread(ThreadSettings("-DelegateThread", 20, 0, 0), Arp::make_delegate(&wD, &BufferedExchange::MyWorker::Run))
```
This means that the `MyWorker::Run` method of the instance `wD` is called.
Every Task Cycle the BufferedExchangeProgram::Exceute() method pushes data to the MyWorker instance `wD`.
```cpp
/// BufferedExchangeProgram.cpp
bufferedExchangeComponent.wD.SetData(count)
```
Every 1000 Cycles the Execute() method will print a hint that it is still running. If data can not be set a Warning is beeing written to the Output.log file.
When data are beeing stored we also store a timestamp that shows the passed time since the last data were added.
```cpp
data_storage.push(std::pair<double, int>( duration_cast<duration<double>>(time - last_time).count(), x));
```

The method `MyWorker::process()` is responsible for processing the collected data.
It prints the first and last element of a queue, this way we are able to compare the timestamps of the the critical area of swaping between `Queue::data_storage` and `queue::data_toproceess`. This timestamp should always approximatly show the TaskCycle of the RealTime sheduled Program.

### Example Output
This Example Output shows a ESM Task with a cycle time of 1 ms.
Because the Thread sleep time is set to 1000ms and the MAX_QUEUE_SIZE is set to 1000 elements this is an area where a slight jitter in the cycletime of the Thread (sleep(999) execution time of process()) can cause an OutOfMemory exception to occure.
```cpp
//"Endless loop" for normal thread
void Run(void *t)
{
while (!this->Stop)
{
RunSingle();
// Sleep for X ms until next RunSingle execution.
Arp::Thread::Sleep(999);
}
};
```
So if processing the stored data plus the sleep time results in more then 1000ms the storage might overflow.
It is important that you take these cases as well as external factors (another application might take all the free memory!) into consideration and handle them according to your application requirements.
It also is recomended to set fixed limits for your application storage size as this will make testing the boundaries much easier.
Also other users of your library will be able to determine how much resources your appliication requires at max. and set the dimension of their applicaiton.
```bash
27.03.18 17:36:15.417 BufferedExchange.BufferedExchangeProgram INFO - ---------------- Execute: 0
27.03.18 17:36:15.433 root INFO - -----------------DelegateThread Swap queue pointers data_store<<-->>data_toprocess
27.03.18 17:36:16.417 BufferedExchange.BufferedExchangeProgram INFO - ---------------- Execute: 1000
27.03.18 17:36:16.434 root INFO - ---------------- -DelegateThread firstElement: time:362614 data:1
27.03.18 17:36:16.435 root INFO - -----------------DelegateThread lastElement time:0.00099717 data:17
27.03.18 17:36:16.435 root INFO - -----------------DelegateThread Swap queue pointers data_store<<-->>data_toprocess
27.03.18 17:36:17.417 BufferedExchange.BufferedExchangeProgram INFO - ---------------- Execute: 2000
27.03.18 17:36:17.435 root INFO - ---------------- -DelegateThread firstElement: time:0.0012245 data:18
27.03.18 17:36:17.436 root INFO - -----------------DelegateThread lastElement time:0.00102634 data:1018
27.03.18 17:36:17.436 root INFO - -----------------DelegateThread Swap queue pointers data_store<<-->>data_toprocess
27.03.18 17:36:18.417 BufferedExchange.BufferedExchangeProgram INFO - ---------------- Execute: 3000
27.03.18 17:36:18.436 root INFO - ---------------- -DelegateThread firstElement: time:0.00120248 data:1019
27.03.18 17:36:18.437 root INFO - -----------------DelegateThread lastElement time:0.00100479 data:2019
27.03.18 17:36:18.437 root INFO - -----------------DelegateThread Swap queue pointers data_store<<-->>data_toprocess
27.03.18 17:36:19.417 BufferedExchange.BufferedExchangeProgram INFO - ---------------- Execute: 4000
27.03.18 17:36:19.437 root INFO - ---------------- -DelegateThread firstElement: time:0.00130645 data:2020
27.03.18 17:36:19.438 root INFO - -----------------DelegateThread lastElement time:0.00100336 data:3020
27.03.18 17:36:19.444 root ERROR - --- Exception in ThreadProcess:T_FastTask
27.03.18 17:36:19.444 root ERROR - --- DataStore has to many elements already!
27.03.18 17:36:19.444 root ERROR - --- Process Queue:0, Store Queue:1001
27.03.18 17:36:19.444 BufferedExchange.BufferedExchangeProgram WARN - -------------- Instance:BufferedExchangeComponent1/BufferedExchangeProgram1 DataLost: 4022
27.03.18 17:36:19.445 root INFO - -----------------DelegateThread Swap queue pointers data_store<<-->>data_toprocess
27.03.18 17:36:20.423 BufferedExchange.BufferedExchangeProgram INFO - ---------------- Execute: 5000
27.03.18 17:36:20.445 root INFO - ---------------- -DelegateThread firstElement: time:0.00134554 data:3021
27.03.18 17:36:20.446 root INFO - -----------------DelegateThread lastElement time:0.00101738 data:4021
27.03.18 17:36:21.423 BufferedExchange.BufferedExchangeProgram INFO - ---------------- Execute: 6000
27.03.18 17:36:22.423 BufferedExchange.BufferedExchangeProgram INFO - ---------------- Execute: 7000
27.03.18 17:36:23.423 BufferedExchange.BufferedExchangeProgram INFO - ---------------- Execute: 8000
```

## General notes:
Please make sure that your Buffer meets your speed requirements and implement a overflow exception.
You have to be able to process the data faster then they are created or else this setup will create issues.

You have to be able to process the data faster then they are beeing created or else this setup will create issues.

The logging is used mainly for demonstration purposes in this application in productive systems it should now be used as excecively e.g. Change `Log.Info()` to `Log.Debug()`.
62 changes: 29 additions & 33 deletions Examples/BufferedExchange/src/BufferedExchangeComponent.cpp
Original file line number Diff line number Diff line change
@@ -1,21 1,17 @@
#include "BufferedExchangeComponent.hpp"
#include "BufferedExchangeComponent.hpp"
#include "Arp/Plc/Commons/Esm/ProgramComponentBase.hpp"
#include "BufferedExchangeLibrary.hpp"

namespace BufferedExchange {

BufferedExchangeComponent::BufferedExchangeComponent(IApplication &application,const String &name) :
ComponentBase(application,::BufferedExchange::BufferedExchangeLibrary::GetInstance(), name,ComponentCategory::Custom),
BufferedExchangeComponent::BufferedExchangeComponent(IApplication &application,
const String &name) :
ComponentBase(application,::BufferedExchange::BufferedExchangeLibrary::GetInstance(),name, ComponentCategory::Custom),
programProvider(*this),
ProgramComponentBase(::BufferedExchange::BufferedExchangeLibrary::GetInstance().GetNamespace(), programProvider),
ProgramComponentBase(::BufferedExchange::BufferedExchangeLibrary::GetInstance().GetNamespace(), programProvider),
//
delegateThread(ThreadSettings("-DelegateThread", 20, 0, 0), Arp::make_delegate(&wD, &MyWorker::Run)),
//
staticThread(ThreadSettings("-StaticThread", 20, 0, 0), &MyWorker::RunStatic,(void*)&wS),
//
workerThread(make_delegate(&wT, &MyWorker::RunSingle), 1000,"-WorkerThread")
{
log.Info("------------------- BufferedExchangeComponent Constructor");
delegateThread(ThreadSettings("-DelegateThread", 0, 0, 0), Arp::make_delegate(&wD, &BufferedExchange::MyWorker<long int>::Run)) {
log.Info("-------------------BufferedExchangeComponent Constructor");
}

void BufferedExchangeComponent::Initialize() {
Expand All @@ -40,52 36,52 @@ void BufferedExchangeComponent::SetupConfig() {
void BufferedExchangeComponent::ResetConfig() {
// never remove next line
ProgramComponentBase::ResetConfig();
log.Info("--- ResetConfig");
log.Info("---------------- ResetConfig");

// implement this inverse to SetupConfig() and LoadConfig()
}

void BufferedExchangeComponent::Start(void) {
log.Info("--- Start");
log.Info("---------------- Start");
try {
wT.Stop = false;
workerThread.Start();

wS.Stop = false;
staticThread.Start();

wD.Stop = false;
delegateThread.Start();
} catch (Exception &e) {
log.Error("--- Error thread start:{0}", e.GetMessage());
log.Error("---------------- Error thread start:{0}", e.GetMessage());
}
}

void BufferedExchangeComponent::Stop(void) {
log.Info("--- Stop:");
log.Info("----------------Stop:");
try {
StopWT(wT, workerThread);
StopT(wD, delegateThread);
StopT(wS, staticThread);

} catch (Exception &e) {
log.Error("--- Error thread Stop:{0}", e.GetMessage());
log.Error("---------------- Error thread Stop:{0}", e.GetMessage());
}
}

void BufferedExchangeComponent::StopWT(MyWorker &W, WorkerThread &T) {
log.Info("--- Thread:{0} Running:{1} ", "Worker", T.IsRunning());
template<typename S>
void BufferedExchangeComponent::StopWT(MyWorker<S> &W, WorkerThread &T) {
log.Info("---------------- Thread:{0} Running:{1} ", "Worker",
T.IsRunning());
W.Stop = true;
// Stopping WorkerThread synchronously.
T.Stop();
}
template<typename S>
void BufferedExchangeComponent::StopT(MyWorker<S> &W, Thread &T) {
log.Info("---------------- Thread:{0} Running:{1} Joinable:{2}",
T.GetName(), T.IsRunning(), T.IsJoinable());

void BufferedExchangeComponent::StopT(MyWorker &W, Thread &T) {
log.Info("--- Thread:{0} Running:{1} Joinable:{2} }",
T.GetName(), T.IsRunning(), T.IsJoinable());
// Stopping thread synchronously.
// Stopping thread loops
W.Stop = true;
T.Interrupt();

// If thread is still running after setting "Stop" call interrupt
// to ensure thread shutdown.
if (T.IsRunning()){
T.Interrupt();
}

// Wait for the thread to finish.
if (T.IsJoinable()) {
T.Join();
}
Expand Down
18 changes: 9 additions & 9 deletions Examples/BufferedExchange/src/BufferedExchangeComponent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 55,19 @@ class BufferedExchangeComponent: public ComponentBase,

private: // fields
BufferedExchangeComponentProgramProvider programProvider;

public:
void StopWT(MyWorker &W, WorkerThread &T);
void StopT(MyWorker &W, Thread &T);
public:
MyWorker wT;
MyWorker wS;
MyWorker wD;
MyWorker<long int> wD; // Thread execution environment
int RunCounter { 0 };
Thread delegateThread;


public:
WorkerThread workerThread;
Thread delegateThread;
Thread staticThread;
template<typename S>
void StopWT(MyWorker<S> &W, WorkerThread &T);

template<typename S>
void StopT(MyWorker<S> &W, Thread &T);
};

inline IComponent::Ptr BufferedExchangeComponent::Create(Arp::System::Acf::IApplication &application, const String &name) {
Expand Down
26 changes: 12 additions & 14 deletions Examples/BufferedExchange/src/BufferedExchangeProgram.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 14,19 @@ namespace BufferedExchange
}
count ;

//Watch out blocking!
// if (bufferedExchangeComponent.wT.SetData(count))
//Watch out this is blocking!
if(!error_LastCycle || retry )
{
log.Warning("-------------- Instance:{1} DataLost: {0} ", count, this->GetFullName());
}

if (bufferedExchangeComponent.wS.SetData(count_d))
{
log.Warning("-------------- Instance:{1} DataLost: {0} ", count_d, this->GetFullName());
}

count_d ;
if (bufferedExchangeComponent.wD.SetData(count_d count))
{
log.Warning("-------------- Instance:{1} DataLost: {0} ", count_d count, this->GetFullName());
if (!bufferedExchangeComponent.wD.SetData( count))
{
// ensure the log does not get flooded.
if(!error_LastCycle){
log.Warning("-------------- Instance:{1} DataLost: {0} ", count, this->GetFullName());
error_LastCycle = true;
}
}else{
error_LastCycle = false;
}
}
}

Expand Down
41 changes: 23 additions & 18 deletions Examples/BufferedExchange/src/BufferedExchangeProgram.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 3,51 @@
#include "Arp/System/Core/Arp.h"
#include "Arp/Plc/Commons/Esm/ProgramBase.hpp"
#include "Arp/System/Commons/Logging.h"

namespace BufferedExchange
{

namespace BufferedExchange {

using namespace Arp;
using namespace Arp::System::Commons::Diagnostics::Logging;
using namespace Arp::Plc::Commons::Esm;

//#program
//#component(BufferedExchange::BufferedExchangeComponent)
class BufferedExchangeProgram : public ProgramBase, private Loggable<BufferedExchangeProgram>
{
class BufferedExchangeProgram: public ProgramBase, private Loggable<BufferedExchangeProgram> {
public: // typedefs

public: // construction/destruction
BufferedExchangeProgram(BufferedExchange::BufferedExchangeComponent& bufferedExchangeComponentArg, const String& name);
BufferedExchangeProgram(const BufferedExchangeProgram& arg) = delete;
virtual ~BufferedExchangeProgram() = default;
BufferedExchangeProgram(
BufferedExchange::BufferedExchangeComponent &bufferedExchangeComponentArg,
const String &name);
BufferedExchangeProgram(const BufferedExchangeProgram &arg) = delete;
virtual ~BufferedExchangeProgram() = default;

public: // operators
BufferedExchangeProgram& operator=(const BufferedExchangeProgram& arg) = delete;
BufferedExchangeProgram& operator=(const BufferedExchangeProgram &arg) = delete;

public: // properties

public: // operations
void Execute() override;
void Execute() override;

private: // fields
BufferedExchange::BufferedExchangeComponent& bufferedExchangeComponent;
int count{0};
long int count_d{0};

BufferedExchange::BufferedExchangeComponent &bufferedExchangeComponent;
long int count { 0 };
bool error_LastCycle { false };
public:
//#port
//#attributes(Input)
//#name(retry_sending)
boolean retry{false};
};

///////////////////////////////////////////////////////////////////////////////
// inline methods of class ProgramBase
inline BufferedExchangeProgram::BufferedExchangeProgram(BufferedExchange::BufferedExchangeComponent& bufferedExchangeComponentArg, const String& name)
: ProgramBase(name)
, bufferedExchangeComponent(bufferedExchangeComponentArg)
{
inline BufferedExchangeProgram::BufferedExchangeProgram(
BufferedExchange::BufferedExchangeComponent &bufferedExchangeComponentArg,
const String &name) :
ProgramBase(name),
bufferedExchangeComponent(bufferedExchangeComponentArg) {
}

} // end of namespace BufferedExchange
Loading

0 comments on commit c804f61

Please sign in to comment.