AERA
mem.cpp
1 //_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
2 //_/_/
3 //_/_/ AERA
4 //_/_/ Autocatalytic Endogenous Reflective Architecture
5 //_/_/
6 //_/_/ Copyright (c) 2018-2025 Jeff Thompson
7 //_/_/ Copyright (c) 2018-2025 Kristinn R. Thorisson
8 //_/_/ Copyright (c) 2018-2025 Icelandic Institute for Intelligent Machines
9 //_/_/ http://www.iiim.is
10 //_/_/
11 //_/_/ Copyright (c) 2010-2012 Eric Nivel
12 //_/_/ Copyright (c) 2010 Nathaniel Thurston
13 //_/_/ Center for Analysis and Design of Intelligent Agents
14 //_/_/ Reykjavik University, Menntavegur 1, 102 Reykjavik, Iceland
15 //_/_/ http://cadia.ru.is
16 //_/_/
17 //_/_/ Part of this software was developed by Eric Nivel
18 //_/_/ in the HUMANOBS EU research project, which included
19 //_/_/ the following parties:
20 //_/_/
21 //_/_/ Autonomous Systems Laboratory
22 //_/_/ Technical University of Madrid, Spain
23 //_/_/ http://www.aslab.org/
24 //_/_/
25 //_/_/ Communicative Machines
26 //_/_/ Edinburgh, United Kingdom
27 //_/_/ http://www.cmlabs.com/
28 //_/_/
29 //_/_/ Istituto Dalle Molle di Studi sull'Intelligenza Artificiale
30 //_/_/ University of Lugano and SUPSI, Switzerland
31 //_/_/ http://www.idsia.ch/
32 //_/_/
33 //_/_/ Institute of Cognitive Sciences and Technologies
34 //_/_/ Consiglio Nazionale delle Ricerche, Italy
35 //_/_/ http://www.istc.cnr.it/
36 //_/_/
37 //_/_/ Dipartimento di Ingegneria Informatica
38 //_/_/ University of Palermo, Italy
39 //_/_/ http://diid.unipa.it/roboticslab/
40 //_/_/
41 //_/_/
42 //_/_/ --- HUMANOBS Open-Source BSD License, with CADIA Clause v 1.0 ---
43 //_/_/
44 //_/_/ Redistribution and use in source and binary forms, with or without
45 //_/_/ modification, is permitted provided that the following conditions
46 //_/_/ are met:
47 //_/_/ - Redistributions of source code must retain the above copyright
48 //_/_/ and collaboration notice, this list of conditions and the
49 //_/_/ following disclaimer.
50 //_/_/ - Redistributions in binary form must reproduce the above copyright
51 //_/_/ notice, this list of conditions and the following disclaimer
52 //_/_/ in the documentation and/or other materials provided with
53 //_/_/ the distribution.
54 //_/_/
55 //_/_/ - Neither the name of its copyright holders nor the names of its
56 //_/_/ contributors may be used to endorse or promote products
57 //_/_/ derived from this software without specific prior
58 //_/_/ written permission.
59 //_/_/
60 //_/_/ - CADIA Clause: The license granted in and to the software
61 //_/_/ under this agreement is a limited-use license.
62 //_/_/ The software may not be used in furtherance of:
63 //_/_/ (i) intentionally causing bodily injury or severe emotional
64 //_/_/ distress to any person;
65 //_/_/ (ii) invading the personal privacy or violating the human
66 //_/_/ rights of any person; or
67 //_/_/ (iii) committing or preparing for any act of war.
68 //_/_/
69 //_/_/ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
70 //_/_/ CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
71 //_/_/ INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
72 //_/_/ MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
73 //_/_/ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
74 //_/_/ CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
75 //_/_/ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
76 //_/_/ BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
77 //_/_/ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
78 //_/_/ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
79 //_/_/ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
80 //_/_/ NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
81 //_/_/ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
82 //_/_/ OF SUCH DAMAGE.
83 //_/_/
84 //_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
85 
86 #include <algorithm>
87 #include "mem.h"
88 #include "mdl_controller.h"
89 #include "model_base.h"
90 
91 using namespace std;
92 using namespace std::chrono;
93 using namespace r_code;
94 
95 namespace r_exec {
96 
97 _Mem::_Mem() : r_code::Mem(),
98  state_(NOT_STARTED),
99  deleted_(false),
100  base_period_(50000),
101  reduction_core_count_(0),
102  time_core_count_(0),
103  mdl_inertia_sr_thr_(0.9),
104  mdl_inertia_cnt_thr_(6),
105  tpx_dsr_thr_(0.1),
106  min_sim_time_horizon_(0),
107  max_sim_time_horizon_(0),
108  sim_time_horizon_factor_(0.3),
109  tpx_time_horizon_(5000000),
110  perf_sampling_period_(250000),
111  float_tolerance_(0.00001),
112  time_tolerance_(10000),
113  primary_thz_(seconds(3600000)),
114  secondary_thz_(seconds(7200000)),
115  debug_(true),
116  ntf_mk_res_(1),
117  goal_pred_success_res_(1000),
118  keep_invalidated_objects_(false),
119  probe_level_(2),
120  reduction_cores_(0),
121  time_cores_(0),
122  reduction_job_count_(0),
123  time_job_count_(0),
124  time_job_avg_latency_(0),
125  _time_job_avg_latency_(0),
126  core_count_(0),
127  stop_sem_(0),
128  stdin_(0),
129  stdout_(0),
130  self_(0),
131  default_runtime_output_stream_(&std::cout)
132 {
133 
134  new ModelBase();
135  objects_.reserve(1024);
136  for (uint32 i = 0; i < RUNTIME_OUTPUT_STREAM_COUNT; ++i)
137  runtime_output_streams_[i] = NULL;
138 }
139 
140 _Mem::~_Mem() {
141 
142  for (uint32 i = 0; i < RUNTIME_OUTPUT_STREAM_COUNT; ++i)
143  if (runtime_output_streams_[i] != NULL)
144  delete runtime_output_streams_[i];
145 }
146 
147 void _Mem::init(microseconds base_period,
148  uint32 reduction_core_count,
149  uint32 time_core_count,
150  float32 mdl_inertia_sr_thr,
151  uint32 mdl_inertia_cnt_thr,
152  float32 tpx_dsr_thr,
153  microseconds min_sim_time_horizon,
154  microseconds max_sim_time_horizon,
155  float32 sim_time_horizon_factor,
156  microseconds tpx_time_horizon,
157  microseconds perf_sampling_period,
158  float32 float_tolerance,
159  microseconds time_tolerance,
160  microseconds primary_thz,
161  microseconds secondary_thz,
162  bool debug,
163  uint32 ntf_mk_res,
164  uint32 goal_pred_success_res,
165  uint32 probe_level,
166  uint32 traces,
167  bool keep_invalidated_objects) {
168 
169  base_period_ = base_period;
170 
171  reduction_core_count_ = reduction_core_count;
172  time_core_count_ = time_core_count;
173 
174  mdl_inertia_sr_thr_ = mdl_inertia_sr_thr;
175  mdl_inertia_cnt_thr_ = mdl_inertia_cnt_thr;
176  tpx_dsr_thr_ = tpx_dsr_thr;
177  min_sim_time_horizon_ = min_sim_time_horizon;
178  max_sim_time_horizon_ = max_sim_time_horizon;
179  sim_time_horizon_factor_ = sim_time_horizon_factor;
180  tpx_time_horizon_ = tpx_time_horizon;
181  perf_sampling_period_ = perf_sampling_period;
182  float_tolerance_ = float_tolerance;
183  time_tolerance_ = time_tolerance;
184  primary_thz_ = primary_thz;
185  secondary_thz_ = secondary_thz;
186 
187  debug_ = debug;
188  if (debug)
189  ntf_mk_res_ = ntf_mk_res;
190  else
191  ntf_mk_res_ = 1;
192  goal_pred_success_res_ = goal_pred_success_res;
193 
194  probe_level_ = probe_level;
195  keep_invalidated_objects_ = keep_invalidated_objects;
196 
197  reduction_job_count_ = time_job_count_ = 0;
198  reduction_job_avg_latency_ = _reduction_job_avg_latency_ = microseconds(0);
199  time_job_avg_latency_ = _time_job_avg_latency_ = microseconds(0);
200 
201  uint32 mask = 1;
202  for (uint32 i = 0; i < RUNTIME_OUTPUT_STREAM_COUNT; ++i) {
203 
204  if (traces & mask)
205  // NULL means Output() will use default_runtime_output_stream_.
206  runtime_output_streams_[i] = NULL;
207  else
208  runtime_output_streams_[i] = new NullOStream();
209  mask <<= 1;
210  }
211 }
212 
213 std::ostream &_Mem::Output(TraceLevel l) {
214 
215  return (_Mem::Get()->runtime_output_streams_[l] == NULL ?
216  *_Mem::Get()->default_runtime_output_stream_ : *(_Mem::Get()->runtime_output_streams_[l]));
217 }
218 
219 // This is declared at the r_exec namespace level in overlay.h, so that all headers
220 // don't need to include mem.h.
221 std::ostream &_Mem_Output(TraceLevel l) { return _Mem::Output(l); }
222 
223 void _Mem::reset() {
224 
225  uint32 i;
226  for (i = 0; i < reduction_core_count_; ++i)
227  delete reduction_cores_[i];
228  delete[] reduction_cores_;
229  for (i = 0; i < time_core_count_; ++i)
230  delete time_cores_[i];
231  delete[] time_cores_;
232 
233  delete reduction_job_queue_;
234  delete time_job_queue_;
235 
236  delete stop_sem_;
237 }
238 
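239 // Convenience accessors for the objects designated in the seed: root, stdin, stdout and self.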
240 
241 Code *_Mem::get_root() const {
242 
243  return root_;
244 }
245 
246 Code *_Mem::get_stdin() const {
247 
248  return stdin_;
249 }
250 
251 Code *_Mem::get_stdout() const {
252 
253  return stdout_;
254 }
255 
256 Code *_Mem::get_self() const {
257 
258  return self_;
259 }
260 
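261 // Execution state and core-count bookkeeping.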
262 
263 _Mem::State _Mem::check_state() {
264 
265  State s;
266  stateCS_.enter();
267  s = state_;
268  stateCS_.leave();
269 
270  return s;
271 }
272 
273 void _Mem::start_core() {
274 
275  core_countCS_.enter();
276  if (++core_count_ == 1)
277  stop_sem_->acquire();
278  core_countCS_.leave();
279 }
280 
281 void _Mem::shutdown_core() {
282 
283  core_countCS_.enter();
284  if (--core_count_ == 0)
285  stop_sem_->release();
286  core_countCS_.leave();
287 }
288 
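289 // Object storage and loading of the seed image.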
290 
291 void _Mem::store(Code *object) {
292 
293  int32 location;
294  objects_.push_back(object, location);
295  object->set_strorage_index(location);
296 }
297 
298 bool _Mem::load(const vector<r_code::Code *> *objects, uint32 stdin_oid, uint32 stdout_oid, uint32 self_oid) { // no cov at init time.
299 
300  uint32 i;
301  reduction_cores_ = new ReductionCore *[reduction_core_count_];
302  for (i = 0; i < reduction_core_count_; ++i)
303  reduction_cores_[i] = new ReductionCore();
304  time_cores_ = new TimeCore *[time_core_count_];
305  for (i = 0; i < time_core_count_; ++i)
306  time_cores_[i] = new TimeCore();
307 
308  Utils::SetReferenceValues(base_period_, float_tolerance_, time_tolerance_);
309 
310  // load root (always comes first).
311  root_ = (Group *)(*objects)[0];
312  store((Code *)root_);
313  initial_groups_.push_back(root_);
314 
315  // Get the highest existing OID.
316  uint32 highest_oid = 0;
317  for (uint32 i = 0; i < objects->size(); ++i)
318  highest_oid = max(highest_oid, (*objects)[i]->get_oid());
319  set_last_oid(max(highest_oid, objects->size() - 1));
320 
321  for (uint32 i = 1; i < objects->size(); ++i) { // skip root as it has no initial views.
322 
323  Code *object = (*objects)[i];
324  store(object);
325 
326  if (object->get_oid() == stdin_oid)
327  stdin_ = (Group *)(*objects)[i];
328  else if (object->get_oid() == stdout_oid)
329  stdout_ = (Group *)(*objects)[i];
330  else if (object->get_oid() == self_oid)
331  self_ = (*objects)[i];
332 
333  switch (object->code(0).getDescriptor()) {
334  case Atom::MODEL:
335  if (Utils::has_reference(&object->code(0), HLP_FWD_GUARDS)) {
336  cerr << "ERROR: Illegal referenced object in forward guards of model OID " << object->get_oid() << endl;
337  return false;
338  }
339  if (Utils::has_reference(&object->code(0), HLP_BWD_GUARDS)) {
340  cerr << "ERROR: Illegal referenced object in backward guards of model OID " << object->get_oid() << endl;
341  return false;
342  }
343  unpack_hlp(object);
344  //object->add_reference(NULL); // classifier.
345  ModelBase::Get()->load(object);
346  break;
347  case Atom::COMPOSITE_STATE:
348  if (Utils::has_reference(&object->code(0), HLP_FWD_GUARDS)) {
349  cerr << "ERROR: Illegal referenced object in forward guards of cst OID " << object->get_oid() << endl;
350  return false;
351  }
352  if (Utils::has_reference(&object->code(0), HLP_BWD_GUARDS)) {
353  cerr << "ERROR: Illegal referenced object in backward guards of cst OID " << object->get_oid() << endl;
354  return false;
355  }
356  unpack_hlp(object);
357  break;
358  case Atom::INSTANTIATED_PROGRAM: // refine the opcode depending on the inputs and the program type.
359  if (object->get_reference(0)->code(0).asOpcode() == Opcodes::Pgm) {
360 
361  if (object->get_reference(0)->code(object->get_reference(0)->code(PGM_INPUTS).asIndex()).getAtomCount() == 0)
362  object->code(0) = Atom::InstantiatedInputLessProgram(object->code(0).asOpcode(), object->code(0).getAtomCount());
363  } else
364  object->code(0) = Atom::InstantiatedAntiProgram(object->code(0).asOpcode(), object->code(0).getAtomCount());
365  break;
366  }
367 
368  unordered_set<_View *, _View::Hash, _View::Equal>::const_iterator v;
369  for (v = object->views_.begin(); v != object->views_.end(); ++v) {
370 
371  // init hosts' member_set.
372  View *view = (View *)*v;
373  view->set_object(object);
374  Group *host = view->get_host();
375 
376  if (!host->load(view, object))
377  return false;
378  if (host == stdin_ && view->get_sync() == View::SYNC_AXIOM &&
379  (view->object_->code(0).asOpcode() == Opcodes::Fact ||
380  view->object_->code(0).asOpcode() == Opcodes::AntiFact))
381  // This is an axiom in the stdin group, so save for matches_axiom().
382  axiom_values_.push_back(view->object_->get_reference(0));
383  }
384 
385  if (object->code(0).getDescriptor() == Atom::GROUP)
386  initial_groups_.push_back((Group *)object); // convenience to create initial update jobs - see start().
387  }
388 
389  return true;
390 }
391 
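392 // Recursively offsets the timestamp atom at code[index] by time_reference,
393 // following i-ptrs and structured atoms. If adding time_reference would overflow,
394 // the timestamp is clamped to Utils_MaxTime. init_timestamps() calls this on every
395 // loaded object so that relative seed timestamps become absolute session times.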
399 static void update_timestamps(Timestamp time_reference, Atom* code, uint16 index) {
400  Atom atom = code[index];
401 
402  switch (atom.getDescriptor()) {
403  case Atom::TIMESTAMP: {
404  auto ts = Utils::GetTimestamp(code + index).time_since_epoch();
405  if (ts >= Utils_MaxTime - time_reference)
406  // Adding time_reference would overflow, so just set to the max time stamp.
407  Utils::SetTimestamp(code + index, Utils_MaxTime);
408  else
409  Utils::SetTimestamp(code + index, ts + time_reference);
410  break;
411  }
412  case Atom::I_PTR:
413  update_timestamps(time_reference, code, atom.asIndex());
414  break;
415  case Atom::C_PTR:
416  case Atom::SET:
417  case Atom::OBJECT:
418  case Atom::S_SET:
419  case Atom::MARKER:
420  case Atom::OPERATOR:
421  case Atom::GROUP: {
422  uint16 count = atom.getAtomCount();
423  for (uint16 i = 1; i <= count; ++i)
424  update_timestamps(time_reference, code, index + i);
425  break;
426  }
427  }
428 }
429 
430 void _Mem::init_timestamps(Timestamp time_reference, const r_code::list<P<Code>>& objects) {
431  for (auto o = objects.begin(); o != objects.end(); ++o)
432  update_timestamps(time_reference, &(*o)->code(0), 0);
433 }
434 
435 Timestamp _Mem::start() {
436 
437  if (state_ != STOPPED && state_ != NOT_STARTED)
438  return Timestamp(seconds(0));
439 
440  core_count_ = 0;
441  stop_sem_ = new Semaphore(1, 1);
442 
443  time_job_queue_ = new PipeNN<P<TimeJob>, 1024>();
444  reduction_job_queue_ = new PipeNN<P<_ReductionJob>, 1024>();
445 
446  vector<std::pair<View *, Group *> > initial_reduction_jobs;
447 
448  uint32 i;
449  auto now = Now();
450  Utils::SetTimeReference(now);
451  ModelBase::Get()->set_thz(secondary_thz_);
452  init_timestamps(now, objects_);
453 
454  for (i = 0; i < initial_groups_.size(); ++i) {
455 
456  Group *g = initial_groups_[i];
457  bool c_active = g->get_c_act() > g->get_c_act_thr();
458  bool c_salient = g->get_c_sln() > g->get_c_sln_thr();
459 
460  FOR_ALL_VIEWS_BEGIN(g, v)
461  Utils::SetTimestamp<View>(v->second, VIEW_IJT, now); // init injection time for the view.
462  FOR_ALL_VIEWS_END
463 
464  if (c_active) {
465 
466  unordered_map<uint32, P<View> >::const_iterator v;
467 
468  // build signaling jobs for active input-less overlays.
469  for (v = g->input_less_ipgm_views_.begin(); v != g->input_less_ipgm_views_.end(); ++v) {
470 
471  if (v->second->controller_ != NULL && v->second->controller_->is_activated()) {
472  P<TimeJob> j = new InputLessPGMSignalingJob(v->second, now + Utils::GetDuration<Code>(v->second->object_, IPGM_TSC));
473  time_job_queue_->push(j);
474  }
475  }
476 
477  // build signaling jobs for active anti-pgm overlays.
478  for (v = g->anti_ipgm_views_.begin(); v != g->anti_ipgm_views_.end(); ++v) {
479 
480  if (v->second->controller_ != NULL && v->second->controller_->is_activated()) {
481  P<TimeJob> j = new AntiPGMSignalingJob(v->second, now + Utils::GetDuration<Code>(v->second->object_, IPGM_TSC));
482  time_job_queue_->push(j);
483  }
484  }
485  }
486 
487  if (c_salient) {
488 
489  // build reduction jobs for each salient view and each active overlay - regardless of the view's sync mode.
490  FOR_ALL_VIEWS_BEGIN(g, v)
491 
492  if (v->second->get_sln() > g->get_sln_thr()) { // salient view.
493 
494  g->newly_salient_views_.insert(v->second);
495  initial_reduction_jobs.push_back(std::pair<View *, Group *>(v->second, g));
496  }
497  FOR_ALL_VIEWS_END
498  }
499 
500  if (g->get_upr() > 0) { // inject the next update job for the group.
501 
502  P<TimeJob> j = new UpdateJob(g, g->get_next_upr_time(now));
503  time_job_queue_->push(j);
504  }
505  }
506 
507  initial_groups_.clear();
508 
509  state_ = RUNNING;
510 
511  P<TimeJob> j = new PerfSamplingJob(now + perf_sampling_period_, perf_sampling_period_);
512  time_job_queue_->push(j);
513 
514  for (i = 0; i < reduction_core_count_; ++i)
515  reduction_cores_[i]->start(ReductionCore::Run);
516  for (i = 0; i < time_core_count_; ++i)
517  time_cores_[i]->start(TimeCore::Run);
518 
519  for (uint32 i = 0; i < initial_reduction_jobs.size(); ++i)
520  initial_reduction_jobs[i].second->inject_reduction_jobs(initial_reduction_jobs[i].first);
521 
522  return now;
523 }
524 
525 void _Mem::run_in_diagnostic_time(milliseconds run_time) {
526  if (!(reduction_core_count_ == 0 && time_core_count_ == 0))
527  // This should only be called if there are no running core threads.
528  return;
529 
530  DiagnosticTimeState diagnostic_time_state(this, run_time);
531  // Step until we reach the run_time.
532  while (diagnostic_time_state.step()) {}
533 }
534 
535 DiagnosticTimeState::DiagnosticTimeState(_Mem* mem, milliseconds run_time)
536  : mem_(mem),
537  run_time_(run_time),
538  n_reduction_jobs_this_sampling_period_(0),
539  reduction_job_queue_index_(0) {
540  tick_time_ = Now();
541  mem_->on_diagnostic_time_tick();
542  need_diagnostic_time_tick_ = false;
543  end_time_ = Now() + run_time_;
544  pass_number_ = 1;
545 }
546 
547 bool DiagnosticTimeState::step() {
548  // The maximum number of reduction jobs to run before trying a time job.
549  // Average job time is 10us. 10000 jobs is 100000us, which is the sampling period.
550  // Assume 8 threads (on 4 cores), so allow 10000 * 8 = 80000 jobs per cycle.
551  const size_t max_reduction_jobs_per_cycle = 80000;
552 
553  if (mem_->get_state() == _Mem::STOPPED)
554  return false;
555 
556  // Reduction jobs can add more reduction jobs, so make a few passes.
557  if (pass_number_ <= 100) {
558  if (reduction_job_queue_index_ == 0) {
559  // We're ready to run the first reduction job for this pass.
560  // Transfer all reduction jobs to a local queue and run only these.
561  // Below, we only run one time job, so any extra jobs that these reduction
562  // jobs add will be run on the next pass after running the time job.
563  while (true) {
564  P<_ReductionJob> reduction_job = mem_->pop_reduction_job(false);
565  if (reduction_job == NULL)
566  // No more reduction jobs.
567  break;
568  reduction_job_queue_.push_back(reduction_job);
569  }
570  }
571 
572  size_t n_jobs_to_run = min(reduction_job_queue_.size(), max_reduction_jobs_per_cycle);
573  if (n_jobs_to_run > 0) {
574  if (reduction_job_queue_index_ < n_jobs_to_run) {
575  // Add breakpoint here to check which reduction job leads to the failure.
576  reduction_job_queue_[reduction_job_queue_index_]->update(Now());
577  reduction_job_queue_[reduction_job_queue_index_] = NULL;
578  ++reduction_job_queue_index_;
579  // Try the next reduction job.
580  return true;
581  }
582  // We have done all the reduction jobs for this pass, so reset.
583  reduction_job_queue_index_ = 0;
584 
585  n_reduction_jobs_this_sampling_period_ += n_jobs_to_run;
586 
587  if (reduction_job_queue_.size() > max_reduction_jobs_per_cycle)
588  // There are remaining jobs to be run. Shift them to the front.
589  reduction_job_queue_.erase(
590  reduction_job_queue_.begin(), reduction_job_queue_.begin() + max_reduction_jobs_per_cycle);
591  else
592  reduction_job_queue_.clear();
593 
594  // Make sure we haven't hit the limit of reduction jobs this sampling period.
595  if (n_reduction_jobs_this_sampling_period_ < max_reduction_jobs_per_cycle) {
596  ++pass_number_;
597  // Try the next pass.
598  return true;
599  }
600  }
601  }
602 
603  // We have done all the passes, so reset.
604  pass_number_ = 1;
605 
606  // Transfer all time jobs to ordered_time_job_queue_,
607  // sorted on target_time_.
608  while (true) {
609  P<TimeJob> time_job = mem_->pop_time_job(false);
610  if (time_job == NULL)
611  // No more time jobs.
612  break;
613 
614  ordered_time_job_queue_.insert(
615  upper_bound(ordered_time_job_queue_.begin(),
616  ordered_time_job_queue_.end(), time_job, time_job_compare_),
617  time_job);
618  }
619 
620  if (Now() >= end_time_)
621  // Finished.
622  return false;
623 
624  // The entry at the front is the earliest.
625  if (!need_diagnostic_time_tick_ &&
626  (ordered_time_job_queue_.size() == 0 ||
627  ordered_time_job_queue_.front()->target_time_ >=
628  tick_time_ + mem_->get_sampling_period())) {
629  // There is no time job before the next tick time, so tick.
630  tick_time_ += mem_->get_sampling_period();
631  // Increase the diagnostic time to the tick time.
632  _Mem::diagnostic_time_now_ = tick_time_;
633  // We are beginning a new sampling period.
634  n_reduction_jobs_this_sampling_period_ = 0;
635  need_diagnostic_time_tick_ = true;
636  }
637 
638  // Call on_diagnostic_time_tick() only if there are no time jobs to run now.
639  if (need_diagnostic_time_tick_ &&
640  !(ordered_time_job_queue_.size() > 0 &&
641  ordered_time_job_queue_.front()->target_time_ <= tick_time_)) {
642  need_diagnostic_time_tick_ = false;
643  mem_->on_diagnostic_time_tick();
644 
645  // Step again in case on_diagnostic_time_tick() added a reduction job,
646  // or a reduction job will add more time jobs.
647  return true;
648  }
649 
650  if (ordered_time_job_queue_.size() == 0)
651  // No time jobs. Step again in case a reduction job will add one.
652  return true;
653 
654  if (ordered_time_job_queue_.front()->target_time_ > Now())
655  // Increase the diagnostic time to the job's target time.
656  _Mem::diagnostic_time_now_ = ordered_time_job_queue_.front()->target_time_;
657 
658  // Only process one job in case it adds more jobs.
659  P<TimeJob> time_job = ordered_time_job_queue_.front();
660  ordered_time_job_queue_.erase(ordered_time_job_queue_.begin());
661 
662  if (!time_job->is_alive()) {
663  time_job = NULL;
664  return true;
665  }
666 
667  Timestamp next_target(seconds(0));
668  if (!time_job->update(next_target)) {
669  // update() says to stop running.
670  time_job = NULL;
671  return true;
672  }
673 
674  if (next_target.time_since_epoch().count() != 0) {
675  // The job wants to run again, so re-insert into the queue.
676  time_job->target_time_ = next_target;
677  ordered_time_job_queue_.insert(
678  upper_bound(ordered_time_job_queue_.begin(),
679  ordered_time_job_queue_.end(), time_job, time_job_compare_),
680  time_job);
681  }
682  else
683  time_job = NULL;
684 
685  return true;
686 }
687 
688 Timestamp _Mem::diagnostic_time_now_ = Timestamp(microseconds(1));
689 
690 Timestamp _Mem::get_diagnostic_time_now() { return diagnostic_time_now_; }
691 
692 void _Mem::on_diagnostic_time_tick() {}
693 
694 void _Mem::stop() {
695 
696  stateCS_.enter();
697  if (state_ != RUNNING) {
698 
699  stateCS_.leave();
700  return;
701  }
702 
703  uint32 i;
704  P<_ReductionJob> r;
705  for (i = 0; i < reduction_core_count_; ++i)
706  reduction_job_queue_->push(r = new ShutdownReductionCore());
707  P<TimeJob> t;
708  for (i = 0; i < time_core_count_; ++i)
709  time_job_queue_->push(t = new ShutdownTimeCore());
710 
711  state_ = STOPPED;
712  stateCS_.leave();
713 
714  for (i = 0; i < time_core_count_; ++i)
715  Thread::Wait(time_cores_[i]);
716 
717  for (i = 0; i < reduction_core_count_; ++i)
718  Thread::Wait(reduction_cores_[i]);
719 
720  stop_sem_->acquire(); // wait for delegates.
721 
722  reset();
723 }
724 
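725 // Job queue access for the reduction and time cores; these become no-ops once the state is STOPPED.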
726 
727 P<_ReductionJob> _Mem::pop_reduction_job(bool waitForItem) {
728 
729  if (state_ == STOPPED)
730  return NULL;
731  return reduction_job_queue_->pop(waitForItem);
732 }
733 
734 void _Mem::push_reduction_job(_ReductionJob *j) {
735 
736  if (state_ == STOPPED)
737  return;
738  j->ijt_ = Now();
739  P<_ReductionJob> _j = j;
740  reduction_job_queue_->push(_j);
741 }
742 
743 P<TimeJob> _Mem::pop_time_job(bool waitForItem) {
744 
745  if (state_ == STOPPED)
746  return NULL;
747  return time_job_queue_->pop(waitForItem);
748 }
749 
750 void _Mem::push_time_job(TimeJob *j) {
751 
752  if (state_ == STOPPED)
753  return;
754  P<TimeJob> _j = j;
755  time_job_queue_->push(_j);
756 }
757 
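758 // Ejection of views and commands toward I/O devices; these base implementations do nothing.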
759 
760 void _Mem::eject(View *view, uint16 node_id) {
761 }
762 
763 r_code::Code* _Mem::eject(Code *command) {
764  return NULL;
765 }
766 
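767 // Injection of views and objects into groups.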
768 
769 void _Mem::inject_copy(View *view, Group *destination) {
770 
771  View *copied_view = new View(view, destination); // ctrl values are morphed.
772  inject_existing_object(copied_view, view->object_, destination);
773 }
774 
775 void _Mem::inject_existing_object(View *view, Code *object, Group *host) {
776 
777  view->set_object(object); // the object already exists (content-wise): have the view point to the existing one.
778  host->inject_existing_object(view);
779 }
780 
781 void _Mem::inject_null_program(Controller *c, Group *group, microseconds time_to_live, bool take_past_inputs) {
782 
783  auto now = Now();
784 
785  Code *null_pgm = new LObject();
786  null_pgm->code(0) = Atom::NullProgram(take_past_inputs);
787 
788  uint32 res = Utils::GetResilience(now, time_to_live, group->get_upr() * Utils::GetBasePeriod().count());
789 
790  View *view = new View(View::SYNC_ONCE, now, 0, res, group, NULL, null_pgm, 1);
791  view->controller_ = c;
792 
793  c->set_view(view);
794 
795  inject(view);
796 }
797 
798 void _Mem::inject_new_object(View *view) {
799 
800  Group *host = view->get_host();
801  //uint64 t0,t1,t2;
802  switch (view->object_->code(0).getDescriptor()) {
803  case Atom::GROUP:
804  bind(view);
805 
806  host->inject_group(view);
807  break;
808  default:
809  //t0=Now();
810  bind(view);
811  //t1=Now();
812  host->inject_new_object(view);
813  //t2=Now();
814  //timings_report.push_back(t2-t0);
815  break;
816  }
817 }
818 
819 void _Mem::inject_from_io_device(View *view) {
820  // Inject first to set the OID.
821  inject(view, true);
822  // For UNDEFINED_OID, assume that InjectionJob::update will log it.
823  if (view->object_->get_oid() != UNDEFINED_OID)
824  // The view injection time may be different than now, so log it too.
825  OUTPUT_LINE(IO_DEVICE_INJ_EJT, Utils::RelativeTime(Now()) << " I/O device inject " <<
826  view->object_->get_oid() << ", ijt " << Utils::RelativeTime(view->get_ijt()));
827 }
828 
829 View* _Mem::inject_marker_value_from_io_device(
830  Code* obj, Code* prop, Atom val, Timestamp after, Timestamp before,
831  View::SyncMode sync_mode, Code* group)
832 {
833  if (!obj || !prop)
834  // We don't expect this, but sanity check.
835  return NULL;
836 
837  Code *object = new LObject(this);
838  object->code(0) = Atom::Marker(GetOpcode("mk.val"), 4); // Caveat: arity does not include the opcode.
839  object->code(1) = Atom::RPointer(0); // obj
840  object->code(2) = Atom::RPointer(1); // prop
841  object->code(3) = val;
842  object->code(4) = Atom::Float(1); // psln_thr.
843 
844  object->set_reference(0, obj);
845  object->set_reference(1, prop);
846 
847  return inject_fact_from_io_device(object, after, before, sync_mode, group);
848 }
849 
850 View* _Mem::inject_marker_value_from_io_device(
851  Code* obj, Code* prop, std::vector<Atom> val, Timestamp after, Timestamp before,
852  View::SyncMode sync_mode, Code* group)
853 {
854  if (!obj || !prop)
855  // We don't expect this, but sanity check.
856  return NULL;
857 
858  Code* object = new LObject(this);
859  uint16 extent_index = 4;
860  object->code(0) = Atom::Marker(GetOpcode("mk.val"), 4); // Caveat: arity does not include the opcode.
861  object->code(1) = Atom::RPointer(0); // obj
862  object->code(2) = Atom::RPointer(1); // prop
863  object->code(3) = Atom::IPointer(++extent_index);
864  object->code(4) = Atom::Float(1); // psln_thr.
865 
866 
867  object->set_reference(0, obj);
868  object->set_reference(1, prop);
869  object->code(extent_index) = Atom::Set(val.size());
870  for (uint16 i = 0; i < val.size(); ++i) {
871  object->code(++extent_index) = val[i];
872  }
873 
874  return inject_fact_from_io_device(object, after, before, sync_mode, group);
875 }
876 
877 View* _Mem::inject_marker_value_from_io_device(
878  Code* obj, Code* prop, Code* val, Timestamp after, Timestamp before,
879  View::SyncMode sync_mode, Code* group)
880 {
881  if (!obj || !prop)
882  // We don't expect this, but sanity check.
883  return NULL;
884 
885  Code *object = new LObject(this);
886  object->code(0) = Atom::Marker(GetOpcode("mk.val"), 4); // Caveat: arity does not include the opcode.
887  object->code(1) = Atom::RPointer(0); // obj
888  object->code(2) = Atom::RPointer(1); // prop
889  object->code(3) = Atom::RPointer(2); // val
890  object->code(4) = Atom::Float(1); // psln_thr.
891 
892  object->set_reference(0, obj);
893  object->set_reference(1, prop);
894  object->set_reference(2, val);
895 
896  return inject_fact_from_io_device(object, after, before, sync_mode, group);
897 }
898 
899 View* _Mem::inject_marker_value_from_io_device(
900  Code* obj, Code* prop, const std::string& val, Timestamp after, Timestamp before, View::SyncMode sync_mode, Code* group)
901 {
902  if (!obj || !prop)
903  // We don't expect this, but sanity check.
904  return NULL;
905 
906  Code* object = new LObject(this);
907  uint16 extent_index = 4;
908  object->code(0) = Atom::Marker(GetOpcode("mk.val"), 4); // Caveat: arity does not include the opcode.
909  object->code(1) = Atom::RPointer(0); // obj
910  object->set_reference(0, obj);
911  object->code(2) = Atom::RPointer(1); // prop
912  object->set_reference(1, prop);
913  object->code(3) = Atom::IPointer(++extent_index); // val
914  object->code(4) = Atom::Float(1); // psln_thr.
915 
916  Utils::SetString<Code>(object, 3, val);
917 
918  return inject_fact_from_io_device(object, after, before, sync_mode, group);
919 }
920 
921 View* _Mem::inject_marker_value_from_io_device(
922  Code* obj, Code* prop, const vector<Code*>& val, Timestamp after, Timestamp before,
923  View::SyncMode sync_mode, Code* group)
924 {
925  if (!obj || !prop)
926  // We don't expect this, but sanity check.
927  return NULL;
928 
929  Code* object = new LObject(this);
930  uint16 extent_index = 4;
931  object->code(0) = Atom::Marker(GetOpcode("mk.val"), 4); // Caveat: arity does not include the opcode.
932  object->code(1) = Atom::RPointer(0); // obj
933  object->code(2) = Atom::RPointer(1); // prop
934  object->code(3) = Atom::IPointer(++extent_index); // val
935  object->code(4) = Atom::Float(1); // psln_thr.
936 
937  object->set_reference(0, obj);
938  object->set_reference(1, prop);
939  object->code(extent_index) = Atom::Set(val.size());
940  for (uint16 i = 0; i < val.size(); ++i) {
941  object->code(++extent_index) = Atom::RPointer(object->references_size());
942  object->set_reference(object->references_size(), val[i]);
943  }
944 
945  return inject_fact_from_io_device(object, after, before, sync_mode, group);
946 }
947 
948 View* _Mem::inject_marker_value_from_io_device(
949  Code* obj, Code* prop, uint16 opcode, const vector<Atom>& vals, Timestamp after, Timestamp before,
950  View::SyncMode sync_mode, Code* group)
951 {
952  if (!obj || !prop)
953  // We don't expect this, but sanity check.
954  return NULL;
955 
956  Code* object = new LObject(this);
957  uint16 extent_index = 4;
958  object->code(0) = Atom::Marker(GetOpcode("mk.val"), 4); // Caveat: arity does not include the opcode.
959  object->code(1) = Atom::RPointer(0); // obj
960  object->set_reference(0, obj);
961  object->code(2) = Atom::RPointer(1); // prop
962  object->set_reference(1, prop);
963  object->code(3) = Atom::IPointer(++extent_index); // val
964  object->code(4) = Atom::Float(1); // psln_thr.
965 
966  object->code(extent_index++) = Atom::Object(opcode, vals.size());
967  for (uint16 i = 0; i < vals.size(); ++i) {
968  object->code(extent_index++) = vals[i];
969  }
970 
971  return inject_fact_from_io_device(object, after, before, sync_mode, group);
972 }
973 
974 View* _Mem::inject_fact_from_io_device(
975  Code* object, Timestamp after, Timestamp before, View::SyncMode sync_mode,
976  Code* group)
977 {
978  // Build a fact.
979  Code* fact = new Fact(object, after, before, 1, 1);
980 
981  // Build a view for the fact.
982  View *view = new View(sync_mode, after, 1, 1, group, NULL, fact);
983 
984  // Inject the view.
985  inject_from_io_device(view);
986  return view;
987 }
988 
989 void _Mem::inject(View *view, bool is_from_io_device) {
990 
991  if (view->object_->is_invalidated())
992  return;
993 
994  Group *host = view->get_host();
995 
996  if (host->is_invalidated())
997  return;
998 
999  auto now = Now();
1000  auto ijt = view->get_ijt();
1001 
1002  if (view->object_->is_registered()) { // existing object.
1003 
1004  if (ijt <= now)
1005  inject_existing_object(view, view->object_, host);
1006  else {
1007 
1008  P<TimeJob> j = new EInjectionJob(view, ijt);
1009  time_job_queue_->push(j);
1010  }
1011  } else { // new object.
1012 
1013  if (ijt <= now)
1014  inject_new_object(view);
1015  else {
1016 
1017  P<TimeJob> j = new InjectionJob(view, ijt, is_from_io_device);
1018  time_job_queue_->push(j);
1019  }
1020  }
1021 }
1022 
1023 void _Mem::inject_async(View *view) {
1024 
1025  if (view->object_->is_invalidated())
1026  return;
1027 
1028  Group *host = view->get_host();
1029 
1030  if (host->is_invalidated())
1031  return;
1032 
1033  auto now = Now();
1034  auto ijt = view->get_ijt();
1035 
1036  if (ijt <= now) {
1037 
1038  P<_ReductionJob> j = new AsyncInjectionJob(view);
1039  reduction_job_queue_->push(j);
1040  } else {
1041 
1042  if (view->object_->is_registered()) { // existing object.
1043 
1044  P<TimeJob> j = new EInjectionJob(view, ijt);
1045  time_job_queue_->push(j);
1046  } else {
1047 
1048  P<TimeJob> j = new InjectionJob(view, ijt, false);
1049  time_job_queue_->push(j);
1050  }
1051  }
1052 }
1053 
1054 void _Mem::inject_hlps(vector<View *> views, Group *destination) {
1055 
1056  vector<View *>::const_iterator view;
1057  for (view = views.begin(); view != views.end(); ++view)
1058  bind(*view);
1059 
1060  destination->inject_hlps(views);
1061 }
1062 
1063 void _Mem::inject_notification(View *view, bool lock) { // no notification for notifications; no cov.
1064  // notifications are ephemeral: they are not held by the marker sets of the object they refer to; this implies no propagation of saliency changes through notifications.
1065  Group *host = view->get_host();
1066 
1067  bind(view);
1068 
1069  host->inject_notification(view, lock);
1070 }
1071 
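1072 // Performance reporting: job latencies are accumulated here and periodically injected as an f->perf fact in stdin by inject_perf_stats().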
1073 
1074 void _Mem::register_reduction_job_latency(microseconds latency) {
1075 
1076  reduction_jobCS_.enter();
1077  ++reduction_job_count_;
1078  reduction_job_avg_latency_ += latency;
1079  reduction_jobCS_.leave();
1080 }
1081 void _Mem::register_time_job_latency(microseconds latency) {
1082 
1083  time_jobCS_.enter();
1084  ++time_job_count_;
1085  time_job_avg_latency_ += latency;
1086  time_jobCS_.leave();
1087 }
1088 
1089 void _Mem::inject_perf_stats() {
1090 
1091  reduction_jobCS_.enter();
1092  time_jobCS_.enter();
1093 
1094  microseconds d_reduction_job_avg_latency;
1095  if (reduction_job_count_ > 0) {
1096 
1097  reduction_job_avg_latency_ /= reduction_job_count_;
1098  d_reduction_job_avg_latency = reduction_job_avg_latency_ - _reduction_job_avg_latency_;
1099  } else
1100  reduction_job_avg_latency_ = d_reduction_job_avg_latency = microseconds(0);
1101 
1102  microseconds d_time_job_avg_latency;
1103  if (time_job_count_ > 0) {
1104 
1105  time_job_avg_latency_ /= time_job_count_;
1106  d_time_job_avg_latency = time_job_avg_latency_ - _time_job_avg_latency_;
1107  } else
1108  time_job_avg_latency_ = d_time_job_avg_latency = microseconds(0);
1109 
1110  Code *perf = new Perf(reduction_job_avg_latency_, d_reduction_job_avg_latency, time_job_avg_latency_, d_time_job_avg_latency);
1111 
1112  // reset stats.
1113  reduction_job_count_ = time_job_count_ = 0;
1114  _reduction_job_avg_latency_ = reduction_job_avg_latency_;
1115  _time_job_avg_latency_ = time_job_avg_latency_;
1116 
1117  time_jobCS_.leave();
1118  reduction_jobCS_.leave();
1119 
1120  // inject f->perf in stdin.
1121  auto now = Now();
1122  Code *f_perf = new Fact(perf, now, now + perf_sampling_period_, 1, 1);
1123  View *view = new View(View::SYNC_ONCE, now, 1, 1, stdin_, NULL, f_perf); // sync front, sln=1, res=1.
1124  inject(view);
1125 }
1126 
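1127 // Saliency propagation and packing/unpacking of hlps (models and composite states).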
1128 
1129 void _Mem::propagate_sln(Code *object, float32 change, float32 source_sln_thr) {
1130 
1131  // apply morphed change to views.
1132  // loops are prevented within one call, but not across several uprs:
1133  // - feedback can happen, e.g. m:(mk o1 o2); o1.vw.g propag -> o1 propag -> m propag -> o2 propag -> o2.vw.g; at the next upr in g, o2 propag -> m propag -> o1 propag -> o1.vw.g: the loop spreads across several uprs.
1134  // - to avoid this, set the psln_thr to 1 in o2: this is application-dependent.
1135  object->acq_views();
1136 
1137  if (object->views_.size() == 0) {
1138 
1139  object->invalidate();
1140  object->rel_views();
1141  return;
1142  }
1143 
1144  unordered_set<_View *, _View::Hash, _View::Equal>::const_iterator it;
1145  for (it = object->views_.begin(); it != object->views_.end(); ++it) {
1146 
1147  float32 morphed_sln_change = View::MorphChange(change, source_sln_thr, ((View*)*it)->get_host()->get_sln_thr());
1148  if (morphed_sln_change != 0)
1149  ((View*)*it)->get_host()->pending_operations_.push_back(new Group::Mod(((View*)*it)->get_oid(), VIEW_SLN, morphed_sln_change));
1150  }
1151  object->rel_views();
1152 }
1153 
1154 void _Mem::unpack_hlp(Code *hlp) { // produces a new object (featuring a set of pattern objects instead of a set of embedded pattern expressions) and adds it as a hidden reference to the original (still packed) hlp.
1155 
1156  Code *unpacked_hlp = new LObject(); // will not be transmitted nor decompiled.
1157 
1158  for (uint16 i = 0; i < hlp->code_size(); ++i)
1159  unpacked_hlp->code(i) = hlp->code(i);
1160 
1161  uint16 pattern_set_index = hlp->code(HLP_OBJS).asIndex();
1162  uint16 pattern_count = hlp->code(pattern_set_index).getAtomCount();
1163  for (uint16 i = 1; i <= pattern_count; ++i) { // init the new references with the facts; turn the existing i-ptrs into r-ptrs.
1164 
1165  Code *fact = unpack_fact(hlp, hlp->code(pattern_set_index + i).asIndex());
1166  unpacked_hlp->add_reference(fact);
1167  unpacked_hlp->code(pattern_set_index + i) = Atom::RPointer(unpacked_hlp->references_size() - 1);
1168  }
1169 
1170  uint16 group_set_index = hlp->code(HLP_OUT_GRPS).asIndex();
1171  uint16 group_count = hlp->code(group_set_index++).getAtomCount();
1172  for (uint16 i = 0; i < group_count; ++i) { // append the out_groups to the new references; adjust the existing r-ptrs.
1173 
1174  unpacked_hlp->add_reference(hlp->get_reference(hlp->code(group_set_index + i).asIndex()));
1175  unpacked_hlp->code(group_set_index + i) = Atom::RPointer(unpacked_hlp->references_size() - 1);
1176  }
1177 
1178  uint16 invalid_point = pattern_set_index + pattern_count + 1; // index of what comes after the set of patterns.
1179  uint16 valid_point = hlp->code(HLP_FWD_GUARDS).asIndex(); // index of the first atom that does not belong to the patterns.
1180  uint16 invalid_zone_length = valid_point - invalid_point;
1181  for (uint16 i = valid_point; i < hlp->code_size(); ++i) { // shift the valid code upward; adjust i-ptrs.
1182 
1183  Atom h_atom = unpacked_hlp->code(i);
1184  switch (h_atom.getDescriptor()) {
1185  case Atom::I_PTR:
1186  unpacked_hlp->code(i - invalid_zone_length) = Atom::IPointer(h_atom.asIndex() - invalid_zone_length);
1187  break;
1188  case Atom::ASSIGN_PTR:
1189  unpacked_hlp->code(i - invalid_zone_length) = Atom::AssignmentPointer(h_atom.asAssignmentIndex(), h_atom.asIndex() - invalid_zone_length);
1190  break;
1191  default:
1192  unpacked_hlp->code(i - invalid_zone_length) = h_atom;
1193  break;
1194  }
1195  }
1196 
1197  // adjust set indices.
1198  unpacked_hlp->code(HLP_FWD_GUARDS) = Atom::IPointer(hlp->code(HLP_FWD_GUARDS).asIndex() - invalid_zone_length);
1199  unpacked_hlp->code(HLP_BWD_GUARDS) = Atom::IPointer(hlp->code(HLP_BWD_GUARDS).asIndex() - invalid_zone_length);
1200  unpacked_hlp->code(HLP_OUT_GRPS) = Atom::IPointer(hlp->code(HLP_OUT_GRPS).asIndex() - invalid_zone_length);
1201 
1202  uint16 unpacked_code_length = hlp->code_size() - invalid_zone_length;
1203  unpacked_hlp->resize_code(unpacked_code_length);
1204  hlp->add_reference(unpacked_hlp);
1205 }
1206 
1207 Code *_Mem::unpack_fact(Code *hlp, uint16 fact_index) {
1208 
1209  Code *fact = new LObject();
1210  Code *fact_object;
1211  uint16 fact_size = hlp->code(fact_index).getAtomCount() + 1;
1212  for (uint16 i = 0; i < fact_size; ++i) {
1213 
1214  Atom h_atom = hlp->code(fact_index + i);
1215  switch (h_atom.getDescriptor()) {
1216  case Atom::I_PTR:
1217  fact->code(i) = Atom::RPointer(fact->references_size());
1218  fact_object = unpack_fact_object(hlp, h_atom.asIndex());
1219  fact->add_reference(fact_object);
1220  break;
1221  case Atom::R_PTR: // case of a reference to an existing object.
1222  fact->code(i) = Atom::RPointer(fact->references_size());
1223  fact->add_reference(hlp->get_reference(h_atom.asIndex()));
1224  break;
1225  default:
1226  fact->code(i) = h_atom;
1227  break;
1228  }
1229  }
1230 
1231  return fact;
1232 }
1233 
1234 Code *_Mem::unpack_fact_object(Code *hlp, uint16 fact_object_index) {
1235 
1236  Code *fact_object = new LObject();
1237  _unpack_code(hlp, fact_object_index, fact_object, fact_object_index);
1238  return fact_object;
1239 }
1240 
1241 void _Mem::_unpack_code(Code *hlp, uint16 fact_object_index, Code *fact_object, uint16 read_index) {
1242 
1243  Atom h_atom = hlp->code(read_index);
1244  uint16 code_size = h_atom.getAtomCount() + 1;
1245  uint16 write_index = read_index - fact_object_index;
1246  for (uint16 i = 0; i < code_size; ++i) {
1247 
1248  switch (h_atom.getDescriptor()) {
1249  case Atom::R_PTR:
1250  fact_object->code(write_index + i) = Atom::RPointer(fact_object->references_size());
1251  fact_object->add_reference(hlp->get_reference(h_atom.asIndex()));
1252  break;
1253  case Atom::I_PTR:
1254  fact_object->code(write_index + i) = Atom::IPointer(h_atom.asIndex() - fact_object_index);
1255  _unpack_code(hlp, fact_object_index, fact_object, h_atom.asIndex());
1256  break;
1257  default:
1258  fact_object->code(write_index + i) = h_atom;
1259  break;
1260  }
1261 
1262  h_atom = hlp->code(read_index + i + 1);
1263  }
1264 }
1265 
1266 void _Mem::pack_hlp(Code *hlp) const { // packs hlp in place: the set of pattern objects (references) is turned into embedded pattern code; a clone of the unpacked form is kept as a hidden reference.
1267 
1268  Code *unpacked_hlp = clone(hlp);
1269 
1270  vector<Atom> trailing_code; // copy of the original code (the latter will be overwritten by packed facts).
1271  uint16 trailing_code_index = hlp->code(HLP_FWD_GUARDS).asIndex();
1272  for (uint16 i = trailing_code_index; i < hlp->code_size(); ++i)
1273  trailing_code.push_back(hlp->code(i));
1274 
1275  uint16 group_set_index = hlp->code(HLP_OUT_GRPS).asIndex();
1276  uint16 group_count = hlp->code(group_set_index).getAtomCount();
1277 
1278  vector<P<Code> > references;
1279 
1280  uint16 pattern_set_index = hlp->code(HLP_OBJS).asIndex();
1281  uint16 pattern_count = hlp->code(pattern_set_index).getAtomCount();
1282  uint16 insertion_point = pattern_set_index + pattern_count + 1; // point from where compacted code is to be inserted.
1283  uint16 extent_index = insertion_point;
1284  for (uint16 i = 0; i < pattern_count; ++i) {
1285 
1286  Code *pattern_object = hlp->get_reference(i);
1287  hlp->code(pattern_set_index + i + 1) = Atom::IPointer(extent_index);
1288  pack_fact(pattern_object, hlp, extent_index, &references);
1289  }
1290 
1291  uint16 inserted_zone_length = extent_index - insertion_point;
1292 
1293  for (uint16 i = 0; i < trailing_code.size(); ++i) { // shift the trailing code downward; adjust i-ptrs.
1294 
1295  Atom t_atom = trailing_code[i];
1296  switch (t_atom.getDescriptor()) {
1297  case Atom::I_PTR:
1298  hlp->code(i + extent_index) = Atom::IPointer(t_atom.asIndex() + inserted_zone_length);
1299  break;
1300  case Atom::ASSIGN_PTR:
1301  hlp->code(i + extent_index) = Atom::AssignmentPointer(t_atom.asAssignmentIndex(), t_atom.asIndex() + inserted_zone_length);
1302  break;
1303  default:
1304  hlp->code(i + extent_index) = t_atom;
1305  break;
1306  }
1307  }
1308 
1309  // adjust set indices.
1310  hlp->code(HLP_FWD_GUARDS) = Atom::IPointer(hlp->code(HLP_FWD_GUARDS).asIndex() + inserted_zone_length);
1311  hlp->code(HLP_BWD_GUARDS) = Atom::IPointer(hlp->code(HLP_BWD_GUARDS).asIndex() + inserted_zone_length);
1312  hlp->code(HLP_OUT_GRPS) = Atom::IPointer(hlp->code(HLP_OUT_GRPS).asIndex() + inserted_zone_length);
1313 
1314  group_set_index += inserted_zone_length;
1315  for (uint16 i = 1; i <= group_count; ++i) { // append the out_groups to the new references; adjust the existing r-ptrs.
1316 
1317  references.push_back(hlp->get_reference(hlp->code(group_set_index + i).asIndex()));
1318  hlp->code(group_set_index + i) = Atom::RPointer(references.size() - 1);
1319  }
1320 
1321  hlp->set_references(references);
1322 
1323  hlp->add_reference(unpacked_hlp); // hidden reference.
1324 }
1325 
1326 void _Mem::pack_fact(Code *fact, Code *hlp, uint16 &write_index, vector<P<Code> > *references) {
1327 
1328  uint16 extent_index = write_index + fact->code_size();
1329  for (uint16 i = 0; i < fact->code_size(); ++i) {
1330 
1331  Atom p_atom = fact->code(i);
1332  switch (p_atom.getDescriptor()) {
1333  case Atom::R_PTR: // transform into a i_ptr and pack the pointed object.
1334  hlp->code(write_index) = Atom::IPointer(extent_index);
1335  pack_fact_object(fact->get_reference(p_atom.asIndex()), hlp, extent_index, references);
1336  ++write_index;
1337  break;
1338  default:
1339  hlp->code(write_index) = p_atom;
1340  ++write_index;
1341  break;
1342  }
1343  }
1344  write_index = extent_index;
1345 }
1346 
1347 void _Mem::pack_fact_object(Code *fact_object, Code *hlp, uint16 &write_index, vector<P<Code> > *references) {
1348 
1349  uint16 extent_index = write_index + fact_object->code_size();
1350  uint16 offset = write_index;
1351  for (uint16 i = 0; i < fact_object->code_size(); ++i) {
1352 
1353  Atom p_atom = fact_object->code(i);
1354  switch (p_atom.getDescriptor()) {
1355  case Atom::R_PTR: { // append this reference to the hlp's if not already there.
1356  Code *reference = fact_object->get_reference(p_atom.asIndex());
1357  bool found = false;
1358  for (uint16 i = 0; i < references->size(); ++i) {
1359 
1360  if ((*references)[i] == reference) {
1361 
1362  hlp->code(write_index) = Atom::RPointer(i);
1363  found = true;
1364  break;
1365  }
1366  }
1367  if (!found) {
1368 
1369  hlp->code(write_index) = Atom::RPointer(references->size());
1370  references->push_back(reference);
1371  }
1372  ++write_index;
1373  break;
1374  }case Atom::I_PTR: // offset the ptr by write_index. PB HERE.
1375  hlp->code(write_index) = Atom::IPointer(offset + p_atom.asIndex());
1376  ++write_index;
1377  break;
1378  default:
1379  hlp->code(write_index) = p_atom;
1380  ++write_index;
1381  break;
1382  }
1383  }
1384 }
1385 
1386 Code* _Mem::find_object(const vector<Code*> *objects, const char* name) {
1387  // Find the object OID.
1388  uint32 oid = Seed.object_names_.findSymbol(name);
1389  if (oid == UNDEFINED_OID)
1390  return NULL;
1391 
1392  // Find the object. (Imitate the code in _Mem::load.)
1393  for (uint32 i = 0; i < objects->size(); ++i) {
1394  Code *object = (*objects)[i];
1395  if (object->get_oid() == oid)
1396  return object;
1397  }
1398 
1399  return NULL;
1400 }
1401 
1402 Code *_Mem::clone(Code *original) const { // shallow copy; oid not copied.
1403 
1404  Code *_clone = build_object(original->code(0));
1405  uint16 opcode = original->code(0).asOpcode();
1406  if (opcode == Opcodes::Ont || opcode == Opcodes::Ent)
1407  return original;
1408 
1409  for (uint16 i = 0; i < original->code_size(); ++i)
1410  _clone->code(i) = original->code(i);
1411  for (uint16 i = 0; i < original->references_size(); ++i)
1412  _clone->add_reference(original->get_reference(i));
1413  return _clone;
1414 }
1415 
1416 bool _Mem::matches_axiom(r_code::Code* obj)
1417 {
1418  for (auto axiom = axiom_values_.begin(); axiom != axiom_values_.end(); ++axiom) {
1419  if (_Fact::MatchObject(obj, *axiom))
1420  return true;
1421  }
1422 
1423  return false;
1424 }
1425 
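1426 // Build an r_comp::Image containing the current models.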
1427 
1428 r_comp::Image *_Mem::get_models() {
1429 
1430  r_comp::Image *image = new r_comp::Image();
1431  image->timestamp_ = Now();
1432 
1433  r_code::list<P<Code> > models;
1434  ModelBase::Get()->get_models(models); // protected by ModelBase.
1435  image->add_objects(models);
1436 
1437  return image;
1438 }
1439 
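1440 // MemStatic: bound objects are kept in objects_, so the full object set can be retrieved with get_objects().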
1441 
1442 MemStatic::MemStatic() : _Mem(), last_oid_(-1) {
1443 }
1444 
1445 MemStatic::~MemStatic() {
1446 }
1447 
1448 void MemStatic::bind(View *view) {
1449 
1450  Code *object = view->object_;
1451  object->views_.insert(view);
1452  objectsCS_.enter();
1453  object->set_oid(++last_oid_);
1454  if (object->code(0).getDescriptor() == Atom::NULL_PROGRAM) {
1455 
1456  objectsCS_.leave();
1457  return;
1458  }
1459  int32 location;
1460  objects_.push_back(object, location);
1461  object->set_strorage_index(location);
1462  objectsCS_.leave();
1463 }
1464 void MemStatic::set_last_oid(int32 oid) {
1465 
1466  last_oid_ = oid;
1467 }
1468 
1469 void MemStatic::delete_object(r_code::Code *object) { // called only if the object is registered, i.e. has a valid storage index.
1470 
1471  if (deleted_)
1472  return;
1473 
1474  // keep_invalidated_objects_ is true if settings.xml has
1475  // get_objects="yes_with_invalidated", in which case don't erase.
1476  if (!keep_invalidated_objects_) {
1477  objectsCS_.enter();
1478  objects_.erase(object->get_storage_index());
1479  objectsCS_.leave();
1480  }
1481 }
1482 
1483 r_comp::Image *MemStatic::get_objects(bool include_invalidated) {
1484 
1485  r_comp::Image *image = new r_comp::Image();
1486  image->timestamp_ = Now();
1487 
1488  objectsCS_.enter();
1489  image->add_objects(objects_, include_invalidated);
1490  objectsCS_.leave();
1491 
1492  return image;
1493 }
1494 
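1495 // MemVolatile: objects are not kept; bind() only assigns a new OID.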
1496 
1497 MemVolatile::MemVolatile() : _Mem(), last_oid_(-1) {
1498 }
1499 
1500 MemVolatile::~MemVolatile() {
1501 }
1502 
1503 uint32 MemVolatile::get_oid() {
1504 
1505  return ++last_oid_;
1506 }
1507 
1508 void MemVolatile::set_last_oid(int32 oid) {
1509 
1510  last_oid_ = oid;
1511 }
1512 
1513 void MemVolatile::bind(View *view) {
1514 
1515  Code *object = view->object_;
1516  object->views_.insert(view);
1517  object->set_oid(get_oid());
1518 }
1519 }