_flashdb_TSDB.c 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. #include "_flashdb_TSDB.h"
  2. #include "_flashdb_TSL.h"
  3. #include <stdio.h>
  4. #include "flashdb.h"
  5. // #include "fdb_def.h"
  6. #define PIKA_USING_FLASHDB1 1
  7. #if PIKA_USING_FLASHDB1
  8. // #include <pthread.h>
  9. #include "flashdb.h"
  10. #define FDB_LOG_TAG "[main]"
  11. #define _FDBBUFFS (Args*)obj_getPtr(self, "FDBBUFFS")
  12. #define strdup(x) strsCopy(_FDBBUFFS, x)
  13. /* TSDB object */
  14. // struct fdb_tsdb tsdb = { 0 };
  15. /* counts for simulated timestamp */
  16. // static int counts = 0;
  17. // extern void tsdb_basic_sample(fdb_tsdb_t tsdb);
  18. // extern void tsdb_type_string_sample(fdb_tsdb_t tsdb);
  19. // extern void tsdb_type_blob_sample(fdb_tsdb_t tsdb);
  20. // extern void tsdb_sample(fdb_tsdb_t tsdb);
  21. /*
  22. static void lock(fdb_db_t db)
  23. {
  24. pthread_mutex_lock((pthread_mutex_t *)db->user_data);
  25. }
  26. static void unlock(fdb_db_t db)
  27. {
  28. pthread_mutex_unlock((pthread_mutex_t *)db->user_data);
  29. }
  30. */
  31. static fdb_time_t get_time(void) {
  32. // ns to ms
  33. return pika_platform_get_tick() / 1000;
  34. }
  35. #endif
  36. /* TSDB object */
  37. typedef struct _fdb_tsdb_context {
  38. struct fdb_tsdb tsdb;
  39. pika_bool path_inited;
  40. } fdb_tsdb_context;
  41. #define _OBJ2TSDB_CONTEXT(x) ((fdb_tsdb_context*)obj_getStruct(x, "tsdbctx"))
  42. #define _OBJ2TSDB(x) (&_OBJ2TSDB_CONTEXT(x)->tsdb)
  43. typedef struct fdb_tsdb FDB_TSDB;
  44. typedef struct fdb_default_kv_node FDB_DEFAULT_KV_NODE;
  45. /*
  46. Arg* _flashdb_TSDB_blob_make(PikaObj *self, Arg* blob, Arg* value_buf, int
  47. buf_len){ return NULL;
  48. }
  49. Arg* _flashdb_TSDB_blob_read(PikaObj *self, Arg* db, Arg* blob){
  50. return NULL;
  51. }
  52. int _flashdb_TSDB_kv_del(PikaObj *self, Arg* tsdb, char* key){
  53. return 0;
  54. }
  55. Arg* _flashdb_TSDB_kv_get(PikaObj *self, Arg* tsdb, char* key){
  56. return NULL;
  57. }
  58. */
  59. int _flashdb_TSDB_tsl_append(PikaObj* self, Arg* blob_in) {
  60. fdb_err_t res = FDB_NO_ERR;
  61. FDB_TSDB* tsdb = _OBJ2TSDB(self);
  62. ArgType argt_blob_in = arg_getType(blob_in);
  63. if (argt_blob_in != ARG_TYPE_BYTES) {
  64. pika_platform_printf("blob must be bytes but got:%d", argt_blob_in);
  65. }
  66. size_t len = arg_getBytesSize(blob_in);
  67. uint8_t* bytes = (uint8_t*)arg_getBytes(blob_in);
  68. struct fdb_blob blob;
  69. blob.size = len;
  70. blob.buf = bytes;
  71. res = fdb_tsl_append(tsdb, &blob);
  72. return res;
  73. }
  74. int _flashdb_TSDB_set_default(PikaObj* self, Arg* tsdb) {
  75. return 0;
  76. }
  77. int _flashdb_TSDB_control(PikaObj* self, int cmd, Arg* arg) {
  78. return -1;
  79. }
  80. void _flashdb_TSDB_deinit(PikaObj* self) {
  81. fdb_tsdb_deinit(_OBJ2TSDB(self));
  82. }
  83. struct _kvdb_foreach_context {
  84. struct fdb_default_kv_node* def_kv_table;
  85. PikaObj* self;
  86. };
  87. void _flashdb_TSDB___init__(PikaObj* self,
  88. char* name,
  89. char* path,
  90. int max_len,
  91. Arg* user_data) {
  92. pika_platform_printf("tsdb_init \n");
  93. if (NULL == _OBJ2TSDB_CONTEXT(self)) {
  94. Args* buffs = New_strBuff();
  95. obj_setPtr(self, "FDBBUFFS", buffs);
  96. // create tsdb context if not exist
  97. fdb_tsdb_context tsdb_initial = {
  98. .tsdb = {0},
  99. .path_inited = pika_false,
  100. };
  101. obj_setStruct(self, "tsdbctx", tsdb_initial);
  102. }
  103. fdb_tsdb_context* tsdb_context = _OBJ2TSDB_CONTEXT(self);
  104. fdb_tsdb_t tsdb_this = &tsdb_context->tsdb;
  105. fdb_err_t result;
  106. if (!tsdb_context->path_inited) {
  107. pika_bool file_mode = pika_true;
  108. uint32_t sec_size = 4096, db_size = sec_size * 4;
  109. fdb_tsdb_control(tsdb_this, FDB_TSDB_CTRL_SET_SEC_SIZE, &sec_size);
  110. fdb_tsdb_control(tsdb_this, FDB_TSDB_CTRL_SET_MAX_SIZE, &db_size);
  111. /* enable file mode */
  112. fdb_tsdb_control(tsdb_this, FDB_TSDB_CTRL_SET_FILE_MODE, &file_mode);
  113. /* create database directory */
  114. pika_platform_mkdir(path, 0777);
  115. tsdb_context->path_inited = pika_true;
  116. }
  117. // int len =pikaDict_getSize(default_kv_in);
  118. result = fdb_tsdb_init(tsdb_this, strdup(name), strdup(path),
  119. pika_platform_get_tick, max_len, NULL);
  120. if (result != FDB_NO_ERR) {
  121. obj_setSysOut(self, "tsdb_init fail");
  122. obj_setErrorCode(self, result);
  123. }
  124. }
  125. void _flashdb_TSDB___del__(PikaObj* self) {
  126. Args* buffs = _FDBBUFFS;
  127. if (NULL != buffs) {
  128. args_deinit(_FDBBUFFS);
  129. }
  130. if (NULL != _OBJ2TSDB_CONTEXT(self)) {
  131. fdb_tsdb_deinit(_OBJ2TSDB(self));
  132. }
  133. }
  134. void _flashdb_TSDB_CTRL___init__(PikaObj* self) {
  135. obj_setInt(self, "SET_SEC_SIZE", FDB_TSDB_CTRL_SET_SEC_SIZE);
  136. obj_setInt(self, "GET_SEC_SIZE", FDB_TSDB_CTRL_GET_SEC_SIZE);
  137. obj_setInt(self, "SET_LOCK", FDB_TSDB_CTRL_SET_LOCK);
  138. obj_setInt(self, "SET_UNLOCK", FDB_TSDB_CTRL_SET_UNLOCK);
  139. obj_setInt(self, "SET_FILE_MODE", FDB_TSDB_CTRL_SET_FILE_MODE);
  140. obj_setInt(self, "SET_MAX_SIZE", FDB_TSDB_CTRL_SET_MAX_SIZE);
  141. obj_setInt(self, "SET_NOT_FORMAT", FDB_TSDB_CTRL_SET_NOT_FORMAT);
  142. }
  143. #define _OBJ2TSL(self) obj_getPtr(self, "TSL")
  144. #define _OBJSETTSL(self, tsl) obj_setPtr(self, "TSL", tsl)
  145. int64_t _flashdb_TSL_get_time(PikaObj* self) {
  146. fdb_tsl_t tsl = _OBJ2TSL(self);
  147. if (NULL == tsl) {
  148. return -1;
  149. }
  150. return tsl->time;
  151. }
  152. fdb_blob_t blob_alloc(fdb_blob_t blob) {
  153. uint8_t* buf = (uint8_t*)pikaMalloc(blob->saved.len + 1);
  154. if (!buf) {
  155. pika_platform_printf("alloc fail\n");
  156. return NULL;
  157. }
  158. blob->buf = buf;
  159. blob->size = blob->saved.len;
  160. return blob;
  161. }
  162. int fdb_blob_free(fdb_blob_t blob) {
  163. if (blob) {
  164. pikaFree(blob->buf, blob->size + 1);
  165. blob->buf = NULL;
  166. blob->size = 0;
  167. }
  168. return 0;
  169. }
  170. Arg* _flashdb_TSL_to_blob(PikaObj* self) {
  171. fdb_tsl_t tsl = _OBJ2TSL(self);
  172. fdb_tsdb_t tsdb = obj_getPtr(self, "tsdb");
  173. if (NULL == tsl) {
  174. return NULL;
  175. }
  176. struct fdb_blob blob;
  177. fdb_tsl_to_blob(tsl, &blob);
  178. if (NULL == blob_alloc(&blob)) {
  179. return NULL;
  180. }
  181. fdb_blob_read((fdb_db_t)tsdb, &blob);
  182. Arg* res = arg_newBytes((uint8_t*)blob.buf, blob.size);
  183. fdb_blob_free(&blob);
  184. return res;
  185. }
  186. typedef struct _tsdb_foreach_context {
  187. Arg* callback;
  188. Arg* user_data;
  189. fdb_tsdb_t tsdb;
  190. } tsdb_foreach_context;
  191. PikaObj* New__flashdb_TSDB(Args* args);
  192. pika_bool _flashdb_TSL_iter_callback(fdb_tsl_t tsl, void* arg) {
  193. tsdb_foreach_context* context = (tsdb_foreach_context*)arg;
  194. Arg* callback = context->callback;
  195. Arg* user_data = context->user_data;
  196. PikaObj* tsl_obj = newNormalObj(New__flashdb_TSL);
  197. _OBJSETTSL(tsl_obj, tsl);
  198. obj_setPtr(tsl_obj, "tsdb", context->tsdb);
  199. Arg* ret = pika_runFunction2(arg_copy(callback), arg_newObj(tsl_obj),
  200. arg_copy(user_data));
  201. if (NULL == ret) {
  202. return pika_true;
  203. }
  204. pika_bool res = arg_getBool(ret);
  205. arg_deinit(ret);
  206. return res;
  207. }
  208. static int _TSDB_iter(PikaObj* self,
  209. Arg* callback,
  210. Arg* user_data,
  211. pika_bool is_reverse) {
  212. fdb_tsdb_t tsdb = _OBJ2TSDB(self);
  213. tsdb_foreach_context context = {
  214. .callback = callback,
  215. .user_data = user_data,
  216. .tsdb = tsdb,
  217. };
  218. if (is_reverse) {
  219. fdb_tsl_iter_reverse(tsdb, _flashdb_TSL_iter_callback, &context);
  220. } else {
  221. fdb_tsl_iter(tsdb, _flashdb_TSL_iter_callback, &context);
  222. }
  223. return 0;
  224. }
  225. int _flashdb_TSDB_tsl_iter(PikaObj* self, Arg* callback, Arg* user_data) {
  226. return _TSDB_iter(self, callback, user_data, pika_false);
  227. }
  228. int _flashdb_TSDB_tsl_iter_reverse(PikaObj* self,
  229. Arg* callback,
  230. Arg* user_data) {
  231. return _TSDB_iter(self, callback, user_data, pika_true);
  232. }
  233. int _flashdb_TSDB_tsl_iter_by_time(PikaObj* self,
  234. int64_t from_time,
  235. int64_t to_time,
  236. Arg* callback,
  237. Arg* user_data) {
  238. fdb_tsdb_t tsdb = _OBJ2TSDB(self);
  239. tsdb_foreach_context context = {
  240. .callback = callback,
  241. .user_data = user_data,
  242. .tsdb = tsdb,
  243. };
  244. fdb_tsl_iter_by_time(tsdb, from_time, to_time, _flashdb_TSL_iter_callback,
  245. &context);
  246. return 0;
  247. }
  248. #undef strudp