
Load Data

pyshbundle contains scripts to seamlessly extract and load spherical harmonic coefficients from various data centres. See Tutorials

extract_C10_11_replcmnt_coeff(data_tn13, source, epoch_begin, epoch_end=None)

Extract degree-1 replacement coefficients C10 and C11 from TN-13 data for the given epoch.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| data_tn13 | ndarray | TN-13 replacement coefficient matrix. | required |
| source | str | Data centre identifier ('jpl', 'csr', or 'itsg'). | required |
| epoch_begin | date or str | Start date of the GRACE epoch (date object or YYYY-MM string for ITSG). | required |
| epoch_end | optional | End date of the GRACE epoch. Required for JPL/CSR. | None |

Returns:

| Name | Type | Description |
| ---- | ---- | ----------- |
| tuple | tuple[ndarray, ndarray] | A tuple containing C10 (numpy.ndarray), the replacement C10 coefficient row, and C11 (numpy.ndarray), the replacement C11 coefficient row. |

Raises:

| Type | Description |
| ---- | ----------- |
| ValueError | If source is not 'jpl', 'csr', or 'itsg'. |

Source code in pyshbundle/io.py
def extract_C10_11_replcmnt_coeff(data_tn13: np.ndarray, source: str, epoch_begin: date | str, epoch_end: date | None = None) -> tuple[np.ndarray, np.ndarray]:
    """Extract degree-1 replacement coefficients C10 and C11 from TN-13 data for the given epoch.

    Args:
        data_tn13 (numpy.ndarray): TN-13 replacement coefficient matrix.
        source (str): Data centre identifier ('jpl', 'csr', or 'itsg').
        epoch_begin: Start date of the GRACE epoch (date object or YYYY-MM string for ITSG).
        epoch_end (optional): End date of the GRACE epoch. Required for JPL/CSR. Defaults to None.

    Returns:
        tuple: A tuple containing:
            - C10 (numpy.ndarray): Replacement C10 coefficient row.
            - C11 (numpy.ndarray): Replacement C11 coefficient row.

    Raises:
        ValueError: If source is not 'jpl', 'csr', or 'itsg'.
    """
    # match the date
    file_type = "tn-13"

    # epoch_end is optional; pass it through unchanged (None for ITSG)
    end_epoch = epoch_end

    if source == "jpl" or source == "csr":
        # find the necessary indxes
        replcmnt_idxs = find_date_in_replacemnt_file(
            data_tn13, file_type, epoch_begin, end_epoch
        )
        # extract the C10 and C11 coefficient rows, dropping the two date columns
        C10 = data_tn13[replcmnt_idxs[0], :-2]
        C11 = data_tn13[replcmnt_idxs[1], :-2]

    elif source == "itsg":
        replcmnt_idxs = find_date_in_replacemnt_file(
            data_tn13,
            file_type,
            f"{epoch_begin.year}-{str(epoch_begin.month).zfill(2)}",  # type: ignore[union-attr]
            end_epoch,
        )
        # extract the C10 and C11 coefficient rows, dropping the two date columns
        C10 = data_tn13[replcmnt_idxs[0], :-2]
        C11 = data_tn13[replcmnt_idxs[1], :-2]

    else:
        raise ValueError(
            "Invalid source. The recognized sources are 'jpl', 'csr', and 'itsg'."
        )

    return C10, C11
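
As a usage sketch, the slicing that produces the returned rows can be tried on a toy matrix with the same trailing two date columns (the layout and values here are invented for illustration, not real TN-13 data):

```python
# Toy stand-in for a TN-13 matrix: coefficient columns followed by two
# date columns (begin, end). Values and layout are invented for illustration.
toy_tn13 = [
    [1.0, 0.0, 2.5e-10, 0.0, 20020401.0, 20020430.0],      # C10-like row
    [1.0, 1.0, 1.1e-10, 3.2e-11, 20020401.0, 20020430.0],  # C11-like row
]

# Mimic the extraction above: keep every column except the two date columns
C10 = toy_tn13[0][:-2]
C11 = toy_tn13[1][:-2]
```

Each returned row therefore carries the degree, order, and coefficient values without the epoch dates.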

extract_C20_replcmnt_coeff(data_tn14, source, epoch_begin, epoch_end=None)

Extract the C20 replacement coefficient from TN-14 data for the given epoch.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| data_tn14 | ndarray | TN-14 replacement coefficient matrix. | required |
| source | str | Data centre identifier ('jpl', 'csr', or 'itsg'). | required |
| epoch_begin | date or str | Start date of the GRACE epoch (date object or YYYY-MM string for ITSG). | required |
| epoch_end | optional | End date of the GRACE epoch. Required for JPL/CSR. | None |

Returns:

| Type | Description |
| ---- | ----------- |
| ndarray | Array of shape (6,) containing [l, m, Clm, Slm, Clm_sdev, Slm_sdev] for C20. |

Source code in pyshbundle/io.py
def extract_C20_replcmnt_coeff(data_tn14: np.ndarray, source: str, epoch_begin: date | str, epoch_end: date | None = None) -> np.ndarray:
    """Extract the C20 replacement coefficient from TN-14 data for the given epoch.

    Args:
        data_tn14 (numpy.ndarray): TN-14 replacement coefficient matrix.
        source (str): Data centre identifier ('jpl', 'csr', or 'itsg').
        epoch_begin: Start date of the GRACE epoch (date object or YYYY-MM string for ITSG).
        epoch_end (optional): End date of the GRACE epoch. Required for JPL/CSR. Defaults to None.

    Returns:
        numpy.ndarray: Array of shape (6,) containing [l, m, Clm, Slm, Clm_sdev, Slm_sdev] for C20.
    """
    # generating a CLM-style array for C20
    # NOTE: zonal coefficients do not have an Slm term - it is taken as 0
    if source == "jpl" or source == "csr":
        replcmnt_idxs = find_date_in_replacemnt_file(
            data_tn14, "tn-14", epoch_begin, epoch_end
        )

        C20 = np.array(
            [
                2,
                0,
                data_tn14[replcmnt_idxs[0], 0:2][0],
                data_tn14[replcmnt_idxs[0], 0:2][1],
                0,
                0,
            ]
        )

    elif source == "itsg":
        replcmnt_idxs = find_date_in_replacemnt_file(
            data_tn14, "tn-14", epoch_begin, epoch_end=None
        )

        C20 = np.array(
            [
                2,
                0,
                data_tn14[replcmnt_idxs[0], 0:2][0],
                data_tn14[replcmnt_idxs[0], 0:2][1],
                0,
                0,
            ]
        )

    else:
        raise ValueError(
            "Invalid source. The recognized sources are 'jpl', 'csr', and 'itsg'."
        )

    return C20
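
The row returned above always follows the [l, m, Clm, Slm, Clm_sdev, Slm_sdev] layout; a minimal sketch with an invented coefficient value:

```python
# Build a C20 row in the same [l, m, Clm, Slm, Clm_sdev, Slm_sdev] layout.
# The Clm value is invented; Slm is 0 because zonal coefficients have no Slm.
clm_value = -4.84e-04
C20_row = [2, 0, clm_value, 0.0, 0, 0]
```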

extract_C30_replcmnt_coeff(data_tn14, source, epoch_begin, epoch_end=None)

Extract the C30 replacement coefficient from TN-14 data for the given epoch.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| data_tn14 | ndarray | TN-14 replacement coefficient matrix. | required |
| source | str | Data centre identifier ('jpl', 'csr', or 'itsg'). | required |
| epoch_begin | date or str | Start date of the GRACE epoch (date object or YYYY-MM string for ITSG). | required |
| epoch_end | optional | End date of the GRACE epoch. Required for JPL/CSR. | None |

Returns:

| Type | Description |
| ---- | ----------- |
| ndarray | Array of shape (6,) containing [l, m, Clm, Slm, Clm_sdev, Slm_sdev] for C30, with NaN values replaced by zero. |

Source code in pyshbundle/io.py
def extract_C30_replcmnt_coeff(data_tn14: np.ndarray, source: str, epoch_begin: date | str, epoch_end: date | None = None) -> np.ndarray:
    """Extract the C30 replacement coefficient from TN-14 data for the given epoch.

    Args:
        data_tn14 (numpy.ndarray): TN-14 replacement coefficient matrix.
        source (str): Data centre identifier ('jpl', 'csr', or 'itsg').
        epoch_begin: Start date of the GRACE epoch (date object or YYYY-MM string for ITSG).
        epoch_end (optional): End date of the GRACE epoch. Required for JPL/CSR. Defaults to None.

    Returns:
        numpy.ndarray: Array of shape (6,) containing [l, m, Clm, Slm, Clm_sdev, Slm_sdev] for C30,
            with NaN values replaced by zero.
    """
    if source == "jpl" or source == "csr":
        replcmnt_idxs = find_date_in_replacemnt_file(
            data_tn14, "tn-14", epoch_begin, epoch_end
        )

        # NaN values (months where TN-14 provides no C30) are replaced with
        # zeros below; consider the impact of this when replacing

        C30 = np.array(
            [
                3,
                0,
                data_tn14[replcmnt_idxs[0], 2:4][0],
                data_tn14[replcmnt_idxs[0], 2:4][1],
                0,
                0,
            ]
        )

        # replace nan values with zeros
        C30[np.isnan(C30)] = 0

    elif source == "itsg":
        replcmnt_idxs = find_date_in_replacemnt_file(
            data_tn14, "tn-14", epoch_begin, epoch_end=None
        )

        C30 = np.array(
            [
                3,
                0,
                data_tn14[replcmnt_idxs[0], 2:4][0],
                data_tn14[replcmnt_idxs[0], 2:4][1],
                0,
                0,
            ]
        )

        # replace nan values with zeros
        C30[np.isnan(C30)] = 0

    else:
        raise ValueError(
            "Invalid source. The recognized sources are 'jpl', 'csr', and 'itsg'."
        )

    return C30
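
The NaN handling at the end of each branch can be tried in isolation; a minimal sketch with an invented row (early TN-14 months carry no C30 value):

```python
import numpy as np

# A C30-style row whose Clm slot is NaN, as happens for months where
# TN-14 provides no C30 replacement. Values are invented.
C30 = np.array([3.0, 0.0, np.nan, 0.0, 0.0, 0.0])

# Replace NaN values with zeros, as the function does before returning
C30[np.isnan(C30)] = 0
```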

extract_SH_data(file_path, source)

Extracts the spherical harmonic coefficients from the given file.

Currently supports JPL, CSR, and ITSG data sources ONLY. Extracts the spherical harmonic coefficients from the given file and returns them in a dictionary. Uses the degree and order of a coefficient as the key and the coefficient values as the value.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| file_path | str | Absolute path to the file. | required |
| source | str | Source of the data (JPL, CSR, or ITSG). | required |

Returns:

| Type | Description |
| ---- | ----------- |
| dict | Dictionary containing the coefficients and time coverage start and end dates. |

Source code in pyshbundle/io.py
def extract_SH_data(file_path: str, source: str) -> dict:
    """Extracts the spherical harmonic coefficients from all the given files.

    Currently supports JPL, CSR, and ITSG data sources ONLY. Extracts the spherical harmonic
    coefficients from the given file and returns them in a dictionary. Uses the degree and
    order of a coefficient as the key and the coefficient values as the value.

    Args:
        file_path (str): Absolute path to the file.
        source (str): Source of the data (JPL, CSR, or ITSG).

    Returns:
        (dict): Dictionary containing the coefficients and time coverage start and end dates.
    """
    # Initialize an empty dictionary to store the coefficients and dates
    data: dict[str, Any] = {"coefficients": {}, "time_coverage_start": None, "time_coverage_end": None}

    # Regular expression pattern to match the lines with coefficients
    coeff_pattern_csr = re.compile(
        r"^GRCOF2\s+(\d+)\s+(\d+)\s+([-+]?\d*\.\d+E[-+]?\d+)\s+([-+]?\d*\.\d+E[-+]?\d+)\s+([-+]?\d*\.\d+E[-+]?\d+)\s+([-+]?\d*\.\d+E[-+]?\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+"
    )
    coeff_pattern_jpl = re.compile(
        r"^GRCOF2\s+(\d+)\s+(\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)"
    )
    coeff_pattern_itsg = re.compile(
        r"^gfc\s+(\d+)\s+(\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)$"
    )

    if source == "jpl":
        coeff_pattern = coeff_pattern_jpl
    elif source == "csr":
        coeff_pattern = coeff_pattern_csr
    elif source == "itsg":
        coeff_pattern = coeff_pattern_itsg
    else:
        raise ValueError("Invalid source, pyshbundle only supports JPL, CSR, and ITSG")

    # Regular expression patterns to match the time coverage start and end lines
    start_pattern = re.compile(r"time_coverage_start\s*:\s*([\d\-T:.]+)")
    end_pattern = re.compile(r"time_coverage_end\s*:\s*([\d\-T:.]+)")
    timeindex_itsg = re.compile(r"^modelname\s+(.+)$")

    # Open and read the gzipped file to extract the time coverage start and end dates
    if source == "itsg":
        with open(file_path, "rt") as file:
            for line in file:
                # Strip any leading/trailing whitespace characters
                line = line.strip()

                # Search for time coverage start
                start_match = timeindex_itsg.search(line)
                if start_match:
                    data["time_coverage_start"] = start_match.group(1)

                # Break the loop once the start date is found
                if data["time_coverage_start"]:
                    break
            # File is automatically closed here due to the 'with' statement
        with open(file_path, "rt") as file:
            for line in file:
                # Strip any leading/trailing whitespace characters
                line = line.strip()
                # print(line)

                # Search for the coefficient pattern in the line
                coeff_match = coeff_pattern.search(line)
                if coeff_match:
                    # Extract degree, order, Clm, and Slm
                    degree = int(coeff_match.group(1))
                    order = int(coeff_match.group(2))
                    clm = np.double(coeff_match.group(3))
                    slm = np.double(coeff_match.group(4))
                    clm_sdev = np.double(coeff_match.group(5))
                    slm_sdev = np.double(coeff_match.group(6))

                    # Store the coefficients in the dictionary
                    data["coefficients"][(degree, order)] = {
                        "Clm": clm,
                        "Slm": slm,
                        "Clm_sdev": clm_sdev,
                        "Slm_sdev": slm_sdev,
                    }

    elif source == "csr" or source == "jpl":
        with gzip.open(file_path, "rt") as file:  # gzip.open
            for line in file:
                # Strip any leading/trailing whitespace characters
                line = line.strip()

                # Search for time coverage start
                start_match = start_pattern.search(line)
                if start_match:
                    data["time_coverage_start"] = start_match.group(1)

                # Search for time coverage end
                end_match = end_pattern.search(line)
                if end_match:
                    data["time_coverage_end"] = end_match.group(1)

                # Break the loop if both dates are found
                if data["time_coverage_start"] and data["time_coverage_end"]:
                    break
            # File is automatically closed here due to the 'with' statement

        # Open and read the gzipped file again to extract the coefficients
        with gzip.open(file_path, "rt") as file:
            for line in file:
                # Strip any leading/trailing whitespace characters
                line = line.strip()
                # print(line)

                # Search for the coefficient pattern in the line
                coeff_match = coeff_pattern.search(line)
                if coeff_match:
                    # Extract degree, order, Clm, and Slm
                    degree = int(coeff_match.group(1))
                    order = int(coeff_match.group(2))
                    clm = np.double(coeff_match.group(3))
                    slm = np.double(coeff_match.group(4))
                    clm_sdev = np.double(coeff_match.group(5))
                    slm_sdev = np.double(coeff_match.group(6))

                    # Store the coefficients in the dictionary
                    data["coefficients"][(degree, order)] = {
                        "Clm": clm,
                        "Slm": slm,
                        "Clm_sdev": clm_sdev,
                        "Slm_sdev": slm_sdev,
                    }
    return data
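
The JPL regular expression above can be exercised on a single synthetic GRCOF2 line (lowercase exponents; every number below is invented):

```python
import re

# Same pattern as coeff_pattern_jpl in extract_SH_data (lowercase exponents)
coeff_pattern_jpl = re.compile(
    r"^GRCOF2\s+(\d+)\s+(\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)"
    r"\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)"
)

# A synthetic data line in the JPL Level-2 layout (values invented)
line = ("GRCOF2 2 0 -4.841694723127e-04 0.000000000000e+00 "
        "7.1e-13 0.0e+00 20020405.0000 20020430.0000")

coeff_match = coeff_pattern_jpl.search(line)
degree = int(coeff_match.group(1))
order = int(coeff_match.group(2))
clm = float(coeff_match.group(3))
```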

extract_deg1_coeff_tn13(file_path)

Extracts the degree 1 coefficients from the given TN-13 file.

Ensure the TN-13 file used is the one recommended by respective data centres (JPL, CSR, or ITSG). Similar to extract_SH_data, but specifically for TN-13 files. Returns degree 1 replacement coefficients as a dictionary. Uses the degree and order of a coefficient as the key and the coefficient values as the value.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| file_path | str | Absolute path to the file. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| dict | Dictionary containing the degree 1 (order 1) coefficients and time coverage start and end dates. |

Source code in pyshbundle/io.py
def extract_deg1_coeff_tn13(file_path: str) -> dict:
    """Extracts the degree 1 coefficients from the given TN-13 file.

    Ensure the TN-13 file used is the one recommended by respective data centres (JPL, CSR, or ITSG).
    Similar to extract_SH_data, but specifically for TN-13 files.
    Returns degree 1 replacement coefficients as a dictionary.
    Uses the degree and order of a coefficient as the key and the coefficient values as the value.

    Args:
        file_path (str): Absolute path to the file.

    Returns:
        (dict): Dictionary containing the degree 1 (order 1) coefficients and time coverage start and end dates.
    """
    data_dict = {}

    with open(file_path, "rt") as file:
        lines = file.readlines()
        for line in lines:
            # Extract data using regex
            pattern = re.compile(
                r"^GRCOF2\s+(\d+)\s+(\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+([-+]?\d*\.\d+e[-+]?\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)"
            )
            match = pattern.match(line)

            if match:
                degree = int(match.group(1))
                order = int(match.group(2))
                Clm = float(match.group(3))
                Slm = float(match.group(4))
                Clm_sdev = np.double(match.group(5))
                Slm_sdev = np.double(match.group(6))
                epoch_begin = match.group(7)
                epoch_end = match.group(8)

                # Use the epoch start as key, in YYYY-MM format
                epoch_key = datetime.strptime(epoch_begin, "%Y%m%d.%H%M%S").strftime(
                    "%Y-%m"
                )
                data_dict[epoch_key, degree, order] = {
                    "degree": degree,
                    "order": order,
                    "Clm": Clm,
                    "Slm": Slm,
                    "Clm_sdev": Clm_sdev,
                    "Slm_sdev": Slm_sdev,
                    "epoch_begin": epoch_begin,
                    "epoch_end": epoch_end,
                }
    # Print a sample of the data to check if it's parsed correctly
    # for key in sorted(data_dict.keys())[:5]:  # print first 5 entries
    #     print(f"{key}: {data_dict[key]}")
    return data_dict
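
The monthly key built from the epoch string can be checked in isolation; this assumes a six-digit time field after the date (the epoch value itself is invented):

```python
from datetime import datetime

# Synthetic TN-13 epoch string in yyyymmdd.hhmmss form (value invented)
epoch_begin = "20020404.000000"

# Build the monthly dictionary key the same way as above
epoch_key = datetime.strptime(epoch_begin, "%Y%m%d.%H%M%S").strftime("%Y-%m")
```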

extract_deg2_3_coeff_tn14(file_path)

Extracts the degree 2 and 3 coefficients from the given file.

Ensure the TN-14 file used is the one recommended by respective data centres (JPL, CSR, or ITSG). Similar to extract_SH_data, but specifically for TN-14 files. Returns degree 2, 3 replacement coefficients as a dictionary. Uses the degree and order of a coefficient as the key and the coefficient values as the value.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| file_path | str | Absolute path to the file. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| dict | Dictionary containing the degree 2, 3 (order 0) coefficients and time coverage start and end dates. |

Source code in pyshbundle/io.py
def extract_deg2_3_coeff_tn14(file_path: str) -> dict:
    """Extracts the degree 2 and 3 coefficients from the given file.

    Ensure the TN-14 file used is the one recommended by respective data centres (JPL, CSR, or ITSG).
    Similar to extract_SH_data, but specifically for TN-14 files.
    Returns degree 2, 3 replacement coefficients as a dictionary.
    Uses the degree and order of a coefficient as the key and the coefficient values as the value.

    Args:
        file_path (str): Absolute path to the file.

    Returns:
        (dict): Dictionary containing the degree 2, 3 (order 0) coefficients and time coverage start and end dates.
    """
    data_dict = {}

    with open(file_path, "rt") as file:
        lines = file.readlines()
        for line in lines:
            # Extract data using regex
            pattern = re.compile(
                r"(\d+\.\d+)\s+(\d+\.\d+)\s+([-\d.eE+]+)\s+([-\d.eE+]+)\s+([-\d.eE+]+)\s+([-\d.eE+]+|NaN)?\s+([-\d.eE+]+|NaN)?\s+([-\d.eE+]+|NaN)?\s+(\d+\.\d+)\s+(\d+\.\d+)"
            )
            match = pattern.match(line)

            if match:
                mjd_start = float(match.group(1))
                year_frac_start = float(match.group(2))
                c20 = np.double(match.group(3))
                c20_mean_diff = np.double(match.group(4))
                c20_sigma = np.double(match.group(5))
                c30 = match.group(6)
                c30_mean_diff = match.group(7)
                c30_sigma = match.group(8)
                mjd_end = float(match.group(9))
                year_frac_end = float(match.group(10))

                # Only convert C30 if it is present and not NaN
                if c30 is not None and c30.lower() != "nan":
                    c30 = np.double(c30)
                    c30_mean_diff = np.double(c30_mean_diff)
                    c30_sigma = np.double(c30_sigma)
                else:
                    c30 = None
                    c30_mean_diff = None
                    c30_sigma = None

                # Use the MJD start as key, in YYYY-MM format
                mjd_key = julian.from_jd(mjd_start, fmt="mjd").date().strftime("%Y-%m")
                data_dict[mjd_key] = {
                    "year_frac_start": year_frac_start,
                    "mjd_start": mjd_start,
                    "c20": c20,
                    "c20_mean_diff": c20_mean_diff,
                    "c20_sigma": c20_sigma,
                    "c30": c30,
                    "c30_mean_diff": c30_mean_diff,
                    "c30_sigma": c30_sigma,
                    "mjd_end": mjd_end,
                    "year_frac_end": year_frac_end,
                }
    # Print a sample of the data to check if it's parsed correctly
    # for key in sorted(data_dict.keys())[:5]:  # print first 5 entries
    #     print(f"{key}: {data_dict[key]}")
    return data_dict
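
The MJD-to-key conversion can be reproduced without the julian package for whole days, since MJD day 0 corresponds to 1858-11-17 by definition (a dependency-free sketch, not the library's own code path):

```python
from datetime import date, timedelta

MJD_EPOCH = date(1858, 11, 17)  # Modified Julian Date zero point

def mjd_to_key(mjd_start: float) -> str:
    """Convert a whole-day MJD to the 'YYYY-MM' key used by the TN-14 parser."""
    d = MJD_EPOCH + timedelta(days=int(mjd_start))
    return d.strftime("%Y-%m")
```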

find_date_in_replacemnt_file(replacemnt_mat, file_type, epoch_begin, epoch_end=None)

Find row indices in a replacement coefficient matrix matching the given epoch dates.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| replacemnt_mat | ndarray | Matrix of replacement coefficients with date columns. | required |
| file_type | str | Type of replacement file; either 'tn-13' or 'tn-14'. | required |
| epoch_begin | date or str | Start date of the GRACE data epoch (date object or YYYY-MM string for ITSG). | required |
| epoch_end | optional | End date of the GRACE data epoch. Required for JPL/CSR sources. | None |

Returns:

| Name | Type | Description |
| ---- | ---- | ----------- |
| list | list | List of row indices in the replacement matrix that match the given epoch. |

Raises:

| Type | Description |
| ---- | ----------- |
| ValueError | If file_type is not 'tn-13' or 'tn-14'. |

Source code in pyshbundle/io.py
def find_date_in_replacemnt_file(
    replacemnt_mat: np.ndarray, file_type: str, epoch_begin: date | str, epoch_end: date | None = None
) -> list:
    """Find row indices in a replacement coefficient matrix matching the given epoch dates.

    Args:
        replacemnt_mat (numpy.ndarray): Matrix of replacement coefficients with date columns.
        file_type (str): Type of replacement file; either 'tn-13' or 'tn-14'.
        epoch_begin: Start date of the GRACE data epoch (date object or YYYY-MM string for ITSG).
        epoch_end (optional): End date of the GRACE data epoch. Required for JPL/CSR sources. Defaults to None.

    Returns:
        list: List of row indices in the replacement matrix that match the given epoch.

    Raises:
        ValueError: If file_type is not 'tn-13' or 'tn-14'.
    """
    # epoch_begin and epoch_end -> dates from the GRACE data file
    # begin_date and end_date -> dates from the replacement file (tn-13 or tn-14)

    rows, cols = replacemnt_mat.shape

    if file_type == "tn-13":
        time_buffer_itsg = timedelta(days=23)
        date_idxs = set()
        # think of a rather efficient searching scheme
        for i in range(rows):
            begin_date = datetime.strptime(
                str(int(replacemnt_mat[i][-2])), "%Y%m%d"
            ).date()
            end_date = datetime.strptime(
                str(int(replacemnt_mat[i][-1])), "%Y%m%d"
            ).date()

            if epoch_end:
                # for jpl and csr
                if begin_date == epoch_begin and end_date == epoch_end:
                    date_idxs.add(i)
                    print(
                        f"epoch-begin: {epoch_begin}, epoch-end: {epoch_end}, start: {begin_date}, end: {end_date}"
                    )
            else:
                # for itsg
                # begin_date = f"{begin_date.year}-{str(begin_date.month).zfill(2)}"
                if isinstance(epoch_begin, str):
                    epoch_begin = datetime.strptime(epoch_begin, "%Y-%m").date()

                if (
                    begin_date - time_buffer_itsg
                    <= epoch_begin
                    <= begin_date + time_buffer_itsg
                ):
                    date_idxs.add(i)
                    print(
                        f"start: {begin_date - time_buffer_itsg}, epoch-begin: {epoch_begin}, UB:{begin_date + time_buffer_itsg}"
                    )

            # TODO: add more error-handling statements;
            # otherwise, if the inputs are right, the output is right

    elif file_type == "tn-14":
        # there will be only one row per month -> for sake of consistency using set
        # print("TN-14 Replacement file")
        date_idxs = set()
        # think of a rather efficient searching scheme
        time_buffer = timedelta(days=5)
        time_buffer_itsg = timedelta(days=23)
        for i in range(rows):
            begin_date = julian.from_jd(replacemnt_mat[i][-2], fmt="mjd").date()
            end_date = julian.from_jd(replacemnt_mat[i][-1], fmt="mjd").date()

            if epoch_end:
                if (
                    begin_date >= epoch_begin - time_buffer  # type: ignore[operator]
                    and end_date <= epoch_end + time_buffer
                ):
                    date_idxs.add(i)
                    print(
                        f"start: {begin_date}, epoch-begin: {epoch_begin}, LB:{epoch_begin - time_buffer}, UB: {epoch_end + time_buffer}, end: {end_date}, epoch-end: {epoch_end}"  # type: ignore[operator]
                    )
            else:
                # for itsg
                if isinstance(epoch_begin, str):
                    epoch_begin = datetime.strptime(epoch_begin, "%Y-%m").date()
                if (
                    begin_date - time_buffer_itsg
                    <= epoch_begin
                    <= begin_date + time_buffer_itsg
                ):
                    date_idxs.add(i)
                    print(
                        f"start: {begin_date - time_buffer_itsg}, epoch-begin: {epoch_begin}, UB:{begin_date + time_buffer_itsg}"
                    )

            # TODO: add more error-handling statements

    else:
        raise ValueError(
            "Only Technical Note 13 ('tn-13') and Technical Note 14 ('tn-14') are supported."
        )

    return list(date_idxs)
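
The ITSG branch's ±23-day window can be sketched on its own (the dates below are invented):

```python
from datetime import date, datetime, timedelta

# ITSG epochs arrive as "YYYY-MM" strings; they are parsed to the first of the
# month and accepted when within +/- 23 days of a row's begin date.
time_buffer_itsg = timedelta(days=23)
begin_date = date(2002, 4, 1)  # begin date read from a replacement-file row
epoch_begin = datetime.strptime("2002-04", "%Y-%m").date()

matches = (begin_date - time_buffer_itsg
           <= epoch_begin
           <= begin_date + time_buffer_itsg)
```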

find_word(info_lines, search_key)

Find the index of the first line containing the given keyword.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| info_lines | list | List of lines (strings or bytes) to search through. | required |
| search_key | str | The keyword to locate within the lines. | required |

Returns:

| Name | Type | Description |
| ---- | ---- | ----------- |
| int | int | Index of the first line that contains the search key. |

Source code in pyshbundle/io.py
def find_word(info_lines: list, search_key: str) -> int:
    """Find the index of the first line containing the given keyword.

    Args:
        info_lines (list): List of lines (strings or bytes) to search through.
        search_key (str): The keyword to locate within the lines.

    Returns:
        int: Index of the first line that contains the search key.
    """
    # scan for the first line containing the target word
    for i, line in enumerate(info_lines):
        parsed_array = parse_lines(line, parse_fmt=r"\s+")
        if search_key in parsed_array:
            return i

    raise ValueError(f"Search key '{search_key}' not found in the given lines.")
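
A self-contained stand-in for the scan, using re.split in place of the module's parse_lines helper and invented header lines:

```python
import re

def find_word_demo(info_lines, search_key):
    """Return the index of the first line whose whitespace-split tokens
    contain search_key; raise if it is absent."""
    for i, line in enumerate(info_lines):
        if search_key in re.split(r"\s+", line.strip()):
            return i
    raise ValueError(f"'{search_key}' not found")

header = [
    "product_version 6.1",
    "time_coverage_start 2002-04-05",
    "time_coverage_end 2002-04-30",
]
idx = find_word_demo(header, "time_coverage_start")
```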

parse_csr_file(file_path)

Read a CSR GRACE .gz file and return the header and spherical harmonic data.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| file_path | str | Path to the CSR GRACE .gz file. | required |

Returns:

| Name | Type | Description |
| ---- | ---- | ----------- |
| tuple | tuple[list, dict] | A tuple containing csr_header (list), the raw header lines from the file, and csr_data (dict), the parsed spherical harmonic coefficients and metadata. |

Source code in pyshbundle/io.py
def parse_csr_file(file_path: str) -> tuple[list, dict]:
    """Read a CSR GRACE .gz file and return the header and spherical harmonic data.

    Args:
        file_path (str): Path to the CSR GRACE .gz file.

    Returns:
        tuple: A tuple containing:
            - csr_header (list): Raw header lines from the file.
            - csr_data (dict): Parsed spherical harmonic coefficients and metadata.
    """
    # ensure that the file path is valid then proceed

    # check whether the file is gzipped or not
    if file_path.endswith(".gz"):
        # open the file and read the lines
        with gzip.open(file_path, "r") as file:
            # read the file line wise -> obtain a list of bytes
            info_lines = file.readlines()

            for i in range(len(info_lines)):
                # find the index of the line which marks the end of the header
                if info_lines[i] == b"# End of YAML header\n":
                    end_of_header_idx = i
                    break

        header_info = info_lines[:end_of_header_idx]

        # parse the header strings to extract relevant metadata info
        csr_header = header_info  # parse_jpl_header(header_info)

        # parse the data strings to extract numeric data in suitable matrix fmt
        csr_data = extract_SH_data(file_path, source="csr")

        # Organize the data into either matrix, dataframe or dictionary format

    else:
        raise ValueError("parse_csr_file expects a gzipped file with a '.gz' extension.")

    return csr_header, csr_data
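
The header scan can be exercised on an in-memory gzipped file; the header and data line below are invented, matching only the '# End of YAML header' marker the function looks for:

```python
import gzip
import io

# Build a tiny gzipped CSR-like payload in memory (contents invented)
payload = (b"# Header line 1\n"
           b"# End of YAML header\n"
           b"GRCOF2 2 0 -4.8e-04 0.0e+00 7.1e-13 0.0e+00 20020405.0000 20020430.0000\n")
buf = io.BytesIO()
with gzip.open(buf, "wb") as f:
    f.write(payload)
buf.seek(0)

# Read it back line-wise (bytes) and locate the end-of-header marker
with gzip.open(buf, "r") as file:
    info_lines = file.readlines()

end_of_header_idx = next(
    i for i, line in enumerate(info_lines) if line == b"# End of YAML header\n"
)
header_info = info_lines[:end_of_header_idx]
```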

parse_csr_header()

Parse CSR GRACE file header (not yet implemented).

Raises:

| Type | Description |
| ---- | ----------- |
| NotImplementedError | This function is not yet implemented; use parse_jpl_header as reference. |

Source code in pyshbundle/io.py
def parse_csr_header() -> None:
    """Parse CSR GRACE file header (not yet implemented).

    Raises:
        NotImplementedError: This function is not yet implemented; use parse_jpl_header as reference.
    """
    # similar to JPL one

    raise NotImplementedError(
        "Similar to `parse_jpl_header`... not yet implemented separately."
    )

parse_itsg_file(file_path)

Read an ITSG GRACE .gfc file and return the header and spherical harmonic data.

Parameters:

Name Type Description Default
file_path str

Path to the ITSG GRACE .gfc file.

required

Returns:

Name Type Description
tuple tuple[list, dict]

A tuple containing: - itsg_header (list): Raw header lines from the file. - itsg_data (dict): Parsed spherical harmonic coefficients and metadata.

Source code in pyshbundle/io.py
def parse_itsg_file(file_path: str) -> tuple[list, dict]:
    """Read an ITSG GRACE .gfc file and return the header and spherical harmonic data.

    Args:
        file_path (str): Path to the ITSG GRACE .gfc file.

    Returns:
        tuple: A tuple containing:
            - itsg_header (list): Raw header lines from the file.
            - itsg_data (dict): Parsed spherical harmonic coefficients and metadata.
    """
    # ensure that the file path is valid then proceed

    # check whether the file is a .gfc file or not

    # open the file and read the lines
    if file_path[-4:] == ".gfc":
        with open(file_path, "r") as file:
            # read the file line-wise -> obtain a list of strings
            info_lines = file.readlines()

            # find the line that marks the end of the header
            for i, line in enumerate(info_lines):
                if line == "end_of_head ==================================================================================\n":
                    end_of_header_idx = i
                    break

        itsg_header = info_lines[:end_of_header_idx]

        # parse the data strings to extract numeric data in a suitable matrix format
        itsg_data = extract_SH_data(file_path, source="itsg")

    return itsg_header, itsg_data

parse_itsg_header(header_info)

Parse ITSG GRACE file header and return a metadata dictionary and model date string.

Parameters:

Name Type Description Default
header_info list

List of header lines read from the ITSG .gfc file.

required

Returns:

Name Type Description
tuple tuple[dict, str]

A tuple containing: - header_dict (dict): Dictionary of parsed header metadata. - date_str (str): Date string extracted from the model name (YYYY-MM format).

Source code in pyshbundle/io.py
def parse_itsg_header(header_info: list) -> tuple[dict, str]:
    """Parse ITSG GRACE file header and return a metadata dictionary and model date string.

    Args:
        header_info (list): List of header lines read from the ITSG .gfc file.

    Returns:
        tuple: A tuple containing:
            - header_dict (dict): Dictionary of parsed header metadata.
            - date_str (str): Date string extracted from the model name (YYYY-MM format).
    """
    normal_keys = [
        "modelname",
        "product_type",
        "norm",
        "tide_system",
        "errors",
        "earth_gravity_constant",
        "radius",
        "max_degree",
    ]

    # physical_constant_keys = ['earth_gravity_constant', 'radius', ]

    for key in normal_keys:
        key_index_in_header = find_word(header_info, key)
        # print(f"{key} - header line = {key_index_in_header} value= {parse_lines(header_info[key_index_in_header])[1]}")

    model_name_idx = find_word(header_info, "modelname")
    date_str = parse_lines(header_info[model_name_idx])[1][-7:]

    header_dict: dict = {}
    """
    for key in physical_constant_keys:
        key_index_in_header = find_word(header_info, key)

        const_long_name = parse_lines(header_info[key_index_in_header])[0]
        const_value = float(parse_lines(header_info[key_index_in_header])[1])
        const_dict = {'long_name': const_long_name, 'value': const_value}
        print(const_dict)

    """
    return header_dict, date_str
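The date string returned by `parse_itsg_header` is simply the last seven characters of the `modelname` header value. A minimal sketch (the model name below is hypothetical):

```python
# ITSG model names end with the monthly epoch; this name is hypothetical
modelname = "ITSG-Grace2018_n96_2008-04"

# the last seven characters give the YYYY-MM epoch string
date_str = modelname[-7:]
# date_str == "2008-04"
```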

parse_jpl_file(file_path)

Reads the spherical harmonic data provided by JPL.

Parameters:

Name Type Description Default
file_path str

Absolute path to the file.

required

Returns:

Name Type Description
tuple tuple[dict, dict]

A tuple containing: - jpl_header (dict): Parsed header information. - jpl_data (dict): Extracted spherical harmonic coefficients data.

Source code in pyshbundle/io.py
def parse_jpl_file(file_path: str) -> tuple[dict, dict]:
    """Reads the spherical harmonic data provided by JPL.

    Args:
        file_path (str): Absolute path to the file.

    Returns:
        tuple: A tuple containing:
            - jpl_header (dict): Parsed header information.
            - jpl_data (dict): Extracted spherical harmonic coefficients data.
    """
    # ensure that the file path is valid then proceed

    # check whether the file is gzipped or not

    if file_path[-3:] == ".gz":
        # open the file and read the lines
        with gzip.open(file_path, "r") as file:
            # read the file line-wise -> obtain a list of bytes
            info_lines = file.readlines()

            # find the end-of-header sentinel in the text file
            for i, line in enumerate(info_lines):
                if line == b"# End of YAML header\n":
                    end_of_header_idx = i
                    break

        # everything before the sentinel is the header
        header_info = info_lines[:end_of_header_idx]

        # parse the header strings to extract relevant metadata info
        jpl_header = parse_jpl_header(header_info)

        # parse the data strings to extract numeric data in a suitable matrix format
        jpl_data = extract_SH_data(file_path, source="jpl")

    return jpl_header, jpl_data

parse_jpl_header(header_info)

Parse JPL GRACE file header and return a dictionary of metadata.

Parameters:

Name Type Description Default
header_info list

List of byte strings read from the JPL file header.

required

Returns:

Name Type Description
dict dict

Dictionary containing parsed header fields including title, institution, product_version, processing_level, normalization, permanent_tide_flag, degree, order, earth_gravity_param, and mean_equator_radius.

Source code in pyshbundle/io.py
def parse_jpl_header(header_info: list) -> dict:
    """Parse JPL GRACE file header and return a dictionary of metadata.

    Args:
        header_info (list): List of byte strings read from the JPL file header.

    Returns:
        dict: Dictionary containing parsed header fields including title, institution,
            product_version, processing_level, normalization, permanent_tide_flag,
            degree, order, earth_gravity_param, and mean_equator_radius.
    """
    # parse the header info passed by the reader in as list of bytes
    # create a dictionary with key = important params from header file

    header: dict[str, Any] = {}

    # important info from header file
    # Dimension - Degree and Order
    # normalization info
    # permanent_tide_flag
    # earth_gravity_param - G*M_earth with units and value
    # mean_equator_radius - units and value
    # processing_level
    # product_version
    # conventions
    # institution
    # title
    # time_coverage_start
    # time_coverage_end
    # unused_days

    normal_keys = [
        "title",
        "institution",
        "product_version",
        "processing_level",
        "normalization",
        "permanent_tide_flag",
    ]
    dimension_keys = ["degree", "order"]
    date_time_keys = ["time_coverage_start", "time_coverage_end", "unused_days"]
    physical_constant_keys = ["earth_gravity_param", "mean_equator_radius"]

    for key in normal_keys:
        key_index_in_header = find_word(header_info, key)
        # print(f"{key} - header line = {key_index_in_header +1} value= {' '.join(parse_lines(header_info[key_index_in_header])[3:])[: -3]}")
        header[key] = " ".join(parse_lines(header_info[key_index_in_header])[3:])[:-3]

    for key in dimension_keys:
        key_index_in_header = find_word(header_info, key)
        val = int(
            " ".join(
                parse_lines(header_info[key_index_in_header], parse_fmt=r"\s+")[3:]
            )[:-3]
        )
        # print(f"{key} - {val}")
        header[key] = val

    for key in date_time_keys:
        # TODO: Look back and find what you meant....
        key_index_in_header = find_word(header_info, key)
        # find a way to make it date time object so it can be used later
        pass

    for key in physical_constant_keys:
        key_index_in_header = find_word(header_info, key)

        const_units = " ".join(parse_lines(header_info[key_index_in_header + 2])[3:])[
            :-3
        ]
        const_value = float(
            " ".join(parse_lines(header_info[key_index_in_header + 3])[3:])[:-3]
        )
        const_dict = {"units": const_units, "value": const_value}
        # returning a dict with value and corresponding units
        header[key] = const_dict

    return header
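The repeated `" ".join(parse_lines(...)[3:])[:-3]` slicing in `parse_jpl_header` compensates for `str()` being applied to byte strings: the literal `b'` prefix and the trailing `\n'` characters survive into the token list. A small sketch with a made-up header line shows the effect:

```python
import re

# a made-up header line in the byte-string form the reader produces
line = b"    title       : GRACE Geopotential Coefficients\n"

# str() keeps the literal bytes wrapper, so tokens[0] == "b'" and the last
# token still carries the trailing backslash-n-quote characters
tokens = re.split(r"\s+", str(line))

# drop the leading tokens before the value and strip the trailing "\n'" residue
value = " ".join(tokens[3:])[:-3]
# value == "GRACE Geopotential Coefficients"
```

Decoding the bytes up front (e.g. `line.decode()`) would avoid the need for this slicing, but the slicing matches how the library currently reads the file.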

parse_lines(line, parse_fmt='\\s+')

Split a line string using the given regex pattern.

Parameters:

Name Type Description Default
line str | bytes

The input line to parse (string or bytes).

required
parse_fmt str

Regex pattern used to split the line. Defaults to whitespace.

'\\s+'

Returns:

Name Type Description
list list

List of substrings after splitting.

Source code in pyshbundle/io.py
def parse_lines(line: str | bytes, parse_fmt: str = r"\s+") -> list:
    """Split a line string using the given regex pattern.

    Args:
        line: The input line to parse (string or bytes).
        parse_fmt (str, optional): Regex pattern used to split the line. Defaults to whitespace.

    Returns:
        list: List of substrings after splitting.
    """
    # parse the line and return a list of tokens
    # r'\s+' splits on runs of whitespace (bytes input is first converted via str())

    parsed_array = re.split(parse_fmt, str(line))

    return parsed_array
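For a plain string input, `parse_lines` behaves as a straightforward whitespace tokenizer:

```python
import re

def parse_lines(line, parse_fmt=r"\s+"):
    # split the line on the given regex pattern (default: runs of whitespace)
    return re.split(parse_fmt, str(line))

tokens = parse_lines("max_degree   96")
# tokens == ["max_degree", "96"]
```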

parse_tn13_header(header_info)

Parse TN-13 replacement coefficient file header and return title and last reported date.

Parameters:

Name Type Description Default
header_info list

List of header lines read from the TN-13 file.

required

Returns:

Name Type Description
tuple tuple[str, str]

A tuple containing: - title (str): Title string extracted from the header. - last_reported_date (str): Last reported data point date as a string.

Source code in pyshbundle/io.py
def parse_tn13_header(header_info: list) -> tuple[str, str]:
    """Parse TN-13 replacement coefficient file header and return title and last reported date.

    Args:
        header_info (list): List of header lines read from the TN-13 file.

    Returns:
        tuple: A tuple containing:
            - title (str): Title string extracted from the header.
            - last_reported_date (str): Last reported data point date as a string.
    """
    # IMP Info
    # - Title
    # - Last reported data point
    # Special Notes
    # - 1, 2, 3, 4, 5

    # finding the index of important sub-headers like Title and Notes
    for i in range(len(header_info)):
        if "TITLE" in header_info[i]:
            title_idx = i
            break

    # locate the SPECIAL NOTES sub-header (index currently unused)
    for i in range(len(header_info)):
        if "SPECIAL NOTES" in header_info[i]:
            break

    # the title spans the three lines after the TITLE marker
    title = (
        " ".join(re.split(r"\s+", header_info[title_idx + 1])[1:-1])
        + " ".join(re.split(r"\s+", header_info[title_idx + 2])[1:-1])
        + " ".join(re.split(r"\s+", header_info[title_idx + 3])[1:-1])
    )

    # TODO: later convert the str object to a date-time object
    last_reported_date = (re.split(r"\s+", header_info[title_idx + 3])[-2])[:-1]

    return title, last_reported_date

parse_tn14_header()

Parse TN-14 replacement coefficient file header (not yet implemented).

Raises:

Type Description
NotImplementedError

This function is not yet implemented.

Source code in pyshbundle/io.py
def parse_tn14_header() -> None:
    """Parse TN-14 replacement coefficient file header (not yet implemented).

    Raises:
        NotImplementedError: This function is not yet implemented.
    """
    # Key info
    # - Title
    # - Version
    # - Date Span
    # - Notes:

    # Constants
    # - Mean C20
    # - Mean C30
    # - GM
    # R

    pass

sub2ind(array_shape, rows, cols)

Convert row and column subscripts to linear indices.

Parameters:

Name Type Description Default
array_shape tuple

Shape of the array as a tuple (num_rows, num_cols).

required
rows int or array-like

Row indices.

required
cols int or array-like

Column indices.

required

Returns:

Type Description
int | ndarray

int or array-like: Linear indices corresponding to the row and column subscripts.

Source code in pyshbundle/io.py
def sub2ind(array_shape: tuple, rows: int | np.ndarray, cols: int | np.ndarray) -> int | np.ndarray:
    """Convert row and column subscripts to linear indices.

    Args:
        array_shape (tuple): Shape of the array as a tuple (num_rows, num_cols).
        rows (int or array-like): Row indices.
        cols (int or array-like): Column indices.

    Returns:
        int or array-like: Linear indices corresponding to the row and column subscripts.
    """
    # rows, list need to be linear array
    return rows * array_shape[1] + cols
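`sub2ind` computes the row-major (C-order) linear index, matching NumPy's own `ravel_multi_index`:

```python
import numpy as np

def sub2ind(array_shape, rows, cols):
    # row-major (C order) linear index: row * num_cols + col
    return rows * array_shape[1] + cols

idx = sub2ind((3, 4), 1, 2)  # row 1, col 2 of a 3x4 array -> 1*4 + 2 == 6
same = np.ravel_multi_index((1, 2), (3, 4))  # NumPy's equivalent, also 6

# because the formula is pure arithmetic, it also broadcasts over index arrays
rows = np.array([0, 2])
cols = np.array([3, 1])
linear = sub2ind((3, 4), rows, cols)  # array([3, 9])
```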

Physical and Geodetic Constants

This script contains some of the major relevant physical and geodetic (GRS80) constants.

  • clight speed of light - \(2.99792458e+8\) \(m/s\)
  • G gravitational constant - \(6.67259e-11\) \(\frac{m^3}{kg \cdot s^2}\)
  • au astronomical unit - \(149.597870691e+9\) \(m\)

  • ae semi-major axis of ellipsoid GRS 80 - \(6378137\) \(m\)

  • GM geocentric gravitational constant GRS 80 - \(3.986005e+14\) \(\frac{m^3}{s^2}\)
  • J2 earth's dynamic form factor GRS 80 - \(1.08263e-3\) [unitless, unnormalized C20 coefficient]
  • Omega mean angular velocity GRS 80 - \(7.292115e-5\) \(\frac{rad}{s}\)

  • flat flattening - \(\frac{1}{298.257222101}\)
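As a quick sanity check on the GRS 80 values above, combining the semi-major axis and the mean angular velocity gives the familiar linear rotation speed at the equator:

```python
# GRS 80 values from the list above
ae = 6378137.0        # semi-major axis [m]
Omega = 7.292115e-5   # mean angular velocity [rad/s]

# linear rotation speed of a point on the equator [m/s]
v_eq = Omega * ae
# roughly 465 m/s
```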