29 #ifdef HAVE_SQL_STORAGE
38 #include <opencog/util/oc_assert.h>
53 using namespace opencog;
55 #define USE_INLINE_EDGES
68 class AtomStorage::Response
91 bool create_atom_column_cb(
const char *colname,
const char * colvalue)
94 if (!strcmp(colname,
"type"))
96 itype = atoi(colvalue);
98 else if (!strcmp(colname,
"name"))
102 else if (!strcmp(colname,
"outgoing"))
106 if (!strcmp(colname,
"tv_type"))
108 tv_type = atoi(colvalue);
110 else if (!strcmp(colname,
"stv_mean"))
112 mean = atof(colvalue);
114 else if (!strcmp(colname,
"stv_confidence"))
116 confidence = atof(colvalue);
118 else if (!strcmp(colname,
"stv_count"))
120 count = atof(colvalue);
122 else if (!strcmp(colname,
"uuid"))
124 UUID uuid = strtoul(colvalue, NULL, 10);
129 bool create_atom_cb(
void)
132 rs->foreach_column(&Response::create_atom_column_cb,
this);
139 bool load_all_atoms_cb(
void)
142 rs->foreach_column(&Response::create_atom_column_cb,
this);
145 table->add(
atom,
true);
154 bool load_if_not_exists_cb(
void)
157 rs->foreach_column(&Response::create_atom_column_cb,
this);
159 if (not table->holds(handle))
162 load_recursive_if_not_exists(
atom);
176 const HandleSeq& oset = link->getOutgoingSet();
179 if (table->holds(h))
continue;
181 load_recursive_if_not_exists(
a);
184 table->add(atom,
true);
187 std::vector<Handle> *hvec;
188 bool fetch_incoming_set_cb(
void)
191 rs->foreach_column(&Response::create_atom_column_cb,
this);
202 bool row_exists_cb(
void)
208 #ifndef USE_INLINE_EDGES
210 std::vector<Handle> *outvec;
214 bool create_edge_cb(
void)
217 rs->foreach_column(&Response::create_edge_column_cb,
this);
218 int sz = outvec->size();
219 if (sz <= pos) outvec->resize(pos+1);
220 outvec->at(pos) = dst;
223 bool create_edge_column_cb(
const char *colname,
const char * colvalue)
226 if (!strcmp(colname,
"dst_uuid"))
228 dst =
Handle(strtoul(colvalue, (
char **) NULL, 10));
230 else if (!strcmp(colname,
"pos"))
232 pos = atoi(colvalue);
241 rs->foreach_column(&Response::type_column_cb,
this);
242 store->set_typemap(itype, tname);
246 bool type_column_cb(
const char *colname,
const char * colvalue)
248 if (!strcmp(colname,
"type"))
250 itype = atoi(colvalue);
252 else if (!strcmp(colname,
"typename"))
258 #ifdef OUT_OF_LINE_TVS
261 bool create_tv_cb(
void)
264 rs->foreach_column(&Response::create_tv_column_cb,
this);
267 bool create_tv_column_cb(
const char *colname,
const char * colvalue)
269 printf (
"%s = %s\n", colname, colvalue);
270 if (!strcmp(colname,
"mean"))
272 mean = atof(colvalue);
274 else if (!strcmp(colname,
"count"))
276 count = atof(colvalue);
284 unsigned long intval;
287 rs->foreach_column(&Response::intval_column_cb,
this);
290 bool intval_column_cb(
const char *colname,
const char * colvalue)
293 intval = strtoul(colvalue, NULL, 10);
298 std::set<UUID> *id_set;
299 bool note_id_cb(
void)
301 rs->foreach_column(&Response::note_id_column_cb,
this);
304 bool note_id_column_cb(
const char *colname,
const char * colvalue)
307 UUID id = strtoul(colvalue, NULL, 10);
341 rp.row_exists =
false;
342 rp.rs = db_conn->
exec(buff);
346 return rp.row_exists;
352 #ifndef USE_INLINE_EDGES
357 class AtomStorage::Outgoing
370 bool each_handle (
Handle h)
375 snprintf(buff,
BUFSZ,
"INSERT INTO Edges "
376 "(src_uuid, dst_uuid, pos) VALUES (%lu, %lu, %u);",
377 src_uuid, dst_uuid, pos);
380 rp.rs = db_conn->
exec(buff);
394 Outgoing out(db_conn, h);
396 foreach_outgoing_handle(h, &Outgoing::each_handle, &out);
405 const char * username,
406 const char * authentication)
411 #define DEFAULT_NUM_CONNS 6
412 for (
int i=0; i<DEFAULT_NUM_CONNS; i++)
432 const char * username,
433 const char * authentication)
436 init(dbname, username, authentication);
439 AtomStorage::AtomStorage(
const std::string& dbname,
440 const std::string& username,
441 const std::string& authentication)
444 init(dbname.c_str(), username.c_str(), authentication.c_str());
447 AtomStorage::~AtomStorage()
472 bool have_connection = db_conn->
connected();
474 return have_connection;
479 #define STMT(colname,val) { \
481 if (notfirst) { cols += ", "; } else notfirst = 1; \
486 if (notfirst) { cols += ", "; vals += ", "; } else notfirst = 1; \
492 #define STMTI(colname,ival) { \
494 snprintf(buff, BUFSZ, "%u", ival); \
495 STMT(colname, buff); \
498 #define STMTF(colname,fval) { \
500 snprintf(buff, BUFSZ, "%12.8g", fval); \
501 STMT(colname, buff); \
506 #ifdef OUT_OF_LINE_TVS
510 bool AtomStorage::tvExists(
int tvid)
513 snprintf(buff,
BUFSZ,
"SELECT tvid FROM SimpleTVs WHERE tvid = %u;", tvid);
534 fprintf(stderr,
"Error: non-simple truth values are not handled\n");
541 if (tvid <= 4)
return tvid;
544 char tvidbuff[
BUFSZ];
545 snprintf(tvidbuff,
BUFSZ,
"%u", tvid);
547 bool update = tvExists(tvid);
550 cols =
"UPDATE SimpleTVs SET ";
552 coda =
" WHERE tvid = ";
558 cols =
"INSERT INTO SimpleTVs (";
561 STMT(
"tvid", tvidbuff);
567 std::string qry = cols + vals + coda;
569 rp.rs = db_conn->
exec(qry.c_str());
587 rp.rs = db_conn->
exec(
"SELECT NEXTVAL('tvid_seq');");
602 snprintf(buff,
BUFSZ,
"SELECT * FROM SimpleTVs WHERE tvid = %u;", tvid);
605 rp.rs = db_conn->
exec(buff);
628 if (NULL == l)
return 0;
631 int arity = l->getArity();
633 const HandleSeq& out = l->getOutgoingSet();
634 for (
int i=0; i<arity; i++)
638 if (maxd < d) maxd = d;
650 for (
int i=0; i<arity; i++)
653 if (i != 0) str +=
", ";
656 snprintf(buff,
BUFSZ,
"%lu", uuid);
714 int arity = l->getArity();
715 const HandleSeq& out = l->getOutgoingSet();
716 for (
int i=0; i<arity; i++)
720 if (lheight < heig) lheight = heig;
758 char uuidbuff[
BUFSZ];
759 Handle h(atom->getHandle());
761 throw RuntimeException(TRACE_INFO,
"Trying to save atom with an invalid handle!");
764 snprintf(uuidbuff,
BUFSZ,
"%lu", uuid);
767 bool update = not lck.owns_lock();
770 cols =
"UPDATE Atoms SET ";
772 coda =
" WHERE uuid = ";
778 cols =
"INSERT INTO Atoms (";
782 STMT(
"uuid", uuidbuff);
796 snprintf(uuidbuff,
BUFSZ,
"%lu", asuid);
797 STMT(
"space", uuidbuff);
800 Type t = atom->getType();
802 STMTI(
"type", dbtype);
809 std::string qname = n->getName();
811 qname.insert(0U,1U,
'\'');
816 std::string qname =
" $ocp$";
817 qname += n->getName();
828 STMTI(
"height", aheight);
830 #ifdef USE_INLINE_EDGES
834 int arity = l->getArity();
837 cols +=
", outgoing";
850 STMTI(
"tv_type", tvt);
859 STMTF(
"stv_mean", tv->
getMean());
866 STMTF(
"stv_mean", itv->getL());
867 STMTF(
"stv_count", itv->getU());
868 STMTF(
"stv_confidence", itv->getConfidenceLevel());
872 throw RuntimeException(TRACE_INFO,
873 "Error: store_single: Unknown truth value type\n");
876 std::string qry = cols + vals + coda;
879 rp.rs = db_conn->
exec(qry.c_str());
883 #ifndef USE_INLINE_EDGES
946 rp.rs = db_conn->
exec(
"SELECT * FROM TypeCodes;");
952 for (
Type t=0; t<numberOfTypes; t++)
977 if (TYPEMAP_SZ <= sqid)
980 fprintf(stderr,
"Fatal Error: type table overflow!\n");
986 snprintf(buff,
BUFSZ,
987 "INSERT INTO TypeCodes (type, typename) "
988 "VALUES (%d, \'%s\');",
990 rp.rs = db_conn->
exec(buff);
1014 #ifdef ASK_SQL_SERVER
1017 snprintf(buff,
BUFSZ,
"SELECT uuid FROM Atoms WHERE uuid = %lu;", uuid);
1059 return std::unique_lock<std::mutex>();
1064 cache_lock.unlock();
1077 "Atom for UUID was not created!");
1078 return std::unique_lock<std::mutex>();
1080 cache_lock.unlock();
1112 for (rec = 0; rec <= max_nrec; rec += USTEP)
1115 snprintf(buff,
BUFSZ,
"SELECT uuid FROM Atoms WHERE "
1116 "uuid > %lu AND uuid <= %lu;",
1121 rp.rs = db_conn->
exec(buff);
1130 #ifndef USE_INLINE_EDGES
1135 snprintf(buff,
BUFSZ,
"SELECT * FROM Edges WHERE src_uuid = %lu;", uuid);
1139 rp.rs = db_conn->
exec(buff);
1141 rp.rs->
foreach_row(&Response::create_edge_cb, &rp);
1155 rp.rs = db_conn->
exec(query);
1156 rp.rs->
foreach_row(&Response::create_atom_cb, &rp);
1186 snprintf(buff,
BUFSZ,
"SELECT * FROM Atoms WHERE uuid = %lu;", uuid);
1196 std::vector<Handle> iset;
1201 snprintf(buff,
BUFSZ,
1202 "SELECT * FROM Atoms WHERE outgoing @> ARRAY[CAST(%lu AS BIGINT)];", uuid);
1215 rp.rs = db_conn->
exec(buff);
1216 rp.rs->
foreach_row(&Response::fetch_incoming_set_cb, &rp);
1236 char buff[40*
BUFSZ];
1239 int nc = snprintf(buff, 4*
BUFSZ,
"SELECT * FROM Atoms WHERE "
1242 if (40*
BUFSZ-1 <= nc)
1244 fprintf(stderr,
"Error: AtomStorage::getNode: buffer overflow!\n");
1245 buff[40*
BUFSZ-1] = 0x0;
1246 fprintf(stderr,
"\tnc=%d buffer=>>%s<<\n", nc, buff);
1268 snprintf(buff,
BUFSZ,
1269 "SELECT * FROM Atoms WHERE type = %hu AND outgoing = ",
1272 std::string ostr = buff;
1289 if (NOTYPE == realtype)
1291 throw RuntimeException(TRACE_INFO,
1292 "Fatal Error: OpenCog does not have a type called %s\n",
1302 if ((0 == rp.height) ||
1303 ((-1 == rp.height) &&
1310 std::vector<Handle> outvec;
1311 #ifndef USE_INLINE_EDGES
1314 char *p = (
char *) rp.outlist;
1319 if (*p ==
'}' or *p ==
'\0')
break;
1321 outvec.push_back(hout);
1343 if (realtype != atom->getType())
1346 throw RuntimeException(TRACE_INFO,
1347 "Fatal Error: mismatched atom type for existing atom! "
1348 "uuid=%lu real=%d atom=%d\n",
1349 uuid, realtype, atom->getType());
1353 atom->_uuid != h.
value())
1355 throw RuntimeException(TRACE_INFO,
1356 "Fatal Error: mismatched handle and atom UUID's, atom=%lu handle=%lu",
1357 atom->_uuid, h.
value());
1362 atom->_uuid = h.
value();
1373 atom->setTruthValue(stv);
1379 atom->setTruthValue(ctv);
1385 atom->setTruthValue(itv);
1391 atom->setTruthValue(ptv);
1395 throw RuntimeException(TRACE_INFO,
1396 "Error: makeAtom: Unknown truth value type\n");
1402 fprintf(stderr,
"\tLoaded %lu atoms.\n", (
unsigned long)
load_count);
1415 fprintf(stderr,
"Max observed UUID is %lu\n", max_nrec);
1418 fprintf(stderr,
"Max Height is %d\n",
max_height);
1431 #if GET_ONE_BIG_BLOB
1433 snprintf(buff,
BUFSZ,
"SELECT * FROM Atoms WHERE height = %d;", hei);
1435 rp.rs = db_conn->
exec(buff);
1436 rp.rs->
foreach_row(&Response::load_all_atoms_cb, &rp);
1448 for (rec = 0; rec <= max_nrec; rec += STEP)
1451 snprintf(buff,
BUFSZ,
"SELECT * FROM Atoms WHERE "
1452 "height = %d AND uuid > %lu AND uuid <= %lu;",
1453 hei, rec, rec+STEP);
1455 rp.rs = db_conn->
exec(buff);
1456 rp.rs->
foreach_row(&Response::load_all_atoms_cb, &rp);
1460 fprintf(stderr,
"Loaded %lu atoms at height %d\n",
load_count - cur, hei);
1463 fprintf(stderr,
"Finished loading %lu atoms in total\n",
1474 logger().debug(
"AtomStorage::loadType: Max observed UUID is %lu\n", max_nrec);
1483 logger().debug(
"AtomStorage::loadType: Max Height is %d\n",
max_height);
1497 #if GET_ONE_BIG_BLOB
1499 snprintf(buff,
BUFSZ,
1500 "SELECT * FROM Atoms WHERE height = %d AND type = %d;",
1503 rp.rs = db_conn->
exec(buff);
1504 rp.rs->
foreach_row(&Response::load_if_not_exists_cb, &rp);
1516 for (rec = 0; rec <= max_nrec; rec += STEP)
1519 snprintf(buff,
BUFSZ,
"SELECT * FROM Atoms WHERE type = %d "
1520 "AND height = %d AND uuid > %lu AND uuid <= %lu;",
1521 db_atom_type, hei, rec, rec+STEP);
1523 rp.rs = db_conn->
exec(buff);
1524 rp.rs->
foreach_row(&Response::load_if_not_exists_cb, &rp);
1528 logger().debug(
"AtomStorage::loadType: Loaded %lu atoms of type %d at height %d\n",
1532 logger().debug(
"AtomStorage::loadType: Finished loading %lu atoms in total\n",
1545 fprintf(stderr,
"\tStored %lu atoms.\n", (
unsigned long)
store_count);
1562 fprintf(stderr,
"Max UUID is %lu\n", max_uuid);
1569 #ifndef USE_INLINE_EDGES
1572 rp.rs = db_conn->
exec(
"DROP INDEX uuid_idx;");
1574 rp.rs = db_conn->
exec(
"DROP INDEX src_idx;");
1581 #ifndef USE_INLINE_EDGES
1583 rp.rs = db_conn->
exec(
"CREATE INDEX uuid_idx ON Atoms (uuid);");
1585 rp.rs = db_conn->
exec(
"CREATE INDEX src_idx ON Edges (src_uuid);");
1589 rp.rs = db_conn->
exec(
"VACUUM ANALYZE;");
1594 fprintf(stderr,
"\tFinished storing %lu atoms total.\n",
1605 rp.rs = db_conn->
exec(
"ALTER TABLE Atoms RENAME TO Atoms_Backup;");
1607 #ifndef USE_INLINE_EDGES
1608 rp.rs = db_conn->
exec(
"ALTER TABLE Edges RENAME TO Edges_Backup;");
1611 rp.rs = db_conn->
exec(
"ALTER TABLE Global RENAME TO Global_Backup;");
1613 rp.rs = db_conn->
exec(
"ALTER TABLE TypeCodes RENAME TO TypeCodes_Backup;");
1625 rp.rs = db_conn->
exec(
"CREATE TABLE Atoms ("
1626 "uuid BIGINT PRIMARY KEY,"
1631 "stv_confidence FLOAT,"
1635 "outgoing BIGINT[]);");
1638 #ifndef USE_INLINE_EDGES
1639 rp.rs = db_conn->
exec(
"CREATE TABLE Edges ("
1646 rp.rs = db_conn->
exec(
"CREATE TABLE TypeCodes ("
1647 "type SMALLINT UNIQUE,"
1648 "typename TEXT UNIQUE);");
1652 rp.rs = db_conn->
exec(
"CREATE TABLE Spaces ("
1657 rp.rs = db_conn->
exec(
"CREATE TABLE Global ("
1658 "max_height INT);");
1660 rp.rs = db_conn->
exec(
"INSERT INTO Global (max_height) VALUES (0);");
1678 rp.rs = db_conn->
exec(
"DELETE from Atoms;");
1681 rp.rs = db_conn->
exec(
"UPDATE Global SET max_height = 0;");
1694 snprintf(buff,
BUFSZ,
"UPDATE Global SET max_height = %d;",
max_height);
1698 rp.rs = db_conn->
exec(buff);
1707 rp.rs = db_conn->
exec(
"SELECT max_height FROM Global;");
1719 rp.rs = db_conn->
exec(
"SELECT uuid FROM Atoms ORDER BY uuid DESC LIMIT 1;");
1731 rp.rs = db_conn->
exec(
"SELECT height FROM Atoms ORDER BY height DESC LIMIT 1;");
1741 fprintf(stderr,
"Reserving UUID up to %lu\n", max_observed_id);
ODBCRecordSet * exec(const char *)
void storeSingleAtom(AtomPtr)
static TruthValuePtr createTV(strength_t s, confidence_t f, count_t c)
void init(const char *, const char *, const char *)
virtual count_t getCount() const =0
a TruthValue that stores a mean and the number of observations (strength and confidence) ...
static bool isInvalidHandle(const Handle &h)
virtual TruthValueType getType() const =0
std::atomic< unsigned long > store_count
char * db_typename[TYPEMAP_SZ]
std::string oset_to_string(const HandleSeq &, int)
std::vector< Handle > HandleSeq
a list of handles
std::shared_ptr< Atom > AtomPtr
void storeAtom(AtomPtr, bool synchronous=false)
static TruthValuePtr TRUE_TV()
std::shared_ptr< TruthValue > TruthValuePtr
static UUID getMaxUUID(void)
ODBCConnection * get_conn()
Type loading_typemap[TYPEMAP_SZ]
std::shared_ptr< Link > LinkPtr
static void reserve_upto(UUID hi)
void vdo_store_atom(AtomPtr &)
ClassServer & classserver(ClassServerFactory *=ClassServer::createInstance)
static NodePtr NodeCast(const Handle &h)
void do_store_single_atom(AtomPtr, int)
void storeOutgoing(AtomPtr, Handle)
static const Handle UNDEFINED
virtual confidence_t getConfidence() const =0
unsigned long UUID
UUID == Universally Unique Identifier.
Type getNumberOfClasses()
static TruthValuePtr NULL_TV()
static TruthValuePtr createTV(strength_t s, confidence_t f, count_t c)
std::mutex id_cache_mutex
bool local_id_cache_is_inited
AtomPtr makeAtom(Response &, Handle)
std::shared_ptr< IndefiniteTruthValue > IndefiniteTruthValuePtr
static TruthValuePtr TRIVIAL_TV()
void loadType(AtomTable &, Type)
void getOutgoing(HandleSeq &, Handle)
void foreachHandleByType(Function func, Type type, bool subclass=false, bool parent=true) const
static TruthValuePtr createTV(strength_t mean, count_t count)
std::set< UUID > id_create_cache
NodePtr getNode(Type, const char *)
static TruthValuePtr DEFAULT_TV()
const std::string & getTypeName(Type type)
void store(const AtomTable &)
void escape_single_quotes(std::string &str)
static TruthValuePtr FALSE_TV()
static LinkPtr LinkCast(const Handle &h)
AtomStorage(const std::string &dbname, const std::string &username, const std::string &authentication)
AtomPtr getAtom(const char *, int)
int getMaxObservedHeight(void)
std::atomic< unsigned long > load_count
int do_store_atom(AtomPtr)
std::set< UUID > local_id_cache
bool foreach_row(bool(T::*cb)(void), T *data)
std::shared_ptr< Node > NodePtr
async_caller< AtomStorage, AtomPtr > _write_queue
static AtomPtr factory(Type atom_type, AtomPtr atom)
std::unique_lock< std::mutex > maybe_create_id(UUID)
Type getType(const std::string &typeName)
static TruthValuePtr createTV(TruthValuePtr tv)
UUID getMaxObservedUUID(void)
int storing_typemap[TYPEMAP_SZ]
unsigned short Type
type of Atoms, represented as short integer (16 bits)
std::mutex id_create_mutex
void set_typemap(int, const char *)
bool connected(void) const
av init(getOutgoingSet())
std::vector< Handle > getIncomingSet(Handle)
void add_id_to_cache(UUID)
virtual strength_t getMean() const =0
LinkPtr getLink(Type, const std::vector< Handle > &)
bool idExists(const char *)
void put_conn(ODBCConnection *)
concurrent_stack< ODBCConnection * > conn_pool