我正在使用sqlAlchemy 1.0.0,并希望批量生成一些UPDATE(更新,如果匹配主键,则不执行任何操作)查询.
我做了一些实验,发现批量更新看起来比批量插入或批量upsert慢得多.
你能不能帮助我指出为什么它的工作速度如此之慢,或者有没有其他方法/想法用sqlAlchemy进行BULK UPDATE(不是BULK UPSERT)?
下面是MysqL中的表:
CREATE TABLE `test` (
`id` int(11) unsigned NOT NULL,`value` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
和测试代码:
from sqlalchemy import create_engine,text
import time
driver = 'MysqL'
host = 'host'
user = 'user'
password = 'password'
database = 'database'
url = "{}://{}:{}@{}/{}?charset=utf8".format(driver,user,password,host,database)
engine = create_engine(url)
engine.connect()
engine.execute('TRUNCATE TABLE test')
num_of_rows = 1000
rows = []
for i in xrange(0,num_of_rows):
rows.append({'id': i,'value': i})
print '--------- test insert --------------'
sql = '''
INSERT INTO test (id,value)
VALUES (:id,:value)
'''
start = time.time()
engine.execute(text(sql),rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)
print '--------- test upsert --------------'
for r in rows:
r['value'] = r['id'] + 1
sql = '''
INSERT INTO test (id,:value)
ON DUPLICATE KEY UPDATE value = VALUES(value)
'''
start = time.time()
engine.execute(text(sql),rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)
print '--------- test update --------------'
for r in rows:
r['value'] = r['id'] * 10
sql = '''
UPDATE test
SET value = :value
WHERE id = :id
'''
start = time.time()
engine.execute(text(sql),rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)
num_of_rows = 100时的输出:
--------- test insert --------------
Cost 0.568960905075 seconds
--------- test upsert --------------
Cost 0.569655895233 seconds
--------- test update --------------
Cost 20.0891299248 seconds
num_of_rows = 1000时的输出:
--------- test insert --------------
Cost 0.807548999786 seconds
--------- test upsert --------------
Cost 0.584554195404 seconds
--------- test update --------------
Cost 206.199367046 seconds
数据库服务器的网络延迟大约为500毫秒.
看起来像批量更新它一个接一个地发送和执行每个查询,而不是批处理?
提前致谢.
最佳答案
即使数据库服务器(如您的情况)具有非常糟糕的延迟,您也可以通过技巧加快批量更新操作.您可以使用stage-table非常快速地插入新数据,然后对目标表执行一次join-update,而不是直接更新表.这样做的另一个好处是可以大大减少必须发送到数据库的语句数量.
这如何与UPDATE一起使用?
假设您有一个表条目,并且您始终有新数据,但您只想更新已存储的数据.您创建目标表entries_stage的副本,其中只包含相关字段:
entries = Table('entries',Metadata,Column('id',Integer,autoincrement=True,primary_key=True),Column('value',Unicode(64),nullable=False),)
entries_stage = Table('entries_stage',autoincrement=False,unique=True),)
然后使用批量插入插入数据.如果您使用MysqL的多值插入语法(sqlAlchemy本身不支持,但可以毫无困难地构建),这可以进一步加速.
INSERT INTO enries_stage (`id`,`value`)
VALUES
(1,'string1'),(2,'string2'),(3,'string3'),...;
最后,使用stage-table中的值更新destination-table的值,如下所示:
UPDATE entries e
JOIN entries_stage es ON e.id = es.id
SET e.value = es.value;
然后你就完成了.
插入怎么样?
当然,这也可以加速插入.由于您已经在stage-table中拥有了数据,所以您需要做的就是发出INSERT INTO … SELECT语句,其中的数据还没有在destination-table中.
INSERT INTO entries (id,value)
SELECT FROM entries_stage es
LEFT JOIN entries e ON e.id = es.id
HAVING e.id IS NULL;
关于这一点的好处是你不必执行INSERT IGNORE,REPLACE或ON DUPLICATE KEY UPDATE,这将增加你的主键,即使它们什么也不做.