Passing SSLContext for the HTTPS X509 to uproot.dask fails because the context is not serializable #1248

Open
bockjoo opened this issue Jul 11, 2024 · 8 comments
Labels
bug (unverified) The problem described would be a bug, but needs to be triaged

Comments


bockjoo commented Jul 11, 2024

I am using Coffea 2024.6.1 and uproot 5.3.10

python -c "import uproot ; print(uproot.__version__)"
5.3.10

To reproduce the issue, this script can be used:

import os
import ssl
from coffea.nanoevents import NanoEventsFactory, BaseSchema

sslctx = ssl.create_default_context()
sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])

filename="https://cmsio2.rc.ufl.edu:1094/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root"
events = NanoEventsFactory.from_root(
    {filename: "Events"},
    steps_per_file=100_000,
    metadata={"dataset": "DoubleMuon"},
    schemaclass=BaseSchema,
    uproot_options = {'ssl':sslctx},
).events()
p = MyProcessor()
out = p.process(events)
(computed,) = dask.compute(out)
print(computed)

The environment variable X509_USER_PROXY should point to the VOMS proxy file.
The input file is

https://cmsio2.rc.ufl.edu:1094/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root

but it is probably irrelevant, since the SSL connection cannot be established because the SSLContext is not serializable.
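The core failure can be reproduced in isolation (a minimal sketch, independent of coffea and dask):

import pickle
import ssl

ctx = ssl.create_default_context()
try:
    pickle.dumps(ctx)
except TypeError as err:
    print(err)  # cannot pickle 'SSLContext' object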
The stack trace from the above script looks like:

2024-07-11 08:04:26,616 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 5 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1490a8411c00>
 0. from-uproot-40070443cb475fde1ee0133653b4c164
 1. hist-on-block-b00467fa12eb096c7b30b70057935672
 2. histreduce-agg-aed8002a664acbeb30a8fcf9eb35df08
 3. <dask-awkward.lib.core.ArgsKwargsPackedFunction ob-c407c0e517af63d39ccc70a2134bb7db
 4. numaxis0-d04f5a3766776402558a814b4dff0437
>.
Traceback (most recent call last):
  File "/home/bockjoo/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object 'unpack_collections.<locals>.repack'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/bockjoo/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
AttributeError: Can't pickle local object 'unpack_collections.<locals>.repack'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/bockjoo/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/home/bockjoo/opt/cmsio2/anaconda3/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/bockjoo/opt/cmsio2/anaconda3/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'SSLContext' object
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py:63, in dumps(x, buffer_callback, protocol)
     62 try:
---> 63     result = pickle.dumps(x, **dump_kwargs)
     64 except Exception:

AttributeError: Can't pickle local object 'unpack_collections.<locals>.repack'

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py:68, in dumps(x, buffer_callback, protocol)
     67 buffers.clear()
---> 68 pickler.dump(x)
     69 result = f.getvalue()

AttributeError: Can't pickle local object 'unpack_collections.<locals>.repack'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/serialize.py:363, in serialize(x, serializers, on_error, context, iterate_collection)
    362 try:
--> 363     header, frames = dumps(x, context=context) if wants_context else dumps(x)
    364     header["serializer"] = name

File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/serialize.py:78, in pickle_dumps(x, context)
     76     writeable.append(not f.readonly)
---> 78 frames[0] = pickle.dumps(
     79     x,
     80     buffer_callback=buffer_callback,
     81     protocol=context.get("pickle-protocol", None) if context else None,
     82 )
     83 header = {
     84     "serializer": "pickle",
     85     "writeable": tuple(writeable),
     86 }

File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/pickle.py:81, in dumps(x, buffer_callback, protocol)
     80     buffers.clear()
---> 81     result = cloudpickle.dumps(x, **dump_kwargs)
     82 except Exception:

File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
     70 cp = CloudPickler(
     71     file, protocol=protocol, buffer_callback=buffer_callback
     72 )
---> 73 cp.dump(obj)
     74 return file.getvalue()

File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py:602, in CloudPickler.dump(self, obj)
    601 try:
--> 602     return Pickler.dump(self, obj)
    603 except RuntimeError as e:

TypeError: cannot pickle 'SSLContext' object

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
Cell In[3], line 20
     18 p = MyProcessor()
     19 out = p.process(events)
---> 20 (computed,) = dask.compute(out)
     21 print(computed)

File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/dask/base.py:662, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    659     postcomputes.append(x.__dask_postcompute__())
    661 with shorten_traceback():
--> 662     results = schedule(dsk, keys, **kwargs)
    664 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/opt/cmsio2/anaconda3/lib/python3.10/site-packages/distributed/protocol/serialize.py:389, in serialize(x, serializers, on_error, context, iterate_collection)
    387     except Exception:
    388         raise TypeError(msg) from exc
--> 389     raise TypeError(msg, str_x) from exc
    390 else:  # pragma: nocover
    391     raise ValueError(f"{on_error=}; expected 'message' or 'raise'")  

Thanks for looking into this issue!

bockjoo added the bug (unverified) label on Jul 11, 2024
nsmith- changed the title from "Passing SSLContext for the HTTPS X509 to uproot fails because the context is not serializable" to "Passing SSLContext for the HTTPS X509 to uproot.dask fails because the context is not serializable" on Jul 11, 2024

bockjoo commented Jul 11, 2024

I apologize, I pasted the wrong script. This one:

 filename="https://cmsio2.rc.ufl.edu:1094/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root"
 events = NanoEventsFactory.from_root(
    {filename: "Events"},
    steps_per_file=100_000,
    metadata={"dataset": "DoubleMuon"},
    schemaclass=BaseSchema,
    uproot_options = {'ssl':sslctx},
 ).events()
 print (dir(events))
 #

works. Here is the script that actually reproduces the issue:

import os
import time
import ssl

import dask
from distributed import LocalCluster, Client

import hist
#import dask
import awkward as ak
import hist.dask as hda
import dask_awkward as dak

from coffea import processor

from coffea.dataset_tools import (
    apply_to_fileset,
    max_chunks,
    preprocess,
)

from coffea.nanoevents import NanoEventsFactory, BaseSchema
    

if __name__ == '__main__':
    sslctx = ssl.create_default_context()
    sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])
    
    cluster = LocalCluster(n_workers=10, threads_per_worker=1)
    client = cluster.get_client()
    
    redirector = "https://cmsio2.rc.ufl.edu:1094"
    fileset = {
        "TTbarPowheg_Dilepton": {
            "files": {
                redirector "/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root": "Events", 
            },
        },
    }
    dataset_runnable, dataset_updated = preprocess(
        fileset,
        align_clusters=False,
        step_size=100_000,
        files_per_batch=1,
        skip_bad_files=True,
        save_form=False,
        uproot_options={'ssl': sslctx},
    )
    print ('dataset_runnable ',dataset_runnable)


bockjoo commented Jul 14, 2024

It looks like the CA certs that are set up by:

    sslctx = ssl.create_default_context()
    sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])

disappear. I checked like this:

print ( "size of CA certs ", len ( sslctx.get_ca_certs()) ) 
size of CA certs 0
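As an aside, SSLContext.get_ca_certs() only lists CA certificates that have been loaded into the verification store (certificates reachable only through a capath directory are loaded lazily and are not listed), and the client certificate loaded with load_cert_chain() never appears there, so a count of 0 does not by itself mean the proxy certificate is missing. For example, to inspect the context state:

import os
import ssl

sslctx = ssl.create_default_context()
sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])

# cert_store_stats() counts certificates in the verification store and
# get_ca_certs() lists them; neither reflects the client cert/key pair
# loaded with load_cert_chain().
print(sslctx.cert_store_stats())
print(len(sslctx.get_ca_certs()))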

After updating the main script readNanoEventsMyProcessor.py and hacking lib/python3.12/asyncio/base_events.py,
HTTPS X509 access works with the servers directly (cmsio3.rc.ufl.edu), but with the redirector (cmsio2.rc.ufl.edu in the script) it only
succeeds once in a while, failing with either

Traceback (most recent call last):
  File "/opt/cms/services/T2/ops/Work/AAA/vll-analysis.Coffea2024.6.1/readNanoEventsMyProcessor.py", line 126, in <module>
    (out,) = dask.compute(to_compute)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/dask/base.py", line 661, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/_dask.py", line 1345, in __call__
    result, _ = self._call_impl(
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/_dask.py", line 1298, in _call_impl
    return self.read_tree(
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/_dask.py", line 985, in read_tree
    mapping = self.form_mapping_info.load_buffers(
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/coffea/nanoevents/factory.py", line 157, in load_buffers
    arrays = tree.arrays(
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/behaviors/TBranch.py", line 823, in arrays
    _ranges_or_baskets_to_arrays(
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/behaviors/TBranch.py", line 3105, in _ranges_or_baskets_to_arrays
    uproot.source.futures.delayed_raise(*obj)
^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/futures.py", line 38, in delayed_raise
    raise exception_value.with_traceback(traceback)
      ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/behaviors/TBranch.py", line 3026, in chunk_to_basket
    basket = uproot.models.TBasket.Model_TBasket.read(
^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/model.py", line 854, in read
    self.read_members(chunk, cursor, context, file)
^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/models/TBasket.py", line 227, in read_members
    ) = cursor.fields(chunk, _tbasket_format1, context)
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/cursor.py", line 201, in fields
    return format.unpack(chunk.get(start, stop, self, context))
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/chunk.py", line 446, in get
    self.wait(insist=stop)
^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/chunk.py", line 388, in wait
    self._raw_data = numpy.frombuffer(self._future.result(), dtype=self._dtype)
^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/coalesce.py", line 36, in result
    return self._parent.result(timeout=timeout)[self._s]
  ^^^^^^^^^^^^^^^^^
TypeError: 'ServerDisconnectedError' object is not subscriptable

or

Traceback (most recent call last):
  File "/opt/cms/services/T2/ops/Work/AAA/vll-analysis.Coffea2024.6.1/readNanoEventsMyProcessor.py", line 126, in <module>
    (out,) = dask.compute(to_compute)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/dask/base.py", line 661, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/_dask.py", line 1345, in __call__
    result, _ = self._call_impl(
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/_dask.py", line 1298, in _call_impl
    return self.read_tree(
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/_dask.py", line 985, in read_tree
    mapping = self.form_mapping_info.load_buffers(
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/coffea/nanoevents/factory.py", line 157, in load_buffers
    arrays = tree.arrays(
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/behaviors/TBranch.py", line 823, in arrays
    _ranges_or_baskets_to_arrays(
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/behaviors/TBranch.py", line 3105, in _ranges_or_baskets_to_arrays
    uproot.source.futures.delayed_raise(*obj)
^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/futures.py", line 38, in delayed_raise
    raise exception_value.with_traceback(traceback)
      ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/behaviors/TBranch.py", line 3026, in chunk_to_basket
    basket = uproot.models.TBasket.Model_TBasket.read(
^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/model.py", line 854, in read
    self.read_members(chunk, cursor, context, file)
^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/models/TBasket.py", line 227, in read_members
    ) = cursor.fields(chunk, _tbasket_format1, context)
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/cursor.py", line 201, in fields
    return format.unpack(chunk.get(start, stop, self, context))
  ^^^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/chunk.py", line 446, in get
    self.wait(insist=stop)
^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/chunk.py", line 388, in wait
    self._raw_data = numpy.frombuffer(self._future.result(), dtype=self._dtype)
^^^^^^^^^^^^^^^
  File "/opt/cms/services/anaconda3/lib/python3.12/site-packages/uproot/source/coalesce.py", line 36, in result
    return self._parent.result(timeout=timeout)[self._s]
  ^^^^^^^^^^^^^^^^^
TypeError: 'ClientOSError' object is not subscriptable

I am not sure how this information can help resolve the issue, though.


bockjoo commented Jul 16, 2024

After decreasing the step size from 100,000 to 10,000 in preprocess, the script (see below) is stable and there is no TypeError.
However, the CA certs in the SSLContext are not preserved, and I had to add hacks to python3.12/site-packages/coffea/dataset_tools/preprocess.py and python3.12/asyncio/base_events.py.
Can something be done about this?

# readNanoEventsMyProcessor.py
import sys
import os
import time
import ssl

import dask
from distributed import LocalCluster, Client

import hist
import awkward as ak
import hist.dask as hda
import dask_awkward as dak

from coffea import processor

from coffea.dataset_tools import (
    apply_to_fileset,
    max_chunks,
    preprocess,
)

from coffea.nanoevents import NanoEventsFactory, BaseSchema
from coffea.nanoevents.methods import candidate

from collections import defaultdict
import numba

import pickle, copyreg
from multiprocessing.reduction import _rebuild_socket, _reduce_socket

def save_sslcontext(obj):
    return obj.__class__, (obj.protocol,)
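# (Note: the reducer above records only the protocol, so a context rebuilt from it
# on a worker starts out empty; the certificate chain loaded with load_cert_chain()
# and any CA certificates are not restored on unpickling, which is consistent with
# the "size of CA certs 0" observation in the earlier comment.)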

class MyProcessor(processor.ProcessorABC):
    def __init__(self):
        pass

    def process(self, events):
        dataset = events.metadata['dataset']
        muons = ak.zip(
            {
                "pt": events.Muon_pt,
                "eta": events.Muon_eta,
                "phi": events.Muon_phi,
                "mass": events.Muon_mass,
                "charge": events.Muon_charge,
                "isolation": events.Muon_pfRelIso03_all,
            },
            with_name="PtEtaPhiMCandidate",
            behavior=candidate.behavior,
        )
        # make sure they are sorted by transverse momentum
        muons = muons[ak.argsort(muons.pt, axis=1)]
        
        # impose some quality and minimum pt cuts on the muons
        muons = muons[ (muons.pt > 5) ]
        
        h_mass = (
            hda.Hist.new
            .StrCat(["opposite", "same"], name="sign")
            .Log(1000, 0.00000000002, 200., name="mass", label="$m_{\\mu\\mu}$ [GeV]")
            .Int64()
        )

        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) == 0)
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="opposite", mass=dimuon.mass)
        
        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) != 0)
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="same", mass=dimuon.mass)

        return {
            dataset: {
                "entries": ak.num(events, axis=0),
                "mass": h_mass,
            }
        }

    def postprocess(self, accumulator):
        pass

if __name__ == '__main__':
    
    cluster = LocalCluster(n_workers=10, threads_per_worker=1)
    client = cluster.get_client()

    redirector = "/cmsuf/data"
    redirector = "root://cms-xrd-global.cern.ch//"
    #redirector = "https://cmsio3.rc.ufl.edu:1094/"
    redirector = "https://cmsio2.rc.ufl.edu:1094/"
    fileset = {
        "TTbarPowheg_Dilepton": {
            "files": {
                redirector "/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root": "Events",
                #redirector "/store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/44187D37-0301-3942-A6F7-C723E9F4813D.root" 
            },
        },
    }

    copyreg.pickle(ssl.SSLSocket, _reduce_socket, _rebuild_socket)
    copyreg.pickle(ssl.SSLContext, save_sslcontext)
    sslctx = ssl.create_default_context()
    sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])

    if True:
     dataset_runnable, dataset_updated = preprocess(
         fileset,
         align_clusters=False,
         #step_size=100_000,
         step_size=10_000,
         files_per_batch=1,
         skip_bad_files=True,
         save_form=False,
         uproot_options={'ssl': sslctx, 'timeout': 300},
     )
     #print ('dataset_runnable ',dataset_runnable)
     #sys.exit(0)
    #dataset_runnable = {'TTbarPowheg_Dilepton': {'files': {'https://cmsio2.rc.ufl.edu:1094//store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root': {'object_path': 'Events', 'steps': [[0, 101539], [101539, 203078], [203078, 304617], [304617, 406156], [406156, 507695], [507695, 609234], [609234, 710773], [710773, 812312], [812312, 913851], [913851, 1015390], [1015390, 1116929], [1116929, 1218468], [1218468, 1320000]], 'num_entries': 1320000, 'uuid': '33ff1236-eef3-11eb-b91b-31c010acbeef'}}, 'form': None, 'metadata': None}}
    #dataset_runnable = {'TTbarPowheg_Dilepton': {'files': {'root://cms-xrd-global.cern.ch//store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root': {'object_path': 'Events', 'steps': [[0, 101539], [101539, 203078], [203078, 304617], [304617, 406156], [406156, 507695], [507695, 609234], [609234, 710773], [710773, 812312], [812312, 913851], [913851, 1015390], [1015390, 1116929], [1116929, 1218468], [1218468, 1320000]], 'num_entries': 1320000, 'uuid': '33ff1236-eef3-11eb-b91b-31c010acbeef'}}, 'form': None, 'metadata': None}}
    #dataset_runnable = {'TTbarPowheg_Dilepton': {'files': {'https://cmsio2.rc.ufl.edu:1094//store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root': {'object_path': 'Events', 'steps': [[0, 101539]], 'num_entries': 1320000, 'uuid': '33ff1236-eef3-11eb-b91b-31c010acbeef'}}, 'form': None, 'metadata': None}}
    print ('dataset_runnable ',dataset_runnable)

    tstart = time.time()
    to_compute = apply_to_fileset(
                #FancyDimuonProcessor(),
                #VLLProcessor(),
                MyProcessor(),
                max_chunks(dataset_runnable, 300),
                schemaclass=BaseSchema,
                uproot_options={'ssl': sslctx, 'timeout': 300 },
            )
    (out,) = dask.compute(to_compute)
    print(out)

    elapsed = time.time() - tstart
    print(elapsed)
     
    sys.exit(0)
#python3.12/site-packages/coffea/dataset_tools/preprocess.py 2024.6.1 version between line 67 and line 68
            sslctx = ssl.create_default_context()
            sslctx.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])
            uproot_options['ssl'] = sslctx
#python3.12/asyncio/base_events.py between line 1039 and line 1040
        import ssl as s
        
        if isinstance(ssl, s.SSLContext) :
            if len ( ssl.get_ca_certs() ) == 0:
                ssl = s.create_default_context()
                ssl.load_cert_chain(os.environ["X509_USER_PROXY"], os.environ["X509_USER_PROXY"])


nsmith- commented Jul 16, 2024

TypeError: 'ServerDisconnectedError' object is not subscriptable

This might be the same as #1233


nsmith- commented Jul 24, 2024

Here is a self-contained pickleable ssl context class:

import os
import ssl
import tempfile
from typing import Any


class PickleableSSLContext(ssl.SSLContext):
    @classmethod
    def create(cls, protocol=ssl.PROTOCOL_TLS_CLIENT):
        out = cls(protocol)
        out._set_default_state()
        return out

    def _set_default_state(self):
        # this should do the same setup as ssl.create_default_context()
        # for now it just loads default certificates
        if self.verify_mode != ssl.CERT_NONE:
            self.load_default_certs()

    def load_cert_chain(self, certfile, keyfile=None, password=None):
        with open(certfile, "rb") as fin:
            self.certdata = fin.read()
        self.keydata = None
        if keyfile is not None:
            with open(keyfile, "rb") as fin:
                self.keydata = fin.read()
        self.password = password
        self._load_cert_chain()

    def _load_cert_chain(self):
        with tempfile.TemporaryDirectory() as dirname:
            certpath = os.path.join(dirname, "cert.pem")
            with open(certpath, "wb") as fout:
                fout.write(self.certdata)
            keypath = None
            if self.keydata is not None:
                keypath = os.path.join(dirname, "key.pem")
                with open(keypath, "wb") as fout:
                    fout.write(self.keydata)
            super().load_cert_chain(certpath, keypath, self.password)

    def __getnewargs__(self):
        return (self.protocol,)

    def __getstate__(self) -> dict[str, Any]:
        return {
            "certdata": self.certdata,
            "keydata": self.keydata,
            "password": self.password,
        }

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._set_default_state()
        self._load_cert_chain()

Its use is as follows:

import os
import pickle
import uproot
from pickleablessl import PickleableSSLContext


sslctx = PickleableSSLContext.create()
sslctx.load_cert_chain(os.environ['X509_USER_PROXY'])
sslctx = pickle.loads(pickle.dumps(sslctx))

url = "https://xrootd-local.unl.edu:1094//store/mc/RunIISummer20UL18NanoAODv9/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/130000/3094251D-BAB4-6446-86F1-2F9C4D8F339D.root"

with uproot.open(url, ssl=sslctx) as file:
    print(file["Events"].num_entries)

Now the question is, where to put this class?


bockjoo commented Jul 24, 2024

The pickleable ssl context makes it as far as get_steps in preprocess.py,
but does not arrive at async def _file_info and async def _cat_file in fsspec/implementations/http.py:

  File "/home/bockjoo/opt/cmsio2/cms/services/T2/ops/Work/AAA/vll-analysis.Coffea2024.6.1/lib/python3.12/site-packages/fsspec/implementations/http.py", line 877, in _file_info
    print ("DEBUG bockjoo hhtp.py sslctx ", kwargs['ssl'], " size of CA certs ", len(kwargs['ssl'].get_ca_certs()))
                                            ~~~~~~^^^^^^^
KeyError: 'ssl'
  File "/home/bockjoo/opt/cmsio2/cms/services/T2/ops/Work/AAA/vll-analysis.Coffea2024.6.1/lib/python3.12/site-packages/fsspec/implementations/http.py", line 227, in _cat_file
    print ("DEBUG bockjoo hhtp.py sslctx ", kw['ssl'], " size of CA certs ", len(kw['ssl'].get_ca_certs()))
                                            ~~~~^^^^^
KeyError: 'ssl'

If I manually add kw['ssl'] and kwargs['ssl'], there is no issue.
So I am wondering if there is some disconnect that prevents uproot_options from being passed through?


bockjoo commented Jul 24, 2024

My apologies, it wasn't working inside http.py (called by apply_to_fileset) because of my own mistake:
I forgot to pass ssl through uproot_options. It works now.
I will validate this with SLURMCluster as well; I expect it to work there too.
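For reference, a minimal sketch of the working configuration (assuming nsmith-'s PickleableSSLContext above is saved in a local module named pickleablessl, and that fileset, MyProcessor and BaseSchema are as defined in the earlier script):

import os
import dask
from pickleablessl import PickleableSSLContext
from coffea.dataset_tools import apply_to_fileset, max_chunks, preprocess
from coffea.nanoevents import BaseSchema

sslctx = PickleableSSLContext.create()
sslctx.load_cert_chain(os.environ["X509_USER_PROXY"])

# Both preprocess and apply_to_fileset open files, so the ssl context has to be
# passed through uproot_options in both calls.
dataset_runnable, dataset_updated = preprocess(
    fileset,
    step_size=10_000,
    files_per_batch=1,
    skip_bad_files=True,
    uproot_options={"ssl": sslctx, "timeout": 300},
)
to_compute = apply_to_fileset(
    MyProcessor(),
    max_chunks(dataset_runnable, 300),
    schemaclass=BaseSchema,
    uproot_options={"ssl": sslctx, "timeout": 300},
)
(out,) = dask.compute(to_compute)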


bockjoo commented Jul 24, 2024

It worked with the SLURMCluster as well.
