使用 Python 在 Azure Cosmos DB for PostgreSQL 上连接和运行 SQL 命令
适用对象: Azure Cosmos DB for PostgreSQL(由 PostgreSQL 的 Citus 数据库扩展提供支持)
本快速入门演示如何使用 Python 代码连接到群集以及如何使用 SQL 语句创建表。 然后演示如何在数据库中插入、查询、更新和删除数据。 本文中的步骤假定你熟悉 Python 开发,但不熟悉 Azure Cosmos DB for PostgreSQL 的使用。
安装 PostgreSQL 库
本文中的代码示例需要 psycopg2 库。 你需要使用语言包管理器(例如 pip)安装 psycopg2。
进行连接,创建表,然后插入数据
下面的代码示例创建通向 Postgres 数据库的连接池。 然后,该代码使用 cursor.execute 函数以及 SQL CREATE TABLE 和 INSERT INTO 语句来创建表并插入数据。
提示
下面的示例代码使用连接池来创建和管理与 PostgreSQL 的连接。 强烈建议使用应用程序端连接池,因为:
- 它可确保应用程序不会生成太多通向数据库的连接,从而避免超过连接限制。
- 这有助于大幅提高性能,包括延迟和吞吐量。 PostgreSQL 服务器进程必须创建分支来处理每个新连接,而重用连接可避免这项开销。
在以下代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码或 Microsoft Entra ID 令牌。
注意
此示例将在结束时关闭连接,因此,如果要在同一会话中运行文章中的其他示例,请在运行此示例时不要包括 # Clean up
部分。
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
如果代码成功运行,则会生成以下输出:
Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data
分发表
Azure Cosmos DB for PostgreSQL 可为你提供跨多个节点分发表的强大功能,以实现可伸缩性。 可以使用以下命令来分配表。 可以在此处详细了解 create_distributed_table
和分布列。
注意
通过分发表,它们可在添加到群集的任何工作器节点之间增长。
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
读取数据
下面的代码示例使用以下 API 从数据库读取数据:
- cursor.execute 和 SQL SELECT 语句来读取数据。
- cursor.fetchall(),以接受查询并返回要循环访问的结果集。
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
更新数据
下面的代码示例使用 cursor.execute
和 SQL UPDATE 语句来更新数据。
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
删除数据
下面的代码示例运行 cursor.execute
和 SQL DELETE 语句来删除数据。
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
用于快速引入的 COPY 命令
在将数据引入 Azure Cosmos DB for PostgreSQL 时,COPY 命令可能会产生巨大的吞吐量。 COPY 命令可以引入文件中的数据,也可以使用内存中的微批数据进行实时引入。
用于从文件加载数据的 COPY 命令
以下代码将数据从 CSV 文件复制到数据库表。 该代码需要使用 pharmacies.csv 文件。
with open('pharmacies.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cursor.copy_from(f, 'pharmacy', sep=',')
print("copying data completed")
用于加载内存中数据的 COPY 命令
以下代码将内存中数据复制到表。
data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
buf.seek(0)
with conn.cursor() as cur:
cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()
针对数据库请求失败情况的应用重试
有时,来自应用程序的数据库请求可能会失败。 此类问题可能在不同的场景下发生,例如应用和数据库之间的网络故障、密码错误等。有些问题可能是暂时的,并且在几秒到几分钟内自行解决。 可以在应用中配置重试逻辑以克服暂时性错误。
在应用中配置重试逻辑有助于改善最终用户体验。 在故障情况下,用户只会等待应用程序处理请求的时间稍长,而不会遇到错误。
下面的示例演示如何在应用中实现重试逻辑。 示例代码片段每 60 秒尝试一次数据库请求(最多 5 次),直到成功为止。 可以根据应用程序的需求配置重试次数和频率。
在此代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。
import psycopg2
import time
from psycopg2 import pool
host = "c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
for x in range(retryCount):
try:
if (postgreSQL_pool):
# Use getconn() to Get Connection from connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
break
except Exception as err:
print(err)
postgreSQL_pool.putconn(conn)
time.sleep(60)
return None
print(executeRetry("select 1", 5))
后续步骤
- 了解 Azure Cosmos DB for PostgreSQL API 如何扩展 PostgreSQL,并尝试使用有用的诊断查询
- 为工作负载选择最佳群集大小
- 监视群集性能