Complex arrow columns cause an error during the reducing stage #351

Open
2 of 3 tasks
hombit opened this issue Jul 24, 2024 · 1 comment

hombit commented Jul 24, 2024

Bug report

I implemented a custom reader for HSC PDR2 data, which converts a FITS boolean-array column to a fixed-size list column of type pa.list_(pa.bool_(), 274).

During the reducing stage this fails with an error from the pyarrow-to-pandas conversion layer. The error originates at hipscat_import/catalog/map_reduce.py:287, in reduce_pixel_shards(), where the merged pyarrow table is converted to pandas.

I tried to reproduce the problem with a simple pyarrow table that includes a fixed-size list column, but this code works:

import pandas as pd
import pyarrow as pa

a = pa.array([1, 2, 3])
b = pa.array([[True, False, True]] * 3, type=pa.list_(pa.bool_(), 3))
table = pa.Table.from_arrays([a, b], names=['a', 'b'])
df = table.to_pandas(types_mapper=pd.ArrowDtype)

Note the types_mapper argument here. The conversion works even without it, but then produces an object dtype. I believe we should use it too, though that may be a separate issue. I also tried modifying reduce_pixel_shards() to pass this argument, but it produces the same error.

Details are under the spoilers:

Reader implementation
import numpy as np
import pandas as pd
import pyarrow as pa
from astropy.table import Table
from hipscat_import.catalog.file_readers import FitsReader


class HSCFitsReader(FitsReader):
    """FITS reader that converts ra and dec from radians to degrees"""
    def __init__(self, *args, ra_column, dec_column, **kwargs):
        super().__init__(*args, **kwargs)
        self.ra_column = ra_column
        self.dec_column = dec_column

    def read(self, input_file, read_columns=None):
        # Mostly copy-pasted from the hipscat-import implementation
        self.regular_file_exists(input_file, **self.kwargs)
        table = Table.read(input_file, memmap=True, **self.kwargs)
        if read_columns:
            table.keep_columns(read_columns)
        elif self.column_names:
            table.keep_columns(self.column_names)
        elif self.skip_column_names:
            table.remove_columns(self.skip_column_names)

        total_rows = len(table)
        read_rows = 0

        while read_rows < total_rows:
            chunk = table[read_rows : read_rows + self.chunksize]
            df_chunk = self.astropy_table_to_df(chunk)
            yield df_chunk

            read_rows += self.chunksize

    def astropy_table_to_df(self, table):
        """Data conversion, spoils input table"""
        # Flags wouldn't convert to pandas due to astropy's implementation limitations
        # So we convert them manually and delete from the table
        flags_pyarrow = None
        if 'flags' in table.columns:
            flags = table['flags']
            flags_length = flags.shape[1]
            flags_pyarrow = pa.array(flags.tolist(), type=pa.list_(pa.bool_(), flags_length))

            table.remove_column('flags')

        df = table.to_pandas()

        # Convert coords from radians to degrees
        df[self.ra_column] = np.degrees(df[self.ra_column])
        df[self.dec_column] = np.degrees(df[self.dec_column])

        # Assign flags; the column was removed from the table above,
        # so check the converted array rather than table.columns
        if flags_pyarrow is not None:
            df['flags'] = pd.Series(flags_pyarrow, dtype=pd.ArrowDtype(flags_pyarrow.type))

        return df
Traceback
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/types.pxi:5284, in pyarrow.lib.type_for_alias()
   5283 try:
-> 5284     alias = _type_aliases[name]
   5285 except KeyError:

KeyError: 'fixed_size_list<item: bool>[274]'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/dtypes.py:2251, in construct_from_string()
   2250 try:
-> 2251     pa_dtype = pa.type_for_alias(base_type)
   2252 except ValueError as err:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/types.pxi:5286, in pyarrow.lib.type_for_alias()
   5285 except KeyError:
-> 5286     raise ValueError('No type alias for {0}'.format(name))
   5287 

ValueError: No type alias for fixed_size_list<item: bool>[274]

The above exception was the direct cause of the following exception:

NotImplementedError                       Traceback (most recent call last)
Cell In[3], line 29
     25 with Client(n_workers=64) as client:
     26 # import dask
     27 # with dask.config.set(scheduler='single-threaded'), Client(processes=False, threads_per_worker=1, n_workers=1) as client:
     28     display(client)
---> 29     pipeline_with_client(args, client)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline.py:60, in pipeline_with_client(args, client)
     58 if args.completion_email_address:
     59     _send_failure_email(args, exception)
---> 60 raise exception

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline.py:44, in pipeline_with_client(args, client)
     41     raise ValueError("args is required and should be subclass of RuntimeArguments")
     43 if isinstance(args, ImportArguments):
---> 44     catalog_runner.run(args, client)
     45 elif isinstance(args, IndexArguments):
     46     index_runner.run(args, client)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/run_import.py:111, in run(args, client)
     85         for (
     86             destination_pixel,
     87             source_pixel_count,
     88             destination_pixel_key,
     89         ) in args.resume_plan.get_reduce_items():
     90             futures.append(
     91                 client.submit(
     92                     mr.reduce_pixel_shards,
   (...)
    108                 )
    109             )
--> 111         args.resume_plan.wait_for_reducing(futures)
    113 # All done - write out the metadata
    114 with args.resume_plan.print_progress(total=5, stage_name="Finishing") as step_progress:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/resume_plan.py:298, in ResumePlan.wait_for_reducing(self, futures)
    296 def wait_for_reducing(self, futures):
    297     """Wait for reducing futures to complete."""
--> 298     self.wait_for_futures(futures, self.REDUCING_STAGE, fail_fast=True)
    299     remaining_reduce_items = self.get_reduce_items()
    300     if len(remaining_reduce_items) > 0:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline_resume_plan.py:143, in PipelineResumePlan.wait_for_futures(self, futures, stage_name, fail_fast)
    141         some_error = True
    142         if fail_fast:
--> 143             raise future.exception()
    145 if some_error:
    146     raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/distributed/worker.py:3000, in apply_function_simple()
   2995 with (
   2996     context_meter.meter("thread-noncpu", func=time) as m,
   2997     context_meter.meter("thread-cpu", func=thread_time),
   2998 ):
   2999     try:
-> 3000         result = function(*args, **kwargs)
   3001     except (SystemExit, KeyboardInterrupt):
   3002         # Special-case these, just like asyncio does all over the place. They will
   3003         # pass through `fail_hard` and `_handle_stimulus_from_task`, and eventually
   (...)
   3006         # Any other `BaseException` types would ultimately be ignored by asyncio if
   3007         # raised here, after messing up the worker state machine along their way.
   3008         raise

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/map_reduce.py:331, in reduce_pixel_shards()
    326 except Exception as exception:  # pylint: disable=broad-exception-caught
    327     print_task_failure(
    328         f"Failed REDUCING stage for shard: {destination_pixel_order} {destination_pixel_number}",
    329         exception,
    330     )
--> 331     raise exception

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/map_reduce.py:287, in reduce_pixel_shards()
    280 if rows_written != destination_pixel_size:
    281     raise ValueError(
    282         "Unexpected number of objects at pixel "
    283         f"({healpix_pixel})."
    284         f" Expected {destination_pixel_size}, wrote {rows_written}"
    285     )
--> 287 dataframe = merged_table.to_pandas()
    288 if sort_columns:
    289     dataframe = dataframe.sort_values(sort_columns.split(","))

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/array.pxi:885, in pyarrow.lib._PandasConvertible.to_pandas()
    883     coerce_temporal_nanoseconds=coerce_temporal_nanoseconds
    884 )
--> 885 return self._to_pandas(options, categories=categories,
    886                        ignore_metadata=ignore_metadata,
    887                        types_mapper=types_mapper)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/table.pxi:5002, in pyarrow.lib.Table._to_pandas()
   5000            types_mapper=None):
   5001 from pyarrow.pandas_compat import table_to_dataframe
-> 5002 df = table_to_dataframe(
   5003     options, self, categories,
   5004     ignore_metadata=ignore_metadata,

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:774, in table_to_dataframe()
    771     table = _add_any_metadata(table, pandas_metadata)
    772     table, index = _reconstruct_index(table, index_descriptors,
    773                                       all_columns, types_mapper)
--> 774     ext_columns_dtypes = _get_extension_dtypes(
    775         table, all_columns, types_mapper)
    776 else:
    777     index = _pandas_api.pd.RangeIndex(table.num_rows)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:853, in _get_extension_dtypes()
    848 dtype = col_meta['numpy_type']
    850 if dtype not in _pandas_supported_numpy_types:
    851     # pandas_dtype is expensive, so avoid doing this for types
    852     # that are certainly numpy dtypes
--> 853     pandas_dtype = _pandas_api.pandas_dtype(dtype)
    854     if isinstance(pandas_dtype, _pandas_api.extension_dtype):
    855         if hasattr(pandas_dtype, "__from_arrow__"):

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas-shim.pxi:148, in pyarrow.lib._PandasAPIShim.pandas_dtype()
    146         return self._pd.lib.infer_dtype(obj)
    147 
--> 148 cpdef pandas_dtype(self, dtype):
    149     self._check_import()
    150     try:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas-shim.pxi:151, in pyarrow.lib._PandasAPIShim.pandas_dtype()
    149 self._check_import()
    150 try:
--> 151     return self._types_api.pandas_dtype(dtype)
    152 except AttributeError:
    153     return None

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/common.py:1624, in pandas_dtype()
   1621     return dtype
   1623 # registered extension types
-> 1624 result = registry.find(dtype)
   1625 if result is not None:
   1626     if isinstance(result, type):
   1627         # GH 31356, GH 54592

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/base.py:576, in find()
    574 for dtype_type in self.dtypes:
    575     try:
--> 576         return dtype_type.construct_from_string(dtype)
    577     except TypeError:
    578         pass

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/dtypes.py:2262, in construct_from_string()
   2258         except (NotImplementedError, ValueError):
   2259             # Fall through to raise with nice exception message below
   2260             pass
-> 2262         raise NotImplementedError(
   2263             "Passing pyarrow type specific parameters "
   2264             f"({has_parameters.group()}) in the string is not supported. "
   2265             "Please construct an ArrowDtype object with a pyarrow_dtype "
   2266             "instance with specific parameters."
   2267         ) from err
   2268     raise TypeError(f"'{base_type}' is not a valid pyarrow data type.") from err
   2269 return cls(pa_dtype)

NotImplementedError: Passing pyarrow type specific parameters ([274]) in the string is not supported. Please construct an ArrowDtype object with a pyarrow_dtype instance with specific parameters.

Before submitting
Please check the following:

  • I have described the situation in which the bug arose, including what code was executed, information about my environment, and any applicable data others will need to reproduce the problem.
  • I have included available evidence of the unexpected behavior (including error messages, screenshots, and/or plots) as well as a description of what I expected instead.
  • If I have a solution in mind, I have provided an explanation and/or pseudocode and/or task list.
@hombit hombit added the bug Something isn't working label Jul 24, 2024
@delucchi-cmu delucchi-cmu self-assigned this Jul 26, 2024
delucchi-cmu commented

Filed pandas-dev/pandas#59738
