Streaming LIST data over ADBC #2066

Closed
danielballan opened this issue Aug 7, 2024 · 7 comments · Fixed by #2153
Labels
Type: question Usage question

Comments

danielballan commented Aug 7, 2024

What would you like help with?

Hello @paleolimbot, we briefly spoke at SciPy a month ago. @cpcloud and @gforsyth sent me your way.

My colleague @skarakuzu and I have the following application:

Science Experiment ---> ~10Hz rows of data over HTTP ---> Database

In our use case, we are interested in reading partial datasets while an experiment is still ongoing, but we are not especially sensitive to real-time access; a little lag is acceptable. We would like to use Postgres for facility-scale "production" and SQLite for small deployments and dev/test.
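The pattern described above (rows trickling in, readers tolerating a little lag) can be sketched with a small buffered-ingest loop. This is a hypothetical illustration using the stdlib sqlite3 module as a stand-in for the ADBC driver; the names `ingest` and `BATCH_SIZE` are ours, not part of any API.

```python
import sqlite3

# Sketch of the buffered-ingest pattern: rows arrive one at a time
# (~10 Hz) and are flushed to the database in batches, so readers see
# partial data with only a small, bounded lag.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE example (A INTEGER, B REAL)")

buffer = []
BATCH_SIZE = 3  # hypothetical flush threshold

def ingest(row):
    buffer.append(row)
    if len(buffer) >= BATCH_SIZE:
        conn.executemany("INSERT INTO example VALUES (?, ?)", buffer)
        conn.commit()
        buffer.clear()

for row in [(1, 10.0), (2, 20.0), (3, 30.0), (4, 40.0)]:
    ingest(row)

# The first three rows have been flushed; the fourth is still buffered.
print(conn.execute("SELECT COUNT(*) FROM example").fetchone()[0])
```

In the real application the flush would call `cur.adbc_ingest(..., mode="create_append")` with a pyarrow Table instead of `executemany`.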

We think it makes sense to use ADBC to get the data into Postgres and SQLite.

Most of our data is simple tabular data with basic types. This is a simple sketch that works:

import os

import pandas
import pyarrow

import adbc_driver_postgresql.dbapi
import adbc_driver_sqlite.dbapi


conn = adbc_driver_sqlite.dbapi.connect("test.sqlite")
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")


dfs = [
    pandas.DataFrame({"A": [1, 2, 3], "B": [10., 20., 30.]}),
    pandas.DataFrame({"A": [4, 5, 6], "B": [40., 50., 60.]}),
]

# Create a table and append to it.
with conn.cursor() as cur:
    for df in dfs:
        table = pyarrow.Table.from_pandas(df)
        cur.adbc_ingest("example", table, mode="create_append")
        cur.execute("SELECT * FROM example")
        print(cur.fetchall())
conn.close()

In rare cases, we have detectors that produce a small "waveform" of data, a small variable-length list, alongside other columns of scalar readings, like this:

dfs = [
    pandas.DataFrame({"A": [[1, 2, 3], [1, 2], [1, 2, 3, 4]], "B": [10., 20., 30.]}),
    pandas.DataFrame({"A": [[4, 5, 6], [4, 5, 6, 7], [4]], "B": [40., 50., 60.]}),
]

Here is a complete runnable example with this data:

import os

import pandas
import pyarrow

import adbc_driver_postgresql.dbapi
import adbc_driver_sqlite.dbapi


conn = adbc_driver_sqlite.dbapi.connect("test.sqlite")
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")


dfs = [
    pandas.DataFrame({"A": [[1, 2, 3], [1, 2], [1, 2, 3, 4]], "B": [10., 20., 30.]}),
    pandas.DataFrame({"A": [[4, 5, 6], [4, 5, 6, 7], [4]], "B": [40., 50., 60.]}),
]

# Create a table and append to it.
with conn.cursor() as cur:
    for df in dfs:
        table = pyarrow.Table.from_pandas(df)
        cur.adbc_ingest("example", table, mode="create_append")
        cur.execute("SELECT * FROM example")
        print(cur.fetchall())
conn.close()

As you would expect, this fails with a clear error message:

adbc_driver_manager.NotSupportedError: NOT_IMPLEMENTED: Column 0 has unsupported type list

It looks like support for lists was recently added for Postgres in #1962 but not yet released. We have tried without success to cobble together a C example inserting lists into PostgreSQL on main. I wonder if you could sketch a short runnable example to help us get started. (Let us know if you want to see what we've tried.) We would be happy to contribute to the C documentation for ADBC once we get something working.

Once #1962 is released, should we expect lists to work from Python as well, since it wraps the C/C++ implementation, or are more changes needed on the Python side to make that work?

Finally, we're interested in adding support for lists in SQLite. As SQLite is loose about types, perhaps one reasonable way to do this would be with a JSON column. I wonder what you think of that approach.
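The JSON-column idea above can be sketched with the stdlib json and sqlite3 modules, as a stand-in for what the ADBC SQLite driver might do internally. This is a hypothetical illustration, not the driver's actual behavior: each variable-length list is serialized to a JSON string stored in a TEXT column, then decoded on read.

```python
import json
import sqlite3

# Each variable-length list in column A is serialized to a JSON string,
# so loosely-typed SQLite can store it in an ordinary TEXT column.
rows = [
    ([1, 2, 3], 10.0),
    ([1, 2], 20.0),
    ([1, 2, 3, 4], 30.0),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE example (A TEXT, B REAL)")
conn.executemany(
    "INSERT INTO example VALUES (?, ?)",
    [(json.dumps(a), b) for a, b in rows],
)

# Decode the JSON strings back into Python lists on the way out.
decoded = [
    (json.loads(a), b) for a, b in conn.execute("SELECT A, B FROM example")
]
print(decoded)
```

A side benefit of this representation is that SQLite builds with the JSON1 extension can query inside the lists with functions like json_extract.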

@danielballan danielballan added the Type: question Usage question label Aug 7, 2024

paleolimbot commented Aug 8, 2024

I find installing the development ADBC Python packages rather difficult; however, I do have a build set up and ran your example (thanks!) against the postgres driver at main, and it does indeed fail. That error is coming from a previous implementation of ingestion that doesn't use COPY; the COPY writer is where the feature was added in the PR you linked. It may also be that a recent PR of mine resulted in the COPY path not getting picked up.

import os

import pandas
import pyarrow

import adbc_driver_postgresql.dbapi

# From the apache-adbc checkout, run docker compose up postgres-test
conn = adbc_driver_postgresql.dbapi.connect("postgresql://localhost:5432/postgres?user=postgres&password=password")
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")

dfs = [
    pandas.DataFrame({"A": [[1, 2, 3], [1, 2], [1, 2, 3, 4]], "B": [10., 20., 30.]}),
    pandas.DataFrame({"A": [[4, 5, 6], [4, 5, 6, 7], [4]], "B": [40., 50., 60.]}),
]

# Create a table and append to it.
with conn.cursor() as cur:
    for df in dfs:
        table = pyarrow.Table.from_pandas(df)
        cur.adbc_ingest("example", table, mode="create_append")
        cur.execute("SELECT * FROM example")
        print(cur.fetchall())
conn.close()
#> Traceback
#> ...
#> NotSupportedError: NOT_IMPLEMENTED: [libpq] Field #1 ('A') has unsupported type for ingestion list

Reproducer in R:

library(adbcdrivermanager)
#> Warning: package 'adbcdrivermanager' was built under R version 4.3.3

con <- adbc_database_init(
  adbcpostgresql::adbcpostgresql(),
  uri = "postgresql://localhost:5432/postgres?user=postgres&password=password"
) |> 
  adbc_connection_init()

df <- tibble::tibble(
  A = vctrs::list_of(1:3, 4:5, 6:10),
  B = c(10.0, 20.0, 30.0)
)

con |> 
  execute_adbc("DROP TABLE IF EXISTS table_with_list")

df |> 
  write_adbc(con, "table_with_list")
#> Error in adbc_statement_execute_query(stmt): NOT_IMPLEMENTED: [libpq] Field #1 ('A') has unsupported type for ingestion list

Created on 2024-08-07 with reprex v2.1.0

@paleolimbot (Member)

Hmm, I'm wondering if this call to SetParamTypes() isn't needed anymore:

RAISE_ADBC(bind_stream.SetParamTypes(*type_resolver_, error));

@skarakuzu

Hello @paleolimbot,

Thank you very much for your response. In addition to Python, we also created a C example by following the code and the tests in the C repo. We were able to create a table and read from it when the table does not contain lists. However, the code crashes with a segmentation fault if there is any list ingestion. All the queries run without throwing any error, but no table is created in the PostgreSQL database. The code crashes at the stream.get_schema(&stream, &schema_rep); line. I am a bit new to arrow-adbc and nanoarrow, so I wanted to ask if there is any step I am missing in the process.

I also tried commenting out the line you pointed to and rebuilding, but it did not fix the problem.
We appreciate any help and suggestions. Thanks in advance!


#include <adbc.h>
#include <nanoarrow.h>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

int main() {
  AdbcError error{};

  AdbcDatabase database = {};
  AdbcDatabaseNew(&database, &error);
  AdbcDatabaseSetOption(&database, "driver", "adbc_driver_postgresql", &error);
  AdbcDatabaseSetOption(&database, "uri", "postgresql://localhost:5432/postgres", &error);
  AdbcDatabaseInit(&database, &error);

  /// Creating a Connection
  AdbcConnection connection = {};
  AdbcConnectionNew(&connection, &error);
  AdbcConnectionInit(&connection, &database, &error);

  struct ArrowSchema schema;
  struct ArrowArray batch;
  static struct ArrowError global_error;

  ArrowSchemaInit(&schema);
  ArrowSchemaSetTypeStruct(&schema, 2);
  ArrowSchemaInit(schema.children[0]);
  ArrowSchemaSetTypeFixedSize(schema.children[0], NANOARROW_TYPE_FIXED_SIZE_LIST, 2);
  ArrowSchemaInit(schema.children[1]);
  ArrowSchemaSetTypeFixedSize(schema.children[1], NANOARROW_TYPE_FIXED_SIZE_LIST, 2);

  ArrowSchemaSetName(schema.children[0], "index");
  ArrowSchemaSetName(schema.children[1], "create");

  ArrowSchemaSetType(schema.children[0]->children[0], NANOARROW_TYPE_INT64);
  ArrowSchemaSetType(schema.children[1]->children[0], NANOARROW_TYPE_STRING);

  std::vector<std::vector<int>> v1{{42, 43}, {-42, -43}};
  std::vector<std::vector<std::string>> v2{{"foo", "foo1"}, {"bar", "bar1"}};

  ArrowArrayInitFromSchema(&batch, &schema, &global_error);

  ArrowArrayStartAppending(&batch);

  for (size_t i = 0; i < v1.size(); i++) {
    for (size_t j = 0; j < v1[i].size(); j++) {
      ArrowArrayAppendInt(batch.children[0]->children[0], v1[i][j]);
    }
    ArrowArrayFinishElement(batch.children[0]);
    ArrowArrayFinishElement(&batch);
  }
  for (size_t i = 0; i < v2.size(); i++) {
    for (size_t j = 0; j < v2[i].size(); j++) {
      ArrowArrayAppendString(
          batch.children[1]->children[0],
          ArrowStringView{v2[i][j].c_str(), (int64_t)strlen(v2[i][j].c_str())});
    }
    ArrowArrayFinishElement(batch.children[1]);
    ArrowArrayFinishElement(&batch);
  }

  batch.children[0]->length = batch.children[0]->children[0]->length;
  batch.length = batch.children[0]->length;
  ArrowArrayFinishBuildingDefault(&batch, &global_error);

  // Create Stream
  struct ArrowArrayStream stream {};

  // Creating a Statement
  struct AdbcStatement statement;
  int64_t rows_affected = -1;

  // Drop table if exists
  AdbcStatementNew(&connection, &statement, &error);
  std::string query = "DROP TABLE IF EXISTS \"bulk_ingest\"";
  AdbcStatementSetSqlQuery(&statement, query.c_str(), &error);
  AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error);
  // End of dropping the table if exists

  // Start the table
  AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest",
                         &error);
  AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
                         ADBC_INGEST_OPTION_MODE_CREATE_APPEND, &error);

  AdbcStatementBind(&statement, &batch, &schema, &error);
  AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error);

  std::cout << "0 . rows affected: " << rows_affected << std::endl;
  AdbcStatementSetSqlQuery(&statement, "SELECT * FROM \"bulk_ingest\"", &error);
  AdbcStatementExecuteQuery(&statement, &stream, &rows_affected, &error);

  std::cout << "1 . rows affected: " << rows_affected << std::endl;

  struct ArrowSchema schema_rep = {};
  stream.get_schema(&stream, &schema_rep);

  // ... Some other code for post processing

  AdbcConnectionRelease(&connection, &error);
  AdbcDatabaseRelease(&database, &error);

  return 0;
}


@paleolimbot (Member)

Thanks for this! I'll go through the C example in a moment, but I played with this a tiny bit and there is also the place where we serialize the type name (which can probably be moved to the PostgresType if it's not already there). In general, using ADBC from C or C++ at the moment requires a lot of manual memory management and can be difficult to get right.

SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
         static_cast<uint64_t>(i + 1), " ('", source_schema.children[i]->name,
         "') has unsupported type for ingestion ",
         ArrowTypeString(source_schema_fields[i].type));

I also tried shuffling some pieces around to see if we could use the "append" mode (i.e., issue the CREATE TABLE manually), but when I do this I still get an invalid COPY binary error. I'll put all this in a PR but wanted to document it while it was fresh on my mind!

@danielballan (Author)

Hello @paleolimbot. Just giving this a ping to express our continued interest in this topic and willingness to contribute if we can.

@paleolimbot (Member)

Thank you for the ping! I was unfortunately on PTO for the last week before the forthcoming release but will be back next week and will make sure a fix for this is included in the next release (~6 weeks).

@danielballan (Author)

Thanks! That is helpful for our planning. I hope you had some enjoyable days off.

@github-actions github-actions bot added this to the ADBC Libraries 15 milestone Sep 11, 2024
paleolimbot added a commit that referenced this issue Sep 12, 2024
…greSQL (#2153)

This PR adds list ingest support in the PostgreSQL driver. The raw COPY
and type support had already been added; however, there were still some
hard-coded switch-on-type that implemented some necessary features
(e.g., the CREATE TABLE syntax for each type). I moved these to the
type/copy section and removed the previous implementation (which should
also help for a future PR implementing the ability to bind nested types
as parameters).

I also added the infrastructure to test this in all drivers via the
validation suite, which required modifying some of the built-in concepts
to accommodate a nested type.

Closes #2066.

``` r
library(adbcdrivermanager)

con <- adbc_database_init(
  adbcpostgresql::adbcpostgresql(),
  uri = "postgresql://localhost:5432/postgres?user=postgres&password=password"
) |> 
  adbc_connection_init()

df <- tibble::tibble(
  A = vctrs::list_of(1:3, 4:5, 6:10),
  B = c(10.0, 20.0, 30.0)
)

con |> 
  execute_adbc("DROP TABLE IF EXISTS table_with_list")

df |> 
  write_adbc(con, "table_with_list")

con |> 
  read_adbc("select * from table_with_list") |> 
  tibble::as_tibble() |> 
  dplyr::pull(1)
#> <list_of<integer>[3]>
#> [[1]]
#> [1] 1 2 3
#> 
#> [[2]]
#> [1] 4 5
#> 
#> [[3]]
#> [1]  6  7  8  9 10
```

<sup>Created on 2024-09-11 with [reprex
v2.1.1](https://reprex.tidyverse.org)</sup>