Amazon Redshift
Amazon Redshift is a data warehousing solution from AWS. Its straightforward clustering and ingestion mechanisms make it ideal for loading large log files and then searching through them with SQL.
System architecture
- Shared-nothing MPP architecture, similar to other MPP data warehouse systems such as Netezza, Microsoft PDW, and Greenplum. Parallel processing works as follows:
- Cluster consists of a leader node and 1 or more compute nodes.
- The leader node receives queries from client applications, parses them, and develops query execution plans. It then coordinates the parallel execution of these plans across the compute nodes, aggregates the intermediate results, and returns the final results to the client applications (i.e. parallel processing).
- On a single-node cluster, the node is shared for leader and compute functionality. On a multi-node cluster, the leader node is separate from the compute nodes.
- A fully managed, petabyte-scale system.
- Queries are written in standard SQL; Redshift is based on PostgreSQL 8.0.2.
- Throughput: Within Amazon Redshift, each column of a table is stored in data blocks, with the goal of reducing I/O so that only relevant data is retrieved from disk. Amazon Redshift is able to reduce I/O through:
- Columnar data storage reduces the overall disk I/O and the amount of data required from disk to handle an analytic query. Since most BI queries require aggregation of data on each column, columnar storage is ideal when matched with the parallel processing architecture of Redshift.
- An additional benefit of columnar storage is the ability to leverage data compression. This technique is more effective in column-oriented data than it is with row-oriented storage solutions and can significantly reduce the amount of data loaded from disk during a query. Redshift supports many popular compression encodings from Byte Dictionary to LZO and will automatically apply the optimal compression to the data when loaded.
- Zone maps: Each column is divided into 1 MB data blocks; Redshift keeps the min/max values of each block in memory, identifies the blocks a query actually needs, and skips blocks that cannot contain matching data. This minimizes unnecessary I/O (see the sketch after this list).
- Direct-attached storage and large block sizes (1MB) that enable fast I/O
- Backup: S3
- Concurrency: Amazon’s documentation recommends running only 15–25 queries at a time for optimal throughput.
- In 2013, Airbnb compared Amazon Redshift to Hive for their interactive analytics. They found that queries of billions of rows were 5 times faster in Redshift and queries over millions of rows were 20 times faster.
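As a rough illustration of the zone-map point above, a range predicate on the sort key lets Redshift skip whole 1 MB blocks whose stored min/max range falls outside the filter. The table and column names below are hypothetical:

-- Hypothetical table sorted by event_time: blocks whose min/max event_time
-- lies entirely outside the filter range are skipped via zone maps.
SELECT COUNT(*)
FROM page_views
WHERE event_time BETWEEN '2014-04-01' AND '2014-04-07';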
Ingestion
- Load your compressed files (e.g. GZIP or LZO) to S3. According to the tests, the best file format is CSV and the best compression algorithm is LZO.
- Use the COPY command to load those files into Amazon Redshift in parallel. Do NOT run multiple COPY commands at the same time.
- When you load all the data from a single large file, Amazon Redshift is forced to perform a serialized load, which is much slower. Split your load data files so that they are roughly equal in size, between 1 MB and 1 GB after compression, and make the number of files a multiple of the number of slices in your cluster (see the sketch below).
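A minimal sketch of such a parallel load, assuming the split, gzipped CSV parts share the S3 prefix logs/part- and target a hypothetical logentries table:

-- One COPY statement loads every object under the prefix in parallel,
-- ideally one or more files per slice (bucket, prefix, and table are hypothetical).
COPY logentries
FROM 's3://my-log-bucket/logs/part-'
CREDENTIALS 'aws_access_key_id=<access-key>;aws_secret_access_key=<secret-key>'
CSV
GZIP;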
Schema Design
- Deleting a large amount of data is not a trivial task in Amazon Redshift. To make it easier, you can create a new table for each month or week; you can then delete a whole period’s data by executing a single DROP TABLE command (see the sketch after the sort-key example below).
- Amazon Redshift is a distributed data warehouse, so data is spread across different nodes. It is possible to change how data is distributed, and you should always strive for maximum data locality where possible. For example, data for the same event or user should be stored on the same node.
- The definition of sort keys can have a significant impact on query performance; sort keys are also used by the query optimizer to come up with optimal query plans. If you work with time-dependent data, a timestamp column should be part of the sort key, for example:
CREATE TABLE events (
    properties_time TIMESTAMP,
    event           VARCHAR(200) DISTKEY
)
DISTSTYLE KEY
SORTKEY (properties_time, event);
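As a sketch of the per-period tables mentioned above (names are hypothetical), retiring a month of data becomes a cheap metadata operation instead of a large DELETE:

-- one table per month, cloned from the base definition
CREATE TABLE events_2014_04 (LIKE events);

-- assuming events_2014_03 was created the same way a month earlier,
-- dropping it removes that month's data without a row-by-row DELETE
DROP TABLE events_2014_03;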
Choose the right distribution and sort keys
- Decide on the sort key and distribution key for each table.
- Choose the correct data distribution style to avoid data hot spots. The default is EVEN (round-robin) distribution, which may not be the best choice depending on the size of the tables being joined.
- Collocate data from joined tables as much as possible to avoid data broadcasting (see the sketch below).
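A minimal sketch of collocating two frequently joined tables, assuming hypothetical users and user_events tables that are usually joined on user_id:

-- Both tables use user_id as the distribution key, so rows that join
-- land on the same slice and no broadcast/redistribution is needed.
CREATE TABLE users (
    user_id BIGINT DISTKEY SORTKEY,
    name    VARCHAR(200)
) DISTSTYLE KEY;

CREATE TABLE user_events (
    user_id    BIGINT DISTKEY,
    event_time TIMESTAMP SORTKEY,
    event      VARCHAR(200)
) DISTSTYLE KEY;

-- this join can be executed locally on each slice
SELECT u.name, COUNT(*) AS events
FROM user_events e
JOIN users u ON u.user_id = e.user_id
GROUP BY u.name;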
Analyze table after load
In Amazon Redshift, data blocks are immutable. When rows are DELETED or UPDATED, they are simply logically deleted (flagged for deletion) but not physically removed from disk. An UPDATE results in a new block being written with the new data appended. Both of these operations leave the previous version of the row consuming disk space and being scanned whenever a query scans the table. As a result, table storage grows and performance degrades due to otherwise avoidable disk I/O during scans. A VACUUM command recovers the space from deleted rows and restores the sort order.
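As a rough sketch, assuming the cf_logentries table defined later in this post, you can check how stale a table has become via the SVV_TABLE_INFO system view and then vacuum and analyze just that table:

-- unsorted = percent of rows not in sort-key order, stats_off = staleness of planner statistics
SELECT "table", unsorted, stats_off, tbl_rows
FROM svv_table_info
WHERE "table" = 'cf_logentries';

-- reclaim space from deleted rows, restore sort order, and refresh statistics
VACUUM cf_logentries;
ANALYZE cf_logentries;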
Benchmark
In terms of price, 4x dc2.large (0.16 TB SSD each) costs 0.25 × 4 = $1/hr, which is comparable to 1x dw1.xlarge (2 TB HDD). But the overall performance of the SSD option is roughly 10x faster.
Steps to set up
Create Security Group
We need to do this because the default security group has no access to the Redshift cluster.
- Give a name for the new security group (eg. demo)
- Assign a connection type: EC2 security group or CIDR/IP
Use AWS CLI to launch a new cluster
- If a cluster is provisioned with two or more compute nodes, an additional leader node coordinates the compute nodes and handles external communication.
- Give your cluster a name and pick the region you want to create it in. Launch the Redshift cluster in the same AWS region as the S3 bucket to improve the load performance of the COPY command.
- Decide what node type and number of compute nodes you want in your new cluster.
- Once the cluster is created, you can see its JDBC connection string, which you will use to connect your JDBC client to it.
- Through your SQL client, you can create tables and load data into them using COPY.
pip install awscli

# create redshift cluster
~$ aws redshift create-cluster \
    --cluster-identifier logdata \
    --cluster-type single-node \
    --node-type dw1.xlarge \
    --master-username luke \
    --master-user-password password \
    --cluster-security-groups demo

# check status and endpoint
~$ aws redshift describe-clusters --cluster-identifier logdata

# psql client from localhost to endpoint
~$ psql -h your-endpoint.us-east-1.redshift.amazonaws.com -p 5439 -U luke -W
- If you didn’t specify the encoding in the table schema, Redshift will figure it out for you. To check which encoding it picked, you can run the following SQL:
select "column", type, encoding
from pg_table_def
where tablename = 'listing';

-- determine the better compression
analyze compression listing;
- If you get any errors during COPY, you can check them using:
SELECT * FROM stl_load_errors; |
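A slightly more targeted variant of the same check, selecting only the most useful STL_LOAD_ERRORS columns for the latest failures:

-- most recent load errors first, with the offending file, line, column, and reason
SELECT starttime, filename, line_number, colname, err_reason, raw_line
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 20;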
- http://techblog.thescore.com/2014/05/14/analyzing-s3-and-cloudfront-access-logs-with-redshift/
- https://www.pythian.com/blog/aws-redshift-cluster-sizing/
create table
CREATE TABLE s3_logentries (
    creator VARCHAR(MAX) ENCODE RUNLENGTH,
    bucket VARCHAR(255) ENCODE LZO,
    logdate VARCHAR(MAX) SORTKEY ENCODE LZO,
    logtime VARCHAR(MAX) ENCODE LZO,
    cip VARCHAR(50) ENCODE LZO,
    requestor VARCHAR(MAX) ENCODE LZO,
    requestid VARCHAR(MAX) ENCODE LZO,
    operation VARCHAR(MAX) ENCODE LZO,
    requestkey VARCHAR(MAX) ENCODE LZO,
    uri VARCHAR(MAX) DISTKEY ENCODE LZO,
    status VARCHAR(MAX) ENCODE LZO,
    errorcode VARCHAR(MAX) ENCODE LZO,
    bytessent VARCHAR(MAX) ENCODE LZO,
    objectsize VARCHAR(MAX) ENCODE LZO,
    totaltime VARCHAR(MAX) ENCODE LZO,
    turnaroundtime VARCHAR(MAX) ENCODE LZO,
    referer VARCHAR(MAX) ENCODE LZO,
    useragent VARCHAR(MAX) ENCODE LZO,
    versionid VARCHAR(10) ENCODE LZO
);

CREATE TABLE cf_logentries (
    logdate VARCHAR(MAX) SORTKEY,
    logtime VARCHAR(MAX) ENCODE LZO,
    edge VARCHAR(40) ENCODE LZO,
    bytessent INT ENCODE LZO,
    cip VARCHAR(50) ENCODE LZO,
    method VARCHAR(50) ENCODE LZO,
    host VARCHAR(MAX) ENCODE LZO,
    uri VARCHAR(MAX) DISTKEY ENCODE LZO,
    status VARCHAR(20) ENCODE LZO,
    creferrer VARCHAR(MAX) ENCODE LZO,
    useragent VARCHAR(MAX) ENCODE LZO,
    cs_uri_query VARCHAR(MAX) ENCODE LZO,
    cookie VARCHAR(MAX) ENCODE LZO,
    x_edge_result_type VARCHAR(MAX) ENCODE LZO,
    x_edge_request_id VARCHAR(MAX) ENCODE LZO,
    x_host_header VARCHAR(MAX) ENCODE LZO,
    protocol VARCHAR(10) ENCODE LZO,
    cs_bytes INT ENCODE LZO,
    time_taken VARCHAR(MAX) ENCODE LZO
);
load some data
- We’re loading standard, tab-delimited, gzip-compressed log files and ignoring the header lines. Additionally, we’re using the FILLRECORD parameter, which causes the import process to pad out missing columns at the end of a line with NULL values. This is pretty useful because the CloudFront and S3 log formats change periodically and new columns get added. Finally, we’re telling the command to abort the copy if 200 or more errors are found.
COPY cf_logentries
FROM 's3://cloudfront-logs/E1DHT7QI9H0ZOB.2014-04-'
CREDENTIALS 'aws_access_key_id=;aws_secret_access_key='
DELIMITER '\t'
MAXERROR 200
FILLRECORD
IGNOREHEADER 2
GZIP;
vacuum and analyze
Whenever you add, delete, or modify a significant number of rows, you should run a VACUUM command and then an ANALYZE command. A vacuum recovers the space from deleted rows and restores the sort order. The ANALYZE command updates the statistics metadata, which enables the query optimizer to generate more accurate query plans.
vacuum;
analyze;
running queries
-- top URIs by hits
SELECT uri, count(*) AS hits
FROM cf_logentries
GROUP BY uri
ORDER BY hits DESC;

-- group by day
SELECT COUNT(*), LEFT(logdate, 12) AS day
FROM cf_logentries
GROUP BY day
ORDER BY day;

-- cache hit ratio
WITH cache_hits AS (
    SELECT uri, cs_uri_query, COUNT(*) AS hits
    FROM cf_logentries
    WHERE x_edge_result_type IN ('Hit', 'RefreshHit')
    GROUP BY uri, cs_uri_query
),
top_uris AS (
    SELECT uri, cs_uri_query, count(*) AS total
    FROM cf_logentries
    GROUP BY uri, cs_uri_query
    ORDER BY total DESC
    LIMIT 1000
)
SELECT top_uris.uri AS uri,
       LEFT(top_uris.cs_uri_query, 20) AS qs,
       top_uris.total AS requests,
       cache_hits.hits AS cachehits,
       LEFT(((cachehits::float / requests) * 100), 5) AS hitrate
FROM top_uris
JOIN cache_hits
  ON top_uris.uri = cache_hits.uri
 AND top_uris.cs_uri_query = cache_hits.cs_uri_query
ORDER BY top_uris.total DESC;
References
Industry
- http://www.slideshare.net/Hapyrus/amazon-redshift-ssd-queries-for-tbs-of
- https://amplitude.com/blog/2015/03/27/why-we-chose-redshift/
- https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-techniques-for-amazon-redshift/
- https://blog.lodr.io/going-serverless-5x-faster-100x-cheaper-2b7db8c37405#.lfi4te651
Ruby script to setup cluster and load data to it
- https://blog.codeship.com/long-term-log-analysis-with-aws-redshift/
- https://github.com/flomotlik/redshift-logging/blob/master/setup.rb
Spark with Redshift as datasource
- https://github.com/databricks/spark-redshift
Tools
- https://github.com/databricks/spark-redshift
- https://www.lodr.io/
- https://github.com/awslabs/aws-lambda-redshift-loader
- https://vimeo.com/144855636
- https://github.com/embulk/embulk