OpenCog Framework  Branch: master, revision 6f0b7fc776b08468cf1b74aa9db028f387b4f0c0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
AtomStorage.cc
Go to the documentation of this file.
1 /*
2  * FUNCTION:
3  * Persistent Atom storage, SQL-backed.
4  *
5  * Atoms are saved to, and restored from, an SQL DB.
6  * Atoms are identified by means of unique ID's, which are taken to
7  * be the atom Handles, as maintained by the TLB. In particular, the
8  * system here depends on the handles in the TLB and in the SQL DB
9  * to be consistent (i.e. kept in sync).
10  *
11  * Copyright (c) 2008,2009,2013 Linas Vepstas <linas@linas.org>
12  *
13  * LICENSE:
14  * This program is free software; you can redistribute it and/or modify
15  * it under the terms of the GNU Affero General Public License v3 as
16  * published by the Free Software Foundation and including the exceptions
17  * at http://opencog.org/wiki/Licenses
18  *
19  * This program is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22  * GNU General Public License for more details.
23  *
24  * You should have received a copy of the GNU Affero General Public License
25  * along with this program; if not, write to:
26  * Free Software Foundation, Inc.,
27  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
28  */
29 #ifdef HAVE_SQL_STORAGE
30 
31 #include <stdlib.h>
32 #include <unistd.h>
33 
34 #include <chrono>
35 #include <memory>
36 #include <thread>
37 
38 #include <opencog/util/oc_assert.h>
39 #include <opencog/atomspace/Atom.h>
43 #include <opencog/atomspace/Link.h>
44 #include <opencog/atomspace/Node.h>
47 #include <opencog/atomspace/TLB.h>
49 
50 #include "AtomStorage.h"
51 #include "odbcxx.h"
52 
53 using namespace opencog;
54 
55 #define USE_INLINE_EDGES
56 
57 /* ================================================================ */
58 
68 class AtomStorage::Response
69 {
70  public:
71  ODBCRecordSet *rs;
72 
73  // Temporary cache of info about atom being assembled.
74  Handle handle;
75  int itype;
76  const char * name;
77  int tv_type;
78  double mean;
79  double confidence;
80  double count;
81  const char *outlist;
82  int height;
83 
84  Response()
85  {
86  tname = "";
87  itype = 0;
88  intval = 0;
89  }
90 
91  bool create_atom_column_cb(const char *colname, const char * colvalue)
92  {
93  // printf ("%s = %s\n", colname, colvalue);
94  if (!strcmp(colname, "type"))
95  {
96  itype = atoi(colvalue);
97  }
98  else if (!strcmp(colname, "name"))
99  {
100  name = colvalue;
101  }
102  else if (!strcmp(colname, "outgoing"))
103  {
104  outlist = colvalue;
105  }
106  if (!strcmp(colname, "tv_type"))
107  {
108  tv_type = atoi(colvalue);
109  }
110  else if (!strcmp(colname, "stv_mean"))
111  {
112  mean = atof(colvalue);
113  }
114  else if (!strcmp(colname, "stv_confidence"))
115  {
116  confidence = atof(colvalue);
117  }
118  else if (!strcmp(colname, "stv_count"))
119  {
120  count = atof(colvalue);
121  }
122  else if (!strcmp(colname, "uuid"))
123  {
124  UUID uuid = strtoul(colvalue, NULL, 10);
125  handle = Handle(uuid);
126  }
127  return false;
128  }
129  bool create_atom_cb(void)
130  {
131  // printf ("---- New atom found ----\n");
132  rs->foreach_column(&Response::create_atom_column_cb, this);
133 
134  return false;
135  }
136 
137  AtomTable *table;
139  bool load_all_atoms_cb(void)
140  {
141  // printf ("---- New atom found ----\n");
142  rs->foreach_column(&Response::create_atom_column_cb, this);
143 
144  AtomPtr atom(store->makeAtom(*this, handle));
145  table->add(atom, true);
146  return false;
147  }
148 
149  // Load an atom into the atom table, but only if it's not in
150  // it already. The goal is to avoid clobbering the truth value
151  // that is currently in the AtomTable. Adding an atom to the
152  // atom table that already exists causes the two TV's to be
153  // merged, which is probably not what was wanted...
154  bool load_if_not_exists_cb(void)
155  {
156  // printf ("---- New atom found ----\n");
157  rs->foreach_column(&Response::create_atom_column_cb, this);
158 
159  if (not table->holds(handle))
160  {
161  AtomPtr atom(store->makeAtom(*this, handle));
162  load_recursive_if_not_exists(atom);
163  }
164  return false;
165  }
166 
167  // Helper function for the above. The problem is that, when
168  // adding links of unknown provenance, it could happen that
169  // the outgoing set of the link has not yet been loaded. In
170  // that case, we have to load the outgoing set first.
171  void load_recursive_if_not_exists(AtomPtr atom)
172  {
173  LinkPtr link(LinkCast(atom));
174  if (link)
175  {
176  const HandleSeq& oset = link->getOutgoingSet();
177  for (Handle h : oset)
178  {
179  if (table->holds(h)) continue;
180  AtomPtr a(store->getAtom(h));
181  load_recursive_if_not_exists(a);
182  }
183  }
184  table->add(atom, true);
185  }
186 
187  std::vector<Handle> *hvec;
188  bool fetch_incoming_set_cb(void)
189  {
190  // printf ("---- New atom found ----\n");
191  rs->foreach_column(&Response::create_atom_column_cb, this);
192 
193  // Note, unlike the above 'load' routines, this merely fetches
194  // the atoms, and returns a vector of them. They are loaded
195  // into the atomspace later, by the caller.
196  Handle h(store->makeAtom(*this, handle));
197  hvec->push_back(h);
198  return false;
199  }
200 
201  bool row_exists;
202  bool row_exists_cb(void)
203  {
204  row_exists = true;
205  return false;
206  }
207 
208 #ifndef USE_INLINE_EDGES
209  // Temporary cache of info about the outgoing set.
210  std::vector<Handle> *outvec;
211  Handle dst;
212  int pos;
213 
214  bool create_edge_cb(void)
215  {
216  // printf ("---- New edge found ----\n");
217  rs->foreach_column(&Response::create_edge_column_cb, this);
218  int sz = outvec->size();
219  if (sz <= pos) outvec->resize(pos+1);
220  outvec->at(pos) = dst;
221  return false;
222  }
223  bool create_edge_column_cb(const char *colname, const char * colvalue)
224  {
225  // printf ("%s = %s\n", colname, colvalue);
226  if (!strcmp(colname, "dst_uuid"))
227  {
228  dst = Handle(strtoul(colvalue, (char **) NULL, 10));
229  }
230  else if (!strcmp(colname, "pos"))
231  {
232  pos = atoi(colvalue);
233  }
234  return false;
235  }
236 #endif /* USE_INLINE_EDGES */
237 
238  // deal twith the type-to-id map
239  bool type_cb(void)
240  {
241  rs->foreach_column(&Response::type_column_cb, this);
242  store->set_typemap(itype, tname);
243  return false;
244  }
245  const char * tname;
246  bool type_column_cb(const char *colname, const char * colvalue)
247  {
248  if (!strcmp(colname, "type"))
249  {
250  itype = atoi(colvalue);
251  }
252  else if (!strcmp(colname, "typename"))
253  {
254  tname = colvalue;
255  }
256  return false;
257  }
258 #ifdef OUT_OF_LINE_TVS
259  // Callbacks for SimpleTruthValues
260  int tvid;
261  bool create_tv_cb(void)
262  {
263  // printf ("---- New SimpleTV found ----\n");
264  rs->foreach_column(&Response::create_tv_column_cb, this);
265  return false;
266  }
267  bool create_tv_column_cb(const char *colname, const char * colvalue)
268  {
269  printf ("%s = %s\n", colname, colvalue);
270  if (!strcmp(colname, "mean"))
271  {
272  mean = atof(colvalue);
273  }
274  else if (!strcmp(colname, "count"))
275  {
276  count = atof(colvalue);
277  }
278  return false;
279  }
280 
281 #endif /* OUT_OF_LINE_TVS */
282 
283  // get generic positive integer values
284  unsigned long intval;
285  bool intval_cb(void)
286  {
287  rs->foreach_column(&Response::intval_column_cb, this);
288  return false;
289  }
290  bool intval_column_cb(const char *colname, const char * colvalue)
291  {
292  // we're not going to bother to check the column name ...
293  intval = strtoul(colvalue, NULL, 10);
294  return false;
295  }
296 
297  // Get all handles in the database.
298  std::set<UUID> *id_set;
299  bool note_id_cb(void)
300  {
301  rs->foreach_column(&Response::note_id_column_cb, this);
302  return false;
303  }
304  bool note_id_column_cb(const char *colname, const char * colvalue)
305  {
306  // we're not going to bother to check the column name ...
307  UUID id = strtoul(colvalue, NULL, 10);
308  id_set->insert(id);
309  return false;
310  }
311 
312 };
313 
314 /* ================================================================ */
322 
325 {
326  return conn_pool.pop();
327 }
328 
331 {
332  conn_pool.push(db_conn);
333 }
334 
335 /* ================================================================ */
336 
337 bool AtomStorage::idExists(const char * buff)
338 {
339  ODBCConnection* db_conn = get_conn();
340  Response rp;
341  rp.row_exists = false;
342  rp.rs = db_conn->exec(buff);
343  rp.rs->foreach_row(&Response::row_exists_cb, &rp);
344  rp.rs->release();
345  put_conn(db_conn);
346  return rp.row_exists;
347 }
348 
349 /* ================================================================ */
350 #define BUFSZ 250
351 
352 #ifndef USE_INLINE_EDGES
353 
357 class AtomStorage::Outgoing
358 {
359  private:
360  ODBCConnection *db_conn;
361  unsigned int pos;
362  Handle src_handle;
363  public:
364  Outgoing (ODBCConnection *c, Handle h)
365  {
366  db_conn = c;
367  src_handle = h;
368  pos = 0;
369  }
370  bool each_handle (Handle h)
371  {
372  char buff[BUFSZ];
373  UUID src_uuid = src_handle.value();
374  UUID dst_uuid = h.value();
375  snprintf(buff, BUFSZ, "INSERT INTO Edges "
376  "(src_uuid, dst_uuid, pos) VALUES (%lu, %lu, %u);",
377  src_uuid, dst_uuid, pos);
378 
379  Response rp;
380  rp.rs = db_conn->exec(buff);
381  rp.rs->release();
382  pos ++;
383  return false;
384  }
385 };
386 
393 {
394  Outgoing out(db_conn, h);
395 
396  foreach_outgoing_handle(h, &Outgoing::each_handle, &out);
397 }
398 
399 #endif /* USE_INLINE_EDGES */
400 
401 /* ================================================================ */
402 // Constructors
403 
404 void AtomStorage::init(const char * dbname,
405  const char * username,
406  const char * authentication)
407 {
408  // Create six, by default ... maybe make more?
409  // There should probably be a few more here, than the number of
410  // startWriterThread() calls below.
411 #define DEFAULT_NUM_CONNS 6
412  for (int i=0; i<DEFAULT_NUM_CONNS; i++)
413  {
414  ODBCConnection* db_conn = new ODBCConnection(dbname, username, authentication);
415  conn_pool.push(db_conn);
416  }
417  type_map_was_loaded = false;
418  max_height = 0;
419 
420  for (int i=0; i< TYPEMAP_SZ; i++)
421  {
422  db_typename[i] = NULL;
423  }
424 
425  local_id_cache_is_inited = false;
426  if (!connected()) return;
427 
428  reserve();
429 }
430 
431 AtomStorage::AtomStorage(const char * dbname,
432  const char * username,
433  const char * authentication)
434  : _write_queue(this, &AtomStorage::vdo_store_atom)
435 {
436  init(dbname, username, authentication);
437 }
438 
439 AtomStorage::AtomStorage(const std::string& dbname,
440  const std::string& username,
441  const std::string& authentication)
442  : _write_queue(this, &AtomStorage::vdo_store_atom)
443 {
444  init(dbname.c_str(), username.c_str(), authentication.c_str());
445 }
446 
447 AtomStorage::~AtomStorage()
448 {
449  if (connected())
451 
452  while (not conn_pool.is_empty())
453  {
454  ODBCConnection* db_conn = conn_pool.pop();
455  delete db_conn;
456  }
457 
458  for (int i=0; i<TYPEMAP_SZ; i++)
459  {
460  if (db_typename[i]) free(db_typename[i]);
461  }
462 }
463 
469 bool AtomStorage::connected(void)
470 {
471  ODBCConnection* db_conn = get_conn();
472  bool have_connection = db_conn->connected();
473  put_conn(db_conn);
474  return have_connection;
475 }
476 
477 /* ================================================================ */
478 
479 #define STMT(colname,val) { \
480  if(update) { \
481  if (notfirst) { cols += ", "; } else notfirst = 1; \
482  cols += colname; \
483  cols += " = "; \
484  cols += val; \
485  } else { \
486  if (notfirst) { cols += ", "; vals += ", "; } else notfirst = 1; \
487  cols += colname; \
488  vals += val; \
489  } \
490 }
491 
492 #define STMTI(colname,ival) { \
493  char buff[BUFSZ]; \
494  snprintf(buff, BUFSZ, "%u", ival); \
495  STMT(colname, buff); \
496 }
497 
498 #define STMTF(colname,fval) { \
499  char buff[BUFSZ]; \
500  snprintf(buff, BUFSZ, "%12.8g", fval); \
501  STMT(colname, buff); \
502 }
503 
504 /* ================================================================ */
505 
506 #ifdef OUT_OF_LINE_TVS
507 
510 bool AtomStorage::tvExists(int tvid)
511 {
512  char buff[BUFSZ];
513  snprintf(buff, BUFSZ, "SELECT tvid FROM SimpleTVs WHERE tvid = %u;", tvid);
514  return idExists(buff);
515 }
516 
522 int AtomStorage::storeTruthValue(AtomPtr atom, Handle h)
523 {
524  int notfirst = 0;
525  std::string cols;
526  std::string vals;
527  std::string coda;
528 
529  const TruthValue &tv = atom->getTruthValue();
530 
531  const SimpleTruthValue *stv = dynamic_cast<const SimpleTruthValue *>(&tv);
532  if (NULL == stv)
533  {
534  fprintf(stderr, "Error: non-simple truth values are not handled\n");
535  return 0;
536  }
537 
538  int tvid = TVID(tv);
539 
540  // If its a stock truth value, there is nothing to do.
541  if (tvid <= 4) return tvid;
542 
543  // Use the TLB Handle as the UUID.
544  char tvidbuff[BUFSZ];
545  snprintf(tvidbuff, BUFSZ, "%u", tvid);
546 
547  bool update = tvExists(tvid);
548  if (update)
549  {
550  cols = "UPDATE SimpleTVs SET ";
551  vals = "";
552  coda = " WHERE tvid = ";
553  coda += tvidbuff;
554  coda += ";";
555  }
556  else
557  {
558  cols = "INSERT INTO SimpleTVs (";
559  vals = ") VALUES (";
560  coda = ");";
561  STMT("tvid", tvidbuff);
562  }
563 
564  STMTF("mean", tv.getMean());
565  STMTF("count", tv.getCount());
566 
567  std::string qry = cols + vals + coda;
568  Response rp;
569  rp.rs = db_conn->exec(qry.c_str());
570  rp.rs->release();
571 
572  return tvid;
573 }
574 
578 int AtomStorage::TVID(const TruthValue &tv)
579 {
580  if (tv == TruthValue::NULL_TV()) return 0;
581  if (tv == TruthValue::TRIVIAL_TV()) return 1;
582  if (tv == TruthValue::FALSE_TV()) return 2;
583  if (tv == TruthValue::TRUE_TV()) return 3;
584  if (tv == TruthValue::DEFAULT_TV()) return 4;
585 
586  Response rp;
587  rp.rs = db_conn->exec("SELECT NEXTVAL('tvid_seq');");
588  rp.rs->foreach_row(&Response::tvid_seq_cb, &rp);
589  rp.rs->release();
590  return rp.tvid;
591 }
592 
593 TruthValue* AtomStorage::getTV(int tvid)
594 {
595  if (0 == tvid) return (TruthValue *) & TruthValue::NULL_TV();
596  if (1 == tvid) return (TruthValue *) & TruthValue::DEFAULT_TV();
597  if (2 == tvid) return (TruthValue *) & TruthValue::FALSE_TV();
598  if (3 == tvid) return (TruthValue *) & TruthValue::TRUE_TV();
599  if (4 == tvid) return (TruthValue *) & TruthValue::TRIVIAL_TV();
600 
601  char buff[BUFSZ];
602  snprintf(buff, BUFSZ, "SELECT * FROM SimpleTVs WHERE tvid = %u;", tvid);
603 
604  Response rp;
605  rp.rs = db_conn->exec(buff);
606  rp.rs->foreach_row(&Response::create_tv_cb, &rp);
607  rp.rs->release();
608 
609  SimpleTruthValue *stv = new SimpleTruthValue(rp.mean,rp.count);
610  return stv;
611 }
612 
613 #endif /* OUT_OF_LINE_TVS */
614 
615 /* ================================================================== */
616 
626 {
627  LinkPtr l(LinkCast(atom));
628  if (NULL == l) return 0;
629 
630  int maxd = 0;
631  int arity = l->getArity();
632 
633  const HandleSeq& out = l->getOutgoingSet();
634  for (int i=0; i<arity; i++)
635  {
636  Handle h = out[i];
637  int d = get_height(h);
638  if (maxd < d) maxd = d;
639  }
640  return maxd +1;
641 }
642 
643 /* ================================================================ */
644 
645 std::string AtomStorage::oset_to_string(const std::vector<Handle>& out,
646  int arity)
647 {
648  std::string str;
649  str += "\'{";
650  for (int i=0; i<arity; i++)
651  {
652  Handle h = out[i];
653  if (i != 0) str += ", ";
654  char buff[BUFSZ];
655  UUID uuid = h.value();
656  snprintf(buff, BUFSZ, "%lu", uuid);
657  str += buff;
658  }
659  str += "}\'";
660  return str;
661 }
662 
663 /* ================================================================ */
664 
671 {
672  _write_queue.flush_queue();
673 }
674 
675 /* ================================================================ */
686 void AtomStorage::storeAtom(AtomPtr atom, bool synchronous)
687 {
688  get_ids();
689 
690  // If a synchronous store, avoid the queues entirely.
691  if (synchronous)
692  {
693  do_store_atom(atom);
694  return;
695  }
696  _write_queue.enqueue(atom);
697 }
698 
705 {
706  LinkPtr l(LinkCast(atom));
707  if (NULL == l)
708  {
709  do_store_single_atom(atom, 0);
710  return 0;
711  }
712 
713  int lheight = 0;
714  int arity = l->getArity();
715  const HandleSeq& out = l->getOutgoingSet();
716  for (int i=0; i<arity; i++)
717  {
718  // Recurse.
719  int heig = do_store_atom(out[i]);
720  if (lheight < heig) lheight = heig;
721  }
722 
723  // Height of this link is, by definition, one more than tallest
724  // atom in outgoing set.
725  lheight ++;
726  do_store_single_atom(atom, lheight);
727  return lheight;
728 }
729 
731 {
732  do_store_atom(atom);
733 }
734 
735 /* ================================================================ */
742 {
743  get_ids();
744  int height = get_height(atom);
745  do_store_single_atom(atom, height);
746 }
747 
748 void AtomStorage::do_store_single_atom(AtomPtr atom, int aheight)
749 {
750  setup_typemap();
751 
752  int notfirst = 0;
753  std::string cols;
754  std::string vals;
755  std::string coda;
756 
757  // Use the TLB Handle as the UUID.
758  char uuidbuff[BUFSZ];
759  Handle h(atom->getHandle());
760  if (TLB::isInvalidHandle(h))
761  throw RuntimeException(TRACE_INFO, "Trying to save atom with an invalid handle!");
762 
763  UUID uuid = h.value();
764  snprintf(uuidbuff, BUFSZ, "%lu", uuid);
765 
766  std::unique_lock<std::mutex> lck = maybe_create_id(uuid);
767  bool update = not lck.owns_lock();
768  if (update)
769  {
770  cols = "UPDATE Atoms SET ";
771  vals = "";
772  coda = " WHERE uuid = ";
773  coda += uuidbuff;
774  coda += ";";
775  }
776  else
777  {
778  cols = "INSERT INTO Atoms (";
779  vals = ") VALUES (";
780  coda = ");";
781 
782  STMT("uuid", uuidbuff);
783  }
784 
785  // Store the atom type and node name only if storing for the
786  // first time ever. Once an atom is in an atom table, it's
787  // name can type cannot be changed. Only its truth value can
788  // change.
789  if (false == update)
790  {
791  // Store the atomspace UUID
792  UUID asuid = 0;
793  AtomTable * at = atom->getAtomTable();
794  // We allow storage of atoms that don't belong to an atomspace.
795  if (at) asuid = at->get_uuid();
796  snprintf(uuidbuff, BUFSZ, "%lu", asuid);
797  STMT("space", uuidbuff);
798 
799  // Store the atom UUID
800  Type t = atom->getType();
801  int dbtype = storing_typemap[t];
802  STMTI("type", dbtype);
803 
804  // Store the node name, if its a node
805  NodePtr n(NodeCast(atom));
806  if (n)
807  {
808 #if 0
809  std::string qname = n->getName();
810  escape_single_quotes(qname);
811  qname.insert(0U,1U,'\'');
812  qname += "'";
813 #else
814  // Use postgres $-quoting to make unicode strings
815  // easier to deal with.
816  std::string qname = " $ocp$";
817  qname += n->getName();
818  qname += "$ocp$ ";
819 #endif
820  STMT("name", qname);
821 
822  // Nodes have a height of zero by definition.
823  STMTI("height", 0);
824  }
825  else
826  {
827  if (max_height < aheight) max_height = aheight;
828  STMTI("height", aheight);
829 
830 #ifdef USE_INLINE_EDGES
831  LinkPtr l(LinkCast(atom));
832  if (l)
833  {
834  int arity = l->getArity();
835  if (arity)
836  {
837  cols += ", outgoing";
838  vals += ", ";
839  vals += oset_to_string(l->getOutgoingSet(), arity);
840  }
841  }
842 #endif /* USE_INLINE_EDGES */
843  }
844  }
845 
846  // Store the truth value
847  TruthValuePtr tv(atom->getTruthValue());
849  if (tv) tvt = tv->getType();
850  STMTI("tv_type", tvt);
851 
852  switch (tvt)
853  {
854  case NULL_TRUTH_VALUE:
855  break;
856  case SIMPLE_TRUTH_VALUE:
857  case COUNT_TRUTH_VALUE:
859  STMTF("stv_mean", tv->getMean());
860  STMTF("stv_confidence", tv->getConfidence());
861  STMTF("stv_count", tv->getCount());
862  break;
864  {
865  IndefiniteTruthValuePtr itv = std::static_pointer_cast<IndefiniteTruthValue>(tv);
866  STMTF("stv_mean", itv->getL());
867  STMTF("stv_count", itv->getU());
868  STMTF("stv_confidence", itv->getConfidenceLevel());
869  break;
870  }
871  default:
872  throw RuntimeException(TRACE_INFO,
873  "Error: store_single: Unknown truth value type\n");
874  }
875 
876  std::string qry = cols + vals + coda;
877  ODBCConnection* db_conn = get_conn();
878  Response rp;
879  rp.rs = db_conn->exec(qry.c_str());
880  rp.rs->release();
881  put_conn(db_conn);
882 
883 #ifndef USE_INLINE_EDGES
884  // Store the outgoing handles only if we are storing for the first
885  // time, otherwise do nothing. The semantics is that, once the
886  // outgoing set has been determined, it cannot be changed.
887  if (false == update)
888  {
889  storeOutgoing(atom);
890  }
891 #endif /* USE_INLINE_EDGES */
892 
893  // Make note of the fact that this atom has been stored.
894  add_id_to_cache(uuid);
895 }
896 
897 /* ================================================================ */
926 {
927  /* Only need to set up the typemap once. */
928  if (type_map_was_loaded) return;
929  type_map_was_loaded = true;
930 
931  // If we are here, we need to reconcile the types currently in
932  // use, with a possibly pre-existing typemap. New types must be
933  // stored. So we start by loading a map from SQL (if its there).
934  //
935  // Be careful to initialize the typemap with invalid types,
936  // in case there are unexpected holes in the map!
937  for (int i=0; i< TYPEMAP_SZ; i++)
938  {
939  loading_typemap[i] = NOTYPE;
940  storing_typemap[i] = -1;
941  db_typename[i] = NULL;
942  }
943 
944  ODBCConnection* db_conn = get_conn();
945  Response rp;
946  rp.rs = db_conn->exec("SELECT * FROM TypeCodes;");
947  rp.store = this;
948  rp.rs->foreach_row(&Response::type_cb, &rp);
949  rp.rs->release();
950 
951  unsigned int numberOfTypes = classserver().getNumberOfClasses();
952  for (Type t=0; t<numberOfTypes; t++)
953  {
954  int sqid = storing_typemap[t];
955  /* If this typename is not yet known, record it */
956  if (-1 == sqid)
957  {
958  const char * tname = classserver().getTypeName(t).c_str();
959 
960  // Let the sql id be the same as the current type number,
961  // unless this sql number is already in use, in which case
962  // we need to find another, unused one. Its in use if we
963  // have a string name associated to it.
964  sqid = t;
965 
966  if ((db_typename[sqid] != NULL) &&
967  (loading_typemap[sqid] != t))
968  {
969  // Find some (any) unused type index to use in the
970  // sql table. Use the lowest unused value that we
971  // can find.
972  for (sqid = 0; sqid<TYPEMAP_SZ; sqid++)
973  {
974  if (NULL == db_typename[sqid]) break;
975  }
976 
977  if (TYPEMAP_SZ <= sqid)
978  {
979  put_conn(db_conn);
980  fprintf(stderr, "Fatal Error: type table overflow!\n");
981  abort();
982  }
983  }
984 
985  char buff[BUFSZ];
986  snprintf(buff, BUFSZ,
987  "INSERT INTO TypeCodes (type, typename) "
988  "VALUES (%d, \'%s\');",
989  sqid, tname);
990  rp.rs = db_conn->exec(buff);
991  rp.rs->release();
992  set_typemap(sqid, tname);
993  }
994  }
995  put_conn(db_conn);
996 }
997 
998 void AtomStorage::set_typemap(int dbval, const char * tname)
999 {
1000  Type realtype = classserver().getType(tname);
1001  loading_typemap[dbval] = realtype;
1002  storing_typemap[realtype] = dbval;
1003  if (db_typename[dbval] != NULL) free (db_typename[dbval]);
1004  db_typename[dbval] = strdup(tname);
1005 }
1006 
1007 /* ================================================================ */
1013 {
1014 #ifdef ASK_SQL_SERVER
1015  char buff[BUFSZ];
1016  UUID uuid = h.value();
1017  snprintf(buff, BUFSZ, "SELECT uuid FROM Atoms WHERE uuid = %lu;", uuid);
1018  return idExists(buff);
1019 #else
1020  std::unique_lock<std::mutex> lock(id_cache_mutex);
1021  // look at the local cache of id's to see if the atom is in storage or not.
1022  return local_id_cache.count(h.value());
1023 #endif
1024 }
1025 
1031 {
1032  std::unique_lock<std::mutex> lock(id_cache_mutex);
1033  local_id_cache.insert(uuid);
1034 
1035  // If we were previously making this ID, then we are done.
1036  // The other half of this is in maybe_create_id() below.
1037  if (0 < id_create_cache.count(uuid))
1038  {
1039  id_create_cache.erase(uuid);
1040  }
1041 }
1042 
1053 std::unique_lock<std::mutex> AtomStorage::maybe_create_id(UUID uuid)
1054 {
1055  std::unique_lock<std::mutex> create_lock(id_create_mutex);
1056  std::unique_lock<std::mutex> cache_lock(id_cache_mutex);
1057  // Look at the local cache of id's to see if the atom is in storage or not.
1058  if (0 < local_id_cache.count(uuid))
1059  return std::unique_lock<std::mutex>();
1060 
1061  // Is some other thread in the process of adding this ID?
1062  if (0 < id_create_cache.count(uuid))
1063  {
1064  cache_lock.unlock();
1065  while (true)
1066  {
1067  // If we are here, some other thread is making this UUID,
1068  // and so we need to wait till they're done. Wait by stalling
1069  // on the creation lock.
1070  std::unique_lock<std::mutex> local_create_lock(id_create_mutex);
1071  // If we are here, then someone finished creating some UUID.
1072  // Was it our ID? If so, we are done; if not, wait some more.
1073  cache_lock.lock();
1074  if (0 == id_create_cache.count(uuid))
1075  {
1076  OC_ASSERT(0 < local_id_cache.count(uuid),
1077  "Atom for UUID was not created!");
1078  return std::unique_lock<std::mutex>();
1079  }
1080  cache_lock.unlock();
1081  }
1082  }
1083 
1084  // If we are here, then no one has attempted to make this UUID before.
1085  // Grab the maker lock, and make the damned thing already.
1086  id_create_cache.insert(uuid);
1087  return create_lock;
1088 }
1089 
1093 void AtomStorage::get_ids(void)
1094 {
1095  std::unique_lock<std::mutex> lock(id_cache_mutex);
1096 
1097  if (local_id_cache_is_inited) return;
1098  local_id_cache_is_inited = true;
1099 
1100  local_id_cache.clear();
1101  ODBCConnection* db_conn = get_conn();
1102 
1103  // It appears that, when the select statment returns more than
1104  // about a 100K to a million atoms or so, some sort of heap
1105  // corruption occurs in the odbc code, causing future mallocs
1106  // to fail. So limit the number of records processed in one go.
1107  // It also appears that asking for lots of records increases
1108  // the memory fragmentation (and/or there's a memory leak in odbc??)
1109 #define USTEP 12003
1110  unsigned long rec;
1111  unsigned long max_nrec = getMaxObservedUUID();
1112  for (rec = 0; rec <= max_nrec; rec += USTEP)
1113  {
1114  char buff[BUFSZ];
1115  snprintf(buff, BUFSZ, "SELECT uuid FROM Atoms WHERE "
1116  "uuid > %lu AND uuid <= %lu;",
1117  rec, rec+USTEP);
1118 
1119  Response rp;
1120  rp.id_set = &local_id_cache;
1121  rp.rs = db_conn->exec(buff);
1122  rp.rs->foreach_row(&Response::note_id_cb, &rp);
1123  rp.rs->release();
1124  }
1125  put_conn(db_conn);
1126 }
1127 
1128 /* ================================================================ */
1129 
1130 #ifndef USE_INLINE_EDGES
1131 void AtomStorage::getOutgoing(std::vector<Handle> &outv, Handle h)
1132 {
1133  char buff[BUFSZ];
1134  UUID uuid = h.value();
1135  snprintf(buff, BUFSZ, "SELECT * FROM Edges WHERE src_uuid = %lu;", uuid);
1136 
1137  ODBCConnection* db_conn = get_conn();
1138  Response rp;
1139  rp.rs = db_conn->exec(buff);
1140  rp.outvec = &outv;
1141  rp.rs->foreach_row(&Response::create_edge_cb, &rp);
1142  rp.rs->release();
1143  put_conn(db_conn);
1144 }
1145 #endif /* USE_INLINE_EDGES */
1146 
1147 /* ================================================================ */
1148 
1149 /* One-size-fits-all atom fetcher */
1150 AtomPtr AtomStorage::getAtom(const char * query, int height)
1151 {
1152  ODBCConnection* db_conn = get_conn();
1153  Response rp;
1154  rp.handle = Handle::UNDEFINED;
1155  rp.rs = db_conn->exec(query);
1156  rp.rs->foreach_row(&Response::create_atom_cb, &rp);
1157 
1158  // Did we actually find anything?
1159  // DO NOT USE TLB::IsInvalidHandle() HERE! It won't work, duhh!
1160  if (rp.handle.value() == Handle::UNDEFINED.value())
1161  {
1162  rp.rs->release();
1163  put_conn(db_conn);
1164  return NULL;
1165  }
1166 
1167  rp.height = height;
1168  AtomPtr atom(makeAtom(rp, rp.handle));
1169  rp.rs->release();
1170  put_conn(db_conn);
1171  return atom;
1172 }
1173 
1182 {
1183  setup_typemap();
1184  char buff[BUFSZ];
1185  UUID uuid = h.value();
1186  snprintf(buff, BUFSZ, "SELECT * FROM Atoms WHERE uuid = %lu;", uuid);
1187 
1188  return getAtom(buff, -1);
1189 }
1190 
1194 std::vector<Handle> AtomStorage::getIncomingSet(Handle h)
1195 {
1196  std::vector<Handle> iset;
1197 
1198  setup_typemap();
1199  char buff[BUFSZ];
1200  UUID uuid = h.value();
1201  snprintf(buff, BUFSZ,
1202  "SELECT * FROM Atoms WHERE outgoing @> ARRAY[CAST(%lu AS BIGINT)];", uuid);
1203 
1204  // Note: "select * from atoms where outgoing@>array[556];" will return
1205  // all links with atom 556 in the outgoing set -- i.e. the incoming set of 556.
1206  // Could also use && here instead of @> Don't know if one is faster or not.
1207  // The cast to BIGINT is needed, as otherwise on gets
1208  // ERROR: operator does not exist: bigint[] @> integer[]
1209 
1210  ODBCConnection* db_conn = get_conn();
1211  Response rp;
1212  rp.store = this;
1213  rp.height = -1;
1214  rp.hvec = &iset;
1215  rp.rs = db_conn->exec(buff);
1216  rp.rs->foreach_row(&Response::fetch_incoming_set_cb, &rp);
1217  rp.rs->release();
1218  put_conn(db_conn);
1219 
1220  return iset;
1221 }
1222 
1233 NodePtr AtomStorage::getNode(Type t, const char * str)
1234 {
1235  setup_typemap();
1236  char buff[40*BUFSZ];
1237 
1238  // Use postgres $-quoting to make unicode strings easier to deal with.
1239  int nc = snprintf(buff, 4*BUFSZ, "SELECT * FROM Atoms WHERE "
1240  "type = %hu AND name = $ocp$%s$ocp$ ;", storing_typemap[t], str);
1241 
1242  if (40*BUFSZ-1 <= nc)
1243  {
1244  fprintf(stderr, "Error: AtomStorage::getNode: buffer overflow!\n");
1245  buff[40*BUFSZ-1] = 0x0;
1246  fprintf(stderr, "\tnc=%d buffer=>>%s<<\n", nc, buff);
1247  return NULL;
1248  }
1249 
1250  return NodeCast(getAtom(buff, 0));
1251 }
1252 
1263 LinkPtr AtomStorage::getLink(Type t, const std::vector<Handle>&oset)
1264 {
1265  setup_typemap();
1266 
1267  char buff[BUFSZ];
1268  snprintf(buff, BUFSZ,
1269  "SELECT * FROM Atoms WHERE type = %hu AND outgoing = ",
1270  storing_typemap[t]);
1271 
1272  std::string ostr = buff;
1273  ostr += oset_to_string(oset, oset.size());
1274  ostr += ";";
1275 
1276  AtomPtr atom = getAtom(ostr.c_str(), 1);
1277  return LinkCast(atom);
1278 }
1279 
1283 AtomPtr AtomStorage::makeAtom(Response &rp, Handle h)
1284 {
1285  // Now that we know everything about an atom, actually construct one.
1286  AtomPtr atom(h);
1287  Type realtype = loading_typemap[rp.itype];
1288 
1289  if (NOTYPE == realtype)
1290  {
1291  throw RuntimeException(TRACE_INFO,
1292  "Fatal Error: OpenCog does not have a type called %s\n",
1293  db_typename[rp.itype]);
1294  return NULL;
1295  }
1296 
1297  if (NULL == atom)
1298  {
1299  // All height zero atoms are nodes,
1300  // All positive height atoms are links.
1301  // A negative height is "unknown" and must be checked.
1302  if ((0 == rp.height) ||
1303  ((-1 == rp.height) &&
1304  classserver().isA(realtype, NODE)))
1305  {
1306  atom = createNode(realtype, rp.name);
1307  }
1308  else
1309  {
1310  std::vector<Handle> outvec;
1311 #ifndef USE_INLINE_EDGES
1312  getOutgoing(outvec, h);
1313 #else
1314  char *p = (char *) rp.outlist;
1315  while (p)
1316  {
1317  // Break if there is no more atom in the outgoing set
1318  // or the outgoing set is empty in the first place
1319  if (*p == '}' or *p == '\0') break;
1320  Handle hout = (Handle) strtoul(p+1, &p, 10);
1321  outvec.push_back(hout);
1322  }
1323 #endif /* USE_INLINE_EDGES */
1324  atom = createLink(realtype, outvec);
1325  }
1326 
1327  // Create via the factory for specific types of atom
1328  // Otherwise at the later stage of the sql-load process when the
1329  // same atom is being added to the AtomTable (which will also call
1330  // the same factory function), the dynamic-casting of any atom of
1331  // one of these types will fail, resulting a new atom being created.
1332  // But the new atom is having a different UUID, if there exist another
1333  // link connecting to this atom, the system will fail to find the
1334  // correct handle of this atom (because it is using the original
1335  // UUID, as retrieve from the SQL database). Since each of the atoms
1336  // in the outgoing set of a link needs to be valid, we will get
1337  // an "Atom in outgoing set isn't known!" error as a result
1338  atom = AtomTable::factory(realtype, atom);
1339  }
1340  else
1341  {
1342  // Perform at least some basic sanity checking ...
1343  if (realtype != atom->getType())
1344  {
1345  UUID uuid = h.value();
1346  throw RuntimeException(TRACE_INFO,
1347  "Fatal Error: mismatched atom type for existing atom! "
1348  "uuid=%lu real=%d atom=%d\n",
1349  uuid, realtype, atom->getType());
1350  }
1351  // If we are here, and the atom uuid is set, then it should match.
1352  if (Handle::UNDEFINED.value() != atom->_uuid and
1353  atom->_uuid != h.value())
1354  {
1355  throw RuntimeException(TRACE_INFO,
1356  "Fatal Error: mismatched handle and atom UUID's, atom=%lu handle=%lu",
1357  atom->_uuid, h.value());
1358  }
1359  }
1360 
1361  // Give the atom the correct UUID. The AtomTable will need this.
1362  atom->_uuid = h.value();
1363 
1364  // Now get the truth value
1365  switch (rp.tv_type)
1366  {
1367  case NULL_TRUTH_VALUE:
1368  break;
1369 
1370  case SIMPLE_TRUTH_VALUE:
1371  {
1372  TruthValuePtr stv(SimpleTruthValue::createTV(rp.mean, rp.count));
1373  atom->setTruthValue(stv);
1374  break;
1375  }
1376  case COUNT_TRUTH_VALUE:
1377  {
1378  TruthValuePtr ctv(CountTruthValue::createTV(rp.mean, rp.confidence, rp.count));
1379  atom->setTruthValue(ctv);
1380  break;
1381  }
1383  {
1384  TruthValuePtr itv(IndefiniteTruthValue::createTV(rp.mean, rp.count, rp.confidence));
1385  atom->setTruthValue(itv);
1386  break;
1387  }
1389  {
1390  TruthValuePtr ptv(ProbabilisticTruthValue::createTV(rp.mean, rp.confidence, rp.count));
1391  atom->setTruthValue(ptv);
1392  break;
1393  }
1394  default:
1395  throw RuntimeException(TRACE_INFO,
1396  "Error: makeAtom: Unknown truth value type\n");
1397  }
1398 
1399  load_count ++;
1400  if (load_count%10000 == 0)
1401  {
1402  fprintf(stderr, "\tLoaded %lu atoms.\n", (unsigned long) load_count);
1403  }
1404 
1405  add_id_to_cache(h.value());
1406  return atom;
1407 }
1408 
1409 /* ================================================================ */
1410 
1411 void AtomStorage::load(AtomTable &table)
1412 {
1413  unsigned long max_nrec = getMaxObservedUUID();
1414  TLB::reserve_upto(max_nrec);
1415  fprintf(stderr, "Max observed UUID is %lu\n", max_nrec);
1416  load_count = 0;
1418  fprintf(stderr, "Max Height is %d\n", max_height);
1419 
1420  setup_typemap();
1421 
1422  ODBCConnection* db_conn = get_conn();
1423  Response rp;
1424  rp.table = &table;
1425  rp.store = this;
1426 
1427  for (int hei=0; hei<=max_height; hei++)
1428  {
1429  unsigned long cur = load_count;
1430 
1431 #if GET_ONE_BIG_BLOB
1432  char buff[BUFSZ];
1433  snprintf(buff, BUFSZ, "SELECT * FROM Atoms WHERE height = %d;", hei);
1434  rp.height = hei;
1435  rp.rs = db_conn->exec(buff);
1436  rp.rs->foreach_row(&Response::load_all_atoms_cb, &rp);
1437  rp.rs->release();
1438 #else
1439  // It appears that, when the select statment returns more than
1440  // about a 100K to a million atoms or so, some sort of heap
1441  // corruption occurs in the iodbc code, causing future mallocs
1442  // to fail. So limit the number of records processed in one go.
1443  // It also appears that asking for lots of records increases
1444  // the memory fragmentation (and/or there's a memory leak in iodbc??)
1445  // XXX Not clear is UnixODBC suffers from this same problem.
1446 #define STEP 12003
1447  unsigned long rec;
1448  for (rec = 0; rec <= max_nrec; rec += STEP)
1449  {
1450  char buff[BUFSZ];
1451  snprintf(buff, BUFSZ, "SELECT * FROM Atoms WHERE "
1452  "height = %d AND uuid > %lu AND uuid <= %lu;",
1453  hei, rec, rec+STEP);
1454  rp.height = hei;
1455  rp.rs = db_conn->exec(buff);
1456  rp.rs->foreach_row(&Response::load_all_atoms_cb, &rp);
1457  rp.rs->release();
1458  }
1459 #endif
1460  fprintf(stderr, "Loaded %lu atoms at height %d\n", load_count - cur, hei);
1461  }
1462  put_conn(db_conn);
1463  fprintf(stderr, "Finished loading %lu atoms in total\n",
1464  (unsigned long) load_count);
1465 
1466  // synchrnonize!
1467  table.barrier();
1468 }
1469 
1470 void AtomStorage::loadType(AtomTable &table, Type atom_type)
1471 {
1472  unsigned long max_nrec = getMaxObservedUUID();
1473  TLB::reserve_upto(max_nrec);
1474  logger().debug("AtomStorage::loadType: Max observed UUID is %lu\n", max_nrec);
1475  load_count = 0;
1476 
1477  // For links, assume a worst-case height.
1478  // For nodes, its easy ... max_height is zero.
1479  if (classserver().isNode(atom_type))
1480  max_height = 0;
1481  else
1483  logger().debug("AtomStorage::loadType: Max Height is %d\n", max_height);
1484 
1485  setup_typemap();
1486  int db_atom_type = storing_typemap[atom_type];
1487 
1488  ODBCConnection* db_conn = get_conn();
1489  Response rp;
1490  rp.table = &table;
1491  rp.store = this;
1492 
1493  for (int hei=0; hei<=max_height; hei++)
1494  {
1495  unsigned long cur = load_count;
1496 
1497 #if GET_ONE_BIG_BLOB
1498  char buff[BUFSZ];
1499  snprintf(buff, BUFSZ,
1500  "SELECT * FROM Atoms WHERE height = %d AND type = %d;",
1501  hei, db_atom_type);
1502  rp.height = hei;
1503  rp.rs = db_conn->exec(buff);
1504  rp.rs->foreach_row(&Response::load_if_not_exists_cb, &rp);
1505  rp.rs->release();
1506 #else
1507  // It appears that, when the select statment returns more than
1508  // about a 100K to a million atoms or so, some sort of heap
1509  // corruption occurs in the iodbc code, causing future mallocs
1510  // to fail. So limit the number of records processed in one go.
1511  // It also appears that asking for lots of records increases
1512  // the memory fragmentation (and/or there's a memory leak in iodbc??)
1513  // XXX Not clear is UnixODBC suffers from this same problem.
1514 #define STEP 12003
1515  unsigned long rec;
1516  for (rec = 0; rec <= max_nrec; rec += STEP)
1517  {
1518  char buff[BUFSZ];
1519  snprintf(buff, BUFSZ, "SELECT * FROM Atoms WHERE type = %d "
1520  "AND height = %d AND uuid > %lu AND uuid <= %lu;",
1521  db_atom_type, hei, rec, rec+STEP);
1522  rp.height = hei;
1523  rp.rs = db_conn->exec(buff);
1524  rp.rs->foreach_row(&Response::load_if_not_exists_cb, &rp);
1525  rp.rs->release();
1526  }
1527 #endif
1528  logger().debug("AtomStorage::loadType: Loaded %lu atoms of type %d at height %d\n",
1529  load_count - cur, db_atom_type, hei);
1530  }
1531  put_conn(db_conn);
1532  logger().debug("AtomStorage::loadType: Finished loading %lu atoms in total\n",
1533  (unsigned long) load_count);
1534 
1535  // Synchronize!
1536  table.barrier();
1537 }
1538 
1539 bool AtomStorage::store_cb(AtomPtr atom)
1540 {
1541  storeSingleAtom(atom);
1542  store_count ++;
1543  if (store_count%1000 == 0)
1544  {
1545  fprintf(stderr, "\tStored %lu atoms.\n", (unsigned long) store_count);
1546  }
1547  return false;
1548 }
1549 
1550 void AtomStorage::store(const AtomTable &table)
1551 {
1552  max_height = 0;
1553  store_count = 0;
1554 
1555 #ifdef ALTER
1556  rename_tables();
1557  create_tables();
1558 #endif
1559 
1560  get_ids();
1561  UUID max_uuid = TLB::getMaxUUID();
1562  fprintf(stderr, "Max UUID is %lu\n", max_uuid);
1563 
1564  setup_typemap();
1565 
1566  ODBCConnection* db_conn = get_conn();
1567  Response rp;
1568 
1569 #ifndef USE_INLINE_EDGES
1570  // Drop indexes, for faster loading.
1571  // But this only matters for the non-inline eges...
1572  rp.rs = db_conn->exec("DROP INDEX uuid_idx;");
1573  rp.rs->release();
1574  rp.rs = db_conn->exec("DROP INDEX src_idx;");
1575  rp.rs->release();
1576 #endif
1577 
1578  table.foreachHandleByType(
1579  [&](Handle h)->void { store_cb(h); }, ATOM, true);
1580 
1581 #ifndef USE_INLINE_EDGES
1582  // Create indexes
1583  rp.rs = db_conn->exec("CREATE INDEX uuid_idx ON Atoms (uuid);");
1584  rp.rs->release();
1585  rp.rs = db_conn->exec("CREATE INDEX src_idx ON Edges (src_uuid);");
1586  rp.rs->release();
1587 #endif /* USE_INLINE_EDGES */
1588 
1589  rp.rs = db_conn->exec("VACUUM ANALYZE;");
1590  rp.rs->release();
1591  put_conn(db_conn);
1592 
1594  fprintf(stderr, "\tFinished storing %lu atoms total.\n",
1595  (unsigned long) store_count);
1596 }
1597 
1598 /* ================================================================ */
1599 
1600 void AtomStorage::rename_tables(void)
1601 {
1602  ODBCConnection* db_conn = get_conn();
1603  Response rp;
1604 
1605  rp.rs = db_conn->exec("ALTER TABLE Atoms RENAME TO Atoms_Backup;");
1606  rp.rs->release();
1607 #ifndef USE_INLINE_EDGES
1608  rp.rs = db_conn->exec("ALTER TABLE Edges RENAME TO Edges_Backup;");
1609  rp.rs->release();
1610 #endif /* USE_INLINE_EDGES */
1611  rp.rs = db_conn->exec("ALTER TABLE Global RENAME TO Global_Backup;");
1612  rp.rs->release();
1613  rp.rs = db_conn->exec("ALTER TABLE TypeCodes RENAME TO TypeCodes_Backup;");
1614  rp.rs->release();
1615  put_conn(db_conn);
1616 }
1617 
1618 void AtomStorage::create_tables(void)
1619 {
1620  ODBCConnection* db_conn = get_conn();
1621  Response rp;
1622 
1623  // See the file "atom.sql" for detailed documentation as to the
1624  // structure of the SQL tables.
1625  rp.rs = db_conn->exec("CREATE TABLE Atoms ("
1626  "uuid BIGINT PRIMARY KEY,"
1627  "space BIGINT,"
1628  "type SMALLINT,"
1629  "type_tv SMALLINT,"
1630  "stv_mean FLOAT,"
1631  "stv_confidence FLOAT,"
1632  "stv_count FLOAT,"
1633  "height SMALLINT,"
1634  "name TEXT,"
1635  "outgoing BIGINT[]);");
1636  rp.rs->release();
1637 
1638 #ifndef USE_INLINE_EDGES
1639  rp.rs = db_conn->exec("CREATE TABLE Edges ("
1640  "src_uuid INT,"
1641  "dst_uuid INT,"
1642  "pos INT);");
1643  rp.rs->release();
1644 #endif /* USE_INLINE_EDGES */
1645 
1646  rp.rs = db_conn->exec("CREATE TABLE TypeCodes ("
1647  "type SMALLINT UNIQUE,"
1648  "typename TEXT UNIQUE);");
1649  rp.rs->release();
1650  type_map_was_loaded = false;
1651 
1652  rp.rs = db_conn->exec("CREATE TABLE Spaces ("
1653  "space BIGINT,"
1654  "parent BIGINT);");
1655  rp.rs->release();
1656 
1657  rp.rs = db_conn->exec("CREATE TABLE Global ("
1658  "max_height INT);");
1659  rp.rs->release();
1660  rp.rs = db_conn->exec("INSERT INTO Global (max_height) VALUES (0);");
1661  rp.rs->release();
1662 
1663  put_conn(db_conn);
1664 }
1665 
1671 void AtomStorage::kill_data(void)
1672 {
1673  ODBCConnection* db_conn = get_conn();
1674  Response rp;
1675 
1676  // See the file "atom.sql" for detailed documentation as to the
1677  // structure of the SQL tables.
1678  rp.rs = db_conn->exec("DELETE from Atoms;");
1679  rp.rs->release();
1680 
1681  rp.rs = db_conn->exec("UPDATE Global SET max_height = 0;");
1682  rp.rs->release();
1683  put_conn(db_conn);
1684 }
1685 
1686 /* ================================================================ */
1687 
1688 void AtomStorage::setMaxHeight(int sqmax)
1689 {
1690  // Max height of db contents can only get larger!
1691  if (max_height < sqmax) max_height = sqmax;
1692 
1693  char buff[BUFSZ];
1694  snprintf(buff, BUFSZ, "UPDATE Global SET max_height = %d;", max_height);
1695 
1696  ODBCConnection* db_conn = get_conn();
1697  Response rp;
1698  rp.rs = db_conn->exec(buff);
1699  rp.rs->release();
1700  put_conn(db_conn);
1701 }
1702 
1703 int AtomStorage::getMaxHeight(void)
1704 {
1705  ODBCConnection* db_conn = get_conn();
1706  Response rp;
1707  rp.rs = db_conn->exec("SELECT max_height FROM Global;");
1708  rp.rs->foreach_row(&Response::intval_cb, &rp);
1709  rp.rs->release();
1710  put_conn(db_conn);
1711  return rp.intval;
1712 }
1713 
1715 {
1716  ODBCConnection* db_conn = get_conn();
1717  Response rp;
1718  rp.intval = 0;
1719  rp.rs = db_conn->exec("SELECT uuid FROM Atoms ORDER BY uuid DESC LIMIT 1;");
1720  rp.rs->foreach_row(&Response::intval_cb, &rp);
1721  rp.rs->release();
1722  put_conn(db_conn);
1723  return rp.intval;
1724 }
1725 
1727 {
1728  ODBCConnection* db_conn = get_conn();
1729  Response rp;
1730  rp.intval = 0;
1731  rp.rs = db_conn->exec("SELECT height FROM Atoms ORDER BY height DESC LIMIT 1;");
1732  rp.rs->foreach_row(&Response::intval_cb, &rp);
1733  rp.rs->release();
1734  put_conn(db_conn);
1735  return rp.intval;
1736 }
1737 
1738 void AtomStorage::reserve(void)
1739 {
1740  UUID max_observed_id = getMaxObservedUUID();
1741  fprintf(stderr, "Reserving UUID up to %lu\n", max_observed_id);
1742  TLB::reserve_upto(max_observed_id);
1743 }
1744 
1745 #endif /* HAVE_SQL_STORAGE */
1746 /* ============================= END OF FILE ================= */
ODBCRecordSet * exec(const char *)
void storeSingleAtom(AtomPtr)
static TruthValuePtr createTV(strength_t s, confidence_t f, count_t c)
void init(const char *, const char *, const char *)
#define createLink
Definition: Link.h:269
virtual count_t getCount() const =0
a TruthValue that stores a mean and the number of observations (strength and confidence) ...
UUID _uuid
Definition: Atom.h:93
static bool isInvalidHandle(const Handle &h)
Definition: TLB.h:131
virtual TruthValueType getType() const =0
int get_height(AtomPtr)
std::atomic< unsigned long > store_count
Definition: AtomStorage.h:78
char * db_typename[TYPEMAP_SZ]
Definition: AtomStorage.h:109
std::string oset_to_string(const HandleSeq &, int)
std::vector< Handle > HandleSeq
a list of handles
Definition: Handle.h:246
std::shared_ptr< Atom > AtomPtr
Definition: Handle.h:48
void storeAtom(AtomPtr, bool synchronous=false)
void create_tables(void)
static TruthValuePtr TRUE_TV()
Definition: TruthValue.cc:59
void setup_typemap(void)
TruthValueType
Definition: TruthValue.h:63
std::shared_ptr< TruthValue > TruthValuePtr
Definition: TruthValue.h:85
static UUID getMaxUUID(void)
Definition: TLB.h:93
void load(AtomTable &)
ODBCConnection * get_conn()
void barrier(void)
Definition: AtomTable.cc:615
Type loading_typemap[TYPEMAP_SZ]
Definition: AtomStorage.h:108
std::shared_ptr< Link > LinkPtr
Definition: Atom.h:53
static void reserve_upto(UUID hi)
Definition: TLB.h:123
void vdo_store_atom(AtomPtr &)
ClassServer & classserver(ClassServerFactory *=ClassServer::createInstance)
Definition: ClassServer.cc:159
void rename_tables(void)
static NodePtr NodeCast(const Handle &h)
Definition: Node.h:113
void do_store_single_atom(AtomPtr, int)
void storeOutgoing(AtomPtr, Handle)
static const Handle UNDEFINED
Definition: Handle.h:77
virtual confidence_t getConfidence() const =0
unsigned long UUID
UUID == Universally Unique Identifier.
Definition: Handle.h:46
static TruthValuePtr NULL_TV()
Definition: TruthValue.cc:46
static TruthValuePtr createTV(strength_t s, confidence_t f, count_t c)
std::mutex id_cache_mutex
Definition: AtomStorage.h:84
bool atomExists(Handle)
AtomPtr makeAtom(Response &, Handle)
std::shared_ptr< IndefiniteTruthValue > IndefiniteTruthValuePtr
static TruthValuePtr TRIVIAL_TV()
Definition: TruthValue.cc:73
void loadType(AtomTable &, Type)
void getOutgoing(HandleSeq &, Handle)
void foreachHandleByType(Function func, Type type, bool subclass=false, bool parent=true) const
Definition: AtomTable.h:219
static TruthValuePtr createTV(strength_t mean, count_t count)
std::set< UUID > id_create_cache
Definition: AtomStorage.h:91
NodePtr getNode(Type, const char *)
static TruthValuePtr DEFAULT_TV()
Definition: TruthValue.cc:52
const std::string & getTypeName(Type type)
Definition: ClassServer.cc:148
void store(const AtomTable &)
#define createNode
Definition: Node.h:119
void escape_single_quotes(std::string &str)
Definition: odbcxx.h:136
static TruthValuePtr FALSE_TV()
Definition: TruthValue.cc:66
static LinkPtr LinkCast(const Handle &h)
Definition: Link.h:263
AtomStorage(const std::string &dbname, const std::string &username, const std::string &authentication)
AtomPtr getAtom(const char *, int)
int getMaxObservedHeight(void)
#define TYPEMAP_SZ
Definition: AtomStorage.h:106
std::atomic< unsigned long > load_count
Definition: AtomStorage.h:77
int do_store_atom(AtomPtr)
std::set< UUID > local_id_cache
Definition: AtomStorage.h:86
bool foreach_row(bool(T::*cb)(void), T *data)
Definition: odbcxx.h:102
std::shared_ptr< Node > NodePtr
Definition: Node.h:112
UUID value(void) const
Definition: Handle.h:85
async_caller< AtomStorage, AtomPtr > _write_queue
Definition: AtomStorage.h:124
static AtomPtr factory(Type atom_type, AtomPtr atom)
Definition: AtomTable.cc:269
std::unique_lock< std::mutex > maybe_create_id(UUID)
Type getType(const std::string &typeName)
Definition: ClassServer.cc:138
static TruthValuePtr createTV(TruthValuePtr tv)
UUID getMaxObservedUUID(void)
int storing_typemap[TYPEMAP_SZ]
Definition: AtomStorage.h:107
unsigned short Type
type of Atoms, represented as short integer (16 bits)
Definition: types.h:40
std::mutex id_create_mutex
Definition: AtomStorage.h:90
void set_typemap(int, const char *)
bool connected(void) const
std::vector< Handle > getIncomingSet(Handle)
void add_id_to_cache(UUID)
virtual strength_t getMean() const =0
LinkPtr getLink(Type, const std::vector< Handle > &)
bool idExists(const char *)
bool store_cb(AtomPtr)
void put_conn(ODBCConnection *)
concurrent_stack< ODBCConnection * > conn_pool
Definition: AtomStorage.h:54
void release(void)
UUID get_uuid(void)
Definition: AtomTable.h:165