python
主页 > 脚本 > python >

django链接多个数据库,并使用原生sql实现

2020-03-28 | 秩名 | 点击:

settings文件如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
DATABASES = {
  'default': {
    'ENGINE': 'django.db.backends.sqlite3',
    'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
  },
  'db1': {  # 配置第二个数据库节点名称
    'ENGINE': 'django.db.backends.oracle',
    'NAME': 'devdb',
    'USER': 'hysh',
    'PASSWORD': 'hysh',
    'HOST': '192.168.191.3',
    'PORT': '1521',
  },
}
 

查找Django的文档:

?
1
2
3
4
5
6
7
8
9
from django.db import connection
 
def my_custom_sql(self):
  with connection.cursor() as cursor:
    cursor.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
    cursor.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
    row = cursor.fetchone()
 
  return row
 

上述方法是设置中如果有多个数据库,会默认使用 default,当你想使用指定的数据库连接时,引入的对象就变成了connections !

?
1
2
3
4
5
6
7
8
9
from django.db import connection
 
def my_custom_sql(self):
  with connection.cursor() as cursor:
    cursor.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
    cursor.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
    row = cursor.fetchone()
 
  return row
 

之后再进行操作。

补充知识:Django多数据源接入类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
from rest_framework import status
from django.db import transaction
 
 
from .contants import db_dict
 
contants.py的内容
 
 
import cx_Oracle
import pymysql
 
# 定义一个数据库类型&引擎的字典,
db_dict = {'mysql':pymysql,'Oracle':cx_Oracle}
 
from .models import DataSystem,Rule
 
 
class DBconnectView(GenericAPIView):
  __DBtype = db_dict
  def get(self,request,pk,rule_id):
 
    # 通过传入的id进行对应的数据库链接
    self.datas = DataSystem.objects.get(pk=pk)
    self.url = self.datas.url
    self.username = self.datas.username
    self.password = self.datas.password_enc
    self.DBname = self.datas.name
    self.DBtype = self.__DBtype[self.datas.type]
 
    # 获取check_code规则
    self.ruledatas = Rule.objects.get(id=rule_id)
    self.check_code = self.ruledatas.check_code
    # db = __import__(self.DBtype)
 
    try:
      conn = self.DBtype.connect(host=self.url,user=self.username,password=self.password,database=self.DBname)
      # 链接成功后创建一个游标
      cs_ms = conn.cursor()
    except Exception as e:
      raise e
    else:
      # 明显的开启事务
      with transaction.atomic():
        # 在安全的地方,创建保存点,将来操作数据库失败回滚到此
        save_id = transaction.savepoint()
 
        try:
 
          # 获取一个元组
          db_ret = cs_ms.execute(self.check_code)
        except Exception as e:
          transaction.savepoint_rollback(save_id)
          raise e
        else:
          db_set = db_ret.fetchone()
          # transaction.savepoint_commit(save_id)
    finally:
      cs_ms.close()
      conn.close()
 
    return Response({'pk':pk,'rule_id':rule_id})

原文链接:https://blog.csdn.net/qq_37049050/article/details/85131514
相关文章
最新更新