#-*- coding: iso-8859-1 -*-
# pysqlite2/test/dbapi.py: tests for DB-API compliance
#
# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

import sqlite3 as sqlite
import threading
import unittest

try:
    from test.support import TESTFN, unlink
except ImportError:  # Python 3.10+ moved these helpers to test.support.os_helper
    from test.support.os_helper import TESTFN, unlink


  27. class ModuleTests(unittest.TestCase):
  28. def CheckAPILevel(self):
  29. self.assertEqual(sqlite.apilevel, "2.0",
  30. "apilevel is %s, should be 2.0" % sqlite.apilevel)
  31. def CheckThreadSafety(self):
  32. self.assertEqual(sqlite.threadsafety, 1,
  33. "threadsafety is %d, should be 1" % sqlite.threadsafety)
  34. def CheckParamStyle(self):
  35. self.assertEqual(sqlite.paramstyle, "qmark",
  36. "paramstyle is '%s', should be 'qmark'" %
  37. sqlite.paramstyle)
  38. def CheckWarning(self):
  39. self.assertTrue(issubclass(sqlite.Warning, Exception),
  40. "Warning is not a subclass of Exception")
  41. def CheckError(self):
  42. self.assertTrue(issubclass(sqlite.Error, Exception),
  43. "Error is not a subclass of Exception")
  44. def CheckInterfaceError(self):
  45. self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
  46. "InterfaceError is not a subclass of Error")
  47. def CheckDatabaseError(self):
  48. self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
  49. "DatabaseError is not a subclass of Error")
  50. def CheckDataError(self):
  51. self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
  52. "DataError is not a subclass of DatabaseError")
  53. def CheckOperationalError(self):
  54. self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
  55. "OperationalError is not a subclass of DatabaseError")
  56. def CheckIntegrityError(self):
  57. self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
  58. "IntegrityError is not a subclass of DatabaseError")
  59. def CheckInternalError(self):
  60. self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
  61. "InternalError is not a subclass of DatabaseError")
  62. def CheckProgrammingError(self):
  63. self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
  64. "ProgrammingError is not a subclass of DatabaseError")
  65. def CheckNotSupportedError(self):
  66. self.assertTrue(issubclass(sqlite.NotSupportedError,
  67. sqlite.DatabaseError),
  68. "NotSupportedError is not a subclass of DatabaseError")
  69. class ConnectionTests(unittest.TestCase):
  70. def setUp(self):
  71. self.cx = sqlite.connect(":memory:")
  72. cu = self.cx.cursor()
  73. cu.execute("create table test(id integer primary key, name text)")
  74. cu.execute("insert into test(name) values (?)", ("foo",))
  75. def tearDown(self):
  76. self.cx.close()
  77. def CheckCommit(self):
  78. self.cx.commit()
  79. def CheckCommitAfterNoChanges(self):
  80. """
  81. A commit should also work when no changes were made to the database.
  82. """
  83. self.cx.commit()
  84. self.cx.commit()
  85. def CheckRollback(self):
  86. self.cx.rollback()
  87. def CheckRollbackAfterNoChanges(self):
  88. """
  89. A rollback should also work when no changes were made to the database.
  90. """
  91. self.cx.rollback()
  92. self.cx.rollback()
  93. def CheckCursor(self):
  94. cu = self.cx.cursor()
  95. def CheckFailedOpen(self):
  96. YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
  97. with self.assertRaises(sqlite.OperationalError):
  98. con = sqlite.connect(YOU_CANNOT_OPEN_THIS)
  99. def CheckClose(self):
  100. self.cx.close()
  101. def CheckExceptions(self):
  102. # Optional DB-API extension.
  103. self.assertEqual(self.cx.Warning, sqlite.Warning)
  104. self.assertEqual(self.cx.Error, sqlite.Error)
  105. self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
  106. self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
  107. self.assertEqual(self.cx.DataError, sqlite.DataError)
  108. self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
  109. self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
  110. self.assertEqual(self.cx.InternalError, sqlite.InternalError)
  111. self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
  112. self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
  113. def CheckInTransaction(self):
  114. # Can't use db from setUp because we want to test initial state.
  115. cx = sqlite.connect(":memory:")
  116. cu = cx.cursor()
  117. self.assertEqual(cx.in_transaction, False)
  118. cu.execute("create table transactiontest(id integer primary key, name text)")
  119. self.assertEqual(cx.in_transaction, False)
  120. cu.execute("insert into transactiontest(name) values (?)", ("foo",))
  121. self.assertEqual(cx.in_transaction, True)
  122. cu.execute("select name from transactiontest where name=?", ["foo"])
  123. row = cu.fetchone()
  124. self.assertEqual(cx.in_transaction, True)
  125. cx.commit()
  126. self.assertEqual(cx.in_transaction, False)
  127. cu.execute("select name from transactiontest where name=?", ["foo"])
  128. row = cu.fetchone()
  129. self.assertEqual(cx.in_transaction, False)
  130. def CheckInTransactionRO(self):
  131. with self.assertRaises(AttributeError):
  132. self.cx.in_transaction = True
  133. def CheckOpenWithPathLikeObject(self):
  134. """ Checks that we can successfully connect to a database using an object that
  135. is PathLike, i.e. has __fspath__(). """
  136. self.addCleanup(unlink, TESTFN)
  137. class Path:
  138. def __fspath__(self):
  139. return TESTFN
  140. path = Path()
  141. with sqlite.connect(path) as cx:
  142. cx.execute('create table test(id integer)')
  143. def CheckOpenUri(self):
  144. if sqlite.sqlite_version_info < (3, 7, 7):
  145. with self.assertRaises(sqlite.NotSupportedError):
  146. sqlite.connect(':memory:', uri=True)
  147. return
  148. self.addCleanup(unlink, TESTFN)
  149. with sqlite.connect(TESTFN) as cx:
  150. cx.execute('create table test(id integer)')
  151. with sqlite.connect('file:' + TESTFN, uri=True) as cx:
  152. cx.execute('insert into test(id) values(0)')
  153. with sqlite.connect('file:' + TESTFN + '?mode=ro', uri=True) as cx:
  154. with self.assertRaises(sqlite.OperationalError):
  155. cx.execute('insert into test(id) values(1)')
  156. @unittest.skipIf(sqlite.sqlite_version_info >= (3, 3, 1),
  157. 'needs sqlite versions older than 3.3.1')
  158. def CheckSameThreadErrorOnOldVersion(self):
  159. with self.assertRaises(sqlite.NotSupportedError) as cm:
  160. sqlite.connect(':memory:', check_same_thread=False)
  161. self.assertEqual(str(cm.exception), 'shared connections not available')
  162. class CursorTests(unittest.TestCase):
  163. def setUp(self):
  164. self.cx = sqlite.connect(":memory:")
  165. self.cu = self.cx.cursor()
  166. self.cu.execute(
  167. "create table test(id integer primary key, name text, "
  168. "income number, unique_test text unique)"
  169. )
  170. self.cu.execute("insert into test(name) values (?)", ("foo",))
  171. def tearDown(self):
  172. self.cu.close()
  173. self.cx.close()
  174. def CheckExecuteNoArgs(self):
  175. self.cu.execute("delete from test")
  176. def CheckExecuteIllegalSql(self):
  177. with self.assertRaises(sqlite.OperationalError):
  178. self.cu.execute("select asdf")
  179. def CheckExecuteTooMuchSql(self):
  180. with self.assertRaises(sqlite.Warning):
  181. self.cu.execute("select 5+4; select 4+5")
  182. def CheckExecuteTooMuchSql2(self):
  183. self.cu.execute("select 5+4; -- foo bar")
  184. def CheckExecuteTooMuchSql3(self):
  185. self.cu.execute("""
  186. select 5+4;
  187. /*
  188. foo
  189. */
  190. """)
  191. def CheckExecuteWrongSqlArg(self):
  192. with self.assertRaises(ValueError):
  193. self.cu.execute(42)
  194. def CheckExecuteArgInt(self):
  195. self.cu.execute("insert into test(id) values (?)", (42,))
  196. def CheckExecuteArgFloat(self):
  197. self.cu.execute("insert into test(income) values (?)", (2500.32,))
  198. def CheckExecuteArgString(self):
  199. self.cu.execute("insert into test(name) values (?)", ("Hugo",))
  200. def CheckExecuteArgStringWithZeroByte(self):
  201. self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
  202. self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
  203. row = self.cu.fetchone()
  204. self.assertEqual(row[0], "Hu\x00go")
  205. def CheckExecuteNonIterable(self):
  206. with self.assertRaises(ValueError) as cm:
  207. self.cu.execute("insert into test(id) values (?)", 42)
  208. self.assertEqual(str(cm.exception), 'parameters are of unsupported type')
  209. def CheckExecuteWrongNoOfArgs1(self):
  210. # too many parameters
  211. with self.assertRaises(sqlite.ProgrammingError):
  212. self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
  213. def CheckExecuteWrongNoOfArgs2(self):
  214. # too little parameters
  215. with self.assertRaises(sqlite.ProgrammingError):
  216. self.cu.execute("insert into test(id) values (?)")
  217. def CheckExecuteWrongNoOfArgs3(self):
  218. # no parameters, parameters are needed
  219. with self.assertRaises(sqlite.ProgrammingError):
  220. self.cu.execute("insert into test(id) values (?)")
  221. def CheckExecuteParamList(self):
  222. self.cu.execute("insert into test(name) values ('foo')")
  223. self.cu.execute("select name from test where name=?", ["foo"])
  224. row = self.cu.fetchone()
  225. self.assertEqual(row[0], "foo")
  226. def CheckExecuteParamSequence(self):
  227. class L(object):
  228. def __len__(self):
  229. return 1
  230. def __getitem__(self, x):
  231. assert x == 0
  232. return "foo"
  233. self.cu.execute("insert into test(name) values ('foo')")
  234. self.cu.execute("select name from test where name=?", L())
  235. row = self.cu.fetchone()
  236. self.assertEqual(row[0], "foo")
  237. def CheckExecuteDictMapping(self):
  238. self.cu.execute("insert into test(name) values ('foo')")
  239. self.cu.execute("select name from test where name=:name", {"name": "foo"})
  240. row = self.cu.fetchone()
  241. self.assertEqual(row[0], "foo")
  242. def CheckExecuteDictMapping_Mapping(self):
  243. class D(dict):
  244. def __missing__(self, key):
  245. return "foo"
  246. self.cu.execute("insert into test(name) values ('foo')")
  247. self.cu.execute("select name from test where name=:name", D())
  248. row = self.cu.fetchone()
  249. self.assertEqual(row[0], "foo")
  250. def CheckExecuteDictMappingTooLittleArgs(self):
  251. self.cu.execute("insert into test(name) values ('foo')")
  252. with self.assertRaises(sqlite.ProgrammingError):
  253. self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
  254. def CheckExecuteDictMappingNoArgs(self):
  255. self.cu.execute("insert into test(name) values ('foo')")
  256. with self.assertRaises(sqlite.ProgrammingError):
  257. self.cu.execute("select name from test where name=:name")
  258. def CheckExecuteDictMappingUnnamed(self):
  259. self.cu.execute("insert into test(name) values ('foo')")
  260. with self.assertRaises(sqlite.ProgrammingError):
  261. self.cu.execute("select name from test where name=?", {"name": "foo"})
  262. def CheckClose(self):
  263. self.cu.close()
  264. def CheckRowcountExecute(self):
  265. self.cu.execute("delete from test")
  266. self.cu.execute("insert into test(name) values ('foo')")
  267. self.cu.execute("insert into test(name) values ('foo')")
  268. self.cu.execute("update test set name='bar'")
  269. self.assertEqual(self.cu.rowcount, 2)
  270. def CheckRowcountSelect(self):
  271. """
  272. pysqlite does not know the rowcount of SELECT statements, because we
  273. don't fetch all rows after executing the select statement. The rowcount
  274. has thus to be -1.
  275. """
  276. self.cu.execute("select 5 union select 6")
  277. self.assertEqual(self.cu.rowcount, -1)
  278. def CheckRowcountExecutemany(self):
  279. self.cu.execute("delete from test")
  280. self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
  281. self.assertEqual(self.cu.rowcount, 3)
  282. def CheckTotalChanges(self):
  283. self.cu.execute("insert into test(name) values ('foo')")
  284. self.cu.execute("insert into test(name) values ('foo')")
  285. self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value')
  286. # Checks for executemany:
  287. # Sequences are required by the DB-API, iterators
  288. # enhancements in pysqlite.
  289. def CheckExecuteManySequence(self):
  290. self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
  291. def CheckExecuteManyIterator(self):
  292. class MyIter:
  293. def __init__(self):
  294. self.value = 5
  295. def __next__(self):
  296. if self.value == 10:
  297. raise StopIteration
  298. else:
  299. self.value += 1
  300. return (self.value,)
  301. self.cu.executemany("insert into test(income) values (?)", MyIter())
  302. def CheckExecuteManyGenerator(self):
  303. def mygen():
  304. for i in range(5):
  305. yield (i,)
  306. self.cu.executemany("insert into test(income) values (?)", mygen())
  307. def CheckExecuteManyWrongSqlArg(self):
  308. with self.assertRaises(ValueError):
  309. self.cu.executemany(42, [(3,)])
  310. def CheckExecuteManySelect(self):
  311. with self.assertRaises(sqlite.ProgrammingError):
  312. self.cu.executemany("select ?", [(3,)])
  313. def CheckExecuteManyNotIterable(self):
  314. with self.assertRaises(TypeError):
  315. self.cu.executemany("insert into test(income) values (?)", 42)
  316. def CheckFetchIter(self):
  317. # Optional DB-API extension.
  318. self.cu.execute("delete from test")
  319. self.cu.execute("insert into test(id) values (?)", (5,))
  320. self.cu.execute("insert into test(id) values (?)", (6,))
  321. self.cu.execute("select id from test order by id")
  322. lst = []
  323. for row in self.cu:
  324. lst.append(row[0])
  325. self.assertEqual(lst[0], 5)
  326. self.assertEqual(lst[1], 6)
  327. def CheckFetchone(self):
  328. self.cu.execute("select name from test")
  329. row = self.cu.fetchone()
  330. self.assertEqual(row[0], "foo")
  331. row = self.cu.fetchone()
  332. self.assertEqual(row, None)
  333. def CheckFetchoneNoStatement(self):
  334. cur = self.cx.cursor()
  335. row = cur.fetchone()
  336. self.assertEqual(row, None)
  337. def CheckArraySize(self):
  338. # must default ot 1
  339. self.assertEqual(self.cu.arraysize, 1)
  340. # now set to 2
  341. self.cu.arraysize = 2
  342. # now make the query return 3 rows
  343. self.cu.execute("delete from test")
  344. self.cu.execute("insert into test(name) values ('A')")
  345. self.cu.execute("insert into test(name) values ('B')")
  346. self.cu.execute("insert into test(name) values ('C')")
  347. self.cu.execute("select name from test")
  348. res = self.cu.fetchmany()
  349. self.assertEqual(len(res), 2)
  350. def CheckFetchmany(self):
  351. self.cu.execute("select name from test")
  352. res = self.cu.fetchmany(100)
  353. self.assertEqual(len(res), 1)
  354. res = self.cu.fetchmany(100)
  355. self.assertEqual(res, [])
  356. def CheckFetchmanyKwArg(self):
  357. """Checks if fetchmany works with keyword arguments"""
  358. self.cu.execute("select name from test")
  359. res = self.cu.fetchmany(size=100)
  360. self.assertEqual(len(res), 1)
  361. def CheckFetchall(self):
  362. self.cu.execute("select name from test")
  363. res = self.cu.fetchall()
  364. self.assertEqual(len(res), 1)
  365. res = self.cu.fetchall()
  366. self.assertEqual(res, [])
  367. def CheckSetinputsizes(self):
  368. self.cu.setinputsizes([3, 4, 5])
  369. def CheckSetoutputsize(self):
  370. self.cu.setoutputsize(5, 0)
  371. def CheckSetoutputsizeNoColumn(self):
  372. self.cu.setoutputsize(42)
  373. def CheckCursorConnection(self):
  374. # Optional DB-API extension.
  375. self.assertEqual(self.cu.connection, self.cx)
  376. def CheckWrongCursorCallable(self):
  377. with self.assertRaises(TypeError):
  378. def f(): pass
  379. cur = self.cx.cursor(f)
  380. def CheckCursorWrongClass(self):
  381. class Foo: pass
  382. foo = Foo()
  383. with self.assertRaises(TypeError):
  384. cur = sqlite.Cursor(foo)
  385. def CheckLastRowIDOnReplace(self):
  386. """
  387. INSERT OR REPLACE and REPLACE INTO should produce the same behavior.
  388. """
  389. sql = '{} INTO test(id, unique_test) VALUES (?, ?)'
  390. for statement in ('INSERT OR REPLACE', 'REPLACE'):
  391. with self.subTest(statement=statement):
  392. self.cu.execute(sql.format(statement), (1, 'foo'))
  393. self.assertEqual(self.cu.lastrowid, 1)
  394. def CheckLastRowIDOnIgnore(self):
  395. self.cu.execute(
  396. "insert or ignore into test(unique_test) values (?)",
  397. ('test',))
  398. self.assertEqual(self.cu.lastrowid, 2)
  399. self.cu.execute(
  400. "insert or ignore into test(unique_test) values (?)",
  401. ('test',))
  402. self.assertEqual(self.cu.lastrowid, 2)
  403. def CheckLastRowIDInsertOR(self):
  404. results = []
  405. for statement in ('FAIL', 'ABORT', 'ROLLBACK'):
  406. sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)'
  407. with self.subTest(statement='INSERT OR {}'.format(statement)):
  408. self.cu.execute(sql.format(statement), (statement,))
  409. results.append((statement, self.cu.lastrowid))
  410. with self.assertRaises(sqlite.IntegrityError):
  411. self.cu.execute(sql.format(statement), (statement,))
  412. results.append((statement, self.cu.lastrowid))
  413. expected = [
  414. ('FAIL', 2), ('FAIL', 2),
  415. ('ABORT', 3), ('ABORT', 3),
  416. ('ROLLBACK', 4), ('ROLLBACK', 4),
  417. ]
  418. self.assertEqual(results, expected)
  419. class ThreadTests(unittest.TestCase):
  420. def setUp(self):
  421. self.con = sqlite.connect(":memory:")
  422. self.cur = self.con.cursor()
  423. self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")
  424. def tearDown(self):
  425. self.cur.close()
  426. self.con.close()
  427. def CheckConCursor(self):
  428. def run(con, errors):
  429. try:
  430. cur = con.cursor()
  431. errors.append("did not raise ProgrammingError")
  432. return
  433. except sqlite.ProgrammingError:
  434. return
  435. except:
  436. errors.append("raised wrong exception")
  437. errors = []
  438. t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
  439. t.start()
  440. t.join()
  441. if len(errors) > 0:
  442. self.fail("\n".join(errors))
  443. def CheckConCommit(self):
  444. def run(con, errors):
  445. try:
  446. con.commit()
  447. errors.append("did not raise ProgrammingError")
  448. return
  449. except sqlite.ProgrammingError:
  450. return
  451. except:
  452. errors.append("raised wrong exception")
  453. errors = []
  454. t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
  455. t.start()
  456. t.join()
  457. if len(errors) > 0:
  458. self.fail("\n".join(errors))
  459. def CheckConRollback(self):
  460. def run(con, errors):
  461. try:
  462. con.rollback()
  463. errors.append("did not raise ProgrammingError")
  464. return
  465. except sqlite.ProgrammingError:
  466. return
  467. except:
  468. errors.append("raised wrong exception")
  469. errors = []
  470. t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
  471. t.start()
  472. t.join()
  473. if len(errors) > 0:
  474. self.fail("\n".join(errors))
  475. def CheckConClose(self):
  476. def run(con, errors):
  477. try:
  478. con.close()
  479. errors.append("did not raise ProgrammingError")
  480. return
  481. except sqlite.ProgrammingError:
  482. return
  483. except:
  484. errors.append("raised wrong exception")
  485. errors = []
  486. t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
  487. t.start()
  488. t.join()
  489. if len(errors) > 0:
  490. self.fail("\n".join(errors))
  491. def CheckCurImplicitBegin(self):
  492. def run(cur, errors):
  493. try:
  494. cur.execute("insert into test(name) values ('a')")
  495. errors.append("did not raise ProgrammingError")
  496. return
  497. except sqlite.ProgrammingError:
  498. return
  499. except:
  500. errors.append("raised wrong exception")
  501. errors = []
  502. t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
  503. t.start()
  504. t.join()
  505. if len(errors) > 0:
  506. self.fail("\n".join(errors))
  507. def CheckCurClose(self):
  508. def run(cur, errors):
  509. try:
  510. cur.close()
  511. errors.append("did not raise ProgrammingError")
  512. return
  513. except sqlite.ProgrammingError:
  514. return
  515. except:
  516. errors.append("raised wrong exception")
  517. errors = []
  518. t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
  519. t.start()
  520. t.join()
  521. if len(errors) > 0:
  522. self.fail("\n".join(errors))
  523. def CheckCurExecute(self):
  524. def run(cur, errors):
  525. try:
  526. cur.execute("select name from test")
  527. errors.append("did not raise ProgrammingError")
  528. return
  529. except sqlite.ProgrammingError:
  530. return
  531. except:
  532. errors.append("raised wrong exception")
  533. errors = []
  534. self.cur.execute("insert into test(name) values ('a')")
  535. t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
  536. t.start()
  537. t.join()
  538. if len(errors) > 0:
  539. self.fail("\n".join(errors))
  540. def CheckCurIterNext(self):
  541. def run(cur, errors):
  542. try:
  543. row = cur.fetchone()
  544. errors.append("did not raise ProgrammingError")
  545. return
  546. except sqlite.ProgrammingError:
  547. return
  548. except:
  549. errors.append("raised wrong exception")
  550. errors = []
  551. self.cur.execute("insert into test(name) values ('a')")
  552. self.cur.execute("select name from test")
  553. t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
  554. t.start()
  555. t.join()
  556. if len(errors) > 0:
  557. self.fail("\n".join(errors))
  558. class ConstructorTests(unittest.TestCase):
  559. def CheckDate(self):
  560. d = sqlite.Date(2004, 10, 28)
  561. def CheckTime(self):
  562. t = sqlite.Time(12, 39, 35)
  563. def CheckTimestamp(self):
  564. ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
  565. def CheckDateFromTicks(self):
  566. d = sqlite.DateFromTicks(42)
  567. def CheckTimeFromTicks(self):
  568. t = sqlite.TimeFromTicks(42)
  569. def CheckTimestampFromTicks(self):
  570. ts = sqlite.TimestampFromTicks(42)
  571. def CheckBinary(self):
  572. b = sqlite.Binary(b"\0'")
  573. class ExtensionTests(unittest.TestCase):
  574. def CheckScriptStringSql(self):
  575. con = sqlite.connect(":memory:")
  576. cur = con.cursor()
  577. cur.executescript("""
  578. -- bla bla
  579. /* a stupid comment */
  580. create table a(i);
  581. insert into a(i) values (5);
  582. """)
  583. cur.execute("select i from a")
  584. res = cur.fetchone()[0]
  585. self.assertEqual(res, 5)
  586. def CheckScriptSyntaxError(self):
  587. con = sqlite.connect(":memory:")
  588. cur = con.cursor()
  589. with self.assertRaises(sqlite.OperationalError):
  590. cur.executescript("create table test(x); asdf; create table test2(x)")
  591. def CheckScriptErrorNormal(self):
  592. con = sqlite.connect(":memory:")
  593. cur = con.cursor()
  594. with self.assertRaises(sqlite.OperationalError):
  595. cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
  596. def CheckCursorExecutescriptAsBytes(self):
  597. con = sqlite.connect(":memory:")
  598. cur = con.cursor()
  599. with self.assertRaises(ValueError) as cm:
  600. cur.executescript(b"create table test(foo); insert into test(foo) values (5);")
  601. self.assertEqual(str(cm.exception), 'script argument must be unicode.')
  602. def CheckConnectionExecute(self):
  603. con = sqlite.connect(":memory:")
  604. result = con.execute("select 5").fetchone()[0]
  605. self.assertEqual(result, 5, "Basic test of Connection.execute")
  606. def CheckConnectionExecutemany(self):
  607. con = sqlite.connect(":memory:")
  608. con.execute("create table test(foo)")
  609. con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
  610. result = con.execute("select foo from test order by foo").fetchall()
  611. self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
  612. self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")
  613. def CheckConnectionExecutescript(self):
  614. con = sqlite.connect(":memory:")
  615. con.executescript("create table test(foo); insert into test(foo) values (5);")
  616. result = con.execute("select foo from test").fetchone()[0]
  617. self.assertEqual(result, 5, "Basic test of Connection.executescript")
  618. class ClosedConTests(unittest.TestCase):
  619. def CheckClosedConCursor(self):
  620. con = sqlite.connect(":memory:")
  621. con.close()
  622. with self.assertRaises(sqlite.ProgrammingError):
  623. cur = con.cursor()
  624. def CheckClosedConCommit(self):
  625. con = sqlite.connect(":memory:")
  626. con.close()
  627. with self.assertRaises(sqlite.ProgrammingError):
  628. con.commit()
  629. def CheckClosedConRollback(self):
  630. con = sqlite.connect(":memory:")
  631. con.close()
  632. with self.assertRaises(sqlite.ProgrammingError):
  633. con.rollback()
  634. def CheckClosedCurExecute(self):
  635. con = sqlite.connect(":memory:")
  636. cur = con.cursor()
  637. con.close()
  638. with self.assertRaises(sqlite.ProgrammingError):
  639. cur.execute("select 4")
  640. def CheckClosedCreateFunction(self):
  641. con = sqlite.connect(":memory:")
  642. con.close()
  643. def f(x): return 17
  644. with self.assertRaises(sqlite.ProgrammingError):
  645. con.create_function("foo", 1, f)
  646. def CheckClosedCreateAggregate(self):
  647. con = sqlite.connect(":memory:")
  648. con.close()
  649. class Agg:
  650. def __init__(self):
  651. pass
  652. def step(self, x):
  653. pass
  654. def finalize(self):
  655. return 17
  656. with self.assertRaises(sqlite.ProgrammingError):
  657. con.create_aggregate("foo", 1, Agg)
  658. def CheckClosedSetAuthorizer(self):
  659. con = sqlite.connect(":memory:")
  660. con.close()
  661. def authorizer(*args):
  662. return sqlite.DENY
  663. with self.assertRaises(sqlite.ProgrammingError):
  664. con.set_authorizer(authorizer)
  665. def CheckClosedSetProgressCallback(self):
  666. con = sqlite.connect(":memory:")
  667. con.close()
  668. def progress(): pass
  669. with self.assertRaises(sqlite.ProgrammingError):
  670. con.set_progress_handler(progress, 100)
  671. def CheckClosedCall(self):
  672. con = sqlite.connect(":memory:")
  673. con.close()
  674. with self.assertRaises(sqlite.ProgrammingError):
  675. con()
  676. class ClosedCurTests(unittest.TestCase):
  677. def CheckClosed(self):
  678. con = sqlite.connect(":memory:")
  679. cur = con.cursor()
  680. cur.close()
  681. for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
  682. if method_name in ("execute", "executescript"):
  683. params = ("select 4 union select 5",)
  684. elif method_name == "executemany":
  685. params = ("insert into foo(bar) values (?)", [(3,), (4,)])
  686. else:
  687. params = []
  688. with self.assertRaises(sqlite.ProgrammingError):
  689. method = getattr(cur, method_name)
  690. method(*params)
  691. class SqliteOnConflictTests(unittest.TestCase):
  692. """
  693. Tests for SQLite's "insert on conflict" feature.
  694. See https://www.sqlite.org/lang_conflict.html for details.
  695. """
  696. def setUp(self):
  697. self.cx = sqlite.connect(":memory:")
  698. self.cu = self.cx.cursor()
  699. self.cu.execute("""
  700. CREATE TABLE test(
  701. id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE
  702. );
  703. """)
  704. def tearDown(self):
  705. self.cu.close()
  706. self.cx.close()
  707. def CheckOnConflictRollbackWithExplicitTransaction(self):
  708. self.cx.isolation_level = None # autocommit mode
  709. self.cu = self.cx.cursor()
  710. # Start an explicit transaction.
  711. self.cu.execute("BEGIN")
  712. self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
  713. self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
  714. with self.assertRaises(sqlite.IntegrityError):
  715. self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
  716. # Use connection to commit.
  717. self.cx.commit()
  718. self.cu.execute("SELECT name, unique_name from test")
  719. # Transaction should have rolled back and nothing should be in table.
  720. self.assertEqual(self.cu.fetchall(), [])
  721. def CheckOnConflictAbortRaisesWithExplicitTransactions(self):
  722. # Abort cancels the current sql statement but doesn't change anything
  723. # about the current transaction.
  724. self.cx.isolation_level = None # autocommit mode
  725. self.cu = self.cx.cursor()
  726. # Start an explicit transaction.
  727. self.cu.execute("BEGIN")
  728. self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
  729. self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
  730. with self.assertRaises(sqlite.IntegrityError):
  731. self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
  732. self.cx.commit()
  733. self.cu.execute("SELECT name, unique_name FROM test")
  734. # Expect the first two inserts to work, third to do nothing.
  735. self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])
  736. def CheckOnConflictRollbackWithoutTransaction(self):
  737. # Start of implicit transaction
  738. self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
  739. self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
  740. with self.assertRaises(sqlite.IntegrityError):
  741. self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
  742. self.cu.execute("SELECT name, unique_name FROM test")
  743. # Implicit transaction is rolled back on error.
  744. self.assertEqual(self.cu.fetchall(), [])
  745. def CheckOnConflictAbortRaisesWithoutTransactions(self):
  746. # Abort cancels the current sql statement but doesn't change anything
  747. # about the current transaction.
  748. self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
  749. self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
  750. with self.assertRaises(sqlite.IntegrityError):
  751. self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
  752. # Make sure all other values were inserted.
  753. self.cu.execute("SELECT name, unique_name FROM test")
  754. self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])
  755. def CheckOnConflictFail(self):
  756. self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
  757. with self.assertRaises(sqlite.IntegrityError):
  758. self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
  759. self.assertEqual(self.cu.fetchall(), [])
  760. def CheckOnConflictIgnore(self):
  761. self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
  762. # Nothing should happen.
  763. self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
  764. self.cu.execute("SELECT unique_name FROM test")
  765. self.assertEqual(self.cu.fetchall(), [('foo',)])
  766. def CheckOnConflictReplace(self):
  767. self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')")
  768. # There shouldn't be an IntegrityError exception.
  769. self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')")
  770. self.cu.execute("SELECT name, unique_name FROM test")
  771. self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])
  772. def suite():
  773. module_suite = unittest.makeSuite(ModuleTests, "Check")
  774. connection_suite = unittest.makeSuite(ConnectionTests, "Check")
  775. cursor_suite = unittest.makeSuite(CursorTests, "Check")
  776. thread_suite = unittest.makeSuite(ThreadTests, "Check")
  777. constructor_suite = unittest.makeSuite(ConstructorTests, "Check")
  778. ext_suite = unittest.makeSuite(ExtensionTests, "Check")
  779. closed_con_suite = unittest.makeSuite(ClosedConTests, "Check")
  780. closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check")
  781. on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check")
  782. return unittest.TestSuite((
  783. module_suite, connection_suite, cursor_suite, thread_suite,
  784. constructor_suite, ext_suite, closed_con_suite, closed_cur_suite,
  785. on_conflict_suite,
  786. ))
  787. def test():
  788. runner = unittest.TextTestRunner()
  789. runner.run(suite())
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test()