Подписывайтесь на канал @apache_superset_bi, чтобы быть в курсе последних новостей про Apache Superset
Как сгенерировать таблицу в PostgreSQL с помощью SQL
CREATE TABLE my_db_schema.my_table_for_superset ( datetime TIMESTAMP, category_1 VARCHAR(255), category_2 VARCHAR(255), product VARCHAR(255), customer VARCHAR(255), item_cnt INTEGER, item_price DOUBLE PRECISION ); -- Генерируем данные для table_1 INSERT INTO my_db_schema.my_table_for_superset (datetime, category_1, category_2, product, customer, item_cnt, item_price) SELECT NOW() - INTERVAL '1 day' * (random() * 365)::float, 'Category1_' || (random() * 5 + 1)::int, 'Category2_' || (random() * 7 + 1)::int, 'Product_' || (random() * 39 + 1)::int, 'Customer_' || (random() * 1000 + 1)::int, (random() * 10 + 1)::int, (random() * 100 + 1)::numeric(10, 2) FROM generate_series(1, 1000); SELECT * FROM my_db_schema.my_table_for_superset LIMIT 10 ;
Как сгенерировать таблицу с тестовыми данными для Apache Superset в ClickHouse
CREATE TABLE my_table_for_superset ( datetime DateTime, category_1 String, category_2 String, product String, customer String, item_cnt Int32, item_price Float64 ) ENGINE = MergeTree ORDER BY (datetime, category_1, category_2, product, customer); INSERT INTO my_table_for_superset (datetime, category_1, category_2, product, customer, item_cnt, item_price) SELECT now() - toIntervalSecond(rand() % (365 * 24 * 60 * 60)) AS datetime, 'Category' || toString(rand() % 3 + 1) AS category_1, 'SubCategory' || toString(rand() % 5 + 1) AS category_2, 'Product' || toString(rand() % 10 + 1) AS product, 'Customer' || toString(rand() % 100 + 1) AS customer, randUniform(1,10)::Int AS item_cnt, randUniform(15.5, 299.99) AS item_price FROM system.numbers LIMIT 1000; SELECT * FROM my_table_for_superset LIMIT 10;
Результат:
Вставка данных из csv (dataframe) в ClickHouse с автоматической генерацией CREATE TABLE
Ниже приведен python скрипт, с помощью которого можно загрузить данные из csv, excel, txt, parquet и т.п. источников в Dataframe (pandas) и затем этот DataFrame вставить в сгенерированную таблицу в ClickHouse.
Соответствие типов между форматами pandas и ClickHouse в примере не полное, возможно придется подкрутить.
#==================================== # Пример вставки данных в ClickHouse # Установка библиотек # pip install clickhouse_connect #==================================== import pandas as pd import numpy as np import random from datetime import datetime, timedelta import clickhouse_connect from dateutil.relativedelta import relativedelta #==================================== # Подключение к ClickHouse #==================================== # Создание клиента для подключения к ClickHouse client = clickhouse_connect.get_client(host='11.11.11.11', port='8123', user='login', password='password') #==================================== # Загрузка данных в dataframe #==================================== data_df = pd.read_csv('/file_path_location/Amazon Sale Report.csv') #==================================== # Форматирование данных в колонках #==================================== # Преобразование строки в дату data_df['Date'] = pd.to_datetime(data_df['Date'], format='%m-%d-%y') print(data_df.head()) print(data_df.dtypes) #====================================== # Создание sql запроса для CREATE TABLE #====================================== # Словарь типов данных ClickHouse clickhouse_types = { 'object': 'String', 'int64': 'Int64', 'datetime64[ns]': 'DateTime', 'float64': 'Float64', 'bool': 'Bool' } # Список столбцов для ORDER BY order_by_columns = ['Status'] # Название таблицы в ClickHouse table_name_for_ch = 'amazon_sale_report' # Генерация SQL-запроса на создание таблицы create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name_for_ch} (" for column_name, dtype in data_df.dtypes.items(): create_table_sql += f"\n `{column_name}` {clickhouse_types[str(dtype)]}," create_table_sql = create_table_sql.rstrip(',') # Удаление последней запятой create_table_sql += "\n) ENGINE = MergeTree()" create_table_sql += f"\n ORDER BY ({', '.join(order_by_columns)})" # Склеивание имен столбцов для ORDER BY print(create_table_sql) #==================================== # Заполнение пропусков данных # в Dataframe в колонках #==================================== # Проход по каждой колонке DataFrame for column in data_df.columns: # Получение типа данных текущей колонки dtype = data_df[column].dtype # Проверка типа данных и заполнение в соответствии с условиями if dtype == 'object': # строковый тип данных data_df[column].fillna('(no_info)', inplace=True) elif dtype == 'int64': # целочисленный тип данных data_df[column].fillna(0, inplace=True) elif dtype == 'float64': # вещественный тип данных data_df[column].fillna(0, inplace=True) #==================================== # Удаление и создание заново таблицы #==================================== client.command(f'DROP TABLE IF EXISTS {table_name_for_ch}') client.command(create_table_sql) #========================================= # Вставка данных из dataFrame в ClickHouse #========================================= # Sending data to ClickHouse client.insert_df(table_name_for_ch, data_df) #==================================== # Проверка количества вставленных # строк в таблицу в ClickHouse #==================================== # Check count of rows in the table result_df = client.query_df(f'SELECT count(1) FROM {table_name_for_ch}') print(f'Количество строк данных в таблице "{table_name_for_ch}"') print(result_df.dtypes) print() print(result_df)
Leave a Reply