提问人:Miguel Formigo 提问时间:11/14/2023 更新时间:11/14/2023 访问量:17
nodejs / sequelize mysql :: 并发连接保持睡眠模式并锁定表
nodejs / sequelize mysql :: concurrent connections stay in sleep mode and lock table
问:
我有几年的编程经验,但我对 nodejs 及其与数据库的连接仍然很陌生。也就是说,我正在开发一个项目,该项目实现了 nodejs 和 sequelize 以连接到 mysql 数据库,并且我一直在根据已经实现且显然正常运行的其他代码对它们进行开发。
但是我遇到了一个我无法解决的问题,尽管我尽了最大努力。
我会尽力说出来,我提前为这么长的帖子道歉:
该项目是一个管理平台,每个客户端都有自己的数据库(在结构上都相似),因此主要配置的连接是针对记录的客户端数据库。但是,对于某些过程,信息在公司自己的数据库中注册,该数据库具有不同的访问凭据。
所有凭据都加密存储在 .env 文件中,该文件由后端读取、解密并用于在 nodejs 启动时初始化两个连接。当请求来自前端时,它会经过 Controller -> Service -> Repository,后者将连接实例初始化到数据库并执行目标查询。
问题是,与客户端自己的数据库的所有连接似乎都正常工作,在查询执行时开始和结束。对公司数据库的 after查询(仅对现有记录的日期列执行 UPDATE操作)在睡眠模式下保持挂起状态,这不会发生。
我一直在阅读续集文档,但找不到任何有用的东西。
从我从文档中收集到的信息来看,续集连接、实例等都已正确设置。
特定的客户端数据库加载程序文件(公司的加载程序文件
与不同的凭据相似)
export class DbClientLoader {
private static _instance: DbClientLoader;
public database;
private constructor() {
this.initDb();
}
/**
* Initialize the connection pool to the db
*/
private initDb() {
const { Sequelize } = require('sequelize');
this.database = new Sequelize(
process.env.BD_CLIENT_SCHEMA,
process.env.BD_CLIENT_USER,
process.env.BD_CLIENT_PASSWORD,
{
host: process.env.BD_CLIENT_HOST,
dialect: 'mysql',
pool: {
max: +process.env.BD_MAX_CONNECTIONS, // connection pool size
min: 0,
acquire: 30000,
idle: 10000,
},
timezone: 'SYSTEM',
logging: false,
dialectOptions: {
timezone: 'local',
decimalNumbers: true,
dateStrings: true,
typeCast: function (field, next) {
// for reading from database
if (field.type === 'DATETIME') {
return field.string();
}
return next();
},
},
}
);
}
/**
* Singleton loader
*/
public static get Instance() {
return this._instance || (this._instance = new this());
}
}
export const DbClientLoaderInst = DbClientLoader.Instance;
数据库加载器(在nodejs服务器启动时调用)(
所有尝试显然都已成功运行)
export class DbLoader {
public static async loadAll() {
// Sets connection with the client database
const DbClientLoader = require('./DbClientLoader');
const dbClient = DbClientLoader.DbClientLoaderInst.database;
try {
await dbClient.authenticate();
console.log('Connection established with the client database.');
} catch (error) {
console.error('(!) ERROR: Connection with the client database failed:');
console.error('> ', error);
}
// Sets connection with the company database
const DbCompanyLoader = require('./DbCompanyLoader');
const dbCompany = DbCompanyLoader.DbCompanyLoaderInst.database;
try {
await dbCompany.authenticate();
console.log('Connection established with the company database.');
} catch (error) {
console.error('(!) ERROR: Connection with the company database failed:');
console.error('> ', error);
}
}
}
数据库 ids 文件
(只是 id,而不是实际的架构名称,这些包含在凭据中)
export enum DB_ID {
CLIENT = 'client',
COMPANY = 'company',
}
数据库实用程序文件
import ...
import { DbClientLoader } from '../loaders/DbClientLoader';
import { DbCompanyLoader } from '../loaders/DbCompanyLoader';
export class dbUtils {
/**
* Gets the connection pool for the client database
*/
static getClientDB(): Sequelize {
return DbClientLoader.Instance.database;
}
/**
* Gets the connection pool for the company database
*/
static getCompanyDB(): Sequelize {
return DbCompanyLoader.Instance.database;
}
}
通用存储库文件
import cls from 'continuation-local-storage';
import { Transaction } from 'sequelize';
import { dbUtils } from '../dbUtils';
import { DB_TYPE } from './DB_ID.enum';
import { Base_Repository_Interface } from './Base_Repository.interface';
/**
* Generic implementation of a repository
*/
export class Base_Repository implements Base_Repository_Interface {
protected DBID: DB_ID;
constructor(dbid: DB_ID) {
this.DBID = dbid;
}
async getTransaction(): Promise<Transaction> {
const session = cls.getNamespace('managersoft');
let transation;
switch (this.DBID) {
case DB_ID.CLIENT:
transation = session.get('transaction_client');
if (!transation) {
transation = await this.getConnection().transaction();
session.set('transaction_client', transation);
}
break;
case DB_ID.COMPANY:
transation = session.get('transaction_company');
if (!transation) {
transation = await this.getConnection().transaction();
session.set('transaction_company', transation);
}
break;
}
return transation as Transaction;
}
public getConnection() {
switch (this.DBID) {
case DB_ID.CLIENT:
return dbUtils.getClientDB();
break;
case DB_ID.COMPANY:
return dbUtils.getCompanyDB();
break;
}
}
}
简化的运行脚本示例:
User_Service
(访问User_Repository,呼叫Company_Service)
import ...
// custom services
import { DIUser_Service, User_Service_Interface } from './User_Service.interface';
import { DICompany_Service, Company_Service_Interface } from './Company_Service.interface';
// custom repositories
import { DIUser_Repository, User_Repository_Interface } from '../repositories/User_Repository.interface';
@provide(DIUser_Service)
export class User_Service implements User_Service_Interface {
constructor(
// custom services
@inject(DICompany_Service) private companyService: Company_Service_Interface,
// custom repositories
@inject(DIUser_Repository) private userRepository: User_Repository_Interface,
) {}
public async getUserData(params) {
...
}
public async setUserData(params) {
const upsert = await this.userRepository.set({
id: params.userid,
name: params.name,
nickname: params.nickname,
birthdate: params.birthdate,
});
if (upsert.inserted > 0 || upsert.updated > 0) {
this.companyService.update({
what: 'userstats',
id: params.userid,
});
}
return upsert;
}
}
User_Repository (User_Service访问)
import ...
import { DB_ID } from './DB_ID.enum';
import { Base_Repository } from './Base_Repository';
import { User_Repository_Interface, DIUser_Repository } from './User_Repository.interface';
@provide(DIUser_Repository)
export class User_Repository extends Base_Repository implements User_Repository_Interface {
constructor() {
super(DB_ID.CLIENT);
}
async get(params) {
...
}
async set(params) {
/*
QUERIES
*/
const sql = {
users: {
INSERT: `INSERT INTO users (__FIELDS__)
VALUES (__PLACEHOLDERS__)`,
UPDATE: `UPDATE users
SET __PLACEHOLDERS__
WHERE id = :id`,
},
...
params: {
type: params.id > 0 ? 'UPDATE' : 'INSERT',
transaction: await this.getTransaction(),
replacements: {},
},
ready: '',
}
/*
SETUP READY QUERY AND PARAMS
*/
switch (sql.params.type) {
case 'INSERT':
...
break;
case 'UPDATE':
// prepare placeholders and params
let __PLACEHOLDERS__ = '';
...
sql.ready = sql.users.UPDATE.replace('__PLACEHOLDERS__', __PLACEHOLDERS__);
break;
}
/*
EXECUTE QUERY
*/
return await super.getConnection()
.query(sql.ready, sql.params)
.then((data) => {
// data<array>
// [0] => last id (on INSERT)
// [1] => affected rows
return { id: data[0], upserted: data[1] }
})
.catch((err) => {
console.error(err);
});
}
}
Company_Service (访问Company_Repository,由User_Service调用)
import ...
// custom services
import { DICompany_Service, Company_Service_Interface } from './Company_Service.interface';
// custom repositories
import { Company_Repository_Interface, DICompany_Repository } from '../repositories/Company_Repository.interface';
@provide(DICompany_Service)
export class Company_Service implements Company_Service_Interface {
constructor(
@inject(DICompany_Repository) private companyRepository: Company_Repository_Interface,
) {}
public async update(params) {
const upsert = await this.companyRepository.set(params);
return upsert;
}
}
Company_Repository (由Company_Service访问)
import ...
import { DB_ID } from './DB_ID.enum';
import { Base_Repository } from './Base_Repository';
import { Company_Repository_Interface, DICompany_Repository } from './Company_Repository.interface';
@provide(DICompany_Repository)
export class Company_Repository extends Base_Repository implements Company_Repository_Interface {
constructor() {
super(DB_ID.COMPANY);
}
async get(params) {
...
}
async set(params) {
/*
QUERIES
*/
const sql = {
userstats: {
INSERT: `INSERT INTO userstats (id, created, lastupdate)
VALUES (id, NOW(), NULL)`,
UPDATE: `UPDATE userstats
SET lastupdate = NOW()
WHERE id = :id`,
},
...
params: {
type: params.id > 0 ? 'UPDATE' : 'INSERT',
transaction: await this.getTransaction(),
replacements: {},
},
ready: '',
}
/*
SETUP READY QUERY AND PARAMS
*/
sql.ready = sql.users[sql.params.type];
sql.params.replacements.id = params.id;
/*
EXECUTE QUERY
*/
return await super.getConnection()
.query(sql.ready, sql.params)
.then((data) => {
// data<array>
// [0] => last id (on INSERT)
// [1] => affected rows
return { id: data[0], upserted: data[1] }
})
.catch((err) => {
console.error(err);
});
}
}
尝试了上述解决方案,后来重新设计了脚本以进一步分离脚本,从User_Service中删除对Company_Service的调用,独立设置它并在用户更新响应成功后从前端调用它。
不幸的是,结果是一样的:数据库表在连接的睡眠状态下被锁定。
我只希望 UPDATE 查询在成功完成事务的情况下执行,而不会使表被锁定,也不会使连接在睡眠模式下保持空闲状态。
对于任何走到这一步的人,提前感谢您抽出时间。
任何帮助将不胜感激。
答: 暂无答案
评论