Loading...
Searching...
No Matches
piercedSync.cpp
1/*---------------------------------------------------------------------------*\
2 *
3 * bitpit
4 *
5 * Copyright (C) 2015-2021 OPTIMAD engineering Srl
6 *
7 * -------------------------------------------------------------------------
8 * License
9 * This file is part of bitpit.
10 *
11 * bitpit is free software: you can redistribute it and/or modify it
12 * under the terms of the GNU Lesser General Public License v3 (LGPL)
13 * as published by the Free Software Foundation.
14 *
15 * bitpit is distributed in the hope that it will be useful, but WITHOUT
16 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
17 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
18 * License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public License
21 * along with bitpit. If not, see <http://www.gnu.org/licenses/>.
22 *
23\*---------------------------------------------------------------------------*/
24
25#include "bitpit_common.hpp"
26
27#include "piercedSync.hpp"
28
29namespace bitpit {
30
42 : type(_type), data(nullptr)
43{
    // The action starts with the requested type and without any attached
    // data list (null pointer). The info entries are not touched here.
44}
45
50{
51 // Copy the items that need allocation
52 std::unique_ptr<std::vector<std::size_t>> new_data;
53 if (other.data) {
54 new_data = std::unique_ptr<std::vector<std::size_t>>(new std::vector<std::size_t>(*(other.data)));
55 }
56
57 // Assign the new memory to the object
58 type = other.type;
59
60 for (std::size_t k = 0; k < INFO_COUNT; ++k) {
61 info[k] = other.info[k];
62 }
63
64 data.swap(new_data);
65}
66
74{
75 if (this != &other) {
76 PiercedSyncAction temporary(other);
77 temporary.swap(*this);
78 }
79
80 return *this;
81}
82
89{
90 std::swap(type, other.type);
91 std::swap(info, other.info);
92 data.swap(other.data);
93}
94
100void PiercedSyncAction::importData(std::vector<std::size_t> &&values)
101{
102 data = std::unique_ptr<std::vector<std::size_t>>(new std::vector<std::size_t>(std::move(values)));
103}
104
110void PiercedSyncAction::importData(const std::vector<std::size_t> &values)
111{
112 data = std::unique_ptr<std::vector<std::size_t>>(new std::vector<std::size_t>(values));
113}
114
120void PiercedSyncAction::restore(std::istream &stream)
121{
122 int actionType;
123 utils::binary::read(stream, actionType);
124 type = static_cast<ActionType>(actionType);
125
126 for (int k = 0; k < INFO_COUNT; ++k) {
127 utils::binary::read(stream, info[k]);
128 }
129
130 bool hasData;
131 utils::binary::read(stream, hasData);
132
133 if (hasData) {
134 std::size_t dataSize;
135 utils::binary::read(stream, dataSize);
136 data = std::unique_ptr<std::vector<std::size_t>>(new std::vector<std::size_t>(dataSize));
137 for (std::size_t k = 0; k < dataSize; ++k) {
138 utils::binary::read(stream, (*data)[k]);
139 }
140 } else {
141 data.reset();
142 }
143}
144
150void PiercedSyncAction::dump(std::ostream &stream) const
151{
152 utils::binary::write(stream, static_cast<int>(type));
153
154 for (int k = 0; k < INFO_COUNT; ++k) {
155 utils::binary::write(stream, info[k]);
156 }
157
158 bool hasData = (data != nullptr);
159 utils::binary::write(stream, hasData);
160
161 if (hasData) {
162 std::size_t dataSize = data->size();
163 utils::binary::write(stream, dataSize);
164 for (std::size_t k = 0; k < dataSize; ++k) {
165 utils::binary::write(stream, (*data)[k]);
166 }
167 }
168}
169
184
198{
    // Nothing is exchanged at this level: the argument is only passed to
    // BITPIT_UNUSED (presumably to silence unused-parameter warnings).
199 BITPIT_UNUSED(other);
200}
201
214 : m_syncEnabled(false)
215{
216 for (int k = SYNC_MODE_ITR_BEGIN; k != SYNC_MODE_ITR_END; ++k) {
217 SyncMode mode = static_cast<SyncMode>(k);
218 m_syncGroups[mode] = SyncGroup();
219 }
220}
221
235{
236 std::swap(other.m_slaves, m_slaves);
237 std::swap(other.m_syncGroups, m_syncGroups);
238 std::swap(other.m_syncJournal, m_syncJournal);
239}
240
248{
249 if (m_slaves.count(slave) > 0) {
250 throw std::out_of_range("Slave is already registered");
251 }
252
253 // Register the slave
254 m_slaves.insert({slave, syncMode});
255
256 // Add the slave to the proper synchronization group
257 SyncGroup &syncGroup = m_syncGroups.at(syncMode);
258 syncGroup.push_back(slave);
259
260 // If needed enable synchronization
261 if (!isSyncEnabled() && syncMode != SYNC_MODE_DISABLED) {
262 setSyncEnabled(true);
263 }
264}
265
272{
273 if (!isSyncEnabled()) {
274 return;
275 }
276
277 // Synchronize concurrent slaves
278 for (PiercedSyncSlave *slave : m_syncGroups.at(PiercedSyncMaster::SYNC_MODE_CONCURRENT)) {
279 commitSyncAction(slave, action);
280 }
281
282 // If there are journaled slaves the synchronization action needs to be
283 // journaled
284 if (m_syncGroups.at(PiercedSyncMaster::SYNC_MODE_JOURNALED).size() > 0) {
285 journalSyncAction(action);
286 }
287}
288
/**
 * Commit a synchronization action on a slave.
 *
 * \param slave is the slave that will receive the action
 * \param action is the synchronization action to commit
 */
295void PiercedSyncMaster::commitSyncAction(PiercedSyncSlave *slave, const PiercedSyncAction &action) const
296{
    // The work is delegated to the slave's own commitSyncAction
297 slave->commitSyncAction(action);
298}
299
305void PiercedSyncMaster::journalSyncAction(const PiercedSyncAction &action)
306{
307 // Get information about previous action
308 PiercedSyncAction *previousAction = nullptr;
309 if (!m_syncJournal.empty()) {
310 previousAction = &(m_syncJournal.back());
311 }
312
313 PiercedSyncAction::ActionType previousActionType = PiercedSyncAction::TYPE_UNDEFINED;
314 if (previousAction) {
315 previousActionType = previousAction->type;
316 }
317
318 // Journal current action
319 //
320 // If possible, current action will be merged with the previous one.
321 switch (action.type) {
322
323 case PiercedSyncAction::TYPE_CLEAR:
324 m_syncJournal.resize(1);
325 m_syncJournal[0] = action;
326 break;
327
328 case PiercedSyncAction::TYPE_APPEND:
329 if (previousActionType == PiercedSyncAction::TYPE_APPEND) {
330 previousAction->type = PiercedSyncAction::TYPE_RESIZE;
331 previousAction->info[PiercedSyncAction::INFO_SIZE] = action.info[PiercedSyncAction::INFO_POS] + 1;
332 previousAction->info[PiercedSyncAction::INFO_POS] = std::numeric_limits<std::size_t>::max();
333 } else if (previousActionType == PiercedSyncAction::TYPE_RESIZE) {
334 previousAction->info[PiercedSyncAction::INFO_SIZE] = action.info[PiercedSyncAction::INFO_POS] + 1;
335 } else {
336 m_syncJournal.push_back(action);
337 }
338 break;
339
340 case PiercedSyncAction::TYPE_RESIZE:
341 if (previousActionType == PiercedSyncAction::TYPE_APPEND) {
342 previousAction->type = PiercedSyncAction::TYPE_RESIZE;
343 previousAction->info[PiercedSyncAction::INFO_SIZE] = action.info[PiercedSyncAction::INFO_SIZE];
344 previousAction->info[PiercedSyncAction::INFO_POS] = std::numeric_limits<std::size_t>::max();
345 } else if (previousActionType == PiercedSyncAction::TYPE_RESIZE) {
346 previousAction->info[PiercedSyncAction::INFO_SIZE] = action.info[PiercedSyncAction::INFO_SIZE];
347 } else {
348 m_syncJournal.push_back(action);
349 }
350 break;
351
352 case PiercedSyncAction::TYPE_PIERCE:
353 if (previousActionType == PiercedSyncAction::TYPE_PIERCE) {
354 previousAction->type = PiercedSyncAction::TYPE_PIERCE_MULTIPLE;
355 previousAction->data = std::unique_ptr<std::vector<std::size_t>>(new std::vector<std::size_t>(2));
356 (*(previousAction->data))[0] = previousAction->info[PiercedSyncAction::INFO_POS];
357 (*(previousAction->data))[1] = action.info[PiercedSyncAction::INFO_POS];
358 previousAction->info[PiercedSyncAction::INFO_POS] = std::numeric_limits<std::size_t>::max();
359 previousAction->info[PiercedSyncAction::INFO_POS_NEXT] = std::numeric_limits<std::size_t>::max();
360 } else if (previousActionType == PiercedSyncAction::TYPE_PIERCE_MULTIPLE) {
361 previousAction->data->push_back(action.info[PiercedSyncAction::INFO_POS]);
362 } else {
363 m_syncJournal.push_back(action);
364 }
365 break;
366
367 case PiercedSyncAction::TYPE_PIERCE_MULTIPLE:
368 if (previousActionType == PiercedSyncAction::TYPE_PIERCE) {
369 previousAction->type = PiercedSyncAction::TYPE_PIERCE_MULTIPLE;
370 previousAction->data = std::unique_ptr<std::vector<std::size_t>>(new std::vector<std::size_t>(action.data->size() + 1));
371 (*(previousAction->data))[0] = previousAction->info[PiercedSyncAction::INFO_POS];
372 previousAction->data->insert(previousAction->data->begin() + 1, action.data->begin(), action.data->end());
373 previousAction->info[PiercedSyncAction::INFO_POS] = std::numeric_limits<std::size_t>::max();
374 previousAction->info[PiercedSyncAction::INFO_POS_NEXT] = std::numeric_limits<std::size_t>::max();
375 } else if (previousActionType == PiercedSyncAction::TYPE_PIERCE_MULTIPLE) {
376 previousAction->data->insert(previousAction->data->begin(), action.data->begin(), action.data->end());
377 } else {
378 m_syncJournal.push_back(action);
379 }
380 break;
381
382 case PiercedSyncAction::TYPE_OVERWRITE:
383 if (previousActionType == PiercedSyncAction::TYPE_OVERWRITE) {
384 previousAction->type = PiercedSyncAction::TYPE_OVERWRITE_MULTIPLE;
385 previousAction->data = std::unique_ptr<std::vector<std::size_t>>(new std::vector<std::size_t>(2));
386 (*(previousAction->data))[0] = previousAction->info[PiercedSyncAction::INFO_POS];
387 (*(previousAction->data))[1] = action.info[PiercedSyncAction::INFO_POS];
388 previousAction->info[PiercedSyncAction::INFO_POS] = std::numeric_limits<std::size_t>::max();
389 } else if (previousActionType == PiercedSyncAction::TYPE_OVERWRITE_MULTIPLE) {
390 previousAction->data->push_back(action.info[PiercedSyncAction::INFO_POS]);
391 } else {
392 m_syncJournal.push_back(action);
393 }
394 break;
395
396 case PiercedSyncAction::TYPE_OVERWRITE_MULTIPLE:
397 if (previousActionType == PiercedSyncAction::TYPE_OVERWRITE) {
398 previousAction->type = PiercedSyncAction::TYPE_OVERWRITE_MULTIPLE;
399 previousAction->data = std::unique_ptr<std::vector<std::size_t>>(new std::vector<std::size_t>(action.data->size() + 1));
400 (*(previousAction->data))[0] = previousAction->info[PiercedSyncAction::INFO_POS];
401 previousAction->data->insert(previousAction->data->begin() + 1, action.data->begin(), action.data->end());
402 previousAction->info[PiercedSyncAction::INFO_POS] = std::numeric_limits<std::size_t>::max();
403 } else if (previousActionType == PiercedSyncAction::TYPE_OVERWRITE_MULTIPLE) {
404 previousAction->data->insert(previousAction->data->begin(), action.data->begin(), action.data->end());
405 } else {
406 m_syncJournal.push_back(action);
407 }
408 break;
409
410 case PiercedSyncAction::TYPE_NOOP:
411 break;
412
413 default:
414 m_syncJournal.push_back(action);
415 break;
416
417 }
418}
419
426{
427 // Check if the slave was actually registered
428 if (!isSlaveRegistered(slave)) {
429 return;
430 }
431
432 // Remove the slave from the synchronization group
433 SyncMode syncMode = getSlaveSyncMode(slave);
434 SyncGroup &syncGroup = m_syncGroups.at(syncMode);
435 for (auto itr = syncGroup.begin(); itr != syncGroup.end(); ++itr) {
436 PiercedSyncSlave *item = *itr;
437 if (item == slave) {
438 syncGroup.erase(itr);
439 break;
440 }
441 }
442
443 // Unregister the slave
444 m_slaves.erase(const_cast<PiercedSyncSlave *>(slave));
445}
446
453{
454 return (m_slaves.count(const_cast<PiercedSyncSlave *>(slave)) > 0);
455}
456
464{
    // The registry is keyed by non-const pointers, hence the cast. The
    // lookup goes through at(), so a slave that is not in the registry
    // makes the call throw std::out_of_range.
465 return m_slaves.at(const_cast<PiercedSyncSlave *>(slave));
466}
467
478{
479 SyncMode syncMode = getSlaveSyncMode(slave);
480 if (syncMode == SYNC_MODE_CONCURRENT) {
481 return true;
482 } else if (syncMode == SYNC_MODE_JOURNALED) {
483 return m_syncJournal.empty();
484 } else {
485 return false;
486 }
487}
488
493{
494 // Only journaled slaved need to be synchronized
495 for (PiercedSyncSlave *slave : m_syncGroups.at(SYNC_MODE_JOURNALED)) {
496 for (const PiercedSyncAction &action : m_syncJournal) {
497 commitSyncAction(slave, action);
498 }
499 }
500
501 // Clear the sync journal
502 m_syncJournal.clear();
503 m_syncJournal.shrink_to_fit();
504}
505
512{
513 for (const PiercedSyncSlave *slave : m_syncGroups.at(SYNC_MODE_JOURNALED)) {
514 if (!isSlaveSynced(slave)) {
515 return false;
516 }
517 }
518
519 return true;
520}
521
/**
 * Enable or disable the synchronization.
 *
 * The function is const but updates the m_syncEnabled flag.
 *
 * \param enabled is the new value of the synchronization flag
 */
528void PiercedSyncMaster::setSyncEnabled(bool enabled) const
529{
530 m_syncEnabled = enabled;
531}
532
540{
    // Reports the current value of the synchronization flag
541 return m_syncEnabled;
542}
543
549void PiercedSyncMaster::restore(std::istream &stream)
550{
551 std::size_t journalSize;
552 utils::binary::read(stream, journalSize);
553 m_syncJournal.resize(journalSize);
554 for (PiercedSyncAction &action : m_syncJournal) {
555 action.restore(stream);
556 }
557}
558
564void PiercedSyncMaster::dump(std::ostream &stream) const
565{
566 std::size_t journalSize = m_syncJournal.size();
567 utils::binary::write(stream, journalSize);
568 for (const PiercedSyncAction &action : m_syncJournal) {
569 action.dump(stream);
570 }
571}
572
573}
Action for pierced synchronization.
PiercedSyncAction(ActionType _type=TYPE_UNDEFINED)
void restore(std::istream &stream)
void swap(PiercedSyncAction &other) noexcept
void importData(std::vector< std::size_t > &&values)
void dump(std::ostream &stream) const
PiercedSyncAction & operator=(const PiercedSyncAction &other)
Base class for defining an object that acts like a master in pierced synchronization.
void restore(std::istream &stream)
PiercedSyncMaster::SyncMode getSlaveSyncMode(const PiercedSyncSlave *slave) const
void setSyncEnabled(bool enabled) const
std::vector< PiercedSyncSlave * > SyncGroup
void unregisterSlave(const PiercedSyncSlave *slave) const
void swap(PiercedSyncMaster &other) noexcept
bool isSlaveRegistered(const PiercedSyncSlave *slave) const
void registerSlave(PiercedSyncSlave *slave, PiercedSyncMaster::SyncMode syncMode) const
void processSyncAction(const PiercedSyncAction &action)
bool isSlaveSynced(const PiercedSyncSlave *slave) const
void dump(std::ostream &stream) const
std::unordered_map< PiercedSyncSlave *, SyncMode > m_slaves
Base class for defining an object that acts like a slave in pierced synchronization.
void swap(PiercedSyncSlave &other) noexcept
void write(std::ostream &stream, const std::vector< bool > &container)
void read(std::istream &stream, std::vector< bool > &container)
#define BITPIT_UNUSED(variable)
Definition compiler.hpp:63
--- layout: doxygen_footer ---