settings文件如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# Database connections available to the project, keyed by alias.
# NOTE: credentials are hard-coded here for the example only; load them from
# the environment or a secrets store in a real deployment.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
    },
    # Alias of the second database node.
    'db1': {
        'ENGINE': 'django.db.backends.oracle',
        'NAME': 'devdb',
        'USER': 'hysh',
        'PASSWORD': 'hysh',
        'HOST': '192.168.191.3',
        'PORT': '1521',
    },
}
查找Django的文档:
1
2
3
4
5
6
7
8
9
|
from django.db import connection


def my_custom_sql(self):
    """Run an UPDATE and a SELECT on table ``bar`` filtered by ``self.baz``.

    Both statements go through ``django.db.connection`` and receive
    ``self.baz`` as a query parameter.

    Returns:
        The result of ``fetchone()`` for the SELECT statement.
    """
    with connection.cursor() as cur:
        cur.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
        cur.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
        return cur.fetchone()
上述写法在 settings 中配置了多个数据库时,默认使用 default 数据库;如果想使用指定的数据库连接,需要引入的对象就变成了 connections,并通过数据库别名(例如 connections['db1'])取得对应的连接:
1
2
3
4
5
6
7
8
9
|
from django.db import connections


def my_custom_sql(self):
    """Run an UPDATE and a SELECT on table ``bar`` against the ``db1`` database.

    Unlike ``django.db.connection`` (which always targets the ``default``
    database), ``django.db.connections`` is indexed by the alias configured
    in ``settings.DATABASES``, so the statements run on the chosen database.

    Returns:
        The result of ``fetchone()`` for the SELECT statement.
    """
    # 'db1' is the alias of the second database in settings.DATABASES.
    with connections['db1'].cursor() as cursor:
        cursor.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
        cursor.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
        row = cursor.fetchone()
    return row
之后再进行操作。
补充知识:Django多数据源接入类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
from rest_framework import status
from django.db import transaction
from .contants import db_dict
# Content of contants.py:
#
#     import cx_Oracle
#     import pymysql
#
#     # Dictionary mapping a database type name to its driver module.
#     db_dict = {'mysql': pymysql, 'Oracle': cx_Oracle}
from .models import DataSystem, Rule


class DBconnectView(GenericAPIView):
    """Connect to an external data source and run a rule's check statement.

    The data source (URL, credentials, database name and type) is read from
    a ``DataSystem`` row, and the statement to execute is the ``check_code``
    of a ``Rule`` row.
    """

    # Maps the data source's type name to the driver module used to connect.
    __DBtype = db_dict

    def get(self, request, pk, rule_id):
        """Execute the rule's ``check_code`` on the selected data source.

        Args:
            request: The incoming request (not otherwise used here).
            pk: Primary key of the ``DataSystem`` describing the connection.
            rule_id: ``id`` of the ``Rule`` whose ``check_code`` is executed.

        Returns:
            A ``Response`` echoing ``pk`` and ``rule_id``.

        Raises:
            Any exception raised by the model lookups, by the driver while
            connecting, or while executing/fetching is propagated unchanged.
        """
        # Look up the connection details for the requested data source.
        self.datas = DataSystem.objects.get(pk=pk)
        self.url = self.datas.url
        self.username = self.datas.username
        self.password = self.datas.password_enc
        self.DBname = self.datas.name
        self.DBtype = self.__DBtype[self.datas.type]

        # Load the check_code statement of the requested rule.
        self.ruledatas = Rule.objects.get(id=rule_id)
        self.check_code = self.ruledatas.check_code

        # NOTE(review): cx_Oracle.connect() is documented with
        # user/password/dsn arguments; confirm that the host=/database=
        # keywords used here are accepted on the 'Oracle' path.
        conn = self.DBtype.connect(
            host=self.url,
            user=self.username,
            password=self.password,
            database=self.DBname,
        )
        try:
            # Once connected, create a cursor on the external connection.
            cs_ms = conn.cursor()
            try:
                cs_ms.execute(self.check_code)
                # Fetch from the cursor itself: the return value of
                # execute() is driver-specific (PyMySQL returns a row count),
                # so it cannot be relied on to expose fetchone().
                # The row is fetched but is not part of the response.
                db_set = cs_ms.fetchone()
            except Exception:
                # Django's transaction.atomic()/savepoints act on Django's
                # own database connection, not on this driver connection,
                # so the rollback has to be issued on ``conn`` directly.
                conn.rollback()
                raise
            finally:
                cs_ms.close()
        finally:
            # Always release the connection, even if cursor creation failed.
            conn.close()

        return Response({'pk': pk, 'rule_id': rule_id})