使用 Ruby 在 Azure Cosmos DB for PostgreSQL 上连接和运行 SQL 命令
适用对象: Azure Cosmos DB for PostgreSQL(由 PostgreSQL 的 Citus 数据库扩展提供支持)
本快速入门演示如何使用 Ruby 代码连接到群集以及如何使用 SQL 语句创建表。 然后演示如何在数据库中插入、查询、更新和删除数据。 本文中的步骤假定你熟悉 Ruby 开发,但不熟悉 Azure Cosmos DB for PostgreSQL 的使用。
安装 PostgreSQL 库
本文中的代码示例需要 pg gem。 你需要使用语言包管理器(例如 bundler)安装 pg。
进行连接,创建表,然后插入数据
使用以下代码进行连接,使用 CREATE TABLE SQL 语句创建表,然后使用 INSERT INTO SQL 语句将行添加到表中。 该代码使用 PG::Connection
对象和构造函数来连接到 Azure Cosmos DB for PostgreSQL。 然后调用 exec()
方法,以便运行 DROP、CREATE TABLE 和 INSERT INTO 命令。 代码使用 PG::Error
类来检查是否存在错误。 然后,它会调用方法 close()
,在终止之前关闭连接。
在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码或 Microsoft Entra ID 令牌。
require 'pg'
begin
# NOTE: Replace <cluster> and <password> in the connection string.
connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
puts 'Successfully created connection to database'
# Drop previous table of same name if one exists
connection.exec('DROP TABLE IF EXISTS pharmacy;')
puts 'Finished dropping table (if existed).'
# Drop previous table of same name if one exists.
connection.exec('CREATE TABLE pharmacy (pharmacy_id integer ,pharmacy_name text,city text,state text,zip_code integer);')
puts 'Finished creating table.'
# Insert some data into table.
connection.exec("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);")
connection.exec("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);")
puts 'Inserted 2 rows of data.'
# Create index
connection.exec("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
rescue PG::Error => e
puts e.message
ensure
connection.close if connection
end
分发表
Azure Cosmos DB for PostgreSQL 可为你提供跨多个节点分发表的强大功能,以实现可伸缩性。 可以使用以下命令来分配表。 可以在此处详细了解 create_distributed_table
和分布列。
注意
通过分发表,它们可在添加到群集的任何工作器节点之间增长。
使用以下代码连接到数据库并分发表。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。
require 'pg'
begin
# NOTE: Replace <cluster> and <password> in the connection string.
connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
puts 'Successfully created connection to database.'
# Super power of distributed tables.
connection.exec("select create_distributed_table('pharmacy','pharmacy_id');")
rescue PG::Error => e
puts e.message
ensure
connection.close if connection
end
读取数据
使用以下代码进行连接,并使用 SELECT SQL 语句来读取数据。
该代码调用 exec()
方法来运行 SELECT 命令,并将结果保存在结果集中。 结果集集合使用 resultSet.each
do 循环进行迭代,将当前行值保存在行变量中。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。
require 'pg'
begin
# NOTE: Replace <cluster> and <password> in the connection string.
connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
puts 'Successfully created connection to database.'
resultSet = connection.exec('SELECT * from pharmacy')
resultSet.each do |row|
puts 'Data row = (%s, %s, %s, %s, %s)' % [row['pharmacy_id'], row['pharmacy_name'], row['city'], row['state'], row['zip_code ']]
end
rescue PG::Error => e
puts e.message
ensure
connection.close if connection
end
更新数据
使用以下代码进行连接,并使用 UPDATE SQL 语句更新数据。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。
require 'pg'
begin
# NOTE: Replace <cluster> and <password> in the connection string.
connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
puts 'Successfully created connection to database.'
# Modify some data in table.
connection.exec('UPDATE pharmacy SET city = %s WHERE pharmacy_id = %d;' % ['\'guntur\'',100])
puts 'Updated 1 row of data.'
rescue PG::Error => e
puts e.message
ensure
connection.close if connection
end
删除数据
使用以下代码进行连接,并使用 DELETE SQL 语句删除数据。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。
require 'pg'
begin
# NOTE: Replace <cluster> and <password> in the connection string.
connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
puts 'Successfully created connection to database.'
# Delete some data in table.
connection.exec('DELETE FROM pharmacy WHERE city = %s;' % ['\'guntur\''])
puts 'Deleted 1 row of data.'
rescue PG::Error => e
puts e.message
ensure
connection.close if connection
end
用于超快速引入的 COPY 命令
在将数据引入 Azure Cosmos DB for PostgreSQL 时,COPY 命令可能会产生巨大的吞吐量。 COPY 命令可以引入文件中的数据,也可以使用内存中的微批数据进行实时引入。
用于从文件加载数据的 COPY 命令
以下代码将数据从 CSV 文件复制到数据库表。 它要求使用 pharmacies.csv 文件。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。
require 'pg'
begin
filename = String('pharmacies.csv')
# NOTE: Replace <cluster> and <password> in the connection string.
connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
puts 'Successfully created connection to database.'
# Copy the data from Csv to table.
result = connection.copy_data "COPY pharmacy FROM STDIN with csv" do
File.open(filename , 'r').each do |line|
connection.put_copy_data line
end
puts 'Copied csv data successfully.'
end
rescue PG::Error => e
puts e.message
ensure
connection.close if connection
end
用于加载内存中数据的 COPY 命令
以下代码将内存中数据复制到表。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。
require 'pg'
begin
# NOTE: Replace <cluster> and <password> in the connection string.
connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
puts 'Successfully created connection to database.'
enco = PG::TextEncoder::CopyRow.new
connection.copy_data "COPY pharmacy FROM STDIN", enco do
connection.put_copy_data [5000,'Target','Sunnyvale','California','94001']
connection.put_copy_data [5001, 'CVS','San Francisco','California','94002']
puts 'Copied in-memory data successfully.'
end
rescue PG::Error => e
puts e.message
ensure
connection.close if connection
end
针对数据库请求失败情况的应用重试
有时,来自应用程序的数据库请求可能会失败。 此类问题可能在不同的场景下发生,例如应用和数据库之间的网络故障、密码错误等。有些问题可能是暂时的,并且在几秒到几分钟内自行解决。 可以在应用中配置重试逻辑以克服暂时性错误。
在应用中配置重试逻辑有助于改善最终用户体验。 在故障情况下,用户只会等待应用程序处理请求的时间稍长,而不会遇到错误。
下面的示例演示如何在应用中实现重试逻辑。 示例代码片段每 60 秒尝试一次数据库请求(最多 5 次),直到成功为止。 可以根据应用程序的需求配置重试次数和频率。
在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。
require 'pg'
def executeretry(sql,retryCount)
begin
for a in 1..retryCount do
begin
# NOTE: Replace <cluster> and <password> in the connection string.
connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
resultSet = connection.exec(sql)
return resultSet.each
rescue PG::Error => e
puts e.message
sleep 60
ensure
connection.close if connection
end
end
end
return nil
end
var = executeretry('select 1',5)
if var !=nil then
var.each do |row|
puts 'Data row = (%s)' % [row]
end
end
后续步骤
- 了解 Azure Cosmos DB for PostgreSQL API 如何扩展 PostgreSQL,并尝试使用有用的诊断查询
- 为工作负载选择最佳群集大小
- 监视群集性能