目标
本教程将介绍如何使用 Spanner 完成以下步骤: 适用于 PostgreSQL 驱动程序的 PGAdapter 本地代理:
- 创建 Spanner 实例和数据库。
- 写入、读取数据库中的数据和对数据执行 SQL 查询。
- 更新数据库架构。
- 使用读写事务更新数据。
- 向数据库添加二级索引。
- 使用索引来读取数据和对数据执行 SQL 查询。
- 使用只读事务检索数据。
费用
本教程使用 Spanner,它是 Google Cloud如需了解 Spanner 的使用费用,请参阅 定价。
准备工作
完成设置中介绍的步骤,包括创建和设置默认 Google Cloud 项目、启用结算功能、启用 Cloud Spanner API 以及设置 OAuth 2.0 来获取身份验证凭据以使用 Cloud Spanner API。
尤其要确保运行 gcloud auth
application-default login
,以便使用身份验证凭据设置本地开发环境。
准备本地 PGAdapter 环境
您可以将 PostgreSQL 驱动程序与 PGAdapter 结合使用 才能连接到 Spanner。PGAdapter 是一个 会将 PostgreSQL 网络协议转换为 Spanner gRPC 协议。
PGAdapter 需要 Java 或 Docker 才能运行。
如果尚未安装以下任何一项,请在开发机器上安装其中之一:
将示例应用代码库克隆到本地机器:
git clone https://github.com/GoogleCloudPlatform/pgadapter.git
切换到包含 Spanner 示例代码的目录:
psql
cd pgadapter/samples/snippets/psql-snippets
Java
cd pgadapter/samples/snippets/java-snippets mvn package -DskipTests
Go
cd pgadapter/samples/snippets/golang-snippets
Node.js
cd pgadapter/samples/snippets/nodejs-snippets npm install
Python
cd pgadapter/samples/snippets/python-snippets python -m venv ./venv pip install -r requirements.txt cd samples
C#
cd pgadapter/samples/snippets/dotnet-snippets
创建实例
首次使用 Spanner 时,您必须创建一个实例, 分配 Spanner 数据库使用的资源。创建实例时,请选择一个实例配置(决定数据的存储位置),同时选择要使用的节点数(决定实例中服务资源和存储资源的数量)。
执行以下命令,在该区域中创建 Spanner 实例
us-central1
,具有 1 个节点:
gcloud spanner instances create test-instance --config=regional-us-central1 \
--description="Test Instance" --nodes=1
请注意,此命令将创建一个具有以下特征的实例:
- 实例 ID 为
test-instance
- 显示名为
Test Instance
- 实例配置为
regional-us-central1
(单区域配置将数据存储在单个区域中,而多区域配置则将数据分布在多个区域中。如需了解详情,请参阅实例简介。) - 节点数为 1(
node_count
对应于实例中数据库可用的服务资源和存储资源的数量。有关详情,请参阅节点和 处理单元)。
您应该会看到:
Creating instance...done.
浏览示例文件
示例代码库包含一个示例,展示了如何使用 Spanner 与 PGAdapter 结合使用。
请浏览samples/snippets
文件夹,其中说明了如何使用
Spanner。代码展示了如何创建和使用新数据库。数据
使用
架构和数据模型页面。
启动 PGAdapter
在本地开发机器上启动 PGAdapter,并将其指向 实例
以下命令假定您已执行
gcloud auth application-default login
。
Java 应用
wget https://storage.googleapis.com/pgadapter-jar-releases/pgadapter.tar.gz \
&& tar -xzvf pgadapter.tar.gz
java -jar pgadapter.jar -i test-instance
Docker
docker pull gcr.io/cloud-spanner-pg-adapter/pgadapter
docker run \
--name pgadapter \
--rm -d -p 5432:5432 \
-v "$HOME/.config/gcloud":/gcloud:ro \
--env CLOUDSDK_CONFIG=/gcloud \
gcr.io/cloud-spanner-pg-adapter/pgadapter \
-i test-instance -x
模拟器
docker pull gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator
docker run \
--name pgadapter-emulator \
--rm -d \
-p 5432:5432 \
-p 9010:9010 \
-p 9020:9020 \
gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator
这将使用嵌入式 Spanner 启动 PGAdapter 模拟器。此嵌入式模拟器会自动创建任何 Spanner 而无需手动创建 。
我们建议您在生产环境中运行 PGAdapter,并将其作为 Sidecar 或作为进程内依赖项使用如需详细了解如何部署 PGAdapter 在生产环境中的情况,请参阅 选择运行 PGAdapter 的方法。
创建数据库
通过以下方法在名为 test-instance
的实例中创建名为 example-db
的数据库:
请在命令行中运行以下命令。
gcloud spanner databases create example-db --instance=test-instance \
--database-dialect=POSTGRESQL
您应该会看到:
Creating database...done.
创建表
以下代码会在数据库中创建两个表。
psql
#!/bin/bash
# Set the connection variables for psql.
# The following statements use the existing value of the variable if it has
# already been set, and otherwise assigns a default value.
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Create two tables in one batch.
psql << SQL
-- Create the singers table
CREATE TABLE singers (
singer_id bigint not null primary key,
first_name character varying(1024),
last_name character varying(1024),
singer_info bytea,
full_name character varying(2048) GENERATED ALWAYS
AS (first_name || ' ' || last_name) STORED
);
-- Create the albums table. This table is interleaved in the parent table
-- "singers".
CREATE TABLE albums (
singer_id bigint not null,
album_id bigint not null,
album_title character varying(1024),
primary key (singer_id, album_id)
)
-- The 'interleave in parent' clause is a Spanner-specific extension to
-- open-source PostgreSQL.
INTERLEAVE IN PARENT singers ON DELETE CASCADE;
SQL
echo "Created Singers & Albums tables in database: [${PGDATABASE}]"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
class CreateTables {
static void createTables(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (Statement statement = connection.createStatement()) {
// Create two tables in one batch.
statement.addBatch(
"create table singers ("
" singer_id bigint primary key not null,"
" first_name varchar(1024),"
" last_name varchar(1024),"
" singer_info bytea,"
" full_name varchar(2048) generated always as (\n"
" case when first_name is null then last_name\n"
" when last_name is null then first_name\n"
" else first_name || ' ' || last_name\n"
" end) stored"
")");
statement.addBatch(
"create table albums ("
" singer_id bigint not null,"
" album_id bigint not null,"
" album_title varchar,"
" primary key (singer_id, album_id)"
") interleave in parent singers on delete cascade");
statement.executeBatch();
System.out.println("Created Singers & Albums tables in database: [" database "]");
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func CreateTables(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Create two tables in one batch on Spanner.
br := conn.SendBatch(ctx, &pgx.Batch{QueuedQueries: []*pgx.QueuedQuery{
{SQL: "create table singers ("
" singer_id bigint primary key not null,"
" first_name character varying(1024),"
" last_name character varying(1024),"
" singer_info bytea,"
" full_name character varying(2048) generated "
" always as (first_name || ' ' || last_name) stored"
")"},
{SQL: "create table albums ("
" singer_id bigint not null,"
" album_id bigint not null,"
" album_title character varying(1024),"
" primary key (singer_id, album_id)"
") interleave in parent singers on delete cascade"},
}})
cmd, err := br.Exec()
if err != nil {
return err
}
if cmd.String() != "CREATE" {
return fmt.Errorf("unexpected command tag: %v", cmd.String())
}
if err := br.Close(); err != nil {
return err
}
fmt.Printf("Created Singers & Albums tables in database: [%s]\n", database)
return nil
}
Node.js
import { Client } from 'pg';
async function createTables(host: string, port: number, database: string): Promise<void> {
// Connect to Spanner through PGAdapter.
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Create two tables in one batch.
await connection.query("start batch ddl");
await connection.query("create table singers ("
" singer_id bigint primary key not null,"
" first_name character varying(1024),"
" last_name character varying(1024),"
" singer_info bytea,"
" full_name character varying(2048) generated "
" always as (first_name || ' ' || last_name) stored"
")");
await connection.query("create table albums ("
" singer_id bigint not null,"
" album_id bigint not null,"
" album_title character varying(1024),"
" primary key (singer_id, album_id)"
") interleave in parent singers on delete cascade");
await connection.query("run batch");
console.log(`Created Singers & Albums tables in database: [${database}]`);
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def create_tables(host: string, port: int, database: string):
# Connect to Cloud Spanner using psycopg3 through PGAdapter.
with psycopg.connect("host={host} port={port} "
"dbname={database} "
"sslmode=disable".format(host=host, port=port,
database=database)) as conn:
# Enable autocommit to execute DDL statements, as psycopg otherwise
# tries to use a read/write transaction.
conn.autocommit = True
# Use a pipeline to execute multiple DDL statements in one batch.
with conn.pipeline():
conn.execute("create table singers ("
" singer_id bigint primary key not null,"
" first_name character varying(1024),"
" last_name character varying(1024),"
" singer_info bytea,"
" full_name character varying(2048) generated "
" always as (first_name || ' ' || last_name) stored"
")")
conn.execute("create table albums ("
" singer_id bigint not null,"
" album_id bigint not null,"
" album_title character varying(1024),"
" primary key (singer_id, album_id)"
") interleave in parent singers on delete cascade")
print("Created Singers & Albums tables in database: [{database}]"
.format(database=database))
C#
using Npgsql;
namespace dotnet_snippets;
public static class CreateTablesSample
{
public static void CreateTables(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Create two tables in one batch.
var batch = connection.CreateBatch();
batch.BatchCommands.Add(new NpgsqlBatchCommand(
"create table singers ("
" singer_id bigint primary key not null,"
" first_name varchar(1024),"
" last_name varchar(1024),"
" singer_info bytea,"
" full_name varchar(2048) generated always as (\n"
" case when first_name is null then last_name\n"
" when last_name is null then first_name\n"
" else first_name || ' ' || last_name\n"
" end) stored"
")"));
batch.BatchCommands.Add(new NpgsqlBatchCommand(
"create table albums ("
" singer_id bigint not null,"
" album_id bigint not null,"
" album_title varchar,"
" primary key (singer_id, album_id)"
") interleave in parent singers on delete cascade"));
batch.ExecuteNonQuery();
Console.WriteLine($"Created Singers & Albums tables in database: [{database}]");
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./create_tables.sh example-db
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar createtables example-db
Go
go run sample_runner.go createtables example-db
Node.js
npm start createtables example-db
Python
python create_tables.py example-db
C#
dotnet run createtables example-db
下一步是将数据写入数据库。
创建连接
您必须先创建与以下服务的连接,然后才能执行读写操作 PGAdapter 结合使用。您与 Spanner 的所有交互都必须 通过Connection
实现。数据库名称在连接字符串中指定。
psql
#!/bin/bash
# Set the connection variables for psql.
# The following statements use the existing value of the variable if it has
# already been set, and otherwise assigns a default value.
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Connect to Cloud Spanner using psql through PGAdapter
# and execute a simple query.
psql -c "select 'Hello world!' as hello"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
class CreateConnection {
static void createConnection(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (ResultSet resultSet =
connection.createStatement().executeQuery("select 'Hello world!' as hello")) {
while (resultSet.next()) {
System.out.printf("Greeting from Cloud Spanner PostgreSQL: %s\n", resultSet.getString(1));
}
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func CreateConnection(host string, port int, database string) error {
ctx := context.Background()
// Connect to Cloud Spanner using pgx through PGAdapter.
// 'sslmode=disable' is optional, but adding it reduces the connection time,
// as pgx will then skip first trying to create an SSL connection.
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
row := conn.QueryRow(ctx, "select 'Hello world!' as hello")
var msg string
if err := row.Scan(&msg); err != nil {
return err
}
fmt.Printf("Greeting from Cloud Spanner PostgreSQL: %s\n", msg)
return nil
}
Node.js
import { Client } from 'pg';
async function createConnection(host: string, port: number, database: string): Promise<void> {
// Connect to Spanner through PGAdapter.
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
const result = await connection.query("select 'Hello world!' as hello");
console.log(`Greeting from Cloud Spanner PostgreSQL: ${result.rows[0]['hello']}`);
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def create_connection(host: string, port: int, database: string):
# Connect to Cloud Spanner using psycopg3 through PGAdapter.
# 'sslmode=disable' is optional, but adding it reduces the connection time,
# as psycopg3 will then skip first trying to create an SSL connection.
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("select 'Hello world!' as hello")
print("Greeting from Cloud Spanner PostgreSQL:", cur.fetchone()[0])
C#
using Npgsql;
namespace dotnet_snippets;
public static class CreateConnectionSample
{
public static void CreateConnection(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = new NpgsqlCommand("select 'Hello World!' as hello", connection);
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
var greeting = reader.GetString(0);
Console.WriteLine($"Greeting from Cloud Spanner PostgreSQL: {greeting}");
}
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./create_connection.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar createconnection example-db
Go
go run sample_runner.go createconnection example-db
Node.js
npm start createconnection example-db
Python
python create_connection.py example-db
C#
dotnet run createconnection example-db
使用 DML 写入数据
您可以在读写事务中使用数据操纵语言 (DML) 插入数据。
这些示例展示了如何在 Spanner 上执行 DML 语句 使用 PostgreSQL 驱动程序。
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql -c "INSERT INTO singers (singer_id, first_name, last_name) VALUES
(12, 'Melissa', 'Garcia'),
(13, 'Russel', 'Morales'),
(14, 'Jacqueline', 'Long'),
(15, 'Dylan', 'Shaw')"
echo "4 records inserted"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
class WriteDataWithDml {
static class Singer {
private final long singerId;
private final String firstName;
private final String lastName;
Singer(final long id, final String first, final String last) {
this.singerId = id;
this.firstName = first;
this.lastName = last;
}
}
static void writeDataWithDml(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Add 4 rows in one statement.
// JDBC always uses '?' as a parameter placeholder.
try (PreparedStatement preparedStatement =
connection.prepareStatement(
"INSERT INTO singers (singer_id, first_name, last_name) VALUES "
"(?, ?, ?), "
"(?, ?, ?), "
"(?, ?, ?), "
"(?, ?, ?)")) {
final List<Singer> singers =
Arrays.asList(
new Singer(/* SingerId = */ 12L, "Melissa", "Garcia"),
new Singer(/* SingerId = */ 13L, "Russel", "Morales"),
new Singer(/* SingerId = */ 14L, "Jacqueline", "Long"),
new Singer(/* SingerId = */ 15L, "Dylan", "Shaw"));
// Note that JDBC parameters start at index 1.
int paramIndex = 0;
for (Singer singer : singers) {
preparedStatement.setLong( paramIndex, singer.singerId);
preparedStatement.setString( paramIndex, singer.firstName);
preparedStatement.setString( paramIndex, singer.lastName);
}
int updateCount = preparedStatement.executeUpdate();
System.out.printf("%d records inserted.\n", updateCount);
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func WriteDataWithDml(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
tag, err := conn.Exec(ctx,
"INSERT INTO singers (singer_id, first_name, last_name) "
"VALUES ($1, $2, $3), ($4, $5, $6), "
" ($7, $8, $9), ($10, $11, $12)",
12, "Melissa", "Garcia",
13, "Russel", "Morales",
14, "Jacqueline", "Long",
15, "Dylan", "Shaw")
if err != nil {
return err
}
fmt.Printf("%v records inserted\n", tag.RowsAffected())
return nil
}
Node.js
import { Client } from 'pg';
async function writeDataWithDml(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
const result = await connection.query("INSERT INTO singers (singer_id, first_name, last_name) "
"VALUES ($1, $2, $3), ($4, $5, $6), "
" ($7, $8, $9), ($10, $11, $12)",
[12, "Melissa", "Garcia",
13, "Russel", "Morales",
14, "Jacqueline", "Long",
15, "Dylan", "Shaw"])
console.log(`${result.rowCount} records inserted`);
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def write_data_with_dml(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("INSERT INTO singers (singer_id, first_name, last_name)"
" VALUES (%s, %s, %s), (%s, %s, %s), "
" (%s, %s, %s), (%s, %s, %s)",
(12, "Melissa", "Garcia",
13, "Russel", "Morales",
14, "Jacqueline", "Long",
15, "Dylan", "Shaw",))
print("%d records inserted" % cur.rowcount)
C#
using Npgsql;
namespace dotnet_snippets;
public static class WriteDataWithDmlSample
{
readonly struct Singer
{
public Singer(long singerId, string firstName, string lastName)
{
SingerId = singerId;
FirstName = firstName;
LastName = lastName;
}
public long SingerId { get; }
public string FirstName { get; }
public string LastName { get; }
}
public static void WriteDataWithDml(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Add 4 rows in one statement.
using var cmd = new NpgsqlCommand("INSERT INTO singers (singer_id, first_name, last_name) VALUES "
"($1, $2, $3), "
"($4, $5, $6), "
"($7, $8, $9), "
"($10, $11, $12)", connection);
List<Singer> singers =
[
new Singer(/* SingerId = */ 12L, "Melissa", "Garcia"),
new Singer(/* SingerId = */ 13L, "Russel", "Morales"),
new Singer(/* SingerId = */ 14L, "Jacqueline", "Long"),
new Singer(/* SingerId = */ 15L, "Dylan", "Shaw")
];
foreach (var singer in singers)
{
cmd.Parameters.Add(new NpgsqlParameter { Value = singer.SingerId });
cmd.Parameters.Add(new NpgsqlParameter { Value = singer.FirstName });
cmd.Parameters.Add(new NpgsqlParameter { Value = singer.LastName });
}
var updateCount = cmd.ExecuteNonQuery();
Console.WriteLine($"{updateCount} records inserted.");
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./write_data_with_dml.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar writeusingdml example-db
Go
go run sample_runner.go writeusingdml example-db
Node.js
npm start writeusingdml example-db
Python
python write_data_with_dml.py example-db
C#
dotnet run writeusingdml example-db
您应该会看到以下响应:
4 records inserted.
使用 DML 批次写入数据
PGAdapter 支持执行 DML 批次。发送多个 DML 语句减少 并提升应用的性能。
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Create a prepared insert statement and execute this prepared
# insert statement three times in one SQL string. The single
# SQL string with three insert statements will be executed as
# a single DML batch on Spanner.
psql -c "PREPARE insert_singer AS
INSERT INTO singers (singer_id, first_name, last_name)
VALUES (\$1, \$2, \$3)" \
-c "EXECUTE insert_singer (16, 'Sarah', 'Wilson');
EXECUTE insert_singer (17, 'Ethan', 'Miller');
EXECUTE insert_singer (18, 'Maya', 'Patel');"
echo "3 records inserted"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
class WriteDataWithDmlBatch {
static class Singer {
private final long singerId;
private final String firstName;
private final String lastName;
Singer(final long id, final String first, final String last) {
this.singerId = id;
this.firstName = first;
this.lastName = last;
}
}
static void writeDataWithDmlBatch(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Add multiple rows in one DML batch.
// JDBC always uses '?' as a parameter placeholder.
try (PreparedStatement preparedStatement =
connection.prepareStatement(
"INSERT INTO singers (singer_id, first_name, last_name) VALUES (?, ?, ?)")) {
final List<Singer> singers =
Arrays.asList(
new Singer(/* SingerId = */ 16L, "Sarah", "Wilson"),
new Singer(/* SingerId = */ 17L, "Ethan", "Miller"),
new Singer(/* SingerId = */ 18L, "Maya", "Patel"));
for (Singer singer : singers) {
// Note that JDBC parameters start at index 1.
int paramIndex = 0;
preparedStatement.setLong( paramIndex, singer.singerId);
preparedStatement.setString( paramIndex, singer.firstName);
preparedStatement.setString( paramIndex, singer.lastName);
preparedStatement.addBatch();
}
int[] updateCounts = preparedStatement.executeBatch();
System.out.printf("%d records inserted.\n", Arrays.stream(updateCounts).sum());
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func WriteDataWithDmlBatch(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
sql := "INSERT INTO singers (singer_id, first_name, last_name) "
"VALUES ($1, $2, $3)"
batch := &pgx.Batch{}
batch.Queue(sql, 16, "Sarah", "Wilson")
batch.Queue(sql, 17, "Ethan", "Miller")
batch.Queue(sql, 18, "Maya", "Patel")
br := conn.SendBatch(ctx, batch)
_, err = br.Exec()
if err := br.Close(); err != nil {
return err
}
if err != nil {
return err
}
fmt.Printf("%v records inserted\n", batch.Len())
return nil
}
Node.js
import { Client } from 'pg';
async function writeDataWithDmlBatch(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// node-postgres does not support PostgreSQL pipeline mode, so we must use the
// `start batch dml` / `run batch` statements to execute a DML batch.
const sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES ($1, $2, $3)";
await connection.query("start batch dml");
await connection.query(sql, [16, "Sarah", "Wilson"]);
await connection.query(sql, [17, "Ethan", "Miller"]);
await connection.query(sql, [18, "Maya", "Patel"]);
const result = await connection.query("run batch");
// RUN BATCH returns the update counts as an array of strings, with one element for each
// DML statement in the batch. This calculates the total number of affected rows from that array.
const updateCount = result.rows[0]["UPDATE_COUNTS"]
.map((s: string) => parseInt(s))
.reduce((c: number, current: number) => c current, 0);
console.log(`${updateCount} records inserted`);
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def write_data_with_dml_batch(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.executemany("INSERT INTO singers "
"(singer_id, first_name, last_name) "
"VALUES (%s, %s, %s)",
[(16, "Sarah", "Wilson",),
(17, "Ethan", "Miller",),
(18, "Maya", "Patel",), ])
print("%d records inserted" % cur.rowcount)
C#
using Npgsql;
namespace dotnet_snippets;
public static class WriteDataWithDmlBatchSample
{
readonly struct Singer
{
public Singer(long singerId, string firstName, string lastName)
{
SingerId = singerId;
FirstName = firstName;
LastName = lastName;
}
public long SingerId { get; }
public string FirstName { get; }
public string LastName { get; }
}
public static void WriteDataWithDmlBatch(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Add multiple rows in one DML batch.
const string sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES ($1, $2, $3)";
List<Singer> singers =
[
new Singer(/* SingerId = */ 16L, "Sarah", "Wilson"),
new Singer(/* SingerId = */ 17L, "Ethan", "Miller"),
new Singer(/* SingerId = */ 18L, "Maya", "Patel")
];
using var batch = new NpgsqlBatch(connection);
foreach (var singer in singers)
{
batch.BatchCommands.Add(new NpgsqlBatchCommand
{
CommandText = sql,
Parameters =
{
new NpgsqlParameter {Value = singer.SingerId},
new NpgsqlParameter {Value = singer.FirstName},
new NpgsqlParameter {Value = singer.LastName}
}
});
}
var updateCount = batch.ExecuteNonQuery();
Console.WriteLine($"{updateCount} records inserted.");
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./write_data_with_dml_batch.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar writeusingdmlbatch example-db
Go
go run sample_runner.go writeusingdmlbatch example-db
Node.js
npm start writeusingdmlbatch example-db
Python
python write_data_with_dml_batch.py example-db
C#
dotnet run writeusingdmlbatch example-db
您应该会看到:
3 records inserted.
使用变更写入数据
您还可以使用变更插入数据。
PGAdapter 会将 PostgreSQL COPY
命令转换为
变更。使用 COPY
是在
Spanner 数据库的数据。
COPY
操作默认为原子操作。原子操作
Spanner 受到提交大小限制。
如需了解详情,请参阅 CRUD 限制。
以下示例展示了如何执行非原子 COPY
操作。这样,
COPY
操作以超出提交大小限制。
psql
#!/bin/bash
# Get the source directory of this script.
directory=${BASH_SOURCE%/*}/
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Copy data to Spanner from a tab-separated text file using the COPY command.
psql -c "COPY singers (singer_id, first_name, last_name) FROM STDIN" \
< "${directory}singers_data.txt"
psql -c "COPY albums FROM STDIN" \
< "${directory}albums_data.txt"
echo "Copied singers and albums"
Java
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
class WriteDataWithCopy {
static void writeDataWithCopy(String host, int port, String database)
throws SQLException, IOException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Unwrap the PostgreSQL JDBC connection interface to get access to
// a CopyManager.
PGConnection pgConnection = connection.unwrap(PGConnection.class);
CopyManager copyManager = pgConnection.getCopyAPI();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
connection
.createStatement()
.execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
long numSingers =
copyManager.copyIn(
"COPY singers (singer_id, first_name, last_name) FROM STDIN",
WriteDataWithCopy.class.getResourceAsStream("singers_data.txt"));
System.out.printf("Copied %d singers\n", numSingers);
long numAlbums =
copyManager.copyIn(
"COPY albums FROM STDIN",
WriteDataWithCopy.class.getResourceAsStream("albums_data.txt"));
System.out.printf("Copied %d albums\n", numAlbums);
}
}
}
Go
import (
"context"
"fmt"
"os"
"github.com/jackc/pgx/v5"
)
func WriteDataWithCopy(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
conn.Exec(ctx, "set spanner.autocommit_dml_mode='partitioned_non_atomic'")
file, err := os.Open("samples/singers_data.txt")
if err != nil {
return err
}
tag, err := conn.PgConn().CopyFrom(ctx, file,
"copy singers (singer_id, first_name, last_name) from stdin")
if err != nil {
return err
}
fmt.Printf("Copied %v singers\n", tag.RowsAffected())
file, err = os.Open("samples/albums_data.txt")
if err != nil {
return err
}
tag, err = conn.PgConn().CopyFrom(ctx, file,
"copy albums from stdin")
if err != nil {
return err
}
fmt.Printf("Copied %v albums\n", tag.RowsAffected())
return nil
}
Node.js
import { Client } from 'pg';
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
import { from as copyFrom } from 'pg-copy-streams'
import path from "path";
async function writeDataWithCopy(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// Copy data from a csv file to Spanner using the COPY command.
// Note that even though the command says 'from stdin', the actual input comes from a file.
const copySingersStream = copyFrom('copy singers (singer_id, first_name, last_name) from stdin');
const ingestSingersStream = connection.query(copySingersStream);
const sourceSingersStream = fs.createReadStream(path.join(__dirname, 'singers_data.txt'));
await pipeline(sourceSingersStream, ingestSingersStream);
console.log(`Copied ${copySingersStream.rowCount} singers`);
const copyAlbumsStream = copyFrom('copy albums from stdin');
const ingestAlbumsStream = connection.query(copyAlbumsStream);
const sourceAlbumsStream = fs.createReadStream(path.join(__dirname, 'albums_data.txt'));
await pipeline(sourceAlbumsStream, ingestAlbumsStream);
console.log(`Copied ${copyAlbumsStream.rowCount} albums`);
// Close the connection.
await connection.end();
}
Python
import os
import string
import psycopg
def write_data_with_copy(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
script_dir = os.path.dirname(os.path.abspath(__file__))
singers_file_path = os.path.join(script_dir, "singers_data.txt")
albums_file_path = os.path.join(script_dir, "albums_data.txt")
conn.autocommit = True
block_size = 1024
with conn.cursor() as cur:
with open(singers_file_path, "r") as f:
with cur.copy("COPY singers (singer_id, first_name, last_name) "
"FROM STDIN") as copy:
while data := f.read(block_size):
copy.write(data)
print("Copied %d singers" % cur.rowcount)
with open(albums_file_path, "r") as f:
with cur.copy("COPY albums "
"FROM STDIN") as copy:
while data := f.read(block_size):
copy.write(data)
print("Copied %d albums" % cur.rowcount)
C#
using Npgsql;
namespace dotnet_snippets;
public static class WriteDataWithCopySample
{
public static void WriteDataWithCopy(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
using var cmd = new NpgsqlCommand("set spanner.autocommit_dml_mode='partitioned_non_atomic'", connection);
cmd.ExecuteNonQuery();
var singerCount = 0;
using var singerReader = new StreamReader("singers_data.txt");
using (var singerWriter = connection.BeginTextImport("COPY singers (singer_id, first_name, last_name) FROM STDIN"))
{
while (singerReader.ReadLine() is { } line)
{
singerWriter.WriteLine(line);
singerCount ;
}
}
Console.WriteLine($"Copied {singerCount} singers");
var albumCount = 0;
using var albumReader = new StreamReader("albums_data.txt");
using (var albumWriter = connection.BeginTextImport("COPY albums FROM STDIN"))
{
while (albumReader.ReadLine() is { } line)
{
albumWriter.WriteLine(line);
albumCount ;
}
}
Console.WriteLine($"Copied {albumCount} albums");
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./write_data_with_copy.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar write example-db
Go
go run sample_runner.go write example-db
Node.js
npm start write example-db
Python
python write_data_with_copy.py example-db
C#
dotnet run write example-db
您应该会看到:
Copied 5 singers
Copied 5 albums
使用 SQL 查询数据
Spanner 支持使用 SQL 接口读取数据, 使用 Google Cloud CLI 或 以编程方式使用 PostgreSQL 驱动程序。
在命令行中
执行以下 SQL 语句,读取 Albums
表中所有列的值:
gcloud spanner databases execute-sql example-db --instance=test-instance \
--sql='SELECT singer_id, album_id, album_title FROM albums'
结果应为:
SingerId AlbumId AlbumTitle
1 1 Total Junk
1 2 Go, Go, Go
2 1 Green
2 2 Forever Hold Your Peace
2 3 Terrified
使用 PostgreSQL 驱动程序
除了在命令行上执行 SQL 语句外,还可以发出 使用 PostgreSQL 驱动程序以编程方式创建相同的 SQL 语句。
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql -c "SELECT singer_id, album_id, album_title
FROM albums"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
class QueryData {
static void queryData(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery("SELECT singer_id, album_id, album_title FROM albums")) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n",
resultSet.getLong("singer_id"),
resultSet.getLong("album_id"),
resultSet.getString("album_title"));
}
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func QueryData(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
rows, err := conn.Query(ctx, "SELECT singer_id, album_id, album_title "
"FROM albums")
defer rows.Close()
if err != nil {
return err
}
for rows.Next() {
var singerId, albumId int64
var title string
err = rows.Scan(&singerId, &albumId, &title)
if err != nil {
return err
}
fmt.Printf("%v %v %v\n", singerId, albumId, title)
}
return rows.Err()
}
Node.js
import { Client } from 'pg';
async function queryData(host: string, port: number, database: string): Promise<void> {
// Connect to Spanner through PGAdapter.
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
const result = await connection.query("SELECT singer_id, album_id, album_title "
"FROM albums");
for (const row of result.rows) {
console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
}
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def query_data(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT singer_id, album_id, album_title "
"FROM albums")
for album in cur:
print(album)
C#
using Npgsql;
namespace dotnet_snippets;
public static class QueryDataSample
{
public static void QueryData(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = new NpgsqlCommand("SELECT singer_id, album_id, album_title FROM albums", connection);
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
Console.WriteLine($"{reader.GetInt64(0)} {reader.GetInt64(1)} {reader.GetString(2)}");
}
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./query_data.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar query example-db
Go
go run sample_runner.go query example-db
Node.js
npm start query example-db
Python
python query_data.py example-db
C#
dotnet run query example-db
您应该会看到以下结果:
1 1 Total Junk
1 2 Go, Go, Go
2 1 Green
2 2 Forever Hold Your Peace
2 3 Terrified
使用 SQL 参数进行查询
如果您的应用具有频繁执行的查询,您可以提高其性能 对其进行参数化处理。生成的参数查询可以缓存和重复使用 降低编译开销。如需了解详情,请参阅 使用查询参数加快频繁执行的查询的速度。
以下示例展示了如何在 WHERE
子句中使用形参
包含特定 LastName
值的查询记录。
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Create a prepared statement to use a query parameter.
# Using a prepared statement for executing the same SQL string multiple
# times increases the execution speed of the statement.
psql -c "PREPARE select_singer AS
SELECT singer_id, first_name, last_name
FROM singers
WHERE last_name = \$1" \
-c "EXECUTE select_singer ('Garcia')"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
class QueryDataWithParameter {
static void queryDataWithParameter(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (PreparedStatement statement =
connection.prepareStatement(
"SELECT singer_id, first_name, last_name "
"FROM singers "
"WHERE last_name = ?")) {
statement.setString(1, "Garcia");
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
System.out.printf(
"%d %s %s\n",
resultSet.getLong("singer_id"),
resultSet.getString("first_name"),
resultSet.getString("last_name"));
}
}
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func QueryDataWithParameter(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
rows, err := conn.Query(ctx,
"SELECT singer_id, first_name, last_name "
"FROM singers "
"WHERE last_name = $1", "Garcia")
defer rows.Close()
if err != nil {
return err
}
for rows.Next() {
var singerId int64
var firstName, lastName string
err = rows.Scan(&singerId, &firstName, &lastName)
if err != nil {
return err
}
fmt.Printf("%v %v %v\n", singerId, firstName, lastName)
}
return rows.Err()
}
Node.js
import { Client } from 'pg';
async function queryWithParameter(host: string, port: number, database: string): Promise<void> {
// Connect to Spanner through PGAdapter.
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
const result = await connection.query(
"SELECT singer_id, first_name, last_name "
"FROM singers "
"WHERE last_name = $1", ["Garcia"]);
for (const row of result.rows) {
console.log(`${row["singer_id"]} ${row["first_name"]} ${row["last_name"]}`);
}
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def query_data_with_parameter(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT singer_id, first_name, last_name "
"FROM singers "
"WHERE last_name = %s", ("Garcia",))
for singer in cur:
print(singer)
C#
using Npgsql;
namespace dotnet_snippets;
public static class QueryDataWithParameterSample
{
public static void QueryDataWithParameter(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = new NpgsqlCommand("SELECT singer_id, first_name, last_name "
"FROM singers "
"WHERE last_name = $1", connection);
cmd.Parameters.Add(new NpgsqlParameter { Value = "Garcia" });
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
Console.WriteLine($"{reader["singer_id"]} {reader["first_name"]} {reader["last_name"]}");
}
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./query_data_with_parameter.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar querywithparameter example-db
Go
go run sample_runner.go querywithparameter example-db
Node.js
npm start querywithparameter example-db
Python
python query_data_with_parameter.py example-db
C#
dotnet run querywithparameter example-db
您应该会看到以下结果:
12 Melissa Garcia
更新数据库架构
假设您需要将名为 MarketingBudget
的新列添加到 Albums
表。向现有表添加新列需要更新数据库架构。Spanner 支持对数据库进行架构更新,同时
以便继续处理流量架构更新不需要
数据库离线,且不会锁定整个表或列;您可以继续
在架构更新期间将数据写入数据库。详细了解“支持”
架构更新和架构更改性能,
进行架构更新。
添加列
您可以使用 Google Cloud CLI 或 以编程方式使用 PostgreSQL 驱动程序。
在命令行中
使用以下 ALTER TABLE
命令向表添加新列:
gcloud spanner databases ddl update example-db --instance=test-instance \
--ddl='ALTER TABLE albums ADD COLUMN marketing_budget BIGINT'
您应该会看到:
Schema updating...done.
使用 PostgreSQL 驱动程序
使用 PostgreSQL 驱动程序执行 DDL 语句,以修改 schema:
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql -c "ALTER TABLE albums ADD COLUMN marketing_budget bigint"
echo "Added marketing_budget column"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
class AddColumn {
static void addColumn(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
connection.createStatement().execute("alter table albums add column marketing_budget bigint");
System.out.println("Added marketing_budget column");
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func AddColumn(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
_, err = conn.Exec(ctx,
"ALTER TABLE albums "
"ADD COLUMN marketing_budget bigint")
if err != nil {
return err
}
fmt.Println("Added marketing_budget column")
return nil
}
Node.js
import { Client } from 'pg';
async function addColumn(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
await connection.query(
"ALTER TABLE albums "
"ADD COLUMN marketing_budget bigint");
console.log("Added marketing_budget column");
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def add_column(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
# DDL can only be executed when autocommit=True.
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("ALTER TABLE albums "
"ADD COLUMN marketing_budget bigint")
print("Added marketing_budget column")
C#
using Npgsql;
namespace dotnet_snippets;
public static class AddColumnSample
{
public static void AddColumn(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = connection.CreateCommand();
cmd.CommandText = "alter table albums add column marketing_budget bigint";
cmd.ExecuteNonQuery();
Console.WriteLine("Added marketing_budget column");
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./add_column.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar addmarketingbudget example-db
Go
go run sample_runner.go addmarketingbudget example-db
Node.js
npm start addmarketingbudget example-db
Python
python add_column.py example-db
C#
dotnet run addmarketingbudget example-db
您应该会看到:
Added marketing_budget column
执行 DDL 批次
建议一次性执行多个架构修改。
您可以使用内置的
批量处理功能,具体方法是提交所有 DDL,
语句表示为一个以英文分号分隔的 SQL 字符串,或者使用
START BATCH DDL
和 RUN BATCH
语句。
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Use a single SQL command to batch multiple statements together.
# Executing multiple DDL statements as one batch is more efficient
# than executing each statement individually.
# Separate the statements with semicolons.
psql << SQL
CREATE TABLE venues (
venue_id bigint not null primary key,
name varchar(1024),
description jsonb
);
CREATE TABLE concerts (
concert_id bigint not null primary key ,
venue_id bigint not null,
singer_id bigint not null,
start_time timestamptz,
end_time timestamptz,
constraint fk_concerts_venues foreign key
(venue_id) references venues (venue_id),
constraint fk_concerts_singers foreign key
(singer_id) references singers (singer_id)
);
SQL
echo "Added venues and concerts tables"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
class DdlBatch {
static void ddlBatch(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (Statement statement = connection.createStatement()) {
// Create two new tables in one batch.
statement.addBatch(
"CREATE TABLE venues ("
" venue_id bigint not null primary key,"
" name varchar(1024),"
" description jsonb"
")");
statement.addBatch(
"CREATE TABLE concerts ("
" concert_id bigint not null primary key ,"
" venue_id bigint not null,"
" singer_id bigint not null,"
" start_time timestamptz,"
" end_time timestamptz,"
" constraint fk_concerts_venues foreign key"
" (venue_id) references venues (venue_id),"
" constraint fk_concerts_singers foreign key"
" (singer_id) references singers (singer_id)"
")");
statement.executeBatch();
}
System.out.println("Added venues and concerts tables");
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func DdlBatch(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Executing multiple DDL statements as one batch is
// more efficient than executing each statement
// individually.
br := conn.SendBatch(ctx, &pgx.Batch{QueuedQueries: []*pgx.QueuedQuery{
{SQL: "CREATE TABLE venues ("
" venue_id bigint not null primary key,"
" name varchar(1024),"
" description jsonb"
")"},
{SQL: "CREATE TABLE concerts ("
" concert_id bigint not null primary key ,"
" venue_id bigint not null,"
" singer_id bigint not null,"
" start_time timestamptz,"
" end_time timestamptz,"
" constraint fk_concerts_venues foreign key"
" (venue_id) references venues (venue_id),"
" constraint fk_concerts_singers foreign key"
" (singer_id) references singers (singer_id)"
")"},
}})
if _, err := br.Exec(); err != nil {
return err
}
if err := br.Close(); err != nil {
return err
}
fmt.Println("Added venues and concerts tables")
return nil
}
Node.js
import { Client } from 'pg';
async function ddlBatch(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Executing multiple DDL statements as one batch is
// more efficient than executing each statement
// individually.
await connection.query("start batch ddl");
await connection.query("CREATE TABLE venues ("
" venue_id bigint not null primary key,"
" name varchar(1024),"
" description jsonb"
")");
await connection.query("CREATE TABLE concerts ("
" concert_id bigint not null primary key ,"
" venue_id bigint not null,"
" singer_id bigint not null,"
" start_time timestamptz,"
" end_time timestamptz,"
" constraint fk_concerts_venues foreign key"
" (venue_id) references venues (venue_id),"
" constraint fk_concerts_singers foreign key"
" (singer_id) references singers (singer_id)"
")");
await connection.query("run batch");
console.log("Added venues and concerts tables");
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def ddl_batch(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
# DDL can only be executed when autocommit=True.
conn.autocommit = True
# Use a pipeline to batch multiple statements together.
# Executing multiple DDL statements as one batch is
# more efficient than executing each statement
# individually.
with conn.pipeline():
# The following statements are buffered on PGAdapter
# until the pipeline ends.
conn.execute("CREATE TABLE venues ("
" venue_id bigint not null primary key,"
" name varchar(1024),"
" description jsonb"
")")
conn.execute("CREATE TABLE concerts ("
" concert_id bigint not null primary key ,"
" venue_id bigint not null,"
" singer_id bigint not null,"
" start_time timestamptz,"
" end_time timestamptz,"
" constraint fk_concerts_venues foreign key"
" (venue_id) references venues (venue_id),"
" constraint fk_concerts_singers foreign key"
" (singer_id) references singers (singer_id)"
")")
print("Added venues and concerts tables")
C#
using Npgsql;
namespace dotnet_snippets;
public static class DdlBatchSample
{
public static void DdlBatch(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Create two new tables in one batch.
var batch = connection.CreateBatch();
batch.BatchCommands.Add(new NpgsqlBatchCommand(
"CREATE TABLE venues ("
" venue_id bigint not null primary key,"
" name varchar(1024),"
" description jsonb"
")"));
batch.BatchCommands.Add(new NpgsqlBatchCommand(
"CREATE TABLE concerts ("
" concert_id bigint not null primary key ,"
" venue_id bigint not null,"
" singer_id bigint not null,"
" start_time timestamptz,"
" end_time timestamptz,"
" constraint fk_concerts_venues foreign key"
" (venue_id) references venues (venue_id),"
" constraint fk_concerts_singers foreign key"
" (singer_id) references singers (singer_id)"
")"));
batch.ExecuteNonQuery();
Console.WriteLine("Added venues and concerts tables");
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./ddl_batch.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar ddlbatch example-db
Go
go run sample_runner.go ddlbatch example-db
Node.js
npm start ddlbatch example-db
Python
python ddl_batch.py example-db
C#
dotnet run ddlbatch example-db
您应该会看到:
Added venues and concerts tables
将数据写入新列
以下代码可将数据写入新列。对于 Albums(1, 1)
键控的行,该代码会将 MarketingBudget
设置为 100000
;而对于 Albums(2, 2)
键控的行,该代码会将其设置为 500000
。
COPY
命令转换为
变更。COPY
命令默认转换为 Insert
变更。
执行 set spanner.copy_upsert=true
以将 COPY
命令转换为
InsertOrUpdate
项变更。这可用于更新现有数据
Spanner。
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Instruct PGAdapter to use insert-or-update for COPY statements.
# This enables us to use COPY to update data.
psql -c "set spanner.copy_upsert=true" \
-c "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN
WITH (DELIMITER ';')" \
<< DATA
1;1;100000
2;2;500000
DATA
echo "Copied albums using upsert"
Java
import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
class UpdateDataWithCopy {
static void updateDataWithCopy(String host, int port, String database)
throws SQLException, IOException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Unwrap the PostgreSQL JDBC connection interface to get access to
// a CopyManager.
PGConnection pgConnection = connection.unwrap(PGConnection.class);
CopyManager copyManager = pgConnection.getCopyAPI();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
connection
.createStatement()
.execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// Instruct PGAdapter to use insert-or-update for COPY statements.
// This enables us to use COPY to update existing data.
connection.createStatement().execute("set spanner.copy_upsert=true");
// COPY uses mutations to insert or update existing data in Spanner.
long numAlbums =
copyManager.copyIn(
"COPY albums (singer_id, album_id, marketing_budget) FROM STDIN",
new StringReader("1\t1\t100000\n" "2\t2\t500000\n"));
System.out.printf("Updated %d albums\n", numAlbums);
}
}
}
Go
import (
"context"
"fmt"
"io"
"github.com/jackc/pgx/v5"
)
func UpdateDataWithCopy(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Enable non-atomic mode. This makes the COPY operation non-atomic,
// and allows it to exceed the Spanner mutation limit.
if _, err := conn.Exec(ctx,
"set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
return err
}
// Instruct PGAdapter to use insert-or-update for COPY statements.
// This enables us to use COPY to update data.
if _, err := conn.Exec(ctx, "set spanner.copy_upsert=true"); err != nil {
return err
}
// Create a pipe that can be used to write the data manually that we want to copy.
reader, writer := io.Pipe()
// Write the data to the pipe using a separate goroutine. This allows us to stream the data
// to the COPY operation row-by-row.
go func() error {
for _, record := range []string{"1\t1\t100000\n", "2\t2\t500000\n"} {
if _, err := writer.Write([]byte(record)); err != nil {
return err
}
}
if err := writer.Close(); err != nil {
return err
}
return nil
}()
tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN")
if err != nil {
return err
}
fmt.Printf("Updated %v albums\n", tag.RowsAffected())
return nil
}
Node.js
import { Client } from 'pg';
import { pipeline } from 'node:stream/promises'
import { from as copyFrom } from 'pg-copy-streams'
import {Readable} from "stream";
async function updateDataWithCopy(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// Instruct PGAdapter to use insert-or-update for COPY statements.
// This enables us to use COPY to update existing data.
await connection.query("set spanner.copy_upsert=true");
// Copy data to Spanner using the COPY command.
const copyStream = copyFrom('COPY albums (singer_id, album_id, marketing_budget) FROM STDIN');
const ingestStream = connection.query(copyStream);
// Create a source stream and attach the source to the destination.
const sourceStream = new Readable();
const operation = pipeline(sourceStream, ingestStream);
// Manually push data to the source stream to write data to Spanner.
sourceStream.push("1\t1\t100000\n");
sourceStream.push("2\t2\t500000\n");
// Push a 'null' to indicate the end of the stream.
sourceStream.push(null);
// Wait for the copy operation to finish.
await operation;
console.log(`Updated ${copyStream.rowCount} albums`);
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def update_data_with_copy(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
# Instruct PGAdapter to use insert-or-update for COPY statements.
# This enables us to use COPY to update data.
cur.execute("set spanner.copy_upsert=true")
# COPY uses mutations to insert or update existing data in Spanner.
with cur.copy("COPY albums (singer_id, album_id, marketing_budget) "
"FROM STDIN") as copy:
copy.write_row((1, 1, 100000))
copy.write_row((2, 2, 500000))
print("Updated %d albums" % cur.rowcount)
C#
using Npgsql;
namespace dotnet_snippets;
public static class UpdateDataWithCopySample
{
public static void UpdateDataWithCopy(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
// will succeed even if it exceeds Spanner's mutation limit per transaction.
using var cmd = connection.CreateCommand();
cmd.CommandText = "set spanner.autocommit_dml_mode='partitioned_non_atomic'";
cmd.ExecuteNonQuery();
// Instruct PGAdapter to use insert-or-update for COPY statements.
// This enables us to use COPY to update existing data.
cmd.CommandText = "set spanner.copy_upsert=true";
cmd.ExecuteNonQuery();
// COPY uses mutations to insert or update existing data in Spanner.
using (var albumWriter = connection.BeginTextImport(
"COPY albums (singer_id, album_id, marketing_budget) FROM STDIN"))
{
albumWriter.WriteLine("1\t1\t100000");
albumWriter.WriteLine("2\t2\t500000");
}
Console.WriteLine($"Updated 2 albums");
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./update_data_with_copy.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar update example-db
Go
go run sample_runner.go update example-db
Node.js
npm start update example-db
Python
python update_data_with_copy.py example-db
C#
dotnet run update example-db
您应该会看到:
Updated 2 albums
您也可以执行 SQL 查询来获取刚刚写入的值。
以下是执行查询的代码:
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql -c "SELECT singer_id, album_id, marketing_budget
FROM albums
ORDER BY singer_id, album_id"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
class QueryDataWithNewColumn {
static void queryDataWithNewColumn(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery(
"SELECT singer_id, album_id, marketing_budget "
"FROM albums "
"ORDER BY singer_id, album_id")) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n",
resultSet.getLong("singer_id"),
resultSet.getLong("album_id"),
resultSet.getString("marketing_budget"));
}
}
}
}
}
Go
import (
"context"
"database/sql"
"fmt"
"github.com/jackc/pgx/v5"
)
func QueryDataWithNewColumn(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
rows, err := conn.Query(ctx, "SELECT singer_id, album_id, marketing_budget "
"FROM albums "
"ORDER BY singer_id, album_id")
defer rows.Close()
if err != nil {
return err
}
for rows.Next() {
var singerId, albumId int64
var marketingBudget sql.NullString
err = rows.Scan(&singerId, &albumId, &marketingBudget)
if err != nil {
return err
}
var budget string
if marketingBudget.Valid {
budget = marketingBudget.String
} else {
budget = "NULL"
}
fmt.Printf("%v %v %v\n", singerId, albumId, budget)
}
return rows.Err()
}
Node.js
import { Client } from 'pg';
async function queryDataWithNewColumn(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
const result = await connection.query(
"SELECT singer_id, album_id, marketing_budget "
"FROM albums "
"ORDER BY singer_id, album_id"
);
for (const row of result.rows) {
console.log(`${row["singer_id"]} ${row["album_id"]} ${row["marketing_budget"]}`);
}
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def query_data_with_new_column(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT singer_id, album_id, marketing_budget "
"FROM albums "
"ORDER BY singer_id, album_id")
for album in cur:
print(album)
C#
using Npgsql;
namespace dotnet_snippets;
public static class QueryDataWithNewColumnSample
{
public static void QueryWithNewColumnData(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = new NpgsqlCommand("SELECT singer_id, album_id, marketing_budget "
"FROM albums "
"ORDER BY singer_id, album_id", connection);
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["marketing_budget"]}");
}
}
}
使用以下命令运行查询:
psql
PGDATABASE=example-db ./query_data_with_new_column.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar querymarketingbudget example-db
Go
go run sample_runner.go querymarketingbudget example-db
Node.js
npm start querymarketingbudget example-db
Python
python query_data_with_new_column.py example-db
C#
dotnet run querymarketingbudget example-db
您应该会看到:
1 1 100000
1 2 null
2 1 null
2 2 500000
2 3 null
更新数据
您可以在读写事务中使用 DML 来更新数据。
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql << SQL
-- Transfer marketing budget from one album to another.
-- We do it in a transaction to ensure that the transfer is atomic.
-- Begin a read/write transaction.
begin;
-- Increase the marketing budget of album 1 if album 2 has enough budget.
-- The condition that album 2 has enough budget is guaranteed for the
-- duration of the transaction, as read/write transactions in Spanner use
-- external consistency as the default isolation level.
update albums set
marketing_budget = marketing_budget 200000
where singer_id = 1
and album_id = 1
and exists (
select album_id
from albums
where singer_id = 2
and album_id = 2
and marketing_budget > 200000
);
-- Decrease the marketing budget of album 2.
update albums set
marketing_budget = marketing_budget - 200000
where singer_id = 2
and album_id = 2
and marketing_budget > 200000;
-- Commit the transaction to make the changes to both marketing budgets
-- durably stored in the database and visible to other transactions.
commit;
SQL
echo "Transferred marketing budget from Album 2 to Album 1"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
class UpdateDataWithTransaction {
static void writeWithTransactionUsingDml(String host, int port, String database)
throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Set AutoCommit=false to enable transactions.
connection.setAutoCommit(false);
// Transfer marketing budget from one album to another. We do it in a
// transaction to ensure that the transfer is atomic. There is no need
// to explicitly start the transaction. The first statement on the
// connection will start a transaction when AutoCommit=false.
String selectMarketingBudgetSql =
"SELECT marketing_budget from albums WHERE singer_id = ? and album_id = ?";
long album2Budget = 0;
try (PreparedStatement selectMarketingBudgetStatement =
connection.prepareStatement(selectMarketingBudgetSql)) {
// Bind the query parameters to SingerId=2 and AlbumId=2.
selectMarketingBudgetStatement.setLong(1, 2);
selectMarketingBudgetStatement.setLong(2, 2);
try (ResultSet resultSet = selectMarketingBudgetStatement.executeQuery()) {
while (resultSet.next()) {
album2Budget = resultSet.getLong("marketing_budget");
}
}
// The transaction will only be committed if this condition still holds
// at the time of commit. Otherwise, the transaction will be aborted.
final long transfer = 200000;
if (album2Budget >= transfer) {
long album1Budget = 0;
// Re-use the existing PreparedStatement for selecting the
// marketing_budget to get the budget for Album 1.
// Bind the query parameters to SingerId=1 and AlbumId=1.
selectMarketingBudgetStatement.setLong(1, 1);
selectMarketingBudgetStatement.setLong(2, 1);
try (ResultSet resultSet = selectMarketingBudgetStatement.executeQuery()) {
while (resultSet.next()) {
album1Budget = resultSet.getLong("marketing_budget");
}
}
// Transfer part of the marketing budget of Album 2 to Album 1.
album1Budget = transfer;
album2Budget -= transfer;
String updateSql =
"UPDATE albums "
"SET marketing_budget = ? "
"WHERE singer_id = ? and album_id = ?";
try (PreparedStatement updateStatement = connection.prepareStatement(updateSql)) {
// Update Album 1.
int paramIndex = 0;
updateStatement.setLong( paramIndex, album1Budget);
updateStatement.setLong( paramIndex, 1);
updateStatement.setLong( paramIndex, 1);
// Create a DML batch by calling addBatch
// on the current PreparedStatement.
updateStatement.addBatch();
// Update Album 2 in the same DML batch.
paramIndex = 0;
updateStatement.setLong( paramIndex, album2Budget);
updateStatement.setLong( paramIndex, 2);
updateStatement.setLong( paramIndex, 2);
updateStatement.addBatch();
// Execute both DML statements in one batch.
updateStatement.executeBatch();
}
}
}
// Commit the current transaction.
connection.commit();
System.out.println("Transferred marketing budget from Album 2 to Album 1");
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func WriteWithTransactionUsingDml(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Transfer marketing budget from one album to another. We do it in a
// transaction to ensure that the transfer is atomic.
tx, err := conn.Begin(ctx)
if err != nil {
return err
}
const selectSql = "SELECT marketing_budget "
"from albums "
"WHERE singer_id = $1 and album_id = $2"
// Get the marketing_budget of singer 2 / album 2.
row := tx.QueryRow(ctx, selectSql, 2, 2)
var budget2 int64
if err := row.Scan(&budget2); err != nil {
tx.Rollback(ctx)
return err
}
const transfer = 20000
// The transaction will only be committed if this condition still holds
// at the time of commit. Otherwise, the transaction will be aborted.
if budget2 >= transfer {
// Get the marketing_budget of singer 1 / album 1.
row := tx.QueryRow(ctx, selectSql, 1, 1)
var budget1 int64
if err := row.Scan(&budget1); err != nil {
tx.Rollback(ctx)
return err
}
// Transfer part of the marketing budget of Album 2 to Album 1.
budget1 = transfer
budget2 -= transfer
const updateSql = "UPDATE albums "
"SET marketing_budget = $1 "
"WHERE singer_id = $2 and album_id = $3"
// Start a DML batch and execute it as part of the current transaction.
batch := &pgx.Batch{}
batch.Queue(updateSql, budget1, 1, 1)
batch.Queue(updateSql, budget2, 2, 2)
br := tx.SendBatch(ctx, batch)
_, err = br.Exec()
if err := br.Close(); err != nil {
tx.Rollback(ctx)
return err
}
}
// Commit the current transaction.
tx.Commit(ctx)
fmt.Println("Transferred marketing budget from Album 2 to Album 1")
return nil
}
Node.js
import { Client } from 'pg';
async function writeWithTransactionUsingDml(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Transfer marketing budget from one album to another. We do it in a
// transaction to ensure that the transfer is atomic. node-postgres
// requires you to explicitly start the transaction by executing 'begin'.
await connection.query("begin");
const selectMarketingBudgetSql = "SELECT marketing_budget "
"from albums "
"WHERE singer_id = $1 and album_id = $2";
// Get the marketing_budget of singer 2 / album 2.
const album2BudgetResult = await connection.query(selectMarketingBudgetSql, [2, 2]);
let album2Budget = album2BudgetResult.rows[0]["marketing_budget"];
const transfer = 200000;
// The transaction will only be committed if this condition still holds
// at the time of commit. Otherwise, the transaction will be aborted.
if (album2Budget >= transfer) {
// Get the marketing budget of singer 1 / album 1.
const album1BudgetResult = await connection.query(selectMarketingBudgetSql, [1, 1]);
let album1Budget = album1BudgetResult.rows[0]["marketing_budget"];
// Transfer part of the marketing budget of Album 2 to Album 1.
album1Budget = transfer;
album2Budget -= transfer;
const updateSql = "UPDATE albums "
"SET marketing_budget = $1 "
"WHERE singer_id = $2 and album_id = $3";
// Start a DML batch. This batch will become part of the current transaction.
// TODO: Enable when https://github.com/googleapis/java-spanner/pull/3114 has been merged
// await connection.query("start batch dml");
// Update the marketing budget of both albums.
await connection.query(updateSql, [album1Budget, 1, 1]);
await connection.query(updateSql, [album2Budget, 2, 2]);
// TODO: Enable when https://github.com/googleapis/java-spanner/pull/3114 has been merged
// await connection.query("run batch");
}
// Commit the current transaction.
await connection.query("commit");
console.log("Transferred marketing budget from Album 2 to Album 1");
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def update_data_with_transaction(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
# Set autocommit=False to use transactions.
# The first statement that is executed starts the transaction.
conn.autocommit = False
with conn.cursor() as cur:
# Transfer marketing budget from one album to another.
# We do it in a transaction to ensure that the transfer is atomic.
# There is no need to explicitly start the transaction. The first
# statement on the connection will start a transaction when
# AutoCommit=false.
select_marketing_budget_sql = ("SELECT marketing_budget "
"from albums "
"WHERE singer_id = %s "
"and album_id = %s")
# Get the marketing budget of Album #2.
cur.execute(select_marketing_budget_sql, (2, 2))
album2_budget = cur.fetchone()[0]
transfer = 200000
if album2_budget > transfer:
# Get the marketing budget of Album #1.
cur.execute(select_marketing_budget_sql, (1, 1))
album1_budget = cur.fetchone()[0]
# Transfer the marketing budgets and write the update back
# to the database.
album1_budget = transfer
album2_budget -= transfer
update_sql = ("update albums "
"set marketing_budget = %s "
"where singer_id = %s "
"and album_id = %s")
# Use a pipeline to execute two DML statements in one batch.
with conn.pipeline():
cur.execute(update_sql, (album1_budget, 1, 1,))
cur.execute(update_sql, (album2_budget, 2, 2,))
else:
print("Insufficient budget to transfer")
# Commit the transaction.
conn.commit()
print("Transferred marketing budget from Album 2 to Album 1")
C#
using Npgsql;
using System.Data;
namespace dotnet_snippets;
public static class TagsSample
{
public static void Tags(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Start a transaction with isolation level Serializable.
// Spanner only supports this isolation level. Trying to use a lower
// isolation level (including the default isolation level READ COMMITTED),
// will result in an error.
var transaction = connection.BeginTransaction(IsolationLevel.Serializable);
// Create a command that uses the current transaction.
using var cmd = connection.CreateCommand();
cmd.Transaction = transaction;
// Set the TRANSACTION_TAG session variable to set a transaction tag
// for the current transaction.
cmd.CommandText = "set spanner.transaction_tag='example-tx-tag'";
cmd.ExecuteNonQuery();
// Set the STATEMENT_TAG session variable to set the request tag
// that should be included with the next SQL statement.
cmd.CommandText = "set spanner.statement_tag='query-marketing-budget'";
cmd.ExecuteNonQuery();
// Get the marketing_budget of Album (1,1).
cmd.CommandText = "select marketing_budget from albums where singer_id=$1 and album_id=$2";
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
var marketingBudget = (long?)cmd.ExecuteScalar();
// Reduce the marketing budget by 10% if it is more than 1,000.
if (marketingBudget > 1000L)
{
marketingBudget -= (long) (marketingBudget * 0.1);
// Set the statement tag to use for the update statement.
cmd.Parameters.Clear();
cmd.CommandText = "set spanner.statement_tag='reduce-marketing-budget'";
cmd.ExecuteNonQuery();
cmd.CommandText = "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3";
cmd.Parameters.Add(new NpgsqlParameter { Value = marketingBudget });
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
cmd.ExecuteNonQuery();
}
// Commit the current transaction.
transaction.Commit();
Console.WriteLine("Reduced marketing budget");
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./update_data_with_transaction.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar writewithtransactionusingdml example-db
Go
go run sample_runner.go writewithtransactionusingdml example-db
Node.js
npm start writewithtransactionusingdml example-db
Python
python update_data_with_transaction.py example-db
C#
dotnet run writewithtransactionusingdml example-db
您应该会看到:
Transferred marketing budget from Album 2 to Album 1
交易代码和请求代码
使用事务代码和请求代码
来排查 Spanner 中的事务和查询问题。您可以设置
交易代码以及请求代码与 SPANNER.TRANSACTION_TAG
和
SPANNER.STATEMENT_TAG
会话变量。
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql << SQL
-- Start a transaction.
begin;
-- Set the TRANSACTION_TAG session variable to set a transaction tag
-- for the current transaction. This can only be executed at the start
-- of the transaction.
set spanner.transaction_TAG='example-tx-tag';
-- Set the STATEMENT_TAG session variable to set the request tag
-- that should be included with the next SQL statement.
set spanner.statement_tag='query-marketing-budget';
select marketing_budget
from albums
where singer_id = 1
and album_id = 1;
-- Reduce the marketing budget by 10% if it is more than 1,000.
-- Set a statement tag for the update statement.
set spanner.statement_tag='reduce-marketing-budget';
update albums
set marketing_budget = marketing_budget - (marketing_budget * 0.1)::bigint
where singer_id = 1
and album_id = 1
and marketing_budget > 1000;
commit;
SQL
echo "Reduced marketing budget"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
class Tags {
static void tags(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Set AutoCommit=false to enable transactions.
connection.setAutoCommit(false);
// Set the TRANSACTION_TAG session variable to set a transaction tag
// for the current transaction.
connection.createStatement().execute("set spanner.transaction_tag='example-tx-tag'");
// Set the STATEMENT_TAG session variable to set the request tag
// that should be included with the next SQL statement.
connection.createStatement().execute("set spanner.statement_tag='query-marketing-budget'");
long marketingBudget = 0L;
long singerId = 1L;
long albumId = 1L;
try (PreparedStatement statement =
connection.prepareStatement(
"select marketing_budget from albums where singer_id=? and album_id=?")) {
statement.setLong(1, singerId);
statement.setLong(2, albumId);
try (ResultSet albumResultSet = statement.executeQuery()) {
while (albumResultSet.next()) {
marketingBudget = albumResultSet.getLong(1);
}
}
}
// Reduce the marketing budget by 10% if it is more than 1,000.
final long maxMarketingBudget = 1000L;
final float reduction = 0.1f;
if (marketingBudget > maxMarketingBudget) {
marketingBudget -= (long) (marketingBudget * reduction);
connection.createStatement().execute("set spanner.statement_tag='reduce-marketing-budget'");
try (PreparedStatement statement =
connection.prepareStatement(
"update albums set marketing_budget=? where singer_id=? AND album_id=?")) {
int paramIndex = 0;
statement.setLong( paramIndex, marketingBudget);
statement.setLong( paramIndex, singerId);
statement.setLong( paramIndex, albumId);
statement.executeUpdate();
}
}
// Commit the current transaction.
connection.commit();
System.out.println("Reduced marketing budget");
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func Tags(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
tx, err := conn.Begin(ctx)
if err != nil {
return err
}
// Set the TRANSACTION_TAG session variable to set a transaction tag
// for the current transaction.
_, _ = tx.Exec(ctx, "set spanner.transaction_tag='example-tx-tag'")
// Set the STATEMENT_TAG session variable to set the request tag
// that should be included with the next SQL statement.
_, _ = tx.Exec(ctx, "set spanner.statement_tag='query-marketing-budget'")
row := tx.QueryRow(ctx, "select marketing_budget "
"from albums "
"where singer_id=$1 and album_id=$2", 1, 1)
var budget int64
if err := row.Scan(&budget); err != nil {
tx.Rollback(ctx)
return err
}
// Reduce the marketing budget by 10% if it is more than 1,000.
if budget > 1000 {
budget = int64(float64(budget) - float64(budget)*0.1)
_, _ = tx.Exec(ctx, "set spanner.statement_tag='reduce-marketing-budget'")
if _, err := tx.Exec(ctx, "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3", budget, 1, 1); err != nil {
tx.Rollback(ctx)
return err
}
}
// Commit the current transaction.
tx.Commit(ctx)
fmt.Println("Reduced marketing budget")
return nil
}
Node.js
import { Client } from 'pg';
async function tags(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
await connection.query("begin");
// Set the TRANSACTION_TAG session variable to set a transaction tag
// for the current transaction.
await connection.query("set spanner.transaction_tag='example-tx-tag'");
// Set the STATEMENT_TAG session variable to set the request tag
// that should be included with the next SQL statement.
await connection.query("set spanner.statement_tag='query-marketing-budget'");
const budgetResult = await connection.query(
"select marketing_budget "
"from albums "
"where singer_id=$1 and album_id=$2", [1, 1])
let budget = budgetResult.rows[0]["marketing_budget"];
// Reduce the marketing budget by 10% if it is more than 1,000.
if (budget > 1000) {
budget = budget - budget * 0.1;
await connection.query("set spanner.statement_tag='reduce-marketing-budget'");
await connection.query("update albums set marketing_budget=$1 "
"where singer_id=$2 AND album_id=$3", [budget, 1, 1]);
}
// Commit the current transaction.
await connection.query("commit");
console.log("Reduced marketing budget");
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def tags(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
# Set autocommit=False to enable transactions.
conn.autocommit = False
with conn.cursor() as cur:
# Set the TRANSACTION_TAG session variable to set a transaction tag
# for the current transaction.
cur.execute("set spanner.transaction_TAG='example-tx-tag'")
# Set the STATEMENT_TAG session variable to set the request tag
# that should be included with the next SQL statement.
cur.execute("set spanner.statement_tag='query-marketing-budget'")
singer_id = 1
album_id = 1
cur.execute("select marketing_budget "
"from albums "
"where singer_id = %s "
" and album_id = %s",
(singer_id, album_id,))
marketing_budget = cur.fetchone()[0]
# Reduce the marketing budget by 10% if it is more than 1,000.
max_marketing_budget = 1000
reduction = 0.1
if marketing_budget > max_marketing_budget:
# Make sure the marketing_budget remains an int.
marketing_budget -= int(marketing_budget * reduction)
# Set a statement tag for the update statement.
cur.execute(
"set spanner.statement_tag='reduce-marketing-budget'")
cur.execute("update albums set marketing_budget = %s "
"where singer_id = %s "
" and album_id = %s",
(marketing_budget, singer_id, album_id,))
else:
print("Marketing budget already less than or equal to 1,000")
# Commit the transaction.
conn.commit()
print("Reduced marketing budget")
C#
using Npgsql;
using System.Data;
namespace dotnet_snippets;
public static class TagsSample
{
public static void Tags(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Start a transaction with isolation level Serializable.
// Spanner only supports this isolation level. Trying to use a lower
// isolation level (including the default isolation level READ COMMITTED),
// will result in an error.
var transaction = connection.BeginTransaction(IsolationLevel.Serializable);
// Create a command that uses the current transaction.
using var cmd = connection.CreateCommand();
cmd.Transaction = transaction;
// Set the TRANSACTION_TAG session variable to set a transaction tag
// for the current transaction.
cmd.CommandText = "set spanner.transaction_tag='example-tx-tag'";
cmd.ExecuteNonQuery();
// Set the STATEMENT_TAG session variable to set the request tag
// that should be included with the next SQL statement.
cmd.CommandText = "set spanner.statement_tag='query-marketing-budget'";
cmd.ExecuteNonQuery();
// Get the marketing_budget of Album (1,1).
cmd.CommandText = "select marketing_budget from albums where singer_id=$1 and album_id=$2";
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
var marketingBudget = (long?)cmd.ExecuteScalar();
// Reduce the marketing budget by 10% if it is more than 1,000.
if (marketingBudget > 1000L)
{
marketingBudget -= (long) (marketingBudget * 0.1);
// Set the statement tag to use for the update statement.
cmd.Parameters.Clear();
cmd.CommandText = "set spanner.statement_tag='reduce-marketing-budget'";
cmd.ExecuteNonQuery();
cmd.CommandText = "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3";
cmd.Parameters.Add(new NpgsqlParameter { Value = marketingBudget });
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
cmd.ExecuteNonQuery();
}
// Commit the current transaction.
transaction.Commit();
Console.WriteLine("Reduced marketing budget");
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./tags.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar tags example-db
Go
go run sample_runner.go tags example-db
Node.js
npm start tags example-db
Python
python tags.py example-db
C#
dotnet run tags example-db
使用只读事务检索数据
假设您要在同一时间戳执行多个读取操作。只读事务会遵从一致的事务提交历史记录前缀,因此您的应用始终可获得一致的数据。
将连接设置为只读或使用 SET TRANSACTION READ ONLY
SQL
语句来执行只读事务。
下面演示了如何运行查询并在同一只读事务中执行读取操作:
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
psql << SQL
-- Begin a transaction.
begin;
-- Change the current transaction to a read-only transaction.
-- This statement can only be executed at the start of a transaction.
set transaction read only;
-- The following two queries use the same read-only transaction.
select singer_id, album_id, album_title
from albums
order by singer_id, album_id;
select singer_id, album_id, album_title
from albums
order by album_title;
-- Read-only transactions must also be committed or rolled back to mark
-- the end of the transaction. There is no semantic difference between
-- rolling back or committing a read-only transaction.
commit;
SQL
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
class ReadOnlyTransaction {
static void readOnlyTransaction(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Set AutoCommit=false to enable transactions.
connection.setAutoCommit(false);
// This SQL statement instructs the JDBC driver to use
// a read-only transaction.
connection.createStatement().execute("set transaction read only");
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery(
"SELECT singer_id, album_id, album_title "
"FROM albums "
"ORDER BY singer_id, album_id")) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n",
resultSet.getLong("singer_id"),
resultSet.getLong("album_id"),
resultSet.getString("album_title"));
}
}
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery(
"SELECT singer_id, album_id, album_title "
"FROM albums "
"ORDER BY album_title")) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n",
resultSet.getLong("singer_id"),
resultSet.getLong("album_id"),
resultSet.getString("album_title"));
}
}
// End the read-only transaction by calling commit().
connection.commit();
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func ReadOnlyTransaction(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Start a read-only transaction by supplying additional transaction options.
tx, err := conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
albumsOrderedById, err := tx.Query(ctx, "SELECT singer_id, album_id, album_title FROM albums ORDER BY singer_id, album_id")
defer albumsOrderedById.Close()
if err != nil {
return err
}
for albumsOrderedById.Next() {
var singerId, albumId int64
var title string
err = albumsOrderedById.Scan(&singerId, &albumId, &title)
if err != nil {
return err
}
fmt.Printf("%v %v %v\n", singerId, albumId, title)
}
albumsOrderedTitle, err := tx.Query(ctx, "SELECT singer_id, album_id, album_title FROM albums ORDER BY album_title")
defer albumsOrderedTitle.Close()
if err != nil {
return err
}
for albumsOrderedTitle.Next() {
var singerId, albumId int64
var title string
err = albumsOrderedTitle.Scan(&singerId, &albumId, &title)
if err != nil {
return err
}
fmt.Printf("%v %v %v\n", singerId, albumId, title)
}
// End the read-only transaction by calling Commit().
return tx.Commit(ctx)
}
Node.js
import { Client } from 'pg';
async function readOnlyTransaction(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Start a transaction.
await connection.query("begin");
// This SQL statement instructs the PGAdapter to make it a read-only transaction.
await connection.query("set transaction read only");
const albumsOrderById = await connection.query(
"SELECT singer_id, album_id, album_title "
"FROM albums "
"ORDER BY singer_id, album_id");
for (const row of albumsOrderById.rows) {
console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
}
const albumsOrderByTitle = await connection.query(
"SELECT singer_id, album_id, album_title "
"FROM albums "
"ORDER BY album_title");
for (const row of albumsOrderByTitle.rows) {
console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
}
// End the read-only transaction by executing commit.
await connection.query("commit");
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def read_only_transaction(host: string, port: int, database: string):
with (psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn):
# Set autocommit=False to enable transactions.
conn.autocommit = False
with conn.cursor() as cur:
# Change the current transaction to a read-only transaction.
# This statement can only be executed at the start of a transaction.
cur.execute("set transaction read only")
# The following two queries use the same read-only transaction.
cur.execute("select singer_id, album_id, album_title "
"from albums "
"order by singer_id, album_id")
for album in cur:
print(album)
cur.execute("select singer_id, album_id, album_title "
"from albums "
"order by album_title")
for album in cur:
print(album)
# Read-only transactions must also be committed or rolled back to mark
# the end of the transaction. There is no semantic difference between
# rolling back or committing a read-only transaction.
conn.commit()
C#
using Npgsql;
using System.Data;
namespace dotnet_snippets;
public static class ReadOnlyTransactionSample
{
public static void ReadOnlyTransaction(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Start a read-only transaction.
// You must specify Serializable as the isolation level, as the npgsql driver
// will otherwise automatically set the isolation level to read-committed.
var transaction = connection.BeginTransaction(IsolationLevel.Serializable);
using var cmd = connection.CreateCommand();
cmd.Transaction = transaction;
// This SQL statement instructs the npgsql driver to use
// a read-only transaction.
cmd.CommandText = "set transaction read only";
cmd.ExecuteNonQuery();
cmd.CommandText = "SELECT singer_id, album_id, album_title "
"FROM albums "
"ORDER BY singer_id, album_id";
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["album_title"]}");
}
}
cmd.CommandText = "SELECT singer_id, album_id, album_title "
"FROM albums "
"ORDER BY album_title";
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["album_title"]}");
}
}
// End the read-only transaction by calling commit().
transaction.Commit();
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./read_only_transaction.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar readonlytransaction example-db
Go
go run sample_runner.go readonlytransaction example-db
Node.js
npm start readonlytransaction example-db
Python
python read_only_transaction.py example-db
C#
dotnet run readonlytransaction example-db
您看到的输出结果应该类似于以下内容:
1 1 Total Junk
1 2 Go, Go, Go
2 1 Green
2 2 Forever Hold Your Peace
2 3 Terrified
2 2 Forever Hold Your Peace
1 2 Go, Go, Go
2 1 Green
2 3 Terrified
1 1 Total Junk
分区查询和 Data Boost
partitionQuery
API 将查询划分为较小的部分或分区,并使用多个
并行提取分区。每个分区都由
分区标记。PartitionQuery API 的延迟时间比标准
Query API,因为它仅适用于批量操作,例如导出或
扫描整个数据库。
数据提升 执行分析查询和数据导出,费用接近于零 对已预配 Spanner 实例上现有工作负载的影响。 Data Boost 仅支持分区查询。
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# 'set spanner.data_boost_enabled=true' enables Data Boost for
# all partitioned queries on this connection.
# 'run partitioned query' is a shortcut for partitioning the query
# that follows and executing each of the partitions that is returned
# by Spanner.
psql -c "set spanner.data_boost_enabled=true" \
-c "run partitioned query
select singer_id, first_name, last_name
from singers"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
class DataBoost {
static void dataBoost(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// This enables Data Boost for all partitioned queries on this connection.
connection.createStatement().execute("set spanner.data_boost_enabled=true");
// Run a partitioned query. This query will use Data Boost.
try (ResultSet resultSet =
connection
.createStatement()
.executeQuery(
"run partitioned query "
"select singer_id, first_name, last_name "
"from singers")) {
while (resultSet.next()) {
System.out.printf(
"%d %s %s\n",
resultSet.getLong("singer_id"),
resultSet.getString("first_name"),
resultSet.getString("last_name"));
}
}
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func DataBoost(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// This enables Data Boost for all partitioned queries on this connection.
_, _ = conn.Exec(ctx, "set spanner.data_boost_enabled=true")
// Run a partitioned query. This query will use Data Boost.
rows, err := conn.Query(ctx, "run partitioned query select singer_id, first_name, last_name from singers")
defer rows.Close()
if err != nil {
return err
}
for rows.Next() {
var singerId int64
var firstName, lastName string
err = rows.Scan(&singerId, &firstName, &lastName)
if err != nil {
return err
}
fmt.Printf("%v %v %v\n", singerId, firstName, lastName)
}
return nil
}
Node.js
import { Client } from 'pg';
async function dataBoost(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// This enables Data Boost for all partitioned queries on this connection.
await connection.query("set spanner.data_boost_enabled=true");
// Run a partitioned query. This query will use Data Boost.
const singers = await connection.query(
"run partitioned query "
"select singer_id, first_name, last_name "
"from singers");
for (const row of singers.rows) {
console.log(`${row["singer_id"]} ${row["first_name"]} ${row["last_name"]}`);
}
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def data_boost(host: string, port: int, database: string):
with (psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn):
# Set autocommit=True so each query uses a separate transaction.
conn.autocommit = True
with conn.cursor() as cur:
# This enables Data Boost for all partitioned queries on this
# connection.
cur.execute("set spanner.data_boost_enabled=true")
# Run a partitioned query. This query will use Data Boost.
cur.execute("run partitioned query "
"select singer_id, first_name, last_name "
"from singers")
for singer in cur:
print(singer)
C#
using Npgsql;
namespace dotnet_snippets;
public static class DataBoostSample
{
public static void DataBoost(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = connection.CreateCommand();
// This enables Data Boost for all partitioned queries on this connection.
cmd.CommandText = "set spanner.data_boost_enabled=true";
cmd.ExecuteNonQuery();
// Run a partitioned query. This query will use Data Boost.
cmd.CommandText = "run partitioned query "
"select singer_id, first_name, last_name "
"from singers";
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
Console.WriteLine($"{reader["singer_id"]} {reader["first_name"]} {reader["last_name"]}");
}
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./data_boost.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar databoost example-db
Go
go run sample_runner.go databoost example-db
Node.js
npm start databoost example-db
Python
python data_boost.py example-db
C#
dotnet run databoost example-db
如需详细了解如何运行分区查询以及如何搭配使用 Data Boost 和 PGAdapter,请参阅: Data Boost 和分区查询语句
分区 DML
分区数据操纵语言 (DML) 是 适用于以下类型的批量更新和删除操作:
- 定期清理和垃圾回收。
- 使用默认值回填新列。
psql
#!/bin/bash
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"
# Change the DML mode that is used by this connection to Partitioned
# DML. Partitioned DML is designed for bulk updates and deletes.
# See https://cloud.google.com/spanner/docs/dml-partitioned for more
# information.
psql -c "set spanner.autocommit_dml_mode='partitioned_non_atomic'" \
-c "update albums
set marketing_budget=0
where marketing_budget is null"
echo "Updated albums using Partitioned DML"
Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
class PartitionedDml {
static void partitionedDml(String host, int port, String database) throws SQLException {
String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Enable Partitioned DML on this connection.
connection
.createStatement()
.execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// Back-fill a default value for the MarketingBudget column.
long lowerBoundUpdateCount =
connection
.createStatement()
.executeUpdate("update albums set marketing_budget=0 where marketing_budget is null");
System.out.printf("Updated at least %d albums\n", lowerBoundUpdateCount);
}
}
}
Go
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
)
func PartitionedDML(host string, port int, database string) error {
ctx := context.Background()
connString := fmt.Sprintf(
"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
host, port, database)
conn, err := pgx.Connect(ctx, connString)
if err != nil {
return err
}
defer conn.Close(ctx)
// Enable Partitioned DML on this connection.
if _, err := conn.Exec(ctx, "set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
return err
}
// Back-fill a default value for the MarketingBudget column.
tag, err := conn.Exec(ctx, "update albums set marketing_budget=0 where marketing_budget is null")
if err != nil {
return err
}
fmt.Printf("Updated at least %v albums\n", tag.RowsAffected())
return nil
}
Node.js
import { Client } from 'pg';
async function partitionedDml(host: string, port: number, database: string): Promise<void> {
const connection = new Client({
host: host,
port: port,
database: database,
});
await connection.connect();
// Enable Partitioned DML on this connection.
await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
// Back-fill a default value for the MarketingBudget column.
const lowerBoundUpdateCount = await connection.query(
"update albums "
"set marketing_budget=0 "
"where marketing_budget is null");
console.log(`Updated at least ${lowerBoundUpdateCount.rowCount} albums`);
// Close the connection.
await connection.end();
}
Python
import string
import psycopg
def execute_partitioned_dml(host: string, port: int, database: string):
with psycopg.connect("host={host} port={port} dbname={database} "
"sslmode=disable".format(host=host,
port=port,
database=database)) as conn:
conn.autocommit = True
with conn.cursor() as cur:
# Change the DML mode that is used by this connection to Partitioned
# DML. Partitioned DML is designed for bulk updates and deletes.
# See https://cloud.google.com/spanner/docs/dml-partitioned for more
# information.
cur.execute(
"set spanner.autocommit_dml_mode='partitioned_non_atomic'")
# The following statement will use Partitioned DML.
cur.execute("update albums "
"set marketing_budget=0 "
"where marketing_budget is null")
print("Updated at least %d albums" % cur.rowcount)
C#
using Npgsql;
namespace dotnet_snippets;
public static class PartitionedDmlSample
{
public static void PartitionedDml(string host, int port, string database)
{
var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
// Enable Partitioned DML on this connection.
using var cmd = connection.CreateCommand();
cmd.CommandText = "set spanner.autocommit_dml_mode='partitioned_non_atomic'";
cmd.ExecuteNonQuery();
// Back-fill a default value for the MarketingBudget column.
cmd.CommandText = "update albums set marketing_budget=0 where marketing_budget is null";
var lowerBoundUpdateCount = cmd.ExecuteNonQuery();
Console.WriteLine($"Updated at least {lowerBoundUpdateCount} albums");
}
}
使用以下命令运行该示例:
psql
PGDATABASE=example-db ./partitioned_dml.sh
Java
java -jar target/pgadapter-snippets/pgadapter-samples.jar partitioneddml example-db
Go
go run sample_runner.go partitioneddml example-db
Node.js
npm start partitioneddml example-db
Python
python partitioned_dml.py example-db
C#
dotnet run datpartitioneddmlboost example-db
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生额外费用,请删除数据库和您创建的实例。
删除数据库
如果您删除一个实例,则该实例中的所有数据库都会自动删除。 本步骤演示了如何在不删除实例的情况下删除数据库(您仍需为该实例付费)。
在命令行中
gcloud spanner databases delete example-db --instance=test-instance
使用 Google Cloud 控制台
前往 Google Cloud 控制台中的 Spanner 实例页面。
点击实例。
点击您想删除的数据库。
在数据库详细信息页面中,点击删除。
确认您要删除数据库并点击删除。
删除实例
删除实例会自动删除在该实例中创建的所有数据库。
在命令行中
gcloud spanner instances delete test-instance
使用 Google Cloud 控制台
前往 Google Cloud 控制台中的 Spanner 实例页面。
点击您的实例。
点击删除。
确认您要删除实例并点击删除。
后续步骤
了解如何通过虚拟机实例访问 Spanner。
要了解授权和身份验证凭据,请参阅进行身份验证 使用客户端库的 Cloud 服务。
详细了解 Spanner 架构设计最佳实践。