From 92ebf4afe5d55135b3ba39616bcb77106c07c597 Mon Sep 17 00:00:00 2001 From: Heyang Zhou Date: Wed, 22 Mar 2023 12:13:24 +0800 Subject: [PATCH] feat(ext/kv): key-value store (#18232) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds an unstable "Deno.openKv()" API that allows opening a key-value database at a specified path. --------- Co-authored-by: Luca Casonato Co-authored-by: Bartek Iwańczuk --- Cargo.lock | 15 + Cargo.toml | 3 + cli/build.rs | 5 + cli/tests/unit/kv_test.ts | 933 +++++++++++++++++++++++++++++ cli/tests/unit/test_util.ts | 1 + cli/tsc/dts/lib.deno.unstable.d.ts | 499 +++++++++++++++ ext/kv/01_db.ts | 469 +++++++++++++++ ext/kv/Cargo.toml | 24 + ext/kv/codec.rs | 559 +++++++++++++++++ ext/kv/interface.rs | 294 +++++++++ ext/kv/lib.rs | 541 +++++++++++++++++ ext/kv/sqlite.rs | 348 +++++++++++ ext/node/Cargo.toml | 2 +- runtime/Cargo.toml | 2 + runtime/build.rs | 22 + runtime/js/90_deno_ns.js | 5 + runtime/lib.rs | 1 + runtime/permissions/mod.rs | 12 + runtime/web_worker.rs | 5 + runtime/worker.rs | 7 + serde_v8/de.rs | 6 +- serde_v8/error.rs | 1 + serde_v8/lib.rs | 1 + serde_v8/magic/any_value.rs | 66 ++ serde_v8/magic/mod.rs | 1 + serde_v8/payload.rs | 3 + serde_v8/ser.rs | 8 + 27 files changed, 3831 insertions(+), 2 deletions(-) create mode 100644 cli/tests/unit/kv_test.ts create mode 100644 ext/kv/01_db.ts create mode 100644 ext/kv/Cargo.toml create mode 100644 ext/kv/codec.rs create mode 100644 ext/kv/interface.rs create mode 100644 ext/kv/lib.rs create mode 100644 ext/kv/sqlite.rs create mode 100644 serde_v8/magic/any_value.rs diff --git a/Cargo.lock b/Cargo.lock index c31ad0ffef1d93..d73f2bc850e5f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1078,6 +1078,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "deno_kv" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.13.1", + "deno_core", + "hex", + "num-bigint", + "rusqlite", + "serde", +] + [[package]] name 
= "deno_lint" version = "0.43.0" @@ -1188,6 +1202,7 @@ dependencies = [ "deno_fs", "deno_http", "deno_io", + "deno_kv", "deno_napi", "deno_net", "deno_node", diff --git a/Cargo.toml b/Cargo.toml index 8d43b9338e585f..5a5257362ea577 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "ext/fs", "ext/http", "ext/io", + "ext/kv", "ext/net", "ext/node", "ext/url", @@ -67,6 +68,7 @@ deno_http = { version = "0.88.0", path = "./ext/http" } deno_io = { version = "0.3.0", path = "./ext/io" } deno_net = { version = "0.85.0", path = "./ext/net" } deno_node = { version = "0.30.0", path = "./ext/node" } +deno_kv = { version = "0.1.0", path = "./ext/kv" } deno_tls = { version = "0.80.0", path = "./ext/tls" } deno_url = { version = "0.93.0", path = "./ext/url" } deno_web = { version = "0.124.0", path = "./ext/web" } @@ -91,6 +93,7 @@ encoding_rs = "=0.8.31" flate2 = "=1.0.24" fs3 = "0.5.0" futures = "0.3.21" +hex = "0.4" http = "0.2.9" hyper = "0.14.18" indexmap = { version = "1.9.2", features = ["serde"] } diff --git a/cli/build.rs b/cli/build.rs index ecd7ed1bea00f6..a4f8ee92d1dc67 100644 --- a/cli/build.rs +++ b/cli/build.rs @@ -8,6 +8,7 @@ use deno_core::Extension; use deno_core::ExtensionFileSource; use deno_core::ExtensionFileSourceCode; use deno_runtime::deno_cache::SqliteBackedCache; +use deno_runtime::deno_kv::sqlite::SqliteDbHandler; use deno_runtime::permissions::PermissionsContainer; use deno_runtime::*; @@ -353,6 +354,10 @@ fn create_cli_snapshot(snapshot_path: PathBuf) { None, ), deno_tls::deno_tls::init_ops(), + deno_kv::deno_kv::init_ops( + SqliteDbHandler::::new(None), + false, // No --unstable. + ), deno_napi::deno_napi::init_ops::(), deno_http::deno_http::init_ops(), deno_io::deno_io::init_ops(Default::default()), diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts new file mode 100644 index 00000000000000..7bb4656c1bb3c2 --- /dev/null +++ b/cli/tests/unit/kv_test.ts @@ -0,0 +1,933 @@ +// Copyright 2018-2023 the Deno authors. 
All rights reserved. MIT license. +import { + assert, + assertEquals, + AssertionError, + assertRejects, + assertThrows, +} from "./test_util.ts"; + +function dbTest(name: string, fn: (db: Deno.Kv) => Promise) { + Deno.test({ + name, + async fn() { + const db: Deno.Kv = await Deno.openKv( + ":memory:", + ); + try { + await fn(db); + } finally { + await db.close(); + } + }, + }); +} + +dbTest("basic read-write-delete and versionstamps", async (db) => { + const result1 = await db.get(["a"]); + assertEquals(result1.key, ["a"]); + assertEquals(result1.value, null); + assertEquals(result1.versionstamp, null); + + await db.set(["a"], "b"); + const result2 = await db.get(["a"]); + assertEquals(result2.key, ["a"]); + assertEquals(result2.value, "b"); + assertEquals(result2.versionstamp, "00000000000000010000"); + + await db.set(["a"], "c"); + const result3 = await db.get(["a"]); + assertEquals(result3.key, ["a"]); + assertEquals(result3.value, "c"); + assertEquals(result3.versionstamp, "00000000000000020000"); + + await db.delete(["a"]); + const result4 = await db.get(["a"]); + assertEquals(result4.key, ["a"]); + assertEquals(result4.value, null); + assertEquals(result4.versionstamp, null); +}); + +const VALUE_CASES = [ + { name: "string", value: "hello" }, + { name: "number", value: 42 }, + { name: "bigint", value: 42n }, + { name: "boolean", value: true }, + { name: "null", value: null }, + { name: "undefined", value: undefined }, + { name: "Date", value: new Date(0) }, + { name: "Uint8Array", value: new Uint8Array([1, 2, 3]) }, + { name: "ArrayBuffer", value: new ArrayBuffer(3) }, + { name: "array", value: [1, 2, 3] }, + { name: "object", value: { a: 1, b: 2 } }, + { name: "nested array", value: [[1, 2], [3, 4]] }, + { name: "nested object", value: { a: { b: 1 } } }, +]; + +for (const { name, value } of VALUE_CASES) { + dbTest(`set and get ${name} value`, async (db) => { + await db.set(["a"], value); + const result = await db.get(["a"]); + assertEquals(result.key, 
["a"]); + assertEquals(result.value, value); + }); +} + +dbTest("set and get recursive object", async (db) => { + // deno-lint-ignore no-explicit-any + const value: any = { a: undefined }; + value.a = value; + await db.set(["a"], value); + const result = await db.get(["a"]); + assertEquals(result.key, ["a"]); + // deno-lint-ignore no-explicit-any + const resultValue: any = result.value; + assert(resultValue.a === resultValue); +}); + +const keys = [ + ["a"], + ["a", "b"], + ["a", "b", "c"], + [1], + ["a", 1], + ["a", 1, "b"], + [1n], + ["a", 1n], + ["a", 1n, "b"], + [true], + ["a", true], + ["a", true, "b"], + [new Uint8Array([1, 2, 3])], + ["a", new Uint8Array([1, 2, 3])], + ["a", new Uint8Array([1, 2, 3]), "b"], + [1, 1n, true, new Uint8Array([1, 2, 3]), "a"], +]; + +for (const key of keys) { + dbTest(`set and get ${Deno.inspect(key)} key`, async (db) => { + await db.set(key, "b"); + const result = await db.get(key); + assertEquals(result.key, key); + assertEquals(result.value, "b"); + }); +} + +const INVALID_KEYS = [ + [null], + [undefined], + [], + [{}], + [new Date()], + [new ArrayBuffer(3)], + [new Uint8Array([1, 2, 3]).buffer], + [["a", "b"]], +]; + +for (const key of INVALID_KEYS) { + dbTest(`set and get invalid key ${Deno.inspect(key)}`, async (db) => { + await assertRejects( + async () => { + // @ts-ignore - we are testing invalid keys + await db.set(key, "b"); + }, + Error, + ); + }); +} + +dbTest("compare and mutate", async (db) => { + await db.set(["t"], "1"); + + const currentValue = await db.get(["t"]); + assertEquals(currentValue.versionstamp, "00000000000000010000"); + + let ok = await db.atomic() + .check({ key: ["t"], versionstamp: currentValue.versionstamp }) + .set(currentValue.key, "2") + .commit(); + assertEquals(ok, true); + + const newValue = await db.get(["t"]); + assertEquals(newValue.versionstamp, "00000000000000020000"); + assertEquals(newValue.value, "2"); + + ok = await db.atomic() + .check({ key: ["t"], versionstamp: 
currentValue.versionstamp }) + .set(currentValue.key, "3") + .commit(); + assertEquals(ok, false); + + const newValue2 = await db.get(["t"]); + assertEquals(newValue2.versionstamp, "00000000000000020000"); + assertEquals(newValue2.value, "2"); +}); + +dbTest("compare and mutate not exists", async (db) => { + let ok = await db.atomic() + .check({ key: ["t"], versionstamp: null }) + .set(["t"], "1") + .commit(); + assertEquals(ok, true); + + const newValue = await db.get(["t"]); + assertEquals(newValue.versionstamp, "00000000000000010000"); + assertEquals(newValue.value, "1"); + + ok = await db.atomic() + .check({ key: ["t"], versionstamp: null }) + .set(["t"], "2") + .commit(); + assertEquals(ok, false); +}); + +dbTest("compare multiple and mutate", async (db) => { + await db.set(["t1"], "1"); + await db.set(["t2"], "2"); + + const currentValue1 = await db.get(["t1"]); + assertEquals(currentValue1.versionstamp, "00000000000000010000"); + const currentValue2 = await db.get(["t2"]); + assertEquals(currentValue2.versionstamp, "00000000000000020000"); + + const ok = await db.atomic() + .check({ key: ["t1"], versionstamp: currentValue1.versionstamp }) + .check({ key: ["t2"], versionstamp: currentValue2.versionstamp }) + .set(currentValue1.key, "3") + .set(currentValue2.key, "4") + .commit(); + assertEquals(ok, true); + + const newValue1 = await db.get(["t1"]); + assertEquals(newValue1.versionstamp, "00000000000000030000"); + assertEquals(newValue1.value, "3"); + const newValue2 = await db.get(["t2"]); + assertEquals(newValue2.versionstamp, "00000000000000030000"); + assertEquals(newValue2.value, "4"); + + // just one of the two checks failed + const ok2 = await db.atomic() + .check({ key: ["t1"], versionstamp: newValue1.versionstamp }) + .check({ key: ["t2"], versionstamp: null }) + .set(newValue1.key, "5") + .set(newValue2.key, "6") + .commit(); + assertEquals(ok2, false); + + const newValue3 = await db.get(["t1"]); + assertEquals(newValue3.versionstamp, 
"00000000000000030000"); + assertEquals(newValue3.value, "3"); + const newValue4 = await db.get(["t2"]); + assertEquals(newValue4.versionstamp, "00000000000000030000"); + assertEquals(newValue4.value, "4"); +}); + +dbTest("atomic mutation ordering (set before delete)", async (db) => { + await db.set(["a"], "1"); + const ok1 = await db.atomic() + .set(["a"], "2") + .delete(["a"]) + .commit(); + assert(ok1); + const result = await db.get(["a"]); + assertEquals(result.value, null); +}); + +dbTest("atomic mutation ordering (delete before set)", async (db) => { + await db.set(["a"], "1"); + const ok1 = await db.atomic() + .delete(["a"]) + .set(["a"], "2") + .commit(); + assert(ok1); + const result = await db.get(["a"]); + assertEquals(result.value, "2"); +}); + +dbTest("atomic mutation type=set", async (db) => { + const ok = await db.atomic() + .mutate({ key: ["a"], value: "1", type: "set" }) + .commit(); + assert(ok); + const result = await db.get(["a"]); + assertEquals(result.value, "1"); +}); + +dbTest("atomic mutation type=set overwrite", async (db) => { + await db.set(["a"], "1"); + const ok = await db.atomic() + .mutate({ key: ["a"], value: "2", type: "set" }) + .commit(); + assert(ok); + const result = await db.get(["a"]); + assertEquals(result.value, "2"); +}); + +dbTest("atomic mutation type=delete", async (db) => { + await db.set(["a"], "1"); + const ok = await db.atomic() + .mutate({ key: ["a"], type: "delete" }) + .commit(); + assert(ok); + const result = await db.get(["a"]); + assertEquals(result.value, null); +}); + +dbTest("atomic mutation type=delete no exists", async (db) => { + const ok = await db.atomic() + .mutate({ key: ["a"], type: "delete" }) + .commit(); + assert(ok); + const result = await db.get(["a"]); + assertEquals(result.value, null); +}); + +dbTest("atomic mutation type=sum", async (db) => { + await db.set(["a"], new Deno.KvU64(10n)); + const ok = await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "sum" }) + 
.commit(); + assert(ok); + const result = await db.get(["a"]); + assertEquals(result.value, new Deno.KvU64(11n)); +}); + +dbTest("atomic mutation type=sum no exists", async (db) => { + const ok = await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "sum" }) + .commit(); + assert(ok); + const result = await db.get(["a"]); + assert(result.value); + assertEquals(result.value, new Deno.KvU64(1n)); +}); + +dbTest("atomic mutation type=sum wrap around", async (db) => { + await db.set(["a"], new Deno.KvU64(0xffffffffffffffffn)); + const ok = await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(10n), type: "sum" }) + .commit(); + assert(ok); + const result = await db.get(["a"]); + assertEquals(result.value, new Deno.KvU64(9n)); + + const ok2 = await db.atomic() + .mutate({ + key: ["a"], + value: new Deno.KvU64(0xffffffffffffffffn), + type: "sum", + }) + .commit(); + assert(ok2); + const result2 = await db.get(["a"]); + assertEquals(result2.value, new Deno.KvU64(8n)); +}); + +dbTest("atomic mutation type=sum wrong type in db", async (db) => { + await db.set(["a"], 1); + assertRejects( + async () => { + await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "sum" }) + .commit(); + }, + TypeError, + "Failed to perform 'sum' mutation on a non-U64 value in the database", + ); +}); + +dbTest("atomic mutation type=sum wrong type in mutation", async (db) => { + await db.set(["a"], new Deno.KvU64(1n)); + assertRejects( + async () => { + await db.atomic() + // @ts-expect-error wrong type is intentional + .mutate({ key: ["a"], value: 1, type: "sum" }) + .commit(); + }, + TypeError, + "Failed to perform 'sum' mutation on a non-U64 operand", + ); +}); + +dbTest("atomic mutation type=min", async (db) => { + await db.set(["a"], new Deno.KvU64(10n)); + const ok = await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(5n), type: "min" }) + .commit(); + assert(ok); + const result = await db.get(["a"]); + 
assertEquals(result.value, new Deno.KvU64(5n)); + + const ok2 = await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(15n), type: "min" }) + .commit(); + assert(ok2); + const result2 = await db.get(["a"]); + assertEquals(result2.value, new Deno.KvU64(5n)); +}); + +dbTest("atomic mutation type=min no exists", async (db) => { + const ok = await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "min" }) + .commit(); + assert(ok); + const result = await db.get(["a"]); + assert(result.value); + assertEquals(result.value, new Deno.KvU64(1n)); +}); + +dbTest("atomic mutation type=min wrong type in db", async (db) => { + await db.set(["a"], 1); + assertRejects( + async () => { + await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "min" }) + .commit(); + }, + TypeError, + "Failed to perform 'min' mutation on a non-U64 value in the database", + ); +}); + +dbTest("atomic mutation type=min wrong type in mutation", async (db) => { + await db.set(["a"], new Deno.KvU64(1n)); + assertRejects( + async () => { + await db.atomic() + // @ts-expect-error wrong type is intentional + .mutate({ key: ["a"], value: 1, type: "min" }) + .commit(); + }, + TypeError, + "Failed to perform 'min' mutation on a non-U64 operand", + ); +}); + +dbTest("atomic mutation type=max", async (db) => { + await db.set(["a"], new Deno.KvU64(10n)); + const ok = await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(5n), type: "max" }) + .commit(); + assert(ok); + const result = await db.get(["a"]); + assertEquals(result.value, new Deno.KvU64(10n)); + + const ok2 = await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(15n), type: "max" }) + .commit(); + assert(ok2); + const result2 = await db.get(["a"]); + assertEquals(result2.value, new Deno.KvU64(15n)); +}); + +dbTest("atomic mutation type=max no exists", async (db) => { + const ok = await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "max" }) + .commit(); + 
assert(ok); + const result = await db.get(["a"]); + assert(result.value); + assertEquals(result.value, new Deno.KvU64(1n)); +}); + +dbTest("atomic mutation type=max wrong type in db", async (db) => { + await db.set(["a"], 1); + assertRejects( + async () => { + await db.atomic() + .mutate({ key: ["a"], value: new Deno.KvU64(1n), type: "max" }) + .commit(); + }, + TypeError, + "Failed to perform 'max' mutation on a non-U64 value in the database", + ); +}); + +dbTest("atomic mutation type=max wrong type in mutation", async (db) => { + await db.set(["a"], new Deno.KvU64(1n)); + assertRejects( + async () => { + await db.atomic() + // @ts-expect-error wrong type is intentional + .mutate({ key: ["a"], value: 1, type: "max" }) + .commit(); + }, + TypeError, + "Failed to perform 'max' mutation on a non-U64 operand", + ); +}); + +Deno.test("KvU64 comparison", () => { + const a = new Deno.KvU64(1n); + const b = new Deno.KvU64(1n); + assertEquals(a, b); + assertThrows(() => { + assertEquals(a, new Deno.KvU64(2n)); + }, AssertionError); +}); + +Deno.test("KvU64 overflow", () => { + assertThrows(() => { + new Deno.KvU64(2n ** 64n); + }, RangeError); +}); + +Deno.test("KvU64 underflow", () => { + assertThrows(() => { + new Deno.KvU64(-1n); + }, RangeError); +}); + +Deno.test("KvU64 frozen", () => { + const a = new Deno.KvU64(1n); + assertThrows(() => { + // @ts-expect-error value is readonly + a.value = 2n; + }, TypeError); +}); + +Deno.test("KvU64 unbox", () => { + const a = new Deno.KvU64(1n); + assertEquals(a.value, 1n); +}); + +async function collect(iter: Deno.KvListIterator): Promise { + const entries: Deno.KvEntry[] = []; + for await (const entry of iter) { + entries.push(entry); + } + return entries; +} + +async function setupData(db: Deno.Kv) { + await db.atomic() + .set(["a"], -1) + .set(["a", "a"], 0) + .set(["a", "b"], 1) + .set(["a", "c"], 2) + .set(["a", "d"], 3) + .set(["a", "e"], 4) + .set(["b"], 99) + .set(["b", "a"], 100) + .commit(); +} + +dbTest("list prefix", 
async (db) => { + await setupData(db); + const entries = await collect(db.list({ prefix: ["a"] })); + assertEquals(entries, [ + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix empty", async (db) => { + await setupData(db); + const entries = await collect(db.list({ prefix: ["c"] })); + assertEquals(entries.length, 0); + + const entries2 = await collect(db.list({ prefix: ["a", "f"] })); + assertEquals(entries2.length, 0); +}); + +dbTest("list prefix with start", async (db) => { + await setupData(db); + const entries = await collect(db.list({ prefix: ["a"], start: ["a", "c"] })); + assertEquals(entries, [ + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix with start empty", async (db) => { + await setupData(db); + const entries = await collect(db.list({ prefix: ["a"], start: ["a", "f"] })); + assertEquals(entries.length, 0); +}); + +dbTest("list prefix with end", async (db) => { + await setupData(db); + const entries = await collect(db.list({ prefix: ["a"], end: ["a", "c"] })); + assertEquals(entries, [ + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix with end empty", async (db) => { + await setupData(db); + const entries = await collect(db.list({ prefix: ["a"], end: ["a", "a"] })); + assertEquals(entries.length, 0); +}); + +dbTest("list prefix reverse", async (db) => { + await setupData(db); + + const entries = await 
collect(db.list({ prefix: ["a"] }, { reverse: true })); + assertEquals(entries, [ + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix reverse with start", async (db) => { + await setupData(db); + const entries = await collect( + db.list({ prefix: ["a"], start: ["a", "c"] }, { reverse: true }), + ); + assertEquals(entries, [ + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix reverse with start empty", async (db) => { + await setupData(db); + const entries = await collect( + db.list({ prefix: ["a"], start: ["a", "f"] }, { reverse: true }), + ); + assertEquals(entries.length, 0); +}); + +dbTest("list prefix reverse with end", async (db) => { + await setupData(db); + const entries = await collect( + db.list({ prefix: ["a"], end: ["a", "c"] }, { reverse: true }), + ); + assertEquals(entries, [ + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix reverse with end empty", async (db) => { + await setupData(db); + const entries = await collect( + db.list({ prefix: ["a"], end: ["a", "a"] }, { reverse: true }), + ); + assertEquals(entries.length, 0); +}); + +dbTest("list prefix limit", async (db) => { + await setupData(db); + const entries = await collect(db.list({ prefix: ["a"] }, { limit: 2 })); + assertEquals(entries, [ + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, 
versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix limit reverse", async (db) => { + await setupData(db); + const entries = await collect( + db.list({ prefix: ["a"] }, { limit: 2, reverse: true }), + ); + assertEquals(entries, [ + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix with small batch size", async (db) => { + await setupData(db); + const entries = await collect(db.list({ prefix: ["a"] }, { batchSize: 2 })); + assertEquals(entries, [ + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix with small batch size reverse", async (db) => { + await setupData(db); + const entries = await collect( + db.list({ prefix: ["a"] }, { batchSize: 2, reverse: true }), + ); + assertEquals(entries, [ + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix with small batch size and limit", async (db) => { + await setupData(db); + const entries = await collect( + db.list({ prefix: ["a"] }, { batchSize: 2, limit: 3 }), + ); + assertEquals(entries, [ + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + ]); +}); + 
+dbTest("list prefix with small batch size and limit reverse", async (db) => { + await setupData(db); + const entries = await collect( + db.list({ prefix: ["a"] }, { batchSize: 2, limit: 3, reverse: true }), + ); + assertEquals(entries, [ + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix with manual cursor", async (db) => { + await setupData(db); + + const iterator = db.list({ prefix: ["a"] }, { limit: 2 }); + const values = await collect(iterator); + assertEquals(values, [ + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + ]); + + const cursor = iterator.cursor; + assertEquals(cursor, "AmIA"); + + const iterator2 = db.list({ prefix: ["a"] }, { cursor }); + const values2 = await collect(iterator2); + assertEquals(values2, [ + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list prefix with manual cursor reverse", async (db) => { + await setupData(db); + + const iterator = db.list({ prefix: ["a"] }, { limit: 2, reverse: true }); + const values = await collect(iterator); + assertEquals(values, [ + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + ]); + + const cursor = iterator.cursor; + assertEquals(cursor, "AmQA"); + + const iterator2 = db.list({ prefix: ["a"] }, { cursor, reverse: true }); + const values2 = await collect(iterator2); + assertEquals(values2, [ + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: 
["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list range", async (db) => { + await setupData(db); + + const entries = await collect( + db.list({ start: ["a", "a"], end: ["a", "z"] }), + ); + assertEquals(entries, [ + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list range reverse", async (db) => { + await setupData(db); + + const entries = await collect( + db.list({ start: ["a", "a"], end: ["a", "z"] }, { reverse: true }), + ); + assertEquals(entries, [ + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list range with limit", async (db) => { + await setupData(db); + + const entries = await collect( + db.list({ start: ["a", "a"], end: ["a", "z"] }, { limit: 3 }), + ); + assertEquals(entries, [ + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list range with limit reverse", async (db) => { + await setupData(db); + + const entries = await collect( + db.list({ start: ["a", "a"], end: ["a", "z"] }, { + limit: 3, + reverse: true, + }), + ); + assertEquals(entries, [ + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + 
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list range nesting", async (db) => { + await setupData(db); + + const entries = await collect(db.list({ start: ["a"], end: ["a", "d"] })); + assertEquals(entries, [ + { key: ["a"], value: -1, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list range short", async (db) => { + await setupData(db); + + const entries = await collect( + db.list({ start: ["a", "b"], end: ["a", "d"] }), + ); + assertEquals(entries, [ + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list range with manual cursor", async (db) => { + await setupData(db); + + const iterator = db.list({ start: ["a", "b"], end: ["a", "z"] }, { + limit: 2, + }); + const entries = await collect(iterator); + assertEquals(entries, [ + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + ]); + + const cursor = iterator.cursor; + const iterator2 = db.list({ start: ["a", "b"], end: ["a", "z"] }, { + cursor, + }); + const entries2 = await collect(iterator2); + assertEquals(entries2, [ + { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list range with manual cursor reverse", async (db) => { + await setupData(db); + + const iterator = db.list({ start: ["a", "b"], end: ["a", "z"] }, { + limit: 2, + reverse: true, + }); + const entries = await collect(iterator); + assertEquals(entries, [ + { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, 
versionstamp: "00000000000000010000" }, + ]); + + const cursor = iterator.cursor; + const iterator2 = db.list({ start: ["a", "b"], end: ["a", "z"] }, { + cursor, + reverse: true, + }); + const entries2 = await collect(iterator2); + assertEquals(entries2, [ + { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + ]); +}); + +dbTest("list invalid selector", async (db) => { + await setupData(db); + + await assertRejects(async () => { + await collect( + db.list({ prefix: ["a"], start: ["a", "b"], end: ["a", "c"] }), + ); + }, TypeError); + + await assertRejects(async () => { + await collect( + // @ts-expect-error missing end + db.list({ start: ["a", "b"] }), + ); + }, TypeError); + + await assertRejects(async () => { + await collect( + // @ts-expect-error missing start + db.list({ end: ["a", "b"] }), + ); + }, TypeError); +}); + +dbTest("invalid versionstamp in atomic check rejects", async (db) => { + await assertRejects(async () => { + await db.atomic().check({ key: ["a"], versionstamp: "" }).commit(); + }, TypeError); + + await assertRejects(async () => { + await db.atomic().check({ key: ["a"], versionstamp: "xx".repeat(10) }) + .commit(); + }, TypeError); + + await assertRejects(async () => { + await db.atomic().check({ key: ["a"], versionstamp: "aa".repeat(11) }) + .commit(); + }, TypeError); +}); + +dbTest("invalid mutation type rejects", async (db) => { + await assertRejects(async () => { + await db.atomic() + // @ts-expect-error invalid type + value combo + .mutate({ key: ["a"], type: "set" }) + .commit(); + }, TypeError); + + await assertRejects(async () => { + await db.atomic() + // @ts-expect-error invalid type + value combo + .mutate({ key: ["a"], type: "delete", value: "123" }) + .commit(); + }, TypeError); + + await assertRejects(async () => { + await db.atomic() + // @ts-expect-error invalid type + .mutate({ key: ["a"], type: "foobar" }) + .commit(); + }, TypeError); + 
+ await assertRejects(async () => { + await db.atomic() + // @ts-expect-error invalid type + .mutate({ key: ["a"], type: "foobar", value: "123" }) + .commit(); + }, TypeError); +}); diff --git a/cli/tests/unit/test_util.ts b/cli/tests/unit/test_util.ts index 64c399b2d5e31c..23713faf4ab244 100644 --- a/cli/tests/unit/test_util.ts +++ b/cli/tests/unit/test_util.ts @@ -7,6 +7,7 @@ export { assert, assertEquals, assertFalse, + AssertionError, assertMatch, assertNotEquals, assertNotStrictEquals, diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 198b634fd2adab..b042ceabe4ad48 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1518,6 +1518,505 @@ declare namespace Deno { * @category HTTP Server */ export function upgradeHttpRaw(request: Request): [Deno.Conn, Uint8Array]; + + /** **UNSTABLE**: New API, yet to be vetted. + * + * Open a new {@linkcode Deno.Kv} connection to persist data. + * + * When a path is provided, the database will be persisted to disk at that + * path. Read and write access to the file is required. + * + * When no path is provided, the database will be opened in a default path for + * the current script. This location is persistent across script runs and is + * keyed on the origin storage key (the same key that is used to determine + * `localStorage` persistence). More information about the origin storage key + * can be found in the Deno Manual. + * + * @tags allow-read, allow-write + * @category KV + */ + export function openKv(path?: string): Promise; + + /** **UNSTABLE**: New API, yet to be vetted. + * + * A key to be persisted in a {@linkcode Deno.Kv}. A key is a sequence + * of {@linkcode Deno.KvKeyPart}s. + * + * Keys are ordered lexicographically by their parts. The first part is the + * most significant, and the last part is the least significant. The order of + * the parts is determined by both the type and the value of the part. 
The + * relative significance of the types can be found in the documentation for the + * {@linkcode Deno.KvKeyPart} type. + * + * @category KV + */ + export type KvKey = readonly KvKeyPart[]; + + /** **UNSTABLE**: New API, yet to be vetted. + * + * A single part of a {@linkcode Deno.KvKey}. Parts are ordered + * lexicographically, first by their type, and within a given type by their + * value. + * + * The ordering of types is as follows: + * + * 1. `Uint8Array` + * 2. `string` + * 3. `number` + * 4. `bigint` + * 5. `boolean` + * + * Within a given type, the ordering is as follows: + * + * - `Uint8Array` is ordered by the byte ordering of the array + * - `string` is ordered by the byte ordering of the UTF-8 encoding of the + * string + * - `number` is ordered following this pattern: `-NaN` + * < `-Infinity` < `-100.0` < `-1.0` < `-0.5` < `-0.0` < `0.0` < `0.5` + * < `1.0` < `100.0` < `Infinity` < `NaN` + * - `bigint` is ordered by mathematical ordering, with the most negative + * number being the first value, and the largest positive number + * being the last value + * - `boolean` is ordered by `false` < `true` + * + * This means that the part `1.0` (a number) is ordered before the part `2.0` + * (also a number), but is greater than the part `0n` (a bigint), because + * `1.0` is a number and `0n` is a bigint, and type ordering has precedence + * over the ordering of values within a type. + * + * @category KV + */ + export type KvKeyPart = Uint8Array | string | number | bigint | boolean; + + /** **UNSTABLE**: New API, yet to be vetted. + * + * Consistency level of a KV operation. + * + * - `strong` - This operation must be strongly-consistent. + * - `eventual` - Eventually-consistent behavior is allowed. + * + * @category KV + */ + export type KvConsistencyLevel = "strong" | "eventual"; + + /** **UNSTABLE**: New API, yet to be vetted. + * + * A selector that selects the range of data returned by a list operation on a + * {@linkcode Deno.Kv}.
+ * + * The selector can either be a prefix selector or a range selector. A prefix + * selector selects all keys that start with the given prefix (optionally + * starting at a given key). A range selector selects all keys that are + * lexicographically between the given start and end keys. + * + * @category KV + */ + export type KvListSelector = + | { prefix: KvKey } + | { prefix: KvKey; start: KvKey } + | { prefix: KvKey; end: KvKey } + | { start: KvKey; end: KvKey }; + + /** **UNSTABLE**: New API, yet to be vetted. + * + * A mutation to a key in a {@linkcode Deno.Kv}. A mutation is a + * combination of a key, a value, and a type. The type determines how the + * mutation is applied to the key. + * + * - `set` - Sets the value of the key to the given value, overwriting any + * existing value. + * - `delete` - Deletes the key from the database. The mutation is a no-op if + * the key does not exist. + * - `sum` - Adds the given value to the existing value of the key. Both the + * value specified in the mutation, and any existing value must be of type + * `Deno.KvU64`. If the key does not exist, the value is set to the given + * value (summed with 0). + * - `max` - Sets the value of the key to the maximum of the existing value + * and the given value. Both the value specified in the mutation, and any + * existing value must be of type `Deno.KvU64`. If the key does not exist, + * the value is set to the given value. + * - `min` - Sets the value of the key to the minimum of the existing value + * and the given value. Both the value specified in the mutation, and any + * existing value must be of type `Deno.KvU64`. If the key does not exist, + * the value is set to the given value. + * + * @category KV + */ + export type KvMutation = + & { key: KvKey } + & ( + | { type: "set"; value: unknown } + | { type: "delete" } + | { type: "sum"; value: KvU64 } + | { type: "max"; value: KvU64 } + | { type: "min"; value: KvU64 } + ); + + /** **UNSTABLE**: New API, yet to be vetted. 
+ * + * An iterator over a range of data entries in a {@linkcode Deno.Kv}. + * + * The cursor getter returns the cursor that can be used to resume the + * iteration from the current position in the future. + * + * @category KV + */ + export class KvListIterator implements AsyncIterableIterator { + /** + * Returns the cursor of the current position in the iteration. This cursor + * can be used to resume the iteration from the current position in the + * future by passing it to the `cursor` option of the `list` method. + */ + get cursor(): string; + + next(): Promise>; + [Symbol.asyncIterator](): AsyncIterableIterator; + } + + /** **UNSTABLE**: New API, yet to be vetted. + * + * A versioned pair of key and value in a {@linkcode Deno.Kv}. + * + * The `versionstamp` is a string that represents the current version of the + * key-value pair. It can be used to perform atomic operations on the KV store + * by passing it to the `check` method of a {@linkcode Deno.AtomicOperation}. + * A `null` versionstamp indicates that no value exists for the given key in + * the KV store. + * + * @category KV + */ + export interface KvEntry { + key: KvKey; + value: unknown; + versionstamp: string | null; + } + + /** **UNSTABLE**: New API, yet to be vetted. + * + * Options for listing key-value pairs in a {@linkcode Deno.Kv}. + * + * @category KV + */ + export interface KvListOptions { + /** + * The maximum number of key-value pairs to return. If not specified, all + * matching key-value pairs will be returned. + */ + limit?: number; + /** + * The cursor to resume the iteration from. If not specified, the iteration + * will start from the beginning. + */ + cursor?: string; + /** + * Whether to reverse the order of the returned key-value pairs. If not + * specified, the order will be ascending from the start of the range as per + * the lexicographical ordering of the keys. If `true`, the order will be + * descending from the end of the range. + * + * The default value is `false`. 
+ */ + reverse?: boolean; + /** + * The consistency level of the list operation. The default consistency + * level is "strong". Some use cases can benefit from using a weaker + * consistency level. For more information on consistency levels, see the + * documentation for {@linkcode Deno.KvConsistencyLevel}. + * + * List operations are performed in batches (in sizes specified by the + * `batchSize` option). The consistency level of the list operation is + * applied to each batch individually. This means that while each batch is + * guaranteed to be consistent within itself, the entire list operation may + * not be consistent across batches because a mutation may be applied to a + * key-value pair between batches, in a batch that has already been returned + * by the list operation. + */ + consistency?: KvConsistencyLevel; + /** + * The size of the batches in which the list operation is performed. Larger + * or smaller batch sizes may positively or negatively affect the + * performance of a list operation depending on the specific use case and + * iteration behavior. Slow iterating queries may benefit from using a + * smaller batch size for increased overall consistency, while fast + * iterating queries may benefit from using a larger batch size for better + * performance. + * + * The default batch size is equal to the `limit` option, or 100 if this is + * unset. The maximum value for this option is 500. Larger values will be + * clamped. + */ + batchSize?: number; + } + + /** **UNSTABLE**: New API, yet to be vetted. + * + * A check to perform as part of a {@linkcode Deno.AtomicOperation}. The check + * will fail if the versionstamp for the key-value pair in the KV store does + * not match the given versionstamp. A check with a `null` versionstamp checks + * that the key-value pair does not currently exist in the KV store. 
+ * + * @category KV + */ + export interface AtomicCheck { + key: KvKey; + versionstamp: string | null; + } + + /** **UNSTABLE**: New API, yet to be vetted. + * + * An operation on a {@linkcode Deno.Kv} that can be performed + * atomically. Atomic operations do not auto-commit, and must be committed + * explicitly by calling the `commit` method. + * + * Atomic operations can be used to perform multiple mutations on the KV store + * in a single atomic transaction. They can also be used to perform + * conditional mutations by specifying one or more + * {@linkcode Deno.AtomicCheck}s that ensure that a mutation is only performed + * if the key-value pair in the KV has a specific versionstamp. If any of the + * checks fail, the entire operation will fail and no mutations will be made. + * + * The ordering of mutations is guaranteed to be the same as the ordering of + * the mutations specified in the operation. Checks are performed before any + * mutations are performed. The ordering of checks is unobservable. + * + * Atomic operations can be used to implement optimistic locking, where a + * mutation is only performed if the key-value pair in the KV store has not + * been modified since the last read. This can be done by specifying a check + * that ensures that the versionstamp of the key-value pair matches the + * versionstamp that was read. If the check fails, the mutation will not be + * performed and the operation will fail. One can then retry the read-modify- + * write operation in a loop until it succeeds. + * + * The `commit` method of an atomic operation returns a boolean indicating + * whether checks passed and mutations were performed. If the operation failed + * because of a failed check, the return value will be `false`. If the + * operation failed for any other reason (storage error, invalid value, etc.), + * an exception will be thrown. 
+ * + * @category KV + */ + export class AtomicOperation { + /** + * Add to the operation a check that ensures that the versionstamp of the + * key-value pair in the KV store matches the given versionstamp. If the + * check fails, the entire operation will fail and no mutations will be + * performed during the commit. + */ + check(...checks: AtomicCheck[]): this; + /** + * Add to the operation a mutation that performs the specified mutation on + * the specified key if all checks pass during the commit. The types and + * semantics of all available mutations are described in the documentation + * for {@linkcode Deno.KvMutation}. + */ + mutate(...mutations: KvMutation[]): this; + /** + * Add to the operation a mutation that sets the value of the specified key + * to the specified value if all checks pass during the commit. + */ + set(key: KvKey, value: unknown): this; + /** + * Add to the operation a mutation that deletes the specified key if all + * checks pass during the commit. + */ + delete(key: KvKey): this; + /** + * Commit the operation to the KV store. Returns a boolean indicating + * whether checks passed and mutations were performed. If the operation + * failed because of a failed check, the return value will be `false`. If + * the operation failed for any other reason (storage error, invalid value, + * etc.), an exception will be thrown. + * + * If the commit returns `false`, one may create a new atomic operation with + * updated checks and mutations and attempt to commit it again. See the note + * on optimistic locking in the documentation for {@linkcode Deno.AtomicOperation}. + */ + commit(): Promise; + } + + /** **UNSTABLE**: New API, yet to be vetted. + * + * A key-value database that can be used to store and retrieve data. + * + * Data is stored as key-value pairs, where the key is a {@linkcode Deno.KvKey} + * and the value is an arbitrary structured-serializable JavaScript value. 
+ * Keys are ordered lexicographically as described in the documentation for + * {@linkcode Deno.KvKey}. Keys are unique within a database, and the last + * value set for a given key is the one that is returned when reading the + * key. Keys can be deleted from the database, in which case they will no + * longer be returned when reading keys. + * + * Values can be any structured-serializable JavaScript value (objects, + * arrays, strings, numbers, etc.). The special value {@linkcode Deno.KvU64} + * can be used to store 64-bit unsigned integers in the database. This special + * value can not be nested within other objects or arrays. In addition to the + * regular database mutation operations, the unsigned 64-bit integer value + * also supports `sum`, `max`, and `min` mutations. + * + * Keys are versioned on write by assigning the key an ever-increasing + * "versionstamp". The versionstamp represents the version of a key-value pair + * in the database at some point in time, and can be used to perform + * transactional operations on the database without requiring any locking. + * This is enabled by atomic operations, which can have conditions that ensure + * that the operation only succeeds if the versionstamp of the key-value pair + * matches an expected versionstamp. + * + * Keys have a maximum length of 2048 bytes after serialization. Values have a + * maximum length of 16 KiB after serialization. Serialization of both keys + * and values is somewhat opaque, but one can usually assume that the + * serialization of any value is about the same length as the resulting string + * of a JSON serialization of that same value. + * + * @category KV + */ + export class Kv { + /** + * Retrieve the value and versionstamp for the given key from the database + * in the form of a {@linkcode Deno.KvEntry}. If no value exists for the key, + * the returned entry will have a `null` value and versionstamp. 
+ * + * ```ts + * const db = await Deno.openKv(); + * const result = await db.get(["foo"]); + * result.key; // ["foo"] + * result.value; // "bar" + * result.versionstamp; // "00000000000000010000" + * ``` + * + * The `consistency` option can be used to specify the consistency level + * for the read operation. The default consistency level is "strong". Some + * use cases can benefit from using a weaker consistency level. For more + * information on consistency levels, see the documentation for + * {@linkcode Deno.KvConsistencyLevel}. + */ + get( + key: KvKey, + options?: { consistency?: KvConsistencyLevel }, + ): Promise; + + /** + * Retrieve multiple values and versionstamps from the database in the form + * of an array of {@linkcode Deno.KvEntry} objects. The returned array will + * have the same length as the `keys` array, and the entries will be in the + * same order as the keys. If no value exists for a given key, the returned + * entry will have a `null` value and versionstamp. + * + * ```ts + * const db = await Deno.openKv(); + * const result = await db.getMany([["foo"], ["baz"]]); + * result[0].key; // ["foo"] + * result[0].value; // "bar" + * result[0].versionstamp; // "00000000000000010000" + * result[1].key; // ["baz"] + * result[1].value; // null + * result[1].versionstamp; // null + * ``` + * + * The `consistency` option can be used to specify the consistency level + * for the read operation. The default consistency level is "strong". Some + * use cases can benefit from using a weaker consistency level. For more + * information on consistency levels, see the documentation for + * {@linkcode Deno.KvConsistencyLevel}. + */ + getMany( + keys: KvKey[], + options?: { consistency?: KvConsistencyLevel }, + ): Promise; + + /** + * Set the value for the given key in the database. If a value already + * exists for the key, it will be overwritten. 
+ * + * ```ts + * const db = await Deno.openKv(); + * await db.set(["foo"], "bar"); + * ``` + */ + set(key: KvKey, value: unknown): Promise; + + /** + * Delete the value for the given key from the database. If no value exists + * for the key, this operation is a no-op. + * + * ```ts + * const db = await Deno.openKv(); + * await db.delete(["foo"]); + * ``` + */ + delete(key: KvKey): Promise; + + /** + * Retrieve a list of keys in the database. The returned list is an + * {@linkcode Deno.KvListIterator} which can be used to iterate over the + * entries in the database. + * + * Each list operation must specify a selector which is used to specify the + * range of keys to return. The selector can either be a prefix selector, or + * a range selector: + * + * - A prefix selector selects all keys that start with the given prefix of + * key parts. For example, the selector `["users"]` will select all keys + * that start with the prefix `["users"]`, such as `["users", "alice"]` + * and `["users", "bob"]`. Note that you can not partially match a key + * part, so the selector `["users", "a"]` will not match the key + * `["users", "alice"]`. A prefix selector may specify a `start` key that + * is used to skip over keys that are lexicographically less than the + * start key. + * - A range selector selects all keys that are lexicographically between + * the given start and end keys (including the start, and excluding the + * end). For example, the selector `["users", "a"], ["users", "n"]` will + * select all keys that start with the prefix `["users"]` and have a + * second key part that is lexicographically between `a` and `n`, such as + * `["users", "alice"]`, `["users", "bob"]`, and `["users", "mike"]`, but + * not `["users", "noa"]` or `["users", "zoe"]`. 
+ * + * ```ts + * const db = await Deno.openKv(); + * const entries = db.list({ prefix: ["users"] }); + * for await (const entry of entries) { + * entry.key; // ["users", "alice"] + * entry.value; // { name: "Alice" } + * entry.versionstamp; // "00000000000000010000" + * } + * ``` + * + * The `options` argument can be used to specify additional options for the + * list operation. See the documentation for {@linkcode Deno.KvListOptions} + * for more information. + */ + list(selector: KvListSelector, options?: KvListOptions): KvListIterator; + + /** + * Create a new {@linkcode Deno.AtomicOperation} object which can be used to + * perform an atomic transaction on the database. This does not perform any + * operations on the database - the atomic transaction must be committed + * explicitly using the {@linkcode Deno.AtomicOperation.commit} method once + * all checks and mutations have been added to the operation. + */ + atomic(): AtomicOperation; + + /** + * Close the database connection. This will prevent any further operations + * from being performed on the database, but will wait for any in-flight + * operations to complete before closing the underlying database connection. + */ + close(): Promise; + } + + /** **UNSTABLE**: New API, yet to be vetted. + * + * Wrapper type for 64-bit unsigned integers for use as values in a + * {@linkcode Deno.Kv}. + * + * @category KV + */ + export class KvU64 { + /** Create a new `KvU64` instance from the given bigint value. If the value + * is negative or does not fit in 64 bits, an error will be thrown. */ + constructor(value: bigint); + /** The value of this unsigned 64-bit integer, represented as a bigint. */ + readonly value: bigint; + } } /** **UNSTABLE**: New API, yet to be vetted. diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts new file mode 100644 index 00000000000000..571a1b3cd5c5ee --- /dev/null +++ b/ext/kv/01_db.ts @@ -0,0 +1,469 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+ +// @ts-ignore internal api +const { + ObjectGetPrototypeOf, + AsyncGeneratorPrototype, +} = globalThis.__bootstrap.primordials; +const core = Deno.core; +const ops = core.ops; + +const encodeCursor: ( + selector: [Deno.KvKey | null, Deno.KvKey | null, Deno.KvKey | null], + boundaryKey: Deno.KvKey, +) => string = (selector, boundaryKey) => + ops.op_kv_encode_cursor(selector, boundaryKey); + +async function openKv(path: string) { + const rid = await core.opAsync("op_kv_database_open", path); + return new Kv(rid); +} + +interface RawKvEntry { + key: Deno.KvKey; + value: RawValue; + versionstamp: string; +} + +type RawValue = { + kind: "v8"; + value: Uint8Array; +} | { + kind: "bytes"; + value: Uint8Array; +} | { + kind: "u64"; + value: bigint; +}; + +class Kv { + #rid: number; + + constructor(rid: number) { + this.#rid = rid; + } + + atomic() { + return new AtomicOperation(this.#rid); + } + + async get(key: Deno.KvKey, opts?: { consistency?: Deno.KvConsistencyLevel }) { + key = convertKey(key); + const [entries]: [RawKvEntry[]] = await core.opAsync( + "op_kv_snapshot_read", + this.#rid, + [[ + null, + key, + null, + 1, + false, + null, + ]], + opts?.consistency ?? 
"strong", + ); + if (!entries.length) { + return { + key, + value: null, + versionstamp: null, + }; + } + return deserializeValue(entries[0]); + } + + async set(key: Deno.KvKey, value: unknown) { + key = convertKey(key); + value = serializeValue(value); + + const checks: Deno.AtomicCheck[] = []; + const mutations = [ + [key, "set", value], + ]; + + const result = await core.opAsync( + "op_kv_atomic_write", + this.#rid, + checks, + mutations, + [], + ); + if (!result) throw new TypeError("Failed to set value"); + } + + async delete(key: Deno.KvKey) { + key = convertKey(key); + + const checks: Deno.AtomicCheck[] = []; + const mutations = [ + [key, "delete", null], + ]; + + const result = await core.opAsync( + "op_kv_atomic_write", + this.#rid, + checks, + mutations, + [], + ); + if (!result) throw new TypeError("Failed to set value"); + } + + list( + selector: Deno.KvListSelector, + options: { + limit?: number; + batchSize?: number; + cursor?: string; + reverse?: boolean; + consistency?: Deno.KvConsistencyLevel; + } = {}, + ): KvListIterator { + if (options.limit !== undefined && options.limit <= 0) { + throw new Error("limit must be positive"); + } + + let batchSize = options.batchSize ?? (options.limit ?? 100); + if (batchSize <= 0) throw new Error("batchSize must be positive"); + if (batchSize > 500) batchSize = 500; + + return new KvListIterator({ + limit: options.limit, + selector, + cursor: options.cursor, + reverse: options.reverse ?? false, + consistency: options.consistency ?? "strong", + batchSize, + pullBatch: this.#pullBatch(batchSize), + }); + } + + #pullBatch(batchSize: number): ( + selector: Deno.KvListSelector, + cursor: string | undefined, + reverse: boolean, + consistency: Deno.KvConsistencyLevel, + ) => Promise { + return async (selector, cursor, reverse, consistency) => { + const [entries]: [RawKvEntry[]] = await core.opAsync( + "op_kv_snapshot_read", + this.#rid, + [[ + "prefix" in selector ? selector.prefix : null, + "start" in selector ? 
selector.start : null, + "end" in selector ? selector.end : null, + batchSize, + reverse, + cursor, + ]], + consistency, + ); + + return entries.map(deserializeValue); + }; + } + + close() { + core.close(this.#rid); + } +} + +class AtomicOperation { + #rid: number; + + #checks: [Deno.KvKey, string | null][] = []; + #mutations: [Deno.KvKey, string, RawValue | null][] = []; + + constructor(rid: number) { + this.#rid = rid; + } + + check(...checks: Deno.AtomicCheck[]): this { + for (const check of checks) { + this.#checks.push([convertKey(check.key), check.versionstamp]); + } + return this; + } + + mutate(...mutations: Deno.KvMutation[]): this { + for (const mutation of mutations) { + const key = convertKey(mutation.key); + let type: string; + let value: RawValue | null; + switch (mutation.type) { + case "delete": + type = "delete"; + if (mutation.value) { + throw new TypeError("invalid mutation 'delete' with value"); + } + break; + case "set": + case "sum": + case "min": + case "max": + type = mutation.type; + if (!("value" in mutation)) { + throw new TypeError(`invalid mutation '${type}' without value`); + } + value = serializeValue(mutation.value); + break; + default: + throw new TypeError("Invalid mutation type"); + } + this.#mutations.push([key, type, value]); + } + return this; + } + + set(key: Deno.KvKey, value: unknown): this { + this.#mutations.push([convertKey(key), "set", serializeValue(value)]); + return this; + } + + delete(key: Deno.KvKey): this { + this.#mutations.push([convertKey(key), "delete", null]); + return this; + } + + async commit(): Promise { + const result = await core.opAsync( + "op_kv_atomic_write", + this.#rid, + this.#checks, + this.#mutations, + [], // TODO(@losfair): enqueue + ); + return result; + } + + then() { + throw new TypeError( + "`Deno.AtomicOperation` is not a promise. 
Did you forget to call `commit()`?", + ); + } +} + +const MIN_U64 = 0n; +const MAX_U64 = 0xffffffffffffffffn; + +class KvU64 { + readonly value: bigint; + + constructor(value: bigint) { + if (typeof value !== "bigint") { + throw new TypeError("value must be a bigint"); + } + if (value < MIN_U64) { + throw new RangeError("value must be a positive bigint"); + } + if (value > MAX_U64) { + throw new RangeError("value must be a 64-bit unsigned integer"); + } + this.value = value; + Object.freeze(this); + } +} + +function convertKey(key: Deno.KvKey | Deno.KvKeyPart): Deno.KvKey { + if (Array.isArray(key)) { + return key; + } else { + return [key as Deno.KvKeyPart]; + } +} + +function deserializeValue(entry: RawKvEntry): Deno.KvEntry { + const { kind, value } = entry.value; + switch (kind) { + case "v8": + return { + ...entry, + value: core.deserialize(value), + }; + case "bytes": + return { + ...entry, + value, + }; + case "u64": + return { + ...entry, + value: new KvU64(value), + }; + default: + throw new TypeError("Invalid value type"); + } +} + +function serializeValue(value: unknown): RawValue { + if (value instanceof Uint8Array) { + return { + kind: "bytes", + value, + }; + } else if (value instanceof KvU64) { + return { + kind: "u64", + value: value.value, + }; + } else { + return { + kind: "v8", + value: core.serialize(value), + }; + } +} + +// This gets the %AsyncIteratorPrototype% object (which exists but is not a +// global). We extend the KvListIterator iterator from it, so that we immediately +// support async iterator helpers once they land. The %AsyncIterator% does not +// yet actually exist, however, so right now the AsyncIterator binding refers to +// %Object%. I know. +// Once AsyncIterator is a global, we can just use it (from primordials), rather +// than doing this here.
+const AsyncIteratorPrototype = ObjectGetPrototypeOf(AsyncGeneratorPrototype); +const AsyncIterator = AsyncIteratorPrototype.constructor; + +class KvListIterator extends AsyncIterator + implements AsyncIterator { + #selector: Deno.KvListSelector; + #entries: Deno.KvEntry[] | null = null; + #cursorGen: (() => string) | null = null; + #done = false; + #lastBatch = false; + #pullBatch: ( + selector: Deno.KvListSelector, + cursor: string | undefined, + reverse: boolean, + consistency: Deno.KvConsistencyLevel, + ) => Promise; + #limit: number | undefined; + #count = 0; + #reverse: boolean; + #batchSize: number; + #consistency: Deno.KvConsistencyLevel; + + constructor( + { limit, selector, cursor, reverse, consistency, batchSize, pullBatch }: { + limit?: number; + selector: Deno.KvListSelector; + cursor?: string; + reverse: boolean; + batchSize: number; + consistency: Deno.KvConsistencyLevel; + pullBatch: ( + selector: Deno.KvListSelector, + cursor: string | undefined, + reverse: boolean, + consistency: Deno.KvConsistencyLevel, + ) => Promise; + }, + ) { + super(); + let prefix: Deno.KvKey | undefined; + let start: Deno.KvKey | undefined; + let end: Deno.KvKey | undefined; + if ("prefix" in selector && selector.prefix !== undefined) { + prefix = Object.freeze([...selector.prefix]); + } + if ("start" in selector && selector.start !== undefined) { + start = Object.freeze([...selector.start]); + } + if ("end" in selector && selector.end !== undefined) { + end = Object.freeze([...selector.end]); + } + if (prefix) { + if (start && end) { + throw new TypeError( + "Selector can not specify both 'start' and 'end' key when specifying 'prefix'.", + ); + } + if (start) { + this.#selector = { prefix, start }; + } else if (end) { + this.#selector = { prefix, end }; + } else { + this.#selector = { prefix }; + } + } else { + if (start && end) { + this.#selector = { start, end }; + } else { + throw new TypeError( + "Selector must specify either 'prefix' or both 'start' and 'end' key.", 
+ ); + } + } + Object.freeze(this.#selector); + this.#pullBatch = pullBatch; + this.#limit = limit; + this.#reverse = reverse; + this.#consistency = consistency; + this.#batchSize = batchSize; + this.#cursorGen = cursor ? () => cursor : null; + } + + get cursor(): string { + if (this.#cursorGen === null) { + throw new Error("Cannot get cursor before first iteration"); + } + + return this.#cursorGen(); + } + + async next(): Promise> { + // Fused or limit exceeded + if ( + this.#done || + (this.#limit !== undefined && this.#count >= this.#limit) + ) { + return { done: true, value: undefined }; + } + + // Attempt to fill the buffer + if (!this.#entries?.length && !this.#lastBatch) { + const batch = await this.#pullBatch( + this.#selector, + this.#cursorGen ? this.#cursorGen() : undefined, + this.#reverse, + this.#consistency, + ); + + // Reverse the batch so we can pop from the end + batch.reverse(); + this.#entries = batch; + + // Last batch, do not attempt to pull more + if (batch.length < this.#batchSize) { + this.#lastBatch = true; + } + } + + const entry = this.#entries?.pop(); + if (!entry) { + this.#done = true; + this.#cursorGen = () => ""; + return { done: true, value: undefined }; + } + + this.#cursorGen = () => { + const selector = this.#selector; + return encodeCursor([ + "prefix" in selector ? selector.prefix : null, + "start" in selector ? selector.start : null, + "end" in selector ? selector.end : null, + ], entry.key); + }; + this.#count++; + return { + done: false, + value: entry, + }; + } + + [Symbol.asyncIterator](): AsyncIterator { + return this; + } +} + +export { Kv, KvListIterator, KvU64, openKv }; diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml new file mode 100644 index 00000000000000..cd18adc6ce4838 --- /dev/null +++ b/ext/kv/Cargo.toml @@ -0,0 +1,24 @@ +# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. 
+ +[package] +name = "deno_kv" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +readme = "README.md" +repository.workspace = true +description = "Implementation of the Deno database API" + +[lib] +path = "lib.rs" + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +base64.workspace = true +deno_core.workspace = true +hex.workspace = true +num-bigint.workspace = true +rusqlite.workspace = true +serde.workspace = true diff --git a/ext/kv/codec.rs b/ext/kv/codec.rs new file mode 100644 index 00000000000000..b2acfdbc20e393 --- /dev/null +++ b/ext/kv/codec.rs @@ -0,0 +1,559 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/main/foundationdb/src/tuple/pack.rs + +use crate::Key; +use crate::KeyPart; + +//const NIL: u8 = 0x00; +const BYTES: u8 = 0x01; +const STRING: u8 = 0x02; +//const NESTED: u8 = 0x05; +const NEGINTSTART: u8 = 0x0b; +const INTZERO: u8 = 0x14; +const POSINTEND: u8 = 0x1d; +//const FLOAT: u8 = 0x20; +const DOUBLE: u8 = 0x21; +const FALSE: u8 = 0x26; +const TRUE: u8 = 0x27; + +const ESCAPE: u8 = 0xff; + +const CANONICAL_NAN_POS: u64 = 0x7ff8000000000000u64; +const CANONICAL_NAN_NEG: u64 = 0xfff8000000000000u64; + +pub fn canonicalize_f64(n: f64) -> f64 { + if n.is_nan() { + if n.is_sign_negative() { + f64::from_bits(CANONICAL_NAN_NEG) + } else { + f64::from_bits(CANONICAL_NAN_POS) + } + } else { + n + } +} + +pub fn encode_key(key: &Key) -> std::io::Result> { + // Disallow empty key + if key.0.is_empty() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "key should not be empty", + )); + } + + let mut output: Vec = vec![]; + for part in &key.0 { + match part { + KeyPart::String(key) => { + output.push(STRING); + escape_raw_bytes_into(&mut output, key.as_bytes()); + output.push(0); + } + KeyPart::Int(key) => { + bigint::encode_into(&mut output, key)?; + } + 
KeyPart::Float(key) => { + double::encode_into(&mut output, *key); + } + KeyPart::Bytes(key) => { + output.push(BYTES); + escape_raw_bytes_into(&mut output, key); + output.push(0); + } + KeyPart::False => { + output.push(FALSE); + } + KeyPart::True => { + output.push(TRUE); + } + } + } + Ok(output) +} + +pub fn decode_key(mut bytes: &[u8]) -> std::io::Result { + // Disallow empty key + if bytes.is_empty() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "key should not be empty", + )); + } + + let mut key = Key(vec![]); + while !bytes.is_empty() { + let tag = bytes[0]; + bytes = &bytes[1..]; + + let next_bytes = match tag { + self::STRING => { + let (next_bytes, data) = parse_slice(bytes)?; + let data = String::from_utf8(data).map_err(|_| { + std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid utf8") + })?; + key.0.push(KeyPart::String(data)); + next_bytes + } + self::NEGINTSTART..=self::POSINTEND => { + let (next_bytes, data) = bigint::decode_from(bytes, tag)?; + key.0.push(KeyPart::Int(data)); + next_bytes + } + self::DOUBLE => { + let (next_bytes, data) = double::decode_from(bytes)?; + key.0.push(KeyPart::Float(data)); + next_bytes + } + self::BYTES => { + let (next_bytes, data) = parse_slice(bytes)?; + key.0.push(KeyPart::Bytes(data)); + next_bytes + } + self::FALSE => { + key.0.push(KeyPart::False); + bytes + } + self::TRUE => { + key.0.push(KeyPart::True); + bytes + } + _ => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "invalid tag", + )) + } + }; + + bytes = next_bytes; + } + Ok(key) +} + +fn escape_raw_bytes_into(out: &mut Vec, x: &[u8]) { + for &b in x { + out.push(b); + if b == 0 { + out.push(ESCAPE); + } + } +} + +mod bigint { + use num_bigint::BigInt; + use num_bigint::Sign; + + use super::parse_byte; + use super::parse_bytes; + const MAX_SZ: usize = 8; + + // Ported from 
https://github.com/foundationdb-rs/foundationdb-rs/blob/7415e116d5d96c2630976058de28e439eed7e809/foundationdb/src/tuple/pack.rs#L575 + pub fn encode_into(out: &mut Vec, key: &BigInt) -> std::io::Result<()> { + if key.sign() == Sign::NoSign { + out.push(super::INTZERO); + return Ok(()); + } + let (sign, mut bytes) = key.to_bytes_be(); + let n = bytes.len(); + match sign { + Sign::Minus => { + if n <= MAX_SZ { + out.push(super::INTZERO - n as u8); + } else { + out.extend_from_slice(&[super::NEGINTSTART, bigint_n(n)? ^ 0xff]); + } + invert(&mut bytes); + out.extend_from_slice(&bytes); + } + Sign::NoSign => unreachable!(), + Sign::Plus => { + if n <= MAX_SZ { + out.push(super::INTZERO + n as u8); + } else { + out.extend_from_slice(&[super::POSINTEND, bigint_n(n)?]); + } + out.extend_from_slice(&bytes); + } + } + Ok(()) + } + + pub fn decode_from( + input: &[u8], + tag: u8, + ) -> std::io::Result<(&[u8], BigInt)> { + if super::INTZERO <= tag && tag <= super::INTZERO + MAX_SZ as u8 { + let n = (tag - super::INTZERO) as usize; + let (input, bytes) = parse_bytes(input, n)?; + Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes))) + } else if super::INTZERO - MAX_SZ as u8 <= tag && tag < super::INTZERO { + let n = (super::INTZERO - tag) as usize; + let (input, bytes) = parse_bytes(input, n)?; + Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes)))) + } else if tag == super::NEGINTSTART { + let (input, raw_length) = parse_byte(input)?; + let n = usize::from(raw_length ^ 0xff); + let (input, bytes) = parse_bytes(input, n)?; + Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes)))) + } else if tag == super::POSINTEND { + let (input, raw_length) = parse_byte(input)?; + let n: usize = usize::from(raw_length); + let (input, bytes) = parse_bytes(input, n)?; + Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes))) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("unknown bigint tag: {}", tag), + )) + } + } + + fn invert(bytes: 
&mut [u8]) { + // The ones' complement of a binary number is defined as the value + // obtained by inverting all the bits in the binary representation + // of the number (swapping 0s for 1s and vice versa). + for byte in bytes.iter_mut() { + *byte = !*byte; + } + } + + fn inverted(bytes: &[u8]) -> Vec { + // The ones' complement of a binary number is defined as the value + // obtained by inverting all the bits in the binary representation + // of the number (swapping 0s for 1s and vice versa). + bytes.iter().map(|byte| !*byte).collect() + } + + fn bigint_n(n: usize) -> std::io::Result { + u8::try_from(n).map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "BigUint requires more than 255 bytes to be represented", + ) + }) + } +} + +mod double { + macro_rules! sign_bit { + ($type:ident) => { + (1 << (std::mem::size_of::<$type>() * 8 - 1)) + }; + } + + fn f64_to_ux_be_bytes(f: f64) -> [u8; 8] { + let u = if f.is_sign_negative() { + f.to_bits() ^ ::std::u64::MAX + } else { + f.to_bits() ^ sign_bit!(u64) + }; + u.to_be_bytes() + } + + pub fn encode_into(out: &mut Vec, x: f64) { + out.push(super::DOUBLE); + out.extend_from_slice(&f64_to_ux_be_bytes(super::canonicalize_f64(x))); + } + + pub fn decode_from(input: &[u8]) -> std::io::Result<(&[u8], f64)> { + let (input, bytes) = super::parse_bytes(input, 8)?; + let mut arr = [0u8; 8]; + arr.copy_from_slice(bytes); + let u = u64::from_be_bytes(arr); + Ok(( + input, + f64::from_bits(if (u & sign_bit!(u64)) == 0 { + u ^ ::std::u64::MAX + } else { + u ^ sign_bit!(u64) + }), + )) + } +} + +#[inline] +fn parse_bytes(input: &[u8], num: usize) -> std::io::Result<(&[u8], &[u8])> { + if input.len() < num { + Err(std::io::ErrorKind::UnexpectedEof.into()) + } else { + Ok((&input[num..], &input[..num])) + } +} + +#[inline] +fn parse_byte(input: &[u8]) -> std::io::Result<(&[u8], u8)> { + if input.is_empty() { + Err(std::io::ErrorKind::UnexpectedEof.into()) + } else { + Ok((&input[1..], input[0])) + } +} + +fn 
parse_slice(input: &[u8]) -> std::io::Result<(&[u8], Vec)> { + let mut output: Vec = Vec::new(); + let mut i = 0usize; + + while i < input.len() { + let byte = input[i]; + i += 1; + + if byte == 0 { + if input.get(i).copied() == Some(ESCAPE) { + output.push(0); + i += 1; + continue; + } else { + return Ok((&input[i..], output)); + } + } + + output.push(byte); + } + + Err(std::io::ErrorKind::UnexpectedEof.into()) +} + +#[cfg(test)] +mod tests { + use num_bigint::BigInt; + use std::cmp::Ordering; + + use crate::Key; + use crate::KeyPart; + + use super::decode_key; + use super::encode_key; + + fn roundtrip(key: Key) { + let bytes = encode_key(&key).unwrap(); + let decoded = decode_key(&bytes).unwrap(); + assert_eq!(&key, &decoded); + assert_eq!(format!("{:?}", key), format!("{:?}", decoded)); + } + + fn check_order(a: Key, b: Key, expected: Ordering) { + let a_bytes = encode_key(&a).unwrap(); + let b_bytes = encode_key(&b).unwrap(); + + assert_eq!(a.cmp(&b), expected); + assert_eq!(a_bytes.cmp(&b_bytes), expected); + } + + fn check_bijection(key: Key, serialized: &[u8]) { + let bytes = encode_key(&key).unwrap(); + assert_eq!(&bytes[..], serialized); + let decoded = decode_key(serialized).unwrap(); + assert_eq!(&key, &decoded); + } + + #[test] + fn simple_roundtrip() { + roundtrip(Key(vec![ + KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00]), + KeyPart::String("foo".to_string()), + KeyPart::Float(-f64::NAN), + KeyPart::Float(-f64::INFINITY), + KeyPart::Float(-42.1), + KeyPart::Float(-0.0), + KeyPart::Float(0.0), + KeyPart::Float(42.1), + KeyPart::Float(f64::INFINITY), + KeyPart::Float(f64::NAN), + KeyPart::Int(BigInt::from(-10000)), + KeyPart::Int(BigInt::from(-1)), + KeyPart::Int(BigInt::from(0)), + KeyPart::Int(BigInt::from(1)), + KeyPart::Int(BigInt::from(10000)), + KeyPart::False, + KeyPart::True, + ])); + } + + #[test] + #[rustfmt::skip] + fn order_bytes() { + check_order( + Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), + 
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), + Ordering::Equal, + ); + + check_order( + Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), + Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]), + Ordering::Less, + ); + + check_order( + Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]), + Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), + Ordering::Greater, + ); + + check_order( + Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), + Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]), + Ordering::Less, + ); + + check_order( + Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]), + Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), + Ordering::Greater, + ); + } + + #[test] + #[rustfmt::skip] + fn order_tags() { + check_order( + Key(vec![KeyPart::Bytes(vec![])]), + Key(vec![KeyPart::String("".into())]), + Ordering::Less, + ); + + check_order( + Key(vec![KeyPart::String("".into())]), + Key(vec![KeyPart::Int(BigInt::from(0))]), + Ordering::Less, + ); + + check_order( + Key(vec![KeyPart::Int(BigInt::from(0))]), + Key(vec![KeyPart::Float(0.0)]), + Ordering::Less, + ); + + check_order( + Key(vec![KeyPart::Float(0.0)]), + Key(vec![KeyPart::False]), + Ordering::Less, + ); + + check_order( + Key(vec![KeyPart::False]), + Key(vec![KeyPart::True]), + Ordering::Less, + ); + + check_order( + Key(vec![KeyPart::True]), + Key(vec![KeyPart::Bytes(vec![])]), + Ordering::Greater, + ); + } + + #[test] + #[rustfmt::skip] + fn order_floats() { + check_order( + Key(vec![KeyPart::Float(-f64::NAN)]), + Key(vec![KeyPart::Float(-f64::INFINITY)]), + Ordering::Less, + ); + check_order( + Key(vec![KeyPart::Float(-f64::INFINITY)]), + Key(vec![KeyPart::Float(-10.0)]), + Ordering::Less, + ); + check_order( + Key(vec![KeyPart::Float(-10.0)]), + Key(vec![KeyPart::Float(-0.0)]), + Ordering::Less, + ); + check_order( + 
Key(vec![KeyPart::Float(-0.0)]), + Key(vec![KeyPart::Float(0.0)]), + Ordering::Less, + ); + check_order( + Key(vec![KeyPart::Float(0.0)]), + Key(vec![KeyPart::Float(10.0)]), + Ordering::Less, + ); + check_order( + Key(vec![KeyPart::Float(10.0)]), + Key(vec![KeyPart::Float(f64::INFINITY)]), + Ordering::Less, + ); + check_order( + Key(vec![KeyPart::Float(f64::INFINITY)]), + Key(vec![KeyPart::Float(f64::NAN)]), + Ordering::Less, + ); + } + + #[test] + #[rustfmt::skip] + fn order_ints() { + check_order( + Key(vec![KeyPart::Int(BigInt::from(-10000))]), + Key(vec![KeyPart::Int(BigInt::from(-100))]), + Ordering::Less, + ); + check_order( + Key(vec![KeyPart::Int(BigInt::from(-100))]), + Key(vec![KeyPart::Int(BigInt::from(-1))]), + Ordering::Less, + ); + check_order( + Key(vec![KeyPart::Int(BigInt::from(-1))]), + Key(vec![KeyPart::Int(BigInt::from(0))]), + Ordering::Less, + ); + check_order( + Key(vec![KeyPart::Int(BigInt::from(0))]), + Key(vec![KeyPart::Int(BigInt::from(1))]), + Ordering::Less, + ); + check_order( + Key(vec![KeyPart::Int(BigInt::from(1))]), + Key(vec![KeyPart::Int(BigInt::from(100))]), + Ordering::Less, + ); + check_order( + Key(vec![KeyPart::Int(BigInt::from(100))]), + Key(vec![KeyPart::Int(BigInt::from(10000))]), + Ordering::Less, + ); + } + + #[test] + #[rustfmt::skip] + fn float_canonicalization() { + let key1 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000001))]); + let key2 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000002))]); + + assert_eq!(key1, key2); + assert_eq!(encode_key(&key1).unwrap(), encode_key(&key2).unwrap()); + } + + #[test] + #[rustfmt::skip] + fn explicit_bijection() { + // string + check_bijection( + Key(vec![KeyPart::String("hello".into())]), + &[0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00], + ); + + // zero byte escape + check_bijection( + Key(vec![KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08])]), + &[0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00], + ); + + // array + check_bijection( + Key(vec![ + 
KeyPart::String("hello".into()), + KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08]), + ]), + &[ + 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, /* string */ + 0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00, /* bytes */ + ], + ); + } +} diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs new file mode 100644 index 00000000000000..ee27522d154660 --- /dev/null +++ b/ext/kv/interface.rs @@ -0,0 +1,294 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use std::cell::RefCell; +use std::cmp::Ordering; +use std::num::NonZeroU32; +use std::rc::Rc; + +use async_trait::async_trait; +use deno_core::error::AnyError; +use deno_core::OpState; +use num_bigint::BigInt; + +use crate::codec::canonicalize_f64; + +#[async_trait(?Send)] +pub trait DatabaseHandler { + type DB: Database + 'static; + + async fn open( + &self, + state: Rc>, + path: Option, + ) -> Result; +} + +#[async_trait(?Send)] +pub trait Database { + async fn snapshot_read( + &self, + requests: Vec, + options: SnapshotReadOptions, + ) -> Result, AnyError>; + + async fn atomic_write(&self, write: AtomicWrite) -> Result; +} + +/// Options for a snapshot read. +pub struct SnapshotReadOptions { + pub consistency: Consistency, +} + +/// The consistency of a read. +#[derive(Eq, PartialEq, Copy, Clone, Debug)] +pub enum Consistency { + Strong, + Eventual, +} + +/// A key is for a KV pair. It is a vector of KeyParts. +/// +/// The ordering of the keys is defined by the ordering of the KeyParts. The +/// first KeyPart is the most significant, and the last KeyPart is the least +/// significant. +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)] +pub struct Key(pub Vec); + +/// A key part is single item in a key. It can be a boolean, a double float, a +/// variable precision signed integer, a UTF-8 string, or an arbitrary byte +/// array. +/// +/// The ordering of a KeyPart is dependent on the type of the KeyPart. 
+/// +/// Between different types, the ordering is as follows: arbitrary byte array < +/// UTF-8 string < variable precision signed integer < double float < false < true. +/// +/// Within a type, the ordering is as follows: +/// - For a **boolean**, false is less than true. +/// - For a **double float**, the ordering must follow -NaN < -Infinity < -100.0 < -1.0 < -0.5 < -0.0 < 0.0 < 0.5 < 1.0 < 100.0 < Infinity < NaN. +/// - For a **variable precision signed integer**, the ordering must follow mathematical ordering. +/// - For a **UTF-8 string**, the ordering must follow the UTF-8 byte ordering. +/// - For an **arbitrary byte array**, the ordering must follow the byte ordering. +/// +/// This means that the key part `1.0` is less than the key part `2.0`, but is +/// greater than the key part `0n`, because `1.0` is a double float and `0n` +/// is a variable precision signed integer, and the ordering between types has +/// precedence over the ordering within a type. +#[derive(Clone, Debug)] +pub enum KeyPart { + Bytes(Vec), + String(String), + Int(BigInt), + Float(f64), + False, + True, +} + +impl KeyPart { + fn tag_ordering(&self) -> u8 { + match self { + KeyPart::Bytes(_) => 0, + KeyPart::String(_) => 1, + KeyPart::Int(_) => 2, + KeyPart::Float(_) => 3, + KeyPart::False => 4, + KeyPart::True => 5, + } + } +} + +impl Eq for KeyPart {} + +impl PartialEq for KeyPart { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl Ord for KeyPart { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (KeyPart::Bytes(b1), KeyPart::Bytes(b2)) => b1.cmp(b2), + (KeyPart::String(s1), KeyPart::String(s2)) => { + s1.as_bytes().cmp(s2.as_bytes()) + } + (KeyPart::Int(i1), KeyPart::Int(i2)) => i1.cmp(i2), + (KeyPart::Float(f1), KeyPart::Float(f2)) => { + canonicalize_f64(*f1).total_cmp(&canonicalize_f64(*f2)) + } + _ => self.tag_ordering().cmp(&other.tag_ordering()), + } + } +} + +impl PartialOrd for KeyPart { + fn
partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// A request to read a range of keys from the database. Both `start` and +/// `end` are required and are byte strings in the encoded key space. +/// +/// The range is inclusive of the start and exclusive of the end. The start may +/// not be greater than the end. +/// +/// The range is limited to `limit` number of entries. +pub struct ReadRange { + pub start: Vec, + pub end: Vec, + pub limit: NonZeroU32, + pub reverse: bool, +} + +/// A response to a `ReadRange` request. +pub struct ReadRangeOutput { + pub entries: Vec, +} + +/// A versionstamp is a 10 byte array that is used to represent the version of +/// a key in the database. +type Versionstamp = [u8; 10]; + +/// A key-value entry with a versionstamp. +pub struct KvEntry { + pub key: Vec, + pub value: Value, + pub versionstamp: Versionstamp, +} + +/// A serialized value for a KV pair as stored in the database. All values +/// **can** be serialized into the V8 representation, but not all values are. +/// +/// The V8 representation is an opaque byte array that is only meaningful to +/// the V8 engine. It is guaranteed to be backwards compatible. Because this +/// representation is opaque, it is not possible to inspect or modify the value +/// without deserializing it. +/// +/// The inability to inspect or modify the value without deserializing it means +/// that these values can not be quickly modified when performing atomic +/// read-modify-write operations on the database (because the database may not +/// have the ability to deserialize the V8 value into a modifiable value). +/// +/// Because of this constraint, there are more specialized representations for +/// certain types of values that can be used in atomic read-modify-write +/// operations. These specialized representations are: +/// +/// - **Bytes**: an arbitrary byte array. +/// - **U64**: a 64-bit unsigned integer.
+pub enum Value { + V8(Vec), + Bytes(Vec), + U64(u64), +} + +/// A request to perform an atomic check-modify-write operation on the database. +/// +/// The operation is performed atomically, meaning that the operation will +/// either succeed or fail. If the operation fails, then the database will be +/// left in the same state as before the operation was attempted. If the +/// operation succeeds, then the database will be left in a new state. +/// +/// The operation is performed by first checking the database for the current +/// state of the keys, defined by the `checks` field. If the current state of +/// the keys does not match the expected state, then the operation fails. If +/// the current state of the keys matches the expected state, then the +/// mutations are applied to the database. +/// +/// All checks and mutations are performed atomically. +/// +/// The mutations are performed in the order that they are specified in the +/// `mutations` field. The order of checks is not specified, and is also not +/// important because this ordering is un-observable. +pub struct AtomicWrite { + pub checks: Vec, + pub mutations: Vec, + pub enqueues: Vec, +} + +/// A request to perform a check on a key in the database. The check is not +/// performed on the value of the key, but rather on the versionstamp of the +/// key. +pub struct KvCheck { + pub key: Vec, + pub versionstamp: Option, +} + +/// A request to perform a mutation on a key in the database. The mutation is +/// performed on the value of the key. +/// +/// The type of mutation is specified by the `kind` field. The action performed +/// by each mutation kind is specified in the docs for [MutationKind]. +pub struct KvMutation { + pub key: Vec, + pub kind: MutationKind, +} + +/// A request to enqueue a message to the database. This message is delivered +/// to a listener of the queue at least once. 
+/// +/// ## Retry +/// +/// When the delivery of a message fails, it is retried for a finite number +/// of times. Each retry happens after a backoff period. The backoff periods +/// are specified by the `backoff_schedule` field in milliseconds. If +/// unspecified, the default backoff schedule of the platform (CLI or Deploy) +/// is used. +/// +/// If all retry attempts failed, the message is written to the KV under all +/// keys specified in `keys_if_undelivered`. +pub struct Enqueue { + pub payload: Vec, + pub deadline_ms: u64, + pub keys_if_undelivered: Vec>, + pub backoff_schedule: Option>, +} + +/// The type of mutation to perform on a key in the database. +/// +/// ## Set +/// +/// The set mutation sets the value of the key to the specified value. It +/// discards the previous value of the key, if any. +/// +/// This operand supports all [Value] types. +/// +/// ## Delete +/// +/// The delete mutation deletes the value of the key. +/// +/// ## Sum +/// +/// The sum mutation adds the specified value to the existing value of the key. +/// +/// This operand supports only value types [Value::U64]. The existing value in +/// the database must match the type of the value specified in the mutation. If +/// the key does not exist in the database, then the value specified in the +/// mutation is used as the new value of the key. +/// +/// ## Min +/// +/// The min mutation sets the value of the key to the minimum of the existing +/// value of the key and the specified value. +/// +/// This operand supports only value types [Value::U64]. The existing value in +/// the database must match the type of the value specified in the mutation. If +/// the key does not exist in the database, then the value specified in the +/// mutation is used as the new value of the key. +/// +/// ## Max +/// +/// The max mutation sets the value of the key to the maximum of the existing +/// value of the key and the specified value. 
+/// +/// This operand supports only value types [Value::U64]. The existing value in +/// the database must match the type of the value specified in the mutation. If +/// the key does not exist in the database, then the value specified in the +/// mutation is used as the new value of the key. +pub enum MutationKind { + Set(Value), + Delete, + Sum(Value), + Min(Value), + Max(Value), +} diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs new file mode 100644 index 00000000000000..49a59af7474f97 --- /dev/null +++ b/ext/kv/lib.rs @@ -0,0 +1,541 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +pub mod codec; +mod interface; +pub mod sqlite; + +use std::borrow::Cow; +use std::cell::RefCell; +use std::num::NonZeroU32; +use std::rc::Rc; + +use codec::decode_key; +use codec::encode_key; +use deno_core::anyhow::Context; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::op; +use deno_core::serde_v8::AnyValue; +use deno_core::serde_v8::BigInt; +use deno_core::ByteString; +use deno_core::OpState; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use serde::Serialize; + +pub use crate::interface::*; + +struct UnstableChecker { + pub unstable: bool, +} + +impl UnstableChecker { + // NOTE(bartlomieju): keep in sync with `cli/program_state.rs` + pub fn check_unstable(&self, api_name: &str) { + if !self.unstable { + eprintln!( + "Unstable API '{api_name}'. The --unstable flag must be provided." 
+ ); + std::process::exit(70); + } + } +} + +deno_core::extension!(deno_kv, + // TODO(bartlomieju): specify deps + deps = [ ], + parameters = [ DBH: DatabaseHandler ], + ops = [ + op_kv_database_open, + op_kv_snapshot_read, + op_kv_atomic_write, + op_kv_encode_cursor, + ], + esm = [ "01_db.ts" ], + options = { + handler: DBH, + unstable: bool, + }, + state = |state, options| { + state.put(Rc::new(options.handler)); + state.put(UnstableChecker { unstable: options.unstable }) + } +); + +struct DatabaseResource { + db: Rc, +} + +impl Resource for DatabaseResource { + fn name(&self) -> Cow { + "database".into() + } +} + +#[op] +async fn op_kv_database_open( + state: Rc>, + path: Option, +) -> Result +where + DBH: DatabaseHandler + 'static, +{ + let handler = { + let state = state.borrow(); + state + .borrow::() + .check_unstable("Deno.openKv"); + state.borrow::>().clone() + }; + let db = handler.open(state.clone(), path).await?; + let rid = state + .borrow_mut() + .resource_table + .add(DatabaseResource { db: Rc::new(db) }); + Ok(rid) +} + +type KvKey = Vec; + +impl From for KeyPart { + fn from(value: AnyValue) -> Self { + match value { + AnyValue::Bool(false) => KeyPart::False, // must sort before `true` + AnyValue::Bool(true) => KeyPart::True, + AnyValue::Number(n) => KeyPart::Float(n), + AnyValue::BigInt(n) => KeyPart::Int(n), + AnyValue::String(s) => KeyPart::String(s), + AnyValue::Buffer(buf) => KeyPart::Bytes(buf.to_vec()), + } + } +} + +impl From for AnyValue { + fn from(value: KeyPart) -> Self { + match value { + KeyPart::True => AnyValue::Bool(true), // inverse of the impl above + KeyPart::False => AnyValue::Bool(false), + KeyPart::Float(n) => AnyValue::Number(n), + KeyPart::Int(n) => AnyValue::BigInt(n), + KeyPart::String(s) => AnyValue::String(s), + KeyPart::Bytes(buf) => AnyValue::Buffer(buf.into()), + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "kind", content = "value", rename_all = "snake_case")] +enum V8Value { + V8(ZeroCopyBuf), + Bytes(ZeroCopyBuf), + U64(BigInt), +} + +impl TryFrom
for Value { + type Error = AnyError; + fn try_from(value: V8Value) -> Result { + Ok(match value { + V8Value::V8(buf) => Value::V8(buf.to_vec()), + V8Value::Bytes(buf) => Value::Bytes(buf.to_vec()), + V8Value::U64(n) => Value::U64(num_bigint::BigInt::from(n).try_into()?), + }) + } +} + +impl From for V8Value { + fn from(value: Value) -> Self { + match value { + Value::V8(buf) => V8Value::V8(buf.into()), + Value::Bytes(buf) => V8Value::Bytes(buf.into()), + Value::U64(n) => V8Value::U64(num_bigint::BigInt::from(n).into()), + } + } +} + +#[derive(Deserialize, Serialize)] +struct V8KvEntry { + key: KvKey, + value: V8Value, + versionstamp: ByteString, +} + +impl TryFrom for V8KvEntry { + type Error = AnyError; + fn try_from(entry: KvEntry) -> Result { + Ok(V8KvEntry { + key: decode_key(&entry.key)? + .0 + .into_iter() + .map(Into::into) + .collect(), + value: entry.value.into(), + versionstamp: hex::encode(entry.versionstamp).into(), + }) + } +} + +#[derive(Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +enum V8Consistency { + Strong, + Eventual, +} + +impl From for Consistency { + fn from(value: V8Consistency) -> Self { + match value { + V8Consistency::Strong => Consistency::Strong, + V8Consistency::Eventual => Consistency::Eventual, + } + } +} + +// (prefix, start, end, limit, reverse, cursor) +type SnapshotReadRange = ( + Option, + Option, + Option, + u32, + bool, + Option, +); + +#[op] +async fn op_kv_snapshot_read( + state: Rc>, + rid: ResourceId, + ranges: Vec, + consistency: V8Consistency, +) -> Result>, AnyError> +where + DBH: DatabaseHandler + 'static, +{ + let db = { + let state = state.borrow(); + let resource = + state.resource_table.get::>(rid)?; + resource.db.clone() + }; + let read_ranges = ranges + .into_iter() + .map(|(prefix, start, end, limit, reverse, cursor)| { + let selector = RawSelector::from_tuple(prefix, start, end)?; + + let (start, end) = + decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?; + Ok(ReadRange { + 
start, + end, + limit: NonZeroU32::new(limit) + .with_context(|| "limit must be greater than 0")?, + reverse, + }) + }) + .collect::, AnyError>>()?; + let opts = SnapshotReadOptions { + consistency: consistency.into(), + }; + let output_ranges = db.snapshot_read(read_ranges, opts).await?; + let output_ranges = output_ranges + .into_iter() + .map(|x| { + x.entries + .into_iter() + .map(TryInto::try_into) + .collect::, AnyError>>() + }) + .collect::, AnyError>>()?; + Ok(output_ranges) +} + +type V8KvCheck = (KvKey, Option); + +impl TryFrom for KvCheck { + type Error = AnyError; + fn try_from(value: V8KvCheck) -> Result { + let versionstamp = match value.1 { + Some(data) => { + let mut out = [0u8; 10]; + hex::decode_to_slice(data, &mut out) + .map_err(|_| type_error("invalid versionstamp"))?; + Some(out) + } + None => None, + }; + Ok(KvCheck { + key: encode_v8_key(value.0)?, + versionstamp, + }) + } +} + +type V8KvMutation = (KvKey, String, Option); + +impl TryFrom for KvMutation { + type Error = AnyError; + fn try_from(value: V8KvMutation) -> Result { + let key = encode_v8_key(value.0)?; + let kind = match (value.1.as_str(), value.2) { + ("set", Some(value)) => MutationKind::Set(value.try_into()?), + ("delete", None) => MutationKind::Delete, + ("sum", Some(value)) => MutationKind::Sum(value.try_into()?), + ("min", Some(value)) => MutationKind::Min(value.try_into()?), + ("max", Some(value)) => MutationKind::Max(value.try_into()?), + (op, Some(_)) => { + return Err(type_error(format!("invalid mutation '{op}' with value"))) + } + (op, None) => { + return Err(type_error(format!( + "invalid mutation '{op}' without value" + ))) + } + }; + Ok(KvMutation { key, kind }) + } +} + +type V8Enqueue = (ZeroCopyBuf, u64, Vec, Option>); + +impl TryFrom for Enqueue { + type Error = AnyError; + fn try_from(value: V8Enqueue) -> Result { + Ok(Enqueue { + payload: value.0.to_vec(), + deadline_ms: value.1, + keys_if_undelivered: value + .2 + .into_iter() + .map(encode_v8_key) + 
.collect::>()?, + backoff_schedule: value.3, + }) + } +} + +fn encode_v8_key(key: KvKey) -> Result, std::io::Error> { + encode_key(&Key(key.into_iter().map(From::from).collect())) +} + +enum RawSelector { + Prefixed { + prefix: Vec, + start: Option>, + end: Option>, + }, + Range { + start: Vec, + end: Vec, + }, +} + +impl RawSelector { + fn from_tuple( + prefix: Option, + start: Option, + end: Option, + ) -> Result { + let prefix = prefix.map(encode_v8_key).transpose()?; + let start = start.map(encode_v8_key).transpose()?; + let end = end.map(encode_v8_key).transpose()?; + + match (prefix, start, end) { + (Some(prefix), None, None) => Ok(Self::Prefixed { + prefix, + start: None, + end: None, + }), + (Some(prefix), Some(start), None) => Ok(Self::Prefixed { + prefix, + start: Some(start), + end: None, + }), + (Some(prefix), None, Some(end)) => Ok(Self::Prefixed { + prefix, + start: None, + end: Some(end), + }), + (None, Some(start), Some(end)) => Ok(Self::Range { start, end }), + (None, Some(start), None) => { + let end = start.iter().copied().chain(Some(0)).collect(); + Ok(Self::Range { start, end }) + } + _ => Err(type_error("invalid range")), + } + } + + fn start(&self) -> Option<&[u8]> { + match self { + Self::Prefixed { start, .. } => start.as_deref(), + Self::Range { start, .. } => Some(start), + } + } + + fn end(&self) -> Option<&[u8]> { + match self { + Self::Prefixed { end, .. } => end.as_deref(), + Self::Range { end, .. } => Some(end), + } + } + + fn common_prefix(&self) -> &[u8] { + match self { + Self::Prefixed { prefix, .. } => prefix, + Self::Range { start, end } => common_prefix_for_bytes(start, end), + } + } + + fn range_start_key(&self) -> Vec { + match self { + Self::Prefixed { + start: Some(start), .. + } => start.clone(), + Self::Range { start, .. } => start.clone(), + Self::Prefixed { prefix, .. 
} => { + prefix.iter().copied().chain(Some(0)).collect() + } + } + } + + fn range_end_key(&self) -> Vec { + match self { + Self::Prefixed { end: Some(end), .. } => end.clone(), + Self::Range { end, .. } => end.clone(), + Self::Prefixed { prefix, .. } => { + prefix.iter().copied().chain(Some(0xff)).collect() + } + } + } +} + +fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] { + let mut i = 0; + while i < a.len() && i < b.len() && a[i] == b[i] { + i += 1; + } + &a[..i] +} + +fn encode_cursor( + selector: &RawSelector, + boundary_key: &[u8], +) -> Result { + let common_prefix = selector.common_prefix(); + if !boundary_key.starts_with(common_prefix) { + return Err(type_error("invalid boundary key")); + } + + Ok(base64::encode_config( + &boundary_key[common_prefix.len()..], + base64::URL_SAFE, + )) +} + +fn decode_selector_and_cursor( + selector: &RawSelector, + reverse: bool, + cursor: Option<&ByteString>, +) -> Result<(Vec, Vec), AnyError> { + let Some(cursor) = cursor else { + return Ok((selector.range_start_key(), selector.range_end_key())); + }; + + let common_prefix = selector.common_prefix(); + let cursor = base64::decode_config(cursor, base64::URL_SAFE) + .map_err(|_| type_error("invalid cursor"))?; + + let first_key: Vec; + let last_key: Vec; + + if reverse { + first_key = selector.range_start_key(); + last_key = common_prefix + .iter() + .copied() + .chain(cursor.iter().copied()) + .collect(); + } else { + first_key = common_prefix + .iter() + .copied() + .chain(cursor.iter().copied()) + .chain(Some(0)) + .collect(); + last_key = selector.range_end_key(); + } + + // Defend against out-of-bounds reading + if let Some(start) = selector.start() { + if &first_key[..] < start { + return Err(type_error("cursor out of bounds")); + } + } + + if let Some(end) = selector.end() { + if &last_key[..] 
> end { + return Err(type_error("cursor out of bounds")); + } + } + + Ok((first_key, last_key)) +} + +#[op] +async fn op_kv_atomic_write( + state: Rc>, + rid: ResourceId, + checks: Vec, + mutations: Vec, + enqueues: Vec, +) -> Result +where + DBH: DatabaseHandler + 'static, +{ + let db = { + let state = state.borrow(); + let resource = + state.resource_table.get::>(rid)?; + resource.db.clone() + }; + + let checks = checks + .into_iter() + .map(TryInto::try_into) + .collect::>() + .with_context(|| "invalid check")?; + let mutations = mutations + .into_iter() + .map(TryInto::try_into) + .collect::>() + .with_context(|| "invalid mutation")?; + let enqueues = enqueues + .into_iter() + .map(TryInto::try_into) + .collect::>() + .with_context(|| "invalid enqueue")?; + + let atomic_write = AtomicWrite { + checks, + mutations, + enqueues, + }; + + let result = db.atomic_write(atomic_write).await?; + + Ok(result) +} + +// (prefix, start, end) +type EncodeCursorRangeSelector = (Option, Option, Option); + +#[op] +fn op_kv_encode_cursor( + (prefix, start, end): EncodeCursorRangeSelector, + boundary_key: KvKey, +) -> Result { + let selector = RawSelector::from_tuple(prefix, start, end)?; + let boundary_key = encode_v8_key(boundary_key)?; + let cursor = encode_cursor(&selector, &boundary_key)?; + Ok(cursor) +} diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs new file mode 100644 index 00000000000000..82ff8f8e27c3d7 --- /dev/null +++ b/ext/kv/sqlite.rs @@ -0,0 +1,348 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. 
+ +use std::borrow::Cow; +use std::cell::RefCell; +use std::marker::PhantomData; +use std::path::Path; +use std::path::PathBuf; +use std::rc::Rc; + +use async_trait::async_trait; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::OpState; +use rusqlite::params; +use rusqlite::OptionalExtension; +use rusqlite::Transaction; + +use crate::AtomicWrite; +use crate::Database; +use crate::DatabaseHandler; +use crate::KvEntry; +use crate::MutationKind; +use crate::ReadRange; +use crate::ReadRangeOutput; +use crate::SnapshotReadOptions; +use crate::Value; + +const STATEMENT_INC_AND_GET_DATA_VERSION: &str = + "update data_version set version = version + 1 where k = 0 returning version"; +const STATEMENT_KV_RANGE_SCAN: &str = + "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k asc limit ?"; +const STATEMENT_KV_RANGE_SCAN_REVERSE: &str = + "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k desc limit ?"; +const STATEMENT_KV_POINT_GET_VALUE_ONLY: &str = + "select v, v_encoding from kv where k = ?"; +const STATEMENT_KV_POINT_GET_VERSION_ONLY: &str = + "select version from kv where k = ?"; +const STATEMENT_KV_POINT_SET: &str = + "insert into kv (k, v, v_encoding, version) values (:k, :v, :v_encoding, :version) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version"; +const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?"; + +const STATEMENT_CREATE_MIGRATION_TABLE: &str = " +create table if not exists migration_state( + k integer not null primary key, + version integer not null +) +"; + +const MIGRATIONS: [&str; 2] = [ + " +create table data_version ( + k integer primary key, + version integer not null +); +insert into data_version (k, version) values (0, 0); +create table kv ( + k blob primary key, + v blob not null, + v_encoding integer not null, + version integer not null +) without rowid; +", + " +create table queue ( + ts integer not null, + id text not 
null, + data blob not null, + backoff_schedule text not null, + keys_if_undelivered blob not null, + + primary key (ts, id) +); +create table queue_running( + deadline integer not null, + id text not null, + data blob not null, + backoff_schedule text not null, + keys_if_undelivered blob not null, + + primary key (deadline, id) +); +", +]; + +pub struct SqliteDbHandler { + pub default_storage_dir: Option, + _permissions: PhantomData

, +} + +pub trait SqliteDbHandlerPermissions { + fn check_read(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError>; + fn check_write(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError>; +} + +impl SqliteDbHandler

{ + pub fn new(default_storage_dir: Option) -> Self { + Self { + default_storage_dir, + _permissions: PhantomData, + } + } +} + +#[async_trait(?Send)] +impl DatabaseHandler for SqliteDbHandler

{ + type DB = SqliteDb; + + async fn open( + &self, + state: Rc>, + path: Option, + ) -> Result { + let conn = match (path.as_deref(), &self.default_storage_dir) { + (Some(":memory:") | None, None) => { + rusqlite::Connection::open_in_memory()? + } + (Some(path), _) => { + let path = Path::new(path); + { + let mut state = state.borrow_mut(); + let permissions = state.borrow_mut::

(); + permissions.check_read(path, "Deno.openKv")?; + permissions.check_write(path, "Deno.openKv")?; + } + rusqlite::Connection::open(path)? + } + (None, Some(path)) => { + std::fs::create_dir_all(path)?; + let path = path.join("kv.sqlite3"); + rusqlite::Connection::open(&path)? + } + }; + + conn.pragma_update(None, "journal_mode", "wal")?; + conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?; + + let current_version: usize = conn + .query_row( + "select version from migration_state where k = 0", + [], + |row| row.get(0), + ) + .optional()? + .unwrap_or(0); + + for (i, migration) in MIGRATIONS.iter().enumerate() { + let version = i + 1; + if version > current_version { + conn.execute_batch(migration)?; + conn.execute( + "replace into migration_state (k, version) values(?, ?)", + [&0, &version], + )?; + } + } + + Ok(SqliteDb(RefCell::new(conn))) + } +} + +pub struct SqliteDb(RefCell); + +#[async_trait(?Send)] +impl Database for SqliteDb { + async fn snapshot_read( + &self, + requests: Vec, + _options: SnapshotReadOptions, + ) -> Result, AnyError> { + let mut responses = Vec::with_capacity(requests.len()); + let mut db = self.0.borrow_mut(); + let tx = db.transaction()?; + + for request in requests { + let mut stmt = tx.prepare_cached(if request.reverse { + STATEMENT_KV_RANGE_SCAN_REVERSE + } else { + STATEMENT_KV_RANGE_SCAN + })?; + let entries = stmt + .query_map( + ( + request.start.as_slice(), + request.end.as_slice(), + request.limit.get(), + ), + |row| { + let key: Vec = row.get(0)?; + let value: Vec = row.get(1)?; + let encoding: i64 = row.get(2)?; + + let value = decode_value(value, encoding); + + let version: i64 = row.get(3)?; + Ok(KvEntry { + key, + value, + versionstamp: version_to_versionstamp(version), + }) + }, + )? 
+ .collect::, rusqlite::Error>>()?; + responses.push(ReadRangeOutput { entries }); + } + + Ok(responses) + } + + async fn atomic_write(&self, write: AtomicWrite) -> Result { + let mut db = self.0.borrow_mut(); + + let tx = db.transaction()?; + + for check in write.checks { + let real_versionstamp = tx + .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)? + .query_row([check.key.as_slice()], |row| row.get(0)) + .optional()? + .map(version_to_versionstamp); + if real_versionstamp != check.versionstamp { + return Ok(false); + } + } + + let version: i64 = tx + .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)? + .query_row([], |row| row.get(0))?; + + for mutation in write.mutations { + match mutation.kind { + MutationKind::Set(value) => { + let (value, encoding) = encode_value(&value); + let changed = tx + .prepare_cached(STATEMENT_KV_POINT_SET)? + .execute(params![mutation.key, &value, &encoding, &version])?; + assert_eq!(changed, 1) + } + MutationKind::Delete => { + let changed = tx + .prepare_cached(STATEMENT_KV_POINT_DELETE)? + .execute(params![mutation.key])?; + assert!(changed == 0 || changed == 1) + } + MutationKind::Sum(operand) => { + mutate_le64(&tx, &mutation.key, "sum", &operand, version, |a, b| { + a.wrapping_add(b) + })?; + } + MutationKind::Min(operand) => { + mutate_le64(&tx, &mutation.key, "min", &operand, version, |a, b| { + a.min(b) + })?; + } + MutationKind::Max(operand) => { + mutate_le64(&tx, &mutation.key, "max", &operand, version, |a, b| { + a.max(b) + })?; + } + } + } + + // TODO(@losfair): enqueues + + tx.commit()?; + + Ok(true) + } +} + +/// Mutates a LE64 value in the database, defaulting to setting it to the +/// operand if it doesn't exist. 
+fn mutate_le64( + tx: &Transaction, + key: &[u8], + op_name: &str, + operand: &Value, + new_version: i64, + mutate: impl FnOnce(u64, u64) -> u64, +) -> Result<(), AnyError> { + let Value::U64(operand) = *operand else { + return Err(type_error(format!("Failed to perform '{op_name}' mutation on a non-U64 operand"))); + }; + + let old_value = tx + .prepare_cached(STATEMENT_KV_POINT_GET_VALUE_ONLY)? + .query_row([key], |row| { + let value: Vec = row.get(0)?; + let encoding: i64 = row.get(1)?; + + let value = decode_value(value, encoding); + Ok(value) + }) + .optional()?; + + let new_value = match old_value { + Some(Value::U64(old_value) ) => mutate(old_value, operand), + Some(_) => return Err(type_error(format!("Failed to perform '{op_name}' mutation on a non-U64 value in the database"))), + None => operand, + }; + + let new_value = Value::U64(new_value); + let (new_value, encoding) = encode_value(&new_value); + + let changed = tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![ + key, + &new_value[..], + encoding, + new_version + ])?; + assert_eq!(changed, 1); + + Ok(()) +} + +fn version_to_versionstamp(version: i64) -> [u8; 10] { + let mut versionstamp = [0; 10]; + versionstamp[..8].copy_from_slice(&version.to_be_bytes()); + versionstamp +} + +const VALUE_ENCODING_V8: i64 = 1; +const VALUE_ENCODING_LE64: i64 = 2; +const VALUE_ENCODING_BYTES: i64 = 3; + +fn decode_value(value: Vec, encoding: i64) -> crate::Value { + match encoding { + VALUE_ENCODING_V8 => crate::Value::V8(value), + VALUE_ENCODING_BYTES => crate::Value::Bytes(value), + VALUE_ENCODING_LE64 => { + let mut buf = [0; 8]; + buf.copy_from_slice(&value); + crate::Value::U64(u64::from_le_bytes(buf)) + } + _ => todo!(), + } +} + +fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) { + match value { + crate::Value::V8(value) => (Cow::Borrowed(value), VALUE_ENCODING_V8), + crate::Value::Bytes(value) => (Cow::Borrowed(value), VALUE_ENCODING_BYTES), + crate::Value::U64(value) => { + let mut buf = 
[0; 8]; + buf.copy_from_slice(&value.to_le_bytes()); + (Cow::Owned(buf.to_vec()), VALUE_ENCODING_LE64) + } + } +} diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index 9ed990a23c46e7..c91d6a0569a92e 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -18,7 +18,7 @@ aes.workspace = true cbc.workspace = true deno_core.workspace = true digest = { version = "0.10.5", features = ["core-api", "std"] } -hex = "0.4.3" +hex.workspace = true idna = "0.3.0" indexmap.workspace = true md-5 = "0.10.5" diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index e3a98e9afab045..c729e70f62d696 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -47,6 +47,7 @@ deno_http.workspace = true deno_io.workspace = true deno_net.workspace = true deno_node.workspace = true +deno_kv.workspace = true deno_tls.workspace = true deno_url.workspace = true deno_web.workspace = true @@ -71,6 +72,7 @@ deno_flash.workspace = true deno_fs.workspace = true deno_http.workspace = true deno_io.workspace = true +deno_kv.workspace = true deno_napi.workspace = true deno_net.workspace = true deno_node.workspace = true diff --git a/runtime/build.rs b/runtime/build.rs index ec7c9642c59061..5d0ba0cc7c2a96 100644 --- a/runtime/build.rs +++ b/runtime/build.rs @@ -200,6 +200,24 @@ mod startup_snapshot { } } + impl deno_kv::sqlite::SqliteDbHandlerPermissions for Permissions { + fn check_read( + &mut self, + _path: &Path, + _api_name: &str, + ) -> Result<(), AnyError> { + unreachable!("snapshotting!") + } + + fn check_write( + &mut self, + _path: &Path, + _api_name: &str, + ) -> Result<(), AnyError> { + unreachable!("snapshotting!") + } + } + deno_core::extension!(runtime, deps = [ deno_webidl, @@ -289,6 +307,10 @@ mod startup_snapshot { None, ), deno_tls::deno_tls::init_ops_and_esm(), + deno_kv::deno_kv::init_ops_and_esm( + deno_kv::sqlite::SqliteDbHandler::::new(None), + false, // No --unstable + ), deno_napi::deno_napi::init_ops_and_esm::(), deno_http::deno_http::init_ops_and_esm(), 
deno_io::deno_io::init_ops_and_esm(Default::default()), diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index 7c3a9226d56170..54480c9c728c6a 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -23,6 +23,7 @@ import * as signals from "ext:runtime/40_signals.js"; import * as tty from "ext:runtime/40_tty.js"; // TODO(bartlomieju): this is funky we have two `http` imports import * as httpRuntime from "ext:runtime/40_http.js"; +import * as kv from "ext:deno_kv/01_db.ts"; const denoNs = { metrics: core.metrics, @@ -169,6 +170,10 @@ const denoNsUnstable = { funlockSync: fs.funlockSync, upgradeHttp: http.upgradeHttp, upgradeHttpRaw: flash.upgradeHttpRaw, + openKv: kv.openKv, + Kv: kv.Kv, + KvU64: kv.KvU64, + KvListIterator: kv.KvListIterator, }; export { denoNs, denoNsUnstable }; diff --git a/runtime/lib.rs b/runtime/lib.rs index f55833831eff20..02d52cd5ad459c 100644 --- a/runtime/lib.rs +++ b/runtime/lib.rs @@ -11,6 +11,7 @@ pub use deno_flash; pub use deno_fs; pub use deno_http; pub use deno_io; +pub use deno_kv; pub use deno_napi; pub use deno_net; pub use deno_node; diff --git a/runtime/permissions/mod.rs b/runtime/permissions/mod.rs index c985295a7c62b8..2093b08f9a48dd 100644 --- a/runtime/permissions/mod.rs +++ b/runtime/permissions/mod.rs @@ -1967,6 +1967,18 @@ impl deno_ffi::FfiPermissions for PermissionsContainer { } } +impl deno_kv::sqlite::SqliteDbHandlerPermissions for PermissionsContainer { + #[inline(always)] + fn check_read(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError> { + self.0.lock().read.check(p, Some(api_name)) + } + + #[inline(always)] + fn check_write(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError> { + self.0.lock().write.check(p, Some(api_name)) + } +} + fn unit_permission_from_flag_bool( flag: bool, name: &'static str, diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 0aa142da8eb511..ab06ab649c3fcf 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -34,6 
+34,7 @@ use deno_core::SharedArrayBufferStore; use deno_core::Snapshot; use deno_core::SourceMapGetter; use deno_io::Stdio; +use deno_kv::sqlite::SqliteDbHandler; use deno_node::RequireNpmResolver; use deno_tls::rustls::RootCertStore; use deno_web::create_entangled_message_port; @@ -431,6 +432,10 @@ impl WebWorker { options.unsafely_ignore_certificate_errors.clone(), ), deno_tls::deno_tls::init_ops(), + deno_kv::deno_kv::init_ops( + SqliteDbHandler::::new(None), + unstable, + ), deno_napi::deno_napi::init_ops::(), deno_http::deno_http::init_ops(), deno_io::deno_io::init_ops(Some(options.stdio)), diff --git a/runtime/worker.rs b/runtime/worker.rs index a24a22c9653898..42874f2094fc0f 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -30,6 +30,7 @@ use deno_core::SharedArrayBufferStore; use deno_core::Snapshot; use deno_core::SourceMapGetter; use deno_io::Stdio; +use deno_kv::sqlite::SqliteDbHandler; use deno_node::RequireNpmResolver; use deno_tls::rustls::RootCertStore; use deno_web::BlobStore; @@ -253,6 +254,12 @@ impl MainWorker { options.unsafely_ignore_certificate_errors.clone(), ), deno_tls::deno_tls::init_ops(), + deno_kv::deno_kv::init_ops( + SqliteDbHandler::::new( + options.origin_storage_dir.clone(), + ), + unstable, + ), deno_napi::deno_napi::init_ops::(), deno_http::deno_http::init_ops(), deno_io::deno_io::init_ops(Some(options.stdio)), diff --git a/serde_v8/de.rs b/serde_v8/de.rs index 6708daa4d13fb5..5293a705d93353 100644 --- a/serde_v8/de.rs +++ b/serde_v8/de.rs @@ -13,6 +13,7 @@ use crate::magic::transl8::visit_magic; use crate::magic::transl8::FromV8; use crate::magic::transl8::MagicType; use crate::payload::ValueType; +use crate::AnyValue; use crate::BigInt; use crate::ByteString; use crate::DetachedBuffer; @@ -135,6 +136,7 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de> self.deserialize_f64(visitor) } } + ValueType::BigInt => Err(Error::UnsupportedType), ValueType::String => self.deserialize_string(visitor), ValueType::Array => 
self.deserialize_seq(visitor), ValueType::Object => self.deserialize_map(visitor), @@ -172,7 +174,6 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de> { self.deserialize_f64(visitor) } - fn deserialize_f64(self, visitor: V) -> Result where V: Visitor<'de>, @@ -355,6 +356,9 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de> magic::Value::MAGIC_NAME => { visit_magic(visitor, magic::Value::from_v8(self.scope, self.input)?) } + AnyValue::MAGIC_NAME => { + visit_magic(visitor, AnyValue::from_v8(self.scope, self.input)?) + } _ => { // Regular struct let obj = v8::Local::::try_from(self.input) diff --git a/serde_v8/error.rs b/serde_v8/error.rs index 72d3cc9259b111..e61385946339c6 100644 --- a/serde_v8/error.rs +++ b/serde_v8/error.rs @@ -28,6 +28,7 @@ pub enum Error { ExpectedUtf8, ExpectedLatin1, + UnsupportedType, LengthMismatch, } diff --git a/serde_v8/lib.rs b/serde_v8/lib.rs index 1d17914bbe6273..26d95c67aaf580 100644 --- a/serde_v8/lib.rs +++ b/serde_v8/lib.rs @@ -15,6 +15,7 @@ pub use de::Deserializer; pub use error::Error; pub use error::Result; pub use keys::KeyCache; +pub use magic::any_value::AnyValue; pub use magic::bigint::BigInt; pub use magic::buffer::ZeroCopyBuf; pub use magic::bytestring::ByteString; diff --git a/serde_v8/magic/any_value.rs b/serde_v8/magic/any_value.rs new file mode 100644 index 00000000000000..31a74cfde0cf8d --- /dev/null +++ b/serde_v8/magic/any_value.rs @@ -0,0 +1,66 @@ +use num_bigint::BigInt; + +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use super::buffer::ZeroCopyBuf; +use super::transl8::FromV8; +use super::transl8::ToV8; +use crate::magic::transl8::impl_magic; +use crate::Error; + +/// An untagged enum type that can be any of number, string, bool, bigint, or +/// buffer. 
+#[derive(Debug)] +pub enum AnyValue { + Buffer(ZeroCopyBuf), + String(String), + Number(f64), + BigInt(BigInt), + Bool(bool), +} + +impl_magic!(AnyValue); + +impl ToV8 for AnyValue { + fn to_v8<'a>( + &mut self, + scope: &mut v8::HandleScope<'a>, + ) -> Result, crate::Error> { + match self { + Self::Buffer(buf) => buf.to_v8(scope), + Self::String(s) => crate::to_v8(scope, s), + Self::Number(num) => crate::to_v8(scope, num), + Self::BigInt(bigint) => { + crate::to_v8(scope, crate::BigInt::from(bigint.clone())) + } + Self::Bool(b) => crate::to_v8(scope, b), + } + } +} + +impl FromV8 for AnyValue { + fn from_v8( + scope: &mut v8::HandleScope, + value: v8::Local, + ) -> Result { + if value.is_string() { + let string = crate::from_v8(scope, value)?; + Ok(AnyValue::String(string)) + } else if value.is_number() { + let string = crate::from_v8(scope, value)?; + Ok(AnyValue::Number(string)) + } else if value.is_big_int() { + let bigint = crate::BigInt::from_v8(scope, value)?; + Ok(AnyValue::BigInt(bigint.into())) + } else if value.is_array_buffer_view() { + let buf = ZeroCopyBuf::from_v8(scope, value)?; + Ok(AnyValue::Buffer(buf)) + } else if value.is_boolean() { + let string = crate::from_v8(scope, value)?; + Ok(AnyValue::Bool(string)) + } else { + Err(Error::Message( + "expected string, number, bigint, ArrayBufferView, boolean".into(), + )) + } + } +} diff --git a/serde_v8/magic/mod.rs b/serde_v8/magic/mod.rs index 9e5064867f1498..3e984527dd1aee 100644 --- a/serde_v8/magic/mod.rs +++ b/serde_v8/magic/mod.rs @@ -1,4 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. 
+pub mod any_value; pub mod bigint; pub mod buffer; pub mod bytestring; diff --git a/serde_v8/payload.rs b/serde_v8/payload.rs index 27f12014f21e18..b396ad01d7ec02 100644 --- a/serde_v8/payload.rs +++ b/serde_v8/payload.rs @@ -9,6 +9,7 @@ pub enum ValueType { Null, Bool, Number, + BigInt, String, Array, ArrayBuffer, @@ -26,6 +27,8 @@ impl ValueType { return Self::String; } else if v.is_array() { return Self::Array; + } else if v.is_big_int() { + return Self::BigInt; } else if v.is_array_buffer() { return Self::ArrayBuffer; } else if v.is_array_buffer_view() { diff --git a/serde_v8/ser.rs b/serde_v8/ser.rs index 6c10f3fb4c85dd..74ad9ec103366c 100644 --- a/serde_v8/ser.rs +++ b/serde_v8/ser.rs @@ -14,6 +14,7 @@ use crate::magic::transl8::opaque_recv; use crate::magic::transl8::MagicType; use crate::magic::transl8::ToV8; use crate::magic::transl8::MAGIC_FIELD; +use crate::AnyValue; use crate::BigInt; use crate::ByteString; use crate::DetachedBuffer; @@ -274,6 +275,7 @@ pub enum StructSerializers<'a, 'b, 'c> { ExternalPointer(MagicalSerializer<'a, 'b, 'c, magic::ExternalPointer>), Magic(MagicalSerializer<'a, 'b, 'c, magic::Value<'a>>), ZeroCopyBuf(MagicalSerializer<'a, 'b, 'c, ZeroCopyBuf>), + MagicAnyValue(MagicalSerializer<'a, 'b, 'c, AnyValue>), MagicDetached(MagicalSerializer<'a, 'b, 'c, DetachedBuffer>), MagicByteString(MagicalSerializer<'a, 'b, 'c, ByteString>), MagicU16String(MagicalSerializer<'a, 'b, 'c, U16String>), @@ -295,6 +297,7 @@ impl<'a, 'b, 'c> ser::SerializeStruct for StructSerializers<'a, 'b, 'c> { StructSerializers::ExternalPointer(s) => s.serialize_field(key, value), StructSerializers::Magic(s) => s.serialize_field(key, value), StructSerializers::ZeroCopyBuf(s) => s.serialize_field(key, value), + StructSerializers::MagicAnyValue(s) => s.serialize_field(key, value), StructSerializers::MagicDetached(s) => s.serialize_field(key, value), StructSerializers::MagicByteString(s) => s.serialize_field(key, value), StructSerializers::MagicU16String(s) => 
s.serialize_field(key, value), @@ -311,6 +314,7 @@ impl<'a, 'b, 'c> ser::SerializeStruct for StructSerializers<'a, 'b, 'c> { StructSerializers::ExternalPointer(s) => s.end(), StructSerializers::Magic(s) => s.end(), StructSerializers::ZeroCopyBuf(s) => s.end(), + StructSerializers::MagicAnyValue(s) => s.end(), StructSerializers::MagicDetached(s) => s.end(), StructSerializers::MagicByteString(s) => s.end(), StructSerializers::MagicU16String(s) => s.end(), @@ -588,6 +592,10 @@ impl<'a, 'b, 'c> ser::Serializer for Serializer<'a, 'b, 'c> { let m = MagicalSerializer::::new(self.scope); Ok(StructSerializers::ZeroCopyBuf(m)) } + AnyValue::MAGIC_NAME => { + let m = MagicalSerializer::::new(self.scope); + Ok(StructSerializers::MagicAnyValue(m)) + } DetachedBuffer::MAGIC_NAME => { let m = MagicalSerializer::::new(self.scope); Ok(StructSerializers::MagicDetached(m))