flashdb_tsdb1.py 760 B

123456789101112131415161718192021222324252627282930313233
  1. import flashdb
  2. import struct
  3. import time
  4. import os
  5. DB_PATH = "test/out/fdb_tsdb"
  6. tsdb = flashdb.TSDB("env", DB_PATH, max_len=512)
  7. tic = time.time() * 1000
  8. for i in range(10):
  9. blob_i = struct.pack('i', i)
  10. time.sleep(0.001)
  11. ret = tsdb.tsl_append(blob_i)
  12. toc = time.time() * 1000
  13. assert ret == 0
  14. def callback(tsl, user_data) -> int:
  15. # print(tsl.get_time(), tsl.to_blob())
  16. t = tsl.get_time()
  17. blob_i = tsl.to_blob()
  18. i = struct.unpack('i', blob_i)[0]
  19. print(t, i, user_data)
  20. return False # False: continue, True: stop
  21. assert tsdb.tsl_iter(callback, 'user_data') == 0
  22. assert tsdb.tsl_iter_reverse(callback, 'user_data_reverse') == 0
  23. assert tsdb.tsl_iter_by_time(tic, toc, callback, 'user_data_by_time') == 0
  24. print('PASS')