PDI 1.8.0-alpha.2024-08-11

the PDI data interface

How to create a plugin

Plugin class

A plugin is a class that handles shared data and triggered events to perform I/O operations. It is dynamically loaded into the user program by PDI. The behavior of each plugin is defined by its own specification subtree inside the `plugins` subtree:

plugins:
  example:
    ...

The example above loads the `example` plugin and passes its subtree to the plugin's constructor.

The plugin class has to inherit from PDI::Plugin and provide a constructor taking a PDI::Context& and a PC_tree_t argument.

Example plugin

The simplest possible plugin, which does nothing:

#include <pdi/context.h>
#include <pdi/paraconf_wrapper.h>
#include <pdi/plugin.h>

/// The simplest possible plugin: it registers no callback and ignores its
/// specification subtree.
struct example_plugin : public PDI::Plugin
{
    /// \param ctx       the PDI context the plugin is attached to
    /// \param spec_tree the plugin's subtree from the `plugins` section (unused here)
    example_plugin(PDI::Context& ctx, PC_tree_t spec_tree):
        Plugin{ctx}
    {}
}; // a class definition must be terminated by a semicolon

// Registers example_plugin with PDI under the name `example`, the key used
// in the `plugins` subtree of the specification tree.
PDI_PLUGIN(example)
Definition: context.h:44
The class PDI plugins should implement.
Definition: plugin.h:43

Adding a callback

Through its callbacks() member, PDI::Context holds all the functions (callbacks) that are called when the user shares data (for example with PDI_share or PDI_expose). To register a new data callback, a plugin calls ctx.callbacks().add_data_callback:

std::function<void()> add_data_callback(const std::function<void(const std::string&, Ref)>& callback, const std::string& name = {})

The first argument is the function to be called when the user shares data. If the second argument (name) is given, the function is called only for the data with that name. The return value is a function that removes the callback from the PDI::Context container.

Example of adding new callback:

#include <iostream>
#include <string>

#include <pdi/context.h>
#include <pdi/paraconf_wrapper.h>
#include <pdi/plugin.h>
#include <pdi/ref_any.h>

/// Plugin that prints a message every time the user shares some data.
struct example_plugin : public PDI::Plugin
{
    example_plugin(PDI::Context& ctx, PC_tree_t spec_tree):
        Plugin{ctx}
    {
        // No data name is passed as second argument, so this callback is
        // called for every shared data.
        ctx.callbacks().add_data_callback([](const std::string& data_name, PDI::Ref ref) {
            std::cout << "User has shared a data named " << data_name << std::endl;
        });
    }
};

PDI_PLUGIN(example)
std::function< void()> add_data_callback(const std::function< void(const std::string &, Ref)> &callback, const std::string &name={})
Adds new data callback to context.
virtual Callbacks & callbacks()=0
Callbacks of the context.
A dynamically typed reference to data with automatic memory management and read/write locking semanti...
Definition: ref_any.h:248

If the user creates the following specification tree:

data:
  some_integer: int
plugins:
  example: ~

And a program:

#include <pdi.h>

int main()
{
    // Initialize PDI from the specification tree stored in spec_tree.yaml.
    PDI_init(PC_parse_path("spec_tree.yaml"));
    int some_integer = 0;
    // Briefly expose the value to PDI (PDI_OUT: from the program to PDI);
    // this is what triggers the plugin's data callback.
    PDI_expose("some_integer", &some_integer, PDI_OUT);
    // Finalize PDI before exiting; this emits the "Finalization" log line.
    PDI_finalize();
    return 0;
}
PDI_status_t PDI_expose(const char *name, void *data, PDI_inout_t access)
Shortly exposes some data to PDI.
@ PDI_OUT
data transfer from the main code to PDI
Definition: pdi.h:187
PDI_status_t PDI_init(PC_tree_t conf)
Initializes PDI.
PDI_status_t PDI_finalize(void)
Finalizes PDI.

The console will display:

[PDI][13:42:41] *** info: Initialization successful
User has shared a data named some_integer
[PDI][13:42:42] *** info: Finalization

Reading and writing data

Example of reading and writing data:

#include <string>

#include <pdi/context.h>
#include <pdi/paraconf_wrapper.h>
#include <pdi/plugin.h>
#include <pdi/ref_any.h>

/// Plugin showing how to obtain read and/or write access to shared data.
/// NOTE: this example assumes the shared data is an `int`.
struct example_plugin : public PDI::Plugin
{
    example_plugin(PDI::Context& ctx, PC_tree_t spec_tree):
        Plugin{ctx}
    {
        ctx.callbacks().add_data_callback([](const std::string& data_name, PDI::Ref ref) {
            // Try the most permissive access first; each converted reference
            // evaluates to false when the requested access is not granted.
            // get() returns an untyped pointer, hence the explicit casts.
            if (PDI::Ref_rw ref_rw{ref}) {
                // Plugin can read and write
                int* some_integer = static_cast<int*>(ref_rw.get());
            } else if (PDI::Ref_r ref_r{ref}) {
                // Plugin can read
                const int* some_integer = static_cast<const int*>(ref_r.get());
            } else if (PDI::Ref_w ref_w{ref}) {
                // Plugin can write
                int* some_integer = static_cast<int*>(ref_w.get());
            } else {
                // Plugin can neither read nor write
            }
        });
    }
};

PDI_PLUGIN(example)

Handling events

#include <iostream>
#include <string>

#include <pdi/context.h>
#include <pdi/paraconf_wrapper.h>
#include <pdi/plugin.h>
#include <pdi/ref_any.h>

/// Plugin that reacts to events: one handler for all events and one for a
/// specific event named "special_event".
struct example_plugin : public PDI::Plugin
{
    example_plugin(PDI::Context& ctx, PC_tree_t spec_tree):
        Plugin{ctx}
    {
        // No event name given: called for every event.
        ctx.callbacks().add_event_callback([this](const std::string& event_name) {
            this->handle_event(event_name);
        });
        // Called only for "special_event" (the callback above is called for
        // it as well, since it has no name filter).
        ctx.callbacks().add_event_callback([this](const std::string& event_name) {
            this->handle_special_event(event_name);
        }, "special_event");
    }

private:
    void handle_event(const std::string& event_name) {
        std::cout << "Event `" << event_name << "' called." << std::endl;
    }

    void handle_special_event(const std::string& event_name) {
        std::cout << "Special event `" << event_name << "' called." << std::endl;
    }
};

PDI_PLUGIN(example)
std::function< void()> add_event_callback(const std::function< void(const std::string &)> &callback, const std::string &name={})
Adds new event callback to context.

Reading scalar and array from specification tree

Specification tree:

plugins:
  example:
    scalar: some_string
    array: [0, 1, 2]

Reading a scalar and an array:

#include <string>
#include <vector>

#include <pdi/context.h>
#include <pdi/paraconf_wrapper.h>
#include <pdi/plugin.h>
#include <pdi/ref_any.h>

/// Plugin reading a scalar and an array from its specification subtree.
struct example_plugin : public PDI::Plugin
{
    example_plugin(PDI::Context& ctx, PC_tree_t spec_tree):
        Plugin{ctx}
    {
        // scalar: the `scalar' key of the plugin subtree, read as a string
        PC_tree_t scalar_tree = PC_get(spec_tree, ".scalar");
        std::string string_scalar = PDI::to_string(scalar_tree);

        // array: the `array' key, read element by element with "[%d]"
        PC_tree_t array_tree = PC_get(spec_tree, ".array");
        int array_size = PDI::len(array_tree);
        std::vector<long> array;
        for (int i = 0; i < array_size; ++i) {
            PC_tree_t array_element = PC_get(array_tree, "[%d]", i);
            array.emplace_back(PDI::to_long(array_element));
        }
    }
};

PDI_PLUGIN(example)
int len(PC_tree_t tree)
Returns the length of a node.
long to_long(PC_tree_t tree)
Returns the int value of a scalar node.
std::string to_string(PC_tree_t tree)
Returns the string content of a scalar node.

Reading maps from specification tree

Specification tree:

plugins:
  example:
    custom_subtree:
      here: 0
      can: 1
      be: 2
      any: 3
      subtree: 4

Reading a map:

#include <string>
#include <unordered_map>

#include <pdi/context.h>
#include <pdi/paraconf_wrapper.h>
#include <pdi/plugin.h>
#include <pdi/ref_any.h>

/// Plugin reading an arbitrary key/value map from its specification subtree.
struct example_plugin : public PDI::Plugin
{
    example_plugin(PDI::Context& ctx, PC_tree_t spec_tree):
        Plugin{ctx}
    {
        PC_tree_t subtree = PC_get(spec_tree, ".custom_subtree");
        int subtree_size = PDI::len(subtree);
        std::unordered_map<std::string, long> custom_map;
        for (int i = 0; i < subtree_size; ++i) {
            // "{%d}" selects the i-th key of the map, "<%d>" the i-th value
            PC_tree_t key = PC_get(subtree, "{%d}", i);
            PC_tree_t value = PC_get(subtree, "<%d>", i);
            // unordered_map has no emplace_back: emplace inserts a key/value pair
            custom_map.emplace(PDI::to_string(key), PDI::to_long(value));
        }
    }
};

PDI_PLUGIN(example)

Creating a true plugin: POSIX plugin

Step 1: Think what your plugin will be for.

Simple checkpointing: each piece of data is saved to a separate file. The user can check the status of all the files at once and then recover the data.

Step 2: Prepare your specification tree schema.

data:
  some_data: {type: array, subtype: int, size: 64}
  can_recover_data: int
plugins:
  posix:
    data:
      some_data: /file_path/
    can_recover_all: can_recover_data

/file_path/ is the path of the file that some_data is saved to and loaded from. can_recover_data is a flag that the plugin sets to indicate whether recovery is possible (i.e. whether all the data files exist).

The fastest way to learn is by example. To show how to create a plugin, we will create a "posix plugin". It does not do anything special, but it gives you the basic knowledge needed to create one.

Step 3: Write your plugin.

Members and constructor:

class posix_plugin : public PDI::Plugin
{
std::string m_can_recover_data;
std::unordered_map<std::string, std::string> m_data_to_path_map;
public:
posix_plugin(PDI::Context& ctx, PC_tree_t spec_tree):
Plugin{ctx}
{}
}
PDI_PLUGIN(posix)

Read recover tree:

/// Stores the name given by the optional `can_recover_all' key, if present.
/// \param spec_tree the plugin's specification subtree
void read_recover_tree(PC_tree_t spec_tree) {
    PC_tree_t recover_node = PC_get(spec_tree, ".can_recover_all");
    // a non-zero status means the key is absent: keep the name empty
    if (PC_status(recover_node)) {
        return;
    }
    m_can_recover_data = PDI::to_string(recover_node);
}

Read data tree:

/// Fills m_data_to_path_map from the optional `data' map of the plugin subtree
/// (data name -> file path).
/// \param spec_tree the plugin's specification subtree
void read_data_tree(PC_tree_t spec_tree) {
    PC_tree_t data_tree = PC_get(spec_tree, ".data");
    if (!PC_status(data_tree)) {
        int data_tree_size = PDI::len(data_tree);
        for (int i = 0; i < data_tree_size; ++i) {
            // "{%d}" selects the i-th key of the map, "<%d>" the i-th value
            PC_tree_t key = PC_get(data_tree, "{%d}", i);
            PC_tree_t value = PC_get(data_tree, "<%d>", i);
            // unordered_map has no emplace_back: emplace inserts a key/value pair
            m_data_to_path_map.emplace(PDI::to_string(key), PDI::to_string(value));
        }
    }
}

Create a function that writes the data to a temporary file, checks that the file was created with the correct size, and then replaces the old file with it:

/** Saves the data named `data_name' to its file.
 *
 * The data is first written to `<path>.tmp'. Once the size of that temporary
 * file has been checked, it is renamed over the destination file. POSIX
 * rename() replaces an existing destination in a single step, so there is
 * never a moment where the previous checkpoint is deleted but the new one is
 * not yet in place.
 *
 * \param data_name name of the shared data; must be a key of m_data_to_path_map
 * \param ref_r     read reference to the data; nothing is done without read access
 * \throws PDI::Error with PDI_ERR_SYSTEM if the temporary file cannot be fully
 *         written or cannot be renamed
 */
void write_data(const std::string& data_name, PDI::Ref_r ref_r) {
    if (!ref_r) {
        return; // no read access granted: nothing to save
    }
    // at() instead of operator[]: never insert an empty path for an unknown name
    const std::string& path = m_data_to_path_map.at(data_name);
    const std::string tmp_path = path + ".tmp";
    const auto type = ref_r.type(); // Datatype_sptr: members are accessed through ->
    const std::size_t datasize = type->datasize();

    std::ofstream file{tmp_path, std::ios::binary};
    if (type->buffersize() == datasize) {
        // dense data: the buffer holds exactly the data, write it as-is
        file.write(static_cast<const char*>(ref_r.get()), static_cast<std::streamsize>(datasize));
    } else {
        // sparse data: pack it into a contiguous buffer first;
        // unique_ptr<char[]> so that the array is released with delete[]
        std::unique_ptr<char[]> data_copy{new char[datasize]};
        type->data_to_dense_copy(data_copy.get(), ref_r.get());
        file.write(data_copy.get(), static_cast<std::streamsize>(datasize));
    }
    file.close();
    if (!file) {
        throw PDI::Error {PDI_ERR_SYSTEM, "Data write not complete"};
    }

    // double check on disk that the temporary file has the expected size
    struct stat status;
    if (stat(tmp_path.c_str(), &status) || static_cast<std::size_t>(status.st_size) != datasize) {
        throw PDI::Error {PDI_ERR_SYSTEM, "Data write not complete"};
    }
    // atomically replace the old file (if any) with the new one
    if (std::rename(tmp_path.c_str(), path.c_str())) {
        throw PDI::Error {PDI_ERR_SYSTEM, "Cannot rename temporary file {}", tmp_path};
    }
}
Definition: error.h:42
ref_access_t< R, W > get() const
Offers access to the referenced raw data, throws on null references.
Definition: ref_any.h:481
Datatype_sptr type() const noexcept
accesses the type of the referenced raw data
@ PDI_ERR_SYSTEM
A system error occurred (OS, etc.)
Definition: pdi.h:87

Create a function that reads data from file:

void read_data(const std::string& data_name, PDI::Ref_w ref_w) {
if(!ref_w) {
return;
}
std::ifstream file{m_data_to_path_map[data_name], std::ios::binary};
if (ref_w.type().buffersize() == ref_w.type().datasize()) {
//dense data
file.read((char*)ref_w.get(), ref_w.type().buffersize());
} else {
//sparse data
std::unique_ptr<char> data_copy {new char[ref_w.type().dataSize()]};
file.read(data_copy.get(), ref_w.type().datasize());
ref_w.type().data_from_dense_copy(ref_w.get(), data_copy.get());
}
}

Handle can_recover_all data:

/** Writes 1 into the shared int if every mapped data file exists, 0 otherwise.
 *
 * \param data_name name of the shared flag (not used)
 * \param ref_w     write reference to the flag, assumed to be an int
 * \throws PDI::Error with PDI_ERR_RIGHT if write access is not granted
 */
void can_recover(const std::string& data_name, PDI::Ref_w ref_w) {
    if (!ref_w) {
        throw PDI::Error {PDI_ERR_RIGHT, "Cannot write to `can_recover_all' data"};
    }
    int all_files_present = 1;
    for (const auto& entry : m_data_to_path_map) {
        struct stat file_status;
        // stat() returns non-zero when the file cannot be found/accessed
        if (stat(entry.second.c_str(), &file_status) != 0) {
            all_files_present = 0;
            break;
        }
    }
    *static_cast<int*>(ref_w.get()) = all_files_present;
}
@ PDI_ERR_RIGHT
A conflict of ownership over a content has been raised.
Definition: pdi.h:93

Add created functions to callbacks:

class posix_plugin : public PDI::Plugin
{
std::string m_can_recover_data;
std::unordered_map<std::string, std::string> m_data_to_path_map;
public:
posix_plugin(PDI::Context& ctx, PC_tree_t spec_tree):
Plugin{ctx}
{
read_recover_tree(spec_tree);
read_data_tree(spec_tree);
for (const auto& data_path_pair : m_data_to_path_map) {
ctx.callbacks().add_data_callback([this](const std::string& data_name, PDI::Ref ref) {
this->write_data(data_name, ref);
}, data_path_pair.first);
ctx.callbacks().add_data_callback([this](const std::string& data_name, PDI::Ref ref) {
this->read_data(data_name, ref);
}, data_path_pair.first);
}
if (!m_can_recover_data.empty()) {
ctx.callbacks().add_data_callback([this](const std::string& data_name, PDI::Ref ref) {
this->can_recover(data_name, ref);
}, m_can_recover_all_data);
}
}
}

Next steps

  1. Compile: g++ posix.cxx -o libpdi_posix_plugin.so -lpdi -shared -fPIC -std=c++17
  2. Copy created file to path where dynamic linker can find it. For example: sudo cp libpdi_posix_plugin.so /usr/local/lib/
  3. Configure dynamic linker run-time bindings: sudo ldconfig
  4. Create program that uses posix plugin.
  5. Compile your test program: g++ example_use.cxx -o example_use -lpdi -lparaconf
  6. Run your test program: ./example_use

An example of a program that uses this plugin can be found in the accompanying slides.