transactions.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. #-*- coding: iso-8859-1 -*-
  2. # pysqlite2/test/transactions.py: tests transactions
  3. #
  4. # Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
  5. #
  6. # This file is part of pysqlite.
  7. #
  8. # This software is provided 'as-is', without any express or implied
  9. # warranty. In no event will the authors be held liable for any damages
  10. # arising from the use of this software.
  11. #
  12. # Permission is granted to anyone to use this software for any purpose,
  13. # including commercial applications, and to alter it and redistribute it
  14. # freely, subject to the following restrictions:
  15. #
  16. # 1. The origin of this software must not be misrepresented; you must not
  17. # claim that you wrote the original software. If you use this software
  18. # in a product, an acknowledgment in the product documentation would be
  19. # appreciated but is not required.
  20. # 2. Altered source versions must be plainly marked as such, and must not be
  21. # misrepresented as being the original software.
  22. # 3. This notice may not be removed or altered from any source distribution.
  23. import os, unittest
  24. import sqlite3 as sqlite
  25. def get_db_path():
  26. return "sqlite_testdb"
  27. class TransactionTests(unittest.TestCase):
  28. def setUp(self):
  29. try:
  30. os.remove(get_db_path())
  31. except OSError:
  32. pass
  33. self.con1 = sqlite.connect(get_db_path(), timeout=0.1)
  34. self.cur1 = self.con1.cursor()
  35. self.con2 = sqlite.connect(get_db_path(), timeout=0.1)
  36. self.cur2 = self.con2.cursor()
  37. def tearDown(self):
  38. self.cur1.close()
  39. self.con1.close()
  40. self.cur2.close()
  41. self.con2.close()
  42. try:
  43. os.unlink(get_db_path())
  44. except OSError:
  45. pass
  46. def CheckDMLDoesNotAutoCommitBefore(self):
  47. self.cur1.execute("create table test(i)")
  48. self.cur1.execute("insert into test(i) values (5)")
  49. self.cur1.execute("create table test2(j)")
  50. self.cur2.execute("select i from test")
  51. res = self.cur2.fetchall()
  52. self.assertEqual(len(res), 0)
  53. def CheckInsertStartsTransaction(self):
  54. self.cur1.execute("create table test(i)")
  55. self.cur1.execute("insert into test(i) values (5)")
  56. self.cur2.execute("select i from test")
  57. res = self.cur2.fetchall()
  58. self.assertEqual(len(res), 0)
  59. def CheckUpdateStartsTransaction(self):
  60. self.cur1.execute("create table test(i)")
  61. self.cur1.execute("insert into test(i) values (5)")
  62. self.con1.commit()
  63. self.cur1.execute("update test set i=6")
  64. self.cur2.execute("select i from test")
  65. res = self.cur2.fetchone()[0]
  66. self.assertEqual(res, 5)
  67. def CheckDeleteStartsTransaction(self):
  68. self.cur1.execute("create table test(i)")
  69. self.cur1.execute("insert into test(i) values (5)")
  70. self.con1.commit()
  71. self.cur1.execute("delete from test")
  72. self.cur2.execute("select i from test")
  73. res = self.cur2.fetchall()
  74. self.assertEqual(len(res), 1)
  75. def CheckReplaceStartsTransaction(self):
  76. self.cur1.execute("create table test(i)")
  77. self.cur1.execute("insert into test(i) values (5)")
  78. self.con1.commit()
  79. self.cur1.execute("replace into test(i) values (6)")
  80. self.cur2.execute("select i from test")
  81. res = self.cur2.fetchall()
  82. self.assertEqual(len(res), 1)
  83. self.assertEqual(res[0][0], 5)
  84. def CheckToggleAutoCommit(self):
  85. self.cur1.execute("create table test(i)")
  86. self.cur1.execute("insert into test(i) values (5)")
  87. self.con1.isolation_level = None
  88. self.assertEqual(self.con1.isolation_level, None)
  89. self.cur2.execute("select i from test")
  90. res = self.cur2.fetchall()
  91. self.assertEqual(len(res), 1)
  92. self.con1.isolation_level = "DEFERRED"
  93. self.assertEqual(self.con1.isolation_level , "DEFERRED")
  94. self.cur1.execute("insert into test(i) values (5)")
  95. self.cur2.execute("select i from test")
  96. res = self.cur2.fetchall()
  97. self.assertEqual(len(res), 1)
  98. @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2),
  99. 'test hangs on sqlite versions older than 3.2.2')
  100. def CheckRaiseTimeout(self):
  101. self.cur1.execute("create table test(i)")
  102. self.cur1.execute("insert into test(i) values (5)")
  103. with self.assertRaises(sqlite.OperationalError):
  104. self.cur2.execute("insert into test(i) values (5)")
  105. @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2),
  106. 'test hangs on sqlite versions older than 3.2.2')
  107. def CheckLocking(self):
  108. """
  109. This tests the improved concurrency with pysqlite 2.3.4. You needed
  110. to roll back con2 before you could commit con1.
  111. """
  112. self.cur1.execute("create table test(i)")
  113. self.cur1.execute("insert into test(i) values (5)")
  114. with self.assertRaises(sqlite.OperationalError):
  115. self.cur2.execute("insert into test(i) values (5)")
  116. # NO self.con2.rollback() HERE!!!
  117. self.con1.commit()
  118. def CheckRollbackCursorConsistency(self):
  119. """
  120. Checks if cursors on the connection are set into a "reset" state
  121. when a rollback is done on the connection.
  122. """
  123. con = sqlite.connect(":memory:")
  124. cur = con.cursor()
  125. cur.execute("create table test(x)")
  126. cur.execute("insert into test(x) values (5)")
  127. cur.execute("select 1 union select 2 union select 3")
  128. con.rollback()
  129. with self.assertRaises(sqlite.InterfaceError):
  130. cur.fetchall()
  131. class SpecialCommandTests(unittest.TestCase):
  132. def setUp(self):
  133. self.con = sqlite.connect(":memory:")
  134. self.cur = self.con.cursor()
  135. def CheckDropTable(self):
  136. self.cur.execute("create table test(i)")
  137. self.cur.execute("insert into test(i) values (5)")
  138. self.cur.execute("drop table test")
  139. def CheckPragma(self):
  140. self.cur.execute("create table test(i)")
  141. self.cur.execute("insert into test(i) values (5)")
  142. self.cur.execute("pragma count_changes=1")
  143. def tearDown(self):
  144. self.cur.close()
  145. self.con.close()
  146. class TransactionalDDL(unittest.TestCase):
  147. def setUp(self):
  148. self.con = sqlite.connect(":memory:")
  149. def CheckDdlDoesNotAutostartTransaction(self):
  150. # For backwards compatibility reasons, DDL statements should not
  151. # implicitly start a transaction.
  152. self.con.execute("create table test(i)")
  153. self.con.rollback()
  154. result = self.con.execute("select * from test").fetchall()
  155. self.assertEqual(result, [])
  156. def CheckImmediateTransactionalDDL(self):
  157. # You can achieve transactional DDL by issuing a BEGIN
  158. # statement manually.
  159. self.con.execute("begin immediate")
  160. self.con.execute("create table test(i)")
  161. self.con.rollback()
  162. with self.assertRaises(sqlite.OperationalError):
  163. self.con.execute("select * from test")
  164. def CheckTransactionalDDL(self):
  165. # You can achieve transactional DDL by issuing a BEGIN
  166. # statement manually.
  167. self.con.execute("begin")
  168. self.con.execute("create table test(i)")
  169. self.con.rollback()
  170. with self.assertRaises(sqlite.OperationalError):
  171. self.con.execute("select * from test")
  172. def tearDown(self):
  173. self.con.close()
  174. def suite():
  175. default_suite = unittest.makeSuite(TransactionTests, "Check")
  176. special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check")
  177. ddl_suite = unittest.makeSuite(TransactionalDDL, "Check")
  178. return unittest.TestSuite((default_suite, special_command_suite, ddl_suite))
  179. def test():
  180. runner = unittest.TextTestRunner()
  181. runner.run(suite())
  182. if __name__ == "__main__":
  183. test()