Contents

Workshop - 1: 👷 Batch Processing with Amazon EMR 📦


👷 Set up and build a large data processing system with Amazon EMR using Apache Spark and some other services such as Amazon S3, VPC, EC2 on 📦 Amazon Web Services (AWS)

1. 🌍 Introduction

Since launching in 2006, Amazon Web Services has been providing world-leading cloud technologies that help any organization and any individual build solutions to transform industries, communities, and lives for the better.

As part of Amazon, we strive to be Earth’s most customer-centric company. We work backwards from our customers’ problems to provide them with cloud infrastructure that meets their needs, so they can reinvent continuously and push through barriers of what people thought was possible.

Whether they are entrepreneurs launching new businesses, established companies reinventing themselves, non-profits working to advance their missions, or governments and cities seeking to serve their citizens more effectively—our customers trust AWS with their livelihoods, their goals, their ideas, and their data.

1.1. Amazon VPC

Amazon Virtual Private Cloud (Amazon VPC) is a Virtual Private Cloud—a customized virtual network hosted within the AWS Cloud and isolated from the external world. This concept resembles the design and implementation of a distinct standalone network environment in an on-premise data center, a practice still widely employed across many countries.

Within this dedicated VPC, users possess comprehensive control over their virtual network environment. This control encompasses the initiation and operation of AWS resources, the ability to choose IP address ranges, establish network subnets, and configure routing tables and network gateways. Secure and convenient resource and application access within the VPC is facilitated through both IPv4 and IPv6 protocols.

The term “Region” refers to vast clusters of AWS data centers situated within specific territories. Multiple VPCs can be established within a single region, with each VPC differentiated by its unique IP address space range. The IPv4 address range is defined by selecting a Classless Inter-Domain Routing (CIDR) notation, such as 10.0.0.0/16. Once created, the Amazon VPC address range remains immutable. These ranges can span from as extensive as /16 (equivalent to 65536 available addresses) to as limited as /28 (allowing for 16 available addresses). Crucially, these ranges must not overlap with any other connected networks.

The Amazon VPC service was introduced subsequent to the launch of Amazon EC2. Consequently, AWS provided two distinct networking platforms for a period: EC2-Classic and EC2-VPC. EC2-Classic established a single flat network where all Amazon EC2 instances operated, enabling shared connectivity among AWS clients. However, as of December 2013, AWS exclusively supports EC2-VPC. Each region includes a default VPC along with a default subnet featuring a CIDR block of 172.31.0.0/16.

1.1.1. Subnets

A subnet is a segment of the IP address range that you use when provisioning your Amazon VPC. It directly provides the active network range to the AWS resources that may run within it, such as Amazon EC2 and Amazon RDS (Amazon Relational Database Service). Subnets are identified through CIDR blocks (e.g., 10.0.1.0/24 and 192.168.0.0/24), and the subnet’s CIDRs must be within the VPC’s CIDR. The smallest subnet that can be created is /28 (16 IP addresses). AWS reserves the first 4 IP addresses and the last 1 IP address of each subnet for intranet networking purposes. For example, a /28 subnet has 16 available IP addresses, but 5 reserved IPs are used for AWS, leaving 11 usable IP addresses for resources operating within this subnet.

An Availability Zone (AZ) is a single or multi-data center located within a Region and identified based on geographical location. Within an AZ, there can be one or more subnets. However, a subnet can only reside in a single AZ and cannot extend to other AZs.

Subnets are categorized as:

  • Public subnet: This subnet has a route table (discussed later) that directs traffic within the subnet to the VPC’s Internet Gateway (IGW) (discussed later).
  • Private subnet: The opposite of a Public subnet, it lacks a route table directing traffic to the VPC’s IGW.
  • VPN-only subnet: This subnet has a route table that directs traffic to Amazon VPC’s Virtual Private Gateway (VPG) (discussed later).

Regardless of the subnet type, the subnet’s internal IP address range is always private. This means that it is not possible to directly connect to addresses within this range from outside the Internet.

1.1.2. Route Table

The Route Table, also referred to as the routing table, is responsible for providing routing instructions within a network and is associated with specific subnets.

For instance, in the scenario where a Virtual Private Cloud (VPC) is established with the network layer 10.10.0.0/16, along with two subnets, 10.10.1.0/24 and 10.10.2.0/24, each default subnet will be allocated a default route table.

Inside the route table, there will exist a route entry with the following details:

  • Destination: 10.10.0.0/16
  • Target: local

This particular route entry signifies that resources created within the same VPC can communicate with each other.

1.1.3.Internet Gateway

The Internet Gateway (IGW) is a crucial component of Amazon VPC that facilitates communication between resources within the VPC, specifically EC2 instances, and the Internet. The IGW exhibits robust horizontal scalability, along with high levels of redundancy and availability. It operates as a designated target within the Amazon VPC’s routing table, playing a vital role in directing traffic from EC2 instances to the external Internet. This process involves translating the network address of the EC2 instance into its corresponding Public IP address.

More precisely, the EC2 instances located within the VPC are only aware of their assigned Private IP addresses. However, when there is a need to transmit traffic from these EC2 instances to the Internet, the IGW intervenes by translating the originating Private IP address into the Public IP address (or Elastic IP addresses, as discussed later) assigned to the respective EC2 instance. This translation is upheld through a one-to-one mapping, which persists until the Public IP address is released.

Conversely, when the EC2 instances receive incoming traffic from the Internet, the IGW undertakes the task of translating the target address (Public IP address) into the corresponding Private IP address of the EC2 instance. Subsequently, the IGW forwards this traffic into the Amazon VPC.

1.1.4. Gateway Endpoint

Gateway VPC endpoints provide reliable connectivity to Amazon S3 and DynamoDB without requiring an internet gateway or a NAT device for your VPC. Gateway endpoints do not use AWS PrivateLink, unlike other types of VPC endpoints.

Amazon S3 and DynamoDB support both gateway endpoints and interface endpoints. For a comparison of the options, see the following:

  • Types of VPC endpoints for Amazon S3
  • Types of VPC endpoints for Amazon DynamoDB

Pricing: There is no additional charge for using gateway endpoints.

You can access Amazon S3 and DynamoDB through their public service endpoints or through gateway endpoints. This overview compares these methods.

Access through an internet gateway

The following diagram shows how instances access Amazon S3 and DynamoDB through their public service endpoints. Traffic to Amazon S3 or DynamoDB from an instance in a public subnet is routed to the internet gateway for the VPC and then to the service. Instances in a private subnet can’t send traffic to Amazon S3 or DynamoDB, because by definition private subnets do not have routes to an internet gateway. To enable instances in the private subnet to send traffic to Amazon S3 or DynamoDB, you would add a NAT device to the public subnet and route traffic in the private subnet to the NAT device. While traffic to Amazon S3 or DynamoDB traverses the internet gateway, it does not leave the AWS network.

Access through a gateway endpoint

The following diagram shows how instances access Amazon S3 and DynamoDB through a gateway endpoint. Traffic from your VPC to Amazon S3 or DynamoDB is routed to the gateway endpoint. Each subnet route table must have a route that sends traffic destined for the service to the gateway endpoint using the prefix list for the service. For more information, see AWS-managed prefix lists in the Amazon VPC User Guide.

Routing

When you create a gateway endpoint, you select the VPC route tables for the subnets that you enable. The following route is automatically added to each route table that you select. The destination is a prefix list for the service owned by AWS and the target is the gateway endpoint.

DestinationTarget
prefix_list_idgateway_endpoint_id

1.1.5. Security Group

  • Basic Features of Security Group

    • Allow Rules Only: Only Allow rules can be added; Deny rules cannot be added.
    • Separate Rules for Traffic: Separate rules can be specified for outgoing and incoming traffic.
    • Initial Inbound Rules: A newly created Security group starts with no Inbound rules. Initially, the instance won’t allow any traffic in, requiring the addition of an Inbound rule to enable access.
    • Default Outbound Rule: By default, the Security group includes an Outbound rule that permits all traffic to leave the instance. This rule can be modified or replaced with specific Outbound rules to control outgoing traffic originating from the instance. If there’s no Outbound rule, no traffic is allowed to exit the instance.
    • Stateful Service: Security groups are stateful services, meaning that if incoming traffic is allowed, outgoing traffic is automatically permitted, and vice versa, regardless of the Outbound rule.
    • Instance Communication: Instances can communicate only if they are associated with a Security group that permits connections, or if a Security group associated with the instance contains a rule allowing traffic. The default Security group has default rules allowing all traffic.
    • Association with Network Interfaces: Security groups are associated with network interfaces. After initialization, you can change the Security group assigned to an instance, which will also update the Security group for the corresponding primary network interface.
  • Security Group Rule

    • A Security group rule is created to grant access to traffic entering or leaving an instance. This access can apply to a specific CIDR or to a Security group in the same VPC, or even to a Security group in another VPC connected by peering.
  • Components of Security Group Rule

    • Inbound Rules: These include the source of the traffic and the destination port or port range. The source can be another security group, an IPv4 or IPv6 CIDR range, or an IPv4/IPv6 address.
    • Outbound Rules: These include the destination of the traffic and the destination port or port range. The destination can be another security group, an IPv4 or IPv6 CIDR range, an IPv4/IPv6 address, or a service identified by a prefix (e.g. igw_xxx) in the prefix ID list (where services are identified by the prefix ID - the name and ID of the available service in the region).
    • Standard Protocols: Each protocol has a standard protocol number associated with it. For instance, SSH is associated with port number 22.

1.2. Amazon S3

Amazon Simple Storage Service (Amazon S3) is an object storage service that provides on-demand scalability, ensuring high levels of data availability, security, and performance. best.

S3 is built to meet the needs of customers of all sizes and industries who can use this service to store and protect any amount of data.

S3 can be used for many use cases such as dataware house, websites, mobile apps, backup and restore, storage, enterprise applications, IoT devices, and data analytics great. In addition, Amazon S3 provides easy-to-use management features, so you can organize your data and configure access controls to meet the specific requirements of your business, organization, and requirements. compliance requirements.

Amazon S3 is designed for 99.999999999% (11 9’s) endurance and stores the data of millions of applications for companies worldwide.

1.3. Amazon EMR

1.4. Amazon EC2

  • Amazon EC2 functions similarly to a conventional physical or virtual server, offering rapid initialization, robust resource scalability, and versatile flexibility.
  • Virtual server: Divides a physical server into multiple virtual servers to optimize resource utilization.
  • Amazon EC2 supports a variety of workloads, including web hosting, applications, databases, authentication services, and other tasks typically handled by a standard server.

Amazon Elastic Compute Cloud (Amazon EC2) provides on-demand, scalable computing capacity within the Amazon Web Services (AWS) Cloud. Utilizing Amazon EC2 reduces hardware expenses, enabling faster application development and deployment. It empowers you to launch as many or as few virtual servers as needed, configure security and networking settings, and manage storage resources. Capacity can be increased (scale up) to manage compute-intensive tasks such as monthly or yearly processes or spikes in website traffic. Conversely, when usage subsides, capacity can be reduced (scale down).

The following diagram depicts a fundamental architecture of an Amazon EC2 instance deployed within an Amazon Virtual Private Cloud (VPC) within a specific Availability Zone in the Region. The EC2 instance’s security is governed by a security group, acting as a virtual firewall controlling inbound and outbound traffic. The instance employs a key pair—consisting of a private key stored locally and a public key stored on the instance—to verify the user’s identity. In this setup, the instance is supported by an Amazon EBS volume.

Configuration of Amazon EC2 is obligatory, with configuration determined by the chosen EC2 instance types.

Instance type influences the following attributes:

  • CPU (Intel / AMD / ARM (Graviton 1/2/3) / GPU)
  • Memory
  • Network
  • Storage

AMI / Backup / Key Pair:

  • Employing AMI (Amazon Machine Image) enables the simultaneous provisioning of one or more EC2 Instances.
  • AMI options encompass those from AWS, AWS Marketplace, and custom AMIs created from existing EC2 Instances.
  • AMI includes root OS volumes; usage rights specify the associated AWS account, and mapping EBS volumes are assigned to EC2 Instances.
  • EC2 instances can be backed up via the creation of snapshots.
  • Key pairs (public key and private key) are employed to encrypt login information for EC2 Instances.

Amazon EC2 offers the following prominent features:

  • Virtual servers.
  • Preconfigured templates for your instances containing necessary components like the operating system and additional software.
  • Various configurations including CPU, memory, storage, networking capacity, and graphics hardware for your instances.
  • Secure login credentials for your instances. AWS retains the public key while you keep the private key in a secure location.
  • Storage volumes for temporary data that gets deleted upon instance stoppage, hibernation, or termination.
  • Persistent storage volumes for your data utilizing Amazon Elastic Block Store (Amazon EBS).
  • Multiple physical locations accommodating your resources such as instances and Amazon EBS volumes.
  • A virtual firewall permitting you to define protocols, ports, source IP ranges that can access your instances, and destination IP ranges your instances can connect to.
  • Static IPv4 addresses for dynamic cloud computing.
  • Metadata that you can generate and assign to your Amazon EC2 resources.
  • Virtual networks that you can establish, offering logical isolation from the wider AWS Cloud. Optionally, these virtual networks can be linked to your own network.

2. 📊 Present the problem

Collecting and processing huge amounts of data is always an issue that businesses are most concerned about because of the value it brings.

  • Rapid Data Growth: Businesses today are generating and collecting vast amounts of data from diverse sources such as customer interactions, social media, sensors, and transactional systems.
  • Big Data Opportunities and Challenges: This massive amount of data, known as “big data,” presents significant opportunities for insights but also poses challenges in handling and processing effectively.
  • Data-Driven Decision Making: Companies must process and analyze big data to make informed, data-driven decisions that improve strategy and performance.
  • Operational Optimization: Efficient data processing allows businesses to streamline operations, improve efficiency, and reduce costs by identifying patterns and trends.
  • Personalization and Customer Insights: Big data helps businesses personalize customer experiences, better understand customer needs, and improve satisfaction.
  • Future Trend Prediction: Analyzing large data sets can provide predictive insights that allow businesses to anticipate future trends and make proactive decisions.
  • Technological Investments: To handle the complexity and volume of big data, companies are investing in advanced technologies like cloud computing, machine learning, and distributed data processing systems.
  • Competitive Necessity: In the digital age, the ability to harness big data is essential for businesses to stay competitive and thrive in the market.

Therefore, designing, building and managing big data processing systems is extremely important today for every business.

3. 🔦 Architecture

./AWS-DE-Diagram.drawio.png
Batch Processing with Amazon EMR - Diagram
  • 📦 Technology and Services:
    • S3
    • EMR
    • EC2
    • CloudFormation
    • Apache Spark
    • VPC
    • IAM

4. 📑 Preparation

4.1. Data Source

This is a transnational data set which contains all the transactions occurring between 01/12/2010 and 09/12/2011 for a UK-based and registered non-store online retail.The company mainly sells unique all-occasion gifts. Many customers of the company are wholesalers.

Download here: Kaggle - Online Retail Dataset

4.2. PySpark Script

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import argparse
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window



def transform_calc_data(data_source: str, output_uri: str) -> None:
    """
    Processes sample food establishment inspection data and queries the data to find 
    the top 10 establishments with the most Red violations from 2006 to 2020.

    :param data_source: The URI of your food establishment data CSV, 
    such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'.
    :param output_uri: The URI where output is written, such as 
    's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'.
    """
    
    with SparkSession.builder.appName("emr-cluster-{}".format(
        datetime.today())).getOrCreate() as spark:
                                 
        # Load the Online Retail CSV data
        if data_source is not None:
            df:DataFrame = spark.read.csv(data_source, header=True, inferSchema=True)
        # Log into EMR stdout
        print(f"Dataset have shape: {(df.count(), df.columns)}")
        
        # Rename Columns
        col_renames = {
            'InvoiceNo': 'OrderID', 
            'StockCode': 'ProductID', 
            'InvoiceDate': 'OrderDate'
        }
        for old_name, new_name in col_renames.items():
            df = df.withColumnRenamed(old_name, new_name)
        
        # Remove spaces
        df = df.withColumn("OrderDate", F.trim(F.col("OrderDate"))) 
            
        # Change data type for column OrderDate
        DATE_FORMAT = ["M/d/yyyy H:mm", "M/d/yyyy H:mm", "M/d/yyyy H:mm"]
        df = df.withColumn(
            "OrderDate",
            F.coalesce(
                F.to_timestamp(F.col("OrderDate"), DATE_FORMAT[0]),
                F.to_timestamp(F.col("OrderDate"), DATE_FORMAT[1]),
                F.to_timestamp(F.col("OrderDate"), DATE_FORMAT[2])
            )
        )
        # Feature Engineering
        TIME_FORMAT = "HH:mm"
        df = df.withColumn("Day", F.dayofmonth("OrderDate"))
        df = df.withColumn("Month", F.month("OrderDate"))
        df = df.withColumn("Year", F.year("OrderDate"))
        df = df.withColumn("HourMinute", F.date_format("OrderDate", TIME_FORMAT))
            
        # Find Start value for CustomerID
        max_customer_id = df.agg(F.max("CustomerID")).collect()[0][0]
        
        # Get a list of OrderIDs with a missing (null) CustomerID value
        order_ids_with_null_customer = df.filter(F.col("CustomerID").isNull()).select("OrderID").distinct()
        
        # Create a sequence number column for OrderIDs with null CustomerID value
        window_spec = Window.orderBy("OrderID")
        order_ids_with_new_customer = order_ids_with_null_customer.withColumn(
            "new_CustomerID", F.row_number().over(window_spec) + max_customer_id)
        
        # Replace null values ​​of the CustomerID column with new values ​​based on OrderID
        df = df.join(order_ids_with_new_customer, "OrderID", "left").withColumn(
            "CustomerID", F.coalesce(F.col("CustomerID"), F.col("new_CustomerID"))
        ).drop("new_CustomerID")
        
        # Use na.fill() to replace null values ​​of the Description column
        df = df.withColumn("Description", F.lower(F.col("Description")))
        df = df.na.fill({"Description": "unknown"})
        
        # Drop NA and Duplicate
        df = df.dropna()
        df = df.dropDuplicates()
        
        # Log into EMR stdout
        print(f"Dataset have shape: {(df.count(), df.columns)}")
        
        # Write our results as parquet files
        df.write.option("header", "true").mode("overwrite").parquet(output_uri)
        
        
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_source', help="The URI for you Parquet online retail data, like an S3 bucket location.")
    parser.add_argument('--output_uri', help="The URI where output is saved, like an S3 bucket location.")
    args = parser.parse_args()

    transform_calc_data(args.data_source, args.output_uri)
Tip
For beginners, Python is always the top choice because of its easy-to-understand, easy-to-learn syntax and versatility, it can be used for web or data-related work.

5. 📡 Setup Infrastructure

We can choose one of two ways to set up the infrastructure, I will guide you through both of them below.

Setup byAdvantageDisadvantages
MamualIntuitive and easy to useDifficult to manage when expanding
FlexibleCannot be reused
No deep code knowledge requiredDifficulty in version control
No automation
CloudFormationAutomation and reuseRequires in-depth knowledge
Version management and historyLacks immediate flexibility
Large scaleErrors that are difficult to detect may occur
CI/CD integration
Complex resource management

5.1. Setup with CloudFormation

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285

AWSTemplateFormatVersion: "2010-09-09"
Description: Setup infrastructure for batch processing with Amazon EMR on Amazon Web Services (AWS)

Parameters:
  SubnetCidrBlock:
    Description: CidrBlock for Subnet
    Type: CommaDelimitedList
    Default: "10.10.1.0/24, 10.10.2.0/24, 10.10.3.0/24, 10.10.4.0/24"

  InstanceType:
    Description: WebServer EC2 instance type (has default, AllowedValues)
    Type: String
    Default: m5.xlarge
    AllowedValues:
      - m5.2xlarge
      - m5.xlarge
    ConstraintDescription: must be a valid EC2 instance type.

  ReleaseLabel:
    Description: A release contains a set of applications which can be installed on your cluster.
    Type: String
    Default: emr-7.2.0
    AllowedValues:
      - emr-7.2.0
      - emr-7.1.0
      - emr-7.0.0
      - emr-6.12.0
      - emr-6.13.0
      - emr-6.14.0
      - emr-6.15.0
    ConstraintDescription: ReleaseLabel is invalid

  KeyName:
    Description: Name of an existing EC2 KeyPair to enable SSH access to the instances. Linked to AWS Parameter
    Type: AWS::EC2::KeyPair::KeyName
    ConstraintDescription: must be the name of an existing EC2 KeyPair.

  MinimumCapacityUnits:
    Description: |
      The lower boundary of Amazon EC2 units. It is measured through vCPU cores or 
      instances for instance groups and measured through units for instance fleets. 
      Managed scaling activities are not allowed beyond this boundary. 
      The limit only applies to the core and task nodes. 
      The master node cannot be scaled after initial configuration.      
    Type: String
    Default: 2

  MaximumCapacityUnits:
    Description: |
      The upper boundary of Amazon EC2 units. It is measured through vCPU cores or instances for instance groups 
      and measured through units for instance fleets. Managed scaling activities are not allowed beyond this boundary. 
      The limit only applies to the core and task nodes. The master node cannot be scaled after initial configuration.      
    Type: String
    Default: 8

  MaximumCoreCapacityUnits:
    Description: |
      The upper boundary of Amazon EC2 units for core node type in a cluster. 
      It is measured through vCPU cores or instances for instance groups and measured through units for instance fleets. 
      The core units are not allowed to scale beyond this boundary. The parameter is used to split capacity allocation between core and task nodes.      
    Type: String
    Default: 3

  MaximumOnDemandCapacityUnits:
    Description: |
      The upper boundary of On-Demand Amazon EC2 units. It is measured through vCPU cores or instances for instance groups and measured through units for instance fleets. 
      The On-Demand units are not allowed to scale beyond this boundary. The parameter is used to split capacity allocation between On-Demand and Spot Instances.      
    Type: String
    Default: 5

  UnitType:
    Description: The unit type used for specifying a managed scaling policy.
    Type: String
    Default: Instances
    ConstraintDescription: is invalid

Resources:
  # Amazon S3
  s3:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: online-retail-ecommerce
      VersioningConfiguration:
        Status: Enabled
      Tags:
        - Key: name
          Value: Online Retail Ecommerce

  # Amazon VPC
  Vpc:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.10.0.0/16
      EnableDnsSupport: true
      EnableDnsHostnames: true
      InstanceTenancy: default
      Tags:
        - Key: name
          Value: Retail Vpc

  # Public Subnet
  PublicSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref Vpc
      AvailabilityZone: ap-southeast-1a
      CidrBlock: !Select [0, !Ref SubnetCidrBlock]
      MapPublicIpOnLaunch: true
      Tags:
        - Key: name
          Value: Public Subnet 1

  # S3 Gateway Endpoint
  S3GatewayEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref Vpc
      VpcEndpointType: Gateway
      ServiceName: !Sub com.amazonaws.ap-southeast-1.s3
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: Allow-access-to-specific-bucket
            Effect: Allow
            Principal: '*'
            Action:
              - s3:GetObject
              - s3:PutObject
              - s3:DeleteObject
              - s3:ListBucket
            Resource:
              - arn:aws:s3:::online-retail-ecommerce/*
              - arn:aws:s3:::online-retail-ecommerce
      RouteTableIds:
        - !Ref RouteTable
    DependsOn: s3

  # Internet Gateway
  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: Internet Gateway

  # Internet Gateway Attachment
  VpcGatewayAttachment:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      VpcId: !Ref Vpc
      InternetGatewayId: !Ref InternetGateway

  # Route Table
  RouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref Vpc
      Tags:
        - Key: name
          Value: Route Table Public

  # Route Table Subnet Associate
  RouteTableSubnetAssociate1:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref RouteTable
      SubnetId: !Ref PublicSubnet1

  # Route
  Route:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref RouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway
    DependsOn: VpcGatewayAttachment

  # EMR Cluster
  Emr:
    Type: AWS::EMR::Cluster
    Properties:
      Name: emr-retail-cluster
      ReleaseLabel: !Ref ReleaseLabel
      Applications:
        - Name: Spark
        - Name: Livy
        - Name: Hadoop
        - Name: JupyterEnterpriseGateway
        - Name: Hive
      Instances:
        MasterInstanceGroup:
          InstanceCount: 1
          InstanceType: !Ref InstanceType
          Market: ON_DEMAND
          Name: Primary
        CoreInstanceGroup:
          InstanceCount: 1
          InstanceType: !Ref InstanceType
          Market: ON_DEMAND
          Name: Core
        TaskInstanceGroups:
          - InstanceCount: 1
            InstanceType: !Ref InstanceType
            Market: ON_DEMAND
            Name: Task-1
          - InstanceCount: 1
            InstanceType: !Ref InstanceType
            Market: ON_DEMAND
            Name: Task-2
        Ec2SubnetId: !Ref PublicSubnet1
        Ec2KeyName: !Ref KeyName
        AdditionalMasterSecurityGroups:
          - !Ref EmrSG
      ServiceRole: EMR_DefaultRole
      JobFlowRole: EMR_EC2_DefaultRole
      VisibleToAllUsers: true
      LogUri: s3://online-retail-ecommerce/logs/
      ManagedScalingPolicy:
        ComputeLimits:
          MinimumCapacityUnits: !Ref MinimumCapacityUnits
          MaximumCapacityUnits: !Ref MaximumCapacityUnits
          MaximumCoreCapacityUnits: !Ref MaximumCoreCapacityUnits
          MaximumOnDemandCapacityUnits: !Ref MaximumOnDemandCapacityUnits
          UnitType: !Ref UnitType
      # SecurityConfiguration: 
      Tags:
        - Key: name
          Value: emr retail cluster

  # Emr Security Group
  EmrSG:
    Type: AWS::EC2::SecurityGroup
    Properties:
      VpcId: !Ref Vpc
      GroupName: Emr Cluster - SG
      GroupDescription: Allow SSH and Ping for servers in the Emr Cluster
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: 0.0.0.0/0
      SecurityGroupEgress:
        - IpProtocol: -1
          CidrIp: 0.0.0.0/0
      Tags:
        - Key: name
          Value: Emr Cluster SG
  
  emrRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2008-10-17
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: !Ref ElasticMapReducePrincipal
            Action: sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole

  emrEc2Role:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2008-10-17
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: !Ref Ec2Principal
            Action: sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role

  emrEc2InstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Path: /
      Roles:
        - !Ref emrEc2Role

On the search bar, enter the keyword CloudFormation, then click CloudFormation Service. ./1v.png

Next we click Create Stack ./2v.png

In the Create Stack interface, select existing template, then select upload template, continue to click choose file to select the file, after uploading the correct file you want, click Next. ./3v.png

Enter the Stack name and select Instance type. ./4v.png

Select the previously created Key Pair. ./5v.png

Các thông số khác ta để nguyên và bấm Next. ./6v.png

Continue clicking Next and finally click Submit. ./7v.png ./8v.png

We can see the Services and features being initialized according to the template we specified, we can click on the refresh icon to update the status. ./9v.png

Now the Stack has been created successfully. ./10v.png

5.2. Manual Setup

5.2.1. Create S3 Bucket

First we need to select Region, Here I choose Singapore ./set_region.png

We search for the S3 service in the search bar, then click on S3 ./().png

After entering the S3 service, click Create Bucket ./(1).png

Next we enter the bucket name ./(2).png

Enable Bucket Versioning ./(3).png

Finally, click Create Bucket to initialize the bucket ./(4).png

After completing the above operations, we will create a bucket in the region Singapore (ap-southeast-1) ./(5).png

5.2.2. Upload Resources

Click on the previously created bucket, then select Create folder ./(6).png

In the Folder name field, fill in logs, specify the encryption key for the folder, then click Create folder ./(7).png

Why do we need to create a logs folder?
EMR Cluster needs a path to a location in S3 to perform logging for cluster activities such as tasks, nodes,…

We do the same with the input, output and scripts folders ./(8).png ./(9).png ./(10).png

After completion, we have 4 folders as below ./(11).png

In the input folder, we upload input data for EMR Cluster, first click upload ./(12).png

then select add files ./(13).png

then select the data file downloaded from the link above ./(14).png

After confirming that we have added the correct data file, we select upload ./(15).png

We do the same with the scripts folder, this folder will contain code that helps transform and calculate according to the needs and purposes of data use. ./(16).png

In addition, the output folder is used to contain cluster output data

5.2.3. Create VPC

In the search bar, search for the keyword VPC, then click on VPC service.

In the VPC console, click Create VPC. ./1.png

Choose VPC and More, enter a name for the VPC in the Auto-generate field, next fill in the IPv4 CIDR block with the value 10.10.0.0/16 ./2.png

Next we choose the AZ number as 1, the Public Subnet number as 1, the Private Subnet number as 0,Then we choose S3 Gateway so that the Cluster can communicate with S3 Bucket via the internal network ./3.png

Finally, click Create VPC ./4.png

Below are the features built with VPC ./5.png

5.2.3.1. Subnets

We can see that the VPC was created with a Public Subnet in AZ ap-southeast-1a with an IPv4 CIDR of 10.10.0.0/20 ./6.png ./7.png

Detailed parameters of Public Subnet ./8.png

5.2.3.2. Route Table

We can see that in addition to the default route, we have a route with Destination of 0.0.0.0/0 and Target of Internet Gateway ID, allowing services located in the Public Subnet to go out to the Internet. ./9.png

Public Subnet and Route Table are associated with each other, helping to specify routes for the public subnet ./10.png

5.2.3.3. Internet Gateway

The Internet Gateway is created and attached to the VPC we created above and routed in the route table ./11.png

5.2.3.4. VpcEnpoint

We can see that a VpcEndpoint is created under the VPC we created above with the S3 service and the Endpoint Type is of type Gateway, this helps the EMR Cluster to communicate with the S3 Bucket through the internal network. ./12.png

5.2.3.5. Security Group

A Security Group is created belonging to the VPC we created above ./13.png

We can see that Inbound rules and Outbound rules allow All traffic to enter and exit ./14.png ./15.png

Warning
Inbound and Outbound are configured with All traffic which is not a security issue, which we need to consider and handle.

5.2.4. Create KeyPair

In EC2 Console choose Key pair ./1q.png

Next, click Create Key pair ./2q.png

Enter the Key pair name, then click Create Key pair. ./3q.png

We get a new key pair. ./4q.png

5.2.5. Create and Add Roles

We will create Roles using CLI, first we need to export environment variables.

1
2
export AWS_ACCESS_KEY_ID=KJFKLSJJFKSLDJSLDJK
export AWS_SECRET_ACCESS_KEY=SHJKLSDKF+KSJDFKJSLDKJF+SJKS+LKSJLK

We run the following command to start creating EMR Roles.

1
aws emr create-default-roles

Run the command below to check the roles that have been created.

1
aws iam list-roles | grep 'EMR_DefaultRole\ | EMR_EC2_DefaultRole'

We will check again in Amazon IAM to make sure the roles have been created. In IAM console, choose Roles ./1z.png

We see that the necessary Roles have been created. ./2z.png

5.2.6. Create EMR Cluster

On the search bar, search with the keyword EMR, then click EMR service ./long.png

In the EMR console, select Create cluster ./2s.png

First we enter a name for the cluster, next we select the latest EMR Release (currently emr-7.2.0), then we select Application (Spark). ./3s.png

We select Amazon Linux release, automatically applying the latest updated version. ./4s.png

Choose Instance Type for Master Node ./5s.png

Next we choose Instance Type for Worker Node including 1 core and 1 task ./6s.png

Set the number of cores and tasks, here we leave 1 core and 1 task, then we choose VPC and Subnet for EMR Cluster. ./7s.png

We select the VPC and Subnet that were previously created for the cluster. ./8s.png ./9s.png

We specify the path to the logs directory in the s3 bucket for the cluster. ./10s.png

Choose the keypair for the cluster we created above ./11s.png

Specify the Service Role and EC2 Instance profile that we created and added above ./12s.png ./13s.png

Confirm the information and click Create Cluster. ./14s.png

At this point, the Cluster will be in Starting state, after a few minutes it will switch to waiting state (At this point it is ready to receive jobs and tasks) ./15s.png ./16s.png

6. 👷 Submit Work to EMR Cluster

When the Cluster is in the Waiting state, we will perform Submit work for the EMR Cluster by clicking on the Steps tab. ./ww.png

Next we choose Add step. ./2w.png

In the Type section, select Spark application, in the Name field, fill in the step name, and Deploy mode, select Cluster mode. ./3w.png

In Application location, point to the folder containing the PySpark script containing the calculation and data conversion logic. In the Arguments section, fill in the following arguments:

1
2
--data-source s3://online-retail-ecommerce/input/OnlineRetail.csv 
--output-uri s3://online-retail-ecommerce/output/

In step action we choose continue. Finally we click Add step. ./4w.png

After creating the step, it will now have a Pending status. You can click Refresh table to update the step’s status. ./5w.png

After 20 to 25 seconds, the step has moved to the Running state, which means the EMR Cluster is performing tasks. ./6w.png

After 50 seconds, the step has moved to the Completed state, which means the EMR Cluster has completed its assigned work. ./7w.png

7. 🔎 View Results

7.1. View Output

After EMR Cluster completes its work, we can move to the output folder to see the output results. ./8w.png

Note
The output data is divided into multiple files while the input is only 1 file. This is because Spark automatically splits the data to perform parallel calculations and processing, helping to increase performance when working with data. big material.

7.2. View Data

To check if the data has actually been processed, you can download one of the files in the output folder, by checking the file you want to download and clicking download. After downloading you can view the data. This is before and after the data is processed. ./data-2.png ./data-1.png

7.2. View Logs

We have specified the Logs location for EMR Cluster, so it will perform Logs during data processing. ./9w.png

8. 🗑 Clean Up

8.1. Terminate EMR CLuster

In the EMR Console, we select the cluster to terminate, then we click Terminate. ./10w.png ./11w.png

The cluster is now in Terminating state. ./12w.png

The status changes to Terminated, meaning the cluster has successfully terminated. ./1t.png

8.2. Delete VPC

Trong Tab Your VPCs ta chọn VPC cần xóa, sau đó chọn Delete VPC. ./2t.png

We can see that when we delete a VPC, the features created with it will also be deleted, this helps clean up resources faster, features include Internet Gateway, Route Table, Security Group, Subnet, VpcEnpoint. Then we enter delete to confirm deleting the VPC, finally click Delete. ./3t.png

8.3. Delete S3 Bucket

Select the Bucket you want to delete, then click Empty to delete the bucket’s contents. ./4t.png

Enter permanently delete to confirm deletion of the content, then click Empty. ./5t.png

We re-select the Bucket that just deleted its contents and click Delete. ./6t.png

Enter the bucket name to confirm deletion then click Delete bucket. ./7t.png