Подписывайтесь на канал @apache_superset_bi, чтобы быть в курсе последних новостей про Apache Superset
Как сгенерировать таблицу в PostgreSQL с помощью SQL
CREATE TABLE my_db_schema.my_table_for_superset (
datetime TIMESTAMP,
category_1 VARCHAR(255),
category_2 VARCHAR(255),
product VARCHAR(255),
customer VARCHAR(255),
item_cnt INTEGER,
item_price DOUBLE PRECISION
);
-- Генерируем данные для table_1
INSERT INTO my_db_schema.my_table_for_superset (datetime, category_1, category_2, product, customer, item_cnt, item_price)
SELECT
NOW() - INTERVAL '1 day' * (random() * 365)::float,
'Category1_' || (random() * 5 + 1)::int,
'Category2_' || (random() * 7 + 1)::int,
'Product_' || (random() * 39 + 1)::int,
'Customer_' || (random() * 1000 + 1)::int,
(random() * 10 + 1)::int,
(random() * 100 + 1)::numeric(10, 2)
FROM generate_series(1, 1000);
SELECT *
FROM my_db_schema.my_table_for_superset
LIMIT 10
;
Как сгенерировать таблицу с тестовыми данными для Apache Superset в ClickHouse
CREATE TABLE my_table_for_superset (
datetime DateTime,
category_1 String,
category_2 String,
product String,
customer String,
item_cnt Int32,
item_price Float64
) ENGINE = MergeTree
ORDER BY (datetime, category_1, category_2, product, customer);
INSERT INTO my_table_for_superset (datetime, category_1, category_2, product, customer, item_cnt, item_price)
SELECT
now() - toIntervalSecond(rand() % (365 * 24 * 60 * 60)) AS datetime,
'Category' || toString(rand() % 3 + 1) AS category_1,
'SubCategory' || toString(rand() % 5 + 1) AS category_2,
'Product' || toString(rand() % 10 + 1) AS product,
'Customer' || toString(rand() % 100 + 1) AS customer,
randUniform(1,10)::Int AS item_cnt,
randUniform(15.5, 299.99) AS item_price
FROM system.numbers LIMIT 1000;
SELECT *
FROM my_table_for_superset
LIMIT 10;
Результат:
Вставка данных из csv (dataframe) в ClickHouse с автоматической генерацией CREATE TABLE
Ниже приведен python скрипт, с помощью которого можно загрузить данные из csv, excel, txt, parquet и т.п. источников в Dataframe (pandas) и затем этот DataFrame вставить в сгенерированную таблицу в ClickHouse.
Соответствие типов между форматами pandas и ClickHouse в примере не полное, возможно придется подкрутить.
#====================================
# Пример вставки данных в ClickHouse
# Установка библиотек
# pip install clickhouse_connect
#====================================
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
import clickhouse_connect
from dateutil.relativedelta import relativedelta
#====================================
# Подключение к ClickHouse
#====================================
# Создание клиента для подключения к ClickHouse
client = clickhouse_connect.get_client(host='11.11.11.11', port='8123', user='login', password='password')
#====================================
# Загрузка данных в dataframe
#====================================
data_df = pd.read_csv('/file_path_location/Amazon Sale Report.csv')
#====================================
# Форматирование данных в колонках
#====================================
# Преобразование строки в дату
data_df['Date'] = pd.to_datetime(data_df['Date'], format='%m-%d-%y')
print(data_df.head())
print(data_df.dtypes)
#======================================
# Создание sql запроса для CREATE TABLE
#======================================
# Словарь типов данных ClickHouse
clickhouse_types = {
'object': 'String',
'int64': 'Int64',
'datetime64[ns]': 'DateTime',
'float64': 'Float64',
'bool': 'Bool'
}
# Список столбцов для ORDER BY
order_by_columns = ['Status']
# Название таблицы в ClickHouse
table_name_for_ch = 'amazon_sale_report'
# Генерация SQL-запроса на создание таблицы
create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name_for_ch} ("
for column_name, dtype in data_df.dtypes.items():
create_table_sql += f"\n `{column_name}` {clickhouse_types[str(dtype)]},"
create_table_sql = create_table_sql.rstrip(',') # Удаление последней запятой
create_table_sql += "\n) ENGINE = MergeTree()"
create_table_sql += f"\n ORDER BY ({', '.join(order_by_columns)})" # Склеивание имен столбцов для ORDER BY
print(create_table_sql)
#====================================
# Заполнение пропусков данных
# в Dataframe в колонках
#====================================
# Проход по каждой колонке DataFrame
for column in data_df.columns:
# Получение типа данных текущей колонки
dtype = data_df[column].dtype
# Проверка типа данных и заполнение в соответствии с условиями
if dtype == 'object': # строковый тип данных
data_df[column].fillna('(no_info)', inplace=True)
elif dtype == 'int64': # целочисленный тип данных
data_df[column].fillna(0, inplace=True)
elif dtype == 'float64': # вещественный тип данных
data_df[column].fillna(0, inplace=True)
#====================================
# Удаление и создание заново таблицы
#====================================
client.command(f'DROP TABLE IF EXISTS {table_name_for_ch}')
client.command(create_table_sql)
#=========================================
# Вставка данных из dataFrame в ClickHouse
#=========================================
# Sending data to ClickHouse
client.insert_df(table_name_for_ch, data_df)
#====================================
# Проверка количества вставленных
# строк в таблицу в ClickHouse
#====================================
# Check count of rows in the table
result_df = client.query_df(f'SELECT count(1) FROM {table_name_for_ch}')
print(f'Количество строк данных в таблице "{table_name_for_ch}"')
print(result_df.dtypes)
print()
print(result_df)






Leave a Reply