Skip to content

Data Preprocessing

Our Data class allows users to read-in raw zipped LOB Data from EPEX (2020 and later), process them accordingly and save each trading day as a separate CSV file. All Data is ultimately stored in UTC timezone format. We show and test this for German Market Data of the years 2020 and 2021, specifically using the 1h products of the continuous intraday market, but this can easily be adapted to other regions or other products. Inputs to the parsing function simply are the start-day and end-day of the data we want to parse, plus the path to the zipped EPEX market data.

Source code in bitepy/data.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
class Data:
    def __init__(self):
        """Initialize a Data instance."""
        pass

    def _load_csv(self, file_path):
        """
        Load a single zipped CSV file with specified dtypes.
        """
        df = pd.read_csv(
            file_path,
            compression="zip",
            dtype={
                "id": np.int64,
                "initial": np.int64,
                "side": "string",
                "start": "string",
                "transaction": "string",
                "validity": "string",
                "price": np.float64,
                "quantity": np.float64,
            },
        )
        df.rename(columns={"Unnamed: 0": "id"}, inplace=True)
        ids = df["id"].to_numpy(dtype=np.int64).tolist()
        initials = df["initial"].to_numpy(dtype=np.int64).tolist()
        sides = df["side"].to_numpy(dtype="str").tolist()
        starts = df["start"].to_numpy(dtype="str").tolist()
        transactions = df["transaction"].to_numpy(dtype="str").tolist()
        validities = df["validity"].to_numpy(dtype="str").tolist()
        prices = df["price"].to_numpy(dtype=np.float64).tolist()
        quantities = df["quantity"].to_numpy(dtype=np.float64).tolist()
        return ids, initials, sides, starts, transactions, validities, prices, quantities

    def _read_id_table_2020(self, timestamp, datapath):
        year = timestamp.strftime("%Y")
        month = timestamp.strftime("%m")
        datestr = "Continuous_Orders_DE_" + timestamp.strftime("%Y%m%d")

        # Get file name of zip-file and CSV file within the zip file
        file_list = os.listdir(f"{datapath}/{year}/{month}")
        zip_file_name = [i for i in file_list if datestr in i][0]
        csv_file_name = zip_file_name[:-4]

        # Read data from the CSV inside the zip file
        zip_file = ZipFile(f"{datapath}/{year}/{month}/" + zip_file_name)
        df = (pd.read_csv(zip_file.open(csv_file_name), sep=";", decimal=".")
              .drop_duplicates(subset=["Order ID", "Initial ID", "Action code", "Validity time", "Price", "Quantity"])
              .loc[lambda x: x["Is User Defined Block"] == 0]
              .loc[lambda x: (x["Product"] == "Intraday_Hour_Power") | (x["Product"] == "XBID_Hour_Power")]
              .loc[lambda x: (x["Action code"] == "A") | (x["Action code"] == "D") | (x["Action code"] == "C") | (x["Action code"] == "I")]
              .drop(["Delivery area", "Execution restriction", "Market area", "Parent ID", "Delivery End",
                     "Currency", "Product", "isOTC", "Is User Defined Block", "Unnamed: 20", "RevisionNo", "Entry time"],
                    axis=1)
              .rename({"Order ID": "order",
                       "Initial ID": "initial",
                       "Delivery Start": "start",
                       "Side": "side",
                       "Price": "price",
                       "Volume": "volume",
                       "Validity time": "validity",
                       "Action code": "action",
                       "Transaction Time": "transaction",
                       "Quantity": "quantity"}, axis=1)
              .assign(start=lambda x: pd.to_datetime(x.start, format="%Y-%m-%dT%H:%M:%SZ"))
              .assign(validity=lambda x: pd.to_datetime(x.validity, format="%Y-%m-%dT%H:%M:%SZ"))
              .assign(transaction=lambda x: pd.to_datetime(x.transaction, format="%Y-%m-%dT%H:%M:%S.%fZ"))
              )

        # Remove iceberg orders
        iceberg_IDs = df.loc[df["action"] == "I", "initial"].unique()
        df = df.loc[~df["initial"].isin(iceberg_IDs)]

        # Process change messages (action code 'C')
        change_messages = df[df["action"] == "C"].drop_duplicates(subset=["order"], keep="first")
        not_added = change_messages[~(change_messages["order"].isin(df.loc[df["action"] == "A", "order"]))]
        change_messages = change_messages[~(change_messages["order"].isin(not_added["order"]))]

        change_exists = change_messages.shape[0] > 0
        change_counter = 0
        while change_exists:
            indexer_messA_with_change = df[(df["order"].isin(change_messages["order"])) & (df["action"] == "A")] \
                .sort_values("transaction").groupby("order").tail(1).index

            df["df_index_copy"] = df.index
            merged = pd.merge(change_messages, df.loc[indexer_messA_with_change], on='order')
            df.loc[merged["df_index_copy"].to_numpy(), "validity"] = merged["transaction_x"].to_numpy()

            # Change the action code from "C" to "A" for processed messages
            df.loc[df.index.isin(change_messages.index), "action"] = "A"
            df.drop("df_index_copy", axis=1, inplace=True)

            # Redo the procedure for remaining change messages
            change_messages = df[df["action"] == "C"].drop_duplicates(subset=["order"], keep="first")
            not_added = change_messages[~(change_messages["order"].isin(df.loc[df["action"] == "A", "order"]))]
            change_messages = change_messages[~(change_messages["order"].isin(not_added["order"]))]
            change_exists = change_messages.shape[0] > 0
            change_counter += 1

        # Process cancel messages (action code 'D')
        cancel_messages = df[df["action"] == "D"]
        not_added = cancel_messages[~(cancel_messages["order"].isin(df.loc[df["action"] == "A", "order"]))]
        cancel_messages = cancel_messages[~(cancel_messages["order"].isin(not_added["order"]))]

        indexer_messA_with_cancel = df[(df["order"].isin(cancel_messages["order"])) & (df["action"] == "A")] \
            .sort_values("transaction").groupby("order").tail(1).index
        df["df_index_copy"] = df.index
        merged = pd.merge(cancel_messages, df.loc[indexer_messA_with_cancel], on='order')
        df.loc[merged["df_index_copy"].to_numpy(), "validity"] = merged["transaction_x"].to_numpy()

        df = df.loc[lambda x: ~(x["action"] == "D")]
        df = df.drop(["order", "action", "df_index_copy"], axis=1)

        # Reorder and format columns
        newOrder = ["initial", "side", "start", "transaction", "validity", "price", "quantity"]
        df = df[newOrder]
        df['side'] = df['side'].str.upper()

        df["start"] = df["start"].dt.tz_localize('UTC').dt.strftime('%Y-%m-%dT%H:%M:%SZ')
        df["transaction"] = df["transaction"].dt.tz_localize('UTC').dt.strftime('%Y-%m-%dT%H:%M:%S.%f').str[:-3] + 'Z'
        df["validity"] = df["validity"].dt.tz_localize('UTC').dt.strftime('%Y-%m-%dT%H:%M:%S.%f').str[:-3] + 'Z'

        return df

    def _read_id_table_2021(self, timestamp, datapath):
        year = timestamp.strftime("%Y")
        month = timestamp.strftime("%m")
        datestr = "Continuous_Orders-DE-" + timestamp.strftime("%Y%m%d")

        # Get file name of zip-file and CSV file within the zip file
        file_list = os.listdir(f"{datapath}/{year}/{month}")
        zip_file_name = [i for i in file_list if datestr in i][0]
        csv_file_name = zip_file_name[:-4]

        # Read data from the CSV inside the zip file
        zip_file = ZipFile(f"{datapath}/{year}/{month}/" + zip_file_name)
        df = (pd.read_csv(zip_file.open(csv_file_name), sep=",", decimal=".", skiprows=1)
              .drop_duplicates(subset=["OrderId", "InitialId", "ActionCode", "ValidityTime", "Price", "Quantity"])
              .loc[lambda x: x["UserDefinedBlock"] == "N"]
              .loc[lambda x: (x["Product"] == "Intraday_Hour_Power") | (x["Product"] == "XBID_Hour_Power")]
              .loc[lambda x: (x["ActionCode"] == "A") | (x["ActionCode"] == "D") | (x["ActionCode"] == "C") | (x["ActionCode"] == "I")]
              .drop(["LinkedBasketId", "DeliveryArea", "ParentId", "DeliveryEnd", "Currency", "Product",
                     "UserDefinedBlock", "RevisionNo", "ExecutionRestriction", "CreationTime", "QuantityUnit",
                     "Volume", "VolumeUnit"], axis=1)
              .rename({"OrderId": "order",
                       "InitialId": "initial",
                       "DeliveryStart": "start",
                       "Side": "side",
                       "Price": "price",
                       "Volume": "volume",
                       "ValidityTime": "validity",
                       "ActionCode": "action",
                       "TransactionTime": "transaction",
                       "Quantity": "quantity"}, axis=1)
              .assign(start=lambda x: pd.to_datetime(x.start, format="%Y-%m-%dT%H:%M:%SZ"))
              .assign(validity=lambda x: pd.to_datetime(x.validity, format="%Y-%m-%dT%H:%M:%SZ"))
              .assign(transaction=lambda x: pd.to_datetime(x.transaction, format="%Y-%m-%dT%H:%M:%S.%fZ"))
              )
        # Remove iceberg orders
        iceberg_IDs = df.loc[df["action"] == "I", "initial"].unique()
        df = df.loc[~df["initial"].isin(iceberg_IDs)]

        # Process change messages (action code 'C')
        change_messages = df[df["action"] == "C"].drop_duplicates(subset=["order"], keep="first")
        not_added = change_messages[~(change_messages["order"].isin(df.loc[df["action"] == "A", "order"]))]
        change_messages = change_messages[~(change_messages["order"].isin(not_added["order"]))]

        change_exists = change_messages.shape[0] > 0
        change_counter = 0
        while change_exists:
            indexer_messA_with_change = df[(df["order"].isin(change_messages["order"])) & (df["action"] == "A")] \
                .sort_values("transaction").groupby("order").tail(1).index

            df["df_index_copy"] = df.index
            merged = pd.merge(change_messages, df.loc[indexer_messA_with_change], on='order')
            df.loc[merged["df_index_copy"].to_numpy(), "validity"] = merged["transaction_x"].to_numpy()

            # Change the action code from "C" to "A" so it can be processed in the next iteration
            df.loc[df.index.isin(change_messages.index), "action"] = "A"
            df.drop("df_index_copy", axis=1, inplace=True)

            # Redo procedure for remaining change messages
            change_messages = df[df["action"] == "C"].drop_duplicates(subset=["order"], keep="first")
            not_added = change_messages[~(change_messages["order"].isin(df.loc[df["action"] == "A", "order"]))]
            change_messages = change_messages[~(change_messages["order"].isin(not_added["order"]))]
            change_exists = change_messages.shape[0] > 0
            change_counter += 1

        # Process cancel messages (action code 'D')
        cancel_messages = df[df["action"] == "D"]
        not_added = cancel_messages[~(cancel_messages["order"].isin(df.loc[df["action"] == "A", "order"]))]
        cancel_messages = cancel_messages[~(cancel_messages["order"].isin(not_added["order"]))]

        indexer_messA_with_cancel = df[(df["order"].isin(cancel_messages["order"])) & (df["action"] == "A")] \
            .sort_values("transaction").groupby("order").tail(1).index
        df["df_index_copy"] = df.index
        merged = pd.merge(cancel_messages, df.loc[indexer_messA_with_cancel], on='order')
        df.loc[merged["df_index_copy"].to_numpy(), "validity"] = merged["transaction_x"].to_numpy()

        df = df.loc[lambda x: ~(x["action"] == "D")]
        df = df.drop(["order", "action", "df_index_copy"], axis=1)

        df["start"] = df["start"].dt.tz_localize('UTC').dt.strftime('%Y-%m-%dT%H:%M:%SZ')
        df["transaction"] = df["transaction"].dt.tz_localize('UTC').dt.strftime('%Y-%m-%dT%H:%M:%S.%f').str[:-3] + 'Z'
        df["validity"] = df["validity"].dt.tz_localize('UTC').dt.strftime('%Y-%m-%dT%H:%M:%S.%f').str[:-3] + 'Z'

        return df


    def _read_nordpool_table(self, date, marketdatapath):
        """Read and process NordPool parquet files for a specific date.
           Nordpool contains flags for full and partial execution of orders. We disregard this, as it will become apparent in our backtesting LOB traversal. After partial execution, orders are sometimes modified, deleted etc., this all stays relevant and is handled.
           We also currently still disregard FoK and IoC orders (treat them as 0 validity duration). They have all the same updateTime in their message-chain.
        """
        date_folder = date.strftime("%Y%m%d")
        folder_path = Path(marketdatapath) / date_folder

        if not folder_path.exists():
            raise FileNotFoundError(f"Folder not found: {folder_path}")

        parquet_files = sorted(folder_path.glob("NordPool_*.parquet"))

        if not parquet_files:
            raise FileNotFoundError(f"No parquet files found in folder: {folder_path}")

        # Read and concatenate all hourly parquet files
        dfs = []
        for file in parquet_files:
            df_temp = pd.read_parquet(file)
            dfs.append(df_temp)

        df = pd.concat(dfs, ignore_index=True)

        # Convert timestamps (needed for subsequent filtering and processing)
        df = (df
            .assign(createdTime=lambda x: pd.to_datetime(x['createdTime'], format='ISO8601'))
            .assign(updatedTime=lambda x: pd.to_datetime(x['updatedTime'], format='ISO8601'))
            .assign(expirationTime=lambda x: pd.to_datetime(x['expirationTime'], format='ISO8601'))
            .assign(deliveryStart=lambda x: pd.to_datetime(x['deliveryStart'], format='ISO8601'))
            .assign(deliveryEnd=lambda x: pd.to_datetime(x['deliveryEnd'], format='ISO8601'))
            )

        # Filter and prepare data
        df = (df
            .drop_duplicates(subset=['orderId', 'originalOrderId', 'action', 'expirationTime', 'price', 'volume'])
            .loc[lambda x: x['contractName'].str.startswith('PH')]
            .loc[lambda x: x['action'].isin(['UserAdded', 'UserModified', 'UserDeleted', 'SystemDeleted', 'UserHibernated'])]
            )

        # Remove iceberg orders
        iceberg_IDs = df.loc[df['orderType'] == 'Iceberg', 'originalOrderId'].unique()
        df = df.loc[~df['originalOrderId'].isin(iceberg_IDs)]

        # Replace letters with numbers in originalOrderId and orderId
        unique_letters = sorted(df['originalOrderId'].astype(str).str.findall(r'[A-Za-z]').str.join('').unique())
        # Create mapping of unique letters to numbers starting from 11
        letter_to_num = {letter: str(i+11) for i, letter in enumerate(unique_letters)}
        # Function to replace letters with numbers
        def replace_letters(order_id):
            order_id = str(order_id)
            for letter, num in letter_to_num.items():
                order_id = order_id.replace(letter, num)
            return order_id

        # Apply replacement to originalOrderId column
        df['originalOrderId'] = df['originalOrderId'].apply(replace_letters)
        df['orderId'] = df['orderId'].apply(replace_letters)

        # Rename columns to standardized format
        df = df.rename(columns={
            'orderId': 'order',
            'originalOrderId': 'initial',
            'deliveryStart': 'start',
            'updatedTime': 'transaction',
            'expirationTime': 'validity',
            'volume': 'quantity',
            'action': 'action_original'
        })

        # Map NordPool actions to standardized codes
        df['action'] = df['action_original'].map({
            'UserAdded': 'A',
            'UserModified': 'C',
            'UserDeleted': 'D',
            'SystemDeleted': 'D',
            'UserHibernated': 'H'
        })

        # Process change messages (modifications)
        change_messages = df[df["action"] == "C"].drop_duplicates(subset=["order"], keep="first")
        not_added = change_messages[~(change_messages["order"].isin(df.loc[df["action"] == "A", "order"]))]
        change_messages = change_messages[~(change_messages["order"].isin(not_added["order"]))]

        change_exists = change_messages.shape[0] > 0
        while change_exists:
            indexer_messA_with_change = df[(df["order"].isin(change_messages["order"])) & (df["action"] == "A")] \
                .sort_values("transaction").groupby("order").tail(1).index
            df["df_index_copy"] = df.index
            merged = pd.merge(change_messages, df.loc[indexer_messA_with_change], on='order')
            df.loc[merged["df_index_copy"].to_numpy(), "validity"] = merged["transaction_x"].to_numpy()
            df.loc[df.index.isin(change_messages.index), "action"] = "A"
            df.drop("df_index_copy", axis=1, inplace=True)

            change_messages = df[df["action"] == "C"].drop_duplicates(subset=["order"], keep="first")
            not_added = change_messages[~(change_messages["order"].isin(df.loc[df["action"] == "A", "order"]))]
            change_messages = change_messages[~(change_messages["order"].isin(not_added["order"]))]
            change_exists = change_messages.shape[0] > 0

        # Process cancel messages (deletions)
        cancel_messages = df[df["action"] == "D"]
        not_added = cancel_messages[~(cancel_messages["order"].isin(df.loc[df["action"] == "A", "order"]))]
        cancel_messages = cancel_messages[~(cancel_messages["order"].isin(not_added["order"]))]

        indexer_messA_with_cancel = df[(df["order"].isin(cancel_messages["order"])) & (df["action"] == "A")] \
            .sort_values("transaction").groupby("order").tail(1).index
        df["df_index_copy"] = df.index
        merged = pd.merge(cancel_messages, df.loc[indexer_messA_with_cancel], on='order')
        df.loc[merged["df_index_copy"].to_numpy(), "validity"] = merged["transaction_x"].to_numpy()
        df.drop("df_index_copy", axis=1, inplace=True)

        df = df.loc[lambda x: ~(x["action"] == "D")]

        # Process hibernation messages
        hibernated_messages = df[df["action"] == "H"]
        not_added = hibernated_messages[~(hibernated_messages["order"].isin(df.loc[df["action"] == "A", "order"]))]
        hibernated_messages = hibernated_messages[~(hibernated_messages["order"].isin(not_added["order"]))]

        if not hibernated_messages.empty:
            indexer_messA_with_hibernated = df[(df["order"].isin(hibernated_messages["order"])) & (df["action"] == "A")] \
                .sort_values("transaction").groupby("order").tail(1).index
            df["df_index_copy"] = df.index
            merged = pd.merge(hibernated_messages, df.loc[indexer_messA_with_hibernated], on='order')
            df.loc[merged["df_index_copy"].to_numpy(), "validity"] = merged["transaction_x"].to_numpy()
            df.drop("df_index_copy", axis=1, inplace=True)

        df = df.loc[lambda x: ~(x["action"] == "H")]
        df = df.drop(["order", "action", "action_original"], axis=1, errors='ignore')

        # Filter out orders where validity time is not after transaction time; Sometimes orders are added and deleted at the same time.
        df = df[df['validity'] > df['transaction']]

        # Convert timestamps to string format
        df["start"] = df["start"].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
        df["transaction"] = df["transaction"].dt.strftime('%Y-%m-%dT%H:%M:%S.%f').str[:-3] + 'Z'
        df["validity"] = df["validity"].dt.strftime('%Y-%m-%dT%H:%M:%S.%f').str[:-3] + 'Z'

        # rename side to all uppercase
        df['side'] = df['side'].str.upper()

        # Select and order final columns
        df = df[['initial', 'side', 'start', 'transaction', 'validity', 'price', 'quantity']]

        return df


    def parse_market_data(self, start_date_str: str, end_date_str: str, marketdatapath: str, 
                        savepath: str, market_type: str, verbose: bool = True):
        """
        Parse market data between two dates and save processed zipped CSV files.

        Processes raw order book data from EPEX or NordPool markets and converts them into 
        standardized sorted CSV files for each day in UTC time format. Handles order lifecycle 
        events (additions, modifications, cancellations) and reconstructs order validity periods.

        Args:
            start_date_str (str): Start date in format "YYYY-MM-DD"
            end_date_str (str): End date in format "YYYY-MM-DD"
            marketdatapath (str): Path to market data folder with yearly/monthly subfolders
            savepath (str): Directory where processed CSV files will be saved
            market_type (str): "EPEX" or "NordPool"
            verbose (bool, optional): Print progress messages. Defaults to True.
        """

        if not os.path.exists(savepath):
            os.makedirs(savepath)

        start_date = pd.Timestamp(start_date_str)
        end_date = pd.Timestamp(end_date_str)

        if start_date > end_date:
            raise ValueError("Error: Start date is after end date.")
        if market_type == "EPEX" and start_date.year < 2020:
            raise ValueError("Error: Years before 2020 are not supported.")

        dates = pd.date_range(start_date, end_date, freq="D")
        df1 = pd.DataFrame()
        df2 = pd.DataFrame()

        with tqdm(total=len(dates), desc="Loading and saving CSV data", ncols=100, disable=not verbose) as pbar:
            for dt1 in dates:
                pbar.set_description(f"Currently loading and saving date {str(dt1.date())} ... ")
                df1 = df2
                df2 = pd.DataFrame()
                dt2 = dt1 + pd.Timedelta(days=1)

                # Read current day data
                if df1.empty:
                    if market_type == "EPEX":
                        if dt1.year == 2020:
                            df1 = self._read_id_table_2020(dt1, marketdatapath)
                        elif dt1.year >= 2021:
                            df1 = self._read_id_table_2021(dt1, marketdatapath)
                        else:
                            raise ValueError("Error: Year not >= 2020")
                    elif market_type == "NordPool":
                        df1 = self._read_nordpool_table(dt1, marketdatapath)
                    else:
                        raise ValueError(f"Unknown market_type: {market_type}")

                # Read next day data (captures orders with transaction today, delivery tomorrow)
                if dt2 <= end_date:
                    if market_type == "EPEX":
                        if dt2.year == 2020:
                            df2 = self._read_id_table_2020(dt2, marketdatapath)
                        elif dt2.year >= 2021:
                            df2 = self._read_id_table_2021(dt2, marketdatapath)
                        else:
                            raise ValueError("Error: Year not >= 2020")
                    elif market_type == "NordPool":
                        df2 = self._read_nordpool_table(dt2, marketdatapath)
                    else:
                        raise ValueError(f"Unknown market_type: {market_type}")

                # Combine and filter by transaction date
                df = pd.concat([df1, df2])
                df = df.sort_values(by='transaction')
                df['transaction_date'] = pd.to_datetime(df['transaction']).dt.date
                grouped = df.groupby('transaction_date')

                # round price to 2 decimals and quantity to 1 decimal
                df['price'] = df['price'].round(2)
                df['quantity'] = df['quantity'].round(1)

                save_date = dt1.date()
                group = grouped.get_group(save_date)
                daily_filename = f"{savepath}orderbook_{save_date}.csv"
                compression_options = dict(method='zip', archive_name=Path(daily_filename).name)
                group.drop(columns='transaction_date').sort_values(by='transaction').fillna("").to_csv(
                    f'{daily_filename}.zip', compression=compression_options)
                pbar.update(1)

        print("\nWriting CSV data completed.")

    def create_bins_from_csv(self, csv_list: list, save_path: str, verbose: bool = True):
        """
        Convert zipped CSV files of pre-processed order book data into binary files.

        This method sequentially loads each previously generated zipped CSV file, converts it to a binary format using the C++ simulation
        extension, and saves the binary file in the specified directory. Binary files allow for much (10x) quicker loading
        of the data at runtime.

        Args:
            csv_list (list): List of file paths to the zipped CSV files containing pre-processed order book data.
            save_path (str): Directory path where the binary files should be saved. The binary files will use the same base name as the CSV files.
            verbose (bool, optional): If True, print progress messages. Defaults to True.
        """
        if not os.path.exists(save_path):
            os.makedirs(save_path)

        _sim = Simulation_cpp()
        with tqdm(total=len(csv_list), desc="Writing Binaries", ncols=100, disable=not verbose) as pbar:
            for csv_file_path in csv_list:
                filename = os.path.basename(csv_file_path)
                bin_file_path = os.path.join(save_path, filename.replace(".csv.zip", ".bin"))
                pbar.set_description(f"Currently saving binary {bin_file_path.split('/')[-1]} ... ")
                ids, initials, sides, starts, transactions, validities, prices, quantities = self._load_csv(csv_file_path)
                _sim.writeOrderBinFromPandas(
                    bin_file_path,
                    ids,
                    initials,
                    sides,
                    starts,
                    transactions,
                    validities,
                    prices,
                    quantities,
                )
                pbar.update(1)

        print("\nWriting Binaries completed.")

__init__()

Initialize a Data instance.

Source code in bitepy/data.py
28
29
30
def __init__(self):
    """Initialize a Data instance."""
    pass

create_bins_from_csv(csv_list, save_path, verbose=True)

Convert zipped CSV files of pre-processed order book data into binary files.

This method sequentially loads each previously generated zipped CSV file, converts it to a binary format using the C++ simulation extension, and saves the binary file in the specified directory. Binary files allow for much (10x) quicker loading of the data at runtime.

Parameters:

Name Type Description Default
csv_list list

List of file paths to the zipped CSV files containing pre-processed order book data.

required
save_path str

Directory path where the binary files should be saved. The binary files will use the same base name as the CSV files.

required
verbose bool

If True, print progress messages. Defaults to True.

True
Source code in bitepy/data.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
def create_bins_from_csv(self, csv_list: list, save_path: str, verbose: bool = True):
    """
    Convert zipped CSV files of pre-processed order book data into binary files.

    This method sequentially loads each previously generated zipped CSV file, converts it to a binary format using the C++ simulation
    extension, and saves the binary file in the specified directory. Binary files allow for much (10x) quicker loading
    of the data at runtime.

    Args:
        csv_list (list): List of file paths to the zipped CSV files containing pre-processed order book data.
        save_path (str): Directory path where the binary files should be saved. The binary files will use the same base name as the CSV files.
        verbose (bool, optional): If True, print progress messages. Defaults to True.
    """
    if not os.path.exists(save_path):
        os.makedirs(save_path)

    _sim = Simulation_cpp()
    with tqdm(total=len(csv_list), desc="Writing Binaries", ncols=100, disable=not verbose) as pbar:
        for csv_file_path in csv_list:
            filename = os.path.basename(csv_file_path)
            bin_file_path = os.path.join(save_path, filename.replace(".csv.zip", ".bin"))
            pbar.set_description(f"Currently saving binary {bin_file_path.split('/')[-1]} ... ")
            ids, initials, sides, starts, transactions, validities, prices, quantities = self._load_csv(csv_file_path)
            _sim.writeOrderBinFromPandas(
                bin_file_path,
                ids,
                initials,
                sides,
                starts,
                transactions,
                validities,
                prices,
                quantities,
            )
            pbar.update(1)

    print("\nWriting Binaries completed.")

parse_market_data(start_date_str, end_date_str, marketdatapath, savepath, market_type, verbose=True)

Parse market data between two dates and save processed zipped CSV files.

Processes raw order book data from EPEX or NordPool markets and converts them into standardized sorted CSV files for each day in UTC time format. Handles order lifecycle events (additions, modifications, cancellations) and reconstructs order validity periods.

Parameters:

Name Type Description Default
start_date_str str

Start date in format "YYYY-MM-DD"

required
end_date_str str

End date in format "YYYY-MM-DD"

required
marketdatapath str

Path to market data folder with yearly/monthly subfolders

required
savepath str

Directory where processed CSV files will be saved

required
market_type str

"EPEX" or "NordPool"

required
verbose bool

Print progress messages. Defaults to True.

True
Source code in bitepy/data.py
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def parse_market_data(self, start_date_str: str, end_date_str: str, marketdatapath: str, 
                    savepath: str, market_type: str, verbose: bool = True):
    """
    Parse market data between two dates and save processed zipped CSV files.

    Processes raw order book data from EPEX or NordPool markets and converts them into 
    standardized sorted CSV files for each day in UTC time format. Handles order lifecycle 
    events (additions, modifications, cancellations) and reconstructs order validity periods.

    Args:
        start_date_str (str): Start date in format "YYYY-MM-DD"
        end_date_str (str): End date in format "YYYY-MM-DD"
        marketdatapath (str): Path to market data folder with yearly/monthly subfolders
        savepath (str): Directory where processed CSV files will be saved
        market_type (str): "EPEX" or "NordPool"
        verbose (bool, optional): Print progress messages. Defaults to True.
    """

    if not os.path.exists(savepath):
        os.makedirs(savepath)

    start_date = pd.Timestamp(start_date_str)
    end_date = pd.Timestamp(end_date_str)

    if start_date > end_date:
        raise ValueError("Error: Start date is after end date.")
    if market_type == "EPEX" and start_date.year < 2020:
        raise ValueError("Error: Years before 2020 are not supported.")

    dates = pd.date_range(start_date, end_date, freq="D")
    df1 = pd.DataFrame()
    df2 = pd.DataFrame()

    with tqdm(total=len(dates), desc="Loading and saving CSV data", ncols=100, disable=not verbose) as pbar:
        for dt1 in dates:
            pbar.set_description(f"Currently loading and saving date {str(dt1.date())} ... ")
            df1 = df2
            df2 = pd.DataFrame()
            dt2 = dt1 + pd.Timedelta(days=1)

            # Read current day data
            if df1.empty:
                if market_type == "EPEX":
                    if dt1.year == 2020:
                        df1 = self._read_id_table_2020(dt1, marketdatapath)
                    elif dt1.year >= 2021:
                        df1 = self._read_id_table_2021(dt1, marketdatapath)
                    else:
                        raise ValueError("Error: Year not >= 2020")
                elif market_type == "NordPool":
                    df1 = self._read_nordpool_table(dt1, marketdatapath)
                else:
                    raise ValueError(f"Unknown market_type: {market_type}")

            # Read next day data (captures orders with transaction today, delivery tomorrow)
            if dt2 <= end_date:
                if market_type == "EPEX":
                    if dt2.year == 2020:
                        df2 = self._read_id_table_2020(dt2, marketdatapath)
                    elif dt2.year >= 2021:
                        df2 = self._read_id_table_2021(dt2, marketdatapath)
                    else:
                        raise ValueError("Error: Year not >= 2020")
                elif market_type == "NordPool":
                    df2 = self._read_nordpool_table(dt2, marketdatapath)
                else:
                    raise ValueError(f"Unknown market_type: {market_type}")

            # Combine and filter by transaction date
            df = pd.concat([df1, df2])
            df = df.sort_values(by='transaction')
            df['transaction_date'] = pd.to_datetime(df['transaction']).dt.date
            grouped = df.groupby('transaction_date')

            # round price to 2 decimals and quantity to 1 decimal
            df['price'] = df['price'].round(2)
            df['quantity'] = df['quantity'].round(1)

            save_date = dt1.date()
            group = grouped.get_group(save_date)
            daily_filename = f"{savepath}orderbook_{save_date}.csv"
            compression_options = dict(method='zip', archive_name=Path(daily_filename).name)
            group.drop(columns='transaction_date').sort_values(by='transaction').fillna("").to_csv(
                f'{daily_filename}.zip', compression=compression_options)
            pbar.update(1)

    print("\nWriting CSV data completed.")