使用 Python 在 Azure Cosmos DB for PostgreSQL 上连接和运行 SQL 命令

适用对象: Azure Cosmos DB for PostgreSQL(由 PostgreSQL 的 Citus 数据库扩展提供支持)

本快速入门演示如何使用 Python 代码连接到群集以及如何使用 SQL 语句创建表。 然后演示如何在数据库中插入、查询、更新和删除数据。 本文中的步骤假定你熟悉 Python 开发,但不熟悉 Azure Cosmos DB for PostgreSQL 的使用。

安装 PostgreSQL 库

本文中的代码示例需要 psycopg2 库。 你需要使用语言包管理器(例如 pip)安装 psycopg2。

进行连接,创建表,然后插入数据

下面的代码示例创建通向 Postgres 数据库的连接池。 然后,该代码使用 cursor.execute 函数以及 SQL CREATE TABLE 和 INSERT INTO 语句来创建表并插入数据。

提示

下面的示例代码使用连接池来创建和管理与 PostgreSQL 的连接。 强烈建议使用应用程序端连接池,因为:

  • 它可确保应用程序不会生成太多通向数据库的连接,从而避免超过连接限制。
  • 这有助于大幅提高性能,包括延迟和吞吐量。 PostgreSQL 服务器进程必须创建分支来处理每个新连接,而重用连接可避免这项开销。

在以下代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码或 Microsoft Entra ID 令牌。

注意

此示例将在结束时关闭连接,因此,如果要在同一会话中运行文章中的其他示例,请在运行此示例时不要包括 # Clean up 部分。

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

如果代码成功运行,则会生成以下输出:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

分发表

Azure Cosmos DB for PostgreSQL 可为你提供跨多个节点分发表的强大功能,以实现可伸缩性。 可以使用以下命令来分配表。 可以在此处详细了解 create_distributed_table 和分布列。

注意

通过分发表,它们可在添加到群集的任何工作器节点之间增长。

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

读取数据

下面的代码示例使用以下 API 从数据库读取数据:

# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

更新数据

下面的代码示例使用 cursor.execute 和 SQL UPDATE 语句来更新数据。

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

删除数据

下面的代码示例运行 cursor.execute 和 SQL DELETE 语句来删除数据。

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

用于快速引入的 COPY 命令

在将数据引入 Azure Cosmos DB for PostgreSQL 时,COPY 命令可能会产生巨大的吞吐量。 COPY 命令可以引入文件中的数据,也可以使用内存中的微批数据进行实时引入。

用于从文件加载数据的 COPY 命令

以下代码将数据从 CSV 文件复制到数据库表。 该代码需要使用 pharmacies.csv 文件。

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

用于加载内存中数据的 COPY 命令

以下代码将内存中数据复制到表。

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

针对数据库请求失败情况的应用重试

有时,来自应用程序的数据库请求可能会失败。 此类问题可能在不同的场景下发生,例如应用和数据库之间的网络故障、密码错误等。有些问题可能是暂时的,并且在几秒到几分钟内自行解决。 可以在应用中配置重试逻辑以克服暂时性错误。

在应用中配置重试逻辑有助于改善最终用户体验。 在故障情况下,用户只会等待应用程序处理请求的时间稍长,而不会遇到错误。

下面的示例演示如何在应用中实现重试逻辑。 示例代码片段每 60 秒尝试一次数据库请求(最多 5 次),直到成功为止。 可以根据应用程序的需求配置重试次数和频率。

在此代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

后续步骤