GraphQL: updating arrays with delta updates #6307
Comments
Sadly this is impacting the Info view (cylc/cylc-ui#1886) which requires both. I'm tempted to lean towards solution (1) at the moment. It's quick and dirty, but it should be enough to get us moving again. We could chalk up solution (3) as a possible efficiency enhancement in the future. Whatever solution we pick, we need to apply it globally rather than waiting for bug reports and changing the fields one at a time.
I've tested on 8.3.x and master, and in the network panel of devtools I see all the prerequisites listed in every update, even if only one of them changed.
Strange! I've had a go, and it looks like all prereqs are included with each update that contains any prereqs (correct). I must have jumped the gun when writing up this issue. However, the same is not true for outputs, where we only get the updated outputs pushed down the wire. E.g. using this example workflow:

```
[scheduling]
    [[graph]]
        R1 = a => b
[runtime]
    [[a]]
        script = """
            sleep 5
            cylc message -- xxx
            sleep 5
        """
        [[[outputs]]]
            x = xxx
    [[b]]
```

Then opening the Info view on 1/a and inspecting the messages sent down the websocket (via browser devtools), I got the following.

Complete message chain as a list:

```json
[
{
"id": "9",
"type": "start",
"payload": {
"variables": {
"workflowId": "~me/tmp.IgNNIrr6jj/run1",
"taskID": "1/a"
},
"extensions": {},
"operationName": "InfoViewSubscription",
"query": "subscription InfoViewSubscription($workflowId: ID, $taskID: ID) {\n deltas(workflows: [$workflowId]) {\n added {\n ...AddedDelta\n __typename\n }\n updated(stripNull: true) {\n ...UpdatedDelta\n __typename\n }\n __typename\n }\n}\n\nfragment AddedDelta on Added {\n taskProxies(ids: [$taskID]) {\n ...TaskProxyData\n __typename\n }\n __typename\n}\n\nfragment UpdatedDelta on Updated {\n taskProxies(ids: [$taskID]) {\n ...TaskProxyData\n __typename\n }\n __typename\n}\n\nfragment TaskProxyData on TaskProxy {\n id\n state\n isHeld\n isQueued\n isRunahead\n task {\n ...TaskDefinitionData\n __typename\n }\n jobs {\n ...JobData\n __typename\n }\n prerequisites {\n satisfied\n expression\n conditions {\n taskId\n reqState\n exprAlias\n satisfied\n __typename\n }\n __typename\n }\n outputs {\n label\n satisfied\n __typename\n }\n __typename\n}\n\nfragment TaskDefinitionData on Task {\n meanElapsedTime\n meta {\n title\n description\n URL\n userDefined\n __typename\n }\n __typename\n}\n\nfragment JobData on Job {\n id\n jobId\n startedTime\n state\n __typename\n}"
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [
{
"id": "~me/tmp.IgNNIrr6jj/run1//1/a",
"state": "waiting",
"isHeld": false,
"isQueued": true,
"isRunahead": false,
"task": {
"meanElapsedTime": 0.0,
"meta": {
"title": "",
"description": "",
"URL": "",
"userDefined": {},
"__typename": "NodeMeta"
},
"__typename": "Task"
},
"jobs": [],
"prerequisites": [],
"outputs": [
{
"label": "expired",
"satisfied": false,
"__typename": "Output"
},
{
"label": "submitted",
"satisfied": false,
"__typename": "Output"
},
{
"label": "submit-failed",
"satisfied": false,
"__typename": "Output"
},
{
"label": "started",
"satisfied": false,
"__typename": "Output"
},
{
"label": "succeeded",
"satisfied": false,
"__typename": "Output"
},
{
"label": "failed",
"satisfied": false,
"__typename": "Output"
},
{
"label": "x",
"satisfied": false,
"__typename": "Output"
}
],
"__typename": "TaskProxy"
}
],
"__typename": "Added"
},
"updated": {
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"taskProxies": [
{
"id": "~me/tmp.IgNNIrr6jj/run1//1/a",
"state": "preparing",
"isQueued": false,
"task": {
"meta": {
"title": "",
"description": "",
"URL": "",
"__typename": "NodeMeta"
},
"__typename": "Task"
},
"__typename": "TaskProxy"
}
],
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"taskProxies": [
{
"id": "~me/tmp.IgNNIrr6jj/run1//1/a",
"state": "submitted",
"isQueued": false,
"task": {
"meta": {
"title": "",
"description": "",
"URL": "",
"__typename": "NodeMeta"
},
"__typename": "Task"
},
"jobs": [
{
"id": "~me/tmp.IgNNIrr6jj/run1//1/a/01",
"jobId": "123116",
"state": "submitted",
"__typename": "Job"
}
],
"outputs": [
{
"label": "submitted",
"satisfied": true,
"__typename": "Output"
}
],
"__typename": "TaskProxy"
}
],
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"taskProxies": [
{
"id": "~me/tmp.IgNNIrr6jj/run1//1/a",
"state": "running",
"task": {
"meta": {
"title": "",
"description": "",
"URL": "",
"__typename": "NodeMeta"
},
"__typename": "Task"
},
"jobs": [
{
"id": "~me/tmp.IgNNIrr6jj/run1//1/a/01",
"jobId": "123116",
"startedTime": "2024-10-07T16:21:46 01:00",
"state": "running",
"__typename": "Job"
}
],
"outputs": [
{
"label": "started",
"satisfied": true,
"__typename": "Output"
}
],
"__typename": "TaskProxy"
}
],
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"taskProxies": [
{
"id": "~me/tmp.IgNNIrr6jj/run1//1/a",
"task": {
"meta": {
"title": "",
"description": "",
"URL": "",
"__typename": "NodeMeta"
},
"__typename": "Task"
},
"jobs": [
{
"id": "~me/tmp.IgNNIrr6jj/run1//1/a/01",
"jobId": "123116",
"startedTime": "2024-10-07T16:21:46 01:00",
"state": "running",
"__typename": "Job"
}
],
"outputs": [
{
"label": "x",
"satisfied": true,
"__typename": "Output"
}
],
"__typename": "TaskProxy"
}
],
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"taskProxies": [
{
"id": "~me/tmp.IgNNIrr6jj/run1//1/a",
"state": "succeeded",
"task": {
"meanElapsedTime": 12.0,
"meta": {
"title": "",
"description": "",
"URL": "",
"__typename": "NodeMeta"
},
"__typename": "Task"
},
"jobs": [
{
"id": "~me/tmp.IgNNIrr6jj/run1//1/a/01",
"jobId": "123116",
"startedTime": "2024-10-07T16:21:46 01:00",
"state": "succeeded",
"__typename": "Job"
}
],
"outputs": [
{
"label": "succeeded",
"satisfied": true,
"__typename": "Output"
}
],
"__typename": "TaskProxy"
}
],
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
},
{
"id": "9",
"type": "data",
"payload": {
"data": {
"deltas": {
"added": {
"taskProxies": [],
"__typename": "Added"
},
"updated": {
"__typename": "Updated"
},
"__typename": "Deltas"
}
}
}
}
]
```
With this diff, I get all of the outputs every time any one of them is updated:

```diff
diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index 01b43bfe9..fda578e6c 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -2428,11 +2428,25 @@ class DataStoreMgr:
         tp_delta = self.updated[TASK_PROXIES].setdefault(
             tp_id, PbTaskProxy(id=tp_id))
         tp_delta.stamp = f'{tp_id}@{update_time}'
-        output = tp_delta.outputs[label]
-        output.label = label
-        output.message = message
-        output.satisfied = outputs.is_message_complete(message)
-        output.time = update_time
+        for _label in tproxy.outputs:
+            output = tp_delta.outputs[_label]
+            # set the new output
+            if _label == label:
+                output.label = label
+                output.message = message
+                output.satisfied = outputs.is_message_complete(message)
+            # ensure all outputs are included in the delta
+            else:
+                _output = tproxy.outputs[_label]
+                output.label = _output.label
+                output.message = _output.message
+                output.satisfied = _output.satisfied
+            output.time = update_time
         self.updates_pending = True

     def delta_task_outputs(self, itask: TaskProxy) -> None:
```

So this might be enough for us to press ahead with the metadata view, although there are many other lists out there in the schema, and any one of them could go wrong in this way. It's a shame, as this is a negative change really. The data store code is doing the right thing at both ends; it's just that we don't have any protocol for incrementally updating lists.

Q) So why are prerequisites behaving differently to outputs?

A) Because the prerequisite update is done in a lazier fashion: every time any of the prereqs are updated, we re-generate all of them; see cylc-flow/cylc/flow/data_store_mgr.py, lines 2464 to 2490 in da8772c.
* We don't have a protocol for updating arbitrary lists incrementally.
* See #6307
* This is a temporary patch to disable incremental output updates until we have a suitable protocol for doing so.
Data stores maintained by GraphQL updates (i.e. cylc-ui) are not presently able to synchronise some array fields.
I think the following fields are affected (all `graphene.List` type):

This has been discussed before (can't find the issue, dammit). We have previously worked around the issue by repeating all elements of updated arrays in the returned deltas.
Example: Outputs
This subscription tracks task outputs; the relevant selection, `outputs { label satisfied }`, can be seen in the InfoViewSubscription query in the message chain above.
The corresponding `added` subscription would initially yield a list of all outputs, i.e. the first sketch below. Whenever there is a change, e.g. when the first job submits, we will receive only the element that was updated, e.g. the second sketch below.
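Roughly, abridged from the `added` and `updated` deltas captured in the message chain above (only the `id` and `outputs` fields are shown):

```json
{
  "added": {
    "taskProxies": [
      {
        "id": "~me/tmp.IgNNIrr6jj/run1//1/a",
        "outputs": [
          {"label": "expired", "satisfied": false},
          {"label": "submitted", "satisfied": false},
          {"label": "submit-failed", "satisfied": false},
          {"label": "started", "satisfied": false},
          {"label": "succeeded", "satisfied": false},
          {"label": "failed", "satisfied": false},
          {"label": "x", "satisfied": false}
        ]
      }
    ]
  }
}
```

and once the job has submitted, an `updated` delta containing only the changed output:

```json
{
  "updated": {
    "taskProxies": [
      {
        "id": "~me/tmp.IgNNIrr6jj/run1//1/a",
        "outputs": [
          {"label": "submitted", "satisfied": true}
        ]
      }
    ]
  }
}
```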
And so on for subsequent changes (`started`, `x`, `succeeded` in the example above).
This makes sense: we are pushing only the minimal information down the wire.
But how is the remote data store supposed to work with these deltas?
The remote data store needs to be able to associate the updated element back to the corresponding element in the store in order to update it.
In this case it's easy to do by inspection: the `label` is the unique key representing the object. The remote data store should look through the outputs until it finds one with the same label, and update the `satisfied` field to match the update. However, this would require the remote data store to know that `label` is the unique key for `outputs` arrays, which makes updates complicated.

Example: Prerequisites
However, it's not always so easy. Take the `prerequisites` field for example:
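As a rough sketch, a single prerequisite (using the fields selected in the InfoViewSubscription above; the values here are illustrative, not taken from a real workflow) might look like:

```json
{
  "prerequisites": [
    {
      "satisfied": false,
      "expression": "c0 | c1",
      "conditions": [
        {"taskId": "1/x", "reqState": "succeeded", "exprAlias": "c0", "satisfied": true},
        {"taskId": "1/y", "reqState": "succeeded", "exprAlias": "c1", "satisfied": false}
      ]
    }
  ]
}
```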
There is no unique key here. The `expression` field (which has values like `c0 & c1` or `c0 | c1`) is not unique. The conditions array isn't unique either, since the same conditions could be arranged into different prerequisite expressions.

The only way we can get a unique key is to form a composite of the following fields:
Problem: Cylc UI
At present, the Cylc UI data store uses `Object.assign(data_store, update)` to update the data. This will overwrite each key in `data_store` with the value in `update`. The result is that the full list of task outputs that we received in the `added` delta will be completely overwritten by the single output we received in the `updated` delta, as illustrated below.
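A minimal sketch of the resulting data loss (the `store`, `update` and `result` labels are just for this illustration, not part of any schema). Because the assignment overwrites the `outputs` key wholesale rather than merging it, all but the updated output disappear from the store:

```json
{
  "store": {
    "outputs": [
      {"label": "submitted", "satisfied": false},
      {"label": "started", "satisfied": false},
      {"label": "succeeded", "satisfied": false}
    ]
  },
  "update": {
    "outputs": [
      {"label": "submitted", "satisfied": true}
    ]
  },
  "result": {
    "outputs": [
      {"label": "submitted", "satisfied": true}
    ]
  }
}
```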
Solution 1: Repeat all array elements in delta updates
One option would be to just repeat all elements in the update.
This isn't ideal efficiency-wise, but it is a quick & dirty fix that would allow us to overcome this hurdle quickly without getting too involved in protocols and update functions.
I.e. rather than returning only the updated item in a delta update (as in the single-output `updated` delta sketched earlier), we would return the whole array:
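A rough sketch of what such an `updated` delta might look like after job submission, repeating every output rather than only the changed one (labels taken from the example workflow above):

```json
{
  "updated": {
    "taskProxies": [
      {
        "id": "~me/tmp.IgNNIrr6jj/run1//1/a",
        "outputs": [
          {"label": "expired", "satisfied": false},
          {"label": "submitted", "satisfied": true},
          {"label": "submit-failed", "satisfied": false},
          {"label": "started", "satisfied": false},
          {"label": "succeeded", "satisfied": false},
          {"label": "failed", "satisfied": false},
          {"label": "x", "satisfied": false}
        ]
      }
    ]
  }
}
```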
Solution 2: Replace arrays with objects
We could replace arrays with `Object`s, where the array index is used as the key. For example, rather than doing this:
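For illustration, the current array form (abridged to two outputs):

```json
{
  "outputs": [
    {"label": "submitted", "satisfied": true},
    {"label": "started", "satisfied": false}
  ]
}
```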
We would do this:
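A sketch of the same data as an object keyed by the former array indices (purely illustrative; the exact shape is not specified in this issue):

```json
{
  "outputs": {
    "0": {"label": "submitted", "satisfied": true},
    "1": {"label": "started", "satisfied": false}
  }
}
```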
The downside to this is that it would create a backwards incompatible change. The only alternative to breaking compatibility would be to create a duplicate field for the new object.
Another downside is that there would be no obvious mechanism for removing elements. We can add them, we can update them, but the only way to remove them is to prune the containing object and (re)send an added delta (essentially destroy and rebuild that part of the store).
Solution 3: Develop an array update protocol
The most sophisticated solution would be to enable the UI to update these arrays intelligently by providing a standard "key" for the update algorithm to use for mapping the update back to the original object in the data store.
E.g. we could standardise on using a field called `id`, so that updates can be easily mapped onto data store elements:
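As a sketch, an `updated` delta element would carry the same `id` as the element already held in the store, so the update algorithm can match them up (the `id` values here are hypothetical):

```json
{
  "outputs": [
    {"id": "1/a:submitted", "label": "submitted", "satisfied": true}
  ]
}
```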
This protocol would need to include a mechanism for "pruning" an element, e.g.:
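One possible shape for such a pruning delta is sketched below; the `pruned` flag is purely hypothetical, as the issue does not specify how removal would be expressed:

```json
{
  "outputs": [
    {"id": "1/a:submitted", "pruned": true}
  ]
}
```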