الوقت قد حان لاستخدام ميزات Akka Streams للتوقف عندما يتحقق شرط معين. للقيام بذلك، يمكنك استخدام تيارات takeWhile
أو via
مع الكثير من الأدوات الممتازة التي توفرها Akka Streams.
أحد الخيارات المتاحة هو استخدام takeWhile
، الذي يأخذ عنصرًا من التيار الأصلي ما لم يتم تحقيق الشرط. على سبيل المثال، إذا كان لديك تيارًا من السطور، يمكنك استخدام takeWhile
للتوقف عندما يصبح السطر مساويًا لقيمة محددة:
-
دمج صور Docker في حاوية واحدة31/03/2024
-
أفضل لغات البرمجة لتتعلمها في 202324/10/2023
-
تجنب أخطاء نوع البيانات في Apache Flink05/03/2024
scalaimport akka.actor.ActorSystem import akka.stream.scaladsl._ import akka.stream._ object Main extends App { implicit val system = ActorSystem("system") implicit val materializer = ActorMaterializer() // Define your condition val condition: String => Boolean = line => line.contains("stop") // Stream from a file val source = FileIO.fromPath(Paths.get("your-file.txt")) .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true)) .map(_.utf8String) // Use takeWhile to stop the stream when the condition is met val stream = source.takeWhile(!condition(_)) // Run the stream stream.runForeach(println) .onComplete(_ => system.terminate()) }
في هذا المثال، يتم إيجاد الشرط condition
في كل سطر، وإذا تحقق، سيتوقف التيار. سيستمر البرنامج في القراءة من الملف حتى يتم العثور على السطر الذي يفي بالشرط.
الطريقة الأخرى هي استخدام via
, حيث يمكنك استخدام StatefulStage
لإنشاء مكون خاص يقوم بالتحقق من الشرط ويقوم بإيقاف التيار إذا تم تحقيقه. ومع ذلك، يعتبر استخدام takeWhile
أكثر بساطة وفعالية في معظم الحالات.
المزيد من المعلومات
بالطبع! يمكننا أن نعمق قليلاً في كيفية استخدام takeWhile
بشكل أكثر تفصيلًا في سياق Akka Streams.
takeWhile
في Akka Streams يأخذ دالة تقوم بتقديم قيمة بوليانية (true
أو false
) بناءً على العنصر الحالي في تيار. إذا كانت القيمة true
، يستمر التدفق. وإذا كانت القيمة false
، يتوقف التدفق.
هناك بعض النقاط التي يجب مراعاتها عند استخدام takeWhile
:
-
النوع العام (Generic Type): يجب أن يكون النوع العام لـ
takeWhile
متوافقًا مع النوع العام للتدفق الأصلي. على سبيل المثال، إذا كان التدفق الأصلي من نوعSource[String, _]
، يجب أن تكون دالة الشرط من النوعString => Boolean
. -
التدفق الناتج (Output Stream): يجب أن تكون العملية التي تأتي بعد
takeWhile
جاهزة للتعامل مع توقف التدفق. على سبيل المثال، يمكنك استخدامrunForeach
للقيام بإجراء ما على كل عنصر في التدفق قبل التوقف. -
الإنهاء الصحيح (Proper Termination): يجب أن يتم إنهاء تطبيقك بشكل صحيح بعد انتهاء التدفق. يمكنك استخدام
onComplete
لتنظيف الموارد بعد انتهاء التدفق.
هذا مثال كامل على كيفية استخدام takeWhile
في سياق Akka Streams:
scalaimport akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl._ import akka.util.ByteString import java.nio.file.Paths object Main extends App { implicit val system = ActorSystem("system") implicit val materializer = ActorMaterializer() val condition: String => Boolean = line => line.contains("stop") val source = FileIO.fromPath(Paths.get("your-file.txt")) .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true)) .map(_.utf8String) val stream = source.takeWhile(!condition(_)) stream.runForeach(println) .onComplete(_ => { println("Stream completed.") system.terminate() }) }
في هذا المثال، سيتوقف التدفق عندما يتم العثور على سطر يحتوي على الكلمة “stop”، ثم يتم طباعة رسالة “Stream completed.” قبل إنهاء التطبيق بشكل صحيح.