Skip to content

Commit

Permalink
initialization is parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
r-shf committed Nov 19, 2014
1 parent 6df330e commit 7d1e74a
Show file tree
Hide file tree
Showing 33 changed files with 286 additions and 228 deletions.
66 changes: 66 additions & 0 deletions core/Attribute.h
Original file line number Diff line number Diff line change
@@ -0,0 1,66 @@
/*
*
* Tiny Multimedia Framework
* Copyright (C) 2014 Arash Shafiei
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

#ifndef ATTRIBUTE_H_
#define ATTRIBUTE_H_

#include <map>

using namespace std;

class Attribute {
private:
map<string, string> props; /**< The map containing the message (key, value). */

public:
/*!
* Get the message value by passing the key
*
* \param key the key to retrieve the message value
*/
string getProp(const string & key) {
auto k = props.find(key);

if (k == props.end())
return "";

return props[key];
}

/*!
* Set the string message by key and value
*
* \param key the key of the message
* \param val the string value of the message
*
*/
template <typename T>
void setProp(const string & key, const T& val) {
string valstr = to_string(val);
props.insert(std::make_pair(key, valstr));
}

void setProp(const string & key, const string& val) {
props.insert(std::make_pair(key, val));
}

};

#endif /* ATTRIBUTE_H_ */
56 changes: 31 additions & 25 deletions core/Filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 23,18 @@

#include <iostream>

Filter::Filter(const string &name) {
Filter::Filter(const string &name) : realtime(false) {

this->name = name;
linked = 0;
inputFed = 0;
inMsg = 0;
inMsg = nullptr;
outMsg = new Message();

}

void Filter::setRealTime(bool rt) {
realtime = rt;
}

void Filter::log(std::string msg) {
io_lock->lock();
std::cout << name << ": " << msg << std::endl;
Expand Down Expand Up @@ -63,15 65,21 @@ void Filter::connectFilter(Filter * f) {
}
}

void Filter::setProp(const string & key, const string & val) {
props.emplace(this->name "::" key, val);
}
//void Filter::setProp(const string & key, const string & val) {
// props.emplace(this->name "::" key, val);
//}

string Filter::getProp(const string & key) {
return props[this->name "::" key];
//string Filter::getProp(const string & key) {
// return props[this->name "::" key];
//}

FilterStatus Filter::initFilter() {
FilterStatus status = FILTER_SUCCESS;
status = init();
return status;
}

FilterStatus Filter::run() {
FilterStatus Filter::runFilter() {
FilterStatus status = FILTER_SUCCESS;

while(status != FILTER_FINISHED) {
Expand All @@ -81,7 89,7 @@ FilterStatus Filter::run() {
//}

//inputFed = 0;
status = process();
status = realtime? runRT() : run();
}
return status;
}
Expand Down Expand Up @@ -109,17 117,14 @@ FilterStatus Filter::run() {
* }
*/

void Filter::increaseLinked() {
linked ;
}

int Filter::inputPortNum() {
return inputPorts.size();
}
//int Filter::inputPortNum() {
// return inputPorts.size();
//}

int Filter::outputPortNum() {
return outputPorts.size();
}
//int Filter::outputPortNum() {
// return outputPorts.size();
//}

/* void Filter::processNextFilters(Port * p) {
* vector<Filter*> * nextFilters = getNextFilters(p);
Expand Down Expand Up @@ -169,17 174,18 @@ int Filter::outputPortNum() {
*
* }
*/

void Filter::start() {
t = new thread(&Filter::run, this);
void Filter::startInit() {
t = new thread(&Filter::initFilter, this);
}

void Filter::startRT() {
t = new thread(&Filter::run, this);
void Filter::startRun() {
t = new thread(&Filter::runFilter, this);
}

void Filter::wait() {
t->join();

delete t;
}

void Filter::setIOLock(mutex * mux) {
Expand Down
79 changes: 33 additions & 46 deletions core/Filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 21,7 @@
#ifndef FITLER_H_
#define FITLER_H_

#include "core/Attribute.h"
#include "core/Message.h"
#include "core/Port.h"
#include <string>
Expand All @@ -41,7 42,6 @@ enum FilterStatus {
FILTER_SUCCESS, /**< Filter processed successfully. */
FILTER_ERROR, /**< An error occurred while processing. */
FILTER_FINISHED, /**< Filter is done generating more data. Used in data sources. */
FILTER_WAIT_FOR_INPUT /**< Filter is waiting for input. */
};

/*!
Expand All @@ -53,13 53,14 @@ enum FilterStatus {

class Filter {
private:

thread * t;
mutex * io_lock;

string name; /**< The name f the filter */
int linked; /**< The number of filters which are connected to this filter */
int inputFed; /**< The number of data which are already fed to the filter */
map<string, string> props; /**< A map containing the message keys and values transfered to filter from a pipeline */
thread * t;
//map<Port*, vector<Filter*>*> nextFilters; /**< A map containing the next filters based on one port. */
Attribute attr; /**< A map containing the message keys and values transfered to filter from a pipeline */

bool realtime;

protected:
Message * inMsg; /**< Input message of the filter */
Expand All @@ -82,25 83,19 @@ class Filter {
* Virtual function, to be implemented in the subclass filters.
* Read data from input filter, process the data, and write the result to the output port.
*/
virtual FilterStatus process() = 0;
virtual FilterStatus processRT() {
return process();
virtual FilterStatus init() {
return FILTER_SUCCESS;
}
virtual FilterStatus run() = 0;
virtual FilterStatus runRT() {
return run();
}

//void initNextFilters(Port *p, Message * msg);
//void addNextFilter(Port * p, Filter *f);
public:

//vector<Filter*> * getNextFilters(Port *);

public:

/*!
* Perform initialization of the filter.
* To be overridden in subclasses to allow initialization of specific filter values.
*/
virtual FilterStatus init() {
return FILTER_SUCCESS;
}
void setRealTime(bool rt);

/*!
* Set a property of the filter.
Expand All @@ -110,15 105,20 @@ class Filter {
* \param val
* The property value.
*/
void setProp(const string & key, const string & val);

template<typename T>
void setProp(const string & key, const T& val) {
attr.setProp(key, val);
}
/*!
* Get the value of a filter property.
*
* \param key
* The property name.
*/
string getProp(const string & key);

string&& getProp(const string & key) {
return move(attr.getProp(key));
}

/*!
* Connect this filter to another filter in the pipeline.
Expand All @@ -129,52 129,39 @@ class Filter {
*/
void connectFilter(Filter * f);

/*!
* Execute the processing of this filter.
* The filters are connected by a link list and each filter calls executeFilter of the next filter.
*
* \return The new status of the filter.
*/
FilterStatus run();

/*!
* Execute the init of this filter.
* The filters are connected by a link list and each filter calls initFilter of the next filter.
*
* \return The new status of the filter.
*/
//FilterStatus initFilter(Message * msg);
FilterStatus initFilter();

/*!
* Increase the number of the linked filters.
* Execute the processing of this filter.
* The filters are connected by a link list and each filter calls executeFilter of the next filter.
*
* \return The new status of the filter.
*/
void increaseLinked();
FilterStatus runFilter();

/*!
* Get the number of input ports.
*/
int inputPortNum();
//int inputPortNum();

/*!
* Get the number of output port.
*/
int outputPortNum();

void start();
void startInit();

void startRT();
void startRun();

void wait();

void setIOLock(mutex * mux);
/*!
*
* TODO
* Process all filters which are connected to a port
*
* \param p The output port
*/
//void processNextFilters(Port * p);

/*!
* Destructor of the filter.
Expand Down
15 changes: 9 additions & 6 deletions core/MediaBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 35,7 @@ const int TMF_BUFFER_SIZE = 4;
template <typename T>
class MediaBuffer {
protected:
MediaSample<T> * samples; /**< The array containing the samples */
MediaSample<T> * samples[TMF_BUFFER_SIZE]; /**< The array containing the samples */
int size; /**< The size of the buffer */

public:
Expand All @@ -47,13 47,14 @@ class MediaBuffer {
*/
MediaBuffer<T>(int s): size(s) {

samples = new MediaSample<T>[TMF_BUFFER_SIZE]();

for (int i=0; i<size; i ) {
samples[i] = new MediaSample<T>();
}
}

void addConsumer() {
for (int i=0; i<size; i ) {
samples[i].addConsumer();
samples[i]->addConsumer();
}
}

Expand All @@ -70,14 71,16 @@ class MediaBuffer {
* \param idx the number of the element
* \return the element number idx
*/
MediaSample<T>& at(int idx) const { return samples[idx]; }
MediaSample<T>* at(int idx) const { return samples[idx]; }

/*!
* Buffer destructor
*
*/
~MediaBuffer<T>() {
delete samples;
for (int i=0; i<size; i ) {
delete samples[i];
}
}

};
Expand Down
Loading

0 comments on commit 7d1e74a

Please sign in to comment.