当前位置: 首页 > >

Flink实战案例（四十八）: Operators（八）多流转换算子（三）coGroup

发布时间:

声明：本系列博客是根据SGG的视频整理而成，非常适合大家入门学习。


《2021年最新版大数据面试题全面开启更新》


CoGroup


该操作是将两个数据流/集合按照key进行group，然后将相同key的数据进行处理，但是它和join操作稍有区别，它在一个流/数据集中没有找到与另一个匹配的数据还是会输出。


1.在DataStream中


    侧重于group，对同一个key上的两组集合进行操作。如果在一个流中没有找到与另一个流的window中匹配的数据，仍会输出结果，即只输出一个流的数据。仅能使用在window中。

实例一：


下面看一个简单的例子，这个例子中从两个不同的端口来读取数据，模拟两个流，我们使用CoGroup来处理这两个数据流，观察输出结果：



public class CogroupFunctionDemo02 {


public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

DataStream> input1=env.socketTextStream("192.168.217.110",9002)
.map(new MapFunction>() {
@Override
public Tuple2 map(String s) throws Exception {
return Tuple2.of(s.split(" ")[0],s.split(" ")[1]);
}
});

DataStream> input2=env.socketTextStream("192.168.217.110",9001)
.map(new MapFunction>() {
@Override
public Tuple2 map(String s) throws Exception {
return Tuple2.of(s.split(" ")[0],s.split(" ")[1]);
}
});

input1.coGroup(input2)
.where(new KeySelector, Object>() {

@Override
public Object getKey(Tuple2 value) throws Exception {
return value.f0;
}
}).equalTo(new KeySelector, Object>() {

@Override
public Object getKey(Tuple2 value) throws Exception {
return value.f0;
}
}).window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
.trigger(CountTrigger.of(1))
.apply(new CoGroupFunction, Tuple2, Object>() {

@Override
public void coGroup(Iterable> iterable, Iterable> iterable1, Collector collector) throws Exception {
StringBuffer buffer=new StringBuffer();
buffer.append("DataStream frist:
");
for(Tuple2 value:iterable){
buffer.append(value.f0+"=>"+value.f1+"
");
}
buffer.append("DataStream second:
");
for(Tuple2 value:iterable1){
buffer.append(value.f0+"=>"+value.f1+"
");
}
collector.collect(buffer.toString());
}
}).print();

env.execute();
}
}


首先启动两个终端窗口，然后使用nc工具打开两个端口，然后运行上面程序：


[shinelon@hadoop-senior Desktop]$ nc -lk 9001
1 lj
1 al
2 af

[shinelon@hadoop-senior Desktop]$ nc -lk 9002
2 ac
1 ao
2 14

运行结果如下所示：



2> DataStream frist:
2=>ac
DataStream second:

4> DataStream frist:
DataStream second:
1=>lj

4> DataStream frist:
1=>ao
DataStream second:

4> DataStream frist:
DataStream second:
1=>al

2> DataStream frist:
2=>14
DataStream second:

2> DataStream frist:
2=>14
DataStream second:
2=>af


2.在DataSet中
下面的例子中，key代表学生班级ID，value为学生name，使用cogroup操作将两个集合中key相同的数据合并：



public class CoGourpDemo {

public static void main(String[] args) throws Exception {
ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();

DataSet> source1=env.fromElements(
Tuple2.of(1L,"xiaoming"),
Tuple2.of(2L,"xiaowang"));

DataSet> source2=env.fromElements(
Tuple2.of(2L,"xiaoli"),
Tuple2.of(1L,"shinelon"),
Tuple2.of(3L,"hhhhhh"));

source1.coGroup(source2)
.where(0).equalTo(0)
.with(new CoGroupFunction, Tuple2, Object>() {

@Override
public void coGroup(Iterable> iterable,
Iterable> iterable1, Collector collector) throws Exception {
Map map=new HashMap();
for(Tuple2 tuple:iterable){
String str=map.get(tuple.f0);
if(str==null){
map.put(tuple.f0,tuple.f1);
}else{
if(!str.equals(tuple.f1))
map.put(tuple.f0,str+" "+tuple.f1);
}
}

for(Tuple2 tuple:iterable1){
String str=map.get(tuple.f0);
if(str==null){
map.put(tuple.f0,tuple.f1);
}else{
if(!str.equals(tuple.f1))
map.put(tuple.f0,str+" "+tuple.f1);
}
}
collector.collect(map);
}
}).print();

}
}


ÔËÐнá¹ûÈçÏÂËùʾ£º


{3=hhhhhh}
{1=xiaoming shinelon}
{2=xiaowang xiaoli}

实例二：



/** An order record: order id, the id of the goods ordered, and the amount. */
final case class Order(id: String, gdsId: String, amount: Double)

/** A goods record: id and display name (`Gds` presumably abbreviates "goods"). */
final case class Gds(id: String, name: String)

/** Output of co-grouping an [[Order]] with the [[Gds]] whose `id` equals the order's `gdsId`. */
final case class RsInfo(orderId: String, gdsId: String, amount: Double, gdsName: String)

/**
 * DataStream coGroup demo.
 *
 * Reads orders ("id,gdsId,amount") from Kafka topic "topic1" and goods records ("id,name")
 * from "topic2". It co-groups them on `order.gdsId == gds.id` within 1-minute tumbling
 * processing-time windows and prints one [[RsInfo]] per order.
 *
 * Unlike `join`, coGroup is also invoked for keys that only one side has in the window:
 *   - orders with no matching goods record are still emitted, with an empty `gdsName`;
 *   - goods records with no order produce no output.
 *
 * Optional argument: Kafka bootstrap servers (defaults to "localhost:9092").
 */
object CoGroupDemo {

  def main(args: Array[String]): Unit = {
    // JavaConverters exists on Scala 2.11 through 2.13; scala.jdk.CollectionConverters
    // only exists from 2.13 onwards.
    import scala.collection.JavaConverters._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val kafkaConfig = new Properties()
    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args.headOption.getOrElse("localhost:9092"))
    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test1")

    val orderConsumer = new FlinkKafkaConsumer011[String]("topic1", new SimpleStringSchema, kafkaConfig)
    val gdsConsumer = new FlinkKafkaConsumer011[String]("topic2", new SimpleStringSchema, kafkaConfig)

    // NOTE(review): a record with too few fields or a non-numeric amount throws and fails the job.
    val orderDs = env.addSource(orderConsumer).map { line =>
      val fields = line.split(",")
      Order(fields(0), fields(1), fields(2).toDouble)
    }

    val gdsDs = env.addSource(gdsConsumer).map { line =>
      val fields = line.split(",")
      Gds(fields(0), fields(1))
    }

    orderDs.coGroup(gdsDs)
      .where(_.gdsId) // key selector for orderDs
      .equalTo(_.id)  // key selector for gdsDs
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .apply(new CoGroupFunction[Order, Gds, RsInfo] {
        override def coGroup(first: java.lang.Iterable[Order],
                             second: java.lang.Iterable[Gds],
                             out: Collector[RsInfo]): Unit = {
          // `first` / `second` hold this window's orders / goods records that share one key.
          // If several goods records share the id, the first one's name is used.
          val gdsName = second.asScala.headOption.map(_.name).getOrElse("")
          first.asScala.foreach { order =>
            out.collect(RsInfo(order.id, order.gdsId, order.amount, gdsName))
          }
        }
      })
      .print()

    env.execute()
  }
}


从源码角度分析CoGroup的实现


    两个DataStream进行CoGroup得到的是一个CoGroupedStreams类型，后面的where、equalTo、window、apply之间的一些转换，最终得到一个WithWindow类型，包含两个dataStream、key选择、where条件、window等属性

    重点：WithWindow 的apply方法



对两个DataStream打标签进行区分，得到TaggedUnion，TaggedUnion包含one、two两个属性，分别对应两个流

将两个打标签后的流TaggedUnion进行union操作，合并为一个DataStream类型的流unionStream

unionStream根据不同的流选择对应的where/equalTo条件进行keyBy，得到KeyedStream流

通过指定的window方式得到一个WindowedStream，然后apply一个被CoGroupWindowFunction包装之后的function，后续就是window的操作



?


到这里已经将一个CoGroup操作转换为window操作，接着看后续是如何将相同key的两个流的数据组合在一起的。



?


?


1. 用户定义的CoGroupFunction被CoGroupWindowFunction包装之后，会接着被InternalIterableWindowFunction包装；一个窗口中相同key的所有数据都会放在一个Iterable中，并被传给CoGroupWindowFunction。


2. 在CoGroupWindowFunction中，会将不同流的数据区分开来，得到两个list，传给用户自定义的CoGroupFunction。


相关推荐


友情链接: