自己定义一个分区类,需要自己实现 Partitioner 接口(泛型最好写上)
import org.apache.flink.api.common.functions.Partitioner
public class MyPartition implements Partitioner<Long> {
public int partition(Long key, int numPartitions) {
System.out.println("分区总数"+numPartitions);
if(key % 2== 0){
return 0;
}else {
return 1;
}
}
}
具体实现类:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StreamingDemoWithMyPartition {
/**
* 使用自定义分区
* 根据数字的奇偶性来进行分区
*
* @param args
*/
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());
// 对数据进行转换,把long类型的转换成tuple类型
DataStream<Tuple1<Long>> tupData = text.map(new MapFunction<Long, Tuple1<Long>>() {
public Tuple1<Long> map(Long aLong) throws Exception {
return new Tuple1<Long>(aLong);
}
});
//分区之后的数据
DataStream<Tuple1<Long>> partData = tupData.partitionCustom(new MyPartition(), 0);
SingleOutputStreamOperator<Long> maps = partData.map(new MapFunction<Tuple1<Long>, Long>() {
public Long map(Tuple1<Long> value) throws Exception {
System.out.println("获取当前线程id" + Thread.currentThread().getId() + ",value" + value);
return value.getField(0);
}
});
maps.print().setParallelism(1);
env.execute("own definite partiotn");
/**
* 结果为:
* 分区总数8
* 获取当前线程id77,value(1)
* 1
* 分区总数8
* 获取当前线程id81,value(2)
* 2
* 分区总数8
* 获取当前线程id77,value(3)
* 3
* 分区总数8
* 获取当前线程id81,value(4)
* 4
* 分区总数8
* 获取当前线程id77,value(5)
* 5
* 分区总数8
* 获取当前线程id81,value(6)
* 6
* 分区总数8
* 获取当前线程id77,value(7)
* 7
* 分区总数8
* 获取当前线程id81,value(8)
* 8
*/
}
}
scala代码实现案例:
package streaming
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object StreamingDemoWithMyPartition {

  /**
   * Demonstrates custom partitioning in the Scala DataStream API:
   * elements are routed to a partition by the parity of their value.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelScala)
    // Wrap each element in a Tuple1 so partitionCustom can address field 0.
    val tupdata = text.map(line => Tuple1(line))
    val partdata = tupdata.partitionCustom(new MyPartition, 0)
    // Unwrap the tuple again, logging which thread handled each element.
    val result = partdata.map { line =>
      println("当前线程id" + Thread.currentThread().getId + ",value" + line)
      line._1
    }
    result.print().setParallelism(1)
    env.execute("StreamingDemoWithMyPartition")
  }

  /** Routes even keys to partition 0 and odd keys to partition 1. */
  class MyPartition extends Partitioner[Long] {
    override def partition(key: Long, numPartitions: Int): Int = {
      println("总的分区数" + numPartitions)
      if (key % 2 == 0) 0 else 1
    }
  }
}
版权声明:本文为weixin_39150719原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。