2 Answers

This user has contributed 1803 experience points and received 3+ likes
是的,可以使用 Spark 監(jiān)聽 TCP 端口并處理任何傳入數(shù)據(jù)。您正在尋找的是Spark Streaming。
為了方便:
import java.util.Arrays;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

public class NetworkWordCount {
    public static void main(String[] args) throws InterruptedException {
        // Create a local StreamingContext with two worker threads and a batch interval of 1 second.
        // At least two threads are needed: one runs the socket receiver, the other processes batches.
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Create a DStream that will connect to hostname:port, like localhost:9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
    }
}
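To make the per-batch semantics concrete, the plain-Java sketch below (no Spark dependency; the class BatchWordCount and method countWords are hypothetical names) models what the flatMap, mapToPair and reduceByKey chain above computes for the lines received during a single one-second batch. It is only a local model: Spark distributes this work and prints at most ten entries per batch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchWordCount {
    // Hypothetical local model of flatMap(split(" ")) -> mapToPair((word, 1)) -> reduceByKey(+)
    // applied to the lines that arrived in ONE batch interval.
    public static Map<String, Integer> countWords(List<String> batchLines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only to make output predictable
        for (String line : batchLines) {
            // flatMap step: one line becomes many words (split on single spaces, as in the example,
            // so consecutive spaces produce empty-string tokens there too)
            for (String word : line.split(" ")) {
                // mapToPair + reduceByKey step: add 1 for each occurrence of the key
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical batch: the lines sent to the socket during one second
        List<String> batch = Arrays.asList("hello world", "hello spark");
        System.out.println(countWords(batch)); // prints {hello=2, spark=1, world=1}
    }
}
```

Because reduceByKey runs per batch, counts start from zero in every one-second interval; keeping running totals across batches would need a stateful operation instead.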

This user has contributed 1813 experience points and received 2+ likes
Spark has no built-in TCP server that waits for producers and buffers their data. Through its API libraries, Spark pulls data from sources such as TCP sockets and Kafka. To consume incoming TCP data, you need an external TCP server that Spark can connect to, as Shaido's example shows.
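The Spark Streaming guide uses netcat (nc -lk 9999) as that external server. If netcat isn't available, a minimal Java stand-in might look like the sketch below. The class name LineServer and method serve are hypothetical; it assumes port 9999 to match socketTextStream("localhost", 9999), accepts a single connection, and forwards each line typed on stdin to it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class LineServer {
    // Hypothetical helper: wait for ONE client (e.g. Spark's socket receiver) to connect,
    // then forward every line from `source` to it until `source` is exhausted.
    public static void serve(ServerSocket server, BufferedReader source) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            while ((line = source.readLine()) != null) {
                out.println(line); // autoflush is on, so each line is sent immediately
            }
        } // closing the socket signals end-of-stream to the client
    }

    public static void main(String[] args) throws IOException {
        int port = 9999; // assumed to match jssc.socketTextStream("localhost", 9999)
        try (ServerSocket server = new ServerSocket(port);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            System.out.println("Listening on port " + port + "; start the Spark job, then type lines");
            serve(server, stdin);
        }
    }
}
```

Start LineServer first and then launch the Spark job, since the receiver connects as a client. Unlike nc -lk, this sketch does not accept new connections after the first one closes.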