Pig: Seema Acharya book content (Pig material available as a download).
Data sets:
https://www.ncei.noaa.gov/access/crn/qcdatasets.html
https://www.ncei.noaa.gov/pub/data/uscrn/products/monthly01/
Weather data mining code:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
public class MyMaxMin {
public static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, Text>
{
// Sentinel value used for missing temperature readings in the data set.
public static final int MISSING = 9999;
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
if (!(line.length() == 0))
{
// The record is parsed as fixed-width text: date in columns 7-14, minimum temperature in 40-45, maximum temperature in 48-53.
String date = line.substring(6, 14);
float temp_Min = Float.parseFloat(line.substring(39, 45).trim());
float temp_Max = Float.parseFloat(line.substring(47, 53).trim());
// Emit hot days (maximum above 35 degrees) keyed by date.
if (temp_Max > 35.0 && temp_Max != MISSING)
{
context.write(new Text("Hot Day " + date), new Text(String.valueOf(temp_Max)));
}
// Emit cold days (minimum below 10 degrees); the missing check must be on temp_Min, not temp_Max.
if (temp_Min < 10 && temp_Min != MISSING)
{
context.write(new Text("Cold Day " + date), new Text(String.valueOf(temp_Min)));
}
}
}
}
public static class MaxTemperatureReducer extends Reducer<Text, Text, Text, Text>
{
// The new MapReduce API passes values as an Iterable; a method taking Iterator<Text> would not
// override Reducer.reduce, so the custom logic would never run.
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
// Each key ("Hot Day yyyymmdd" / "Cold Day yyyymmdd") carries a single temperature reading; emit it as-is.
for (Text value : values)
{
context.write(key, value);
}
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "weather example");
job.setJarByClass(MyMaxMin.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path outputPath = new Path(args[1]);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outputPath);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
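To try the job, the class above is compiled and packaged into a jar and submitted with hadoop jar; the jar name and HDFS paths below are only placeholders:

hadoop jar weather.jar MyMaxMin /weather/input /weather/output

The first argument is the HDFS directory holding the downloaded station files and the second is an output directory that must not already exist. Note that the substring offsets in the mapper assume a particular fixed-width record layout, so they should be checked against the exact NCEI product that is downloaded.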
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MatrixDriver
{
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
// M is an m-by-n matrix; N is an n-by-p matrix. These values must match the dimensions of the actual input matrices.
conf.set("m", "2");
conf.set("n", "2");
conf.set("p", "2");
Job job = Job.getInstance(conf, "MatrixMultiplication");
job.setJarByClass(MatrixDriver.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MatrixMapper.class);
job.setReducerClass(MatrixReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Wait for the job to finish so the client reports success or failure.
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
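The driver above hardcodes a 2-by-2 case (m = n = p = 2), so the input must describe two 2-by-2 matrices. A small worked example follows (the concrete numbers, jar name, and paths are only illustrative); the mapper shown below expects one matrix cell per line in the form matrixName,row,column,value:

M,0,0,1
M,0,1,2
M,1,0,3
M,1,1,4
N,0,0,5
N,0,1,6
N,1,0,7
N,1,1,8

With the three classes packaged into, say, matrix.jar, the job is run as

hadoop jar matrix.jar MatrixDriver /matrix/input /matrix/output

and the expected output is one line per non-zero result cell in the form row,column,value:

0,0,19.0
0,1,22.0
1,0,43.0
1,1,50.0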
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
public class MatrixReducer extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
// For output cell (i,k), collect row i of M into hashA (indexed by j) and column k of N into hashB (indexed by j).
String[] value;
HashMap<Integer, Float> hashA = new HashMap<Integer, Float>();
HashMap<Integer, Float> hashB = new HashMap<Integer, Float>();
for (Text val : values)
{
value = val.toString().split(",");
if (value[0].equals("M"))
{
hashA.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
}
else
{
hashB.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
}
}
// Dot product of row i of M and column k of N: sum over j of M[i][j] * N[j][k].
int n = Integer.parseInt(context.getConfiguration().get("n"));
float result = 0.0f;
float a_ij;
float b_jk;
for (int j = 0; j < n; j++)
{
a_ij = hashA.containsKey(j) ? hashA.get(j) : 0.0f;
b_jk = hashB.containsKey(j) ? hashB.get(j) : 0.0f;
result += a_ij * b_jk;
}
// Skip zero cells; with a null key, TextOutputFormat writes only the value, producing lines of the form "i,k,value".
if (result != 0.0f)
{
context.write(null, new Text(key.toString() + "," + Float.toString(result)));
}
}
}
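As a concrete illustration with the 2-by-2 input sketched above, the reduce call for key 0,1 receives the values M,0,1, M,1,2, N,0,6 and N,1,8: hashA then maps j to M[0][j], hashB maps j to N[j][1], and the loop accumulates 1*6 + 2*8 = 22.0, which is written out as the line 0,1,22.0.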
import java.io.IOException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
public class MatrixMapper extends Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
Configuration conf = context.getConfiguration();
int m = Integer.parseInt(conf.get("m"));
int p = Integer.parseInt(conf.get("p"));
// Each input line is "M,i,j,value" for an element of M or "N,j,k,value" for an element of N.
String line = value.toString();
String[] indicesAndValue = line.split(",");
Text outputKey = new Text();
Text outputValue = new Text();
if (indicesAndValue[0].equals("M"))
{
for (int k = 0; k < p; k++)
{
outputKey.set(indicesAndValue[1] + "," + k);
outputValue.set("M," + indicesAndValue[2] + "," + indicesAndValue[3]);
context.write(outputKey, outputValue);
}
}
else
{
// An N element N[j][k] is needed by every output cell (i,k), so replicate it once per row i of M.
for (int i = 0; i < m; i++)
{
outputKey.set(i + "," + indicesAndValue[2]);
outputValue.set("N," + indicesAndValue[1] + "," + indicesAndValue[3]);
context.write(outputKey, outputValue);
}
}
}
}
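On the mapper side, with the same 2-by-2 example, the line M,0,1,2 (with p = 2) is emitted under the keys 0,0 and 0,1 with the value M,1,2, so M[0][1] reaches every reducer responsible for a cell in row 0; similarly the line N,1,1,8 (with m = 2) is emitted under the keys 0,1 and 1,1 with the value N,1,8.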
Word count source code:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
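This is the standard word count job: the mapper tokenizes each line and emits (word, 1), and the combiner and reducer sum the counts per word. A quick way to try it (jar name and paths are only placeholders):

hadoop jar wordcount.jar WordCount /wordcount/input /wordcount/output

An input line such as "hello hadoop hello" shows up in the output as the lines "hadoop 1" and "hello 2", with the word and its count separated by a tab.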
DBMS class materials and classroom (YouTube channel): https://www.youtube.com/channel/UC93Sqlk_tv9A9cFv-QjZFAQ