// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <iomanip>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "boost/date_time/gregorian/gregorian.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"

#include <arrow/io/file.h>
#include <arrow/util/logging.h>
#include <parquet/api/reader.h>
#include <parquet/api/writer.h>

using parquet::ConvertedType;
using parquet::Repetition;
using parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::PrimitiveNode;

/*
 * This example describes writing Parquet files in C++ from CSV input and
 * serves as a reference to the API.
 * Every CSV field is written as an optional BYTE_ARRAY column.
 * This example uses the RowGroupWriter API, which supports flushing RowGroups
 * once they reach a certain size.
 **/

/* Parquet is a structured columnar file format.
 * Parquet File = "Parquet data" + "Parquet Metadata"
 * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
 * columnar layout.
 * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their
 * Columns.
 * The "file schema" is a tree where each node is either a primitive type (leaf nodes) or a
 * complex (nested) type (internal nodes).
 * For specific details, please refer to the format here:
 * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
 **/

#include <sys/stat.h>
#include <unistd.h>

using namespace boost;
using namespace std;

//constexpr int NUM_ROWS = 10000000;
constexpr int NUM_ROWS = 10000;

//constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024;  // 16 MB
constexpr int64_t ROW_GROUP_SIZE = 1024 * 1024;  // 1 MB

const char PARQUET_FILENAME[] = "csv_converted.parquet";

// Build a schema in which every CSV column is an optional BYTE_ARRAY.
static std::shared_ptr<GroupNode> column_string_2(uint32_t num_of_columns) {
  parquet::schema::NodeVector fields;

  for (uint32_t i = 0; i < num_of_columns; i++) {
    std::string column_name = "column_" + std::to_string(i);
    fields.push_back(PrimitiveNode::Make(column_name, Repetition::OPTIONAL,
                                         Type::BYTE_ARRAY, ConvertedType::NONE));
  }

  return std::static_pointer_cast<GroupNode>(
      GroupNode::Make("schema", Repetition::REQUIRED, fields));
}

// A simple CSV tokenizer: returns one field per call, splitting on ',' and '\n'.
class tokenize {
 public:
  const char* s;
  std::string input;
  const char* p;
  bool last_token;

  tokenize(std::string& in) : s(0), input(in), p(input.c_str()), last_token(false) {}

  void get_token(std::string& token) {
    if (!*p) {
      token = "";
      last_token = true;
      return;
    }

    s = p;
    while (*p && *p != ',' && *p != '\n') p++;
    token = std::string(s, p);
    if (*p) p++;  // skip the delimiter; do not step past the terminating NUL
  }

  bool is_last() { return last_token == true; }
};

void generate_rand_columns_csv_datetime(std::string& out, size_t size) {
  std::stringstream ss;
  auto year = []() { return rand() % 100 + 1900; };
  auto month = []() { return 1 + rand() % 12; };
  auto day = []() { return 1 + rand() % 28; };
  auto hours = []() { return rand() % 24; };
  auto minutes = []() { return rand() % 60; };
  auto seconds = []() { return rand() % 60; };

  // Emit one ISO-8601-style timestamp per row.
  for (auto i = 0U; i < size; ++i) {
    ss << year() << "-" << std::setw(2) << std::setfill('0') << month() << "-"
       << std::setw(2) << std::setfill('0') << day() << "T"
       << std::setw(2) << std::setfill('0') << hours() << ":"
       << std::setw(2) << std::setfill('0') << minutes() << ":"
       << std::setw(2) << std::setfill('0') << seconds() << "Z" << "\n";
  }
  out = ss.str();
}

// Minimal generators for the call sites in csv_object_to_parquet() below; each
// produces `size` rows of comma-separated values terminated by '\n'. These are
// simple placeholder implementations that satisfy those call sites.
void generate_rand_columns_csv(std::string& out, size_t size) {
  std::stringstream ss;
  auto r = []() { return rand() % 1000; };

  for (auto i = 0U; i < size; ++i) {
    for (int c = 0; c < 10; ++c) {
      ss << r() << (c < 9 ? "," : "\n");
    }
  }
  out = ss.str();
}

void generate_fix_columns_csv(std::string& out, size_t size) {
  std::stringstream ss;
  for (auto i = 0U; i < size; ++i) {
    ss << "1,2,3,4,5,6,7,8,9,10" << "\n";
  }
  out = ss.str();
}
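// A minimal read-back sketch (not part of the original flow; dump_parquet and
// its printing format are illustrative assumptions). It walks the file with the
// low-level ByteArrayReader, mirroring the ByteArrayWriter used below: values
// are printed per column within each RowGroup, and a definition level of 0
// (values_read == 0) marks a null slot.
void dump_parquet(const char* path) {
  std::unique_ptr<parquet::ParquetFileReader> reader =
      parquet::ParquetFileReader::OpenFile(path);
  std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();

  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
    std::shared_ptr<parquet::RowGroupReader> rg_reader = reader->RowGroup(rg);

    for (int col_id = 0; col_id < metadata->num_columns(); ++col_id) {
      std::shared_ptr<parquet::ColumnReader> col_reader = rg_reader->Column(col_id);
      parquet::ByteArrayReader* ba_reader =
          static_cast<parquet::ByteArrayReader*>(col_reader.get());

      while (ba_reader->HasNext()) {
        parquet::ByteArray value;
        int16_t definition_level;
        int64_t values_read = 0;
        ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
        if (values_read == 0) {
          std::cout << "<null>,";  // definition level 0: the slot is null
        } else {
          std::cout << std::string(reinterpret_cast<const char*>(value.ptr), value.len)
                    << ",";
        }
      }
      std::cout << std::endl;
    }
  }
}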
static int csv_to_parquet(std::string& csv_object) {
  // Column count from the first line; row count from the number of newlines.
  auto csv_num_of_columns =
      std::count(csv_object.begin(), csv_object.begin() + csv_object.find('\n'), ',') + 1;
  auto csv_num_of_rows = std::count(csv_object.begin(), csv_object.end(), '\n');

  tokenize csv_tokens(csv_object);

  try {
    // Create a local file output stream instance
    using FileClass = ::arrow::io::FileOutputStream;
    std::shared_ptr<FileClass> out_file;
    PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));

    // Setup the parquet schema
    std::shared_ptr<GroupNode> schema = column_string_2(csv_num_of_columns);

    // Add writer properties
    parquet::WriterProperties::Builder builder;
    // builder.compression(parquet::Compression::SNAPPY);
    std::shared_ptr<parquet::WriterProperties> props = builder.build();

    // Create a ParquetFileWriter instance
    std::shared_ptr<parquet::ParquetFileWriter> file_writer =
        parquet::ParquetFileWriter::Open(out_file, schema, props);

    // Append a BufferedRowGroup to keep the RowGroup open until a certain size
    parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();

    int num_columns = file_writer->num_columns();
    std::vector<int64_t> buffered_values_estimate(num_columns, 0);

    for (int i = 0; !csv_tokens.is_last() && i < csv_num_of_rows; i++) {
      // Get the estimated size of the values that are not written to a page yet
      int64_t estimated_bytes = 0;
      for (int n = 0; n < num_columns; n++) {
        estimated_bytes += buffered_values_estimate[n];
      }

      // We need to consider the compressed pages
      // as well as the values that are not compressed yet
      if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
           estimated_bytes) > ROW_GROUP_SIZE) {
        rg_writer->Close();
        std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
        rg_writer = file_writer->AppendBufferedRowGroup();
      }

      int col_id;
      for (col_id = 0; col_id < num_columns && !csv_tokens.is_last(); col_id++) {
        // Write the byte-array column
        parquet::ByteArrayWriter* ba_writer =
            static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
        parquet::ByteArray ba_value;
        std::string token;
        csv_tokens.get_token(token);
        if (token.size() == 0) {  // null column
          int16_t definition_level = 0;
          ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
        } else {
          int16_t definition_level = 1;
          ba_value.ptr = reinterpret_cast<const uint8_t*>(token.data());
          ba_value.len = static_cast<uint32_t>(token.size());
          ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
        }
        buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
      }  // end-for columns

      if (csv_tokens.is_last() && col_id < num_columns) {
        // The CSV ended mid-row; pad the remaining columns with nulls.
        for (; col_id < num_columns; col_id++) {
          parquet::ByteArrayWriter* ba_writer =
              static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
          int16_t definition_level = 0;
          ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
          buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
        }
      }
    }  // end-for rows

    // Close the RowGroupWriter
    rg_writer->Close();
    // Close the ParquetFileWriter
    file_writer->Close();

    // Write the bytes to file
    DCHECK(out_file->Close().ok());
  } catch (const std::exception& e) {
    std::cerr << "Parquet write error: " << e.what() << std::endl;
    return -1;
  }

  return 0;
}

static int csv_file_to_parquet(int argc, char** argv) {
  // Open the CSV file, load it into a std::string, and convert it to parquet
  // (saved to the filesystem).
  if (argc < 2) exit(-1);

  struct stat l_buf;
  int st = lstat(argv[1], &l_buf);
  if (st < 0) exit(-1);

  printf("input csv file size = %ld\n", (long)l_buf.st_size);

  char* buffer = new char[l_buf.st_size];
  FILE* fp = fopen(argv[1], "r");
  if (!fp) exit(-1);

  size_t read_sz = fread(buffer, 1, l_buf.st_size, fp);
  fclose(fp);

  std::string csv_obj;
  csv_obj.append(buffer, read_sz);
  delete[] buffer;

  csv_to_parquet(csv_obj);

  return 0;
}

int csv_object_to_parquet(int argc, char** argv) {
  srand(time(0));

  std::string csv_obj;
  std::string expected_result;
  generate_rand_columns_csv(csv_obj, 128);
  //generate_rand_csv_datetime_to_string(csv_obj, expected_result, 10000);
  //generate_rand_columns_csv_with_null(csv_obj, 10000);
  //generate_columns_csv(csv_obj, 128);
  //generate_rand_columns_csv_datetime(csv_obj, 10000);
  generate_fix_columns_csv(csv_obj, 128);

  FILE* fp = fopen("10k.csv", "w");
  if (fp) {
    fwrite(csv_obj.data(), csv_obj.size(), 1, fp);
    fclose(fp);
  } else {
    exit(-1);
  }

  // Override the generated CSV with a tiny fixed object for the conversion.
  //csv_obj = "1,2,3,4,5,6,7,8,9,10\n10,20,30,40,50,60,70,80,90,100\n";
  csv_obj = "1,2,3,4\n";

  csv_to_parquet(csv_obj);

  return 0;
}

int main(int argc, char** argv) {
  return csv_file_to_parquet(argc, argv);
}
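// Build/run sketch (library names and flags are assumptions; adjust to your
// Arrow/Parquet installation):
//   g++ -std=c++17 csv_to_parquet.cc -larrow -lparquet -o csv_to_parquet
//   ./csv_to_parquet input.csv    # writes csv_converted.parquet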